Skip to content

EvoSpikeNet SDK API Reference

Author: Masahiro Aoki

Last updated: February 20, 2026 🎯 Feature 13 + Spatial Generative Neural Integration + Feature 36/39/40 (Automatic Recovery/Audit Log/Geographically Distributed Node Management)

overview

This document provides a complete API reference for the EvoSpikeNet Python SDK. The SDK is implemented as a REST API client and supports features such as text generation, distributed brain simulation, spatial cognitive processing (Rank 12-15 Feature 13), artifact management, and RAG/vector search (versioned index).

Class structure

EvoSpikeNetAPIClient

This is the main API client class. Provides HTTP request management, error handling, and connection pooling.

Constructor

EvoSpikeNetAPIClient(
    base_url: str = "http://localhost:8000",
    spatial_base_url: Optional[str] = None,
    rag_base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout: int = 60,
    max_retries: int = 3,
    session: Optional[requests.Session] = None,
)

Parameters: - base_url: API server base URL - spatial_base_url: Base URL of spatial generation service (FastAPI version). If not specified, base_url is used. - rag_base_url: Base URL of RAG service. If not specified, base_url is used. - api_key: API authentication key - timeout: Request timeout (seconds) - max_retries: Maximum number of retries - session: Custom requests session

Basic methods

generate(prompt: str, max_length: int = 50) -> Dict[str, str]

Perform text generation.

Parameters: - prompt: input prompt - max_length: Maximum number of characters to generate

Return value: Dictionary containing the generated results

example:```python result = client.generate("Hello, world!", max_length=100) print(result['generated_text'])

### LocalMemoryClient (offline helper)

A lightweight wrapper to try out the long-term memory module and spike compression layer locally without using the HTTP API.

```python
from evospikenet.sdk import LocalMemoryClient
import torch

client = LocalMemoryClient(state_dim=16, time_steps=6, embedding_dim=16, max_traces=32)
spikes = torch.rand(6, 16)
summary = client.store_spike_episode(
    spike_sequence=spikes,
    context={"scenario": "demo"},
    action="look",
    outcome="noted",
    reward=0.2,
    semantic_tags=["demo_concept"],
)
stats = client.reservoir_stats()
result = client.retrieve({"scenario": "demo", "semantic_tags": ["demo_concept"]}, top_k=1)

Related implementations: evospikenet/snn_memory_extension.py, evospikenet/long_term_memory.py, evospikenet/forgetting_controller.py.

submit_prompt(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict

Send multimodal prompts to distributed brain simulations.

Operation memo (2026-04-20): - The ASR execution path in a distributed environment depends on the server environment variable VIDEO_ANALYSIS_ASR_BACKEND (asr_fallback / whisper_real). - Whisper model/device is controlled by VIDEO_ANALYSIS_WHISPER_MODEL / VIDEO_ANALYSIS_WHISPER_DEVICE. - This change is an extension of the server configuration and does not change the signature of the SDK method.

Parameters: - prompt: text prompt - image_path: Image file path - audio_path: Audio file path

Return value: Dictionary containing send confirmation

get_simulation_status() -> Dict

Get the current state of the distributed brain simulation.

Return value: Dictionary containing the states of all nodes

get_simulation_result() -> Dict

Get the latest results for completed simulations.

Return value: Dictionary containing the response, or None if no result is available

poll_for_result(timeout: int = 120, interval: int = 5) -> Optional[Dict[str, Any]]

Poll until results are available.

Parameters: - timeout: Maximum waiting time (seconds) - interval: Polling interval (seconds)

Return value: Result dictionary, or None on timeout

Evolution/Genome Management Methods

list_genomes() -> List[Dict[str, Any]]

List saved genomes and get metadata such as name / generation / fitness.

get_genome(genome_name: str) -> Dict[str, Any]

Gets the payload of the specified genome file.

save_genome(genome_name: str, genome: Dict[str, Any], make_active: bool = False) -> Dict[str, Any]

Save your genome. Promote it to active_genome.json with make_active=True.

apply_genome(genome_name: str) -> Dict[str, Any]

Activates an existing genome (no payload editing).

Usage example:```python from evospikenet.sdk import EvoSpikeNetAPIClient

Example: use EvoSpikeNetAPIClient to manage genomes

client = EvoSpikeNetAPIClient(base_url="http://localhost:8000") genomes = client.list_genomes() if genomes: target = genomes[0]["name"] payload = client.get_genome(target) payload.setdefault("metadata", {})["note"] = "edited via SDK" client.save_genome(target, payload, make_active=True) client.apply_genome(target) else: print("No genomes found on the server")

Related sample: `examples/genome_management_sdk.py`

#### Artifact management methods

##### upload_artifact(session_id: str, artifact_type: str, name: str, file: io.BytesIO, **metadata) -> Dict

Upload the artifact file.

**Parameters:**
- `session_id`: Session ID
- `artifact_type`: Artifact type ('model', 'log', 'config', etc.)
- `name`: Artifact name
- `file`: file buffer to upload
- `**metadata`: Additional metadata (llm_type, node_type, model_category, model_variant)

**Return value:** Dictionary containing the upload results

##### list_artifacts(artifact_type: Optional[str] = None) -> Union[List[Dict[str, Any]], Dict[str, Any]]

List artifacts.

**Parameters:**
- `artifact_type`: Artifact type to filter

**Returns:** List or dictionary of artifacts

##### download_artifact(artifact_id: str, destination_path: str) -> None

Download the artifact.

**Parameters:**
- `artifact_id`: ID of the artifact to download
- `destination_path`: Destination file path

#### Log management methods

##### create_log_session(description: str) -> Dict

Create a new log session.

**Parameters:**
- `description`: Session description

**Return value:** Session information

##### get_remote_log(user: str, ip: str, key_path: str, log_file_path: str) -> Dict

Get logs from remote nodes.

**Parameters:**
- `user`: SSH username
- `ip`: IP address of remote host
- `key_path`: SSH private key path
- `log_file_path`: Absolute path of remote log file

**Return value:** Dictionary containing log contents

#### Batch processing methods

##### batch_generate(prompts: List[str], max_length: int = 50) -> List[Dict[str, str]]

Process multiple prompts sequentially.

**Parameters:**
- `prompts`: list of prompts to handle
- `max_length`: maximum number of characters for each generation

**Returns:** Results list for each prompt

#### Helper methods

##### health_check() -> Dict[str, Any]

Performs a health check on the API server.

**Returns:** Dictionary containing health states


## Multi-language binding

The Python SDK includes lightweight proxy implementations for Go and TypeScript. These are originally planned to have Go/JS implementations in separate repositories,
It mimics the functionality so that it can be easily `import` in a Python environment.

### Go SDK Proxy

```python
from evospikenet import sdk_go
client = sdk_go.init_client("http://localhost:8000", api_key="KEY")
path = client.download_file("foo.bin", dest="/tmp/foo.bin")

Method Description
init_client(endpoint: str, api_key: Optional[str]=None) Returns a Go client object
GoSDKClient.download_file(filename, dest=None, timeout=60) Get file from EvoSpikeNet API

TypeScript SDK Proxy

from evospikenet import sdk_ts
client = sdk_ts.initClient("https://api", apiKey=None)
client.downloadFile("foo", dest="out.bin")
Method Description
initClient(endpoint: str, apiKey: Optional[str]=None) Returns a TS/JS client object
TSSDKClient.downloadFile(filename, dest=None, timeout=60) Same as above (camelCase naming)

These clients internally use requests for HTTP communication and are available for use in Python tests and documentation examples.

node_discovery_health() -> Dict[str, Any]

Get the current node list and summary from the node discovery service. Internally it calls the /api/node-discovery/health endpoint.

Return value: Dictionary containing node states (nodes, summary, updated_at, etc.)

node_discovery_topology() -> Dict[str, Any]

Obtain network topology information from the node discovery service. Internally it calls the /api/node-discovery/topology endpoint.

Returns: Topology dictionary containing nodes and edges

spatial_generate(input_text: str, cognitive_context: Optional[Dict[str, Any]] = None, eeg_data: Optional[Dict[str, Any]] = None, timeout: int = 30) -> Dict[str, Any]

Call the spatial generation service /generate to get the scene specifications (scene_type, objects, spatial_layout, physics, lighting, navigation, metadata, etc.).

spatial_health(timeout: int = 10) -> Dict[str, Any]

Call the spatial generation service /health to get the status of neural_components_available, components, metrics, knowledge_base_loaded, etc.

rag_upload_file(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,

) -> Dict[str, Any]

Submit the file to the RAG service /upload_file and receive the doc_key and version. Additional flags allow control of temporary sessions and background jobs:

  • stream: Operates the document parser in streaming mode
  • background: Parse and index processing as a Celery background job (The backend generates job_id and can be tracked with upload_status)

Note: Specify the query parameter job_id to search for the same instance. When you call the API, the progress of the chunk being processed is reflected in upload_jobs. This makes it easy to incorporate notification and logging hooks. - init: Starts a multipart upload session, allowing subsequent parts with session_id and final to build the same version. - session_id/final: Add a file part to an existing session and end the session with final=True in the final part.

Automatically on the server side:

  1. MIME/extension validation
  2. Document analysis (PDF/Word/Excel/PPT/Markdown, etc.)
  3. Token-based automatic chunking (chunk_text_auto)
  4. Embed generation ・ Index to Milvus/Elasticsearch
  5. Version control (history with doc_key)

Streaming and split uploads are more memory efficient for large (1GB+) documents; Background mode eliminates the need to wait for processing.

Return value example: {"doc_key":"sample.pdf","version":3,"chunks_indexed":42,...}

rag_upload_file_async(
file_path: str,
stream: bool = False,
background: bool = False,
init: bool = False,
session_id: Optional[str] = None,
final: bool = False,
timeout: int = 120,

) -> Dict[str, Any]

Asynchronous counterpart to :func:rag_upload_file offering the same parameter set and return structure. Useful when integrating with asyncio-based workflows.

create_batch_job(payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]

Create a new batch ingestion job on the RAG service. The payload may include arbitrary job parameters such as file location, parsing options, metadata, etc. The server responds with a job_id and initial status (queued).

cancel_batch_job(job_id: str, timeout: int = 10) -> Dict[str, Any]

Request cancellation of an existing batch job previously created via :func:create_batch_job. The returned dictionary echoes the job_id and updated status (cancelled).

rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]

Execute search and generation using RAG service /query. Returns context (hit document) and optionally sdk_response (EvoSDK generated result). The server simultaneously generates a patch for comparing differences between versions, and when combined with /document_chunks, a difference UI can be built on the client side.

Note: Large-document streaming ingestion (split‑upload) and advanced diff viewer support are detailed in docs/RAG_SYSTEM_DETAILED.md §7.6.

rag_query(query: str, llm_type: str = "huggingface", hf_model_name: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]

Execute search and generation using RAG service /query. Returns context (hit document) and optionally sdk_response (EvoSDK generated result).

get_document_versions(doc_key: str) -> List[Dict[str, Any]]

Returns a list of available versions of the document. Each item includes version, checksum, chunk_count, and indexed_at.

rag_init_session(file_path: str, timeout: int = 30) -> Dict[str, Any]

Start a new upload session. Returns a session identifier along with the determined doc_key and tentative version. Use this id for subsequent the following API calls:

rag_upload_part(session_id: str, file_path: str, final: bool = False, timeout: int = 120) -> Dict[str, Any]

Upload a single chunk belonging to an open session. final=True signals the last part and causes cleanup of the session state on the server.

upload_status(job_id: str) -> Dict[str, Any]

Check status of an asynchronous upload job (see /upload_file?background=true). Returns a dictionary with job_id, status (queued, running, completed, failed), and optional progress percentage.

get_document_chunks(doc_key: str, version: int) -> List[Dict[str, Any]]

Returns the chunks (ordered) associated with the version of the specified doc_key. Each chunk has a chunk_id, content, and source_filename.

get_server_info() -> Optional[Dict]

Get server information and loaded models.

Return value: Server information, or None if unavailable

wait_for_server(timeout: int = 60, interval: int = 2) -> bool

Wait until the API server is healthy.

Parameters: - timeout: Maximum waiting time (seconds) - interval: Health check interval (seconds)

Return value: True if the server is now healthy

validate_prompt(prompt: str) -> bool

Verify that the prompt is suitable for sending.

Parameters: - prompt: Prompt to validate

Returns: True if enabled

with_error_handling(func: Callable[..., Any], args, retries: int = 3, *kwargs) -> Any

Executes a function with automatic retry.

Parameters: - func: Function to execute - retries: Number of retries - *args: positional arguments to pass to the function - **kwargs: Keyword arguments to pass to the function

Return value: Return value of the function, or None on failure

Statistics/monitoring methods

get_stats() -> Dict[str, Any]

Get client usage statistics.

Return value: Statistics (number of requests, number of errors, average delay, etc.)

reset_stats() -> None

Reset statistics counters.

Delay monitoring method

get_latency_stats() -> Dict

Get delay statistics for all components.

Return value: Latency statistics data

check_latency_target() -> Dict

Check if delay target is met.

Return value: Check result details

Snapshot management methods

create_snapshot(snapshot_name: str, include_models: bool = True, include_data: bool = True, compression_level: int = 6) -> Dict

Create a snapshot of your system.

Parameters: - snapshot_name: Snapshot name - include_models: Include models? - include_data: Include data? - compression_level: Compression level (0-9)

Return value: Creation result

list_snapshots() -> Dict

List available snapshots.

Return value: Snapshot list

restore_snapshot(snapshot_path: str, restore_models: bool = True, restore_data: bool = True) -> Dict

Restore from snapshot.

Parameters: - snapshot_path: Path of snapshot to restore - restore_models: Restore models? - restore_data: Restore data?

Return value: Restoration result

delete_snapshot(snapshot_path: str) -> Dict

Delete the snapshot.

Parameters: - snapshot_path: Path of snapshot to delete

Return value: Deletion result

validate_snapshot(snapshot_path: str) -> Dict

Validate the snapshot.

Parameters: - snapshot_path: Path of snapshot to verify

Return value: Verification result

cleanup_snapshots(max_age_days: int = 30) -> Dict

Clean up old snapshots.

Parameters: - max_age_days: Maximum number of days to keep

Return value: Cleanup result

Scalability test method

run_scalability_test(max_nodes: int = 50, test_duration: int = 30) -> Dict

Run scalability tests.

Parameters: - max_nodes: Maximum number of nodes - test_duration: Test duration (seconds)

Return value: Test result

get_scalability_results() -> Dict

Get scalability test results.

Return value: Test result data

get_scalability_status() -> Dict

Get the scalability test status.

Return value: Current state

test_node_scalability(node_counts: List[int], test_duration: float = 60.0) -> Dict

Test scalability with different number of nodes.

Parameters: - node_counts: list of number of nodes to test - test_duration: Duration of each test

Return value: Test result

get_resource_usage() -> Dict

Get resource usage.

Return value: Resource usage statistics

run_stress_test(intensity: str = "high", duration: float = 120.0) -> Dict

Run a stress test.

Parameters: - intensity: Test intensity ("low", "medium", "high") - duration: Test duration (seconds)

Return value: Test result

get_system_limits() -> Dict

Get system limits.

Return value: System limit information

Hardware optimization methods

optimize_model(model_type: str, optimizations: Optional[List[str]] = None) -> Dict

Optimize the model for your hardware.

Parameters: - model_type: Model type - optimizations: list of optimizations to apply

Return value: Optimization result

benchmark_model(model_type: str, num_runs: int = 50) -> Dict[str, Any]

Benchmark your model.

Parameters: - model_type: Model type to benchmark - num_runs: Number of executions

Return value: Benchmark result

get_hardware_info() -> Dict

Get hardware information.

Return value: Hardware information

High availability monitoring methods

get_availability_status() -> Dict

Get availability status.

Return value: Availability state

get_availability_stats(time_window: str = "24h") -> Dict

Get availability statistics.

Parameters: - time_window: statistical time window (e.g. "24h", "7d")

Return value: Availability statistics

perform_health_check() -> Dict

Run a health check.

Return value: Health check result

trigger_recovery_action(action_type: str, parameters: Optional[Dict[str, Any]] = None) -> Dict

Trigger a recovery action.

Parameters: - action_type: Action type - parameters: action parameters

Return value: Action result

get_availability_alerts(limit: int = 50) -> Dict

Get availability alerts.

Parameters: - limit: upper limit on the number of alerts to get

Return value: Alert list

schedule_maintenance(start_time: str, duration_minutes: int, reason: str) -> Dict

Schedule maintenance.

Parameters: - start_time: Start time (ISO format) - duration_minutes: Duration (minutes) - reason: Maintenance reason

Return value: Schedule result

Asynchronous Zenoh communication methods

connect_zenoh(node_id: str = "api_node") -> Dict

Connect to Zenoh network.

Parameters: - node_id: Node identifier

Return value: Connection result

publish_zenoh_message(topic: str, payload: Any, priority: str = "normal", message_type: str = "notification", node_id: str = "api_node") -> Dict

Publish Zenoh messages.

Parameters: - topic: message topic - payload: Message payload - priority: message priority - message_type: Message type - node_id: Source node ID

Return value: Publish result

send_zenoh_request(target_node: str, request: Any, timeout: float = 5.0, node_id: str = "api_node") -> Dict

Submit a Zenoh request.

Parameters: - target_node: Target node ID - request: request data - timeout: Timeout (seconds) - node_id: Source node ID

Return value: Response

send_zenoh_notification(target_nodes: List[str], notification: Any, priority: str = "normal", node_id: str = "api_node") -> Dict

Send Zenoh notifications.

Parameters: - target_nodes: list of target node IDs - notification: notification data - priority: Notification priority - node_id: Source node ID

Return value: Transmission result

get_zenoh_stats(node_id: str = "api_node") -> Dict

Get Zenoh statistics.

Parameters: - node_id: Node ID

Return value: Zenoh statistics

set_aeg_comm_config(node_id: str, enable_comm: bool = True, energy_threshold: float = 10.0, critical_modalities: List[str] = None, force_change_threshold: float = 10.0) -> Dict

Configure AEG-Comm communication optimization settings.

Parameters: - node_id: Target node ID - enable_comm: Enable/disable communication optimization - energy_threshold: Energy threshold - critical_modalities: Important modalities list - force_change_threshold: Force change threshold

Return value: Setting result

get_communication_stats(node_id: str = "api_node") -> Dict

Get AEG-Comm communication statistics.

Parameters: - node_id: Node ID

Return value: Communication statistics (sending rate, reduction rate, delay, error rate)

get_aeg_comm_status(node_id: str = "api_node") -> Dict

Get the current status of AEG-Comm.

Parameters: - node_id: Node ID

Return value: AEG-Comm status (number of active nodes, energy level, number of critical packets)

Distributed consensus method

propose_consensus_decision(decision_type: str, payload: Any, priority: int = 1, dependencies: Optional[List[str]] = None) -> Dict

Recommend a consensus decision.

Parameters: - decision_type: Decision type - payload: decision payload - priority: priority - dependencies: list of dependencies

Return value: Suggestion result

get_consensus_result(proposal_id: str, timeout: float = 30.0) -> Dict

Get consensus results.

Parameters: - proposal_id: proposal ID - timeout: Wait timeout (seconds)

Return value: Consensus result

update_node_status(node_id: str, active: bool) -> Dict

Update node state.

Parameters: - node_id: Node ID - active: Active state

Return value: Update result

get_consensus_stats() -> Dict

Get consensus statistics.

Return value: Statistical data

cleanup_consensus(max_age: float = 300.0) -> Dict

Clean up old consensus data.

Parameters: - max_age: Maximum time to retain (seconds)

Return value: Cleanup result

Artifact extension methods

get_session_artifacts(session_id: str) -> Any

Get session artifacts.

Parameters: - session_id: Session ID

Return value: Artifact data

get_artifact(artifact_id: str, destination_path: Optional[str] = None) -> bytes

Get the artifact.

Parameters: - artifact_id: Artifact ID - destination_path: Destination path (optional)

Return value: Artifact data

Async methods

generate_async(prompt: str, max_length: int = 50) -> Dict[str, str]

Asynchronous text generation.

submit_prompt_async(prompt: Optional[str] = None, image_path: Optional[str] = None, audio_path: Optional[str] = None) -> Dict

Asynchronous multimodal prompting.

health_check_async() -> Dict[str, Any]

Asynchronous health checks.

Jupyter integration class

JupyterAPIClient

Extended client for Jupyter Notebook environments.

Additional methods

set_display_mode(mode: str) -> None

Set the output display mode.

Parameters: - mode: Display mode ("html", "json", "text")

show_server_info() -> None

Displays server information in rich format.

show_stats() -> None

Display client statistics in rich format.

validate_prompt_interactive(prompt: str) -> bool

Validate the prompt interactively.

WebSocket Client

WebSocketClient

WebSocket client for real-time communication.

Constructor

WebSocketClient(
    ws_url: str = "ws://localhost:8000/ws",
    reconnect_attempts: int = 5,
    reconnect_delay: float = 2.0,
)

Method

connect() -> None

Establish a WebSocket connection.

disconnect() -> None

Close the WebSocket connection.

send_message(message: Dict[str, Any]) -> None

Send a message.

receive_message() -> Dict[str, Any]

Receive messages.

register_handler(message_type: str, handler: Callable) -> None

Register a message handler.

listen() -> None

Listen for messages.

Error handling

EvoSpikeNetAPIError

API-related exception class.

Attributes

  • error_info: ErrorInfo object
  • message: error message
  • status_code: HTTP status code (optional)
  • details: Additional error details

ErrorInfo

Data class that stores error information.

Attributes

  • error_type: Error type
  • message: error message
  • details: Additional details
  • retry_after: Wait time before retry (seconds)
  • status_code: HTTP status code

L5 Self-Evolution/Self-Healing API

Last updated: 2026-03-11 (v2.2.0)

Core classes for L5 self-evolving systems. A reference for using genomes, evolution engines, fitness evaluation, and memory management with "direct Python import."

GenomePoolevospikenet.genome_pool

from evospikenet.genome_pool import GenomePool, SelectionConfig, MutationConfig, CrossoverConfig
from evospikenet.snapshot_recovery import SnapshotManager

Constructor

GenomePool(
    population_size: int = 50,
    selection_config: Optional[SelectionConfig] = None,
    mutation_config: Optional[MutationConfig] = None,
    crossover_config: Optional[CrossoverConfig] = None,
    seed: Optional[int] = None,
    snapshot_manager: Optional[SnapshotManager] = None,  # v2.2.0 new
)

Parameters: - population_size: Population size - selection_config: Selection strategy settings (tournament / roulette / rank) - mutation_config: Mutation settings (mutation rate, structural mutation rate, etc.) - crossover_config: Crossover settings (uniform / single_point / two_point) - snapshot_manager: SnapshotManager instance that creates automatic snapshots after generation update

evolve_generation() -> PopulationStats

Asynchronous method that evolves one generation and returns statistics. If snapshot_manager is set, create_snapshot("post_generation_{N}") is automatically called after generation update.

Usage example:```python import asyncio from evospikenet.genome_pool import GenomePool from evospikenet.snapshot_recovery import SnapshotManager

manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots") pool = GenomePool(population_size=50, snapshot_manager=manager) pool.initialize_population()

async def run(): for _ in range(10): pool.update_fitness(pool.genomes[0].genome_id, 0.95) stats = await pool.evolve_generation() print(f"Gen {stats.generation}: best={stats.max_fitness:.3f}")

asyncio.run(run())

---

### `MutationEngine` — `evospikenet.evolution_engine`

```python
from evospikenet.evolution_engine import MutationEngine
from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig

Constructor

MutationEngine(
    base_mutation_rate: float = 0.05,
    mutation_strength: float = 0.1,
    advanced_engine: Optional[AdvancedMutationEngine] = None,  # v2.2.0 new
)

Parameters: - base_mutation_rate: Mutation probability for each Rengen - mutation_strength: Magnitude of parameter change - advanced_engine: Structural mutation engine that calls apply_mutations() after basic mutations are completed. If None, only the conventional basic mutation is executed.

mutate_genome(genome: EvoGenome) -> EvoGenome

Apply mutations to the entire genome and return copies. If advanced_engine is set, apply advanced structural mutations (add/delete layers, skip connections, pruning, etc.) after basic mutations.

Usage example:```python from evospikenet.evolution_engine import MutationEngine from evospikenet.advanced_mutations import AdvancedMutationEngine, AdaptiveMutationConfig

advanced = AdvancedMutationEngine(AdaptiveMutationConfig(base_rate=0.1)) engine = MutationEngine(base_mutation_rate=0.05, advanced_engine=advanced)

mutated = engine.mutate_genome(genome)

---

### `AdvancedMutationEngine` — `evospikenet.advanced_mutations`

```python
from evospikenet.advanced_mutations import (
    AdvancedMutationEngine, AdaptiveMutationConfig, MutationType
)

Constructor

AdvancedMutationEngine(config: AdaptiveMutationConfig = AdaptiveMutationConfig())

apply_mutations(genome, mutation_types=None, performance_data=None) -> EvoGenome

Applying 10 types of structural mutations to the genome.

MutationType Content
ADD_LAYER_SMART Insert a layer between several layers and automatically update the connection matrix
REMOVE_LAYER_SMART Remove least important layer, bridge connection
ADD_SKIP_CONNECTION Add longest skip connection
REMOVE_SKIP_CONNECTION Randomly remove skip connections
PRUNE_CONNECTIONS Cut off two connections with the lowest 20% importance
GROW_CONNECTIONS Add 10% connections starting from short distance
MODULE_DUPLICATION Duplicate module (chromosome)
MODULE_FUSION Combine two modules
ADAPTIVE_LAYER_SIZE Increase or decrease layer size depending on activity
RECURRENT_CONNECTION Add recursive connection

update_mutation_rates(generation, fitness_improvements)

Adaptively updates the mutation rate according to the success rate of each mutation type.


FitnessEvaluatorevospikenet.fitness_evaluator

from evospikenet.fitness_evaluator import FitnessEvaluator, TaskBenchmark

_evaluate_robustness(genome, brain) -> Dict[str, float]

Replaced placeholder with implementation in v2.2.0.

If brain is provided:``` ノイズ注入テスト (σ=0.2) クリーン入力とノイズ入力の出力差分 → noise_resistance

障害許容性テスト 中間層の重みを10%マスク後に出力差分 → failure_tolerance

**If `brain` is `None`:**```
構造的プロキシ
  スキップ接続数 → noise_resistance (0.5 + skips * 0.05)
  再帰接続数 → stability      (0.6 + recurrent * 0.04)
  層深度         → failure_tolerance (深すぎる/浅すぎると下がる)

Door value: {"noise_resistance": float, "failure_tolerance": float, "stability": float}


rollback_to_snapshot()evospikenet.rollback

from evospikenet.rollback import rollback_to_snapshot, rollback_version

rollback_to_snapshot(snapshot_id, manager, target_path) -> Path

A convenience function that searches for a snapshot from SnapshotManager by ID and restores it to target_path.

Parameters: - snapshot_id: str — Snapshot ID returned by SnapshotManager.create_snapshot() - manager: SnapshotManager — instance that holds snapshot metadata - target_path: str | Path — Restore destination path

True value: Path of the restored file

Exception: DataLoadError — if ID is unknown or file does not exist

Usage example:```python from evospikenet.rollback import rollback_to_snapshot from evospikenet.snapshot_recovery import SnapshotManager

manager = SnapshotManager(snapshot_dir="/tmp/evo_snapshots")

Get the snapshot list and restore the latest one

snapshots = manager.list_snapshots() if snapshots: latest = sorted(snapshots, key=lambda x: x["timestamp"], reverse=True)[0] restored = rollback_to_snapshot( snapshot_id=latest["id"], manager=manager, target_path="/tmp/restored_state.gz", ) print(f"復元完了: {restored}")

#### `rollback_version(version_entry, target_path) -> Path`

A low-level function that directly specifies and restores a history entry dictionary file.

**Parameters:**
- `version_entry: Dict` — Dictionary containing the `"path"` key
- `target_path: str | Path` — Restore destination path

---

## Data type

### Priority

Enumeration type representing priority.

- `LOW = "low"`
- `NORMAL = "normal"`
- `HIGH = "high"`
- `CRITICAL = "critical"`

### MessageType

An enumerated type representing the message type.

- `NOTIFICATION = "notification"`
- `REQUEST = "request"`
- `RESPONSE = "response"`
- `EVENT = "event"`

### OptimizationType

Enumeration representing optimization types.

- `SPEED = "speed"`
- `MEMORY = "memory"`
- `ACCURACY = "accuracy"`
- `POWER = "power"`

### ArtifactType

Enum type representing artifact type.

- `MODEL = "model"`
- `LOG = "log"`
- `CONFIG = "config"`
- `DATA = "data"`
- `METRICS = "metrics"`

## Usage example

### Basic usage example

```python
<!-- TODO: update or remove - import fail<!-- Remember: Automatic conversion not possible — please fix manually -->eNetAPIClient -->

# Client initialization
client = EvoSpikeNetAPIClient(base_url="http://localhost:8000")

# text generation
result = client.generate("Hello, world!", max_length=100)
print(result['generated_text'])

# multimodal processing
response = client.submit_prompt(
    prompt="この画像を説明してください",
    image_path="image.jpg"
)
result = client.poll_for_result(timeout=60)
print(result['response'])

Artifact Management

# Artifact upload
with open('model.pkl', 'rb') as f:
    file_data = io.BytesIO(f.read())
    result = client.upload_artifact(
        session_id="session_123",
        artifact_type="model",
        name="trained_model",
        file=file_data,
        model_category="classification"
    )

# Artifact list
artifacts = client.list_artifacts(artifact_type="model")

# artifact download
client.download_artifact("artifact_id_123", "downloaded_model.pkl")

Error handling

<!-- Module 'evospikenet' not found. Check moves/renames within the package -->
<!-<!-- Remember: Cannot convert automatically  please fix manually -->t.generate("Test prompt")
except EvoSpikeNetAPIError as e:
    print(f"API Error: {e.error_info.message}")
    if e.error_info.retry_after:
        print(f"Retry after: {e.error_info.retry_after}s")

Usage in Jupyter Notebook

<!-- TODO: update<!-- Module 'evospikenet' not found. Please check moves/renames in the package -->kenet.sdk_jupyter import Jup<!-- Remember: Cannot convert automatically  please fix manually -->Configuration
client.set_display_mode("html")

# Server information display
client.show_server_info()

# Statistics display
client.show_stats()

Real-time communication using WebSockets

import asyncio
<!-- TODO: update or remove - impo<!-- Module 'evospikenet' not found. Please check the move/rename in the package -->WebSocketClient -->

async def main():
client = <!-- Please note: Cannot convert automatically  please fix manually -->e_message(msg):
        print(f"Received: {msg}")

    client.register_handler("notification", handle_message)

    # Send message
    await client.send_message({
        "type": "notification",
        "data": "Hello from SDK!"
    })

    # Start listening
    await client.listen()

asyncio.run(main())

Best practices

  1. Connection Management: Reuse sessions in long-running applications.
  2. Error Handling: Always implement proper exception handling.
  3. Timeout: Please set an appropriate timeout according to your network conditions.
  4. Retry: Use automatic retry for temporary errors.
  5. Resource Management: Use streaming when working with large files.
  6. Authentication: Manage the API key using environment variables.
  7. Monitoring: Check client statistics regularly.

troubleshooting

Common issues

  1. Connection error: Please check if the API server is started.
  2. Authentication error: Please check if the API key is set correctly.
  3. Timeout: Please check network conditions and server load.
  4. Memory error: Use streaming when working with large files.

Debug information

# Check client statistics
stats = client.get_stats()
print(f"Requests: {stats['requests']}")
print(f"Errors: {stats['errors']}")
print(f"Average latency: {stats['average_latency']:.3f}s")

# Check server information
info = client.get_server_info()
if info:
    print(f"Server version: {info.get('version')}")
    print(f"Loaded models: {info.get('models', [])}")

Distributed coordinator method

init_coordinator(node_id: str, zenoh_config: Optional[Dict[str, Any]] = None, raft_config: Optional[Dict[str, Any]] = None) -> None

Initialize the distributed coordinator.

Parameters: - node_id: Unique identifier for this node - zenoh_config: Zenoh DDS configuration (optional) - raft_config: Raft consensus configuration (optional)

example:```python

basic initialization

client.init_coordinator("node_1")

Initialization with custom settings

zenoh_config = {"connect": ["tcp/127.0.0.1:7447"]} raft_config = {"election_timeout": 5000} client.init_coordinator("node_1", zenoh_config, raft_config)

##### start_coordinator() -> None

Start the distributed coordinator.

**Exception:**
- `RuntimeError`: If coordinator is not initialized

**example:**```python
client.init_coordinator("node_1")
client.start_coordinator()
print("Coordinator started")

stop_coordinator() -> None

Stop the distributed coordinator.

Exception: - RuntimeError: If coordinator is not initialized

example:```python client.stop_coordinator() print("Coordinator stopped")

##### submit_coordination_task(task_type: str, payload: Dict[str, Any]) -> str

Submit a distributed coordination task.

**Parameters:**
- `task_type`: Task type
- `payload`: task payload data

**Return value:** Task ID

**Exception:**
- `RuntimeError`: If coordinator is not initialized

**example:**```python
task_id = client.submit_coordination_task(
    "federated_learning",
    {"model": "resnet50", "dataset": "cifar10"}
)
print(f"Task submitted: {task_id}")

Internal task implementation memo (SDK built-in simple version)
  • federated_learning: Average numeric field of payload['updates'] and give aggregated_parameters and aggregation_meta.
  • distributed_inference: payload['inputs'] / payload['batches'] are converted into results as they are, and node_id and status=completed are added to each entry.
  • model_aggregation: Average the weights list of payload['models'] and give aggregated_model['weights'] and aggregation_meta.
get_coordination_task_status(task_id: str) -> Optional[Dict[str, Any]]

Get the status of the coordination task.

Parameters: - task_id: Task ID to check

Return value: Task state information, or None if not found

Exception: - RuntimeError: If coordinator is not initialized

example:```python status = client.get_coordination_task_status(task_id) if status: print(f"Task status: {status['status']}")

##### get_cluster_status() -> Dict[str, Any]

Get the current cluster state.

**Return value:** Cluster state information

**Exception:**
- `RuntimeError`: If coordinator is not initialized

**example:**```python
status = client.get_cluster_status()
print(f"Active nodes: {status['active_nodes']}")
print(f"Leader: {status['leader_id']}")

register_coordination_node(node: Union[NodeInfo, str], node_info: Optional[Dict[str, Any]] = None) -> bool

Register a new node to the coordination cluster.

Parameters: - node: NodeInfo or node ID string - node_info: Required when passing a string to node. A dictionary containing address, port, role, capabilities, etc.

Return value: True if registration was successful

Exception: - RuntimeError: If coordinator is not initialized

example:```python

Pass NodeInfo directly

from datetime import datetime

node_info = NodeInfo( node_id="node_2", a=datetime.now(), capabilities=["gpu", "cpu"], ) success = client.register_coordination_node(node_info)

When passing ID + dict

success = client.register_coordination_node( "node_3", {"address": "192.168.1.101", "port": 9001, "role": "follower", "capabilities": ["cpu"]}, )

##### unregister_coordination_node(node_id: str) -> bool

Remove the node from the coordination cluster.

**Parameters:**
- `node_id`: Node ID to be released

**Return value:** True if release was successful

**Exception:**
- `RuntimeError`: If coordinator is not initialized

**example:**```python
success = client.unregister_coordination_node("node_2")


Structure details of return value

Return value of generate()

{
    "generated_text": "生成されたテキスト",
    "tokens": 150,
    "latency_ms": 250
}

Return value of submit_prompt()

{
    "prompt_id": "prompt_12345",
    "status": "submitted",
    "timestamp": "2026-02-17T10:30:45Z",
    "processing_nodes": {
        "visual": "Vision-1",
        "spatial": "Spatial-12",  # Feature 13: Rank 12 (Where pathway)
        "integration": "Spatial-14"  # Feature 13: Rank 14 (Integration)
    }
}

Return value of poll_for_result()

{
    "prompt_id": "prompt_12345",
    "status": "completed",
    "response": "応答テキスト",
    "spatial_analysis": {
        "rank_12_where": {
            "positions": [[100, 200], [150, 250]],
            "depth": [0.5, 0.7],
            "latency_ms": 47
        },
        "rank_14_integration": {
            "what_where_fusion": "結果",
            "world_model": "..."
        }
    },
    "processing_time_ms": 150
}

Return value of get_simulation_result()

{
    "response": "応答内容",
    "nodes_involved": [
        "Spatial-12", "Spatial-13", "Spatial-14", "Spatial-15"
    ],
    "timestamp": "2026-02-17T10:31:00Z"
}

Return value of batch_generate()

[
    {
        "generated_text": "テキスト1",
        "tokens": 100,
        "latency_ms": 200
    },
    {
        "generated_text": "テキスト2",
        "tokens": 150,
        "latency_ms": 250
    },
    {
        "error": "エラーメッセージ",
        "prompt": "失敗したプロンプト"
    }
]

Compatible with Feature 13 (spatial recognition/generation system)

EvoSpikeNet SDK supports Rank 12-15 spatial processing nodes (Feature 13).

Node Rank Processing target Delay target
SpatialWhereNode 12 Spatial position/depth estimation <50ms
SpatialWhatNode 13 Object recognition/scene generation <30ms
SpatialIntegrationNode 14 What-Where Integration/World Model <50ms
SpatialAttentionControlNode 15 Saccade planning/attention control <30ms

Examples of spatial processing

# Perform spatial processing with multimodal input
response = client.submit_prompt(
    prompt="画像内の物体の位置を説明してください",
    image_path="./sample_image.jpg"
)
result = client.poll_for_result(timeout=60)

# Obtain processing results for Rank 12-15
if result and 'spatial_analysis' in result:
    where_result = result['spatial_analysis'].get('rank_12_where')
    integration_result = result['spatial_analysis'].get('rank_14_integration')
    print(f"Detected positions: {where_result['positions']}")
    print(f"Integrated analysis: {integration_result}")

Space generation service (neural version)

  • Endpoint: POST /generate
  • Input: JSON
    • input_text (str, <=4000 chars, required): Natural language description of the scene you want to generate
    • cognitive_context (object, optional): Cognitive state hints such as attention/stress/engagement etc.
    • eeg_data (object, optional): Apply LIF analysis when passing EEG raw data in signals array
    • model_version (str, optional): The version of the high-precision model stored on the server. If it exists, an attempt is made to reload the model and the high_precision output flag is updated.
  • Validation: type checking and maximum length checking (4000 characters). Invalid input will result in 400 Bad Request.
  • Output: scene specification
    • scene_type, spatial_coordinates, spatial_layout, objects, dimensions, physics, lighting, navigation
    • high_precision (bool): Whether a high precision model is currently loaded
    • model_version (str): currently active model version
    • metadata: confidence, processing_time, quantum_modulation_alpha, cognitive_entropy, encoding_norm, input_text, quality, quality_factor

Health Check

(Additional information) - model_version: Version of the currently loaded spatial generation model. - high_precision: Whether the above model is in high precision mode.

  • Endpoint: GET /health
  • Main fields:
    • status: running|error|degraded
    • device: cpu|cuda
    • neural_components_available: Are all components successfully initialized?
    • Availability boolean for components: language_adapter / encoder_decoder / attention / plasticity / quantum / embodied_physics / lif_layer / izhikevich_layer
    • metrics: request_count, error_rate, avg_latency_ms, uptime_seconds
    • knowledge_base_loaded: Is the scene/object KB already loaded?
    • last_spike_cached: Is the latest spike train cached?

Sample (direct HTTP)

curl -X POST http://localhost:8000/generate \
    -H "Content-Type: application/json" \
    -d '{
        "input_text": "Calm indoor room with a table and soft light",
        "cognitive_context": {"attention_level": 0.6, "stress_level": 0.2}
    }'

curl http://localhost:8000/health | jq

SDK Wrapper

from evospikenet.sdk import EvoSpikeNetAPIClient

# Example of space generation using SDK
client = EvoSpikeNetAPIClient(spatial_base_url="http://localhost:8000")
scene = client.spatial_generate(
    "Calm indoor room with a table and soft warm lighting",
    cognitive_context={"attention_level": 0.6, "stress_level": 0.2},
)

health = client.spatial_health()
print(scene.get("metadata", {}).get("confidence"))

Quantum auxiliary reasoning

  • Endpoint: POST /quantum/infer
  • Input: JSON
    • scene (object): scene object returned by /generate
    • spike_trains (tensor/list): Spike trains used for reinforcement (optional)
  • Output: Extended scene with metadata.quantum_modulation_alpha and cognitive_entropy added to the given scene

RAG system (versioned index)

  • Base URL example: http://localhost:8001
  • Endpoint:
    • POST /upload_file: Verify file → Analyze → Autochunk → Embed → Store in Milvus+Elasticsearch. The response includes doc_key and version.
    • POST /query: Search and generate with {"query": "...", "llm_type": "huggingface"|"evosdk"}. Returns the hit document in context, which may include sdk_response.
  • Supported extensions (main): .pdf, .docx, .doc, .pptx, .ppt, .xlsx, .xls, .txt, .md, .markdown, .gdoc, .html
  • Sample (enter docs/README.md and search):
export RAG_API_URL=${RAG_API_URL:-http://localhost:8001}

curl -X POST "$RAG_API_URL/upload_file" \
    -F "file=@docs/README.md" | jq

curl -X POST "$RAG_API_URL/query" \
    -H "Content-Type: application/json" \
    -d '{"query": "What does this project do?", "llm_type": "huggingface"}' | jq

SDK Wrapper

from evospikenet.sdk import EvoSpikeNetAPIClient

# Example of SDK wrapper for RAG
client = EvoSpikeNetAPIClient(rag_base_url="http://localhost:8001")

# Input file
upload = client.rag_upload_file("docs/README.md")
print(upload.get("doc_key"))

# Query execution (using SDK's rag_query)
answer = client.rag_query("What does this project do?", llm_type="huggingface")
if answer:
    ctx = answer.get("context", [])
    if ctx:
        print(ctx[0].get("text"))

Sample script: - examples/spatial_generation_client.py — Simple client that hits /generate and /health. - examples/rag_ingest_and_query.py — Search in /query after indexing in /upload_file. - examples/rag_markdown_sdk.py — Input and search Markdown files via SDK.


Error handling

The SDK handles errors with the EvoSpikeNetAPIError exception.

from evospikenet.sdk import EvoSpikeNetAPIClient, EvoSpikeNetAPIError

client = EvoSpikeNetAPIClient()
try:
    result = client.generate("Hello, world!")
except EvoSpikeNetAPIError as e:
    print(f"Error type: {e.error_info.error_type}")
    print(f"Details: {e.error_info.details}")
    if e.error_info.retry_after:
        print(f"Retry after {e.error_info.retry_after} seconds")

Implementation Guide

1. Synchronous processing (simple script)

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient()
client.submit_prompt(prompt="テキスト生成クエリ")

# Get results by polling
result = client.poll_for_result(timeout=60, interval=5)

if result:
    print(f"Response: {result.get('response')}")

sential request)

prompts = ["クエリ1", "クエリ2", "クエリ3"]
results = client.batch_generate(prompts, max_length=100)

for i, result in enumerate(results):
    if 'error' not in result:
        print(f"Result {i}: {result.get('generated_text')}")

Benefits: Easily handle multiple requests Applications: Test data generation, baseline experiments, dataset preparation

3. Asynchronous processing (high throughput)

See async_spatial_processing_example.py for details.

import asyncio
from evospikenet.sdk import EvoSpikeNetAPIClient

async def run_async_examples():
    client = EvoSpikeNetAPIClient()
    tasks = [
        client.submit_prompt_async("クエリ1"),
        client.submit_prompt_async("クエリ2"),
        client.submit_prompt_async("クエリ3"),
    ]
    responses = await asyncio.gather(*tasks)
    print(responses)

asyncio.run(run_async_examples())

Advantages: High throughput, resource efficient Applications: Large-scale batch processing, production systems, microservices

Sample code

Basics: Feature 13 Spatial processing

# Synchronization sample (recommended: script/experiment)
python examples/spatial_processing_example.py

# Asynchronous samples (recommended: production/high throughput)
python examples/async_spatial_processing_example.py

Detailed features: - Synchronization example spatial_processing_example.py: - ✅ Text generation and spatial analysis - ✅ Multimodal input (image + text) - ✅ Batch processing - ✅ Health check

  • Async example async_spatial_processing_example.py:
  • ✅ Concurrent tasks
  • ✅ Rate limited sending
  • ✅ Asynchronous health checks
  • ✅ Error handling

Best Practices

1. Timeout management

# Recommended: Set appropriate timeouts
result = client.poll_for_result(
    timeout=120,  # 2 minutes
    interval=5    # Poll every 5 seconds
)

2. Error handling

try:
    result = client.submit_prompt(prompt="クエリ")
except EvoSpikeNetAPIError as e:
    if e.error_info.error_type in ["timeout", "server_error"]:
        # retryable error
        pass
    else:
        # Errors that cannot be retried (input errors, etc.)
        pass

3. Scaling concurrency

# Small scale (< 10 requests): synchronous processing
for prompt in small_list:
    result = client.generate(prompt)

# Large scale (> 100 requests): Asynchronous processing
import asyncio
tasks = [client.generate_async(p) for p in large_list]
results = asyncio.run(asyncio.gather(*tasks))

4. Resource efficiency

# ✅ Good example: context manager
with client.create_session() as session:
    for prompt in prompts:
        result = client.submit_prompt(prompt)

# ❌ Avoid: New connections every time
for prompt in prompts:
    new_client = EvoSpikeNetAPIClient()  # inefficiency

Specifications and limitations

API Limits

Item Limit value
Maximum prompt length 10,000 characters
Maximum growth 5,000 characters
Timeout 300 seconds (5 minutes)
Maximum batch size 100
Number of simultaneous connections 1,000
QPS (queries per second) 1,000

Performance indicators (Feature 13)

Metrics Target value
Text generation < 500ms
Spatial analysis < 200ms (Rank 12-15)
Batch processing 100/sec (100 requests/sec)
Availability 99.9%
P99 Latency 1,000ms
/Users/maoki/Documents/GitHub/EvoSpikeNet/docs/SDK_API_REFERENCE.md

Feature 36: Automatic recovery method

overview

AutoRecoveryEngine autonomously recovers the system through AI-based anomaly detection and automatic playbook execution. Target MTTR -80%.

List of API endpoints

Method Path Description
GET /api/recovery/status Get recovery engine status
GET /api/recovery/incidents Get list of incidents
GET /api/recovery/incidents/{id} Get specific incident
POST /api/recovery/incidents/{id}/acknowledge Change incident to acknowledged
POST /api/recovery/incidents/{id}/resolve Mark incident resolved
POST /api/recovery/trigger Manually trigger abnormality diagnosis

get_recovery_status()

import requests

resp = requests.get(
    "http://localhost:8000/api/recovery/status",
    headers={"X-API-Key": API_KEY}
)
status = resp.json()
# {
#   "total_incidents": 5,
#   "open_incidents": 1,
#   "mttr_seconds": 120.4,
#   "monitoring_interval_seconds": 30,
# }

list_incidents(status, severity, limit)

params = {"status": "open", "severity": "critical", "limit": 20}
resp = requests.get(
    "http://localhost:8000/api/recovery/incidents",
    headers={"X-API-Key": API_KEY},
    params=params,
)
incidents = resp.json()["incidents"]
Parameter Type Description
status str open / acknowledged / resolved / auto_resolved
severity str low / medium / high / critical
limit int Maximum number of acquisitions (default: 50)

trigger_diagnosis(metrics)

payload = {
    "cpu_percent": 95.0,
    "memory_percent": 80.0,
    "db_connected": False,
    "error_rate": 0.15,
}
resp = requests.post(
    "http://localhost:8000/api/recovery/trigger",
    headers={"X-API-Key": API_KEY},
    json=payload,
)
result = resp.json()
# { "status": "incident_created", "incident": { "id": "...", ... } }
# or
# { "status": "no_anomaly_detected" }

Complete sample code

auto_recovery_sdk.py


Feature 39: Audit log method

overview

AuditLogManager provides tamper-detected audit logs using SHA-256 hash chains. All HTTP requests are automatically logged.

List of API endpoints

Method Path Description
GET /api/audit/stats Get statistics
GET /api/audit/logs Log search
POST /api/audit/log Manual entry entry
GET /api/audit/verify Hash chain verification
GET /api/audit/export Log export (JSON/CSV)

get_audit_stats()

resp = requests.get(
    "http://localhost:8000/api/audit/stats",
    headers={"X-API-Key": API_KEY}
)
stats = resp.json()

query_audit_logs(actor, action, resource, result, since, until, limit)

params = {
    "actor": "admin_user",
    "action": "model.train",
    "result": "success",
    "limit": 100,
}
resp = requests.get(
    "http://localhost:8000/api/audit/logs",
    headers={"X-API-Key": API_KEY},
    params=params,
)
entries = resp.json()["entries"]
Parameter Type Description
actor str Filter by operator
action str Action prefix match filter
result str success / failure / error
since / until str period ISO8601
limit int Default: 100

write_audit_entry(action, actor, resource, result, detail)

resp = requests.post(
    "http://localhost:8000/api/audit/log",
    headers={"X-API-Key": API_KEY},
    json={
        "action": "model.deploy",
        "actor": "ci_pipeline",
        "resource": "/api/models/genome_v3",
        "result": "success",
        "detail": {"version": "3.2.1"},
    },
)
# { "status": "logged", "entry_id": "...", "entry_hash": "sha256:..." }

verify_chain()

resp = requests.get(
    "http://localhost:8000/api/audit/verify",
    headers={"X-API-Key": API_KEY}
)
# { "valid": true, "checked": 1500, "error": null }

export_logs(format)

# JSON format
resp = requests.get("http://localhost:8000/api/audit/export",
    headers={"X-API-Key": API_KEY}, params={"format": "json"})
logs_json = resp.json()

# CSV format
resp = requests.get("http://localhost:8000/api/audit/export",
    headers={"X-API-Key": API_KEY}, params={"format": "csv"})
csv_text = resp.text

Complete sample code

audit_log_sdk.py


Feature 40: Geographically distributed node management method

*Note: There are partially implemented modules. API is available but Continuing to refine tests and documentation. *

overview

GeoNodeManager provides multi-region node management, automatic failover, and inter-region latency measurement.

List of API endpoints

Method Path Description
GET /api/geo/status Get overall status
GET/POST /api/geo/regions Region list/registration
GET/DELETE /api/geo/regions/{id} Region details/delete
GET/POST /api/geo/nodes Node list/registration
DELETE /api/geo/nodes/{id} Delete node
POST /api/geo/failover Execute failover
GET /api/geo/failover/history failover history
GET /api/geo/latency-matrix latency matrix
GET/PUT /api/geo/active-region Get/Change active region
GET /api/geo/replication-group/{id} replication group

register_node(node_id, region_id, endpoint, node_type)

resp = requests.post(
    "http://localhost:8000/api/geo/nodes",
    headers={"X-API-Key": API_KEY},
    json={
        "node_id": "prod-gpu-node-01",
        "region_id": "ap-northeast-1",
        "endpoint": "10.0.1.100:8000",
        "node_type": "gpu",
    },
)
# { "status": "registered", "node_id": "prod-gpu-node-01" }

trigger_failover(from_region, to_region, reason, triggered_by)

resp = requests.post(
    "http://localhost:8000/api/geo/failover",
    headers={"X-API-Key": API_KEY},
    json={
        "from_region": "ap-northeast-1",
        "to_region": "us-east-1",
        "reason": "region_unavailable",
        "triggered_by": "monitoring_system",
    },
)
# { "status": "failover_executed", "event": { ... } }

get_latency_matrix()

resp = requests.get(
    "http://localhost:8000/api/geo/latency-matrix",
    headers={"X-API-Key": API_KEY}
)
matrix = resp.json()["matrix"]
# matrix["ap-northeast-1"]["us-east-1"] == 145.3  (ms)

GeoRegion structure

{
  "region_id": "ap-northeast-1",
  "display_name": "Asia Pacific (Tokyo)",
  "provider": "aws",
  "latitude": 35.6762,
  "longitude": 139.6503,
  "priority": 1,
  "status": "online",
  "node_count": 4
}

Performance indicators (Feature 36/39/40)

Metrics Target value
Audit log write throughput ≥ 1,000 entries/sec
Hash chain verification (1,000 results) < 5,000 ms
Anomaly detection latency (p99) < 10 ms
Latency matrix calculation (50 regions) < 500 ms
Incident creation latency < 100 ms
Automatic failover completion time < 30 seconds

Complete sample code

geo_node_manager_sdk.py


Biomimicry integration API (BrainSimulationFramework)

Updated date: 2026-03-06 Phase A/B all 11 items completed. Detailed evaluation: docs-dev/biomimetic_integration_evaluation.md v2.0 (8.7/10)

BrainSimulationFramework

Top-level integration class for the evospikenet.brain_simulation module. biomimetic/ Combine all modules (neuromodulation, STDP, sleep, circuits, cortical topology, DMN, etc.) with the SNN core.

Constructor

BrainSimulationFramework(
    enable_biomimetic: bool = False,
    config: Optional[BrainSimulationConfig] = None,
)

Parameters: - enable_biomimetic: Set to True to initialize all biomimetic/ modules and enable full loop integration. - config: Simulation configuration object (default if omitted).

Method

run_simulation(duration: int = 1000) -> Dict[str, Any]

Run the biomimetic 6-phase pipeline.

Phase: 1. DevelopmentalSchedule.plasticity_multiplier(t) — Developmental schedule 2. NeuralCircuitModeler.simulate_timestep() — Circuit simulation (Izhikevich compatible) 3. STDP × NeuromodulatorGate.gated_learning_rate() — Plasticity modulation 4. NodeEnergyBudget.energy_fitness_term() — Energy homeostasis 5. HippocampalBuffer.store(episode) — Episodic memory record 6. SleepConsolidation.offline_consolidation() — Sleep consolidation replay

Return value: Dictionary containing {phase_results, biomimetic_status, duration_ms}

run_idle_phase(duration_s: float = 10.0) -> Dict[str, Any]

Executes DMN (Default Mode Network) idle cycles asynchronously.

import asyncio
activities = asyncio.run(framework.run_idle_phase(duration_s=10.0))

Returns: Dictionary of DMN activity logs and spike records for each time step

biomimetic_status() -> Dict[str, Any]

Returns a state snapshot of all biomimetic modules.

status = framework.biomimetic_status()
# {
#   "stdp_connected_gate": True,
#   "sleep_consolidation_replay": True,
#   "izhikevich_circuits": 1,
#   "cortical_columns_registered": 0,
#   "neuromodulator_registry_linked": True,
#   "efference_copy_adaptive": True,
#   "mirror_neuron_default_classifier": True,
#   "dmn_idle_phase_available": True,
# }

NeuralCircuitModeler (Izhikevich backend)

Constructor

NeuralCircuitModeler(
    config: NeuralCircuitConfig,
    neuron_type: str = "lif",  # "lif" | "izhikevich"
)

Parameters: - neuron_type: Specifying "izhikevich" uses the IzhikevichNeuron.step() backend. Compatible with various firing patterns such as RS/IB/CH/FS/LTS.

simulate_timestep(input_current, t) -> Tuple[np.ndarray, np.ndarray]

Execute simulation for 1 time step (dt=1ms).

Return value: (spike_array, membrane_voltages)


BrainRegionIntegrator

An integrated management class for cortical topology and brain regions.

add_cortical_topology(generator, nx_cols, ny_cols) -> int

Register the column layouts generated by CorticalTopologyGenerator as BrainRegionConfig and set microcosmic connections between adjacent columns (distance ≤ √2 mm).

from evospikenet.biomimetic import CorticalTopologyGenerator
from evospikenet.brain_simulation import BrainRegionIntegrator

gen = CorticalTopologyGenerator()
integrator = BrainRegionIntegrator()
n = integrator.add_cortical_topology(gen, nx_cols=4, ny_cols=4)
print(n)  # 16

Parameters: - generator: CorticalTopologyGenerator instance - nx_cols: Number of columns in the X direction - ny_cols: Number of columns in Y direction

Return value: Number of registered columns (int)


STDP — Biomimetic extension method

STDP.with_neuromodulation(gate) -> STDP (class method)

Factory method. Create an STDP instance injected with NeuromodulatorGate.

gate = NeuromodulatorGate()
stdp = STDP.with_neuromodulation(gate)

STDP.connect_plasticity_gate(gate) -> None

Retrofit NeuromodulatorGate to an existing STDP instance.

stdp = STDP()
stdp.connect_plasticity_gate(gate)

SleepConsolidation — extension method

offline_consolidation(episodes, stdp) -> Dict[str, Any]

Consolidate episode lists with STDP replay learning.

stats = sleep.offline_consolidation(episodes=buffer.replay(), stdp=stdp)
# stats: {replayed_episodes, weight_updates_mean, replay_duration_ms}

Return value (stats): Dictionary containing replayed_episodes, weight_updates_mean, replay_duration_ms


NeuromodulatorGate — Registry cooperation method

connect_to_registry(registry) -> None

Establish a two-way bridge with the NeuromodulatorRegistry.

push_to_registry() -> None

Writes the current status of Gate (DA/ACh/OT level, etc.) to the Registry.

pull_from_registry() -> None

Reflect the Registry value to Gate state.

gate = NeuromodulatorGate()
registry = NeuromodulatorRegistry()
gate.connect_to_registry(registry)
gate.push_to_registry()
gate.pull_from_registry()

EfferenceCopy — adaptive gain method

adaptive_gain_update(prediction_error: float) -> float

Adaptively update the gain based on the prediction error.

reset() -> None

Resets the gain and internal status to initial values.

efference = EfferenceCopy()
gain = efference.adaptive_gain_update(prediction_error=0.3)
efference.reset()

Complete sample code

sdk_distributed_brain.py (see demonstrate_biomimetic_brain_simulation() function)

Test list

Test file Target
tests/unit/test_biomimetic_init_api.py __init__.py All symbols
tests/unit/test_stdp_neuromodulation.py STDP ↔ Gate wiring
tests/unit/test_sleep_consolidation_stdp.py offline_consolidation() + stats
tests/unit/test_efference_copy_adaptive.py Adaptive gain/reset()
tests/unit/test_mirror_neurons_default_classify.py _default_classify()
tests/integration/test_brain_simulation_biomimetic.py BrainSimulationFramework Full integration
tests/integration/test_dmn_idle_phase.py run_idle_phase() / DMN stop confirmation
# Batch execution of biomimetic tests with Docker
docker compose -f docker-compose.test.yml --profile biomimetic run --rm biomimetic-test

Biomimicry REST API Endpoint

Endpoints under /biomimetic/* manipulate the entire biomimetic module via HTTP. It can be called by the requests library or by EvoSpikeNetAPIClient._make_request().

import requests

BASE_URL = "http://localhost:8000"
API_KEY  = "your-api-key"   # Can be an empty string if not required
HEADERS  = {"X-API-Key": API_KEY} if API_KEY else {}

Endpoint list

Method Path Description
GET /biomimetic/status All module status snapshot
POST /biomimetic/simulate Full brain simulation execution
POST /biomimetic/neuromod/update Neuromodulator level update
POST /biomimetic/reward TD reward signal sending
POST /biomimetic/sleep/consolidate Offline memory consolidation (replay)
POST /biomimetic/sleep/wake-config Sleep/wake cycle settings
POST /biomimetic/framework/reset Framework reinitialization

GET /biomimetic/status

Returns the current state of all biomimetic modules.

resp = requests.get(f"{BASE_URL}/biomimetic/status", headers=HEADERS)
data = resp.json()
# {
#   "biomimetic_enabled": true,
#   "neuromod_levels": {"dopamine": 0.5, "noradrenaline": 0.4, ...},
#   "energy_budget":   {"current_w": 9.2, "budget_w": 10.0},
#   "sleep_stats":     {"cycles_completed": 3, "replayed_events": 128},
# }

POST /biomimetic/simulate

Run the simulation by specifying all the parameters of NeuralCircuitConfig.

payload = {
    # NeuralCircuitConfig
    "num_neurons": 1000,
    "connection_probability": 0.1,
    "excitatory_ratio": 0.8,
    "inhibitory_ratio": 0.2,
    "refractory_period": 5,
    "membrane_time_constant": 20.0,
    # BrainSimulationFramework
    "enable_biomimetic": True,
    "development_epoch": 0,
    "total_epochs": 1000,
    "energy_budget_w": 10.0,
    # simulation control
    "duration": 1000,
    "plasticity_rule": "stdp",     # "stdp" | "bcm" | "oja"
    "plasticity_interval": 10,
    "sleep_every": 500,
    "sleep_cycles": 3,
    "neuron_type": "lif",          # "lif" | "izhikevich"
}
resp = requests.post(f"{BASE_URL}/biomimetic/simulate", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "duration_ms": 3842,
#   "spikes_total": 18423,
#   "biomimetic_status": { ... }
# }
Parameters Type Default Description
num_neurons int 1000 number of circuit neurons
connection_probability float 0.1 connection probability
excitatory_ratio float 0.8 excitatory neuron ratio
inhibitory_ratio float 0.2 inhibitory neuron ratio
refractory_period int 5 Refractory period (ms)
membrane_time_constant float 20.0 Membrane time constant (ms)
enable_biomimetic bool true Enable biomimetic module
development_epoch int 0 current developmental epoch
total_epochs int 1000 total number of epochs
energy_budget_w float 10.0 Node energy limit (W)
duration int 1000 simulation length (ms)
plasticity_rule str "stdp" plasticity rule
plasticity_interval int 10 Plasticity update interval (steps)
sleep_every int 500 Sleep phase start timing (steps)
sleep_cycles int 3 number of sleep cycles
neuron_type str "lif" Neuron model type

POST /biomimetic/neuromod/update

Instantly updates neuromodulator levels such as dopamine.

payload = {
    "dopamine":          0.8,   # Rewards/motivation (0.0–1.0)
    "noradrenaline":     0.4,   # Awakening/attention
    "acetylcholine":     0.6,   # Memory/Learning
    "serotonin":         0.5,   # Mood/impulse control
    "oxytocin":          0.3,   # social bonds
    "emotion_factor":    0.7,   # general emotional intensity
    "motivation_factor": 0.9,   # Motivation to action
}
resp = requests.post(f"{BASE_URL}/biomimetic/neuromod/update", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "message": "Neuromodulator levels updated",
#   "updated_levels": {"dopamine": 0.8, "serotonin": 0.5, ...}
# }
Parameter Type Description
dopamine float Dopamine value (0.0–1.0)
noradrenaline float noradrenaline value
acetylcholine float acetylcholine value
serotonin float serotonin value
oxytocin float oxytocin value
emotion_factor float emotion intensity factor
motivation_factor float motivation factor

POST /biomimetic/reward

TD Sends reward signals to update the dopamine system module.

payload = {
    "reward":          1.0,   # Reward value (-1.0 to 1.0)
    "td_error":        0.35,  # time difference error
    "update_neuromod": True,  # Do you want to instantly update your dopamine system?
}
resp = requests.post(f"{BASE_URL}/biomimetic/reward", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "dopamine_level": 0.73,
# }

POST /biomimetic/sleep/consolidate

STDP is applied by replaying the memory traces accumulated in the hippocampal buffer offline.

payload = {
    "sleep_cycles":      3,     # Number of sleep cycles to perform
    "replay_buffer_size": 256,  # Replay buffer size (number of spikes)
    "stdp_lr":           0.01,  # STDP learning rate (during consolidation)
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/consolidate", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "cycles_completed": 3,
#   "replayed_events":  128,
# }

POST /biomimetic/sleep/wake-config

Update the schedule parameters of SleepWakeCycleController.

payload = {
    "awake_phase_steps":      500,  # Number of steps in awakening phase
    "sleep_phase_steps":      100,  # Number of sleep phase steps
    "sleep_cycles_per_epoch": 3,    # Number of cycles per epoch
}
resp = requests.post(f"{BASE_URL}/biomimetic/sleep/wake-config", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status": "ok",
#   "config_applied": {"awake_phase_steps": 500, ...}
# }

POST /biomimetic/framework/reset

Reinitializes BrainSimulationFramework and resets the state of all modules.

payload = {
    "enable_biomimetic": True,  # Should biomimetics be enabled after reset?
}
resp = requests.post(f"{BASE_URL}/biomimetic/framework/reset", headers=HEADERS, json=payload)
data = resp.json()
# {
#   "status":  "ok",
#   "message": "BrainSimulationFramework reinitialized",
# }

Calling via EvoSpikeNetAPIClient

Biomimetic endpoints that do not have a dedicated method can be called with _make_request(). SDK session features (automatic retries, header management) are available.

from evospikenet.sdk import EvoSpikeNetAPIClient

client = EvoSpikeNetAPIClient(base_url="http://localhost:8000", api_key=API_KEY)

# GET /biomimetic/status
status = client._make_request("GET", f"{client.base_url}/biomimetic/status")

# POST /biomimetic/neuromod/update
result = client._make_request(
    "POST",
    f"{client.base_url}/biomimetic/neuromod/update",
    json={"dopamine": 0.8, "serotonin": 0.5},
)

Complete sample code

sdk_biomimetic_rest_api.py

# Execute all endpoints in pipeline
python examples/sdk/sdk_biomimetic_rest_api.py --base-url http://localhost:8000

# Individual demo execution
python examples/sdk/sdk_biomimetic_rest_api.py --demo simulate
python examples/sdk/sdk_biomimetic_rest_api.py --demo neuromod
python examples/sdk/sdk_biomimetic_rest_api.py --demo sleep

Phase E-3 Connectome production/automatic synchronization API

Last updated: 2026-03-19 (Phase E-3 fully completed)

sync_connectome — automatic synchronization pipeline (E-3-1)

apply_delta(base_path, delta_path, result_path)

Apply delta JSON to existing NPZ and atomically generate new NPZ.

from scripts.sync_connectome import apply_delta

apply_delta(
    base_path="data/connectome/cache/visual.npz",
    delta_path="data/connectome/delta_v42.json",
    result_path="data/connectome/cache/visual_v43.npz",
)

Differential JSON Schema:```json { "schema_version": "1.0", "materialization_version_from": 42, "materialization_version_to": 43, "added": [ {"pre_id": 100, "post_id": 200, "weight": 0.15, "delay_ms": 1.5} ], "removed": [ {"pre_id": 50, "post_id": 80} ] }

#### `apply_delta_with_validation(base_path, delta_path, result_path, *, rollback_dir, ei_ratio_range)`

Apply difference with E/I ratio verification. If out of range, generate a backup in `rollback_dir` and raise `ConnectomeSyncValidationError`.

```python
from scripts.sync_connectome import apply_delta_with_validation, ConnectomeSyncValidationError

try:
    apply_delta_with_validation(
        base_path="visual.npz",
        delta_path="delta.json",
        result_path="visual_new.npz",
        rollback_dir="data/connectome/rollback/",
        ei_ratio_range=(3.5, 5.0),
    )
except ConnectomeSyncValidationError as e:
    print(f"E/I検証失敗・ロールバック完了: {e}")

sync_connectome(config_path, cache_path, output_path, *, dry_run, ...)

Fully automatic synchronization orchestrator. Recommended starting from CAVE API delta acquisition to application.

from scripts.sync_connectome import sync_connectome

result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/cache/visual.npz",
    output_path="data/connectome/cache/visual.npz",
    dry_run=False,
)
print(result["status"])  # "success" | "dry_run" | "no_update"

CLI Usage Examples```bash

Normal execution

python scripts/sync_connectome.py \ --config config/connectome_config.yaml \ --cache data/connectome/cache/visual.npz \ --output data/connectome/cache/visual.npz

Dry run (display results without updating the output file)

python scripts/sync_connectome.py --dry-run \ --cache data/connectome/cache/visual.npz \ --output /dev/null

---

### brain_routing — HCP Latency Aware Zenoh Routing (E-3-3)

#### `compute_delay_matrix(manifest, config_path)`

The inter-node delay matrix is ​​constructed from the HCP actual measurements.

```python
from evospikenet.brain_routing import compute_delay_matrix
from evospikenet.connectome import build_manifest

manifest = build_manifest("data/connectome/")
delay_matrix = compute_delay_matrix(
    manifest=manifest,
    config_path="config/connectome_config.yaml",
)
# {"pfc": {"memory_spike": 12.0, "visual": 9.0, ...}, ...}
print(delay_matrix["pfc"]["memory_spike"])  # 12.0 ms

build_hcp_routing_table(config_path)

Generate a routing table with a full pipeline.

from evospikenet.brain_routing import build_hcp_routing_table

routing = build_hcp_routing_table("config/connectome_config.yaml")
# routing["delay_matrix"]  — {src: {dst: ms}}
# routing["routing_plan"]["routing_edges"] — priority descending edge list
# routing["zenoh_topics"] — "brain_routing/delays/{node_id}" list

HCPDelayRouter

Class that publishes a delay profile to a Zenoh session. Also works with session=None (log only).

from evospikenet.brain_routing import HCPDelayRouter

# Tests that work without a Zenoh session
# If session=None, the routing function is only logs
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()

# Assign delay profile to data payload
data = {"spikes": [1, 0, 1], "timestamp": 1.0}
enriched = router.apply_hcp_delays(node_id="pfc", data=data)
# {"spikes": [...], "timestamp": 1.0, "routing_delays": {"memory_spike": 12.0, ...}}

# Issuing delay information to all nodes at once
# Zenoh topic: brain_routing/delays/{node_id}
router.publish_all()

auto_node_mapper — Auto Node Mapper CLI

map_connectome(input_path, output_dir, config_path, *, dry_run, seed)

Divide the connectome into node-based NPZs.

from scripts.auto_node_mapper import map_connectome

result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    dry_run=False,
    seed=42,
)
print(f"マッピング完了: {result.total_nodes} ノード中 {result.mapped_nodes} 成功")
print(f"マニフェスト: {result.manifest_path}")
for entry in result.entries:
    print(f"  {entry.node_type}: {entry.n_neurons} neurons, E/I={entry.ei_ratio:.2f}, {entry.status}")

Output structure:``` data/connectome/nodes/ ├── visual.npz ← 視覚野ノード用圧縮 NPZ ├── memory_spike.npz ← 記憶ノード用 └── node_manifest.yaml ← 全ノードメタデータ

**node_manifest.yaml schema:**```yaml
schema_version: "1.0"
generated_at: "2026-03-19T10:00:00Z"
base_dataset: "flywire_visual"
nodes:
  visual:
    npz_path: data/connectome/nodes/visual.npz
    n_neurons: 1024
    ei_ratio: 4.1
    coarsening_method: stratified_sample
    status: ok
  memory_spike:
    npz_path: data/connectome/nodes/memory_spike.npz
    n_neurons: 512
    ei_ratio: 4.0
    coarsening_method: spectral_coarsen
    status: ok

CLI Usage Examples```bash

full mapping

python scripts/auto_node_mapper.py \ --input data/connectome/flywire_visual.json \ --output-dir data/connectome/nodes/ \ --config config/connectome_config.yaml

Dry run/fixed constant for reproducibility

python scripts/auto_node_mapper.py \ --input data/connectome/flywire_visual.json \ --output-dir data/connectome/nodes/ \ --dry-run --seed 42

---

### Complete Connectome E-3 Step-by-Step Sample

Typical workflow from full initialization to regular sync:

```python
import evospikenet as esn
from scripts.auto_node_mapper import map_connectome
from scripts.sync_connectome import sync_connectome
from evospikenet.brain_routing import HCPDelayRouter
from evospikenet import load_connectome_npz, apply_connectome_to_layer, ConnectomeLIFLayer

# --- Step 1: Connectome → Divide into NPZ by node ---
result = map_connectome(
    input_path="data/connectome/flywire_visual.json",
    output_dir="data/connectome/nodes/",
    config_path="config/connectome_config.yaml",
    seed=42,
)
print(f"E-3 Auto Node Mapper: {result.mapped_nodes} nodes mapped")

# --- Step 2: Inject per-node NPZ into ConnectomeLIFLayer ---
visual_data = load_connectome_npz("data/connectome/nodes/visual.npz")
layer = ConnectomeLIFLayer(num_neurons=visual_data["n_neurons"], device="cpu")
apply_connectome_to_layer(visual_data, layer)
print(f"structural_mask: {layer.structural_mask.shape}, ei_ratio={layer.ei_ratio:.2f}")

# --- Step 3: Construct HCP delay routing ---
router = HCPDelayRouter(session=None, config_path="config/connectome_config.yaml")
router.load_routing_table()
router.publish_all()  # Without Zenoh, only logs

# --- Step 4: Automatic synchronization (obtain differences from CAVE API) ---
sync_result = sync_connectome(
    config_path="config/connectome_config.yaml",
    cache_path="data/connectome/nodes/visual.npz",
    output_path="data/connectome/nodes/visual.npz",
    dry_run=True,  # First check with a dry run
)
print(f"sync status: {sync_result['status']}")

Complete walkthrough sample: connectome_e3_demo.py ````