// fleetforge_storage/lib.rs

1//! Persistent state management: Postgres schema + artifact storage contracts.
2
3use std::collections::HashSet;
4use std::convert::TryInto;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::Duration;
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use serde_json::{json, Map, Value};
13use sha2::{Digest, Sha256};
14use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
15use sqlx::types::Json;
16use sqlx::{PgConnection, Pool, Postgres, QueryBuilder, Row, Transaction};
17use tracing::warn;
18use uuid::Uuid;
19
20static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
21
22use crate::models::NewOutboxEvent;
23mod stores;
24pub use artifacts::ObjectStoreArtifactStore;
25pub use fleetforge_contracts::artifacts::{ArtifactMeta, ArtifactStore};
26pub use memory::{InMemoryStorage, RunStore};
27use stores::{DynObjectSigner, DynObjectStore};
28pub mod memory;
29pub mod retention;
30
31pub mod artifacts {
32    use std::convert::TryFrom;
33    use std::time::Duration;
34
35    use anyhow::{bail, Context, Result};
36    use async_trait::async_trait;
37    use bytes::Bytes;
38    use fleetforge_contracts::artifacts::{ArtifactMeta, ArtifactStore};
39    use object_store::path::Path;
40    use object_store::PutPayload;
41    use reqwest::Method;
42    use serde_json::Value;
43    use sha2::{Digest, Sha256};
44    use url::Url;
45
46    use super::{DynObjectSigner, DynObjectStore};
47
48    /// Default `object_store`-backed implementation using content-addressed paths.
49    pub struct ObjectStoreArtifactStore {
50        store: DynObjectStore,
51        signer: Option<DynObjectSigner>,
52    }
53
54    impl ObjectStoreArtifactStore {
55        pub fn new(store: DynObjectStore, signer: Option<DynObjectSigner>) -> Self {
56            Self { store, signer }
57        }
58
59        fn path_for_digest(digest: &[u8; 32]) -> Path {
60            let digest_hex = hex::encode(digest);
61            let prefix_a = &digest_hex[0..2];
62            let prefix_b = &digest_hex[2..4];
63            Path::from(format!("sha256/{}/{}/{}", prefix_a, prefix_b, digest_hex))
64        }
65    }
66
67    #[async_trait]
68    impl ArtifactStore for ObjectStoreArtifactStore {
69        async fn put(
70            &self,
71            bytes: Bytes,
72            media_type: &str,
73            metadata: Value,
74        ) -> Result<ArtifactMeta> {
75            let mut hasher = Sha256::new();
76            hasher.update(&bytes);
77            let digest = hasher.finalize();
78            let mut sha256 = [0u8; 32];
79            sha256.copy_from_slice(&digest);
80
81            let path = Self::path_for_digest(&sha256);
82            let size =
83                u64::try_from(bytes.len()).context("artifact payload size does not fit in u64")?;
84
85            self.store
86                .put(&path, PutPayload::from(bytes))
87                .await
88                .context("failed to write artifact to object_store")?;
89
90            Ok(ArtifactMeta {
91                sha256,
92                media_type: media_type.to_string(),
93                size,
94                metadata,
95            })
96        }
97
98        async fn get(&self, sha256: [u8; 32]) -> Result<Bytes> {
99            let path = Self::path_for_digest(&sha256);
100            let result = self
101                .store
102                .get(&path)
103                .await
104                .context("failed to fetch artifact from object_store")?;
105            result
106                .bytes()
107                .await
108                .context("failed to collect artifact bytes from object_store")
109        }
110
111        async fn presign_get(&self, sha256: [u8; 32], ttl: Duration) -> Result<Url> {
112            let path = Self::path_for_digest(&sha256);
113            let Some(signer) = self.signer.as_ref() else {
114                bail!("artifact backend does not support presigned URLs");
115            };
116            signer
117                .signed_url(Method::GET, &path, ttl)
118                .await
119                .context("failed to create presigned artifact URL from object_store")
120        }
121    }
122}
123
/// Configuration for storage backends (database + artifacts).
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Postgres connection string, e.g. `postgres://localhost/fleetforge`.
    pub database_url: String,
    /// Upper bound on pooled Postgres connections.
    pub max_connections: u32,
    /// Timeout applied when acquiring a connection from the pool.
    pub connect_timeout: Duration,
    /// Artifact/object store backend selection.
    pub object_store: ObjectStoreConfig,
}
132
133impl Default for StorageConfig {
134    fn default() -> Self {
135        let object_store = match ObjectStoreConfig::from_env() {
136            Ok(cfg) => cfg,
137            Err(err) => {
138                eprintln!("warning: falling back to in-memory artifact store: {err}");
139                ObjectStoreConfig::InMemory
140            }
141        };
142        Self {
143            database_url: std::env::var("DATABASE_URL")
144                .unwrap_or_else(|_| "postgres://localhost/fleetforge".to_string()),
145            max_connections: 10,
146            connect_timeout: Duration::from_secs(5),
147            object_store,
148        }
149    }
150}
151
/// Supported artifact store backends.
///
/// Selected at runtime via the `FLEETFORGE_OBJECT_STORE` environment
/// variable (see `ObjectStoreConfig::from_env`); defaults to `InMemory`.
#[derive(Debug, Clone)]
pub enum ObjectStoreConfig {
    /// In-memory object store (good for tests and quickstarts).
    InMemory,
    /// Amazon S3-compatible object store (AWS, MinIO, etc.).
    S3(S3Config),
    /// Google Cloud Storage bucket.
    Gcs(GcsConfig),
    /// Azure Blob Storage container.
    Azure(AzureConfig),
    /// Generic HTTP object store.
    Http(HttpConfig),
}
166
/// Configuration for S3-compatible object store backends.
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Custom endpoint URL; `None` uses the provider default.
    pub endpoint: Option<String>,
    /// Access key ID used to authenticate.
    pub access_key: String,
    /// Secret access key paired with `access_key`.
    pub secret_key: String,
    /// Region name; `None` defers to the provider default.
    pub region: Option<String>,
    /// Bucket that artifacts are written to.
    pub bucket: String,
    /// Whether to address the bucket virtual-hosted style; MinIO-style
    /// endpoints are configured path-style (see `from_env`/`from_minio_env`).
    pub virtual_hosted_style: bool,
}
177
/// Configuration for Google Cloud Storage.
#[derive(Debug, Clone)]
pub struct GcsConfig {
    /// Destination GCS bucket name.
    pub bucket: String,
    /// Credentials source used to authenticate against GCS.
    pub service_account: GcsServiceAccount,
}
184
/// Where GCS service-account credentials come from.
#[derive(Debug, Clone)]
pub enum GcsServiceAccount {
    /// Path to a service-account JSON key file
    /// (typically from `GOOGLE_APPLICATION_CREDENTIALS`).
    File(PathBuf),
    /// Inline service-account JSON
    /// (from `GCS_SERVICE_ACCOUNT_JSON`/`GOOGLE_SERVICE_ACCOUNT_JSON`).
    Json(String),
}
190
/// Configuration for Azure Blob Storage.
#[derive(Debug, Clone)]
pub struct AzureConfig {
    /// Storage account name.
    pub account: String,
    /// Shared access key for the account.
    pub access_key: String,
    /// Blob container artifacts are written to.
    pub container: String,
}
198
/// Configuration for HTTP-backed object stores.
#[derive(Debug, Clone)]
pub struct HttpConfig {
    /// Base URL all object keys are resolved against.
    pub base_url: String,
}
204
205impl ObjectStoreConfig {
206    fn from_env() -> Result<Self> {
207        match std::env::var("FLEETFORGE_OBJECT_STORE")
208            .ok()
209            .as_deref()
210            .map(|s| s.to_lowercase())
211        {
212            Some(ref backend) if backend == "s3" || backend == "minio" => {
213                Ok(ObjectStoreConfig::S3(S3Config::from_env()?))
214            }
215            Some(ref backend) if backend == "gcs" => Ok(ObjectStoreConfig::Gcs(GcsConfig::from_env()?)),
216            Some(ref backend) if backend == "azure" => Ok(ObjectStoreConfig::Azure(AzureConfig::from_env()?)),
217            Some(ref backend) if backend == "http" => Ok(ObjectStoreConfig::Http(HttpConfig::from_env()?)),
218            Some(ref backend) if backend == "in_memory" => Ok(ObjectStoreConfig::InMemory),
219            Some(other) => Err(anyhow!("unsupported object store backend '{}'; expected one of in_memory, s3, gcs, azure, http", other)),
220            None => {
221                if let Ok(cfg) = S3Config::from_minio_env() {
222                    return Ok(ObjectStoreConfig::S3(cfg));
223                }
224                if let Ok(cfg) = S3Config::from_env() {
225                    return Ok(ObjectStoreConfig::S3(cfg));
226                }
227                Ok(ObjectStoreConfig::InMemory)
228            }
229        }
230    }
231}
232
/// Returns the value of the first environment variable in `keys` that is
/// both set and non-empty.
fn first_env(keys: &[&str]) -> Option<String> {
    for key in keys {
        if let Ok(value) = std::env::var(key) {
            if !value.is_empty() {
                return Some(value);
            }
        }
    }
    None
}
237
/// Borrows the live connection inside an open transaction, so the query
/// helpers can target a `Transaction` the same way they target the pool.
#[inline]
fn exec<'a>(tx: &'a mut Transaction<'_, Postgres>) -> &'a mut PgConnection {
    tx.as_mut()
}
242
243impl S3Config {
244    fn from_env() -> Result<Self> {
245        let bucket = first_env(&["FLEETFORGE_S3_BUCKET", "S3_BUCKET", "AWS_S3_BUCKET"])
246            .ok_or_else(|| {
247                anyhow!("S3 bucket not configured; set FLEETFORGE_S3_BUCKET or S3_BUCKET")
248            })?;
249        let access_key = first_env(&[
250            "FLEETFORGE_S3_ACCESS_KEY",
251            "AWS_ACCESS_KEY_ID",
252            "MINIO_ACCESS_KEY",
253        ])
254        .ok_or_else(|| {
255            anyhow!(
256                "S3 access key not configured; set FLEETFORGE_S3_ACCESS_KEY or AWS_ACCESS_KEY_ID"
257            )
258        })?;
259        let secret_key = first_env(&["FLEETFORGE_S3_SECRET_KEY", "AWS_SECRET_ACCESS_KEY", "MINIO_SECRET_KEY"])
260            .ok_or_else(|| anyhow!("S3 secret key not configured; set FLEETFORGE_S3_SECRET_KEY or AWS_SECRET_ACCESS_KEY"))?;
261        let endpoint = first_env(&[
262            "FLEETFORGE_S3_ENDPOINT",
263            "S3_ENDPOINT",
264            "AWS_ENDPOINT_URL_S3",
265            "MINIO_ENDPOINT",
266        ]);
267        let region = first_env(&[
268            "FLEETFORGE_S3_REGION",
269            "S3_REGION",
270            "AWS_REGION",
271            "MINIO_REGION",
272        ]);
273        let virtual_hosted_style = endpoint
274            .as_deref()
275            .map(|ep| !ep.contains("minio"))
276            .unwrap_or(true);
277
278        Ok(Self {
279            endpoint,
280            access_key,
281            secret_key,
282            region,
283            bucket,
284            virtual_hosted_style,
285        })
286    }
287
288    fn from_minio_env() -> Result<Self> {
289        let endpoint = std::env::var("MINIO_ENDPOINT").context("MINIO_ENDPOINT not set")?;
290        let access_key = std::env::var("MINIO_ACCESS_KEY").context("MINIO_ACCESS_KEY not set")?;
291        let secret_key = std::env::var("MINIO_SECRET_KEY").context("MINIO_SECRET_KEY not set")?;
292        let bucket = std::env::var("MINIO_BUCKET").unwrap_or_else(|_| "fleetforge".to_string());
293
294        Ok(Self {
295            endpoint: Some(endpoint),
296            access_key,
297            secret_key,
298            region: std::env::var("MINIO_REGION").ok(),
299            bucket,
300            virtual_hosted_style: false,
301        })
302    }
303}
304
305impl GcsConfig {
306    fn from_env() -> Result<Self> {
307        let bucket = first_env(&["FLEETFORGE_GCS_BUCKET", "GCS_BUCKET"]).ok_or_else(|| {
308            anyhow!("GCS bucket not configured; set FLEETFORGE_GCS_BUCKET or GCS_BUCKET")
309        })?;
310        let service_account = if let Some(json) =
311            first_env(&["GCS_SERVICE_ACCOUNT_JSON", "GOOGLE_SERVICE_ACCOUNT_JSON"])
312        {
313            GcsServiceAccount::Json(json)
314        } else if let Some(path) = first_env(&["GOOGLE_APPLICATION_CREDENTIALS"]) {
315            GcsServiceAccount::File(PathBuf::from(path))
316        } else {
317            return Err(anyhow!(
318                "GCS credentials not configured; set GCS_SERVICE_ACCOUNT_JSON or GOOGLE_APPLICATION_CREDENTIALS"
319            ));
320        };
321
322        Ok(Self {
323            bucket,
324            service_account,
325        })
326    }
327}
328
329impl AzureConfig {
330    fn from_env() -> Result<Self> {
331        let account = first_env(&["FLEETFORGE_AZURE_STORAGE_ACCOUNT", "AZURE_STORAGE_ACCOUNT"])
332            .ok_or_else(|| anyhow!("Azure storage account not configured; set FLEETFORGE_AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCOUNT"))?;
333        let access_key = first_env(&["FLEETFORGE_AZURE_STORAGE_KEY", "AZURE_STORAGE_KEY"])
334            .ok_or_else(|| anyhow!("Azure storage key not configured; set FLEETFORGE_AZURE_STORAGE_KEY or AZURE_STORAGE_KEY"))?;
335        let container = first_env(&["FLEETFORGE_AZURE_CONTAINER", "AZURE_STORAGE_CONTAINER", "AZURE_CONTAINER"])
336            .ok_or_else(|| anyhow!("Azure container not configured; set FLEETFORGE_AZURE_CONTAINER or AZURE_STORAGE_CONTAINER"))?;
337
338        Ok(Self {
339            account,
340            access_key,
341            container,
342        })
343    }
344}
345
346impl HttpConfig {
347    fn from_env() -> Result<Self> {
348        let base_url = first_env(&["FLEETFORGE_HTTP_OBJECT_STORE", "HTTP_OBJECT_STORE_BASE_URL"])
349            .ok_or_else(|| anyhow!("HTTP object store base URL not configured; set FLEETFORGE_HTTP_OBJECT_STORE or HTTP_OBJECT_STORE_BASE_URL"))?;
350        Ok(Self { base_url })
351    }
352}
353
354impl Default for ObjectStoreConfig {
355    fn default() -> Self {
356        ObjectStoreConfig::InMemory
357    }
358}
359
/// High-level storage handle that encapsulates database and artifact store clients.
#[derive(Clone)]
pub struct Storage {
    /// Shared Postgres connection pool.
    pool: Pool<Postgres>,
    /// Raw object-store handle, exposed via `object_store()`.
    objects: DynObjectStore,
    /// Content-addressed artifact facade built over `objects`.
    artifact_store: Arc<dyn ArtifactStore>,
}
367
/// Outcome of attempting to consume budget prior to executing a step.
#[derive(Debug)]
pub enum BudgetResult {
    /// Reservation succeeded; carries the updated budget totals.
    Granted(models::Budget),
    /// A budget row exists but the request would exceed its limits.
    Denied,
    /// No budget row is configured for this run.
    NoBudgetConfigured,
}
375
/// Summary of retention actions applied to persisted data.
#[derive(Debug, Clone, Copy, Default)]
pub struct RetentionStats {
    /// Number of `runs` rows whose input context was scrubbed.
    pub runs_scrubbed: u64,
    /// Number of `steps` rows whose snapshots/payloads were cleared.
    pub steps_scrubbed: u64,
    /// Number of `outbox` rows whose payloads were stripped.
    pub events_scrubbed: u64,
}
383
384impl Storage {
    /// Connects to Postgres, applies migrations, and provisions the artifact store.
    ///
    /// # Errors
    /// Fails if the connection string is invalid, the pool cannot be
    /// established, a migration fails, or the object store cannot be built.
    pub async fn connect(config: StorageConfig) -> Result<Self> {
        let connect_options = PgConnectOptions::from_str(&config.database_url)
            .context("invalid postgres connection string")?;

        let pool = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .acquire_timeout(config.connect_timeout)
            .connect_with(connect_options)
            .await?;

        // Embedded migrations run on every connect; sqlx skips those that
        // were already applied.
        MIGRATOR.run(&pool).await?;

        let (objects, signer) = stores::build(&config.object_store)?;

        // The artifact facade shares the same backend handle as `objects`.
        let artifact_store: Arc<dyn ArtifactStore> = Arc::new(
            artifacts::ObjectStoreArtifactStore::new(objects.clone(), signer.clone()),
        );

        Ok(Self {
            pool,
            objects,
            artifact_store,
        })
    }
410
    /// Convenience helper for default configuration (env-driven DATABASE_URL).
    ///
    /// Equivalent to `Storage::connect(StorageConfig::default())`.
    pub async fn connect_default() -> Result<Self> {
        Self::connect(StorageConfig::default()).await
    }
415
    /// Returns a borrowed connection pool for ad-hoc queries.
    ///
    /// Useful for queries not covered by the typed helpers on this struct.
    pub fn pool(&self) -> &Pool<Postgres> {
        &self.pool
    }
420
    /// Returns a handle to the configured object store.
    ///
    /// NOTE(review): assumes `DynObjectStore` is an `Arc`-like shared
    /// handle, so this clone hands out a reference to the same backend —
    /// confirm against `stores::DynObjectStore`.
    pub fn object_store(&self) -> DynObjectStore {
        self.objects.clone()
    }
425
426    /// Returns the artifact store abstraction.
427    pub fn artifacts(&self) -> Arc<dyn ArtifactStore> {
428        self.artifact_store.clone()
429    }
430
431    /// Releases a leased step back to the queue without incrementing attempt counters.
432    pub async fn return_step_to_queue(&self, run_id: Uuid, step_id: Uuid) -> Result<()> {
433        let result = sqlx::query(
434            r#"
435            update steps
436            set status = 'queued',
437                leased_by = null,
438                lease_expires_at = null
439            where run_id = $1
440              and step_id = $2
441            "#,
442        )
443        .bind(run_id)
444        .bind(step_id)
445        .execute(&self.pool)
446        .await?;
447
448        if result.rows_affected() == 0 {
449            bail!(
450                "step {} for run {} not found when returning to queue",
451                step_id,
452                run_id
453            );
454        }
455
456        Ok(())
457    }
458
459    /// Requeues expired leases back to the queued state and returns affected step IDs.
460    pub async fn requeue_expired_leases(&self, limit: i64) -> Result<Vec<(Uuid, Uuid)>> {
461        let rows = sqlx::query(
462            r#"
463            with expired as (
464                select run_id, step_id
465                from steps
466                where status = 'leased'
467                  and lease_expires_at is not null
468                  and lease_expires_at < now()
469                order by lease_expires_at asc
470                limit $1
471            )
472            update steps s
473            set status = 'queued',
474                leased_by = null,
475                lease_expires_at = null,
476                not_before = null
477            from expired
478            where s.run_id = expired.run_id
479              and s.step_id = expired.step_id
480            returning s.run_id, s.step_id
481            "#,
482        )
483        .bind(limit)
484        .fetch_all(&self.pool)
485        .await?;
486
487        let mut requeued = Vec::with_capacity(rows.len());
488        for row in rows {
489            let run_id: Uuid = row.get("run_id");
490            let step_id: Uuid = row.get("step_id");
491            requeued.push((run_id, step_id));
492        }
493        Ok(requeued)
494    }
495
496    pub async fn defer_step_to_queue(
497        &self,
498        run_id: Uuid,
499        step_id: Uuid,
500        not_before: DateTime<Utc>,
501        payload: Value,
502    ) -> Result<()> {
503        let mut tx = self.pool.begin().await?;
504        self.defer_step_to_queue_tx(&mut tx, run_id, step_id, not_before, payload)
505            .await?;
506        tx.commit().await?;
507        Ok(())
508    }
509
510    pub async fn defer_step_to_queue_tx(
511        &self,
512        tx: &mut Transaction<'_, Postgres>,
513        run_id: Uuid,
514        step_id: Uuid,
515        not_before: DateTime<Utc>,
516        payload: Value,
517    ) -> Result<()> {
518        let result = sqlx::query(
519            r#"
520            update steps
521            set status = 'queued',
522                leased_by = null,
523                lease_expires_at = null,
524                not_before = $3
525            where run_id = $1
526              and step_id = $2
527            "#,
528        )
529        .bind(run_id)
530        .bind(step_id)
531        .bind(not_before)
532        .execute(exec(tx))
533        .await?;
534
535        if result.rows_affected() == 0 {
536            bail!(
537                "failed to defer step {} in run {} back to queue",
538                step_id,
539                run_id
540            );
541        }
542
543        self.enqueue_outbox_event(
544            tx,
545            NewOutboxEvent {
546                run_id,
547                step_id: Some(step_id),
548                kind: "step_deferred".to_string(),
549                payload,
550            },
551        )
552        .await?;
553
554        Ok(())
555    }
556
557    pub async fn schedule_retry_tx(
558        &self,
559        tx: &mut Transaction<'_, Postgres>,
560        run_id: Uuid,
561        step_id: Uuid,
562        attempt: i32,
563        not_before: DateTime<Utc>,
564        error_json: Value,
565    ) -> Result<()> {
566        let error_json = Json(error_json);
567        let result = sqlx::query(
568            r#"
569            update steps
570            set status = 'queued',
571                attempt = $3,
572                leased_by = null,
573                lease_expires_at = null,
574                not_before = $4,
575                error_json = $5
576            where run_id = $1
577              and step_id = $2
578            "#,
579        )
580        .bind(run_id)
581        .bind(step_id)
582        .bind(attempt)
583        .bind(not_before)
584        .bind(error_json)
585        .execute(exec(tx))
586        .await?;
587
588        if result.rows_affected() == 0 {
589            bail!(
590                "failed to schedule retry for step {} in run {}",
591                step_id,
592                run_id
593            );
594        }
595
596        Ok(())
597    }
598
    /// Appends a row to `step_cost_ledger` recording reserved vs. actual
    /// token/cost usage for one step attempt.
    ///
    /// `actual_tokens`/`actual_cost` may be `None` when actual usage is not
    /// (yet) known.
    pub async fn insert_cost_ledger_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        step_id: Uuid,
        attempt: i32,
        status: &str,
        reserved_tokens: i64,
        reserved_cost: f64,
        actual_tokens: Option<i64>,
        actual_cost: Option<f64>,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into step_cost_ledger (
                run_id, step_id, attempt, status, reserved_tokens, reserved_cost, actual_tokens, actual_cost
            ) values ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(attempt)
        .bind(status)
        .bind(reserved_tokens)
        .bind(reserved_cost)
        .bind(actual_tokens)
        .bind(actual_cost)
        .execute(exec(tx))
        .await?;

        Ok(())
    }
631
    /// Persists an immutable audit record of one step attempt.
    ///
    /// Idempotent: the `on conflict (run_id, step_id, attempt) do nothing`
    /// clause makes replays of an already-recorded attempt a no-op.
    pub async fn insert_step_attempt_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        attempt: models::NewStepAttempt,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into step_attempts (
                run_id,
                step_id,
                attempt,
                status,
                latency_ms,
                spec_snapshot,
                input_snapshot,
                output_snapshot,
                error_snapshot,
                checkpoint,
                budget_snapshot,
                guardrail_events,
                tool_events,
                artifacts
            ) values (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
            )
            on conflict (run_id, step_id, attempt) do nothing
            "#,
        )
        // Bind order mirrors the column list above; optional JSON columns
        // are wrapped in `Json` only when present so null maps to SQL null.
        .bind(attempt.run_id)
        .bind(attempt.step_id)
        .bind(attempt.attempt)
        .bind(attempt.status.as_str())
        .bind(attempt.latency_ms)
        .bind(Json(attempt.spec_snapshot))
        .bind(attempt.input_snapshot.map(Json))
        .bind(attempt.output_snapshot.map(Json))
        .bind(attempt.error_snapshot.map(Json))
        .bind(attempt.checkpoint.map(Json))
        .bind(attempt.budget_snapshot.map(Json))
        .bind(attempt.guardrail_events.map(Json))
        .bind(attempt.tool_events.map(Json))
        .bind(attempt.artifacts.map(Json))
        .execute(exec(tx))
        .await?;

        Ok(())
    }
679
680    pub async fn append_run_event_tx(
681        &self,
682        tx: &mut Transaction<'_, Postgres>,
683        run_id: Uuid,
684        step_id: Option<Uuid>,
685        kind: &str,
686        payload: &Value,
687    ) -> Result<()> {
688        sqlx::query(
689            r#"
690            insert into run_event_log (run_id, step_id, kind, payload)
691            values ($1, $2, $3, $4)
692            "#,
693        )
694        .bind(run_id)
695        .bind(step_id)
696        .bind(kind)
697        .bind(Json(payload.clone()))
698        .execute(exec(tx))
699        .await?;
700
701        Ok(())
702    }
703
    /// Marks `origin_step` as having compensation scheduled and force-queues
    /// `compensation_step` (clearing its dependencies and lease state).
    pub async fn trigger_compensation_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        origin_step: Uuid,
        compensation_step: Uuid,
    ) -> Result<()> {
        // Flag the originating step. NOTE(review): rows_affected is not
        // checked here, so a missing origin step is silently ignored —
        // confirm this is intentional.
        sqlx::query(
            r#"
            update steps
            set compensation_scheduled = true
            where run_id = $1
              and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(origin_step)
        .execute(exec(tx))
        .await?;

        // Queue the compensation step immediately: zeroing
        // pending_dependencies bypasses normal dependency gating.
        let result = sqlx::query(
            r#"
            update steps
            set status = 'queued',
                pending_dependencies = 0,
                leased_by = null,
                lease_expires_at = null,
                not_before = null
            where run_id = $1
              and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(compensation_step)
        .execute(exec(tx))
        .await?;

        if result.rows_affected() == 0 {
            bail!(
                "compensation step {} not present in run {}",
                compensation_step,
                run_id
            );
        }

        Ok(())
    }
751
752    /// Returns unpublished outbox events up to `limit`, ordered by ID.
753    pub async fn fetch_unpublished_outbox(&self, limit: i64) -> Result<Vec<models::OutboxEvent>> {
754        let rows = sqlx::query_as::<_, db::OutboxRow>(
755            r#"
756            select *
757            from outbox
758            where published_at is null
759            order by id asc
760            limit $1
761            "#,
762        )
763        .bind(limit)
764        .fetch_all(&self.pool)
765        .await?;
766
767        rows.into_iter()
768            .map(models::OutboxEvent::try_from)
769            .collect()
770    }
771
772    /// Marks an outbox row as published by setting `published_at` to the current timestamp.
773    pub async fn mark_outbox_published(&self, id: i64) -> Result<()> {
774        let result = sqlx::query(
775            r#"
776            update outbox
777            set published_at = now()
778            where id = $1
779              and published_at is null
780            "#,
781        )
782        .bind(id)
783        .execute(&self.pool)
784        .await?;
785
786        if result.rows_affected() == 0 {
787            bail!("outbox row {} not found", id);
788        }
789
790        Ok(())
791    }
792
793    /// Returns outbox events for a run that were created after the given id.
794    pub async fn fetch_outbox_for_run_since(
795        &self,
796        run_id: Uuid,
797        after_id: i64,
798        limit: i64,
799    ) -> Result<Vec<models::OutboxEvent>> {
800        let rows = sqlx::query_as::<_, db::OutboxRow>(
801            r#"
802            select *
803            from outbox
804            where run_id = $1
805              and id > $2
806            order by id asc
807            limit $3
808            "#,
809        )
810        .bind(run_id)
811        .bind(after_id)
812        .bind(limit)
813        .fetch_all(&self.pool)
814        .await?;
815
816        rows.into_iter()
817            .map(models::OutboxEvent::try_from)
818            .collect()
819    }
820
821    /// Returns the most recent outbox event for a run, if any.
822    pub async fn fetch_last_outbox_event(
823        &self,
824        run_id: Uuid,
825    ) -> Result<Option<models::OutboxEvent>> {
826        let row = sqlx::query_as::<_, db::OutboxRow>(
827            r#"
828            select *
829            from outbox
830            where run_id = $1
831            order by id desc
832            limit 1
833            "#,
834        )
835        .bind(run_id)
836        .fetch_optional(&self.pool)
837        .await?;
838
839        match row {
840            Some(row) => Ok(Some(models::OutboxEvent::try_from(row)?)),
841            None => Ok(None),
842        }
843    }
844
845    /// Fetch the last acknowledged offset for a subscription cursor, if present.
846    pub async fn fetch_tap_offset(&self, run_id: Uuid, cursor_id: &str) -> Result<Option<i64>> {
847        let offset = sqlx::query_scalar::<_, i64>(
848            r#"
849            select cursor_offset
850            from tap_offsets
851            where run_id = $1
852              and cursor_id = $2
853            "#,
854        )
855        .bind(run_id)
856        .bind(cursor_id)
857        .fetch_optional(&self.pool)
858        .await?;
859
860        Ok(offset)
861    }
862
    /// Advance the stored offset for a subscription cursor.
    ///
    /// The `greatest(...)` on conflict keeps the stored offset monotonically
    /// non-decreasing even if acknowledgements arrive out of order.
    pub async fn upsert_tap_offset(
        &self,
        run_id: Uuid,
        cursor_id: &str,
        offset: i64,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into tap_offsets (run_id, cursor_id, cursor_offset)
            values ($1, $2, $3)
            on conflict (run_id, cursor_id) do update
            set cursor_offset = greatest(tap_offsets.cursor_offset, excluded.cursor_offset),
                updated_at = now()
            "#,
        )
        .bind(run_id)
        .bind(cursor_id)
        .bind(offset)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
887
    /// Applies retention policy by scrubbing input/output payloads for stale runs.
    ///
    /// All three scrubs run in a single transaction so a partially applied
    /// pass is never visible. Returns per-table counts of affected rows.
    pub async fn apply_retention_policy(&self, cutoff: DateTime<Utc>) -> Result<RetentionStats> {
        // A cutoff at or before the Unix epoch disables retention entirely —
        // presumably a "not configured" sentinel; confirm with callers.
        if cutoff.timestamp_millis() <= 0 {
            return Ok(RetentionStats::default());
        }

        let mut tx = self.pool.begin().await?;

        // 1. Blank run-level inputs/labels in place, keeping the surrounding
        //    JSON structure intact; the type check skips already-empty rows.
        let scrubbed_runs = sqlx::query(
            r#"
            update runs
            set input_ctx = jsonb_set(
                jsonb_set(
                    input_ctx,
                    '{inputs}',
                    '{}'::jsonb,
                    true
                ),
                '{labels}',
                '{}'::jsonb,
                true
            )
            where created_at < $1
              and (
                jsonb_typeof(input_ctx -> 'inputs') in ('object', 'array', 'string')
                or jsonb_typeof(input_ctx -> 'labels') in ('object', 'array', 'string')
              )
            "#,
        )
        .bind(cutoff)
        .execute(exec(&mut tx))
        .await?
        .rows_affected();

        // 2. Drop per-step snapshots and error payloads wholesale.
        let scrubbed_steps = sqlx::query(
            r#"
            update steps
            set input_snapshot = null,
                output_snapshot = null,
                output_json = null,
                error_json = null
            where created_at < $1
              and (
                input_snapshot is not null
                or output_snapshot is not null
                or output_json is not null
                or error_json is not null
            )
            "#,
        )
        .bind(cutoff)
        .execute(exec(&mut tx))
        .await?
        .rows_affected();

        // 3. Strip bulky snapshot keys from outbox payloads of stale runs.
        let scrubbed_events = sqlx::query(
            r#"
            update outbox
            set payload = payload - 'snapshots' - 'input' - 'output'
            where run_id in (
                select run_id from runs where created_at < $1
            )
              and (
                payload ? 'snapshots'
                or payload ? 'input'
                or payload ? 'output'
              )
            "#,
        )
        .bind(cutoff)
        .execute(exec(&mut tx))
        .await?
        .rows_affected();

        tx.commit().await?;

        Ok(RetentionStats {
            runs_scrubbed: scrubbed_runs as u64,
            steps_scrubbed: scrubbed_steps as u64,
            events_scrubbed: scrubbed_events as u64,
        })
    }
970
    /// Attempts to consume the requested budget; returns granted totals or denial.
    ///
    /// The consume is a single conditional UPDATE, so concurrent callers
    /// cannot jointly overshoot a configured limit. A zero-size request
    /// never mutates the row and simply reports current state.
    pub async fn try_consume_budget(
        &self,
        run_id: Uuid,
        tokens: i64,
        cost: f64,
    ) -> Result<BudgetResult> {
        // Zero request: read-only peek at the budget.
        if tokens == 0 && cost == 0.0 {
            return match self.fetch_budget(run_id).await? {
                Some(budget) => Ok(BudgetResult::Granted(budget)),
                None => Ok(BudgetResult::NoBudgetConfigured),
            };
        }

        // Guarded update: only increments usage when both limits (where
        // configured) still hold after the increment.
        let row = sqlx::query_as::<_, db::BudgetRow>(
            r#"
            update budgets
            set tokens_used = tokens_used + $2,
                cost_used = cost_used + $3,
                updated_at = now()
            where run_id = $1
              and (tokens_limit is null or tokens_used + $2 <= tokens_limit)
              and (cost_limit is null or cost_used + $3 <= cost_limit)
            returning *
            "#,
        )
        .bind(run_id)
        .bind(tokens)
        .bind(cost)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            return Ok(BudgetResult::Granted(row.try_into()?));
        }

        // The guarded update matched nothing: either the budget is exhausted
        // or no row exists — disambiguate with a cheap existence check.
        let exists = sqlx::query_scalar::<_, i64>("select 1 from budgets where run_id = $1")
            .bind(run_id)
            .fetch_optional(&self.pool)
            .await?;

        if exists.is_some() {
            Ok(BudgetResult::Denied)
        } else {
            Ok(BudgetResult::NoBudgetConfigured)
        }
    }
1018
1019    /// Fetches the current budget totals for a run, if configured.
1020    pub async fn fetch_budget(&self, run_id: Uuid) -> Result<Option<models::Budget>> {
1021        let row = sqlx::query_as::<_, db::BudgetRow>("select * from budgets where run_id = $1")
1022            .bind(run_id)
1023            .fetch_optional(&self.pool)
1024            .await?;
1025
1026        match row {
1027            Some(row) => Ok(Some(row.try_into()?)),
1028            None => Ok(None),
1029        }
1030    }
1031
1032    /// Returns all step rows for a run ordered by index.
1033    pub async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<models::Step>> {
1034        let rows = sqlx::query_as::<_, db::StepRow>(
1035            r#"
1036            select *
1037            from steps
1038            where run_id = $1
1039            order by idx
1040            "#,
1041        )
1042        .bind(run_id)
1043        .fetch_all(&self.pool)
1044        .await?;
1045
1046        rows.into_iter().map(models::Step::try_from).collect()
1047    }
1048
1049    /// Marks a leased step as running for the provided worker.
1050    pub async fn mark_step_running(
1051        &self,
1052        run_id: Uuid,
1053        step_id: Uuid,
1054        worker_id: &str,
1055        input_snapshot: Option<&Value>,
1056        provider: Option<&str>,
1057        provider_version: Option<&str>,
1058    ) -> Result<()> {
1059        let input_snapshot = input_snapshot.map(Json);
1060        let result = sqlx::query(
1061            r#"
1062            update steps
1063            set status = 'running',
1064                attempt = attempt + 1,
1065                input_snapshot = coalesce($4, input_snapshot),
1066                provider = coalesce($5, provider),
1067                provider_version = coalesce($6, provider_version),
1068                not_before = null
1069            where run_id = $1
1070              and step_id = $2
1071              and leased_by = $3
1072              and status = 'leased'
1073            "#,
1074        )
1075        .bind(run_id)
1076        .bind(step_id)
1077        .bind(worker_id)
1078        .bind(input_snapshot)
1079        .bind(provider)
1080        .bind(provider_version)
1081        .execute(&self.pool)
1082        .await?;
1083
1084        if result.rows_affected() == 0 {
1085            bail!(
1086                "step {} for run {} is not leased by worker {}; cannot mark running",
1087                step_id,
1088                run_id,
1089                worker_id
1090            );
1091        }
1092
1093        sqlx::query(
1094            r#"
1095            update runs
1096            set status = 'running'
1097            where run_id = $1
1098              and status = 'pending'
1099            "#,
1100        )
1101        .bind(run_id)
1102        .execute(&self.pool)
1103        .await?;
1104
1105        Ok(())
1106    }
1107
1108    /// Updates the stored budget estimate for a step.
1109    pub async fn update_step_estimate(
1110        &self,
1111        run_id: Uuid,
1112        step_id: Uuid,
1113        estimated_tokens: i64,
1114        estimated_cost: f64,
1115    ) -> Result<()> {
1116        let result = sqlx::query(
1117            r#"
1118            update steps
1119            set estimated_tokens = $3,
1120                estimated_cost = $4
1121            where run_id = $1
1122              and step_id = $2
1123            "#,
1124        )
1125        .bind(run_id)
1126        .bind(step_id)
1127        .bind(estimated_tokens)
1128        .bind(estimated_cost)
1129        .execute(&self.pool)
1130        .await?;
1131
1132        if result.rows_affected() == 0 {
1133            bail!(
1134                "failed to record budget estimate for step {} in run {}",
1135                step_id,
1136                run_id
1137            );
1138        }
1139
1140        Ok(())
1141    }
1142
    /// Updates the status, snapshot metadata, and output fields for a step.
    ///
    /// Semantics differ per column: `output_json` and `error_json` are written
    /// unconditionally (so passing `None` clears them), while `attempt`, the
    /// snapshots, `checkpoint`, and the provider fields are only overwritten
    /// when a new value is supplied (`coalesce` keeps the stored value).
    /// Moving to a terminal status (`succeeded`/`failed`/`skipped`) also
    /// releases the lease by nulling `leased_by` and `lease_expires_at`.
    /// Errors if no row matches `(run_id, step_id)`.
    pub async fn update_step_status_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        step_id: Uuid,
        status: models::StepStatus,
        attempt: Option<i32>,
        output_json: Option<Value>,
        error_json: Option<Value>,
        input_snapshot: Option<Value>,
        output_snapshot: Option<Value>,
        checkpoint: Option<Value>,
        provider: Option<&str>,
        provider_version: Option<&str>,
    ) -> Result<()> {
        // Wrap JSON payloads so sqlx encodes them as jsonb parameters.
        let output_json = output_json.map(Json);
        let error_json = error_json.map(Json);
        let input_snapshot = input_snapshot.map(Json);
        let output_snapshot = output_snapshot.map(Json);
        let checkpoint = checkpoint.map(Json);

        let result = sqlx::query(
            r#"
            update steps
            set status = $1,
                attempt = coalesce($2, attempt),
                output_json = $3,
                error_json = $4,
                input_snapshot = coalesce($5, input_snapshot),
                output_snapshot = coalesce($6, output_snapshot),
                checkpoint = coalesce($7, checkpoint),
                provider = coalesce($8, provider),
                provider_version = coalesce($9, provider_version),
                leased_by = case
                    when $1 in ('succeeded','failed','skipped') then null
                    else leased_by
                end,
                lease_expires_at = case
                    when $1 in ('succeeded','failed','skipped') then null
                    else lease_expires_at
                end
            where run_id = $10
              and step_id = $11
            "#,
        )
        .bind(status.as_str())
        .bind(attempt)
        .bind(output_json)
        .bind(error_json)
        .bind(input_snapshot)
        .bind(output_snapshot)
        .bind(checkpoint)
        .bind(provider)
        .bind(provider_version)
        .bind(run_id)
        .bind(step_id)
        .execute(exec(tx))
        .await?;

        if result.rows_affected() == 0 {
            bail!(
                "failed to update status for step {} in run {}",
                step_id,
                run_id
            );
        }

        Ok(())
    }
1213
1214    pub async fn fetch_step_attempts_for_run(
1215        &self,
1216        run_id: Uuid,
1217    ) -> Result<Vec<models::StepAttempt>> {
1218        let rows = sqlx::query_as::<_, db::StepAttemptRow>(
1219            r#"
1220            select
1221                run_id,
1222                step_id,
1223                attempt,
1224                status,
1225                recorded_at,
1226                latency_ms,
1227                spec_snapshot,
1228                input_snapshot,
1229                output_snapshot,
1230                error_snapshot,
1231                checkpoint,
1232                budget_snapshot,
1233                guardrail_events,
1234                tool_events,
1235                artifacts
1236            from step_attempts
1237            where run_id = $1
1238            order by step_id, attempt
1239            "#,
1240        )
1241        .bind(run_id)
1242        .fetch_all(&self.pool)
1243        .await?;
1244
1245        rows.into_iter()
1246            .map(models::StepAttempt::try_from)
1247            .collect()
1248    }
1249
1250    /// Persists the actual resource usage recorded for a step.
1251    pub async fn update_step_usage_tx(
1252        &self,
1253        tx: &mut Transaction<'_, Postgres>,
1254        run_id: Uuid,
1255        step_id: Uuid,
1256        actual_tokens: Option<i64>,
1257        actual_cost: Option<f64>,
1258    ) -> Result<()> {
1259        let result = sqlx::query(
1260            r#"
1261            update steps
1262            set actual_tokens = $3,
1263                actual_cost = $4
1264            where run_id = $1
1265              and step_id = $2
1266            "#,
1267        )
1268        .bind(run_id)
1269        .bind(step_id)
1270        .bind(actual_tokens)
1271        .bind(actual_cost)
1272        .execute(exec(tx))
1273        .await?;
1274
1275        if result.rows_affected() == 0 {
1276            bail!(
1277                "failed to update usage for step {} in run {}",
1278                step_id,
1279                run_id
1280            );
1281        }
1282
1283        Ok(())
1284    }
1285
1286    /// Adjusts budget totals based on the difference between reserved and actual usage.
1287    pub async fn reconcile_budget_usage_tx(
1288        &self,
1289        tx: &mut Transaction<'_, Postgres>,
1290        run_id: Uuid,
1291        tokens_delta: i64,
1292        cost_delta: f64,
1293    ) -> Result<()> {
1294        if tokens_delta == 0 && cost_delta.abs() < f64::EPSILON {
1295            return Ok(());
1296        }
1297
1298        let result = sqlx::query(
1299            r#"
1300            update budgets
1301            set tokens_used = tokens_used + $2,
1302                cost_used = cost_used + $3,
1303                updated_at = now()
1304            where run_id = $1
1305            "#,
1306        )
1307        .bind(run_id)
1308        .bind(tokens_delta)
1309        .bind(cost_delta)
1310        .execute(exec(tx))
1311        .await?;
1312
1313        if result.rows_affected() == 0 {
1314            warn!(run = %run_id, "reconcile_budget_usage_tx called but no budget row updated");
1315        }
1316
1317        Ok(())
1318    }
1319
    /// Inserts a run row with deterministic state materialization.
    ///
    /// Convenience wrapper around `insert_run_with_steps` for runs with no
    /// pre-planned steps or DAG edges; returns the effective run ID, which may
    /// be an earlier run's ID when an idempotency key matches.
    pub async fn insert_run(&self, run: models::NewRun) -> Result<Uuid> {
        self.insert_run_with_steps(run, &[], &[]).await
    }
1324
    /// Inserts a run and its associated steps within a single transaction.
    ///
    /// Idempotency: when `idempotency_key` is set, a conflicting key means the
    /// run already exists — the transaction is rolled back and the existing
    /// run's ID is returned without touching steps or edges. Without a key,
    /// a conflict on `run_id` is treated the same way. Steps are upserted on
    /// `(run_id, step_id)`; edges are inserted with `on conflict do nothing`.
    ///
    /// Returns the effective run ID (the new run's, or the pre-existing one).
    pub async fn insert_run_with_steps(
        &self,
        run: models::NewRun,
        steps: &[models::NewStep],
        edges: &[(Uuid, Uuid)],
    ) -> Result<Uuid> {
        let mut tx = self.pool.begin().await?;

        let models::NewRun {
            run_id,
            status,
            dag_json,
            input_ctx,
            seed,
            idempotency_key,
        } = run;

        // (actual_run_id, inserted_new): which run ID is authoritative, and
        // whether this call actually created it.
        let (actual_run_id, inserted_new) = if let Some(ref key) = idempotency_key {
            // `returning run_id` yields no row when the key already exists,
            // signalling a duplicate submission.
            let inserted = sqlx::query_scalar::<_, Uuid>(
                r#"
                insert into runs (run_id, status, dag_json, input_ctx, seed, idempotency_key)
                values ($1, $2, $3, $4, $5, $6)
                on conflict (idempotency_key) do nothing
                returning run_id
                "#,
            )
            .bind(run_id)
            .bind(status.as_str())
            .bind(Json(dag_json.clone()))
            .bind(Json(input_ctx.clone()))
            .bind(seed)
            .bind(key)
            .fetch_optional(exec(&mut tx))
            .await?;
            if let Some(id) = inserted {
                (id, true)
            } else {
                // Duplicate key: resolve the originally-inserted run's ID.
                let existing = sqlx::query_scalar::<_, Uuid>(
                    "select run_id from runs where idempotency_key = $1",
                )
                .bind(key)
                .fetch_one(exec(&mut tx))
                .await?;
                (existing, false)
            }
        } else {
            let inserted = sqlx::query_scalar::<_, Uuid>(
                r#"
                insert into runs (run_id, status, dag_json, input_ctx, seed, idempotency_key)
                values ($1, $2, $3, $4, $5, $6)
                on conflict (run_id) do nothing
                returning run_id
                "#,
            )
            .bind(run_id)
            .bind(status.as_str())
            .bind(Json(dag_json.clone()))
            .bind(Json(input_ctx.clone()))
            .bind(seed)
            .bind(idempotency_key)
            .fetch_optional(exec(&mut tx))
            .await?;
            (inserted.unwrap_or(run_id), inserted.is_some())
        };

        if !inserted_new {
            // Existing run: do not (re)write its steps/edges.
            tx.rollback().await?;
            return Ok(actual_run_id);
        }

        // Upsert each step. NOTE(review): steps bind `step.run_id` while edges
        // bind `actual_run_id`; at this point the two are equal only because a
        // duplicate run rolled back above — confirm callers always populate
        // `step.run_id` with the run's ID.
        for step in steps {
            sqlx::query(
                r#"
                insert into steps (
                    run_id,
                    step_id,
                    idx,
                    priority,
                    spec_json,
                    status,
                    attempt,
                    input_snapshot,
                    provider,
                    provider_version,
                    max_attempts,
                    retry_backoff_ms,
                    retry_backoff_factor,
                    not_before,
                    deadline_at,
                    pending_dependencies,
                    total_dependencies,
                    estimated_tokens,
                    estimated_cost,
                    actual_tokens,
                    actual_cost,
                    checkpoint,
                    compensation_step,
                    compensation_scheduled
                )
                values (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                    $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                    $21, $22, $23, $24
                )
                on conflict (run_id, step_id) do update
                set idx = excluded.idx,
                    priority = excluded.priority,
                    spec_json = excluded.spec_json,
                    status = excluded.status,
                    attempt = excluded.attempt,
                    input_snapshot = excluded.input_snapshot,
                    provider = excluded.provider,
                    provider_version = excluded.provider_version,
                    max_attempts = excluded.max_attempts,
                    retry_backoff_ms = excluded.retry_backoff_ms,
                    retry_backoff_factor = excluded.retry_backoff_factor,
                    not_before = excluded.not_before,
                    deadline_at = excluded.deadline_at,
                    pending_dependencies = excluded.pending_dependencies,
                    total_dependencies = excluded.total_dependencies,
                    estimated_tokens = excluded.estimated_tokens,
                    estimated_cost = excluded.estimated_cost,
                    actual_tokens = excluded.actual_tokens,
                    actual_cost = excluded.actual_cost,
                    checkpoint = excluded.checkpoint,
                    compensation_step = excluded.compensation_step,
                    compensation_scheduled = excluded.compensation_scheduled
                "#,
            )
            .bind(step.run_id)
            .bind(step.step_id)
            .bind(step.idx)
            .bind(step.priority)
            .bind(Json(step.spec_json.clone()))
            .bind(step.status.as_str())
            .bind(step.attempt)
            .bind(step.input_snapshot.clone().map(Json))
            .bind(step.provider.clone())
            .bind(step.provider_version.clone())
            .bind(step.max_attempts)
            .bind(step.retry_backoff_ms)
            .bind(step.retry_backoff_factor)
            .bind(step.not_before)
            .bind(step.deadline_at)
            .bind(step.pending_dependencies)
            .bind(step.total_dependencies)
            .bind(step.estimated_tokens)
            .bind(step.estimated_cost)
            .bind(step.actual_tokens)
            .bind(step.actual_cost)
            .bind(step.checkpoint.clone().map(Json))
            .bind(step.compensation_step)
            .bind(step.compensation_scheduled)
            .execute(exec(&mut tx))
            .await?;
        }

        // Bulk-insert DAG edges in a single multi-values statement.
        if !edges.is_empty() {
            let mut builder = QueryBuilder::<Postgres>::new(
                "insert into step_edges (run_id, from_step_id, to_step_id) ",
            );
            builder.push_values(edges, |mut b, (from, to)| {
                b.push_bind(actual_run_id).push_bind(from).push_bind(to);
            });
            builder.push(" on conflict do nothing");
            builder.build().execute(exec(&mut tx)).await?;
        }

        tx.commit().await?;

        Ok(actual_run_id)
    }
1498
1499    /// Decrements dependency counters for successors and queues newly-ready steps.
1500    pub async fn unlock_successors_tx(
1501        &self,
1502        tx: &mut Transaction<'_, Postgres>,
1503        run_id: Uuid,
1504        step_id: Uuid,
1505    ) -> Result<Vec<Uuid>> {
1506        let rows = sqlx::query(
1507            r#"
1508            with affected as (
1509                select to_step_id
1510                from step_edges
1511                where run_id = $1
1512                  and from_step_id = $2
1513            ),
1514            decremented as (
1515                update steps s
1516                set pending_dependencies = greatest(s.pending_dependencies - 1, 0)
1517                from affected
1518                where s.run_id = $1
1519                  and s.step_id = affected.to_step_id
1520                returning s.step_id, s.pending_dependencies
1521            ),
1522            queued as (
1523                update steps s
1524                set status = 'queued'
1525                from decremented
1526                where s.run_id = $1
1527                  and s.step_id = decremented.step_id
1528                  and decremented.pending_dependencies = 0
1529                  and s.status = 'blocked'
1530                returning s.step_id
1531            )
1532            select step_id from queued
1533            "#,
1534        )
1535        .bind(run_id)
1536        .bind(step_id)
1537        .fetch_all(exec(tx))
1538        .await?;
1539
1540        let mut ready = Vec::with_capacity(rows.len());
1541        for row in rows {
1542            ready.push(row.get::<Uuid, _>("step_id"));
1543        }
1544        Ok(ready)
1545    }
1546
1547    /// Marks all downstream blocked steps as skipped when a dependency fails.
1548    pub async fn skip_downstream_steps_tx(
1549        &self,
1550        tx: &mut Transaction<'_, Postgres>,
1551        run_id: Uuid,
1552        failed_step_id: Uuid,
1553    ) -> Result<Vec<Uuid>> {
1554        let rows = sqlx::query(
1555            r#"
1556            with recursive downstream(step_id) as (
1557                select to_step_id
1558                from step_edges
1559                where run_id = $1
1560                  and from_step_id = $2
1561                union
1562                select se.to_step_id
1563                from step_edges se
1564                join downstream d on se.from_step_id = d.step_id
1565                where se.run_id = $1
1566            ),
1567            updated as (
1568                update steps s
1569                set status = 'skipped',
1570                    pending_dependencies = 0
1571                from downstream
1572                where s.run_id = $1
1573                  and s.step_id = downstream.step_id
1574                  and s.status = 'blocked'
1575                returning s.step_id
1576            )
1577            select step_id from updated
1578            "#,
1579        )
1580        .bind(run_id)
1581        .bind(failed_step_id)
1582        .fetch_all(exec(tx))
1583        .await?;
1584
1585        let mut skipped = Vec::with_capacity(rows.len());
1586        for row in rows {
1587            skipped.push(row.get::<Uuid, _>("step_id"));
1588        }
1589        Ok(skipped)
1590    }
1591
1592    /// Recomputes the run status based on current step outcomes.
1593    pub async fn refresh_run_status_tx(
1594        &self,
1595        tx: &mut Transaction<'_, Postgres>,
1596        run_id: Uuid,
1597    ) -> Result<(models::RunStatus, models::RunStatus)> {
1598        let prev_row = sqlx::query(
1599            r#"
1600            select status
1601            from runs
1602            where run_id = $1
1603            "#,
1604        )
1605        .bind(run_id)
1606        .fetch_one(exec(tx))
1607        .await?;
1608
1609        let row = sqlx::query(
1610            r#"
1611            update runs
1612            set status = case
1613                when exists (
1614                    select 1
1615                    from steps
1616                    where run_id = $1
1617                      and status = 'failed'
1618                ) then 'failed'
1619                when not exists (
1620                    select 1
1621                    from steps
1622                    where run_id = $1
1623                      and status in ('queued','blocked','leased','running')
1624                ) then 'succeeded'
1625                else status
1626            end
1627            where run_id = $1
1628            returning status
1629            "#,
1630        )
1631        .bind(run_id)
1632        .fetch_one(exec(tx))
1633        .await?;
1634
1635        let previous: String = prev_row.get("status");
1636        let current: String = row.get("status");
1637
1638        Ok((
1639            models::RunStatus::try_from_str(&previous)?,
1640            models::RunStatus::try_from_str(&current)?,
1641        ))
1642    }
1643
1644    /// Inserts a step row tied to a run's DAG.
1645    pub async fn insert_step(&self, step: models::NewStep) -> Result<()> {
1646        let models::NewStep {
1647            run_id,
1648            step_id,
1649            idx,
1650            priority,
1651            spec_json,
1652            status,
1653            attempt,
1654            input_snapshot,
1655            provider,
1656            provider_version,
1657            max_attempts,
1658            retry_backoff_ms,
1659            retry_backoff_factor,
1660            not_before,
1661            deadline_at,
1662            pending_dependencies,
1663            total_dependencies,
1664            estimated_tokens,
1665            estimated_cost,
1666            actual_tokens,
1667            actual_cost,
1668            checkpoint,
1669            compensation_step,
1670            compensation_scheduled,
1671        } = step;
1672
1673        sqlx::query(
1674            r#"
1675            insert into steps (
1676                run_id,
1677                step_id,
1678                idx,
1679                priority,
1680                spec_json,
1681                status,
1682                attempt,
1683                input_snapshot,
1684                provider,
1685                provider_version,
1686                max_attempts,
1687                retry_backoff_ms,
1688                retry_backoff_factor,
1689                not_before,
1690                deadline_at,
1691                pending_dependencies,
1692                total_dependencies,
1693                estimated_tokens,
1694                estimated_cost,
1695                actual_tokens,
1696                actual_cost,
1697                checkpoint,
1698                compensation_step,
1699                compensation_scheduled
1700            )
1701            values (
1702                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
1703                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
1704                $21, $22, $23, $24
1705            )
1706            on conflict (run_id, step_id) do update
1707            set idx = excluded.idx,
1708                priority = excluded.priority,
1709                spec_json = excluded.spec_json,
1710                status = excluded.status,
1711                attempt = excluded.attempt,
1712                input_snapshot = excluded.input_snapshot,
1713                provider = excluded.provider,
1714                provider_version = excluded.provider_version,
1715                max_attempts = excluded.max_attempts,
1716                retry_backoff_ms = excluded.retry_backoff_ms,
1717                retry_backoff_factor = excluded.retry_backoff_factor,
1718                not_before = excluded.not_before,
1719                deadline_at = excluded.deadline_at,
1720                pending_dependencies = excluded.pending_dependencies,
1721                total_dependencies = excluded.total_dependencies,
1722                estimated_tokens = excluded.estimated_tokens,
1723                estimated_cost = excluded.estimated_cost,
1724                actual_tokens = excluded.actual_tokens,
1725                actual_cost = excluded.actual_cost,
1726                checkpoint = excluded.checkpoint,
1727                compensation_step = excluded.compensation_step,
1728                compensation_scheduled = excluded.compensation_scheduled
1729            "#,
1730        )
1731        .bind(run_id)
1732        .bind(step_id)
1733        .bind(idx)
1734        .bind(priority)
1735        .bind(Json(spec_json))
1736        .bind(status.as_str())
1737        .bind(attempt)
1738        .bind(input_snapshot.map(Json))
1739        .bind(provider)
1740        .bind(provider_version)
1741        .bind(max_attempts)
1742        .bind(retry_backoff_ms)
1743        .bind(retry_backoff_factor)
1744        .bind(not_before)
1745        .bind(deadline_at)
1746        .bind(pending_dependencies)
1747        .bind(total_dependencies)
1748        .bind(estimated_tokens)
1749        .bind(estimated_cost)
1750        .bind(actual_tokens)
1751        .bind(actual_cost)
1752        .bind(checkpoint.map(Json))
1753        .bind(compensation_step)
1754        .bind(compensation_scheduled)
1755        .execute(&self.pool)
1756        .await?;
1757
1758        Ok(())
1759    }
1760
1761    /// Fetches a single run record by ID.
1762    pub async fn fetch_run(&self, run_id: Uuid) -> Result<Option<models::Run>> {
1763        let row = sqlx::query_as::<_, db::RunRow>(
1764            r#"
1765            select *
1766            from runs
1767            where run_id = $1
1768            "#,
1769        )
1770        .bind(run_id)
1771        .fetch_optional(&self.pool)
1772        .await?;
1773
1774        match row {
1775            Some(row) => Ok(Some(row.try_into()?)),
1776            None => Ok(None),
1777        }
1778    }
1779
1780    pub async fn update_run_input_ctx(&self, run_id: Uuid, input_ctx: Value) -> Result<()> {
1781        let result = sqlx::query(
1782            r#"
1783            update runs
1784            set input_ctx = $2
1785            where run_id = $1
1786            "#,
1787        )
1788        .bind(run_id)
1789        .bind(Json(input_ctx))
1790        .execute(&self.pool)
1791        .await?;
1792
1793        if result.rows_affected() == 0 {
1794            bail!("run {} not found", run_id);
1795        }
1796
1797        Ok(())
1798    }
1799
1800    /// Updates the transparency receipt embedded in a run-level artifact entry.
1801    pub async fn update_run_artifact_transparency(
1802        &self,
1803        run_id: Uuid,
1804        artifact_key: &str,
1805        transparency: Value,
1806    ) -> Result<()> {
1807        let Some(run) = self.fetch_run(run_id).await? else {
1808            return Ok(());
1809        };
1810        let mut ctx_map = match run.input_ctx {
1811            Value::Object(map) => map,
1812            other => {
1813                let mut map = Map::new();
1814                map.insert("inputs".to_string(), other);
1815                map
1816            }
1817        };
1818        let artifacts_value = ctx_map
1819            .entry("artifacts".to_string())
1820            .or_insert_with(|| Value::Object(Map::new()));
1821        let Value::Object(artifacts_map) = artifacts_value else {
1822            return Ok(());
1823        };
1824        if let Some(entry) = artifacts_map.get_mut(artifact_key) {
1825            if let Value::Object(ref mut artifact_obj) = entry {
1826                artifact_obj.insert("transparency".to_string(), transparency);
1827            } else {
1828                let mut object = Map::new();
1829                object.insert("transparency".to_string(), transparency);
1830                artifacts_map.insert(artifact_key.to_string(), Value::Object(object));
1831            }
1832            self.update_run_input_ctx(run_id, Value::Object(ctx_map))
1833                .await
1834        } else {
1835            Ok(())
1836        }
1837    }
1838
    /// Updates the transparency receipt for a step-level artifact stored in `steps.output_json`.
    ///
    /// Best-effort by design: if the step row is missing, `output_json` is null
    /// or not a JSON object, or `artifact_key` is absent from the artifacts map,
    /// the method returns `Ok(())` without writing anything.
    ///
    /// # Errors
    /// Fails on query errors or when `output_json` cannot be decoded as JSON.
    pub async fn update_step_artifact_transparency(
        &self,
        run_id: Uuid,
        step_id: Uuid,
        artifact_key: &str,
        transparency: Value,
    ) -> Result<()> {
        // Read the current output document; read and write-back are separate
        // statements, so a concurrent writer could interleave between them.
        let row = sqlx::query(
            r#"
            select output_json
            from steps
            where run_id = $1 and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .fetch_optional(&self.pool)
        .await?;

        // No such step: silently done.
        let Some(record) = row else {
            return Ok(());
        };

        // NULL output_json: nothing to annotate.
        let Some(Json(mut output_value)) = record
            .try_get::<Option<Json<Value>>, _>("output_json")
            .context("failed to deserialize step output_json")?
        else {
            return Ok(());
        };

        // Only object-shaped outputs can carry an "artifacts" map.
        let Some(map) = output_value.as_object_mut() else {
            return Ok(());
        };
        // Materialize the "artifacts" entry if absent so we can inspect it;
        // if it exists but is not an object, leave the document untouched.
        let artifacts_entry = map
            .entry("artifacts".to_string())
            .or_insert_with(|| Value::Object(Map::new()));
        let Value::Object(artifacts_map) = artifacts_entry else {
            return Ok(());
        };
        if let Some(entry) = artifacts_map.get_mut(artifact_key) {
            match entry {
                Value::Object(obj) => {
                    obj.insert("transparency".to_string(), transparency);
                }
                // Non-object artifact entry: replace it with an object that
                // only carries the transparency receipt.
                _ => {
                    let mut object = Map::new();
                    object.insert("transparency".to_string(), transparency);
                    *entry = Value::Object(object);
                }
            }
        } else {
            // Unknown artifact key: do not create one, and skip the update.
            return Ok(());
        }

        // Persist the whole mutated document back into the row.
        sqlx::query(
            r#"
            update steps
            set output_json = $3
            where run_id = $1 and step_id = $2
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(Json(output_value))
        .execute(&self.pool)
        .await?;

        Ok(())
    }
1909
1910    pub async fn update_run_input_ctx_tx(
1911        &self,
1912        tx: &mut Transaction<'_, Postgres>,
1913        run_id: Uuid,
1914        input_ctx: Value,
1915    ) -> Result<()> {
1916        let result = sqlx::query(
1917            r#"
1918            update runs
1919            set input_ctx = $2
1920            where run_id = $1
1921            "#,
1922        )
1923        .bind(run_id)
1924        .bind(Json(input_ctx))
1925        .execute(exec(tx))
1926        .await?;
1927
1928        if result.rows_affected() == 0 {
1929            bail!("run {} not found", run_id);
1930        }
1931
1932        Ok(())
1933    }
1934
1935    pub async fn set_run_status(&self, run_id: Uuid, status: models::RunStatus) -> Result<()> {
1936        let result = sqlx::query(
1937            r#"
1938            update runs
1939            set status = $2
1940            where run_id = $1
1941            "#,
1942        )
1943        .bind(run_id)
1944        .bind(status.as_str())
1945        .execute(&self.pool)
1946        .await?;
1947
1948        if result.rows_affected() == 0 {
1949            bail!("run {} not found", run_id);
1950        }
1951
1952        Ok(())
1953    }
1954
1955    pub async fn set_run_status_tx(
1956        &self,
1957        tx: &mut Transaction<'_, Postgres>,
1958        run_id: Uuid,
1959        status: models::RunStatus,
1960    ) -> Result<()> {
1961        let result = sqlx::query(
1962            r#"
1963            update runs
1964            set status = $2
1965            where run_id = $1
1966            "#,
1967        )
1968        .bind(run_id)
1969        .bind(status.as_str())
1970        .execute(exec(tx))
1971        .await?;
1972
1973        if result.rows_affected() == 0 {
1974            bail!("run {} not found", run_id);
1975        }
1976
1977        Ok(())
1978    }
1979
1980    /// Returns the tenant label for the given run, if one is set.
1981    pub async fn fetch_run_tenant(&self, run_id: Uuid) -> Result<Option<String>> {
1982        let tenant: Option<String> = sqlx::query_scalar(
1983            r#"
1984            select input_ctx -> 'labels' ->> 'tenant'
1985            from runs
1986            where run_id = $1
1987            "#,
1988        )
1989        .bind(run_id)
1990        .fetch_optional(&self.pool)
1991        .await?;
1992
1993        Ok(tenant.and_then(|value| if value.is_empty() { None } else { Some(value) }))
1994    }
1995
    /// Ensures the provided tenant does not exceed the configured daily quota.
    ///
    /// Tenants without a `tenant_quotas` row are unlimited. Usage is summed
    /// from `billing_usage` since the start of the current day (database
    /// server time), and the prospective `tokens`/`cost` are added before
    /// comparing against each configured limit. Negative inputs are clamped
    /// to zero so they cannot reduce apparent usage.
    ///
    /// NOTE(review): the quota read, usage read, and the eventual usage insert
    /// are separate statements outside a shared transaction, so concurrent
    /// submissions can slightly overshoot a limit — confirm this is acceptable.
    ///
    /// # Errors
    /// Fails with a descriptive message when either daily limit would be
    /// exceeded, or on any query error.
    pub async fn enforce_tenant_quota(&self, tenant: &str, tokens: i64, cost: f64) -> Result<()> {
        // Both limits are optional; a NULL column means "no limit of that kind".
        let quota: Option<(Option<i64>, Option<f64>)> = sqlx::query_as(
            r#"
            select daily_token_limit, daily_cost_limit
            from tenant_quotas
            where tenant = $1
            "#,
        )
        .bind(tenant)
        .fetch_optional(&self.pool)
        .await?;

        // No quota row configured: nothing to enforce.
        let Some((token_limit, cost_limit)) = quota else {
            return Ok(());
        };

        // Today's accumulated usage (coalesced to zero when no rows exist).
        let usage: (i64, f64) = sqlx::query_as(
            r#"
            select coalesce(sum(tokens), 0), coalesce(sum(cost), 0)
            from billing_usage
            where tenant = $1
              and recorded_at >= date_trunc('day', now())
            "#,
        )
        .bind(tenant)
        .fetch_one(&self.pool)
        .await?;

        // saturating_add guards against i64 overflow on pathological inputs.
        let prospective_tokens = usage.0.saturating_add(tokens.max(0));
        if let Some(limit) = token_limit {
            if prospective_tokens > limit {
                bail!("tenant '{}' exceeded daily token quota", tenant);
            }
        }

        let prospective_cost = usage.1 + cost.max(0.0);
        if let Some(limit) = cost_limit {
            if prospective_cost > limit {
                bail!("tenant '{}' exceeded daily cost quota", tenant);
            }
        }

        Ok(())
    }
2041
    /// Records billing usage for a run inside an existing transaction.
    ///
    /// The tenant column is copied from the run's `input_ctx -> 'labels' ->>
    /// 'tenant'` at insert time via an insert-select.
    ///
    /// NOTE(review): if the run row does not exist the insert-select matches
    /// zero rows and this silently records nothing; likewise a run without a
    /// tenant label inserts a NULL tenant. Confirm callers rely on that.
    pub async fn record_billing_usage_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        tokens: i64,
        cost: f64,
    ) -> Result<()> {
        sqlx::query(
            r#"
            insert into billing_usage (run_id, tenant, tokens, cost)
            select $1, input_ctx -> 'labels' ->> 'tenant', $2, $3
            from runs
            where run_id = $1
            "#,
        )
        .bind(run_id)
        .bind(tokens)
        .bind(cost)
        .execute(exec(tx))
        .await?;

        Ok(())
    }
2066
2067    /// Lists recent runs ordered by creation time.
2068    pub async fn list_recent_runs(&self, limit: i64) -> Result<Vec<models::Run>> {
2069        let limit = limit.max(1);
2070        let rows = sqlx::query_as::<_, db::RunRow>(
2071            r#"
2072            select *
2073            from runs
2074            order by created_at desc
2075            limit $1
2076            "#,
2077        )
2078        .bind(limit)
2079        .fetch_all(&self.pool)
2080        .await?;
2081
2082        rows.into_iter().map(models::Run::try_from).collect()
2083    }
2084
2085    /// Fetches a run by idempotency key if present.
2086    pub async fn fetch_run_by_idempotency_key(
2087        &self,
2088        idempotency_key: &str,
2089    ) -> Result<Option<models::Run>> {
2090        let row = sqlx::query_as::<_, db::RunRow>(
2091            r#"
2092            select *
2093            from runs
2094            where idempotency_key = $1
2095            "#,
2096        )
2097        .bind(idempotency_key)
2098        .fetch_optional(&self.pool)
2099        .await?;
2100
2101        match row {
2102            Some(row) => Ok(Some(row.try_into()?)),
2103            None => Ok(None),
2104        }
2105    }
2106
2107    /// Persists artifact metadata while blob bytes live in the object store.
2108    pub async fn upsert_artifact(&self, artifact: models::NewArtifact) -> Result<()> {
2109        let models::NewArtifact {
2110            sha256,
2111            media_type,
2112            size_bytes,
2113            metadata,
2114        } = artifact;
2115
2116        let metadata = metadata.map(Json);
2117
2118        sqlx::query(
2119            r#"
2120            insert into artifacts (sha256, media_type, size_bytes, metadata)
2121            values ($1, $2, $3, $4)
2122            on conflict (sha256) do update
2123            set media_type = excluded.media_type,
2124                size_bytes = excluded.size_bytes,
2125                metadata = excluded.metadata
2126            "#,
2127        )
2128        .bind(sha256)
2129        .bind(media_type)
2130        .bind(size_bytes)
2131        .bind(metadata)
2132        .execute(&self.pool)
2133        .await?;
2134
2135        Ok(())
2136    }
2137
    /// Appends an outbox event within an existing transaction for exactly-once delivery.
    ///
    /// Three steps run in order inside the caller's transaction:
    /// 1. [`Self::attach_outbox_audit`] embeds an `_audit` hash-chain object
    ///    into the payload (requires the payload to be a JSON object).
    /// 2. `append_run_event_tx` (defined elsewhere in this impl) mirrors the
    ///    event into the run's event stream.
    /// 3. The event is inserted into `outbox` and its generated id returned.
    ///
    /// # Errors
    /// Fails if the payload is not a JSON object or on any query error.
    pub async fn enqueue_outbox_event(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        event: models::NewOutboxEvent,
    ) -> Result<i64> {
        let models::NewOutboxEvent {
            run_id,
            step_id,
            kind,
            // Mutable: attach_outbox_audit inserts the `_audit` object in place.
            mut payload,
        } = event;

        self.attach_outbox_audit(tx, run_id, step_id, &kind, &mut payload)
            .await?;

        self.append_run_event_tx(tx, run_id, step_id, &kind, &payload)
            .await?;

        let id = sqlx::query_scalar(
            r#"
            insert into outbox (run_id, step_id, kind, payload)
            values ($1, $2, $3, $4)
            returning id
            "#,
        )
        .bind(run_id)
        .bind(step_id)
        .bind(&kind)
        .bind(Json(payload))
        .fetch_one(exec(tx))
        .await?;

        Ok(id)
    }
2173
2174    fn advisory_lock_key(run_id: Uuid) -> (i64, i64) {
2175        let value = run_id.as_u128();
2176        let hi = (value >> 64) as i64;
2177        let lo = value as i64;
2178        (hi, lo)
2179    }
2180
    /// Embeds a tamper-evident `_audit` object into an outbox payload.
    ///
    /// Each run's outbox events form a hash chain: the new chain hash covers
    /// the previous event's chain hash, the run/step ids, the event kind, a
    /// digest of the payload (before `_audit` is added), and the emit time.
    /// A per-run transaction-scoped advisory lock serializes appends so two
    /// concurrent writers cannot both extend the same chain tip.
    ///
    /// # Errors
    /// Fails when `payload` is not a JSON object, or on serialization/query
    /// errors.
    async fn attach_outbox_audit(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        run_id: Uuid,
        step_id: Option<Uuid>,
        kind: &str,
        payload: &mut Value,
    ) -> Result<()> {
        // The audit object can only be inserted into an object payload.
        if payload.as_object().is_none() {
            bail!(
                "outbox payload for run {} kind '{}' must be a JSON object",
                run_id,
                kind
            );
        }

        // Held until the surrounding transaction commits or rolls back.
        let (lock_a, lock_b) = Self::advisory_lock_key(run_id);
        sqlx::query("select pg_advisory_xact_lock($1, $2)")
            .bind(lock_a)
            .bind(lock_b)
            .fetch_optional(exec(tx))
            .await?;

        // Current chain tip: the newest outbox event for this run, if any.
        let previous_hash: Option<String> = sqlx::query_scalar(
            r#"
            select payload->'_audit'->>'chain_hash'
            from outbox
            where run_id = $1
            order by id desc
            limit 1
            for update
            "#,
        )
        .bind(run_id)
        .fetch_optional(exec(tx))
        .await?;

        let emitted_at = Utc::now();
        // Hash the payload as serialized *before* `_audit` is attached.
        let canonical = serde_json::to_vec(payload)
            .context("failed to serialise outbox payload for audit hashing")?;
        let payload_hash = Sha256::digest(&canonical);
        let payload_hash_bytes: &[u8] = payload_hash.as_ref();

        // Chain hash = H(prev_hash? || run_id || step_id? || kind || payload_hash || t).
        let mut hasher = Sha256::new();
        if let Some(prev) = &previous_hash {
            hasher.update(prev.as_bytes());
        }
        hasher.update(run_id.as_bytes());
        if let Some(step) = step_id {
            hasher.update(step.as_bytes());
        }
        hasher.update(kind.as_bytes());
        hasher.update(payload_hash_bytes);
        // NOTE(review): timestamps outside the nanosecond-representable range
        // (past ~2262) degrade to 0 here, weakening the time component.
        let timestamp = emitted_at.timestamp_nanos_opt().unwrap_or(0);
        hasher.update(timestamp.to_be_bytes());
        let chain_hash = hex::encode(hasher.finalize());

        // Guaranteed Some by the object check above.
        if let Some(map) = payload.as_object_mut() {
            map.insert(
                "_audit".to_string(),
                json!({
                    "version": 1,
                    "chain_hash": chain_hash,
                    "payload_hash": hex::encode(payload_hash_bytes),
                    "previous_hash": previous_hash,
                    "emitted_at": emitted_at,
                }),
            );
        }

        Ok(())
    }
2253
    /// Claims queued steps using `FOR UPDATE SKIP LOCKED` for safe parallel scheduling.
    ///
    /// Eligible steps are `queued`, have no pending dependencies, are not part
    /// of a paused run, and have no future `not_before`. They are leased to
    /// `worker_id` for 2 minutes, ordered by priority, then earliest
    /// `not_before`/`created_at`, then DAG index. In the same transaction,
    /// any affected `pending` runs are flipped to `running`.
    ///
    /// # Errors
    /// Fails on query/commit errors or invalid step rows.
    pub async fn claim_queued_steps(
        &self,
        limit: i64,
        worker_id: &str,
    ) -> Result<Vec<models::Step>> {
        let mut tx = self.pool.begin().await?;
        // Select-and-lock in a CTE, then lease the matched rows in one
        // statement; SKIP LOCKED lets concurrent workers claim disjoint sets.
        let rows: Vec<db::StepRow> = sqlx::query_as(
            r#"
            with cte as (
                select s.run_id, s.step_id
                from steps s
                inner join runs r on r.run_id = s.run_id
                where s.status = 'queued'
                  and s.pending_dependencies = 0
                  and r.status <> 'paused_at_step'
                  and (s.not_before is null or s.not_before <= now())
                order by s.priority desc, coalesce(s.not_before, s.created_at) asc, s.idx
                for update skip locked
                limit $1
            )
            update steps s set
                status = 'leased',
                leased_by = $2,
                lease_expires_at = now() + interval '2 minutes'
            from cte
            where s.run_id = cte.run_id and s.step_id = cte.step_id
            returning s.*
            "#,
        )
        .bind(limit)
        .bind(worker_id)
        .fetch_all(exec(&mut tx))
        .await?;

        if !rows.is_empty() {
            // De-duplicate run ids before promoting pending runs to running.
            let run_ids_set: HashSet<Uuid> = rows.iter().map(|row| row.run_id).collect();
            let run_ids: Vec<Uuid> = run_ids_set.into_iter().collect();

            sqlx::query(
                r#"
                update runs
                set status = 'running'
                where status = 'pending'
                  and run_id = any($1)
                "#,
            )
            .bind(&run_ids)
            .execute(exec(&mut tx))
            .await?;
        }
        tx.commit().await?;

        // Row conversion happens after commit; a conversion failure returns
        // an error even though the leases were already persisted.
        rows.into_iter()
            .map(models::Step::try_from)
            .collect::<Result<Vec<_>>>()
    }
2311
2312    pub async fn revert_step_to_queue(&self, run_id: Uuid, step_id: Uuid) -> Result<()> {
2313        let result = sqlx::query(
2314            r#"
2315            update steps
2316            set status = 'queued',
2317                leased_by = null,
2318                lease_expires_at = null
2319            where run_id = $1
2320              and step_id = $2
2321            "#,
2322        )
2323        .bind(run_id)
2324        .bind(step_id)
2325        .execute(&self.pool)
2326        .await?;
2327
2328        if result.rows_affected() == 0 {
2329            bail!(
2330                "failed to revert step {} to queue for run {}",
2331                step_id,
2332                run_id
2333            );
2334        }
2335        Ok(())
2336    }
2337
2338    pub async fn revert_step_to_queue_tx(
2339        &self,
2340        tx: &mut Transaction<'_, Postgres>,
2341        run_id: Uuid,
2342        step_id: Uuid,
2343    ) -> Result<()> {
2344        let result = sqlx::query(
2345            r#"
2346            update steps
2347            set status = 'queued',
2348                leased_by = null,
2349                lease_expires_at = null
2350            where run_id = $1
2351              and step_id = $2
2352            "#,
2353        )
2354        .bind(run_id)
2355        .bind(step_id)
2356        .execute(exec(tx))
2357        .await?;
2358
2359        if result.rows_affected() == 0 {
2360            bail!(
2361                "failed to revert step {} to queue for run {}",
2362                step_id,
2363                run_id
2364            );
2365        }
2366        Ok(())
2367    }
2368
2369    pub async fn update_step_spec(
2370        &self,
2371        run_id: Uuid,
2372        step_id: Uuid,
2373        spec_json: Value,
2374    ) -> Result<()> {
2375        let result = sqlx::query(
2376            r#"
2377            update steps
2378            set spec_json = $3
2379            where run_id = $1
2380              and step_id = $2
2381            "#,
2382        )
2383        .bind(run_id)
2384        .bind(step_id)
2385        .bind(Json(spec_json))
2386        .execute(&self.pool)
2387        .await?;
2388
2389        if result.rows_affected() == 0 {
2390            bail!(
2391                "failed to update spec for step {} in run {}",
2392                step_id,
2393                run_id
2394            );
2395        }
2396        Ok(())
2397    }
2398
2399    pub async fn update_step_spec_tx(
2400        &self,
2401        tx: &mut Transaction<'_, Postgres>,
2402        run_id: Uuid,
2403        step_id: Uuid,
2404        spec_json: Value,
2405    ) -> Result<()> {
2406        let result = sqlx::query(
2407            r#"
2408            update steps
2409            set spec_json = $3
2410            where run_id = $1
2411              and step_id = $2
2412            "#,
2413        )
2414        .bind(run_id)
2415        .bind(step_id)
2416        .bind(Json(spec_json))
2417        .execute(exec(tx))
2418        .await?;
2419
2420        if result.rows_affected() == 0 {
2421            bail!(
2422                "failed to update spec for step {} in run {}",
2423                step_id,
2424                run_id
2425            );
2426        }
2427        Ok(())
2428    }
2429}
2430
/// Private row structs decoded straight from Postgres via `sqlx::FromRow`.
///
/// These mirror table column layouts; public `models` types are produced
/// from them via `TryFrom` conversions (see `models::Run::try_from` usage).
mod db {
    use super::*;
    use serde_json::Value;

    /// Row shape of the `runs` table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct RunRow {
        pub run_id: Uuid,
        pub created_at: DateTime<Utc>,
        /// Raw status string; validated during conversion to a model type.
        pub status: String,
        pub dag_json: Value,
        pub input_ctx: Value,
        pub seed: i64,
        pub idempotency_key: Option<String>,
    }

    /// Row shape of the `steps` table, including lease and retry bookkeeping.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct StepRow {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub idx: i32,
        pub priority: i16,
        pub spec_json: Value,
        pub status: String,
        pub attempt: i32,
        pub created_at: DateTime<Utc>,
        pub max_attempts: i32,
        pub retry_backoff_ms: i64,
        pub retry_backoff_factor: f64,
        pub not_before: Option<DateTime<Utc>>,
        pub deadline_at: Option<DateTime<Utc>>,
        pub pending_dependencies: i32,
        pub total_dependencies: i32,
        pub estimated_tokens: i64,
        pub estimated_cost: f64,
        pub actual_tokens: Option<i64>,
        pub actual_cost: Option<f64>,
        /// Worker currently holding the lease, if any.
        pub leased_by: Option<String>,
        pub lease_expires_at: Option<DateTime<Utc>>,
        pub output_json: Option<Value>,
        pub error_json: Option<Value>,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub provider: Option<String>,
        pub provider_version: Option<String>,
        pub checkpoint: Option<Value>,
        pub compensation_step: Option<Uuid>,
        pub compensation_scheduled: bool,
    }

    /// Row shape for an immutable per-attempt snapshot of a step.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct StepAttemptRow {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub attempt: i32,
        pub status: String,
        pub recorded_at: DateTime<Utc>,
        pub latency_ms: Option<f64>,
        pub spec_snapshot: Value,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub error_snapshot: Option<Value>,
        pub checkpoint: Option<Value>,
        pub budget_snapshot: Option<Value>,
        pub guardrail_events: Option<Value>,
        pub tool_events: Option<Value>,
        pub artifacts: Option<Value>,
    }

    /// Row shape for a token/cost budget scoped to a run or team.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct BudgetRow {
        pub budget_id: Uuid,
        pub scope: String,
        pub run_id: Option<Uuid>,
        pub team_id: Option<Uuid>,
        /// NULL limit means unbounded.
        pub tokens_limit: Option<i64>,
        pub tokens_used: i64,
        pub cost_limit: Option<f64>,
        pub cost_used: f64,
        pub created_at: DateTime<Utc>,
        pub updated_at: DateTime<Utc>,
    }

    /// Row shape of the `artifacts` metadata table (blob bytes live elsewhere).
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct ArtifactRow {
        /// Content digest; the table's conflict key (see `upsert_artifact`).
        pub sha256: Vec<u8>,
        pub media_type: String,
        pub size_bytes: i64,
        pub created_at: DateTime<Utc>,
        pub metadata: Option<Value>,
    }

    /// Row shape of the `outbox` event table.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct OutboxRow {
        pub id: i64,
        pub run_id: Uuid,
        pub step_id: Option<Uuid>,
        pub kind: String,
        /// Event body; carries the embedded `_audit` hash-chain object.
        pub payload: Value,
        pub created_at: DateTime<Utc>,
        /// NULL until the event has been delivered.
        pub published_at: Option<DateTime<Utc>>,
    }

    /// Row shape for a stored evaluation scenario definition.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct EvalScenarioRow {
        pub scenario_id: Uuid,
        pub name: String,
        pub description: Option<String>,
        pub spec: Value,
        pub created_at: DateTime<Utc>,
    }

    /// Row shape for the outcome of executing an eval scenario against a run.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct EvalResultRow {
        pub result_id: Uuid,
        pub scenario_id: Uuid,
        pub run_id: Uuid,
        pub executed_at: DateTime<Utc>,
        pub outcome: String,
        pub metrics: Value,
    }

    /// Row shape for a recorded change-gate decision.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct ChangeGateRow {
        pub gate_id: Uuid,
        pub change_id: String,
        pub revision: Option<String>,
        pub effect: String,
        pub reasons: Value,
        pub followups: Value,
        pub scorecard: Value,
        pub decided_at: DateTime<Utc>,
        pub decided_by: Option<String>,
        pub metadata: Value,
        pub input: Value,
        pub telemetry: Value,
        pub artifact_sha256: Option<Vec<u8>>,
        pub created_at: DateTime<Utc>,
    }

    /// Row shape for a follow-up action recorded against a change gate.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct ChangeGateFollowupRow {
        pub followup_id: Uuid,
        pub gate_id: Uuid,
        pub actor: String,
        pub outcome: String,
        pub note: Option<String>,
        pub details: Value,
        pub recorded_at: DateTime<Utc>,
    }

    /// Row shape for a queued transparency-log job.
    #[derive(Debug, Clone, sqlx::FromRow)]
    pub struct TransparencyJobRow {
        pub job_id: i64,
        pub kind: String,
        pub payload: Value,
        /// Receipt produced on success, if any.
        pub receipt: Option<Value>,
        pub error: Option<String>,
        pub attempts: i32,
        pub created_at: DateTime<Utc>,
        /// NULL until the job has been processed.
        pub processed_at: Option<DateTime<Utc>>,
    }
}
2593
2594pub mod models {
2595    use super::*;
2596    use serde::{Deserialize, Serialize};
2597    use serde_json::Value;
2598
    /// Valid lifecycle states for a run.
    ///
    /// Serialized (and stored) in snake_case; see [`RunStatus::as_str`].
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    #[serde(rename_all = "snake_case")]
    pub enum RunStatus {
        /// Created; promoted to `Running` once a step is claimed.
        Pending,
        /// At least one step has been leased.
        Running,
        Succeeded,
        Failed,
        Canceled,
        /// Paused; runs in this state are skipped by step claiming.
        PausedAtStep,
    }
2610
2611    impl RunStatus {
2612        pub fn as_str(self) -> &'static str {
2613            match self {
2614                RunStatus::Pending => "pending",
2615                RunStatus::Running => "running",
2616                RunStatus::Succeeded => "succeeded",
2617                RunStatus::Failed => "failed",
2618                RunStatus::Canceled => "canceled",
2619                RunStatus::PausedAtStep => "paused_at_step",
2620            }
2621        }
2622
2623        pub fn try_from_str(value: &str) -> Result<Self> {
2624            match value {
2625                "pending" => Ok(RunStatus::Pending),
2626                "running" => Ok(RunStatus::Running),
2627                "succeeded" => Ok(RunStatus::Succeeded),
2628                "failed" => Ok(RunStatus::Failed),
2629                "canceled" => Ok(RunStatus::Canceled),
2630                "paused_at_step" => Ok(RunStatus::PausedAtStep),
2631                other => Err(anyhow!("invalid run_status '{}'", other)),
2632            }
2633        }
2634    }
2635
    /// Valid lifecycle states for a step.
    ///
    /// Serialized (and stored) in snake_case; see [`StepStatus::as_str`].
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    #[serde(rename_all = "snake_case")]
    pub enum StepStatus {
        /// Eligible for claiming once its pending dependencies reach zero.
        Queued,
        /// Waiting on dependencies or external conditions.
        Blocked,
        /// Claimed by a worker and holding a lease.
        Leased,
        Running,
        Succeeded,
        Failed,
        Skipped,
    }
2648
2649    impl StepStatus {
2650        pub fn as_str(self) -> &'static str {
2651            match self {
2652                StepStatus::Queued => "queued",
2653                StepStatus::Blocked => "blocked",
2654                StepStatus::Leased => "leased",
2655                StepStatus::Running => "running",
2656                StepStatus::Succeeded => "succeeded",
2657                StepStatus::Failed => "failed",
2658                StepStatus::Skipped => "skipped",
2659            }
2660        }
2661
2662        pub fn try_from_str(value: &str) -> Result<Self> {
2663            match value {
2664                "queued" => Ok(StepStatus::Queued),
2665                "blocked" => Ok(StepStatus::Blocked),
2666                "leased" => Ok(StepStatus::Leased),
2667                "running" => Ok(StepStatus::Running),
2668                "succeeded" => Ok(StepStatus::Succeeded),
2669                "failed" => Ok(StepStatus::Failed),
2670                "skipped" => Ok(StepStatus::Skipped),
2671                other => Err(anyhow!("invalid step_status '{}'", other)),
2672            }
2673        }
2674    }
2675
    /// Run contract materialized in the database.
    ///
    /// Produced from a `db::RunRow` with the status string parsed into
    /// [`RunStatus`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Run {
        pub run_id: Uuid,
        pub created_at: DateTime<Utc>,
        pub status: RunStatus,
        /// The run's DAG definition as a JSON document.
        pub dag_json: Value,
        /// Free-form input context; carries `labels.tenant` among other keys.
        pub input_ctx: Value,
        /// Seed used for deterministic behavior.
        pub seed: i64,
        /// Caller-supplied key used for deduplicating submissions.
        pub idempotency_key: Option<String>,
    }
2687
    /// Insertable representation for a run.
    ///
    /// Same shape as [`Run`] minus the database-assigned `created_at`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewRun {
        pub run_id: Uuid,
        pub status: RunStatus,
        pub dag_json: Value,
        pub input_ctx: Value,
        pub seed: i64,
        pub idempotency_key: Option<String>,
    }
2698
    /// Step contract spanning deterministic replay and attempt metadata.
    ///
    /// Produced from a `db::StepRow` with the status string parsed into
    /// [`StepStatus`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Step {
        pub run_id: Uuid,
        pub step_id: Uuid,
        /// Position of the step within the run's DAG ordering.
        pub idx: i32,
        /// Scheduling priority; higher values are claimed first.
        pub priority: i16,
        pub spec_json: Value,
        pub status: StepStatus,
        /// Current attempt counter.
        pub attempt: i32,
        pub created_at: DateTime<Utc>,
        // Retry policy: max attempts with an exponential backoff.
        pub max_attempts: i32,
        pub retry_backoff_ms: i64,
        pub retry_backoff_factor: f64,
        /// Earliest time the step may be claimed, if deferred.
        pub not_before: Option<DateTime<Utc>>,
        pub deadline_at: Option<DateTime<Utc>>,
        // Dependency bookkeeping: claimable when pending reaches zero.
        pub pending_dependencies: i32,
        pub total_dependencies: i32,
        // Budget estimates vs. recorded actuals.
        pub estimated_tokens: i64,
        pub estimated_cost: f64,
        pub actual_tokens: Option<i64>,
        pub actual_cost: Option<f64>,
        // Lease state while a worker holds the step.
        pub leased_by: Option<String>,
        pub lease_expires_at: Option<DateTime<Utc>>,
        pub output_json: Option<Value>,
        pub error_json: Option<Value>,
        // Snapshots captured for deterministic replay.
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub provider: Option<String>,
        pub provider_version: Option<String>,
        pub checkpoint: Option<Value>,
        // Compensation (rollback) linkage, if any.
        pub compensation_step: Option<Uuid>,
        pub compensation_scheduled: bool,
    }
2733
    /// Insertable representation for a step.
    ///
    /// Same shape as [`Step`] minus database-managed fields (`created_at`,
    /// lease state, output/error documents).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewStep {
        pub run_id: Uuid,
        pub step_id: Uuid,
        pub idx: i32,
        pub priority: i16,
        pub spec_json: Value,
        pub status: StepStatus,
        pub attempt: i32,
        pub input_snapshot: Option<Value>,
        pub provider: Option<String>,
        pub provider_version: Option<String>,
        // Retry policy.
        pub max_attempts: i32,
        pub retry_backoff_ms: i64,
        pub retry_backoff_factor: f64,
        pub not_before: Option<DateTime<Utc>>,
        pub deadline_at: Option<DateTime<Utc>>,
        // Dependency bookkeeping.
        pub pending_dependencies: i32,
        pub total_dependencies: i32,
        // Budget estimates vs. actuals.
        pub estimated_tokens: i64,
        pub estimated_cost: f64,
        pub actual_tokens: Option<i64>,
        pub actual_cost: Option<f64>,
        pub checkpoint: Option<Value>,
        // Compensation (rollback) linkage, if any.
        pub compensation_step: Option<Uuid>,
        pub compensation_scheduled: bool,
    }
2762
    /// Immutable snapshot of a step attempt used for deterministic replay.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct StepAttempt {
        pub run_id: Uuid,
        pub step_id: Uuid,
        /// Which attempt of the step this snapshot captures.
        pub attempt: i32,
        pub status: StepStatus,
        pub recorded_at: DateTime<Utc>,
        pub latency_ms: Option<f64>,
        /// The step spec exactly as it was for this attempt.
        pub spec_snapshot: Value,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub error_snapshot: Option<Value>,
        pub checkpoint: Option<Value>,
        pub budget_snapshot: Option<Value>,
        pub guardrail_events: Option<Value>,
        pub tool_events: Option<Value>,
        pub artifacts: Option<Value>,
    }
2782
    /// Discriminant for the kinds of transparency-log jobs.
    ///
    /// Serialized in snake_case to match [`TransparencyJobKind::as_str`].
    #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "snake_case")]
    pub enum TransparencyJobKind {
        ReleaseBundle,
        RunRollup,
        ArtifactEntry,
    }
2790
2791    impl TransparencyJobKind {
2792        pub fn as_str(self) -> &'static str {
2793            match self {
2794                TransparencyJobKind::ReleaseBundle => "release_bundle",
2795                TransparencyJobKind::RunRollup => "run_rollup",
2796                TransparencyJobKind::ArtifactEntry => "artifact_entry",
2797            }
2798        }
2799    }
2800
2801    impl FromStr for TransparencyJobKind {
2802        type Err = anyhow::Error;
2803
2804        fn from_str(s: &str) -> Result<Self> {
2805            match s {
2806                "release_bundle" => Ok(TransparencyJobKind::ReleaseBundle),
2807                "run_rollup" => Ok(TransparencyJobKind::RunRollup),
2808                "artifact_entry" => Ok(TransparencyJobKind::ArtifactEntry),
2809                other => Err(anyhow!("invalid transparency job kind '{}'", other)),
2810            }
2811        }
2812    }
2813
    /// Payload for a release-bundle transparency job.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ReleaseBundlePayload {
        pub change_id: String,
        pub gate_id: Uuid,
        /// Hex-encoded digest of the bundled artifact.
        pub artifact_sha256: String,
        pub attestation_ids: Vec<Uuid>,
        pub metadata: Value,
    }
2822
    /// Payload for a run-rollup transparency job.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct RunRollupPayload {
        pub run_id: Uuid,
        /// Hex-encoded digest of the rollup artifact.
        pub artifact_sha256: String,
        pub attestation_ids: Vec<Uuid>,
        pub artifact_key: String,
        pub metadata: Value,
    }
2831
    /// Payload for an artifact-entry transparency job.
    ///
    /// `run_id`/`step_id` are optional: an artifact entry may be detached
    /// from any particular run or step.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ArtifactEntryPayload {
        pub run_id: Option<Uuid>,
        pub step_id: Option<Uuid>,
        pub artifact_key: String,
        /// Hex-encoded digest of the artifact.
        pub artifact_sha256: String,
        pub attestation_ids: Vec<Uuid>,
        pub metadata: Value,
    }
2841
    /// Tagged union of the transparency job payloads.
    ///
    /// Serialized with an internal `kind` tag matching
    /// [`TransparencyJobKind`]'s snake_case names.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    #[serde(tag = "kind", rename_all = "snake_case")]
    pub enum TransparencyJobPayload {
        ReleaseBundle(ReleaseBundlePayload),
        RunRollup(RunRollupPayload),
        ArtifactEntry(ArtifactEntryPayload),
    }
2849
2850    impl TransparencyJobPayload {
2851        pub fn kind(&self) -> TransparencyJobKind {
2852            match self {
2853                TransparencyJobPayload::ReleaseBundle(_) => TransparencyJobKind::ReleaseBundle,
2854                TransparencyJobPayload::RunRollup(_) => TransparencyJobKind::RunRollup,
2855                TransparencyJobPayload::ArtifactEntry(_) => TransparencyJobKind::ArtifactEntry,
2856            }
2857        }
2858    }
2859
    /// A persisted transparency job, including processing bookkeeping.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct TransparencyJob {
        /// Monotonic primary key from `transparency_jobs`.
        pub job_id: i64,
        /// Kind tag (redundant with `payload`'s variant, stored separately).
        pub kind: TransparencyJobKind,
        /// The job body to publish.
        pub payload: TransparencyJobPayload,
        /// Number of failed attempts so far.
        pub attempts: i32,
        /// Error message from the most recent failure, if any.
        pub last_error: Option<String>,
        /// Receipt returned by the transparency log on success.
        pub receipt: Option<Value>,
        /// Set once the job has been published successfully.
        pub processed_at: Option<DateTime<Utc>>,
        pub created_at: DateTime<Utc>,
    }
2871
    /// Insertable representation for a transparency job (payload only; the
    /// id, attempts and timestamps are assigned by the database).
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewTransparencyJob {
        pub payload: TransparencyJobPayload,
    }
2876
    /// Insertable representation for a step attempt.
    ///
    /// Captures the snapshots recorded when a single execution attempt of a
    /// step finishes: the spec in force, inputs/outputs, errors, checkpoint,
    /// budget state and any guardrail/tool/artifact events.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewStepAttempt {
        pub run_id: Uuid,
        pub step_id: Uuid,
        /// Attempt counter for this step.
        pub attempt: i32,
        /// Status the attempt ended in.
        pub status: StepStatus,
        /// Wall-clock duration of the attempt, when measured.
        pub latency_ms: Option<f64>,
        /// The step spec as it was at execution time.
        pub spec_snapshot: Value,
        pub input_snapshot: Option<Value>,
        pub output_snapshot: Option<Value>,
        pub error_snapshot: Option<Value>,
        pub checkpoint: Option<Value>,
        pub budget_snapshot: Option<Value>,
        pub guardrail_events: Option<Value>,
        pub tool_events: Option<Value>,
        pub artifacts: Option<Value>,
    }
2895
    impl Default for NewStep {
        /// Baseline `NewStep`: nil ids, empty spec, `Queued` status, a single
        /// allowed attempt and no retry backoff. Intended as the base for
        /// struct-update construction (`..NewStep::default()`).
        fn default() -> Self {
            Self {
                run_id: Uuid::nil(),
                step_id: Uuid::nil(),
                idx: 0,
                priority: 0,
                spec_json: Value::Null,
                status: StepStatus::Queued,
                attempt: 0,
                input_snapshot: None,
                provider: None,
                provider_version: None,
                // One attempt, no backoff: retries are opt-in per step.
                max_attempts: 1,
                retry_backoff_ms: 0,
                retry_backoff_factor: 1.0,
                not_before: None,
                deadline_at: None,
                pending_dependencies: 0,
                total_dependencies: 0,
                estimated_tokens: 0,
                estimated_cost: 0.0,
                actual_tokens: None,
                actual_cost: None,
                checkpoint: None,
                compensation_step: None,
                compensation_scheduled: false,
            }
        }
    }
2926
    impl TryFrom<crate::db::StepRow> for Step {
        type Error = anyhow::Error;

        /// Maps a raw `steps` row onto the domain `Step`.
        ///
        /// Fails only when the stored `status` string does not parse as a
        /// known `StepStatus`; every other field is carried over verbatim.
        fn try_from(row: crate::db::StepRow) -> Result<Self, Self::Error> {
            Ok(Self {
                run_id: row.run_id,
                step_id: row.step_id,
                idx: row.idx,
                priority: row.priority,
                spec_json: row.spec_json,
                status: StepStatus::try_from_str(&row.status)?,
                attempt: row.attempt,
                created_at: row.created_at,
                max_attempts: row.max_attempts,
                retry_backoff_ms: row.retry_backoff_ms,
                retry_backoff_factor: row.retry_backoff_factor,
                not_before: row.not_before,
                deadline_at: row.deadline_at,
                pending_dependencies: row.pending_dependencies,
                total_dependencies: row.total_dependencies,
                estimated_tokens: row.estimated_tokens,
                estimated_cost: row.estimated_cost,
                actual_tokens: row.actual_tokens,
                actual_cost: row.actual_cost,
                leased_by: row.leased_by,
                lease_expires_at: row.lease_expires_at,
                output_json: row.output_json,
                error_json: row.error_json,
                input_snapshot: row.input_snapshot,
                output_snapshot: row.output_snapshot,
                provider: row.provider,
                provider_version: row.provider_version,
                checkpoint: row.checkpoint,
                compensation_step: row.compensation_step,
                compensation_scheduled: row.compensation_scheduled,
            })
        }
    }
2965
    impl TryFrom<crate::db::StepAttemptRow> for StepAttempt {
        type Error = anyhow::Error;

        /// Maps a raw step-attempt row onto the domain `StepAttempt`; fails
        /// only when the stored `status` string is not a valid `StepStatus`.
        fn try_from(row: crate::db::StepAttemptRow) -> Result<Self, Self::Error> {
            Ok(Self {
                run_id: row.run_id,
                step_id: row.step_id,
                attempt: row.attempt,
                status: StepStatus::try_from_str(&row.status)?,
                recorded_at: row.recorded_at,
                latency_ms: row.latency_ms,
                spec_snapshot: row.spec_snapshot,
                input_snapshot: row.input_snapshot,
                output_snapshot: row.output_snapshot,
                error_snapshot: row.error_snapshot,
                checkpoint: row.checkpoint,
                budget_snapshot: row.budget_snapshot,
                guardrail_events: row.guardrail_events,
                tool_events: row.tool_events,
                artifacts: row.artifacts,
            })
        }
    }
2989
    /// Content-addressed artifact metadata pointing at object storage bytes.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Artifact {
        /// Raw SHA-256 digest bytes (the content address).
        pub sha256: Vec<u8>,
        /// MIME type of the stored bytes.
        pub media_type: String,
        /// Size of the stored bytes.
        pub size_bytes: i64,
        pub created_at: DateTime<Utc>,
        /// Optional free-form metadata attached at upload time.
        pub metadata: Option<Value>,
    }
2999
    /// Insertable representation for artifact metadata.
    ///
    /// Same shape as [`Artifact`] minus the DB-assigned `created_at`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewArtifact {
        /// Raw SHA-256 digest bytes (the content address).
        pub sha256: Vec<u8>,
        /// MIME type of the stored bytes.
        pub media_type: String,
        /// Size of the stored bytes.
        pub size_bytes: i64,
        /// Optional free-form metadata attached at upload time.
        pub metadata: Option<Value>,
    }
3008
3009    impl From<super::ArtifactMeta> for NewArtifact {
3010        fn from(meta: super::ArtifactMeta) -> Self {
3011            Self {
3012                sha256: meta.sha256.to_vec(),
3013                media_type: meta.media_type,
3014                size_bytes: std::cmp::min(meta.size, i64::MAX as u64) as i64,
3015                metadata: Some(meta.metadata),
3016            }
3017        }
3018    }
3019
    /// Budget configuration and cumulative consumption counters.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Budget {
        pub budget_id: Uuid,
        /// Scope discriminator (string-typed in the schema).
        pub scope: String,
        /// Run this budget applies to, when run-scoped.
        pub run_id: Option<Uuid>,
        /// Team this budget applies to, when team-scoped.
        pub team_id: Option<Uuid>,
        /// Token ceiling; `None` means unlimited.
        pub tokens_limit: Option<i64>,
        /// Tokens consumed so far.
        pub tokens_used: i64,
        /// Cost ceiling; `None` means unlimited.
        pub cost_limit: Option<f64>,
        /// Cost consumed so far.
        pub cost_used: f64,
        pub created_at: DateTime<Utc>,
        pub updated_at: DateTime<Utc>,
    }
3034
    impl TryFrom<crate::db::BudgetRow> for Budget {
        type Error = anyhow::Error;

        /// Straight field-for-field mapping from the DB row; currently
        /// infallible but kept as `TryFrom` for parity with sibling rows.
        fn try_from(row: crate::db::BudgetRow) -> Result<Self, Self::Error> {
            Ok(Self {
                budget_id: row.budget_id,
                scope: row.scope,
                run_id: row.run_id,
                team_id: row.team_id,
                tokens_limit: row.tokens_limit,
                tokens_used: row.tokens_used,
                cost_limit: row.cost_limit,
                cost_used: row.cost_used,
                created_at: row.created_at,
                updated_at: row.updated_at,
            })
        }
    }
3053
3054    impl Budget {
3055        pub fn remaining_tokens(&self) -> Option<i64> {
3056            self.tokens_limit
3057                .map(|limit| limit.saturating_sub(self.tokens_used))
3058        }
3059
3060        pub fn remaining_cost(&self) -> Option<f64> {
3061            self.cost_limit
3062                .map(|limit| (limit - self.cost_used).max(0.0))
3063        }
3064    }
3065
    /// Outbox event persisted transactionally with state mutations.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct OutboxEvent {
        /// Monotonic primary key.
        pub id: i64,
        pub run_id: Uuid,
        /// Originating step, when the event is step-scoped.
        pub step_id: Option<Uuid>,
        /// Event type discriminator.
        pub kind: String,
        pub payload: Value,
        pub created_at: DateTime<Utc>,
        /// Set once the event has been delivered by the publisher.
        pub published_at: Option<DateTime<Utc>>,
    }
3077
    /// Registered evaluation scenario definition.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct EvalScenario {
        pub scenario_id: Uuid,
        /// Unique scenario name (upsert key in `eval_scenarios`).
        pub name: String,
        pub description: Option<String>,
        /// Scenario specification as free-form JSON.
        pub spec: Value,
        pub created_at: DateTime<Utc>,
    }
3087
    /// Result for an evaluation scenario run.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct EvalResult {
        pub result_id: Uuid,
        /// Scenario this result belongs to.
        pub scenario_id: Uuid,
        /// Run that produced the result.
        pub run_id: Uuid,
        pub executed_at: DateTime<Utc>,
        /// Outcome label (string-typed in the schema).
        pub outcome: String,
        /// Metric values recorded for the run, as JSON.
        pub metrics: Value,
    }
3098
    /// Builder for inserting a new outbox event alongside DB state changes.
    ///
    /// Same shape as [`OutboxEvent`] minus the DB-assigned id and timestamps.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewOutboxEvent {
        pub run_id: Uuid,
        pub step_id: Option<Uuid>,
        /// Event type discriminator.
        pub kind: String,
        pub payload: Value,
    }
3107
    impl TryFrom<crate::db::RunRow> for Run {
        type Error = anyhow::Error;

        /// Maps a raw `runs` row onto the domain `Run`; fails only when the
        /// stored `status` string is not a valid `RunStatus`.
        fn try_from(row: crate::db::RunRow) -> Result<Self, Self::Error> {
            Ok(Self {
                run_id: row.run_id,
                created_at: row.created_at,
                status: RunStatus::try_from_str(&row.status)?,
                dag_json: row.dag_json,
                input_ctx: row.input_ctx,
                seed: row.seed,
                idempotency_key: row.idempotency_key,
            })
        }
    }
3123
    impl TryFrom<crate::db::EvalScenarioRow> for EvalScenario {
        type Error = anyhow::Error;

        /// Straight field-for-field mapping; currently infallible but kept as
        /// `TryFrom` for parity with sibling row conversions.
        fn try_from(row: crate::db::EvalScenarioRow) -> Result<Self, Self::Error> {
            Ok(Self {
                scenario_id: row.scenario_id,
                name: row.name,
                description: row.description,
                spec: row.spec,
                created_at: row.created_at,
            })
        }
    }
3137
    impl TryFrom<crate::db::EvalResultRow> for EvalResult {
        type Error = anyhow::Error;

        /// Straight field-for-field mapping; currently infallible but kept as
        /// `TryFrom` for parity with sibling row conversions.
        fn try_from(row: crate::db::EvalResultRow) -> Result<Self, Self::Error> {
            Ok(Self {
                result_id: row.result_id,
                scenario_id: row.scenario_id,
                run_id: row.run_id,
                executed_at: row.executed_at,
                outcome: row.outcome,
                metrics: row.metrics,
            })
        }
    }
3152
    /// A recorded ChangeOps gate decision.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ChangeGate {
        pub gate_id: Uuid,
        /// Change identifier the decision applies to.
        pub change_id: String,
        /// Specific revision gated, when known.
        pub revision: Option<String>,
        /// Decision effect (string-typed in the schema).
        pub effect: String,
        pub reasons: Value,
        pub followups: Value,
        pub scorecard: Value,
        /// When the decision was made.
        pub decided_at: DateTime<Utc>,
        /// Who (or what) made the decision, when recorded.
        pub decided_by: Option<String>,
        pub metadata: Value,
        pub input: Value,
        pub telemetry: Value,
        /// Raw SHA-256 of the associated artifact; `None` when absent or when
        /// the stored digest is not exactly 32 bytes.
        pub artifact_sha256: Option<[u8; 32]>,
        pub created_at: DateTime<Utc>,
    }
3170
    /// Insertable representation for a gate decision.
    ///
    /// Same shape as [`ChangeGate`] minus the DB-assigned `created_at`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewChangeGate {
        pub gate_id: Uuid,
        pub change_id: String,
        pub revision: Option<String>,
        pub effect: String,
        pub reasons: Value,
        pub followups: Value,
        pub scorecard: Value,
        pub decided_at: DateTime<Utc>,
        pub decided_by: Option<String>,
        pub metadata: Value,
        pub input: Value,
        pub telemetry: Value,
        /// Raw SHA-256 of the associated artifact, when one exists.
        pub artifact_sha256: Option<[u8; 32]>,
    }
3187
    /// A follow-up acknowledgement recorded against a gate decision.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct ChangeGateFollowup {
        pub followup_id: Uuid,
        /// Gate decision the follow-up refers to.
        pub gate_id: Uuid,
        /// Who recorded the follow-up.
        pub actor: String,
        /// Outcome label (string-typed in the schema).
        pub outcome: String,
        pub note: Option<String>,
        pub details: Value,
        pub recorded_at: DateTime<Utc>,
    }
3198
    /// Insertable representation for a gate follow-up.
    ///
    /// Same shape as [`ChangeGateFollowup`] minus the DB-assigned
    /// `recorded_at`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct NewChangeGateFollowup {
        pub followup_id: Uuid,
        pub gate_id: Uuid,
        pub actor: String,
        pub outcome: String,
        pub note: Option<String>,
        pub details: Value,
    }
3208
3209    impl From<crate::db::ChangeGateRow> for ChangeGate {
3210        fn from(row: crate::db::ChangeGateRow) -> Self {
3211            let artifact_sha256 = row.artifact_sha256.and_then(|vec| {
3212                if vec.len() == 32 {
3213                    let mut value = [0u8; 32];
3214                    value.copy_from_slice(&vec);
3215                    Some(value)
3216                } else {
3217                    None
3218                }
3219            });
3220
3221            Self {
3222                gate_id: row.gate_id,
3223                change_id: row.change_id,
3224                revision: row.revision,
3225                effect: row.effect,
3226                reasons: row.reasons,
3227                followups: row.followups,
3228                scorecard: row.scorecard,
3229                decided_at: row.decided_at,
3230                decided_by: row.decided_by,
3231                metadata: row.metadata,
3232                input: row.input,
3233                telemetry: row.telemetry,
3234                artifact_sha256,
3235                created_at: row.created_at,
3236            }
3237        }
3238    }
3239
    impl From<crate::db::ChangeGateFollowupRow> for ChangeGateFollowup {
        /// Straight field-for-field mapping from the DB row.
        fn from(row: crate::db::ChangeGateFollowupRow) -> Self {
            Self {
                followup_id: row.followup_id,
                gate_id: row.gate_id,
                actor: row.actor,
                outcome: row.outcome,
                note: row.note,
                details: row.details,
                recorded_at: row.recorded_at,
            }
        }
    }
3253
    impl TryFrom<crate::db::TransparencyJobRow> for TransparencyJob {
        type Error = anyhow::Error;

        /// Maps a raw `transparency_jobs` row onto the domain type.
        ///
        /// Fails when the stored `kind` string is unknown or when the JSON
        /// payload does not deserialize as a `TransparencyJobPayload`.
        fn try_from(row: crate::db::TransparencyJobRow) -> Result<Self> {
            let kind = TransparencyJobKind::from_str(&row.kind)?;
            let payload: TransparencyJobPayload = serde_json::from_value(row.payload)?;
            Ok(TransparencyJob {
                job_id: row.job_id,
                kind,
                payload,
                attempts: row.attempts,
                // The column is named `error`; the domain field is `last_error`.
                last_error: row.error,
                receipt: row.receipt,
                processed_at: row.processed_at,
                created_at: row.created_at,
            })
        }
    }
3272
    impl TryFrom<crate::db::ArtifactRow> for Artifact {
        type Error = anyhow::Error;

        /// Straight field-for-field mapping; currently infallible but kept as
        /// `TryFrom` for parity with sibling row conversions.
        fn try_from(row: crate::db::ArtifactRow) -> Result<Self, Self::Error> {
            Ok(Self {
                sha256: row.sha256,
                media_type: row.media_type,
                size_bytes: row.size_bytes,
                created_at: row.created_at,
                metadata: row.metadata,
            })
        }
    }
3286
    impl TryFrom<crate::db::OutboxRow> for OutboxEvent {
        type Error = anyhow::Error;

        /// Straight field-for-field mapping; currently infallible but kept as
        /// `TryFrom` for parity with sibling row conversions.
        fn try_from(row: crate::db::OutboxRow) -> Result<Self, Self::Error> {
            Ok(Self {
                id: row.id,
                run_id: row.run_id,
                step_id: row.step_id,
                kind: row.kind,
                payload: row.payload,
                created_at: row.created_at,
                published_at: row.published_at,
            })
        }
    }
3302}
3303
impl Storage {
    /// Registers or updates an evaluation scenario definition.
    ///
    /// Upserts by the unique `name` column; the returned `scenario_id` is
    /// stable across re-registrations of the same name.
    pub async fn upsert_eval_scenario(
        &self,
        name: &str,
        description: Option<&str>,
        spec: Value,
    ) -> Result<Uuid> {
        let scenario_id = sqlx::query_scalar::<_, Uuid>(
            r#"
            insert into eval_scenarios (name, description, spec)
            values ($1, $2, $3)
            on conflict (name)
            do update set description = excluded.description,
                          spec = excluded.spec
            returning scenario_id
            "#,
        )
        .bind(name)
        .bind(description)
        .bind(Json(spec))
        .fetch_one(&self.pool)
        .await?;

        Ok(scenario_id)
    }

    /// Fetches an evaluation scenario by its unique name.
    ///
    /// Returns `Ok(None)` when no scenario with that name exists.
    pub async fn fetch_eval_scenario_by_name(
        &self,
        name: &str,
    ) -> Result<Option<models::EvalScenario>> {
        let row = sqlx::query_as::<_, db::EvalScenarioRow>(
            r#"
            select *
            from eval_scenarios
            where name = $1
            "#,
        )
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;

        // Propagate any row-conversion error while keeping the Option shape.
        row.map(models::EvalScenario::try_from).transpose()
    }

    /// Lists all evaluation scenarios ordered by name.
    pub async fn list_eval_scenarios(&self) -> Result<Vec<models::EvalScenario>> {
        let rows = sqlx::query_as::<_, db::EvalScenarioRow>(
            r#"
            select *
            from eval_scenarios
            order by name asc
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(models::EvalScenario::try_from)
            .collect()
    }

    /// Returns the latest evaluation result for a scenario, if any.
    pub async fn latest_eval_result(
        &self,
        scenario_id: Uuid,
    ) -> Result<Option<models::EvalResult>> {
        let row = sqlx::query_as::<_, db::EvalResultRow>(
            r#"
            select *
            from eval_results
            where scenario_id = $1
            order by executed_at desc
            limit 1
            "#,
        )
        .bind(scenario_id)
        .fetch_optional(&self.pool)
        .await?;

        row.map(models::EvalResult::try_from).transpose()
    }

    /// Lists evaluation results for a scenario, newest first.
    ///
    /// `limit` is clamped to at least one row.
    pub async fn list_eval_results(
        &self,
        scenario_id: Uuid,
        limit: i64,
    ) -> Result<Vec<models::EvalResult>> {
        let rows = sqlx::query_as::<_, db::EvalResultRow>(
            r#"
            select *
            from eval_results
            where scenario_id = $1
            order by executed_at desc
            limit $2
            "#,
        )
        .bind(scenario_id)
        .bind(limit.max(1))
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter().map(models::EvalResult::try_from).collect()
    }

    /// Inserts or replaces an evaluation result for the given scenario/run pair.
    ///
    /// On conflict the outcome/metrics are overwritten and `executed_at` is
    /// reset to the database's `now()`.
    pub async fn insert_eval_result(
        &self,
        scenario_id: Uuid,
        run_id: Uuid,
        outcome: &str,
        metrics: Value,
    ) -> Result<models::EvalResult> {
        let row = sqlx::query_as::<_, db::EvalResultRow>(
            r#"
            insert into eval_results (scenario_id, run_id, outcome, metrics)
            values ($1, $2, $3, $4)
            on conflict (scenario_id, run_id)
            do update set outcome = excluded.outcome,
                          metrics = excluded.metrics,
                          executed_at = now()
            returning *
            "#,
        )
        .bind(scenario_id)
        .bind(run_id)
        .bind(outcome)
        .bind(Json(metrics))
        .fetch_one(&self.pool)
        .await?;

        models::EvalResult::try_from(row)
    }

    /// Records a ChangeOps gate decision and returns the stored record.
    pub async fn insert_change_gate(
        &self,
        gate: models::NewChangeGate,
    ) -> Result<models::ChangeGate> {
        // Destructure up front so a schema/struct drift becomes a compile error.
        let models::NewChangeGate {
            gate_id,
            change_id,
            revision,
            effect,
            reasons,
            followups,
            scorecard,
            decided_at,
            decided_by,
            metadata,
            input,
            telemetry,
            artifact_sha256,
        } = gate;

        let row = sqlx::query_as::<_, db::ChangeGateRow>(
            r#"
            insert into change_gates (
                gate_id,
                change_id,
                revision,
                effect,
                reasons,
                followups,
                scorecard,
                decided_at,
                decided_by,
                metadata,
                input,
                telemetry,
                artifact_sha256
            )
            values (
                $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13
            )
            returning *
            "#,
        )
        .bind(gate_id)
        .bind(change_id)
        .bind(revision)
        .bind(effect)
        .bind(Json(reasons))
        .bind(Json(followups))
        .bind(Json(scorecard))
        .bind(decided_at)
        .bind(decided_by)
        .bind(Json(metadata))
        .bind(Json(input))
        .bind(Json(telemetry))
        // Digest is stored as raw bytes (bytea), not hex.
        .bind(artifact_sha256.map(|digest| digest.to_vec()))
        .fetch_one(&self.pool)
        .await?;

        Ok(models::ChangeGate::from(row))
    }

    /// Replaces the metadata JSON stored for a change gate decision.
    ///
    /// Errors when the gate id does not exist.
    pub async fn update_change_gate_metadata(&self, gate_id: Uuid, metadata: Value) -> Result<()> {
        let result = sqlx::query(
            r#"
            update change_gates
            set metadata = $2
            where gate_id = $1
            "#,
        )
        .bind(gate_id)
        .bind(Json(metadata))
        .execute(&self.pool)
        .await?;

        // Zero affected rows means the gate was never recorded.
        if result.rows_affected() == 0 {
            bail!("change gate {} not found", gate_id);
        }

        Ok(())
    }

    /// Lists gate decisions for a change id, newest first (capped at `limit`
    /// rows; `limit` is clamped to at least one).
    pub async fn list_change_gates_for_change(
        &self,
        change_id: &str,
        limit: i64,
    ) -> Result<Vec<models::ChangeGate>> {
        let rows = sqlx::query_as::<_, db::ChangeGateRow>(
            r#"
            select *
            from change_gates
            where change_id = $1
            order by decided_at desc
            limit $2
            "#,
        )
        .bind(change_id)
        .bind(limit.max(1))
        .fetch_all(&self.pool)
        .await?;

        Ok(rows.into_iter().map(models::ChangeGate::from).collect())
    }

    /// Retrieves a specific gate decision by gate id.
    pub async fn fetch_change_gate(&self, gate_id: Uuid) -> Result<Option<models::ChangeGate>> {
        let row = sqlx::query_as::<_, db::ChangeGateRow>(
            r#"
            select *
            from change_gates
            where gate_id = $1
            "#,
        )
        .bind(gate_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(models::ChangeGate::from))
    }

    /// Enqueues a transparency job for asynchronous publishing.
    ///
    /// The kind column is derived from the payload variant so the two can
    /// never disagree.
    pub async fn enqueue_transparency_job(
        &self,
        payload: models::TransparencyJobPayload,
    ) -> Result<i64> {
        let kind = payload.kind().as_str().to_string();
        let job_id = sqlx::query_scalar::<_, i64>(
            r#"
            insert into transparency_jobs (kind, payload)
            values ($1, $2)
            returning job_id
            "#,
        )
        .bind(kind)
        .bind(serde_json::to_value(&payload)?)
        .fetch_one(&self.pool)
        .await?;

        Ok(job_id)
    }

    /// Returns pending transparency jobs up to the requested limit.
    ///
    /// NOTE(review): unlike the other list helpers, `limit` is not clamped to
    /// at least one here — a non-positive limit yields no rows.
    pub async fn fetch_pending_transparency_jobs(
        &self,
        limit: i64,
    ) -> Result<Vec<models::TransparencyJob>> {
        let rows = sqlx::query_as::<_, db::TransparencyJobRow>(
            r#"
            select *
            from transparency_jobs
            where processed_at is null
            order by job_id asc
            limit $1
            "#,
        )
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(models::TransparencyJob::try_from)
            .collect()
    }

    /// Fetches transparency jobs by explicit identifiers.
    ///
    /// Ids with no matching row are silently skipped; results are ordered by
    /// job id ascending.
    pub async fn fetch_transparency_jobs_by_ids(
        &self,
        job_ids: &[i64],
    ) -> Result<Vec<models::TransparencyJob>> {
        // Avoid a round-trip for an empty id set.
        if job_ids.is_empty() {
            return Ok(Vec::new());
        }
        let rows = sqlx::query_as::<_, db::TransparencyJobRow>(
            r#"
            select *
            from transparency_jobs
            where job_id = any($1)
            order by job_id asc
            "#,
        )
        .bind(job_ids)
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(models::TransparencyJob::try_from)
            .collect()
    }

    /// Records a successful transparency job along with the receipt payload.
    ///
    /// Also clears any stale error left by a previous failed attempt.
    pub async fn mark_transparency_job_processed(&self, job_id: i64, receipt: Value) -> Result<()> {
        let result = sqlx::query(
            r#"
            update transparency_jobs
            set processed_at = now(),
                receipt = $2,
                error = null
            where job_id = $1
            "#,
        )
        .bind(job_id)
        .bind(Json(receipt))
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            bail!("transparency job {} not found", job_id);
        }

        Ok(())
    }

    /// Marks a transparency job failure so the writer can retry later.
    ///
    /// Stores the error message and bumps the attempt counter atomically.
    pub async fn mark_transparency_job_failed(&self, job_id: i64, error: String) -> Result<()> {
        let result = sqlx::query(
            r#"
            update transparency_jobs
            set error = $2,
                attempts = attempts + 1
            where job_id = $1
            "#,
        )
        .bind(job_id)
        .bind(error)
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            bail!("transparency job {} not found", job_id);
        }

        Ok(())
    }

    /// Records a follow-up acknowledgement for a gate.
    pub async fn insert_change_gate_followup(
        &self,
        followup: models::NewChangeGateFollowup,
    ) -> Result<models::ChangeGateFollowup> {
        // Destructure up front so a schema/struct drift becomes a compile error.
        let models::NewChangeGateFollowup {
            followup_id,
            gate_id,
            actor,
            outcome,
            note,
            details,
        } = followup;

        let row = sqlx::query_as::<_, db::ChangeGateFollowupRow>(
            r#"
            insert into change_gate_followups (
                followup_id,
                gate_id,
                actor,
                outcome,
                note,
                details
            )
            values ($1,$2,$3,$4,$5,$6)
            returning *
            "#,
        )
        .bind(followup_id)
        .bind(gate_id)
        .bind(actor)
        .bind(outcome)
        .bind(note)
        .bind(Json(details))
        .fetch_one(&self.pool)
        .await?;

        Ok(models::ChangeGateFollowup::from(row))
    }

    /// Fetches all follow-up records for a gate in chronological order.
    pub async fn fetch_change_gate_followups(
        &self,
        gate_id: Uuid,
    ) -> Result<Vec<models::ChangeGateFollowup>> {
        let rows = sqlx::query_as::<_, db::ChangeGateFollowupRow>(
            r#"
            select *
            from change_gate_followups
            where gate_id = $1
            order by recorded_at asc
            "#,
        )
        .bind(gate_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(models::ChangeGateFollowup::from)
            .collect())
    }
}
3740
3741#[cfg(test)]
3742mod tests {
3743    use super::*;
3744    use crate::InMemoryStorage;
3745    use crate::RunStore;
3746    use bytes::Bytes;
3747    use serde_json::json;
3748    use std::env;
3749    use testcontainers::clients::Cli;
3750    use testcontainers::images::postgres::Postgres;
3751    use testcontainers::Container;
3752    use tokio::sync::Barrier;
3753    use uuid::Uuid;
3754
3755    fn docker_available() -> bool {
3756        std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
3757    }
3758
    /// Boots a throwaway Postgres container and connects a `Storage` to it.
    ///
    /// Returns the container alongside the storage so the database stays
    /// alive for the duration of the test.
    async fn setup_storage() -> Result<(Storage, Container<'static, Postgres>)> {
        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

        let config = StorageConfig {
            database_url,
            max_connections: 2,
            connect_timeout: Duration::from_secs(10),
            object_store: ObjectStoreConfig::InMemory,
        };

        let storage = Storage::connect(config).await?;
        // SAFETY(review): this transmute erases the container's borrow of
        // `docker` to give it a 'static lifetime so it can be returned.
        // `docker` (the Cli) is dropped at the end of this function, so this
        // is only sound if `Container` does not actually dereference the Cli
        // after construction — NOTE: confirm against testcontainers' docs;
        // prefer an upstream API that owns the client if one exists.
        let container: Container<'static, Postgres> = unsafe { std::mem::transmute(container) };
        Ok((storage, container))
    }
3776
3777    struct EnvGuard {
3778        keys: Vec<&'static str>,
3779        previous: Vec<Option<String>>,
3780    }
3781
3782    impl EnvGuard {
3783        fn new(keys: Vec<&'static str>) -> Self {
3784            let previous = keys
3785                .iter()
3786                .map(|key| env::var(key).ok())
3787                .collect::<Vec<_>>();
3788            Self { keys, previous }
3789        }
3790    }
3791
3792    impl Drop for EnvGuard {
3793        fn drop(&mut self) {
3794            for (idx, key) in self.keys.iter().enumerate() {
3795                if let Some(value) = &self.previous[idx] {
3796                    env::set_var(key, value);
3797                } else {
3798                    env::remove_var(key);
3799                }
3800            }
3801        }
3802    }
3803
3804    fn seed_step(run_id: Uuid, step_id: Uuid) -> models::NewStep {
3805        models::NewStep {
3806            run_id,
3807            step_id,
3808            idx: 0,
3809            priority: 0,
3810            spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
3811            status: models::StepStatus::Queued,
3812            attempt: 0,
3813            input_snapshot: None,
3814            provider: None,
3815            provider_version: None,
3816            pending_dependencies: 0,
3817            total_dependencies: 0,
3818            estimated_tokens: 0,
3819            estimated_cost: 0.0,
3820            actual_tokens: None,
3821            actual_cost: None,
3822            ..models::NewStep::default()
3823        }
3824    }
3825
3826    #[test]
3827    fn s3_config_reads_expected_env() -> Result<()> {
3828        let guard = EnvGuard::new(vec![
3829            "FLEETFORGE_S3_BUCKET",
3830            "FLEETFORGE_S3_ACCESS_KEY",
3831            "FLEETFORGE_S3_SECRET_KEY",
3832            "FLEETFORGE_S3_REGION",
3833            "FLEETFORGE_S3_ENDPOINT",
3834        ]);
3835
3836        env::set_var("FLEETFORGE_S3_BUCKET", "demo-bucket");
3837        env::set_var("FLEETFORGE_S3_ACCESS_KEY", "demo-access");
3838        env::set_var("FLEETFORGE_S3_SECRET_KEY", "demo-secret");
3839        env::set_var("FLEETFORGE_S3_REGION", "us-west-2");
3840        env::set_var("FLEETFORGE_S3_ENDPOINT", "https://s3.demo");
3841
3842        let cfg = S3Config::from_env()?;
3843        assert_eq!(cfg.bucket, "demo-bucket");
3844        assert_eq!(cfg.access_key, "demo-access");
3845        assert_eq!(cfg.secret_key, "demo-secret");
3846        assert_eq!(cfg.region.as_deref(), Some("us-west-2"));
3847        assert_eq!(cfg.endpoint.as_deref(), Some("https://s3.demo"));
3848        assert!(cfg.virtual_hosted_style);
3849
3850        drop(guard);
3851        Ok(())
3852    }
3853
3854    #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
3855    #[tokio::test]
3856    async fn migrations_apply_cleanly() -> Result<()> {
3857        if !docker_available() {
3858            eprintln!("Skipping storage::migrations test because Docker is unavailable");
3859            return Ok(());
3860        }
3861
3862        let (storage, container) = setup_storage().await?;
3863
3864        let exists: (i64,) = sqlx::query_as(
3865            "select count(*) from information_schema.tables where table_name = 'runs'",
3866        )
3867        .fetch_one(storage.pool())
3868        .await?;
3869        assert!(
3870            exists.0 >= 0,
3871            "runs table should be accessible after migrations"
3872        );
3873
3874        // DAG gating: downstream steps remain blocked until predecessors succeed.
3875        let run_one = Uuid::new_v4();
3876        let step_root = Uuid::new_v4();
3877        let step_leaf = Uuid::new_v4();
3878
3879        let root_spec = json!({"id": step_root, "type": "tool", "inputs": {}});
3880        let leaf_spec = json!({"id": step_leaf, "type": "tool", "inputs": {}});
3881        let dag_one = json!({
3882            "steps": [root_spec.clone(), leaf_spec.clone()],
3883            "edges": [[step_root, step_leaf]]
3884        });
3885
3886        let _ = storage
3887            .insert_run_with_steps(
3888                models::NewRun {
3889                    run_id: run_one,
3890                    status: models::RunStatus::Pending,
3891                    dag_json: dag_one,
3892                    input_ctx: json!({}),
3893                    seed: 1,
3894                    idempotency_key: None,
3895                },
3896                &[
3897                    models::NewStep {
3898                        run_id: run_one,
3899                        step_id: step_root,
3900                        idx: 0,
3901                        priority: 0,
3902                        spec_json: root_spec,
3903                        status: models::StepStatus::Queued,
3904                        attempt: 0,
3905                        input_snapshot: None,
3906                        provider: None,
3907                        provider_version: None,
3908                        pending_dependencies: 0,
3909                        total_dependencies: 0,
3910                        estimated_tokens: 0,
3911                        estimated_cost: 0.0,
3912                        actual_tokens: None,
3913                        actual_cost: None,
3914                        ..models::NewStep::default()
3915                    },
3916                    models::NewStep {
3917                        run_id: run_one,
3918                        step_id: step_leaf,
3919                        idx: 1,
3920                        priority: 0,
3921                        spec_json: leaf_spec,
3922                        status: models::StepStatus::Blocked,
3923                        attempt: 0,
3924                        input_snapshot: None,
3925                        provider: None,
3926                        provider_version: None,
3927                        pending_dependencies: 1,
3928                        total_dependencies: 1,
3929                        estimated_tokens: 0,
3930                        estimated_cost: 0.0,
3931                        actual_tokens: None,
3932                        actual_cost: None,
3933                        ..models::NewStep::default()
3934                    },
3935                ],
3936                &[(step_root, step_leaf)],
3937            )
3938            .await?;
3939
3940        let leased = storage.claim_queued_steps(1, "worker-1").await?;
3941        assert_eq!(leased.len(), 1);
3942        assert_eq!(leased[0].status, models::StepStatus::Leased);
3943
3944        let (run_status_one,): (String,) =
3945            sqlx::query_as("select status from runs where run_id = $1")
3946                .bind(run_one)
3947                .fetch_one(storage.pool())
3948                .await?;
3949        assert_eq!(run_status_one, "running");
3950
3951        let mut tx = storage.pool().begin().await?;
3952        storage
3953            .update_step_status_tx(
3954                &mut tx,
3955                run_one,
3956                step_root,
3957                models::StepStatus::Succeeded,
3958                Some(1),
3959                None,
3960                None,
3961                None,
3962                None,
3963                None,
3964                None,
3965                None,
3966            )
3967            .await?;
3968        let unlocked = storage
3969            .unlock_successors_tx(&mut tx, run_one, step_root)
3970            .await?;
3971        assert_eq!(unlocked, vec![step_leaf]);
3972        tx.commit().await?;
3973
3974        let (leaf_status, leaf_pending): (String, i32) = sqlx::query_as(
3975            "select status, pending_dependencies from steps where run_id = $1 and step_id = $2",
3976        )
3977        .bind(run_one)
3978        .bind(step_leaf)
3979        .fetch_one(storage.pool())
3980        .await?;
3981        assert_eq!(leaf_status.as_str(), "queued");
3982        assert_eq!(leaf_pending, 0);
3983
3984        // Completing the final step should mark the run succeeded.
3985        let mut tx = storage.pool().begin().await?;
3986        storage
3987            .update_step_status_tx(
3988                &mut tx,
3989                run_one,
3990                step_leaf,
3991                models::StepStatus::Succeeded,
3992                Some(2),
3993                None,
3994                None,
3995                None,
3996                None,
3997                None,
3998                None,
3999                None,
4000            )
4001            .await?;
4002        let (_, new_status) = storage.refresh_run_status_tx(&mut tx, run_one).await?;
4003        assert_eq!(new_status, models::RunStatus::Succeeded);
4004        tx.commit().await?;
4005
4006        // Failing a root step cascades skips and marks the run failed.
4007        let run_two = Uuid::new_v4();
4008        let step_a = Uuid::new_v4();
4009        let step_b = Uuid::new_v4();
4010        let step_c = Uuid::new_v4();
4011
4012        let spec_a = json!({"id": step_a, "type": "tool", "inputs": {}});
4013        let spec_b = json!({"id": step_b, "type": "tool", "inputs": {}});
4014        let spec_c = json!({"id": step_c, "type": "tool", "inputs": {}});
4015        let dag_two = json!({
4016            "steps": [spec_a.clone(), spec_b.clone(), spec_c.clone()],
4017            "edges": [[step_a, step_b], [step_b, step_c]]
4018        });
4019
4020        let _ = storage
4021            .insert_run_with_steps(
4022                models::NewRun {
4023                    run_id: run_two,
4024                    status: models::RunStatus::Pending,
4025                    dag_json: dag_two,
4026                    input_ctx: json!({}),
4027                    seed: 1,
4028                    idempotency_key: None,
4029                },
4030                &[
4031                    models::NewStep {
4032                        run_id: run_two,
4033                        step_id: step_a,
4034                        idx: 0,
4035                        priority: 0,
4036                        spec_json: spec_a,
4037                        status: models::StepStatus::Queued,
4038                        attempt: 0,
4039                        input_snapshot: None,
4040                        provider: None,
4041                        provider_version: None,
4042                        pending_dependencies: 0,
4043                        total_dependencies: 0,
4044                        estimated_tokens: 0,
4045                        estimated_cost: 0.0,
4046                        actual_tokens: None,
4047                        actual_cost: None,
4048                        ..models::NewStep::default()
4049                    },
4050                    models::NewStep {
4051                        run_id: run_two,
4052                        step_id: step_b,
4053                        idx: 1,
4054                        priority: 0,
4055                        spec_json: spec_b,
4056                        status: models::StepStatus::Blocked,
4057                        attempt: 0,
4058                        input_snapshot: None,
4059                        provider: None,
4060                        provider_version: None,
4061                        pending_dependencies: 1,
4062                        total_dependencies: 1,
4063                        estimated_tokens: 0,
4064                        estimated_cost: 0.0,
4065                        actual_tokens: None,
4066                        actual_cost: None,
4067                        ..models::NewStep::default()
4068                    },
4069                    models::NewStep {
4070                        run_id: run_two,
4071                        step_id: step_c,
4072                        idx: 2,
4073                        priority: 0,
4074                        spec_json: spec_c,
4075                        status: models::StepStatus::Blocked,
4076                        attempt: 0,
4077                        input_snapshot: None,
4078                        provider: None,
4079                        provider_version: None,
4080                        pending_dependencies: 1,
4081                        total_dependencies: 1,
4082                        estimated_tokens: 0,
4083                        estimated_cost: 0.0,
4084                        actual_tokens: None,
4085                        actual_cost: None,
4086                        ..models::NewStep::default()
4087                    },
4088                ],
4089                &[(step_a, step_b), (step_b, step_c)],
4090            )
4091            .await?;
4092
4093        storage.claim_queued_steps(1, "worker-2").await?;
4094        let (run_two_status,): (String,) =
4095            sqlx::query_as("select status from runs where run_id = $1")
4096                .bind(run_two)
4097                .fetch_one(storage.pool())
4098                .await?;
4099        assert_eq!(run_two_status, "running");
4100
4101        let mut tx = storage.pool().begin().await?;
4102        storage
4103            .update_step_status_tx(
4104                &mut tx,
4105                run_two,
4106                step_a,
4107                models::StepStatus::Failed,
4108                Some(1),
4109                None,
4110                None,
4111                None,
4112                None,
4113                None,
4114                None,
4115                None,
4116            )
4117            .await?;
4118        let skipped = storage
4119            .skip_downstream_steps_tx(&mut tx, run_two, step_a)
4120            .await?;
4121        assert_eq!(skipped.len(), 2);
4122        assert!(skipped.contains(&step_b) && skipped.contains(&step_c));
4123        let (_, failed_status) = storage.refresh_run_status_tx(&mut tx, run_two).await?;
4124        assert_eq!(failed_status, models::RunStatus::Failed);
4125        tx.commit().await?;
4126
4127        for target in [step_b, step_c] {
4128            let (status,): (String,) =
4129                sqlx::query_as("select status from steps where run_id = $1 and step_id = $2")
4130                    .bind(run_two)
4131                    .bind(target)
4132                    .fetch_one(storage.pool())
4133                    .await?;
4134            assert_eq!(status, "skipped");
4135        }
4136
4137        Ok(())
4138    }
4139
4140    #[tokio::test]
4141    async fn in_memory_store_supports_idempotency() -> Result<()> {
4142        use crate::RunStore;
4143
4144        let store = InMemoryStorage::new();
4145        let run_id = Uuid::new_v4();
4146        let step_id = Uuid::new_v4();
4147        let dag = json!({
4148            "steps": [{
4149                "id": step_id,
4150                "type": "tool",
4151                "inputs": {}
4152            }]
4153        });
4154
4155        let first = store
4156            .insert_run_with_steps(
4157                models::NewRun {
4158                    run_id,
4159                    status: models::RunStatus::Pending,
4160                    dag_json: dag.clone(),
4161                    input_ctx: json!({}),
4162                    seed: 1,
4163                    idempotency_key: Some("mem-key".into()),
4164                },
4165                &[models::NewStep {
4166                    run_id,
4167                    step_id,
4168                    idx: 0,
4169                    priority: 0,
4170                    spec_json: json!({ "id": step_id, "type": "tool", "inputs": {} }),
4171                    status: models::StepStatus::Queued,
4172                    attempt: 0,
4173                    input_snapshot: None,
4174                    provider: None,
4175                    provider_version: None,
4176                    pending_dependencies: 0,
4177                    total_dependencies: 0,
4178                    estimated_tokens: 0,
4179                    estimated_cost: 0.0,
4180                    actual_tokens: None,
4181                    actual_cost: None,
4182                    ..models::NewStep::default()
4183                }],
4184                &[],
4185            )
4186            .await?;
4187        assert_eq!(first, run_id);
4188
4189        let reused = store
4190            .insert_run_with_steps(
4191                models::NewRun {
4192                    run_id: Uuid::new_v4(),
4193                    status: models::RunStatus::Pending,
4194                    dag_json: dag,
4195                    input_ctx: json!({}),
4196                    seed: 1,
4197                    idempotency_key: Some("mem-key".into()),
4198                },
4199                &[],
4200                &[],
4201            )
4202            .await?;
4203        assert_eq!(
4204            reused, run_id,
4205            "in-memory store should reuse idempotent run id"
4206        );
4207
4208        let fetched = store.fetch_run(run_id).await?.expect("run exists");
4209        assert_eq!(fetched.run_id, run_id);
4210        let steps = store.fetch_steps_for_run(run_id).await?;
4211        assert_eq!(steps.len(), 1);
4212
4213        Ok(())
4214    }
4215
4216    #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4217    #[tokio::test]
4218    async fn idempotency_reuses_existing_run() -> Result<()> {
4219        if !docker_available() {
4220            eprintln!("Skipping idempotency test because Docker is unavailable");
4221            return Ok(());
4222        }
4223
4224        let (storage, container) = setup_storage().await?;
4225        let idempotency_key = "reuse-key";
4226        let run_id = Uuid::new_v4();
4227        let step_id = Uuid::new_v4();
4228        let step_spec = json!({
4229            "id": step_id,
4230            "type": "tool",
4231            "inputs": {}
4232        });
4233        let dag = json!({ "steps": [step_spec.clone()] });
4234
4235        let inserted = storage
4236            .insert_run_with_steps(
4237                models::NewRun {
4238                    run_id,
4239                    status: models::RunStatus::Pending,
4240                    dag_json: dag.clone(),
4241                    input_ctx: json!({}),
4242                    seed: 1,
4243                    idempotency_key: Some(idempotency_key.into()),
4244                },
4245                &[models::NewStep {
4246                    run_id,
4247                    step_id,
4248                    idx: 0,
4249                    priority: 0,
4250                    spec_json: step_spec,
4251                    status: models::StepStatus::Queued,
4252                    attempt: 0,
4253                    input_snapshot: None,
4254                    provider: None,
4255                    provider_version: None,
4256                    pending_dependencies: 0,
4257                    total_dependencies: 0,
4258                    estimated_tokens: 0,
4259                    estimated_cost: 0.0,
4260                    actual_tokens: None,
4261                    actual_cost: None,
4262                    ..models::NewStep::default()
4263                }],
4264                &[],
4265            )
4266            .await?;
4267        assert_eq!(inserted, run_id, "initial insert should return run id");
4268
4269        let reused = storage
4270            .insert_run_with_steps(
4271                models::NewRun {
4272                    run_id: Uuid::new_v4(),
4273                    status: models::RunStatus::Pending,
4274                    dag_json: dag,
4275                    input_ctx: json!({}),
4276                    seed: 1,
4277                    idempotency_key: Some(idempotency_key.into()),
4278                },
4279                &[],
4280                &[],
4281            )
4282            .await?;
4283        assert_eq!(
4284            reused, run_id,
4285            "second insert with same idempotency key should reuse original run id"
4286        );
4287
4288        drop(container);
4289        Ok(())
4290    }
4291
4292    #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4293    #[tokio::test]
4294    async fn claim_queued_orders_by_priority_then_idx() -> Result<()> {
4295        if std::fs::metadata("/var/run/docker.sock").is_err()
4296            && std::env::var("DOCKER_HOST").is_err()
4297        {
4298            eprintln!("Skipping storage queue test because Docker is unavailable");
4299            return Ok(());
4300        }
4301
4302        let docker = Cli::default();
4303        let container = docker.run(Postgres::default());
4304        let port = container.get_host_port_ipv4(5432);
4305        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
4306
4307        let storage = Storage::connect(StorageConfig {
4308            database_url,
4309            max_connections: 1,
4310            connect_timeout: Duration::from_secs(10),
4311            object_store: ObjectStoreConfig::InMemory,
4312        })
4313        .await?;
4314
4315        let run_id = Uuid::new_v4();
4316        let step_low = Uuid::new_v4();
4317        let step_high_a = Uuid::new_v4();
4318        let step_high_b = Uuid::new_v4();
4319
4320        let _ = storage
4321            .insert_run_with_steps(
4322                models::NewRun {
4323                    run_id,
4324                    status: models::RunStatus::Pending,
4325                    dag_json: json!({"steps": [], "edges": []}),
4326                    input_ctx: json!({}),
4327                    seed: 1,
4328                    idempotency_key: None,
4329                },
4330                &[
4331                    models::NewStep {
4332                        run_id,
4333                        step_id: step_low,
4334                        idx: 2,
4335                        priority: 1,
4336                        spec_json: json!({"id": step_low, "type": "tool", "inputs": {}, "policy": {"priority": 1}}),
4337                        status: models::StepStatus::Queued,
4338                        attempt: 0,
4339                        input_snapshot: None,
4340                        provider: None,
4341                        provider_version: None,
4342                        pending_dependencies: 0,
4343                        total_dependencies: 0,
4344                        estimated_tokens: 0,
4345                        estimated_cost: 0.0,
4346                        actual_tokens: None,
4347                        actual_cost: None,
4348                        ..models::NewStep::default()
4349                    },
4350                    models::NewStep {
4351                        run_id,
4352                        step_id: step_high_a,
4353                        idx: 0,
4354                        priority: 5,
4355                        spec_json: json!({"id": step_high_a, "type": "tool", "inputs": {}, "policy": {"priority": 5}}),
4356                        status: models::StepStatus::Queued,
4357                        attempt: 0,
4358                        input_snapshot: None,
4359                        provider: None,
4360                        provider_version: None,
4361                        pending_dependencies: 0,
4362                        total_dependencies: 0,
4363                        estimated_tokens: 0,
4364                        estimated_cost: 0.0,
4365                        actual_tokens: None,
4366                        actual_cost: None,
4367                        ..models::NewStep::default()
4368                    },
4369                    models::NewStep {
4370                        run_id,
4371                        step_id: step_high_b,
4372                        idx: 1,
4373                        priority: 5,
4374                        spec_json: json!({"id": step_high_b, "type": "tool", "inputs": {}, "policy": {"priority": 5}}),
4375                        status: models::StepStatus::Queued,
4376                        attempt: 0,
4377                        input_snapshot: None,
4378                        provider: None,
4379                        provider_version: None,
4380                        pending_dependencies: 0,
4381                        total_dependencies: 0,
4382                        estimated_tokens: 0,
4383                        estimated_cost: 0.0,
4384                        actual_tokens: None,
4385                        actual_cost: None,
4386                        ..models::NewStep::default()
4387                    },
4388                ],
4389                &[],
4390            )
4391            .await?;
4392
4393        let first = storage
4394            .claim_queued_steps(2, "worker-a")
4395            .await?
4396            .into_iter()
4397            .map(models::Step::try_from)
4398            .collect::<Result<Vec<_>>>()?;
4399        assert_eq!(first.len(), 2);
4400        assert_eq!(first[0].step_id, step_high_a);
4401        assert_eq!(first[1].step_id, step_high_b);
4402        assert!(first.iter().all(|s| s.status == models::StepStatus::Leased));
4403
4404        let second = storage
4405            .claim_queued_steps(2, "worker-b")
4406            .await?
4407            .into_iter()
4408            .map(models::Step::try_from)
4409            .collect::<Result<Vec<_>>>()?;
4410        assert_eq!(second.len(), 1);
4411        assert_eq!(second[0].step_id, step_low);
4412        assert_eq!(second[0].status, models::StepStatus::Leased);
4413
4414        drop(container);
4415        Ok(())
4416    }
4417
4418    #[tokio::test]
4419    async fn list_recent_runs_returns_most_recent_first() -> Result<()> {
4420        if !docker_available() {
4421            eprintln!("Skipping list_recent_runs test because Docker is unavailable");
4422            return Ok(());
4423        }
4424
4425        let (storage, container) = setup_storage().await?;
4426
4427        let run_one = Uuid::new_v4();
4428        let step_one = Uuid::new_v4();
4429        storage
4430            .insert_run_with_steps(
4431                models::NewRun {
4432                    run_id: run_one,
4433                    status: models::RunStatus::Pending,
4434                    dag_json: json!({ "steps": [], "edges": [] }),
4435                    input_ctx: json!({}),
4436                    seed: 1,
4437                    idempotency_key: None,
4438                },
4439                &[models::NewStep {
4440                    run_id: run_one,
4441                    step_id: step_one,
4442                    idx: 0,
4443                    priority: 0,
4444                    spec_json: json!({"id": step_one, "type": "tool"}),
4445                    status: models::StepStatus::Queued,
4446                    attempt: 0,
4447                    input_snapshot: None,
4448                    provider: None,
4449                    provider_version: None,
4450                    pending_dependencies: 0,
4451                    total_dependencies: 0,
4452                    estimated_tokens: 0,
4453                    estimated_cost: 0.0,
4454                    actual_tokens: None,
4455                    actual_cost: None,
4456                    ..models::NewStep::default()
4457                }],
4458                &[],
4459            )
4460            .await?;
4461
4462        let run_two = Uuid::new_v4();
4463        let step_two = Uuid::new_v4();
4464        storage
4465            .insert_run_with_steps(
4466                models::NewRun {
4467                    run_id: run_two,
4468                    status: models::RunStatus::Pending,
4469                    dag_json: json!({ "steps": [], "edges": [] }),
4470                    input_ctx: json!({}),
4471                    seed: 2,
4472                    idempotency_key: None,
4473                },
4474                &[models::NewStep {
4475                    run_id: run_two,
4476                    step_id: step_two,
4477                    idx: 0,
4478                    priority: 0,
4479                    spec_json: json!({"id": step_two, "type": "tool"}),
4480                    status: models::StepStatus::Queued,
4481                    attempt: 0,
4482                    input_snapshot: None,
4483                    provider: None,
4484                    provider_version: None,
4485                    pending_dependencies: 0,
4486                    total_dependencies: 0,
4487                    estimated_tokens: 0,
4488                    estimated_cost: 0.0,
4489                    actual_tokens: None,
4490                    actual_cost: None,
4491                    ..models::NewStep::default()
4492                }],
4493                &[],
4494            )
4495            .await?;
4496
4497        let runs = storage.list_recent_runs(10).await?;
4498        assert!(
4499            runs.len() >= 2,
4500            "expected at least two runs in recent listing"
4501        );
4502        assert_eq!(
4503            runs.first().map(|run| run.run_id),
4504            Some(run_two),
4505            "most recent run should appear first"
4506        );
4507
4508        drop(container);
4509        Ok(())
4510    }
4511
4512    #[tokio::test]
4513    async fn run_roundtrip_insert_and_fetch() -> Result<()> {
4514        if std::fs::metadata("/var/run/docker.sock").is_err()
4515            && std::env::var("DOCKER_HOST").is_err()
4516        {
4517            eprintln!("Skipping storage::roundtrip test because Docker is unavailable");
4518            return Ok(());
4519        }
4520
4521        let docker = Cli::default();
4522        let container = docker.run(Postgres::default());
4523        let port = container.get_host_port_ipv4(5432);
4524        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
4525
4526        let storage = Storage::connect(StorageConfig {
4527            database_url,
4528            max_connections: 1,
4529            connect_timeout: Duration::from_secs(10),
4530            object_store: ObjectStoreConfig::InMemory,
4531        })
4532        .await?;
4533
4534        let run_id = Uuid::new_v4();
4535        let step_root = Uuid::new_v4();
4536        let step_leaf = Uuid::new_v4();
4537
4538        let _ = storage
4539            .insert_run_with_steps(
4540                models::NewRun {
4541                    run_id,
4542                    status: models::RunStatus::Pending,
4543                    dag_json: json!({
4544                        "steps": [
4545                            {"id": step_root, "type": "tool", "inputs": {}},
4546                            {"id": step_leaf, "type": "tool", "inputs": {}}
4547                        ],
4548                        "edges": [[step_root, step_leaf]]
4549                    }),
4550                    input_ctx: json!({"labels": {"example": "roundtrip"}}),
4551                    seed: 99,
4552                    idempotency_key: Some("round-trip".into()),
4553                },
4554                &[
4555                    models::NewStep {
4556                        run_id,
4557                        step_id: step_root,
4558                        idx: 0,
4559                        priority: 5,
4560                        spec_json: json!({"id": step_root, "type": "tool", "inputs": {}, "policy": {"priority": 5}}),
4561                        status: models::StepStatus::Queued,
4562                        attempt: 0,
4563                        input_snapshot: None,
4564                        provider: None,
4565                        provider_version: None,
4566                        pending_dependencies: 0,
4567                        total_dependencies: 0,
4568                        estimated_tokens: 10,
4569                        estimated_cost: 0.1,
4570                        actual_tokens: None,
4571                        actual_cost: None,
4572                        ..models::NewStep::default()
4573                    },
4574                    models::NewStep {
4575                        run_id,
4576                        step_id: step_leaf,
4577                        idx: 1,
4578                        priority: 1,
4579                        spec_json: json!({"id": step_leaf, "type": "tool", "inputs": {}, "policy": {"priority": 1}}),
4580                        status: models::StepStatus::Blocked,
4581                        attempt: 0,
4582                        input_snapshot: None,
4583                        provider: None,
4584                        provider_version: None,
4585                        pending_dependencies: 1,
4586                        total_dependencies: 1,
4587                        estimated_tokens: 5,
4588                        estimated_cost: 0.05,
4589                        actual_tokens: None,
4590                        actual_cost: None,
4591                        ..models::NewStep::default()
4592                    },
4593                ],
4594                &[(step_root, step_leaf)],
4595            )
4596            .await?;
4597
4598        let fetched_run = storage.fetch_run(run_id).await?.expect("run should exist");
4599        assert_eq!(fetched_run.status, models::RunStatus::Pending);
4600        assert_eq!(fetched_run.seed, 99);
4601        assert_eq!(fetched_run.idempotency_key.as_deref(), Some("round-trip"));
4602
4603        let steps = storage.fetch_steps_for_run(run_id).await?;
4604        assert_eq!(steps.len(), 2);
4605        let root = steps.iter().find(|s| s.step_id == step_root).unwrap();
4606        let leaf = steps.iter().find(|s| s.step_id == step_leaf).unwrap();
4607        assert_eq!(root.status, models::StepStatus::Queued);
4608        assert_eq!(leaf.status, models::StepStatus::Blocked);
4609        assert_eq!(root.priority, 5);
4610        assert_eq!(leaf.priority, 1);
4611
4612        let from_key = storage
4613            .fetch_run_by_idempotency_key("round-trip")
4614            .await?
4615            .expect("run by key");
4616        assert_eq!(from_key.run_id, run_id);
4617
4618        drop(container);
4619        Ok(())
4620    }
4621
4622    #[tokio::test]
4623    async fn return_step_to_queue_round_trip() -> Result<()> {
4624        if !docker_available() {
4625            eprintln!("Skipping return_step_to_queue test because Docker is unavailable");
4626            return Ok(());
4627        }
4628
4629        let (storage, container) = setup_storage().await?;
4630
4631        let run_id = Uuid::new_v4();
4632        let step_id = Uuid::new_v4();
4633        let step_spec = json!({"id": step_id, "type": "tool", "inputs": {}});
4634
4635        let _ = storage
4636            .insert_run_with_steps(
4637                models::NewRun {
4638                    run_id,
4639                    status: models::RunStatus::Pending,
4640                    dag_json: json!({"steps": [step_spec.clone()]}),
4641                    input_ctx: json!({}),
4642                    seed: 1,
4643                    idempotency_key: None,
4644                },
4645                &[models::NewStep {
4646                    run_id,
4647                    step_id,
4648                    idx: 0,
4649                    priority: 0,
4650                    spec_json: step_spec,
4651                    status: models::StepStatus::Queued,
4652                    attempt: 0,
4653                    input_snapshot: None,
4654                    provider: None,
4655                    provider_version: None,
4656                    pending_dependencies: 0,
4657                    total_dependencies: 0,
4658                    estimated_tokens: 0,
4659                    estimated_cost: 0.0,
4660                    actual_tokens: None,
4661                    actual_cost: None,
4662                    ..models::NewStep::default()
4663                }],
4664                &[],
4665            )
4666            .await?;
4667
4668        let leased = storage.claim_queued_steps(1, "worker-test").await?;
4669        assert_eq!(leased.len(), 1);
4670
4671        storage.return_step_to_queue(run_id, step_id).await?;
4672
4673        let (status, leased_by): (String, Option<String>) = sqlx::query_as(
4674            "select status, leased_by from steps where run_id = $1 and step_id = $2",
4675        )
4676        .bind(run_id)
4677        .bind(step_id)
4678        .fetch_one(storage.pool())
4679        .await?;
4680
4681        assert_eq!(status, "queued");
4682        assert!(leased_by.is_none());
4683
4684        drop(container);
4685        Ok(())
4686    }
4687
4688    #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4689    #[tokio::test]
4690    async fn requeue_expired_leases_requeues_all() -> Result<()> {
4691        if !docker_available() {
4692            eprintln!("Skipping requeue_expired_leases test because Docker is unavailable");
4693            return Ok(());
4694        }
4695
4696        let (storage, container) = setup_storage().await?;
4697
4698        let run_id = Uuid::new_v4();
4699        let steps: Vec<(Uuid, serde_json::Value)> = (0..3)
4700            .map(|idx| {
4701                let step_id = Uuid::new_v4();
4702                (
4703                    step_id,
4704                    json!({
4705                        "id": step_id,
4706                        "type": "tool",
4707                        "inputs": {},
4708                        "policy": {"budget": {"tokens": idx + 1}}
4709                    }),
4710                )
4711            })
4712            .collect();
4713
4714        let _ = storage
4715            .insert_run_with_steps(
4716                models::NewRun {
4717                    run_id,
4718                    status: models::RunStatus::Pending,
4719                    dag_json: json!({"steps": steps.iter().map(|(_, spec)| spec.clone()).collect::<Vec<_>>()}),
4720                    input_ctx: json!({}),
4721                    seed: 1,
4722                    idempotency_key: None,
4723                },
4724                &steps
4725                    .iter()
4726                    .enumerate()
4727                    .map(|(idx, (step_id, spec))| models::NewStep {
4728                        run_id,
4729                        step_id: *step_id,
4730                        idx: idx as i32,
4731                        priority: 0,
4732                        spec_json: spec.clone(),
4733                        status: models::StepStatus::Queued,
4734                        attempt: 0,
4735                        input_snapshot: None,
4736                        provider: None,
4737                        provider_version: None,
4738                        pending_dependencies: 0,
4739                        total_dependencies: 0,
4740                        estimated_tokens: 0,
4741                        estimated_cost: 0.0,
4742                        actual_tokens: None,
4743                        actual_cost: None,
4744                        ..models::NewStep::default()
4745                    })
4746                    .collect::<Vec<_>>(),
4747                &[],
4748            )
4749            .await?;
4750
4751        let _claimed = storage.claim_queued_steps(3, "worker-requeue").await?;
4752        sqlx::query(
4753            "update steps set lease_expires_at = now() - interval '5 minutes' where run_id = $1",
4754        )
4755        .bind(run_id)
4756        .execute(storage.pool())
4757        .await?;
4758
4759        let requeued = storage.requeue_expired_leases(10).await?;
4760        assert_eq!(requeued.len(), steps.len());
4761        let requeued_ids: Vec<Uuid> = requeued.iter().map(|(_, step)| *step).collect();
4762        for (step_id, _) in &steps {
4763            assert!(requeued_ids.contains(step_id));
4764        }
4765
4766        let rows: Vec<(String, Option<String>)> =
4767            sqlx::query_as("select status, leased_by from steps where run_id = $1 order by idx")
4768                .bind(run_id)
4769                .fetch_all(storage.pool())
4770                .await?;
4771        for (status, leased_by) in rows {
4772            assert_eq!(status, "queued");
4773            assert!(leased_by.is_none());
4774        }
4775
4776        drop(container);
4777        Ok(())
4778    }
4779
4780    #[cfg_attr(not(feature = "docker-tests"), ignore = "requires Docker")]
4781    #[tokio::test]
4782    async fn idempotency_prevents_duplicate_runs() -> Result<()> {
4783        if !docker_available() {
4784            eprintln!("Skipping idempotency test because Docker is unavailable");
4785            return Ok(());
4786        }
4787
4788        let (storage, container) = setup_storage().await?;
4789        let key = "dup-key";
4790
4791        let run_one = Uuid::new_v4();
4792        let step_one = Uuid::new_v4();
4793        let step_spec = json!({"id": step_one, "type": "tool", "inputs": {}});
4794
4795        let _ = storage
4796            .insert_run_with_steps(
4797                models::NewRun {
4798                    run_id: run_one,
4799                    status: models::RunStatus::Pending,
4800                    dag_json: json!({"steps": [step_spec.clone()]}),
4801                    input_ctx: json!({}),
4802                    seed: 1,
4803                    idempotency_key: Some(key.to_string()),
4804                },
4805                &[models::NewStep {
4806                    run_id: run_one,
4807                    step_id: step_one,
4808                    idx: 0,
4809                    priority: 0,
4810                    spec_json: step_spec.clone(),
4811                    status: models::StepStatus::Queued,
4812                    attempt: 0,
4813                    input_snapshot: None,
4814                    provider: None,
4815                    provider_version: None,
4816                    pending_dependencies: 0,
4817                    total_dependencies: 0,
4818                    estimated_tokens: 0,
4819                    estimated_cost: 0.0,
4820                    actual_tokens: None,
4821                    actual_cost: None,
4822                    ..models::NewStep::default()
4823                }],
4824                &[],
4825            )
4826            .await?;
4827
4828        let run_two = Uuid::new_v4();
4829        let duplicate_id = storage
4830            .insert_run_with_steps(
4831                models::NewRun {
4832                    run_id: run_two,
4833                    status: models::RunStatus::Pending,
4834                    dag_json: json!({"steps": [step_spec.clone()]}),
4835                    input_ctx: json!({}),
4836                    seed: 1,
4837                    idempotency_key: Some(key.to_string()),
4838                },
4839                &[models::NewStep {
4840                    run_id: run_two,
4841                    step_id: Uuid::new_v4(),
4842                    idx: 0,
4843                    priority: 0,
4844                    spec_json: step_spec,
4845                    status: models::StepStatus::Queued,
4846                    attempt: 0,
4847                    input_snapshot: None,
4848                    provider: None,
4849                    provider_version: None,
4850                    pending_dependencies: 0,
4851                    total_dependencies: 0,
4852                    estimated_tokens: 0,
4853                    estimated_cost: 0.0,
4854                    actual_tokens: None,
4855                    actual_cost: None,
4856                    ..models::NewStep::default()
4857                }],
4858                &[],
4859            )
4860            .await?;
4861
4862        assert_eq!(
4863            duplicate_id, run_one,
4864            "idempotent submission should reuse run_id"
4865        );
4866
4867        let steps = storage.fetch_steps_for_run(run_one).await?;
4868        assert_eq!(
4869            steps.len(),
4870            1,
4871            "duplicate submission should not add extra steps"
4872        );
4873
4874        drop(container);
4875        Ok(())
4876    }
4877
4878    #[tokio::test]
4879    async fn audit_chain_attaches_metadata() -> Result<()> {
4880        if !docker_available() {
4881            eprintln!("Skipping audit chain test because Docker is unavailable");
4882            return Ok(());
4883        }
4884
4885        let (storage, container) = setup_storage().await?;
4886        let run_id = Uuid::new_v4();
4887        let step_id = Uuid::new_v4();
4888
4889        storage
4890            .insert_run_with_steps(
4891                models::NewRun {
4892                    run_id,
4893                    status: models::RunStatus::Pending,
4894                    dag_json: json!({"steps": []}),
4895                    input_ctx: json!({}),
4896                    seed: 1,
4897                    idempotency_key: None,
4898                },
4899                &[seed_step(run_id, step_id)],
4900                &[],
4901            )
4902            .await?;
4903
4904        let mut tx = storage.pool().begin().await?;
4905        storage
4906            .enqueue_outbox_event(
4907                &mut tx,
4908                models::NewOutboxEvent {
4909                    run_id,
4910                    step_id: Some(step_id),
4911                    kind: "step_started".to_string(),
4912                    payload: json!({"status": "running"}),
4913                },
4914            )
4915            .await?;
4916        storage
4917            .enqueue_outbox_event(
4918                &mut tx,
4919                models::NewOutboxEvent {
4920                    run_id,
4921                    step_id: Some(step_id),
4922                    kind: "step_succeeded".to_string(),
4923                    payload: json!({"status": "succeeded"}),
4924                },
4925            )
4926            .await?;
4927        tx.commit().await?;
4928
4929        let events = storage.fetch_outbox_for_run_since(run_id, 0, 10).await?;
4930        assert_eq!(events.len(), 2);
4931
4932        let first_audit = &events[0].payload["_audit"];
4933        let second_audit = &events[1].payload["_audit"];
4934        assert_eq!(first_audit["version"], json!(1));
4935        assert!(first_audit["chain_hash"].as_str().is_some());
4936        assert!(first_audit["previous_hash"].is_null());
4937
4938        let first_hash = first_audit["chain_hash"].as_str().unwrap();
4939        assert_eq!(second_audit["previous_hash"].as_str(), Some(first_hash));
4940        assert!(second_audit["chain_hash"].as_str().is_some());
4941
4942        drop(container);
4943        Ok(())
4944    }
4945
4946    #[tokio::test]
4947    async fn audit_chain_serializes_concurrent_writers() -> Result<()> {
4948        if !docker_available() {
4949            eprintln!("Skipping audit concurrency test because Docker is unavailable");
4950            return Ok(());
4951        }
4952
4953        let (storage, container) = setup_storage().await?;
4954        let storage = Arc::new(storage);
4955        let run_id = Uuid::new_v4();
4956        let step_id = Uuid::new_v4();
4957
4958        storage
4959            .insert_run_with_steps(
4960                models::NewRun {
4961                    run_id,
4962                    status: models::RunStatus::Pending,
4963                    dag_json: json!({"steps": []}),
4964                    input_ctx: json!({}),
4965                    seed: 1,
4966                    idempotency_key: None,
4967                },
4968                &[seed_step(run_id, step_id)],
4969                &[],
4970            )
4971            .await?;
4972
4973        let barrier = Arc::new(Barrier::new(3));
4974        let mut handles = Vec::new();
4975        for (kind, status) in [
4976            ("step_running".to_string(), "running".to_string()),
4977            ("step_succeeded".to_string(), "succeeded".to_string()),
4978        ] {
4979            let storage = Arc::clone(&storage);
4980            let barrier = Arc::clone(&barrier);
4981            let handle = tokio::spawn(async move {
4982                barrier.wait().await;
4983                let mut tx = storage.pool().begin().await?;
4984                storage
4985                    .enqueue_outbox_event(
4986                        &mut tx,
4987                        models::NewOutboxEvent {
4988                            run_id,
4989                            step_id: Some(step_id),
4990                            kind,
4991                            payload: json!({"status": status}),
4992                        },
4993                    )
4994                    .await?;
4995                tx.commit().await?;
4996                Ok::<(), anyhow::Error>(())
4997            });
4998            handles.push(handle);
4999        }
5000
5001        barrier.wait().await;
5002        for handle in handles {
5003            handle.await??;
5004        }
5005
5006        let events = storage.fetch_outbox_for_run_since(run_id, 0, 10).await?;
5007        assert_eq!(events.len(), 2);
5008        let first = &events[0].payload["_audit"];
5009        let second = &events[1].payload["_audit"];
5010        assert_eq!(
5011            second["previous_hash"].as_str(),
5012            first["chain_hash"].as_str()
5013        );
5014
5015        drop(container);
5016        Ok(())
5017    }
5018
5019    #[tokio::test]
5020    async fn tenant_quota_enforced() -> Result<()> {
5021        if !docker_available() {
5022            eprintln!("Skipping quota test because Docker is unavailable");
5023            return Ok(());
5024        }
5025
5026        let (storage, container) = setup_storage().await?;
5027        let tenant = "tenant-alpha";
5028        let run_id = Uuid::new_v4();
5029        let step_id = Uuid::new_v4();
5030
5031        storage
5032            .insert_run_with_steps(
5033                models::NewRun {
5034                    run_id,
5035                    status: models::RunStatus::Pending,
5036                    dag_json: json!({"steps": []}),
5037                    input_ctx: json!({"labels": {"tenant": tenant}}),
5038                    seed: 1,
5039                    idempotency_key: None,
5040                },
5041                &[seed_step(run_id, step_id)],
5042                &[],
5043            )
5044            .await?;
5045
5046        sqlx::query(
5047            "insert into tenant_quotas (tenant, daily_token_limit, daily_cost_limit) values ($1, $2, $3)",
5048        )
5049        .bind(tenant)
5050        .bind(1000_i64)
5051        .bind(10.0_f64)
5052        .execute(storage.pool())
5053        .await?;
5054
5055        storage.enforce_tenant_quota(tenant, 100, 1.0).await?;
5056
5057        let mut tx = storage.pool().begin().await?;
5058        storage
5059            .record_billing_usage_tx(&mut tx, run_id, 900, 5.0)
5060            .await?;
5061        tx.commit().await?;
5062
5063        let err = storage
5064            .enforce_tenant_quota(tenant, 200, 1.0)
5065            .await
5066            .expect_err("quota should block overage");
5067        assert!(format!("{err}").contains("quota"));
5068
5069        drop(container);
5070        Ok(())
5071    }
5072
5073    #[tokio::test]
5074    async fn retention_policy_scrubs_payloads() -> Result<()> {
5075        if !docker_available() {
5076            eprintln!("Skipping retention test because Docker is unavailable");
5077            return Ok(());
5078        }
5079
5080        let (storage, container) = setup_storage().await?;
5081        let run_id = Uuid::new_v4();
5082        let step_id = Uuid::new_v4();
5083
5084        storage
5085            .insert_run_with_steps(
5086                models::NewRun {
5087                    run_id,
5088                    status: models::RunStatus::Pending,
5089                    dag_json: json!({"steps": []}),
5090                    input_ctx: json!({"inputs": {"secret": "value"}, "labels": {"team": "core"}}),
5091                    seed: 1,
5092                    idempotency_key: None,
5093                },
5094                &[models::NewStep {
5095                    run_id,
5096                    step_id,
5097                    idx: 0,
5098                    priority: 0,
5099                    spec_json: json!({"id": step_id, "type": "tool", "inputs": {}}),
5100                    status: models::StepStatus::Succeeded,
5101                    attempt: 1,
5102                    input_snapshot: Some(json!({"secret": "value"})),
5103                    provider: None,
5104                    provider_version: None,
5105                    pending_dependencies: 0,
5106                    total_dependencies: 0,
5107                    estimated_tokens: 0,
5108                    estimated_cost: 0.0,
5109                    actual_tokens: Some(5),
5110                    actual_cost: Some(0.01),
5111                    leased_by: None,
5112                    lease_expires_at: None,
5113                    output_snapshot: Some(json!({"result": "ok"})),
5114                    output_json: Some(json!({"result": "ok"})),
5115                    error_json: Some(json!({"message": "none"})),
5116                }],
5117                &[],
5118            )
5119            .await?;
5120
5121        {
5122            let mut tx = storage.pool().begin().await?;
5123            storage
5124                .enqueue_outbox_event(
5125                    &mut tx,
5126                    models::NewOutboxEvent {
5127                        run_id,
5128                        step_id: Some(step_id),
5129                        kind: "step_succeeded".to_string(),
5130                        payload: json!({
5131                            "status": "succeeded",
5132                            "input": {"secret": "value"},
5133                            "output": {"result": "ok"},
5134                            "snapshots": {"output": {"result": "ok"}}
5135                        }),
5136                    },
5137                )
5138                .await?;
5139            tx.commit().await?;
5140        }
5141
5142        sqlx::query("update runs set created_at = now() - interval '10 days' where run_id = $1")
5143            .bind(run_id)
5144            .execute(storage.pool())
5145            .await?;
5146        sqlx::query("update steps set created_at = now() - interval '10 days' where run_id = $1")
5147            .bind(run_id)
5148            .execute(storage.pool())
5149            .await?;
5150
5151        let cutoff = Utc::now() - chrono::Duration::days(7);
5152        let stats = storage.apply_retention_policy(cutoff).await?;
5153        assert!(stats.runs_scrubbed >= 1);
5154        assert!(stats.steps_scrubbed >= 1);
5155        assert!(stats.events_scrubbed >= 1);
5156
5157        let run = storage.fetch_run(run_id).await?.expect("run exists");
5158        assert_eq!(run.input_ctx.get("inputs"), Some(&json!({})));
5159        assert_eq!(run.input_ctx.get("labels"), Some(&json!({})));
5160
5161        let step = storage
5162            .fetch_steps_for_run(run_id)
5163            .await?
5164            .into_iter()
5165            .next()
5166            .unwrap();
5167        assert!(step.input_snapshot.is_none());
5168        assert!(step.output_snapshot.is_none());
5169        assert!(step.output_json.is_none());
5170        assert!(step.error_json.is_none());
5171
5172        let events = storage.fetch_outbox_for_run_since(run_id, 0, 10).await?;
5173        assert_eq!(events.len(), 1);
5174        let payload = &events[0].payload;
5175        assert_eq!(payload.get("status"), Some(&json!("succeeded")));
5176        assert!(payload.get("input").is_none());
5177        assert!(payload.get("output").is_none());
5178        assert!(payload.get("snapshots").is_none());
5179        assert!(payload.get("_audit").is_some());
5180
5181        drop(container);
5182        Ok(())
5183    }
5184
5185    #[tokio::test]
5186    async fn artifact_store_put_get_presign() -> Result<()> {
5187        let store = artifacts::ObjectStoreArtifactStore::new(
5188            Arc::new(object_store::memory::InMemory::new()),
5189            None,
5190        );
5191        let payload = Bytes::from_static(b"hello world");
5192        let metadata = json!({"tag": "greeting"});
5193
5194        let meta = store
5195            .put(payload.clone(), "text/plain", metadata.clone())
5196            .await?;
5197        let fetched = store.get(meta.sha256).await?;
5198        assert_eq!(fetched, payload);
5199
5200        let url = store
5201            .presign_get(meta.sha256, Duration::from_secs(30))
5202            .await?;
5203        assert!(url.as_str().contains(&hex::encode(meta.sha256)));
5204
5205        Ok(())
5206    }
5207}