センサー ドライバ開発ガイド
[!NOTE] 最新の実装状況は 機能実装ステータス (Remaining Functionality) を参照してください。
EvoSpikeNet の分散脳シミュレーションは外部センサーをプラグインとして 取り込めるように設計されています。本書では新しいデバイス用のドライバを 作成する手順とベストプラクティスをまとめます。
1. 仕組みの概要
センサー統合パッケージは evospikenet.sensor_integration に実装されており、
主な構成要素は以下のとおりです。
SensorDriver: 抽象基底クラス。接続/ストリーム/サンプル取得のインターフェースを 定義。SensorManager: ドライバの登録と生成を行うシングルトン・ファクトリ。SensorType: 列挙型。CAMERA,LIDAR,ENVIRONMENTAL,AUDIO,GPIO等のカテゴリを定義。- 各種サンプル/情報 dataclass (
SensorInfo,SensorSample) 。
ドライバは SensorManager.register_driver で型名と共に登録され、
アプリケーションは型を指定して作成します。
from evospikenet.sensor_integration import SensorManager, SensorType, SensorInfo
info = SensorInfo(sensor_type=SensorType.CAMERA, name="webcam0")
driver = SensorManager.create_driver(SensorType.CAMERA, info=info, source=0)
登録されたドライバは最後に登録されたものが返る単純なマッピングです。
並列に複数の種類を使いたい場合は別途ファクトリ関数を作るか、
SensorManager.available_types() で登録済みの種類を調べて切り替えます。
2. 新規ドライバの実装手順
-
evospikenet/sensor_integration/配下に新しいモジュールを作成します。 例えばmy_sensor.pyなど。 -
ファイルの冒頭で必要なクラスをインポート:
from evospikenet.sensor_integration.device_interface import ( SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager ) -
SensorDriverを継承したクラスを定義し、以下の抽象メソッドを実装します。 connect(self) -> booldisconnect(self) -> Nonestart_stream(self) -> Nonestop_stream(self) -> Noneread_sample(self) -> Optional[SensorSample]
既存ドライバを参考にすると実装が楽です。
-
必要に応じてコンストラクタ引数を追加し、
SensorInfoのcapabilitiesで 機能を記録します。 -
モジュール末尾で
SensorManager.register_driverを呼び出して登録します。※ONVIFCamera のように引数依存で登録したくない場合は登録を省略し、 クライアントが自らインスタンス化するスタイルでも構いません。SensorManager.register_driver(SensorType.MY_TYPE, MySensorDriver) -
ドライバ実装に対する単体テストを
tests/unit/に追加します。 モックを使えばハードウェアがなくても検証可能です。 -
必要に応じてドキュメント (
docs/) に例と説明を追加します。
3. テストを書く
- 既存の
tests/unit/test_usb_camera_driver.pyやtest_stereo_infrared_onvif_env.pyを参考にします。 unittest.mock.patchで機器依存ライブラリ(cv2,rplidarなど)を 置き換え、正常系・異常系を網羅します。SensorManager.create_driverを使うテストと、直接クラスを 呼び出すテストの両方を用意すると堅牢です。
4. ドライバ登録の管理
同じ SensorType に複数ドライバを登録すると、後から登録したものが
返ってきます。用途によってドライバを使い分けたい場合は、
# 使用時に明示的にクラスを指定してインスタンス化
from evospikenet.sensor_integration import ONVIFCameraDriver, SensorInfo
info = SensorInfo(sensor_type=SensorType.CAMERA, name="ipcam")
driver = ONVIFCameraDriver(info, url="rtsp://...")
とするか、ファクトリ関数を別途用意してください。
5. 例:新規ドライバサンプル
5.1 USBカメラドライバの具体例
USBカメラは最も一般的なセンサーの一つであるため、 ドライバ実装の手順を順を追って説明します。
- 新規モジュール作成:
evospikenet/sensor_integration/usb_camera.pyを作成。 - 依存ライブラリのインポート: OpenCV (
cv2) を使うのでtry: import cv2 except ImportError: cv2 = Noneと記述し、 ライブラリ未インストールでもモジュールが読み込めるようにする。 - クラス定義:
SensorDriverを継承し、connect/disconnect/start_stream/stop_stream/read_sampleを 実装。connectではcv2.VideoCaptureを開き、read_sampleは取得フレームを RGBnumpy.ndarrayで返す。 - 登録: ファイル末尾で
SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver)を呼び出してデフォルトドライバとして登録。 - テスト:
tests/unit/test_usb_camera_driver.pyを追加し、cv2をモックして各メソッドの動作を検証。 - ドキュメント追加: 本ガイドに詳しい手順を書き、 READMEや BRIEF にリンクする。
実際の実装コードは以下のとおり。
# evospikenet/sensor_integration/usb_camera.py
from evospikenet.sensor_integration.device_interface import (
SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager,
)
import time
import numpy as np
try:
import cv2
except ImportError:
cv2 = None
class USBCameraDriver(SensorDriver):
def __init__(self, info: SensorInfo, source: Any = 0):
super().__init__(info)
self.source = source
self._capture = None
def connect(self) -> bool:
if cv2 is None:
raise RuntimeError("OpenCV required")
self._capture = cv2.VideoCapture(self.source)
success = self._capture.isOpened()
self._set_status(SensorStatus.CONNECTED if success else SensorStatus.ERROR)
return success
def disconnect(self) -> None:
if self._capture:
self._capture.release()
self._capture = None
self._set_status(SensorStatus.DISCONNECTED)
def start_stream(self) -> None:
if not self.is_connected:
raise RuntimeError("camera not connected")
self._set_status(SensorStatus.STREAMING)
def stop_stream(self) -> None:
self._set_status(SensorStatus.CONNECTED)
def read_sample(self):
if not self.is_streaming or self._capture is None:
return None
ret, frame = self._capture.read()
if not ret:
return None
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self._update_sample_time()
return SensorSample(timestamp_ns=int(time.time()*1e9), data=frame)
# テスト例
```python
# tests/unit/test_usb_camera_driver.py
import unittest
5.2 GPIOスイッチドライバ例
GPIO やリレーを制御するプラグインも同様の手順で実装できます。
- 新規ファイル
gpio_switch.pyを作成。 RPi.GPIOの有無に依存しないよう try/except でインポート。SensorDriverを継承し、pin 番号をコンストラクタで受け取る。write_stateメソッドを追加して 0/1 出力を切り替える。- ダミードライバをサブクラス化し、テスト用にログ出力。
- 最後に
SensorManager.register_driverで登録。
# evospikenet/sensor_integration/gpio_switch.py
from evospikenet.sensor_integration.device_interface import (
SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import time
try:
import RPi.GPIO as GPIO
except ImportError:
GPIO = None
class GPIOSwitchDriver(SensorDriver):
def __init__(self, info: SensorInfo, pin=None):
super().__init__(info)
if info.capabilities is None:
info.capabilities = {}
self.pin = pin if pin is not None else info.capabilities.get("pin")
if self.pin is None:
raise ValueError("pin number must be provided")
if GPIO and not GPIO.getmode():
GPIO.setmode(GPIO.BCM)
self._state = 0
def connect(self) -> bool:
if GPIO:
GPIO.setup(self.pin, GPIO.OUT)
self._set_status(SensorStatus.CONNECTED)
return True
def write_state(self, value: int) -> None:
if value not in (0,1):
raise ValueError("state must be 0 or 1")
self._state = value
if GPIO:
GPIO.output(self.pin, value)
self._update_sample_time()
# 他メソッドは省略…
SensorManager.register_driver(SensorType.AUDIO, DummyGPIOSwitchDriver)
テスト例は tests/unit/test_gpio_switch.py を参照してください。
5.3 音声入出力ドライバ例
マイクとスピーカは sounddevice を利用するドライバを用意しました。
# evospikenet/sensor_integration/audio_input.py
from evospikenet.sensor_integration.device_interface import (
SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import numpy as np, time
try:
import sounddevice as sd
except ImportError:
sd = None
class MicrophoneDriver(SensorDriver):
def __init__(self, info: SensorInfo):
super().__init__(info)
if info.capabilities is None:
info.capabilities = {}
self.stream = None
self.buffer = None
def connect(self):
if sd is None:
raise RuntimeError("sounddevice required")
self.stream = sd.InputStream(samplerate=info.capabilities.get("samplerate",16000), channels=info.capabilities.get("channels",1), callback=self._callback)
self._set_status(SensorStatus.CONNECTED)
return True
# 省略…
SensorManager.register_driver(SensorType.AUDIO, DummyMicrophoneDriver)
スピーカ側は audio_output.py に同様の構造。テストは
tests/unit/test_audio_drivers.py を参照。
```from unittest.mock import MagicMock, patch import numpy as np from evospikenet.sensor_integration import USBCameraDriver, SensorInfo, SensorType, SensorStatus
class TestUSBCameraDriver(unittest.TestCase): @patch('evospikenet.sensor_integration.usb_camera.cv2') def test_connect_and_read(self, mock_cv2): frame = np.zeros((10,10,3), dtype=np.uint8) cap = MagicMock() cap.isOpened.return_value = True cap.read.return_value = (True, frame) mock_cv2.VideoCapture.return_value = cap mock_cv2.cvtColor.return_value = frame
info = SensorInfo(sensor_type=SensorType.CAMERA, name='cam')
drv = USBCameraDriver(info, source=0)
self.assertTrue(drv.connect())
drv.start_stream()
sample = drv.read_sample()
self.assertIsNotNone(sample)
self.assertEqual(sample.data.shape, frame.shape)
drv.stop_stream()
drv.disconnect()
python
evospikenet/sensor_integration/temperature_sensor.py
if not ret:
return None
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self._update_sample_time()
return SensorSample(timestamp_ns=int(time.time()*1e9), data=frame)
SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver) ```
# evospikenet/sensor_integration/temperature_sensor.py
from evospikenet.sensor_integration.device_interface import (
SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
class TempSensorDriver(SensorDriver):
def __init__(self, info: SensorInfo, port: str):
super().__init__(info)
self.port = port
self._conn = None
def connect(self) -> bool:
self._conn = open_serial(self.port) # 仮の関数
self._set_status(SensorStatus.CONNECTED)
return True
def disconnect(self) -> None:
if self._conn:
self._conn.close()
self._conn = None
self._set_status(SensorStatus.DISCONNECTED)
def start_stream(self) -> None:
if not self.is_connected:
raise RuntimeError("not connected")
self._set_status(SensorStatus.STREAMING)
def stop_stream(self) -> None:
self._set_status(SensorStatus.CONNECTED)
def read_sample(self):
raw = self._conn.read_line()
temp = float(raw)
self._update_sample_time()
return SensorSample(timestamp_ns=int(time.time()*1e9), data={"temp": temp})
SensorManager.register_driver(SensorType.ENVIRONMENTAL, TempSensorDriver)
6. ドキュメントへのリンク
このガイドの存在を明示するため、docs/NEUROSCIENCE_BRAIN_SIMULATION_BRIEF.md
の「センサー接続プラグイン」セクション末尾にリンクを追加します。
詳細なドライバ開発手順は [センサー ドライバ開発ガイド](SENSOR_DRIVER_DEVELOPMENT.md) を参照。
7. ベストプラクティス/注意点
- ハードウェア特有のライブラリはオプション依存にして、インポート時に 失敗してもモジュールが読み込めるようにする
SensorInfo.capabilitiesにデバイス固有設定を保持しておくと拡張が楽- 実機テストは docker/container 環境では難しいため、CI ではモックを 用いたユニットテストでカバーする
- ドライバは状態遷移をきっちりロギングすると、デバッグが容易
このガイドを基に、LiDAR、ステレオカメラ、赤外線カメラ、ONVIF/IPカメラ、 環境センサーのどれでも新規ドライバが作れるようになります。その他の センサー種別にも同様の流れで対応可能です。