Deep EVM #28: Высоконагруженный пайплайн данных — batch insert, COPY и конфликты
Engineering Team
Требования к пайплайну блокчейн-данных
Индексация блокчейна генерирует колоссальные объёмы данных. Ethereum производит ~12 транзакций в секунду в среднем, но пики достигают 50+ TPS. Каждая транзакция порождает события (logs), изменения состояния (state diffs) и внутренние вызовы (traces). В итоге индексатор должен записывать тысячи строк в секунду стабильно, 24/7, без потери данных.
Обычный INSERT по одной строке здесь не справится. Нужны batch-операции, COPY-протокол и умное управление конфликтами.
COPY — самый быстрый способ загрузки данных
PostgreSQL’s COPY протокол обходит планировщик запросов и парсер SQL, записывая данные напрямую в таблицу. Это в 5-10 раз быстрее обычного INSERT:
use sqlx::postgres::PgCopyIn;
use tokio::io::AsyncWriteExt;
async fn bulk_insert_transactions(
pool: &PgPool,
transactions: &[Transaction],
) -> Result<u64> {
let mut conn = pool.acquire().await?;
let mut copy_in = conn.copy_in_raw(
"COPY transactions (tx_hash, block_number, from_address, \
to_address, value, gas_used, created_at) \
FROM STDIN WITH (FORMAT CSV)"
).await?;
let mut buf = Vec::with_capacity(transactions.len() * 200);
for tx in transactions {
writeln!(
buf, "{},{},{},{},{},{},{}",
tx.tx_hash, tx.block_number, tx.from_address,
tx.to_address.as_deref().unwrap_or(""),
tx.value, tx.gas_used, tx.created_at.to_rfc3339()
)?;
}
copy_in.send(buf.as_slice()).await?;
let rows = copy_in.finish().await?;
Ok(rows)
}
Бенчмарки COPY vs INSERT
| Метод | 1K строк | 10K строк | 100K строк |
|---|---|---|---|
| INSERT по одной | 450мс | 4.5с | 45с |
| Batch INSERT (100) | 85мс | 850мс | 8.5с |
| COPY CSV | 12мс | 95мс | 850мс |
| COPY BINARY | 8мс | 65мс | 580мс |
COPY BINARY ещё быстрее, так как избегает текстовой сериализации/десериализации.
Batch INSERT с ON CONFLICT
COPY не поддерживает ON CONFLICT. Для upsert используйте batch INSERT:
async fn batch_upsert_balances(
pool: &PgPool,
balances: &[Balance],
) -> Result<u64> {
// Генерируем VALUES-плейсхолдеры
let mut query = String::from(
"INSERT INTO balances (address, token, balance, updated_at) VALUES "
);
let mut params: Vec<Box<dyn sqlx::Encode<'_, sqlx::Postgres> + Send>> = Vec::new();
let mut param_idx = 1;
for (i, b) in balances.iter().enumerate() {
if i > 0 { query.push(','); }
query.push_str(&format!(
"(${}, ${}, ${}, ${})",
param_idx, param_idx + 1, param_idx + 2, param_idx + 3
));
param_idx += 4;
}
query.push_str(
" ON CONFLICT (address, token) DO UPDATE SET \
balance = EXCLUDED.balance, \
updated_at = EXCLUDED.updated_at \
WHERE balances.balance != EXCLUDED.balance"
);
// Используем sqlx::query с динамическими параметрами
let mut q = sqlx::query(&query);
for b in balances {
q = q.bind(&b.address)
.bind(&b.token)
.bind(&b.balance)
.bind(&b.updated_at);
}
let result = q.execute(pool).await?;
Ok(result.rows_affected())
}
Обратите внимание на WHERE balances.balance != EXCLUDED.balance — это пропускает обновления, когда значение не изменилось, экономя write amplification.
Staging table паттерн
Для максимальной производительности используйте временную таблицу:
-- 1. Создать временную таблицу
CREATE TEMP TABLE staging_balances (
LIKE balances INCLUDING NOTHING
) ON COMMIT DROP;
-- 2. COPY в staging (без индексов — максимальная скорость)
COPY staging_balances FROM STDIN WITH (FORMAT BINARY);
-- 3. Upsert из staging в целевую таблицу
INSERT INTO balances
SELECT * FROM staging_balances
ON CONFLICT (address, token) DO UPDATE SET
balance = EXCLUDED.balance,
updated_at = EXCLUDED.updated_at
WHERE balances.balance != EXCLUDED.balance;
Этот паттерн комбинирует скорость COPY с гибкостью ON CONFLICT.
Управление конфликтами
Стратегии разрешения конфликтов
-- 1. Игнорировать дубликаты
INSERT INTO events (...) VALUES (...)
ON CONFLICT (tx_hash, log_index) DO NOTHING;
-- 2. Обновить всё
ON CONFLICT (address, token) DO UPDATE SET
balance = EXCLUDED.balance;
-- 3. Условное обновление (только если новее)
ON CONFLICT (address, token) DO UPDATE SET
balance = EXCLUDED.balance,
updated_at = EXCLUDED.updated_at
WHERE EXCLUDED.updated_at > balances.updated_at;
-- 4. Merge (агрегация)
ON CONFLICT (address) DO UPDATE SET
total_tx_count = balances.total_tx_count + EXCLUDED.total_tx_count;
Пайплайн на tokio
Архитектура высокопроизводительного пайплайна:
use tokio::sync::mpsc;
struct Pipeline {
batch_size: usize,
flush_interval: Duration,
}
impl Pipeline {
async fn run(
&self,
mut rx: mpsc::Receiver<Transaction>,
pool: PgPool,
) {
let mut buffer = Vec::with_capacity(self.batch_size);
let mut interval = tokio::time::interval(self.flush_interval);
loop {
tokio::select! {
Some(tx) = rx.recv() => {
buffer.push(tx);
if buffer.len() >= self.batch_size {
self.flush(&pool, &mut buffer).await;
}
}
_ = interval.tick() => {
if !buffer.is_empty() {
self.flush(&pool, &mut buffer).await;
}
}
else => break,
}
}
}
async fn flush(
&self,
pool: &PgPool,
buffer: &mut Vec<Transaction>,
) {
let batch: Vec<Transaction> = buffer.drain(..).collect();
let count = batch.len();
let start = Instant::now();
match bulk_insert_transactions(pool, &batch).await {
Ok(rows) => {
metrics::counter!("pipeline_rows_inserted").increment(rows);
metrics::histogram!("pipeline_flush_duration")
.record(start.elapsed().as_secs_f64());
}
Err(e) => {
tracing::error!(error = %e, count, "Flush failed");
metrics::counter!("pipeline_errors").increment(1);
// Retry logic или dead letter queue
}
}
}
}
Пайплайн буферизирует записи и флашит их по достижении batch_size или по таймеру, что бы ни наступило раньше.
Backpressure и мониторинг
Критичные метрики пайплайна:
- Throughput — строки/секунду
- Latency — время от получения до записи в БД
- Queue depth — размер буфера (индикатор backpressure)
- Error rate — процент failed flush’ей
- DB connection wait time — ожидание соединения из пула
metrics::gauge!("pipeline_buffer_size").set(buffer.len() as f64);
metrics::counter!("pipeline_total_received").increment(1);
Алерт, если queue depth растёт — это означает, что запись не успевает за потоком данных.
Оптимизация PostgreSQL для write-heavy нагрузки
# Увеличить WAL для batch-записей
wal_buffers = 64MB
max_wal_size = 8GB
checkpoint_completion_target = 0.9
# Отключить fsync для dev (НИКОГДА в production!)
# synchronous_commit = off # Для не-критичных данных
# Увеличить shared_buffers
shared_buffers = 8GB
# Параллельные workers для VACUUM
autovacuum_max_workers = 6
Для не-критичных данных (метрики, логи) synchronous_commit = off даёт 2-3x прирост скорости записи ценой возможной потери последних ~200мс данных при крэше.
Заключение
Высоконагруженный пайплайн данных для PostgreSQL — это архитектура, а не одна оптимизация. COPY для массовой загрузки, staging tables для upsert, tokio-пайплайн с батчингом и backpressure, мониторинг throughput и latency — всё это вместе обеспечивает стабильную запись тысяч строк в секунду 24/7.