コンテンツにスキップ

EvoSpikeNet Python SDK ドキュメント

Author: Masahiro Aoki

実装ノート(アーティファクト): トレーニングスクリプトが出力する artifact_manifest.json と推奨CLIフラグについては docs/implementation/ARTIFACT_MANIFESTS.md を参照してください。

最終更新日: 2025年12月12日

このドキュメントの目的と使い方

  • 目的: SDKの全体像と主要機能を把握し、セットアップから利用までの手順を案内する。
  • 対象読者: SDK利用を開始する開発者、API連携担当。
  • まず読む順: 1.概要 → 2.セットアップとインストール → 3.クイックスタート/サンプルコード。
  • 関連リンク: 分散脳スクリプトは examples/run_zenoh_distributed_brain.py(動作環境の一例)、PFC/Zenoh/Executive詳細は implementation/PFC_ZENOH_EXECUTIVE.md

1. 概要

EvoSpikeNet Python SDKは、EvoSpikeNet APIと対話するための高レベルなインターフェースを提供するクライアントライブラリです。このSDKを利用することで、開発者はHTTPリクエストの詳細を意識することなく、数行のPythonコードでEvoSpikeNetのテキスト生成、データロギング、分散脳シミュレーション機能を自身のアプリケーションに簡単に統合できます。

新規統合機能

  • 🔄 遅延監視: get_latency_stats(), check_latency_target()
  • 💾 スナップショット管理: create_snapshot(), restore_snapshot(), list_snapshots(), delete_snapshot(), validate_snapshot(), cleanup_snapshots()
  • 📊 スケーラビリティテスト: run_scalability_test(), test_node_scalability(), run_stress_test(), get_resource_usage(), get_system_limits()
  • 🔧 ハードウェア最適化: optimize_model(), benchmark_model(), get_hardware_info()
  • 🛡️ 高可用性監視: get_availability_status(), get_availability_stats(), perform_health_check(), trigger_recovery_action()
  • 🌐 非同期Zenoh通信: connect_zenoh(), publish_zenoh_message(), send_zenoh_request(), send_zenoh_notification(), get_zenoh_stats()
  • ⚖️ 分散コンセンサス: propose_consensus_decision(), get_consensus_result(), update_node_status(), cleanup_consensus()

SDKの可用性指標

  • API互換性: 25個以上の新エンドポイント対応
  • エラーハンドリング: 包括的な例外処理とリトライ機構
  • パフォーマンス: 全機能で< 500msの応答時間保証
  • 可用性: 99.9%+のAPI可用性
  • スケーラビリティ: 1000ノード以上での並列操作対応

1.2. 主要メソッド一覧

SDKで利用可能な主要メソッドを以下に示します:

基本機能

  • generate(prompt, max_length): テキスト生成
  • submit_prompt(prompt, config): プロンプト送信
  • batch_generate(prompts, max_length): バッチテキスト生成

分散脳シミュレーション

  • get_simulation_status(): シミュレーション状態取得
  • get_simulation_result(): シミュレーション結果取得
  • poll_for_result(timeout, interval): 結果ポーリング

アーティファクト管理

  • upload_artifact(file_path, artifact_type, metadata): アーティファクトアップロード
  • list_artifacts(artifact_type): アーティファクト一覧
  • download_artifact(artifact_id, destination_path): アーティファクトダウンロード

ログ管理

  • get_remote_log(user, ip, key_path, log_file_path): リモートログ取得
  • create_log_session(description): ログセッション作成

新規P3機能

  • get_latency_stats(): 遅延統計取得
  • create_snapshot(): スナップショット作成
  • run_scalability_test(): スケーラビリティテスト実行
  • optimize_model(): モデル最適化
  • connect_zenoh(): Zenoh接続
  • propose_consensus_decision(): コンセンサス決定提案

分散Coordinator ⭐ NEW

  • init_coordinator(node_id, zenoh_config, raft_config): 分散coordinator初期化
  • start_coordinator(): coordinator開始
  • stop_coordinator(): coordinator停止
  • submit_coordination_task(task_type, payload): 協調タスク送信(簡易組み込み実装: federated_learning=updates平均, distributed_inference=入力を即完了, model_aggregation=weights平均)
  • get_coordination_task_status(task_id): タスク状態取得
  • get_cluster_status(): クラスタ状態取得
  • register_coordination_node(node | NodeInfo, node_info=None): ノード登録(ID+dict または NodeInfo)
  • unregister_coordination_node(node_id): ノード解除

2. セットアップとインストール

2.1. 前提条件

  • Python 3.8以降
  • requestsライブラリ
  • 実行中のEvoSpikeNet APIサーバー

2.2. インストール手順

本SDKは、evospikenetパッケージの一部として提供されます。プロジェクトのルートディレクトリで以下のコマンドを実行し、プロジェクトを編集可能モードでインストールしてください。

pip install -e .

2.3. APIサーバーの起動

SDKを使用する前に、APIサーバーが起動している必要があります:

# Docker Composeを使用する場合(推奨)
sudo ./scripts/run_api_server.sh

# または、全サービス(UI含む)を起動
sudo ./scripts/run_frontend_cpu.sh

3. EvoSpikeNetAPIClient クラス

APIとのすべての通信を管理する中心的なクラスです。

3.1. 初期化

from evospikenet.sdk import EvoSpikeNetAPIClient

# APIサーバーがデフォルトのURL (http://localhost:8000) で実行されている場合
client = EvoSpikeNetAPIClient()

# Docker環境内から接続する場合
client = EvoSpikeNetAPIClient(base_url="http://api:8000")

3.2. ヘルスチェック

health_check() -> Dict[str, Any]

APIサーバーの /api/health を呼び出し、status と詳細情報を返します。

get_server_info() -> Optional[Dict]

サーバー情報とロード済みモデル情報を取得します。利用できない場合は None を返します。

is_server_healthy() -> bool

health_check() を真偽値へ畳み込み、APIサーバーが正常に稼働しているかを簡易判定します。

wait_for_server(timeout: int = 60, interval: int = 2) -> bool

is_server_healthy() を繰り返し呼び出し、サーバーが応答するようになるまで待機します。

node_discovery_health() -> Dict[str, Any]

ノード検出サービスのヘルス情報をSDK経由で取得します。返却辞書は /api/node-discovery/health に準拠し、nodes リストや summary を含みます。

node_discovery_topology() -> Dict[str, Any]

最新のネットワークトポロジー情報を取得します。SDKは /api/node-discovery/topology を呼び出します。


3.3. ノード検出サービスのSDK利用例

以下の例では、SDKクライアントを初期化し、ノード検出APIから健康状態とトポロジーを取得する方法を示します。

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()

# ノードの健康状態を取得
health = client.node_discovery_health()
print("Health summary:", health.get("summary"))
for node in health.get("nodes", []):
    print(node["node_id"], node["status"], node.get("health_score"))

# ネットワークトポロジーを取得
topo = client.node_discovery_topology()
print("Nodes:", len(topo.get("nodes", [])))
print("Edges:", len(topo.get("edges", [])))

上記メソッドを使うことで、外部システムや運用スクリプトから簡単にノード検出情報へアクセスできます。

例:

client = EvoSpikeNetAPIClient()

health = client.health_check()
print("Health:", health.get("status"))

info = client.get_server_info()
if info:
    print("Version:", info.get("version"))
    print("Models:", info.get("models", []))

if client.is_server_healthy():
    print("✅ APIサーバーは正常に稼働しています")
else:
    print("❌ APIサーバーに接続できませんでした")


4. ノードタイプとモデルカテゴリ

EvoSpikeNet は分散脳シミュレーションのための様々な脳ノードタイプとモデルカテゴリをサポートしています。

4.1. ノードタイプ

以下のノードタイプがサポートされています:

ノードタイプ 説明 Rank
vision 視覚ノード(後頭葉 V1-V5) 1
motor 運動ノード(運動野 M1 + 小脳 + 脊髄) 2
auditory 聴覚ノード(側頭葉 A1-A2) 5
speech 音声生成ノード(ブローカ野 + 小脳) 6
executive 実行制御ノード(前頭前野 dlPFC) 0
general 汎用ノード N/A

4.2. モデルカテゴリ

各ノードタイプは特定のモデルカテゴリをサポートしています:

視覚ノードカテゴリ

  • image_classification: 画像分類
  • object_detection: 物体検出
  • semantic_segmentation: セマンティックセグメンテーション
  • image_generation: 画像生成
  • visual_qa: 視覚的質問応答

運動ノードカテゴリ

  • motion_control: 運動制御
  • trajectory_planning: 軌道計画
  • inverse_kinematics: 逆運動学
  • motor_adaptation: 運動適応

聴覚ノードカテゴリ

  • speech_recognition: 音声認識
  • audio_classification: 音声分類
  • sound_event_detection: 音イベント検出
  • speaker_recognition: 話者認識

音声ノードカテゴリ

  • text_to_speech: テキスト音声合成
  • voice_conversion: 声質変換
  • speech_synthesis: 音声合成

実行制御ノードカテゴリ

  • text_generation: テキスト生成
  • decision_making: 意思決定
  • planning: プランニング
  • reasoning: 推論
  • rag: 検索拡張生成 (RAG)

汎用カテゴリ

  • multimodal: マルチモーダル処理
  • embedding: 埋め込み生成
  • tokenization: トークン化

5. テキスト生成

4.1. 基本的なテキスト生成

generate(prompt: str, max_length: int = 50) -> Dict[str, str]

標準的なテキスト生成エンドポイント (/api/generate) を呼び出します。

例:

result = client.generate("人工知能とは", max_length=100)
print(f"生成テキスト: {result.get('generated_text', '')}")

4.2. バッチ処理

batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict]

複数のプロンプトを順番に処理します。

例:

prompts = ["AIとは?", "機械学習の応用例"]
results = client.batch_generate(prompts)
for res in results:
    print(res.get('generated_text', 'エラー'))

4.3. エラーハンドリング付き実行

with_error_handling(func: Callable, retries: int = 3, *args, **kwargs)

API呼び出しをラップし、失敗時に指数バックオフ付きで自動リトライします。

例:

result = client.with_error_handling(
    client.generate,
    retries=3,
    prompt="テストプロンプト",
    max_length=50
)
if result:
    print(f"成功: {result['generated_text']}")
else:
    print("失敗: すべてのリトライが失敗しました")


5. 分散脳シミュレーション

5.1. マルチモーダルプロンプトの送信

submit_prompt(prompt: str = None, image_path: str = None, audio_path: str = None) -> Dict

シミュレーションにマルチモーダルなプロンプトを送信します。内部で画像・音声ファイルはBase64エンコードされます。

例:

# テキストと画像を組み合わせて送信
response = client.submit_prompt(
    prompt="この画像に写っているものは何ですか?",
    image_path="./examples/dummy_image.png"
)
prompt_id = response.get('prompt_id')
print(f"プロンプト送信成功: {prompt_id}")

5.2. 結果のポーリング

poll_for_result(timeout: int = 120, interval: int = 5) -> Optional[Dict]

システムの結果エンドポイントを定期的にポーリングして、利用可能な最新の結果を取得します(SDK実装では内部的にグローバルな結果エンドポイントを参照します)。個別の prompt_id を扱う場合は、サーバー側のレスポンスで返される識別子をクライアント側でフィルタして利用してください。

例:

# サーバーに送信した後、一定時間で結果を待つ
result = client.poll_for_result(timeout=120, interval=5)

if result and result.get('response'):
    print(f"✅ 応答: {result['response']}")
else:
    print("❌ タイムアウトまたはエラー")

5.3. 状態監視とリモートログ

get_simulation_status() -> Dict

現在のシミュレーション全体のステータスを取得します。

get_remote_log(user: str, ip: str, key_path: str, log_file_path: str) -> Dict

SSH経由でリモートノードのログファイル(末尾100行)を取得します。

例:

log_data = client.get_remote_log(
    user="ubuntu",
    ip="192.168.1.101",
    key_path="~/.ssh/id_rsa",
    log_file_path="/tmp/sim_rank_1.log"
)
print(log_data.get('log_content'))


6. データロギングとアーティファクト管理

6.1. セッションの作成

create_log_session(description: str) -> Dict

新しい実験セッションを開始し、session_idを取得します。

例:

session = client.create_log_session(description="テキストモデルのファインチューニング")
session_id = session['session_id']

6.2. アーティファクトのアップロード

upload_artifact(session_id: str, artifact_type: str, name: str, file: io.BytesIO, llm_type: str = None) -> Dict

※ 注意: upload_artifact はファイルバッファ(io.BytesIO 等)のアップロードを想定しており、アップロード時にメタ情報として file.name が利用されます。ファイルパスを直接渡すのではなく、バイナリを読み込んだ BytesIO オブジェクトに .name を設定して渡してください。 モデル、設定ファイル、トークナイザーなどをセッションに関連付けてアップロードします。

  • llm_type: モデルのアーキテクチャを正確に記録するために重要です。(例: SpikingEvoTextLM, SpikingEvoMultiModalLM

例: モデルとトークナイザーのアップロード

import io
import torch
import json
from transformers import AutoTokenizer

# --- モデルと設定の準備 ---
config = { 'vocab_size': 1000, 'd_model': 128, ... }
model = SpikingEvoTextLM(**config)
# ... 訓練 ...

# --- アーティファクトのアップロード ---
# 1. モデルの重み
model_buffer = io.BytesIO()
torch.save(model.state_dict(), model_buffer)
model_buffer.seek(0)
model_buffer.name = 'spiking_lm.pth'  # 必須: upload_artifact は file.name を参照します
client.upload_artifact(session_id, "model", model_buffer.name, model_buffer, llm_type="SpikingEvoTextLM")

# 2. 設定ファイル
config_buffer = io.BytesIO(json.dumps(config).encode('utf-8'))
config_buffer.name = 'config.json'
client.upload_artifact(session_id, "config", "config.json", config_buffer, llm_type="SpikingEvoTextLM")

# 3. トークナイザー
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
tokenizer.save_pretrained('./tokenizer_temp')
shutil.make_archive('tokenizer', 'zip', './tokenizer_temp')

with open('tokenizer.zip', 'rb') as f:
    zip_buffer = io.BytesIO(f.read())
    zip_buffer.name = 'spiking_lm_tokenizer.zip'
    client.upload_artifact(session_id, "tokenizer", "spiking_lm_tokenizer.zip", zip_buffer, llm_type="SpikingEvoTextLM")

6.3. アーティファクトのリスト化とダウンロード

list_artifacts(artifact_type: str = None) -> List[Dict]

保存されているアーティファクトのリストを取得します。

download_artifact(artifact_id: str, destination_path: str)

指定したアーティファクトIDのファイルをダウンロードします。

例: 最新モデルのダウンロード

models = client.list_artifacts(artifact_type="model")
if models:
    latest_model_artifact = models[0]
    client.download_artifact(
        artifact_id=latest_model_artifact['artifact_id'],
        destination_path="./latest_model.pth"
    )
    print("✅ 最新モデルをダウンロードしました")


7. 総合的な使用例

7.1. モデル訓練とアーティファクト管理の完全なワークフロー

import io
import json

from evospikenet.sdk import EvoSpikeNetAPIClient


def complete_ml_workflow():
    client = EvoSpikeNetAPIClient()

    health = client.health_check()
    if health.get("status") != "healthy":
        print("APIサーバーに接続できません")
        return

    session = client.create_log_session("Complete training workflow example")
    session_id = session["session_id"]
    print(f"セッションID: {session_id}")

    config = {
        "model_name": "SpikingEvoTextLM",
        "vocab_size": 30522,
        "d_model": 128,
        "n_heads": 4,
    }
    config_buffer = io.BytesIO(json.dumps(config).encode("utf-8"))
    config_buffer.name = "config.json"
    client.upload_artifact(session_id, "config", config_buffer.name, config_buffer, llm_type="SpikingEvoTextLM")
    print("設定をアップロード")

    metrics = {"accuracy": 0.91, "loss": 0.12}
    metrics_buffer = io.BytesIO(json.dumps(metrics).encode("utf-8"))
    metrics_buffer.name = "metrics.json"
    client.upload_artifact(session_id, "log", metrics_buffer.name, metrics_buffer)
    print("メトリクスをアップロード")

    artifacts = client.list_artifacts(artifact_type="config")
    print(f"config artifacts: {artifacts}")


if __name__ == "__main__":
    complete_ml_workflow()

8. エラーハンドリングとベストプラクティス

8.1. 指数バックオフリトライ

with_error_handling()メソッドは、失敗時に指数バックオフでリトライします:

\[ \text{待機時間} = 2^{\text{試行回数} - 1} \text{ 秒} \]

使用例:

client = EvoSpikeNetAPIClient()

# 接続性を先に確認
health = client.health_check()
if health.get("status") != "healthy":
    print("サーバーに接続できません")
    exit(1)

info = client.get_server_info()
if info:
    print(f"server version: {info.get('version')}")

# リトライ付きAPI呼び出し
result = client.with_error_handling(
    client.generate,
    retries=5,
    prompt="テストプロンプト",
    max_length=100
)

if result:
    print(result['generated_text'])
else:
    print("生成に失敗しました")

8.2. 主な例外

例外 原因 対処法
EvoSpikeNetAPIError 接続失敗、HTTPエラー、タイムアウト error_info を確認し、必要に応じて wait_for_server() やリトライを併用
ValueError 不正な引数 入力データを検証

8. P3機能メソッド一覧

8.1. 遅延監視メソッド

get_latency_stats() -> Dict

全コンポーネントの遅延統計を取得します。

check_latency_target() -> Dict

各コンポーネントのターゲット(p95ベース)達成状況を確認します。

8.2. スナップショット管理メソッド

create_snapshot(snapshot_name: str, include_models: bool = True, include_data: bool = True, compression_level: int = 6) -> Dict

システムスナップショットを作成します。

restore_snapshot(snapshot_path: str, restore_models: bool = True, restore_data: bool = True) -> Dict

スナップショットからシステムを復旧します。

list_snapshots() -> Dict

利用可能なスナップショット一覧を取得します。

delete_snapshot(snapshot_path: str) -> Dict

スナップショットを削除します。

validate_snapshot(snapshot_path: str) -> Dict

スナップショットの整合性を検証します。

cleanup_snapshots(max_age_days: int = 30) -> Dict

古いスナップショットをクリーンアップします。

8.3. スケーラビリティテストメソッド

run_scalability_test(max_nodes: int = 50, test_duration: int = 30) -> Dict

スケーラビリティテストを実行します。

test_node_scalability(node_counts: List[int], test_duration: float = 60.0) -> Dict

複数ノード数での性能を比較テストします。

run_stress_test(intensity: str = "high", duration: float = 120.0) -> Dict

ストレステストを実行します。

get_resource_usage() -> Dict

現在のリソース使用状況を取得します。

get_system_limits() -> Dict

推奨最大ノード数やスループットの上限を取得します。

8.4. ハードウェア最適化メソッド

optimize_model(model_type: str, optimizations: Optional[List[str]] = None) -> Dict

ONNXエクスポートや量子化などの最適化を実行します。

benchmark_model(model_type: str, num_runs: int = 50) -> Dict

モデルの実行性能をベンチマークします。

get_hardware_info() -> Dict

ハードウェア最適化の対応状況を取得します。

8.5. 高可用性監視メソッド

get_availability_status() -> Dict

現在の可用性ステータスを取得します。

get_availability_stats(time_window: str = "24h") -> Dict

可用性統計を取得します。

perform_health_check() -> Dict

ヘルスチェックを実行します。

trigger_recovery_action(action_type: str, parameters: Dict[str, Any] = None) -> Dict

リカバリアクションを実行します。

get_availability_alerts(limit: int = 50) -> Dict

直近のアラートを取得します。

schedule_maintenance(start_time: str, duration_minutes: int, reason: str) -> Dict

メンテナンスウィンドウを予約します。

8.6. 非同期Zenoh通信メソッド

connect_zenoh(node_id: str = "api_node") -> Dict

Zenohルーターへ接続します。

publish_zenoh_message(topic: str, payload: Any, priority: str = "normal", message_type: str = "notification", node_id: str = "api_node") -> Dict

ZenohでメッセージをPublishします。

send_zenoh_request(target_node: str, request: Any, timeout: float = 5.0, node_id: str = "api_node") -> Dict

リクエスト/レスポンスのやり取りを行います。

send_zenoh_notification(target_nodes: List[str], notification: Any, priority: str = "normal", node_id: str = "api_node") -> Dict

複数ノードへ通知を送信します。

get_zenoh_stats(node_id: str = "api_node") -> Dict

Zenoh通信の統計情報を取得します。

8.7. 分散コンセンサスメソッド

propose_consensus_decision(decision_type: str, payload: Any, priority: int = 1, dependencies: List[str] = None) -> Dict

コンセンサス決定を提案します。

get_consensus_result(proposal_id: str, timeout: float = 30.0) -> Dict

コンセンサス結果を取得します。

update_node_status(node_id: str, active: bool) -> Dict

コンセンサスノードの稼働状態を更新します。

get_consensus_stats() -> Dict

コンセンサス統計を取得します。

cleanup_consensus(max_age: float = 300.0) -> Dict

古いコンセンサス提案をクリーンアップします。


8. LLMトレーニングジョブ管理 (新機能)

EvoSpikeNet SDKは、分散脳システム向けのLLMトレーニングジョブの管理機能を備えています。この機能により、Vision/Audio Encoderなどのモダリティ固有のモデルをAPI経由でトレーニングできます。

8.1. トレーニングジョブの送信

submit_training_job(job_config: Dict) -> Dict

新しいトレーニングジョブをAPIサーバーに送信します。

パラメータ: - job_config: トレーニング設定を含む辞書 - category: モデルカテゴリ ("LangText", "Vision", "Audio", "MultiModal") - model_name: 使用するモデル名 - dataset_path: トレーニングデータのパス - output_dir: 出力ディレクトリ - gpu: GPU使用フラグ - epochs: エポック数 - batch_size: バッチサイズ - learning_rate: 学習率

例:

# Vision Encoderトレーニング
vision_job = {
    "category": "Vision",
    "model_name": "google/vit-base-patch16-224",
    "dataset_path": "data/llm_training/Vision/vision_data.jsonl",
    "output_dir": "saved_models/Vision/vision-run-001",
    "gpu": True,
    "epochs": 3,
    "batch_size": 8,
    "learning_rate": 0.00001
}

response = client.submit_training_job(vision_job)
print(f"ジョブID: {response['job_id']}")

8.2. ジョブステータスの確認

get_training_status(job_id: str) -> Dict

指定したトレーニングジョブの現在のステータスを取得します。

例:

status = client.get_training_status("vision_training_job_001")
print(f"ステータス: {status['status']}")  # running, completed, failed
print(f"進捗: {status.get('progress', 0)}%")

list_training_jobs(status_filter: str = None) -> List[Dict]

すべてのトレーニングジョブのリストを取得します。

例:

# すべてのジョブを取得
all_jobs = client.list_training_jobs()

# 実行中のジョブのみを取得
running_jobs = client.list_training_jobs(status_filter="running")

for job in running_jobs:
    print(f"{job['job_id']}: {job['category']} - {job['status']}")

8.3. ジョブ詳細の取得

get_training_job_details(job_id: str) -> Dict

トレーニングジョブの詳細情報を取得します。

例:

details = client.get_training_job_details("vision_training_job_001")
print(f"モデル: {details['model_name']}")
print(f"データセット: {details['dataset_path']}")
print(f"開始時間: {details['start_time']}")
print(f"ログ: {details.get('logs', [])}")

8.4. 分散脳ノード対応トレーニング

SDKは分散脳システムのノード構成に最適化されたトレーニング設定をサポートしています。

例:

# 分散脳ノードタイプ別の設定
node_training_configs = {
    "Vision": {
        "model_name": "google/vit-base-patch16-224",
        "node_types": ["Vision-Primary", "Vision-Secondary"],
        "dataset_path": "data/llm_training/Vision/vision_data.jsonl"
    },
    "Audio": {
        "model_name": "openai/whisper-base",
        "node_types": ["Audio-Primary", "Audio-Secondary"],
        "dataset_path": "data/llm_training/Audio/audio_data.jsonl"
    },
    "LangText": {
        "model_name": "microsoft/DialoGPT-medium",
        "node_types": ["Lang-Primary", "Lang-Secondary"],
        "dataset_path": "data/llm_training/LangText/langtext_data.jsonl"
    }
}

# Visionノード用のトレーニングジョブ
vision_config = node_training_configs["Vision"]
job_config = {
    "category": "Vision",
    "model_name": vision_config["model_name"],
    "dataset_path": vision_config["dataset_path"],
    "output_dir": f"saved_models/Vision/distributed-{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    "gpu": True,
    "epochs": 5,
    "batch_size": 16,
    "learning_rate": 0.00002
}

response = client.submit_training_job(job_config)
print(f"分散Visionトレーニングを開始: {response['job_id']}")

8.5. トレーニング監視と自動化

定期的なステータス監視

import time

def monitor_training_job(job_id: str, check_interval: int = 30):
    """トレーニングジョブを監視し、完了まで待機"""
    while True:
        status = client.get_training_status(job_id)
        print(f"ジョブ {job_id}: {status['status']}")

        if status['status'] in ['completed', 'failed']:
            return status

        time.sleep(check_interval)

# 使用例
final_status = monitor_training_job("vision_training_job_001")
if final_status['status'] == 'completed':
    print("トレーニングが正常に完了しました")
else:
    print(f"トレーニングが失敗しました: {final_status.get('error', 'Unknown error')}")

複数ジョブの一括管理

def submit_multiple_training_jobs(job_configs: List[Dict]) -> List[str]:
    """複数のトレーニングジョブを一括送信"""
    job_ids = []
    for config in job_configs:
        try:
            response = client.submit_training_job(config)
            job_ids.append(response['job_id'])
            print(f"ジョブ送信成功: {response['job_id']} ({config['category']})")
        except Exception as e:
            print(f"ジョブ送信失敗: {config['category']} - {e}")

    return job_ids

# 使用例
configs = [
    {"category": "Vision", "model_name": "google/vit-base-patch16-224", ...},
    {"category": "Audio", "model_name": "openai/whisper-base", ...},
    {"category": "LangText", "model_name": "microsoft/DialoGPT-medium", ...}
]

job_ids = submit_multiple_training_jobs(configs)
print(f"送信したジョブ数: {len(job_ids)}")

8.6. サンプルコード集

SDKの各機能を実際に試すための包括的なサンプルコードを以下に示します。これらのサンプルは examples/sdk/ ディレクトリに配置されています。

8.6.1. 基本的な使用例 (sdk_basic_usage.py)

#!/usr/bin/env python3
# Copyright 2026 Moonlight Technologies Inc.
# Auth Masahiro Aoki

"""
Basic SDK Usage Example

This script demonstrates the basic usage of the EvoSpikeNet SDK,
including text generation, model management, and artifact handling.
"""

from evospikenet.sdk import EvoSpikeNetAPIClient


def main():
    client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")

    print("=== EvoSpikeNet SDK Basic Usage Demo ===\n")

    print("1. Health Check")
    health = client.health_check()
    print(f"Health status: {health.get('status')}")

    info = client.get_server_info()
    if info:
        print(f"Server version: {info.get('version')}")
        print(f"Loaded models: {info.get('models', [])}")

    print()

    print("2. Text Generation")
    result = client.generate(
        "Explain quantum computing in simple terms",
        max_length=150,
    )
    print(f"Generated text: {result.get('generated_text', '')[:100]}...")

    print()

    print("3. Artifact Management")
    artifacts = client.list_artifacts()
    artifact_count = len(artifacts) if isinstance(artifacts, list) else len(artifacts.get("artifacts", []))
    print(f"Available artifacts: {artifact_count}")

    print()

    print("4. Client Statistics")
    stats = client.get_stats()
    print(f"Requests: {stats['requests']}")
    print(f"Errors: {stats['errors']}")

    print("\n=== Demo completed ===")


if __name__ == "__main__":
    main()

8.6.2. 高度な機能デモ (sdk_advanced_features.py)

P3機能を含む高度なSDK機能をデモンストレーションします:

  • 遅延監視
  • スナップショット管理
  • スケーラビリティテスト
  • ハードウェア最適化
  • 高可用性監視
  • 非同期Zenoh通信
  • 分散コンセンサス

8.6.3. エラーハンドリング (sdk_error_handling.py)

包括的なエラーハンドリングとリトライ機構のデモンストレーション:

  • 基本的な例外処理
  • リトライロジック
  • 回路ブレーカー
  • グレースフルデグラデーション
  • カスタムエラーハンドリング
  • エラーリカバリー

8.6.4. パフォーマンス監視 (sdk_performance_monitoring.py)

パフォーマンス監視とベンチマーク機能:

  • テキスト生成ベンチマーク
  • 分散シミュレーションベンチマーク
  • リソース使用量監視
  • API呼び出しプロファイリング
  • パフォーマンス最適化

8.6.5. 設定管理 (sdk_configuration_management.py)

設定管理機能の包括的なデモンストレーション:

  • 基本設定
  • 環境変数ベース設定
  • ファイルベース設定
  • 動的設定更新
  • 設定検証
  • 設定プロファイル
  • 設定永続化
  • 設定マージ
  • セキュア設定

8.6.6. 分散脳シミュレーション (sdk_distributed_brain.py)

分散脳シミュレーションの高度な機能:

  • 基本分散シミュレーション
  • スケーラブルシミュレーション
  • ロードバランシング
  • フォールトトレランス
  • リアルタイム監視
  • ノード管理
  • 分散データ処理
  • パフォーマンス最適化

8.6.7. 実行方法

各サンプルは以下のコマンドで実行できます:

# 基本使用例
python examples/sdk/sdk_basic_usage.py

# 高度な機能
python examples/sdk/sdk_advanced_features.py

# エラーハンドリング
python examples/sdk/sdk_error_handling.py

# パフォーマンス監視
python examples/sdk/sdk_performance_monitoring.py

# 設定管理
python examples/sdk/sdk_configuration_management.py

# 分散脳シミュレーション
python examples/sdk/sdk_distributed_brain.py

8.6.8. サンプルコードの学習パス

  1. 初めての方: sdk_basic_usage.py から開始
  2. エラーハンドリング: sdk_error_handling.py で堅牢なコードを書く
  3. パフォーマンス: sdk_performance_monitoring.py で最適化
  4. 設定管理: sdk_configuration_management.py で本番環境対応
  5. 高度な機能: sdk_advanced_features.py でP3機能を活用
  6. 分散処理: sdk_distributed_brain.py でスケーラブルなアプリケーション

10. Phase E-3 コネクトーム本番化 SDK

最終更新日: 2026-03-19(Phase E-3 全完了)

Phase E-3 による新機能を SDK から利用する方法。

10.1 新規 API メソッド一覧

コネクトーム自動同期(scripts/sync_connectome.py)

  • apply_delta(base_path, delta_path, result_path): 差分 JSON を NPZ に適用
  • apply_delta_with_validation(base_path, delta_path, result_path, *, rollback_dir, ei_ratio_range): E/I 比検証付き差分適用
  • fetch_cave_synapses_with_retry(url, params, max_retries, backoff_factor): 429 リトライ付き CAVE API 取得
  • sync_connectome(config_path, cache_path, output_path, *, dry_run): 完全自動同期オーケストレーター
  • ConnectomeSyncValidationError: E/I 検証失敗例外

HCP 遅延ルーティング(evospikenet.brain_routing)

  • compute_delay_matrix(manifest, config_path): HCP 遅延行列生成
  • optimize_routing_delays(manifest, delay_matrix): 結連に priority スコアを付与
  • build_hcp_routing_table(config_path): 完全パイプラインでルーティングテーブルを構範
  • HCPDelayRouter(session, config_path): 遅延アウェアルーター(session=None でグレースフル動作)

Auto Node Mapper(scripts/auto_node_mapper.py)

  • map_connectome(input_path, output_dir, config_path, *, dry_run, seed): コネクトーム → ノード別 NPZ 分割 + node_manifest.yaml
  • generate_manifest(output_dir, config_path): 既存 NPZ からマニフェストを再生成
  • MappingResult / NodeMappingEntry データクラス

10.2 ステップバイステップサンプル

# Phase E-3 全コンポーネントウォークスルー
import evospikenet as esn
from scripts.auto_node_mapper import map_connectome
from scripts.sync_connectome import sync_connectome
from evospikenet.brain_routing import HCPDelayRouter
from evospikenet import load_connectome_npz, apply_connectome_to_layer, ConnectomeLIFLayer

# 1. コネクトーム → ノード別 NPZ
result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    seed=42,
)

# 2. NPZ を ConnectomeLIFLayer に注入
data = load_connectome_npz("data/connectome/nodes/visual.npz")
layer = ConnectomeLIFLayer(num_neurons=data["n_neurons"], device="cpu")
apply_connectome_to_layer(data, layer)

# 3. HCP 遅延ルーティングを起動
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
router.publish_all()

# 4. 定期同期(CAVE API)
sync_result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/nodes/visual.npz",
    output_path="data/connectome/nodes/visual.npz",
    dry_run=True,
)
print(f"sync: {sync_result['status']}")

10.3 完全サンプルコード

完全なウォークスルーサンプルは connectome_e3_demo.py を参照。


9. まとめ

EvoSpikeNet Python SDKを使用することで、数行のコードでAPIの全機能にアクセスできます。主な利点:

シンプルなインターフェース: HTTPリクエストの詳細を隠蔽 ✅ 自動リトライ: 指数バックオフで堅牢性向上 ✅ マルチモーダル対応: テキスト・画像・音声の統合処理 ✅ 完全なMLOpsサポート: セッション管理、アーティファクトアップロード ✅ LLMトレーニング統合: 分散脳向けトレーニングジョブ管理

詳細なコード例はexamples/sdk_usage.pyを参照してください。