Engineering · Mar 28, 2026

Event-Driven Architecture in Rust: Bus Patterns for Real-Time Systems

Open Soft Team

Engineering Team

What Is Event-Driven Architecture?

Event-driven architecture is a pattern in which components communicate through events rather than direct calls. This enables loose coupling: a component does not need to know about any other component, only the types of events it publishes or consumes.
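
As a minimal, synchronous sketch of this idea using only the standard library, a publisher can hand events to a dispatcher without knowing who consumes them. The `Event`, `Dispatcher`, and `run_demo` names are hypothetical and exist only for illustration; they are not part of the article's code.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical event type, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    UserRegistered { email: String },
    OrderPlaced { order_id: u32 },
}

// A tiny synchronous dispatcher: it stores handlers and knows nothing
// about what each handler does with an event.
struct Dispatcher {
    handlers: Vec<Box<dyn Fn(&Event)>>,
}

impl Dispatcher {
    fn new() -> Self {
        Self { handlers: Vec::new() }
    }

    fn subscribe(&mut self, handler: impl Fn(&Event) + 'static) {
        self.handlers.push(Box::new(handler));
    }

    // Delivers the event to every handler and returns how many were called.
    fn publish(&self, event: &Event) -> usize {
        for handler in &self.handlers {
            handler(event);
        }
        self.handlers.len()
    }
}

// Wires two independent consumers to one dispatcher and returns what each
// of them recorded.
fn run_demo() -> (Vec<String>, Vec<u32>) {
    let emails: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let orders: Rc<RefCell<Vec<u32>>> = Rc::new(RefCell::new(Vec::new()));
    let mut dispatcher = Dispatcher::new();

    let emails_sink = Rc::clone(&emails);
    dispatcher.subscribe(move |event| {
        if let Event::UserRegistered { email } = event {
            emails_sink.borrow_mut().push(email.clone());
        }
    });

    let orders_sink = Rc::clone(&orders);
    dispatcher.subscribe(move |event| {
        if let Event::OrderPlaced { order_id } = event {
            orders_sink.borrow_mut().push(*order_id);
        }
    });

    // The publisher only names the event; it never calls a consumer directly.
    dispatcher.publish(&Event::UserRegistered { email: "a@example.com".into() });
    dispatcher.publish(&Event::OrderPlaced { order_id: 7 });

    let result = (emails.borrow().clone(), orders.borrow().clone());
    result
}

fn main() {
    let (emails, orders) = run_demo();
    println!("emails: {:?}, orders: {:?}", emails, orders);
}
```

Adding a third consumer here means one more `subscribe` call; the publishing code stays the same.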

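In Rust, we can build an event bus on top of tokio's async channels. The examples here use tokio::sync::broadcast, a multi-producer, multi-consumer channel in which every subscriber receives its own clone of each event. This keeps overhead low, in line with Rust's emphasis on zero-cost abstractions.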

Event Bus with tokio::sync::broadcast

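use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
// Assumes the `uuid` crate with its `serde` feature enabled, so that
// `Uuid` fields work with the Serialize/Deserialize derives below.
use uuid::Uuid;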

#[derive(Clone, Debug, Serialize, Deserialize)]
enum AppEvent {
    UserRegistered { user_id: Uuid, email: String },
    OrderPlaced { order_id: Uuid, user_id: Uuid, total: f64 },
    PaymentReceived { order_id: Uuid, amount: f64 },
    InventoryUpdated { product_id: Uuid, quantity: i32 },
}

struct EventBus {
    sender: broadcast::Sender<AppEvent>,
}

impl EventBus {
    fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self { sender }
    }
    
    fn publish(&self, event: AppEvent) {
        // Ignore the error returned when there are no subscribers
        let _ = self.sender.send(event);
    }
    
    fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
        self.sender.subscribe()
    }
}

Subscriber Pattern

Each component subscribes to the bus and handles only the events relevant to it:

async fn email_service(mut rx: broadcast::Receiver<AppEvent>) {
    loop {
        match rx.recv().await {
            Ok(AppEvent::UserRegistered { email, .. }) => {
                send_welcome_email(&email).await;
            }
            Ok(AppEvent::OrderPlaced { order_id, user_id, .. }) => {
                send_order_confirmation(user_id, order_id).await;
            }
            Ok(_) => {} // Ignore other events
            Err(broadcast::error::RecvError::Lagged(n)) => {
                tracing::warn!("Email service lagged behind by {} events", n);
            }
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
}

async fn inventory_service(mut rx: broadcast::Receiver<AppEvent>) {
    loop {
        match rx.recv().await {
            Ok(AppEvent::OrderPlaced { order_id, .. }) => {
                reserve_inventory(order_id).await;
            }
            Ok(AppEvent::PaymentReceived { order_id, .. }) => {
                confirm_inventory_reservation(order_id).await;
            }
            Ok(_) => {}
            Err(broadcast::error::RecvError::Closed) => break,
            // Lagged: some events were missed; keep receiving the newer ones
            Err(_) => continue,
        }
    }
}

Integration with Axum

use std::sync::Arc;

use axum::{extract::State, routing::post, Json, Router};
use sqlx::PgPool;

#[derive(Clone)]
struct AppState {
    event_bus: Arc<EventBus>,
    db: PgPool,
}

async fn create_order(
    State(state): State<AppState>,
    Json(payload): Json<CreateOrderRequest>,
) -> Result<Json<Order>, AppError> {
    // 1. Persist to the database
    let order = sqlx::query_as::<_, Order>(
        "INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *"
    )
    .bind(payload.user_id)
    .bind(payload.total)
    .fetch_one(&state.db)
    .await?;
    
    // 2. Publish the event
    state.event_bus.publish(AppEvent::OrderPlaced {
        order_id: order.id,
        user_id: order.user_id,
        total: order.total,
    });
    
    // 3. Return the response immediately
    Ok(Json(order))
}

#[tokio::main]
async fn main() {
    let event_bus = Arc::new(EventBus::new(1000));
    
    // Spawn subscriber services
    tokio::spawn(email_service(event_bus.subscribe()));
    tokio::spawn(inventory_service(event_bus.subscribe()));
    tokio::spawn(analytics_service(event_bus.subscribe()));
    
    let state = AppState {
        event_bus,
        db: create_pool().await,
    };
    
    let app = Router::new()
        .route("/orders", post(create_order))
        .with_state(state);
    
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Benefits of Event-Driven Architecture

  1. Loose coupling: the handler does not need to know about the email, inventory, or analytics services
  2. Extensibility: add new subscribers without changing the publisher
  3. Testability: test each subscriber independently
  4. Performance: events are processed asynchronously, without blocking the response
  5. Resilience: a failure in one subscriber does not affect the others
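
One way to make the testability point concrete is to keep a subscriber's decision logic in a plain function that maps an event to an action, so it can be checked without a channel or a runtime. This is only a sketch of one possible structure: `Event`, `EmailAction`, and `plan_email` are hypothetical names, and the IDs are simplified to `u64`.

```rust
// Simplified, hypothetical stand-in for the article's AppEvent.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    UserRegistered { user_id: u64, email: String },
    OrderPlaced { order_id: u64, user_id: u64 },
    PaymentReceived { order_id: u64 },
}

// What the email subscriber would do in response to an event.
#[derive(Debug, PartialEq)]
enum EmailAction {
    Welcome { to: String },
    OrderConfirmation { user_id: u64, order_id: u64 },
}

// Pure decision logic: no I/O, so it is trivial to test in isolation.
fn plan_email(event: &Event) -> Option<EmailAction> {
    match event {
        Event::UserRegistered { email, .. } => Some(EmailAction::Welcome { to: email.clone() }),
        Event::OrderPlaced { order_id, user_id } => Some(EmailAction::OrderConfirmation {
            user_id: *user_id,
            order_id: *order_id,
        }),
        // Events the email subscriber does not care about.
        _ => None,
    }
}

fn main() {
    let event = Event::OrderPlaced { order_id: 42, user_id: 7 };
    println!("{:?}", plan_email(&event));
}
```

The async subscriber loop then only has to receive an event, call the function, and perform the returned action.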

Error Handling and Retry

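use std::time::Duration;

async fn resilient_subscriber(
    mut rx: broadcast::Receiver<AppEvent>,
    max_retries: u32,
) {
    loop {
        let event = match rx.recv().await {
            Ok(e) => e,
            // All senders are gone: stop instead of spinning on a closed channel
            Err(broadcast::error::RecvError::Closed) => break,
            // Fell behind and missed some events: keep receiving
            Err(broadcast::error::RecvError::Lagged(_)) => continue,
        };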
        
        for attempt in 0..max_retries {
            match process_event(&event).await {
                Ok(()) => break,
                Err(e) if attempt < max_retries - 1 => {
                    tracing::warn!(
                        "Retry {}/{}: {:?}",
                        attempt + 1, max_retries, e
                    );
                    tokio::time::sleep(
                        Duration::from_millis(100 * 2u64.pow(attempt))
                    ).await;
                }
                Err(e) => {
                    tracing::error!("Failed after {} attempts: {:?}", max_retries, e);
                    // Send to the dead letter queue
                    dead_letter_queue(&event, &e).await;
                }
            }
        }
    }
}
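
The retry loop above waits 100 ms, 200 ms, 400 ms, and so on, doubling after each failed attempt. The sketch below isolates that schedule in a small function so it can be inspected on its own. The `backoff_delay` name, the `base_ms` and `cap_ms` parameters, and the upper bound on the delay are assumptions added for illustration and are not part of the original code; the bound guards against `2u64.pow(attempt)` overflowing once `attempt` reaches 64.

```rust
use std::time::Duration;

// Delay before the retry that follows failed attempt number `attempt`
// (0-based): base_ms * 2^attempt, never exceeding cap_ms.
fn backoff_delay(attempt: u32, base_ms: u64, cap_ms: u64) -> Duration {
    // checked_pow returns None on overflow; treat that as "very large".
    let factor = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    let millis = base_ms.saturating_mul(factor).min(cap_ms);
    Duration::from_millis(millis)
}

fn main() {
    for attempt in 0..5 {
        println!("attempt {} -> {:?}", attempt, backoff_delay(attempt, 100, 1_000));
    }
}
```

In the subscriber, the sleep could then take `backoff_delay(attempt, 100, cap_ms)` with whatever cap suits the workload.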

Conclusion

Event-driven architecture in Rust provides decoupling, extensibility, and high performance. The tokio broadcast channel offers an efficient primitive for an in-process event bus, and integrating it with Axum keeps HTTP handlers responsive because they do not block on side effects.