fleetforge_control_plane/
lib.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use axum::{
7    extract::{Path, State},
8    http::StatusCode,
9    response::IntoResponse,
10    routing::{get, post},
11    Json, Router,
12};
13use chrono::{DateTime, Utc};
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use tokio::net::TcpListener;
17use tower_http::trace::TraceLayer;
18use tracing::{info, instrument};
19use uuid::Uuid;
20
21const VALID_ROLES: &[&str] = &["admin", "viewer", "billing"];
22
23#[derive(Debug, Clone)]
24pub struct ControlPlaneConfig {
25    pub addr: SocketAddr,
26}
27
28#[derive(Clone, Debug, Serialize, Deserialize)]
29pub struct Tenant {
30    pub id: Uuid,
31    pub name: String,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub contact: Option<String>,
34    pub status: &'static str,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct Project {
39    pub id: Uuid,
40    pub tenant_id: Uuid,
41    pub name: String,
42}
43
44#[derive(Clone, Debug, Serialize, Deserialize)]
45pub struct ApiKey {
46    pub id: Uuid,
47    pub project_id: Uuid,
48    pub key: String,
49}
50
51#[derive(Clone, Debug, Serialize, Deserialize)]
52pub struct TenantQuota {
53    pub tenant_id: Uuid,
54    #[serde(default)]
55    pub daily_token_limit: Option<i64>,
56    #[serde(default)]
57    pub daily_cost_limit: Option<f64>,
58}
59
60#[derive(Clone, Debug, Serialize, Deserialize)]
61pub struct BillingRecord {
62    pub tenant_id: Uuid,
63    pub recorded_at: DateTime<Utc>,
64    pub tokens: i64,
65    pub cost: f64,
66}
67
68#[derive(Default)]
69struct ControlPlaneState {
70    tenants: RwLock<HashMap<Uuid, Tenant>>,
71    projects: RwLock<HashMap<Uuid, Project>>,
72    keys: RwLock<HashMap<Uuid, ApiKey>>,
73    quotas: RwLock<HashMap<Uuid, TenantQuota>>,
74    tenant_roles: RwLock<HashMap<Uuid, HashMap<String, String>>>,
75    billing: RwLock<Vec<BillingRecord>>,
76}
77
78#[derive(Debug, Deserialize)]
79struct TenantSpec {
80    name: String,
81    #[serde(default)]
82    contact: Option<String>,
83}
84
85#[derive(Debug, Deserialize)]
86struct ProjectSpec {
87    name: String,
88}
89
90#[derive(Debug, Deserialize)]
91struct QuotaSpec {
92    #[serde(default)]
93    daily_token_limit: Option<i64>,
94    #[serde(default)]
95    daily_cost_limit: Option<f64>,
96}
97
98#[derive(Debug, Deserialize)]
99struct RoleSpec {
100    email: String,
101    role: String,
102}
103
104#[derive(Debug, Deserialize)]
105struct UsageSpec {
106    #[serde(default)]
107    tokens: Option<i64>,
108    #[serde(default)]
109    cost: Option<f64>,
110    #[serde(default)]
111    recorded_at: Option<DateTime<Utc>>,
112}
113
114#[derive(Debug, Serialize)]
115struct CreateTenantResponse {
116    tenant: Tenant,
117}
118
119#[derive(Debug, Serialize)]
120struct CreateProjectResponse {
121    project: Project,
122}
123
124#[derive(Debug, Serialize)]
125struct CreateKeyResponse {
126    key: ApiKey,
127}
128
129#[derive(Debug, Serialize)]
130struct ErrorResponse {
131    error: String,
132}
133
134#[derive(Debug, Serialize)]
135struct QuotaResponse {
136    quota: TenantQuota,
137}
138
139#[derive(Debug, Serialize)]
140struct RolesResponse {
141    roles: Vec<RoleBinding>,
142}
143
144#[derive(Debug, Serialize, Clone)]
145struct RoleBinding {
146    email: String,
147    role: String,
148}
149
150#[derive(Debug, Serialize)]
151struct UsageResponse {
152    records: Vec<BillingRecord>,
153}
154
155pub async fn serve(config: ControlPlaneConfig) -> Result<()> {
156    let listener = TcpListener::bind(config.addr)
157        .await
158        .with_context(|| format!("failed to bind control plane on {}", config.addr))?;
159    info!(addr = %config.addr, "starting managed control plane");
160
161    let state = Arc::new(ControlPlaneState::default());
162    let app = Router::new()
163        .route("/healthz", get(healthz))
164        .route("/v1/tenants", post(create_tenant).get(list_tenants))
165        .route("/v1/tenants/:tenant_id/projects", post(create_project))
166        .route(
167            "/v1/tenants/:tenant_id/quotas",
168            post(upsert_quota).get(get_quota),
169        )
170        .route(
171            "/v1/tenants/:tenant_id/roles",
172            post(assign_role).get(list_roles),
173        )
174        .route(
175            "/v1/tenants/:tenant_id/usage",
176            post(record_usage).get(list_usage),
177        )
178        .route(
179            "/v1/projects/:project_id/keys",
180            post(issue_key).get(list_keys),
181        )
182        .with_state(state)
183        .layer(TraceLayer::new_for_http());
184
185    axum::serve(listener, app)
186        .await
187        .context("control plane server exited unexpectedly")
188}
189
190async fn healthz() -> &'static str {
191    "ok"
192}
193
194#[instrument(skip(state, payload))]
195async fn create_tenant(
196    State(state): State<Arc<ControlPlaneState>>,
197    Json(payload): Json<TenantSpec>,
198) -> impl IntoResponse {
199    if payload.name.trim().is_empty() {
200        return (
201            StatusCode::BAD_REQUEST,
202            Json(ErrorResponse {
203                error: "name is required".into(),
204            }),
205        );
206    }
207
208    let tenant = Tenant {
209        id: Uuid::new_v4(),
210        name: payload.name.clone(),
211        contact: payload.contact.clone(),
212        status: "provisioning",
213    };
214
215    state.tenants.write().insert(tenant.id, tenant.clone());
216    info!(tenant_id = %tenant.id, "provisioned tenant");
217    (StatusCode::CREATED, Json(CreateTenantResponse { tenant }))
218}
219
220async fn list_tenants(State(state): State<Arc<ControlPlaneState>>) -> Json<Vec<Tenant>> {
221    let tenants = state.tenants.read();
222    Json(tenants.values().cloned().collect())
223}
224
225#[instrument(skip(state, payload))]
226async fn create_project(
227    State(state): State<Arc<ControlPlaneState>>,
228    Path(tenant_id): Path<Uuid>,
229    Json(payload): Json<ProjectSpec>,
230) -> impl IntoResponse {
231    if payload.name.trim().is_empty() {
232        return (
233            StatusCode::BAD_REQUEST,
234            Json(ErrorResponse {
235                error: "name is required".into(),
236            }),
237        );
238    }
239
240    let tenants = state.tenants.read();
241    if !tenants.contains_key(&tenant_id) {
242        return (
243            StatusCode::NOT_FOUND,
244            Json(ErrorResponse {
245                error: "tenant not found".into(),
246            }),
247        );
248    }
249    drop(tenants);
250
251    let project = Project {
252        id: Uuid::new_v4(),
253        tenant_id,
254        name: payload.name.clone(),
255    };
256    state.projects.write().insert(project.id, project.clone());
257    info!(tenant_id = %tenant_id, project_id = %project.id, "created project");
258    (StatusCode::CREATED, Json(CreateProjectResponse { project }))
259}
260
261#[instrument(skip(state))]
262async fn issue_key(
263    State(state): State<Arc<ControlPlaneState>>,
264    Path(project_id): Path<Uuid>,
265) -> impl IntoResponse {
266    let projects = state.projects.read();
267    if !projects.contains_key(&project_id) {
268        return (
269            StatusCode::NOT_FOUND,
270            Json(ErrorResponse {
271                error: "project not found".into(),
272            }),
273        );
274    }
275    drop(projects);
276
277    let api_key = ApiKey {
278        id: Uuid::new_v4(),
279        project_id,
280        key: format!("ffk_{}", Uuid::new_v4().without_hyphens()),
281    };
282    state.keys.write().insert(api_key.id, api_key.clone());
283    info!(project_id = %project_id, key_id = %api_key.id, "issued project key");
284    (
285        StatusCode::CREATED,
286        Json(CreateKeyResponse { key: api_key }),
287    )
288}
289
290async fn list_keys(
291    State(state): State<Arc<ControlPlaneState>>,
292    Path(project_id): Path<Uuid>,
293) -> impl IntoResponse {
294    let projects = state.projects.read();
295    if !projects.contains_key(&project_id) {
296        return (
297            StatusCode::NOT_FOUND,
298            Json(ErrorResponse {
299                error: "project not found".into(),
300            }),
301        );
302    }
303    drop(projects);
304
305    let keys = state
306        .keys
307        .read()
308        .values()
309        .filter(|key| key.project_id == project_id)
310        .cloned()
311        .collect::<Vec<_>>();
312
313    Json(keys)
314}
315
316#[instrument(skip(state, payload))]
317async fn assign_role(
318    State(state): State<Arc<ControlPlaneState>>,
319    Path(tenant_id): Path<Uuid>,
320    Json(payload): Json<RoleSpec>,
321) -> impl IntoResponse {
322    let tenants = state.tenants.read();
323    if !tenants.contains_key(&tenant_id) {
324        return (
325            StatusCode::NOT_FOUND,
326            Json(ErrorResponse {
327                error: "tenant not found".into(),
328            }),
329        );
330    }
331    drop(tenants);
332
333    let role = payload.role.to_ascii_lowercase();
334    if !VALID_ROLES.contains(&role.as_str()) {
335        return (
336            StatusCode::BAD_REQUEST,
337            Json(ErrorResponse {
338                error: format!("unknown role '{}'", payload.role),
339            }),
340        );
341    }
342    if payload.email.trim().is_empty() {
343        return (
344            StatusCode::BAD_REQUEST,
345            Json(ErrorResponse {
346                error: "email is required".into(),
347            }),
348        );
349    }
350
351    let mut roles = state.tenant_roles.write();
352    roles
353        .entry(tenant_id)
354        .or_default()
355        .insert(payload.email.clone(), role.clone());
356    info!(tenant_id = %tenant_id, email = %payload.email, role = %role, "assigned tenant role");
357    (
358        StatusCode::OK,
359        Json(RolesResponse {
360            roles: roles[&tenant_id]
361                .iter()
362                .map(|(email, role)| RoleBinding {
363                    email: email.clone(),
364                    role: role.clone(),
365                })
366                .collect(),
367        }),
368    )
369}
370
371async fn list_roles(
372    State(state): State<Arc<ControlPlaneState>>,
373    Path(tenant_id): Path<Uuid>,
374) -> impl IntoResponse {
375    let tenants = state.tenants.read();
376    if !tenants.contains_key(&tenant_id) {
377        return (
378            StatusCode::NOT_FOUND,
379            Json(ErrorResponse {
380                error: "tenant not found".into(),
381            }),
382        );
383    }
384    drop(tenants);
385
386    let roles = state.tenant_roles.read();
387    let bindings = roles
388        .get(&tenant_id)
389        .map(|entries| {
390            entries
391                .iter()
392                .map(|(email, role)| RoleBinding {
393                    email: email.clone(),
394                    role: role.clone(),
395                })
396                .collect::<Vec<_>>()
397        })
398        .unwrap_or_default();
399    Json(RolesResponse { roles: bindings })
400}
401
402#[instrument(skip(state, payload))]
403async fn record_usage(
404    State(state): State<Arc<ControlPlaneState>>,
405    Path(tenant_id): Path<Uuid>,
406    Json(payload): Json<UsageSpec>,
407) -> impl IntoResponse {
408    let tenants = state.tenants.read();
409    if !tenants.contains_key(&tenant_id) {
410        return (
411            StatusCode::NOT_FOUND,
412            Json(ErrorResponse {
413                error: "tenant not found".into(),
414            }),
415        );
416    }
417    drop(tenants);
418
419    let tokens = payload.tokens.unwrap_or_default().max(0);
420    let cost = payload.cost.unwrap_or_default().max(0.0);
421    let record = BillingRecord {
422        tenant_id,
423        recorded_at: payload.recorded_at.unwrap_or_else(Utc::now),
424        tokens,
425        cost,
426    };
427    state.billing.write().push(record.clone());
428    info!(tenant_id = %tenant_id, tokens = tokens, cost = cost, "recorded usage sample");
429    (StatusCode::CREATED, Json(record))
430}
431
432async fn list_usage(
433    State(state): State<Arc<ControlPlaneState>>,
434    Path(tenant_id): Path<Uuid>,
435) -> impl IntoResponse {
436    let tenants = state.tenants.read();
437    if !tenants.contains_key(&tenant_id) {
438        return (
439            StatusCode::NOT_FOUND,
440            Json(ErrorResponse {
441                error: "tenant not found".into(),
442            }),
443        );
444    }
445    drop(tenants);
446
447    let records = state
448        .billing
449        .read()
450        .iter()
451        .filter(|record| record.tenant_id == tenant_id)
452        .cloned()
453        .collect::<Vec<_>>();
454    Json(UsageResponse { records })
455}
456
457#[instrument(skip(state, payload))]
458async fn upsert_quota(
459    State(state): State<Arc<ControlPlaneState>>,
460    Path(tenant_id): Path<Uuid>,
461    Json(payload): Json<QuotaSpec>,
462) -> impl IntoResponse {
463    let tenants = state.tenants.read();
464    if !tenants.contains_key(&tenant_id) {
465        return (
466            StatusCode::NOT_FOUND,
467            Json(ErrorResponse {
468                error: "tenant not found".into(),
469            }),
470        );
471    }
472    drop(tenants);
473
474    if payload.daily_token_limit.is_none() && payload.daily_cost_limit.is_none() {
475        state.quotas.write().remove(&tenant_id);
476        info!(tenant_id = %tenant_id, "cleared tenant quota limits");
477        return StatusCode::NO_CONTENT;
478    }
479
480    let quota = TenantQuota {
481        tenant_id,
482        daily_token_limit: payload.daily_token_limit,
483        daily_cost_limit: payload.daily_cost_limit,
484    };
485    state.quotas.write().insert(tenant_id, quota.clone());
486    info!(tenant_id = %tenant_id, "updated tenant quota limits");
487    (StatusCode::OK, Json(QuotaResponse { quota }))
488}
489
490async fn get_quota(
491    State(state): State<Arc<ControlPlaneState>>,
492    Path(tenant_id): Path<Uuid>,
493) -> impl IntoResponse {
494    let tenants = state.tenants.read();
495    if !tenants.contains_key(&tenant_id) {
496        return (
497            StatusCode::NOT_FOUND,
498            Json(ErrorResponse {
499                error: "tenant not found".into(),
500            }),
501        );
502    }
503    drop(tenants);
504
505    let quotas = state.quotas.read();
506    match quotas.get(&tenant_id) {
507        Some(quota) => (
508            StatusCode::OK,
509            Json(QuotaResponse {
510                quota: quota.clone(),
511            }),
512        ),
513        None => (
514            StatusCode::NOT_FOUND,
515            Json(ErrorResponse {
516                error: "quota not configured".into(),
517            }),
518        ),
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use axum::extract::State;
526
527    fn seeded_state() -> (Arc<ControlPlaneState>, Uuid) {
528        let state = ControlPlaneState::default();
529        let tenant = Tenant {
530            id: Uuid::new_v4(),
531            name: "Acme".into(),
532            contact: Some("ops@acme.test".into()),
533            status: "active",
534        };
535        let id = tenant.id;
536        state.tenants.write().insert(id, tenant);
537        (Arc::new(state), id)
538    }
539
540    #[tokio::test]
541    async fn assign_role_and_list_roles() {
542        let (state, tenant_id) = seeded_state();
543
544        let response = assign_role(
545            State(state.clone()),
546            Path(tenant_id),
547            Json(RoleSpec {
548                email: "owner@acme.test".into(),
549                role: "admin".into(),
550            }),
551        )
552        .await;
553
554        assert_eq!(response.0, StatusCode::OK);
555
556        let Json(listed) = list_roles(State(state.clone()), Path(tenant_id)).await;
557        assert_eq!(listed.roles.len(), 1);
558        assert_eq!(listed.roles[0].role, "admin");
559    }
560
561    #[tokio::test]
562    async fn assign_role_rejects_unknown_role() {
563        let (state, tenant_id) = seeded_state();
564
565        let response = assign_role(
566            State(state),
567            Path(tenant_id),
568            Json(RoleSpec {
569                email: "owner@acme.test".into(),
570                role: "unknown".into(),
571            }),
572        )
573        .await;
574
575        assert_eq!(response.0, StatusCode::BAD_REQUEST);
576    }
577
578    #[tokio::test]
579    async fn record_and_list_usage() {
580        let (state, tenant_id) = seeded_state();
581
582        let response = record_usage(
583            State(state.clone()),
584            Path(tenant_id),
585            Json(UsageSpec {
586                tokens: Some(120),
587                cost: Some(0.45),
588                recorded_at: None,
589            }),
590        )
591        .await;
592
593        assert_eq!(response.0, StatusCode::CREATED);
594
595        let Json(listed) = list_usage(State(state), Path(tenant_id)).await;
596        assert_eq!(listed.records.len(), 1);
597        assert_eq!(listed.records[0].tokens, 120);
598    }
599}