Перейти к основному содержимому
DevOpsMar 28, 2026

Deep EVM #28: Высоконагруженный пайплайн данных — batch insert, COPY и конфликты

OS
Open Soft Team

Engineering Team

Требования к пайплайну блокчейн-данных

Индексация блокчейна генерирует колоссальные объёмы данных. Ethereum производит ~12 транзакций в секунду в среднем, но пики достигают 50+ TPS. Каждая транзакция порождает события (logs), изменения состояния (state diffs) и внутренние вызовы (traces). В итоге индексатор должен записывать тысячи строк в секунду стабильно, 24/7, без потери данных.

Обычный INSERT по одной строке здесь не справится. Нужны batch-операции, COPY-протокол и умное управление конфликтами.

COPY — самый быстрый способ загрузки данных

PostgreSQL’s COPY протокол обходит планировщик запросов и парсер SQL, записывая данные напрямую в таблицу. Это в 5-10 раз быстрее обычного INSERT:

use sqlx::postgres::PgCopyIn;
use tokio::io::AsyncWriteExt;

async fn bulk_insert_transactions(
    pool: &PgPool,
    transactions: &[Transaction],
) -> Result<u64> {
    let mut conn = pool.acquire().await?;

    let mut copy_in = conn.copy_in_raw(
        "COPY transactions (tx_hash, block_number, from_address, \
         to_address, value, gas_used, created_at) \
         FROM STDIN WITH (FORMAT CSV)"
    ).await?;

    let mut buf = Vec::with_capacity(transactions.len() * 200);
    for tx in transactions {
        writeln!(
            buf, "{},{},{},{},{},{},{}",
            tx.tx_hash, tx.block_number, tx.from_address,
            tx.to_address.as_deref().unwrap_or(""),
            tx.value, tx.gas_used, tx.created_at.to_rfc3339()
        )?;
    }

    copy_in.send(buf.as_slice()).await?;
    let rows = copy_in.finish().await?;
    Ok(rows)
}

Бенчмарки COPY vs INSERT

Метод1K строк10K строк100K строк
INSERT по одной450мс4.5с45с
Batch INSERT (100)85мс850мс8.5с
COPY CSV12мс95мс850мс
COPY BINARY8мс65мс580мс

COPY BINARY ещё быстрее, так как избегает текстовой сериализации/десериализации.

Batch INSERT с ON CONFLICT

COPY не поддерживает ON CONFLICT. Для upsert используйте batch INSERT:

async fn batch_upsert_balances(
    pool: &PgPool,
    balances: &[Balance],
) -> Result<u64> {
    // Генерируем VALUES-плейсхолдеры
    let mut query = String::from(
        "INSERT INTO balances (address, token, balance, updated_at) VALUES "
    );

    let mut params: Vec<Box<dyn sqlx::Encode<'_, sqlx::Postgres> + Send>> = Vec::new();
    let mut param_idx = 1;

    for (i, b) in balances.iter().enumerate() {
        if i > 0 { query.push(','); }
        query.push_str(&format!(
            "(${}, ${}, ${}, ${})",
            param_idx, param_idx + 1, param_idx + 2, param_idx + 3
        ));
        param_idx += 4;
    }

    query.push_str(
        " ON CONFLICT (address, token) DO UPDATE SET \
         balance = EXCLUDED.balance, \
         updated_at = EXCLUDED.updated_at \
         WHERE balances.balance != EXCLUDED.balance"
    );

    // Используем sqlx::query с динамическими параметрами
    let mut q = sqlx::query(&query);
    for b in balances {
        q = q.bind(&b.address)
             .bind(&b.token)
             .bind(&b.balance)
             .bind(&b.updated_at);
    }

    let result = q.execute(pool).await?;
    Ok(result.rows_affected())
}

Обратите внимание на WHERE balances.balance != EXCLUDED.balance — это пропускает обновления, когда значение не изменилось, экономя write amplification.

Staging table паттерн

Для максимальной производительности используйте временную таблицу:

-- 1. Создать временную таблицу
CREATE TEMP TABLE staging_balances (
    LIKE balances INCLUDING NOTHING
) ON COMMIT DROP;

-- 2. COPY в staging (без индексов — максимальная скорость)
COPY staging_balances FROM STDIN WITH (FORMAT BINARY);

-- 3. Upsert из staging в целевую таблицу
INSERT INTO balances
SELECT * FROM staging_balances
ON CONFLICT (address, token) DO UPDATE SET
    balance = EXCLUDED.balance,
    updated_at = EXCLUDED.updated_at
WHERE balances.balance != EXCLUDED.balance;

Этот паттерн комбинирует скорость COPY с гибкостью ON CONFLICT.

Управление конфликтами

Стратегии разрешения конфликтов

-- 1. Игнорировать дубликаты
INSERT INTO events (...) VALUES (...)
ON CONFLICT (tx_hash, log_index) DO NOTHING;

-- 2. Обновить всё
ON CONFLICT (address, token) DO UPDATE SET
    balance = EXCLUDED.balance;

-- 3. Условное обновление (только если новее)
ON CONFLICT (address, token) DO UPDATE SET
    balance = EXCLUDED.balance,
    updated_at = EXCLUDED.updated_at
WHERE EXCLUDED.updated_at > balances.updated_at;

-- 4. Merge (агрегация)
ON CONFLICT (address) DO UPDATE SET
    total_tx_count = balances.total_tx_count + EXCLUDED.total_tx_count;

Пайплайн на tokio

Архитектура высокопроизводительного пайплайна:

use tokio::sync::mpsc;

struct Pipeline {
    batch_size: usize,
    flush_interval: Duration,
}

impl Pipeline {
    async fn run(
        &self,
        mut rx: mpsc::Receiver<Transaction>,
        pool: PgPool,
    ) {
        let mut buffer = Vec::with_capacity(self.batch_size);
        let mut interval = tokio::time::interval(self.flush_interval);

        loop {
            tokio::select! {
                Some(tx) = rx.recv() => {
                    buffer.push(tx);
                    if buffer.len() >= self.batch_size {
                        self.flush(&pool, &mut buffer).await;
                    }
                }
                _ = interval.tick() => {
                    if !buffer.is_empty() {
                        self.flush(&pool, &mut buffer).await;
                    }
                }
                else => break,
            }
        }
    }

    async fn flush(
        &self,
        pool: &PgPool,
        buffer: &mut Vec<Transaction>,
    ) {
        let batch: Vec<Transaction> = buffer.drain(..).collect();
        let count = batch.len();
        let start = Instant::now();

        match bulk_insert_transactions(pool, &batch).await {
            Ok(rows) => {
                metrics::counter!("pipeline_rows_inserted").increment(rows);
                metrics::histogram!("pipeline_flush_duration")
                    .record(start.elapsed().as_secs_f64());
            }
            Err(e) => {
                tracing::error!(error = %e, count, "Flush failed");
                metrics::counter!("pipeline_errors").increment(1);
                // Retry logic или dead letter queue
            }
        }
    }
}

Пайплайн буферизирует записи и флашит их по достижении batch_size или по таймеру, что бы ни наступило раньше.

Backpressure и мониторинг

Критичные метрики пайплайна:

  • Throughput — строки/секунду
  • Latency — время от получения до записи в БД
  • Queue depth — размер буфера (индикатор backpressure)
  • Error rate — процент failed flush’ей
  • DB connection wait time — ожидание соединения из пула
metrics::gauge!("pipeline_buffer_size").set(buffer.len() as f64);
metrics::counter!("pipeline_total_received").increment(1);

Алерт, если queue depth растёт — это означает, что запись не успевает за потоком данных.

Оптимизация PostgreSQL для write-heavy нагрузки

# Увеличить WAL для batch-записей
wal_buffers = 64MB
max_wal_size = 8GB
checkpoint_completion_target = 0.9

# Отключить fsync для dev (НИКОГДА в production!)
# synchronous_commit = off  # Для не-критичных данных

# Увеличить shared_buffers
shared_buffers = 8GB

# Параллельные workers для VACUUM
autovacuum_max_workers = 6

Для не-критичных данных (метрики, логи) synchronous_commit = off даёт 2-3x прирост скорости записи ценой возможной потери последних ~200мс данных при крэше.

Заключение

Высоконагруженный пайплайн данных для PostgreSQL — это архитектура, а не одна оптимизация. COPY для массовой загрузки, staging tables для upsert, tokio-пайплайн с батчингом и backpressure, мониторинг throughput и latency — всё это вместе обеспечивает стабильную запись тысяч строк в секунду 24/7.