# Advanced Node Detection System
Copyright: 2026 Moonlight Technologies Inc. All Rights Reserved.
Author: Masahiro Aoki
Implementation notes (artifacts): See `docs/implementation/ARTIFACT_MANIFESTS.md` for the `artifact_manifest.json` output by the training script and recommended CLI flags.
## Overview

EvoSpikeNet's advanced node detection system dynamically detects and manages active nodes in distributed brain simulations. It uses Zenoh's Pub/Sub mechanism to track node state in real time, which enables adaptive routing through the PFC decision engine. Each node sends a heartbeat every 2 seconds, and a node that has not been heard from for 5 seconds is marked inactive. New nodes are registered automatically through discovery announcements, so the set of nodes can grow as the system scales out.
## Purpose and use of this document
- Purpose: Organize the functions, flows, and dependencies of the node detection system and share implementation/operation guidelines.
- Target audience: Distributed brain node implementers, PFC/Executive implementers, UI/operation personnel.
- Read first: Overview → Key Features → Architecture → Implementation Details/Fallbacks.
- Related links: `Examples/run_zenoh_distributed_brain.py` (distributed brain execution script) and `implementation/PFC_ZENOH_EXECUTIVE.md` (PFC/Zenoh/Executive details).
## Main features
### 1. Dynamic node discovery and health management
- Automatic detection of active nodes through a heartbeat mechanism
- Heartbeats include resource usage such as CPU/MEM/DISK; a `health_score` is calculated from them and reflected in load balancing and the UI
- Automatic registration of new nodes via discovery announcements
- Node state management (active/inactive/error) and region tracking (`region_id`)
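This document does not say how `health_score` is derived from the resource figures. Purely as an illustration, the sketch below assumes the heartbeat metadata carries utilization percentages under hypothetical keys `cpu`, `mem` and `disk`, and scores a node as one minus its worst utilization, so a single saturated resource dominates the score.

```python
from typing import Mapping

# Hypothetical metadata keys; the real heartbeat schema may differ.
RESOURCE_KEYS = ("cpu", "mem", "disk")


def health_score(metadata: Mapping[str, float]) -> float:
    """Return a score in [0.0, 1.0]; 1.0 means idle, 0.0 means saturated.

    Assumption: each resource is a utilization percentage (0-100).
    Missing keys are treated as 0% utilization.
    """
    worst = 0.0
    for key in RESOURCE_KEYS:
        usage = float(metadata.get(key, 0.0))
        usage = min(max(usage, 0.0), 100.0)  # clamp out-of-range readings
        worst = max(worst, usage)
    return round(1.0 - worst / 100.0, 3)


if __name__ == "__main__":
    print(health_score({"cpu": 35.0, "mem": 60.0, "disk": 20.0}))
```

Taking the maximum rather than the mean is a deliberate choice in this sketch: a node with idle CPU but a full disk should not look healthy to a load balancer.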
### 2. Filtering by module type and geo-first selection
- Obtain the list of nodes for a module type such as PFC, lang-main, visual, audio, or motor
- Canonicalization of compound module names (`lang-embed-18` → `lang-main`)
- Extraction of active/healthy nodes only
- Region-priority parameters can be passed to the load balancer to narrow down the candidate nodes
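The canonicalization and geo-first filtering above could look roughly like the following. Only the `lang-embed-18` → `lang-main` mapping comes from this document; the general rules (every `lang-*` variant collapses to `lang-main`, other names drop a trailing numeric index), the `Node` stand-in for `NodeInfo`, and the `select_nodes` helper with its 0.5 health threshold are hypothetical.

```python
import re
from dataclasses import dataclass
from typing import Iterable, List, Sequence

_INDEX_SUFFIX = re.compile(r"-\d+$")


def canonical_module_type(name: str) -> str:
    """Map a compound module name to its base type (hypothetical rules).

    Assumption: every "lang-*" variant collapses to "lang-main", and other
    names only lose a trailing numeric index ("visual-2" -> "visual").
    """
    base = _INDEX_SUFFIX.sub("", name.lower())
    if base == "lang" or base.startswith("lang-"):
        return "lang-main"
    return base


@dataclass
class Node:  # simplified stand-in for NodeInfo
    node_id: str
    module_type: str
    status: str
    health_score: float
    region_id: str


def select_nodes(nodes: Iterable[Node], module_type: str,
                 region_priority: Sequence[str],
                 min_health: float = 0.5) -> List[Node]:
    """Active, healthy nodes of one base type, preferred regions first."""
    target = canonical_module_type(module_type)
    rank = {region: i for i, region in enumerate(region_priority)}
    candidates = [
        n for n in nodes
        if canonical_module_type(n.module_type) == target
        and n.status == "active"
        and n.health_score >= min_health
    ]
    # Regions missing from the priority list sort last; ties go to the healthier node.
    return sorted(candidates,
                  key=lambda n: (rank.get(n.region_id, len(rank)), -n.health_score))
```

For example, `select_nodes(nodes, "visual", ["jp", "us"])` returns healthy active visual nodes in region `jp` before those in `us`, and drops inactive or low-health nodes entirely.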
### 3. Network topology management
- Generation of visualization data for the connections between nodes
- Formatted data export for the UI
- Real-time node statistics (including health scores and region information)
### 4. Fallback mechanism integration
- Check whether the target module has an active node
- Automatic fallback to lang-main when it has none
- Prevents UI hangs by guaranteeing that every query completes
### 5. REST API endpoints
The following endpoints let external systems and the UI obtain health information and topology directly:
- `GET /api/node-discovery/health`: returns the node list and summary (equivalent to `export_for_ui()`).
- `GET /api/node-discovery/topology`: returns the current topology (nodes/edges).
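A minimal standard-library client for these endpoints might look like this. The base URL `http://localhost:8000` and the helper names are assumptions; the response shape follows the `export_for_ui()` example in this document. `opener` defaults to `urllib.request.urlopen` but can be replaced, which lets the helpers run without a live server.

```python
import json
import urllib.request
from typing import Any, Callable, Dict, List

# Assumption: service root of the node-discovery API; adjust to your deployment.
BASE_URL = "http://localhost:8000"


def get_json(path: str, base_url: str = BASE_URL,
             opener: Callable[..., Any] = urllib.request.urlopen,
             timeout: float = 5.0) -> Dict[str, Any]:
    """GET a JSON document from the node-discovery REST API."""
    with opener(base_url + path, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def active_node_ids(base_url: str = BASE_URL, **kwargs: Any) -> List[str]:
    """IDs of active nodes, read from the export_for_ui()-style health payload."""
    health = get_json("/api/node-discovery/health", base_url, **kwargs)
    return [n["node_id"] for n in health.get("nodes", [])
            if n.get("status") == "active"]


def fetch_topology(base_url: str = BASE_URL, **kwargs: Any) -> Dict[str, Any]:
    """Current topology as returned by the service ({"nodes": ..., "edges": ...})."""
    return get_json("/api/node-discovery/topology", base_url, **kwargs)
```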
## Architecture

```mermaid
flowchart TD
    subgraph NDS ["Node Discovery Service"]
        ND["ZenohNodeDiscovery: heartbeat monitoring (2 s interval), discovery announcement processing, node status management (5 s timeout)"]
    end
    subgraph ZC ["Zenoh Communication"]
        HB["heartbeat/node_id: sent every 2 seconds"]
        DA["discovery/announce: sent at startup"]
    end
    subgraph DBN ["Distributed Brain Nodes"]
        PFC["PFC: Rank 0"]
        LM["Lang-Main: Rank 1"]
        V["Visual: Rank 2"]
        A["Audio: Rank 3"]
    end
    ND --> ZC
    ZC --> DBN
    PFC --> CHK
    LM --> CHK
    V --> CHK
    A --> CHK
    CHK["_has_active_nodes_for_module: check for active nodes via NodeDiscovery"]
```
## Implementation details

### NodeDiscovery class

(Updated 2026-02-22)
- Added `health_score` and `region_id` to `NodeInfo`.
- Individual health values can be obtained with `get_node_health(node_id)`.
- Load information is updated automatically via `LoadBalancer.refresh_from_discovery()`.
#### Initialization

```python
from evospikenet.node_discovery import init_node_discovery, get_node_discovery

# Initialize the global instance
discovery = init_node_discovery(namespace="evospikenet")

# Get the existing instance
discovery = get_node_discovery()
```
#### Get node information

```python
# Get all nodes
all_nodes = discovery.get_all_nodes()

# Get only active nodes
active_nodes = discovery.get_active_nodes()

# Get nodes by module type
visual_nodes = discovery.get_nodes_by_type("visual")
lang_nodes = discovery.get_nodes_by_type("lang-main")

# Node count statistics
counts = discovery.get_node_count()
# Returns: {"active": 4, "inactive": 0, "total": 4}
```
#### Data export for UI

```python
# Format for UI display
ui_data = discovery.export_for_ui()
# Returns:
# {
#   "nodes": [
#     {
#       "node_id": "pfc-0",
#       "type": "pfc",
#       "host": "brain-node-0",
#       "status": "active",
#       "status_icon": "🟢",
#       "last_seen": "14:23:15",
#       "uptime": "2.3s ago",
#       "metadata": {...}
#     },
#     ...
#   ],
#   "summary": {"active": 4, "inactive": 0, "total": 4},
#   "updated_at": "14:23:17"
# }

# Network topology acquisition
topology = discovery.get_topology()
# Returns:
# {
#   "nodes": [...],
#   "edges": [...],
#   "timestamp": 1701234567.89
# }
```
### Brain Node integration

#### Heartbeat transmission
Each Brain Node sends heartbeats periodically:
```python
import os
import time

# Method of the Brain Node class
def _send_heartbeat(self):
    """Send a heartbeat to the node discovery service (rate-limited)."""
    current_time = time.time()
    # Safe to call on every step: return early until the interval has elapsed
    if current_time - self._last_heartbeat < self._heartbeat_interval:
        return
    self._last_heartbeat = current_time

    heartbeat_data = {
        "node_id": self.node_id,
        "module_type": self.module_type,
        "host": os.environ.get("HOSTNAME", "unknown"),
        "timestamp": current_time,
        "metadata": {
            "step_count": self.step_count,
            "active_task": self.active_task,
        },
    }
    self.comm.publish(f"heartbeat/{self.node_id}", heartbeat_data, serialize="json")
```
#### Active node check

Before routing, the PFC node checks whether the target module has an active node:
```python
def _has_active_nodes_for_module(self, module_type: str) -> bool:
    """
    Check if there are active nodes for the given module type.
    Uses the NodeDiscovery service to detect active nodes dynamically.
    """
    # lang-main is always treated as available (it is the fallback target)
    if module_type == "lang-main":
        return True

    # Use the NodeDiscovery service
    if self.node_discovery is not None:
        try:
            # Normalize compound module names (e.g. lang-embed-18 -> lang-main)
            base_type = self._get_base_module_type(module_type)
            # Keep only nodes currently marked active
            active_nodes = self.node_discovery.get_nodes_by_type(base_type)
            active_nodes = [n for n in active_nodes if n.status == "active"]
            has_nodes = len(active_nodes) > 0
            if has_nodes:
                self.logger.debug(f"Found {len(active_nodes)} active {base_type} nodes")
            else:
                self.logger.debug(f"No active {base_type} nodes found")
            return has_nodes
        except Exception as e:
            self.logger.warning(f"Error querying node discovery: {e}")

    # Fallback: without discovery data, only lang-main is guaranteed
    return False
```
## Zenoh topics

### Discovery topics

| Topic | Direction | Content | Purpose |
|---|---|---|---|
| `discovery/announce` | Node → Discovery | Node information (JSON) | Registering a new node |
| `heartbeat/<node_id>` | Node → Discovery | Heartbeat (JSON) | Node liveness confirmation |
### Heartbeat message format

```json
{
  "node_id": "pfc-0",
  "module_type": "pfc",
  "host": "brain-node-0",
  "timestamp": 1701234567.89,
  "metadata": {
    "step_count": 42,
    "active_task": true
  }
}
```
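To make the liveness rules concrete, here is a minimal in-memory tracker that consumes heartbeats in the format above. It is a sketch under stated assumptions, not `ZenohNodeDiscovery` itself: `HeartbeatRegistry` and `NodeRecord` are hypothetical names, the Zenoh subscription that would feed `on_heartbeat` is omitted, and liveness is judged from the receive time rather than the message's `timestamp` field, so clock skew between hosts cannot mark a node inactive.

```python
import json
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class NodeRecord:  # simplified stand-in for NodeInfo
    node_id: str
    module_type: str
    host: str
    last_seen: float
    status: str = "active"
    metadata: dict = field(default_factory=dict)


class HeartbeatRegistry:
    """Minimal liveness tracker (hypothetical sketch)."""

    def __init__(self, timeout: float = 5.0):
        self.timeout = timeout
        self.nodes: Dict[str, NodeRecord] = {}

    def on_heartbeat(self, payload: bytes, now: Optional[float] = None) -> NodeRecord:
        """Register or refresh a node from a heartbeat JSON payload."""
        msg = json.loads(payload)
        now = time.time() if now is None else now
        record = self.nodes.get(msg["node_id"])
        if record is None:  # first contact doubles as registration
            record = NodeRecord(msg["node_id"], msg["module_type"],
                                msg.get("host", "unknown"), now)
            self.nodes[record.node_id] = record
        record.last_seen = now  # receive time, not the sender's clock
        record.status = "active"
        record.metadata = msg.get("metadata", {})
        return record

    def sweep(self, now: Optional[float] = None) -> None:
        """O(n) pass that marks nodes inactive once they exceed the timeout."""
        now = time.time() if now is None else now
        for record in self.nodes.values():
            if record.status == "active" and now - record.last_seen > self.timeout:
                record.status = "inactive"

    def get_node_count(self) -> Dict[str, int]:
        """Counts in the same shape as get_node_count() in this document."""
        active = sum(1 for r in self.nodes.values() if r.status == "active")
        return {"active": active, "inactive": len(self.nodes) - active,
                "total": len(self.nodes)}
```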
### Discovery announcement format

```json
{
  "node_id": "visual-2",
  "module_type": "visual",
  "host": "brain-node-2",
  "metadata": {
    "config": "Visual processing node"
  }
}
```
## Configuration parameters

### NodeDiscovery settings

```python
ZenohNodeDiscovery(
    namespace="evospikenet",  # Zenoh namespace
    timeout=5.0,              # node inactivity timeout (seconds)
)
```
### Brain Node settings

```python
# Heartbeat transmission interval
self._heartbeat_interval = 2.0  # every 2 seconds

# NodeDiscovery integration
self.node_discovery = get_node_discovery()
```
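The two defaults interact. Assuming a node is marked inactive when `now - last_seen > timeout`, and that the timeout check runs once per second (as stated under Scalability), the helpers below compute how many consecutive heartbeats can be lost before a node is falsely reported inactive, and how long a crashed node can still appear active. The function names are illustrative. After k consecutive losses the gap between received heartbeats is (k + 1) × interval, which must stay at or below the timeout.

```python
import math


def missed_heartbeats_tolerated(interval: float, timeout: float) -> int:
    """Consecutive lost heartbeats a node survives before being marked inactive.

    After k consecutive losses the gap between received heartbeats is
    (k + 1) * interval; the node stays active while that gap <= timeout.
    """
    if interval <= 0 or timeout <= 0:
        raise ValueError("interval and timeout must be positive")
    return max(math.floor(timeout / interval) - 1, 0)


def worst_case_detection_delay(timeout: float, sweep_period: float) -> float:
    """Upper bound between a node's last heartbeat and it being marked inactive."""
    return timeout + sweep_period


print(missed_heartbeats_tolerated(2.0, 5.0))  # 1
print(worst_case_detection_delay(5.0, 1.0))   # 6.0
```

With the defaults, a single lost heartbeat is absorbed but two in a row flip the node to inactive. Because `_send_heartbeat()` only fires when the node's loop calls it, a slow step stretches the real interval beyond 2 seconds and eats into that margin; raising `timeout` to at least three intervals is one way to add headroom.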
## Usage examples

### Basic usage

```python
from evospikenet.node_discovery import init_node_discovery

# Initialize the node discovery service
discovery = init_node_discovery()

# List active nodes
active_nodes = discovery.get_active_nodes()
for node in active_nodes:
    print(f"{node.node_id} ({node.module_type}) - {node.status}")

# Check a specific module (keep only active nodes, as the PFC check does)
visual_nodes = [n for n in discovery.get_nodes_by_type("visual") if n.status == "active"]
if visual_nodes:
    print(f"Found {len(visual_nodes)} visual processing nodes")
else:
    print("No visual nodes available, falling back to lang-main")
```
### Integration with PFC routing

```python
# Routing within the PFC Decision Engine
if self._has_active_nodes_for_module(target_module):
    # Route to the target module
    topic = self._get_topic_for_module(target_module)
    self.comm.publish(topic, data)
else:
    # Fall back to lang-main
    self.logger.warning(f"No active {target_module} nodes, falling back to lang-main")
    self.comm.publish("pfc/text_prompt", data)
```
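The routing branch can be isolated into a pure function, which makes the fallback easy to unit-test. This is a sketch: `resolve_topic` and the `MODULE_TOPICS` table are hypothetical, and apart from `pfc/text_prompt` (the lang-main fallback used above) the topic names are placeholders rather than EvoSpikeNet's actual topics.

```python
from typing import Callable, Dict

# Hypothetical module-to-topic table; only "pfc/text_prompt" appears in this
# document, the other topic names are placeholders.
MODULE_TOPICS: Dict[str, str] = {
    "lang-main": "pfc/text_prompt",
    "visual": "pfc/visual_input",
    "audio": "pfc/audio_input",
}
FALLBACK_TOPIC = MODULE_TOPICS["lang-main"]


def resolve_topic(target_module: str,
                  has_active_nodes: Callable[[str], bool]) -> str:
    """Return the topic to publish to, falling back to lang-main.

    Falls back when the module has no active node or no known topic, so a
    query always has a destination.
    """
    topic = MODULE_TOPICS.get(target_module)
    if topic is None or not has_active_nodes(target_module):
        return FALLBACK_TOPIC
    return topic
```

In the PFC, `has_active_nodes` would be `self._has_active_nodes_for_module`; in tests it can be any lambda.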
### UI integration

```python
# Node status display on the front end
ui_data = discovery.export_for_ui()

# Render with React/Vue etc. (render_node_card is a placeholder for a front-end component)
for node in ui_data["nodes"]:
    render_node_card(
        icon=node["status_icon"],
        name=node["node_id"],
        type=node["type"],
        status=node["status"],
        last_seen=node["last_seen"],
    )
```
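Outside a browser, the same payload can be shown as a plain-text table, for example in an operator CLI. This is a hypothetical helper that relies only on the `export_for_ui()` fields shown earlier in this document.

```python
from typing import Any, Dict


def format_node_table(ui_data: Dict[str, Any]) -> str:
    """Render export_for_ui()-shaped data as a plain-text table."""
    lines = [f"{'':2} {'node_id':<12} {'type':<10} {'status':<9} last_seen"]
    for node in ui_data.get("nodes", []):
        lines.append(
            f"{node['status_icon']:2} {node['node_id']:<12} {node['type']:<10} "
            f"{node['status']:<9} {node['last_seen']}"
        )
    summary = ui_data.get("summary", {})
    lines.append(
        f"active={summary.get('active', 0)} "
        f"inactive={summary.get('inactive', 0)} "
        f"total={summary.get('total', 0)}"
    )
    return "\n".join(lines)
```

Column widths are counted in characters, so emoji icons may not align perfectly in every terminal.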
## Performance characteristics
### Latency
- **Heartbeat transmission**: ~1ms (asynchronous publish)
- **Node discovery**: up to 2 seconds (heartbeat_interval)
- **Status Check**: ~0.1ms (in-memory data structure)
### Resource usage
- **Memory**: ~1KB per node (NodeInfo)
- **Network**: ~200 bytes per heartbeat (every 2 seconds)
- **CPU**: < 0.1% (monitoring thread)
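The per-node figures above scale linearly with the number of nodes. The helper below (a hypothetical name) turns them into aggregate load at the discovery service, using the quoted 200 bytes per heartbeat, the 2-second interval and about 1 KB of `NodeInfo` per node; Zenoh framing and transport overhead are not included.

```python
def heartbeat_load(num_nodes: int, msg_bytes: int = 200,
                   interval_s: float = 2.0, kb_per_node: float = 1.0) -> dict:
    """Aggregate heartbeat payload traffic and registry memory (estimates only)."""
    bytes_per_s = num_nodes * msg_bytes / interval_s
    return {
        "bytes_per_s": bytes_per_s,
        "kbit_per_s": bytes_per_s * 8 / 1000,
        "msgs_per_s": num_nodes / interval_s,
        "registry_kb": num_nodes * kb_per_node,
    }


print(heartbeat_load(1000))
```

For 1,000 nodes this gives 100,000 bytes/s (800 kbit/s) of heartbeat payload, 500 messages/s, and roughly 1,000 KB of node state.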
### Scalability
- **Number of supported nodes**: Thousands of nodes
- **Discovery time**: O(1) - Search in memory
- **Timeout processing**: O(n) - Check every second
## Troubleshooting

### Node not detected

1. Confirm the Zenoh session: check the log for "Node discovery initialized".
2. Confirm heartbeat transmission: check the log for the `heartbeat/<node_id>` publish.
3. Check the timeout setting: the default is 5 seconds; increase it if it is too short.
### Fallbacks occur frequently

```python
import time

from evospikenet.node_discovery import get_node_discovery

# Check node status
discovery = get_node_discovery()
counts = discovery.get_node_count()
print(f"Active: {counts['active']}, Inactive: {counts['inactive']}")

# Check the nodes of a specific module
nodes = discovery.get_nodes_by_type("visual")
for node in nodes:
    print(f"{node.node_id}: {node.status} (last seen: {time.time() - node.last_seen:.1f}s ago)")
```
### Memory leak concerns

- Automatic cleanup of old nodes is planned but not yet implemented.
- Currently, inactive nodes are also kept in memory.
- Recommended: reinitialize discovery periodically or restart the service.
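Until automatic cleanup exists, a periodic prune is one possible workaround. The sketch is hypothetical: `prune_stale_nodes` and its 300-second default are assumptions, and it works on any dict of node objects exposing `status` and `last_seen` (epoch seconds), the `NodeInfo` fields used in the troubleshooting snippet. If the discovery service updates its node table from a monitoring thread, call this while holding that table's lock.

```python
import time
from typing import Dict, List, Optional, Protocol


class HasLastSeen(Protocol):
    status: str
    last_seen: float


def prune_stale_nodes(nodes: Dict[str, HasLastSeen],
                      max_inactive_age: float = 300.0,
                      now: Optional[float] = None) -> List[str]:
    """Delete non-active nodes unseen for max_inactive_age seconds; return their IDs."""
    now = time.time() if now is None else now
    stale = [
        node_id for node_id, node in nodes.items()
        if node.status != "active" and now - node.last_seen > max_inactive_age
    ]
    for node_id in stale:  # collect first, then delete: no mutation while iterating
        del nodes[node_id]
    return stale
```

Active nodes are never pruned regardless of age, so a sweep that lags behind cannot remove a live node.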
## Future expansion

### Implementation status

#### ✅ Implemented features (v0.1.0)
| Features | Status | Details |
|---|---|---|
| ZenohNodeDiscovery | ✅ Fully implemented | Heartbeat monitoring, node status management, timeout processing |
| Dynamic node discovery | ✅ Fully implemented | Heartbeat every 2 seconds, timeout 5 seconds |
| Filtering by module type | ✅ Fully implemented | get_nodes_by_type(), compound name normalization |
| Network topology management | ✅ Fully implemented | get_topology(), data export for UI |
| Fallback mechanism | ✅ Fully implemented | _has_active_nodes_for_module() integration |
| UI integration | ✅ Fully implemented | export_for_ui(), real-time status display |
| Brain Node integration | ✅ Fully implemented | Heartbeat transmission, active node confirmation |
#### ⏳ Planned/unimplemented features
- Automatic node cleanup: Removal of long-term inactive nodes
- Node-to-node connectivity graph: Automatically infer edges from metadata
- Health Check: Health metrics other than heartbeats
- Load Balancing: Load balancing among multiple nodes of the same module type
- Geographically distributed support: Node management across multiple data centers
### Customization points
- Adjusting the timeout value
- Tuning the heartbeat interval
- Extending the metadata fields
- Custom filtering logic
## Related documents
- DISTRIBUTED_BRAIN_SYSTEM.md - Entire distributed brain system
- ADVANCED_DECISION_ENGINE.md - Advanced decision engine
- SPIKE_COMMUNICATION_ANALYSIS.md - Zenoh communication details
## Summary
With its advanced node discovery system, EvoSpikeNet's distributed brain simulation enables:
- ✅ Dynamic node detection: Track active nodes in real time
- ✅ Adaptive Routing: Flexible routing based on available nodes
- ✅ Fallback Guarantee: Always guarantees query completion
- ✅ UI integration: Real-time node status visualization
- ✅ Scalable: Can support up to thousands of nodes
This provides high availability and flexibility in full brain simulation.