
EvoSpikeNet SDK tutorial

Author: Masahiro Aoki

Last updated: January 15, 2026

Overview

This tutorial walks step by step through the EvoSpikeNet SDK, from basic usage to advanced features. Each section includes practical code examples with explanations.

Table of contents

  1. Basic Setup
  2. Text Generation
  3. Multimodal Processing
  4. Distributed Brain Simulation
  5. Artifact Management
  6. Error Handling
  7. Batch processing
  8. Monitoring and Statistics
  9. Distributed Coordinator
  10. Advanced Features
  11. Jupyter Notebook integration

1. Basic setup

1.1 SDK import and initialization

from evospikenet.sdk import EvoSpikeNetAPIClient

# Basic client initialization
client = EvoSpikeNetAPIClient()

# Initialization with custom settings
client = EvoSpikeNetAPIClient(
    base_url="http://localhost:8000",
    api_key="your_api_key",
    timeout=60,
    max_retries=3
)

print("✓ SDK client initialized successfully")

1.2 Checking the server connection

# health check
health = client.health_check()
print(f"Server health: {health}")

# Get server information
info = client.get_server_info()
if info:
    print(f"Server version: {info.get('version')}")
    print(f"Available models: {info.get('models', [])}")

# Wait until server is ready
if client.wait_for_server(timeout=30):
    print("✓ Server is ready")
else:
    print("✗ Server failed to become ready")

2. Text generation

2.1 Basic text generation

# Simple text generation
prompt = "Please explain the future of artificial intelligence."
result = client.generate(prompt, max_length=100)

print("Prompt:", prompt)
print("Generated text:", result['generated_text'])
print("Full response:", result)

2.2 Generating with different prompts

prompts = [
    "Please explain the basic syntax of Python.",
    "Please give three examples of machine learning applications.",
    "Please briefly explain how neural networks work.",
]

for prompt in prompts:
    result = client.generate(prompt, max_length=150)
    print(f"\n--- {prompt[:30]}... ---")
    print(result['generated_text'][:100] + "...")

2.3 Adjusting generation parameters

# Generation at various lengths
prompt = "About quantum computing"

lengths = [50, 100, 200]
for length in lengths:
    result = client.generate(prompt, max_length=length)
    text_length = len(result['generated_text'])
    print(f"Requested: {length}, Generated: {text_length} characters")
    print(result['generated_text'][:100] + "...\n")

3. Multimodal processing

3.1 Prompts with images

# Prompt using an image file
image_path = "./sample_image.jpg"
prompt = "Please describe in detail what is shown in this image."

try:
    # Submit the prompt
    response = client.submit_prompt(
        prompt=prompt,
        image_path=image_path
    )
    print("✓ Prompt submitted successfully")
    print("Response ID:", response.get('id'))

    # Waiting for results
    result = client.poll_for_result(timeout=120)
    if result:
        print("✓ Result received:")
        print(result['response'])
    else:
        print("✗ No result received within timeout")

except Exception as e:
    print(f"✗ Error: {e}")

3.2 Prompts with audio

# Prompt using an audio file
audio_path = "./sample_audio.wav"
prompt = "Please transcribe this audio and summarize its content."

try:
    response = client.submit_prompt(
        prompt=prompt,
        audio_path=audio_path
    )

    result = client.poll_for_result(timeout=180)  # Audio processing takes time
    if result:
        print("✓ Audio processing result:")
        print(result['response'])
    else:
        print("✗ Audio processing timed out")

except Exception as e:
    print(f"✗ Error: {e}")

3.3 Multimodal validation

# File existence check and validation
import os

def validate_multimodal_input(prompt, image_path=None, audio_path=None):
    """Validate multimodal input."""
    if not prompt and not image_path and not audio_path:
        return False, "At least one input (prompt, image, or audio) is required"

    if image_path and not os.path.exists(image_path):
        return False, f"Image file not found: {image_path}"

    if audio_path and not os.path.exists(audio_path):
        return False, f"Audio file not found: {audio_path}"

    # File size check
    if image_path:
        size = os.path.getsize(image_path) / (1024 * 1024)  # MB
        if size > 10:
            return False, f"Image file too large: {size:.1f}MB (max 10MB)"

    if audio_path:
        size = os.path.getsize(audio_path) / (1024 * 1024)  # MB
        if size > 50:
            return False, f"Audio file too large: {size:.1f}MB (max 50MB)"

    return True, "Validation passed"

# Usage example
is_valid, message = validate_multimodal_input(
    "Describe this image",
    image_path="./test.jpg"
)

if is_valid:
    print("✓ Input validation passed")
    # Continue processing
else:
    print(f"✗ Validation failed: {message}")

4. Distributed brain simulation

4.1 Basic simulation execution

# Text-based query
query = "Please explain how the human brain learns."

try:
    # Submit the prompt
    response = client.submit_prompt(prompt=query)
    print(f"✓ Query submitted: {query}")

    # Simulation status monitoring
    import time
    for i in range(10):  # Check up to 10 times
        status = client.get_simulation_status()
        print(f"Status check {i+1}: {status}")

        if status.get('completed', False):
            break

        time.sleep(2)

    # Get results
    result = client.get_simulation_result()
    if result and result.get('response'):
        print("✓ Simulation result:")
        print(result['response'])
    else:
        print("✗ No result available")

except Exception as e:
    print(f"✗ Simulation error: {e}")

4.2 Detailed monitoring of simulation status

import time

def monitor_simulation():
    """Monitor the simulation status in detail."""
    response = client.submit_prompt(prompt="Please run a complex reasoning task")

    print("Monitoring simulation progress...")

    while True:
        status = client.get_simulation_status()

        # Status display
        active_nodes = status.get('active_nodes', 0)
        total_nodes = status.get('total_nodes', 0)
        completed_tasks = status.get('completed_tasks', 0)
        total_tasks = status.get('total_tasks', 0)

        print(f"Active nodes: {active_nodes}/{total_nodes}")
        print(f"Completed tasks: {completed_tasks}/{total_tasks}")

        # Details by node
        nodes = status.get('nodes', [])
        for node in nodes:
            node_id = node.get('id')
            node_status = node.get('status')
            node_load = node.get('load', 0)
            print(f"  Node {node_id}: {node_status} (load: {node_load}%)")

        if status.get('completed', False):
            print("✓ Simulation completed")
            break

        if status.get('failed', False):
            print("✗ Simulation failed")
            break

        time.sleep(5)

    # Get final results
    result = client.get_simulation_result()
    return result

# execution
result = monitor_simulation()
if result:
    print("Final result:", result.get('response'))

4.3 Remote log acquisition

# Log acquisition from remote node
remote_config = {
    'user': 'ubuntu',
    'ip': '192.168.1.100',
    'key_path': '~/.ssh/id_rsa',
    'log_file_path': '/var/log/evospikenet/simulation.log'
}

try:
    logs = client.get_remote_log(**remote_config)
    print("✓ Remote logs retrieved:")
    print(logs.get('content', 'No content'))

except Exception as e:
    print(f"✗ Failed to retrieve remote logs: {e}")

5. Artifact Management

5.1 Creating a log session

# Create new log session
session = client.create_log_session(
    description="Tutorial session for artifact management"
)
session_id = session.get('session_id')
print(f"✓ Created session: {session_id}")

5.2 Uploading the model

import io

# Upload model file
model_path = "./trained_model.pkl"

with open(model_path, 'rb') as f:
    model_data = io.BytesIO(f.read())

result = client.upload_artifact(
    session_id=session_id,
    artifact_type="model",
    name="tutorial_model_v1",
    file=model_data,
    llm_type="SpikingEvoVisionEncoder",
    model_category="image_classification",
    model_variant="standard"
)

print(f"✓ Model uploaded: {result}")

5.3 Manage artifacts

# Get list of artifacts
artifacts = client.list_artifacts()
print(f"Total artifacts: {len(artifacts) if isinstance(artifacts, list) else 'N/A'}")

# Filter only model type
models = client.list_artifacts(artifact_type="model")
print(f"Model artifacts: {len(models) if isinstance(models, list) else 'N/A'}")

# Specific artifact download
if isinstance(artifacts, list) and artifacts:
    artifact_id = artifacts[0].get('id')
    client.download_artifact(artifact_id, "./downloaded_model.pkl")
    print(f"✓ Downloaded artifact: {artifact_id}")

5.4 Managing configuration files

# Upload configuration file
config_data = """
model:
  type: SpikingEvoVisionEncoder
  layers: 5
  neurons_per_layer: 100

training:
  epochs: 100
  batch_size: 32
  learning_rate: 0.001
"""

config_file = io.BytesIO(config_data.encode('utf-8'))

result = client.upload_artifact(
    session_id=session_id,
    artifact_type="config",
    name="tutorial_config",
    file=config_file
)

print(f"✓ Config uploaded: {result}")

6. Error handling

6.1 Basic error handling

import time

# Assumption: EvoSpikeNetAPIError is importable from the same module as the client
from evospikenet.sdk import EvoSpikeNetAPIError

def safe_generate(prompt, max_retries=3):
    """Generate text safely, retrying when the server provides a retry delay."""
    for attempt in range(max_retries):
        try:
            result = client.generate(prompt, max_length=100)
            return result

        except EvoSpikeNetAPIError as e:
            print(f"Attempt {attempt + 1} failed: {e.error_info.message}")

            if e.error_info.retry_after:
                print(f"Retrying after {e.error_info.retry_after} seconds...")
                time.sleep(e.error_info.retry_after)
            else:
                break

        except Exception as e:
            print(f"Unexpected error: {e}")
            break

    return None

# Usage example
result = safe_generate("Test prompt")
if result:
    print("✓ Generation successful:", result['generated_text'])
else:
    print("✗ All attempts failed")

6.2 Comprehensive error handling

def robust_simulation_workflow(prompt):
    """Run a robust simulation workflow: validate, submit, then poll."""
    try:
        # 1. Prompt validation
        if not client.validate_prompt(prompt):
            raise ValueError("Invalid prompt")

        # 2. Prompt sending (with retry)
        response = client.with_error_handling(
            client.submit_prompt,
            prompt=prompt,
            retries=3
        )

        if not response:
            raise RuntimeError("Failed to submit prompt")

        # 3. Result polling (with timeout)
        result = client.poll_for_result(timeout=300, interval=10)

        if not result:
            raise TimeoutError("Simulation timed out")

        return result

    except EvoSpikeNetAPIError as e:
        print(f"API Error: {e.error_info.error_type}")
        print(f"Message: {e.error_info.message}")
        if e.error_info.details:
            print(f"Details: {e.error_info.details}")
        return None

    except Exception as e:
        print(f"Unexpected error: {type(e).__name__}: {e}")
        return None

# Usage example
result = robust_simulation_workflow("A complex analysis task")
if result:
    print("✓ Workflow completed successfully")
    print("Result:", result.get('response'))
else:
    print("✗ Workflow failed")

6.3 Custom error handler

class SimulationErrorHandler:
    """Custom handler for simulation errors."""

    def __init__(self, client):
        self.client = client
        self.error_counts = {}

    def handle_error(self, error, context=""):
        """Log the error, count it, and decide whether to retry."""
        error_type = type(error).__name__

        # error count
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

        # logging
        print(f"[{context}] Error: {error_type}")
        print(f"Message: {str(error)}")

        if isinstance(error, EvoSpikeNetAPIError):
            print(f"API Error Type: {error.error_info.error_type}")
            print(f"Status Code: {error.error_info.status_code}")

            # Handling specific errors
            if error.error_info.error_type == "TimeoutError":
                print("→ Consider increasing timeout or checking server load")
            elif error.error_info.error_type == "ConnectionError":
                print("→ Check network connectivity and server status")

        # Retry judgment
        should_retry = self.should_retry(error)
        if should_retry:
            print("→ Retrying operation...")
        else:
            print("→ Not retrying this type of error")

        return should_retry

    def should_retry(self, error):
        """Decide whether the error warrants a retry."""
        if isinstance(error, EvoSpikeNetAPIError):
            # Retry on server error or timeout
            if error.error_info.status_code in [500, 502, 503, 504]:
                return True
            if error.error_info.error_type in ["TimeoutError", "ConnectionError"]:
                return True

        return False

    def get_error_summary(self):
        """Return error statistics."""
        return {
            "total_errors": sum(self.error_counts.values()),
            "error_types": self.error_counts.copy()
        }

# Usage example
handler = SimulationErrorHandler(client)

try:
    result = client.generate("Test prompt")
except Exception as e:
    should_retry = handler.handle_error(e, "text_generation")
    if should_retry:
        # Retry processing
        pass

print("Error summary:", handler.get_error_summary())

7. Batch processing

7.1 Batch generation of multiple prompts

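# Batch text generation
prompts = [
    "Please explain Python list comprehensions.",
    "Please describe how to prevent overfitting in machine learning.",
    "Please explain activation functions in neural networks.",
    "Please explain the data science workflow.",
    "Please list the advantages of cloud computing.",
]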

print(f"Processing {len(prompts)} prompts...")

# Batch processing
results = client.batch_generate(prompts, max_length=150)

# Results display
for i, (prompt, result) in enumerate(zip(prompts, results), 1):
    print(f"\n--- Prompt {i} ---")
    print(f"Input: {prompt}")

    if 'generated_text' in result:
        print(f"Output: {result['generated_text'][:200]}...")
    elif 'error' in result:
        print(f"Error: {result['error']}")
    else:
        print("Unexpected result format")

7.2 Parallel batch processing

import concurrent.futures

def generate_with_thread_safety(prompt, client, results, index):
    """Generate text for one prompt and store the result at its own index."""
    try:
        result = client.generate(prompt, max_length=100)
        results[index] = result
        print(f"✓ Completed prompt {index + 1}")
    except Exception as e:
        results[index] = {"error": str(e), "prompt": prompt}
        print(f"✗ Failed prompt {index + 1}: {e}")

def parallel_batch_generate(prompts, max_workers=3):
    """Generate text for a batch of prompts in parallel."""
    results = [None] * len(prompts)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i, prompt in enumerate(prompts):
            future = executor.submit(generate_with_thread_safety, prompt, client, results, i)
            futures.append(future)

        # Waiting for completion
        concurrent.futures.wait(futures)

    return results

# Usage example
prompts = [
    "Please explain the basic principles of quantum computing.",
    "Please describe how blockchain technology works.",
    "Please explain the characteristics of 5G networks.",
]

print("Starting parallel batch generation...")
results = parallel_batch_generate(prompts, max_workers=2)

for i, result in enumerate(results):
    print(f"\nPrompt {i+1}:")
    if 'generated_text' in result:
        print(result['generated_text'][:150] + "...")
    else:
        print(f"Error: {result.get('error', 'Unknown error')}")
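An alternative to writing into a shared results list from inside each worker is to collect results in the calling thread with `concurrent.futures.as_completed`. The sketch below shows that variant: each future is mapped back to its input index, so output order matches input order and per-item failures are recorded in the same `{"error": ..., "prompt": ...}` shape used above. `parallel_map` and `fake_generate` are hypothetical names; whether the real client can safely be shared across threads is not stated in this tutorial and is assumed here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List


def parallel_map(fn: Callable[[str], dict], items: List[str], max_workers: int = 3) -> List[dict]:
    """Apply fn to every item in a thread pool, preserving input order in the result."""
    results: List[dict] = [{} for _ in items]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which input position each future belongs to.
        future_to_index = {executor.submit(fn, item): i for i, item in enumerate(items)}
        for future in as_completed(future_to_index):
            i = future_to_index[future]
            try:
                results[i] = future.result()
            except Exception as exc:
                results[i] = {"error": str(exc), "prompt": items[i]}
    return results


# Hypothetical stand-in for client.generate: rejects empty prompts.
def fake_generate(prompt: str) -> dict:
    if not prompt:
        raise ValueError("empty prompt")
    return {"generated_text": prompt.upper()}


outputs = parallel_map(fake_generate, ["alpha", "", "gamma"], max_workers=2)
print(outputs)
```

Because only the calling thread writes to `results`, no locking is needed in this version.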

7.3 Batch processing with progress bar

def batch_generate_with_progress(prompts, batch_size=5):
    """Process prompts in batches while showing a progress bar."""
    results = []

    try:
        from tqdm import tqdm
        use_tqdm = True
    except ImportError:
        use_tqdm = False
        print("tqdm not available, using simple progress")

    if use_tqdm:
        pbar = tqdm(total=len(prompts), desc="Generating")

    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]

        # Batch processing
        batch_results = client.batch_generate(batch, max_length=120)

        for result in batch_results:
            results.append(result)
            if use_tqdm:
                pbar.update(1)
            else:
                print(f"Processed {len(results)}/{len(prompts)} prompts")

    if use_tqdm:
        pbar.close()

    return results

# Usage example
prompts = [f"Please write an explanation of topic {i}." for i in range(1, 21)]

results = batch_generate_with_progress(prompts, batch_size=3)

successful = sum(1 for r in results if 'generated_text' in r)
print(f"\n✓ Successfully generated {successful}/{len(prompts)} responses")

8. Monitoring and statistics

8.1 Monitoring client statistics

# Get basic statistics
stats = client.get_stats()
print("=== Client Statistics ===")
print(f"Total requests: {stats['requests']}")
print(f"Total errors: {stats['errors']}")
print(f"Total retries: {stats['retries']}")
print(f"Average latency: {stats['average_latency']:.3f}s")
print(f"Error rate: {stats['error_rate']:.1%}")
print(f"Retry rate: {stats['retry_rate']:.1%}")

# Reset statistics
client.reset_stats()
print("✓ Statistics reset")

8.2 Performance monitoring

import time

def benchmark_generation(prompts, num_runs=5):
    """Benchmark generation performance over several runs."""
    results = []

    for run in range(num_runs):
        print(f"\n--- Benchmark Run {run + 1}/{num_runs} ---")

        start_time = time.time()
        batch_results = client.batch_generate(prompts, max_length=100)
        end_time = time.time()

        run_time = end_time - start_time
        successful = sum(1 for r in batch_results if 'generated_text' in r)

        results.append({
            'run_time': run_time,
            'successful': successful,
            'total': len(prompts),
            'avg_time_per_prompt': run_time / len(prompts)
        })

        print(f"Time: {run_time:.2f}s")
        print(f"Success rate: {successful}/{len(prompts)}")

    # statistical calculation
    avg_time = sum(r['run_time'] for r in results) / len(results)
    avg_success_rate = sum(r['successful'] for r in results) / sum(r['total'] for r in results)
    avg_time_per_prompt = sum(r['avg_time_per_prompt'] for r in results) / len(results)

    print("\n=== Benchmark Summary ===")
    print(f"Average time: {avg_time:.2f}s")
    print(f"Average success rate: {avg_success_rate:.1%}")
    print(f"Average time per prompt: {avg_time_per_prompt:.3f}s")

    return results

# Usage example
test_prompts = [
    "Hello, world!",
    "What is AI?",
    "Explain machine learning."
]

benchmark_results = benchmark_generation(test_prompts, num_runs=3)

8.3 System resource monitoring

# Obtaining system resource usage status
try:
    resources = client.get_resource_usage()
    print("=== System Resources ===")
    print(f"CPU usage: {resources.get('cpu_percent', 'N/A')}%")
    print(f"Memory usage: {resources.get('memory_percent', 'N/A')}%")
    print(f"Disk usage: {resources.get('disk_percent', 'N/A')}%")

    # Resources by node
    nodes = resources.get('nodes', [])
    for node in nodes:
        print(f"Node {node.get('id')}: CPU {node.get('cpu')}%, Memory {node.get('memory')}%")

except Exception as e:
    print(f"Resource monitoring not available: {e}")

8.4 Latency monitoring

# Get latency statistics
try:
    latency_stats = client.get_latency_stats()
    print("=== Latency Statistics ===")

    for component, stats in latency_stats.items():
        print(f"{component}:")
        print(f"  Average: {stats.get('avg', 'N/A')}ms")
        print(f"  Min: {stats.get('min', 'N/A')}ms")
        print(f"  Max: {stats.get('max', 'N/A')}ms")
        print(f"  P95: {stats.get('p95', 'N/A')}ms")

    # Check whether latency targets are met
    target_check = client.check_latency_target()
    print(f"\nLatency targets met: {target_check.get('met', 'Unknown')}")

except Exception as e:
    print(f"Latency monitoring not available: {e}")
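To make the `avg`, `min`, `max`, and `p95` fields above concrete, here is a minimal sketch that computes the same four figures from a list of raw latency samples. It uses the nearest-rank definition of the 95th percentile. This is an illustration only: how the server actually computes its statistics is not documented here, and `summarize_latencies` is a hypothetical helper.

```python
from typing import Dict, List


def summarize_latencies(samples_ms: List[float]) -> Dict[str, float]:
    """Return avg/min/max/p95 for latency samples (nearest-rank p95)."""
    if not samples_ms:
        raise ValueError("samples_ms must not be empty")
    ordered = sorted(samples_ms)
    # 1-based nearest rank: ceil(0.95 * n), computed with integers.
    rank = (95 * len(ordered) + 99) // 100
    return {
        "avg": sum(ordered) / len(ordered),
        "min": ordered[0],
        "max": ordered[-1],
        "p95": ordered[rank - 1],
    }


samples = [float(n) for n in range(1, 101)]  # 1.0 .. 100.0 ms
summary = summarize_latencies(samples)
print(summary)
# → {'avg': 50.5, 'min': 1.0, 'max': 100.0, 'p95': 95.0}
```

Tracking p95 alongside the average is useful because a few slow requests can hide behind a healthy-looking mean.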

9. Distributed Coordinator

9.1 Initializing the Distributed Coordinator

# Initializing the distributed coordinator
client.init_coordinator(
    node_id="tutorial_node_1",
    zenoh_config={
        "connect": ["tcp/127.0.0.1:7447"]
    },
    raft_config={
        "election_timeout": [5000, 10000]
    }
)

print("✓ Distributed coordinator initialized")

9.2 Starting and stopping the Coordinator

# Starting the Coordinator
client.start_coordinator()
print("✓ Coordinator started")

# Checking the cluster status
cluster_status = client.get_cluster_status()
print(f"Leader: {cluster_status.get('leader_id', 'None')}")
print(f"Active nodes: {len(cluster_status.get('nodes', {}))}")

# Stopping the Coordinator
client.stop_coordinator()
print("✓ Coordinator stopped")

9.3 Submitting collaborative tasks

# Starting the Coordinator
client.init_coordinator("tutorial_node_1")
client.start_coordinator()

# Submit a collaborative task
task_id = client.submit_coordination_task(
    task_type="federated_learning",
    payload={
        "model": "resnet50",
        "dataset": "cifar10",
        "rounds": 10,
        "learning_rate": 0.01
    }
)

print(f"✓ Coordination task submitted: {task_id}")

# Monitoring task status
import time

for _ in range(30):  # Monitor for 30 seconds
    status = client.get_coordination_task_status(task_id)
    if status:
        print(f"Task status: {status['status']}")
        if status['status'] in ['completed', 'failed']:
            break
    time.sleep(1)

print("Task monitoring completed")

Internal task execution logic (a simple implementation built into the SDK)

  • federated_learning: Averages payload['updates'] (a list of numeric dictionaries) and returns the result as aggregated_parameters.
  • distributed_inference: Passes payload['inputs'] / payload['batches'] through to the result unchanged, returning each entry with node_id and status=completed.
  • model_aggregation: Averages the weight lists in payload['models'] and produces aggregated_model['weights'].
  • Node discovery and cleanup: Merges /nodes/list responses received via Zenoh and automatically removes nodes that have sent no heartbeat for a certain period.
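The federated_learning step described above can be sketched as a key-wise average over the update dictionaries. This is a minimal illustration under stated assumptions, not the SDK's actual code: the helper name `average_updates` is hypothetical, and the choice to average each key only over the updates that contain it is an assumption.

```python
from typing import Dict, List


def average_updates(updates: List[Dict[str, float]]) -> Dict[str, float]:
    """Average each key across the update dicts.

    A key is averaged only over the dicts that contain it (assumption).
    """
    totals: Dict[str, float] = {}
    counts: Dict[str, int] = {}
    for update in updates:
        for key, value in update.items():
            totals[key] = totals.get(key, 0.0) + value
            counts[key] = counts.get(key, 0) + 1
    return {key: totals[key] / counts[key] for key in totals}


payload = {
    "updates": [
        {"w1": 1.0, "w2": 4.0},
        {"w1": 3.0, "w2": 8.0},
    ]
}
aggregated = {"aggregated_parameters": average_updates(payload["updates"])}
print(aggregated)
# → {'aggregated_parameters': {'w1': 2.0, 'w2': 6.0}}
```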

9.4 Node management

# Registering a new node
node_info = {
    "address": "192.168.1.100",
    "port": 8001,
    "capabilities": ["gpu", "cpu"],
    "resources": {
        "cpu_cores": 8,
        "memory_gb": 16,
        "gpu_count": 1
    }
}

success = client.register_coordination_node("worker_node_1", node_info)
print(f"✓ Node registration: {'successful' if success else 'failed'}")

# Check registered nodes
cluster_status = client.get_cluster_status()
print("Registered nodes:")
for node_id, node_data in cluster_status.get('nodes', {}).items():
    print(f"  {node_id}: {node_data.get('capabilities', [])}")

# Release a node
success = client.unregister_coordination_node("worker_node_1")
print(f"✓ Node unregistration: {'successful' if success else 'failed'}")

9.5 Advanced Cooperative Scenarios

# Cooperative execution of multiple tasks
tasks = [
    {
        "type": "model_training",
        "payload": {"model": "bert", "dataset": "squad"}
    },
    {
        "type": "data_processing",
        "payload": {"operation": "preprocessing", "data_size": "large"}
    },
    {
        "type": "inference",
        "payload": {"model": "gpt2", "batch_size": 32}
    }
]

task_ids = []
for task in tasks:
    task_id = client.submit_coordination_task(task["type"], task["payload"])
    task_ids.append(task_id)
    print(f"✓ Submitted {task['type']} task: {task_id}")

# Monitor all tasks until each one reaches a terminal state
pending = set(task_ids)
while pending:
    for task_id in list(pending):
        status = client.get_coordination_task_status(task_id)
        if status and status['status'] in ('completed', 'failed'):
            print(f"Task {task_id} finished with status: {status['status']}")
            pending.discard(task_id)
    if pending:
        time.sleep(2)

print("All coordination tasks finished")

10. Advanced features

10.1 Snapshot Management

# Creating a system snapshot
try:
    snapshot = client.create_snapshot(
        snapshot_name="tutorial_backup",
        include_models=True,
        include_data=True,
        compression_level=6
    )
    print(f"✓ Snapshot created: {snapshot}")

    # Snapshot list
    snapshots = client.list_snapshots()
    print(f"Available snapshots: {len(snapshots)}")

    # Verifying the snapshot
    if snapshots:
        snapshot_path = snapshots[0].get('path')
        validation = client.validate_snapshot(snapshot_path)
        print(f"Snapshot validation: {validation}")

except Exception as e:
    print(f"Snapshot operations not available: {e}")

10.2 Scalability test

# Running scalability tests
try:
    scalability_test = client.run_scalability_test(
        max_nodes=20,
        test_duration=30
    )
    print(f"✓ Scalability test started: {scalability_test}")

    # Get test results
    results = client.get_scalability_results()
    print(f"Scalability results: {results}")

    # node scalability test
    node_test = client.test_node_scalability(
        node_counts=[5, 10, 15, 20],
        test_duration=60
    )
    print(f"Node scalability test: {node_test}")

except Exception as e:
    print(f"Scalability testing not available: {e}")

10.3 Zenoh Communication

# Zenoh connection
try:
    zenoh_connection = client.connect_zenoh(node_id="tutorial_client")
    print(f"✓ Connected to Zenoh: {zenoh_connection}")

    # Send message
    message = client.publish_zenoh_message(
        topic="tutorial/test",
        payload={"message": "Hello from SDK tutorial", "timestamp": time.time()},
        priority="normal",
        message_type="notification"
    )
    print(f"✓ Message published: {message}")

    # Send request
    request = client.send_zenoh_request(
        target_node="brain_node_1",
        request={"action": "get_status"},
        timeout=5.0
    )
    print(f"✓ Request sent: {request}")

    # Get statistics
    zenoh_stats = client.get_zenoh_stats()
    print(f"Zenoh stats: {zenoh_stats}")

except Exception as e:
    print(f"Zenoh operations not available: {e}")

10.4 AEG-Comm communication optimization ⭐ NEW (2026-01-23)

AEG-Comm is a feature that intelligently optimizes communication in distributed brain simulations.

# AEG-Comm settings
try:
    # Communication optimization settings
    config_result = client.set_aeg_comm_config(
        node_id="brain_node_1",
        enable_comm=True,
        energy_threshold=10.0,
        critical_modalities=["force", "safety", "text"],
        force_change_threshold=10.0
    )
    print(f"✓ AEG-Comm configured: {config_result}")

    # Communication statistics acquisition
    comm_stats = client.get_communication_stats(node_id="brain_node_1")
    print(f"Communication reduction rate: {comm_stats.get('reduction_rate', 0)}%")
    print(f"Packets sent: {comm_stats.get('sent_packets', 0)}")
    print(f"Packets blocked: {comm_stats.get('blocked_packets', 0)}")

    # AEG-Comm status check
    status = client.get_aeg_comm_status(node_id="brain_node_1")
    print(f"AEG-Comm status: {status}")

except Exception as e:
    print(f"AEG-Comm operations not available: {e}")

10.5 Consensus operations

# Consensus decision proposal
try:
    proposal = client.propose_consensus_decision(
        decision_type="resource_allocation",
        payload={"resource": "gpu", "amount": 50},
        priority=2,
        dependencies=["previous_decision_123"]
    )
    print(f"✓ Consensus proposal submitted: {proposal}")

    # Obtain decision result
    if 'proposal_id' in proposal:
        result = client.get_consensus_result(proposal['proposal_id'], timeout=30)
        print(f"✓ Consensus result: {result}")

    # Node status update
    node_update = client.update_node_status(node_id="node_1", active=True)
    print(f"✓ Node status updated: {node_update}")

    # consensus statistics
    stats = client.get_consensus_stats()
    print(f"Consensus stats: {stats}")

except Exception as e:
    print(f"Consensus operations not available: {e}")

11. Jupyter Notebook integration

11.1 Initialization in Jupyter environment

# SDK usage in Jupyter Notebook
# JupyterAPIClient must be imported from the EvoSpikeNet SDK before this line (module path not shown here)
jupyter_client = JupyterAPIClient()

# Display mode settings
jupyter_client.set_display_mode("html")  # "html", "json", "text"

# Server information display
jupyter_client.show_server_info()

# Statistics display
jupyter_client.show_stats()

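11.2 Jupyter Magic Commands

Use the following magic commands in Jupyter Notebook. A cell magic (%%) must be the first line of its cell, so run the text generation cell separately from the line magics.

Text generation using cell magic (in its own cell):

%%evospikenet_generate 100
Please explain the latest trends in artificial intelligence.

Line magics (in a separate cell):

# Connection using line magic
%evospikenet_connect http://localhost:8000

# Statistics display
%evospikenet_stats

# Server information display
%evospikenet_info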

11.3 Interactive Prompt Validation

# Interactive prompt validation
test_prompt = "This is a test prompt."

is_valid = jupyter_client.validate_prompt_interactive(test_prompt)
print(f"Prompt valid: {is_valid}")

# Continue processing only if valid
if is_valid:
    result = jupyter_client.generate(test_prompt, show_output=True)

Summary

In this tutorial, you learned how to use the EvoSpikeNet SDK from basic usage to advanced features. Main points:

  1. Basic setup: Client initialization and server connection
  2. Text generation: simple and batch generation
  3. Multimodal processing: Prompts with images and sounds
  4. Distributed brain simulation: Status monitoring and result acquisition
  5. Artifact Management: Upload/Download/Manage
  6. Error Handling: Robust error handling and retries
  7. Batch Processing: Efficient multiple processing
  8. Monitoring: Performance and statistics monitoring
  9. Distributed Coordinator: Distributed coordination using Zenoh DDS + Raft consensus
  10. Advanced Features: Snapshots, Zenoh, AEG-Comm, Consensus
  11. Jupyter integration: Use in a notebook environment

By combining these features, you can take full advantage of EvoSpikeNet's capabilities. In actual application development, implement proper error handling and monitoring.

Next steps:

  • Check detailed specifications in the API Reference
  • See practical examples in Sample Code
  • Learn advanced settings with the Configuration Guide