fleetforge_trust/
vault_object_store.rs1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use bytes::Bytes;
4use object_store::{path::Path, DynObjectStore as ObjectStoreDyn, PutPayload};
5use sqlx::{Pool, Postgres};
6use std::sync::Arc;
7use uuid::Uuid;
8
9use crate::{digest_bytes, Attestation, AttestationVault, TrustSubject};
10
11type DynObjectStore = Arc<ObjectStoreDyn>;
12
13pub struct ObjectStoreAttestationVault {
15 store: DynObjectStore,
16 pool: Pool<Postgres>,
17}
18
19impl ObjectStoreAttestationVault {
20 pub fn new(store: DynObjectStore, pool: Pool<Postgres>) -> Self {
21 Self { store, pool }
22 }
23
24 fn path_prefix(digest_hex: &str) -> Path {
25 let prefix_a = &digest_hex[0..2];
26 let prefix_b = &digest_hex[2..4];
27 Path::from(format!(
28 "attestations/{}/{}/{}.json",
29 prefix_a, prefix_b, digest_hex
30 ))
31 }
32
33 fn subject_descriptor(subject: &TrustSubject) -> (Option<String>, Option<String>) {
34 match subject {
35 TrustSubject::Run { run_id } => {
36 (Some(format!("run:{}", run_id)), Some("run".to_string()))
37 }
38 TrustSubject::Step { run_id, step_id } => (
39 Some(format!("run:{}:step:{}", run_id, step_id)),
40 Some("step".to_string()),
41 ),
42 TrustSubject::Tool { name } => {
43 (Some(format!("tool:{}", name)), Some("tool".to_string()))
44 }
45 TrustSubject::Artifact { uri } => (Some(uri.clone()), Some("artifact".to_string())),
46 TrustSubject::CapabilityToken { token_id } => (
47 Some(format!("capability:{}", token_id)),
48 Some("capability_token".to_string()),
49 ),
50 TrustSubject::Custom { label } => (Some(label.clone()), Some("custom".to_string())),
51 }
52 }
53
54 fn attestation_subject(attestation: &Attestation) -> (Option<String>, Option<String>) {
55 attestation
56 .subject
57 .as_ref()
58 .map(Self::subject_descriptor)
59 .unwrap_or((None, None))
60 }
61
62 async fn load_blob(&self, object_path: &str) -> Result<Attestation> {
63 let path = Path::from(object_path);
64 let reader = self
65 .store
66 .get(&path)
67 .await
68 .with_context(|| format!("failed to fetch attestation blob {}", object_path))?;
69 let bytes = reader
70 .bytes()
71 .await
72 .with_context(|| format!("failed to read attestation blob {}", object_path))?;
73 serde_json::from_slice(&bytes)
74 .with_context(|| format!("failed to deserialize attestation {}", object_path))
75 }
76}
77
78#[async_trait]
79impl AttestationVault for ObjectStoreAttestationVault {
80 async fn record(&self, attestation: Attestation) -> Result<Uuid> {
81 let bytes = serde_json::to_vec(&attestation).context("failed to serialize attestation")?;
82 let digest_hex = digest_bytes(&bytes);
83 let path = Self::path_prefix(&digest_hex);
84
85 self.store
86 .put(&path, PutPayload::from(Bytes::from(bytes)))
87 .await
88 .with_context(|| format!("failed to persist attestation {}", attestation.id))?;
89
90 let (subject_id, subject_kind) = Self::attestation_subject(&attestation);
91
92 sqlx::query(
93 r#"
94 insert into attestation_records (
95 attestation_id,
96 issued_at,
97 subject_id,
98 subject_kind,
99 attestation_sha256,
100 object_path
101 )
102 values ($1, $2, $3, $4, $5, $6)
103 "#,
104 )
105 .bind(attestation.id)
106 .bind(attestation.issued_at)
107 .bind(subject_id)
108 .bind(subject_kind)
109 .bind(digest_hex)
110 .bind(path.to_string())
111 .execute(&self.pool)
112 .await
113 .with_context(|| format!("failed to index attestation {}", attestation.id))?;
114
115 Ok(attestation.id)
116 }
117
118 async fn fetch(&self, id: Uuid) -> Result<Option<Attestation>> {
119 let row = sqlx::query_as::<_, (Uuid, String)>(
120 r#"
121 select attestation_id, object_path
122 from attestation_records
123 where attestation_id = $1
124 "#,
125 )
126 .bind(id)
127 .fetch_optional(&self.pool)
128 .await?;
129
130 match row {
131 Some((_record_id, object_path)) => Ok(Some(self.load_blob(&object_path).await?)),
132 None => Ok(None),
133 }
134 }
135
136 async fn list_by_subject(&self, subject: &TrustSubject) -> Result<Vec<Attestation>> {
137 let (subject_id, subject_kind) = Self::subject_descriptor(subject);
138 let (Some(id), Some(kind)) = (subject_id, subject_kind) else {
139 return Ok(Vec::new());
140 };
141
142 let rows = sqlx::query_as::<_, (Uuid, String)>(
143 r#"
144 select attestation_id, object_path
145 from attestation_records
146 where subject_id = $1
147 and subject_kind = $2
148 order by created_at asc
149 "#,
150 )
151 .bind(id)
152 .bind(kind)
153 .fetch_all(&self.pool)
154 .await?;
155
156 let mut attestations = Vec::with_capacity(rows.len());
157 for (_, object_path) in rows {
158 attestations.push(self.load_blob(&object_path).await?);
159 }
160 Ok(attestations)
161 }
162}