fleetforge_storage/
retention.rs1use anyhow::Result;
2use object_store::path::Path;
3use object_store::{Error as ObjectStoreError, ObjectStore};
4use sqlx::PgPool;
5use tracing::{info, warn};
6use uuid::Uuid;
7
8#[derive(Debug, Clone, Copy)]
10pub struct RetentionConfig {
11 pub run_days: i64,
12 pub artifact_days: i64,
13}
14
15impl RetentionConfig {
16 pub fn is_enabled(&self) -> bool {
17 self.run_days > 0 || self.artifact_days > 0
18 }
19}
20
21pub async fn delete_expired(
23 pool: &PgPool,
24 store: &dyn ObjectStore,
25 cfg: &RetentionConfig,
26) -> Result<()> {
27 if cfg.run_days > 0 {
28 let deleted_runs: Vec<Uuid> = sqlx::query_scalar(
29 r#"
30 delete from runs
31 where status in ('succeeded', 'failed', 'canceled')
32 and created_at < now() - ($1 * interval '1 day')
33 returning run_id
34 "#,
35 )
36 .bind(cfg.run_days)
37 .fetch_all(pool)
38 .await?;
39
40 if !deleted_runs.is_empty() {
41 info!(
42 count = deleted_runs.len(),
43 "deleted terminal runs past retention window"
44 );
45 }
46 }
47
48 if cfg.artifact_days > 0 {
49 let digests: Vec<Vec<u8>> = sqlx::query_scalar(
50 r#"
51 select sha256
52 from artifacts
53 where created_at < now() - ($1 * interval '1 day')
54 "#,
55 )
56 .bind(cfg.artifact_days)
57 .fetch_all(pool)
58 .await?;
59
60 if !digests.is_empty() {
61 for digest in &digests {
62 let path = digest_path(digest);
63 match store.delete(&path).await {
64 Ok(_) => {}
65 Err(ObjectStoreError::NotFound { .. }) => {
66 warn!(path = %path, "artifact already removed from object store");
67 }
68 Err(err) => {
69 warn!(path = %path, error = %err, "failed to delete artifact from object store");
70 }
71 }
72 }
73
74 sqlx::query(
75 r#"
76 delete from artifacts
77 where created_at < now() - ($1 * interval '1 day')
78 "#,
79 )
80 .bind(cfg.artifact_days)
81 .execute(pool)
82 .await?;
83
84 info!(
85 count = digests.len(),
86 "deleted artifact rows past retention window"
87 );
88 }
89 }
90
91 Ok(())
92}
93
94fn digest_path(digest: &[u8]) -> Path {
95 let hex = hex::encode(digest);
96 let prefix_a = &hex[0..2];
97 let prefix_b = &hex[2..4];
98 Path::from(format!("sha256/{}/{}/{}", prefix_a, prefix_b, hex))
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 use bytes::Bytes;
105 use object_store::memory::InMemory;
106 use sqlx::postgres::PgPoolOptions;
107 use testcontainers::clients::Cli;
108 use testcontainers::images::postgres::Postgres;
109 use tokio::time::Duration;
110
111 fn docker_available() -> bool {
112 std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
113 }
114
115 #[tokio::test]
116 async fn deletes_runs_and_artifacts() -> Result<()> {
117 if !docker_available() {
118 eprintln!("Skipping retention test because Docker is unavailable");
119 return Ok(());
120 }
121
122 let docker = Cli::default();
123 let container = docker.run(Postgres::default());
124 let port = container.get_host_port_ipv4(5432);
125 let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
126
127 let pool = PgPoolOptions::new()
128 .max_connections(2)
129 .acquire_timeout(Duration::from_secs(5))
130 .connect(&database_url)
131 .await?;
132
133 let run_id: Uuid = sqlx::query_scalar("select gen_random_uuid()")
135 .fetch_one(&pool)
136 .await?;
137 sqlx::query(
138 r#"
139 insert into runs (run_id, status, dag_json, input_ctx, seed, created_at)
140 values ($1, 'succeeded', '{}'::jsonb, '{}'::jsonb, 0, now() - interval '10 days')
141 "#,
142 )
143 .bind(run_id)
144 .execute(&pool)
145 .await?;
146
147 let sha = hex::decode("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
149 .expect("valid hex");
150 sqlx::query(
151 r#"
152 insert into artifacts (sha256, media_type, size_bytes, metadata, created_at)
153 values ($1, 'application/octet-stream', 4, '{}'::jsonb, now() - interval '10 days')
154 "#,
155 )
156 .bind(&sha)
157 .execute(&pool)
158 .await?;
159
160 let store = InMemory::new();
162 let path = digest_path(&sha);
163 store
164 .put(
165 &path,
166 object_store::PutPayload::from_bytes(Bytes::from_static(b"test")),
167 )
168 .await
169 .expect("write artifact");
170
171 let cfg = RetentionConfig {
172 run_days: 7,
173 artifact_days: 7,
174 };
175 delete_expired(&pool, &store, &cfg).await?;
176
177 let remaining_runs: i64 = sqlx::query_scalar("select count(*) from runs where run_id = $1")
178 .bind(run_id)
179 .fetch_one(&pool)
180 .await?;
181 assert_eq!(remaining_runs, 0);
182
183 let remaining_artifacts: i64 =
184 sqlx::query_scalar("select count(*) from artifacts where sha256 = $1")
185 .bind(&sha)
186 .fetch_one(&pool)
187 .await?;
188 assert_eq!(remaining_artifacts, 0);
189
190 let get_res = store.get(&path).await;
192 assert!(
193 matches!(get_res, Err(ObjectStoreError::NotFound { .. })),
194 "expected object to be removed"
195 );
196
197 drop(container);
198 Ok(())
199 }
200}