コンテンツにスキップ

EvoSpikeNet SDK クイックスタートガイド

Author: Masahiro Aoki

最終更新日: 2026年1月8日 30秒で始めるEvoSpikeNet SDK

このドキュメントの目的と使い方

  • 目的: SDKを最短でセットアップし、APIクライアントを動かす手順を示す。
  • 対象読者: SDK利用を開始する開発者。
  • まず読む順: インストール → APIサーバー起動 → 最小限の使用例。
  • 関連リンク: 分散脳スクリプトは examples/run_zenoh_distributed_brain.py(動作確認環境として)、PFC/Zenoh/Executive詳細は implementation/PFC_ZENOH_EXECUTIVE.md
  • 実装ノート(アーティファクト): docs/implementation/ARTIFACT_MANIFESTS.mdartifact_manifest.json と CLI フラグの仕様(--artifact-name / --precision / --quantize / --privacy-level / --node-type)を参照してください。

インストール

pip install -e .

APIサーバーの起動

sudo ./scripts/run_api_server.sh

最小限の使用例

from evospikenet.sdk import EvoSpikeNetAPIClient

# クライアント初期化(API サーバーが http://localhost:8000 で動作している前提)
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")

# サーバーの応答確認と簡易呼び出し
if client.wait_for_server(timeout=10):
    try:
        result = client.generate("人工知能とは", max_length=128)
        print(result.get('generated_text', result))
    except Exception as e:
        print('API 呼び出しでエラー:', e)
else:
    print('サーバーに接続できませんでした。API サーバーが起動していることを確認してください。')

よく使うパターン

1️⃣ シンプルなテキスト生成

<!-- TODO: update or remove - import faileNetAPIClient -->

client = EvoSpikeNetAPIClient()
result = client.generate("機械学習の応用例を5つ列挙してください")
print(result['generated_text'])

2️⃣ 複数プロンプトの処理

prompts = ["What is AI?", "Explain machine learning", "Deep learning basics"]
results = client.batch_generate(prompts, max_length=100)

for prompt, result in zip(prompts, results):
    print(f"{prompt}: {result.get('generated_text', 'Failed')}")

3️⃣ 画像を含むマルチモーダル処理

response = client.submit_prompt(
    prompt="この画像に写っているものは何ですか?",
    image_path="./image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])

4️⃣ エラーハンドリング付き実行

# プロンプト検証
if client.validate_prompt("テストプロンプト"):
    # 自動リトライ付きで実行
    result = client.with_error_handling(
        client.generate,
        prompt="テストプロンプト",
        max_length=100,
        retries=3
    )
    if result:
        print("成功:", result['generated_text'])

5️⃣ 非同期タスクの監視

# タスク送信
client.submit_prompt(prompt="複雑なタスク")

# 結果をポーリング
result = client.poll_for_result(timeout=120, interval=5)

if result:
    print("結果:", result['response'])
else:
    print("タイムアウト")

6️⃣ モデル保存と復元

7️⃣ ノード検出情報の取得

# ノード検出APIから状態を取得
health = client.node_discovery_health()
print("active nodes", health.get("summary", {}).get("active"))

topo = client.node_discovery_topology()
print("topology nodes", len(topo.get("nodes", [])))
import torch
import io

# セッション作成
session = client.create_log_session("モデル訓練実験")
session_id = session['session_id']

# モデル保存
model_buffer = io.BytesIO()
torch.save(model.state_dict(), model_buffer)
model_buffer.seek(0)

# アップロード
artifact = client.upload_artifact(
    session_id=session_id,
    artifact_type="model",
    name="model.pth",
    file=model_buffer
)

# ダウンロード
client.download_artifact(
    artifact_id=artifact['artifact_id'],
    destination_path="./downloaded_model.pth"
)

### 7️⃣ ゲノム保存・適用(進化ダッシュボード連携)

```python
# ゲノム管理 API の利用例(存在する場合)
if hasattr(client, 'list_genomes'):
    genomes = client.list_genomes()
    if genomes:
        target = genomes[0].get('name')
        genome = client.get_genome(target)
        genome.setdefault('metadata', {})['note'] = 'edited via quickstart'
        client.save_genome(target, genome, make_active=True)
        client.apply_genome(target)
else:
    # SDK バージョン差異のため安全にガード
    print('この SDK ビルドではゲノム管理 API が利用できません。')

サンプル: examples/genome_management_sdk.py

### 7️⃣ データセットアップロード

```python
import zipfile
import os

# トレーニングデータディレクトリの準備
data_dir = "./training_data"
os.makedirs(f"{data_dir}/images", exist_ok=True)

# データセットをZIP圧縮
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
    # CSVファイル追加
    zf.write(f"{data_dir}/captions.csv", arcname='captions.csv')
    # 画像ファイル追加
    for root, _, files in os.walk(f"{data_dir}/images"):
        for file in files:
            full_path = os.path.join(root, file)
            archive_name = os.path.join('images', os.path.relpath(full_path, f"{data_dir}/images"))
            zf.write(full_path, arcname=archive_name)

zip_buffer.seek(0)
zip_buffer.name = "training_dataset.zip"

# データセットアップロード
dataset_artifact = client.upload_artifact(
    session_id=session_id,
    artifact_type="dataset",
    name="vision_training_data",
    file=zip_buffer,
    llm_type="SpikingEvoMultiModalLM"
)

print(f"データセットアップロード完了: {dataset_artifact['artifact_id']}")

8️⃣ AEG-Comm通信最適化 ⭐ NEW

# AEG-Comm設定(通信を85-93%削減)
client.set_aeg_comm_config(
    node_id="brain_node_1",
    enable_comm=True,
    energy_threshold=10.0,
    critical_modalities=["force", "safety"]
)

# 分散脳シミュレーション実行(最適化有効)
result = client.run_distributed_brain_simulation(
    query="ロボットの動作計画",
    modalities=["text", "force"],
    config={"enable_aeg_comm": True}
)

# 通信統計確認
stats = client.get_communication_stats("brain_node_1")
print(f"通信削減率: {stats['reduction_rate']}%")

9️⃣ 分散Coordinator ⭐ NEW

# 分散coordinatorの初期化
client.init_coordinator(
    node_id="coordinator_node_1",
    zenoh_config={"connect": ["tcp/127.0.0.1:7447"]},
    raft_config={"election_timeout": [5000, 10000]}
)

# Coordinator開始
client.start_coordinator()
print("分散coordinatorが開始されました")

# 協調タスクの送信
task_id = client.submit_coordination_task(
    task_type="federated_learning",
    payload={
        "model": "resnet50",
        "dataset": "cifar10",
        "rounds": 10
    }
)
print(f"協調タスク送信: {task_id}")

# タスク状態の確認
status = client.get_coordination_task_status(task_id)
if status:
    print(f"タスク状態: {status['status']}")

# クラスタ状態の取得
cluster_status = client.get_cluster_status()
print(f"リーダー: {cluster_status['leader_id']}")
print(f"アクティブノード数: {len(cluster_status['nodes'])}")

# 新しいノードの登録
node_info = {
    "address": "192.168.1.100",
    "port": 8001,
    "capabilities": ["gpu", "cpu"]
}
success = client.register_coordination_node("node_2", node_info)
print(f"ノード登録: {'成功' if success else '失敗'}")

# Coordinatorの停止
client.stop_coordinator()
print("分散coordinatorが停止されました")

> 内部タスク実装簡易版: `federated_learning`  updates の数値平均、`distributed_inference`  inputs/batches をそのまま完了扱い、`model_aggregation`  weights を平均化して返却しますノード発見は `/nodes/list` 応答を取り込みつつ古いハートビートのノードは自動クリーンアップされます

サーバー情報の確認

# サーバーヘルスチェック
is_healthy = client.is_server_healthy()
print(f"サーバーは正常ですか?: {'はい' if is_healthy else 'いいえ'}")

# ステータス監視
status = client.get_simulation_status()
print(f"現在のプロンプトステータス: {status.get('last_prompt_status', 'N/A')}")
print(f"アクティブノード数: {len(status.get('nodes', []))}")

よくあるエラーと解決方法

エラー 原因 解決策
ConnectionError APIサーバーが起動していない sudo ./scripts/run_api_server.sh で起動
Timeout 処理が遅い timeoutパラメータを増やす
Invalid prompt プロンプトが条件を満たさない validate_prompt()で事前確認

次のステップ


高度な機能 (P3実装完了)

🔄 遅延監視と最適化

# 遅延統計の取得
latency_stats = client.get_latency_stats()
print(f"平均遅延: {latency_stats['mean']:.2f}ms")
print(f"95パーセンタイル: {latency_stats['p95']:.2f}ms")

# 遅延ターゲットの確認
target_met = client.check_latency_target(500.0)  # 500ms目標
print(f"遅延目標達成: {target_met}")

💾 スナップショット/復旧

# システムスナップショット作成
snapshot_result = client.create_snapshot(
    snapshot_name="backup_20251212",
    include_models=True,
    include_data=True
)

# スナップショット一覧
snapshots = client.list_snapshots()

# システム復旧
restore_result = client.restore_snapshot(
    snapshot_path="/path/to/snapshot.gz",
    restore_models=True,
    restore_data=True
)

📊 スケーラビリティテスト

# スケーラビリティテスト実行
test_result = client.run_scalability_test(
    max_nodes=1000,
    test_duration=300.0,
    load_pattern="linear"
)

# リソース使用状況取得
resources = client.get_resource_usage()
print(f"CPU使用率: {resources['cpu_usage']}%")
print(f"メモリ使用量: {resources['memory_usage']}MB")

🔧 ハードウェア最適化

# ハードウェア最適化(ONNX/量子化など)
optimization_result = client.optimize_model(
    model_type="vision",              # "vision" | "audio"
    optimizations=["onnx", "quantize"]
)

# モデルベンチマーク
benchmark_result = client.benchmark_model(
    model_type="vision",
    num_runs=50
)

🛡️ 高可用性監視

# 可用性ステータス取得
availability = client.get_availability_status()
print(f"全体可用性: {availability['overall_availability']}%")
print(f"アップタイム: {availability['uptime_percentage']}%")

# ヘルスチェック実行
health_result = client.perform_health_check()

# 可用性統計取得
stats = client.get_availability_stats(time_window="24h")

🌐 非同期Zenoh通信

# Zenoh通信統計取得
zenoh_stats = client.get_zenoh_stats()
print(f"メッセージ数: {zenoh_stats['messages_sent']}")
print(f"平均遅延: {zenoh_stats['avg_latency']}ms")

⚖️ 分散コンセンサス

# コンセンサス提案
proposal_result = client.propose_consensus_decision(
    decision_type="resource_allocation",
    payload={"resource": "gpu", "amount": 50},
    priority=1
)

# コンセンサス結果取得
result = client.get_consensus_result(proposal_result['proposal_id'])

# コンセンサス統計
consensus_stats = client.get_consensus_stats()

チートシート

<!-- TODO: update<!-- モジュール 'evospikenet' が見つかりませんパッケージ内の移動/名前変更を確認してください -->kenet.sdk import EvoSpikeNetr()           # 起動待機
client.is_server_healthy()         # ヘルスチェック

# テキスト生成
client.generate(prompt)            # シンプル生成
client.batch_generate(prompts)     # バッチ処理
client.submit_prompt(prompt)       # 非同期送信
client.poll_for_result()           # 結果待機

# 検証・制御
client.validate_prompt(prompt)     # プロンプト検証
client.with_error_handling(func)   # リトライ付き実行

# ステータス・ログ
client.get_simulation_status()     # ステータス取得
client.get_simulation_result()     # 結果取得
client.get_remote_log()            # ログ取得

# アーティファクト管理
client.create_log_session()        # セッション作成
client.upload_artifact()           # アップロード
client.download_artifact()         # ダウンロード
client.list_artifacts()            # リスト表示

6️⃣ LLMトレーニングジョブの実行 (新機能)

Vision Encoderトレーニング

<!-- TODO: update or remove - impo<!-- モジュール 'evospikenet' が見つかりませんパッケージ内の移動/名前変更を確認してください -->EvoSpikeNetAPIClient -->

client = EvoSpikeNetAPICli "model_name": "google/vit-base-patch16-224",
    "dataset_path": "data/llm_training/Vision/vision_data.jsonl",
    "output_dir": "saved_models/Vision/vision-training-run",
    "gpu": True,
    "epochs": 3,
    "batch_size": 8,
    "learning_rate": 0.00001
}

response = client.submit_training_job(job_data)
print(f"トレーニングジョブを開始しました: {response['job_id']}")

# ジョブステータスの確認
status = client.get_training_status(response['job_id'])
print(f"ジョブステータス: {status['status']}")

Audio Encoderトレーニング

# Audio Encoderトレーニングジョブの送信
job_data = {
    "category": "Audio",
    "model_name": "openai/whisper-base",
    "dataset_path": "data/llm_training/Audio/audio_data.jsonl",
    "output_dir": "saved_models/Audio/audio-training-run",
    "gpu": True,
    "epochs": 3,
    "batch_size": 8,
    "learning_rate": 0.00001
}

response = client.submit_training_job(job_data)
print(f"Audioトレーニングジョブを開始しました: {response['job_id']}")

トレーニングジョブの監視

# すべてのトレーニングジョブのリストを取得
jobs = client.list_training_jobs()
for job in jobs:
    print(f"ジョブID: {job['job_id']}, ステータス: {job['status']}, カテゴリ: {job['category']}")

# 特定のジョブの詳細を取得
job_details = client.get_training_job_details("vision_training_job_001")
print(f"ジョブ詳細: {job_details}")

分散脳ノード対応トレーニング

# ノードタイプ定義を共有モジュールから取得する例
from evospikenet.node_types import node_type_for_rank, NODE_TYPE_DEFINITIONS

# ランク→タイプ変換例
rank = 2
nt = node_type_for_rank(rank)  # 例えば "vision"

# ノードタイプに紐づくデフォルトモデル
default_llm = NODE_TYPE_DEFINITIONS[nt]["llm"]

# トレーニングジョブを作成
training_job = {
    "category": nt.capitalize(),
    "model_name": default_llm,
    "dataset_path": f"data/llm_training/{nt}/{nt}_data.jsonl",
    "output_dir": f"saved_models/{nt}/rank{rank}",
    "gpu": True,
    "epochs": 5,
    "batch_size": 16,
    "learning_rate": 0.00002,
    # ランク指定も可能
    "rank": rank,
}

client.submit_training_job(training_job)

6️⃣ 分散coordinatorの使用

# Guarded example: initialize API client and coordinator-related calls
try:
    from evospikenet.sdk import EvoSpikeNetAPIClient
except Exception:
    EvoSpikeNetAPIClient = None

if EvoSpikeNetAPIClient is not None:
    client = EvoSpikeNetAPIClient()

    # Coordinatorの初期化(API経由で提供される場合)
    if hasattr(client, "init_coordinator"):
        client.init_coordinator()
        print("Coordinator started")

    # タスクの送信(存在するAPIに依存)
    if hasattr(client, "submit_coordination_task"):
        task_id = client.submit_coordination_task(
            "federated_learning",
            {
                "model": "resnet50",
            }
        )
else:
    print("EvoSpikeNetAPIClient not available in this environment; skip coordinator example")
        "dataset": "cifar10",
        "epochs": 10
    }
)
print(f"タスク送信完了: {task_id}")

# タスク状態の確認
status = client.get_coordination_task_status(task_id)
if status:
    print(f"タスク状態: {status['status']}")

# クラスタ状態の取得
cluster_status = client.get_cluster_status()
print(f"アクティブノード数: {cluster_status['active_nodes']}")

# 新しいノードの登録
node_info = {
    "address": "192.168.1.100",
    "port": 8001,
    "capabilities": ["gpu", "cpu"]
}
client.register_coordination_node("node_2", node_info)

# Coordinatorの停止
client.stop_coordinator()
print("分散coordinatorが停止されました")

Happy coding! 🚀