Chiến lược upsert quy mô lớn: staging table pattern, batch merge, index management, lock contention — production trade-offs khi merge file vào billion-row table.
Bạn có 1 file CSV chứa 1 triệu bản ghi. Bảng đích có 1 tỷ bản ghi. Yêu cầu: bản ghi nào đã tồn tại thì UPDATE, bản ghi nào chưa có thì INSERT. Bạn sẽ làm như thế nào?
Staging table là chìa khóa — không upsert trực tiếp từ file vào bảng 1 tỷ rows. Load file vào staging table trước (COPY nhanh nhất, không trigger, không index check), sau đó merge từ staging vào main trong batch nhỏ. Tách biệt giai đoạn load và giai đoạn merge giúp isolate lỗi, retry an toàn, và không lock main table trong khi đọc file.
Batch là bắt buộc — một transaction merge 1 triệu rows sẽ giữ lock quá lâu. Chia merge thành batch 5.000–20.000 rows/transaction, commit sau mỗi batch. Giảm lock duration, tránh lock escalation, và cho phép production traffic tiếp tục giữa các batch.
Index là kẻ thù của tốc độ insert, nhưng là bạn của tốc độ lookup. Trong quá trình merge, index trên main table phải được update cho mỗi row → overhead lớn. Nếu merge trong maintenance window, có thể drop index → merge → rebuild; nếu online thì phải giữ index và chấp nhận tốc độ thấp hơn để không ảnh hưởng query đang chạy.
"Đây là bài toán upsert quy mô lớn — điều đầu tiên tôi tránh là loop từng row hoặc upsert trực tiếp từ app lên bảng 1 tỷ rows, vì cả hai đều cực kỳ chậm và block production traffic. Cách tôi tiếp cận là dùng staging table pattern: load toàn bộ 1 triệu bản ghi từ file vào một bảng tạm bằng COPY command — không index, không trigger, không constraint, tốc độ tối đa. Sau đó tôi merge từ staging vào main table theo batch 10.000 rows một lần, dùng INSERT ... ON CONFLICT DO UPDATE trên PostgreSQL. Mỗi batch là một transaction riêng, commit xong rồi mới batch tiếp — vừa giảm lock duration, vừa có checkpoint để resume nếu job fail. Tôi cũng thêm delay nhỏ giữa các batch và monitor pg_stat_activity để không làm bão hòa I/O của production. Nếu đây là maintenance window thì tôi sẽ drop secondary indexes trước khi merge và rebuild sau — tốc độ tăng gấp 3–5 lần. Nếu cần online hoàn toàn thì giữ index nhưng throttle batch để response time không bị ảnh hưởng."
-- Tạo staging table — UNLOGGED để không ghi WAL, tăng tốc loadCREATE UNLOGGED TABLE orders_staging ( order_id BIGINT, customer_id BIGINT, amount NUMERIC(12,2), status VARCHAR(50), updated_at TIMESTAMPTZ -- Không có PRIMARY KEY, không có index, không có FK);-- Load file bằng COPY — nhanh nhất, không qua app layerCOPY orders_staging FROM '/data/orders_update.csv'WITH (FORMAT csv, HEADER true, DELIMITER ',');-- 1 triệu rows: ~30–60 giây tùy disk I/O
Tại sao COPY thay vì INSERT từ app?
Phương pháp
Tốc độ
Ghi chú
Row-by-row INSERT
~500 rows/s
N round trips mạng
Batch INSERT (1000/batch)
~50.000 rows/s
Ít round trip hơn
COPY command
~200.000–500.000 rows/s
Stream trực tiếp, bypass parser
Nếu app Java phải đọc file trước, dùng CopyManager của JDBC:
@Servicepublic class BulkLoader { private final DataSource dataSource; public long loadToStaging(Path csvFile) throws Exception { try (Connection conn = dataSource.getConnection(); Reader reader = Files.newBufferedReader(csvFile)) { CopyManager copyManager = new CopyManager((BaseConnection) conn); return copyManager.copyIn( "COPY orders_staging FROM STDIN WITH (FORMAT csv, HEADER true)", reader ); } }}
-- Loại row không hợp lệ trước khi mergeDELETE FROM orders_stagingWHERE order_id IS NULL OR amount < 0 OR status NOT IN ('PENDING', 'PAID', 'CANCELLED', 'REFUNDED');-- Log vào dead-letter table nếu cần auditINSERT INTO orders_staging_errorsSELECT *, NOW(), 'INVALID_STATUS'FROM orders_stagingWHERE status NOT IN ('PENDING', 'PAID', 'CANCELLED', 'REFUNDED');
@Service@Slf4jpublic class BulkMergeService { private static final int BATCH_SIZE = 10_000; private static final long BATCH_DELAY_MS = 50; // throttle để không bão hòa I/O private final JdbcTemplate jdbc; public void mergeFromStaging() throws Exception { long totalRows = jdbc.queryForObject( "SELECT COUNT(*) FROM orders_staging", Long.class); log.info("Starting merge: {} rows in staging", totalRows); long offset = 0; int batchNum = 0; while (offset < totalRows) { int merged = mergeBatch(offset, BATCH_SIZE); offset += BATCH_SIZE; batchNum++; if (batchNum % 10 == 0) { log.info("Progress: {}/{} rows processed ({:.1f}%)", offset, totalRows, 100.0 * offset / totalRows); } Thread.sleep(BATCH_DELAY_MS); // throttle } log.info("Merge complete: {} batches, {} rows", batchNum, totalRows); } @Transactional // mỗi batch = 1 transaction public int mergeBatch(long offset, int batchSize) { return jdbc.update(""" INSERT INTO orders (order_id, customer_id, amount, status, updated_at) SELECT order_id, customer_id, amount, status, updated_at FROM orders_staging ORDER BY order_id LIMIT ? OFFSET ? ON CONFLICT (order_id) DO UPDATE SET customer_id = EXCLUDED.customer_id, amount = EXCLUDED.amount, status = EXCLUDED.status, updated_at = EXCLUDED.updated_at WHERE orders.updated_at < EXCLUDED.updated_at """, batchSize, offset); }}
Tại sao ORDER BY order_id trong batch? Nếu 2 batch chạy song song mà không có thứ tự nhất quán, chúng có thể lock cùng row theo thứ tự ngược nhau → deadlock. ORDER BY đảm bảo tất cả transaction lock rows theo cùng một thứ tự.
Giữ nguyên tất cả index. Tốc độ chậm hơn nhưng không ảnh hưởng query.
-- Thêm: rate limit batch, monitor query time song songSELECT query, state, wait_event, now() - query_start AS durationFROM pg_stat_activityWHERE state != 'idle'ORDER BY duration DESC;
-- Drop secondary indexes trước khi mergeDROP INDEX CONCURRENTLY idx_orders_customer_id;DROP INDEX CONCURRENTLY idx_orders_status;DROP INDEX CONCURRENTLY idx_orders_updated_at;-- Chạy merge (nhanh hơn 3–5x vì không update index mỗi row)-- ...-- Rebuild sau khi mergeCREATE INDEX CONCURRENTLY idx_orders_customer_id ON orders(customer_id);CREATE INDEX CONCURRENTLY idx_orders_status ON orders(status);CREATE INDEX CONCURRENTLY idx_orders_updated_at ON orders(updated_at);-- CONCURRENTLY: build trong background, không lock bảng
Giữ lại index nào? PRIMARY KEY (order_id) phải giữ vì ON CONFLICT cần nó. Secondary indexes có thể drop nếu không có read query nào dùng chúng trong lúc merge.
Nếu bảng orders được partition theo thời gian (ví dụ orders_2024_01), và file chỉ update data của một partition:
-- 1. Tạo bảng mới với cấu trúc giống partition cũCREATE TABLE orders_2024_01_new (LIKE orders_2024_01 INCLUDING ALL);-- 2. Copy toàn bộ partition cũ vào bảng mớiINSERT INTO orders_2024_01_new SELECT * FROM orders_2024_01;-- 3. Merge staging vào bảng mới (không lock production)INSERT INTO orders_2024_01_newSELECT ... FROM orders_stagingON CONFLICT (order_id) DO UPDATE SET ...;-- 4. Atomic swap (cực nhanh, lock chỉ vài milliseconds)BEGIN;ALTER TABLE orders DETACH PARTITION orders_2024_01;ALTER TABLE orders ATTACH PARTITION orders_2024_01_new FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');COMMIT;-- 5. Drop partition cũDROP TABLE orders_2024_01;
"Dùng INSERT ON CONFLICT trực tiếp từ app, 1 row 1 request" → Row-by-row là pattern chậm nhất: 1 triệu rows × 1 round-trip = hàng giờ, và lock mỗi row một lần riêng → không efficient. Luôn batch.
"Một transaction cho toàn bộ 1 triệu rows" → Giữ lock trên hàng triệu rows trong hàng chục phút. Production queries bị block. Nếu fail → rollback toàn bộ, mất công từ đầu. Batch nhỏ + commit thường xuyên là bắt buộc.
"Chạy nhiều thread merge song song để nhanh hơn" → Nguy hiểm nếu không có consistent ordering. Hai thread lock cùng rows theo thứ tự khác nhau → deadlock. Nếu muốn song song, phải partition staging table theo key range không overlap, mỗi thread xử lý range riêng biệt.
"Không cần staging table, đọc file rồi upsert luôn" → Mất khả năng validate trước, không có checkpoint để resume nếu fail, và đọc file chậm hơn COPY nhiều lần. Staging table là checkpoint thiết yếu.
"Drop toàn bộ index để tăng tốc" → Không drop PRIMARY KEY — ON CONFLICT cần nó để xác định conflict. Chỉ drop secondary indexes trong maintenance window.
1. Nếu file CSV có thể có duplicate order_id (cùng một order xuất hiện nhiều lần trong file)?
Deduplicate trong staging trước khi merge:
DELETE FROM orders_staging a USING orders_staging bWHERE a.ctid < b.ctid AND a.order_id = b.order_id;-- Hoặc dùng DISTINCT ON:CREATE TABLE orders_staging_dedup ASSELECT DISTINCT ON (order_id) *FROM orders_staging ORDER BY order_id, updated_at DESC;
2. Làm sao tính được số row thực sự INSERT vs UPDATE?
ON CONFLICT DO UPDATE không phân biệt trong affected rows count. Dùng xmax để detect:
INSERT INTO orders (...)SELECT ... FROM orders_stagingON CONFLICT (order_id) DO UPDATE SET ...RETURNING order_id, (xmax = 0) AS is_insert, -- xmax=0 → mới insert; xmax>0 → update (xmax != 0) AS is_update;
3. Nếu bảng đích đang có write traffic cao trong lúc merge thì sao?
Tăng delay giữa batch (throttle), giảm batch size xuống 1.000–2.000 rows. Monitor pg_stat_activity để xem lock wait time. Nếu merge không khẩn cấp, schedule vào giờ thấp điểm. Với AWS RDS Aurora, có thể route read-check query sang Read Replica để giảm tải primary.
4. Nếu cần làm điều này thường xuyên (daily batch)?
Cân nhắc chuyển sang CDC (Change Data Capture) với Kafka + Debezium: thay vì batch file hàng ngày, stream changes real-time → upsert liên tục với batch nhỏ. Giảm peak load, tăng data freshness.