// fleetforge_bus/lib.rs

1//! Kafka-compatible event bus utilities and transactional outbox forwarder.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{anyhow, Context, Result};
7use async_trait::async_trait;
8use backoff::future::retry;
9use backoff::ExponentialBackoff;
10use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
11use rdkafka::util::Timeout;
12use rdkafka::ClientConfig;
13use serde::{Deserialize, Serialize};
14use serde_json::to_vec;
15use tokio::time::sleep;
16use tracing::{debug, error, info, warn};
17
18use fleetforge_storage::models::OutboxEvent;
19use fleetforge_storage::Storage;
20
/// External sink that receives a copy of every outbox event.
///
/// Exporters are best-effort: the forwarder logs an export failure and
/// continues, so delivery to exporters is not guaranteed (unlike Kafka
/// delivery, which gates the outbox bookkeeping).
#[async_trait]
pub trait OutboxExporter: Send + Sync {
    /// Returns a human-readable identifier for logging.
    fn name(&self) -> &'static str {
        "external"
    }

    /// Deliver an outbox event to the downstream sink.
    ///
    /// # Errors
    /// Implementations return an error when delivery fails; the forwarder
    /// logs the failure and moves on without retrying.
    async fn export(&self, event: &OutboxEvent) -> Result<()>;
}
32
/// Configuration for the event bus forwarder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusConfig {
    /// Kafka/Redpanda bootstrap servers.
    pub bootstrap_servers: String,
    /// Topic that receives structured run/step events.
    pub topic: String,
    /// Maximum number of rows polled per batch.
    pub batch_size: i64,
    /// Poll interval when no events are pending (also the pause after a
    /// failed batch).
    pub idle_backoff: Duration,
    /// Connection/client properties for the Kafka producer.
    /// Applied after the built-in defaults, so entries here may override them.
    #[serde(default)]
    pub client_props: Vec<(String, String)>,
    /// Optional transactional id enabling exactly-once semantics.
    /// May also be supplied as a `transactional.id` entry in `client_props`.
    #[serde(default)]
    pub transactional_id: Option<String>,
}
51
52impl Default for BusConfig {
53    fn default() -> Self {
54        Self {
55            bootstrap_servers: "localhost:9092".to_string(),
56            topic: "fleetforge.events".to_string(),
57            batch_size: 128,
58            idle_backoff: Duration::from_millis(250),
59            client_props: vec![],
60            transactional_id: None,
61        }
62    }
63}
64
/// Forwarder that drains the transactional outbox and publishes records.
///
/// Typical flow:
/// 1. Application writes domain rows and enqueues an outbox event in the same DB transaction.
/// 2. After commit, `OutboxForwarder` publishes the event to Kafka/Redpanda.
/// 3. Once the broker acknowledges the write, the forwarder marks the outbox row as published.
///
/// This mirrors the transactional outbox pattern (see microservices.io) and provides
/// at-least-once delivery. When configured with a Kafka `transactional.id`, the forwarder
/// wraps each batch in a producer transaction for effectively-exactly-once semantics.
pub struct OutboxForwarder {
    // Database handle used to poll and mark outbox rows.
    storage: Arc<Storage>,
    // Kafka topic every event is published to.
    topic: String,
    // Maximum rows fetched per poll.
    batch_size: i64,
    // Pause between polls when the outbox is empty or a batch fails.
    idle_backoff: Duration,
    // Abstracted producer; substituted with a mock in unit tests.
    producer: Arc<dyn EventProducer>,
    // Best-effort fan-out sinks; failures are logged, not retried.
    exporters: Vec<Arc<dyn OutboxExporter>>,
}
85
86impl OutboxForwarder {
87    /// Lightweight constructor check ensuring transactional mode is enabled when a `transactional.id` is provided.
88    #[cfg(test)]
89    pub(crate) fn new_with_config(storage: Arc<Storage>, config: BusConfig) -> Result<Self> {
90        Self::new(storage, config)
91    }
92
93    pub fn new(storage: Arc<Storage>, config: BusConfig) -> Result<Self> {
94        let mut client = ClientConfig::new();
95        client.set("bootstrap.servers", &config.bootstrap_servers);
96        client.set("message.timeout.ms", "5000");
97
98        let transactional_id = config.transactional_id.clone().or_else(|| {
99            config
100                .client_props
101                .iter()
102                .find(|(k, _)| k == "transactional.id")
103                .map(|(_, v)| v.clone())
104        });
105
106        client.set("enable.idempotence", "true");
107        if transactional_id.is_none() {
108            warn!(
109                "Kafka producer configured without transactional.id; downstream delivery remains at-least-once"
110            );
111        }
112
113        for (key, value) in &config.client_props {
114            client.set(key, value);
115        }
116
117        if let Some(id) = &transactional_id {
118            client.set("transactional.id", id);
119        }
120
121        let kafka_producer: FutureProducer = client
122            .create()
123            .map_err(|err| anyhow!("failed to create Kafka producer: {err}"))?;
124
125        if transactional_id.is_some() {
126            kafka_producer
127                .init_transactions(Timeout::After(Duration::from_secs(10)))
128                .map_err(|err| anyhow!("failed to initialise Kafka transactions: {err}"))?;
129        }
130
131        let producer = Arc::new(KafkaEventProducer::new(
132            kafka_producer,
133            transactional_id.is_some(),
134        ));
135
136        Ok(Self::with_producer(
137            storage,
138            producer,
139            config.topic,
140            config.batch_size,
141            config.idle_backoff,
142            Vec::new(),
143        ))
144    }
145
146    fn with_producer(
147        storage: Arc<Storage>,
148        producer: Arc<dyn EventProducer>,
149        topic: String,
150        batch_size: i64,
151        idle_backoff: Duration,
152        exporters: Vec<Arc<dyn OutboxExporter>>,
153    ) -> Self {
154        Self {
155            storage,
156            topic,
157            batch_size,
158            idle_backoff,
159            producer,
160            exporters,
161        }
162    }
163
164    pub fn with_exporters(mut self, exporters: Vec<Arc<dyn OutboxExporter>>) -> Self {
165        self.exporters = exporters;
166        self
167    }
168
169    /// Runs the forwarder loop until shutdown.
170    pub async fn run(self: Arc<Self>) -> Result<()> {
171        info!(topic = %self.topic, "starting outbox forwarder");
172        loop {
173            let events = self.poll_pending_events().await?;
174            if events.is_empty() {
175                sleep(self.idle_backoff).await;
176                continue;
177            }
178
179            if let Err(err) = self.process_batch(events).await {
180                error!(error = ?err, "failed to forward outbox batch");
181                sleep(self.idle_backoff).await;
182            }
183        }
184    }
185
186    async fn process_batch(&self, events: Vec<OutboxEvent>) -> Result<()> {
187        if events.is_empty() {
188            return Ok(());
189        }
190
191        let transactional = self.producer.transactional();
192        let mut delivered = Vec::new();
193
194        if transactional {
195            self.producer.begin_transaction()?;
196        }
197
198        for event in events {
199            if let Err(err) = self.forward_event(&event).await {
200                if transactional {
201                    if let Err(abort_err) = self.producer.abort_transaction() {
202                        warn!(error = ?abort_err, "failed to abort Kafka transaction after send error");
203                    }
204                }
205                return Err(err);
206            }
207
208            for exporter in &self.exporters {
209                if let Err(err) = exporter.export(&event).await {
210                    warn!(
211                        error = ?err,
212                        exporter = exporter.name(),
213                        event_id = event.id,
214                        "failed to export outbox event to external sink"
215                    );
216                }
217            }
218
219            if transactional {
220                delivered.push(event.id);
221            } else {
222                self.storage.mark_outbox_published(event.id).await?;
223            }
224        }
225
226        if transactional {
227            if let Err(err) = self.producer.commit_transaction() {
228                if let Err(abort_err) = self.producer.abort_transaction() {
229                    warn!(error = ?abort_err, "failed to abort Kafka transaction after commit error");
230                }
231                return Err(err);
232            }
233
234            for id in delivered {
235                self.storage.mark_outbox_published(id).await?;
236            }
237        }
238
239        Ok(())
240    }
241
242    #[cfg(test)]
243    pub(crate) async fn process_once(&self) -> Result<bool> {
244        let events = self.poll_pending_events().await?;
245        if events.is_empty() {
246            return Ok(false);
247        }
248        self.process_batch(events).await.map(|_| true)
249    }
250
251    async fn poll_pending_events(&self) -> Result<Vec<OutboxEvent>> {
252        let events = self
253            .storage
254            .fetch_unpublished_outbox(self.batch_size)
255            .await
256            .context("failed to read outbox events")?;
257        Ok(events)
258    }
259
260    async fn forward_event(&self, event: &OutboxEvent) -> Result<()> {
261        let payload = to_vec(&event.payload)?;
262        let key = event.id.to_string();
263
264        let mut backoff = ExponentialBackoff::default();
265        backoff.max_elapsed_time = None;
266
267        let topic = self.topic.clone();
268        let producer = Arc::clone(&self.producer);
269
270        retry(backoff, || async {
271            producer
272                .send(&topic, key.clone(), payload.clone())
273                .await
274                .map_err(backoff::Error::transient)
275        })
276        .await?;
277
278        debug!(event_id = event.id, "forwarded outbox event");
279        Ok(())
280    }
281}
282
/// Internal abstraction over the Kafka producer so tests can substitute a mock.
#[async_trait]
trait EventProducer: Send + Sync {
    /// True when the producer was configured for transactions.
    fn transactional(&self) -> bool;
    /// Publish one record to `topic`, keyed by `key`.
    async fn send(&self, topic: &str, key: String, payload: Vec<u8>) -> Result<()>;
    /// Begin a producer transaction (no-op when not transactional).
    fn begin_transaction(&self) -> Result<()>;
    /// Commit the current producer transaction (no-op when not transactional).
    fn commit_transaction(&self) -> Result<()>;
    /// Abort the current producer transaction (no-op when not transactional).
    fn abort_transaction(&self) -> Result<()>;
}
291
/// `EventProducer` implementation backed by an rdkafka `FutureProducer`.
struct KafkaEventProducer {
    inner: FutureProducer,
    // Whether transaction calls should be forwarded to the broker.
    transactional: bool,
}
296
297impl KafkaEventProducer {
298    fn new(inner: FutureProducer, transactional: bool) -> Self {
299        Self {
300            inner,
301            transactional,
302        }
303    }
304}
305
306#[async_trait]
307impl EventProducer for KafkaEventProducer {
308    fn transactional(&self) -> bool {
309        self.transactional
310    }
311
312    async fn send(&self, topic: &str, key: String, payload: Vec<u8>) -> Result<()> {
313        let record: FutureRecord<'_, String, Vec<u8>> =
314            FutureRecord::to(topic).key(&key).payload(&payload);
315        self.inner
316            .send(record, Timeout::After(Duration::from_secs(5)))
317            .await
318            .map_err(|(err, _)| anyhow!("kafka send failed: {err}"))?;
319        Ok(())
320    }
321
322    fn begin_transaction(&self) -> Result<()> {
323        if self.transactional {
324            self.inner
325                .begin_transaction()
326                .map_err(|err| anyhow!("failed to begin Kafka transaction: {err}"))?;
327        }
328        Ok(())
329    }
330
331    fn commit_transaction(&self) -> Result<()> {
332        if self.transactional {
333            self.inner
334                .commit_transaction(Timeout::After(Duration::from_secs(10)))
335                .map_err(|err| anyhow!("failed to commit Kafka transaction: {err}"))?;
336        }
337        Ok(())
338    }
339
340    fn abort_transaction(&self) -> Result<()> {
341        if self.transactional {
342            self.inner
343                .abort_transaction(Timeout::After(Duration::from_secs(10)))
344                .map_err(|err| anyhow!("failed to abort Kafka transaction: {err}"))?;
345        }
346        Ok(())
347    }
348}
349
/// Runs an outbox forwarder with no additional exporters.
///
/// Thin wrapper over `run_outbox_forwarder_with_exporters`. Note this future
/// does not spawn a task — it drives the forwarder loop itself and completes
/// only when the loop exits with an error.
pub async fn run_outbox_forwarder(config: BusConfig, storage: Arc<Storage>) -> Result<()> {
    run_outbox_forwarder_with_exporters(config, storage, Vec::new()).await
}
354
355/// Spawns an outbox forwarder task with additional exporters that fan out a copy of each event.
356pub async fn run_outbox_forwarder_with_exporters(
357    config: BusConfig,
358    storage: Arc<Storage>,
359    exporters: Vec<Arc<dyn OutboxExporter>>,
360) -> Result<()> {
361    let forwarder = OutboxForwarder::new(Arc::clone(&storage), config)?.with_exporters(exporters);
362    let forwarder = Arc::new(forwarder);
363    forwarder.run().await
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369    use fleetforge_storage::{ObjectStoreConfig, StorageConfig};
370    use rdkafka::consumer::{Consumer, StreamConsumer};
371    use rdkafka::ClientConfig;
372    use rdkafka::Message;
373    use serde_json::json;
374    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
375    use std::sync::Mutex;
376    use testcontainers::clients::Cli;
377    use testcontainers::images::generic::GenericImage;
378    use testcontainers::images::postgres::Postgres;
379    use tokio::time::sleep;
380    use tokio::time::timeout;
381    use uuid::Uuid;
382
383    #[tokio::test]
384    async fn forward_event_marks_published() {
385        if std::fs::metadata("/var/run/docker.sock").is_err()
386            && std::env::var("DOCKER_HOST").is_err()
387        {
388            eprintln!("Skipping bus tests because Docker is unavailable");
389            return;
390        }
391
392        let docker = Cli::default();
393        let container = docker.run(Postgres::default());
394        let port = container.get_host_port_ipv4(5432);
395        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
396        sleep(Duration::from_millis(250)).await;
397
398        let storage = Arc::new(
399            Storage::connect(StorageConfig {
400                database_url: db_url,
401                max_connections: 1,
402                connect_timeout: Duration::from_secs(10),
403                object_store: ObjectStoreConfig::InMemory,
404            })
405            .await
406            .expect("connect storage"),
407        );
408
409        let run_id = Uuid::new_v4();
410        let id: i64 = sqlx::query_scalar(
411            r#"insert into outbox (run_id, kind, payload) values ($1, 'event', '{"ok":true}') returning id"#,
412        )
413        .bind(run_id)
414        .fetch_one(storage.pool())
415        .await
416        .unwrap();
417
418        let producer = Arc::new(MockProducer::new(false));
419        let producer_trait: Arc<dyn EventProducer> = producer.clone();
420        let forwarder = OutboxForwarder::with_producer(
421            Arc::clone(&storage),
422            producer_trait.clone(),
423            "topic".to_string(),
424            10,
425            Duration::from_millis(50),
426            Vec::new(),
427        );
428
429        let events = forwarder.poll_pending_events().await.unwrap();
430        assert_eq!(events.len(), 1);
431
432        forwarder.forward_event(&events[0]).await.unwrap();
433        storage.mark_outbox_published(id).await.unwrap();
434
435        assert_eq!(producer.sent.lock().unwrap().len(), 1);
436        let published: Option<chrono::DateTime<chrono::Utc>> =
437            sqlx::query_scalar("select published_at from outbox where id = $1")
438                .bind(id)
439                .fetch_one(storage.pool())
440                .await
441                .unwrap();
442        assert!(published.is_some());
443
444        drop(container);
445    }
446
447    #[tokio::test]
448    async fn forward_event_retries_on_send_failure() {
449        if std::fs::metadata("/var/run/docker.sock").is_err()
450            && std::env::var("DOCKER_HOST").is_err()
451        {
452            eprintln!("Skipping bus tests because Docker is unavailable");
453            return;
454        }
455
456        let docker = Cli::default();
457        let container = docker.run(Postgres::default());
458        let port = container.get_host_port_ipv4(5432);
459        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
460        sleep(Duration::from_millis(250)).await;
461
462        let storage = Arc::new(
463            Storage::connect(StorageConfig {
464                database_url: db_url,
465                max_connections: 1,
466                connect_timeout: Duration::from_secs(10),
467                object_store: ObjectStoreConfig::InMemory,
468            })
469            .await
470            .expect("connect storage"),
471        );
472
473        let run_id = Uuid::new_v4();
474        let id: i64 = sqlx::query_scalar(
475            r#"insert into outbox (run_id, kind, payload) values ($1, 'event', '{"ok":true}') returning id"#,
476        )
477        .bind(run_id)
478        .fetch_one(storage.pool())
479        .await
480        .unwrap();
481
482        let producer = Arc::new(MockProducer::failing(false, 1, false));
483        let producer_trait: Arc<dyn EventProducer> = producer.clone();
484        let forwarder = OutboxForwarder::with_producer(
485            Arc::clone(&storage),
486            producer_trait.clone(),
487            "topic".to_string(),
488            10,
489            Duration::from_millis(50),
490            Vec::new(),
491        );
492
493        let events = forwarder.poll_pending_events().await.unwrap();
494        assert_eq!(events.len(), 1);
495
496        forwarder.forward_event(&events[0]).await.unwrap();
497        storage.mark_outbox_published(id).await.unwrap();
498
499        let published: Option<chrono::DateTime<chrono::Utc>> =
500            sqlx::query_scalar("select published_at from outbox where id = $1")
501                .bind(id)
502                .fetch_one(storage.pool())
503                .await
504                .unwrap();
505        assert!(published.is_some());
506        assert_eq!(producer.sent.lock().unwrap().len(), 1);
507
508        drop(container);
509    }
510
511    #[tokio::test]
512    async fn transactional_retry_does_not_mark_until_commit() {
513        if std::fs::metadata("/var/run/docker.sock").is_err()
514            && std::env::var("DOCKER_HOST").is_err()
515        {
516            eprintln!("Skipping bus tests because Docker is unavailable");
517            return;
518        }
519
520        let docker = Cli::default();
521        let container = docker.run(Postgres::default());
522        let port = container.get_host_port_ipv4(5432);
523        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
524        sleep(Duration::from_millis(250)).await;
525
526        let storage = Arc::new(
527            Storage::connect(StorageConfig {
528                database_url: db_url,
529                max_connections: 1,
530                connect_timeout: Duration::from_secs(10),
531                object_store: ObjectStoreConfig::InMemory,
532            })
533            .await
534            .expect("connect storage"),
535        );
536
537        let run_id = Uuid::new_v4();
538        let id: i64 = sqlx::query_scalar(
539            r#"insert into outbox (run_id, kind, payload) values ($1, 'event', '{"ok":true}') returning id"#,
540        )
541        .bind(run_id)
542        .fetch_one(storage.pool())
543        .await
544        .unwrap();
545
546        let producer = Arc::new(MockProducer::failing(true, 0, true));
547        let producer_trait: Arc<dyn EventProducer> = producer.clone();
548        let forwarder = OutboxForwarder::with_producer(
549            Arc::clone(&storage),
550            producer_trait.clone(),
551            "topic".to_string(),
552            10,
553            Duration::from_millis(50),
554            Vec::new(),
555        );
556
557        let events = forwarder.poll_pending_events().await.unwrap();
558        assert_eq!(events.len(), 1);
559
560        // emulate run() transaction flow
561        assert!(producer_trait.begin_transaction().is_ok());
562        forwarder.forward_event(&events[0]).await.unwrap();
563        assert!(producer_trait.commit_transaction().is_err());
564        producer_trait.abort_transaction().unwrap();
565
566        let published: Option<chrono::DateTime<chrono::Utc>> =
567            sqlx::query_scalar("select published_at from outbox where id = $1")
568                .bind(id)
569                .fetch_one(storage.pool())
570                .await
571                .unwrap();
572        assert!(published.is_none());
573
574        // second attempt succeeds
575        producer.reset_failures();
576        assert!(producer_trait.begin_transaction().is_ok());
577        forwarder.forward_event(&events[0]).await.unwrap();
578        producer_trait.commit_transaction().unwrap();
579        storage.mark_outbox_published(id).await.unwrap();
580
581        let published: Option<chrono::DateTime<chrono::Utc>> =
582            sqlx::query_scalar("select published_at from outbox where id = $1")
583                .bind(id)
584                .fetch_one(storage.pool())
585                .await
586                .unwrap();
587        assert!(published.is_some());
588        assert!(producer.sent.lock().unwrap().len() >= 2);
589
590        drop(container);
591    }
592
593    fn docker_available() -> bool {
594        std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
595    }
596
    /// End-to-end: a real Redpanda broker, transactional producer, and a
    /// consumer that verifies the forwarded record's key and payload.
    #[tokio::test]
    async fn forwards_to_redpanda_with_transactions() {
        if !docker_available() {
            eprintln!("Skipping redpanda integration test because Docker is unavailable");
            return;
        }

        let docker = Cli::default();
        let pg_container = docker.run(Postgres::default());
        let pg_port = pg_container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        // Give Postgres a moment to accept connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        // Seed one pending outbox row with a JSON payload we can assert on later.
        let topic = "fleetforge.events";
        let payload = json!({"run_id": "integration", "ok": true});
        let run_id = Uuid::new_v4();
        let event_id: i64 = sqlx::query_scalar(
            "insert into outbox (run_id, kind, payload) values ($1, 'event', $2) returning id",
        )
        .bind(run_id)
        .bind(payload.clone())
        .fetch_one(storage.pool())
        .await
        .expect("insert outbox");

        // Launch Redpanda with host listener enabled.
        let redpanda_image =
            GenericImage::new("docker.redpanda.com/redpandadata/redpanda", "v23.3.14")
                .with_env_var("REDPANDA_AUTO_CREATE_TOPICS", "true")
                .with_exposed_port(9092)
                .with_cmd(vec![
                    "redpanda".into(),
                    "start".into(),
                    "--overprovisioned".into(),
                    "--smp".into(),
                    "1".into(),
                    "--memory".into(),
                    "256M".into(),
                    "--reserve-memory".into(),
                    "0M".into(),
                    "--node-id".into(),
                    "0".into(),
                    "--check=false".into(),
                    "--kafka-addr".into(),
                    "PLAINTEXT://0.0.0.0:9092".into(),
                    "--advertise-kafka-addr".into(),
                    "PLAINTEXT://redpanda:9092".into(),
                    "--set".into(),
                    "redpanda.auto_create_topics_enabled=true".into(),
                ]);

        let redpanda_container = docker.run(redpanda_image);
        let kafka_port = redpanda_container.get_host_port_ipv4(9092);

        // Allow broker to finish booting.
        sleep(Duration::from_secs(2)).await;

        // Unique transactional id per run so reruns don't fence each other.
        let config = BusConfig {
            bootstrap_servers: format!("127.0.0.1:{kafka_port}"),
            topic: topic.to_string(),
            batch_size: 64,
            idle_backoff: Duration::from_millis(50),
            client_props: vec![
                ("message.timeout.ms".to_string(), "5000".to_string()),
                ("socket.timeout.ms".to_string(), "5000".to_string()),
            ],
            transactional_id: Some(format!("test-txn-{}", Uuid::new_v4())),
        };

        let forwarder =
            OutboxForwarder::new(Arc::clone(&storage), config.clone()).expect("create forwarder");
        assert!(forwarder.process_once().await.expect("process once"));

        // The row must be marked published after the transactional commit.
        let published: Option<chrono::DateTime<chrono::Utc>> =
            sqlx::query_scalar("select published_at from outbox where id = $1")
                .bind(event_id)
                .fetch_one(storage.pool())
                .await
                .expect("query published");
        assert!(
            published.is_some(),
            "outbox row should be marked as published"
        );

        // Consume from the beginning of the topic to find the forwarded record.
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &format!("ff-test-{}", Uuid::new_v4()))
            .set("bootstrap.servers", &config.bootstrap_servers)
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("create consumer");
        consumer.subscribe(&[topic]).expect("subscribe to topic");

        let message = timeout(Duration::from_secs(15), async {
            loop {
                match consumer.recv().await {
                    Ok(record) => break Ok(record),
                    Err(err) => {
                        if err.is_partition_eof() {
                            continue;
                        }
                        break Err(err);
                    }
                }
            }
        })
        .await
        .expect("timed out waiting for message")
        .expect("consumer error");

        // Key is the outbox row id; payload is the original JSON.
        let key = message
            .key()
            .and_then(|bytes| std::str::from_utf8(bytes).ok())
            .expect("key present");
        assert_eq!(key, event_id.to_string());

        let payload_bytes = message.payload().expect("payload present");
        let payload_value: serde_json::Value =
            serde_json::from_slice(payload_bytes).expect("valid json payload");
        assert_eq!(payload_value, payload);

        drop(redpanda_container);
        drop(pg_container);
    }
731
    /// End-to-end without a transactional id: at-least-once delivery still
    /// forwards the record and marks the outbox row.
    #[tokio::test]
    async fn forwards_to_redpanda_at_least_once() {
        if !docker_available() {
            eprintln!("Skipping redpanda integration test because Docker is unavailable");
            return;
        }

        let docker = Cli::default();
        let pg_container = docker.run(Postgres::default());
        let pg_port = pg_container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        // Give Postgres a moment to accept connections.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        // Seed one pending outbox row with a JSON payload we can assert on later.
        let topic = "fleetforge.events";
        let payload = json!({"run_id": "integration", "ok": true});
        let run_id = Uuid::new_v4();
        let event_id: i64 = sqlx::query_scalar(
            "insert into outbox (run_id, kind, payload) values ($1, 'event', $2) returning id",
        )
        .bind(run_id)
        .bind(payload.clone())
        .fetch_one(storage.pool())
        .await
        .expect("insert outbox");

        // Launch Redpanda with host listener enabled.
        let redpanda_image =
            GenericImage::new("docker.redpanda.com/redpandadata/redpanda", "v23.3.14")
                .with_env_var("REDPANDA_AUTO_CREATE_TOPICS", "true")
                .with_exposed_port(9092)
                .with_cmd(vec![
                    "redpanda".into(),
                    "start".into(),
                    "--overprovisioned".into(),
                    "--smp".into(),
                    "1".into(),
                    "--memory".into(),
                    "256M".into(),
                    "--reserve-memory".into(),
                    "0M".into(),
                    "--node-id".into(),
                    "0".into(),
                    "--check=false".into(),
                    "--kafka-addr".into(),
                    "PLAINTEXT://0.0.0.0:9092".into(),
                    "--advertise-kafka-addr".into(),
                    "PLAINTEXT://redpanda:9092".into(),
                    "--set".into(),
                    "redpanda.auto_create_topics_enabled=true".into(),
                ]);

        let redpanda_container = docker.run(redpanda_image);
        let kafka_port = redpanda_container.get_host_port_ipv4(9092);

        // Allow broker to finish booting.
        sleep(Duration::from_secs(2)).await;

        // No transactional id: exercises the plain at-least-once path.
        let config = BusConfig {
            bootstrap_servers: format!("127.0.0.1:{kafka_port}"),
            topic: topic.to_string(),
            batch_size: 64,
            idle_backoff: Duration::from_millis(50),
            client_props: vec![
                ("message.timeout.ms".to_string(), "5000".to_string()),
                ("socket.timeout.ms".to_string(), "5000".to_string()),
            ],
            transactional_id: None,
        };

        let forwarder =
            OutboxForwarder::new(Arc::clone(&storage), config.clone()).expect("forwarder");
        assert!(forwarder.process_once().await.expect("process once"));

        // The row must be marked published after the broker acknowledgement.
        let published: Option<chrono::DateTime<chrono::Utc>> =
            sqlx::query_scalar("select published_at from outbox where id = $1")
                .bind(event_id)
                .fetch_one(storage.pool())
                .await
                .expect("query published");
        assert!(
            published.is_some(),
            "outbox row should be marked as published"
        );

        // Consume from the beginning of the topic to find the forwarded record.
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &format!("ff-test-{}", Uuid::new_v4()))
            .set("bootstrap.servers", &config.bootstrap_servers)
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("create consumer");
        consumer.subscribe(&[topic]).expect("subscribe to topic");

        let message = timeout(Duration::from_secs(15), async {
            loop {
                match consumer.recv().await {
                    Ok(record) => break Ok(record),
                    Err(err) => {
                        if err.is_partition_eof() {
                            continue;
                        }
                        break Err(err);
                    }
                }
            }
        })
        .await
        .expect("timed out waiting for message")
        .expect("consumer error");

        // Key is the outbox row id; payload is the original JSON.
        let key = message
            .key()
            .and_then(|bytes| std::str::from_utf8(bytes).ok())
            .expect("key present");
        assert_eq!(key, event_id.to_string());

        let payload_bytes = message.payload().expect("payload present");
        let payload_value: serde_json::Value =
            serde_json::from_slice(payload_bytes).expect("valid json payload");
        assert_eq!(payload_value, payload);

        drop(redpanda_container);
        drop(pg_container);
    }
864
    /// Test double for `EventProducer` with injectable send/commit failures.
    struct MockProducer {
        // Whether the mock reports itself as transactional.
        transactional: bool,
        // Number of upcoming `send` calls that should fail.
        fail_send: AtomicUsize,
        // When set, the next `commit_transaction` call fails exactly once.
        fail_commit_once: AtomicBool,
        // Records every successfully "sent" (key, payload) pair.
        pub sent: Mutex<Vec<(String, Vec<u8>)>>,
    }
871
872    impl MockProducer {
873        fn new(transactional: bool) -> Self {
874            Self {
875                transactional,
876                fail_send: AtomicUsize::new(0),
877                fail_commit_once: AtomicBool::new(false),
878                sent: Mutex::new(Vec::new()),
879            }
880        }
881
882        fn failing(transactional: bool, send_failures: usize, fail_commit: bool) -> Self {
883            Self {
884                transactional,
885                fail_send: AtomicUsize::new(send_failures),
886                fail_commit_once: AtomicBool::new(fail_commit),
887                sent: Mutex::new(Vec::new()),
888            }
889        }
890
891        fn reset_failures(&self) {
892            self.fail_send.store(0, Ordering::SeqCst);
893            self.fail_commit_once.store(false, Ordering::SeqCst);
894        }
895    }
896
897    #[async_trait]
898    impl EventProducer for MockProducer {
899        fn transactional(&self) -> bool {
900            self.transactional
901        }
902
903        async fn send(&self, _topic: &str, key: String, payload: Vec<u8>) -> Result<()> {
904            if self.fail_send.load(Ordering::SeqCst) > 0 {
905                self.fail_send.fetch_sub(1, Ordering::SeqCst);
906                return Err(anyhow!("injected send failure"));
907            }
908
909            self.sent.lock().unwrap().push((key, payload));
910            Ok(())
911        }
912
913        fn begin_transaction(&self) -> Result<()> {
914            Ok(())
915        }
916
917        fn commit_transaction(&self) -> Result<()> {
918            if self.fail_commit_once.swap(false, Ordering::SeqCst) {
919                Err(anyhow!("injected commit failure"))
920            } else {
921                Ok(())
922            }
923        }
924
925        fn abort_transaction(&self) -> Result<()> {
926            Ok(())
927        }
928    }
929}