コンテンツにスキップ

センサー ドライバ開発ガイド

[!NOTE] 最新の実装状況は 機能実装ステータス (Remaining Functionality) を参照してください。

EvoSpikeNet の分散脳シミュレーションは外部センサーをプラグインとして 取り込めるように設計されています。本書では新しいデバイス用のドライバを 作成する手順とベストプラクティスをまとめます。


1. 仕組みの概要

センサー統合パッケージは evospikenet.sensor_integration に実装されており、 主な構成要素は以下のとおりです。

  • SensorDriver : 抽象基底クラス。接続/ストリーム/サンプル取得のインターフェースを 定義。
  • SensorManager : ドライバの登録と生成を行うシングルトン・ファクトリ。
  • SensorType : 列挙型。CAMERA, LIDAR, ENVIRONMENTAL, AUDIO, GPIO 等のカテゴリを定義。
  • 各種サンプル/情報 dataclass (SensorInfo, SensorSample) 。

ドライバは SensorManager.register_driver で型名と共に登録され、 アプリケーションは型を指定して作成します。

from evospikenet.sensor_integration import SensorManager, SensorType, SensorInfo

info = SensorInfo(sensor_type=SensorType.CAMERA, name="webcam0")
driver = SensorManager.create_driver(SensorType.CAMERA, info=info, source=0)

登録されたドライバは最後に登録されたものが返る単純なマッピングです。 並列に複数の種類を使いたい場合は別途ファクトリ関数を作るか、 SensorManager.available_types() で登録済みの種類を調べて切り替えます。


2. 新規ドライバの実装手順

  1. evospikenet/sensor_integration/ 配下に新しいモジュールを作成します。 例えば my_sensor.py など。

  2. ファイルの冒頭で必要なクラスをインポート:

    from evospikenet.sensor_integration.device_interface import (
        SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
    )
    

  3. SensorDriver を継承したクラスを定義し、以下の抽象メソッドを実装します。

  4. connect(self) -> bool
  5. disconnect(self) -> None
  6. start_stream(self) -> None
  7. stop_stream(self) -> None
  8. read_sample(self) -> Optional[SensorSample]

既存ドライバを参考にすると実装が楽です。

  1. 必要に応じてコンストラクタ引数を追加し、SensorInfocapabilities で 機能を記録します。

  2. モジュール末尾で SensorManager.register_driver を呼び出して登録します。

    SensorManager.register_driver(SensorType.MY_TYPE, MySensorDriver)
    
    ※ONVIFCamera のように引数依存で登録したくない場合は登録を省略し、 クライアントが自らインスタンス化するスタイルでも構いません。

  3. ドライバ実装に対する単体テストを tests/unit/ に追加します。 モックを使えばハードウェアがなくても検証可能です。

  4. 必要に応じてドキュメント (docs/) に例と説明を追加します。


3. テストを書く

  • 既存の tests/unit/test_usb_camera_driver.pytest_stereo_infrared_onvif_env.py を参考にします。
  • unittest.mock.patch で機器依存ライブラリ(cv2, rplidar など)を 置き換え、正常系・異常系を網羅します。
  • SensorManager.create_driver を使うテストと、直接クラスを 呼び出すテストの両方を用意すると堅牢です。

4. ドライバ登録の管理

同じ SensorType に複数ドライバを登録すると、後から登録したものが 返ってきます。用途によってドライバを使い分けたい場合は、

# 使用時に明示的にクラスを指定してインスタンス化
from evospikenet.sensor_integration import ONVIFCameraDriver, SensorInfo
info = SensorInfo(sensor_type=SensorType.CAMERA, name="ipcam")
driver = ONVIFCameraDriver(info, url="rtsp://...")

とするか、ファクトリ関数を別途用意してください。


5. 例:新規ドライバサンプル

5.1 USBカメラドライバの具体例

USBカメラは最も一般的なセンサーの一つであるため、 ドライバ実装の手順を順を追って説明します。

  1. 新規モジュール作成: evospikenet/sensor_integration/usb_camera.py を作成。
  2. 依存ライブラリのインポート: OpenCV (cv2) を使うので try: import cv2 except ImportError: cv2 = None と記述し、 ライブラリ未インストールでもモジュールが読み込めるようにする。
  3. クラス定義: SensorDriver を継承し、 connect/disconnect/start_stream/stop_stream/read_sample を 実装。connect では cv2.VideoCapture を開き、 read_sample は取得フレームを RGB numpy.ndarray で返す。
  4. 登録: ファイル末尾で SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver) を呼び出してデフォルトドライバとして登録。
  5. テスト: tests/unit/test_usb_camera_driver.py を追加し、 cv2 をモックして各メソッドの動作を検証。
  6. ドキュメント追加: 本ガイドに詳しい手順を書き、 READMEや BRIEF にリンクする。

実際の実装コードは以下のとおり。

# evospikenet/sensor_integration/usb_camera.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager,
)

import time
import numpy as np

try:
    import cv2
except ImportError:
    cv2 = None

class USBCameraDriver(SensorDriver):
    def __init__(self, info: SensorInfo, source: Any = 0):
        super().__init__(info)
        self.source = source
        self._capture = None

    def connect(self) -> bool:
        if cv2 is None:
            raise RuntimeError("OpenCV required")
        self._capture = cv2.VideoCapture(self.source)
        success = self._capture.isOpened()
        self._set_status(SensorStatus.CONNECTED if success else SensorStatus.ERROR)
        return success

    def disconnect(self) -> None:
        if self._capture:
            self._capture.release()
            self._capture = None
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("camera not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self):
        if not self.is_streaming or self._capture is None:
            return None
        ret, frame = self._capture.read()
        if not ret:
            return None
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        self._update_sample_time()
        return SensorSample(timestamp_ns=int(time.time()*1e9), data=frame)

# テスト例
```python
# tests/unit/test_usb_camera_driver.py
import unittest

5.2 GPIOスイッチドライバ例

GPIO やリレーを制御するプラグインも同様の手順で実装できます。

  1. 新規ファイル gpio_switch.py を作成。
  2. RPi.GPIO の有無に依存しないよう try/except でインポート。
  3. SensorDriver を継承し、pin 番号をコンストラクタで受け取る。
  4. write_state メソッドを追加して 0/1 出力を切り替える。
  5. ダミードライバをサブクラス化し、テスト用にログ出力。
  6. 最後に SensorManager.register_driver で登録。
# evospikenet/sensor_integration/gpio_switch.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import time

try:
    import RPi.GPIO as GPIO
except ImportError:
    GPIO = None

class GPIOSwitchDriver(SensorDriver):
    def __init__(self, info: SensorInfo, pin=None):
        super().__init__(info)
        if info.capabilities is None:
            info.capabilities = {}
        self.pin = pin if pin is not None else info.capabilities.get("pin")
        if self.pin is None:
            raise ValueError("pin number must be provided")
        if GPIO and not GPIO.getmode():
            GPIO.setmode(GPIO.BCM)
        self._state = 0

    def connect(self) -> bool:
        if GPIO:
            GPIO.setup(self.pin, GPIO.OUT)
        self._set_status(SensorStatus.CONNECTED)
        return True

    def write_state(self, value: int) -> None:
        if value not in (0,1):
            raise ValueError("state must be 0 or 1")
        self._state = value
        if GPIO:
            GPIO.output(self.pin, value)
        self._update_sample_time()

    # 他メソッドは省略…

SensorManager.register_driver(SensorType.AUDIO, DummyGPIOSwitchDriver)

テスト例は tests/unit/test_gpio_switch.py を参照してください。

5.3 音声入出力ドライバ例

マイクとスピーカは sounddevice を利用するドライバを用意しました。

# evospikenet/sensor_integration/audio_input.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)
import numpy as np, time
try:
    import sounddevice as sd
except ImportError:
    sd = None

class MicrophoneDriver(SensorDriver):
    def __init__(self, info: SensorInfo):
        super().__init__(info)
        if info.capabilities is None:
            info.capabilities = {}
        self.stream = None
        self.buffer = None
    def connect(self):
        if sd is None:
            raise RuntimeError("sounddevice required")
        self.stream = sd.InputStream(samplerate=info.capabilities.get("samplerate",16000), channels=info.capabilities.get("channels",1), callback=self._callback)
        self._set_status(SensorStatus.CONNECTED)
        return True
    # 省略…

SensorManager.register_driver(SensorType.AUDIO, DummyMicrophoneDriver)

スピーカ側は audio_output.py に同様の構造。テストは tests/unit/test_audio_drivers.py を参照。

```from unittest.mock import MagicMock, patch import numpy as np from evospikenet.sensor_integration import USBCameraDriver, SensorInfo, SensorType, SensorStatus

class TestUSBCameraDriver(unittest.TestCase): @patch('evospikenet.sensor_integration.usb_camera.cv2') def test_connect_and_read(self, mock_cv2): frame = np.zeros((10,10,3), dtype=np.uint8) cap = MagicMock() cap.isOpened.return_value = True cap.read.return_value = (True, frame) mock_cv2.VideoCapture.return_value = cap mock_cv2.cvtColor.return_value = frame

    info = SensorInfo(sensor_type=SensorType.CAMERA, name='cam')
    drv = USBCameraDriver(info, source=0)
    self.assertTrue(drv.connect())
    drv.start_stream()
    sample = drv.read_sample()
    self.assertIsNotNone(sample)
    self.assertEqual(sample.data.shape, frame.shape)
    drv.stop_stream()
    drv.disconnect()

python

evospikenet/sensor_integration/temperature_sensor.py

    if not ret:
        return None
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    self._update_sample_time()
    return SensorSample(timestamp_ns=int(time.time()*1e9), data=frame)

SensorManager.register_driver(SensorType.CAMERA, USBCameraDriver) ```

# evospikenet/sensor_integration/temperature_sensor.py
from evospikenet.sensor_integration.device_interface import (
    SensorDriver, SensorInfo, SensorSample, SensorStatus, SensorType, SensorManager
)

class TempSensorDriver(SensorDriver):
    def __init__(self, info: SensorInfo, port: str):
        super().__init__(info)
        self.port = port
        self._conn = None

    def connect(self) -> bool:
        self._conn = open_serial(self.port)  # 仮の関数
        self._set_status(SensorStatus.CONNECTED)
        return True

    def disconnect(self) -> None:
        if self._conn:
            self._conn.close()
            self._conn = None
        self._set_status(SensorStatus.DISCONNECTED)

    def start_stream(self) -> None:
        if not self.is_connected:
            raise RuntimeError("not connected")
        self._set_status(SensorStatus.STREAMING)

    def stop_stream(self) -> None:
        self._set_status(SensorStatus.CONNECTED)

    def read_sample(self):
        raw = self._conn.read_line()
        temp = float(raw)
        self._update_sample_time()
        return SensorSample(timestamp_ns=int(time.time()*1e9), data={"temp": temp})

SensorManager.register_driver(SensorType.ENVIRONMENTAL, TempSensorDriver)

6. ドキュメントへのリンク

このガイドの存在を明示するため、docs/NEUROSCIENCE_BRAIN_SIMULATION_BRIEF.md の「センサー接続プラグイン」セクション末尾にリンクを追加します。

詳細なドライバ開発手順は [センサー ドライバ開発ガイド](SENSOR_DRIVER_DEVELOPMENT.md) を参照。

7. ベストプラクティス/注意点

  • ハードウェア特有のライブラリはオプション依存にして、インポート時に 失敗してもモジュールが読み込めるようにする
  • SensorInfo.capabilities にデバイス固有設定を保持しておくと拡張が楽
  • 実機テストは docker/container 環境では難しいため、CI ではモックを 用いたユニットテストでカバーする
  • ドライバは状態遷移をきっちりロギングすると、デバッグが容易

このガイドを基に、LiDAR、ステレオカメラ、赤外線カメラ、ONVIF/IPカメラ、 環境センサーのどれでも新規ドライバが作れるようになります。その他の センサー種別にも同様の流れで対応可能です。