EvoSpikeNet SDK チュートリアル
Copyright: 2026 Moonlight Technologies Inc.
Author: Masahiro Aoki
最終更新日: 2026年1月15日
概要
このチュートリアルでは、EvoSpikeNet SDKの基本的な使用方法から高度な機能までを段階的に学習できます。各セクションには実践的なコード例と説明が含まれています。
目次
- 基本的なセットアップ
- テキスト生成
- マルチモーダル処理
- 分散脳シミュレーション
- アーティファクト管理
- エラーハンドリング
- バッチ処理
- 監視と統計
- 分散Coordinator
- 高度な機能
- Jupyter Notebook統合
1. 基本的なセットアップ
1.1 SDKのインポートと初期化
<!-- from evospikenet.sdk import EvoSpikeNetAPIClient -->
# 基本的なクライアント初期化
client = EvoSpikeNetAPIClient()
# カスタム設定での初期化
client = EvoSpikeNetAPIClient(
base_url="http://localhost:8000",
api_key="your_api_key",
timeout=60,
max_retries=3
)
print("✓ SDK client initialized successfully")
1.2 サーバー接続の確認
# ヘルスチェック
health = client.health_check()
print(f"Server health: {health}")
# サーバー情報取得
info = client.get_server_info()
if info:
print(f"Server version: {info.get('version')}")
print(f"Available models: {info.get('models', [])}")
# サーバーが準備できるまで待機
if client.wait_for_server(timeout=30):
print("✓ Server is ready")
else:
print("✗ Server failed to become ready")
2. テキスト生成
2.1 基本的なテキスト生成
# シンプルなテキスト生成
prompt = "人工知能の未来について説明してください。"
result = client.generate(prompt, max_length=100)
print("Prompt:", prompt)
print("Generated text:", result['generated_text'])
print("Full response:", result)
2.2 さまざまなプロンプトでの生成
prompts = [
"Pythonの基本的な文法を説明してください。",
"機械学習の応用例を3つ挙げてください。",
"神経ネットワークの仕組みを簡単に説明してください。",
]
for prompt in prompts:
result = client.generate(prompt, max_length=150)
print(f"\n--- {prompt[:30]}... ---")
print(result['generated_text'][:100] + "...")
2.3 生成パラメータの調整
# さまざまな長さでの生成
prompt = "量子コンピューティングについて"
lengths = [50, 100, 200]
for length in lengths:
result = client.generate(prompt, max_length=length)
text_length = len(result['generated_text'])
print(f"Requested: {length}, Generated: {text_length} characters")
print(result['generated_text'][:100] + "...\n")
3. マルチモーダル処理
3.1 画像付きプロンプト
# 画像ファイルを使用したプロンプト
image_path = "./sample_image.jpg"
prompt = "この画像に写っているものを詳しく説明してください。"
try:
# プロンプト送信
response = client.submit_prompt(
prompt=prompt,
image_path=image_path
)
print("✓ Prompt submitted successfully")
print("Response ID:", response.get('id'))
# 結果待機
result = client.poll_for_result(timeout=120)
if result:
print("✓ Result received:")
print(result['response'])
else:
print("✗ No result received within timeout")
except Exception as e:
print(f"✗ Error: {e}")
3.2 音声付きプロンプト
# 音声ファイルを使用したプロンプト
audio_path = "./sample_audio.wav"
prompt = "この音声を文字起こしし、内容を要約してください。"
try:
response = client.submit_prompt(
prompt=prompt,
audio_path=audio_path
)
result = client.poll_for_result(timeout=180) # 音声処理は時間がかかる
if result:
print("✓ Audio processing result:")
print(result['response'])
else:
print("✗ Audio processing timed out")
except Exception as e:
print(f"✗ Error: {e}")
3.3 マルチモーダルバリデーション
# ファイルの存在確認とバリデーション
import os
def validate_multimodal_input(prompt, image_path=None, audio_path=None):
"""マルチモーダル入力のバリデーション"""
if not prompt and not image_path and not audio_path:
return False, "At least one input (prompt, image, or audio) is required"
if image_path and not os.path.exists(image_path):
return False, f"Image file not found: {image_path}"
if audio_path and not os.path.exists(audio_path):
return False, f"Audio file not found: {audio_path}"
# ファイルサイズチェック
if image_path:
size = os.path.getsize(image_path) / (1024 * 1024) # MB
if size > 10:
return False, f"Image file too large: {size:.1f}MB (max 10MB)"
if audio_path:
size = os.path.getsize(audio_path) / (1024 * 1024) # MB
if size > 50:
return False, f"Audio file too large: {size:.1f}MB (max 50MB)"
return True, "Validation passed"
# 使用例
is_valid, message = validate_multimodal_input(
"Describe this image",
image_path="./test.jpg"
)
if is_valid:
print("✓ Input validation passed")
# 処理続行
else:
print(f"✗ Validation failed: {message}")
4. 分散脳シミュレーション
4.1 基本的なシミュレーション実行
# テキストベースのクエリ
query = "人間の脳はどのように学習するのか説明してください。"
try:
# プロンプト送信
response = client.submit_prompt(prompt=query)
print(f"✓ Query submitted: {query}")
# シミュレーション状態監視
import time
for i in range(10): # 最大10回チェック
status = client.get_simulation_status()
print(f"Status check {i+1}: {status}")
if status.get('completed', False):
break
time.sleep(2)
# 結果取得
result = client.get_simulation_result()
if result and result.get('response'):
print("✓ Simulation result:")
print(result['response'])
else:
print("✗ No result available")
except Exception as e:
print(f"✗ Simulation error: {e}")
4.2 シミュレーション状態の詳細監視
def monitor_simulation():
"""シミュレーションの詳細な状態監視"""
response = client.submit_prompt(prompt="複雑な推論タスクを実行してください")
print("Monitoring simulation progress...")
while True:
status = client.get_simulation_status()
# 状態表示
active_nodes = status.get('active_nodes', 0)
total_nodes = status.get('total_nodes', 0)
completed_tasks = status.get('completed_tasks', 0)
total_tasks = status.get('total_tasks', 0)
print(f"Active nodes: {active_nodes}/{total_nodes}")
print(f"Completed tasks: {completed_tasks}/{total_tasks}")
# ノード別の詳細
nodes = status.get('nodes', [])
for node in nodes:
node_id = node.get('id')
node_status = node.get('status')
node_load = node.get('load', 0)
print(f" Node {node_id}: {node_status} (load: {node_load}%)")
if status.get('completed', False):
print("✓ Simulation completed")
break
if status.get('failed', False):
print("✗ Simulation failed")
break
time.sleep(5)
# 最終結果取得
result = client.get_simulation_result()
return result
# 実行
result = monitor_simulation()
if result:
print("Final result:", result.get('response'))
4.3 リモートログ取得
# リモートノードからのログ取得
remote_config = {
'user': 'ubuntu',
'ip': '192.168.1.100',
'key_path': '~/.ssh/id_rsa',
'log_file_path': '/var/log/evospikenet/simulation.log'
}
try:
logs = client.get_remote_log(**remote_config)
print("✓ Remote logs retrieved:")
print(logs.get('content', 'No content'))
except Exception as e:
print(f"✗ Failed to retrieve remote logs: {e}")
5. アーティファクト管理
5.1 ログセッションの作成
# 新しいログセッション作成
session = client.create_log_session(
description="Tutorial session for artifact management"
)
session_id = session.get('session_id')
print(f"✓ Created session: {session_id}")
5.2 モデルのアップロード
import io
# モデルファイルのアップロード
model_path = "./trained_model.pkl"
with open(model_path, 'rb') as f:
model_data = io.BytesIO(f.read())
result = client.upload_artifact(
session_id=session_id,
artifact_type="model",
name="tutorial_model_v1",
file=model_data,
llm_type="SpikingEvoVisionEncoder",
model_category="image_classification",
model_variant="standard"
)
print(f"✓ Model uploaded: {result}")
5.3 アーティファクトの管理
# アーティファクト一覧取得
artifacts = client.list_artifacts()
print(f"Total artifacts: {len(artifacts) if isinstance(artifacts, list) else 'N/A'}")
# モデルタイプのみフィルタリング
models = client.list_artifacts(artifact_type="model")
print(f"Model artifacts: {len(models) if isinstance(models, list) else 'N/A'}")
# 特定のアーティファクトダウンロード
if isinstance(artifacts, list) and artifacts:
artifact_id = artifacts[0].get('id')
client.download_artifact(artifact_id, "./downloaded_model.pkl")
print(f"✓ Downloaded artifact: {artifact_id}")
5.4 設定ファイルの管理
# 設定ファイルのアップロード
config_data = """
model:
type: SpikingEvoVisionEncoder
layers: 5
neurons_per_layer: 100
training:
epochs: 100
batch_size: 32
learning_rate: 0.001
"""
config_file = io.BytesIO(config_data.encode('utf-8'))
result = client.upload_artifact(
session_id=session_id,
artifact_type="config",
name="tutorial_config",
file=config_file
)
print(f"✓ Config uploaded: {result}")
6. エラーハンドリング
6.1 基本的なエラーハンドリング
<!-- TODO: update or remove - import faileNetAPIError -->
def safe_generate(prompt, max_retries=3):
"""安全なテキスト生成関数"""
for attempt in range(max_retries):
try:
result = client.generate(prompt, max_length=100)
return result
except EvoSpikeNetAPIError as e:
print(f"Attempt {attempt + 1} failed: {e.error_info.message}")
if e.error_info.retry_after:
print(f"Retrying after {e.error_info.retry_after} seconds...")
time.sleep(e.error_info.retry_after)
else:
break
except Exception as e:
print(f"Unexpected error: {e}")
break
return None
# 使用例
result = safe_generate("Test prompt")
if result:
print("✓ Generation successful:", result['generated_text'])
else:
print("✗ All attempts failed")
6.2 包括的なエラーハンドリング
def robust_simulation_workflow(prompt):
"""堅牢なシミュレーションワークフロー"""
try:
# 1. プロンプトバリデーション
if not client.validate_prompt(prompt):
raise ValueError("Invalid prompt")
# 2. プロンプト送信(リトライ付き)
response = client.with_error_handling(
client.submit_prompt,
prompt=prompt,
retries=3
)
if not response:
raise RuntimeError("Failed to submit prompt")
# 3. 結果ポーリング(タイムアウト付き)
result = client.poll_for_result(timeout=300, interval=10)
if not result:
raise TimeoutError("Simulation timed out")
return result
except EvoSpikeNetAPIError as e:
print(f"API Error: {e.error_info.error_type}")
print(f"Message: {e.error_info.message}")
if e.error_info.details:
print(f"Details: {e.error_info.details}")
return None
except Exception as e:
print(f"Unexpected error: {type(e).__name__}: {e}")
return None
# 使用例
result = robust_simulation_workflow("複雑な分析タスク")
if result:
print("✓ Workflow completed successfully")
print("Result:", result.get('response'))
else:
print("✗ Workflow failed")
6.3 カスタムエラーハンドラー
class SimulationErrorHandler:
"""シミュレーションエラーのカスタムハンドラー"""
def __init__(self, client):
self.client = client
self.error_counts = {}
def handle_error(self, error, context=""):
"""エラーハンドリングとログ記録"""
error_type = type(error).__name__
# エラーカウント
self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
# ログ記録
print(f"[{context}] Error: {error_type}")
print(f"Message: {str(error)}")
if isinstance(error, EvoSpikeNetAPIError):
print(f"API Error Type: {error.error_info.error_type}")
print(f"Status Code: {error.error_info.status_code}")
# 特定のエラーに対する処理
if error.error_info.error_type == "TimeoutError":
print("→ Consider increasing timeout or checking server load")
elif error.error_info.error_type == "ConnectionError":
print("→ Check network connectivity and server status")
# リトライ判断
should_retry = self.should_retry(error)
if should_retry:
print("→ Retrying operation...")
else:
print("→ Not retrying this type of error")
return should_retry
def should_retry(self, error):
"""リトライが必要かどうかの判断"""
if isinstance(error, EvoSpikeNetAPIError):
# サーバーエラーやタイムアウトはリトライ
if error.error_info.status_code in [500, 502, 503, 504]:
return True
if error.error_info.error_type in ["TimeoutError", "ConnectionError"]:
return True
return False
def get_error_summary(self):
"""エラー統計の取得"""
return {
"total_errors": sum(self.error_counts.values()),
"error_types": self.error_counts.copy()
}
# 使用例
handler = SimulationErrorHandler(client)
try:
result = client.generate("Test prompt")
except Exception as e:
should_retry = handler.handle_error(e, "text_generation")
if should_retry:
# リトライ処理
pass
print("Error summary:", handler.get_error_summary())
7. バッチ処理
7.1 複数プロンプトのバッチ生成
# バッチテキスト生成
prompts = [
"Pythonのリスト内包表記について説明してください。",
"機械学習における過学習を防ぐ方法を教えてください。",
"ニューラルネットワークの活性化関数について説明してください。",
"データサイエンスのワークフローについて説明してください。",
"クラウドコンピューティングの利点を挙げてください。",
]
print(f"Processing {len(prompts)} prompts...")
# バッチ処理
results = client.batch_generate(prompts, max_length=150)
# 結果表示
for i, (prompt, result) in enumerate(zip(prompts, results), 1):
print(f"\n--- Prompt {i} ---")
print(f"Input: {prompt}")
if 'generated_text' in result:
print(f"Output: {result['generated_text'][:200]}...")
elif 'error' in result:
print(f"Error: {result['error']}")
else:
print("Unexpected result format")
7.2 並列バッチ処理
import concurrent.futures
import threading
def generate_with_thread_safety(prompt, client, results, index):
"""スレッドセーフな生成関数"""
try:
result = client.generate(prompt, max_length=100)
results[index] = result
print(f"✓ Completed prompt {index + 1}")
except Exception as e:
results[index] = {"error": str(e), "prompt": prompt}
print(f"✗ Failed prompt {index + 1}: {e}")
def parallel_batch_generate(prompts, max_workers=3):
"""並列バッチ生成"""
results = [None] * len(prompts)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i, prompt in enumerate(prompts):
future = executor.submit(generate_with_thread_safety, prompt, client, results, i)
futures.append(future)
# 完了待機
concurrent.futures.wait(futures)
return results
# 使用例
prompts = [
"量子コンピューティングの基本原理を説明してください。",
"ブロックチェーン技術の仕組みについて教えてください。",
"5Gネットワークの特徴を説明してください。",
]
print("Starting parallel batch generation...")
results = parallel_batch_generate(prompts, max_workers=2)
for i, result in enumerate(results):
print(f"\nPrompt {i+1}:")
if 'generated_text' in result:
print(result['generated_text'][:150] + "...")
else:
print(f"Error: {result.get('error', 'Unknown error')}")
7.3 プログレスバー付きバッチ処理
def batch_generate_with_progress(prompts, batch_size=5):
"""プログレスバー付きバッチ処理"""
results = []
try:
from tqdm import tqdm
use_tqdm = True
except ImportError:
use_tqdm = False
print("tqdm not available, using simple progress")
if use_tqdm:
pbar = tqdm(total=len(prompts), desc="Generating")
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i + batch_size]
# バッチ処理
batch_results = client.batch_generate(batch, max_length=120)
for result in batch_results:
results.append(result)
if use_tqdm:
pbar.update(1)
else:
print(f"Processed {len(results)}/{len(prompts)} prompts")
if use_tqdm:
pbar.close()
return results
# 使用例
prompts = [f"トピック {i} についての説明を書いてください。" for i in range(1, 21)]
results = batch_generate_with_progress(prompts, batch_size=3)
successful = sum(1 for r in results if 'generated_text' in r)
print(f"\n✓ Successfully generated {successful}/{len(prompts)} responses")
8. 監視と統計
8.1 クライアント統計の監視
# 基本統計の取得
stats = client.get_stats()
print("=== Client Statistics ===")
print(f"Total requests: {stats['requests']}")
print(f"Total errors: {stats['errors']}")
print(f"Total retries: {stats['retries']}")
print(f"Average latency: {stats['average_latency']:.3f}s")
print(f"Error rate: {stats['error_rate']:.1%}")
print(f"Retry rate: {stats['retry_rate']:.1%}")
# 統計のリセット
client.reset_stats()
print("✓ Statistics reset")
8.2 パフォーマンス監視
import time
def benchmark_generation(prompts, num_runs=5):
"""生成パフォーマンスのベンチマーク"""
results = []
for run in range(num_runs):
print(f"\n--- Benchmark Run {run + 1}/{num_runs} ---")
start_time = time.time()
batch_results = client.batch_generate(prompts, max_length=100)
end_time = time.time()
run_time = end_time - start_time
successful = sum(1 for r in batch_results if 'generated_text' in r)
results.append({
'run_time': run_time,
'successful': successful,
'total': len(prompts),
'avg_time_per_prompt': run_time / len(prompts)
})
print(f"Time: {run_time:.2f}s")
print(f"Success rate: {successful}/{len(prompts)}")
# 統計計算
avg_time = sum(r['run_time'] for r in results) / len(results)
avg_success_rate = sum(r['successful'] for r in results) / sum(r['total'] for r in results)
avg_time_per_prompt = sum(r['avg_time_per_prompt'] for r in results) / len(results)
print("
=== Benchmark Summary ===")
print(f"Average time: {avg_time:.2f}s")
print(f"Average success rate: {avg_success_rate:.1%}")
print(f"Average time per prompt: {avg_time_per_prompt:.3f}s")
return results
# 使用例
test_prompts = [
"Hello, world!",
"What is AI?",
"Explain machine learning."
]
benchmark_results = benchmark_generation(test_prompts, num_runs=3)
8.3 システムリソース監視
# システムリソース使用状況の取得
try:
resources = client.get_resource_usage()
print("=== System Resources ===")
print(f"CPU usage: {resources.get('cpu_percent', 'N/A')}%")
print(f"Memory usage: {resources.get('memory_percent', 'N/A')}%")
print(f"Disk usage: {resources.get('disk_percent', 'N/A')}%")
# ノード別のリソース
nodes = resources.get('nodes', [])
for node in nodes:
print(f"Node {node.get('id')}: CPU {node.get('cpu')}%, Memory {node.get('memory')}%")
except Exception as e:
print(f"Resource monitoring not available: {e}")
8.4 遅延監視
# 遅延統計の取得
try:
latency_stats = client.get_latency_stats()
print("=== Latency Statistics ===")
for component, stats in latency_stats.items():
print(f"{component}:")
print(f" Average: {stats.get('avg', 'N/A')}ms")
print(f" Min: {stats.get('min', 'N/A')}ms")
print(f" Max: {stats.get('max', 'N/A')}ms")
print(f" P95: {stats.get('p95', 'N/A')}ms")
# 遅延ターゲットのチェック
target_check = client.check_latency_target()
print(f"\nLatency targets met: {target_check.get('met', 'Unknown')}")
except Exception as e:
print(f"Latency monitoring not available: {e}")
9. 分散Coordinator
9.1 分散Coordinatorの初期化
# 分散Coordinatorの初期化
client.init_coordinator(
node_id="tutorial_node_1",
zenoh_config={
"connect": ["tcp/127.0.0.1:7447"]
},
raft_config={
"election_timeout": [5000, 10000]
}
)
print("✓ Distributed coordinator initialized")
9.2 Coordinatorの開始と停止
# Coordinatorの開始
client.start_coordinator()
print("✓ Coordinator started")
# クラスタ状態の確認
cluster_status = client.get_cluster_status()
print(f"Leader: {cluster_status.get('leader_id', 'None')}")
print(f"Active nodes: {len(cluster_status.get('nodes', {}))}")
# Coordinatorの停止
client.stop_coordinator()
print("✓ Coordinator stopped")
9.3 協調タスクの送信
# Coordinatorの開始
client.init_coordinator("tutorial_node_1")
client.start_coordinator()
# 協調タスクの送信
task_id = client.submit_coordination_task(
task_type="federated_learning",
payload={
"model": "resnet50",
"dataset": "cifar10",
"rounds": 10,
"learning_rate": 0.01
}
)
print(f"✓ Coordination task submitted: {task_id}")
# タスク状態の監視
import time
for _ in range(30): # 30秒間監視
status = client.get_coordination_task_status(task_id)
if status:
print(f"Task status: {status['status']}")
if status['status'] in ['completed', 'failed']:
break
time.sleep(1)
print("Task monitoring completed")
内部タスク実行ロジック(SDK組み込みの簡易実装)
federated_learning:payload['updates'](数値ディクショナリのリスト)を平均してaggregated_parametersを付与。distributed_inference:payload['inputs']/payload['batches']をそのまま結果化し、各エントリにnode_idとstatus=completedを付けて返却。model_aggregation:payload['models']に含まれるweightsリストを平均し、aggregated_model['weights']を生成。- ノード発見とクリーンアップ: Zenoh経由の
/nodes/list応答を取り込みつつ、一定時間ハートビートが無いノードを自動削除。
9.4 ノード管理
# 新しいノードの登録
node_info = {
"address": "192.168.1.100",
"port": 8001,
"capabilities": ["gpu", "cpu"],
"resources": {
"cpu_cores": 8,
"memory_gb": 16,
"gpu_count": 1
}
}
success = client.register_coordination_node("worker_node_1", node_info)
print(f"✓ Node registration: {'successful' if success else 'failed'}")
# 登録ノードの確認
cluster_status = client.get_cluster_status()
print("Registered nodes:")
for node_id, node_data in cluster_status.get('nodes', {}).items():
print(f" {node_id}: {node_data.get('capabilities', [])}")
# ノードの解除
success = client.unregister_coordination_node("worker_node_1")
print(f"✓ Node unregistration: {'successful' if success else 'failed'}")
9.5 高度な協調シナリオ
# 複数タスクの協調実行
tasks = [
{
"type": "model_training",
"payload": {"model": "bert", "dataset": "squad"}
},
{
"type": "data_processing",
"payload": {"operation": "preprocessing", "data_size": "large"}
},
{
"type": "inference",
"payload": {"model": "gpt2", "batch_size": 32}
}
]
task_ids = []
for task in tasks:
task_id = client.submit_coordination_task(task["type"], task["payload"])
task_ids.append(task_id)
print(f"✓ Submitted {task['type']} task: {task_id}")
# 全タスクの状態監視
completed_tasks = 0
while completed_tasks < len(task_ids):
for task_id in task_ids:
status = client.get_coordination_task_status(task_id)
if status and status['status'] == 'completed':
if task_id not in [t['id'] for t in completed_tasks]:
print(f"✓ Task {task_id} completed")
completed_tasks += 1
time.sleep(2)
print("All coordination tasks completed")
10. 高度な機能
9.1 スナップショット管理
# システムスナップショットの作成
try:
snapshot = client.create_snapshot(
snapshot_name="tutorial_backup",
include_models=True,
include_data=True,
compression_level=6
)
print(f"✓ Snapshot created: {snapshot}")
# スナップショット一覧
snapshots = client.list_snapshots()
print(f"Available snapshots: {len(snapshots)}")
# スナップショットの検証
if snapshots:
snapshot_path = snapshots[0].get('path')
validation = client.validate_snapshot(snapshot_path)
print(f"Snapshot validation: {validation}")
except Exception as e:
print(f"Snapshot operations not available: {e}")
9.2 スケーラビリティテスト
# スケーラビリティテストの実行
try:
scalability_test = client.run_scalability_test(
max_nodes=20,
test_duration=30
)
print(f"✓ Scalability test started: {scalability_test}")
# テスト結果取得
results = client.get_scalability_results()
print(f"Scalability results: {results}")
# ノードスケーラビリティテスト
node_test = client.test_node_scalability(
node_counts=[5, 10, 15, 20],
test_duration=60
)
print(f"Node scalability test: {node_test}")
except Exception as e:
print(f"Scalability testing not available: {e}")
9.3 Zenoh通信
# Zenoh接続
try:
zenoh_connection = client.connect_zenoh(node_id="tutorial_client")
print(f"✓ Connected to Zenoh: {zenoh_connection}")
# メッセージ送信
message = client.publish_zenoh_message(
topic="tutorial/test",
payload={"message": "Hello from SDK tutorial", "timestamp": time.time()},
priority="normal",
message_type="notification"
)
print(f"✓ Message published: {message}")
# リクエスト送信
request = client.send_zenoh_request(
target_node="brain_node_1",
request={"action": "get_status"},
timeout=5.0
)
print(f"✓ Request sent: {request}")
# 統計取得
zenoh_stats = client.get_zenoh_stats()
print(f"Zenoh stats: {zenoh_stats}")
except Exception as e:
print(f"Zenoh operations not available: {e}")
9.4 AEG-Comm通信最適化 ⭐ NEW (2026-01-23)
AEG-Commは分散脳シミュレーションにおける通信をインテリジェントに最適化する機能です。
# AEG-Comm設定
try:
# 通信最適化設定
config_result = client.set_aeg_comm_config(
node_id="brain_node_1",
enable_comm=True,
energy_threshold=10.0,
critical_modalities=["force", "safety", "text"],
force_change_threshold=10.0
)
print(f"✓ AEG-Comm configured: {config_result}")
# 通信統計取得
comm_stats = client.get_communication_stats(node_id="brain_node_1")
print(f"通信削減率: {comm_stats.get('reduction_rate', 0)}%")
print(f"送信パケット数: {comm_stats.get('sent_packets', 0)}")
print(f"ブロックされたパケット数: {comm_stats.get('blocked_packets', 0)}")
# AEG-Commステータス確認
status = client.get_aeg_comm_status(node_id="brain_node_1")
print(f"AEG-Comm status: {status}")
except Exception as e:
print(f"AEG-Comm operations not available: {e}")
9.5 コンセンサス操作
# コンセンサス決定の提案
try:
proposal = client.propose_consensus_decision(
decision_type="resource_allocation",
payload={"resource": "gpu", "amount": 50},
priority=2,
dependencies=["previous_decision_123"]
)
print(f"✓ Consensus proposal submitted: {proposal}")
# 決定結果取得
if 'proposal_id' in proposal:
result = client.get_consensus_result(proposal['proposal_id'], timeout=30)
print(f"✓ Consensus result: {result}")
# ノード状態更新
node_update = client.update_node_status(node_id="node_1", active=True)
print(f"✓ Node status updated: {node_update}")
# コンセンサス統計
stats = client.get_consensus_stats()
print(f"Consensus stats: {stats}")
except Exception as e:
print(f"Consensus operations not available: {e}")
10. Jupyter Notebook統合
10.1 Jupyter環境での初期化
# Jupyter NotebookでのSDK使用
<!-- モジュール 'evospikenet' が見つかりません。パッケージ内の移動/名前変更を確認してください -->
<!-
jupyter_client = JupyterAPIClient()
# 表示モード設定
jupyter_client.set_display_mode("html") # "html", "json", "text"
# サーバー情報表示
jupyter_client.show_server_info()
# 統計表示
jupyter_client.show_stats()
10.2 Jupyterマジックコマンド
Jupyter Notebookで以下のマジックコマンドを使用:
# セルマジックを使用したテキスト生成
%%evospikenet_generate 100
人工知能の最新トレンドについて説明してください。
# ラインマジックを使用した接続
%evospikenet_connect http://localhost:8000
# 統計表示
%evospikenet_stats
# サーバー情報表示
%evospikenet_info
10.3 インタラクティブなプロンプト検証
# インタラクティブなプロンプト検証
test_prompt = "これはテストプロンプトです。"
is_valid = jupyter_client.validate_prompt_interactive(test_prompt)
print(f"Prompt valid: {is_valid}")
# 有効な場合のみ処理続行
if is_valid:
result = jupyter_client.generate(test_prompt, show_output=True)
まとめ
このチュートリアルでは、EvoSpikeNet SDKの基本的な使用方法から高度な機能までを学習しました。主要なポイント:
- 基本セットアップ: クライアントの初期化とサーバー接続
- テキスト生成: 単純およびバッチ生成
- マルチモーダル処理: 画像・音声付きプロンプト
- 分散脳シミュレーション: 状態監視と結果取得
- アーティファクト管理: アップロード・ダウンロード・管理
- エラーハンドリング: 堅牢なエラー処理とリトライ
- バッチ処理: 効率的な複数処理
- 監視: パフォーマンスと統計の監視
- 分散Coordinator: Zenoh DDS + Raftコンセンサスによる分散協調
- 高度な機能: スナップショット、Zenoh、AEG-Comm、コンセンサス
- Jupyter統合: ノートブック環境での使用
これらの機能を組み合わせることで、EvoSpikeNetの機能を最大限に活用できます。実際のアプリケーション開発では、適切なエラーハンドリングと監視を実装してください。
次のステップ:
- APIリファレンスで詳細な仕様を確認
- サンプルコードで実践的な例を参照
- 設定ガイドで高度な設定を学習