Перейти к основному содержимому
ИнженерияMar 28, 2026

Deep EVM #21: Event-driven архитектура на Rust — паттерн шины для реалтайм-систем

OS
Open Soft Team

Engineering Team

Зачем event-driven архитектура

В блокчейн-инфраструктуре события — это основа всего. Новый блок, транзакция в мемпуле, изменение цены в пуле ликвидности — всё это события, на которые ваша система должна реагировать в реальном времени. Традиционная архитектура запрос-ответ здесь не работает: вы не можете поллить блокчейн каждую миллисекунду.

Event-driven архитектура (EDA) решает эту проблему через асинхронную обработку потока событий. Компоненты системы не вызывают друг друга напрямую — они публикуют и подписываются на события через шину.

Паттерн шины событий в Rust

Центральный элемент EDA — шина событий (event bus). В Rust мы можем реализовать её через tokio::sync::broadcast:

use tokio::sync::broadcast;
use serde::{Serialize, Deserialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
enum Event {
    NewBlock { number: u64, hash: String },
    PendingTx { hash: String, from: String, to: String },
    PriceUpdate { pair: String, price: f64 },
    LiquidationRisk { address: String, health_factor: f64 },
}

struct EventBus {
    sender: broadcast::Sender<Event>,
}

impl EventBus {
    fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self { sender }
    }

    fn publish(&self, event: Event) {
        // Игнорируем ошибку, если нет подписчиков
        let _ = self.sender.send(event);
    }

    fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.sender.subscribe()
    }
}

Архитектура компонентов

Система состоит из трёх типов компонентов:

Producers (источники событий)

async fn block_watcher(
    bus: Arc<EventBus>,
    provider: Arc<Provider>,
) {
    let mut stream = provider.subscribe_blocks().await.unwrap();

    while let Some(block) = stream.next().await {
        bus.publish(Event::NewBlock {
            number: block.number.unwrap().as_u64(),
            hash: format!("{:?}", block.hash.unwrap()),
        });
    }
}

Consumers (обработчики событий)

async fn arbitrage_detector(
    bus: Arc<EventBus>,
    strategy: Arc<Strategy>,
) {
    let mut rx = bus.subscribe();

    while let Ok(event) = rx.recv().await {
        match event {
            Event::PriceUpdate { pair, price } => {
                if let Some(opportunity) = strategy.check(&pair, price) {
                    bus.publish(Event::ArbitrageFound(opportunity));
                }
            }
            _ => {} // Игнорируем нерелевантные события
        }
    }
}

Hybrid (и источник, и обработчик)

Компонент может подписываться на одни события и публиковать другие, создавая цепочки обработки.

Управление backpressure

Одна из ключевых проблем EDA — backpressure. Если consumer не успевает обрабатывать события, канал переполняется.

broadcast в tokio при переполнении отбрасывает старые сообщения и возвращает RecvError::Lagged. Для критичных consumer’ов используйте буферизированные каналы:

async fn buffered_consumer(
    bus: Arc<EventBus>,
) {
    let mut rx = bus.subscribe();
    let (tx, mut buffered_rx) = tokio::sync::mpsc::channel(10_000);

    // Буферизация в отдельной задаче
    tokio::spawn(async move {
        while let Ok(event) = rx.recv().await {
            if tx.send(event).await.is_err() {
                break;
            }
        }
    });

    // Обработка из буфера
    while let Some(event) = buffered_rx.recv().await {
        process(event).await;
    }
}

Типизированные каналы с enum dispatch

Вместо одного enum Event для всех событий можно использовать типизированные каналы через трейт-объекты:

trait EventHandler: Send + Sync {
    fn event_type(&self) -> &'static str;
    fn handle(&self, payload: &[u8]) -> Result<(), Error>;
}

struct TypedBus {
    handlers: DashMap<String, Vec<Arc<dyn EventHandler>>>,
}

Но на практике enum dispatch проще, быстрее (нет динамической диспетчеризации) и достаточен для большинства систем.

Персистентность событий

Для отказоустойчивости события нужно сохранять. Варианты:

  • PostgreSQL с LISTEN/NOTIFY для простых случаев
  • Redis Streams для высокой пропускной способности
  • Apache Kafka для корпоративных систем
  • NATS JetStream для облачных систем

Пример с Redis Streams:

async fn persist_event(
    redis: &mut Connection,
    event: &Event,
) -> Result<String> {
    let payload = serde_json::to_string(event)?;
    let id: String = redis::cmd("XADD")
        .arg("events")
        .arg("MAXLEN")
        .arg("~")
        .arg(100_000)
        .arg("*")
        .arg("data")
        .arg(&payload)
        .query_async(redis)
        .await?;
    Ok(id)
}

Мониторинг и метрики

Для EDA критичны следующие метрики:

  • Event throughput — количество событий в секунду
  • Processing latency — время от публикации до обработки
  • Queue depth — размер очередей consumer’ов
  • Error rate — процент ошибок обработки

Используйте metrics crate для экспорта в Prometheus:

metrics::counter!("events_published_total", "type" => "NewBlock").increment(1);
metrics::histogram!("event_processing_duration_seconds").record(elapsed);

Заключение

Event-driven архитектура на Rust — это мощный паттерн для реалтайм-систем, особенно в блокчейн-инфраструктуре. Комбинация tokio broadcast для in-process коммуникации и Redis Streams/NATS для распределённой обработки обеспечивает масштабируемость, отказоустойчивость и низкую латентность.