# EvoSpikeNet Python SDK documentation
Copyright: 2026 Moonlight Technologies Inc.
Author: Masahiro Aoki
Implementation notes (artifacts): see docs/implementation/ARTIFACT_MANIFESTS.md for the artifact_manifest.json output by the training script and the recommended CLI flags.
Last updated: December 12, 2025
## Purpose and use of this document
- Purpose: Understand the overall picture and main functions of the SDK, and guide you through the steps from setup to usage.
- Target audience: Developers who are starting to use the SDK, and those in charge of API collaboration.
- First reading order: 1. Overview → 2. Setup and Installation → 3. Quick Start/Sample Code.
- Related links: the distributed brain script is examples/run_zenoh_distributed_brain.py (an example operating environment); PFC/Zenoh/Executive details are in implementation/PFC_ZENOH_EXECUTIVE.md.
## 1. Overview
The EvoSpikeNet Python SDK is a client library that provides a high-level interface for interacting with the EvoSpikeNet API. This SDK allows developers to easily integrate EvoSpikeNet's text generation, data logging, and distributed brain simulation capabilities into their applications with a few lines of Python code, without having to worry about the details of HTTP requests.
### New integration features

- 🔄 Latency monitoring: `get_latency_stats()`, `check_latency_target()`
- 💾 Snapshot management: `create_snapshot()`, `restore_snapshot()`, `list_snapshots()`, `delete_snapshot()`, `validate_snapshot()`, `cleanup_snapshots()`
- 📊 Scalability testing: `run_scalability_test()`, `test_node_scalability()`, `run_stress_test()`, `get_resource_usage()`, `get_system_limits()`
- 🔧 Hardware optimization: `optimize_model()`, `benchmark_model()`, `get_hardware_info()`
- 🛡️ High availability monitoring: `get_availability_status()`, `get_availability_stats()`, `perform_health_check()`, `trigger_recovery_action()`
- 🌐 Asynchronous Zenoh communication: `connect_zenoh()`, `publish_zenoh_message()`, `send_zenoh_request()`, `send_zenoh_notification()`, `get_zenoh_stats()`
- ⚖️ Distributed consensus: `propose_consensus_decision()`, `get_consensus_result()`, `update_node_status()`, `cleanup_consensus()`
### SDK availability metrics

- API compatibility: supports over 25 new endpoints
- Error handling: comprehensive exception handling and retry mechanism
- Performance: < 500 ms target response time for all functions
- Availability: 99.9%+ API availability
- Scalability: supports parallel operations across more than 1,000 nodes
### 1.1. List of main methods

The main methods available in the SDK are listed below.
#### Basic functions

- `generate(prompt, max_length)`: text generation
- `submit_prompt(prompt, config)`: send a prompt
- `batch_generate(prompts, max_length)`: batch text generation

#### Distributed brain simulation

- `get_simulation_status()`: get the simulation status
- `get_simulation_result()`: get the simulation result
- `poll_for_result(timeout, interval)`: poll for results

#### Artifact management

- `upload_artifact(file_path, artifact_type, metadata)`: upload an artifact
- `list_artifacts(artifact_type)`: list artifacts
- `download_artifact(artifact_id, destination_path)`: download an artifact

#### Log management

- `get_remote_log(user, ip, key_path, log_file_path)`: get a remote log
- `create_log_session(description)`: create a log session

#### New P3 features

- `get_latency_stats()`: get latency statistics
- `create_snapshot()`: create a snapshot
- `run_scalability_test()`: run a scalability test
- `optimize_model()`: optimize a model
- `connect_zenoh()`: connect to Zenoh
- `propose_consensus_decision()`: propose a consensus decision

#### Distributed Coordinator ⭐ NEW

- `init_coordinator(node_id, zenoh_config, raft_config)`: initialize the distributed coordinator
- `start_coordinator()`: start the coordinator
- `stop_coordinator()`: stop the coordinator
- `submit_coordination_task(task_type, payload)`: submit a coordination task (simple built-in implementations: `federated_learning` averages updates, `distributed_inference` completes on input immediately, `model_aggregation` averages weights)
- `get_coordination_task_status(task_id)`: get a task's status
- `get_cluster_status()`: get the cluster status
- `register_coordination_node(node | NodeInfo, node_info=None)`: register a node (ID + dict, or a `NodeInfo`)
- `unregister_coordination_node(node_id)`: unregister a node
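To make the coordinator lifecycle concrete, here is a minimal sketch. The configuration dictionaries and the response keys (`task_id`, `status`) are assumptions for illustration; the actual return shapes may differ.

```python
from evospikenet.sdk import EvoSpikeNetAPIClient

# Hypothetical coordinator walkthrough; "task_id" and "status" keys
# are assumed response fields, not guaranteed by the SDK.
client = EvoSpikeNetAPIClient()
client.init_coordinator(node_id="node-0", zenoh_config={}, raft_config={})
client.start_coordinator()
try:
    task = client.submit_coordination_task(
        task_type="federated_learning",   # built-in: averages the submitted updates
        payload={"updates": [[0.1, 0.2], [0.3, 0.4]]},
    )
    status = client.get_coordination_task_status(task["task_id"])
    print("task status:", status.get("status"))
    print("cluster:", client.get_cluster_status())
finally:
    client.stop_coordinator()
```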
## 2. Setup and installation

### 2.1. Prerequisites

- Python 3.8 or later
- The `requests` library
- A running EvoSpikeNet API server
### 2.2. Installation steps

This SDK is provided as part of the `evospikenet` package. Run the following command in the project root directory to install the project in editable mode:

```bash
pip install -e .
```
### 2.3. Starting the API server

Before using the SDK, the API server must be started:

```bash
# When using Docker Compose (recommended)
sudo ./scripts/run_api_server.sh

# Or start all services (including the UI)
sudo ./scripts/run_frontend_cpu.sh
```
## 3. EvoSpikeNetAPIClient class

The central class that manages all communication with the API.
### 3.1. Initialization

```python
from evospikenet.sdk import EvoSpikeNetAPIClient

# If the API server is running at the default URL (http://localhost:8000)
client = EvoSpikeNetAPIClient()

# When connecting from within the Docker environment
client = EvoSpikeNetAPIClient(base_url="http://api:8000")
```
### 3.2. Health check

#### `is_server_healthy() -> bool`

Check whether the API server is working properly.

#### `wait_for_server(timeout: int = 60, interval: int = 2) -> bool`

Wait until the server becomes responsive.

#### `node_discovery_health() -> Dict[str, Any]`

Get the health information of the node discovery service via the SDK. The returned dictionary conforms to `/api/node-discovery/health` and includes a `nodes` list and a `summary`.

#### `node_discovery_topology() -> Dict[str, Any]`

Get the latest network topology information. The SDK calls `/api/node-discovery/topology`.
### 3.3. SDK usage example for the node discovery service

The example below shows how to initialize the SDK client and retrieve health status and topology from the node discovery API.

```python
from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()

# Get node health status
health = client.node_discovery_health()
print("Health summary:", health.get("summary"))
for node in health.get("nodes", []):
    print(node["node_id"], node["status"], node.get("health_score"))

# Get network topology
topo = client.node_discovery_topology()
print("Nodes:", len(topo.get("nodes", [])))
print("Edges:", len(topo.get("edges", [])))
```

With these methods, you can easily access node discovery information from external systems or operational scripts.
**Example (server health check):**

```python
client = EvoSpikeNetAPIClient()

print("Waiting for server...")
if client.wait_for_server(timeout=60):
    print("✅ The API server is up and running")
else:
    print("❌ Could not connect to the API server")
```
---
## 4. Node types and model categories
EvoSpikeNet supports various brain node types and model categories for distributed brain simulation.
### 4.1. Node types
The following node types are supported:
| Node Type | Description | Rank |
|-------------|------|------|
| `vision` | Visual node (occipital lobe V1-V5) | 1 |
| `motor` | Motor node (motor cortex M1 + cerebellum + spinal cord) | 2 |
| `auditory` | Auditory node (temporal lobe A1-A2) | 5 |
| `speech` | Speech generation node (Broca's area + cerebellum) | 6 |
| `executive` | Executive control node (prefrontal cortex dlPFC) | 0 |
| `general` | General node | N/A |
### 4.2. Model Categories
Each node type supports specific model categories:
#### Visual node category
- `image_classification`: Image classification
- `object_detection`: Object detection
- `semantic_segmentation`: Semantic segmentation
- `image_generation`: Image generation
- `visual_qa`: Visual question answering
#### Motor node category
- `motion_control`: Motion control
- `trajectory_planning`: Trajectory planning
- `inverse_kinematics`: Inverse kinematics
- `motor_adaptation`: Motor adaptation
#### Auditory node category
- `speech_recognition`: Speech recognition
- `audio_classification`: Audio classification
- `sound_event_detection`: Sound event detection
- `speaker_recognition`: Speaker recognition
#### Speech node category
- `text_to_speech`: Text-to-speech synthesis
- `voice_conversion`: Voice conversion
- `speech_synthesis`: Speech synthesis
#### Executive control node category
- `text_generation`: Text generation
- `decision_making`: Decision making
- `planning`: Planning
- `reasoning`: Reasoning
- `rag`: Retrieval-augmented generation (RAG)
#### General node category
- `multimodal`: Multimodal processing
- `embedding`: Embedding generation
- `tokenization`: Tokenization
---
## 5. Text generation

### 5.1. Basic text generation

#### `generate(prompt: str, max_length: int = 50) -> Dict[str, str]`

Calls the standard text generation endpoint (`/api/generate`).

**Example:**

```python
result = client.generate("What is artificial intelligence?", max_length=100)
print(f"Generated text: {result.get('generated_text', '')}")
```
### 5.2. Batch processing

#### `batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict]`

Process multiple prompts in sequence.

**Example:**

```python
prompts = ["What is AI?", "Applications of machine learning"]
results = client.batch_generate(prompts)
for res in results:
    print(res.get('generated_text', 'error'))
```
### 5.3. Execution with error handling

#### `with_error_handling(func: Callable, retries: int = 3, *args, **kwargs)`

Wraps API calls and automatically retries on failure with exponential backoff.

**Example:**

```python
result = client.with_error_handling(
    client.generate,
    retries=3,
    prompt="test prompt",
    max_length=50
)
if result:
    print(f"Success: {result['generated_text']}")
else:
    print("Failure: all retries failed")
```
## 6. Distributed brain simulation

### 6.1. Sending multimodal prompts

#### `submit_prompt(prompt: str = None, image_path: str = None, audio_path: str = None) -> Dict`

Send multimodal prompts to the simulation. Image and audio files are internally encoded in Base64.

**Example:**

```python
# Send a combination of text and an image
response = client.submit_prompt(
    prompt="What is shown in this image?",
    image_path="./examples/dummy_image.png"
)
prompt_id = response.get('prompt_id')
print(f"Prompt submitted successfully: {prompt_id}")
```
### 6.2. Polling for results

#### `poll_for_result(timeout: int = 120, interval: int = 5) -> Optional[Dict]`

Periodically polls the system results endpoint to get the latest available result (the SDK implementation internally reads the global results endpoint). When handling an individual `prompt_id`, filter on the identifier returned in the server response on the client side, as sketched after the example below.

**Example:**

```python
# After submitting to the server, wait a fixed amount of time for the result
result = client.poll_for_result(timeout=120, interval=5)
if result and result.get('response'):
    print(f"✅ Response: {result['response']}")
else:
    print("❌ Timeout or error")
```
### 6.3. Status monitoring and remote logging

#### `get_simulation_status() -> Dict`

Get the current overall simulation status.

#### `get_remote_log(user: str, ip: str, key_path: str, log_file_path: str) -> Dict`

Get a remote node's log file (last 100 lines) via SSH.

**Example:**

```python
log_data = client.get_remote_log(
    user="ubuntu",
    ip="192.168.1.101",
    key_path="~/.ssh/id_rsa",
    log_file_path="/tmp/sim_rank_1.log"
)
print(log_data.get('log_content'))
```
---
## 7. Data logging and artifact management

### 7.1. Creating a session

#### `create_log_session(description: str) -> Dict`

Start a new experimental session and get the `session_id`.

**Example:**

```python
session = client.create_log_session(description="Fine-tuning the text model")
session_id = session['session_id']
```
### 7.2. Uploading artifacts

#### `upload_artifact(session_id: str, artifact_type: str, name: str, file: io.BytesIO, llm_type: str = None) -> Dict`

Upload models, configuration files, tokenizers, etc. associated with the session.

*Note: `upload_artifact` expects a file buffer (`io.BytesIO`, etc.), and `file.name` is used as metadata during upload. Instead of passing a file path directly, load the binary into a `BytesIO` object, set its `.name` attribute, and pass that.*

- `llm_type`: important for accurately recording the model's architecture (e.g. `SpikingEvoTextLM`, `SpikingEvoMultiModalLM`).

**Example: uploading a model and tokenizer**

```python
import io
import json
import shutil

import torch
from transformers import AutoTokenizer

# --- Prepare the model and settings ---
config = {'vocab_size': 1000, 'd_model': 128, ...}
model = SpikingEvoTextLM(**config)

# ... training ...

# --- Upload artifacts ---
# 1. Model weights
model_buffer = io.BytesIO()
torch.save(model.state_dict(), model_buffer)
model_buffer.seek(0)
model_buffer.name = 'spiking_lm.pth'  # Required: upload_artifact refers to file.name
client.upload_artifact(session_id, "model", model_buffer.name, model_buffer,
                       llm_type="SpikingEvoTextLM")

# 2. Configuration file
config_buffer = io.BytesIO(json.dumps(config).encode('utf-8'))
config_buffer.name = 'config.json'
client.upload_artifact(session_id, "config", "config.json", config_buffer,
                       llm_type="SpikingEvoTextLM")

# 3. Tokenizer
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
tokenizer.save_pretrained('./tokenizer_temp')
shutil.make_archive('tokenizer', 'zip', './tokenizer_temp')

with open('tokenizer.zip', 'rb') as f:
    zip_buffer = io.BytesIO(f.read())
zip_buffer.name = 'spiking_lm_tokenizer.zip'
client.upload_artifact(session_id, "tokenizer", "spiking_lm_tokenizer.zip", zip_buffer,
                       llm_type="SpikingEvoTextLM")
```
### 7.3. Listing and downloading artifacts

#### `list_artifacts(artifact_type: str = None) -> List[Dict]`

Get a list of saved artifacts.

#### `download_artifact(artifact_id: str, destination_path: str)`

Download the file with the specified artifact ID.

**Example: download the latest model**

```python
models = client.list_artifacts(artifact_type="model")
if models:
    latest_model_artifact = models[0]
    client.download_artifact(
        artifact_id=latest_model_artifact['artifact_id'],
        destination_path="./latest_model.pth"
    )
    print("✅ Downloaded the latest model")
```
## 8. Comprehensive usage example

### 8.1. Complete model training and artifact management workflow

```python
import io
import json
import shutil

import torch
from transformers import AutoTokenizer

from evospikenet.sdk import EvoSpikeNetAPIClient
# SpikingEvoTextLM is assumed to be importable from your model definition module


def complete_ml_workflow():
    client = EvoSpikeNetAPIClient()
    if not client.wait_for_server():
        return

    # 1. Create a session
    session = client.create_log_session("Complete training workflow example")
    session_id = session['session_id']
    print(f"Session ID: {session_id}")

    # 2. Model training (dummy)
    config = {
        'vocab_size': 30522, 'd_model': 128, 'n_heads': 4,
        'num_transformer_blocks': 2, 'time_steps': 10, 'neuron_type': 'LIF'
    }
    model = SpikingEvoTextLM(**config)
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    print("Initialized the model and tokenizer")

    # 3. Upload artifacts
    # Model
    model_buffer = io.BytesIO()
    torch.save(model.state_dict(), model_buffer)
    model_buffer.seek(0)
    model_buffer.name = 'spiking_lm.pth'
    client.upload_artifact(session_id, "model", model_buffer.name, model_buffer,
                           llm_type="SpikingEvoTextLM")
    print("Uploaded the model")

    # Configuration
    config_buffer = io.BytesIO(json.dumps(config).encode('utf-8'))
    config_buffer.name = 'config.json'
    client.upload_artifact(session_id, "config", config_buffer.name, config_buffer,
                           llm_type="SpikingEvoTextLM")
    print("Uploaded the configuration")

    # Tokenizer
    tokenizer.save_pretrained('./tokenizer_temp')
    shutil.make_archive('tokenizer', 'zip', './tokenizer_temp')
    with open('tokenizer.zip', 'rb') as f:
        zip_buffer = io.BytesIO(f.read())
    zip_buffer.name = 'spiking_lm_tokenizer.zip'
    client.upload_artifact(session_id, "tokenizer", zip_buffer.name, zip_buffer,
                           llm_type="SpikingEvoTextLM")
    print("Uploaded the tokenizer")

    # 4. Check the artifacts
    artifacts = client.list_artifacts(artifact_type="model")
    print(f"Latest model artifact: {artifacts[0]['name']}")


if __name__ == "__main__":
    complete_ml_workflow()
```
## 9. Error handling and best practices

### 9.1. Exponential backoff retry

The `with_error_handling()` method retries with exponential backoff on failure.

**Usage example:**

```python
client = EvoSpikeNetAPIClient()

# Wait for the server to start
if not client.wait_for_server(timeout=60):
    print("Cannot connect to the server")
    exit(1)

# API call with retries
result = client.with_error_handling(
    client.generate,
    retries=5,
    prompt="test prompt",
    max_length=100
)
if result:
    print(result['generated_text'])
else:
    print("Generation failed")
```
### 9.2. Main exceptions

| Exception | Cause | Solution |
|---|---|---|
| `requests.exceptions.ConnectionError` | API server stopped | Wait with `wait_for_server()` |
| `requests.exceptions.Timeout` | Response delay | Increase the timeout value |
| `requests.exceptions.HTTPError` | HTTP 4xx/5xx error | Check the response content |
| `ValueError` | Bad argument | Validate the input data |
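As an alternative to `with_error_handling()`, each exception class can be handled explicitly. A minimal sketch (the prompt and the recovery choices are illustrative):

```python
import requests

try:
    result = client.generate("test prompt", max_length=50)
    print(result['generated_text'])
except requests.exceptions.ConnectionError:
    # Server unreachable: wait for it to come back, then retry once
    if client.wait_for_server(timeout=60):
        print(client.generate("test prompt", max_length=50)['generated_text'])
except requests.exceptions.Timeout:
    print("Timed out; consider increasing the timeout value")
except requests.exceptions.HTTPError as e:
    print("HTTP error:", e.response.status_code if e.response is not None else e)
except ValueError as e:
    print("Invalid input:", e)
```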
## 10. P3 feature method list

### 10.1. Latency monitoring methods

#### `get_latency_stats() -> Dict`

Get latency statistics for all components.

#### `check_latency_target() -> Dict`

Check whether each component meets its latency target (p95-based).
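A quick sketch of using both methods together; the response keys (`components`, `p95_ms`, `met`) are assumptions, so inspect the returned dictionaries first.

```python
# Hypothetical response keys; the real shapes may differ.
stats = client.get_latency_stats()
for name, comp in stats.get("components", {}).items():
    print(f"{name}: p95={comp.get('p95_ms')} ms")

targets = client.check_latency_target()
unmet = [n for n, t in targets.get("components", {}).items() if not t.get("met")]
print("components missing their p95 target:", unmet or "none")
```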
### 10.2. Snapshot management methods

#### `create_snapshot(snapshot_name: str, include_models: bool = True, include_data: bool = True, compression_level: int = 6) -> Dict`

Create a system snapshot.

#### `restore_snapshot(snapshot_path: str, restore_models: bool = True, restore_data: bool = True) -> Dict`

Restore the system from a snapshot.

#### `list_snapshots() -> Dict`

Get a list of available snapshots.

#### `delete_snapshot(snapshot_path: str) -> Dict`

Delete a snapshot.

#### `validate_snapshot(snapshot_path: str) -> Dict`

Verify the integrity of a snapshot.

#### `cleanup_snapshots(max_age_days: int = 30) -> Dict`

Clean up old snapshots.
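A typical snapshot lifecycle, sketched under stated assumptions: the `snapshot_path` and `valid` response keys are guesses based on the signatures above.

```python
# Create, validate, and (optionally) restore a snapshot.
snap = client.create_snapshot("pre-upgrade", compression_level=9)
path = snap.get("snapshot_path")          # assumed response key

print(client.list_snapshots())
check = client.validate_snapshot(path)
if check.get("valid"):                     # assumed response key
    client.restore_snapshot(path, restore_models=True, restore_data=False)

# Periodically drop snapshots older than two weeks
client.cleanup_snapshots(max_age_days=14)
```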
### 10.3. Scalability test methods

#### `run_scalability_test(max_nodes: int = 50, test_duration: int = 30) -> Dict`

Run a scalability test.

#### `test_node_scalability(node_counts: List[int], test_duration: float = 60.0) -> Dict`

Compare performance across multiple node counts.

#### `run_stress_test(intensity: str = "high", duration: float = 120.0) -> Dict`

Run a stress test.

#### `get_resource_usage() -> Dict`

Get current resource usage.

#### `get_system_limits() -> Dict`

Get the recommended maximum number of nodes and the throughput upper limit.
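A sketch for sizing a deployment before scaling out. The response dictionaries are printed as-is because their exact keys are not documented here.

```python
# Check limits first, then run the comparative tests.
limits = client.get_system_limits()       # recommended max nodes, throughput cap
print("system limits:", limits)

report = client.run_scalability_test(max_nodes=50, test_duration=30)
print("scalability report:", report)

per_count = client.test_node_scalability(node_counts=[10, 25, 50])
print("per-node-count comparison:", per_count)

print("current resource usage:", client.get_resource_usage())
```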
### 10.4. Hardware optimization methods

#### `optimize_model(model_type: str, optimizations: Optional[List[str]] = None) -> Dict`

Perform optimizations such as ONNX export and quantization.

#### `benchmark_model(model_type: str, num_runs: int = 50) -> Dict`

Benchmark the model's execution performance.

#### `get_hardware_info() -> Dict`

Get the hardware optimization support status.
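A short sketch combining the three methods. The `model_type` value and the optimization names (`"onnx_export"`, `"quantization"`) are illustrative assumptions drawn from the description above, not documented accepted values.

```python
# Check what the hardware supports, then optimize and benchmark.
hw = client.get_hardware_info()
print("hardware support:", hw)

client.optimize_model("SpikingEvoTextLM",
                      optimizations=["onnx_export", "quantization"])  # assumed names
bench = client.benchmark_model("SpikingEvoTextLM", num_runs=50)
print("benchmark:", bench)
```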
### 10.5. High availability monitoring methods

#### `get_availability_status() -> Dict`

Get the current availability status.

#### `get_availability_stats(time_window: str = "24h") -> Dict`

Get availability statistics.

#### `perform_health_check() -> Dict`

Run a health check.

#### `trigger_recovery_action(action_type: str, parameters: Dict[str, Any] = None) -> Dict`

Perform a recovery action.

#### `get_availability_alerts(limit: int = 50) -> Dict`

Get the latest alerts.

#### `schedule_maintenance(start_time: str, duration_minutes: int, reason: str) -> Dict`

Schedule a maintenance window.
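A sketch of a periodic availability check with a manual recovery hook. The `healthy` and `alerts` response keys and the `"restart_service"` action type are illustrative assumptions.

```python
# Poll availability, trigger recovery if unhealthy, and list alerts.
print("status:", client.get_availability_status())
print("24h stats:", client.get_availability_stats(time_window="24h"))

health = client.perform_health_check()
if not health.get("healthy", True):        # assumed response key
    client.trigger_recovery_action("restart_service",   # illustrative action type
                                   parameters={"service": "api"})

for alert in client.get_availability_alerts(limit=10).get("alerts", []):
    print(alert)
```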
### 10.6. Asynchronous Zenoh communication methods

#### `connect_zenoh(node_id: str = "api_node") -> Dict`

Connect to the Zenoh router.

#### `publish_zenoh_message(topic: str, payload: Any, priority: str = "normal", message_type: str = "notification", node_id: str = "api_node") -> Dict`

Publish a message over Zenoh.

#### `send_zenoh_request(target_node: str, request: Any, timeout: float = 5.0, node_id: str = "api_node") -> Dict`

Perform a request/response exchange.

#### `send_zenoh_notification(target_nodes: List[str], notification: Any, priority: str = "normal", node_id: str = "api_node") -> Dict`

Send notifications to multiple nodes.

#### `get_zenoh_stats(node_id: str = "api_node") -> Dict`

Get Zenoh communication statistics.
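A sketch of one publish, one request/response round-trip, and a stats read. The topic, node names, and payload shapes are illustrative.

```python
# Connect once, then publish, request, notify, and read stats.
client.connect_zenoh(node_id="api_node")

client.publish_zenoh_message(
    topic="brain/vision/events",            # illustrative topic
    payload={"event": "frame_ready"},
    priority="high",
)

reply = client.send_zenoh_request(target_node="vision_node",
                                  request={"op": "ping"}, timeout=5.0)
print("reply:", reply)

client.send_zenoh_notification(target_nodes=["motor_node", "speech_node"],
                               notification={"event": "resync"})

print(client.get_zenoh_stats(node_id="api_node"))
```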
### 10.7. Distributed consensus methods

#### `propose_consensus_decision(decision_type: str, payload: Any, priority: int = 1, dependencies: List[str] = None) -> Dict`

Propose a consensus decision.

#### `get_consensus_result(proposal_id: str, timeout: float = 30.0) -> Dict`

Get the consensus result.

#### `update_node_status(node_id: str, active: bool) -> Dict`

Update the operating status of a consensus node.

#### `get_consensus_stats() -> Dict`

Get consensus statistics.

#### `cleanup_consensus(max_age: float = 300.0) -> Dict`

Clean up old consensus proposals.
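A minimal propose-then-wait sketch. The `proposal_id` and `accepted` response keys and the `decision_type` value are assumptions for illustration.

```python
# Propose a decision, wait for the cluster to vote, then prune old proposals.
proposal = client.propose_consensus_decision(
    decision_type="model_rollout",          # illustrative decision type
    payload={"model": "spiking_lm.pth"},
    priority=2,
)
result = client.get_consensus_result(proposal["proposal_id"], timeout=30.0)
print("accepted:", result.get("accepted"))  # assumed response key

client.update_node_status(node_id="node-3", active=False)  # mark a node offline
client.cleanup_consensus(max_age=300.0)
```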
## 11. LLM training job management (new feature)

The EvoSpikeNet SDK provides management of LLM training jobs for the distributed brain system. This feature allows modality-specific models, such as the Vision and Audio encoders, to be trained via the API.
### 11.1. Submitting a training job

#### `submit_training_job(job_config: Dict) -> Dict`

Submit a new training job to the API server.

**Parameters:**

- `job_config`: a dictionary containing the training settings
  - `category`: model category (`"LangText"`, `"Vision"`, `"Audio"`, `"MultiModal"`)
  - `model_name`: name of the model to use
  - `dataset_path`: training data path
  - `output_dir`: output directory
  - `gpu`: GPU usage flag
  - `epochs`: number of epochs
  - `batch_size`: batch size
  - `learning_rate`: learning rate

**Example:**

```python
# Vision Encoder training
vision_job = {
    "category": "Vision",
    "model_name": "google/vit-base-patch16-224",
    "dataset_path": "data/llm_training/Vision/vision_data.jsonl",
    "output_dir": "saved_models/Vision/vision-run-001",
    "gpu": True,
    "epochs": 3,
    "batch_size": 8,
    "learning_rate": 0.00001
}

response = client.submit_training_job(vision_job)
print(f"Job ID: {response['job_id']}")
```
### 11.2. Checking job status

#### `get_training_status(job_id: str) -> Dict`

Get the current status of the specified training job.

**Example:**

```python
status = client.get_training_status("vision_training_job_001")
print(f"Status: {status['status']}")  # running, completed, failed
print(f"Progress: {status.get('progress', 0)}%")
```
#### `list_training_jobs(status_filter: str = None) -> List[Dict]`

Get a list of all training jobs.

**Example:**

```python
# Get all jobs
all_jobs = client.list_training_jobs()

# Get only running jobs
running_jobs = client.list_training_jobs(status_filter="running")
for job in running_jobs:
    print(f"{job['job_id']}: {job['category']} - {job['status']}")
```
### 11.3. Getting job details

#### `get_training_job_details(job_id: str) -> Dict`

Get detailed information about a training job.

**Example:**

```python
details = client.get_training_job_details("vision_training_job_001")
print(f"Model: {details['model_name']}")
print(f"Dataset: {details['dataset_path']}")
print(f"Start time: {details['start_time']}")
print(f"Logs: {details.get('logs', [])}")
```
### 11.4. Training for distributed brain nodes

The SDK supports training configurations optimized for the distributed brain system's node layout.

**Example:**

```python
from datetime import datetime

# Settings by distributed brain node type
node_training_configs = {
    "Vision": {
        "model_name": "google/vit-base-patch16-224",
        "node_types": ["Vision-Primary", "Vision-Secondary"],
        "dataset_path": "data/llm_training/Vision/vision_data.jsonl"
    },
    "Audio": {
        "model_name": "openai/whisper-base",
        "node_types": ["Audio-Primary", "Audio-Secondary"],
        "dataset_path": "data/llm_training/Audio/audio_data.jsonl"
    },
    "LangText": {
        "model_name": "microsoft/DialoGPT-medium",
        "node_types": ["Lang-Primary", "Lang-Secondary"],
        "dataset_path": "data/llm_training/LangText/langtext_data.jsonl"
    }
}

# Training job for Vision nodes
vision_config = node_training_configs["Vision"]
job_config = {
    "category": "Vision",
    "model_name": vision_config["model_name"],
    "dataset_path": vision_config["dataset_path"],
    "output_dir": f"saved_models/Vision/distributed-{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    "gpu": True,
    "epochs": 5,
    "batch_size": 16,
    "learning_rate": 0.00002
}

response = client.submit_training_job(job_config)
print(f"Started distributed Vision training: {response['job_id']}")
```
### 11.5. Training monitoring and automation

#### Periodic status monitoring

```python
import time

def monitor_training_job(job_id: str, check_interval: int = 30):
    """Monitor a training job and wait until it finishes."""
    while True:
        status = client.get_training_status(job_id)
        print(f"Job {job_id}: {status['status']}")
        if status['status'] in ['completed', 'failed']:
            return status
        time.sleep(check_interval)

# Usage example
final_status = monitor_training_job("vision_training_job_001")
if final_status['status'] == 'completed':
    print("Training completed successfully")
else:
    print(f"Training failed: {final_status.get('error', 'Unknown error')}")
```
#### Bulk management of multiple jobs

```python
def submit_multiple_training_jobs(job_configs: List[Dict]) -> List[str]:
    """Submit multiple training jobs in one batch."""
    job_ids = []
    for config in job_configs:
        try:
            response = client.submit_training_job(config)
            job_ids.append(response['job_id'])
            print(f"Job submitted: {response['job_id']} ({config['category']})")
        except Exception as e:
            print(f"Job submission failed: {config['category']} - {e}")
    return job_ids

# Usage example
configs = [
    {"category": "Vision", "model_name": "google/vit-base-patch16-224", ...},
    {"category": "Audio", "model_name": "openai/whisper-base", ...},
    {"category": "LangText", "model_name": "microsoft/DialoGPT-medium", ...}
]

job_ids = submit_multiple_training_jobs(configs)
print(f"Number of submitted jobs: {len(job_ids)}")
```
---
### 11.6. Sample code collection

Below is comprehensive sample code demonstrating each of the SDK's features. These examples are located in the `examples/sdk/` directory.

#### 11.6.1. Basic usage example (`sdk_basic_usage.py`)
```python
#!/usr/bin/env python3
# Copyright 2026 Moonlight Technologies Inc.
# Author: Masahiro Aoki
"""
Basic SDK Usage Example

This script demonstrates the basic usage of the EvoSpikeNet SDK,
including text generation, model management, and artifact handling.
"""
from evospikenet.sdk import EvoSpikeNetAPIClient


def main():
    # Initialize client
    client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
    print("=== EvoSpikeNet SDK Basic Usage Demo ===\n")

    # 1. Health check
    print("1. Health Check")
    try:
        health = client.health_check()
        print(f"API Status: {health}")
    except Exception as e:
        print(f"Health check failed: {e}")
    print()

    # 2. Text generation
    print("2. Text Generation")
    try:
        result = client.generate_text(
            "Explain quantum computing in simple terms",
            max_tokens=150
        )
        print(f"Generated text: {result['text'][:100]}...")
    except Exception as e:
        print(f"Text generation failed: {e}")
    print()

    # 3. Model management
    print("3. Model Management")
    try:
        models = client.list_models()
        print(f"Available models: {len(models)}")
        if models:
            print(f"First model: {models[0]}")
    except Exception as e:
        print(f"Model listing failed: {e}")
    print()

    # 4. Artifact management
    print("4. Artifact Management")
    try:
        artifacts = client.list_artifacts()
        print(f"Available artifacts: {len(artifacts)}")
    except Exception as e:
        print(f"Artifact listing failed: {e}")
    print()

    # 5. Log retrieval
    print("5. Log Retrieval")
    try:
        logs = client.get_logs(limit=5)
        print(f"Recent logs: {len(logs)} entries")
    except Exception as e:
        print(f"Log retrieval failed: {e}")

    print("\n=== Demo completed ===")


if __name__ == "__main__":
    main()
```
#### 11.6.2. Advanced features demo (`sdk_advanced_features.py`)

Demonstrates advanced SDK features, including the P3 features:

- Latency monitoring
- Snapshot management
- Scalability testing
- Hardware optimization
- High availability monitoring
- Asynchronous Zenoh communication
- Distributed consensus
#### 11.6.3. Error handling (`sdk_error_handling.py`)

Demonstrates comprehensive error handling and retry mechanisms:

- Basic exception handling
- Retry logic
- Circuit breaker
- Graceful degradation
- Custom error handling
- Error recovery
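As a flavor of the circuit-breaker pattern that script demonstrates, here is a minimal standalone sketch (not the script's actual implementation): after a run of consecutive failures, calls are short-circuited until a cool-down period passes.

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: open after N consecutive failures,
    allow calls again only after a cool-down period."""

    def __init__(self, max_failures=3, reset_after=30.0):
        self.max_failures, self.reset_after = max_failures, reset_after
        self.failures, self.opened_at = 0, None

    def call(self, func, *args, **kwargs):
        if self.opened_at and time.time() - self.opened_at < self.reset_after:
            raise RuntimeError("circuit open; skipping call")
        try:
            result = func(*args, **kwargs)
            self.failures, self.opened_at = 0, None  # success resets the breaker
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time()         # trip the breaker
            raise

breaker = CircuitBreaker()
# result = breaker.call(client.generate, "test prompt", max_length=50)
```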
#### 11.6.4. Performance monitoring (`sdk_performance_monitoring.py`)

Performance monitoring and benchmarking features:

- Text generation benchmarks
- Distributed simulation benchmarks
- Resource usage monitoring
- API call profiling
- Performance optimization
#### 11.6.5. Configuration management (`sdk_configuration_management.py`)

A comprehensive demonstration of configuration management features:

- Basic configuration
- Environment-variable-based configuration
- File-based configuration
- Dynamic configuration updates
- Configuration validation
- Configuration profiles
- Configuration persistence
- Configuration merging
- Secure configuration
#### 11.6.6. Distributed brain simulation (`sdk_distributed_brain.py`)

Advanced features of distributed brain simulation:

- Basic distributed simulation
- Scalable simulation
- Load balancing
- Fault tolerance
- Real-time monitoring
- Node management
- Distributed data processing
- Performance optimization
#### 11.6.7. How to run

Each sample can be run with the following commands:

```bash
# Basic usage example
python examples/sdk/sdk_basic_usage.py

# Advanced features
python examples/sdk/sdk_advanced_features.py

# Error handling
python examples/sdk/sdk_error_handling.py

# Performance monitoring
python examples/sdk/sdk_performance_monitoring.py

# Configuration management
python examples/sdk/sdk_configuration_management.py

# Distributed brain simulation
python examples/sdk/sdk_distributed_brain.py
```
#### 11.6.8. Sample code learning path

- First-time users: start with `sdk_basic_usage.py`
- Error handling: write robust code with `sdk_error_handling.py`
- Performance: optimize with `sdk_performance_monitoring.py`
- Configuration management: `sdk_configuration_management.py` for production environments
- Advanced features: leverage the P3 features in `sdk_advanced_features.py`
- Distributed processing: build scalable applications with `sdk_distributed_brain.py`
## 12. Phase E-3 Connectome production SDK

Last updated: 2026-03-19 (Phase E-3 fully completed)

How to use the new Phase E-3 features from the SDK.

### 12.1. List of new API methods

**Connectome automatic synchronization (`scripts/sync_connectome.py`)**

- `apply_delta(base_path, delta_path, result_path)`: apply a delta JSON to an NPZ
- `apply_delta_with_validation(base_path, delta_path, result_path, *, rollback_dir, ei_ratio_range)`: apply a delta with E/I ratio validation
- `fetch_cave_synapses_with_retry(url, params, max_retries, backoff_factor)`: CAVE API fetch with retries on HTTP 429
- `sync_connectome(config_path, cache_path, output_path, *, dry_run)`: fully automatic synchronization orchestrator
- `ConnectomeSyncValidationError`: exception raised on E/I validation failure

**HCP lazy routing (`evospikenet.brain_routing`)**

- `compute_delay_matrix(manifest, config_path)`: generate the HCP delay matrix
- `optimize_routing_delays(manifest, delay_matrix)`: assign priority scores to connections
- `build_hcp_routing_table(config_path)`: build the routing table with the full pipeline
- `HCPDelayRouter(session, config_path)`: delay-aware router (degrades gracefully with `session=None`)

**Auto node mapper (`scripts/auto_node_mapper.py`)**

- `map_connectome(input_path, output_dir, config_path, *, dry_run, seed)`: split a connectome into per-node NPZ files plus `node_manifest.yaml`
- `generate_manifest(output_dir, config_path)`: regenerate the manifest from existing NPZ files
- `MappingResult` / `NodeMappingEntry` data classes
### 12.2. Step-by-step sample

```python
# Phase E-3 walkthrough of all components
import evospikenet as esn
from scripts.auto_node_mapper import map_connectome
from scripts.sync_connectome import sync_connectome
from evospikenet.brain_routing import HCPDelayRouter
from evospikenet import load_connectome_npz, apply_connectome_to_layer, ConnectomeLIFLayer

# 1. Connectome → per-node NPZ
result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    seed=42,
)

# 2. Inject an NPZ into a ConnectomeLIFLayer
data = load_connectome_npz("data/connectome/nodes/visual.npz")
layer = ConnectomeLIFLayer(num_neurons=data["n_neurons"], device="cpu")
apply_connectome_to_layer(data, layer)

# 3. Invoke HCP delay routing
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
router.publish_all()

# 4. Periodic synchronization (CAVE API)
sync_result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/nodes/visual.npz",
    output_path="data/connectome/nodes/visual.npz",
    dry_run=True,
)
print(f"sync: {sync_result['status']}")
```
### 12.3. Complete sample code

See `connectome_e3_demo.py` for a complete walkthrough example.
## 13. Summary

The EvoSpikeNet Python SDK gives you access to the full functionality of the API with just a few lines of code. Main advantages:

- ✅ Simple interface: hides HTTP request details
- ✅ Automatic retry: improved robustness with exponential backoff
- ✅ Multimodal support: integrated processing of text, images, and audio
- ✅ Full MLOps support: session management and artifact upload
- ✅ LLM training integration: training job management for the distributed brain

See `examples/sdk_usage.py` for a detailed code example.