fleetforge_runtime/
replay.rs

1//! Time-travel replay executor for deterministic verification and evals.
2
3use std::collections::{BTreeMap, HashMap};
4use std::sync::Arc;
5
6use anyhow::{anyhow, bail, Context, Result};
7use bytes::Bytes;
8use hex;
9use serde_json::{json, Map, Value};
10use tracing::{info, warn};
11use uuid::Uuid;
12
13use crate::model::{RunId, StepId};
14use fleetforge_storage::{models, ObjectStoreConfig, Storage, StorageConfig};
15
/// Replay strategy for deterministic time-travel.
///
/// `Live` is accepted by `ReplayExecutor::replay_run` but is not implemented yet: a warning is
/// logged and the run is replayed in mocked mode, so the outcome reports `Mocked`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplayStrategy {
    /// Rehydrate outputs from recorded snapshots only.
    Mocked,
    /// Execute live using registered gateways/executors.
    ///
    /// Currently falls back to `Mocked`.
    Live,
}
24
/// Tolerances for drift detection when comparing recorded vs current outputs.
///
/// When both snapshots report a metric, its drift is compared as `|delta / recorded|` if the
/// recorded value is non-zero and as `|delta|` otherwise. The default of `0.0` for both fields
/// therefore demands an exact match.
#[derive(Debug, Clone, Copy, Default)]
pub struct DriftTolerance {
    /// Allowed token-count drift.
    pub tokens: f64,
    /// Allowed cost drift.
    pub cost: f64,
}
40
/// Replay outcome including rehydrated snapshots and drift findings.
#[derive(Debug)]
pub struct ReplayOutcome {
    /// Run that was replayed.
    pub run_id: RunId,
    /// Strategy that actually produced this outcome. The mocked replay path always reports
    /// `ReplayStrategy::Mocked`, including when `Live` was requested and fell back.
    pub strategy: ReplayStrategy,
    /// One entry per replayed step, in the order the steps were returned by storage.
    pub steps: Vec<StepReplay>,
    /// `true` only if every step's drift was within tolerance.
    pub all_within_tolerance: bool,
}
49
/// Rehydrated step snapshot with optional drift metadata.
#[derive(Debug)]
pub struct StepReplay {
    /// Step this replay entry belongs to.
    pub step_id: StepId,
    /// Recorded (baseline) output snapshot: the earliest attempt that stored an output.
    pub snapshot: Value,
    /// Drift between the baseline and the latest attempt's output. The mocked replay path
    /// always fills this in; presumably `None` is reserved for strategies without a comparison.
    pub drift: Option<StepDrift>,
}
57
/// Drift comparison between recorded snapshot and current output.
#[derive(Debug)]
pub struct StepDrift {
    /// Baseline output snapshot.
    pub recorded: Value,
    /// Output snapshot being compared against the baseline.
    pub current: Value,
    /// `current - recorded` token count; `None` if either snapshot lacks a numeric token field.
    pub token_delta: Option<f64>,
    /// `current - recorded` cost; `None` if either snapshot lacks a numeric cost field.
    pub cost_delta: Option<f64>,
    /// Token and cost drift are within tolerance AND the snapshots are equal once all
    /// token/cost keys are stripped.
    pub within_tolerance: bool,
}
67
/// Orchestrates mocked/live replays by loading past step snapshots.
///
/// Live replay is not implemented yet; `Live` requests are currently served in mocked mode.
pub struct ReplayExecutor {
    /// Shared storage used to load step/attempt records and to persist replay diff artifacts.
    storage: Arc<Storage>,
}
72
73impl ReplayExecutor {
74    /// Builds a replay executor backed by the shared storage handle.
75    pub fn new(storage: Arc<Storage>) -> Self {
76        Self { storage }
77    }
78
79    /// Replays every step in a run and returns drift diagnostics.
80    pub async fn replay_run(
81        &self,
82        run_id: RunId,
83        strategy: ReplayStrategy,
84        tolerance: DriftTolerance,
85    ) -> Result<ReplayOutcome> {
86        let steps = self.storage.fetch_steps_for_run(Uuid::from(run_id)).await?;
87        let attempt_rows = self
88            .storage
89            .fetch_step_attempts_for_run(Uuid::from(run_id))
90            .await?;
91        let mut attempts_by_step: HashMap<Uuid, Vec<models::StepAttempt>> = HashMap::new();
92        for attempt in attempt_rows {
93            attempts_by_step
94                .entry(attempt.step_id)
95                .or_default()
96                .push(attempt);
97        }
98        for attempts in attempts_by_step.values_mut() {
99            attempts.sort_by_key(|attempt| attempt.attempt);
100        }
101
102        match strategy {
103            ReplayStrategy::Mocked => {
104                self.replay_mocked(run_id, steps, attempts_by_step, tolerance)
105                    .await
106            }
107            ReplayStrategy::Live => {
108                warn!(
109                    run = %run_id,
110                    "live replay not yet implemented; falling back to mocked replay"
111                );
112                self.replay_mocked(run_id, steps, attempts_by_step, tolerance)
113                    .await
114            }
115        }
116    }
117}
118
119impl ReplayExecutor {
120    async fn replay_mocked(
121        &self,
122        run_id: RunId,
123        steps: Vec<models::Step>,
124        attempts_by_step: HashMap<Uuid, Vec<models::StepAttempt>>,
125        tolerance: DriftTolerance,
126    ) -> Result<ReplayOutcome> {
127        let mut attempts_by_step = attempts_by_step;
128        let mut replays = Vec::with_capacity(steps.len());
129        let mut all_within = true;
130
131        for step in steps {
132            info!(
133                run = %run_id,
134                step = %step.step_id,
135                "mock replay using recorded snapshot"
136            );
137
138            let attempt_list = attempts_by_step
139                .get(&step.step_id)
140                .ok_or_else(|| anyhow!("no attempts recorded for step {}", step.step_id))?;
141            let (baseline, latest) = select_attempt_pair(attempt_list).ok_or_else(|| {
142                anyhow!(
143                    "step {} missing output snapshots; cannot run mocked replay",
144                    step.step_id
145                )
146            })?;
147
148            let recorded_snapshot = baseline.output_snapshot.clone().ok_or_else(|| {
149                anyhow!(
150                    "attempt {} for step {} missing output_snapshot; cannot replay",
151                    baseline.attempt,
152                    step.step_id
153                )
154            })?;
155            let current_snapshot = latest.output_snapshot.clone().ok_or_else(|| {
156                anyhow!(
157                    "latest attempt {} for step {} missing output_snapshot; cannot replay",
158                    latest.attempt,
159                    step.step_id
160                )
161            })?;
162
163            let mut drift = compute_drift(&recorded_snapshot, &current_snapshot, tolerance);
164
165            let recorded_attestations = extract_attestation_ids(&recorded_snapshot);
166            let current_attestations = extract_attestation_ids(&current_snapshot);
167            if recorded_attestations != current_attestations {
168                let diff_payload = build_attestation_diff_payload(
169                    run_id,
170                    StepId(step.step_id),
171                    &recorded_attestations,
172                    &current_attestations,
173                );
174                let diff_sha = self
175                    .persist_replay_diff(
176                        run_id,
177                        StepId(step.step_id),
178                        "replay_attestation_diff",
179                        diff_payload,
180                    )
181                    .await?;
182                warn!(
183                    run = %run_id,
184                    step = %step.step_id,
185                    recorded = ?recorded_attestations,
186                    current = ?current_attestations,
187                    diff_sha256 = %diff_sha,
188                    "attestation ids diverged during replay"
189                );
190                bail!(
191                    "attestation ids diverged during replay for step {} (kind=replay_attestation_diff, diff_sha256={})",
192                    step.step_id,
193                    diff_sha
194                );
195            }
196
197            if !tool_events_identical(baseline.tool_events.as_ref(), latest.tool_events.as_ref()) {
198                let diff_payload = build_tool_event_diff_payload(
199                    run_id,
200                    StepId(step.step_id),
201                    baseline.tool_events.as_ref(),
202                    latest.tool_events.as_ref(),
203                );
204                let diff_sha = self
205                    .persist_replay_diff(
206                        run_id,
207                        StepId(step.step_id),
208                        "replay_tool_diff",
209                        diff_payload,
210                    )
211                    .await?;
212                warn!(
213                    run = %run_id,
214                    step = %step.step_id,
215                    diff_sha256 = %diff_sha,
216                    "tool events diverged during replay"
217                );
218                bail!(
219                    "tool events diverged during replay for step {} (kind=replay_tool_diff, diff_sha256={})",
220                    step.step_id,
221                    diff_sha
222                );
223            }
224
225            if !drift.within_tolerance {
226                all_within = false;
227            }
228
229            replays.push(StepReplay {
230                step_id: StepId(step.step_id),
231                snapshot: recorded_snapshot,
232                drift: Some(drift),
233            });
234        }
235
236        Ok(ReplayOutcome {
237            run_id,
238            strategy: ReplayStrategy::Mocked,
239            steps: replays,
240            all_within_tolerance: all_within,
241        })
242    }
243}
244
245fn compute_drift(recorded: &Value, current: &Value, tolerance: DriftTolerance) -> StepDrift {
246    let token_delta = diff_numeric(
247        recorded,
248        current,
249        &["tokens", "token_count", "total_tokens"],
250    );
251    let cost_delta = diff_numeric(recorded, current, &["cost", "cost_usd", "total_cost"]);
252
253    let recorded_tokens = find_numeric(recorded, &["tokens", "token_count", "total_tokens"]);
254    let recorded_cost = find_numeric(recorded, &["cost", "cost_usd", "total_cost"]);
255    let sanitized_recorded = strip_keys(
256        recorded,
257        &[
258            "tokens",
259            "token_count",
260            "total_tokens",
261            "cost",
262            "cost_usd",
263            "total_cost",
264        ],
265    );
266    let sanitized_current = strip_keys(
267        current,
268        &[
269            "tokens",
270            "token_count",
271            "total_tokens",
272            "cost",
273            "cost_usd",
274            "total_cost",
275        ],
276    );
277
278    let tokens_within = match (token_delta, recorded_tokens) {
279        (Some(delta), Some(base)) if base.abs() > f64::EPSILON => {
280            (delta / base).abs() <= tolerance.tokens
281        }
282        (Some(delta), Some(_)) => delta.abs() <= tolerance.tokens,
283        (Some(delta), None) => delta.abs() <= tolerance.tokens,
284        _ => true,
285    };
286    let cost_within = match (cost_delta, recorded_cost) {
287        (Some(delta), Some(base)) if base.abs() > f64::EPSILON => {
288            (delta / base).abs() <= tolerance.cost
289        }
290        (Some(delta), Some(_)) => delta.abs() <= tolerance.cost,
291        (Some(delta), None) => delta.abs() <= tolerance.cost,
292        _ => true,
293    };
294
295    let structure_equal = sanitized_recorded == sanitized_current;
296
297    StepDrift {
298        recorded: recorded.clone(),
299        current: current.clone(),
300        token_delta,
301        cost_delta,
302        within_tolerance: tokens_within && cost_within && structure_equal,
303    }
304}
305
306fn diff_numeric(recorded: &Value, current: &Value, keys: &[&str]) -> Option<f64> {
307    let recorded_value = find_numeric(recorded, keys)?;
308    let current_value = find_numeric(current, keys)?;
309    Some(current_value - recorded_value)
310}
311
312fn find_numeric(value: &Value, keys: &[&str]) -> Option<f64> {
313    match value {
314        Value::Object(map) => {
315            for key in keys {
316                if let Some(v) = map.get(*key) {
317                    if let Some(num) = v.as_f64() {
318                        return Some(num);
319                    }
320                }
321            }
322            for v in map.values() {
323                if let Some(found) = find_numeric(v, keys) {
324                    return Some(found);
325                }
326            }
327            None
328        }
329        Value::Array(items) => {
330            for item in items {
331                if let Some(found) = find_numeric(item, keys) {
332                    return Some(found);
333                }
334            }
335            None
336        }
337        _ => None,
338    }
339}
340
341fn strip_keys(value: &Value, keys: &[&str]) -> Value {
342    match value {
343        Value::Object(map) => {
344            let mut new_map = serde_json::Map::new();
345            for (k, v) in map {
346                if keys.iter().any(|key| key == k) {
347                    continue;
348                }
349                new_map.insert(k.clone(), strip_keys(v, keys));
350            }
351            Value::Object(new_map)
352        }
353        Value::Array(items) => Value::Array(items.iter().map(|v| strip_keys(v, keys)).collect()),
354        _ => value.clone(),
355    }
356}
357
358fn select_attempt_pair(
359    attempts: &[models::StepAttempt],
360) -> Option<(&models::StepAttempt, &models::StepAttempt)> {
361    let baseline = attempts
362        .iter()
363        .find(|attempt| attempt.output_snapshot.is_some())?;
364    let latest = attempts
365        .iter()
366        .rev()
367        .find(|attempt| attempt.output_snapshot.is_some())
368        .unwrap_or(baseline);
369    Some((baseline, latest))
370}
371
372fn extract_attestation_ids(value: &Value) -> Vec<String> {
373    match value {
374        Value::Object(map) => map
375            .get("attestation_ids")
376            .and_then(Value::as_array)
377            .map(|arr| {
378                arr.iter()
379                    .filter_map(Value::as_str)
380                    .map(|s| s.to_string())
381                    .collect()
382            })
383            .unwrap_or_default(),
384        _ => Vec::new(),
385    }
386}
387
388fn tool_events_identical(recorded: Option<&Value>, current: Option<&Value>) -> bool {
389    match (recorded, current) {
390        (None, None) => true,
391        (Some(a), Some(b)) => canonicalize(a) == canonicalize(b),
392        _ => false,
393    }
394}
395
396fn canonicalize(value: &Value) -> Value {
397    match value {
398        Value::Object(map) => {
399            let mut ordered = BTreeMap::new();
400            for (key, val) in map {
401                ordered.insert(key.clone(), canonicalize(val));
402            }
403            let mut object = Map::new();
404            for (key, val) in ordered {
405                object.insert(key, val);
406            }
407            Value::Object(object)
408        }
409        Value::Array(items) => Value::Array(items.iter().map(|item| canonicalize(item)).collect()),
410        _ => value.clone(),
411    }
412}
413
414impl ReplayExecutor {
415    async fn persist_replay_diff(
416        &self,
417        run_id: RunId,
418        step_id: StepId,
419        kind: &'static str,
420        payload: Value,
421    ) -> Result<String> {
422        let metadata = json!({
423            "kind": kind,
424            "run_id": Uuid::from(run_id).to_string(),
425            "step_id": Uuid::from(step_id).to_string(),
426        });
427        let serialized =
428            serde_json::to_vec(&payload).context("failed to serialize replay diff payload")?;
429        let artifact = self
430            .storage
431            .artifacts()
432            .put(Bytes::from(serialized), "application/json", metadata)
433            .await
434            .context("failed to persist replay diff artifact")?;
435        Ok(hex::encode(artifact.sha256))
436    }
437}
438
439fn build_attestation_diff_payload(
440    run_id: RunId,
441    step_id: StepId,
442    recorded: &[String],
443    current: &[String],
444) -> Value {
445    let (missing, unexpected) = diff_attestations(recorded, current);
446    json!({
447        "run_id": Uuid::from(run_id).to_string(),
448        "step_id": Uuid::from(step_id).to_string(),
449        "recorded": recorded,
450        "current": current,
451        "missing": missing,
452        "unexpected": unexpected,
453    })
454}
455
/// Splits attestation-id differences into `(missing, unexpected)`.
///
/// - `missing`: entries of `recorded` that are absent from `current`.
/// - `unexpected`: entries of `current` that are absent from `recorded`.
///
/// Both lists keep input order and duplicates. Membership is exact string equality, so a pure
/// reordering yields two empty lists.
fn diff_attestations(recorded: &[String], current: &[String]) -> (Vec<String>, Vec<String>) {
    fn absent_from(source: &[String], other: &[String]) -> Vec<String> {
        source
            .iter()
            .filter(|id| !other.contains(id))
            .cloned()
            .collect()
    }

    let missing = absent_from(recorded, current);
    let unexpected = absent_from(current, recorded);
    (missing, unexpected)
}
470
471fn build_tool_event_diff_payload(
472    run_id: RunId,
473    step_id: StepId,
474    recorded: Option<&Value>,
475    current: Option<&Value>,
476) -> Value {
477    json!({
478        "run_id": Uuid::from(run_id).to_string(),
479        "step_id": Uuid::from(step_id).to_string(),
480        "recorded": recorded.unwrap_or(&Value::Null),
481        "current": current.unwrap_or(&Value::Null),
482    })
483}
484
485#[cfg(test)]
486mod tests {
487    use super::*;
488    use serde_json::json;
489    use std::sync::Arc;
490    use std::time::Duration;
491    use testcontainers::clients::Cli;
492    use testcontainers::images::postgres::Postgres;
493    use tokio::time::sleep;
494
495    #[tokio::test]
496    async fn mocked_replay_parity() -> Result<()> {
497        if std::fs::metadata("/var/run/docker.sock").is_err()
498            && std::env::var("DOCKER_HOST").is_err()
499        {
500            warn!("Skipping replay test because Docker is unavailable");
501            return Ok(());
502        }
503
504        let docker = Cli::default();
505        let container = docker.run(Postgres::default());
506        let port = container.get_host_port_ipv4(5432);
507        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
508
509        // Postgres inside the container can take a moment to accept connections.
510        sleep(Duration::from_millis(250)).await;
511
512        let storage = Arc::new(
513            Storage::connect(StorageConfig {
514                database_url,
515                max_connections: 2,
516                connect_timeout: Duration::from_secs(10),
517                object_store: ObjectStoreConfig::InMemory,
518            })
519            .await?,
520        );
521
522        let run_id = Uuid::new_v4();
523        let step_id = Uuid::new_v4();
524        let spec_json = json!({
525            "id": step_id,
526            "type": "map",
527            "inputs": {"task": "demo"},
528            "policy": {"budget": {"tokens": 600, "cost": 0.002}}
529        });
530
531        let _ = storage
532            .insert_run_with_steps(
533                models::NewRun {
534                    run_id,
535                    status: models::RunStatus::Pending,
536                    dag_json: json!({"steps": [spec_json.clone()], "edges": []}),
537                    input_ctx: json!({}),
538                    seed: 42,
539                    idempotency_key: None,
540                },
541                &[models::NewStep {
542                    run_id,
543                    step_id,
544                    idx: 0,
545                    priority: 5,
546                    spec_json: spec_json.clone(),
547                    status: models::StepStatus::Queued,
548                    attempt: 0,
549                    input_snapshot: None,
550                    provider: Some("fleet.map".to_string()),
551                    provider_version: None,
552                    pending_dependencies: 0,
553                    total_dependencies: 0,
554                    estimated_tokens: 600,
555                    estimated_cost: 0.002,
556                    actual_tokens: None,
557                    actual_cost: None,
558                    ..models::NewStep::default()
559                }],
560                &[],
561            )
562            .await?;
563
564        let output = json!({
565            "message": "Weather summary",
566            "metrics": {
567                "tokens": 600,
568                "cost": 0.002
569            }
570        });
571
572        let mut tx = storage.pool().begin().await?;
573        storage
574            .update_step_status_tx(
575                &mut tx,
576                run_id,
577                step_id,
578                models::StepStatus::Succeeded,
579                Some(1),
580                Some(output.clone()),
581                None,
582                None,
583                Some(output.clone()),
584                None,
585                Some("fleet.map"),
586                None,
587            )
588            .await?;
589        storage
590            .update_step_usage_tx(&mut tx, run_id, step_id, Some(600), Some(0.002))
591            .await?;
592        storage
593            .insert_step_attempt_tx(
594                &mut tx,
595                models::NewStepAttempt {
596                    run_id,
597                    step_id,
598                    attempt: 1,
599                    status: models::StepStatus::Succeeded,
600                    latency_ms: None,
601                    spec_snapshot: spec_json.clone(),
602                    input_snapshot: None,
603                    output_snapshot: Some(output.clone()),
604                    error_snapshot: None,
605                    checkpoint: None,
606                    budget_snapshot: None,
607                    guardrail_events: None,
608                    tool_events: None,
609                    artifacts: None,
610                },
611            )
612            .await?;
613        tx.commit().await?;
614
615        let executor = ReplayExecutor::new(storage.clone());
616        let result = executor
617            .replay_run(
618                RunId(run_id),
619                ReplayStrategy::Mocked,
620                DriftTolerance {
621                    tokens: 0.0,
622                    cost: 0.0,
623                },
624            )
625            .await?;
626
627        assert!(result.all_within_tolerance);
628        assert_eq!(result.steps.len(), 1);
629        assert!(result.steps[0].drift.as_ref().unwrap().within_tolerance);
630
631        drop(container);
632        Ok(())
633    }
634
635    #[tokio::test]
636    async fn mocked_replay_within_one_percent() -> Result<()> {
637        if std::fs::metadata("/var/run/docker.sock").is_err()
638            && std::env::var("DOCKER_HOST").is_err()
639        {
640            warn!("Skipping replay test because Docker is unavailable");
641            return Ok(());
642        }
643
644        let docker = Cli::default();
645        let container = docker.run(Postgres::default());
646        let port = container.get_host_port_ipv4(5432);
647        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
648        sleep(Duration::from_millis(250)).await;
649
650        let storage = Arc::new(
651            Storage::connect(StorageConfig {
652                database_url,
653                max_connections: 2,
654                connect_timeout: Duration::from_secs(10),
655                object_store: ObjectStoreConfig::InMemory,
656            })
657            .await?,
658        );
659
660        let run_id = Uuid::new_v4();
661        let step_id = Uuid::new_v4();
662
663        let recorded_snapshot = json!({
664            "message": "Original output",
665            "metrics": {
666                "tokens": 100,
667                "cost": 1.00
668            }
669        });
670
671        let actual_output = json!({
672            "message": "Original output",
673            "metrics": {
674                "tokens": 100.5,
675                "cost": 1.009
676            }
677        });
678
679        let _ = storage
680            .insert_run_with_steps(
681                models::NewRun {
682                    run_id,
683                    status: models::RunStatus::Pending,
684                    dag_json: json!({
685                        "steps": [{
686                            "id": step_id,
687                            "type": "tool",
688                            "inputs": {"prompt": "demo"},
689                            "policy": {"budget": {"tokens": 100, "cost": 1.0}}
690                        }],
691                        "edges": []
692                    }),
693                    input_ctx: json!({}),
694                    seed: 1,
695                    idempotency_key: None,
696                },
697                &[models::NewStep {
698                    run_id,
699                    step_id,
700                    idx: 0,
701                    priority: 0,
702                    spec_json: json!({
703                        "id": step_id,
704                        "type": "tool",
705                        "inputs": {"prompt": "demo"},
706                        "policy": {"budget": {"tokens": 100, "cost": 1.0}}
707                    }),
708                    status: models::StepStatus::Succeeded,
709                    attempt: 1,
710                    input_snapshot: Some(json!({"prompt": "demo"})),
711                    provider: None,
712                    provider_version: None,
713                    pending_dependencies: 0,
714                    total_dependencies: 0,
715                    estimated_tokens: 100,
716                    estimated_cost: 1.0,
717                    actual_tokens: Some(101),
718                    actual_cost: Some(1.02),
719                    ..models::NewStep::default()
720                }],
721                &[],
722            )
723            .await?;
724
725        let mut tx = storage.pool().begin().await?;
726        storage
727            .insert_step_attempt_tx(
728                &mut tx,
729                models::NewStepAttempt {
730                    run_id,
731                    step_id,
732                    attempt: 1,
733                    status: models::StepStatus::Succeeded,
734                    latency_ms: None,
735                    spec_snapshot: json!({
736                        "id": step_id,
737                        "type": "tool",
738                        "inputs": {"prompt": "demo"},
739                        "policy": {"budget": {"tokens": 100, "cost": 1.0}}
740                    }),
741                    input_snapshot: Some(json!({"prompt": "demo"})),
742                    output_snapshot: Some(recorded_snapshot.clone()),
743                    error_snapshot: None,
744                    checkpoint: None,
745                    budget_snapshot: None,
746                    guardrail_events: None,
747                    tool_events: None,
748                    artifacts: None,
749                },
750            )
751            .await?;
752        storage
753            .insert_step_attempt_tx(
754                &mut tx,
755                models::NewStepAttempt {
756                    run_id,
757                    step_id,
758                    attempt: 2,
759                    status: models::StepStatus::Succeeded,
760                    latency_ms: None,
761                    spec_snapshot: json!({
762                        "id": step_id,
763                        "type": "tool",
764                        "inputs": {"prompt": "demo"},
765                        "policy": {"budget": {"tokens": 100, "cost": 1.0}}
766                    }),
767                    input_snapshot: Some(json!({"prompt": "demo"})),
768                    output_snapshot: Some(actual_output.clone()),
769                    error_snapshot: None,
770                    checkpoint: None,
771                    budget_snapshot: None,
772                    guardrail_events: None,
773                    tool_events: None,
774                    artifacts: None,
775                },
776            )
777            .await?;
778        storage
779            .update_step_status_tx(
780                &mut tx,
781                run_id,
782                step_id,
783                models::StepStatus::Succeeded,
784                Some(2),
785                Some(actual_output.clone()),
786                None,
787                Some(json!({"prompt": "demo"})),
788                Some(actual_output.clone()),
789                None,
790                None,
791                None,
792            )
793            .await?;
794        storage
795            .update_step_usage_tx(&mut tx, run_id, step_id, Some(101), Some(1.02))
796            .await?;
797        tx.commit().await?;
798
799        let executor = ReplayExecutor::new(storage.clone());
800        let outcome = executor
801            .replay_run(
802                RunId(run_id),
803                ReplayStrategy::Mocked,
804                DriftTolerance {
805                    tokens: 0.01,
806                    cost: 0.01,
807                },
808            )
809            .await?;
810
811        assert!(
812            outcome.all_within_tolerance,
813            "replay outcome exceeded tolerance"
814        );
815        let drift = outcome
816            .steps
817            .first()
818            .and_then(|step| step.drift.as_ref())
819            .expect("drift metadata present");
820        assert!(
821            drift.within_tolerance,
822            "drift should be within tolerance: {drift:?}"
823        );
824
825        drop(container);
826        Ok(())
827    }
828
829    #[tokio::test]
830    async fn mocked_replay_detects_attestation_mismatch() -> Result<()> {
831        if std::fs::metadata("/var/run/docker.sock").is_err()
832            && std::env::var("DOCKER_HOST").is_err()
833        {
834            warn!("Skipping replay test because Docker is unavailable");
835            return Ok(());
836        }
837
838        let docker = Cli::default();
839        let container = docker.run(Postgres::default());
840        let port = container.get_host_port_ipv4(5432);
841        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
842        sleep(Duration::from_millis(250)).await;
843
844        let storage = Arc::new(
845            Storage::connect(StorageConfig {
846                database_url,
847                max_connections: 2,
848                connect_timeout: Duration::from_secs(10),
849                object_store: ObjectStoreConfig::InMemory,
850            })
851            .await?,
852        );
853
854        let run_id = Uuid::new_v4();
855        let step_id = Uuid::new_v4();
856
857        let baseline_output = json!({
858            "result": "ok",
859            "attestation_ids": [Uuid::new_v4().to_string()],
860        });
861        let latest_output = json!({
862            "result": "ok",
863            "attestation_ids": [Uuid::new_v4().to_string()],
864        });
865
866        let _ = storage
867            .insert_run_with_steps(
868                models::NewRun {
869                    run_id,
870                    status: models::RunStatus::Pending,
871                    dag_json: json!({"steps": [{"id": step_id, "type": "tool", "inputs": {}}], "edges": []}),
872                    input_ctx: json!({}),
873                    seed: 11,
874                    idempotency_key: None,
875                },
876                &[models::NewStep {
877                    run_id,
878                    step_id,
879                    idx: 0,
880                    priority: 0,
881                    spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
882                    status: models::StepStatus::Succeeded,
883                    attempt: 1,
884                    input_snapshot: None,
885                    provider: None,
886                    provider_version: None,
887                    pending_dependencies: 0,
888                    total_dependencies: 0,
889                    estimated_tokens: 0,
890                    estimated_cost: 0.0,
891                    actual_tokens: None,
892                    actual_cost: None,
893                    ..models::NewStep::default()
894                }],
895                &[],
896            )
897            .await?;
898
899        let mut tx = storage.pool().begin().await?;
900        storage
901            .insert_step_attempt_tx(
902                &mut tx,
903                models::NewStepAttempt {
904                    run_id,
905                    step_id,
906                    attempt: 1,
907                    status: models::StepStatus::Succeeded,
908                    latency_ms: None,
909                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
910                    input_snapshot: None,
911                    output_snapshot: Some(baseline_output.clone()),
912                    error_snapshot: None,
913                    checkpoint: None,
914                    budget_snapshot: None,
915                    guardrail_events: None,
916                    tool_events: None,
917                    artifacts: None,
918                },
919            )
920            .await?;
921        storage
922            .insert_step_attempt_tx(
923                &mut tx,
924                models::NewStepAttempt {
925                    run_id,
926                    step_id,
927                    attempt: 2,
928                    status: models::StepStatus::Succeeded,
929                    latency_ms: None,
930                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
931                    input_snapshot: None,
932                    output_snapshot: Some(latest_output.clone()),
933                    error_snapshot: None,
934                    checkpoint: None,
935                    budget_snapshot: None,
936                    guardrail_events: None,
937                    tool_events: None,
938                    artifacts: None,
939                },
940            )
941            .await?;
942        storage
943            .update_step_status_tx(
944                &mut tx,
945                run_id,
946                step_id,
947                models::StepStatus::Succeeded,
948                Some(2),
949                Some(latest_output.clone()),
950                None,
951                None,
952                Some(latest_output.clone()),
953                None,
954                None,
955                None,
956            )
957            .await?;
958        tx.commit().await?;
959
960        let executor = ReplayExecutor::new(storage.clone());
961        let err = executor
962            .replay_run(
963                RunId(run_id),
964                ReplayStrategy::Mocked,
965                DriftTolerance {
966                    tokens: 0.01,
967                    cost: 0.01,
968                },
969            )
970            .await
971            .expect_err("attestation mismatch must fail replay");
972        assert!(
973            err.to_string().contains("replay_attestation_diff"),
974            "error should reference replay_attestation_diff artifact: {err:?}"
975        );
976
977        drop(container);
978        Ok(())
979    }
980
981    #[tokio::test]
982    async fn mocked_replay_detects_tool_event_mismatch() -> Result<()> {
983        if std::fs::metadata("/var/run/docker.sock").is_err()
984            && std::env::var("DOCKER_HOST").is_err()
985        {
986            warn!("Skipping replay test because Docker is unavailable");
987            return Ok(());
988        }
989
990        let docker = Cli::default();
991        let container = docker.run(Postgres::default());
992        let port = container.get_host_port_ipv4(5432);
993        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
994        sleep(Duration::from_millis(250)).await;
995
996        let storage = Arc::new(
997            Storage::connect(StorageConfig {
998                database_url,
999                max_connections: 2,
1000                connect_timeout: Duration::from_secs(10),
1001                object_store: ObjectStoreConfig::InMemory,
1002            })
1003            .await?,
1004        );
1005
1006        let run_id = Uuid::new_v4();
1007        let step_id = Uuid::new_v4();
1008
1009        let output = json!({
1010            "result": "value",
1011            "attestation_ids": [Uuid::new_v4().to_string()],
1012        });
1013
1014        let baseline_tool_events = json!([{
1015            "kind": "tool.run",
1016            "payload": {"stdout": "hello"},
1017            "trace": {"span_id": "abc"}
1018        }]);
1019        let latest_tool_events = json!([{
1020            "kind": "tool.run",
1021            "payload": {"stdout": "goodbye"},
1022            "trace": {"span_id": "def"}
1023        }]);
1024
1025        let _ = storage
1026            .insert_run_with_steps(
1027                models::NewRun {
1028                    run_id,
1029                    status: models::RunStatus::Pending,
1030                    dag_json: json!({"steps": [{"id": step_id, "type": "tool", "inputs": {}}], "edges": []}),
1031                    input_ctx: json!({}),
1032                    seed: 21,
1033                    idempotency_key: None,
1034                },
1035                &[models::NewStep {
1036                    run_id,
1037                    step_id,
1038                    idx: 0,
1039                    priority: 0,
1040                    spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
1041                    status: models::StepStatus::Succeeded,
1042                    attempt: 1,
1043                    input_snapshot: None,
1044                    provider: None,
1045                    provider_version: None,
1046                    pending_dependencies: 0,
1047                    total_dependencies: 0,
1048                    estimated_tokens: 0,
1049                    estimated_cost: 0.0,
1050                    actual_tokens: None,
1051                    actual_cost: None,
1052                    ..models::NewStep::default()
1053                }],
1054                &[],
1055            )
1056            .await?;
1057
1058        let mut tx = storage.pool().begin().await?;
1059        storage
1060            .insert_step_attempt_tx(
1061                &mut tx,
1062                models::NewStepAttempt {
1063                    run_id,
1064                    step_id,
1065                    attempt: 1,
1066                    status: models::StepStatus::Succeeded,
1067                    latency_ms: None,
1068                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
1069                    input_snapshot: None,
1070                    output_snapshot: Some(output.clone()),
1071                    error_snapshot: None,
1072                    checkpoint: None,
1073                    budget_snapshot: None,
1074                    guardrail_events: None,
1075                    tool_events: Some(baseline_tool_events.clone()),
1076                    artifacts: None,
1077                },
1078            )
1079            .await?;
1080        storage
1081            .insert_step_attempt_tx(
1082                &mut tx,
1083                models::NewStepAttempt {
1084                    run_id,
1085                    step_id,
1086                    attempt: 2,
1087                    status: models::StepStatus::Succeeded,
1088                    latency_ms: None,
1089                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
1090                    input_snapshot: None,
1091                    output_snapshot: Some(output.clone()),
1092                    error_snapshot: None,
1093                    checkpoint: None,
1094                    budget_snapshot: None,
1095                    guardrail_events: None,
1096                    tool_events: Some(latest_tool_events.clone()),
1097                    artifacts: None,
1098                },
1099            )
1100            .await?;
1101        storage
1102            .update_step_status_tx(
1103                &mut tx,
1104                run_id,
1105                step_id,
1106                models::StepStatus::Succeeded,
1107                Some(2),
1108                Some(output.clone()),
1109                None,
1110                None,
1111                Some(output.clone()),
1112                None,
1113                None,
1114                None,
1115            )
1116            .await?;
1117        tx.commit().await?;
1118
1119        let executor = ReplayExecutor::new(storage.clone());
1120        let err = executor
1121            .replay_run(
1122                RunId(run_id),
1123                ReplayStrategy::Mocked,
1124                DriftTolerance {
1125                    tokens: 0.0,
1126                    cost: 0.0,
1127                },
1128            )
1129            .await
1130            .expect_err("tool event mismatch must fail replay");
1131        assert!(
1132            err.to_string().contains("replay_tool_diff"),
1133            "error should reference replay_tool_diff artifact: {err:?}"
1134        );
1135
1136        drop(container);
1137        Ok(())
1138    }
1139
1140    #[tokio::test]
1141    async fn mocked_replay_requires_snapshots() {
1142        if std::fs::metadata("/var/run/docker.sock").is_err()
1143            && std::env::var("DOCKER_HOST").is_err()
1144        {
1145            warn!("Skipping replay test because Docker is unavailable");
1146            return;
1147        }
1148
1149        let docker = Cli::default();
1150        let container = docker.run(Postgres::default());
1151        let port = container.get_host_port_ipv4(5432);
1152        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
1153        sleep(Duration::from_millis(250)).await;
1154
1155        let storage = Arc::new(
1156            Storage::connect(StorageConfig {
1157                database_url,
1158                max_connections: 2,
1159                connect_timeout: Duration::from_secs(10),
1160                object_store: ObjectStoreConfig::InMemory,
1161            })
1162            .await
1163            .expect("storage connection"),
1164        );
1165
1166        let run_id = Uuid::new_v4();
1167        let step_id = Uuid::new_v4();
1168
1169        let _ = storage
1170            .insert_run_with_steps(
1171                models::NewRun {
1172                    run_id,
1173                    status: models::RunStatus::Pending,
1174                    dag_json: json!({"steps": [], "edges": []}),
1175                    input_ctx: json!({}),
1176                    seed: 7,
1177                    idempotency_key: None,
1178                },
1179                &[models::NewStep {
1180                    run_id,
1181                    step_id,
1182                    idx: 0,
1183                    priority: 0,
1184                    spec_json: json!({"id": step_id, "type": "map", "inputs": {}}),
1185                    status: models::StepStatus::Queued,
1186                    attempt: 0,
1187                    input_snapshot: None,
1188                    provider: None,
1189                    provider_version: None,
1190                    pending_dependencies: 0,
1191                    total_dependencies: 0,
1192                    estimated_tokens: 0,
1193                    estimated_cost: 0.0,
1194                    actual_tokens: None,
1195                    actual_cost: None,
1196                    ..models::NewStep::default()
1197                }],
1198                &[],
1199            )
1200            .await
1201            .expect("seed run");
1202
1203        let mut tx = storage.pool().begin().await.expect("begin tx");
1204        storage
1205            .insert_step_attempt_tx(
1206                &mut tx,
1207                models::NewStepAttempt {
1208                    run_id,
1209                    step_id,
1210                    attempt: 1,
1211                    status: models::StepStatus::Queued,
1212                    latency_ms: None,
1213                    spec_snapshot: json!({"id": step_id, "type": "map", "inputs": {}}),
1214                    input_snapshot: None,
1215                    output_snapshot: None,
1216                    error_snapshot: None,
1217                    checkpoint: None,
1218                    budget_snapshot: None,
1219                    guardrail_events: None,
1220                    tool_events: None,
1221                    artifacts: None,
1222                },
1223            )
1224            .await
1225            .expect("insert attempt");
1226        tx.commit().await.expect("commit");
1227
1228        let executor = ReplayExecutor::new(storage);
1229        let err = executor
1230            .replay_run(
1231                RunId(run_id),
1232                ReplayStrategy::Mocked,
1233                DriftTolerance::default(),
1234            )
1235            .await
1236            .expect_err("missing snapshots should error");
1237
1238        assert!(err
1239            .to_string()
1240            .contains("missing output snapshots; cannot run mocked replay"));
1241        drop(container);
1242    }
1243}