EvoSpikeNet SDK API Reference
Copyright: 2026 Moonlight Technologies Inc.
Author: Masahiro Aoki
最終更新日: 2026年2月20日 🎯 Feature 13 + 空間生成ニューラル統合 + Feature 36/39/40(自動リカバリー・監査ログ・地理的分散ノード管理)
概要
このドキュメントは、EvoSpikeNet Python SDKの完全なAPIリファレンスを提供します。SDKはREST APIクライアントとして実装されており、テキスト生成、分散脳シミュレーション、空間認知処理(Rank 12-15 Feature 13)、アーティファクト管理、RAG/ベクター検索(バージョン付きインデックス)などの機能をサポートしています。
クラス構造
EvoSpikeNetAPIClient
メインのAPIクライアントクラスです。HTTPリクエストの管理、エラーハンドリング、接続プーリングを提供します。
コンストラクタ
EvoSpikeNetAPIClient(
base_url: str = "http://localhost:8000",
spatial_base_url: Optional[str] = None,
rag_base_url: Optional[str] = None,
api_key: Optional[str] = None,
timeout: int = 60,
max_retries: int = 3,
session: Optional[requests.Session] = None,
)
パラメータ:
- base_url: APIサーバーのベースURL
- spatial_base_url: 空間生成サービス(FastAPI版)のベースURL。未指定時は base_url を使用。
- rag_base_url: RAGサービスのベースURL。未指定時は base_url を使用。
- api_key: API認証キー
- timeout: リクエストタイムアウト(秒)
- max_retries: 最大リトライ回数
- session: カスタムrequestsセッション
基本メソッド
generate(prompt: str, max_length: int = 50) -> Dict[str, str]
テキスト生成を実行します。
パラメータ:
- prompt: 入力プロンプト
- max_length: 生成する最大文字数
戻り値: 生成結果を含む辞書
例:
result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])
LocalMemoryClient(オフラインヘルパー)
HTTP APIを使わずにローカルで長期記憶モジュールとスパイク圧縮レイヤを試す軽量ラッパー。
from evospikenet.sdk import LocalMemoryClient
import torch
client = LocalMemoryClient(state_dim=16, time_steps=6, embedding_dim=16, max_traces=32)
spikes = torch.rand(6, 16)
summary = client.store_spike_episode(
spike_sequence=spikes,
context={"scenario": "demo"},
action="look",
outcome="noted",
reward=0.2,
semantic_tags=["demo_concept"],
)
stats = client.reservoir_stats()
result = client.retrieve({"scenario": "demo", "semantic_tags": ["demo_concept"]}, top_k=1)
関連実装: evospikenet/snn_memory_extension.py, evospikenet/long_term_memory.py, evospikenet/forgetting_controller.py。
submit_prompt(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict
マルチモーダルプロンプトを分散脳シミュレーションに送信します。
運用メモ(2026-04-20):
- 分散環境でのASR実行経路はサーバー環境変数 VIDEO_ANALYSIS_ASR_BACKEND に依存します(asr_fallback / whisper_real)。
- Whisperのモデル/デバイスは VIDEO_ANALYSIS_WHISPER_MODEL / VIDEO_ANALYSIS_WHISPER_DEVICE で制御します。
- 本変更はサーバー構成の拡張であり、SDKメソッドのシグネチャ変更はありません。
パラメータ:
- prompt: テキストプロンプト
- image_path: 画像ファイルのパス
- audio_path: 音声ファイルのパス
戻り値: 送信確認を含む辞書
get_simulation_status() -> Dict
分散脳シミュレーションの現在の状態を取得します。
戻り値: 全ノードの状態を含む辞書
get_simulation_result() -> Dict
完了したシミュレーションの最新結果を取得します。
戻り値: 応答を含む辞書、または結果が利用できない場合はNone
poll_for_result(timeout: int = 120, interval: int = 5) -> Optional[Dict[str, Any]]
結果が利用可能になるまでポーリングします。
パラメータ:
- timeout: 最大待機時間(秒)
- interval: ポーリング間隔(秒)
戻り値: 結果辞書、またはタイムアウト時はNone
進化/ゲノム管理メソッド
list_genomes() -> List[Dict[str, Any]]
保存済みゲノムを一覧し、name / generation / fitness などのメタデータを取得します。
get_genome(genome_name: str) -> Dict[str, Any]
指定したゲノムファイルのペイロードを取得します。
save_genome(genome_name: str, genome: Dict[str, Any], make_active: bool = False) -> Dict[str, Any]
ゲノムを保存します。make_active=True で active_genome.json に昇格します。
apply_genome(genome_name: str) -> Dict[str, Any]
既存のゲノムをアクティブ化します(ペイロード編集なし)。
使用例:
from evospikenet.sdk import EvoSpikeNetAPIClient
# Example: use EvoSpikeNetAPIClient to manage genomes
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
genomes = client.list_genomes()
if genomes:
target = genomes[0]["name"]
payload = client.get_genome(target)
payload.setdefault("metadata", {})["note"] = "edited via SDK"
client.save_genome(target, payload, make_active=True)
client.apply_genome(target)
else:
print("No genomes found on the server")
関連サンプル: examples/genome_management_sdk.py
アーティファクト管理メソッド
upload_artifact(session_id: str, artifact_type: str, name: str, file: io.BytesIO, **metadata) -> Dict
アーティファクトファイルをアップロードします。
パラメータ:
- session_id: セッションID
- artifact_type: アーティファクトタイプ ('model', 'log', 'config' など)
- name: アーティファクト名
- file: アップロードするファイルバッファ
- **metadata: 追加のメタデータ(llm_type, node_type, model_category, model_variant)
戻り値: アップロード結果を含む辞書
list_artifacts(artifact_type: Optional[str] = None) -> Union[List[Dict[str, Any]], Dict[str, Any]]
アーティファクトを一覧表示します。
パラメータ:
- artifact_type: フィルタリングするアーティファクトタイプ
戻り値: アーティファクトのリストまたは辞書
download_artifact(artifact_id: str, destination_path: str) -> None
アーティファクトをダウンロードします。
パラメータ:
- artifact_id: ダウンロードするアーティファクトのID
- destination_path: 保存先ファイルパス
ログ管理メソッド
create_log_session(description: str) -> Dict
新しいログセッションを作成します。
パラメータ:
- description: セッションの説明
戻り値: セッション情報
get_remote_log(user: str, ip: str, key_path: str, log_file_path: str) -> Dict
リモートノードからログを取得します。
パラメータ:
- user: SSHユーザー名
- ip: リモートホストのIPアドレス
- key_path: SSH秘密鍵のパス
- log_file_path: リモートログファイルの絶対パス
戻り値: ログ内容を含む辞書
バッチ処理メソッド
batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict[str, str]]
複数のプロンプトを順次処理します。
パラメータ:
- prompts: 処理するプロンプトのリスト
- max_length: 各生成の最大文字数
戻り値: 各プロンプトの結果リスト
ヘルパーメソッド
health_check() -> Dict[str, Any]
APIサーバーのヘルスチェックを実行します。
戻り値: ヘルス状態を含む辞書
マルチ言語バインディング
Python SDKには Go および TypeScript 向けの軽量プロキシ実装が含まれています。これらは本来別リポジトリで Go/JS 実装が存在する予定ですが、
Python 環境でも簡単に import できるように機能を模倣したものです。
Go SDK プロキシ
from evospikenet import sdk_go
client = sdk_go.init_client("http://localhost:8000", api_key="KEY")
path = client.download_file("foo.bin", dest="/tmp/foo.bin")
| メソッド | 説明 |
|---|---|
init_client(endpoint: str, api_key: Optional[str]=None) |
Goクライアントオブジェクトを返します |
GoSDKClient.download_file(filename, dest=None, timeout=60) |
EvoSpikeNet API からファイルを取得します |
TypeScript SDK プロキシ
from evospikenet import sdk_ts
client = sdk_ts.initClient("https://api", apiKey=None)
client.downloadFile("foo", dest="out.bin")
| メソッド | 説明 |
|---|---|
initClient(endpoint: str, apiKey: Optional[str]=None) |
TS/JSクライアントオブジェクトを返します |
TSSDKClient.downloadFile(filename, dest=None, timeout=60) |
同上 (camelCase命名) |
これらのクライアントは内部的に requests を使って HTTP 通信を行い、Python テストやドキュメントのサンプルで利用可能です。
node_discovery_health() -> Dict[str, Any]
ノード検出サービスから現在のノード一覧とサマリを取得します。内部では /api/node-discovery/health エンドポイントを呼び出します。
戻り値: ノード状態 (nodes, summary, updated_at など) を含む辞書
node_discovery_topology() -> Dict[str, Any]
ノード検出サービスからネットワークトポロジー情報を取得します。内部では /api/node-discovery/topology エンドポイントを呼び出します。
戻り値: nodes と edges を含むトポロジー辞書
spatial_generate(input_text: str, cognitive_context: Optional[Dict[str, Any]] = None, eeg_data: Optional[Dict[str, Any]] = None, timeout: int = 30) -> Dict[str, Any]
空間生成サービス /generate を呼び出し、シーン仕様(scene_type, objects, spatial_layout, physics, lighting, navigation, metadata など)を取得します。
spatial_health(timeout: int = 10) -> Dict[str, Any]
空間生成サービス /health を呼び出し、neural_components_available や components、metrics、knowledge_base_loaded などの状態を取得します。
rag_upload_file(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,
) -> Dict[str, Any]
RAGサービス /upload_file にファイルを投入し、doc_key と version を受け取ります。
追加のフラグにより一時セッションやバックグラウンドジョブの制御が可能です:
stream: ドキュメントパーサをストリーミングモードで動作させますbackground: 解析・インデックス処理を Celery バックグラウンドジョブとして行います (バックエンドはjob_idを生成しupload_statusで追跡可能)
Note: クエリパラメータ job_id を指定して同じインスタンスの
APIを呼び出すと、処理中のチャンク進捗が upload_jobs に反映され
るため、通知やログ用フックを簡単に組み込めます。
- init: マルチパートアップロードセッションを開始し、以降の session_id 指定・final 付きパーツで同一バージョンを構築できます
- session_id/final: 既存セッションにファイルパーツを追加し、最終パートでは final=True にしてセッションを終了
サーバー側で自動的に:
- MIME/拡張子検証
- ドキュメント解析 (PDF/Word/Excel/PPT/Markdown 等)
- トークンベース自動チャンク化 (
chunk_text_auto) - 埋め込み生成 ・ Milvus/Elasticsearch へのインデックス
- バージョン管理 (
doc_keyによる履歴)
ストリーミングや分割アップロードは大容量 (1GB+) ドキュメントのメモリ効率を高め、 バックグラウンドモードは処理待ちを不要にします。
戻り値例: {"doc_key":"sample.pdf","version":3,"chunks_indexed":42,...}
rag_upload_file_async(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,
) -> Dict[str, Any]
Asynchronous counterpart to :func:rag_upload_file offering the same
parameter set and return structure. Useful when integrating with
asyncio-based workflows.
create_batch_job(payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]
Create a new batch ingestion job on the RAG service. The payload may
include arbitrary job parameters such as file location, parsing options,
metadata, etc. The server responds with a job_id and initial status
(queued).
cancel_batch_job(job_id: str, timeout: int = 10) -> Dict[str, Any]
Request cancellation of an existing batch job previously created via
:func:create_batch_job. The returned dictionary echoes the job_id
and updated status (cancelled).
rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]
RAGサービス /query で検索・生成を実行します。context(ヒット文書)、必要に応じて sdk_response(EvoSDK生成結果)を返します。サーバーは同時にバージョンごとの差分比較用パッチを生成し、/document_chunks と組み合わせればクライアント側で差分UIを構築できます。
Note: Large-document streaming ingestion (split‑upload) and advanced diff viewer support are detailed in docs/RAG_SYSTEM_DETAILED.md §7.6.
rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]
RAGサービス /query で検索・生成を実行します。context(ヒット文書)、必要に応じて sdk_response(EvoSDK生成結果)を返します。
get_document_versions(doc_key: str) -> List[Dict[str, Any]]
ドキュメントの利用可能なバージョン一覧を返します。各項目は version, checksum, chunk_count, indexed_at を含みます。
rag_init_session(file_path: str, timeout: int = 30) -> Dict[str, Any]
Start a new upload session. Returns a session identifier along with the
determined doc_key and tentative version. Use this id for subsequent
the following API calls:
rag_upload_part(session_id: str, file_path: str, final: bool = False, timeout: int = 120) -> Dict[str, Any]
Upload a single chunk belonging to an open session. final=True signals
the last part and causes cleanup of the session state on the server.
upload_status(job_id: str) -> Dict[str, Any]
Check status of an asynchronous upload job (see /upload_file?background=true).
Returns a dictionary with job_id, status (queued, running, completed,
failed), and optional progress percentage.
get_document_chunks(doc_key: str, version: int) -> List[Dict[str, Any]]
指定した doc_key の version に関連するチャンク(順序付き)を返します。各チャンクは chunk_id, content, source_filename を持ちます。
get_server_info() -> Optional[Dict]
サーバー情報とロードされたモデルを取得します。
戻り値: サーバー情報、または利用できない場合はNone
wait_for_server(timeout: int = 60, interval: int = 2) -> bool
APIサーバーが正常になるまで待機します。
パラメータ:
- timeout: 最大待機時間(秒)
- interval: ヘルスチェック間隔(秒)
戻り値: サーバーが正常になった場合はTrue
validate_prompt(prompt: str) -> bool
プロンプトが送信に適しているかを検証します。
パラメータ:
- prompt: 検証するプロンプト
戻り値: 有効な場合はTrue
with_error_handling(func: Callable[..., Any], args, retries: int = 3, *kwargs) -> Any
自動リトライ付きで関数を実行します。
パラメータ:
- func: 実行する関数
- retries: リトライ回数
- *args: 関数に渡す位置引数
- **kwargs: 関数に渡すキーワード引数
戻り値: 関数の戻り値、または失敗時はNone
統計・監視メソッド
get_stats() -> Dict[str, Any]
クライアントの使用統計を取得します。
戻り値: 統計情報(リクエスト数、エラー数、平均遅延など)
reset_stats() -> None
統計カウンターをリセットします。
遅延監視メソッド
get_latency_stats() -> Dict
全コンポーネントの遅延統計を取得します。
戻り値: 遅延統計データ
check_latency_target() -> Dict
遅延ターゲットが満たされているかをチェックします。
戻り値: チェック結果の詳細
スナップショット管理メソッド
create_snapshot(snapshot_name: str, include_models: bool = True, include_data: bool = True, compression_level: int = 6) -> Dict
システムのスナップショットを作成します。
パラメータ:
- snapshot_name: スナップショット名
- include_models: モデルを含めるか
- include_data: データを含めるか
- compression_level: 圧縮レベル(0-9)
戻り値: 作成結果
list_snapshots() -> Dict
利用可能なスナップショットを一覧表示します。
戻り値: スナップショットリスト
restore_snapshot(snapshot_path: str, restore_models: bool = True, restore_data: bool = True) -> Dict
スナップショットから復元します。
パラメータ:
- snapshot_path: 復元するスナップショットのパス
- restore_models: モデルを復元するか
- restore_data: データを復元するか
戻り値: 復元結果
delete_snapshot(snapshot_path: str) -> Dict
スナップショットを削除します。
パラメータ:
- snapshot_path: 削除するスナップショットのパス
戻り値: 削除結果
validate_snapshot(snapshot_path: str) -> Dict
スナップショットの妥当性を検証します。
パラメータ:
- snapshot_path: 検証するスナップショットのパス
戻り値: 検証結果
cleanup_snapshots(max_age_days: int = 30) -> Dict
古いスナップショットをクリーンアップします。
パラメータ:
- max_age_days: 保持する最大日数
戻り値: クリーンアップ結果
スケーラビリティテストメソッド
run_scalability_test(max_nodes: int = 50, test_duration: int = 30) -> Dict
スケーラビリティテストを実行します。
パラメータ:
- max_nodes: 最大ノード数
- test_duration: テスト継続時間(秒)
戻り値: テスト結果
get_scalability_results() -> Dict
スケーラビリティテストの結果を取得します。
戻り値: テスト結果データ
get_scalability_status() -> Dict
スケーラビリティテストの状態を取得します。
戻り値: 現在の状態
test_node_scalability(node_counts: List[int], test_duration: float = 60.0) -> Dict
異なるノード数でのスケーラビリティをテストします。
パラメータ:
- node_counts: テストするノード数のリスト
- test_duration: 各テストの継続時間
戻り値: テスト結果
get_resource_usage() -> Dict
リソース使用状況を取得します。
戻り値: リソース使用統計
run_stress_test(intensity: str = "high", duration: float = 120.0) -> Dict
ストレステストを実行します。
パラメータ:
- intensity: テスト強度 ("low", "medium", "high")
- duration: テスト継続時間(秒)
戻り値: テスト結果
get_system_limits() -> Dict
システム制限を取得します。
戻り値: システム制限情報
ハードウェア最適化メソッド
optimize_model(model_type: str, optimizations: Optional[List[str]] = None) -> Dict
モデルをハードウェア用に最適化します。
パラメータ:
- model_type: モデルタイプ
- optimizations: 適用する最適化のリスト
戻り値: 最適化結果
benchmark_model(model_type: str, num_runs: int = 50) -> Dict[str, Any]
モデルのベンチマークを実行します。
パラメータ:
- model_type: ベンチマークするモデルタイプ
- num_runs: 実行回数
戻り値: ベンチマーク結果
get_hardware_info() -> Dict
ハードウェア情報を取得します。
戻り値: ハードウェア情報
高可用性監視メソッド
get_availability_status() -> Dict
可用性状態を取得します。
戻り値: 可用性状態
get_availability_stats(time_window: str = "24h") -> Dict
可用性統計を取得します。
パラメータ:
- time_window: 統計時間枠(例: "24h", "7d")
戻り値: 可用性統計
perform_health_check() -> Dict
ヘルスチェックを実行します。
戻り値: ヘルスチェック結果
trigger_recovery_action(action_type: str, parameters: Optional[Dict[str, Any]] = None) -> Dict
回復アクションをトリガーします。
パラメータ:
- action_type: アクションタイプ
- parameters: アクションパラメータ
戻り値: アクション結果
get_availability_alerts(limit: int = 50) -> Dict
可用性アラートを取得します。
パラメータ:
- limit: 取得するアラート数の上限
戻り値: アラートリスト
schedule_maintenance(start_time: str, duration_minutes: int, reason: str) -> Dict
メンテナンスをスケジュールします。
パラメータ:
- start_time: 開始時刻(ISO形式)
- duration_minutes: 継続時間(分)
- reason: メンテナンス理由
戻り値: スケジュール結果
非同期Zenoh通信メソッド
connect_zenoh(node_id: str = "api_node") -> Dict
Zenohネットワークに接続します。
パラメータ:
- node_id: ノード識別子
戻り値: 接続結果
publish_zenoh_message(topic: str, payload: Any, priority: str = "normal", message_type: str = "notification", node_id: str = "api_node") -> Dict
Zenohメッセージをパブリッシュします。
パラメータ:
- topic: メッセージトピック
- payload: メッセージペイロード
- priority: メッセージ優先度
- message_type: メッセージタイプ
- node_id: 送信元ノードID
戻り値: パブリッシュ結果
send_zenoh_request(target_node: str, request: Any, timeout: float = 5.0, node_id: str = "api_node") -> Dict
Zenohリクエストを送信します。
パラメータ:
- target_node: ターゲットノードID
- request: リクエストデータ
- timeout: タイムアウト(秒)
- node_id: 送信元ノードID
戻り値: レスポンス
send_zenoh_notification(target_nodes: List[str], notification: Any, priority: str = "normal", node_id: str = "api_node") -> Dict
Zenoh通知を送信します。
パラメータ:
- target_nodes: ターゲットノードIDのリスト
- notification: 通知データ
- priority: 通知優先度
- node_id: 送信元ノードID
戻り値: 送信結果
get_zenoh_stats(node_id: str = "api_node") -> Dict
Zenoh統計を取得します。
パラメータ:
- node_id: ノードID
戻り値: Zenoh統計
set_aeg_comm_config(node_id: str, enable_comm: bool = True, energy_threshold: float = 10.0, critical_modalities: List[str] = None, force_change_threshold: float = 10.0) -> Dict
AEG-Comm通信最適化設定を行います。
パラメータ:
- node_id: 対象ノードID
- enable_comm: 通信最適化有効/無効
- energy_threshold: エネルギー閾値
- critical_modalities: 重要モダリティリスト
- force_change_threshold: 強制変更閾値
戻り値: 設定結果
get_communication_stats(node_id: str = "api_node") -> Dict
AEG-Comm通信統計を取得します。
パラメータ:
- node_id: ノードID
戻り値: 通信統計(送信率、削減率、遅延、エラー率)
get_aeg_comm_status(node_id: str = "api_node") -> Dict
AEG-Commの現在のステータスを取得します。
パラメータ:
- node_id: ノードID
戻り値: AEG-Commステータス(活性ノード数、エネルギーレベル、クリティカルパケット数)
分散コンセンサスメソッド
propose_consensus_decision(decision_type: str, payload: Any, priority: int = 1, dependencies: Optional[List[str]] = None) -> Dict
コンセンサス決定を提案します。
パラメータ:
- decision_type: 決定タイプ
- payload: 決定ペイロード
- priority: 優先度
- dependencies: 依存関係のリスト
戻り値: 提案結果
get_consensus_result(proposal_id: str, timeout: float = 30.0) -> Dict
コンセンサス結果を取得します。
パラメータ:
- proposal_id: 提案ID
- timeout: 待機タイムアウト(秒)
戻り値: コンセンサス結果
update_node_status(node_id: str, active: bool) -> Dict
ノード状態を更新します。
パラメータ:
- node_id: ノードID
- active: アクティブ状態
戻り値: 更新結果
get_consensus_stats() -> Dict
コンセンサス統計を取得します。
戻り値: 統計データ
cleanup_consensus(max_age: float = 300.0) -> Dict
古いコンセンサスデータをクリーンアップします。
パラメータ:
- max_age: 保持する最大時間(秒)
戻り値: クリーンアップ結果
アーティファクト拡張メソッド
get_session_artifacts(session_id: str) -> Any
セッションのアーティファクトを取得します。
パラメータ:
- session_id: セッションID
戻り値: アーティファクトデータ
get_artifact(artifact_id: str, destination_path: Optional[str] = None) -> bytes
アーティファクトを取得します。
パラメータ:
- artifact_id: アーティファクトID
- destination_path: 保存先パス(オプション)
戻り値: アーティファクトデータ
非同期メソッド
generate_async(prompt: str, max_length: int = 50) -> Dict[str, str]
非同期テキスト生成。
submit_prompt_async(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict
非同期マルチモーダルプロンプト送信。
health_check_async() -> Dict[str, Any]
非同期ヘルスチェック。
Jupyter統合クラス
JupyterAPIClient
Jupyter Notebook環境向けの拡張クライアント。
追加メソッド
set_display_mode(mode: str) -> None
出力表示モードを設定します。
パラメータ:
- mode: 表示モード ("html", "json", "text")
show_server_info() -> None
サーバー情報をリッチ形式で表示します。
show_stats() -> None
クライアント統計をリッチ形式で表示します。
validate_prompt_interactive(prompt: str) -> bool
プロンプトをインタラクティブに検証します。
WebSocketクライアント
WebSocketClient
リアルタイム通信用のWebSocketクライアント。
コンストラクタ
WebSocketClient(
ws_url: str = "ws://localhost:8000/ws",
reconnect_attempts: int = 5,
reconnect_delay: float = 2.0,
)
メソッド
connect() -> None
WebSocket接続を確立します。
disconnect() -> None
WebSocket接続を閉じます。
send_message(message: Dict[str, Any]) -> None
メッセージを送信します。
receive_message() -> Dict[str, Any]
メッセージを受信します。
register_handler(message_type: str, handler: Callable) -> None
メッセージハンドラーを登録します。
listen() -> None
メッセージをリッスンします。
エラーハンドリング
EvoSpikeNetAPIError
API関連の例外クラス。
属性
error_info: ErrorInfoオブジェクトmessage: エラーメッセージstatus_code: HTTPステータスコード(オプション)details: 追加のエラー詳細
ErrorInfo
エラー情報を格納するデータクラス。
属性
error_type: エラータイプmessage: エラーメッセージdetails: 追加の詳細情報retry_after: リトライまでの待機時間(秒)status_code: HTTPステータスコード
L5 自己進化 / 自己修復 API
最終更新日: 2026-03-11 (v2.2.0)
L5 自己進化システムのコアクラス群。ゲノム、進化エンジン、適応度評価、メモリ管理を「直接 Python import」で利用する場合のリファレンス。
GenomePool — evospikenet.genome_pool
from evospikenet.genome_pool import GenomePool, SelectionConfig, MutationConfig, CrossoverConfig
from evospikenet.snapshot_recovery import SnapshotManager
コンストラクタ
GenomePool(
population_size: int = 50,
selection_config: Optional[SelectionConfig] = None,
mutation_config: Optional[MutationConfig] = None,
crossover_config: Optional[CrossoverConfig] = None,
seed: Optional[int] = None,
snapshot_manager: Optional[SnapshotManager] = None, # v2.2.0新規
)
パラメータ:
- population_size: ポピュレーションサイズ
- selection_config: 選択戦略設定 (tournament / roulette / rank)
- mutation_config: 変異設定 (変異率・構造変異率等)
- crossover_config: 交叉設定 (uniform / single_point / two_point)
- snapshot_manager: 世代更新後に自動スナップショットを作成する SnapshotManager インスタンス
evolve_generation() -> PopulationStats
1世代進化し、統計を返す非同期メソッド。
snapshot_manager が設定されている場合、世代更新後に create_snapshot("post_generation_{N}") を自動呼び出す。
使用例:
import asyncio
from evospikenet.genome_pool import GenomePool
from evospikenet.snapshot_recovery import SnapshotManager
manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")
pool = GenomePool(population_size=50, snapshot_manager=manager)
pool.initialize_population()
async def run():
for _ in range(10):
pool.update_fitness(pool.genomes[0].genome_id, 0.95)
stats = await pool.evolve_generation()
print(f"Gen {stats.generation}: best={stats.max_fitness:.3f}")
asyncio.run(run())
MutationEngine — evospikenet.evolution_engine
from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig
コンストラクタ
MutationEngine(
base_mutation_rate: float = 0.05,
mutation_strength: float = 0.1,
advanced_engine: Optional[AdvancedMutationEngine] = None, # v2.2.0新規
)
パラメータ:
- base_mutation_rate: 遗伝子ごとの変異確率
- mutation_strength: パラメータ変化の大きさ
- advanced_engine: 基本変異完了後に apply_mutations() を呼び出す構造変異エンジン。None の場合は従来の基本変異のみ実行
mutate_genome(genome: EvoGenome) -> EvoGenome
ゲノム全体に変異を適用し、コピーを返す。
advanced_engine が設定されている場合、基本変異の後に高度構造変異(レイヤ追加/削除、スキップ接続、プルーニング等)を適用。
使用例:
from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig
advanced = AdvancedMutationEngine(AdaptiveMutationConfig(base_rate=0.1))
engine = MutationEngine(base_mutation_rate=0.05, advanced_engine=advanced)
mutated = engine.mutate_genome(genome)
AdvancedMutationEngine — evospikenet.advanced_mutations
from evospikenet.advanced_mutations import (
AdvancedMutationEngine, AdaptiveMutationConfig, MutationType
)
コンストラクタ
AdvancedMutationEngine(config: AdaptiveMutationConfig = AdaptiveMutationConfig())
apply_mutations(genome, mutation_types=None, performance_data=None) -> EvoGenome
10種類の構造変異をゲノムに適用。
MutationType |
内容 |
|---|---|
ADD_LAYER_SMART |
数層の中間にレイヤを挿入、接続行列を自動更新 |
REMOVE_LAYER_SMART |
最小重要レイヤを削除、接続をブリッジ |
ADD_SKIP_CONNECTION |
最長スキップ接続を追加 |
REMOVE_SKIP_CONNECTION |
ランダムにスキップ接続を削除 |
PRUNE_CONNECTIONS |
重要度が下位20%の接続を2切り |
GROW_CONNECTIONS |
短距離から順に10%の接続を追加 |
MODULE_DUPLICATION |
モジュール(染色体)を複製 |
MODULE_FUSION |
2モジュールを結合 |
ADAPTIVE_LAYER_SIZE |
アクティビティに応じてレイヤサイズを増減 |
RECURRENT_CONNECTION |
再帰接続を追加 |
update_mutation_rates(generation, fitness_improvements)
各変異タイプの成功率に応じて変異率を適応的に更新。
FitnessEvaluator — evospikenet.fitness_evaluator
from evospikenet.fitness_evaluator import FitnessEvaluator, TaskBenchmark
_evaluate_robustness(genome, brain) -> Dict[str, float]
v2.2.0 でプレースホルダーから実装に置き換え。
brain が提供されている場合:
ノイズ注入テスト (σ=0.2)
クリーン入力とノイズ入力の出力差分 → noise_resistance
障害許容性テスト
中間層の重みを10%マスク後に出力差分 → failure_tolerance
brain が None の場合:
構造的プロキシ
スキップ接続数 → noise_resistance (0.5 + skips * 0.05)
再帰接続数 → stability (0.6 + recurrent * 0.04)
層深度 → failure_tolerance (深すぎる/浅すぎると下がる)
戸り値: {"noise_resistance": float, "failure_tolerance": float, "stability": float}
rollback_to_snapshot() — evospikenet.rollback
from evospikenet.rollback import rollback_to_snapshot, rollback_version
rollback_to_snapshot(snapshot_id, manager, target_path) -> Path
SnapshotManager からスナップショットをIDにより検索し、target_path に復元する便利関数。
パラメータ:
- snapshot_id: str — SnapshotManager.create_snapshot() が返すスナップショットID
- manager: SnapshotManager — スナップショットメタデータを保持するインスタンス
- target_path: str | Path — 復元先パス
戸り値: 復元されたファイルの Path
例外: DataLoadError — IDが不明またはファイルが存在しない場合
使用例:
from evospikenet.rollback import rollback_to_snapshot
from evospikenet.snapshot_recovery import SnapshotManager
manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")
# スナップショット一覧を取得し最新を復元
snapshots = manager.list_snapshots()
if snapshots:
latest = sorted(snapshots, key=lambda x: x["timestamp"], reverse=True)[0]
restored = rollback_to_snapshot(
snapshot_id=latest["id"],
manager=manager,
target_path="/tmp/restored_state.gz",
)
print(f"復元完了: {restored}")
rollback_version(version_entry, target_path) -> Path
履歴エントリー辞書ファイルを直接指定して復元する低レベル関数。
パラメータ:
- version_entry: Dict — "path" キーを含む辞書
- target_path: str | Path — 復元先パス
データ型
Priority
優先度を表す列挙型。
LOW = "low"NORMAL = "normal"HIGH = "high"CRITICAL = "critical"
MessageType
メッセージタイプを表す列挙型。
NOTIFICATION = "notification"REQUEST = "request"RESPONSE = "response"EVENT = "event"
OptimizationType
最適化タイプを表す列挙型。
SPEED = "speed"MEMORY = "memory"ACCURACY = "accuracy"POWER = "power"
ArtifactType
アーティファクトタイプを表す列挙型。
MODEL = "model"LOG = "log"CONFIG = "config"DATA = "data"METRICS = "metrics"
使用例
基本的な使用例
<!-- TODO: update or remove - import faileNetAPIClient -->
# クライアント初期化
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
# テキスト生成
result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])
# マルチモーダル処理
response = client.submit_prompt(
prompt="この画像を説明してください",
image_path="image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])
アーティファクト管理
# アーティファクトアップロード
with open('model.pkl', 'rb') as f:
file_data = io.BytesIO(f.read())
result = client.upload_artifact(
session_id="session_123",
artifact_type="model",
name="trained_model",
file=file_data,
model_category="classification"
)
# アーティファクト一覧
artifacts = client.list_artifacts(artifact_type="model")
# アーティファクトダウンロード
client.download_artifact("artifact_id_123", "downloaded_model.pkl")
エラーハンドリング
<!-- モジュール 'evospikenet' が見つかりません。パッケージ内の移動/名前変更を確認してください -->
<!-t.generate("Test prompt")
except EvoSpikeNetAPIError as e:
print(f"API Error: {e.error_info.message}")
if e.error_info.retry_after:
print(f"Retry after: {e.error_info.retry_after}s")
Jupyter Notebookでの使用
<!-- TODO: update<!-- モジュール 'evospikenet' が見つかりません。パッケージ内の移動/名前変更を確認してください -->kenet.sdk_jupyter import Jup定
client.set_display_mode("html")
# サーバー情報表示
client.show_server_info()
# 統計表示
client.show_stats()
WebSocketを使用したリアルタイム通信
import asyncio
<!-- TODO: update or remove - impo<!-- モジュール 'evospikenet' が見つかりません。パッケージ内の移動/名前変更を確認してください -->WebSocketClient -->
async def main():
client = e_message(msg):
print(f"Received: {msg}")
client.register_handler("notification", handle_message)
# メッセージ送信
await client.send_message({
"type": "notification",
"data": "Hello from SDK!"
})
# リッスン開始
await client.listen()
asyncio.run(main())
ベストプラクティス
- 接続管理: 長時間実行するアプリケーションでは、セッションを再利用してください。
- エラーハンドリング: 常に適切な例外処理を実装してください。
- タイムアウト: ネットワーク条件に応じて適切なタイムアウトを設定してください。
- リトライ: 一時的なエラーに対しては自動リトライを使用してください。
- リソース管理: 大きなファイルを扱う場合は、ストリーミングを使用してください。
- 認証: APIキーは環境変数で管理してください。
- 監視: 定期的にクライアント統計を確認してください。
トラブルシューティング
一般的な問題
- 接続エラー: APIサーバーが起動しているか確認してください。
- 認証エラー: APIキーが正しく設定されているか確認してください。
- タイムアウト: ネットワーク条件やサーバー負荷を確認してください。
- メモリエラー: 大きなファイルを扱う場合は、ストリーミングを使用してください。
デバッグ情報
# クライアント統計の確認
stats = client.get_stats()
print(f"Requests: {stats['requests']}")
print(f"Errors: {stats['errors']}")
print(f"Average latency: {stats['average_latency']:.3f}s")
# サーバー情報の確認
info = client.get_server_info()
if info:
print(f"Server version: {info.get('version')}")
print(f"Loaded models: {info.get('models', [])}")
分散coordinatorメソッド
init_coordinator(node_id: str, zenoh_config: Optional[Dict[str, Any]] = None, raft_config: Optional[Dict[str, Any]] = None) -> None
分散coordinatorを初期化します。
パラメータ:
- node_id: このノードの一意な識別子
- zenoh_config: Zenoh DDS設定(オプション)
- raft_config: Raftコンセンサス設定(オプション)
例:
# 基本的な初期化
client.init_coordinator("node_1")
# カスタム設定での初期化
zenoh_config = {"connect": ["tcp/127.0.0.1:7447"]}
raft_config = {"election_timeout": 5000}
client.init_coordinator("node_1", zenoh_config, raft_config)
start_coordinator() -> None
分散coordinatorを開始します。
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
client.init_coordinator("node_1")
client.start_coordinator()
print("Coordinator started")
stop_coordinator() -> None
分散coordinatorを停止します。
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
client.stop_coordinator()
print("Coordinator stopped")
submit_coordination_task(task_type: str, payload: Dict[str, Any]) -> str
分散coordinationタスクを送信します。
パラメータ:
- task_type: タスクタイプ
- payload: タスクペイロードデータ
戻り値: タスクID
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
task_id = client.submit_coordination_task(
"federated_learning",
{"model": "resnet50", "dataset": "cifar10"}
)
print(f"Task submitted: {task_id}")
内部タスク実装メモ(SDK組み込み簡易版)
federated_learning:payload['updates']の数値フィールドを平均し、aggregated_parametersとaggregation_metaを付与。distributed_inference:payload['inputs']/payload['batches']をそのまま結果化し、各エントリにnode_idとstatus=completedを付加。model_aggregation:payload['models']のweightsリストを平均し、aggregated_model['weights']とaggregation_metaを付与。
get_coordination_task_status(task_id: str) -> Optional[Dict[str, Any]]
coordinationタスクの状態を取得します。
パラメータ:
- task_id: 確認するタスクID
戻り値: タスク状態情報、または見つからない場合はNone
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
status = client.get_coordination_task_status(task_id)
if status:
print(f"Task status: {status['status']}")
get_cluster_status() -> Dict[str, Any]
現在のクラスタ状態を取得します。
戻り値: クラスタ状態情報
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
status = client.get_cluster_status()
print(f"Active nodes: {status['active_nodes']}")
print(f"Leader: {status['leader_id']}")
register_coordination_node(node: Union[NodeInfo, str], node_info: Optional[Dict[str, Any]] = None) -> bool
coordinationクラスタに新しいノードを登録します。
パラメータ:
- node: NodeInfo もしくはノードID文字列
- node_info: node に文字列を渡す場合は必須。address, port, role, capabilities などを含む辞書。
戻り値: 登録が成功した場合はTrue
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
# NodeInfoを直接渡す
from datetime import datetime
node_info = NodeInfo(
node_id="node_2",
a=datetime.now(),
capabilities=["gpu", "cpu"],
)
success = client.register_coordination_node(node_info)
# ID + dict で渡す場合
success = client.register_coordination_node(
"node_3",
{"address": "192.168.1.101", "port": 9001, "role": "follower", "capabilities": ["cpu"]},
)
unregister_coordination_node(node_id: str) -> bool
coordinationクラスタからノードを解除します。
パラメータ:
- node_id: 解除するノードID
戻り値: 解除が成功した場合はTrue
例外:
- RuntimeError: coordinatorが初期化されていない場合
例:
success = client.unregister_coordination_node("node_2")
戻り値の構造詳細
generate() の戻り値
{
"generated_text": "生成されたテキスト",
"tokens": 150,
"latency_ms": 250
}
submit_prompt() の戻り値
{
"prompt_id": "prompt_12345",
"status": "submitted",
"timestamp": "2026-02-17T10:30:45Z",
"processing_nodes": {
"visual": "Vision-1",
"spatial": "Spatial-12", # Feature 13: Rank 12 (Where pathway)
"integration": "Spatial-14" # Feature 13: Rank 14 (Integration)
}
}
poll_for_result() の戻り値
{
"prompt_id": "prompt_12345",
"status": "completed",
"response": "応答テキスト",
"spatial_analysis": {
"rank_12_where": {
"positions": [[100, 200], [150, 250]],
"depth": [0.5, 0.7],
"latency_ms": 47
},
"rank_14_integration": {
"what_where_fusion": "結果",
"world_model": "..."
}
},
"processing_time_ms": 150
}
get_simulation_result() の戻り値
{
"response": "応答内容",
"nodes_involved": [
"Spatial-12", "Spatial-13", "Spatial-14", "Spatial-15"
],
"timestamp": "2026-02-17T10:31:00Z"
}
batch_generate() の戻り値
[
{
"generated_text": "テキスト1",
"tokens": 100,
"latency_ms": 200
},
{
"generated_text": "テキスト2",
"tokens": 150,
"latency_ms": 250
},
{
"error": "エラーメッセージ",
"prompt": "失敗したプロンプト"
}
]
Feature 13(空間認知・生成システム)対応
EvoSpikeNet SDK は、Rank 12-15の空間処理ノード(Feature 13)に対応しています。
| ノード | Rank | 処理対象 | 遅延目標 |
|---|---|---|---|
| SpatialWhereNode | 12 | 空間位置・奥行き推定 | <50ms |
| SpatialWhatNode | 13 | 物体認識・シーン生成 | <30ms |
| SpatialIntegrationNode | 14 | What-Where統合・世界モデル | <50ms |
| SpatialAttentionControlNode | 15 | サッカード計画・注意制御 | <30ms |
空間処理の例
# マルチモーダル入力で空間処理を実行
response = client.submit_prompt(
prompt="画像内の物体の位置を説明してください",
image_path="./sample_image.jpg"
)
result = client.poll_for_result(timeout=60)
# Rank 12-15の処理結果を取得
if result and 'spatial_analysis' in result:
where_result = result['spatial_analysis'].get('rank_12_where')
integration_result = result['spatial_analysis'].get('rank_14_integration')
print(f"Detected positions: {where_result['positions']}")
print(f"Integrated analysis: {integration_result}")
空間生成サービス(ニューラル版)
- エンドポイント:
POST /generate - 入力: JSON
input_text(str, <=4000 chars, required): 生成したいシーンの自然言語記述cognitive_context(object, optional): 注意/ストレス/エンゲージメント等の認知状態ヒントeeg_data(object, optional):signals配列で EEG 生データを渡すと LIF 解析を適用model_version(str, optional): サーバに保存された高精度モデルのバージョン。存在する場合はモデルのリロードが試みられ、high_precision出力フラグが更新される。
- バリデーション: 型チェックと最大長チェック(4000文字)。不正入力は
400 Bad Request。 - 出力: シーン仕様
scene_type,spatial_coordinates,spatial_layout,objects,dimensions,physics,lighting,navigationhigh_precision(bool): 現在高精度モデルがロードされているかmodel_version(str): 現在アクティブなモデルバージョンmetadata:confidence,processing_time,quantum_modulation_alpha,cognitive_entropy,encoding_norm,input_text,quality,quality_factor
ヘルスチェック
(追加情報)
- model_version: 現在ロードされている空間生成モデルのバージョン。
- high_precision: 上記モデルが高精度モードかどうか。
- エンドポイント:
GET /health - 主要フィールド:
status:running|error|degradeddevice:cpu|cudaneural_components_available: 全コンポーネント初期化に成功したかcomponents: language_adapter / encoder_decoder / attention / plasticity / quantum / embodied_physics / lif_layer / izhikevich_layer の可用性ブールmetrics: request_count, error_rate, avg_latency_ms, uptime_secondsknowledge_base_loaded: シーン/オブジェクトKB読込済みかlast_spike_cached: 直近のスパイク列がキャッシュされているか
サンプル(直接HTTP)
curl -X POST http://localhost:8000/generate \
-H "Content-Type: application/json" \
-d '{
"input_text": "Calm indoor room with a table and soft light",
"cognitive_context": {"attention_level": 0.6, "stress_level": 0.2}
}'
curl http://localhost:8000/health | jq
SDK ラッパー
from evospikenet.sdk import EvoSpikeNetAPIClient
# SDKを使った空間生成の例
client = EvoSpikeNetAPIClient(spatial_base_url="http://localhost:8000")
scene = client.spatial_generate(
"Calm indoor room with a table and soft warm lighting",
cognitive_context={"attention_level": 0.6, "stress_level": 0.2},
)
health = client.spatial_health()
print(scene.get("metadata", {}).get("confidence"))
量子補助推論
- エンドポイント:
POST /quantum/infer - 入力: JSON
scene(object):/generateで返されたシーンオブジェクトspike_trains(tensor/list): 強化に用いるスパイク列(省略可)
- 出力: 与えられた
sceneにmetadata.quantum_modulation_alphaやcognitive_entropyを追加した拡張シーン
RAG システム(バージョン付きインデックス)
- ベースURL例:
http://localhost:8001 - エンドポイント:
POST /upload_file: ファイルを検証→解析→オートチャンク→埋め込み→Milvus+Elasticsearchに格納。レスポンスにdoc_keyとversionが含まれる。POST /query: {"query": "...", "llm_type": "huggingface"|"evosdk"} で検索・生成。contextにヒット文書を返し、sdk_responseを含む場合あり。
- サポート拡張子(主要):
.pdf,.docx,.doc,.pptx,.ppt,.xlsx,.xls,.txt,.md,.markdown,.gdoc,.html - サンプル(docs/README.md を投入し検索):
export RAG_API_URL=${RAG_API_URL:-http://localhost:8001}
curl -X POST "$RAG_API_URL/upload_file" \
-F "file=@docs/README.md" | jq
curl -X POST "$RAG_API_URL/query" \
-H "Content-Type: application/json" \
-d '{"query": "What does this project do?", "llm_type": "huggingface"}' | jq
SDK ラッパー
from evospikenet.sdk import EvoSpikeNetAPIClient
# RAG用SDKラッパーの例
client = EvoSpikeNetAPIClient(rag_base_url="http://localhost:8001")
# ファイル投入
upload = client.rag_upload_file("docs/README.md")
print(upload.get("doc_key"))
# クエリ実行(SDKのrag_queryを利用)
answer = client.rag_query("What does this project do?", llm_type="huggingface")
if answer:
ctx = answer.get("context", [])
if ctx:
print(ctx[0].get("text"))
サンプルスクリプト:
- examples/spatial_generation_client.py — /generate と /health を叩く簡易クライアント。
- examples/rag_ingest_and_query.py — /upload_file でインデックス後に /query で検索。
- examples/rag_markdown_sdk.py — Markdown ファイルを SDK 経由で投入・検索。
エラーハンドリング
SDK はEvoSpikeNetAPIError例外でエラーを処理します。
from evospikenet.sdk import EvoSpikeNetAPIClient, EvoSpikeNetAPIError
client = EvoSpikeNetAPIClient()
try:
result = client.generate("Hello, world!")
except EvoSpikeNetAPIError as e:
print(f"Error type: {e.error_info.error_type}")
print(f"Details: {e.error_info.details}")
if e.error_info.retry_after:
print(f"Retry after {e.error_info.retry_after} seconds")
実装ガイド
推奨実装パターン
1. 同期処理(シンプルなスクリプト)
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient()
client.submit_prompt(prompt="テキスト生成クエリ")
# ポーリングで結果取得
result = client.poll_for_result(timeout=60, interval=5)
if result:
print(f"Response: {result.get('response')}")
ケンシャルなリクエスト)
prompts = ["クエリ1", "クエリ2", "クエリ3"]
results = client.batch_generate(prompts, max_length=100)
for i, result in enumerate(results):
if 'error' not in result:
print(f"Result {i}: {result.get('generated_text')}")
利点: 複数リクエストを簡潔に処理
用途: テストデータ生成、ベースライン実験、データセット準備
3. 非同期処理(高スループット)
詳細は async_spatial_processing_example.py を参照してください。
import asyncio
from evospikenet.sdk import EvoSpikeNetAPIClient
async def run_async_examples():
client = EvoSpikeNetAPIClient()
tasks = [
client.submit_prompt_async("クエリ1"),
client.submit_prompt_async("クエリ2"),
client.submit_prompt_async("クエリ3"),
]
responses = await asyncio.gather(*tasks)
print(responses)
asyncio.run(run_async_examples())
利点: 高スループット、リソース効率的
用途: 大規模バッチ処理、実運用システム、マイクロサービス
サンプルコード
基本: Feature 13 空間処理
# 同期サンプル(推奨: スクリプト・実験)
python examples/spatial_processing_example.py
# 非同期サンプル(推奨: 本番・高スループット)
python examples/async_spatial_processing_example.py
詳細機能:
- 同期サンプル spatial_processing_example.py:
- ✅ テキスト生成と空間分析
- ✅ マルチモーダル入力(画像+テキスト)
- ✅ バッチ処理
- ✅ ヘルスチェック
- 非同期サンプル
async_spatial_processing_example.py: - ✅ 同時実行タスク
- ✅ レート制限付き送信
- ✅ 非同期ヘルスチェック
- ✅ エラーハンドリング
ベストプラクティス
1. タイムアウト管理
# 推奨: 適切なタイムアウトを設定
result = client.poll_for_result(
timeout=120, # 2分
interval=5 # 5秒ごとにポーリング
)
2. エラー処理
try:
result = client.submit_prompt(prompt="クエリ")
except EvoSpikeNetAPIError as e:
if e.error_info.error_type in ["timeout", "server_error"]:
# リトライ可能なエラー
pass
else:
# リトライ不可なエラー(入力エラー等)
pass
3. 並行処理のスケーリング
# 小規模 (< 10リクエスト): 同期処理
for prompt in small_list:
result = client.generate(prompt)
# 大規模 (> 100リクエスト): 非同期処理
import asyncio
tasks = [client.generate_async(p) for p in large_list]
results = asyncio.run(asyncio.gather(*tasks))
4. リソース効率
# ✅ 良い例: コンテキストマネージャ
with client.create_session() as session:
for prompt in prompts:
result = client.submit_prompt(prompt)
# ❌ 避けるべき: 毎回新規接続
for prompt in prompts:
new_client = EvoSpikeNetAPIClient() # 非効率
仕様と制限
API制限
| 項目 | 制限値 |
|---|---|
| 最大プロンプト長 | 10,000文字 |
| 最大生成長 | 5,000文字 |
| タイムアウト | 300秒(5分) |
| 最大バッチサイズ | 100 |
| 同時接続数 | 1,000 |
| QPS(クエリ/秒) | 1,000 |
性能指標(Feature 13)
| メトリクス | 目標値 |
|---|---|
| テキスト生成 | < 500ms |
| 空間分析 | < 200ms (Rank 12-15) |
| バッチ処理 | 100/秒 (100リクエスト/秒) |
| 可用性 | 99.9% |
| P99レイテンシ | 1,000ms |
Feature 36: 自動リカバリーメソッド
概要
AutoRecoveryEngine は AI ベースの異常検知とプレイブック自動実行によってシステムを自律回復させます。目標 MTTR -80%。
APIエンドポイント一覧
| メソッド | パス | 説明 |
|---|---|---|
| GET | /api/recovery/status |
リカバリーエンジンのステータス取得 |
| GET | /api/recovery/incidents |
インシデント一覧取得 |
| GET | /api/recovery/incidents/{id} |
特定インシデント取得 |
| POST | /api/recovery/incidents/{id}/acknowledge |
インシデントを確認済みに変更 |
| POST | /api/recovery/incidents/{id}/resolve |
インシデントを解決済みに変更 |
| POST | /api/recovery/trigger |
異常診断を手動トリガー |
get_recovery_status()
import requests
resp = requests.get(
"http://localhost:8000/api/recovery/status",
headers={"X-API-Key": API_KEY}
)
status = resp.json()
# {
# "total_incidents": 5,
# "open_incidents": 1,
# "mttr_seconds": 120.4,
# "monitoring_interval_seconds": 30,
# }
list_incidents(status, severity, limit)
params = {"status": "open", "severity": "critical", "limit": 20}
resp = requests.get(
"http://localhost:8000/api/recovery/incidents",
headers={"X-API-Key": API_KEY},
params=params,
)
incidents = resp.json()["incidents"]
| パラメータ | 型 | 説明 |
|---|---|---|
status |
str | open / acknowledged / resolved / auto_resolved |
severity |
str | low / medium / high / critical |
limit |
int | 最大取得件数(デフォルト: 50) |
trigger_diagnosis(metrics)
payload = {
"cpu_percent": 95.0,
"memory_percent": 80.0,
"db_connected": False,
"error_rate": 0.15,
}
resp = requests.post(
"http://localhost:8000/api/recovery/trigger",
headers={"X-API-Key": API_KEY},
json=payload,
)
result = resp.json()
# { "status": "incident_created", "incident": { "id": "...", ... } }
# または
# { "status": "no_anomaly_detected" }
完全なサンプルコード
→ auto_recovery_sdk.py
Feature 39: 監査ログメソッド
概要
AuditLogManager は SHA-256 ハッシュチェーンによる改ざん検知機能付き監査ログを提供します。全 HTTP リクエストは自動記録されます。
APIエンドポイント一覧
| メソッド | パス | 説明 |
|---|---|---|
| GET | /api/audit/stats |
統計情報取得 |
| GET | /api/audit/logs |
ログ検索 |
| POST | /api/audit/log |
エントリ手動書き込み |
| GET | /api/audit/verify |
ハッシュチェーン検証 |
| GET | /api/audit/export |
ログエクスポート (JSON/CSV) |
get_audit_stats()
resp = requests.get(
"http://localhost:8000/api/audit/stats",
headers={"X-API-Key": API_KEY}
)
stats = resp.json()
query_audit_logs(actor, action, resource, result, since, until, limit)
params = {
"actor": "admin_user",
"action": "model.train",
"result": "success",
"limit": 100,
}
resp = requests.get(
"http://localhost:8000/api/audit/logs",
headers={"X-API-Key": API_KEY},
params=params,
)
entries = resp.json()["entries"]
| パラメータ | 型 | 説明 |
|---|---|---|
actor |
str | 操作者でフィルタ |
action |
str | アクション前方一致フィルタ |
result |
str | success / failure / error |
since / until |
str | 期間 ISO8601 |
limit |
int | デフォルト: 100 |
write_audit_entry(action, actor, resource, result, detail)
resp = requests.post(
"http://localhost:8000/api/audit/log",
headers={"X-API-Key": API_KEY},
json={
"action": "model.deploy",
"actor": "ci_pipeline",
"resource": "/api/models/genome_v3",
"result": "success",
"detail": {"version": "3.2.1"},
},
)
# { "status": "logged", "entry_id": "...", "entry_hash": "sha256:..." }
verify_chain()
resp = requests.get(
"http://localhost:8000/api/audit/verify",
headers={"X-API-Key": API_KEY}
)
# { "valid": true, "checked": 1500, "error": null }
export_logs(format)
# JSON形式
resp = requests.get("http://localhost:8000/api/audit/export",
headers={"X-API-Key": API_KEY}, params={"format": "json"})
logs_json = resp.json()
# CSV形式
resp = requests.get("http://localhost:8000/api/audit/export",
headers={"X-API-Key": API_KEY}, params={"format": "csv"})
csv_text = resp.text
完全なサンプルコード
→ audit_log_sdk.py
Feature 40: 地理的分散ノード管理メソッド
Note: 部分実装モジュールあり。APIは利用可能だが テスト&ドキュメントのリファインを継続中。
概要
GeoNodeManager はマルチリージョンのノード管理、自動フェイルオーバー、リージョン間レイテンシー計測を提供します。
APIエンドポイント一覧
| メソッド | パス | 説明 |
|---|---|---|
| GET | /api/geo/status |
全体ステータス取得 |
| GET/POST | /api/geo/regions |
リージョン一覧/登録 |
| GET/DELETE | /api/geo/regions/{id} |
リージョン詳細/削除 |
| GET/POST | /api/geo/nodes |
ノード一覧/登録 |
| DELETE | /api/geo/nodes/{id} |
ノード削除 |
| POST | /api/geo/failover |
フェイルオーバー実行 |
| GET | /api/geo/failover/history |
フェイルオーバー履歴 |
| GET | /api/geo/latency-matrix |
レイテンシー行列 |
| GET/PUT | /api/geo/active-region |
アクティブリージョン取得/変更 |
| GET | /api/geo/replication-group/{id} |
レプリケーショングループ |
register_node(node_id, region_id, endpoint, node_type)
resp = requests.post(
"http://localhost:8000/api/geo/nodes",
headers={"X-API-Key": API_KEY},
json={
"node_id": "prod-gpu-node-01",
"region_id": "ap-northeast-1",
"endpoint": "10.0.1.100:8000",
"node_type": "gpu",
},
)
# { "status": "registered", "node_id": "prod-gpu-node-01" }
trigger_failover(from_region, to_region, reason, triggered_by)
resp = requests.post(
"http://localhost:8000/api/geo/failover",
headers={"X-API-Key": API_KEY},
json={
"from_region": "ap-northeast-1",
"to_region": "us-east-1",
"reason": "region_unavailable",
"triggered_by": "monitoring_system",
},
)
# { "status": "failover_executed", "event": { ... } }
get_latency_matrix()
resp = requests.get(
"http://localhost:8000/api/geo/latency-matrix",
headers={"X-API-Key": API_KEY}
)
matrix = resp.json()["matrix"]
# matrix["ap-northeast-1"]["us-east-1"] == 145.3 (ms)
GeoRegion構造
{
"region_id": "ap-northeast-1",
"display_name": "Asia Pacific (Tokyo)",
"provider": "aws",
"latitude": 35.6762,
"longitude": 139.6503,
"priority": 1,
"status": "online",
"node_count": 4
}
性能指標(Feature 36/39/40)
| メトリクス | 目標値 |
|---|---|
| 監査ログ書き込みスループット | ≥ 1,000 エントリ/秒 |
| ハッシュチェーン検証 (1,000 件) | < 5,000 ms |
| 異常検知レイテンシ (p99) | < 10 ms |
| レイテンシー行列計算 (50 リージョン) | < 500 ms |
| インシデント作成レイテンシ | < 100 ms |
| 自動フェイルオーバー完了時間 | < 30 秒 |
完全なサンプルコード
→ geo_node_manager_sdk.py
生物模倣統合 API(BrainSimulationFramework)
更新日: 2026-03-06
Phase A/B 全 11 項目完了。詳細評価:docs-dev/biomimetic_integration_evaluation.mdv2.0 (8.7/10)
BrainSimulationFramework
evospikenet.brain_simulation モジュールの最上位統合クラス。biomimetic/ 全モジュール(神経調節・STDP・睡眠・回路・皮質トポロジー・DMN等)を SNN コアと結合する。
コンストラクタ
BrainSimulationFramework(
enable_biomimetic: bool = False,
config: Optional[BrainSimulationConfig] = None,
)
パラメータ:
- enable_biomimetic: True にすると biomimetic/ 全モジュールを初期化してフルループ統合を有効化。
- config: シミュレーション設定オブジェクト(省略時はデフォルト)。
メソッド
run_simulation(duration: int = 1000) -> Dict[str, Any]
生物模倣 6 フェーズパイプラインを実行します。
フェーズ:
1. DevelopmentalSchedule.plasticity_multiplier(t) — 発達スケジュール
2. NeuralCircuitModeler.simulate_timestep() — 回路シミュレーション(Izhikevich 対応)
3. STDP × NeuromodulatorGate.gated_learning_rate() — 可塑性変調
4. NodeEnergyBudget.energy_fitness_term() — エネルギーホメオスタシス
5. HippocampalBuffer.store(episode) — エピソード記憶記録
6. SleepConsolidation.offline_consolidation() — 睡眠統合リプレイ
戻り値: {phase_results, biomimetic_status, duration_ms} を含む辞書
run_idle_phase(duration_s: float = 10.0) -> Dict[str, Any]
DMN(デフォルトモードネットワーク)アイドルサイクルを非同期実行します。
import asyncio
activities = asyncio.run(framework.run_idle_phase(duration_s=10.0))
戻り値: DMN 活動ログと各時間ステップのスパイク記録の辞書
biomimetic_status() -> Dict[str, Any]
全生物模倣モジュールの状態スナップショットを返します。
status = framework.biomimetic_status()
# {
# "stdp_connected_gate": True,
# "sleep_consolidation_replay": True,
# "izhikevich_circuits": 1,
# "cortical_columns_registered": 0,
# "neuromodulator_registry_linked": True,
# "efference_copy_adaptive": True,
# "mirror_neuron_default_classifier": True,
# "dmn_idle_phase_available": True,
# }
NeuralCircuitModeler(Izhikevich バックエンド)
コンストラクタ
NeuralCircuitModeler(
config: NeuralCircuitConfig,
neuron_type: str = "lif", # "lif" | "izhikevich"
)
パラメータ:
- neuron_type: "izhikevich" を指定すると IzhikevichNeuron.step() バックエンドを使用。RS/IB/CH/FS/LTS 等の多様な発火パターンに対応。
simulate_timestep(input_current, t) -> Tuple[np.ndarray, np.ndarray]
1 タイムステップ分 (dt=1ms) のシミュレーションを実行。
戻り値: (spike_array, membrane_voltages)
BrainRegionIntegrator
皮質トポロジーと脳領域の統合管理クラス。
add_cortical_topology(generator, nx_cols, ny_cols) -> int
CorticalTopologyGenerator が生成したカラム配置を BrainRegionConfig として一括登録し、隣接カラム(距離 ≤ √2 mm)間に小世界的接続を設定します。
from evospikenet.biomimetic import CorticalTopologyGenerator
from evospikenet.brain_simulation import BrainRegionIntegrator
gen = CorticalTopologyGenerator()
integrator = BrainRegionIntegrator()
n = integrator.add_cortical_topology(gen, nx_cols=4, ny_cols=4)
print(n) # 16
パラメータ:
- generator: CorticalTopologyGenerator インスタンス
- nx_cols: X 方向カラム数
- ny_cols: Y 方向カラム数
戻り値: 登録されたカラム数 (int)
STDP — 生物模倣拡張メソッド
STDP.with_neuromodulation(gate) -> STDP(クラスメソッド)
ファクトリメソッド。NeuromodulatorGate を注入した STDP インスタンスを生成します。
gate = NeuromodulatorGate()
stdp = STDP.with_neuromodulation(gate)
STDP.connect_plasticity_gate(gate) -> None
既存の STDP インスタンスに NeuromodulatorGate を後付け接続します。
stdp = STDP()
stdp.connect_plasticity_gate(gate)
SleepConsolidation — 拡張メソッド
offline_consolidation(episodes, stdp) -> Dict[str, Any]
エピソードリストを STDP リプレイ学習で統合します。
stats = sleep.offline_consolidation(episodes=buffer.replay(), stdp=stdp)
# stats: {replayed_episodes, weight_updates_mean, replay_duration_ms}
戻り値 (stats): replayed_episodes, weight_updates_mean, replay_duration_ms を含む辞書
NeuromodulatorGate — Registry 連携メソッド
connect_to_registry(registry) -> None
NeuromodulatorRegistry と双方向ブリッジを確立します。
push_to_registry() -> None
Gate の現在状態(DA/ACh/OT レベル等)を Registry に書き込みます。
pull_from_registry() -> None
Registry の値を Gate state に反映します。
gate = NeuromodulatorGate()
registry = NeuromodulatorRegistry()
gate.connect_to_registry(registry)
gate.push_to_registry()
gate.pull_from_registry()
EfferenceCopy — 適応ゲインメソッド
adaptive_gain_update(prediction_error: float) -> float
予測誤差に基づいてゲインを適応更新します。
reset() -> None
ゲイン・内部状態を初期値にリセットします。
efference = EfferenceCopy()
gain = efference.adaptive_gain_update(prediction_error=0.3)
efference.reset()
完全なサンプルコード
→ sdk_distributed_brain.py(demonstrate_biomimetic_brain_simulation() 関数を参照)
テスト一覧
| テストファイル | 対象 |
|---|---|
tests/unit/test_biomimetic_init_api.py |
__init__.py 全シンボル |
tests/unit/test_stdp_neuromodulation.py |
STDP ↔ Gate 配線 |
tests/unit/test_sleep_consolidation_stdp.py |
offline_consolidation() + stats |
tests/unit/test_efference_copy_adaptive.py |
適応ゲイン・reset() |
tests/unit/test_mirror_neurons_default_classify.py |
_default_classify() |
tests/integration/test_brain_simulation_biomimetic.py |
BrainSimulationFramework フル統合 |
tests/integration/test_dmn_idle_phase.py |
run_idle_phase() / DMN 停止確認 |
# Docker で生物模倣テスト一括実行
docker compose -f docker-compose.test.yml --profile biomimetic run --rm biomimetic-test
生物模倣 REST API エンドポイント
/biomimetic/* 配下のエンドポイントは HTTP 経由で生物模倣モジュール全体を操作します。
requests ライブラリまたは EvoSpikeNetAPIClient._make_request() で呼び出せます。
import requests
BASE_URL = "http://localhost:8000"
API_KEY = "your-api-key" # 不要な場合は空文字列でも可
HEADERS = {"X-API-Key": API_KEY} if API_KEY else {}
エンドポイント一覧
| メソッド | パス | 説明 |
|---|---|---|
| GET | /biomimetic/status |
全モジュール状態スナップショット |
| POST | /biomimetic/simulate |
フル脳シミュレーション実行 |
| POST | /biomimetic/neuromod/update |
神経調節物質レベル更新 |
| POST | /biomimetic/reward |
TD 報酬シグナル送信 |
| POST | /biomimetic/sleep/consolidate |
オフライン記憶固定(リプレイ) |
| POST | /biomimetic/sleep/wake-config |
睡眠覚醒サイクル設定 |
| POST | /biomimetic/framework/reset |
フレームワーク再初期化 |
GET /biomimetic/status
全生物模倣モジュールの現在状態を返します。
resp = requests.get(f"{BASE_URL}/biomimetic/status", headers=HEADERS)
data = resp.json()
# {
# "biomimetic_enabled": true,
# "neuromod_levels": {"dopamine": 0.5, "noradrenaline": 0.4, ...},
# "energy_budget": {"current_w": 9.2, "budget_w": 10.0},
# "sleep_stats": {"cycles_completed": 3, "replayed_events": 128},
# }
POST /biomimetic/simulate
NeuralCircuitConfig の全パラメータを指定してシミュレーションを実行します。
payload = {
# NeuralCircuitConfig
"num_neurons": 1000,
"connection_probability": 0.1,
"excitatory_ratio": 0.8,
"inhibitory_ratio": 0.2,
"refractory_period": 5,
"membrane_time_constant": 20.0,
# BrainSimulationFramework
"enable_biomimetic": True,
"development_epoch": 0,
"total_epochs": 1000,
"energy_budget_w": 10.0,
# シミュレーション制御
"duration": 1000,
"plasticity_rule": "stdp", # "stdp" | "bcm" | "oja"
"plasticity_interval": 10,
"sleep_every": 500,
"sleep_cycles": 3,
"neuron_type": "lif", # "lif" | "izhikevich"
}
resp = requests.post(f"{BASE_URL}/biomimetic/simulate", headers=HEADERS, json=payload)
data = resp.json()
# {
# "status": "ok",
# "duration_ms": 3842,
# "spikes_total": 18423,
# "biomimetic_status": { ... }
# }
| パラメータ | 型 | デフォルト | 説明 |
|---|---|---|---|
num_neurons |
int | 1000 | 回路ニューロン数 |
connection_probability |
float | 0.1 | 結合確率 |
excitatory_ratio |
float | 0.8 | 興奮性ニューロン比率 |
inhibitory_ratio |
float | 0.2 | 抑制性ニューロン比率 |
refractory_period |
int | 5 | 不応期 (ms) |
membrane_time_constant |
float | 20.0 | 膜時定数 (ms) |
enable_biomimetic |
bool | true | 生物模倣モジュールを有効化 |
development_epoch |
int | 0 | 現在の developmental epoch |
total_epochs |
int | 1000 | 総エポック数 |
energy_budget_w |
float | 10.0 | ノードエネルギー上限 (W) |
duration |
int | 1000 | シミュレーション長 (ms) |
plasticity_rule |
str | "stdp" | 可塑性ルール |
plasticity_interval |
int | 10 | 可塑性更新間隔 (steps) |
sleep_every |
int | 500 | 睡眠フェーズ開始タイミング (steps) |
sleep_cycles |
int | 3 | 睡眠サイクル数 |
neuron_type |
str | "lif" | ニューロンモデル種別 |
POST /biomimetic/neuromod/update
ドーパミン等の神経調節物質レベルを即時更新します。
payload = {
"dopamine": 0.8, # 報酬・動機付け (0.0–1.0)
"noradrenaline": 0.4, # 覚醒・注意
"acetylcholine": 0.6, # 記憶・学習
"serotonin": 0.5, # 気分・衝動制御
"oxytocin": 0.3, # 社会的絆
"emotion_factor": 0.7, # 全般的感情強度
"motivation_factor": 0.9, # 行動への動機付け
}
resp = requests.post(f"{BASE_URL}/biomimetic/neuromod/update", headers=HEADERS, json=payload)
data = resp.json()
# {
# "message": "Neuromodulator levels updated",
# "updated_levels": {"dopamine": 0.8, "serotonin": 0.5, ...}
# }
| パラメータ | 型 | 説明 |
|---|---|---|
dopamine |
float | ドーパミン値 (0.0–1.0) |
noradrenaline |
float | ノルアドレナリン値 |
acetylcholine |
float | アセチルコリン値 |
serotonin |
float | セロトニン値 |
oxytocin |
float | オキシトシン値 |
emotion_factor |
float | 感情強度係数 |
motivation_factor |
float | 動機付け係数 |
POST /biomimetic/reward
TD 報酬シグナルを送信してドーパミン系モジュールを更新します。
payload = {
"reward": 1.0, # 報酬値 (-1.0 ~ 1.0)
"td_error": 0.35, # 時間差分誤差
"update_neuromod": True, # ドーパミン系を即時更新するか
}
resp = requests.post(f"{BASE_URL}/biomimetic/reward", headers=HEADERS, json=payload)
data = resp.json()
# {
# "status": "ok",
# "dopamine_level": 0.73,
# }
POST /biomimetic/sleep/consolidate
海馬バッファに蓄積された記憶トレースをオフラインでリプレイし STDP を適用します。
payload = {
"sleep_cycles": 3, # 実行するスリープサイクル数
"replay_buffer_size": 256, # リプレイバッファサイズ (スパイク数)
"stdp_lr": 0.01, # STDP 学習率 (consolidation 中)
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/consolidate", headers=HEADERS, json=payload)
data = resp.json()
# {
# "status": "ok",
# "cycles_completed": 3,
# "replayed_events": 128,
# }
POST /biomimetic/sleep/wake-config
SleepWakeCycleController のスケジュールパラメータを更新します。
payload = {
"awake_phase_steps": 500, # 覚醒フェーズのステップ数
"sleep_phase_steps": 100, # 睡眠フェーズのステップ数
"sleep_cycles_per_epoch": 3, # エポックごとのサイクル数
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/wake-config", headers=HEADERS, json=payload)
data = resp.json()
# {
# "status": "ok",
# "config_applied": {"awake_phase_steps": 500, ...}
# }
POST /biomimetic/framework/reset
BrainSimulationFramework を再初期化し、全モジュールの状態をリセットします。
payload = {
"enable_biomimetic": True, # リセット後もバイオミメティクスを有効にするか
}
resp = requests.post(f"{BASE_URL}/biomimetic/framework/reset", headers=HEADERS, json=payload)
data = resp.json()
# {
# "status": "ok",
# "message": "BrainSimulationFramework reinitialized",
# }
EvoSpikeNetAPIClient 経由での呼び出し
専用メソッドがない biomimetic エンドポイントは _make_request() で呼び出せます。
SDK セッション機能(自動リトライ、ヘッダー管理)が利用できます。
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000", api_key=API_KEY)
# GET /biomimetic/status
status = client._make_request("GET", f"{client.base_url}/biomimetic/status")
# POST /biomimetic/neuromod/update
result = client._make_request(
"POST",
f"{client.base_url}/biomimetic/neuromod/update",
json={"dopamine": 0.8, "serotonin": 0.5},
)
完全なサンプルコード
→ sdk_biomimetic_rest_api.py
# 全エンドポイントをパイプラインで実行
python examples/sdk/sdk_biomimetic_rest_api.py --base-url http://localhost:8000
# 個別デモ実行
python examples/sdk/sdk_biomimetic_rest_api.py --demo simulate
python examples/sdk/sdk_biomimetic_rest_api.py --demo neuromod
python examples/sdk/sdk_biomimetic_rest_api.py --demo sleep
Phase E-3 コネクトーム本番化・自動同期 API
最終更新日: 2026-03-19(Phase E-3 全完了)
sync_connectome — 自動同期パイプライン(E-3-1)
apply_delta(base_path, delta_path, result_path)
差分 JSON を既存 NPZ に適用し、新しい NPZ をアトミックに生成する。
from scripts.sync_connectome import apply_delta
apply_delta(
base_path="data/connectome/cache/visual.npz",
delta_path="data/connectome/delta_v42.json",
result_path="data/connectome/cache/visual_v43.npz",
)
差分 JSON スキーマ:
{
"schema_version": "1.0",
"materialization_version_from": 42,
"materialization_version_to": 43,
"added": [
{"pre_id": 100, "post_id": 200, "weight": 0.15, "delay_ms": 1.5}
],
"removed": [
{"pre_id": 50, "post_id": 80}
]
}
apply_delta_with_validation(base_path, delta_path, result_path, *, rollback_dir, ei_ratio_range)
E/I 比検証付きで差分を適用。範囲外の場合 rollback_dir にバックアップを生成して ConnectomeSyncValidationError を送出。
from scripts.sync_connectome import apply_delta_with_validation, ConnectomeSyncValidationError
try:
apply_delta_with_validation(
base_path="visual.npz",
delta_path="delta.json",
result_path="visual_new.npz",
rollback_dir="data/connectome/rollback/",
ei_ratio_range=(3.5, 5.0),
)
except ConnectomeSyncValidationError as e:
print(f"E/I検証失敗・ロールバック完了: {e}")
sync_connectome(config_path, cache_path, output_path, *, dry_run, ...)
完全自動同期オーケストレーター。CAVE API デルタ取得に始まり適用まで一勧。
from scripts.sync_connectome import sync_connectome
result = sync_connectome(
config_path="config/connectome_config.yaml",
cache_path="data/connectome/cache/visual.npz",
output_path="data/connectome/cache/visual.npz",
dry_run=False,
)
print(result["status"]) # "success" | "dry_run" | "no_update"
CLI 使用例
# 通常実行
python scripts/sync_connectome.py \
--config config/connectome_config.yaml \
--cache data/connectome/cache/visual.npz \
--output data/connectome/cache/visual.npz
# ドライラン(出力ファイルを更新せず結果を表示)
python scripts/sync_connectome.py --dry-run \
--cache data/connectome/cache/visual.npz \
--output /dev/null
brain_routing — HCP 遅延アウェア Zenoh ルーティング(E-3-3)
compute_delay_matrix(manifest, config_path)
HCP 実測値からノード間遅延行列を構範する。
from evospikenet.brain_routing import compute_delay_matrix
from evospikenet.connectome import build_manifest
manifest = build_manifest("data/connectome/")
delay_matrix = compute_delay_matrix(
manifest=manifest,
config_path="config/connectome_config.yaml",
)
# {"pfc": {"memory_spike": 12.0, "visual": 9.0, ...}, ...}
print(delay_matrix["pfc"]["memory_spike"]) # 12.0 ms
build_hcp_routing_table(config_path)
フルパイプラインでルーティングテーブルを生成。
from evospikenet.brain_routing import build_hcp_routing_table
routing = build_hcp_routing_table("config/connectome_config.yaml")
# routing["delay_matrix"] — {src: {dst: ms}}
# routing["routing_plan"]["routing_edges"] — priority降順エッジリスト
# routing["zenoh_topics"] — "brain_routing/delays/{node_id}" リスト
HCPDelayRouter
Zenoh セッションに遅延プロファイルを発行するクラス。session=None でも動作する(ログのみ)。
from evospikenet.brain_routing import HCPDelayRouter
# Zenoh セッションなしで動作するテスト
# session=None の場合はルーティング機能はログのみ
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
# データペイロードに遅延プロファイルを付与
data = {"spikes": [1, 0, 1], "timestamp": 1.0}
enriched = router.apply_hcp_delays(node_id="pfc", data=data)
# {"spikes": [...], "timestamp": 1.0, "routing_delays": {"memory_spike": 12.0, ...}}
# 全ノードに遅延情報を一括発行
# Zenoh トピック: brain_routing/delays/{node_id}
router.publish_all()
auto_node_mapper — Auto Node Mapper CLI
map_connectome(input_path, output_dir, config_path, *, dry_run, seed)
コネクトームをノード別 NPZ に分割する。
from scripts.auto_node_mapper import map_connectome
result = map_connectome(
input_path="data/connectome/flywire_visual.json",
output_dir="data/connectome/nodes/",
config_path="config/connectome_config.yaml",
dry_run=False,
seed=42,
)
print(f"マッピング完了: {result.total_nodes} ノード中 {result.mapped_nodes} 成功")
print(f"マニフェスト: {result.manifest_path}")
for entry in result.entries:
print(f" {entry.node_type}: {entry.n_neurons} neurons, E/I={entry.ei_ratio:.2f}, {entry.status}")
出力構造:
data/connectome/nodes/
├── visual.npz ← 視覚野ノード用圧縮 NPZ
├── memory_spike.npz ← 記憶ノード用
└── node_manifest.yaml ← 全ノードメタデータ
node_manifest.yaml スキーマ:
schema_version: "1.0"
generated_at: "2026-03-19T10:00:00Z"
base_dataset: "flywire_visual"
nodes:
visual:
npz_path: data/connectome/nodes/visual.npz
n_neurons: 1024
ei_ratio: 4.1
coarsening_method: stratified_sample
status: ok
memory_spike:
npz_path: data/connectome/nodes/memory_spike.npz
n_neurons: 512
ei_ratio: 4.0
coarsening_method: spectral_coarsen
status: ok
CLI 使用例
# フルマッピング
python scripts/auto_node_mapper.py \
--input data/connectome/flywire_visual.json \
--output-dir data/connectome/nodes/ \
--config config/connectome_config.yaml
# ドライラン・定数固定で再現性あり
python scripts/auto_node_mapper.py \
--input data/connectome/flywire_visual.json \
--output-dir data/connectome/nodes/ \
--dry-run --seed 42
全構コネクトーム E-3 ステップバイステップサンプル
完全な初期化から定期同期までの典型的なワークフロー:
import evospikenet as esn
from scripts.auto_node_mapper import map_connectome
from scripts.sync_connectome import sync_connectome
from evospikenet.brain_routing import HCPDelayRouter
from evospikenet import load_connectome_npz, apply_connectome_to_layer, ConnectomeLIFLayer
# --- ステップ 1: コネクトーム → ノード別 NPZ に分割 ---
result = map_connectome(
input_path="data/connectome/flywire_visual.json",
output_dir="data/connectome/nodes/",
config_path="config/connectome_config.yaml",
seed=42,
)
print(f"E-3 Auto Node Mapper: {result.mapped_nodes} nodes mapped")
# --- ステップ 2: ノード別 NPZ を ConnectomeLIFLayer に注入 ---
visual_data = load_connectome_npz("data/connectome/nodes/visual.npz")
layer = ConnectomeLIFLayer(num_neurons=visual_data["n_neurons"], device="cpu")
apply_connectome_to_layer(visual_data, layer)
print(f"structural_mask: {layer.structural_mask.shape}, ei_ratio={layer.ei_ratio:.2f}")
# --- ステップ 3: HCP 遅延ルーティングを構範 ---
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
router.publish_all() # Zenoh 無しの場合はログのみ
# --- ステップ 4: 自動同期(CAVE API より差分取得) ---
sync_result = sync_connectome(
config_path="config/connectome_config.yaml",
cache_path="data/connectome/nodes/visual.npz",
output_path="data/connectome/nodes/visual.npz",
dry_run=True, # まずドライランで確認
)
print(f"sync status: {sync_result['status']}")
完全なウォークスルーサンプル: connectome_e3_demo.py
```