Deep EVM #28: Pipeline de Datos de Alto Rendimiento — Inserciones Batch, COPY y Resolución de Conflictos
Engineering Team
El desafío de la ingestión de datos
Un indexador de blockchain necesita procesar miles de transacciones por segundo. Las inserciones individuales con INSERT son demasiado lentas — necesitamos técnicas de batch para alcanzar el rendimiento necesario.
Protocolo COPY: la vía rápida
COPY es el método más rápido para insertar datos en PostgreSQL. Bypass el parsing SQL, el planificador de consultas y la mayoría del overhead:
use tokio_postgres::types::ToSql;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
async fn bulk_insert_transactions(
client: &Client,
transactions: &[Transaction],
) -> Result<u64> {
let sink = client.copy_in(
"COPY transactions (block_number, tx_hash, from_addr, to_addr, value) \
FROM STDIN BINARY"
).await?;
let writer = BinaryCopyInWriter::new(sink, &[
Type::INT8, Type::BYTEA, Type::BYTEA, Type::BYTEA, Type::NUMERIC,
]);
pin_mut!(writer);
for tx in transactions {
writer.as_mut().write(&[
&tx.block_number,
&tx.hash.as_bytes(),
&tx.from.as_bytes(),
&tx.to.as_bytes(),
&tx.value,
]).await?;
}
writer.finish().await
}
Rendimiento comparativo:
| Método | Filas/segundo |
|---|---|
| INSERT individual | 1,000 |
| INSERT batch (1000 valores) | 15,000 |
| COPY texto | 80,000 |
| COPY binario | 120,000 |
Upsert masivo con ON CONFLICT
Para datos que pueden duplicarse (re-indexación de bloques):
INSERT INTO transactions (block_number, tx_hash, from_addr, value)
SELECT * FROM unnest($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[])
ON CONFLICT (tx_hash) DO UPDATE SET
value = EXCLUDED.value,
updated_at = NOW();
En Rust con sqlx:
async fn upsert_batch(
pool: &PgPool,
txs: &[Transaction],
) -> Result<()> {
let block_numbers: Vec<i64> = txs.iter().map(|t| t.block_number).collect();
let hashes: Vec<Vec<u8>> = txs.iter().map(|t| t.hash.to_vec()).collect();
sqlx::query(
"INSERT INTO transactions (block_number, tx_hash) \
SELECT * FROM unnest($1::bigint[], $2::bytea[]) \
ON CONFLICT (tx_hash) DO NOTHING"
)
.bind(&block_numbers)
.bind(&hashes)
.execute(pool)
.await?;
Ok(())
}
Ajuste de WAL (Write-Ahead Log)
Para cargas de escritura intensiva, ajustar la configuración de WAL:
# postgresql.conf
wal_buffers = 64MB
wal_writer_delay = 200ms
checkpoint_completion_target = 0.9
max_wal_size = 4GB
min_wal_size = 1GB
synchronous_commit = off # Solo si puedes tolerar pérdida de últimas transacciones
Monitoreo con pg_stat_statements
Monitorea la salud del pipeline:
-- Top consultas por tiempo total
SELECT query, calls, total_exec_time, mean_exec_time, rows
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;
-- Tasa de inserción
SELECT relname, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
WHERE relname = 'transactions';
Connection pooling con PgBouncer
PgBouncer reduce el overhead de crear conexiones:
[pgbouncer]
pool_mode = transaction
default_pool_size = 25
max_client_conn = 1000
reserve_pool_size = 5
reserve_pool_timeout = 3
Conclusión
Construir pipelines de datos de alto rendimiento con PostgreSQL requiere usar COPY para ingestión rápida, ON CONFLICT para idempotencia, ajustar WAL para escrituras intensivas, PgBouncer para pooling de conexiones, y pg_stat_statements para monitoreo. Con estas técnicas, PostgreSQL puede manejar cientos de miles de filas por segundo.