fleetforge_storage/
memory.rs1use std::collections::HashMap;
2use std::sync::RwLock;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::Utc;
7use uuid::Uuid;
8
9use crate::models;
10use crate::Storage;
11
12#[async_trait]
15pub trait RunStore: Send + Sync {
16 async fn insert_run_with_steps(
17 &self,
18 new_run: models::NewRun,
19 steps: &[models::NewStep],
20 edges: &[(Uuid, Uuid)],
21 ) -> Result<Uuid>;
22
23 async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>>;
24
25 async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>>;
26}
27
28struct StoredRun {
29 run: models::Run,
30 steps: Vec<models::Step>,
31}
32
33#[derive(Default)]
35pub struct InMemoryStorage {
36 inner: RwLock<HashMap<Uuid, StoredRun>>,
37}
38
39impl InMemoryStorage {
40 pub fn new() -> Self {
41 Self {
42 inner: RwLock::new(HashMap::new()),
43 }
44 }
45
46 fn find_by_key(&self, key: &str) -> Option<Uuid> {
47 let guard = self.inner.read().unwrap();
48 guard
49 .iter()
50 .find(|(_, stored)| stored.run.idempotency_key.as_deref() == Some(key))
51 .map(|(run_id, _)| *run_id)
52 }
53
54 fn materialise_step(new_step: &models::NewStep) -> models::Step {
55 models::Step {
56 run_id: new_step.run_id,
57 step_id: new_step.step_id,
58 idx: new_step.idx,
59 priority: new_step.priority,
60 spec_json: new_step.spec_json.clone(),
61 status: new_step.status,
62 attempt: new_step.attempt,
63 created_at: Utc::now(),
64 max_attempts: new_step.max_attempts,
65 retry_backoff_ms: new_step.retry_backoff_ms,
66 retry_backoff_factor: new_step.retry_backoff_factor,
67 not_before: new_step.not_before,
68 deadline_at: new_step.deadline_at,
69 pending_dependencies: new_step.pending_dependencies,
70 total_dependencies: new_step.total_dependencies,
71 estimated_tokens: new_step.estimated_tokens,
72 estimated_cost: new_step.estimated_cost,
73 actual_tokens: new_step.actual_tokens,
74 actual_cost: new_step.actual_cost,
75 leased_by: None,
76 lease_expires_at: None,
77 output_json: None,
78 error_json: None,
79 input_snapshot: new_step.input_snapshot.clone(),
80 output_snapshot: None,
81 provider: new_step.provider.clone(),
82 provider_version: new_step.provider_version.clone(),
83 checkpoint: new_step.checkpoint.clone(),
84 compensation_step: new_step.compensation_step,
85 compensation_scheduled: new_step.compensation_scheduled,
86 }
87 }
88
89 fn materialise_run(new_run: &models::NewRun) -> models::Run {
90 models::Run {
91 run_id: new_run.run_id,
92 created_at: Utc::now(),
93 status: new_run.status,
94 dag_json: new_run.dag_json.clone(),
95 input_ctx: new_run.input_ctx.clone(),
96 seed: new_run.seed,
97 idempotency_key: new_run.idempotency_key.clone(),
98 }
99 }
100}
101
102#[async_trait]
103impl RunStore for InMemoryStorage {
104 async fn insert_run_with_steps(
105 &self,
106 new_run: models::NewRun,
107 steps: &[models::NewStep],
108 edges: &[(Uuid, Uuid)],
109 ) -> Result<Uuid> {
110 if let Some(key) = new_run.idempotency_key.as_deref() {
111 if let Some(existing) = self.find_by_key(key) {
112 return Ok(existing);
113 }
114 }
115
116 let run_id = new_run.run_id;
117 let mut stored_steps: Vec<models::Step> =
118 steps.iter().map(Self::materialise_step).collect();
119
120 if !edges.is_empty() {
121 let mut successors: HashMap<Uuid, i32> = HashMap::new();
122 for (_, to) in edges {
123 *successors.entry(*to).or_insert(0) += 1;
124 }
125 for step in &mut stored_steps {
126 if let Some(count) = successors.get(&step.step_id) {
127 step.pending_dependencies = *count;
128 step.total_dependencies = *count;
129 }
130 }
131 }
132
133 let run = Self::materialise_run(&new_run);
134 let stored = StoredRun {
135 run,
136 steps: stored_steps,
137 };
138
139 let mut guard = self.inner.write().unwrap();
140 guard.insert(run_id, stored);
141 Ok(run_id)
142 }
143
144 async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>> {
145 let guard = self.inner.read().unwrap();
146 Ok(guard.get(&run_id).map(|stored| stored.run.clone()))
147 }
148
149 async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>> {
150 let guard = self.inner.read().unwrap();
151 Ok(guard
152 .get(&run_id)
153 .map(|stored| {
154 let mut steps = stored.steps.clone();
155 steps.sort_by_key(|step| step.idx);
156 steps
157 })
158 .unwrap_or_default())
159 }
160}
161
162#[async_trait]
163impl RunStore for Storage {
164 async fn insert_run_with_steps(
165 &self,
166 new_run: models::NewRun,
167 steps: &[models::NewStep],
168 edges: &[(Uuid, Uuid)],
169 ) -> Result<Uuid> {
170 Storage::insert_run_with_steps(self, new_run, steps, edges).await
171 }
172
173 async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>> {
174 Storage::fetch_run(self, run_id).await
175 }
176
177 async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>> {
178 Storage::fetch_steps_for_run(self, run_id).await
179 }
180}