Broker
The celine.sdk.broker module provides an MQTT client abstraction for CELINE services.
Overview
The MQTT client wraps aiomqtt (or paho-mqtt) with:
- Automatic reconnection on connection loss
- JWT-based authentication (token refreshed before each reconnect)
- Structured subscription handling
- Connection statistics
Usage
from celine.sdk.broker import MqttBroker
from celine.sdk.settings import MqttSettings
settings = MqttSettings()
broker = MqttBroker(settings, token_provider=provider)
async with broker.connect() as client:
# Publish
await client.publish("celine/events/community/COMM1", payload=json_bytes)
# Subscribe
async with client.subscribe("celine/events/#") as messages:
async for message in messages:
print(message.topic, message.payload)
Contracts
The contracts.py module defines the BrokerProtocol interface:
class BrokerProtocol(Protocol):
async def publish(self, topic: str, payload: bytes, qos: int = 1) -> None: ...
async def subscribe(self, topic: str) -> AsyncContextManager: ...
Authentication
The broker uses JWT tokens for MQTT authentication: - Username is set to the OIDC client ID - Password is set to the current access token - On reconnect, a fresh token is fetched from the OIDC provider
Models
class MqttMessage:
topic: str
payload: bytes
qos: int
retain: bool
Stats
The broker tracks connection statistics:
stats = broker.stats()
# stats.connected: bool
# stats.reconnect_count: int
# stats.messages_sent: int
# stats.messages_received: int
Configuration
| Variable | Description | Default |
|---|---|---|
MQTT_HOST |
Broker hostname | localhost |
MQTT_PORT |
Broker port | 1883 |
MQTT_TLS |
Enable TLS | false |
MQTT_CLIENT_ID |
MQTT client identifier | auto-generated |