コンテンツにスキップ

EvoSpikeNet SDK API Reference

Author: Masahiro Aoki

最終更新日: 2026年2月20日 🎯 Feature 13 + 空間生成ニューラル統合 + Feature 36/39/40(自動リカバリー・監査ログ・地理的分散ノード管理)

概要

このドキュメントは、EvoSpikeNet Python SDKの完全なAPIリファレンスを提供します。SDKはREST APIクライアントとして実装されており、テキスト生成、分散脳シミュレーション、空間認知処理(Rank 12-15 Feature 13)、アーティファクト管理、RAG/ベクター検索(バージョン付きインデックス)などの機能をサポートしています。

クラス構造

EvoSpikeNetAPIClient

メインのAPIクライアントクラスです。HTTPリクエストの管理、エラーハンドリング、接続プーリングを提供します。

コンストラクタ

EvoSpikeNetAPIClient(
    base_url: str = "http://localhost:8000",
    spatial_base_url: Optional[str] = None,
    rag_base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout: int = 60,
    max_retries: int = 3,
    session: Optional[requests.Session] = None,
)

パラメータ: - base_url: APIサーバーのベースURL - spatial_base_url: 空間生成サービス(FastAPI版)のベースURL。未指定時は base_url を使用。 - rag_base_url: RAGサービスのベースURL。未指定時は base_url を使用。 - api_key: API認証キー - timeout: リクエストタイムアウト(秒) - max_retries: 最大リトライ回数 - session: カスタムrequestsセッション

基本メソッド

generate(prompt: str, max_length: int = 50) -> Dict[str, str]

テキスト生成を実行します。

パラメータ: - prompt: 入力プロンプト - max_length: 生成する最大文字数

戻り値: 生成結果を含む辞書

例:

result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])

LocalMemoryClient(オフラインヘルパー)

HTTP APIを使わずにローカルで長期記憶モジュールとスパイク圧縮レイヤを試す軽量ラッパー。

from evospikenet.sdk import LocalMemoryClient
import torch

client = LocalMemoryClient(state_dim=16, time_steps=6, embedding_dim=16, max_traces=32)
spikes = torch.rand(6, 16)
summary = client.store_spike_episode(
    spike_sequence=spikes,
    context={"scenario": "demo"},
    action="look",
    outcome="noted",
    reward=0.2,
    semantic_tags=["demo_concept"],
)
stats = client.reservoir_stats()
result = client.retrieve({"scenario": "demo", "semantic_tags": ["demo_concept"]}, top_k=1)

関連実装: evospikenet/snn_memory_extension.py, evospikenet/long_term_memory.py, evospikenet/forgetting_controller.py

submit_prompt(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict

マルチモーダルプロンプトを分散脳シミュレーションに送信します。

運用メモ(2026-04-20): - 分散環境でのASR実行経路はサーバー環境変数 VIDEO_ANALYSIS_ASR_BACKEND に依存します(asr_fallback / whisper_real)。 - Whisperのモデル/デバイスは VIDEO_ANALYSIS_WHISPER_MODEL / VIDEO_ANALYSIS_WHISPER_DEVICE で制御します。 - 本変更はサーバー構成の拡張であり、SDKメソッドのシグネチャ変更はありません。

パラメータ: - prompt: テキストプロンプト - image_path: 画像ファイルのパス - audio_path: 音声ファイルのパス

戻り値: 送信確認を含む辞書

get_simulation_status() -> Dict

分散脳シミュレーションの現在の状態を取得します。

戻り値: 全ノードの状態を含む辞書

get_simulation_result() -> Dict

完了したシミュレーションの最新結果を取得します。

戻り値: 応答を含む辞書、または結果が利用できない場合はNone

poll_for_result(timeout: int = 120, interval: int = 5) -> Optional[Dict[str, Any]]

結果が利用可能になるまでポーリングします。

パラメータ: - timeout: 最大待機時間(秒) - interval: ポーリング間隔(秒)

戻り値: 結果辞書、またはタイムアウト時はNone

進化/ゲノム管理メソッド

list_genomes() -> List[Dict[str, Any]]

保存済みゲノムを一覧し、name / generation / fitness などのメタデータを取得します。

get_genome(genome_name: str) -> Dict[str, Any]

指定したゲノムファイルのペイロードを取得します。

save_genome(genome_name: str, genome: Dict[str, Any], make_active: bool = False) -> Dict[str, Any]

ゲノムを保存します。make_active=Trueactive_genome.json に昇格します。

apply_genome(genome_name: str) -> Dict[str, Any]

既存のゲノムをアクティブ化します(ペイロード編集なし)。

使用例:

from evospikenet.sdk import EvoSpikeNetAPIClient

# Example: use EvoSpikeNetAPIClient to manage genomes
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
genomes = client.list_genomes()
if genomes:
    target = genomes[0]["name"]
    payload = client.get_genome(target)
    payload.setdefault("metadata", {})["note"] = "edited via SDK"
    client.save_genome(target, payload, make_active=True)
    client.apply_genome(target)
else:
    print("No genomes found on the server")

関連サンプル: examples/genome_management_sdk.py

アーティファクト管理メソッド

upload_artifact(session_id: str, artifact_type: str, name: str, file: io.BytesIO, **metadata) -> Dict

アーティファクトファイルをアップロードします。

パラメータ: - session_id: セッションID - artifact_type: アーティファクトタイプ ('model', 'log', 'config' など) - name: アーティファクト名 - file: アップロードするファイルバッファ - **metadata: 追加のメタデータ(llm_type, node_type, model_category, model_variant)

戻り値: アップロード結果を含む辞書

list_artifacts(artifact_type: Optional[str] = None) -> Union[List[Dict[str, Any]], Dict[str, Any]]

アーティファクトを一覧表示します。

パラメータ: - artifact_type: フィルタリングするアーティファクトタイプ

戻り値: アーティファクトのリストまたは辞書

download_artifact(artifact_id: str, destination_path: str) -> None

アーティファクトをダウンロードします。

パラメータ: - artifact_id: ダウンロードするアーティファクトのID - destination_path: 保存先ファイルパス

ログ管理メソッド

create_log_session(description: str) -> Dict

新しいログセッションを作成します。

パラメータ: - description: セッションの説明

戻り値: セッション情報

get_remote_log(user: str, ip: str, key_path: str, log_file_path: str) -> Dict

リモートノードからログを取得します。

パラメータ: - user: SSHユーザー名 - ip: リモートホストのIPアドレス - key_path: SSH秘密鍵のパス - log_file_path: リモートログファイルの絶対パス

戻り値: ログ内容を含む辞書

バッチ処理メソッド

batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict[str, str]]

複数のプロンプトを順次処理します。

パラメータ: - prompts: 処理するプロンプトのリスト - max_length: 各生成の最大文字数

戻り値: 各プロンプトの結果リスト

ヘルパーメソッド

health_check() -> Dict[str, Any]

APIサーバーのヘルスチェックを実行します。

戻り値: ヘルス状態を含む辞書

マルチ言語バインディング

Python SDKには Go および TypeScript 向けの軽量プロキシ実装が含まれています。これらは本来別リポジトリで Go/JS 実装が存在する予定ですが、 Python 環境でも簡単に import できるように機能を模倣したものです。

Go SDK プロキシ

from evospikenet import sdk_go
client = sdk_go.init_client("http://localhost:8000", api_key="KEY")
path = client.download_file("foo.bin", dest="/tmp/foo.bin")
メソッド 説明
init_client(endpoint: str, api_key: Optional[str]=None) Goクライアントオブジェクトを返します
GoSDKClient.download_file(filename, dest=None, timeout=60) EvoSpikeNet API からファイルを取得します

TypeScript SDK プロキシ

from evospikenet import sdk_ts
client = sdk_ts.initClient("https://api", apiKey=None)
client.downloadFile("foo", dest="out.bin")
メソッド 説明
initClient(endpoint: str, apiKey: Optional[str]=None) TS/JSクライアントオブジェクトを返します
TSSDKClient.downloadFile(filename, dest=None, timeout=60) 同上 (camelCase命名)

これらのクライアントは内部的に requests を使って HTTP 通信を行い、Python テストやドキュメントのサンプルで利用可能です。

node_discovery_health() -> Dict[str, Any]

ノード検出サービスから現在のノード一覧とサマリを取得します。内部では /api/node-discovery/health エンドポイントを呼び出します。

戻り値: ノード状態 (nodes, summary, updated_at など) を含む辞書

node_discovery_topology() -> Dict[str, Any]

ノード検出サービスからネットワークトポロジー情報を取得します。内部では /api/node-discovery/topology エンドポイントを呼び出します。

戻り値: nodesedges を含むトポロジー辞書

spatial_generate(input_text: str, cognitive_context: Optional[Dict[str, Any]] = None, eeg_data: Optional[Dict[str, Any]] = None, timeout: int = 30) -> Dict[str, Any]

空間生成サービス /generate を呼び出し、シーン仕様(scene_type, objects, spatial_layout, physics, lighting, navigation, metadata など)を取得します。

spatial_health(timeout: int = 10) -> Dict[str, Any]

空間生成サービス /health を呼び出し、neural_components_availablecomponentsmetricsknowledge_base_loaded などの状態を取得します。

rag_upload_file(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,

) -> Dict[str, Any]

RAGサービス /upload_file にファイルを投入し、doc_keyversion を受け取ります。 追加のフラグにより一時セッションやバックグラウンドジョブの制御が可能です:

  • stream: ドキュメントパーサをストリーミングモードで動作させます
  • background: 解析・インデックス処理を Celery バックグラウンドジョブとして行います (バックエンドは job_id を生成し upload_status で追跡可能)

Note: クエリパラメータ job_id を指定して同じインスタンスの APIを呼び出すと、処理中のチャンク進捗が upload_jobs に反映され るため、通知やログ用フックを簡単に組み込めます。 - init: マルチパートアップロードセッションを開始し、以降の session_id 指定・final 付きパーツで同一バージョンを構築できます - session_id/final: 既存セッションにファイルパーツを追加し、最終パートでは final=True にしてセッションを終了

サーバー側で自動的に:

  1. MIME/拡張子検証
  2. ドキュメント解析 (PDF/Word/Excel/PPT/Markdown 等)
  3. トークンベース自動チャンク化 (chunk_text_auto)
  4. 埋め込み生成 ・ Milvus/Elasticsearch へのインデックス
  5. バージョン管理 (doc_key による履歴)

ストリーミングや分割アップロードは大容量 (1GB+) ドキュメントのメモリ効率を高め、 バックグラウンドモードは処理待ちを不要にします。

戻り値例: {"doc_key":"sample.pdf","version":3,"chunks_indexed":42,...}

rag_upload_file_async(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,

) -> Dict[str, Any]

Asynchronous counterpart to :func:rag_upload_file offering the same parameter set and return structure. Useful when integrating with asyncio-based workflows.

create_batch_job(payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]

Create a new batch ingestion job on the RAG service. The payload may include arbitrary job parameters such as file location, parsing options, metadata, etc. The server responds with a job_id and initial status (queued).

cancel_batch_job(job_id: str, timeout: int = 10) -> Dict[str, Any]

Request cancellation of an existing batch job previously created via :func:create_batch_job. The returned dictionary echoes the job_id and updated status (cancelled).

rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]

RAGサービス /query で検索・生成を実行します。context(ヒット文書)、必要に応じて sdk_response(EvoSDK生成結果)を返します。サーバーは同時にバージョンごとの差分比較用パッチを生成し、/document_chunks と組み合わせればクライアント側で差分UIを構築できます。

Note: Large-document streaming ingestion (split‑upload) and advanced diff viewer support are detailed in docs/RAG_SYSTEM_DETAILED.md §7.6.

rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]

RAGサービス /query で検索・生成を実行します。context(ヒット文書)、必要に応じて sdk_response(EvoSDK生成結果)を返します。

get_document_versions(doc_key: str) -> List[Dict[str, Any]]

ドキュメントの利用可能なバージョン一覧を返します。各項目は version, checksum, chunk_count, indexed_at を含みます。

rag_init_session(file_path: str, timeout: int = 30) -> Dict[str, Any]

Start a new upload session. Returns a session identifier along with the determined doc_key and tentative version. Use this id for subsequent the following API calls:

rag_upload_part(session_id: str, file_path: str, final: bool = False, timeout: int = 120) -> Dict[str, Any]

Upload a single chunk belonging to an open session. final=True signals the last part and causes cleanup of the session state on the server.

upload_status(job_id: str) -> Dict[str, Any]

Check status of an asynchronous upload job (see /upload_file?background=true). Returns a dictionary with job_id, status (queued, running, completed, failed), and optional progress percentage.

get_document_chunks(doc_key: str, version: int) -> List[Dict[str, Any]]

指定した doc_keyversion に関連するチャンク(順序付き)を返します。各チャンクは chunk_id, content, source_filename を持ちます。

get_server_info() -> Optional[Dict]

サーバー情報とロードされたモデルを取得します。

戻り値: サーバー情報、または利用できない場合はNone

wait_for_server(timeout: int = 60, interval: int = 2) -> bool

APIサーバーが正常になるまで待機します。

パラメータ: - timeout: 最大待機時間(秒) - interval: ヘルスチェック間隔(秒)

戻り値: サーバーが正常になった場合はTrue

validate_prompt(prompt: str) -> bool

プロンプトが送信に適しているかを検証します。

パラメータ: - prompt: 検証するプロンプト

戻り値: 有効な場合はTrue

with_error_handling(func: Callable[..., Any], args, retries: int = 3, *kwargs) -> Any

自動リトライ付きで関数を実行します。

パラメータ: - func: 実行する関数 - retries: リトライ回数 - *args: 関数に渡す位置引数 - **kwargs: 関数に渡すキーワード引数

戻り値: 関数の戻り値、または失敗時はNone

統計・監視メソッド

get_stats() -> Dict[str, Any]

クライアントの使用統計を取得します。

戻り値: 統計情報(リクエスト数、エラー数、平均遅延など)

reset_stats() -> None

統計カウンターをリセットします。

遅延監視メソッド

get_latency_stats() -> Dict

全コンポーネントの遅延統計を取得します。

戻り値: 遅延統計データ

check_latency_target() -> Dict

遅延ターゲットが満たされているかをチェックします。

戻り値: チェック結果の詳細

スナップショット管理メソッド

create_snapshot(snapshot_name: str, include_models: bool = True, include_data: bool = True, compression_level: int = 6) -> Dict

システムのスナップショットを作成します。

パラメータ: - snapshot_name: スナップショット名 - include_models: モデルを含めるか - include_data: データを含めるか - compression_level: 圧縮レベル(0-9)

戻り値: 作成結果

list_snapshots() -> Dict

利用可能なスナップショットを一覧表示します。

戻り値: スナップショットリスト

restore_snapshot(snapshot_path: str, restore_models: bool = True, restore_data: bool = True) -> Dict

スナップショットから復元します。

パラメータ: - snapshot_path: 復元するスナップショットのパス - restore_models: モデルを復元するか - restore_data: データを復元するか

戻り値: 復元結果

delete_snapshot(snapshot_path: str) -> Dict

スナップショットを削除します。

パラメータ: - snapshot_path: 削除するスナップショットのパス

戻り値: 削除結果

validate_snapshot(snapshot_path: str) -> Dict

スナップショットの妥当性を検証します。

パラメータ: - snapshot_path: 検証するスナップショットのパス

戻り値: 検証結果

cleanup_snapshots(max_age_days: int = 30) -> Dict

古いスナップショットをクリーンアップします。

パラメータ: - max_age_days: 保持する最大日数

戻り値: クリーンアップ結果

スケーラビリティテストメソッド

run_scalability_test(max_nodes: int = 50, test_duration: int = 30) -> Dict

スケーラビリティテストを実行します。

パラメータ: - max_nodes: 最大ノード数 - test_duration: テスト継続時間(秒)

戻り値: テスト結果

get_scalability_results() -> Dict

スケーラビリティテストの結果を取得します。

戻り値: テスト結果データ

get_scalability_status() -> Dict

スケーラビリティテストの状態を取得します。

戻り値: 現在の状態

test_node_scalability(node_counts: List[int], test_duration: float = 60.0) -> Dict

異なるノード数でのスケーラビリティをテストします。

パラメータ: - node_counts: テストするノード数のリスト - test_duration: 各テストの継続時間

戻り値: テスト結果

get_resource_usage() -> Dict

リソース使用状況を取得します。

戻り値: リソース使用統計

run_stress_test(intensity: str = "high", duration: float = 120.0) -> Dict

ストレステストを実行します。

パラメータ: - intensity: テスト強度 ("low", "medium", "high") - duration: テスト継続時間(秒)

戻り値: テスト結果

get_system_limits() -> Dict

システム制限を取得します。

戻り値: システム制限情報

ハードウェア最適化メソッド

optimize_model(model_type: str, optimizations: Optional[List[str]] = None) -> Dict

モデルをハードウェア用に最適化します。

パラメータ: - model_type: モデルタイプ - optimizations: 適用する最適化のリスト

戻り値: 最適化結果

benchmark_model(model_type: str, num_runs: int = 50) -> Dict[str, Any]

モデルのベンチマークを実行します。

パラメータ: - model_type: ベンチマークするモデルタイプ - num_runs: 実行回数

戻り値: ベンチマーク結果

get_hardware_info() -> Dict

ハードウェア情報を取得します。

戻り値: ハードウェア情報

高可用性監視メソッド

get_availability_status() -> Dict

可用性状態を取得します。

戻り値: 可用性状態

get_availability_stats(time_window: str = "24h") -> Dict

可用性統計を取得します。

パラメータ: - time_window: 統計時間枠(例: "24h", "7d")

戻り値: 可用性統計

perform_health_check() -> Dict

ヘルスチェックを実行します。

戻り値: ヘルスチェック結果

trigger_recovery_action(action_type: str, parameters: Optional[Dict[str, Any]] = None) -> Dict

回復アクションをトリガーします。

パラメータ: - action_type: アクションタイプ - parameters: アクションパラメータ

戻り値: アクション結果

get_availability_alerts(limit: int = 50) -> Dict

可用性アラートを取得します。

パラメータ: - limit: 取得するアラート数の上限

戻り値: アラートリスト

schedule_maintenance(start_time: str, duration_minutes: int, reason: str) -> Dict

メンテナンスをスケジュールします。

パラメータ: - start_time: 開始時刻(ISO形式) - duration_minutes: 継続時間(分) - reason: メンテナンス理由

戻り値: スケジュール結果

非同期Zenoh通信メソッド

connect_zenoh(node_id: str = "api_node") -> Dict

Zenohネットワークに接続します。

パラメータ: - node_id: ノード識別子

戻り値: 接続結果

publish_zenoh_message(topic: str, payload: Any, priority: str = "normal", message_type: str = "notification", node_id: str = "api_node") -> Dict

Zenohメッセージをパブリッシュします。

パラメータ: - topic: メッセージトピック - payload: メッセージペイロード - priority: メッセージ優先度 - message_type: メッセージタイプ - node_id: 送信元ノードID

戻り値: パブリッシュ結果

send_zenoh_request(target_node: str, request: Any, timeout: float = 5.0, node_id: str = "api_node") -> Dict

Zenohリクエストを送信します。

パラメータ: - target_node: ターゲットノードID - request: リクエストデータ - timeout: タイムアウト(秒) - node_id: 送信元ノードID

戻り値: レスポンス

send_zenoh_notification(target_nodes: List[str], notification: Any, priority: str = "normal", node_id: str = "api_node") -> Dict

Zenoh通知を送信します。

パラメータ: - target_nodes: ターゲットノードIDのリスト - notification: 通知データ - priority: 通知優先度 - node_id: 送信元ノードID

戻り値: 送信結果

get_zenoh_stats(node_id: str = "api_node") -> Dict

Zenoh統計を取得します。

パラメータ: - node_id: ノードID

戻り値: Zenoh統計

set_aeg_comm_config(node_id: str, enable_comm: bool = True, energy_threshold: float = 10.0, critical_modalities: List[str] = None, force_change_threshold: float = 10.0) -> Dict

AEG-Comm通信最適化設定を行います。

パラメータ: - node_id: 対象ノードID - enable_comm: 通信最適化有効/無効 - energy_threshold: エネルギー閾値 - critical_modalities: 重要モダリティリスト - force_change_threshold: 強制変更閾値

戻り値: 設定結果

get_communication_stats(node_id: str = "api_node") -> Dict

AEG-Comm通信統計を取得します。

パラメータ: - node_id: ノードID

戻り値: 通信統計(送信率、削減率、遅延、エラー率)

get_aeg_comm_status(node_id: str = "api_node") -> Dict

AEG-Commの現在のステータスを取得します。

パラメータ: - node_id: ノードID

戻り値: AEG-Commステータス(活性ノード数、エネルギーレベル、クリティカルパケット数)

分散コンセンサスメソッド

propose_consensus_decision(decision_type: str, payload: Any, priority: int = 1, dependencies: Optional[List[str]] = None) -> Dict

コンセンサス決定を提案します。

パラメータ: - decision_type: 決定タイプ - payload: 決定ペイロード - priority: 優先度 - dependencies: 依存関係のリスト

戻り値: 提案結果

get_consensus_result(proposal_id: str, timeout: float = 30.0) -> Dict

コンセンサス結果を取得します。

パラメータ: - proposal_id: 提案ID - timeout: 待機タイムアウト(秒)

戻り値: コンセンサス結果

update_node_status(node_id: str, active: bool) -> Dict

ノード状態を更新します。

パラメータ: - node_id: ノードID - active: アクティブ状態

戻り値: 更新結果

get_consensus_stats() -> Dict

コンセンサス統計を取得します。

戻り値: 統計データ

cleanup_consensus(max_age: float = 300.0) -> Dict

古いコンセンサスデータをクリーンアップします。

パラメータ: - max_age: 保持する最大時間(秒)

戻り値: クリーンアップ結果

アーティファクト拡張メソッド

get_session_artifacts(session_id: str) -> Any

セッションのアーティファクトを取得します。

パラメータ: - session_id: セッションID

戻り値: アーティファクトデータ

get_artifact(artifact_id: str, destination_path: Optional[str] = None) -> bytes

アーティファクトを取得します。

パラメータ: - artifact_id: アーティファクトID - destination_path: 保存先パス(オプション)

戻り値: アーティファクトデータ

非同期メソッド

generate_async(prompt: str, max_length: int = 50) -> Dict[str, str]

非同期テキスト生成。

submit_prompt_async(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict

非同期マルチモーダルプロンプト送信。

health_check_async() -> Dict[str, Any]

非同期ヘルスチェック。

Jupyter統合クラス

JupyterAPIClient

Jupyter Notebook環境向けの拡張クライアント。

追加メソッド

set_display_mode(mode: str) -> None

出力表示モードを設定します。

パラメータ: - mode: 表示モード ("html", "json", "text")

show_server_info() -> None

サーバー情報をリッチ形式で表示します。

show_stats() -> None

クライアント統計をリッチ形式で表示します。

validate_prompt_interactive(prompt: str) -> bool

プロンプトをインタラクティブに検証します。

WebSocketクライアント

WebSocketClient

リアルタイム通信用のWebSocketクライアント。

コンストラクタ

WebSocketClient(
    ws_url: str = "ws://localhost:8000/ws",
    reconnect_attempts: int = 5,
    reconnect_delay: float = 2.0,
)

メソッド

connect() -> None

WebSocket接続を確立します。

disconnect() -> None

WebSocket接続を閉じます。

send_message(message: Dict[str, Any]) -> None

メッセージを送信します。

receive_message() -> Dict[str, Any]

メッセージを受信します。

register_handler(message_type: str, handler: Callable) -> None

メッセージハンドラーを登録します。

listen() -> None

メッセージをリッスンします。

エラーハンドリング

EvoSpikeNetAPIError

API関連の例外クラス。

属性

  • error_info: ErrorInfoオブジェクト
  • message: エラーメッセージ
  • status_code: HTTPステータスコード(オプション)
  • details: 追加のエラー詳細

ErrorInfo

エラー情報を格納するデータクラス。

属性

  • error_type: エラータイプ
  • message: エラーメッセージ
  • details: 追加の詳細情報
  • retry_after: リトライまでの待機時間(秒)
  • status_code: HTTPステータスコード

L5 自己進化 / 自己修復 API

最終更新日: 2026-03-11 (v2.2.0)

L5 自己進化システムのコアクラス群。ゲノム、進化エンジン、適応度評価、メモリ管理を「直接 Python import」で利用する場合のリファレンス。

GenomePoolevospikenet.genome_pool

from evospikenet.genome_pool import GenomePool, SelectionConfig, MutationConfig, CrossoverConfig
from evospikenet.snapshot_recovery import SnapshotManager

コンストラクタ

GenomePool(
    population_size: int = 50,
    selection_config: Optional[SelectionConfig] = None,
    mutation_config: Optional[MutationConfig] = None,
    crossover_config: Optional[CrossoverConfig] = None,
    seed: Optional[int] = None,
    snapshot_manager: Optional[SnapshotManager] = None,  # v2.2.0新規
)

パラメータ: - population_size: ポピュレーションサイズ - selection_config: 選択戦略設定 (tournament / roulette / rank) - mutation_config: 変異設定 (変異率・構造変異率等) - crossover_config: 交叉設定 (uniform / single_point / two_point) - snapshot_manager: 世代更新後に自動スナップショットを作成する SnapshotManager インスタンス

evolve_generation() -> PopulationStats

1世代進化し、統計を返す非同期メソッド。 snapshot_manager が設定されている場合、世代更新後に create_snapshot("post_generation_{N}") を自動呼び出す。

使用例:

import asyncio
from evospikenet.genome_pool import GenomePool
from evospikenet.snapshot_recovery import SnapshotManager

manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")
pool = GenomePool(population_size=50, snapshot_manager=manager)
pool.initialize_population()

async def run():
    for _ in range(10):
        pool.update_fitness(pool.genomes[0].genome_id, 0.95)
        stats = await pool.evolve_generation()
        print(f"Gen {stats.generation}: best={stats.max_fitness:.3f}")

asyncio.run(run())


MutationEngineevospikenet.evolution_engine

from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig

コンストラクタ

MutationEngine(
    base_mutation_rate: float = 0.05,
    mutation_strength: float = 0.1,
    advanced_engine: Optional[AdvancedMutationEngine] = None,  # v2.2.0新規
)

パラメータ: - base_mutation_rate: 遗伝子ごとの変異確率 - mutation_strength: パラメータ変化の大きさ - advanced_engine: 基本変異完了後に apply_mutations() を呼び出す構造変異エンジン。None の場合は従来の基本変異のみ実行

mutate_genome(genome: EvoGenome) -> EvoGenome

ゲノム全体に変異を適用し、コピーを返す。 advanced_engine が設定されている場合、基本変異の後に高度構造変異(レイヤ追加/削除、スキップ接続、プルーニング等)を適用。

使用例:

from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig

advanced = AdvancedMutationEngine(AdaptiveMutationConfig(base_rate=0.1))
engine = MutationEngine(base_mutation_rate=0.05, advanced_engine=advanced)

mutated = engine.mutate_genome(genome)


AdvancedMutationEngineevospikenet.advanced_mutations

from evospikenet.advanced_mutations import (
    AdvancedMutationEngine, AdaptiveMutationConfig, MutationType
)

コンストラクタ

AdvancedMutationEngine(config: AdaptiveMutationConfig = AdaptiveMutationConfig())

apply_mutations(genome, mutation_types=None, performance_data=None) -> EvoGenome

10種類の構造変異をゲノムに適用。

MutationType 内容
ADD_LAYER_SMART 数層の中間にレイヤを挿入、接続行列を自動更新
REMOVE_LAYER_SMART 最小重要レイヤを削除、接続をブリッジ
ADD_SKIP_CONNECTION 最長スキップ接続を追加
REMOVE_SKIP_CONNECTION ランダムにスキップ接続を削除
PRUNE_CONNECTIONS 重要度が下位20%の接続を2切り
GROW_CONNECTIONS 短距離から順に10%の接続を追加
MODULE_DUPLICATION モジュール(染色体)を複製
MODULE_FUSION 2モジュールを結合
ADAPTIVE_LAYER_SIZE アクティビティに応じてレイヤサイズを増減
RECURRENT_CONNECTION 再帰接続を追加

update_mutation_rates(generation, fitness_improvements)

各変異タイプの成功率に応じて変異率を適応的に更新。


FitnessEvaluatorevospikenet.fitness_evaluator

from evospikenet.fitness_evaluator import FitnessEvaluator, TaskBenchmark

_evaluate_robustness(genome, brain) -> Dict[str, float]

v2.2.0 でプレースホルダーから実装に置き換え。

brain が提供されている場合:

ノイズ注入テスト (σ=0.2)
  クリーン入力とノイズ入力の出力差分 → noise_resistance

障害許容性テスト
  中間層の重みを10%マスク後に出力差分 → failure_tolerance

brainNone の場合:

構造的プロキシ
  スキップ接続数 → noise_resistance (0.5 + skips * 0.05)
  再帰接続数 → stability      (0.6 + recurrent * 0.04)
  層深度         → failure_tolerance (深すぎる/浅すぎると下がる)

戸り値: {"noise_resistance": float, "failure_tolerance": float, "stability": float}


rollback_to_snapshot()evospikenet.rollback

from evospikenet.rollback import rollback_to_snapshot, rollback_version

rollback_to_snapshot(snapshot_id, manager, target_path) -> Path

SnapshotManager からスナップショットをIDにより検索し、target_path に復元する便利関数。

パラメータ: - snapshot_id: strSnapshotManager.create_snapshot() が返すスナップショットID - manager: SnapshotManager — スナップショットメタデータを保持するインスタンス - target_path: str | Path — 復元先パス

戸り値: 復元されたファイルの Path

例外: DataLoadError — IDが不明またはファイルが存在しない場合

使用例:

from evospikenet.rollback import rollback_to_snapshot
from evospikenet.snapshot_recovery import SnapshotManager

manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")

# スナップショット一覧を取得し最新を復元
 snapshots = manager.list_snapshots()
if snapshots:
    latest = sorted(snapshots, key=lambda x: x["timestamp"], reverse=True)[0]
    restored = rollback_to_snapshot(
        snapshot_id=latest["id"],
        manager=manager,
        target_path="/tmp/restored_state.gz",
    )
    print(f"復元完了: {restored}")

rollback_version(version_entry, target_path) -> Path

履歴エントリー辞書ファイルを直接指定して復元する低レベル関数。

パラメータ: - version_entry: Dict"path" キーを含む辞書 - target_path: str | Path — 復元先パス


データ型

Priority

優先度を表す列挙型。

  • LOW = "low"
  • NORMAL = "normal"
  • HIGH = "high"
  • CRITICAL = "critical"

MessageType

メッセージタイプを表す列挙型。

  • NOTIFICATION = "notification"
  • REQUEST = "request"
  • RESPONSE = "response"
  • EVENT = "event"

OptimizationType

最適化タイプを表す列挙型。

  • SPEED = "speed"
  • MEMORY = "memory"
  • ACCURACY = "accuracy"
  • POWER = "power"

ArtifactType

アーティファクトタイプを表す列挙型。

  • MODEL = "model"
  • LOG = "log"
  • CONFIG = "config"
  • DATA = "data"
  • METRICS = "metrics"

使用例

基本的な使用例

<!-- TODO: update or remove - import faileNetAPIClient -->

# クライアント初期化
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")

# テキスト生成
result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])

# マルチモーダル処理
response = client.submit_prompt(
    prompt="この画像を説明してください",
    image_path="image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])

アーティファクト管理

# アーティファクトアップロード
with open('model.pkl', 'rb') as f:
    file_data = io.BytesIO(f.read())
    result = client.upload_artifact(
        session_id="session_123",
        artifact_type="model",
        name="trained_model",
        file=file_data,
        model_category="classification"
    )

# アーティファクト一覧
artifacts = client.list_artifacts(artifact_type="model")

# アーティファクトダウンロード
client.download_artifact("artifact_id_123", "downloaded_model.pkl")

エラーハンドリング

<!-- モジュール 'evospikenet' が見つかりませんパッケージ内の移動/名前変更を確認してください -->
<!-t.generate("Test prompt")
except EvoSpikeNetAPIError as e:
    print(f"API Error: {e.error_info.message}")
    if e.error_info.retry_after:
        print(f"Retry after: {e.error_info.retry_after}s")

Jupyter Notebookでの使用

<!-- TODO: update<!-- モジュール 'evospikenet' が見つかりませんパッケージ内の移動/名前変更を確認してください -->kenet.sdk_jupyter import Jup定
client.set_display_mode("html")

# サーバー情報表示
client.show_server_info()

# 統計表示
client.show_stats()

WebSocketを使用したリアルタイム通信

import asyncio
<!-- TODO: update or remove - impo<!-- モジュール 'evospikenet' が見つかりませんパッケージ内の移動/名前変更を確認してください -->WebSocketClient -->

async def main():
    client = e_message(msg):
        print(f"Received: {msg}")

    client.register_handler("notification", handle_message)

    # メッセージ送信
    await client.send_message({
        "type": "notification",
        "data": "Hello from SDK!"
    })

    # リッスン開始
    await client.listen()

asyncio.run(main())

ベストプラクティス

  1. 接続管理: 長時間実行するアプリケーションでは、セッションを再利用してください。
  2. エラーハンドリング: 常に適切な例外処理を実装してください。
  3. タイムアウト: ネットワーク条件に応じて適切なタイムアウトを設定してください。
  4. リトライ: 一時的なエラーに対しては自動リトライを使用してください。
  5. リソース管理: 大きなファイルを扱う場合は、ストリーミングを使用してください。
  6. 認証: APIキーは環境変数で管理してください。
  7. 監視: 定期的にクライアント統計を確認してください。

トラブルシューティング

一般的な問題

  1. 接続エラー: APIサーバーが起動しているか確認してください。
  2. 認証エラー: APIキーが正しく設定されているか確認してください。
  3. タイムアウト: ネットワーク条件やサーバー負荷を確認してください。
  4. メモリエラー: 大きなファイルを扱う場合は、ストリーミングを使用してください。

デバッグ情報

# クライアント統計の確認
stats = client.get_stats()
print(f"Requests: {stats['requests']}")
print(f"Errors: {stats['errors']}")
print(f"Average latency: {stats['average_latency']:.3f}s")

# サーバー情報の確認
info = client.get_server_info()
if info:
    print(f"Server version: {info.get('version')}")
    print(f"Loaded models: {info.get('models', [])}")

分散coordinatorメソッド

init_coordinator(node_id: str, zenoh_config: Optional[Dict[str, Any]] = None, raft_config: Optional[Dict[str, Any]] = None) -> None

分散coordinatorを初期化します。

パラメータ: - node_id: このノードの一意な識別子 - zenoh_config: Zenoh DDS設定(オプション) - raft_config: Raftコンセンサス設定(オプション)

例:

# 基本的な初期化
client.init_coordinator("node_1")

# カスタム設定での初期化
zenoh_config = {"connect": ["tcp/127.0.0.1:7447"]}
raft_config = {"election_timeout": 5000}
client.init_coordinator("node_1", zenoh_config, raft_config)

start_coordinator() -> None

分散coordinatorを開始します。

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

client.init_coordinator("node_1")
client.start_coordinator()
print("Coordinator started")

stop_coordinator() -> None

分散coordinatorを停止します。

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

client.stop_coordinator()
print("Coordinator stopped")

submit_coordination_task(task_type: str, payload: Dict[str, Any]) -> str

分散coordinationタスクを送信します。

パラメータ: - task_type: タスクタイプ - payload: タスクペイロードデータ

戻り値: タスクID

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

task_id = client.submit_coordination_task(
    "federated_learning",
    {"model": "resnet50", "dataset": "cifar10"}
)
print(f"Task submitted: {task_id}")

内部タスク実装メモ(SDK組み込み簡易版)
  • federated_learning: payload['updates'] の数値フィールドを平均し、aggregated_parametersaggregation_meta を付与。
  • distributed_inference: payload['inputs'] / payload['batches'] をそのまま結果化し、各エントリに node_idstatus=completed を付加。
  • model_aggregation: payload['models']weights リストを平均し、aggregated_model['weights']aggregation_meta を付与。
get_coordination_task_status(task_id: str) -> Optional[Dict[str, Any]]

coordinationタスクの状態を取得します。

パラメータ: - task_id: 確認するタスクID

戻り値: タスク状態情報、または見つからない場合はNone

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

status = client.get_coordination_task_status(task_id)
if status:
    print(f"Task status: {status['status']}")

get_cluster_status() -> Dict[str, Any]

現在のクラスタ状態を取得します。

戻り値: クラスタ状態情報

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

status = client.get_cluster_status()
print(f"Active nodes: {status['active_nodes']}")
print(f"Leader: {status['leader_id']}")

register_coordination_node(node: Union[NodeInfo, str], node_info: Optional[Dict[str, Any]] = None) -> bool

coordinationクラスタに新しいノードを登録します。

パラメータ: - node: NodeInfo もしくはノードID文字列 - node_info: node に文字列を渡す場合は必須。address, port, role, capabilities などを含む辞書。

戻り値: 登録が成功した場合はTrue

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

# NodeInfoを直接渡す
from datetime import datetime

node_info = NodeInfo(
    node_id="node_2",
    a=datetime.now(),
    capabilities=["gpu", "cpu"],
)
success = client.register_coordination_node(node_info)

# ID + dict で渡す場合
success = client.register_coordination_node(
    "node_3",
    {"address": "192.168.1.101", "port": 9001, "role": "follower", "capabilities": ["cpu"]},
)

unregister_coordination_node(node_id: str) -> bool

coordinationクラスタからノードを解除します。

パラメータ: - node_id: 解除するノードID

戻り値: 解除が成功した場合はTrue

例外: - RuntimeError: coordinatorが初期化されていない場合

例:

success = client.unregister_coordination_node("node_2")


戻り値の構造詳細

generate() の戻り値

{
    "generated_text": "生成されたテキスト",
    "tokens": 150,
    "latency_ms": 250
}

submit_prompt() の戻り値

{
    "prompt_id": "prompt_12345",
    "status": "submitted",
    "timestamp": "2026-02-17T10:30:45Z",
    "processing_nodes": {
        "visual": "Vision-1",
        "spatial": "Spatial-12",  # Feature 13: Rank 12 (Where pathway)
        "integration": "Spatial-14"  # Feature 13: Rank 14 (Integration)
    }
}

poll_for_result() の戻り値

{
    "prompt_id": "prompt_12345",
    "status": "completed",
    "response": "応答テキスト",
    "spatial_analysis": {
        "rank_12_where": {
            "positions": [[100, 200], [150, 250]],
            "depth": [0.5, 0.7],
            "latency_ms": 47
        },
        "rank_14_integration": {
            "what_where_fusion": "結果",
            "world_model": "..."
        }
    },
    "processing_time_ms": 150
}

get_simulation_result() の戻り値

{
    "response": "応答内容",
    "nodes_involved": [
        "Spatial-12", "Spatial-13", "Spatial-14", "Spatial-15"
    ],
    "timestamp": "2026-02-17T10:31:00Z"
}

batch_generate() の戻り値

[
    {
        "generated_text": "テキスト1",
        "tokens": 100,
        "latency_ms": 200
    },
    {
        "generated_text": "テキスト2",
        "tokens": 150,
        "latency_ms": 250
    },
    {
        "error": "エラーメッセージ",
        "prompt": "失敗したプロンプト"
    }
]

Feature 13(空間認知・生成システム)対応

EvoSpikeNet SDK は、Rank 12-15の空間処理ノード(Feature 13)に対応しています。

ノード Rank 処理対象 遅延目標
SpatialWhereNode 12 空間位置・奥行き推定 <50ms
SpatialWhatNode 13 物体認識・シーン生成 <30ms
SpatialIntegrationNode 14 What-Where統合・世界モデル <50ms
SpatialAttentionControlNode 15 サッカード計画・注意制御 <30ms

空間処理の例

# マルチモーダル入力で空間処理を実行
response = client.submit_prompt(
    prompt="画像内の物体の位置を説明してください",
    image_path="./sample_image.jpg"
)
result = client.poll_for_result(timeout=60)

# Rank 12-15の処理結果を取得
if result and 'spatial_analysis' in result:
    where_result = result['spatial_analysis'].get('rank_12_where')
    integration_result = result['spatial_analysis'].get('rank_14_integration')
    print(f"Detected positions: {where_result['positions']}")
    print(f"Integrated analysis: {integration_result}")

空間生成サービス(ニューラル版)

  • エンドポイント: POST /generate
  • 入力: JSON
    • input_text (str, <=4000 chars, required): 生成したいシーンの自然言語記述
    • cognitive_context (object, optional): 注意/ストレス/エンゲージメント等の認知状態ヒント
    • eeg_data (object, optional): signals 配列で EEG 生データを渡すと LIF 解析を適用
    • model_version (str, optional): サーバに保存された高精度モデルのバージョン。存在する場合はモデルのリロードが試みられ、high_precision 出力フラグが更新される。
  • バリデーション: 型チェックと最大長チェック(4000文字)。不正入力は 400 Bad Request
  • 出力: シーン仕様
    • scene_type, spatial_coordinates, spatial_layout, objects, dimensions, physics, lighting, navigation
    • high_precision (bool): 現在高精度モデルがロードされているか
    • model_version (str): 現在アクティブなモデルバージョン
    • metadata: confidence, processing_time, quantum_modulation_alpha, cognitive_entropy, encoding_norm, input_text, quality, quality_factor

ヘルスチェック

(追加情報) - model_version: 現在ロードされている空間生成モデルのバージョン。 - high_precision: 上記モデルが高精度モードかどうか。

  • エンドポイント: GET /health
  • 主要フィールド:
    • status: running|error|degraded
    • device: cpu|cuda
    • neural_components_available: 全コンポーネント初期化に成功したか
    • components: language_adapter / encoder_decoder / attention / plasticity / quantum / embodied_physics / lif_layer / izhikevich_layer の可用性ブール
    • metrics: request_count, error_rate, avg_latency_ms, uptime_seconds
    • knowledge_base_loaded: シーン/オブジェクトKB読込済みか
    • last_spike_cached: 直近のスパイク列がキャッシュされているか

サンプル(直接HTTP)

curl -X POST http://localhost:8000/generate \
    -H "Content-Type: application/json" \
    -d '{
        "input_text": "Calm indoor room with a table and soft light",
        "cognitive_context": {"attention_level": 0.6, "stress_level": 0.2}
    }'

curl http://localhost:8000/health | jq

SDK ラッパー

from evospikenet.sdk import EvoSpikeNetAPIClient

# SDKを使った空間生成の例
client = EvoSpikeNetAPIClient(spatial_base_url="http://localhost:8000")
scene = client.spatial_generate(
    "Calm indoor room with a table and soft warm lighting",
    cognitive_context={"attention_level": 0.6, "stress_level": 0.2},
)

health = client.spatial_health()
print(scene.get("metadata", {}).get("confidence"))

量子補助推論

  • エンドポイント: POST /quantum/infer
  • 入力: JSON
    • scene (object): /generate で返されたシーンオブジェクト
    • spike_trains (tensor/list): 強化に用いるスパイク列(省略可)
  • 出力: 与えられた scenemetadata.quantum_modulation_alphacognitive_entropy を追加した拡張シーン

RAG システム(バージョン付きインデックス)

  • ベースURL例: http://localhost:8001
  • エンドポイント:
    • POST /upload_file: ファイルを検証→解析→オートチャンク→埋め込み→Milvus+Elasticsearchに格納。レスポンスに doc_keyversion が含まれる。
    • POST /query: {"query": "...", "llm_type": "huggingface"|"evosdk"} で検索・生成。context にヒット文書を返し、sdk_response を含む場合あり。
  • サポート拡張子(主要): .pdf, .docx, .doc, .pptx, .ppt, .xlsx, .xls, .txt, .md, .markdown, .gdoc, .html
  • サンプル(docs/README.md を投入し検索):
export RAG_API_URL=${RAG_API_URL:-http://localhost:8001}

curl -X POST "$RAG_API_URL/upload_file" \
    -F "file=@docs/README.md" | jq

curl -X POST "$RAG_API_URL/query" \
    -H "Content-Type: application/json" \
    -d '{"query": "What does this project do?", "llm_type": "huggingface"}' | jq

SDK ラッパー

from evospikenet.sdk import EvoSpikeNetAPIClient

# RAG用SDKラッパーの例
client = EvoSpikeNetAPIClient(rag_base_url="http://localhost:8001")

# ファイル投入
upload = client.rag_upload_file("docs/README.md")
print(upload.get("doc_key"))

# クエリ実行(SDKのrag_queryを利用)
answer = client.rag_query("What does this project do?", llm_type="huggingface")
if answer:
    ctx = answer.get("context", [])
    if ctx:
        print(ctx[0].get("text"))

サンプルスクリプト: - examples/spatial_generation_client.py — /generate と /health を叩く簡易クライアント。 - examples/rag_ingest_and_query.py — /upload_file でインデックス後に /query で検索。 - examples/rag_markdown_sdk.py — Markdown ファイルを SDK 経由で投入・検索。


エラーハンドリング

SDK はEvoSpikeNetAPIError例外でエラーを処理します。

from evospikenet.sdk import EvoSpikeNetAPIClient, EvoSpikeNetAPIError

client = EvoSpikeNetAPIClient()
try:
    result = client.generate("Hello, world!")
except EvoSpikeNetAPIError as e:
    print(f"Error type: {e.error_info.error_type}")
    print(f"Details: {e.error_info.details}")
    if e.error_info.retry_after:
        print(f"Retry after {e.error_info.retry_after} seconds")

実装ガイド

推奨実装パターン

1. 同期処理(シンプルなスクリプト)

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()
client.submit_prompt(prompt="テキスト生成クエリ")

# ポーリングで結果取得
result = client.poll_for_result(timeout=60, interval=5)

if result:
    print(f"Response: {result.get('response')}")

ケンシャルなリクエスト)

prompts = ["クエリ1", "クエリ2", "クエリ3"]
results = client.batch_generate(prompts, max_length=100)

for i, result in enumerate(results):
    if 'error' not in result:
        print(f"Result {i}: {result.get('generated_text')}")

利点: 複数リクエストを簡潔に処理
用途: テストデータ生成、ベースライン実験、データセット準備

3. 非同期処理(高スループット)

詳細は async_spatial_processing_example.py を参照してください。

import asyncio
from evospikenet.sdk import EvoSpikeNetAPIClient

async def run_async_examples():
    client = EvoSpikeNetAPIClient()
    tasks = [
        client.submit_prompt_async("クエリ1"),
        client.submit_prompt_async("クエリ2"),
        client.submit_prompt_async("クエリ3"),
    ]
    responses = await asyncio.gather(*tasks)
    print(responses)

asyncio.run(run_async_examples())

利点: 高スループット、リソース効率的
用途: 大規模バッチ処理、実運用システム、マイクロサービス

サンプルコード

基本: Feature 13 空間処理

# 同期サンプル(推奨: スクリプト・実験)
python examples/spatial_processing_example.py

# 非同期サンプル(推奨: 本番・高スループット)
python examples/async_spatial_processing_example.py

詳細機能: - 同期サンプル spatial_processing_example.py: - ✅ テキスト生成と空間分析 - ✅ マルチモーダル入力(画像+テキスト) - ✅ バッチ処理 - ✅ ヘルスチェック

  • 非同期サンプル async_spatial_processing_example.py:
  • ✅ 同時実行タスク
  • ✅ レート制限付き送信
  • ✅ 非同期ヘルスチェック
  • ✅ エラーハンドリング

ベストプラクティス

1. タイムアウト管理

# 推奨: 適切なタイムアウトを設定
result = client.poll_for_result(
    timeout=120,  # 2分
    interval=5    # 5秒ごとにポーリング
)

2. エラー処理

try:
    result = client.submit_prompt(prompt="クエリ")
except EvoSpikeNetAPIError as e:
    if e.error_info.error_type in ["timeout", "server_error"]:
        # リトライ可能なエラー
        pass
    else:
        # リトライ不可なエラー(入力エラー等)
        pass

3. 並行処理のスケーリング

# 小規模 (< 10リクエスト): 同期処理
for prompt in small_list:
    result = client.generate(prompt)

# 大規模 (> 100リクエスト): 非同期処理
import asyncio
tasks = [client.generate_async(p) for p in large_list]
results = asyncio.run(asyncio.gather(*tasks))

4. リソース効率

# ✅ 良い例: コンテキストマネージャ
with client.create_session() as session:
    for prompt in prompts:
        result = client.submit_prompt(prompt)

# ❌ 避けるべき: 毎回新規接続
for prompt in prompts:
    new_client = EvoSpikeNetAPIClient()  # 非効率

仕様と制限

API制限

項目 制限値
最大プロンプト長 10,000文字
最大生成長 5,000文字
タイムアウト 300秒(5分)
最大バッチサイズ 100
同時接続数 1,000
QPS(クエリ/秒) 1,000

性能指標(Feature 13)

メトリクス 目標値
テキスト生成 < 500ms
空間分析 < 200ms (Rank 12-15)
バッチ処理 100/秒 (100リクエスト/秒)
可用性 99.9%
P99レイテンシ 1,000ms
/Users/maoki/Documents/GitHub/EvoSpikeNet/docs/SDK_API_REFERENCE.md

Feature 36: 自動リカバリーメソッド

概要

AutoRecoveryEngine は AI ベースの異常検知とプレイブック自動実行によってシステムを自律回復させます。目標 MTTR -80%。

APIエンドポイント一覧

メソッド パス 説明
GET /api/recovery/status リカバリーエンジンのステータス取得
GET /api/recovery/incidents インシデント一覧取得
GET /api/recovery/incidents/{id} 特定インシデント取得
POST /api/recovery/incidents/{id}/acknowledge インシデントを確認済みに変更
POST /api/recovery/incidents/{id}/resolve インシデントを解決済みに変更
POST /api/recovery/trigger 異常診断を手動トリガー

get_recovery_status()

import requests

resp = requests.get(
    "http://localhost:8000/api/recovery/status",
    headers={"X-API-Key": API_KEY}
)
status = resp.json()
# {
#   "total_incidents": 5,
#   "open_incidents": 1,
#   "mttr_seconds": 120.4,
#   "monitoring_interval_seconds": 30,
# }

list_incidents(status, severity, limit)

params = {"status": "open", "severity": "critical", "limit": 20}
resp = requests.get(
    "http://localhost:8000/api/recovery/incidents",
    headers={"X-API-Key": API_KEY},
    params=params,
)
incidents = resp.json()["incidents"]
パラメータ 説明
status str open / acknowledged / resolved / auto_resolved
severity str low / medium / high / critical
limit int 最大取得件数(デフォルト: 50)

trigger_diagnosis(metrics)

payload = {
    "cpu_percent": 95.0,
    "memory_percent": 80.0,
    "db_connected": False,
    "error_rate": 0.15,
}
resp = requests.post(
    "http://localhost:8000/api/recovery/trigger",
    headers={"X-API-Key": API_KEY},
    json=payload,
)
result = resp.json()
# { "status": "incident_created", "incident": { "id": "...", ... } }
# または
# { "status": "no_anomaly_detected" }

完全なサンプルコード

auto_recovery_sdk.py


Feature 39: 監査ログメソッド

概要

AuditLogManager は SHA-256 ハッシュチェーンによる改ざん検知機能付き監査ログを提供します。全 HTTP リクエストは自動記録されます。

APIエンドポイント一覧

メソッド パス 説明
GET /api/audit/stats 統計情報取得
GET /api/audit/logs ログ検索
POST /api/audit/log エントリ手動書き込み
GET /api/audit/verify ハッシュチェーン検証
GET /api/audit/export ログエクスポート (JSON/CSV)

get_audit_stats()

resp = requests.get(
    "http://localhost:8000/api/audit/stats",
    headers={"X-API-Key": API_KEY}
)
stats = resp.json()

query_audit_logs(actor, action, resource, result, since, until, limit)

params = {
    "actor": "admin_user",
    "action": "model.train",
    "result": "success",
    "limit": 100,
}
resp = requests.get(
    "http://localhost:8000/api/audit/logs",
    headers={"X-API-Key": API_KEY},
    params=params,
)
entries = resp.json()["entries"]
パラメータ 説明
actor str 操作者でフィルタ
action str アクション前方一致フィルタ
result str success / failure / error
since / until str 期間 ISO8601
limit int デフォルト: 100

write_audit_entry(action, actor, resource, result, detail)

resp = requests.post(
    "http://localhost:8000/api/audit/log",
    headers={"X-API-Key": API_KEY},
    json={
        "action": "model.deploy",
        "actor": "ci_pipeline",
        "resource": "/api/models/genome_v3",
        "result": "success",
        "detail": {"version": "3.2.1"},
    },
)
# { "status": "logged", "entry_id": "...", "entry_hash": "sha256:..." }

verify_chain()

resp = requests.get(
    "http://localhost:8000/api/audit/verify",
    headers={"X-API-Key": API_KEY}
)
# { "valid": true, "checked": 1500, "error": null }

export_logs(format)

# JSON形式
resp = requests.get("http://localhost:8000/api/audit/export",
    headers={"X-API-Key": API_KEY}, params={"format": "json"})
logs_json = resp.json()

# CSV形式
resp = requests.get("http://localhost:8000/api/audit/export",
    headers={"X-API-Key": API_KEY}, params={"format": "csv"})
csv_text = resp.text

完全なサンプルコード

audit_log_sdk.py


Feature 40: 地理的分散ノード管理メソッド

Note: 部分実装モジュールあり。APIは利用可能だが テスト&ドキュメントのリファインを継続中。

概要

GeoNodeManager はマルチリージョンのノード管理、自動フェイルオーバー、リージョン間レイテンシー計測を提供します。

APIエンドポイント一覧

メソッド パス 説明
GET /api/geo/status 全体ステータス取得
GET/POST /api/geo/regions リージョン一覧/登録
GET/DELETE /api/geo/regions/{id} リージョン詳細/削除
GET/POST /api/geo/nodes ノード一覧/登録
DELETE /api/geo/nodes/{id} ノード削除
POST /api/geo/failover フェイルオーバー実行
GET /api/geo/failover/history フェイルオーバー履歴
GET /api/geo/latency-matrix レイテンシー行列
GET/PUT /api/geo/active-region アクティブリージョン取得/変更
GET /api/geo/replication-group/{id} レプリケーショングループ

register_node(node_id, region_id, endpoint, node_type)

resp = requests.post(
    "http://localhost:8000/api/geo/nodes",
    headers={"X-API-Key": API_KEY},
    json={
        "node_id": "prod-gpu-node-01",
        "region_id": "ap-northeast-1",
        "endpoint": "10.0.1.100:8000",
        "node_type": "gpu",
    },
)
# { "status": "registered", "node_id": "prod-gpu-node-01" }

trigger_failover(from_region, to_region, reason, triggered_by)

resp = requests.post(
    "http://localhost:8000/api/geo/failover",
    headers={"X-API-Key": API_KEY},
    json={
        "from_region": "ap-northeast-1",
        "to_region": "us-east-1",
        "reason": "region_unavailable",
        "triggered_by": "monitoring_system",
    },
)
# { "status": "failover_executed", "event": { ... } }

get_latency_matrix()

resp = requests.get(
    "http://localhost:8000/api/geo/latency-matrix",
    headers={"X-API-Key": API_KEY}
)
matrix = resp.json()["matrix"]
# matrix["ap-northeast-1"]["us-east-1"] == 145.3  (ms)

GeoRegion構造

{
  "region_id": "ap-northeast-1",
  "display_name": "Asia Pacific (Tokyo)",
  "provider": "aws",
  "latitude": 35.6762,
  "longitude": 139.6503,
  "priority": 1,
  "status": "online",
  "node_count": 4
}

性能指標(Feature 36/39/40)

メトリクス 目標値
監査ログ書き込みスループット ≥ 1,000 エントリ/秒
ハッシュチェーン検証 (1,000 件) < 5,000 ms
異常検知レイテンシ (p99) < 10 ms
レイテンシー行列計算 (50 リージョン) < 500 ms
インシデント作成レイテンシ < 100 ms
自動フェイルオーバー完了時間 < 30 秒

完全なサンプルコード

geo_node_manager_sdk.py


生物模倣統合 API(BrainSimulationFramework)

更新日: 2026-03-06
Phase A/B 全 11 項目完了。詳細評価: docs-dev/biomimetic_integration_evaluation.md v2.0 (8.7/10)

BrainSimulationFramework

evospikenet.brain_simulation モジュールの最上位統合クラス。biomimetic/ 全モジュール(神経調節・STDP・睡眠・回路・皮質トポロジー・DMN等)を SNN コアと結合する。

コンストラクタ

BrainSimulationFramework(
    enable_biomimetic: bool = False,
    config: Optional[BrainSimulationConfig] = None,
)

パラメータ: - enable_biomimetic: True にすると biomimetic/ 全モジュールを初期化してフルループ統合を有効化。 - config: シミュレーション設定オブジェクト(省略時はデフォルト)。

メソッド

run_simulation(duration: int = 1000) -> Dict[str, Any]

生物模倣 6 フェーズパイプラインを実行します。

フェーズ: 1. DevelopmentalSchedule.plasticity_multiplier(t) — 発達スケジュール 2. NeuralCircuitModeler.simulate_timestep() — 回路シミュレーション(Izhikevich 対応) 3. STDP × NeuromodulatorGate.gated_learning_rate() — 可塑性変調 4. NodeEnergyBudget.energy_fitness_term() — エネルギーホメオスタシス 5. HippocampalBuffer.store(episode) — エピソード記憶記録 6. SleepConsolidation.offline_consolidation() — 睡眠統合リプレイ

戻り値: {phase_results, biomimetic_status, duration_ms} を含む辞書

run_idle_phase(duration_s: float = 10.0) -> Dict[str, Any]

DMN(デフォルトモードネットワーク)アイドルサイクルを非同期実行します。

import asyncio
activities = asyncio.run(framework.run_idle_phase(duration_s=10.0))

戻り値: DMN 活動ログと各時間ステップのスパイク記録の辞書

biomimetic_status() -> Dict[str, Any]

全生物模倣モジュールの状態スナップショットを返します。

status = framework.biomimetic_status()
# {
#   "stdp_connected_gate": True,
#   "sleep_consolidation_replay": True,
#   "izhikevich_circuits": 1,
#   "cortical_columns_registered": 0,
#   "neuromodulator_registry_linked": True,
#   "efference_copy_adaptive": True,
#   "mirror_neuron_default_classifier": True,
#   "dmn_idle_phase_available": True,
# }

NeuralCircuitModeler(Izhikevich バックエンド)

コンストラクタ

NeuralCircuitModeler(
    config: NeuralCircuitConfig,
    neuron_type: str = "lif",  # "lif" | "izhikevich"
)

パラメータ: - neuron_type: "izhikevich" を指定すると IzhikevichNeuron.step() バックエンドを使用。RS/IB/CH/FS/LTS 等の多様な発火パターンに対応。

simulate_timestep(input_current, t) -> Tuple[np.ndarray, np.ndarray]

1 タイムステップ分 (dt=1ms) のシミュレーションを実行。

戻り値: (spike_array, membrane_voltages)


BrainRegionIntegrator

皮質トポロジーと脳領域の統合管理クラス。

add_cortical_topology(generator, nx_cols, ny_cols) -> int

CorticalTopologyGenerator が生成したカラム配置を BrainRegionConfig として一括登録し、隣接カラム(距離 ≤ √2 mm)間に小世界的接続を設定します。

from evospikenet.biomimetic import CorticalTopologyGenerator
from evospikenet.brain_simulation import BrainRegionIntegrator

gen = CorticalTopologyGenerator()
integrator = BrainRegionIntegrator()
n = integrator.add_cortical_topology(gen, nx_cols=4, ny_cols=4)
print(n)  # 16

パラメータ: - generator: CorticalTopologyGenerator インスタンス - nx_cols: X 方向カラム数 - ny_cols: Y 方向カラム数

戻り値: 登録されたカラム数 (int)


STDP — 生物模倣拡張メソッド

STDP.with_neuromodulation(gate) -> STDP(クラスメソッド)

ファクトリメソッド。NeuromodulatorGate を注入した STDP インスタンスを生成します。

gate = NeuromodulatorGate()
stdp = STDP.with_neuromodulation(gate)

STDP.connect_plasticity_gate(gate) -> None

既存の STDP インスタンスに NeuromodulatorGate を後付け接続します。

stdp = STDP()
stdp.connect_plasticity_gate(gate)

SleepConsolidation — 拡張メソッド

offline_consolidation(episodes, stdp) -> Dict[str, Any]

エピソードリストを STDP リプレイ学習で統合します。

stats = sleep.offline_consolidation(episodes=buffer.replay(), stdp=stdp)
# stats: {replayed_episodes, weight_updates_mean, replay_duration_ms}

戻り値 (stats): replayed_episodes, weight_updates_mean, replay_duration_ms を含む辞書


NeuromodulatorGate — Registry 連携メソッド

connect_to_registry(registry) -> None

NeuromodulatorRegistry と双方向ブリッジを確立します。

push_to_registry() -> None

Gate の現在状態(DA/ACh/OT レベル等)を Registry に書き込みます。

pull_from_registry() -> None

Registry の値を Gate state に反映します。

gate = NeuromodulatorGate()
registry = NeuromodulatorRegistry()
gate.connect_to_registry(registry)
gate.push_to_registry()
gate.pull_from_registry()

EfferenceCopy — 適応ゲインメソッド

adaptive_gain_update(prediction_error: float) -> float

予測誤差に基づいてゲインを適応更新します。

reset() -> None

ゲイン・内部状態を初期値にリセットします。

efference = EfferenceCopy()
gain = efference.adaptive_gain_update(prediction_error=0.3)
efference.reset()

完全なサンプルコード

sdk_distributed_brain.pydemonstrate_biomimetic_brain_simulation() 関数を参照)

テスト一覧

テストファイル 対象
tests/unit/test_biomimetic_init_api.py __init__.py 全シンボル
tests/unit/test_stdp_neuromodulation.py STDP ↔ Gate 配線
tests/unit/test_sleep_consolidation_stdp.py offline_consolidation() + stats
tests/unit/test_efference_copy_adaptive.py 適応ゲイン・reset()
tests/unit/test_mirror_neurons_default_classify.py _default_classify()
tests/integration/test_brain_simulation_biomimetic.py BrainSimulationFramework フル統合
tests/integration/test_dmn_idle_phase.py run_idle_phase() / DMN 停止確認
# Docker で生物模倣テスト一括実行
docker compose -f docker-compose.test.yml --profile biomimetic run --rm biomimetic-test

生物模倣 REST API エンドポイント

/biomimetic/* 配下のエンドポイントは HTTP 経由で生物模倣モジュール全体を操作します。 requests ライブラリまたは EvoSpikeNetAPIClient._make_request() で呼び出せます。

import requests

BASE_URL = "http://localhost:8000"
API_KEY  = "your-api-key"   # 不要な場合は空文字列でも可
HEADERS  = {"X-API-Key": API_KEY} if API_KEY else {}

エンドポイント一覧

メソッド パス 説明
GET /biomimetic/status 全モジュール状態スナップショット
POST /biomimetic/simulate フル脳シミュレーション実行
POST /biomimetic/neuromod/update 神経調節物質レベル更新
POST /biomimetic/reward TD 報酬シグナル送信
POST /biomimetic/sleep/consolidate オフライン記憶固定(リプレイ)
POST /biomimetic/sleep/wake-config 睡眠覚醒サイクル設定
POST /biomimetic/framework/reset フレームワーク再初期化

GET /biomimetic/status

全生物模倣モジュールの現在状態を返します。

resp = requests.get(f"{BASE_URL}/biomimetic/status", headers=HEADERS)
data = resp.json()
# {
#   "biomimetic_enabled": true,
#   "neuromod_levels": {"dopamine": 0.5, "noradrenaline": 0.4, ...},
#   "energy_budget":   {"current_w": 9.2, "budget_w": 10.0},
#   "sleep_stats":     {"cycles_completed": 3, "replayed_events": 128},
# }

POST /biomimetic/simulate

NeuralCircuitConfig の全パラメータを指定してシミュレーションを実行します。

payload = {
    # NeuralCircuitConfig
    "num_neurons": 1000,
    "connection_probability": 0.1,
    "excitatory_ratio": 0.8,
    "inhibitory_ratio": 0.2,
    "refractory_period": 5,
    "membrane_time_constant": 20.0,
    # BrainSimulationFramework
    "enable_biomimetic": True,
    "development_epoch": 0,
    "total_epochs": 1000,
    "energy_budget_w": 10.0,
    # シミュレーション制御
    "duration": 1000,
    "plasticity_rule": "stdp",     # "stdp" | "bcm" | "oja"
    "plasticity_interval": 10,
    "sleep_every": 500,
    "sleep_cycles": 3,
    "neuron_type": "lif",          # "lif" | "izhikevich"
}
resp = requests.post(f"{BASE_URL}/biomimetic/simulate", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "duration_ms": 3842,
#   "spikes_total": 18423,
#   "biomimetic_status": { ... }
# }
パラメータ デフォルト 説明
num_neurons int 1000 回路ニューロン数
connection_probability float 0.1 結合確率
excitatory_ratio float 0.8 興奮性ニューロン比率
inhibitory_ratio float 0.2 抑制性ニューロン比率
refractory_period int 5 不応期 (ms)
membrane_time_constant float 20.0 膜時定数 (ms)
enable_biomimetic bool true 生物模倣モジュールを有効化
development_epoch int 0 現在の developmental epoch
total_epochs int 1000 総エポック数
energy_budget_w float 10.0 ノードエネルギー上限 (W)
duration int 1000 シミュレーション長 (ms)
plasticity_rule str "stdp" 可塑性ルール
plasticity_interval int 10 可塑性更新間隔 (steps)
sleep_every int 500 睡眠フェーズ開始タイミング (steps)
sleep_cycles int 3 睡眠サイクル数
neuron_type str "lif" ニューロンモデル種別

POST /biomimetic/neuromod/update

ドーパミン等の神経調節物質レベルを即時更新します。

payload = {
    "dopamine":          0.8,   # 報酬・動機付け (0.0–1.0)
    "noradrenaline":     0.4,   # 覚醒・注意
    "acetylcholine":     0.6,   # 記憶・学習
    "serotonin":         0.5,   # 気分・衝動制御
    "oxytocin":          0.3,   # 社会的絆
    "emotion_factor":    0.7,   # 全般的感情強度
    "motivation_factor": 0.9,   # 行動への動機付け
}
resp = requests.post(f"{BASE_URL}/biomimetic/neuromod/update", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "message": "Neuromodulator levels updated",
#   "updated_levels": {"dopamine": 0.8, "serotonin": 0.5, ...}
# }
パラメータ 説明
dopamine float ドーパミン値 (0.0–1.0)
noradrenaline float ノルアドレナリン値
acetylcholine float アセチルコリン値
serotonin float セロトニン値
oxytocin float オキシトシン値
emotion_factor float 感情強度係数
motivation_factor float 動機付け係数

POST /biomimetic/reward

TD 報酬シグナルを送信してドーパミン系モジュールを更新します。

payload = {
    "reward":          1.0,   # 報酬値 (-1.0 ~ 1.0)
    "td_error":        0.35,  # 時間差分誤差
    "update_neuromod": True,  # ドーパミン系を即時更新するか
}
resp = requests.post(f"{BASE_URL}/biomimetic/reward", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "dopamine_level": 0.73,
# }

POST /biomimetic/sleep/consolidate

海馬バッファに蓄積された記憶トレースをオフラインでリプレイし STDP を適用します。

payload = {
    "sleep_cycles":      3,     # 実行するスリープサイクル数
    "replay_buffer_size": 256,  # リプレイバッファサイズ (スパイク数)
    "stdp_lr":           0.01,  # STDP 学習率 (consolidation 中)
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/consolidate", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "cycles_completed": 3,
#   "replayed_events":  128,
# }

POST /biomimetic/sleep/wake-config

SleepWakeCycleController のスケジュールパラメータを更新します。

payload = {
    "awake_phase_steps":      500,  # 覚醒フェーズのステップ数
    "sleep_phase_steps":      100,  # 睡眠フェーズのステップ数
    "sleep_cycles_per_epoch": 3,    # エポックごとのサイクル数
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/wake-config", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "config_applied": {"awake_phase_steps": 500, ...}
# }

POST /biomimetic/framework/reset

BrainSimulationFramework を再初期化し、全モジュールの状態をリセットします。

payload = {
    "enable_biomimetic": True,  # リセット後もバイオミメティクスを有効にするか
}
resp = requests.post(f"{BASE_URL}/biomimetic/framework/reset", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status":  "ok",
#   "message": "BrainSimulationFramework reinitialized",
# }

EvoSpikeNetAPIClient 経由での呼び出し

専用メソッドがない biomimetic エンドポイントは _make_request() で呼び出せます。 SDK セッション機能(自動リトライ、ヘッダー管理)が利用できます。

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient(base_url="http://localhost:8000", api_key=API_KEY)

# GET /biomimetic/status
status = client._make_request("GET", f"{client.base_url}/biomimetic/status")

# POST /biomimetic/neuromod/update
result = client._make_request(
    "POST",
    f"{client.base_url}/biomimetic/neuromod/update",
    json={"dopamine": 0.8, "serotonin": 0.5},
)

完全なサンプルコード

sdk_biomimetic_rest_api.py

# 全エンドポイントをパイプラインで実行
python examples/sdk/sdk_biomimetic_rest_api.py --base-url http://localhost:8000

# 個別デモ実行
python examples/sdk/sdk_biomimetic_rest_api.py --demo simulate
python examples/sdk/sdk_biomimetic_rest_api.py --demo neuromod
python examples/sdk/sdk_biomimetic_rest_api.py --demo sleep

Phase E-3 コネクトーム本番化・自動同期 API

最終更新日: 2026-03-19(Phase E-3 全完了)

sync_connectome — 自動同期パイプライン(E-3-1)

apply_delta(base_path, delta_path, result_path)

差分 JSON を既存 NPZ に適用し、新しい NPZ をアトミックに生成する。

from scripts.sync_connectome import apply_delta

apply_delta(
    base_path="data/connectome/cache/visual.npz",
    delta_path="data/connectome/delta_v42.json",
    result_path="data/connectome/cache/visual_v43.npz",
)

差分 JSON スキーマ:

{
    "schema_version": "1.0",
    "materialization_version_from": 42,
    "materialization_version_to": 43,
    "added": [
        {"pre_id": 100, "post_id": 200, "weight": 0.15, "delay_ms": 1.5}
    ],
    "removed": [
        {"pre_id": 50, "post_id": 80}
    ]
}

apply_delta_with_validation(base_path, delta_path, result_path, *, rollback_dir, ei_ratio_range)

E/I 比検証付きで差分を適用。範囲外の場合 rollback_dir にバックアップを生成して ConnectomeSyncValidationError を送出。

from scripts.sync_connectome import apply_delta_with_validation, ConnectomeSyncValidationError

try:
    apply_delta_with_validation(
        base_path="visual.npz",
        delta_path="delta.json",
        result_path="visual_new.npz",
        rollback_dir="data/connectome/rollback/",
        ei_ratio_range=(3.5, 5.0),
    )
except ConnectomeSyncValidationError as e:
    print(f"E/I検証失敗・ロールバック完了: {e}")

sync_connectome(config_path, cache_path, output_path, *, dry_run, ...)

完全自動同期オーケストレーター。CAVE API デルタ取得に始まり適用まで一勧。

from scripts.sync_connectome import sync_connectome

result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/cache/visual.npz",
    output_path="data/connectome/cache/visual.npz",
    dry_run=False,
)
print(result["status"])  # "success" | "dry_run" | "no_update"

CLI 使用例

# 通常実行
python scripts/sync_connectome.py \
    --config config/connectome_config.yaml \
    --cache data/connectome/cache/visual.npz \
    --output data/connectome/cache/visual.npz

# ドライラン(出力ファイルを更新せず結果を表示)
python scripts/sync_connectome.py --dry-run \
    --cache data/connectome/cache/visual.npz \
    --output /dev/null

brain_routing — HCP 遅延アウェア Zenoh ルーティング(E-3-3)

compute_delay_matrix(manifest, config_path)

HCP 実測値からノード間遅延行列を構範する。

from evospikenet.brain_routing import compute_delay_matrix
from evospikenet.connectome import build_manifest

manifest = build_manifest("data/connectome/")
delay_matrix = compute_delay_matrix(
    manifest=manifest,
    config_path="config/connectome_config.yaml",
)
# {"pfc": {"memory_spike": 12.0, "visual": 9.0, ...}, ...}
print(delay_matrix["pfc"]["memory_spike"])  # 12.0 ms

build_hcp_routing_table(config_path)

フルパイプラインでルーティングテーブルを生成。

from evospikenet.brain_routing import build_hcp_routing_table

routing = build_hcp_routing_table("config/connectome_config.yaml")
# routing["delay_matrix"]  — {src: {dst: ms}}
# routing["routing_plan"]["routing_edges"]  — priority降順エッジリスト
# routing["zenoh_topics"]  — "brain_routing/delays/{node_id}" リスト

HCPDelayRouter

Zenoh セッションに遅延プロファイルを発行するクラス。session=None でも動作する(ログのみ)。

from evospikenet.brain_routing import HCPDelayRouter

# Zenoh セッションなしで動作するテスト
# session=None の場合はルーティング機能はログのみ
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()

# データペイロードに遅延プロファイルを付与
data = {"spikes": [1, 0, 1], "timestamp": 1.0}
enriched = router.apply_hcp_delays(node_id="pfc", data=data)
# {"spikes": [...], "timestamp": 1.0, "routing_delays": {"memory_spike": 12.0, ...}}

# 全ノードに遅延情報を一括発行
# Zenoh トピック: brain_routing/delays/{node_id}
router.publish_all()

auto_node_mapper — Auto Node Mapper CLI

map_connectome(input_path, output_dir, config_path, *, dry_run, seed)

コネクトームをノード別 NPZ に分割する。

from scripts.auto_node_mapper import map_connectome

result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    dry_run=False,
    seed=42,
)
print(f"マッピング完了: {result.total_nodes} ノード中 {result.mapped_nodes} 成功")
print(f"マニフェスト: {result.manifest_path}")
for entry in result.entries:
    print(f"  {entry.node_type}: {entry.n_neurons} neurons, E/I={entry.ei_ratio:.2f}, {entry.status}")

出力構造:

data/connectome/nodes/
├── visual.npz          ← 視覚野ノード用圧縮 NPZ
├── memory_spike.npz   ← 記憶ノード用
└── node_manifest.yaml  ← 全ノードメタデータ

node_manifest.yaml スキーマ:

schema_version: "1.0"
generated_at: "2026-03-19T10:00:00Z"
base_dataset: "flywire_visual"
nodes:
  visual:
    npz_path: data/connectome/nodes/visual.npz
    n_neurons: 1024
    ei_ratio: 4.1
    coarsening_method: stratified_sample
    status: ok
  memory_spike:
    npz_path: data/connectome/nodes/memory_spike.npz
    n_neurons: 512
    ei_ratio: 4.0
    coarsening_method: spectral_coarsen
    status: ok

CLI 使用例

# フルマッピング
python scripts/auto_node_mapper.py \
    --input data/connectome/flywire_visual.json \
    --output-dir data/connectome/nodes/ \
    --config config/connectome_config.yaml

# ドライラン・定数固定で再現性あり
python scripts/auto_node_mapper.py \
    --input data/connectome/flywire_visual.json \
    --output-dir data/connectome/nodes/ \
    --dry-run --seed 42

全構コネクトーム E-3 ステップバイステップサンプル

完全な初期化から定期同期までの典型的なワークフロー:

import evospikenet as esn
from scripts.auto_node_mapper import map_connectome
from scripts.sync_connectome import sync_connectome
from evospikenet.brain_routing import HCPDelayRouter
from evospikenet import load_connectome_npz, apply_connectome_to_layer, ConnectomeLIFLayer

# --- ステップ 1: コネクトーム → ノード別 NPZ に分割 ---
result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    seed=42,
)
print(f"E-3 Auto Node Mapper: {result.mapped_nodes} nodes mapped")

# --- ステップ 2: ノード別 NPZ を ConnectomeLIFLayer に注入 ---
visual_data = load_connectome_npz("data/connectome/nodes/visual.npz")
layer = ConnectomeLIFLayer(num_neurons=visual_data["n_neurons"], device="cpu")
apply_connectome_to_layer(visual_data, layer)
print(f"structural_mask: {layer.structural_mask.shape}, ei_ratio={layer.ei_ratio:.2f}")

# --- ステップ 3: HCP 遅延ルーティングを構範 ---
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
router.publish_all()  # Zenoh 無しの場合はログのみ

# --- ステップ 4: 自動同期(CAVE API より差分取得) ---
sync_result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/nodes/visual.npz",
    output_path="data/connectome/nodes/visual.npz",
    dry_run=True,  # まずドライランで確認
)
print(f"sync status: {sync_result['status']}")

完全なウォークスルーサンプル: connectome_e3_demo.py ```