1use std::collections::{BTreeMap, HashMap};
4use std::sync::Arc;
5
6use anyhow::{anyhow, bail, Context, Result};
7use bytes::Bytes;
8use hex;
9use serde_json::{json, Map, Value};
10use tracing::{info, warn};
11use uuid::Uuid;
12
13use crate::model::{RunId, StepId};
14use fleetforge_storage::{models, ObjectStoreConfig, Storage, StorageConfig};
15
16#[derive(Debug, Clone, Copy)]
18pub enum ReplayStrategy {
19 Mocked,
21 Live,
23}
24
25#[derive(Debug, Clone, Copy)]
27pub struct DriftTolerance {
28 pub tokens: f64,
29 pub cost: f64,
30}
31
32impl Default for DriftTolerance {
33 fn default() -> Self {
34 Self {
35 tokens: 0.0,
36 cost: 0.0,
37 }
38 }
39}
40
/// Aggregate result of replaying one run.
#[derive(Debug)]
pub struct ReplayOutcome {
    // Run that was replayed.
    pub run_id: RunId,
    // Strategy actually used (always `Mocked` today; `Live` falls back).
    pub strategy: ReplayStrategy,
    // Per-step replay details, in step order.
    pub steps: Vec<StepReplay>,
    // True when every step's drift stayed within the requested tolerance.
    pub all_within_tolerance: bool,
}
49
/// Replay details for a single step.
#[derive(Debug)]
pub struct StepReplay {
    // Step that was replayed.
    pub step_id: StepId,
    // The recorded (baseline) output snapshot the replay was driven from.
    pub snapshot: Value,
    // Drift between baseline and latest attempt, when computed.
    pub drift: Option<StepDrift>,
}
57
/// Measured divergence between a step's recorded and current output snapshots.
#[derive(Debug)]
pub struct StepDrift {
    // Baseline output snapshot.
    pub recorded: Value,
    // Latest output snapshot.
    pub current: Value,
    // current - recorded token count, when both sides expose one.
    pub token_delta: Option<f64>,
    // current - recorded cost, when both sides expose one.
    pub cost_delta: Option<f64>,
    // True when deltas fit the tolerance AND the non-metric structure matches.
    pub within_tolerance: bool,
}
67
/// Replays recorded runs from storage and reports drift.
pub struct ReplayExecutor {
    // Backing store for steps, attempts, and diff artifacts.
    storage: Arc<Storage>,
}
72
73impl ReplayExecutor {
74 pub fn new(storage: Arc<Storage>) -> Self {
76 Self { storage }
77 }
78
79 pub async fn replay_run(
81 &self,
82 run_id: RunId,
83 strategy: ReplayStrategy,
84 tolerance: DriftTolerance,
85 ) -> Result<ReplayOutcome> {
86 let steps = self.storage.fetch_steps_for_run(Uuid::from(run_id)).await?;
87 let attempt_rows = self
88 .storage
89 .fetch_step_attempts_for_run(Uuid::from(run_id))
90 .await?;
91 let mut attempts_by_step: HashMap<Uuid, Vec<models::StepAttempt>> = HashMap::new();
92 for attempt in attempt_rows {
93 attempts_by_step
94 .entry(attempt.step_id)
95 .or_default()
96 .push(attempt);
97 }
98 for attempts in attempts_by_step.values_mut() {
99 attempts.sort_by_key(|attempt| attempt.attempt);
100 }
101
102 match strategy {
103 ReplayStrategy::Mocked => {
104 self.replay_mocked(run_id, steps, attempts_by_step, tolerance)
105 .await
106 }
107 ReplayStrategy::Live => {
108 warn!(
109 run = %run_id,
110 "live replay not yet implemented; falling back to mocked replay"
111 );
112 self.replay_mocked(run_id, steps, attempts_by_step, tolerance)
113 .await
114 }
115 }
116 }
117}
118
119impl ReplayExecutor {
120 async fn replay_mocked(
121 &self,
122 run_id: RunId,
123 steps: Vec<models::Step>,
124 attempts_by_step: HashMap<Uuid, Vec<models::StepAttempt>>,
125 tolerance: DriftTolerance,
126 ) -> Result<ReplayOutcome> {
127 let mut attempts_by_step = attempts_by_step;
128 let mut replays = Vec::with_capacity(steps.len());
129 let mut all_within = true;
130
131 for step in steps {
132 info!(
133 run = %run_id,
134 step = %step.step_id,
135 "mock replay using recorded snapshot"
136 );
137
138 let attempt_list = attempts_by_step
139 .get(&step.step_id)
140 .ok_or_else(|| anyhow!("no attempts recorded for step {}", step.step_id))?;
141 let (baseline, latest) = select_attempt_pair(attempt_list).ok_or_else(|| {
142 anyhow!(
143 "step {} missing output snapshots; cannot run mocked replay",
144 step.step_id
145 )
146 })?;
147
148 let recorded_snapshot = baseline.output_snapshot.clone().ok_or_else(|| {
149 anyhow!(
150 "attempt {} for step {} missing output_snapshot; cannot replay",
151 baseline.attempt,
152 step.step_id
153 )
154 })?;
155 let current_snapshot = latest.output_snapshot.clone().ok_or_else(|| {
156 anyhow!(
157 "latest attempt {} for step {} missing output_snapshot; cannot replay",
158 latest.attempt,
159 step.step_id
160 )
161 })?;
162
163 let mut drift = compute_drift(&recorded_snapshot, ¤t_snapshot, tolerance);
164
165 let recorded_attestations = extract_attestation_ids(&recorded_snapshot);
166 let current_attestations = extract_attestation_ids(¤t_snapshot);
167 if recorded_attestations != current_attestations {
168 let diff_payload = build_attestation_diff_payload(
169 run_id,
170 StepId(step.step_id),
171 &recorded_attestations,
172 ¤t_attestations,
173 );
174 let diff_sha = self
175 .persist_replay_diff(
176 run_id,
177 StepId(step.step_id),
178 "replay_attestation_diff",
179 diff_payload,
180 )
181 .await?;
182 warn!(
183 run = %run_id,
184 step = %step.step_id,
185 recorded = ?recorded_attestations,
186 current = ?current_attestations,
187 diff_sha256 = %diff_sha,
188 "attestation ids diverged during replay"
189 );
190 bail!(
191 "attestation ids diverged during replay for step {} (kind=replay_attestation_diff, diff_sha256={})",
192 step.step_id,
193 diff_sha
194 );
195 }
196
197 if !tool_events_identical(baseline.tool_events.as_ref(), latest.tool_events.as_ref()) {
198 let diff_payload = build_tool_event_diff_payload(
199 run_id,
200 StepId(step.step_id),
201 baseline.tool_events.as_ref(),
202 latest.tool_events.as_ref(),
203 );
204 let diff_sha = self
205 .persist_replay_diff(
206 run_id,
207 StepId(step.step_id),
208 "replay_tool_diff",
209 diff_payload,
210 )
211 .await?;
212 warn!(
213 run = %run_id,
214 step = %step.step_id,
215 diff_sha256 = %diff_sha,
216 "tool events diverged during replay"
217 );
218 bail!(
219 "tool events diverged during replay for step {} (kind=replay_tool_diff, diff_sha256={})",
220 step.step_id,
221 diff_sha
222 );
223 }
224
225 if !drift.within_tolerance {
226 all_within = false;
227 }
228
229 replays.push(StepReplay {
230 step_id: StepId(step.step_id),
231 snapshot: recorded_snapshot,
232 drift: Some(drift),
233 });
234 }
235
236 Ok(ReplayOutcome {
237 run_id,
238 strategy: ReplayStrategy::Mocked,
239 steps: replays,
240 all_within_tolerance: all_within,
241 })
242 }
243}
244
245fn compute_drift(recorded: &Value, current: &Value, tolerance: DriftTolerance) -> StepDrift {
246 let token_delta = diff_numeric(
247 recorded,
248 current,
249 &["tokens", "token_count", "total_tokens"],
250 );
251 let cost_delta = diff_numeric(recorded, current, &["cost", "cost_usd", "total_cost"]);
252
253 let recorded_tokens = find_numeric(recorded, &["tokens", "token_count", "total_tokens"]);
254 let recorded_cost = find_numeric(recorded, &["cost", "cost_usd", "total_cost"]);
255 let sanitized_recorded = strip_keys(
256 recorded,
257 &[
258 "tokens",
259 "token_count",
260 "total_tokens",
261 "cost",
262 "cost_usd",
263 "total_cost",
264 ],
265 );
266 let sanitized_current = strip_keys(
267 current,
268 &[
269 "tokens",
270 "token_count",
271 "total_tokens",
272 "cost",
273 "cost_usd",
274 "total_cost",
275 ],
276 );
277
278 let tokens_within = match (token_delta, recorded_tokens) {
279 (Some(delta), Some(base)) if base.abs() > f64::EPSILON => {
280 (delta / base).abs() <= tolerance.tokens
281 }
282 (Some(delta), Some(_)) => delta.abs() <= tolerance.tokens,
283 (Some(delta), None) => delta.abs() <= tolerance.tokens,
284 _ => true,
285 };
286 let cost_within = match (cost_delta, recorded_cost) {
287 (Some(delta), Some(base)) if base.abs() > f64::EPSILON => {
288 (delta / base).abs() <= tolerance.cost
289 }
290 (Some(delta), Some(_)) => delta.abs() <= tolerance.cost,
291 (Some(delta), None) => delta.abs() <= tolerance.cost,
292 _ => true,
293 };
294
295 let structure_equal = sanitized_recorded == sanitized_current;
296
297 StepDrift {
298 recorded: recorded.clone(),
299 current: current.clone(),
300 token_delta,
301 cost_delta,
302 within_tolerance: tokens_within && cost_within && structure_equal,
303 }
304}
305
306fn diff_numeric(recorded: &Value, current: &Value, keys: &[&str]) -> Option<f64> {
307 let recorded_value = find_numeric(recorded, keys)?;
308 let current_value = find_numeric(current, keys)?;
309 Some(current_value - recorded_value)
310}
311
312fn find_numeric(value: &Value, keys: &[&str]) -> Option<f64> {
313 match value {
314 Value::Object(map) => {
315 for key in keys {
316 if let Some(v) = map.get(*key) {
317 if let Some(num) = v.as_f64() {
318 return Some(num);
319 }
320 }
321 }
322 for v in map.values() {
323 if let Some(found) = find_numeric(v, keys) {
324 return Some(found);
325 }
326 }
327 None
328 }
329 Value::Array(items) => {
330 for item in items {
331 if let Some(found) = find_numeric(item, keys) {
332 return Some(found);
333 }
334 }
335 None
336 }
337 _ => None,
338 }
339}
340
341fn strip_keys(value: &Value, keys: &[&str]) -> Value {
342 match value {
343 Value::Object(map) => {
344 let mut new_map = serde_json::Map::new();
345 for (k, v) in map {
346 if keys.iter().any(|key| key == k) {
347 continue;
348 }
349 new_map.insert(k.clone(), strip_keys(v, keys));
350 }
351 Value::Object(new_map)
352 }
353 Value::Array(items) => Value::Array(items.iter().map(|v| strip_keys(v, keys)).collect()),
354 _ => value.clone(),
355 }
356}
357
358fn select_attempt_pair(
359 attempts: &[models::StepAttempt],
360) -> Option<(&models::StepAttempt, &models::StepAttempt)> {
361 let baseline = attempts
362 .iter()
363 .find(|attempt| attempt.output_snapshot.is_some())?;
364 let latest = attempts
365 .iter()
366 .rev()
367 .find(|attempt| attempt.output_snapshot.is_some())
368 .unwrap_or(baseline);
369 Some((baseline, latest))
370}
371
372fn extract_attestation_ids(value: &Value) -> Vec<String> {
373 match value {
374 Value::Object(map) => map
375 .get("attestation_ids")
376 .and_then(Value::as_array)
377 .map(|arr| {
378 arr.iter()
379 .filter_map(Value::as_str)
380 .map(|s| s.to_string())
381 .collect()
382 })
383 .unwrap_or_default(),
384 _ => Vec::new(),
385 }
386}
387
388fn tool_events_identical(recorded: Option<&Value>, current: Option<&Value>) -> bool {
389 match (recorded, current) {
390 (None, None) => true,
391 (Some(a), Some(b)) => canonicalize(a) == canonicalize(b),
392 _ => false,
393 }
394}
395
396fn canonicalize(value: &Value) -> Value {
397 match value {
398 Value::Object(map) => {
399 let mut ordered = BTreeMap::new();
400 for (key, val) in map {
401 ordered.insert(key.clone(), canonicalize(val));
402 }
403 let mut object = Map::new();
404 for (key, val) in ordered {
405 object.insert(key, val);
406 }
407 Value::Object(object)
408 }
409 Value::Array(items) => Value::Array(items.iter().map(|item| canonicalize(item)).collect()),
410 _ => value.clone(),
411 }
412}
413
414impl ReplayExecutor {
415 async fn persist_replay_diff(
416 &self,
417 run_id: RunId,
418 step_id: StepId,
419 kind: &'static str,
420 payload: Value,
421 ) -> Result<String> {
422 let metadata = json!({
423 "kind": kind,
424 "run_id": Uuid::from(run_id).to_string(),
425 "step_id": Uuid::from(step_id).to_string(),
426 });
427 let serialized =
428 serde_json::to_vec(&payload).context("failed to serialize replay diff payload")?;
429 let artifact = self
430 .storage
431 .artifacts()
432 .put(Bytes::from(serialized), "application/json", metadata)
433 .await
434 .context("failed to persist replay diff artifact")?;
435 Ok(hex::encode(artifact.sha256))
436 }
437}
438
439fn build_attestation_diff_payload(
440 run_id: RunId,
441 step_id: StepId,
442 recorded: &[String],
443 current: &[String],
444) -> Value {
445 let (missing, unexpected) = diff_attestations(recorded, current);
446 json!({
447 "run_id": Uuid::from(run_id).to_string(),
448 "step_id": Uuid::from(step_id).to_string(),
449 "recorded": recorded,
450 "current": current,
451 "missing": missing,
452 "unexpected": unexpected,
453 })
454}
455
/// Symmetric difference of two attestation-id lists:
/// `(recorded - current, current - recorded)`, preserving input order.
/// Quadratic, which is fine for the handful of ids a step carries.
fn diff_attestations(recorded: &[String], current: &[String]) -> (Vec<String>, Vec<String>) {
    let only_in = |these: &[String], those: &[String]| -> Vec<String> {
        these
            .iter()
            .filter(|value| !those.contains(value))
            .cloned()
            .collect()
    };
    (only_in(recorded, current), only_in(current, recorded))
}
470
471fn build_tool_event_diff_payload(
472 run_id: RunId,
473 step_id: StepId,
474 recorded: Option<&Value>,
475 current: Option<&Value>,
476) -> Value {
477 json!({
478 "run_id": Uuid::from(run_id).to_string(),
479 "step_id": Uuid::from(step_id).to_string(),
480 "recorded": recorded.unwrap_or(&Value::Null),
481 "current": current.unwrap_or(&Value::Null),
482 })
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use std::time::Duration;
    use testcontainers::clients::Cli;
    use testcontainers::images::postgres::Postgres;
    use tokio::time::sleep;

    // NOTE(review): every test below spins up a throwaway Postgres via
    // testcontainers and skips itself when Docker is unreachable. The fixed
    // 250ms sleep after container start presumably papers over Postgres
    // startup time — TODO confirm whether Storage::connect retries would make
    // it unnecessary.

    /// Happy path: a single step whose baseline and latest attempt are the
    /// same snapshot must replay with zero drift under zero tolerance.
    #[tokio::test]
    async fn mocked_replay_parity() -> Result<()> {
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            warn!("Skipping replay test because Docker is unavailable");
            return Ok(());
        }

        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );

        // Seed a run with one queued step.
        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();
        let spec_json = json!({
            "id": step_id,
            "type": "map",
            "inputs": {"task": "demo"},
            "policy": {"budget": {"tokens": 600, "cost": 0.002}}
        });

        let _ = storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({"steps": [spec_json.clone()], "edges": []}),
                    input_ctx: json!({}),
                    seed: 42,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 5,
                    spec_json: spec_json.clone(),
                    status: models::StepStatus::Queued,
                    attempt: 0,
                    input_snapshot: None,
                    provider: Some("fleet.map".to_string()),
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 600,
                    estimated_cost: 0.002,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await?;

        let output = json!({
            "message": "Weather summary",
            "metrics": {
                "tokens": 600,
                "cost": 0.002
            }
        });

        // Mark the step succeeded and record a single attempt carrying the
        // output snapshot the replay will run from.
        let mut tx = storage.pool().begin().await?;
        storage
            .update_step_status_tx(
                &mut tx,
                run_id,
                step_id,
                models::StepStatus::Succeeded,
                Some(1),
                Some(output.clone()),
                None,
                None,
                Some(output.clone()),
                None,
                Some("fleet.map"),
                None,
            )
            .await?;
        storage
            .update_step_usage_tx(&mut tx, run_id, step_id, Some(600), Some(0.002))
            .await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 1,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: spec_json.clone(),
                    input_snapshot: None,
                    output_snapshot: Some(output.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: None,
                    artifacts: None,
                },
            )
            .await?;
        tx.commit().await?;

        // Replay with zero tolerance: identical snapshots must pass.
        let executor = ReplayExecutor::new(storage.clone());
        let result = executor
            .replay_run(
                RunId(run_id),
                ReplayStrategy::Mocked,
                DriftTolerance {
                    tokens: 0.0,
                    cost: 0.0,
                },
            )
            .await?;

        assert!(result.all_within_tolerance);
        assert_eq!(result.steps.len(), 1);
        assert!(result.steps[0].drift.as_ref().unwrap().within_tolerance);

        drop(container);
        Ok(())
    }

    /// Drift of 0.5 tokens on a 100-token baseline (0.5%) and 0.009 cost on a
    /// 1.00 baseline (0.9%) must pass a 1% relative tolerance.
    #[tokio::test]
    async fn mocked_replay_within_one_percent() -> Result<()> {
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            warn!("Skipping replay test because Docker is unavailable");
            return Ok(());
        }

        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        // Baseline and drifted snapshots: same structure, slightly different
        // metrics.
        let recorded_snapshot = json!({
            "message": "Original output",
            "metrics": {
                "tokens": 100,
                "cost": 1.00
            }
        });

        let actual_output = json!({
            "message": "Original output",
            "metrics": {
                "tokens": 100.5,
                "cost": 1.009
            }
        });

        let _ = storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({
                        "steps": [{
                            "id": step_id,
                            "type": "tool",
                            "inputs": {"prompt": "demo"},
                            "policy": {"budget": {"tokens": 100, "cost": 1.0}}
                        }],
                        "edges": []
                    }),
                    input_ctx: json!({}),
                    seed: 1,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 0,
                    spec_json: json!({
                        "id": step_id,
                        "type": "tool",
                        "inputs": {"prompt": "demo"},
                        "policy": {"budget": {"tokens": 100, "cost": 1.0}}
                    }),
                    status: models::StepStatus::Succeeded,
                    attempt: 1,
                    input_snapshot: Some(json!({"prompt": "demo"})),
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 100,
                    estimated_cost: 1.0,
                    actual_tokens: Some(101),
                    actual_cost: Some(1.02),
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await?;

        // Two attempts: attempt 1 is the baseline, attempt 2 the drifted
        // re-execution.
        let mut tx = storage.pool().begin().await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 1,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: json!({
                        "id": step_id,
                        "type": "tool",
                        "inputs": {"prompt": "demo"},
                        "policy": {"budget": {"tokens": 100, "cost": 1.0}}
                    }),
                    input_snapshot: Some(json!({"prompt": "demo"})),
                    output_snapshot: Some(recorded_snapshot.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: None,
                    artifacts: None,
                },
            )
            .await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 2,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: json!({
                        "id": step_id,
                        "type": "tool",
                        "inputs": {"prompt": "demo"},
                        "policy": {"budget": {"tokens": 100, "cost": 1.0}}
                    }),
                    input_snapshot: Some(json!({"prompt": "demo"})),
                    output_snapshot: Some(actual_output.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: None,
                    artifacts: None,
                },
            )
            .await?;
        storage
            .update_step_status_tx(
                &mut tx,
                run_id,
                step_id,
                models::StepStatus::Succeeded,
                Some(2),
                Some(actual_output.clone()),
                None,
                Some(json!({"prompt": "demo"})),
                Some(actual_output.clone()),
                None,
                None,
                None,
            )
            .await?;
        storage
            .update_step_usage_tx(&mut tx, run_id, step_id, Some(101), Some(1.02))
            .await?;
        tx.commit().await?;

        let executor = ReplayExecutor::new(storage.clone());
        let outcome = executor
            .replay_run(
                RunId(run_id),
                ReplayStrategy::Mocked,
                DriftTolerance {
                    tokens: 0.01,
                    cost: 0.01,
                },
            )
            .await?;

        assert!(
            outcome.all_within_tolerance,
            "replay outcome exceeded tolerance"
        );
        let drift = outcome
            .steps
            .first()
            .and_then(|step| step.drift.as_ref())
            .expect("drift metadata present");
        assert!(
            drift.within_tolerance,
            "drift should be within tolerance: {drift:?}"
        );

        drop(container);
        Ok(())
    }

    /// Two attempts with different `attestation_ids` must abort the replay
    /// with an error naming the persisted `replay_attestation_diff` artifact.
    #[tokio::test]
    async fn mocked_replay_detects_attestation_mismatch() -> Result<()> {
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            warn!("Skipping replay test because Docker is unavailable");
            return Ok(());
        }

        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        // Same result payload, but freshly generated (therefore different)
        // attestation ids on each attempt.
        let baseline_output = json!({
            "result": "ok",
            "attestation_ids": [Uuid::new_v4().to_string()],
        });
        let latest_output = json!({
            "result": "ok",
            "attestation_ids": [Uuid::new_v4().to_string()],
        });

        let _ = storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({"steps": [{"id": step_id, "type": "tool", "inputs": {}}], "edges": []}),
                    input_ctx: json!({}),
                    seed: 11,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 0,
                    spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
                    status: models::StepStatus::Succeeded,
                    attempt: 1,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 0,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await?;

        let mut tx = storage.pool().begin().await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 1,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
                    input_snapshot: None,
                    output_snapshot: Some(baseline_output.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: None,
                    artifacts: None,
                },
            )
            .await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 2,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
                    input_snapshot: None,
                    output_snapshot: Some(latest_output.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: None,
                    artifacts: None,
                },
            )
            .await?;
        storage
            .update_step_status_tx(
                &mut tx,
                run_id,
                step_id,
                models::StepStatus::Succeeded,
                Some(2),
                Some(latest_output.clone()),
                None,
                None,
                Some(latest_output.clone()),
                None,
                None,
                None,
            )
            .await?;
        tx.commit().await?;

        let executor = ReplayExecutor::new(storage.clone());
        let err = executor
            .replay_run(
                RunId(run_id),
                ReplayStrategy::Mocked,
                DriftTolerance {
                    tokens: 0.01,
                    cost: 0.01,
                },
            )
            .await
            .expect_err("attestation mismatch must fail replay");
        assert!(
            err.to_string().contains("replay_attestation_diff"),
            "error should reference replay_attestation_diff artifact: {err:?}"
        );

        drop(container);
        Ok(())
    }

    /// Identical output snapshots but divergent `tool_events` must abort the
    /// replay with an error naming the persisted `replay_tool_diff` artifact.
    #[tokio::test]
    async fn mocked_replay_detects_tool_event_mismatch() -> Result<()> {
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            warn!("Skipping replay test because Docker is unavailable");
            return Ok(());
        }

        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        let output = json!({
            "result": "value",
            "attestation_ids": [Uuid::new_v4().to_string()],
        });

        // Same event kind, different payload/trace — structurally divergent.
        let baseline_tool_events = json!([{
            "kind": "tool.run",
            "payload": {"stdout": "hello"},
            "trace": {"span_id": "abc"}
        }]);
        let latest_tool_events = json!([{
            "kind": "tool.run",
            "payload": {"stdout": "goodbye"},
            "trace": {"span_id": "def"}
        }]);

        let _ = storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({"steps": [{"id": step_id, "type": "tool", "inputs": {}}], "edges": []}),
                    input_ctx: json!({}),
                    seed: 21,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 0,
                    spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
                    status: models::StepStatus::Succeeded,
                    attempt: 1,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 0,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await?;

        let mut tx = storage.pool().begin().await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 1,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
                    input_snapshot: None,
                    output_snapshot: Some(output.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: Some(baseline_tool_events.clone()),
                    artifacts: None,
                },
            )
            .await?;
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 2,
                    status: models::StepStatus::Succeeded,
                    latency_ms: None,
                    spec_snapshot: json!({"id": step_id, "type": "tool", "inputs": {}}),
                    input_snapshot: None,
                    output_snapshot: Some(output.clone()),
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: Some(latest_tool_events.clone()),
                    artifacts: None,
                },
            )
            .await?;
        storage
            .update_step_status_tx(
                &mut tx,
                run_id,
                step_id,
                models::StepStatus::Succeeded,
                Some(2),
                Some(output.clone()),
                None,
                None,
                Some(output.clone()),
                None,
                None,
                None,
            )
            .await?;
        tx.commit().await?;

        let executor = ReplayExecutor::new(storage.clone());
        let err = executor
            .replay_run(
                RunId(run_id),
                ReplayStrategy::Mocked,
                DriftTolerance {
                    tokens: 0.0,
                    cost: 0.0,
                },
            )
            .await
            .expect_err("tool event mismatch must fail replay");
        assert!(
            err.to_string().contains("replay_tool_diff"),
            "error should reference replay_tool_diff artifact: {err:?}"
        );

        drop(container);
        Ok(())
    }

    /// A step whose only attempt has no output snapshot must fail replay with
    /// the "missing output snapshots" error.
    #[tokio::test]
    async fn mocked_replay_requires_snapshots() {
        if std::fs::metadata("/var/run/docker.sock").is_err()
            && std::env::var("DOCKER_HOST").is_err()
        {
            warn!("Skipping replay test because Docker is unavailable");
            return;
        }

        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("storage connection"),
        );

        let run_id = Uuid::new_v4();
        let step_id = Uuid::new_v4();

        let _ = storage
            .insert_run_with_steps(
                models::NewRun {
                    run_id,
                    status: models::RunStatus::Pending,
                    dag_json: json!({"steps": [], "edges": []}),
                    input_ctx: json!({}),
                    seed: 7,
                    idempotency_key: None,
                },
                &[models::NewStep {
                    run_id,
                    step_id,
                    idx: 0,
                    priority: 0,
                    spec_json: json!({"id": step_id, "type": "map", "inputs": {}}),
                    status: models::StepStatus::Queued,
                    attempt: 0,
                    input_snapshot: None,
                    provider: None,
                    provider_version: None,
                    pending_dependencies: 0,
                    total_dependencies: 0,
                    estimated_tokens: 0,
                    estimated_cost: 0.0,
                    actual_tokens: None,
                    actual_cost: None,
                    ..models::NewStep::default()
                }],
                &[],
            )
            .await
            .expect("seed run");

        // Attempt recorded deliberately without an output snapshot.
        let mut tx = storage.pool().begin().await.expect("begin tx");
        storage
            .insert_step_attempt_tx(
                &mut tx,
                models::NewStepAttempt {
                    run_id,
                    step_id,
                    attempt: 1,
                    status: models::StepStatus::Queued,
                    latency_ms: None,
                    spec_snapshot: json!({"id": step_id, "type": "map", "inputs": {}}),
                    input_snapshot: None,
                    output_snapshot: None,
                    error_snapshot: None,
                    checkpoint: None,
                    budget_snapshot: None,
                    guardrail_events: None,
                    tool_events: None,
                    artifacts: None,
                },
            )
            .await
            .expect("insert attempt");
        tx.commit().await.expect("commit");

        let executor = ReplayExecutor::new(storage);
        let err = executor
            .replay_run(
                RunId(run_id),
                ReplayStrategy::Mocked,
                DriftTolerance::default(),
            )
            .await
            .expect_err("missing snapshots should error");

        assert!(err
            .to_string()
            .contains("missing output snapshots; cannot run mocked replay"));
        drop(container);
    }
}