fleetforge_runtime/
scheduler.rs

1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::env;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use crate::{event_policy, span_step};
8use anyhow::{anyhow, Context, Result};
9use base64::engine::general_purpose::STANDARD as BASE64;
10use base64::Engine;
11use bytes::Bytes;
12use chrono::{Duration as ChronoDuration, Utc};
13use fleetforge_telemetry::otel::{
14    self as telemetry_otel, Counter, Histogram, Meter, SpanId, TraceId,
15};
16use serde_json::{json, to_value, Map, Value};
17use tracing::{debug, error, info, warn};
18use tracing_opentelemetry::OpenTelemetrySpanExt;
19use uuid::Uuid;
20
/// Environment variable name `FLEETFORGE_C2PA_PROFILE`.
// NOTE(review): presumably selects the C2PA manifest profile; its reader is outside this excerpt.
const C2PA_PROFILE_ENV: &str = "FLEETFORGE_C2PA_PROFILE";
/// Queue-wait target (milliseconds) used when an SLO contract omits `queue_target_ms`.
const DEFAULT_QUEUE_TARGET_MS: i64 = 30_000;
/// Environment variable read by `SchedulerConfig::default` for `max_inflight_tokens`.
const MAX_INFLIGHT_TOKENS_ENV: &str = "FLEETFORGE_MAX_INFLIGHT_TOKENS";
/// Environment variable read by `SchedulerConfig::default` for `max_inflight_cost`.
const MAX_INFLIGHT_COST_ENV: &str = "FLEETFORGE_MAX_INFLIGHT_COST";
/// Environment variable (milliseconds) read by `SchedulerConfig::default` for `backpressure_delay`.
const BACKPRESSURE_DELAY_ENV: &str = "FLEETFORGE_QUEUE_BACKPRESSURE_MS";
26
27use crate::aibom::build_aibom;
28use crate::capability;
29use crate::executor::{
30    artifact_meta_to_value, guardrail_budget_view, BudgetCtx, ExecutorRegistry, StepCtx,
31    StepExecutionResult, StepSlo, ToolEventRecord,
32};
33use crate::guardrails::{PolicyBundle, PolicyDecisionTrace, PolicyEvaluationRecord};
34use crate::model::{QueuedStep, RunId, StepExecution, StepId, StepSpec, StepStatus, StepType};
35use crate::otel::{record_attribute_str, record_trust_attributes};
36use crate::policy::{enforce_policy, PolicyOutcome, RuntimePolicyPack};
37use fleetforge_policy::DecisionEffect;
38use fleetforge_common::licensing::{feature_allowed, LicensedFeature};
39use fleetforge_storage::models::{self, NewOutboxEvent};
40use fleetforge_storage::{ArtifactStore, BudgetResult, Storage};
41use fleetforge_telemetry::audit::Audit;
42use fleetforge_telemetry::context::{RunScope, StepScope, TraceContext};
43use fleetforge_trust::{
44    capability_signer, generate_c2pa_manifest, mint_capability_token, trust_signer,
45    AttestationVault, CapabilityToken, CapabilityTokenSubject, IdentityEvidence, ManifestInput,
46    ManifestProfile, PolicyEvidence, Trust, TrustBoundary, TrustOrigin, TrustSource, Untrusted,
47};
48use tokio::sync::{Mutex, Semaphore};
49
/// Transparency coverage level selected for the scheduler.
///
/// The levels are cumulative: `Gates` is covered by every level except
/// `Disabled`, `Runs` by `Runs` and `Artifacts`, and `Artifacts` only by itself.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransparencyScope {
    Disabled,
    Gates,
    Runs,
    Artifacts,
}

impl TransparencyScope {
    /// True for every level except `Disabled`.
    fn includes_gates(self) -> bool {
        self != TransparencyScope::Disabled
    }

    /// True for `Runs` and `Artifacts`.
    fn includes_runs(self) -> bool {
        match self {
            TransparencyScope::Runs | TransparencyScope::Artifacts => true,
            TransparencyScope::Disabled | TransparencyScope::Gates => false,
        }
    }

    /// True only for `Artifacts`.
    fn includes_artifacts(self) -> bool {
        self == TransparencyScope::Artifacts
    }
}
74
/// SLO settings read from the `slo` object of a step's policy (see `extract_slo_contract`).
#[derive(Clone, Debug)]
struct SloContract {
    /// Optional `tier` string copied from the policy.
    tier: Option<String>,
    /// Optional `queue_target_ms`; scoring falls back to `DEFAULT_QUEUE_TARGET_MS` when absent.
    queue_target_ms: Option<i64>,
    /// `priority_boost` added to the step's own priority when scoring; 0 when not specified.
    priority_boost: i32,
    /// Optional `error_budget_ratio`, carried through unchanged.
    error_budget_ratio: Option<f64>,
}
82
/// Ranking inputs computed for one queued step (see `compute_slo_score`).
#[derive(Clone, Debug)]
struct SloScore {
    /// Whether the step's policy carried an SLO contract.
    has_contract: bool,
    /// Queue target minus observed wait, in milliseconds; negative means the target is breached.
    slack_ms: i64,
    /// Time the step has spent queued, in milliseconds (never negative).
    wait_ms: i64,
    /// Step priority including any contract boost; larger values rank earlier.
    priority: i32,
    /// Queue target as stated by the contract, if any.
    queue_target_ms: Option<i64>,
    /// Contract tier label, if any.
    tier: Option<String>,
    /// Contract error-budget ratio, if any.
    error_budget_ratio: Option<f64>,
}

impl SloScore {
    /// Orders two scores so that `Ordering::Less` means "`self` should run first".
    ///
    /// Keys, most significant first:
    /// 1. steps with an SLO contract before steps without one;
    /// 2. between two contracted steps, the smaller slack first (a breached
    ///    step has negative slack, so it automatically precedes any
    ///    unbreached one);
    /// 3. higher priority first;
    /// 4. the step that has waited longest first.
    ///
    /// The last key used to prefer the *shortest* wait, which served
    /// equal-priority steps newest-first and let older steps starve under
    /// sustained load; every other key already favours the more urgent step.
    fn compare(&self, other: &Self) -> Ordering {
        // `true` must sort first, hence the swapped operands.
        other
            .has_contract
            .cmp(&self.has_contract)
            .then_with(|| {
                // Reaching here means both flags are equal; slack is only
                // meaningful when both steps actually have a contract.
                if self.has_contract && other.has_contract {
                    self.slack_ms.cmp(&other.slack_ms)
                } else {
                    Ordering::Equal
                }
            })
            .then_with(|| other.priority.cmp(&self.priority))
            .then_with(|| other.wait_ms.cmp(&self.wait_ms))
    }
}
122
123fn trace_id_from_uuid(uuid: &Uuid) -> TraceId {
124    TraceId::from_bytes(*uuid.as_bytes())
125}
126
127fn span_id_from_uuid(uuid: &Uuid) -> SpanId {
128    let raw = uuid.as_u128();
129    let mut span_bits = ((raw >> 64) as u64) ^ (raw as u64);
130    if span_bits == 0 {
131        span_bits = 1;
132    }
133    SpanId::from_bytes(span_bits.to_be_bytes())
134}
135
136fn extract_slo_contract(spec: &StepSpec) -> Option<SloContract> {
137    let slo_value = spec.policy.get("slo")?;
138    if !slo_value.is_object() {
139        return None;
140    }
141    Some(SloContract {
142        tier: slo_value
143            .get("tier")
144            .and_then(Value::as_str)
145            .map(|s| s.to_string()),
146        queue_target_ms: slo_value.get("queue_target_ms").and_then(Value::as_i64),
147        priority_boost: slo_value
148            .get("priority_boost")
149            .and_then(Value::as_i64)
150            .unwrap_or(0) as i32,
151        error_budget_ratio: slo_value.get("error_budget_ratio").and_then(Value::as_f64),
152    })
153}
154
155fn compute_slo_score(
156    step: &QueuedStep,
157    contract: Option<&SloContract>,
158    now: chrono::DateTime<Utc>,
159) -> SloScore {
160    let wait_ms = now
161        .signed_duration_since(step.created_at)
162        .num_milliseconds()
163        .max(0);
164
165    if let Some(contract) = contract {
166        let target_ms = contract.queue_target_ms.unwrap_or(DEFAULT_QUEUE_TARGET_MS);
167        let slack_ms = target_ms - wait_ms;
168        SloScore {
169            has_contract: true,
170            slack_ms,
171            wait_ms,
172            priority: i32::from(step.priority) + contract.priority_boost,
173            queue_target_ms: contract.queue_target_ms,
174            tier: contract.tier.clone(),
175            error_budget_ratio: contract.error_budget_ratio,
176        }
177    } else {
178        SloScore {
179            has_contract: false,
180            slack_ms: i64::MAX,
181            wait_ms,
182            priority: i32::from(step.priority),
183            queue_target_ms: None,
184            tier: None,
185            error_budget_ratio: None,
186        }
187    }
188}
189
190fn build_step_slo(step: &QueuedStep, now: chrono::DateTime<Utc>) -> Option<StepSlo> {
191    let contract = extract_slo_contract(&step.spec)?;
192    let score = compute_slo_score(step, Some(&contract), now);
193    Some(StepSlo {
194        tier: contract.tier.clone(),
195        queue_target_ms: Some(contract.queue_target_ms.unwrap_or(DEFAULT_QUEUE_TARGET_MS)),
196        observed_queue_ms: Some(score.wait_ms),
197        slack_ms: Some(score.slack_ms),
198        priority_boost: contract.priority_boost,
199        error_budget_ratio: contract.error_budget_ratio,
200    })
201}
202
203fn resolve_step_providers(spec: &crate::model::StepSpec) -> (Option<String>, Option<String>) {
204    let provider = spec
205        .inputs
206        .get("provider")
207        .and_then(Value::as_str)
208        .or_else(|| spec.inputs.get("model").and_then(Value::as_str))
209        .or_else(|| spec.inputs.get("tool").and_then(Value::as_str))
210        .map(|s| s.to_owned());
211    let provider_version = spec
212        .inputs
213        .get("provider_version")
214        .or_else(|| spec.inputs.get("model_version"))
215        .and_then(Value::as_str)
216        .map(|s| s.to_owned());
217    (provider, provider_version)
218}
219
220fn build_scheduler_payload(
221    spec: &crate::model::StepSpec,
222    step: &QueuedStep,
223    budget: &BudgetCtx,
224    slo: Option<&StepSlo>,
225) -> Value {
226    let execution = serde_json::to_value(&spec.execution).unwrap_or(Value::Null);
227    json!({
228        "inputs": spec.inputs,
229        "policy": spec.policy,
230        "execution": execution,
231        "step": {
232            "id": step.step_id.to_string(),
233            "type": spec.r#type,
234            "slug": spec.slug,
235            "attempt": step.attempt,
236            "max_attempts": spec.execution.max_attempts,
237        },
238        "budget": guardrail_budget_view(budget, slo),
239    })
240}
241
/// Running tally of token and cost amounts added and not yet subtracted.
#[derive(Debug, Default)]
struct InFlightBudget {
    tokens: i64,
    cost: f64,
}

impl InFlightBudget {
    /// Adds to the tally. Amounts that are zero or negative are ignored; the
    /// token count saturates instead of overflowing.
    fn add(&mut self, tokens: i64, cost: f64) {
        if tokens > 0 {
            let raised = self.tokens.saturating_add(tokens);
            self.tokens = raised;
        }
        if cost > 0.0 {
            self.cost = self.cost + cost;
        }
    }

    /// Removes from the tally. Amounts that are zero or negative are ignored,
    /// and neither counter is allowed to drop below zero.
    fn subtract(&mut self, tokens: i64, cost: f64) {
        if tokens > 0 {
            let remaining = self.tokens - tokens;
            self.tokens = std::cmp::max(remaining, 0);
        }
        if cost > 0.0 {
            let remaining = self.cost - cost;
            self.cost = f64::max(remaining, 0.0);
        }
    }
}
267
/// One debug breakpoint, as stored under the `breakpoints` key of a debug map.
#[derive(Clone, Debug)]
struct BreakpointRecord {
    /// The breakpoint's `spec` string, stored and reloaded verbatim.
    // NOTE(review): presumably the user-supplied breakpoint expression — confirm against the writer.
    spec: String,
    /// Step the breakpoint is bound to; serialized as a UUID string.
    step_id: Uuid,
}
273
274fn load_debug_breakpoints(debug: &Map<String, Value>) -> Vec<BreakpointRecord> {
275    debug
276        .get("breakpoints")
277        .and_then(Value::as_array)
278        .map(|items| {
279            items
280                .iter()
281                .filter_map(|entry| {
282                    let obj = entry.as_object()?;
283                    let spec = obj.get("spec")?.as_str()?.to_string();
284                    let step_id = obj
285                        .get("step_id")
286                        .and_then(Value::as_str)
287                        .and_then(|s| Uuid::parse_str(s).ok())?;
288                    Some(BreakpointRecord { spec, step_id })
289                })
290                .collect()
291        })
292        .unwrap_or_default()
293}
294
295fn store_debug_breakpoints(debug: &mut Map<String, Value>, entries: &[BreakpointRecord]) {
296    let list = entries
297        .iter()
298        .map(|record| {
299            let mut obj = Map::new();
300            obj.insert("spec".to_string(), Value::String(record.spec.clone()));
301            obj.insert(
302                "step_id".to_string(),
303                Value::String(record.step_id.to_string()),
304            );
305            Value::Object(obj)
306        })
307        .collect::<Vec<_>>();
308    debug.insert("breakpoints".to_string(), Value::Array(list));
309
310    let specs = entries
311        .iter()
312        .map(|record| Value::String(record.spec.clone()))
313        .collect::<Vec<_>>();
314    debug.insert("breakpoint_specs".to_string(), Value::Array(specs));
315}
316
317fn set_active_pause(debug: &mut Map<String, Value>, record: &BreakpointRecord) {
318    let pause = json!({
319        "step_id": record.step_id.to_string(),
320        "spec": record.spec,
321        "started_at": chrono::Utc::now().to_rfc3339(),
322    });
323    debug.insert("active_pause".to_string(), pause);
324}
325
/// Trace propagation headers read from the `trace` object of a run's input context.
#[derive(Clone, Debug)]
struct TraceHeaders {
    /// Required `traceparent` value; later handed to `TraceContext::from_w3c`.
    traceparent: String,
    /// Optional `tracestate` value.
    tracestate: Option<String>,
    /// Optional `baggage` value.
    baggage: Option<String>,
}
332
333fn parse_trace_headers(value: &Value) -> Option<TraceHeaders> {
334    let trace_value = value.get("trace")?;
335    let trace_obj = trace_value.as_object()?;
336    let traceparent = trace_obj
337        .get("traceparent")
338        .and_then(Value::as_str)?
339        .to_string();
340    let tracestate = trace_obj
341        .get("tracestate")
342        .and_then(Value::as_str)
343        .map(|s| s.to_string());
344    let baggage = trace_obj
345        .get("baggage")
346        .and_then(Value::as_str)
347        .map(|s| s.to_string());
348    Some(TraceHeaders {
349        traceparent,
350        tracestate,
351        baggage,
352    })
353}
354
/// Tunables that shape how the scheduler polls, leases, and back-pressures work.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Identifier passed to storage when claiming steps; defaults to `worker-<random uuid>`.
    pub worker_id: String,
    /// Maximum number of steps claimed per poll; also sizes the concurrency semaphore.
    pub batch_size: i64,
    /// Sleep between polls when the last claim returned nothing.
    pub idle_backoff: Duration,
    /// Sleep after dispatching a claimed batch; also the lease reaper's delay after a sweep that requeued something.
    pub active_backoff: Duration,
    /// Lease reaper delay after a sweep that requeued nothing or failed.
    pub reaper_interval: Duration,
    /// Batch size passed to `requeue_expired_leases` on each reaper sweep.
    pub requeue_batch_size: i64,
    /// Optional in-flight token ceiling; the default reads it from the environment and keeps only positive values.
    pub max_inflight_tokens: Option<i64>,
    /// Optional in-flight cost ceiling; the default reads it from the environment and keeps only positive values.
    pub max_inflight_cost: Option<f64>,
    /// Delay associated with back-pressure deferral; the default is 50 ms unless overridden by the environment.
    // NOTE(review): the code that consumes this delay is outside this excerpt — confirm exact use.
    pub backpressure_delay: Duration,
    /// Whether the transparency writer is on; the default forces this off when the scope is `Disabled`.
    pub transparency_writer: bool,
    /// Transparency coverage level.
    pub transparency_scope: TransparencyScope,
}
370
371impl Default for SchedulerConfig {
372    fn default() -> Self {
373        let scope = transparency_scope_from_env();
374        let max_inflight_tokens = env::var(MAX_INFLIGHT_TOKENS_ENV)
375            .ok()
376            .and_then(|value| value.parse::<i64>().ok())
377            .filter(|value| *value > 0);
378        let max_inflight_cost = env::var(MAX_INFLIGHT_COST_ENV)
379            .ok()
380            .and_then(|value| value.parse::<f64>().ok())
381            .filter(|value| *value > 0.0);
382        let backpressure_delay = env::var(BACKPRESSURE_DELAY_ENV)
383            .ok()
384            .and_then(|value| value.parse::<u64>().ok())
385            .map(Duration::from_millis)
386            .unwrap_or_else(|| Duration::from_millis(50));
387        Self {
388            worker_id: format!("worker-{}", uuid::Uuid::new_v4()),
389            batch_size: 8,
390            idle_backoff: Duration::from_millis(200),
391            active_backoff: Duration::from_millis(10),
392            reaper_interval: Duration::from_secs(30),
393            requeue_batch_size: 128,
394            max_inflight_tokens,
395            max_inflight_cost,
396            backpressure_delay,
397            transparency_writer: transparency_writer_flag()
398                && !matches!(scope, TransparencyScope::Disabled),
399            transparency_scope: scope,
400        }
401    }
402}
403
/// Core runtime loop that leases steps from storage and hands them to executors.
pub struct Scheduler {
    /// Storage handle used to claim steps, requeue expired leases, and load runs.
    storage: Arc<Storage>,
    /// Registry from which the executor for a step's type is looked up.
    executors: Arc<ExecutorRegistry>,
    /// Policy pack enforced against each step before it executes.
    policy_pack: Arc<RuntimePolicyPack>,
    /// Artifact store obtained from `storage.artifacts()` at construction.
    artifact_store: Arc<dyn ArtifactStore>,
    /// Vault in which policy attestations are recorded.
    attestation_vault: Arc<dyn AttestationVault>,
    /// Tunables supplied at construction.
    config: SchedulerConfig,
    /// Queue metrics recorder (queue depth, queue lag, retries, budget projections).
    metrics: QueueMetrics,
    /// Bounds how many spawned step tasks run at once; sized from `config.batch_size`.
    concurrency: Arc<Semaphore>,
    /// Audit handle supplied at construction.
    audit: Arc<Audit>,
    /// Per-run cache of parsed trace headers; `None` is cached for runs that have none.
    trace_headers: Mutex<HashMap<RunId, Option<TraceHeaders>>>,
    /// Tally of in-flight tokens and cost.
    // NOTE(review): presumably compared against the `max_inflight_*` limits for back-pressure — confirm.
    inflight: Mutex<InFlightBudget>,
}
418
419fn transparency_writer_flag() -> bool {
420    let enabled = matches!(
421        std::env::var("FLEETFORGE_TRANSPARENCY_WRITER")
422            .ok()
423            .map(|value| value.trim().to_ascii_lowercase())
424            .unwrap_or_default()
425            .as_str(),
426        "1" | "true" | "yes" | "on"
427    );
428    if enabled && !feature_allowed(LicensedFeature::ScittTransparency) {
429        warn!(
430            "SCITT transparency writer requested but the current license tier does not allow it; disabling writer"
431        );
432        return false;
433    }
434    enabled
435}
436
437fn transparency_scope_from_env() -> TransparencyScope {
438    match std::env::var("FLEETFORGE_TRANSPARENCY_SCOPE")
439        .ok()
440        .map(|value| value.trim().to_ascii_lowercase())
441        .as_deref()
442    {
443        Some("artifacts") => TransparencyScope::Artifacts,
444        Some("runs") => TransparencyScope::Runs,
445        Some("gates") => TransparencyScope::Gates,
446        Some("disabled") => TransparencyScope::Disabled,
447        _ => TransparencyScope::Gates,
448    }
449}
450
451impl Scheduler {
452    pub fn new(
453        storage: Arc<Storage>,
454        executors: Arc<ExecutorRegistry>,
455        policy_pack: Arc<RuntimePolicyPack>,
456        audit: Arc<Audit>,
457        attestation_vault: Arc<dyn AttestationVault>,
458        config: SchedulerConfig,
459    ) -> Self {
460        let artifact_store = storage.artifacts();
461        let metrics = QueueMetrics::new();
462        let permits_i64 = std::cmp::max(config.batch_size, 1);
463        let permits = permits_i64.min(usize::MAX as i64) as usize;
464        Self {
465            storage,
466            executors,
467            policy_pack,
468            artifact_store,
469            attestation_vault,
470            config,
471            metrics,
472            concurrency: Arc::new(Semaphore::new(permits)),
473            audit,
474            trace_headers: Mutex::new(HashMap::new()),
475            inflight: Mutex::new(InFlightBudget::default()),
476        }
477    }
478
479    /// Starts the scheduling loop, spawning workers and the lease reaper.
480    pub async fn run(self: Arc<Self>) -> Result<()> {
481        let reaper = Arc::clone(&self);
482        tokio::spawn(async move {
483            reaper.lease_reaper_loop().await;
484        });
485
486        loop {
487            let claimed = self
488                .claim_ready_steps()
489                .await
490                .context("failed to claim ready steps")?;
491
492            self.metrics.record_queue_depth(claimed.len());
493
494            if claimed.is_empty() {
495                tokio::time::sleep(self.config.idle_backoff).await;
496                continue;
497            }
498
499            for step in claimed {
500                let scheduler = Arc::clone(&self);
501                let semaphore = Arc::clone(&self.concurrency);
502                tokio::spawn(async move {
503                    let _permit = semaphore
504                        .acquire_owned()
505                        .await
506                        .expect("scheduler semaphore closed");
507                    if let Err(err) = scheduler.execute_step(step).await {
508                        error!(error = ?err, "step execution failed");
509                    }
510                });
511            }
512
513            tokio::time::sleep(self.config.active_backoff).await;
514        }
515    }
516
517    async fn lease_reaper_loop(self: Arc<Self>) {
518        loop {
519            let sleep = match self
520                .storage
521                .requeue_expired_leases(self.config.requeue_batch_size)
522                .await
523            {
524                Ok(requeued) => {
525                    if !requeued.is_empty() {
526                        info!(
527                            count = requeued.len(),
528                            "requeued expired leases back to the queue"
529                        );
530                        for (run_id, step_id) in &requeued {
531                            debug!(%run_id, %step_id, "expired lease requeued");
532                        }
533                        // Run another sweep quickly to drain backlog.
534                        self.config.active_backoff
535                    } else {
536                        self.config.reaper_interval
537                    }
538                }
539                Err(err) => {
540                    warn!(error = %err, "lease reaper iteration failed");
541                    self.config.reaper_interval
542                }
543            };
544
545            tokio::time::sleep(sleep).await;
546        }
547    }
548
549    async fn trace_headers_for_run(&self, run_id: RunId) -> Result<Option<TraceHeaders>> {
550        {
551            let cache = self.trace_headers.lock().await;
552            if let Some(entry) = cache.get(&run_id) {
553                return Ok(entry.clone());
554            }
555        }
556
557        let run = self
558            .storage
559            .fetch_run(Uuid::from(run_id))
560            .await
561            .context("failed to load run for trace context")?;
562        let headers = run.and_then(|row| parse_trace_headers(&row.input_ctx));
563        let mut cache = self.trace_headers.lock().await;
564        cache.insert(run_id, headers.clone());
565        Ok(headers)
566    }
567
568    async fn claim_ready_steps(&self) -> Result<Vec<QueuedStep>> {
569        let db_steps = self
570            .storage
571            .claim_queued_steps(self.config.batch_size, &self.config.worker_id)
572            .await?;
573
574        let mut steps = db_steps
575            .into_iter()
576            .map(QueuedStep::try_from)
577            .collect::<Result<Vec<_>>>()?;
578        self.rank_steps(&mut steps);
579        Ok(steps)
580    }
581
582    async fn execute_step(&self, step: QueuedStep) -> Result<()> {
583        let mut spec = step.spec.clone();
584        let mut input_snapshot_value = spec.inputs.clone();
585        let (mut provider, mut provider_version) = resolve_step_providers(&spec);
586        let mut policy_event_log: Option<(String, String, Option<String>, DecisionEffect)> = None;
587        let pack_id = self.policy_pack.id().to_string();
588
589        let observed_at = Utc::now();
590        if let Ok(wait) = observed_at.signed_duration_since(step.created_at).to_std() {
591            self.metrics.record_queue_lag(wait);
592        }
593        let step_slo = build_step_slo(&step, observed_at);
594        if step.attempt > 0 {
595            self.metrics.record_retry();
596        }
597
598        if self.try_pause_for_breakpoint(&step).await? {
599            return Ok(());
600        }
601
602        if let Some(deadline_at) = step.deadline_at {
603            if Utc::now() > deadline_at {
604                let error_payload = json!({
605                    "message": "step deadline exceeded",
606                    "kind": "deadline_exceeded",
607                });
608                self.finish_step(
609                    &step,
610                    &spec,
611                    StepStatus::Failed,
612                    step.attempt + 1,
613                    None,
614                    Some(error_payload),
615                    None,
616                    &BudgetCtx::default(),
617                    Some(input_snapshot_value.clone()),
618                    None,
619                    None,
620                    None,
621                    None,
622                    Vec::new(),
623                    Vec::new(),
624                    None,
625                    None,
626                    step_slo.as_ref(),
627                )
628                .await?;
629
630                let run_trace = self.build_run_trace_for_step(&step).await?;
631                self.schedule_compensation(&step, &run_trace).await?;
632                return Ok(());
633            }
634        }
635
636        let mut policy_records: Vec<PolicyEvaluationRecord> = Vec::new();
637        let policy_outcome = enforce_policy(self.policy_pack.as_ref(), step.run_id, &spec).await?;
638        let policy_record = policy_outcome_to_record(
639            &policy_outcome,
640            step.run_id,
641            step.step_id,
642            TrustBoundary::Scheduler,
643            &pack_id,
644        );
645        policy_records.push(policy_record.clone());
646
647        self.attestation_vault
648            .record(policy_outcome.attestation.clone())
649            .await
650            .context("failed to persist policy attestation")?;
651
652        let att_id_str = policy_outcome.attestation.id.to_string();
653        let subject_label = format!("run:{}:step:{}", step.run_id, step.step_id);
654        record_trust_attributes(
655            &[att_id_str.clone()],
656            Some(subject_label.as_str()),
657            Some(att_id_str.as_str()),
658        );
659
660        match policy_outcome.decision.effect {
661            DecisionEffect::Deny => {
662                let reason = policy_outcome
663                    .decision
664                    .reason
665                    .clone()
666                    .unwrap_or_else(|| "policy denied execution".to_string());
667                warn!(
668                    run = %step.run_id,
669                    step = %step.step_id,
670                    policy = %pack_id,
671                    "policy denied execution: {reason}"
672                );
673
674                let (artifact_value, summary) = persist_policy_decisions(
675                    Arc::clone(&self.artifact_store),
676                    step.run_id,
677                    step.step_id,
678                    &policy_records,
679                )
680                .await
681                .unwrap_or_else(|err| {
682                    warn!(error = %err, "failed to persist policy decisions for denial");
683                    (Value::Null, Value::Null)
684                });
685                let artifact_payload = if artifact_value.is_null() {
686                    None
687                } else {
688                    Some(artifact_value.clone())
689                };
690
691                let artifact_id = artifact_value
692                    .get("sha256")
693                    .and_then(Value::as_str)
694                    .map(|s| s.to_string())
695                    .or_else(|| {
696                        artifact_value
697                            .get("artifact_id")
698                            .and_then(Value::as_str)
699                            .map(|s| s.to_string())
700                    });
701
702                let summary_str = summary
703                    .get("reasons")
704                    .and_then(Value::as_array)
705                    .and_then(|arr| arr.first())
706                    .and_then(Value::as_str)
707                    .map(|s| s.to_string())
708                    .unwrap_or_else(|| reason.clone());
709
710                policy_event_log = Some((
711                    pack_id.clone(),
712                    summary_str.clone(),
713                    artifact_id.clone(),
714                    policy_outcome.decision.effect,
715                ));
716
717                let error_payload = json!({
718                    "message": reason,
719                    "kind": "policy_denied",
720                });
721                self.finish_step(
722                    &step,
723                    &spec,
724                    StepStatus::Failed,
725                    step.attempt,
726                    None,
727                    Some(error_payload),
728                    None,
729                    &BudgetCtx::default(),
730                    Some(input_snapshot_value),
731                    None,
732                    provider.as_deref(),
733                    provider_version.as_deref(),
734                    None,
735                    None,
736                    Vec::new(),
737                    policy_records,
738                    None,
739                    artifact_payload,
740                )
741                .await?;
742                return Ok(());
743            }
744            DecisionEffect::Redact => {
745                if let Some(reason) = policy_outcome.decision.reason.as_deref() {
746                    debug!(
747                        run = %step.run_id,
748                        step = %step.step_id,
749                        policy = %pack_id,
750                        "policy requested redaction: {reason}"
751                    );
752                }
753            }
754            DecisionEffect::Allow => {
755                if let Some(reason) = policy_outcome.decision.reason.as_deref() {
756                    debug!(
757                        run = %step.run_id,
758                        step = %step.step_id,
759                        policy = %pack_id,
760                        "policy allow with advisory: {reason}"
761                    );
762                }
763            }
764        }
765
766        let mut input_snapshot = Some(input_snapshot_value.clone());
767
768        let budget_request = BudgetRequest::from_step(&step);
769        if self.should_backpressure(&budget_request).await {
770            self.defer_for_backpressure(&step).await?;
771            return Ok(());
772        }
773        let mut budget_ctx = BudgetCtx::default();
774        match self.reserve_budget(step.run_id, &budget_request).await? {
775            BudgetDecision::Granted(ctx) => {
776                budget_ctx = ctx;
777            }
778            BudgetDecision::Unlimited(ctx) => {
779                budget_ctx = ctx;
780            }
781            BudgetDecision::Denied { reason } => {
782                let error_payload = json!({
783                    "message": reason,
784                    "kind": "budget_exceeded",
785                });
786                self.finish_step(
787                    &step,
788                    &spec,
789                    StepStatus::Failed,
790                    step.attempt,
791                    None,
792                    Some(error_payload),
793                    None,
794                    &BudgetCtx::default(),
795                    input_snapshot.clone(),
796                    None,
797                    None,
798                    provider.as_deref(),
799                    provider_version.as_deref(),
800                    None,
801                    None,
802                    Vec::new(),
803                    Vec::new(),
804                    None,
805                    None,
806                    step_slo.as_ref(),
807                )
808                .await?;
809                return Ok(());
810            }
811        }
812        let policy_decision_id = policy_outcome.attestation.id;
813
814        if let Err(err) = self
815            .storage
816            .update_step_estimate(
817                Uuid::from(step.run_id),
818                Uuid::from(step.step_id),
819                budget_ctx.tokens_reserved,
820                budget_ctx.cost_reserved,
821            )
822            .await
823        {
824            warn!(
825                error = %err,
826                run = %step.run_id,
827                step = %step.step_id,
828                "failed to persist budget estimate"
829            );
830        }
831
832        self.metrics.record_budget_projection(&budget_ctx);
833
834        let executor = self
835            .executors
836            .get(spec.r#type)
837            .ok_or_else(|| anyhow!("no executor registered for kind {}", spec.r#type))?;
838
839        let run_uuid = Uuid::from(step.run_id);
840        let run_span_id = span_id_from_uuid(&run_uuid);
841        let trace_headers = self.trace_headers_for_run(step.run_id).await?;
842        let run_scope = RunScope::new(step.run_id.to_string());
843        let run_trace = if let Some(headers) = trace_headers {
844            match TraceContext::from_w3c(
845                run_scope.clone(),
846                &headers.traceparent,
847                headers.tracestate.as_deref(),
848                headers.baggage.as_deref(),
849            ) {
850                Ok(ctx) => ctx.with_span_id(run_span_id),
851                Err(err) => {
852                    warn!(
853                        run = %step.run_id,
854                        error = %err,
855                        "failed to parse trace headers; using fallback trace id"
856                    );
857                    TraceContext::new(run_scope.clone())
858                        .with_trace_id(trace_id_from_uuid(&run_uuid))
859                        .with_span_id(run_span_id)
860                }
861            }
862        } else {
863            TraceContext::new(run_scope.clone())
864                .with_trace_id(trace_id_from_uuid(&run_uuid))
865                .with_span_id(run_span_id)
866        };
867        let (mut step_span, step_trace) = span_step!(
868            parent: &run_trace,
869            step: &step,
870            spec: &spec,
871            worker: &self.config.worker_id
872        );
873        let _span_guard = step_span.enter();
874
875        let guardrails = PolicyBundle::for_policy(&spec.policy);
876
877        if !guardrails.is_empty() {
878            let scheduler_payload =
879                build_scheduler_payload(&spec, &step, &budget_ctx, step_slo.as_ref());
880            let outcome = guardrails
881                .evaluate(
882                    TrustBoundary::Scheduler,
883                    Some(step.run_id),
884                    Some(step.step_id),
885                    scheduler_payload,
886                )
887                .await?;
888
889            match outcome.effect {
890                DecisionEffect::Deny => {
891                    let mut events = guardrails.drain_events();
892                    if !policy_records.is_empty() {
893                        events.extend(policy_records.clone());
894                    }
895                    let mut policy_artifact: Option<(Value, Value)> = None;
896                    let mut artifact_id: Option<String> = None;
897                    if !events.is_empty() {
898                        let policy_pack = policy_pack_name(&spec.policy);
899                        let dominant_effect = dominant_policy_effect(&events);
900                        let summary_text = summarize_policy_events(&events);
901                        let effect_label = decision_effect_label(dominant_effect);
902                        match persist_policy_decisions(
903                            Arc::clone(&self.artifact_store),
904                            step.run_id,
905                            step.step_id,
906                            &events,
907                        )
908                        .await
909                        {
910                            Ok((artifact_meta, summary)) => {
911                                artifact_id = artifact_meta
912                                    .get("sha256")
913                                    .and_then(Value::as_str)
914                                    .map(|s| s.to_string())
915                                    .or_else(|| {
916                                        artifact_meta
917                                            .get("artifact_id")
918                                            .and_then(Value::as_str)
919                                            .map(|s| s.to_string())
920                                    });
921                                policy_artifact = Some((artifact_meta, summary));
922                            }
923                            Err(err) => {
924                                warn!(
925                                    error = %err,
926                                    run = %step.run_id,
927                                    step = %step.step_id,
928                                    "failed to persist policy decisions"
929                                );
930                            }
931                        }
932                        step_span.record("fleetforge_policy_pack", policy_pack.as_str());
933                        step_span.record("fleetforge_policy_effect", effect_label.as_str());
934                        event_policy!(
935                            pack: policy_pack.as_str(),
936                            effect: effect_label.as_str(),
937                            summary: summary_text.as_str(),
938                            artifact: artifact_id.as_deref()
939                        );
940                    }
941
942                    let mut error_payload = json!({
943                        "message": outcome.summary(),
944                        "kind": "policy_denied",
945                    });
946                    if let Some((ref artifact_meta, ref summary)) = policy_artifact {
947                        attach_policy_summary(
948                            &mut error_payload,
949                            summary.clone(),
950                            artifact_meta.clone(),
951                        );
952                    }
953                    self.finish_step(
954                        &step,
955                        &spec,
956                        StepStatus::Failed,
957                        step.attempt,
958                        None,
959                        Some(error_payload),
960                        None,
961                        &budget_ctx,
962                        Some(input_snapshot_value.clone()),
963                        None,
964                        provider.as_deref(),
965                        provider_version.as_deref(),
966                        None,
967                        None,
968                        Vec::new(),
969                        events,
970                        None,
971                        None,
972                        step_slo.as_ref(),
973                    )
974                    .await?;
975                    return Ok(());
976                }
977                DecisionEffect::Redact => {
978                    if let Some(inputs) = outcome.value.get("inputs").cloned() {
979                        spec.inputs = inputs;
980                    }
981                    if let Some(policy_value) = outcome.value.get("policy").cloned() {
982                        spec.policy = policy_value;
983                    }
984                    if let Some(execution_value) = outcome.value.get("execution") {
985                        if let Ok(updated_execution) =
986                            serde_json::from_value(execution_value.clone())
987                        {
988                            spec.execution = updated_execution;
989                        }
990                    }
991                    input_snapshot_value = spec.inputs.clone();
992                    input_snapshot = Some(input_snapshot_value.clone());
993                    let (prov, prov_version) = resolve_step_providers(&spec);
994                    provider = prov;
995                    provider_version = prov_version;
996                }
997                DecisionEffect::Allow => {}
998            }
999        }
1000
1001        self.storage
1002            .mark_step_running(
1003                Uuid::from(step.run_id),
1004                Uuid::from(step.step_id),
1005                &self.config.worker_id,
1006                Some(&input_snapshot_value),
1007                provider.as_deref(),
1008                provider_version.as_deref(),
1009            )
1010            .await
1011            .context("failed to mark step running")?;
1012
1013        self.add_inflight(budget_ctx.tokens_reserved, budget_ctx.cost_reserved)
1014            .await;
1015
1016        let untrusted_inputs = Untrusted(spec.inputs.clone());
1017        let attempt_number = step.attempt + 1;
1018        let capability_token = match self.issue_capability_token(
1019            step.run_id,
1020            step.step_id,
1021            attempt_number,
1022            &spec,
1023            &budget_ctx,
1024        ) {
1025            Ok(token) => {
1026                let token_id = token.token_id().to_string();
1027                step_span.record("trust.capability_token_id", token_id.as_str());
1028                record_attribute_str("trust.capability_token_id", &token_id);
1029                Some(token)
1030            }
1031            Err(err) => {
1032                warn!(
1033                    run = %step.run_id,
1034                    step = %step.step_id,
1035                    error = %err,
1036                    "failed to mint capability token"
1037                );
1038                let mut error_payload = json!({
1039                    "message": "capability token mint failed",
1040                    "kind": "capability_mint_failed",
1041                    "error": err.to_string(),
1042                });
1043                if let Some(obj) = error_payload.as_object_mut() {
1044                    obj.insert(
1045                        "step".into(),
1046                        json!({ "run_id": step.run_id.to_string(), "step_id": step.step_id.to_string() }),
1047                    );
1048                }
1049                self.finish_step(
1050                    &step,
1051                    &spec,
1052                    StepStatus::Failed,
1053                    step.attempt,
1054                    None,
1055                    Some(error_payload),
1056                    None,
1057                    &budget_ctx,
1058                    Some(input_snapshot_value.clone()),
1059                    None,
1060                    provider.as_deref(),
1061                    provider_version.as_deref(),
1062                    Some(&step_trace),
1063                    Some(&run_trace),
1064                    Vec::new(),
1065                    Vec::new(),
1066                    None,
1067                    None,
1068                    step_slo.as_ref(),
1069                )
1070                .await?;
1071                return Ok(());
1072            }
1073        };
1074
1075        if let Some(violation) = self.validate_capability_guard(
1076            capability_token.as_ref(),
1077            &spec,
1078            &budget_ctx,
1079            step.run_id,
1080            step.step_id,
1081        )? {
1082            let summary_text = violation
1083                .record
1084                .reasons
1085                .first()
1086                .cloned()
1087                .unwrap_or_else(|| violation.reason.clone());
1088            step_span.record("fleetforge_policy_pack", "capability_guard");
1089            step_span.record("fleetforge_policy_effect", "deny");
1090
1091            let records = vec![violation.record.clone()];
1092            let (artifact_meta, summary) = persist_policy_decisions(
1093                Arc::clone(&self.artifact_store),
1094                step.run_id,
1095                step.step_id,
1096                &records,
1097            )
1098            .await?;
1099
1100            let artifact_id = artifact_meta
1101                .get("sha256")
1102                .and_then(Value::as_str)
1103                .map(|s| s.to_string())
1104                .or_else(|| {
1105                    artifact_meta
1106                        .get("artifact_id")
1107                        .and_then(Value::as_str)
1108                        .map(|s| s.to_string())
1109                });
1110
1111            event_policy!(
1112                pack: "capability_guard",
1113                effect: "deny",
1114                summary: summary_text.as_str(),
1115                artifact: artifact_id.as_deref()
1116            );
1117
1118            let mut error_payload = json!({
1119                "message": summary_text,
1120                "kind": "capability_denied",
1121            });
1122            if let Some(token_id) = violation.token_id.as_ref() {
1123                if let Some(obj) = error_payload.as_object_mut() {
1124                    obj.insert(
1125                        "capability_token_id".into(),
1126                        Value::String(token_id.clone()),
1127                    );
1128                }
1129            }
1130            if let Some(token) = capability_token.as_ref() {
1131                if let Some(obj) = error_payload.as_object_mut() {
1132                    obj.insert(
1133                        "capability_token".into(),
1134                        to_value(token).expect("capability token is serialisable"),
1135                    );
1136                }
1137            }
1138            attach_policy_summary(&mut error_payload, summary.clone(), artifact_meta.clone());
1139
1140            warn!(
1141                run = %step.run_id,
1142                step = %step.step_id,
1143                reason = %violation.reason,
1144                "capability guard denied step execution"
1145            );
1146
1147            self.finish_step(
1148                &step,
1149                &spec,
1150                StepStatus::Failed,
1151                step.attempt,
1152                None,
1153                Some(error_payload),
1154                None,
1155                &budget_ctx,
1156                Some(input_snapshot_value.clone()),
1157                None,
1158                provider.as_deref(),
1159                provider_version.as_deref(),
1160                Some(&step_trace),
1161                Some(&run_trace),
1162                Vec::new(),
1163                records,
1164                None,
1165                None,
1166                step_slo.as_ref(),
1167            )
1168            .await?;
1169            return Ok(());
1170        }
1171        let idempotency_key = format!("{}:{}:{}", step.run_id, step.step_id, attempt_number);
1172        let ctx = StepCtx {
1173            run_id: step.run_id,
1174            step_id: step.step_id,
1175            attempt: attempt_number,
1176            max_attempts: step.max_attempts,
1177            spec: spec.clone(),
1178            untrusted_inputs,
1179            artifacts: self.artifact_store.clone(),
1180            policy: Arc::clone(&self.policy_pack),
1181            policy_decision_id: Some(policy_decision_id),
1182            guardrails,
1183            trust: spec.trust.clone(),
1184            trust_origin: spec.trust_origin.clone(),
1185            budget: budget_ctx.clone(),
1186            trace: step_trace.clone(),
1187            checkpoint: step.checkpoint.clone(),
1188            idempotency_key,
1189            capability_token: capability_token.clone(),
1190            slo: step_slo.clone(),
1191        };
1192
1193        let start = Instant::now();
1194        let execution = executor.execute(&ctx).await;
1195        let latency = start.elapsed();
1196        step_span.record("fleetforge_duration_ms", latency.as_secs_f64() * 1000.0);
1197        let mut guardrail_events = ctx.guardrails.drain_events();
1198        if !policy_records.is_empty() {
1199            guardrail_events.extend(policy_records.clone());
1200        }
1201        let mut policy_artifact = None;
1202        if !guardrail_events.is_empty() {
1203            let mut policy_pack = policy_pack_name(&spec.policy);
1204            if policy_pack == "baseline" && !policy_records.is_empty() {
1205                policy_pack = pack_id.clone();
1206            }
1207            let dominant_effect = dominant_policy_effect(&guardrail_events);
1208            let summary_text = summarize_policy_events(&guardrail_events);
1209            match persist_policy_decisions(
1210                ctx.artifacts.clone(),
1211                step.run_id,
1212                step.step_id,
1213                &guardrail_events,
1214            )
1215            .await
1216            {
1217                Ok(value) => {
1218                    let (artifact_meta, summary) = value;
1219                    let artifact_id = artifact_meta
1220                        .get("sha256")
1221                        .and_then(Value::as_str)
1222                        .map(|s| s.to_string())
1223                        .or_else(|| {
1224                            artifact_meta
1225                                .get("artifact_id")
1226                                .and_then(Value::as_str)
1227                                .map(|s| s.to_string())
1228                        });
1229                    policy_event_log = Some((
1230                        policy_pack.clone(),
1231                        summary_text.clone(),
1232                        artifact_id.clone(),
1233                        dominant_effect,
1234                    ));
1235                    policy_artifact = Some((artifact_meta, summary));
1236                }
1237                Err(err) => {
1238                    warn!(
1239                        error = %err,
1240                        run = %step.run_id,
1241                        step = %step.step_id,
1242                        "failed to persist policy decisions"
1243                    );
1244                    policy_event_log = Some((
1245                        policy_pack.clone(),
1246                        summary_text.clone(),
1247                        None,
1248                        dominant_effect,
1249                    ));
1250                }
1251            }
1252            if policy_event_log.is_none() {
1253                policy_event_log = Some((
1254                    policy_pack.clone(),
1255                    summary_text.clone(),
1256                    None,
1257                    dominant_effect,
1258                ));
1259            }
1260        }
1261
1262        if let Some((pack, summary, artifact_id, effect)) = &policy_event_log {
1263            let effect_label = decision_effect_label(*effect);
1264            step_span.record("fleetforge_policy_pack", pack.as_str());
1265            step_span.record("fleetforge_policy_effect", effect_label.as_str());
1266            event_policy!(
1267                pack: pack.as_str(),
1268                effect: effect_label.as_str(),
1269                summary: summary.as_str(),
1270                artifact: artifact_id.as_deref()
1271            );
1272            tracing::info!(
1273                target: "fleetforge.policy",
1274                run = %step.run_id,
1275                step = %step.step_id,
1276                pack = %pack,
1277                effect = %effect_label,
1278                summary = %summary,
1279                artifact = %artifact_id.clone().unwrap_or_default(),
1280                "policy evaluation"
1281            );
1282        }
1283
1284        match execution {
1285            Ok(mut result) => {
1286                if result.capability_token.is_none() {
1287                    if let Some(token) = ctx.capability_token.clone() {
1288                        result.capability_token = Some(token);
1289                    }
1290                }
1291                let StepExecutionResult {
1292                    mut output,
1293                    usage,
1294                    provider: result_provider,
1295                    provider_version: result_provider_version,
1296                    tool_events,
1297                    checkpoint,
1298                    capability_token: result_capability_token,
1299                } = result;
1300
1301                let capability_token =
1302                    result_capability_token.or_else(|| ctx.capability_token.clone());
1303
1304                if let Some((artifact_meta, summary)) = policy_artifact {
1305                    attach_policy_summary(&mut output, summary, artifact_meta);
1306                }
1307
1308                if self.config.transparency_writer
1309                    && self.config.transparency_scope.includes_artifacts()
1310                {
1311                    if let Some(artifacts_value) = output.get_mut("artifacts") {
1312                        self.decorate_artifacts(step.run_id, step.step_id, artifacts_value)
1313                            .await?;
1314                    }
1315                }
1316
1317                if let Some(token) = capability_token.as_ref() {
1318                    if let Some(map) = output.as_object_mut() {
1319                        map.insert(
1320                            "capability_token".into(),
1321                            to_value(token).expect("capability token serialisable"),
1322                        );
1323                        map.insert(
1324                            "capability_token_id".into(),
1325                            Value::String(token.token_id().to_string()),
1326                        );
1327                    }
1328                }
1329
1330                if let Some(usage) = usage {
1331                    budget_ctx.tokens_actual = usage.tokens;
1332                    budget_ctx.cost_actual = usage.cost;
1333                    if let Some(tokens) = usage.tokens {
1334                        step_span.record("fleetforge_tokens_total", tokens);
1335                    }
1336                    if let Some(cost) = usage.cost {
1337                        step_span.record("fleetforge_cost_usd", cost);
1338                    }
1339                }
1340                if let Some(p) = result_provider {
1341                    provider = Some(p);
1342                }
1343                if let Some(v) = result_provider_version {
1344                    provider_version = Some(v);
1345                }
1346                step_span.record("fleetforge_status", "succeeded");
1347
1348                let mut checkpoint_value = checkpoint.clone();
1349                if checkpoint_value.is_none() && step.spec.execution.checkpoint {
1350                    checkpoint_value = Some(output.clone());
1351                }
1352                let artifacts_value = output.get("artifacts").cloned();
1353                let output_snapshot_value = output.clone();
1354
1355                self.finish_step(
1356                    &step,
1357                    &spec,
1358                    StepStatus::Succeeded,
1359                    step.attempt + 1,
1360                    Some(output.clone()),
1361                    None,
1362                    Some(latency),
1363                    &budget_ctx,
1364                    input_snapshot.clone(),
1365                    Some(output_snapshot_value),
1366                    provider.as_deref(),
1367                    provider_version.as_deref(),
1368                    Some(&ctx.trace),
1369                    Some(&run_trace),
1370                    tool_events,
1371                    guardrail_events.clone(),
1372                    checkpoint_value,
1373                    artifacts_value,
1374                    ctx.slo.as_ref(),
1375                )
1376                .await
1377            }
1378            Err(err) => {
1379                step_span.record("fleetforge_status", "failed");
1380                let error_message = err.to_string();
1381                step_span.record("error_message", error_message.as_str());
1382                let mut error_payload = json!({
1383                    "message": &error_message,
1384                    "kind": "execution_error"
1385                });
1386                if let Some((artifact_meta, summary)) = policy_artifact {
1387                    attach_policy_summary(&mut error_payload, summary, artifact_meta);
1388                }
1389                if let Some(token) = ctx.capability_token.as_ref() {
1390                    if let Some(map) = error_payload.as_object_mut() {
1391                        map.insert(
1392                            "capability_token".into(),
1393                            to_value(token).expect("capability token serialisable"),
1394                        );
1395                        map.insert(
1396                            "capability_token_id".into(),
1397                            Value::String(token.token_id().to_string()),
1398                        );
1399                    }
1400                }
1401                insert_trace(&mut error_payload, &ctx.trace);
1402
1403                let attempt_number = step.attempt + 1;
1404                if attempt_number < step.max_attempts {
1405                    self.release_inflight(budget_ctx.tokens_reserved, budget_ctx.cost_reserved)
1406                        .await;
1407                    self.schedule_retry(&step, error_payload, &budget_ctx, &ctx.trace, &run_trace)
1408                        .await?;
1409                    return Ok(());
1410                }
1411
1412                self.finish_step(
1413                    &step,
1414                    &spec,
1415                    StepStatus::Failed,
1416                    attempt_number,
1417                    None,
1418                    Some(error_payload),
1419                    Some(latency),
1420                    &budget_ctx,
1421                    input_snapshot,
1422                    None,
1423                    provider.as_deref(),
1424                    provider_version.as_deref(),
1425                    Some(&ctx.trace),
1426                    Some(&run_trace),
1427                    Vec::new(),
1428                    guardrail_events.clone(),
1429                    None,
1430                    None,
1431                    ctx.slo.as_ref(),
1432                )
1433                .await?;
1434
1435                self.schedule_compensation(&step, &run_trace).await
1436            }
1437        }
1438    }
1439
1440    fn rank_steps(&self, steps: &mut [QueuedStep]) {
1441        if steps.len() <= 1 {
1442            return;
1443        }
1444
1445        let now = Utc::now();
1446        let mut scores: HashMap<StepId, SloScore> = HashMap::with_capacity(steps.len());
1447        let mut original_positions: HashMap<StepId, usize> = HashMap::with_capacity(steps.len());
1448
1449        for (idx, step) in steps.iter().enumerate() {
1450            let contract = extract_slo_contract(&step.spec);
1451            let score = compute_slo_score(step, contract.as_ref(), now);
1452            if score.has_contract {
1453                self.metrics.record_slo_slack(Some(score.slack_ms));
1454            }
1455            scores.insert(step.step_id, score);
1456            original_positions.insert(step.step_id, idx);
1457        }
1458
1459        steps.sort_by(|a, b| {
1460            let sa = scores.get(&a.step_id).expect("slo score missing");
1461            let sb = scores.get(&b.step_id).expect("slo score missing");
1462            sa.compare(sb).then_with(|| a.created_at.cmp(&b.created_at))
1463        });
1464
1465        for (idx, step) in steps.iter().enumerate() {
1466            if let Some(score) = scores.get(&step.step_id) {
1467                if score.has_contract && score.slack_ms < 0 {
1468                    if let Some(original) = original_positions.get(&step.step_id) {
1469                        if idx < *original {
1470                            self.metrics.record_slo_preemption();
1471                        }
1472                    }
1473                }
1474            }
1475        }
1476    }
1477
1478    async fn finish_step(
1479        &self,
1480        step: &QueuedStep,
1481        spec_snapshot: &StepSpec,
1482        status: StepStatus,
1483        attempt: i32,
1484        output: Option<Value>,
1485        error: Option<Value>,
1486        latency: Option<Duration>,
1487        budget: &BudgetCtx,
1488        input_snapshot: Option<Value>,
1489        output_snapshot: Option<Value>,
1490        provider: Option<&str>,
1491        provider_version: Option<&str>,
1492        step_trace: Option<&TraceContext>,
1493        run_trace: Option<&TraceContext>,
1494        tool_events: Vec<ToolEventRecord>,
1495        guardrail_events: Vec<PolicyEvaluationRecord>,
1496        checkpoint: Option<Value>,
1497        artifacts: Option<Value>,
1498        slo: Option<&StepSlo>,
1499    ) -> Result<()> {
1500        let mut tx = self.storage.pool().begin().await?;
1501        let run_uuid = Uuid::from(step.run_id);
1502        let step_uuid = Uuid::from(step.step_id);
1503
1504        let run_trace_ctx = if let Some(trace) = run_trace {
1505            trace.clone()
1506        } else {
1507            self.build_run_trace_for_step(step).await?
1508        };
1509        let step_trace_ctx = if let Some(trace) = step_trace {
1510            trace.clone()
1511        } else {
1512            self.build_step_trace_for_step(step, &run_trace_ctx)
1513        };
1514
1515        let artifacts_for_event = artifacts.clone();
1516
1517        self.storage
1518            .update_step_status_tx(
1519                &mut tx,
1520                run_uuid,
1521                step_uuid,
1522                models::StepStatus::from(status),
1523                Some(attempt),
1524                output.clone(),
1525                error.clone(),
1526                input_snapshot.clone(),
1527                output_snapshot.clone(),
1528                checkpoint.clone(),
1529                provider,
1530                provider_version,
1531            )
1532            .await?;
1533
1534        let actual_tokens = budget
1535            .tokens_actual
1536            .unwrap_or(budget.tokens_reserved)
1537            .max(0);
1538        let actual_cost = budget.cost_actual.unwrap_or(budget.cost_reserved).max(0.0);
1539
1540        self.storage
1541            .update_step_usage_tx(
1542                &mut tx,
1543                run_uuid,
1544                step_uuid,
1545                Some(actual_tokens),
1546                Some(actual_cost),
1547            )
1548            .await?;
1549
1550        let tokens_delta = actual_tokens - budget.tokens_reserved;
1551        let cost_delta = actual_cost - budget.cost_reserved;
1552        let mut tokens_total = if budget.tracked {
1553            budget.tokens_used
1554        } else {
1555            actual_tokens
1556        };
1557        let mut cost_total = if budget.tracked {
1558            budget.cost_used
1559        } else {
1560            actual_cost
1561        };
1562
1563        if budget.tracked {
1564            self.storage
1565                .reconcile_budget_usage_tx(&mut tx, run_uuid, tokens_delta, cost_delta)
1566                .await?;
1567            tokens_total += tokens_delta;
1568            cost_total += cost_delta;
1569        }
1570
1571        let latency_ms = latency.map(|d| d.as_secs_f64() * 1000.0);
1572        let spec_snapshot_json = serde_json::to_value(spec_snapshot)
1573            .context("failed to serialise step spec snapshot")?;
1574        let budget_snapshot_json = guardrail_budget_view(budget, slo);
1575        let guardrail_events_json = if guardrail_events.is_empty() {
1576            None
1577        } else {
1578            Some(serde_json::to_value(&guardrail_events)?)
1579        };
1580        let tool_events_json = if tool_events.is_empty() {
1581            None
1582        } else {
1583            Some(tool_events_to_value(&tool_events))
1584        };
1585
1586        self.storage
1587            .insert_step_attempt_tx(
1588                &mut tx,
1589                models::NewStepAttempt {
1590                    run_id: run_uuid,
1591                    step_id: step_uuid,
1592                    attempt,
1593                    status: models::StepStatus::from(status),
1594                    latency_ms,
1595                    spec_snapshot: spec_snapshot_json.clone(),
1596                    input_snapshot: input_snapshot.clone(),
1597                    output_snapshot: output_snapshot.clone(),
1598                    error_snapshot: error.clone(),
1599                    checkpoint: checkpoint.clone(),
1600                    budget_snapshot: Some(budget_snapshot_json.clone()),
1601                    guardrail_events: guardrail_events_json.clone(),
1602                    tool_events: tool_events_json.clone(),
1603                    artifacts: artifacts.clone(),
1604                },
1605            )
1606            .await?;
1607
1608        let mut newly_ready = Vec::new();
1609        let mut newly_skipped = Vec::new();
1610
1611        match status {
1612            StepStatus::Succeeded => {
1613                newly_ready = self
1614                    .storage
1615                    .unlock_successors_tx(&mut tx, run_uuid, step_uuid)
1616                    .await?;
1617            }
1618            StepStatus::Failed => {
1619                newly_skipped = self
1620                    .storage
1621                    .skip_downstream_steps_tx(&mut tx, run_uuid, step_uuid)
1622                    .await?;
1623            }
1624            _ => {}
1625        }
1626
1627        let (previous_run_status, current_run_status) = self
1628            .storage
1629            .refresh_run_status_tx(&mut tx, run_uuid)
1630            .await?;
1631
1632        let mut payload = json!({
1633            "run_id": step.run_id,
1634            "step_id": step.step_id,
1635            "status": status.as_str(),
1636            "output": output,
1637            "error": error,
1638            "attempt": attempt,
1639            "priority": step.priority,
1640            "provider": provider,
1641            "provider_version": provider_version,
1642            "snapshots": {
1643                "input": input_snapshot,
1644                "output": output_snapshot,
1645                "checkpoint": checkpoint,
1646            },
1647            "budget": {
1648                "tracked": budget.tracked,
1649                "reserved": {
1650                    "tokens": budget.tokens_reserved,
1651                    "cost": budget.cost_reserved,
1652                },
1653                "actual": {
1654                    "tokens": actual_tokens,
1655                    "cost": actual_cost,
1656                },
1657                "deltas": {
1658                    "tokens": tokens_delta,
1659                    "cost": cost_delta,
1660                },
1661                "totals": {
1662                    "tokens_used": tokens_total,
1663                    "cost_used": cost_total,
1664                },
1665                "limits": {
1666                    "tokens": budget.token_limit,
1667                    "cost": budget.cost_limit,
1668                    "time_secs": budget.time_limit.map(|d| d.as_secs()),
1669                },
1670                "snapshot": budget_snapshot_json.clone(),
1671            },
1672            "spec_snapshot": spec_snapshot_json.clone(),
1673            "guardrail_events": guardrail_events_json.clone(),
1674            "tool_events": tool_events_json.clone(),
1675            "latency_ms": latency_ms,
1676            "artifacts": artifacts_for_event,
1677        });
1678        insert_trace(&mut payload, &step_trace_ctx);
1679
1680        self.storage
1681            .enqueue_outbox_event(
1682                &mut tx,
1683                NewOutboxEvent {
1684                    run_id: run_uuid,
1685                    step_id: Some(step_uuid),
1686                    kind: format!("step_{}", status.as_str()),
1687                    payload,
1688                },
1689            )
1690            .await?;
1691
1692        for event in tool_events {
1693            let mut payload = event.payload;
1694            insert_trace(&mut payload, &event.trace);
1695            self.storage
1696                .enqueue_outbox_event(
1697                    &mut tx,
1698                    NewOutboxEvent {
1699                        run_id: run_uuid,
1700                        step_id: Some(step_uuid),
1701                        kind: event.kind,
1702                        payload,
1703                    },
1704                )
1705                .await?;
1706        }
1707
1708        self.storage
1709            .insert_cost_ledger_tx(
1710                &mut tx,
1711                run_uuid,
1712                step_uuid,
1713                attempt,
1714                status.as_str(),
1715                budget.tokens_reserved,
1716                budget.cost_reserved,
1717                Some(actual_tokens),
1718                Some(actual_cost),
1719            )
1720            .await?;
1721
1722        for queued_step_id in &newly_ready {
1723            let mut payload = json!({
1724                "run_id": step.run_id,
1725                "step_id": StepId::from(*queued_step_id),
1726                "status": StepStatus::Queued.as_str(),
1727                "reason": "dependencies_satisfied",
1728            });
1729            insert_trace(&mut payload, &run_trace_ctx);
1730
1731            self.storage
1732                .enqueue_outbox_event(
1733                    &mut tx,
1734                    NewOutboxEvent {
1735                        run_id: run_uuid,
1736                        step_id: Some(*queued_step_id),
1737                        kind: "step_queued".to_string(),
1738                        payload,
1739                    },
1740                )
1741                .await?;
1742        }
1743
1744        for skipped_step_id in &newly_skipped {
1745            let mut payload = json!({
1746                "run_id": step.run_id,
1747                "step_id": StepId::from(*skipped_step_id),
1748                "status": StepStatus::Skipped.as_str(),
1749                "reason": "dependency_failed",
1750            });
1751            insert_trace(&mut payload, &run_trace_ctx);
1752
1753            self.storage
1754                .enqueue_outbox_event(
1755                    &mut tx,
1756                    NewOutboxEvent {
1757                        run_id: run_uuid,
1758                        step_id: Some(*skipped_step_id),
1759                        kind: "step_skipped".to_string(),
1760                        payload,
1761                    },
1762                )
1763                .await?;
1764        }
1765
1766        if previous_run_status != current_run_status {
1767            let mut payload = json!({
1768                "run_id": step.run_id,
1769                "previous_status": previous_run_status.as_str(),
1770                "status": current_run_status.as_str(),
1771            });
1772            insert_trace(&mut payload, &run_trace_ctx);
1773
1774            self.storage
1775                .enqueue_outbox_event(
1776                    &mut tx,
1777                    NewOutboxEvent {
1778                        run_id: run_uuid,
1779                        step_id: None,
1780                        kind: format!("run_{}", current_run_status.as_str()),
1781                        payload,
1782                    },
1783                )
1784                .await?;
1785
1786            if matches!(
1787                current_run_status,
1788                models::RunStatus::Succeeded | models::RunStatus::Failed
1789            ) {
1790                if let Err(err) = self.emit_run_bom(step.run_id, current_run_status).await {
1791                    warn!(
1792                        run = %step.run_id,
1793                        error = %err,
1794                        "failed to generate BOM for run"
1795                    );
1796                }
1797            }
1798        }
1799
1800        if actual_tokens > 0 || actual_cost > 0.0 {
1801            self.storage
1802                .record_billing_usage_tx(&mut tx, run_uuid, actual_tokens, actual_cost)
1803                .await?;
1804        }
1805
1806        tx.commit().await?;
1807
1808        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
1809            Ok(value) => value.and_then(|s| Uuid::parse_str(&s).ok()),
1810            Err(err) => {
1811                warn!(error = %err, run = %step.run_id, "failed to fetch tenant for audit");
1812                None
1813            }
1814        };
1815
1816        let audit_resource = json!({
1817            "run_id": step.run_id.to_string(),
1818            "step_id": step.step_id.to_string(),
1819            "status": status.as_str(),
1820            "attempt": attempt,
1821            "worker": self.config.worker_id,
1822        });
1823
1824        if let Err(err) = self
1825            .audit
1826            .append(
1827                &self.config.worker_id,
1828                tenant_uuid,
1829                &format!("step.{}", status.as_str()),
1830                &audit_resource,
1831            )
1832            .await
1833        {
1834            warn!(error = %err, run = %step.run_id, step = %step.step_id, "failed to append audit log for step");
1835        }
1836
1837        if let Some(latency) = latency {
1838            self.metrics.record_latency(latency);
1839        }
1840        self.metrics
1841            .record_budget_consumed(actual_tokens, actual_cost);
1842
1843        if budget.tracked {
1844            let remaining_tokens = budget.token_limit.map(|limit| {
1845                let diff = limit - tokens_total;
1846                if diff < 0 {
1847                    0
1848                } else {
1849                    diff
1850                }
1851            });
1852            let remaining_cost = budget.cost_limit.map(|limit| (limit - cost_total).max(0.0));
1853            self.metrics
1854                .record_budget_remaining(remaining_tokens, remaining_cost);
1855        }
1856
1857        self.release_inflight(budget.tokens_reserved, budget.cost_reserved)
1858            .await;
1859
1860        Ok(())
1861    }
1862
1863    fn tool_events_to_value(events: &[ToolEventRecord]) -> Value {
1864        Value::Array(
1865            events
1866                .iter()
1867                .map(|event| {
1868                    let mut object = Map::new();
1869                    object.insert("kind".to_string(), Value::String(event.kind.clone()));
1870                    object.insert("payload".to_string(), event.payload.clone());
1871                    object.insert("trace".to_string(), event.trace.to_json());
1872                    Value::Object(object)
1873                })
1874                .collect(),
1875        )
1876    }
1877
1878    async fn build_run_trace_for_step(&self, step: &QueuedStep) -> Result<TraceContext> {
1879        let run_uuid = Uuid::from(step.run_id);
1880        let run_span_id = span_id_from_uuid(&run_uuid);
1881        let trace_headers = self.trace_headers_for_run(step.run_id).await?;
1882        let run_scope = RunScope::new(step.run_id.to_string());
1883
1884        let run_trace = if let Some(headers) = trace_headers {
1885            match TraceContext::from_w3c(
1886                run_scope.clone(),
1887                &headers.traceparent,
1888                headers.tracestate.as_deref(),
1889                headers.baggage.as_deref(),
1890            ) {
1891                Ok(ctx) => ctx.with_span_id(run_span_id),
1892                Err(err) => {
1893                    warn!(
1894                        run = %step.run_id,
1895                        error = %err,
1896                        "failed to parse trace headers; using fallback trace id"
1897                    );
1898                    TraceContext::new(run_scope.clone())
1899                        .with_trace_id(trace_id_from_uuid(&run_uuid))
1900                        .with_span_id(run_span_id)
1901                }
1902            }
1903        } else {
1904            TraceContext::new(run_scope.clone())
1905                .with_trace_id(trace_id_from_uuid(&run_uuid))
1906                .with_span_id(run_span_id)
1907        };
1908
1909        Ok(run_trace)
1910    }
1911
1912    fn build_step_trace_for_step(
1913        &self,
1914        step: &QueuedStep,
1915        run_trace: &TraceContext,
1916    ) -> TraceContext {
1917        let step_index = if step.idx >= 0 { step.idx as u32 } else { 0 };
1918        let step_attempt = if step.attempt >= 0 {
1919            (step.attempt + 1) as u32
1920        } else {
1921            0
1922        };
1923
1924        let step_scope = StepScope::new(step.step_id.to_string())
1925            .with_index(step_index)
1926            .with_attempt(step_attempt)
1927            .with_kind(step.spec.r#type.clone())
1928            .with_scheduler(self.config.worker_id.clone());
1929
1930        run_trace.child_step(step_scope)
1931    }
1932
1933    async fn try_pause_for_breakpoint(&self, step: &QueuedStep) -> Result<bool> {
1934        let run_uuid = Uuid::from(step.run_id);
1935        let step_uuid = Uuid::from(step.step_id);
1936
1937        let run = match self.storage.fetch_run(run_uuid).await? {
1938            Some(run) => run,
1939            None => return Ok(false),
1940        };
1941
1942        let mut ctx_map = match run.input_ctx.clone() {
1943            Value::Object(map) => map,
1944            _ => return Ok(false),
1945        };
1946
1947        let debug_value = match ctx_map.get_mut("debug") {
1948            Some(Value::Object(map)) => map,
1949            _ => return Ok(false),
1950        };
1951
1952        let mut breakpoints = load_debug_breakpoints(debug_value);
1953        if breakpoints.is_empty() {
1954            return Ok(false);
1955        }
1956
1957        let index = match breakpoints.iter().position(|bp| bp.step_id == step_uuid) {
1958            Some(idx) => idx,
1959            None => return Ok(false),
1960        };
1961
1962        let record = breakpoints.remove(index);
1963        store_debug_breakpoints(debug_value, &breakpoints);
1964        set_active_pause(debug_value, &record);
1965
1966        let mut tx = self
1967            .storage
1968            .pool()
1969            .begin()
1970            .await
1971            .context("failed to begin pause transaction")?;
1972
1973        self.storage
1974            .update_run_input_ctx_tx(&mut tx, run_uuid, Value::Object(ctx_map.clone()))
1975            .await
1976            .context("failed to persist pause context")?;
1977
1978        self.storage
1979            .set_run_status_tx(&mut tx, run_uuid, models::RunStatus::PausedAtStep)
1980            .await
1981            .context("failed to set run status to paused")?;
1982
1983        self.storage
1984            .revert_step_to_queue_tx(&mut tx, run_uuid, step_uuid)
1985            .await
1986            .context("failed to revert step to queue after pause")?;
1987
1988        self.storage
1989            .enqueue_outbox_event(
1990                &mut tx,
1991                NewOutboxEvent {
1992                    run_id: run_uuid,
1993                    step_id: Some(step_uuid),
1994                    kind: "run_paused".to_string(),
1995                    payload: json!({
1996                        "run_id": run_uuid.to_string(),
1997                        "step_id": step_uuid.to_string(),
1998                        "spec": record.spec,
1999                        "reason": "breakpoint",
2000                    }),
2001                },
2002            )
2003            .await
2004            .context("failed to enqueue pause event")?;
2005
2006        tx.commit()
2007            .await
2008            .context("failed to commit pause transaction")?;
2009
2010        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
2011            Ok(Some(tenant)) => Uuid::parse_str(&tenant).ok(),
2012            _ => None,
2013        };
2014
2015        if let Err(err) = self
2016            .audit
2017            .append(
2018                &self.config.worker_id,
2019                tenant_uuid,
2020                "debug.breakpoints.pause",
2021                &json!({
2022                    "run_id": run_uuid.to_string(),
2023                    "step_id": step_uuid.to_string(),
2024                    "reason": "breakpoint",
2025                }),
2026            )
2027            .await
2028        {
2029            warn!(error = %err, run = %step.run_id, step = %step.step_id, "failed to append audit log for breakpoint pause");
2030        }
2031
2032        info!(run = %step.run_id, step = %step.step_id, "paused run at breakpoint");
2033        Ok(true)
2034    }
2035
2036    async fn reserve_budget(
2037        &self,
2038        run_id: RunId,
2039        request: &BudgetRequest,
2040    ) -> Result<BudgetDecision> {
2041        if let Some(tenant) = self.storage.fetch_run_tenant(Uuid::from(run_id)).await? {
2042            if let Err(err) = self
2043                .storage
2044                .enforce_tenant_quota(&tenant, request.tokens, request.cost)
2045                .await
2046            {
2047                return Ok(BudgetDecision::Denied {
2048                    reason: err.to_string(),
2049                });
2050            }
2051        }
2052
2053        match self
2054            .storage
2055            .try_consume_budget(Uuid::from(run_id), request.tokens, request.cost)
2056            .await?
2057        {
2058            BudgetResult::Granted(budget) => {
2059                let mut ctx = BudgetCtx::default();
2060                ctx.token_limit = budget.tokens_limit;
2061                ctx.tokens_used = budget.tokens_used;
2062                ctx.tokens_reserved = request.tokens;
2063                ctx.cost_limit = budget.cost_limit;
2064                ctx.cost_used = budget.cost_used;
2065                ctx.cost_reserved = request.cost;
2066                ctx.tracked = true;
2067                Ok(BudgetDecision::Granted(ctx))
2068            }
2069            BudgetResult::Denied => Ok(BudgetDecision::Denied {
2070                reason: format!("budget exhausted for run {}", run_id),
2071            }),
2072            BudgetResult::NoBudgetConfigured => {
2073                let mut ctx = BudgetCtx::default();
2074                ctx.tokens_reserved = request.tokens;
2075                ctx.cost_reserved = request.cost;
2076                Ok(BudgetDecision::Unlimited(ctx))
2077            }
2078        }
2079    }
2080
2081    async fn should_backpressure(&self, request: &BudgetRequest) -> bool {
2082        if (request.tokens <= 0 && request.cost <= 0.0)
2083            || (self.config.max_inflight_tokens.is_none()
2084                && self.config.max_inflight_cost.is_none())
2085        {
2086            return false;
2087        }
2088
2089        let inflight = self.inflight.lock().await;
2090        if let Some(limit) = self.config.max_inflight_tokens {
2091            if request.tokens > 0 && inflight.tokens + request.tokens > limit {
2092                return true;
2093            }
2094        }
2095        if let Some(limit) = self.config.max_inflight_cost {
2096            if request.cost > 0.0 && inflight.cost + request.cost > limit {
2097                return true;
2098            }
2099        }
2100        false
2101    }
2102
2103    async fn defer_for_backpressure(&self, step: &QueuedStep) -> Result<()> {
2104        let delay = chrono::Duration::from_std(self.config.backpressure_delay)
2105            .unwrap_or_else(|_| ChronoDuration::milliseconds(50));
2106        let not_before = Utc::now() + delay;
2107        let run_trace = self.build_run_trace_for_step(step).await?;
2108        let mut payload = json!({
2109            "run_id": step.run_id,
2110            "step_id": step.step_id,
2111            "status": "queued",
2112            "reason": "backpressure",
2113            "not_before": not_before.to_rfc3339(),
2114        });
2115        insert_trace(&mut payload, &run_trace);
2116        self.storage
2117            .defer_step_to_queue(
2118                Uuid::from(step.run_id),
2119                Uuid::from(step.step_id),
2120                not_before,
2121                payload,
2122            )
2123            .await
2124    }
2125
2126    async fn add_inflight(&self, tokens: i64, cost: f64) {
2127        if tokens <= 0 && cost <= 0.0 {
2128            return;
2129        }
2130        let mut inflight = self.inflight.lock().await;
2131        inflight.add(tokens, cost);
2132    }
2133
2134    async fn release_inflight(&self, tokens: i64, cost: f64) {
2135        if tokens <= 0 && cost <= 0.0 {
2136            return;
2137        }
2138        let mut inflight = self.inflight.lock().await;
2139        inflight.subtract(tokens, cost);
2140    }
2141
2142    fn compute_retry_delay(&self, step: &QueuedStep) -> ChronoDuration {
2143        let base = step.retry_backoff_ms.max(0);
2144        if base == 0 {
2145            return ChronoDuration::from_std(self.config.active_backoff)
2146                .unwrap_or_else(|_| ChronoDuration::milliseconds(10));
2147        }
2148        let exponent = step.attempt.max(0);
2149        let factor = step.retry_backoff_factor.max(1.0);
2150        let scale = if exponent > 0 {
2151            factor.powi(exponent)
2152        } else {
2153            1.0
2154        };
2155        let delay = (base as f64) * scale;
2156        let clamped = delay.min(i64::MAX as f64);
2157        ChronoDuration::milliseconds(clamped as i64)
2158    }
2159
2160    fn issue_capability_token(
2161        &self,
2162        run_id: RunId,
2163        step_id: StepId,
2164        attempt: i32,
2165        spec: &StepSpec,
2166        budget: &BudgetCtx,
2167    ) -> Option<CapabilityToken> {
2168        let signer = match capability_signer() {
2169            Ok(signer) => signer,
2170            Err(err) => {
2171                warn!(
2172                    run = %run_id,
2173                    step = %step_id,
2174                    error = %err,
2175                    "failed to load capability signer; skipping capability token mint"
2176                );
2177                return None;
2178            }
2179        };
2180
2181        let subject = CapabilityTokenSubject {
2182            run_id: Uuid::from(run_id),
2183            step_id: Some(Uuid::from(step_id)),
2184            attempt: Some(attempt),
2185        };
2186        let scope = capability::build_capability_scope(spec, budget);
2187        let ttl = spec
2188            .execution
2189            .deadline_ms
2190            .map(|ms| {
2191                let clamped = ms.min(i64::MAX as u64) as i64;
2192                ChronoDuration::milliseconds(clamped)
2193            })
2194            .filter(|duration| *duration > ChronoDuration::zero())
2195            .unwrap_or_else(|| ChronoDuration::minutes(15));
2196
2197        match mint_capability_token(
2198            subject,
2199            scope,
2200            Some(vec!["runtime-tool-adapter".into()]),
2201            ttl,
2202            signer.as_ref(),
2203        ) {
2204            Ok(token) => Some(token),
2205            Err(err) => {
2206                warn!(
2207                    run = %run_id,
2208                    step = %step_id,
2209                    error = %err,
2210                    "failed to mint capability token"
2211                );
2212                None
2213            }
2214        }
2215    }
2216
2217    fn validate_capability_guard(
2218        &self,
2219        token: Option<&CapabilityToken>,
2220        spec: &StepSpec,
2221        budget: &BudgetCtx,
2222        run_id: RunId,
2223        step_id: StepId,
2224    ) -> Result<Option<CapabilityViolation>> {
2225        match capability::validate_capability_token(token, spec, budget, Utc::now()) {
2226            Ok(()) => Ok(None),
2227            Err(err) => Ok(Some(capability_violation_record(
2228                run_id,
2229                step_id,
2230                err.reasons,
2231                err.detail,
2232                err.token_id,
2233            ))),
2234        }
2235    }
2236
    /// Schedules another attempt of `step` after a backoff delay.
    ///
    /// In one transaction this records the retry (attempt `step.attempt + 1`, not runnable
    /// before `now + compute_retry_delay`), reconciles the budget when it is tracked,
    /// enqueues a `step_retry_scheduled` outbox event and writes a `retry_scheduled`
    /// cost-ledger row. After the commit it appends a `step.retry` audit entry and records
    /// the retry metric.
    ///
    /// # Errors
    /// Propagates any failure from beginning, populating or committing the transaction.
    /// Tenant lookup and audit failures are logged and do not fail the call.
    async fn schedule_retry(
        &self,
        step: &QueuedStep,
        error_payload: Value,
        budget: &BudgetCtx,
        step_trace: &TraceContext,
        run_trace: &TraceContext,
    ) -> Result<()> {
        let delay = self.compute_retry_delay(step);
        let not_before = Utc::now() + delay;
        let run_uuid = Uuid::from(step.run_id);
        let step_uuid = Uuid::from(step.step_id);
        let attempt = step.attempt + 1;

        let mut tx = self.storage.pool().begin().await?;
        self.storage
            .schedule_retry_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                attempt,
                not_before,
                error_payload,
            )
            .await?;

        if budget.tracked {
            // Applies the negated reservation; presumably this hands the reserved
            // tokens/cost back to the run budget. Verify against the storage layer.
            self.storage
                .reconcile_budget_usage_tx(
                    &mut tx,
                    run_uuid,
                    -budget.tokens_reserved,
                    -budget.cost_reserved,
                )
                .await?;
        }

        let mut payload = json!({
            "run_id": step.run_id,
            "step_id": step.step_id,
            "status": "queued",
            "reason": "retry",
            "attempt": attempt,
            "not_before": not_before.to_rfc3339(),
        });
        // NOTE(review): `insert_trace` writes to the single "trace" key, so the step trace
        // inserted second replaces the run trace inserted first. Confirm whether the run
        // trace was meant to be kept under a separate key.
        insert_trace(&mut payload, run_trace);
        insert_trace(&mut payload, step_trace);

        self.storage
            .enqueue_outbox_event(
                &mut tx,
                NewOutboxEvent {
                    run_id: run_uuid,
                    step_id: Some(step_uuid),
                    kind: "step_retry_scheduled".to_string(),
                    payload,
                },
            )
            .await?;

        self.storage
            .insert_cost_ledger_tx(
                &mut tx,
                run_uuid,
                step_uuid,
                attempt,
                "retry_scheduled",
                budget.tokens_reserved,
                budget.cost_reserved,
                budget.tokens_actual,
                budget.cost_actual,
            )
            .await?;

        tx.commit().await?;

        // Everything below is best effort: the retry is already committed.
        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
            Ok(value) => value.and_then(|s| Uuid::parse_str(&s).ok()),
            Err(err) => {
                warn!(
                    error = %err,
                    run = %step.run_id,
                    step = %step.step_id,
                    "failed to fetch tenant for retry audit"
                );
                None
            }
        };

        let audit_resource = json!({
            "run_id": step.run_id.to_string(),
            "step_id": step.step_id.to_string(),
            "attempt": attempt,
            "not_before": not_before.to_rfc3339(),
        });

        if let Err(err) = self
            .audit
            .append(
                &self.config.worker_id,
                tenant_uuid,
                "step.retry",
                &audit_resource,
            )
            .await
        {
            warn!(
                error = %err,
                run = %step.run_id,
                step = %step.step_id,
                "failed to append audit log for retry"
            );
        }

        self.metrics.record_retry();
        Ok(())
    }
2354
2355    async fn schedule_compensation(
2356        &self,
2357        step: &QueuedStep,
2358        run_trace: &TraceContext,
2359    ) -> Result<()> {
2360        let Some(target) = step.compensation_step else {
2361            return Ok(());
2362        };
2363        if step.compensation_scheduled {
2364            return Ok(());
2365        }
2366
2367        let run_uuid = Uuid::from(step.run_id);
2368        let origin_uuid = Uuid::from(step.step_id);
2369        let target_uuid = Uuid::from(target);
2370
2371        let mut tx = self.storage.pool().begin().await?;
2372        self.storage
2373            .trigger_compensation_tx(&mut tx, run_uuid, origin_uuid, target_uuid)
2374            .await?;
2375
2376        let mut payload = json!({
2377            "run_id": step.run_id,
2378            "step_id": target,
2379            "reason": "compensation",
2380        });
2381        insert_trace(&mut payload, run_trace);
2382
2383        self.storage
2384            .enqueue_outbox_event(
2385                &mut tx,
2386                NewOutboxEvent {
2387                    run_id: run_uuid,
2388                    step_id: Some(target_uuid),
2389                    kind: "step_compensation_queued".to_string(),
2390                    payload,
2391                },
2392            )
2393            .await?;
2394
2395        self.storage
2396            .insert_cost_ledger_tx(
2397                &mut tx,
2398                run_uuid,
2399                origin_uuid,
2400                step.attempt + 1,
2401                "compensation_scheduled",
2402                0,
2403                0.0,
2404                None,
2405                None,
2406            )
2407            .await?;
2408
2409        tx.commit().await?;
2410
2411        let tenant_uuid = match self.storage.fetch_run_tenant(run_uuid).await {
2412            Ok(value) => value.and_then(|s| Uuid::parse_str(&s).ok()),
2413            Err(err) => {
2414                warn!(
2415                    error = %err,
2416                    run = %step.run_id,
2417                    step = %step.step_id,
2418                    "failed to fetch tenant for compensation audit"
2419                );
2420                None
2421            }
2422        };
2423
2424        let audit_resource = json!({
2425            "run_id": step.run_id.to_string(),
2426            "step_id": step.step_id.to_string(),
2427            "compensation_step": target.to_string(),
2428        });
2429
2430        if let Err(err) = self
2431            .audit
2432            .append(
2433                &self.config.worker_id,
2434                tenant_uuid,
2435                "step.compensation",
2436                &audit_resource,
2437            )
2438            .await
2439        {
2440            warn!(
2441                error = %err,
2442                run = %step.run_id,
2443                step = %step.step_id,
2444                "failed to append audit log for compensation step"
2445            );
2446        }
2447
2448        Ok(())
2449    }
2450}
2451
2452fn attach_policy_summary(target: &mut Value, mut summary: Value, artifact_meta: Value) {
2453    let Some(obj) = target.as_object_mut() else {
2454        return;
2455    };
2456
2457    if let Value::Object(ref mut summary_obj) = summary {
2458        summary_obj.insert("artifact".to_string(), artifact_meta.clone());
2459    }
2460
2461    obj.insert("policy_decisions".to_string(), summary);
2462    let entry = obj
2463        .entry("artifacts")
2464        .or_insert_with(|| Value::Object(Map::new()));
2465    if let Value::Object(ref mut map) = entry {
2466        map.insert("policy_decisions".to_string(), artifact_meta);
2467    } else {
2468        let mut map = Map::new();
2469        map.insert("policy_decisions".to_string(), artifact_meta);
2470        *entry = Value::Object(map);
2471    }
2472}
2473
2474fn insert_trace(payload: &mut Value, trace: &TraceContext) {
2475    if let Value::Object(ref mut map) = payload {
2476        map.insert("trace".to_string(), trace.to_json());
2477    }
2478}
2479
2480impl Scheduler {
2481    async fn decorate_artifacts(
2482        &self,
2483        run_id: RunId,
2484        step_id: StepId,
2485        artifacts_value: &mut Value,
2486    ) -> Result<()> {
2487        if !self.config.transparency_writer || !self.config.transparency_scope.includes_artifacts()
2488        {
2489            return Ok(());
2490        }
2491
2492        let Value::Object(map) = artifacts_value else {
2493            return Ok(());
2494        };
2495
2496        for (key, entry) in map.iter_mut() {
2497            self.enqueue_artifact_job(run_id, step_id, key, entry)
2498                .await?;
2499        }
2500
2501        Ok(())
2502    }
2503
2504    async fn enqueue_artifact_job(
2505        &self,
2506        run_id: RunId,
2507        step_id: StepId,
2508        artifact_key: &str,
2509        entry: &mut Value,
2510    ) -> Result<()> {
2511        let Value::Object(obj) = entry else {
2512            return Ok(());
2513        };
2514        let Some(sha256) = obj
2515            .get("sha256")
2516            .and_then(Value::as_str)
2517            .map(|s| s.to_string())
2518        else {
2519            return Ok(());
2520        };
2521        let metadata = obj.get("metadata").cloned().unwrap_or(Value::Null);
2522        let metadata_obj = match metadata.as_object() {
2523            Some(map) => map,
2524            None => return Ok(()),
2525        };
2526        let attestation_ids_value = metadata_obj
2527            .get("attestation_ids")
2528            .and_then(Value::as_array)
2529            .cloned()
2530            .unwrap_or_default();
2531        if attestation_ids_value.is_empty() {
2532            return Ok(());
2533        }
2534        let mut attestation_ids: Vec<Uuid> = Vec::new();
2535        for value in attestation_ids_value {
2536            if let Some(id) = value.as_str() {
2537                if let Ok(uuid) = Uuid::parse_str(id) {
2538                    attestation_ids.push(uuid);
2539                }
2540            }
2541        }
2542        if attestation_ids.is_empty() {
2543            return Ok(());
2544        }
2545
2546        let payload = models::TransparencyJobPayload::ArtifactEntry(models::ArtifactEntryPayload {
2547            run_id: Some(Uuid::from(run_id)),
2548            step_id: Some(Uuid::from(step_id)),
2549            artifact_key: artifact_key.to_string(),
2550            artifact_sha256: sha256.clone(),
2551            attestation_ids: attestation_ids.clone(),
2552            metadata: metadata.clone(),
2553        });
2554
2555        match self.storage.enqueue_transparency_job(payload).await {
2556            Ok(job_id) => {
2557                obj.insert(
2558                    "transparency".to_string(),
2559                    json!({
2560                        "status": "queued",
2561                        "job_id": job_id,
2562                        "artifact_sha256": sha256,
2563                        "attestation_ids": attestation_ids.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
2564                    }),
2565                );
2566            }
2567            Err(err) => {
2568                warn!(
2569                    run = %run_id,
2570                    step = %step_id,
2571                    artifact = artifact_key,
2572                    error = %err,
2573                    "failed to enqueue artifact transparency job"
2574                );
2575            }
2576        }
2577
2578        Ok(())
2579    }
2580}
2581
2582fn policy_pack_name(policy: &Value) -> String {
2583    match policy {
2584        Value::Object(map) => map
2585            .get("pack")
2586            .and_then(Value::as_str)
2587            .map(|value| value.to_string())
2588            .unwrap_or_else(|| "baseline".to_string()),
2589        _ => "baseline".to_string(),
2590    }
2591}
2592
2593fn policy_outcome_to_record(
2594    outcome: &PolicyOutcome,
2595    run_id: RunId,
2596    step_id: StepId,
2597    boundary: TrustBoundary,
2598    policy_id: &str,
2599) -> PolicyEvaluationRecord {
2600    let run_uuid = Uuid::from(run_id);
2601    let step_uuid = Uuid::from(step_id);
2602    let reasons = outcome
2603        .decision
2604        .reason
2605        .as_ref()
2606        .map(|reason| vec![reason.clone()])
2607        .unwrap_or_default();
2608    let decision_trace = PolicyDecisionTrace {
2609        policy: policy_id.to_string(),
2610        effect: outcome.decision.effect,
2611        reason: outcome.decision.reason.clone(),
2612        patches: outcome.decision.patches.clone(),
2613    };
2614
2615    PolicyEvaluationRecord {
2616        boundary,
2617        run_id: Some(run_uuid),
2618        step_id: Some(step_uuid),
2619        effect: outcome.decision.effect,
2620        reasons,
2621        decisions: vec![decision_trace],
2622        patches: outcome.decision.patches.clone(),
2623        payload: None,
2624        timestamp: outcome.attestation.issued_at,
2625        origins: Vec::new(),
2626        preview: None,
2627        attestation_id: Some(outcome.attestation.id),
2628        attestation: serde_json::to_value(&outcome.attestation).ok(),
2629        trust_decision: serde_json::to_value(&outcome.trust).ok(),
2630    }
2631}
2632
2633fn dominant_policy_effect(events: &[PolicyEvaluationRecord]) -> DecisionEffect {
2634    events
2635        .iter()
2636        .map(|event| event.effect)
2637        .max_by_key(effect_priority)
2638        .unwrap_or(DecisionEffect::Allow)
2639}
2640
2641fn effect_priority(effect: DecisionEffect) -> u8 {
2642    match effect {
2643        DecisionEffect::Deny => 4,
2644        DecisionEffect::Redact => 2,
2645        DecisionEffect::Allow => 0,
2646    }
2647}
2648
2649fn decision_effect_label(effect: DecisionEffect) -> String {
2650    match effect {
2651        DecisionEffect::Allow => "allow",
2652        DecisionEffect::Deny => "deny",
2653        DecisionEffect::Redact => "redact",
2654    }
2655    .to_string()
2656}
2657
2658fn trust_boundary_label(boundary: &TrustBoundary) -> String {
2659    serde_json::to_value(boundary)
2660        .ok()
2661        .and_then(|value| value.as_str().map(|s| s.to_string()))
2662        .unwrap_or_else(|| format!("{boundary:?}"))
2663}
2664
2665fn resolve_manifest_profile(
2666    metadata: Option<&Value>,
2667    fallback: ManifestProfile,
2668) -> ManifestProfile {
2669    metadata_manifest_profile(metadata)
2670        .or_else(env_manifest_profile)
2671        .unwrap_or(fallback)
2672}
2673
2674fn metadata_manifest_profile(metadata: Option<&Value>) -> Option<ManifestProfile> {
2675    metadata
2676        .and_then(|value| match value {
2677            Value::Object(map) => map.get("c2pa_profile").and_then(Value::as_str),
2678            _ => None,
2679        })
2680        .and_then(parse_manifest_profile_label)
2681}
2682
2683fn env_manifest_profile() -> Option<ManifestProfile> {
2684    env::var(C2PA_PROFILE_ENV)
2685        .ok()
2686        .and_then(|value| parse_manifest_profile_label(value.trim()))
2687}
2688
2689fn parse_manifest_profile_label(label: &str) -> Option<ManifestProfile> {
2690    match label.to_ascii_lowercase().as_str() {
2691        "basic" => Some(ManifestProfile::Basic),
2692        "policy" | "policy-evidence" => Some(ManifestProfile::PolicyEvidence),
2693        "full" => Some(ManifestProfile::Full),
2694        _ => None,
2695    }
2696}
2697
2698fn summarize_policy_events(events: &[PolicyEvaluationRecord]) -> String {
2699    let mut snippets: Vec<String> = Vec::new();
2700    for event in events {
2701        if !event.reasons.is_empty() {
2702            snippets.extend(event.reasons.iter().cloned());
2703        } else if let Some(preview) = &event.preview {
2704            snippets.push(preview.clone());
2705        }
2706    }
2707
2708    if snippets.is_empty() {
2709        decision_effect_label(dominant_policy_effect(events))
2710    } else {
2711        if snippets.len() > 3 {
2712            snippets.truncate(3);
2713        }
2714        snippets.join("; ")
2715    }
2716}
2717
2718async fn persist_policy_decisions(
2719    artifacts: Arc<dyn ArtifactStore>,
2720    run_id: RunId,
2721    step_id: StepId,
2722    events: &[PolicyEvaluationRecord],
2723) -> Result<(Value, Value)> {
2724    let events_value = serde_json::to_value(events)?;
2725    let payload_vec = serde_json::to_vec(&events_value)?;
2726    let attestation_ids: Vec<Uuid> = events
2727        .iter()
2728        .filter_map(|event| event.attestation_id)
2729        .collect();
2730    let boundaries: Vec<TrustBoundary> = events.iter().map(|e| e.boundary.clone()).collect();
2731    let boundary_labels: Vec<String> = boundaries.iter().map(trust_boundary_label).collect();
2732    let reasons = collect_unique_reasons(events);
2733    let denied = events.iter().any(|e| e.effect == DecisionEffect::Deny);
2734    let redacted = events.iter().any(|e| e.effect == DecisionEffect::Redact);
2735    let preview_strings: Vec<String> = events
2736        .iter()
2737        .filter_map(|e| e.preview.clone())
2738        .take(4)
2739        .collect();
2740    let previews: Vec<Value> = preview_strings
2741        .iter()
2742        .map(|preview| Value::String(preview.clone()))
2743        .collect();
2744
2745    let policy_subject = format!("run:{}:step:{}:policy_decisions", run_id, step_id);
2746    let policy_evidence = PolicyEvidence {
2747        effect: decision_effect_label(dominant_policy_effect(events)),
2748        boundaries: boundary_labels.clone(),
2749        reasons: reasons.clone(),
2750        previews: preview_strings,
2751        decision_attestation_ids: attestation_ids.clone(),
2752    };
2753    let identity = IdentityEvidence::new()
2754        .with_run_id(Uuid::from(run_id))
2755        .with_step_id(Uuid::from(step_id))
2756        .with_subject_label(policy_subject.clone());
2757    let manifest = generate_c2pa_manifest(ManifestInput {
2758        bytes: &payload_vec,
2759        media_type: "application/json",
2760        subject: Some(&policy_subject),
2761        attestation_ids: &attestation_ids,
2762        profile: ManifestProfile::PolicyEvidence,
2763        policy_evidence: Some(&policy_evidence),
2764        identity: Some(&identity),
2765        capability_evidence: None,
2766    })?;
2767    let payload = Bytes::from(payload_vec);
2768    let effects: Vec<DecisionEffect> = events.iter().map(|e| e.effect).collect();
2769    let origins = aggregate_origins(events);
2770
2771    let signer = trust_signer();
2772    let envelope = signer
2773        .sign_json(&events_value)
2774        .context("failed to sign policy decisions payload")?;
2775    let signature = BASE64.encode(&envelope.signature);
2776
2777    let mut metadata = Map::new();
2778    metadata.insert(
2779        "kind".to_string(),
2780        Value::String("policy_decisions".to_string()),
2781    );
2782    metadata.insert("count".to_string(), Value::from(events.len()));
2783    metadata.insert("effects".to_string(), serde_json::to_value(&effects)?);
2784    metadata.insert("boundaries".to_string(), serde_json::to_value(&boundaries)?);
2785    if !reasons.is_empty() {
2786        metadata.insert("reasons".to_string(), serde_json::to_value(&reasons)?);
2787    }
2788    metadata.insert("denied".to_string(), Value::Bool(denied));
2789    metadata.insert("redacted".to_string(), Value::Bool(redacted));
2790    if !origins.is_empty() {
2791        metadata.insert("origins".to_string(), Value::Array(origins.clone()));
2792    }
2793    metadata.insert("c2pa_manifest".to_string(), manifest.clone());
2794    metadata.insert("c2pa_verified".to_string(), Value::Bool(true));
2795    if !attestation_ids.is_empty() {
2796        let ids = attestation_ids
2797            .iter()
2798            .map(|id| Value::String(id.to_string()))
2799            .collect::<Vec<_>>();
2800        metadata.insert("attestation_ids".to_string(), Value::Array(ids));
2801    }
2802    metadata.insert(
2803        "signature".to_string(),
2804        json!({
2805            "algorithm": envelope.algorithm.as_str(),
2806            "key_id": envelope.key_id,
2807            "value": signature,
2808        }),
2809    );
2810
2811    let meta = artifacts
2812        .put(payload, "application/json", Value::Object(metadata))
2813        .await?;
2814
2815    let mut artifact_value = artifact_meta_to_value(&meta);
2816    if let Value::Object(ref mut obj) = artifact_value {
2817        let origin = TrustOrigin::new(TrustBoundary::Artifact)
2818            .with_run_id(Uuid::from(run_id))
2819            .with_step_id(Uuid::from(step_id))
2820            .with_source(TrustSource::System);
2821        obj.insert(
2822            "trust".to_string(),
2823            serde_json::to_value(Trust::derived(origin.clone()))?,
2824        );
2825        obj.insert("trust_origin".to_string(), serde_json::to_value(origin)?);
2826        if !origins.is_empty() {
2827            obj.insert("origins".to_string(), Value::Array(origins.clone()));
2828        }
2829    }
2830
2831    let mut summary = Map::new();
2832    summary.insert("count".to_string(), Value::from(events.len()));
2833    summary.insert("effects".to_string(), serde_json::to_value(&effects)?);
2834    summary.insert("boundaries".to_string(), serde_json::to_value(&boundaries)?);
2835    if !reasons.is_empty() {
2836        summary.insert("reasons".to_string(), serde_json::to_value(&reasons)?);
2837    }
2838    summary.insert("denied".to_string(), Value::Bool(denied));
2839    summary.insert("redacted".to_string(), Value::Bool(redacted));
2840    if !previews.is_empty() {
2841        summary.insert("previews".to_string(), Value::Array(previews));
2842    }
2843    if !origins.is_empty() {
2844        summary.insert("origins".to_string(), Value::Array(origins));
2845    }
2846
2847    Ok((artifact_value, Value::Object(summary)))
2848}
2849
2850fn aggregate_origins(events: &[PolicyEvaluationRecord]) -> Vec<Value> {
2851    let mut aggregated = Vec::new();
2852    for event in events {
2853        for origin in &event.origins {
2854            if !aggregated.iter().any(|existing| existing == origin) {
2855                aggregated.push(origin.clone());
2856            }
2857        }
2858    }
2859    aggregated
2860}
2861
2862fn collect_unique_reasons(events: &[PolicyEvaluationRecord]) -> Vec<String> {
2863    let mut reasons = Vec::new();
2864    for event in events {
2865        for reason in &event.reasons {
2866            push_unique_string(&mut reasons, reason);
2867        }
2868        for decision in &event.decisions {
2869            if let Some(reason) = &decision.reason {
2870                push_unique_string(&mut reasons, reason);
2871            }
2872        }
2873    }
2874    reasons
2875}
2876
2877fn capability_violation_record(
2878    run_id: RunId,
2879    step_id: StepId,
2880    reasons: Vec<String>,
2881    detail: Value,
2882    token_id: Option<String>,
2883) -> CapabilityViolation {
2884    let summary = reasons
2885        .first()
2886        .cloned()
2887        .unwrap_or_else(|| "capability_denied".to_string());
2888    let decision_trace = PolicyDecisionTrace {
2889        policy: "capability_guard".to_string(),
2890        effect: DecisionEffect::Deny,
2891        reason: Some(summary.clone()),
2892        patches: Vec::new(),
2893    };
2894    let record = PolicyEvaluationRecord {
2895        boundary: TrustBoundary::Scheduler,
2896        run_id: Some(Uuid::from(run_id)),
2897        step_id: Some(Uuid::from(step_id)),
2898        effect: DecisionEffect::Deny,
2899        reasons,
2900        decisions: vec![decision_trace],
2901        patches: Vec::new(),
2902        payload: Some(detail),
2903        timestamp: Utc::now(),
2904        origins: Vec::new(),
2905        preview: None,
2906        attestation_id: None,
2907        attestation: None,
2908        trust_decision: None,
2909    };
2910    CapabilityViolation {
2911        record,
2912        reason: summary,
2913        token_id,
2914    }
2915}
2916
2917impl Scheduler {
2918    async fn emit_run_bom(&self, run_id: RunId, status: models::RunStatus) -> Result<()> {
2919        let run_uuid = Uuid::from(run_id);
2920        let Some(run) = self.storage.fetch_run(run_uuid).await? else {
2921            warn!(run = %run_id, "run missing during BOM generation");
2922            return Ok(());
2923        };
2924        if has_run_artifact(&run.input_ctx, "aibom") {
2925            return Ok(());
2926        }
2927        let steps = self.storage.fetch_steps_for_run(run_uuid).await?;
2928        let build = build_aibom(&run, &steps, self.policy_pack.id())
2929            .context("failed to assemble AIBOM payload")?;
2930        let payload_bytes =
2931            serde_json::to_vec(&build.bom).context("failed to serialise AIBOM payload")?;
2932        let attestation_refs = build.attestation_ids.clone();
2933        let manifest_subject = format!("run:{}:aibom", run_id);
2934        let status_label = crate::model::RunStatus::from(status).as_str().to_string();
2935        let attestation_values: Vec<Value> = attestation_refs
2936            .iter()
2937            .map(|id| Value::String(id.to_string()))
2938            .collect();
2939        let mut metadata = json!({
2940            "kind": "aibom",
2941            "run_id": run_id.to_string(),
2942            "status": status_label,
2943            "trust_subjects": build.trust_subjects,
2944            "attestation_ids": attestation_values,
2945            "summary": build.summary,
2946        });
2947        let manifest_identity = IdentityEvidence::new()
2948            .with_run_id(Uuid::from(run_id))
2949            .with_subject_label(manifest_subject.clone());
2950        let manifest_profile = resolve_manifest_profile(Some(&metadata), ManifestProfile::Full);
2951        let manifest = generate_c2pa_manifest(ManifestInput {
2952            bytes: &payload_bytes,
2953            media_type: "application/json",
2954            subject: Some(&manifest_subject),
2955            attestation_ids: &attestation_refs,
2956            profile: manifest_profile,
2957            policy_evidence: None,
2958            identity: Some(&manifest_identity),
2959            capability_evidence: None,
2960        })?;
2961        let signer = trust_signer();
2962        let envelope = signer
2963            .sign(&payload_bytes)
2964            .context("failed to sign AIBOM payload")?;
2965        let signature_b64 = BASE64.encode(&envelope.signature);
2966        if let Some(obj) = metadata.as_object_mut() {
2967            obj.insert("c2pa_manifest".to_string(), manifest);
2968            obj.insert(
2969                "signature".to_string(),
2970                json!({
2971                    "algorithm": envelope.algorithm.as_str(),
2972                    "key_id": envelope.key_id,
2973                    "value": signature_b64,
2974                }),
2975            );
2976        }
2977        let artifact_meta = self
2978            .artifact_store
2979            .put(Bytes::from(payload_bytes), "application/json", metadata)
2980            .await
2981            .context("failed to persist AIBOM artifact")?;
2982        let mut artifact_value = artifact_meta_to_value(&artifact_meta);
2983
2984        if self.config.transparency_writer
2985            && self.config.transparency_scope.includes_runs()
2986            && !attestation_refs.is_empty()
2987        {
2988            let payload = models::TransparencyJobPayload::RunRollup(models::RunRollupPayload {
2989                run_id: run_uuid,
2990                artifact_sha256: hex::encode(artifact_meta.sha256),
2991                attestation_ids: attestation_refs.clone(),
2992                artifact_key: "aibom".to_string(),
2993                metadata: artifact_meta.metadata.clone(),
2994            });
2995            match self.storage.enqueue_transparency_job(payload).await {
2996                Ok(job_id) => {
2997                    if let Value::Object(ref mut obj) = artifact_value {
2998                        obj.insert(
2999                            "transparency".to_string(),
3000                            json!({
3001                                "status": "queued",
3002                                "job_id": job_id,
3003                            }),
3004                        );
3005                    }
3006                }
3007                Err(err) => {
3008                    warn!(
3009                        run = %run_id,
3010                        error = %err,
3011                        "failed to enqueue transparency job for AIBOM"
3012                    );
3013                }
3014            }
3015        }
3016
3017        if let Value::Object(ref mut obj) = artifact_value {
3018            if let Some(Value::Object(ref mut meta_obj)) = obj.get_mut("metadata") {
3019                meta_obj.insert("kind".to_string(), Value::String("aibom".to_string()));
3020            }
3021        }
3022        self.attach_run_artifact(run_uuid, "aibom", artifact_value)
3023            .await
3024    }
3025
3026    async fn attach_run_artifact(&self, run_uuid: Uuid, key: &str, artifact: Value) -> Result<()> {
3027        let Some(run) = self.storage.fetch_run(run_uuid).await? else {
3028            warn!(run = %run_uuid, "run missing while attaching artifact");
3029            return Ok(());
3030        };
3031        let mut ctx_map = match run.input_ctx {
3032            Value::Object(map) => map,
3033            other => {
3034                let mut map = Map::new();
3035                map.insert("inputs".to_string(), other);
3036                map
3037            }
3038        };
3039        let entry = ctx_map
3040            .entry("artifacts".to_string())
3041            .or_insert_with(|| Value::Object(Map::new()));
3042        match entry {
3043            Value::Object(map) => {
3044                map.insert(key.to_string(), artifact);
3045            }
3046            _ => {
3047                let mut map = Map::new();
3048                map.insert(key.to_string(), artifact);
3049                ctx_map.insert("artifacts".to_string(), Value::Object(map));
3050            }
3051        }
3052        self.storage
3053            .update_run_input_ctx(run_uuid, Value::Object(ctx_map))
3054            .await
3055    }
3056}
3057
/// Appends `value` to `list` unless an equal string is already present.
fn push_unique_string(list: &mut Vec<String>, value: &str) {
    let already_present = list.iter().any(|existing| existing.as_str() == value);
    if already_present {
        return;
    }
    list.push(value.to_owned());
}
3063
3064fn has_run_artifact(ctx: &Value, key: &str) -> bool {
3065    ctx.as_object()
3066        .and_then(|map| map.get("artifacts"))
3067        .and_then(Value::as_object)
3068        .map(|artifacts| artifacts.contains_key(key))
3069        .unwrap_or(false)
3070}
3071
3072#[derive(Clone)]
3073struct CapabilityViolation {
3074    record: PolicyEvaluationRecord,
3075    reason: String,
3076    token_id: Option<String>,
3077}
3078
3079#[derive(Debug, Clone, Copy)]
3080struct BudgetRequest {
3081    tokens: i64,
3082    cost: f64,
3083}
3084
3085impl BudgetRequest {
3086    fn from_step(step: &QueuedStep) -> Self {
3087        let budget_policy = step.spec.policy.get("budget");
3088        let tokens = budget_policy
3089            .and_then(|p| p.get("tokens"))
3090            .and_then(Value::as_i64);
3091        let execution_hint = step.spec.execution.cost_hint.as_ref();
3092        let tokens = execution_hint
3093            .and_then(|hint| hint.tokens)
3094            .or(tokens)
3095            .or_else(|| {
3096                step.spec
3097                    .inputs
3098                    .get("token_estimate")
3099                    .and_then(Value::as_i64)
3100            })
3101            .unwrap_or(0)
3102            .max(0);
3103        let cost = budget_policy
3104            .and_then(|p| p.get("cost"))
3105            .and_then(Value::as_f64);
3106        let cost = execution_hint
3107            .and_then(|hint| hint.usd)
3108            .or(cost)
3109            .or_else(|| {
3110                step.spec
3111                    .inputs
3112                    .get("cost_estimate")
3113                    .and_then(Value::as_f64)
3114            })
3115            .unwrap_or(0.0)
3116            .max(0.0);
3117        Self { tokens, cost }
3118    }
3119}
3120
3121enum BudgetDecision {
3122    Granted(BudgetCtx),
3123    Denied { reason: String },
3124    Unlimited(BudgetCtx),
3125}
3126
3127struct QueueMetrics {
3128    queue_depth: Histogram<f64>,
3129    queue_lag: Histogram<f64>,
3130    step_latency: Histogram<f64>,
3131    budget_tokens: Counter<u64>,
3132    budget_cost: Counter<f64>,
3133    budget_tokens_remaining: Histogram<f64>,
3134    budget_cost_remaining: Histogram<f64>,
3135    budget_tokens_projected: Histogram<f64>,
3136    budget_cost_projected: Histogram<f64>,
3137    retries: Counter<u64>,
3138    slo_slack: Histogram<f64>,
3139    slo_preemptions: Counter<u64>,
3140}
3141
3142impl QueueMetrics {
3143    fn new() -> Self {
3144        let meter: Meter = telemetry_otel::meter("fleetforge.runtime");
3145        let queue_depth = meter
3146            .f64_histogram("fleetforge.queue.depth")
3147            .with_description("Sampled queue depth of the step scheduler")
3148            .init();
3149        let queue_lag = meter
3150            .f64_histogram("fleetforge.queue.lag_seconds")
3151            .with_description("Time a step spent waiting in the scheduler queue before execution")
3152            .init();
3153        let step_latency = meter
3154            .f64_histogram("fleetforge.step.latency")
3155            .with_description("End-to-end step execution latency in seconds")
3156            .init();
3157        let budget_tokens = meter
3158            .u64_counter("fleetforge.budget.tokens")
3159            .with_description("Total tokens consumed by steps")
3160            .init();
3161        let budget_cost = meter
3162            .f64_counter("fleetforge.budget.cost")
3163            .with_description("Total cost consumed by steps (USD)")
3164            .init();
3165        let budget_tokens_remaining = meter
3166            .f64_histogram("fleetforge.budget.tokens_remaining")
3167            .with_description("Remaining tokens in the active run budget after each step")
3168            .init();
3169        let budget_cost_remaining = meter
3170            .f64_histogram("fleetforge.budget.cost_remaining")
3171            .with_description("Remaining cost in the active run budget after each step (USD)")
3172            .init();
3173        let budget_tokens_projected = meter
3174            .f64_histogram("fleetforge.budget.tokens_projected_burn")
3175            .with_description(
3176                "Projected token consumption (used + reserved) prior to executing a step",
3177            )
3178            .init();
3179        let budget_cost_projected = meter
3180            .f64_histogram("fleetforge.budget.cost_projected_burn")
3181            .with_description("Projected cost (used + reserved) prior to executing a step (USD)")
3182            .init();
3183        let retries = meter
3184            .u64_counter("fleetforge.step.retries")
3185            .with_description("Number of step retries executed by the scheduler")
3186            .init();
3187        let slo_slack = meter
3188            .f64_histogram("fleetforge.slo.queue_slack_ms")
3189            .with_description("Observed queue slack (target - wait) for steps with SLO contracts, in milliseconds")
3190            .init();
3191        let slo_preemptions = meter
3192            .u64_counter("fleetforge.slo.preemptions")
3193            .with_description("Count of times the scheduler moved an SLO-contracted step ahead of another step due to a breach")
3194            .init();
3195        Self {
3196            queue_depth,
3197            queue_lag,
3198            step_latency,
3199            budget_tokens,
3200            budget_cost,
3201            budget_tokens_remaining,
3202            budget_cost_remaining,
3203            budget_tokens_projected,
3204            budget_cost_projected,
3205            retries,
3206            slo_slack,
3207            slo_preemptions,
3208        }
3209    }
3210
3211    fn record_slo_slack(&self, slack_ms: Option<i64>) {
3212        if let Some(value) = slack_ms {
3213            self.slo_slack.record(value as f64, &[]);
3214        }
3215    }
3216
3217    fn record_slo_preemption(&self) {
3218        self.slo_preemptions.add(1, &[]);
3219    }
3220
3221    fn record_queue_depth(&self, depth: usize) {
3222        self.queue_depth.record(depth as f64, &[]);
3223    }
3224
3225    fn record_queue_lag(&self, lag: Duration) {
3226        self.queue_lag.record(lag.as_secs_f64(), &[]);
3227    }
3228
3229    fn record_latency(&self, duration: Duration) {
3230        self.step_latency.record(duration.as_secs_f64(), &[]);
3231    }
3232
3233    fn record_retry(&self) {
3234        self.retries.add(1, &[]);
3235    }
3236
3237    fn record_budget_projection(&self, budget: &BudgetCtx) {
3238        if budget.tokens_reserved > 0 {
3239            let projected = if budget.tracked {
3240                budget.tokens_used.saturating_add(budget.tokens_reserved)
3241            } else {
3242                budget.tokens_reserved
3243            };
3244            self.budget_tokens_projected.record(projected as f64, &[]);
3245        }
3246        if budget.cost_reserved > 0.0 {
3247            let projected = if budget.tracked {
3248                budget.cost_used + budget.cost_reserved
3249            } else {
3250                budget.cost_reserved
3251            };
3252            self.budget_cost_projected.record(projected, &[]);
3253        }
3254    }
3255
3256    fn record_budget_consumed(&self, tokens: i64, cost: f64) {
3257        if tokens > 0 {
3258            self.budget_tokens.add(tokens as u64, &[]);
3259        }
3260        if cost > 0.0 {
3261            self.budget_cost.add(cost, &[]);
3262        }
3263    }
3264
3265    fn record_budget_remaining(&self, tokens: Option<i64>, cost: Option<f64>) {
3266        if let Some(tokens) = tokens {
3267            if tokens >= 0 {
3268                self.budget_tokens_remaining.record(tokens as f64, &[]);
3269            }
3270        }
3271        if let Some(cost) = cost {
3272            if cost >= 0.0 {
3273                self.budget_cost_remaining.record(cost, &[]);
3274            }
3275        }
3276    }
3277}
3278
3279#[cfg(test)]
3280mod tests {
3281    use super::*;
3282    use crate::guardrails::PolicyDecisionTrace;
3283    use crate::model::{RunId, StepId};
3284    use async_trait::async_trait;
3285    use bytes::Bytes;
3286    use chrono::Utc;
3287    use fleetforge_storage::ArtifactMeta;
3288    use fleetforge_trust::InMemoryAttestationVault;
3289    use serde_json::{json, Value};
3290    use sqlx::Row;
3291    use std::sync::{
3292        atomic::{AtomicBool, Ordering},
3293        Arc,
3294    };
3295    use testcontainers::clients::Cli;
3296    use testcontainers::images::postgres::Postgres;
3297    use tokio::time::{sleep, Duration};
3298    use url::Url;
3299    use uuid::Uuid;
3300
3301    fn docker_available() -> bool {
3302        std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
3303    }
3304
3305    fn test_step_with_policy(
3306        policy: Value,
3307        created_at: chrono::DateTime<Utc>,
3308        priority: i16,
3309    ) -> QueuedStep {
3310        QueuedStep {
3311            run_id: RunId(Uuid::new_v4()),
3312            step_id: StepId(Uuid::new_v4()),
3313            idx: 0,
3314            priority,
3315            status: StepStatus::Queued,
3316            attempt: 0,
3317            created_at,
3318            max_attempts: 1,
3319            retry_backoff_ms: 0,
3320            retry_backoff_factor: 1.0,
3321            not_before: None,
3322            deadline_at: None,
3323            checkpoint: None,
3324            compensation_step: None,
3325            compensation_scheduled: false,
3326            spec: StepSpec {
3327                id: StepId(Uuid::new_v4()),
3328                r#type: StepType::Tool,
3329                inputs: json!({}),
3330                policy,
3331                execution: StepExecution::default(),
3332                slug: None,
3333                trust: None,
3334                trust_origin: None,
3335                llm_inputs: None,
3336            },
3337        }
3338    }
3339
3340    #[test]
3341    fn build_step_slo_includes_queue_observations() {
3342        let now = Utc::now();
3343        let policy = json!({
3344            "slo": {
3345                "tier": "gold",
3346                "queue_target_ms": 1000,
3347                "priority_boost": 3,
3348                "error_budget_ratio": 0.25
3349            }
3350        });
3351        let step = test_step_with_policy(policy, now - ChronoDuration::milliseconds(1500), 5);
3352        let slo = build_step_slo(&step, now).expect("slo metadata expected");
3353
3354        assert_eq!(slo.tier.as_deref(), Some("gold"));
3355        assert_eq!(slo.queue_target_ms, Some(1000));
3356        assert!(slo.observed_queue_ms.unwrap() >= 1500);
3357        assert_eq!(slo.slack_ms, Some(-500));
3358        assert_eq!(slo.priority_boost, 3);
3359        assert_eq!(slo.error_budget_ratio, Some(0.25));
3360    }
3361
3362    #[test]
3363    fn slo_scores_prioritize_breaches_over_healthy_steps() {
3364        let now = Utc::now();
3365        let breached = test_step_with_policy(
3366            json!({ "slo": { "queue_target_ms": 2000 } }),
3367            now - ChronoDuration::milliseconds(4000),
3368            0,
3369        );
3370        let healthy = test_step_with_policy(
3371            json!({ "slo": { "queue_target_ms": 2000 } }),
3372            now - ChronoDuration::milliseconds(100),
3373            0,
3374        );
3375
3376        let breached_contract = extract_slo_contract(&breached.spec);
3377        let healthy_contract = extract_slo_contract(&healthy.spec);
3378
3379        let breached_score = compute_slo_score(&breached, breached_contract.as_ref(), now);
3380        let healthy_score = compute_slo_score(&healthy, healthy_contract.as_ref(), now);
3381
3382        assert!(breached_score.has_contract);
3383        assert!(healthy_score.has_contract);
3384        assert!(breached_score.slack_ms < 0);
3385        assert!(healthy_score.slack_ms > 0);
3386        assert_eq!(breached_score.compare(&healthy_score), Ordering::Less);
3387    }
3388
3389    #[derive(Default, Clone)]
3390    struct StubArtifactStore;
3391
3392    #[async_trait]
3393    impl ArtifactStore for StubArtifactStore {
3394        async fn put(
3395            &self,
3396            bytes: Bytes,
3397            media_type: &str,
3398            metadata: Value,
3399        ) -> Result<ArtifactMeta> {
3400            Ok(ArtifactMeta {
3401                sha256: [0u8; 32],
3402                media_type: media_type.to_string(),
3403                size: bytes.len() as u64,
3404                metadata,
3405            })
3406        }
3407
3408        async fn get(&self, _sha256: [u8; 32]) -> Result<Bytes> {
3409            unimplemented!("get not implemented for stub artifact store");
3410        }
3411
3412        async fn presign_get(&self, _sha256: [u8; 32], _ttl: std::time::Duration) -> Result<Url> {
3413            unimplemented!("presign not implemented for stub artifact store");
3414        }
3415    }
3416
3417    #[tokio::test]
3418    async fn persist_policy_decisions_emits_summary() {
3419        let store = Arc::new(StubArtifactStore::default());
3420        let event = PolicyEvaluationRecord {
3421            boundary: TrustBoundary::IngressPrompt,
3422            run_id: Some(Uuid::nil()),
3423            step_id: Some(Uuid::nil()),
3424            effect: DecisionEffect::Redact,
3425            reasons: vec!["mask".into()],
3426            decisions: vec![PolicyDecisionTrace {
3427                policy: "test".into(),
3428                effect: DecisionEffect::Redact,
3429                reason: Some("mask".into()),
3430                patches: vec![
3431                    json!({"op": "replace", "path": "/inputs/secret", "value": "[masked]"}),
3432                ],
3433            }],
3434            patches: vec![json!({"op": "replace", "path": "/inputs/secret", "value": "[masked]"})],
3435            payload: Some(json!({"inputs": {"secret": "[masked]"}})),
3436            timestamp: Utc::now(),
3437            origins: vec![json!({"boundary": "ingress_prompt"})],
3438            preview: Some("secret".into()),
3439            attestation_id: None,
3440            attestation: None,
3441            trust_decision: None,
3442        };
3443
3444        let (artifact, summary) =
3445            persist_policy_decisions(store, RunId(Uuid::nil()), StepId(Uuid::nil()), &[event])
3446                .await
3447                .expect("persist policy decisions");
3448
3449        assert_eq!(summary["count"], 1);
3450        assert_eq!(summary["redacted"], true);
3451        assert!(summary["origins"].is_array());
3452        assert!(summary["previews"].is_array());
3453        assert_eq!(artifact["metadata"]["kind"], "policy_decisions");
3454        assert!(artifact["metadata"]["signature"].is_object());
3455        assert_eq!(artifact["metadata"]["signature"]["algorithm"], "HS256");
3456        assert!(artifact["metadata"]["signature"]["key_id"].is_string());
3457        assert!(artifact["metadata"]["signature"]["value"]
3458            .as_str()
3459            .is_some());
3460        assert!(artifact["trust_origin"].is_object());
3461    }
3462
3463    #[test]
3464    fn attach_policy_summary_inserts_artifact() {
3465        let mut target = json!({"existing": true});
3466        let summary = json!({"count": 2});
3467        let artifact = json!({"sha256": "abc123", "metadata": {"kind": "policy_decisions"}});
3468        attach_policy_summary(&mut target, summary, artifact.clone());
3469
3470        assert_eq!(target["policy_decisions"]["count"], 2);
3471        assert_eq!(target["policy_decisions"]["artifact"]["sha256"], "abc123");
3472        assert_eq!(
3473            target["artifacts"]["policy_decisions"]["metadata"]["kind"],
3474            "policy_decisions"
3475        );
3476    }
3477
3478    #[derive(Clone)]
3479    struct DenyPolicy;
3480
3481    #[async_trait]
3482    impl PolicyEngine for DenyPolicy {
3483        async fn evaluate(
3484            &self,
3485            _request: &fleetforge_policy::PolicyRequest,
3486        ) -> Result<PolicyDecision> {
3487            Ok(PolicyDecision {
3488                effect: DecisionEffect::Deny,
3489                reason: Some("blocked".to_string()),
3490                patches: Vec::new(),
3491            })
3492        }
3493    }
3494
3495    #[derive(Clone)]
3496    struct RedactPolicy;
3497
3498    #[async_trait]
3499    impl PolicyEngine for RedactPolicy {
3500        async fn evaluate(
3501            &self,
3502            _request: &fleetforge_policy::PolicyRequest,
3503        ) -> Result<PolicyDecision> {
3504            Ok(PolicyDecision {
3505                effect: DecisionEffect::Redact,
3506                reason: Some("mask".to_string()),
3507                patches: vec![json!({"op": "remove", "path": "/secret"})],
3508            })
3509        }
3510    }
3511
3512    struct RecordingExecutor {
3513        executed: Arc<AtomicBool>,
3514    }
3515
3516    #[async_trait]
3517    impl StepExecutor for RecordingExecutor {
3518        fn kind(&self) -> &'static str {
3519            "tool"
3520        }
3521
3522        async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3523            self.executed.store(true, Ordering::SeqCst);
3524            Ok(StepExecutionResult::new(json!({"ok": true})))
3525        }
3526    }
3527
3528    struct UsageExecutor {
3529        executed: Arc<AtomicBool>,
3530    }
3531
3532    #[async_trait]
3533    impl StepExecutor for UsageExecutor {
3534        fn kind(&self) -> &'static str {
3535            "tool"
3536        }
3537
3538        async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3539            self.executed.store(true, Ordering::SeqCst);
3540            Ok(StepExecutionResult::with_usage(
3541                json!({"result": "ok"}),
3542                ResourceUsage {
3543                    tokens: Some(10),
3544                    cost: Some(0.0005),
3545                },
3546            ))
3547        }
3548    }
3549
3550    struct FailingExecutor;
3551
3552    #[async_trait]
3553    impl StepExecutor for FailingExecutor {
3554        fn kind(&self) -> &'static str {
3555            "tool"
3556        }
3557
3558        async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3559            Err(anyhow!("executor failure"))
3560        }
3561    }
3562
3563    struct ConditionalExecutor;
3564
3565    #[async_trait]
3566    impl StepExecutor for ConditionalExecutor {
3567        fn kind(&self) -> &'static str {
3568            "tool"
3569        }
3570
3571        async fn execute(&self, ctx: &StepCtx) -> Result<StepExecutionResult> {
3572            let mode = ctx
3573                .untrusted_inputs
3574                .as_ref()
3575                .get("mode")
3576                .and_then(Value::as_str)
3577                .unwrap_or_default();
3578            match mode {
3579                "primary" => Err(anyhow!("primary failure")),
3580                "compensation" => Ok(StepExecutionResult::with_usage(
3581                    json!({"status": "compensated"}),
3582                    ResourceUsage {
3583                        tokens: Some(5),
3584                        cost: Some(0.25),
3585                    },
3586                )),
3587                _ => Ok(StepExecutionResult::new(json!({"status": mode}))),
3588            }
3589        }
3590    }
3591
3592    async fn setup_storage() -> (Arc<Storage>, testcontainers::Container<'static, Postgres>) {
3593        let docker = Cli::default();
3594        let container = docker.run(Postgres::default());
3595        let port = container.get_host_port_ipv4(5432);
3596        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
3597        sleep(Duration::from_millis(250)).await;
3598
3599        let storage = Storage::connect(StorageConfig {
3600            database_url: db_url,
3601            max_connections: 2,
3602            connect_timeout: Duration::from_secs(10),
3603            object_store: ObjectStoreConfig::InMemory,
3604        })
3605        .await
3606        .expect("connect storage");
3607
3608        let container: testcontainers::Container<'static, Postgres> =
3609            unsafe { std::mem::transmute(container) };
3610        (Arc::new(storage), container)
3611    }
3612
3613    fn seed_step(run_id: Uuid, step_id: Uuid) -> models::NewStep {
3614        models::NewStep {
3615            run_id,
3616            step_id,
3617            idx: 0,
3618            priority: 5,
3619            spec_json: json!({"id": step_id, "type": "tool", "inputs": {}, "policy": {}}),
3620            status: models::StepStatus::Queued,
3621            attempt: 0,
3622            input_snapshot: None,
3623            provider: None,
3624            provider_version: None,
3625            pending_dependencies: 0,
3626            total_dependencies: 0,
3627            estimated_tokens: 50,
3628            estimated_cost: 0.001,
3629            actual_tokens: None,
3630            actual_cost: None,
3631            ..models::NewStep::default()
3632        }
3633    }
3634
3635    #[tokio::test]
3636    async fn deny_decision_blocks_execution() {
3637        if std::fs::metadata("/var/run/docker.sock").is_err()
3638            && std::env::var("DOCKER_HOST").is_err()
3639        {
3640            eprintln!("Skipping scheduler tests because Docker is unavailable");
3641            return;
3642        }
3643
3644        let (storage, container) = setup_storage().await;
3645        let executed = Arc::new(AtomicBool::new(false));
3646        let mut registry = ExecutorRegistry::new();
3647        registry
3648            .register(Arc::new(RecordingExecutor {
3649                executed: Arc::clone(&executed),
3650            }))
3651            .unwrap();
3652        let audit = Arc::new(Audit::new(storage.pool().clone()));
3653        let attestation_vault: Arc<dyn AttestationVault> =
3654            Arc::new(InMemoryAttestationVault::new());
3655
3656        let scheduler = Scheduler::new(
3657            Arc::clone(&storage),
3658            Arc::new(registry),
3659            Arc::new(DenyPolicy),
3660            Arc::clone(&audit),
3661            Arc::clone(&attestation_vault),
3662            SchedulerConfig::default(),
3663        );
3664
3665        let run_id = Uuid::new_v4();
3666        let step_id = Uuid::new_v4();
3667
3668        let _ = storage
3669            .insert_run_with_steps(
3670                models::NewRun {
3671                    run_id,
3672                    status: models::RunStatus::Pending,
3673                    dag_json: json!({"steps": [], "edges": []}),
3674                    input_ctx: json!({}),
3675                    seed: 1,
3676                    idempotency_key: None,
3677                },
3678                &[seed_step(run_id, step_id)],
3679                &[],
3680            )
3681            .await
3682            .unwrap();
3683
3684        let cfg = SchedulerConfig::default();
3685        let leased = storage.claim_queued_steps(1, &cfg.worker_id).await.unwrap();
3686        let queued = leased
3687            .into_iter()
3688            .map(QueuedStep::try_from)
3689            .next()
3690            .unwrap()
3691            .unwrap();
3692
3693        scheduler.execute_step(queued).await.unwrap();
3694
3695        assert!(!executed.load(Ordering::SeqCst));
3696
3697        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
3698        let step = &steps[0];
3699        assert_eq!(step.status, models::StepStatus::Failed);
3700        assert_eq!(step.attempt, 0);
3701        assert_eq!(
3702            step.error_json.as_ref().unwrap()["kind"],
3703            json!("policy_denied")
3704        );
3705
3706        let events = storage.fetch_unpublished_outbox(10).await.unwrap();
3707        let event = events.iter().find(|ev| ev.kind == "step_failed").unwrap();
3708        assert_eq!(event.payload["error"]["kind"], json!("policy_denied"));
3709
3710        drop(container);
3711    }
3712
3713    #[tokio::test]
3714    async fn retry_reschedules_until_max_attempts() {
3715        if !docker_available() {
3716            eprintln!(
3717                "Skipping retry_reschedules_until_max_attempts because Docker is unavailable"
3718            );
3719            return;
3720        }
3721
3722        let (storage, container) = setup_storage().await;
3723        let mut registry = ExecutorRegistry::new();
3724        registry.register(Arc::new(FailingExecutor)).unwrap();
3725        let audit = Arc::new(Audit::new(storage.pool().clone()));
3726        let attestation_vault: Arc<dyn AttestationVault> =
3727            Arc::new(InMemoryAttestationVault::new());
3728        let scheduler = Scheduler::new(
3729            Arc::clone(&storage),
3730            Arc::new(registry),
3731            crate::policy::AllowAllPolicy::shared(),
3732            Arc::clone(&audit),
3733            Arc::clone(&attestation_vault),
3734            SchedulerConfig::default(),
3735        );
3736
3737        let run_id = Uuid::new_v4();
3738        let step_id = Uuid::new_v4();
3739        let step_spec = json!({
3740            "id": step_id,
3741            "type": "tool",
3742            "inputs": {},
3743            "execution": {
3744                "max_attempts": 2,
3745                "retry_backoff_ms": 10
3746            }
3747        });
3748
3749        let spec = json!({
3750            "steps": [step_spec.clone()],
3751            "edges": []
3752        });
3753
3754        storage
3755            .insert_run_with_steps(
3756                models::NewRun {
3757                    run_id,
3758                    status: models::RunStatus::Pending,
3759                    dag_json: spec,
3760                    input_ctx: json!({}),
3761                    seed: 7,
3762                    idempotency_key: None,
3763                },
3764                &[models::NewStep {
3765                    run_id,
3766                    step_id,
3767                    idx: 0,
3768                    priority: 0,
3769                    spec_json: step_spec,
3770                    status: models::StepStatus::Queued,
3771                    attempt: 0,
3772                    input_snapshot: None,
3773                    provider: None,
3774                    provider_version: None,
3775                    pending_dependencies: 0,
3776                    total_dependencies: 0,
3777                    estimated_tokens: 0,
3778                    estimated_cost: 0.0,
3779                    actual_tokens: None,
3780                    actual_cost: None,
3781                    ..models::NewStep::default()
3782                }],
3783                &[],
3784            )
3785            .await
3786            .unwrap();
3787
3788        let cfg = scheduler.config.clone();
3789        let first = storage
3790            .claim_queued_steps(1, &cfg.worker_id)
3791            .await
3792            .unwrap()
3793            .into_iter()
3794            .map(QueuedStep::try_from)
3795            .next()
3796            .unwrap()
3797            .unwrap();
3798
3799        scheduler.execute_step(first).await.unwrap();
3800
3801        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
3802        let queued = steps.iter().find(|step| step.step_id == step_id).unwrap();
3803        assert_eq!(queued.status, models::StepStatus::Queued);
3804        assert_eq!(queued.attempt, 1);
3805        assert!(queued.not_before.is_some());
3806
3807        sleep(Duration::from_millis(20)).await;
3808
3809        let second = storage
3810            .claim_queued_steps(1, &cfg.worker_id)
3811            .await
3812            .unwrap()
3813            .into_iter()
3814            .map(QueuedStep::try_from)
3815            .next()
3816            .unwrap()
3817            .unwrap();
3818
3819        scheduler.execute_step(second).await.unwrap();
3820
3821        let final_steps = storage.fetch_steps_for_run(run_id).await.unwrap();
3822        let failed = final_steps
3823            .iter()
3824            .find(|step| step.step_id == step_id)
3825            .unwrap();
3826        assert_eq!(failed.status, models::StepStatus::Failed);
3827        assert!(failed.compensation_step.is_none());
3828
3829        drop(container);
3830    }
3831
3832    #[tokio::test]
3833    async fn backpressure_defers_steps_exceeding_inflight_cost() {
3834        if !docker_available() {
3835            eprintln!("Skipping backpressure test because Docker is unavailable");
3836            return;
3837        }
3838
3839        let (storage, container) = setup_storage().await;
3840        let executed = Arc::new(AtomicBool::new(false));
3841        let mut registry = ExecutorRegistry::new();
3842        registry
3843            .register(Arc::new(RecordingExecutor {
3844                executed: Arc::clone(&executed),
3845            }))
3846            .unwrap();
3847        let audit = Arc::new(Audit::new(storage.pool().clone()));
3848        let attestation_vault: Arc<dyn AttestationVault> =
3849            Arc::new(InMemoryAttestationVault::new());
3850        let mut config = SchedulerConfig::default();
3851        config.batch_size = 2;
3852        config.max_inflight_cost = Some(1.0);
3853        config.backpressure_delay = Duration::from_millis(20);
3854        let scheduler = Scheduler::new(
3855            Arc::clone(&storage),
3856            Arc::new(registry),
3857            crate::policy::AllowAllPolicy::shared(),
3858            Arc::clone(&audit),
3859            Arc::clone(&attestation_vault),
3860            config,
3861        );
3862
3863        let run_id = Uuid::new_v4();
3864        let first_id = Uuid::new_v4();
3865        let second_id = Uuid::new_v4();
3866
3867        let first_spec = json!({
3868            "id": first_id,
3869            "type": "tool",
3870            "inputs": {},
3871            "execution": { "cost_hint": { "usd": 1.0 } }
3872        });
3873        let second_spec = json!({
3874            "id": second_id,
3875            "type": "tool",
3876            "inputs": {},
3877            "execution": { "cost_hint": { "usd": 0.5 } }
3878        });
3879
3880        storage
3881            .insert_run_with_steps(
3882                models::NewRun {
3883                    run_id,
3884                    status: models::RunStatus::Pending,
3885                    dag_json: json!({ "steps": [first_spec.clone(), second_spec.clone()], "edges": [] }),
3886                    input_ctx: json!({}),
3887                    seed: 99,
3888                    idempotency_key: None,
3889                },
3890                &[
3891                    models::NewStep {
3892                        run_id,
3893                        step_id: first_id,
3894                        idx: 0,
3895                        priority: 0,
3896                        spec_json: first_spec,
3897                        status: models::StepStatus::Queued,
3898                        attempt: 0,
3899                        input_snapshot: None,
3900                        provider: None,
3901                        provider_version: None,
3902                        pending_dependencies: 0,
3903                        total_dependencies: 0,
3904                        estimated_tokens: 0,
3905                        estimated_cost: 1.0,
3906                        actual_tokens: None,
3907                        actual_cost: None,
3908                        ..models::NewStep::default()
3909                    },
3910                    models::NewStep {
3911                        run_id,
3912                        step_id: second_id,
3913                        idx: 1,
3914                        priority: 0,
3915                        spec_json: second_spec,
3916                        status: models::StepStatus::Queued,
3917                        attempt: 0,
3918                        input_snapshot: None,
3919                        provider: None,
3920                        provider_version: None,
3921                        pending_dependencies: 0,
3922                        total_dependencies: 0,
3923                        estimated_tokens: 0,
3924                        estimated_cost: 0.5,
3925                        actual_tokens: None,
3926                        actual_cost: None,
3927                        ..models::NewStep::default()
3928                    },
3929                ],
3930                &[],
3931            )
3932            .await
3933            .unwrap();
3934
3935        let cfg = scheduler.config.clone();
3936        let mut leased: Vec<QueuedStep> = storage
3937            .claim_queued_steps(2, &cfg.worker_id)
3938            .await
3939            .unwrap()
3940            .into_iter()
3941            .map(|step| QueuedStep::try_from(step).unwrap())
3942            .collect();
3943
3944        let first = leased.remove(0);
3945        scheduler.execute_step(first).await.unwrap();
3946        assert!(executed.load(Ordering::SeqCst));
3947
3948        // Simulate inflight usage before processing the second step
3949        scheduler.add_inflight(0, 1.0).await;
3950
3951        let second = leased.remove(0);
3952        scheduler.execute_step(second).await.unwrap();
3953
3954        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
3955        let deferred = steps.iter().find(|step| step.step_id == second_id).unwrap();
3956        assert_eq!(deferred.status, models::StepStatus::Queued);
3957        assert!(deferred.not_before.is_some());
3958        assert_eq!(deferred.attempt, 0);
3959
3960        scheduler.release_inflight(0, 1.0).await;
3961        drop(container);
3962    }
3963
3964    struct CheckpointExecutor;
3965
3966    #[async_trait]
3967    impl StepExecutor for CheckpointExecutor {
3968        fn kind(&self) -> &'static str {
3969            "tool"
3970        }
3971
3972        async fn execute(&self, _ctx: &StepCtx) -> Result<StepExecutionResult> {
3973            Ok(StepExecutionResult::new(json!({"ok": true})).with_checkpoint(json!({"state": 42})))
3974        }
3975    }
3976
3977    #[tokio::test]
3978    async fn checkpoint_is_persisted_from_executor() {
3979        if !docker_available() {
3980            eprintln!("Skipping checkpoint test because Docker is unavailable");
3981            return;
3982        }
3983
3984        let (storage, container) = setup_storage().await;
3985        let mut registry = ExecutorRegistry::new();
3986        registry.register(Arc::new(CheckpointExecutor)).unwrap();
3987        let audit = Arc::new(Audit::new(storage.pool().clone()));
3988        let attestation_vault: Arc<dyn AttestationVault> =
3989            Arc::new(InMemoryAttestationVault::new());
3990        let scheduler = Scheduler::new(
3991            Arc::clone(&storage),
3992            Arc::new(registry),
3993            crate::policy::AllowAllPolicy::shared(),
3994            Arc::clone(&audit),
3995            Arc::clone(&attestation_vault),
3996            SchedulerConfig::default(),
3997        );
3998
3999        let run_id = Uuid::new_v4();
4000        let step_id = Uuid::new_v4();
4001
4002        let step_spec = json!({
4003            "id": step_id,
4004            "type": "tool",
4005            "inputs": {},
4006            "execution": { "checkpoint": true }
4007        });
4008
4009        storage
4010            .insert_run_with_steps(
4011                models::NewRun {
4012                    run_id,
4013                    status: models::RunStatus::Pending,
4014                    dag_json: json!({ "steps": [step_spec.clone()], "edges": [] }),
4015                    input_ctx: json!({}),
4016                    seed: 11,
4017                    idempotency_key: None,
4018                },
4019                &[models::NewStep {
4020                    run_id,
4021                    step_id,
4022                    idx: 0,
4023                    priority: 0,
4024                    spec_json: step_spec,
4025                    status: models::StepStatus::Queued,
4026                    attempt: 0,
4027                    input_snapshot: None,
4028                    provider: None,
4029                    provider_version: None,
4030                    pending_dependencies: 0,
4031                    total_dependencies: 0,
4032                    estimated_tokens: 0,
4033                    estimated_cost: 0.0,
4034                    actual_tokens: None,
4035                    actual_cost: None,
4036                    ..models::NewStep::default()
4037                }],
4038                &[],
4039            )
4040            .await
4041            .unwrap();
4042
4043        let worker = scheduler.config.worker_id.clone();
4044        let queued = storage
4045            .claim_queued_steps(1, &worker)
4046            .await
4047            .unwrap()
4048            .into_iter()
4049            .map(QueuedStep::try_from)
4050            .next()
4051            .unwrap()
4052            .unwrap();
4053
4054        scheduler.execute_step(queued).await.unwrap();
4055
4056        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4057        let step = steps.iter().find(|item| item.step_id == step_id).unwrap();
4058        assert_eq!(step.status, models::StepStatus::Succeeded);
4059        let checkpoint = step.checkpoint.as_ref().expect("checkpoint missing");
4060        assert_eq!(checkpoint, &json!({"state": 42}));
4061
4062        drop(container);
4063    }
4064
4065    #[tokio::test]
4066    async fn compensation_step_queued_on_terminal_failure() {
4067        if !docker_available() {
4068            eprintln!("Skipping compensation test because Docker is unavailable");
4069            return;
4070        }
4071
4072        let (storage, container) = setup_storage().await;
4073        let mut registry = ExecutorRegistry::new();
4074        registry.register(Arc::new(FailingExecutor)).unwrap();
4075        let audit = Arc::new(Audit::new(storage.pool().clone()));
4076        let attestation_vault: Arc<dyn AttestationVault> =
4077            Arc::new(InMemoryAttestationVault::new());
4078        let scheduler = Scheduler::new(
4079            Arc::clone(&storage),
4080            Arc::new(registry),
4081            crate::policy::AllowAllPolicy::shared(),
4082            Arc::clone(&audit),
4083            Arc::clone(&attestation_vault),
4084            SchedulerConfig::default(),
4085        );
4086
4087        let run_id = Uuid::new_v4();
4088        let primary_id = Uuid::new_v4();
4089        let compensation_id = Uuid::new_v4();
4090
4091        let primary_spec = json!({
4092            "id": primary_id,
4093            "type": "tool",
4094            "inputs": {},
4095            "execution": {
4096                "max_attempts": 1,
4097                "compensation": { "step": compensation_id.to_string() }
4098            }
4099        });
4100
4101        let compensation_spec = json!({
4102            "id": compensation_id,
4103            "type": "tool",
4104            "inputs": { "action": "rollback" }
4105        });
4106
4107        storage
4108            .insert_run_with_steps(
4109                models::NewRun {
4110                    run_id,
4111                    status: models::RunStatus::Pending,
4112                    dag_json: json!({ "steps": [primary_spec.clone(), compensation_spec.clone()], "edges": [] }),
4113                    input_ctx: json!({}),
4114                    seed: 13,
4115                    idempotency_key: None,
4116                },
4117                &[
4118                    models::NewStep {
4119                        run_id,
4120                        step_id: primary_id,
4121                        idx: 0,
4122                        priority: 1,
4123                        spec_json: primary_spec,
4124                        status: models::StepStatus::Queued,
4125                        attempt: 0,
4126                        input_snapshot: None,
4127                        provider: None,
4128                        provider_version: None,
4129                        pending_dependencies: 0,
4130                        total_dependencies: 0,
4131                        estimated_tokens: 0,
4132                        estimated_cost: 0.0,
4133                        actual_tokens: None,
4134                        actual_cost: None,
4135                        ..models::NewStep::default()
4136                    },
4137                    models::NewStep {
4138                        run_id,
4139                        step_id: compensation_id,
4140                        idx: 1,
4141                        priority: 0,
4142                        spec_json: compensation_spec,
4143                        status: models::StepStatus::Blocked,
4144                        attempt: 0,
4145                        input_snapshot: None,
4146                        provider: None,
4147                        provider_version: None,
4148                        pending_dependencies: 1,
4149                        total_dependencies: 1,
4150                        estimated_tokens: 0,
4151                        estimated_cost: 0.0,
4152                        actual_tokens: None,
4153                        actual_cost: None,
4154                        ..models::NewStep::default()
4155                    },
4156                ],
4157                &[],
4158            )
4159            .await
4160            .unwrap();
4161
4162        let worker = scheduler.config.worker_id.clone();
4163        let queued = storage
4164            .claim_queued_steps(1, &worker)
4165            .await
4166            .unwrap()
4167            .into_iter()
4168            .map(QueuedStep::try_from)
4169            .next()
4170            .unwrap()
4171            .unwrap();
4172
4173        scheduler.execute_step(queued).await.unwrap();
4174
4175        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4176        let primary = steps
4177            .iter()
4178            .find(|step| step.step_id == primary_id)
4179            .unwrap();
4180        assert_eq!(primary.status, models::StepStatus::Failed);
4181        assert!(primary.compensation_scheduled);
4182
4183        let compensation = steps
4184            .iter()
4185            .find(|step| step.step_id == compensation_id)
4186            .unwrap();
4187        assert_eq!(compensation.status, models::StepStatus::Queued);
4188        assert_eq!(compensation.pending_dependencies, 0);
4189
4190        drop(container);
4191    }
4192
4193    #[tokio::test]
4194    async fn compensation_step_executes_and_records_ledger() {
4195        if !docker_available() {
4196            eprintln!("Skipping compensation ledger test because Docker is unavailable");
4197            return;
4198        }
4199
4200        let (storage, container) = setup_storage().await;
4201        let mut registry = ExecutorRegistry::new();
4202        registry.register(Arc::new(ConditionalExecutor)).unwrap();
4203        let audit = Arc::new(Audit::new(storage.pool().clone()));
4204        let attestation_vault: Arc<dyn AttestationVault> =
4205            Arc::new(InMemoryAttestationVault::new());
4206        let scheduler = Scheduler::new(
4207            Arc::clone(&storage),
4208            Arc::new(registry),
4209            crate::policy::AllowAllPolicy::shared(),
4210            Arc::clone(&audit),
4211            Arc::clone(&attestation_vault),
4212            SchedulerConfig::default(),
4213        );
4214
4215        let run_id = Uuid::new_v4();
4216        let primary_id = Uuid::new_v4();
4217        let compensation_id = Uuid::new_v4();
4218
4219        let primary_spec = json!({
4220            "id": primary_id,
4221            "type": "tool",
4222            "inputs": { "mode": "primary" },
4223            "execution": {
4224                "max_attempts": 1,
4225                "compensation": { "step": compensation_id.to_string() },
4226                "cost_hint": { "tokens": 50, "usd": 1.5 }
4227            }
4228        });
4229        let compensation_spec = json!({
4230            "id": compensation_id,
4231            "type": "tool",
4232            "inputs": { "mode": "compensation" },
4233            "execution": { "cost_hint": { "tokens": 10, "usd": 0.5 } }
4234        });
4235
4236        storage
4237            .insert_run_with_steps(
4238                models::NewRun {
4239                    run_id,
4240                    status: models::RunStatus::Pending,
4241                    dag_json: json!({
4242                        "steps": [primary_spec.clone(), compensation_spec.clone()],
4243                        "edges": []
4244                    }),
4245                    input_ctx: json!({}),
4246                    seed: 17,
4247                    idempotency_key: None,
4248                },
4249                &[
4250                    models::NewStep {
4251                        run_id,
4252                        step_id: primary_id,
4253                        idx: 0,
4254                        priority: 1,
4255                        spec_json: primary_spec,
4256                        status: models::StepStatus::Queued,
4257                        attempt: 0,
4258                        input_snapshot: None,
4259                        provider: None,
4260                        provider_version: None,
4261                        pending_dependencies: 0,
4262                        total_dependencies: 0,
4263                        estimated_tokens: 50,
4264                        estimated_cost: 1.5,
4265                        actual_tokens: None,
4266                        actual_cost: None,
4267                        ..models::NewStep::default()
4268                    },
4269                    models::NewStep {
4270                        run_id,
4271                        step_id: compensation_id,
4272                        idx: 1,
4273                        priority: 0,
4274                        spec_json: compensation_spec,
4275                        status: models::StepStatus::Blocked,
4276                        attempt: 0,
4277                        input_snapshot: None,
4278                        provider: None,
4279                        provider_version: None,
4280                        pending_dependencies: 1,
4281                        total_dependencies: 1,
4282                        estimated_tokens: 10,
4283                        estimated_cost: 0.5,
4284                        actual_tokens: None,
4285                        actual_cost: None,
4286                        ..models::NewStep::default()
4287                    },
4288                ],
4289                &[],
4290            )
4291            .await
4292            .unwrap();
4293
4294        let worker = scheduler.config.worker_id.clone();
4295        let queued = storage
4296            .claim_queued_steps(1, &worker)
4297            .await
4298            .unwrap()
4299            .into_iter()
4300            .map(QueuedStep::try_from)
4301            .next()
4302            .unwrap()
4303            .unwrap();
4304
4305        scheduler.execute_step(queued).await.unwrap();
4306
4307        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4308        let primary = steps
4309            .iter()
4310            .find(|step| step.step_id == primary_id)
4311            .unwrap();
4312        assert_eq!(primary.status, models::StepStatus::Failed);
4313        assert!(primary.compensation_scheduled);
4314
4315        let compensation = storage
4316            .claim_queued_steps(1, &worker)
4317            .await
4318            .unwrap()
4319            .into_iter()
4320            .map(QueuedStep::try_from)
4321            .next()
4322            .unwrap()
4323            .unwrap();
4324
4325        scheduler.execute_step(compensation).await.unwrap();
4326
4327        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4328        let primary = steps
4329            .iter()
4330            .find(|step| step.step_id == primary_id)
4331            .unwrap();
4332        assert_eq!(primary.status, models::StepStatus::Failed);
4333        assert!(primary.compensation_scheduled);
4334        let compensation = steps
4335            .iter()
4336            .find(|step| step.step_id == compensation_id)
4337            .unwrap();
4338        assert_eq!(compensation.status, models::StepStatus::Succeeded);
4339        assert_eq!(compensation.attempt, 1);
4340
4341        let ledger_rows = sqlx::query(
4342            "select step_id, status, reserved_tokens, reserved_cost, actual_tokens, actual_cost \
4343             from step_cost_ledger where run_id = $1 order by recorded_at",
4344        )
4345        .bind(run_id)
4346        .fetch_all(storage.pool())
4347        .await
4348        .unwrap();
4349
4350        let mut saw_failed = false;
4351        let mut saw_comp_scheduled = false;
4352        let mut saw_comp_success = false;
4353
4354        for row in ledger_rows {
4355            let step_id: Uuid = row.get("step_id");
4356            let status: String = row.get("status");
4357            if step_id == primary_id && status == "failed" {
4358                saw_failed = true;
4359                assert_eq!(row.get::<i64, _>("reserved_tokens"), 50);
4360                assert_eq!(row.get::<f64, _>("reserved_cost"), 1.5);
4361            } else if step_id == primary_id && status == "compensation_scheduled" {
4362                saw_comp_scheduled = true;
4363            } else if step_id == compensation_id && status == "succeeded" {
4364                saw_comp_success = true;
4365                assert_eq!(row.get::<i64, _>("reserved_tokens"), 10);
4366                assert_eq!(row.get::<f64, _>("reserved_cost"), 0.5);
4367                assert_eq!(row.get::<Option<i64>, _>("actual_tokens"), Some(5));
4368                assert_eq!(row.get::<Option<f64>, _>("actual_cost"), Some(0.25));
4369            }
4370        }
4371
4372        assert!(saw_failed, "missing failed ledger entry for primary step");
4373        assert!(
4374            saw_comp_scheduled,
4375            "missing compensation_scheduled ledger entry for primary step"
4376        );
4377        assert!(
4378            saw_comp_success,
4379            "missing succeeded ledger entry for compensation step"
4380        );
4381
4382        drop(container);
4383    }
4384
4385    #[tokio::test]
4386    async fn failing_step_skips_dependents() {
4387        if std::fs::metadata("/var/run/docker.sock").is_err()
4388            && std::env::var("DOCKER_HOST").is_err()
4389        {
4390            eprintln!("Skipping scheduler tests because Docker is unavailable");
4391            return;
4392        }
4393
4394        let (storage, container) = setup_storage().await;
4395        let mut registry = ExecutorRegistry::new();
4396        registry.register(Arc::new(FailingExecutor)).unwrap();
4397        let audit = Arc::new(Audit::new(storage.pool().clone()));
4398        let attestation_vault: Arc<dyn AttestationVault> =
4399            Arc::new(InMemoryAttestationVault::new());
4400
4401        let scheduler = Scheduler::new(
4402            Arc::clone(&storage),
4403            Arc::new(registry),
4404            crate::policy::AllowAllPolicy::shared(),
4405            Arc::clone(&audit),
4406            Arc::clone(&attestation_vault),
4407            SchedulerConfig::default(),
4408        );
4409
4410        let run_id = Uuid::new_v4();
4411        let root_id = Uuid::new_v4();
4412        let child_id = Uuid::new_v4();
4413
4414        let root_step = models::NewStep {
4415            run_id,
4416            step_id: root_id,
4417            idx: 0,
4418            priority: 0,
4419            spec_json: json!({"id": root_id, "type": "tool", "inputs": {}, "policy": {}}),
4420            status: models::StepStatus::Queued,
4421            attempt: 0,
4422            input_snapshot: None,
4423            provider: None,
4424            provider_version: None,
4425            pending_dependencies: 0,
4426            total_dependencies: 0,
4427            estimated_tokens: 0,
4428            estimated_cost: 0.0,
4429            actual_tokens: None,
4430            actual_cost: None,
4431            ..models::NewStep::default()
4432        };
4433        let child_step = models::NewStep {
4434            run_id,
4435            step_id: child_id,
4436            idx: 1,
4437            priority: 0,
4438            spec_json: json!({"id": child_id, "type": "tool", "inputs": {}, "policy": {}}),
4439            status: models::StepStatus::Blocked,
4440            attempt: 0,
4441            input_snapshot: None,
4442            provider: None,
4443            provider_version: None,
4444            pending_dependencies: 1,
4445            total_dependencies: 1,
4446            estimated_tokens: 0,
4447            estimated_cost: 0.0,
4448            actual_tokens: None,
4449            actual_cost: None,
4450            ..models::NewStep::default()
4451        };
4452
4453        let _ = storage
4454            .insert_run_with_steps(
4455                models::NewRun {
4456                    run_id,
4457                    status: models::RunStatus::Pending,
4458                    dag_json: json!({
4459                        "steps": [root_step.spec_json.clone(), child_step.spec_json.clone()],
4460                        "edges": [[root_id, child_id]]
4461                    }),
4462                    input_ctx: json!({}),
4463                    seed: 1,
4464                    idempotency_key: None,
4465                },
4466                &[root_step, child_step],
4467                &[(root_id, child_id)],
4468            )
4469            .await
4470            .unwrap();
4471
4472        let cfg = SchedulerConfig::default();
4473        let leased = storage.claim_queued_steps(1, &cfg.worker_id).await.unwrap();
4474        let queued = leased
4475            .into_iter()
4476            .map(QueuedStep::try_from)
4477            .next()
4478            .unwrap()
4479            .unwrap();
4480
4481        scheduler.execute_step(queued).await.unwrap();
4482
4483        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4484        let root = steps.iter().find(|step| step.step_id == root_id).unwrap();
4485        let child = steps.iter().find(|step| step.step_id == child_id).unwrap();
4486
4487        assert_eq!(root.status, models::StepStatus::Failed);
4488        assert_eq!(child.status, models::StepStatus::Skipped);
4489
4490        let events = storage.fetch_unpublished_outbox(10).await.unwrap();
4491        assert!(events.iter().any(|ev| ev.kind == "step_failed"));
4492        assert!(events.iter().any(|ev| ev.kind == "step_skipped"));
4493
4494        drop(container);
4495    }
4496
4497    #[tokio::test]
4498    async fn redact_decision_logs_and_allows() {
4499        if std::fs::metadata("/var/run/docker.sock").is_err()
4500            && std::env::var("DOCKER_HOST").is_err()
4501        {
4502            eprintln!("Skipping scheduler tests because Docker is unavailable");
4503            return;
4504        }
4505
4506        let (storage, container) = setup_storage().await;
4507        let executed = Arc::new(AtomicBool::new(false));
4508        let mut registry = ExecutorRegistry::new();
4509        registry
4510            .register(Arc::new(UsageExecutor {
4511                executed: Arc::clone(&executed),
4512            }))
4513            .unwrap();
4514        let audit = Arc::new(Audit::new(storage.pool().clone()));
4515        let attestation_vault: Arc<dyn AttestationVault> =
4516            Arc::new(InMemoryAttestationVault::new());
4517
4518        let scheduler = Scheduler::new(
4519            Arc::clone(&storage),
4520            Arc::new(registry),
4521            Arc::new(RedactPolicy),
4522            Arc::clone(&audit),
4523            Arc::clone(&attestation_vault),
4524            SchedulerConfig::default(),
4525        );
4526
4527        let run_id = Uuid::new_v4();
4528        let step_id = Uuid::new_v4();
4529
4530        let _ = storage
4531            .insert_run_with_steps(
4532                models::NewRun {
4533                    run_id,
4534                    status: models::RunStatus::Pending,
4535                    dag_json: json!({"steps": [], "edges": []}),
4536                    input_ctx: json!({}),
4537                    seed: 1,
4538                    idempotency_key: None,
4539                },
4540                &[seed_step(run_id, step_id)],
4541                &[],
4542            )
4543            .await
4544            .unwrap();
4545
4546        let cfg = SchedulerConfig::default();
4547        let leased = storage.claim_queued_steps(1, &cfg.worker_id).await.unwrap();
4548        let queued = leased
4549            .into_iter()
4550            .map(QueuedStep::try_from)
4551            .next()
4552            .unwrap()
4553            .unwrap();
4554
4555        scheduler.execute_step(queued).await.unwrap();
4556
4557        assert!(executed.load(Ordering::SeqCst));
4558
4559        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4560        let step = &steps[0];
4561        assert_eq!(step.status, models::StepStatus::Succeeded);
4562        assert_eq!(step.attempt, 1);
4563        assert_eq!(step.actual_tokens, Some(10));
4564
4565        let events = storage.fetch_unpublished_outbox(10).await.unwrap();
4566        let event = events
4567            .iter()
4568            .find(|ev| ev.kind == "step_succeeded")
4569            .unwrap();
4570        assert_eq!(event.payload["budget"]["actual"]["tokens"], json!(10));
4571
4572        drop(container);
4573    }
4574
4575    #[tokio::test]
4576    async fn reserve_budget_granted_denied_unlimited() {
4577        if std::fs::metadata("/var/run/docker.sock").is_err()
4578            && std::env::var("DOCKER_HOST").is_err()
4579        {
4580            eprintln!("Skipping scheduler tests because Docker is unavailable");
4581            return;
4582        }
4583
4584        let (storage, container) = setup_storage().await;
4585        let registry = ExecutorRegistry::new();
4586        let audit = Arc::new(Audit::new(storage.pool().clone()));
4587        let attestation_vault: Arc<dyn AttestationVault> =
4588            Arc::new(InMemoryAttestationVault::new());
4589        let scheduler = Scheduler::new(
4590            Arc::clone(&storage),
4591            Arc::new(registry),
4592            Arc::new(RedactPolicy),
4593            Arc::clone(&audit),
4594            Arc::clone(&attestation_vault),
4595            SchedulerConfig::default(),
4596        );
4597
4598        let run_id = Uuid::new_v4();
4599        // insert budget row with small limits
4600        sqlx::query(
4601            "insert into budgets (budget_id, run_id, tokens_limit, tokens_used, cost_limit, cost_used) values (gen_random_uuid(), $1, 100, 0, 10.0, 0.0)"
4602        )
4603        .bind(run_id)
4604        .execute(&storage.pool())
4605        .await
4606        .unwrap();
4607
4608        let granted = scheduler
4609            .reserve_budget(
4610                RunId(run_id),
4611                &BudgetRequest {
4612                    tokens: 10,
4613                    cost: 1.0,
4614                },
4615            )
4616            .await
4617            .unwrap();
4618        match granted {
4619            BudgetDecision::Granted(ctx) => {
4620                assert_eq!(ctx.tokens_reserved, 10);
4621                assert_eq!(ctx.cost_reserved, 1.0);
4622                assert!(ctx.tracked);
4623            }
4624            _ => panic!("expected granted"),
4625        }
4626
4627        let denied = scheduler
4628            .reserve_budget(
4629                RunId(run_id),
4630                &BudgetRequest {
4631                    tokens: 200,
4632                    cost: 20.0,
4633                },
4634            )
4635            .await
4636            .unwrap();
4637        match denied {
4638            BudgetDecision::Denied { reason } => {
4639                assert!(reason.contains("budget exhausted"));
4640            }
4641            _ => panic!("expected denied"),
4642        }
4643
4644        let unlimited = scheduler
4645            .reserve_budget(
4646                RunId(Uuid::new_v4()),
4647                &BudgetRequest {
4648                    tokens: 0,
4649                    cost: 0.0,
4650                },
4651            )
4652            .await
4653            .unwrap();
4654        match unlimited {
4655            BudgetDecision::Unlimited(ctx) => {
4656                assert_eq!(ctx.tokens_reserved, 0);
4657                assert!(!ctx.tracked);
4658            }
4659            _ => panic!("expected unlimited"),
4660        }
4661
4662        drop(container);
4663    }
4664
4665    #[tokio::test]
4666    async fn zero_request_uses_existing_budget() {
4667        if std::fs::metadata("/var/run/docker.sock").is_err()
4668            && std::env::var("DOCKER_HOST").is_err()
4669        {
4670            eprintln!("Skipping scheduler tests because Docker is unavailable");
4671            return;
4672        }
4673
4674        let (storage, container) = setup_storage().await;
4675        let audit = Arc::new(Audit::new(storage.pool().clone()));
4676        let attestation_vault: Arc<dyn AttestationVault> =
4677            Arc::new(InMemoryAttestationVault::new());
4678        let scheduler = Scheduler::new(
4679            Arc::clone(&storage),
4680            Arc::new(ExecutorRegistry::new()),
4681            Arc::new(RedactPolicy),
4682            Arc::clone(&audit),
4683            Arc::clone(&attestation_vault),
4684            SchedulerConfig::default(),
4685        );
4686
4687        let run_id = Uuid::new_v4();
4688        sqlx::query(
4689            "insert into budgets (budget_id, run_id, tokens_limit, tokens_used, cost_limit, cost_used) values (gen_random_uuid(), $1, 50, 20, 5.0, 2.5)"
4690        )
4691        .bind(run_id)
4692        .execute(&storage.pool())
4693        .await
4694        .unwrap();
4695
4696        let decision = scheduler
4697            .reserve_budget(
4698                RunId(run_id),
4699                &BudgetRequest {
4700                    tokens: 0,
4701                    cost: 0.0,
4702                },
4703            )
4704            .await
4705            .unwrap();
4706
4707        match decision {
4708            BudgetDecision::Granted(ctx) => {
4709                assert_eq!(ctx.tokens_used, 20);
4710                assert_eq!(ctx.token_limit, Some(50));
4711                assert_eq!(ctx.cost_used, 2.5);
4712                assert_eq!(ctx.cost_limit, Some(5.0));
4713            }
4714            other => panic!("expected granted but got {:?}", other),
4715        }
4716
4717        drop(container);
4718    }
4719
4720    #[cfg(feature = "insecure-shell")]
4721    #[tokio::test]
4722    async fn end_to_end_shell_execution() {
4723        if std::fs::metadata("/var/run/docker.sock").is_err()
4724            && std::env::var("DOCKER_HOST").is_err()
4725        {
4726            eprintln!("Skipping scheduler tests because Docker is unavailable");
4727            return;
4728        }
4729
4730        std::env::set_var("FLEETFORGE_ALLOWED_TOOLS", "/bin/echo");
4731        std::env::set_var("FLEETFORGE_ALLOW_SHELL", "1");
4732
4733        let docker = Cli::default();
4734        let container = docker.run(Postgres::default());
4735        let port = container.get_host_port_ipv4(5432);
4736        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
4737        sleep(Duration::from_millis(250)).await;
4738
4739        let storage = Arc::new(
4740            Storage::connect(StorageConfig {
4741                database_url,
4742                max_connections: 2,
4743                connect_timeout: Duration::from_secs(10),
4744                object_store: ObjectStoreConfig::InMemory,
4745            })
4746            .await
4747            .expect("connect storage"),
4748        );
4749
4750        let mut registry = ExecutorRegistry::new();
4751        registry
4752            .register(Arc::new(
4753                ShellExecutor::new().expect("shell executor should be enabled for test"),
4754            ))
4755            .expect("register shell executor");
4756        let audit = Arc::new(Audit::new(storage.pool().clone()));
4757        let attestation_vault: Arc<dyn AttestationVault> =
4758            Arc::new(InMemoryAttestationVault::new());
4759
4760        let scheduler = Arc::new(Scheduler::new(
4761            Arc::clone(&storage),
4762            Arc::new(registry),
4763            crate::policy::AllowAllPolicy::shared(),
4764            Arc::clone(&audit),
4765            Arc::clone(&attestation_vault),
4766            SchedulerConfig {
4767                batch_size: 2,
4768                idle_backoff: Duration::from_millis(50),
4769                active_backoff: Duration::from_millis(10),
4770                transparency_writer: false,
4771                ..SchedulerConfig::default()
4772            },
4773        ));
4774
4775        let run_id = Uuid::new_v4();
4776        let root_id = Uuid::new_v4();
4777        let child_id = Uuid::new_v4();
4778
4779        storage
4780            .insert_run_with_steps(
4781                models::NewRun {
4782                    run_id,
4783                    status: models::RunStatus::Pending,
4784                    dag_json: json!({
4785                        "steps": [
4786                            {
4787                                "id": root_id,
4788                                "type": "tool",
4789                                "inputs": {"command": ["/bin/echo", "root"]},
4790                                "policy": {}
4791                            },
4792                            {
4793                                "id": child_id,
4794                                "type": "tool",
4795                                "inputs": {"command": ["/bin/echo", "child"]},
4796                                "policy": {}
4797                            }
4798                        ],
4799                        "edges": [[root_id, child_id]]
4800                    }),
4801                    input_ctx: json!({}),
4802                    seed: 42,
4803                    idempotency_key: None,
4804                },
4805                &[
4806                    models::NewStep {
4807                        run_id,
4808                        step_id: root_id,
4809                        idx: 0,
4810                        priority: 1,
4811                        spec_json: json!({
4812                            "id": root_id,
4813                            "type": "tool",
4814                            "inputs": {"command": ["/bin/echo", "root"]},
4815                            "policy": {}
4816                        }),
4817                        status: models::StepStatus::Queued,
4818                        attempt: 0,
4819                        input_snapshot: None,
4820                        provider: None,
4821                        provider_version: None,
4822                        pending_dependencies: 0,
4823                        total_dependencies: 0,
4824                        estimated_tokens: 1,
4825                        estimated_cost: 0.0,
4826                        actual_tokens: None,
4827                        actual_cost: None,
4828                        ..models::NewStep::default()
4829                    },
4830                    models::NewStep {
4831                        run_id,
4832                        step_id: child_id,
4833                        idx: 1,
4834                        priority: 1,
4835                        spec_json: json!({
4836                            "id": child_id,
4837                            "type": "tool",
4838                            "inputs": {"command": ["/bin/echo", "child"]},
4839                            "policy": {}
4840                        }),
4841                        status: models::StepStatus::Blocked,
4842                        attempt: 0,
4843                        input_snapshot: None,
4844                        provider: None,
4845                        provider_version: None,
4846                        pending_dependencies: 1,
4847                        total_dependencies: 1,
4848                        estimated_tokens: 1,
4849                        estimated_cost: 0.0,
4850                        actual_tokens: None,
4851                        actual_cost: None,
4852                        ..models::NewStep::default()
4853                    },
4854                ],
4855                &[(root_id, child_id)],
4856            )
4857            .await
4858            .expect("seed run");
4859
4860        let initial = storage.fetch_steps_for_run(run_id).await.unwrap();
4861        assert_eq!(initial.len(), 2);
4862        let root = initial.iter().find(|s| s.step_id == root_id).unwrap();
4863        let child = initial.iter().find(|s| s.step_id == child_id).unwrap();
4864        assert_eq!(root.status, models::StepStatus::Queued);
4865        assert_eq!(child.status, models::StepStatus::Blocked);
4866
4867        let running = Arc::clone(&scheduler);
4868        let handle = tokio::spawn(async move {
4869            let _ = running.run().await;
4870        });
4871
4872        tokio::time::sleep(Duration::from_secs(1)).await;
4873        handle.abort();
4874
4875        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
4876        let root = steps.iter().find(|s| s.step_id == root_id).unwrap();
4877        let child = steps.iter().find(|s| s.step_id == child_id).unwrap();
4878        assert_eq!(root.status, models::StepStatus::Succeeded);
4879        assert_eq!(child.status, models::StepStatus::Succeeded);
4880        assert_eq!(root.attempt, 1);
4881        assert_eq!(child.attempt, 1);
4882
4883        let run_status = storage.fetch_run(run_id).await.unwrap().unwrap().status;
4884        assert_eq!(run_status, models::RunStatus::Succeeded);
4885
4886        drop(container);
4887    }
4888
4889    #[cfg(feature = "insecure-shell")]
4890    #[tokio::test]
4891    async fn lease_reaper_requeues_expired() {
4892        if std::fs::metadata("/var/run/docker.sock").is_err()
4893            && std::env::var("DOCKER_HOST").is_err()
4894        {
4895            eprintln!("Skipping scheduler tests because Docker is unavailable");
4896            return;
4897        }
4898
4899        std::env::set_var("FLEETFORGE_ALLOWED_TOOLS", "/bin/echo");
4900        std::env::set_var("FLEETFORGE_ALLOW_SHELL", "1");
4901
4902        let docker = Cli::default();
4903        let container = docker.run(Postgres::default());
4904        let port = container.get_host_port_ipv4(5432);
4905        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
4906        sleep(Duration::from_millis(250)).await;
4907
4908        let storage = Arc::new(
4909            Storage::connect(StorageConfig {
4910                database_url,
4911                max_connections: 2,
4912                connect_timeout: Duration::from_secs(10),
4913                object_store: ObjectStoreConfig::InMemory,
4914            })
4915            .await
4916            .expect("connect storage"),
4917        );
4918
4919        let mut registry = ExecutorRegistry::new();
4920        registry
4921            .register(Arc::new(
4922                ShellExecutor::new().expect("shell executor should be enabled for test"),
4923            ))
4924            .expect("register shell executor");
4925        let audit = Arc::new(Audit::new(storage.pool().clone()));
4926        let attestation_vault: Arc<dyn AttestationVault> =
4927            Arc::new(InMemoryAttestationVault::new());
4928
4929        let scheduler = Arc::new(Scheduler::new(
4930            Arc::clone(&storage),
4931            Arc::new(registry),
4932            crate::policy::AllowAllPolicy::shared(),
4933            Arc::clone(&audit),
4934            Arc::clone(&attestation_vault),
4935            SchedulerConfig {
4936                batch_size: 1,
4937                idle_backoff: Duration::from_millis(50),
4938                active_backoff: Duration::from_millis(10),
4939                reaper_interval: Duration::from_millis(200),
4940                transparency_writer: false,
4941                ..SchedulerConfig::default()
4942            },
4943        ));
4944
4945        let run_id = Uuid::new_v4();
4946        let step_id = Uuid::new_v4();
4947
4948        storage
4949            .insert_run_with_steps(
4950                models::NewRun {
4951                    run_id,
4952                    status: models::RunStatus::Pending,
4953                    dag_json: json!({
4954                        "steps": [{
4955                            "id": step_id,
4956                            "type": "tool",
4957                            "inputs": {"command": ["/bin/echo", "late"]},
4958                            "policy": {}
4959                        }],
4960                        "edges": []
4961                    }),
4962                    input_ctx: json!({}),
4963                    seed: 1,
4964                    idempotency_key: None,
4965                },
4966                &[models::NewStep {
4967                    run_id,
4968                    step_id,
4969                    idx: 0,
4970                    priority: 1,
4971                    spec_json: json!({
4972                        "id": step_id,
4973                        "type": "tool",
4974                        "inputs": {"command": ["/bin/echo", "late"]},
4975                        "policy": {}
4976                    }),
4977                    status: models::StepStatus::Queued,
4978                    attempt: 0,
4979                    input_snapshot: None,
4980                    provider: None,
4981                    provider_version: None,
4982                    pending_dependencies: 0,
4983                    total_dependencies: 0,
4984                    estimated_tokens: 0,
4985                    estimated_cost: 0.0,
4986                    actual_tokens: None,
4987                    actual_cost: None,
4988                    ..models::NewStep::default()
4989                }],
4990                &[],
4991            )
4992            .await
4993            .expect("seed run");
4994
4995        // Manually lease the step with a short expiry to simulate a hung worker.
4996        sqlx::query(
4997            "update steps set status = 'leased', leased_by = 'worker-a', lease_expires_at = now() - interval '1 second' where run_id = $1 and step_id = $2"
4998        )
4999        .bind(run_id)
5000        .bind(step_id)
5001        .execute(storage.pool())
5002        .await
5003        .expect("mark leased");
5004
5005        let running = Arc::clone(&scheduler);
5006        let handle = tokio::spawn(async move {
5007            let _ = running.run().await;
5008        });
5009
5010        tokio::time::sleep(Duration::from_secs(2)).await;
5011        handle.abort();
5012
5013        let step = storage
5014            .fetch_steps_for_run(run_id)
5015            .await
5016            .unwrap()
5017            .first()
5018            .cloned()
5019            .unwrap();
5020        assert_eq!(step.status, models::StepStatus::Succeeded);
5021        assert_eq!(step.attempt, 1);
5022
5023        drop(container);
5024    }
5025
5026    #[cfg(feature = "insecure-shell")]
5027    #[tokio::test]
5028    async fn budget_denial_records_failure() {
5029        if std::fs::metadata("/var/run/docker.sock").is_err()
5030            && std::env::var("DOCKER_HOST").is_err()
5031        {
5032            eprintln!("Skipping scheduler tests because Docker is unavailable");
5033            return;
5034        }
5035
5036        std::env::set_var("FLEETFORGE_ALLOWED_TOOLS", "/bin/echo");
5037        std::env::set_var("FLEETFORGE_ALLOW_SHELL", "1");
5038
5039        let docker = Cli::default();
5040        let container = docker.run(Postgres::default());
5041        let port = container.get_host_port_ipv4(5432);
5042        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
5043        sleep(Duration::from_millis(250)).await;
5044
5045        let storage = Arc::new(
5046            Storage::connect(StorageConfig {
5047                database_url,
5048                max_connections: 2,
5049                connect_timeout: Duration::from_secs(10),
5050                object_store: ObjectStoreConfig::InMemory,
5051            })
5052            .await
5053            .expect("connect storage"),
5054        );
5055
5056        let mut registry = ExecutorRegistry::new();
5057        registry
5058            .register(Arc::new(
5059                ShellExecutor::new().expect("shell executor should be enabled for test"),
5060            ))
5061            .expect("register shell executor");
5062        let audit = Arc::new(Audit::new(storage.pool().clone()));
5063        let attestation_vault: Arc<dyn AttestationVault> =
5064            Arc::new(InMemoryAttestationVault::new());
5065
5066        let scheduler = Arc::new(Scheduler::new(
5067            Arc::clone(&storage),
5068            Arc::new(registry),
5069            crate::policy::AllowAllPolicy::shared(),
5070            Arc::clone(&audit),
5071            Arc::clone(&attestation_vault),
5072            SchedulerConfig::default(),
5073        ));
5074
5075        let run_id = Uuid::new_v4();
5076        let step_id = Uuid::new_v4();
5077
5078        storage
5079            .insert_run_with_steps(
5080                models::NewRun {
5081                    run_id,
5082                    status: models::RunStatus::Pending,
5083                    dag_json: json!({
5084                        "steps": [{
5085                            "id": step_id,
5086                            "type": "tool",
5087                            "inputs": {"command": ["/bin/echo", "over"]},
5088                            "policy": {"budget": {"tokens": 10, "cost": 0.0}}
5089                        }],
5090                        "edges": []
5091                    }),
5092                    input_ctx: json!({}),
5093                    seed: 1,
5094                    idempotency_key: None,
5095                },
5096                &[models::NewStep {
5097                    run_id,
5098                    step_id,
5099                    idx: 0,
5100                    priority: 1,
5101                    spec_json: json!({
5102                        "id": step_id,
5103                        "type": "tool",
5104                        "inputs": {"command": ["/bin/echo", "over"]},
5105                        "policy": {"budget": {"tokens": 10, "cost": 0.0}}
5106                    }),
5107                    status: models::StepStatus::Queued,
5108                    attempt: 0,
5109                    input_snapshot: None,
5110                    provider: None,
5111                    provider_version: None,
5112                    pending_dependencies: 0,
5113                    total_dependencies: 0,
5114                    estimated_tokens: 20,
5115                    estimated_cost: 0.0,
5116                    actual_tokens: None,
5117                    actual_cost: None,
5118                    ..models::NewStep::default()
5119                }],
5120                &[],
5121            )
5122            .await
5123            .expect("seed run");
5124
5125        let running = Arc::clone(&scheduler);
5126        let handle = tokio::spawn(async move {
5127            let _ = running.run().await;
5128        });
5129
5130        tokio::time::sleep(Duration::from_millis(500)).await;
5131        handle.abort();
5132
5133        let steps = storage.fetch_steps_for_run(run_id).await.unwrap();
5134        let step = steps.first().unwrap();
5135        assert_eq!(step.status, models::StepStatus::Failed);
5136        let error = step.error_json.clone().unwrap();
5137        assert_eq!(error["kind"], json!("budget_exceeded"));
5138
5139        let events = storage.fetch_unpublished_outbox(10).await.unwrap();
5140        let failed_event = events.iter().find(|ev| ev.kind == "step_failed").unwrap();
5141        assert_eq!(
5142            failed_event.payload["error"]["kind"],
5143            json!("budget_exceeded")
5144        );
5145
5146        drop(container);
5147    }
5148}