-
言語: TypeScript 5.0+(strict mode必須)
-
スタイルガイド: Biome設定準拠
- インデント: 2スペース
- クォート: シングルクォート
- セミコロン: あり
- 行幅: 100文字
-
命名規則:
- 関数名: camelCase
- クラス名: PascalCase
- 定数: UPPER_SNAKE_CASE
- インターフェース: PascalCase(Iプレフィックスなし)
-
ドキュメント: JSDoc形式
-
インポート: 型インポートを使用 (
import type
)
-
単一責任の原則: 各関数/クラスは1つの明確な責任を持つ
-
依存性注入: 外部依存は注入可能にする
-
純粋関数: 可能な限り副作用を避ける
-
早期リターン: ネストを減らし可読性を向上
// 良い例
async function processRequest(request: string): Promise<Result<Output, ToolError>> {
if (!request) {
return err({
code: "INVALID_INPUT",
message: "リクエストが空です",
recoverable: true,
suggestions: ["有効なリクエストを入力してください"]
});
}
try {
const result = await internalProcess(request);
return ok(result);
} catch (error) {
return err({
code: "PROCESSING_ERROR",
message: `処理中にエラーが発生しました: ${error}`,
recoverable: false
});
}
}
/**
* Tool: tool_name
* Purpose: ツールの目的を簡潔に記述
* Author: 開発者名
* Date: YYYY-MM-DD
*/
import type { Result } from 'neverthrow';
import { ok, err } from 'neverthrow';
import type { z } from 'zod';
// 入力型定義
export const ToolInputSchema = z.object({
requiredField: z.string().min(1),
optionalField: z.string().optional(),
});
export type ToolInput = z.infer<typeof ToolInputSchema>;
// 出力型定義
export interface ToolOutput {
result: string;
metadata: Record<string, unknown>;
warnings?: string[];
}
// エラー型定義
export interface ToolError {
code: string;
message: string;
toolName: string;
recoverable: boolean;
suggestions?: string[];
}
// メインツールクラス
export class ToolName {
/**
* ツールの説明
*/
constructor(private readonly config: Record<string, unknown> = {}) {
this.setup();
}
private setup(): void {
// 初期設定
}
/**
* ツールのメイン実行メソッド
* @param inputData - 入力データ
* @returns 成功時は出力、失敗時はエラー
*/
async execute(inputData: unknown): Promise<Result<ToolOutput, ToolError>> {
// 入力検証
const parseResult = ToolInputSchema.safeParse(inputData);
if (!parseResult.success) {
return err({
code: 'INVALID_INPUT',
message: parseResult.error.message,
toolName: this.constructor.name,
recoverable: true,
});
}
const validInput = parseResult.data;
try {
// メイン処理
const result = await this.process(validInput);
// 出力構築
const output: ToolOutput = {
result,
metadata: { processedAt: new Date().toISOString() },
};
console.log(`Successfully processed: ${validInput.requiredField}`);
return ok(output);
} catch (error) {
console.error('Processing failed:', error);
return err({
code: 'PROCESSING_ERROR',
message: error instanceof Error ? error.message : String(error),
toolName: this.constructor.name,
recoverable: false,
});
}
}
private async process(inputData: ToolInput): Promise<string> {
// ここに具体的な処理を実装
return 'processed';
}
}
import type { Result } from 'neverthrow';
import { ok, err } from 'neverthrow';
export class AsyncToolName {
/**
* 非同期処理をサポートするツール
*/
/**
* 非同期実行
*/
async executeAsync(inputData: unknown): Promise<Result<ToolOutput, ToolError>> {
const parseResult = ToolInputSchema.safeParse(inputData);
if (!parseResult.success) {
return err({
code: 'INVALID_INPUT',
message: parseResult.error.message,
toolName: this.constructor.name,
recoverable: true,
});
}
const validInput = parseResult.data;
try {
const client = await this.getClient();
try {
const result = await this.processAsync(client, validInput);
return ok({ result, metadata: {} });
} finally {
await client.close();
}
} catch (error) {
return err({
code: 'ASYNC_ERROR',
message: error instanceof Error ? error.message : String(error),
toolName: this.constructor.name,
recoverable: true,
});
}
}
/**
* ストリーミング実行
*/
async *executeStream(
inputData: ToolInput,
): AsyncGenerator<Result<string, ToolError>> {
try {
for await (const chunk of this.streamProcess(inputData)) {
yield ok(chunk);
}
} catch (error) {
yield err({
code: 'STREAM_ERROR',
message: error instanceof Error ? error.message : String(error),
toolName: this.constructor.name,
recoverable: false,
});
}
}
private async getClient(): Promise<any> {
// クライアント取得ロジック
return {};
}
private async processAsync(client: any, input: ToolInput): Promise<string> {
// 非同期処理ロジック
return 'processed';
}
private async *streamProcess(input: ToolInput): AsyncGenerator<string> {
// ストリーミング処理ロジック
yield 'chunk1';
yield 'chunk2';
}
}
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { ToolName } from './tool-name';
import type { ToolInput } from './tool-name';
describe('ToolName', () => {
let tool: ToolName;
beforeEach(() => {
tool = new ToolName({ testMode: true });
});
describe('execute', () => {
it('正常系: 有効な入力で成功する', async () => {
// Arrange
const inputData: ToolInput = {
requiredField: 'test',
optionalField: 'optional',
};
// Act
const result = await tool.execute(inputData);
// Assert
expect(result.isOk()).toBe(true);
if (result.isOk()) {
expect(result.value.result).toBe('expected_output');
}
});
it('異常系: 無効な入力でエラーを返す', async () => {
// Arrange
const inputData = {}; // 必須フィールドが欠落
// Act
const result = await tool.execute(inputData);
// Assert
expect(result.isErr()).toBe(true);
if (result.isErr()) {
expect(result.error.code).toBe('INVALID_INPUT');
}
});
it('外部サービスのモック', async () => {
// Arrange
const mockService = vi.fn().mockResolvedValue({ status: 'success' });
vi.spyOn(tool as any, 'externalService').mockImplementation(mockService);
const inputData: ToolInput = {
requiredField: 'test',
};
// Act
const result = await tool.execute(inputData);
// Assert
expect(mockService).toHaveBeenCalledOnce();
expect(result.isOk()).toBe(true);
});
});
});
import { describe, it, expect } from 'vitest';
import { AnalyzeRequest } from './analyze-request';
import { GenerateWorkflowStructure } from './generate-workflow-structure';
import { CreateDslYaml } from './create-dsl-yaml';
describe('Tool Integration', () => {
it('複数ツールの連携テスト', async () => {
// Tool 1: 要求分析
const analyzer = new AnalyzeRequest();
const analysisResult = await analyzer.execute({
request: 'FAQボットを作りたい',
});
expect(analysisResult.isOk()).toBe(true);
if (!analysisResult.isOk()) return;
// Tool 2: ワークフロー構造生成
const generator = new GenerateWorkflowStructure();
const structureResult = await generator.execute(analysisResult.value);
expect(structureResult.isOk()).toBe(true);
if (!structureResult.isOk()) return;
// Tool 3: DSL YAML生成
const creator = new CreateDslYaml();
const yamlResult = await creator.execute(structureResult.value);
expect(yamlResult.isOk()).toBe(true);
if (!yamlResult.isOk()) return;
// 最終出力の検証
const yamlContent = yamlResult.value.yaml_content;
expect(yamlContent).toContain('version: 0.3.0');
expect(yamlContent).toContain('kind: app');
});
});
import { createHash } from 'crypto';
import { LRUCache } from 'lru-cache';
export class CachedTool {
/**
* キャッシュを活用するツール
*/
private cache: LRUCache<string, ToolOutput>;
private memoCache = new Map<string, string>();
constructor() {
this.cache = new LRUCache<string, ToolOutput>({
max: 500,
ttl: 1000 * 60 * 5, // 5分間のTTL
});
}
async execute(inputData: ToolInput): Promise<Result<ToolOutput, ToolError>> {
// キャッシュキーの生成
const cacheKey = this.generateCacheKey(inputData);
// キャッシュチェック
const cached = this.cache.get(cacheKey);
if (cached) {
console.log(`Cache hit for key: ${cacheKey}`);
return ok(cached);
}
// 実際の処理
const result = await this.process(inputData);
if (result.isOk()) {
this.cache.set(cacheKey, result.value);
}
return result;
}
private generateCacheKey(inputData: ToolInput): string {
/**
* 安定したキャッシュキーの生成
*/
const keyData = `${inputData.requiredField}:${inputData.optionalField || ''}`;
return createHash('sha256').update(keyData).digest('hex').substring(0, 16);
}
private memoize<T extends (...args: any[]) => any>(
fn: T,
keyFn?: (...args: Parameters<T>) => string,
): T {
/**
* 高コストな操作のメモ化
*/
return ((...args: Parameters<T>) => {
const key = keyFn ? keyFn(...args) : JSON.stringify(args);
if (this.memoCache.has(key)) {
return this.memoCache.get(key);
}
const result = fn(...args);
this.memoCache.set(key, result);
return result;
}) as T;
}
private async process(inputData: ToolInput): Promise<Result<ToolOutput, ToolError>> {
// 実装
return ok({ result: 'processed', metadata: {} });
}
}
import { Worker } from 'worker_threads';
import pLimit from 'p-limit';
export class ParallelTool {
/**
* 並列処理をサポートするツール
*/
private limit: pLimit.Limit;
constructor() {
// 同時実行数を制限
this.limit = pLimit(4);
}
/**
* 複数アイテムの並列処理
*/
async executeParallel(
items: ToolInput[],
): Promise<Result<ToolOutput, ToolError>[]> {
// I/Oバウンドなタスク
if (this.isIoBound()) {
const promises = items.map((item) =>
this.limit(() => this.processAsync(item)),
);
return Promise.all(promises);
}
// CPUバウンドなタスク(Worker Threads使用)
const workers = await Promise.all(
items.map((item) => this.processWithWorker(item)),
);
return workers;
}
private isIoBound(): boolean {
// I/Oバウンドかどうかの判定ロジック
return true;
}
private async processAsync(
input: ToolInput,
): Promise<Result<ToolOutput, ToolError>> {
// 非同期処理
return ok({ result: 'processed', metadata: {} });
}
private async processWithWorker(
input: ToolInput,
): Promise<Result<ToolOutput, ToolError>> {
return new Promise((resolve) => {
const worker = new Worker('./worker.js', {
workerData: { input },
});
worker.on('message', (result) => {
resolve(ok(result));
});
worker.on('error', (error) => {
resolve(
err({
code: 'WORKER_ERROR',
message: error.message,
toolName: 'ParallelTool',
recoverable: false,
}),
);
});
});
}
}
// worker.js
import { parentPort, workerData } from 'worker_threads';
// CPU集約的な処理
const result = heavyComputation(workerData.input);
parentPort?.postMessage(result);
import re
from typing import Any
import html
class SecureTool:
"""セキュリティを重視したツール"""
def _sanitize_input(self, value: Any) -> str:
"""入力のサニタイゼーション"""
if not isinstance(value, str):
value = str(value)
# HTMLエスケープ
value = html.escape(value)
# 危険な文字の除去
value = re.sub(r'[<>\"\';&]', '', value)
# 長さ制限
max_length = 10000
if len(value) > max_length:
value = value[:max_length]
return value
def _validate_yaml_content(self, yaml_str: str) -> bool:
"""YAML内容の安全性チェック"""
dangerous_patterns = [
r'!!\w+', # YAMLタグ
r'__.*__', # Pythonマジックメソッド
r'exec\s*\(', # 実行関数
r'eval\s*\(', # 評価関数
]
for pattern in dangerous_patterns:
if re.search(pattern, yaml_str):
return False
return True
import os
from cryptography.fernet import Fernet
class SecretManager:
"""シークレット管理"""
def __init__(self):
self._key = os.environ.get('ENCRYPTION_KEY', Fernet.generate_key())
self._cipher = Fernet(self._key)
def encrypt_sensitive_data(self, data: str) -> str:
"""機密データの暗号化"""
return self._cipher.encrypt(data.encode()).decode()
def decrypt_sensitive_data(self, encrypted: str) -> str:
"""機密データの復号化"""
return self._cipher.decrypt(encrypted.encode()).decode()
def mask_secrets_in_output(self, output: dict) -> dict:
"""出力内のシークレットをマスク"""
secret_patterns = [
(r'api[_-]?key["\']?\s*[:=]\s*["\']?([^"\'\\s]+)', 'API_KEY_MASKED'),
(r'password["\']?\s*[:=]\s*["\']?([^"\'\\s]+)', 'PASSWORD_MASKED'),
(r'token["\']?\s*[:=]\s*["\']?([^"\'\\s]+)', 'TOKEN_MASKED'),
]
output_str = str(output)
for pattern, replacement in secret_patterns:
output_str = re.sub(pattern, replacement, output_str, flags=re.IGNORECASE)
return eval(output_str) # 実際の実装では安全なパーサーを使用
import structlog
from datetime import datetime
# ロガー設定
logger = structlog.get_logger()
class MonitoredTool:
"""モニタリング機能を持つツール"""
def execute(self, input_data: ToolInput) -> Result[ToolOutput, ToolError]:
# 実行開始ログ
start_time = datetime.now()
request_id = self._generate_request_id()
logger.info(
"tool_execution_started",
tool_name=self.__class__.__name__,
request_id=request_id,
input_size=len(str(input_data)),
timestamp=start_time.isoformat()
)
try:
result = self._process(input_data)
# 成功ログ
duration_ms = (datetime.now() - start_time).total_seconds() * 1000
logger.info(
"tool_execution_completed",
tool_name=self.__class__.__name__,
request_id=request_id,
duration_ms=duration_ms,
success=True
)
return Ok(result)
except Exception as e:
# エラーログ
logger.error(
"tool_execution_failed",
tool_name=self.__class__.__name__,
request_id=request_id,
error_type=type(e).__name__,
error_message=str(e),
traceback=traceback.format_exc()
)
return Err(ToolError(
code="EXECUTION_ERROR",
message=str(e)
))
from prometheus_client import Counter, Histogram, Gauge
import time
# メトリクス定義
tool_executions = Counter(
'tool_executions_total',
'Total number of tool executions',
['tool_name', 'status']
)
tool_duration = Histogram(
'tool_duration_seconds',
'Tool execution duration',
['tool_name']
)
active_executions = Gauge(
'tool_active_executions',
'Number of active tool executions',
['tool_name']
)
class MetricsTool:
"""メトリクス収集機能を持つツール"""
def execute(self, input_data: ToolInput) -> Result[ToolOutput, ToolError]:
tool_name = self.__class__.__name__
# アクティブ実行数を増加
active_executions.labels(tool_name=tool_name).inc()
# 実行時間の計測
with tool_duration.labels(tool_name=tool_name).time():
try:
result = self._process(input_data)
# 成功カウント
tool_executions.labels(
tool_name=tool_name,
status='success'
).inc()
return Ok(result)
except Exception as e:
# 失敗カウント
tool_executions.labels(
tool_name=tool_name,
status='failure'
).inc()
return Err(ToolError(code="ERROR", message=str(e)))
finally:
# アクティブ実行数を減少
active_executions.labels(tool_name=tool_name).dec()
from typing import Protocol
from dataclasses import dataclass
# 依存性のプロトコル定義
class LLMService(Protocol):
def generate(self, prompt: str) -> str: ...
class CacheService(Protocol):
def get(self, key: str) -> Optional[Any]: ...
def set(self, key: str, value: Any) -> None: ...
@dataclass
class ToolDependencies:
"""ツールの依存性コンテナ"""
llm_service: LLMService
cache_service: CacheService
config: dict
class DependentTool:
"""依存性注入を使用するツール"""
def __init__(self, dependencies: ToolDependencies):
self.deps = dependencies
def execute(self, input_data: ToolInput) -> Result[ToolOutput, ToolError]:
# キャッシュチェック
cached = self.deps.cache_service.get(input_data.required_field)
if cached:
return Ok(cached)
# LLMサービスの使用
generated = self.deps.llm_service.generate(
f"Process: {input_data.required_field}"
)
# 結果をキャッシュ
output = ToolOutput(result=generated, metadata={})
self.deps.cache_service.set(input_data.required_field, output)
return Ok(output)
from typing import get_type_hints
import inspect
import json
def generate_tool_documentation(tool_class):
"""ツールのドキュメントを自動生成"""
doc = {
"name": tool_class.__name__,
"description": inspect.getdoc(tool_class) or "No description",
"methods": {}
}
# メソッドの解析
for name, method in inspect.getmembers(tool_class, inspect.ismethod):
if name.startswith('_'):
continue
signature = inspect.signature(method)
doc["methods"][name] = {
"description": inspect.getdoc(method) or "No description",
"parameters": {
param_name: {
"type": str(param.annotation),
"default": str(param.default) if param.default != param.empty else None
}
for param_name, param in signature.parameters.items()
if param_name != 'self'
},
"return_type": str(signature.return_annotation)
}
return doc
# 使用例
if __name__ == "__main__":
doc = generate_tool_documentation(AnalyzeRequest)
print(json.dumps(doc, indent=2, ensure_ascii=False))
import functools
import time
from typing import Callable
def debug_trace(func: Callable) -> Callable:
"""関数の実行をトレースするデコレータ"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 実行前
logger.debug(
f"Entering {func.__name__}",
args=args[1:], # selfを除外
kwargs=kwargs
)
start_time = time.time()
try:
result = func(*args, **kwargs)
# 実行後
duration = time.time() - start_time
logger.debug(
f"Exiting {func.__name__}",
duration_seconds=duration,
result_type=type(result).__name__
)
return result
except Exception as e:
# エラー時
duration = time.time() - start_time
logger.error(
f"Error in {func.__name__}",
duration_seconds=duration,
error_type=type(e).__name__,
error_message=str(e)
)
raise
return wrapper
class DebugTool:
"""デバッグ機能付きツール"""
@debug_trace
def execute(self, input_data: ToolInput) -> Result[ToolOutput, ToolError]:
return self._process(input_data)