コンテンツにスキップ

AEG / AEG-Comm 実装概要

[!NOTE] 最新の実装状況は 機能実装ステータス (Remaining Functionality) を参照してください。

対象読者: エネルギー制御ゲーティングと分散通信の実装/レビューを行うエンジニア。参考になるコード: control.pyaeg_comm.py

本ドキュメントでは、AEGとAEG-Commの実装背景と構成要素を、概念、処理フロー、データ構造、統合手順の順に説明します。各図には役割と読み方を明記し、設計意図を追えるようにしています。

1. コンセプト

  • AEG (Activity-driven Energy Gating): ニューロンごとにエネルギーを保持し、スパイクで消費。エネルギーが閾値以下ならゲートし、報酬での補給や省電力適応をサポート。
  • AEG-Comm: AEGをノード間メッセージングに拡張し、3層セーフティ(エネルギーゲート、クリティカル優先、タイムスタンプ保証)に加えてバッファ/バッチ送信を実装。
  • トランスポート: ZenohCommunicator による Zenoh Pub/Sub(圧縮、任意の暗号化、リトライ、性能統計)。

概要説明: AEGはローカル計算負荷をエネルギー概念で管理し、過剰発火を抑える。一方AEG-Commは、通信の冗長性と安全性を両立するために3層セーフティで送信を選別し、必要時のみZenohへ流す。

2. コンポーネントの役割

  • AEG(ローカルゲート): エネルギー管理、負荷応答型消費、任意の適応閾値、報酬補給、電力統計。
  • AEGCommGate: AEG をラップして送信可否を判定。モダリティ別セーフティ、バッファリング、バッチ、テレメトリを付与。
  • ZenohCommunicator: スパイクパケットをPublish。圧縮、暗号化フック(MT25-EV015)、リトライとACKをサポート。

詳細説明: - AEGupdate() でスパイクごとの消費量を算出し、閾値を下回るニューロンをマスクする。enable_power_optimization が有効なら負荷に応じて消費率を自動調整し、adaptive_threshold が有効なら負荷履歴から閾値を動的に更新する。 - AEGCommGate はローカルでゲートされたスパイクを受け取り、3層セーフティで送信可否を決める。クリティカル条件やタイムスタンプ制約を優先し、送れない場合はローカルバッファに積んで後でバッチ送信する。 - ZenohCommunicator は圧縮・暗号化・再送・ACK・再接続を備えたPub/Subレイヤ。publish_spikes でスパイクとメタデータをトピックに送信し、性能統計を内部で追跡する。

3. ハイレベルアーキテクチャ

graph LR
    subgraph ローカルノード
        S[スパイクテンソル]
    I[重要度<br/>メタ・コンテキスト]
        S -->|spikes| G[AEG.update]
        I --> G
    G -->|ゲート済みスパイク| C{AEG-Comm<br/>3層セーフティ}
        C -->|allow| TX[Zenoh publish_spikes]
    C -->|block| B[ローカルバッファ<br/>バッチフラッシュ]
    end
    TX --> Net[Zenoh topics]
    Net --> Remote[リモートモジュール/ノード]

図の説明: 左のローカルノードでスパイクと重要度が AEG.update に入り、ゲート済みスパイクが3層セーフティ判定へ進む。許可されたものはZenoh経由でネットワークにPublishされ、拒否されたものはローカルバッファに蓄積されバッチ送信で後追いする。これにより通信量を抑えつつ重要データを確実に送る設計を示す。

4. AEG-Comm 3層セーフティ(動作)

flowchart TD
    P[SpikePacket
modality / data / metadata / ts] --> L1{Layer 1:
エネルギーゲート}
    L1 -->|energy ok| L2{Layer 2:
クリティカル優先?}
    L1 -->|low energy| BUF[バッファ]
    L2 -->|critical| SEND[即時送信]
    L2 -->|normal| L3{Layer 3:
タイムスタンプ期限超過?}
    L3 -->|> max_interval| SEND
    L3 -->|within window| BUF
    BUF --> FLUSH[定期バッチフラッシュ]
    SEND --> PUB[Zenoh publish_spikes]
- Layer 1: 消費推定 = consumption_rate * sum(spikes) * importance; 平均エネルギーが threshold + estimated_consumption を超えるときのみ送信。 - Layer 2(クリティカル優先): モダリティが critical_modalities(デフォルト forcesafety)、力変化が force_change_threshold 超え、テキスト内の緊急キーワード、emergency / safety_interrupt フラグのいずれかで常に送信。 - Layer 3(タイムスタンプ保証): モダリティごとに最後の送信から max_interval_ms(デフォルト50ms)を超えたら強制送信。 - バッファ: 未送信パケットをキャッシュ(デフォルト1000件)。spikes_batch/{target} で定期的にバッチ送信(デフォルト1秒)。

図の説明: パケットはまずエネルギー判定(Layer 1)を受け、不足時はバッファへ。クリティカル条件(Layer 2)に該当すれば即時送信。通常パケットはタイムスタンプ保証(Layer 3)により一定間隔で強制送信され、間隔内ならバッファに留められる。バッファは定期的にバッチ送信され、送信遅延と帯域の両方を最適化する。

5. データ構造とデフォルト

  • SpikePacket: {timestamp_ns, modality, data (tensor), metadata}
  • LocalSpikeBuffer(max_size=1000): FIFOキャッシュ。spikes_batch/{target} にバッチフラッシュ。
  • AEG デフォルト: initial_energy=255.0, threshold=10.0, consumption_rate=1.0, supply_rate=5.0, adaptive_threshold=False, enable_power_optimization 任意。
  • AEGCommGate デフォルト: critical_modalities=["force","safety"], emergency_keywords=["ストップ","痛い","危険","止まれ","stop","pain","danger","halt"], force_change_threshold=10.0, max_interval_ms=50.0, batch_interval_s=1.0

6. シーケンス(送信経路)

sequenceDiagram
    participant Src as ローカルモジュール
    participant AEG as AEG.update()
    participant Gate as AEGCommGate
    participant Zen as ZenohCommunicator
    participant Rmt as リモートノード

    Src->>AEG: spikes, importance
    AEG-->>Src: gated_spikes
    Src->>Gate: gated_spikes, modality, metadata
    Gate->>Gate: _should_send (energy, criticality, timestamp)
    alt should_send
        Gate->>Zen: publish_spikes(target, data, metadata)
    else buffer
        Gate->>Gate: cache(packet)
    end
    Zen-->>Rmt: spikes/{target}

図の説明: ローカルモジュールがスパイクと重要度を AEG.update に渡し、ゲート済みスパイクを AEGCommGate へ供給。AEGCommGate は3層セーフティ判定を行い、送信可なら ZenohCommunicator にPublish依頼し、不可ならバッファ。Zenohがリモートノードにスパイクを届け、上流/下流のモジュール連携が成立する。

7. 統合ポイント

  • モデル側ゲート: forward中に AEG.update(spikes, importance) を呼び、エネルギー > threshold でマスク。
  • 通信側ゲート: 出力を AEGCommGate(..., communicator=ZenohCommunicator(...)) でラップし、モダリティ/メタデータを渡して優先ロジックを効かせる。
  • 報酬補給: AEG.supply(reward) または AEGCommGate.supply(reward) でエネルギー補給。
  • 統計: AEGCommGate.get_stats() で送信/バッファ/クリティカル率と削減率、AEG.get_power_statistics() でエネルギー/効率メトリクスを取得。

8. セーフティと信頼性の注意

  • クリティカル経路はエネルギーゲート(Layer 2)とタイムスタンプガード(Layer 3)で新鮮性を担保しつつバイパス送信。
  • ローカルバッファで低エネルギー時のロスを防ぎ、バッチ送信で帯域を抑制。
  • Zenoh層は圧縮・任意暗号化・リトライ・ACK・再接続ロジックを持ち、配送の信頼性を確保。

9. クイックスタート(接続例)

1) コミュニケータ生成: comm = ZenohCommunicator(node_id="pfc-0", config=ZenohConfig(...))。 2) ラップ: gate = AEGCommGate(num_neurons=..., communicator=comm, target_node="pfc")。 3) forward内:

importance = ...  # shape broadcastable to spikes
spikes_out = gate(spikes, importance, modality="vision", metadata={"text": "stop"})
4) 強化学習ループなどから任意で gate.supply(reward) を呼ぶ。

10. トレーサビリティ

  • コアゲート処理: control.py
  • 通信セーフティラッパー: aeg_comm.py
  • トランスポート/ランタイム: zenoh_comm.py
  • テスト: test_aeg_comm.py, test_control.py