Ir al contenido principal
DevOpsMar 28, 2026

Deep EVM #28: Pipeline de Datos de Alto Rendimiento — Inserciones Batch, COPY y Resolución de Conflictos

OS
Open Soft Team

Engineering Team

El desafío de la ingestión de datos

Un indexador de blockchain necesita procesar miles de transacciones por segundo. Las inserciones individuales con INSERT son demasiado lentas — necesitamos técnicas de batch para alcanzar el rendimiento necesario.

Protocolo COPY: la vía rápida

COPY es el método más rápido para insertar datos en PostgreSQL. Bypass el parsing SQL, el planificador de consultas y la mayoría del overhead:

use tokio_postgres::types::ToSql;
use tokio_postgres::binary_copy::BinaryCopyInWriter;

async fn bulk_insert_transactions(
    client: &Client,
    transactions: &[Transaction],
) -> Result<u64> {
    let sink = client.copy_in(
        "COPY transactions (block_number, tx_hash, from_addr, to_addr, value) \
         FROM STDIN BINARY"
    ).await?;
    
    let writer = BinaryCopyInWriter::new(sink, &[
        Type::INT8, Type::BYTEA, Type::BYTEA, Type::BYTEA, Type::NUMERIC,
    ]);
    
    pin_mut!(writer);
    
    for tx in transactions {
        writer.as_mut().write(&[
            &tx.block_number,
            &tx.hash.as_bytes(),
            &tx.from.as_bytes(),
            &tx.to.as_bytes(),
            &tx.value,
        ]).await?;
    }
    
    writer.finish().await
}

Rendimiento comparativo:

MétodoFilas/segundo
INSERT individual1,000
INSERT batch (1000 valores)15,000
COPY texto80,000
COPY binario120,000

Upsert masivo con ON CONFLICT

Para datos que pueden duplicarse (re-indexación de bloques):

INSERT INTO transactions (block_number, tx_hash, from_addr, value)
SELECT * FROM unnest($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[])
ON CONFLICT (tx_hash) DO UPDATE SET
    value = EXCLUDED.value,
    updated_at = NOW();

En Rust con sqlx:

async fn upsert_batch(
    pool: &PgPool,
    txs: &[Transaction],
) -> Result<()> {
    let block_numbers: Vec<i64> = txs.iter().map(|t| t.block_number).collect();
    let hashes: Vec<Vec<u8>> = txs.iter().map(|t| t.hash.to_vec()).collect();
    
    sqlx::query(
        "INSERT INTO transactions (block_number, tx_hash) \
         SELECT * FROM unnest($1::bigint[], $2::bytea[]) \
         ON CONFLICT (tx_hash) DO NOTHING"
    )
    .bind(&block_numbers)
    .bind(&hashes)
    .execute(pool)
    .await?;
    
    Ok(())
}

Ajuste de WAL (Write-Ahead Log)

Para cargas de escritura intensiva, ajustar la configuración de WAL:

# postgresql.conf
wal_buffers = 64MB
wal_writer_delay = 200ms
checkpoint_completion_target = 0.9
max_wal_size = 4GB
min_wal_size = 1GB
synchronous_commit = off  # Solo si puedes tolerar pérdida de últimas transacciones

Monitoreo con pg_stat_statements

Monitorea la salud del pipeline:

-- Top consultas por tiempo total
SELECT query, calls, total_exec_time, mean_exec_time, rows
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;

-- Tasa de inserción
SELECT relname, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
WHERE relname = 'transactions';

Connection pooling con PgBouncer

PgBouncer reduce el overhead de crear conexiones:

[pgbouncer]
pool_mode = transaction
default_pool_size = 25
max_client_conn = 1000
reserve_pool_size = 5
reserve_pool_timeout = 3

Conclusión

Construir pipelines de datos de alto rendimiento con PostgreSQL requiere usar COPY para ingestión rápida, ON CONFLICT para idempotencia, ajustar WAL para escrituras intensivas, PgBouncer para pooling de conexiones, y pg_stat_statements para monitoreo. Con estas técnicas, PostgreSQL puede manejar cientos de miles de filas por segundo.

Etiquetas