Getting Started
This guide walks you through integrating your service with the CELINE Policy Service.
Prerequisites
- Access to the CELINE Keycloak instance
- An OAuth2 client configured for your service (see Scopes & Permissions)
- Network access to the policy service endpoint
Overview
Your service will: 1. Obtain a JWT from Keycloak (for users or service accounts) 2. Call the policy service with the JWT to check authorization 3. Proceed or deny based on the response
Your service sends the JWT to the Policy Service, which returns an allow/deny decision with a reason string.
Step 1: Configure Your OAuth Client
Request a Keycloak client for your service with appropriate scopes. Example for a data processing service:
| Setting | Value |
|---|---|
| Client ID | svc-my-service |
| Client Type | Confidential |
| Service Account | Enabled |
| Scopes | dataset.query, mqtt.write |
Step 2: Obtain a JWT
For Service-to-Service (Client Credentials)
import httpx
async def get_service_token() -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://keycloak:8080/realms/celine/protocol/openid-connect/token",
data={
"grant_type": "client_credentials",
"client_id": "svc-my-service",
"client_secret": "your-client-secret",
}
)
return response.json()["access_token"]
For User Requests
Pass through the user's JWT from the incoming request:
from fastapi import Header
async def my_endpoint(authorization: str = Header(...)):
# Forward this to policy service
jwt_token = authorization # "Bearer <token>"
Step 3: Check Authorization
Using the Generic /authorize Endpoint
import httpx
from dataclasses import dataclass
@dataclass
class PolicyClient:
base_url: str = "http://policy-service:8009"
async def check_access(
self,
jwt_token: str,
resource_type: str,
resource_id: str,
action: str,
attributes: dict | None = None
) -> tuple[bool, str]:
"""
Check if the request is authorized.
Returns:
(allowed, reason) tuple
"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/authorize",
headers={
"Authorization": jwt_token, # Include "Bearer " prefix
"X-Source-Service": "my-service",
"X-Request-Id": "correlation-id-here",
},
json={
"resource": {
"type": resource_type,
"id": resource_id,
"attributes": attributes or {},
},
"action": {
"name": action,
},
},
)
data = response.json()
return data["allowed"], data.get("reason", "")
Example: Protecting a Dataset Endpoint
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
policy = PolicyClient()
@app.get("/datasets/{dataset_id}")
async def get_dataset(
dataset_id: str,
authorization: str = Header(...),
):
# Check authorization
allowed, reason = await policy.check_access(
jwt_token=authorization,
resource_type="dataset",
resource_id=dataset_id,
action="read",
attributes={"access_level": "internal"},
)
if not allowed:
raise HTTPException(status_code=403, detail=reason)
# Proceed with business logic
return {"dataset_id": dataset_id, "data": "..."}
Step 4: Use Domain-Specific Endpoints (Optional)
For common use cases, convenience endpoints are available:
Dataset Access Check
async def check_dataset_access(jwt: str, dataset_id: str, access_level: str, action: str = "read"):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{POLICY_URL}/dataset/access",
headers={"Authorization": jwt},
json={
"dataset_id": dataset_id,
"access_level": access_level,
"action": action,
},
)
return response.json()["allowed"]
Dataset Row Filters
Get filters to apply to your database queries:
async def get_dataset_filters(jwt: str, dataset_id: str, access_level: str):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{POLICY_URL}/dataset/filters",
headers={"Authorization": jwt},
json={
"dataset_id": dataset_id,
"access_level": access_level,
},
)
data = response.json()
if data["allowed"]:
return data["filters"] # Apply these to your query
return None
Pipeline State Transition
async def can_transition_pipeline(jwt: str, pipeline_id: str, from_state: str, to_state: str):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{POLICY_URL}/pipeline/transition",
headers={"Authorization": jwt},
json={
"pipeline_id": pipeline_id,
"from_state": from_state,
"to_state": to_state,
},
)
return response.json()["allowed"]
Step 5: Handle Responses
Successful Authorization
{
"allowed": true,
"reason": "user has viewer access and client has dataset.query scope",
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
Denied Authorization
{
"allowed": false,
"reason": "insufficient group privileges",
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
Map denial reasons to appropriate HTTP status codes:
| Reason Pattern | HTTP Status |
|---|---|
| "anonymous access denied" | 401 Unauthorized |
| "insufficient privileges" | 403 Forbidden |
| "missing scope" | 403 Forbidden |
Complete Integration Example
from fastapi import FastAPI, Header, HTTPException, Depends
from functools import lru_cache
import httpx
app = FastAPI()
class PolicyClient:
def __init__(self, base_url: str = "http://policy-service:8009"):
self.base_url = base_url
async def authorize(
self,
jwt: str,
resource_type: str,
resource_id: str,
action: str,
attributes: dict | None = None,
source_service: str = "my-service",
) -> dict:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"{self.base_url}/authorize",
headers={
"Authorization": jwt,
"X-Source-Service": source_service,
},
json={
"resource": {
"type": resource_type,
"id": resource_id,
"attributes": attributes or {},
},
"action": {"name": action},
},
)
response.raise_for_status()
return response.json()
@lru_cache
def get_policy_client():
return PolicyClient()
async def require_authorization(
resource_type: str,
resource_id: str,
action: str,
attributes: dict | None = None,
):
"""Dependency that enforces authorization."""
async def checker(
authorization: str = Header(...),
policy: PolicyClient = Depends(get_policy_client),
):
result = await policy.authorize(
jwt=authorization,
resource_type=resource_type,
resource_id=resource_id,
action=action,
attributes=attributes,
)
if not result["allowed"]:
raise HTTPException(
status_code=403,
detail=result.get("reason", "Access denied"),
)
return result
return checker
# Usage
@app.get("/twin/{twin_id}")
async def get_twin(
twin_id: str,
auth_result: dict = Depends(
require_authorization(
resource_type="dt",
resource_id="twin_id", # Will be resolved
action="read",
)
),
):
return {"twin_id": twin_id, "status": "ok"}
Testing Your Integration
Local Testing with Docker Compose
# Start the stack
docker compose up -d
# Get a test token (adjust for your Keycloak setup)
TOKEN=$(curl -s -X POST \
"http://localhost:8080/realms/celine/protocol/openid-connect/token" \
-d "grant_type=client_credentials" \
-d "client_id=svc-test" \
-d "client_secret=test-secret" \
| jq -r '.access_token')
# Test authorization
curl -X POST http://localhost:8009/authorize \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"resource": {"type": "dataset", "id": "test", "attributes": {"access_level": "internal"}},
"action": {"name": "read"}
}'
Unit Testing with Mocks
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture
def mock_policy_client():
with patch("myapp.policy.PolicyClient") as mock:
client = mock.return_value
client.authorize = AsyncMock(return_value={
"allowed": True,
"reason": "test",
"request_id": "test-123",
})
yield client
async def test_authorized_access(mock_policy_client):
# Your test using the mocked policy client
result = await mock_policy_client.authorize(...)
assert result["allowed"] is True
Next Steps
- Review Scopes & Permissions to understand what scopes your service needs
- See API Reference for complete endpoint documentation
- Check MQTT Integration if your service uses MQTT