본문으로 건너뛰기
엔지니어링Mar 28, 2026

Deep EVM #21: Rust에서의 이벤트 기반 아키텍처 — 실시간 시스템을 위한 버스 패턴

OS
Open Soft Team

Engineering Team

이벤트 기반 아키텍처가 필요한 이유

MEV 봇, 블록체인 인덱서, 실시간 거래 엔진과 같은 고성능 시스템에서 컴포넌트는 최소 지연 시간으로 이벤트에 반응해야 합니다. 새 블록이 도착하면, 밀리초 내에 시스템은 트랜잭션을 디코딩하고, 기회를 평가하며, 실행을 시뮬레이션하고, 응답을 제출해야 합니다.

전통적인 요청-응답 모델은 따라갈 수 없습니다. 비동기 메시지 전달을 통해 컴포넌트가 통신하는 이벤트 기반 아키텍처가 필요합니다. Rust에서는 tokio의 채널 원시 자료형이 이러한 시스템을 구축하는 기반을 제공합니다.

Tokio 채널 원시 자료형

Tokio는 각각 다른 통신 패턴에 적합한 네 가지 채널 유형을 제공합니다:

mpsc — 다중 생산자, 단일 소비자

많은 발신자, 한 수신자. 이벤트 집계를 위한 주력 패턴입니다:

use tokio::sync::mpsc;

#[derive(Debug, Clone)]
enum BusEvent {
    NewBlock { number: u64, hash: [u8; 32] },
    NewTransaction { hash: [u8; 32], from: [u8; 20], to: [u8; 20] },
    PriceUpdate { pair: String, price: f64, timestamp: u64 },
    OpportunityFound { profit_wei: u128, deadline_block: u64 },
}

async fn event_aggregator(mut rx: mpsc::Receiver<BusEvent>) {
    while let Some(event) = rx.recv().await {
        match event {
            BusEvent::NewBlock { number, .. } => {
                tracing::info!(block = number, "Processing new block");
            }
            BusEvent::PriceUpdate { pair, price, .. } => {
                tracing::info!(%pair, %price, "Price updated");
            }
            _ => {}
        }
    }
}

백프레셔가 있는 바운드 채널을 생성합니다:

let (tx, rx) = mpsc::channel::<BusEvent>(1024);

// 채널이 가득 차면 발신자가 차단됨 (백프레셔)
tx.send(BusEvent::NewBlock {
    number: 18_000_000,
    hash: [0u8; 32],
}).await.expect("receiver dropped");

broadcast — 다중 생산자, 다중 소비자

모든 구독자가 모든 메시지를 받습니다. 팬아웃에 완벽합니다:

use tokio::sync::broadcast;

let (tx, _) = broadcast::channel::<BusEvent>(4096);

// 각 구독자가 자체 수신자를 얻음
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

// 발행자
tx.send(BusEvent::NewBlock {
    number: 18_000_001,
    hash: [0u8; 32],
}).expect("no subscribers");

// rx1과 rx2 모두 이벤트를 수신

broadcast 채널은 고정 용량을 가집니다. 구독자가 뒤처지면, n개의 메시지를 놓쳤음을 나타내는 RecvError::Lagged(n)을 수신합니다. 이를 우아하게 처리합니다:

loop {
    match rx.recv().await {
        Ok(event) => handle_event(event).await,
        Err(broadcast::error::RecvError::Lagged(n)) => {
            tracing::warn!(missed = n, "Subscriber lagged, catching up");
        }
        Err(broadcast::error::RecvError::Closed) => break,
    }
}

watch — 단일 생산자, 다중 소비자 (최신값)

구독자는 최신 값만 봅니다. 자주 변경되지만 소비자가 현재 상태만 관심 갖는 상태에 이상적입니다:

use tokio::sync::watch;

#[derive(Debug, Clone)]
struct SystemState {
    latest_block: u64,
    gas_price: u128,
    connected_peers: usize,
}

let (tx, rx) = watch::channel(SystemState {
    latest_block: 0,
    gas_price: 0,
    connected_peers: 0,
});

// 상태 업데이트
tx.send_modify(|state| {
    state.latest_block = 18_000_001;
    state.gas_price = 30_000_000_000; // 30 gwei
});

// 소비자가 최신 상태 읽기
let state = rx.borrow();
println!("Block: {}, Gas: {} gwei",
    state.latest_block,
    state.gas_price / 1_000_000_000
);

BusEvent 열거형 패턴

모든 이벤트 유형을 단일 열거형으로 중앙화합니다. 이것은 타입 안전성, 완전한 패턴 매칭, 시스템의 모든 이벤트에 대한 단일 진실의 원천을 제공합니다:

#[derive(Debug, Clone)]
enum BusEvent {
    // 블록체인 이벤트
    NewBlock(BlockEvent),
    NewPendingTx(PendingTxEvent),
    Reorg(ReorgEvent),

    // 시장 이벤트
    PriceUpdate(PriceEvent),
    LiquidityChange(LiquidityEvent),

    // 시스템 이벤트
    HealthCheck(HealthEvent),
    Shutdown,
}

이 패턴에는 여러 장점이 있습니다:

  • 핸들러가 모든 이벤트 유형을 커버하는 컴파일 타임 보장
  • 기존 핸들러를 깨뜨리지 않고 새 이벤트를 쉽게 추가
  • 로깅과 리플레이를 위한 직렬화 가능
  • 패턴 매칭으로 효율적인 디스패칭 가능

핸들러 등록과 디스패칭

트레잇 객체를 사용하여 핸들러 레지스트리를 구축합니다:

use std::sync::Arc;
use async_trait::async_trait;

#[async_trait]
trait EventHandler: Send + Sync {
    async fn handle(&self, event: &BusEvent) -> anyhow::Result<()>;
    fn interested_in(&self, event: &BusEvent) -> bool;
}

struct EventBus {
    tx: broadcast::Sender<BusEvent>,
    handlers: Vec<Arc<dyn EventHandler>>,
}

impl EventBus {
    fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self {
            tx,
            handlers: Vec::new(),
        }
    }

    fn register(&mut self, handler: Arc<dyn EventHandler>) {
        self.handlers.push(handler);
    }

    async fn start(self) -> anyhow::Result<()> {
        let mut handles = Vec::new();
        for handler in &self.handlers {
            let handler = Arc::clone(handler);
            let mut rx = self.tx.subscribe();
            let handle = tokio::spawn(async move {
                loop {
                    match rx.recv().await {
                        Ok(event) => {
                            if handler.interested_in(&event) {
                                if let Err(e) = handler.handle(&event).await {
                                    tracing::error!(error = %e, "Handler failed");
                                }
                            }
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            tracing::warn!(missed = n, "Handler lagged");
                        }
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
            });
            handles.push(handle);
        }
        futures::future::join_all(handles).await;
        Ok(())
    }
}

백프레셔 전략

백프레셔는 실시간 시스템에서 중요합니다. 핸들러가 이벤트를 따라잡지 못하면 전략이 필요합니다:

전략 1: 바운드 채널 (생산자 차단)

let (tx, rx) = mpsc::channel(1024);
// 채널이 가득 차면 send().await가 생산자를 차단

적합: 생산자를 느리게 하는 것이 허용되는 시스템.

전략 2: Try-Send (이벤트 드롭)

match tx.try_send(event) {
    Ok(()) => {},
    Err(mpsc::error::TrySendError::Full(_)) => {
        tracing::warn!("Channel full, dropping event");
        metrics::counter!("events_dropped").increment(1);
    }
    Err(mpsc::error::TrySendError::Closed(_)) => break,
}

적합: 오래된 이벤트에 가치가 없는 실시간 시스템.

전략 3: 최신값 교체 (Watch 채널)

let (tx, rx) = watch::channel(latest_state);
// 최신값만 유지, 소비자는 항상 현재 상태를 봄

적합: 자주 덮어쓰이는 상태(가격, 블록 높이).

채널 vs 공유 상태 사용 시점

시나리오채널 사용Arc<RwLock> 사용
이벤트 알림아니오
자주 업데이트되는 상태watch 채널
읽기 많음, 쓰기 드묾아니오
복잡한 트랜잭션액터에 mpsc예 (Mutex 포함)
다수 소비자 팬아웃broadcast아니오

경험 법칙: 변경에 대해 다른 곳에 알려야 하면 채널을 사용합니다. 알림 없이 데이터에 대한 접근을 공유해야 하면 공유 상태를 사용합니다.

프로덕션 예시: MEV 봇 이벤트 흐름

[WebSocket] -> NewBlock event -> [broadcast]
                                    |
                    +---------------+---------------+
                    |               |               |
              [TxDecoder]    [PriceOracle]    [StateUpdater]
                    |               |               |
                    v               v               v
              [mpsc: decoded]  [watch: prices]  [watch: state]
                    |               |               |
                    +-------+-------+-------+-------+
                            |               |
                    [ArbitrageDetector]  [LiquidationDetector]
                            |               |
                            v               v
                    [mpsc: opportunities]
                            |
                    [ExecutionEngine]

이 아키텍처는 각 컴포넌트가 별도의 tokio 태스크에서 독립적으로 실행되며, 새 블록을 10ms 미만으로 처리합니다.

결론

tokio 채널을 사용한 Rust의 이벤트 기반 아키텍처는 실시간 시스템에 필요한 성능과 타입 안전성을 제공합니다. BusEvent 열거형 패턴은 이벤트 정의를 중앙화하고, broadcast 채널은 팬아웃을 가능하게 하며, mpsc 채널은 백프레셔를 제공하고, watch 채널은 최신값 의미론을 최적화합니다. 각 통신 패턴에 적절한 채널 유형을 선택하고, 적절한 백프레셔 처리를 구현하면, 시스템은 빠르고 탄력적일 것입니다.