Deep EVM #21: Event-driven архитектура на Rust — паттерн шины для реалтайм-систем
Engineering Team
Зачем event-driven архитектура
В блокчейн-инфраструктуре события — это основа всего. Новый блок, транзакция в мемпуле, изменение цены в пуле ликвидности — всё это события, на которые ваша система должна реагировать в реальном времени. Традиционная архитектура запрос-ответ здесь не работает: вы не можете поллить блокчейн каждую миллисекунду.
Event-driven архитектура (EDA) решает эту проблему через асинхронную обработку потока событий. Компоненты системы не вызывают друг друга напрямую — они публикуют и подписываются на события через шину.
Паттерн шины событий в Rust
Центральный элемент EDA — шина событий (event bus). В Rust мы можем реализовать её через tokio::sync::broadcast:
use tokio::sync::broadcast;
use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
enum Event {
NewBlock { number: u64, hash: String },
PendingTx { hash: String, from: String, to: String },
PriceUpdate { pair: String, price: f64 },
LiquidationRisk { address: String, health_factor: f64 },
}
struct EventBus {
sender: broadcast::Sender<Event>,
}
impl EventBus {
fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
fn publish(&self, event: Event) {
// Игнорируем ошибку, если нет подписчиков
let _ = self.sender.send(event);
}
fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
}
Архитектура компонентов
Система состоит из трёх типов компонентов:
Producers (источники событий)
async fn block_watcher(
bus: Arc<EventBus>,
provider: Arc<Provider>,
) {
let mut stream = provider.subscribe_blocks().await.unwrap();
while let Some(block) = stream.next().await {
bus.publish(Event::NewBlock {
number: block.number.unwrap().as_u64(),
hash: format!("{:?}", block.hash.unwrap()),
});
}
}
Consumers (обработчики событий)
async fn arbitrage_detector(
bus: Arc<EventBus>,
strategy: Arc<Strategy>,
) {
let mut rx = bus.subscribe();
while let Ok(event) = rx.recv().await {
match event {
Event::PriceUpdate { pair, price } => {
if let Some(opportunity) = strategy.check(&pair, price) {
bus.publish(Event::ArbitrageFound(opportunity));
}
}
_ => {} // Игнорируем нерелевантные события
}
}
}
Hybrid (и источник, и обработчик)
Компонент может подписываться на одни события и публиковать другие, создавая цепочки обработки.
Управление backpressure
Одна из ключевых проблем EDA — backpressure. Если consumer не успевает обрабатывать события, канал переполняется.
broadcast в tokio при переполнении отбрасывает старые сообщения и возвращает RecvError::Lagged. Для критичных consumer’ов используйте буферизированные каналы:
async fn buffered_consumer(
bus: Arc<EventBus>,
) {
let mut rx = bus.subscribe();
let (tx, mut buffered_rx) = tokio::sync::mpsc::channel(10_000);
// Буферизация в отдельной задаче
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if tx.send(event).await.is_err() {
break;
}
}
});
// Обработка из буфера
while let Some(event) = buffered_rx.recv().await {
process(event).await;
}
}
Типизированные каналы с enum dispatch
Вместо одного enum Event для всех событий можно использовать типизированные каналы через трейт-объекты:
trait EventHandler: Send + Sync {
fn event_type(&self) -> &'static str;
fn handle(&self, payload: &[u8]) -> Result<(), Error>;
}
struct TypedBus {
handlers: DashMap<String, Vec<Arc<dyn EventHandler>>>,
}
Но на практике enum dispatch проще, быстрее (нет динамической диспетчеризации) и достаточен для большинства систем.
Персистентность событий
Для отказоустойчивости события нужно сохранять. Варианты:
- PostgreSQL с LISTEN/NOTIFY для простых случаев
- Redis Streams для высокой пропускной способности
- Apache Kafka для корпоративных систем
- NATS JetStream для облачных систем
Пример с Redis Streams:
async fn persist_event(
redis: &mut Connection,
event: &Event,
) -> Result<String> {
let payload = serde_json::to_string(event)?;
let id: String = redis::cmd("XADD")
.arg("events")
.arg("MAXLEN")
.arg("~")
.arg(100_000)
.arg("*")
.arg("data")
.arg(&payload)
.query_async(redis)
.await?;
Ok(id)
}
Мониторинг и метрики
Для EDA критичны следующие метрики:
- Event throughput — количество событий в секунду
- Processing latency — время от публикации до обработки
- Queue depth — размер очередей consumer’ов
- Error rate — процент ошибок обработки
Используйте metrics crate для экспорта в Prometheus:
metrics::counter!("events_published_total", "type" => "NewBlock").increment(1);
metrics::histogram!("event_processing_duration_seconds").record(elapsed);
Заключение
Event-driven архитектура на Rust — это мощный паттерн для реалтайм-систем, особенно в блокчейн-инфраструктуре. Комбинация tokio broadcast для in-process коммуникации и Redis Streams/NATS для распределённой обработки обеспечивает масштабируемость, отказоустойчивость и низкую латентность.