DevOps · Mar 28, 2026

Deep EVM #28: High-Throughput Data Pipeline — Batch Inserts, COPY, and Conflict Resolution

Open Soft Team

Engineering Team

The Insert Throughput Problem

You are building a blockchain indexer that processes 200 transactions per block, with a new block every 12 seconds. That is roughly 17 transactions per second at steady state. Easy, right? Until you factor in historical backfill: syncing 18 million blocks at maximum speed requires inserting 3.6 billion rows as fast as possible.

Individual INSERT statements max out at roughly 5,000 rows per second on a typical PostgreSQL server. COPY protocol can push 200,000+ rows per second. The difference is 40x — the difference between a 9-day backfill and a 5-hour backfill.
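The arithmetic behind those figures is worth making explicit. A quick sanity check, using the assumed throughput rates above rather than measured numbers:

```rust
// Back-of-the-envelope check of the backfill figures above.
// The throughput numbers are this article's assumed rates, not benchmarks.
fn backfill_seconds(rows: f64, rows_per_sec: f64) -> f64 {
    rows / rows_per_sec
}

fn main() {
    let rows = 18_000_000.0 * 200.0; // 18M blocks x 200 tx = 3.6B rows
    let insert_days = backfill_seconds(rows, 5_000.0) / 86_400.0;
    let copy_hours = backfill_seconds(rows, 200_000.0) / 3_600.0;
    // ~8.3 days with single INSERTs vs ~5 hours over COPY
    println!("{:.1} days vs {:.1} hours", insert_days, copy_hours);
}
```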

INSERT vs COPY Performance

Single INSERT (Slowest)

// ~5,000 rows/sec — each statement is a round trip
for tx in transactions {
    sqlx::query(
        "INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number)
         VALUES ($1, $2, $3, $4, $5)"
    )
    .bind(&tx.hash)
    .bind(&tx.from_addr)
    .bind(&tx.to_addr)
    .bind(&tx.value_wei)
    .bind(tx.block_number)
    .execute(&pool)
    .await?;
}

Batch INSERT (Better)

// ~50,000 rows/sec — single statement, multiple rows
let mut query_builder = sqlx::QueryBuilder::new(
    "INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number) "
);

query_builder.push_values(&transactions[..], |mut b, tx| {
    b.push_bind(&tx.hash)
     .push_bind(&tx.from_addr)
     .push_bind(&tx.to_addr)
     .push_bind(&tx.value_wei)
     .push_bind(tx.block_number);
});

query_builder.build().execute(&pool).await?;

Batch size matters: too small and you waste round trips; too large and you hit PostgreSQL’s parameter limit (65,535 parameters for a prepared statement).

// Optimal batch size: 1000-5000 rows
for chunk in transactions.chunks(2000) {
    let mut builder = sqlx::QueryBuilder::new(
        "INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number) "
    );
    builder.push_values(chunk, |mut b, tx| {
        b.push_bind(&tx.hash)
         .push_bind(&tx.from_addr)
         .push_bind(&tx.to_addr)
         .push_bind(&tx.value_wei)
         .push_bind(tx.block_number);
    });
    builder.build().execute(&pool).await?;
}
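The 65,535-parameter cap translates into a hard ceiling on rows per statement; with the five bound columns used here, 2,000 rows (10,000 parameters) sits comfortably below it. A tiny helper (hypothetical, for illustration only) makes the bound explicit:

```rust
// PostgreSQL's extended-query protocol encodes the bind-parameter count
// as a 16-bit integer, so one prepared statement carries at most 65,535
// parameters. Hypothetical helper: the row ceiling for a given column count.
fn max_batch_rows(columns: usize) -> usize {
    65_535 / columns
}

fn main() {
    // 5 bound columns -> at most 13,107 rows per batched INSERT
    println!("{}", max_batch_rows(5));
}
```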

COPY Protocol (Fastest)

The COPY protocol streams binary data directly into the table, bypassing the query parser and planner:

use futures::pin_mut;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::{ToSql, Type};

// Using tokio-postgres directly for COPY support
let copy_query = "COPY transactions (hash, from_addr, to_addr, value_wei, block_number)
                  FROM STDIN WITH (FORMAT binary)";

let sink = client.copy_in(copy_query).await?;
let writer = BinaryCopyInWriter::new(sink, &[
    Type::BYTEA,    // hash
    Type::BYTEA,    // from_addr
    Type::BYTEA,    // to_addr
    Type::NUMERIC,  // value_wei
    Type::INT8,     // block_number
]);

pin_mut!(writer);

for tx in &transactions {
    writer.as_mut().write(&[
        &tx.hash as &(dyn ToSql + Sync),
        &tx.from_addr,
        &tx.to_addr,
        &tx.value_wei,
        &tx.block_number,
    ]).await?;
}

writer.finish().await?;

Performance comparison on a typical server (NVMe SSD, 32GB RAM):

| Method              | Throughput     | Latency/Row | Network Roundtrips |
|---------------------|----------------|-------------|--------------------|
| Single INSERT       | 5K rows/sec    | 200 µs      | 1 per row          |
| Batch INSERT (1000) | 50K rows/sec   | 20 µs       | 1 per 1,000 rows   |
| COPY text           | 150K rows/sec  | 6.7 µs      | Streaming          |
| COPY binary         | 250K rows/sec  | 4 µs        | Streaming          |

ON CONFLICT Strategies

Real-world data pipelines encounter duplicates. PostgreSQL’s ON CONFLICT clause handles this efficiently:

Upsert (Update on Conflict)

INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number, status)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (hash) DO UPDATE SET
    status = EXCLUDED.status,
    updated_at = NOW()
WHERE transactions.status != EXCLUDED.status;

The WHERE clause in DO UPDATE prevents unnecessary writes (and WAL generation) when the row has not actually changed.

Skip Duplicates

INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (hash) DO NOTHING;

Bulk Upsert Pattern

async fn bulk_upsert(
    pool: &PgPool,
    transactions: &[Transaction],
) -> anyhow::Result<u64> {
    let mut total_affected = 0u64;

    for chunk in transactions.chunks(2000) {
        let mut builder = sqlx::QueryBuilder::new(
            "INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number) "
        );

        builder.push_values(chunk, |mut b, tx| {
            b.push_bind(&tx.hash)
             .push_bind(&tx.from_addr)
             .push_bind(&tx.to_addr)
             .push_bind(&tx.value_wei)
             .push_bind(tx.block_number);
        });

        builder.push(
            " ON CONFLICT (hash) DO UPDATE SET \
              value_wei = EXCLUDED.value_wei, \
              block_number = EXCLUDED.block_number \
              WHERE transactions.block_number < EXCLUDED.block_number",
        );

        let result = builder.build().execute(pool).await?;
        total_affected += result.rows_affected();
    }

    Ok(total_affected)
}

WAL Tuning for High Write Throughput

The Write-Ahead Log (WAL) is PostgreSQL’s durability mechanism. Every write goes to WAL before the table. High write throughput requires WAL tuning:

# postgresql.conf — high-write workload

# WAL size — larger = fewer checkpoints
max_wal_size = 8GB
min_wal_size = 2GB

# Checkpoint tuning
checkpoint_completion_target = 0.9  # Spread checkpoint I/O
checkpoint_timeout = 15min          # Less frequent checkpoints

# WAL compression (PostgreSQL 15+)
wal_compression = zstd

# Synchronous commit — trade durability for speed
# Only disable if you can tolerate losing last few transactions on crash
synchronous_commit = off  # ~3x write throughput increase

# WAL writer
wal_writer_delay = 200ms
wal_writer_flush_after = 1MB

Warning: With synchronous_commit = off, a crash can lose recently committed transactions; the risk window is up to about three times wal_writer_delay (roughly 600ms with the config above). For blockchain data that can be re-fetched, this is acceptable. For financial records, never disable it.
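The trade-off also does not have to be server-wide: synchronous_commit can be set per session or per transaction, so a backfill job can relax durability while every other client keeps full guarantees. A sketch, reusing the table and columns from the examples above:

```sql
-- Relax durability for this transaction only; SET LOCAL reverts at
-- COMMIT/ROLLBACK, and all other sessions keep synchronous_commit = on.
BEGIN;
SET LOCAL synchronous_commit = off;
INSERT INTO transactions (hash, from_addr, to_addr, value_wei, block_number)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (hash) DO NOTHING;
COMMIT;
```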

Unlogged Tables for Temporary Data

For staging tables during backfill:

-- No WAL = 5-10x faster writes, but data lost on crash
CREATE UNLOGGED TABLE transactions_staging (
    LIKE transactions INCLUDING ALL
);

-- Bulk load into staging
COPY transactions_staging FROM STDIN WITH (FORMAT binary);

-- Move to permanent table
INSERT INTO transactions
SELECT * FROM transactions_staging
ON CONFLICT (hash) DO NOTHING;

DROP TABLE transactions_staging;

PgBouncer Connection Pooling

PostgreSQL creates a new process for each connection. At 100+ connections, the process overhead degrades performance. PgBouncer sits between your application and PostgreSQL, multiplexing thousands of application connections onto a few PostgreSQL connections:

# pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt

# Pool settings
pool_mode = transaction    # Release connection after each transaction
default_pool_size = 25     # Connections per user/database pair
max_client_conn = 1000     # Max incoming connections
max_db_connections = 50    # Max connections to PostgreSQL

# Timeouts
server_idle_timeout = 600
client_idle_timeout = 0
query_timeout = 30

Pool modes:

  • session: Connection held for entire client session (like no pooling)
  • transaction: Connection returned after each transaction (recommended)
  • statement: Connection returned after each statement (strictest, some features break)

In Rust with sqlx (note: sqlx caches server-side prepared statements, which in transaction pooling mode requires PgBouncer 1.21+ with max_prepared_statements configured):

// Connect through PgBouncer
let pool = PgPoolOptions::new()
    .max_connections(50)  // Client-side cap; PgBouncer multiplexes these onto its server pool
    .min_connections(5)
    .acquire_timeout(Duration::from_secs(3))
    .connect("postgres://user:pass@localhost:6432/mydb")
    .await?;

Monitoring: pg_stat_statements and Slow Query Log

pg_stat_statements

The most important PostgreSQL monitoring extension. It tracks execution statistics for all queries:

-- Enable in postgresql.conf:
-- shared_preload_libraries = 'pg_stat_statements'
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Top 10 queries by total time
SELECT
    LEFT(query, 100) AS query,
    calls,
    ROUND(total_exec_time::numeric, 2) AS total_ms,
    ROUND(mean_exec_time::numeric, 2) AS mean_ms,
    ROUND((stddev_exec_time)::numeric, 2) AS stddev_ms,
    rows
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;

-- Queries with high variance (inconsistent performance)
SELECT
    LEFT(query, 100),
    calls,
    ROUND(mean_exec_time::numeric, 2) AS mean_ms,
    ROUND(stddev_exec_time::numeric, 2) AS stddev_ms,
    ROUND((stddev_exec_time / NULLIF(mean_exec_time, 0) * 100)::numeric, 1) AS cv_pct
FROM pg_stat_statements
WHERE calls > 100
ORDER BY stddev_exec_time / NULLIF(mean_exec_time, 0) DESC
LIMIT 10;

Slow Query Log

# postgresql.conf
log_min_duration_statement = 100  # Log queries slower than 100ms
log_line_prefix = '%t [%p]: db=%d,user=%u '
log_statement = 'none'  # Don't log all statements
# auto_explain must be loaded: shared_preload_libraries = 'auto_explain'
auto_explain.log_min_duration = 500  # Auto-explain queries > 500ms
auto_explain.log_analyze = on
auto_explain.log_buffers = on

Monitoring Dashboard Queries

-- Connection state
SELECT state, COUNT(*)
FROM pg_stat_activity
GROUP BY state;

-- Table sizes
SELECT
    relname,
    pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
    pg_size_pretty(pg_relation_size(relid)) AS table_size,
    pg_size_pretty(pg_indexes_size(relid)) AS index_size,
    n_live_tup AS live_rows,
    n_dead_tup AS dead_rows
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(relid) DESC;

-- Index usage
SELECT
    indexrelname,
    idx_scan AS times_used,
    pg_size_pretty(pg_relation_size(indexrelid)) AS size
FROM pg_stat_user_indexes
ORDER BY idx_scan ASC;

Complete Pipeline Architecture

[Block Source] -> [Decoder] -> [Batch Buffer (2000 rows)]
                                       |
                                       v
                        [COPY to staging table]
                                       |
                                       v
                [INSERT ... ON CONFLICT from staging]
                                       |
                                       v
                       [NOTIFY indexer_channel]
                                       |
                                       v
              [Async Index/Materialized View Refresh]

Rust implementation of the batch buffer:

struct BatchBuffer<T> {
    items: Vec<T>,
    capacity: usize,
    flush_interval: Duration,
    last_flush: Instant,
}

impl<T> BatchBuffer<T> {
    fn new(capacity: usize, flush_interval: Duration) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
            capacity,
            flush_interval,
            last_flush: Instant::now(),
        }
    }

    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.items.push(item);
        if self.should_flush() {
            Some(self.flush())
        } else {
            None
        }
    }

    fn should_flush(&self) -> bool {
        self.items.len() >= self.capacity
            || self.last_flush.elapsed() >= self.flush_interval
    }

    fn flush(&mut self) -> Vec<T> {
        self.last_flush = Instant::now();
        std::mem::take(&mut self.items)
    }
}
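As a self-contained check of the flush logic (the struct is re-declared here, slightly trimmed, so the snippet compiles on its own; the one-hour interval ensures only the capacity trigger can fire):

```rust
use std::time::{Duration, Instant};

// Re-declaration of the BatchBuffer above, trimmed to what this demo needs.
struct BatchBuffer<T> {
    items: Vec<T>,
    capacity: usize,
    flush_interval: Duration,
    last_flush: Instant,
}

impl<T> BatchBuffer<T> {
    fn new(capacity: usize, flush_interval: Duration) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
            capacity,
            flush_interval,
            last_flush: Instant::now(),
        }
    }

    // Returns the drained batch when either the size or time trigger fires.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.items.push(item);
        if self.items.len() >= self.capacity
            || self.last_flush.elapsed() >= self.flush_interval
        {
            self.last_flush = Instant::now();
            Some(std::mem::take(&mut self.items))
        } else {
            None
        }
    }
}

fn demo() -> Vec<u64> {
    // Capacity 3, one-hour interval: only the size trigger can fire here.
    let mut buf = BatchBuffer::new(3, Duration::from_secs(3600));
    assert!(buf.push(1).is_none());
    assert!(buf.push(2).is_none());
    buf.push(3).expect("third push reaches capacity and flushes")
}

fn main() {
    println!("{:?}", demo()); // [1, 2, 3]
}
```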

Conclusion

High-throughput PostgreSQL data pipelines require thinking in batches, not individual rows. Use COPY protocol for maximum ingestion speed (250K rows/sec), batch INSERTs for moderate throughput with ON CONFLICT support, and tune WAL settings to reduce checkpoint overhead. Front your database with PgBouncer to handle connection storms, and monitor everything with pg_stat_statements. The difference between a naive pipeline and an optimized one is 40x — the difference between days and hours for large data migrations.