1use std::collections::HashSet;
4use std::convert::TryInto;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::Duration;
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use serde_json::{json, Map, Value};
13use sha2::{Digest, Sha256};
14use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
15use sqlx::types::Json;
16use sqlx::{PgConnection, Pool, Postgres, QueryBuilder, Row, Transaction};
17use tracing::warn;
18use uuid::Uuid;
19
// Embedded sqlx migrations (compiled in from the crate's `./migrations`
// directory); applied during `Storage::connect` before any query runs.
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
21
22use crate::models::NewOutboxEvent;
23mod stores;
24pub use artifacts::ObjectStoreArtifactStore;
25pub use fleetforge_contracts::artifacts::{ArtifactMeta, ArtifactStore};
26pub use memory::{InMemoryStorage, RunStore};
27use stores::{DynObjectSigner, DynObjectStore};
28pub mod memory;
29pub mod retention;
30
31pub mod artifacts {
32 use std::convert::TryFrom;
33 use std::time::Duration;
34
35 use anyhow::{bail, Context, Result};
36 use async_trait::async_trait;
37 use bytes::Bytes;
38 use fleetforge_contracts::artifacts::{ArtifactMeta, ArtifactStore};
39 use object_store::path::Path;
40 use object_store::PutPayload;
41 use reqwest::Method;
42 use serde_json::Value;
43 use sha2::{Digest, Sha256};
44 use url::Url;
45
46 use super::{DynObjectSigner, DynObjectStore};
47
48 pub struct ObjectStoreArtifactStore {
50 store: DynObjectStore,
51 signer: Option<DynObjectSigner>,
52 }
53
54 impl ObjectStoreArtifactStore {
55 pub fn new(store: DynObjectStore, signer: Option<DynObjectSigner>) -> Self {
56 Self { store, signer }
57 }
58
59 fn path_for_digest(digest: &[u8; 32]) -> Path {
60 let digest_hex = hex::encode(digest);
61 let prefix_a = &digest_hex[0..2];
62 let prefix_b = &digest_hex[2..4];
63 Path::from(format!("sha256/{}/{}/{}", prefix_a, prefix_b, digest_hex))
64 }
65 }
66
67 #[async_trait]
68 impl ArtifactStore for ObjectStoreArtifactStore {
69 async fn put(
70 &self,
71 bytes: Bytes,
72 media_type: &str,
73 metadata: Value,
74 ) -> Result<ArtifactMeta> {
75 let mut hasher = Sha256::new();
76 hasher.update(&bytes);
77 let digest = hasher.finalize();
78 let mut sha256 = [0u8; 32];
79 sha256.copy_from_slice(&digest);
80
81 let path = Self::path_for_digest(&sha256);
82 let size =
83 u64::try_from(bytes.len()).context("artifact payload size does not fit in u64")?;
84
85 self.store
86 .put(&path, PutPayload::from(bytes))
87 .await
88 .context("failed to write artifact to object_store")?;
89
90 Ok(ArtifactMeta {
91 sha256,
92 media_type: media_type.to_string(),
93 size,
94 metadata,
95 })
96 }
97
98 async fn get(&self, sha256: [u8; 32]) -> Result<Bytes> {
99 let path = Self::path_for_digest(&sha256);
100 let result = self
101 .store
102 .get(&path)
103 .await
104 .context("failed to fetch artifact from object_store")?;
105 result
106 .bytes()
107 .await
108 .context("failed to collect artifact bytes from object_store")
109 }
110
111 async fn presign_get(&self, sha256: [u8; 32], ttl: Duration) -> Result<Url> {
112 let path = Self::path_for_digest(&sha256);
113 let Some(signer) = self.signer.as_ref() else {
114 bail!("artifact backend does not support presigned URLs");
115 };
116 signer
117 .signed_url(Method::GET, &path, ttl)
118 .await
119 .context("failed to create presigned artifact URL from object_store")
120 }
121 }
122}
123
/// Connection and backend settings consumed by [`Storage::connect`].
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Postgres connection string (e.g. `postgres://localhost/fleetforge`).
    pub database_url: String,
    /// Maximum number of pooled Postgres connections.
    pub max_connections: u32,
    /// Used as the pool's acquire timeout when connecting.
    pub connect_timeout: Duration,
    /// Which object-store backend to use for artifacts.
    pub object_store: ObjectStoreConfig,
}
132
133impl Default for StorageConfig {
134 fn default() -> Self {
135 let object_store = match ObjectStoreConfig::from_env() {
136 Ok(cfg) => cfg,
137 Err(err) => {
138 eprintln!("warning: falling back to in-memory artifact store: {err}");
139 ObjectStoreConfig::InMemory
140 }
141 };
142 Self {
143 database_url: std::env::var("DATABASE_URL")
144 .unwrap_or_else(|_| "postgres://localhost/fleetforge".to_string()),
145 max_connections: 10,
146 connect_timeout: Duration::from_secs(5),
147 object_store,
148 }
149 }
150}
151
/// Selects which object-store backend holds artifacts.
#[derive(Debug, Clone)]
pub enum ObjectStoreConfig {
    /// Process-local store requiring no external service.
    InMemory,
    /// Amazon S3 or any S3-compatible service (including MinIO).
    S3(S3Config),
    /// Google Cloud Storage.
    Gcs(GcsConfig),
    /// Azure Blob Storage.
    Azure(AzureConfig),
    /// Generic HTTP(S) object endpoint.
    Http(HttpConfig),
}
166
/// Settings for an S3-compatible backend.
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Custom endpoint URL (e.g. MinIO); unset to use the provider default.
    pub endpoint: Option<String>,
    pub access_key: String,
    pub secret_key: String,
    /// Optional region name.
    pub region: Option<String>,
    pub bucket: String,
    /// Use virtual-hosted-style addressing (bucket in the hostname) instead
    /// of path-style; `from_env` turns this off for MinIO-looking endpoints.
    pub virtual_hosted_style: bool,
}
177
178#[derive(Debug, Clone)]
180pub struct GcsConfig {
181 pub bucket: String,
182 pub service_account: GcsServiceAccount,
183}
184
/// Where GCS service-account credentials come from.
#[derive(Debug, Clone)]
pub enum GcsServiceAccount {
    /// Path to a service-account key file (as with GOOGLE_APPLICATION_CREDENTIALS).
    File(PathBuf),
    /// The service-account key JSON itself.
    Json(String),
}
190
/// Settings for an Azure Blob Storage backend.
#[derive(Debug, Clone)]
pub struct AzureConfig {
    /// Storage account name.
    pub account: String,
    /// Shared access key for the account.
    pub access_key: String,
    /// Blob container holding the artifacts.
    pub container: String,
}
198
/// Settings for a plain HTTP(S) object-store backend.
#[derive(Debug, Clone)]
pub struct HttpConfig {
    /// Base URL objects are addressed under.
    pub base_url: String,
}
204
205impl ObjectStoreConfig {
206 fn from_env() -> Result<Self> {
207 match std::env::var("FLEETFORGE_OBJECT_STORE")
208 .ok()
209 .as_deref()
210 .map(|s| s.to_lowercase())
211 {
212 Some(ref backend) if backend == "s3" || backend == "minio" => {
213 Ok(ObjectStoreConfig::S3(S3Config::from_env()?))
214 }
215 Some(ref backend) if backend == "gcs" => Ok(ObjectStoreConfig::Gcs(GcsConfig::from_env()?)),
216 Some(ref backend) if backend == "azure" => Ok(ObjectStoreConfig::Azure(AzureConfig::from_env()?)),
217 Some(ref backend) if backend == "http" => Ok(ObjectStoreConfig::Http(HttpConfig::from_env()?)),
218 Some(ref backend) if backend == "in_memory" => Ok(ObjectStoreConfig::InMemory),
219 Some(other) => Err(anyhow!("unsupported object store backend '{}'; expected one of in_memory, s3, gcs, azure, http", other)),
220 None => {
221 if let Ok(cfg) = S3Config::from_minio_env() {
222 return Ok(ObjectStoreConfig::S3(cfg));
223 }
224 if let Ok(cfg) = S3Config::from_env() {
225 return Ok(ObjectStoreConfig::S3(cfg));
226 }
227 Ok(ObjectStoreConfig::InMemory)
228 }
229 }
230 }
231}
232
/// Returns the value of the first listed environment variable that is set
/// to a non-empty string, or `None` if none qualifies.
fn first_env(keys: &[&str]) -> Option<String> {
    for key in keys {
        if let Ok(value) = std::env::var(key) {
            if !value.is_empty() {
                return Some(value);
            }
        }
    }
    None
}
237
/// Borrows the connection underlying an open transaction so the same
/// `sqlx::query(...).execute(...)` call shape works for both pool- and
/// transaction-scoped queries.
#[inline]
fn exec<'a>(tx: &'a mut Transaction<'_, Postgres>) -> &'a mut PgConnection {
    tx.as_mut()
}
242
243impl S3Config {
244 fn from_env() -> Result<Self> {
245 let bucket = first_env(&["FLEETFORGE_S3_BUCKET", "S3_BUCKET", "AWS_S3_BUCKET"])
246 .ok_or_else(|| {
247 anyhow!("S3 bucket not configured; set FLEETFORGE_S3_BUCKET or S3_BUCKET")
248 })?;
249 let access_key = first_env(&[
250 "FLEETFORGE_S3_ACCESS_KEY",
251 "AWS_ACCESS_KEY_ID",
252 "MINIO_ACCESS_KEY",
253 ])
254 .ok_or_else(|| {
255 anyhow!(
256 "S3 access key not configured; set FLEETFORGE_S3_ACCESS_KEY or AWS_ACCESS_KEY_ID"
257 )
258 })?;
259 let secret_key = first_env(&["FLEETFORGE_S3_SECRET_KEY", "AWS_SECRET_ACCESS_KEY", "MINIO_SECRET_KEY"])
260 .ok_or_else(|| anyhow!("S3 secret key not configured; set FLEETFORGE_S3_SECRET_KEY or AWS_SECRET_ACCESS_KEY"))?;
261 let endpoint = first_env(&[
262 "FLEETFORGE_S3_ENDPOINT",
263 "S3_ENDPOINT",
264 "AWS_ENDPOINT_URL_S3",
265 "MINIO_ENDPOINT",
266 ]);
267 let region = first_env(&[
268 "FLEETFORGE_S3_REGION",
269 "S3_REGION",
270 "AWS_REGION",
271 "MINIO_REGION",
272 ]);
273 let virtual_hosted_style = endpoint
274 .as_deref()
275 .map(|ep| !ep.contains("minio"))
276 .unwrap_or(true);
277
278 Ok(Self {
279 endpoint,
280 access_key,
281 secret_key,
282 region,
283 bucket,
284 virtual_hosted_style,
285 })
286 }
287
288 fn from_minio_env() -> Result<Self> {
289 let endpoint = std::env::var("MINIO_ENDPOINT").context("MINIO_ENDPOINT not set")?;
290 let access_key = std::env::var("MINIO_ACCESS_KEY").context("MINIO_ACCESS_KEY not set")?;
291 let secret_key = std::env::var("MINIO_SECRET_KEY").context("MINIO_SECRET_KEY not set")?;
292 let bucket = std::env::var("MINIO_BUCKET").unwrap_or_else(|_| "fleetforge".to_string());
293
294 Ok(Self {
295 endpoint: Some(endpoint),
296 access_key,
297 secret_key,
298 region: std::env::var("MINIO_REGION").ok(),
299 bucket,
300 virtual_hosted_style: false,
301 })
302 }
303}
304
305impl GcsConfig {
306 fn from_env() -> Result<Self> {
307 let bucket = first_env(&["FLEETFORGE_GCS_BUCKET", "GCS_BUCKET"]).ok_or_else(|| {
308 anyhow!("GCS bucket not configured; set FLEETFORGE_GCS_BUCKET or GCS_BUCKET")
309 })?;
310 let service_account = if let Some(json) =
311 first_env(&["GCS_SERVICE_ACCOUNT_JSON", "GOOGLE_SERVICE_ACCOUNT_JSON"])
312 {
313 GcsServiceAccount::Json(json)
314 } else if let Some(path) = first_env(&["GOOGLE_APPLICATION_CREDENTIALS"]) {
315 GcsServiceAccount::File(PathBuf::from(path))
316 } else {
317 return Err(anyhow!(
318 "GCS credentials not configured; set GCS_SERVICE_ACCOUNT_JSON or GOOGLE_APPLICATION_CREDENTIALS"
319 ));
320 };
321
322 Ok(Self {
323 bucket,
324 service_account,
325 })
326 }
327}
328
329impl AzureConfig {
330 fn from_env() -> Result<Self> {
331 let account = first_env(&["FLEETFORGE_AZURE_STORAGE_ACCOUNT", "AZURE_STORAGE_ACCOUNT"])
332 .ok_or_else(|| anyhow!("Azure storage account not configured; set FLEETFORGE_AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCOUNT"))?;
333 let access_key = first_env(&["FLEETFORGE_AZURE_STORAGE_KEY", "AZURE_STORAGE_KEY"])
334 .ok_or_else(|| anyhow!("Azure storage key not configured; set FLEETFORGE_AZURE_STORAGE_KEY or AZURE_STORAGE_KEY"))?;
335 let container = first_env(&["FLEETFORGE_AZURE_CONTAINER", "AZURE_STORAGE_CONTAINER", "AZURE_CONTAINER"])
336 .ok_or_else(|| anyhow!("Azure container not configured; set FLEETFORGE_AZURE_CONTAINER or AZURE_STORAGE_CONTAINER"))?;
337
338 Ok(Self {
339 account,
340 access_key,
341 container,
342 })
343 }
344}
345
346impl HttpConfig {
347 fn from_env() -> Result<Self> {
348 let base_url = first_env(&["FLEETFORGE_HTTP_OBJECT_STORE", "HTTP_OBJECT_STORE_BASE_URL"])
349 .ok_or_else(|| anyhow!("HTTP object store base URL not configured; set FLEETFORGE_HTTP_OBJECT_STORE or HTTP_OBJECT_STORE_BASE_URL"))?;
350 Ok(Self { base_url })
351 }
352}
353
/// The in-memory backend is the default because it needs no configuration.
impl Default for ObjectStoreConfig {
    fn default() -> Self {
        ObjectStoreConfig::InMemory
    }
}
359
/// Postgres-backed persistence layer: owns the connection pool, the raw
/// object-store handle, and the content-addressed artifact store built on it.
/// Cloning is cheap (pool and handles are shared).
#[derive(Clone)]
pub struct Storage {
    pool: Pool<Postgres>,
    objects: DynObjectStore,
    artifact_store: Arc<dyn ArtifactStore>,
}
367
/// Outcome of attempting to reserve budget for a run.
#[derive(Debug)]
pub enum BudgetResult {
    /// Reservation fit within the limits; carries the updated budget row.
    Granted(models::Budget),
    /// A budget row exists but the reservation would exceed a limit.
    Denied,
    /// The run has no budget row at all.
    NoBudgetConfigured,
}
375
/// Row counts reported by [`Storage::apply_retention_policy`].
#[derive(Debug, Clone, Copy, Default)]
pub struct RetentionStats {
    /// Runs whose inputs/labels were blanked.
    pub runs_scrubbed: u64,
    /// Steps whose snapshots, outputs, and errors were nulled.
    pub steps_scrubbed: u64,
    /// Outbox events whose snapshot/input/output payload keys were stripped.
    pub events_scrubbed: u64,
}
383
384impl Storage {
385 pub async fn connect(config: StorageConfig) -> Result<Self> {
387 let connect_options = PgConnectOptions::from_str(&config.database_url)
388 .context("invalid postgres connection string")?;
389
390 let pool = PgPoolOptions::new()
391 .max_connections(config.max_connections)
392 .acquire_timeout(config.connect_timeout)
393 .connect_with(connect_options)
394 .await?;
395
396 MIGRATOR.run(&pool).await?;
397
398 let (objects, signer) = stores::build(&config.object_store)?;
399
400 let artifact_store: Arc<dyn ArtifactStore> = Arc::new(
401 artifacts::ObjectStoreArtifactStore::new(objects.clone(), signer.clone()),
402 );
403
404 Ok(Self {
405 pool,
406 objects,
407 artifact_store,
408 })
409 }
410
    /// Convenience wrapper: connects using `StorageConfig::default()`
    /// (environment-driven configuration).
    pub async fn connect_default() -> Result<Self> {
        Self::connect(StorageConfig::default()).await
    }
415
    /// Shared access to the underlying Postgres pool.
    pub fn pool(&self) -> &Pool<Postgres> {
        &self.pool
    }
420
    /// Returns a clone of the raw object-store handle.
    pub fn object_store(&self) -> DynObjectStore {
        self.objects.clone()
    }
425
    /// Returns a shared handle (Arc clone) to the artifact store.
    pub fn artifacts(&self) -> Arc<dyn ArtifactStore> {
        self.artifact_store.clone()
    }
430
431 pub async fn return_step_to_queue(&self, run_id: Uuid, step_id: Uuid) -> Result<()> {
433 let result = sqlx::query(
434 r#"
435 update steps
436 set status = 'queued',
437 leased_by = null,
438 lease_expires_at = null
439 where run_id = $1
440 and step_id = $2
441 "#,
442 )
443 .bind(run_id)
444 .bind(step_id)
445 .execute(&self.pool)
446 .await?;
447
448 if result.rows_affected() == 0 {
449 bail!(
450 "step {} for run {} not found when returning to queue",
451 step_id,
452 run_id
453 );
454 }
455
456 Ok(())
457 }
458
459 pub async fn requeue_expired_leases(&self, limit: i64) -> Result<Vec<(Uuid, Uuid)>> {
461 let rows = sqlx::query(
462 r#"
463 with expired as (
464 select run_id, step_id
465 from steps
466 where status = 'leased'
467 and lease_expires_at is not null
468 and lease_expires_at < now()
469 order by lease_expires_at asc
470 limit $1
471 )
472 update steps s
473 set status = 'queued',
474 leased_by = null,
475 lease_expires_at = null,
476 not_before = null
477 from expired
478 where s.run_id = expired.run_id
479 and s.step_id = expired.step_id
480 returning s.run_id, s.step_id
481 "#,
482 )
483 .bind(limit)
484 .fetch_all(&self.pool)
485 .await?;
486
487 let mut requeued = Vec::with_capacity(rows.len());
488 for row in rows {
489 let run_id: Uuid = row.get("run_id");
490 let step_id: Uuid = row.get("step_id");
491 requeued.push((run_id, step_id));
492 }
493 Ok(requeued)
494 }
495
    /// Defers a step in its own short-lived transaction: delegates to
    /// [`Self::defer_step_to_queue_tx`] and commits.
    pub async fn defer_step_to_queue(
        &self,
        run_id: Uuid,
        step_id: Uuid,
        not_before: DateTime<Utc>,
        payload: Value,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;
        self.defer_step_to_queue_tx(&mut tx, run_id, step_id, not_before, payload)
            .await?;
        tx.commit().await?;
        Ok(())
    }
509
510 pub async fn defer_step_to_queue_tx(
511 &self,
512 tx: &mut Transaction<'_, Postgres>,
513 run_id: Uuid,
514 step_id: Uuid,
515 not_before: DateTime<Utc>,
516 payload: Value,
517 ) -> Result<()> {
518 let result = sqlx::query(
519 r#"
520 update steps
521 set status = 'queued',
522 leased_by = null,
523 lease_expires_at = null,
524 not_before = $3
525 where run_id = $1
526 and step_id = $2
527 "#,
528 )
529 .bind(run_id)
530 .bind(step_id)
531 .bind(not_before)
532 .execute(exec(tx))
533 .await?;
534
535 if result.rows_affected() == 0 {
536 bail!(
537 "failed to defer step {} in run {} back to queue",
538 step_id,
539 run_id
540 );
541 }
542
543 self.enqueue_outbox_event(
544 tx,
545 NewOutboxEvent {
546 run_id,
547 step_id: Some(step_id),
548 kind: "step_deferred".to_string(),
549 payload,
550 },
551 )
552 .await?;
553
554 Ok(())
555 }
556
557 pub async fn schedule_retry_tx(
558 &self,
559 tx: &mut Transaction<'_, Postgres>,
560 run_id: Uuid,
561 step_id: Uuid,
562 attempt: i32,
563 not_before: DateTime<Utc>,
564 error_json: Value,
565 ) -> Result<()> {
566 let error_json = Json(error_json);
567 let result = sqlx::query(
568 r#"
569 update steps
570 set status = 'queued',
571 attempt = $3,
572 leased_by = null,
573 lease_expires_at = null,
574 not_before = $4,
575 error_json = $5
576 where run_id = $1
577 and step_id = $2
578 "#,
579 )
580 .bind(run_id)
581 .bind(step_id)
582 .bind(attempt)
583 .bind(not_before)
584 .bind(error_json)
585 .execute(exec(tx))
586 .await?;
587
588 if result.rows_affected() == 0 {
589 bail!(
590 "failed to schedule retry for step {} in run {}",
591 step_id,
592 run_id
593 );
594 }
595
596 Ok(())
597 }
598
    /// Within `tx`: appends one row to the step cost ledger, recording the
    /// reserved (and, once known, actual) token/cost usage for a single step
    /// attempt. `status` is stored verbatim with the row.
    pub async fn insert_cost_ledger_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        step_id: Uuid,
        attempt: i32,
        status: &str,
        reserved_tokens: i64,
        reserved_cost: f64,
        actual_tokens: Option<i64>,
        actual_cost: Option<f64>,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into step_cost_ledger (
                run_id, step_id, attempt, status, reserved_tokens, reserved_cost, actual_tokens, actual_cost
            ) values ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(attempt)
        .bind(status)
        .bind(reserved_tokens)
        .bind(reserved_cost)
        .bind(actual_tokens)
        .bind(actual_cost)
        .execute(exec(tx))
        .await?;

        Ok(())
    }
631
    /// Within `tx`: archives an immutable record of one step attempt.
    ///
    /// Idempotent: a duplicate (run_id, step_id, attempt) is silently skipped
    /// via `on conflict do nothing`, so replays do not overwrite history.
    pub async fn insert_step_attempt_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        attempt: models::NewStepAttempt,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into step_attempts (
                run_id,
                step_id,
                attempt,
                status,
                latency_ms,
                spec_snapshot,
                input_snapshot,
                output_snapshot,
                error_snapshot,
                checkpoint,
                budget_snapshot,
                guardrail_events,
                tool_events,
                artifacts
            ) values (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
            )
            on conflict (run_id, step_id, attempt) do nothing
            "#,
        )
        .bind(attempt.run_id)
        .bind(attempt.step_id)
        .bind(attempt.attempt)
        .bind(attempt.status.as_str())
        .bind(attempt.latency_ms)
        // spec_snapshot is required; the remaining JSON columns are optional
        // and wrapped in Json only when present.
        .bind(Json(attempt.spec_snapshot))
        .bind(attempt.input_snapshot.map(Json))
        .bind(attempt.output_snapshot.map(Json))
        .bind(attempt.error_snapshot.map(Json))
        .bind(attempt.checkpoint.map(Json))
        .bind(attempt.budget_snapshot.map(Json))
        .bind(attempt.guardrail_events.map(Json))
        .bind(attempt.tool_events.map(Json))
        .bind(attempt.artifacts.map(Json))
        .execute(exec(tx))
        .await?;

        Ok(())
    }
679
680 pub async fn append_run_event_tx(
681 &self,
682 tx: &mut Transaction<'_, Postgres>,
683 run_id: Uuid,
684 step_id: Option<Uuid>,
685 kind: &str,
686 payload: &Value,
687 ) -> Result<()> {
688 sqlx::query(
689 r#"
690 insert into run_event_log (run_id, step_id, kind, payload)
691 values ($1, $2, $3, $4)
692 "#,
693 )
694 .bind(run_id)
695 .bind(step_id)
696 .bind(kind)
697 .bind(Json(payload.clone()))
698 .execute(exec(tx))
699 .await?;
700
701 Ok(())
702 }
703
    /// Within `tx`: flags `origin_step` as having compensation scheduled and
    /// force-queues `compensation_step` (its pending_dependencies are reset
    /// to 0 so it becomes immediately runnable).
    ///
    /// NOTE(review): the first update's rows_affected is not checked — a
    /// missing origin step is silently ignored; confirm that is intentional.
    pub async fn trigger_compensation_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        origin_step: Uuid,
        compensation_step: Uuid,
    ) -> Result<()> {
        // Record on the origin step that compensation has been scheduled.
        sqlx::query(
            r#"
            update steps
            set compensation_scheduled = true
            where run_id = $1
              and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(origin_step)
        .execute(exec(tx))
        .await?;

        // Make the compensation step runnable now: queued, no pending deps,
        // no lease, no deferral.
        let result = sqlx::query(
            r#"
            update steps
            set status = 'queued',
                pending_dependencies = 0,
                leased_by = null,
                lease_expires_at = null,
                not_before = null
            where run_id = $1
              and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(compensation_step)
        .execute(exec(tx))
        .await?;

        if result.rows_affected() == 0 {
            bail!(
                "compensation step {} not present in run {}",
                compensation_step,
                run_id
            );
        }

        Ok(())
    }
751
752 pub async fn fetch_unpublished_outbox(&self, limit: i64) -> Result<Vec<models::OutboxEvent>> {
754 let rows = sqlx::query_as::<_, db::OutboxRow>(
755 r#"
756 select *
757 from outbox
758 where published_at is null
759 order by id asc
760 limit $1
761 "#,
762 )
763 .bind(limit)
764 .fetch_all(&self.pool)
765 .await?;
766
767 rows.into_iter()
768 .map(models::OutboxEvent::try_from)
769 .collect()
770 }
771
772 pub async fn mark_outbox_published(&self, id: i64) -> Result<()> {
774 let result = sqlx::query(
775 r#"
776 update outbox
777 set published_at = now()
778 where id = $1
779 and published_at is null
780 "#,
781 )
782 .bind(id)
783 .execute(&self.pool)
784 .await?;
785
786 if result.rows_affected() == 0 {
787 bail!("outbox row {} not found", id);
788 }
789
790 Ok(())
791 }
792
793 pub async fn fetch_outbox_for_run_since(
795 &self,
796 run_id: Uuid,
797 after_id: i64,
798 limit: i64,
799 ) -> Result<Vec<models::OutboxEvent>> {
800 let rows = sqlx::query_as::<_, db::OutboxRow>(
801 r#"
802 select *
803 from outbox
804 where run_id = $1
805 and id > $2
806 order by id asc
807 limit $3
808 "#,
809 )
810 .bind(run_id)
811 .bind(after_id)
812 .bind(limit)
813 .fetch_all(&self.pool)
814 .await?;
815
816 rows.into_iter()
817 .map(models::OutboxEvent::try_from)
818 .collect()
819 }
820
821 pub async fn fetch_last_outbox_event(
823 &self,
824 run_id: Uuid,
825 ) -> Result<Option<models::OutboxEvent>> {
826 let row = sqlx::query_as::<_, db::OutboxRow>(
827 r#"
828 select *
829 from outbox
830 where run_id = $1
831 order by id desc
832 limit 1
833 "#,
834 )
835 .bind(run_id)
836 .fetch_optional(&self.pool)
837 .await?;
838
839 match row {
840 Some(row) => Ok(Some(models::OutboxEvent::try_from(row)?)),
841 None => Ok(None),
842 }
843 }
844
845 pub async fn fetch_tap_offset(&self, run_id: Uuid, cursor_id: &str) -> Result<Option<i64>> {
847 let offset = sqlx::query_scalar::<_, i64>(
848 r#"
849 select cursor_offset
850 from tap_offsets
851 where run_id = $1
852 and cursor_id = $2
853 "#,
854 )
855 .bind(run_id)
856 .bind(cursor_id)
857 .fetch_optional(&self.pool)
858 .await?;
859
860 Ok(offset)
861 }
862
    /// Persists a tap cursor position for (run, cursor). Upserts with
    /// `greatest(stored, supplied)` so the cursor is monotone — a stale
    /// writer can never move it backwards.
    pub async fn upsert_tap_offset(
        &self,
        run_id: Uuid,
        cursor_id: &str,
        offset: i64,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into tap_offsets (run_id, cursor_id, cursor_offset)
            values ($1, $2, $3)
            on conflict (run_id, cursor_id) do update
            set cursor_offset = greatest(tap_offsets.cursor_offset, excluded.cursor_offset),
                updated_at = now()
            "#,
        )
        .bind(run_id)
        .bind(cursor_id)
        .bind(offset)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
887
    /// Scrubs payload data older than `cutoff` from runs, steps, and outbox
    /// events in a single transaction, returning per-table affected-row
    /// counts.
    ///
    /// A cutoff at or before the Unix epoch is treated as "retention
    /// disabled" and becomes a no-op.
    pub async fn apply_retention_policy(&self, cutoff: DateTime<Utc>) -> Result<RetentionStats> {
        if cutoff.timestamp_millis() <= 0 {
            return Ok(RetentionStats::default());
        }

        let mut tx = self.pool.begin().await?;

        // Blank out run inputs and labels — but only for rows that still
        // carry data, so already-scrubbed rows are not rewritten.
        let scrubbed_runs = sqlx::query(
            r#"
            update runs
            set input_ctx = jsonb_set(
                jsonb_set(
                    input_ctx,
                    '{inputs}',
                    '{}'::jsonb,
                    true
                ),
                '{labels}',
                '{}'::jsonb,
                true
            )
            where created_at < $1
              and (
                jsonb_typeof(input_ctx -> 'inputs') in ('object', 'array', 'string')
                or jsonb_typeof(input_ctx -> 'labels') in ('object', 'array', 'string')
              )
            "#,
        )
        .bind(cutoff)
        .execute(exec(&mut tx))
        .await?
        .rows_affected();

        // Null out step snapshots/outputs/errors for old steps with data.
        let scrubbed_steps = sqlx::query(
            r#"
            update steps
            set input_snapshot = null,
                output_snapshot = null,
                output_json = null,
                error_json = null
            where created_at < $1
              and (
                input_snapshot is not null
                or output_snapshot is not null
                or output_json is not null
                or error_json is not null
              )
            "#,
        )
        .bind(cutoff)
        .execute(exec(&mut tx))
        .await?
        .rows_affected();

        // Strip snapshot-bearing keys from outbox payloads of old runs.
        let scrubbed_events = sqlx::query(
            r#"
            update outbox
            set payload = payload - 'snapshots' - 'input' - 'output'
            where run_id in (
                select run_id from runs where created_at < $1
            )
              and (
                payload ? 'snapshots'
                or payload ? 'input'
                or payload ? 'output'
              )
            "#,
        )
        .bind(cutoff)
        .execute(exec(&mut tx))
        .await?
        .rows_affected();

        tx.commit().await?;

        Ok(RetentionStats {
            runs_scrubbed: scrubbed_runs as u64,
            steps_scrubbed: scrubbed_steps as u64,
            events_scrubbed: scrubbed_events as u64,
        })
    }
970
971 pub async fn try_consume_budget(
973 &self,
974 run_id: Uuid,
975 tokens: i64,
976 cost: f64,
977 ) -> Result<BudgetResult> {
978 if tokens == 0 && cost == 0.0 {
979 return match self.fetch_budget(run_id).await? {
980 Some(budget) => Ok(BudgetResult::Granted(budget)),
981 None => Ok(BudgetResult::NoBudgetConfigured),
982 };
983 }
984
985 let row = sqlx::query_as::<_, db::BudgetRow>(
986 r#"
987 update budgets
988 set tokens_used = tokens_used + $2,
989 cost_used = cost_used + $3,
990 updated_at = now()
991 where run_id = $1
992 and (tokens_limit is null or tokens_used + $2 <= tokens_limit)
993 and (cost_limit is null or cost_used + $3 <= cost_limit)
994 returning *
995 "#,
996 )
997 .bind(run_id)
998 .bind(tokens)
999 .bind(cost)
1000 .fetch_optional(&self.pool)
1001 .await?;
1002
1003 if let Some(row) = row {
1004 return Ok(BudgetResult::Granted(row.try_into()?));
1005 }
1006
1007 let exists = sqlx::query_scalar::<_, i64>("select 1 from budgets where run_id = $1")
1008 .bind(run_id)
1009 .fetch_optional(&self.pool)
1010 .await?;
1011
1012 if exists.is_some() {
1013 Ok(BudgetResult::Denied)
1014 } else {
1015 Ok(BudgetResult::NoBudgetConfigured)
1016 }
1017 }
1018
1019 pub async fn fetch_budget(&self, run_id: Uuid) -> Result<Option<models::Budget>> {
1021 let row = sqlx::query_as::<_, db::BudgetRow>("select * from budgets where run_id = $1")
1022 .bind(run_id)
1023 .fetch_optional(&self.pool)
1024 .await?;
1025
1026 match row {
1027 Some(row) => Ok(Some(row.try_into()?)),
1028 None => Ok(None),
1029 }
1030 }
1031
1032 pub async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>> {
1034 let rows = sqlx::query_as::<_, db::StepRow>(
1035 r#"
1036 select *
1037 from steps
1038 where run_id = $1
1039 order by idx
1040 "#,
1041 )
1042 .bind(run_id)
1043 .fetch_all(&self.pool)
1044 .await?;
1045
1046 rows.into_iter().map(models::Step::try_from).collect()
1047 }
1048
    /// Transitions a leased step to `running` on behalf of `worker_id`.
    ///
    /// The update only matches while the step is `leased` by that exact
    /// worker, so a worker whose lease was reassigned cannot take over; the
    /// attempt counter is bumped and the optional snapshot/provider fields
    /// only fill in missing values (`coalesce`). On success the parent run
    /// is also promoted from `pending` to `running`.
    ///
    /// NOTE(review): the step and run updates are two separate statements on
    /// the pool, not one transaction — confirm non-atomicity is acceptable.
    pub async fn mark_step_running(
        &self,
        run_id: Uuid,
        step_id: Uuid,
        worker_id: &str,
        input_snapshot: Option<&Value>,
        provider: Option<&str>,
        provider_version: Option<&str>,
    ) -> Result<()> {
        let input_snapshot = input_snapshot.map(Json);
        let result = sqlx::query(
            r#"
            update steps
            set status = 'running',
                attempt = attempt + 1,
                input_snapshot = coalesce($4, input_snapshot),
                provider = coalesce($5, provider),
                provider_version = coalesce($6, provider_version),
                not_before = null
            where run_id = $1
              and step_id = $2
              and leased_by = $3
              and status = 'leased'
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(worker_id)
        .bind(input_snapshot)
        .bind(provider)
        .bind(provider_version)
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            bail!(
                "step {} for run {} is not leased by worker {}; cannot mark running",
                step_id,
                run_id,
                worker_id
            );
        }

        // First step starting also moves the run out of `pending`.
        sqlx::query(
            r#"
            update runs
            set status = 'running'
            where run_id = $1
              and status = 'pending'
            "#,
        )
        .bind(run_id)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
1107
1108 pub async fn update_step_estimate(
1110 &self,
1111 run_id: Uuid,
1112 step_id: Uuid,
1113 estimated_tokens: i64,
1114 estimated_cost: f64,
1115 ) -> Result<()> {
1116 let result = sqlx::query(
1117 r#"
1118 update steps
1119 set estimated_tokens = $3,
1120 estimated_cost = $4
1121 where run_id = $1
1122 and step_id = $2
1123 "#,
1124 )
1125 .bind(run_id)
1126 .bind(step_id)
1127 .bind(estimated_tokens)
1128 .bind(estimated_cost)
1129 .execute(&self.pool)
1130 .await?;
1131
1132 if result.rows_affected() == 0 {
1133 bail!(
1134 "failed to record budget estimate for step {} in run {}",
1135 step_id,
1136 run_id
1137 );
1138 }
1139
1140 Ok(())
1141 }
1142
    /// Within `tx`: moves a step to `status`, merging in any newly observed
    /// data. `attempt`, snapshots, checkpoint, and provider info only replace
    /// existing values when `Some` (`coalesce`); `output_json`/`error_json`
    /// are written unconditionally. Terminal statuses ('succeeded', 'failed',
    /// 'skipped') also clear the lease fields.
    ///
    /// Fails when the (run, step) row does not exist.
    pub async fn update_step_status_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        step_id: Uuid,
        status: models::StepStatus,
        attempt: Option<i32>,
        output_json: Option<Value>,
        error_json: Option<Value>,
        input_snapshot: Option<Value>,
        output_snapshot: Option<Value>,
        checkpoint: Option<Value>,
        provider: Option<&str>,
        provider_version: Option<&str>,
    ) -> Result<()> {
        // Wrap JSON payloads for Postgres jsonb encoding.
        let output_json = output_json.map(Json);
        let error_json = error_json.map(Json);
        let input_snapshot = input_snapshot.map(Json);
        let output_snapshot = output_snapshot.map(Json);
        let checkpoint = checkpoint.map(Json);

        let result = sqlx::query(
            r#"
            update steps
            set status = $1,
                attempt = coalesce($2, attempt),
                output_json = $3,
                error_json = $4,
                input_snapshot = coalesce($5, input_snapshot),
                output_snapshot = coalesce($6, output_snapshot),
                checkpoint = coalesce($7, checkpoint),
                provider = coalesce($8, provider),
                provider_version = coalesce($9, provider_version),
                leased_by = case
                    when $1 in ('succeeded','failed','skipped') then null
                    else leased_by
                end,
                lease_expires_at = case
                    when $1 in ('succeeded','failed','skipped') then null
                    else lease_expires_at
                end
            where run_id = $10
              and step_id = $11
            "#,
        )
        .bind(status.as_str())
        .bind(attempt)
        .bind(output_json)
        .bind(error_json)
        .bind(input_snapshot)
        .bind(output_snapshot)
        .bind(checkpoint)
        .bind(provider)
        .bind(provider_version)
        .bind(run_id)
        .bind(step_id)
        .execute(exec(tx))
        .await?;

        if result.rows_affected() == 0 {
            bail!(
                "failed to update status for step {} in run {}",
                step_id,
                run_id
            );
        }

        Ok(())
    }
1213
1214 pub async fn fetch_step_attempts_for_run(
1215 &self,
1216 run_id: Uuid,
1217 ) -> Result<Vec<models::StepAttempt>> {
1218 let rows = sqlx::query_as::<_, db::StepAttemptRow>(
1219 r#"
1220 select
1221 run_id,
1222 step_id,
1223 attempt,
1224 status,
1225 recorded_at,
1226 latency_ms,
1227 spec_snapshot,
1228 input_snapshot,
1229 output_snapshot,
1230 error_snapshot,
1231 checkpoint,
1232 budget_snapshot,
1233 guardrail_events,
1234 tool_events,
1235 artifacts
1236 from step_attempts
1237 where run_id = $1
1238 order by step_id, attempt
1239 "#,
1240 )
1241 .bind(run_id)
1242 .fetch_all(&self.pool)
1243 .await?;
1244
1245 rows.into_iter()
1246 .map(models::StepAttempt::try_from)
1247 .collect()
1248 }
1249
1250 pub async fn update_step_usage_tx(
1252 &self,
1253 tx: &mut Transaction<'_, Postgres>,
1254 run_id: Uuid,
1255 step_id: Uuid,
1256 actual_tokens: Option<i64>,
1257 actual_cost: Option<f64>,
1258 ) -> Result<()> {
1259 let result = sqlx::query(
1260 r#"
1261 update steps
1262 set actual_tokens = $3,
1263 actual_cost = $4
1264 where run_id = $1
1265 and step_id = $2
1266 "#,
1267 )
1268 .bind(run_id)
1269 .bind(step_id)
1270 .bind(actual_tokens)
1271 .bind(actual_cost)
1272 .execute(exec(tx))
1273 .await?;
1274
1275 if result.rows_affected() == 0 {
1276 bail!(
1277 "failed to update usage for step {} in run {}",
1278 step_id,
1279 run_id
1280 );
1281 }
1282
1283 Ok(())
1284 }
1285
    /// Within `tx`: applies post-hoc deltas to a run's budget counters,
    /// e.g. once actual usage is known and differs from the reservation.
    ///
    /// A delta of zero tokens and (near-)zero cost is a no-op; a missing
    /// budget row is logged as a warning rather than treated as an error.
    pub async fn reconcile_budget_usage_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        tokens_delta: i64,
        cost_delta: f64,
    ) -> Result<()> {
        // Skip no-op reconciliations (epsilon guards float noise).
        if tokens_delta == 0 && cost_delta.abs() < f64::EPSILON {
            return Ok(());
        }

        let result = sqlx::query(
            r#"
            update budgets
            set tokens_used = tokens_used + $2,
                cost_used = cost_used + $3,
                updated_at = now()
            where run_id = $1
            "#,
        )
        .bind(run_id)
        .bind(tokens_delta)
        .bind(cost_delta)
        .execute(exec(tx))
        .await?;

        if result.rows_affected() == 0 {
            warn!(run = %run_id, "reconcile_budget_usage_tx called but no budget row updated");
        }

        Ok(())
    }
1319
    /// Inserts a run with no steps or dependency edges; see
    /// [`Self::insert_run_with_steps`] for the full behavior (including
    /// idempotency-key handling).
    pub async fn insert_run(&self, run: models::NewRun) -> Result<Uuid> {
        self.insert_run_with_steps(run, &[], &[]).await
    }
1324
    /// Atomically inserts a run together with its steps and dependency edges.
    ///
    /// Idempotency: when `run.idempotency_key` is set, a conflicting key
    /// returns the *existing* run's id and inserts nothing; when the key is
    /// absent, a conflicting `run_id` likewise short-circuits. In both
    /// "already exists" cases the transaction is rolled back and the winning
    /// run id is returned, so callers can treat the call as at-most-once.
    ///
    /// Steps are upserted on `(run_id, step_id)`; edges are bulk-inserted
    /// with `on conflict do nothing`.
    pub async fn insert_run_with_steps(
        &self,
        run: models::NewRun,
        steps: &[models::NewStep],
        edges: &[(Uuid, Uuid)],
    ) -> Result<Uuid> {
        let mut tx = self.pool.begin().await?;

        let models::NewRun {
            run_id,
            status,
            dag_json,
            input_ctx,
            seed,
            idempotency_key,
        } = run;

        // `inserted_new` is true only when this call created the run row;
        // otherwise an earlier call with the same key/id won the race.
        let (actual_run_id, inserted_new) = if let Some(ref key) = idempotency_key {
            // `do nothing ... returning` yields no row on conflict, which is
            // how we detect that the key already exists.
            let inserted = sqlx::query_scalar::<_, Uuid>(
                r#"
                insert into runs (run_id, status, dag_json, input_ctx, seed, idempotency_key)
                values ($1, $2, $3, $4, $5, $6)
                on conflict (idempotency_key) do nothing
                returning run_id
                "#,
            )
            .bind(run_id)
            .bind(status.as_str())
            .bind(Json(dag_json.clone()))
            .bind(Json(input_ctx.clone()))
            .bind(seed)
            .bind(key)
            .fetch_optional(exec(&mut tx))
            .await?;
            if let Some(id) = inserted {
                (id, true)
            } else {
                // Conflict: resolve the id of the run that owns this key.
                let existing = sqlx::query_scalar::<_, Uuid>(
                    "select run_id from runs where idempotency_key = $1",
                )
                .bind(key)
                .fetch_one(exec(&mut tx))
                .await?;
                (existing, false)
            }
        } else {
            let inserted = sqlx::query_scalar::<_, Uuid>(
                r#"
                insert into runs (run_id, status, dag_json, input_ctx, seed, idempotency_key)
                values ($1, $2, $3, $4, $5, $6)
                on conflict (run_id) do nothing
                returning run_id
                "#,
            )
            .bind(run_id)
            .bind(status.as_str())
            .bind(Json(dag_json.clone()))
            .bind(Json(input_ctx.clone()))
            .bind(seed)
            .bind(idempotency_key)
            .fetch_optional(exec(&mut tx))
            .await?;
            (inserted.unwrap_or(run_id), inserted.is_some())
        };

        if !inserted_new {
            // Run already existed: discard any partial work and report the
            // existing id to the caller.
            tx.rollback().await?;
            return Ok(actual_run_id);
        }

        // NOTE(review): steps are bound with `step.run_id`, not
        // `actual_run_id` — assumes callers always set them consistently;
        // edges below use `actual_run_id`. TODO confirm this invariant.
        for step in steps {
            sqlx::query(
                r#"
                insert into steps (
                    run_id,
                    step_id,
                    idx,
                    priority,
                    spec_json,
                    status,
                    attempt,
                    input_snapshot,
                    provider,
                    provider_version,
                    max_attempts,
                    retry_backoff_ms,
                    retry_backoff_factor,
                    not_before,
                    deadline_at,
                    pending_dependencies,
                    total_dependencies,
                    estimated_tokens,
                    estimated_cost,
                    actual_tokens,
                    actual_cost,
                    checkpoint,
                    compensation_step,
                    compensation_scheduled
                )
                values (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                    $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                    $21, $22, $23, $24
                )
                on conflict (run_id, step_id) do update
                set idx = excluded.idx,
                    priority = excluded.priority,
                    spec_json = excluded.spec_json,
                    status = excluded.status,
                    attempt = excluded.attempt,
                    input_snapshot = excluded.input_snapshot,
                    provider = excluded.provider,
                    provider_version = excluded.provider_version,
                    max_attempts = excluded.max_attempts,
                    retry_backoff_ms = excluded.retry_backoff_ms,
                    retry_backoff_factor = excluded.retry_backoff_factor,
                    not_before = excluded.not_before,
                    deadline_at = excluded.deadline_at,
                    pending_dependencies = excluded.pending_dependencies,
                    total_dependencies = excluded.total_dependencies,
                    estimated_tokens = excluded.estimated_tokens,
                    estimated_cost = excluded.estimated_cost,
                    actual_tokens = excluded.actual_tokens,
                    actual_cost = excluded.actual_cost,
                    checkpoint = excluded.checkpoint,
                    compensation_step = excluded.compensation_step,
                    compensation_scheduled = excluded.compensation_scheduled
                "#,
            )
            // Bind order must match the column list above exactly.
            .bind(step.run_id)
            .bind(step.step_id)
            .bind(step.idx)
            .bind(step.priority)
            .bind(Json(step.spec_json.clone()))
            .bind(step.status.as_str())
            .bind(step.attempt)
            .bind(step.input_snapshot.clone().map(Json))
            .bind(step.provider.clone())
            .bind(step.provider_version.clone())
            .bind(step.max_attempts)
            .bind(step.retry_backoff_ms)
            .bind(step.retry_backoff_factor)
            .bind(step.not_before)
            .bind(step.deadline_at)
            .bind(step.pending_dependencies)
            .bind(step.total_dependencies)
            .bind(step.estimated_tokens)
            .bind(step.estimated_cost)
            .bind(step.actual_tokens)
            .bind(step.actual_cost)
            .bind(step.checkpoint.clone().map(Json))
            .bind(step.compensation_step)
            .bind(step.compensation_scheduled)
            .execute(exec(&mut tx))
            .await?;
        }

        if !edges.is_empty() {
            // Bulk-insert all edges in a single statement; duplicates from
            // retried submissions are ignored.
            let mut builder = QueryBuilder::<Postgres>::new(
                "insert into step_edges (run_id, from_step_id, to_step_id) ",
            );
            builder.push_values(edges, |mut b, (from, to)| {
                b.push_bind(actual_run_id).push_bind(from).push_bind(to);
            });
            builder.push(" on conflict do nothing");
            builder.build().execute(exec(&mut tx)).await?;
        }

        tx.commit().await?;

        Ok(actual_run_id)
    }
1498
1499 pub async fn unlock_successors_tx(
1501 &self,
1502 tx: &mut Transaction<'_, Postgres>,
1503 run_id: Uuid,
1504 step_id: Uuid,
1505 ) -> Result<Vec<Uuid>> {
1506 let rows = sqlx::query(
1507 r#"
1508 with affected as (
1509 select to_step_id
1510 from step_edges
1511 where run_id = $1
1512 and from_step_id = $2
1513 ),
1514 decremented as (
1515 update steps s
1516 set pending_dependencies = greatest(s.pending_dependencies - 1, 0)
1517 from affected
1518 where s.run_id = $1
1519 and s.step_id = affected.to_step_id
1520 returning s.step_id, s.pending_dependencies
1521 ),
1522 queued as (
1523 update steps s
1524 set status = 'queued'
1525 from decremented
1526 where s.run_id = $1
1527 and s.step_id = decremented.step_id
1528 and decremented.pending_dependencies = 0
1529 and s.status = 'blocked'
1530 returning s.step_id
1531 )
1532 select step_id from queued
1533 "#,
1534 )
1535 .bind(run_id)
1536 .bind(step_id)
1537 .fetch_all(exec(tx))
1538 .await?;
1539
1540 let mut ready = Vec::with_capacity(rows.len());
1541 for row in rows {
1542 ready.push(row.get::<Uuid, _>("step_id"));
1543 }
1544 Ok(ready)
1545 }
1546
1547 pub async fn skip_downstream_steps_tx(
1549 &self,
1550 tx: &mut Transaction<'_, Postgres>,
1551 run_id: Uuid,
1552 failed_step_id: Uuid,
1553 ) -> Result<Vec<Uuid>> {
1554 let rows = sqlx::query(
1555 r#"
1556 with recursive downstream(step_id) as (
1557 select to_step_id
1558 from step_edges
1559 where run_id = $1
1560 and from_step_id = $2
1561 union
1562 select se.to_step_id
1563 from step_edges se
1564 join downstream d on se.from_step_id = d.step_id
1565 where se.run_id = $1
1566 ),
1567 updated as (
1568 update steps s
1569 set status = 'skipped',
1570 pending_dependencies = 0
1571 from downstream
1572 where s.run_id = $1
1573 and s.step_id = downstream.step_id
1574 and s.status = 'blocked'
1575 returning s.step_id
1576 )
1577 select step_id from updated
1578 "#,
1579 )
1580 .bind(run_id)
1581 .bind(failed_step_id)
1582 .fetch_all(exec(tx))
1583 .await?;
1584
1585 let mut skipped = Vec::with_capacity(rows.len());
1586 for row in rows {
1587 skipped.push(row.get::<Uuid, _>("step_id"));
1588 }
1589 Ok(skipped)
1590 }
1591
1592 pub async fn refresh_run_status_tx(
1594 &self,
1595 tx: &mut Transaction<'_, Postgres>,
1596 run_id: Uuid,
1597 ) -> Result<(models::RunStatus, models::RunStatus)> {
1598 let prev_row = sqlx::query(
1599 r#"
1600 select status
1601 from runs
1602 where run_id = $1
1603 "#,
1604 )
1605 .bind(run_id)
1606 .fetch_one(exec(tx))
1607 .await?;
1608
1609 let row = sqlx::query(
1610 r#"
1611 update runs
1612 set status = case
1613 when exists (
1614 select 1
1615 from steps
1616 where run_id = $1
1617 and status = 'failed'
1618 ) then 'failed'
1619 when not exists (
1620 select 1
1621 from steps
1622 where run_id = $1
1623 and status in ('queued','blocked','leased','running')
1624 ) then 'succeeded'
1625 else status
1626 end
1627 where run_id = $1
1628 returning status
1629 "#,
1630 )
1631 .bind(run_id)
1632 .fetch_one(exec(tx))
1633 .await?;
1634
1635 let previous: String = prev_row.get("status");
1636 let current: String = row.get("status");
1637
1638 Ok((
1639 models::RunStatus::try_from_str(&previous)?,
1640 models::RunStatus::try_from_str(¤t)?,
1641 ))
1642 }
1643
    /// Upserts a single step keyed on `(run_id, step_id)`.
    ///
    /// On conflict every mutable column is overwritten with the new values,
    /// so this also serves as a full-row replace for re-planned steps.
    /// NOTE(review): the SQL here mirrors the per-step upsert inside
    /// `insert_run_with_steps`; keep the two statements in sync.
    pub async fn insert_step(&self, step: models::NewStep) -> Result<()> {
        // Destructure up front so a new field on `NewStep` is a compile
        // error here until a bind is added for it.
        let models::NewStep {
            run_id,
            step_id,
            idx,
            priority,
            spec_json,
            status,
            attempt,
            input_snapshot,
            provider,
            provider_version,
            max_attempts,
            retry_backoff_ms,
            retry_backoff_factor,
            not_before,
            deadline_at,
            pending_dependencies,
            total_dependencies,
            estimated_tokens,
            estimated_cost,
            actual_tokens,
            actual_cost,
            checkpoint,
            compensation_step,
            compensation_scheduled,
        } = step;

        sqlx::query(
            r#"
            insert into steps (
                run_id,
                step_id,
                idx,
                priority,
                spec_json,
                status,
                attempt,
                input_snapshot,
                provider,
                provider_version,
                max_attempts,
                retry_backoff_ms,
                retry_backoff_factor,
                not_before,
                deadline_at,
                pending_dependencies,
                total_dependencies,
                estimated_tokens,
                estimated_cost,
                actual_tokens,
                actual_cost,
                checkpoint,
                compensation_step,
                compensation_scheduled
            )
            values (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                $21, $22, $23, $24
            )
            on conflict (run_id, step_id) do update
            set idx = excluded.idx,
                priority = excluded.priority,
                spec_json = excluded.spec_json,
                status = excluded.status,
                attempt = excluded.attempt,
                input_snapshot = excluded.input_snapshot,
                provider = excluded.provider,
                provider_version = excluded.provider_version,
                max_attempts = excluded.max_attempts,
                retry_backoff_ms = excluded.retry_backoff_ms,
                retry_backoff_factor = excluded.retry_backoff_factor,
                not_before = excluded.not_before,
                deadline_at = excluded.deadline_at,
                pending_dependencies = excluded.pending_dependencies,
                total_dependencies = excluded.total_dependencies,
                estimated_tokens = excluded.estimated_tokens,
                estimated_cost = excluded.estimated_cost,
                actual_tokens = excluded.actual_tokens,
                actual_cost = excluded.actual_cost,
                checkpoint = excluded.checkpoint,
                compensation_step = excluded.compensation_step,
                compensation_scheduled = excluded.compensation_scheduled
            "#,
        )
        // Bind order must match the column list above exactly.
        .bind(run_id)
        .bind(step_id)
        .bind(idx)
        .bind(priority)
        .bind(Json(spec_json))
        .bind(status.as_str())
        .bind(attempt)
        .bind(input_snapshot.map(Json))
        .bind(provider)
        .bind(provider_version)
        .bind(max_attempts)
        .bind(retry_backoff_ms)
        .bind(retry_backoff_factor)
        .bind(not_before)
        .bind(deadline_at)
        .bind(pending_dependencies)
        .bind(total_dependencies)
        .bind(estimated_tokens)
        .bind(estimated_cost)
        .bind(actual_tokens)
        .bind(actual_cost)
        .bind(checkpoint.map(Json))
        .bind(compensation_step)
        .bind(compensation_scheduled)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
1760
1761 pub async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>> {
1763 let row = sqlx::query_as::<_, db::RunRow>(
1764 r#"
1765 select *
1766 from runs
1767 where run_id = $1
1768 "#,
1769 )
1770 .bind(run_id)
1771 .fetch_optional(&self.pool)
1772 .await?;
1773
1774 match row {
1775 Some(row) => Ok(Some(row.try_into()?)),
1776 None => Ok(None),
1777 }
1778 }
1779
1780 pub async fn update_run_input_ctx(&self, run_id: Uuid, input_ctx: Value) -> Result<()> {
1781 let result = sqlx::query(
1782 r#"
1783 update runs
1784 set input_ctx = $2
1785 where run_id = $1
1786 "#,
1787 )
1788 .bind(run_id)
1789 .bind(Json(input_ctx))
1790 .execute(&self.pool)
1791 .await?;
1792
1793 if result.rows_affected() == 0 {
1794 bail!("run {} not found", run_id);
1795 }
1796
1797 Ok(())
1798 }
1799
1800 pub async fn update_run_artifact_transparency(
1802 &self,
1803 run_id: Uuid,
1804 artifact_key: &str,
1805 transparency: Value,
1806 ) -> Result<()> {
1807 let Some(run) = self.fetch_run(run_id).await? else {
1808 return Ok(());
1809 };
1810 let mut ctx_map = match run.input_ctx {
1811 Value::Object(map) => map,
1812 other => {
1813 let mut map = Map::new();
1814 map.insert("inputs".to_string(), other);
1815 map
1816 }
1817 };
1818 let artifacts_value = ctx_map
1819 .entry("artifacts".to_string())
1820 .or_insert_with(|| Value::Object(Map::new()));
1821 let Value::Object(artifacts_map) = artifacts_value else {
1822 return Ok(());
1823 };
1824 if let Some(entry) = artifacts_map.get_mut(artifact_key) {
1825 if let Value::Object(ref mut artifact_obj) = entry {
1826 artifact_obj.insert("transparency".to_string(), transparency);
1827 } else {
1828 let mut object = Map::new();
1829 object.insert("transparency".to_string(), transparency);
1830 artifacts_map.insert(artifact_key.to_string(), Value::Object(object));
1831 }
1832 self.update_run_input_ctx(run_id, Value::Object(ctx_map))
1833 .await
1834 } else {
1835 Ok(())
1836 }
1837 }
1838
    /// Attaches a transparency receipt to
    /// `output_json.artifacts[artifact_key]` on a step and writes the
    /// modified JSON back.
    ///
    /// Silently no-ops when the step, its output, the artifacts object, or
    /// the keyed artifact entry is missing — transparency attachment is
    /// best-effort. NOTE(review): read-modify-write is not atomic; a
    /// concurrent writer to `output_json` could be overwritten — confirm
    /// callers serialize these updates.
    pub async fn update_step_artifact_transparency(
        &self,
        run_id: Uuid,
        step_id: Uuid,
        artifact_key: &str,
        transparency: Value,
    ) -> Result<()> {
        let row = sqlx::query(
            r#"
            select output_json
            from steps
            where run_id = $1 and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .fetch_optional(&self.pool)
        .await?;

        // No such step: nothing to annotate.
        let Some(record) = row else {
            return Ok(());
        };

        // Step exists but has produced no output yet.
        let Some(Json(mut output_value)) = record
            .try_get::<Option<Json<Value>>, _>("output_json")
            .context("failed to deserialize step output_json")?
        else {
            return Ok(());
        };

        // Only object-shaped outputs can carry an artifacts map.
        let Some(map) = output_value.as_object_mut() else {
            return Ok(());
        };
        let artifacts_entry = map
            .entry("artifacts".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        let Value::Object(artifacts_map) = artifacts_entry else {
            return Ok(());
        };
        if let Some(entry) = artifacts_map.get_mut(artifact_key) {
            match entry {
                Value::Object(obj) => {
                    obj.insert("transparency".to_string(), transparency);
                }
                _ => {
                    // Replace a malformed (non-object) entry outright.
                    let mut object = Map::new();
                    object.insert("transparency".to_string(), transparency);
                    *entry = Value::Object(object);
                }
            }
        } else {
            return Ok(());
        }

        sqlx::query(
            r#"
            update steps
            set output_json = $3
            where run_id = $1 and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(Json(output_value))
        .execute(&self.pool)
        .await?;

        Ok(())
    }
1909
1910 pub async fn update_run_input_ctx_tx(
1911 &self,
1912 tx: &mut Transaction<'_, Postgres>,
1913 run_id: Uuid,
1914 input_ctx: Value,
1915 ) -> Result<()> {
1916 let result = sqlx::query(
1917 r#"
1918 update runs
1919 set input_ctx = $2
1920 where run_id = $1
1921 "#,
1922 )
1923 .bind(run_id)
1924 .bind(Json(input_ctx))
1925 .execute(exec(tx))
1926 .await?;
1927
1928 if result.rows_affected() == 0 {
1929 bail!("run {} not found", run_id);
1930 }
1931
1932 Ok(())
1933 }
1934
1935 pub async fn set_run_status(&self, run_id: Uuid, status: models::RunStatus) -> Result<()> {
1936 let result = sqlx::query(
1937 r#"
1938 update runs
1939 set status = $2
1940 where run_id = $1
1941 "#,
1942 )
1943 .bind(run_id)
1944 .bind(status.as_str())
1945 .execute(&self.pool)
1946 .await?;
1947
1948 if result.rows_affected() == 0 {
1949 bail!("run {} not found", run_id);
1950 }
1951
1952 Ok(())
1953 }
1954
1955 pub async fn set_run_status_tx(
1956 &self,
1957 tx: &mut Transaction<'_, Postgres>,
1958 run_id: Uuid,
1959 status: models::RunStatus,
1960 ) -> Result<()> {
1961 let result = sqlx::query(
1962 r#"
1963 update runs
1964 set status = $2
1965 where run_id = $1
1966 "#,
1967 )
1968 .bind(run_id)
1969 .bind(status.as_str())
1970 .execute(exec(tx))
1971 .await?;
1972
1973 if result.rows_affected() == 0 {
1974 bail!("run {} not found", run_id);
1975 }
1976
1977 Ok(())
1978 }
1979
1980 pub async fn fetch_run_tenant(&self, run_id: Uuid) -> Result<Option<String>> {
1982 let tenant: Option<String> = sqlx::query_scalar(
1983 r#"
1984 select input_ctx -> 'labels' ->> 'tenant'
1985 from runs
1986 where run_id = $1
1987 "#,
1988 )
1989 .bind(run_id)
1990 .fetch_optional(&self.pool)
1991 .await?;
1992
1993 Ok(tenant.and_then(|value| if value.is_empty() { None } else { Some(value) }))
1994 }
1995
    /// Rejects (via error) a prospective spend that would push the tenant
    /// over its daily token or cost quota.
    ///
    /// Tenants with no `tenant_quotas` row are unlimited. Usage is summed
    /// over `billing_usage` since midnight (server time via
    /// `date_trunc('day', now())`). Negative requested amounts are clamped
    /// to zero so refunds can never fail a quota check.
    pub async fn enforce_tenant_quota(&self, tenant: &str, tokens: i64, cost: f64) -> Result<()> {
        let quota: Option<(Option<i64>, Option<f64>)> = sqlx::query_as(
            r#"
            select daily_token_limit, daily_cost_limit
            from tenant_quotas
            where tenant = $1
            "#,
        )
        .bind(tenant)
        .fetch_optional(&self.pool)
        .await?;

        // No quota row configured: tenant is unconstrained.
        let Some((token_limit, cost_limit)) = quota else {
            return Ok(());
        };

        let usage: (i64, f64) = sqlx::query_as(
            r#"
            select coalesce(sum(tokens), 0), coalesce(sum(cost), 0)
            from billing_usage
            where tenant = $1
              and recorded_at >= date_trunc('day', now())
            "#,
        )
        .bind(tenant)
        .fetch_one(&self.pool)
        .await?;

        // saturating_add guards against overflow on pathological usage sums.
        let prospective_tokens = usage.0.saturating_add(tokens.max(0));
        if let Some(limit) = token_limit {
            if prospective_tokens > limit {
                bail!("tenant '{}' exceeded daily token quota", tenant);
            }
        }

        let prospective_cost = usage.1 + cost.max(0.0);
        if let Some(limit) = cost_limit {
            if prospective_cost > limit {
                bail!("tenant '{}' exceeded daily cost quota", tenant);
            }
        }

        Ok(())
    }
2041
2042 pub async fn record_billing_usage_tx(
2044 &self,
2045 tx: &mut Transaction<'_, Postgres>,
2046 run_id: Uuid,
2047 tokens: i64,
2048 cost: f64,
2049 ) -> Result<()> {
2050 sqlx::query(
2051 r#"
2052 insert into billing_usage (run_id, tenant, tokens, cost)
2053 select $1, input_ctx -> 'labels' ->> 'tenant', $2, $3
2054 from runs
2055 where run_id = $1
2056 "#,
2057 )
2058 .bind(run_id)
2059 .bind(tokens)
2060 .bind(cost)
2061 .execute(exec(tx))
2062 .await?;
2063
2064 Ok(())
2065 }
2066
2067 pub async fn list_recent_runs(&self, limit: i64) -> Result<Vec<models::Run>> {
2069 let limit = limit.max(1);
2070 let rows = sqlx::query_as::<_, db::RunRow>(
2071 r#"
2072 select *
2073 from runs
2074 order by created_at desc
2075 limit $1
2076 "#,
2077 )
2078 .bind(limit)
2079 .fetch_all(&self.pool)
2080 .await?;
2081
2082 rows.into_iter().map(models::Run::try_from).collect()
2083 }
2084
2085 pub async fn fetch_run_by_idempotency_key(
2087 &self,
2088 idempotency_key: &str,
2089 ) -> Result<Option<models::Run>> {
2090 let row = sqlx::query_as::<_, db::RunRow>(
2091 r#"
2092 select *
2093 from runs
2094 where idempotency_key = $1
2095 "#,
2096 )
2097 .bind(idempotency_key)
2098 .fetch_optional(&self.pool)
2099 .await?;
2100
2101 match row {
2102 Some(row) => Ok(Some(row.try_into()?)),
2103 None => Ok(None),
2104 }
2105 }
2106
2107 pub async fn upsert_artifact(&self, artifact: models::NewArtifact) -> Result<()> {
2109 let models::NewArtifact {
2110 sha256,
2111 media_type,
2112 size_bytes,
2113 metadata,
2114 } = artifact;
2115
2116 let metadata = metadata.map(Json);
2117
2118 sqlx::query(
2119 r#"
2120 insert into artifacts (sha256, media_type, size_bytes, metadata)
2121 values ($1, $2, $3, $4)
2122 on conflict (sha256) do update
2123 set media_type = excluded.media_type,
2124 size_bytes = excluded.size_bytes,
2125 metadata = excluded.metadata
2126 "#,
2127 )
2128 .bind(sha256)
2129 .bind(media_type)
2130 .bind(size_bytes)
2131 .bind(metadata)
2132 .execute(&self.pool)
2133 .await?;
2134
2135 Ok(())
2136 }
2137
    /// Enqueues an event in the transactional outbox and returns its row id.
    ///
    /// Ordering matters: the audit hash-chain is attached to the payload
    /// first (mutating it in place), then the event is mirrored into the
    /// run-event log, and only then inserted into `outbox`. All three happen
    /// inside the caller's transaction so they commit or roll back together.
    pub async fn enqueue_outbox_event(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        event: models::NewOutboxEvent,
    ) -> Result<i64> {
        let models::NewOutboxEvent {
            run_id,
            step_id,
            kind,
            mut payload,
        } = event;

        // Stamps `_audit` (chain/payload hashes) onto the payload.
        self.attach_outbox_audit(tx, run_id, step_id, &kind, &mut payload)
            .await?;

        // Mirror into the per-run event log before the outbox insert.
        self.append_run_event_tx(tx, run_id, step_id, &kind, &payload)
            .await?;

        let id = sqlx::query_scalar(
            r#"
            insert into outbox (run_id, step_id, kind, payload)
            values ($1, $2, $3, $4)
            returning id
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(&kind)
        .bind(Json(payload))
        .fetch_one(exec(tx))
        .await?;

        Ok(id)
    }
2173
2174 fn advisory_lock_key(run_id: Uuid) -> (i64, i64) {
2175 let value = run_id.as_u128();
2176 let hi = (value >> 64) as i64;
2177 let lo = value as i64;
2178 (hi, lo)
2179 }
2180
    /// Stamps a tamper-evident `_audit` object onto an outbox payload.
    ///
    /// Each run's events form a hash chain: the new `chain_hash` covers the
    /// previous event's chain hash, the run/step ids, the event kind, the
    /// payload hash, and the emit timestamp. A per-run transaction-scoped
    /// advisory lock serializes writers so two events cannot both chain off
    /// the same predecessor.
    ///
    /// Fails when `payload` is not a JSON object (the `_audit` key must be
    /// insertable).
    async fn attach_outbox_audit(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        step_id: Option<Uuid>,
        kind: &str,
        payload: &mut Value,
    ) -> Result<()> {
        if payload.as_object().is_none() {
            bail!(
                "outbox payload for run {} kind '{}' must be a JSON object",
                run_id,
                kind
            );
        }

        // Serialize concurrent appenders per run; the xact lock releases
        // automatically at commit/rollback.
        let (lock_a, lock_b) = Self::advisory_lock_key(run_id);
        sqlx::query("select pg_advisory_xact_lock($1, $2)")
            .bind(lock_a)
            .bind(lock_b)
            .fetch_optional(exec(tx))
            .await?;

        // Tail of the chain: the newest event's chain hash for this run,
        // `None` for the first event.
        let previous_hash: Option<String> = sqlx::query_scalar(
            r#"
            select payload->'_audit'->>'chain_hash'
            from outbox
            where run_id = $1
            order by id desc
            limit 1
            for update
            "#,
        )
        .bind(run_id)
        .fetch_optional(exec(tx))
        .await?;

        let emitted_at = Utc::now();
        // Hash the payload as serialized *before* `_audit` is inserted.
        let canonical = serde_json::to_vec(payload)
            .context("failed to serialise outbox payload for audit hashing")?;
        let payload_hash = Sha256::digest(&canonical);
        let payload_hash_bytes: &[u8] = payload_hash.as_ref();

        // Chain hash input order is part of the on-disk format — do not
        // reorder these updates.
        let mut hasher = Sha256::new();
        if let Some(prev) = &previous_hash {
            hasher.update(prev.as_bytes());
        }
        hasher.update(run_id.as_bytes());
        if let Some(step) = step_id {
            hasher.update(step.as_bytes());
        }
        hasher.update(kind.as_bytes());
        hasher.update(payload_hash_bytes);
        let timestamp = emitted_at.timestamp_nanos_opt().unwrap_or(0);
        hasher.update(timestamp.to_be_bytes());
        let chain_hash = hex::encode(hasher.finalize());

        if let Some(map) = payload.as_object_mut() {
            map.insert(
                "_audit".to_string(),
                json!({
                    "version": 1,
                    "chain_hash": chain_hash,
                    "payload_hash": hex::encode(payload_hash_bytes),
                    "previous_hash": previous_hash,
                    "emitted_at": emitted_at,
                }),
            );
        }

        Ok(())
    }
2253
    /// Leases up to `limit` runnable steps for `worker_id`.
    ///
    /// Eligible steps are `queued`, have no pending dependencies, belong to
    /// a run that is not paused, and have passed any `not_before` delay.
    /// `for update skip locked` lets concurrent workers claim disjoint sets
    /// without blocking each other. Leases expire after a hard-coded
    /// 2 minutes. Runs that were still `pending` are flipped to `running`
    /// in the same transaction.
    pub async fn claim_queued_steps(
        &self,
        limit: i64,
        worker_id: &str,
    ) -> Result<Vec<models::Step>> {
        let mut tx = self.pool.begin().await?;
        let rows: Vec<db::StepRow> = sqlx::query_as(
            r#"
            with cte as (
                select s.run_id, s.step_id
                from steps s
                inner join runs r on r.run_id = s.run_id
                where s.status = 'queued'
                  and s.pending_dependencies = 0
                  and r.status <> 'paused_at_step'
                  and (s.not_before is null or s.not_before <= now())
                order by s.priority desc, coalesce(s.not_before, s.created_at) asc, s.idx
                for update skip locked
                limit $1
            )
            update steps s set
                status = 'leased',
                leased_by = $2,
                lease_expires_at = now() + interval '2 minutes'
            from cte
            where s.run_id = cte.run_id and s.step_id = cte.step_id
            returning s.*
            "#,
        )
        .bind(limit)
        .bind(worker_id)
        .fetch_all(exec(&mut tx))
        .await?;

        if !rows.is_empty() {
            // Deduplicate run ids before promoting pending runs to running.
            let run_ids_set: HashSet<Uuid> = rows.iter().map(|row| row.run_id).collect();
            let run_ids: Vec<Uuid> = run_ids_set.into_iter().collect();

            sqlx::query(
                r#"
                update runs
                set status = 'running'
                where status = 'pending'
                  and run_id = any($1)
                "#,
            )
            .bind(&run_ids)
            .execute(exec(&mut tx))
            .await?;
        }
        tx.commit().await?;

        // NOTE(review): conversion happens after commit — if `try_from`
        // fails here the lease is already taken; confirm expiry reaps it.
        rows.into_iter()
            .map(models::Step::try_from)
            .collect::<Result<Vec<_>>>()
    }
2311
2312 pub async fn revert_step_to_queue(&self, run_id: Uuid, step_id: Uuid) -> Result<()> {
2313 let result = sqlx::query(
2314 r#"
2315 update steps
2316 set status = 'queued',
2317 leased_by = null,
2318 lease_expires_at = null
2319 where run_id = $1
2320 and step_id = $2
2321 "#,
2322 )
2323 .bind(run_id)
2324 .bind(step_id)
2325 .execute(&self.pool)
2326 .await?;
2327
2328 if result.rows_affected() == 0 {
2329 bail!(
2330 "failed to revert step {} to queue for run {}",
2331 step_id,
2332 run_id
2333 );
2334 }
2335 Ok(())
2336 }
2337
2338 pub async fn revert_step_to_queue_tx(
2339 &self,
2340 tx: &mut Transaction<'_, Postgres>,
2341 run_id: Uuid,
2342 step_id: Uuid,
2343 ) -> Result<()> {
2344 let result = sqlx::query(
2345 r#"
2346 update steps
2347 set status = 'queued',
2348 leased_by = null,
2349 lease_expires_at = null
2350 where run_id = $1
2351 and step_id = $2
2352 "#,
2353 )
2354 .bind(run_id)
2355 .bind(step_id)
2356 .execute(exec(tx))
2357 .await?;
2358
2359 if result.rows_affected() == 0 {
2360 bail!(
2361 "failed to revert step {} to queue for run {}",
2362 step_id,
2363 run_id
2364 );
2365 }
2366 Ok(())
2367 }
2368
2369 pub async fn update_step_spec(
2370 &self,
2371 run_id: Uuid,
2372 step_id: Uuid,
2373 spec_json: Value,
2374 ) -> Result<()> {
2375 let result = sqlx::query(
2376 r#"
2377 update steps
2378 set spec_json = $3
2379 where run_id = $1
2380 and step_id = $2
2381 "#,
2382 )
2383 .bind(run_id)
2384 .bind(step_id)
2385 .bind(Json(spec_json))
2386 .execute(&self.pool)
2387 .await?;
2388
2389 if result.rows_affected() == 0 {
2390 bail!(
2391 "failed to update spec for step {} in run {}",
2392 step_id,
2393 run_id
2394 );
2395 }
2396 Ok(())
2397 }
2398
2399 pub async fn update_step_spec_tx(
2400 &self,
2401 tx: &mut Transaction<'_, Postgres>,
2402 run_id: Uuid,
2403 step_id: Uuid,
2404 spec_json: Value,
2405 ) -> Result<()> {
2406 let result = sqlx::query(
2407 r#"
2408 update steps
2409 set spec_json = $3
2410 where run_id = $1
2411 and step_id = $2
2412 "#,
2413 )
2414 .bind(run_id)
2415 .bind(step_id)
2416 .bind(Json(spec_json))
2417 .execute(exec(tx))
2418 .await?;
2419
2420 if result.rows_affected() == 0 {
2421 bail!(
2422 "failed to update spec for step {} in run {}",
2423 step_id,
2424 run_id
2425 );
2426 }
2427 Ok(())
2428 }
2429}
2430
/// Raw row types mapped 1:1 from database tables via `sqlx::FromRow`.
///
/// These are persistence-layer shapes; the public domain models live in
/// `models` and are produced from these rows via `TryFrom` conversions.
mod db {
    use super::*;
    use serde_json::Value;

    /// Row of the `runs` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct RunRow {
        pub run_id: Uuid,
        pub created_at: DateTime<Utc>,
        // Stored as text; parsed into `models::RunStatus` on conversion.
        pub status: String,
        pub dag_json: Value,
        pub input_ctx: Value,
        pub seed: i64,
        pub idempotency_key: Option<String>,
    }

    /// Row of the `steps` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct StepRow {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub idx: i32,
        pub priority: i16,
        pub spec_json: Value,
        // Stored as text; parsed into `models::StepStatus` on conversion.
        pub status: String,
        pub attempt: i32,
        pub created_at: DateTime<Utc>,
        pub max_attempts: i32,
        pub retry_backoff_ms: i64,
        pub retry_backoff_factor: f64,
        pub not_before: Option<DateTime<Utc>>,
        pub deadline_at: Option<DateTime<Utc>>,
        pub pending_dependencies: i32,
        pub total_dependencies: i32,
        pub estimated_tokens: i64,
        pub estimated_cost: f64,
        pub actual_tokens: Option<i64>,
        pub actual_cost: Option<f64>,
        // Lease bookkeeping set by `claim_queued_steps`.
        pub leased_by: Option<String>,
        pub lease_expires_at: Option<DateTime<Utc>>,
        pub output_json: Option<Value>,
        pub error_json: Option<Value>,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub provider: Option<String>,
        pub provider_version: Option<String>,
        pub checkpoint: Option<Value>,
        pub compensation_step: Option<Uuid>,
        pub compensation_scheduled: bool,
    }

    /// Row of the `step_attempts` history table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct StepAttemptRow {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub attempt: i32,
        pub status: String,
        pub recorded_at: DateTime<Utc>,
        pub latency_ms: Option<f64>,
        pub spec_snapshot: Value,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub error_snapshot: Option<Value>,
        pub checkpoint: Option<Value>,
        pub budget_snapshot: Option<Value>,
        pub guardrail_events: Option<Value>,
        pub tool_events: Option<Value>,
        pub artifacts: Option<Value>,
    }

    /// Row of the `budgets` table (per-run or per-team token/cost limits).
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct BudgetRow {
        pub budget_id: Uuid,
        pub scope: String,
        pub run_id: Option<Uuid>,
        pub team_id: Option<Uuid>,
        pub tokens_limit: Option<i64>,
        pub tokens_used: i64,
        pub cost_limit: Option<f64>,
        pub cost_used: f64,
        pub created_at: DateTime<Utc>,
        pub updated_at: DateTime<Utc>,
    }

    /// Row of the `artifacts` table, keyed by content digest.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct ArtifactRow {
        pub sha256: Vec<u8>,
        pub media_type: String,
        pub size_bytes: i64,
        pub created_at: DateTime<Utc>,
        pub metadata: Option<Value>,
    }

    /// Row of the transactional `outbox` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct OutboxRow {
        pub id: i64,
        pub run_id: Uuid,
        pub step_id: Option<Uuid>,
        pub kind: String,
        pub payload: Value,
        pub created_at: DateTime<Utc>,
        // Null until the outbox publisher has delivered the event.
        pub published_at: Option<DateTime<Utc>>,
    }

    /// Row of the `eval_scenarios` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct EvalScenarioRow {
        pub scenario_id: Uuid,
        pub name: String,
        pub description: Option<String>,
        pub spec: Value,
        pub created_at: DateTime<Utc>,
    }

    /// Row of the `eval_results` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct EvalResultRow {
        pub result_id: Uuid,
        pub scenario_id: Uuid,
        pub run_id: Uuid,
        pub executed_at: DateTime<Utc>,
        pub outcome: String,
        pub metrics: Value,
    }

    /// Row of the `change_gates` decision table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct ChangeGateRow {
        pub gate_id: Uuid,
        pub change_id: String,
        pub revision: Option<String>,
        pub effect: String,
        pub reasons: Value,
        pub followups: Value,
        pub scorecard: Value,
        pub decided_at: DateTime<Utc>,
        pub decided_by: Option<String>,
        pub metadata: Value,
        pub input: Value,
        pub telemetry: Value,
        pub artifact_sha256: Option<Vec<u8>>,
        pub created_at: DateTime<Utc>,
    }

    /// Row of the `change_gate_followups` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct ChangeGateFollowupRow {
        pub followup_id: Uuid,
        pub gate_id: Uuid,
        pub actor: String,
        pub outcome: String,
        pub note: Option<String>,
        pub details: Value,
        pub recorded_at: DateTime<Utc>,
    }

    /// Row of the `transparency_jobs` work queue table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct TransparencyJobRow {
        pub job_id: i64,
        pub kind: String,
        pub payload: Value,
        pub receipt: Option<Value>,
        pub error: Option<String>,
        pub attempts: i32,
        pub created_at: DateTime<Utc>,
        pub processed_at: Option<DateTime<Utc>>,
    }
}
2593
2594pub mod models {
2595 use super::*;
2596 use serde::{Deserialize, Serialize};
2597 use serde_json::Value;
2598
    /// Lifecycle state of a run; serialized as snake_case both in JSON and
    /// in the `runs.status` database column.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    #[serde(rename_all = "snake_case")]
    pub enum RunStatus {
        Pending,
        Running,
        Succeeded,
        Failed,
        Canceled,
        PausedAtStep,
    }

    impl RunStatus {
        /// Returns the database/wire representation of the status.
        ///
        /// Must stay in sync with `try_from_str` and the string literals
        /// used in raw SQL elsewhere in this file.
        pub fn as_str(self) -> &'static str {
            match self {
                RunStatus::Pending => "pending",
                RunStatus::Running => "running",
                RunStatus::Succeeded => "succeeded",
                RunStatus::Failed => "failed",
                RunStatus::Canceled => "canceled",
                RunStatus::PausedAtStep => "paused_at_step",
            }
        }

        /// Parses the database/wire representation; errors on any unknown
        /// value so schema drift surfaces loudly.
        pub fn try_from_str(value: &str) -> Result<Self> {
            match value {
                "pending" => Ok(RunStatus::Pending),
                "running" => Ok(RunStatus::Running),
                "succeeded" => Ok(RunStatus::Succeeded),
                "failed" => Ok(RunStatus::Failed),
                "canceled" => Ok(RunStatus::Canceled),
                "paused_at_step" => Ok(RunStatus::PausedAtStep),
                other => Err(anyhow!("invalid run_status '{}'", other)),
            }
        }
    }
2635
    /// Lifecycle state of a step; serialized as snake_case both in JSON and
    /// in the `steps.status` database column.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    #[serde(rename_all = "snake_case")]
    pub enum StepStatus {
        Queued,
        Blocked,
        Leased,
        Running,
        Succeeded,
        Failed,
        Skipped,
    }

    impl StepStatus {
        /// Returns the database/wire representation of the status.
        ///
        /// Must stay in sync with `try_from_str` and the string literals
        /// used in raw SQL elsewhere in this file.
        pub fn as_str(self) -> &'static str {
            match self {
                StepStatus::Queued => "queued",
                StepStatus::Blocked => "blocked",
                StepStatus::Leased => "leased",
                StepStatus::Running => "running",
                StepStatus::Succeeded => "succeeded",
                StepStatus::Failed => "failed",
                StepStatus::Skipped => "skipped",
            }
        }

        /// Parses the database/wire representation; errors on any unknown
        /// value so schema drift surfaces loudly.
        pub fn try_from_str(value: &str) -> Result<Self> {
            match value {
                "queued" => Ok(StepStatus::Queued),
                "blocked" => Ok(StepStatus::Blocked),
                "leased" => Ok(StepStatus::Leased),
                "running" => Ok(StepStatus::Running),
                "succeeded" => Ok(StepStatus::Succeeded),
                "failed" => Ok(StepStatus::Failed),
                "skipped" => Ok(StepStatus::Skipped),
                other => Err(anyhow!("invalid step_status '{}'", other)),
            }
        }
    }
2675
    /// A persisted orchestration run (one row of the `runs` table).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Run {
        pub run_id: Uuid,
        pub created_at: DateTime<Utc>,
        pub status: RunStatus,
        // Full DAG definition captured when the run was created.
        pub dag_json: Value,
        // Caller-supplied input context for the run.
        pub input_ctx: Value,
        // Seed recorded with the run; presumably for reproducible execution — TODO confirm.
        pub seed: i64,
        // Optional caller-provided key, presumably for de-duplicating submissions — TODO confirm.
        pub idempotency_key: Option<String>,
    }
2687
    /// Insert payload for creating a run. `created_at` is absent here and is
    /// presumably defaulted by the database — TODO confirm schema default.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewRun {
        pub run_id: Uuid,
        pub status: RunStatus,
        pub dag_json: Value,
        pub input_ctx: Value,
        pub seed: i64,
        pub idempotency_key: Option<String>,
    }
2698
    /// A persisted step of a run (one row of the `steps` table), including
    /// scheduling, retry, lease, and cost-accounting state.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Step {
        pub run_id: Uuid,
        pub step_id: Uuid,
        // Position of the step within the run's DAG ordering.
        pub idx: i32,
        pub priority: i16,
        pub spec_json: Value,
        pub status: StepStatus,
        pub attempt: i32,
        pub created_at: DateTime<Utc>,
        // Retry policy: attempts are capped; backoff presumably grows by
        // `retry_backoff_factor` per attempt — TODO confirm in scheduler.
        pub max_attempts: i32,
        pub retry_backoff_ms: i64,
        pub retry_backoff_factor: f64,
        // Earliest time the step may be picked up, if deferred.
        pub not_before: Option<DateTime<Utc>>,
        pub deadline_at: Option<DateTime<Utc>>,
        // Dependency counters; a blocked step becomes queued once pending reaches 0.
        pub pending_dependencies: i32,
        pub total_dependencies: i32,
        // Budget accounting: estimates up front, actuals once known.
        pub estimated_tokens: i64,
        pub estimated_cost: f64,
        pub actual_tokens: Option<i64>,
        pub actual_cost: Option<f64>,
        // Lease bookkeeping for the worker currently holding the step.
        pub leased_by: Option<String>,
        pub lease_expires_at: Option<DateTime<Utc>>,
        pub output_json: Option<Value>,
        pub error_json: Option<Value>,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub provider: Option<String>,
        pub provider_version: Option<String>,
        // Resumable progress marker, if the executor checkpoints.
        pub checkpoint: Option<Value>,
        // Optional compensation (undo) step and whether it has been scheduled.
        pub compensation_step: Option<Uuid>,
        pub compensation_scheduled: bool,
    }
2733
    /// Insert payload for a step row; see [`Step`] for field meanings.
    /// `Default` provides a queued, single-attempt, dependency-free baseline.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewStep {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub idx: i32,
        pub priority: i16,
        pub spec_json: Value,
        pub status: StepStatus,
        pub attempt: i32,
        pub input_snapshot: Option<Value>,
        pub provider: Option<String>,
        pub provider_version: Option<String>,
        pub max_attempts: i32,
        pub retry_backoff_ms: i64,
        pub retry_backoff_factor: f64,
        pub not_before: Option<DateTime<Utc>>,
        pub deadline_at: Option<DateTime<Utc>>,
        pub pending_dependencies: i32,
        pub total_dependencies: i32,
        pub estimated_tokens: i64,
        pub estimated_cost: f64,
        pub actual_tokens: Option<i64>,
        pub actual_cost: Option<f64>,
        pub checkpoint: Option<Value>,
        pub compensation_step: Option<Uuid>,
        pub compensation_scheduled: bool,
    }
2762
    /// Immutable record of one execution attempt of a step, with snapshots of
    /// the spec, inputs/outputs, errors, and related event streams.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct StepAttempt {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub attempt: i32,
        pub status: StepStatus,
        pub recorded_at: DateTime<Utc>,
        pub latency_ms: Option<f64>,
        pub spec_snapshot: Value,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub error_snapshot: Option<Value>,
        pub checkpoint: Option<Value>,
        pub budget_snapshot: Option<Value>,
        pub guardrail_events: Option<Value>,
        pub tool_events: Option<Value>,
        pub artifacts: Option<Value>,
    }
2782
    /// Discriminant for the kinds of transparency-log publication jobs.
    #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "snake_case")]
    pub enum TransparencyJobKind {
        ReleaseBundle,
        RunRollup,
        ArtifactEntry,
    }
2790
2791 impl TransparencyJobKind {
2792 pub fn as_str(self) -> &'static str {
2793 match self {
2794 TransparencyJobKind::ReleaseBundle => "release_bundle",
2795 TransparencyJobKind::RunRollup => "run_rollup",
2796 TransparencyJobKind::ArtifactEntry => "artifact_entry",
2797 }
2798 }
2799 }
2800
2801 impl FromStr for TransparencyJobKind {
2802 type Err = anyhow::Error;
2803
2804 fn from_str(s: &str) -> Result<Self> {
2805 match s {
2806 "release_bundle" => Ok(TransparencyJobKind::ReleaseBundle),
2807 "run_rollup" => Ok(TransparencyJobKind::RunRollup),
2808 "artifact_entry" => Ok(TransparencyJobKind::ArtifactEntry),
2809 other => Err(anyhow!("invalid transparency job kind '{}'", other)),
2810 }
2811 }
2812 }
2813
    /// Payload for a [`TransparencyJobKind::ReleaseBundle`] job.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ReleaseBundlePayload {
        pub change_id: String,
        pub gate_id: Uuid,
        // Hex-encoded digest of the bundle artifact — presumably; TODO confirm encoding.
        pub artifact_sha256: String,
        pub attestation_ids: Vec<Uuid>,
        pub metadata: Value,
    }
2822
    /// Payload for a [`TransparencyJobKind::RunRollup`] job.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct RunRollupPayload {
        pub run_id: Uuid,
        pub artifact_sha256: String,
        pub attestation_ids: Vec<Uuid>,
        // Object-store key of the rollup artifact.
        pub artifact_key: String,
        pub metadata: Value,
    }
2831
    /// Payload for a [`TransparencyJobKind::ArtifactEntry`] job; run/step are
    /// optional because an artifact may be published outside any run context.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ArtifactEntryPayload {
        pub run_id: Option<Uuid>,
        pub step_id: Option<Uuid>,
        pub artifact_key: String,
        pub artifact_sha256: String,
        pub attestation_ids: Vec<Uuid>,
        pub metadata: Value,
    }
2841
    /// Internally-tagged union of job payloads; the serde `kind` tag matches
    /// the snake_case form of [`TransparencyJobKind`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    #[serde(tag = "kind", rename_all = "snake_case")]
    pub enum TransparencyJobPayload {
        ReleaseBundle(ReleaseBundlePayload),
        RunRollup(RunRollupPayload),
        ArtifactEntry(ArtifactEntryPayload),
    }
2849
2850 impl TransparencyJobPayload {
2851 pub fn kind(&self) -> TransparencyJobKind {
2852 match self {
2853 TransparencyJobPayload::ReleaseBundle(_) => TransparencyJobKind::ReleaseBundle,
2854 TransparencyJobPayload::RunRollup(_) => TransparencyJobKind::RunRollup,
2855 TransparencyJobPayload::ArtifactEntry(_) => TransparencyJobKind::ArtifactEntry,
2856 }
2857 }
2858 }
2859
    /// A queued transparency publication job (one row of `transparency_jobs`).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct TransparencyJob {
        pub job_id: i64,
        pub kind: TransparencyJobKind,
        pub payload: TransparencyJobPayload,
        // Number of failed processing attempts so far.
        pub attempts: i32,
        pub last_error: Option<String>,
        // Receipt returned by the transparency log on success.
        pub receipt: Option<Value>,
        // None while the job is still pending.
        pub processed_at: Option<DateTime<Utc>>,
        pub created_at: DateTime<Utc>,
    }
2871
    /// Enqueue payload for a transparency job; the kind is derived from the payload.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewTransparencyJob {
        pub payload: TransparencyJobPayload,
    }
2876
    /// Insert payload for a step attempt record; see [`StepAttempt`] for field
    /// meanings (`recorded_at` is presumably set by the database — TODO confirm).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewStepAttempt {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub attempt: i32,
        pub status: StepStatus,
        pub latency_ms: Option<f64>,
        pub spec_snapshot: Value,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub error_snapshot: Option<Value>,
        pub checkpoint: Option<Value>,
        pub budget_snapshot: Option<Value>,
        pub guardrail_events: Option<Value>,
        pub tool_events: Option<Value>,
        pub artifacts: Option<Value>,
    }
2895
2896 impl Default for NewStep {
2897 fn default() -> Self {
2898 Self {
2899 run_id: Uuid::nil(),
2900 step_id: Uuid::nil(),
2901 idx: 0,
2902 priority: 0,
2903 spec_json: Value::Null,
2904 status: StepStatus::Queued,
2905 attempt: 0,
2906 input_snapshot: None,
2907 provider: None,
2908 provider_version: None,
2909 max_attempts: 1,
2910 retry_backoff_ms: 0,
2911 retry_backoff_factor: 1.0,
2912 not_before: None,
2913 deadline_at: None,
2914 pending_dependencies: 0,
2915 total_dependencies: 0,
2916 estimated_tokens: 0,
2917 estimated_cost: 0.0,
2918 actual_tokens: None,
2919 actual_cost: None,
2920 checkpoint: None,
2921 compensation_step: None,
2922 compensation_scheduled: false,
2923 }
2924 }
2925 }
2926
    /// Decode a raw `steps` row into the domain [`Step`].
    ///
    /// # Errors
    /// Fails only when the stored status text is not a known [`StepStatus`].
    impl TryFrom<crate::db::StepRow> for Step {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::StepRow) -> Result<Self, Self::Error> {
            Ok(Self {
                run_id: row.run_id,
                step_id: row.step_id,
                idx: row.idx,
                priority: row.priority,
                spec_json: row.spec_json,
                // The only fallible conversion: status text -> enum.
                status: StepStatus::try_from_str(&row.status)?,
                attempt: row.attempt,
                created_at: row.created_at,
                max_attempts: row.max_attempts,
                retry_backoff_ms: row.retry_backoff_ms,
                retry_backoff_factor: row.retry_backoff_factor,
                not_before: row.not_before,
                deadline_at: row.deadline_at,
                pending_dependencies: row.pending_dependencies,
                total_dependencies: row.total_dependencies,
                estimated_tokens: row.estimated_tokens,
                estimated_cost: row.estimated_cost,
                actual_tokens: row.actual_tokens,
                actual_cost: row.actual_cost,
                leased_by: row.leased_by,
                lease_expires_at: row.lease_expires_at,
                output_json: row.output_json,
                error_json: row.error_json,
                input_snapshot: row.input_snapshot,
                output_snapshot: row.output_snapshot,
                provider: row.provider,
                provider_version: row.provider_version,
                checkpoint: row.checkpoint,
                compensation_step: row.compensation_step,
                compensation_scheduled: row.compensation_scheduled,
            })
        }
    }
2965
    /// Decode a raw step-attempt row into the domain [`StepAttempt`].
    ///
    /// # Errors
    /// Fails only when the stored status text is not a known [`StepStatus`].
    impl TryFrom<crate::db::StepAttemptRow> for StepAttempt {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::StepAttemptRow) -> Result<Self, Self::Error> {
            Ok(Self {
                run_id: row.run_id,
                step_id: row.step_id,
                attempt: row.attempt,
                // The only fallible conversion: status text -> enum.
                status: StepStatus::try_from_str(&row.status)?,
                recorded_at: row.recorded_at,
                latency_ms: row.latency_ms,
                spec_snapshot: row.spec_snapshot,
                input_snapshot: row.input_snapshot,
                output_snapshot: row.output_snapshot,
                error_snapshot: row.error_snapshot,
                checkpoint: row.checkpoint,
                budget_snapshot: row.budget_snapshot,
                guardrail_events: row.guardrail_events,
                tool_events: row.tool_events,
                artifacts: row.artifacts,
            })
        }
    }
2989
    /// A content-addressed artifact record (one row of the `artifacts` table),
    /// keyed by its raw SHA-256 digest bytes.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Artifact {
        // Raw 32-byte digest, stored as bytea.
        pub sha256: Vec<u8>,
        pub media_type: String,
        pub size_bytes: i64,
        pub created_at: DateTime<Utc>,
        pub metadata: Option<Value>,
    }
2999
    /// Insert payload for an artifact row; see [`Artifact`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewArtifact {
        pub sha256: Vec<u8>,
        pub media_type: String,
        pub size_bytes: i64,
        pub metadata: Option<Value>,
    }
3008
3009 impl From<super::ArtifactMeta> for NewArtifact {
3010 fn from(meta: super::ArtifactMeta) -> Self {
3011 Self {
3012 sha256: meta.sha256.to_vec(),
3013 media_type: meta.media_type,
3014 size_bytes: std::cmp::min(meta.size, i64::MAX as u64) as i64,
3015 metadata: Some(meta.metadata),
3016 }
3017 }
3018 }
3019
    /// A token/cost budget row; limits are optional (None means unlimited),
    /// usage counters are always present.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Budget {
        pub budget_id: Uuid,
        // Scope discriminator (stringly typed in the schema); exact values
        // presumably include run/team scopes — TODO confirm against schema.
        pub scope: String,
        pub run_id: Option<Uuid>,
        pub team_id: Option<Uuid>,
        pub tokens_limit: Option<i64>,
        pub tokens_used: i64,
        pub cost_limit: Option<f64>,
        pub cost_used: f64,
        pub created_at: DateTime<Utc>,
        pub updated_at: DateTime<Utc>,
    }
3034
    /// Decode a raw budget row into the domain [`Budget`]. This mapping is
    /// currently infallible field copying; `TryFrom` keeps the signature
    /// consistent with the other row conversions.
    impl TryFrom<crate::db::BudgetRow> for Budget {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::BudgetRow) -> Result<Self, Self::Error> {
            Ok(Self {
                budget_id: row.budget_id,
                scope: row.scope,
                run_id: row.run_id,
                team_id: row.team_id,
                tokens_limit: row.tokens_limit,
                tokens_used: row.tokens_used,
                cost_limit: row.cost_limit,
                cost_used: row.cost_used,
                created_at: row.created_at,
                updated_at: row.updated_at,
            })
        }
    }
3053
3054 impl Budget {
3055 pub fn remaining_tokens(&self) -> Option<i64> {
3056 self.tokens_limit
3057 .map(|limit| limit.saturating_sub(self.tokens_used))
3058 }
3059
3060 pub fn remaining_cost(&self) -> Option<f64> {
3061 self.cost_limit
3062 .map(|limit| (limit - self.cost_used).max(0.0))
3063 }
3064 }
3065
    /// A transactional-outbox event row; `published_at` is None until the
    /// event has been delivered downstream.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct OutboxEvent {
        pub id: i64,
        pub run_id: Uuid,
        pub step_id: Option<Uuid>,
        pub kind: String,
        pub payload: Value,
        pub created_at: DateTime<Utc>,
        pub published_at: Option<DateTime<Utc>>,
    }
3077
    /// An evaluation scenario definition (unique by `name`; see
    /// `Storage::upsert_eval_scenario`).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct EvalScenario {
        pub scenario_id: Uuid,
        pub name: String,
        pub description: Option<String>,
        pub spec: Value,
        pub created_at: DateTime<Utc>,
    }
3087
    /// One evaluation outcome for a (scenario, run) pair; upserted, so a
    /// re-run overwrites the previous outcome and metrics.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct EvalResult {
        pub result_id: Uuid,
        pub scenario_id: Uuid,
        pub run_id: Uuid,
        pub executed_at: DateTime<Utc>,
        pub outcome: String,
        pub metrics: Value,
    }
3098
    /// Insert payload for an outbox event; id and timestamps are assigned at
    /// insert time.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewOutboxEvent {
        pub run_id: Uuid,
        pub step_id: Option<Uuid>,
        pub kind: String,
        pub payload: Value,
    }
3107
    /// Decode a raw `runs` row into the domain [`Run`].
    ///
    /// # Errors
    /// Fails only when the stored status text is not a known [`RunStatus`].
    impl TryFrom<crate::db::RunRow> for Run {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::RunRow) -> Result<Self, Self::Error> {
            Ok(Self {
                run_id: row.run_id,
                created_at: row.created_at,
                status: RunStatus::try_from_str(&row.status)?,
                dag_json: row.dag_json,
                input_ctx: row.input_ctx,
                seed: row.seed,
                idempotency_key: row.idempotency_key,
            })
        }
    }
3123
    /// Decode a raw eval-scenario row; currently infallible field copying,
    /// `TryFrom` kept for consistency with the other row conversions.
    impl TryFrom<crate::db::EvalScenarioRow> for EvalScenario {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::EvalScenarioRow) -> Result<Self, Self::Error> {
            Ok(Self {
                scenario_id: row.scenario_id,
                name: row.name,
                description: row.description,
                spec: row.spec,
                created_at: row.created_at,
            })
        }
    }
3137
    /// Decode a raw eval-result row; currently infallible field copying,
    /// `TryFrom` kept for consistency with the other row conversions.
    impl TryFrom<crate::db::EvalResultRow> for EvalResult {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::EvalResultRow) -> Result<Self, Self::Error> {
            Ok(Self {
                result_id: row.result_id,
                scenario_id: row.scenario_id,
                run_id: row.run_id,
                executed_at: row.executed_at,
                outcome: row.outcome,
                metrics: row.metrics,
            })
        }
    }
3152
    /// A recorded change-gate decision (one row of `change_gates`).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ChangeGate {
        pub gate_id: Uuid,
        pub change_id: String,
        pub revision: Option<String>,
        // Decision effect (stringly typed in the schema).
        pub effect: String,
        pub reasons: Value,
        pub followups: Value,
        pub scorecard: Value,
        pub decided_at: DateTime<Utc>,
        pub decided_by: Option<String>,
        pub metadata: Value,
        pub input: Value,
        pub telemetry: Value,
        // Raw digest of the associated artifact; None when absent or when the
        // stored bytes were not exactly 32 bytes (see the From<ChangeGateRow> impl).
        pub artifact_sha256: Option<[u8; 32]>,
        pub created_at: DateTime<Utc>,
    }
3170
    /// Insert payload for a change gate; see [`ChangeGate`] for field meanings
    /// (`created_at` is assigned at insert time).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewChangeGate {
        pub gate_id: Uuid,
        pub change_id: String,
        pub revision: Option<String>,
        pub effect: String,
        pub reasons: Value,
        pub followups: Value,
        pub scorecard: Value,
        pub decided_at: DateTime<Utc>,
        pub decided_by: Option<String>,
        pub metadata: Value,
        pub input: Value,
        pub telemetry: Value,
        pub artifact_sha256: Option<[u8; 32]>,
    }
3187
    /// A follow-up action recorded against a change-gate decision.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ChangeGateFollowup {
        pub followup_id: Uuid,
        pub gate_id: Uuid,
        pub actor: String,
        pub outcome: String,
        pub note: Option<String>,
        pub details: Value,
        pub recorded_at: DateTime<Utc>,
    }
3198
    /// Insert payload for a change-gate follow-up; `recorded_at` is assigned
    /// at insert time.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewChangeGateFollowup {
        pub followup_id: Uuid,
        pub gate_id: Uuid,
        pub actor: String,
        pub outcome: String,
        pub note: Option<String>,
        pub details: Value,
    }
3208
3209 impl From<crate::db::ChangeGateRow> for ChangeGate {
3210 fn from(row: crate::db::ChangeGateRow) -> Self {
3211 let artifact_sha256 = row.artifact_sha256.and_then(|vec| {
3212 if vec.len() == 32 {
3213 let mut value = [0u8; 32];
3214 value.copy_from_slice(&vec);
3215 Some(value)
3216 } else {
3217 None
3218 }
3219 });
3220
3221 Self {
3222 gate_id: row.gate_id,
3223 change_id: row.change_id,
3224 revision: row.revision,
3225 effect: row.effect,
3226 reasons: row.reasons,
3227 followups: row.followups,
3228 scorecard: row.scorecard,
3229 decided_at: row.decided_at,
3230 decided_by: row.decided_by,
3231 metadata: row.metadata,
3232 input: row.input,
3233 telemetry: row.telemetry,
3234 artifact_sha256,
3235 created_at: row.created_at,
3236 }
3237 }
3238 }
3239
    /// Decode a raw follow-up row; straight field copying.
    impl From<crate::db::ChangeGateFollowupRow> for ChangeGateFollowup {
        fn from(row: crate::db::ChangeGateFollowupRow) -> Self {
            Self {
                followup_id: row.followup_id,
                gate_id: row.gate_id,
                actor: row.actor,
                outcome: row.outcome,
                note: row.note,
                details: row.details,
                recorded_at: row.recorded_at,
            }
        }
    }
3253
3254 impl TryFrom<crate::db::TransparencyJobRow> for TransparencyJob {
3255 type Error = anyhow::Error;
3256
3257 fn try_from(row: crate::db::TransparencyJobRow) -> Result<Self> {
3258 let kind = TransparencyJobKind::from_str(&row.kind)?;
3259 let payload: TransparencyJobPayload = serde_json::from_value(row.payload)?;
3260 Ok(TransparencyJob {
3261 job_id: row.job_id,
3262 kind,
3263 payload,
3264 attempts: row.attempts,
3265 last_error: row.error,
3266 receipt: row.receipt,
3267 processed_at: row.processed_at,
3268 created_at: row.created_at,
3269 })
3270 }
3271 }
3272
    /// Decode a raw artifact row; currently infallible field copying,
    /// `TryFrom` kept for consistency with the other row conversions.
    impl TryFrom<crate::db::ArtifactRow> for Artifact {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::ArtifactRow) -> Result<Self, Self::Error> {
            Ok(Self {
                sha256: row.sha256,
                media_type: row.media_type,
                size_bytes: row.size_bytes,
                created_at: row.created_at,
                metadata: row.metadata,
            })
        }
    }
3286
    /// Decode a raw outbox row; currently infallible field copying,
    /// `TryFrom` kept for consistency with the other row conversions.
    impl TryFrom<crate::db::OutboxRow> for OutboxEvent {
        type Error = anyhow::Error;

        fn try_from(row: crate::db::OutboxRow) -> Result<Self, Self::Error> {
            Ok(Self {
                id: row.id,
                run_id: row.run_id,
                step_id: row.step_id,
                kind: row.kind,
                payload: row.payload,
                created_at: row.created_at,
                published_at: row.published_at,
            })
        }
    }
3302}
3303
/// Evaluation, change-gate, and transparency-job persistence, all executed
/// against the shared Postgres pool.
impl Storage {
    /// Insert or update an eval scenario keyed by its unique `name`,
    /// returning the scenario id (stable across upserts of the same name).
    pub async fn upsert_eval_scenario(
        &self,
        name: &str,
        description: Option<&str>,
        spec: Value,
    ) -> Result<Uuid> {
        let scenario_id = sqlx::query_scalar::<_, Uuid>(
            r#"
            insert into eval_scenarios (name, description, spec)
            values ($1, $2, $3)
            on conflict (name)
            do update set description = excluded.description,
                          spec = excluded.spec
            returning scenario_id
            "#,
        )
        .bind(name)
        .bind(description)
        .bind(Json(spec))
        .fetch_one(&self.pool)
        .await?;

        Ok(scenario_id)
    }

    /// Look up a scenario by its unique name; `Ok(None)` when absent.
    pub async fn fetch_eval_scenario_by_name(
        &self,
        name: &str,
    ) -> Result<Option<models::EvalScenario>> {
        let row = sqlx::query_as::<_, db::EvalScenarioRow>(
            r#"
            select *
            from eval_scenarios
            where name = $1
            "#,
        )
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;

        row.map(models::EvalScenario::try_from).transpose()
    }

    /// List all scenarios, ordered by name.
    pub async fn list_eval_scenarios(&self) -> Result<Vec<models::EvalScenario>> {
        let rows = sqlx::query_as::<_, db::EvalScenarioRow>(
            r#"
            select *
            from eval_scenarios
            order by name asc
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(models::EvalScenario::try_from)
            .collect()
    }

    /// Most recently executed result for a scenario, if any.
    pub async fn latest_eval_result(
        &self,
        scenario_id: Uuid,
    ) -> Result<Option<models::EvalResult>> {
        let row = sqlx::query_as::<_, db::EvalResultRow>(
            r#"
            select *
            from eval_results
            where scenario_id = $1
            order by executed_at desc
            limit 1
            "#,
        )
        .bind(scenario_id)
        .fetch_optional(&self.pool)
        .await?;

        row.map(models::EvalResult::try_from).transpose()
    }

    /// Most recent results for a scenario, newest first.
    pub async fn list_eval_results(
        &self,
        scenario_id: Uuid,
        limit: i64,
    ) -> Result<Vec<models::EvalResult>> {
        let rows = sqlx::query_as::<_, db::EvalResultRow>(
            r#"
            select *
            from eval_results
            where scenario_id = $1
            order by executed_at desc
            limit $2
            "#,
        )
        .bind(scenario_id)
        // Clamp so a zero/negative caller limit still returns one row.
        .bind(limit.max(1))
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter().map(models::EvalResult::try_from).collect()
    }

    /// Record (or overwrite) the evaluation outcome for a (scenario, run)
    /// pair; re-running refreshes outcome, metrics, and `executed_at`.
    pub async fn insert_eval_result(
        &self,
        scenario_id: Uuid,
        run_id: Uuid,
        outcome: &str,
        metrics: Value,
    ) -> Result<models::EvalResult> {
        let row = sqlx::query_as::<_, db::EvalResultRow>(
            r#"
            insert into eval_results (scenario_id, run_id, outcome, metrics)
            values ($1, $2, $3, $4)
            on conflict (scenario_id, run_id)
            do update set outcome = excluded.outcome,
                          metrics = excluded.metrics,
                          executed_at = now()
            returning *
            "#,
        )
        .bind(scenario_id)
        .bind(run_id)
        .bind(outcome)
        .bind(Json(metrics))
        .fetch_one(&self.pool)
        .await?;

        models::EvalResult::try_from(row)
    }

    /// Persist a new change-gate decision and return the stored row.
    pub async fn insert_change_gate(
        &self,
        gate: models::NewChangeGate,
    ) -> Result<models::ChangeGate> {
        // Destructure up front so a new field on NewChangeGate becomes a
        // compile error here instead of a silently-unbound column.
        let models::NewChangeGate {
            gate_id,
            change_id,
            revision,
            effect,
            reasons,
            followups,
            scorecard,
            decided_at,
            decided_by,
            metadata,
            input,
            telemetry,
            artifact_sha256,
        } = gate;

        let row = sqlx::query_as::<_, db::ChangeGateRow>(
            r#"
            insert into change_gates (
                gate_id,
                change_id,
                revision,
                effect,
                reasons,
                followups,
                scorecard,
                decided_at,
                decided_by,
                metadata,
                input,
                telemetry,
                artifact_sha256
            )
            values (
                $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13
            )
            returning *
            "#,
        )
        .bind(gate_id)
        .bind(change_id)
        .bind(revision)
        .bind(effect)
        .bind(Json(reasons))
        .bind(Json(followups))
        .bind(Json(scorecard))
        .bind(decided_at)
        .bind(decided_by)
        .bind(Json(metadata))
        .bind(Json(input))
        .bind(Json(telemetry))
        // Digest stored as raw bytes (bytea), not hex.
        .bind(artifact_sha256.map(|digest| digest.to_vec()))
        .fetch_one(&self.pool)
        .await?;

        Ok(models::ChangeGate::from(row))
    }

    /// Replace the metadata document on an existing gate.
    ///
    /// # Errors
    /// Fails when no gate with `gate_id` exists.
    pub async fn update_change_gate_metadata(&self, gate_id: Uuid, metadata: Value) -> Result<()> {
        let result = sqlx::query(
            r#"
            update change_gates
            set metadata = $2
            where gate_id = $1
            "#,
        )
        .bind(gate_id)
        .bind(Json(metadata))
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            bail!("change gate {} not found", gate_id);
        }

        Ok(())
    }

    /// Most recent gate decisions for a change id, newest first.
    pub async fn list_change_gates_for_change(
        &self,
        change_id: &str,
        limit: i64,
    ) -> Result<Vec<models::ChangeGate>> {
        let rows = sqlx::query_as::<_, db::ChangeGateRow>(
            r#"
            select *
            from change_gates
            where change_id = $1
            order by decided_at desc
            limit $2
            "#,
        )
        .bind(change_id)
        // Clamp so a zero/negative caller limit still returns one row.
        .bind(limit.max(1))
        .fetch_all(&self.pool)
        .await?;

        Ok(rows.into_iter().map(models::ChangeGate::from).collect())
    }

    /// Look up a single gate decision by id; `Ok(None)` when absent.
    pub async fn fetch_change_gate(&self, gate_id: Uuid) -> Result<Option<models::ChangeGate>> {
        let row = sqlx::query_as::<_, db::ChangeGateRow>(
            r#"
            select *
            from change_gates
            where gate_id = $1
            "#,
        )
        .bind(gate_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(models::ChangeGate::from))
    }

    /// Enqueue a transparency publication job; the `kind` column is derived
    /// from the payload so the two cannot disagree at insert time.
    pub async fn enqueue_transparency_job(
        &self,
        payload: models::TransparencyJobPayload,
    ) -> Result<i64> {
        let kind = payload.kind().as_str().to_string();
        let job_id = sqlx::query_scalar::<_, i64>(
            r#"
            insert into transparency_jobs (kind, payload)
            values ($1, $2)
            returning job_id
            "#,
        )
        .bind(kind)
        // Serialized through serde so the payload's `kind` tag is embedded.
        .bind(serde_json::to_value(&payload)?)
        .fetch_one(&self.pool)
        .await?;

        Ok(job_id)
    }

    /// Oldest unprocessed jobs, up to `limit`.
    // NOTE(review): unlike the list_* methods above, `limit` is not clamped
    // with `.max(1)` here — a non-positive limit returns no rows; confirm
    // whether that asymmetry is intentional.
    pub async fn fetch_pending_transparency_jobs(
        &self,
        limit: i64,
    ) -> Result<Vec<models::TransparencyJob>> {
        let rows = sqlx::query_as::<_, db::TransparencyJobRow>(
            r#"
            select *
            from transparency_jobs
            where processed_at is null
            order by job_id asc
            limit $1
            "#,
        )
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(models::TransparencyJob::try_from)
            .collect()
    }

    /// Fetch specific jobs by id, ascending. Ids with no matching row are
    /// silently omitted from the result.
    pub async fn fetch_transparency_jobs_by_ids(
        &self,
        job_ids: &[i64],
    ) -> Result<Vec<models::TransparencyJob>> {
        // Avoid a round-trip for the trivial case.
        if job_ids.is_empty() {
            return Ok(Vec::new());
        }
        let rows = sqlx::query_as::<_, db::TransparencyJobRow>(
            r#"
            select *
            from transparency_jobs
            where job_id = any($1)
            order by job_id asc
            "#,
        )
        .bind(job_ids)
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(models::TransparencyJob::try_from)
            .collect()
    }

    /// Mark a job as successfully processed, storing the log receipt and
    /// clearing any previous error.
    ///
    /// # Errors
    /// Fails when no job with `job_id` exists.
    pub async fn mark_transparency_job_processed(&self, job_id: i64, receipt: Value) -> Result<()> {
        let result = sqlx::query(
            r#"
            update transparency_jobs
            set processed_at = now(),
                receipt = $2,
                error = null
            where job_id = $1
            "#,
        )
        .bind(job_id)
        .bind(Json(receipt))
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            bail!("transparency job {} not found", job_id);
        }

        Ok(())
    }

    /// Record a processing failure: store the error text and bump the attempt
    /// counter. The job stays pending (processed_at remains null).
    ///
    /// # Errors
    /// Fails when no job with `job_id` exists.
    pub async fn mark_transparency_job_failed(&self, job_id: i64, error: String) -> Result<()> {
        let result = sqlx::query(
            r#"
            update transparency_jobs
            set error = $2,
                attempts = attempts + 1
            where job_id = $1
            "#,
        )
        .bind(job_id)
        .bind(error)
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            bail!("transparency job {} not found", job_id);
        }

        Ok(())
    }

    /// Persist a follow-up action against a gate and return the stored row.
    pub async fn insert_change_gate_followup(
        &self,
        followup: models::NewChangeGateFollowup,
    ) -> Result<models::ChangeGateFollowup> {
        // Destructure so schema drift surfaces as a compile error.
        let models::NewChangeGateFollowup {
            followup_id,
            gate_id,
            actor,
            outcome,
            note,
            details,
        } = followup;

        let row = sqlx::query_as::<_, db::ChangeGateFollowupRow>(
            r#"
            insert into change_gate_followups (
                followup_id,
                gate_id,
                actor,
                outcome,
                note,
                details
            )
            values ($1,$2,$3,$4,$5,$6)
            returning *
            "#,
        )
        .bind(followup_id)
        .bind(gate_id)
        .bind(actor)
        .bind(outcome)
        .bind(note)
        .bind(Json(details))
        .fetch_one(&self.pool)
        .await?;

        Ok(models::ChangeGateFollowup::from(row))
    }

    /// All follow-ups for a gate, oldest first.
    pub async fn fetch_change_gate_followups(
        &self,
        gate_id: Uuid,
    ) -> Result<Vec<models::ChangeGateFollowup>> {
        let rows = sqlx::query_as::<_, db::ChangeGateFollowupRow>(
            r#"
            select *
            from change_gate_followups
            where gate_id = $1
            order by recorded_at asc
            "#,
        )
        .bind(gate_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(models::ChangeGateFollowup::from)
            .collect())
    }
}
3740
3741#[cfg(test)]
3742mod tests {
3743 use super::*;
3744 use crate::InMemoryStorage;
3745 use crate::RunStore;
3746 use bytes::Bytes;
3747 use serde_json::json;
3748 use std::env;
3749 use testcontainers::clients::Cli;
3750 use testcontainers::images::postgres::Postgres;
3751 use testcontainers::Container;
3752 use tokio::sync::Barrier;
3753 use uuid::Uuid;
3754
3755 fn docker_available() -> bool {
3756 std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
3757 }
3758
    /// Spin up a throwaway Postgres container and connect a `Storage` to it,
    /// returning both so the container outlives the test.
    async fn setup_storage() -> Result<(Storage, Container<'static, Postgres>)> {
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

        let config = StorageConfig {
            database_url,
            max_connections: 2,
            connect_timeout: Duration::from_secs(10),
            object_store: ObjectStoreConfig::InMemory,
        };

        let storage = Storage::connect(config).await?;
        // NOTE(review): this transmute forges a 'static lifetime for the
        // container while the `Cli` client it borrows from is dropped when this
        // function returns — that looks unsound (use-after-free risk inside
        // testcontainers). Consider keeping the `Cli` in a `OnceLock` or
        // leaking it instead. TODO confirm against the testcontainers API.
        let container: Container<'static, Postgres> = unsafe { std::mem::transmute(container) };
        Ok((storage, container))
    }
3776
    /// RAII guard that snapshots a set of environment variables on creation
    /// and restores them (set or unset) on drop, so tests can mutate the
    /// environment without leaking state.
    struct EnvGuard {
        // Variable names being guarded.
        keys: Vec<&'static str>,
        // Value of each key at construction time; None means it was unset.
        previous: Vec<Option<String>>,
    }
3781
3782 impl EnvGuard {
3783 fn new(keys: Vec<&'static str>) -> Self {
3784 let previous = keys
3785 .iter()
3786 .map(|key| env::var(key).ok())
3787 .collect::<Vec<_>>();
3788 Self { keys, previous }
3789 }
3790 }
3791
3792 impl Drop for EnvGuard {
3793 fn drop(&mut self) {
3794 for (idx, key) in self.keys.iter().enumerate() {
3795 if let Some(value) = &self.previous[idx] {
3796 env::set_var(key, value);
3797 } else {
3798 env::remove_var(key);
3799 }
3800 }
3801 }
3802 }
3803
3804 fn seed_step(run_id: Uuid, step_id: Uuid) -> models::NewStep {
3805 models::NewStep {
3806 run_id,
3807 step_id,
3808 idx: 0,
3809 priority: 0,
3810 spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
3811 status: models::StepStatus::Queued,
3812 attempt: 0,
3813 input_snapshot: None,
3814 provider: None,
3815 provider_version: None,
3816 pending_dependencies: 0,
3817 total_dependencies: 0,
3818 estimated_tokens: 0,
3819 estimated_cost: 0.0,
3820 actual_tokens: None,
3821 actual_cost: None,
3822 ..models::NewStep::default()
3823 }
3824 }
3825
    /// S3Config::from_env should read the five FLEETFORGE_S3_* variables and
    /// default to virtual-hosted-style addressing. EnvGuard restores whatever
    /// values were present before the test.
    #[test]
    fn s3_config_reads_expected_env() -> Result<()> {
        let guard = EnvGuard::new(vec![
            "FLEETFORGE_S3_BUCKET",
            "FLEETFORGE_S3_ACCESS_KEY",
            "FLEETFORGE_S3_SECRET_KEY",
            "FLEETFORGE_S3_REGION",
            "FLEETFORGE_S3_ENDPOINT",
        ]);

        env::set_var("FLEETFORGE_S3_BUCKET", "demo-bucket");
        env::set_var("FLEETFORGE_S3_ACCESS_KEY", "demo-access");
        env::set_var("FLEETFORGE_S3_SECRET_KEY", "demo-secret");
        env::set_var("FLEETFORGE_S3_REGION", "us-west-2");
        env::set_var("FLEETFORGE_S3_ENDPOINT", "https://s3.demo");

        let cfg = S3Config::from_env()?;
        assert_eq!(cfg.bucket, "demo-bucket");
        assert_eq!(cfg.access_key, "demo-access");
        assert_eq!(cfg.secret_key, "demo-secret");
        assert_eq!(cfg.region.as_deref(), Some("us-west-2"));
        assert_eq!(cfg.endpoint.as_deref(), Some("https://s3.demo"));
        // Default addressing style when nothing overrides it — presumably;
        // TODO confirm against S3Config::from_env.
        assert!(cfg.virtual_hosted_style);

        // Explicit drop marks where the environment is restored.
        drop(guard);
        Ok(())
    }
3853
3854 #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
3855 #[tokio::test]
3856 async fn migrations_apply_cleanly() -> Result<()> {
3857 if !docker_available() {
3858 eprintln!("Skipping storage::migrations test because Docker is unavailable");
3859 return Ok(());
3860 }
3861
3862 let (storage, container) = setup_storage().await?;
3863
3864 let exists: (i64,) = sqlx::query_as(
3865 "select count(*) from information_schema.tables where table_name = 'runs'",
3866 )
3867 .fetch_one(storage.pool())
3868 .await?;
3869 assert!(
3870 exists.0 >= 0,
3871 "runs table should be accessible after migrations"
3872 );
3873
3874 let run_one = Uuid::new_v4();
3876 let step_root = Uuid::new_v4();
3877 let step_leaf = Uuid::new_v4();
3878
3879 let root_spec = json!({"id": step_root, "type": "tool", "inputs": {}});
3880 let leaf_spec = json!({"id": step_leaf, "type": "tool", "inputs": {}});
3881 let dag_one = json!({
3882 "steps": [root_spec.clone(), leaf_spec.clone()],
3883 "edges": [[step_root, step_leaf]]
3884 });
3885
3886 let _ = storage
3887 .insert_run_with_steps(
3888 models::NewRun {
3889 run_id: run_one,
3890 status: models::RunStatus::Pending,
3891 dag_json: dag_one,
3892 input_ctx: json!({}),
3893 seed: 1,
3894 idempotency_key: None,
3895 },
3896 &[
3897 models::NewStep {
3898 run_id: run_one,
3899 step_id: step_root,
3900 idx: 0,
3901 priority: 0,
3902 spec_json: root_spec,
3903 status: models::StepStatus::Queued,
3904 attempt: 0,
3905 input_snapshot: None,
3906 provider: None,
3907 provider_version: None,
3908 pending_dependencies: 0,
3909 total_dependencies: 0,
3910 estimated_tokens: 0,
3911 estimated_cost: 0.0,
3912 actual_tokens: None,
3913 actual_cost: None,
3914 ..models::NewStep::default()
3915 },
3916 models::NewStep {
3917 run_id: run_one,
3918 step_id: step_leaf,
3919 idx: 1,
3920 priority: 0,
3921 spec_json: leaf_spec,
3922 status: models::StepStatus::Blocked,
3923 attempt: 0,
3924 input_snapshot: None,
3925 provider: None,
3926 provider_version: None,
3927 pending_dependencies: 1,
3928 total_dependencies: 1,
3929 estimated_tokens: 0,
3930 estimated_cost: 0.0,
3931 actual_tokens: None,
3932 actual_cost: None,
3933 ..models::NewStep::default()
3934 },
3935 ],
3936 &[(step_root, step_leaf)],
3937 )
3938 .await?;
3939
3940 let leased = storage.claim_queued_steps(1, "worker-1").await?;
3941 assert_eq!(leased.len(), 1);
3942 assert_eq!(leased[0].status, models::StepStatus::Leased);
3943
3944 let (run_status_one,): (String,) =
3945 sqlx::query_as("select status from runs where run_id = $1")
3946 .bind(run_one)
3947 .fetch_one(storage.pool())
3948 .await?;
3949 assert_eq!(run_status_one, "running");
3950
3951 let mut tx = storage.pool().begin().await?;
3952 storage
3953 .update_step_status_tx(
3954 &mut tx,
3955 run_one,
3956 step_root,
3957 models::StepStatus::Succeeded,
3958 Some(1),
3959 None,
3960 None,
3961 None,
3962 None,
3963 None,
3964 None,
3965 None,
3966 )
3967 .await?;
3968 let unlocked = storage
3969 .unlock_successors_tx(&mut tx, run_one, step_root)
3970 .await?;
3971 assert_eq!(unlocked, vec![step_leaf]);
3972 tx.commit().await?;
3973
3974 let (leaf_status, leaf_pending): (String, i32) = sqlx::query_as(
3975 "select status, pending_dependencies from steps where run_id = $1 and step_id = $2",
3976 )
3977 .bind(run_one)
3978 .bind(step_leaf)
3979 .fetch_one(storage.pool())
3980 .await?;
3981 assert_eq!(leaf_status.as_str(), "queued");
3982 assert_eq!(leaf_pending, 0);
3983
3984 let mut tx = storage.pool().begin().await?;
3986 storage
3987 .update_step_status_tx(
3988 &mut tx,
3989 run_one,
3990 step_leaf,
3991 models::StepStatus::Succeeded,
3992 Some(2),
3993 None,
3994 None,
3995 None,
3996 None,
3997 None,
3998 None,
3999 None,
4000 )
4001 .await?;
4002 let (_, new_status) = storage.refresh_run_status_tx(&mut tx, run_one).await?;
4003 assert_eq!(new_status, models::RunStatus::Succeeded);
4004 tx.commit().await?;
4005
4006 let run_two = Uuid::new_v4();
4008 let step_a = Uuid::new_v4();
4009 let step_b = Uuid::new_v4();
4010 let step_c = Uuid::new_v4();
4011
4012 let spec_a = json!({"id": step_a, "type": "tool", "inputs": {}});
4013 let spec_b = json!({"id": step_b, "type": "tool", "inputs": {}});
4014 let spec_c = json!({"id": step_c, "type": "tool", "inputs": {}});
4015 let dag_two = json!({
4016 "steps": [spec_a.clone(), spec_b.clone(), spec_c.clone()],
4017 "edges": [[step_a, step_b], [step_b, step_c]]
4018 });
4019
4020 let _ = storage
4021 .insert_run_with_steps(
4022 models::NewRun {
4023 run_id: run_two,
4024 status: models::RunStatus::Pending,
4025 dag_json: dag_two,
4026 input_ctx: json!({}),
4027 seed: 1,
4028 idempotency_key: None,
4029 },
4030 &[
4031 models::NewStep {
4032 run_id: run_two,
4033 step_id: step_a,
4034 idx: 0,
4035 priority: 0,
4036 spec_json: spec_a,
4037 status: models::StepStatus::Queued,
4038 attempt: 0,
4039 input_snapshot: None,
4040 provider: None,
4041 provider_version: None,
4042 pending_dependencies: 0,
4043 total_dependencies: 0,
4044 estimated_tokens: 0,
4045 estimated_cost: 0.0,
4046 actual_tokens: None,
4047 actual_cost: None,
4048 ..models::NewStep::default()
4049 },
4050 models::NewStep {
4051 run_id: run_two,
4052 step_id: step_b,
4053 idx: 1,
4054 priority: 0,
4055 spec_json: spec_b,
4056 status: models::StepStatus::Blocked,
4057 attempt: 0,
4058 input_snapshot: None,
4059 provider: None,
4060 provider_version: None,
4061 pending_dependencies: 1,
4062 total_dependencies: 1,
4063 estimated_tokens: 0,
4064 estimated_cost: 0.0,
4065 actual_tokens: None,
4066 actual_cost: None,
4067 ..models::NewStep::default()
4068 },
4069 models::NewStep {
4070 run_id: run_two,
4071 step_id: step_c,
4072 idx: 2,
4073 priority: 0,
4074 spec_json: spec_c,
4075 status: models::StepStatus::Blocked,
4076 attempt: 0,
4077 input_snapshot: None,
4078 provider: None,
4079 provider_version: None,
4080 pending_dependencies: 1,
4081 total_dependencies: 1,
4082 estimated_tokens: 0,
4083 estimated_cost: 0.0,
4084 actual_tokens: None,
4085 actual_cost: None,
4086 ..models::NewStep::default()
4087 },
4088 ],
4089 &[(step_a, step_b), (step_b, step_c)],
4090 )
4091 .await?;
4092
4093 storage.claim_queued_steps(1, "worker-2").await?;
4094 let (run_two_status,): (String,) =
4095 sqlx::query_as("select status from runs where run_id = $1")
4096 .bind(run_two)
4097 .fetch_one(storage.pool())
4098 .await?;
4099 assert_eq!(run_two_status, "running");
4100
4101 let mut tx = storage.pool().begin().await?;
4102 storage
4103 .update_step_status_tx(
4104 &mut tx,
4105 run_two,
4106 step_a,
4107 models::StepStatus::Failed,
4108 Some(1),
4109 None,
4110 None,
4111 None,
4112 None,
4113 None,
4114 None,
4115 None,
4116 )
4117 .await?;
4118 let skipped = storage
4119 .skip_downstream_steps_tx(&mut tx, run_two, step_a)
4120 .await?;
4121 assert_eq!(skipped.len(), 2);
4122 assert!(skipped.contains(&step_b) && skipped.contains(&step_c));
4123 let (_, failed_status) = storage.refresh_run_status_tx(&mut tx, run_two).await?;
4124 assert_eq!(failed_status, models::RunStatus::Failed);
4125 tx.commit().await?;
4126
4127 for target in [step_b, step_c] {
4128 let (status,): (String,) =
4129 sqlx::query_as("select status from steps where run_id = $1 and step_id = $2")
4130 .bind(run_two)
4131 .bind(target)
4132 .fetch_one(storage.pool())
4133 .await?;
4134 assert_eq!(status, "skipped");
4135 }
4136
4137 Ok(())
4138 }
4139
4140 #[tokio::test]
4141 async fn in_memory_store_supports_idempotency() -> Result<()> {
4142 use crate::RunStore;
4143
4144 let store = InMemoryStorage::new();
4145 let run_id = Uuid::new_v4();
4146 let step_id = Uuid::new_v4();
4147 let dag = json!({
4148 "steps": [{
4149 "id": step_id,
4150 "type": "tool",
4151 "inputs": {}
4152 }]
4153 });
4154
4155 let first = store
4156 .insert_run_with_steps(
4157 models::NewRun {
4158 run_id,
4159 status: models::RunStatus::Pending,
4160 dag_json: dag.clone(),
4161 input_ctx: json!({}),
4162 seed: 1,
4163 idempotency_key: Some("mem-key".into()),
4164 },
4165 &[models::NewStep {
4166 run_id,
4167 step_id,
4168 idx: 0,
4169 priority: 0,
4170 spec_json: json!({ "id": step_id, "type": "tool", "inputs": {} }),
4171 status: models::StepStatus::Queued,
4172 attempt: 0,
4173 input_snapshot: None,
4174 provider: None,
4175 provider_version: None,
4176 pending_dependencies: 0,
4177 total_dependencies: 0,
4178 estimated_tokens: 0,
4179 estimated_cost: 0.0,
4180 actual_tokens: None,
4181 actual_cost: None,
4182 ..models::NewStep::default()
4183 }],
4184 &[],
4185 )
4186 .await?;
4187 assert_eq!(first, run_id);
4188
4189 let reused = store
4190 .insert_run_with_steps(
4191 models::NewRun {
4192 run_id: Uuid::new_v4(),
4193 status: models::RunStatus::Pending,
4194 dag_json: dag,
4195 input_ctx: json!({}),
4196 seed: 1,
4197 idempotency_key: Some("mem-key".into()),
4198 },
4199 &[],
4200 &[],
4201 )
4202 .await?;
4203 assert_eq!(
4204 reused, run_id,
4205 "in-memory store should reuse idempotent run id"
4206 );
4207
4208 let fetched = store.fetch_run(run_id).await?.expect("run exists");
4209 assert_eq!(fetched.run_id, run_id);
4210 let steps = store.fetch_steps_for_run(run_id).await?;
4211 assert_eq!(steps.len(), 1);
4212
4213 Ok(())
4214 }
4215
4216 #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4217 #[tokio::test]
4218 async fn idempotency_reuses_existing_run() -> Result<()> {
4219 if !docker_available() {
4220 eprintln!("Skipping idempotency test because Docker is unavailable");
4221 return Ok(());
4222 }
4223
4224 let (storage, container) = setup_storage().await?;
4225 let idempotency_key = "reuse-key";
4226 let run_id = Uuid::new_v4();
4227 let step_id = Uuid::new_v4();
4228 let step_spec = json!({
4229 "id": step_id,
4230 "type": "tool",
4231 "inputs": {}
4232 });
4233 let dag = json!({ "steps": [step_spec.clone()] });
4234
4235 let inserted = storage
4236 .insert_run_with_steps(
4237 models::NewRun {
4238 run_id,
4239 status: models::RunStatus::Pending,
4240 dag_json: dag.clone(),
4241 input_ctx: json!({}),
4242 seed: 1,
4243 idempotency_key: Some(idempotency_key.into()),
4244 },
4245 &[models::NewStep {
4246 run_id,
4247 step_id,
4248 idx: 0,
4249 priority: 0,
4250 spec_json: step_spec,
4251 status: models::StepStatus::Queued,
4252 attempt: 0,
4253 input_snapshot: None,
4254 provider: None,
4255 provider_version: None,
4256 pending_dependencies: 0,
4257 total_dependencies: 0,
4258 estimated_tokens: 0,
4259 estimated_cost: 0.0,
4260 actual_tokens: None,
4261 actual_cost: None,
4262 ..models::NewStep::default()
4263 }],
4264 &[],
4265 )
4266 .await?;
4267 assert_eq!(inserted, run_id, "initial insert should return run id");
4268
4269 let reused = storage
4270 .insert_run_with_steps(
4271 models::NewRun {
4272 run_id: Uuid::new_v4(),
4273 status: models::RunStatus::Pending,
4274 dag_json: dag,
4275 input_ctx: json!({}),
4276 seed: 1,
4277 idempotency_key: Some(idempotency_key.into()),
4278 },
4279 &[],
4280 &[],
4281 )
4282 .await?;
4283 assert_eq!(
4284 reused, run_id,
4285 "second insert with same idempotency key should reuse original run id"
4286 );
4287
4288 drop(container);
4289 Ok(())
4290 }
4291
4292 #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4293 #[tokio::test]
4294 async fn claim_queued_orders_by_priority_then_idx() -> Result<()> {
4295 if std::fs::metadata("/var/run/docker.sock").is_err()
4296 && std::env::var("DOCKER_HOST").is_err()
4297 {
4298 eprintln!("Skipping storage queue test because Docker is unavailable");
4299 return Ok(());
4300 }
4301
4302 let docker = Cli::default();
4303 let container = docker.run(Postgres::default());
4304 let port = container.get_host_port_ipv4(5432);
4305 let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
4306
4307 let storage = Storage::connect(StorageConfig {
4308 database_url,
4309 max_connections: 1,
4310 connect_timeout: Duration::from_secs(10),
4311 object_store: ObjectStoreConfig::InMemory,
4312 })
4313 .await?;
4314
4315 let run_id = Uuid::new_v4();
4316 let step_low = Uuid::new_v4();
4317 let step_high_a = Uuid::new_v4();
4318 let step_high_b = Uuid::new_v4();
4319
4320 let _ = storage
4321 .insert_run_with_steps(
4322 models::NewRun {
4323 run_id,
4324 status: models::RunStatus::Pending,
4325 dag_json: json!({"steps": [], "edges": []}),
4326 input_ctx: json!({}),
4327 seed: 1,
4328 idempotency_key: None,
4329 },
4330 &[
4331 models::NewStep {
4332 run_id,
4333 step_id: step_low,
4334 idx: 2,
4335 priority: 1,
4336 spec_json: json!({"id": step_low, "type": "tool", "inputs": {}, "policy": {"priority": 1}}),
4337 status: models::StepStatus::Queued,
4338 attempt: 0,
4339 input_snapshot: None,
4340 provider: None,
4341 provider_version: None,
4342 pending_dependencies: 0,
4343 total_dependencies: 0,
4344 estimated_tokens: 0,
4345 estimated_cost: 0.0,
4346 actual_tokens: None,
4347 actual_cost: None,
4348 ..models::NewStep::default()
4349 },
4350 models::NewStep {
4351 run_id,
4352 step_id: step_high_a,
4353 idx: 0,
4354 priority: 5,
4355 spec_json: json!({"id": step_high_a, "type": "tool", "inputs": {}, "policy": {"priority": 5}}),
4356 status: models::StepStatus::Queued,
4357 attempt: 0,
4358 input_snapshot: None,
4359 provider: None,
4360 provider_version: None,
4361 pending_dependencies: 0,
4362 total_dependencies: 0,
4363 estimated_tokens: 0,
4364 estimated_cost: 0.0,
4365 actual_tokens: None,
4366 actual_cost: None,
4367 ..models::NewStep::default()
4368 },
4369 models::NewStep {
4370 run_id,
4371 step_id: step_high_b,
4372 idx: 1,
4373 priority: 5,
4374 spec_json: json!({"id": step_high_b, "type": "tool", "inputs": {}, "policy": {"priority": 5}}),
4375 status: models::StepStatus::Queued,
4376 attempt: 0,
4377 input_snapshot: None,
4378 provider: None,
4379 provider_version: None,
4380 pending_dependencies: 0,
4381 total_dependencies: 0,
4382 estimated_tokens: 0,
4383 estimated_cost: 0.0,
4384 actual_tokens: None,
4385 actual_cost: None,
4386 ..models::NewStep::default()
4387 },
4388 ],
4389 &[],
4390 )
4391 .await?;
4392
4393 let first = storage
4394 .claim_queued_steps(2, "worker-a")
4395 .await?
4396 .into_iter()
4397 .map(models::Step::try_from)
4398 .collect::<Result<Vec<_>>>()?;
4399 assert_eq!(first.len(), 2);
4400 assert_eq!(first[0].step_id, step_high_a);
4401 assert_eq!(first[1].step_id, step_high_b);
4402 assert!(first.iter().all(|s| s.status == models::StepStatus::Leased));
4403
4404 let second = storage
4405 .claim_queued_steps(2, "worker-b")
4406 .await?
4407 .into_iter()
4408 .map(models::Step::try_from)
4409 .collect::<Result<Vec<_>>>()?;
4410 assert_eq!(second.len(), 1);
4411 assert_eq!(second[0].step_id, step_low);
4412 assert_eq!(second[0].status, models::StepStatus::Leased);
4413
4414 drop(container);
4415 Ok(())
4416 }
4417
4418 #[tokio::test]
4419 async fn list_recent_runs_returns_most_recent_first() -> Result<()> {
4420 if !docker_available() {
4421 eprintln!("Skipping list_recent_runs test because Docker is unavailable");
4422 return Ok(());
4423 }
4424
4425 let (storage, container) = setup_storage().await?;
4426
4427 let run_one = Uuid::new_v4();
4428 let step_one = Uuid::new_v4();
4429 storage
4430 .insert_run_with_steps(
4431 models::NewRun {
4432 run_id: run_one,
4433 status: models::RunStatus::Pending,
4434 dag_json: json!({ "steps": [], "edges": [] }),
4435 input_ctx: json!({}),
4436 seed: 1,
4437 idempotency_key: None,
4438 },
4439 &[models::NewStep {
4440 run_id: run_one,
4441 step_id: step_one,
4442 idx: 0,
4443 priority: 0,
4444 spec_json: json!({"id": step_one, "type": "tool"}),
4445 status: models::StepStatus::Queued,
4446 attempt: 0,
4447 input_snapshot: None,
4448 provider: None,
4449 provider_version: None,
4450 pending_dependencies: 0,
4451 total_dependencies: 0,
4452 estimated_tokens: 0,
4453 estimated_cost: 0.0,
4454 actual_tokens: None,
4455 actual_cost: None,
4456 ..models::NewStep::default()
4457 }],
4458 &[],
4459 )
4460 .await?;
4461
4462 let run_two = Uuid::new_v4();
4463 let step_two = Uuid::new_v4();
4464 storage
4465 .insert_run_with_steps(
4466 models::NewRun {
4467 run_id: run_two,
4468 status: models::RunStatus::Pending,
4469 dag_json: json!({ "steps": [], "edges": [] }),
4470 input_ctx: json!({}),
4471 seed: 2,
4472 idempotency_key: None,
4473 },
4474 &[models::NewStep {
4475 run_id: run_two,
4476 step_id: step_two,
4477 idx: 0,
4478 priority: 0,
4479 spec_json: json!({"id": step_two, "type": "tool"}),
4480 status: models::StepStatus::Queued,
4481 attempt: 0,
4482 input_snapshot: None,
4483 provider: None,
4484 provider_version: None,
4485 pending_dependencies: 0,
4486 total_dependencies: 0,
4487 estimated_tokens: 0,
4488 estimated_cost: 0.0,
4489 actual_tokens: None,
4490 actual_cost: None,
4491 ..models::NewStep::default()
4492 }],
4493 &[],
4494 )
4495 .await?;
4496
4497 let runs = storage.list_recent_runs(10).await?;
4498 assert!(
4499 runs.len() >= 2,
4500 "expected at least two runs in recent listing"
4501 );
4502 assert_eq!(
4503 runs.first().map(|run| run.run_id),
4504 Some(run_two),
4505 "most recent run should appear first"
4506 );
4507
4508 drop(container);
4509 Ok(())
4510 }
4511
4512 #[tokio::test]
4513 async fn run_roundtrip_insert_and_fetch() -> Result<()> {
4514 if std::fs::metadata("/var/run/docker.sock").is_err()
4515 && std::env::var("DOCKER_HOST").is_err()
4516 {
4517 eprintln!("Skipping storage::roundtrip test because Docker is unavailable");
4518 return Ok(());
4519 }
4520
4521 let docker = Cli::default();
4522 let container = docker.run(Postgres::default());
4523 let port = container.get_host_port_ipv4(5432);
4524 let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
4525
4526 let storage = Storage::connect(StorageConfig {
4527 database_url,
4528 max_connections: 1,
4529 connect_timeout: Duration::from_secs(10),
4530 object_store: ObjectStoreConfig::InMemory,
4531 })
4532 .await?;
4533
4534 let run_id = Uuid::new_v4();
4535 let step_root = Uuid::new_v4();
4536 let step_leaf = Uuid::new_v4();
4537
4538 let _ = storage
4539 .insert_run_with_steps(
4540 models::NewRun {
4541 run_id,
4542 status: models::RunStatus::Pending,
4543 dag_json: json!({
4544 "steps": [
4545 {"id": step_root, "type": "tool", "inputs": {}},
4546 {"id": step_leaf, "type": "tool", "inputs": {}}
4547 ],
4548 "edges": [[step_root, step_leaf]]
4549 }),
4550 input_ctx: json!({"labels": {"example": "roundtrip"}}),
4551 seed: 99,
4552 idempotency_key: Some("round-trip".into()),
4553 },
4554 &[
4555 models::NewStep {
4556 run_id,
4557 step_id: step_root,
4558 idx: 0,
4559 priority: 5,
4560 spec_json: json!({"id": step_root, "type": "tool", "inputs": {}, "policy": {"priority": 5}}),
4561 status: models::StepStatus::Queued,
4562 attempt: 0,
4563 input_snapshot: None,
4564 provider: None,
4565 provider_version: None,
4566 pending_dependencies: 0,
4567 total_dependencies: 0,
4568 estimated_tokens: 10,
4569 estimated_cost: 0.1,
4570 actual_tokens: None,
4571 actual_cost: None,
4572 ..models::NewStep::default()
4573 },
4574 models::NewStep {
4575 run_id,
4576 step_id: step_leaf,
4577 idx: 1,
4578 priority: 1,
4579 spec_json: json!({"id": step_leaf, "type": "tool", "inputs": {}, "policy": {"priority": 1}}),
4580 status: models::StepStatus::Blocked,
4581 attempt: 0,
4582 input_snapshot: None,
4583 provider: None,
4584 provider_version: None,
4585 pending_dependencies: 1,
4586 total_dependencies: 1,
4587 estimated_tokens: 5,
4588 estimated_cost: 0.05,
4589 actual_tokens: None,
4590 actual_cost: None,
4591 ..models::NewStep::default()
4592 },
4593 ],
4594 &[(step_root, step_leaf)],
4595 )
4596 .await?;
4597
4598 let fetched_run = storage.fetch_run(run_id).await?.expect("run should exist");
4599 assert_eq!(fetched_run.status, models::RunStatus::Pending);
4600 assert_eq!(fetched_run.seed, 99);
4601 assert_eq!(fetched_run.idempotency_key.as_deref(), Some("round-trip"));
4602
4603 let steps = storage.fetch_steps_for_run(run_id).await?;
4604 assert_eq!(steps.len(), 2);
4605 let root = steps.iter().find(|s| s.step_id == step_root).unwrap();
4606 let leaf = steps.iter().find(|s| s.step_id == step_leaf).unwrap();
4607 assert_eq!(root.status, models::StepStatus::Queued);
4608 assert_eq!(leaf.status, models::StepStatus::Blocked);
4609 assert_eq!(root.priority, 5);
4610 assert_eq!(leaf.priority, 1);
4611
4612 let from_key = storage
4613 .fetch_run_by_idempotency_key("round-trip")
4614 .await?
4615 .expect("run by key");
4616 assert_eq!(from_key.run_id, run_id);
4617
4618 drop(container);
4619 Ok(())
4620 }
4621
4622 #[tokio::test]
4623 async fn return_step_to_queue_round_trip() -> Result<()> {
4624 if !docker_available() {
4625 eprintln!("Skipping return_step_to_queue test because Docker is unavailable");
4626 return Ok(());
4627 }
4628
4629 let (storage, container) = setup_storage().await?;
4630
4631 let run_id = Uuid::new_v4();
4632 let step_id = Uuid::new_v4();
4633 let step_spec = json!({"id": step_id, "type": "tool", "inputs": {}});
4634
4635 let _ = storage
4636 .insert_run_with_steps(
4637 models::NewRun {
4638 run_id,
4639 status: models::RunStatus::Pending,
4640 dag_json: json!({"steps": [step_spec.clone()]}),
4641 input_ctx: json!({}),
4642 seed: 1,
4643 idempotency_key: None,
4644 },
4645 &[models::NewStep {
4646 run_id,
4647 step_id,
4648 idx: 0,
4649 priority: 0,
4650 spec_json: step_spec,
4651 status: models::StepStatus::Queued,
4652 attempt: 0,
4653 input_snapshot: None,
4654 provider: None,
4655 provider_version: None,
4656 pending_dependencies: 0,
4657 total_dependencies: 0,
4658 estimated_tokens: 0,
4659 estimated_cost: 0.0,
4660 actual_tokens: None,
4661 actual_cost: None,
4662 ..models::NewStep::default()
4663 }],
4664 &[],
4665 )
4666 .await?;
4667
4668 let leased = storage.claim_queued_steps(1, "worker-test").await?;
4669 assert_eq!(leased.len(), 1);
4670
4671 storage.return_step_to_queue(run_id, step_id).await?;
4672
4673 let (status, leased_by): (String, Option<String>) = sqlx::query_as(
4674 "select status, leased_by from steps where run_id = $1 and step_id = $2",
4675 )
4676 .bind(run_id)
4677 .bind(step_id)
4678 .fetch_one(storage.pool())
4679 .await?;
4680
4681 assert_eq!(status, "queued");
4682 assert!(leased_by.is_none());
4683
4684 drop(container);
4685 Ok(())
4686 }
4687
4688 #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4689 #[tokio::test]
4690 async fn requeue_expired_leases_requeues_all() -> Result<()> {
4691 if !docker_available() {
4692 eprintln!("Skipping requeue_expired_leases test because Docker is unavailable");
4693 return Ok(());
4694 }
4695
4696 let (storage, container) = setup_storage().await?;
4697
4698 let run_id = Uuid::new_v4();
4699 let steps: Vec<(Uuid, serde_json::Value)> = (0..3)
4700 .map(|idx| {
4701 let step_id = Uuid::new_v4();
4702 (
4703 step_id,
4704 json!({
4705 "id": step_id,
4706 "type": "tool",
4707 "inputs": {},
4708 "policy": {"budget": {"tokens": idx + 1}}
4709 }),
4710 )
4711 })
4712 .collect();
4713
4714 let _ = storage
4715 .insert_run_with_steps(
4716 models::NewRun {
4717 run_id,
4718 status: models::RunStatus::Pending,
4719 dag_json: json!({"steps": steps.iter().map(|(_, spec)| spec.clone()).collect::<Vec<_>>()}),
4720 input_ctx: json!({}),
4721 seed: 1,
4722 idempotency_key: None,
4723 },
4724 &steps
4725 .iter()
4726 .enumerate()
4727 .map(|(idx, (step_id, spec))| models::NewStep {
4728 run_id,
4729 step_id: *step_id,
4730 idx: idx as i32,
4731 priority: 0,
4732 spec_json: spec.clone(),
4733 status: models::StepStatus::Queued,
4734 attempt: 0,
4735 input_snapshot: None,
4736 provider: None,
4737 provider_version: None,
4738 pending_dependencies: 0,
4739 total_dependencies: 0,
4740 estimated_tokens: 0,
4741 estimated_cost: 0.0,
4742 actual_tokens: None,
4743 actual_cost: None,
4744 ..models::NewStep::default()
4745 })
4746 .collect::<Vec<_>>(),
4747 &[],
4748 )
4749 .await?;
4750
4751 let _claimed = storage.claim_queued_steps(3, "worker-requeue").await?;
4752 sqlx::query(
4753 "update steps set lease_expires_at = now() - interval '5 minutes' where run_id = $1",
4754 )
4755 .bind(run_id)
4756 .execute(storage.pool())
4757 .await?;
4758
4759 let requeued = storage.requeue_expired_leases(10).await?;
4760 assert_eq!(requeued.len(), steps.len());
4761 let requeued_ids: Vec<Uuid> = requeued.iter().map(|(_, step)| *step).collect();
4762 for (step_id, _) in &steps {
4763 assert!(requeued_ids.contains(step_id));
4764 }
4765
4766 let rows: Vec<(String, Option<String>)> =
4767 sqlx::query_as("select status, leased_by from steps where run_id = $1 order by idx")
4768 .bind(run_id)
4769 .fetch_all(storage.pool())
4770 .await?;
4771 for (status, leased_by) in rows {
4772 assert_eq!(status, "queued");
4773 assert!(leased_by.is_none());
4774 }
4775
4776 drop(container);
4777 Ok(())
4778 }
4779
4780 #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4781 #[tokio::test]
4782 async fn idempotency_prevents_duplicate_runs() -> Result<()> {
4783 if !docker_available() {
4784 eprintln!("Skipping idempotency test because Docker is unavailable");
4785 return Ok(());
4786 }
4787
4788 let (storage, container) = setup_storage().await?;
4789 let key = "dup-key";
4790
4791 let run_one = Uuid::new_v4();
4792 let step_one = Uuid::new_v4();
4793 let step_spec = json!({"id": step_one, "type": "tool", "inputs": {}});
4794
4795 let _ = storage
4796 .insert_run_with_steps(
4797 models::NewRun {
4798 run_id: run_one,
4799 status: models::RunStatus::Pending,
4800 dag_json: json!({"steps": [step_spec.clone()]}),
4801 input_ctx: json!({}),
4802 seed: 1,
4803 idempotency_key: Some(key.to_string()),
4804 },
4805 &[models::NewStep {
4806 run_id: run_one,
4807 step_id: step_one,
4808 idx: 0,
4809 priority: 0,
4810 spec_json: step_spec.clone(),
4811 status: models::StepStatus::Queued,
4812 attempt: 0,
4813 input_snapshot: None,
4814 provider: None,
4815 provider_version: None,
4816 pending_dependencies: 0,
4817 total_dependencies: 0,
4818 estimated_tokens: 0,
4819 estimated_cost: 0.0,
4820 actual_tokens: None,
4821 actual_cost: None,
4822 ..models::NewStep::default()
4823 }],
4824 &[],
4825 )
4826 .await?;
4827
4828 let run_two = Uuid::new_v4();
4829 let duplicate_id = storage
4830 .insert_run_with_steps(
4831 models::NewRun {
4832 run_id: run_two,
4833 status: models::RunStatus::Pending,
4834 dag_json: json!({"steps": [step_spec.clone()]}),
4835 input_ctx: json!({}),
4836 seed: 1,
4837 idempotency_key: Some(key.to_string()),
4838 },
4839 &[models::NewStep {
4840 run_id: run_two,
4841 step_id: Uuid::new_v4(),
4842 idx: 0,
4843 priority: 0,
4844 spec_json: step_spec,
4845 status: models::StepStatus::Queued,
4846 attempt: 0,
4847 input_snapshot: None,
4848 provider: None,
4849 provider_version: None,
4850 pending_dependencies: 0,
4851 total_dependencies: 0,
4852 estimated_tokens: 0,
4853 estimated_cost: 0.0,
4854 actual_tokens: None,
4855 actual_cost: None,
4856 ..models::NewStep::default()
4857 }],
4858 &[],
4859 )
4860 .await?;
4861
4862 assert_eq!(
4863 duplicate_id, run_one,
4864 "idempotent submission should reuse run_id"
4865 );
4866
4867 let steps = storage.fetch_steps_for_run(run_one).await?;
4868 assert_eq!(
4869 steps.len(),
4870 1,
4871 "duplicate submission should not add extra steps"
4872 );
4873
4874 drop(container);
4875 Ok(())
4876 }
4877
4878 #[tokio::test]
4879 async fn audit_chain_attaches_metadata() -> Result<()> {
4880 if !docker_available() {
4881 eprintln!("Skipping audit chain test because Docker is unavailable");
4882 return Ok(());
4883 }
4884
4885 let (storage, container) = setup_storage().await?;
4886 let run_id = Uuid::new_v4();
4887 let step_id = Uuid::new_v4();
4888
4889 storage
4890 .insert_run_with_steps(
4891 models::NewRun {
4892 run_id,
4893 status: models::RunStatus::Pending,
4894 dag_json: json!({"steps": []}),
4895 input_ctx: json!({}),
4896 seed: 1,
4897 idempotency_key: None,
4898 },
4899 &[seed_step(run_id, step_id)],
4900 &[],
4901 )
4902 .await?;
4903
4904 let mut tx = storage.pool().begin().await?;
4905 storage
4906 .enqueue_outbox_event(
4907 &mut tx,
4908 models::NewOutboxEvent {
4909 run_id,
4910 step_id: Some(step_id),
4911 kind: "step_started".to_string(),
4912 payload: json!({"status": "running"}),
4913 },
4914 )
4915 .await?;
4916 storage
4917 .enqueue_outbox_event(
4918 &mut tx,
4919 models::NewOutboxEvent {
4920 run_id,
4921 step_id: Some(step_id),
4922 kind: "step_succeeded".to_string(),
4923 payload: json!({"status": "succeeded"}),
4924 },
4925 )
4926 .await?;
4927 tx.commit().await?;
4928
4929 let events = storage.fetch_outbox_for_run_since(run_id, 0, 10).await?;
4930 assert_eq!(events.len(), 2);
4931
4932 let first_audit = &events[0].payload["_audit"];
4933 let second_audit = &events[1].payload["_audit"];
4934 assert_eq!(first_audit["version"], json!(1));
4935 assert!(first_audit["chain_hash"].as_str().is_some());
4936 assert!(first_audit["previous_hash"].is_null());
4937
4938 let first_hash = first_audit["chain_hash"].as_str().unwrap();
4939 assert_eq!(second_audit["previous_hash"].as_str(), Some(first_hash));
4940 assert!(second_audit["chain_hash"].as_str().is_some());
4941
4942 drop(container);
4943 Ok(())
4944 }
4945
4946 #[tokio::test]
4947 async fn audit_chain_serializes_concurrent_writers() -> Result<()> {
4948 if !docker_available() {
4949 eprintln!("Skipping audit concurrency test because Docker is unavailable");
4950 return Ok(());
4951 }
4952
4953 let (storage, container) = setup_storage().await?;
4954 let storage = Arc::new(storage);
4955 let run_id = Uuid::new_v4();
4956 let step_id = Uuid::new_v4();
4957
4958 storage
4959 .insert_run_with_steps(
4960 models::NewRun {
4961 run_id,
4962 status: models::RunStatus::Pending,
4963 dag_json: json!({"steps": []}),
4964 input_ctx: json!({}),
4965 seed: 1,
4966 idempotency_key: None,
4967 },
4968 &[seed_step(run_id, step_id)],
4969 &[],
4970 )
4971 .await?;
4972
4973 let barrier = Arc::new(Barrier::new(3));
4974 let mut handles = Vec::new();
4975 for (kind, status) in [
4976 ("step_running".to_string(), "running".to_string()),
4977 ("step_succeeded".to_string(), "succeeded".to_string()),
4978 ] {
4979 let storage = Arc::clone(&storage);
4980 let barrier = Arc::clone(&barrier);
4981 let handle = tokio::spawn(async move {
4982 barrier.wait().await;
4983 let mut tx = storage.pool().begin().await?;
4984 storage
4985 .enqueue_outbox_event(
4986 &mut tx,
4987 models::NewOutboxEvent {
4988 run_id,
4989 step_id: Some(step_id),
4990 kind,
4991 payload: json!({"status": status}),
4992 },
4993 )
4994 .await?;
4995 tx.commit().await?;
4996 Ok::<(), anyhow::Error>(())
4997 });
4998 handles.push(handle);
4999 }
5000
5001 barrier.wait().await;
5002 for handle in handles {
5003 handle.await??;
5004 }
5005
5006 let events = storage.fetch_outbox_for_run_since(run_id, 0, 10).await?;
5007 assert_eq!(events.len(), 2);
5008 let first = &events[0].payload["_audit"];
5009 let second = &events[1].payload["_audit"];
5010 assert_eq!(
5011 second["previous_hash"].as_str(),
5012 first["chain_hash"].as_str()
5013 );
5014
5015 drop(container);
5016 Ok(())
5017 }
5018
5019 #[tokio::test]
5020 async fn tenant_quota_enforced() -> Result<()> {
5021 if !docker_available() {
5022 eprintln!("Skipping quota test because Docker is unavailable");
5023 return Ok(());
5024 }
5025
5026 let (storage, container) = setup_storage().await?;
5027 let tenant = "tenant-alpha";
5028 let run_id = Uuid::new_v4();
5029 let step_id = Uuid::new_v4();
5030
5031 storage
5032 .insert_run_with_steps(
5033 models::NewRun {
5034 run_id,
5035 status: models::RunStatus::Pending,
5036 dag_json: json!({"steps": []}),
5037 input_ctx: json!({"labels": {"tenant": tenant}}),
5038 seed: 1,
5039 idempotency_key: None,
5040 },
5041 &[seed_step(run_id, step_id)],
5042 &[],
5043 )
5044 .await?;
5045
5046 sqlx::query(
5047 "insert into tenant_quotas (tenant, daily_token_limit, daily_cost_limit) values ($1, $2, $3)",
5048 )
5049 .bind(tenant)
5050 .bind(1000_i64)
5051 .bind(10.0_f64)
5052 .execute(storage.pool())
5053 .await?;
5054
5055 storage.enforce_tenant_quota(tenant, 100, 1.0).await?;
5056
5057 let mut tx = storage.pool().begin().await?;
5058 storage
5059 .record_billing_usage_tx(&mut tx, run_id, 900, 5.0)
5060 .await?;
5061 tx.commit().await?;
5062
5063 let err = storage
5064 .enforce_tenant_quota(tenant, 200, 1.0)
5065 .await
5066 .expect_err("quota should block overage");
5067 assert!(format!("{err}").contains("quota"));
5068
5069 drop(container);
5070 Ok(())
5071 }
5072
5073 #[tokio::test]
5074 async fn retention_policy_scrubs_payloads() -> Result<()> {
5075 if !docker_available() {
5076 eprintln!("Skipping retention test because Docker is unavailable");
5077 return Ok(());
5078 }
5079
5080 let (storage, container) = setup_storage().await?;
5081 let run_id = Uuid::new_v4();
5082 let step_id = Uuid::new_v4();
5083
5084 storage
5085 .insert_run_with_steps(
5086 models::NewRun {
5087 run_id,
5088 status: models::RunStatus::Pending,
5089 dag_json: json!({"steps": []}),
5090 input_ctx: json!({"inputs": {"secret": "value"}, "labels": {"team": "core"}}),
5091 seed: 1,
5092 idempotency_key: None,
5093 },
5094 &[models::NewStep {
5095 run_id,
5096 step_id,
5097 idx: 0,
5098 priority: 0,
5099 spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
5100 status: models::StepStatus::Succeeded,
5101 attempt: 1,
5102 input_snapshot: Some(json!({"secret": "value"})),
5103 provider: None,
5104 provider_version: None,
5105 pending_dependencies: 0,
5106 total_dependencies: 0,
5107 estimated_tokens: 0,
5108 estimated_cost: 0.0,
5109 actual_tokens: Some(5),
5110 actual_cost: Some(0.01),
5111 leased_by: None,
5112 lease_expires_at: None,
5113 output_snapshot: Some(json!({"result": "ok"})),
5114 output_json: Some(json!({"result": "ok"})),
5115 error_json: Some(json!({"message": "none"})),
5116 }],
5117 &[],
5118 )
5119 .await?;
5120
5121 {
5122 let mut tx = storage.pool().begin().await?;
5123 storage
5124 .enqueue_outbox_event(
5125 &mut tx,
5126 models::NewOutboxEvent {
5127 run_id,
5128 step_id: Some(step_id),
5129 kind: "step_succeeded".to_string(),
5130 payload: json!({
5131 "status": "succeeded",
5132 "input": {"secret": "value"},
5133 "output": {"result": "ok"},
5134 "snapshots": {"output": {"result": "ok"}}
5135 }),
5136 },
5137 )
5138 .await?;
5139 tx.commit().await?;
5140 }
5141
5142 sqlx::query("update runs set created_at = now() - interval '10 days' where run_id = $1")
5143 .bind(run_id)
5144 .execute(storage.pool())
5145 .await?;
5146 sqlx::query("update steps set created_at = now() - interval '10 days' where run_id = $1")
5147 .bind(run_id)
5148 .execute(storage.pool())
5149 .await?;
5150
5151 let cutoff = Utc::now() - chrono::Duration::days(7);
5152 let stats = storage.apply_retention_policy(cutoff).await?;
5153 assert!(stats.runs_scrubbed >= 1);
5154 assert!(stats.steps_scrubbed >= 1);
5155 assert!(stats.events_scrubbed >= 1);
5156
5157 let run = storage.fetch_run(run_id).await?.expect("run exists");
5158 assert_eq!(run.input_ctx.get("inputs"), Some(&json!({})));
5159 assert_eq!(run.input_ctx.get("labels"), Some(&json!({})));
5160
5161 let step = storage
5162 .fetch_steps_for_run(run_id)
5163 .await?
5164 .into_iter()
5165 .next()
5166 .unwrap();
5167 assert!(step.input_snapshot.is_none());
5168 assert!(step.output_snapshot.is_none());
5169 assert!(step.output_json.is_none());
5170 assert!(step.error_json.is_none());
5171
5172 let events = storage.fetch_outbox_for_run_since(run_id, 0, 10).await?;
5173 assert_eq!(events.len(), 1);
5174 let payload = &events[0].payload;
5175 assert_eq!(payload.get("status"), Some(&json!("succeeded")));
5176 assert!(payload.get("input").is_none());
5177 assert!(payload.get("output").is_none());
5178 assert!(payload.get("snapshots").is_none());
5179 assert!(payload.get("_audit").is_some());
5180
5181 drop(container);
5182 Ok(())
5183 }
5184
5185 #[tokio::test]
5186 async fn artifact_store_put_get_presign() -> Result<()> {
5187 let store = artifacts::ObjectStoreArtifactStore::new(
5188 Arc::new(object_store::memory::InMemory::new()),
5189 None,
5190 );
5191 let payload = Bytes::from_static(b"hello world");
5192 let metadata = json!({"tag": "greeting"});
5193
5194 let meta = store
5195 .put(payload.clone(), "text/plain", metadata.clone())
5196 .await?;
5197 let fetched = store.get(meta.sha256).await?;
5198 assert_eq!(fetched, payload);
5199
5200 let url = store
5201 .presign_get(meta.sha256, Duration::from_secs(30))
5202 .await?;
5203 assert!(url.as_str().contains(&hex::encode(meta.sha256)));
5204
5205 Ok(())
5206 }
5207}