1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::env;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use crate::{event_policy, span_step};
8use anyhow::{anyhow, Context, Result};
9use base64::engine::general_purpose::STANDARD as BASE64;
10use base64::Engine;
11use bytes::Bytes;
12use chrono::{Duration as ChronoDuration, Utc};
13use fleetforge_telemetry::otel::{
14 self as telemetry_otel, Counter, Histogram, Meter, SpanId, TraceId,
15};
16use serde_json::{json, to_value, Map, Value};
17use tracing::{debug, error, info, warn};
18use tracing_opentelemetry::OpenTelemetrySpanExt;
19use uuid::Uuid;
20
/// Env var naming the C2PA manifest profile (consumed outside this chunk — TODO confirm reader).
const C2PA_PROFILE_ENV: &str = "FLEETFORGE_C2PA_PROFILE";
/// Fallback queue-latency target (ms) when an SLO contract omits `queue_target_ms`.
const DEFAULT_QUEUE_TARGET_MS: i64 = 30_000;
/// Env var capping total tokens reserved by in-flight steps (parsed as positive i64).
const MAX_INFLIGHT_TOKENS_ENV: &str = "FLEETFORGE_MAX_INFLIGHT_TOKENS";
/// Env var capping total cost reserved by in-flight steps (parsed as positive f64).
const MAX_INFLIGHT_COST_ENV: &str = "FLEETFORGE_MAX_INFLIGHT_COST";
/// Env var for the backpressure delay in milliseconds (parsed as u64, default 50).
const BACKPRESSURE_DELAY_ENV: &str = "FLEETFORGE_QUEUE_BACKPRESSURE_MS";
26
27use crate::aibom::build_aibom;
28use crate::capability;
29use crate::executor::{
30 artifact_meta_to_value, guardrail_budget_view, BudgetCtx, ExecutorRegistry, StepCtx,
31 StepExecutionResult, StepSlo, ToolEventRecord,
32};
33use crate::guardrails::{PolicyBundle, PolicyDecisionTrace, PolicyEvaluationRecord};
34use crate::model::{QueuedStep, RunId, StepExecution, StepId, StepSpec, StepStatus, StepType};
35use crate::otel::{record_attribute_str, record_trust_attributes};
36use crate::policy::{enforce_policy, PolicyOutcome, RuntimePolicyPack};
37use fleetforge_policy::DecisionEffect;
38use fleetforge_common::licensing::{feature_allowed, LicensedFeature};
39use fleetforge_storage::models::{self, NewOutboxEvent};
40use fleetforge_storage::{ArtifactStore, BudgetResult, Storage};
41use fleetforge_telemetry::audit::Audit;
42use fleetforge_telemetry::context::{RunScope, StepScope, TraceContext};
43use fleetforge_trust::{
44 capability_signer, generate_c2pa_manifest, mint_capability_token, trust_signer,
45 AttestationVault, CapabilityToken, CapabilityTokenSubject, IdentityEvidence, ManifestInput,
46 ManifestProfile, PolicyEvidence, Trust, TrustBoundary, TrustOrigin, TrustSource, Untrusted,
47};
48use tokio::sync::{Mutex, Semaphore};
49
/// How much of the run lifecycle is written to the transparency log.
///
/// Scopes are cumulative: `Runs` implies `Gates`, and `Artifacts` implies both.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransparencyScope {
    Disabled,
    Gates,
    Runs,
    Artifacts,
}

impl TransparencyScope {
    /// Gate events are logged under every scope except `Disabled`.
    fn includes_gates(self) -> bool {
        !matches!(self, TransparencyScope::Disabled)
    }

    /// Run events are logged under `Runs` and the wider `Artifacts` scope.
    fn includes_runs(self) -> bool {
        match self {
            TransparencyScope::Runs | TransparencyScope::Artifacts => true,
            TransparencyScope::Disabled | TransparencyScope::Gates => false,
        }
    }

    /// Artifact events are logged only under the widest scope.
    fn includes_artifacts(self) -> bool {
        self == TransparencyScope::Artifacts
    }
}
74
/// SLO contract parsed from a step's `policy.slo` object.
#[derive(Clone, Debug)]
struct SloContract {
    // Free-form tier label from the policy (e.g. service class name) — TODO confirm vocabulary.
    tier: Option<String>,
    // Target maximum queue wait in milliseconds; `DEFAULT_QUEUE_TARGET_MS` when absent.
    queue_target_ms: Option<i64>,
    // Additive adjustment applied on top of the step's base priority.
    priority_boost: i32,
    // Error-budget fraction carried through to `StepSlo`; semantics enforced elsewhere.
    error_budget_ratio: Option<f64>,
}
82
/// Computed ranking inputs for one queued step, derived from its SLO contract
/// (if any) and its observed queue wait.
#[derive(Clone, Debug)]
struct SloScore {
    has_contract: bool,
    slack_ms: i64,
    wait_ms: i64,
    priority: i32,
    queue_target_ms: Option<i64>,
    tier: Option<String>,
    error_budget_ratio: Option<f64>,
}

impl SloScore {
    /// Total ordering for queue ranking; `Less` means "schedule earlier".
    ///
    /// Precedence: contracted steps before best-effort ones; among contracted
    /// steps, breached SLOs first, then ascending slack; ties (and all
    /// best-effort pairs) fall back to descending priority, then ascending
    /// observed wait.
    fn compare(&self, other: &Self) -> Ordering {
        // Contracted vs best-effort: the contracted step always wins.
        if self.has_contract != other.has_contract {
            return if self.has_contract {
                Ordering::Less
            } else {
                Ordering::Greater
            };
        }

        if self.has_contract {
            // A step already past its queue target outranks one that is not.
            match (self.slack_ms < 0, other.slack_ms < 0) {
                (true, false) => return Ordering::Less,
                (false, true) => return Ordering::Greater,
                _ => {}
            }
            // Less remaining slack means more urgent.
            let by_slack = self.slack_ms.cmp(&other.slack_ms);
            if by_slack != Ordering::Equal {
                return by_slack;
            }
        }

        // Higher priority first; equal priorities resolve by observed wait.
        other
            .priority
            .cmp(&self.priority)
            .then(self.wait_ms.cmp(&other.wait_ms))
    }
}
122
123fn trace_id_from_uuid(uuid: &Uuid) -> TraceId {
124 TraceId::from_bytes(*uuid.as_bytes())
125}
126
127fn span_id_from_uuid(uuid: &Uuid) -> SpanId {
128 let raw = uuid.as_u128();
129 let mut span_bits = ((raw >> 64) as u64) ^ (raw as u64);
130 if span_bits == 0 {
131 span_bits = 1;
132 }
133 SpanId::from_bytes(span_bits.to_be_bytes())
134}
135
136fn extract_slo_contract(spec: &StepSpec) -> Option<SloContract> {
137 let slo_value = spec.policy.get("slo")?;
138 if !slo_value.is_object() {
139 return None;
140 }
141 Some(SloContract {
142 tier: slo_value
143 .get("tier")
144 .and_then(Value::as_str)
145 .map(|s| s.to_string()),
146 queue_target_ms: slo_value.get("queue_target_ms").and_then(Value::as_i64),
147 priority_boost: slo_value
148 .get("priority_boost")
149 .and_then(Value::as_i64)
150 .unwrap_or(0) as i32,
151 error_budget_ratio: slo_value.get("error_budget_ratio").and_then(Value::as_f64),
152 })
153}
154
155fn compute_slo_score(
156 step: &QueuedStep,
157 contract: Option<&SloContract>,
158 now: chrono::DateTime<Utc>,
159) -> SloScore {
160 let wait_ms = now
161 .signed_duration_since(step.created_at)
162 .num_milliseconds()
163 .max(0);
164
165 if let Some(contract) = contract {
166 let target_ms = contract.queue_target_ms.unwrap_or(DEFAULT_QUEUE_TARGET_MS);
167 let slack_ms = target_ms - wait_ms;
168 SloScore {
169 has_contract: true,
170 slack_ms,
171 wait_ms,
172 priority: i32::from(step.priority) + contract.priority_boost,
173 queue_target_ms: contract.queue_target_ms,
174 tier: contract.tier.clone(),
175 error_budget_ratio: contract.error_budget_ratio,
176 }
177 } else {
178 SloScore {
179 has_contract: false,
180 slack_ms: i64::MAX,
181 wait_ms,
182 priority: i32::from(step.priority),
183 queue_target_ms: None,
184 tier: None,
185 error_budget_ratio: None,
186 }
187 }
188}
189
190fn build_step_slo(step: &QueuedStep, now: chrono::DateTime<Utc>) -> Option<StepSlo> {
191 let contract = extract_slo_contract(&step.spec)?;
192 let score = compute_slo_score(step, Some(&contract), now);
193 Some(StepSlo {
194 tier: contract.tier.clone(),
195 queue_target_ms: Some(contract.queue_target_ms.unwrap_or(DEFAULT_QUEUE_TARGET_MS)),
196 observed_queue_ms: Some(score.wait_ms),
197 slack_ms: Some(score.slack_ms),
198 priority_boost: contract.priority_boost,
199 error_budget_ratio: contract.error_budget_ratio,
200 })
201}
202
203fn resolve_step_providers(spec: &crate::model::StepSpec) -> (Option<String>, Option<String>) {
204 let provider = spec
205 .inputs
206 .get("provider")
207 .and_then(Value::as_str)
208 .or_else(|| spec.inputs.get("model").and_then(Value::as_str))
209 .or_else(|| spec.inputs.get("tool").and_then(Value::as_str))
210 .map(|s| s.to_owned());
211 let provider_version = spec
212 .inputs
213 .get("provider_version")
214 .or_else(|| spec.inputs.get("model_version"))
215 .and_then(Value::as_str)
216 .map(|s| s.to_owned());
217 (provider, provider_version)
218}
219
/// Assemble the JSON payload evaluated by scheduler-boundary guardrails.
///
/// Mirrors the step's inputs, policy and execution settings, plus identifying
/// step fields and a budget/SLO view via `guardrail_budget_view`.
fn build_scheduler_payload(
    spec: &crate::model::StepSpec,
    step: &QueuedStep,
    budget: &BudgetCtx,
    slo: Option<&StepSlo>,
) -> Value {
    // Execution settings serialise best-effort; falls back to JSON null on failure.
    let execution = serde_json::to_value(&spec.execution).unwrap_or(Value::Null);
    json!({
        "inputs": spec.inputs,
        "policy": spec.policy,
        "execution": execution,
        "step": {
            "id": step.step_id.to_string(),
            "type": spec.r#type,
            "slug": spec.slug,
            "attempt": step.attempt,
            "max_attempts": spec.execution.max_attempts,
        },
        "budget": guardrail_budget_view(budget, slo),
    })
}
241
/// Running totals of tokens and cost reserved by currently-executing steps.
///
/// Both counters are monotonically non-negative: negative or zero deltas are
/// ignored on `add`, and `subtract` floors at zero.
#[derive(Debug, Default)]
struct InFlightBudget {
    tokens: i64,
    cost: f64,
}

impl InFlightBudget {
    /// Reserve `tokens`/`cost`; non-positive deltas are no-ops.
    fn add(&mut self, tokens: i64, cost: f64) {
        if tokens > 0 {
            self.tokens = self.tokens.saturating_add(tokens);
        }
        if cost > 0.0 {
            self.cost += cost;
        }
    }

    /// Release `tokens`/`cost`, never dropping either counter below zero.
    fn subtract(&mut self, tokens: i64, cost: f64) {
        if tokens > 0 {
            self.tokens = self.tokens.saturating_sub(tokens).max(0);
        }
        if cost > 0.0 {
            let remaining = self.cost - cost;
            self.cost = if remaining > 0.0 { remaining } else { 0.0 };
        }
    }
}
267
/// A debug breakpoint: a spec label paired with the step it pauses on.
#[derive(Clone, Debug)]
struct BreakpointRecord {
    // Breakpoint spec string as stored in the run's debug map — TODO confirm grammar.
    spec: String,
    // Step the breakpoint applies to.
    step_id: Uuid,
}
273
274fn load_debug_breakpoints(debug: &Map<String, Value>) -> Vec<BreakpointRecord> {
275 debug
276 .get("breakpoints")
277 .and_then(Value::as_array)
278 .map(|items| {
279 items
280 .iter()
281 .filter_map(|entry| {
282 let obj = entry.as_object()?;
283 let spec = obj.get("spec")?.as_str()?.to_string();
284 let step_id = obj
285 .get("step_id")
286 .and_then(Value::as_str)
287 .and_then(|s| Uuid::parse_str(s).ok())?;
288 Some(BreakpointRecord { spec, step_id })
289 })
290 .collect()
291 })
292 .unwrap_or_default()
293}
294
295fn store_debug_breakpoints(debug: &mut Map<String, Value>, entries: &[BreakpointRecord]) {
296 let list = entries
297 .iter()
298 .map(|record| {
299 let mut obj = Map::new();
300 obj.insert("spec".to_string(), Value::String(record.spec.clone()));
301 obj.insert(
302 "step_id".to_string(),
303 Value::String(record.step_id.to_string()),
304 );
305 Value::Object(obj)
306 })
307 .collect::<Vec<_>>();
308 debug.insert("breakpoints".to_string(), Value::Array(list));
309
310 let specs = entries
311 .iter()
312 .map(|record| Value::String(record.spec.clone()))
313 .collect::<Vec<_>>();
314 debug.insert("breakpoint_specs".to_string(), Value::Array(specs));
315}
316
317fn set_active_pause(debug: &mut Map<String, Value>, record: &BreakpointRecord) {
318 let pause = json!({
319 "step_id": record.step_id.to_string(),
320 "spec": record.spec,
321 "started_at": chrono::Utc::now().to_rfc3339(),
322 });
323 debug.insert("active_pause".to_string(), pause);
324}
325
/// W3C trace-context headers carried in a run's input context.
#[derive(Clone, Debug)]
struct TraceHeaders {
    // Required `traceparent` header value.
    traceparent: String,
    // Optional `tracestate` header value.
    tracestate: Option<String>,
    // Optional `baggage` header value.
    baggage: Option<String>,
}
332
333fn parse_trace_headers(value: &Value) -> Option<TraceHeaders> {
334 let trace_value = value.get("trace")?;
335 let trace_obj = trace_value.as_object()?;
336 let traceparent = trace_obj
337 .get("traceparent")
338 .and_then(Value::as_str)?
339 .to_string();
340 let tracestate = trace_obj
341 .get("tracestate")
342 .and_then(Value::as_str)
343 .map(|s| s.to_string());
344 let baggage = trace_obj
345 .get("baggage")
346 .and_then(Value::as_str)
347 .map(|s| s.to_string());
348 Some(TraceHeaders {
349 traceparent,
350 tracestate,
351 baggage,
352 })
353}
354
/// Tunables for the scheduler worker loop.
///
/// `Default` fills several fields from `FLEETFORGE_*` environment variables
/// (see the `Default` impl in this file).
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Identifier recorded on claimed leases; defaults to `worker-<uuid>`.
    pub worker_id: String,
    /// Max steps claimed per poll; also sizes the execution semaphore.
    pub batch_size: i64,
    /// Sleep between polls when the queue came back empty.
    pub idle_backoff: Duration,
    /// Sleep between polls after dispatching work.
    pub active_backoff: Duration,
    /// Interval between lease-reaper sweeps when nothing expired.
    pub reaper_interval: Duration,
    /// Max expired leases requeued per reaper sweep.
    pub requeue_batch_size: i64,
    /// Optional cap on tokens reserved by in-flight steps (`None` = no cap).
    pub max_inflight_tokens: Option<i64>,
    /// Optional cap on cost reserved by in-flight steps (`None` = no cap).
    pub max_inflight_cost: Option<f64>,
    /// Delay applied when a step is deferred for backpressure (default 50ms).
    pub backpressure_delay: Duration,
    /// Whether transparency-log writes are enabled (license-gated).
    pub transparency_writer: bool,
    // NOTE(review): `TransparencyScope` is module-private but exposed in this
    // `pub` field — confirm this module is not re-exported outside the crate,
    // otherwise this is a private-type-in-public-interface error (E0446).
    pub transparency_scope: TransparencyScope,
}
370
371impl Default for SchedulerConfig {
372 fn default() -> Self {
373 let scope = transparency_scope_from_env();
374 let max_inflight_tokens = env::var(MAX_INFLIGHT_TOKENS_ENV)
375 .ok()
376 .and_then(|value| value.parse::<i64>().ok())
377 .filter(|value| *value > 0);
378 let max_inflight_cost = env::var(MAX_INFLIGHT_COST_ENV)
379 .ok()
380 .and_then(|value| value.parse::<f64>().ok())
381 .filter(|value| *value > 0.0);
382 let backpressure_delay = env::var(BACKPRESSURE_DELAY_ENV)
383 .ok()
384 .and_then(|value| value.parse::<u64>().ok())
385 .map(Duration::from_millis)
386 .unwrap_or_else(|| Duration::from_millis(50));
387 Self {
388 worker_id: format!("worker-{}", uuid::Uuid::new_v4()),
389 batch_size: 8,
390 idle_backoff: Duration::from_millis(200),
391 active_backoff: Duration::from_millis(10),
392 reaper_interval: Duration::from_secs(30),
393 requeue_batch_size: 128,
394 max_inflight_tokens,
395 max_inflight_cost,
396 backpressure_delay,
397 transparency_writer: transparency_writer_flag()
398 && !matches!(scope, TransparencyScope::Disabled),
399 transparency_scope: scope,
400 }
401 }
402}
403
/// Worker that claims queued steps, enforces policy and budget, and runs them
/// through the registered executors.
pub struct Scheduler {
    storage: Arc<Storage>,
    executors: Arc<ExecutorRegistry>,
    policy_pack: Arc<RuntimePolicyPack>,
    artifact_store: Arc<dyn ArtifactStore>,
    attestation_vault: Arc<dyn AttestationVault>,
    config: SchedulerConfig,
    metrics: QueueMetrics,
    // Caps concurrent step executions; sized from `config.batch_size` in `new`.
    concurrency: Arc<Semaphore>,
    audit: Arc<Audit>,
    // Per-run cache of parsed W3C trace headers; `None` is cached for runs
    // whose input context carries no trace object.
    trace_headers: Mutex<HashMap<RunId, Option<TraceHeaders>>>,
    // Tokens/cost currently reserved by running steps — presumably feeds the
    // backpressure check; confirm against `should_backpressure`.
    inflight: Mutex<InFlightBudget>,
}
418
419fn transparency_writer_flag() -> bool {
420 let enabled = matches!(
421 std::env::var("FLEETFORGE_TRANSPARENCY_WRITER")
422 .ok()
423 .map(|value| value.trim().to_ascii_lowercase())
424 .unwrap_or_default()
425 .as_str(),
426 "1" | "true" | "yes" | "on"
427 );
428 if enabled && !feature_allowed(LicensedFeature::ScittTransparency) {
429 warn!(
430 "SCITT transparency writer requested but the current license tier does not allow it; disabling writer"
431 );
432 return false;
433 }
434 enabled
435}
436
437fn transparency_scope_from_env() -> TransparencyScope {
438 match std::env::var("FLEETFORGE_TRANSPARENCY_SCOPE")
439 .ok()
440 .map(|value| value.trim().to_ascii_lowercase())
441 .as_deref()
442 {
443 Some("artifacts") => TransparencyScope::Artifacts,
444 Some("runs") => TransparencyScope::Runs,
445 Some("gates") => TransparencyScope::Gates,
446 Some("disabled") => TransparencyScope::Disabled,
447 _ => TransparencyScope::Gates,
448 }
449}
450
451impl Scheduler {
452 pub fn new(
453 storage: Arc<Storage>,
454 executors: Arc<ExecutorRegistry>,
455 policy_pack: Arc<RuntimePolicyPack>,
456 audit: Arc<Audit>,
457 attestation_vault: Arc<dyn AttestationVault>,
458 config: SchedulerConfig,
459 ) -> Self {
460 let artifact_store = storage.artifacts();
461 let metrics = QueueMetrics::new();
462 let permits_i64 = std::cmp::max(config.batch_size, 1);
463 let permits = permits_i64.min(usize::MAX as i64) as usize;
464 Self {
465 storage,
466 executors,
467 policy_pack,
468 artifact_store,
469 attestation_vault,
470 config,
471 metrics,
472 concurrency: Arc::new(Semaphore::new(permits)),
473 audit,
474 trace_headers: Mutex::new(HashMap::new()),
475 inflight: Mutex::new(InFlightBudget::default()),
476 }
477 }
478
479 pub async fn run(self: Arc<Self>) -> Result<()> {
481 let reaper = Arc::clone(&self);
482 tokio::spawn(async move {
483 reaper.lease_reaper_loop().await;
484 });
485
486 loop {
487 let claimed = self
488 .claim_ready_steps()
489 .await
490 .context("failed to claim ready steps")?;
491
492 self.metrics.record_queue_depth(claimed.len());
493
494 if claimed.is_empty() {
495 tokio::time::sleep(self.config.idle_backoff).await;
496 continue;
497 }
498
499 for step in claimed {
500 let scheduler = Arc::clone(&self);
501 let semaphore = Arc::clone(&self.concurrency);
502 tokio::spawn(async move {
503 let _permit = semaphore
504 .acquire_owned()
505 .await
506 .expect("scheduler semaphore closed");
507 if let Err(err) = scheduler.execute_step(step).await {
508 error!(error = ?err, "step execution failed");
509 }
510 });
511 }
512
513 tokio::time::sleep(self.config.active_backoff).await;
514 }
515 }
516
517 async fn lease_reaper_loop(self: Arc<Self>) {
518 loop {
519 let sleep = match self
520 .storage
521 .requeue_expired_leases(self.config.requeue_batch_size)
522 .await
523 {
524 Ok(requeued) => {
525 if !requeued.is_empty() {
526 info!(
527 count = requeued.len(),
528 "requeued expired leases back to the queue"
529 );
530 for (run_id, step_id) in &requeued {
531 debug!(%run_id, %step_id, "expired lease requeued");
532 }
533 self.config.active_backoff
535 } else {
536 self.config.reaper_interval
537 }
538 }
539 Err(err) => {
540 warn!(error = %err, "lease reaper iteration failed");
541 self.config.reaper_interval
542 }
543 };
544
545 tokio::time::sleep(sleep).await;
546 }
547 }
548
549 async fn trace_headers_for_run(&self, run_id: RunId) -> Result<Option<TraceHeaders>> {
550 {
551 let cache = self.trace_headers.lock().await;
552 if let Some(entry) = cache.get(&run_id) {
553 return Ok(entry.clone());
554 }
555 }
556
557 let run = self
558 .storage
559 .fetch_run(Uuid::from(run_id))
560 .await
561 .context("failed to load run for trace context")?;
562 let headers = run.and_then(|row| parse_trace_headers(&row.input_ctx));
563 let mut cache = self.trace_headers.lock().await;
564 cache.insert(run_id, headers.clone());
565 Ok(headers)
566 }
567
568 async fn claim_ready_steps(&self) -> Result<Vec<QueuedStep>> {
569 let db_steps = self
570 .storage
571 .claim_queued_steps(self.config.batch_size, &self.config.worker_id)
572 .await?;
573
574 let mut steps = db_steps
575 .into_iter()
576 .map(QueuedStep::try_from)
577 .collect::<Result<Vec<_>>>()?;
578 self.rank_steps(&mut steps);
579 Ok(steps)
580 }
581
582 async fn execute_step(&self, step: QueuedStep) -> Result<()> {
583 let mut spec = step.spec.clone();
584 let mut input_snapshot_value = spec.inputs.clone();
585 let (mut provider, mut provider_version) = resolve_step_providers(&spec);
586 let mut policy_event_log: Option<(String, String, Option<String>, DecisionEffect)> = None;
587 let pack_id = self.policy_pack.id().to_string();
588
589 let observed_at = Utc::now();
590 if let Ok(wait) = observed_at.signed_duration_since(step.created_at).to_std() {
591 self.metrics.record_queue_lag(wait);
592 }
593 let step_slo = build_step_slo(&step, observed_at);
594 if step.attempt > 0 {
595 self.metrics.record_retry();
596 }
597
598 if self.try_pause_for_breakpoint(&step).await? {
599 return Ok(());
600 }
601
602 if let Some(deadline_at) = step.deadline_at {
603 if Utc::now() > deadline_at {
604 let error_payload = json!({
605 "message": "step deadline exceeded",
606 "kind": "deadline_exceeded",
607 });
608 self.finish_step(
609 &step,
610 &spec,
611 StepStatus::Failed,
612 step.attempt + 1,
613 None,
614 Some(error_payload),
615 None,
616 &BudgetCtx::default(),
617 Some(input_snapshot_value.clone()),
618 None,
619 None,
620 None,
621 None,
622 Vec::new(),
623 Vec::new(),
624 None,
625 None,
626 step_slo.as_ref(),
627 )
628 .await?;
629
630 let run_trace = self.build_run_trace_for_step(&step).await?;
631 self.schedule_compensation(&step, &run_trace).await?;
632 return Ok(());
633 }
634 }
635
636 let mut policy_records: Vec<PolicyEvaluationRecord> = Vec::new();
637 let policy_outcome = enforce_policy(self.policy_pack.as_ref(), step.run_id, &spec).await?;
638 let policy_record = policy_outcome_to_record(
639 &policy_outcome,
640 step.run_id,
641 step.step_id,
642 TrustBoundary::Scheduler,
643 &pack_id,
644 );
645 policy_records.push(policy_record.clone());
646
647 self.attestation_vault
648 .record(policy_outcome.attestation.clone())
649 .await
650 .context("failed to persist policy attestation")?;
651
652 let att_id_str = policy_outcome.attestation.id.to_string();
653 let subject_label = format!("run:{}:step:{}", step.run_id, step.step_id);
654 record_trust_attributes(
655 &[att_id_str.clone()],
656 Some(subject_label.as_str()),
657 Some(att_id_str.as_str()),
658 );
659
660 match policy_outcome.decision.effect {
661 DecisionEffect::Deny => {
662 let reason = policy_outcome
663 .decision
664 .reason
665 .clone()
666 .unwrap_or_else(|| "policy denied execution".to_string());
667 warn!(
668 run = %step.run_id,
669 step = %step.step_id,
670 policy = %pack_id,
671 "policy denied execution: {reason}"
672 );
673
674 let (artifact_value, summary) = persist_policy_decisions(
675 Arc::clone(&self.artifact_store),
676 step.run_id,
677 step.step_id,
678 &policy_records,
679 )
680 .await
681 .unwrap_or_else(|err| {
682 warn!(error = %err, "failed to persist policy decisions for denial");
683 (Value::Null, Value::Null)
684 });
685 let artifact_payload = if artifact_value.is_null() {
686 None
687 } else {
688 Some(artifact_value.clone())
689 };
690
691 let artifact_id = artifact_value
692 .get("sha256")
693 .and_then(Value::as_str)
694 .map(|s| s.to_string())
695 .or_else(|| {
696 artifact_value
697 .get("artifact_id")
698 .and_then(Value::as_str)
699 .map(|s| s.to_string())
700 });
701
702 let summary_str = summary
703 .get("reasons")
704 .and_then(Value::as_array)
705 .and_then(|arr| arr.first())
706 .and_then(Value::as_str)
707 .map(|s| s.to_string())
708 .unwrap_or_else(|| reason.clone());
709
710 policy_event_log = Some((
711 pack_id.clone(),
712 summary_str.clone(),
713 artifact_id.clone(),
714 policy_outcome.decision.effect,
715 ));
716
717 let error_payload = json!({
718 "message": reason,
719 "kind": "policy_denied",
720 });
721 self.finish_step(
722 &step,
723 &spec,
724 StepStatus::Failed,
725 step.attempt,
726 None,
727 Some(error_payload),
728 None,
729 &BudgetCtx::default(),
730 Some(input_snapshot_value),
731 None,
732 provider.as_deref(),
733 provider_version.as_deref(),
734 None,
735 None,
736 Vec::new(),
737 policy_records,
738 None,
739 artifact_payload,
740 )
741 .await?;
742 return Ok(());
743 }
744 DecisionEffect::Redact => {
745 if let Some(reason) = policy_outcome.decision.reason.as_deref() {
746 debug!(
747 run = %step.run_id,
748 step = %step.step_id,
749 policy = %pack_id,
750 "policy requested redaction: {reason}"
751 );
752 }
753 }
754 DecisionEffect::Allow => {
755 if let Some(reason) = policy_outcome.decision.reason.as_deref() {
756 debug!(
757 run = %step.run_id,
758 step = %step.step_id,
759 policy = %pack_id,
760 "policy allow with advisory: {reason}"
761 );
762 }
763 }
764 }
765
766 let mut input_snapshot = Some(input_snapshot_value.clone());
767
768 let budget_request = BudgetRequest::from_step(&step);
769 if self.should_backpressure(&budget_request).await {
770 self.defer_for_backpressure(&step).await?;
771 return Ok(());
772 }
773 let mut budget_ctx = BudgetCtx::default();
774 match self.reserve_budget(step.run_id, &budget_request).await? {
775 BudgetDecision::Granted(ctx) => {
776 budget_ctx = ctx;
777 }
778 BudgetDecision::Unlimited(ctx) => {
779 budget_ctx = ctx;
780 }
781 BudgetDecision::Denied { reason } => {
782 let error_payload = json!({
783 "message": reason,
784 "kind": "budget_exceeded",
785 });
786 self.finish_step(
787 &step,
788 &spec,
789 StepStatus::Failed,
790 step.attempt,
791 None,
792 Some(error_payload),
793 None,
794 &BudgetCtx::default(),
795 input_snapshot.clone(),
796 None,
797 None,
798 provider.as_deref(),
799 provider_version.as_deref(),
800 None,
801 None,
802 Vec::new(),
803 Vec::new(),
804 None,
805 None,
806 step_slo.as_ref(),
807 )
808 .await?;
809 return Ok(());
810 }
811 }
812 let policy_decision_id = policy_outcome.attestation.id;
813
814 if let Err(err) = self
815 .storage
816 .update_step_estimate(
817 Uuid::from(step.run_id),
818 Uuid::from(step.step_id),
819 budget_ctx.tokens_reserved,
820 budget_ctx.cost_reserved,
821 )
822 .await
823 {
824 warn!(
825 error = %err,
826 run = %step.run_id,
827 step = %step.step_id,
828 "failed to persist budget estimate"
829 );
830 }
831
832 self.metrics.record_budget_projection(&budget_ctx);
833
834 let executor = self
835 .executors
836 .get(spec.r#type)
837 .ok_or_else(|| anyhow!("no executor registered for kind {}", spec.r#type))?;
838
839 let run_uuid = Uuid::from(step.run_id);
840 let run_span_id = span_id_from_uuid(&run_uuid);
841 let trace_headers = self.trace_headers_for_run(step.run_id).await?;
842 let run_scope = RunScope::new(step.run_id.to_string());
843 let run_trace = if let Some(headers) = trace_headers {
844 match TraceContext::from_w3c(
845 run_scope.clone(),
846 &headers.traceparent,
847 headers.tracestate.as_deref(),
848 headers.baggage.as_deref(),
849 ) {
850 Ok(ctx) => ctx.with_span_id(run_span_id),
851 Err(err) => {
852 warn!(
853 run = %step.run_id,
854 error = %err,
855 "failed to parse trace headers; using fallback trace id"
856 );
857 TraceContext::new(run_scope.clone())
858 .with_trace_id(trace_id_from_uuid(&run_uuid))
859 .with_span_id(run_span_id)
860 }
861 }
862 } else {
863 TraceContext::new(run_scope.clone())
864 .with_trace_id(trace_id_from_uuid(&run_uuid))
865 .with_span_id(run_span_id)
866 };
867 let (mut step_span, step_trace) = span_step!(
868 parent: &run_trace,
869 step: &step,
870 spec: &spec,
871 worker: &self.config.worker_id
872 );
873 let _span_guard = step_span.enter();
874
875 let guardrails = PolicyBundle::for_policy(&spec.policy);
876
877 if !guardrails.is_empty() {
878 let scheduler_payload =
879 build_scheduler_payload(&spec, &step, &budget_ctx, step_slo.as_ref());
880 let outcome = guardrails
881 .evaluate(
882 TrustBoundary::Scheduler,
883 Some(step.run_id),
884 Some(step.step_id),
885 scheduler_payload,
886 )
887 .await?;
888
889 match outcome.effect {
890 DecisionEffect::Deny => {
891 let mut events = guardrails.drain_events();
892 if !policy_records.is_empty() {
893 events.extend(policy_records.clone());
894 }
895 let mut policy_artifact: Option<(Value, Value)> = None;
896 let mut artifact_id: Option<String> = None;
897 if !events.is_empty() {
898 let policy_pack = policy_pack_name(&spec.policy);
899 let dominant_effect = dominant_policy_effect(&events);
900 let summary_text = summarize_policy_events(&events);
901 let effect_label = decision_effect_label(dominant_effect);
902 match persist_policy_decisions(
903 Arc::clone(&self.artifact_store),
904 step.run_id,
905 step.step_id,
906 &events,
907 )
908 .await
909 {
910 Ok((artifact_meta, summary)) => {
911 artifact_id = artifact_meta
912 .get("sha256")
913 .and_then(Value::as_str)
914 .map(|s| s.to_string())
915 .or_else(|| {
916 artifact_meta
917 .get("artifact_id")
918 .and_then(Value::as_str)
919 .map(|s| s.to_string())
920 });
921 policy_artifact = Some((artifact_meta, summary));
922 }
923 Err(err) => {
924 warn!(
925 error = %err,
926 run = %step.run_id,
927 step = %step.step_id,
928 "failed to persist policy decisions"
929 );
930 }
931 }
932 step_span.record("fleetforge_policy_pack", policy_pack.as_str());
933 step_span.record("fleetforge_policy_effect", effect_label.as_str());
934 event_policy!(
935 pack: policy_pack.as_str(),
936 effect: effect_label.as_str(),
937 summary: summary_text.as_str(),
938 artifact: artifact_id.as_deref()
939 );
940 }
941
942 let mut error_payload = json!({
943 "message": outcome.summary(),
944 "kind": "policy_denied",
945 });
946 if let Some((ref artifact_meta, ref summary)) = policy_artifact {
947 attach_policy_summary(
948 &mut error_payload,
949 summary.clone(),
950 artifact_meta.clone(),
951 );
952 }
953 self.finish_step(
954 &step,
955 &spec,
956 StepStatus::Failed,
957 step.attempt,
958 None,
959 Some(error_payload),
960 None,
961 &budget_ctx,
962 Some(input_snapshot_value.clone()),
963 None,
964 provider.as_deref(),
965 provider_version.as_deref(),
966 None,
967 None,
968 Vec::new(),
969 events,
970 None,
971 None,
972 step_slo.as_ref(),
973 )
974 .await?;
975 return Ok(());
976 }
977 DecisionEffect::Redact => {
978 if let Some(inputs) = outcome.value.get("inputs").cloned() {
979 spec.inputs = inputs;
980 }
981 if let Some(policy_value) = outcome.value.get("policy").cloned() {
982 spec.policy = policy_value;
983 }
984 if let Some(execution_value) = outcome.value.get("execution") {
985 if let Ok(updated_execution) =
986 serde_json::from_value(execution_value.clone())
987 {
988 spec.execution = updated_execution;
989 }
990 }
991 input_snapshot_value = spec.inputs.clone();
992 input_snapshot = Some(input_snapshot_value.clone());
993 let (prov, prov_version) = resolve_step_providers(&spec);
994 provider = prov;
995 provider_version = prov_version;
996 }
997 DecisionEffect::Allow => {}
998 }
999 }
1000
1001 self.storage
1002 .mark_step_running(
1003 Uuid::from(step.run_id),
1004 Uuid::from(step.step_id),
1005 &self.config.worker_id,
1006 Some(&input_snapshot_value),
1007 provider.as_deref(),
1008 provider_version.as_deref(),
1009 )
1010 .await
1011 .context("failed to mark step running")?;
1012
1013 self.add_inflight(budget_ctx.tokens_reserved, budget_ctx.cost_reserved)
1014 .await;
1015
1016 let untrusted_inputs = Untrusted(spec.inputs.clone());
1017 let attempt_number = step.attempt + 1;
1018 let capability_token = match self.issue_capability_token(
1019 step.run_id,
1020 step.step_id,
1021 attempt_number,
1022 &spec,
1023 &budget_ctx,
1024 ) {
1025 Ok(token) => {
1026 let token_id = token.token_id().to_string();
1027 step_span.record("trust.capability_token_id", token_id.as_str());
1028 record_attribute_str("trust.capability_token_id", &token_id);
1029 Some(token)
1030 }
1031 Err(err) => {
1032 warn!(
1033 run = %step.run_id,
1034 step = %step.step_id,
1035 error = %err,
1036 "failed to mint capability token"
1037 );
1038 let mut error_payload = json!({
1039 "message": "capability token mint failed",
1040 "kind": "capability_mint_failed",
1041 "error": err.to_string(),
1042 });
1043 if let Some(obj) = error_payload.as_object_mut() {
1044 obj.insert(
1045 "step".into(),
1046 json!({ "run_id": step.run_id.to_string(), "step_id": step.step_id.to_string() }),
1047 );
1048 }
1049 self.finish_step(
1050 &step,
1051 &spec,
1052 StepStatus::Failed,
1053 step.attempt,
1054 None,
1055 Some(error_payload),
1056 None,
1057 &budget_ctx,
1058 Some(input_snapshot_value.clone()),
1059 None,
1060 provider.as_deref(),
1061 provider_version.as_deref(),
1062 Some(&step_trace),
1063 Some(&run_trace),
1064 Vec::new(),
1065 Vec::new(),
1066 None,
1067 None,
1068 step_slo.as_ref(),
1069 )
1070 .await?;
1071 return Ok(());
1072 }
1073 };
1074
1075 if let Some(violation) = self.validate_capability_guard(
1076 capability_token.as_ref(),
1077 &spec,
1078 &budget_ctx,
1079 step.run_id,
1080 step.step_id,
1081 )? {
1082 let summary_text = violation
1083 .record
1084 .reasons
1085 .first()
1086 .cloned()
1087 .unwrap_or_else(|| violation.reason.clone());
1088 step_span.record("fleetforge_policy_pack", "capability_guard");
1089 step_span.record("fleetforge_policy_effect", "deny");
1090
1091 let records = vec![violation.record.clone()];
1092 let (artifact_meta, summary) = persist_policy_decisions(
1093 Arc::clone(&self.artifact_store),
1094 step.run_id,
1095 step.step_id,
1096 &records,
1097 )
1098 .await?;
1099
1100 let artifact_id = artifact_meta
1101 .get("sha256")
1102 .and_then(Value::as_str)
1103 .map(|s| s.to_string())
1104 .or_else(|| {
1105 artifact_meta
1106 .get("artifact_id")
1107 .and_then(Value::as_str)
1108 .map(|s| s.to_string())
1109 });
1110
1111 event_policy!(
1112 pack: "capability_guard",
1113 effect: "deny",
1114 summary: summary_text.as_str(),
1115 artifact: artifact_id.as_deref()
1116 );
1117
1118 let mut error_payload = json!({
1119 "message": summary_text,
1120 "kind": "capability_denied",
1121 });
1122 if let Some(token_id) = violation.token_id.as_ref() {
1123 if let Some(obj) = error_payload.as_object_mut() {
1124 obj.insert(
1125 "capability_token_id".into(),
1126 Value::String(token_id.clone()),
1127 );
1128 }
1129 }
1130 if let Some(token) = capability_token.as_ref() {
1131 if let Some(obj) = error_payload.as_object_mut() {
1132 obj.insert(
1133 "capability_token".into(),
1134 to_value(token).expect("capability token is serialisable"),
1135 );
1136 }
1137 }
1138 attach_policy_summary(&mut error_payload, summary.clone(), artifact_meta.clone());
1139
1140 warn!(
1141 run = %step.run_id,
1142 step = %step.step_id,
1143 reason = %violation.reason,
1144 "capability guard denied step execution"
1145 );
1146
1147 self.finish_step(
1148 &step,
1149 &spec,
1150 StepStatus::Failed,
1151 step.attempt,
1152 None,
1153 Some(error_payload),
1154 None,
1155 &budget_ctx,
1156 Some(input_snapshot_value.clone()),
1157 None,
1158 provider.as_deref(),
1159 provider_version.as_deref(),
1160 Some(&step_trace),
1161 Some(&run_trace),
1162 Vec::new(),
1163 records,
1164 None,
1165 None,
1166 step_slo.as_ref(),
1167 )
1168 .await?;
1169 return Ok(());
1170 }
1171 let idempotency_key = format!("{}:{}:{}", step.run_id, step.step_id, attempt_number);
1172 let ctx = StepCtx {
1173 run_id: step.run_id,
1174 step_id: step.step_id,
1175 attempt: attempt_number,
1176 max_attempts: step.max_attempts,
1177 spec: spec.clone(),
1178 untrusted_inputs,
1179 artifacts: self.artifact_store.clone(),
1180 policy: Arc::clone(&self.policy_pack),
1181 policy_decision_id: Some(policy_decision_id),
1182 guardrails,
1183 trust: spec.trust.clone(),
1184 trust_origin: spec.trust_origin.clone(),
1185 budget: budget_ctx.clone(),
1186 trace: step_trace.clone(),
1187 checkpoint: step.checkpoint.clone(),
1188 idempotency_key,
1189 capability_token: capability_token.clone(),
1190 slo: step_slo.clone(),
1191 };
1192
1193 let start = Instant::now();
1194 let execution = executor.execute(&ctx).await;
1195 let latency = start.elapsed();
1196 step_span.record("fleetforge_duration_ms", latency.as_secs_f64() * 1000.0);
1197 let mut guardrail_events = ctx.guardrails.drain_events();
1198 if !policy_records.is_empty() {
1199 guardrail_events.extend(policy_records.clone());
1200 }
1201 let mut policy_artifact = None;
1202 if !guardrail_events.is_empty() {
1203 let mut policy_pack = policy_pack_name(&spec.policy);
1204 if policy_pack == "baseline" && !policy_records.is_empty() {
1205 policy_pack = pack_id.clone();
1206 }
1207 let dominant_effect = dominant_policy_effect(&guardrail_events);
1208 let summary_text = summarize_policy_events(&guardrail_events);
1209 match persist_policy_decisions(
1210 ctx.artifacts.clone(),
1211 step.run_id,
1212 step.step_id,
1213 &guardrail_events,
1214 )
1215 .await
1216 {
1217 Ok(value) => {
1218 let (artifact_meta, summary) = value;
1219 let artifact_id = artifact_meta
1220 .get("sha256")
1221 .and_then(Value::as_str)
1222 .map(|s| s.to_string())
1223 .or_else(|| {
1224 artifact_meta
1225 .get("artifact_id")
1226 .and_then(Value::as_str)
1227 .map(|s| s.to_string())
1228 });
1229 policy_event_log = Some((
1230 policy_pack.clone(),
1231 summary_text.clone(),
1232 artifact_id.clone(),
1233 dominant_effect,
1234 ));
1235 policy_artifact = Some((artifact_meta, summary));
1236 }
1237 Err(err) => {
1238 warn!(
1239 error = %err,
1240 run = %step.run_id,
1241 step = %step.step_id,
1242 "failed to persist policy decisions"
1243 );
1244 policy_event_log = Some((
1245 policy_pack.clone(),
1246 summary_text.clone(),
1247 None,
1248 dominant_effect,
1249 ));
1250 }
1251 }
1252 if policy_event_log.is_none() {
1253 policy_event_log = Some((
1254 policy_pack.clone(),
1255 summary_text.clone(),
1256 None,
1257 dominant_effect,
1258 ));
1259 }
1260 }
1261
1262 if let Some((pack, summary, artifact_id, effect)) = &policy_event_log {
1263 let effect_label = decision_effect_label(*effect);
1264 step_span.record("fleetforge_policy_pack", pack.as_str());
1265 step_span.record("fleetforge_policy_effect", effect_label.as_str());
1266 event_policy!(
1267 pack: pack.as_str(),
1268 effect: effect_label.as_str(),
1269 summary: summary.as_str(),
1270 artifact: artifact_id.as_deref()
1271 );
1272 tracing::info!(
1273 target: "fleetforge.policy",
1274 run = %step.run_id,
1275 step = %step.step_id,
1276 pack = %pack,
1277 effect = %effect_label,
1278 summary = %summary,
1279 artifact = %artifact_id.clone().unwrap_or_default(),
1280 "policy evaluation"
1281 );
1282 }
1283
1284 match execution {
1285 Ok(mut result) => {
1286 if result.capability_token.is_none() {
1287 if let Some(token) = ctx.capability_token.clone() {
1288 result.capability_token = Some(token);
1289 }
1290 }
1291 let StepExecutionResult {
1292 mut output,
1293 usage,
1294 provider: result_provider,
1295 provider_version: result_provider_version,
1296 tool_events,
1297 checkpoint,
1298 capability_token: result_capability_token,
1299 } = result;
1300
1301 let capability_token =
1302 result_capability_token.or_else(|| ctx.capability_token.clone());
1303
1304 if let Some((artifact_meta, summary)) = policy_artifact {
1305 attach_policy_summary(&mut output, summary, artifact_meta);
1306 }
1307
1308 if self.config.transparency_writer
1309 && self.config.transparency_scope.includes_artifacts()
1310 {
1311 if let Some(artifacts_value) = output.get_mut("artifacts") {
1312 self.decorate_artifacts(step.run_id, step.step_id, artifacts_value)
1313 .await?;
1314 }
1315 }
1316
1317 if let Some(token) = capability_token.as_ref() {
1318 if let Some(map) = output.as_object_mut() {
1319 map.insert(
1320 "capability_token".into(),
1321 to_value(token).expect("capability token serialisable"),
1322 );
1323 map.insert(
1324 "capability_token_id".into(),
1325 Value::String(token.token_id().to_string()),
1326 );
1327 }
1328 }
1329
1330 if let Some(usage) = usage {
1331 budget_ctx.tokens_actual = usage.tokens;
1332 budget_ctx.cost_actual = usage.cost;
1333 if let Some(tokens) = usage.tokens {
1334 step_span.record("fleetforge_tokens_total", tokens);
1335 }
1336 if let Some(cost) = usage.cost {
1337 step_span.record("fleetforge_cost_usd", cost);
1338 }
1339 }
1340 if let Some(p) = result_provider {
1341 provider = Some(p);
1342 }
1343 if let Some(v) = result_provider_version {
1344 provider_version = Some(v);
1345 }
1346 step_span.record("fleetforge_status", "succeeded");
1347
1348 let mut checkpoint_value = checkpoint.clone();
1349 if checkpoint_value.is_none() && step.spec.execution.checkpoint {
1350 checkpoint_value = Some(output.clone());
1351 }
1352 let artifacts_value = output.get("artifacts").cloned();
1353 let output_snapshot_value = output.clone();
1354
1355 self.finish_step(
1356 &step,
1357 &spec,
1358 StepStatus::Succeeded,
1359 step.attempt + 1,
1360 Some(output.clone()),
1361 None,
1362 Some(latency),
1363 &budget_ctx,
1364 input_snapshot.clone(),
1365 Some(output_snapshot_value),
1366 provider.as_deref(),
1367 provider_version.as_deref(),
1368 Some(&ctx.trace),
1369 Some(&run_trace),
1370 tool_events,
1371 guardrail_events.clone(),
1372 checkpoint_value,
1373 artifacts_value,
1374 ctx.slo.as_ref(),
1375 )
1376 .await
1377 }
1378 Err(err) => {
1379 step_span.record("fleetforge_status", "failed");
1380 let error_message = err.to_string();
1381 step_span.record("error_message", error_message.as_str());
1382 let mut error_payload = json!({
1383 "message": &error_message,
1384 "kind": "execution_error"
1385 });
1386 if let Some((artifact_meta, summary)) = policy_artifact {
1387 attach_policy_summary(&mut error_payload, summary, artifact_meta);
1388 }
1389 if let Some(token) = ctx.capability_token.as_ref() {
1390 if let Some(map) = error_payload.as_object_mut() {
1391 map.insert(
1392 "capability_token".into(),
1393 to_value(token).expect("capability token serialisable"),
1394 );
1395 map.insert(
1396 "capability_token_id".into(),
1397 Value::String(token.token_id().to_string()),
1398 );
1399 }
1400 }
1401 insert_trace(&mut error_payload, &ctx.trace);
1402
1403 let attempt_number = step.attempt + 1;
1404 if attempt_number < step.max_attempts {
1405 self.release_inflight(budget_ctx.tokens_reserved, budget_ctx.cost_reserved)
1406 .await;
1407 self.schedule_retry(&step, error_payload, &budget_ctx, &ctx.trace, &run_trace)
1408 .await?;
1409 return Ok(());
1410 }
1411
1412 self.finish_step(
1413 &step,
1414 &spec,
1415 StepStatus::Failed,
1416 attempt_number,
1417 None,
1418 Some(error_payload),
1419 Some(latency),
1420 &budget_ctx,
1421 input_snapshot,
1422 None,
1423 provider.as_deref(),
1424 provider_version.as_deref(),
1425 Some(&ctx.trace),
1426 Some(&run_trace),
1427 Vec::new(),
1428 guardrail_events.clone(),
1429 None,
1430 None,
1431 ctx.slo.as_ref(),
1432 )
1433 .await?;
1434
1435 self.schedule_compensation(&step, &run_trace).await
1436 }
1437 }
1438 }
1439
1440 fn rank_steps(&self, steps: &mut [QueuedStep]) {
1441 if steps.len() <= 1 {
1442 return;
1443 }
1444
1445 let now = Utc::now();
1446 let mut scores: HashMap<StepId, SloScore> = HashMap::with_capacity(steps.len());
1447 let mut original_positions: HashMap<StepId, usize> = HashMap::with_capacity(steps.len());
1448
1449 for (idx, step) in steps.iter().enumerate() {
1450 let contract = extract_slo_contract(&step.spec);
1451 let score = compute_slo_score(step, contract.as_ref(), now);
1452 if score.has_contract {
1453 self.metrics.record_slo_slack(Some(score.slack_ms));
1454 }
1455 scores.insert(step.step_id, score);
1456 original_positions.insert(step.step_id, idx);
1457 }
1458
1459 steps.sort_by(|a, b| {
1460 let sa = scores.get(&a.step_id).expect("slo score missing");
1461 let sb = scores.get(&b.step_id).expect("slo score missing");
1462 sa.compare(sb).then_with(|| a.created_at.cmp(&b.created_at))
1463 });
1464
1465 for (idx, step) in steps.iter().enumerate() {
1466 if let Some(score) = scores.get(&step.step_id) {
1467 if score.has_contract && score.slack_ms < 0 {
1468 if let Some(original) = original_positions.get(&step.step_id) {
1469 if idx < *original {
1470 self.metrics.record_slo_preemption();
1471 }
1472 }
1473 }
1474 }
1475 }
1476 }
1477
    /// Persist the terminal outcome of a step attempt and fan out all
    /// bookkeeping that depends on it.
    ///
    /// Within a single database transaction this:
    /// - updates the step row (status, output/error, snapshots, provider),
    /// - reconciles actual vs reserved token/cost usage,
    /// - records the attempt row (spec/budget/guardrail/tool-event snapshots),
    /// - unlocks successors (on success) or skips downstream steps (on failure),
    /// - refreshes the run status and enqueues outbox events for the step,
    ///   tool events, newly queued/skipped steps, and any run transition,
    /// - writes a cost-ledger entry and billing usage.
    ///
    /// After the commit: best-effort audit append, metric updates, and
    /// release of the in-flight reservation. Any `?` before the commit rolls
    /// the whole transaction back.
    async fn finish_step(
        &self,
        step: &QueuedStep,
        spec_snapshot: &StepSpec,
        status: StepStatus,
        attempt: i32,
        output: Option<Value>,
        error: Option<Value>,
        latency: Option<Duration>,
        budget: &BudgetCtx,
        input_snapshot: Option<Value>,
        output_snapshot: Option<Value>,
        provider: Option<&str>,
        provider_version: Option<&str>,
        step_trace: Option<&TraceContext>,
        run_trace: Option<&TraceContext>,
        tool_events: Vec<ToolEventRecord>,
        guardrail_events: Vec<PolicyEvaluationRecord>,
        checkpoint: Option<Value>,
        artifacts: Option<Value>,
        slo: Option<&StepSlo>,
    ) -> Result<()> {
        // All storage writes below share this one transaction; nothing is
        // visible to readers until the commit near the end of the function.
        let mut tx = self.storage.pool().begin().await?;
        let run_uuid = Uuid::from(step.run_id);
        let step_uuid = Uuid::from(step.step_id);

        // Reuse caller-supplied trace contexts when present; otherwise
        // rebuild them from the run/step identifiers.
        let run_trace_ctx = if let Some(trace) = run_trace {
            trace.clone()
        } else {
            self.build_run_trace_for_step(step).await?
        };
        let step_trace_ctx = if let Some(trace) = step_trace {
            trace.clone()
        } else {
            self.build_step_trace_for_step(step, &run_trace_ctx)
        };

        let artifacts_for_event = artifacts.clone();

        self.storage
            .update_step_status_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                models::StepStatus::from(status),
                Some(attempt),
                output.clone(),
                error.clone(),
                input_snapshot.clone(),
                output_snapshot.clone(),
                checkpoint.clone(),
                provider,
                provider_version,
            )
            .await?;

        // Fall back to the reservation when the executor reported no usage,
        // and clamp to non-negative values.
        let actual_tokens = budget
            .tokens_actual
            .unwrap_or(budget.tokens_reserved)
            .max(0);
        let actual_cost = budget.cost_actual.unwrap_or(budget.cost_reserved).max(0.0);

        self.storage
            .update_step_usage_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                Some(actual_tokens),
                Some(actual_cost),
            )
            .await?;

        // Reconcile the earlier reservation against actual consumption; the
        // deltas may be negative (we reserved more than was used).
        let tokens_delta = actual_tokens - budget.tokens_reserved;
        let cost_delta = actual_cost - budget.cost_reserved;
        let mut tokens_total = if budget.tracked {
            budget.tokens_used
        } else {
            actual_tokens
        };
        let mut cost_total = if budget.tracked {
            budget.cost_used
        } else {
            actual_cost
        };

        if budget.tracked {
            self.storage
                .reconcile_budget_usage_tx(&mut tx, run_uuid, tokens_delta, cost_delta)
                .await?;
            tokens_total += tokens_delta;
            cost_total += cost_delta;
        }

        let latency_ms = latency.map(|d| d.as_secs_f64() * 1000.0);
        let spec_snapshot_json = serde_json::to_value(spec_snapshot)
            .context("failed to serialise step spec snapshot")?;
        let budget_snapshot_json = guardrail_budget_view(budget, slo);
        let guardrail_events_json = if guardrail_events.is_empty() {
            None
        } else {
            Some(serde_json::to_value(&guardrail_events)?)
        };
        let tool_events_json = if tool_events.is_empty() {
            None
        } else {
            // NOTE(review): bare call — presumably resolves to a free
            // `tool_events_to_value` in this module; the impl also defines an
            // associated fn of the same name. Confirm which one is intended.
            Some(tool_events_to_value(&tool_events))
        };

        self.storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id: run_uuid,
                    step_id: step_uuid,
                    attempt,
                    status: models::StepStatus::from(status),
                    latency_ms,
                    spec_snapshot: spec_snapshot_json.clone(),
                    input_snapshot: input_snapshot.clone(),
                    output_snapshot: output_snapshot.clone(),
                    error_snapshot: error.clone(),
                    checkpoint: checkpoint.clone(),
                    budget_snapshot: Some(budget_snapshot_json.clone()),
                    guardrail_events: guardrail_events_json.clone(),
                    tool_events: tool_events_json.clone(),
                    artifacts: artifacts.clone(),
                },
            )
            .await?;

        // DAG maintenance: success unlocks dependants, failure skips them;
        // any other terminal status leaves the graph untouched.
        let mut newly_ready = Vec::new();
        let mut newly_skipped = Vec::new();

        match status {
            StepStatus::Succeeded => {
                newly_ready = self
                    .storage
                    .unlock_successors_tx(&mut tx, run_uuid, step_uuid)
                    .await?;
            }
            StepStatus::Failed => {
                newly_skipped = self
                    .storage
                    .skip_downstream_steps_tx(&mut tx, run_uuid, step_uuid)
                    .await?;
            }
            _ => {}
        }

        let (previous_run_status, current_run_status) = self
            .storage
            .refresh_run_status_tx(&mut tx, run_uuid)
            .await?;

        // Primary outbox event describing this attempt's outcome.
        let mut payload = json!({
            "run_id": step.run_id,
            "step_id": step.step_id,
            "status": status.as_str(),
            "output": output,
            "error": error,
            "attempt": attempt,
            "priority": step.priority,
            "provider": provider,
            "provider_version": provider_version,
            "snapshots": {
                "input": input_snapshot,
                "output": output_snapshot,
                "checkpoint": checkpoint,
            },
            "budget": {
                "tracked": budget.tracked,
                "reserved": {
                    "tokens": budget.tokens_reserved,
                    "cost": budget.cost_reserved,
                },
                "actual": {
                    "tokens": actual_tokens,
                    "cost": actual_cost,
                },
                "deltas": {
                    "tokens": tokens_delta,
                    "cost": cost_delta,
                },
                "totals": {
                    "tokens_used": tokens_total,
                    "cost_used": cost_total,
                },
                "limits": {
                    "tokens": budget.token_limit,
                    "cost": budget.cost_limit,
                    "time_secs": budget.time_limit.map(|d| d.as_secs()),
                },
                "snapshot": budget_snapshot_json.clone(),
            },
            "spec_snapshot": spec_snapshot_json.clone(),
            "guardrail_events": guardrail_events_json.clone(),
            "tool_events": tool_events_json.clone(),
            "latency_ms": latency_ms,
            "artifacts": artifacts_for_event,
        });
        insert_trace(&mut payload, &step_trace_ctx);

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(step_uuid),
                    kind: format!("step_{}", status.as_str()),
                    payload,
                },
            )
            .await?;

        // Fan out each tool event as its own outbox entry, carrying its own
        // trace context.
        for event in tool_events {
            let mut payload = event.payload;
            insert_trace(&mut payload, &event.trace);
            self.storage
                .enqueue_outbox_event(
                    &mut tx,
                    NewOutboxEvent {
                        run_id: run_uuid,
                        step_id: Some(step_uuid),
                        kind: event.kind,
                        payload,
                    },
                )
                .await?;
        }

        self.storage
            .insert_cost_ledger_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                attempt,
                status.as_str(),
                budget.tokens_reserved,
                budget.cost_reserved,
                Some(actual_tokens),
                Some(actual_cost),
            )
            .await?;

        // Announce steps whose dependencies were just satisfied.
        for queued_step_id in &newly_ready {
            let mut payload = json!({
                "run_id": step.run_id,
                "step_id": StepId::from(*queued_step_id),
                "status": StepStatus::Queued.as_str(),
                "reason": "dependencies_satisfied",
            });
            insert_trace(&mut payload, &run_trace_ctx);

            self.storage
                .enqueue_outbox_event(
                    &mut tx,
                    NewOutboxEvent {
                        run_id: run_uuid,
                        step_id: Some(*queued_step_id),
                        kind: "step_queued".to_string(),
                        payload,
                    },
                )
                .await?;
        }

        // Announce steps that were skipped due to this failure.
        for skipped_step_id in &newly_skipped {
            let mut payload = json!({
                "run_id": step.run_id,
                "step_id": StepId::from(*skipped_step_id),
                "status": StepStatus::Skipped.as_str(),
                "reason": "dependency_failed",
            });
            insert_trace(&mut payload, &run_trace_ctx);

            self.storage
                .enqueue_outbox_event(
                    &mut tx,
                    NewOutboxEvent {
                        run_id: run_uuid,
                        step_id: Some(*skipped_step_id),
                        kind: "step_skipped".to_string(),
                        payload,
                    },
                )
                .await?;
        }

        // Emit a run-level transition event only when the status changed.
        if previous_run_status != current_run_status {
            let mut payload = json!({
                "run_id": step.run_id,
                "previous_status": previous_run_status.as_str(),
                "status": current_run_status.as_str(),
            });
            insert_trace(&mut payload, &run_trace_ctx);

            self.storage
                .enqueue_outbox_event(
                    &mut tx,
                    NewOutboxEvent {
                        run_id: run_uuid,
                        step_id: None,
                        kind: format!("run_{}", current_run_status.as_str()),
                        payload,
                    },
                )
                .await?;

            // On terminal run states, generate the run BOM; failures are
            // logged but never fail the step finalisation.
            if matches!(
                current_run_status,
                models::RunStatus::Succeeded | models::RunStatus::Failed
            ) {
                if let Err(err) = self.emit_run_bom(step.run_id, current_run_status).await {
                    warn!(
                        run = %step.run_id,
                        error = %err,
                        "failed to generate BOM for run"
                    );
                }
            }
        }

        if actual_tokens > 0 || actual_cost > 0.0 {
            self.storage
                .record_billing_usage_tx(&mut tx, run_uuid, actual_tokens, actual_cost)
                .await?;
        }

        tx.commit().await?;

        // Everything below is post-commit and best-effort.
        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
            Ok(value) => value.and_then(|s| Uuid::parse_str(&s).ok()),
            Err(err) => {
                warn!(error = %err, run = %step.run_id, "failed to fetch tenant for audit");
                None
            }
        };

        let audit_resource = json!({
            "run_id": step.run_id.to_string(),
            "step_id": step.step_id.to_string(),
            "status": status.as_str(),
            "attempt": attempt,
            "worker": self.config.worker_id,
        });

        if let Err(err) = self
            .audit
            .append(
                &self.config.worker_id,
                tenant_uuid,
                &format!("step.{}", status.as_str()),
                &audit_resource,
            )
            .await
        {
            warn!(error = %err, run = %step.run_id, step = %step.step_id, "failed to append audit log for step");
        }

        if let Some(latency) = latency {
            self.metrics.record_latency(latency);
        }
        self.metrics
            .record_budget_consumed(actual_tokens, actual_cost);

        // Report remaining budget (floored at zero) only for tracked budgets.
        if budget.tracked {
            let remaining_tokens = budget.token_limit.map(|limit| {
                let diff = limit - tokens_total;
                if diff < 0 {
                    0
                } else {
                    diff
                }
            });
            let remaining_cost = budget.cost_limit.map(|limit| (limit - cost_total).max(0.0));
            self.metrics
                .record_budget_remaining(remaining_tokens, remaining_cost);
        }

        // Give the reservation back to the in-flight accounting.
        self.release_inflight(budget.tokens_reserved, budget.cost_reserved)
            .await;

        Ok(())
    }
1862
1863 fn tool_events_to_value(events: &[ToolEventRecord]) -> Value {
1864 Value::Array(
1865 events
1866 .iter()
1867 .map(|event| {
1868 let mut object = Map::new();
1869 object.insert("kind".to_string(), Value::String(event.kind.clone()));
1870 object.insert("payload".to_string(), event.payload.clone());
1871 object.insert("trace".to_string(), event.trace.to_json());
1872 Value::Object(object)
1873 })
1874 .collect(),
1875 )
1876 }
1877
1878 async fn build_run_trace_for_step(&self, step: &QueuedStep) -> Result<TraceContext> {
1879 let run_uuid = Uuid::from(step.run_id);
1880 let run_span_id = span_id_from_uuid(&run_uuid);
1881 let trace_headers = self.trace_headers_for_run(step.run_id).await?;
1882 let run_scope = RunScope::new(step.run_id.to_string());
1883
1884 let run_trace = if let Some(headers) = trace_headers {
1885 match TraceContext::from_w3c(
1886 run_scope.clone(),
1887 &headers.traceparent,
1888 headers.tracestate.as_deref(),
1889 headers.baggage.as_deref(),
1890 ) {
1891 Ok(ctx) => ctx.with_span_id(run_span_id),
1892 Err(err) => {
1893 warn!(
1894 run = %step.run_id,
1895 error = %err,
1896 "failed to parse trace headers; using fallback trace id"
1897 );
1898 TraceContext::new(run_scope.clone())
1899 .with_trace_id(trace_id_from_uuid(&run_uuid))
1900 .with_span_id(run_span_id)
1901 }
1902 }
1903 } else {
1904 TraceContext::new(run_scope.clone())
1905 .with_trace_id(trace_id_from_uuid(&run_uuid))
1906 .with_span_id(run_span_id)
1907 };
1908
1909 Ok(run_trace)
1910 }
1911
1912 fn build_step_trace_for_step(
1913 &self,
1914 step: &QueuedStep,
1915 run_trace: &TraceContext,
1916 ) -> TraceContext {
1917 let step_index = if step.idx >= 0 { step.idx as u32 } else { 0 };
1918 let step_attempt = if step.attempt >= 0 {
1919 (step.attempt + 1) as u32
1920 } else {
1921 0
1922 };
1923
1924 let step_scope = StepScope::new(step.step_id.to_string())
1925 .with_index(step_index)
1926 .with_attempt(step_attempt)
1927 .with_kind(step.spec.r#type.clone())
1928 .with_scheduler(self.config.worker_id.clone());
1929
1930 run_trace.child_step(step_scope)
1931 }
1932
    /// Pause the run if a debug breakpoint is registered for this step.
    ///
    /// Returns `Ok(true)` when the run was paused: the matched breakpoint is
    /// consumed, recorded as the active pause in the run's debug context, the
    /// run is flipped to `PausedAtStep`, the step is reverted to the queue,
    /// and a `run_paused` event is emitted — all in one transaction.
    /// Returns `Ok(false)` when no breakpoint applies.
    async fn try_pause_for_breakpoint(&self, step: &QueuedStep) -> Result<bool> {
        let run_uuid = Uuid::from(step.run_id);
        let step_uuid = Uuid::from(step.step_id);

        let run = match self.storage.fetch_run(run_uuid).await? {
            Some(run) => run,
            None => return Ok(false),
        };

        // Breakpoints live under the run's `input_ctx.debug` object; any
        // other shape means debugging is not configured for this run.
        let mut ctx_map = match run.input_ctx.clone() {
            Value::Object(map) => map,
            _ => return Ok(false),
        };

        let debug_value = match ctx_map.get_mut("debug") {
            Some(Value::Object(map)) => map,
            _ => return Ok(false),
        };

        let mut breakpoints = load_debug_breakpoints(debug_value);
        if breakpoints.is_empty() {
            return Ok(false);
        }

        let index = match breakpoints.iter().position(|bp| bp.step_id == step_uuid) {
            Some(idx) => idx,
            None => return Ok(false),
        };

        // Consume the matched breakpoint (it fires only once) and record it
        // as the active pause in the debug context.
        let record = breakpoints.remove(index);
        store_debug_breakpoints(debug_value, &breakpoints);
        set_active_pause(debug_value, &record);

        // Persist the updated context, pause the run, and requeue the step —
        // atomically, so a crash cannot lose the breakpoint consumption.
        let mut tx = self
            .storage
            .pool()
            .begin()
            .await
            .context("failed to begin pause transaction")?;

        self.storage
            .update_run_input_ctx_tx(&mut tx, run_uuid, Value::Object(ctx_map.clone()))
            .await
            .context("failed to persist pause context")?;

        self.storage
            .set_run_status_tx(&mut tx, run_uuid, models::RunStatus::PausedAtStep)
            .await
            .context("failed to set run status to paused")?;

        self.storage
            .revert_step_to_queue_tx(&mut tx, run_uuid, step_uuid)
            .await
            .context("failed to revert step to queue after pause")?;

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(step_uuid),
                    kind: "run_paused".to_string(),
                    payload: json!({
                        "run_id": run_uuid.to_string(),
                        "step_id": step_uuid.to_string(),
                        "spec": record.spec,
                        "reason": "breakpoint",
                    }),
                },
            )
            .await
            .context("failed to enqueue pause event")?;

        tx.commit()
            .await
            .context("failed to commit pause transaction")?;

        // Audit is best-effort: a failure is logged but does not undo the pause.
        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
            Ok(Some(tenant)) => Uuid::parse_str(&tenant).ok(),
            _ => None,
        };

        if let Err(err) = self
            .audit
            .append(
                &self.config.worker_id,
                tenant_uuid,
                "debug.breakpoints.pause",
                &json!({
                    "run_id": run_uuid.to_string(),
                    "step_id": step_uuid.to_string(),
                    "reason": "breakpoint",
                }),
            )
            .await
        {
            warn!(error = %err, run = %step.run_id, step = %step.step_id, "failed to append audit log for breakpoint pause");
        }

        info!(run = %step.run_id, step = %step.step_id, "paused run at breakpoint");
        Ok(true)
    }
2035
2036 async fn reserve_budget(
2037 &self,
2038 run_id: RunId,
2039 request: &BudgetRequest,
2040 ) -> Result<BudgetDecision> {
2041 if let Some(tenant) = self.storage.fetch_run_tenant(Uuid::from(run_id)).await? {
2042 if let Err(err) = self
2043 .storage
2044 .enforce_tenant_quota(&tenant, request.tokens, request.cost)
2045 .await
2046 {
2047 return Ok(BudgetDecision::Denied {
2048 reason: err.to_string(),
2049 });
2050 }
2051 }
2052
2053 match self
2054 .storage
2055 .try_consume_budget(Uuid::from(run_id), request.tokens, request.cost)
2056 .await?
2057 {
2058 BudgetResult::Granted(budget) => {
2059 let mut ctx = BudgetCtx::default();
2060 ctx.token_limit = budget.tokens_limit;
2061 ctx.tokens_used = budget.tokens_used;
2062 ctx.tokens_reserved = request.tokens;
2063 ctx.cost_limit = budget.cost_limit;
2064 ctx.cost_used = budget.cost_used;
2065 ctx.cost_reserved = request.cost;
2066 ctx.tracked = true;
2067 Ok(BudgetDecision::Granted(ctx))
2068 }
2069 BudgetResult::Denied => Ok(BudgetDecision::Denied {
2070 reason: format!("budget exhausted for run {}", run_id),
2071 }),
2072 BudgetResult::NoBudgetConfigured => {
2073 let mut ctx = BudgetCtx::default();
2074 ctx.tokens_reserved = request.tokens;
2075 ctx.cost_reserved = request.cost;
2076 Ok(BudgetDecision::Unlimited(ctx))
2077 }
2078 }
2079 }
2080
2081 async fn should_backpressure(&self, request: &BudgetRequest) -> bool {
2082 if (request.tokens <= 0 && request.cost <= 0.0)
2083 || (self.config.max_inflight_tokens.is_none()
2084 && self.config.max_inflight_cost.is_none())
2085 {
2086 return false;
2087 }
2088
2089 let inflight = self.inflight.lock().await;
2090 if let Some(limit) = self.config.max_inflight_tokens {
2091 if request.tokens > 0 && inflight.tokens + request.tokens > limit {
2092 return true;
2093 }
2094 }
2095 if let Some(limit) = self.config.max_inflight_cost {
2096 if request.cost > 0.0 && inflight.cost + request.cost > limit {
2097 return true;
2098 }
2099 }
2100 false
2101 }
2102
2103 async fn defer_for_backpressure(&self, step: &QueuedStep) -> Result<()> {
2104 let delay = chrono::Duration::from_std(self.config.backpressure_delay)
2105 .unwrap_or_else(|_| ChronoDuration::milliseconds(50));
2106 let not_before = Utc::now() + delay;
2107 let run_trace = self.build_run_trace_for_step(step).await?;
2108 let mut payload = json!({
2109 "run_id": step.run_id,
2110 "step_id": step.step_id,
2111 "status": "queued",
2112 "reason": "backpressure",
2113 "not_before": not_before.to_rfc3339(),
2114 });
2115 insert_trace(&mut payload, &run_trace);
2116 self.storage
2117 .defer_step_to_queue(
2118 Uuid::from(step.run_id),
2119 Uuid::from(step.step_id),
2120 not_before,
2121 payload,
2122 )
2123 .await
2124 }
2125
2126 async fn add_inflight(&self, tokens: i64, cost: f64) {
2127 if tokens <= 0 && cost <= 0.0 {
2128 return;
2129 }
2130 let mut inflight = self.inflight.lock().await;
2131 inflight.add(tokens, cost);
2132 }
2133
2134 async fn release_inflight(&self, tokens: i64, cost: f64) {
2135 if tokens <= 0 && cost <= 0.0 {
2136 return;
2137 }
2138 let mut inflight = self.inflight.lock().await;
2139 inflight.subtract(tokens, cost);
2140 }
2141
2142 fn compute_retry_delay(&self, step: &QueuedStep) -> ChronoDuration {
2143 let base = step.retry_backoff_ms.max(0);
2144 if base == 0 {
2145 return ChronoDuration::from_std(self.config.active_backoff)
2146 .unwrap_or_else(|_| ChronoDuration::milliseconds(10));
2147 }
2148 let exponent = step.attempt.max(0);
2149 let factor = step.retry_backoff_factor.max(1.0);
2150 let scale = if exponent > 0 {
2151 factor.powi(exponent)
2152 } else {
2153 1.0
2154 };
2155 let delay = (base as f64) * scale;
2156 let clamped = delay.min(i64::MAX as f64);
2157 ChronoDuration::milliseconds(clamped as i64)
2158 }
2159
2160 fn issue_capability_token(
2161 &self,
2162 run_id: RunId,
2163 step_id: StepId,
2164 attempt: i32,
2165 spec: &StepSpec,
2166 budget: &BudgetCtx,
2167 ) -> Option<CapabilityToken> {
2168 let signer = match capability_signer() {
2169 Ok(signer) => signer,
2170 Err(err) => {
2171 warn!(
2172 run = %run_id,
2173 step = %step_id,
2174 error = %err,
2175 "failed to load capability signer; skipping capability token mint"
2176 );
2177 return None;
2178 }
2179 };
2180
2181 let subject = CapabilityTokenSubject {
2182 run_id: Uuid::from(run_id),
2183 step_id: Some(Uuid::from(step_id)),
2184 attempt: Some(attempt),
2185 };
2186 let scope = capability::build_capability_scope(spec, budget);
2187 let ttl = spec
2188 .execution
2189 .deadline_ms
2190 .map(|ms| {
2191 let clamped = ms.min(i64::MAX as u64) as i64;
2192 ChronoDuration::milliseconds(clamped)
2193 })
2194 .filter(|duration| *duration > ChronoDuration::zero())
2195 .unwrap_or_else(|| ChronoDuration::minutes(15));
2196
2197 match mint_capability_token(
2198 subject,
2199 scope,
2200 Some(vec!["runtime-tool-adapter".into()]),
2201 ttl,
2202 signer.as_ref(),
2203 ) {
2204 Ok(token) => Some(token),
2205 Err(err) => {
2206 warn!(
2207 run = %run_id,
2208 step = %step_id,
2209 error = %err,
2210 "failed to mint capability token"
2211 );
2212 None
2213 }
2214 }
2215 }
2216
2217 fn validate_capability_guard(
2218 &self,
2219 token: Option<&CapabilityToken>,
2220 spec: &StepSpec,
2221 budget: &BudgetCtx,
2222 run_id: RunId,
2223 step_id: StepId,
2224 ) -> Result<Option<CapabilityViolation>> {
2225 match capability::validate_capability_token(token, spec, budget, Utc::now()) {
2226 Ok(()) => Ok(None),
2227 Err(err) => Ok(Some(capability_violation_record(
2228 run_id,
2229 step_id,
2230 err.reasons,
2231 err.detail,
2232 err.token_id,
2233 ))),
2234 }
2235 }
2236
    /// Re-queue a failed step for another attempt after a backoff delay.
    ///
    /// In one transaction: schedules the retry with its not-before time,
    /// releases the budget reservation (when tracked), emits a
    /// `step_retry_scheduled` outbox event carrying both traces, and writes
    /// a cost-ledger entry. After commit, appends a best-effort audit record
    /// and bumps the retry metric.
    async fn schedule_retry(
        &self,
        step: &QueuedStep,
        error_payload: Value,
        budget: &BudgetCtx,
        step_trace: &TraceContext,
        run_trace: &TraceContext,
    ) -> Result<()> {
        let delay = self.compute_retry_delay(step);
        let not_before = Utc::now() + delay;
        let run_uuid = Uuid::from(step.run_id);
        let step_uuid = Uuid::from(step.step_id);
        let attempt = step.attempt + 1;

        let mut tx = self.storage.pool().begin().await?;
        self.storage
            .schedule_retry_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                attempt,
                not_before,
                error_payload,
            )
            .await?;

        // Hand the reservation back to the run budget; the retry attempt
        // will reserve again when it is picked up.
        if budget.tracked {
            self.storage
                .reconcile_budget_usage_tx(
                    &mut tx,
                    run_uuid,
                    -budget.tokens_reserved,
                    -budget.cost_reserved,
                )
                .await?;
        }

        let mut payload = json!({
            "run_id": step.run_id,
            "step_id": step.step_id,
            "status": "queued",
            "reason": "retry",
            "attempt": attempt,
            "not_before": not_before.to_rfc3339(),
        });
        insert_trace(&mut payload, run_trace);
        insert_trace(&mut payload, step_trace);

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(step_uuid),
                    kind: "step_retry_scheduled".to_string(),
                    payload,
                },
            )
            .await?;

        self.storage
            .insert_cost_ledger_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                attempt,
                "retry_scheduled",
                budget.tokens_reserved,
                budget.cost_reserved,
                budget.tokens_actual,
                budget.cost_actual,
            )
            .await?;

        tx.commit().await?;

        // Post-commit: audit is best-effort and never fails the retry.
        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
            Ok(value) => value.and_then(|s| Uuid::parse_str(&s).ok()),
            Err(err) => {
                warn!(
                    error = %err,
                    run = %step.run_id,
                    step = %step.step_id,
                    "failed to fetch tenant for retry audit"
                );
                None
            }
        };

        let audit_resource = json!({
            "run_id": step.run_id.to_string(),
            "step_id": step.step_id.to_string(),
            "attempt": attempt,
            "not_before": not_before.to_rfc3339(),
        });

        if let Err(err) = self
            .audit
            .append(
                &self.config.worker_id,
                tenant_uuid,
                "step.retry",
                &audit_resource,
            )
            .await
        {
            warn!(
                error = %err,
                run = %step.run_id,
                step = %step.step_id,
                "failed to append audit log for retry"
            );
        }

        self.metrics.record_retry();
        Ok(())
    }
2354
    /// Queue the step's configured compensation step after a terminal failure.
    ///
    /// No-op when the step has no compensation target or compensation was
    /// already scheduled. In one transaction: triggers the compensation step,
    /// emits a `step_compensation_queued` outbox event, and writes a
    /// zero-usage cost-ledger entry. After commit, appends a best-effort
    /// audit record.
    async fn schedule_compensation(
        &self,
        step: &QueuedStep,
        run_trace: &TraceContext,
    ) -> Result<()> {
        let Some(target) = step.compensation_step else {
            return Ok(());
        };
        if step.compensation_scheduled {
            return Ok(());
        }

        let run_uuid = Uuid::from(step.run_id);
        let origin_uuid = Uuid::from(step.step_id);
        let target_uuid = Uuid::from(target);

        let mut tx = self.storage.pool().begin().await?;
        self.storage
            .trigger_compensation_tx(&mut tx, run_uuid, origin_uuid, target_uuid)
            .await?;

        // The event is addressed to the compensation target, not the failed
        // origin step.
        let mut payload = json!({
            "run_id": step.run_id,
            "step_id": target,
            "reason": "compensation",
        });
        insert_trace(&mut payload, run_trace);

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(target_uuid),
                    kind: "step_compensation_queued".to_string(),
                    payload,
                },
            )
            .await?;

        // Ledger entry against the origin step with zero reserved usage.
        self.storage
            .insert_cost_ledger_tx(
                &mut tx,
                run_uuid,
                origin_uuid,
                step.attempt + 1,
                "compensation_scheduled",
                0,
                0.0,
                None,
                None,
            )
            .await?;

        tx.commit().await?;

        // Post-commit: audit is best-effort and never fails the scheduling.
        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
            Ok(value) => value.and_then(|s| Uuid::parse_str(&s).ok()),
            Err(err) => {
                warn!(
                    error = %err,
                    run = %step.run_id,
                    step = %step.step_id,
                    "failed to fetch tenant for compensation audit"
                );
                None
            }
        };

        let audit_resource = json!({
            "run_id": step.run_id.to_string(),
            "step_id": step.step_id.to_string(),
            "compensation_step": target.to_string(),
        });

        if let Err(err) = self
            .audit
            .append(
                &self.config.worker_id,
                tenant_uuid,
                "step.compensation",
                &audit_resource,
            )
            .await
        {
            warn!(
                error = %err,
                run = %step.run_id,
                step = %step.step_id,
                "failed to append audit log for compensation step"
            );
        }

        Ok(())
    }
2450}
2451
2452fn attach_policy_summary(target: &mut Value, mut summary: Value, artifact_meta: Value) {
2453 let Some(obj) = target.as_object_mut() else {
2454 return;
2455 };
2456
2457 if let Value::Object(ref mut summary_obj) = summary {
2458 summary_obj.insert("artifact".to_string(), artifact_meta.clone());
2459 }
2460
2461 obj.insert("policy_decisions".to_string(), summary);
2462 let entry = obj
2463 .entry("artifacts")
2464 .or_insert_with(|| Value::Object(Map::new()));
2465 if let Value::Object(ref mut map) = entry {
2466 map.insert("policy_decisions".to_string(), artifact_meta);
2467 } else {
2468 let mut map = Map::new();
2469 map.insert("policy_decisions".to_string(), artifact_meta);
2470 *entry = Value::Object(map);
2471 }
2472}
2473
2474fn insert_trace(payload: &mut Value, trace: &TraceContext) {
2475 if let Value::Object(ref mut map) = payload {
2476 map.insert("trace".to_string(), trace.to_json());
2477 }
2478}
2479
2480impl Scheduler {
2481 async fn decorate_artifacts(
2482 &self,
2483 run_id: RunId,
2484 step_id: StepId,
2485 artifacts_value: &mut Value,
2486 ) -> Result<()> {
2487 if !self.config.transparency_writer || !self.config.transparency_scope.includes_artifacts()
2488 {
2489 return Ok(());
2490 }
2491
2492 let Value::Object(map) = artifacts_value else {
2493 return Ok(());
2494 };
2495
2496 for (key, entry) in map.iter_mut() {
2497 self.enqueue_artifact_job(run_id, step_id, key, entry)
2498 .await?;
2499 }
2500
2501 Ok(())
2502 }
2503
2504 async fn enqueue_artifact_job(
2505 &self,
2506 run_id: RunId,
2507 step_id: StepId,
2508 artifact_key: &str,
2509 entry: &mut Value,
2510 ) -> Result<()> {
2511 let Value::Object(obj) = entry else {
2512 return Ok(());
2513 };
2514 let Some(sha256) = obj
2515 .get("sha256")
2516 .and_then(Value::as_str)
2517 .map(|s| s.to_string())
2518 else {
2519 return Ok(());
2520 };
2521 let metadata = obj.get("metadata").cloned().unwrap_or(Value::Null);
2522 let metadata_obj = match metadata.as_object() {
2523 Some(map) => map,
2524 None => return Ok(()),
2525 };
2526 let attestation_ids_value = metadata_obj
2527 .get("attestation_ids")
2528 .and_then(Value::as_array)
2529 .cloned()
2530 .unwrap_or_default();
2531 if attestation_ids_value.is_empty() {
2532 return Ok(());
2533 }
2534 let mut attestation_ids: Vec<Uuid> = Vec::new();
2535 for value in attestation_ids_value {
2536 if let Some(id) = value.as_str() {
2537 if let Ok(uuid) = Uuid::parse_str(id) {
2538 attestation_ids.push(uuid);
2539 }
2540 }
2541 }
2542 if attestation_ids.is_empty() {
2543 return Ok(());
2544 }
2545
2546 let payload = models::TransparencyJobPayload::ArtifactEntry(models::ArtifactEntryPayload {
2547 run_id: Some(Uuid::from(run_id)),
2548 step_id: Some(Uuid::from(step_id)),
2549 artifact_key: artifact_key.to_string(),
2550 artifact_sha256: sha256.clone(),
2551 attestation_ids: attestation_ids.clone(),
2552 metadata: metadata.clone(),
2553 });
2554
2555 match self.storage.enqueue_transparency_job(payload).await {
2556 Ok(job_id) => {
2557 obj.insert(
2558 "transparency".to_string(),
2559 json!({
2560 "status": "queued",
2561 "job_id": job_id,
2562 "artifact_sha256": sha256,
2563 "attestation_ids": attestation_ids.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
2564 }),
2565 );
2566 }
2567 Err(err) => {
2568 warn!(
2569 run = %run_id,
2570 step = %step_id,
2571 artifact = artifact_key,
2572 error = %err,
2573 "failed to enqueue artifact transparency job"
2574 );
2575 }
2576 }
2577
2578 Ok(())
2579 }
2580}
2581
2582fn policy_pack_name(policy: &Value) -> String {
2583 match policy {
2584 Value::Object(map) => map
2585 .get("pack")
2586 .and_then(Value::as_str)
2587 .map(|value| value.to_string())
2588 .unwrap_or_else(|| "baseline".to_string()),
2589 _ => "baseline".to_string(),
2590 }
2591}
2592
2593fn policy_outcome_to_record(
2594 outcome: &PolicyOutcome,
2595 run_id: RunId,
2596 step_id: StepId,
2597 boundary: TrustBoundary,
2598 policy_id: &str,
2599) -> PolicyEvaluationRecord {
2600 let run_uuid = Uuid::from(run_id);
2601 let step_uuid = Uuid::from(step_id);
2602 let reasons = outcome
2603 .decision
2604 .reason
2605 .as_ref()
2606 .map(|reason| vec![reason.clone()])
2607 .unwrap_or_default();
2608 let decision_trace = PolicyDecisionTrace {
2609 policy: policy_id.to_string(),
2610 effect: outcome.decision.effect,
2611 reason: outcome.decision.reason.clone(),
2612 patches: outcome.decision.patches.clone(),
2613 };
2614
2615 PolicyEvaluationRecord {
2616 boundary,
2617 run_id: Some(run_uuid),
2618 step_id: Some(step_uuid),
2619 effect: outcome.decision.effect,
2620 reasons,
2621 decisions: vec![decision_trace],
2622 patches: outcome.decision.patches.clone(),
2623 payload: None,
2624 timestamp: outcome.attestation.issued_at,
2625 origins: Vec::new(),
2626 preview: None,
2627 attestation_id: Some(outcome.attestation.id),
2628 attestation: serde_json::to_value(&outcome.attestation).ok(),
2629 trust_decision: serde_json::to_value(&outcome.trust).ok(),
2630 }
2631}
2632
2633fn dominant_policy_effect(events: &[PolicyEvaluationRecord]) -> DecisionEffect {
2634 events
2635 .iter()
2636 .map(|event| event.effect)
2637 .max_by_key(effect_priority)
2638 .unwrap_or(DecisionEffect::Allow)
2639}
2640
2641fn effect_priority(effect: DecisionEffect) -> u8 {
2642 match effect {
2643 DecisionEffect::Deny => 4,
2644 DecisionEffect::Redact => 2,
2645 DecisionEffect::Allow => 0,
2646 }
2647}
2648
2649fn decision_effect_label(effect: DecisionEffect) -> String {
2650 match effect {
2651 DecisionEffect::Allow => "allow",
2652 DecisionEffect::Deny => "deny",
2653 DecisionEffect::Redact => "redact",
2654 }
2655 .to_string()
2656}
2657
2658fn trust_boundary_label(boundary: &TrustBoundary) -> String {
2659 serde_json::to_value(boundary)
2660 .ok()
2661 .and_then(|value| value.as_str().map(|s| s.to_string()))
2662 .unwrap_or_else(|| format!("{boundary:?}"))
2663}
2664
2665fn resolve_manifest_profile(
2666 metadata: Option<&Value>,
2667 fallback: ManifestProfile,
2668) -> ManifestProfile {
2669 metadata_manifest_profile(metadata)
2670 .or_else(env_manifest_profile)
2671 .unwrap_or(fallback)
2672}
2673
2674fn metadata_manifest_profile(metadata: Option<&Value>) -> Option<ManifestProfile> {
2675 metadata
2676 .and_then(|value| match value {
2677 Value::Object(map) => map.get("c2pa_profile").and_then(Value::as_str),
2678 _ => None,
2679 })
2680 .and_then(parse_manifest_profile_label)
2681}
2682
2683fn env_manifest_profile() -> Option<ManifestProfile> {
2684 env::var(C2PA_PROFILE_ENV)
2685 .ok()
2686 .and_then(|value| parse_manifest_profile_label(value.trim()))
2687}
2688
2689fn parse_manifest_profile_label(label: &str) -> Option<ManifestProfile> {
2690 match label.to_ascii_lowercase().as_str() {
2691 "basic" => Some(ManifestProfile::Basic),
2692 "policy" | "policy-evidence" => Some(ManifestProfile::PolicyEvidence),
2693 "full" => Some(ManifestProfile::Full),
2694 _ => None,
2695 }
2696}
2697
2698fn summarize_policy_events(events: &[PolicyEvaluationRecord]) -> String {
2699 let mut snippets: Vec<String> = Vec::new();
2700 for event in events {
2701 if !event.reasons.is_empty() {
2702 snippets.extend(event.reasons.iter().cloned());
2703 } else if let Some(preview) = &event.preview {
2704 snippets.push(preview.clone());
2705 }
2706 }
2707
2708 if snippets.is_empty() {
2709 decision_effect_label(dominant_policy_effect(events))
2710 } else {
2711 if snippets.len() > 3 {
2712 snippets.truncate(3);
2713 }
2714 snippets.join("; ")
2715 }
2716}
2717
/// Persist a step's policy-evaluation records as a signed JSON artifact with
/// an accompanying C2PA manifest, and build a compact summary for inline use.
///
/// Returns `(artifact_value, summary)`:
/// - `artifact_value` — the stored artifact's descriptor plus trust/origin
///   decoration, suitable for attaching to the step output;
/// - `summary` — a small JSON object (count, effects, boundaries, reasons,
///   previews, origins, deny/redact flags) for embedding in the step result.
async fn persist_policy_decisions(
    artifacts: Arc<dyn ArtifactStore>,
    run_id: RunId,
    step_id: StepId,
    events: &[PolicyEvaluationRecord],
) -> Result<(Value, Value)> {
    // Canonical JSON form of the events; this exact value is signed and its
    // byte serialization is what gets stored.
    let events_value = serde_json::to_value(events)?;
    let payload_vec = serde_json::to_vec(&events_value)?;
    let attestation_ids: Vec<Uuid> = events
        .iter()
        .filter_map(|event| event.attestation_id)
        .collect();
    let boundaries: Vec<TrustBoundary> = events.iter().map(|e| e.boundary.clone()).collect();
    let boundary_labels: Vec<String> = boundaries.iter().map(trust_boundary_label).collect();
    let reasons = collect_unique_reasons(events);
    let denied = events.iter().any(|e| e.effect == DecisionEffect::Deny);
    let redacted = events.iter().any(|e| e.effect == DecisionEffect::Redact);
    // At most four previews are surfaced to keep the summary compact.
    let preview_strings: Vec<String> = events
        .iter()
        .filter_map(|e| e.preview.clone())
        .take(4)
        .collect();
    let previews: Vec<Value> = preview_strings
        .iter()
        .map(|preview| Value::String(preview.clone()))
        .collect();

    // Evidence bundle embedded into the C2PA manifest.
    let policy_subject = format!("run:{}:step:{}:policy_decisions", run_id, step_id);
    let policy_evidence = PolicyEvidence {
        effect: decision_effect_label(dominant_policy_effect(events)),
        boundaries: boundary_labels.clone(),
        reasons: reasons.clone(),
        previews: preview_strings,
        decision_attestation_ids: attestation_ids.clone(),
    };
    let identity = IdentityEvidence::new()
        .with_run_id(Uuid::from(run_id))
        .with_step_id(Uuid::from(step_id))
        .with_subject_label(policy_subject.clone());
    let manifest = generate_c2pa_manifest(ManifestInput {
        bytes: &payload_vec,
        media_type: "application/json",
        subject: Some(&policy_subject),
        attestation_ids: &attestation_ids,
        profile: ManifestProfile::PolicyEvidence,
        policy_evidence: Some(&policy_evidence),
        identity: Some(&identity),
        capability_evidence: None,
    })?;
    let payload = Bytes::from(payload_vec);
    let effects: Vec<DecisionEffect> = events.iter().map(|e| e.effect).collect();
    let origins = aggregate_origins(events);

    // Detached signature over the canonical JSON value, base64-encoded for
    // storage alongside the artifact metadata.
    let signer = trust_signer();
    let envelope = signer
        .sign_json(&events_value)
        .context("failed to sign policy decisions payload")?;
    let signature = BASE64.encode(&envelope.signature);

    // Artifact-store metadata: everything a consumer needs to interpret and
    // authenticate the stored payload.
    let mut metadata = Map::new();
    metadata.insert(
        "kind".to_string(),
        Value::String("policy_decisions".to_string()),
    );
    metadata.insert("count".to_string(), Value::from(events.len()));
    metadata.insert("effects".to_string(), serde_json::to_value(&effects)?);
    metadata.insert("boundaries".to_string(), serde_json::to_value(&boundaries)?);
    if !reasons.is_empty() {
        metadata.insert("reasons".to_string(), serde_json::to_value(&reasons)?);
    }
    metadata.insert("denied".to_string(), Value::Bool(denied));
    metadata.insert("redacted".to_string(), Value::Bool(redacted));
    if !origins.is_empty() {
        metadata.insert("origins".to_string(), Value::Array(origins.clone()));
    }
    metadata.insert("c2pa_manifest".to_string(), manifest.clone());
    // NOTE(review): `c2pa_verified` is asserted here rather than re-checked;
    // confirm verification is guaranteed by generate_c2pa_manifest upstream.
    metadata.insert("c2pa_verified".to_string(), Value::Bool(true));
    if !attestation_ids.is_empty() {
        let ids = attestation_ids
            .iter()
            .map(|id| Value::String(id.to_string()))
            .collect::<Vec<_>>();
        metadata.insert("attestation_ids".to_string(), Value::Array(ids));
    }
    metadata.insert(
        "signature".to_string(),
        json!({
            "algorithm": envelope.algorithm.as_str(),
            "key_id": envelope.key_id,
            "value": signature,
        }),
    );

    let meta = artifacts
        .put(payload, "application/json", Value::Object(metadata))
        .await?;

    // Decorate the artifact descriptor with trust provenance (system-derived
    // artifact-boundary origin) and the aggregated event origins.
    let mut artifact_value = artifact_meta_to_value(&meta);
    if let Value::Object(ref mut obj) = artifact_value {
        let origin = TrustOrigin::new(TrustBoundary::Artifact)
            .with_run_id(Uuid::from(run_id))
            .with_step_id(Uuid::from(step_id))
            .with_source(TrustSource::System);
        obj.insert(
            "trust".to_string(),
            serde_json::to_value(Trust::derived(origin.clone()))?,
        );
        obj.insert("trust_origin".to_string(), serde_json::to_value(origin)?);
        if !origins.is_empty() {
            obj.insert("origins".to_string(), Value::Array(origins.clone()));
        }
    }

    // Inline summary mirrors the metadata, plus previews.
    let mut summary = Map::new();
    summary.insert("count".to_string(), Value::from(events.len()));
    summary.insert("effects".to_string(), serde_json::to_value(&effects)?);
    summary.insert("boundaries".to_string(), serde_json::to_value(&boundaries)?);
    if !reasons.is_empty() {
        summary.insert("reasons".to_string(), serde_json::to_value(&reasons)?);
    }
    summary.insert("denied".to_string(), Value::Bool(denied));
    summary.insert("redacted".to_string(), Value::Bool(redacted));
    if !previews.is_empty() {
        summary.insert("previews".to_string(), Value::Array(previews));
    }
    if !origins.is_empty() {
        summary.insert("origins".to_string(), Value::Array(origins));
    }

    Ok((artifact_value, Value::Object(summary)))
}
2849
2850fn aggregate_origins(events: &[PolicyEvaluationRecord]) -> Vec<Value> {
2851 let mut aggregated = Vec::new();
2852 for event in events {
2853 for origin in &event.origins {
2854 if !aggregated.iter().any(|existing| existing == origin) {
2855 aggregated.push(origin.clone());
2856 }
2857 }
2858 }
2859 aggregated
2860}
2861
2862fn collect_unique_reasons(events: &[PolicyEvaluationRecord]) -> Vec<String> {
2863 let mut reasons = Vec::new();
2864 for event in events {
2865 for reason in &event.reasons {
2866 push_unique_string(&mut reasons, reason);
2867 }
2868 for decision in &event.decisions {
2869 if let Some(reason) = &decision.reason {
2870 push_unique_string(&mut reasons, reason);
2871 }
2872 }
2873 }
2874 reasons
2875}
2876
2877fn capability_violation_record(
2878 run_id: RunId,
2879 step_id: StepId,
2880 reasons: Vec<String>,
2881 detail: Value,
2882 token_id: Option<String>,
2883) -> CapabilityViolation {
2884 let summary = reasons
2885 .first()
2886 .cloned()
2887 .unwrap_or_else(|| "capability_denied".to_string());
2888 let decision_trace = PolicyDecisionTrace {
2889 policy: "capability_guard".to_string(),
2890 effect: DecisionEffect::Deny,
2891 reason: Some(summary.clone()),
2892 patches: Vec::new(),
2893 };
2894 let record = PolicyEvaluationRecord {
2895 boundary: TrustBoundary::Scheduler,
2896 run_id: Some(Uuid::from(run_id)),
2897 step_id: Some(Uuid::from(step_id)),
2898 effect: DecisionEffect::Deny,
2899 reasons,
2900 decisions: vec![decision_trace],
2901 patches: Vec::new(),
2902 payload: Some(detail),
2903 timestamp: Utc::now(),
2904 origins: Vec::new(),
2905 preview: None,
2906 attestation_id: None,
2907 attestation: None,
2908 trust_decision: None,
2909 };
2910 CapabilityViolation {
2911 record,
2912 reason: summary,
2913 token_id,
2914 }
2915}
2916
impl Scheduler {
    /// Assemble, sign, and attach the run-level AIBOM (AI bill of materials)
    /// artifact for a run that reached `status`. Skips silently when the run
    /// row is missing or an `aibom` artifact is already attached.
    async fn emit_run_bom(&self, run_id: RunId, status: models::RunStatus) -> Result<()> {
        let run_uuid = Uuid::from(run_id);
        let Some(run) = self.storage.fetch_run(run_uuid).await? else {
            warn!(run = %run_id, "run missing during BOM generation");
            return Ok(());
        };
        // Idempotence guard: never emit a second AIBOM for the same run.
        if has_run_artifact(&run.input_ctx, "aibom") {
            return Ok(());
        }
        let steps = self.storage.fetch_steps_for_run(run_uuid).await?;
        let build = build_aibom(&run, &steps, self.policy_pack.id())
            .context("failed to assemble AIBOM payload")?;
        let payload_bytes =
            serde_json::to_vec(&build.bom).context("failed to serialise AIBOM payload")?;
        let attestation_refs = build.attestation_ids.clone();
        let manifest_subject = format!("run:{}:aibom", run_id);
        let status_label = crate::model::RunStatus::from(status).as_str().to_string();
        let attestation_values: Vec<Value> = attestation_refs
            .iter()
            .map(|id| Value::String(id.to_string()))
            .collect();
        // Base artifact metadata; manifest and signature are appended below.
        let mut metadata = json!({
            "kind": "aibom",
            "run_id": run_id.to_string(),
            "status": status_label,
            "trust_subjects": build.trust_subjects,
            "attestation_ids": attestation_values,
            "summary": build.summary,
        });
        let manifest_identity = IdentityEvidence::new()
            .with_run_id(Uuid::from(run_id))
            .with_subject_label(manifest_subject.clone());
        // Profile precedence: metadata override, then env var, then Full.
        let manifest_profile = resolve_manifest_profile(Some(&metadata), ManifestProfile::Full);
        let manifest = generate_c2pa_manifest(ManifestInput {
            bytes: &payload_bytes,
            media_type: "application/json",
            subject: Some(&manifest_subject),
            attestation_ids: &attestation_refs,
            profile: manifest_profile,
            policy_evidence: None,
            identity: Some(&manifest_identity),
            capability_evidence: None,
        })?;
        let signer = trust_signer();
        let envelope = signer
            .sign(&payload_bytes)
            .context("failed to sign AIBOM payload")?;
        let signature_b64 = BASE64.encode(&envelope.signature);
        if let Some(obj) = metadata.as_object_mut() {
            obj.insert("c2pa_manifest".to_string(), manifest);
            obj.insert(
                "signature".to_string(),
                json!({
                    "algorithm": envelope.algorithm.as_str(),
                    "key_id": envelope.key_id,
                    "value": signature_b64,
                }),
            );
        }
        let artifact_meta = self
            .artifact_store
            .put(Bytes::from(payload_bytes), "application/json", metadata)
            .await
            .context("failed to persist AIBOM artifact")?;
        let mut artifact_value = artifact_meta_to_value(&artifact_meta);

        // Best-effort transparency-log rollup: enqueue failures are logged
        // and do not block attaching the artifact to the run.
        if self.config.transparency_writer
            && self.config.transparency_scope.includes_runs()
            && !attestation_refs.is_empty()
        {
            let payload = models::TransparencyJobPayload::RunRollup(models::RunRollupPayload {
                run_id: run_uuid,
                artifact_sha256: hex::encode(artifact_meta.sha256),
                attestation_ids: attestation_refs.clone(),
                artifact_key: "aibom".to_string(),
                metadata: artifact_meta.metadata.clone(),
            });
            match self.storage.enqueue_transparency_job(payload).await {
                Ok(job_id) => {
                    if let Value::Object(ref mut obj) = artifact_value {
                        obj.insert(
                            "transparency".to_string(),
                            json!({
                                "status": "queued",
                                "job_id": job_id,
                            }),
                        );
                    }
                }
                Err(err) => {
                    warn!(
                        run = %run_id,
                        error = %err,
                        "failed to enqueue transparency job for AIBOM"
                    );
                }
            }
        }

        // Ensure the descriptor's metadata advertises its kind for consumers.
        if let Value::Object(ref mut obj) = artifact_value {
            if let Some(Value::Object(ref mut meta_obj)) = obj.get_mut("metadata") {
                meta_obj.insert("kind".to_string(), Value::String("aibom".to_string()));
            }
        }
        self.attach_run_artifact(run_uuid, "aibom", artifact_value)
            .await
    }

    /// Merge `artifact` into the run's `input_ctx.artifacts` map under `key`
    /// and persist the updated context. A non-object `input_ctx` is wrapped
    /// as `{ "inputs": <old value> }` rather than discarded.
    async fn attach_run_artifact(&self, run_uuid: Uuid, key: &str, artifact: Value) -> Result<()> {
        let Some(run) = self.storage.fetch_run(run_uuid).await? else {
            warn!(run = %run_uuid, "run missing while attaching artifact");
            return Ok(());
        };
        let mut ctx_map = match run.input_ctx {
            Value::Object(map) => map,
            other => {
                // Preserve the old non-object ctx under "inputs".
                let mut map = Map::new();
                map.insert("inputs".to_string(), other);
                map
            }
        };
        let entry = ctx_map
            .entry("artifacts".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        match entry {
            Value::Object(map) => {
                map.insert(key.to_string(), artifact);
            }
            _ => {
                // An existing non-object "artifacts" value is replaced wholesale.
                let mut map = Map::new();
                map.insert(key.to_string(), artifact);
                ctx_map.insert("artifacts".to_string(), Value::Object(map));
            }
        }
        self.storage
            .update_run_input_ctx(run_uuid, Value::Object(ctx_map))
            .await
    }
}
3057
/// Append `value` to `list` only if it is not already present; insertion
/// order is preserved. Linear scan — the lists involved stay small.
fn push_unique_string(list: &mut Vec<String>, value: &str) {
    let already_present = list.iter().any(|existing| existing == value);
    if !already_present {
        list.push(value.to_string());
    }
}
3063
3064fn has_run_artifact(ctx: &Value, key: &str) -> bool {
3065 ctx.as_object()
3066 .and_then(|map| map.get("artifacts"))
3067 .and_then(Value::as_object)
3068 .map(|artifacts| artifacts.contains_key(key))
3069 .unwrap_or(false)
3070}
3071
/// A capability-guard denial, packaged for both policy reporting and
/// human-readable surfacing.
#[derive(Clone)]
struct CapabilityViolation {
    // Full deny record as produced by `capability_violation_record`.
    record: PolicyEvaluationRecord,
    // Summary text: the first denial reason, or "capability_denied".
    reason: String,
    // The capability token involved, when one was presented.
    token_id: Option<String>,
}
3078
/// Token/cost reservation requested for a step before execution, derived
/// from its cost hint, budget policy, or input estimates (see `from_step`).
#[derive(Debug, Clone, Copy)]
struct BudgetRequest {
    // Estimated token usage; clamped to be non-negative.
    tokens: i64,
    // Estimated cost in USD; clamped to be non-negative.
    cost: f64,
}
3084
3085impl BudgetRequest {
3086 fn from_step(step: &QueuedStep) -> Self {
3087 let budget_policy = step.spec.policy.get("budget");
3088 let tokens = budget_policy
3089 .and_then(|p| p.get("tokens"))
3090 .and_then(Value::as_i64);
3091 let execution_hint = step.spec.execution.cost_hint.as_ref();
3092 let tokens = execution_hint
3093 .and_then(|hint| hint.tokens)
3094 .or(tokens)
3095 .or_else(|| {
3096 step.spec
3097 .inputs
3098 .get("token_estimate")
3099 .and_then(Value::as_i64)
3100 })
3101 .unwrap_or(0)
3102 .max(0);
3103 let cost = budget_policy
3104 .and_then(|p| p.get("cost"))
3105 .and_then(Value::as_f64);
3106 let cost = execution_hint
3107 .and_then(|hint| hint.usd)
3108 .or(cost)
3109 .or_else(|| {
3110 step.spec
3111 .inputs
3112 .get("cost_estimate")
3113 .and_then(Value::as_f64)
3114 })
3115 .unwrap_or(0.0)
3116 .max(0.0);
3117 Self { tokens, cost }
3118 }
3119}
3120
/// Outcome of a budget reservation attempt for a step.
enum BudgetDecision {
    // Reservation accepted within the tracked budget.
    Granted(BudgetCtx),
    // Reservation rejected; `reason` explains why.
    Denied { reason: String },
    // Reservation proceeds without an enforced limit — presumably no budget
    // is configured for the run; confirm against the call sites.
    Unlimited(BudgetCtx),
}
3126
/// OpenTelemetry instruments for the step scheduler, created once by
/// `QueueMetrics::new` on the "fleetforge.runtime" meter.
struct QueueMetrics {
    // Sampled scheduler queue depth.
    queue_depth: Histogram<f64>,
    // Seconds a step waited in the queue before execution.
    queue_lag: Histogram<f64>,
    // End-to-end step execution latency in seconds.
    step_latency: Histogram<f64>,
    // Total tokens consumed by steps.
    budget_tokens: Counter<u64>,
    // Total USD cost consumed by steps.
    budget_cost: Counter<f64>,
    // Tokens remaining in the active run budget after each step.
    budget_tokens_remaining: Histogram<f64>,
    // USD remaining in the active run budget after each step.
    budget_cost_remaining: Histogram<f64>,
    // Projected token burn (used + reserved) prior to executing a step.
    budget_tokens_projected: Histogram<f64>,
    // Projected USD burn (used + reserved) prior to executing a step.
    budget_cost_projected: Histogram<f64>,
    // Number of step retries executed by the scheduler.
    retries: Counter<u64>,
    // Queue slack (target - wait) in ms for steps with SLO contracts.
    slo_slack: Histogram<f64>,
    // Times an SLO-breached step was moved ahead of another step.
    slo_preemptions: Counter<u64>,
}
3141
3142impl QueueMetrics {
3143 fn new() -> Self {
3144 let meter: Meter = telemetry_otel::meter("fleetforge.runtime");
3145 let queue_depth = meter
3146 .f64_histogram("fleetforge.queue.depth")
3147 .with_description("Sampled queue depth of the step scheduler")
3148 .init();
3149 let queue_lag = meter
3150 .f64_histogram("fleetforge.queue.lag_seconds")
3151 .with_description("Time a step spent waiting in the scheduler queue before execution")
3152 .init();
3153 let step_latency = meter
3154 .f64_histogram("fleetforge.step.latency")
3155 .with_description("End-to-end step execution latency in seconds")
3156 .init();
3157 let budget_tokens = meter
3158 .u64_counter("fleetforge.budget.tokens")
3159 .with_description("Total tokens consumed by steps")
3160 .init();
3161 let budget_cost = meter
3162 .f64_counter("fleetforge.budget.cost")
3163 .with_description("Total cost consumed by steps (USD)")
3164 .init();
3165 let budget_tokens_remaining = meter
3166 .f64_histogram("fleetforge.budget.tokens_remaining")
3167 .with_description("Remaining tokens in the active run budget after each step")
3168 .init();
3169 let budget_cost_remaining = meter
3170 .f64_histogram("fleetforge.budget.cost_remaining")
3171 .with_description("Remaining cost in the active run budget after each step (USD)")
3172 .init();
3173 let budget_tokens_projected = meter
3174 .f64_histogram("fleetforge.budget.tokens_projected_burn")
3175 .with_description(
3176 "Projected token consumption (used + reserved) prior to executing a step",
3177 )
3178 .init();
3179 let budget_cost_projected = meter
3180 .f64_histogram("fleetforge.budget.cost_projected_burn")
3181 .with_description("Projected cost (used + reserved) prior to executing a step (USD)")
3182 .init();
3183 let retries = meter
3184 .u64_counter("fleetforge.step.retries")
3185 .with_description("Number of step retries executed by the scheduler")
3186 .init();
3187 let slo_slack = meter
3188 .f64_histogram("fleetforge.slo.queue_slack_ms")
3189 .with_description("Observed queue slack (target - wait) for steps with SLO contracts, in milliseconds")
3190 .init();
3191 let slo_preemptions = meter
3192 .u64_counter("fleetforge.slo.preemptions")
3193 .with_description("Count of times the scheduler moved an SLO-contracted step ahead of another step due to a breach")
3194 .init();
3195 Self {
3196 queue_depth,
3197 queue_lag,
3198 step_latency,
3199 budget_tokens,
3200 budget_cost,
3201 budget_tokens_remaining,
3202 budget_cost_remaining,
3203 budget_tokens_projected,
3204 budget_cost_projected,
3205 retries,
3206 slo_slack,
3207 slo_preemptions,
3208 }
3209 }
3210
3211 fn record_slo_slack(&self, slack_ms: Option<i64>) {
3212 if let Some(value) = slack_ms {
3213 self.slo_slack.record(value as f64, &[]);
3214 }
3215 }
3216
3217 fn record_slo_preemption(&self) {
3218 self.slo_preemptions.add(1, &[]);
3219 }
3220
3221 fn record_queue_depth(&self, depth: usize) {
3222 self.queue_depth.record(depth as f64, &[]);
3223 }
3224
3225 fn record_queue_lag(&self, lag: Duration) {
3226 self.queue_lag.record(lag.as_secs_f64(), &[]);
3227 }
3228
3229 fn record_latency(&self, duration: Duration) {
3230 self.step_latency.record(duration.as_secs_f64(), &[]);
3231 }
3232
3233 fn record_retry(&self) {
3234 self.retries.add(1, &[]);
3235 }
3236
3237 fn record_budget_projection(&self, budget: &BudgetCtx) {
3238 if budget.tokens_reserved > 0 {
3239 let projected = if budget.tracked {
3240 budget.tokens_used.saturating_add(budget.tokens_reserved)
3241 } else {
3242 budget.tokens_reserved
3243 };
3244 self.budget_tokens_projected.record(projected as f64, &[]);
3245 }
3246 if budget.cost_reserved > 0.0 {
3247 let projected = if budget.tracked {
3248 budget.cost_used + budget.cost_reserved
3249 } else {
3250 budget.cost_reserved
3251 };
3252 self.budget_cost_projected.record(projected, &[]);
3253 }
3254 }
3255
3256 fn record_budget_consumed(&self, tokens: i64, cost: f64) {
3257 if tokens > 0 {
3258 self.budget_tokens.add(tokens as u64, &[]);
3259 }
3260 if cost > 0.0 {
3261 self.budget_cost.add(cost, &[]);
3262 }
3263 }
3264
3265 fn record_budget_remaining(&self, tokens: Option<i64>, cost: Option<f64>) {
3266 if let Some(tokens) = tokens {
3267 if tokens >= 0 {
3268 self.budget_tokens_remaining.record(tokens as f64, &[]);
3269 }
3270 }
3271 if let Some(cost) = cost {
3272 if cost >= 0.0 {
3273 self.budget_cost_remaining.record(cost, &[]);
3274 }
3275 }
3276 }
3277}
3278
3279#[cfg(test)]
3280mod tests {
3281 use super::*;
3282 use crate::guardrails::PolicyDecisionTrace;
3283 use crate::model::{RunId, StepId};
3284 use async_trait::async_trait;
3285 use bytes::Bytes;
3286 use chrono::Utc;
3287 use fleetforge_storage::ArtifactMeta;
3288 use fleetforge_trust::InMemoryAttestationVault;
3289 use serde_json::{json, Value};
3290 use sqlx::Row;
3291 use std::sync::{
3292 atomic::{AtomicBool, Ordering},
3293 Arc,
3294 };
3295 use testcontainers::clients::Cli;
3296 use testcontainers::images::postgres::Postgres;
3297 use tokio::time::{sleep, Duration};
3298 use url::Url;
3299 use uuid::Uuid;
3300
3301 fn docker_available() -> bool {
3302 std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
3303 }
3304
    /// Build a minimal queued `Tool` step with the supplied policy, enqueue
    /// time, and priority; every other field uses an inert default (attempt
    /// 0, single max attempt, no backoff, no compensation) so tests can focus
    /// on SLO and priority behaviour.
    fn test_step_with_policy(
        policy: Value,
        created_at: chrono::DateTime<Utc>,
        priority: i16,
    ) -> QueuedStep {
        QueuedStep {
            run_id: RunId(Uuid::new_v4()),
            step_id: StepId(Uuid::new_v4()),
            idx: 0,
            priority,
            status: StepStatus::Queued,
            attempt: 0,
            created_at,
            max_attempts: 1,
            retry_backoff_ms: 0,
            retry_backoff_factor: 1.0,
            not_before: None,
            deadline_at: None,
            checkpoint: None,
            compensation_step: None,
            compensation_scheduled: false,
            spec: StepSpec {
                id: StepId(Uuid::new_v4()),
                r#type: StepType::Tool,
                inputs: json!({}),
                policy,
                execution: StepExecution::default(),
                slug: None,
                trust: None,
                trust_origin: None,
                llm_inputs: None,
            },
        }
    }
3339
    #[test]
    fn build_step_slo_includes_queue_observations() {
        // A step enqueued 1500ms ago against a 1000ms queue target: the SLO
        // metadata should surface the wait and report negative slack.
        let now = Utc::now();
        let policy = json!({
            "slo": {
                "tier": "gold",
                "queue_target_ms": 1000,
                "priority_boost": 3,
                "error_budget_ratio": 0.25
            }
        });
        let step = test_step_with_policy(policy, now - ChronoDuration::milliseconds(1500), 5);
        let slo = build_step_slo(&step, now).expect("slo metadata expected");

        assert_eq!(slo.tier.as_deref(), Some("gold"));
        assert_eq!(slo.queue_target_ms, Some(1000));
        // Observed wait reflects the simulated 1500ms backlog.
        assert!(slo.observed_queue_ms.unwrap() >= 1500);
        // Slack = target - observed wait (1000 - 1500).
        assert_eq!(slo.slack_ms, Some(-500));
        assert_eq!(slo.priority_boost, 3);
        assert_eq!(slo.error_budget_ratio, Some(0.25));
    }
3361
    #[test]
    fn slo_scores_prioritize_breaches_over_healthy_steps() {
        // Two steps share a 2000ms queue target: one has waited 4000ms
        // (breached), the other 100ms (healthy). Both carry priority 0 so
        // ordering is decided purely by SLO slack.
        let now = Utc::now();
        let breached = test_step_with_policy(
            json!({ "slo": { "queue_target_ms": 2000 } }),
            now - ChronoDuration::milliseconds(4000),
            0,
        );
        let healthy = test_step_with_policy(
            json!({ "slo": { "queue_target_ms": 2000 } }),
            now - ChronoDuration::milliseconds(100),
            0,
        );

        let breached_contract = extract_slo_contract(&breached.spec);
        let healthy_contract = extract_slo_contract(&healthy.spec);

        let breached_score = compute_slo_score(&breached, breached_contract.as_ref(), now);
        let healthy_score = compute_slo_score(&healthy, healthy_contract.as_ref(), now);

        assert!(breached_score.has_contract);
        assert!(healthy_score.has_contract);
        assert!(breached_score.slack_ms < 0);
        assert!(healthy_score.slack_ms > 0);
        // Ordering::Less means the breached step sorts ahead of the healthy one.
        assert_eq!(breached_score.compare(&healthy_score), Ordering::Less);
    }
3388
    /// Minimal `ArtifactStore` test double: `put` succeeds with a zeroed hash
    /// and echoes the metadata back; reads are unsupported and panic if
    /// exercised.
    #[derive(Default, Clone)]
    struct StubArtifactStore;

    #[async_trait]
    impl ArtifactStore for StubArtifactStore {
        async fn put(
            &self,
            bytes: Bytes,
            media_type: &str,
            metadata: Value,
        ) -> Result<ArtifactMeta> {
            // Fabricate a deterministic meta record without hashing the bytes.
            Ok(ArtifactMeta {
                sha256: [0u8; 32],
                media_type: media_type.to_string(),
                size: bytes.len() as u64,
                metadata,
            })
        }

        async fn get(&self, _sha256: [u8; 32]) -> Result<Bytes> {
            unimplemented!("get not implemented for stub artifact store");
        }

        async fn presign_get(&self, _sha256: [u8; 32], _ttl: std::time::Duration) -> Result<Url> {
            unimplemented!("presign not implemented for stub artifact store");
        }
    }
3416
    #[tokio::test]
    async fn persist_policy_decisions_emits_summary() {
        // Single redact event carrying a reason, a patch, an origin, and a
        // preview; the stub store accepts whatever payload gets persisted.
        let store = Arc::new(StubArtifactStore::default());
        let event = PolicyEvaluationRecord {
            boundary: TrustBoundary::IngressPrompt,
            run_id: Some(Uuid::nil()),
            step_id: Some(Uuid::nil()),
            effect: DecisionEffect::Redact,
            reasons: vec!["mask".into()],
            decisions: vec![PolicyDecisionTrace {
                policy: "test".into(),
                effect: DecisionEffect::Redact,
                reason: Some("mask".into()),
                patches: vec![
                    json!({"op": "replace", "path": "/inputs/secret", "value": "[masked]"}),
                ],
            }],
            patches: vec![json!({"op": "replace", "path": "/inputs/secret", "value": "[masked]"})],
            payload: Some(json!({"inputs": {"secret": "[masked]"}})),
            timestamp: Utc::now(),
            origins: vec![json!({"boundary": "ingress_prompt"})],
            preview: Some("secret".into()),
            attestation_id: None,
            attestation: None,
            trust_decision: None,
        };

        let (artifact, summary) =
            persist_policy_decisions(store, RunId(Uuid::nil()), StepId(Uuid::nil()), &[event])
                .await
                .expect("persist policy decisions");

        // Summary reflects the single redaction and carries previews/origins.
        assert_eq!(summary["count"], 1);
        assert_eq!(summary["redacted"], true);
        assert!(summary["origins"].is_array());
        assert!(summary["previews"].is_array());
        // Artifact metadata is tagged and carries a complete signature block.
        assert_eq!(artifact["metadata"]["kind"], "policy_decisions");
        assert!(artifact["metadata"]["signature"].is_object());
        assert_eq!(artifact["metadata"]["signature"]["algorithm"], "HS256");
        assert!(artifact["metadata"]["signature"]["key_id"].is_string());
        assert!(artifact["metadata"]["signature"]["value"]
            .as_str()
            .is_some());
        assert!(artifact["trust_origin"].is_object());
    }
3462
3463 #[test]
3464 fn attach_policy_summary_inserts_artifact() {
3465 let mut target = json!({"existing": true});
3466 let summary = json!({"count": 2});
3467 let artifact = json!({"sha256": "abc123", "metadata": {"kind": "policy_decisions"}});
3468 attach_policy_summary(&mut target, summary, artifact.clone());
3469
3470 assert_eq!(target["policy_decisions"]["count"], 2);
3471 assert_eq!(target["policy_decisions"]["artifact"]["sha256"], "abc123");
3472 assert_eq!(
3473 target["artifacts"]["policy_decisions"]["metadata"]["kind"],
3474 "policy_decisions"
3475 );
3476 }
3477
    /// Policy engine stub that denies every request.
    #[derive(Clone)]
    struct DenyPolicy;
3480
3481 #[async_trait]
3482 impl PolicyEngine for DenyPolicy {
3483 async fn evaluate(
3484 &self,
3485 _request: &fleetforge_policy::PolicyRequest,
3486 ) -> Result<PolicyDecision> {
3487 Ok(PolicyDecision {
3488 effect: DecisionEffect::Deny,
3489 reason: Some("blocked".to_string()),
3490 patches: Vec::new(),
3491 })
3492 }
3493 }
3494
    /// Policy engine stub that redacts every request (removes `/secret`).
    #[derive(Clone)]
    struct RedactPolicy;
3497
3498 #[async_trait]
3499 impl PolicyEngine for RedactPolicy {
3500 async fn evaluate(
3501 &self,
3502 _request: &fleetforge_policy::PolicyRequest,
3503 ) -> Result<PolicyDecision> {
3504 Ok(PolicyDecision {
3505 effect: DecisionEffect::Redact,
3506 reason: Some("mask".to_string()),
3507 patches: vec![json!({"op": "remove", "path": "/secret"})],
3508 })
3509 }
3510 }
3511
    /// Tool executor that records whether it actually ran.
    struct RecordingExecutor {
        // Set to true inside execute(); tests read it to assert whether the
        // scheduler/policy allowed execution to reach the executor.
        executed: Arc<AtomicBool>,
    }
3515
3516 #[async_trait]
3517 impl StepExecutor for RecordingExecutor {
3518 fn kind(&self) -> &'static str {
3519 "tool"
3520 }
3521
3522 async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3523 self.executed.store(true, Ordering::SeqCst);
3524 Ok(StepExecutionResult::new(json!({"ok": true})))
3525 }
3526 }
3527
    /// Tool executor that succeeds while reporting fixed resource usage.
    struct UsageExecutor {
        // Set to true inside execute(); lets tests confirm the step ran.
        executed: Arc<AtomicBool>,
    }
3531
3532 #[async_trait]
3533 impl StepExecutor for UsageExecutor {
3534 fn kind(&self) -> &'static str {
3535 "tool"
3536 }
3537
3538 async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3539 self.executed.store(true, Ordering::SeqCst);
3540 Ok(StepExecutionResult::with_usage(
3541 json!({"result": "ok"}),
3542 ResourceUsage {
3543 tokens: Some(10),
3544 cost: Some(0.0005),
3545 },
3546 ))
3547 }
3548 }
3549
    /// Tool executor that always errors; used to drive retry/skip/compensation paths.
    struct FailingExecutor;
3551
3552 #[async_trait]
3553 impl StepExecutor for FailingExecutor {
3554 fn kind(&self) -> &'static str {
3555 "tool"
3556 }
3557
3558 async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3559 Err(anyhow!("executor failure"))
3560 }
3561 }
3562
    /// Tool executor whose outcome is selected by the step's `mode` input:
    /// "primary" fails, "compensation" succeeds with usage, anything else echoes.
    struct ConditionalExecutor;
3564
3565 #[async_trait]
3566 impl StepExecutor for ConditionalExecutor {
3567 fn kind(&self) -> &'static str {
3568 "tool"
3569 }
3570
3571 async fn execute(&self, ctx: &StepCtx) -> Result<StepExecutionResult> {
3572 let mode = ctx
3573 .untrusted_inputs
3574 .as_ref()
3575 .get("mode")
3576 .and_then(Value::as_str)
3577 .unwrap_or_default();
3578 match mode {
3579 "primary" => Err(anyhow!("primary failure")),
3580 "compensation" => Ok(StepExecutionResult::with_usage(
3581 json!({"status": "compensated"}),
3582 ResourceUsage {
3583 tokens: Some(5),
3584 cost: Some(0.25),
3585 },
3586 )),
3587 _ => Ok(StepExecutionResult::new(json!({"status": mode}))),
3588 }
3589 }
3590 }
3591
3592 async fn setup_storage() -> (Arc<Storage>, testcontainers::Container<'static, Postgres>) {
3593 let docker = Cli::default();
3594 let container = docker.run(Postgres::default());
3595 let port = container.get_host_port_ipv4(5432);
3596 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
3597 sleep(Duration::from_millis(250)).await;
3598
3599 let storage = Storage::connect(StorageConfig {
3600 database_url: db_url,
3601 max_connections: 2,
3602 connect_timeout: Duration::from_secs(10),
3603 object_store: ObjectStoreConfig::InMemory,
3604 })
3605 .await
3606 .expect("connect storage");
3607
3608 let container: testcontainers::Container<'static, Postgres> =
3609 unsafe { std::mem::transmute(container) };
3610 (Arc::new(storage), container)
3611 }
3612
3613 fn seed_step(run_id: Uuid, step_id: Uuid) -> models::NewStep {
3614 models::NewStep {
3615 run_id,
3616 step_id,
3617 idx: 0,
3618 priority: 5,
3619 spec_json: json!({"id": step_id, "type": "tool", "inputs": {}, "policy": {}}),
3620 status: models::StepStatus::Queued,
3621 attempt: 0,
3622 input_snapshot: None,
3623 provider: None,
3624 provider_version: None,
3625 pending_dependencies: 0,
3626 total_dependencies: 0,
3627 estimated_tokens: 50,
3628 estimated_cost: 0.001,
3629 actual_tokens: None,
3630 actual_cost: None,
3631 ..models::NewStep::default()
3632 }
3633 }
3634
3635 #[tokio::test]
3636 async fn deny_decision_blocks_execution() {
3637 if std::fs::metadata("/var/run/docker.sock").is_err()
3638 && std::env::var("DOCKER_HOST").is_err()
3639 {
3640 eprintln!("Skipping scheduler tests because Docker is unavailable");
3641 return;
3642 }
3643
3644 let (storage, container) = setup_storage().await;
3645 let executed = Arc::new(AtomicBool::new(false));
3646 let mut registry = ExecutorRegistry::new();
3647 registry
3648 .register(Arc::new(RecordingExecutor {
3649 executed: Arc::clone(&executed),
3650 }))
3651 .unwrap();
3652 let audit = Arc::new(Audit::new(storage.pool().clone()));
3653 let attestation_vault: Arc<dyn AttestationVault> =
3654 Arc::new(InMemoryAttestationVault::new());
3655
3656 let scheduler = Scheduler::new(
3657 Arc::clone(&storage),
3658 Arc::new(registry),
3659 Arc::new(DenyPolicy),
3660 Arc::clone(&audit),
3661 Arc::clone(&attestation_vault),
3662 SchedulerConfig::default(),
3663 );
3664
3665 let run_id = Uuid::new_v4();
3666 let step_id = Uuid::new_v4();
3667
3668 let _ = storage
3669 .insert_run_with_steps(
3670 models::NewRun {
3671 run_id,
3672 status: models::RunStatus::Pending,
3673 dag_json: json!({"steps": [], "edges": []}),
3674 input_ctx: json!({}),
3675 seed: 1,
3676 idempotency_key: None,
3677 },
3678 &[seed_step(run_id, step_id)],
3679 &[],
3680 )
3681 .await
3682 .unwrap();
3683
3684 let cfg = SchedulerConfig::default();
3685 let leased = storage.claim_queued_steps(1, &cfg.worker_id).await.unwrap();
3686 let queued = leased
3687 .into_iter()
3688 .map(QueuedStep::try_from)
3689 .next()
3690 .unwrap()
3691 .unwrap();
3692
3693 scheduler.execute_step(queued).await.unwrap();
3694
3695 assert!(!executed.load(Ordering::SeqCst));
3696
3697 let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
3698 let step = &steps[0];
3699 assert_eq!(step.status, models::StepStatus::Failed);
3700 assert_eq!(step.attempt, 0);
3701 assert_eq!(
3702 step.error_json.as_ref().unwrap()["kind"],
3703 json!("policy_denied")
3704 );
3705
3706 let events = storage.fetch_unpublished_outbox(10).await.unwrap();
3707 let event = events.iter().find(|ev| ev.kind == "step_failed").unwrap();
3708 assert_eq!(event.payload["error"]["kind"], json!("policy_denied"));
3709
3710 drop(container);
3711 }
3712
    /// A failing step with `max_attempts: 2` must be re-queued (with backoff)
    /// after its first failure and only become terminally Failed after the
    /// second; no compensation is configured, so none is linked.
    #[tokio::test]
    async fn retry_reschedules_until_max_attempts() {
        if !docker_available() {
            eprintln!(
                "Skipping retry_reschedules_until_max_attempts because Docker is unavailable"
            );
            return;
        }

        let (storage, container) = setup_storage().await;
        let mut registry = ExecutorRegistry::new();
        registry.register(Arc::new(FailingExecutor)).unwrap();
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let scheduler = Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        );

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();
        // Two attempts allowed, with a short backoff the test can wait out.
        let step_spec = json!({
            "id": step_id,
            "type": "tool",
            "inputs": {},
            "execution": {
                "max_attempts": 2,
                "retry_backoff_ms": 10
            }
        });

        let spec = json!({
            "steps": [step_spec.clone()],
            "edges": []
        });

        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: spec,
                    input_ctx: json!({}),
                    seed: 7,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 0,
                    spec_json: step_spec,
                    status: models::StepStatus::Queued,
                    attempt: 0,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 0,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await
            .unwrap();

        let cfg = scheduler.config.clone();
        let first = storage
            .claim_queued_steps(1, &cfg.worker_id)
            .await
            .unwrap()
            .into_iter()
            .map(QueuedStep::try_from)
            .next()
            .unwrap()
            .unwrap();

        scheduler.execute_step(first).await.unwrap();

        // First failure: step is re-queued with attempt=1 and a not_before
        // timestamp reflecting the retry backoff.
        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let queued = steps.iter().find(|step| step.step_id == step_id).unwrap();
        assert_eq!(queued.status, models::StepStatus::Queued);
        assert_eq!(queued.attempt, 1);
        assert!(queued.not_before.is_some());

        // Wait past the 10ms backoff so the step becomes claimable again.
        sleep(Duration::from_millis(20)).await;

        let second = storage
            .claim_queued_steps(1, &cfg.worker_id)
            .await
            .unwrap()
            .into_iter()
            .map(QueuedStep::try_from)
            .next()
            .unwrap()
            .unwrap();

        scheduler.execute_step(second).await.unwrap();

        // Second failure exhausts max_attempts: terminal Failed, and since no
        // compensation step was declared, none is linked.
        let final_steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let failed = final_steps
            .iter()
            .find(|step| step.step_id == step_id)
            .unwrap();
        assert_eq!(failed.status, models::StepStatus::Failed);
        assert!(failed.compensation_step.is_none());

        drop(container);
    }
3831
    /// With `max_inflight_cost` at 1.0 and 1.0 of cost already in flight, a
    /// second step must be deferred (re-queued with not_before, attempt
    /// unchanged) rather than executed.
    #[tokio::test]
    async fn backpressure_defers_steps_exceeding_inflight_cost() {
        if !docker_available() {
            eprintln!("Skipping backpressure test because Docker is unavailable");
            return;
        }

        let (storage, container) = setup_storage().await;
        let executed = Arc::new(AtomicBool::new(false));
        let mut registry = ExecutorRegistry::new();
        registry
            .register(Arc::new(RecordingExecutor {
                executed: Arc::clone(&executed),
            }))
            .unwrap();
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        // Cap in-flight cost at exactly the first step's hint so the second
        // step trips the backpressure path.
        let mut config = SchedulerConfig::default();
        config.batch_size = 2;
        config.max_inflight_cost = Some(1.0);
        config.backpressure_delay = Duration::from_millis(20);
        let scheduler = Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            config,
        );

        let run_id = Uuid::new_v4();
        let first_id = Uuid::new_v4();
        let second_id = Uuid::new_v4();

        let first_spec = json!({
            "id": first_id,
            "type": "tool",
            "inputs": {},
            "execution": { "cost_hint": { "usd": 1.0 } }
        });
        let second_spec = json!({
            "id": second_id,
            "type": "tool",
            "inputs": {},
            "execution": { "cost_hint": { "usd": 0.5 } }
        });

        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({ "steps": [first_spec.clone(), second_spec.clone()], "edges": [] }),
                    input_ctx: json!({}),
                    seed: 99,
                    idempotency_key: None,
                },
                &[
                    models::NewStep {
                        run_id,
                        step_id: first_id,
                        idx: 0,
                        priority: 0,
                        spec_json: first_spec,
                        status: models::StepStatus::Queued,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 0,
                        total_dependencies: 0,
                        estimated_tokens: 0,
                        estimated_cost: 1.0,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                    models::NewStep {
                        run_id,
                        step_id: second_id,
                        idx: 1,
                        priority: 0,
                        spec_json: second_spec,
                        status: models::StepStatus::Queued,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 0,
                        total_dependencies: 0,
                        estimated_tokens: 0,
                        estimated_cost: 0.5,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                ],
                &[],
            )
            .await
            .unwrap();

        let cfg = scheduler.config.clone();
        let mut leased: Vec<QueuedStep> = storage
            .claim_queued_steps(2, &cfg.worker_id)
            .await
            .unwrap()
            .into_iter()
            .map(|step| QueuedStep::try_from(step).unwrap())
            .collect();

        // First step runs normally.
        let first = leased.remove(0);
        scheduler.execute_step(first).await.unwrap();
        assert!(executed.load(Ordering::SeqCst));

        // Simulate 1.0 of cost already in flight — exactly at the cap.
        scheduler.add_inflight(0, 1.0).await;

        let second = leased.remove(0);
        scheduler.execute_step(second).await.unwrap();

        // The second step must have been deferred, not executed or failed:
        // still Queued, not_before set, attempt unchanged.
        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let deferred = steps.iter().find(|step| step.step_id == second_id).unwrap();
        assert_eq!(deferred.status, models::StepStatus::Queued);
        assert!(deferred.not_before.is_some());
        assert_eq!(deferred.attempt, 0);

        scheduler.release_inflight(0, 1.0).await;
        drop(container);
    }
3963
    /// Tool executor that succeeds and attaches a checkpoint payload to its result.
    struct CheckpointExecutor;
3965
3966 #[async_trait]
3967 impl StepExecutor for CheckpointExecutor {
3968 fn kind(&self) -> &'static str {
3969 "tool"
3970 }
3971
3972 async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3973 Ok(StepExecutionResult::new(json!({"ok": true})).with_checkpoint(json!({"state": 42})))
3974 }
3975 }
3976
    /// An executor-provided checkpoint must be persisted on the step row when
    /// the spec enables checkpointing.
    #[tokio::test]
    async fn checkpoint_is_persisted_from_executor() {
        if !docker_available() {
            eprintln!("Skipping checkpoint test because Docker is unavailable");
            return;
        }

        let (storage, container) = setup_storage().await;
        let mut registry = ExecutorRegistry::new();
        registry.register(Arc::new(CheckpointExecutor)).unwrap();
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let scheduler = Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        );

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        // Spec opts in to checkpointing for this step.
        let step_spec = json!({
            "id": step_id,
            "type": "tool",
            "inputs": {},
            "execution": { "checkpoint": true }
        });

        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({ "steps": [step_spec.clone()], "edges": [] }),
                    input_ctx: json!({}),
                    seed: 11,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 0,
                    spec_json: step_spec,
                    status: models::StepStatus::Queued,
                    attempt: 0,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 0,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await
            .unwrap();

        let worker = scheduler.config.worker_id.clone();
        let queued = storage
            .claim_queued_steps(1, &worker)
            .await
            .unwrap()
            .into_iter()
            .map(QueuedStep::try_from)
            .next()
            .unwrap()
            .unwrap();

        scheduler.execute_step(queued).await.unwrap();

        // The persisted row must carry the executor's checkpoint verbatim.
        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let step = steps.iter().find(|item| item.step_id == step_id).unwrap();
        assert_eq!(step.status, models::StepStatus::Succeeded);
        let checkpoint = step.checkpoint.as_ref().expect("checkpoint missing");
        assert_eq!(checkpoint, &json!({"state": 42}));

        drop(container);
    }
4064
    /// When a step fails terminally and declares a compensation step, the
    /// scheduler must mark the primary Failed, flag compensation as
    /// scheduled, and promote the Blocked compensation step to Queued.
    #[tokio::test]
    async fn compensation_step_queued_on_terminal_failure() {
        if !docker_available() {
            eprintln!("Skipping compensation test because Docker is unavailable");
            return;
        }

        let (storage, container) = setup_storage().await;
        let mut registry = ExecutorRegistry::new();
        registry.register(Arc::new(FailingExecutor)).unwrap();
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let scheduler = Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        );

        let run_id = Uuid::new_v4();
        let primary_id = Uuid::new_v4();
        let compensation_id = Uuid::new_v4();

        // Primary gets a single attempt and names its compensation step by id.
        let primary_spec = json!({
            "id": primary_id,
            "type": "tool",
            "inputs": {},
            "execution": {
                "max_attempts": 1,
                "compensation": { "step": compensation_id.to_string() }
            }
        });

        let compensation_spec = json!({
            "id": compensation_id,
            "type": "tool",
            "inputs": { "action": "rollback" }
        });

        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({ "steps": [primary_spec.clone(), compensation_spec.clone()], "edges": [] }),
                    input_ctx: json!({}),
                    seed: 13,
                    idempotency_key: None,
                },
                &[
                    models::NewStep {
                        run_id,
                        step_id: primary_id,
                        idx: 0,
                        priority: 1,
                        spec_json: primary_spec,
                        status: models::StepStatus::Queued,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 0,
                        total_dependencies: 0,
                        estimated_tokens: 0,
                        estimated_cost: 0.0,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                    // Compensation starts Blocked, waiting behind the primary.
                    models::NewStep {
                        run_id,
                        step_id: compensation_id,
                        idx: 1,
                        priority: 0,
                        spec_json: compensation_spec,
                        status: models::StepStatus::Blocked,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 1,
                        total_dependencies: 1,
                        estimated_tokens: 0,
                        estimated_cost: 0.0,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                ],
                &[],
            )
            .await
            .unwrap();

        let worker = scheduler.config.worker_id.clone();
        let queued = storage
            .claim_queued_steps(1, &worker)
            .await
            .unwrap()
            .into_iter()
            .map(QueuedStep::try_from)
            .next()
            .unwrap()
            .unwrap();

        scheduler.execute_step(queued).await.unwrap();

        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let primary = steps
            .iter()
            .find(|step| step.step_id == primary_id)
            .unwrap();
        assert_eq!(primary.status, models::StepStatus::Failed);
        assert!(primary.compensation_scheduled);

        // Terminal failure released the compensation step's dependency.
        let compensation = steps
            .iter()
            .find(|step| step.step_id == compensation_id)
            .unwrap();
        assert_eq!(compensation.status, models::StepStatus::Queued);
        assert_eq!(compensation.pending_dependencies, 0);

        drop(container);
    }
4192
    /// End-to-end compensation flow with cost accounting: the primary fails,
    /// its compensation runs to success, and the step_cost_ledger records a
    /// `failed` entry (primary reservation), a `compensation_scheduled` entry
    /// (primary), and a `succeeded` entry (compensation, with actual usage).
    #[tokio::test]
    async fn compensation_step_executes_and_records_ledger() {
        if !docker_available() {
            eprintln!("Skipping compensation ledger test because Docker is unavailable");
            return;
        }

        let (storage, container) = setup_storage().await;
        let mut registry = ExecutorRegistry::new();
        // ConditionalExecutor: mode "primary" fails, "compensation" succeeds
        // with actual usage of 5 tokens / $0.25.
        registry.register(Arc::new(ConditionalExecutor)).unwrap();
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());
        let scheduler = Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        );

        let run_id = Uuid::new_v4();
        let primary_id = Uuid::new_v4();
        let compensation_id = Uuid::new_v4();

        // Primary: single attempt, compensation declared, reserves 50 tokens / $1.5.
        let primary_spec = json!({
            "id": primary_id,
            "type": "tool",
            "inputs": { "mode": "primary" },
            "execution": {
                "max_attempts": 1,
                "compensation": { "step": compensation_id.to_string() },
                "cost_hint": { "tokens": 50, "usd": 1.5 }
            }
        });
        // Compensation reserves 10 tokens / $0.5.
        let compensation_spec = json!({
            "id": compensation_id,
            "type": "tool",
            "inputs": { "mode": "compensation" },
            "execution": { "cost_hint": { "tokens": 10, "usd": 0.5 } }
        });

        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({
                        "steps": [primary_spec.clone(), compensation_spec.clone()],
                        "edges": []
                    }),
                    input_ctx: json!({}),
                    seed: 17,
                    idempotency_key: None,
                },
                &[
                    models::NewStep {
                        run_id,
                        step_id: primary_id,
                        idx: 0,
                        priority: 1,
                        spec_json: primary_spec,
                        status: models::StepStatus::Queued,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 0,
                        total_dependencies: 0,
                        estimated_tokens: 50,
                        estimated_cost: 1.5,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                    // Compensation starts Blocked behind the primary.
                    models::NewStep {
                        run_id,
                        step_id: compensation_id,
                        idx: 1,
                        priority: 0,
                        spec_json: compensation_spec,
                        status: models::StepStatus::Blocked,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 1,
                        total_dependencies: 1,
                        estimated_tokens: 10,
                        estimated_cost: 0.5,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                ],
                &[],
            )
            .await
            .unwrap();

        let worker = scheduler.config.worker_id.clone();
        let queued = storage
            .claim_queued_steps(1, &worker)
            .await
            .unwrap()
            .into_iter()
            .map(QueuedStep::try_from)
            .next()
            .unwrap()
            .unwrap();

        // Run the primary: it fails and schedules its compensation.
        scheduler.execute_step(queued).await.unwrap();

        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let primary = steps
            .iter()
            .find(|step| step.step_id == primary_id)
            .unwrap();
        assert_eq!(primary.status, models::StepStatus::Failed);
        assert!(primary.compensation_scheduled);

        // Claim and run the now-queued compensation step.
        let compensation = storage
            .claim_queued_steps(1, &worker)
            .await
            .unwrap()
            .into_iter()
            .map(QueuedStep::try_from)
            .next()
            .unwrap()
            .unwrap();

        scheduler.execute_step(compensation).await.unwrap();

        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let primary = steps
            .iter()
            .find(|step| step.step_id == primary_id)
            .unwrap();
        assert_eq!(primary.status, models::StepStatus::Failed);
        assert!(primary.compensation_scheduled);
        let compensation = steps
            .iter()
            .find(|step| step.step_id == compensation_id)
            .unwrap();
        assert_eq!(compensation.status, models::StepStatus::Succeeded);
        assert_eq!(compensation.attempt, 1);

        // Inspect the cost ledger directly for the three expected entries.
        let ledger_rows = sqlx::query(
            "select step_id, status, reserved_tokens, reserved_cost, actual_tokens, actual_cost \
             from step_cost_ledger where run_id = $1 order by recorded_at",
        )
        .bind(run_id)
        .fetch_all(storage.pool())
        .await
        .unwrap();

        let mut saw_failed = false;
        let mut saw_comp_scheduled = false;
        let mut saw_comp_success = false;

        for row in ledger_rows {
            let step_id: Uuid = row.get("step_id");
            let status: String = row.get("status");
            if step_id == primary_id && status == "failed" {
                // Primary's reservation matches its cost hint.
                saw_failed = true;
                assert_eq!(row.get::<i64, _>("reserved_tokens"), 50);
                assert_eq!(row.get::<f64, _>("reserved_cost"), 1.5);
            } else if step_id == primary_id && status == "compensation_scheduled" {
                saw_comp_scheduled = true;
            } else if step_id == compensation_id && status == "succeeded" {
                // Compensation records both its reservation and the actual
                // usage reported by the executor (5 tokens / $0.25).
                saw_comp_success = true;
                assert_eq!(row.get::<i64, _>("reserved_tokens"), 10);
                assert_eq!(row.get::<f64, _>("reserved_cost"), 0.5);
                assert_eq!(row.get::<Option<i64>, _>("actual_tokens"), Some(5));
                assert_eq!(row.get::<Option<f64>, _>("actual_cost"), Some(0.25));
            }
        }

        assert!(saw_failed, "missing failed ledger entry for primary step");
        assert!(
            saw_comp_scheduled,
            "missing compensation_scheduled ledger entry for primary step"
        );
        assert!(
            saw_comp_success,
            "missing succeeded ledger entry for compensation step"
        );

        drop(container);
    }
4384
4385 #[tokio::test]
4386 async fn failing_step_skips_dependents() {
4387 if std::fs::metadata("/var/run/docker.sock").is_err()
4388 && std::env::var("DOCKER_HOST").is_err()
4389 {
4390 eprintln!("Skipping scheduler tests because Docker is unavailable");
4391 return;
4392 }
4393
4394 let (storage, container) = setup_storage().await;
4395 let mut registry = ExecutorRegistry::new();
4396 registry.register(Arc::new(FailingExecutor)).unwrap();
4397 let audit = Arc::new(Audit::new(storage.pool().clone()));
4398 let attestation_vault: Arc<dyn AttestationVault> =
4399 Arc::new(InMemoryAttestationVault::new());
4400
4401 let scheduler = Scheduler::new(
4402 Arc::clone(&storage),
4403 Arc::new(registry),
4404 crate::policy::AllowAllPolicy::shared(),
4405 Arc::clone(&audit),
4406 Arc::clone(&attestation_vault),
4407 SchedulerConfig::default(),
4408 );
4409
4410 let run_id = Uuid::new_v4();
4411 let root_id = Uuid::new_v4();
4412 let child_id = Uuid::new_v4();
4413
4414 let root_step = models::NewStep {
4415 run_id,
4416 step_id: root_id,
4417 idx: 0,
4418 priority: 0,
4419 spec_json: json!({"id": root_id, "type": "tool", "inputs": {}, "policy": {}}),
4420 status: models::StepStatus::Queued,
4421 attempt: 0,
4422 input_snapshot: None,
4423 provider: None,
4424 provider_version: None,
4425 pending_dependencies: 0,
4426 total_dependencies: 0,
4427 estimated_tokens: 0,
4428 estimated_cost: 0.0,
4429 actual_tokens: None,
4430 actual_cost: None,
4431 ..models::NewStep::default()
4432 };
4433 let child_step = models::NewStep {
4434 run_id,
4435 step_id: child_id,
4436 idx: 1,
4437 priority: 0,
4438 spec_json: json!({"id": child_id, "type": "tool", "inputs": {}, "policy": {}}),
4439 status: models::StepStatus::Blocked,
4440 attempt: 0,
4441 input_snapshot: None,
4442 provider: None,
4443 provider_version: None,
4444 pending_dependencies: 1,
4445 total_dependencies: 1,
4446 estimated_tokens: 0,
4447 estimated_cost: 0.0,
4448 actual_tokens: None,
4449 actual_cost: None,
4450 ..models::NewStep::default()
4451 };
4452
4453 let _ = storage
4454 .insert_run_with_steps(
4455 models::NewRun {
4456 run_id,
4457 status: models::RunStatus::Pending,
4458 dag_json: json!({
4459 "steps": [root_step.spec_json.clone(), child_step.spec_json.clone()],
4460 "edges": [[root_id, child_id]]
4461 }),
4462 input_ctx: json!({}),
4463 seed: 1,
4464 idempotency_key: None,
4465 },
4466 &[root_step, child_step],
4467 &[(root_id, child_id)],
4468 )
4469 .await
4470 .unwrap();
4471
4472 let cfg = SchedulerConfig::default();
4473 let leased = storage.claim_queued_steps(1, &cfg.worker_id).await.unwrap();
4474 let queued = leased
4475 .into_iter()
4476 .map(QueuedStep::try_from)
4477 .next()
4478 .unwrap()
4479 .unwrap();
4480
4481 scheduler.execute_step(queued).await.unwrap();
4482
4483 let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4484 let root = steps.iter().find(|step| step.step_id == root_id).unwrap();
4485 let child = steps.iter().find(|step| step.step_id == child_id).unwrap();
4486
4487 assert_eq!(root.status, models::StepStatus::Failed);
4488 assert_eq!(child.status, models::StepStatus::Skipped);
4489
4490 let events = storage.fetch_unpublished_outbox(10).await.unwrap();
4491 assert!(events.iter().any(|ev| ev.kind == "step_failed"));
4492 assert!(events.iter().any(|ev| ev.kind == "step_skipped"));
4493
4494 drop(container);
4495 }
4496
4497 #[tokio::test]
4498 async fn redact_decision_logs_and_allows() {
4499 if std::fs::metadata("/var/run/docker.sock").is_err()
4500 && std::env::var("DOCKER_HOST").is_err()
4501 {
4502 eprintln!("Skipping scheduler tests because Docker is unavailable");
4503 return;
4504 }
4505
4506 let (storage, container) = setup_storage().await;
4507 let executed = Arc::new(AtomicBool::new(false));
4508 let mut registry = ExecutorRegistry::new();
4509 registry
4510 .register(Arc::new(UsageExecutor {
4511 executed: Arc::clone(&executed),
4512 }))
4513 .unwrap();
4514 let audit = Arc::new(Audit::new(storage.pool().clone()));
4515 let attestation_vault: Arc<dyn AttestationVault> =
4516 Arc::new(InMemoryAttestationVault::new());
4517
4518 let scheduler = Scheduler::new(
4519 Arc::clone(&storage),
4520 Arc::new(registry),
4521 Arc::new(RedactPolicy),
4522 Arc::clone(&audit),
4523 Arc::clone(&attestation_vault),
4524 SchedulerConfig::default(),
4525 );
4526
4527 let run_id = Uuid::new_v4();
4528 let step_id = Uuid::new_v4();
4529
4530 let _ = storage
4531 .insert_run_with_steps(
4532 models::NewRun {
4533 run_id,
4534 status: models::RunStatus::Pending,
4535 dag_json: json!({"steps": [], "edges": []}),
4536 input_ctx: json!({}),
4537 seed: 1,
4538 idempotency_key: None,
4539 },
4540 &[seed_step(run_id, step_id)],
4541 &[],
4542 )
4543 .await
4544 .unwrap();
4545
4546 let cfg = SchedulerConfig::default();
4547 let leased = storage.claim_queued_steps(1, &cfg.worker_id).await.unwrap();
4548 let queued = leased
4549 .into_iter()
4550 .map(QueuedStep::try_from)
4551 .next()
4552 .unwrap()
4553 .unwrap();
4554
4555 scheduler.execute_step(queued).await.unwrap();
4556
4557 assert!(executed.load(Ordering::SeqCst));
4558
4559 let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4560 let step = &steps[0];
4561 assert_eq!(step.status, models::StepStatus::Succeeded);
4562 assert_eq!(step.attempt, 1);
4563 assert_eq!(step.actual_tokens, Some(10));
4564
4565 let events = storage.fetch_unpublished_outbox(10).await.unwrap();
4566 let event = events
4567 .iter()
4568 .find(|ev| ev.kind == "step_succeeded")
4569 .unwrap();
4570 assert_eq!(event.payload["budget"]["actual"]["tokens"], json!(10));
4571
4572 drop(container);
4573 }
4574
4575 #[tokio::test]
4576 async fn reserve_budget_granted_denied_unlimited() {
4577 if std::fs::metadata("/var/run/docker.sock").is_err()
4578 && std::env::var("DOCKER_HOST").is_err()
4579 {
4580 eprintln!("Skipping scheduler tests because Docker is unavailable");
4581 return;
4582 }
4583
4584 let (storage, container) = setup_storage().await;
4585 let registry = ExecutorRegistry::new();
4586 let audit = Arc::new(Audit::new(storage.pool().clone()));
4587 let attestation_vault: Arc<dyn AttestationVault> =
4588 Arc::new(InMemoryAttestationVault::new());
4589 let scheduler = Scheduler::new(
4590 Arc::clone(&storage),
4591 Arc::new(registry),
4592 Arc::new(RedactPolicy),
4593 Arc::clone(&audit),
4594 Arc::clone(&attestation_vault),
4595 SchedulerConfig::default(),
4596 );
4597
4598 let run_id = Uuid::new_v4();
4599 sqlx::query(
4601 "insert into budgets (budget_id, run_id, tokens_limit, tokens_used, cost_limit, cost_used) values (gen_random_uuid(), $1, 100, 0, 10.0, 0.0)"
4602 )
4603 .bind(run_id)
4604 .execute(&storage.pool())
4605 .await
4606 .unwrap();
4607
4608 let granted = scheduler
4609 .reserve_budget(
4610 RunId(run_id),
4611 &BudgetRequest {
4612 tokens: 10,
4613 cost: 1.0,
4614 },
4615 )
4616 .await
4617 .unwrap();
4618 match granted {
4619 BudgetDecision::Granted(ctx) => {
4620 assert_eq!(ctx.tokens_reserved, 10);
4621 assert_eq!(ctx.cost_reserved, 1.0);
4622 assert!(ctx.tracked);
4623 }
4624 _ => panic!("expected granted"),
4625 }
4626
4627 let denied = scheduler
4628 .reserve_budget(
4629 RunId(run_id),
4630 &BudgetRequest {
4631 tokens: 200,
4632 cost: 20.0,
4633 },
4634 )
4635 .await
4636 .unwrap();
4637 match denied {
4638 BudgetDecision::Denied { reason } => {
4639 assert!(reason.contains("budget exhausted"));
4640 }
4641 _ => panic!("expected denied"),
4642 }
4643
4644 let unlimited = scheduler
4645 .reserve_budget(
4646 RunId(Uuid::new_v4()),
4647 &BudgetRequest {
4648 tokens: 0,
4649 cost: 0.0,
4650 },
4651 )
4652 .await
4653 .unwrap();
4654 match unlimited {
4655 BudgetDecision::Unlimited(ctx) => {
4656 assert_eq!(ctx.tokens_reserved, 0);
4657 assert!(!ctx.tracked);
4658 }
4659 _ => panic!("expected unlimited"),
4660 }
4661
4662 drop(container);
4663 }
4664
4665 #[tokio::test]
4666 async fn zero_request_uses_existing_budget() {
4667 if std::fs::metadata("/var/run/docker.sock").is_err()
4668 && std::env::var("DOCKER_HOST").is_err()
4669 {
4670 eprintln!("Skipping scheduler tests because Docker is unavailable");
4671 return;
4672 }
4673
4674 let (storage, container) = setup_storage().await;
4675 let audit = Arc::new(Audit::new(storage.pool().clone()));
4676 let attestation_vault: Arc<dyn AttestationVault> =
4677 Arc::new(InMemoryAttestationVault::new());
4678 let scheduler = Scheduler::new(
4679 Arc::clone(&storage),
4680 Arc::new(ExecutorRegistry::new()),
4681 Arc::new(RedactPolicy),
4682 Arc::clone(&audit),
4683 Arc::clone(&attestation_vault),
4684 SchedulerConfig::default(),
4685 );
4686
4687 let run_id = Uuid::new_v4();
4688 sqlx::query(
4689 "insert into budgets (budget_id, run_id, tokens_limit, tokens_used, cost_limit, cost_used) values (gen_random_uuid(), $1, 50, 20, 5.0, 2.5)"
4690 )
4691 .bind(run_id)
4692 .execute(&storage.pool())
4693 .await
4694 .unwrap();
4695
4696 let decision = scheduler
4697 .reserve_budget(
4698 RunId(run_id),
4699 &BudgetRequest {
4700 tokens: 0,
4701 cost: 0.0,
4702 },
4703 )
4704 .await
4705 .unwrap();
4706
4707 match decision {
4708 BudgetDecision::Granted(ctx) => {
4709 assert_eq!(ctx.tokens_used, 20);
4710 assert_eq!(ctx.token_limit, Some(50));
4711 assert_eq!(ctx.cost_used, 2.5);
4712 assert_eq!(ctx.cost_limit, Some(5.0));
4713 }
4714 other => panic!("expected granted but got {:?}", other),
4715 }
4716
4717 drop(container);
4718 }
4719
    /// End-to-end scheduler test: seeds a two-step DAG (root -> child, both
    /// `/bin/echo` tool steps) into a throwaway Postgres container, runs the
    /// scheduler loop briefly, and asserts both steps and the run succeed.
    #[cfg(feature = "insecure-shell")]
    #[tokio::test]
    async fn end_to_end_shell_execution() {
        // Skip gracefully on hosts without Docker access (no socket, no DOCKER_HOST).
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping scheduler tests because Docker is unavailable");
            return;
        }

        // The shell executor is gated behind env vars; permit only /bin/echo.
        // NOTE(review): set_var is process-global — assumes no concurrently
        // running test reads these vars with conflicting expectations.
        std::env::set_var("FLEETFORGE_ALLOWED_TOOLS", "/bin/echo");
        std::env::set_var("FLEETFORGE_ALLOW_SHELL", "1");

        // Spin up an ephemeral Postgres container dedicated to this test.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Brief pause — presumably to let Postgres start accepting
        // connections; TODO confirm a readiness probe wouldn't be sturdier.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        let mut registry = ExecutorRegistry::new();
        registry
            .register(Arc::new(
                ShellExecutor::new().expect("shell executor should be enabled for test"),
            ))
            .expect("register shell executor");
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());

        // Fast backoffs keep the loop responsive within the 1s test window;
        // the transparency writer is disabled as irrelevant here.
        let scheduler = Arc::new(Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig {
                batch_size: 2,
                idle_backoff: Duration::from_millis(50),
                active_backoff: Duration::from_millis(10),
                transparency_writer: false,
                ..SchedulerConfig::default()
            },
        ));

        let run_id = Uuid::new_v4();
        let root_id = Uuid::new_v4();
        let child_id = Uuid::new_v4();

        // Seed the run: root is immediately runnable (Queued, 0 deps);
        // child is Blocked behind root via the (root_id, child_id) edge.
        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({
                        "steps": [
                            {
                                "id": root_id,
                                "type": "tool",
                                "inputs": {"command": ["/bin/echo", "root"]},
                                "policy": {}
                            },
                            {
                                "id": child_id,
                                "type": "tool",
                                "inputs": {"command": ["/bin/echo", "child"]},
                                "policy": {}
                            }
                        ],
                        "edges": [[root_id, child_id]]
                    }),
                    input_ctx: json!({}),
                    seed: 42,
                    idempotency_key: None,
                },
                &[
                    models::NewStep {
                        run_id,
                        step_id: root_id,
                        idx: 0,
                        priority: 1,
                        spec_json: json!({
                            "id": root_id,
                            "type": "tool",
                            "inputs": {"command": ["/bin/echo", "root"]},
                            "policy": {}
                        }),
                        status: models::StepStatus::Queued,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 0,
                        total_dependencies: 0,
                        estimated_tokens: 1,
                        estimated_cost: 0.0,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                    models::NewStep {
                        run_id,
                        step_id: child_id,
                        idx: 1,
                        priority: 1,
                        spec_json: json!({
                            "id": child_id,
                            "type": "tool",
                            "inputs": {"command": ["/bin/echo", "child"]},
                            "policy": {}
                        }),
                        status: models::StepStatus::Blocked,
                        attempt: 0,
                        input_snapshot: None,
                        provider: None,
                        provider_version: None,
                        pending_dependencies: 1,
                        total_dependencies: 1,
                        estimated_tokens: 1,
                        estimated_cost: 0.0,
                        actual_tokens: None,
                        actual_cost: None,
                        ..models::NewStep::default()
                    },
                ],
                &[(root_id, child_id)],
            )
            .await
            .expect("seed run");

        // Sanity-check the seeded state before the scheduler touches it.
        let initial = storage.fetch_steps_for_run(run_id).await.unwrap();
        assert_eq!(initial.len(), 2);
        let root = initial.iter().find(|s| s.step_id == root_id).unwrap();
        let child = initial.iter().find(|s| s.step_id == child_id).unwrap();
        assert_eq!(root.status, models::StepStatus::Queued);
        assert_eq!(child.status, models::StepStatus::Blocked);

        // Run the scheduler loop in the background for ~1s, then abort it.
        // The fixed sleep is a timing assumption — TODO confirm it is ample
        // for both echo steps on slow CI hosts.
        let running = Arc::clone(&scheduler);
        let handle = tokio::spawn(async move {
            let _ = running.run().await;
        });

        tokio::time::sleep(Duration::from_secs(1)).await;
        handle.abort();

        // Both steps should have executed exactly once and succeeded,
        // and the run should have been promoted to Succeeded.
        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let root = steps.iter().find(|s| s.step_id == root_id).unwrap();
        let child = steps.iter().find(|s| s.step_id == child_id).unwrap();
        assert_eq!(root.status, models::StepStatus::Succeeded);
        assert_eq!(child.status, models::StepStatus::Succeeded);
        assert_eq!(root.attempt, 1);
        assert_eq!(child.attempt, 1);

        let run_status = storage.fetch_run(run_id).await.unwrap().unwrap().status;
        assert_eq!(run_status, models::RunStatus::Succeeded);

        drop(container);
    }
4888
    /// Verifies the lease reaper: a step that is marked `leased` with an
    /// already-expired lease must be reclaimed, re-executed, and succeed
    /// with `attempt == 1`.
    #[cfg(feature = "insecure-shell")]
    #[tokio::test]
    async fn lease_reaper_requeues_expired() {
        // Skip gracefully on hosts without Docker access (no socket, no DOCKER_HOST).
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping scheduler tests because Docker is unavailable");
            return;
        }

        // The shell executor is gated behind env vars; permit only /bin/echo.
        // NOTE(review): set_var is process-global — assumes no concurrently
        // running test reads these vars with conflicting expectations.
        std::env::set_var("FLEETFORGE_ALLOWED_TOOLS", "/bin/echo");
        std::env::set_var("FLEETFORGE_ALLOW_SHELL", "1");

        // Spin up an ephemeral Postgres container dedicated to this test.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Brief pause — presumably to let Postgres start accepting
        // connections; TODO confirm a readiness probe wouldn't be sturdier.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        let mut registry = ExecutorRegistry::new();
        registry
            .register(Arc::new(
                ShellExecutor::new().expect("shell executor should be enabled for test"),
            ))
            .expect("register shell executor");
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());

        // Short reaper_interval (200ms) so the expired lease is noticed
        // well within the 2s window this test allows the loop to run.
        let scheduler = Arc::new(Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig {
                batch_size: 1,
                idle_backoff: Duration::from_millis(50),
                active_backoff: Duration::from_millis(10),
                reaper_interval: Duration::from_millis(200),
                transparency_writer: false,
                ..SchedulerConfig::default()
            },
        ));

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        // Seed a single queued echo step with no dependencies.
        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({
                        "steps": [{
                            "id": step_id,
                            "type": "tool",
                            "inputs": {"command": ["/bin/echo", "late"]},
                            "policy": {}
                        }],
                        "edges": []
                    }),
                    input_ctx: json!({}),
                    seed: 1,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 1,
                    spec_json: json!({
                        "id": step_id,
                        "type": "tool",
                        "inputs": {"command": ["/bin/echo", "late"]},
                        "policy": {}
                    }),
                    status: models::StepStatus::Queued,
                    attempt: 0,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 0,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await
            .expect("seed run");

        // Simulate a dead worker: mark the step leased by 'worker-a' with a
        // lease that expired one second ago, so only the reaper can free it.
        sqlx::query(
            "update steps set status = 'leased', leased_by = 'worker-a', lease_expires_at = now() - interval '1 second' where run_id = $1 and step_id = $2"
        )
        .bind(run_id)
        .bind(step_id)
        .execute(storage.pool())
        .await
        .expect("mark leased");

        // Run the scheduler loop in the background for ~2s, then abort it.
        let running = Arc::clone(&scheduler);
        let handle = tokio::spawn(async move {
            let _ = running.run().await;
        });

        tokio::time::sleep(Duration::from_secs(2)).await;
        handle.abort();

        // The reaper must have requeued the step and it must have succeeded
        // on its first real attempt.
        let step = storage
            .fetch_steps_for_run(run_id)
            .await
            .unwrap()
            .first()
            .cloned()
            .unwrap();
        assert_eq!(step.status, models::StepStatus::Succeeded);
        assert_eq!(step.attempt, 1);

        drop(container);
    }
5025
    /// When a step's estimated cost (20 tokens) exceeds its policy budget
    /// (10 tokens), the scheduler must fail the step with a
    /// `budget_exceeded` error and emit a matching `step_failed` outbox
    /// event — the tool itself must never run past the budget gate.
    #[cfg(feature = "insecure-shell")]
    #[tokio::test]
    async fn budget_denial_records_failure() {
        // Skip gracefully on hosts without Docker access (no socket, no DOCKER_HOST).
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            eprintln!("Skipping scheduler tests because Docker is unavailable");
            return;
        }

        // The shell executor is gated behind env vars; permit only /bin/echo.
        // NOTE(review): set_var is process-global — assumes no concurrently
        // running test reads these vars with conflicting expectations.
        std::env::set_var("FLEETFORGE_ALLOWED_TOOLS", "/bin/echo");
        std::env::set_var("FLEETFORGE_ALLOW_SHELL", "1");

        // Spin up an ephemeral Postgres container dedicated to this test.
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        // Brief pause — presumably to let Postgres start accepting
        // connections; TODO confirm a readiness probe wouldn't be sturdier.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        let mut registry = ExecutorRegistry::new();
        registry
            .register(Arc::new(
                ShellExecutor::new().expect("shell executor should be enabled for test"),
            ))
            .expect("register shell executor");
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> =
            Arc::new(InMemoryAttestationVault::new());

        let scheduler = Arc::new(Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            crate::policy::AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        ));

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        // Seed one step whose estimate (20 tokens) deliberately exceeds the
        // 10-token budget in its policy, forcing a denial at reservation time.
        storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({
                        "steps": [{
                            "id": step_id,
                            "type": "tool",
                            "inputs": {"command": ["/bin/echo", "over"]},
                            "policy": {"budget": {"tokens": 10, "cost": 0.0}}
                        }],
                        "edges": []
                    }),
                    input_ctx: json!({}),
                    seed: 1,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 1,
                    spec_json: json!({
                        "id": step_id,
                        "type": "tool",
                        "inputs": {"command": ["/bin/echo", "over"]},
                        "policy": {"budget": {"tokens": 10, "cost": 0.0}}
                    }),
                    status: models::StepStatus::Queued,
                    attempt: 0,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 20,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await
            .expect("seed run");

        // Run the scheduler loop briefly; denial should be near-immediate.
        let running = Arc::clone(&scheduler);
        let handle = tokio::spawn(async move {
            let _ = running.run().await;
        });

        tokio::time::sleep(Duration::from_millis(500)).await;
        handle.abort();

        // The step must be Failed with a structured budget_exceeded error.
        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
        let step = steps.first().unwrap();
        assert_eq!(step.status, models::StepStatus::Failed);
        let error = step.error_json.clone().unwrap();
        assert_eq!(error["kind"], json!("budget_exceeded"));

        // And the failure must be mirrored into the outbox for consumers.
        let events = storage.fetch_unpublished_outbox(10).await.unwrap();
        let failed_event = events.iter().find(|ev| ev.kind == "step_failed").unwrap();
        assert_eq!(
            failed_event.payload["error"]["kind"],
            json!("budget_exceeded")
        );

        drop(container);
    }
5148}