EvoSpikeNet SDK tutorial
Copyright: 2026 Moonlight Technologies Inc.
Author: Masahiro Aoki
Last updated: January 15, 2026
Overview
This tutorial walks you step by step from the basic usage of the EvoSpikeNet SDK to its advanced features. Each section includes practical code examples and explanations.
Table of contents
- Basic setup
- Text generation
- Multimodal processing
- Distributed brain simulation
- Artifact management
- Error handling
- Batch processing
- Monitoring and statistics
- Distributed Coordinator
- Advanced features
- Jupyter Notebook integration
1. Basic setup
1.1 SDK import and initialization
from evospikenet.sdk import EvoSpikeNetAPIClient
# Basic client initialization
client = EvoSpikeNetAPIClient()
# Initialization with custom settings
client = EvoSpikeNetAPIClient(
    base_url="http://localhost:8000",
    api_key="your_api_key",
    timeout=60,
    max_retries=3
)
print("✓ SDK client initialized successfully")
1.2 Checking the server connection
# health check
health = client.health_check()
print(f"Server health: {health}")
# Get server information
info = client.get_server_info()
if info:
    print(f"Server version: {info.get('version')}")
    print(f"Available models: {info.get('models', [])}")
# Wait until server is ready
if client.wait_for_server(timeout=30):
    print("✓ Server is ready")
else:
    print("✗ Server failed to become ready")
2. Text generation
2.1 Basic text generation
# simple text generation
prompt = "人工知能の未来について説明してください。"
result = client.generate(prompt, max_length=100)
print("Prompt:", prompt)
print("Generated text:", result['generated_text'])
print("Full response:", result)
2.2 Generating with different prompts
prompts = [
    "Pythonの基本的な文法を説明してください。",
    "機械学習の応用例を3つ挙げてください。",
    "神経ネットワークの仕組みを簡単に説明してください。",
]
for prompt in prompts:
    result = client.generate(prompt, max_length=150)
    print(f"\n--- {prompt[:30]}... ---")
    print(result['generated_text'][:100] + "...")
2.3 Adjusting generation parameters
# Generation at various lengths
prompt = "量子コンピューティングについて"
lengths = [50, 100, 200]
for length in lengths:
    result = client.generate(prompt, max_length=length)
    text_length = len(result['generated_text'])
    print(f"Requested: {length}, Generated: {text_length} characters")
    print(result['generated_text'][:100] + "...\n")
3. Multimodal processing
3.1 Prompts with images
# Prompt using image file
image_path = "./sample_image.jpg"
prompt = "この画像に写っているものを詳しく説明してください。"
try:
    # Submit the prompt
    response = client.submit_prompt(
        prompt=prompt,
        image_path=image_path
    )
    print("✓ Prompt submitted successfully")
    print("Response ID:", response.get('id'))
    # Wait for the result
    result = client.poll_for_result(timeout=120)
    if result:
        print("✓ Result received:")
        print(result['response'])
    else:
        print("✗ No result received within timeout")
except Exception as e:
    print(f"✗ Error: {e}")
3.2 Prompts with audio
# Prompts using audio files
audio_path = "./sample_audio.wav"
prompt = "この音声を文字起こしし、内容を要約してください。"
try:
    response = client.submit_prompt(
        prompt=prompt,
        audio_path=audio_path
    )
    result = client.poll_for_result(timeout=180)  # Audio processing takes time
    if result:
        print("✓ Audio processing result:")
        print(result['response'])
    else:
        print("✗ Audio processing timed out")
except Exception as e:
    print(f"✗ Error: {e}")
3.3 Multimodal validation
# File existence check and validation
import os
def validate_multimodal_input(prompt, image_path=None, audio_path=None):
    """Validate multimodal input."""
    if not prompt and not image_path and not audio_path:
        return False, "At least one input (prompt, image, or audio) is required"
    if image_path and not os.path.exists(image_path):
        return False, f"Image file not found: {image_path}"
    if audio_path and not os.path.exists(audio_path):
        return False, f"Audio file not found: {audio_path}"
    # File size checks
    if image_path:
        size = os.path.getsize(image_path) / (1024 * 1024)  # MB
        if size > 10:
            return False, f"Image file too large: {size:.1f}MB (max 10MB)"
    if audio_path:
        size = os.path.getsize(audio_path) / (1024 * 1024)  # MB
        if size > 50:
            return False, f"Audio file too large: {size:.1f}MB (max 50MB)"
    return True, "Validation passed"
# Usage example
is_valid, message = validate_multimodal_input(
    "Describe this image",
    image_path="./test.jpg"
)
if is_valid:
    print("✓ Input validation passed")
    # Continue processing
else:
    print(f"✗ Validation failed: {message}")
4. Distributed brain simulation
4.1 Basic simulation execution
# Text-based query
query = "人間の脳はどのように学習するのか説明してください。"
try:
    # Submit the prompt
    response = client.submit_prompt(prompt=query)
    print(f"✓ Query submitted: {query}")
    # Monitor simulation status
    import time
    for i in range(10):  # Check up to 10 times
        status = client.get_simulation_status()
        print(f"Status check {i+1}: {status}")
        if status.get('completed', False):
            break
        time.sleep(2)
    # Get the result
    result = client.get_simulation_result()
    if result and result.get('response'):
        print("✓ Simulation result:")
        print(result['response'])
    else:
        print("✗ No result available")
except Exception as e:
    print(f"✗ Simulation error: {e}")
4.2 Detailed monitoring of simulation status
def monitor_simulation():
    """Detailed monitoring of simulation status."""
    response = client.submit_prompt(prompt="複雑な推論タスクを実行してください")
    print("Monitoring simulation progress...")
    while True:
        status = client.get_simulation_status()
        # Status display
        active_nodes = status.get('active_nodes', 0)
        total_nodes = status.get('total_nodes', 0)
        completed_tasks = status.get('completed_tasks', 0)
        total_tasks = status.get('total_tasks', 0)
        print(f"Active nodes: {active_nodes}/{total_nodes}")
        print(f"Completed tasks: {completed_tasks}/{total_tasks}")
        # Per-node details
        nodes = status.get('nodes', [])
        for node in nodes:
            node_id = node.get('id')
            node_status = node.get('status')
            node_load = node.get('load', 0)
            print(f"  Node {node_id}: {node_status} (load: {node_load}%)")
        if status.get('completed', False):
            print("✓ Simulation completed")
            break
        if status.get('failed', False):
            print("✗ Simulation failed")
            break
        time.sleep(5)
    # Get the final result
    result = client.get_simulation_result()
    return result

# Execution
result = monitor_simulation()
if result:
    print("Final result:", result.get('response'))
4.3 Remote log acquisition
# Retrieve logs from a remote node
remote_config = {
    'user': 'ubuntu',
    'ip': '192.168.1.100',
    'key_path': '~/.ssh/id_rsa',
    'log_file_path': '/var/log/evospikenet/simulation.log'
}
try:
    logs = client.get_remote_log(**remote_config)
    print("✓ Remote logs retrieved:")
    print(logs.get('content', 'No content'))
except Exception as e:
    print(f"✗ Failed to retrieve remote logs: {e}")
5. Artifact Management
5.1 Creating a log session
# Create new log session
session = client.create_log_session(
    description="Tutorial session for artifact management"
)
session_id = session.get('session_id')
print(f"✓ Created session: {session_id}")
5.2 Uploading the model
import io
# Upload model file
model_path = "./trained_model.pkl"
with open(model_path, 'rb') as f:
    model_data = io.BytesIO(f.read())

result = client.upload_artifact(
    session_id=session_id,
    artifact_type="model",
    name="tutorial_model_v1",
    file=model_data,
    llm_type="SpikingEvoVisionEncoder",
    model_category="image_classification",
    model_variant="standard"
)
print(f"✓ Model uploaded: {result}")
5.3 Managing artifacts
# Get list of artifacts
artifacts = client.list_artifacts()
print(f"Total artifacts: {len(artifacts) if isinstance(artifacts, list) else 'N/A'}")
# Filter only model type
models = client.list_artifacts(artifact_type="model")
print(f"Model artifacts: {len(models) if isinstance(models, list) else 'N/A'}")
# Download a specific artifact
if isinstance(artifacts, list) and artifacts:
    artifact_id = artifacts[0].get('id')
    client.download_artifact(artifact_id, "./downloaded_model.pkl")
    print(f"✓ Downloaded artifact: {artifact_id}")
5.4 Managing configuration files
# Upload configuration file
config_data = """
model:
  type: SpikingEvoVisionEncoder
  layers: 5
  neurons_per_layer: 100
training:
  epochs: 100
  batch_size: 32
  learning_rate: 0.001
"""
config_file = io.BytesIO(config_data.encode('utf-8'))
result = client.upload_artifact(
    session_id=session_id,
    artifact_type="config",
    name="tutorial_config",
    file=config_file
)
print(f"✓ Config uploaded: {result}")
6. Error handling
6.1 Basic error handling
from evospikenet.sdk import EvoSpikeNetAPIError
def safe_generate(prompt, max_retries=3):
    """Safely generate text with retries."""
    for attempt in range(max_retries):
        try:
            result = client.generate(prompt, max_length=100)
            return result
        except EvoSpikeNetAPIError as e:
            print(f"Attempt {attempt + 1} failed: {e.error_info.message}")
            if e.error_info.retry_after:
                print(f"Retrying after {e.error_info.retry_after} seconds...")
                time.sleep(e.error_info.retry_after)
            else:
                break
        except Exception as e:
            print(f"Unexpected error: {e}")
            break
    return None
# Usage example
result = safe_generate("Test prompt")
if result:
print("✓ Generation successful:", result['generated_text'])
else:
print("✗ All attempts failed")
6.2 Comprehensive error handling
def robust_simulation_workflow(prompt):
    """Robust simulation workflow."""
    try:
        # 1. Prompt validation
        if not client.validate_prompt(prompt):
            raise ValueError("Invalid prompt")
        # 2. Prompt submission (with retries)
        response = client.with_error_handling(
            client.submit_prompt,
            prompt=prompt,
            retries=3
        )
        if not response:
            raise RuntimeError("Failed to submit prompt")
        # 3. Result polling (with timeout)
        result = client.poll_for_result(timeout=300, interval=10)
        if not result:
            raise TimeoutError("Simulation timed out")
        return result
    except EvoSpikeNetAPIError as e:
        print(f"API Error: {e.error_info.error_type}")
        print(f"Message: {e.error_info.message}")
        if e.error_info.details:
            print(f"Details: {e.error_info.details}")
        return None
    except Exception as e:
        print(f"Unexpected error: {type(e).__name__}: {e}")
        return None
# Usage example
result = robust_simulation_workflow("複雑な分析タスク")
if result:
print("✓ Workflow completed successfully")
print("Result:", result.get('response'))
else:
print("✗ Workflow failed")
6.3 Custom error handler
class SimulationErrorHandler:
    """Custom handler for simulation errors."""

    def __init__(self, client):
        self.client = client
        self.error_counts = {}

    def handle_error(self, error, context=""):
        """Handle and log an error; return whether to retry."""
        error_type = type(error).__name__
        # Count errors by type
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
        # Logging
        print(f"[{context}] Error: {error_type}")
        print(f"Message: {str(error)}")
        if isinstance(error, EvoSpikeNetAPIError):
            print(f"API Error Type: {error.error_info.error_type}")
            print(f"Status Code: {error.error_info.status_code}")
            # Handle specific error types
            if error.error_info.error_type == "TimeoutError":
                print("→ Consider increasing timeout or checking server load")
            elif error.error_info.error_type == "ConnectionError":
                print("→ Check network connectivity and server status")
        # Retry decision
        should_retry = self.should_retry(error)
        if should_retry:
            print("→ Retrying operation...")
        else:
            print("→ Not retrying this type of error")
        return should_retry

    def should_retry(self, error):
        """Decide whether the operation should be retried."""
        if isinstance(error, EvoSpikeNetAPIError):
            # Retry on server errors or timeouts
            if error.error_info.status_code in [500, 502, 503, 504]:
                return True
            if error.error_info.error_type in ["TimeoutError", "ConnectionError"]:
                return True
        return False

    def get_error_summary(self):
        """Return error statistics."""
        return {
            "total_errors": sum(self.error_counts.values()),
            "error_types": self.error_counts.copy()
        }
# Usage example
handler = SimulationErrorHandler(client)
try:
    result = client.generate("Test prompt")
except Exception as e:
    should_retry = handler.handle_error(e, "text_generation")
    if should_retry:
        # Retry the operation here
        pass
print("Error summary:", handler.get_error_summary())
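A handler like this can also drive a generic retry loop with exponential backoff. The sketch below is illustrative, not part of the SDK (`retry_with_backoff`, the attempt counts, and the delays are assumptions); it works with any object exposing a `handle_error` method, such as the `SimulationErrorHandler` above:

```python
import time

def retry_with_backoff(operation, handler, context="", max_attempts=3, base_delay=1.0):
    """Run `operation`, consulting `handler` after each failure.

    Waits base_delay seconds after the first failure and doubles the
    delay after each subsequent one (exponential backoff). Returns the
    operation's result, or None if all attempts fail or the handler
    reports the error as non-retryable.
    """
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception as e:
            if not handler.handle_error(e, context):
                return None  # handler says this error is not retryable
            if attempt < max_attempts - 1:
                time.sleep(base_delay * (2 ** attempt))
    return None

# Usage (assuming `client` and the SimulationErrorHandler defined above):
# handler = SimulationErrorHandler(client)
# result = retry_with_backoff(lambda: client.generate("Test prompt"),
#                             handler, context="text_generation")
```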
7. Batch processing
7.1 Batch generation of multiple prompts
# batch text generation
prompts = [
    "Pythonのリスト内包表記について説明してください。",
    "機械学習における過学習を防ぐ方法を教えてください。",
    "ニューラルネットワークの活性化関数について説明してください。",
    "データサイエンスのワークフローについて説明してください。",
    "クラウドコンピューティングの利点を挙げてください。",
]
print(f"Processing {len(prompts)} prompts...")
# Batch processing
results = client.batch_generate(prompts, max_length=150)
# Results display
for i, (prompt, result) in enumerate(zip(prompts, results), 1):
    print(f"\n--- Prompt {i} ---")
    print(f"Input: {prompt}")
    if 'generated_text' in result:
        print(f"Output: {result['generated_text'][:200]}...")
    elif 'error' in result:
        print(f"Error: {result['error']}")
    else:
        print("Unexpected result format")
7.2 Parallel batch processing
import concurrent.futures

def generate_with_thread_safety(prompt, client, results, index):
    """Thread-safe generation helper."""
    try:
        result = client.generate(prompt, max_length=100)
        results[index] = result
        print(f"✓ Completed prompt {index + 1}")
    except Exception as e:
        results[index] = {"error": str(e), "prompt": prompt}
        print(f"✗ Failed prompt {index + 1}: {e}")

def parallel_batch_generate(prompts, max_workers=3):
    """Parallel batch generation."""
    results = [None] * len(prompts)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i, prompt in enumerate(prompts):
            future = executor.submit(generate_with_thread_safety, prompt, client, results, i)
            futures.append(future)
        # Wait for completion
        concurrent.futures.wait(futures)
    return results
# Usage example
prompts = [
    "量子コンピューティングの基本原理を説明してください。",
    "ブロックチェーン技術の仕組みについて教えてください。",
    "5Gネットワークの特徴を説明してください。",
]
print("Starting parallel batch generation...")
results = parallel_batch_generate(prompts, max_workers=2)
for i, result in enumerate(results):
    print(f"\nPrompt {i+1}:")
    if 'generated_text' in result:
        print(result['generated_text'][:150] + "...")
    else:
        print(f"Error: {result.get('error', 'Unknown error')}")
7.3 Batch processing with progress bar
def batch_generate_with_progress(prompts, batch_size=5):
    """Batch processing with a progress bar."""
    results = []
    try:
        from tqdm import tqdm
        use_tqdm = True
    except ImportError:
        use_tqdm = False
        print("tqdm not available, using simple progress")
    if use_tqdm:
        pbar = tqdm(total=len(prompts), desc="Generating")
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]
        # Process one batch
        batch_results = client.batch_generate(batch, max_length=120)
        for result in batch_results:
            results.append(result)
            if use_tqdm:
                pbar.update(1)
            else:
                print(f"Processed {len(results)}/{len(prompts)} prompts")
    if use_tqdm:
        pbar.close()
    return results
# Usage example
prompts = [f"トピック {i} についての説明を書いてください。" for i in range(1, 21)]
results = batch_generate_with_progress(prompts, batch_size=3)
successful = sum(1 for r in results if 'generated_text' in r)
print(f"\n✓ Successfully generated {successful}/{len(prompts)} responses")
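When a batch partially fails, it helps to summarize the outcome before deciding what to re-run. The helper below is a stdlib-only sketch (`summarize_batch` is not an SDK function); it assumes each result is a dict carrying either `'generated_text'` or `'error'`, as in the examples above:

```python
from collections import Counter

def summarize_batch(results):
    """Tally successes and failures from a list of batch results."""
    summary = {"successful": 0, "failed": 0, "errors": Counter()}
    for result in results:
        if isinstance(result, dict) and "generated_text" in result:
            summary["successful"] += 1
        else:
            summary["failed"] += 1
            # Group failures by their error message
            reason = result.get("error", "unknown") if isinstance(result, dict) else "no result"
            summary["errors"][reason] += 1
    return summary

# Example with hand-written results
results = [{"generated_text": "..."}, {"error": "timeout"}, {"error": "timeout"}]
print(summarize_batch(results))
```

The error counts make it easy to tell a systemic failure (one error repeated) from scattered per-prompt issues.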
8. Monitoring and statistics
8.1 Monitoring client statistics
# Get basic statistics
stats = client.get_stats()
print("=== Client Statistics ===")
print(f"Total requests: {stats['requests']}")
print(f"Total errors: {stats['errors']}")
print(f"Total retries: {stats['retries']}")
print(f"Average latency: {stats['average_latency']:.3f}s")
print(f"Error rate: {stats['error_rate']:.1%}")
print(f"Retry rate: {stats['retry_rate']:.1%}")
# Reset statistics
client.reset_stats()
print("✓ Statistics reset")
8.2 Performance monitoring
import time
def benchmark_generation(prompts, num_runs=5):
    """Benchmark generation performance."""
    results = []
    for run in range(num_runs):
        print(f"\n--- Benchmark Run {run + 1}/{num_runs} ---")
        start_time = time.time()
        batch_results = client.batch_generate(prompts, max_length=100)
        end_time = time.time()
        run_time = end_time - start_time
        successful = sum(1 for r in batch_results if 'generated_text' in r)
        results.append({
            'run_time': run_time,
            'successful': successful,
            'total': len(prompts),
            'avg_time_per_prompt': run_time / len(prompts)
        })
        print(f"Time: {run_time:.2f}s")
        print(f"Success rate: {successful}/{len(prompts)}")
    # Summary statistics
    avg_time = sum(r['run_time'] for r in results) / len(results)
    avg_success_rate = sum(r['successful'] for r in results) / sum(r['total'] for r in results)
    avg_time_per_prompt = sum(r['avg_time_per_prompt'] for r in results) / len(results)
    print("\n=== Benchmark Summary ===")
    print(f"Average time: {avg_time:.2f}s")
    print(f"Average success rate: {avg_success_rate:.1%}")
    print(f"Average time per prompt: {avg_time_per_prompt:.3f}s")
    return results
# Usage example
test_prompts = [
    "Hello, world!",
    "What is AI?",
    "Explain machine learning."
]
benchmark_results = benchmark_generation(test_prompts, num_runs=3)
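Per-prompt timings collected by a benchmark like this can be reduced to percentile figures with the standard library alone. A small sketch (`latency_percentiles` and the sample values are illustrative, not SDK APIs):

```python
import statistics

def latency_percentiles(latencies):
    """Compute avg / p50 / p95 from a list of latencies in seconds."""
    # quantiles(n=20) yields 19 cut points; index 18 is the 95th percentile
    cuts = statistics.quantiles(latencies, n=20)
    return {
        "avg": statistics.fmean(latencies),
        "p50": statistics.median(latencies),
        "p95": cuts[18],
    }

# Example with illustrative timings (seconds)
samples = [0.8, 0.9, 1.0, 1.1, 1.2, 3.5]
print(latency_percentiles(samples))
```

Tracking p95 alongside the average catches slow outliers that a mean alone hides.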
8.3 System resource monitoring
# Check system resource usage
try:
    resources = client.get_resource_usage()
    print("=== System Resources ===")
    print(f"CPU usage: {resources.get('cpu_percent', 'N/A')}%")
    print(f"Memory usage: {resources.get('memory_percent', 'N/A')}%")
    print(f"Disk usage: {resources.get('disk_percent', 'N/A')}%")
    # Per-node resources
    nodes = resources.get('nodes', [])
    for node in nodes:
        print(f"Node {node.get('id')}: CPU {node.get('cpu')}%, Memory {node.get('memory')}%")
except Exception as e:
    print(f"Resource monitoring not available: {e}")
8.4 Latency monitoring
# Get latency statistics
try:
    latency_stats = client.get_latency_stats()
    print("=== Latency Statistics ===")
    for component, stats in latency_stats.items():
        print(f"{component}:")
        print(f"  Average: {stats.get('avg', 'N/A')}ms")
        print(f"  Min: {stats.get('min', 'N/A')}ms")
        print(f"  Max: {stats.get('max', 'N/A')}ms")
        print(f"  P95: {stats.get('p95', 'N/A')}ms")
    # Check latency targets
    target_check = client.check_latency_target()
    print(f"\nLatency targets met: {target_check.get('met', 'Unknown')}")
except Exception as e:
    print(f"Latency monitoring not available: {e}")
9. Distributed Coordinator
9.1 Initializing the Distributed Coordinator
# Initializing the distributed coordinator
client.init_coordinator(
    node_id="tutorial_node_1",
    zenoh_config={
        "connect": ["tcp/127.0.0.1:7447"]
    },
    raft_config={
        "election_timeout": [5000, 10000]
    }
)
print("✓ Distributed coordinator initialized")
9.2 Starting and stopping the Coordinator
# Starting the Coordinator
client.start_coordinator()
print("✓ Coordinator started")
# Checking the cluster status
cluster_status = client.get_cluster_status()
print(f"Leader: {cluster_status.get('leader_id', 'None')}")
print(f"Active nodes: {len(cluster_status.get('nodes', {}))}")
# Stopping the Coordinator
client.stop_coordinator()
print("✓ Coordinator stopped")
9.3 Submitting collaborative tasks
# Initialize and start the coordinator
client.init_coordinator("tutorial_node_1")
client.start_coordinator()
# Submit a collaborative task
task_id = client.submit_coordination_task(
    task_type="federated_learning",
    payload={
        "model": "resnet50",
        "dataset": "cifar10",
        "rounds": 10,
        "learning_rate": 0.01
    }
)
print(f"✓ Coordination task submitted: {task_id}")
# Monitoring task status
import time
for _ in range(30):  # Monitor for up to 30 seconds
    status = client.get_coordination_task_status(task_id)
    if status:
        print(f"Task status: {status['status']}")
        if status['status'] in ['completed', 'failed']:
            break
    time.sleep(1)
print("Task monitoring completed")
Internal task execution logic (simple implementation built into the SDK):
- federated_learning: averages payload['updates'] (a list of numeric dictionaries) and returns aggregated_parameters.
- distributed_inference: converts payload['inputs'] / payload['batches'] directly into results, returning each entry with node_id and status=completed.
- model_aggregation: averages the weights lists contained in payload['models'] and produces aggregated_model['weights'].
- Node discovery and cleanup: while consuming /nodes/list responses via Zenoh, nodes with no heartbeat for a certain period are automatically removed.
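The federated_learning aggregation described above amounts to element-wise averaging of numeric dictionaries. A minimal stand-alone sketch of that behaviour (illustrative only; the SDK's internal implementation may differ):

```python
def aggregate_updates(updates):
    """Average a list of numeric dictionaries element-wise.

    Mirrors the federated_learning aggregation described above:
    each key's values are averaged across all updates.
    """
    if not updates:
        return {}
    aggregated = {}
    for key in updates[0]:
        aggregated[key] = sum(u[key] for u in updates) / len(updates)
    return aggregated

# Example: two nodes report parameter updates
updates = [{"w1": 1.0, "w2": 2.0}, {"w1": 3.0, "w2": 4.0}]
print(aggregate_updates(updates))  # {'w1': 2.0, 'w2': 3.0}
```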
9.4 Node management
# Registering a new node
node_info = {
    "address": "192.168.1.100",
    "port": 8001,
    "capabilities": ["gpu", "cpu"],
    "resources": {
        "cpu_cores": 8,
        "memory_gb": 16,
        "gpu_count": 1
    }
}
success = client.register_coordination_node("worker_node_1", node_info)
print(f"✓ Node registration: {'successful' if success else 'failed'}")
# Check registered nodes
cluster_status = client.get_cluster_status()
print("Registered nodes:")
for node_id, node_data in cluster_status.get('nodes', {}).items():
    print(f"  {node_id}: {node_data.get('capabilities', [])}")
# Release a node
success = client.unregister_coordination_node("worker_node_1")
print(f"✓ Node unregistration: {'successful' if success else 'failed'}")
9.5 Advanced Cooperative Scenarios
# Cooperative execution of multiple tasks
tasks = [
    {
        "type": "model_training",
        "payload": {"model": "bert", "dataset": "squad"}
    },
    {
        "type": "data_processing",
        "payload": {"operation": "preprocessing", "data_size": "large"}
    },
    {
        "type": "inference",
        "payload": {"model": "gpt2", "batch_size": 32}
    }
]
task_ids = []
for task in tasks:
    task_id = client.submit_coordination_task(task["type"], task["payload"])
    task_ids.append(task_id)
    print(f"✓ Submitted {task['type']} task: {task_id}")
# Monitor the status of all tasks
finished = set()
while len(finished) < len(task_ids):
    for task_id in task_ids:
        if task_id in finished:
            continue
        status = client.get_coordination_task_status(task_id)
        if status and status['status'] in ('completed', 'failed'):
            print(f"✓ Task {task_id} {status['status']}")
            finished.add(task_id)
    time.sleep(2)
print("All coordination tasks completed")
10. Advanced features
10.1 Snapshot management
# Creating a system snapshot
try:
    snapshot = client.create_snapshot(
        snapshot_name="tutorial_backup",
        include_models=True,
        include_data=True,
        compression_level=6
    )
    print(f"✓ Snapshot created: {snapshot}")
    # List snapshots
    snapshots = client.list_snapshots()
    print(f"Available snapshots: {len(snapshots)}")
    # Validate a snapshot
    if snapshots:
        snapshot_path = snapshots[0].get('path')
        validation = client.validate_snapshot(snapshot_path)
        print(f"Snapshot validation: {validation}")
except Exception as e:
    print(f"Snapshot operations not available: {e}")
10.2 Scalability testing
# Running scalability tests
try:
    scalability_test = client.run_scalability_test(
        max_nodes=20,
        test_duration=30
    )
    print(f"✓ Scalability test started: {scalability_test}")
    # Get test results
    results = client.get_scalability_results()
    print(f"Scalability results: {results}")
    # Node scalability test
    node_test = client.test_node_scalability(
        node_counts=[5, 10, 15, 20],
        test_duration=60
    )
    print(f"Node scalability test: {node_test}")
except Exception as e:
    print(f"Scalability testing not available: {e}")
10.3 Zenoh communication
# Zenoh connection
try:
    zenoh_connection = client.connect_zenoh(node_id="tutorial_client")
    print(f"✓ Connected to Zenoh: {zenoh_connection}")
    # Publish a message
    message = client.publish_zenoh_message(
        topic="tutorial/test",
        payload={"message": "Hello from SDK tutorial", "timestamp": time.time()},
        priority="normal",
        message_type="notification"
    )
    print(f"✓ Message published: {message}")
    # Send a request
    request = client.send_zenoh_request(
        target_node="brain_node_1",
        request={"action": "get_status"},
        timeout=5.0
    )
    print(f"✓ Request sent: {request}")
    # Get statistics
    zenoh_stats = client.get_zenoh_stats()
    print(f"Zenoh stats: {zenoh_stats}")
except Exception as e:
    print(f"Zenoh operations not available: {e}")
10.4 AEG-Comm communication optimization ⭐ NEW (2026-01-23)
AEG-Comm is a feature that intelligently optimizes communication in distributed brain simulations.
# AEG-Comm settings
try:
    # Configure communication optimization
    config_result = client.set_aeg_comm_config(
        node_id="brain_node_1",
        enable_comm=True,
        energy_threshold=10.0,
        critical_modalities=["force", "safety", "text"],
        force_change_threshold=10.0
    )
    print(f"✓ AEG-Comm configured: {config_result}")
    # Get communication statistics
    comm_stats = client.get_communication_stats(node_id="brain_node_1")
    print(f"Communication reduction rate: {comm_stats.get('reduction_rate', 0)}%")
    print(f"Sent packets: {comm_stats.get('sent_packets', 0)}")
    print(f"Blocked packets: {comm_stats.get('blocked_packets', 0)}")
    # Check AEG-Comm status
    status = client.get_aeg_comm_status(node_id="brain_node_1")
    print(f"AEG-Comm status: {status}")
except Exception as e:
    print(f"AEG-Comm operations not available: {e}")
10.5 Consensus operations
# Consensus decision proposal
try:
    proposal = client.propose_consensus_decision(
        decision_type="resource_allocation",
        payload={"resource": "gpu", "amount": 50},
        priority=2,
        dependencies=["previous_decision_123"]
    )
    print(f"✓ Consensus proposal submitted: {proposal}")
    # Obtain the decision result
    if 'proposal_id' in proposal:
        result = client.get_consensus_result(proposal['proposal_id'], timeout=30)
        print(f"✓ Consensus result: {result}")
    # Update node status
    node_update = client.update_node_status(node_id="node_1", active=True)
    print(f"✓ Node status updated: {node_update}")
    # Consensus statistics
    stats = client.get_consensus_stats()
    print(f"Consensus stats: {stats}")
except Exception as e:
    print(f"Consensus operations not available: {e}")
11. Jupyter Notebook integration
11.1 Initialization in a Jupyter environment
# SDK usage in Jupyter Notebook
from evospikenet.sdk import JupyterAPIClient  # import path assumed; adjust to your installed package layout
jupyter_client = JupyterAPIClient()
# Display mode settings
jupyter_client.set_display_mode("html") # "html", "json", "text"
# Server information display
jupyter_client.show_server_info()
# Statistics display
jupyter_client.show_stats()
11.2 Jupyter magic commands
Use the following magic commands in Jupyter Notebook:
# Text generation using cell magic
%%evospikenet_generate 100
人工知能の最新トレンドについて説明してください。
# Connection using line magic
%evospikenet_connect http://localhost:8000
# Statistics display
%evospikenet_stats
# Server information display
%evospikenet_info
11.3 Interactive prompt validation
# Interactive prompt validation
test_prompt = "これはテストプロンプトです。"
is_valid = jupyter_client.validate_prompt_interactive(test_prompt)
print(f"Prompt valid: {is_valid}")
# Continue processing only if valid
if is_valid:
    result = jupyter_client.generate(test_prompt, show_output=True)
Summary
In this tutorial, you learned how to use the EvoSpikeNet SDK, from basic setup to advanced features. Key points:
- Basic setup: Client initialization and server connection
- Text generation: simple and batch generation
- Multimodal processing: Prompts with images and sounds
- Distributed brain simulation: Status monitoring and result acquisition
- Artifact Management: Upload/Download/Manage
- Error Handling: Robust error handling and retries
- Batch Processing: Efficient multiple processing
- Monitoring: Performance and statistics monitoring
- Distributed Coordinator: Distributed coordination using Zenoh DDS + Raft consensus
- Advanced Features: Snapshots, Zenoh, AEG-Comm, Consensus
- Jupyter integration: Use in a notebook environment
By combining these features, you can take full advantage of EvoSpikeNet's capabilities. In actual application development, implement proper error handling and monitoring.
Next steps:
- Check detailed specifications in API Reference
- See practical examples in Sample Code
- Learn advanced settings with Configuration Guide