Engineering · Mar 28, 2026

Deep EVM #21: Event-Driven Architecture in Rust — Bus Pattern for Real-Time Systems

Open Soft Team
Engineering Team

Why Event-Driven Architecture

In high-performance systems like MEV bots, blockchain indexers, and real-time trading engines, components must react to events with minimal latency. A new block arrives, and within milliseconds your system must decode transactions, evaluate opportunities, simulate execution, and submit a response.

The traditional request-response model cannot keep up. You need an event-driven architecture where components communicate through asynchronous message passing. In Rust, tokio’s channel primitives provide the foundation for building such systems.

Tokio Channel Primitives

Tokio provides four channel types: mpsc, oneshot, broadcast, and watch. oneshot carries exactly one value and suits request-reply handoffs; the other three, each suited to a different communication pattern, form the backbone of an event bus:

mpsc — Multi-Producer, Single-Consumer

Many senders, one receiver. The workhorse for event aggregation:

use tokio::sync::mpsc;

#[derive(Debug, Clone)]
enum BusEvent {
    NewBlock { number: u64, hash: [u8; 32] },
    NewTransaction { hash: [u8; 32], from: [u8; 20], to: [u8; 20] },
    PriceUpdate { pair: String, price: f64, timestamp: u64 },
    OpportunityFound { profit_wei: u128, deadline_block: u64 },
}

async fn event_aggregator(mut rx: mpsc::Receiver<BusEvent>) {
    while let Some(event) = rx.recv().await {
        match event {
            BusEvent::NewBlock { number, .. } => {
                tracing::info!(block = number, "Processing new block");
            }
            BusEvent::PriceUpdate { pair, price, .. } => {
                tracing::info!(%pair, %price, "Price updated");
            }
            _ => {}
        }
    }
}

Create a bounded channel with backpressure:

let (tx, rx) = mpsc::channel::<BusEvent>(1024);

// send().await suspends this task while the channel is full (backpressure)
tx.send(BusEvent::NewBlock {
    number: 18_000_000,
    hash: [0u8; 32],
}).await.expect("receiver dropped");

broadcast — Multi-Producer, Multi-Consumer

Every subscriber gets every message. Perfect for fan-out:

use tokio::sync::broadcast;

let (tx, _) = broadcast::channel::<BusEvent>(4096);

// Each subscriber gets their own receiver
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

// Publisher
tx.send(BusEvent::NewBlock {
    number: 18_000_001,
    hash: [0u8; 32],
}).expect("no subscribers");

// Both rx1 and rx2 receive the event

Broadcast channels have a fixed capacity. If a subscriber falls behind, it receives a RecvError::Lagged(n) indicating it missed n messages. Handle this gracefully:

loop {
    match rx.recv().await {
        Ok(event) => handle_event(event).await,
        Err(broadcast::error::RecvError::Lagged(n)) => {
            tracing::warn!(missed = n, "Subscriber lagged, catching up");
            // Continue — next recv() gets the latest available
        }
        Err(broadcast::error::RecvError::Closed) => break,
    }
}

watch — Single-Producer, Multi-Consumer (Latest Value)

Subscribers only see the latest value. Ideal for state that changes frequently but consumers only care about the current state:

use tokio::sync::watch;

#[derive(Debug, Clone)]
struct SystemState {
    latest_block: u64,
    gas_price: u128,
    connected_peers: usize,
}

let (tx, rx) = watch::channel(SystemState {
    latest_block: 0,
    gas_price: 0,
    connected_peers: 0,
});

// Update state
tx.send_modify(|state| {
    state.latest_block = 18_000_001;
    state.gas_price = 30_000_000_000; // 30 gwei
});

// Consumer reads the latest state (keep the borrow short: it holds a read lock)
let state = rx.borrow();
println!("Block: {}, Gas: {} gwei",
    state.latest_block,
    state.gas_price / 1_000_000_000
);

The BusEvent Enum Pattern

Centralize all event types in a single enum. This provides type safety, exhaustive pattern matching, and a single source of truth for all events in the system:

#[derive(Debug, Clone)]
enum BusEvent {
    // Blockchain events
    NewBlock(BlockEvent),
    NewPendingTx(PendingTxEvent),
    Reorg(ReorgEvent),

    // Market events
    PriceUpdate(PriceEvent),
    LiquidityChange(LiquidityEvent),

    // System events
    HealthCheck(HealthEvent),
    Shutdown,
}

#[derive(Debug, Clone)]
struct BlockEvent {
    number: u64,
    hash: [u8; 32],
    timestamp: u64,
    base_fee: u128,
    transactions: Vec<[u8; 32]>,
}

This pattern has several advantages:

  • Exhaustive matches give a compile-time guarantee: add a variant and the compiler flags every handler that must be updated
  • Handlers that genuinely don't care can opt out with a wildcard arm and keep compiling as events are added
  • Serializable for logging and replay once serde traits are derived
  • Pattern matching enables efficient dispatching
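The compile-time guarantee only holds for handlers that match exhaustively, without a `_` arm. A sketch with a trimmed three-variant enum and hypothetical topic names:

```rust
#[derive(Debug, Clone)]
enum BusEvent {
    NewBlock { number: u64 },
    PriceUpdate { pair: String, price: f64 },
    Shutdown,
}

/// Exhaustive dispatch: there is no `_` arm, so adding a variant to
/// `BusEvent` is a compile error here until this match is updated.
fn event_topic(event: &BusEvent) -> &'static str {
    match event {
        BusEvent::NewBlock { .. } => "chain.block",
        BusEvent::PriceUpdate { .. } => "market.price",
        BusEvent::Shutdown => "system.shutdown",
    }
}
```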

Handler Registration and Dispatching

Build a handler registry using trait objects:

use std::sync::Arc;
use async_trait::async_trait;

#[async_trait]
trait EventHandler: Send + Sync {
    async fn handle(&self, event: &BusEvent) -> anyhow::Result<()>;
    fn interested_in(&self, event: &BusEvent) -> bool;
}

struct EventBus {
    tx: broadcast::Sender<BusEvent>,
    handlers: Vec<Arc<dyn EventHandler>>,
}

impl EventBus {
    fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self {
            tx,
            handlers: Vec::new(),
        }
    }

    fn register(&mut self, handler: Arc<dyn EventHandler>) {
        self.handlers.push(handler);
    }

    // Takes `&self` so callers can keep publishing while the bus runs.
    async fn start(&self) -> anyhow::Result<()> {
        let mut handles = Vec::new();

        for handler in &self.handlers {
            let handler = Arc::clone(handler);
            let mut rx = self.tx.subscribe();

            let handle = tokio::spawn(async move {
                loop {
                    match rx.recv().await {
                        Ok(event) => {
                            if handler.interested_in(&event) {
                                if let Err(e) = handler.handle(&event).await {
                                    tracing::error!(
                                        error = %e,
                                        "Handler failed"
                                    );
                                }
                            }
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            tracing::warn!(missed = n, "Handler lagged");
                        }
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
            });

            handles.push(handle);
        }

        futures::future::join_all(handles).await; // join_all comes from the `futures` crate
        Ok(())
    }

    fn publish(&self, event: BusEvent) -> anyhow::Result<()> {
        self.tx.send(event)
            .map_err(|_| anyhow::anyhow!("No subscribers"))?;
        Ok(())
    }
}

Implementing a Concrete Handler

// `Opportunity` and `evaluate_arbitrage` are defined elsewhere in the bot.
struct ArbitrageDetector {
    min_profit_wei: u128,
    result_tx: mpsc::Sender<Opportunity>,
}

#[async_trait]
impl EventHandler for ArbitrageDetector {
    async fn handle(&self, event: &BusEvent) -> anyhow::Result<()> {
        if let BusEvent::PriceUpdate(price) = event {
            if let Some(opp) = self.evaluate_arbitrage(price).await? {
                if opp.profit_wei >= self.min_profit_wei {
                    self.result_tx.send(opp).await?;
                }
            }
        }
        Ok(())
    }

    fn interested_in(&self, event: &BusEvent) -> bool {
        matches!(event, BusEvent::PriceUpdate(_))
    }
}

Backpressure Strategies

Backpressure is critical in real-time systems. If a handler cannot keep up with events, you need a strategy:

Strategy 1: Bounded Channel (Block Producer)

let (tx, rx) = mpsc::channel(1024);
// If the channel is full, send().await suspends the producer task

Good for: systems where slowing down the producer is acceptable.

Strategy 2: Try-Send (Drop Events)

// Inside the producer's event loop:
match tx.try_send(event) {
    Ok(()) => {}
    Err(mpsc::error::TrySendError::Full(_)) => {
        tracing::warn!("Channel full, dropping event");
        metrics::counter!("events_dropped").increment(1);
    }
    Err(mpsc::error::TrySendError::Closed(_)) => break, // exits the surrounding loop
}

Good for: real-time systems where stale events have no value.

Strategy 3: Replace Latest (Watch Channel)

let (tx, rx) = watch::channel(latest_state);
// Only keeps latest value, consumers always see current state

Good for: state that is overwritten frequently (prices, block height).

When to Use Channels vs Shared State

Scenario                    | Use Channels   | Use Arc<RwLock>
Event notification          | Yes            | No
Frequently updated state    | watch channel  | Yes
Read-heavy, write-rare      | No             | Yes
Complex transactions        | mpsc to actor  | Yes (with Mutex)
Fan-out to many consumers   | broadcast      | No

The rule of thumb: if you need to notify others about a change, use channels. If you need to share access to data without notification, use shared state.

Production Example: MEV Bot Event Flow

[WebSocket] -> NewBlock event -> [broadcast]
                                    |
                    +---------------+---------------+
                    |               |               |
              [TxDecoder]    [PriceOracle]    [StateUpdater]
                    |               |               |
                    v               v               v
              [mpsc: decoded]  [watch: prices]  [watch: state]
                    |               |               |
                    +-------+-------+-------+-------+
                            |               |
                    [ArbitrageDetector]  [LiquidationDetector]
                            |               |
                            v               v
                    [mpsc: opportunities]
                            |
                    [ExecutionEngine]

This architecture processes a new block in under 10 ms, with each component running independently on its own tokio task.

Conclusion

Event-driven architecture in Rust with tokio channels provides the performance and type safety needed for real-time systems. The BusEvent enum pattern centralizes event definitions, broadcast channels enable fan-out, mpsc channels provide backpressure, and watch channels optimize for latest-value semantics. Choose the right channel type for each communication pattern, implement proper backpressure handling, and your system will be both fast and resilient.