Sensor driver development guide
[!NOTE] For the latest implementation status, please refer to Functional Implementation Status (Remaining Functionality).
EvoSpikeNet's distributed brain simulation is designed so that external sensors can be plugged in and their data captured. This guide summarizes the steps and best practices for writing a driver for a new device.
1. Overview of mechanism
The sensor integration package is implemented in evospikenet.sensor_integration.
The main components are:
- SensorDriver: Abstract base class that defines the connect/stream/sample-acquisition interface.
- SensorManager: Singleton factory that registers and creates drivers.
- SensorType: Enum that defines categories such as CAMERA, LIDAR, ENVIRONMENTAL, AUDIO, and GPIO.
- SensorInfo, SensorSample: Dataclasses for sensor information and samples.
Drivers are registered with SensorManager.register_driver under a sensor type,
and applications create a driver by specifying that type:
from evospikenet.sensor_integration import SensorManager, SensorType, SensorInfo
info = SensorInfo(sensor_type=SensorType.CAMERA, name="webcam0")
driver = SensorManager.create_driver(SensorType.CAMERA, info=info, source=0)
The registry is a simple mapping from sensor type to driver class, so create_driver returns the driver that was registered last for that type.
To use several drivers side by side, write a separate factory function, or
call SensorManager.available_types() to check which types are registered and switch accordingly.
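The "last registration wins" behavior can be illustrated with a minimal sketch. This is not the actual SensorManager implementation: MiniRegistry, SensorKind, UsbCamera, and IpCamera are hypothetical stand-ins, and the only assumption is that the registry behaves like a plain dictionary keyed by sensor type.

```python
from enum import Enum, auto
from typing import Dict, List, Type


class SensorKind(Enum):
    """Hypothetical stand-in for SensorType."""
    CAMERA = auto()
    AUDIO = auto()


class MiniRegistry:
    """Hypothetical stand-in for SensorManager: one driver class per type."""

    _drivers: Dict[SensorKind, Type] = {}

    @classmethod
    def register_driver(cls, kind: SensorKind, driver_cls: Type) -> None:
        # Plain dict assignment: a later registration replaces an earlier one.
        cls._drivers[kind] = driver_cls

    @classmethod
    def available_types(cls) -> List[SensorKind]:
        return list(cls._drivers)

    @classmethod
    def create_driver(cls, kind: SensorKind, **kwargs):
        if kind not in cls._drivers:
            raise KeyError(f"no driver registered for {kind.name}")
        return cls._drivers[kind](**kwargs)


class UsbCamera:
    def __init__(self, source=0):
        self.source = source


class IpCamera:
    def __init__(self, source=0):
        self.source = source


MiniRegistry.register_driver(SensorKind.CAMERA, UsbCamera)
MiniRegistry.register_driver(SensorKind.CAMERA, IpCamera)  # replaces UsbCamera

driver = MiniRegistry.create_driver(SensorKind.CAMERA, source=0)
```

In this sketch, driver is an IpCamera instance because its registration came second. Checking available_types() before calling create_driver avoids the KeyError for types that have no driver.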
2. New driver implementation steps
- Create a new module under evospikenet/sensor_integration/, for example my_sensor.py.
- Import the required classes at the beginning of the file:
      from evospikenet.sensor_integration.device_interface import (
          SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
      )
- Define a class that inherits from SensorDriver and implement the following abstract methods. Referring to existing drivers makes the implementation easier.
  - connect(self) -> bool
  - disconnect(self) -> None
  - start_stream(self) -> None
  - stop_stream(self) -> None
  - read_sample(self) -> Optional[SensorSample]
- Add constructor arguments as needed, and record the corresponding functionality in the capabilities field of SensorInfo.
- Call SensorManager.register_driver at the end of the module to register the driver:
      SensorManager.register_driver(SensorType.MY_TYPE, MySensorDriver)
  If the driver depends on constructor arguments, as ONVIFCamera does, and you do not want it registered, omit the registration and let the client instantiate the class itself.
- Add unit tests for your driver implementation to tests/unit/. Mocks allow you to test without hardware.
- Add examples and explanations to the documentation (docs/) as needed.
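The step about recording constructor arguments in capabilities could look roughly like the sketch below. InfoStub and SerialSensorStub are hypothetical stand-ins (the real SensorInfo has more fields), and the port and baudrate parameters are assumptions chosen for illustration only.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class InfoStub:
    """Hypothetical stand-in for SensorInfo; only the fields used here."""
    name: str
    capabilities: Dict[str, Any] = field(default_factory=dict)


class SerialSensorStub:
    """Hypothetical driver that records its constructor arguments."""

    def __init__(self, info: InfoStub, port: str, baudrate: int = 9600):
        self.info = info
        self.port = port
        self.baudrate = baudrate
        # Record the settings so callers can inspect them through the info object.
        self.info.capabilities.update({"port": port, "baudrate": baudrate})


info = InfoStub(name="temp0")
driver = SerialSensorStub(info, port="/dev/ttyUSB0", baudrate=115200)
```

Keeping the settings on the info object means that code which only holds the info, such as a monitoring view, can still show how the device was configured.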
3. Write the test
- Refer to the existing tests/unit/test_usb_camera_driver.py or test_stereo_infrared_onvif_env.py.
- Replace device-dependent libraries (cv2, rplidar, etc.) with unittest.mock.patch, and cover both normal and error cases.
- For robustness, include both tests that go through SensorManager.create_driver and tests that instantiate the class directly.
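A minimal sketch of covering normal and error cases with unittest.mock follows. It does not import EvoSpikeNet: CameraStub is a hypothetical stand-in whose backend class attribute plays the role of the optional cv2 module, and patch.object replaces it for the duration of each test.

```python
import unittest
from unittest.mock import MagicMock, patch


class CameraStub:
    """Hypothetical stand-in for a camera driver; not the real USBCameraDriver."""

    backend = None  # plays the role of the optional cv2 module

    def __init__(self, source=0):
        self.source = source
        self._capture = None

    def connect(self) -> bool:
        if self.backend is None:
            raise RuntimeError("camera backend required")
        self._capture = self.backend.VideoCapture(self.source)
        return bool(self._capture.isOpened())


class TestCameraStub(unittest.TestCase):
    def test_connect_success(self):
        fake = MagicMock()
        fake.VideoCapture.return_value.isOpened.return_value = True
        with patch.object(CameraStub, "backend", fake):
            self.assertTrue(CameraStub(0).connect())
        fake.VideoCapture.assert_called_once_with(0)

    def test_connect_failure(self):
        # Error case: the device cannot be opened.
        fake = MagicMock()
        fake.VideoCapture.return_value.isOpened.return_value = False
        with patch.object(CameraStub, "backend", fake):
            self.assertFalse(CameraStub(0).connect())

    def test_missing_backend(self):
        # Error case: the optional library is not installed.
        with self.assertRaises(RuntimeError):
            CameraStub(0).connect()
```

For a real driver module, the equivalent is to patch the module-level name, as the USB camera test in this guide does with patch('evospikenet.sensor_integration.usb_camera.cv2').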
4. Manage driver registration
If multiple drivers are registered to the same SensorType, the one registered later is returned.
To use different drivers depending on the purpose, instantiate the class explicitly at the point of use:
# Instantiate by explicitly specifying the class at time of use
from evospikenet.sensor_integration import ONVIFCameraDriver, SensorInfo, SensorType
info = SensorInfo(sensor_type=SensorType.CAMERA, name="ipcam")
driver = ONVIFCameraDriver(info, url="rtsp://...")
Alternatively, prepare a separate factory function.
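One possible shape for such a factory function is sketched here. All names (create_camera, UsbCameraStub, IpCameraStub, the "usb"/"ip" keys) and the example URL are hypothetical; the point is only that the caller selects the driver explicitly instead of relying on registration order.

```python
from typing import Callable, Dict


class UsbCameraStub:
    """Hypothetical stand-in for a USB camera driver."""

    def __init__(self, source: int = 0):
        self.source = source


class IpCameraStub:
    """Hypothetical stand-in for an IP camera driver."""

    def __init__(self, url: str):
        self.url = url


# Map a purpose-specific key to the callable that builds the driver.
_CAMERA_BUILDERS: Dict[str, Callable[..., object]] = {
    "usb": UsbCameraStub,
    "ip": IpCameraStub,
}


def create_camera(kind: str, **kwargs) -> object:
    """Build a camera driver by explicit kind instead of registration order."""
    try:
        builder = _CAMERA_BUILDERS[kind]
    except KeyError:
        raise ValueError(f"unknown camera kind: {kind!r}") from None
    return builder(**kwargs)


usb = create_camera("usb", source=0)
ip = create_camera("ip", url="rtsp://example.invalid/stream")  # placeholder URL
```

Because each builder receives its own keyword arguments, drivers with different constructor signatures can coexist under the same sensor type.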
5. Example: New driver sample
5.1 Specific example of USB camera driver
Since USB cameras are among the most common sensors, this section walks through the driver implementation step by step.
- Create a new module: Create evospikenet/sensor_integration/usb_camera.py.
- Import dependent libraries: The driver uses OpenCV (cv2), so write try: import cv2 except ImportError: cv2 = None to allow the module to be loaded even if the library is not installed.
- Define the class: Inherit from SensorDriver and implement connect/disconnect/start_stream/stop_stream/read_sample. connect opens cv2.VideoCapture, and read_sample returns the acquired frame as an RGB numpy.ndarray.
- Register: At the end of the file, call SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver) to register it as the default driver.
- Test: Add tests/unit/test_usb_camera_driver.py, mock cv2, and verify the operation of each method.
- Add documentation: Add detailed instructions to this guide and link to it from the README or BRIEF.
The actual implementation code is as follows.
# evospikenet/sensor_integration/usb_camera.py
import time
from typing import Any, Optional

from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager,
)

# OpenCV is optional so that the module can be imported without it.
try:
    import cv2
except ImportError:
    cv2 = None


class USBCameraDriver(SensorDriver):
    def __init__(self, info: SensorInfo, source: Any = 0):
        super().__init__(info)
        self.source = source
        self._capture = None

    def connect(self) -> bool:
        if cv2 is None:
            raise RuntimeError("OpenCV required")
        self._capture = cv2.VideoCapture(self.source)
        success = self._capture.isOpened()
        self._set_status(SensorStatus.CONNECTED if success else SensorStatus.ERROR)
        return success

    def disconnect(self) -> None:
        if self._capture:
            self._capture.release()
            self._capture = None
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("camera not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self) -> Optional[SensorSample]:
        if not self.is_streaming or self._capture is None:
            return None
        ret, frame = self._capture.read()
        if not ret:
            return None
        # OpenCV delivers frames in BGR order; convert to RGB.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        self._update_sample_time()
        return SensorSample(timestamp_ns=time.time_ns(), data=frame)


# Register as the default CAMERA driver.
SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver)
Test example: the unit test for this driver, tests/unit/test_usb_camera_driver.py, is listed at the end of section 5.3.
5.2 GPIO switch driver example
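Plugins that control GPIO pins or relays can be implemented with the same procedure.
- Create a new file, gpio_switch.py.
- Import RPi.GPIO inside try/except so that the module does not depend on whether the library is installed.
- Inherit from SensorDriver and receive the pin number in the constructor.
- Add a write_state method that switches the output between 0 and 1.
- Subclass a dummy driver that logs its output for testing.
- Finally, register the driver with SensorManager.register_driver.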
# evospikenet/sensor_integration/gpio_switch.py
import time

from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)

# RPi.GPIO is only available on Raspberry Pi; fall back to None elsewhere.
try:
    import RPi.GPIO as GPIO
except ImportError:
    GPIO = None


class GPIOSwitchDriver(SensorDriver):
    def __init__(self, info: SensorInfo, pin=None):
        super().__init__(info)
        if info.capabilities is None:
            info.capabilities = {}
        # The pin may be passed directly or through info.capabilities["pin"].
        self.pin = pin if pin is not None else info.capabilities.get("pin")
        if self.pin is None:
            raise ValueError("pin number must be provided")
        if GPIO and not GPIO.getmode():
            GPIO.setmode(GPIO.BCM)
        self._state = 0

    def connect(self) -> bool:
        if GPIO:
            GPIO.setup(self.pin, GPIO.OUT)
        self._set_status(SensorStatus.CONNECTED)
        return True

    def write_state(self, value: int) -> None:
        if value not in (0, 1):
            raise ValueError("state must be 0 or 1")
        self._state = value
        if GPIO:
            GPIO.output(self.pin, value)
        self._update_sample_time()

# Other methods are omitted...

# DummyGPIOSwitchDriver (the logging subclass) is defined in the omitted part.
# Register under SensorType.GPIO so that the AUDIO driver is not overridden.
SensorManager.register_driver(SensorType.GPIO, DummyGPIOSwitchDriver)
For a test example, see tests/unit/test_gpio_switch.py.
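The dummy subclass that logs output for testing is omitted from the listing above. A hedged sketch of the idea, using hypothetical stand-ins (SwitchStub, LoggingSwitchStub) rather than the real GPIOSwitchDriver and DummyGPIOSwitchDriver, might look like this:

```python
import logging
from typing import List

logger = logging.getLogger("gpio_switch_sketch")


class SwitchStub:
    """Hypothetical stand-in for a GPIO switch driver (no hardware access)."""

    def __init__(self, pin: int):
        self.pin = pin
        self.state = 0

    def write_state(self, value: int) -> None:
        if value not in (0, 1):
            raise ValueError("state must be 0 or 1")
        self.state = value


class LoggingSwitchStub(SwitchStub):
    """Dummy subclass that records and logs every write for tests."""

    def __init__(self, pin: int):
        super().__init__(pin)
        self.history: List[int] = []

    def write_state(self, value: int) -> None:
        super().write_state(value)  # validation happens in the parent class
        self.history.append(value)
        logger.info("pin %d -> %d", self.pin, value)


switch = LoggingSwitchStub(pin=17)
switch.write_state(1)
switch.write_state(0)
```

Tests can then assert on history without any GPIO hardware, and invalid values are rejected before anything is recorded.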
5.3 Audio input/output driver example
Drivers based on sounddevice are provided for the microphone and the speaker.
# evospikenet/sensor_integration/audio_input.py
import time

import numpy as np

from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)

# sounddevice is optional so that the module can be imported without it.
try:
    import sounddevice as sd
except ImportError:
    sd = None


class MicrophoneDriver(SensorDriver):
    def __init__(self, info: SensorInfo):
        super().__init__(info)
        if info.capabilities is None:
            info.capabilities = {}
        # Keep a reference so that connect() can read the stream settings.
        self._capabilities = info.capabilities
        self.stream = None
        self.buffer = None

    def connect(self):
        if sd is None:
            raise RuntimeError("sounddevice required")
        self.stream = sd.InputStream(
            samplerate=self._capabilities.get("samplerate", 16000),
            channels=self._capabilities.get("channels", 1),
            callback=self._callback,
        )
        self._set_status(SensorStatus.CONNECTED)
        return True

# Omitted (including _callback and DummyMicrophoneDriver)...

SensorManager.register_driver(SensorType.AUDIO, DummyMicrophoneDriver)
The speaker side has the same structure in audio_output.py. For tests, see
tests/unit/test_audio_drivers.py.
Test example for the USB camera driver from section 5.1:

```python
# tests/unit/test_usb_camera_driver.py
import unittest
from unittest.mock import MagicMock, patch

import numpy as np

from evospikenet.sensor_integration import USBCameraDriver, SensorInfo, SensorType, SensorStatus


class TestUSBCameraDriver(unittest.TestCase):
    @patch('evospikenet.sensor_integration.usb_camera.cv2')
    def test_connect_and_read(self, mock_cv2):
        # Fake capture object that always returns one black frame.
        frame = np.zeros((10, 10, 3), dtype=np.uint8)
        cap = MagicMock()
        cap.isOpened.return_value = True
        cap.read.return_value = (True, frame)
        mock_cv2.VideoCapture.return_value = cap
        mock_cv2.cvtColor.return_value = frame

        info = SensorInfo(sensor_type=SensorType.CAMERA, name='cam')
        drv = USBCameraDriver(info, source=0)
        self.assertTrue(drv.connect())
        drv.start_stream()
        sample = drv.read_sample()
        self.assertIsNotNone(sample)
        self.assertEqual(sample.data.shape, frame.shape)
        drv.stop_stream()
        drv.disconnect()
```
A minimal environmental sensor driver, here a temperature sensor on a serial port, follows the same pattern:
# evospikenet/sensor_integration/temperature_sensor.py
import time
from typing import Optional

from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)


class TempSensorDriver(SensorDriver):
    def __init__(self, info: SensorInfo, port: str):
        super().__init__(info)
        self.port = port
        self._conn = None

    def connect(self) -> bool:
        # open_serial is a placeholder; replace it with your serial library's open call.
        self._conn = open_serial(self.port)
        self._set_status(SensorStatus.CONNECTED)
        return True

    def disconnect(self) -> None:
        if self._conn:
            self._conn.close()
            self._conn = None
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self) -> Optional[SensorSample]:
        # Nothing to read until the connection is open.
        if self._conn is None:
            return None
        raw = self._conn.read_line()
        temp = float(raw)
        self._update_sample_time()
        return SensorSample(timestamp_ns=time.time_ns(), data={"temp": temp})


SensorManager.register_driver(SensorType.ENVIRONMENTAL, TempSensorDriver)
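The read_sample method of the temperature driver converts the raw line with float(raw), which raises ValueError on a malformed line. A small parsing helper is one way to handle that; parse_temperature is a hypothetical name, and the sketch assumes the device sends one ASCII number per line.

```python
import math
from typing import Optional, Union


def parse_temperature(raw: Union[bytes, str]) -> Optional[float]:
    """Return the reading in one line, or None if it is not a finite number."""
    if isinstance(raw, bytes):
        raw = raw.decode("ascii", errors="replace")
    try:
        value = float(raw.strip())
    except ValueError:
        return None
    # float() also accepts "nan" and "inf"; treat those as invalid readings.
    if not math.isfinite(value):
        return None
    return value
```

A driver could call this helper inside read_sample and return None when it yields None, so that a single corrupted line does not abort the stream.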
6. Link to documentation
To make this guide easy to find, add a link at the end of the "Sensor Connection Plugin" (「センサー接続プラグイン」) section
of docs/NEUROSCIENCE_BRAIN_SIMULATION_BRIEF.md.
```markdown
For detailed driver development procedures, refer to Sensor Driver Development Guide.
```
7. Best practices/notes
- Treat hardware-specific libraries as optional dependencies: guard the import so that the module still loads if the import fails.
- Keeping device-specific settings in SensorInfo.capabilities makes extension easier.
- Testing on real hardware is difficult in Docker/container environments, so cover the driver in CI with unit tests that use mocks.
- Drivers can be easily debugged if state transitions are properly logged.
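As an illustration of the last point, state transitions can be logged in one place if every status change goes through a single method. The sketch below is an assumption about how that might be done, not the actual SensorDriver._set_status; StatusStub and LoggedDriverStub are hypothetical names.

```python
import logging
from enum import Enum

logger = logging.getLogger("sensor_driver_sketch")


class StatusStub(Enum):
    """Hypothetical stand-in for SensorStatus."""
    DISCONNECTED = "disconnected"
    CONNECTED = "connected"
    STREAMING = "streaming"
    ERROR = "error"


class LoggedDriverStub:
    """Hypothetical driver base that logs every real state transition."""

    def __init__(self, name: str):
        self.name = name
        self.status = StatusStub.DISCONNECTED

    def _set_status(self, new_status: StatusStub) -> None:
        old_status = self.status
        self.status = new_status
        # Log only actual changes to keep the log readable.
        if old_status is not new_status:
            logger.info("%s: %s -> %s", self.name, old_status.value, new_status.value)
```

With the logger set to INFO, a connect followed by start_stream produces two lines showing the path from disconnected to streaming, which is often enough to locate where a driver got stuck.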
Based on this guide, you can create new drivers for LiDAR, stereo cameras, infrared cameras, ONVIF/IP cameras, and any environmental sensor. The same process applies to other sensor types.