Skip to content

EvoSpikeNet SDK Quick Start Guide

Author: Masahiro Aoki

Last updated: May 22, 2026

Get started with the EvoSpikeNet SDK in 30 seconds

Purpose and use of this document

  • Purpose: To show the steps to set up the SDK in the shortest possible time and run the API client.
  • Target audience: Developers who are starting to use the SDK.
  • Recommended reading order: Installation → API server startup → Minimal usage example.
  • Related links: the distributed brain script examples/run_zenoh_distributed_brain.py serves as the environment for verifying operation; PFC/Zenoh/Executive details are in implementation/PFC_ZENOH_EXECUTIVE.md.
  • Implementation notes (artifacts): see docs/implementation/ARTIFACT_MANIFESTS.md for artifact_manifest.json and the CLI flag specifications (--artifact-name / --precision / --quantize / --privacy-level / --node-type).

Installation

pip install -e .

Start API server

sudo ./scripts/run_api_server.sh

Minimal usage example

from evospikenet.sdk import EvoSpikeNetAPIClient

# Point the client at the API server (expected at http://localhost:8000).
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")

# Probe the server first; only send a generation request when it reports
# itself as healthy.
health = client.health_check()
server_is_healthy = health.get("status") == "healthy"
if not server_is_healthy:
    print('Could not connect to the server. Confirm that the API server is running.')
else:
    try:
        result = client.generate("人工知能とは", max_length=128)
        # Fall back to printing the whole response when no generated text is present.
        print(result.get('generated_text', result))
    except Exception as e:
        print('API call failed:', e)

Try RAG v2 in the fewest steps

This example assumes an API server that exposes /api/v2/rag/search, /api/v2/rag/feedback, and /api/v2/rag/preprocessing/health.

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")

# 1. Check preprocessing readiness
health = client.rag_v2_preprocessing_health()
print("ready:", health.get("ready"))

# 2. Run a search
search = client.rag_v2_search(
    query="EV-2024-001 の進捗会議",
    top_k=3,
    return_debug_info=True,
)
# Read the session id once so the print and the feedback call agree on it.
rag_session_id = search.get("session_id")
print("session_id:", rag_session_id)
print("top result:", search.get("results", [])[:1])

# 3. Send feedback (only possible when the search returned a session id;
#    indexing search["session_id"] directly would raise KeyError otherwise)
if rag_session_id:
    feedback = client.rag_v2_feedback(rag_session_id, rating=5)
    print("feedback_id:", feedback.get("feedback_id"))
else:
    print("No session_id returned by the search; skipping feedback.")

For a CLI run, use examples/sdk/programs/rag_v2_sdk_demo.py.

python examples/sdk/programs/rag_v2_sdk_demo.py --query "EV-2024-001 の進捗会議"
python examples/sdk/programs/rag_v2_sdk_demo.py --rating 5

Frequently used patterns

1️⃣ Simple text generation

from evospikenet.sdk import EvoSpikeNetAPIClient

# No base_url is passed here, so the client's default is used.
client = EvoSpikeNetAPIClient()

# Generate text for a single prompt and print only the generated text field.
result = client.generate(
    "機械学習の応用例を5つ列挙してください",
)
print(result['generated_text'])

2️⃣ Handling multiple prompts

# Several prompts are sent in a single batch call.
prompts = [
    "What is AI?",
    "Explain machine learning",
    "Deep learning basics",
]
results = client.batch_generate(prompts, max_length=100)

# Pair each prompt with its result; entries without generated text print 'Failed'.
for prompt, result in zip(prompts, results):
    print(f"{prompt}: {result.get('generated_text', 'Failed')}")

3️⃣ Multimodal processing including images

# Submit the prompt together with a local image file.
client.submit_prompt(
    prompt="この画像に写っているものは何ですか?",
    image_path="./image.jpg"
)

# Poll for the result. As in the "Monitoring asynchronous tasks" example,
# a falsy value is treated as a timeout instead of being indexed.
result = client.poll_for_result(timeout=60)
if result:
    print(result['response'])
else:
    print("タイムアウト")

4️⃣ Execution with error handling

# Validate the prompt first, then run generation through the client's
# error-handling wrapper with retries.
test_prompt = "テストプロンプト"
if client.validate_prompt(test_prompt):
    result = client.with_error_handling(
        client.generate,
        prompt=test_prompt,
        max_length=100,
        retries=3
    )
    # A falsy result means there is nothing to print.
    if result:
        print("成功:", result['generated_text'])

5️⃣ Monitoring asynchronous tasks

# Submit the task; the result is fetched separately by polling.
client.submit_prompt(prompt="複雑なタスク")

# Poll every 5 seconds for up to 120 seconds.
result = client.poll_for_result(timeout=120, interval=5)

# A falsy result is reported as a timeout.
if not result:
    print("タイムアウト")
else:
    print("結果:", result['response'])

6️⃣ Model save and restore

import io
import torch

# `model` must be the torch.nn.Module you want to store. A tiny stand-in is
# created here only so the snippet runs as written; replace it with your own
# trained model.
model = torch.nn.Linear(4, 2)

# Create the log session whose id is passed to the upload call.
session = client.create_log_session("model training experiment")
session_id = session['session_id']

# Serialize the model weights into an in-memory buffer.
model_buffer = io.BytesIO()
torch.save(model.state_dict(), model_buffer)
model_buffer.seek(0)  # rewind so the upload reads from the beginning

# Upload
artifact = client.upload_artifact(
    session_id=session_id,
    artifact_type="model",
    name="model.pth",
    file=model_buffer,
)

# Download
client.download_artifact(
    artifact_id=artifact['artifact_id'],
    destination_path="./downloaded_model.pth",
)

7️⃣ Get node discovery information

# Node discovery health: report the "active" entry of the summary, if any.
health = client.node_discovery_health()
summary = health.get("summary", {})
print("active nodes", summary.get("active"))

# Node discovery topology: report how many nodes are listed.
topo = client.node_discovery_topology()
topology_nodes = topo.get("nodes", [])
print("topology nodes", len(topology_nodes))

8️⃣ Save and apply a genome

# The genome API is optional, so check for it before using it.
if not hasattr(client, 'list_genomes'):
    print('The genome management API is not available in this SDK build.')
else:
    genomes = client.list_genomes()
    if genomes:
        # Work on the first listed genome: fetch it, annotate its metadata,
        # save it as the active genome, then apply it.
        target = genomes[0].get('name')
        genome = client.get_genome(target)
        metadata = genome.setdefault('metadata', {})
        metadata['note'] = 'edited via quickstart'
        client.save_genome(target, genome, make_active=True)
        client.apply_genome(target)

Sample: examples/genome_management_sdk.py

9️⃣ Dataset upload

import io
import os
import zipfile

data_dir = "./training_data"
images_dir = f"{data_dir}/images"
os.makedirs(images_dir, exist_ok=True)

# Build the ZIP archive entirely in memory.
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
    # captions.csv is read from data_dir and stored at the archive root.
    archive.write(f"{data_dir}/captions.csv", arcname='captions.csv')
    # Every file under the images directory keeps its relative path below "images".
    for folder, _, filenames in os.walk(images_dir):
        for filename in filenames:
            source_path = os.path.join(folder, filename)
            relative_path = os.path.relpath(source_path, images_dir)
            archive.write(source_path, arcname=os.path.join('images', relative_path))

# Rewind the buffer and give it a file name before handing it to the upload call.
zip_buffer.seek(0)
zip_buffer.name = "training_dataset.zip"

# NOTE: session_id is expected to be defined already (for example by the
# create_log_session call in the model save example).
dataset_artifact = client.upload_artifact(
    session_id=session_id,
    artifact_type="dataset",
    name="vision_training_data",
    file=zip_buffer,
    llm_type="SpikingEvoMultiModalLM",
)

print(f"Dataset upload completed: {dataset_artifact['artifact_id']}")

10. AEG-Comm communication optimization ⭐ NEW

# Configure AEG-Comm for one node.
client.set_aeg_comm_config(
    node_id="brain_node_1",
    enable_comm=True,
    energy_threshold=10.0,
    critical_modalities=["force", "safety"],
)

# Submit a prompt and wait for its result. As in the "Monitoring asynchronous
# tasks" example, a falsy poll result is treated as a timeout rather than
# being dereferenced.
client.submit_prompt(prompt="ロボットの動作計画")
result = client.poll_for_result(timeout=120)
if result:
    print(result.get('response'))
else:
    print("タイムアウト")

# Communication statistics for the same node.
stats = client.get_communication_stats("brain_node_1")
print(f"reduction_rate: {stats.get('reduction_rate')}")

11. Distributed Coordinator ⭐ NEW

# Initialize the coordinator with its Zenoh and Raft settings.
client.init_coordinator(
    node_id="coordinator_node_1",
    zenoh_config={"connect": ["tcp/127.0.0.1:7447"]},
    raft_config={"election_timeout": [5000, 10000]},
)

client.start_coordinator()
print("Distributed coordinator started")

try:
    # Submit a coordination task and look up its status.
    task_id = client.submit_coordination_task(
        task_type="federated_learning",
        payload={
            "model": "resnet50",
            "dataset": "cifar10",
            "rounds": 10,
        },
    )
    print(f"Task submitted: {task_id}")

    status = client.get_coordination_task_status(task_id)
    if status:
        print(f"Task status: {status.get('status')}")

    cluster_status = client.get_cluster_status()
    print(f"Cluster status: {cluster_status}")

    # Register an additional node with the coordinator.
    node_info = {
        "address": "192.168.1.100",
        "port": 8001,
        "capabilities": ["gpu", "cpu"],
    }
    success = client.register_coordination_node("node_2", node_info)
    print(f"Node registration: {'ok' if success else 'failed'}")
finally:
    # Always stop the coordinator, even if one of the calls above raises,
    # so a started coordinator is not left running.
    client.stop_coordinator()
    print("Distributed coordinator stopped")

Check server information

# 1) Health: print the reported status field.
health = client.health_check()
print(f"server status: {health.get('status')}")

# 2) General server information, printed as returned.
info = client.get_server_info()
print(f"server info: {info}")

# 3) Simulation status: last prompt status (or 'N/A') and the number of nodes.
status = client.get_simulation_status()
print(f"last prompt status: {status.get('last_prompt_status', 'N/A')}")
node_entries = status.get('nodes', [])
print(f"active node count: {len(node_entries)}")

Common errors and solutions

| Error | Cause | Solution |
| --- | --- | --- |
| ConnectionError | API server is not started | Start with sudo ./scripts/run_api_server.sh |
| Timeout | Processing is slow | Increase the timeout parameter |
| Invalid prompt | Prompt does not meet the conditions | Pre-check with validate_prompt() |

Next steps


Advanced features (P3 implementation complete)

🔄 Latency monitoring and optimization

# Fetch latency statistics and print the mean and 95th-percentile values.
latency_stats = client.get_latency_stats()
print(f"平均遅延: {latency_stats['mean']:.2f}ms")
print(f"95パーセンタイル: {latency_stats['p95']:.2f}ms")

# Check whether the latency target (500 ms here) is met.
latency_target_ms = 500.0
target_met = client.check_latency_target(latency_target_ms)
print(f"遅延目標達成: {target_met}")

💾 Snapshot/Recovery

# Take a snapshot that includes both models and data.
snapshot_result = client.create_snapshot(
    snapshot_name="backup_20251212", include_models=True, include_data=True
)

# List the snapshots.
snapshots = client.list_snapshots()

# Restore models and data from a snapshot file.
restore_result = client.restore_snapshot(
    snapshot_path="/path/to/snapshot.gz", restore_models=True, restore_data=True
)

📊 Scalability test

# Run a scalability test with the given node limit, duration and load pattern.
test_result = client.run_scalability_test(
    max_nodes=1000, test_duration=300.0, load_pattern="linear"
)

# Query resource usage and print the CPU and memory figures.
resources = client.get_resource_usage()
print(f"CPU使用率: {resources['cpu_usage']}%")
print(f"メモリ使用量: {resources['memory_usage']}MB")

🔧 Hardware optimization

# Request ONNX and quantization optimizations for the vision model.
# model_type accepts "vision" or "audio".
requested_optimizations = ["onnx", "quantize"]
optimization_result = client.optimize_model(
    model_type="vision",
    optimizations=requested_optimizations,
)

# Benchmark the same model type over 50 runs.
benchmark_result = client.benchmark_model(model_type="vision", num_runs=50)

🛡️ High Availability Monitoring

# Current availability status: overall availability and uptime percentage.
availability = client.get_availability_status()
print(f"全体可用性: {availability['overall_availability']}%")
print(f"アップタイム: {availability['uptime_percentage']}%")

# Trigger a health check.
health_result = client.perform_health_check()

# Availability statistics for a "24h" time window.
stats_window = "24h"
stats = client.get_availability_stats(time_window=stats_window)

🌐 Asynchronous Zenoh communication

# Fetch Zenoh communication statistics, then print the sent-message count
# and the average latency.
zenoh_stats = client.get_zenoh_stats()

print(f"メッセージ数: {zenoh_stats['messages_sent']}")
print(f"平均遅延: {zenoh_stats['avg_latency']}ms")

⚖️ Distributed consensus

# Propose a decision to the consensus layer.
proposal_payload = {"resource": "gpu", "amount": 50}
proposal_result = client.propose_consensus_decision(
    decision_type="resource_allocation",
    payload=proposal_payload,
    priority=1,
)

# Look up the outcome using the proposal id returned above.
proposal_id = proposal_result['proposal_id']
result = client.get_consensus_result(proposal_id)

# Consensus statistics.
consensus_stats = client.get_consensus_stats()

Cheat Sheet

# Quick reference of client calls used in this guide.
# NOTE: `prompt`, `prompts` and `func` are not defined in this listing; they
# stand for your own values. The `...` in upload_artifact(...) stands for the
# arguments shown in the artifact examples.
from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()

# Health / basic connectivity
client.health_check()
client.get_server_info()

# Text generation
client.generate(prompt)
client.batch_generate(prompts)
client.submit_prompt(prompt)
client.poll_for_result()

# RAG v2
client.rag_v2_preprocessing_health()
client.rag_v2_search("EV-2024-001 の進捗会議")
client.rag_v2_feedback("session_id", rating=5)

# Validation / control
client.validate_prompt(prompt)
client.with_error_handling(func)

# Status / logs
client.get_simulation_status()
client.get_simulation_result()
client.get_remote_log()

# Artifact management
client.create_log_session("demo")
client.upload_artifact(...)
client.download_artifact("artifact_id", "./out.bin")
client.list_artifacts()

Run LLM training jobs (for builds that expose the API)

Vision Encoder Training

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()

# The training job API is optional; both methods must exist on this build.
training_api_available = (
    hasattr(client, "submit_training_job")
    and hasattr(client, "get_training_status")
)

if not training_api_available:
    print("This SDK build does not expose the training job API.")
else:
    # Job description for a Vision training run.
    job_data = {
        "category": "Vision",
        "model_name": "google/vit-base-patch16-224",
        "dataset_path": "data/llm_training/Vision/vision_data.jsonl",
        "output_dir": "saved_models/Vision/vision-training-run",
        "gpu": True,
        "epochs": 3,
        "batch_size": 8,
        "learning_rate": 0.00001,
    }

    # Submit the job, then query its status by the returned job id.
    response = client.submit_training_job(job_data)
    print(f"Started training job: {response['job_id']}")

    status = client.get_training_status(response['job_id'])
    print(f"Job status: {status['status']}")

Audio Encoder Training

# Submit an Audio training job when this SDK build offers the API.
if not hasattr(client, "submit_training_job"):
    print("Audio training job API is unavailable in this build.")
else:
    job_data = {
        "category": "Audio",
        "model_name": "openai/whisper-base",
        "dataset_path": "data/llm_training/Audio/audio_data.jsonl",
        "output_dir": "saved_models/Audio/audio-training-run",
        "gpu": True,
        "epochs": 3,
        "batch_size": 8,
        "learning_rate": 0.00001,
    }
    response = client.submit_training_job(job_data)
    print(f"Started audio training job: {response['job_id']}")

Monitoring training jobs

# Both monitoring methods must be present on this SDK build.
monitoring_available = (
    hasattr(client, "list_training_jobs")
    and hasattr(client, "get_training_job_details")
)

if not monitoring_available:
    print("Training job monitoring APIs are unavailable in this build.")
else:
    # One summary line per listed job.
    jobs = client.list_training_jobs()
    for job in jobs:
        print(f"job_id={job['job_id']} status={job['status']} category={job['category']}")

    # Details for one job, looked up by a fixed example id.
    job_details = client.get_training_job_details("vision_training_job_001")
    print(job_details)

Training for distributed brain nodes

from evospikenet.node_types import NODE_TYPE_DEFINITIONS, node_type_for_rank

# Look up the node type for a rank and the "llm" entry defined for that type.
rank = 2
node_type = node_type_for_rank(rank)
default_llm = NODE_TYPE_DEFINITIONS[node_type]["llm"]

# Dataset and output locations are derived from the node type and rank.
dataset_path = f"data/llm_training/{node_type}/{node_type}_data.jsonl"
output_dir = f"saved_models/{node_type}/rank{rank}"

training_job = {
    "category": node_type.capitalize(),
    "model_name": default_llm,
    "dataset_path": dataset_path,
    "output_dir": output_dir,
    "gpu": True,
    "epochs": 5,
    "batch_size": 16,
    "learning_rate": 0.00002,
    "rank": rank,
}

# Submit only when this SDK build offers the training API.
if not hasattr(client, "submit_training_job"):
    print("Node-aware training API is unavailable in this build.")
else:
    client.submit_training_job(training_job)

Guarded distributed coordinator example

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()

# The coordinator API is optional; check for it before using it.
if hasattr(client, "init_coordinator") and hasattr(client, "submit_coordination_task"):
    client.init_coordinator(
        node_id="coordinator_node_1",
        zenoh_config={"connect": ["tcp/127.0.0.1:7447"]},
        raft_config={"election_timeout": [5000, 10000]},
    )
    client.start_coordinator()

    try:
        # Submit a task, then print its status and the cluster status.
        task_id = client.submit_coordination_task(
            "federated_learning",
            {
                "model": "resnet50",
                "dataset": "cifar10",
                "epochs": 10,
            },
        )
        print(f"Task submitted: {task_id}")

        status = client.get_coordination_task_status(task_id)
        print(f"Task status: {status}")

        cluster_status = client.get_cluster_status()
        print(f"Cluster status: {cluster_status}")
    finally:
        # Always stop the coordinator, even if one of the calls above raises,
        # so a started coordinator is not left running.
        client.stop_coordinator()
else:
    print("This SDK build does not expose the coordinator API.")

Happy coding! 🚀