Skip to content

Event Subscription Documentation

This document describes the event subscription system in the CELINE Digital Twin runtime.

The subscription system enables DT apps and external handlers to receive and react to events published via the broker.


Overview

The subscription infrastructure provides:

  • SubscriptionService for managing event subscriptions
  • MQTT subscriber with automatic token refresh
  • Concurrent event dispatch to handlers
  • Decorator-based and programmatic registration
  • YAML configuration for static subscriptions

Quick Start

1. Configure subscriptions (YAML)

Create or edit config/subscriptions.yaml:

subscriptions:
  - id: log-ev-charging
    topics:
      - "dt/ev-charging/#"
    handler: "celine.dt.handlers.examples:log_ev_charging_readiness"
    enabled: true

2. Create a handler

# celine/dt/handlers/my_handlers.py
from celine.dt.contracts.events import DTEvent
from celine.dt.contracts.subscription import EventContext

async def log_ev_charging_readiness(event: DTEvent, context: EventContext) -> None:
    print(f"Received {event.type} on {context.topic}")

3. Or use the decorator

from celine.dt.core.subscription import subscribe

@subscribe("dt/ev-charging/+/readiness")
async def handle_readiness(event: DTEvent, context: EventContext) -> None:
    print(f"EV Charging indicator: {event.payload.indicator}")

4. Or subscribe programmatically

# At runtime
sub_id = await dt.subscribe(
    topics=["dt/alerts/#"],
    handler=my_alert_handler,
)

# Later, unsubscribe
await dt.unsubscribe(sub_id)

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                         DT Runtime                               │
│                                                                  │
│  ┌────────────────────────────────────────────────────────────┐ │
│  │                  SubscriptionService                        │ │
│  │  ┌─────────────────┐    ┌─────────────────────────────────┐│ │
│  │  │SubscriptionReg. │    │     EventDispatcher             ││ │
│  │  │                 │    │  - Concurrent handler dispatch  ││ │
│  │  │ - Topic patterns│◄───│  - Error isolation              ││ │
│  │  │ - Handlers      │    │  - Metrics                      ││ │
│  │  └─────────────────┘    └─────────────────────────────────┘│ │
│  └────────────────────────────────────────────────────────────┘ │
│                              ▲                                   │
│                              │                                   │
│  ┌───────────────────────────┴────────────────────────────────┐ │
│  │                    MqttSubscriber                           │ │
│  │  - JWT token refresh                                        │ │
│  │  - Automatic reconnection                                   │ │
│  │  - Topic management                                         │ │
│  └─────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌──────────────────┐
                    │   MQTT Broker    │
                    │   (Mosquitto)    │
                    └──────────────────┘

Configuration

Application Settings

In .env or environment:

# Enable/disable subscriptions
SUBSCRIPTIONS_ENABLED=true

# Maximum concurrent handler invocations
SUBSCRIPTIONS_MAX_CONCURRENT=100

Subscriptions YAML

Location: config/subscriptions.yaml

subscriptions:
  # Simple handler
  - id: log-all-events
    topics:
      - "dt/#"
    handler: "mymodule:log_event"
    enabled: true

  # Multiple topics
  - id: multi-topic-handler
    topics:
      - "dt/ev-charging/+/readiness"
      - "dt/pv-forecast/+/updated"
    handler: "mymodule:handle_energy_events"
    enabled: true
    metadata:
      description: "Handles energy-related events"
      owner: "energy-team"

Topic Patterns

Subscriptions support MQTT-style topic wildcards:

Wildcard Description Example
+ Matches exactly one level dt/ev-charging/+/readiness matches dt/ev-charging/rec-folgaria/readiness
# Matches zero or more levels dt/ev-charging/# matches all under dt/ev-charging/

Handler Contract

Handlers must be async functions with this signature:

async def my_handler(event: DTEvent, context: EventContext) -> None:
    pass

EventContext

@dataclass
class EventContext:
    topic: str           # Actual topic (after wildcard resolution)
    broker_name: str     # Which broker delivered this
    received_at: datetime
    message_id: str | None
    raw_payload: bytes | None

Registration Methods

1. Decorator

@subscribe("dt/ev-charging/#")
async def handle_ev_events(event: DTEvent, context: EventContext) -> None:
    print(f"Received: {event.type}")

2. YAML Configuration

subscriptions:
  - id: my-handler
    topics: ["dt/ev-charging/#"]
    handler: "mymodule.handlers:my_handler"

3. Programmatic

sub_id = await dt.subscribe(
    topics=["dt/alerts/#"],
    handler=alert_handler,
)
await dt.unsubscribe(sub_id)

Token Refresh

When using JWT authentication, the subscriber automatically refreshes tokens at 80% of lifetime and reconnects seamlessly.


Error Handling

Errors in handlers are logged but don't affect other handlers. Each handler runs in isolation.


API Endpoints

List Subscriptions

GET /subscriptions

Returns:

{
  "subscriptions": [
    {
      "id": "my-handler",
      "topics": ["dt/ev-charging/#"],
      "enabled": true
    }
  ],
  "stats": {
    "running": true,
    "subscription_count": 3,
    "dispatch_count": 1234,
    "dispatch_errors": 2
  }
}