Getting Started
This guide walks you through integrating your service with the CELINE Policy Service.
Prerequisites
- Access to the CELINE Keycloak instance
- An OAuth2 client configured for your service (see Scopes & Permissions)
- Network access to the policy service endpoint
Overview
Your service will: 1. Obtain a JWT from Keycloak (for users or service accounts) 2. Call the policy service with the JWT to check authorization 3. Proceed or deny based on the response
┌─────────────┐ ┌──────────────┐ ┌────────────────┐
│ Your Service│─────▶│Policy Service│─────▶│ Allow / Deny │
│ + JWT │ │ │ │ + Reason │
└─────────────┘ └──────────────┘ └────────────────┘
Step 1: Configure Your OAuth Client
Request a Keycloak client for your service with appropriate scopes. Example for a data processing service:
| Setting | Value |
|---|---|
| Client ID | svc-my-service |
| Client Type | Confidential |
| Service Account | Enabled |
| Scopes | dataset.query, mqtt.write |
Step 2: Obtain a JWT
For Service-to-Service (Client Credentials)
import httpx
async def get_service_token() -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://keycloak:8080/realms/celine/protocol/openid-connect/token",
data={
"grant_type": "client_credentials",
"client_id": "svc-my-service",
"client_secret": "your-client-secret",
}
)
return response.json()["access_token"]
For User Requests
Pass through the user's JWT from the incoming request:
from fastapi import Header
async def my_endpoint(authorization: str = Header(...)):
# Forward this to policy service
jwt_token = authorization # "Bearer <token>"
Step 3: Check Authorization
Using the Generic /authorize Endpoint
import httpx
from dataclasses import dataclass
@dataclass
class PolicyClient:
base_url: str = "http://policy-service:8009"
async def check_access(
self,
jwt_token: str,
resource_type: str,
resource_id: str,
action: str,
attributes: dict | None = None
) -> tuple[bool, str]:
"""
Check if the request is authorized.
Returns:
(allowed, reason) tuple
"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/authorize",
headers={
"Authorization": jwt_token, # Include "Bearer " prefix
"X-Source-Service": "my-service",
"X-Request-Id": "correlation-id-here",
},
json={
"resource": {
"type": resource_type,
"id": resource_id,
"attributes": attributes or {},
},
"action": {
"name": action,
},
},
)
data = response.json()
return data["allowed"], data.get("reason", "")
Example: Protecting a Dataset Endpoint
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
policy = PolicyClient()
@app.get("/datasets/{dataset_id}")
async def get_dataset(
dataset_id: str,
authorization: str = Header(...),
):
# Check authorization
allowed, reason = await policy.check_access(
jwt_token=authorization,
resource_type="dataset",
resource_id=dataset_id,
action="read",
attributes={"access_level": "internal"},
)
if not allowed:
raise HTTPException(status_code=403, detail=reason)
# Proceed with business logic
return {"dataset_id": dataset_id, "data": "..."}
Step 4: Use Domain-Specific Endpoints (Optional)
For common use cases, convenience endpoints are available:
Dataset Access Check
async def check_dataset_access(jwt: str, dataset_id: str, access_level: str, action: str = "read"):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{POLICY_URL}/dataset/access",
headers={"Authorization": jwt},
json={
"dataset_id": dataset_id,
"access_level": access_level,
"action": action,
},
)
return response.json()["allowed"]
Dataset Row Filters
Get filters to apply to your database queries:
async def get_dataset_filters(jwt: str, dataset_id: str, access_level: str):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{POLICY_URL}/dataset/filters",
headers={"Authorization": jwt},
json={
"dataset_id": dataset_id,
"access_level": access_level,
},
)
data = response.json()
if data["allowed"]:
return data["filters"] # Apply these to your query
return None
Pipeline State Transition
async def can_transition_pipeline(jwt: str, pipeline_id: str, from_state: str, to_state: str):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{POLICY_URL}/pipeline/transition",
headers={"Authorization": jwt},
json={
"pipeline_id": pipeline_id,
"from_state": from_state,
"to_state": to_state,
},
)
return response.json()["allowed"]
Step 5: Handle Responses
Successful Authorization
{
"allowed": true,
"reason": "user has viewer access and client has dataset.query scope",
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
Denied Authorization
{
"allowed": false,
"reason": "insufficient group privileges",
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
Map denial reasons to appropriate HTTP status codes:
| Reason Pattern | HTTP Status |
|---|---|
| "anonymous access denied" | 401 Unauthorized |
| "insufficient privileges" | 403 Forbidden |
| "missing scope" | 403 Forbidden |
Complete Integration Example
from fastapi import FastAPI, Header, HTTPException, Depends
from functools import lru_cache
import httpx
app = FastAPI()
class PolicyClient:
def __init__(self, base_url: str = "http://policy-service:8009"):
self.base_url = base_url
async def authorize(
self,
jwt: str,
resource_type: str,
resource_id: str,
action: str,
attributes: dict | None = None,
source_service: str = "my-service",
) -> dict:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"{self.base_url}/authorize",
headers={
"Authorization": jwt,
"X-Source-Service": source_service,
},
json={
"resource": {
"type": resource_type,
"id": resource_id,
"attributes": attributes or {},
},
"action": {"name": action},
},
)
response.raise_for_status()
return response.json()
@lru_cache
def get_policy_client():
return PolicyClient()
async def require_authorization(
resource_type: str,
resource_id: str,
action: str,
attributes: dict | None = None,
):
"""Dependency that enforces authorization."""
async def checker(
authorization: str = Header(...),
policy: PolicyClient = Depends(get_policy_client),
):
result = await policy.authorize(
jwt=authorization,
resource_type=resource_type,
resource_id=resource_id,
action=action,
attributes=attributes,
)
if not result["allowed"]:
raise HTTPException(
status_code=403,
detail=result.get("reason", "Access denied"),
)
return result
return checker
# Usage
@app.get("/twin/{twin_id}")
async def get_twin(
twin_id: str,
auth_result: dict = Depends(
require_authorization(
resource_type="dt",
resource_id="twin_id", # Will be resolved
action="read",
)
),
):
return {"twin_id": twin_id, "status": "ok"}
Testing Your Integration
Local Testing with Docker Compose
# Start the stack
docker compose up -d
# Get a test token (adjust for your Keycloak setup)
TOKEN=$(curl -s -X POST \
"http://localhost:8080/realms/celine/protocol/openid-connect/token" \
-d "grant_type=client_credentials" \
-d "client_id=svc-test" \
-d "client_secret=test-secret" \
| jq -r '.access_token')
# Test authorization
curl -X POST http://localhost:8009/authorize \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"resource": {"type": "dataset", "id": "test", "attributes": {"access_level": "internal"}},
"action": {"name": "read"}
}'
Unit Testing with Mocks
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture
def mock_policy_client():
with patch("myapp.policy.PolicyClient") as mock:
client = mock.return_value
client.authorize = AsyncMock(return_value={
"allowed": True,
"reason": "test",
"request_id": "test-123",
})
yield client
async def test_authorized_access(mock_policy_client):
# Your test using the mocked policy client
result = await mock_policy_client.authorize(...)
assert result["allowed"] is True
Next Steps
- Review Scopes & Permissions to understand what scopes your service needs
- See API Reference for complete endpoint documentation
- Check MQTT Integration if your service uses MQTT