RekayasaMar 28, 2026
Arsitektur Event-Driven di Rust — Pola Bus untuk Sistem Real-Time
OS
Open Soft Team
Engineering Team
Apa Itu Arsitektur Event-Driven?
Arsitektur event-driven adalah pola di mana komponen berkomunikasi melalui event daripada panggilan langsung. Ini memungkinkan loose coupling — setiap komponen tidak perlu mengetahui komponen lain, hanya jenis event yang mereka terbitkan atau konsumsi.
Dalam Rust, kita membangun event bus menggunakan channel async dari tokio. Ini memberikan performa tinggi dengan zero-cost abstraction yang menjadi ciri khas Rust.
Event Bus dengan tokio::broadcast
use tokio::sync::broadcast;
use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
enum AppEvent {
UserRegistered { user_id: Uuid, email: String },
OrderPlaced { order_id: Uuid, user_id: Uuid, total: f64 },
PaymentReceived { order_id: Uuid, amount: f64 },
InventoryUpdated { product_id: Uuid, quantity: i32 },
}
struct EventBus {
sender: broadcast::Sender<AppEvent>,
}
impl EventBus {
fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
fn publish(&self, event: AppEvent) {
// Abaikan error jika tidak ada subscriber
let _ = self.sender.send(event);
}
fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
self.sender.subscribe()
}
}
Subscriber Pattern
Setiap komponen subscribe ke event yang relevan:
async fn email_service(mut rx: broadcast::Receiver<AppEvent>) {
loop {
match rx.recv().await {
Ok(AppEvent::UserRegistered { email, .. }) => {
send_welcome_email(&email).await;
}
Ok(AppEvent::OrderPlaced { order_id, user_id, .. }) => {
send_order_confirmation(user_id, order_id).await;
}
Ok(_) => {} // Abaikan event lain
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("Email service tertinggal {} event", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
async fn inventory_service(mut rx: broadcast::Receiver<AppEvent>) {
loop {
match rx.recv().await {
Ok(AppEvent::OrderPlaced { order_id, .. }) => {
reserve_inventory(order_id).await;
}
Ok(AppEvent::PaymentReceived { order_id, .. }) => {
confirm_inventory_reservation(order_id).await;
}
Ok(_) => {}
Err(broadcast::error::RecvError::Closed) => break,
Err(_) => continue,
}
}
}
Integrasi dengan Axum
use axum::{Router, Extension, Json};
#[derive(Clone)]
struct AppState {
event_bus: Arc<EventBus>,
db: PgPool,
}
async fn create_order(
State(state): State<AppState>,
Json(payload): Json<CreateOrderRequest>,
) -> Result<Json<Order>, AppError> {
// 1. Simpan ke database
let order = sqlx::query_as::<_, Order>(
"INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *"
)
.bind(payload.user_id)
.bind(payload.total)
.fetch_one(&state.db)
.await?;
// 2. Publish event
state.event_bus.publish(AppEvent::OrderPlaced {
order_id: order.id,
user_id: order.user_id,
total: order.total,
});
// 3. Return response segera
Ok(Json(order))
}
#[tokio::main]
async fn main() {
let event_bus = Arc::new(EventBus::new(1000));
// Spawn subscriber services
tokio::spawn(email_service(event_bus.subscribe()));
tokio::spawn(inventory_service(event_bus.subscribe()));
tokio::spawn(analytics_service(event_bus.subscribe()));
let state = AppState {
event_bus,
db: create_pool().await,
};
let app = Router::new()
.route("/orders", post(create_order))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Keuntungan Event-Driven
- Loose coupling — Handler tidak perlu tahu tentang email, inventory, atau analytics service
- Extensibility — Tambahkan subscriber baru tanpa mengubah publisher
- Testability — Test setiap subscriber secara independen
- Performance — Event diproses secara asinkron, tidak memblokir response
- Resilience — Kegagalan satu subscriber tidak memengaruhi yang lain
Error Handling dan Retry
async fn resilient_subscriber(
mut rx: broadcast::Receiver<AppEvent>,
max_retries: u32,
) {
loop {
let event = match rx.recv().await {
Ok(e) => e,
Err(_) => continue,
};
for attempt in 0..max_retries {
match process_event(&event).await {
Ok(()) => break,
Err(e) if attempt < max_retries - 1 => {
tracing::warn!(
"Retry {}/{}: {:?}",
attempt + 1, max_retries, e
);
tokio::time::sleep(
Duration::from_millis(100 * 2u64.pow(attempt))
).await;
}
Err(e) => {
tracing::error!("Gagal setelah {} retry: {:?}", max_retries, e);
// Kirim ke dead letter queue
dead_letter_queue(&event, &e).await;
}
}
}
}
}
Kesimpulan
Arsitektur event-driven di Rust memberikan decoupling, extensibility, dan performa tinggi. tokio broadcast channel menyediakan primitif yang efisien untuk event bus, dan integrasi dengan Axum memungkinkan HTTP handler yang responsif tanpa memblokir pada side effect.