1use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{anyhow, Context, Result};
7use async_trait::async_trait;
8use backoff::future::retry;
9use backoff::ExponentialBackoff;
10use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
11use rdkafka::util::Timeout;
12use rdkafka::ClientConfig;
13use serde::{Deserialize, Serialize};
14use serde_json::to_vec;
15use tokio::time::sleep;
16use tracing::{debug, error, info, warn};
17
18use fleetforge_storage::models::OutboxEvent;
19use fleetforge_storage::Storage;
20
/// Secondary, best-effort sink for outbox events, invoked in addition to the
/// Kafka publish.
///
/// The forwarder logs export failures but never fails the batch because of
/// them, and the event is still marked published afterwards.
#[async_trait]
pub trait OutboxExporter: Send + Sync {
    /// Sink name used in log messages when an export fails.
    fn name(&self) -> &'static str {
        "external"
    }

    /// Deliver one outbox event to the external sink.
    async fn export(&self, event: &OutboxEvent) -> Result<()>;
}
32
/// Configuration for the outbox-to-Kafka forwarder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusConfig {
    /// Passed through as the Kafka `bootstrap.servers` client property.
    pub bootstrap_servers: String,
    /// Topic every outbox event is published to.
    pub topic: String,
    /// Maximum number of outbox rows fetched per poll.
    pub batch_size: i64,
    /// Sleep between polls when no events are pending; also the backoff after
    /// a failed batch.
    pub idle_backoff: Duration,
    /// Extra librdkafka client properties. These are applied after the
    /// built-in defaults and can therefore override them.
    #[serde(default)]
    pub client_props: Vec<(String, String)>,
    /// Enables transactional delivery when set. May alternatively be supplied
    /// as a `transactional.id` entry inside `client_props`.
    #[serde(default)]
    pub transactional_id: Option<String>,
}
51
52impl Default for BusConfig {
53 fn default() -> Self {
54 Self {
55 bootstrap_servers: "localhost:9092".to_string(),
56 topic: "fleetforge.events".to_string(),
57 batch_size: 128,
58 idle_backoff: Duration::from_millis(250),
59 client_props: vec![],
60 transactional_id: None,
61 }
62 }
63}
64
/// Background worker that drains the transactional-outbox table into Kafka
/// (and, optionally, into secondary exporters).
pub struct OutboxForwarder {
    storage: Arc<Storage>,
    // Destination Kafka topic.
    topic: String,
    // Maximum rows fetched from the outbox per poll.
    batch_size: i64,
    // Sleep when the outbox is empty or after a failed batch.
    idle_backoff: Duration,
    // Abstracted behind a trait so tests can inject a mock producer.
    producer: Arc<dyn EventProducer>,
    // Best-effort secondary sinks; their failures never fail a batch.
    exporters: Vec<Arc<dyn OutboxExporter>>,
}
85
impl OutboxForwarder {
    /// Test-only alias for [`OutboxForwarder::new`].
    #[cfg(test)]
    pub(crate) fn new_with_config(storage: Arc<Storage>, config: BusConfig) -> Result<Self> {
        Self::new(storage, config)
    }

    /// Builds a forwarder backed by a real Kafka producer.
    ///
    /// The producer always enables idempotence. If a `transactional.id` is
    /// supplied (via `config.transactional_id` or inside `client_props`) the
    /// producer is additionally initialised for transactions; otherwise
    /// delivery remains at-least-once and a warning is logged.
    ///
    /// # Errors
    /// Fails if the Kafka client cannot be created or, when transactional, if
    /// `init_transactions` fails.
    pub fn new(storage: Arc<Storage>, config: BusConfig) -> Result<Self> {
        let mut client = ClientConfig::new();
        client.set("bootstrap.servers", &config.bootstrap_servers);
        client.set("message.timeout.ms", "5000");

        // The transactional id may come from the dedicated field or be
        // embedded in the raw client properties; the dedicated field wins.
        let transactional_id = config.transactional_id.clone().or_else(|| {
            config
                .client_props
                .iter()
                .find(|(k, _)| k == "transactional.id")
                .map(|(_, v)| v.clone())
        });

        client.set("enable.idempotence", "true");
        if transactional_id.is_none() {
            warn!(
                "Kafka producer configured without transactional.id; downstream delivery remains at-least-once"
            );
        }

        // User-supplied properties are applied after the defaults above, so
        // they may override message.timeout.ms / enable.idempotence etc.
        for (key, value) in &config.client_props {
            client.set(key, value);
        }

        if let Some(id) = &transactional_id {
            client.set("transactional.id", id);
        }

        let kafka_producer: FutureProducer = client
            .create()
            .map_err(|err| anyhow!("failed to create Kafka producer: {err}"))?;

        if transactional_id.is_some() {
            kafka_producer
                .init_transactions(Timeout::After(Duration::from_secs(10)))
                .map_err(|err| anyhow!("failed to initialise Kafka transactions: {err}"))?;
        }

        let producer = Arc::new(KafkaEventProducer::new(
            kafka_producer,
            transactional_id.is_some(),
        ));

        Ok(Self::with_producer(
            storage,
            producer,
            config.topic,
            config.batch_size,
            config.idle_backoff,
            Vec::new(),
        ))
    }

    /// Internal constructor shared by `new` and the tests (which inject a
    /// mock producer).
    fn with_producer(
        storage: Arc<Storage>,
        producer: Arc<dyn EventProducer>,
        topic: String,
        batch_size: i64,
        idle_backoff: Duration,
        exporters: Vec<Arc<dyn OutboxExporter>>,
    ) -> Self {
        Self {
            storage,
            topic,
            batch_size,
            idle_backoff,
            producer,
            exporters,
        }
    }

    /// Replaces the set of secondary exporters (builder style).
    pub fn with_exporters(mut self, exporters: Vec<Arc<dyn OutboxExporter>>) -> Self {
        self.exporters = exporters;
        self
    }

    /// Polling loop: fetch pending outbox rows, forward each batch, and sleep
    /// for `idle_backoff` when the outbox is empty or a batch fails.
    ///
    /// Only a failure to *read* the outbox terminates the loop; batch
    /// processing errors are logged and retried on the next iteration.
    pub async fn run(self: Arc<Self>) -> Result<()> {
        info!(topic = %self.topic, "starting outbox forwarder");
        loop {
            let events = self.poll_pending_events().await?;
            if events.is_empty() {
                sleep(self.idle_backoff).await;
                continue;
            }

            if let Err(err) = self.process_batch(events).await {
                error!(error = ?err, "failed to forward outbox batch");
                sleep(self.idle_backoff).await;
            }
        }
    }

    /// Forwards one batch of outbox events.
    ///
    /// Transactional producer: every send happens inside one Kafka
    /// transaction and rows are marked published only after the commit
    /// succeeds, so a crash between commit and marking can redeliver
    /// (at-least-once towards the outbox, fenced by the Kafka transaction).
    /// Non-transactional producer: each row is marked published immediately
    /// after its individual send.
    ///
    /// NOTE(review): exporters run before the Kafka transaction commits, so
    /// an aborted batch may still have reached external sinks — confirm this
    /// best-effort ordering is intended.
    async fn process_batch(&self, events: Vec<OutboxEvent>) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let transactional = self.producer.transactional();
        let mut delivered = Vec::new();

        if transactional {
            self.producer.begin_transaction()?;
        }

        for event in events {
            if let Err(err) = self.forward_event(&event).await {
                // Abort so the open transaction does not leak; the original
                // send error is still the one surfaced to the caller.
                if transactional {
                    if let Err(abort_err) = self.producer.abort_transaction() {
                        warn!(error = ?abort_err, "failed to abort Kafka transaction after send error");
                    }
                }
                return Err(err);
            }

            // Best-effort fan-out to secondary sinks; failures are logged only.
            for exporter in &self.exporters {
                if let Err(err) = exporter.export(&event).await {
                    warn!(
                        error = ?err,
                        exporter = exporter.name(),
                        event_id = event.id,
                        "failed to export outbox event to external sink"
                    );
                }
            }

            if transactional {
                // Defer the DB update until the Kafka commit succeeds.
                delivered.push(event.id);
            } else {
                self.storage.mark_outbox_published(event.id).await?;
            }
        }

        if transactional {
            if let Err(err) = self.producer.commit_transaction() {
                if let Err(abort_err) = self.producer.abort_transaction() {
                    warn!(error = ?abort_err, "failed to abort Kafka transaction after commit error");
                }
                return Err(err);
            }

            for id in delivered {
                self.storage.mark_outbox_published(id).await?;
            }
        }

        Ok(())
    }

    /// Test-only helper: one poll+forward cycle; returns whether any events
    /// were processed.
    #[cfg(test)]
    pub(crate) async fn process_once(&self) -> Result<bool> {
        let events = self.poll_pending_events().await?;
        if events.is_empty() {
            return Ok(false);
        }
        self.process_batch(events).await.map(|_| true)
    }

    /// Fetches up to `batch_size` unpublished outbox rows.
    async fn poll_pending_events(&self) -> Result<Vec<OutboxEvent>> {
        let events = self
            .storage
            .fetch_unpublished_outbox(self.batch_size)
            .await
            .context("failed to read outbox events")?;
        Ok(events)
    }

    /// Serialises one event's payload to JSON and sends it to Kafka, keyed by
    /// the outbox row id.
    ///
    /// Transient send errors are retried with exponential backoff and no
    /// overall deadline (`max_elapsed_time = None`), so a broker that never
    /// recovers blocks this batch indefinitely. NOTE(review): consider
    /// bounding the retries.
    async fn forward_event(&self, event: &OutboxEvent) -> Result<()> {
        let payload = to_vec(&event.payload)?;
        let key = event.id.to_string();

        let mut backoff = ExponentialBackoff::default();
        backoff.max_elapsed_time = None;

        let topic = self.topic.clone();
        let producer = Arc::clone(&self.producer);

        retry(backoff, || async {
            producer
                .send(&topic, key.clone(), payload.clone())
                .await
                .map_err(backoff::Error::transient)
        })
        .await?;

        debug!(event_id = event.id, "forwarded outbox event");
        Ok(())
    }
}
282
/// Narrow producer abstraction so the forwarder can be exercised against a
/// mock in tests instead of a live Kafka cluster.
#[async_trait]
trait EventProducer: Send + Sync {
    /// True when transactions are enabled (begin/commit/abort are meaningful).
    fn transactional(&self) -> bool;
    /// Publish one record to `topic`, keyed by `key`.
    async fn send(&self, topic: &str, key: String, payload: Vec<u8>) -> Result<()>;
    fn begin_transaction(&self) -> Result<()>;
    fn commit_transaction(&self) -> Result<()>;
    fn abort_transaction(&self) -> Result<()>;
}
291
/// `EventProducer` implementation backed by a real rdkafka `FutureProducer`.
struct KafkaEventProducer {
    inner: FutureProducer,
    // Whether the producer was created with a transactional.id; gates the
    // begin/commit/abort calls below.
    transactional: bool,
}
296
impl KafkaEventProducer {
    /// Wraps an already-configured producer; `transactional` records whether
    /// a transactional.id was set so the transaction methods can no-op
    /// otherwise.
    fn new(inner: FutureProducer, transactional: bool) -> Self {
        Self {
            inner,
            transactional,
        }
    }
}
305
#[async_trait]
impl EventProducer for KafkaEventProducer {
    fn transactional(&self) -> bool {
        self.transactional
    }

    /// Sends one record and waits up to 5s for the delivery report.
    async fn send(&self, topic: &str, key: String, payload: Vec<u8>) -> Result<()> {
        let record: FutureRecord<'_, String, Vec<u8>> =
            FutureRecord::to(topic).key(&key).payload(&payload);
        self.inner
            .send(record, Timeout::After(Duration::from_secs(5)))
            .await
            // rdkafka returns (error, unsent message); the message is dropped
            // here because the caller retries with a fresh record.
            .map_err(|(err, _)| anyhow!("kafka send failed: {err}"))?;
        Ok(())
    }

    /// No-op when the producer is not transactional.
    fn begin_transaction(&self) -> Result<()> {
        if self.transactional {
            self.inner
                .begin_transaction()
                .map_err(|err| anyhow!("failed to begin Kafka transaction: {err}"))?;
        }
        Ok(())
    }

    /// Commits with a 10s timeout; no-op when not transactional.
    fn commit_transaction(&self) -> Result<()> {
        if self.transactional {
            self.inner
                .commit_transaction(Timeout::After(Duration::from_secs(10)))
                .map_err(|err| anyhow!("failed to commit Kafka transaction: {err}"))?;
        }
        Ok(())
    }

    /// Aborts with a 10s timeout; no-op when not transactional.
    fn abort_transaction(&self) -> Result<()> {
        if self.transactional {
            self.inner
                .abort_transaction(Timeout::After(Duration::from_secs(10)))
                .map_err(|err| anyhow!("failed to abort Kafka transaction: {err}"))?;
        }
        Ok(())
    }
}
349
/// Builds an [`OutboxForwarder`] from `config` and runs its polling loop with
/// no secondary exporters.
pub async fn run_outbox_forwarder(config: BusConfig, storage: Arc<Storage>) -> Result<()> {
    run_outbox_forwarder_with_exporters(config, storage, Vec::new()).await
}
354
/// Builds an [`OutboxForwarder`] with the given exporters and runs its
/// polling loop.
///
/// # Errors
/// Returns an error if the Kafka producer cannot be created/initialised, or
/// if the loop later fails to read the outbox table.
pub async fn run_outbox_forwarder_with_exporters(
    config: BusConfig,
    storage: Arc<Storage>,
    exporters: Vec<Arc<dyn OutboxExporter>>,
) -> Result<()> {
    let forwarder = OutboxForwarder::new(Arc::clone(&storage), config)?.with_exporters(exporters);
    let forwarder = Arc::new(forwarder);
    forwarder.run().await
}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369 use fleetforge_storage::{ObjectStoreConfig, StorageConfig};
370 use rdkafka::consumer::{Consumer, StreamConsumer};
371 use rdkafka::ClientConfig;
372 use rdkafka::Message;
373 use serde_json::json;
374 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
375 use std::sync::Mutex;
376 use testcontainers::clients::Cli;
377 use testcontainers::images::generic::GenericImage;
378 use testcontainers::images::postgres::Postgres;
379 use tokio::time::sleep;
380 use tokio::time::timeout;
381 use uuid::Uuid;
382
383 #[tokio::test]
384 async fn forward_event_marks_published() {
385 if std::fs::metadata("/var/run/docker.sock").is_err()
386 && std::env::var("DOCKER_HOST").is_err()
387 {
388 eprintln!("Skipping bus tests because Docker is unavailable");
389 return;
390 }
391
392 let docker = Cli::default();
393 let container = docker.run(Postgres::default());
394 let port = container.get_host_port_ipv4(5432);
395 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
396 sleep(Duration::from_millis(250)).await;
397
398 let storage = Arc::new(
399 Storage::connect(StorageConfig {
400 database_url: db_url,
401 max_connections: 1,
402 connect_timeout: Duration::from_secs(10),
403 object_store: ObjectStoreConfig::InMemory,
404 })
405 .await
406 .expect("connect storage"),
407 );
408
409 let run_id = Uuid::new_v4();
410 let id: i64 = sqlx::query_scalar(
411 r#"insert into outbox (run_id, kind, payload) values ($1, 'event', '{"ok":true}') returning id"#,
412 )
413 .bind(run_id)
414 .fetch_one(storage.pool())
415 .await
416 .unwrap();
417
418 let producer = Arc::new(MockProducer::new(false));
419 let producer_trait: Arc<dyn EventProducer> = producer.clone();
420 let forwarder = OutboxForwarder::with_producer(
421 Arc::clone(&storage),
422 producer_trait.clone(),
423 "topic".to_string(),
424 10,
425 Duration::from_millis(50),
426 Vec::new(),
427 );
428
429 let events = forwarder.poll_pending_events().await.unwrap();
430 assert_eq!(events.len(), 1);
431
432 forwarder.forward_event(&events[0]).await.unwrap();
433 storage.mark_outbox_published(id).await.unwrap();
434
435 assert_eq!(producer.sent.lock().unwrap().len(), 1);
436 let published: Option<chrono::DateTime<chrono::Utc>> =
437 sqlx::query_scalar("select published_at from outbox where id = $1")
438 .bind(id)
439 .fetch_one(storage.pool())
440 .await
441 .unwrap();
442 assert!(published.is_some());
443
444 drop(container);
445 }
446
447 #[tokio::test]
448 async fn forward_event_retries_on_send_failure() {
449 if std::fs::metadata("/var/run/docker.sock").is_err()
450 && std::env::var("DOCKER_HOST").is_err()
451 {
452 eprintln!("Skipping bus tests because Docker is unavailable");
453 return;
454 }
455
456 let docker = Cli::default();
457 let container = docker.run(Postgres::default());
458 let port = container.get_host_port_ipv4(5432);
459 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
460 sleep(Duration::from_millis(250)).await;
461
462 let storage = Arc::new(
463 Storage::connect(StorageConfig {
464 database_url: db_url,
465 max_connections: 1,
466 connect_timeout: Duration::from_secs(10),
467 object_store: ObjectStoreConfig::InMemory,
468 })
469 .await
470 .expect("connect storage"),
471 );
472
473 let run_id = Uuid::new_v4();
474 let id: i64 = sqlx::query_scalar(
475 r#"insert into outbox (run_id, kind, payload) values ($1, 'event', '{"ok":true}') returning id"#,
476 )
477 .bind(run_id)
478 .fetch_one(storage.pool())
479 .await
480 .unwrap();
481
482 let producer = Arc::new(MockProducer::failing(false, 1, false));
483 let producer_trait: Arc<dyn EventProducer> = producer.clone();
484 let forwarder = OutboxForwarder::with_producer(
485 Arc::clone(&storage),
486 producer_trait.clone(),
487 "topic".to_string(),
488 10,
489 Duration::from_millis(50),
490 Vec::new(),
491 );
492
493 let events = forwarder.poll_pending_events().await.unwrap();
494 assert_eq!(events.len(), 1);
495
496 forwarder.forward_event(&events[0]).await.unwrap();
497 storage.mark_outbox_published(id).await.unwrap();
498
499 let published: Option<chrono::DateTime<chrono::Utc>> =
500 sqlx::query_scalar("select published_at from outbox where id = $1")
501 .bind(id)
502 .fetch_one(storage.pool())
503 .await
504 .unwrap();
505 assert!(published.is_some());
506 assert_eq!(producer.sent.lock().unwrap().len(), 1);
507
508 drop(container);
509 }
510
511 #[tokio::test]
512 async fn transactional_retry_does_not_mark_until_commit() {
513 if std::fs::metadata("/var/run/docker.sock").is_err()
514 && std::env::var("DOCKER_HOST").is_err()
515 {
516 eprintln!("Skipping bus tests because Docker is unavailable");
517 return;
518 }
519
520 let docker = Cli::default();
521 let container = docker.run(Postgres::default());
522 let port = container.get_host_port_ipv4(5432);
523 let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
524 sleep(Duration::from_millis(250)).await;
525
526 let storage = Arc::new(
527 Storage::connect(StorageConfig {
528 database_url: db_url,
529 max_connections: 1,
530 connect_timeout: Duration::from_secs(10),
531 object_store: ObjectStoreConfig::InMemory,
532 })
533 .await
534 .expect("connect storage"),
535 );
536
537 let run_id = Uuid::new_v4();
538 let id: i64 = sqlx::query_scalar(
539 r#"insert into outbox (run_id, kind, payload) values ($1, 'event', '{"ok":true}') returning id"#,
540 )
541 .bind(run_id)
542 .fetch_one(storage.pool())
543 .await
544 .unwrap();
545
546 let producer = Arc::new(MockProducer::failing(true, 0, true));
547 let producer_trait: Arc<dyn EventProducer> = producer.clone();
548 let forwarder = OutboxForwarder::with_producer(
549 Arc::clone(&storage),
550 producer_trait.clone(),
551 "topic".to_string(),
552 10,
553 Duration::from_millis(50),
554 Vec::new(),
555 );
556
557 let events = forwarder.poll_pending_events().await.unwrap();
558 assert_eq!(events.len(), 1);
559
560 assert!(producer_trait.begin_transaction().is_ok());
562 forwarder.forward_event(&events[0]).await.unwrap();
563 assert!(producer_trait.commit_transaction().is_err());
564 producer_trait.abort_transaction().unwrap();
565
566 let published: Option<chrono::DateTime<chrono::Utc>> =
567 sqlx::query_scalar("select published_at from outbox where id = $1")
568 .bind(id)
569 .fetch_one(storage.pool())
570 .await
571 .unwrap();
572 assert!(published.is_none());
573
574 producer.reset_failures();
576 assert!(producer_trait.begin_transaction().is_ok());
577 forwarder.forward_event(&events[0]).await.unwrap();
578 producer_trait.commit_transaction().unwrap();
579 storage.mark_outbox_published(id).await.unwrap();
580
581 let published: Option<chrono::DateTime<chrono::Utc>> =
582 sqlx::query_scalar("select published_at from outbox where id = $1")
583 .bind(id)
584 .fetch_one(storage.pool())
585 .await
586 .unwrap();
587 assert!(published.is_some());
588 assert!(producer.sent.lock().unwrap().len() >= 2);
589
590 drop(container);
591 }
592
593 fn docker_available() -> bool {
594 std::fs::metadata("/var/run/docker.sock").is_ok() || std::env::var("DOCKER_HOST").is_ok()
595 }
596
    /// End-to-end: Postgres outbox -> forwarder -> Redpanda, with the
    /// transactional producer path enabled via `transactional_id`.
    #[tokio::test]
    async fn forwards_to_redpanda_with_transactions() {
        if !docker_available() {
            eprintln!("Skipping redpanda integration test because Docker is unavailable");
            return;
        }

        let docker = Cli::default();
        let pg_container = docker.run(Postgres::default());
        let pg_port = pg_container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        // Give Postgres a moment to finish starting before connecting.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        // Seed one pending outbox row carrying a JSON payload.
        let topic = "fleetforge.events";
        let payload = json!({"run_id": "integration", "ok": true});
        let run_id = Uuid::new_v4();
        let event_id: i64 = sqlx::query_scalar(
            "insert into outbox (run_id, kind, payload) values ($1, 'event', $2) returning id",
        )
        .bind(run_id)
        .bind(payload.clone())
        .fetch_one(storage.pool())
        .await
        .expect("insert outbox");

        // Single-node Redpanda tuned for CI: minimal memory, no checks,
        // auto topic creation enabled.
        let redpanda_image =
            GenericImage::new("docker.redpanda.com/redpandadata/redpanda", "v23.3.14")
                .with_env_var("REDPANDA_AUTO_CREATE_TOPICS", "true")
                .with_exposed_port(9092)
                .with_cmd(vec![
                    "redpanda".into(),
                    "start".into(),
                    "--overprovisioned".into(),
                    "--smp".into(),
                    "1".into(),
                    "--memory".into(),
                    "256M".into(),
                    "--reserve-memory".into(),
                    "0M".into(),
                    "--node-id".into(),
                    "0".into(),
                    "--check=false".into(),
                    "--kafka-addr".into(),
                    "PLAINTEXT://0.0.0.0:9092".into(),
                    "--advertise-kafka-addr".into(),
                    "PLAINTEXT://redpanda:9092".into(),
                    "--set".into(),
                    "redpanda.auto_create_topics_enabled=true".into(),
                ]);

        let redpanda_container = docker.run(redpanda_image);
        let kafka_port = redpanda_container.get_host_port_ipv4(9092);

        // Crude broker readiness wait. NOTE(review): a polling health check
        // would be less flaky than a fixed sleep.
        sleep(Duration::from_secs(2)).await;

        let config = BusConfig {
            bootstrap_servers: format!("127.0.0.1:{kafka_port}"),
            topic: topic.to_string(),
            batch_size: 64,
            idle_backoff: Duration::from_millis(50),
            client_props: vec![
                ("message.timeout.ms".to_string(), "5000".to_string()),
                ("socket.timeout.ms".to_string(), "5000".to_string()),
            ],
            // Unique id so parallel test runs do not fence each other.
            transactional_id: Some(format!("test-txn-{}", Uuid::new_v4())),
        };

        let forwarder =
            OutboxForwarder::new(Arc::clone(&storage), config.clone()).expect("create forwarder");
        assert!(forwarder.process_once().await.expect("process once"));

        // The row must be marked published once the transaction committed.
        let published: Option<chrono::DateTime<chrono::Utc>> =
            sqlx::query_scalar("select published_at from outbox where id = $1")
                .bind(event_id)
                .fetch_one(storage.pool())
                .await
                .expect("query published");
        assert!(
            published.is_some(),
            "outbox row should be marked as published"
        );

        // Consume the event back from Redpanda and verify key + payload.
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &format!("ff-test-{}", Uuid::new_v4()))
            .set("bootstrap.servers", &config.bootstrap_servers)
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("create consumer");
        consumer.subscribe(&[topic]).expect("subscribe to topic");

        let message = timeout(Duration::from_secs(15), async {
            loop {
                match consumer.recv().await {
                    Ok(record) => break Ok(record),
                    Err(err) => {
                        if err.is_partition_eof() {
                            continue;
                        }
                        break Err(err);
                    }
                }
            }
        })
        .await
        .expect("timed out waiting for message")
        .expect("consumer error");

        // The record key is the outbox row id as text.
        let key = message
            .key()
            .and_then(|bytes| std::str::from_utf8(bytes).ok())
            .expect("key present");
        assert_eq!(key, event_id.to_string());

        // The payload round-trips through Kafka as the same JSON value.
        let payload_bytes = message.payload().expect("payload present");
        let payload_value: serde_json::Value =
            serde_json::from_slice(payload_bytes).expect("valid json payload");
        assert_eq!(payload_value, payload);

        drop(redpanda_container);
        drop(pg_container);
    }
731
    /// End-to-end: same pipeline as the transactional test but with
    /// `transactional_id: None`, exercising the at-least-once path.
    #[tokio::test]
    async fn forwards_to_redpanda_at_least_once() {
        if !docker_available() {
            eprintln!("Skipping redpanda integration test because Docker is unavailable");
            return;
        }

        let docker = Cli::default();
        let pg_container = docker.run(Postgres::default());
        let pg_port = pg_container.get_host_port_ipv4(5432);
        let db_url = format!("postgres://postgres:postgres@127.0.0.1:{pg_port}/postgres");
        // Give Postgres a moment to finish starting before connecting.
        sleep(Duration::from_millis(250)).await;

        let storage = Arc::new(
            Storage::connect(StorageConfig {
                database_url: db_url,
                max_connections: 2,
                connect_timeout: Duration::from_secs(10),
                object_store: ObjectStoreConfig::InMemory,
            })
            .await
            .expect("connect storage"),
        );

        // Seed one pending outbox row carrying a JSON payload.
        let topic = "fleetforge.events";
        let payload = json!({"run_id": "integration", "ok": true});
        let run_id = Uuid::new_v4();
        let event_id: i64 = sqlx::query_scalar(
            "insert into outbox (run_id, kind, payload) values ($1, 'event', $2) returning id",
        )
        .bind(run_id)
        .bind(payload.clone())
        .fetch_one(storage.pool())
        .await
        .expect("insert outbox");

        // Single-node Redpanda tuned for CI: minimal memory, no checks,
        // auto topic creation enabled.
        let redpanda_image =
            GenericImage::new("docker.redpanda.com/redpandadata/redpanda", "v23.3.14")
                .with_env_var("REDPANDA_AUTO_CREATE_TOPICS", "true")
                .with_exposed_port(9092)
                .with_cmd(vec![
                    "redpanda".into(),
                    "start".into(),
                    "--overprovisioned".into(),
                    "--smp".into(),
                    "1".into(),
                    "--memory".into(),
                    "256M".into(),
                    "--reserve-memory".into(),
                    "0M".into(),
                    "--node-id".into(),
                    "0".into(),
                    "--check=false".into(),
                    "--kafka-addr".into(),
                    "PLAINTEXT://0.0.0.0:9092".into(),
                    "--advertise-kafka-addr".into(),
                    "PLAINTEXT://redpanda:9092".into(),
                    "--set".into(),
                    "redpanda.auto_create_topics_enabled=true".into(),
                ]);

        let redpanda_container = docker.run(redpanda_image);
        let kafka_port = redpanda_container.get_host_port_ipv4(9092);

        // Crude broker readiness wait. NOTE(review): a polling health check
        // would be less flaky than a fixed sleep.
        sleep(Duration::from_secs(2)).await;

        let config = BusConfig {
            bootstrap_servers: format!("127.0.0.1:{kafka_port}"),
            topic: topic.to_string(),
            batch_size: 64,
            idle_backoff: Duration::from_millis(50),
            client_props: vec![
                ("message.timeout.ms".to_string(), "5000".to_string()),
                ("socket.timeout.ms".to_string(), "5000".to_string()),
            ],
            // No transactional id: non-transactional, at-least-once delivery.
            transactional_id: None,
        };

        let forwarder =
            OutboxForwarder::new(Arc::clone(&storage), config.clone()).expect("forwarder");
        assert!(forwarder.process_once().await.expect("process once"));

        // The row is marked published immediately after the send succeeds.
        let published: Option<chrono::DateTime<chrono::Utc>> =
            sqlx::query_scalar("select published_at from outbox where id = $1")
                .bind(event_id)
                .fetch_one(storage.pool())
                .await
                .expect("query published");
        assert!(
            published.is_some(),
            "outbox row should be marked as published"
        );

        // Consume the event back from Redpanda and verify key + payload.
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &format!("ff-test-{}", Uuid::new_v4()))
            .set("bootstrap.servers", &config.bootstrap_servers)
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("create consumer");
        consumer.subscribe(&[topic]).expect("subscribe to topic");

        let message = timeout(Duration::from_secs(15), async {
            loop {
                match consumer.recv().await {
                    Ok(record) => break Ok(record),
                    Err(err) => {
                        if err.is_partition_eof() {
                            continue;
                        }
                        break Err(err);
                    }
                }
            }
        })
        .await
        .expect("timed out waiting for message")
        .expect("consumer error");

        // The record key is the outbox row id as text.
        let key = message
            .key()
            .and_then(|bytes| std::str::from_utf8(bytes).ok())
            .expect("key present");
        assert_eq!(key, event_id.to_string());

        // The payload round-trips through Kafka as the same JSON value.
        let payload_bytes = message.payload().expect("payload present");
        let payload_value: serde_json::Value =
            serde_json::from_slice(payload_bytes).expect("valid json payload");
        assert_eq!(payload_value, payload);

        drop(redpanda_container);
        drop(pg_container);
    }
864
    /// Test double for `EventProducer` with injectable send/commit failures.
    struct MockProducer {
        // Value returned from `transactional()`.
        transactional: bool,
        // Number of upcoming `send` calls that should fail.
        fail_send: AtomicUsize,
        // When true, the next `commit_transaction` fails (one-shot).
        fail_commit_once: AtomicBool,
        // Every successfully "sent" (key, payload) pair, in order.
        pub sent: Mutex<Vec<(String, Vec<u8>)>>,
    }
871
872 impl MockProducer {
873 fn new(transactional: bool) -> Self {
874 Self {
875 transactional,
876 fail_send: AtomicUsize::new(0),
877 fail_commit_once: AtomicBool::new(false),
878 sent: Mutex::new(Vec::new()),
879 }
880 }
881
882 fn failing(transactional: bool, send_failures: usize, fail_commit: bool) -> Self {
883 Self {
884 transactional,
885 fail_send: AtomicUsize::new(send_failures),
886 fail_commit_once: AtomicBool::new(fail_commit),
887 sent: Mutex::new(Vec::new()),
888 }
889 }
890
891 fn reset_failures(&self) {
892 self.fail_send.store(0, Ordering::SeqCst);
893 self.fail_commit_once.store(false, Ordering::SeqCst);
894 }
895 }
896
897 #[async_trait]
898 impl EventProducer for MockProducer {
899 fn transactional(&self) -> bool {
900 self.transactional
901 }
902
903 async fn send(&self, _topic: &str, key: String, payload: Vec<u8>) -> Result<()> {
904 if self.fail_send.load(Ordering::SeqCst) > 0 {
905 self.fail_send.fetch_sub(1, Ordering::SeqCst);
906 return Err(anyhow!("injected send failure"));
907 }
908
909 self.sent.lock().unwrap().push((key, payload));
910 Ok(())
911 }
912
913 fn begin_transaction(&self) -> Result<()> {
914 Ok(())
915 }
916
917 fn commit_transaction(&self) -> Result<()> {
918 if self.fail_commit_once.swap(false, Ordering::SeqCst) {
919 Err(anyhow!("injected commit failure"))
920 } else {
921 Ok(())
922 }
923 }
924
925 fn abort_transaction(&self) -> Result<()> {
926 Ok(())
927 }
928 }
929}