
Sensor driver development guide

[!NOTE] For the latest implementation status, please refer to Functional Implementation Status (Remaining Functionality).

EvoSpikeNet's distributed brain simulation is designed so that input from external sensors can be plugged in and captured. This guide summarizes the steps and best practices for implementing a driver for a new device.


1. Overview of mechanism

The sensor integration package is implemented in evospikenet.sensor_integration. Its main components are:

  • SensorDriver: abstract base class that defines the connect/stream/sample-acquisition interface.
  • SensorManager: singleton factory that registers and creates drivers.
  • SensorType: enum defining categories such as CAMERA, LIDAR, ENVIRONMENTAL, AUDIO, and GPIO.
  • Dataclasses for sensor metadata and samples (SensorInfo, SensorSample).

A driver is registered with SensorManager.register_driver together with its sensor type; applications then create driver instances by specifying that type.

from evospikenet.sensor_integration import SensorManager, SensorType, SensorInfo

info = SensorInfo(sensor_type=SensorType.CAMERA, name="webcam0")
driver = SensorManager.create_driver(SensorType.CAMERA, info=info, source=0)

The driver registry is a simple mapping that returns the driver registered last for each type. If you want to use multiple drivers for the same type in parallel, either write a separate factory function, or use SensorManager.available_types() to check the registered types and switch accordingly.
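The last-one-wins behavior can be illustrated with a minimal stand-in registry (a plain dict, not the real SensorManager):

```python
# Minimal stand-in illustrating why the registry returns the driver
# registered last: it is a plain type -> class mapping, so re-registering
# the same type simply overwrites the previous entry.
registry = {}

def register_driver(sensor_type, driver_cls):
    registry[sensor_type] = driver_cls

class DefaultCameraDriver: ...
class ONVIFCameraDriver: ...

register_driver("CAMERA", DefaultCameraDriver)
register_driver("CAMERA", ONVIFCameraDriver)  # overwrites the default

print(registry["CAMERA"].__name__)  # ONVIFCameraDriver
```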


2. New driver implementation steps

  1. Create a new module under evospikenet/sensor_integration/. For example, my_sensor.py.

  2. Import the required classes at the top of the file:

     from evospikenet.sensor_integration.device_interface import (
         SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
     )

  3. Define a class that inherits from SensorDriver and implement the following abstract methods:

     • connect(self) -> bool
     • disconnect(self) -> None
     • start_stream(self) -> None
     • stop_stream(self) -> None
     • read_sample(self) -> Optional[SensorSample]

Implementation is easier if you refer to existing drivers.
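A minimal skeleton of the five methods might look as follows. SensorDriver, SensorStatus, and SensorSample are stubbed locally here (assumed shapes) so the sketch runs standalone; in the real package, import them from device_interface.

```python
import time
from enum import Enum
from typing import Any, Optional

# --- local stand-ins for the package types (assumed shapes, illustration only) ---
class SensorStatus(Enum):
    DISCONNECTED = 0; CONNECTED = 1; STREAMING = 2; ERROR = 3

class SensorSample:
    def __init__(self, timestamp_ns: int, data: Any):
        self.timestamp_ns, self.data = timestamp_ns, data

class SensorDriver:
    def __init__(self, info):
        self.info, self.status = info, SensorStatus.DISCONNECTED
    def _set_status(self, s): self.status = s
    @property
    def is_connected(self): return self.status in (SensorStatus.CONNECTED, SensorStatus.STREAMING)
    @property
    def is_streaming(self): return self.status is SensorStatus.STREAMING

# --- the skeleton a new driver fills in ---
class MySensorDriver(SensorDriver):
    def connect(self) -> bool:
        # open the device handle here
        self._set_status(SensorStatus.CONNECTED)
        return True

    def disconnect(self) -> None:
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self) -> Optional[SensorSample]:
        if not self.is_streaming:
            return None
        return SensorSample(timestamp_ns=int(time.time() * 1e9), data={"value": 0})

drv = MySensorDriver(info=None)
assert drv.connect()
drv.start_stream()
print(drv.read_sample().data)  # {'value': 0}
```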

  4. Add constructor arguments as needed and record device-specific settings in SensorInfo.capabilities.

  5. Call SensorManager.register_driver at the end of the module:

     SensorManager.register_driver(SensorType.MY_TYPE, MySensorDriver)

     If registration should depend on runtime arguments (as with ONVIFCamera), you can omit it and let clients instantiate the class directly.

  6. Add unit tests for your driver to tests/unit/. With mocks you can test without hardware.

  7. Add examples and explanations to the documentation (docs/) as needed.


3. Write the test

  • Refer to the existing tests/unit/test_usb_camera_driver.py or test_stereo_infrared_onvif_env.py.
  • Replace device-dependent libraries (cv2, rplidar, etc.) with unittest.mock.patch, and cover both the normal and error paths.
  • It is robust to have tests that both go through SensorManager.create_driver and instantiate the driver classes directly.
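One common pattern for hardware-free testing (a sketch; which driver module you then import is up to you) is to inject a mock into sys.modules before the driver module is imported, so an optional hardware library such as RPi.GPIO resolves to the mock even in CI:

```python
import sys
from unittest.mock import MagicMock

# Inject a fake RPi.GPIO *before* the driver module is imported, so its
# "import RPi.GPIO as GPIO" picks up the mock instead of real hardware.
fake_gpio = MagicMock()
sys.modules["RPi"] = MagicMock(GPIO=fake_gpio)
sys.modules["RPi.GPIO"] = fake_gpio

import RPi.GPIO as GPIO  # resolves to the mock

GPIO.setmode(GPIO.BCM)
GPIO.output(17, 1)
fake_gpio.output.assert_called_once_with(17, 1)
```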

4. Manage driver registration

If multiple drivers are registered for the same SensorType, the one registered last is returned. If you want to use different drivers for different purposes:

# Instantiate by explicitly specifying the class at time of use
from evospikenet.sensor_integration import ONVIFCameraDriver, SensorInfo, SensorType
info = SensorInfo(sensor_type=SensorType.CAMERA, name="ipcam")
driver = ONVIFCameraDriver(info, url="rtsp://...")

Or prepare a separate factory function.
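Such a factory might look like the sketch below. The driver classes are stubbed locally so the snippet runs standalone; in the real package they come from evospikenet.sensor_integration.

```python
# Local stand-ins for the illustration; the real classes live in the package.
class USBCameraDriver:
    def __init__(self, info, source=0):
        self.info, self.source = info, source

class ONVIFCameraDriver:
    def __init__(self, info, url=""):
        self.info, self.url = info, url

def make_camera_driver(purpose: str, info, **kwargs):
    """Route to a concrete camera driver by purpose instead of relying on
    the registry's last-one-wins lookup."""
    drivers = {"local": USBCameraDriver, "ip": ONVIFCameraDriver}
    try:
        return drivers[purpose](info, **kwargs)
    except KeyError:
        raise ValueError(f"unknown camera purpose: {purpose}") from None

drv = make_camera_driver("ip", info=None, url="rtsp://...")
print(type(drv).__name__)  # ONVIFCameraDriver
```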


5. Example: New driver sample

5.1 Specific example of USB camera driver

Since USB cameras are among the most common sensors, we walk through the driver implementation step by step.

  1. Create a new module: Create evospikenet/sensor_integration/usb_camera.py.
  2. Import dependent libraries: since the driver uses OpenCV (cv2), wrap the import as try: import cv2 / except ImportError: cv2 = None, so the module can be loaded even when the library is not installed.
  3. Class definition: inherit from SensorDriver and implement connect/disconnect/start_stream/stop_stream/read_sample. connect opens cv2.VideoCapture; read_sample returns the captured frame as an RGB numpy.ndarray.
  4. Register: call SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver) at the end of the file to make it the default driver.
  5. Test: add tests/unit/test_usb_camera_driver.py, mock cv2, and verify the behavior of each method.
  6. Add documentation: add detailed instructions to this guide and link it from the README or BRIEF.

The actual implementation code is as follows.

# evospikenet/sensor_integration/usb_camera.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager,
)

import time
from typing import Any

import numpy as np

try:
    import cv2
except ImportError:
    cv2 = None

class USBCameraDriver(SensorDriver):
    def __init__(self, info: SensorInfo, source: Any = 0):
        super().__init__(info)
        self.source = source
        self._capture = None

    def connect(self) -> bool:
        if cv2 is None:
            raise RuntimeError("OpenCV required")
        self._capture = cv2.VideoCapture(self.source)
        success = self._capture.isOpened()
        self._set_status(SensorStatus.CONNECTED if success else SensorStatus.ERROR)
        return success

    def disconnect(self) -> None:
        if self._capture:
            self._capture.release()
            self._capture = None
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("camera not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self):
        if not self.is_streaming or self._capture is None:
            return None
        ret, frame = self._capture.read()
        if not ret:
            return None
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        self._update_sample_time()
        return SensorSample(timestamp_ns=int(time.time()*1e9), data=frame)

SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver)

Test example:

# tests/unit/test_usb_camera_driver.py
import unittest
from unittest.mock import MagicMock, patch

import numpy as np

from evospikenet.sensor_integration import USBCameraDriver, SensorInfo, SensorType, SensorStatus

class TestUSBCameraDriver(unittest.TestCase):
    @patch('evospikenet.sensor_integration.usb_camera.cv2')
    def test_connect_and_read(self, mock_cv2):
        frame = np.zeros((10,10,3), dtype=np.uint8)
        cap = MagicMock()
        cap.isOpened.return_value = True
        cap.read.return_value = (True, frame)
        mock_cv2.VideoCapture.return_value = cap
        mock_cv2.cvtColor.return_value = frame

        info = SensorInfo(sensor_type=SensorType.CAMERA, name='cam')
        drv = USBCameraDriver(info, source=0)
        self.assertTrue(drv.connect())
        drv.start_stream()
        sample = drv.read_sample()
        self.assertIsNotNone(sample)
        self.assertEqual(sample.data.shape, frame.shape)
        drv.stop_stream()
        drv.disconnect()

5.2 GPIO switch driver example

Plugins that control GPIO pins or relays can be implemented with the same procedure.

  1. Create a new file, gpio_switch.py.
  2. Wrap the RPi.GPIO import in try/except so the module does not depend on its presence.
  3. Inherit from SensorDriver and accept the pin number in the constructor.
  4. Add a write_state method that toggles the 0/1 output.
  5. Subclass a dummy driver that logs output for testing.
  6. Finally, register it with SensorManager.register_driver.

# evospikenet/sensor_integration/gpio_switch.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import time

try:
    import RPi.GPIO as GPIO
except ImportError:
    GPIO = None

class GPIOSwitchDriver(SensorDriver):
    def __init__(self, info: SensorInfo, pin=None):
        super().__init__(info)
        if info.capabilities is None:
            info.capabilities = {}
        self.pin = pin if pin is not None else info.capabilities.get("pin")
        if self.pin is None:
            raise ValueError("pin number must be provided")
        if GPIO and not GPIO.getmode():
            GPIO.setmode(GPIO.BCM)
        self._state = 0

    def connect(self) -> bool:
        if GPIO:
            GPIO.setup(self.pin, GPIO.OUT)
        self._set_status(SensorStatus.CONNECTED)
        return True

    def write_state(self, value: int) -> None:
        if value not in (0,1):
            raise ValueError("state must be 0 or 1")
        self._state = value
        if GPIO:
            GPIO.output(self.pin, value)
        self._update_sample_time()

# Other methods are omitted...

SensorManager.register_driver(SensorType.GPIO, DummyGPIOSwitchDriver)

For a test example, see tests/unit/test_gpio_switch.py.

5.3 Audio input/output driver example

For the microphone and speaker, drivers based on sounddevice are provided.

# evospikenet/sensor_integration/audio_input.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import numpy as np, time
try:
    import sounddevice as sd
except ImportError:
    sd = None

class MicrophoneDriver(SensorDriver):
    def __init__(self, info: SensorInfo):
        super().__init__(info)
        if info.capabilities is None:
            info.capabilities = {}
        self.stream = None
        self.buffer = None
    def connect(self):
        if sd is None:
            raise RuntimeError("sounddevice required")
        self.stream = sd.InputStream(
            samplerate=self.info.capabilities.get("samplerate", 16000),
            channels=self.info.capabilities.get("channels", 1),
            callback=self._callback,
        )
        self._set_status(SensorStatus.CONNECTED)
        return True
    # Omitted...

# DummyMicrophoneDriver is a hardware-free subclass (definition omitted above)
SensorManager.register_driver(SensorType.AUDIO, DummyMicrophoneDriver)

The speaker side has the same structure in audio_output.py. For tests, see tests/unit/test_audio_drivers.py.


# evospikenet/sensor_integration/temperature_sensor.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import time

class TempSensorDriver(SensorDriver):
    def __init__(self, info: SensorInfo, port: str):
        super().__init__(info)
        self.port = port
        self._conn = None

    def connect(self) -> bool:
        self._conn = open_serial(self.port) # temporary function
        self._set_status(SensorStatus.CONNECTED)
        return True

    def disconnect(self) -> None:
        if self._conn:
            self._conn.close()
            self._conn = None
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self):
        raw = self._conn.read_line()
        temp = float(raw)
        self._update_sample_time()
        return SensorSample(timestamp_ns=int(time.time()*1e9), data={"temp": temp})

SensorManager.register_driver(SensorType.ENVIRONMENTAL, TempSensorDriver)

To make this guide discoverable, add a link to it at the end of the "Sensor connection plugins" section of docs/NEUROSCIENCE_BRAIN_SIMULATION_BRIEF.md:

    For detailed driver development procedures, refer to the Sensor Driver Development Guide.


6. Best practices/notes

  • Make hardware-specific libraries optional dependencies: wrap their imports in try/except so the module still loads when the import fails.
  • Keeping device-specific settings in SensorInfo.capabilities makes drivers easier to extend.
  • Since testing on real hardware is difficult in a Docker/container environment, cover the drivers in CI with mock-based unit tests.
  • Drivers are easier to debug when state transitions are logged properly.
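The capabilities pattern can be sketched as follows. SensorInfo is stubbed locally as a plain dataclass (assumed shape) so the snippet runs standalone; the field values are illustrative.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Local stand-in for the package's SensorInfo (assumed shape, illustration only).
@dataclass
class SensorInfo:
    sensor_type: str
    name: str
    capabilities: Optional[Dict[str, Any]] = None

# Device-specific settings ride along in capabilities instead of
# growing the constructor signature with every new option.
info = SensorInfo(
    sensor_type="ENVIRONMENTAL",
    name="env0",
    capabilities={"bus": 1, "address": 0x76, "interval_s": 2.0},
)

# A driver reads its settings with .get() and falls back to defaults,
# so old configurations keep working when new options are added.
bus = info.capabilities.get("bus", 0)
interval = info.capabilities.get("interval_s", 1.0)
print(bus, interval)  # 1 2.0
```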

Based on this guide, you can now create new drivers for LiDAR, stereo cameras, infrared cameras, ONVIF/IP cameras, and environmental sensors. The same procedure applies to other sensor types as well.