Deep EVM #28 : Pipeline de données haut débit — Insertions par lots, COPY et résolution de conflits
Engineering Team
Le défi du haut débit
Construire un indexeur blockchain qui traite des millions de blocs nécessite d’insérer des milliards de lignes aussi vite que possible. Les instructions INSERT individuelles plafonnent à 5 000 lignes par seconde ; le protocole COPY pousse à 250 000+ lignes par seconde. Cet article couvre les techniques qui font la différence.
INSERT par lots
La première optimisation : regrouper les insertions :
// Au lieu de :
for row in rows {
sqlx::query("INSERT INTO events (id, data) VALUES ($1, $2)")
.bind(&row.id)
.bind(&row.data)
.execute(&pool)
.await?;
}
// Utilisez l'insertion par lots :
let mut builder = sqlx::QueryBuilder::new(
"INSERT INTO events (id, data) "
);
builder.push_values(rows.iter().take(1000), |mut b, row| {
b.push_bind(&row.id)
.push_bind(&row.data);
});
builder.build().execute(&pool).await?;
Performance : 5 000 rows/s -> 50 000 rows/s (10x).
Le protocole COPY
COPY est le mécanisme le plus rapide pour charger des données dans PostgreSQL :
use tokio_postgres::CopyInWriter;
let writer = client.copy_in(
"COPY events (id, block_number, data) FROM STDIN WITH (FORMAT BINARY)"
).await?;
let mut writer = BinaryCopyInWriter::new(writer, &[Type::INT8, Type::INT8, Type::JSONB]);
for row in rows {
writer.write(&[&row.id, &row.block_number, &row.data]).await?;
}
writer.finish().await?;
Performance : 50 000 rows/s -> 250 000+ rows/s (5x supplémentaire).
Upsert en masse avec ON CONFLICT
INSERT INTO accounts (address, balance, updated_at)
SELECT * FROM UNNEST($1::text[], $2::numeric[], $3::timestamptz[])
ON CONFLICT (address)
DO UPDATE SET
balance = EXCLUDED.balance,
updated_at = EXCLUDED.updated_at
WHERE accounts.updated_at < EXCLUDED.updated_at;
La clause WHERE empêche les mises à jour obsolètes — essentiel quand les données arrivent dans le désordre.
Réglage WAL
Le Write-Ahead Log (WAL) est le goulot d’étranglement pour les écritures à haut débit :
-- postgresql.conf
wal_level = minimal -- Réduire le logging WAL (pas de réplication)
max_wal_size = 4GB -- Buffer WAL plus grand avant checkpoint
checkpoint_completion_target = 0.9 -- Étaler les checkpoints
synchronous_commit = off -- Acquitter avant flush disque
Attention : synchronous_commit = off peut perdre les dernières millisecondes de données en cas de crash. Acceptable pour les indexeurs qui peuvent réindexer.
Pooling de connexions avec PgBouncer
# pgbouncer.ini
[databases]
mydb = host=127.0.0.1 dbname=mydb
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 50
PgBouncer multiplex les connexions : 1000 connexions application -> 50 connexions PostgreSQL. Cela réduit l’overhead de gestion de connexions et permet plus de concurrence.
Monitoring avec pg_stat_statements
-- Top 10 des requêtes par temps total
SELECT
query,
calls,
round(total_exec_time::numeric, 2) AS total_ms,
round(mean_exec_time::numeric, 2) AS mean_ms,
rows
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;
Résultats
| Technique | Débit | Amélioration |
|---|---|---|
| INSERT unitaire | 5K rows/s | Base |
| INSERT par lots (1000) | 50K rows/s | 10x |
| COPY binaire | 250K rows/s | 50x |
| COPY + WAL tuning | 400K rows/s | 80x |
Conclusion
Les pipelines de données haut débit avec PostgreSQL reposent sur le protocole COPY pour le chargement, l’upsert en masse pour l’idempotence, le réglage WAL pour le débit d’écriture et PgBouncer pour le pooling de connexions. Ensemble, ces techniques permettent de traiter des centaines de milliers de lignes par seconde.