EvoSpikeNet SDK API Reference
Copyright: 2026 Moonlight Technologies Inc.
Author: Masahiro Aoki
Last updated: February 20, 2026 🎯 Feature 13 + Spatial Generative Neural Integration + Feature 36/39/40 (Automatic Recovery/Audit Log/Geographically Distributed Node Management)
Overview
This document provides a complete API reference for the EvoSpikeNet Python SDK. The SDK is implemented as a REST API client and supports features such as text generation, distributed brain simulation, spatial cognitive processing (Rank 12-15 Feature 13), artifact management, and RAG/vector search (versioned index).
Class structure
EvoSpikeNetAPIClient
This is the main API client class. Provides HTTP request management, error handling, and connection pooling.
Constructor
EvoSpikeNetAPIClient(
base_url: str = "http://localhost:8000",
spatial_base_url: Optional[str] = None,
rag_base_url: Optional[str] = None,
api_key: Optional[str] = None,
timeout: int = 60,
max_retries: int = 3,
session: Optional[requests.Session] = None,
)
Parameters:
- base_url: API server base URL
- spatial_base_url: Base URL of spatial generation service (FastAPI version). If not specified, base_url is used.
- rag_base_url: Base URL of RAG service. If not specified, base_url is used.
- api_key: API authentication key
- timeout: Request timeout (seconds)
- max_retries: Maximum number of retries
- session: Custom requests session
Basic methods
generate(prompt: str, max_length: int = 50) -> Dict[str, str]
Perform text generation.
Parameters:
- prompt: input prompt
- max_length: Maximum number of characters to generate
Return value: Dictionary containing the generated results
Example:
```python
result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])
```
### LocalMemoryClient (offline helper)
A lightweight wrapper to try out the long-term memory module and spike compression layer locally without using the HTTP API.
```python
from evospikenet.sdk import LocalMemoryClient
import torch
client = LocalMemoryClient(state_dim=16, time_steps=6, embedding_dim=16, max_traces=32)
spikes = torch.rand(6, 16)
summary = client.store_spike_episode(
spike_sequence=spikes,
context={"scenario": "demo"},
action="look",
outcome="noted",
reward=0.2,
semantic_tags=["demo_concept"],
)
stats = client.reservoir_stats()
result = client.retrieve({"scenario": "demo", "semantic_tags": ["demo_concept"]}, top_k=1)
```
Related implementations: `evospikenet/snn_memory_extension.py`, `evospikenet/long_term_memory.py`, `evospikenet/forgetting_controller.py`.
submit_prompt(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict
Send multimodal prompts to distributed brain simulations.
Operation memo (2026-04-20):
- The ASR execution path in a distributed environment depends on the server environment variable VIDEO_ANALYSIS_ASR_BACKEND (asr_fallback / whisper_real).
- Whisper model/device is controlled by VIDEO_ANALYSIS_WHISPER_MODEL / VIDEO_ANALYSIS_WHISPER_DEVICE.
- This change is an extension of the server configuration and does not change the signature of the SDK method.
Parameters:
- prompt: text prompt
- image_path: Image file path
- audio_path: Audio file path
Return value: Dictionary containing send confirmation
get_simulation_status() -> Dict
Get the current state of the distributed brain simulation.
Return value: Dictionary containing the states of all nodes
get_simulation_result() -> Dict
Get the latest results for completed simulations.
Return value: Dictionary containing the response, or None if no result is available
poll_for_result(timeout: int = 120, interval: int = 5) -> Optional[Dict[str, Any]]
Poll until results are available.
Parameters:
- timeout: Maximum waiting time (seconds)
- interval: Polling interval (seconds)
Return value: Result dictionary, or None on timeout
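The submit-then-poll pattern behind `poll_for_result` can be sketched as a generic loop; `fake_fetch` below is an illustrative stand-in for `get_simulation_result`, which returns None until a result is ready:

```python
import time
from typing import Any, Callable, Dict, Optional

def poll_for_result(fetch: Callable[[], Optional[Dict[str, Any]]],
                    timeout: float = 120, interval: float = 5) -> Optional[Dict[str, Any]]:
    """Poll `fetch` until it returns a result or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = fetch()
        if result is not None:
            return result
        time.sleep(interval)
    return None  # timed out

# Stub: the "server" finishes after three polls.
calls = {"n": 0}
def fake_fetch():
    calls["n"] += 1
    return {"response": "done"} if calls["n"] >= 3 else None

print(poll_for_result(fake_fetch, timeout=1, interval=0.01))  # {'response': 'done'}
```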
Evolution/Genome Management Methods
list_genomes() -> List[Dict[str, Any]]
List saved genomes and get metadata such as name / generation / fitness.
get_genome(genome_name: str) -> Dict[str, Any]
Gets the payload of the specified genome file.
save_genome(genome_name: str, genome: Dict[str, Any], make_active: bool = False) -> Dict[str, Any]
Save your genome. Promote it to active_genome.json with make_active=True.
apply_genome(genome_name: str) -> Dict[str, Any]
Activates an existing genome (no payload editing).
Usage example:
```python
from evospikenet.sdk import EvoSpikeNetAPIClient

# Example: use EvoSpikeNetAPIClient to manage genomes
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
genomes = client.list_genomes()
if genomes:
    target = genomes[0]["name"]
    payload = client.get_genome(target)
    payload.setdefault("metadata", {})["note"] = "edited via SDK"
    client.save_genome(target, payload, make_active=True)
    client.apply_genome(target)
else:
    print("No genomes found on the server")
```
Related sample: `examples/genome_management_sdk.py`
#### Artifact management methods
##### upload_artifact(session_id: str, artifact_type: str, name: str, file: io.BytesIO, **metadata) -> Dict
Upload the artifact file.
**Parameters:**
- `session_id`: Session ID
- `artifact_type`: Artifact type ('model', 'log', 'config', etc.)
- `name`: Artifact name
- `file`: file buffer to upload
- `**metadata`: Additional metadata (llm_type, node_type, model_category, model_variant)
**Return value:** Dictionary containing the upload results
##### list_artifacts(artifact_type: Optional[str] = None) -> Union[List[Dict[str, Any]], Dict[str, Any]]
List artifacts.
**Parameters:**
- `artifact_type`: Artifact type to filter
**Returns:** List or dictionary of artifacts
##### download_artifact(artifact_id: str, destination_path: str) -> None
Download the artifact.
**Parameters:**
- `artifact_id`: ID of the artifact to download
- `destination_path`: Destination file path
#### Log management methods
##### create_log_session(description: str) -> Dict
Create a new log session.
**Parameters:**
- `description`: Session description
**Return value:** Session information
##### get_remote_log(user: str, ip: str, key_path: str, log_file_path: str) -> Dict
Get logs from remote nodes.
**Parameters:**
- `user`: SSH username
- `ip`: IP address of remote host
- `key_path`: SSH private key path
- `log_file_path`: Absolute path of remote log file
**Return value:** Dictionary containing log contents
#### Batch processing methods
##### batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict[str, str]]
Process multiple prompts sequentially.
**Parameters:**
- `prompts`: list of prompts to handle
- `max_length`: maximum number of characters for each generation
**Returns:** Results list for each prompt
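Since `batch_generate` processes prompts one at a time, its sequential loop can be sketched locally; the `generate` stub below stands in for the HTTP call and simply echoes a truncated prompt:

```python
from typing import Dict, List

def generate(prompt: str, max_length: int = 50) -> Dict[str, str]:
    # Stub standing in for the HTTP call; echoes a truncated prompt.
    return {"generated_text": prompt[:max_length]}

def batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict[str, str]]:
    """Sequentially generate for each prompt, preserving input order."""
    return [generate(p, max_length=max_length) for p in prompts]

results = batch_generate(["Hello", "World"], max_length=3)
print([r["generated_text"] for r in results])  # ['Hel', 'Wor']
```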
#### Helper methods
##### health_check() -> Dict[str, Any]
Performs a health check on the API server.
**Returns:** Dictionary containing health states
## Multi-language binding
The Python SDK ships lightweight proxy implementations for Go and TypeScript. The full Go/JS SDKs were originally planned as separate repositories; these proxies mimic their interfaces so they can be exercised with a simple `import` in a Python environment.
### Go SDK Proxy
```python
from evospikenet import sdk_go
client = sdk_go.init_client("http://localhost:8000", api_key="KEY")
path = client.download_file("foo.bin", dest="/tmp/foo.bin")
```

| Method | Description |
|---|---|
| `init_client(endpoint: str, api_key: Optional[str]=None)` | Returns a Go client object |
| `GoSDKClient.download_file(filename, dest=None, timeout=60)` | Gets a file from the EvoSpikeNet API |
### TypeScript SDK Proxy
```python
from evospikenet import sdk_ts
client = sdk_ts.initClient("https://api", apiKey=None)
client.downloadFile("foo", dest="out.bin")
```

| Method | Description |
|---|---|
| `initClient(endpoint: str, apiKey: Optional[str]=None)` | Returns a TS/JS client object |
| `TSSDKClient.downloadFile(filename, dest=None, timeout=60)` | Same as above (camelCase naming) |
These clients internally use requests for HTTP communication and are available for use in Python tests and documentation examples.
node_discovery_health() -> Dict[str, Any]
Get the current node list and summary from the node discovery service. Internally it calls the /api/node-discovery/health endpoint.
Return value: Dictionary containing node states (nodes, summary, updated_at, etc.)
node_discovery_topology() -> Dict[str, Any]
Obtain network topology information from the node discovery service. Internally it calls the /api/node-discovery/topology endpoint.
Returns: Topology dictionary containing nodes and edges
spatial_generate(input_text: str, cognitive_context: Optional[Dict[str, Any]] = None, eeg_data: Optional[Dict[str, Any]] = None, timeout: int = 30) -> Dict[str, Any]
Call the spatial generation service /generate to get the scene specifications (scene_type, objects, spatial_layout, physics, lighting, navigation, metadata, etc.).
spatial_health(timeout: int = 10) -> Dict[str, Any]
Call the spatial generation service /health to get the status of neural_components_available, components, metrics, knowledge_base_loaded, etc.
rag_upload_file(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,
) -> Dict[str, Any]
Submit the file to the RAG service /upload_file and receive the doc_key and version.
Additional flags allow control of temporary sessions and background jobs:
- stream: Runs the document parser in streaming mode.
- background: Runs parsing and indexing as a Celery background job. The backend generates a job_id that can be tracked with upload_status; pass job_id as a query parameter to query the same job. Progress per chunk is reflected in upload_jobs, which makes it easy to attach notification and logging hooks.
- init: Starts a multipart upload session; subsequent parts sent with session_id and final build the same version.
- session_id / final: Adds a file part to an existing session; set final=True on the last part to close the session.
Performed automatically on the server side:
- MIME/extension validation
- Document analysis (PDF/Word/Excel/PPT/Markdown, etc.)
- Token-based automatic chunking (chunk_text_auto)
- Embedding generation and indexing into Milvus/Elasticsearch
- Version control (history keyed by doc_key)
Streaming and split uploads are more memory-efficient for large (1GB+) documents; background mode eliminates the need to wait for processing.
Return value example: {"doc_key":"sample.pdf","version":3,"chunks_indexed":42,...}
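A minimal sketch of the multipart flow (init, then parts, then final); the part size and session bookkeeping below are illustrative stubs in place of the HTTP calls rag_init_session / rag_upload_part:

```python
from typing import Dict, List

PART_SIZE = 4  # illustrative; a real client would use megabyte-scale parts

def split_parts(data: bytes, part_size: int = PART_SIZE) -> List[bytes]:
    """Split a payload into fixed-size parts for a multipart session."""
    return [data[i:i + part_size] for i in range(0, len(data), part_size)]

def upload_multipart(data: bytes) -> Dict:
    """Mimic the rag_init_session -> rag_upload_part(..., final=True) sequence."""
    session = {"id": "sess-1", "parts": []}  # stub for the server-side session
    parts = split_parts(data)
    for i, part in enumerate(parts):
        final = (i == len(parts) - 1)  # final=True on the last part closes the session
        session["parts"].append({"size": len(part), "final": final})
    return session

print(upload_multipart(b"0123456789"))
```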
rag_upload_file_async(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,
) -> Dict[str, Any]
Asynchronous counterpart to :func:rag_upload_file offering the same
parameter set and return structure. Useful when integrating with
asyncio-based workflows.
create_batch_job(payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]
Create a new batch ingestion job on the RAG service. The payload may
include arbitrary job parameters such as file location, parsing options,
metadata, etc. The server responds with a job_id and initial status
(queued).
cancel_batch_job(job_id: str, timeout: int = 10) -> Dict[str, Any]
Request cancellation of an existing batch job previously created via
:func:create_batch_job. The returned dictionary echoes the job_id
and updated status (cancelled).
rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]
Execute search and generation using RAG service /query. Returns context (hit document) and optionally sdk_response (EvoSDK generated result). The server simultaneously generates a patch for comparing differences between versions, and when combined with /document_chunks, a difference UI can be built on the client side.
Note: Large-document streaming ingestion (split‑upload) and advanced diff viewer support are detailed in docs/RAG_SYSTEM_DETAILED.md §7.6.
get_document_versions(doc_key: str) -> List[Dict[str, Any]]
Returns a list of available versions of the document. Each item includes version, checksum, chunk_count, and indexed_at.
rag_init_session(file_path: str, timeout: int = 30) -> Dict[str, Any]
Start a new upload session. Returns a session identifier along with the determined doc_key and tentative version. Use this identifier in subsequent rag_upload_part calls.
rag_upload_part(session_id: str, file_path: str, final: bool = False, timeout: int = 120) -> Dict[str, Any]
Upload a single chunk belonging to an open session. final=True signals
the last part and causes cleanup of the session state on the server.
upload_status(job_id: str) -> Dict[str, Any]
Check status of an asynchronous upload job (see /upload_file?background=true).
Returns a dictionary with job_id, status (queued, running, completed,
failed), and optional progress percentage.
get_document_chunks(doc_key: str, version: int) -> List[Dict[str, Any]]
Returns the chunks (ordered) associated with the version of the specified doc_key. Each chunk has a chunk_id, content, and source_filename.
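Because chunks are returned in order with their content, a client-side diff view between two versions can be built with the standard library; the chunk payloads below are illustrative:

```python
import difflib
from typing import Dict, List

def diff_versions(old_chunks: List[Dict], new_chunks: List[Dict]) -> List[str]:
    """Unified diff of the ordered chunk contents of two document versions."""
    old = [c["content"] for c in old_chunks]
    new = [c["content"] for c in new_chunks]
    return list(difflib.unified_diff(old, new, "v1", "v2", lineterm=""))

v1 = [{"chunk_id": 0, "content": "alpha"}, {"chunk_id": 1, "content": "beta"}]
v2 = [{"chunk_id": 0, "content": "alpha"}, {"chunk_id": 1, "content": "gamma"}]
for line in diff_versions(v1, v2):
    print(line)
```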
get_server_info() -> Optional[Dict]
Get server information and loaded models.
Return value: Server information, or None if unavailable
wait_for_server(timeout: int = 60, interval: int = 2) -> bool
Wait until the API server is healthy.
Parameters:
- timeout: Maximum waiting time (seconds)
- interval: Health check interval (seconds)
Return value: True if the server is now healthy
validate_prompt(prompt: str) -> bool
Verify that the prompt is suitable for sending.
Parameters:
- prompt: Prompt to validate
Returns: True if the prompt is valid
with_error_handling(func: Callable[..., Any], *args, retries: int = 3, **kwargs) -> Any
Executes a function with automatic retry.
Parameters:
- func: Function to execute
- retries: Number of retries
- *args: positional arguments to pass to the function
- **kwargs: Keyword arguments to pass to the function
Return value: Return value of the function, or None on failure
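The documented semantics (retry on failure, None after the final attempt) can be sketched as follows; the `delay` parameter is an illustrative addition, not part of the SDK signature:

```python
import time
from typing import Any, Callable, Optional

def with_error_handling(func: Callable[..., Any], *args,
                        retries: int = 3, delay: float = 0.0, **kwargs) -> Optional[Any]:
    """Call func(*args, **kwargs); retry on exception, return None after final failure."""
    for _attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except Exception:
            if delay:
                time.sleep(delay)
    return None

attempts = {"n": 0}
def flaky(x):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient")
    return x * 2

print(with_error_handling(flaky, 21, retries=3))  # 42
```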
Statistics/monitoring methods
get_stats() -> Dict[str, Any]
Get client usage statistics.
Return value: Statistics (number of requests, number of errors, average delay, etc.)
reset_stats() -> None
Reset statistics counters.
Delay monitoring method
get_latency_stats() -> Dict
Get delay statistics for all components.
Return value: Latency statistics data
check_latency_target() -> Dict
Check if delay target is met.
Return value: Check result details
Snapshot management methods
create_snapshot(snapshot_name: str, include_models: bool = True, include_data: bool = True, compression_level: int = 6) -> Dict
Create a snapshot of your system.
Parameters:
- snapshot_name: Snapshot name
- include_models: Include models?
- include_data: Include data?
- compression_level: Compression level (0-9)
Return value: Creation result
list_snapshots() -> Dict
List available snapshots.
Return value: Snapshot list
restore_snapshot(snapshot_path: str, restore_models: bool = True, restore_data: bool = True) -> Dict
Restore from snapshot.
Parameters:
- snapshot_path: Path of snapshot to restore
- restore_models: Restore models?
- restore_data: Restore data?
Return value: Restoration result
delete_snapshot(snapshot_path: str) -> Dict
Delete the snapshot.
Parameters:
- snapshot_path: Path of snapshot to delete
Return value: Deletion result
validate_snapshot(snapshot_path: str) -> Dict
Validate the snapshot.
Parameters:
- snapshot_path: Path of snapshot to verify
Return value: Verification result
cleanup_snapshots(max_age_days: int = 30) -> Dict
Clean up old snapshots.
Parameters:
- max_age_days: Maximum number of days to keep
Return value: Cleanup result
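The age cutoff used by `cleanup_snapshots` can be sketched locally; the `name` / `created_at` record fields below are illustrative assumptions, not the server's actual schema:

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional

def select_expired(snapshots: List[Dict], max_age_days: int = 30,
                   now: Optional[datetime] = None) -> List[str]:
    """Return names of snapshots older than max_age_days."""
    now = now or datetime.utcnow()
    cutoff = now - timedelta(days=max_age_days)
    return [s["name"] for s in snapshots
            if datetime.fromisoformat(s["created_at"]) < cutoff]

now = datetime(2026, 2, 20)
snaps = [
    {"name": "old", "created_at": "2026-01-01T00:00:00"},
    {"name": "new", "created_at": "2026-02-15T00:00:00"},
]
print(select_expired(snaps, max_age_days=30, now=now))  # ['old']
```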
Scalability test method
run_scalability_test(max_nodes: int = 50, test_duration: int = 30) -> Dict
Run scalability tests.
Parameters:
- max_nodes: Maximum number of nodes
- test_duration: Test duration (seconds)
Return value: Test result
get_scalability_results() -> Dict
Get scalability test results.
Return value: Test result data
get_scalability_status() -> Dict
Get the scalability test status.
Return value: Current state
test_node_scalability(node_counts: List[int], test_duration: float = 60.0) -> Dict
Test scalability with different number of nodes.
Parameters:
- node_counts: list of number of nodes to test
- test_duration: Duration of each test
Return value: Test result
get_resource_usage() -> Dict
Get resource usage.
Return value: Resource usage statistics
run_stress_test(intensity: str = "high", duration: float = 120.0) -> Dict
Run a stress test.
Parameters:
- intensity: Test intensity ("low", "medium", "high")
- duration: Test duration (seconds)
Return value: Test result
get_system_limits() -> Dict
Get system limits.
Return value: System limit information
Hardware optimization methods
optimize_model(model_type: str, optimizations: Optional[List[str]] = None) -> Dict
Optimize the model for your hardware.
Parameters:
- model_type: Model type
- optimizations: list of optimizations to apply
Return value: Optimization result
benchmark_model(model_type: str, num_runs: int = 50) -> Dict[str, Any]
Benchmark your model.
Parameters:
- model_type: Model type to benchmark
- num_runs: Number of executions
Return value: Benchmark result
get_hardware_info() -> Dict
Get hardware information.
Return value: Hardware information
High availability monitoring methods
get_availability_status() -> Dict
Get availability status.
Return value: Availability state
get_availability_stats(time_window: str = "24h") -> Dict
Get availability statistics.
Parameters:
- time_window: statistical time window (e.g. "24h", "7d")
Return value: Availability statistics
perform_health_check() -> Dict
Run a health check.
Return value: Health check result
trigger_recovery_action(action_type: str, parameters: Optional[Dict[str, Any]] = None) -> Dict
Trigger a recovery action.
Parameters:
- action_type: Action type
- parameters: action parameters
Return value: Action result
get_availability_alerts(limit: int = 50) -> Dict
Get availability alerts.
Parameters:
- limit: upper limit on the number of alerts to get
Return value: Alert list
schedule_maintenance(start_time: str, duration_minutes: int, reason: str) -> Dict
Schedule maintenance.
Parameters:
- start_time: Start time (ISO format)
- duration_minutes: Duration (minutes)
- reason: Maintenance reason
Return value: Schedule result
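`start_time` must be ISO-formatted; a quick way to build a valid value with the standard library (the two-hour offset and reason string are arbitrary examples):

```python
from datetime import datetime, timedelta, timezone

# Schedule a window starting two hours from now, in ISO 8601 format.
start = (datetime.now(timezone.utc) + timedelta(hours=2)).isoformat(timespec="seconds")
payload = {"start_time": start, "duration_minutes": 45, "reason": "rolling upgrade"}
print(payload["start_time"])
```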
Asynchronous Zenoh communication methods
connect_zenoh(node_id: str = "api_node") -> Dict
Connect to Zenoh network.
Parameters:
- node_id: Node identifier
Return value: Connection result
publish_zenoh_message(topic: str, payload: Any, priority: str = "normal", message_type: str = "notification", node_id: str = "api_node") -> Dict
Publish Zenoh messages.
Parameters:
- topic: message topic
- payload: Message payload
- priority: message priority
- message_type: Message type
- node_id: Source node ID
Return value: Publish result
send_zenoh_request(target_node: str, request: Any, timeout: float = 5.0, node_id: str = "api_node") -> Dict
Submit a Zenoh request.
Parameters:
- target_node: Target node ID
- request: request data
- timeout: Timeout (seconds)
- node_id: Source node ID
Return value: Response
send_zenoh_notification(target_nodes: List[str], notification: Any, priority: str = "normal", node_id: str = "api_node") -> Dict
Send Zenoh notifications.
Parameters:
- target_nodes: list of target node IDs
- notification: notification data
- priority: Notification priority
- node_id: Source node ID
Return value: Transmission result
get_zenoh_stats(node_id: str = "api_node") -> Dict
Get Zenoh statistics.
Parameters:
- node_id: Node ID
Return value: Zenoh statistics
set_aeg_comm_config(node_id: str, enable_comm: bool = True, energy_threshold: float = 10.0, critical_modalities: List[str] = None, force_change_threshold: float = 10.0) -> Dict
Configure AEG-Comm communication optimization settings.
Parameters:
- node_id: Target node ID
- enable_comm: Enable/disable communication optimization
- energy_threshold: Energy threshold
- critical_modalities: Important modalities list
- force_change_threshold: Force change threshold
Return value: Setting result
get_communication_stats(node_id: str = "api_node") -> Dict
Get AEG-Comm communication statistics.
Parameters:
- node_id: Node ID
Return value: Communication statistics (sending rate, reduction rate, delay, error rate)
get_aeg_comm_status(node_id: str = "api_node") -> Dict
Get the current status of AEG-Comm.
Parameters:
- node_id: Node ID
Return value: AEG-Comm status (number of active nodes, energy level, number of critical packets)
Distributed consensus method
propose_consensus_decision(decision_type: str, payload: Any, priority: int = 1, dependencies: Optional[List[str]] = None) -> Dict
Propose a consensus decision.
Parameters:
- decision_type: Decision type
- payload: decision payload
- priority: priority
- dependencies: list of dependencies
Return value: Suggestion result
get_consensus_result(proposal_id: str, timeout: float = 30.0) -> Dict
Get consensus results.
Parameters:
- proposal_id: proposal ID
- timeout: Wait timeout (seconds)
Return value: Consensus result
update_node_status(node_id: str, active: bool) -> Dict
Update node state.
Parameters:
- node_id: Node ID
- active: Active state
Return value: Update result
get_consensus_stats() -> Dict
Get consensus statistics.
Return value: Statistical data
cleanup_consensus(max_age: float = 300.0) -> Dict
Clean up old consensus data.
Parameters:
- max_age: Maximum time to retain (seconds)
Return value: Cleanup result
Artifact extension methods
get_session_artifacts(session_id: str) -> Any
Get session artifacts.
Parameters:
- session_id: Session ID
Return value: Artifact data
get_artifact(artifact_id: str, destination_path: Optional[str] = None) -> bytes
Get the artifact.
Parameters:
- artifact_id: Artifact ID
- destination_path: Destination path (optional)
Return value: Artifact data
Async methods
generate_async(prompt: str, max_length: int = 50) -> Dict[str, str]
Asynchronous text generation.
submit_prompt_async(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict
Asynchronous multimodal prompting.
health_check_async() -> Dict[str, Any]
Asynchronous health checks.
Jupyter integration class
JupyterAPIClient
Extended client for Jupyter Notebook environments.
Additional methods
set_display_mode(mode: str) -> None
Set the output display mode.
Parameters:
- mode: Display mode ("html", "json", "text")
show_server_info() -> None
Displays server information in rich format.
show_stats() -> None
Display client statistics in rich format.
validate_prompt_interactive(prompt: str) -> bool
Validate the prompt interactively.
WebSocket Client
WebSocketClient
WebSocket client for real-time communication.
Constructor
WebSocketClient(
ws_url: str = "ws://localhost:8000/ws",
reconnect_attempts: int = 5,
reconnect_delay: float = 2.0,
)
Method
connect() -> None
Establish a WebSocket connection.
disconnect() -> None
Close the WebSocket connection.
send_message(message: Dict[str, Any]) -> None
Send a message.
receive_message() -> Dict[str, Any]
Receive messages.
register_handler(message_type: str, handler: Callable) -> None
Register a message handler.
listen() -> None
Listen for messages.
Error handling
EvoSpikeNetAPIError
API-related exception class.
Attributes
- error_info: ErrorInfo object
- message: Error message
- status_code: HTTP status code (optional)
- details: Additional error details
ErrorInfo
Data class that stores error information.
Attributes
- error_type: Error type
- message: Error message
- details: Additional details
- retry_after: Wait time before retry (seconds)
- status_code: HTTP status code
L5 Self-Evolution/Self-Healing API
Last updated: 2026-03-11 (v2.2.0)
Core classes for L5 self-evolving systems. A reference for using genomes, evolution engines, fitness evaluation, and memory management with "direct Python import."
GenomePool — evospikenet.genome_pool
from evospikenet.genome_pool import GenomePool, SelectionConfig, MutationConfig, CrossoverConfig
from evospikenet.snapshot_recovery import SnapshotManager
Constructor
GenomePool(
population_size: int = 50,
selection_config: Optional[SelectionConfig] = None,
mutation_config: Optional[MutationConfig] = None,
crossover_config: Optional[CrossoverConfig] = None,
seed: Optional[int] = None,
snapshot_manager: Optional[SnapshotManager] = None, # v2.2.0 new
)
Parameters:
- population_size: Population size
- selection_config: Selection strategy settings (tournament / roulette / rank)
- mutation_config: Mutation settings (mutation rate, structural mutation rate, etc.)
- crossover_config: Crossover settings (uniform / single_point / two_point)
- snapshot_manager: SnapshotManager instance that creates automatic snapshots after generation update
evolve_generation() -> PopulationStats
Asynchronous method that evolves one generation and returns statistics.
If snapshot_manager is set, create_snapshot("post_generation_{N}") is automatically called after generation update.
Usage example:
```python
import asyncio
from evospikenet.genome_pool import GenomePool
from evospikenet.snapshot_recovery import SnapshotManager

manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")
pool = GenomePool(population_size=50, snapshot_manager=manager)
pool.initialize_population()

async def run():
    for _ in range(10):
        pool.update_fitness(pool.genomes[0].genome_id, 0.95)
        stats = await pool.evolve_generation()
        print(f"Gen {stats.generation}: best={stats.max_fitness:.3f}")

asyncio.run(run())
```
---
### `MutationEngine` — `evospikenet.evolution_engine`
```python
from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig
Constructor
MutationEngine(
base_mutation_rate: float = 0.05,
mutation_strength: float = 0.1,
advanced_engine: Optional[AdvancedMutationEngine] = None, # v2.2.0 new
)
Parameters:
- base_mutation_rate: Mutation probability applied to each gene
- mutation_strength: Magnitude of parameter change
- advanced_engine: Structural mutation engine that calls apply_mutations() after basic mutations are completed. If None, only the conventional basic mutation is executed.
mutate_genome(genome: EvoGenome) -> EvoGenome
Apply mutations to the entire genome and return copies.
If advanced_engine is set, apply advanced structural mutations (add/delete layers, skip connections, pruning, etc.) after basic mutations.
Usage example:
```python
from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig

advanced = AdvancedMutationEngine(AdaptiveMutationConfig(base_rate=0.1))
engine = MutationEngine(base_mutation_rate=0.05, advanced_engine=advanced)

mutated = engine.mutate_genome(genome)
```
---
### `AdvancedMutationEngine` — `evospikenet.advanced_mutations`
```python
from evospikenet.advanced_mutations import (
AdvancedMutationEngine, AdaptiveMutationConfig, MutationType
)
Constructor
AdvancedMutationEngine(config: AdaptiveMutationConfig = AdaptiveMutationConfig())
apply_mutations(genome, mutation_types=None, performance_data=None) -> EvoGenome
Applies 10 types of structural mutations to the genome.
| MutationType | Content |
|---|---|
| ADD_LAYER_SMART | Insert a layer between existing layers and automatically update the connection matrix |
| REMOVE_LAYER_SMART | Remove the least important layer and bridge its connections |
| ADD_SKIP_CONNECTION | Add the longest skip connection |
| REMOVE_SKIP_CONNECTION | Randomly remove a skip connection |
| PRUNE_CONNECTIONS | Cut connections in the lowest 20% of importance |
| GROW_CONNECTIONS | Add 10% more connections, starting with short-range ones |
| MODULE_DUPLICATION | Duplicate a module (chromosome) |
| MODULE_FUSION | Combine two modules |
| ADAPTIVE_LAYER_SIZE | Grow or shrink layer size depending on activity |
| RECURRENT_CONNECTION | Add a recurrent connection |
update_mutation_rates(generation, fitness_improvements)
Adaptively updates the mutation rate according to the success rate of each mutation type.
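A plausible sketch of such an adaptive update, scaling each mutation type's rate toward its observed success rate; the exact formula and clamping bounds are assumptions, not the library's implementation:

```python
from typing import Dict, List

def update_mutation_rates(rates: Dict[str, float],
                          improvements: Dict[str, List[float]],
                          lr: float = 0.5,
                          floor: float = 0.01, ceil: float = 0.5) -> Dict[str, float]:
    """Raise rates for mutation types whose applications improved fitness, lower the rest."""
    updated = {}
    for mtype, rate in rates.items():
        deltas = improvements.get(mtype, [])
        if deltas:
            success = sum(1 for d in deltas if d > 0) / len(deltas)
            rate = rate + lr * (success - 0.5) * rate  # boost if >50% of applications helped
        updated[mtype] = min(ceil, max(floor, rate))
    return updated

rates = {"ADD_LAYER_SMART": 0.1, "PRUNE_CONNECTIONS": 0.1}
improvements = {"ADD_LAYER_SMART": [0.02, 0.01, -0.01], "PRUNE_CONNECTIONS": [-0.03, -0.02]}
print(update_mutation_rates(rates, improvements))
```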
FitnessEvaluator — evospikenet.fitness_evaluator
from evospikenet.fitness_evaluator import FitnessEvaluator, TaskBenchmark
_evaluate_robustness(genome, brain) -> Dict[str, float]
The placeholder was replaced with a real implementation in v2.2.0.
If `brain` is provided:
- Noise injection test (σ=0.2): the output difference between clean and noisy inputs → `noise_resistance`
- Failure tolerance test: the output difference after masking 10% of the mid-layer weights → `failure_tolerance`

If `brain` is `None`, structural proxies are used:
- Number of skip connections → `noise_resistance` (0.5 + skips * 0.05)
- Number of recurrent connections → `stability` (0.6 + recurrent * 0.04)
- Layer depth → `failure_tolerance` (lower when too deep or too shallow)

Return value: `{"noise_resistance": float, "failure_tolerance": float, "stability": float}`
rollback_to_snapshot() — evospikenet.rollback
from evospikenet.rollback import rollback_to_snapshot, rollback_version
rollback_to_snapshot(snapshot_id, manager, target_path) -> Path
A convenience function that searches for a snapshot from SnapshotManager by ID and restores it to target_path.
Parameters:
- snapshot_id: str — Snapshot ID returned by SnapshotManager.create_snapshot()
- manager: SnapshotManager — instance that holds snapshot metadata
- target_path: str | Path — Restore destination path
Return value: Path of the restored file
Exception: DataLoadError — raised if the ID is unknown or the file does not exist
Usage example:
```python
from evospikenet.rollback import rollback_to_snapshot
from evospikenet.snapshot_recovery import SnapshotManager

manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")

# Get the snapshot list and restore the latest one
snapshots = manager.list_snapshots()
if snapshots:
    latest = sorted(snapshots, key=lambda x: x["timestamp"], reverse=True)[0]
    restored = rollback_to_snapshot(
        snapshot_id=latest["id"],
        manager=manager,
        target_path="/tmp/restored_state.gz",
    )
    print(f"Restore complete: {restored}")
```
#### `rollback_version(version_entry, target_path) -> Path`
A low-level function that directly specifies and restores a history entry dictionary file.
**Parameters:**
- `version_entry: Dict` — Dictionary containing the `"path"` key
- `target_path: str | Path` — Restore destination path
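At its core, `rollback_version` copies the file referenced by `version_entry["path"]` to `target_path`; a self-contained sketch, with decompression omitted and the library's `DataLoadError` simplified to a standard exception:

```python
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Union

def rollback_version(version_entry: Dict, target_path: Union[str, Path]) -> Path:
    """Restore the file referenced by a history entry to target_path."""
    source = Path(version_entry["path"])
    if not source.exists():
        raise FileNotFoundError(f"snapshot file missing: {source}")
    target = Path(target_path)
    shutil.copyfile(source, target)
    return target

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "state.gz"
    src.write_bytes(b"snapshot-bytes")
    restored = rollback_version({"path": str(src)}, Path(tmp) / "restored.gz")
    print(restored.read_bytes())  # b'snapshot-bytes'
```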
---
## Data type
### Priority
Enumeration type representing priority.
- `LOW = "low"`
- `NORMAL = "normal"`
- `HIGH = "high"`
- `CRITICAL = "critical"`
### MessageType
An enumerated type representing the message type.
- `NOTIFICATION = "notification"`
- `REQUEST = "request"`
- `RESPONSE = "response"`
- `EVENT = "event"`
### OptimizationType
Enumeration representing optimization types.
- `SPEED = "speed"`
- `MEMORY = "memory"`
- `ACCURACY = "accuracy"`
- `POWER = "power"`
### ArtifactType
Enum type representing artifact type.
- `MODEL = "model"`
- `LOG = "log"`
- `CONFIG = "config"`
- `DATA = "data"`
- `METRICS = "metrics"`
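These enumerations map directly to the string values used in request bodies (for example, the `priority` and `message_type` parameters of `publish_zenoh_message`); a sketch of their definition and use:

```python
from enum import Enum

class Priority(Enum):
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"

class MessageType(Enum):
    NOTIFICATION = "notification"
    REQUEST = "request"
    RESPONSE = "response"
    EVENT = "event"

# Enum .value yields the string the API expects in request bodies.
msg = {"priority": Priority.HIGH.value, "message_type": MessageType.EVENT.value}
print(msg)  # {'priority': 'high', 'message_type': 'event'}
```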
## Usage example
### Basic usage example
```python
from evospikenet.sdk import EvoSpikeNetAPIClient
# Client initialization
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
# text generation
result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])
# multimodal processing
response = client.submit_prompt(
prompt="この画像を説明してください",
image_path="image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])
```

### Artifact management
```python
import io
# Artifact upload
with open('model.pkl', 'rb') as f:
file_data = io.BytesIO(f.read())
result = client.upload_artifact(
session_id="session_123",
artifact_type="model",
name="trained_model",
file=file_data,
model_category="classification"
)
# Artifact list
artifacts = client.list_artifacts(artifact_type="model")
# artifact download
client.download_artifact("artifact_id_123", "downloaded_model.pkl")
```

### Error handling
```python
from evospikenet.sdk import EvoSpikeNetAPIClient, EvoSpikeNetAPIError

client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")
try:
    result = client.generate("Test prompt")
except EvoSpikeNetAPIError as e:
    print(f"API Error: {e.error_info.message}")
    if e.error_info.retry_after:
        print(f"Retry after: {e.error_info.retry_after}s")
```
Usage in Jupyter Notebook
<!-- TODO: update<!-- Module 'evospikenet' not found. Please check moves/renames in the package -->kenet.sdk_jupyter import Jup<!-- Remember: Cannot convert automatically — please fix manually -->Configuration
client.set_display_mode("html")
# Server information display
client.show_server_info()
# Statistics display
client.show_stats()
Real-time communication using WebSockets
import asyncio
<!-- TODO: update or remove - impo<!-- Module 'evospikenet' not found. Please check the move/rename in the package -->WebSocketClient -->
async def main():
client = <!-- Please note: Cannot convert automatically — please fix manually -->e_message(msg):
print(f"Received: {msg}")
client.register_handler("notification", handle_message)
# Send message
await client.send_message({
"type": "notification",
"data": "Hello from SDK!"
})
# Start listening
await client.listen()
asyncio.run(main())
## Best practices
- Connection management: Reuse sessions in long-running applications.
- Error handling: Always implement proper exception handling.
- Timeout: Set an appropriate timeout for your network conditions.
- Retry: Use automatic retry for transient errors.
- Resource management: Use streaming when working with large files.
- Authentication: Manage the API key via environment variables.
- Monitoring: Check client statistics regularly.
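The retry recommendation above can be sketched as a small exponential-backoff wrapper (illustrative only; the client's built-in `max_retries` handling may differ):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(fn: Callable[[], T], max_retries: int = 3, base_delay: float = 0.5) -> T:
    """Call fn, retrying on any exception with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: propagate the last error
            time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(with_retries(flaky, base_delay=0.01))  # prints "ok" on the third attempt
```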
## Troubleshooting
### Common issues
- Connection error: Check that the API server is running.
- Authentication error: Check that the API key is set correctly.
- Timeout: Check network conditions and server load.
- Memory error: Use streaming when working with large files.
### Debug information

```python
# Check client statistics
stats = client.get_stats()
print(f"Requests: {stats['requests']}")
print(f"Errors: {stats['errors']}")
print(f"Average latency: {stats['average_latency']:.3f}s")

# Check server information
info = client.get_server_info()
if info:
    print(f"Server version: {info.get('version')}")
    print(f"Loaded models: {info.get('models', [])}")
```
#### Distributed coordinator methods
##### init_coordinator(node_id: str, zenoh_config: Optional[Dict[str, Any]] = None, raft_config: Optional[Dict[str, Any]] = None) -> None
Initialize the distributed coordinator.
**Parameters:**
- `node_id`: Unique identifier for this node
- `zenoh_config`: Zenoh DDS configuration (optional)
- `raft_config`: Raft consensus configuration (optional)
**example:**

```python
# Basic initialization
client.init_coordinator("node_1")

# Initialization with custom settings
zenoh_config = {"connect": ["tcp/127.0.0.1:7447"]}
raft_config = {"election_timeout": 5000}
client.init_coordinator("node_1", zenoh_config, raft_config)
```
##### start_coordinator() -> None
Start the distributed coordinator.
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
client.init_coordinator("node_1")
client.start_coordinator()
print("Coordinator started")
```
##### stop_coordinator() -> None
Stop the distributed coordinator.
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
client.stop_coordinator()
print("Coordinator stopped")
```
##### submit_coordination_task(task_type: str, payload: Dict[str, Any]) -> str
Submit a distributed coordination task.
**Parameters:**
- `task_type`: Task type
- `payload`: task payload data
**Return value:** Task ID
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
task_id = client.submit_coordination_task(
    "federated_learning",
    {"model": "resnet50", "dataset": "cifar10"}
)
print(f"Task submitted: {task_id}")
```
**Internal task implementation memo (SDK built-in simple version):**
- `federated_learning`: averages the numeric fields of `payload['updates']` and returns `aggregated_parameters` plus `aggregation_meta`.
- `distributed_inference`: passes `payload['inputs']` / `payload['batches']` through to the results unchanged, adding `node_id` and `status=completed` to each entry.
- `model_aggregation`: averages the `weights` lists of `payload['models']` and returns `aggregated_model['weights']` plus `aggregation_meta`.
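The `federated_learning` averaging described in the memo can be sketched as follows (a simplified stand-in for illustration, not the SDK's internal code):

```python
from typing import Any, Dict, List

def aggregate_updates(updates: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Average every numeric field across the submitted updates (assumes a non-empty list)."""
    numeric_keys = {
        k for u in updates for k, v in u.items() if isinstance(v, (int, float))
    }
    aggregated = {
        k: sum(u.get(k, 0.0) for u in updates) / len(updates) for k in numeric_keys
    }
    return {
        "aggregated_parameters": aggregated,
        "aggregation_meta": {"num_updates": len(updates)},
    }

result = aggregate_updates([{"lr": 0.1, "loss": 2.0}, {"lr": 0.3, "loss": 1.0}])
print(result["aggregated_parameters"]["loss"])  # 1.5
```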
##### get_coordination_task_status(task_id: str) -> Optional[Dict[str, Any]]
Get the status of a coordination task.
**Parameters:**
- `task_id`: Task ID to check
**Return value:** Task state information, or `None` if not found
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
status = client.get_coordination_task_status(task_id)
if status:
    print(f"Task status: {status['status']}")
```
##### get_cluster_status() -> Dict[str, Any]
Get the current cluster state.
**Return value:** Cluster state information
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
status = client.get_cluster_status()
print(f"Active nodes: {status['active_nodes']}")
print(f"Leader: {status['leader_id']}")
```
##### register_coordination_node(node: Union[NodeInfo, str], node_info: Optional[Dict[str, Any]] = None) -> bool
Register a new node in the coordination cluster.
**Parameters:**
- `node`: `NodeInfo` or node ID string
- `node_info`: Required when passing a string as `node`. A dictionary containing address, port, role, capabilities, etc.
**Return value:** True if registration was successful
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
from datetime import datetime

# Pass a NodeInfo directly
node_info = NodeInfo(
    node_id="node_2",
    last_seen=datetime.now(),  # timestamp field; the exact field name was garbled in the source
    capabilities=["gpu", "cpu"],
)
success = client.register_coordination_node(node_info)

# Pass an ID plus a dict
success = client.register_coordination_node(
    "node_3",
    {"address": "192.168.1.101", "port": 9001, "role": "follower", "capabilities": ["cpu"]},
)
```
##### unregister_coordination_node(node_id: str) -> bool
Remove the node from the coordination cluster.
**Parameters:**
- `node_id`: Node ID to be released
**Return value:** True if release was successful
**Exception:**
- `RuntimeError`: If coordinator is not initialized
**example:**

```python
success = client.unregister_coordination_node("node_2")
```
## Return value structure details
### Return value of generate()

```json
{
  "generated_text": "generated text",
  "tokens": 150,
  "latency_ms": 250
}
```
### Return value of submit_prompt()

```json
{
  "prompt_id": "prompt_12345",
  "status": "submitted",
  "timestamp": "2026-02-17T10:30:45Z",
  "processing_nodes": {
    "visual": "Vision-1",
    "spatial": "Spatial-12",      # Feature 13: Rank 12 (Where pathway)
    "integration": "Spatial-14"   # Feature 13: Rank 14 (Integration)
  }
}
```
### Return value of poll_for_result()

```json
{
  "prompt_id": "prompt_12345",
  "status": "completed",
  "response": "response text",
  "spatial_analysis": {
    "rank_12_where": {
      "positions": [[100, 200], [150, 250]],
      "depth": [0.5, 0.7],
      "latency_ms": 47
    },
    "rank_14_integration": {
      "what_where_fusion": "result",
      "world_model": "..."
    }
  },
  "processing_time_ms": 150
}
```
### Return value of get_simulation_result()

```json
{
  "response": "response content",
  "nodes_involved": [
    "Spatial-12", "Spatial-13", "Spatial-14", "Spatial-15"
  ],
  "timestamp": "2026-02-17T10:31:00Z"
}
```
### Return value of batch_generate()

```json
[
  {
    "generated_text": "text 1",
    "tokens": 100,
    "latency_ms": 200
  },
  {
    "generated_text": "text 2",
    "tokens": 150,
    "latency_ms": 250
  },
  {
    "error": "error message",
    "prompt": "the failed prompt"
  }
]
```
## Feature 13 support (spatial recognition/generation system)
EvoSpikeNet SDK supports Rank 12-15 spatial processing nodes (Feature 13).
| Node | Rank | Processing target | Delay target |
|---|---|---|---|
| SpatialWhereNode | 12 | Spatial position/depth estimation | <50ms |
| SpatialWhatNode | 13 | Object recognition/scene generation | <30ms |
| SpatialIntegrationNode | 14 | What-Where Integration/World Model | <50ms |
| SpatialAttentionControlNode | 15 | Saccade planning/attention control | <30ms |
### Spatial processing example

```python
# Perform spatial processing with multimodal input
response = client.submit_prompt(
    prompt="Describe the positions of the objects in the image",
    image_path="./sample_image.jpg"
)
result = client.poll_for_result(timeout=60)

# Obtain the processing results for Ranks 12-15
if result and 'spatial_analysis' in result:
    where_result = result['spatial_analysis'].get('rank_12_where')
    integration_result = result['spatial_analysis'].get('rank_14_integration')
    print(f"Detected positions: {where_result['positions']}")
    print(f"Integrated analysis: {integration_result}")
```
## Space generation service (neural version)
- Endpoint: `POST /generate`
- Input: JSON
  - `input_text` (str, <=4000 chars, required): Natural language description of the scene to generate
  - `cognitive_context` (object, optional): Cognitive state hints such as attention, stress, engagement, etc.
  - `eeg_data` (object, optional): LIF analysis is applied when raw EEG data is passed in the `signals` array
  - `model_version` (str, optional): Version of a high-precision model stored on the server. If it exists, the model is reloaded and the `high_precision` output flag is updated.
- Validation: type checks and a maximum-length check (4000 characters). Invalid input results in `400 Bad Request`.
- Output: scene specification
  - `scene_type`, `spatial_coordinates`, `spatial_layout`, `objects`, `dimensions`, `physics`, `lighting`, `navigation`
  - `high_precision` (bool): Whether a high-precision model is currently loaded
  - `model_version` (str): Currently active model version
  - `metadata`: `confidence`, `processing_time`, `quantum_modulation_alpha`, `cognitive_entropy`, `encoding_norm`, `input_text`, `quality`, `quality_factor`
### Health check
- Endpoint: `GET /health`
- Main fields:
  - `status`: `running` | `error` | `degraded`
  - `device`: `cpu` | `cuda`
  - `neural_components_available`: Whether all components were initialized successfully
  - `components`: availability booleans for language_adapter / encoder_decoder / attention / plasticity / quantum / embodied_physics / lif_layer / izhikevich_layer
  - `metrics`: `request_count`, `error_rate`, `avg_latency_ms`, `uptime_seconds`
  - `knowledge_base_loaded`: Whether the scene/object KB is already loaded
  - `last_spike_cached`: Whether the latest spike train is cached
- Additional fields:
  - `model_version`: Version of the currently loaded spatial generation model
  - `high_precision`: Whether that model is in high-precision mode
### Sample (direct HTTP)

```bash
curl -X POST http://localhost:8000/generate \
  -H "Content-Type: application/json" \
  -d '{
    "input_text": "Calm indoor room with a table and soft light",
    "cognitive_context": {"attention_level": 0.6, "stress_level": 0.2}
  }'

curl http://localhost:8000/health | jq
```
### SDK wrapper

```python
from evospikenet.sdk import EvoSpikeNetAPIClient

# Example of space generation using the SDK
client = EvoSpikeNetAPIClient(spatial_base_url="http://localhost:8000")
scene = client.spatial_generate(
    "Calm indoor room with a table and soft warm lighting",
    cognitive_context={"attention_level": 0.6, "stress_level": 0.2},
)
health = client.spatial_health()
print(scene.get("metadata", {}).get("confidence"))
```
### Quantum auxiliary reasoning
- Endpoint: `POST /quantum/infer`
- Input: JSON
  - `scene` (object): scene object returned by `/generate`
  - `spike_trains` (tensor/list, optional): spike trains used for reinforcement
- Output: the given `scene` extended with `metadata.quantum_modulation_alpha` and `cognitive_entropy`
## RAG system (versioned index)
- Base URL example: `http://localhost:8001`
- Endpoints:
  - `POST /upload_file`: Verify file → analyze → auto-chunk → embed → store in Milvus + Elasticsearch. The response includes `doc_key` and `version`.
  - `POST /query`: Search and generate with `{"query": "...", "llm_type": "huggingface"|"evosdk"}`. Returns the hit documents in `context`, which may include `sdk_response`.
- Supported extensions (main): `.pdf`, `.docx`, `.doc`, `.pptx`, `.ppt`, `.xlsx`, `.xls`, `.txt`, `.md`, `.markdown`, `.gdoc`, `.html`
- Sample (ingest docs/README.md and search):

```bash
export RAG_API_URL=${RAG_API_URL:-http://localhost:8001}
curl -X POST "$RAG_API_URL/upload_file" \
  -F "file=@docs/README.md" | jq
curl -X POST "$RAG_API_URL/query" \
  -H "Content-Type: application/json" \
  -d '{"query": "What does this project do?", "llm_type": "huggingface"}' | jq
```
### SDK wrapper

```python
from evospikenet.sdk import EvoSpikeNetAPIClient

# Example of the SDK wrapper for RAG
client = EvoSpikeNetAPIClient(rag_base_url="http://localhost:8001")

# Ingest a file
upload = client.rag_upload_file("docs/README.md")
print(upload.get("doc_key"))

# Run a query (using the SDK's rag_query)
answer = client.rag_query("What does this project do?", llm_type="huggingface")
if answer:
    ctx = answer.get("context", [])
    if ctx:
        print(ctx[0].get("text"))
```
Sample scripts:
- `examples/spatial_generation_client.py` — Simple client that hits `/generate` and `/health`.
- `examples/rag_ingest_and_query.py` — Index via `/upload_file`, then search via `/query`.
- `examples/rag_markdown_sdk.py` — Ingest and search Markdown files via the SDK.
## Error handling
The SDK reports errors through the EvoSpikeNetAPIError exception.

```python
from evospikenet.sdk import EvoSpikeNetAPIClient, EvoSpikeNetAPIError

client = EvoSpikeNetAPIClient()
try:
    result = client.generate("Hello, world!")
except EvoSpikeNetAPIError as e:
    print(f"Error type: {e.error_info.error_type}")
    print(f"Details: {e.error_info.details}")
    if e.error_info.retry_after:
        print(f"Retry after {e.error_info.retry_after} seconds")
```
## Implementation guide
### Recommended implementation patterns
#### 1. Synchronous processing (simple scripts)

```python
from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()
client.submit_prompt(prompt="Text generation query")

# Get the result by polling
result = client.poll_for_result(timeout=60, interval=5)
if result:
    print(f"Response: {result.get('response')}")
```
#### 2. Batch processing (sequential requests)

```python
prompts = ["Query 1", "Query 2", "Query 3"]
results = client.batch_generate(prompts, max_length=100)
for i, result in enumerate(results):
    if 'error' not in result:
        print(f"Result {i}: {result.get('generated_text')}")
```

Benefits: handles multiple requests easily. Applications: test data generation, baseline experiments, dataset preparation.
#### 3. Asynchronous processing (high throughput)
See async_spatial_processing_example.py for details.

```python
import asyncio
from evospikenet.sdk import EvoSpikeNetAPIClient

async def run_async_examples():
    client = EvoSpikeNetAPIClient()
    tasks = [
        client.submit_prompt_async("Query 1"),
        client.submit_prompt_async("Query 2"),
        client.submit_prompt_async("Query 3"),
    ]
    responses = await asyncio.gather(*tasks)
    print(responses)

asyncio.run(run_async_examples())
```

Advantages: high throughput, resource efficient. Applications: large-scale batch processing, production systems, microservices.
## Sample code
### Basics: Feature 13 spatial processing

```bash
# Synchronous sample (recommended: scripts/experiments)
python examples/spatial_processing_example.py

# Asynchronous sample (recommended: production/high throughput)
python examples/async_spatial_processing_example.py
```

Feature coverage:
- Synchronous example (spatial_processing_example.py):
  - ✅ Text generation and spatial analysis
  - ✅ Multimodal input (image + text)
  - ✅ Batch processing
  - ✅ Health check
- Asynchronous example (async_spatial_processing_example.py):
  - ✅ Concurrent tasks
  - ✅ Rate-limited sending
  - ✅ Asynchronous health checks
  - ✅ Error handling
## Best practices
### 1. Timeout management

```python
# Recommended: set appropriate timeouts
result = client.poll_for_result(
    timeout=120,  # 2 minutes
    interval=5    # poll every 5 seconds
)
```

### 2. Error handling

```python
try:
    result = client.submit_prompt(prompt="Query")
except EvoSpikeNetAPIError as e:
    if e.error_info.error_type in ["timeout", "server_error"]:
        # Retryable error
        pass
    else:
        # Non-retryable error (invalid input, etc.)
        pass
```

### 3. Scaling concurrency

```python
import asyncio

# Small scale (< 10 requests): synchronous processing
for prompt in small_list:
    result = client.generate(prompt)

# Large scale (> 100 requests): asynchronous processing
async def run_all():
    tasks = [client.generate_async(p) for p in large_list]
    return await asyncio.gather(*tasks)

results = asyncio.run(run_all())
```

### 4. Resource efficiency

```python
# ✅ Good: reuse one client/session
with client.create_session() as session:
    for prompt in prompts:
        result = client.submit_prompt(prompt)

# ❌ Avoid: creating a new connection every time
for prompt in prompts:
    new_client = EvoSpikeNetAPIClient()  # inefficient
```
## Specifications and limitations
### API limits
| Item | Limit |
|---|---|
| Maximum prompt length | 10,000 characters |
| Maximum generation length | 5,000 characters |
| Timeout | 300 seconds (5 minutes) |
| Maximum batch size | 100 |
| Simultaneous connections | 1,000 |
| QPS (queries per second) | 1,000 |
### Performance targets (Feature 13)
| Metric | Target |
|---|---|
| Text generation | < 500 ms |
| Spatial analysis | < 200 ms (Ranks 12-15) |
| Batch processing | 100 requests/sec |
| Availability | 99.9% |
| P99 latency | 1,000 ms |
## Feature 36: Automatic recovery methods
### Overview
AutoRecoveryEngine autonomously recovers the system through AI-based anomaly detection and automatic playbook execution. Target: MTTR -80%.
### API endpoint list
| Method | Path | Description |
|---|---|---|
| GET | `/api/recovery/status` | Get recovery engine status |
| GET | `/api/recovery/incidents` | Get the list of incidents |
| GET | `/api/recovery/incidents/{id}` | Get a specific incident |
| POST | `/api/recovery/incidents/{id}/acknowledge` | Mark an incident acknowledged |
| POST | `/api/recovery/incidents/{id}/resolve` | Mark an incident resolved |
| POST | `/api/recovery/trigger` | Manually trigger anomaly diagnosis |
### get_recovery_status()

```python
import requests

resp = requests.get(
    "http://localhost:8000/api/recovery/status",
    headers={"X-API-Key": API_KEY}
)
status = resp.json()
# {
#   "total_incidents": 5,
#   "open_incidents": 1,
#   "mttr_seconds": 120.4,
#   "monitoring_interval_seconds": 30,
# }
```
### list_incidents(status, severity, limit)

```python
params = {"status": "open", "severity": "critical", "limit": 20}
resp = requests.get(
    "http://localhost:8000/api/recovery/incidents",
    headers={"X-API-Key": API_KEY},
    params=params,
)
incidents = resp.json()["incidents"]
```

| Parameter | Type | Description |
|---|---|---|
| `status` | str | open / acknowledged / resolved / auto_resolved |
| `severity` | str | low / medium / high / critical |
| `limit` | int | Maximum number of results (default: 50) |
### trigger_diagnosis(metrics)

```python
payload = {
    "cpu_percent": 95.0,
    "memory_percent": 80.0,
    "db_connected": False,
    "error_rate": 0.15,
}
resp = requests.post(
    "http://localhost:8000/api/recovery/trigger",
    headers={"X-API-Key": API_KEY},
    json=payload,
)
result = resp.json()
# { "status": "incident_created", "incident": { "id": "...", ... } }
# or
# { "status": "no_anomaly_detected" }
```
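For illustration, a simple threshold-style check over the same metric fields might look like this (the thresholds are invented for the example; the engine's actual AI-based detection is more sophisticated):

```python
from typing import Any, Dict, List

# Illustrative thresholds only — not the engine's real configuration.
THRESHOLDS = {"cpu_percent": 90.0, "memory_percent": 90.0, "error_rate": 0.1}

def detect_anomalies(metrics: Dict[str, Any]) -> List[str]:
    """Return human-readable descriptions of metrics that exceed their threshold."""
    anomalies = [
        f"{key} above {limit}"
        for key, limit in THRESHOLDS.items()
        if float(metrics.get(key, 0.0)) > limit
    ]
    if metrics.get("db_connected") is False:
        anomalies.append("database unreachable")
    return anomalies

print(detect_anomalies({"cpu_percent": 95.0, "db_connected": False, "error_rate": 0.15}))
```

An empty return list corresponds to the `no_anomaly_detected` case above.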
Complete sample code
→ auto_recovery_sdk.py
## Feature 39: Audit log methods
### Overview
AuditLogManager provides tamper-evident audit logs using a SHA-256 hash chain. All HTTP requests are logged automatically.
### API endpoint list
| Method | Path | Description |
|---|---|---|
| GET | `/api/audit/stats` | Get statistics |
| GET | `/api/audit/logs` | Search logs |
| POST | `/api/audit/log` | Write a manual entry |
| GET | `/api/audit/verify` | Verify the hash chain |
| GET | `/api/audit/export` | Export logs (JSON/CSV) |
### get_audit_stats()

```python
resp = requests.get(
    "http://localhost:8000/api/audit/stats",
    headers={"X-API-Key": API_KEY}
)
stats = resp.json()
```
### query_audit_logs(actor, action, resource, result, since, until, limit)

```python
params = {
    "actor": "admin_user",
    "action": "model.train",
    "result": "success",
    "limit": 100,
}
resp = requests.get(
    "http://localhost:8000/api/audit/logs",
    headers={"X-API-Key": API_KEY},
    params=params,
)
entries = resp.json()["entries"]
```

| Parameter | Type | Description |
|---|---|---|
| `actor` | str | Filter by operator |
| `action` | str | Action prefix-match filter |
| `result` | str | success / failure / error |
| `since` / `until` | str | Period (ISO 8601) |
| `limit` | int | Default: 100 |
### write_audit_entry(action, actor, resource, result, detail)

```python
resp = requests.post(
    "http://localhost:8000/api/audit/log",
    headers={"X-API-Key": API_KEY},
    json={
        "action": "model.deploy",
        "actor": "ci_pipeline",
        "resource": "/api/models/genome_v3",
        "result": "success",
        "detail": {"version": "3.2.1"},
    },
)
# { "status": "logged", "entry_id": "...", "entry_hash": "sha256:..." }
```
### verify_chain()

```python
resp = requests.get(
    "http://localhost:8000/api/audit/verify",
    headers={"X-API-Key": API_KEY}
)
# { "valid": true, "checked": 1500, "error": null }
```
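The property that `verify` checks can be sketched as follows: each entry's hash covers its payload plus the previous hash, so tampering with any past entry invalidates every later link (a simplified sketch; the server's actual field layout is an assumption):

```python
import hashlib
import json
from typing import Dict, List

GENESIS = "0" * 64  # sentinel "previous hash" for the first entry

def entry_hash(entry: Dict, prev_hash: str) -> str:
    """SHA-256 over the canonical JSON payload concatenated with the previous hash."""
    payload = json.dumps(entry, sort_keys=True) + prev_hash
    return hashlib.sha256(payload.encode()).hexdigest()

def append(chain: List[Dict], entry: Dict) -> None:
    prev = chain[-1]["hash"] if chain else GENESIS
    chain.append({"entry": entry, "hash": entry_hash(entry, prev)})

def verify(chain: List[Dict]) -> bool:
    prev = GENESIS
    for link in chain:
        if link["hash"] != entry_hash(link["entry"], prev):
            return False  # this link (or an earlier one) was altered
        prev = link["hash"]
    return True

chain: List[Dict] = []
append(chain, {"action": "model.deploy", "result": "success"})
append(chain, {"action": "model.train", "result": "success"})
print(verify(chain))   # True
chain[0]["entry"]["result"] = "failure"  # tamper with history
print(verify(chain))   # False
```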
### export_logs(format)

```python
# JSON format
resp = requests.get("http://localhost:8000/api/audit/export",
                    headers={"X-API-Key": API_KEY}, params={"format": "json"})
logs_json = resp.json()

# CSV format
resp = requests.get("http://localhost:8000/api/audit/export",
                    headers={"X-API-Key": API_KEY}, params={"format": "csv"})
csv_text = resp.text
```
Complete sample code
→ audit_log_sdk.py
## Feature 40: Geographically distributed node management methods
*Note: some modules are only partially implemented. The API is available, but tests and documentation are still being refined.*
### Overview
GeoNodeManager provides multi-region node management, automatic failover, and inter-region latency measurement.
### API endpoint list
| Method | Path | Description |
|---|---|---|
| GET | `/api/geo/status` | Get overall status |
| GET/POST | `/api/geo/regions` | List/register regions |
| GET/DELETE | `/api/geo/regions/{id}` | Region details/delete |
| GET/POST | `/api/geo/nodes` | List/register nodes |
| DELETE | `/api/geo/nodes/{id}` | Delete a node |
| POST | `/api/geo/failover` | Execute a failover |
| GET | `/api/geo/failover/history` | Failover history |
| GET | `/api/geo/latency-matrix` | Latency matrix |
| GET/PUT | `/api/geo/active-region` | Get/change the active region |
| GET | `/api/geo/replication-group/{id}` | Replication group |
### register_node(node_id, region_id, endpoint, node_type)

```python
resp = requests.post(
    "http://localhost:8000/api/geo/nodes",
    headers={"X-API-Key": API_KEY},
    json={
        "node_id": "prod-gpu-node-01",
        "region_id": "ap-northeast-1",
        "endpoint": "10.0.1.100:8000",
        "node_type": "gpu",
    },
)
# { "status": "registered", "node_id": "prod-gpu-node-01" }
```
### trigger_failover(from_region, to_region, reason, triggered_by)

```python
resp = requests.post(
    "http://localhost:8000/api/geo/failover",
    headers={"X-API-Key": API_KEY},
    json={
        "from_region": "ap-northeast-1",
        "to_region": "us-east-1",
        "reason": "region_unavailable",
        "triggered_by": "monitoring_system",
    },
)
# { "status": "failover_executed", "event": { ... } }
```
### get_latency_matrix()

```python
resp = requests.get(
    "http://localhost:8000/api/geo/latency-matrix",
    headers={"X-API-Key": API_KEY}
)
matrix = resp.json()["matrix"]
# matrix["ap-northeast-1"]["us-east-1"] == 145.3  (ms)
```
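Given such a matrix, choosing the lowest-latency failover target is straightforward; a hypothetical helper (not part of the SDK), with example latencies invented for illustration:

```python
from typing import Dict

def pick_failover_target(matrix: Dict[str, Dict[str, float]], failed: str) -> str:
    """Choose the reachable region with minimum latency from the failed one."""
    candidates = {r: ms for r, ms in matrix[failed].items() if r != failed}
    return min(candidates, key=candidates.get)

matrix = {
    "ap-northeast-1": {"us-east-1": 145.3, "eu-west-1": 210.8, "ap-southeast-1": 68.5},
}
print(pick_failover_target(matrix, "ap-northeast-1"))  # ap-southeast-1
```

A real policy would also filter out regions whose `status` is not `online` before taking the minimum.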
### GeoRegion structure

```json
{
  "region_id": "ap-northeast-1",
  "display_name": "Asia Pacific (Tokyo)",
  "provider": "aws",
  "latitude": 35.6762,
  "longitude": 139.6503,
  "priority": 1,
  "status": "online",
  "node_count": 4
}
```
### Performance targets (Features 36/39/40)
| Metric | Target |
|---|---|
| Audit log write throughput | ≥ 1,000 entries/sec |
| Hash chain verification (1,000 entries) | < 5,000 ms |
| Anomaly detection latency (p99) | < 10 ms |
| Latency matrix computation (50 regions) | < 500 ms |
| Incident creation latency | < 100 ms |
| Automatic failover completion time | < 30 seconds |
Complete sample code
→ geo_node_manager_sdk.py
## Biomimicry integration API (BrainSimulationFramework)
Updated: 2026-03-06. All 11 Phase A/B items completed. Detailed evaluation: docs-dev/biomimetic_integration_evaluation.md v2.0 (8.7/10)
### BrainSimulationFramework
Top-level integration class in the evospikenet.brain_simulation module. Combines all biomimetic/ modules (neuromodulation, STDP, sleep, circuits, cortical topology, DMN, etc.) with the SNN core.
#### Constructor

```python
BrainSimulationFramework(
    enable_biomimetic: bool = False,
    config: Optional[BrainSimulationConfig] = None,
)
```
Parameters:
- enable_biomimetic: Set to True to initialize all biomimetic/ modules and enable full loop integration.
- config: Simulation configuration object (default if omitted).
#### Methods
##### run_simulation(duration: int = 1000) -> Dict[str, Any]
Runs the biomimetic six-phase pipeline.
Phase:
1. DevelopmentalSchedule.plasticity_multiplier(t) — Developmental schedule
2. NeuralCircuitModeler.simulate_timestep() — Circuit simulation (Izhikevich compatible)
3. STDP × NeuromodulatorGate.gated_learning_rate() — Plasticity modulation
4. NodeEnergyBudget.energy_fitness_term() — Energy homeostasis
5. HippocampalBuffer.store(episode) — Episodic memory record
6. SleepConsolidation.offline_consolidation() — Sleep consolidation replay
Return value: Dictionary containing {phase_results, biomimetic_status, duration_ms}
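The six phases above can be pictured as a simple per-timestep loop; the sketch below uses stub phase functions in place of the real module calls, so it only illustrates the control flow, not the framework's implementation:

```python
from typing import Any, Callable, Dict, List

def run_simulation_sketch(duration: int, phases: List[Callable[[int], Any]]) -> Dict[str, Any]:
    """Run every phase function once per timestep and collect its outputs."""
    phase_results: Dict[str, list] = {fn.__name__: [] for fn in phases}
    for t in range(duration):
        for fn in phases:
            phase_results[fn.__name__].append(fn(t))
    return {"phase_results": phase_results, "duration_ms": duration}

# Stub phases standing in for the real module calls listed above.
def developmental_schedule(t): return 1.0 / (1 + t)  # plasticity multiplier decays over time
def circuit_step(t): return t % 2                    # toy spike output

result = run_simulation_sketch(4, [developmental_schedule, circuit_step])
print(result["duration_ms"])  # 4
```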
##### run_idle_phase(duration_s: float = 10.0) -> Dict[str, Any]
Runs DMN (Default Mode Network) idle cycles asynchronously.

```python
import asyncio
activities = asyncio.run(framework.run_idle_phase(duration_s=10.0))
```

Returns: Dictionary of DMN activity logs and spike records for each time step
##### biomimetic_status() -> Dict[str, Any]
Returns a state snapshot of all biomimetic modules.

```python
status = framework.biomimetic_status()
# {
#   "stdp_connected_gate": True,
#   "sleep_consolidation_replay": True,
#   "izhikevich_circuits": 1,
#   "cortical_columns_registered": 0,
#   "neuromodulator_registry_linked": True,
#   "efference_copy_adaptive": True,
#   "mirror_neuron_default_classifier": True,
#   "dmn_idle_phase_available": True,
# }
```
### NeuralCircuitModeler (Izhikevich backend)
#### Constructor

```python
NeuralCircuitModeler(
    config: NeuralCircuitConfig,
    neuron_type: str = "lif",  # "lif" | "izhikevich"
)
```

Parameters:
- neuron_type: Specifying "izhikevich" uses the IzhikevichNeuron.step() backend, supporting firing patterns such as RS/IB/CH/FS/LTS.
##### simulate_timestep(input_current, t) -> Tuple[np.ndarray, np.ndarray]
Executes the simulation for one time step (dt = 1 ms).
Return value: (spike_array, membrane_voltages)
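For reference, a single update step of the Izhikevich model (regular-spiking parameters, scalar form) can be sketched as below; this is the textbook model the `izhikevich` backend follows, not the SDK's internal code:

```python
def izhikevich_step(v, u, i_ext, a=0.02, b=0.2, c=-65.0, d=8.0):
    """One 1 ms Euler step of the Izhikevich model (regular-spiking parameters).

    v: membrane potential (mV), u: recovery variable, i_ext: input current.
    """
    v = v + 0.04 * v * v + 5 * v + 140 - u + i_ext
    u = u + a * (b * v - u)
    if v >= 30.0:  # spike: reset membrane potential, bump recovery variable
        return c, u + d, True
    return v, u, False

# Constant input drives repeated firing from the resting state.
v, u, spikes = -65.0, -13.0, 0
for t in range(200):
    v, u, fired = izhikevich_step(v, u, i_ext=10.0)
    spikes += int(fired)
print(spikes > 0)  # True
```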
### BrainRegionIntegrator
An integrated management class for cortical topology and brain regions.
##### add_cortical_topology(generator, nx_cols, ny_cols) -> int
Registers the column layouts generated by CorticalTopologyGenerator as BrainRegionConfig and sets microcircuit connections between adjacent columns (distance ≤ √2 mm).

```python
from evospikenet.biomimetic import CorticalTopologyGenerator
from evospikenet.brain_simulation import BrainRegionIntegrator

gen = CorticalTopologyGenerator()
integrator = BrainRegionIntegrator()
n = integrator.add_cortical_topology(gen, nx_cols=4, ny_cols=4)
print(n)  # 16
```

Parameters:
- generator: CorticalTopologyGenerator instance
- nx_cols: Number of columns in the X direction
- ny_cols: Number of columns in the Y direction
Return value: Number of registered columns (int)
### STDP — biomimetic extension methods
##### STDP.with_neuromodulation(gate) -> STDP (class method)
Factory method. Creates an STDP instance with a NeuromodulatorGate injected.

```python
gate = NeuromodulatorGate()
stdp = STDP.with_neuromodulation(gate)
```

##### STDP.connect_plasticity_gate(gate) -> None
Retrofits a NeuromodulatorGate onto an existing STDP instance.

```python
stdp = STDP()
stdp.connect_plasticity_gate(gate)
```
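Conceptually, the gated learning rate scales a standard pair-based STDP window; a minimal sketch (the function name and constants are illustrative, not the SDK's API):

```python
import math

def stdp_delta_w(dt_ms: float, gate_factor: float = 1.0,
                 a_plus: float = 0.01, a_minus: float = 0.012, tau: float = 20.0) -> float:
    """Pair-based STDP weight change, scaled by a neuromodulatory gate factor.

    dt_ms = t_post - t_pre: positive (pre before post) potentiates,
    negative (post before pre) depresses.  gate_factor plays the role
    of a gated learning rate supplied by the neuromodulator gate.
    """
    if dt_ms > 0:
        return gate_factor * a_plus * math.exp(-dt_ms / tau)
    return -gate_factor * a_minus * math.exp(dt_ms / tau)

print(stdp_delta_w(10.0) > 0)   # pre→post: potentiation → True
print(stdp_delta_w(-10.0) < 0)  # post→pre: depression → True
```

Setting `gate_factor` to 0 suppresses plasticity entirely, which is the intuition behind gating learning on neuromodulator state.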
### SleepConsolidation — extension methods
##### offline_consolidation(episodes, stdp) -> Dict[str, Any]
Consolidates episode lists with STDP replay learning.

```python
stats = sleep.offline_consolidation(episodes=buffer.replay(), stdp=stdp)
# stats: {replayed_episodes, weight_updates_mean, replay_duration_ms}
```

Return value (stats): Dictionary containing replayed_episodes, weight_updates_mean, replay_duration_ms
### NeuromodulatorGate — registry cooperation methods
##### connect_to_registry(registry) -> None
Establishes a two-way bridge with the NeuromodulatorRegistry.
##### push_to_registry() -> None
Writes the Gate's current state (DA/ACh/OT levels, etc.) to the Registry.
##### pull_from_registry() -> None
Reflects the Registry's values back into the Gate state.

```python
gate = NeuromodulatorGate()
registry = NeuromodulatorRegistry()
gate.connect_to_registry(registry)
gate.push_to_registry()
gate.pull_from_registry()
```
### EfferenceCopy — adaptive gain methods
##### adaptive_gain_update(prediction_error: float) -> float
Adaptively updates the gain based on the prediction error.
##### reset() -> None
Resets the gain and internal state to their initial values.

```python
efference = EfferenceCopy()
gain = efference.adaptive_gain_update(prediction_error=0.3)
efference.reset()
```
Complete sample code
→ sdk_distributed_brain.py (see demonstrate_biomimetic_brain_simulation() function)
### Test list
| Test file | Target |
|---|---|
| tests/unit/test_biomimetic_init_api.py | All symbols in __init__.py |
| tests/unit/test_stdp_neuromodulation.py | STDP ↔ Gate wiring |
| tests/unit/test_sleep_consolidation_stdp.py | offline_consolidation() + stats |
| tests/unit/test_efference_copy_adaptive.py | Adaptive gain / reset() |
| tests/unit/test_mirror_neurons_default_classify.py | _default_classify() |
| tests/integration/test_brain_simulation_biomimetic.py | BrainSimulationFramework full integration |
| tests/integration/test_dmn_idle_phase.py | run_idle_phase() / DMN stop confirmation |

```bash
# Run the biomimetic tests in batch with Docker
docker compose -f docker-compose.test.yml --profile biomimetic run --rm biomimetic-test
```
## Biomimicry REST API endpoints
The endpoints under /biomimetic/* manipulate the entire biomimetic module over HTTP.
They can be called with the requests library or via EvoSpikeNetAPIClient._make_request().

```python
import requests

BASE_URL = "http://localhost:8000"
API_KEY = "your-api-key"  # Can be an empty string if not required
HEADERS = {"X-API-Key": API_KEY} if API_KEY else {}
```
### Endpoint list
| Method | Path | Description |
|---|---|---|
| GET | /biomimetic/status | Status snapshot of all modules |
| POST | /biomimetic/simulate | Run a full brain simulation |
| POST | /biomimetic/neuromod/update | Update neuromodulator levels |
| POST | /biomimetic/reward | Send a TD reward signal |
| POST | /biomimetic/sleep/consolidate | Offline memory consolidation (replay) |
| POST | /biomimetic/sleep/wake-config | Sleep/wake cycle settings |
| POST | /biomimetic/framework/reset | Reinitialize the framework |
### GET /biomimetic/status
Returns the current state of all biomimetic modules.

```python
resp = requests.get(f"{BASE_URL}/biomimetic/status", headers=HEADERS)
data = resp.json()
# {
#   "biomimetic_enabled": true,
#   "neuromod_levels": {"dopamine": 0.5, "noradrenaline": 0.4, ...},
#   "energy_budget": {"current_w": 9.2, "budget_w": 10.0},
#   "sleep_stats": {"cycles_completed": 3, "replayed_events": 128},
# }
```
### POST /biomimetic/simulate
Runs a simulation, specifying all NeuralCircuitConfig parameters.

```python
payload = {
    # NeuralCircuitConfig
    "num_neurons": 1000,
    "connection_probability": 0.1,
    "excitatory_ratio": 0.8,
    "inhibitory_ratio": 0.2,
    "refractory_period": 5,
    "membrane_time_constant": 20.0,
    # BrainSimulationFramework
    "enable_biomimetic": True,
    "development_epoch": 0,
    "total_epochs": 1000,
    "energy_budget_w": 10.0,
    # Simulation control
    "duration": 1000,
    "plasticity_rule": "stdp",  # "stdp" | "bcm" | "oja"
    "plasticity_interval": 10,
    "sleep_every": 500,
    "sleep_cycles": 3,
    "neuron_type": "lif",  # "lif" | "izhikevich"
}
resp = requests.post(f"{BASE_URL}/biomimetic/simulate", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "duration_ms": 3842,
#   "spikes_total": 18423,
#   "biomimetic_status": { ... }
# }
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| num_neurons | int | 1000 | Number of circuit neurons |
| connection_probability | float | 0.1 | Connection probability |
| excitatory_ratio | float | 0.8 | Excitatory neuron ratio |
| inhibitory_ratio | float | 0.2 | Inhibitory neuron ratio |
| refractory_period | int | 5 | Refractory period (ms) |
| membrane_time_constant | float | 20.0 | Membrane time constant (ms) |
| enable_biomimetic | bool | true | Enable the biomimetic modules |
| development_epoch | int | 0 | Current developmental epoch |
| total_epochs | int | 1000 | Total number of epochs |
| energy_budget_w | float | 10.0 | Node energy limit (W) |
| duration | int | 1000 | Simulation length (ms) |
| plasticity_rule | str | "stdp" | Plasticity rule |
| plasticity_interval | int | 10 | Plasticity update interval (steps) |
| sleep_every | int | 500 | Sleep phase start timing (steps) |
| sleep_cycles | int | 3 | Number of sleep cycles |
| neuron_type | str | "lif" | Neuron model type |
### POST /biomimetic/neuromod/update
Immediately updates neuromodulator levels such as dopamine.

```python
payload = {
    "dopamine": 0.8,          # Reward/motivation (0.0–1.0)
    "noradrenaline": 0.4,     # Arousal/attention
    "acetylcholine": 0.6,     # Memory/learning
    "serotonin": 0.5,         # Mood/impulse control
    "oxytocin": 0.3,          # Social bonding
    "emotion_factor": 0.7,    # Overall emotional intensity
    "motivation_factor": 0.9, # Motivation to act
}
resp = requests.post(f"{BASE_URL}/biomimetic/neuromod/update", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "message": "Neuromodulator levels updated",
#   "updated_levels": {"dopamine": 0.8, "serotonin": 0.5, ...}
# }
```
| Parameter | Type | Description |
|---|---|---|
| dopamine | float | Dopamine level (0.0–1.0) |
| noradrenaline | float | Noradrenaline level |
| acetylcholine | float | Acetylcholine level |
| serotonin | float | Serotonin level |
| oxytocin | float | Oxytocin level |
| emotion_factor | float | Emotion intensity factor |
| motivation_factor | float | Motivation factor |
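All level fields are expected in the 0.0–1.0 range. A small helper (illustrative, not part of the SDK) that clamps a payload before POSTing:

```python
def clamp_neuromod(payload: dict) -> dict:
    """Clamp every neuromodulator level into [0.0, 1.0] before sending."""
    return {k: min(1.0, max(0.0, float(v))) for k, v in payload.items()}

safe = clamp_neuromod({"dopamine": 1.3, "serotonin": -0.2, "oxytocin": 0.3})
print(safe)  # {'dopamine': 1.0, 'serotonin': 0.0, 'oxytocin': 0.3}
```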
#### POST /biomimetic/reward
Sends a reward signal (TD error) to update the dopamine system module.
```python
payload = {
    "reward": 1.0,           # reward value (-1.0 to 1.0)
    "td_error": 0.35,        # temporal-difference (TD) error
    "update_neuromod": True, # whether to update dopamine levels immediately
}
resp = requests.post(f"{BASE_URL}/biomimetic/reward", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "dopamine_level": 0.73,
# }
```
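The td_error field is the classic temporal-difference error, delta = r + gamma * V(s') - V(s). A minimal sketch of how a caller might compute it; gamma and the value estimates are the caller's own choices, not provided by this API:

```python
def td_error(reward: float, v_next: float, v_current: float, gamma: float = 0.9) -> float:
    """Temporal-difference error: delta = r + gamma * V(s') - V(s)."""
    return reward + gamma * v_next - v_current

# With these (illustrative) value estimates, delta matches the payload above
delta = td_error(reward=1.0, v_next=0.5, v_current=1.1)
print(round(delta, 2))  # 0.35
```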
#### POST /biomimetic/sleep/consolidate
Replays the memory traces accumulated in the hippocampal buffer offline and applies STDP to consolidate them.
```python
payload = {
    "sleep_cycles": 3,          # number of sleep cycles to run
    "replay_buffer_size": 256,  # replay buffer size (number of spike events)
    "stdp_lr": 0.01,            # STDP learning rate during consolidation
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/consolidate", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "cycles_completed": 3,
#   "replayed_events": 128,
# }
```
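For intuition about replay_buffer_size and sleep_cycles, the buffer can be pictured as a bounded FIFO whose contents are replayed once per cycle. This is an illustrative sketch of that idea, not the server-side implementation:

```python
from collections import deque

class ReplayBuffer:
    """Bounded FIFO of spike events; the oldest events drop out when full."""
    def __init__(self, size: int = 256):
        self.events = deque(maxlen=size)

    def record(self, event):
        self.events.append(event)

    def replay(self, cycles: int = 3):
        """Yield each stored event once per sleep cycle."""
        for _ in range(cycles):
            yield from list(self.events)

buf = ReplayBuffer(size=4)
for t in range(6):          # record 6 events into a 4-slot buffer
    buf.record(("spike", t))
# Only the 4 newest events survive; each is replayed once per cycle
print(len(list(buf.replay(cycles=3))))  # 12 = 4 events x 3 cycles
```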
#### POST /biomimetic/sleep/wake-config
Update the schedule parameters of SleepWakeCycleController.
```python
payload = {
    "awake_phase_steps": 500,     # steps in the awake phase
    "sleep_phase_steps": 100,     # steps in the sleep phase
    "sleep_cycles_per_epoch": 3,  # cycles per epoch
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/wake-config", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "config_applied": {"awake_phase_steps": 500, ...}
# }
```
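For intuition, the total number of steps an epoch spans can be derived from these three values, assuming (this is an assumption about the controller, not documented behavior) that each cycle consists of one awake phase followed by one sleep phase:

```python
def steps_per_epoch(awake_phase_steps: int = 500,
                    sleep_phase_steps: int = 100,
                    sleep_cycles_per_epoch: int = 3) -> int:
    # Assumption: each cycle = one awake phase + one sleep phase.
    return sleep_cycles_per_epoch * (awake_phase_steps + sleep_phase_steps)

print(steps_per_epoch())  # 1800 steps with the defaults above
```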
#### POST /biomimetic/framework/reset
Reinitializes BrainSimulationFramework and resets the state of all modules.
```python
payload = {
    "enable_biomimetic": True,  # whether to re-enable the biomimetic module after reset
}
resp = requests.post(f"{BASE_URL}/biomimetic/framework/reset", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "message": "BrainSimulationFramework reinitialized",
# }
```
### Calling via EvoSpikeNetAPIClient
Biomimetic endpoints without a dedicated method can be called through _make_request(), which reuses the SDK session features (automatic retries, header management).
```python
from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient(base_url="http://localhost:8000", api_key=API_KEY)

# GET /biomimetic/status
status = client._make_request("GET", f"{client.base_url}/biomimetic/status")

# POST /biomimetic/neuromod/update
result = client._make_request(
    "POST",
    f"{client.base_url}/biomimetic/neuromod/update",
    json={"dopamine": 0.8, "serotonin": 0.5},
)
```
#### Complete sample code
→ `sdk_biomimetic_rest_api.py`

```bash
# Run all endpoints as a pipeline
python examples/sdk/sdk_biomimetic_rest_api.py --base-url http://localhost:8000

# Run individual demos
python examples/sdk/sdk_biomimetic_rest_api.py --demo simulate
python examples/sdk/sdk_biomimetic_rest_api.py --demo neuromod
python examples/sdk/sdk_biomimetic_rest_api.py --demo sleep
```
## Phase E-3 Connectome Production / Automatic Synchronization API
Last updated: 2026-03-19 (Phase E-3 fully completed)
### sync_connectome — automatic synchronization pipeline (E-3-1)
#### `apply_delta(base_path, delta_path, result_path)`
Applies a delta JSON to an existing NPZ and atomically writes the new NPZ.

```python
from scripts.sync_connectome import apply_delta

apply_delta(
    base_path="data/connectome/cache/visual.npz",
    delta_path="data/connectome/delta_v42.json",
    result_path="data/connectome/cache/visual_v43.npz",
)
```
**Delta JSON schema:**
```json
{
  "schema_version": "1.0",
  "materialization_version_from": 42,
  "materialization_version_to": 43,
  "added": [
    {"pre_id": 100, "post_id": 200, "weight": 0.15, "delay_ms": 1.5}
  ],
  "removed": [
    {"pre_id": 50, "post_id": 80}
  ]
}
```
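The added/removed semantics of the delta schema can be illustrated with a plain in-memory edge map. This sketch only mirrors the schema for intuition; it is not the actual scripts.sync_connectome implementation:

```python
def apply_edge_delta(edges: dict, delta: dict) -> dict:
    """Apply a delta dict to an edge map keyed by (pre_id, post_id)."""
    result = dict(edges)
    for e in delta.get("removed", []):
        result.pop((e["pre_id"], e["post_id"]), None)   # drop removed synapses
    for e in delta.get("added", []):
        result[(e["pre_id"], e["post_id"])] = {          # insert new synapses
            "weight": e["weight"], "delay_ms": e["delay_ms"]
        }
    return result

edges = {(50, 80): {"weight": 0.2, "delay_ms": 1.0}}
delta = {
    "added": [{"pre_id": 100, "post_id": 200, "weight": 0.15, "delay_ms": 1.5}],
    "removed": [{"pre_id": 50, "post_id": 80}],
}
print(sorted(apply_edge_delta(edges, delta)))  # [(100, 200)]
```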
#### `apply_delta_with_validation(base_path, delta_path, result_path, *, rollback_dir, ei_ratio_range)`
Applies the delta with E/I-ratio validation. If the ratio falls outside the allowed range, a backup is written to `rollback_dir` and `ConnectomeSyncValidationError` is raised.
```python
from scripts.sync_connectome import apply_delta_with_validation, ConnectomeSyncValidationError

try:
    apply_delta_with_validation(
        base_path="visual.npz",
        delta_path="delta.json",
        result_path="visual_new.npz",
        rollback_dir="data/connectome/rollback/",
        ei_ratio_range=(3.5, 5.0),
    )
except ConnectomeSyncValidationError as e:
    print(f"E/I validation failed; rollback completed: {e}")
```
#### `sync_connectome(config_path, cache_path, output_path, *, dry_run, ...)`
Fully automatic synchronization orchestrator, covering everything from CAVE API delta retrieval through application; this is the recommended entry point.

```python
from scripts.sync_connectome import sync_connectome

result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/cache/visual.npz",
    output_path="data/connectome/cache/visual.npz",
    dry_run=False,
)
print(result["status"])  # "success" | "dry_run" | "no_update"
```
**CLI usage examples:**
```bash
# Normal run
python scripts/sync_connectome.py \
    --config config/connectome_config.yaml \
    --cache data/connectome/cache/visual.npz \
    --output data/connectome/cache/visual.npz

# Dry run (report results without updating the output file)
python scripts/sync_connectome.py --dry-run \
    --cache data/connectome/cache/visual.npz \
    --output /dev/null
```
---
### brain_routing — HCP Latency Aware Zenoh Routing (E-3-3)
#### `compute_delay_matrix(manifest, config_path)`
Builds the inter-node delay matrix from measured HCP (Human Connectome Project) latencies.
```python
from evospikenet.brain_routing import compute_delay_matrix
from evospikenet.connectome import build_manifest

manifest = build_manifest("data/connectome/")
delay_matrix = compute_delay_matrix(
    manifest=manifest,
    config_path="config/connectome_config.yaml",
)
# {"pfc": {"memory_spike": 12.0, "visual": 9.0, ...}, ...}
print(delay_matrix["pfc"]["memory_spike"])  # 12.0 ms
```
#### `build_hcp_routing_table(config_path)`
Runs the full routing pipeline and generates a routing table.

```python
from evospikenet.brain_routing import build_hcp_routing_table

routing = build_hcp_routing_table("config/connectome_config.yaml")
# routing["delay_matrix"]                  — {src: {dst: ms}}
# routing["routing_plan"]["routing_edges"] — edge list in descending priority
# routing["zenoh_topics"]                  — list of "brain_routing/delays/{node_id}" topics
```
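Given the delay_matrix shape above, a consumer might, for example, pick the lowest-latency destination for each source node (illustrative helper, not an SDK API):

```python
def nearest_destination(delay_matrix: dict, src: str) -> tuple:
    """Return (dst, delay_ms) with the smallest delay from src."""
    dst, ms = min(delay_matrix[src].items(), key=lambda kv: kv[1])
    return dst, ms

# Delay values mirror the example matrix shown above
delay_matrix = {"pfc": {"memory_spike": 12.0, "visual": 9.0}}
print(nearest_destination(delay_matrix, "pfc"))  # ('visual', 9.0)
```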
#### `HCPDelayRouter`
Publishes delay profiles to a Zenoh session. Also works with `session=None` (log-only mode).

```python
from evospikenet.brain_routing import HCPDelayRouter

# Usable in tests without a Zenoh session;
# with session=None, routing calls only emit logs.
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()

# Attach the delay profile to a data payload
data = {"spikes": [1, 0, 1], "timestamp": 1.0}
enriched = router.apply_hcp_delays(node_id="pfc", data=data)
# {"spikes": [...], "timestamp": 1.0, "routing_delays": {"memory_spike": 12.0, ...}}

# Publish delay information for all nodes at once
# Zenoh topic: brain_routing/delays/{node_id}
router.publish_all()
```
### auto_node_mapper — Auto Node Mapper CLI
#### `map_connectome(input_path, output_dir, config_path, *, dry_run, seed)`
Splits the connectome into per-node NPZ files.
```python
from scripts.auto_node_mapper import map_connectome

result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    dry_run=False,
    seed=42,
)
print(f"Mapping complete: {result.mapped_nodes} of {result.total_nodes} nodes succeeded")
print(f"Manifest: {result.manifest_path}")
for entry in result.entries:
    print(f"  {entry.node_type}: {entry.n_neurons} neurons, E/I={entry.ei_ratio:.2f}, {entry.status}")
```
**Output structure:**
```
data/connectome/nodes/
├── visual.npz         ← compressed NPZ for the visual-cortex node
├── memory_spike.npz   ← for the memory node
└── node_manifest.yaml ← metadata for all nodes
```
**node_manifest.yaml schema:**
```yaml
schema_version: "1.0"
generated_at: "2026-03-19T10:00:00Z"
base_dataset: "flywire_visual"
nodes:
  visual:
    npz_path: data/connectome/nodes/visual.npz
    n_neurons: 1024
    ei_ratio: 4.1
    coarsening_method: stratified_sample
    status: ok
  memory_spike:
    npz_path: data/connectome/nodes/memory_spike.npz
    n_neurons: 512
    ei_ratio: 4.0
    coarsening_method: spectral_coarsen
    status: ok
```
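Once the manifest is parsed (with any YAML loader), it can be sanity-checked against the same E/I range used by apply_delta_with_validation. A minimal sketch operating on the parsed dict, with the range hard-coded here as an assumption:

```python
def check_manifest(manifest: dict, ei_range: tuple = (3.5, 5.0)) -> dict:
    """Map each node name to True/False: status ok and E/I ratio in range."""
    lo, hi = ei_range
    return {
        name: (node["status"] == "ok" and lo <= node["ei_ratio"] <= hi)
        for name, node in manifest.get("nodes", {}).items()
    }

# Values mirror the node_manifest.yaml example above
manifest = {
    "schema_version": "1.0",
    "nodes": {
        "visual": {"ei_ratio": 4.1, "status": "ok"},
        "memory_spike": {"ei_ratio": 4.0, "status": "ok"},
    },
}
print(check_manifest(manifest))  # {'visual': True, 'memory_spike': True}
```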
**CLI usage examples:**
```bash
# Full mapping
python scripts/auto_node_mapper.py \
    --input data/connectome/flywire_visual.json \
    --output-dir data/connectome/nodes/ \
    --config config/connectome_config.yaml

# Dry run with a fixed seed for reproducibility
python scripts/auto_node_mapper.py \
    --input data/connectome/flywire_visual.json \
    --output-dir data/connectome/nodes/ \
    --dry-run --seed 42
```
---
### Complete Connectome E-3 Step-by-Step Sample
Typical workflow from full initialization to regular sync:
```python
import evospikenet as esn
from scripts.auto_node_mapper import map_connectome
from scripts.sync_connectome import sync_connectome
from evospikenet.brain_routing import HCPDelayRouter
from evospikenet import load_connectome_npz, apply_connectome_to_layer, ConnectomeLIFLayer
# --- Step 1: Connectome → Divide into NPZ by node ---
result = map_connectome(
input_path="data/connectome/flywire_visual.json",
output_dir="data/connectome/nodes/",
config_path="config/connectome_config.yaml",
seed=42,
)
print(f"E-3 Auto Node Mapper: {result.mapped_nodes} nodes mapped")
# --- Step 2: Inject per-node NPZ into ConnectomeLIFLayer ---
visual_data = load_connectome_npz("data/connectome/nodes/visual.npz")
layer = ConnectomeLIFLayer(num_neurons=visual_data["n_neurons"], device="cpu")
apply_connectome_to_layer(visual_data, layer)
print(f"structural_mask: {layer.structural_mask.shape}, ei_ratio={layer.ei_ratio:.2f}")
# --- Step 3: Construct HCP delay routing ---
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
router.publish_all() # Without Zenoh, only logs
# --- Step 4: Automatic synchronization (obtain differences from CAVE API) ---
sync_result = sync_connectome(
config_path="config/connectome_config.yaml",
cache_path="data/connectome/nodes/visual.npz",
output_path="data/connectome/nodes/visual.npz",
dry_run=True, # First check with a dry run
)
print(f"sync status: {sync_result['status']}")
```

Complete walkthrough sample: `connectome_e3_demo.py`