コンテンツにスキップ

EvoSpikeNet SDK チュートリアル

Author: Masahiro Aoki

最終更新日: 2026年1月15日

概要

このチュートリアルでは、EvoSpikeNet SDKの基本的な使用方法から高度な機能までを段階的に学習できます。各セクションには実践的なコード例と説明が含まれています。

目次

  1. 基本的なセットアップ
  2. テキスト生成
  3. マルチモーダル処理
  4. 分散脳シミュレーション
  5. アーティファクト管理
  6. エラーハンドリング
  7. バッチ処理
  8. 監視と統計
  9. 分散Coordinator
  10. 高度な機能
  11. Jupyter Notebook統合

1. 基本的なセットアップ

1.1 SDKのインポートと初期化

<!-- from evospikenet.sdk import EvoSpikeNetAPIClient -->

# 基本的なクライアント初期化
client = EvoSpikeNetAPIClient()

# カスタム設定での初期化
client = EvoSpikeNetAPIClient(
    base_url="http://localhost:8000",
    api_key="your_api_key",
    timeout=60,
    max_retries=3
)

print("✓ SDK client initialized successfully")

1.2 サーバー接続の確認

# ヘルスチェック
health = client.health_check()
print(f"Server health: {health}")

# サーバー情報取得
info = client.get_server_info()
if info:
    print(f"Server version: {info.get('version')}")
    print(f"Available models: {info.get('models', [])}")

# サーバーが準備できるまで待機
if client.wait_for_server(timeout=30):
    print("✓ Server is ready")
else:
    print("✗ Server failed to become ready")

2. テキスト生成

2.1 基本的なテキスト生成

# シンプルなテキスト生成
prompt = "人工知能の未来について説明してください。"
result = client.generate(prompt, max_length=100)

print("Prompt:", prompt)
print("Generated text:", result['generated_text'])
print("Full response:", result)

2.2 さまざまなプロンプトでの生成

prompts = [
    "Pythonの基本的な文法を説明してください。",
    "機械学習の応用例を3つ挙げてください。",
    "神経ネットワークの仕組みを簡単に説明してください。",
]

for prompt in prompts:
    result = client.generate(prompt, max_length=150)
    print(f"\n--- {prompt[:30]}... ---")
    print(result['generated_text'][:100] + "...")

2.3 生成パラメータの調整

# さまざまな長さでの生成
prompt = "量子コンピューティングについて"

lengths = [50, 100, 200]
for length in lengths:
    result = client.generate(prompt, max_length=length)
    text_length = len(result['generated_text'])
    print(f"Requested: {length}, Generated: {text_length} characters")
    print(result['generated_text'][:100] + "...\n")

3. マルチモーダル処理

3.1 画像付きプロンプト

# 画像ファイルを使用したプロンプト
image_path = "./sample_image.jpg"
prompt = "この画像に写っているものを詳しく説明してください。"

try:
    # プロンプト送信
    response = client.submit_prompt(
        prompt=prompt,
        image_path=image_path
    )
    print("✓ Prompt submitted successfully")
    print("Response ID:", response.get('id'))

    # 結果待機
    result = client.poll_for_result(timeout=120)
    if result:
        print("✓ Result received:")
        print(result['response'])
    else:
        print("✗ No result received within timeout")

except Exception as e:
    print(f"✗ Error: {e}")

3.2 音声付きプロンプト

# 音声ファイルを使用したプロンプト
audio_path = "./sample_audio.wav"
prompt = "この音声を文字起こしし、内容を要約してください。"

try:
    response = client.submit_prompt(
        prompt=prompt,
        audio_path=audio_path
    )

    result = client.poll_for_result(timeout=180)  # 音声処理は時間がかかる
    if result:
        print("✓ Audio processing result:")
        print(result['response'])
    else:
        print("✗ Audio processing timed out")

except Exception as e:
    print(f"✗ Error: {e}")

3.3 マルチモーダルバリデーション

# ファイルの存在確認とバリデーション
import os

def validate_multimodal_input(prompt, image_path=None, audio_path=None):
    """マルチモーダル入力のバリデーション"""
    if not prompt and not image_path and not audio_path:
        return False, "At least one input (prompt, image, or audio) is required"

    if image_path and not os.path.exists(image_path):
        return False, f"Image file not found: {image_path}"

    if audio_path and not os.path.exists(audio_path):
        return False, f"Audio file not found: {audio_path}"

    # ファイルサイズチェック
    if image_path:
        size = os.path.getsize(image_path) / (1024 * 1024)  # MB
        if size > 10:
            return False, f"Image file too large: {size:.1f}MB (max 10MB)"

    if audio_path:
        size = os.path.getsize(audio_path) / (1024 * 1024)  # MB
        if size > 50:
            return False, f"Audio file too large: {size:.1f}MB (max 50MB)"

    return True, "Validation passed"

# 使用例
is_valid, message = validate_multimodal_input(
    "Describe this image",
    image_path="./test.jpg"
)

if is_valid:
    print("✓ Input validation passed")
    # 処理続行
else:
    print(f"✗ Validation failed: {message}")

4. 分散脳シミュレーション

4.1 基本的なシミュレーション実行

# テキストベースのクエリ
query = "人間の脳はどのように学習するのか説明してください。"

try:
    # プロンプト送信
    response = client.submit_prompt(prompt=query)
    print(f"✓ Query submitted: {query}")

    # シミュレーション状態監視
    import time
    for i in range(10):  # 最大10回チェック
        status = client.get_simulation_status()
        print(f"Status check {i+1}: {status}")

        if status.get('completed', False):
            break

        time.sleep(2)

    # 結果取得
    result = client.get_simulation_result()
    if result and result.get('response'):
        print("✓ Simulation result:")
        print(result['response'])
    else:
        print("✗ No result available")

except Exception as e:
    print(f"✗ Simulation error: {e}")

4.2 シミュレーション状態の詳細監視

def monitor_simulation():
    """シミュレーションの詳細な状態監視"""
    response = client.submit_prompt(prompt="複雑な推論タスクを実行してください")

    print("Monitoring simulation progress...")

    while True:
        status = client.get_simulation_status()

        # 状態表示
        active_nodes = status.get('active_nodes', 0)
        total_nodes = status.get('total_nodes', 0)
        completed_tasks = status.get('completed_tasks', 0)
        total_tasks = status.get('total_tasks', 0)

        print(f"Active nodes: {active_nodes}/{total_nodes}")
        print(f"Completed tasks: {completed_tasks}/{total_tasks}")

        # ノード別の詳細
        nodes = status.get('nodes', [])
        for node in nodes:
            node_id = node.get('id')
            node_status = node.get('status')
            node_load = node.get('load', 0)
            print(f"  Node {node_id}: {node_status} (load: {node_load}%)")

        if status.get('completed', False):
            print("✓ Simulation completed")
            break

        if status.get('failed', False):
            print("✗ Simulation failed")
            break

        time.sleep(5)

    # 最終結果取得
    result = client.get_simulation_result()
    return result

# 実行
result = monitor_simulation()
if result:
    print("Final result:", result.get('response'))

4.3 リモートログ取得

# リモートノードからのログ取得
remote_config = {
    'user': 'ubuntu',
    'ip': '192.168.1.100',
    'key_path': '~/.ssh/id_rsa',
    'log_file_path': '/var/log/evospikenet/simulation.log'
}

try:
    logs = client.get_remote_log(**remote_config)
    print("✓ Remote logs retrieved:")
    print(logs.get('content', 'No content'))

except Exception as e:
    print(f"✗ Failed to retrieve remote logs: {e}")

5. アーティファクト管理

5.1 ログセッションの作成

# 新しいログセッション作成
session = client.create_log_session(
    description="Tutorial session for artifact management"
)
session_id = session.get('session_id')
print(f"✓ Created session: {session_id}")

5.2 モデルのアップロード

import io

# モデルファイルのアップロード
model_path = "./trained_model.pkl"

with open(model_path, 'rb') as f:
    model_data = io.BytesIO(f.read())

result = client.upload_artifact(
    session_id=session_id,
    artifact_type="model",
    name="tutorial_model_v1",
    file=model_data,
    llm_type="SpikingEvoVisionEncoder",
    model_category="image_classification",
    model_variant="standard"
)

print(f"✓ Model uploaded: {result}")

5.3 アーティファクトの管理

# アーティファクト一覧取得
artifacts = client.list_artifacts()
print(f"Total artifacts: {len(artifacts) if isinstance(artifacts, list) else 'N/A'}")

# モデルタイプのみフィルタリング
models = client.list_artifacts(artifact_type="model")
print(f"Model artifacts: {len(models) if isinstance(models, list) else 'N/A'}")

# 特定のアーティファクトダウンロード
if isinstance(artifacts, list) and artifacts:
    artifact_id = artifacts[0].get('id')
    client.download_artifact(artifact_id, "./downloaded_model.pkl")
    print(f"✓ Downloaded artifact: {artifact_id}")

5.4 設定ファイルの管理

# 設定ファイルのアップロード
config_data = """
model:
  type: SpikingEvoVisionEncoder
  layers: 5
  neurons_per_layer: 100

training:
  epochs: 100
  batch_size: 32
  learning_rate: 0.001
"""

config_file = io.BytesIO(config_data.encode('utf-8'))

result = client.upload_artifact(
    session_id=session_id,
    artifact_type="config",
    name="tutorial_config",
    file=config_file
)

print(f"✓ Config uploaded: {result}")

6. エラーハンドリング

6.1 基本的なエラーハンドリング

<!-- TODO: update or remove - import faileNetAPIError -->

def safe_generate(prompt, max_retries=3):
    """安全なテキスト生成関数"""
    for attempt in range(max_retries):
        try:
            result = client.generate(prompt, max_length=100)
            return result

        except EvoSpikeNetAPIError as e:
            print(f"Attempt {attempt + 1} failed: {e.error_info.message}")

            if e.error_info.retry_after:
                print(f"Retrying after {e.error_info.retry_after} seconds...")
                time.sleep(e.error_info.retry_after)
            else:
                break

        except Exception as e:
            print(f"Unexpected error: {e}")
            break

    return None

# 使用例
result = safe_generate("Test prompt")
if result:
    print("✓ Generation successful:", result['generated_text'])
else:
    print("✗ All attempts failed")

6.2 包括的なエラーハンドリング

def robust_simulation_workflow(prompt):
    """堅牢なシミュレーションワークフロー"""
    try:
        # 1. プロンプトバリデーション
        if not client.validate_prompt(prompt):
            raise ValueError("Invalid prompt")

        # 2. プロンプト送信(リトライ付き)
        response = client.with_error_handling(
            client.submit_prompt,
            prompt=prompt,
            retries=3
        )

        if not response:
            raise RuntimeError("Failed to submit prompt")

        # 3. 結果ポーリング(タイムアウト付き)
        result = client.poll_for_result(timeout=300, interval=10)

        if not result:
            raise TimeoutError("Simulation timed out")

        return result

    except EvoSpikeNetAPIError as e:
        print(f"API Error: {e.error_info.error_type}")
        print(f"Message: {e.error_info.message}")
        if e.error_info.details:
            print(f"Details: {e.error_info.details}")
        return None

    except Exception as e:
        print(f"Unexpected error: {type(e).__name__}: {e}")
        return None

# 使用例
result = robust_simulation_workflow("複雑な分析タスク")
if result:
    print("✓ Workflow completed successfully")
    print("Result:", result.get('response'))
else:
    print("✗ Workflow failed")

6.3 カスタムエラーハンドラー

class SimulationErrorHandler:
    """シミュレーションエラーのカスタムハンドラー"""

    def __init__(self, client):
        self.client = client
        self.error_counts = {}

    def handle_error(self, error, context=""):
        """エラーハンドリングとログ記録"""
        error_type = type(error).__name__

        # エラーカウント
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

        # ログ記録
        print(f"[{context}] Error: {error_type}")
        print(f"Message: {str(error)}")

        if isinstance(error, EvoSpikeNetAPIError):
            print(f"API Error Type: {error.error_info.error_type}")
            print(f"Status Code: {error.error_info.status_code}")

            # 特定のエラーに対する処理
            if error.error_info.error_type == "TimeoutError":
                print("→ Consider increasing timeout or checking server load")
            elif error.error_info.error_type == "ConnectionError":
                print("→ Check network connectivity and server status")

        # リトライ判断
        should_retry = self.should_retry(error)
        if should_retry:
            print("→ Retrying operation...")
        else:
            print("→ Not retrying this type of error")

        return should_retry

    def should_retry(self, error):
        """リトライが必要かどうかの判断"""
        if isinstance(error, EvoSpikeNetAPIError):
            # サーバーエラーやタイムアウトはリトライ
            if error.error_info.status_code in [500, 502, 503, 504]:
                return True
            if error.error_info.error_type in ["TimeoutError", "ConnectionError"]:
                return True

        return False

    def get_error_summary(self):
        """エラー統計の取得"""
        return {
            "total_errors": sum(self.error_counts.values()),
            "error_types": self.error_counts.copy()
        }

# 使用例
handler = SimulationErrorHandler(client)

try:
    result = client.generate("Test prompt")
except Exception as e:
    should_retry = handler.handle_error(e, "text_generation")
    if should_retry:
        # リトライ処理
        pass

print("Error summary:", handler.get_error_summary())

7. バッチ処理

7.1 複数プロンプトのバッチ生成

# バッチテキスト生成
prompts = [
    "Pythonのリスト内包表記について説明してください。",
    "機械学習における過学習を防ぐ方法を教えてください。",
    "ニューラルネットワークの活性化関数について説明してください。",
    "データサイエンスのワークフローについて説明してください。",
    "クラウドコンピューティングの利点を挙げてください。",
]

print(f"Processing {len(prompts)} prompts...")

# バッチ処理
results = client.batch_generate(prompts, max_length=150)

# 結果表示
for i, (prompt, result) in enumerate(zip(prompts, results), 1):
    print(f"\n--- Prompt {i} ---")
    print(f"Input: {prompt}")

    if 'generated_text' in result:
        print(f"Output: {result['generated_text'][:200]}...")
    elif 'error' in result:
        print(f"Error: {result['error']}")
    else:
        print("Unexpected result format")

7.2 並列バッチ処理

import concurrent.futures
import threading

def generate_with_thread_safety(prompt, client, results, index):
    """スレッドセーフな生成関数"""
    try:
        result = client.generate(prompt, max_length=100)
        results[index] = result
        print(f"✓ Completed prompt {index + 1}")
    except Exception as e:
        results[index] = {"error": str(e), "prompt": prompt}
        print(f"✗ Failed prompt {index + 1}: {e}")

def parallel_batch_generate(prompts, max_workers=3):
    """並列バッチ生成"""
    results = [None] * len(prompts)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i, prompt in enumerate(prompts):
            future = executor.submit(generate_with_thread_safety, prompt, client, results, i)
            futures.append(future)

        # 完了待機
        concurrent.futures.wait(futures)

    return results

# 使用例
prompts = [
    "量子コンピューティングの基本原理を説明してください。",
    "ブロックチェーン技術の仕組みについて教えてください。",
    "5Gネットワークの特徴を説明してください。",
]

print("Starting parallel batch generation...")
results = parallel_batch_generate(prompts, max_workers=2)

for i, result in enumerate(results):
    print(f"\nPrompt {i+1}:")
    if 'generated_text' in result:
        print(result['generated_text'][:150] + "...")
    else:
        print(f"Error: {result.get('error', 'Unknown error')}")

7.3 プログレスバー付きバッチ処理

def batch_generate_with_progress(prompts, batch_size=5):
    """プログレスバー付きバッチ処理"""
    results = []

    try:
        from tqdm import tqdm
        use_tqdm = True
    except ImportError:
        use_tqdm = False
        print("tqdm not available, using simple progress")

    if use_tqdm:
        pbar = tqdm(total=len(prompts), desc="Generating")

    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]

        # バッチ処理
        batch_results = client.batch_generate(batch, max_length=120)

        for result in batch_results:
            results.append(result)
            if use_tqdm:
                pbar.update(1)
            else:
                print(f"Processed {len(results)}/{len(prompts)} prompts")

    if use_tqdm:
        pbar.close()

    return results

# 使用例
prompts = [f"トピック {i} についての説明を書いてください。" for i in range(1, 21)]

results = batch_generate_with_progress(prompts, batch_size=3)

successful = sum(1 for r in results if 'generated_text' in r)
print(f"\n✓ Successfully generated {successful}/{len(prompts)} responses")

8. 監視と統計

8.1 クライアント統計の監視

# 基本統計の取得
stats = client.get_stats()
print("=== Client Statistics ===")
print(f"Total requests: {stats['requests']}")
print(f"Total errors: {stats['errors']}")
print(f"Total retries: {stats['retries']}")
print(f"Average latency: {stats['average_latency']:.3f}s")
print(f"Error rate: {stats['error_rate']:.1%}")
print(f"Retry rate: {stats['retry_rate']:.1%}")

# 統計のリセット
client.reset_stats()
print("✓ Statistics reset")

8.2 パフォーマンス監視

import time

def benchmark_generation(prompts, num_runs=5):
    """生成パフォーマンスのベンチマーク"""
    results = []

    for run in range(num_runs):
        print(f"\n--- Benchmark Run {run + 1}/{num_runs} ---")

        start_time = time.time()
        batch_results = client.batch_generate(prompts, max_length=100)
        end_time = time.time()

        run_time = end_time - start_time
        successful = sum(1 for r in batch_results if 'generated_text' in r)

        results.append({
            'run_time': run_time,
            'successful': successful,
            'total': len(prompts),
            'avg_time_per_prompt': run_time / len(prompts)
        })

        print(f"Time: {run_time:.2f}s")
        print(f"Success rate: {successful}/{len(prompts)}")

    # 統計計算
    avg_time = sum(r['run_time'] for r in results) / len(results)
    avg_success_rate = sum(r['successful'] for r in results) / sum(r['total'] for r in results)
    avg_time_per_prompt = sum(r['avg_time_per_prompt'] for r in results) / len(results)

    print("
=== Benchmark Summary ===")
    print(f"Average time: {avg_time:.2f}s")
    print(f"Average success rate: {avg_success_rate:.1%}")
    print(f"Average time per prompt: {avg_time_per_prompt:.3f}s")

    return results

# 使用例
test_prompts = [
    "Hello, world!",
    "What is AI?",
    "Explain machine learning."
]

benchmark_results = benchmark_generation(test_prompts, num_runs=3)

8.3 システムリソース監視

# システムリソース使用状況の取得
try:
    resources = client.get_resource_usage()
    print("=== System Resources ===")
    print(f"CPU usage: {resources.get('cpu_percent', 'N/A')}%")
    print(f"Memory usage: {resources.get('memory_percent', 'N/A')}%")
    print(f"Disk usage: {resources.get('disk_percent', 'N/A')}%")

    # ノード別のリソース
    nodes = resources.get('nodes', [])
    for node in nodes:
        print(f"Node {node.get('id')}: CPU {node.get('cpu')}%, Memory {node.get('memory')}%")

except Exception as e:
    print(f"Resource monitoring not available: {e}")

8.4 遅延監視

# 遅延統計の取得
try:
    latency_stats = client.get_latency_stats()
    print("=== Latency Statistics ===")

    for component, stats in latency_stats.items():
        print(f"{component}:")
        print(f"  Average: {stats.get('avg', 'N/A')}ms")
        print(f"  Min: {stats.get('min', 'N/A')}ms")
        print(f"  Max: {stats.get('max', 'N/A')}ms")
        print(f"  P95: {stats.get('p95', 'N/A')}ms")

    # 遅延ターゲットのチェック
    target_check = client.check_latency_target()
    print(f"\nLatency targets met: {target_check.get('met', 'Unknown')}")

except Exception as e:
    print(f"Latency monitoring not available: {e}")

9. 分散Coordinator

9.1 分散Coordinatorの初期化

# 分散Coordinatorの初期化
client.init_coordinator(
    node_id="tutorial_node_1",
    zenoh_config={
        "connect": ["tcp/127.0.0.1:7447"]
    },
    raft_config={
        "election_timeout": [5000, 10000]
    }
)

print("✓ Distributed coordinator initialized")

9.2 Coordinatorの開始と停止

# Coordinatorの開始
client.start_coordinator()
print("✓ Coordinator started")

# クラスタ状態の確認
cluster_status = client.get_cluster_status()
print(f"Leader: {cluster_status.get('leader_id', 'None')}")
print(f"Active nodes: {len(cluster_status.get('nodes', {}))}")

# Coordinatorの停止
client.stop_coordinator()
print("✓ Coordinator stopped")

9.3 協調タスクの送信

# Coordinatorの開始
client.init_coordinator("tutorial_node_1")
client.start_coordinator()

# 協調タスクの送信
task_id = client.submit_coordination_task(
    task_type="federated_learning",
    payload={
        "model": "resnet50",
        "dataset": "cifar10",
        "rounds": 10,
        "learning_rate": 0.01
    }
)

print(f"✓ Coordination task submitted: {task_id}")

# タスク状態の監視
import time

for _ in range(30):  # 30秒間監視
    status = client.get_coordination_task_status(task_id)
    if status:
        print(f"Task status: {status['status']}")
        if status['status'] in ['completed', 'failed']:
            break
    time.sleep(1)

print("Task monitoring completed")

内部タスク実行ロジック(SDK組み込みの簡易実装)

  • federated_learning: payload['updates'](数値ディクショナリのリスト)を平均して aggregated_parameters を付与。
  • distributed_inference: payload['inputs'] / payload['batches'] をそのまま結果化し、各エントリに node_idstatus=completed を付けて返却。
  • model_aggregation: payload['models'] に含まれる weights リストを平均し、aggregated_model['weights'] を生成。
  • ノード発見とクリーンアップ: Zenoh経由の /nodes/list 応答を取り込みつつ、一定時間ハートビートが無いノードを自動削除。

9.4 ノード管理

# 新しいノードの登録
node_info = {
    "address": "192.168.1.100",
    "port": 8001,
    "capabilities": ["gpu", "cpu"],
    "resources": {
        "cpu_cores": 8,
        "memory_gb": 16,
        "gpu_count": 1
    }
}

success = client.register_coordination_node("worker_node_1", node_info)
print(f"✓ Node registration: {'successful' if success else 'failed'}")

# 登録ノードの確認
cluster_status = client.get_cluster_status()
print("Registered nodes:")
for node_id, node_data in cluster_status.get('nodes', {}).items():
    print(f"  {node_id}: {node_data.get('capabilities', [])}")

# ノードの解除
success = client.unregister_coordination_node("worker_node_1")
print(f"✓ Node unregistration: {'successful' if success else 'failed'}")

9.5 高度な協調シナリオ

# 複数タスクの協調実行
tasks = [
    {
        "type": "model_training",
        "payload": {"model": "bert", "dataset": "squad"}
    },
    {
        "type": "data_processing",
        "payload": {"operation": "preprocessing", "data_size": "large"}
    },
    {
        "type": "inference",
        "payload": {"model": "gpt2", "batch_size": 32}
    }
]

task_ids = []
for task in tasks:
    task_id = client.submit_coordination_task(task["type"], task["payload"])
    task_ids.append(task_id)
    print(f"✓ Submitted {task['type']} task: {task_id}")

# 全タスクの状態監視
completed_tasks = 0
while completed_tasks < len(task_ids):
    for task_id in task_ids:
        status = client.get_coordination_task_status(task_id)
        if status and status['status'] == 'completed':
            if task_id not in [t['id'] for t in completed_tasks]:
                print(f"✓ Task {task_id} completed")
                completed_tasks += 1
    time.sleep(2)

print("All coordination tasks completed")

10. 高度な機能

9.1 スナップショット管理

# システムスナップショットの作成
try:
    snapshot = client.create_snapshot(
        snapshot_name="tutorial_backup",
        include_models=True,
        include_data=True,
        compression_level=6
    )
    print(f"✓ Snapshot created: {snapshot}")

    # スナップショット一覧
    snapshots = client.list_snapshots()
    print(f"Available snapshots: {len(snapshots)}")

    # スナップショットの検証
    if snapshots:
        snapshot_path = snapshots[0].get('path')
        validation = client.validate_snapshot(snapshot_path)
        print(f"Snapshot validation: {validation}")

except Exception as e:
    print(f"Snapshot operations not available: {e}")

9.2 スケーラビリティテスト

# スケーラビリティテストの実行
try:
    scalability_test = client.run_scalability_test(
        max_nodes=20,
        test_duration=30
    )
    print(f"✓ Scalability test started: {scalability_test}")

    # テスト結果取得
    results = client.get_scalability_results()
    print(f"Scalability results: {results}")

    # ノードスケーラビリティテスト
    node_test = client.test_node_scalability(
        node_counts=[5, 10, 15, 20],
        test_duration=60
    )
    print(f"Node scalability test: {node_test}")

except Exception as e:
    print(f"Scalability testing not available: {e}")

9.3 Zenoh通信

# Zenoh接続
try:
    zenoh_connection = client.connect_zenoh(node_id="tutorial_client")
    print(f"✓ Connected to Zenoh: {zenoh_connection}")

    # メッセージ送信
    message = client.publish_zenoh_message(
        topic="tutorial/test",
        payload={"message": "Hello from SDK tutorial", "timestamp": time.time()},
        priority="normal",
        message_type="notification"
    )
    print(f"✓ Message published: {message}")

    # リクエスト送信
    request = client.send_zenoh_request(
        target_node="brain_node_1",
        request={"action": "get_status"},
        timeout=5.0
    )
    print(f"✓ Request sent: {request}")

    # 統計取得
    zenoh_stats = client.get_zenoh_stats()
    print(f"Zenoh stats: {zenoh_stats}")

except Exception as e:
    print(f"Zenoh operations not available: {e}")

9.4 AEG-Comm通信最適化 ⭐ NEW (2026-01-23)

AEG-Commは分散脳シミュレーションにおける通信をインテリジェントに最適化する機能です。

# AEG-Comm設定
try:
    # 通信最適化設定
    config_result = client.set_aeg_comm_config(
        node_id="brain_node_1",
        enable_comm=True,
        energy_threshold=10.0,
        critical_modalities=["force", "safety", "text"],
        force_change_threshold=10.0
    )
    print(f"✓ AEG-Comm configured: {config_result}")

    # 通信統計取得
    comm_stats = client.get_communication_stats(node_id="brain_node_1")
    print(f"通信削減率: {comm_stats.get('reduction_rate', 0)}%")
    print(f"送信パケット数: {comm_stats.get('sent_packets', 0)}")
    print(f"ブロックされたパケット数: {comm_stats.get('blocked_packets', 0)}")

    # AEG-Commステータス確認
    status = client.get_aeg_comm_status(node_id="brain_node_1")
    print(f"AEG-Comm status: {status}")

except Exception as e:
    print(f"AEG-Comm operations not available: {e}")

9.5 コンセンサス操作

# コンセンサス決定の提案
try:
    proposal = client.propose_consensus_decision(
        decision_type="resource_allocation",
        payload={"resource": "gpu", "amount": 50},
        priority=2,
        dependencies=["previous_decision_123"]
    )
    print(f"✓ Consensus proposal submitted: {proposal}")

    # 決定結果取得
    if 'proposal_id' in proposal:
        result = client.get_consensus_result(proposal['proposal_id'], timeout=30)
        print(f"✓ Consensus result: {result}")

    # ノード状態更新
    node_update = client.update_node_status(node_id="node_1", active=True)
    print(f"✓ Node status updated: {node_update}")

    # コンセンサス統計
    stats = client.get_consensus_stats()
    print(f"Consensus stats: {stats}")

except Exception as e:
    print(f"Consensus operations not available: {e}")

10. Jupyter Notebook統合

10.1 Jupyter環境での初期化

# Jupyter NotebookでのSDK使用
<!-- モジュール 'evospikenet' が見つかりませんパッケージ内の移動/名前変更を確認してください -->
<!-
jupyter_client = JupyterAPIClient()

# 表示モード設定
jupyter_client.set_display_mode("html")  # "html", "json", "text"

# サーバー情報表示
jupyter_client.show_server_info()

# 統計表示
jupyter_client.show_stats()

10.2 Jupyterマジックコマンド

Jupyter Notebookで以下のマジックコマンドを使用:

# セルマジックを使用したテキスト生成
%%evospikenet_generate 100
人工知能の最新トレンドについて説明してください
# ラインマジックを使用した接続
%evospikenet_connect http://localhost:8000

# 統計表示
%evospikenet_stats

# サーバー情報表示
%evospikenet_info

10.3 インタラクティブなプロンプト検証

# インタラクティブなプロンプト検証
test_prompt = "これはテストプロンプトです。"

is_valid = jupyter_client.validate_prompt_interactive(test_prompt)
print(f"Prompt valid: {is_valid}")

# 有効な場合のみ処理続行
if is_valid:
    result = jupyter_client.generate(test_prompt, show_output=True)

まとめ

このチュートリアルでは、EvoSpikeNet SDKの基本的な使用方法から高度な機能までを学習しました。主要なポイント:

  1. 基本セットアップ: クライアントの初期化とサーバー接続
  2. テキスト生成: 単純およびバッチ生成
  3. マルチモーダル処理: 画像・音声付きプロンプト
  4. 分散脳シミュレーション: 状態監視と結果取得
  5. アーティファクト管理: アップロード・ダウンロード・管理
  6. エラーハンドリング: 堅牢なエラー処理とリトライ
  7. バッチ処理: 効率的な複数処理
  8. 監視: パフォーマンスと統計の監視
  9. 分散Coordinator: Zenoh DDS + Raftコンセンサスによる分散協調
  10. 高度な機能: スナップショット、Zenoh、AEG-Comm、コンセンサス
  11. Jupyter統合: ノートブック環境での使用

これらの機能を組み合わせることで、EvoSpikeNetの機能を最大限に活用できます。実際のアプリケーション開発では、適切なエラーハンドリングと監視を実装してください。

次のステップ: - APIリファレンスで詳細な仕様を確認 - サンプルコードで実践的な例を参照 - 設定ガイドで高度な設定を学習 /Users/maoki/Documents/GitHub/EvoSpikeNet/docs/SDK_TUTORIAL.md