Developer Guide
This guide shows you how to build Apps, Components, and Simulations for the CELINE Digital Twin. It's organized as a linear tutorial—work through it from start to finish.
Prerequisites
- Python 3.11+
- Understanding of Concepts
- Familiarity with Pydantic models
Part 1: Creating an App
Apps are external-facing operations exposed via /apps API.
Step 1: Define Models
Create Pydantic models for input and output:
# my_module/models.py
from pydantic import BaseModel, Field
class GreetingConfig(BaseModel):
"""Input configuration for the greeting app."""
name: str = Field(..., description="Name to greet")
formal: bool = Field(default=False, description="Use formal greeting")
class GreetingResult(BaseModel):
"""Output from the greeting app."""
message: str
timestamp: str
Step 2: Implement the App
Apps implement the DTApp protocol:
# my_module/apps/greeting.py
from datetime import datetime
from celine.dt.contracts.app import DTApp
from celine.dt.core.context import RunContext
from ..models import GreetingConfig, GreetingResult
class GreetingApp(DTApp[GreetingConfig, GreetingResult]):
"""A simple greeting app."""
key = "my-module.greeting"
version = "1.0.0"
config_type = GreetingConfig
result_type = GreetingResult
input_mapper = None # Optional: transform API input
output_mapper = None # Optional: transform API output
async def run(
self,
config: GreetingConfig,
context: RunContext,
) -> GreetingResult:
# Domain logic only - no HTTP, no database imports
if config.formal:
message = f"Good day, {config.name}."
else:
message = f"Hello, {config.name}!"
return GreetingResult(
message=message,
timestamp=context.now.isoformat(),
)
Step 3: Register in Module
# my_module/module.py
from celine.dt.core.registry import DTRegistry
from .apps.greeting import GreetingApp
class MyModule:
name = "my-module"
version = "1.0.0"
def register(self, registry: DTRegistry) -> None:
registry.register_app(GreetingApp())
module = MyModule()
Step 4: Configure
# config/modules.yaml
modules:
- name: my-module
version: ">=1.0.0"
import: celine.dt.modules.my_module.module:module
enabled: true
Step 5: Test
# tests/test_greeting.py
import pytest
from celine.dt.core.registry import DTRegistry
from celine.dt.core.runner import DTAppRunner
from celine.dt.core.context import RunContext
from celine.dt.modules.my_module.apps.greeting import GreetingApp
@pytest.mark.asyncio
async def test_greeting_informal():
registry = DTRegistry()
registry.register_app(GreetingApp())
runner = DTAppRunner()
context = RunContext.create(datasets=None, state=None, token_provider=None)
result = await runner.run(
registry=registry,
app_key="my-module.greeting",
payload={"name": "World"},
context=context,
)
assert result["message"] == "Hello, World!"
Step 6: Use via API
# List apps
curl http://localhost:8000/apps
# Describe app
curl http://localhost:8000/apps/my-module.greeting/describe
# Run app
curl -X POST http://localhost:8000/apps/my-module.greeting/run \
-H "Content-Type: application/json" \
-d '{"name": "CELINE", "formal": true}'
Part 2: Creating a Component
Components are pure, internal computation units.
Step 1: Define Models
# my_module/models.py
from pydantic import BaseModel
class EnergyBalanceInput(BaseModel):
"""Input for energy balance calculation."""
generation_kwh: list[float]
consumption_kwh: list[float]
class EnergyBalanceOutput(BaseModel):
"""Output from energy balance calculation."""
self_consumption_ratio: float
self_sufficiency_ratio: float
grid_import_kwh: float
grid_export_kwh: float
Step 2: Implement the Component
Components implement the DTComponent protocol:
# my_module/components/energy_balance.py
from celine.dt.contracts.component import DTComponent
from ..models import EnergyBalanceInput, EnergyBalanceOutput
class EnergyBalanceComponent(DTComponent[EnergyBalanceInput, EnergyBalanceOutput]):
"""Pure energy balance calculation."""
key = "my-module.energy-balance"
version = "1.0.0"
input_type = EnergyBalanceInput
output_type = EnergyBalanceOutput
async def compute(
self,
input: EnergyBalanceInput,
context, # RunContext - available but optional for pure components
) -> EnergyBalanceOutput:
# Pure computation - no side effects
total_gen = sum(input.generation_kwh)
total_cons = sum(input.consumption_kwh)
# Simple model: self-consumed = min(gen, cons) at each timestep
self_consumed = sum(
min(g, c) for g, c in zip(input.generation_kwh, input.consumption_kwh)
)
grid_import = total_cons - self_consumed
grid_export = total_gen - self_consumed
return EnergyBalanceOutput(
self_consumption_ratio=self_consumed / total_gen if total_gen > 0 else 0,
self_sufficiency_ratio=self_consumed / total_cons if total_cons > 0 else 0,
grid_import_kwh=grid_import,
grid_export_kwh=grid_export,
)
Step 3: Register in Module
# my_module/module.py
from celine.dt.core.registry import DTRegistry
from .apps.greeting import GreetingApp
from .components.energy_balance import EnergyBalanceComponent
class MyModule:
name = "my-module"
version = "1.0.0"
def register(self, registry: DTRegistry) -> None:
registry.register_app(GreetingApp())
registry.register_component(EnergyBalanceComponent())
module = MyModule()
Step 4: Use in Apps or Simulations
# Inside an app or simulation
component = context.get_component("my-module.energy-balance")
result = await component.compute(
EnergyBalanceInput(generation_kwh=[...], consumption_kwh=[...]),
context,
)
Part 3: Creating a Simulation
Simulations enable what-if exploration with a two-phase execution model.
Step 1: Define Models
Four models are required:
# my_module/models.py
from datetime import datetime
from pydantic import BaseModel, Field
# 1. Scenario Configuration - what context to build
class RECScenarioConfig(BaseModel):
"""Configuration for building a REC planning scenario."""
community_id: str = Field(..., description="REC identifier")
reference_start: datetime
reference_end: datetime
resolution: str = Field(default="1h")
# 2. Scenario - the built, immutable context
class RECScenario(BaseModel):
"""Built scenario with cached data and baseline metrics."""
scenario_id: str = ""
community_id: str
reference_start: datetime
reference_end: datetime
baseline_consumption_kwh: float
baseline_generation_kwh: float
baseline_self_consumption_ratio: float
# 3. Parameters - what-if variables
class RECParameters(BaseModel):
"""Parameters for what-if exploration."""
pv_kwp: float = Field(default=0.0, ge=0.0, description="Additional PV capacity")
battery_kwh: float = Field(default=0.0, ge=0.0, description="Battery capacity")
electricity_price: float = Field(default=0.25, ge=0.0)
# 4. Result - simulation output
class RECResult(BaseModel):
"""Result of a REC planning simulation."""
self_consumption_ratio: float
self_sufficiency_ratio: float
npv_eur: float
payback_years: float | None
Step 2: Implement the Simulation
Simulations implement the DTSimulation protocol:
# my_module/simulations/rec_planning.py
from celine.dt.contracts.simulation import DTSimulation
from ..models import RECScenarioConfig, RECScenario, RECParameters, RECResult
class RECPlanningSimulation(DTSimulation[RECScenarioConfig, RECScenario, RECParameters, RECResult]):
"""REC planning what-if simulation."""
key = "my-module.rec-planning"
version = "1.0.0"
scenario_config_type = RECScenarioConfig
scenario_type = RECScenario
parameters_type = RECParameters
result_type = RECResult
async def build_scenario(
self,
config: RECScenarioConfig,
workspace, # FileWorkspace for storing artifacts
context, # RunContext for data fetching
) -> RECScenario:
"""
Build scenario: EXPENSIVE operation.
- Fetch historical data
- Compute baseline metrics
- Store intermediate artifacts
"""
# Fetch consumption data via values
consumption_data = await context.values.fetch(
"consumption_timeseries",
{
"community_id": config.community_id,
"start": config.reference_start.isoformat(),
"end": config.reference_end.isoformat(),
}
)
# Fetch generation data
generation_data = await context.values.fetch(
"generation_timeseries",
{"community_id": config.community_id, ...}
)
# Compute baseline metrics
total_consumption = sum(r["kwh"] for r in consumption_data)
total_generation = sum(r["kwh"] for r in generation_data)
self_consumed = min(total_consumption, total_generation)
# Store artifacts in workspace (for later runs)
await workspace.write_json("consumption.json", consumption_data)
await workspace.write_json("generation.json", generation_data)
return RECScenario(
community_id=config.community_id,
reference_start=config.reference_start,
reference_end=config.reference_end,
baseline_consumption_kwh=total_consumption,
baseline_generation_kwh=total_generation,
baseline_self_consumption_ratio=self_consumed / total_generation if total_generation > 0 else 0,
)
async def simulate(
self,
scenario: RECScenario,
parameters: RECParameters,
context,
) -> RECResult:
"""
Run simulation: FAST operation.
- Load cached scenario data
- Apply parameters
- Compute results
"""
# Apply PV addition
added_generation = parameters.pv_kwp * 1000 # Simplified
total_generation = scenario.baseline_generation_kwh + added_generation
# Compute new ratios
self_consumed = min(scenario.baseline_consumption_kwh, total_generation)
self_consumption_ratio = self_consumed / total_generation if total_generation > 0 else 0
self_sufficiency_ratio = self_consumed / scenario.baseline_consumption_kwh
# Simple economics
investment = parameters.pv_kwp * 1200 + parameters.battery_kwh * 500
annual_savings = (self_consumed - scenario.baseline_self_consumption_ratio * scenario.baseline_generation_kwh) * parameters.electricity_price
npv = annual_savings * 20 - investment # Simplified 20-year NPV
payback = investment / annual_savings if annual_savings > 0 else None
return RECResult(
self_consumption_ratio=self_consumption_ratio,
self_sufficiency_ratio=self_sufficiency_ratio,
npv_eur=npv,
payback_years=payback,
)
def get_default_parameters(self) -> RECParameters:
"""Return default parameters for baseline comparison."""
return RECParameters()
Step 3: Register in Module
# my_module/module.py
from celine.dt.core.registry import DTRegistry
from .simulations.rec_planning import RECPlanningSimulation
class MyModule:
name = "my-module"
version = "1.0.0"
def register(self, registry: DTRegistry) -> None:
registry.register_simulation(RECPlanningSimulation())
module = MyModule()
Step 4: Use via API
# 1. Build scenario (expensive, cached)
curl -X POST http://localhost:8000/simulations/my-module.rec-planning/scenarios \
-H "Content-Type: application/json" \
-d '{
"config": {
"community_id": "rec-folgaria",
"reference_start": "2024-01-01T00:00:00Z",
"reference_end": "2024-12-31T23:59:59Z"
},
"ttl_hours": 24
}'
# Returns: {"scenario_id": "abc123", ...}
# 2. Run simulation (fast, parameterized)
curl -X POST http://localhost:8000/simulations/my-module.rec-planning/runs \
-H "Content-Type: application/json" \
-d '{
"scenario_id": "abc123",
"parameters": {"pv_kwp": 100, "battery_kwh": 50}
}'
# 3. Run parameter sweep
curl -X POST http://localhost:8000/simulations/my-module.rec-planning/sweep \
-H "Content-Type: application/json" \
-d '{
"scenario_id": "abc123",
"parameter_sets": [
{"pv_kwp": 50},
{"pv_kwp": 100},
{"pv_kwp": 150}
],
"include_baseline": true
}'
Part 4: Using Data in Apps and Simulations
Option 1: Value Fetchers (Recommended)
Configure declarative fetchers:
# config/values.yaml
values:
consumption_timeseries:
client: dataset_api
query: |
SELECT timestamp, kwh FROM consumption
WHERE community_id = :community_id
AND timestamp >= :start
AND timestamp < :end
payload:
type: object
required: [community_id, start, end]
properties:
community_id: { type: string }
start: { type: string }
end: { type: string }
Use in code:
data = await context.values.fetch(
"consumption_timeseries",
{"community_id": "rec-folgaria", "start": "2024-01-01", "end": "2024-12-31"}
)
Option 2: Direct Client Access
For complex queries:
client = context.datasets
rows = await client.query(
sql="SELECT * FROM consumption WHERE ...",
limit=1000,
)
Part 5: Publishing Events
Apps can publish events to brokers:
from celine.dt.contracts.events import create_custom_event
class MyApp(DTApp[...]):
async def run(self, config, context):
# Compute result
result = await self._compute(config)
# Publish event
if context.has_broker():
event = create_custom_event(
event_type="my-module.result-computed",
payload={"value": result.value},
source_app_key=self.key,
)
await context.publish_event(event)
return result
Part 6: Testing
Unit Testing Apps
@pytest.mark.asyncio
async def test_my_app():
registry = DTRegistry()
registry.register_app(MyApp())
runner = DTAppRunner()
context = RunContext.create(datasets=None, state=None, token_provider=None)
result = await runner.run(
registry=registry,
app_key="my-module.my-app",
payload={"input": "value"},
context=context,
)
assert result["output"] == "expected"
Unit Testing Components
@pytest.mark.asyncio
async def test_energy_balance():
component = EnergyBalanceComponent()
result = await component.compute(
EnergyBalanceInput(
generation_kwh=[100, 200, 150],
consumption_kwh=[80, 250, 100],
),
context=None, # Not needed for pure components
)
assert result.self_sufficiency_ratio > 0
Unit Testing Simulations
@pytest.mark.asyncio
async def test_rec_planning_simulation(tmp_path):
from celine.dt.core.simulation.workspace import FileWorkspace
simulation = RECPlanningSimulation()
workspace = FileWorkspace("test", tmp_path)
mock_context = MagicMock()
# Test build_scenario
config = RECScenarioConfig(
community_id="test",
reference_start=datetime(2024, 1, 1),
reference_end=datetime(2024, 12, 31),
)
scenario = await simulation.build_scenario(config, workspace, mock_context)
# Test simulate
parameters = RECParameters(pv_kwp=100)
result = await simulation.simulate(scenario, parameters, mock_context)
assert result.self_consumption_ratio >= 0
Common Patterns
Using Components in Apps
class MyApp(DTApp[...]):
async def run(self, config, context):
# Get component from registry
energy_balance = context.get_component("energy-balance")
# Use component
balance = await energy_balance.compute(
EnergyBalanceInput(...),
context,
)
return MyResult(ratio=balance.self_consumption_ratio)
Workspace Artifacts in Simulations
async def build_scenario(self, config, workspace, context):
# Store time series as Parquet
await workspace.write_parquet("consumption.parquet", df)
# Store metadata as JSON
await workspace.write_json("metadata.json", {"version": "1.0"})
# List artifacts
files = await workspace.list_files()
async def simulate(self, scenario, parameters, context):
# Read from workspace (attached to context)
df = await context.workspace.read_parquet("consumption.parquet")
Error Handling
class MyApp(DTApp[...]):
async def run(self, config, context):
try:
data = await context.values.fetch("my-data", {...})
except ValueError as e:
# Validation errors become 400 responses
raise ValueError(f"Invalid input: {e}")
except RuntimeError as e:
# Runtime errors become 500 responses
raise RuntimeError(f"Computation failed: {e}")
Anti-Patterns to Avoid
❌ Don't import infrastructure in domain code
# BAD
from fastapi import Request
from sqlalchemy import create_engine
❌ Don't hardcode data access
# BAD
import requests
data = requests.get("https://api.example.com/data")
❌ Don't use global state
# BAD
_cache = {} # Module-level state
❌ Don't test only via HTTP
# BAD - integration test as only test
def test_app():
response = client.post("/apps/my-app/run", json={...})
✅ Do use context for everything
# GOOD
data = await context.values.fetch("my-data", params)
await context.publish_event(event)
Next Steps
- Simulations - Deep dive into simulation features
- Values API - Configure data fetchers
- Brokers - Set up event publishing