// fleetforge_storage/retention.rs

1use anyhow::Result;
2use object_store::path::Path;
3use object_store::{Error as ObjectStoreError, ObjectStore};
4use sqlx::PgPool;
5use tracing::{info, warn};
6use uuid::Uuid;
7
/// Configuration for data retention sweeps.
///
/// Each window is a number of days. A window of zero or less turns the
/// corresponding sweep off.
#[derive(Debug, Clone, Copy)]
pub struct RetentionConfig {
    /// Minimum age, in days, of a terminal run before it is deleted (`<= 0` disables).
    pub run_days: i64,
    /// Minimum age, in days, of an artifact before it is deleted (`<= 0` disables).
    pub artifact_days: i64,
}

impl RetentionConfig {
    /// Returns `true` when at least one retention window is positive, meaning
    /// at least one sweep is active.
    pub fn is_enabled(&self) -> bool {
        let windows = [self.run_days, self.artifact_days];
        windows.iter().any(|&days| days > 0)
    }
}
20
21/// Deletes terminal runs and aged artifacts according to the provided configuration.
22pub async fn delete_expired(
23    pool: &PgPool,
24    store: &dyn ObjectStore,
25    cfg: &RetentionConfig,
26) -> Result<()> {
27    if cfg.run_days > 0 {
28        let deleted_runs: Vec<Uuid> = sqlx::query_scalar(
29            r#"
30            delete from runs
31            where status in ('succeeded', 'failed', 'canceled')
32              and created_at < now() - ($1 * interval '1 day')
33            returning run_id
34            "#,
35        )
36        .bind(cfg.run_days)
37        .fetch_all(pool)
38        .await?;
39
40        if !deleted_runs.is_empty() {
41            info!(
42                count = deleted_runs.len(),
43                "deleted terminal runs past retention window"
44            );
45        }
46    }
47
48    if cfg.artifact_days > 0 {
49        let digests: Vec<Vec<u8>> = sqlx::query_scalar(
50            r#"
51            select sha256
52            from artifacts
53            where created_at < now() - ($1 * interval '1 day')
54            "#,
55        )
56        .bind(cfg.artifact_days)
57        .fetch_all(pool)
58        .await?;
59
60        if !digests.is_empty() {
61            for digest in &digests {
62                let path = digest_path(digest);
63                match store.delete(&path).await {
64                    Ok(_) => {}
65                    Err(ObjectStoreError::NotFound { .. }) => {
66                        warn!(path = %path, "artifact already removed from object store");
67                    }
68                    Err(err) => {
69                        warn!(path = %path, error = %err, "failed to delete artifact from object store");
70                    }
71                }
72            }
73
74            sqlx::query(
75                r#"
76                delete from artifacts
77                where created_at < now() - ($1 * interval '1 day')
78                "#,
79            )
80            .bind(cfg.artifact_days)
81            .execute(pool)
82            .await?;
83
84            info!(
85                count = digests.len(),
86                "deleted artifact rows past retention window"
87            );
88        }
89    }
90
91    Ok(())
92}
93
94fn digest_path(digest: &[u8]) -> Path {
95    let hex = hex::encode(digest);
96    let prefix_a = &hex[0..2];
97    let prefix_b = &hex[2..4];
98    Path::from(format!("sha256/{}/{}/{}", prefix_a, prefix_b, hex))
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use object_store::memory::InMemory;
    use sqlx::postgres::PgPoolOptions;
    use testcontainers::clients::Cli;
    use testcontainers::images::postgres::Postgres;
    use tokio::time::Duration;

    /// Best-effort probe for a Docker daemon. The integration test skips itself
    /// when this returns `false`.
    fn docker_available() -> bool {
        std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
    }

    /// Creates the `runs` and `artifacts` tables used by this module.
    ///
    /// The container starts with an empty database, so without this the seed
    /// inserts fail with "relation does not exist".
    /// NOTE(review): the columns mirror only what the retention code and the
    /// seeds below touch. Keep them in sync with the real migrations, or switch
    /// to running the migrations here.
    async fn create_schema(pool: &PgPool) -> Result<()> {
        sqlx::query(
            r#"
            create table if not exists runs (
                run_id uuid primary key,
                status text not null,
                dag_json jsonb not null,
                input_ctx jsonb not null,
                seed bigint not null,
                created_at timestamptz not null default now()
            )
            "#,
        )
        .execute(pool)
        .await?;

        sqlx::query(
            r#"
            create table if not exists artifacts (
                sha256 bytea primary key,
                media_type text not null,
                size_bytes bigint not null,
                metadata jsonb not null,
                created_at timestamptz not null default now()
            )
            "#,
        )
        .execute(pool)
        .await?;

        Ok(())
    }

    /// Inserts a `succeeded` run created `age_days` days ago.
    async fn seed_run(pool: &PgPool, run_id: Uuid, age_days: i64) -> Result<()> {
        sqlx::query(
            r#"
            insert into runs (run_id, status, dag_json, input_ctx, seed, created_at)
            values ($1, 'succeeded', '{}'::jsonb, '{}'::jsonb, 0, now() - ($2 * interval '1 day'))
            "#,
        )
        .bind(run_id)
        .bind(age_days)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Inserts an artifact row created `age_days` days ago, stores a matching
    /// object in `store`, and returns the object's path.
    async fn seed_artifact(
        pool: &PgPool,
        store: &InMemory,
        sha: &[u8],
        age_days: i64,
    ) -> Result<Path> {
        sqlx::query(
            r#"
            insert into artifacts (sha256, media_type, size_bytes, metadata, created_at)
            values ($1, 'application/octet-stream', 4, '{}'::jsonb, now() - ($2 * interval '1 day'))
            "#,
        )
        .bind(sha)
        .bind(age_days)
        .execute(pool)
        .await?;

        let path = digest_path(sha);
        store
            .put(
                &path,
                object_store::PutPayload::from_bytes(Bytes::from_static(b"test")),
            )
            .await?;
        Ok(path)
    }

    /// Counts `runs` rows with the given id.
    async fn run_count(pool: &PgPool, run_id: Uuid) -> Result<i64> {
        Ok(sqlx::query_scalar("select count(*) from runs where run_id = $1")
            .bind(run_id)
            .fetch_one(pool)
            .await?)
    }

    /// Counts `artifacts` rows with the given digest.
    async fn artifact_count(pool: &PgPool, sha: &[u8]) -> Result<i64> {
        Ok(sqlx::query_scalar("select count(*) from artifacts where sha256 = $1")
            .bind(sha)
            .fetch_one(pool)
            .await?)
    }

    #[tokio::test]
    async fn deletes_runs_and_artifacts() -> Result<()> {
        if !docker_available() {
            eprintln!("Skipping retention test because Docker is unavailable");
            return Ok(());
        }

        let docker = Cli::default();
        let container = docker.run(Postgres::default());
        let port = container.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

        let pool = PgPoolOptions::new()
            .max_connections(2)
            .acquire_timeout(Duration::from_secs(5))
            .connect(&database_url)
            .await?;
        create_schema(&pool).await?;

        // Fixed ids avoid depending on `gen_random_uuid()`, which is only
        // built into PostgreSQL 13+.
        let expired_run = Uuid::from_u128(1);
        let fresh_run = Uuid::from_u128(2);
        seed_run(&pool, expired_run, 10).await?;
        seed_run(&pool, fresh_run, 1).await?;

        let store = InMemory::new();
        let expired_sha = vec![0xaa_u8; 32];
        let fresh_sha = vec![0xbb_u8; 32];
        let expired_path = seed_artifact(&pool, &store, &expired_sha, 10).await?;
        let fresh_path = seed_artifact(&pool, &store, &fresh_sha, 1).await?;

        let cfg = RetentionConfig {
            run_days: 7,
            artifact_days: 7,
        };
        delete_expired(&pool, &store, &cfg).await?;

        assert_eq!(run_count(&pool, expired_run).await?, 0, "expired run should be deleted");
        assert_eq!(run_count(&pool, fresh_run).await?, 1, "fresh run should be kept");

        assert_eq!(
            artifact_count(&pool, &expired_sha).await?,
            0,
            "expired artifact row should be deleted"
        );
        assert_eq!(
            artifact_count(&pool, &fresh_sha).await?,
            1,
            "fresh artifact row should be kept"
        );

        assert!(
            matches!(
                store.get(&expired_path).await,
                Err(ObjectStoreError::NotFound { .. })
            ),
            "expected object to be removed"
        );
        assert!(
            store.get(&fresh_path).await.is_ok(),
            "expected fresh object to be kept"
        );

        drop(container);
        Ok(())
    }
}