pub struct Storage { /* private fields */ }
High-level storage handle that encapsulates database and artifact store clients.
Implementations§
Source§impl Storage
impl Storage
Sourcepub async fn connect(config: StorageConfig) -> Result<Self>
pub async fn connect(config: StorageConfig) -> Result<Self>
Connects to Postgres, applies migrations, and provisions the artifact store.
Sourcepub async fn connect_default() -> Result<Self>
pub async fn connect_default() -> Result<Self>
Convenience helper for default configuration (env-driven DATABASE_URL).
Sourcepub fn object_store(&self) -> Arc<DynObjectStore> ⓘ
pub fn object_store(&self) -> Arc<DynObjectStore> ⓘ
Returns a handle to the configured object store.
Sourcepub fn artifacts(&self) -> Arc<dyn ArtifactStore>
pub fn artifacts(&self) -> Arc<dyn ArtifactStore>
Returns the artifact store abstraction.
Sourcepub async fn return_step_to_queue(
&self,
run_id: Uuid,
step_id: Uuid,
) -> Result<()>
pub async fn return_step_to_queue( &self, run_id: Uuid, step_id: Uuid, ) -> Result<()>
Releases a leased step back to the queue without incrementing attempt counters.
Sourcepub async fn requeue_expired_leases(
&self,
limit: i64,
) -> Result<Vec<(Uuid, Uuid)>>
pub async fn requeue_expired_leases( &self, limit: i64, ) -> Result<Vec<(Uuid, Uuid)>>
Requeues expired leases back to the queued state and returns one (Uuid, Uuid) pair per affected step (presumably the run ID and step ID; confirm against the implementation).
pub async fn defer_step_to_queue( &self, run_id: Uuid, step_id: Uuid, not_before: DateTime<Utc>, payload: Value, ) -> Result<()>
pub async fn defer_step_to_queue_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, not_before: DateTime<Utc>, payload: Value, ) -> Result<()>
pub async fn schedule_retry_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, attempt: i32, not_before: DateTime<Utc>, error_json: Value, ) -> Result<()>
pub async fn insert_cost_ledger_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, attempt: i32, status: &str, reserved_tokens: i64, reserved_cost: f64, actual_tokens: Option<i64>, actual_cost: Option<f64>, ) -> Result<()>
pub async fn insert_step_attempt_tx( &self, tx: &mut Transaction<'_, Postgres>, attempt: NewStepAttempt, ) -> Result<()>
pub async fn append_run_event_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Option<Uuid>, kind: &str, payload: &Value, ) -> Result<()>
pub async fn trigger_compensation_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, origin_step: Uuid, compensation_step: Uuid, ) -> Result<()>
Sourcepub async fn fetch_unpublished_outbox(
&self,
limit: i64,
) -> Result<Vec<OutboxEvent>>
pub async fn fetch_unpublished_outbox( &self, limit: i64, ) -> Result<Vec<OutboxEvent>>
Returns unpublished outbox events up to limit, ordered by ID.
Sourcepub async fn mark_outbox_published(&self, id: i64) -> Result<()>
pub async fn mark_outbox_published(&self, id: i64) -> Result<()>
Marks an outbox row as published by setting published_at to the current timestamp.
Sourcepub async fn fetch_outbox_for_run_since(
&self,
run_id: Uuid,
after_id: i64,
limit: i64,
) -> Result<Vec<OutboxEvent>>
pub async fn fetch_outbox_for_run_since( &self, run_id: Uuid, after_id: i64, limit: i64, ) -> Result<Vec<OutboxEvent>>
Returns outbox events for a run that were created after the given id.
Sourcepub async fn fetch_last_outbox_event(
&self,
run_id: Uuid,
) -> Result<Option<OutboxEvent>>
pub async fn fetch_last_outbox_event( &self, run_id: Uuid, ) -> Result<Option<OutboxEvent>>
Returns the most recent outbox event for a run, if any.
Sourcepub async fn fetch_tap_offset(
&self,
run_id: Uuid,
cursor_id: &str,
) -> Result<Option<i64>>
pub async fn fetch_tap_offset( &self, run_id: Uuid, cursor_id: &str, ) -> Result<Option<i64>>
Fetch the last acknowledged offset for a subscription cursor, if present.
Sourcepub async fn upsert_tap_offset(
&self,
run_id: Uuid,
cursor_id: &str,
offset: i64,
) -> Result<()>
pub async fn upsert_tap_offset( &self, run_id: Uuid, cursor_id: &str, offset: i64, ) -> Result<()>
Advance the stored offset for a subscription cursor.
Sourcepub async fn apply_retention_policy(
&self,
cutoff: DateTime<Utc>,
) -> Result<RetentionStats>
pub async fn apply_retention_policy( &self, cutoff: DateTime<Utc>, ) -> Result<RetentionStats>
Applies retention policy by scrubbing input/output payloads for stale runs.
Sourcepub async fn try_consume_budget(
&self,
run_id: Uuid,
tokens: i64,
cost: f64,
) -> Result<BudgetResult>
pub async fn try_consume_budget( &self, run_id: Uuid, tokens: i64, cost: f64, ) -> Result<BudgetResult>
Attempts to consume the requested budget; returns granted totals or denial.
Sourcepub async fn fetch_budget(&self, run_id: Uuid) -> Result<Option<Budget>>
pub async fn fetch_budget(&self, run_id: Uuid) -> Result<Option<Budget>>
Fetches the current budget totals for a run, if configured.
Sourcepub async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<Step>>
pub async fn fetch_steps_for_run(&self, run_id: Uuid) -> Result<Vec<Step>>
Returns all step rows for a run ordered by index.
Sourcepub async fn mark_step_running(
&self,
run_id: Uuid,
step_id: Uuid,
worker_id: &str,
input_snapshot: Option<&Value>,
provider: Option<&str>,
provider_version: Option<&str>,
) -> Result<()>
pub async fn mark_step_running( &self, run_id: Uuid, step_id: Uuid, worker_id: &str, input_snapshot: Option<&Value>, provider: Option<&str>, provider_version: Option<&str>, ) -> Result<()>
Marks a leased step as running for the provided worker.
Sourcepub async fn update_step_estimate(
&self,
run_id: Uuid,
step_id: Uuid,
estimated_tokens: i64,
estimated_cost: f64,
) -> Result<()>
pub async fn update_step_estimate( &self, run_id: Uuid, step_id: Uuid, estimated_tokens: i64, estimated_cost: f64, ) -> Result<()>
Updates the stored budget estimate for a step.
Sourcepub async fn update_step_status_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
step_id: Uuid,
status: StepStatus,
attempt: Option<i32>,
output_json: Option<Value>,
error_json: Option<Value>,
input_snapshot: Option<Value>,
output_snapshot: Option<Value>,
checkpoint: Option<Value>,
provider: Option<&str>,
provider_version: Option<&str>,
) -> Result<()>
pub async fn update_step_status_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, status: StepStatus, attempt: Option<i32>, output_json: Option<Value>, error_json: Option<Value>, input_snapshot: Option<Value>, output_snapshot: Option<Value>, checkpoint: Option<Value>, provider: Option<&str>, provider_version: Option<&str>, ) -> Result<()>
Updates the status, snapshot metadata, and output fields for a step.
pub async fn fetch_step_attempts_for_run( &self, run_id: Uuid, ) -> Result<Vec<StepAttempt>>
Sourcepub async fn update_step_usage_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
step_id: Uuid,
actual_tokens: Option<i64>,
actual_cost: Option<f64>,
) -> Result<()>
pub async fn update_step_usage_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, actual_tokens: Option<i64>, actual_cost: Option<f64>, ) -> Result<()>
Persists the actual resource usage recorded for a step.
Sourcepub async fn reconcile_budget_usage_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
tokens_delta: i64,
cost_delta: f64,
) -> Result<()>
pub async fn reconcile_budget_usage_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, tokens_delta: i64, cost_delta: f64, ) -> Result<()>
Adjusts budget totals based on the difference between reserved and actual usage.
Sourcepub async fn insert_run(&self, run: NewRun) -> Result<Uuid>
pub async fn insert_run(&self, run: NewRun) -> Result<Uuid>
Inserts a run row with deterministic state materialization.
Sourcepub async fn insert_run_with_steps(
&self,
run: NewRun,
steps: &[NewStep],
edges: &[(Uuid, Uuid)],
) -> Result<Uuid>
pub async fn insert_run_with_steps( &self, run: NewRun, steps: &[NewStep], edges: &[(Uuid, Uuid)], ) -> Result<Uuid>
Inserts a run and its associated steps within a single transaction.
Sourcepub async fn unlock_successors_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
step_id: Uuid,
) -> Result<Vec<Uuid>>
pub async fn unlock_successors_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, ) -> Result<Vec<Uuid>>
Decrements dependency counters for successors and queues newly-ready steps.
Sourcepub async fn skip_downstream_steps_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
failed_step_id: Uuid,
) -> Result<Vec<Uuid>>
pub async fn skip_downstream_steps_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, failed_step_id: Uuid, ) -> Result<Vec<Uuid>>
Marks all downstream blocked steps as skipped when a dependency fails.
Sourcepub async fn refresh_run_status_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
) -> Result<(RunStatus, RunStatus)>
pub async fn refresh_run_status_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, ) -> Result<(RunStatus, RunStatus)>
Recomputes the run status based on current step outcomes.
Sourcepub async fn insert_step(&self, step: NewStep) -> Result<()>
pub async fn insert_step(&self, step: NewStep) -> Result<()>
Inserts a step row tied to a run’s DAG.
Sourcepub async fn fetch_run(&self, run_id: Uuid) -> Result<Option<Run>>
pub async fn fetch_run(&self, run_id: Uuid) -> Result<Option<Run>>
Fetches a single run record by ID.
pub async fn update_run_input_ctx( &self, run_id: Uuid, input_ctx: Value, ) -> Result<()>
Sourcepub async fn update_run_artifact_transparency(
&self,
run_id: Uuid,
artifact_key: &str,
transparency: Value,
) -> Result<()>
pub async fn update_run_artifact_transparency( &self, run_id: Uuid, artifact_key: &str, transparency: Value, ) -> Result<()>
Updates the transparency receipt embedded in a run-level artifact entry.
Sourcepub async fn update_step_artifact_transparency(
&self,
run_id: Uuid,
step_id: Uuid,
artifact_key: &str,
transparency: Value,
) -> Result<()>
pub async fn update_step_artifact_transparency( &self, run_id: Uuid, step_id: Uuid, artifact_key: &str, transparency: Value, ) -> Result<()>
Updates the transparency receipt for a step-level artifact stored in steps.output_json.
pub async fn update_run_input_ctx_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, input_ctx: Value, ) -> Result<()>
pub async fn set_run_status( &self, run_id: Uuid, status: RunStatus, ) -> Result<()>
pub async fn set_run_status_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, status: RunStatus, ) -> Result<()>
Sourcepub async fn fetch_run_tenant(&self, run_id: Uuid) -> Result<Option<String>>
pub async fn fetch_run_tenant(&self, run_id: Uuid) -> Result<Option<String>>
Returns the tenant label for the given run, if one is set.
Sourcepub async fn enforce_tenant_quota(
&self,
tenant: &str,
tokens: i64,
cost: f64,
) -> Result<()>
pub async fn enforce_tenant_quota( &self, tenant: &str, tokens: i64, cost: f64, ) -> Result<()>
Ensures the provided tenant does not exceed the configured daily quota.
Sourcepub async fn record_billing_usage_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
run_id: Uuid,
tokens: i64,
cost: f64,
) -> Result<()>
pub async fn record_billing_usage_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, tokens: i64, cost: f64, ) -> Result<()>
Records billing usage for a run inside an existing transaction.
Sourcepub async fn list_recent_runs(&self, limit: i64) -> Result<Vec<Run>>
pub async fn list_recent_runs(&self, limit: i64) -> Result<Vec<Run>>
Lists recent runs ordered by creation time.
Sourcepub async fn fetch_run_by_idempotency_key(
&self,
idempotency_key: &str,
) -> Result<Option<Run>>
pub async fn fetch_run_by_idempotency_key( &self, idempotency_key: &str, ) -> Result<Option<Run>>
Fetches a run by idempotency key if present.
Sourcepub async fn upsert_artifact(&self, artifact: NewArtifact) -> Result<()>
pub async fn upsert_artifact(&self, artifact: NewArtifact) -> Result<()>
Persists artifact metadata while blob bytes live in the object store.
Sourcepub async fn enqueue_outbox_event(
&self,
tx: &mut Transaction<'_, Postgres>,
event: NewOutboxEvent,
) -> Result<i64>
pub async fn enqueue_outbox_event( &self, tx: &mut Transaction<'_, Postgres>, event: NewOutboxEvent, ) -> Result<i64>
Appends an outbox event within an existing transaction for exactly-once delivery.
Sourcepub async fn claim_queued_steps(
&self,
limit: i64,
worker_id: &str,
) -> Result<Vec<Step>>
pub async fn claim_queued_steps( &self, limit: i64, worker_id: &str, ) -> Result<Vec<Step>>
Claims queued steps using FOR UPDATE SKIP LOCKED for safe parallel scheduling.
pub async fn revert_step_to_queue( &self, run_id: Uuid, step_id: Uuid, ) -> Result<()>
pub async fn revert_step_to_queue_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, ) -> Result<()>
pub async fn update_step_spec( &self, run_id: Uuid, step_id: Uuid, spec_json: Value, ) -> Result<()>
pub async fn update_step_spec_tx( &self, tx: &mut Transaction<'_, Postgres>, run_id: Uuid, step_id: Uuid, spec_json: Value, ) -> Result<()>
Source§impl Storage
impl Storage
Sourcepub async fn upsert_eval_scenario(
&self,
name: &str,
description: Option<&str>,
spec: Value,
) -> Result<Uuid>
pub async fn upsert_eval_scenario( &self, name: &str, description: Option<&str>, spec: Value, ) -> Result<Uuid>
Registers or updates an evaluation scenario definition.
Sourcepub async fn fetch_eval_scenario_by_name(
&self,
name: &str,
) -> Result<Option<EvalScenario>>
pub async fn fetch_eval_scenario_by_name( &self, name: &str, ) -> Result<Option<EvalScenario>>
Fetches an evaluation scenario by its unique name.
Sourcepub async fn list_eval_scenarios(&self) -> Result<Vec<EvalScenario>>
pub async fn list_eval_scenarios(&self) -> Result<Vec<EvalScenario>>
Lists all evaluation scenarios ordered by name.
Sourcepub async fn latest_eval_result(
&self,
scenario_id: Uuid,
) -> Result<Option<EvalResult>>
pub async fn latest_eval_result( &self, scenario_id: Uuid, ) -> Result<Option<EvalResult>>
Returns the latest evaluation result for a scenario, if any.
Sourcepub async fn list_eval_results(
&self,
scenario_id: Uuid,
limit: i64,
) -> Result<Vec<EvalResult>>
pub async fn list_eval_results( &self, scenario_id: Uuid, limit: i64, ) -> Result<Vec<EvalResult>>
Lists evaluation results for a scenario, newest first.
Sourcepub async fn insert_eval_result(
&self,
scenario_id: Uuid,
run_id: Uuid,
outcome: &str,
metrics: Value,
) -> Result<EvalResult>
pub async fn insert_eval_result( &self, scenario_id: Uuid, run_id: Uuid, outcome: &str, metrics: Value, ) -> Result<EvalResult>
Inserts or replaces an evaluation result for the given scenario/run pair.
Sourcepub async fn insert_change_gate(
&self,
gate: NewChangeGate,
) -> Result<ChangeGate>
pub async fn insert_change_gate( &self, gate: NewChangeGate, ) -> Result<ChangeGate>
Records a ChangeOps gate decision and returns the stored record.
Sourcepub async fn update_change_gate_metadata(
&self,
gate_id: Uuid,
metadata: Value,
) -> Result<()>
pub async fn update_change_gate_metadata( &self, gate_id: Uuid, metadata: Value, ) -> Result<()>
Replaces the metadata JSON stored for a change gate decision.
Sourcepub async fn list_change_gates_for_change(
&self,
change_id: &str,
limit: i64,
) -> Result<Vec<ChangeGate>>
pub async fn list_change_gates_for_change( &self, change_id: &str, limit: i64, ) -> Result<Vec<ChangeGate>>
Fetches the latest gate decisions for a change id, returning at most limit records.
Sourcepub async fn fetch_change_gate(
&self,
gate_id: Uuid,
) -> Result<Option<ChangeGate>>
pub async fn fetch_change_gate( &self, gate_id: Uuid, ) -> Result<Option<ChangeGate>>
Retrieves a specific gate decision by gate id.
Sourcepub async fn enqueue_transparency_job(
&self,
payload: TransparencyJobPayload,
) -> Result<i64>
pub async fn enqueue_transparency_job( &self, payload: TransparencyJobPayload, ) -> Result<i64>
Enqueues a transparency job for asynchronous publishing.
Sourcepub async fn fetch_pending_transparency_jobs(
&self,
limit: i64,
) -> Result<Vec<TransparencyJob>>
pub async fn fetch_pending_transparency_jobs( &self, limit: i64, ) -> Result<Vec<TransparencyJob>>
Returns pending transparency jobs up to the requested limit.
Sourcepub async fn fetch_transparency_jobs_by_ids(
&self,
job_ids: &[i64],
) -> Result<Vec<TransparencyJob>>
pub async fn fetch_transparency_jobs_by_ids( &self, job_ids: &[i64], ) -> Result<Vec<TransparencyJob>>
Fetches transparency jobs by explicit identifiers.
Sourcepub async fn mark_transparency_job_processed(
&self,
job_id: i64,
receipt: Value,
) -> Result<()>
pub async fn mark_transparency_job_processed( &self, job_id: i64, receipt: Value, ) -> Result<()>
Records a successful transparency job along with the receipt payload.
Sourcepub async fn mark_transparency_job_failed(
&self,
job_id: i64,
error: String,
) -> Result<()>
pub async fn mark_transparency_job_failed( &self, job_id: i64, error: String, ) -> Result<()>
Marks a transparency job failure so the writer can retry later.
Sourcepub async fn insert_change_gate_followup(
&self,
followup: NewChangeGateFollowup,
) -> Result<ChangeGateFollowup>
pub async fn insert_change_gate_followup( &self, followup: NewChangeGateFollowup, ) -> Result<ChangeGateFollowup>
Records a follow-up acknowledgement for a gate.
Sourcepub async fn fetch_change_gate_followups(
&self,
gate_id: Uuid,
) -> Result<Vec<ChangeGateFollowup>>
pub async fn fetch_change_gate_followups( &self, gate_id: Uuid, ) -> Result<Vec<ChangeGateFollowup>>
Fetches all follow-up records for a gate in chronological order.
Trait Implementations§
Source§impl RunStore for Storage
impl RunStore for Storage
fn insert_run_with_steps<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
new_run: NewRun,
steps: &'life1 [NewStep],
edges: &'life2 [(Uuid, Uuid)],
) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn fetch_run<'life0, 'async_trait>(
&'life0 self,
run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Option<Run>>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn fetch_steps_for_run<'life0, 'async_trait>(
&'life0 self,
run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Vec<Step>>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Auto Trait Implementations§
impl Freeze for Storage
impl !RefUnwindSafe for Storage
impl Send for Storage
impl Sync for Storage
impl Unpin for Storage
impl !UnwindSafe for Storage
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request