EvoSpikeNet SDK クイックスタートガイド
Copyright: 2026 Moonlight Technologies Inc. All Rights Reserved.
Author: Masahiro Aoki
最終更新日: 2026年5月22日 30秒で始めるEvoSpikeNet SDK
このドキュメントの目的と使い方
- 目的: SDKを最短でセットアップし、APIクライアントを動かす手順を示す。
- 対象読者: SDK利用を開始する開発者。
- まず読む順: インストール → APIサーバー起動 → 最小限の使用例。
- 関連リンク: 分散脳スクリプトは
examples/run_zenoh_distributed_brain.py(動作確認環境として)、PFC/Zenoh/Executive詳細は implementation/PFC_ZENOH_EXECUTIVE.md。 - 実装ノート(アーティファクト):
docs/implementation/ARTIFACT_MANIFESTS.md—artifact_manifest.jsonと CLI フラグの仕様(--artifact-name/--precision/--quantize/--privacy-level/--node-type)を参照してください。
インストール
pip install -e .
APIサーバーの起動
sudo ./scripts/run_api_server.sh
最小限の使用例
from evospikenet.sdk import EvoSpikeNetAPIClient
# クライアント初期化(API サーバーが http://localhost:8000 で動作している前提)
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
# サーバーの応答確認と簡易呼び出し
health = client.health_check()
if health.get("status") == "healthy":
try:
result = client.generate("人工知能とは", max_length=128)
print(result.get('generated_text', result))
except Exception as e:
print('API 呼び出しでエラー:', e)
else:
print('サーバーに接続できませんでした。API サーバーが起動していることを確認してください。')
RAG v2 を最短で試す
/api/v2/rag/search、/api/v2/rag/feedback、/api/v2/rag/preprocessing/health が有効な API サーバーを前提に、最短で RAG v2 を確認する例です。
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
# 1. 前処理の readiness を確認
health = client.rag_v2_preprocessing_health()
print("ready:", health.get("ready"))
# 2. 検索を実行
search = client.rag_v2_search(
query="EV-2024-001 の進捗会議",
top_k=3,
return_debug_info=True,
)
print("session_id:", search.get("session_id"))
print("top result:", search.get("results", [])[:1])
# 3. フィードバックを返す
feedback = client.rag_v2_feedback(search["session_id"], rating=5)
print("feedback_id:", feedback.get("feedback_id"))
CLI で試す場合は examples/sdk/programs/rag_v2_sdk_demo.py を利用できます。
python examples/sdk/programs/rag_v2_sdk_demo.py --query "EV-2024-001 の進捗会議"
python examples/sdk/programs/rag_v2_sdk_demo.py --rating 5
よく使うパターン
1️⃣ シンプルなテキスト生成
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient()
result = client.generate("機械学習の応用例を5つ列挙してください")
print(result['generated_text'])
2️⃣ 複数プロンプトの処理
prompts = ["What is AI?", "Explain machine learning", "Deep learning basics"]
results = client.batch_generate(prompts, max_length=100)
for prompt, result in zip(prompts, results):
print(f"{prompt}: {result.get('generated_text', 'Failed')}")
3️⃣ 画像を含むマルチモーダル処理
response = client.submit_prompt(
prompt="この画像に写っているものは何ですか?",
image_path="./image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])
4️⃣ エラーハンドリング付き実行
# プロンプト検証
if client.validate_prompt("テストプロンプト"):
# 自動リトライ付きで実行
result = client.with_error_handling(
client.generate,
prompt="テストプロンプト",
max_length=100,
retries=3
)
if result:
print("成功:", result['generated_text'])
5️⃣ 非同期タスクの監視
# タスク送信
client.submit_prompt(prompt="複雑なタスク")
# 結果をポーリング
result = client.poll_for_result(timeout=120, interval=5)
if result:
print("結果:", result['response'])
else:
print("タイムアウト")
6️⃣ モデル保存と復元
import io
import torch
# セッション作成
session = client.create_log_session("モデル訓練実験")
session_id = session['session_id']
# モデル保存
model_buffer = io.BytesIO()
torch.save(model.state_dict(), model_buffer)
model_buffer.seek(0)
# アップロード
artifact = client.upload_artifact(
session_id=session_id,
artifact_type="model",
name="model.pth",
file=model_buffer,
)
# ダウンロード
client.download_artifact(
artifact_id=artifact['artifact_id'],
destination_path="./downloaded_model.pth",
)
7️⃣ ノード検出情報の取得
# ノード検出APIから状態を取得
health = client.node_discovery_health()
print("active nodes", health.get("summary", {}).get("active"))
topo = client.node_discovery_topology()
print("topology nodes", len(topo.get("nodes", [])))
8️⃣ ゲノム保存・適用(進化ダッシュボード連携)
# ゲノム管理 API の利用例(存在する場合)
if hasattr(client, 'list_genomes'):
genomes = client.list_genomes()
if genomes:
target = genomes[0].get('name')
genome = client.get_genome(target)
genome.setdefault('metadata', {})['note'] = 'edited via quickstart'
client.save_genome(target, genome, make_active=True)
client.apply_genome(target)
else:
print('この SDK ビルドではゲノム管理 API が利用できません。')
サンプル: examples/genome_management_sdk.py
9️⃣ データセットアップロード
import io
import os
import zipfile
# トレーニングデータディレクトリの準備
data_dir = "./training_data"
os.makedirs(f"{data_dir}/images", exist_ok=True)
# データセットをZIP圧縮
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.write(f"{data_dir}/captions.csv", arcname='captions.csv')
for root, _, files in os.walk(f"{data_dir}/images"):
for file in files:
full_path = os.path.join(root, file)
archive_name = os.path.join('images', os.path.relpath(full_path, f"{data_dir}/images"))
zf.write(full_path, arcname=archive_name)
zip_buffer.seek(0)
zip_buffer.name = "training_dataset.zip"
dataset_artifact = client.upload_artifact(
session_id=session_id,
artifact_type="dataset",
name="vision_training_data",
file=zip_buffer,
llm_type="SpikingEvoMultiModalLM",
)
print(f"データセットアップロード完了: {dataset_artifact['artifact_id']}")
10. AEG-Comm通信最適化 ⭐ NEW
# AEG-Comm設定(通信を85-93%削減)
client.set_aeg_comm_config(
node_id="brain_node_1",
enable_comm=True,
energy_threshold=10.0,
critical_modalities=["force", "safety"],
)
# 通常のプロンプト送信と組み合わせて利用
client.submit_prompt(prompt="ロボットの動作計画")
result = client.poll_for_result(timeout=120)
print(result.get('response'))
# 通信統計確認
stats = client.get_communication_stats("brain_node_1")
print(f"通信削減率: {stats.get('reduction_rate')}")
11. 分散Coordinator ⭐ NEW
client.init_coordinator(
node_id="coordinator_node_1",
zenoh_config={"connect": ["tcp/127.0.0.1:7447"]},
raft_config={"election_timeout": [5000, 10000]},
)
client.start_coordinator()
print("分散coordinatorが開始されました")
task_id = client.submit_coordination_task(
task_type="federated_learning",
payload={
"model": "resnet50",
"dataset": "cifar10",
"rounds": 10,
},
)
print(f"協調タスク送信: {task_id}")
status = client.get_coordination_task_status(task_id)
if status:
print(f"タスク状態: {status.get('status')}")
cluster_status = client.get_cluster_status()
print(f"クラスタ状態: {cluster_status}")
node_info = {
"address": "192.168.1.100",
"port": 8001,
"capabilities": ["gpu", "cpu"],
}
success = client.register_coordination_node("node_2", node_info)
print(f"ノード登録: {'成功' if success else '失敗'}")
client.stop_coordinator()
print("分散coordinatorが停止されました")
サーバー情報の確認
# サーバーヘルスチェック
health = client.health_check()
print(f"サーバーステータス: {health.get('status')}")
# サーバー情報
info = client.get_server_info()
print(f"サーバー情報: {info}")
# ステータス監視
status = client.get_simulation_status()
print(f"現在のプロンプトステータス: {status.get('last_prompt_status', 'N/A')}")
print(f"アクティブノード数: {len(status.get('nodes', []))}")
よくあるエラーと解決方法
| エラー | 原因 | 解決策 |
|---|---|---|
ConnectionError |
APIサーバーが起動していない | sudo ./scripts/run_api_server.sh で起動 |
Timeout |
処理が遅い | timeoutパラメータを増やす |
Invalid prompt |
プロンプトが条件を満たさない | validate_prompt()で事前確認 |
次のステップ
- 📖 完全なSDKドキュメントを読む
- 📁 サンプルコードを確認する
- 🔧 トラブルシューティングを参照する
- 💬 GitHub Issuesで質問する
高度な機能 (P3実装完了)
🔄 遅延監視と最適化
# 遅延統計の取得
latency_stats = client.get_latency_stats()
print(f"平均遅延: {latency_stats['mean']:.2f}ms")
print(f"95パーセンタイル: {latency_stats['p95']:.2f}ms")
# 遅延ターゲットの確認
target_met = client.check_latency_target(500.0) # 500ms目標
print(f"遅延目標達成: {target_met}")
💾 スナップショット/復旧
# システムスナップショット作成
snapshot_result = client.create_snapshot(
snapshot_name="backup_20251212",
include_models=True,
include_data=True
)
# スナップショット一覧
snapshots = client.list_snapshots()
# システム復旧
restore_result = client.restore_snapshot(
snapshot_path="/path/to/snapshot.gz",
restore_models=True,
restore_data=True
)
📊 スケーラビリティテスト
# スケーラビリティテスト実行
test_result = client.run_scalability_test(
max_nodes=1000,
test_duration=300.0,
load_pattern="linear"
)
# リソース使用状況取得
resources = client.get_resource_usage()
print(f"CPU使用率: {resources['cpu_usage']}%")
print(f"メモリ使用量: {resources['memory_usage']}MB")
🔧 ハードウェア最適化
# ハードウェア最適化(ONNX/量子化など)
optimization_result = client.optimize_model(
model_type="vision", # "vision" | "audio"
optimizations=["onnx", "quantize"]
)
# モデルベンチマーク
benchmark_result = client.benchmark_model(
model_type="vision",
num_runs=50
)
🛡️ 高可用性監視
# 可用性ステータス取得
availability = client.get_availability_status()
print(f"全体可用性: {availability['overall_availability']}%")
print(f"アップタイム: {availability['uptime_percentage']}%")
# ヘルスチェック実行
health_result = client.perform_health_check()
# 可用性統計取得
stats = client.get_availability_stats(time_window="24h")
🌐 非同期Zenoh通信
# Zenoh通信統計取得
zenoh_stats = client.get_zenoh_stats()
print(f"メッセージ数: {zenoh_stats['messages_sent']}")
print(f"平均遅延: {zenoh_stats['avg_latency']}ms")
⚖️ 分散コンセンサス
# コンセンサス提案
proposal_result = client.propose_consensus_decision(
decision_type="resource_allocation",
payload={"resource": "gpu", "amount": 50},
priority=1
)
# コンセンサス結果取得
result = client.get_consensus_result(proposal_result['proposal_id'])
# コンセンサス統計
consensus_stats = client.get_consensus_stats()
チートシート
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient()
# ヘルス/基本接続
client.health_check()
client.get_server_info()
# テキスト生成
client.generate(prompt)
client.batch_generate(prompts)
client.submit_prompt(prompt)
client.poll_for_result()
# RAG v2
client.rag_v2_preprocessing_health()
client.rag_v2_search("EV-2024-001 の進捗会議")
client.rag_v2_feedback("session_id", rating=5)
# 検証・制御
client.validate_prompt(prompt)
client.with_error_handling(func)
# ステータス・ログ
client.get_simulation_status()
client.get_simulation_result()
client.get_remote_log()
# アーティファクト管理
client.create_log_session("demo")
client.upload_artifact(...)
client.download_artifact("artifact_id", "./out.bin")
client.list_artifacts()
LLMトレーニングジョブの実行(利用可能なビルド向け)
Vision Encoderトレーニング
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient()
if hasattr(client, "submit_training_job") and hasattr(client, "get_training_status"):
job_data = {
"category": "Vision",
"model_name": "google/vit-base-patch16-224",
"dataset_path": "data/llm_training/Vision/vision_data.jsonl",
"output_dir": "saved_models/Vision/vision-training-run",
"gpu": True,
"epochs": 3,
"batch_size": 8,
"learning_rate": 0.00001,
}
response = client.submit_training_job(job_data)
print(f"トレーニングジョブを開始しました: {response['job_id']}")
status = client.get_training_status(response['job_id'])
print(f"ジョブステータス: {status['status']}")
else:
print("この SDK ビルドでは training job API は利用できません。")
Audio Encoderトレーニング
if hasattr(client, "submit_training_job"):
job_data = {
"category": "Audio",
"model_name": "openai/whisper-base",
"dataset_path": "data/llm_training/Audio/audio_data.jsonl",
"output_dir": "saved_models/Audio/audio-training-run",
"gpu": True,
"epochs": 3,
"batch_size": 8,
"learning_rate": 0.00001,
}
response = client.submit_training_job(job_data)
print(f"Audioトレーニングジョブを開始しました: {response['job_id']}")
else:
print("Audio training job API は利用できません。")
トレーニングジョブの監視
if hasattr(client, "list_training_jobs") and hasattr(client, "get_training_job_details"):
jobs = client.list_training_jobs()
for job in jobs:
print(f"ジョブID: {job['job_id']}, ステータス: {job['status']}, カテゴリ: {job['category']}")
job_details = client.get_training_job_details("vision_training_job_001")
print(f"ジョブ詳細: {job_details}")
else:
print("training job monitoring API は利用できません。")
分散脳ノード対応トレーニング
from evospikenet.node_types import NODE_TYPE_DEFINITIONS, node_type_for_rank
rank = 2
node_type = node_type_for_rank(rank)
default_llm = NODE_TYPE_DEFINITIONS[node_type]["llm"]
training_job = {
"category": node_type.capitalize(),
"model_name": default_llm,
"dataset_path": f"data/llm_training/{node_type}/{node_type}_data.jsonl",
"output_dir": f"saved_models/{node_type}/rank{rank}",
"gpu": True,
"epochs": 5,
"batch_size": 16,
"learning_rate": 0.00002,
"rank": rank,
}
if hasattr(client, "submit_training_job"):
client.submit_training_job(training_job)
else:
print("ノード対応 training job API は利用できません。")
分散coordinatorの guarded example
from evospikenet.sdk import EvoSpikeNetAPIClient
client = EvoSpikeNetAPIClient()
if hasattr(client, "init_coordinator") and hasattr(client, "submit_coordination_task"):
client.init_coordinator(
node_id="coordinator_node_1",
zenoh_config={"connect": ["tcp/127.0.0.1:7447"]},
raft_config={"election_timeout": [5000, 10000]},
)
client.start_coordinator()
task_id = client.submit_coordination_task(
"federated_learning",
{
"model": "resnet50",
"dataset": "cifar10",
"epochs": 10,
},
)
print(f"タスク送信完了: {task_id}")
status = client.get_coordination_task_status(task_id)
print(f"タスク状態: {status}")
cluster_status = client.get_cluster_status()
print(f"クラスタ状態: {cluster_status}")
client.stop_coordinator()
else:
print("この SDK ビルドでは coordinator API は利用できません。")
Happy coding! 🚀