EvoSpikeNet SDK Quick Start Guide
Copyright: 2026 Moonlight Technologies Inc. All Rights Reserved.
Author: Masahiro Aoki
Last updated: January 8, 2026 Get started with EvoSpikeNet SDK in 30 seconds
Purpose and use of this document
- Purpose: To show the steps to set up the SDK in the shortest possible time and run the API client.
- Target audience: Developers who are starting to use the SDK.
- First reading order: Installation → API server startup → Minimal usage example.
- Related links: Distributed brain script is
examples/run_zenoh_distributed_brain.py(as the operation confirmation environment), PFC/Zenoh/Executive details are implementation/PFC_ZENOH_EXECUTIVE.md. - Implementation notes (artifacts):
docs/implementation/ARTIFACT_MANIFESTS.md— Seeartifact_manifest.jsonand CLI flag specifications (--artifact-name/--precision/--quantize/--privacy-level/--node-type).
install
pip install -e .
Start API server
sudo ./scripts/run_api_server.sh
Minimal usage example
from evospikenet.sdk import EvoSpikeNetAPIClient
# Client initialization (assuming the API server is running at http://localhost:8000)
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
# Server response confirmation and simple call
if client.wait_for_server(timeout=10):
try:
result = client.generate("人工知能とは", max_length=128)
print(result.get('generated_text', result))
except Exception as e:
print('API 呼び出しでエラー:', e)
else:
print('サーバーに接続できませんでした。API サーバーが起動していることを確認してください。')
Frequently used patterns
1️⃣ Simple text generation
<!-- TODO: update or remove - import fail<!-- Remember: Automatic conversion not possible — please fix manually -->eNetAPIClient -->
client = EvoSpikeNetAPIClient()
result = client.generate("機械学習の応用例を5つ列挙してください")
print(result['generated_text'])
2️⃣ Handling multiple prompts
prompts = ["What is AI?", "Explain machine learning", "Deep learning basics"]
results = client.batch_generate(prompts, max_length=100)
for prompt, result in zip(prompts, results):
print(f"{prompt}: {result.get('generated_text', 'Failed')}")
3️⃣ Multimodal processing including images
response = client.submit_prompt(
prompt="この画像に写っているものは何ですか?",
image_path="./image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])
4️⃣ Execution with error handling
# prompt validation
if client.validate_prompt("テストプロンプト"):
# Run with automatic retry
result = client.with_error_handling(
client.generate,
prompt="テストプロンプト",
max_length=100,
retries=3
)
if result:
print("成功:", result['generated_text'])
5️⃣ Monitoring asynchronous tasks
# Send task
client.submit_prompt(prompt="複雑なタスク")
# poll for results
result = client.poll_for_result(timeout=120, interval=5)
if result:
print("結果:", result['response'])
else:
print("タイムアウト")
6️⃣ Model save and restore
7️⃣ Get node discovery information
# Get status from node discovery API
health = client.node_discovery_health()
print("active nodes", health.get("summary", {}).get("active"))
topo = client.node_discovery_topology()
print("topology nodes", len(topo.get("nodes", [])))
import torch
import io
# Create session
session = client.create_log_session("モデル訓練実験")
session_id = session['session_id']
# save model
model_buffer = io.BytesIO()
torch.save(model.state_dict(), model_buffer)
model_buffer.seek(0)
# upload
artifact = client.upload_artifact(
session_id=session_id,
artifact_type="model",
name="model.pth",
file=model_buffer
)
# download
client.download_artifact(
artifact_id=artifact['artifact_id'],
destination_path="./downloaded_model.pth"
)
### 7️⃣ Genome preservation and application (evolution dashboard linkage)
```python
# Example of using genome management API (if available)
if hasattr(client, 'list_genomes'):
genomes = client.list_genomes()
if genomes:
target = genomes[0].get('name')
genome = client.get_genome(target)
genome.setdefault('metadata', {})['note'] = 'edited via quickstart'
client.save_genome(target, genome, make_active=True)
client.apply_genome(target)
else:
# Safely guard against SDK version differences
print('The Genome Management API is not available in this SDK build.')```
サンプル: `examples/genome_management_sdk.py`
7️⃣ Dataset upload
import zipfile
import os
# Preparing the training data directory
data_dir = "./training_data"
os.makedirs(f"{data_dir}/images", exist_ok=True)
# ZIP the dataset
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
# Add CSV file
zf.write(f"{data_dir}/captions.csv", arcname='captions.csv')
# Add image file
for root, _, files in os.walk(f"{data_dir}/images"):
for file in files:
full_path = os.path.join(root, file)
archive_name = os.path.join('images', os.path.relpath(full_path, f"{data_dir}/images"))
zf.write(full_path, arcname=archive_name)
zip_buffer.seek(0)
zip_buffer.name = "training_dataset.zip"
# Dataset upload
dataset_artifact = client.upload_artifact(
session_id=session_id,
artifact_type="dataset",
name="vision_training_data",
file=zip_buffer,
llm_type="SpikingEvoMultiModalLM"
)
print(f"データセットアップロード完了: {dataset_artifact['artifact_id']}")
8️⃣ AEG-Comm communication optimization ⭐ NEW
# AEG-Comm settings (85-93% reduction in communication)
client.set_aeg_comm_config(
node_id="brain_node_1",
enable_comm=True,
energy_threshold=10.0,
critical_modalities=["force", "safety"]
)
# Distributed brain simulation execution (optimization enabled)
result = client.run_distributed_brain_simulation(
query="ロボットの動作計画",
modalities=["text", "force"],
config={"enable_aeg_comm": True}
)
# Check communication statistics
stats = client.get_communication_stats("brain_node_1")
print(f"通信削減率: {stats['reduction_rate']}%")
9️⃣ Distributed Coordinator ⭐ NEW
# Initializing the distributed coordinator
client.init_coordinator(
node_id="coordinator_node_1",
zenoh_config={"connect": ["tcp/127.0.0.1:7447"]},
raft_config={"election_timeout": [5000, 10000]}
)
# Start Coordinator
client.start_coordinator()
print("分散coordinatorが開始されました")
# Submit a collaborative task
task_id = client.submit_coordination_task(
task_type="federated_learning",
payload={
"model": "resnet50",
"dataset": "cifar10",
"rounds": 10
}
)
print(f"協調タスク送信: {task_id}")
# Check task status
status = client.get_coordination_task_status(task_id)
if status:
print(f"タスク状態: {status['status']}")
# Get cluster status
cluster_status = client.get_cluster_status()
print(f"リーダー: {cluster_status['leader_id']}")
print(f"アクティブノード数: {len(cluster_status['nodes'])}")
# Registering a new node
node_info = {
"address": "192.168.1.100",
"port": 8001,
"capabilities": ["gpu", "cpu"]
}
success = client.register_coordination_node("node_2", node_info)
print(f"ノード登録: {'成功' if success else '失敗'}")
# Stopping the Coordinator
client.stop_coordinator()
print("分散coordinatorが停止されました")
> 内部タスク実装(簡易版): `federated_learning` は updates の数値平均、`distributed_inference` は inputs/batches をそのまま完了扱い、`model_aggregation` は weights を平均化して返却します。ノード発見は `/nodes/list` 応答を取り込みつつ、古いハートビートのノードは自動クリーンアップされます。
Check server information
# Server health check
is_healthy = client.is_server_healthy()
print(f"サーバーは正常ですか?: {'はい' if is_healthy else 'いいえ'}")
# status monitoring
status = client.get_simulation_status()
print(f"現在のプロンプトステータス: {status.get('last_prompt_status', 'N/A')}")
print(f"アクティブノード数: {len(status.get('nodes', []))}")
Common errors and solutions
| Error | Cause | Solution |
|---|---|---|
ConnectionError |
API server is not started | Start with sudo ./scripts/run_api_server.sh |
Timeout |
Processing is slow | Increase the timeout parameter |
Invalid prompt |
Prompt does not meet the conditions | Pre-check with validate_prompt() |
Next steps
- 📖 Read Complete SDK Documentation
- 📁 Check sample code
- 🔧 Refer to Troubleshooting
- 💬 Ask a question on GitHub Issues
Advanced features (P3 implementation complete)
🔄 Latency monitoring and optimization
# Get delay statistics
latency_stats = client.get_latency_stats()
print(f"平均遅延: {latency_stats['mean']:.2f}ms")
print(f"95パーセンタイル: {latency_stats['p95']:.2f}ms")
# Checking the delay target
target_met = client.check_latency_target(500.0) # 500ms target
print(f"遅延目標達成: {target_met}")
💾 Snapshot/Recovery
# Create system snapshot
snapshot_result = client.create_snapshot(
snapshot_name="backup_20251212",
include_models=True,
include_data=True
)
# Snapshot list
snapshots = client.list_snapshots()
# system recovery
restore_result = client.restore_snapshot(
snapshot_path="/path/to/snapshot.gz",
restore_models=True,
restore_data=True
)
📊 Scalability test
# Scalability test execution
test_result = client.run_scalability_test(
max_nodes=1000,
test_duration=300.0,
load_pattern="linear"
)
# Get resource usage status
resources = client.get_resource_usage()
print(f"CPU使用率: {resources['cpu_usage']}%")
print(f"メモリ使用量: {resources['memory_usage']}MB")
🔧 Hardware optimization
# Hardware optimization (ONNX/quantization, etc.)
optimization_result = client.optimize_model(
model_type="vision", # "vision" | "audio"
optimizations=["onnx", "quantize"]
)
# model benchmark
benchmark_result = client.benchmark_model(
model_type="vision",
num_runs=50
)
🛡️ High Availability Monitoring
# Get availability status
availability = client.get_availability_status()
print(f"全体可用性: {availability['overall_availability']}%")
print(f"アップタイム: {availability['uptime_percentage']}%")
# Run health check
health_result = client.perform_health_check()
# Get availability statistics
stats = client.get_availability_stats(time_window="24h")
🌐 Asynchronous Zenoh communication
# Zenoh communication statistics acquisition
zenoh_stats = client.get_zenoh_stats()
print(f"メッセージ数: {zenoh_stats['messages_sent']}")
print(f"平均遅延: {zenoh_stats['avg_latency']}ms")
⚖️ Distributed consensus
# consensus proposal
proposal_result = client.propose_consensus_decision(
decision_type="resource_allocation",
payload={"resource": "gpu", "amount": 50},
priority=1
)
# Get consensus results
result = client.get_consensus_result(proposal_result['proposal_id'])
# consensus statistics
consensus_stats = client.get_consensus_stats()
Cheat Sheet
<!-- TODO: update<!-- モジュール 'evospikenet' が見つかりません。パッケージ内の移動/名前変更を確認してください -->kenet.sdk import EvoSpikeNetr() # Waiting for startup
client.is_server_healthy() # health check
# text generation
client.generate(prompt) # simple generation
client.batch_generate(prompts) # Batch processing
client.submit_prompt(prompt) # Asynchronous sending
client.poll_for_result() # Waiting for results
# Verification/control
client.validate_prompt(prompt) # prompt validation
client.with_error_handling(func) # Execute with retry
# status log
client.get_simulation_status() # Get status
client.get_simulation_result() # Get results
client.get_remote_log() # Log acquisition
# Artifact management
client.create_log_session() # Create session
client.upload_artifact() # upload
client.download_artifact() # download
client.list_artifacts() # List display
6️⃣ Run LLM training jobs (new feature)
Vision Encoder Training
<!-- TODO: update or remove - impo<!-- Module 'evospikenet' not found. Please check the move/rename in the package -->EvoSpikeNetAPIClient -->
client = EvoSpikeNetAPICli<!-- Remember: Cannot convert automatically — please fix manually --> "model_name": "google/vit-base-patch16-224",
"dataset_path": "data/llm_training/Vision/vision_data.jsonl",
"output_dir": "saved_models/Vision/vision-training-run",
"gpu": True,
"epochs": 3,
"batch_size": 8,
"learning_rate": 0.00001
}
response = client.submit_training_job(job_data)
print(f"トレーニングジョブを開始しました: {response['job_id']}")
# Check job status
status = client.get_training_status(response['job_id'])
print(f"ジョブステータス: {status['status']}")
Audio Encoder Training
# Submitting an Audio Encoder training job
job_data = {
"category": "Audio",
"model_name": "openai/whisper-base",
"dataset_path": "data/llm_training/Audio/audio_data.jsonl",
"output_dir": "saved_models/Audio/audio-training-run",
"gpu": True,
"epochs": 3,
"batch_size": 8,
"learning_rate": 0.00001
}
response = client.submit_training_job(job_data)
print(f"Audioトレーニングジョブを開始しました: {response['job_id']}")
Monitoring training jobs
# Get list of all training jobs
jobs = client.list_training_jobs()
for job in jobs:
print(f"ジョブID: {job['job_id']}, ステータス: {job['status']}, カテゴリ: {job['category']}")
# Get specific job details
job_details = client.get_training_job_details("vision_training_job_001")
print(f"ジョブ詳細: {job_details}")
Distributed brain node support training
# Example of retrieving node type definition from shared module
from evospikenet.node_types import node_type_for_rank, NODE_TYPE_DEFINITIONS
# Rank → type conversion example
rank = 2
nt = node_type_for_rank(rank) # For example "vision"
# Default model associated with node type
default_llm = NODE_TYPE_DEFINITIONS[nt]["llm"]
# Create training job
training_job = {
"category": nt.capitalize(),
"model_name": default_llm,
"dataset_path": f"data/llm_training/{nt}/{nt}_data.jsonl",
"output_dir": f"saved_models/{nt}/rank{rank}",
"gpu": True,
"epochs": 5,
"batch_size": 16,
"learning_rate": 0.00002,
# Rank can also be specified
"rank": rank,
}
client.submit_training_job(training_job)
```### 6️⃣ Using a distributed coordinator
```python
# Guarded example: initialize API client and coordinator-related calls
try:
from evospikenet.sdk import EvoSpikeNetAPIClient
except Exception:
EvoSpikeNetAPIClient = None
if EvoSpikeNetAPIClient is not None:
client = EvoSpikeNetAPIClient()
# Coordinator initialization (if provided via API)
if hasattr(client, "init_coordinator"):
client.init_coordinator()
print("Coordinator started")
# Submitting a task (depending on the API present)
if hasattr(client, "submit_coordination_task"):
task_id = client.submit_coordination_task(
"federated_learning",
{
"model": "resnet50",
}
)
else:
print("EvoSpikeNetAPIClient not available in this environment; skip coordinator example")
"dataset": "cifar10",
"epochs": 10
}
)
print(f"タスク送信完了: {task_id}")
# Check task status
status = client.get_coordination_task_status(task_id)
if status:
print(f"タスク状態: {status['status']}")
# Get cluster status
cluster_status = client.get_cluster_status()
print(f"アクティブノード数: {cluster_status['active_nodes']}")
# Registering a new node
node_info = {
"address": "192.168.1.100",
"port": 8001,
"capabilities": ["gpu", "cpu"]
}
client.register_coordination_node("node_2", node_info)
# Stopping the Coordinator
client.stop_coordinator()
print("分散coordinatorが停止されました")
Happy coding! 🚀