fleetforge_runtime/
lib.rs

1//! FleetForge runtime state machine and orchestrator components.
2
3pub mod adapters;
4pub mod aibom;
5mod capability;
6pub mod executor;
7pub mod features;
8pub mod gateway;
9pub mod guardrails;
10pub mod memory;
11pub mod model;
12pub mod otel;
13pub mod policy;
14pub mod replay;
15pub mod scheduler;
16
17use std::sync::Arc;
18
19use anyhow::{anyhow, Result};
20use chrono::{Duration as ChronoDuration, Utc};
21use fleetforge_bus::{run_outbox_forwarder_with_exporters, BusConfig};
22use fleetforge_common::licensing::{feature_allowed, LicensedFeature};
23use fleetforge_runtime_api::start_api_server;
24use fleetforge_storage::retention::{
25    delete_expired as purge_retention, RetentionConfig as StorageRetentionConfig,
26};
27use fleetforge_storage::Storage;
28use fleetforge_telemetry::{audit::Audit, TelemetryGuard};
29use tracing::{info, warn};
30
31pub use adapters::{AgentRunAction, AgentRunAdapter, AgentRunContext, AgentRunEnvelope};
32#[cfg(feature = "insecure-shell")]
33pub use executor::ShellExecutor;
34pub use executor::{
35    BudgetCtx, DeterministicReplayExecutor, DockerToolExecutor, ExecutorRegistry,
36    FirecrackerToolExecutor, HttpProxyExecutor, LangGraphAgentExecutor, LlmExecutor, ResourceUsage,
37    StepCtx, StepExecutionResult, StepExecutor, ToolRouterExecutor,
38};
39pub use features::{set_trust_mesh_alpha, trust_mesh_alpha_enabled};
40pub use fleetforge_common::schemas::{
41    AGENT_RUN_SCHEMA_JSON, ARTIFACT_SCHEMA_JSON, RUN_EVENT_SCHEMA_JSON, RUN_SCHEMA_JSON,
42    RUN_SPEC_SCHEMA_JSON, STEP_SCHEMA_JSON,
43};
44pub use fleetforge_common::validation::{
45    validate_agent_run, validate_artifact, validate_run, validate_run_event, validate_run_spec,
46    validate_step_spec, SchemaErrorDetail, SchemaValidationError, StepSpecError,
47    StepSpecErrorDetail,
48};
49pub use fleetforge_policy::{
50    shared_default_pack, AllowAllPolicy, BasicPiiPolicy, Decision as PolicyDecision, PolicyEngine,
51    PolicyRequest, RegulatedPack, RegulatedVertical,
52};
53pub use fleetforge_prompt::{
54    CompiledPrompt, ModelResponse, ModelUsage, PromptCompiler, PromptRegistry,
55};
56pub use fleetforge_trust::{
57    digest_bytes, digest_json, Attestation, AttestationVault, InMemoryAttestationVault,
58    ObjectStoreAttestationVault, Trust, TrustBoundary, TrustDecision, TrustOrigin, TrustSource,
59    TrustSubject, TrustVerdict, Trusted, Untrusted, TRUST_MESH_ALPHA_FLAG,
60};
61pub use gateway::speech::{NoopStt, NoopTts, SpeechToText, TextToSpeech};
62pub use gateway::{
63    anthropic::{AnthropicConfig, AnthropicLanguageModel},
64    openai::{OpenAiConfig, OpenAiLanguageModel},
65    GatewayRegistry, LanguageModel, ToolAdapter,
66};
67pub use guardrails::{OutputGuardPolicy, PolicyBundle, PromptInjectionFallbackPolicy};
68pub use memory::{InMemoryAdapter, MemoryAdapter, MemoryRecord, VectorMatch, VectorMemoryAdapter};
69pub use model::{
70    ChatMessage, ChatRole, ContextSource, LlmInputs, QueuedStep, ResponseSchema, RunId, RunStatus,
71    StepId, StepSpec, StepStatus, StepType, ToolChoice, ToolSpec,
72};
73pub use policy::{enforce_policy, PolicyOutcome, RuntimePolicyPack};
74pub use replay::{
75    DriftTolerance, ReplayExecutor, ReplayOutcome, ReplayStrategy, StepDrift, StepReplay,
76};
77pub use scheduler::{Scheduler, SchedulerConfig};
78
79#[cfg(test)]
80mod memory_test;
81
82#[cfg(test)]
83mod e2e {
84    use super::*;
85    use crate::replay::{DriftTolerance, ReplayExecutor, ReplayStrategy};
86    use anyhow::{anyhow, Context, Result as AnyResult};
87    use base64::engine::general_purpose::STANDARD as BASE64;
88    use base64::Engine;
89    use chrono::Utc;
90    use clickhouse::Client;
91    use fleetforge_common::prost_json::{json_to_prost_struct, prost_struct_to_json};
    use fleetforge_contracts::runtime::{
        self, runtime_service_client::RuntimeServiceClient, GetRunRequest, RunSpec,
        SubmitRunRequest,
    };
95    use fleetforge_contracts::tap::{
96        self, subscribe_run_request, tap_service_client::TapServiceClient, RunEventEnvelope,
97        SubscribeRunAck, SubscribeRunOpen, SubscribeRunRequest,
98    };
99    use fleetforge_policy::AllowAllPolicy;
100    use fleetforge_storage::{models, object_store::ObjectStoreConfig, StorageConfig};
101    use fleetforge_telemetry::audit::Audit;
102    use fleetforge_telemetry::otel::{self as telemetry_otel, Value as OtelValue};
103    use fleetforge_trust::{verify_capability_token, CapabilityToken, TrustSubject};
104    use hmac::{Hmac, Mac};
105    use object_store::path::Path;
106    use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
107    use opentelemetry_sdk::trace;
108    use reqwest::StatusCode;
109    use serde_json::{json, Value};
110    use sha2::Sha256;
111    use sqlx::Row;
112    use std::{
113        collections::{BTreeMap, HashMap, HashSet},
114        fs,
115        net::TcpListener,
116        path::PathBuf,
117        sync::{Arc, Mutex},
118        time::Duration,
119    };
120    use testcontainers::clients::Cli;
121    use testcontainers::core::WaitFor;
122    use testcontainers::images::generic::GenericImage;
123    use testcontainers::images::postgres::Postgres;
124    use tokio::sync::mpsc;
125    use tokio::time::sleep;
126    use tokio_stream::wrappers::ReceiverStream;
127    use tonic::metadata::MetadataValue;
128    use tracing::subscriber::DefaultGuard;
129    use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
130    use uuid::Uuid;
131
132    struct EchoToolExecutor;
133
134    #[async_trait::async_trait]
135    impl StepExecutor for EchoToolExecutor {
136        fn kind(&self) -> &'static str {
137            "tool"
138        }
139
140        async fn execute(&self, ctx: &StepCtx) -> anyhow::Result<StepExecutionResult> {
141            let command = ctx
142                .untrusted_inputs
143                .as_ref()
144                .get("command")
145                .and_then(Value::as_array)
146                .ok_or_else(|| anyhow!("missing command array"))?;
147            let message = command
148                .iter()
149                .skip(1)
150                .filter_map(Value::as_str)
151                .collect::<Vec<_>>()
152                .join(" ");
153            let stdout = format!("{}\n", message);
154            let output = json!({
155                "stdout": stdout,
156                "stderr": "",
157                "exit_code": 0,
158            });
159
160            Ok(StepExecutionResult::with_usage(
161                output,
162                ResourceUsage {
163                    tokens: Some(100),
164                    cost: Some(0.001),
165                },
166            ))
167        }
168    }
169
170    fn repo_root() -> PathBuf {
171        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
172            .join("..")
173            .join("..")
174    }
175
176    fn load_agent_team_spec() -> AnyResult<(
177        String,
178        Option<prost_types::Struct>,
179        i64,
180        HashMap<String, String>,
181    )> {
182        let pack_dir = repo_root().join("examples/_packs/demo-pack/agent_team");
183        let spec_value: Value =
184            serde_json::from_str(&fs::read_to_string(pack_dir.join("run_spec.json"))?)?;
185
186        let dag_value = match spec_value.get("dag_json").cloned().unwrap_or(Value::Null) {
187            Value::String(path) if path.starts_with("@file:") => {
188                let dag_path = pack_dir.join(path.trim_start_matches("@file:"));
189                serde_json::from_str(&fs::read_to_string(dag_path)?)?
190            }
191            other => other,
192        };
193
194        let dag_json = serde_json::to_string(&dag_value)?;
195
196        let inputs_struct = match spec_value.get("inputs") {
197            Some(Value::Null) | None => None,
198            Some(value) => Some(json_to_prost_struct(value)?),
199        };
200
201        let seed = spec_value
202            .get("seed")
203            .and_then(Value::as_i64)
204            .ok_or_else(|| anyhow!("run_spec.json must include an explicit integer seed"))?;
205        if seed <= 0 {
206            return Err(anyhow!("run_spec.json seed must be a positive integer"));
207        }
208
209        let labels = spec_value
210            .get("labels")
211            .and_then(Value::as_object)
212            .map(|map| {
213                map.iter()
214                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
215                    .collect::<HashMap<_, _>>()
216            })
217            .unwrap_or_default();
218
219        Ok((dag_json, inputs_struct, seed, labels))
220    }
221
222    fn reserve_port() -> u16 {
223        TcpListener::bind("127.0.0.1:0")
224            .expect("bind local port")
225            .local_addr()
226            .unwrap()
227            .port()
228    }
229
230    fn docker_available() -> bool {
231        std::env::var("DOCKER_HOST").is_ok() || fs::metadata("/var/run/docker.sock").is_ok()
232    }
233
234    async fn wait_for_clickhouse(client: &Client) -> AnyResult<()> {
235        const RETRIES: usize = 10;
236        for attempt in 0..RETRIES {
237            match client.query("SELECT 1").fetch_one::<(u64,)>().await {
238                Ok(_) => return Ok(()),
239                Err(err) if attempt + 1 < RETRIES => {
240                    tokio::time::sleep(Duration::from_millis(200)).await;
241                    eprintln!("waiting for ClickHouse readiness: {err}");
242                }
243                Err(err) => return Err(err.into()),
244            }
245        }
246        Ok(())
247    }
248
249    async fn wait_for_run(
250        client: &mut RuntimeServiceClient<tonic::transport::Channel>,
251        run_id: &str,
252    ) -> AnyResult<runtime::RunDetail> {
253        for _ in 0..40 {
254            let mut request = tonic::Request::new(GetRunRequest {
255                run_id: run_id.to_string(),
256            });
257            request.metadata_mut().insert(
258                "authorization",
259                MetadataValue::try_from("Bearer demo-reader")?,
260            );
261            let response = client.get_run(request).await?.into_inner();
262            if let Some(run) = response.run {
263                if run.status == runtime::RunStatus::RunStatusSucceeded as i32 {
264                    return Ok(run);
265                }
266            }
267            sleep(Duration::from_millis(250)).await;
268        }
269        Err(anyhow!("run did not succeed in time"))
270    }
271
272    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
273    async fn golden_path_api_surface_acceptance() -> AnyResult<()> {
274        if !docker_available() {
275            eprintln!("Skipping API surface acceptance test because Docker is unavailable");
276            return Ok(());
277        }
278
279        let docker = Cli::default();
280        let postgres = docker.run(Postgres::default());
281        let pg_port = postgres.get_host_port_ipv4(5432);
282        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
283        sleep(Duration::from_millis(250)).await;
284
285        let clickhouse_image = GenericImage::new("clickhouse/clickhouse-server", "23.11")
286            .with_wait_for(WaitFor::message("Ready for connections"));
287        let clickhouse = docker.run(clickhouse_image);
288        let ch_port = clickhouse.get_host_port_ipv4(8123);
289        let ch_url = format!("http://127.0.0.1:{ch_port}");
290
291        let admin_client = Client::default()
292            .with_url(ch_url.clone())
293            .with_database("default");
294        wait_for_clickhouse(&admin_client).await?;
295        admin_client
296            .command("CREATE DATABASE IF NOT EXISTS telemetry")
297            .execute()
298            .await?;
299        admin_client
300            .command(
301                "CREATE TABLE IF NOT EXISTS telemetry.run_events (\
302                    run_id UUID, \
303                    created_at DateTime64(3, 'UTC'), \
304                    status String, \
305                    attributes JSON\
306                ) ENGINE=MergeTree ORDER BY (created_at, run_id)",
307            )
308            .execute()
309            .await?;
310        admin_client
311            .command(
312                "CREATE TABLE IF NOT EXISTS telemetry.step_events (\
313                    run_id UUID, \
314                    step_id UUID, \
315                    status String, \
316                    attempt Int32, \
317                    started_at DateTime64(3, 'UTC'), \
318                    finished_at Nullable(DateTime64(3, 'UTC')), \
319                    attributes JSON\
320                ) ENGINE=MergeTree ORDER BY (started_at, run_id, step_id)",
321            )
322            .execute()
323            .await?;
324
325        let storage = Arc::new(
326            Storage::connect(StorageConfig {
327                database_url,
328                max_connections: 4,
329                connect_timeout: Duration::from_secs(10),
330                object_store: ObjectStoreConfig::InMemory,
331            })
332            .await?,
333        );
334        let audit = Arc::new(Audit::new(storage.pool().clone()));
335        let attestation_vault: Arc<dyn AttestationVault> = Arc::new(
336            ObjectStoreAttestationVault::new(storage.object_store(), storage.pool().clone()),
337        );
338
339        let mut registry = ExecutorRegistry::new();
340        registry.register(Arc::new(EchoToolExecutor))?;
341
342        let scheduler = Arc::new(Scheduler::new(
343            Arc::clone(&storage),
344            Arc::new(registry),
345            AllowAllPolicy::shared(),
346            Arc::clone(&audit),
347            Arc::clone(&attestation_vault),
348            SchedulerConfig::default(),
349        ));
350
351        struct TaskAbortOnDrop(tokio::task::JoinHandle<()>);
352        impl TaskAbortOnDrop {
353            fn abort(&mut self) {
354                self.0.abort();
355            }
356        }
357        impl Drop for TaskAbortOnDrop {
358            fn drop(&mut self) {
359                self.0.abort();
360            }
361        }
362
363        let scheduler_handle = tokio::spawn({
364            let scheduler = Arc::clone(&scheduler);
365            async move {
366                scheduler.run().await.expect("scheduler loop");
367            }
368        });
369        let mut scheduler_handle = TaskAbortOnDrop(scheduler_handle);
370
371        let grpc_port = reserve_port();
372        let http_port = reserve_port();
373        std::env::set_var(
374            "FLEETFORGE_API_TOKENS",
375            "writer=demo-writer,reader=demo-reader",
376        );
377        std::env::set_var("FLEETFORGE_API_ADDR", format!("127.0.0.1:{grpc_port}"));
378        std::env::set_var("FLEETFORGE_HTTP_ADDR", format!("127.0.0.1:{http_port}"));
379        std::env::set_var("CLICKHOUSE_DSN", ch_url.clone());
380        std::env::set_var("CLICKHOUSE_DATABASE", "telemetry");
381        std::env::set_var("CLICKHOUSE_RUN_TABLE", "run_events");
382        std::env::set_var("CLICKHOUSE_STEP_TABLE", "step_events");
383
384        struct EnvCleanup;
385        impl Drop for EnvCleanup {
386            fn drop(&mut self) {
387                for key in [
388                    "FLEETFORGE_API_TOKENS",
389                    "FLEETFORGE_API_ADDR",
390                    "FLEETFORGE_HTTP_ADDR",
391                    "CLICKHOUSE_DSN",
392                    "CLICKHOUSE_DATABASE",
393                    "CLICKHOUSE_RUN_TABLE",
394                    "CLICKHOUSE_STEP_TABLE",
395                ] {
396                    std::env::remove_var(key);
397                }
398            }
399        }
400        let _env_guard = EnvCleanup;
401
402        let api_storage = Arc::clone(&storage);
403        let api_audit = Arc::clone(&audit);
404        let api_handle = tokio::spawn(async move {
405            if let Err(err) = start_api_server(api_storage, api_audit).await {
406                eprintln!("API server exited: {err:?}");
407            }
408        });
409        let mut api_handle = TaskAbortOnDrop(api_handle);
410
411        sleep(Duration::from_millis(500)).await;
412
413        let dag_step_id = "11111111-1111-4111-8111-111111111111";
414        let dag_value = json!({
415            "steps": [{
416                "id": dag_step_id,
417                "type": "tool",
418                "inputs": {
419                    "command": ["echo", "hello", "acceptance"]
420                }
421            }],
422            "edges": []
423        });
424        let dag_json_string = serde_json::to_string(&dag_value)?;
425
426        let http_client = reqwest::Client::new();
427        let submit_body = json!({
428            "dag_json": dag_value,
429            "inputs": {},
430            "seed": 42,
431            "labels": { "purpose": "acceptance" }
432        });
433
434        let submit_response = http_client
435            .post(format!("http://127.0.0.1:{http_port}/v1/runs"))
436            .header("Authorization", "Bearer demo-writer")
437            .json(&submit_body)
438            .send()
439            .await?;
440        assert_eq!(submit_response.status(), StatusCode::CREATED);
441        let submit_json: Value = submit_response.json().await?;
442        let run_id_http = submit_json["run_id"]
443            .as_str()
444            .ok_or_else(|| anyhow!("missing run_id in HTTP response"))?
445            .to_string();
446
447        let detail_response = http_client
448            .get(format!(
449                "http://127.0.0.1:{http_port}/v1/runs/{run_id_http}"
450            ))
451            .header("Authorization", "Bearer demo-reader")
452            .send()
453            .await?;
454        assert_eq!(detail_response.status(), StatusCode::OK);
455        let detail_json: Value = detail_response.json().await?;
456        assert_eq!(
457            detail_json["seed"].as_i64(),
458            Some(42),
459            "HTTP readback should preserve seed"
460        );
461
462        let endpoint = format!("http://127.0.0.1:{grpc_port}");
463        let mut runtime_client = RuntimeServiceClient::connect(endpoint.clone()).await?;
464        let run_detail = wait_for_run(&mut runtime_client, &run_id_http).await?;
465        assert_eq!(
466            run_detail.seed, 42,
467            "gRPC GetRun should return the same seed"
468        );
469        let run_uuid = Uuid::parse_str(&run_id_http)?;
470        let steps = storage
471            .fetch_steps_for_run(run_uuid)
472            .await
473            .context("fetch steps for run")?;
474        let first_step = steps.first().context("expected recorded step")?;
475
476        let mut tap_client = TapServiceClient::connect(endpoint.clone()).await?;
477        let (mut tx, rx) = mpsc::channel::<SubscribeRunRequest>(32);
478        tx.send(SubscribeRunRequest {
479            request: Some(subscribe_run_request::Request::Open(SubscribeRunOpen {
480                run_ids: vec![run_id_http.clone()],
481                step_ids: Vec::new(),
482                kinds: Vec::new(),
483                severities: Vec::new(),
484                tags: Vec::new(),
485                since_offset: 0,
486                batch_size: 128,
487                max_inflight: 64,
488                heartbeat_interval_ms: 1_000,
489                cursor: "".into(),
490            })),
491        })
492        .await?;
493        let outbound = ReceiverStream::new(rx);
494        let mut tap_request = tonic::Request::new(outbound);
495        tap_request.metadata_mut().insert(
496            "authorization",
497            MetadataValue::try_from("Bearer demo-reader")?,
498        );
499        let mut stream = tap_client.subscribe_run(tap_request).await?.into_inner();
500        let mut saw_event = false;
501        while let Some(mut response) = stream.message().await? {
502            if response.heartbeat {
503                continue;
504            }
505            saw_event = true;
506            let offset = response.offset;
507            if offset > 0 {
508                tx.send(SubscribeRunRequest {
509                    request: Some(subscribe_run_request::Request::Ack(SubscribeRunAck {
510                        offset,
511                    })),
512                })
513                .await?;
514            }
515            break;
516        }
517        drop(tx);
518        assert!(saw_event, "expected tap stream to yield events");
519
520        #[derive(Clone, serde::Serialize, clickhouse::Row)]
521        struct RunEventInsert {
522            run_id: Uuid,
523            created_at: chrono::DateTime<Utc>,
524            status: String,
525            attributes: Value,
526        }
527
528        #[derive(Clone, serde::Serialize, clickhouse::Row)]
529        struct StepEventInsert {
530            run_id: Uuid,
531            step_id: Uuid,
532            status: String,
533            attempt: i32,
534            started_at: chrono::DateTime<Utc>,
535            finished_at: Option<chrono::DateTime<Utc>>,
536            attributes: Value,
537        }
538
539        let telemetry_client = Client::default()
540            .with_url(ch_url.clone())
541            .with_database("telemetry");
542        let now = Utc::now();
543
544        let mut run_insert = telemetry_client.insert("telemetry.run_events")?;
545        run_insert
546            .write(&RunEventInsert {
547                run_id: run_uuid,
548                created_at: now,
549                status: "succeeded".to_string(),
550                attributes: json!({ "source": "acceptance" }),
551            })
552            .await?;
553        run_insert.end().await?;
554
555        let mut step_insert = telemetry_client.insert("telemetry.step_events")?;
556        step_insert
557            .write(&StepEventInsert {
558                run_id: run_uuid,
559                step_id: first_step.step_id,
560                status: "succeeded".to_string(),
561                attempt: first_step.attempt,
562                started_at: first_step.created_at,
563                finished_at: Some(Utc::now()),
564                attributes: json!({ "source": "acceptance" }),
565            })
566            .await?;
567        step_insert.end().await?;
568
569        let mut tap_client = TapServiceClient::connect(endpoint.clone()).await?;
570        let mut query_request = tap::QueryEventsRequest {
571            run_ids: vec![run_id_http.clone()],
572            workspace_ids: Vec::new(),
573            kinds: Vec::new(),
574            policy_effects: Vec::new(),
575            start_time: None,
576            end_time: None,
577            page_size: 10,
578            page_token: String::new(),
579            source: tap::QueryEventSource::QueryEventSourceRun as i32,
580            step_ids: Vec::new(),
581            ascending: true,
582            severities: Vec::new(),
583        };
584        let mut query_req = tonic::Request::new(query_request);
585        query_req.metadata_mut().insert(
586            "authorization",
587            MetadataValue::try_from("Bearer demo-reader")?,
588        );
589        let query_response = tap_client.query_events(query_req).await?.into_inner();
590        assert!(
591            !query_response.events.is_empty(),
592            "expected ClickHouse-backed QueryEvents response"
593        );
594
595        let mut labels = HashMap::new();
596        labels.insert("purpose".to_string(), "breakpoint".to_string());
597        let run2_spec = RunSpec {
598            dag_json: dag_json_string.clone(),
599            inputs: None,
600            seed: 99,
601            labels,
602            breakpoints: vec![dag_step_id.to_string()],
603            ..Default::default()
604        };
605
606        let mut submit_req = tonic::Request::new(SubmitRunRequest {
607            spec: Some(run2_spec),
608            idempotency_key: String::new(),
609        });
610        submit_req.metadata_mut().insert(
611            "authorization",
612            MetadataValue::try_from("Bearer demo-writer")?,
613        );
614        let run2_id = runtime_client
615            .submit_run(submit_req)
616            .await?
617            .into_inner()
618            .run_id;
619
620        let mut paused = false;
621        for _ in 0..40 {
622            let mut get_req = tonic::Request::new(GetRunRequest {
623                run_id: run2_id.clone(),
624            });
625            get_req.metadata_mut().insert(
626                "authorization",
627                MetadataValue::try_from("Bearer demo-reader")?,
628            );
629            let response = runtime_client.get_run(get_req).await?.into_inner();
630            if let Some(run) = response.run {
631                if run.status == runtime::RunStatus::RunStatusPausedAtStep as i32 {
632                    paused = true;
633                    break;
634                }
635            }
636            sleep(Duration::from_millis(250)).await;
637        }
638        assert!(paused, "run should pause at declared breakpoint");
639
640        let mut update_req = tonic::Request::new(runtime::UpdateBreakpointsRequest {
641            run_id: run2_id.clone(),
642            add: Vec::new(),
643            remove: Vec::new(),
644            clear_all: true,
645        });
646        update_req.metadata_mut().insert(
647            "authorization",
648            MetadataValue::try_from("Bearer demo-writer")?,
649        );
650        let update_response = runtime_client
651            .update_breakpoints(update_req)
652            .await?
653            .into_inner();
654        assert!(
655            update_response.breakpoints.is_empty(),
656            "clear_all should remove pending breakpoints"
657        );
658
659        let mut resume_req = tonic::Request::new(runtime::ResumeRunRequest {
660            run_id: run2_id.clone(),
661        });
662        resume_req.metadata_mut().insert(
663            "authorization",
664            MetadataValue::try_from("Bearer demo-writer")?,
665        );
666        let resume_response = runtime_client.resume_run(resume_req).await?.into_inner();
667        assert_eq!(
668            resume_response.status,
669            runtime::RunStatus::RunStatusRunning as i32,
670            "resume should transition run back to running"
671        );
672
673        let _ = wait_for_run(&mut runtime_client, &run2_id).await?;
674
675        scheduler_handle.abort();
676        api_handle.abort();
677        drop(clickhouse);
678        drop(postgres);
679
680        Ok(())
681    }
682
683    async fn submit_and_wait(
684        client: &mut RuntimeServiceClient<tonic::transport::Channel>,
685        tap_endpoint: &str,
686        spec: RunSpec,
687    ) -> AnyResult<(String, Vec<RunEventEnvelope>, runtime::RunDetail)> {
688        let submit_request = SubmitRunRequest {
689            spec: Some(spec),
690            idempotency_key: String::new(),
691        };
692
693        let mut request = tonic::Request::new(submit_request);
694        request.metadata_mut().insert(
695            "authorization",
696            MetadataValue::try_from("Bearer demo-writer")?,
697        );
698
699        let response = client.submit_run(request).await?.into_inner();
700        let run_id = response.run_id;
701
702        let mut tap_client = TapServiceClient::connect(tap_endpoint.to_string()).await?;
703        let (mut tx, rx) = mpsc::channel::<SubscribeRunRequest>(32);
704        tx.send(SubscribeRunRequest {
705            request: Some(subscribe_run_request::Request::Open(SubscribeRunOpen {
706                run_ids: vec![run_id.clone()],
707                step_ids: Vec::new(),
708                kinds: Vec::new(),
709                severities: Vec::new(),
710                tags: Vec::new(),
711                since_offset: 0,
712                batch_size: 256,
713                max_inflight: 64,
714                heartbeat_interval_ms: 1_000,
715            })),
716        })
717        .await?;
718
719        let outbound = ReceiverStream::new(rx);
720        let mut tap_request = tonic::Request::new(outbound);
721        tap_request.metadata_mut().insert(
722            "authorization",
723            MetadataValue::try_from("Bearer demo-reader")?,
724        );
725
726        let mut stream = tap_client.subscribe_run(tap_request).await?.into_inner();
727        let mut events = Vec::new();
728
729        while let Some(mut response) = stream.message().await? {
730            if response.heartbeat {
731                continue;
732            }
733
734            if let Some(event) = response.event.take() {
735                if response.offset > 0 {
736                    let ack = SubscribeRunRequest {
737                        request: Some(subscribe_run_request::Request::Ack(SubscribeRunAck {
738                            offset: response.offset,
739                        })),
740                    };
741                    if tx.send(ack).await.is_err() {
742                        break;
743                    }
744                }
745
746                let done = event.tags.iter().any(|tag| tag == "kind:run_succeeded");
747                events.push(event);
748                if done {
749                    break;
750                }
751            }
752        }
753
754        let run_detail = wait_for_run(client, &run_id).await?;
755        Ok((run_id, events, run_detail))
756    }
757
758    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
759    async fn golden_path_agent_team_replay_within_one_percent() -> AnyResult<()> {
760        if std::fs::metadata("/var/run/docker.sock").is_err()
761            && std::env::var("DOCKER_HOST").is_err()
762        {
763            eprintln!("Skipping E2E test because Docker is unavailable");
764            return Ok(());
765        }
766
767        let docker = Cli::default();
768        let container = docker.run(Postgres::default());
769        let port = container.get_host_port_ipv4(5432);
770        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
771        sleep(Duration::from_millis(250)).await;
772
773        let storage = Arc::new(
774            Storage::connect(StorageConfig {
775                database_url,
776                max_connections: 4,
777                connect_timeout: Duration::from_secs(10),
778                object_store: ObjectStoreConfig::InMemory,
779            })
780            .await?,
781        );
782        let audit = Arc::new(Audit::new(storage.pool().clone()));
783
784        let mut registry = ExecutorRegistry::new();
785        registry.register(Arc::new(EchoToolExecutor))?;
786
787        let scheduler = Arc::new(Scheduler::new(
788            Arc::clone(&storage),
789            Arc::new(registry),
790            AllowAllPolicy::shared(),
791            Arc::clone(&audit),
792            Arc::clone(&attestation_vault),
793            SchedulerConfig::default(),
794        ));
795
796        let scheduler_handle = tokio::spawn({
797            let scheduler = Arc::clone(&scheduler);
798            async move {
799                scheduler.run().await.expect("scheduler loop");
800            }
801        });
802
803        let grpc_port = reserve_port();
804        let http_port = reserve_port();
805        std::env::set_var(
806            "FLEETFORGE_API_TOKENS",
807            "writer=demo-writer,reader=demo-reader",
808        );
809        std::env::set_var("FLEETFORGE_API_ADDR", format!("127.0.0.1:{grpc_port}"));
810        std::env::set_var("FLEETFORGE_HTTP_ADDR", format!("127.0.0.1:{http_port}"));
811
812        let api_storage = Arc::clone(&storage);
813        let api_audit = Arc::clone(&audit);
814        let api_handle = tokio::spawn(async move {
815            if let Err(err) = start_api_server(api_storage, api_audit).await {
816                eprintln!("API server exited: {err:?}");
817            }
818        });
819
820        sleep(Duration::from_millis(500)).await;
821
822        let endpoint = format!("http://127.0.0.1:{grpc_port}");
823        let channel = tonic::transport::Channel::from_shared(endpoint.clone())?
824            .connect()
825            .await?;
826        let mut client = RuntimeServiceClient::new(channel);
827
828        let (dag_json, inputs_struct, seed, labels) = load_agent_team_spec()?;
829        let run_spec = RunSpec {
830            dag_json: dag_json.clone(),
831            inputs: inputs_struct.clone(),
832            seed,
833            labels: labels.clone(),
834        };
835
836        let (run_a, events_a, _) =
837            submit_and_wait(&mut client, &endpoint, run_spec.clone()).await?;
838        assert!(events_a
839            .iter()
840            .any(|event| event.tags.iter().any(|tag| tag == "kind:run_succeeded")));
841
842        let trace_ids: HashSet<_> = events_a
843            .iter()
844            .filter_map(|event| event.trace.as_ref())
845            .map(|trace| trace.trace_id.clone())
846            .filter(|id| !id.is_empty())
847            .collect();
848        assert_eq!(
849            trace_ids.len(),
850            1,
851            "expected a single trace_id across the agent run"
852        );
853
854        let mut saw_budget = false;
855        let mut saw_policy = false;
856        for event in &events_a {
857            if let Some(raw) = event.raw.as_ref() {
858                let raw_json = prost_struct_to_json(raw);
859                if raw_json
860                    .get("budget")
861                    .and_then(|value| value.as_object())
862                    .is_some()
863                {
864                    saw_budget = true;
865                }
866                if raw_json
867                    .get("policy_decisions")
868                    .and_then(|value| value.as_object())
869                    .is_some()
870                {
871                    saw_policy = true;
872                }
873            }
874        }
875        assert!(saw_budget, "expected budget telemetry in stream payloads");
876        assert!(
877            saw_policy,
878            "expected policy decision summaries in stream payloads"
879        );
880
881        let steps_a = storage
882            .fetch_steps_for_run(Uuid::parse_str(&run_a)?)
883            .await?;
884
885        let (run_b, events_b, _) = submit_and_wait(&mut client, &endpoint, run_spec).await?;
886        assert!(events_b
887            .iter()
888            .any(|event| event.tags.iter().any(|tag| tag == "kind:run_succeeded")));
889
890        let steps_b = storage
891            .fetch_steps_for_run(Uuid::parse_str(&run_b)?)
892            .await?;
893
894        assert_eq!(steps_a.len(), steps_b.len(), "step count mismatch");
895
896        let mut by_id: HashMap<Uuid, &models::Step> = HashMap::new();
897        for step in &steps_a {
898            by_id.insert(step.step_id, step);
899        }
900
901        for step_b in &steps_b {
902            let Some(step_a) = by_id.get(&step_b.step_id) else {
903                anyhow::bail!("step {} missing in original run", step_b.step_id);
904            };
905            if let (Some(tokens_a), Some(tokens_b)) = (step_a.actual_tokens, step_b.actual_tokens) {
906                if tokens_a > 0 {
907                    let diff = ((tokens_a - tokens_b).abs() as f64) / (tokens_a as f64);
908                    assert!(diff < 0.01, "token drift {:?}", diff);
909                }
910            }
911            if let (Some(cost_a), Some(cost_b)) = (step_a.actual_cost, step_b.actual_cost) {
912                if cost_a > 0.0 {
913                    let diff = ((cost_a - cost_b).abs()) / cost_a;
914                    assert!(diff < 0.01, "cost drift {:?}", diff);
915                }
916            }
917        }
918
919        scheduler_handle.abort();
920        api_handle.abort();
921        drop(container);
922        std::env::remove_var("FLEETFORGE_API_TOKENS");
923        std::env::remove_var("FLEETFORGE_API_ADDR");
924        std::env::remove_var("FLEETFORGE_HTTP_ADDR");
925
926        Ok(())
927    }
928
929    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
930    async fn golden_path_langgraph_capability_and_attestations() -> AnyResult<()> {
931        if !docker_available() {
932            eprintln!("Skipping LangGraph trust test because Docker is unavailable");
933            return Ok(());
934        }
935
936        run_agent_adapter_smoke(AgentAdapterTestConfig {
937            adapter_kind: "langgraph",
938            entrypoint: "tests.python.simple_langgraph:create_graph",
939            slug: "langgraph_demo",
940            initial_state: json!({ "topic": "trust mesh" }),
941            adapter_inputs: None,
942            attestation_key: b"langgraph-test-key",
943        })
944        .await
945    }
946
947    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
948    async fn adapter_smoke_autogen_attestations() -> AnyResult<()> {
949        if !docker_available() {
950            eprintln!("Skipping AutoGen smoke test because Docker is unavailable");
951            return Ok(());
952        }
953
954        run_agent_adapter_smoke(AgentAdapterTestConfig {
955            adapter_kind: "autogen",
956            entrypoint: "tests.python.autogen_stub:create_adapter",
957            slug: "autogen_demo",
958            initial_state: Value::Null,
959            adapter_inputs: Some(json!({ "task": "document adapter contracts" })),
960            attestation_key: b"autogen-smoke-key",
961        })
962        .await
963    }
964
965    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
966    async fn adapter_smoke_crewai_attestations() -> AnyResult<()> {
967        if !docker_available() {
968            eprintln!("Skipping CrewAI smoke test because Docker is unavailable");
969            return Ok(());
970        }
971
972        run_agent_adapter_smoke(AgentAdapterTestConfig {
973            adapter_kind: "crewai",
974            entrypoint: "tests.python.crewai_stub:create_adapter",
975            slug: "crewai_demo",
976            initial_state: Value::Null,
977            adapter_inputs: Some(json!({ "inputs": { "topic": "trust mesh" } })),
978            attestation_key: b"crewai-smoke-key",
979        })
980        .await
981    }
982
    /// Scoped environment-variable override: `set` records the prior value,
    /// and the `Drop` impl restores (or removes) it, keeping tests isolated.
    struct EnvVarGuard {
        // Name of the environment variable being managed.
        key: String,
        // Value before the override; `None` means the variable was unset.
        previous: Option<String>,
    }
987
988    impl EnvVarGuard {
989        fn set(key: &str, value: &str) -> Self {
990            let previous = std::env::var(key).ok();
991            std::env::set_var(key, value);
992            Self {
993                key: key.to_string(),
994                previous,
995            }
996        }
997    }
998
999    impl Drop for EnvVarGuard {
1000        fn drop(&mut self) {
1001            if let Some(value) = &self.previous {
1002                std::env::set_var(&self.key, value);
1003            } else {
1004                std::env::remove_var(&self.key);
1005            }
1006        }
1007    }
1008
    /// Parameters for one agent-adapter smoke run (see `run_agent_adapter_smoke`).
    #[derive(Clone)]
    struct AgentAdapterTestConfig {
        // Adapter implementation to exercise ("langgraph", "autogen", "crewai").
        adapter_kind: &'static str,
        // Python `module:function` entrypoint loaded by the adapter.
        entrypoint: &'static str,
        // Step slug; also asserted as the tool name in the capability token.
        slug: &'static str,
        // Initial agent state passed through the step inputs.
        initial_state: Value,
        // Optional adapter-specific inputs merged into the step inputs.
        adapter_inputs: Option<Value>,
        // HMAC key used to sign and verify adapter attestations.
        attestation_key: &'static [u8],
    }
1018
    /// Drives a full agent-adapter run end to end and verifies its trust artifacts.
    ///
    /// Boots Postgres (testcontainers), storage, the scheduler, and the API
    /// server, submits a single-step agent DAG built from `config`, then asserts:
    /// - the run succeeds and the step output carries a verifiable capability
    ///   token scoped to the configured slug and input schema hash (including a
    ///   tamper-rejection check),
    /// - a signed adapter attestation is present, verifiable with
    ///   `config.attestation_key`, persisted in the attestation vault, and
    ///   surfaced as `trust.attestation_id` on the OTEL span,
    /// - a mocked replay of the run stays within zero drift tolerance.
    async fn run_agent_adapter_smoke(config: AgentAdapterTestConfig) -> AnyResult<()> {
        // --- infrastructure: telemetry capture, Postgres, storage, audit, vault ---
        let telemetry = TestTelemetry::new();
        let docker = Cli::default();
        let postgres = docker.run(Postgres::default());
        let pg_port = postgres.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: database_url.clone(),
                max_connections: 5,
                connect_timeout: Duration::from_secs(5),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> = Arc::new(
            ObjectStoreAttestationVault::new(storage.object_store(), storage.pool().clone()),
        );

        // --- scheduler with the agent executor registered ---
        let mut registry = ExecutorRegistry::new();
        registry.register(Arc::new(LangGraphAgentExecutor::new()))?;
        let scheduler = Arc::new(Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        ));
        let scheduler_handle = tokio::spawn({
            let scheduler = Arc::clone(&scheduler);
            async move {
                scheduler
                    .run()
                    .await
                    .expect("scheduler loop should complete cleanly");
            }
        });

        // --- API/trust configuration via scoped env-var guards ---
        let grpc_port = reserve_port();
        let http_port = reserve_port();
        let _api_tokens = EnvVarGuard::set(
            "FLEETFORGE_API_TOKENS",
            "writer=demo-writer,reader=demo-reader",
        );
        let _api_addr = EnvVarGuard::set("FLEETFORGE_API_ADDR", &format!("127.0.0.1:{grpc_port}"));
        let _http_addr =
            EnvVarGuard::set("FLEETFORGE_HTTP_ADDR", &format!("127.0.0.1:{http_port}"));
        let attestation_b64 = BASE64.encode(config.attestation_key);
        let _trust_guard = EnvVarGuard::set("TRUST_MESH_ALPHA", "1");
        let _attestation_guard = EnvVarGuard::set("FLEETFORGE_ATTESTATION_KEY", &attestation_b64);

        let api_storage = Arc::clone(&storage);
        let api_audit = Arc::clone(&audit);
        let api_handle = tokio::spawn(async move {
            if let Err(err) = start_api_server(api_storage, api_audit).await {
                eprintln!("API server exited: {err:?}");
            }
        });

        // Give the API server a moment to bind before connecting.
        sleep(Duration::from_millis(500)).await;

        let endpoint = format!("http://127.0.0.1:{grpc_port}");
        let mut client = RuntimeServiceClient::connect(endpoint.clone()).await?;

        // --- build a single-step agent DAG from the adapter config ---
        let repo = repo_root();
        let python_paths = vec![
            repo.join("sdk/python").to_string_lossy().into_owned(),
            repo.join("tests/python").to_string_lossy().into_owned(),
        ];
        let python_config = json!({ "path": python_paths });
        let mut inputs_map = serde_json::Map::new();
        inputs_map.insert(
            "entrypoint".into(),
            Value::String(config.entrypoint.to_string()),
        );
        inputs_map.insert(
            "adapter".into(),
            Value::String(config.adapter_kind.to_string()),
        );
        inputs_map.insert("initial_state".into(), config.initial_state.clone());
        inputs_map.insert("python".into(), python_config);
        if let Some(extra) = config.adapter_inputs.clone() {
            inputs_map.insert("adapter_inputs".into(), extra);
        }

        let step_id = Uuid::new_v4();
        let dag_value = json!({
            "steps": [{
                "id": step_id,
                "type": "agent",
                "slug": config.slug,
                "inputs": Value::Object(inputs_map.clone()),
                "policy": {
                    "budget": { "tokens": 128, "cost": 0.005 },
                    "priority": 10
                }
            }],
            "edges": []
        });
        let dag_json = serde_json::to_string(&dag_value)?;
        let run_spec = RunSpec {
            dag_json,
            inputs: None,
            seed: 42,
            labels: HashMap::from([("suite".into(), "adapter_smoke".into())]),
        };

        // --- submit the run and wait for a terminal event ---
        let (run_id, events, _) = submit_and_wait(&mut client, &endpoint, run_spec).await?;
        assert!(
            events
                .iter()
                .any(|event| event.tags.iter().any(|tag| tag == "kind:run_succeeded")),
            "adapter run should succeed"
        );

        // --- capability token: present, verifiable, correctly scoped ---
        let run_uuid = Uuid::parse_str(&run_id)?;
        let steps = storage.fetch_steps_for_run(run_uuid).await?;
        let step = steps
            .iter()
            .find(|candidate| candidate.step_id == step_id)
            .context("agent step not recorded")?;
        let snapshot = step
            .output_snapshot
            .as_ref()
            .context("output snapshot missing")?;

        let capability_value = snapshot
            .get("capability_token")
            .context("capability token missing")?;
        let token: CapabilityToken = serde_json::from_value(capability_value.clone())?;
        verify_capability_token(&token)?;
        assert_eq!(token.claims.scope.tool.name, config.slug);

        // Schema hash in the token must match the digest of the step inputs.
        let expected_schema_hash = step
            .spec_json
            .get("inputs")
            .map(digest_json)
            .context("spec inputs missing")?;
        assert_eq!(token.claims.scope.schema.hash, expected_schema_hash);

        let snapshot_token_id = snapshot
            .get("capability_token_id")
            .and_then(Value::as_str)
            .context("capability_token_id missing")?;
        assert_eq!(snapshot_token_id, token.token_id().to_string());

        // A modified token must fail verification.
        let mut tampered = token.clone();
        tampered.claims.scope.schema.hash = "bad".to_string();
        assert!(
            verify_capability_token(&tampered).is_err(),
            "tampered capability token should fail verification"
        );

        // --- adapter attestation: signed payload and id propagation ---
        let attestation_value = snapshot
            .get("attestation")
            .context("adapter attestation missing")?;
        let attestation_id = verify_adapter_attestation(attestation_value, config.attestation_key)?;
        let adapter_ids = snapshot
            .get("attestation_ids")
            .and_then(Value::as_array)
            .context("adapter attestation ids missing")?;
        assert!(
            adapter_ids
                .iter()
                .any(|value| value.as_str() == Some(&attestation_id)),
            "adapter attestation id should be embedded in outputs"
        );

        // --- OTEL span carries the attestation id ---
        telemetry.force_flush();
        let otel_attestation =
            telemetry.find_attribute("fleetforge.agent.langgraph", "trust.attestation_id");
        assert_eq!(
            otel_attestation.as_deref(),
            Some(attestation_id.as_str()),
            "OTEL span should carry trust.attestation_id"
        );

        // --- attestation vault: persisted record matches the run/step subject ---
        let subject_id = format!("run:{}:step:{}", run_uuid, step_id);
        let row = sqlx::query(
            "select attestation_id, object_path from attestation_records \
             where subject_id = $1 and subject_kind = 'step' order by created_at desc limit 1",
        )
        .bind(&subject_id)
        .fetch_one(storage.pool())
        .await?;
        let recorded_path: String = row.try_get("object_path")?;
        let recorded_attestation = storage
            .object_store()
            .get(&Path::from(recorded_path.clone()))
            .await?
            .bytes()
            .await?;
        let vault_attestation: Attestation = serde_json::from_slice(&recorded_attestation)?;
        match vault_attestation.subject {
            Some(TrustSubject::Step {
                run_id,
                step_id: vault_step,
            }) => {
                assert_eq!(run_id, run_uuid);
                assert_eq!(vault_step, step_id);
            }
            other => anyhow::bail!("unexpected attestation subject {:?}", other),
        }

        // --- deterministic replay with zero drift tolerance ---
        let replay = ReplayExecutor::new(Arc::clone(&storage));
        let replay_outcome = replay
            .replay_run(
                RunId(run_uuid),
                ReplayStrategy::Mocked,
                DriftTolerance {
                    tokens: 0.0,
                    cost: 0.0,
                },
            )
            .await?;
        assert!(
            replay_outcome.all_within_tolerance,
            "replay should remain within tolerance"
        );

        // --- teardown: stop background tasks, drop the container, flush spans ---
        scheduler_handle.abort();
        api_handle.abort();
        drop(postgres);

        telemetry.shutdown();

        Ok(())
    }
1249
    /// In-process OTEL test harness: installs a tracing subscriber whose spans
    /// are captured in memory so tests can inspect span attributes.
    struct TestTelemetry {
        // Spans recorded by the `RecordingExporter`, shared with the provider.
        spans: Arc<Mutex<Vec<SpanData>>>,
        provider: trace::TracerProvider,
        // Keeps the thread-default tracing subscriber installed while alive.
        _subscriber: DefaultGuard,
    }
1255
1256    impl TestTelemetry {
1257        fn new() -> Self {
1258            let spans = Arc::new(Mutex::new(Vec::new()));
1259            let exporter = RecordingExporter::new(Arc::clone(&spans));
1260            let provider = trace::TracerProvider::builder()
1261                .with_simple_exporter(exporter)
1262                .build();
1263            let tracer = provider.tracer("fleetforge-runtime-test", None);
1264            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
1265            let subscriber = Registry::default()
1266                .with(EnvFilter::new("info"))
1267                .with(otel_layer);
1268            let guard = tracing::subscriber::set_default(subscriber);
1269            Self {
1270                spans,
1271                provider,
1272                _subscriber: guard,
1273            }
1274        }
1275
1276        fn force_flush(&self) {
1277            let _ = self.provider.force_flush();
1278        }
1279
1280        fn find_attribute(&self, span_name: &str, key: &str) -> Option<String> {
1281            let spans = self.spans.lock().expect("span mutex poisoned");
1282            for span in spans.iter() {
1283                if span.name.as_ref() == span_name {
1284                    for (attr_key, value) in span.attributes.iter() {
1285                        if attr_key.as_str() == key {
1286                            return Some(match value {
1287                                OtelValue::String(s) => s.clone().into_owned(),
1288                                OtelValue::Bool(b) => b.to_string(),
1289                                OtelValue::I64(v) => v.to_string(),
1290                                OtelValue::F64(v) => v.to_string(),
1291                                OtelValue::Array(arr) => format!("{arr:?}"),
1292                            });
1293                        }
1294                    }
1295                }
1296            }
1297            None
1298        }
1299
1300        fn shutdown(self) {
1301            drop(self);
1302        }
1303    }
1304
    impl Drop for TestTelemetry {
        fn drop(&mut self) {
            // Shut down the local provider first, then clear the global tracer
            // provider so later tests do not inherit this harness's pipeline.
            let _ = self.provider.shutdown();
            telemetry_otel::shutdown_tracer_provider();
        }
    }
1311
    /// Span exporter that appends finished spans to a shared in-memory buffer.
    struct RecordingExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }
1315
1316    impl RecordingExporter {
1317        fn new(spans: Arc<Mutex<Vec<SpanData>>>) -> Self {
1318            Self { spans }
1319        }
1320    }
1321
1322    #[async_trait::async_trait]
1323    impl SpanExporter for RecordingExporter {
1324        async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
1325            let mut guard = self.spans.lock().expect("span mutex poisoned");
1326            guard.extend(batch);
1327            ExportResult::Success
1328        }
1329
1330        fn shutdown(&mut self) -> ExportResult {
1331            self.spans.lock().expect("span mutex poisoned").clear();
1332            ExportResult::Success
1333        }
1334    }
1335
1336    fn canonicalise(value: &Value) -> Value {
1337        match value {
1338            Value::Object(map) => {
1339                let mut ordered = BTreeMap::new();
1340                for (key, value) in map.iter() {
1341                    ordered.insert(key.clone(), canonicalise(value));
1342                }
1343                Value::Object(ordered)
1344            }
1345            Value::Array(items) => Value::Array(items.iter().map(canonicalise).collect()),
1346            other => other.clone(),
1347        }
1348    }
1349
1350    fn canonical_json_bytes(value: &Value) -> Vec<u8> {
1351        let canonical = canonicalise(value);
1352        serde_json::to_vec(&canonical).expect("canonical JSON serialization must succeed")
1353    }
1354
1355    fn verify_adapter_attestation(value: &Value, key: &[u8]) -> AnyResult<String> {
1356        let signature_value = value
1357            .get("signature")
1358            .and_then(Value::as_object)
1359            .and_then(|sig| sig.get("value"))
1360            .and_then(Value::as_str)
1361            .context("attestation signature missing")?;
1362        let statement = value
1363            .get("statement")
1364            .context("attestation statement missing")?;
1365        let mut mac = Hmac::<Sha256>::new_from_slice(key)?;
1366        mac.update(&canonical_json_bytes(statement));
1367        let expected = BASE64.encode(mac.finalize().into_bytes());
1368        anyhow::ensure!(
1369            signature_value == expected,
1370            "adapter attestation signature mismatch"
1371        );
1372        let attestation_id = value
1373            .get("id")
1374            .and_then(Value::as_str)
1375            .context("attestation id missing")?;
1376        Ok(attestation_id.to_string())
1377    }
1378}
1379
/// Settings for the background retention scrub/purge loop.
#[derive(Clone, Copy)]
struct RetentionWorkerConfig {
    // Days to keep run/step/event rows; a non-positive value disables the scrub.
    run_days: i64,
    // Days to keep artifacts; defaults to `run_days` when no override is set.
    artifact_days: i64,
    // Delay between retention passes (clamped to >= 60s, default 3600s).
    interval: tokio::time::Duration,
}
1386
1387fn retention_config_from_env() -> Option<RetentionWorkerConfig> {
1388    let raw_run_env = std::env::var("FLEETFORGE_RETENTION_DAYS").ok()?;
1389    if !feature_allowed(LicensedFeature::LongRetention) {
1390        warn!(
1391            "FLEETFORGE_RETENTION_DAYS is set but the current license does not include extended retention; ignoring."
1392        );
1393        return None;
1394    }
1395    let raw_run = raw_run_env;
1396    let run_days: i64 = raw_run.parse().ok()?;
1397
1398    let artifact_days = std::env::var("FLEETFORGE_RETENTION_ARTIFACT_DAYS")
1399        .ok()
1400        .and_then(|v| v.parse::<i64>().ok())
1401        .unwrap_or(run_days);
1402
1403    if run_days <= 0 && artifact_days <= 0 {
1404        return None;
1405    }
1406
1407    let interval_secs = std::env::var("FLEETFORGE_RETENTION_INTERVAL_SECS")
1408        .ok()
1409        .and_then(|value| value.parse::<u64>().ok())
1410        .unwrap_or(3600)
1411        .max(60);
1412
1413    Some(RetentionWorkerConfig {
1414        run_days,
1415        artifact_days,
1416        interval: tokio::time::Duration::from_secs(interval_secs),
1417    })
1418}
1419
1420fn spawn_retention_worker(storage: Arc<Storage>) {
1421    if let Some(config) = retention_config_from_env() {
1422        let run_window = if config.run_days > 0 {
1423            Some(ChronoDuration::days(config.run_days as i64))
1424        } else {
1425            None
1426        };
1427        info!(
1428            retention_days = config.run_days,
1429            artifact_days = config.artifact_days,
1430            interval_secs = config.interval.as_secs(),
1431            "starting retention scrub worker"
1432        );
1433        let worker_storage = Arc::clone(&storage);
1434        let worker_config = config;
1435        tokio::spawn(async move {
1436            let run_window = run_window;
1437            loop {
1438                if let Some(window) = run_window {
1439                    let cutoff = Utc::now() - window;
1440                    match worker_storage.apply_retention_policy(cutoff).await {
1441                        Ok(stats) => {
1442                            if stats.runs_scrubbed > 0
1443                                || stats.steps_scrubbed > 0
1444                                || stats.events_scrubbed > 0
1445                            {
1446                                info!(
1447                                    runs_scrubbed = stats.runs_scrubbed,
1448                                    steps_scrubbed = stats.steps_scrubbed,
1449                                    events_scrubbed = stats.events_scrubbed,
1450                                    "retention scrub completed"
1451                                );
1452                            }
1453                        }
1454                        Err(err) => {
1455                            warn!(error = %err, "retention scrub failed");
1456                        }
1457                    }
1458                }
1459
1460                let object_store = worker_storage.object_store();
1461                if let Err(err) = purge_retention(
1462                    worker_storage.pool(),
1463                    object_store.as_ref(),
1464                    &StorageRetentionConfig {
1465                        run_days: worker_config.run_days.max(0),
1466                        artifact_days: worker_config.artifact_days.max(0),
1467                    },
1468                )
1469                .await
1470                {
1471                    warn!(error = %err, "retention purge failed");
1472                }
1473
1474                tokio::time::sleep(worker_config.interval).await;
1475            }
1476        });
1477    } else {
1478        info!("retention scrub worker disabled");
1479    }
1480}
1481
/// Bootstraps the FleetForge runtime components and runs them to completion.
///
/// Wires together, in order: telemetry, storage, audit, the attestation
/// vault, the optional prompt registry, the executor registry (LLM, replay,
/// HTTP proxy, LangGraph agent, Docker/Firecracker tool routing, and the
/// optional insecure shell), the policy pack selected via
/// `FLEETFORGE_POLICY_PACK`, the retention worker, the outbox bus forwarder,
/// the API server, and the scheduler. The bus, API, and scheduler tasks are
/// joined, so this future only resolves once all three have shut down.
///
/// # Errors
/// Fails fast when telemetry, storage, the Docker tool executor, or a policy
/// pack cannot be initialised, when an unknown policy pack name is supplied,
/// or when any long-running task returns an error or panics.
pub async fn bootstrap() -> Result<()> {
    let telemetry = TelemetryGuard::init("fleetforge-runtime")?;
    let storage = Arc::new(Storage::connect_default().await?);
    let audit = Arc::new(Audit::new(storage.pool().clone()));
    let attestation_vault: Arc<dyn AttestationVault> = Arc::new(ObjectStoreAttestationVault::new(
        storage.object_store(),
        storage.pool().clone(),
    ));
    // Prompt packs are optional: a load failure only logs and disables them.
    let prompt_registry: Option<Arc<dyn PromptRegistry>> =
        match PromptCompiler::from_directory("prompts/agent_packs") {
            Ok(registry) => {
                info!("loaded prompt packs from ./prompts/agent_packs");
                Some(Arc::new(registry) as Arc<dyn PromptRegistry>)
            }
            Err(err) => {
                info!(
                    error = %err,
                    "prompt registry unavailable; continuing without prompt packs"
                );
                None
            }
        };
    let mut executor_registry = ExecutorRegistry::new();
    let gateways = Arc::new(GatewayRegistry::with_defaults());
    executor_registry.register(Arc::new(LlmExecutor::new(
        Arc::clone(&gateways),
        prompt_registry.clone(),
        Option::<Arc<dyn MemoryAdapter>>::None,
    )))?;
    executor_registry.register(Arc::new(DeterministicReplayExecutor::default()))?;
    executor_registry.register(Arc::new(HttpProxyExecutor::new()?))?;
    executor_registry.register(Arc::new(LangGraphAgentExecutor::new()))?;

    // Docker is mandatory for tool execution; fail with an actionable message.
    let docker_executor = Arc::new(DockerToolExecutor::new().map_err(|err| {
        anyhow!(
            "failed to initialise Docker tool executor: {err}. Install Docker or set FLEETFORGE_DOCKER_CLI/FLEETFORGE_TOOLBOX_IMAGE."
        )
    })?);

    // Firecracker is optional; fall back to Docker-only tool execution.
    let firecracker_executor = match FirecrackerToolExecutor::new() {
        Ok(exec) => {
            info!("firecracker sandbox enabled for tool execution");
            Some(Arc::new(exec))
        }
        Err(err) => {
            info!(error = %err, "firecracker sandbox disabled; continuing with docker-only tools");
            None
        }
    };

    executor_registry.register(Arc::new(ToolRouterExecutor::new(
        Arc::clone(&docker_executor),
        firecracker_executor,
    )))?;
    // Opt-in only: the insecure shell executor is for local demos.
    #[cfg(feature = "insecure-shell")]
    {
        match executor::ShellExecutor::new() {
            Ok(shell) => executor_registry.register(Arc::new(shell))?,
            Err(err) => {
                warn!(error = %err, "skipping insecure shell executor registration; enable it explicitly for local demos");
            }
        }
    }
    let executors = Arc::new(executor_registry);
    // Select the policy pack: default Trust Mesh, HIPAA, GDPR, or allow_all.
    let policy_pack: Arc<RuntimePolicyPack> = match std::env::var("FLEETFORGE_POLICY_PACK") {
        Ok(value) => match value.trim().to_ascii_lowercase().as_str() {
            "default" | "" => {
                info!("loading default Trust Mesh policy pack");
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.default",
                    shared_default_pack()?,
                ))
            }
            "hipaa" => {
                info!("loading HIPAA regulated policy pack");
                let engine: Arc<dyn PolicyEngine> =
                    Arc::new(RegulatedPack::new(RegulatedVertical::Hipaa)?);
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.regulated.hipaa",
                    engine,
                ))
            }
            "gdpr" => {
                info!("loading GDPR regulated policy pack");
                let engine: Arc<dyn PolicyEngine> =
                    Arc::new(RegulatedPack::new(RegulatedVertical::Gdpr)?);
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.regulated.gdpr",
                    engine,
                ))
            }
            "allow_all" => {
                info!("policy pack set to allow_all (no guardrails enforced)");
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.allow_all",
                    AllowAllPolicy::shared(),
                ))
            }
            other => return Err(anyhow!("unknown policy pack '{other}'")),
        },
        Err(_) => {
            info!("policy pack not specified; loading default Trust Mesh pack");
            Arc::new(RuntimePolicyPack::new(
                "fleetforge.policy.default",
                shared_default_pack()?,
            ))
        }
    };
    let scheduler = Arc::new(Scheduler::new(
        storage.clone(),
        executors.clone(),
        policy_pack,
        Arc::clone(&audit),
        Arc::clone(&attestation_vault),
        SchedulerConfig::default(),
    ));

    spawn_retention_worker(storage.clone());

    // External exporters are best-effort: a failure only disables them.
    let bus_storage = storage.clone();
    let bus_config = bus_config_from_env();
    let exporters = match fleetforge_telemetry::exporters::from_env() {
        Ok(value) => value,
        Err(err) => {
            warn!(error = ?err, "failed to initialise external exporters; continuing without them");
            Vec::new()
        }
    };
    // Launch the three long-running tasks: bus forwarder, API server, scheduler.
    let bus_handle = tokio::spawn(async move {
        run_outbox_forwarder_with_exporters(bus_config, bus_storage, exporters).await
    });
    let api_storage = storage.clone();
    let api_audit = audit.clone();
    let api_handle = tokio::spawn(async move { start_api_server(api_storage, api_audit).await });
    let scheduler_handle = tokio::spawn({
        let scheduler = scheduler.clone();
        async move { scheduler.run().await }
    });

    let (bus_res, api_res, scheduler_res) = tokio::join!(bus_handle, api_handle, scheduler_handle);

    // First `?` surfaces join (panic) errors, second the task's own result.
    bus_res.map_err(|err| anyhow!("bus task panicked: {err}"))??;
    api_res.map_err(|err| anyhow!("api task panicked: {err}"))??;
    scheduler_res.map_err(|err| anyhow!("scheduler task panicked: {err}"))??;

    // Long-running tasks should keep the process alive; reaching here implies graceful shutdown.
    drop(storage);
    drop(telemetry);
    Ok(())
}
1633
1634fn bus_config_from_env() -> BusConfig {
1635    let mut config = BusConfig::default();
1636    if let Ok(bootstrap) = std::env::var("FLEETFORGE_BUS_BOOTSTRAP") {
1637        config.bootstrap_servers = bootstrap;
1638    }
1639    if let Ok(topic) = std::env::var("FLEETFORGE_BUS_TOPIC") {
1640        config.topic = topic;
1641    }
1642
1643    let transactional_id_env = std::env::var("FLEETFORGE_BUS_TRANSACTIONAL_ID")
1644        .ok()
1645        .filter(|id| !id.trim().is_empty());
1646    if let Some(tx_id) = transactional_id_env.clone() {
1647        config.transactional_id = Some(tx_id.clone());
1648    }
1649
1650    if transactional_id_env.is_some() {
1651        info!(
1652            transactional_id = transactional_id_env.as_deref().unwrap_or_default(),
1653            "outbox forwarder running in exactly-once mode; configure consumers with isolation.level=read_committed"
1654        );
1655    } else {
1656        info!(
1657            "outbox forwarder running in at-least-once mode; set FLEETFORGE_BUS_TRANSACTIONAL_ID for exactly-once"
1658        );
1659    }
1660
1661    config
1662}