Skip to content

Event Brokers

This document describes the event broker system in the CELINE Digital Twin runtime.

The broker system enables DT apps to publish computed events to external systems (e.g., MQTT brokers, message queues) and subscribe to incoming events for real-time integration.


Overview

The broker infrastructure provides:

  • Unified Broker protocol for both publishing and subscribing
  • MQTT implementation for IoT/edge scenarios
  • Token provider integration for JWT authentication
  • Automatic reconnection and token refresh
  • Configuration-driven broker setup

Quick Start

1. Configure the broker

Create or edit config/brokers.yaml:

brokers:
  mqtt_local:
    class: celine.dt.core.broker.mqtt:MqttBroker
    enabled: true
    config:
      host: localhost
      port: 1883
      topic_prefix: "celine/dt/"

default_broker: mqtt_local

2. Publish events from an app

from celine.dt.contracts.app import DTApp
from celine.dt.core.context import RunContext

class MyApp(DTApp[MyConfig, MyResult]):
    async def run(self, config: MyConfig, context: RunContext) -> MyResult:
        result = await self._compute(config)

        # Publish event if broker is available
        if context.has_broker():
            event = create_my_event(
                community_id=config.community_id,
                indicator=result.indicator,
            )
            await context.publish_event(event)

        return result

3. Subscribe to events

from celine.dt.core.broker import MqttBroker, MqttConfig, ReceivedMessage

async def main():
    broker = MqttBroker(MqttConfig(host="localhost"))

    async def handle_message(msg: ReceivedMessage):
        print(f"Received on {msg.topic}: {msg.payload}")

    async with broker:
        # Subscribe
        result = await broker.subscribe(
            topics=["dt/my-module/#"],
            handler=handle_message,
        )

        # Keep running to receive messages
        await asyncio.sleep(3600)

        # Cleanup
        await broker.unsubscribe(result.subscription_id)

Architecture

The broker provides a unified interface for both publishing and subscribing:

┌─────────────────────────────────────────────────────────────────┐
│                         DT Runtime                               │
│                                                                  │
│  ┌─────────────┐        ┌─────────────────────────────────────┐ │
│  │   DT App    │───────▶│           MqttBroker                │ │
│  └─────────────┘        │                                     │ │
│                         │  ┌─────────────┐  ┌──────────────┐  │ │
│  ┌─────────────┐        │  │  publish()  │  │ subscribe()  │  │ │
│  │  Handlers   │◀───────│  └─────────────┘  └──────────────┘  │ │
│  └─────────────┘        │                                     │ │
│                         │  • Token refresh (auto)             │ │
│                         │  • Reconnection (auto)              │ │
│                         │  • TLS support                      │ │
│                         └──────────────┬──────────────────────┘ │
└────────────────────────────────────────┼────────────────────────┘
                                         │
                                         ▼
                              ┌──────────────────┐
                              │  MQTT Broker     │
                              │  (Mosquitto)     │
                              └──────────────────┘

Configuration

Broker Configuration File

Location: config/brokers.yaml

brokers:
  mqtt_local:
    class: celine.dt.core.broker.mqtt:MqttBroker
    enabled: true
    config:
      host: "${MQTT_HOST:-localhost}"
      port: ${MQTT_PORT:-1883}
      topic_prefix: "celine/dt/"
      keepalive: 60
      clean_session: true

  mqtt_secure:
    class: celine.dt.core.broker.mqtt:MqttBroker
    enabled: false
    config:
      host: secure-broker.example.com
      port: 8883
      use_tls: true
      ca_certs: /etc/ssl/certs/ca-certificates.crt

default_broker: mqtt_local

MqttConfig Reference

Field Type Default Description
host str localhost MQTT broker hostname
port int 1883 MQTT broker port
client_id str auto Unique client identifier
username str None Authentication username (ignored if token_provider set)
password str None Authentication password (ignored if token_provider set)
use_tls bool false Enable TLS encryption
ca_certs str None Path to CA certificate file
certfile str None Path to client certificate
keyfile str None Path to client private key
keepalive int 60 Keepalive interval (seconds)
clean_session bool true Start with clean session
reconnect_interval float 5.0 Seconds between reconnect attempts
topic_prefix str "" Prefix for all topics
token_refresh_margin float 30.0 Seconds before token expiry to refresh

Environment Variables

Variable Description Default
MQTT_HOST MQTT broker hostname localhost
MQTT_PORT MQTT broker port 1883
MQTT_USERNAME Authentication username (none)
MQTT_PASSWORD Authentication password (none)

Authentication

The broker supports two authentication methods:

1. Username/Password

Static credentials configured in brokers.yaml:

brokers:
  mqtt_local:
    class: celine.dt.core.broker.mqtt:MqttBroker
    config:
      host: broker.example.com
      username: "${MQTT_USERNAME}"
      password: "${MQTT_PASSWORD}"

2. JWT via TokenProvider

For OIDC/OAuth2 authentication, pass a TokenProvider to the broker:

from celine.sdk.auth.oidc import OidcClientCredentialsProvider
from celine.dt.core.broker import MqttBroker, MqttConfig

# Create token provider
token_provider = OidcClientCredentialsProvider(
    base_url="https://keycloak.example.com/realms/celine",
    client_id="dt-service",
    client_secret="secret",
    scope="openid",
)

# Create broker with JWT auth
broker = MqttBroker(
    config=MqttConfig(
        host="secure-broker.example.com",
        port=8883,
        use_tls=True,
    ),
    token_provider=token_provider,
)

async with broker:
    # Both publish and subscribe use JWT auth
    await broker.publish(message)
    await broker.subscribe(topics, handler)

When a token_provider is configured: - Username is set to "jwt" (convention for JWT auth in Mosquitto) - Password is set to the access token - Tokens are automatically refreshed before expiry - The broker reconnects seamlessly with new credentials

See Token Providers for more details.


Publishing Events

The simplest way to publish events from an app:

from celine.dt.contracts.broker import QoS

class MyApp(DTApp[MyConfig, MyResult]):
    async def run(self, config: MyConfig, context: RunContext) -> MyResult:
        result = await self._compute(config)

        if context.has_broker():
            event = create_my_event(...)

            # Simple publish
            await context.publish_event(event)

            # With explicit QoS
            await context.broker.publish_event(
                event,
                qos=QoS.EXACTLY_ONCE,
                retain=True,
            )

        return result

Using MqttBroker Directly

For standalone usage:

from celine.dt.core.broker import MqttBroker, MqttConfig, BrokerMessage, QoS

async def standalone_publish():
    broker = MqttBroker(MqttConfig(
        host="localhost",
        topic_prefix="celine/dt/",
    ))

    async with broker:
        result = await broker.publish(BrokerMessage(
            topic="events/my-event",
            payload={"indicator": "OPTIMAL"},
            qos=QoS.AT_LEAST_ONCE,
        ))

        if result.success:
            print(f"Published: {result.message_id}")

QoS Levels

Level Name Description
0 AT_MOST_ONCE Fire and forget
1 AT_LEAST_ONCE Acknowledged delivery (default)
2 EXACTLY_ONCE Guaranteed single delivery

Subscribing to Events

Using the Broker Directly

from celine.dt.core.broker import MqttBroker, MqttConfig, ReceivedMessage

async def main():
    broker = MqttBroker(MqttConfig(host="localhost"))

    # Define handler
    async def handle_alerts(msg: ReceivedMessage):
        print(f"Alert on {msg.topic}")
        print(f"Payload: {msg.payload}")
        print(f"Received at: {msg.timestamp}")

    async with broker:
        # Subscribe to multiple topic patterns
        result = await broker.subscribe(
            topics=["dt/alerts/#", "dt/errors/+/critical"],
            handler=handle_alerts,
            qos=QoS.AT_LEAST_ONCE,
        )

        print(f"Subscribed with ID: {result.subscription_id}")

        # Do other work while receiving messages...
        await asyncio.sleep(3600)

        # Unsubscribe when done
        await broker.unsubscribe(result.subscription_id)

Multiple Subscriptions

async with broker:
    # Each subscription gets its own handler
    alerts_sub = await broker.subscribe(
        topics=["dt/alerts/#"],
        handler=handle_alerts,
    )

    metrics_sub = await broker.subscribe(
        topics=["dt/metrics/#"],
        handler=handle_metrics,
    )

    # Unsubscribe individually
    await broker.unsubscribe(alerts_sub.subscription_id)
    await broker.unsubscribe(metrics_sub.subscription_id)

Topic Wildcards

Topic patterns follow MQTT conventions: - + matches exactly one level: dt/module/+/event matches dt/module/foo/event - # matches zero or more levels (must be last): dt/module/# matches all under dt/module/

ReceivedMessage

Handlers receive a ReceivedMessage with:

@dataclass(frozen=True)
class ReceivedMessage:
    topic: str              # Topic the message arrived on
    payload: dict[str, Any] # Parsed JSON payload
    raw_payload: bytes      # Original message bytes
    qos: QoS                # QoS level of delivery
    message_id: str | None  # Broker message ID
    timestamp: datetime     # When received

Token Providers

Token providers enable OAuth2/OIDC authentication for brokers and other services.

TokenProvider Protocol

from abc import ABC, abstractmethod
from celine.sdk.auth.models import AccessToken

class TokenProvider(ABC):
    @abstractmethod
    async def get_token(self) -> AccessToken:
        """
        Return a valid access token.
        Implementations must refresh or re-authenticate if needed.
        """
        ...

Built-in: OidcClientCredentialsProvider

For service-to-service authentication using OAuth2 client credentials flow:

from celine.sdk.auth.oidc import OidcClientCredentialsProvider

provider = OidcClientCredentialsProvider(
    base_url="https://keycloak.example.com/realms/celine",
    client_id="dt-service",
    client_secret="secret",
    scope="openid profile",
    timeout=10.0,
)

# Get a token (automatically handles refresh)
token = await provider.get_token()
print(f"Access token: {token.access_token}")
print(f"Expires at: {token.expires_at}")

Configuration

Configure OIDC via environment variables:

Variable Description
OIDC_BASE_URL OIDC issuer URL
OIDC_CLIENT_ID Client ID
OIDC_CLIENT_SECRET Client secret

Custom Token Provider

from celine.sdk.auth.provider import TokenProvider
from celine.sdk.auth.models import AccessToken

class MyTokenProvider(TokenProvider):
    def __init__(self, api_key: str):
        self._api_key = api_key

    async def get_token(self) -> AccessToken:
        return AccessToken(
            access_token=self._api_key,
            expires_at=float('inf'),
        )

Event Schemas

Events follow a common envelope pattern with type-specific payloads.

Base Event Structure

{
  "@context": "https://celine-project.eu/contexts/dt-event.jsonld",
  "@type": "dt.my-module.my-event",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "source": {
    "app_key": "my-module.my-app",
    "app_version": "1.0.0",
    "module": "my-module"
  },
  "timestamp": "2025-01-24T10:30:00Z",
  "correlation_id": "req-12345",
  "payload": { ... },
  "metadata": {}
}

Core vs Module Events

The celine.dt.contracts.events module provides generic events:

Type Description
dt.app.execution-started App execution began
dt.app.execution-completed App execution finished
dt.app.execution-failed App execution failed
dt.alert.raised Alert triggered

Module-specific events belong in the module, not in contracts:

# ✅ Correct - import from the module
from celine.dt.modules.ev_charging.events import create_ev_charging_readiness_event

# ❌ Wrong - don't import module events from contracts
from celine.dt.contracts.events import create_ev_charging_event

Topic Conventions

Events are published to topics following this convention:

{prefix}/dt/{module}/{event-type}/{entity-id}

Examples: - celine/dt/ev-charging/readiness-computed/rec-folgaria - celine/dt/app/execution-completed/my-app - celine/dt/alert/raised/threshold-001


Creating Custom Brokers

To implement a custom broker (e.g., Kafka, RabbitMQ):

from celine.dt.contracts.broker import (
    BrokerBase,
    BrokerMessage,
    MessageHandler,
    PublishResult,
    QoS,
    ReceivedMessage,
    SubscribeResult,
)

class KafkaBroker(BrokerBase):
    def __init__(self, bootstrap_servers: str, **kwargs):
        self._servers = bootstrap_servers
        self._producer = None
        self._consumer = None
        self._subscriptions = {}

    async def connect(self) -> None:
        # Initialize Kafka producer and consumer
        pass

    async def disconnect(self) -> None:
        # Close connections
        pass

    async def publish(self, message: BrokerMessage) -> PublishResult:
        # Publish to Kafka topic
        pass

    async def subscribe(
        self,
        topics: list[str],
        handler: MessageHandler,
        qos: QoS = QoS.AT_LEAST_ONCE,
    ) -> SubscribeResult:
        # Subscribe to Kafka topics
        pass

    async def unsubscribe(self, subscription_id: str) -> bool:
        # Remove subscription
        pass

    @property
    def is_connected(self) -> bool:
        return self._producer is not None

Register in config/brokers.yaml:

brokers:
  kafka:
    class: mymodule.kafka:KafkaBroker
    config:
      bootstrap_servers: "kafka:9092"

Testing

Running Mosquitto Locally

docker run -d \
  --name mosquitto \
  -p 1883:1883 \
  eclipse-mosquitto

CLI Testing

# Subscribe
mosquitto_sub -h localhost -t "celine/dt/#" -v

# Publish
mosquitto_pub -h localhost \
  -t "celine/dt/test/event/entity-1" \
  -m '{"value": 42}'

Unit Testing

import pytest
from unittest.mock import AsyncMock
from celine.dt.contracts.broker import ReceivedMessage, QoS

@pytest.mark.asyncio
async def test_message_handler():
    msg = ReceivedMessage(
        topic="dt/test/event",
        payload={"value": 42},
        raw_payload=b'{"value": 42}',
        qos=QoS.AT_LEAST_ONCE,
    )

    handler = AsyncMock()
    await handler(msg)

    handler.assert_called_once_with(msg)

Next Steps

  • Apps - Build Digital Twin applications
  • Components - Build reusable computation units
  • Clients - Configure data clients
  • Values API - Configure data fetchers