1pub mod adapters;
4pub mod aibom;
5mod capability;
6pub mod executor;
7pub mod features;
8pub mod gateway;
9pub mod guardrails;
10pub mod memory;
11pub mod model;
12pub mod otel;
13pub mod policy;
14pub mod replay;
15pub mod scheduler;
16
17use std::sync::Arc;
18
19use anyhow::{anyhow, Result};
20use chrono::{Duration as ChronoDuration, Utc};
21use fleetforge_bus::{run_outbox_forwarder_with_exporters, BusConfig};
22use fleetforge_common::licensing::{feature_allowed, LicensedFeature};
23use fleetforge_runtime_api::start_api_server;
24use fleetforge_storage::retention::{
25 delete_expired as purge_retention, RetentionConfig as StorageRetentionConfig,
26};
27use fleetforge_storage::Storage;
28use fleetforge_telemetry::{audit::Audit, TelemetryGuard};
29use tracing::{info, warn};
30
31pub use adapters::{AgentRunAction, AgentRunAdapter, AgentRunContext, AgentRunEnvelope};
32#[cfg(feature = "insecure-shell")]
33pub use executor::ShellExecutor;
34pub use executor::{
35 BudgetCtx, DeterministicReplayExecutor, DockerToolExecutor, ExecutorRegistry,
36 FirecrackerToolExecutor, HttpProxyExecutor, LangGraphAgentExecutor, LlmExecutor, ResourceUsage,
37 StepCtx, StepExecutionResult, StepExecutor, ToolRouterExecutor,
38};
39pub use features::{set_trust_mesh_alpha, trust_mesh_alpha_enabled};
40pub use fleetforge_common::schemas::{
41 AGENT_RUN_SCHEMA_JSON, ARTIFACT_SCHEMA_JSON, RUN_EVENT_SCHEMA_JSON, RUN_SCHEMA_JSON,
42 RUN_SPEC_SCHEMA_JSON, STEP_SCHEMA_JSON,
43};
44pub use fleetforge_common::validation::{
45 validate_agent_run, validate_artifact, validate_run, validate_run_event, validate_run_spec,
46 validate_step_spec, SchemaErrorDetail, SchemaValidationError, StepSpecError,
47 StepSpecErrorDetail,
48};
49pub use fleetforge_policy::{
50 shared_default_pack, AllowAllPolicy, BasicPiiPolicy, Decision as PolicyDecision, PolicyEngine,
51 PolicyRequest, RegulatedPack, RegulatedVertical,
52};
53pub use fleetforge_prompt::{
54 CompiledPrompt, ModelResponse, ModelUsage, PromptCompiler, PromptRegistry,
55};
56pub use fleetforge_trust::{
57 digest_bytes, digest_json, Attestation, AttestationVault, InMemoryAttestationVault,
58 ObjectStoreAttestationVault, Trust, TrustBoundary, TrustDecision, TrustOrigin, TrustSource,
59 TrustSubject, TrustVerdict, Trusted, Untrusted, TRUST_MESH_ALPHA_FLAG,
60};
61pub use gateway::speech::{NoopStt, NoopTts, SpeechToText, TextToSpeech};
62pub use gateway::{
63 anthropic::{AnthropicConfig, AnthropicLanguageModel},
64 openai::{OpenAiConfig, OpenAiLanguageModel},
65 GatewayRegistry, LanguageModel, ToolAdapter,
66};
67pub use guardrails::{OutputGuardPolicy, PolicyBundle, PromptInjectionFallbackPolicy};
68pub use memory::{InMemoryAdapter, MemoryAdapter, MemoryRecord, VectorMatch, VectorMemoryAdapter};
69pub use model::{
70 ChatMessage, ChatRole, ContextSource, LlmInputs, QueuedStep, ResponseSchema, RunId, RunStatus,
71 StepId, StepSpec, StepStatus, StepType, ToolChoice, ToolSpec,
72};
73pub use policy::{enforce_policy, PolicyOutcome, RuntimePolicyPack};
74pub use replay::{
75 DriftTolerance, ReplayExecutor, ReplayOutcome, ReplayStrategy, StepDrift, StepReplay,
76};
77pub use scheduler::{Scheduler, SchedulerConfig};
78
79#[cfg(test)]
80mod memory_test;
81
82#[cfg(test)]
83mod e2e {
84 use super::*;
85 use crate::replay::{DriftTolerance, ReplayExecutor, ReplayStrategy};
86 use anyhow::{anyhow, Context, Result as AnyResult};
87 use base64::engine::general_purpose::STANDARD as BASE64;
88 use base64::Engine;
89 use chrono::Utc;
90 use clickhouse::Client;
91 use fleetforge_common::prost_json::{json_to_prost_struct, prost_struct_to_json};
    use fleetforge_contracts::runtime::{
        self, runtime_service_client::RuntimeServiceClient, GetRunRequest, RunSpec,
        SubmitRunRequest,
    };
95 use fleetforge_contracts::tap::{
96 self, subscribe_run_request, tap_service_client::TapServiceClient, RunEventEnvelope,
97 SubscribeRunAck, SubscribeRunOpen, SubscribeRunRequest,
98 };
99 use fleetforge_policy::AllowAllPolicy;
100 use fleetforge_storage::{models, object_store::ObjectStoreConfig, StorageConfig};
101 use fleetforge_telemetry::audit::Audit;
102 use fleetforge_telemetry::otel::{self as telemetry_otel, Value as OtelValue};
103 use fleetforge_trust::{verify_capability_token, CapabilityToken, TrustSubject};
104 use hmac::{Hmac, Mac};
105 use object_store::path::Path;
106 use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
107 use opentelemetry_sdk::trace;
108 use reqwest::StatusCode;
109 use serde_json::{json, Value};
110 use sha2::Sha256;
111 use sqlx::Row;
112 use std::{
113 collections::{BTreeMap, HashMap, HashSet},
114 fs,
115 net::TcpListener,
116 path::PathBuf,
117 sync::{Arc, Mutex},
118 time::Duration,
119 };
120 use testcontainers::clients::Cli;
121 use testcontainers::core::WaitFor;
122 use testcontainers::images::generic::GenericImage;
123 use testcontainers::images::postgres::Postgres;
124 use tokio::sync::mpsc;
125 use tokio::time::sleep;
126 use tokio_stream::wrappers::ReceiverStream;
127 use tonic::metadata::MetadataValue;
128 use tracing::subscriber::DefaultGuard;
129 use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
130 use uuid::Uuid;
131
132 struct EchoToolExecutor;
133
134 #[async_trait::async_trait]
135 impl StepExecutor for EchoToolExecutor {
136 fn kind(&self) -> &'static str {
137 "tool"
138 }
139
140 async fn execute(&self, ctx: &StepCtx) -> anyhow::Result<StepExecutionResult> {
141 let command = ctx
142 .untrusted_inputs
143 .as_ref()
144 .get("command")
145 .and_then(Value::as_array)
146 .ok_or_else(|| anyhow!("missing command array"))?;
147 let message = command
148 .iter()
149 .skip(1)
150 .filter_map(Value::as_str)
151 .collect::<Vec<_>>()
152 .join(" ");
153 let stdout = format!("{}\n", message);
154 let output = json!({
155 "stdout": stdout,
156 "stderr": "",
157 "exit_code": 0,
158 });
159
160 Ok(StepExecutionResult::with_usage(
161 output,
162 ResourceUsage {
163 tokens: Some(100),
164 cost: Some(0.001),
165 },
166 ))
167 }
168 }
169
170 fn repo_root() -> PathBuf {
171 PathBuf::from(env!("CARGO_MANIFEST_DIR"))
172 .join("..")
173 .join("..")
174 }
175
176 fn load_agent_team_spec() -> AnyResult<(
177 String,
178 Option<prost_types::Struct>,
179 i64,
180 HashMap<String, String>,
181 )> {
182 let pack_dir = repo_root().join("examples/_packs/demo-pack/agent_team");
183 let spec_value: Value =
184 serde_json::from_str(&fs::read_to_string(pack_dir.join("run_spec.json"))?)?;
185
186 let dag_value = match spec_value.get("dag_json").cloned().unwrap_or(Value::Null) {
187 Value::String(path) if path.starts_with("@file:") => {
188 let dag_path = pack_dir.join(path.trim_start_matches("@file:"));
189 serde_json::from_str(&fs::read_to_string(dag_path)?)?
190 }
191 other => other,
192 };
193
194 let dag_json = serde_json::to_string(&dag_value)?;
195
196 let inputs_struct = match spec_value.get("inputs") {
197 Some(Value::Null) | None => None,
198 Some(value) => Some(json_to_prost_struct(value)?),
199 };
200
201 let seed = spec_value
202 .get("seed")
203 .and_then(Value::as_i64)
204 .ok_or_else(|| anyhow!("run_spec.json must include an explicit integer seed"))?;
205 if seed <= 0 {
206 return Err(anyhow!("run_spec.json seed must be a positive integer"));
207 }
208
209 let labels = spec_value
210 .get("labels")
211 .and_then(Value::as_object)
212 .map(|map| {
213 map.iter()
214 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
215 .collect::<HashMap<_, _>>()
216 })
217 .unwrap_or_default();
218
219 Ok((dag_json, inputs_struct, seed, labels))
220 }
221
222 fn reserve_port() -> u16 {
223 TcpListener::bind("127.0.0.1:0")
224 .expect("bind local port")
225 .local_addr()
226 .unwrap()
227 .port()
228 }
229
230 fn docker_available() -> bool {
231 std::env::var("DOCKER_HOST").is_ok() || fs::metadata("/var/run/docker.sock").is_ok()
232 }
233
234 async fn wait_for_clickhouse(client: &Client) -> AnyResult<()> {
235 const RETRIES: usize = 10;
236 for attempt in 0..RETRIES {
237 match client.query("SELECT 1").fetch_one::<(u64,)>().await {
238 Ok(_) => return Ok(()),
239 Err(err) if attempt + 1 < RETRIES => {
240 tokio::time::sleep(Duration::from_millis(200)).await;
241 eprintln!("waiting for ClickHouse readiness: {err}");
242 }
243 Err(err) => return Err(err.into()),
244 }
245 }
246 Ok(())
247 }
248
249 async fn wait_for_run(
250 client: &mut RuntimeServiceClient<tonic::transport::Channel>,
251 run_id: &str,
252 ) -> AnyResult<runtime::RunDetail> {
253 for _ in 0..40 {
254 let mut request = tonic::Request::new(GetRunRequest {
255 run_id: run_id.to_string(),
256 });
257 request.metadata_mut().insert(
258 "authorization",
259 MetadataValue::try_from("Bearer demo-reader")?,
260 );
261 let response = client.get_run(request).await?.into_inner();
262 if let Some(run) = response.run {
263 if run.status == runtime::RunStatus::RunStatusSucceeded as i32 {
264 return Ok(run);
265 }
266 }
267 sleep(Duration::from_millis(250)).await;
268 }
269 Err(anyhow!("run did not succeed in time"))
270 }
271
    /// End-to-end acceptance test for the public API surface: boots Postgres
    /// and ClickHouse in containers, wires up storage/scheduler/API servers,
    /// then exercises HTTP submit + readback, gRPC GetRun, tap streaming and
    /// QueryEvents, and breakpoint pause / clear / resume.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn golden_path_api_surface_acceptance() -> AnyResult<()> {
        // The whole test depends on Docker-backed containers; skip otherwise.
        if !docker_available() {
            eprintln!("Skipping API surface acceptance test because Docker is unavailable");
            return Ok(());
        }

        // --- Infrastructure: Postgres for storage, ClickHouse for telemetry.
        let docker = Cli::default();
        let postgres = docker.run(Postgres::default());
        let pg_port = postgres.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        sleep(Duration::from_millis(250)).await;

        let clickhouse_image = GenericImage::new("clickhouse/clickhouse-server", "23.11")
            .with_wait_for(WaitFor::message("Ready for connections"));
        let clickhouse = docker.run(clickhouse_image);
        let ch_port = clickhouse.get_host_port_ipv4(8123);
        let ch_url = format!("http://127.0.0.1:{ch_port}");

        // Create the telemetry database/tables the tap QueryEvents API reads.
        let admin_client = Client::default()
            .with_url(ch_url.clone())
            .with_database("default");
        wait_for_clickhouse(&admin_client).await?;
        admin_client
            .command("CREATE DATABASE IF NOT EXISTS telemetry")
            .execute()
            .await?;
        admin_client
            .command(
                "CREATE TABLE IF NOT EXISTS telemetry.run_events (\
                run_id UUID, \
                created_at DateTime64(3, 'UTC'), \
                status String, \
                attributes JSON\
                ) ENGINE=MergeTree ORDER BY (created_at, run_id)",
            )
            .execute()
            .await?;
        admin_client
            .command(
                "CREATE TABLE IF NOT EXISTS telemetry.step_events (\
                run_id UUID, \
                step_id UUID, \
                status String, \
                attempt Int32, \
                started_at DateTime64(3, 'UTC'), \
                finished_at Nullable(DateTime64(3, 'UTC')), \
                attributes JSON\
                ) ENGINE=MergeTree ORDER BY (started_at, run_id, step_id)",
            )
            .execute()
            .await?;

        // --- Runtime wiring: storage, audit log, attestation vault.
        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url,
                max_connections: 4,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> = Arc::new(
            ObjectStoreAttestationVault::new(storage.object_store(), storage.pool().clone()),
        );

        let mut registry = ExecutorRegistry::new();
        registry.register(Arc::new(EchoToolExecutor))?;

        let scheduler = Arc::new(Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        ));

        // Aborts the wrapped task on drop so background loops do not outlive
        // the test even when an assertion below panics.
        struct TaskAbortOnDrop(tokio::task::JoinHandle<()>);
        impl TaskAbortOnDrop {
            fn abort(&mut self) {
                self.0.abort();
            }
        }
        impl Drop for TaskAbortOnDrop {
            fn drop(&mut self) {
                self.0.abort();
            }
        }

        let scheduler_handle = tokio::spawn({
            let scheduler = Arc::clone(&scheduler);
            async move {
                scheduler.run().await.expect("scheduler loop");
            }
        });
        let mut scheduler_handle = TaskAbortOnDrop(scheduler_handle);

        // --- API server configuration via environment variables.
        // NOTE(review): env vars are process-global; concurrently running
        // tests that touch the same keys could race with this.
        let grpc_port = reserve_port();
        let http_port = reserve_port();
        std::env::set_var(
            "FLEETFORGE_API_TOKENS",
            "writer=demo-writer,reader=demo-reader",
        );
        std::env::set_var("FLEETFORGE_API_ADDR", format!("127.0.0.1:{grpc_port}"));
        std::env::set_var("FLEETFORGE_HTTP_ADDR", format!("127.0.0.1:{http_port}"));
        std::env::set_var("CLICKHOUSE_DSN", ch_url.clone());
        std::env::set_var("CLICKHOUSE_DATABASE", "telemetry");
        std::env::set_var("CLICKHOUSE_RUN_TABLE", "run_events");
        std::env::set_var("CLICKHOUSE_STEP_TABLE", "step_events");

        // Removes (not restores) the keys on scope exit.
        struct EnvCleanup;
        impl Drop for EnvCleanup {
            fn drop(&mut self) {
                for key in [
                    "FLEETFORGE_API_TOKENS",
                    "FLEETFORGE_API_ADDR",
                    "FLEETFORGE_HTTP_ADDR",
                    "CLICKHOUSE_DSN",
                    "CLICKHOUSE_DATABASE",
                    "CLICKHOUSE_RUN_TABLE",
                    "CLICKHOUSE_STEP_TABLE",
                ] {
                    std::env::remove_var(key);
                }
            }
        }
        let _env_guard = EnvCleanup;

        let api_storage = Arc::clone(&storage);
        let api_audit = Arc::clone(&audit);
        let api_handle = tokio::spawn(async move {
            if let Err(err) = start_api_server(api_storage, api_audit).await {
                eprintln!("API server exited: {err:?}");
            }
        });
        let mut api_handle = TaskAbortOnDrop(api_handle);

        // Give the servers a moment to bind their listeners.
        sleep(Duration::from_millis(500)).await;

        // --- Submit a single-step echo DAG over HTTP as the writer token.
        let dag_step_id = "11111111-1111-4111-8111-111111111111";
        let dag_value = json!({
            "steps": [{
                "id": dag_step_id,
                "type": "tool",
                "inputs": {
                    "command": ["echo", "hello", "acceptance"]
                }
            }],
            "edges": []
        });
        let dag_json_string = serde_json::to_string(&dag_value)?;

        let http_client = reqwest::Client::new();
        let submit_body = json!({
            "dag_json": dag_value,
            "inputs": {},
            "seed": 42,
            "labels": { "purpose": "acceptance" }
        });

        let submit_response = http_client
            .post(format!("http://127.0.0.1:{http_port}/v1/runs"))
            .header("Authorization", "Bearer demo-writer")
            .json(&submit_body)
            .send()
            .await?;
        assert_eq!(submit_response.status(), StatusCode::CREATED);
        let submit_json: Value = submit_response.json().await?;
        let run_id_http = submit_json["run_id"]
            .as_str()
            .ok_or_else(|| anyhow!("missing run_id in HTTP response"))?
            .to_string();

        // HTTP readback (reader token) must preserve what we submitted.
        let detail_response = http_client
            .get(format!(
                "http://127.0.0.1:{http_port}/v1/runs/{run_id_http}"
            ))
            .header("Authorization", "Bearer demo-reader")
            .send()
            .await?;
        assert_eq!(detail_response.status(), StatusCode::OK);
        let detail_json: Value = detail_response.json().await?;
        assert_eq!(
            detail_json["seed"].as_i64(),
            Some(42),
            "HTTP readback should preserve seed"
        );

        // --- gRPC must agree with HTTP, and the step must be persisted.
        let endpoint = format!("http://127.0.0.1:{grpc_port}");
        let mut runtime_client = RuntimeServiceClient::connect(endpoint.clone()).await?;
        let run_detail = wait_for_run(&mut runtime_client, &run_id_http).await?;
        assert_eq!(
            run_detail.seed, 42,
            "gRPC GetRun should return the same seed"
        );
        let run_uuid = Uuid::parse_str(&run_id_http)?;
        let steps = storage
            .fetch_steps_for_run(run_uuid)
            .await
            .context("fetch steps for run")?;
        let first_step = steps.first().context("expected recorded step")?;

        // --- Tap streaming: open a subscription, ack the first real event.
        let mut tap_client = TapServiceClient::connect(endpoint.clone()).await?;
        let (mut tx, rx) = mpsc::channel::<SubscribeRunRequest>(32);
        tx.send(SubscribeRunRequest {
            request: Some(subscribe_run_request::Request::Open(SubscribeRunOpen {
                run_ids: vec![run_id_http.clone()],
                step_ids: Vec::new(),
                kinds: Vec::new(),
                severities: Vec::new(),
                tags: Vec::new(),
                since_offset: 0,
                batch_size: 128,
                max_inflight: 64,
                heartbeat_interval_ms: 1_000,
                cursor: "".into(),
            })),
        })
        .await?;
        let outbound = ReceiverStream::new(rx);
        let mut tap_request = tonic::Request::new(outbound);
        tap_request.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from("Bearer demo-reader")?,
        );
        let mut stream = tap_client.subscribe_run(tap_request).await?.into_inner();
        let mut saw_event = false;
        while let Some(mut response) = stream.message().await? {
            // Heartbeats keep the stream alive but carry no payload.
            if response.heartbeat {
                continue;
            }
            saw_event = true;
            let offset = response.offset;
            if offset > 0 {
                tx.send(SubscribeRunRequest {
                    request: Some(subscribe_run_request::Request::Ack(SubscribeRunAck {
                        offset,
                    })),
                })
                .await?;
            }
            break;
        }
        drop(tx);
        assert!(saw_event, "expected tap stream to yield events");

        // --- Seed ClickHouse with one run event and one step event so the
        // QueryEvents API has telemetry rows to return.
        #[derive(Clone, serde::Serialize, clickhouse::Row)]
        struct RunEventInsert {
            run_id: Uuid,
            created_at: chrono::DateTime<Utc>,
            status: String,
            attributes: Value,
        }

        #[derive(Clone, serde::Serialize, clickhouse::Row)]
        struct StepEventInsert {
            run_id: Uuid,
            step_id: Uuid,
            status: String,
            attempt: i32,
            started_at: chrono::DateTime<Utc>,
            finished_at: Option<chrono::DateTime<Utc>>,
            attributes: Value,
        }

        let telemetry_client = Client::default()
            .with_url(ch_url.clone())
            .with_database("telemetry");
        let now = Utc::now();

        let mut run_insert = telemetry_client.insert("telemetry.run_events")?;
        run_insert
            .write(&RunEventInsert {
                run_id: run_uuid,
                created_at: now,
                status: "succeeded".to_string(),
                attributes: json!({ "source": "acceptance" }),
            })
            .await?;
        run_insert.end().await?;

        let mut step_insert = telemetry_client.insert("telemetry.step_events")?;
        step_insert
            .write(&StepEventInsert {
                run_id: run_uuid,
                step_id: first_step.step_id,
                status: "succeeded".to_string(),
                attempt: first_step.attempt,
                started_at: first_step.created_at,
                finished_at: Some(Utc::now()),
                attributes: json!({ "source": "acceptance" }),
            })
            .await?;
        step_insert.end().await?;

        // --- QueryEvents must surface the ClickHouse-backed rows.
        let mut tap_client = TapServiceClient::connect(endpoint.clone()).await?;
        let mut query_request = tap::QueryEventsRequest {
            run_ids: vec![run_id_http.clone()],
            workspace_ids: Vec::new(),
            kinds: Vec::new(),
            policy_effects: Vec::new(),
            start_time: None,
            end_time: None,
            page_size: 10,
            page_token: String::new(),
            source: tap::QueryEventSource::QueryEventSourceRun as i32,
            step_ids: Vec::new(),
            ascending: true,
            severities: Vec::new(),
        };
        let mut query_req = tonic::Request::new(query_request);
        query_req.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from("Bearer demo-reader")?,
        );
        let query_response = tap_client.query_events(query_req).await?.into_inner();
        assert!(
            !query_response.events.is_empty(),
            "expected ClickHouse-backed QueryEvents response"
        );

        // --- Second run with a breakpoint: must pause, then clear + resume.
        let mut labels = HashMap::new();
        labels.insert("purpose".to_string(), "breakpoint".to_string());
        let run2_spec = RunSpec {
            dag_json: dag_json_string.clone(),
            inputs: None,
            seed: 99,
            labels,
            breakpoints: vec![dag_step_id.to_string()],
            ..Default::default()
        };

        let mut submit_req = tonic::Request::new(SubmitRunRequest {
            spec: Some(run2_spec),
            idempotency_key: String::new(),
        });
        submit_req.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from("Bearer demo-writer")?,
        );
        let run2_id = runtime_client
            .submit_run(submit_req)
            .await?
            .into_inner()
            .run_id;

        // Poll until the scheduler reports the run paused at the breakpoint.
        let mut paused = false;
        for _ in 0..40 {
            let mut get_req = tonic::Request::new(GetRunRequest {
                run_id: run2_id.clone(),
            });
            get_req.metadata_mut().insert(
                "authorization",
                MetadataValue::try_from("Bearer demo-reader")?,
            );
            let response = runtime_client.get_run(get_req).await?.into_inner();
            if let Some(run) = response.run {
                if run.status == runtime::RunStatus::RunStatusPausedAtStep as i32 {
                    paused = true;
                    break;
                }
            }
            sleep(Duration::from_millis(250)).await;
        }
        assert!(paused, "run should pause at declared breakpoint");

        // Clear every breakpoint; the response echoes the remaining set.
        let mut update_req = tonic::Request::new(runtime::UpdateBreakpointsRequest {
            run_id: run2_id.clone(),
            add: Vec::new(),
            remove: Vec::new(),
            clear_all: true,
        });
        update_req.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from("Bearer demo-writer")?,
        );
        let update_response = runtime_client
            .update_breakpoints(update_req)
            .await?
            .into_inner();
        assert!(
            update_response.breakpoints.is_empty(),
            "clear_all should remove pending breakpoints"
        );

        // Resume and confirm the run goes back to running, then succeeds.
        let mut resume_req = tonic::Request::new(runtime::ResumeRunRequest {
            run_id: run2_id.clone(),
        });
        resume_req.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from("Bearer demo-writer")?,
        );
        let resume_response = runtime_client.resume_run(resume_req).await?.into_inner();
        assert_eq!(
            resume_response.status,
            runtime::RunStatus::RunStatusRunning as i32,
            "resume should transition run back to running"
        );

        let _ = wait_for_run(&mut runtime_client, &run2_id).await?;

        // --- Teardown: stop background tasks, then the containers.
        scheduler_handle.abort();
        api_handle.abort();
        drop(clickhouse);
        drop(postgres);

        Ok(())
    }
682
683 async fn submit_and_wait(
684 client: &mut RuntimeServiceClient<tonic::transport::Channel>,
685 tap_endpoint: &str,
686 spec: RunSpec,
687 ) -> AnyResult<(String, Vec<RunEventEnvelope>, runtime::RunDetail)> {
688 let submit_request = SubmitRunRequest {
689 spec: Some(spec),
690 idempotency_key: String::new(),
691 };
692
693 let mut request = tonic::Request::new(submit_request);
694 request.metadata_mut().insert(
695 "authorization",
696 MetadataValue::try_from("Bearer demo-writer")?,
697 );
698
699 let response = client.submit_run(request).await?.into_inner();
700 let run_id = response.run_id;
701
702 let mut tap_client = TapServiceClient::connect(tap_endpoint.to_string()).await?;
703 let (mut tx, rx) = mpsc::channel::<SubscribeRunRequest>(32);
704 tx.send(SubscribeRunRequest {
705 request: Some(subscribe_run_request::Request::Open(SubscribeRunOpen {
706 run_ids: vec![run_id.clone()],
707 step_ids: Vec::new(),
708 kinds: Vec::new(),
709 severities: Vec::new(),
710 tags: Vec::new(),
711 since_offset: 0,
712 batch_size: 256,
713 max_inflight: 64,
714 heartbeat_interval_ms: 1_000,
715 })),
716 })
717 .await?;
718
719 let outbound = ReceiverStream::new(rx);
720 let mut tap_request = tonic::Request::new(outbound);
721 tap_request.metadata_mut().insert(
722 "authorization",
723 MetadataValue::try_from("Bearer demo-reader")?,
724 );
725
726 let mut stream = tap_client.subscribe_run(tap_request).await?.into_inner();
727 let mut events = Vec::new();
728
729 while let Some(mut response) = stream.message().await? {
730 if response.heartbeat {
731 continue;
732 }
733
734 if let Some(event) = response.event.take() {
735 if response.offset > 0 {
736 let ack = SubscribeRunRequest {
737 request: Some(subscribe_run_request::Request::Ack(SubscribeRunAck {
738 offset: response.offset,
739 })),
740 };
741 if tx.send(ack).await.is_err() {
742 break;
743 }
744 }
745
746 let done = event.tags.iter().any(|tag| tag == "kind:run_succeeded");
747 events.push(event);
748 if done {
749 break;
750 }
751 }
752 }
753
754 let run_detail = wait_for_run(client, &run_id).await?;
755 Ok((run_id, events, run_detail))
756 }
757
758 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
759 async fn golden_path_agent_team_replay_within_one_percent() -> AnyResult<()> {
760 if std::fs::metadata("/var/run/docker.sock").is_err()
761 && std::env::var("DOCKER_HOST").is_err()
762 {
763 eprintln!("Skipping E2E test because Docker is unavailable");
764 return Ok(());
765 }
766
767 let docker = Cli::default();
768 let container = docker.run(Postgres::default());
769 let port = container.get_host_port_ipv4(5432);
770 let database_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
771 sleep(Duration::from_millis(250)).await;
772
773 let storage = Arc::new(
774 Storage::connect(StorageConfig {
775 database_url,
776 max_connections: 4,
777 connect_timeout: Duration::from_secs(10),
778 object_store: ObjectStoreConfig::InMemory,
779 })
780 .await?,
781 );
782 let audit = Arc::new(Audit::new(storage.pool().clone()));
783
784 let mut registry = ExecutorRegistry::new();
785 registry.register(Arc::new(EchoToolExecutor))?;
786
787 let scheduler = Arc::new(Scheduler::new(
788 Arc::clone(&storage),
789 Arc::new(registry),
790 AllowAllPolicy::shared(),
791 Arc::clone(&audit),
792 Arc::clone(&attestation_vault),
793 SchedulerConfig::default(),
794 ));
795
796 let scheduler_handle = tokio::spawn({
797 let scheduler = Arc::clone(&scheduler);
798 async move {
799 scheduler.run().await.expect("scheduler loop");
800 }
801 });
802
803 let grpc_port = reserve_port();
804 let http_port = reserve_port();
805 std::env::set_var(
806 "FLEETFORGE_API_TOKENS",
807 "writer=demo-writer,reader=demo-reader",
808 );
809 std::env::set_var("FLEETFORGE_API_ADDR", format!("127.0.0.1:{grpc_port}"));
810 std::env::set_var("FLEETFORGE_HTTP_ADDR", format!("127.0.0.1:{http_port}"));
811
812 let api_storage = Arc::clone(&storage);
813 let api_audit = Arc::clone(&audit);
814 let api_handle = tokio::spawn(async move {
815 if let Err(err) = start_api_server(api_storage, api_audit).await {
816 eprintln!("API server exited: {err:?}");
817 }
818 });
819
820 sleep(Duration::from_millis(500)).await;
821
822 let endpoint = format!("http://127.0.0.1:{grpc_port}");
823 let channel = tonic::transport::Channel::from_shared(endpoint.clone())?
824 .connect()
825 .await?;
826 let mut client = RuntimeServiceClient::new(channel);
827
828 let (dag_json, inputs_struct, seed, labels) = load_agent_team_spec()?;
829 let run_spec = RunSpec {
830 dag_json: dag_json.clone(),
831 inputs: inputs_struct.clone(),
832 seed,
833 labels: labels.clone(),
834 };
835
836 let (run_a, events_a, _) =
837 submit_and_wait(&mut client, &endpoint, run_spec.clone()).await?;
838 assert!(events_a
839 .iter()
840 .any(|event| event.tags.iter().any(|tag| tag == "kind:run_succeeded")));
841
842 let trace_ids: HashSet<_> = events_a
843 .iter()
844 .filter_map(|event| event.trace.as_ref())
845 .map(|trace| trace.trace_id.clone())
846 .filter(|id| !id.is_empty())
847 .collect();
848 assert_eq!(
849 trace_ids.len(),
850 1,
851 "expected a single trace_id across the agent run"
852 );
853
854 let mut saw_budget = false;
855 let mut saw_policy = false;
856 for event in &events_a {
857 if let Some(raw) = event.raw.as_ref() {
858 let raw_json = prost_struct_to_json(raw);
859 if raw_json
860 .get("budget")
861 .and_then(|value| value.as_object())
862 .is_some()
863 {
864 saw_budget = true;
865 }
866 if raw_json
867 .get("policy_decisions")
868 .and_then(|value| value.as_object())
869 .is_some()
870 {
871 saw_policy = true;
872 }
873 }
874 }
875 assert!(saw_budget, "expected budget telemetry in stream payloads");
876 assert!(
877 saw_policy,
878 "expected policy decision summaries in stream payloads"
879 );
880
881 let steps_a = storage
882 .fetch_steps_for_run(Uuid::parse_str(&run_a)?)
883 .await?;
884
885 let (run_b, events_b, _) = submit_and_wait(&mut client, &endpoint, run_spec).await?;
886 assert!(events_b
887 .iter()
888 .any(|event| event.tags.iter().any(|tag| tag == "kind:run_succeeded")));
889
890 let steps_b = storage
891 .fetch_steps_for_run(Uuid::parse_str(&run_b)?)
892 .await?;
893
894 assert_eq!(steps_a.len(), steps_b.len(), "step count mismatch");
895
896 let mut by_id: HashMap<Uuid, &models::Step> = HashMap::new();
897 for step in &steps_a {
898 by_id.insert(step.step_id, step);
899 }
900
901 for step_b in &steps_b {
902 let Some(step_a) = by_id.get(&step_b.step_id) else {
903 anyhow::bail!("step {} missing in original run", step_b.step_id);
904 };
905 if let (Some(tokens_a), Some(tokens_b)) = (step_a.actual_tokens, step_b.actual_tokens) {
906 if tokens_a > 0 {
907 let diff = ((tokens_a - tokens_b).abs() as f64) / (tokens_a as f64);
908 assert!(diff < 0.01, "token drift {:?}", diff);
909 }
910 }
911 if let (Some(cost_a), Some(cost_b)) = (step_a.actual_cost, step_b.actual_cost) {
912 if cost_a > 0.0 {
913 let diff = ((cost_a - cost_b).abs()) / cost_a;
914 assert!(diff < 0.01, "cost drift {:?}", diff);
915 }
916 }
917 }
918
919 scheduler_handle.abort();
920 api_handle.abort();
921 drop(container);
922 std::env::remove_var("FLEETFORGE_API_TOKENS");
923 std::env::remove_var("FLEETFORGE_API_ADDR");
924 std::env::remove_var("FLEETFORGE_HTTP_ADDR");
925
926 Ok(())
927 }
928
929 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
930 async fn golden_path_langgraph_capability_and_attestations() -> AnyResult<()> {
931 if !docker_available() {
932 eprintln!("Skipping LangGraph trust test because Docker is unavailable");
933 return Ok(());
934 }
935
936 run_agent_adapter_smoke(AgentAdapterTestConfig {
937 adapter_kind: "langgraph",
938 entrypoint: "tests.python.simple_langgraph:create_graph",
939 slug: "langgraph_demo",
940 initial_state: json!({ "topic": "trust mesh" }),
941 adapter_inputs: None,
942 attestation_key: b"langgraph-test-key",
943 })
944 .await
945 }
946
947 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
948 async fn adapter_smoke_autogen_attestations() -> AnyResult<()> {
949 if !docker_available() {
950 eprintln!("Skipping AutoGen smoke test because Docker is unavailable");
951 return Ok(());
952 }
953
954 run_agent_adapter_smoke(AgentAdapterTestConfig {
955 adapter_kind: "autogen",
956 entrypoint: "tests.python.autogen_stub:create_adapter",
957 slug: "autogen_demo",
958 initial_state: Value::Null,
959 adapter_inputs: Some(json!({ "task": "document adapter contracts" })),
960 attestation_key: b"autogen-smoke-key",
961 })
962 .await
963 }
964
965 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
966 async fn adapter_smoke_crewai_attestations() -> AnyResult<()> {
967 if !docker_available() {
968 eprintln!("Skipping CrewAI smoke test because Docker is unavailable");
969 return Ok(());
970 }
971
972 run_agent_adapter_smoke(AgentAdapterTestConfig {
973 adapter_kind: "crewai",
974 entrypoint: "tests.python.crewai_stub:create_adapter",
975 slug: "crewai_demo",
976 initial_state: Value::Null,
977 adapter_inputs: Some(json!({ "inputs": { "topic": "trust mesh" } })),
978 attestation_key: b"crewai-smoke-key",
979 })
980 .await
981 }
982
983 struct EnvVarGuard {
984 key: String,
985 previous: Option<String>,
986 }
987
988 impl EnvVarGuard {
989 fn set(key: &str, value: &str) -> Self {
990 let previous = std::env::var(key).ok();
991 std::env::set_var(key, value);
992 Self {
993 key: key.to_string(),
994 previous,
995 }
996 }
997 }
998
999 impl Drop for EnvVarGuard {
1000 fn drop(&mut self) {
1001 if let Some(value) = &self.previous {
1002 std::env::set_var(&self.key, value);
1003 } else {
1004 std::env::remove_var(&self.key);
1005 }
1006 }
1007 }
1008
    /// Parameters for `run_agent_adapter_smoke`, describing which Python
    /// agent adapter to exercise and how to seed it.
    #[derive(Clone)]
    struct AgentAdapterTestConfig {
        // Adapter registry kind, e.g. "langgraph", "autogen", "crewai".
        adapter_kind: &'static str,
        // Python `module:callable` entrypoint that constructs the agent.
        entrypoint: &'static str,
        // Short identifier used to label the smoke run.
        slug: &'static str,
        // Initial agent state (`Value::Null` when the adapter takes none).
        initial_state: Value,
        // Optional adapter-specific inputs payload.
        adapter_inputs: Option<Value>,
        // Key material exported (base64) via FLEETFORGE_ATTESTATION_KEY by
        // the test harness — presumably used to sign/verify attestations;
        // confirm against run_agent_adapter_smoke.
        attestation_key: &'static [u8],
    }
1018
    /// End-to-end smoke test for a Python agent adapter: boots Postgres,
    /// storage, scheduler and the API server, submits a single-step agent run,
    /// then verifies the capability token, adapter attestation, OTEL span
    /// attributes, the attestation-vault record, and deterministic replay.
    async fn run_agent_adapter_smoke(config: AgentAdapterTestConfig) -> AnyResult<()> {
        // --- Infrastructure: telemetry capture, Postgres container, storage.
        let telemetry = TestTelemetry::new();
        let docker = Cli::default();
        let postgres = docker.run(Postgres::default());
        let pg_port = postgres.get_host_port_ipv4(5432);
        let database_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: database_url.clone(),
                max_connections: 5,
                connect_timeout: Duration::from_secs(5),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await?,
        );
        let audit = Arc::new(Audit::new(storage.pool().clone()));
        let attestation_vault: Arc<dyn AttestationVault> = Arc::new(
            ObjectStoreAttestationVault::new(storage.object_store(), storage.pool().clone()),
        );

        // --- Scheduler with only the LangGraph agent executor registered.
        let mut registry = ExecutorRegistry::new();
        registry.register(Arc::new(LangGraphAgentExecutor::new()))?;
        let scheduler = Arc::new(Scheduler::new(
            Arc::clone(&storage),
            Arc::new(registry),
            AllowAllPolicy::shared(),
            Arc::clone(&audit),
            Arc::clone(&attestation_vault),
            SchedulerConfig::default(),
        ));
        let scheduler_handle = tokio::spawn({
            let scheduler = Arc::clone(&scheduler);
            async move {
                scheduler
                    .run()
                    .await
                    .expect("scheduler loop should complete cleanly");
            }
        });

        // --- API server configuration via env vars; guards restore on drop.
        let grpc_port = reserve_port();
        let http_port = reserve_port();
        let _api_tokens = EnvVarGuard::set(
            "FLEETFORGE_API_TOKENS",
            "writer=demo-writer,reader=demo-reader",
        );
        let _api_addr = EnvVarGuard::set("FLEETFORGE_API_ADDR", &format!("127.0.0.1:{grpc_port}"));
        let _http_addr =
            EnvVarGuard::set("FLEETFORGE_HTTP_ADDR", &format!("127.0.0.1:{http_port}"));
        let attestation_b64 = BASE64.encode(config.attestation_key);
        let _trust_guard = EnvVarGuard::set("TRUST_MESH_ALPHA", "1");
        let _attestation_guard = EnvVarGuard::set("FLEETFORGE_ATTESTATION_KEY", &attestation_b64);

        let api_storage = Arc::clone(&storage);
        let api_audit = Arc::clone(&audit);
        let api_handle = tokio::spawn(async move {
            if let Err(err) = start_api_server(api_storage, api_audit).await {
                eprintln!("API server exited: {err:?}");
            }
        });

        // Give the API server a moment to bind its listeners.
        sleep(Duration::from_millis(500)).await;

        let endpoint = format!("http://127.0.0.1:{grpc_port}");
        let mut client = RuntimeServiceClient::connect(endpoint.clone()).await?;

        // --- Build the single agent step: Python search paths, adapter
        // entrypoint/kind, initial state, and optional adapter inputs.
        let repo = repo_root();
        let python_paths = vec![
            repo.join("sdk/python").to_string_lossy().into_owned(),
            repo.join("tests/python").to_string_lossy().into_owned(),
        ];
        let python_config = json!({ "path": python_paths });
        let mut inputs_map = serde_json::Map::new();
        inputs_map.insert(
            "entrypoint".into(),
            Value::String(config.entrypoint.to_string()),
        );
        inputs_map.insert(
            "adapter".into(),
            Value::String(config.adapter_kind.to_string()),
        );
        inputs_map.insert("initial_state".into(), config.initial_state.clone());
        inputs_map.insert("python".into(), python_config);
        if let Some(extra) = config.adapter_inputs.clone() {
            inputs_map.insert("adapter_inputs".into(), extra);
        }

        let step_id = Uuid::new_v4();
        let dag_value = json!({
            "steps": [{
                "id": step_id,
                "type": "agent",
                "slug": config.slug,
                "inputs": Value::Object(inputs_map.clone()),
                "policy": {
                    "budget": { "tokens": 128, "cost": 0.005 },
                    "priority": 10
                }
            }],
            "edges": []
        });
        let dag_json = serde_json::to_string(&dag_value)?;
        let run_spec = RunSpec {
            dag_json,
            inputs: None,
            seed: 42,
            labels: HashMap::from([("suite".into(), "adapter_smoke".into())]),
        };

        // --- Submit the run and require a run_succeeded event.
        let (run_id, events, _) = submit_and_wait(&mut client, &endpoint, run_spec).await?;
        assert!(
            events
                .iter()
                .any(|event| event.tags.iter().any(|tag| tag == "kind:run_succeeded")),
            "adapter run should succeed"
        );

        // --- Fetch the recorded step and its output snapshot.
        let run_uuid = Uuid::parse_str(&run_id)?;
        let steps = storage.fetch_steps_for_run(run_uuid).await?;
        let step = steps
            .iter()
            .find(|candidate| candidate.step_id == step_id)
            .context("agent step not recorded")?;
        let snapshot = step
            .output_snapshot
            .as_ref()
            .context("output snapshot missing")?;

        // --- Capability token: must verify, be scoped to the slug, and carry
        // the schema hash of the step's spec inputs.
        let capability_value = snapshot
            .get("capability_token")
            .context("capability token missing")?;
        let token: CapabilityToken = serde_json::from_value(capability_value.clone())?;
        verify_capability_token(&token)?;
        assert_eq!(token.claims.scope.tool.name, config.slug);

        let expected_schema_hash = step
            .spec_json
            .get("inputs")
            .map(digest_json)
            .context("spec inputs missing")?;
        assert_eq!(token.claims.scope.schema.hash, expected_schema_hash);

        let snapshot_token_id = snapshot
            .get("capability_token_id")
            .and_then(Value::as_str)
            .context("capability_token_id missing")?;
        assert_eq!(snapshot_token_id, token.token_id().to_string());

        // Tampering with the token's claims must break verification.
        let mut tampered = token.clone();
        tampered.claims.scope.schema.hash = "bad".to_string();
        assert!(
            verify_capability_token(&tampered).is_err(),
            "tampered capability token should fail verification"
        );

        // --- Adapter attestation: HMAC-verifies with the configured key and
        // its id must appear in the snapshot's attestation_ids list.
        let attestation_value = snapshot
            .get("attestation")
            .context("adapter attestation missing")?;
        let attestation_id = verify_adapter_attestation(attestation_value, config.attestation_key)?;
        let adapter_ids = snapshot
            .get("attestation_ids")
            .and_then(Value::as_array)
            .context("adapter attestation ids missing")?;
        assert!(
            adapter_ids
                .iter()
                .any(|value| value.as_str() == Some(&attestation_id)),
            "adapter attestation id should be embedded in outputs"
        );

        // --- OTEL: the agent span must carry the attestation id.
        telemetry.force_flush();
        let otel_attestation =
            telemetry.find_attribute("fleetforge.agent.langgraph", "trust.attestation_id");
        assert_eq!(
            otel_attestation.as_deref(),
            Some(attestation_id.as_str()),
            "OTEL span should carry trust.attestation_id"
        );

        // --- Attestation vault: the persisted record must name this exact
        // run/step as its subject.
        let subject_id = format!("run:{}:step:{}", run_uuid, step_id);
        let row = sqlx::query(
            "select attestation_id, object_path from attestation_records \
             where subject_id = $1 and subject_kind = 'step' order by created_at desc limit 1",
        )
        .bind(&subject_id)
        .fetch_one(storage.pool())
        .await?;
        let recorded_path: String = row.try_get("object_path")?;
        let recorded_attestation = storage
            .object_store()
            .get(&Path::from(recorded_path.clone()))
            .await?
            .bytes()
            .await?;
        let vault_attestation: Attestation = serde_json::from_slice(&recorded_attestation)?;
        match vault_attestation.subject {
            Some(TrustSubject::Step {
                run_id,
                step_id: vault_step,
            }) => {
                assert_eq!(run_id, run_uuid);
                assert_eq!(vault_step, step_id);
            }
            other => anyhow::bail!("unexpected attestation subject {:?}", other),
        }

        // --- Replay: mocked replay with zero drift tolerance must match.
        let replay = ReplayExecutor::new(Arc::clone(&storage));
        let replay_outcome = replay
            .replay_run(
                RunId(run_uuid),
                ReplayStrategy::Mocked,
                DriftTolerance {
                    tokens: 0.0,
                    cost: 0.0,
                },
            )
            .await?;
        assert!(
            replay_outcome.all_within_tolerance,
            "replay should remain within tolerance"
        );

        // --- Teardown: stop background tasks and the Postgres container.
        scheduler_handle.abort();
        api_handle.abort();
        drop(postgres);

        telemetry.shutdown();

        Ok(())
    }
1249
    /// In-process OpenTelemetry capture: records finished spans in memory so
    /// the test can assert on span names and attributes.
    struct TestTelemetry {
        // Spans collected by the RecordingExporter; shared with the provider.
        spans: Arc<Mutex<Vec<SpanData>>>,
        // Tracer provider wired to a simple (synchronous) recording exporter.
        provider: trace::TracerProvider,
        // Keeps the tracing subscriber installed for the harness's lifetime.
        _subscriber: DefaultGuard,
    }
1255
1256 impl TestTelemetry {
1257 fn new() -> Self {
1258 let spans = Arc::new(Mutex::new(Vec::new()));
1259 let exporter = RecordingExporter::new(Arc::clone(&spans));
1260 let provider = trace::TracerProvider::builder()
1261 .with_simple_exporter(exporter)
1262 .build();
1263 let tracer = provider.tracer("fleetforge-runtime-test", None);
1264 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
1265 let subscriber = Registry::default()
1266 .with(EnvFilter::new("info"))
1267 .with(otel_layer);
1268 let guard = tracing::subscriber::set_default(subscriber);
1269 Self {
1270 spans,
1271 provider,
1272 _subscriber: guard,
1273 }
1274 }
1275
1276 fn force_flush(&self) {
1277 let _ = self.provider.force_flush();
1278 }
1279
1280 fn find_attribute(&self, span_name: &str, key: &str) -> Option<String> {
1281 let spans = self.spans.lock().expect("span mutex poisoned");
1282 for span in spans.iter() {
1283 if span.name.as_ref() == span_name {
1284 for (attr_key, value) in span.attributes.iter() {
1285 if attr_key.as_str() == key {
1286 return Some(match value {
1287 OtelValue::String(s) => s.clone().into_owned(),
1288 OtelValue::Bool(b) => b.to_string(),
1289 OtelValue::I64(v) => v.to_string(),
1290 OtelValue::F64(v) => v.to_string(),
1291 OtelValue::Array(arr) => format!("{arr:?}"),
1292 });
1293 }
1294 }
1295 }
1296 }
1297 None
1298 }
1299
1300 fn shutdown(self) {
1301 drop(self);
1302 }
1303 }
1304
    impl Drop for TestTelemetry {
        fn drop(&mut self) {
            // Shut down our dedicated provider first, then clear the global
            // tracer provider so subsequent tests start from clean OTEL state.
            let _ = self.provider.shutdown();
            telemetry_otel::shutdown_tracer_provider();
        }
    }
1311
    /// Span exporter that appends every exported span to a shared in-memory
    /// buffer instead of sending it anywhere.
    struct RecordingExporter {
        // Destination buffer; the test harness holds the other Arc handle.
        spans: Arc<Mutex<Vec<SpanData>>>,
    }
1315
1316 impl RecordingExporter {
1317 fn new(spans: Arc<Mutex<Vec<SpanData>>>) -> Self {
1318 Self { spans }
1319 }
1320 }
1321
    #[async_trait::async_trait]
    impl SpanExporter for RecordingExporter {
        /// Appends the finished spans to the shared buffer; never fails.
        async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
            let mut guard = self.spans.lock().expect("span mutex poisoned");
            guard.extend(batch);
            ExportResult::Success
        }

        /// Discards all recorded spans when the provider shuts down.
        fn shutdown(&mut self) -> ExportResult {
            self.spans.lock().expect("span mutex poisoned").clear();
            ExportResult::Success
        }
    }
1335
1336 fn canonicalise(value: &Value) -> Value {
1337 match value {
1338 Value::Object(map) => {
1339 let mut ordered = BTreeMap::new();
1340 for (key, value) in map.iter() {
1341 ordered.insert(key.clone(), canonicalise(value));
1342 }
1343 Value::Object(ordered)
1344 }
1345 Value::Array(items) => Value::Array(items.iter().map(canonicalise).collect()),
1346 other => other.clone(),
1347 }
1348 }
1349
1350 fn canonical_json_bytes(value: &Value) -> Vec<u8> {
1351 let canonical = canonicalise(value);
1352 serde_json::to_vec(&canonical).expect("canonical JSON serialization must succeed")
1353 }
1354
    /// Verifies the HMAC-SHA256 signature on an adapter attestation document
    /// and returns the attestation's `id` field on success.
    ///
    /// Expects the base64 signature at `signature.value` and compares it with
    /// the HMAC of the canonical-JSON encoding of the `statement` field.
    fn verify_adapter_attestation(value: &Value, key: &[u8]) -> AnyResult<String> {
        let signature_value = value
            .get("signature")
            .and_then(Value::as_object)
            .and_then(|sig| sig.get("value"))
            .and_then(Value::as_str)
            .context("attestation signature missing")?;
        let statement = value
            .get("statement")
            .context("attestation statement missing")?;
        // Recompute the MAC over the canonicalised statement bytes so key
        // ordering differences cannot cause spurious mismatches.
        let mut mac = Hmac::<Sha256>::new_from_slice(key)?;
        mac.update(&canonical_json_bytes(statement));
        let expected = BASE64.encode(mac.finalize().into_bytes());
        // NOTE(review): `==` is not constant-time; acceptable in a test helper.
        anyhow::ensure!(
            signature_value == expected,
            "adapter attestation signature mismatch"
        );
        let attestation_id = value
            .get("id")
            .and_then(Value::as_str)
            .context("attestation id missing")?;
        Ok(attestation_id.to_string())
    }
1378}
1379
/// Settings for the background retention worker, derived from environment
/// variables by `retention_config_from_env`.
#[derive(Clone, Copy)]
struct RetentionWorkerConfig {
    // Days to keep run/step/event records; values <= 0 disable run scrubbing.
    run_days: i64,
    // Days to keep artifacts; defaults to `run_days` when not set explicitly.
    artifact_days: i64,
    // Pause between sweeps (the parser clamps this to at least 60 seconds).
    interval: tokio::time::Duration,
}
1386
1387fn retention_config_from_env() -> Option<RetentionWorkerConfig> {
1388 let raw_run_env = std::env::var("FLEETFORGE_RETENTION_DAYS").ok()?;
1389 if !feature_allowed(LicensedFeature::LongRetention) {
1390 warn!(
1391 "FLEETFORGE_RETENTION_DAYS is set but the current license does not include extended retention; ignoring."
1392 );
1393 return None;
1394 }
1395 let raw_run = raw_run_env;
1396 let run_days: i64 = raw_run.parse().ok()?;
1397
1398 let artifact_days = std::env::var("FLEETFORGE_RETENTION_ARTIFACT_DAYS")
1399 .ok()
1400 .and_then(|v| v.parse::<i64>().ok())
1401 .unwrap_or(run_days);
1402
1403 if run_days <= 0 && artifact_days <= 0 {
1404 return None;
1405 }
1406
1407 let interval_secs = std::env::var("FLEETFORGE_RETENTION_INTERVAL_SECS")
1408 .ok()
1409 .and_then(|value| value.parse::<u64>().ok())
1410 .unwrap_or(3600)
1411 .max(60);
1412
1413 Some(RetentionWorkerConfig {
1414 run_days,
1415 artifact_days,
1416 interval: tokio::time::Duration::from_secs(interval_secs),
1417 })
1418}
1419
1420fn spawn_retention_worker(storage: Arc<Storage>) {
1421 if let Some(config) = retention_config_from_env() {
1422 let run_window = if config.run_days > 0 {
1423 Some(ChronoDuration::days(config.run_days as i64))
1424 } else {
1425 None
1426 };
1427 info!(
1428 retention_days = config.run_days,
1429 artifact_days = config.artifact_days,
1430 interval_secs = config.interval.as_secs(),
1431 "starting retention scrub worker"
1432 );
1433 let worker_storage = Arc::clone(&storage);
1434 let worker_config = config;
1435 tokio::spawn(async move {
1436 let run_window = run_window;
1437 loop {
1438 if let Some(window) = run_window {
1439 let cutoff = Utc::now() - window;
1440 match worker_storage.apply_retention_policy(cutoff).await {
1441 Ok(stats) => {
1442 if stats.runs_scrubbed > 0
1443 || stats.steps_scrubbed > 0
1444 || stats.events_scrubbed > 0
1445 {
1446 info!(
1447 runs_scrubbed = stats.runs_scrubbed,
1448 steps_scrubbed = stats.steps_scrubbed,
1449 events_scrubbed = stats.events_scrubbed,
1450 "retention scrub completed"
1451 );
1452 }
1453 }
1454 Err(err) => {
1455 warn!(error = %err, "retention scrub failed");
1456 }
1457 }
1458 }
1459
1460 let object_store = worker_storage.object_store();
1461 if let Err(err) = purge_retention(
1462 worker_storage.pool(),
1463 object_store.as_ref(),
1464 &StorageRetentionConfig {
1465 run_days: worker_config.run_days.max(0),
1466 artifact_days: worker_config.artifact_days.max(0),
1467 },
1468 )
1469 .await
1470 {
1471 warn!(error = %err, "retention purge failed");
1472 }
1473
1474 tokio::time::sleep(worker_config.interval).await;
1475 }
1476 });
1477 } else {
1478 info!("retention scrub worker disabled");
1479 }
1480}
1481
/// Runtime entrypoint: wires storage, telemetry, executors, the policy pack,
/// the scheduler, the outbox forwarder, and the API server, then awaits all
/// three long-running service tasks.
pub async fn bootstrap() -> Result<()> {
    let telemetry = TelemetryGuard::init("fleetforge-runtime")?;
    let storage = Arc::new(Storage::connect_default().await?);
    let audit = Arc::new(Audit::new(storage.pool().clone()));
    let attestation_vault: Arc<dyn AttestationVault> = Arc::new(ObjectStoreAttestationVault::new(
        storage.object_store(),
        storage.pool().clone(),
    ));
    // Prompt packs are optional: a missing/unreadable directory just logs
    // and downgrades to "no packs".
    let prompt_registry: Option<Arc<dyn PromptRegistry>> =
        match PromptCompiler::from_directory("prompts/agent_packs") {
            Ok(registry) => {
                info!("loaded prompt packs from ./prompts/agent_packs");
                Some(Arc::new(registry) as Arc<dyn PromptRegistry>)
            }
            Err(err) => {
                info!(
                    error = %err,
                    "prompt registry unavailable; continuing without prompt packs"
                );
                None
            }
        };
    // Register executors: LLM, deterministic replay, HTTP proxy, LangGraph.
    let mut executor_registry = ExecutorRegistry::new();
    let gateways = Arc::new(GatewayRegistry::with_defaults());
    executor_registry.register(Arc::new(LlmExecutor::new(
        Arc::clone(&gateways),
        prompt_registry.clone(),
        Option::<Arc<dyn MemoryAdapter>>::None,
    )))?;
    executor_registry.register(Arc::new(DeterministicReplayExecutor::default()))?;
    executor_registry.register(Arc::new(HttpProxyExecutor::new()?))?;
    executor_registry.register(Arc::new(LangGraphAgentExecutor::new()))?;

    // Docker is mandatory for tool execution; fail fast with guidance.
    let docker_executor = Arc::new(DockerToolExecutor::new().map_err(|err| {
        anyhow!(
            "failed to initialise Docker tool executor: {err}. Install Docker or set FLEETFORGE_DOCKER_CLI/FLEETFORGE_TOOLBOX_IMAGE."
        )
    })?);

    // Firecracker is optional; absence falls back to docker-only tools.
    let firecracker_executor = match FirecrackerToolExecutor::new() {
        Ok(exec) => {
            info!("firecracker sandbox enabled for tool execution");
            Some(Arc::new(exec))
        }
        Err(err) => {
            info!(error = %err, "firecracker sandbox disabled; continuing with docker-only tools");
            None
        }
    };

    executor_registry.register(Arc::new(ToolRouterExecutor::new(
        Arc::clone(&docker_executor),
        firecracker_executor,
    )))?;
    // The insecure shell executor is compile-time opt-in for local demos.
    #[cfg(feature = "insecure-shell")]
    {
        match executor::ShellExecutor::new() {
            Ok(shell) => executor_registry.register(Arc::new(shell))?,
            Err(err) => {
                warn!(error = %err, "skipping insecure shell executor registration; enable it explicitly for local demos");
            }
        }
    }
    let executors = Arc::new(executor_registry);
    // Pick the policy pack from FLEETFORGE_POLICY_PACK (case-insensitive);
    // unset or "default"/"" load the default pack, unknown values abort.
    let policy_pack: Arc<RuntimePolicyPack> = match std::env::var("FLEETFORGE_POLICY_PACK") {
        Ok(value) => match value.trim().to_ascii_lowercase().as_str() {
            "default" | "" => {
                info!("loading default Trust Mesh policy pack");
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.default",
                    shared_default_pack()?,
                ))
            }
            "hipaa" => {
                info!("loading HIPAA regulated policy pack");
                let engine: Arc<dyn PolicyEngine> =
                    Arc::new(RegulatedPack::new(RegulatedVertical::Hipaa)?);
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.regulated.hipaa",
                    engine,
                ))
            }
            "gdpr" => {
                info!("loading GDPR regulated policy pack");
                let engine: Arc<dyn PolicyEngine> =
                    Arc::new(RegulatedPack::new(RegulatedVertical::Gdpr)?);
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.regulated.gdpr",
                    engine,
                ))
            }
            "allow_all" => {
                info!("policy pack set to allow_all (no guardrails enforced)");
                Arc::new(RuntimePolicyPack::new(
                    "fleetforge.policy.allow_all",
                    AllowAllPolicy::shared(),
                ))
            }
            other => return Err(anyhow!("unknown policy pack '{other}'")),
        },
        Err(_) => {
            info!("policy pack not specified; loading default Trust Mesh pack");
            Arc::new(RuntimePolicyPack::new(
                "fleetforge.policy.default",
                shared_default_pack()?,
            ))
        }
    };
    let scheduler = Arc::new(Scheduler::new(
        storage.clone(),
        executors.clone(),
        policy_pack,
        Arc::clone(&audit),
        Arc::clone(&attestation_vault),
        SchedulerConfig::default(),
    ));

    spawn_retention_worker(storage.clone());

    // External exporters are best-effort: failure logs and continues empty.
    let bus_storage = storage.clone();
    let bus_config = bus_config_from_env();
    let exporters = match fleetforge_telemetry::exporters::from_env() {
        Ok(value) => value,
        Err(err) => {
            warn!(error = ?err, "failed to initialise external exporters; continuing without them");
            Vec::new()
        }
    };
    // Launch the three long-running service tasks.
    let bus_handle = tokio::spawn(async move {
        run_outbox_forwarder_with_exporters(bus_config, bus_storage, exporters).await
    });
    let api_storage = storage.clone();
    let api_audit = audit.clone();
    let api_handle = tokio::spawn(async move { start_api_server(api_storage, api_audit).await });
    let scheduler_handle = tokio::spawn({
        let scheduler = scheduler.clone();
        async move { scheduler.run().await }
    });

    // Await all tasks; map join panics and task errors into anyhow errors.
    let (bus_res, api_res, scheduler_res) = tokio::join!(bus_handle, api_handle, scheduler_handle);

    bus_res.map_err(|err| anyhow!("bus task panicked: {err}"))??;
    api_res.map_err(|err| anyhow!("api task panicked: {err}"))??;
    scheduler_res.map_err(|err| anyhow!("scheduler task panicked: {err}"))??;

    drop(storage);
    drop(telemetry);
    Ok(())
}
1633
1634fn bus_config_from_env() -> BusConfig {
1635 let mut config = BusConfig::default();
1636 if let Ok(bootstrap) = std::env::var("FLEETFORGE_BUS_BOOTSTRAP") {
1637 config.bootstrap_servers = bootstrap;
1638 }
1639 if let Ok(topic) = std::env::var("FLEETFORGE_BUS_TOPIC") {
1640 config.topic = topic;
1641 }
1642
1643 let transactional_id_env = std::env::var("FLEETFORGE_BUS_TRANSACTIONAL_ID")
1644 .ok()
1645 .filter(|id| !id.trim().is_empty());
1646 if let Some(tx_id) = transactional_id_env.clone() {
1647 config.transactional_id = Some(tx_id.clone());
1648 }
1649
1650 if transactional_id_env.is_some() {
1651 info!(
1652 transactional_id = transactional_id_env.as_deref().unwrap_or_default(),
1653 "outbox forwarder running in exactly-once mode; configure consumers with isolation.level=read_committed"
1654 );
1655 } else {
1656 info!(
1657 "outbox forwarder running in at-least-once mode; set FLEETFORGE_BUS_TRANSACTIONAL_ID for exactly-once"
1658 );
1659 }
1660
1661 config
1662}