fleetforge_trust/
vault_object_store.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use bytes::Bytes;
4use object_store::{path::Path, DynObjectStore as ObjectStoreDyn, PutPayload};
5use sqlx::{Pool, Postgres};
6use std::sync::Arc;
7use uuid::Uuid;
8
9use crate::{digest_bytes, Attestation, AttestationVault, TrustSubject};
10
11type DynObjectStore = Arc<ObjectStoreDyn>;
12
13/// Object store backed attestation vault with a Postgres index.
14pub struct ObjectStoreAttestationVault {
15    store: DynObjectStore,
16    pool: Pool<Postgres>,
17}
18
19impl ObjectStoreAttestationVault {
20    pub fn new(store: DynObjectStore, pool: Pool<Postgres>) -> Self {
21        Self { store, pool }
22    }
23
24    fn path_prefix(digest_hex: &str) -> Path {
25        let prefix_a = &digest_hex[0..2];
26        let prefix_b = &digest_hex[2..4];
27        Path::from(format!(
28            "attestations/{}/{}/{}.json",
29            prefix_a, prefix_b, digest_hex
30        ))
31    }
32
33    fn subject_descriptor(subject: &TrustSubject) -> (Option<String>, Option<String>) {
34        match subject {
35            TrustSubject::Run { run_id } => {
36                (Some(format!("run:{}", run_id)), Some("run".to_string()))
37            }
38            TrustSubject::Step { run_id, step_id } => (
39                Some(format!("run:{}:step:{}", run_id, step_id)),
40                Some("step".to_string()),
41            ),
42            TrustSubject::Tool { name } => {
43                (Some(format!("tool:{}", name)), Some("tool".to_string()))
44            }
45            TrustSubject::Artifact { uri } => (Some(uri.clone()), Some("artifact".to_string())),
46            TrustSubject::CapabilityToken { token_id } => (
47                Some(format!("capability:{}", token_id)),
48                Some("capability_token".to_string()),
49            ),
50            TrustSubject::Custom { label } => (Some(label.clone()), Some("custom".to_string())),
51        }
52    }
53
54    fn attestation_subject(attestation: &Attestation) -> (Option<String>, Option<String>) {
55        attestation
56            .subject
57            .as_ref()
58            .map(Self::subject_descriptor)
59            .unwrap_or((None, None))
60    }
61
62    async fn load_blob(&self, object_path: &str) -> Result<Attestation> {
63        let path = Path::from(object_path);
64        let reader = self
65            .store
66            .get(&path)
67            .await
68            .with_context(|| format!("failed to fetch attestation blob {}", object_path))?;
69        let bytes = reader
70            .bytes()
71            .await
72            .with_context(|| format!("failed to read attestation blob {}", object_path))?;
73        serde_json::from_slice(&bytes)
74            .with_context(|| format!("failed to deserialize attestation {}", object_path))
75    }
76}
77
78#[async_trait]
79impl AttestationVault for ObjectStoreAttestationVault {
80    async fn record(&self, attestation: Attestation) -> Result<Uuid> {
81        let bytes = serde_json::to_vec(&attestation).context("failed to serialize attestation")?;
82        let digest_hex = digest_bytes(&bytes);
83        let path = Self::path_prefix(&digest_hex);
84
85        self.store
86            .put(&path, PutPayload::from(Bytes::from(bytes)))
87            .await
88            .with_context(|| format!("failed to persist attestation {}", attestation.id))?;
89
90        let (subject_id, subject_kind) = Self::attestation_subject(&attestation);
91
92        sqlx::query(
93            r#"
94            insert into attestation_records (
95                attestation_id,
96                issued_at,
97                subject_id,
98                subject_kind,
99                attestation_sha256,
100                object_path
101            )
102            values ($1, $2, $3, $4, $5, $6)
103            "#,
104        )
105        .bind(attestation.id)
106        .bind(attestation.issued_at)
107        .bind(subject_id)
108        .bind(subject_kind)
109        .bind(digest_hex)
110        .bind(path.to_string())
111        .execute(&self.pool)
112        .await
113        .with_context(|| format!("failed to index attestation {}", attestation.id))?;
114
115        Ok(attestation.id)
116    }
117
118    async fn fetch(&self, id: Uuid) -> Result<Option<Attestation>> {
119        let row = sqlx::query_as::<_, (Uuid, String)>(
120            r#"
121            select attestation_id, object_path
122            from attestation_records
123            where attestation_id = $1
124            "#,
125        )
126        .bind(id)
127        .fetch_optional(&self.pool)
128        .await?;
129
130        match row {
131            Some((_record_id, object_path)) => Ok(Some(self.load_blob(&object_path).await?)),
132            None => Ok(None),
133        }
134    }
135
136    async fn list_by_subject(&self, subject: &TrustSubject) -> Result<Vec<Attestation>> {
137        let (subject_id, subject_kind) = Self::subject_descriptor(subject);
138        let (Some(id), Some(kind)) = (subject_id, subject_kind) else {
139            return Ok(Vec::new());
140        };
141
142        let rows = sqlx::query_as::<_, (Uuid, String)>(
143            r#"
144            select attestation_id, object_path
145            from attestation_records
146            where subject_id = $1
147              and subject_kind = $2
148            order by created_at asc
149            "#,
150        )
151        .bind(id)
152        .bind(kind)
153        .fetch_all(&self.pool)
154        .await?;
155
156        let mut attestations = Vec::with_capacity(rows.len());
157        for (_, object_path) in rows {
158            attestations.push(self.load_blob(&object_path).await?);
159        }
160        Ok(attestations)
161    }
162}