fleetforge_storage/
memory.rs

1use std::collections::HashMap;
2use std::sync::RwLock;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::Utc;
7use uuid::Uuid;
8
9use crate::models;
10use crate::Storage;
11
/// Minimal contract for benches and fast unit tests that need storage semantics
/// without a live database.
///
/// This file provides two implementations: [`InMemoryStorage`] and the
/// crate's [`Storage`] type. The per-method notes below describe what the
/// in-memory implementation in this file does; the `Storage` implementation
/// only forwards, so its exact behaviour is defined elsewhere.
#[async_trait]
pub trait RunStore: Send + Sync {
    /// Stores `new_run` together with its `steps` and returns a run id.
    ///
    /// `edges` is a list of step-id pairs. The in-memory implementation counts
    /// how often each step id appears as the *second* element of a pair and
    /// uses that count as the step's dependency count.
    /// NOTE(review): presumably each pair is `(prerequisite, dependent)` —
    /// confirm against the `Storage` implementation.
    ///
    /// In the in-memory implementation, a run whose idempotency key matches an
    /// already stored run is not stored again; the id of the existing run is
    /// returned instead.
    async fn insert_run_with_steps(
        &self,
        new_run: models::NewRun,
        steps: &[models::NewStep],
        edges: &[(Uuid, Uuid)],
    ) -> Result<Uuid>;

    /// Fetches the run stored under `run_id`.
    ///
    /// The in-memory implementation returns `Ok(None)` when no such run exists.
    async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>>;

    /// Fetches the steps that belong to the run stored under `run_id`.
    ///
    /// The in-memory implementation returns the steps ordered by `idx` and an
    /// empty vector when the run is unknown.
    async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>>;
}
27
/// A run and the steps that were inserted alongside it; the value type of the
/// map held by `InMemoryStorage`.
struct StoredRun {
    /// The materialised run record.
    run: models::Run,
    /// The run's steps in insertion order; readers sort by `idx` when fetching.
    steps: Vec<models::Step>,
}
32
/// Simple in-memory implementation useful for benches and unit tests.
///
/// All data lives in a single map behind a `std::sync::RwLock`; nothing is
/// persisted.
#[derive(Default)]
pub struct InMemoryStorage {
    /// Stored runs keyed by run id.
    inner: RwLock<HashMap<Uuid, StoredRun>>,
}
38
39impl InMemoryStorage {
40    pub fn new() -> Self {
41        Self {
42            inner: RwLock::new(HashMap::new()),
43        }
44    }
45
46    fn find_by_key(&self, key: &str) -> Option<Uuid> {
47        let guard = self.inner.read().unwrap();
48        guard
49            .iter()
50            .find(|(_, stored)| stored.run.idempotency_key.as_deref() == Some(key))
51            .map(|(run_id, _)| *run_id)
52    }
53
54    fn materialise_step(new_step: &models::NewStep) -> models::Step {
55        models::Step {
56            run_id: new_step.run_id,
57            step_id: new_step.step_id,
58            idx: new_step.idx,
59            priority: new_step.priority,
60            spec_json: new_step.spec_json.clone(),
61            status: new_step.status,
62            attempt: new_step.attempt,
63            created_at: Utc::now(),
64            max_attempts: new_step.max_attempts,
65            retry_backoff_ms: new_step.retry_backoff_ms,
66            retry_backoff_factor: new_step.retry_backoff_factor,
67            not_before: new_step.not_before,
68            deadline_at: new_step.deadline_at,
69            pending_dependencies: new_step.pending_dependencies,
70            total_dependencies: new_step.total_dependencies,
71            estimated_tokens: new_step.estimated_tokens,
72            estimated_cost: new_step.estimated_cost,
73            actual_tokens: new_step.actual_tokens,
74            actual_cost: new_step.actual_cost,
75            leased_by: None,
76            lease_expires_at: None,
77            output_json: None,
78            error_json: None,
79            input_snapshot: new_step.input_snapshot.clone(),
80            output_snapshot: None,
81            provider: new_step.provider.clone(),
82            provider_version: new_step.provider_version.clone(),
83            checkpoint: new_step.checkpoint.clone(),
84            compensation_step: new_step.compensation_step,
85            compensation_scheduled: new_step.compensation_scheduled,
86        }
87    }
88
89    fn materialise_run(new_run: &models::NewRun) -> models::Run {
90        models::Run {
91            run_id: new_run.run_id,
92            created_at: Utc::now(),
93            status: new_run.status,
94            dag_json: new_run.dag_json.clone(),
95            input_ctx: new_run.input_ctx.clone(),
96            seed: new_run.seed,
97            idempotency_key: new_run.idempotency_key.clone(),
98        }
99    }
100}
101
102#[async_trait]
103impl RunStore for InMemoryStorage {
104    async fn insert_run_with_steps(
105        &self,
106        new_run: models::NewRun,
107        steps: &[models::NewStep],
108        edges: &[(Uuid, Uuid)],
109    ) -> Result<Uuid> {
110        if let Some(key) = new_run.idempotency_key.as_deref() {
111            if let Some(existing) = self.find_by_key(key) {
112                return Ok(existing);
113            }
114        }
115
116        let run_id = new_run.run_id;
117        let mut stored_steps: Vec<models::Step> =
118            steps.iter().map(Self::materialise_step).collect();
119
120        if !edges.is_empty() {
121            let mut successors: HashMap<Uuid, i32> = HashMap::new();
122            for (_, to) in edges {
123                *successors.entry(*to).or_insert(0) += 1;
124            }
125            for step in &mut stored_steps {
126                if let Some(count) = successors.get(&step.step_id) {
127                    step.pending_dependencies = *count;
128                    step.total_dependencies = *count;
129                }
130            }
131        }
132
133        let run = Self::materialise_run(&new_run);
134        let stored = StoredRun {
135            run,
136            steps: stored_steps,
137        };
138
139        let mut guard = self.inner.write().unwrap();
140        guard.insert(run_id, stored);
141        Ok(run_id)
142    }
143
144    async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>> {
145        let guard = self.inner.read().unwrap();
146        Ok(guard.get(&run_id).map(|stored| stored.run.clone()))
147    }
148
149    async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>> {
150        let guard = self.inner.read().unwrap();
151        Ok(guard
152            .get(&run_id)
153            .map(|stored| {
154                let mut steps = stored.steps.clone();
155                steps.sort_by_key(|step| step.idx);
156                steps
157            })
158            .unwrap_or_default())
159    }
160}
161
/// Lets the crate's `Storage` type be used wherever a [`RunStore`] is expected
/// by forwarding every trait method to the same-named `Storage` function.
#[async_trait]
impl RunStore for Storage {
    /// Forwards to `Storage::insert_run_with_steps` and returns its result.
    async fn insert_run_with_steps(
        &self,
        new_run: models::NewRun,
        steps: &[models::NewStep],
        edges: &[(Uuid, Uuid)],
    ) -> Result<Uuid> {
        // NOTE(review): the explicit `Storage::…(self, …)` path presumably
        // targets an inherent method on `Storage` defined outside this file.
        // If no such inherent method exists, the path would resolve to this
        // trait method and recurse — confirm.
        Storage::insert_run_with_steps(self, new_run, steps, edges).await
    }

    /// Forwards to `Storage::fetch_run` and returns its result.
    async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>> {
        Storage::fetch_run(self, run_id).await
    }

    /// Forwards to `Storage::fetch_steps_for_run` and returns its result.
    async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>> {
        Storage::fetch_steps_for_run(self, run_id).await
    }
}