1mod auth;
4mod http;
5mod transparency;
6mod transparency_writer;
7
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::convert::{Infallible, TryFrom};
10use std::env;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use ::http::{header, HeaderValue, Method};
15use anyhow::{anyhow, Context, Result};
16use auth::{AuthError, AuthHandle, Role};
17use bytes::Bytes;
18use chrono::{DateTime, TimeZone, Utc};
19use clickhouse::{Client as ClickhouseClient, Row};
20use fleetforge_changeops::{
21 BudgetSignal as GateBudgetSignal, ChangeDecisionEffect as GateEffect, ChangeDiff as GateDiff,
22 ChangeFollowupRecord, ChangeFollowupRequest as GateFollowupRequest,
23 ChangeGateDecision as GateDecision, ChangeGateEngine, ChangeGateRequest as GateRequest,
24 CoverageComponent as GateCoverageComponent, CoverageReport as GateCoverageReport,
25 DecisionScorecard as GateScorecard, EvalMetric as GateEvalMetric,
26 FollowupAction as GateFollowupAction, FollowupOutcome as GateFollowupOutcome,
27 MetricDirection as GateMetricDirection,
28};
29use fleetforge_common::{
30 labels::{labels_map_to_value, labels_value_to_map},
31 licensing::{feature_allowed, LicensedFeature},
32 prost_json::{json_to_prost_struct, json_to_prost_value, prost_struct_to_json},
33 validation::{validate_run_spec, validate_step_spec, SchemaValidationError},
34};
35use fleetforge_contracts::{
36 runtime,
37 tap::{
38 self, subscribe_run_request, EventSeverity, PolicyDecisionEffect, QueryEventSource,
39 QueryEventsRequest, QueryEventsResponse, RunEventEnvelope, RunTraceContext,
40 SubscribeRunAck, SubscribeRunOpen, SubscribeRunRequest, SubscribeRunResponse,
41 },
42 RuntimeService, RuntimeServiceClient, RuntimeServiceServer, TapService, TapServiceClient,
43 TapServiceServer,
44};
45use fleetforge_storage::{models, models::NewOutboxEvent, Storage};
46use fleetforge_telemetry::audit::Audit;
47use fleetforge_trust::{
48 Attestation, AttestationVault, InMemoryAttestationVault, ObjectStoreAttestationVault,
49 TrustSubject,
50};
51use futures::Stream;
52use http_body::Body as _;
53use hyper::service::{make_service_fn, service_fn};
54use hyper::{Body, Request as HyperRequest, Server as HyperServer};
55use serde::Deserialize;
56use serde_json::{json, Map, Number, Value};
57use std::pin::Pin;
58use tokio::net::TcpListener;
59use tokio::sync::mpsc;
60use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
61use tokio_stream::wrappers::ReceiverStream;
62use tonic::transport::Server;
63use tonic::{Request, Response, Status};
64use tonic_health::pb::health_check_response::ServingStatus;
65use tonic_health::server::health_reporter;
66use tonic_reflection::server::Builder as ReflectionBuilder;
67use tower::util::BoxCloneService;
68use tower::{Service, ServiceExt};
69use tower_http::cors::CorsLayer;
70use tracing::{error, info, instrument, warn};
71use transparency::{transparency_log_from_env, TransparencyContext, TransparencyLog};
72use transparency_writer::TransparencyWriter;
73use uuid::Uuid;
74
/// Selects which classes of platform activity are mirrored to the
/// transparency (SCITT) log.
///
/// `includes_gates` treats every variant except `Disabled` as covering
/// change-gate decisions; presumably `Runs` and `Artifacts` widen coverage
/// to run- and artifact-level records elsewhere (not visible in this chunk).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransparencyScope {
    Disabled,
    Gates,
    Runs,
    Artifacts,
}
82
83impl TransparencyScope {
84 fn includes_gates(self) -> bool {
85 matches!(
86 self,
87 TransparencyScope::Gates | TransparencyScope::Runs | TransparencyScope::Artifacts
88 )
89 }
90}
/// Shared state behind the gRPC `RuntimeService` implementation.
#[derive(Clone)]
struct RuntimeApiService {
    // Persistence layer (runs, gates, outbox, follow-ups).
    storage: Arc<Storage>,
    // Token verification / role enforcement backend.
    auth: Arc<AuthHandle>,
    // Audit recorder.
    audit: Arc<Audit>,
    // Store for signed attestations.
    attestation_vault: Arc<dyn AttestationVault>,
    // Change-gate decision engine; constructed with defaults in `new`.
    changeops: ChangeGateEngine,
    // Transparency (SCITT) log used by `publish_scitt_entry`.
    transparency: Arc<dyn TransparencyLog>,
    // Whether the background transparency writer is enabled.
    transparency_writer_enabled: bool,
    // Which event classes are mirrored to the transparency log.
    transparency_scope: TransparencyScope,
}
102
103impl RuntimeApiService {
104 fn new(
105 storage: Arc<Storage>,
106 auth: Arc<AuthHandle>,
107 audit: Arc<Audit>,
108 attestation_vault: Arc<dyn AttestationVault>,
109 transparency: Arc<dyn TransparencyLog>,
110 transparency_writer_enabled: bool,
111 transparency_scope: TransparencyScope,
112 ) -> Self {
113 Self {
114 storage,
115 auth,
116 audit,
117 attestation_vault,
118 changeops: ChangeGateEngine::default(),
119 transparency,
120 transparency_writer_enabled,
121 transparency_scope,
122 }
123 }
124
125 async fn require_role(
126 &self,
127 metadata: &tonic::metadata::MetadataMap,
128 required: Role,
129 ) -> Result<Role, Status> {
130 self.auth
131 .require_grpc(metadata, required)
132 .await
133 .map_err(|err| match err {
134 AuthError::MissingToken | AuthError::InvalidToken => {
135 Status::unauthenticated(err.to_string())
136 }
137 AuthError::InsufficientRole { .. }
138 | AuthError::MissingRoleClaim(_)
139 | AuthError::UnsupportedRole { .. } => Status::permission_denied(err.to_string()),
140 AuthError::NotConfigured => Status::unauthenticated(err.to_string()),
141 AuthError::Backend(_) => Status::internal(err.to_string()),
142 })
143 }
144
145 fn actor_from_metadata(metadata: &tonic::metadata::MetadataMap, role: Role) -> String {
146 metadata
147 .get("x-actor")
148 .and_then(|value| value.to_str().ok())
149 .map(|value| value.to_string())
150 .unwrap_or_else(|| format!("role:{role:?}"))
151 }
152}
153
154fn invalid_argument(message: impl Into<String>) -> Status {
155 Status::invalid_argument(message.into())
156}
157
158fn internal_error(message: impl Into<String>) -> Status {
159 Status::internal(message.into())
160}
161
// Page size used by Tap `QueryEvents` when the request leaves it unset.
const QUERY_DEFAULT_PAGE_SIZE: u32 = 100;
// Hard upper bound on the `QueryEvents` page size.
const QUERY_MAX_PAGE_SIZE: u32 = 500;
// Over-fetch factor: ClickHouse rows are scanned in chunks of
// `limit * QUERY_FETCH_MULTIPLIER` so rows discarded by in-memory filtering
// can still fill a page.
const QUERY_FETCH_MULTIPLIER: u32 = 4;
// Env var toggling the background transparency writer (consumer not visible
// in this chunk — presumably read at startup).
const TRANSPARENCY_WRITER_FLAG: &str = "FLEETFORGE_TRANSPARENCY_WRITER";
// Env var overriding the transparency writer's interval, in seconds.
const TRANSPARENCY_WRITER_INTERVAL_ENV: &str = "FLEETFORGE_TRANSPARENCY_WRITER_INTERVAL_SECS";
// Lower clamp for the configured writer interval.
const TRANSPARENCY_WRITER_INTERVAL_MIN_SECS: u64 = 5;
// Interval used when the env var is absent.
const TRANSPARENCY_WRITER_DEFAULT_INTERVAL_SECS: u64 = 30;
169
170fn to_timestamp(time: DateTime<Utc>) -> prost_types::Timestamp {
171 prost_types::Timestamp {
172 seconds: time.timestamp(),
173 nanos: time.timestamp_subsec_nanos() as i32,
174 }
175}
176
177fn value_to_struct(value: &Value) -> Option<prost_types::Struct> {
178 if value.is_null() {
179 return None;
180 }
181 match json_to_prost_struct(value) {
182 Ok(struct_value) => Some(struct_value),
183 Err(err) => {
184 warn!(error = %err, "failed to convert JSON value to protobuf Struct");
185 None
186 }
187 }
188}
189
impl RuntimeApiService {
    /// Builds a signed SCITT entry for a change-gate decision, appends it to
    /// the transparency log, and records the resulting receipt under the
    /// `"scitt_entry"` key of `metadata` (mutated in place).
    ///
    /// All failures (signer unavailable, entry construction, log append) are
    /// surfaced as gRPC `INTERNAL` statuses.
    async fn publish_scitt_entry(
        &self,
        change_id: &str,
        gate_id: Uuid,
        artifact_sha: &str,
        attestation_refs: &[Uuid],
        metadata: &mut Value,
    ) -> Result<(), Status> {
        let signer = fleetforge_trust::scitt_signer()
            .map_err(|err| internal_error(format!("SCITT signer unavailable: {err}")))?;
        let scitt_entry = fleetforge_trust::build_scitt_entry(
            change_id,
            attestation_refs,
            artifact_sha,
            metadata,
            signer.as_ref(),
        )
        .map_err(|err| internal_error(format!("failed to build SCITT entry: {err}")))?;

        let transparency_ctx = TransparencyContext {
            change_id,
            gate_id: &gate_id,
            artifact_sha256: artifact_sha,
            attestation_ids: attestation_refs,
        };
        let record = self
            .transparency
            .append(&scitt_entry, &transparency_ctx)
            .await
            .map_err(|err| internal_error(format!("failed to publish SCITT entry: {err}")))?;

        // Summary of the appended entry (id, artifact digest, receipt) that
        // gets embedded back into the caller-supplied metadata.
        let scitt_metadata = json!({
            "id": record.entry_id,
            "sha256": hex::encode(record.artifact.sha256),
            "media_type": record.artifact.media_type.clone(),
            "size": record.artifact.size,
            "attestation_ids": attestation_refs.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
            "transparency": record.receipt,
        });

        // `take()` leaves `Null` behind momentarily; the metadata is rebuilt
        // with the SCITT summary merged in. Non-object metadata is replaced
        // by a fresh object holding only the summary.
        if let Value::Object(mut meta_obj) = metadata.take() {
            meta_obj.insert("scitt_entry".to_string(), scitt_metadata);
            *metadata = Value::Object(meta_obj);
        } else {
            *metadata = json!({ "scitt_entry": scitt_metadata });
        }
        Ok(())
    }
}
240
241fn struct_to_value(data: Option<prost_types::Struct>) -> Value {
242 data.as_ref()
243 .map(prost_struct_to_json)
244 .unwrap_or(Value::Null)
245}
246
/// Converts a trust-layer `Attestation` into its protobuf representation.
///
/// Fails with an `INTERNAL` status if the claims map cannot be serialised
/// into a protobuf `Struct`.
fn attestation_to_proto(att: Attestation) -> Result<runtime::Attestation, Status> {
    let subject = att
        .subject
        .as_ref()
        .map(attestation_subject_to_proto)
        .transpose()?;

    // Claims are key/value pairs; wrap them in a JSON object so the shared
    // JSON -> Struct conversion can be reused.
    let claims_value = Value::Object(att.claims.into_iter().collect());
    let claims_struct = json_to_prost_struct(&claims_value)
        .map_err(|err| internal_error(format!("failed to serialise attestation claims: {err}")))?;

    Ok(runtime::Attestation {
        attestation_id: att.id.to_string(),
        issued_at: Some(to_timestamp(att.issued_at)),
        subject,
        artifact_hash: att.artifact_hash.map(|value| value.into()),
        claims: Some(claims_struct),
        signature: att.signature.map(|value| value.into()),
    })
}
267
/// Maps a `TrustSubject` onto the protobuf `AttestationSubject` oneof.
///
/// Returns `Result` for signature symmetry with the other converters; every
/// current arm is infallible.
fn attestation_subject_to_proto(
    subject: &TrustSubject,
) -> Result<runtime::AttestationSubject, Status> {
    use runtime::attestation_subject::Kind;
    let kind = match subject {
        TrustSubject::Run { run_id } => Kind::Run(runtime::AttestationRunSubject {
            run_id: run_id.to_string(),
        }),
        TrustSubject::Step { run_id, step_id } => Kind::Step(runtime::AttestationStepSubject {
            run_id: run_id.to_string(),
            step_id: step_id.to_string(),
        }),
        TrustSubject::Tool { name } => {
            Kind::Tool(runtime::AttestationToolSubject { name: name.clone() })
        }
        TrustSubject::Artifact { uri } => {
            Kind::Artifact(runtime::AttestationArtifactSubject { uri: uri.clone() })
        }
        TrustSubject::CapabilityToken { token_id } => {
            Kind::CapabilityToken(runtime::AttestationCapabilitySubject {
                token_id: token_id.to_string(),
            })
        }
        TrustSubject::Custom { label } => Kind::Custom(runtime::AttestationCustomSubject {
            label: label.clone(),
        }),
    };
    Ok(runtime::AttestationSubject { kind: Some(kind) })
}
297
298fn gate_effect_to_proto(effect: GateEffect) -> runtime::ChangeDecisionEffect {
299 match effect {
300 GateEffect::Allow => runtime::ChangeDecisionEffect::Allow,
301 GateEffect::FollowUp => runtime::ChangeDecisionEffect::FollowUp,
302 GateEffect::Deny => runtime::ChangeDecisionEffect::Deny,
303 }
304}
305
306fn gate_effect_from_str(value: &str) -> GateEffect {
307 match value {
308 "allow" => GateEffect::Allow,
309 "deny" => GateEffect::Deny,
310 "follow_up" => GateEffect::FollowUp,
311 other => {
312 warn!(effect = %other, "unknown change gate effect stored; defaulting to follow_up");
313 GateEffect::FollowUp
314 }
315 }
316}
317
/// Maps a change-gate follow-up action into its protobuf form.
fn gate_followup_action_to_proto(action: &GateFollowupAction) -> runtime::FollowupAction {
    runtime::FollowupAction {
        slug: action.slug.clone(),
        description: action.description.clone(),
        recommended: action.recommended,
    }
}
325
/// Projects the change-ops scorecard into the protobuf `DecisionScorecard`.
///
/// Always returns `Some`; the `Option` only exists to match the generated
/// protobuf message field type.
fn gate_scorecard_to_proto(scorecard: &GateScorecard) -> Option<runtime::DecisionScorecard> {
    Some(runtime::DecisionScorecard {
        novelty: Some(runtime::NoveltySummary {
            max_novelty: scorecard.novelty.max_novelty,
            avg_novelty: scorecard.novelty.avg_novelty,
            high_risk_paths: scorecard.novelty.high_risk_paths.clone(),
        }),
        coverage: Some(runtime::CoverageSummary {
            overall_ratio: scorecard.coverage.overall_ratio,
            components_below_threshold: scorecard.coverage.components_below_threshold.clone(),
        }),
        evals: Some(runtime::EvalSummary {
            failing_metrics: scorecard.evals.failing_metrics.clone(),
            attention_metrics: scorecard.evals.attention_metrics.clone(),
            score_per_token: scorecard.evals.score_per_token,
        }),
        budgets: Some(runtime::BudgetSummary {
            breaches: scorecard.budgets.breaches.clone(),
            near_limits: scorecard.budgets.near_limits.clone(),
        }),
        // Free-form telemetry JSON; dropped silently if it cannot convert.
        telemetry: value_to_struct(&scorecard.telemetry),
    })
}
349
/// Converts the protobuf follow-up outcome into the change-ops enum.
///
/// NOTE(review): `Unspecified` is silently coerced to `Approved`; confirm
/// callers validate the field before relying on this default.
fn proto_followup_outcome(outcome: runtime::FollowupOutcome) -> GateFollowupOutcome {
    match outcome {
        runtime::FollowupOutcome::Approved => GateFollowupOutcome::Approved,
        runtime::FollowupOutcome::NeedsChanges => GateFollowupOutcome::NeedsChanges,
        runtime::FollowupOutcome::Deferred => GateFollowupOutcome::Deferred,
        runtime::FollowupOutcome::Unspecified => GateFollowupOutcome::Approved,
    }
}
358
/// Converts a change-ops follow-up outcome into the protobuf enum
/// (inverse of `proto_followup_outcome`, minus `Unspecified`).
fn followup_outcome_to_proto(outcome: GateFollowupOutcome) -> runtime::FollowupOutcome {
    match outcome {
        GateFollowupOutcome::Approved => runtime::FollowupOutcome::Approved,
        GateFollowupOutcome::NeedsChanges => runtime::FollowupOutcome::NeedsChanges,
        GateFollowupOutcome::Deferred => runtime::FollowupOutcome::Deferred,
    }
}
366
367fn parse_reasons(value: &Value) -> Vec<String> {
368 serde_json::from_value::<Vec<String>>(value.clone()).unwrap_or_default()
369}
370
371fn parse_followup_actions(value: &Value) -> Vec<GateFollowupAction> {
372 serde_json::from_value::<Vec<GateFollowupAction>>(value.clone()).unwrap_or_default()
373}
374
375fn parse_scorecard(value: &Value) -> GateScorecard {
376 serde_json::from_value::<GateScorecard>(value.clone()).unwrap_or_default()
377}
378
/// Converts a stored follow-up row into its protobuf form.
///
/// An unrecognised `outcome` string falls back to `Approved` — the same
/// default `proto_followup_outcome` uses for `Unspecified`.
fn gate_followup_record_to_proto(
    record: &models::ChangeGateFollowup,
) -> runtime::ChangeGateFollowup {
    let outcome =
        GateFollowupOutcome::from_str(&record.outcome).unwrap_or(GateFollowupOutcome::Approved);
    let details_struct = value_to_struct(&record.details);

    runtime::ChangeGateFollowup {
        followup_id: record.followup_id.to_string(),
        gate_id: record.gate_id.to_string(),
        actor: record.actor.clone(),
        outcome: followup_outcome_to_proto(outcome) as i32,
        note: record.note.clone(),
        details: details_struct,
        recorded_at: Some(to_timestamp(record.recorded_at)),
    }
}
396
/// Assembles the full protobuf `ChangeGateDecision` from its stored pieces
/// (decision, telemetry JSON, follow-up acknowledgements).
///
/// `revision` defaults to the empty string when absent. The `Result` return
/// is kept for interface stability even though no failure path exists today.
fn build_proto_change_gate(
    gate_id: Uuid,
    change_id: &str,
    revision: Option<&str>,
    decision: &GateDecision,
    telemetry: &Value,
    acknowledgements: &[models::ChangeGateFollowup],
) -> Result<runtime::ChangeGateDecision, Status> {
    let effect = gate_effect_to_proto(decision.effect) as i32;
    let scorecard = gate_scorecard_to_proto(&decision.scorecard);
    let metadata_struct = value_to_struct(&decision.metadata);
    let telemetry_struct = value_to_struct(telemetry);
    let ack_proto = acknowledgements
        .iter()
        .map(gate_followup_record_to_proto)
        .collect();

    Ok(runtime::ChangeGateDecision {
        gate_id: gate_id.to_string(),
        change_id: change_id.to_string(),
        revision: revision.unwrap_or_default().to_string(),
        effect,
        reasons: decision.reasons.clone(),
        followups: decision
            .followups
            .iter()
            .map(gate_followup_action_to_proto)
            .collect(),
        scorecard,
        decided_at: Some(to_timestamp(decision.decided_at)),
        metadata: metadata_struct,
        telemetry: telemetry_struct,
        acknowledgements: ack_proto,
    })
}
432
433fn attestation_ids_from_value(value: &Value) -> Vec<Uuid> {
434 fn recurse(value: &Value, acc: &mut HashSet<Uuid>) {
435 match value {
436 Value::Object(map) => {
437 if let Some(ids) = map.get("attestation_ids").and_then(Value::as_array) {
438 for id in ids {
439 if let Some(s) = id.as_str() {
440 if let Ok(uuid) = Uuid::parse_str(s) {
441 acc.insert(uuid);
442 }
443 }
444 }
445 }
446 for v in map.values() {
447 recurse(v, acc);
448 }
449 }
450 Value::Array(items) => {
451 for item in items {
452 recurse(item, acc);
453 }
454 }
455 _ => {}
456 }
457 }
458 let mut set = HashSet::new();
459 recurse(value, &mut set);
460 set.into_iter().collect()
461}
462
/// Shared state behind the gRPC `TapService` implementation.
#[derive(Clone)]
struct TapApiService {
    // Outbox/cursor persistence used by streaming subscriptions.
    storage: Arc<Storage>,
    // Token verification / role enforcement backend.
    auth: Arc<AuthHandle>,
    // Present only when ClickHouse is configured from the environment;
    // `QueryEvents` returns FAILED_PRECONDITION when absent.
    clickhouse: Option<ClickhouseEventQuery>,
}
469
470impl TapApiService {
471 fn new(storage: Arc<Storage>, auth: Arc<AuthHandle>) -> Self {
472 let clickhouse = ClickhouseEventQuery::from_env().ok();
473 Self {
474 storage,
475 auth,
476 clickhouse,
477 }
478 }
479
480 async fn require_role(
481 &self,
482 metadata: &tonic::metadata::MetadataMap,
483 required: Role,
484 ) -> Result<Role, Status> {
485 self.auth
486 .require_grpc(metadata, required)
487 .await
488 .map_err(|err| match err {
489 AuthError::MissingToken | AuthError::InvalidToken => {
490 Status::unauthenticated(err.to_string())
491 }
492 AuthError::InsufficientRole { .. }
493 | AuthError::MissingRoleClaim(_)
494 | AuthError::UnsupportedRole { .. } => Status::permission_denied(err.to_string()),
495 AuthError::NotConfigured => Status::unauthenticated(err.to_string()),
496 AuthError::Backend(_) => Status::internal(err.to_string()),
497 })
498 }
499}
500
#[tonic::async_trait]
impl TapService for TapApiService {
    type SubscribeRunStream =
        Pin<Box<dyn Stream<Item = Result<SubscribeRunResponse, Status>> + Send>>;

    /// Bidirectional stream: the client must send an `Open` frame first,
    /// then may send `Ack` frames; the server streams events/heartbeats.
    async fn subscribe_run(
        &self,
        request: Request<tonic::Streaming<SubscribeRunRequest>>,
    ) -> Result<Response<Self::SubscribeRunStream>, Status> {
        let metadata = request.metadata().clone();
        self.require_role(&metadata, Role::Reader).await?;

        // The handshake requires the very first inbound message to be `open`.
        let mut inbound = request.into_inner();
        let first = inbound
            .message()
            .await
            .map_err(|status| internal_error(status.to_string()))?
            .ok_or_else(|| invalid_argument("missing subscribe request"))?;

        let open = match first.request {
            Some(subscribe_run_request::Request::Open(open)) => open,
            Some(subscribe_run_request::Request::Ack(_)) => {
                return Err(invalid_argument(
                    "first SubscribeRun message must be `open`",
                ))
            }
            None => return Err(invalid_argument("first SubscribeRun message must be set")),
        };

        let mut config = SubscriptionConfig::try_from_open(open)?;
        // When a durable cursor is named and the client did not ask for an
        // explicit offset, resume from the stored position (if any).
        if let Some(cursor) = config.cursor.clone() {
            if config.since_offset == 0 {
                match self.storage.fetch_tap_offset(config.run_id, &cursor).await {
                    Ok(Some(stored)) => {
                        config.since_offset = stored;
                    }
                    Ok(None) => {}
                    Err(err) => {
                        error!(
                            run = %config.run_id,
                            cursor = %cursor,
                            error = %err,
                            "failed to load stored tap cursor"
                        );
                        return Err(internal_error("failed to load stored tap cursor"));
                    }
                }
            }
        }
        // Channel capacity of 2x max_inflight leaves headroom for heartbeats.
        let (tx, rx) =
            mpsc::channel::<Result<SubscribeRunResponse, Status>>(config.max_inflight * 2);

        // The pump runs detached; a fatal error is forwarded to the client
        // as the final stream item (send failure means the client is gone).
        let storage = Arc::clone(&self.storage);
        tokio::spawn(async move {
            if let Err(err) = run_subscription(storage, config, inbound, tx.clone()).await {
                let _ = tx.send(Err(err)).await;
            }
        });

        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::SubscribeRunStream
        ))
    }

    /// Paged historical query over ClickHouse-stored events. Requires
    /// ClickHouse to be configured; otherwise FAILED_PRECONDITION.
    async fn query_events(
        &self,
        request: Request<QueryEventsRequest>,
    ) -> Result<Response<QueryEventsResponse>, Status> {
        let metadata = request.metadata().clone();
        self.require_role(&metadata, Role::Reader).await?;

        let clickhouse = self.clickhouse.as_ref().ok_or_else(|| {
            Status::failed_precondition("QueryEvents requires ClickHouse configuration")
        })?;

        let inner = request.into_inner();
        let filters = QueryFiltersOwned::from_request(&inner)?;

        // page_size == 0 means "use the default"; explicit values are
        // clamped to the allowed window.
        let requested_page = if inner.page_size == 0 {
            QUERY_DEFAULT_PAGE_SIZE
        } else {
            inner.page_size
        };
        let page_size = requested_page.clamp(1, QUERY_MAX_PAGE_SIZE);
        let offset = parse_page_token(inner.page_token.as_str())?;

        let QueryBatch {
            events,
            next_offset,
        } = clickhouse
            .fetch(&filters, offset, page_size)
            .await
            .map_err(|err| {
                error!(error = %err, "failed to query ClickHouse events");
                internal_error("failed to query ClickHouse events")
            })?;

        // Empty token signals "no more pages" to the client.
        let next_page_token = next_offset
            .map(|value| value.to_string())
            .unwrap_or_default();

        Ok(Response::new(QueryEventsResponse {
            events,
            next_page_token,
        }))
    }
}
608
/// Validated, normalised parameters for one tap subscription
/// (built by `try_from_open`).
struct SubscriptionConfig {
    // Single run being tailed (only one run_id is supported per stream).
    run_id: Uuid,
    // Filter sets; `None` means unfiltered. Kinds/tags are stored
    // lower-cased — presumably matched case-insensitively by
    // `passes_filters` (not visible in this chunk).
    step_ids: Option<HashSet<Uuid>>,
    kinds: Option<HashSet<String>>,
    severities: Option<HashSet<EventSeverity>>,
    tags: Option<HashSet<String>>,
    // Durable cursor name used to restore/persist ack offsets.
    cursor: Option<String>,
    // Outbox id to resume after; always >= 0 after validation.
    since_offset: i64,
    // Events fetched per storage query; clamped to 1..=2048.
    batch_size: i64,
    // Max unacknowledged events in flight; clamped to 1..=4096.
    max_inflight: usize,
    // Heartbeat emission interval; clamped to 250ms..=60s.
    heartbeat: Duration,
}
621
622impl SubscriptionConfig {
623 fn try_from_open(open: SubscribeRunOpen) -> Result<Self, Status> {
624 let SubscribeRunOpen {
625 run_ids,
626 step_ids,
627 kinds,
628 severities,
629 tags,
630 since_offset,
631 batch_size,
632 max_inflight,
633 heartbeat_interval_ms,
634 cursor,
635 ..
636 } = open;
637
638 if run_ids.is_empty() {
639 return Err(invalid_argument(
640 "subscribe_run requires at least one run_id",
641 ));
642 }
643 if run_ids.len() > 1 {
644 return Err(invalid_argument(
645 "subscribe_run currently supports a single run_id",
646 ));
647 }
648 let run_id = Uuid::parse_str(&run_ids[0])
649 .map_err(|err| invalid_argument(format!("invalid run_id '{}' : {err}", run_ids[0])))?;
650
651 let step_ids = if step_ids.is_empty() {
652 None
653 } else {
654 let mut ids = HashSet::with_capacity(step_ids.len());
655 for raw in step_ids {
656 let parsed = Uuid::parse_str(&raw)
657 .map_err(|err| invalid_argument(format!("invalid step_id '{raw}': {err}")))?;
658 ids.insert(parsed);
659 }
660 Some(ids)
661 };
662
663 let kinds = if kinds.is_empty() {
664 None
665 } else {
666 Some(
667 kinds
668 .into_iter()
669 .map(|kind| kind.to_ascii_lowercase())
670 .collect::<HashSet<_>>(),
671 )
672 };
673
674 let severities = if severities.is_empty() {
675 None
676 } else {
677 let mut set = HashSet::with_capacity(severities.len());
678 for raw in severities {
679 let sev = EventSeverity::try_from(raw)
680 .map_err(|_| invalid_argument(format!("unknown severity enum value {raw}")))?;
681 set.insert(sev);
682 }
683 Some(set)
684 };
685
686 let tags = if tags.is_empty() {
687 None
688 } else {
689 Some(
690 tags.into_iter()
691 .map(|tag| tag.to_ascii_lowercase())
692 .collect::<HashSet<_>>(),
693 )
694 };
695
696 if since_offset < 0 {
697 return Err(invalid_argument("since_offset must be >= 0"));
698 }
699
700 let batch_raw = if batch_size == 0 { 256 } else { batch_size };
701 let batch_size = batch_raw.max(1).min(2048) as i64;
702
703 let inflight_raw = if max_inflight == 0 { 128 } else { max_inflight };
704 let max_inflight = inflight_raw.max(1).min(4096) as usize;
705
706 let heartbeat_ms = if heartbeat_interval_ms == 0 {
707 1000u64
708 } else {
709 heartbeat_interval_ms as u64
710 }
711 .clamp(250, 60_000);
712
713 let cursor = {
714 let trimmed = cursor.trim();
715 if trimmed.is_empty() {
716 None
717 } else {
718 Some(trimmed.to_string())
719 }
720 };
721
722 Ok(Self {
723 run_id,
724 step_ids,
725 kinds,
726 severities,
727 tags,
728 cursor,
729 since_offset,
730 batch_size,
731 max_inflight,
732 heartbeat: Duration::from_millis(heartbeat_ms),
733 })
734 }
735}
736
/// Drives one tap subscription: repeatedly drains outbox events for the run
/// into `tx` (up to `max_inflight` unacked events), then services either an
/// inbound ack/open frame or a heartbeat tick.
///
/// Returns `Ok(())` when the client disconnects (stream closed or channel
/// dropped); returns `Err` for storage failures or protocol violations.
async fn run_subscription(
    storage: Arc<Storage>,
    config: SubscriptionConfig,
    mut inbound: tonic::Streaming<SubscribeRunRequest>,
    tx: mpsc::Sender<Result<SubscribeRunResponse, Status>>,
) -> Result<(), Status> {
    let mut last_seen = config.since_offset;
    let mut last_ack = config.since_offset;
    let mut inflight: VecDeque<i64> = VecDeque::new();
    let mut ticker = interval(config.heartbeat);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // Fill phase: fetch and forward events until the inflight window is
        // full or the outbox is drained.
        while inflight.len() < config.max_inflight {
            let events = storage
                .fetch_outbox_for_run_since(config.run_id, last_seen, config.batch_size)
                .await
                .map_err(|err| internal_error(format!("failed to fetch outbox events: {err}")))?;

            if events.is_empty() {
                break;
            }

            // Advance the watermark even if every event below is filtered
            // out, so the next fetch does not re-scan this batch.
            last_seen = events.last().map(|event| event.id).unwrap_or(last_seen);

            for event in events {
                last_seen = event.id;
                let severity = derive_severity(&event.kind, &event.payload);
                let tags = derive_tags(&event.kind, event.step_id, &event.payload, severity);
                if !passes_filters(&config, &event, severity, &tags) {
                    continue;
                }

                let response = build_response(&config, &event, severity, &tags, last_seen)?;
                // A send failure means the receiver (client) went away.
                if tx.send(Ok(response)).await.is_err() {
                    return Ok(());
                }
                inflight.push_back(event.id);
            }
        }

        tokio::select! {
            maybe_req = inbound.message() => {
                match maybe_req {
                    Ok(Some(SubscribeRunRequest { request: Some(subscribe_run_request::Request::Ack(ack)) })) => {
                        handle_ack(&mut inflight, &mut last_ack, ack.offset);
                        // Persist under the ack's own cursor if supplied,
                        // else under the cursor named at open time (if any).
                        let ack_cursor = if !ack.cursor.trim().is_empty() {
                            Some(ack.cursor)
                        } else {
                            config.cursor.clone()
                        };
                        if let Some(cursor_id) = ack_cursor {
                            if ack.offset < 0 {
                                return Err(invalid_argument("ack offset must be >= 0"));
                            }
                            if let Err(err) = storage
                                .upsert_tap_offset(config.run_id, &cursor_id, ack.offset)
                                .await
                            {
                                error!(
                                    run = %config.run_id,
                                    cursor = %cursor_id,
                                    error = %err,
                                    "failed to persist tap cursor offset"
                                );
                                return Err(internal_error("failed to persist tap cursor offset"));
                            }
                        }
                    }
                    Ok(Some(SubscribeRunRequest { request: Some(subscribe_run_request::Request::Open(_)) })) => {
                        return Err(invalid_argument("subscribe_run received multiple open messages"));
                    }
                    // Empty oneof: ignore.
                    Ok(Some(SubscribeRunRequest { request: None })) => {}
                    // Client closed its send side: end the subscription.
                    Ok(None) => {
                        return Ok(());
                    }
                    Err(status) => {
                        return Err(internal_error(status.to_string()));
                    }
                }
            }
            _ = ticker.tick() => {
                // Heartbeats carry the latest ack/watermark so clients can
                // track progress even when no events flow.
                let heartbeat = SubscribeRunResponse {
                    offset: last_ack,
                    watermark: last_seen,
                    heartbeat: true,
                    event: None,
                };
                if tx.send(Ok(heartbeat)).await.is_err() {
                    return Ok(());
                }
            }
        }
    }
}
832
/// Owned, validated filter set for Tap `QueryEvents`
/// (built by `from_request`).
#[derive(Debug, Clone)]
struct QueryFiltersOwned {
    // Run- vs step-level source table selection.
    source: QueryEventSource,
    // Empty vectors mean "unfiltered" for each dimension below.
    run_ids: Vec<Uuid>,
    step_ids: Vec<Uuid>,
    // Lower-cased during parsing.
    workspace_ids: Vec<String>,
    // Lower-cased during parsing.
    kinds: Vec<String>,
    policy_effects: Vec<PolicyDecisionEffect>,
    severities: Vec<EventSeverity>,
    // Inclusive time bounds applied in the SQL WHERE clause.
    start_time: Option<DateTime<Utc>>,
    end_time: Option<DateTime<Utc>>,
    // Scan/sort order by created_at (and run_id as tiebreaker).
    ascending: bool,
}
846
847impl QueryFiltersOwned {
848 fn from_request(request: &QueryEventsRequest) -> Result<Self, Status> {
849 let mut run_ids = Vec::with_capacity(request.run_ids.len());
850 for raw in &request.run_ids {
851 let parsed = Uuid::parse_str(raw)
852 .map_err(|err| invalid_argument(format!("invalid run_id '{raw}': {err}")))?;
853 run_ids.push(parsed);
854 }
855
856 let mut step_ids = Vec::with_capacity(request.step_ids.len());
857 for raw in &request.step_ids {
858 let parsed = Uuid::parse_str(raw)
859 .map_err(|err| invalid_argument(format!("invalid step_id '{raw}': {err}")))?;
860 step_ids.push(parsed);
861 }
862
863 let source = QueryEventSource::try_from(request.source).unwrap_or(QueryEventSource::Run);
864
865 let start_time = request
866 .start_time
867 .as_ref()
868 .map(timestamp_to_datetime)
869 .transpose()?;
870 let end_time = request
871 .end_time
872 .as_ref()
873 .map(timestamp_to_datetime)
874 .transpose()?;
875
876 let kinds = request
877 .kinds
878 .iter()
879 .filter(|kind| !kind.is_empty())
880 .map(|kind| kind.to_ascii_lowercase())
881 .collect::<Vec<_>>();
882
883 let workspace_ids = request
884 .workspace_ids
885 .iter()
886 .filter(|id| !id.is_empty())
887 .map(|id| id.to_ascii_lowercase())
888 .collect::<Vec<_>>();
889
890 let mut policy_effects = Vec::with_capacity(request.policy_effects.len());
891 for raw in &request.policy_effects {
892 if let Ok(effect) = PolicyDecisionEffect::try_from(*raw) {
893 if effect != PolicyDecisionEffect::Unspecified {
894 policy_effects.push(effect);
895 }
896 }
897 }
898
899 let mut severities = Vec::with_capacity(request.severities.len());
900 for raw in &request.severities {
901 if let Ok(severity) = EventSeverity::try_from(*raw) {
902 if severity != EventSeverity::Unspecified {
903 severities.push(severity);
904 }
905 }
906 }
907
908 Ok(Self {
909 source,
910 run_ids,
911 step_ids,
912 workspace_ids,
913 kinds,
914 policy_effects,
915 severities,
916 start_time,
917 end_time,
918 ascending: request.ascending,
919 })
920 }
921}
922
/// One page of query results plus the scan offset to resume from
/// (`None` when the scan is exhausted).
struct QueryBatch {
    events: Vec<RunEventEnvelope>,
    next_offset: Option<u64>,
}
927
/// ClickHouse-backed reader used by Tap `QueryEvents`.
#[derive(Clone)]
struct ClickhouseEventQuery {
    // Client configured with URL + database from the environment.
    client: ClickhouseClient,
    // Table holding run-level events (default "run_events").
    run_table: String,
    // Table holding step-level events (default "step_events").
    step_table: String,
}
934
/// Row shape returned by the run-events query
/// (`SELECT run_id, created_at, status, attributes ...`).
#[derive(Debug, Deserialize, Row)]
struct RunEventRow {
    run_id: Uuid,
    created_at: DateTime<Utc>,
    // Lower-cased into a `run_<status>` event kind by the caller.
    status: String,
    // JSON payload; run through `normalize_payload` before filtering.
    attributes: Value,
}
942
/// Row shape returned by the step-events query.
#[derive(Debug, Deserialize, Row)]
struct StepEventRow {
    run_id: Uuid,
    step_id: Uuid,
    // Lower-cased into a `step_<status>` event kind by the caller.
    status: String,
    // Retry attempt column — not read by the visible query path.
    attempt: i32,
    recorded_at: DateTime<Utc>,
    // JSON payload; run through `normalize_payload` before filtering.
    attributes: Value,
}
952
953impl ClickhouseEventQuery {
954 fn from_env() -> Result<Self> {
955 let url = env::var("CLICKHOUSE_DSN")
956 .context("CLICKHOUSE_DSN is not configured for Tap QueryEvents")?;
957 let database = env::var("CLICKHOUSE_DATABASE").unwrap_or_else(|_| "telemetry".to_string());
958 let run_table =
959 env::var("CLICKHOUSE_RUN_TABLE").unwrap_or_else(|_| "run_events".to_string());
960 let step_table =
961 env::var("CLICKHOUSE_STEP_TABLE").unwrap_or_else(|_| "step_events".to_string());
962
963 let client = ClickhouseClient::default()
964 .with_url(url)
965 .with_database(database);
966
967 Ok(Self {
968 client,
969 run_table,
970 step_table,
971 })
972 }
973
974 async fn fetch(
975 &self,
976 filters: &QueryFiltersOwned,
977 offset: u64,
978 limit: u32,
979 ) -> Result<QueryBatch> {
980 match filters.source {
981 QueryEventSource::Step => self.query_step_events(filters, offset, limit).await,
982 _ => self.query_run_events(filters, offset, limit).await,
983 }
984 }
985
    /// Pages through run-level events, applying non-SQL filters in memory.
    ///
    /// Rows are scanned in chunks of `limit * QUERY_FETCH_MULTIPLIER` so
    /// filtered-out rows still leave enough survivors to fill a page.
    /// `next_offset` is set only when a full page was produced; `None`
    /// means the scan is exhausted.
    async fn query_run_events(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<QueryBatch> {
        // The trailing `.max(...)` is a guard against a zero multiplier.
        let chunk = std::cmp::max(limit, 1)
            .saturating_mul(QUERY_FETCH_MULTIPLIER)
            .max(limit.max(1));
        let mut collected = Vec::with_capacity(limit as usize);
        let mut scan_offset = offset;
        let mut next_offset = None;

        while collected.len() < limit as usize {
            let rows = self.fetch_run_rows(filters, scan_offset, chunk).await?;
            if rows.is_empty() {
                break;
            }

            let mut consumed = 0usize;
            for row in rows {
                // Offsets index the unfiltered scan so page tokens remain
                // stable regardless of in-memory filtering.
                let raw_offset = scan_offset + consumed as u64;
                consumed += 1;
                let payload = normalize_payload(row.attributes);
                let kind = format!("run_{}", row.status.to_ascii_lowercase());
                let severity = derive_severity(&kind, &payload);
                if !matches_filters(filters, &payload, &kind, severity, None) {
                    continue;
                }
                let envelope = make_envelope(
                    row.run_id,
                    None,
                    row.created_at,
                    &payload,
                    &kind,
                    severity,
                    raw_offset,
                )?;
                collected.push(envelope);
                if collected.len() >= limit as usize {
                    next_offset = Some(raw_offset.saturating_add(1));
                    break;
                }
            }

            scan_offset = scan_offset.saturating_add(consumed as u64);
            // A short chunk means ClickHouse has no more rows to scan.
            if consumed < chunk as usize {
                break;
            }
        }

        Ok(QueryBatch {
            events: collected,
            next_offset,
        })
    }
1042
    /// Step-level counterpart of `query_run_events`: same chunked-scan and
    /// offset bookkeeping, but rows carry a `step_id` and produce
    /// `step_<status>` event kinds.
    async fn query_step_events(
        &self,
        filters: &QueryFiltersOwned,
        offset: u64,
        limit: u32,
    ) -> Result<QueryBatch> {
        // The trailing `.max(...)` is a guard against a zero multiplier.
        let chunk = std::cmp::max(limit, 1)
            .saturating_mul(QUERY_FETCH_MULTIPLIER)
            .max(limit.max(1));
        let mut collected = Vec::with_capacity(limit as usize);
        let mut scan_offset = offset;
        let mut next_offset = None;

        while collected.len() < limit as usize {
            let rows = self.fetch_step_rows(filters, scan_offset, chunk).await?;
            if rows.is_empty() {
                break;
            }

            let mut consumed = 0usize;
            for row in rows {
                // Offsets index the unfiltered scan so page tokens remain
                // stable regardless of in-memory filtering.
                let raw_offset = scan_offset + consumed as u64;
                consumed += 1;
                let payload = normalize_payload(row.attributes);
                let kind = format!("step_{}", row.status.to_ascii_lowercase());
                let severity = derive_severity(&kind, &payload);
                if !matches_filters(filters, &payload, &kind, severity, Some(row.step_id)) {
                    continue;
                }
                let envelope = make_envelope(
                    row.run_id,
                    Some(row.step_id),
                    row.recorded_at,
                    &payload,
                    &kind,
                    severity,
                    raw_offset,
                )?;
                collected.push(envelope);
                if collected.len() >= limit as usize {
                    next_offset = Some(raw_offset.saturating_add(1));
                    break;
                }
            }

            scan_offset = scan_offset.saturating_add(consumed as u64);
            // A short chunk means ClickHouse has no more rows to scan.
            if consumed < chunk as usize {
                break;
            }
        }

        Ok(QueryBatch {
            events: collected,
            next_offset,
        })
    }
1099
1100 async fn fetch_run_rows(
1101 &self,
1102 filters: &QueryFiltersOwned,
1103 offset: u64,
1104 limit: u32,
1105 ) -> Result<Vec<RunEventRow>> {
1106 let mut where_clauses = Vec::new();
1107 if !filters.run_ids.is_empty() {
1108 where_clauses.push(format!(
1109 "run_id IN ({})",
1110 format_uuid_list(&filters.run_ids)
1111 ));
1112 }
1113 if let Some(start) = filters.start_time {
1114 where_clauses.push(format!(
1115 "created_at >= toDateTime64('{}', 3, 'UTC')",
1116 start.format("%Y-%m-%d %H:%M:%S%.3f")
1117 ));
1118 }
1119 if let Some(end) = filters.end_time {
1120 where_clauses.push(format!(
1121 "created_at <= toDateTime64('{}', 3, 'UTC')",
1122 end.format("%Y-%m-%d %H:%M:%S%.3f")
1123 ));
1124 }
1125
1126 let mut sql = format!(
1127 "SELECT run_id, created_at, status, attributes FROM {}",
1128 self.run_table
1129 );
1130 if !where_clauses.is_empty() {
1131 sql.push_str(" WHERE ");
1132 sql.push_str(&where_clauses.join(" AND "));
1133 }
1134 sql.push_str(" ORDER BY created_at ");
1135 sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
1136 sql.push_str(", run_id ");
1137 sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
1138 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1139
1140 let rows = self
1141 .client
1142 .query(&sql)
1143 .fetch_all::<RunEventRow>()
1144 .await
1145 .context("failed to fetch run events from ClickHouse")?;
1146 Ok(rows)
1147 }
1148
1149 async fn fetch_step_rows(
1150 &self,
1151 filters: &QueryFiltersOwned,
1152 offset: u64,
1153 limit: u32,
1154 ) -> Result<Vec<StepEventRow>> {
1155 let mut where_clauses = Vec::new();
1156 if !filters.run_ids.is_empty() {
1157 where_clauses.push(format!(
1158 "run_id IN ({})",
1159 format_uuid_list(&filters.run_ids)
1160 ));
1161 }
1162 if !filters.step_ids.is_empty() {
1163 where_clauses.push(format!(
1164 "step_id IN ({})",
1165 format_uuid_list(&filters.step_ids)
1166 ));
1167 }
1168
1169 let time_expr = "coalesce(finished_at, started_at)";
1170 if let Some(start) = filters.start_time {
1171 where_clauses.push(format!(
1172 "{time_expr} >= toDateTime64('{}', 3, 'UTC')",
1173 start.format("%Y-%m-%d %H:%M:%S%.3f")
1174 ));
1175 }
1176 if let Some(end) = filters.end_time {
1177 where_clauses.push(format!(
1178 "{time_expr} <= toDateTime64('{}', 3, 'UTC')",
1179 end.format("%Y-%m-%d %H:%M:%S%.3f")
1180 ));
1181 }
1182
1183 let mut sql = format!(
1184 "SELECT run_id, step_id, status, attempt, {time_expr} AS recorded_at, attributes FROM {}",
1185 self.step_table
1186 );
1187 if !where_clauses.is_empty() {
1188 sql.push_str(" WHERE ");
1189 sql.push_str(&where_clauses.join(" AND "));
1190 }
1191 sql.push_str(&format!(" ORDER BY {time_expr} "));
1192 sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
1193 sql.push_str(", step_id ");
1194 sql.push_str(if filters.ascending { "ASC" } else { "DESC" });
1195 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1196
1197 let rows = self
1198 .client
1199 .query(&sql)
1200 .fetch_all::<StepEventRow>()
1201 .await
1202 .context("failed to fetch step events from ClickHouse")?;
1203 Ok(rows)
1204 }
1205}
1206
1207fn parse_page_token(raw: &str) -> Result<u64, Status> {
1208 if raw.trim().is_empty() {
1209 return Ok(0);
1210 }
1211 raw.parse::<u64>()
1212 .map_err(|_| invalid_argument("page_token must be a positive integer"))
1213}
1214
1215fn timestamp_to_datetime(ts: &prost_types::Timestamp) -> Result<DateTime<Utc>, Status> {
1216 Utc.timestamp_opt(ts.seconds, ts.nanos as u32)
1217 .single()
1218 .ok_or_else(|| invalid_argument("timestamp out of range"))
1219}
1220
1221fn normalize_payload(value: Value) -> Value {
1222 match value {
1223 Value::Object(_) => value,
1224 Value::Null => Value::Object(Map::new()),
1225 other => {
1226 let mut map = Map::new();
1227 map.insert("value".to_string(), other);
1228 Value::Object(map)
1229 }
1230 }
1231}
1232
/// Apply the in-process (non-SQL) query filters to a candidate event.
///
/// All populated filters must match (conjunction); an empty filter list
/// matches everything. Event-side strings are lowercased before comparison.
/// NOTE(review): this assumes filter values arrive already lowercased —
/// confirm in the request-decoding path.
fn matches_filters(
    filters: &QueryFiltersOwned,
    payload: &Value,
    kind: &str,
    severity: EventSeverity,
    step_id: Option<Uuid>,
) -> bool {
    // Kind filter: exact match against the lowercased event kind.
    if !filters.kinds.is_empty() {
        let kind_value = kind.to_ascii_lowercase();
        if !filters.kinds.iter().any(|expected| expected == &kind_value) {
            return false;
        }
    }

    // Workspace filter: the workspace id is read from the payload's
    // trace.run.workspace_id path; events without one never match.
    if !filters.workspace_ids.is_empty() {
        let workspace = json_get_string(payload, &["trace", "run", "workspace_id"])
            .map(|value| value.to_ascii_lowercase());
        let Some(workspace) = workspace else {
            return false;
        };
        if !filters
            .workspace_ids
            .iter()
            .any(|expected| expected == &workspace)
        {
            return false;
        }
    }

    // Policy-effect filter: compares the payload's "effect" string against
    // the canonical lowercase labels; events without an effect never match.
    if !filters.policy_effects.is_empty() {
        let effect = payload
            .get("effect")
            .and_then(Value::as_str)
            .map(|value| value.to_ascii_lowercase());
        if let Some(effect) = effect {
            if !filters
                .policy_effects
                .iter()
                .any(|expected| policy_effect_label(*expected) == effect)
            {
                return false;
            }
        } else {
            return false;
        }
    }

    if !filters.severities.is_empty() && !filters.severities.contains(&severity) {
        return false;
    }

    // Step filter: only enforced for step-source queries; run-level events
    // (no step_id) never match an explicit step filter.
    if filters.source == QueryEventSource::Step && !filters.step_ids.is_empty() {
        if let Some(step_id) = step_id {
            if !filters.step_ids.contains(&step_id) {
                return false;
            }
        } else {
            return false;
        }
    }

    true
}
1296
1297fn policy_effect_label(effect: PolicyDecisionEffect) -> &'static str {
1298 match effect {
1299 PolicyDecisionEffect::Allow => "allow",
1300 PolicyDecisionEffect::Deny => "deny",
1301 PolicyDecisionEffect::Escalate => "escalate",
1302 PolicyDecisionEffect::Modify => "modify",
1303 _ => "",
1304 }
1305}
1306
1307fn json_get_string<'a>(value: &'a Value, path: &[&str]) -> Option<&'a str> {
1308 let mut current = value;
1309 for segment in path {
1310 current = current.get(*segment)?;
1311 }
1312 current.as_str()
1313}
1314
1315fn format_uuid_list(values: &[Uuid]) -> String {
1316 values
1317 .iter()
1318 .map(|value| format!("'{}'", value))
1319 .collect::<Vec<_>>()
1320 .join(", ")
1321}
1322
1323fn make_envelope(
1324 run_id: Uuid,
1325 step_id: Option<Uuid>,
1326 recorded_at: DateTime<Utc>,
1327 payload: &Value,
1328 kind: &str,
1329 severity: EventSeverity,
1330 raw_offset: u64,
1331) -> Result<RunEventEnvelope> {
1332 let trace = build_trace_context(run_id, step_id, payload);
1333 let tags = derive_tags(kind, step_id, payload, severity);
1334 let raw_struct = json_to_prost_struct(payload)
1335 .map_err(|err| anyhow!("failed to encode ClickHouse payload into Struct: {err}"))?;
1336 let offset =
1337 i64::try_from(raw_offset).map_err(|_| anyhow!("query offset exceeds i64 range"))?;
1338
1339 Ok(RunEventEnvelope {
1340 trace: Some(trace),
1341 recorded_at: Some(to_timestamp(recorded_at)),
1342 offset,
1343 severity: severity as i32,
1344 tags,
1345 raw: Some(raw_struct),
1346 body: None,
1347 })
1348}
1349
/// Record a client acknowledgement and drop every in-flight offset it
/// covers.
///
/// Acks are cumulative: an offset at or below the last acknowledged one is
/// stale and ignored. Covered entries are popped from the front of the
/// queue, stopping at the first offset beyond the ack (assumes `inflight`
/// is kept in ascending offset order — the pop loop relies on it).
fn handle_ack(inflight: &mut VecDeque<i64>, last_ack: &mut i64, ack_offset: i64) {
    if ack_offset <= *last_ack {
        return;
    }
    *last_ack = ack_offset;
    while matches!(inflight.front(), Some(front) if *front <= ack_offset) {
        inflight.pop_front();
    }
}
1363
1364fn passes_filters(
1365 config: &SubscriptionConfig,
1366 event: &models::OutboxEvent,
1367 severity: EventSeverity,
1368 tags: &[String],
1369) -> bool {
1370 if let Some(ref kinds) = config.kinds {
1371 if !kinds.contains(&event.kind.to_ascii_lowercase()) {
1372 return false;
1373 }
1374 }
1375
1376 if let Some(ref severities) = config.severities {
1377 if !severities.contains(&severity) {
1378 return false;
1379 }
1380 }
1381
1382 if let Some(ref step_filter) = config.step_ids {
1383 match event.step_id {
1384 Some(step_id) if step_filter.contains(&step_id) => {}
1385 _ => return false,
1386 }
1387 }
1388
1389 if let Some(ref tag_filter) = config.tags {
1390 if !tags
1391 .iter()
1392 .any(|tag| tag_filter.contains(&tag.to_ascii_lowercase()))
1393 {
1394 return false;
1395 }
1396 }
1397
1398 true
1399}
1400
1401fn derive_severity(kind: &str, payload: &Value) -> EventSeverity {
1402 let kind_lower = kind.to_ascii_lowercase();
1403 if kind_lower.contains("failed") || kind_lower.contains("error") {
1404 EventSeverity::Error
1405 } else if kind_lower.contains("denied") || kind_lower.contains("skipped") {
1406 EventSeverity::Warn
1407 } else if kind_lower.contains("debug") {
1408 EventSeverity::Debug
1409 } else if payload
1410 .get("severity")
1411 .and_then(Value::as_str)
1412 .map(|value| value.eq_ignore_ascii_case("warn"))
1413 .unwrap_or(false)
1414 {
1415 EventSeverity::Warn
1416 } else {
1417 EventSeverity::Info
1418 }
1419}
1420
1421fn severity_label(severity: EventSeverity) -> &'static str {
1422 match severity {
1423 EventSeverity::Trace => "trace",
1424 EventSeverity::Debug => "debug",
1425 EventSeverity::Info => "info",
1426 EventSeverity::Warn => "warn",
1427 EventSeverity::Error => "error",
1428 EventSeverity::Fatal => "fatal",
1429 EventSeverity::Unspecified => "unspecified",
1430 }
1431}
1432
1433fn derive_tags(
1434 kind: &str,
1435 step_id: Option<Uuid>,
1436 payload: &Value,
1437 severity: EventSeverity,
1438) -> Vec<String> {
1439 let mut tags = Vec::with_capacity(6);
1440 tags.push("tap".to_string());
1441 tags.push(format!("kind:{}", kind.to_ascii_lowercase()));
1442 tags.push(format!("severity:{}", severity_label(severity)));
1443
1444 if let Some(step_id) = step_id {
1445 tags.push("step".to_string());
1446 tags.push(format!("step_id:{step_id}"));
1447 } else {
1448 tags.push("run".to_string());
1449 }
1450
1451 if let Some(status) = payload.get("status").and_then(Value::as_str) {
1452 tags.push(format!("status:{}", status.to_ascii_lowercase()));
1453 }
1454 if let Some(effect) = payload.get("effect").and_then(Value::as_str) {
1455 tags.push(format!("effect:{}", effect.to_ascii_lowercase()));
1456 }
1457 tags
1458}
1459
1460fn build_response(
1461 config: &SubscriptionConfig,
1462 event: &models::OutboxEvent,
1463 severity: EventSeverity,
1464 tags: &[String],
1465 watermark: i64,
1466) -> Result<SubscribeRunResponse, Status> {
1467 let raw_struct = json_to_prost_struct(&event.payload).map_err(|err| {
1468 internal_error(format!(
1469 "failed to encode outbox payload {}: {err}",
1470 event.id
1471 ))
1472 })?;
1473
1474 let trace = build_trace_context(config.run_id, event.step_id, &event.payload);
1475
1476 let envelope = RunEventEnvelope {
1477 trace: Some(trace),
1478 recorded_at: Some(to_timestamp(event.created_at)),
1479 offset: event.id,
1480 severity: severity as i32,
1481 tags: tags.to_vec(),
1482 raw: Some(raw_struct),
1483 body: None,
1484 };
1485
1486 Ok(SubscribeRunResponse {
1487 offset: event.id,
1488 watermark,
1489 heartbeat: false,
1490 event: Some(envelope),
1491 })
1492}
1493
/// Reconstruct a `RunTraceContext` from an event payload's optional `trace`
/// object, falling back to the caller-supplied `run_id`/`step_id`.
///
/// Every sub-scope (run, step, tool_call, policy_decision, budget, baggage)
/// is optional in the payload; missing string fields default to empty
/// strings rather than being treated as errors. A run scope is always
/// present in the result.
fn build_trace_context(run_id: Uuid, step_id: Option<Uuid>, payload: &Value) -> RunTraceContext {
    // Start from a minimal context carrying only the caller's run id; the
    // payload's trace object (if any) overlays richer data below.
    let mut ctx = RunTraceContext {
        trace_id: String::new(),
        span_id: String::new(),
        parent_span_id: String::new(),
        run: Some(tap::RunScope {
            run_id: run_id.to_string(),
            ..Default::default()
        }),
        step: None,
        tool_call: None,
        policy_decision: None,
        budget: None,
        seed: None,
        baggage: Vec::new(),
    };

    if let Some(trace_value) = payload.get("trace").and_then(Value::as_object) {
        // Top-level span identifiers and the deterministic seed.
        if let Some(trace_id) = trace_value.get("trace_id").and_then(Value::as_str) {
            ctx.trace_id = trace_id.to_string();
        }
        if let Some(span_id) = trace_value.get("span_id").and_then(Value::as_str) {
            ctx.span_id = span_id.to_string();
        }
        if let Some(parent_span_id) = trace_value.get("parent_span_id").and_then(Value::as_str) {
            ctx.parent_span_id = parent_span_id.to_string();
        }
        if let Some(seed) = trace_value.get("seed").and_then(Value::as_i64) {
            ctx.seed = Some(seed);
        }
        // Run scope: payload values win, falling back to the caller's run_id.
        if let Some(run_obj) = trace_value.get("run").and_then(Value::as_object) {
            let run_id_value = run_obj
                .get("run_id")
                .and_then(Value::as_str)
                .map(str::to_owned)
                .unwrap_or_else(|| run_id.to_string());
            let mut run_scope = tap::RunScope {
                run_id: run_id_value,
                workspace_id: run_obj
                    .get("workspace_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                app_id: run_obj
                    .get("app_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                attempt_id: run_obj
                    .get("attempt_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                labels: Default::default(),
            };
            // Only string-valued labels are carried over; other JSON types
            // are silently dropped.
            if let Some(labels) = run_obj.get("labels").and_then(Value::as_object) {
                for (key, value) in labels {
                    if let Some(string) = value.as_str() {
                        run_scope.labels.insert(key.clone(), string.to_string());
                    }
                }
            }
            ctx.run = Some(run_scope);
        }
        // Step scope: payload step object wins; otherwise synthesize a bare
        // scope from the caller-supplied step_id (if any).
        if let Some(step_obj) = trace_value.get("step").and_then(Value::as_object) {
            let mut scope = tap::StepScope {
                step_id: step_obj
                    .get("step_id")
                    .and_then(Value::as_str)
                    .map(str::to_owned)
                    .unwrap_or_else(|| step_id.map(|id| id.to_string()).unwrap_or_default()),
                index: step_obj
                    .get("index")
                    .and_then(Value::as_u64)
                    .map(|value| value as u32)
                    .unwrap_or_default(),
                attempt: step_obj
                    .get("attempt")
                    .and_then(Value::as_u64)
                    .map(|value| value as u32)
                    .unwrap_or_default(),
                kind: step_obj
                    .get("kind")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                scheduler: step_obj
                    .get("scheduler")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
            };
            // Covers a payload step_id that is present but empty.
            if scope.step_id.is_empty() {
                if let Some(id) = step_id {
                    scope.step_id = id.to_string();
                }
            }
            ctx.step = Some(scope);
        } else if let Some(id) = step_id {
            ctx.step = Some(tap::StepScope {
                step_id: id.to_string(),
                ..Default::default()
            });
        }
        // Tool-call scope: all fields optional strings.
        if let Some(tool_obj) = trace_value.get("tool_call").and_then(Value::as_object) {
            ctx.tool_call = Some(tap::ToolCallScope {
                tool_call_id: tool_obj
                    .get("tool_call_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                tool_name: tool_obj
                    .get("tool_name")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                tool_variant: tool_obj
                    .get("tool_variant")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                provider: tool_obj
                    .get("provider")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
            });
        }
        // Policy-decision scope: all fields optional strings.
        if let Some(policy_obj) = trace_value
            .get("policy_decision")
            .and_then(Value::as_object)
        {
            ctx.policy_decision = Some(tap::PolicyDecisionScope {
                policy_decision_id: policy_obj
                    .get("policy_decision_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                policy_pack_id: policy_obj
                    .get("policy_pack_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                policy_name: policy_obj
                    .get("policy_name")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                policy_version: policy_obj
                    .get("policy_version")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
            });
        }
        // Budget scope: the two amounts stay None unless numeric in payload.
        if let Some(budget_obj) = trace_value.get("budget").and_then(Value::as_object) {
            let mut budget_scope = tap::BudgetScope {
                budget_id: budget_obj
                    .get("budget_id")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                budget_kind: budget_obj
                    .get("budget_kind")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                amount_remaining: None,
                amount_allocated: None,
            };
            if let Some(value) = budget_obj.get("amount_remaining").and_then(Value::as_f64) {
                budget_scope.amount_remaining = Some(value);
            }
            if let Some(value) = budget_obj.get("amount_allocated").and_then(Value::as_f64) {
                budget_scope.amount_allocated = Some(value);
            }
            ctx.budget = Some(budget_scope);
        }
        // Baggage accepts either a JSON array or a header string form.
        if let Some(baggage_value) = trace_value.get("baggage") {
            ctx.baggage = parse_baggage(baggage_value);
        }
    } else if ctx.step.is_none() {
        // No trace object at all: still surface the caller-supplied step id.
        if let Some(step_id) = step_id {
            ctx.step = Some(tap::StepScope {
                step_id: step_id.to_string(),
                ..Default::default()
            });
        }
    }

    // Guarantee a run scope even if the payload's run object removed it.
    if ctx.run.is_none() {
        ctx.run = Some(tap::RunScope {
            run_id: run_id.to_string(),
            ..Default::default()
        });
    }

    ctx
}
1693
1694fn parse_baggage(value: &Value) -> Vec<tap::BaggageItem> {
1695 if let Some(array) = value.as_array() {
1696 let mut items = Vec::with_capacity(array.len());
1697 for entry in array {
1698 if let Some(obj) = entry.as_object() {
1699 if let (Some(key), Some(value)) = (
1700 obj.get("key").and_then(Value::as_str),
1701 obj.get("value").and_then(Value::as_str),
1702 ) {
1703 items.push(tap::BaggageItem {
1704 key: key.to_string(),
1705 value: value.to_string(),
1706 });
1707 }
1708 }
1709 }
1710 return items;
1711 }
1712
1713 if let Some(header) = value.as_str() {
1714 return header
1715 .split(',')
1716 .filter_map(|entry| {
1717 let mut parts = entry.splitn(2, '=');
1718 let key = parts.next()?.trim();
1719 let remainder = parts.next()?.trim();
1720 let value = remainder
1721 .split_once(';')
1722 .map(|(val, _)| val.trim())
1723 .unwrap_or(remainder);
1724 if key.is_empty() || value.is_empty() {
1725 None
1726 } else {
1727 Some(tap::BaggageItem {
1728 key: key.to_string(),
1729 value: value.to_string(),
1730 })
1731 }
1732 })
1733 .collect();
1734 }
1735
1736 Vec::new()
1737}
1738
/// Fully-decoded run submission, ready for validation and persistence by
/// `submit_run_impl`.
#[derive(Debug)]
struct SubmitRunSpec {
    /// Workflow DAG; expected to contain a `steps` array and optionally an
    /// `edges` array of `[from, to]` UUID pairs (validated downstream).
    dag_json: Value,
    /// Run inputs; `null` is normalized to an empty object downstream.
    inputs: Value,
    /// Free-form labels attached to the run.
    labels: HashMap<String, String>,
    /// Deterministic-replay seed; rejected downstream unless positive.
    seed: i64,
    /// Optional W3C `traceparent` header captured at submission time.
    traceparent: Option<String>,
    /// Optional W3C `tracestate` header.
    tracestate: Option<String>,
    /// Optional W3C `baggage` header.
    baggage: Option<String>,
    /// Optional debug metadata; must be a JSON object when present.
    debug: Option<Value>,
    /// Breakpoint specs (`"step:<uuid-or-slug>"` or a bare target).
    breakpoints: Vec<String>,
}
1751
/// Outcome of a run submission: either a brand-new run or an existing run
/// reused via its idempotency key.
#[derive(Debug)]
enum SubmitRunOutcome {
    /// A new run with this id was created.
    Created(Uuid),
    /// An existing run with this id matched the idempotency key; nothing new
    /// was persisted.
    Reused(Uuid),
}
1757
/// Failure modes for run submission.
#[derive(Debug)]
enum SubmitRunError {
    /// The submitted spec failed validation; the message names the problem.
    Invalid(String),
    /// A server-side failure (e.g. persistence) prevented the submission.
    Internal(String),
}

impl std::fmt::Display for SubmitRunError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Select the category prefix, then emit one uniform "prefix: message".
        let (prefix, msg) = match self {
            SubmitRunError::Invalid(msg) => ("invalid run submission", msg),
            SubmitRunError::Internal(msg) => ("internal run submission error", msg),
        };
        write!(f, "{prefix}: {msg}")
    }
}

impl SubmitRunError {
    /// Convenience constructor for validation failures.
    fn invalid(message: impl Into<String>) -> Self {
        SubmitRunError::Invalid(message.into())
    }

    /// Convenience constructor for server-side failures.
    fn internal(message: impl Into<String>) -> Self {
        SubmitRunError::Internal(message.into())
    }
}
1782
1783fn format_schema_validation_error(err: SchemaValidationError) -> String {
1784 if err.details.is_empty() {
1785 err.schema.to_string()
1786 } else {
1787 err.details
1788 .iter()
1789 .map(|detail| detail.to_string())
1790 .collect::<Vec<_>>()
1791 .join("; ")
1792 }
1793}
1794
/// Normalize an optional idempotency key: trim surrounding whitespace and
/// treat blank keys as absent.
fn normalize_idempotency_key(raw: Option<&str>) -> Option<String> {
    raw.map(str::trim)
        .filter(|trimmed| !trimmed.is_empty())
        .map(|trimmed| trimmed.to_owned())
}
1805
1806fn resolve_breakpoint(
1807 spec: &str,
1808 slug_map: &HashMap<String, Uuid>,
1809 step_ids: &HashSet<Uuid>,
1810) -> Result<Uuid, SubmitRunError> {
1811 let trimmed = spec.trim();
1812 if trimmed.is_empty() {
1813 return Err(SubmitRunError::invalid("breakpoint spec cannot be empty"));
1814 }
1815
1816 let (kind, target) = match trimmed.split_once(':') {
1817 Some((kind, rest)) => (kind.trim(), rest.trim()),
1818 None => ("step", trimmed),
1819 };
1820
1821 if target.is_empty() {
1822 return Err(SubmitRunError::invalid(format!(
1823 "breakpoint '{}' missing target",
1824 spec
1825 )));
1826 }
1827
1828 match kind.to_ascii_lowercase().as_str() {
1829 "step" => {
1830 if let Ok(uuid) = Uuid::parse_str(target) {
1831 if !step_ids.contains(&uuid) {
1832 return Err(SubmitRunError::invalid(format!(
1833 "breakpoint references unknown step id '{}'",
1834 target
1835 )));
1836 }
1837 return Ok(uuid);
1838 }
1839
1840 let key = target.to_ascii_lowercase();
1841 if let Some(uuid) = slug_map.get(&key) {
1842 return Ok(*uuid);
1843 }
1844
1845 Err(SubmitRunError::invalid(format!(
1846 "breakpoint references unknown step slug '{}'",
1847 target
1848 )))
1849 }
1850 other => Err(SubmitRunError::invalid(format!(
1851 "unsupported breakpoint kind '{}'",
1852 other
1853 ))),
1854 }
1855}
1856
1857async fn submit_run_impl(
1858 storage: &Storage,
1859 audit: &Audit,
1860 spec: SubmitRunSpec,
1861 idempotency_key: Option<String>,
1862 actor: &str,
1863) -> Result<SubmitRunOutcome, SubmitRunError> {
1864 if let Some(ref key) = idempotency_key {
1865 if let Some(existing_run) =
1866 storage
1867 .fetch_run_by_idempotency_key(key)
1868 .await
1869 .map_err(|err| {
1870 SubmitRunError::internal(format!("failed to check idempotency key: {err}"))
1871 })?
1872 {
1873 info!(
1874 run_id = %existing_run.run_id,
1875 idempotency_key = %key,
1876 "reusing existing run for idempotent submission"
1877 );
1878 return Ok(SubmitRunOutcome::Reused(existing_run.run_id));
1879 }
1880 }
1881
1882 let SubmitRunSpec {
1883 dag_json,
1884 inputs,
1885 labels,
1886 seed,
1887 traceparent,
1888 tracestate,
1889 baggage,
1890 debug,
1891 breakpoints,
1892 } = spec;
1893
1894 if seed <= 0 {
1895 return Err(SubmitRunError::invalid(
1896 "seed must be a positive integer to guarantee deterministic replay",
1897 ));
1898 }
1899
1900 let steps_array = dag_json
1901 .get("steps")
1902 .and_then(Value::as_array)
1903 .ok_or_else(|| SubmitRunError::invalid("dag_json.steps must be an array"))?;
1904
1905 let inputs_value = match inputs {
1906 Value::Null => Value::Object(Map::new()),
1907 Value::Object(map) => Value::Object(map),
1908 _ => return Err(SubmitRunError::invalid("inputs must be a JSON object")),
1909 };
1910
1911 let labels_value = labels_map_to_value(&labels);
1912
1913 let run_spec_instance = json!({
1914 "dag_json": dag_json.clone(),
1915 "inputs": inputs_value.clone(),
1916 "labels": labels_value.clone(),
1917 "seed": seed,
1918 "breakpoints": breakpoints.clone(),
1919 });
1920
1921 if let Err(err) = validate_run_spec(&run_spec_instance) {
1922 return Err(SubmitRunError::invalid(format!(
1923 "run spec schema validation failed: {}",
1924 format_schema_validation_error(err)
1925 )));
1926 }
1927
1928 let mut debug_map = match debug {
1929 Some(Value::Object(map)) => map,
1930 Some(_) => {
1931 return Err(SubmitRunError::invalid(
1932 "debug metadata must be a JSON object",
1933 ))
1934 }
1935 None => Map::new(),
1936 };
1937
1938 let mut ctx_map = Map::new();
1939 ctx_map.insert("inputs".to_string(), inputs_value.clone());
1940 ctx_map.insert("labels".to_string(), labels_value.clone());
1941 if traceparent.is_some() || tracestate.is_some() || baggage.is_some() {
1942 let mut trace_obj = Map::new();
1943 if let Some(tp) = traceparent {
1944 trace_obj.insert("traceparent".to_string(), Value::String(tp));
1945 }
1946 if let Some(ts) = tracestate {
1947 trace_obj.insert("tracestate".to_string(), Value::String(ts));
1948 }
1949 if let Some(bg) = baggage {
1950 trace_obj.insert("baggage".to_string(), Value::String(bg));
1951 }
1952 ctx_map.insert("trace".to_string(), Value::Object(trace_obj));
1953 }
1954
1955 let submission_time = Utc::now();
1956
1957 struct PendingStep {
1958 idx: i32,
1959 step_id: Uuid,
1960 priority: i16,
1961 spec: Value,
1962 max_attempts: i32,
1963 retry_backoff_ms: i64,
1964 retry_backoff_factor: f64,
1965 deadline_at: Option<DateTime<Utc>>,
1966 estimated_tokens: i64,
1967 estimated_cost: f64,
1968 compensation_target: Option<String>,
1969 }
1970
1971 let mut pending_steps = Vec::with_capacity(steps_array.len());
1972 let mut seen_step_ids = HashSet::with_capacity(steps_array.len());
1973 let mut slug_to_step: HashMap<String, Uuid> = HashMap::new();
1974 let mut slug_entries: Vec<(String, Uuid)> = Vec::new();
1975
1976 for (idx, step_value) in steps_array.iter().enumerate() {
1977 if let Err(err) = validate_step_spec(step_value) {
1978 let detail = err
1979 .details
1980 .iter()
1981 .map(|err| err.to_string())
1982 .collect::<Vec<_>>()
1983 .join("; ");
1984 return Err(SubmitRunError::invalid(format!(
1985 "step schema validation failed: {detail}"
1986 )));
1987 }
1988
1989 let step_obj = step_value
1990 .as_object()
1991 .ok_or_else(|| SubmitRunError::invalid("each step must be a JSON object"))?;
1992
1993 let step_id_str = step_obj
1994 .get("id")
1995 .and_then(Value::as_str)
1996 .ok_or_else(|| SubmitRunError::invalid("step.id must be a UUID string"))?;
1997 let step_id = Uuid::parse_str(step_id_str)
1998 .map_err(|err| SubmitRunError::invalid(format!("invalid step.id: {err}")))?;
1999
2000 if !seen_step_ids.insert(step_id) {
2001 return Err(SubmitRunError::invalid(format!(
2002 "duplicate step.id '{}' detected in dag_json.steps",
2003 step_id
2004 )));
2005 }
2006
2007 if let Some(slug_value) = step_obj.get("slug").and_then(Value::as_str) {
2008 let trimmed = slug_value.trim();
2009 if trimmed.is_empty() {
2010 return Err(SubmitRunError::invalid("step.slug cannot be empty"));
2011 }
2012 let key = trimmed.to_ascii_lowercase();
2013 if slug_to_step.insert(key.clone(), step_id).is_some() {
2014 return Err(SubmitRunError::invalid(format!(
2015 "duplicate step.slug '{}' detected in dag_json.steps",
2016 trimmed
2017 )));
2018 }
2019 slug_entries.push((trimmed.to_string(), step_id));
2020 }
2021
2022 let priority = extract_priority(step_obj);
2023
2024 let execution_obj = step_obj.get("execution").and_then(Value::as_object);
2025 let max_attempts = execution_obj
2026 .and_then(|exec| exec.get("max_attempts").and_then(Value::as_i64))
2027 .and_then(|v| i32::try_from(v).ok())
2028 .filter(|v| *v >= 1)
2029 .unwrap_or(1);
2030 let retry_backoff_ms = execution_obj
2031 .and_then(|exec| exec.get("retry_backoff_ms").and_then(Value::as_i64))
2032 .map(|v| v.max(0))
2033 .unwrap_or(0);
2034 let retry_backoff_factor = execution_obj
2035 .and_then(|exec| exec.get("retry_backoff_factor").and_then(Value::as_f64))
2036 .filter(|v| *v >= 1.0)
2037 .unwrap_or(1.0);
2038 let deadline_at = execution_obj
2039 .and_then(|exec| exec.get("deadline_ms").and_then(Value::as_i64))
2040 .filter(|ms| *ms > 0)
2041 .map(|ms| submission_time + chrono::Duration::milliseconds(ms));
2042
2043 let cost_hint =
2044 execution_obj.and_then(|exec| exec.get("cost_hint").and_then(Value::as_object));
2045 let estimated_tokens = cost_hint
2046 .and_then(|hint| hint.get("tokens").and_then(Value::as_i64))
2047 .map(|v| v.max(0))
2048 .unwrap_or(0);
2049 let estimated_cost = cost_hint
2050 .and_then(|hint| hint.get("usd").and_then(Value::as_f64))
2051 .map(|v| v.max(0.0))
2052 .unwrap_or(0.0);
2053
2054 let compensation_target = execution_obj
2055 .and_then(|exec| exec.get("compensation").and_then(Value::as_object))
2056 .and_then(|comp| comp.get("step").and_then(Value::as_str))
2057 .map(|s| s.to_string());
2058
2059 pending_steps.push(PendingStep {
2060 idx: idx as i32,
2061 step_id,
2062 priority,
2063 spec: step_value.clone(),
2064 max_attempts,
2065 retry_backoff_ms,
2066 retry_backoff_factor,
2067 deadline_at,
2068 estimated_tokens,
2069 estimated_cost,
2070 compensation_target,
2071 });
2072 }
2073 let mut incoming_counts: HashMap<Uuid, i32> =
2074 pending_steps.iter().map(|step| (step.step_id, 0)).collect();
2075
2076 let mut edges: Vec<(Uuid, Uuid)> = Vec::new();
2077 if let Some(edges_array) = dag_json.get("edges").and_then(Value::as_array) {
2078 let mut seen_edges = HashSet::with_capacity(edges_array.len());
2079 for edge_value in edges_array {
2080 let pair = edge_value.as_array().ok_or_else(|| {
2081 SubmitRunError::invalid("dag_json.edges entries must be [from, to] arrays")
2082 })?;
2083 if pair.len() != 2 {
2084 return Err(SubmitRunError::invalid(
2085 "dag_json.edges entries must contain exactly two elements",
2086 ));
2087 }
2088
2089 let from_str = pair[0]
2090 .as_str()
2091 .ok_or_else(|| SubmitRunError::invalid("edge source must be a UUID string"))?;
2092 let to_str = pair[1]
2093 .as_str()
2094 .ok_or_else(|| SubmitRunError::invalid("edge target must be a UUID string"))?;
2095
2096 let from = Uuid::parse_str(from_str).map_err(|err| {
2097 SubmitRunError::invalid(format!("invalid edge source UUID: {err}"))
2098 })?;
2099 let to = Uuid::parse_str(to_str).map_err(|err| {
2100 SubmitRunError::invalid(format!("invalid edge target UUID: {err}"))
2101 })?;
2102
2103 if from == to {
2104 return Err(SubmitRunError::invalid(
2105 "edge cannot reference the same step twice",
2106 ));
2107 }
2108 if !seen_step_ids.contains(&from) || !seen_step_ids.contains(&to) {
2109 return Err(SubmitRunError::invalid("edge references unknown step id"));
2110 }
2111 if !seen_edges.insert((from, to)) {
2112 return Err(SubmitRunError::invalid(
2113 "duplicate edge detected in dag_json.edges",
2114 ));
2115 }
2116
2117 let entry = incoming_counts.get_mut(&to).ok_or_else(|| {
2118 SubmitRunError::invalid("edge target not present in dag_json.steps")
2119 })?;
2120 let next_count = (*entry).checked_add(1).ok_or_else(|| {
2121 SubmitRunError::invalid("too many incoming edges for a single step")
2122 })?;
2123 *entry = next_count;
2124
2125 edges.push((from, to));
2126 }
2127 }
2128
2129 let mut breakpoint_payloads: Vec<Value> = Vec::new();
2130 let mut breakpoint_specs: Vec<String> = Vec::new();
2131 let mut breakpoint_ids: HashSet<Uuid> = HashSet::new();
2132 for bp in &breakpoints {
2133 let resolved = resolve_breakpoint(bp, &slug_to_step, &seen_step_ids)?;
2134 if !breakpoint_ids.insert(resolved) {
2135 return Err(SubmitRunError::invalid(format!(
2136 "duplicate breakpoint targeting step {}",
2137 resolved
2138 )));
2139 }
2140
2141 let mut record = Map::new();
2142 record.insert("spec".to_string(), Value::String(bp.clone()));
2143 record.insert("step_id".to_string(), Value::String(resolved.to_string()));
2144 breakpoint_payloads.push(Value::Object(record));
2145 breakpoint_specs.push(bp.clone());
2146 }
2147
2148 if !breakpoint_payloads.is_empty() {
2149 debug_map.insert(
2150 "breakpoints".to_string(),
2151 Value::Array(breakpoint_payloads.clone()),
2152 );
2153 let specs = breakpoint_specs
2154 .iter()
2155 .cloned()
2156 .map(Value::String)
2157 .collect::<Vec<_>>();
2158 debug_map.insert("breakpoint_specs".to_string(), Value::Array(specs));
2159 }
2160
2161 if !slug_entries.is_empty() {
2162 let mut slugs = Map::new();
2163 for (slug, step_id) in &slug_entries {
2164 slugs.insert(slug.clone(), Value::String(step_id.to_string()));
2165 }
2166 debug_map.insert("slugs".to_string(), Value::Object(slugs));
2167 }
2168
2169 debug_map
2170 .entry("active_pause".to_string())
2171 .or_insert(Value::Null);
2172
2173 if !debug_map.is_empty() {
2174 ctx_map.insert("debug".to_string(), Value::Object(debug_map.clone()));
2175 }
2176
2177 let input_ctx = Value::Object(ctx_map);
2178
2179 let run_id = Uuid::new_v4();
2180 let mut steps = Vec::with_capacity(pending_steps.len());
2181 for pending in pending_steps {
2182 let PendingStep {
2183 idx,
2184 step_id,
2185 priority,
2186 spec,
2187 max_attempts,
2188 retry_backoff_ms,
2189 retry_backoff_factor,
2190 deadline_at,
2191 estimated_tokens,
2192 estimated_cost,
2193 compensation_target,
2194 } = pending;
2195
2196 let dep_count = *incoming_counts.get(&step_id).unwrap_or(&0);
2197 let status = if dep_count == 0 {
2198 models::StepStatus::Queued
2199 } else {
2200 models::StepStatus::Blocked
2201 };
2202
2203 let compensation_step = if let Some(target) = compensation_target {
2204 let trimmed = target.trim();
2205 if trimmed.is_empty() {
2206 return Err(SubmitRunError::invalid(format!(
2207 "compensation step reference cannot be empty for step {}",
2208 step_id
2209 )));
2210 }
2211
2212 let resolved = if let Ok(uuid) = Uuid::parse_str(trimmed) {
2213 if !seen_step_ids.contains(&uuid) {
2214 return Err(SubmitRunError::invalid(format!(
2215 "compensation step '{}' not found in dag_json.steps",
2216 trimmed
2217 )));
2218 }
2219 uuid
2220 } else {
2221 let key = trimmed.to_ascii_lowercase();
2222 *slug_to_step.get(&key).ok_or_else(|| {
2223 SubmitRunError::invalid(format!(
2224 "compensation step '{}' not found in dag_json.steps",
2225 trimmed
2226 ))
2227 })?
2228 };
2229
2230 if resolved == step_id {
2231 return Err(SubmitRunError::invalid(format!(
2232 "step {} cannot reference itself as a compensation step",
2233 step_id
2234 )));
2235 }
2236
2237 Some(resolved)
2238 } else {
2239 None
2240 };
2241
2242 steps.push(models::NewStep {
2243 run_id,
2244 step_id,
2245 idx,
2246 priority,
2247 spec_json: spec,
2248 status,
2249 attempt: 0,
2250 input_snapshot: None,
2251 provider: None,
2252 provider_version: None,
2253 max_attempts,
2254 retry_backoff_ms,
2255 retry_backoff_factor,
2256 not_before: None,
2257 deadline_at,
2258 pending_dependencies: dep_count,
2259 total_dependencies: dep_count,
2260 estimated_tokens,
2261 estimated_cost,
2262 actual_tokens: None,
2263 actual_cost: None,
2264 checkpoint: None,
2265 compensation_step,
2266 compensation_scheduled: false,
2267 });
2268 }
2269
2270 let new_run = models::NewRun {
2271 run_id,
2272 status: models::RunStatus::Pending,
2273 dag_json: dag_json.clone(),
2274 input_ctx,
2275 seed,
2276 idempotency_key: idempotency_key.clone(),
2277 };
2278
2279 let stored_run_id = storage
2280 .insert_run_with_steps(new_run, &steps, &edges)
2281 .await
2282 .map_err(|err| {
2283 error!(error = ?err, "failed to persist run");
2284 SubmitRunError::internal(format!("failed to persist run: {err}"))
2285 })?;
2286
2287 info!(run_id = %stored_run_id, "accepted run submission");
2288
2289 let tenant_id = labels
2290 .get("tenant_id")
2291 .or_else(|| labels.get("tenant"))
2292 .and_then(|value| Uuid::parse_str(value).ok());
2293 let audit_resource = json!({
2294 "run_id": stored_run_id.to_string(),
2295 "idempotency_key": idempotency_key,
2296 "labels": labels,
2297 });
2298
2299 if let Err(err) = audit
2300 .append(actor, tenant_id, "run.submit", &audit_resource)
2301 .await
2302 {
2303 warn!(
2304 error = %err,
2305 run_id = %stored_run_id,
2306 "failed to append audit log for run submission"
2307 );
2308 }
2309
2310 Ok(SubmitRunOutcome::Created(stored_run_id))
2311}
2312
2313#[tonic::async_trait]
2314impl RuntimeService for RuntimeApiService {
2315 async fn submit_run(
2316 &self,
2317 request: Request<runtime::SubmitRunRequest>,
2318 ) -> Result<Response<runtime::SubmitRunResponse>, Status> {
2319 let metadata = request.metadata().clone();
2320 let role = self.require_role(&metadata, Role::Writer).await?;
2321 let req = request.into_inner();
2322 let runtime::SubmitRunRequest {
2323 spec,
2324 idempotency_key: raw_idempotency_key,
2325 } = req;
2326
2327 let idempotency_key = normalize_idempotency_key(Some(raw_idempotency_key.as_str()));
2328
2329 let spec = spec.ok_or_else(|| invalid_argument("spec is required"))?;
2330
2331 let dag_json = parse_dag_json(&spec.dag_json)?;
2332 let inputs_value = spec
2333 .inputs
2334 .as_ref()
2335 .map(prost_struct_to_json)
2336 .unwrap_or_else(|| Value::Object(Map::new()));
2337
2338 let traceparent = metadata
2339 .get("traceparent")
2340 .and_then(|value| value.to_str().ok())
2341 .map(|s| s.to_string());
2342 let tracestate = metadata
2343 .get("tracestate")
2344 .and_then(|value| value.to_str().ok())
2345 .map(|s| s.to_string());
2346 let baggage = metadata
2347 .get("baggage")
2348 .and_then(|value| value.to_str().ok())
2349 .map(|s| s.to_string());
2350
2351 let submission = SubmitRunSpec {
2352 dag_json,
2353 inputs: inputs_value,
2354 labels: spec.labels.clone(),
2355 seed: spec.seed,
2356 traceparent,
2357 tracestate,
2358 baggage,
2359 debug: None,
2360 breakpoints: spec.breakpoints.clone(),
2361 };
2362
2363 let actor = Self::actor_from_metadata(&metadata, role);
2364
2365 let outcome = submit_run_impl(
2366 self.storage.as_ref(),
2367 self.audit.as_ref(),
2368 submission,
2369 idempotency_key,
2370 &actor,
2371 )
2372 .await
2373 .map_err(|err| match err {
2374 SubmitRunError::Invalid(message) => invalid_argument(message),
2375 SubmitRunError::Internal(message) => internal_error(message),
2376 })?;
2377
2378 let run_id = match outcome {
2379 SubmitRunOutcome::Created(run_id) | SubmitRunOutcome::Reused(run_id) => run_id,
2380 };
2381
2382 Ok(Response::new(runtime::SubmitRunResponse {
2383 run_id: run_id.to_string(),
2384 }))
2385 }
2386
2387 async fn get_run(
2388 &self,
2389 request: Request<runtime::GetRunRequest>,
2390 ) -> Result<Response<runtime::GetRunResponse>, Status> {
2391 let metadata = request.metadata().clone();
2392 self.require_role(&metadata, Role::Reader).await?;
2393 let req = request.into_inner();
2394 let run_id = Uuid::parse_str(&req.run_id)
2395 .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
2396
2397 let run = self
2398 .storage
2399 .fetch_run(run_id)
2400 .await
2401 .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
2402 .ok_or_else(|| Status::not_found("run not found"))?;
2403
2404 let steps = self
2405 .storage
2406 .fetch_steps_for_run(run_id)
2407 .await
2408 .map_err(|err| internal_error(format!("failed to fetch steps: {err}")))?;
2409
2410 let run_detail = build_run_detail(run, steps)?;
2411
2412 Ok(Response::new(runtime::GetRunResponse {
2413 run: Some(run_detail),
2414 }))
2415 }
2416
    /// Server-streaming response type for `tap_run`: a boxed stream of run events.
    type TapRunStream = Pin<Box<dyn Stream<Item = Result<runtime::RunEvent, Status>> + Send>>;
2418
2419 async fn tap_run(
2420 &self,
2421 request: Request<runtime::TapRunRequest>,
2422 ) -> Result<Response<Self::TapRunStream>, Status> {
2423 let metadata = request.metadata().clone();
2424 self.require_role(&metadata, Role::Reader).await?;
2425 let req = request.into_inner();
2426 let run_uuid = Uuid::parse_str(&req.run_id)
2427 .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
2428
2429 if self
2431 .storage
2432 .fetch_run(run_uuid)
2433 .await
2434 .map_err(|err| internal_error(format!("failed to verify run: {err}")))?
2435 .is_none()
2436 {
2437 return Err(Status::not_found("run not found"));
2438 }
2439
2440 let storage = Arc::clone(&self.storage);
2441 let (tx, rx) = mpsc::channel::<Result<runtime::RunEvent, Status>>(32);
2442
2443 tokio::spawn(async move {
2444 let mut last_id: i64 = 0;
2445
2446 loop {
2447 if tx.is_closed() {
2448 break;
2449 }
2450
2451 match storage
2452 .fetch_outbox_for_run_since(run_uuid, last_id, 256)
2453 .await
2454 {
2455 Ok(events) => {
2456 if events.is_empty() {
2457 sleep(Duration::from_millis(500)).await;
2458 continue;
2459 }
2460
2461 for event in events {
2462 last_id = event.id;
2463 match run_event_from_outbox(event) {
2464 Ok(run_event) => {
2465 if tx.send(Ok(run_event)).await.is_err() {
2466 return;
2467 }
2468 }
2469 Err(status) => {
2470 let _ = tx.send(Err(status)).await;
2471 return;
2472 }
2473 }
2474 }
2475 }
2476 Err(err) => {
2477 warn!(run_id = %run_uuid, error = %err, "tap stream failed to read outbox events");
2478 let status = internal_error(format!("failed to fetch run events: {err}"));
2479 let _ = tx.send(Err(status)).await;
2480 break;
2481 }
2482 }
2483 }
2484 });
2485
2486 let stream = ReceiverStream::new(rx);
2487 Ok(Response::new(Box::pin(stream) as Self::TapRunStream))
2488 }
2489
2490 async fn replay_run(
2491 &self,
2492 request: Request<runtime::ReplayRunRequest>,
2493 ) -> Result<Response<runtime::ReplayRunResponse>, Status> {
2494 let metadata = request.metadata().clone();
2495 self.require_role(&metadata, Role::Writer).await?;
2496 let req = request.into_inner();
2497 let run_uuid = Uuid::parse_str(&req.run_id)
2498 .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
2499
2500 if self
2501 .storage
2502 .fetch_run(run_uuid)
2503 .await
2504 .map_err(|err| internal_error(format!("failed to load run: {err}")))?
2505 .is_none()
2506 {
2507 return Err(Status::not_found("run not found"));
2508 }
2509
2510 let strategy = match runtime::ReplayMode::try_from(req.mode).ok() {
2511 Some(runtime::ReplayMode::Unspecified)
2512 | Some(runtime::ReplayMode::Full)
2513 | Some(runtime::ReplayMode::Diff) => runtime::ReplayStrategy::Mocked,
2514 Some(runtime::ReplayMode::StepOnly) => {
2515 return Err(Status::unimplemented(
2516 "Replay mode STEP_ONLY is not yet supported",
2517 ));
2518 }
2519 None => {
2520 return Err(invalid_argument("unrecognized replay mode"));
2521 }
2522 };
2523
2524 let steps = self
2525 .storage
2526 .fetch_steps_for_run(run_uuid)
2527 .await
2528 .map_err(|err| internal_error(format!("failed to load steps for replay: {err}")))?;
2529 let attempts = self
2530 .storage
2531 .fetch_step_attempts_for_run(run_uuid)
2532 .await
2533 .map_err(|err| {
2534 internal_error(format!("failed to load step attempts for replay: {err}"))
2535 })?;
2536
2537 let mut attempts_by_step: HashMap<Uuid, Vec<models::StepAttempt>> = HashMap::new();
2538 for attempt in attempts {
2539 attempts_by_step
2540 .entry(attempt.step_id)
2541 .or_default()
2542 .push(attempt);
2543 }
2544 for entries in attempts_by_step.values_mut() {
2545 entries.sort_by_key(|attempt| attempt.attempt);
2546 }
2547
2548 let replay_mode = runtime::ReplayMode::try_from(req.mode).unwrap_or(runtime::ReplayMode::Full);
2549 let diff_mode = matches!(replay_mode, runtime::ReplayMode::Diff);
2550
2551 let mut proto_steps = Vec::with_capacity(steps.len());
2552 let mut all_within = true;
2553
2554 for step in steps {
2555 let attempts_for_step = attempts_by_step.get(&step.step_id);
2556 let latest_attempt = attempts_for_step.and_then(|entries| entries.last());
2557
2558 let recorded_snapshot = latest_attempt
2559 .and_then(|attempt| attempt.output_snapshot.clone())
2560 .or_else(|| step.output_snapshot.clone())
2561 .ok_or_else(|| {
2562 Status::failed_precondition(format!(
2563 "step {} missing output_snapshot; cannot replay",
2564 step.step_id
2565 ))
2566 })?;
2567
2568 let baseline_attempt = if diff_mode {
2569 attempts_for_step.and_then(|entries| {
2570 if entries.len() >= 2 {
2571 entries.get(entries.len() - 2)
2572 } else {
2573 None
2574 }
2575 })
2576 } else {
2577 latest_attempt
2578 };
2579
2580 let baseline_output = baseline_attempt
2581 .and_then(|attempt| attempt.output_snapshot.clone())
2582 .unwrap_or_else(|| recorded_snapshot.clone());
2583
2584 let current_output = if diff_mode {
2585 recorded_snapshot.clone()
2586 } else {
2587 step.output_json
2588 .clone()
2589 .unwrap_or_else(|| recorded_snapshot.clone())
2590 };
2591
2592 let drift_result = compute_drift(
2593 &baseline_output,
2594 ¤t_output,
2595 DEFAULT_TOKEN_TOLERANCE,
2596 DEFAULT_COST_TOLERANCE,
2597 );
2598 if !drift_result.within_tolerance {
2599 all_within = false;
2600 }
2601
2602 let snapshot_value = json_to_prost_value(&recorded_snapshot)
2603 .map_err(|err| internal_error(format!("failed to encode snapshot: {err}")))?;
2604
2605 let recorded_meta = wrap_attempt_output(baseline_attempt, &baseline_output);
2606 let current_meta = wrap_attempt_output(latest_attempt, ¤t_output);
2607
2608 let drift_proto = runtime::ReplayStepDrift {
2609 recorded: Some(json_to_prost_value(&recorded_meta).map_err(|err| {
2610 internal_error(format!("failed to encode recorded drift snapshot: {err}"))
2611 })?),
2612 current: Some(json_to_prost_value(¤t_meta).map_err(|err| {
2613 internal_error(format!("failed to encode current drift snapshot: {err}"))
2614 })?),
2615 token_delta: drift_result.token_delta,
2616 cost_delta: drift_result.cost_delta,
2617 within_tolerance: drift_result.within_tolerance,
2618 };
2619
2620 proto_steps.push(runtime::ReplayStep {
2621 step_id: step.step_id.to_string(),
2622 snapshot: Some(snapshot_value),
2623 drift: Some(drift_proto),
2624 });
2625 }
2626
2627 let response = runtime::ReplayRunResponse {
2628 run_id: req.run_id,
2629 all_within_tolerance: all_within,
2630 strategy: strategy as i32,
2631 steps: proto_steps,
2632 };
2633
2634 Ok(Response::new(response))
2635 }
2636
    /// Adds, removes, or clears breakpoints on an existing run.
    ///
    /// Requires `Writer` role. Breakpoints live inside the run's
    /// `input_ctx.debug` object; this rewrites that object, persists it in a
    /// transaction together with a `run_breakpoints_updated` outbox event,
    /// then best-effort appends an audit record. Returns the full post-update
    /// breakpoint spec list.
    async fn update_breakpoints(
        &self,
        request: Request<runtime::UpdateBreakpointsRequest>,
    ) -> Result<Response<runtime::UpdateBreakpointsResponse>, Status> {
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;

        // A no-op request is rejected rather than silently accepted.
        if !req.clear_all && req.add.is_empty() && req.remove.is_empty() {
            return Err(invalid_argument(
                "update_breakpoints requires add/remove entries or clear_all",
            ));
        }

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        let step_ids = collect_step_ids(&run);

        // Debug state is stored inside the run's input context object.
        let ctx_value = run.input_ctx.clone();
        let mut ctx_map = match ctx_value {
            Value::Object(map) => map,
            Value::Null => Map::new(),
            other => {
                return Err(internal_error(format!(
                    "run input context must be an object, found {other}"
                )))
            }
        };

        let debug_entry = ctx_map
            .entry("debug".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        let debug_map = debug_entry.as_object_mut().ok_or_else(|| {
            internal_error("run debug context is not an object; unable to update breakpoints")
        })?;

        let mut breakpoints = load_debug_breakpoints(debug_map);
        let existing_slug_entries = load_slug_map(debug_map);

        // Resolve slugs case-insensitively; fall back to recomputing the slug
        // map from the run when the stored one is empty.
        let mut slug_lookup: HashMap<String, Uuid> = existing_slug_entries
            .iter()
            .map(|(slug, id)| (slug.to_ascii_lowercase(), *id))
            .collect();
        if slug_lookup.is_empty() {
            for (slug, id) in collect_slug_entries(&run) {
                slug_lookup.insert(slug.to_ascii_lowercase(), id);
            }
        }

        // Preserve original slug casing for the map that gets stored back.
        let slug_entries_store: Vec<(String, Uuid)> = if existing_slug_entries.is_empty() {
            collect_slug_entries(&run)
        } else {
            existing_slug_entries
                .iter()
                .map(|(slug, id)| (slug.clone(), *id))
                .collect()
        };

        // `clear_all` wipes first; add/remove below still apply afterwards.
        if req.clear_all {
            breakpoints.clear();
        }

        // Removals match either by literal spec string or by the step id the
        // spec resolves to (so a slug and its UUID both remove the entry).
        let removal_specs: HashSet<String> = req
            .remove
            .iter()
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();

        let mut removal_ids: HashSet<Uuid> = HashSet::new();
        for spec in &req.remove {
            if let Ok(step_id) = resolve_breakpoint(spec, &slug_lookup, &step_ids) {
                removal_ids.insert(step_id);
            }
        }

        if !removal_specs.is_empty() || !removal_ids.is_empty() {
            breakpoints.retain(|(spec, step_id)| {
                let spec_trim = spec.trim();
                let matches_spec = removal_specs.contains(spec_trim);
                let matches_id = removal_ids.contains(step_id);
                !(matches_spec || matches_id)
            });
        }

        // Additions replace any existing entry for the same step id, keeping
        // at most one breakpoint per step. Unresolvable specs are rejected.
        for spec in &req.add {
            let step_id = resolve_breakpoint(spec, &slug_lookup, &step_ids)
                .map_err(|err| invalid_argument(err.to_string()))?;
            breakpoints.retain(|(_, existing)| existing != &step_id);
            breakpoints.push((spec.trim().to_string(), step_id));
        }

        // Deterministic ordering by spec string for stable responses/payloads.
        breakpoints.sort_by(|a, b| a.0.cmp(&b.0));
        store_debug_breakpoints(debug_map, &breakpoints);
        if !slug_entries_store.is_empty() {
            store_slug_map(debug_map, &slug_entries_store);
        }

        // Persist the rewritten context and the outbox event atomically.
        let mut tx = self
            .storage
            .pool()
            .begin()
            .await
            .map_err(|err| internal_error(format!("failed to begin transaction: {err}")))?;

        self.storage
            .update_run_input_ctx_tx(&mut tx, run_uuid, Value::Object(ctx_map.clone()))
            .await
            .map_err(|err| {
                internal_error(format!("failed to persist breakpoint changes: {err}"))
            })?;

        let payload_breakpoints = breakpoints
            .iter()
            .map(|(spec, step_id)| {
                json!({
                    "spec": spec,
                    "step_id": step_id.to_string(),
                })
            })
            .collect::<Vec<_>>();

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: None,
                    kind: "run_breakpoints_updated".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "breakpoints": payload_breakpoints,
                        "added": req.add,
                        "removed": req.remove,
                        "cleared": req.clear_all,
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue outbox event: {err}")))?;

        tx.commit()
            .await
            .map_err(|err| internal_error(format!("failed to commit breakpoint update: {err}")))?;

        // Audit failures are logged but never fail the RPC.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.breakpoints.update",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "added": req.add,
                    "removed": req.remove,
                    "cleared": req.clear_all,
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, "failed to append audit record for breakpoint update");
        }

        Ok(Response::new(runtime::UpdateBreakpointsResponse {
            breakpoints: breakpoints.into_iter().map(|(spec, _)| spec).collect(),
        }))
    }
2818
    /// Resumes a run that is paused at a breakpoint.
    ///
    /// Requires `Writer` role. Clears the `active_pause` marker in the run's
    /// debug context, flips the run status back to `Running`, and emits a
    /// `run_resumed` outbox event — all in one transaction. Fails with
    /// FAILED_PRECONDITION when the run is not paused or no paused step is
    /// recorded.
    async fn resume_run(
        &self,
        request: Request<runtime::ResumeRunRequest>,
    ) -> Result<Response<runtime::ResumeRunResponse>, Status> {
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        // Resume only makes sense for runs parked at a breakpoint.
        if run.status != models::RunStatus::PausedAtStep {
            return Err(Status::failed_precondition(
                "run is not paused at a breakpoint",
            ));
        }

        let ctx_value = run.input_ctx.clone();
        let mut ctx_map = match ctx_value {
            Value::Object(map) => map,
            Value::Null => Map::new(),
            other => {
                return Err(internal_error(format!(
                    "run input context must be an object, found {other}"
                )))
            }
        };

        let debug_entry = ctx_map
            .get_mut("debug")
            .and_then(Value::as_object_mut)
            .ok_or_else(|| internal_error("run debug context missing; cannot resume paused run"))?;

        // The paused step id is recorded under `debug.active_pause`.
        let paused_step = extract_active_pause_step_id(debug_entry).ok_or_else(|| {
            Status::failed_precondition("run is paused but no active step recorded")
        })?;

        // Clear the pause marker so the step can be picked back up.
        debug_entry.insert("active_pause".to_string(), Value::Null);

        // Persist ctx rewrite, status change, and outbox event atomically.
        let mut tx = self
            .storage
            .pool()
            .begin()
            .await
            .map_err(|err| internal_error(format!("failed to begin transaction: {err}")))?;

        self.storage
            .update_run_input_ctx_tx(&mut tx, run_uuid, Value::Object(ctx_map.clone()))
            .await
            .map_err(|err| internal_error(format!("failed to persist pause state: {err}")))?;

        self.storage
            .set_run_status_tx(&mut tx, run_uuid, models::RunStatus::Running)
            .await
            .map_err(|err| internal_error(format!("failed to update run status: {err}")))?;

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(paused_step),
                    kind: "run_resumed".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "step_id": paused_step.to_string(),
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue resume event: {err}")))?;

        tx.commit()
            .await
            .map_err(|err| internal_error(format!("failed to commit resume transaction: {err}")))?;

        // Audit failures are logged but never fail the RPC.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.breakpoints.resume",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "step_id": paused_step.to_string(),
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, "failed to append audit record for resume");
        }

        Ok(Response::new(runtime::ResumeRunResponse {
            status: runtime::RunStatus::Running as i32,
            resumed_step_id: paused_step.to_string(),
        }))
    }
2929
    /// Patches a step's spec (inputs, env, model) while the run is paused.
    ///
    /// Requires `Writer` role. The run must be in `PausedAtStep`; when an
    /// active pause step is recorded, only that step may be patched. The
    /// updated spec is persisted together with a `step_patched` outbox event
    /// in a single transaction, followed by a best-effort audit append.
    async fn patch_step(
        &self,
        request: Request<runtime::PatchStepRequest>,
    ) -> Result<Response<runtime::PatchStepResponse>, Status> {
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
        let step_uuid = Uuid::parse_str(&req.step_id)
            .map_err(|err| invalid_argument(format!("invalid step_id: {err}")))?;

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        if run.status != models::RunStatus::PausedAtStep {
            return Err(Status::failed_precondition(
                "run must be paused before patching step",
            ));
        }

        // Look up which step the run is paused on (if recorded at all).
        let ctx_value = run.input_ctx.clone();
        let paused_step = match ctx_value {
            Value::Object(ref map) => map
                .get("debug")
                .and_then(Value::as_object)
                .and_then(extract_active_pause_step_id),
            _ => None,
        };

        // NOTE(review): when no active pause step is recorded, any step of
        // the paused run may be patched — this check only applies when the
        // marker is present. Confirm that is intentional.
        if let Some(active_step) = paused_step {
            if active_step != step_uuid {
                return Err(Status::failed_precondition(
                    "only the paused step can be patched",
                ));
            }
        }

        let inputs_patch_value = req.inputs_patch.as_ref().map(prost_struct_to_json);
        let env_override = req.env.as_ref().map(prost_struct_to_json);

        let model_override = req.model.clone();

        // At least one of the three patchable fields must be supplied.
        if inputs_patch_value.is_none() && env_override.is_none() && model_override.is_none() {
            return Err(invalid_argument(
                "patch_step requires inputs_patch, env, or model to be set",
            ));
        }

        let steps = self
            .storage
            .fetch_steps_for_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch steps: {err}")))?;

        let step = steps
            .into_iter()
            .find(|step| step.step_id == step_uuid)
            .ok_or_else(|| Status::not_found("step not found"))?;

        let mut spec_map = match step.spec_json.clone() {
            Value::Object(map) => map,
            _ => {
                return Err(internal_error(
                    "step spec must be a JSON object to apply patches",
                ))
            }
        };

        // All patches land under the spec's `inputs` object, created on demand.
        let inputs_entry = spec_map
            .entry("inputs".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        let inputs_map = inputs_entry
            .as_object_mut()
            .ok_or_else(|| internal_error("step inputs is not an object; cannot apply patch"))?;

        // Merge the inputs patch into the existing inputs object.
        if let Some(patch_value) = inputs_patch_value.as_ref() {
            let patch_obj = patch_value
                .as_object()
                .ok_or_else(|| invalid_argument("inputs_patch must be a JSON object"))?;
            merge_json_object(inputs_map, patch_obj);
        }

        // `env` replaces the whole env value rather than merging into it.
        if let Some(env_value) = env_override.as_ref() {
            if !env_value.is_object() {
                return Err(invalid_argument("env override must be a JSON object"));
            }
            inputs_map.insert("env".to_string(), env_value.clone());
        }

        if let Some(model) = model_override.as_ref() {
            inputs_map.insert("model".to_string(), Value::String(model.clone()));
        }

        let updated_spec = Value::Object(spec_map.clone());

        // Persist the new spec and the outbox notification atomically.
        let mut tx = self
            .storage
            .pool()
            .begin()
            .await
            .map_err(|err| internal_error(format!("failed to begin transaction: {err}")))?;

        self.storage
            .update_step_spec_tx(&mut tx, run_uuid, step_uuid, updated_spec.clone())
            .await
            .map_err(|err| internal_error(format!("failed to persist step patch: {err}")))?;

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(step_uuid),
                    kind: "step_patched".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "step_id": step_uuid.to_string(),
                        "inputs_patch": inputs_patch_value,
                        "env_override": env_override,
                        "model_override": model_override,
                        "note": req.note,
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue patch event: {err}")))?;

        tx.commit()
            .await
            .map_err(|err| internal_error(format!("failed to commit patch transaction: {err}")))?;

        // Audit failures are logged but never fail the RPC.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.step.patch",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "step_id": step_uuid.to_string(),
                    "note": req.note,
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, step = %step_uuid, "failed to append audit record for step patch");
        }

        Ok(Response::new(runtime::PatchStepResponse {
            step_id: step_uuid.to_string(),
        }))
    }
3094
    /// Creates a new run that branches off an existing run at a given step.
    ///
    /// Requires `Writer` role. Builds a sub-DAG rooted at `from_step`,
    /// optionally applies inputs/labels patches, picks a seed according to
    /// `seed_mode` (same / new / custom), records branch provenance in the
    /// child run's debug context, and submits the child through the same
    /// code path as a regular run submission. Emits a `run_branch_created`
    /// outbox event on the parent run and best-effort appends an audit record.
    async fn create_branch_run(
        &self,
        request: Request<runtime::CreateBranchRunRequest>,
    ) -> Result<Response<runtime::CreateBranchRunResponse>, Status> {
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let actor = Self::actor_from_metadata(&metadata, role);
        let req = request.into_inner();

        let run_uuid = Uuid::parse_str(&req.run_id)
            .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;

        let from_step_spec = req.from_step.trim().to_string();
        if from_step_spec.is_empty() {
            return Err(invalid_argument("from_step is required"));
        }

        // Unrecognized seed modes silently fall back to SAME.
        let seed_mode = runtime::BranchSeedMode::try_from(req.seed_mode)
            .unwrap_or(runtime::BranchSeedMode::Same);

        let run = self
            .storage
            .fetch_run(run_uuid)
            .await
            .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
            .ok_or_else(|| Status::not_found("run not found"))?;

        let (inputs_value, mut labels_map) = unpack_input_ctx(&run.input_ctx);

        // Resolve `from_step` (slug or UUID) against the parent's stored
        // debug slug map plus its full step-id set.
        let debug_entry = run
            .input_ctx
            .get("debug")
            .and_then(Value::as_object)
            .cloned()
            .unwrap_or_default();
        let slug_lookup = load_slug_map(&debug_entry);
        let step_ids = collect_step_ids(&run);
        let target_step = resolve_breakpoint(&from_step_spec, &slug_lookup, &step_ids)
            .map_err(|err| invalid_argument(err.to_string()))?;

        let branch_dag = build_branch_dag(&run.dag_json, target_step)
            .map_err(|err| invalid_argument(err.to_string()))?;

        // Non-object parent inputs degrade to an empty object.
        let mut inputs_map = match inputs_value {
            Value::Object(map) => map,
            Value::Null => Map::new(),
            _ => Map::new(),
        };

        // Merge the optional inputs patch and remember it for provenance.
        let mut inputs_patch_debug: Option<Value> = None;
        if let Some(patch) = req.inputs_patch.as_ref() {
            let patch_value = prost_struct_to_json(patch);
            let patch_obj = patch_value
                .as_object()
                .ok_or_else(|| invalid_argument("inputs_patch must be a JSON object"))?;
            merge_json_object(&mut inputs_map, patch_obj);
            inputs_patch_debug = Some(Value::Object(patch_obj.clone()));
        }

        // Labels patch: non-string values are stringified via `to_string`.
        let mut labels_patch_debug: Option<Value> = None;
        if let Some(labels_patch) = req.labels_patch.as_ref() {
            let patch_value = prost_struct_to_json(labels_patch);
            let patch_obj = patch_value
                .as_object()
                .ok_or_else(|| invalid_argument("labels_patch must be a JSON object"))?;
            for (key, value) in patch_obj {
                let rendered = value
                    .as_str()
                    .map(|s| s.to_string())
                    .unwrap_or_else(|| value.to_string());
                labels_map.insert(key.clone(), rendered);
            }
            labels_patch_debug = Some(Value::Object(patch_obj.clone()));
        }

        // Stamp lineage labels so the child run can be traced to its parent.
        labels_map.insert("branch_parent".to_string(), run_uuid.to_string());
        labels_map.insert("branch_from_step".to_string(), target_step.to_string());

        // Seed selection: SAME keeps the parent seed, NEW derives one from
        // the current time (falling back to the parent seed when nanosecond
        // timestamps overflow), CUSTOM requires an explicit value.
        let branch_seed = match seed_mode {
            runtime::BranchSeedMode::Same | runtime::BranchSeedMode::Unspecified => run.seed,
            runtime::BranchSeedMode::New => Utc::now().timestamp_nanos_opt().unwrap_or(run.seed),
            runtime::BranchSeedMode::Custom => req
                .seed
                .as_ref()
                .copied()
                .ok_or_else(|| invalid_argument("seed value required for custom mode"))?,
        };

        let prompt_packs = collect_prompt_packs_json(&run.dag_json);

        // Assemble branch provenance stored under the child's `debug.branch`.
        let mut branch_meta = Map::new();
        branch_meta.insert(
            "parent_run_id".to_string(),
            Value::String(run_uuid.to_string()),
        );
        branch_meta.insert(
            "parent_step_id".to_string(),
            Value::String(target_step.to_string()),
        );
        branch_meta.insert(
            "from_step_spec".to_string(),
            Value::String(from_step_spec.clone()),
        );
        branch_meta.insert(
            "parent_seed".to_string(),
            Value::Number(Number::from(run.seed)),
        );
        branch_meta.insert(
            "branch_seed".to_string(),
            Value::Number(Number::from(branch_seed)),
        );
        let seed_mode_str = match seed_mode {
            runtime::BranchSeedMode::Same | runtime::BranchSeedMode::Unspecified => "same",
            runtime::BranchSeedMode::New => "new",
            runtime::BranchSeedMode::Custom => "custom",
        };
        branch_meta.insert(
            "seed_mode".to_string(),
            Value::String(seed_mode_str.to_string()),
        );
        branch_meta.insert(
            "created_at".to_string(),
            Value::String(Utc::now().to_rfc3339()),
        );
        branch_meta.insert(
            "prompt_packs".to_string(),
            Value::Array(prompt_packs.into_iter().map(Value::String).collect()),
        );
        let note = req.note.trim();
        if !note.is_empty() {
            branch_meta.insert("note".to_string(), Value::String(note.to_string()));
        }
        if let Some(patch) = inputs_patch_debug {
            branch_meta.insert("inputs_patch".to_string(), patch);
        }
        if let Some(patch) = labels_patch_debug {
            branch_meta.insert("labels_patch".to_string(), patch);
        }

        let mut debug_extra = Map::new();
        debug_extra.insert("branch".to_string(), Value::Object(branch_meta));

        // The branch is submitted like a normal run, with no idempotency key.
        let submission = SubmitRunSpec {
            dag_json: branch_dag,
            inputs: Value::Object(inputs_map.clone()),
            labels: labels_map.clone(),
            seed: branch_seed,
            traceparent: None,
            tracestate: None,
            baggage: None,
            debug: Some(Value::Object(debug_extra.clone())),
            breakpoints: Vec::new(),
        };

        let outcome = submit_run_impl(
            self.storage.as_ref(),
            self.audit.as_ref(),
            submission,
            None,
            &actor,
        )
        .await
        .map_err(|err| match err {
            SubmitRunError::Invalid(message) => invalid_argument(message),
            SubmitRunError::Internal(message) => internal_error(message),
        })?;

        let new_run_uuid = match outcome {
            SubmitRunOutcome::Created(run_id) | SubmitRunOutcome::Reused(run_id) => run_id,
        };

        // Record the branch event against the PARENT run's outbox.
        let mut tx = self.storage.pool().begin().await.map_err(|err| {
            internal_error(format!("failed to begin branch event transaction: {err}"))
        })?;

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(target_step),
                    kind: "run_branch_created".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "child_run_id": new_run_uuid.to_string(),
                        "from_step_spec": from_step_spec,
                        "seed": branch_seed,
                    }),
                },
            )
            .await
            .map_err(|err| internal_error(format!("failed to enqueue branch event: {err}")))?;

        tx.commit().await.map_err(|err| {
            internal_error(format!("failed to commit branch event transaction: {err}"))
        })?;

        // Audit failures are logged but never fail the RPC.
        if let Err(err) = self
            .audit
            .append(
                &actor,
                self.storage
                    .fetch_run_tenant(run_uuid)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|tenant| Uuid::parse_str(&tenant).ok()),
                "debug.branch.create",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "child_run_id": new_run_uuid.to_string(),
                    "step_id": target_step.to_string(),
                }),
            )
            .await
        {
            warn!(error = %err, run = %run_uuid, "failed to append audit record for branch");
        }

        Ok(Response::new(runtime::CreateBranchRunResponse {
            run_id: new_run_uuid.to_string(),
        }))
    }
3318
3319 async fn get_attestations(
3320 &self,
3321 request: Request<runtime::GetAttestationsRequest>,
3322 ) -> Result<Response<runtime::GetAttestationsResponse>, Status> {
3323 let metadata = request.metadata().clone();
3324 self.require_role(&metadata, Role::Reader).await?;
3325 let req = request.into_inner();
3326 let mut seen: HashSet<Uuid> = HashSet::new();
3327 let mut results = Vec::new();
3328
3329 for raw in req.attestation_ids {
3330 let trimmed = raw.trim();
3331 if trimmed.is_empty() {
3332 continue;
3333 }
3334 let id = Uuid::parse_str(trimmed).map_err(|err| {
3335 invalid_argument(format!("invalid attestation id '{trimmed}': {err}"))
3336 })?;
3337 if !seen.insert(id) {
3338 continue;
3339 }
3340 match self.attestation_vault.fetch(id).await {
3341 Ok(Some(attestation)) => {
3342 results.push(attestation_to_proto(attestation)?);
3343 }
3344 Ok(None) => {
3345 }
3347 Err(err) => {
3348 return Err(internal_error(format!(
3349 "failed to load attestation {id}: {err}"
3350 )));
3351 }
3352 }
3353 }
3354
3355 Ok(Response::new(runtime::GetAttestationsResponse {
3356 attestations: results,
3357 }))
3358 }
3359
3360 async fn sync_eval_pack(
3361 &self,
3362 request: Request<runtime::SyncEvalPackRequest>,
3363 ) -> Result<Response<runtime::SyncEvalPackResponse>, Status> {
3364 let metadata = request.metadata().clone();
3365 self.require_role(&metadata, Role::Writer).await?;
3366
3367 let req = request.into_inner();
3368 let pack = req.pack.trim();
3369 if pack.is_empty() {
3370 return Err(invalid_argument("pack is required"));
3371 }
3372 let pack_name = pack.to_string();
3373
3374 let mut scenario_ids = Vec::with_capacity(req.scenarios.len());
3375 for scenario in req.scenarios {
3376 let slug = scenario.slug.trim();
3377 if slug.is_empty() {
3378 return Err(invalid_argument("scenario slug cannot be empty"));
3379 }
3380 let run_spec_struct = scenario
3381 .run_spec
3382 .ok_or_else(|| invalid_argument("scenario run_spec must be provided"))?;
3383 let expected_struct = scenario
3384 .expected
3385 .ok_or_else(|| invalid_argument("scenario expected payload must be provided"))?;
3386
3387 let run_spec_json = prost_struct_to_json(&run_spec_struct);
3389 let expected_json = prost_struct_to_json(&expected_struct);
3390
3391 let title = scenario.title.trim();
3392 let description = scenario.description.trim();
3393
3394 let mut spec_map = Map::new();
3395 spec_map.insert("pack".to_string(), Value::String(pack_name.clone()));
3396 spec_map.insert("slug".to_string(), Value::String(slug.to_string()));
3397 if !title.is_empty() {
3398 spec_map.insert("title".to_string(), Value::String(title.to_string()));
3399 }
3400 if !description.is_empty() {
3401 spec_map.insert(
3402 "description".to_string(),
3403 Value::String(description.to_string()),
3404 );
3405 }
3406 spec_map.insert("run_spec".to_string(), run_spec_json);
3407 spec_map.insert("expected".to_string(), expected_json);
3408 let spec_value = Value::Object(spec_map);
3409
3410 let scenario_name = format!("{pack}/{slug}");
3411 let scenario_id = self
3412 .storage
3413 .upsert_eval_scenario(
3414 &scenario_name,
3415 if description.is_empty() {
3416 None
3417 } else {
3418 Some(description)
3419 },
3420 spec_value,
3421 )
3422 .await
3423 .map_err(|err| {
3424 internal_error(format!(
3425 "failed to upsert eval scenario '{scenario_name}': {err}"
3426 ))
3427 })?;
3428 scenario_ids.push(scenario_id.to_string());
3429 }
3430
3431 Ok(Response::new(runtime::SyncEvalPackResponse {
3432 scenario_ids,
3433 }))
3434 }
3435
3436 async fn list_eval_scenarios(
3437 &self,
3438 request: Request<runtime::ListEvalScenariosRequest>,
3439 ) -> Result<Response<runtime::ListEvalScenariosResponse>, Status> {
3440 let metadata = request.metadata().clone();
3441 self.require_role(&metadata, Role::Reader).await?;
3442
3443 let scenarios = self
3444 .storage
3445 .list_eval_scenarios()
3446 .await
3447 .map_err(|err| internal_error(format!("failed to list eval scenarios: {err}")))?;
3448
3449 let mut proto_scenarios = Vec::with_capacity(scenarios.len());
3450 for scenario in scenarios {
3451 let spec = parse_eval_spec(&scenario.spec).map_err(|err| {
3452 internal_error(format!(
3453 "invalid eval scenario '{}' spec: {err}",
3454 scenario.name
3455 ))
3456 })?;
3457 let history_models = self
3458 .storage
3459 .list_eval_results(scenario.scenario_id, 20)
3460 .await
3461 .map_err(|err| internal_error(format!("failed to fetch eval results: {err}")))?;
3462 let mut history_proto = Vec::with_capacity(history_models.len());
3463 for result in history_models {
3464 history_proto.push(build_eval_result_proto(result)?);
3465 }
3466 proto_scenarios.push(build_eval_scenario_proto(&scenario, &spec, history_proto)?);
3467 }
3468
3469 Ok(Response::new(runtime::ListEvalScenariosResponse {
3470 scenarios: proto_scenarios,
3471 }))
3472 }
3473
3474 async fn record_eval_result(
3475 &self,
3476 request: Request<runtime::RecordEvalResultRequest>,
3477 ) -> Result<Response<runtime::RecordEvalResultResponse>, Status> {
3478 let metadata = request.metadata().clone();
3479 self.require_role(&metadata, Role::Writer).await?;
3480
3481 let req = request.into_inner();
3482 let pack = req.pack.trim();
3483 let slug = req.slug.trim();
3484 if pack.is_empty() {
3485 return Err(invalid_argument("pack is required"));
3486 }
3487 if slug.is_empty() {
3488 return Err(invalid_argument("slug is required"));
3489 }
3490
3491 let scenario_name = format!("{pack}/{slug}");
3492 let scenario = self
3493 .storage
3494 .fetch_eval_scenario_by_name(&scenario_name)
3495 .await
3496 .map_err(|err| internal_error(format!("failed to fetch eval scenario: {err}")))?
3497 .ok_or_else(|| {
3498 Status::not_found(format!("eval scenario '{scenario_name}' not found"))
3499 })?;
3500
3501 let spec = parse_eval_spec(&scenario.spec).map_err(|err| {
3502 internal_error(format!(
3503 "invalid eval scenario '{}' spec: {err}",
3504 scenario.name
3505 ))
3506 })?;
3507
3508 let run_uuid = Uuid::parse_str(&req.run_id)
3509 .map_err(|err| invalid_argument(format!("invalid run_id: {err}")))?;
3510 let run = self
3511 .storage
3512 .fetch_run(run_uuid)
3513 .await
3514 .map_err(|err| internal_error(format!("failed to fetch run: {err}")))?
3515 .ok_or_else(|| Status::not_found("run not found"))?;
3516 let steps = self
3517 .storage
3518 .fetch_steps_for_run(run_uuid)
3519 .await
3520 .map_err(|err| internal_error(format!("failed to fetch steps: {err}")))?;
3521 let last_event = self
3522 .storage
3523 .fetch_last_outbox_event(run_uuid)
3524 .await
3525 .map_err(|err| internal_error(format!("failed to fetch run events: {err}")))?;
3526
3527 let previous = self
3528 .storage
3529 .latest_eval_result(scenario.scenario_id)
3530 .await
3531 .map_err(|err| {
3532 internal_error(format!("failed to fetch previous eval result: {err}"))
3533 })?;
3534 let previous_metrics = previous
3535 .as_ref()
3536 .filter(|result| result.run_id != run_uuid)
3537 .map(|result| &result.metrics);
3538
3539 let commit = req.commit.trim();
3540 let commit = if commit.is_empty() {
3541 None
3542 } else {
3543 Some(commit.to_string())
3544 };
3545
3546 let (outcome, metrics) = compute_eval_metrics(
3547 &spec,
3548 &run,
3549 &steps,
3550 last_event.as_ref().map(|event| event.created_at),
3551 previous_metrics,
3552 commit.clone(),
3553 )
3554 .map_err(|err| internal_error(format!("failed to compute eval metrics: {err}")))?;
3555
3556 let stored = self
3557 .storage
3558 .insert_eval_result(scenario.scenario_id, run_uuid, &outcome, metrics)
3559 .await
3560 .map_err(|err| internal_error(format!("failed to persist eval result: {err}")))?;
3561
3562 let proto = build_eval_result_proto(stored)?;
3563 Ok(Response::new(runtime::RecordEvalResultResponse {
3564 result: Some(proto),
3565 }))
3566 }
3567
    /// Evaluates a change against the change-gate engine, persists the
    /// decision (as both an artifact and a database row), optionally publishes
    /// a transparency (SCITT) entry, appends an audit record, and returns the
    /// decision as proto.
    ///
    /// Errors: `InvalidArgument` for a missing change_id or an engine-side
    /// rejection; `Internal` for serialization/storage/audit failures.
    async fn check_change_gate(
        &self,
        request: Request<runtime::CheckChangeGateRequest>,
    ) -> Result<Response<runtime::CheckChangeGateResponse>, Status> {
        let metadata = request.metadata().clone();
        let role = self.require_role(&metadata, Role::Writer).await?;
        let req = request.into_inner();

        let change_id = req.change_id.trim();
        if change_id.is_empty() {
            return Err(invalid_argument("change_id is required"));
        }

        // Empty/whitespace revision is normalised to None.
        let revision = req.revision.trim();
        let revision = if revision.is_empty() {
            None
        } else {
            Some(revision.to_string())
        };

        // Request-body actor wins over the auth-header-derived actor.
        let actor_header = Self::actor_from_metadata(&metadata, role);
        let actor = if req.actor.trim().is_empty() {
            actor_header
        } else {
            req.actor.trim().to_string()
        };

        // --- Convert the proto request into the engine's input types. ---
        let diffs = req
            .diffs
            .into_iter()
            .map(|diff| GateDiff {
                path: diff.path,
                lines_added: diff.lines_added,
                lines_deleted: diff.lines_deleted,
                novelty_score: diff.novelty_score,
                risk_tags: diff.risk_tags,
                // Empty proto string maps to "no component".
                component: if diff.component.is_empty() {
                    None
                } else {
                    Some(diff.component)
                },
            })
            .collect();

        let coverage = req
            .coverage
            .map(|report| GateCoverageReport {
                overall_ratio: report.overall_ratio,
                minimum_ratio: report.minimum_ratio,
                components: report
                    .components
                    .into_iter()
                    .map(|component| GateCoverageComponent {
                        name: component.name,
                        coverage_ratio: component.coverage_ratio,
                        required_ratio: component.required_ratio,
                    })
                    .collect(),
            })
            .unwrap_or_default();

        let evals = req
            .evals
            .into_iter()
            .map(|metric| GateEvalMetric {
                name: metric.name,
                slug: if metric.slug.is_empty() {
                    None
                } else {
                    Some(metric.slug)
                },
                score: metric.score,
                threshold: metric.threshold,
                // Unknown/unspecified direction values default to
                // higher-is-better.
                direction: match runtime::MetricDirection::try_from(metric.direction) {
                    Ok(runtime::MetricDirection::LowerIsBetter) => {
                        GateMetricDirection::LowerIsBetter
                    }
                    _ => GateMetricDirection::HigherIsBetter,
                },
                weight: metric.weight,
            })
            .collect();

        let budgets = req
            .budgets
            .into_iter()
            .map(|budget| GateBudgetSignal {
                name: budget.name,
                actual: budget.actual,
                limit: budget.limit,
            })
            .collect();

        let telemetry_value = struct_to_value(req.telemetry);
        let metadata_value = struct_to_value(req.metadata);

        let gate_request = GateRequest {
            change_id: change_id.to_string(),
            gate_id: None,
            revision: revision.clone(),
            diffs,
            coverage,
            evals,
            budgets,
            telemetry: telemetry_value.clone(),
            metadata: metadata_value.clone(),
            actor: Some(actor.clone()),
        };

        // Engine evaluation errors are surfaced as client errors.
        let mut decision = self
            .changeops
            .evaluate(&gate_request)
            .map_err(|err| invalid_argument(err.to_string()))?;

        // Assign the gate id only after a successful evaluation, and record
        // it on the request copy that gets persisted.
        let gate_id = Uuid::new_v4();
        let mut stored_request = gate_request.clone();
        stored_request.gate_id = Some(gate_id);

        let decision_payload = decision.to_value(&stored_request);
        let decision_bytes = serde_json::to_vec(&decision_payload).map_err(|err| {
            internal_error(format!("failed to serialise change gate decision: {err}"))
        })?;

        // Persist the full decision document as a JSON artifact.
        let artifact_meta = self
            .storage
            .artifacts()
            .put(
                Bytes::from(decision_bytes),
                "application/json",
                json!({
                    "kind": "change_gate_decision",
                    "change_id": change_id,
                    "effect": decision.effect.as_str(),
                    "gate_id": gate_id.to_string(),
                }),
            )
            .await
            .map_err(|err| {
                internal_error(format!("failed to persist change gate artifact: {err}"))
            })?;

        // Collect every attestation id referenced by the telemetry or the
        // decision payload, deduplicated via the set.
        let mut attestation_set: HashSet<Uuid> =
            attestation_ids_from_value(&stored_request.telemetry)
                .into_iter()
                .collect();
        attestation_set.extend(attestation_ids_from_value(&decision_payload));
        let attestation_refs: Vec<Uuid> = attestation_set.into_iter().collect();

        // Transparency publication only happens when there is something to
        // publish AND the configured scope covers gate decisions.
        if !attestation_refs.is_empty() && self.transparency_scope.includes_gates() {
            let artifact_sha = hex::encode(artifact_meta.sha256);
            if self.transparency_writer_enabled {
                // Preferred path: enqueue an async job for the background
                // transparency writer.
                let payload =
                    models::TransparencyJobPayload::ReleaseBundle(models::ReleaseBundlePayload {
                        change_id: change_id.to_string(),
                        gate_id,
                        artifact_sha256: artifact_sha.clone(),
                        attestation_ids: attestation_refs.clone(),
                        metadata: decision.metadata.clone(),
                    });
                match self.storage.enqueue_transparency_job(payload).await {
                    Ok(job_id) => {
                        // Record the queued state under metadata.scitt_entry,
                        // merging into an existing metadata object when there
                        // is one.
                        let queued = json!({
                            "status": "queued",
                            "job_id": job_id,
                            "artifact_sha256": artifact_sha,
                            "attestation_ids": attestation_refs.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
                        });
                        if let Value::Object(mut meta_obj) = decision.metadata.take() {
                            meta_obj.insert("scitt_entry".to_string(), queued);
                            decision.metadata = Value::Object(meta_obj);
                        } else {
                            decision.metadata = json!({ "scitt_entry": queued });
                        }
                    }
                    Err(err) => {
                        // Enqueue failed: fall back to publishing inline so
                        // the entry is not lost.
                        warn!(
                            change = change_id,
                            gate = %gate_id,
                            error = %err,
                            "failed to enqueue transparency job; falling back to synchronous publish"
                        );
                        self.publish_scitt_entry(
                            change_id,
                            gate_id,
                            &artifact_sha,
                            &attestation_refs,
                            &mut decision.metadata,
                        )
                        .await?;
                    }
                }
            } else {
                // Writer disabled: always publish synchronously.
                self.publish_scitt_entry(
                    change_id,
                    gate_id,
                    &artifact_sha,
                    &attestation_refs,
                    &mut decision.metadata,
                )
                .await?;
            }
        }

        // Persist the decision row, including the (possibly updated)
        // metadata and the full serialized input request.
        let stored_gate = self
            .storage
            .insert_change_gate(models::NewChangeGate {
                gate_id,
                change_id: change_id.to_string(),
                revision: revision.clone(),
                effect: decision.effect.as_str().to_string(),
                reasons: serde_json::to_value(&decision.reasons).map_err(|err| {
                    internal_error(format!("failed to serialise change gate reasons: {err}"))
                })?,
                followups: serde_json::to_value(&decision.followups).map_err(|err| {
                    internal_error(format!("failed to serialise change gate followups: {err}"))
                })?,
                scorecard: serde_json::to_value(&decision.scorecard).map_err(|err| {
                    internal_error(format!("failed to serialise change gate scorecard: {err}"))
                })?,
                decided_at: decision.decided_at,
                decided_by: Some(actor.clone()),
                metadata: decision.metadata.clone(),
                input: serde_json::to_value(&stored_request).map_err(|err| {
                    internal_error(format!("failed to serialise change gate request: {err}"))
                })?,
                telemetry: stored_request.telemetry.clone(),
                artifact_sha256: Some(artifact_meta.sha256),
            })
            .await
            .map_err(|err| {
                internal_error(format!("failed to persist change gate decision: {err}"))
            })?;

        // Audit failures are treated as hard errors here (unlike some other
        // handlers in this file that only log them).
        let audit_payload = decision_payload.clone();
        self.audit
            .append(&actor, None, "change_gate.check", &audit_payload)
            .await
            .map_err(|err| {
                internal_error(format!(
                    "failed to append audit trail for change gate: {err}"
                ))
            })?;

        tracing::info!(
            target: "fleetforge.changeops",
            gate = %gate_id,
            change = %change_id,
            effect = decision.effect.as_str(),
            reasons = ?decision.reasons,
            "change gate evaluated"
        );

        // A freshly created gate has no follow-ups yet, hence the empty slice.
        let proto_decision = build_proto_change_gate(
            stored_gate.gate_id,
            change_id,
            revision.as_deref(),
            &decision,
            &stored_gate.telemetry,
            &[],
        )?;

        Ok(Response::new(runtime::CheckChangeGateResponse {
            decision: Some(proto_decision),
        }))
    }
3833
3834 async fn record_gate_followup(
3835 &self,
3836 request: Request<runtime::RecordGateFollowupRequest>,
3837 ) -> Result<Response<runtime::RecordGateFollowupResponse>, Status> {
3838 let metadata = request.metadata().clone();
3839 let role = self.require_role(&metadata, Role::Writer).await?;
3840 let req = request.into_inner();
3841
3842 let gate_id = Uuid::parse_str(&req.gate_id)
3843 .map_err(|err| invalid_argument(format!("invalid gate_id: {err}")))?;
3844
3845 let gate = self
3846 .storage
3847 .fetch_change_gate(gate_id)
3848 .await
3849 .map_err(|err| internal_error(format!("failed to fetch change gate: {err}")))?
3850 .ok_or_else(|| Status::not_found("change gate not found"))?;
3851
3852 let actor_header = Self::actor_from_metadata(&metadata, role);
3853 let actor = if req.actor.trim().is_empty() {
3854 actor_header
3855 } else {
3856 req.actor.trim().to_string()
3857 };
3858
3859 let outcome = runtime::FollowupOutcome::try_from(req.outcome)
3860 .unwrap_or(runtime::FollowupOutcome::Approved);
3861 let outcome_gate = proto_followup_outcome(outcome);
3862
3863 let followup_id = Uuid::new_v4();
3864 let note = req.note;
3865 let details_value = struct_to_value(req.details);
3866
3867 let stored_followup = self
3868 .storage
3869 .insert_change_gate_followup(models::NewChangeGateFollowup {
3870 followup_id,
3871 gate_id,
3872 actor: actor.clone(),
3873 outcome: outcome_gate.as_str().to_string(),
3874 note: note.clone(),
3875 details: details_value.clone(),
3876 })
3877 .await
3878 .map_err(|err| {
3879 internal_error(format!("failed to persist change gate follow-up: {err}"))
3880 })?;
3881
3882 let record_payload = ChangeFollowupRecord {
3883 gate_id,
3884 actor: actor.clone(),
3885 outcome: outcome_gate,
3886 note: note.clone(),
3887 details: stored_followup.details.clone(),
3888 recorded_at: stored_followup.recorded_at,
3889 };
3890
3891 self.audit
3892 .append(
3893 &actor,
3894 None,
3895 "change_gate.followup",
3896 &record_payload.to_value(),
3897 )
3898 .await
3899 .map_err(|err| {
3900 internal_error(format!(
3901 "failed to append change gate follow-up audit: {err}"
3902 ))
3903 })?;
3904
3905 tracing::info!(
3906 target: "fleetforge.changeops",
3907 gate = %gate_id,
3908 followup = %followup_id,
3909 outcome = %stored_followup.outcome,
3910 "change gate follow-up recorded"
3911 );
3912
3913 let proto_followup = gate_followup_record_to_proto(&stored_followup);
3914 Ok(Response::new(runtime::RecordGateFollowupResponse {
3915 followup: Some(proto_followup),
3916 }))
3917 }
3918
3919 async fn list_change_gates(
3920 &self,
3921 request: Request<runtime::ListChangeGatesRequest>,
3922 ) -> Result<Response<runtime::ListChangeGatesResponse>, Status> {
3923 let metadata = request.metadata().clone();
3924 self.require_role(&metadata, Role::Reader).await?;
3925 let req = request.into_inner();
3926
3927 let change_id = req.change_id.trim();
3928 if change_id.is_empty() {
3929 return Err(invalid_argument("change_id is required"));
3930 }
3931
3932 let limit = if req.limit == 0 { 5 } else { req.limit.min(50) } as i64;
3933
3934 let gates = self
3935 .storage
3936 .list_change_gates_for_change(change_id, limit)
3937 .await
3938 .map_err(|err| {
3939 internal_error(format!("failed to fetch change gate decisions: {err}"))
3940 })?;
3941
3942 let mut decisions = Vec::with_capacity(gates.len());
3943 for gate in gates {
3944 let followups = self
3945 .storage
3946 .fetch_change_gate_followups(gate.gate_id)
3947 .await
3948 .map_err(|err| {
3949 internal_error(format!("failed to fetch change gate follow-ups: {err}"))
3950 })?;
3951
3952 let effect = gate_effect_from_str(&gate.effect);
3953 let reasons = parse_reasons(&gate.reasons);
3954 let followup_actions = parse_followup_actions(&gate.followups);
3955 let scorecard = parse_scorecard(&gate.scorecard);
3956
3957 let gate_decision = GateDecision {
3958 effect,
3959 reasons,
3960 followups: followup_actions,
3961 scorecard,
3962 decided_at: gate.decided_at,
3963 metadata: gate.metadata.clone(),
3964 };
3965
3966 let proto_decision = build_proto_change_gate(
3967 gate.gate_id,
3968 &gate.change_id,
3969 gate.revision.as_deref(),
3970 &gate_decision,
3971 &gate.telemetry,
3972 &followups,
3973 )?;
3974 decisions.push(proto_decision);
3975 }
3976
3977 Ok(Response::new(runtime::ListChangeGatesResponse {
3978 decisions,
3979 }))
3980 }
3981}
3982
/// Process entry point for the API server: resolves endpoints, CORS, and
/// transparency settings from the environment, optionally spawns the
/// background transparency writer, then serves either one combined
/// gRPC+HTTP listener or two separate listeners.
#[instrument(skip_all)]
pub async fn start_api_server(storage: Arc<Storage>, audit: Arc<Audit>) -> Result<()> {
    let auth = Arc::new(AuthHandle::from_env().await?);

    // Everything below is driven by environment configuration.
    let endpoints = resolve_endpoint_config()?;
    let cors_config = cors_config_from_env();
    let transparency = transparency_log_from_env(Arc::clone(&storage))?;
    let transparency_scope = transparency_scope_from_env();
    let transparency_writer_enabled = transparency_writer_enabled_flag();
    let writer_interval = transparency_writer_interval();

    // The writer runs only when the flag is on AND some scope is enabled.
    if transparency_writer_enabled && !matches!(transparency_scope, TransparencyScope::Disabled) {
        info!(
            "transparency writer enabled (interval={}s)",
            writer_interval.as_secs()
        );
        let writer_storage = Arc::clone(&storage);
        let writer_log = Arc::clone(&transparency);
        // Detached background task; it lives for the rest of the process.
        tokio::spawn(async move {
            let writer = TransparencyWriter::new(writer_storage, writer_log, writer_interval);
            writer.run().await;
        });
    }

    // Human-readable allowlist summary for the startup log line.
    let cors_summary = cors_config
        .as_ref()
        .map(|cfg| {
            if cfg.display.is_empty() {
                "[]".to_string()
            } else {
                cfg.display.join(", ")
            }
        })
        .unwrap_or_else(|| "disabled".to_string());

    info!(
        grpc_endpoint = %format!("http://{}", endpoints.grpc_addr),
        http_endpoint = %format!("http://{}", endpoints.http_addr),
        cors_allowlist = %cors_summary,
        "runtime API configuration ready"
    );

    if endpoints.combined {
        // One port: gRPC and HTTP multiplexed on a single listener.
        run_combined_server(
            endpoints.grpc_addr,
            storage,
            auth,
            audit,
            transparency,
            transparency_writer_enabled,
            transparency_scope,
            cors_config,
        )
        .await
    } else {
        // Two ports: run both servers concurrently; try_join! propagates the
        // first error from either server.
        let grpc_storage = Arc::clone(&storage);
        let http_storage = Arc::clone(&storage);
        let http_auth = Arc::clone(&auth);
        let grpc_transparency = Arc::clone(&transparency);
        tokio::try_join!(
            run_grpc_server(
                endpoints.grpc_addr,
                grpc_storage,
                Arc::clone(&auth),
                Arc::clone(&audit),
                grpc_transparency,
                transparency_writer_enabled,
                transparency_scope,
                cors_config.clone(),
            ),
            run_http_server(
                endpoints.http_addr,
                http_storage,
                http_auth,
                Arc::clone(&audit),
                cors_config,
            ),
        )?;
        Ok(())
    }
}
4065
/// Listener addresses resolved from the environment by
/// `resolve_endpoint_config`.
#[derive(Clone, Copy, Debug)]
struct EndpointConfig {
    // Address the gRPC server binds to.
    grpc_addr: SocketAddr,
    // Address the HTTP server binds to.
    http_addr: SocketAddr,
    // True when both protocols share a single listener (same address/port).
    combined: bool,
}
4072
4073fn resolve_endpoint_config() -> Result<EndpointConfig> {
4074 if let Ok(port_raw) = std::env::var("PORT") {
4075 let port: u16 = port_raw.parse().context("invalid PORT value")?;
4076 let addr = SocketAddr::from(([0, 0, 0, 0], port));
4077 return Ok(EndpointConfig {
4078 grpc_addr: addr,
4079 http_addr: addr,
4080 combined: true,
4081 });
4082 }
4083
4084 let grpc_addr: SocketAddr = std::env::var("FLEETFORGE_API_ADDR")
4085 .unwrap_or_else(|_| "0.0.0.0:50051".to_string())
4086 .parse()
4087 .context("invalid FLEETFORGE_API_ADDR")?;
4088 let http_addr: SocketAddr = std::env::var("FLEETFORGE_HTTP_ADDR")
4089 .unwrap_or_else(|_| "0.0.0.0:8080".to_string())
4090 .parse()
4091 .context("invalid FLEETFORGE_HTTP_ADDR")?;
4092
4093 Ok(EndpointConfig {
4094 grpc_addr,
4095 http_addr,
4096 combined: grpc_addr == http_addr,
4097 })
4098}
4099
/// Serves gRPC (native + gRPC-Web), health, reflection, and the axum HTTP
/// router on a SINGLE listener, dispatching each request to the gRPC stack
/// or the HTTP router based on `is_grpc_request`.
#[instrument(skip_all)]
async fn run_combined_server(
    addr: SocketAddr,
    storage: Arc<Storage>,
    auth: Arc<AuthHandle>,
    audit: Arc<Audit>,
    transparency: Arc<dyn TransparencyLog>,
    transparency_writer_enabled: bool,
    transparency_scope: TransparencyScope,
    cors: Option<CorsConfig>,
) -> Result<()> {
    let listener = TcpListener::bind(addr)
        .await
        .with_context(|| format!("failed to bind listener on {addr}"))?;

    // Attestations are served from the object store, indexed via the SQL pool.
    let attestation_vault: Arc<dyn AttestationVault> = Arc::new(ObjectStoreAttestationVault::new(
        storage.object_store(),
        storage.pool().clone(),
    ));
    let runtime_api = RuntimeApiService::new(
        Arc::clone(&storage),
        Arc::clone(&auth),
        Arc::clone(&audit),
        Arc::clone(&attestation_vault),
        Arc::clone(&transparency),
        transparency_writer_enabled,
        transparency_scope,
    );
    let tap_api = TapApiService::new(Arc::clone(&storage), Arc::clone(&auth));

    // Mark both concrete services plus the catch-all ("") as serving.
    // NOTE(review): the ".await; health_reporter" join below looks like a
    // rustfmt casualty — harmless, but worth reformatting in a cleanup pass.
    let (mut health_reporter, health_service) = health_reporter();
    health_reporter
        .set_service_status(
            "fleetforge.runtime.v1.RuntimeService",
            tonic_health::ServingStatus::Serving,
        )
        .await; health_reporter
        .set_service_status(
            "fleetforge.tap.v2.TapService",
            tonic_health::ServingStatus::Serving,
        )
        .await;
    health_reporter
        .set_service_status("", tonic_health::ServingStatus::Serving)
        .await;

    // Wrap both services for gRPC-Web so browser clients can reach them.
    let runtime_service = RuntimeServiceServer::new(runtime_api);
    let runtime_service = tonic_web::enable(runtime_service);
    let tap_service = TapServiceServer::new(tap_api);
    let tap_service = tonic_web::enable(tap_service);

    let reflection = ReflectionBuilder::configure()
        .register_encoded_file_descriptor_set(runtime::FILE_DESCRIPTOR_SET)
        .build()
        .context("failed to build reflection service")?;

    // CORS (when configured) is applied to the HTTP router only; the gRPC
    // side goes through tonic_web untouched.
    let cors_layer = cors.as_ref().map(|cfg| cfg.to_layer());
    let http_router = {
        let router = http::router(Arc::clone(&storage), Arc::clone(&auth), Arc::clone(&audit));
        match cors_layer.clone() {
            Some(layer) => router.layer(layer.clone()),
            None => router,
        }
    };
    let mut builder = Server::builder().accept_http1(true);

    let grpc_service = builder
        .add_service(health_service)
        .add_service(reflection)
        .add_service(runtime_service)
        .add_service(tap_service)
        .into_service();

    // Boxed so the per-connection closure below can clone it cheaply.
    let grpc_service = BoxCloneService::new(grpc_service);

    info!(%addr, "starting runtime API server (single listener)");

    // Per-connection service: route each request to gRPC or the HTTP router.
    let make_service = make_service_fn(move |_conn| {
        let http_router = http_router.clone();
        let grpc_service = grpc_service.clone();

        async move {
            Ok::<_, Infallible>(service_fn(move |req: HyperRequest<Body>| {
                let http_router = http_router.clone();
                let mut grpc_service = grpc_service.clone();

                async move {
                    if is_grpc_request(&req) {
                        tower::Service::call(&mut grpc_service, req).await
                    } else {
                        let router = http_router.clone();
                        let response = match router.oneshot(req).await {
                            Ok(resp) => resp,
                            // The router's error type is Infallible; this
                            // empty match proves the Err arm is unreachable.
                            Err(err) => match err {},
                        }
                        // Adapt the axum body error type to what the gRPC
                        // side of the mux produces.
                        .map(|body| {
                            body.map_err(|err| Status::internal(err.to_string()))
                                .boxed_unsync()
                        });
                        Ok::<_, Box<dyn std::error::Error + Send + Sync>>(response)
                    }
                }
            }))
        }
    });

    // Hand the already-bound tokio listener to hyper as a std listener.
    let std_listener = listener
        .into_std()
        .context("failed to convert listener for hyper server")?;
    HyperServer::from_tcp(std_listener)
        .context("failed to build runtime API server")?
        .serve(make_service)
        .await
        .context("runtime API server exited unexpectedly")
}
4216
4217fn is_grpc_request(req: &HyperRequest<Body>) -> bool {
4218 if let Some(content_type) = req
4219 .headers()
4220 .get(header::CONTENT_TYPE)
4221 .and_then(|value| value.to_str().ok())
4222 {
4223 let lower = content_type.to_ascii_lowercase();
4224 if lower.starts_with("application/grpc")
4225 || lower.starts_with("application/grpc-web")
4226 || lower.starts_with("application/connect+")
4227 {
4228 return true;
4229 }
4230 }
4231
4232 let path = req.uri().path();
4233 path.starts_with("/fleetforge.runtime.v1.")
4234 || path.starts_with("/fleetforge.tap.v2.")
4235 || path.starts_with("/grpc.health.v1.")
4236 || path.starts_with("/grpc.reflection.v1alpha.")
4237}
4238
4239#[instrument(skip_all)]
4240async fn run_grpc_server(
4241 addr: SocketAddr,
4242 storage: Arc<Storage>,
4243 auth: Arc<AuthHandle>,
4244 audit: Arc<Audit>,
4245 transparency: Arc<dyn TransparencyLog>,
4246 transparency_writer_enabled: bool,
4247 transparency_scope: TransparencyScope,
4248 cors: Option<CorsConfig>,
4249) -> Result<()> {
4250 let attestation_vault: Arc<dyn AttestationVault> = Arc::new(ObjectStoreAttestationVault::new(
4251 storage.object_store(),
4252 storage.pool().clone(),
4253 ));
4254 let runtime_api = RuntimeApiService::new(
4255 Arc::clone(&storage),
4256 Arc::clone(&auth),
4257 Arc::clone(&audit),
4258 Arc::clone(&attestation_vault),
4259 transparency,
4260 transparency_writer_enabled,
4261 transparency_scope,
4262 );
4263 let tap_api = TapApiService::new(storage, auth);
4264
4265 let (mut health_reporter, health_service) = health_reporter();
4266 let reflection = ReflectionBuilder::configure()
4267 .register_encoded_file_descriptor_set(runtime::FILE_DESCRIPTOR_SET)
4268 .build()
4269 .context("failed to build reflection service")?;
4270
4271 health_reporter
4272 .set_service_status(
4273 "fleetforge.runtime.v1.RuntimeService",
4274 tonic_health::ServingStatus::Serving,
4275 )
4276 .await; health_reporter
4278 .set_service_status(
4279 "fleetforge.tap.v2.TapService",
4280 tonic_health::ServingStatus::Serving,
4281 )
4282 .await;
4283 health_reporter
4284 .set_service_status("", tonic_health::ServingStatus::Serving)
4285 .await;
4286
4287 info!(%addr, "starting runtime gRPC API server");
4288
4289 let runtime_service = RuntimeServiceServer::new(runtime_api);
4290 let runtime_service = tonic_web::enable(runtime_service);
4291 let tap_service = TapServiceServer::new(tap_api);
4292 let tap_service = tonic_web::enable(tap_service);
4293 let cors_layer = cors.as_ref().map(|cfg| cfg.to_layer());
4294 Server::builder()
4295 .accept_http1(true)
4296 .add_service(health_service)
4297 .add_service(reflection)
4298 .add_service(runtime_service)
4299 .add_service(tap_service)
4300 .serve(addr)
4301 .await
4302 .context("runtime gRPC API server exited unexpectedly")
4303}
4304
4305#[instrument(skip_all)]
4306async fn run_http_server(
4307 addr: SocketAddr,
4308 storage: Arc<Storage>,
4309 auth: Arc<AuthHandle>,
4310 audit: Arc<Audit>,
4311 cors: Option<CorsConfig>,
4312) -> Result<()> {
4313 let listener = TcpListener::bind(addr)
4314 .await
4315 .with_context(|| format!("failed to bind HTTP listener on {addr}"))?;
4316
4317 let cors_layer = cors.as_ref().map(|cfg| cfg.to_layer());
4319 let router = http::router(storage, auth, audit);
4320 let app = match cors_layer {
4321 Some(layer) => router.layer(layer),
4322 None => router,
4323 };
4324
4325 info!(%addr, "starting runtime HTTP API server");
4326
4327 let std_listener = listener
4328 .into_std()
4329 .context("failed to convert HTTP listener for hyper server")?;
4330 HyperServer::from_tcp(std_listener)
4331 .context("failed to build runtime HTTP server")?
4332 .serve(app.into_make_service())
4333 .await
4334 .context("runtime HTTP API server exited unexpectedly")
4335}
4336
/// CORS allowlist parsed from `FLEETFORGE_CORS_ALLOW_ORIGINS`.
#[derive(Clone, Debug)]
struct CorsConfig {
    // Origins as header values, ready for `CorsLayer::allow_origin`.
    origins: Vec<HeaderValue>,
    // Human-readable copies of the same origins, used for startup logging.
    display: Vec<String>,
}
4342
4343impl CorsConfig {
4344 fn to_layer(&self) -> CorsLayer {
4345 CorsLayer::new()
4346 .allow_origin(self.origins.clone())
4347 .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
4348 .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION])
4349 }
4350}
4351
4352fn cors_config_from_env() -> Option<CorsConfig> {
4353 let raw = std::env::var("FLEETFORGE_CORS_ALLOW_ORIGINS").ok()?;
4354 let mut origins = Vec::new();
4355 let mut display = Vec::new();
4356 for origin in raw.split(',') {
4357 let trimmed = origin.trim();
4358 if trimmed.is_empty() {
4359 continue;
4360 }
4361 match HeaderValue::from_str(trimmed) {
4362 Ok(value) => {
4363 origins.push(value);
4364 display.push(trimmed.to_string());
4365 }
4366 Err(err) => {
4367 warn!(origin = %trimmed, %err, "invalid origin in FLEETFORGE_CORS_ALLOW_ORIGINS")
4368 }
4369 }
4370 }
4371
4372 if origins.is_empty() {
4373 warn!(
4374 "no valid origins parsed from FLEETFORGE_CORS_ALLOW_ORIGINS; CORS will remain disabled"
4375 );
4376 return None;
4377 }
4378
4379 Some(CorsConfig { origins, display })
4380}
4381
4382fn transparency_writer_enabled_flag() -> bool {
4383 let enabled = matches!(
4384 std::env::var(TRANSPARENCY_WRITER_FLAG)
4385 .ok()
4386 .map(|value| value.trim().to_ascii_lowercase())
4387 .unwrap_or_default()
4388 .as_str(),
4389 "1" | "true" | "yes" | "on"
4390 );
4391 if enabled && !feature_allowed(LicensedFeature::ScittTransparency) {
4392 warn!(
4393 "FLEETFORGE_TRANSPARENCY_WRITER=1 but the current license tier does not include SCITT publishing; disabling writer"
4394 );
4395 return false;
4396 }
4397 enabled
4398}
4399
4400fn transparency_scope_from_env() -> TransparencyScope {
4401 match std::env::var("FLEETFORGE_TRANSPARENCY_SCOPE")
4402 .ok()
4403 .map(|value| value.trim().to_ascii_lowercase())
4404 .as_deref()
4405 {
4406 Some("artifacts") => TransparencyScope::Artifacts,
4407 Some("runs") => TransparencyScope::Runs,
4408 Some("gates") => TransparencyScope::Gates,
4409 Some("disabled") => TransparencyScope::Disabled,
4410 _ => TransparencyScope::Gates,
4411 }
4412}
4413
4414fn transparency_writer_interval() -> Duration {
4415 let interval = std::env::var(TRANSPARENCY_WRITER_INTERVAL_ENV)
4416 .ok()
4417 .and_then(|value| value.parse::<u64>().ok())
4418 .unwrap_or(TRANSPARENCY_WRITER_DEFAULT_INTERVAL_SECS);
4419 Duration::from_secs(interval.max(TRANSPARENCY_WRITER_INTERVAL_MIN_SECS))
4420}
4421
4422fn parse_dag_json(input: &str) -> Result<Value, Status> {
4423 serde_json::from_str(input)
4424 .map_err(|err| invalid_argument(format!("invalid dag_json payload: {err}")))
4425}
4426
4427fn extract_priority(step_obj: &Map<String, Value>) -> i16 {
4428 step_obj
4429 .get("policy")
4430 .and_then(Value::as_object)
4431 .and_then(|policy| policy.get("priority"))
4432 .and_then(Value::as_i64)
4433 .unwrap_or(0)
4434 .clamp(i16::MIN as i64, i16::MAX as i64) as i16
4435}
4436
4437fn build_run_detail(
4438 run: models::Run,
4439 steps: Vec<models::Step>,
4440) -> Result<runtime::RunDetail, Status> {
4441 let (inputs_value, labels_map) = unpack_input_ctx(&run.input_ctx);
4442 let breakpoint_specs = extract_breakpoint_specs(&run.input_ctx);
4443 let inputs_struct = match &inputs_value {
4444 Value::Null => None,
4445 Value::Object(map) if map.is_empty() => None,
4446 Value::Object(_) => Some(json_to_prost_struct(&inputs_value).map_err(|err| {
4447 internal_error(format!("failed to encode inputs for response: {err}"))
4448 })?),
4449 _ => None,
4450 };
4451
4452 let dag_json = serde_json::to_string(&run.dag_json).map_err(|err| {
4453 internal_error(format!("failed to serialise dag_json for response: {err}"))
4454 })?;
4455
4456 let run_spec = runtime::RunSpec {
4457 dag_json,
4458 inputs: inputs_struct,
4459 seed: run.seed,
4460 labels: labels_map,
4461 breakpoints: breakpoint_specs,
4462 };
4463
4464 let step_summaries = steps
4465 .into_iter()
4466 .map(step_to_summary)
4467 .collect::<Result<Vec<_>, Status>>()?;
4468
4469 Ok(runtime::RunDetail {
4470 run_id: run.run_id.to_string(),
4471 status: map_run_status(run.status) as i32,
4472 spec: Some(run_spec),
4473 created_at: Some(to_timestamp(run.created_at)),
4474 updated_at: None,
4475 steps: step_summaries,
4476 })
4477}
4478
4479fn unpack_input_ctx(ctx: &Value) -> (Value, HashMap<String, String>) {
4480 match ctx {
4481 Value::Object(map) => {
4482 let inputs = map
4483 .get("inputs")
4484 .cloned()
4485 .unwrap_or_else(|| Value::Object(Map::new()));
4486 let labels = map
4487 .get("labels")
4488 .map(labels_value_to_map)
4489 .unwrap_or_default();
4490 (inputs, labels)
4491 }
4492 other => (other.clone(), HashMap::new()),
4493 }
4494}
4495
4496fn extract_breakpoint_specs(ctx: &Value) -> Vec<String> {
4497 let Value::Object(map) = ctx else {
4498 return Vec::new();
4499 };
4500 let Some(Value::Object(debug)) = map.get("debug") else {
4501 return Vec::new();
4502 };
4503
4504 if let Some(Value::Array(specs)) = debug.get("breakpoint_specs") {
4505 return specs
4506 .iter()
4507 .filter_map(|value| value.as_str().map(|s| s.to_string()))
4508 .collect();
4509 }
4510
4511 if let Some(Value::Array(entries)) = debug.get("breakpoints") {
4512 return entries
4513 .iter()
4514 .filter_map(|entry| {
4515 entry
4516 .get("spec")
4517 .and_then(Value::as_str)
4518 .map(|s| s.to_string())
4519 })
4520 .collect();
4521 }
4522
4523 Vec::new()
4524}
4525
4526fn load_debug_breakpoints(debug: &Map<String, Value>) -> Vec<(String, Uuid)> {
4527 debug
4528 .get("breakpoints")
4529 .and_then(Value::as_array)
4530 .map(|items| {
4531 items
4532 .iter()
4533 .filter_map(|entry| {
4534 let obj = entry.as_object()?;
4535 let spec = obj.get("spec")?.as_str()?.to_string();
4536 let step_id_str = obj.get("step_id")?.as_str()?;
4537 let step_id = Uuid::parse_str(step_id_str).ok()?;
4538 Some((spec, step_id))
4539 })
4540 .collect()
4541 })
4542 .unwrap_or_default()
4543}
4544
4545fn store_debug_breakpoints(debug: &mut Map<String, Value>, entries: &[(String, Uuid)]) {
4546 let list = entries
4547 .iter()
4548 .map(|(spec, step_id)| {
4549 let mut obj = Map::new();
4550 obj.insert("spec".to_string(), Value::String(spec.clone()));
4551 obj.insert("step_id".to_string(), Value::String(step_id.to_string()));
4552 Value::Object(obj)
4553 })
4554 .collect::<Vec<_>>();
4555 debug.insert("breakpoints".to_string(), Value::Array(list));
4556
4557 let specs = entries
4558 .iter()
4559 .map(|(spec, _)| Value::String(spec.clone()))
4560 .collect::<Vec<_>>();
4561 debug.insert("breakpoint_specs".to_string(), Value::Array(specs));
4562}
4563
4564fn load_slug_map(debug: &Map<String, Value>) -> HashMap<String, Uuid> {
4565 debug
4566 .get("slugs")
4567 .and_then(Value::as_object)
4568 .map(|map| {
4569 map.iter()
4570 .filter_map(|(slug, value)| {
4571 value
4572 .as_str()
4573 .and_then(|id| Uuid::parse_str(id).ok())
4574 .map(|uuid| (slug.clone(), uuid))
4575 })
4576 .collect()
4577 })
4578 .unwrap_or_default()
4579}
4580
4581fn store_slug_map(debug: &mut Map<String, Value>, entries: &[(String, Uuid)]) {
4582 let mut slugs = Map::new();
4583 for (slug, step_id) in entries {
4584 slugs.insert(slug.clone(), Value::String(step_id.to_string()));
4585 }
4586 debug.insert("slugs".to_string(), Value::Object(slugs));
4587}
4588
4589fn extract_active_pause_step_id(debug: &Map<String, Value>) -> Option<Uuid> {
4590 debug
4591 .get("active_pause")
4592 .and_then(|value| value.as_object())
4593 .and_then(|obj| obj.get("step_id"))
4594 .and_then(Value::as_str)
4595 .and_then(|s| Uuid::parse_str(s).ok())
4596}
4597
4598fn merge_json_object(target: &mut Map<String, Value>, patch: &Map<String, Value>) {
4599 for (key, value) in patch {
4600 target.insert(key.clone(), value.clone());
4601 }
4602}
4603
4604fn collect_step_ids(run: &models::Run) -> HashSet<Uuid> {
4605 run.dag_json
4606 .get("steps")
4607 .and_then(Value::as_array)
4608 .map(|steps| {
4609 steps
4610 .iter()
4611 .filter_map(|step| {
4612 step.get("id")
4613 .and_then(Value::as_str)
4614 .and_then(|s| Uuid::parse_str(s).ok())
4615 })
4616 .collect()
4617 })
4618 .unwrap_or_default()
4619}
4620
4621fn collect_slug_entries(run: &models::Run) -> Vec<(String, Uuid)> {
4622 run.dag_json
4623 .get("steps")
4624 .and_then(Value::as_array)
4625 .map(|steps| {
4626 steps
4627 .iter()
4628 .filter_map(|step| {
4629 let id = step
4630 .get("id")
4631 .and_then(Value::as_str)
4632 .and_then(|s| Uuid::parse_str(s).ok())?;
4633 let slug = step
4634 .get("slug")
4635 .and_then(Value::as_str)
4636 .map(|s| s.trim().to_string())
4637 .filter(|s| !s.is_empty())?;
4638 Some((slug, id))
4639 })
4640 .collect()
4641 })
4642 .unwrap_or_default()
4643}
4644
4645fn build_branch_dag(dag_json: &Value, start: Uuid) -> Result<Value, SubmitRunError> {
4646 let dag_map = dag_json
4647 .as_object()
4648 .cloned()
4649 .ok_or_else(|| SubmitRunError::invalid("dag_json must be a JSON object"))?;
4650 let steps_array = dag_map
4651 .get("steps")
4652 .and_then(Value::as_array)
4653 .ok_or_else(|| SubmitRunError::invalid("dag_json.steps must be an array"))?;
4654 let edges_array = dag_map
4655 .get("edges")
4656 .and_then(Value::as_array)
4657 .ok_or_else(|| SubmitRunError::invalid("dag_json.edges must be an array"))?;
4658
4659 let mut step_defs: HashMap<Uuid, Value> = HashMap::new();
4660 for step in steps_array {
4661 let step_obj = step
4662 .as_object()
4663 .ok_or_else(|| SubmitRunError::invalid("each step must be an object"))?;
4664 let id = step_obj
4665 .get("id")
4666 .and_then(Value::as_str)
4667 .ok_or_else(|| SubmitRunError::invalid("step.id missing"))?;
4668 let step_id = Uuid::parse_str(id)
4669 .map_err(|err| SubmitRunError::invalid(format!("invalid step.id: {err}")))?;
4670 step_defs.insert(step_id, step.clone());
4671 }
4672
4673 if !step_defs.contains_key(&start) {
4674 return Err(SubmitRunError::invalid("from_step references unknown step"));
4675 }
4676
4677 let mut adjacency: HashMap<Uuid, Vec<Uuid>> = HashMap::new();
4678 for edge in edges_array {
4679 let pair = edge
4680 .as_array()
4681 .ok_or_else(|| SubmitRunError::invalid("edges entries must be arrays"))?;
4682 if pair.len() != 2 {
4683 return Err(SubmitRunError::invalid(
4684 "edges entries must contain two members",
4685 ));
4686 }
4687 let from = pair[0]
4688 .as_str()
4689 .ok_or_else(|| SubmitRunError::invalid("edge source must be a string"))?;
4690 let to = pair[1]
4691 .as_str()
4692 .ok_or_else(|| SubmitRunError::invalid("edge target must be a string"))?;
4693 let from_id = Uuid::parse_str(from)
4694 .map_err(|err| SubmitRunError::invalid(format!("invalid edge source: {err}")))?;
4695 let to_id = Uuid::parse_str(to)
4696 .map_err(|err| SubmitRunError::invalid(format!("invalid edge target: {err}")))?;
4697 adjacency.entry(from_id).or_default().push(to_id);
4698 }
4699
4700 let mut queue = VecDeque::new();
4701 let mut visited: HashSet<Uuid> = HashSet::new();
4702 queue.push_back(start);
4703 while let Some(node) = queue.pop_front() {
4704 if !visited.insert(node) {
4705 continue;
4706 }
4707 if let Some(neighbors) = adjacency.get(&node) {
4708 for next in neighbors {
4709 queue.push_back(*next);
4710 }
4711 }
4712 }
4713
4714 let filtered_steps = steps_array
4715 .iter()
4716 .filter_map(|step| {
4717 let id = step
4718 .get("id")
4719 .and_then(Value::as_str)
4720 .and_then(|s| Uuid::parse_str(s).ok());
4721 match id {
4722 Some(step_id) if visited.contains(&step_id) => Some(step.clone()),
4723 _ => None,
4724 }
4725 })
4726 .collect::<Vec<_>>();
4727
4728 let filtered_edges = edges_array
4729 .iter()
4730 .filter(|edge| {
4731 if let Some(pair) = edge.as_array() {
4732 if pair.len() == 2 {
4733 if let (Some(from), Some(to)) = (
4734 pair[0].as_str().and_then(|s| Uuid::parse_str(s).ok()),
4735 pair[1].as_str().and_then(|s| Uuid::parse_str(s).ok()),
4736 ) {
4737 return visited.contains(&from) && visited.contains(&to);
4738 }
4739 }
4740 }
4741 false
4742 })
4743 .cloned()
4744 .collect::<Vec<_>>();
4745
4746 let mut new_dag = dag_json.as_object().cloned().unwrap_or_default();
4747 new_dag.insert("steps".to_string(), Value::Array(filtered_steps));
4748 new_dag.insert("edges".to_string(), Value::Array(filtered_edges));
4749 Ok(Value::Object(new_dag))
4750}
4751
4752fn collect_prompt_packs_json(dag_json: &Value) -> Vec<String> {
4753 match dag_json {
4754 Value::Object(map) => {
4755 let packs = map.get("prompt_packs").or_else(|| map.get("promptPacks"));
4756 match packs {
4757 Some(Value::Array(items)) => items
4758 .iter()
4759 .filter_map(|item| item.as_str().map(|s| s.to_string()))
4760 .collect(),
4761 Some(Value::String(value)) => vec![value.clone()],
4762 _ => Vec::new(),
4763 }
4764 }
4765 _ => Vec::new(),
4766 }
4767}
4768
// Result of comparing a recorded step payload against a re-executed one.
struct DriftComputation {
    // Difference in token usage (current - recorded); None when either side
    // has no recognisable token field.
    token_delta: Option<f64>,
    // Difference in cost (current - recorded); None when either side has no
    // recognisable cost field.
    cost_delta: Option<f64>,
    // True when numeric deltas are within tolerance AND the payloads are
    // structurally equal after budget fields are stripped.
    within_tolerance: bool,
}

// Default tolerances for drift checks. NOTE(review): not referenced in this
// section of the file — presumably consumed by callers elsewhere; confirm.
const DEFAULT_TOKEN_TOLERANCE: f64 = 0.0;
const DEFAULT_COST_TOLERANCE: f64 = 0.0;
// Field names probed (recursively) when looking for token counts.
const TOKEN_KEYS: &[&str] = &["tokens", "token_count", "total_tokens"];
// Field names probed (recursively) when looking for cost figures.
const COST_KEYS: &[&str] = &["cost", "cost_usd", "total_cost"];
// Union of the budget fields removed before structural comparison.
const SANITIZE_KEYS: &[&str] = &[
    "tokens",
    "token_count",
    "total_tokens",
    "cost",
    "cost_usd",
    "total_cost",
];
4787
4788fn wrap_attempt_output(attempt: Option<&models::StepAttempt>, output: &Value) -> Value {
4789 match attempt {
4790 Some(record) => json!({
4791 "attempt": record.attempt,
4792 "status": record.status.as_str(),
4793 "recorded_at": record.recorded_at,
4794 "output": output,
4795 }),
4796 None => output.clone(),
4797 }
4798}
4799
4800fn compute_drift(
4801 recorded: &Value,
4802 current: &Value,
4803 token_tolerance: f64,
4804 cost_tolerance: f64,
4805) -> DriftComputation {
4806 let token_delta = diff_numeric(recorded, current, TOKEN_KEYS);
4807 let cost_delta = diff_numeric(recorded, current, COST_KEYS);
4808
4809 let sanitized_recorded = strip_keys(recorded, SANITIZE_KEYS);
4810 let sanitized_current = strip_keys(current, SANITIZE_KEYS);
4811
4812 let tokens_within = token_delta
4813 .map(|delta| delta.abs() <= token_tolerance)
4814 .unwrap_or(true);
4815 let cost_within = cost_delta
4816 .map(|delta| delta.abs() <= cost_tolerance)
4817 .unwrap_or(true);
4818 let structure_equal = sanitized_recorded == sanitized_current;
4819
4820 DriftComputation {
4821 token_delta,
4822 cost_delta,
4823 within_tolerance: tokens_within && cost_within && structure_equal,
4824 }
4825}
4826
4827fn diff_numeric(recorded: &Value, current: &Value, keys: &[&str]) -> Option<f64> {
4828 let recorded_value = find_numeric(recorded, keys)?;
4829 let current_value = find_numeric(current, keys)?;
4830 Some(current_value - recorded_value)
4831}
4832
4833fn find_numeric(value: &Value, keys: &[&str]) -> Option<f64> {
4834 match value {
4835 Value::Object(map) => {
4836 for key in keys {
4837 if let Some(v) = map.get(*key) {
4838 if let Some(num) = v.as_f64() {
4839 return Some(num);
4840 }
4841 }
4842 }
4843 for v in map.values() {
4844 if let Some(found) = find_numeric(v, keys) {
4845 return Some(found);
4846 }
4847 }
4848 None
4849 }
4850 Value::Array(items) => {
4851 for item in items {
4852 if let Some(found) = find_numeric(item, keys) {
4853 return Some(found);
4854 }
4855 }
4856 None
4857 }
4858 _ => None,
4859 }
4860}
4861
4862fn strip_keys(value: &Value, keys: &[&str]) -> Value {
4863 match value {
4864 Value::Object(map) => {
4865 let mut new_map = serde_json::Map::new();
4866 for (k, v) in map {
4867 if keys.iter().any(|key| key == k) {
4868 continue;
4869 }
4870 new_map.insert(k.clone(), strip_keys(v, keys));
4871 }
4872 Value::Object(new_map)
4873 }
4874 Value::Array(items) => Value::Array(items.iter().map(|v| strip_keys(v, keys)).collect()),
4875 _ => value.clone(),
4876 }
4877}
4878
4879fn step_to_summary(step: models::Step) -> Result<runtime::StepSummary, Status> {
4880 let step_type = step
4881 .spec_json
4882 .get("type")
4883 .and_then(Value::as_str)
4884 .unwrap_or_default()
4885 .to_string();
4886
4887 let output = match step.output_json {
4888 Some(value) => Some(json_to_prost_value(&value).map_err(|err| {
4889 internal_error(format!(
4890 "invalid output payload for step {}: {err}",
4891 step.step_id
4892 ))
4893 })?),
4894 None => None,
4895 };
4896
4897 let error = match step.error_json {
4898 Some(value) => Some(json_to_prost_value(&value).map_err(|err| {
4899 internal_error(format!(
4900 "invalid error payload for step {}: {err}",
4901 step.step_id
4902 ))
4903 })?),
4904 None => None,
4905 };
4906
4907 Ok(runtime::StepSummary {
4908 step_id: step.step_id.to_string(),
4909 idx: step.idx,
4910 r#type: step_type,
4911 status: step.status.as_str().to_string(),
4912 attempt: step.attempt,
4913 leased_at: None,
4914 completed_at: None,
4915 output,
4916 error,
4917 })
4918}
4919
4920fn run_event_from_outbox(event: models::OutboxEvent) -> Result<runtime::RunEvent, Status> {
4921 let payload = json_to_prost_value(&event.payload).map_err(|err| {
4922 internal_error(format!(
4923 "failed to encode outbox payload for event {}: {err}",
4924 event.id
4925 ))
4926 })?;
4927
4928 Ok(runtime::RunEvent {
4929 run_id: event.run_id.to_string(),
4930 step_id: event.step_id.map(|id| id.to_string()).unwrap_or_default(),
4931 kind: event.kind,
4932 payload: Some(payload),
4933 occurred_at: Some(to_timestamp(event.created_at)),
4934 })
4935}
4936
// Maps a storage-layer run status onto its proto counterpart.
// The match is deliberately exhaustive (no `_` arm): adding a new
// `models::RunStatus` variant must fail to compile here until a proto
// mapping is chosen.
fn map_run_status(status: models::RunStatus) -> runtime::RunStatus {
    match status {
        models::RunStatus::Pending => runtime::RunStatus::Pending,
        models::RunStatus::Running => runtime::RunStatus::Running,
        models::RunStatus::Succeeded => runtime::RunStatus::Succeeded,
        models::RunStatus::Failed => runtime::RunStatus::Failed,
        models::RunStatus::Canceled => runtime::RunStatus::Canceled,
        models::RunStatus::PausedAtStep => runtime::RunStatus::PausedAtStep,
    }
}
4947
#[derive(Clone, Debug)]
// Parsed form of an eval scenario spec (see `parse_eval_spec`).
pub(crate) struct EvalScenarioSpecData {
    // Pack the scenario belongs to (required in the source JSON).
    pack: String,
    // Scenario slug within the pack (required in the source JSON).
    slug: String,
    // Optional human-readable title; callers fall back to the slug.
    title: Option<String>,
    // Optional free-text description.
    description: Option<String>,
    // Run spec payload to execute; guaranteed to be a JSON object by the parser.
    run_spec: Value,
    // Expected results payload; guaranteed to be a JSON object by the parser.
    expected: Value,
}
4957
4958pub(crate) fn parse_eval_spec(spec: &Value) -> anyhow::Result<EvalScenarioSpecData> {
4959 let pack = spec
4960 .get("pack")
4961 .and_then(Value::as_str)
4962 .ok_or_else(|| anyhow!("scenario spec missing 'pack' field"))?
4963 .to_string();
4964 let slug = spec
4965 .get("slug")
4966 .and_then(Value::as_str)
4967 .ok_or_else(|| anyhow!("scenario spec missing 'slug' field"))?
4968 .to_string();
4969 let title = spec
4970 .get("title")
4971 .and_then(Value::as_str)
4972 .map(|value| value.to_string());
4973 let description = spec
4974 .get("description")
4975 .and_then(Value::as_str)
4976 .map(|value| value.to_string());
4977 let run_spec = spec
4978 .get("run_spec")
4979 .cloned()
4980 .ok_or_else(|| anyhow!("scenario spec missing 'run_spec' payload"))?;
4981 if !run_spec.is_object() {
4982 return Err(anyhow!("scenario spec 'run_spec' must be a JSON object"));
4983 }
4984 let expected = spec
4985 .get("expected")
4986 .cloned()
4987 .ok_or_else(|| anyhow!("scenario spec missing 'expected' payload"))?;
4988 if !expected.is_object() {
4989 return Err(anyhow!("scenario spec 'expected' must be a JSON object"));
4990 }
4991
4992 Ok(EvalScenarioSpecData {
4993 pack,
4994 slug,
4995 title,
4996 description,
4997 run_spec,
4998 expected,
4999 })
5000}
5001
5002fn metric_summary(current: f64, previous: Option<f64>) -> Value {
5003 let delta = previous.map(|p| current - p);
5004 json!({
5005 "value": current,
5006 "previous": previous,
5007 "delta": delta,
5008 })
5009}
5010
5011fn extract_metric(metrics: &Value, key: &str) -> Option<f64> {
5012 metrics
5013 .as_object()
5014 .and_then(|map| map.get(key))
5015 .and_then(|entry| entry.get("value"))
5016 .and_then(Value::as_f64)
5017}
5018
// Computes the outcome string and metrics JSON for one eval scenario run.
//
// Compares the finished `run` and its `steps` against `spec.expected`
// (status plus per-step outputs), derives token/cost/latency figures with
// deltas against `previous_metrics`, and returns ("passed"|"failed", metrics).
fn compute_eval_metrics(
    spec: &EvalScenarioSpecData,
    run: &models::Run,
    steps: &[models::Step],
    last_event_time: Option<DateTime<Utc>>,
    previous_metrics: Option<&Value>,
    commit: Option<String>,
) -> anyhow::Result<(String, Value)> {
    // Prefer actual usage; fall back to estimates for steps that never reported.
    let tokens_actual: f64 = steps
        .iter()
        .map(|step| {
            step.actual_tokens
                .map(|value| value as f64)
                .unwrap_or_else(|| step.estimated_tokens as f64)
        })
        .sum();
    let cost_actual: f64 = steps
        .iter()
        .map(|step| step.actual_cost.unwrap_or(step.estimated_cost))
        .sum();

    // Without a terminal event, run_end == created_at and latency is 0.
    let run_end = last_event_time.unwrap_or(run.created_at);
    let latency_ms = run_end
        .signed_duration_since(run.created_at)
        .num_milliseconds()
        .max(0) as f64;

    let previous_tokens = previous_metrics.and_then(|metrics| extract_metric(metrics, "tokens"));
    let previous_cost = previous_metrics.and_then(|metrics| extract_metric(metrics, "cost_usd"));
    let previous_latency =
        previous_metrics.and_then(|metrics| extract_metric(metrics, "latency_ms"));

    // Collect every expected-vs-actual mismatch; empty list means "passed".
    let mut differences: Vec<Value> = Vec::new();
    if let Some(expected_status) = spec.expected.get("status").and_then(Value::as_str) {
        let actual_status = run.status.as_str();
        if !expected_status.eq_ignore_ascii_case(actual_status) {
            differences.push(json!({
                "type": "status",
                "expected": expected_status,
                "actual": actual_status,
            }));
        }
    }

    // Index the actual step outputs by step id for the comparison below.
    let mut actual_steps: HashMap<String, Value> = HashMap::with_capacity(steps.len());
    for step in steps {
        if let Some(output) = step.output_json.clone() {
            actual_steps.insert(step.step_id.to_string(), output);
        }
    }

    if let Some(expected_steps) = spec.expected.get("steps").and_then(Value::as_object) {
        for (step_id, expected_output) in expected_steps {
            match actual_steps.get(step_id) {
                // Exact match: nothing to record.
                Some(actual_output) if actual_output == expected_output => {}
                Some(actual_output) => {
                    differences.push(json!({
                        "type": "step",
                        "step_id": step_id,
                        "expected": expected_output,
                        "actual": actual_output,
                    }));
                }
                None => {
                    differences.push(json!({
                        "type": "step_missing",
                        "step_id": step_id,
                        "expected": expected_output,
                    }));
                }
            }
        }
    }

    let quality_matched = differences.is_empty();
    let outcome = if quality_matched { "passed" } else { "failed" }.to_string();

    // Assemble the metrics object; an empty commit string is omitted entirely.
    let mut metrics_map = Map::new();
    if let Some(commit) = commit {
        if !commit.is_empty() {
            metrics_map.insert("commit".to_string(), Value::String(commit));
        }
    }
    metrics_map.insert(
        "tokens".to_string(),
        metric_summary(tokens_actual, previous_tokens),
    );
    metrics_map.insert(
        "cost_usd".to_string(),
        metric_summary(cost_actual, previous_cost),
    );
    metrics_map.insert(
        "latency_ms".to_string(),
        metric_summary(latency_ms, previous_latency),
    );
    metrics_map.insert(
        "quality".to_string(),
        Value::Object({
            let mut quality = Map::new();
            quality.insert("matched".to_string(), Value::Bool(quality_matched));
            quality.insert("differences".to_string(), Value::Array(differences));
            quality
        }),
    );

    Ok((outcome, Value::Object(metrics_map)))
}
5126
5127fn build_eval_result_proto(result: models::EvalResult) -> Result<runtime::EvalResult, Status> {
5128 let metrics_struct = json_to_prost_struct(&result.metrics)
5129 .map_err(|err| internal_error(format!("invalid eval metrics payload: {err}")))?;
5130 let commit = result
5131 .metrics
5132 .get("commit")
5133 .and_then(Value::as_str)
5134 .unwrap_or("")
5135 .to_string();
5136
5137 Ok(runtime::EvalResult {
5138 result_id: result.result_id.to_string(),
5139 run_id: result.run_id.to_string(),
5140 scenario_id: result.scenario_id.to_string(),
5141 outcome: result.outcome,
5142 commit,
5143 executed_at: Some(to_timestamp(result.executed_at)),
5144 metrics: Some(metrics_struct),
5145 })
5146}
5147
5148fn build_eval_scenario_proto(
5149 scenario: &models::EvalScenario,
5150 spec: &EvalScenarioSpecData,
5151 history: Vec<runtime::EvalResult>,
5152) -> Result<runtime::EvalScenario, Status> {
5153 Ok(runtime::EvalScenario {
5154 scenario_id: scenario.scenario_id.to_string(),
5155 pack: spec.pack.clone(),
5156 slug: spec.slug.clone(),
5157 title: spec.title.clone().unwrap_or_else(|| spec.slug.clone()),
5158 description: spec.description.clone().unwrap_or_default(),
5159 created_at: Some(to_timestamp(scenario.created_at)),
5160 history,
5161 })
5162}
5163
// Unit coverage for the pure helper functions defined in this file.
#[cfg(test)]
mod unit_tests {
    use super::*;
    use chrono::Utc;
    use fleetforge_common::prost_json::prost_value_to_json;
    use fleetforge_storage::models;
    use serde_json::{json, Map, Value};
    use std::collections::{HashMap, VecDeque};
    use std::time::Duration;
    use tonic::Code;
    use uuid::Uuid;

    // Empty and "0" tokens both mean "start from offset 0"; garbage is rejected.
    #[test]
    fn parse_page_token_supports_defaults() {
        assert_eq!(parse_page_token("").expect("empty token"), 0);
        assert_eq!(parse_page_token("0").expect("zero token"), 0);
        assert!(parse_page_token("abc").is_err());
    }

    // Filter construction lowercases kinds/workspace ids and decodes enums.
    #[test]
    fn query_filters_normalizes_inputs() {
        let run_id = Uuid::new_v4();
        let mut request = QueryEventsRequest::default();
        request.run_ids.push(run_id.to_string());
        request.kinds.push("Step_Succeeded".to_string());
        request.workspace_ids.push("Workspace-1".to_string());
        request
            .policy_effects
            .push(PolicyDecisionEffect::PolicyDecisionEffectDeny as i32);
        request.severities.push(EventSeverity::Error as i32);

        let filters = QueryFiltersOwned::from_request(&request).expect("filters");
        assert_eq!(filters.run_ids, vec![run_id]);
        assert_eq!(filters.kinds, vec!["step_succeeded"]);
        assert_eq!(filters.workspace_ids, vec!["workspace-1"]);
        assert_eq!(
            filters.policy_effects,
            vec![PolicyDecisionEffect::PolicyDecisionEffectDeny]
        );
        assert_eq!(filters.severities, vec![EventSeverity::Error]);
    }

    // Workspace matching is case-insensitive; mismatches are rejected.
    #[test]
    fn matches_filters_respects_workspace_and_kind() {
        let run_id = Uuid::new_v4();
        let mut request = QueryEventsRequest::default();
        request.run_ids.push(run_id.to_string());
        request.workspace_ids.push("tenant-x".to_string());
        request.kinds.push("run_succeeded".to_string());
        let filters = QueryFiltersOwned::from_request(&request).expect("filters");

        let payload = json!({"trace": {"run": {"workspace_id": "Tenant-X"}}});
        let kind = "run_succeeded";
        let severity = EventSeverity::Info;
        assert!(
            matches_filters(&filters, &payload, kind, severity, None),
            "matching workspace should pass filter"
        );

        let other_payload = json!({"trace": {"run": {"workspace_id": "tenant-y"}}});
        assert!(
            !matches_filters(&filters, &other_payload, kind, severity, None),
            "non-matching workspace should fail filter"
        );
    }

    // Valid JSON parses; invalid JSON surfaces as InvalidArgument.
    #[test]
    fn parse_dag_json_round_trip() {
        let dag = r#"{"steps": []}"#;
        let parsed = parse_dag_json(dag).expect("valid JSON");
        assert_eq!(parsed["steps"], Value::Array(vec![]));

        let err = parse_dag_json("not valid json").expect_err("should reject invalid JSON");
        assert_eq!(err.code(), Code::InvalidArgument);
    }

    // Values above i16::MAX saturate rather than wrap.
    #[test]
    fn extract_priority_clamps_to_i16() {
        let mut policy = Map::new();
        policy.insert("priority".to_string(), Value::from(40_000));
        let mut step = Map::new();
        step.insert("policy".to_string(), Value::Object(policy));
        let priority = extract_priority(&step);
        assert_eq!(priority, i16::MAX);
    }

    // A step without a policy object yields the default priority.
    #[test]
    fn extract_priority_defaults_to_zero() {
        let step = Map::new();
        let priority = extract_priority(&step);
        assert_eq!(priority, 0);
    }

    // Values below i16::MIN saturate at the negative bound.
    #[test]
    fn extract_priority_handles_negative_overflow() {
        let mut policy = Map::new();
        policy.insert("priority".to_string(), Value::from(-50_000));
        let mut step = Map::new();
        step.insert("policy".to_string(), Value::Object(policy));
        let priority = extract_priority(&step);
        assert_eq!(priority, i16::MIN);
    }

    // labels_map_to_value / labels_value_to_map are inverses.
    #[test]
    fn labels_map_round_trip() {
        let mut map = HashMap::new();
        map.insert("team".to_string(), "infra".to_string());
        map.insert("env".to_string(), "dev".to_string());
        let value = labels_map_to_value(&map);
        let round_trip = labels_value_to_map(&value);
        assert_eq!(map, round_trip);
    }

    // End-to-end check of build_run_detail: spec fields, labels and the
    // prost-encoded step output all survive conversion.
    #[test]
    fn build_run_detail_converts_payloads() {
        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();
        let now = Utc::now();

        let run = models::Run {
            run_id,
            created_at: now,
            status: models::RunStatus::Pending,
            dag_json: json!({"steps": []}),
            input_ctx: json!({
                "inputs": {"foo": "bar"},
                "labels": {"team": "infra"}
            }),
            seed: 7,
            idempotency_key: Some("abc".into()),
        };

        let step = models::Step {
            run_id,
            step_id,
            idx: 0,
            priority: 0,
            spec_json: json!({"id": step_id, "type": "tool", "inputs": {}, "policy": {}}),
            status: models::StepStatus::Succeeded,
            attempt: 1,
            created_at: now,
            pending_dependencies: 0,
            total_dependencies: 0,
            estimated_tokens: 0,
            estimated_cost: 0.0,
            actual_tokens: Some(10),
            actual_cost: Some(0.01),
            leased_by: None,
            lease_expires_at: None,
            output_json: Some(json!({"answer": 42})),
            error_json: Some(json!({"message": "none"})),
            input_snapshot: None,
            output_snapshot: None,
            provider: Some("test".into()),
            provider_version: Some("1.0".into()),
        };

        let detail = build_run_detail(run, vec![step]).expect("build run detail");
        let spec = detail.spec.expect("spec present");
        assert_eq!(spec.dag_json, r#"{"steps":[]}"#);
        assert_eq!(spec.seed, 7);
        assert_eq!(spec.labels.get("team"), Some(&"infra".to_string()));

        let step_summary = &detail.steps[0];
        assert_eq!(step_summary.step_id, step_id.to_string());
        assert_eq!(step_summary.status, "succeeded");
        let output_value = prost_value_to_json(step_summary.output.as_ref().unwrap());
        assert_eq!(output_value, json!({"answer": 42}));
    }

    // SubscribeRunOpen fields map 1:1 onto the parsed SubscriptionConfig.
    #[test]
    fn subscription_config_parses_filters() {
        let run_id = Uuid::new_v4().to_string();
        let step_id = Uuid::new_v4().to_string();
        let open = SubscribeRunOpen {
            run_ids: vec![run_id.clone()],
            step_ids: vec![step_id.clone()],
            kinds: vec!["step_succeeded".to_string()],
            severities: vec![EventSeverity::Warn as i32],
            tags: vec!["team:core".to_string()],
            since_offset: 10,
            batch_size: 32,
            max_inflight: 20,
            heartbeat_interval_ms: 5_000,
        };

        let config = SubscriptionConfig::try_from_open(open).expect("valid config");
        assert_eq!(config.run_id.to_string(), run_id);
        assert!(config
            .step_ids
            .as_ref()
            .expect("step filter")
            .contains(&Uuid::parse_str(&step_id).unwrap()));
        assert!(config
            .kinds
            .as_ref()
            .expect("kind filter")
            .contains("step_succeeded"));
        assert!(config
            .severities
            .as_ref()
            .expect("severity filter")
            .contains(&EventSeverity::Warn));
        assert!(config
            .tags
            .as_ref()
            .expect("tag filter")
            .contains("team:core"));
        assert_eq!(config.since_offset, 10);
        assert_eq!(config.batch_size, 32);
        assert_eq!(config.max_inflight, 20);
        assert_eq!(config.heartbeat.as_millis(), 5_000);
    }

    // Acks evict all inflight offsets <= the acked value; stale acks are no-ops.
    #[test]
    fn handle_ack_evicts_inflight_offsets() {
        let mut inflight = VecDeque::from(vec![5, 9, 12]);
        let mut last_ack = 0;
        handle_ack(&mut inflight, &mut last_ack, 9);
        assert_eq!(last_ack, 9);
        assert_eq!(inflight, VecDeque::from(vec![12]));

        handle_ack(&mut inflight, &mut last_ack, 8);
        assert_eq!(last_ack, 9, "ack lower than last should be ignored");
        assert_eq!(inflight, VecDeque::from(vec![12]));

        handle_ack(&mut inflight, &mut last_ack, 15);
        assert_eq!(last_ack, 15);
        assert!(inflight.is_empty());
    }

    // Tag and severity filters admit matching events and reject mismatches.
    #[test]
    fn passes_filters_applies_tags_and_severities() {
        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();
        let base_config = SubscriptionConfig {
            run_id,
            step_ids: Some(vec![step_id].into_iter().collect()),
            kinds: Some(vec!["step_succeeded".to_string()].into_iter().collect()),
            severities: Some(vec![EventSeverity::Info].into_iter().collect()),
            tags: Some(vec!["severity:info".to_string()].into_iter().collect()),
            cursor: None,
            since_offset: 0,
            batch_size: 16,
            max_inflight: 8,
            heartbeat: Duration::from_millis(1_000),
        };

        let event = models::OutboxEvent {
            id: 1,
            run_id,
            step_id: Some(step_id),
            kind: "STEP_SUCCEEDED".to_string(),
            payload: json!({
                "status": "succeeded"
            }),
            created_at: Utc::now(),
            published_at: None,
        };

        let severity = derive_severity(&event.kind, &event.payload);
        assert_eq!(severity, EventSeverity::Info);
        let tags = derive_tags(&event.kind, event.step_id, &event.payload, severity);
        assert!(passes_filters(&base_config, &event, severity, &tags));

        let bad_config = SubscriptionConfig {
            tags: Some(vec!["severity:error".to_string()].into_iter().collect()),
            ..base_config
        };
        assert!(!passes_filters(&bad_config, &event, severity, &tags));
    }
}
5436
5437#[cfg(test)]
5438mod tests {
5439 use super::*;
5440 use chrono::Utc;
5441 use fleetforge_storage::{ObjectStoreConfig, StorageConfig};
5442 use fleetforge_telemetry::audit::Audit;
5443 use serde_json::json;
5444 use std::collections::HashMap;
5445 use testcontainers::clients::Cli;
5446 use testcontainers::images::postgres::Postgres;
5447 use tokio::time::{sleep, Duration};
5448 use uuid::Uuid;
5449
5450 fn build_request(dag: serde_json::Value) -> runtime::SubmitRunRequest {
5451 build_request_with_key(dag, "")
5452 }
5453
5454 fn build_request_with_key(
5455 dag: serde_json::Value,
5456 idempotency_key: impl Into<String>,
5457 ) -> runtime::SubmitRunRequest {
5458 runtime::SubmitRunRequest {
5459 spec: Some(runtime::RunSpec {
5460 dag_json: serde_json::to_string(&dag).unwrap(),
5461 inputs: None,
5462 seed: 123,
5463 labels: HashMap::new(),
5464 }),
5465 idempotency_key: idempotency_key.into(),
5466 }
5467 }
5468
5469 #[tokio::test]
5470 async fn submit_run_rejects_invalid_step_schema() {
5471 if std::fs::metadata("/var/run/docker.sock").is_err()
5472 && std::env::var("DOCKER_HOST").is_err()
5473 {
5474 eprintln!("Skipping API tests because Docker is unavailable");
5475 return;
5476 }
5477
5478 let docker = Cli::default();
5479 let container = docker.run(Postgres::default());
5480 let port = container.get_host_port_ipv4(5432);
5481 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
5482 sleep(Duration::from_millis(250)).await;
5483
5484 let storage = Arc::new(
5485 Storage::connect(StorageConfig {
5486 database_url: db_url,
5487 max_connections: 2,
5488 connect_timeout: Duration::from_secs(10),
5489 object_store: ObjectStoreConfig::InMemory,
5490 })
5491 .await
5492 .expect("storage connect"),
5493 );
5494 let auth = Arc::new(AuthHandle::disabled());
5495 let audit = Arc::new(Audit::new(storage.pool().clone()));
5496 let attestation_vault: Arc<dyn AttestationVault> =
5497 Arc::new(InMemoryAttestationVault::new());
5498 let transparency = transparency::local_transparency_log(Arc::clone(&storage));
5499 let service = RuntimeApiService::new(
5500 storage,
5501 auth,
5502 audit,
5503 attestation_vault,
5504 transparency,
5505 false,
5506 TransparencyScope::Disabled,
5507 );
5508
5509 let dag = serde_json::json!({
5510 "steps": [
5511 {
5512 "type": "tool",
5513 "inputs": {}
5514 }
5515 ]
5516 });
5517
5518 let request = build_request(dag);
5519 let response = service.submit_run(Request::new(request)).await;
5520 assert!(response.is_err(), "expected schema validation failure");
5521 let status = response.err().unwrap();
5522 assert_eq!(status.code(), tonic::Code::InvalidArgument);
5523 assert!(status.message().contains("step schema validation failed"));
5524 drop(container);
5525 }
5526
    // A spec whose seed is 0 must be rejected with InvalidArgument before
    // anything is persisted. Integration test: needs a local Docker daemon.
    #[tokio::test]
    async fn submit_run_rejects_missing_seed() {
        // Skip (not fail) when no Docker daemon is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Throwaway Postgres container; brief pause for it to accept connections.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        let service = RuntimeApiService::new(
            storage,
            auth,
            audit,
            attestation_vault,
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        // Structurally valid single-step DAG — the only defect under test is
        // the seed.
        let step_id = Uuid::new_v4();
        let dag = json!({
            "steps": [
                {
                    "id": step_id,
                    "type": "tool",
                    "inputs": {
                        "command": ["echo", "ok"]
                    }
                }
            ]
        });

        // build_request sets seed = 123; zero it out to simulate a missing seed.
        let mut request = build_request(dag);
        if let Some(spec) = request.spec.as_mut() {
            spec.seed = 0;
        }

        let response = service.submit_run(Request::new(request)).await;
        assert!(response.is_err(), "expected seed validation failure");
        let status = response.err().unwrap();
        assert_eq!(status.code(), tonic::Code::InvalidArgument);
        assert!(status.message().contains("seed must be a positive integer"));

        drop(container);
    }
5593
5594 #[tokio::test]
5595 async fn submit_run_invalid_dag_variants() {
5596 if std::fs::metadata("/var/run/docker.sock").is_err()
5597 && std::env::var("DOCKER_HOST").is_err()
5598 {
5599 eprintln!("Skipping API tests because Docker is unavailable");
5600 return;
5601 }
5602
5603 let docker = Cli::default();
5604 let container = docker.run(Postgres::default());
5605 let port = container.get_host_port_ipv4(5432);
5606 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
5607 sleep(Duration::from_millis(250)).await;
5608
5609 let storage = Arc::new(
5610 Storage::connect(StorageConfig {
5611 database_url: db_url,
5612 max_connections: 2,
5613 connect_timeout: Duration::from_secs(10),
5614 object_store: ObjectStoreConfig::InMemory,
5615 })
5616 .await
5617 .expect("storage connect"),
5618 );
5619 let auth = Arc::new(AuthHandle::disabled());
5620 let audit = Arc::new(Audit::new(storage.pool().clone()));
5621 let attestation_vault: Arc<dyn AttestationVault> =
5622 Arc::new(InMemoryAttestationVault::new());
5623 let transparency = transparency::local_transparency_log(Arc::clone(&storage));
5624 let service = RuntimeApiService::new(
5625 Arc::clone(&storage),
5626 Arc::clone(&auth),
5627 Arc::clone(&audit),
5628 Arc::clone(&attestation_vault),
5629 transparency,
5630 false,
5631 TransparencyScope::Disabled,
5632 );
5633
5634 let step_a = Uuid::new_v4();
5635 let step_b = Uuid::new_v4();
5636 let valid_step = json!({
5637 "id": step_a,
5638 "type": "tool",
5639 "inputs": {}
5640 });
5641
5642 let cases: Vec<(&str, serde_json::Value, &str)> = vec![
5643 (
5644 "missing_steps",
5645 json!({ "edges": [] }),
5646 "dag_json.steps must be an array",
5647 ),
5648 (
5649 "unknown_step_type",
5650 json!({ "steps": [ { "id": step_a, "type": "unknown", "inputs": {} } ] }),
5651 "step schema validation failed",
5652 ),
5653 (
5654 "duplicate_step_id",
5655 json!({ "steps": [valid_step.clone(), valid_step.clone()] }),
5656 "duplicate step.id",
5657 ),
5658 (
5659 "self_edge",
5660 json!({ "steps": [valid_step.clone()], "edges": [[step_a, step_a]] }),
5661 "edge cannot reference the same step twice",
5662 ),
5663 (
5664 "edge_unknown_target",
5665 json!({ "steps": [valid_step.clone()], "edges": [[step_a, step_b]] }),
5666 "edge references unknown step id",
5667 ),
5668 (
5669 "duplicate_edge",
5670 json!({ "steps": [valid_step.clone(), {"id": step_b, "type": "tool", "inputs": {}}], "edges": [[step_a, step_b], [step_a, step_b]] }),
5671 "duplicate edge detected",
5672 ),
5673 ];
5674
5675 for (label, dag, expected) in cases {
5676 let request = build_request(dag);
5677 let response = service.submit_run(Request::new(request)).await;
5678 assert!(response.is_err(), "case {label} unexpectedly succeeded");
5679 let status = response.err().unwrap();
5680 assert_eq!(status.code(), tonic::Code::InvalidArgument, "case {label}");
5681 assert!(
5682 status.message().contains(expected),
5683 "case {label} expected '{expected}' but got '{}'",
5684 status.message()
5685 );
5686 }
5687
5688 drop(container);
5689 }
5690
    // Happy path: a valid three-step chain is accepted, the run is persisted
    // as Pending, and each step row carries the expected status, priority,
    // and dependency counters. Integration test: needs a local Docker daemon.
    #[tokio::test]
    async fn submit_run_accepts_valid_dag_and_persists_steps() {
        // Skip (not fail) when no Docker daemon is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Throwaway Postgres container; brief pause for it to accept connections.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        // Clone the handles so `storage` stays usable for the assertions below.
        let service = RuntimeApiService::new(
            Arc::clone(&storage),
            Arc::clone(&auth),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        // Linear chain a -> b -> c with descending priorities.
        let step_a = Uuid::new_v4();
        let step_b = Uuid::new_v4();
        let step_c = Uuid::new_v4();

        let dag = serde_json::json!({
            "steps": [
                {
                    "id": step_a,
                    "type": "tool",
                    "inputs": {},
                    "policy": {"priority": 5}
                },
                {
                    "id": step_b,
                    "type": "llm",
                    "inputs": {},
                    "policy": {"priority": 3}
                },
                {
                    "id": step_c,
                    "type": "reduce",
                    "inputs": {},
                    "policy": {"priority": 1}
                }
            ],
            "edges": [
                [step_a, step_b],
                [step_b, step_c]
            ]
        });

        let request = build_request(dag);
        let response = service
            .submit_run(Request::new(request))
            .await
            .expect("valid run should succeed")
            .into_inner();

        // The run should be queryable by the returned id and start out Pending.
        let run_uuid = Uuid::parse_str(&response.run_id).expect("run_id must be UUID");
        let persisted_run = storage
            .fetch_run(run_uuid)
            .await
            .expect("fetch run")
            .expect("run should exist");
        assert_eq!(persisted_run.status, models::RunStatus::Pending);

        let steps = storage
            .fetch_steps_for_run(run_uuid)
            .await
            .expect("fetch steps");
        assert_eq!(steps.len(), 3);

        // Order rows by their persisted index before asserting per-step fields.
        let mut by_idx = steps;
        by_idx.sort_by_key(|s| s.idx);

        // Root step: no dependencies, so it is immediately Queued.
        assert_eq!(by_idx[0].step_id, step_a);
        assert_eq!(by_idx[0].status, models::StepStatus::Queued);
        assert_eq!(by_idx[0].priority, 5);
        assert_eq!(by_idx[0].pending_dependencies, 0);

        // Downstream steps each wait on one predecessor and start Blocked.
        assert_eq!(by_idx[1].step_id, step_b);
        assert_eq!(by_idx[1].status, models::StepStatus::Blocked);
        assert_eq!(by_idx[1].priority, 3);
        assert_eq!(by_idx[1].pending_dependencies, 1);

        assert_eq!(by_idx[2].step_id, step_c);
        assert_eq!(by_idx[2].status, models::StepStatus::Blocked);
        assert_eq!(by_idx[2].priority, 1);
        assert_eq!(by_idx[2].pending_dependencies, 1);
        assert!(
            by_idx
                .iter()
                .all(|s| s.total_dependencies >= s.pending_dependencies),
            "dependency counters should be consistent"
        );
        drop(container);
    }
5808
5809 #[tokio::test]
5810 async fn submit_run_respects_idempotency_key() {
5811 if std::fs::metadata("/var/run/docker.sock").is_err()
5812 && std::env::var("DOCKER_HOST").is_err()
5813 {
5814 eprintln!("Skipping API tests because Docker is unavailable");
5815 return;
5816 }
5817
5818 let docker = Cli::default();
5819 let container = docker.run(Postgres::default());
5820 let port = container.get_host_port_ipv4(5432);
5821 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
5822 sleep(Duration::from_millis(250)).await;
5823
5824 let storage = Arc::new(
5825 Storage::connect(StorageConfig {
5826 database_url: db_url,
5827 max_connections: 2,
5828 connect_timeout: Duration::from_secs(10),
5829 object_store: ObjectStoreConfig::InMemory,
5830 })
5831 .await
5832 .expect("storage connect"),
5833 );
5834 let auth = Arc::new(AuthHandle::disabled());
5835 let audit = Arc::new(Audit::new(storage.pool().clone()));
5836 let attestation_vault: Arc<dyn AttestationVault> =
5837 Arc::new(InMemoryAttestationVault::new());
5838 let transparency = transparency::local_transparency_log(Arc::clone(&storage));
5839 let service = RuntimeApiService::new(
5840 Arc::clone(&storage),
5841 Arc::clone(&auth),
5842 Arc::clone(&audit),
5843 Arc::clone(&attestation_vault),
5844 transparency,
5845 false,
5846 TransparencyScope::Disabled,
5847 );
5848
5849 let step_id = Uuid::new_v4();
5850 let dag = serde_json::json!({
5851 "steps": [{
5852 "id": step_id,
5853 "type": "tool",
5854 "inputs": {}
5855 }]
5856 });
5857 let raw_key = " idempotent-key ";
5858 let normalized_key = "idempotent-key";
5859
5860 let first = service
5861 .submit_run(Request::new(build_request_with_key(dag.clone(), raw_key)))
5862 .await
5863 .expect("first submission")
5864 .into_inner();
5865
5866 let second = service
5867 .submit_run(Request::new(build_request_with_key(dag, raw_key)))
5868 .await
5869 .expect("second submission")
5870 .into_inner();
5871
5872 assert_eq!(
5873 first.run_id, second.run_id,
5874 "idempotent submissions should reuse run_id"
5875 );
5876
5877 let stored = storage
5878 .fetch_run_by_idempotency_key(normalized_key)
5879 .await
5880 .expect("lookup idempotent run")
5881 .expect("run should exist for idempotency key");
5882 assert_eq!(stored.run_id.to_string(), first.run_id);
5883
5884 drop(container);
5885 }
5886
    // get_attestations should deduplicate requested ids and silently drop
    // unknown ones, returning only the stored record with its run subject
    // intact. Integration test: needs a local Docker daemon.
    #[tokio::test]
    async fn get_attestations_returns_records() {
        // Skip (not fail) when no Docker daemon is reachable.
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping API tests because Docker is unavailable");
            return;
        }

        // Throwaway Postgres container; brief pause for it to accept connections.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connect"),
        );
        let auth = Arc::new(AuthHandle::disabled());
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let transparency = transparency::local_transparency_log(Arc::clone(&storage));
        // Keep a clone of the vault handle so we can seed it directly below.
        let service = RuntimeApiService::new(
            Arc::clone(&storage),
            Arc::clone(&auth),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            transparency,
            false,
            TransparencyScope::Disabled,
        );

        // Seed the vault with one attestation tied to a known run.
        let run_id = Uuid::new_v4();
        let attestation =
            Attestation::new(Uuid::new_v4(), Utc::now()).with_subject(TrustSubject::Run { run_id });
        attestation_vault
            .record(attestation.clone())
            .await
            .expect("record attestation");

        // Request the same id twice plus an unknown id: duplicates and misses
        // should both collapse to a single returned record.
        let response = service
            .get_attestations(Request::new(runtime::GetAttestationsRequest {
                attestation_ids: vec![
                    attestation.id.to_string(),
                    attestation.id.to_string(),
                    Uuid::new_v4().to_string(),
                ],
            }))
            .await
            .expect("get_attestations RPC")
            .into_inner();

        assert_eq!(response.attestations.len(), 1);
        let returned = &response.attestations[0];
        assert_eq!(returned.attestation_id, attestation.id.to_string());
        // The proto subject should round-trip back to the run we attached.
        match returned
            .subject
            .as_ref()
            .and_then(|subject| subject.kind.as_ref())
        {
            Some(runtime::attestation_subject::Kind::Run(run_subject)) => {
                assert_eq!(run_subject.run_id, run_id.to_string());
            }
            other => panic!("unexpected subject {other:?}"),
        }

        drop(container);
    }
5963
5964 #[test]
5965 fn compute_drift_reports_within_tolerance() {
5966 let recorded = json!({
5967 "tokens": 10,
5968 "nested": { "value": "ok" }
5969 });
5970 let current = recorded.clone();
5971
5972 let result = compute_drift(
5973 &recorded,
5974 ¤t,
5975 DEFAULT_TOKEN_TOLERANCE,
5976 DEFAULT_COST_TOLERANCE,
5977 );
5978 assert!(result.within_tolerance);
5979 assert_eq!(result.token_delta, Some(0.0));
5980 assert_eq!(result.cost_delta, None);
5981 }
5982
5983 #[test]
5984 fn compute_drift_flags_token_delta() {
5985 let recorded = json!({
5986 "tokens": 10,
5987 "cost": 1.0,
5988 "payload": { "message": "hello" }
5989 });
5990 let current = json!({
5991 "tokens": 15,
5992 "cost": 1.0,
5993 "payload": { "message": "hello" }
5994 });
5995
5996 let result = compute_drift(
5997 &recorded,
5998 ¤t,
5999 DEFAULT_TOKEN_TOLERANCE,
6000 DEFAULT_COST_TOLERANCE,
6001 );
6002 assert!(!result.within_tolerance);
6003 assert_eq!(result.token_delta, Some(5.0));
6004 assert_eq!(result.cost_delta, Some(0.0));
6005 }
6006}