// fleetforge_runtime_api/lib.rs

1//! FleetForge gRPC API surface definitions and server wiring.
2
3mod auth;
4mod http;
5mod transparency;
6mod transparency_writer;
7
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::convert::{Infallible, TryFrom};
10use std::env;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use ::http::{header, HeaderValue, Method};
15use anyhow::{anyhow, Context, Result};
16use auth::{AuthError, AuthHandle, Role};
17use bytes::Bytes;
18use chrono::{DateTime, TimeZone, Utc};
19use clickhouse::{Client as ClickhouseClient, Row};
20use fleetforge_changeops::{
21    BudgetSignal as GateBudgetSignal, ChangeDecisionEffect as GateEffect, ChangeDiff as GateDiff,
22    ChangeFollowupRecord, ChangeFollowupRequest as GateFollowupRequest,
23    ChangeGateDecision as GateDecision, ChangeGateEngine, ChangeGateRequest as GateRequest,
24    CoverageComponent as GateCoverageComponent, CoverageReport as GateCoverageReport,
25    DecisionScorecard as GateScorecard, EvalMetric as GateEvalMetric,
26    FollowupAction as GateFollowupAction, FollowupOutcome as GateFollowupOutcome,
27    MetricDirection as GateMetricDirection,
28};
29use fleetforge_common::{
30    labels::{labels_map_to_value, labels_value_to_map},
31    licensing::{feature_allowed, LicensedFeature},
32    prost_json::{json_to_prost_struct, json_to_prost_value, prost_struct_to_json},
33    validation::{validate_run_spec, validate_step_spec, SchemaValidationError},
34};
35use fleetforge_contracts::{
36    runtime,
37    tap::{
38        self, subscribe_run_request, EventSeverity, PolicyDecisionEffect, QueryEventSource,
39        QueryEventsRequest, QueryEventsResponse, RunEventEnvelope, RunTraceContext,
40        SubscribeRunAck, SubscribeRunOpen, SubscribeRunRequest, SubscribeRunResponse,
41    },
42    RuntimeService, RuntimeServiceClient, RuntimeServiceServer, TapService, TapServiceClient,
43    TapServiceServer,
44};
45use fleetforge_storage::{models, models::NewOutboxEvent, Storage};
46use fleetforge_telemetry::audit::Audit;
47use fleetforge_trust::{
48    Attestation, AttestationVault, InMemoryAttestationVault, ObjectStoreAttestationVault,
49    TrustSubject,
50};
51use futures::Stream;
52use http_body::Body as _;
53use hyper::service::{make_service_fn, service_fn};
54use hyper::{Body, Request as HyperRequest, Server as HyperServer};
55use serde::Deserialize;
56use serde_json::{json, Map, Number, Value};
57use std::pin::Pin;
58use tokio::net::TcpListener;
59use tokio::sync::mpsc;
60use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
61use tokio_stream::wrappers::ReceiverStream;
62use tonic::transport::Server;
63use tonic::{Request, Response, Status};
64use tonic_health::pb::health_check_response::ServingStatus;
65use tonic_health::server::health_reporter;
66use tonic_reflection::server::Builder as ReflectionBuilder;
67use tower::util::BoxCloneService;
68use tower::{Service, ServiceExt};
69use tower_http::cors::CorsLayer;
70use tracing::{error, info, instrument, warn};
71use transparency::{transparency_log_from_env, TransparencyContext, TransparencyLog};
72use transparency_writer::TransparencyWriter;
73use uuid::Uuid;
74
/// Controls which record categories are mirrored into the transparency log.
///
/// Only `includes_gates` is visible here: every non-`Disabled` scope covers
/// change-gate decisions. The wider meanings of `Runs`/`Artifacts` are
/// inferred from the variant names — TODO confirm against the writer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransparencyScope {
    /// No transparency records are written.
    Disabled,
    /// Change-gate decisions only.
    Gates,
    /// Presumably gates plus run records — confirm.
    Runs,
    /// Presumably gates, runs, plus artifact records — confirm.
    Artifacts,
}
82
83impl TransparencyScope {
84    fn includes_gates(self) -> bool {
85        matches!(
86            self,
87            TransparencyScope::Gates | TransparencyScope::Runs | TransparencyScope::Artifacts
88        )
89    }
90}
/// Shared state behind the runtime gRPC API; cheaply cloneable (all fields
/// are `Arc`s or small values).
#[derive(Clone)]
struct RuntimeApiService {
    // Persistence layer for runs, gates, outbox events, etc.
    storage: Arc<Storage>,
    // Token verification and role enforcement for incoming requests.
    auth: Arc<AuthHandle>,
    // Audit-event sink.
    audit: Arc<Audit>,
    // Store for attestations referenced by change gates.
    attestation_vault: Arc<dyn AttestationVault>,
    // Change-gate decision engine; default-configured in `new`.
    changeops: ChangeGateEngine,
    // Transparency (SCITT) log backend.
    transparency: Arc<dyn TransparencyLog>,
    // Whether the background transparency writer is enabled.
    transparency_writer_enabled: bool,
    // Which record categories are mirrored into the transparency log.
    transparency_scope: TransparencyScope,
}
102
103impl RuntimeApiService {
104    fn new(
105        storage: Arc<Storage>,
106        auth: Arc<AuthHandle>,
107        audit: Arc<Audit>,
108        attestation_vault: Arc<dyn AttestationVault>,
109        transparency: Arc<dyn TransparencyLog>,
110        transparency_writer_enabled: bool,
111        transparency_scope: TransparencyScope,
112    ) -> Self {
113        Self {
114            storage,
115            auth,
116            audit,
117            attestation_vault,
118            changeops: ChangeGateEngine::default(),
119            transparency,
120            transparency_writer_enabled,
121            transparency_scope,
122        }
123    }
124
125    async fn require_role(
126        &self,
127        metadata: &tonic::metadata::MetadataMap,
128        required: Role,
129    ) -> Result<Role, Status> {
130        self.auth
131            .require_grpc(metadata, required)
132            .await
133            .map_err(|err| match err {
134                AuthError::MissingToken | AuthError::InvalidToken => {
135                    Status::unauthenticated(err.to_string())
136                }
137                AuthError::InsufficientRole { .. }
138                | AuthError::MissingRoleClaim(_)
139                | AuthError::UnsupportedRole { .. } => Status::permission_denied(err.to_string()),
140                AuthError::NotConfigured => Status::unauthenticated(err.to_string()),
141                AuthError::Backend(_) => Status::internal(err.to_string()),
142            })
143    }
144
145    fn actor_from_metadata(metadata: &tonic::metadata::MetadataMap, role: Role) -> String {
146        metadata
147            .get("x-actor")
148            .and_then(|value| value.to_str().ok())
149            .map(|value| value.to_string())
150            .unwrap_or_else(|| format!("role:{role:?}"))
151    }
152}
153
154fn invalid_argument(message: impl Into<String>) -> Status {
155    Status::invalid_argument(message.into())
156}
157
158fn internal_error(message: impl Into<String>) -> Status {
159    Status::internal(message.into())
160}
161
// Paging defaults and caps for `QueryEvents`.
const QUERY_DEFAULT_PAGE_SIZE: u32 = 100;
const QUERY_MAX_PAGE_SIZE: u32 = 500;
// Over-fetch multiplier — usage is outside this chunk; TODO confirm it
// compensates for post-fetch filtering.
const QUERY_FETCH_MULTIPLIER: u32 = 4;
// Environment toggles for the background transparency writer.
const TRANSPARENCY_WRITER_FLAG: &str = "FLEETFORGE_TRANSPARENCY_WRITER";
const TRANSPARENCY_WRITER_INTERVAL_ENV: &str = "FLEETFORGE_TRANSPARENCY_WRITER_INTERVAL_SECS";
// Floor and default (seconds) for the writer's polling interval.
const TRANSPARENCY_WRITER_INTERVAL_MIN_SECS: u64 = 5;
const TRANSPARENCY_WRITER_DEFAULT_INTERVAL_SECS: u64 = 30;
169
170fn to_timestamp(time: DateTime<Utc>) -> prost_types::Timestamp {
171    prost_types::Timestamp {
172        seconds: time.timestamp(),
173        nanos: time.timestamp_subsec_nanos() as i32,
174    }
175}
176
177fn value_to_struct(value: &Value) -> Option<prost_types::Struct> {
178    if value.is_null() {
179        return None;
180    }
181    match json_to_prost_struct(value) {
182        Ok(struct_value) => Some(struct_value),
183        Err(err) => {
184            warn!(error = %err, "failed to convert JSON value to protobuf Struct");
185            None
186        }
187    }
188}
189
impl RuntimeApiService {
    /// Builds and signs a SCITT entry for a change-gate artifact, appends it
    /// to the transparency log, and records the resulting receipt back into
    /// `metadata` under the `"scitt_entry"` key.
    ///
    /// Errors from the signer, entry builder, or log append are surfaced as
    /// `INTERNAL` statuses.
    async fn publish_scitt_entry(
        &self,
        change_id: &str,
        gate_id: Uuid,
        artifact_sha: &str,
        attestation_refs: &[Uuid],
        metadata: &mut Value,
    ) -> Result<(), Status> {
        // Signer comes from process-level trust configuration.
        let signer = fleetforge_trust::scitt_signer()
            .map_err(|err| internal_error(format!("SCITT signer unavailable: {err}")))?;
        let scitt_entry = fleetforge_trust::build_scitt_entry(
            change_id,
            attestation_refs,
            artifact_sha,
            metadata,
            signer.as_ref(),
        )
        .map_err(|err| internal_error(format!("failed to build SCITT entry: {err}")))?;

        let transparency_ctx = TransparencyContext {
            change_id,
            gate_id: &gate_id,
            artifact_sha256: artifact_sha,
            attestation_ids: attestation_refs,
        };
        // Append to the configured transparency log; the record carries the
        // entry id, artifact digest info, and the log receipt.
        let record = self
            .transparency
            .append(&scitt_entry, &transparency_ctx)
            .await
            .map_err(|err| internal_error(format!("failed to publish SCITT entry: {err}")))?;

        let scitt_metadata = json!({
            "id": record.entry_id,
            "sha256": hex::encode(record.artifact.sha256),
            "media_type": record.artifact.media_type.clone(),
            "size": record.artifact.size,
            "attestation_ids": attestation_refs.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
            "transparency": record.receipt,
        });

        // Merge the receipt into the caller's metadata object.
        // NOTE(review): if `metadata` is not a JSON object, its previous
        // content is discarded and replaced by `{ "scitt_entry": ... }` —
        // confirm that is intended.
        if let Value::Object(mut meta_obj) = metadata.take() {
            meta_obj.insert("scitt_entry".to_string(), scitt_metadata);
            *metadata = Value::Object(meta_obj);
        } else {
            *metadata = json!({ "scitt_entry": scitt_metadata });
        }
        Ok(())
    }
}
240
241fn struct_to_value(data: Option<prost_types::Struct>) -> Value {
242    data.as_ref()
243        .map(prost_struct_to_json)
244        .unwrap_or(Value::Null)
245}
246
247fn attestation_to_proto(att: Attestation) -> Result<runtime::Attestation, Status> {
248    let subject = att
249        .subject
250        .as_ref()
251        .map(attestation_subject_to_proto)
252        .transpose()?;
253
254    let claims_value = Value::Object(att.claims.into_iter().collect());
255    let claims_struct = json_to_prost_struct(&claims_value)
256        .map_err(|err| internal_error(format!("failed to serialise attestation claims: {err}")))?;
257
258    Ok(runtime::Attestation {
259        attestation_id: att.id.to_string(),
260        issued_at: Some(to_timestamp(att.issued_at)),
261        subject,
262        artifact_hash: att.artifact_hash.map(|value| value.into()),
263        claims: Some(claims_struct),
264        signature: att.signature.map(|value| value.into()),
265    })
266}
267
268fn attestation_subject_to_proto(
269    subject: &TrustSubject,
270) -> Result<runtime::AttestationSubject, Status> {
271    use runtime::attestation_subject::Kind;
272    let kind = match subject {
273        TrustSubject::Run { run_id } => Kind::Run(runtime::AttestationRunSubject {
274            run_id: run_id.to_string(),
275        }),
276        TrustSubject::Step { run_id, step_id } => Kind::Step(runtime::AttestationStepSubject {
277            run_id: run_id.to_string(),
278            step_id: step_id.to_string(),
279        }),
280        TrustSubject::Tool { name } => {
281            Kind::Tool(runtime::AttestationToolSubject { name: name.clone() })
282        }
283        TrustSubject::Artifact { uri } => {
284            Kind::Artifact(runtime::AttestationArtifactSubject { uri: uri.clone() })
285        }
286        TrustSubject::CapabilityToken { token_id } => {
287            Kind::CapabilityToken(runtime::AttestationCapabilitySubject {
288                token_id: token_id.to_string(),
289            })
290        }
291        TrustSubject::Custom { label } => Kind::Custom(runtime::AttestationCustomSubject {
292            label: label.clone(),
293        }),
294    };
295    Ok(runtime::AttestationSubject { kind: Some(kind) })
296}
297
298fn gate_effect_to_proto(effect: GateEffect) -> runtime::ChangeDecisionEffect {
299    match effect {
300        GateEffect::Allow => runtime::ChangeDecisionEffect::Allow,
301        GateEffect::FollowUp => runtime::ChangeDecisionEffect::FollowUp,
302        GateEffect::Deny => runtime::ChangeDecisionEffect::Deny,
303    }
304}
305
306fn gate_effect_from_str(value: &str) -> GateEffect {
307    match value {
308        "allow" => GateEffect::Allow,
309        "deny" => GateEffect::Deny,
310        "follow_up" => GateEffect::FollowUp,
311        other => {
312            warn!(effect = %other, "unknown change gate effect stored; defaulting to follow_up");
313            GateEffect::FollowUp
314        }
315    }
316}
317
318fn gate_followup_action_to_proto(action: &GateFollowupAction) -> runtime::FollowupAction {
319    runtime::FollowupAction {
320        slug: action.slug.clone(),
321        description: action.description.clone(),
322        recommended: action.recommended,
323    }
324}
325
326fn gate_scorecard_to_proto(scorecard: &GateScorecard) -> Option<runtime::DecisionScorecard> {
327    Some(runtime::DecisionScorecard {
328        novelty: Some(runtime::NoveltySummary {
329            max_novelty: scorecard.novelty.max_novelty,
330            avg_novelty: scorecard.novelty.avg_novelty,
331            high_risk_paths: scorecard.novelty.high_risk_paths.clone(),
332        }),
333        coverage: Some(runtime::CoverageSummary {
334            overall_ratio: scorecard.coverage.overall_ratio,
335            components_below_threshold: scorecard.coverage.components_below_threshold.clone(),
336        }),
337        evals: Some(runtime::EvalSummary {
338            failing_metrics: scorecard.evals.failing_metrics.clone(),
339            attention_metrics: scorecard.evals.attention_metrics.clone(),
340            score_per_token: scorecard.evals.score_per_token,
341        }),
342        budgets: Some(runtime::BudgetSummary {
343            breaches: scorecard.budgets.breaches.clone(),
344            near_limits: scorecard.budgets.near_limits.clone(),
345        }),
346        telemetry: value_to_struct(&scorecard.telemetry),
347    })
348}
349
350fn proto_followup_outcome(outcome: runtime::FollowupOutcome) -> GateFollowupOutcome {
351    match outcome {
352        runtime::FollowupOutcome::Approved => GateFollowupOutcome::Approved,
353        runtime::FollowupOutcome::NeedsChanges => GateFollowupOutcome::NeedsChanges,
354        runtime::FollowupOutcome::Deferred => GateFollowupOutcome::Deferred,
355        runtime::FollowupOutcome::Unspecified => GateFollowupOutcome::Approved,
356    }
357}
358
359fn followup_outcome_to_proto(outcome: GateFollowupOutcome) -> runtime::FollowupOutcome {
360    match outcome {
361        GateFollowupOutcome::Approved => runtime::FollowupOutcome::Approved,
362        GateFollowupOutcome::NeedsChanges => runtime::FollowupOutcome::NeedsChanges,
363        GateFollowupOutcome::Deferred => runtime::FollowupOutcome::Deferred,
364    }
365}
366
367fn parse_reasons(value: &Value) -> Vec<String> {
368    serde_json::from_value::<Vec<String>>(value.clone()).unwrap_or_default()
369}
370
371fn parse_followup_actions(value: &Value) -> Vec<GateFollowupAction> {
372    serde_json::from_value::<Vec<GateFollowupAction>>(value.clone()).unwrap_or_default()
373}
374
375fn parse_scorecard(value: &Value) -> GateScorecard {
376    serde_json::from_value::<GateScorecard>(value.clone()).unwrap_or_default()
377}
378
/// Converts a stored follow-up row into its protobuf representation.
fn gate_followup_record_to_proto(
    record: &models::ChangeGateFollowup,
) -> runtime::ChangeGateFollowup {
    // NOTE(review): an unparseable stored outcome silently falls back to
    // Approved. Consider logging (as `gate_effect_from_str` does) and confirm
    // Approved is the safe default for corrupted rows.
    let outcome =
        GateFollowupOutcome::from_str(&record.outcome).unwrap_or(GateFollowupOutcome::Approved);
    // Conversion failures inside `value_to_struct` are logged and become None.
    let details_struct = value_to_struct(&record.details);

    runtime::ChangeGateFollowup {
        followup_id: record.followup_id.to_string(),
        gate_id: record.gate_id.to_string(),
        actor: record.actor.clone(),
        outcome: followup_outcome_to_proto(outcome) as i32,
        note: record.note.clone(),
        details: details_struct,
        recorded_at: Some(to_timestamp(record.recorded_at)),
    }
}
396
397fn build_proto_change_gate(
398    gate_id: Uuid,
399    change_id: &str,
400    revision: Option<&str>,
401    decision: &GateDecision,
402    telemetry: &Value,
403    acknowledgements: &[models::ChangeGateFollowup],
404) -> Result<runtime::ChangeGateDecision, Status> {
405    let effect = gate_effect_to_proto(decision.effect) as i32;
406    let scorecard = gate_scorecard_to_proto(&decision.scorecard);
407    let metadata_struct = value_to_struct(&decision.metadata);
408    let telemetry_struct = value_to_struct(telemetry);
409    let ack_proto = acknowledgements
410        .iter()
411        .map(gate_followup_record_to_proto)
412        .collect();
413
414    Ok(runtime::ChangeGateDecision {
415        gate_id: gate_id.to_string(),
416        change_id: change_id.to_string(),
417        revision: revision.unwrap_or_default().to_string(),
418        effect,
419        reasons: decision.reasons.clone(),
420        followups: decision
421            .followups
422            .iter()
423            .map(gate_followup_action_to_proto)
424            .collect(),
425        scorecard,
426        decided_at: Some(to_timestamp(decision.decided_at)),
427        metadata: metadata_struct,
428        telemetry: telemetry_struct,
429        acknowledgements: ack_proto,
430    })
431}
432
433fn attestation_ids_from_value(value: &Value) -> Vec<Uuid> {
434    fn recurse(value: &Value, acc: &mut HashSet<Uuid>) {
435        match value {
436            Value::Object(map) => {
437                if let Some(ids) = map.get("attestation_ids").and_then(Value::as_array) {
438                    for id in ids {
439                        if let Some(s) = id.as_str() {
440                            if let Ok(uuid) = Uuid::parse_str(s) {
441                                acc.insert(uuid);
442                            }
443                        }
444                    }
445                }
446                for v in map.values() {
447                    recurse(v, acc);
448                }
449            }
450            Value::Array(items) => {
451                for item in items {
452                    recurse(item, acc);
453                }
454            }
455            _ => {}
456        }
457    }
458    let mut set = HashSet::new();
459    recurse(value, &mut set);
460    set.into_iter().collect()
461}
462
/// Shared state behind the tap (event streaming / query) gRPC API.
#[derive(Clone)]
struct TapApiService {
    // Outbox/cursor persistence used by `subscribe_run`.
    storage: Arc<Storage>,
    // Token verification and role enforcement.
    auth: Arc<AuthHandle>,
    // Present only when ClickHouse configuration resolves from the
    // environment; `query_events` requires it.
    clickhouse: Option<ClickhouseEventQuery>,
}
469
470impl TapApiService {
471    fn new(storage: Arc<Storage>, auth: Arc<AuthHandle>) -> Self {
472        let clickhouse = ClickhouseEventQuery::from_env().ok();
473        Self {
474            storage,
475            auth,
476            clickhouse,
477        }
478    }
479
480    async fn require_role(
481        &self,
482        metadata: &tonic::metadata::MetadataMap,
483        required: Role,
484    ) -> Result<Role, Status> {
485        self.auth
486            .require_grpc(metadata, required)
487            .await
488            .map_err(|err| match err {
489                AuthError::MissingToken | AuthError::InvalidToken => {
490                    Status::unauthenticated(err.to_string())
491                }
492                AuthError::InsufficientRole { .. }
493                | AuthError::MissingRoleClaim(_)
494                | AuthError::UnsupportedRole { .. } => Status::permission_denied(err.to_string()),
495                AuthError::NotConfigured => Status::unauthenticated(err.to_string()),
496                AuthError::Backend(_) => Status::internal(err.to_string()),
497            })
498    }
499}
500
#[tonic::async_trait]
impl TapService for TapApiService {
    /// Server-streaming response type for `SubscribeRun`.
    type SubscribeRunStream =
        Pin<Box<dyn Stream<Item = Result<SubscribeRunResponse, Status>> + Send>>;

    /// Bidirectional tap stream: the client must open with a single `open`
    /// message describing the run and filters, after which it sends acks;
    /// the server streams matching outbox events and heartbeats.
    async fn subscribe_run(
        &self,
        request: Request<tonic::Streaming<SubscribeRunRequest>>,
    ) -> Result<Response<Self::SubscribeRunStream>, Status> {
        let metadata = request.metadata().clone();
        // Reader role suffices to tap a run.
        self.require_role(&metadata, Role::Reader).await?;

        let mut inbound = request.into_inner();
        // The very first client message must exist and must be `open`.
        let first = inbound
            .message()
            .await
            .map_err(|status| internal_error(status.to_string()))?
            .ok_or_else(|| invalid_argument("missing subscribe request"))?;

        let open = match first.request {
            Some(subscribe_run_request::Request::Open(open)) => open,
            Some(subscribe_run_request::Request::Ack(_)) => {
                return Err(invalid_argument(
                    "first SubscribeRun message must be `open`",
                ))
            }
            None => return Err(invalid_argument("first SubscribeRun message must be set")),
        };

        let mut config = SubscriptionConfig::try_from_open(open)?;
        // Resume from a persisted cursor offset, but only when the client did
        // not supply an explicit since_offset of its own.
        if let Some(cursor) = config.cursor.clone() {
            if config.since_offset == 0 {
                match self.storage.fetch_tap_offset(config.run_id, &cursor).await {
                    Ok(Some(stored)) => {
                        config.since_offset = stored;
                    }
                    // No stored offset yet: start from the beginning.
                    Ok(None) => {}
                    Err(err) => {
                        error!(
                            run = %config.run_id,
                            cursor = %cursor,
                            error = %err,
                            "failed to load stored tap cursor"
                        );
                        return Err(internal_error("failed to load stored tap cursor"));
                    }
                }
            }
        }
        // Channel sized to twice the in-flight window so event batches and
        // heartbeats can interleave without immediately blocking the worker.
        let (tx, rx) =
            mpsc::channel::<Result<SubscribeRunResponse, Status>>(config.max_inflight * 2);

        let storage = Arc::clone(&self.storage);
        tokio::spawn(async move {
            // Forward a terminal error to the client before the stream ends;
            // send failure means the client already went away.
            if let Err(err) = run_subscription(storage, config, inbound, tx.clone()).await {
                let _ = tx.send(Err(err)).await;
            }
        });

        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::SubscribeRunStream
        ))
    }

    /// Paged historical event query backed by ClickHouse. Fails with
    /// FAILED_PRECONDITION when ClickHouse is not configured.
    async fn query_events(
        &self,
        request: Request<QueryEventsRequest>,
    ) -> Result<Response<QueryEventsResponse>, Status> {
        let metadata = request.metadata().clone();
        self.require_role(&metadata, Role::Reader).await?;

        let clickhouse = self.clickhouse.as_ref().ok_or_else(|| {
            Status::failed_precondition("QueryEvents requires ClickHouse configuration")
        })?;

        let inner = request.into_inner();
        let filters = QueryFiltersOwned::from_request(&inner)?;

        // page_size == 0 selects the default; everything is clamped to the
        // configured maximum.
        let requested_page = if inner.page_size == 0 {
            QUERY_DEFAULT_PAGE_SIZE
        } else {
            inner.page_size
        };
        let page_size = requested_page.clamp(1, QUERY_MAX_PAGE_SIZE);
        // The page token is an opaque numeric offset minted by this server.
        let offset = parse_page_token(inner.page_token.as_str())?;

        let QueryBatch {
            events,
            next_offset,
        } = clickhouse
            .fetch(&filters, offset, page_size)
            .await
            .map_err(|err| {
                error!(error = %err, "failed to query ClickHouse events");
                internal_error("failed to query ClickHouse events")
            })?;

        // An empty token tells the client there are no further pages.
        let next_page_token = next_offset
            .map(|value| value.to_string())
            .unwrap_or_default();

        Ok(Response::new(QueryEventsResponse {
            events,
            next_page_token,
        }))
    }
}
608
/// Normalised, validated form of a client's `SubscribeRunOpen` request.
struct SubscriptionConfig {
    // The single run being tapped (multi-run subscriptions are rejected).
    run_id: Uuid,
    // Optional filters; `None` means "match everything".
    step_ids: Option<HashSet<Uuid>>,
    kinds: Option<HashSet<String>>,
    severities: Option<HashSet<EventSeverity>>,
    tags: Option<HashSet<String>>,
    // Named resume cursor, if supplied (trimmed, never empty).
    cursor: Option<String>,
    // Outbox id to resume from — presumably exclusive; confirm against
    // `fetch_outbox_for_run_since`.
    since_offset: i64,
    // Max events fetched per storage query (clamped 1..=2048).
    batch_size: i64,
    // Max unacknowledged events before fetching pauses (clamped 1..=4096).
    max_inflight: usize,
    // Interval between heartbeat responses (clamped 250ms..=60s).
    heartbeat: Duration,
}
621
622impl SubscriptionConfig {
623    fn try_from_open(open: SubscribeRunOpen) -> Result<Self, Status> {
624        let SubscribeRunOpen {
625            run_ids,
626            step_ids,
627            kinds,
628            severities,
629            tags,
630            since_offset,
631            batch_size,
632            max_inflight,
633            heartbeat_interval_ms,
634            cursor,
635            ..
636        } = open;
637
638        if run_ids.is_empty() {
639            return Err(invalid_argument(
640                "subscribe_run requires at least one run_id",
641            ));
642        }
643        if run_ids.len() > 1 {
644            return Err(invalid_argument(
645                "subscribe_run currently supports a single run_id",
646            ));
647        }
648        let run_id = Uuid::parse_str(&run_ids[0])
649            .map_err(|err| invalid_argument(format!("invalid run_id '{}' : {err}", run_ids[0])))?;
650
651        let step_ids = if step_ids.is_empty() {
652            None
653        } else {
654            let mut ids = HashSet::with_capacity(step_ids.len());
655            for raw in step_ids {
656                let parsed = Uuid::parse_str(&raw)
657                    .map_err(|err| invalid_argument(format!("invalid step_id '{raw}': {err}")))?;
658                ids.insert(parsed);
659            }
660            Some(ids)
661        };
662
663        let kinds = if kinds.is_empty() {
664            None
665        } else {
666            Some(
667                kinds
668                    .into_iter()
669                    .map(|kind| kind.to_ascii_lowercase())
670                    .collect::<HashSet<_>>(),
671            )
672        };
673
674        let severities = if severities.is_empty() {
675            None
676        } else {
677            let mut set = HashSet::with_capacity(severities.len());
678            for raw in severities {
679                let sev = EventSeverity::try_from(raw)
680                    .map_err(|_| invalid_argument(format!("unknown severity enum value {raw}")))?;
681                set.insert(sev);
682            }
683            Some(set)
684        };
685
686        let tags = if tags.is_empty() {
687            None
688        } else {
689            Some(
690                tags.into_iter()
691                    .map(|tag| tag.to_ascii_lowercase())
692                    .collect::<HashSet<_>>(),
693            )
694        };
695
696        if since_offset < 0 {
697            return Err(invalid_argument("since_offset must be >= 0"));
698        }
699
700        let batch_raw = if batch_size == 0 { 256 } else { batch_size };
701        let batch_size = batch_raw.max(1).min(2048) as i64;
702
703        let inflight_raw = if max_inflight == 0 { 128 } else { max_inflight };
704        let max_inflight = inflight_raw.max(1).min(4096) as usize;
705
706        let heartbeat_ms = if heartbeat_interval_ms == 0 {
707            1000u64
708        } else {
709            heartbeat_interval_ms as u64
710        }
711        .clamp(250, 60_000);
712
713        let cursor = {
714            let trimmed = cursor.trim();
715            if trimmed.is_empty() {
716                None
717            } else {
718                Some(trimmed.to_string())
719            }
720        };
721
722        Ok(Self {
723            run_id,
724            step_ids,
725            kinds,
726            severities,
727            tags,
728            cursor,
729            since_offset,
730            batch_size,
731            max_inflight,
732            heartbeat: Duration::from_millis(heartbeat_ms),
733        })
734    }
735}
736
737async fn run_subscription(
738    storage: Arc<Storage>,
739    config: SubscriptionConfig,
740    mut inbound: tonic::Streaming<SubscribeRunRequest>,
741    tx: mpsc::Sender<Result<SubscribeRunResponse, Status>>,
742) -> Result<(), Status> {
743    let mut last_seen = config.since_offset;
744    let mut last_ack = config.since_offset;
745    let mut inflight: VecDeque<i64> = VecDeque::new();
746    let mut ticker = interval(config.heartbeat);
747    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
748
749    loop {
750        while inflight.len() < config.max_inflight {
751            let events = storage
752                .fetch_outbox_for_run_since(config.run_id, last_seen, config.batch_size)
753                .await
754                .map_err(|err| internal_error(format!("failed to fetch outbox events: {err}")))?;
755
756            if events.is_empty() {
757                break;
758            }
759
760            last_seen = events.last().map(|event| event.id).unwrap_or(last_seen);
761
762            for event in events {
763                last_seen = event.id;
764                let severity = derive_severity(&event.kind, &event.payload);
765                let tags = derive_tags(&event.kind, event.step_id, &event.payload, severity);
766                if !passes_filters(&config, &event, severity, &tags) {
767                    continue;
768                }
769
770                let response = build_response(&config, &event, severity, &tags, last_seen)?;
771                if tx.send(Ok(response)).await.is_err() {
772                    return Ok(());
773                }
774                inflight.push_back(event.id);
775            }
776        }
777
778        tokio::select! {
779            maybe_req = inbound.message() => {
780                match maybe_req {
781                    Ok(Some(SubscribeRunRequest { request: Some(subscribe_run_request::Request::Ack(ack)) })) => {
782                        handle_ack(&mut inflight, &mut last_ack, ack.offset);
783                        let ack_cursor = if !ack.cursor.trim().is_empty() {
784                            Some(ack.cursor)
785                        } else {
786                            config.cursor.clone()
787                        };
788                        if let Some(cursor_id) = ack_cursor {
789                            if ack.offset < 0 {
790                                return Err(invalid_argument("ack offset must be >= 0"));
791                            }
792                            if let Err(err) = storage
793                                .upsert_tap_offset(config.run_id, &cursor_id, ack.offset)
794                                .await
795                            {
796                                error!(
797                                    run = %config.run_id,
798                                    cursor = %cursor_id,
799                                    error = %err,
800                                    "failed to persist tap cursor offset"
801                                );
802                                return Err(internal_error("failed to persist tap cursor offset"));
803                            }
804                        }
805                    }
806                    Ok(Some(SubscribeRunRequest { request: Some(subscribe_run_request::Request::Open(_)) })) => {
807                        return Err(invalid_argument("subscribe_run received multiple open messages"));
808                    }
809                    Ok(Some(SubscribeRunRequest { request: None })) => {}
810                    Ok(None) => {
811                        return Ok(());
812                    }
813                    Err(status) => {
814                        return Err(internal_error(status.to_string()));
815                    }
816                }
817            }
818            _ = ticker.tick() => {
819                let heartbeat = SubscribeRunResponse {
820                    offset: last_ack,
821                    watermark: last_seen,
822                    heartbeat: true,
823                    event: None,
824                };
825                if tx.send(Ok(heartbeat)).await.is_err() {
826                    return Ok(());
827                }
828            }
829        }
830    }
831}
832
/// Owned, validated copy of the filters in a `QueryEventsRequest`.
#[derive(Debug, Clone)]
struct QueryFiltersOwned {
    // Which event source to query; unspecified/unknown falls back to Run.
    source: QueryEventSource,
    run_ids: Vec<Uuid>,
    step_ids: Vec<Uuid>,
    // Lowercased, with empty entries removed.
    workspace_ids: Vec<String>,
    // Lowercased, with empty entries removed.
    kinds: Vec<String>,
    // Unspecified effects are filtered out.
    policy_effects: Vec<PolicyDecisionEffect>,
    severities: Vec<EventSeverity>,
    // Optional time window — inclusivity depends on the query builder
    // (outside this chunk); confirm there.
    start_time: Option<DateTime<Utc>>,
    end_time: Option<DateTime<Utc>>,
    // Result ordering direction.
    ascending: bool,
}
846
847impl QueryFiltersOwned {
848    fn from_request(request: &QueryEventsRequest) -> Result<Self, Status> {
849        let mut run_ids = Vec::with_capacity(request.run_ids.len());
850        for raw in &request.run_ids {
851            let parsed = Uuid::parse_str(raw)
852                .map_err(|err| invalid_argument(format!("invalid run_id '{raw}': {err}")))?;
853            run_ids.push(parsed);
854        }
855
856        let mut step_ids = Vec::with_capacity(request.step_ids.len());
857        for raw in &request.step_ids {
858            let parsed = Uuid::parse_str(raw)
859                .map_err(|err| invalid_argument(format!("invalid step_id '{raw}': {err}")))?;
860            step_ids.push(parsed);
861        }
862
863        let source = QueryEventSource::try_from(request.source).unwrap_or(QueryEventSource::Run);
864
865        let start_time = request
866            .start_time
867            .as_ref()
868            .map(timestamp_to_datetime)
869            .transpose()?;
870        let end_time = request
871            .end_time
872            .as_ref()
873            .map(timestamp_to_datetime)
874            .transpose()?;
875
876        let kinds = request
877            .kinds
878            .iter()
879            .filter(|kind| !kind.is_empty())
880            .map(|kind| kind.to_ascii_lowercase())
881            .collect::<Vec<_>>();
882
883        let workspace_ids = request
884            .workspace_ids
885            .iter()
886            .filter(|id| !id.is_empty())
887            .map(|id| id.to_ascii_lowercase())
888            .collect::<Vec<_>>();
889
890        let mut policy_effects = Vec::with_capacity(request.policy_effects.len());
891        for raw in &request.policy_effects {
892            if let Ok(effect) = PolicyDecisionEffect::try_from(*raw) {
893                if effect != PolicyDecisionEffect::Unspecified {
894                    policy_effects.push(effect);
895                }
896            }
897        }
898
899        let mut severities = Vec::with_capacity(request.severities.len());
900        for raw in &request.severities {
901            if let Ok(severity) = EventSeverity::try_from(*raw) {
902                if severity != EventSeverity::Unspecified {
903                    severities.push(severity);
904                }
905            }
906        }
907
908        Ok(Self {
909            source,
910            run_ids,
911            step_ids,
912            workspace_ids,
913            kinds,
914            policy_effects,
915            severities,
916            start_time,
917            end_time,
918            ascending: request.ascending,
919        })
920    }
921}
922
/// One page of query results: the events that matched the filters plus the
/// absolute scan offset to resume from. `next_offset == None` means the
/// underlying scan is exhausted (no further pages).
struct QueryBatch {
    events: Vec<RunEventEnvelope>,
    next_offset: Option<u64>,
}
927
/// ClickHouse-backed reader for Tap `QueryEvents`: a client handle plus the
/// env-configured table names for run and step events (see `from_env`).
#[derive(Clone)]
struct ClickhouseEventQuery {
    client: ClickhouseClient,
    run_table: String,
    step_table: String,
}
934
/// Row shape selected by `fetch_run_rows` (run-events table).
#[derive(Debug, Deserialize, Row)]
struct RunEventRow {
    run_id: Uuid,
    created_at: DateTime<Utc>,
    // Status string; lower-cased into the event kind ("run_<status>").
    status: String,
    // Free-form JSON payload stored alongside the event.
    attributes: Value,
}
942
/// Row shape selected by `fetch_step_rows` (step-events table).
#[derive(Debug, Deserialize, Row)]
struct StepEventRow {
    run_id: Uuid,
    step_id: Uuid,
    // Status string; lower-cased into the event kind ("step_<status>").
    status: String,
    attempt: i32,
    // Computed in SQL as coalesce(finished_at, started_at).
    recorded_at: DateTime<Utc>,
    // Free-form JSON payload stored alongside the event.
    attributes: Value,
}
952
impl ClickhouseEventQuery {
    /// Builds the query client from environment variables.
    ///
    /// `CLICKHOUSE_DSN` is required; `CLICKHOUSE_DATABASE`,
    /// `CLICKHOUSE_RUN_TABLE`, and `CLICKHOUSE_STEP_TABLE` default to
    /// `telemetry`, `run_events`, and `step_events` respectively.
    fn from_env() -> Result<Self> {
        let url = env::var("CLICKHOUSE_DSN")
            .context("CLICKHOUSE_DSN is not configured for Tap QueryEvents")?;
        let database = env::var("CLICKHOUSE_DATABASE").unwrap_or_else(|_| "telemetry".to_string());
        let run_table =
            env::var("CLICKHOUSE_RUN_TABLE").unwrap_or_else(|_| "run_events".to_string());
        let step_table =
            env::var("CLICKHOUSE_STEP_TABLE").unwrap_or_else(|_| "step_events".to_string());

        let client = ClickhouseClient::default()
            .with_url(url)
            .with_database(database);

        Ok(Self {
            client,
            run_table,
            step_table,
        })
    }

    /// Fetches one page of events, dispatching on the requested source.
    /// Anything other than `Step` (including unspecified) reads run events.
    async fn fetch(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<QueryBatch> {
        match filters.source {
            QueryEventSource::Step => self.query_step_events(filters, offset, limit).await,
            _ => self.query_run_events(filters, offset, limit).await,
        }
    }

    /// Scans run events from `offset`, applying the request filters
    /// in-process, until `limit` matches are collected or rows run out.
    ///
    /// Filtering happens after the SQL fetch, so each round trip over-fetches
    /// (`QUERY_FETCH_MULTIPLIER`× the limit) to cut down round trips when
    /// filters are selective. Offsets are absolute positions in the
    /// unfiltered scan order; `next_offset` can be handed back verbatim as a
    /// page token.
    async fn query_run_events(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<QueryBatch> {
        // Rows requested per round trip; the trailing .max(...) guards
        // against a multiplier of zero ever producing an empty chunk.
        let chunk = std::cmp::max(limit, 1)
            .saturating_mul(QUERY_FETCH_MULTIPLIER)
            .max(limit.max(1));
        let mut collected = Vec::with_capacity(limit as usize);
        let mut scan_offset = offset;
        let mut next_offset = None;

        while collected.len() < limit as usize {
            let rows = self.fetch_run_rows(filters, scan_offset, chunk).await?;
            if rows.is_empty() {
                break;
            }

            let mut consumed = 0usize;
            for row in rows {
                // Absolute row position in the scan; doubles as the resume
                // token for the page that ends on this row.
                let raw_offset = scan_offset + consumed as u64;
                consumed += 1;
                let payload = normalize_payload(row.attributes);
                let kind = format!("run_{}", row.status.to_ascii_lowercase());
                let severity = derive_severity(&kind, &payload);
                if !matches_filters(filters, &payload, &kind, severity, None) {
                    continue;
                }
                let envelope = make_envelope(
                    row.run_id,
                    None,
                    row.created_at,
                    &payload,
                    &kind,
                    severity,
                    raw_offset,
                )?;
                collected.push(envelope);
                if collected.len() >= limit as usize {
                    // Page full: resume just past the last emitted row.
                    next_offset = Some(raw_offset.saturating_add(1));
                    break;
                }
            }

            scan_offset = scan_offset.saturating_add(consumed as u64);
            // A short read means ClickHouse has no more rows — stop scanning.
            if consumed < chunk as usize {
                break;
            }
        }

        Ok(QueryBatch {
            events: collected,
            next_offset,
        })
    }

    /// Step-event twin of `query_run_events`; identical pagination scheme,
    /// but rows carry a `step_id` which participates in filtering.
    async fn query_step_events(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<QueryBatch> {
        // See query_run_events for the chunk-sizing rationale.
        let chunk = std::cmp::max(limit, 1)
            .saturating_mul(QUERY_FETCH_MULTIPLIER)
            .max(limit.max(1));
        let mut collected = Vec::with_capacity(limit as usize);
        let mut scan_offset = offset;
        let mut next_offset = None;

        while collected.len() < limit as usize {
            let rows = self.fetch_step_rows(filters, scan_offset, chunk).await?;
            if rows.is_empty() {
                break;
            }

            let mut consumed = 0usize;
            for row in rows {
                let raw_offset = scan_offset + consumed as u64;
                consumed += 1;
                let payload = normalize_payload(row.attributes);
                let kind = format!("step_{}", row.status.to_ascii_lowercase());
                let severity = derive_severity(&kind, &payload);
                if !matches_filters(filters, &payload, &kind, severity, Some(row.step_id)) {
                    continue;
                }
                let envelope = make_envelope(
                    row.run_id,
                    Some(row.step_id),
                    row.recorded_at,
                    &payload,
                    &kind,
                    severity,
                    raw_offset,
                )?;
                collected.push(envelope);
                if collected.len() >= limit as usize {
                    next_offset = Some(raw_offset.saturating_add(1));
                    break;
                }
            }

            scan_offset = scan_offset.saturating_add(consumed as u64);
            if consumed < chunk as usize {
                break;
            }
        }

        Ok(QueryBatch {
            events: collected,
            next_offset,
        })
    }

    /// Builds and executes the run-events SELECT.
    ///
    /// Interpolated values are pre-parsed `Uuid`s and formatted datetimes,
    /// and the table name comes from operator-controlled env config, so the
    /// string-built SQL carries no raw user input. Rows are ordered by
    /// (created_at, run_id) to keep OFFSET pagination stable.
    async fn fetch_run_rows(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<Vec<RunEventRow>> {
        let mut where_clauses = Vec::new();
        if !filters.run_ids.is_empty() {
            where_clauses.push(format!(
                "run_id IN ({})",
                format_uuid_list(&filters.run_ids)
            ));
        }
        // Inclusive time window, millisecond precision in UTC.
        if let Some(start) = filters.start_time {
            where_clauses.push(format!(
                "created_at >= toDateTime64('{}', 3, 'UTC')",
                start.format("%Y-%m-%d %H:%M:%S%.3f")
            ));
        }
        if let Some(end) = filters.end_time {
            where_clauses.push(format!(
                "created_at <= toDateTime64('{}', 3, 'UTC')",
                end.format("%Y-%m-%d %H:%M:%S%.3f")
            ));
        }

        let mut sql = format!(
            "SELECT run_id, created_at, status, attributes FROM {}",
            self.run_table
        );
        if !where_clauses.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&where_clauses.join(" AND "));
        }
        sql.push_str(" ORDER BY created_at ");
        sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
        // run_id tie-break keeps ordering deterministic for equal timestamps.
        sql.push_str(", run_id ");
        sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
        sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));

        let rows = self
            .client
            .query(&sql)
            .fetch_all::<RunEventRow>()
            .await
            .context("failed to fetch run events from ClickHouse")?;
        Ok(rows)
    }

    /// Builds and executes the step-events SELECT.
    ///
    /// The event time is `coalesce(finished_at, started_at)`, exposed as
    /// `recorded_at` and used for both the time window and the sort order.
    /// Same injection-safety and ordering notes as `fetch_run_rows`.
    async fn fetch_step_rows(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<Vec<StepEventRow>> {
        let mut where_clauses = Vec::new();
        if !filters.run_ids.is_empty() {
            where_clauses.push(format!(
                "run_id IN ({})",
                format_uuid_list(&filters.run_ids)
            ));
        }
        if !filters.step_ids.is_empty() {
            where_clauses.push(format!(
                "step_id IN ({})",
                format_uuid_list(&filters.step_ids)
            ));
        }

        // Prefer completion time; fall back to start time for running steps.
        let time_expr = "coalesce(finished_at, started_at)";
        if let Some(start) = filters.start_time {
            where_clauses.push(format!(
                "{time_expr} >= toDateTime64('{}', 3, 'UTC')",
                start.format("%Y-%m-%d %H:%M:%S%.3f")
            ));
        }
        if let Some(end) = filters.end_time {
            where_clauses.push(format!(
                "{time_expr} <= toDateTime64('{}', 3, 'UTC')",
                end.format("%Y-%m-%d %H:%M:%S%.3f")
            ));
        }

        let mut sql = format!(
            "SELECT run_id, step_id, status, attempt, {time_expr} AS recorded_at, attributes FROM {}",
            self.step_table
        );
        if !where_clauses.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&where_clauses.join(" AND "));
        }
        sql.push_str(&format!(" ORDER BY {time_expr} "));
        sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
        sql.push_str(", step_id ");
        sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
        sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));

        let rows = self
            .client
            .query(&sql)
            .fetch_all::<StepEventRow>()
            .await
            .context("failed to fetch step events from ClickHouse")?;
        Ok(rows)
    }
}
1206
1207fn parse_page_token(raw: &str) -> Result<u64, Status> {
1208    if raw.trim().is_empty() {
1209        return Ok(0);
1210    }
1211    raw.parse::<u64>()
1212        .map_err(|_| invalid_argument("page_token must be a positive integer"))
1213}
1214
1215fn timestamp_to_datetime(ts: &prost_types::Timestamp) -> Result<DateTime<Utc>, Status> {
1216    Utc.timestamp_opt(ts.seconds, ts.nanos as u32)
1217        .single()
1218        .ok_or_else(|| invalid_argument("timestamp out of range"))
1219}
1220
1221fn normalize_payload(value: Value) -> Value {
1222    match value {
1223        Value::Object(_) => value,
1224        Value::Null => Value::Object(Map::new()),
1225        other => {
1226            let mut map = Map::new();
1227            map.insert("value".to_string(), other);
1228            Value::Object(map)
1229        }
1230    }
1231}
1232
1233fn matches_filters(
1234    filters: &QueryFiltersOwned,
1235    payload: &Value,
1236    kind: &str,
1237    severity: EventSeverity,
1238    step_id: Option<Uuid>,
1239) -> bool {
1240    if !filters.kinds.is_empty() {
1241        let kind_value = kind.to_ascii_lowercase();
1242        if !filters.kinds.iter().any(|expected| expected == &kind_value) {
1243            return false;
1244        }
1245    }
1246
1247    if !filters.workspace_ids.is_empty() {
1248        let workspace = json_get_string(payload, &["trace", "run", "workspace_id"])
1249            .map(|value| value.to_ascii_lowercase());
1250        let Some(workspace) = workspace else {
1251            return false;
1252        };
1253        if !filters
1254            .workspace_ids
1255            .iter()
1256            .any(|expected| expected == &workspace)
1257        {
1258            return false;
1259        }
1260    }
1261
1262    if !filters.policy_effects.is_empty() {
1263        let effect = payload
1264            .get("effect")
1265            .and_then(Value::as_str)
1266            .map(|value| value.to_ascii_lowercase());
1267        if let Some(effect) = effect {
1268            if !filters
1269                .policy_effects
1270                .iter()
1271                .any(|expected| policy_effect_label(*expected) == effect)
1272            {
1273                return false;
1274            }
1275        } else {
1276            return false;
1277        }
1278    }
1279
1280    if !filters.severities.is_empty() && !filters.severities.contains(&severity) {
1281        return false;
1282    }
1283
1284    if filters.source == QueryEventSource::Step && !filters.step_ids.is_empty() {
1285        if let Some(step_id) = step_id {
1286            if !filters.step_ids.contains(&step_id) {
1287                return false;
1288            }
1289        } else {
1290            return false;
1291        }
1292    }
1293
1294    true
1295}
1296
/// Lowercase label used to compare a `PolicyDecisionEffect` against the
/// `effect` string stored in event payloads. Unspecified/unknown variants
/// map to "" and therefore never match a non-empty payload value.
fn policy_effect_label(effect: PolicyDecisionEffect) -> &'static str {
    match effect {
        PolicyDecisionEffect::Allow => "allow",
        PolicyDecisionEffect::Deny => "deny",
        PolicyDecisionEffect::Escalate => "escalate",
        PolicyDecisionEffect::Modify => "modify",
        _ => "",
    }
}
1306
1307fn json_get_string<'a>(value: &'a Value, path: &[&str]) -> Option<&'a str> {
1308    let mut current = value;
1309    for segment in path {
1310        current = current.get(*segment)?;
1311    }
1312    current.as_str()
1313}
1314
1315fn format_uuid_list(values: &[Uuid]) -> String {
1316    values
1317        .iter()
1318        .map(|value| format!("'{}'", value))
1319        .collect::<Vec<_>>()
1320        .join(", ")
1321}
1322
/// Assembles a `RunEventEnvelope` for a ClickHouse-sourced event.
///
/// `raw_offset` is the event's absolute position in the unfiltered scan
/// order and becomes the envelope's resumable `offset`; it must fit in
/// `i64` (the proto field type), otherwise an error is returned.
fn make_envelope(
    run_id: Uuid,
    step_id: Option<Uuid>,
    recorded_at: DateTime<Utc>,
    payload: &Value,
    kind: &str,
    severity: EventSeverity,
    raw_offset: u64,
) -> Result<RunEventEnvelope> {
    let trace = build_trace_context(run_id, step_id, payload);
    let tags = derive_tags(kind, step_id, payload, severity);
    // Full payload is carried verbatim as a protobuf Struct in `raw`.
    let raw_struct = json_to_prost_struct(payload)
        .map_err(|err| anyhow!("failed to encode ClickHouse payload into Struct: {err}"))?;
    let offset =
        i64::try_from(raw_offset).map_err(|_| anyhow!("query offset exceeds i64 range"))?;

    Ok(RunEventEnvelope {
        trace: Some(trace),
        recorded_at: Some(to_timestamp(recorded_at)),
        offset,
        severity: severity as i32,
        tags,
        raw: Some(raw_struct),
        body: None,
    })
}
1349
/// Advances the cumulative ack high-water mark and retires in-flight offsets
/// it covers.
///
/// Stale or duplicate acks (at or below the current mark) are ignored.
/// Only the leading run of offsets `<= ack_offset` is removed — identical to
/// repeatedly popping the front while it is covered by the ack.
fn handle_ack(inflight: &mut VecDeque<i64>, last_ack: &mut i64, ack_offset: i64) {
    if ack_offset <= *last_ack {
        return;
    }
    *last_ack = ack_offset;
    match inflight.iter().position(|offset| *offset > ack_offset) {
        // Drop the covered prefix in a single drain.
        Some(first_kept) => {
            inflight.drain(..first_kept);
        }
        // Every queued offset is covered by the ack.
        None => inflight.clear(),
    }
}
1363
1364fn passes_filters(
1365    config: &SubscriptionConfig,
1366    event: &models::OutboxEvent,
1367    severity: EventSeverity,
1368    tags: &[String],
1369) -> bool {
1370    if let Some(ref kinds) = config.kinds {
1371        if !kinds.contains(&event.kind.to_ascii_lowercase()) {
1372            return false;
1373        }
1374    }
1375
1376    if let Some(ref severities) = config.severities {
1377        if !severities.contains(&severity) {
1378            return false;
1379        }
1380    }
1381
1382    if let Some(ref step_filter) = config.step_ids {
1383        match event.step_id {
1384            Some(step_id) if step_filter.contains(&step_id) => {}
1385            _ => return false,
1386        }
1387    }
1388
1389    if let Some(ref tag_filter) = config.tags {
1390        if !tags
1391            .iter()
1392            .any(|tag| tag_filter.contains(&tag.to_ascii_lowercase()))
1393        {
1394            return false;
1395        }
1396    }
1397
1398    true
1399}
1400
1401fn derive_severity(kind: &str, payload: &Value) -> EventSeverity {
1402    let kind_lower = kind.to_ascii_lowercase();
1403    if kind_lower.contains("failed") || kind_lower.contains("error") {
1404        EventSeverity::Error
1405    } else if kind_lower.contains("denied") || kind_lower.contains("skipped") {
1406        EventSeverity::Warn
1407    } else if kind_lower.contains("debug") {
1408        EventSeverity::Debug
1409    } else if payload
1410        .get("severity")
1411        .and_then(Value::as_str)
1412        .map(|value| value.eq_ignore_ascii_case("warn"))
1413        .unwrap_or(false)
1414    {
1415        EventSeverity::Warn
1416    } else {
1417        EventSeverity::Info
1418    }
1419}
1420
/// Lowercase tag label for an `EventSeverity` variant (used in derive_tags).
fn severity_label(severity: EventSeverity) -> &'static str {
    match severity {
        EventSeverity::Trace => "trace",
        EventSeverity::Debug => "debug",
        EventSeverity::Info => "info",
        EventSeverity::Warn => "warn",
        EventSeverity::Error => "error",
        EventSeverity::Fatal => "fatal",
        EventSeverity::Unspecified => "unspecified",
    }
}
1432
1433fn derive_tags(
1434    kind: &str,
1435    step_id: Option<Uuid>,
1436    payload: &Value,
1437    severity: EventSeverity,
1438) -> Vec<String> {
1439    let mut tags = Vec::with_capacity(6);
1440    tags.push("tap".to_string());
1441    tags.push(format!("kind:{}", kind.to_ascii_lowercase()));
1442    tags.push(format!("severity:{}", severity_label(severity)));
1443
1444    if let Some(step_id) = step_id {
1445        tags.push("step".to_string());
1446        tags.push(format!("step_id:{step_id}"));
1447    } else {
1448        tags.push("run".to_string());
1449    }
1450
1451    if let Some(status) = payload.get("status").and_then(Value::as_str) {
1452        tags.push(format!("status:{}", status.to_ascii_lowercase()));
1453    }
1454    if let Some(effect) = payload.get("effect").and_then(Value::as_str) {
1455        tags.push(format!("effect:{}", effect.to_ascii_lowercase()));
1456    }
1457    tags
1458}
1459
/// Wraps an outbox event into a non-heartbeat `SubscribeRunResponse`.
///
/// The outbox row id serves as both the envelope offset and the response
/// offset (the cursor the client acks). `watermark` is the highest offset
/// the server has observed for this run so far.
fn build_response(
    config: &SubscriptionConfig,
    event: &models::OutboxEvent,
    severity: EventSeverity,
    tags: &[String],
    watermark: i64,
) -> Result<SubscribeRunResponse, Status> {
    // Carry the full payload verbatim as a protobuf Struct.
    let raw_struct = json_to_prost_struct(&event.payload).map_err(|err| {
        internal_error(format!(
            "failed to encode outbox payload {}: {err}",
            event.id
        ))
    })?;

    let trace = build_trace_context(config.run_id, event.step_id, &event.payload);

    let envelope = RunEventEnvelope {
        trace: Some(trace),
        recorded_at: Some(to_timestamp(event.created_at)),
        offset: event.id,
        severity: severity as i32,
        tags: tags.to_vec(),
        raw: Some(raw_struct),
        body: None,
    };

    Ok(SubscribeRunResponse {
        offset: event.id,
        watermark,
        heartbeat: false,
        event: Some(envelope),
    })
}
1493
/// Reconstructs a `RunTraceContext` from the optional `trace` object inside
/// an event payload, falling back to the caller-supplied identifiers.
///
/// Guarantees on return: `run` is always populated (at minimum with
/// `run_id`), and `step` is populated whenever `step_id` was supplied, even
/// when the payload carries no trace data at all. Payload values, when
/// present, take precedence over the caller-supplied ids.
fn build_trace_context(run_id: Uuid, step_id: Option<Uuid>, payload: &Value) -> RunTraceContext {
    // Seed an empty context with the caller-provided run id; everything
    // below overwrites pieces of it from the payload when available.
    let mut ctx = RunTraceContext {
        trace_id: String::new(),
        span_id: String::new(),
        parent_span_id: String::new(),
        run: Some(tap::RunScope {
            run_id: run_id.to_string(),
            ..Default::default()
        }),
        step: None,
        tool_call: None,
        policy_decision: None,
        budget: None,
        seed: None,
        baggage: Vec::new(),
    };

    if let Some(trace_value) = payload.get("trace").and_then(Value::as_object) {
        // Top-level span identifiers and seed.
        if let Some(trace_id) = trace_value.get("trace_id").and_then(Value::as_str) {
            ctx.trace_id = trace_id.to_string();
        }
        if let Some(span_id) = trace_value.get("span_id").and_then(Value::as_str) {
            ctx.span_id = span_id.to_string();
        }
        if let Some(parent_span_id) = trace_value.get("parent_span_id").and_then(Value::as_str) {
            ctx.parent_span_id = parent_span_id.to_string();
        }
        if let Some(seed) = trace_value.get("seed").and_then(Value::as_i64) {
            ctx.seed = Some(seed);
        }
        // Run scope: replaces the seeded default; missing run_id in the
        // payload falls back to the caller's run_id.
        if let Some(run_obj) = trace_value.get("run").and_then(Value::as_object) {
            let run_id_value = run_obj
                .get("run_id")
                .and_then(Value::as_str)
                .map(str::to_owned)
                .unwrap_or_else(|| run_id.to_string());
            let mut run_scope = tap::RunScope {
                run_id: run_id_value,
                workspace_id: run_obj
                    .get("workspace_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                app_id: run_obj
                    .get("app_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                attempt_id: run_obj
                    .get("attempt_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                labels: Default::default(),
            };
            // Only string-valued labels are carried over; others dropped.
            if let Some(labels) = run_obj.get("labels").and_then(Value::as_object) {
                for (key, value) in labels {
                    if let Some(string) = value.as_str() {
                        run_scope.labels.insert(key.clone(), string.to_string());
                    }
                }
            }
            ctx.run = Some(run_scope);
        }
        // Step scope from the payload, with the caller's step_id as the
        // fallback when the payload omits or blanks it.
        if let Some(step_obj) = trace_value.get("step").and_then(Value::as_object) {
            let mut scope = tap::StepScope {
                step_id: step_obj
                    .get("step_id")
                    .and_then(Value::as_str)
                    .map(str::to_owned)
                    .unwrap_or_else(|| step_id.map(|id| id.to_string()).unwrap_or_default()),
                index: step_obj
                    .get("index")
                    .and_then(Value::as_u64)
                    .map(|value| value as u32)
                    .unwrap_or_default(),
                attempt: step_obj
                    .get("attempt")
                    .and_then(Value::as_u64)
                    .map(|value| value as u32)
                    .unwrap_or_default(),
                kind: step_obj
                    .get("kind")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                scheduler: step_obj
                    .get("scheduler")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
            };
            // Covers a payload step_id of "" (empty string), which the
            // unwrap_or_else above does not.
            if scope.step_id.is_empty() {
                if let Some(id) = step_id {
                    scope.step_id = id.to_string();
                }
            }
            ctx.step = Some(scope);
        } else if let Some(id) = step_id {
            // No step object in the trace: synthesize one from the caller id.
            ctx.step = Some(tap::StepScope {
                step_id: id.to_string(),
                ..Default::default()
            });
        }
        // Optional tool-call scope; absent fields default to "".
        if let Some(tool_obj) = trace_value.get("tool_call").and_then(Value::as_object) {
            ctx.tool_call = Some(tap::ToolCallScope {
                tool_call_id: tool_obj
                    .get("tool_call_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                tool_name: tool_obj
                    .get("tool_name")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                tool_variant: tool_obj
                    .get("tool_variant")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                provider: tool_obj
                    .get("provider")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
            });
        }
        // Optional policy-decision scope.
        if let Some(policy_obj) = trace_value
            .get("policy_decision")
            .and_then(Value::as_object)
        {
            ctx.policy_decision = Some(tap::PolicyDecisionScope {
                policy_decision_id: policy_obj
                    .get("policy_decision_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                policy_pack_id: policy_obj
                    .get("policy_pack_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                policy_name: policy_obj
                    .get("policy_name")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                policy_version: policy_obj
                    .get("policy_version")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
            });
        }
        // Optional budget scope; numeric amounts only set when present.
        if let Some(budget_obj) = trace_value.get("budget").and_then(Value::as_object) {
            let mut budget_scope = tap::BudgetScope {
                budget_id: budget_obj
                    .get("budget_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                budget_kind: budget_obj
                    .get("budget_kind")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                amount_remaining: None,
                amount_allocated: None,
            };
            if let Some(value) = budget_obj.get("amount_remaining").and_then(Value::as_f64) {
                budget_scope.amount_remaining = Some(value);
            }
            if let Some(value) = budget_obj.get("amount_allocated").and_then(Value::as_f64) {
                budget_scope.amount_allocated = Some(value);
            }
            ctx.budget = Some(budget_scope);
        }
        // Baggage may be a structured array or a W3C-style header string;
        // parse_baggage handles both.
        if let Some(baggage_value) = trace_value.get("baggage") {
            ctx.baggage = parse_baggage(baggage_value);
        }
    } else if ctx.step.is_none() {
        // No trace object at all: still surface the caller's step id.
        // (ctx.step is necessarily None here; the check is defensive.)
        if let Some(step_id) = step_id {
            ctx.step = Some(tap::StepScope {
                step_id: step_id.to_string(),
                ..Default::default()
            });
        }
    }

    // Safety net for the guarantee that `run` is always populated.
    if ctx.run.is_none() {
        ctx.run = Some(tap::RunScope {
            run_id: run_id.to_string(),
            ..Default::default()
        });
    }

    ctx
}
1693
1694fn parse_baggage(value: &Value) -> Vec<tap::BaggageItem> {
1695    if let Some(array) = value.as_array() {
1696        let mut items = Vec::with_capacity(array.len());
1697        for entry in array {
1698            if let Some(obj) = entry.as_object() {
1699                if let (Some(key), Some(value)) = (
1700                    obj.get("key").and_then(Value::as_str),
1701                    obj.get("value").and_then(Value::as_str),
1702                ) {
1703                    items.push(tap::BaggageItem {
1704                        key: key.to_string(),
1705                        value: value.to_string(),
1706                    });
1707                }
1708            }
1709        }
1710        return items;
1711    }
1712
1713    if let Some(header) = value.as_str() {
1714        return header
1715            .split(',')
1716            .filter_map(|entry| {
1717                let mut parts = entry.splitn(2, '=');
1718                let key = parts.next()?.trim();
1719                let remainder = parts.next()?.trim();
1720                let value = remainder
1721                    .split_once(';')
1722                    .map(|(val, _)| val.trim())
1723                    .unwrap_or(remainder);
1724                if key.is_empty() || value.is_empty() {
1725                    None
1726                } else {
1727                    Some(tap::BaggageItem {
1728                        key: key.to_string(),
1729                        value: value.to_string(),
1730                    })
1731                }
1732            })
1733            .collect();
1734    }
1735
1736    Vec::new()
1737}
1738
/// Normalized run-submission payload consumed by `submit_run_impl`.
#[derive(Debug)]
struct SubmitRunSpec {
    /// Raw DAG definition; must contain a `steps` array and may contain
    /// `edges` ([from, to] UUID pairs). Validated downstream.
    dag_json: Value,
    /// Run inputs; must be a JSON object (`Null` is treated as empty).
    inputs: Value,
    /// Free-form labels attached to the run; `tenant_id`/`tenant` keys are
    /// also used to resolve the tenant for audit logging.
    labels: HashMap<String, String>,
    /// Deterministic-replay seed; rejected unless strictly positive.
    seed: i64,
    /// Optional W3C `traceparent` header propagated into the run context.
    traceparent: Option<String>,
    /// Optional W3C `tracestate` header propagated into the run context.
    tracestate: Option<String>,
    /// Optional W3C `baggage` header propagated into the run context.
    baggage: Option<String>,
    /// Optional debug metadata; must be a JSON object when present.
    debug: Option<Value>,
    /// Breakpoint specs (`"step:<uuid-or-slug>"` or bare target), resolved
    /// against the DAG's step ids and slugs during submission.
    breakpoints: Vec<String>,
}
1751
/// Result of a run submission.
#[derive(Debug)]
enum SubmitRunOutcome {
    /// A new run was persisted with this id.
    Created(Uuid),
    /// An existing run matched the supplied idempotency key and was reused.
    Reused(Uuid),
}
1757
/// Failure modes of run submission, mapped by callers to gRPC/HTTP statuses.
#[derive(Debug)]
enum SubmitRunError {
    /// Caller-supplied data failed validation (maps to INVALID_ARGUMENT).
    Invalid(String),
    /// Server-side failure, e.g. storage errors (maps to INTERNAL).
    Internal(String),
}
1763
1764impl std::fmt::Display for SubmitRunError {
1765    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1766        match self {
1767            SubmitRunError::Invalid(msg) => write!(f, "invalid run submission: {msg}"),
1768            SubmitRunError::Internal(msg) => write!(f, "internal run submission error: {msg}"),
1769        }
1770    }
1771}
1772
1773impl SubmitRunError {
1774    fn invalid(message: impl Into<String>) -> Self {
1775        Self::Invalid(message.into())
1776    }
1777
1778    fn internal(message: impl Into<String>) -> Self {
1779        Self::Internal(message.into())
1780    }
1781}
1782
1783fn format_schema_validation_error(err: SchemaValidationError) -> String {
1784    if err.details.is_empty() {
1785        err.schema.to_string()
1786    } else {
1787        err.details
1788            .iter()
1789            .map(|detail| detail.to_string())
1790            .collect::<Vec<_>>()
1791            .join("; ")
1792    }
1793}
1794
/// Trim an optional idempotency key, mapping absent or blank input to `None`.
fn normalize_idempotency_key(raw: Option<&str>) -> Option<String> {
    let trimmed = raw?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
1805
1806fn resolve_breakpoint(
1807    spec: &str,
1808    slug_map: &HashMap<String, Uuid>,
1809    step_ids: &HashSet<Uuid>,
1810) -> Result<Uuid, SubmitRunError> {
1811    let trimmed = spec.trim();
1812    if trimmed.is_empty() {
1813        return Err(SubmitRunError::invalid("breakpoint spec cannot be empty"));
1814    }
1815
1816    let (kind, target) = match trimmed.split_once(':') {
1817        Some((kind, rest)) => (kind.trim(), rest.trim()),
1818        None => ("step", trimmed),
1819    };
1820
1821    if target.is_empty() {
1822        return Err(SubmitRunError::invalid(format!(
1823            "breakpoint '{}' missing target",
1824            spec
1825        )));
1826    }
1827
1828    match kind.to_ascii_lowercase().as_str() {
1829        "step" => {
1830            if let Ok(uuid) = Uuid::parse_str(target) {
1831                if !step_ids.contains(&uuid) {
1832                    return Err(SubmitRunError::invalid(format!(
1833                        "breakpoint references unknown step id '{}'",
1834                        target
1835                    )));
1836                }
1837                return Ok(uuid);
1838            }
1839
1840            let key = target.to_ascii_lowercase();
1841            if let Some(uuid) = slug_map.get(&key) {
1842                return Ok(*uuid);
1843            }
1844
1845            Err(SubmitRunError::invalid(format!(
1846                "breakpoint references unknown step slug '{}'",
1847                target
1848            )))
1849        }
1850        other => Err(SubmitRunError::invalid(format!(
1851            "unsupported breakpoint kind '{}'",
1852            other
1853        ))),
1854    }
1855}
1856
/// Core run-submission pipeline shared by the gRPC and HTTP front ends.
///
/// Validates the submitted DAG (step specs, edges, breakpoints, compensation
/// references), assembles the initial run context, and persists the run with
/// all its steps and edges in a single storage call. When an idempotency key
/// matches a previously stored run, that run is reused without re-validating.
///
/// # Errors
/// * `SubmitRunError::Invalid` — any schema or semantic validation failure.
/// * `SubmitRunError::Internal` — storage failures (lookup or persistence).
async fn submit_run_impl(
    storage: &Storage,
    audit: &Audit,
    spec: SubmitRunSpec,
    idempotency_key: Option<String>,
    actor: &str,
) -> Result<SubmitRunOutcome, SubmitRunError> {
    // Idempotent replay: if the key was seen before, return the stored run id
    // without validating or persisting anything new.
    if let Some(ref key) = idempotency_key {
        if let Some(existing_run) =
            storage
                .fetch_run_by_idempotency_key(key)
                .await
                .map_err(|err| {
                    SubmitRunError::internal(format!("failed to check idempotency key: {err}"))
                })?
        {
            info!(
                run_id = %existing_run.run_id,
                idempotency_key = %key,
                "reusing existing run for idempotent submission"
            );
            return Ok(SubmitRunOutcome::Reused(existing_run.run_id));
        }
    }

    let SubmitRunSpec {
        dag_json,
        inputs,
        labels,
        seed,
        traceparent,
        tracestate,
        baggage,
        debug,
        breakpoints,
    } = spec;

    // A positive seed is required so replays can reproduce the run.
    if seed <= 0 {
        return Err(SubmitRunError::invalid(
            "seed must be a positive integer to guarantee deterministic replay",
        ));
    }

    let steps_array = dag_json
        .get("steps")
        .and_then(Value::as_array)
        .ok_or_else(|| SubmitRunError::invalid("dag_json.steps must be an array"))?;

    // Inputs must be a JSON object; `null` is normalized to an empty object.
    let inputs_value = match inputs {
        Value::Null => Value::Object(Map::new()),
        Value::Object(map) => Value::Object(map),
        _ => return Err(SubmitRunError::invalid("inputs must be a JSON object")),
    };

    let labels_value = labels_map_to_value(&labels);

    // Validate the assembled submission against the run-spec JSON schema.
    let run_spec_instance = json!({
        "dag_json": dag_json.clone(),
        "inputs": inputs_value.clone(),
        "labels": labels_value.clone(),
        "seed": seed,
        "breakpoints": breakpoints.clone(),
    });

    if let Err(err) = validate_run_spec(&run_spec_instance) {
        return Err(SubmitRunError::invalid(format!(
            "run spec schema validation failed: {}",
            format_schema_validation_error(err)
        )));
    }

    // Debug metadata, when supplied, must be an object; it is extended below
    // with resolved breakpoints and slug mappings.
    let mut debug_map = match debug {
        Some(Value::Object(map)) => map,
        Some(_) => {
            return Err(SubmitRunError::invalid(
                "debug metadata must be a JSON object",
            ))
        }
        None => Map::new(),
    };

    // Build the run's input context: inputs, labels, and (if any header was
    // provided) a `trace` object carrying the W3C trace headers.
    let mut ctx_map = Map::new();
    ctx_map.insert("inputs".to_string(), inputs_value.clone());
    ctx_map.insert("labels".to_string(), labels_value.clone());
    if traceparent.is_some() || tracestate.is_some() || baggage.is_some() {
        let mut trace_obj = Map::new();
        if let Some(tp) = traceparent {
            trace_obj.insert("traceparent".to_string(), Value::String(tp));
        }
        if let Some(ts) = tracestate {
            trace_obj.insert("tracestate".to_string(), Value::String(ts));
        }
        if let Some(bg) = baggage {
            trace_obj.insert("baggage".to_string(), Value::String(bg));
        }
        ctx_map.insert("trace".to_string(), Value::Object(trace_obj));
    }

    // Single timestamp so every step deadline is relative to the same instant.
    let submission_time = Utc::now();

    // Per-step attributes parsed in the first pass; converted to storage rows
    // only after edges have been counted.
    struct PendingStep {
        idx: i32,
        step_id: Uuid,
        priority: i16,
        spec: Value,
        max_attempts: i32,
        retry_backoff_ms: i64,
        retry_backoff_factor: f64,
        deadline_at: Option<DateTime<Utc>>,
        estimated_tokens: i64,
        estimated_cost: f64,
        compensation_target: Option<String>,
    }

    let mut pending_steps = Vec::with_capacity(steps_array.len());
    let mut seen_step_ids = HashSet::with_capacity(steps_array.len());
    let mut slug_to_step: HashMap<String, Uuid> = HashMap::new();
    let mut slug_entries: Vec<(String, Uuid)> = Vec::new();

    // First pass over steps: schema-validate each step, collect ids and slugs
    // (both must be unique), and extract execution/retry/cost settings.
    for (idx, step_value) in steps_array.iter().enumerate() {
        if let Err(err) = validate_step_spec(step_value) {
            let detail = err
                .details
                .iter()
                .map(|err| err.to_string())
                .collect::<Vec<_>>()
                .join("; ");
            return Err(SubmitRunError::invalid(format!(
                "step schema validation failed: {detail}"
            )));
        }

        let step_obj = step_value
            .as_object()
            .ok_or_else(|| SubmitRunError::invalid("each step must be a JSON object"))?;

        let step_id_str = step_obj
            .get("id")
            .and_then(Value::as_str)
            .ok_or_else(|| SubmitRunError::invalid("step.id must be a UUID string"))?;
        let step_id = Uuid::parse_str(step_id_str)
            .map_err(|err| SubmitRunError::invalid(format!("invalid step.id: {err}")))?;

        if !seen_step_ids.insert(step_id) {
            return Err(SubmitRunError::invalid(format!(
                "duplicate step.id '{}' detected in dag_json.steps",
                step_id
            )));
        }

        // Slugs are matched case-insensitively (lowercased key), but the
        // original casing is preserved for the debug `slugs` map.
        if let Some(slug_value) = step_obj.get("slug").and_then(Value::as_str) {
            let trimmed = slug_value.trim();
            if trimmed.is_empty() {
                return Err(SubmitRunError::invalid("step.slug cannot be empty"));
            }
            let key = trimmed.to_ascii_lowercase();
            if slug_to_step.insert(key.clone(), step_id).is_some() {
                return Err(SubmitRunError::invalid(format!(
                    "duplicate step.slug '{}' detected in dag_json.steps",
                    trimmed
                )));
            }
            slug_entries.push((trimmed.to_string(), step_id));
        }

        // Priority extraction is delegated to a helper defined elsewhere in
        // this file.
        let priority = extract_priority(step_obj);

        // Retry/deadline settings from `execution`; out-of-range values fall
        // back to safe defaults (1 attempt, no backoff, no deadline).
        let execution_obj = step_obj.get("execution").and_then(Value::as_object);
        let max_attempts = execution_obj
            .and_then(|exec| exec.get("max_attempts").and_then(Value::as_i64))
            .and_then(|v| i32::try_from(v).ok())
            .filter(|v| *v >= 1)
            .unwrap_or(1);
        let retry_backoff_ms = execution_obj
            .and_then(|exec| exec.get("retry_backoff_ms").and_then(Value::as_i64))
            .map(|v| v.max(0))
            .unwrap_or(0);
        let retry_backoff_factor = execution_obj
            .and_then(|exec| exec.get("retry_backoff_factor").and_then(Value::as_f64))
            .filter(|v| *v >= 1.0)
            .unwrap_or(1.0);
        let deadline_at = execution_obj
            .and_then(|exec| exec.get("deadline_ms").and_then(Value::as_i64))
            .filter(|ms| *ms > 0)
            .map(|ms| submission_time + chrono::Duration::milliseconds(ms));

        // Optional cost hints (token count and USD), clamped to non-negative.
        let cost_hint =
            execution_obj.and_then(|exec| exec.get("cost_hint").and_then(Value::as_object));
        let estimated_tokens = cost_hint
            .and_then(|hint| hint.get("tokens").and_then(Value::as_i64))
            .map(|v| v.max(0))
            .unwrap_or(0);
        let estimated_cost = cost_hint
            .and_then(|hint| hint.get("usd").and_then(Value::as_f64))
            .map(|v| v.max(0.0))
            .unwrap_or(0.0);

        // Compensation target is kept as a raw string here; it is resolved to
        // a step UUID in the second pass, once all ids/slugs are known.
        let compensation_target = execution_obj
            .and_then(|exec| exec.get("compensation").and_then(Value::as_object))
            .and_then(|comp| comp.get("step").and_then(Value::as_str))
            .map(|s| s.to_string());

        pending_steps.push(PendingStep {
            idx: idx as i32,
            step_id,
            priority,
            spec: step_value.clone(),
            max_attempts,
            retry_backoff_ms,
            retry_backoff_factor,
            deadline_at,
            estimated_tokens,
            estimated_cost,
            compensation_target,
        });
    }
    // In-degree per step; seeded to zero so steps without edges stay Queued.
    let mut incoming_counts: HashMap<Uuid, i32> =
        pending_steps.iter().map(|step| (step.step_id, 0)).collect();

    // Validate edges: [from, to] UUID pairs between known, distinct steps,
    // with no duplicates; count incoming edges per target step.
    let mut edges: Vec<(Uuid, Uuid)> = Vec::new();
    if let Some(edges_array) = dag_json.get("edges").and_then(Value::as_array) {
        let mut seen_edges = HashSet::with_capacity(edges_array.len());
        for edge_value in edges_array {
            let pair = edge_value.as_array().ok_or_else(|| {
                SubmitRunError::invalid("dag_json.edges entries must be [from, to] arrays")
            })?;
            if pair.len() != 2 {
                return Err(SubmitRunError::invalid(
                    "dag_json.edges entries must contain exactly two elements",
                ));
            }

            let from_str = pair[0]
                .as_str()
                .ok_or_else(|| SubmitRunError::invalid("edge source must be a UUID string"))?;
            let to_str = pair[1]
                .as_str()
                .ok_or_else(|| SubmitRunError::invalid("edge target must be a UUID string"))?;

            let from = Uuid::parse_str(from_str).map_err(|err| {
                SubmitRunError::invalid(format!("invalid edge source UUID: {err}"))
            })?;
            let to = Uuid::parse_str(to_str).map_err(|err| {
                SubmitRunError::invalid(format!("invalid edge target UUID: {err}"))
            })?;

            if from == to {
                return Err(SubmitRunError::invalid(
                    "edge cannot reference the same step twice",
                ));
            }
            if !seen_step_ids.contains(&from) || !seen_step_ids.contains(&to) {
                return Err(SubmitRunError::invalid("edge references unknown step id"));
            }
            if !seen_edges.insert((from, to)) {
                return Err(SubmitRunError::invalid(
                    "duplicate edge detected in dag_json.edges",
                ));
            }

            let entry = incoming_counts.get_mut(&to).ok_or_else(|| {
                SubmitRunError::invalid("edge target not present in dag_json.steps")
            })?;
            // checked_add guards the i32 counter against overflow.
            let next_count = (*entry).checked_add(1).ok_or_else(|| {
                SubmitRunError::invalid("too many incoming edges for a single step")
            })?;
            *entry = next_count;

            edges.push((from, to));
        }
    }

    // Resolve each breakpoint spec to a step UUID; at most one breakpoint per
    // step. Both the raw specs and the resolved records are kept for debug.
    let mut breakpoint_payloads: Vec<Value> = Vec::new();
    let mut breakpoint_specs: Vec<String> = Vec::new();
    let mut breakpoint_ids: HashSet<Uuid> = HashSet::new();
    for bp in &breakpoints {
        let resolved = resolve_breakpoint(bp, &slug_to_step, &seen_step_ids)?;
        if !breakpoint_ids.insert(resolved) {
            return Err(SubmitRunError::invalid(format!(
                "duplicate breakpoint targeting step {}",
                resolved
            )));
        }

        let mut record = Map::new();
        record.insert("spec".to_string(), Value::String(bp.clone()));
        record.insert("step_id".to_string(), Value::String(resolved.to_string()));
        breakpoint_payloads.push(Value::Object(record));
        breakpoint_specs.push(bp.clone());
    }

    if !breakpoint_payloads.is_empty() {
        debug_map.insert(
            "breakpoints".to_string(),
            Value::Array(breakpoint_payloads.clone()),
        );
        let specs = breakpoint_specs
            .iter()
            .cloned()
            .map(Value::String)
            .collect::<Vec<_>>();
        debug_map.insert("breakpoint_specs".to_string(), Value::Array(specs));
    }

    // Record the slug -> step-id mapping (original slug casing) for debugging.
    if !slug_entries.is_empty() {
        let mut slugs = Map::new();
        for (slug, step_id) in &slug_entries {
            slugs.insert(slug.clone(), Value::String(step_id.to_string()));
        }
        debug_map.insert("slugs".to_string(), Value::Object(slugs));
    }

    // Always ensure an `active_pause` slot exists (null until a pause occurs).
    debug_map
        .entry("active_pause".to_string())
        .or_insert(Value::Null);

    if !debug_map.is_empty() {
        ctx_map.insert("debug".to_string(), Value::Object(debug_map.clone()));
    }

    let input_ctx = Value::Object(ctx_map);

    // Second pass: turn each PendingStep into a storage row. Steps with no
    // incoming edges start Queued; all others start Blocked.
    let run_id = Uuid::new_v4();
    let mut steps = Vec::with_capacity(pending_steps.len());
    for pending in pending_steps {
        let PendingStep {
            idx,
            step_id,
            priority,
            spec,
            max_attempts,
            retry_backoff_ms,
            retry_backoff_factor,
            deadline_at,
            estimated_tokens,
            estimated_cost,
            compensation_target,
        } = pending;

        let dep_count = *incoming_counts.get(&step_id).unwrap_or(&0);
        let status = if dep_count == 0 {
            models::StepStatus::Queued
        } else {
            models::StepStatus::Blocked
        };

        // Resolve the compensation reference (UUID first, then slug) to a
        // known step; a step may not compensate itself.
        let compensation_step = if let Some(target) = compensation_target {
            let trimmed = target.trim();
            if trimmed.is_empty() {
                return Err(SubmitRunError::invalid(format!(
                    "compensation step reference cannot be empty for step {}",
                    step_id
                )));
            }

            let resolved = if let Ok(uuid) = Uuid::parse_str(trimmed) {
                if !seen_step_ids.contains(&uuid) {
                    return Err(SubmitRunError::invalid(format!(
                        "compensation step '{}' not found in dag_json.steps",
                        trimmed
                    )));
                }
                uuid
            } else {
                let key = trimmed.to_ascii_lowercase();
                *slug_to_step.get(&key).ok_or_else(|| {
                    SubmitRunError::invalid(format!(
                        "compensation step '{}' not found in dag_json.steps",
                        trimmed
                    ))
                })?
            };

            if resolved == step_id {
                return Err(SubmitRunError::invalid(format!(
                    "step {} cannot reference itself as a compensation step",
                    step_id
                )));
            }

            Some(resolved)
        } else {
            None
        };

        steps.push(models::NewStep {
            run_id,
            step_id,
            idx,
            priority,
            spec_json: spec,
            status,
            attempt: 0,
            input_snapshot: None,
            provider: None,
            provider_version: None,
            max_attempts,
            retry_backoff_ms,
            retry_backoff_factor,
            not_before: None,
            deadline_at,
            pending_dependencies: dep_count,
            total_dependencies: dep_count,
            estimated_tokens,
            estimated_cost,
            actual_tokens: None,
            actual_cost: None,
            checkpoint: None,
            compensation_step,
            compensation_scheduled: false,
        });
    }

    let new_run = models::NewRun {
        run_id,
        status: models::RunStatus::Pending,
        dag_json: dag_json.clone(),
        input_ctx,
        seed,
        idempotency_key: idempotency_key.clone(),
    };

    // Persist run, steps, and edges atomically (single storage call).
    let stored_run_id = storage
        .insert_run_with_steps(new_run, &steps, &edges)
        .await
        .map_err(|err| {
            error!(error = ?err, "failed to persist run");
            SubmitRunError::internal(format!("failed to persist run: {err}"))
        })?;

    info!(run_id = %stored_run_id, "accepted run submission");

    // Best-effort audit: tenant is parsed from the `tenant_id`/`tenant`
    // labels when it is a valid UUID; audit failures only log a warning.
    let tenant_id = labels
        .get("tenant_id")
        .or_else(|| labels.get("tenant"))
        .and_then(|value| Uuid::parse_str(value).ok());
    let audit_resource = json!({
        "run_id": stored_run_id.to_string(),
        "idempotency_key": idempotency_key,
        "labels": labels,
    });

    if let Err(err) = audit
        .append(actor, tenant_id, "run.submit", &audit_resource)
        .await
    {
        warn!(
            error = %err,
            run_id = %stored_run_id,
            "failed to append audit log for run submission"
        );
    }

    Ok(SubmitRunOutcome::Created(stored_run_id))
}
2312
2313#[tonic::async_trait]
2314impl RuntimeService for RuntimeApiService {
2315    async fn submit_run(
2316        &self,
2317        request: Request<runtime::SubmitRunRequest>,
2318    ) -> Result<Response<runtime::SubmitRunResponse>, Status> {
2319        let metadata = request.metadata().clone();
2320        let role = self.require_role(&metadata, Role::Writer).await?;
2321        let req = request.into_inner();
2322        let runtime::SubmitRunRequest {
2323            spec,
2324            idempotency_key: raw_idempotency_key,
2325        } = req;
2326
2327        let idempotency_key = normalize_idempotency_key(Some(raw_idempotency_key.as_str()));
2328
2329        let spec = spec.ok_or_else(|| invalid_argument("spec is required"))?;
2330
2331        let dag_json = parse_dag_json(&spec.dag_json)?;
2332        let inputs_value = spec
2333            .inputs
2334            .as_ref()
2335            .map(prost_struct_to_json)
2336            .unwrap_or_else(|| Value::Object(Map::new()));
2337
2338        let traceparent = metadata
2339            .get("traceparent")
2340            .and_then(|value| value.to_str().ok())
2341            .map(|s| s.to_string());
2342        let tracestate = metadata
2343            .get("tracestate")
2344            .and_then(|value| value.to_str().ok())
2345            .map(|s| s.to_string());
2346        let baggage = metadata
2347            .get("baggage")
2348            .and_then(|value| value.to_str().ok())
2349            .map(|s| s.to_string());
2350
2351        let submission = SubmitRunSpec {
2352            dag_json,
2353            inputs: inputs_value,
2354            labels: spec.labels.clone(),
2355            seed: spec.seed,
2356            traceparent,
2357            tracestate,
2358            baggage,
2359            debug: None,
2360            breakpoints: spec.breakpoints.clone(),
2361        };
2362
2363        let actor = Self::actor_from_metadata(&metadata, role);
2364
2365        let outcome = submit_run_impl(
2366            self.storage.as_ref(),
2367            self.audit.as_ref(),
2368            submission,
2369            idempotency_key,
2370            &actor,
2371        )
2372        .await
2373        .map_err(|err| match err {
2374            SubmitRunError::Invalid(message) => invalid_argument(message),
2375            SubmitRunError::Internal(message) => internal_error(message),
2376        })?;
2377
2378        let run_id = match outcome {
2379            SubmitRunOutcome::Created(run_id) | SubmitRunOutcome::Reused(run_id) => run_id,
2380        };
2381
2382        Ok(Response::new(runtime::SubmitRunResponse {
2383            run_id: run_id.to_string(),
2384        }))
2385    }
2386
2387    async fn get_run(
2388        &self,
2389        request: Request<runtime::GetRunRequest>,
2390    ) -> Result<Response<runtime::GetRunResponse>, Status> {
2391        let metadata = request.metadata().clone();
2392        self.require_role(&metadata, Role::Reader).await?;
2393        let req = request.into_inner();
2394        let run_id = Uuid::parse_str(&req.run_id)
2395            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
2396
2397        let run = self
2398            .storage
2399            .fetch_run(run_id)
2400            .await
2401            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
2402            .ok_or_else(|| Status::not_found("run not found"))?;
2403
2404        let steps = self
2405            .storage
2406            .fetch_steps_for_run(run_id)
2407            .await
2408            .map_err(|err| internal_error(format!("failed to fetch steps: {err}")))?;
2409
2410        let run_detail = build_run_detail(run, steps)?;
2411
2412        Ok(Response::new(runtime::GetRunResponse {
2413            run: Some(run_detail),
2414        }))
2415    }
2416
    type TapRunStream = Pin<Box<dyn Stream<Item = Result<runtime::RunEvent, Status>> + Send>>;

    /// Stream run events to the client by polling the storage outbox.
    ///
    /// After verifying the run exists, spawns a background task that fetches
    /// outbox events newer than the last delivered id (256 per batch) and
    /// forwards them over a bounded channel. The task stops when the client
    /// disconnects or a storage/conversion error occurs.
    async fn tap_run(
        &self,
        request: Request<runtime::TapRunRequest>,
    ) -> Result<Response<Self::TapRunStream>, Status> {
        let metadata = request.metadata().clone();
        self.require_role(&metadata, Role::Reader).await?;
        let req = request.into_inner();
        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;

        // Ensure the run exists before entering the stream loop.
        if self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to verify run: {err}")))?
            .is_none()
        {
            return Err(Status::not_found("run not found"));
        }

        let storage = Arc::clone(&self.storage);
        // Bounded channel (32) applies backpressure to the polling task when
        // the client consumes slowly.
        let (tx, rx) = mpsc::channel::<Result<runtime::RunEvent, Status>>(32);

        tokio::spawn(async move {
            // High-water mark: id of the last outbox event already delivered.
            let mut last_id: i64 = 0;

            loop {
                // Stop polling once the receiver has been dropped.
                if tx.is_closed() {
                    break;
                }

                match storage
                    .fetch_outbox_for_run_since(run_uuid, last_id, 256)
                    .await
                {
                    Ok(events) => {
                        if events.is_empty() {
                            // Nothing new yet; back off before polling again.
                            sleep(Duration::from_millis(500)).await;
                            continue;
                        }

                        for event in events {
                            // Advance the watermark even if conversion fails,
                            // since the loop terminates on error anyway.
                            last_id = event.id;
                            match run_event_from_outbox(event) {
                                Ok(run_event) => {
                                    // Send failure means the client went away.
                                    if tx.send(Ok(run_event)).await.is_err() {
                                        return;
                                    }
                                }
                                Err(status) => {
                                    // Surface the conversion failure to the
                                    // client, then end the stream.
                                    let _ = tx.send(Err(status)).await;
                                    return;
                                }
                            }
                        }
                    }
                    Err(err) => {
                        warn!(run_id = %run_uuid, error = %err, "tap stream failed to read outbox events");
                        let status = internal_error(format!("failed to fetch run events: {err}"));
                        let _ = tx.send(Err(status)).await;
                        break;
                    }
                }
            }
        });

        let stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(stream) as Self::TapRunStream))
    }
2489
2490    async fn replay_run(
2491        &self,
2492        request: Request<runtime::ReplayRunRequest>,
2493    ) -> Result<Response<runtime::ReplayRunResponse>, Status> {
2494        let metadata = request.metadata().clone();
2495        self.require_role(&metadata, Role::Writer).await?;
2496        let req = request.into_inner();
2497        let run_uuid = Uuid::parse_str(&req.run_id)
2498            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
2499
2500        if self
2501            .storage
2502            .fetch_run(run_uuid)
2503            .await
2504            .map_err(|err| internal_error(format!("failed to load run: {err}")))?
2505            .is_none()
2506        {
2507            return Err(Status::not_found("run not found"));
2508        }
2509
2510        let strategy = match runtime::ReplayMode::try_from(req.mode).ok() {
2511            Some(runtime::ReplayMode::Unspecified)
2512            | Some(runtime::ReplayMode::Full)
2513            | Some(runtime::ReplayMode::Diff) => runtime::ReplayStrategy::Mocked,
2514            Some(runtime::ReplayMode::StepOnly) => {
2515                return Err(Status::unimplemented(
2516                    "Replay mode STEP_ONLY is not yet supported",
2517                ));
2518            }
2519            None => {
2520                return Err(invalid_argument("unrecognized replay mode"));
2521            }
2522        };
2523
2524        let steps = self
2525            .storage
2526            .fetch_steps_for_run(run_uuid)
2527            .await
2528            .map_err(|err| internal_error(format!("failed to load steps for replay: {err}")))?;
2529        let attempts = self
2530            .storage
2531            .fetch_step_attempts_for_run(run_uuid)
2532            .await
2533            .map_err(|err| {
2534                internal_error(format!("failed to load step attempts for replay: {err}"))
2535            })?;
2536
2537        let mut attempts_by_step: HashMap<Uuid, Vec<models::StepAttempt>> = HashMap::new();
2538        for attempt in attempts {
2539            attempts_by_step
2540                .entry(attempt.step_id)
2541                .or_default()
2542                .push(attempt);
2543        }
2544        for entries in attempts_by_step.values_mut() {
2545            entries.sort_by_key(|attempt| attempt.attempt);
2546        }
2547
2548        let replay_mode = runtime::ReplayMode::try_from(req.mode).unwrap_or(runtime::ReplayMode::Full);
2549        let diff_mode = matches!(replay_mode, runtime::ReplayMode::Diff);
2550
2551        let mut proto_steps = Vec::with_capacity(steps.len());
2552        let mut all_within = true;
2553
2554        for step in steps {
2555            let attempts_for_step = attempts_by_step.get(&step.step_id);
2556            let latest_attempt = attempts_for_step.and_then(|entries| entries.last());
2557
2558            let recorded_snapshot = latest_attempt
2559                .and_then(|attempt| attempt.output_snapshot.clone())
2560                .or_else(|| step.output_snapshot.clone())
2561                .ok_or_else(|| {
2562                    Status::failed_precondition(format!(
2563                        "step {} missing output_snapshot; cannot replay",
2564                        step.step_id
2565                    ))
2566                })?;
2567
2568            let baseline_attempt = if diff_mode {
2569                attempts_for_step.and_then(|entries| {
2570                    if entries.len() >= 2 {
2571                        entries.get(entries.len() - 2)
2572                    } else {
2573                        None
2574                    }
2575                })
2576            } else {
2577                latest_attempt
2578            };
2579
2580            let baseline_output = baseline_attempt
2581                .and_then(|attempt| attempt.output_snapshot.clone())
2582                .unwrap_or_else(|| recorded_snapshot.clone());
2583
2584            let current_output = if diff_mode {
2585                recorded_snapshot.clone()
2586            } else {
2587                step.output_json
2588                    .clone()
2589                    .unwrap_or_else(|| recorded_snapshot.clone())
2590            };
2591
2592            let drift_result = compute_drift(
2593                &baseline_output,
2594                &current_output,
2595                DEFAULT_TOKEN_TOLERANCE,
2596                DEFAULT_COST_TOLERANCE,
2597            );
2598            if !drift_result.within_tolerance {
2599                all_within = false;
2600            }
2601
2602            let snapshot_value = json_to_prost_value(&recorded_snapshot)
2603                .map_err(|err| internal_error(format!("failed to encode snapshot: {err}")))?;
2604
2605            let recorded_meta = wrap_attempt_output(baseline_attempt, &baseline_output);
2606            let current_meta = wrap_attempt_output(latest_attempt, &current_output);
2607
2608            let drift_proto = runtime::ReplayStepDrift {
2609                recorded: Some(json_to_prost_value(&recorded_meta).map_err(|err| {
2610                    internal_error(format!("failed to encode recorded drift snapshot: {err}"))
2611                })?),
2612                current: Some(json_to_prost_value(&current_meta).map_err(|err| {
2613                    internal_error(format!("failed to encode current drift snapshot: {err}"))
2614                })?),
2615                token_delta: drift_result.token_delta,
2616                cost_delta: drift_result.cost_delta,
2617                within_tolerance: drift_result.within_tolerance,
2618            };
2619
2620            proto_steps.push(runtime::ReplayStep {
2621                step_id: step.step_id.to_string(),
2622                snapshot: Some(snapshot_value),
2623                drift: Some(drift_proto),
2624            });
2625        }
2626
2627        let response = runtime::ReplayRunResponse {
2628            run_id: req.run_id,
2629            all_within_tolerance: all_within,
2630            strategy: strategy as i32,
2631            steps: proto_steps,
2632        };
2633
2634        Ok(Response::new(response))
2635    }
2636
    /// Mutates the breakpoint set stored in a run's debug context.
    ///
    /// Breakpoints are kept as `(spec, step_id)` pairs inside
    /// `input_ctx.debug`. `add`/`remove` specs are resolved against the run's
    /// step ids and a slug lookup. The updated context plus a
    /// `run_breakpoints_updated` outbox event are persisted atomically in one
    /// transaction; audit logging is best-effort (failure is logged, not
    /// returned to the caller).
    async fn update_breakpoints(
        &self,
        request: Request<runtime::UpdateBreakpointsRequest>,
    ) -> Result<Response<runtime::UpdateBreakpointsResponse>, Status> {
        // Writer role is required; the actor identity is kept for audit.
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;

        // Reject requests that would change nothing.
        if !req.clear_all && req.add.is_empty() && req.remove.is_empty() {
            return Err(invalid_argument(
                "update_breakpoints requires add/remove entries or clear_all",
            ));
        }

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        let step_ids = collect_step_ids(&run);

        // The breakpoint state lives in the run's input context, which must
        // be a JSON object (null is treated as an empty object).
        let ctx_value = run.input_ctx.clone();
        let mut ctx_map = match ctx_value {
            Value::Object(map) => map,
            Value::Null => Map::new(),
            other => {
                return Err(internal_error(format!(
                    "run input context must be an object, found {other}"
                )))
            }
        };

        // Ensure a `debug` object exists to hold breakpoint state.
        let debug_entry = ctx_map
            .entry("debug".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        let debug_map = debug_entry.as_object_mut().ok_or_else(|| {
            internal_error("run debug context is not an object; unable to update breakpoints")
        })?;

        let mut breakpoints = load_debug_breakpoints(debug_map);
        let existing_slug_entries = load_slug_map(debug_map);

        // Build a case-insensitive slug -> step id lookup; fall back to
        // recomputing slug entries from the run when none are stored yet.
        let mut slug_lookup: HashMap<String, Uuid> = existing_slug_entries
            .iter()
            .map(|(slug, id)| (slug.to_ascii_lowercase(), *id))
            .collect();
        if slug_lookup.is_empty() {
            for (slug, id) in collect_slug_entries(&run) {
                slug_lookup.insert(slug.to_ascii_lowercase(), id);
            }
        }

        // Decide which slug map to persist back: reuse the stored entries if
        // present, otherwise the freshly collected ones.
        let slug_entries_store: Vec<(String, Uuid)> = if existing_slug_entries.is_empty() {
            collect_slug_entries(&run)
        } else {
            existing_slug_entries
                .iter()
                .map(|(slug, id)| (slug.clone(), *id))
                .collect()
        };

        // `clear_all` wipes the set before add/remove are applied.
        if req.clear_all {
            breakpoints.clear();
        }

        // Removals match either by trimmed spec text ...
        let removal_specs: HashSet<String> = req
            .remove
            .iter()
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();

        // ... or by resolved step id. Specs that fail to resolve are
        // silently skipped here; they may still match by text above.
        let mut removal_ids: HashSet<Uuid> = HashSet::new();
        for spec in &req.remove {
            if let Ok(step_id) = resolve_breakpoint(spec, &slug_lookup, &step_ids) {
                removal_ids.insert(step_id);
            }
        }

        if !removal_specs.is_empty() || !removal_ids.is_empty() {
            breakpoints.retain(|(spec, step_id)| {
                let spec_trim = spec.trim();
                let matches_spec = removal_specs.contains(spec_trim);
                let matches_id = removal_ids.contains(step_id);
                !(matches_spec || matches_id)
            });
        }

        // Additions must resolve or the whole request fails. An add for a
        // step that already has a breakpoint replaces the previous entry,
        // so each step carries at most one spec.
        for spec in &req.add {
            let step_id = resolve_breakpoint(spec, &slug_lookup, &step_ids)
                .map_err(|err| invalid_argument(err.to_string()))?;
            breakpoints.retain(|(_, existing)| existing != &step_id);
            breakpoints.push((spec.trim().to_string(), step_id));
        }

        // Keep the stored order deterministic (sorted by spec text).
        breakpoints.sort_by(|a, b| a.0.cmp(&b.0));
        store_debug_breakpoints(debug_map, &breakpoints);
        if !slug_entries_store.is_empty() {
            store_slug_map(debug_map, &slug_entries_store);
        }

        // Persist the context update and the notification event atomically.
        let mut tx = self
            .storage
            .pool()
            .begin()
            .await
            .map_err(|err| internal_error(format!("failed to begin transaction: {err}")))?;

        self.storage
            .update_run_input_ctx_tx(&mut tx, run_uuid, Value::Object(ctx_map.clone()))
            .await
            .map_err(|err| {
                internal_error(format!("failed to persist breakpoint changes: {err}"))
            })?;

        let payload_breakpoints = breakpoints
            .iter()
            .map(|(spec, step_id)| {
                json!({
                    "spec": spec,
                    "step_id": step_id.to_string(),
                })
            })
            .collect::<Vec<_>>();

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: None,
                    kind: "run_breakpoints_updated".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "breakpoints": payload_breakpoints,
                        "added": req.add,
                        "removed": req.remove,
                        "cleared": req.clear_all,
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue outbox event: {err}")))?;

        tx.commit()
            .await
            .map_err(|err| internal_error(format!("failed to commit breakpoint update: {err}")))?;

        // Audit is best-effort; the tenant lookup failure path collapses to
        // `None` rather than failing the RPC.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.breakpoints.update",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "added": req.add,
                    "removed": req.remove,
                    "cleared": req.clear_all,
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, "failed to append audit record for breakpoint update");
        }

        // Response carries only the spec strings, in sorted order.
        Ok(Response::new(runtime::UpdateBreakpointsResponse {
            breakpoints: breakpoints.into_iter().map(|(spec, _)| spec).collect(),
        }))
    }
2818
2819    async fn resume_run(
2820        &self,
2821        request: Request<runtime::ResumeRunRequest>,
2822    ) -> Result<Response<runtime::ResumeRunResponse>, Status> {
2823        let metadata = request.metadata().clone();
2824        let role = self.require_role(&metadata, Role::Writer).await?;
2825        let actor = Self::actor_from_metadata(&metadata, role);
2826        let req = request.into_inner();
2827
2828        let run_uuid = Uuid::parse_str(&req.run_id)
2829            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
2830
2831        let run = self
2832            .storage
2833            .fetch_run(run_uuid)
2834            .await
2835            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
2836            .ok_or_else(|| Status::not_found("run not found"))?;
2837
2838        if run.status != models::RunStatus::PausedAtStep {
2839            return Err(Status::failed_precondition(
2840                "run is not paused at a breakpoint",
2841            ));
2842        }
2843
2844        let ctx_value = run.input_ctx.clone();
2845        let mut ctx_map = match ctx_value {
2846            Value::Object(map) => map,
2847            Value::Null => Map::new(),
2848            other => {
2849                return Err(internal_error(format!(
2850                    "run input context must be an object, found {other}"
2851                )))
2852            }
2853        };
2854
2855        let debug_entry = ctx_map
2856            .get_mut("debug")
2857            .and_then(Value::as_object_mut)
2858            .ok_or_else(|| internal_error("run debug context missing; cannot resume paused run"))?;
2859
2860        let paused_step = extract_active_pause_step_id(debug_entry).ok_or_else(|| {
2861            Status::failed_precondition("run is paused but no active step recorded")
2862        })?;
2863
2864        debug_entry.insert("active_pause".to_string(), Value::Null);
2865
2866        let mut tx = self
2867            .storage
2868            .pool()
2869            .begin()
2870            .await
2871            .map_err(|err| internal_error(format!("failed to begin transaction: {err}")))?;
2872
2873        self.storage
2874            .update_run_input_ctx_tx(&mut tx, run_uuid, Value::Object(ctx_map.clone()))
2875            .await
2876            .map_err(|err| internal_error(format!("failed to persist pause state: {err}")))?;
2877
2878        self.storage
2879            .set_run_status_tx(&mut tx, run_uuid, models::RunStatus::Running)
2880            .await
2881            .map_err(|err| internal_error(format!("failed to update run status: {err}")))?;
2882
2883        self.storage
2884            .enqueue_outbox_event(
2885                &mut tx,
2886                NewOutboxEvent {
2887                    run_id: run_uuid,
2888                    step_id: Some(paused_step),
2889                    kind: "run_resumed".to_string(),
2890                    payload: json!({
2891                        "run_id": run_uuid.to_string(),
2892                        "step_id": paused_step.to_string(),
2893                    }),
2894                },
2895            )
2896            .await
2897            .map_err(|err| internal_error(format!("failed to enqueue resume event: {err}")))?;
2898
2899        tx.commit()
2900            .await
2901            .map_err(|err| internal_error(format!("failed to commit resume transaction: {err}")))?;
2902
2903        if let Err(err) = self
2904            .audit
2905            .append(
2906                &actor,
2907                self.storage
2908                    .fetch_run_tenant(run_uuid)
2909                    .await
2910                    .ok()
2911                    .flatten()
2912                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
2913                "debug.breakpoints.resume",
2914                &json!({
2915                    "run_id": run_uuid.to_string(),
2916                    "step_id": paused_step.to_string(),
2917                }),
2918            )
2919            .await
2920        {
2921            warn!(error = %err, run = %run_uuid, "failed to append audit record for resume");
2922        }
2923
2924        Ok(Response::new(runtime::ResumeRunResponse {
2925            status: runtime::RunStatus::Running as i32,
2926            resumed_step_id: paused_step.to_string(),
2927        }))
2928    }
2929
    /// Applies an inputs/env/model patch to a step of a paused run.
    ///
    /// The patch is merged into the step's `spec_json.inputs`, persisted
    /// together with a `step_patched` outbox event in one transaction, and
    /// audited on a best-effort basis.
    async fn patch_step(
        &self,
        request: Request<runtime::PatchStepRequest>,
    ) -> Result<Response<runtime::PatchStepResponse>, Status> {
        // Writer role required; actor identity retained for audit.
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
        let step_uuid = Uuid::parse_str(&req.step_id)
            .map_err(|err| invalid_argument(format!("invalid step_id: {err}")))?;

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        // Patching is only allowed while the run is paused.
        if run.status != models::RunStatus::PausedAtStep {
            return Err(Status::failed_precondition(
                "run must be paused before patching step",
            ));
        }

        // Look up which step the run is currently paused at (if recorded).
        let ctx_value = run.input_ctx.clone();
        let paused_step = match ctx_value {
            Value::Object(ref map) => map
                .get("debug")
                .and_then(Value::as_object)
                .and_then(extract_active_pause_step_id),
            _ => None,
        };

        // NOTE(review): when no active pause step is recorded, any step of
        // the paused run is accepted — confirm this permissiveness is
        // intentional.
        if let Some(active_step) = paused_step {
            if active_step != step_uuid {
                return Err(Status::failed_precondition(
                    "only the paused step can be patched",
                ));
            }
        }

        let inputs_patch_value = req.inputs_patch.as_ref().map(prost_struct_to_json);
        let env_override = req.env.as_ref().map(prost_struct_to_json);

        let model_override = req.model.clone();

        // At least one of the three patch surfaces must be provided.
        if inputs_patch_value.is_none() && env_override.is_none() && model_override.is_none() {
            return Err(invalid_argument(
                "patch_step requires inputs_patch, env, or model to be set",
            ));
        }

        let steps = self
            .storage
            .fetch_steps_for_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch steps: {err}")))?;

        let step = steps
            .into_iter()
            .find(|step| step.step_id == step_uuid)
            .ok_or_else(|| Status::not_found("step not found"))?;

        // The step spec must be a JSON object so patches can be merged in.
        let mut spec_map = match step.spec_json.clone() {
            Value::Object(map) => map,
            _ => {
                return Err(internal_error(
                    "step spec must be a JSON object to apply patches",
                ))
            }
        };

        // Ensure an `inputs` object exists to receive the patch.
        let inputs_entry = spec_map
            .entry("inputs".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        let inputs_map = inputs_entry
            .as_object_mut()
            .ok_or_else(|| internal_error("step inputs is not an object; cannot apply patch"))?;

        // Merge the inputs patch (deep-merge semantics delegated to
        // merge_json_object).
        if let Some(patch_value) = inputs_patch_value.as_ref() {
            let patch_obj = patch_value
                .as_object()
                .ok_or_else(|| invalid_argument("inputs_patch must be a JSON object"))?;
            merge_json_object(inputs_map, patch_obj);
        }

        // env is replaced wholesale (not merged) under `inputs.env`.
        if let Some(env_value) = env_override.as_ref() {
            if !env_value.is_object() {
                return Err(invalid_argument("env override must be a JSON object"));
            }
            inputs_map.insert("env".to_string(), env_value.clone());
        }

        // Model override is stored as `inputs.model`.
        if let Some(model) = model_override.as_ref() {
            inputs_map.insert("model".to_string(), Value::String(model.clone()));
        }

        let updated_spec = Value::Object(spec_map.clone());

        // Persist the spec change and the notification event atomically.
        let mut tx = self
            .storage
            .pool()
            .begin()
            .await
            .map_err(|err| internal_error(format!("failed to begin transaction: {err}")))?;

        self.storage
            .update_step_spec_tx(&mut tx, run_uuid, step_uuid, updated_spec.clone())
            .await
            .map_err(|err| internal_error(format!("failed to persist step patch: {err}")))?;

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(step_uuid),
                    kind: "step_patched".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "step_id": step_uuid.to_string(),
                        "inputs_patch": inputs_patch_value,
                        "env_override": env_override,
                        "model_override": model_override,
                        "note": req.note,
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue patch event: {err}")))?;

        tx.commit()
            .await
            .map_err(|err| internal_error(format!("failed to commit patch transaction: {err}")))?;

        // Audit is best-effort; a failure here is logged and swallowed.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.step.patch",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "step_id": step_uuid.to_string(),
                    "note": req.note,
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, step = %step_uuid, "failed to append audit record for step patch");
        }

        Ok(Response::new(runtime::PatchStepResponse {
            step_id: step_uuid.to_string(),
        }))
    }
3094
    /// Creates a new run that branches off an existing run at a given step.
    ///
    /// Builds a truncated DAG starting at the resolved `from_step`, applies
    /// optional input/label patches, picks a seed according to `seed_mode`,
    /// records branch provenance in the child run's debug context, then
    /// submits the child through the normal run-submission path. A
    /// `run_branch_created` outbox event is emitted against the parent run,
    /// and audit logging is best-effort.
    async fn create_branch_run(
        &self,
        request: Request<runtime::CreateBranchRunRequest>,
    ) -> Result<Response<runtime::CreateBranchRunResponse>, Status> {
        // Writer role required; actor identity retained for audit.
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;

        let from_step_spec = req.from_step.trim().to_string();
        if from_step_spec.is_empty() {
            return Err(invalid_argument("from_step is required"));
        }

        // Unknown seed-mode enum values silently fall back to Same.
        let seed_mode = runtime::BranchSeedMode::try_from(req.seed_mode)
            .unwrap_or(runtime::BranchSeedMode::Same);

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        let (inputs_value, mut labels_map) = unpack_input_ctx(&run.input_ctx);

        // Resolve the branch point using the parent's stored slug map (or an
        // empty map if none), plus the parent's step ids.
        let debug_entry = run
            .input_ctx
            .get("debug")
            .and_then(Value::as_object)
            .cloned()
            .unwrap_or_default();
        let slug_lookup = load_slug_map(&debug_entry);
        let step_ids = collect_step_ids(&run);
        let target_step = resolve_breakpoint(&from_step_spec, &slug_lookup, &step_ids)
            .map_err(|err| invalid_argument(err.to_string()))?;

        // Slice the parent DAG so the child starts at the target step.
        let branch_dag = build_branch_dag(&run.dag_json, target_step)
            .map_err(|err| invalid_argument(err.to_string()))?;

        // Non-object parent inputs are discarded rather than rejected.
        let mut inputs_map = match inputs_value {
            Value::Object(map) => map,
            Value::Null => Map::new(),
            _ => Map::new(),
        };

        // Optional inputs patch: deep-merged into the parent inputs and kept
        // for the branch-provenance record.
        let mut inputs_patch_debug: Option<Value> = None;
        if let Some(patch) = req.inputs_patch.as_ref() {
            let patch_value = prost_struct_to_json(patch);
            let patch_obj = patch_value
                .as_object()
                .ok_or_else(|| invalid_argument("inputs_patch must be a JSON object"))?;
            merge_json_object(&mut inputs_map, patch_obj);
            inputs_patch_debug = Some(Value::Object(patch_obj.clone()));
        }

        // Optional labels patch: non-string values are rendered to their
        // JSON text form since labels are string-valued.
        let mut labels_patch_debug: Option<Value> = None;
        if let Some(labels_patch) = req.labels_patch.as_ref() {
            let patch_value = prost_struct_to_json(labels_patch);
            let patch_obj = patch_value
                .as_object()
                .ok_or_else(|| invalid_argument("labels_patch must be a JSON object"))?;
            for (key, value) in patch_obj {
                let rendered = value
                    .as_str()
                    .map(|s| s.to_string())
                    .unwrap_or_else(|| value.to_string());
                labels_map.insert(key.clone(), rendered);
            }
            labels_patch_debug = Some(Value::Object(patch_obj.clone()));
        }

        // Lineage labels make the parent/branch relationship queryable.
        labels_map.insert("branch_parent".to_string(), run_uuid.to_string());
        labels_map.insert("branch_from_step".to_string(), target_step.to_string());

        // Seed selection: reuse the parent seed, derive a fresh one from the
        // clock (falling back to the parent seed if nanos are out of range),
        // or require an explicit value for Custom.
        let branch_seed = match seed_mode {
            runtime::BranchSeedMode::Same | runtime::BranchSeedMode::Unspecified => run.seed,
            runtime::BranchSeedMode::New => Utc::now().timestamp_nanos_opt().unwrap_or(run.seed),
            runtime::BranchSeedMode::Custom => req
                .seed
                .as_ref()
                .copied()
                .ok_or_else(|| invalid_argument("seed value required for custom mode"))?,
        };

        let prompt_packs = collect_prompt_packs_json(&run.dag_json);

        // Branch provenance recorded in the child's debug context.
        let mut branch_meta = Map::new();
        branch_meta.insert(
            "parent_run_id".to_string(),
            Value::String(run_uuid.to_string()),
        );
        branch_meta.insert(
            "parent_step_id".to_string(),
            Value::String(target_step.to_string()),
        );
        branch_meta.insert(
            "from_step_spec".to_string(),
            Value::String(from_step_spec.clone()),
        );
        branch_meta.insert(
            "parent_seed".to_string(),
            Value::Number(Number::from(run.seed)),
        );
        branch_meta.insert(
            "branch_seed".to_string(),
            Value::Number(Number::from(branch_seed)),
        );
        let seed_mode_str = match seed_mode {
            runtime::BranchSeedMode::Same | runtime::BranchSeedMode::Unspecified => "same",
            runtime::BranchSeedMode::New => "new",
            runtime::BranchSeedMode::Custom => "custom",
        };
        branch_meta.insert(
            "seed_mode".to_string(),
            Value::String(seed_mode_str.to_string()),
        );
        branch_meta.insert(
            "created_at".to_string(),
            Value::String(Utc::now().to_rfc3339()),
        );
        branch_meta.insert(
            "prompt_packs".to_string(),
            Value::Array(prompt_packs.into_iter().map(Value::String).collect()),
        );
        let note = req.note.trim();
        if !note.is_empty() {
            branch_meta.insert("note".to_string(), Value::String(note.to_string()));
        }
        if let Some(patch) = inputs_patch_debug {
            branch_meta.insert("inputs_patch".to_string(), patch);
        }
        if let Some(patch) = labels_patch_debug {
            branch_meta.insert("labels_patch".to_string(), patch);
        }

        let mut debug_extra = Map::new();
        debug_extra.insert("branch".to_string(), Value::Object(branch_meta));

        // The branch inherits nothing else: no trace context, no breakpoints.
        let submission = SubmitRunSpec {
            dag_json: branch_dag,
            inputs: Value::Object(inputs_map.clone()),
            labels: labels_map.clone(),
            seed: branch_seed,
            traceparent: None,
            tracestate: None,
            baggage: None,
            debug: Some(Value::Object(debug_extra.clone())),
            breakpoints: Vec::new(),
        };

        let outcome = submit_run_impl(
            self.storage.as_ref(),
            self.audit.as_ref(),
            submission,
            None,
            &actor,
        )
        .await
        .map_err(|err| match err {
            SubmitRunError::Invalid(message) => invalid_argument(message),
            SubmitRunError::Internal(message) => internal_error(message),
        })?;

        // Both Created and Reused carry the child's run id.
        let new_run_uuid = match outcome {
            SubmitRunOutcome::Created(run_id) | SubmitRunOutcome::Reused(run_id) => run_id,
        };

        // Announce the branch on the PARENT run's event stream. Note this
        // happens after the child submission has already committed.
        let mut tx = self.storage.pool().begin().await.map_err(|err| {
            internal_error(format!("failed to begin branch event transaction: {err}"))
        })?;

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(target_step),
                    kind: "run_branch_created".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "child_run_id": new_run_uuid.to_string(),
                        "from_step_spec": from_step_spec,
                        "seed": branch_seed,
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue branch event: {err}")))?;

        tx.commit().await.map_err(|err| {
            internal_error(format!("failed to commit branch event transaction: {err}"))
        })?;

        // Audit is best-effort; failures are logged and swallowed.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.branch.create",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "child_run_id": new_run_uuid.to_string(),
                    "step_id": target_step.to_string(),
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, "failed to append audit record for branch");
        }

        Ok(Response::new(runtime::CreateBranchRunResponse {
            run_id: new_run_uuid.to_string(),
        }))
    }
3318
3319    async fn get_attestations(
3320        &self,
3321        request: Request<runtime::GetAttestationsRequest>,
3322    ) -> Result<Response<runtime::GetAttestationsResponse>, Status> {
3323        let metadata = request.metadata().clone();
3324        self.require_role(&metadata, Role::Reader).await?;
3325        let req = request.into_inner();
3326        let mut seen: HashSet<Uuid> = HashSet::new();
3327        let mut results = Vec::new();
3328
3329        for raw in req.attestation_ids {
3330            let trimmed = raw.trim();
3331            if trimmed.is_empty() {
3332                continue;
3333            }
3334            let id = Uuid::parse_str(trimmed).map_err(|err| {
3335                invalid_argument(format!("invalid attestation id '{trimmed}': {err}"))
3336            })?;
3337            if !seen.insert(id) {
3338                continue;
3339            }
3340            match self.attestation_vault.fetch(id).await {
3341                Ok(Some(attestation)) => {
3342                    results.push(attestation_to_proto(attestation)?);
3343                }
3344                Ok(None) => {
3345                    // Ignore missing entries so clients can compare counts.
3346                }
3347                Err(err) => {
3348                    return Err(internal_error(format!(
3349                        "failed to load attestation {id}: {err}"
3350                    )));
3351                }
3352            }
3353        }
3354
3355        Ok(Response::new(runtime::GetAttestationsResponse {
3356            attestations: results,
3357        }))
3358    }
3359
3360    async fn sync_eval_pack(
3361        &self,
3362        request: Request<runtime::SyncEvalPackRequest>,
3363    ) -> Result<Response<runtime::SyncEvalPackResponse>, Status> {
3364        let metadata = request.metadata().clone();
3365        self.require_role(&metadata, Role::Writer).await?;
3366
3367        let req = request.into_inner();
3368        let pack = req.pack.trim();
3369        if pack.is_empty() {
3370            return Err(invalid_argument("pack is required"));
3371        }
3372        let pack_name = pack.to_string();
3373
3374        let mut scenario_ids = Vec::with_capacity(req.scenarios.len());
3375        for scenario in req.scenarios {
3376            let slug = scenario.slug.trim();
3377            if slug.is_empty() {
3378                return Err(invalid_argument("scenario slug cannot be empty"));
3379            }
3380            let run_spec_struct = scenario
3381                .run_spec
3382                .ok_or_else(|| invalid_argument("scenario run_spec must be provided"))?;
3383            let expected_struct = scenario
3384                .expected
3385                .ok_or_else(|| invalid_argument("scenario expected payload must be provided"))?;
3386
3387            // prost_struct_to_json yields a serde_json::Value directly.
3388            let run_spec_json = prost_struct_to_json(&run_spec_struct);
3389            let expected_json = prost_struct_to_json(&expected_struct);
3390
3391            let title = scenario.title.trim();
3392            let description = scenario.description.trim();
3393
3394            let mut spec_map = Map::new();
3395            spec_map.insert("pack".to_string(), Value::String(pack_name.clone()));
3396            spec_map.insert("slug".to_string(), Value::String(slug.to_string()));
3397            if !title.is_empty() {
3398                spec_map.insert("title".to_string(), Value::String(title.to_string()));
3399            }
3400            if !description.is_empty() {
3401                spec_map.insert(
3402                    "description".to_string(),
3403                    Value::String(description.to_string()),
3404                );
3405            }
3406            spec_map.insert("run_spec".to_string(), run_spec_json);
3407            spec_map.insert("expected".to_string(), expected_json);
3408            let spec_value = Value::Object(spec_map);
3409
3410            let scenario_name = format!("{pack}/{slug}");
3411            let scenario_id = self
3412                .storage
3413                .upsert_eval_scenario(
3414                    &scenario_name,
3415                    if description.is_empty() {
3416                        None
3417                    } else {
3418                        Some(description)
3419                    },
3420                    spec_value,
3421                )
3422                .await
3423                .map_err(|err| {
3424                    internal_error(format!(
3425                        "failed to upsert eval scenario '{scenario_name}': {err}"
3426                    ))
3427                })?;
3428            scenario_ids.push(scenario_id.to_string());
3429        }
3430
3431        Ok(Response::new(runtime::SyncEvalPackResponse {
3432            scenario_ids,
3433        }))
3434    }
3435
3436    async fn list_eval_scenarios(
3437        &self,
3438        request: Request<runtime::ListEvalScenariosRequest>,
3439    ) -> Result<Response<runtime::ListEvalScenariosResponse>, Status> {
3440        let metadata = request.metadata().clone();
3441        self.require_role(&metadata, Role::Reader).await?;
3442
3443        let scenarios = self
3444            .storage
3445            .list_eval_scenarios()
3446            .await
3447            .map_err(|err| internal_error(format!("failed to list eval scenarios: {err}")))?;
3448
3449        let mut proto_scenarios = Vec::with_capacity(scenarios.len());
3450        for scenario in scenarios {
3451            let spec = parse_eval_spec(&scenario.spec).map_err(|err| {
3452                internal_error(format!(
3453                    "invalid eval scenario '{}' spec: {err}",
3454                    scenario.name
3455                ))
3456            })?;
3457            let history_models = self
3458                .storage
3459                .list_eval_results(scenario.scenario_id, 20)
3460                .await
3461                .map_err(|err| internal_error(format!("failed to fetch eval results: {err}")))?;
3462            let mut history_proto = Vec::with_capacity(history_models.len());
3463            for result in history_models {
3464                history_proto.push(build_eval_result_proto(result)?);
3465            }
3466            proto_scenarios.push(build_eval_scenario_proto(&scenario, &spec, history_proto)?);
3467        }
3468
3469        Ok(Response::new(runtime::ListEvalScenariosResponse {
3470            scenarios: proto_scenarios,
3471        }))
3472    }
3473
3474    async fn record_eval_result(
3475        &self,
3476        request: Request<runtime::RecordEvalResultRequest>,
3477    ) -> Result<Response<runtime::RecordEvalResultResponse>, Status> {
3478        let metadata = request.metadata().clone();
3479        self.require_role(&metadata, Role::Writer).await?;
3480
3481        let req = request.into_inner();
3482        let pack = req.pack.trim();
3483        let slug = req.slug.trim();
3484        if pack.is_empty() {
3485            return Err(invalid_argument("pack is required"));
3486        }
3487        if slug.is_empty() {
3488            return Err(invalid_argument("slug is required"));
3489        }
3490
3491        let scenario_name = format!("{pack}/{slug}");
3492        let scenario = self
3493            .storage
3494            .fetch_eval_scenario_by_name(&scenario_name)
3495            .await
3496            .map_err(|err| internal_error(format!("failed to fetch eval scenario: {err}")))?
3497            .ok_or_else(|| {
3498                Status::not_found(format!("eval scenario '{scenario_name}' not found"))
3499            })?;
3500
3501        let spec = parse_eval_spec(&scenario.spec).map_err(|err| {
3502            internal_error(format!(
3503                "invalid eval scenario '{}' spec: {err}",
3504                scenario.name
3505            ))
3506        })?;
3507
3508        let run_uuid = Uuid::parse_str(&req.run_id)
3509            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
3510        let run = self
3511            .storage
3512            .fetch_run(run_uuid)
3513            .await
3514            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
3515            .ok_or_else(|| Status::not_found("run not found"))?;
3516        let steps = self
3517            .storage
3518            .fetch_steps_for_run(run_uuid)
3519            .await
3520            .map_err(|err| internal_error(format!("failed to fetch steps: {err}")))?;
3521        let last_event = self
3522            .storage
3523            .fetch_last_outbox_event(run_uuid)
3524            .await
3525            .map_err(|err| internal_error(format!("failed to fetch run events: {err}")))?;
3526
3527        let previous = self
3528            .storage
3529            .latest_eval_result(scenario.scenario_id)
3530            .await
3531            .map_err(|err| {
3532                internal_error(format!("failed to fetch previous eval result: {err}"))
3533            })?;
3534        let previous_metrics = previous
3535            .as_ref()
3536            .filter(|result| result.run_id != run_uuid)
3537            .map(|result| &result.metrics);
3538
3539        let commit = req.commit.trim();
3540        let commit = if commit.is_empty() {
3541            None
3542        } else {
3543            Some(commit.to_string())
3544        };
3545
3546        let (outcome, metrics) = compute_eval_metrics(
3547            &spec,
3548            &run,
3549            &steps,
3550            last_event.as_ref().map(|event| event.created_at),
3551            previous_metrics,
3552            commit.clone(),
3553        )
3554        .map_err(|err| internal_error(format!("failed to compute eval metrics: {err}")))?;
3555
3556        let stored = self
3557            .storage
3558            .insert_eval_result(scenario.scenario_id, run_uuid, &outcome, metrics)
3559            .await
3560            .map_err(|err| internal_error(format!("failed to persist eval result: {err}")))?;
3561
3562        let proto = build_eval_result_proto(stored)?;
3563        Ok(Response::new(runtime::RecordEvalResultResponse {
3564            result: Some(proto),
3565        }))
3566    }
3567
    /// Evaluates a proposed change against the change-gate policy and
    /// persists the outcome.
    ///
    /// Pipeline: validate/normalise the request, translate proto inputs into
    /// changeops types, evaluate via the gate engine, archive the decision
    /// JSON as an artifact, optionally queue or publish a transparency (SCITT)
    /// entry when attestations are referenced, persist the gate row, append an
    /// audit record, and return the decision. Requires `Writer` role.
    async fn check_change_gate(
        &self,
        request: Request<runtime::CheckChangeGateRequest>,
    ) -> Result<Response<runtime::CheckChangeGateResponse>, Status> {
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let req = request.into_inner();

        let change_id = req.change_id.trim();
        if change_id.is_empty() {
            return Err(invalid_argument("change_id is required"));
        }

        // Blank revision strings are treated as absent.
        let revision = req.revision.trim();
        let revision = if revision.is_empty() {
            None
        } else {
            Some(revision.to_string())
        };

        // An explicit actor in the payload overrides the metadata-derived one.
        let actor_header = Self::actor_from_metadata(&metadata, role);
        let actor = if req.actor.trim().is_empty() {
            actor_header
        } else {
            req.actor.trim().to_string()
        };

        // proto diff -> engine diff; empty component strings become None.
        let diffs = req
            .diffs
            .into_iter()
            .map(|diff| GateDiff {
                path: diff.path,
                lines_added: diff.lines_added,
                lines_deleted: diff.lines_deleted,
                novelty_score: diff.novelty_score,
                risk_tags: diff.risk_tags,
                component: if diff.component.is_empty() {
                    None
                } else {
                    Some(diff.component)
                },
            })
            .collect();

        // A missing coverage report falls back to the engine's default.
        let coverage = req
            .coverage
            .map(|report| GateCoverageReport {
                overall_ratio: report.overall_ratio,
                minimum_ratio: report.minimum_ratio,
                components: report
                    .components
                    .into_iter()
                    .map(|component| GateCoverageComponent {
                        name: component.name,
                        coverage_ratio: component.coverage_ratio,
                        required_ratio: component.required_ratio,
                    })
                    .collect(),
            })
            .unwrap_or_default();

        let evals = req
            .evals
            .into_iter()
            .map(|metric| GateEvalMetric {
                name: metric.name,
                slug: if metric.slug.is_empty() {
                    None
                } else {
                    Some(metric.slug)
                },
                score: metric.score,
                threshold: metric.threshold,
                // Unknown/unspecified enum values default to higher-is-better.
                direction: match runtime::MetricDirection::try_from(metric.direction) {
                    Ok(runtime::MetricDirection::LowerIsBetter) => {
                        GateMetricDirection::LowerIsBetter
                    }
                    _ => GateMetricDirection::HigherIsBetter,
                },
                weight: metric.weight,
            })
            .collect();

        let budgets = req
            .budgets
            .into_iter()
            .map(|budget| GateBudgetSignal {
                name: budget.name,
                actual: budget.actual,
                limit: budget.limit,
            })
            .collect();

        let telemetry_value = struct_to_value(req.telemetry);
        let metadata_value = struct_to_value(req.metadata);

        let gate_request = GateRequest {
            change_id: change_id.to_string(),
            gate_id: None,
            revision: revision.clone(),
            diffs,
            coverage,
            evals,
            budgets,
            telemetry: telemetry_value.clone(),
            metadata: metadata_value.clone(),
            actor: Some(actor.clone()),
        };

        // Engine rejections surface to the caller as invalid-argument errors.
        let mut decision = self
            .changeops
            .evaluate(&gate_request)
            .map_err(|err| invalid_argument(err.to_string()))?;

        // Stamp a fresh gate id onto the request copy that gets persisted.
        let gate_id = Uuid::new_v4();
        let mut stored_request = gate_request.clone();
        stored_request.gate_id = Some(gate_id);

        let decision_payload = decision.to_value(&stored_request);
        let decision_bytes = serde_json::to_vec(&decision_payload).map_err(|err| {
            internal_error(format!("failed to serialise change gate decision: {err}"))
        })?;

        // Archive the full decision document as a content-addressed artifact.
        let artifact_meta = self
            .storage
            .artifacts()
            .put(
                Bytes::from(decision_bytes),
                "application/json",
                json!({
                    "kind": "change_gate_decision",
                    "change_id": change_id,
                    "effect": decision.effect.as_str(),
                    "gate_id": gate_id.to_string(),
                }),
            )
            .await
            .map_err(|err| {
                internal_error(format!("failed to persist change gate artifact: {err}"))
            })?;

        // Collect attestations referenced by either the request telemetry or
        // the decision payload, deduplicated via the set.
        let mut attestation_set: HashSet<Uuid> =
            attestation_ids_from_value(&stored_request.telemetry)
                .into_iter()
                .collect();
        attestation_set.extend(attestation_ids_from_value(&decision_payload));
        let attestation_refs: Vec<Uuid> = attestation_set.into_iter().collect();

        // Transparency publication only applies when attestations are involved
        // and gate decisions are within the configured transparency scope.
        if !attestation_refs.is_empty() && self.transparency_scope.includes_gates() {
            let artifact_sha = hex::encode(artifact_meta.sha256);
            if self.transparency_writer_enabled {
                // Preferred path: hand the bundle to the async writer queue.
                let payload =
                    models::TransparencyJobPayload::ReleaseBundle(models::ReleaseBundlePayload {
                        change_id: change_id.to_string(),
                        gate_id,
                        artifact_sha256: artifact_sha.clone(),
                        attestation_ids: attestation_refs.clone(),
                        metadata: decision.metadata.clone(),
                    });
                match self.storage.enqueue_transparency_job(payload).await {
                    Ok(job_id) => {
                        // Record the queued job under `scitt_entry` in the
                        // decision metadata so callers can track it later.
                        let queued = json!({
                            "status": "queued",
                            "job_id": job_id,
                            "artifact_sha256": artifact_sha,
                            "attestation_ids": attestation_refs.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
                        });
                        if let Value::Object(mut meta_obj) = decision.metadata.take() {
                            meta_obj.insert("scitt_entry".to_string(), queued);
                            decision.metadata = Value::Object(meta_obj);
                        } else {
                            // Non-object metadata is replaced wholesale.
                            decision.metadata = json!({ "scitt_entry": queued });
                        }
                    }
                    Err(err) => {
                        // Queueing failed: degrade to a synchronous publish so
                        // the decision still reaches the transparency log.
                        warn!(
                            change = change_id,
                            gate = %gate_id,
                            error = %err,
                            "failed to enqueue transparency job; falling back to synchronous publish"
                        );
                        self.publish_scitt_entry(
                            change_id,
                            gate_id,
                            &artifact_sha,
                            &attestation_refs,
                            &mut decision.metadata,
                        )
                        .await?;
                    }
                }
            } else {
                // Writer disabled: publish inline before responding.
                self.publish_scitt_entry(
                    change_id,
                    gate_id,
                    &artifact_sha,
                    &attestation_refs,
                    &mut decision.metadata,
                )
                .await?;
            }
        }

        // Persist the gate row: decision columns, serialised input, telemetry
        // and the hash of the archived artifact.
        let stored_gate = self
            .storage
            .insert_change_gate(models::NewChangeGate {
                gate_id,
                change_id: change_id.to_string(),
                revision: revision.clone(),
                effect: decision.effect.as_str().to_string(),
                reasons: serde_json::to_value(&decision.reasons).map_err(|err| {
                    internal_error(format!("failed to serialise change gate reasons: {err}"))
                })?,
                followups: serde_json::to_value(&decision.followups).map_err(|err| {
                    internal_error(format!("failed to serialise change gate followups: {err}"))
                })?,
                scorecard: serde_json::to_value(&decision.scorecard).map_err(|err| {
                    internal_error(format!("failed to serialise change gate scorecard: {err}"))
                })?,
                decided_at: decision.decided_at,
                decided_by: Some(actor.clone()),
                metadata: decision.metadata.clone(),
                input: serde_json::to_value(&stored_request).map_err(|err| {
                    internal_error(format!("failed to serialise change gate request: {err}"))
                })?,
                telemetry: stored_request.telemetry.clone(),
                artifact_sha256: Some(artifact_meta.sha256),
            })
            .await
            .map_err(|err| {
                internal_error(format!("failed to persist change gate decision: {err}"))
            })?;

        let audit_payload = decision_payload.clone();
        self.audit
            .append(&actor, None, "change_gate.check", &audit_payload)
            .await
            .map_err(|err| {
                internal_error(format!(
                    "failed to append audit trail for change gate: {err}"
                ))
            })?;

        tracing::info!(
            target: "fleetforge.changeops",
            gate = %gate_id,
            change = %change_id,
            effect = decision.effect.as_str(),
            reasons = ?decision.reasons,
            "change gate evaluated"
        );

        // A freshly evaluated gate has no follow-ups yet, hence the empty slice.
        let proto_decision = build_proto_change_gate(
            stored_gate.gate_id,
            change_id,
            revision.as_deref(),
            &decision,
            &stored_gate.telemetry,
            &[],
        )?;

        Ok(Response::new(runtime::CheckChangeGateResponse {
            decision: Some(proto_decision),
        }))
    }
3833
3834    async fn record_gate_followup(
3835        &self,
3836        request: Request<runtime::RecordGateFollowupRequest>,
3837    ) -> Result<Response<runtime::RecordGateFollowupResponse>, Status> {
3838        let metadata = request.metadata().clone();
3839        let role = self.require_role(&metadata, Role::Writer).await?;
3840        let req = request.into_inner();
3841
3842        let gate_id = Uuid::parse_str(&req.gate_id)
3843            .map_err(|err| invalid_argument(format!("invalid gate_id: {err}")))?;
3844
3845        let gate = self
3846            .storage
3847            .fetch_change_gate(gate_id)
3848            .await
3849            .map_err(|err| internal_error(format!("failed to fetch change gate: {err}")))?
3850            .ok_or_else(|| Status::not_found("change gate not found"))?;
3851
3852        let actor_header = Self::actor_from_metadata(&metadata, role);
3853        let actor = if req.actor.trim().is_empty() {
3854            actor_header
3855        } else {
3856            req.actor.trim().to_string()
3857        };
3858
3859        let outcome = runtime::FollowupOutcome::try_from(req.outcome)
3860            .unwrap_or(runtime::FollowupOutcome::Approved);
3861        let outcome_gate = proto_followup_outcome(outcome);
3862
3863        let followup_id = Uuid::new_v4();
3864        let note = req.note;
3865        let details_value = struct_to_value(req.details);
3866
3867        let stored_followup = self
3868            .storage
3869            .insert_change_gate_followup(models::NewChangeGateFollowup {
3870                followup_id,
3871                gate_id,
3872                actor: actor.clone(),
3873                outcome: outcome_gate.as_str().to_string(),
3874                note: note.clone(),
3875                details: details_value.clone(),
3876            })
3877            .await
3878            .map_err(|err| {
3879                internal_error(format!("failed to persist change gate follow-up: {err}"))
3880            })?;
3881
3882        let record_payload = ChangeFollowupRecord {
3883            gate_id,
3884            actor: actor.clone(),
3885            outcome: outcome_gate,
3886            note: note.clone(),
3887            details: stored_followup.details.clone(),
3888            recorded_at: stored_followup.recorded_at,
3889        };
3890
3891        self.audit
3892            .append(
3893                &actor,
3894                None,
3895                "change_gate.followup",
3896                &record_payload.to_value(),
3897            )
3898            .await
3899            .map_err(|err| {
3900                internal_error(format!(
3901                    "failed to append change gate follow-up audit: {err}"
3902                ))
3903            })?;
3904
3905        tracing::info!(
3906            target: "fleetforge.changeops",
3907            gate = %gate_id,
3908            followup = %followup_id,
3909            outcome = %stored_followup.outcome,
3910            "change gate follow-up recorded"
3911        );
3912
3913        let proto_followup = gate_followup_record_to_proto(&stored_followup);
3914        Ok(Response::new(runtime::RecordGateFollowupResponse {
3915            followup: Some(proto_followup),
3916        }))
3917    }
3918
3919    async fn list_change_gates(
3920        &self,
3921        request: Request<runtime::ListChangeGatesRequest>,
3922    ) -> Result<Response<runtime::ListChangeGatesResponse>, Status> {
3923        let metadata = request.metadata().clone();
3924        self.require_role(&metadata, Role::Reader).await?;
3925        let req = request.into_inner();
3926
3927        let change_id = req.change_id.trim();
3928        if change_id.is_empty() {
3929            return Err(invalid_argument("change_id is required"));
3930        }
3931
3932        let limit = if req.limit == 0 { 5 } else { req.limit.min(50) } as i64;
3933
3934        let gates = self
3935            .storage
3936            .list_change_gates_for_change(change_id, limit)
3937            .await
3938            .map_err(|err| {
3939                internal_error(format!("failed to fetch change gate decisions: {err}"))
3940            })?;
3941
3942        let mut decisions = Vec::with_capacity(gates.len());
3943        for gate in gates {
3944            let followups = self
3945                .storage
3946                .fetch_change_gate_followups(gate.gate_id)
3947                .await
3948                .map_err(|err| {
3949                    internal_error(format!("failed to fetch change gate follow-ups: {err}"))
3950                })?;
3951
3952            let effect = gate_effect_from_str(&gate.effect);
3953            let reasons = parse_reasons(&gate.reasons);
3954            let followup_actions = parse_followup_actions(&gate.followups);
3955            let scorecard = parse_scorecard(&gate.scorecard);
3956
3957            let gate_decision = GateDecision {
3958                effect,
3959                reasons,
3960                followups: followup_actions,
3961                scorecard,
3962                decided_at: gate.decided_at,
3963                metadata: gate.metadata.clone(),
3964            };
3965
3966            let proto_decision = build_proto_change_gate(
3967                gate.gate_id,
3968                &gate.change_id,
3969                gate.revision.as_deref(),
3970                &gate_decision,
3971                &gate.telemetry,
3972                &followups,
3973            )?;
3974            decisions.push(proto_decision);
3975        }
3976
3977        Ok(Response::new(runtime::ListChangeGatesResponse {
3978            decisions,
3979        }))
3980    }
3981}
3982
/// Starts the API servers (gRPC + HTTP) backed by the provided storage handle.
///
/// Reads listener, CORS and transparency configuration from the environment,
/// optionally spawns the background transparency writer, then either runs a
/// single combined listener (gRPC + HTTP multiplexed on one port) or two
/// separate servers joined with `try_join!`. Runs until a server exits.
#[instrument(skip_all)]
pub async fn start_api_server(storage: Arc<Storage>, audit: Arc<Audit>) -> Result<()> {
    let auth = Arc::new(AuthHandle::from_env().await?);

    let endpoints = resolve_endpoint_config()?;
    let cors_config = cors_config_from_env();
    let transparency = transparency_log_from_env(Arc::clone(&storage))?;
    let transparency_scope = transparency_scope_from_env();
    let transparency_writer_enabled = transparency_writer_enabled_flag();
    let writer_interval = transparency_writer_interval();

    // The background writer only runs when transparency is active at all.
    if transparency_writer_enabled && !matches!(transparency_scope, TransparencyScope::Disabled) {
        info!(
            "transparency writer enabled (interval={}s)",
            writer_interval.as_secs()
        );
        let writer_storage = Arc::clone(&storage);
        let writer_log = Arc::clone(&transparency);
        // Detached task: the writer loops for the lifetime of the process.
        tokio::spawn(async move {
            let writer = TransparencyWriter::new(writer_storage, writer_log, writer_interval);
            writer.run().await;
        });
    }

    // Human-readable allowlist summary for the startup log line.
    let cors_summary = cors_config
        .as_ref()
        .map(|cfg| {
            if cfg.display.is_empty() {
                "[]".to_string()
            } else {
                cfg.display.join(", ")
            }
        })
        .unwrap_or_else(|| "disabled".to_string());

    info!(
        grpc_endpoint = %format!("http://{}", endpoints.grpc_addr),
        http_endpoint = %format!("http://{}", endpoints.http_addr),
        cors_allowlist = %cors_summary,
        "runtime API configuration ready"
    );

    if endpoints.combined {
        run_combined_server(
            endpoints.grpc_addr,
            storage,
            auth,
            audit,
            transparency,
            transparency_writer_enabled,
            transparency_scope,
            cors_config,
        )
        .await
    } else {
        // Separate listeners: run both servers concurrently; the first error
        // from either tears down the whole call.
        let grpc_storage = Arc::clone(&storage);
        let http_storage = Arc::clone(&storage);
        let http_auth = Arc::clone(&auth);
        let grpc_transparency = Arc::clone(&transparency);
        tokio::try_join!(
            run_grpc_server(
                endpoints.grpc_addr,
                grpc_storage,
                Arc::clone(&auth),
                Arc::clone(&audit),
                grpc_transparency,
                transparency_writer_enabled,
                transparency_scope,
                cors_config.clone(),
            ),
            run_http_server(
                endpoints.http_addr,
                http_storage,
                http_auth,
                Arc::clone(&audit),
                cors_config,
            ),
        )?;
        Ok(())
    }
}
4065
/// Resolved listener addresses for the API servers (see `resolve_endpoint_config`).
#[derive(Clone, Copy, Debug)]
struct EndpointConfig {
    /// Address serving gRPC traffic.
    grpc_addr: SocketAddr,
    /// Address serving the HTTP surface.
    http_addr: SocketAddr,
    /// True when both surfaces share a single listener (PORT mode, or the two
    /// configured addresses are equal).
    combined: bool,
}
4072
4073fn resolve_endpoint_config() -> Result<EndpointConfig> {
4074    if let Ok(port_raw) = std::env::var("PORT") {
4075        let port: u16 = port_raw.parse().context("invalid PORT value")?;
4076        let addr = SocketAddr::from(([0, 0, 0, 0], port));
4077        return Ok(EndpointConfig {
4078            grpc_addr: addr,
4079            http_addr: addr,
4080            combined: true,
4081        });
4082    }
4083
4084    let grpc_addr: SocketAddr = std::env::var("FLEETFORGE_API_ADDR")
4085        .unwrap_or_else(|_| "0.0.0.0:50051".to_string())
4086        .parse()
4087        .context("invalid FLEETFORGE_API_ADDR")?;
4088    let http_addr: SocketAddr = std::env::var("FLEETFORGE_HTTP_ADDR")
4089        .unwrap_or_else(|_| "0.0.0.0:8080".to_string())
4090        .parse()
4091        .context("invalid FLEETFORGE_HTTP_ADDR")?;
4092
4093    Ok(EndpointConfig {
4094        grpc_addr,
4095        http_addr,
4096        combined: grpc_addr == http_addr,
4097    })
4098}
4099
/// Serves gRPC and HTTP from one TCP listener, routing per request.
///
/// Requests that look like gRPC (see `is_grpc_request`) go to the tonic stack
/// (health + reflection + runtime + tap services, gRPC-Web enabled); all other
/// requests are handled by the HTTP router. Runs until the hyper server exits.
#[instrument(skip_all)]
async fn run_combined_server(
    addr: SocketAddr,
    storage: Arc<Storage>,
    auth: Arc<AuthHandle>,
    audit: Arc<Audit>,
    transparency: Arc<dyn TransparencyLog>,
    transparency_writer_enabled: bool,
    transparency_scope: TransparencyScope,
    cors: Option<CorsConfig>,
) -> Result<()> {
    let listener = TcpListener::bind(addr)
        .await
        .with_context(|| format!("failed to bind listener on {addr}"))?;

    // Attestations live in the object store, indexed via the DB pool.
    let attestation_vault: Arc<dyn AttestationVault> = Arc::new(ObjectStoreAttestationVault::new(
        storage.object_store(),
        storage.pool().clone(),
    ));
    let runtime_api = RuntimeApiService::new(
        Arc::clone(&storage),
        Arc::clone(&auth),
        Arc::clone(&audit),
        Arc::clone(&attestation_vault),
        Arc::clone(&transparency),
        transparency_writer_enabled,
        transparency_scope,
    );
    let tap_api = TapApiService::new(Arc::clone(&storage), Arc::clone(&auth));

    // Mark both named services plus the blanket "" entry as serving so
    // generic health probes succeed.
    let (mut health_reporter, health_service) = health_reporter();
    health_reporter
        .set_service_status(
            "fleetforge.runtime.v1.RuntimeService",
            tonic_health::ServingStatus::Serving,
        )
        .await; // API returns () so there is nothing to convert via map_err.
    health_reporter
        .set_service_status(
            "fleetforge.tap.v2.TapService",
            tonic_health::ServingStatus::Serving,
        )
        .await;
    health_reporter
        .set_service_status("", tonic_health::ServingStatus::Serving)
        .await;

    // Wrap both services for gRPC-Web so browser clients can call them.
    let runtime_service = RuntimeServiceServer::new(runtime_api);
    let runtime_service = tonic_web::enable(runtime_service);
    let tap_service = TapServiceServer::new(tap_api);
    let tap_service = tonic_web::enable(tap_service);

    let reflection = ReflectionBuilder::configure()
        .register_encoded_file_descriptor_set(runtime::FILE_DESCRIPTOR_SET)
        .build()
        .context("failed to build reflection service")?;

    // Apply the CORS layer (if configured) to the HTTP router only.
    let cors_layer = cors.as_ref().map(|cfg| cfg.to_layer());
    let http_router = {
        let router = http::router(Arc::clone(&storage), Arc::clone(&auth), Arc::clone(&audit));
        match cors_layer.clone() {
            Some(layer) => router.layer(layer.clone()),
            None => router,
        }
    };
    // accept_http1 is required so gRPC-Web / plain HTTP share the listener.
    let mut builder = Server::builder().accept_http1(true);

    let grpc_service = builder
        .add_service(health_service)
        .add_service(reflection)
        .add_service(runtime_service)
        .add_service(tap_service)
        .into_service();

    // Boxed so it can be cheaply cloned into each per-connection closure.
    let grpc_service = BoxCloneService::new(grpc_service);

    info!(%addr, "starting runtime API server (single listener)");

    let make_service = make_service_fn(move |_conn| {
        let http_router = http_router.clone();
        let grpc_service = grpc_service.clone();

        async move {
            Ok::<_, Infallible>(service_fn(move |req: HyperRequest<Body>| {
                let http_router = http_router.clone();
                let mut grpc_service = grpc_service.clone();

                async move {
                    // Route per request: gRPC-shaped traffic to tonic,
                    // everything else to the HTTP router.
                    if is_grpc_request(&req) {
                        tower::Service::call(&mut grpc_service, req).await
                    } else {
                        let router = http_router.clone();
                        let response = match router.oneshot(req).await {
                            Ok(resp) => resp,
                            // The router's error type is uninhabited
                            // (`Infallible`), so this arm can never run.
                            Err(err) => match err {},
                        }
                        // Normalise the body type to match the gRPC branch.
                        .map(|body| {
                            body.map_err(|err| Status::internal(err.to_string()))
                                .boxed_unsync()
                        });
                        Ok::<_, Box<dyn std::error::Error + Send + Sync>>(response)
                    }
                }
            }))
        }
    });

    // Hand the already-bound tokio listener to hyper's server builder.
    let std_listener = listener
        .into_std()
        .context("failed to convert listener for hyper server")?;
    HyperServer::from_tcp(std_listener)
        .context("failed to build runtime API server")?
        .serve(make_service)
        .await
        .context("runtime API server exited unexpectedly")
}
4216
4217fn is_grpc_request(req: &HyperRequest<Body>) -> bool {
4218    if let Some(content_type) = req
4219        .headers()
4220        .get(header::CONTENT_TYPE)
4221        .and_then(|value| value.to_str().ok())
4222    {
4223        let lower = content_type.to_ascii_lowercase();
4224        if lower.starts_with("application/grpc")
4225            || lower.starts_with("application/grpc-web")
4226            || lower.starts_with("application/connect+")
4227        {
4228            return true;
4229        }
4230    }
4231
4232    let path = req.uri().path();
4233    path.starts_with("/fleetforge.runtime.v1.")
4234        || path.starts_with("/fleetforge.tap.v2.")
4235        || path.starts_with("/grpc.health.v1.")
4236        || path.starts_with("/grpc.reflection.v1alpha.")
4237}
4238
4239#[instrument(skip_all)]
4240async fn run_grpc_server(
4241    addr: SocketAddr,
4242    storage: Arc<Storage>,
4243    auth: Arc<AuthHandle>,
4244    audit: Arc<Audit>,
4245    transparency: Arc<dyn TransparencyLog>,
4246    transparency_writer_enabled: bool,
4247    transparency_scope: TransparencyScope,
4248    cors: Option<CorsConfig>,
4249) -> Result<()> {
4250    let attestation_vault: Arc<dyn AttestationVault> = Arc::new(ObjectStoreAttestationVault::new(
4251        storage.object_store(),
4252        storage.pool().clone(),
4253    ));
4254    let runtime_api = RuntimeApiService::new(
4255        Arc::clone(&storage),
4256        Arc::clone(&auth),
4257        Arc::clone(&audit),
4258        Arc::clone(&attestation_vault),
4259        transparency,
4260        transparency_writer_enabled,
4261        transparency_scope,
4262    );
4263    let tap_api = TapApiService::new(storage, auth);
4264
4265    let (mut health_reporter, health_service) = health_reporter();
4266    let reflection = ReflectionBuilder::configure()
4267        .register_encoded_file_descriptor_set(runtime::FILE_DESCRIPTOR_SET)
4268        .build()
4269        .context("failed to build reflection service")?;
4270
4271    health_reporter
4272        .set_service_status(
4273            "fleetforge.runtime.v1.RuntimeService",
4274            tonic_health::ServingStatus::Serving,
4275        )
4276        .await; // set_service_status returns ()
4277    health_reporter
4278        .set_service_status(
4279            "fleetforge.tap.v2.TapService",
4280            tonic_health::ServingStatus::Serving,
4281        )
4282        .await;
4283    health_reporter
4284        .set_service_status("", tonic_health::ServingStatus::Serving)
4285        .await;
4286
4287    info!(%addr, "starting runtime gRPC API server");
4288
4289    let runtime_service = RuntimeServiceServer::new(runtime_api);
4290    let runtime_service = tonic_web::enable(runtime_service);
4291    let tap_service = TapServiceServer::new(tap_api);
4292    let tap_service = tonic_web::enable(tap_service);
4293    let cors_layer = cors.as_ref().map(|cfg| cfg.to_layer());
4294    Server::builder()
4295        .accept_http1(true)
4296        .add_service(health_service)
4297        .add_service(reflection)
4298        .add_service(runtime_service)
4299        .add_service(tap_service)
4300        .serve(addr)
4301        .await
4302        .context("runtime gRPC API server exited unexpectedly")
4303}
4304
4305#[instrument(skip_all)]
4306async fn run_http_server(
4307    addr: SocketAddr,
4308    storage: Arc<Storage>,
4309    auth: Arc<AuthHandle>,
4310    audit: Arc<Audit>,
4311    cors: Option<CorsConfig>,
4312) -> Result<()> {
4313    let listener = TcpListener::bind(addr)
4314        .await
4315        .with_context(|| format!("failed to bind HTTP listener on {addr}"))?;
4316
4317    // Allow explicit opt-in for local UI origins; default is no cross-origin access.
4318    let cors_layer = cors.as_ref().map(|cfg| cfg.to_layer());
4319    let router = http::router(storage, auth, audit);
4320    let app = match cors_layer {
4321        Some(layer) => router.layer(layer),
4322        None => router,
4323    };
4324
4325    info!(%addr, "starting runtime HTTP API server");
4326
4327    let std_listener = listener
4328        .into_std()
4329        .context("failed to convert HTTP listener for hyper server")?;
4330    HyperServer::from_tcp(std_listener)
4331        .context("failed to build runtime HTTP server")?
4332        .serve(app.into_make_service())
4333        .await
4334        .context("runtime HTTP API server exited unexpectedly")
4335}
4336
/// CORS allow-list parsed from `FLEETFORGE_CORS_ALLOW_ORIGINS`.
#[derive(Clone, Debug)]
struct CorsConfig {
    // Validated origin header values handed to `CorsLayer::allow_origin`.
    origins: Vec<HeaderValue>,
    // String form of the accepted origins; not read in this chunk —
    // presumably kept for logging/diagnostics elsewhere.
    display: Vec<String>,
}
4342
4343impl CorsConfig {
4344    fn to_layer(&self) -> CorsLayer {
4345        CorsLayer::new()
4346            .allow_origin(self.origins.clone())
4347            .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
4348            .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION])
4349    }
4350}
4351
4352fn cors_config_from_env() -> Option<CorsConfig> {
4353    let raw = std::env::var("FLEETFORGE_CORS_ALLOW_ORIGINS").ok()?;
4354    let mut origins = Vec::new();
4355    let mut display = Vec::new();
4356    for origin in raw.split(',') {
4357        let trimmed = origin.trim();
4358        if trimmed.is_empty() {
4359            continue;
4360        }
4361        match HeaderValue::from_str(trimmed) {
4362            Ok(value) => {
4363                origins.push(value);
4364                display.push(trimmed.to_string());
4365            }
4366            Err(err) => {
4367                warn!(origin = %trimmed, %err, "invalid origin in FLEETFORGE_CORS_ALLOW_ORIGINS")
4368            }
4369        }
4370    }
4371
4372    if origins.is_empty() {
4373        warn!(
4374            "no valid origins parsed from FLEETFORGE_CORS_ALLOW_ORIGINS; CORS will remain disabled"
4375        );
4376        return None;
4377    }
4378
4379    Some(CorsConfig { origins, display })
4380}
4381
4382fn transparency_writer_enabled_flag() -> bool {
4383    let enabled = matches!(
4384        std::env::var(TRANSPARENCY_WRITER_FLAG)
4385            .ok()
4386            .map(|value| value.trim().to_ascii_lowercase())
4387            .unwrap_or_default()
4388            .as_str(),
4389        "1" | "true" | "yes" | "on"
4390    );
4391    if enabled && !feature_allowed(LicensedFeature::ScittTransparency) {
4392        warn!(
4393            "FLEETFORGE_TRANSPARENCY_WRITER=1 but the current license tier does not include SCITT publishing; disabling writer"
4394        );
4395        return false;
4396    }
4397    enabled
4398}
4399
4400fn transparency_scope_from_env() -> TransparencyScope {
4401    match std::env::var("FLEETFORGE_TRANSPARENCY_SCOPE")
4402        .ok()
4403        .map(|value| value.trim().to_ascii_lowercase())
4404        .as_deref()
4405    {
4406        Some("artifacts") => TransparencyScope::Artifacts,
4407        Some("runs") => TransparencyScope::Runs,
4408        Some("gates") => TransparencyScope::Gates,
4409        Some("disabled") => TransparencyScope::Disabled,
4410        _ => TransparencyScope::Gates,
4411    }
4412}
4413
4414fn transparency_writer_interval() -> Duration {
4415    let interval = std::env::var(TRANSPARENCY_WRITER_INTERVAL_ENV)
4416        .ok()
4417        .and_then(|value| value.parse::<u64>().ok())
4418        .unwrap_or(TRANSPARENCY_WRITER_DEFAULT_INTERVAL_SECS);
4419    Duration::from_secs(interval.max(TRANSPARENCY_WRITER_INTERVAL_MIN_SECS))
4420}
4421
4422fn parse_dag_json(input: &str) -> Result<Value, Status> {
4423    serde_json::from_str(input)
4424        .map_err(|err| invalid_argument(format!("invalid dag_json payload: {err}")))
4425}
4426
4427fn extract_priority(step_obj: &Map<String, Value>) -> i16 {
4428    step_obj
4429        .get("policy")
4430        .and_then(Value::as_object)
4431        .and_then(|policy| policy.get("priority"))
4432        .and_then(Value::as_i64)
4433        .unwrap_or(0)
4434        .clamp(i16::MIN as i64, i16::MAX as i64) as i16
4435}
4436
4437fn build_run_detail(
4438    run: models::Run,
4439    steps: Vec<models::Step>,
4440) -> Result<runtime::RunDetail, Status> {
4441    let (inputs_value, labels_map) = unpack_input_ctx(&run.input_ctx);
4442    let breakpoint_specs = extract_breakpoint_specs(&run.input_ctx);
4443    let inputs_struct = match &inputs_value {
4444        Value::Null => None,
4445        Value::Object(map) if map.is_empty() => None,
4446        Value::Object(_) => Some(json_to_prost_struct(&inputs_value).map_err(|err| {
4447            internal_error(format!("failed to encode inputs for response: {err}"))
4448        })?),
4449        _ => None,
4450    };
4451
4452    let dag_json = serde_json::to_string(&run.dag_json).map_err(|err| {
4453        internal_error(format!("failed to serialise dag_json for response: {err}"))
4454    })?;
4455
4456    let run_spec = runtime::RunSpec {
4457        dag_json,
4458        inputs: inputs_struct,
4459        seed: run.seed,
4460        labels: labels_map,
4461        breakpoints: breakpoint_specs,
4462    };
4463
4464    let step_summaries = steps
4465        .into_iter()
4466        .map(step_to_summary)
4467        .collect::<Result<Vec<_>, Status>>()?;
4468
4469    Ok(runtime::RunDetail {
4470        run_id: run.run_id.to_string(),
4471        status: map_run_status(run.status) as i32,
4472        spec: Some(run_spec),
4473        created_at: Some(to_timestamp(run.created_at)),
4474        updated_at: None,
4475        steps: step_summaries,
4476    })
4477}
4478
4479fn unpack_input_ctx(ctx: &Value) -> (Value, HashMap<String, String>) {
4480    match ctx {
4481        Value::Object(map) => {
4482            let inputs = map
4483                .get("inputs")
4484                .cloned()
4485                .unwrap_or_else(|| Value::Object(Map::new()));
4486            let labels = map
4487                .get("labels")
4488                .map(labels_value_to_map)
4489                .unwrap_or_default();
4490            (inputs, labels)
4491        }
4492        other => (other.clone(), HashMap::new()),
4493    }
4494}
4495
4496fn extract_breakpoint_specs(ctx: &Value) -> Vec<String> {
4497    let Value::Object(map) = ctx else {
4498        return Vec::new();
4499    };
4500    let Some(Value::Object(debug)) = map.get("debug") else {
4501        return Vec::new();
4502    };
4503
4504    if let Some(Value::Array(specs)) = debug.get("breakpoint_specs") {
4505        return specs
4506            .iter()
4507            .filter_map(|value| value.as_str().map(|s| s.to_string()))
4508            .collect();
4509    }
4510
4511    if let Some(Value::Array(entries)) = debug.get("breakpoints") {
4512        return entries
4513            .iter()
4514            .filter_map(|entry| {
4515                entry
4516                    .get("spec")
4517                    .and_then(Value::as_str)
4518                    .map(|s| s.to_string())
4519            })
4520            .collect();
4521    }
4522
4523    Vec::new()
4524}
4525
4526fn load_debug_breakpoints(debug: &Map<String, Value>) -> Vec<(String, Uuid)> {
4527    debug
4528        .get("breakpoints")
4529        .and_then(Value::as_array)
4530        .map(|items| {
4531            items
4532                .iter()
4533                .filter_map(|entry| {
4534                    let obj = entry.as_object()?;
4535                    let spec = obj.get("spec")?.as_str()?.to_string();
4536                    let step_id_str = obj.get("step_id")?.as_str()?;
4537                    let step_id = Uuid::parse_str(step_id_str).ok()?;
4538                    Some((spec, step_id))
4539                })
4540                .collect()
4541        })
4542        .unwrap_or_default()
4543}
4544
4545fn store_debug_breakpoints(debug: &mut Map<String, Value>, entries: &[(String, Uuid)]) {
4546    let list = entries
4547        .iter()
4548        .map(|(spec, step_id)| {
4549            let mut obj = Map::new();
4550            obj.insert("spec".to_string(), Value::String(spec.clone()));
4551            obj.insert("step_id".to_string(), Value::String(step_id.to_string()));
4552            Value::Object(obj)
4553        })
4554        .collect::<Vec<_>>();
4555    debug.insert("breakpoints".to_string(), Value::Array(list));
4556
4557    let specs = entries
4558        .iter()
4559        .map(|(spec, _)| Value::String(spec.clone()))
4560        .collect::<Vec<_>>();
4561    debug.insert("breakpoint_specs".to_string(), Value::Array(specs));
4562}
4563
4564fn load_slug_map(debug: &Map<String, Value>) -> HashMap<String, Uuid> {
4565    debug
4566        .get("slugs")
4567        .and_then(Value::as_object)
4568        .map(|map| {
4569            map.iter()
4570                .filter_map(|(slug, value)| {
4571                    value
4572                        .as_str()
4573                        .and_then(|id| Uuid::parse_str(id).ok())
4574                        .map(|uuid| (slug.clone(), uuid))
4575                })
4576                .collect()
4577        })
4578        .unwrap_or_default()
4579}
4580
4581fn store_slug_map(debug: &mut Map<String, Value>, entries: &[(String, Uuid)]) {
4582    let mut slugs = Map::new();
4583    for (slug, step_id) in entries {
4584        slugs.insert(slug.clone(), Value::String(step_id.to_string()));
4585    }
4586    debug.insert("slugs".to_string(), Value::Object(slugs));
4587}
4588
4589fn extract_active_pause_step_id(debug: &Map<String, Value>) -> Option<Uuid> {
4590    debug
4591        .get("active_pause")
4592        .and_then(|value| value.as_object())
4593        .and_then(|obj| obj.get("step_id"))
4594        .and_then(Value::as_str)
4595        .and_then(|s| Uuid::parse_str(s).ok())
4596}
4597
4598fn merge_json_object(target: &mut Map<String, Value>, patch: &Map<String, Value>) {
4599    for (key, value) in patch {
4600        target.insert(key.clone(), value.clone());
4601    }
4602}
4603
4604fn collect_step_ids(run: &models::Run) -> HashSet<Uuid> {
4605    run.dag_json
4606        .get("steps")
4607        .and_then(Value::as_array)
4608        .map(|steps| {
4609            steps
4610                .iter()
4611                .filter_map(|step| {
4612                    step.get("id")
4613                        .and_then(Value::as_str)
4614                        .and_then(|s| Uuid::parse_str(s).ok())
4615                })
4616                .collect()
4617        })
4618        .unwrap_or_default()
4619}
4620
4621fn collect_slug_entries(run: &models::Run) -> Vec<(String, Uuid)> {
4622    run.dag_json
4623        .get("steps")
4624        .and_then(Value::as_array)
4625        .map(|steps| {
4626            steps
4627                .iter()
4628                .filter_map(|step| {
4629                    let id = step
4630                        .get("id")
4631                        .and_then(Value::as_str)
4632                        .and_then(|s| Uuid::parse_str(s).ok())?;
4633                    let slug = step
4634                        .get("slug")
4635                        .and_then(Value::as_str)
4636                        .map(|s| s.trim().to_string())
4637                        .filter(|s| !s.is_empty())?;
4638                    Some((slug, id))
4639                })
4640                .collect()
4641        })
4642        .unwrap_or_default()
4643}
4644
4645fn build_branch_dag(dag_json: &Value, start: Uuid) -> Result<Value, SubmitRunError> {
4646    let dag_map = dag_json
4647        .as_object()
4648        .cloned()
4649        .ok_or_else(|| SubmitRunError::invalid("dag_json must be a JSON object"))?;
4650    let steps_array = dag_map
4651        .get("steps")
4652        .and_then(Value::as_array)
4653        .ok_or_else(|| SubmitRunError::invalid("dag_json.steps must be an array"))?;
4654    let edges_array = dag_map
4655        .get("edges")
4656        .and_then(Value::as_array)
4657        .ok_or_else(|| SubmitRunError::invalid("dag_json.edges must be an array"))?;
4658
4659    let mut step_defs: HashMap<Uuid, Value> = HashMap::new();
4660    for step in steps_array {
4661        let step_obj = step
4662            .as_object()
4663            .ok_or_else(|| SubmitRunError::invalid("each step must be an object"))?;
4664        let id = step_obj
4665            .get("id")
4666            .and_then(Value::as_str)
4667            .ok_or_else(|| SubmitRunError::invalid("step.id missing"))?;
4668        let step_id = Uuid::parse_str(id)
4669            .map_err(|err| SubmitRunError::invalid(format!("invalid step.id: {err}")))?;
4670        step_defs.insert(step_id, step.clone());
4671    }
4672
4673    if !step_defs.contains_key(&start) {
4674        return Err(SubmitRunError::invalid("from_step references unknown step"));
4675    }
4676
4677    let mut adjacency: HashMap<Uuid, Vec<Uuid>> = HashMap::new();
4678    for edge in edges_array {
4679        let pair = edge
4680            .as_array()
4681            .ok_or_else(|| SubmitRunError::invalid("edges entries must be arrays"))?;
4682        if pair.len() != 2 {
4683            return Err(SubmitRunError::invalid(
4684                "edges entries must contain two members",
4685            ));
4686        }
4687        let from = pair[0]
4688            .as_str()
4689            .ok_or_else(|| SubmitRunError::invalid("edge source must be a string"))?;
4690        let to = pair[1]
4691            .as_str()
4692            .ok_or_else(|| SubmitRunError::invalid("edge target must be a string"))?;
4693        let from_id = Uuid::parse_str(from)
4694            .map_err(|err| SubmitRunError::invalid(format!("invalid edge source: {err}")))?;
4695        let to_id = Uuid::parse_str(to)
4696            .map_err(|err| SubmitRunError::invalid(format!("invalid edge target: {err}")))?;
4697        adjacency.entry(from_id).or_default().push(to_id);
4698    }
4699
4700    let mut queue = VecDeque::new();
4701    let mut visited: HashSet<Uuid> = HashSet::new();
4702    queue.push_back(start);
4703    while let Some(node) = queue.pop_front() {
4704        if !visited.insert(node) {
4705            continue;
4706        }
4707        if let Some(neighbors) = adjacency.get(&node) {
4708            for next in neighbors {
4709                queue.push_back(*next);
4710            }
4711        }
4712    }
4713
4714    let filtered_steps = steps_array
4715        .iter()
4716        .filter_map(|step| {
4717            let id = step
4718                .get("id")
4719                .and_then(Value::as_str)
4720                .and_then(|s| Uuid::parse_str(s).ok());
4721            match id {
4722                Some(step_id) if visited.contains(&step_id) => Some(step.clone()),
4723                _ => None,
4724            }
4725        })
4726        .collect::<Vec<_>>();
4727
4728    let filtered_edges = edges_array
4729        .iter()
4730        .filter(|edge| {
4731            if let Some(pair) = edge.as_array() {
4732                if pair.len() == 2 {
4733                    if let (Some(from), Some(to)) = (
4734                        pair[0].as_str().and_then(|s| Uuid::parse_str(s).ok()),
4735                        pair[1].as_str().and_then(|s| Uuid::parse_str(s).ok()),
4736                    ) {
4737                        return visited.contains(&from) && visited.contains(&to);
4738                    }
4739                }
4740            }
4741            false
4742        })
4743        .cloned()
4744        .collect::<Vec<_>>();
4745
4746    let mut new_dag = dag_json.as_object().cloned().unwrap_or_default();
4747    new_dag.insert("steps".to_string(), Value::Array(filtered_steps));
4748    new_dag.insert("edges".to_string(), Value::Array(filtered_edges));
4749    Ok(Value::Object(new_dag))
4750}
4751
4752fn collect_prompt_packs_json(dag_json: &Value) -> Vec<String> {
4753    match dag_json {
4754        Value::Object(map) => {
4755            let packs = map.get("prompt_packs").or_else(|| map.get("promptPacks"));
4756            match packs {
4757                Some(Value::Array(items)) => items
4758                    .iter()
4759                    .filter_map(|item| item.as_str().map(|s| s.to_string()))
4760                    .collect(),
4761                Some(Value::String(value)) => vec![value.clone()],
4762                _ => Vec::new(),
4763            }
4764        }
4765        _ => Vec::new(),
4766    }
4767}
4768
/// Result of comparing a recorded step output against a freshly produced one
/// (see `compute_drift`).
struct DriftComputation {
    // Token-count delta (current - recorded); None when either side lacks a
    // numeric token field.
    token_delta: Option<f64>,
    // Cost delta (current - recorded); None when either side lacks a numeric
    // cost field.
    cost_delta: Option<f64>,
    // True when both deltas are within tolerance AND the outputs compare
    // equal once token/cost fields are stripped.
    within_tolerance: bool,
}
4774
// Zero-tolerance defaults: any non-zero delta counts as drift. (Callers of
// these defaults are outside this chunk.)
const DEFAULT_TOKEN_TOLERANCE: f64 = 0.0;
const DEFAULT_COST_TOLERANCE: f64 = 0.0;
// JSON keys probed (recursively, via `find_numeric`) for token/cost values.
const TOKEN_KEYS: &[&str] = &["tokens", "token_count", "total_tokens"];
const COST_KEYS: &[&str] = &["cost", "cost_usd", "total_cost"];
// Keys removed by `strip_keys` before structural comparison so token/cost
// noise does not trigger a mismatch.
const SANITIZE_KEYS: &[&str] = &[
    "tokens",
    "token_count",
    "total_tokens",
    "cost",
    "cost_usd",
    "total_cost",
];
4787
4788fn wrap_attempt_output(attempt: Option<&models::StepAttempt>, output: &Value) -> Value {
4789    match attempt {
4790        Some(record) => json!({
4791            "attempt": record.attempt,
4792            "status": record.status.as_str(),
4793            "recorded_at": record.recorded_at,
4794            "output": output,
4795        }),
4796        None => output.clone(),
4797    }
4798}
4799
4800fn compute_drift(
4801    recorded: &Value,
4802    current: &Value,
4803    token_tolerance: f64,
4804    cost_tolerance: f64,
4805) -> DriftComputation {
4806    let token_delta = diff_numeric(recorded, current, TOKEN_KEYS);
4807    let cost_delta = diff_numeric(recorded, current, COST_KEYS);
4808
4809    let sanitized_recorded = strip_keys(recorded, SANITIZE_KEYS);
4810    let sanitized_current = strip_keys(current, SANITIZE_KEYS);
4811
4812    let tokens_within = token_delta
4813        .map(|delta| delta.abs() <= token_tolerance)
4814        .unwrap_or(true);
4815    let cost_within = cost_delta
4816        .map(|delta| delta.abs() <= cost_tolerance)
4817        .unwrap_or(true);
4818    let structure_equal = sanitized_recorded == sanitized_current;
4819
4820    DriftComputation {
4821        token_delta,
4822        cost_delta,
4823        within_tolerance: tokens_within && cost_within && structure_equal,
4824    }
4825}
4826
4827fn diff_numeric(recorded: &Value, current: &Value, keys: &[&str]) -> Option<f64> {
4828    let recorded_value = find_numeric(recorded, keys)?;
4829    let current_value = find_numeric(current, keys)?;
4830    Some(current_value - recorded_value)
4831}
4832
4833fn find_numeric(value: &Value, keys: &[&str]) -> Option<f64> {
4834    match value {
4835        Value::Object(map) => {
4836            for key in keys {
4837                if let Some(v) = map.get(*key) {
4838                    if let Some(num) = v.as_f64() {
4839                        return Some(num);
4840                    }
4841                }
4842            }
4843            for v in map.values() {
4844                if let Some(found) = find_numeric(v, keys) {
4845                    return Some(found);
4846                }
4847            }
4848            None
4849        }
4850        Value::Array(items) => {
4851            for item in items {
4852                if let Some(found) = find_numeric(item, keys) {
4853                    return Some(found);
4854                }
4855            }
4856            None
4857        }
4858        _ => None,
4859    }
4860}
4861
4862fn strip_keys(value: &Value, keys: &[&str]) -> Value {
4863    match value {
4864        Value::Object(map) => {
4865            let mut new_map = serde_json::Map::new();
4866            for (k, v) in map {
4867                if keys.iter().any(|key| key == k) {
4868                    continue;
4869                }
4870                new_map.insert(k.clone(), strip_keys(v, keys));
4871            }
4872            Value::Object(new_map)
4873        }
4874        Value::Array(items) => Value::Array(items.iter().map(|v| strip_keys(v, keys)).collect()),
4875        _ => value.clone(),
4876    }
4877}
4878
4879fn step_to_summary(step: models::Step) -> Result<runtime::StepSummary, Status> {
4880    let step_type = step
4881        .spec_json
4882        .get("type")
4883        .and_then(Value::as_str)
4884        .unwrap_or_default()
4885        .to_string();
4886
4887    let output = match step.output_json {
4888        Some(value) => Some(json_to_prost_value(&value).map_err(|err| {
4889            internal_error(format!(
4890                "invalid output payload for step {}: {err}",
4891                step.step_id
4892            ))
4893        })?),
4894        None => None,
4895    };
4896
4897    let error = match step.error_json {
4898        Some(value) => Some(json_to_prost_value(&value).map_err(|err| {
4899            internal_error(format!(
4900                "invalid error payload for step {}: {err}",
4901                step.step_id
4902            ))
4903        })?),
4904        None => None,
4905    };
4906
4907    Ok(runtime::StepSummary {
4908        step_id: step.step_id.to_string(),
4909        idx: step.idx,
4910        r#type: step_type,
4911        status: step.status.as_str().to_string(),
4912        attempt: step.attempt,
4913        leased_at: None,
4914        completed_at: None,
4915        output,
4916        error,
4917    })
4918}
4919
4920fn run_event_from_outbox(event: models::OutboxEvent) -> Result<runtime::RunEvent, Status> {
4921    let payload = json_to_prost_value(&event.payload).map_err(|err| {
4922        internal_error(format!(
4923            "failed to encode outbox payload for event {}: {err}",
4924            event.id
4925        ))
4926    })?;
4927
4928    Ok(runtime::RunEvent {
4929        run_id: event.run_id.to_string(),
4930        step_id: event.step_id.map(|id| id.to_string()).unwrap_or_default(),
4931        kind: event.kind,
4932        payload: Some(payload),
4933        occurred_at: Some(to_timestamp(event.created_at)),
4934    })
4935}
4936
4937fn map_run_status(status: models::RunStatus) -> runtime::RunStatus {
4938    match status {
4939        models::RunStatus::Pending => runtime::RunStatus::Pending,
4940        models::RunStatus::Running => runtime::RunStatus::Running,
4941        models::RunStatus::Succeeded => runtime::RunStatus::Succeeded,
4942        models::RunStatus::Failed => runtime::RunStatus::Failed,
4943        models::RunStatus::Canceled => runtime::RunStatus::Canceled,
4944        models::RunStatus::PausedAtStep => runtime::RunStatus::PausedAtStep,
4945    }
4946}
4947
/// Parsed fields of an eval scenario spec (produced by `parse_eval_spec`).
#[derive(Clone, Debug)]
pub(crate) struct EvalScenarioSpecData {
    // Pack identifier the scenario belongs to (required).
    pack: String,
    // Scenario slug (required).
    slug: String,
    // Optional human-readable title.
    title: Option<String>,
    // Optional longer description.
    description: Option<String>,
    // Run specification to execute; parse_eval_spec guarantees a JSON object.
    run_spec: Value,
    // Expected results to compare against; guaranteed to be a JSON object.
    expected: Value,
}
4957
4958pub(crate) fn parse_eval_spec(spec: &Value) -> anyhow::Result<EvalScenarioSpecData> {
4959    let pack = spec
4960        .get("pack")
4961        .and_then(Value::as_str)
4962        .ok_or_else(|| anyhow!("scenario spec missing 'pack' field"))?
4963        .to_string();
4964    let slug = spec
4965        .get("slug")
4966        .and_then(Value::as_str)
4967        .ok_or_else(|| anyhow!("scenario spec missing 'slug' field"))?
4968        .to_string();
4969    let title = spec
4970        .get("title")
4971        .and_then(Value::as_str)
4972        .map(|value| value.to_string());
4973    let description = spec
4974        .get("description")
4975        .and_then(Value::as_str)
4976        .map(|value| value.to_string());
4977    let run_spec = spec
4978        .get("run_spec")
4979        .cloned()
4980        .ok_or_else(|| anyhow!("scenario spec missing 'run_spec' payload"))?;
4981    if !run_spec.is_object() {
4982        return Err(anyhow!("scenario spec 'run_spec' must be a JSON object"));
4983    }
4984    let expected = spec
4985        .get("expected")
4986        .cloned()
4987        .ok_or_else(|| anyhow!("scenario spec missing 'expected' payload"))?;
4988    if !expected.is_object() {
4989        return Err(anyhow!("scenario spec 'expected' must be a JSON object"));
4990    }
4991
4992    Ok(EvalScenarioSpecData {
4993        pack,
4994        slug,
4995        title,
4996        description,
4997        run_spec,
4998        expected,
4999    })
5000}
5001
5002fn metric_summary(current: f64, previous: Option<f64>) -> Value {
5003    let delta = previous.map(|p| current - p);
5004    json!({
5005        "value": current,
5006        "previous": previous,
5007        "delta": delta,
5008    })
5009}
5010
5011fn extract_metric(metrics: &Value, key: &str) -> Option<f64> {
5012    metrics
5013        .as_object()
5014        .and_then(|map| map.get(key))
5015        .and_then(|entry| entry.get("value"))
5016        .and_then(Value::as_f64)
5017}
5018
5019fn compute_eval_metrics(
5020    spec: &EvalScenarioSpecData,
5021    run: &models::Run,
5022    steps: &[models::Step],
5023    last_event_time: Option<DateTime<Utc>>,
5024    previous_metrics: Option<&Value>,
5025    commit: Option<String>,
5026) -> anyhow::Result<(String, Value)> {
5027    let tokens_actual: f64 = steps
5028        .iter()
5029        .map(|step| {
5030            step.actual_tokens
5031                .map(|value| value as f64)
5032                .unwrap_or_else(|| step.estimated_tokens as f64)
5033        })
5034        .sum();
5035    let cost_actual: f64 = steps
5036        .iter()
5037        .map(|step| step.actual_cost.unwrap_or(step.estimated_cost))
5038        .sum();
5039
5040    let run_end = last_event_time.unwrap_or(run.created_at);
5041    let latency_ms = run_end
5042        .signed_duration_since(run.created_at)
5043        .num_milliseconds()
5044        .max(0) as f64;
5045
5046    let previous_tokens = previous_metrics.and_then(|metrics| extract_metric(metrics, "tokens"));
5047    let previous_cost = previous_metrics.and_then(|metrics| extract_metric(metrics, "cost_usd"));
5048    let previous_latency =
5049        previous_metrics.and_then(|metrics| extract_metric(metrics, "latency_ms"));
5050
5051    let mut differences: Vec<Value> = Vec::new();
5052    if let Some(expected_status) = spec.expected.get("status").and_then(Value::as_str) {
5053        let actual_status = run.status.as_str();
5054        if !expected_status.eq_ignore_ascii_case(actual_status) {
5055            differences.push(json!({
5056                "type": "status",
5057                "expected": expected_status,
5058                "actual": actual_status,
5059            }));
5060        }
5061    }
5062
5063    let mut actual_steps: HashMap<String, Value> = HashMap::with_capacity(steps.len());
5064    for step in steps {
5065        if let Some(output) = step.output_json.clone() {
5066            actual_steps.insert(step.step_id.to_string(), output);
5067        }
5068    }
5069
5070    if let Some(expected_steps) = spec.expected.get("steps").and_then(Value::as_object) {
5071        for (step_id, expected_output) in expected_steps {
5072            match actual_steps.get(step_id) {
5073                Some(actual_output) if actual_output == expected_output => {}
5074                Some(actual_output) => {
5075                    differences.push(json!({
5076                        "type": "step",
5077                        "step_id": step_id,
5078                        "expected": expected_output,
5079                        "actual": actual_output,
5080                    }));
5081                }
5082                None => {
5083                    differences.push(json!({
5084                        "type": "step_missing",
5085                        "step_id": step_id,
5086                        "expected": expected_output,
5087                    }));
5088                }
5089            }
5090        }
5091    }
5092
5093    let quality_matched = differences.is_empty();
5094    let outcome = if quality_matched { "passed" } else { "failed" }.to_string();
5095
5096    let mut metrics_map = Map::new();
5097    if let Some(commit) = commit {
5098        if !commit.is_empty() {
5099            metrics_map.insert("commit".to_string(), Value::String(commit));
5100        }
5101    }
5102    metrics_map.insert(
5103        "tokens".to_string(),
5104        metric_summary(tokens_actual, previous_tokens),
5105    );
5106    metrics_map.insert(
5107        "cost_usd".to_string(),
5108        metric_summary(cost_actual, previous_cost),
5109    );
5110    metrics_map.insert(
5111        "latency_ms".to_string(),
5112        metric_summary(latency_ms, previous_latency),
5113    );
5114    metrics_map.insert(
5115        "quality".to_string(),
5116        Value::Object({
5117            let mut quality = Map::new();
5118            quality.insert("matched".to_string(), Value::Bool(quality_matched));
5119            quality.insert("differences".to_string(), Value::Array(differences));
5120            quality
5121        }),
5122    );
5123
5124    Ok((outcome, Value::Object(metrics_map)))
5125}
5126
5127fn build_eval_result_proto(result: models::EvalResult) -> Result<runtime::EvalResult, Status> {
5128    let metrics_struct = json_to_prost_struct(&result.metrics)
5129        .map_err(|err| internal_error(format!("invalid eval metrics payload: {err}")))?;
5130    let commit = result
5131        .metrics
5132        .get("commit")
5133        .and_then(Value::as_str)
5134        .unwrap_or("")
5135        .to_string();
5136
5137    Ok(runtime::EvalResult {
5138        result_id: result.result_id.to_string(),
5139        run_id: result.run_id.to_string(),
5140        scenario_id: result.scenario_id.to_string(),
5141        outcome: result.outcome,
5142        commit,
5143        executed_at: Some(to_timestamp(result.executed_at)),
5144        metrics: Some(metrics_struct),
5145    })
5146}
5147
5148fn build_eval_scenario_proto(
5149    scenario: &models::EvalScenario,
5150    spec: &EvalScenarioSpecData,
5151    history: Vec<runtime::EvalResult>,
5152) -> Result<runtime::EvalScenario, Status> {
5153    Ok(runtime::EvalScenario {
5154        scenario_id: scenario.scenario_id.to_string(),
5155        pack: spec.pack.clone(),
5156        slug: spec.slug.clone(),
5157        title: spec.title.clone().unwrap_or_else(|| spec.slug.clone()),
5158        description: spec.description.clone().unwrap_or_default(),
5159        created_at: Some(to_timestamp(scenario.created_at)),
5160        history,
5161    })
5162}
5163
#[cfg(test)]
mod unit_tests {
    //! Pure unit tests for the helper functions in this module. These run
    //! without any external services (no database, no Docker).

    use super::*;
    use chrono::Utc;
    use fleetforge_common::prost_json::prost_value_to_json;
    use fleetforge_storage::models;
    use serde_json::{json, Map, Value};
    use std::collections::{HashMap, VecDeque};
    use std::time::Duration;
    use tonic::Code;
    use uuid::Uuid;

    // Empty and "0" page tokens both resolve to offset 0; non-numeric input
    // is rejected.
    #[test]
    fn parse_page_token_supports_defaults() {
        assert_eq!(parse_page_token("").expect("empty token"), 0);
        assert_eq!(parse_page_token("0").expect("zero token"), 0);
        assert!(parse_page_token("abc").is_err());
    }

    // Filter construction lowercases kinds/workspace ids and parses run ids,
    // policy effects, and severities from their wire representations.
    #[test]
    fn query_filters_normalizes_inputs() {
        let run_id = Uuid::new_v4();
        let mut request = QueryEventsRequest::default();
        request.run_ids.push(run_id.to_string());
        request.kinds.push("Step_Succeeded".to_string());
        request.workspace_ids.push("Workspace-1".to_string());
        request
            .policy_effects
            .push(PolicyDecisionEffect::PolicyDecisionEffectDeny as i32);
        request.severities.push(EventSeverity::Error as i32);

        let filters = QueryFiltersOwned::from_request(&request).expect("filters");
        assert_eq!(filters.run_ids, vec![run_id]);
        assert_eq!(filters.kinds, vec!["step_succeeded"]);
        assert_eq!(filters.workspace_ids, vec!["workspace-1"]);
        assert_eq!(
            filters.policy_effects,
            vec![PolicyDecisionEffect::PolicyDecisionEffectDeny]
        );
        assert_eq!(filters.severities, vec![EventSeverity::Error]);
    }

    // Workspace matching is case-insensitive ("Tenant-X" matches a "tenant-x"
    // filter); a different workspace id is filtered out.
    #[test]
    fn matches_filters_respects_workspace_and_kind() {
        let run_id = Uuid::new_v4();
        let mut request = QueryEventsRequest::default();
        request.run_ids.push(run_id.to_string());
        request.workspace_ids.push("tenant-x".to_string());
        request.kinds.push("run_succeeded".to_string());
        let filters = QueryFiltersOwned::from_request(&request).expect("filters");

        let payload = json!({"trace": {"run": {"workspace_id": "Tenant-X"}}});
        let kind = "run_succeeded";
        let severity = EventSeverity::Info;
        assert!(
            matches_filters(&filters, &payload, kind, severity, None),
            "matching workspace should pass filter"
        );

        let other_payload = json!({"trace": {"run": {"workspace_id": "tenant-y"}}});
        assert!(
            !matches_filters(&filters, &other_payload, kind, severity, None),
            "non-matching workspace should fail filter"
        );
    }

    // Valid JSON parses; malformed JSON maps to an InvalidArgument status.
    #[test]
    fn parse_dag_json_round_trip() {
        let dag = r#"{"steps": []}"#;
        let parsed = parse_dag_json(dag).expect("valid JSON");
        assert_eq!(parsed["steps"], Value::Array(vec![]));

        let err = parse_dag_json("not valid json").expect_err("should reject invalid JSON");
        assert_eq!(err.code(), Code::InvalidArgument);
    }

    // A policy priority larger than i16::MAX saturates rather than wraps.
    #[test]
    fn extract_priority_clamps_to_i16() {
        let mut policy = Map::new();
        policy.insert("priority".to_string(), Value::from(40_000));
        let mut step = Map::new();
        step.insert("policy".to_string(), Value::Object(policy));
        let priority = extract_priority(&step);
        assert_eq!(priority, i16::MAX);
    }

    // A step with no policy block gets the default priority of 0.
    #[test]
    fn extract_priority_defaults_to_zero() {
        let step = Map::new();
        let priority = extract_priority(&step);
        assert_eq!(priority, 0);
    }

    // A priority below i16::MIN saturates at the negative bound.
    #[test]
    fn extract_priority_handles_negative_overflow() {
        let mut policy = Map::new();
        policy.insert("priority".to_string(), Value::from(-50_000));
        let mut step = Map::new();
        step.insert("policy".to_string(), Value::Object(policy));
        let priority = extract_priority(&step);
        assert_eq!(priority, i16::MIN);
    }

    // labels_map_to_value / labels_value_to_map are inverses.
    #[test]
    fn labels_map_round_trip() {
        let mut map = HashMap::new();
        map.insert("team".to_string(), "infra".to_string());
        map.insert("env".to_string(), "dev".to_string());
        let value = labels_map_to_value(&map);
        let round_trip = labels_value_to_map(&value);
        assert_eq!(map, round_trip);
    }

    // build_run_detail re-serializes the DAG, surfaces labels from input_ctx,
    // and converts step output JSON into prost values.
    #[test]
    fn build_run_detail_converts_payloads() {
        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();
        let now = Utc::now();

        let run = models::Run {
            run_id,
            created_at: now,
            status: models::RunStatus::Pending,
            dag_json: json!({"steps": []}),
            input_ctx: json!({
                "inputs": {"foo": "bar"},
                "labels": {"team": "infra"}
            }),
            seed: 7,
            idempotency_key: Some("abc".into()),
        };

        let step = models::Step {
            run_id,
            step_id,
            idx: 0,
            priority: 0,
            spec_json: json!({"id": step_id, "type": "tool", "inputs": {}, "policy": {}}),
            status: models::StepStatus::Succeeded,
            attempt: 1,
            created_at: now,
            pending_dependencies: 0,
            total_dependencies: 0,
            estimated_tokens: 0,
            estimated_cost: 0.0,
            actual_tokens: Some(10),
            actual_cost: Some(0.01),
            leased_by: None,
            lease_expires_at: None,
            output_json: Some(json!({"answer": 42})),
            error_json: Some(json!({"message": "none"})),
            input_snapshot: None,
            output_snapshot: None,
            provider: Some("test".into()),
            provider_version: Some("1.0".into()),
        };

        let detail = build_run_detail(run, vec![step]).expect("build run detail");
        let spec = detail.spec.expect("spec present");
        assert_eq!(spec.dag_json, r#"{"steps":[]}"#);
        assert_eq!(spec.seed, 7);
        assert_eq!(spec.labels.get("team"), Some(&"infra".to_string()));

        let step_summary = &detail.steps[0];
        assert_eq!(step_summary.step_id, step_id.to_string());
        assert_eq!(step_summary.status, "succeeded");
        let output_value = prost_value_to_json(step_summary.output.as_ref().unwrap());
        assert_eq!(output_value, json!({"answer": 42}));
    }

    // A SubscribeRunOpen message with every filter populated converts into a
    // SubscriptionConfig with parsed UUIDs, enum severities, and durations.
    #[test]
    fn subscription_config_parses_filters() {
        let run_id = Uuid::new_v4().to_string();
        let step_id = Uuid::new_v4().to_string();
        let open = SubscribeRunOpen {
            run_ids: vec![run_id.clone()],
            step_ids: vec![step_id.clone()],
            kinds: vec!["step_succeeded".to_string()],
            severities: vec![EventSeverity::Warn as i32],
            tags: vec!["team:core".to_string()],
            since_offset: 10,
            batch_size: 32,
            max_inflight: 20,
            heartbeat_interval_ms: 5_000,
        };

        let config = SubscriptionConfig::try_from_open(open).expect("valid config");
        assert_eq!(config.run_id.to_string(), run_id);
        assert!(config
            .step_ids
            .as_ref()
            .expect("step filter")
            .contains(&Uuid::parse_str(&step_id).unwrap()));
        assert!(config
            .kinds
            .as_ref()
            .expect("kind filter")
            .contains("step_succeeded"));
        assert!(config
            .severities
            .as_ref()
            .expect("severity filter")
            .contains(&EventSeverity::Warn));
        assert!(config
            .tags
            .as_ref()
            .expect("tag filter")
            .contains("team:core"));
        assert_eq!(config.since_offset, 10);
        assert_eq!(config.batch_size, 32);
        assert_eq!(config.max_inflight, 20);
        assert_eq!(config.heartbeat.as_millis(), 5_000);
    }

    // Acks evict all inflight offsets <= the acked offset; stale (lower)
    // acks are ignored and never move last_ack backwards.
    #[test]
    fn handle_ack_evicts_inflight_offsets() {
        let mut inflight = VecDeque::from(vec![5, 9, 12]);
        let mut last_ack = 0;
        handle_ack(&mut inflight, &mut last_ack, 9);
        assert_eq!(last_ack, 9);
        assert_eq!(inflight, VecDeque::from(vec![12]));

        handle_ack(&mut inflight, &mut last_ack, 8);
        assert_eq!(last_ack, 9, "ack lower than last should be ignored");
        assert_eq!(inflight, VecDeque::from(vec![12]));

        handle_ack(&mut inflight, &mut last_ack, 15);
        assert_eq!(last_ack, 15);
        assert!(inflight.is_empty());
    }

    // An event matching every configured filter passes; flipping only the
    // tag filter to a non-matching value makes it fail.
    #[test]
    fn passes_filters_applies_tags_and_severities() {
        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();
        let base_config = SubscriptionConfig {
            run_id,
            step_ids: Some(vec![step_id].into_iter().collect()),
            kinds: Some(vec!["step_succeeded".to_string()].into_iter().collect()),
            severities: Some(vec![EventSeverity::Info].into_iter().collect()),
            tags: Some(vec!["severity:info".to_string()].into_iter().collect()),
            cursor: None,
            since_offset: 0,
            batch_size: 16,
            max_inflight: 8,
            heartbeat: Duration::from_millis(1_000),
        };

        // NOTE(review): the event kind is uppercase while the kind filter is
        // lowercase — passing here implies kind matching normalizes case;
        // confirm against passes_filters.
        let event = models::OutboxEvent {
            id: 1,
            run_id,
            step_id: Some(step_id),
            kind: "STEP_SUCCEEDED".to_string(),
            payload: json!({
                "status": "succeeded"
            }),
            created_at: Utc::now(),
            published_at: None,
        };

        let severity = derive_severity(&event.kind, &event.payload);
        assert_eq!(severity, EventSeverity::Info);
        let tags = derive_tags(&event.kind, event.step_id, &event.payload, severity);
        assert!(passes_filters(&base_config, &event, severity, &tags));

        let bad_config = SubscriptionConfig {
            tags: Some(vec!["severity:error".to_string()].into_iter().collect()),
            ..base_config
        };
        assert!(!passes_filters(&bad_config, &event, severity, &tags));
    }
}
5436
5437#[cfg(test)]
5438mod tests {
5439    use super::*;
5440    use chrono::Utc;
5441    use fleetforge_storage::{ObjectStoreConfig, StorageConfig};
5442    use fleetforge_telemetry::audit::Audit;
5443    use serde_json::json;
5444    use std::collections::HashMap;
5445    use testcontainers::clients::Cli;
5446    use testcontainers::images::postgres::Postgres;
5447    use tokio::time::{sleep, Duration};
5448    use uuid::Uuid;
5449
5450    fn build_request(dag: serde_json::Value) -> runtime::SubmitRunRequest {
5451        build_request_with_key(dag, "")
5452    }
5453
5454    fn build_request_with_key(
5455        dag: serde_json::Value,
5456        idempotency_key: impl Into<String>,
5457    ) -> runtime::SubmitRunRequest {
5458        runtime::SubmitRunRequest {
5459            spec: Some(runtime::RunSpec {
5460                dag_json: serde_json::to_string(&dag).unwrap(),
5461                inputs: None,
5462                seed: 123,
5463                labels: HashMap::new(),
5464            }),
5465            idempotency_key: idempotency_key.into(),
5466        }
5467    }
5468
    // A DAG step without the required `id` field must be rejected with
    // InvalidArgument by step schema validation.
    #[tokio::test]
    async fn submit_run_rejects_invalid_step_schema() {
        // Integration test: needs a Docker daemon for the Postgres
        // testcontainer; skip gracefully when none is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Boot a throwaway Postgres container and point storage at it.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Give Postgres a moment to start accepting connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        // Service wiring with auth disabled and transparency off.
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        let service = RuntimeApiService::new(
            storage,
            auth,
            audit,
            attestation_vault,
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        // Invalid: the step is missing its `id`.
        let dag = serde_json::json!({
            "steps": [
                {
                    "type": "tool",
                    "inputs": {}
                }
            ]
        });

        let request = build_request(dag);
        let response = service.submit_run(Request::new(request)).await;
        assert!(response.is_err(), "expected schema validation failure");
        let status = response.err().unwrap();
        assert_eq!(status.code(), tonic::Code::InvalidArgument);
        assert!(status.message().contains("step schema validation failed"));
        // Explicitly keep the container alive until the end of the test.
        drop(container);
    }
5526
    // A run spec with seed = 0 must be rejected: seeds are required to be
    // positive integers.
    #[tokio::test]
    async fn submit_run_rejects_missing_seed() {
        // Integration test: needs a Docker daemon for the Postgres
        // testcontainer; skip gracefully when none is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Boot a throwaway Postgres container and point storage at it.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Give Postgres a moment to start accepting connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        // Service wiring with auth disabled and transparency off.
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        let service = RuntimeApiService::new(
            storage,
            auth,
            audit,
            attestation_vault,
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        // The DAG itself is valid; only the seed is wrong.
        let step_id = Uuid::new_v4();
        let dag = json!({
            "steps": [
                {
                    "id": step_id,
                    "type": "tool",
                    "inputs": {
                        "command": ["echo", "ok"]
                    }
                }
            ]
        });

        // Override the helper's default seed (123) with the invalid 0.
        let mut request = build_request(dag);
        if let Some(spec) = request.spec.as_mut() {
            spec.seed = 0;
        }

        let response = service.submit_run(Request::new(request)).await;
        assert!(response.is_err(), "expected seed validation failure");
        let status = response.err().unwrap();
        assert_eq!(status.code(), tonic::Code::InvalidArgument);
        assert!(status.message().contains("seed must be a positive integer"));

        // Explicitly keep the container alive until the end of the test.
        drop(container);
    }
5593
    // Table-driven check of DAG validation failures: each malformed DAG must
    // be rejected with InvalidArgument and an error message containing the
    // expected substring.
    #[tokio::test]
    async fn submit_run_invalid_dag_variants() {
        // Integration test: needs a Docker daemon for the Postgres
        // testcontainer; skip gracefully when none is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Boot a throwaway Postgres container and point storage at it.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Give Postgres a moment to start accepting connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        // Service wiring with auth disabled and transparency off.
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        let service = RuntimeApiService::new(
            Arc::clone(&storage),
            Arc::clone(&auth),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        let step_a = Uuid::new_v4();
        let step_b = Uuid::new_v4();
        let valid_step = json!({
            "id": step_a,
            "type": "tool",
            "inputs": {}
        });

        // (case label, DAG payload, expected error-message substring)
        let cases: Vec<(&str, serde_json::Value, &str)> = vec![
            (
                "missing_steps",
                json!({ "edges": [] }),
                "dag_json.steps must be an array",
            ),
            (
                "unknown_step_type",
                json!({ "steps": [ { "id": step_a, "type": "unknown", "inputs": {} } ] }),
                "step schema validation failed",
            ),
            (
                "duplicate_step_id",
                json!({ "steps": [valid_step.clone(), valid_step.clone()] }),
                "duplicate step.id",
            ),
            (
                "self_edge",
                json!({ "steps": [valid_step.clone()], "edges": [[step_a, step_a]] }),
                "edge cannot reference the same step twice",
            ),
            (
                "edge_unknown_target",
                json!({ "steps": [valid_step.clone()], "edges": [[step_a, step_b]] }),
                "edge references unknown step id",
            ),
            (
                "duplicate_edge",
                json!({ "steps": [valid_step.clone(), {"id": step_b, "type": "tool", "inputs": {}}], "edges": [[step_a, step_b], [step_a, step_b]] }),
                "duplicate edge detected",
            ),
        ];

        for (label, dag, expected) in cases {
            let request = build_request(dag);
            let response = service.submit_run(Request::new(request)).await;
            assert!(response.is_err(), "case {label} unexpectedly succeeded");
            let status = response.err().unwrap();
            assert_eq!(status.code(), tonic::Code::InvalidArgument, "case {label}");
            assert!(
                status.message().contains(expected),
                "case {label} expected '{expected}' but got '{}'",
                status.message()
            );
        }

        // Explicitly keep the container alive until the end of the test.
        drop(container);
    }
5690
    // Happy path: a valid three-step linear DAG (a -> b -> c) is accepted,
    // the run is persisted as Pending, and each step's status, priority, and
    // dependency counters reflect the DAG topology.
    #[tokio::test]
    async fn submit_run_accepts_valid_dag_and_persists_steps() {
        // Integration test: needs a Docker daemon for the Postgres
        // testcontainer; skip gracefully when none is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Boot a throwaway Postgres container and point storage at it.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Give Postgres a moment to start accepting connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        // Service wiring with auth disabled and transparency off; storage is
        // kept so the test can read back what the service persisted.
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        let service = RuntimeApiService::new(
            Arc::clone(&storage),
            Arc::clone(&auth),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        let step_a = Uuid::new_v4();
        let step_b = Uuid::new_v4();
        let step_c = Uuid::new_v4();

        // Linear chain a -> b -> c with distinct priorities per step.
        let dag = serde_json::json!({
            "steps": [
                {
                    "id": step_a,
                    "type": "tool",
                    "inputs": {},
                    "policy": {"priority": 5}
                },
                {
                    "id": step_b,
                    "type": "llm",
                    "inputs": {},
                    "policy": {"priority": 3}
                },
                {
                    "id": step_c,
                    "type": "reduce",
                    "inputs": {},
                    "policy": {"priority": 1}
                }
            ],
            "edges": [
                [step_a, step_b],
                [step_b, step_c]
            ]
        });

        let request = build_request(dag);
        let response = service
            .submit_run(Request::new(request))
            .await
            .expect("valid run should succeed")
            .into_inner();

        // The run must exist in storage in the Pending state.
        let run_uuid = Uuid::parse_str(&response.run_id).expect("run_id must be UUID");
        let persisted_run = storage
            .fetch_run(run_uuid)
            .await
            .expect("fetch run")
            .expect("run should exist");
        assert_eq!(persisted_run.status, models::RunStatus::Pending);

        let steps = storage
            .fetch_steps_for_run(run_uuid)
            .await
            .expect("fetch steps");
        assert_eq!(steps.len(), 3);

        // Sort by submission index so assertions map to a, b, c in order.
        let mut by_idx = steps;
        by_idx.sort_by_key(|s| s.idx);

        // Root step has no dependencies and is immediately queued.
        assert_eq!(by_idx[0].step_id, step_a);
        assert_eq!(by_idx[0].status, models::StepStatus::Queued);
        assert_eq!(by_idx[0].priority, 5);
        assert_eq!(by_idx[0].pending_dependencies, 0);

        // Downstream steps are blocked on exactly one upstream each.
        assert_eq!(by_idx[1].step_id, step_b);
        assert_eq!(by_idx[1].status, models::StepStatus::Blocked);
        assert_eq!(by_idx[1].priority, 3);
        assert_eq!(by_idx[1].pending_dependencies, 1);

        assert_eq!(by_idx[2].step_id, step_c);
        assert_eq!(by_idx[2].status, models::StepStatus::Blocked);
        assert_eq!(by_idx[2].priority, 1);
        assert_eq!(by_idx[2].pending_dependencies, 1);
        assert!(
            by_idx
                .iter()
                .all(|s| s.total_dependencies >= s.pending_dependencies),
            "dependency counters should be consistent"
        );
        // Explicitly keep the container alive until the end of the test.
        drop(container);
    }
5808
    // Two submissions with the same (whitespace-padded) idempotency key must
    // return the same run_id, and the run must be retrievable by the
    // trimmed/normalized form of the key.
    #[tokio::test]
    async fn submit_run_respects_idempotency_key() {
        // Integration test: needs a Docker daemon for the Postgres
        // testcontainer; skip gracefully when none is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Boot a throwaway Postgres container and point storage at it.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Give Postgres a moment to start accepting connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        // Service wiring with auth disabled and transparency off.
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        let service = RuntimeApiService::new(
            Arc::clone(&storage),
            Arc::clone(&auth),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        let step_id = Uuid::new_v4();
        let dag = serde_json::json!({
            "steps": [{
                "id": step_id,
                "type": "tool",
                "inputs": {}
            }]
        });
        // The raw key carries surrounding whitespace; the service is expected
        // to store it normalized (trimmed).
        let raw_key = "  idempotent-key  ";
        let normalized_key = "idempotent-key";

        let first = service
            .submit_run(Request::new(build_request_with_key(dag.clone(), raw_key)))
            .await
            .expect("first submission")
            .into_inner();

        let second = service
            .submit_run(Request::new(build_request_with_key(dag, raw_key)))
            .await
            .expect("second submission")
            .into_inner();

        assert_eq!(
            first.run_id, second.run_id,
            "idempotent submissions should reuse run_id"
        );

        // The run must be findable by the normalized key form.
        let stored = storage
            .fetch_run_by_idempotency_key(normalized_key)
            .await
            .expect("lookup idempotent run")
            .expect("run should exist for idempotency key");
        assert_eq!(stored.run_id.to_string(), first.run_id);

        // Explicitly keep the container alive until the end of the test.
        drop(container);
    }
5886
5887    #[tokio::test]
5888    async fn get_attestations_returns_records() {
5889        if std::fs::metadata("/var/run/docker.sock").is_err()
5890            && std::env::var("DOCKER_HOST").is_err()
5891        {
5892            eprintln!("Skipping API tests because Docker is unavailable");
5893            return;
5894        }
5895
5896        let docker = Cli::default();
5897        let container = docker.run(Postgres::default());
5898        let port = container.get_host_port_ipv4(5432);
5899        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
5900        sleep(Duration::from_millis(250)).await;
5901
5902        let storage = Arc::new(
5903            Storage::connect(StorageConfig {
5904                database_url: db_url,
5905                max_connections: 2,
5906                connect_timeout: Duration::from_secs(10),
5907                object_store: ObjectStoreConfig::InMemory,
5908            })
5909            .await
5910            .expect("storage connect"),
5911        );
5912        let auth = Arc::new(AuthHandle::disabled());
5913        let audit = Arc::new(Audit::new(storage.pool().clone()));
5914        let attestation_vault: Arc<dyn AttestationVault> =
5915            Arc::new(InMemoryAttestationVault::new());
5916        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
5917        let service = RuntimeApiService::new(
5918            Arc::clone(&storage),
5919            Arc::clone(&auth),
5920            Arc::clone(&audit),
5921            Arc::clone(&attestation_vault),
5922            transparency,
5923            false,
5924            TransparencyScope::Disabled,
5925        );
5926
5927        let run_id = Uuid::new_v4();
5928        let attestation =
5929            Attestation::new(Uuid::new_v4(), Utc::now()).with_subject(TrustSubject::Run { run_id });
5930        attestation_vault
5931            .record(attestation.clone())
5932            .await
5933            .expect("record attestation");
5934
5935        let response = service
5936            .get_attestations(Request::new(runtime::GetAttestationsRequest {
5937                attestation_ids: vec![
5938                    attestation.id.to_string(),
5939                    attestation.id.to_string(),
5940                    Uuid::new_v4().to_string(),
5941                ],
5942            }))
5943            .await
5944            .expect("get_attestations RPC")
5945            .into_inner();
5946
5947        assert_eq!(response.attestations.len(), 1);
5948        let returned = &response.attestations[0];
5949        assert_eq!(returned.attestation_id, attestation.id.to_string());
5950        match returned
5951            .subject
5952            .as_ref()
5953            .and_then(|subject| subject.kind.as_ref())
5954        {
5955            Some(runtime::attestation_subject::Kind::Run(run_subject)) => {
5956                assert_eq!(run_subject.run_id, run_id.to_string());
5957            }
5958            other => panic!("unexpected subject {other:?}"),
5959        }
5960
5961        drop(container);
5962    }
5963
5964    #[test]
5965    fn compute_drift_reports_within_tolerance() {
5966        let recorded = json!({
5967            "tokens": 10,
5968            "nested": { "value": "ok" }
5969        });
5970        let current = recorded.clone();
5971
5972        let result = compute_drift(
5973            &recorded,
5974            &current,
5975            DEFAULT_TOKEN_TOLERANCE,
5976            DEFAULT_COST_TOLERANCE,
5977        );
5978        assert!(result.within_tolerance);
5979        assert_eq!(result.token_delta, Some(0.0));
5980        assert_eq!(result.cost_delta, None);
5981    }
5982
5983    #[test]
5984    fn compute_drift_flags_token_delta() {
5985        let recorded = json!({
5986            "tokens": 10,
5987            "cost": 1.0,
5988            "payload": { "message": "hello" }
5989        });
5990        let current = json!({
5991            "tokens": 15,
5992            "cost": 1.0,
5993            "payload": { "message": "hello" }
5994        });
5995
5996        let result = compute_drift(
5997            &recorded,
5998            &current,
5999            DEFAULT_TOKEN_TOLERANCE,
6000            DEFAULT_COST_TOLERANCE,
6001        );
6002        assert!(!result.within_tolerance);
6003        assert_eq!(result.token_delta, Some(5.0));
6004        assert_eq!(result.cost_delta, Some(0.0));
6005    }
6006}