GH-3530: Eagerly release column buffers during row group flush #3571
iemejia wants to merge 2 commits into
Release each column's compressed page buffers immediately after writing to disk in flushToFileWriter(), rather than holding all buffers until the entire flush completes. This is correct resource management that makes buffers GC-eligible sooner, though benchmarking with a PeakTrackingAllocator confirms it does not reduce peak memory: the peak is reached during the write phase (as pages are compressed), not during flush.
Changes:
- Call pageWriter.close() after each column in flushToFileWriter()
- Add writeAllToAndRelease() to ConcatenatingByteBufferCollector for progressive slab-by-slab memory release during write
- Make close() idempotent (safe to call after eager release or double-close)
- Add RowGroupFlushBenchmark (20-column wide schema, PeakTrackingAllocator) and BlackHoleOutputFile for measuring flush performance and peak memory
- Add tests for eager release, double-close safety, and output equivalence
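The slab-by-slab release and the idempotent close() described in this change list can be sketched roughly as follows. This is a minimal illustration, not the Parquet implementation: `SlabCollectorSketch`, `collect`, and `releasedSlabs` are hypothetical names, and heap `ByteBuffer`s stand in for allocator-managed slabs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a slab-based collector; not the Parquet class itself. */
class SlabCollectorSketch implements AutoCloseable {
  private final Deque<ByteBuffer> slabs = new ArrayDeque<>();
  private long size;
  private int releasedSlabs; // makes a double release observable

  void collect(byte[] bytes) {
    slabs.addLast(ByteBuffer.wrap(bytes.clone()));
    size += bytes.length;
  }

  long size() { return size; }

  int releasedSlabs() { return releasedSlabs; }

  /** Writes the slabs in order, dropping each one as soon as it has been written. */
  void writeAllTo(OutputStream out) throws IOException {
    // Deliberately not closed: closing the channel would close the caller's stream.
    WritableByteChannel channel = Channels.newChannel(out);
    ByteBuffer slab;
    while ((slab = slabs.peekFirst()) != null) {
      int length = slab.remaining();
      while (slab.hasRemaining()) {
        channel.write(slab);
      }
      slabs.removeFirst(); // release only after the slab is fully written
      size -= length;
      releasedSlabs++;
    }
  }

  /** Idempotent: a second call finds no slabs left and does nothing. */
  @Override
  public void close() {
    while (slabs.pollFirst() != null) {
      releasedSlabs++;
    }
    size = 0;
  }

  public static void main(String[] args) throws IOException {
    SlabCollectorSketch collector = new SlabCollectorSketch();
    collector.collect(new byte[] {1, 2, 3});
    collector.collect(new byte[] {4, 5});
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    collector.writeAllTo(out);
    collector.close();
    collector.close(); // safe: nothing left to release
    System.out.println(out.size() + " bytes written, " + collector.releasedSlabs() + " slabs released");
  }
}
```

In this sketch a slab is removed only after it has been fully written, so a failed write leaves the remaining slabs in place for close() to release.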
     * @throws IOException if an I/O error occurs
     */
    public void writeAllToAndRelease(OutputStream out) throws IOException {
      WritableByteChannel channel = Channels.newChannel(out);
Should we wrap a try-with-resources around this channel?
No, closing the channel would close the underlying OutputStream, which is owned by the caller. The channel from Channels.newChannel(out) is just a stateless wrapper with no independent resources to release. I've added a comment explaining this in the updated code.
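The ownership point in this reply can be checked with a small, standalone sketch. `CloseTrackingStream` and `ChannelOwnershipDemo` are hypothetical names used only for illustration; the only library behaviour relied on is that closing a channel obtained from `Channels.newChannel(OutputStream)` also closes the wrapped stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

/** Hypothetical caller-owned stream that records whether close() was called on it. */
class CloseTrackingStream extends ByteArrayOutputStream {
  boolean closed;

  @Override
  public void close() throws IOException {
    closed = true;
    super.close();
  }
}

class ChannelOwnershipDemo {
  /** Wraps the stream in try-with-resources; returns whether the caller's stream got closed. */
  static boolean writeClosingChannel(CloseTrackingStream out) throws IOException {
    try (WritableByteChannel channel = Channels.newChannel(out)) {
      channel.write(ByteBuffer.wrap(new byte[] {1}));
    }
    return out.closed;
  }

  /** Leaves the channel open, so the caller keeps control of the stream's lifetime. */
  static boolean writeLeavingChannelOpen(CloseTrackingStream out) throws IOException {
    WritableByteChannel channel = Channels.newChannel(out);
    channel.write(ByteBuffer.wrap(new byte[] {1}));
    return out.closed;
  }

  public static void main(String[] args) throws IOException {
    System.out.println("closed via try-with-resources: " + writeClosingChannel(new CloseTrackingStream()));
    System.out.println("closed when left open: " + writeLeavingChannelOpen(new CloseTrackingStream()));
  }
}
```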
     * @param out the output stream to write to
     * @throws IOException if an I/O error occurs
     */
    public void writeAllToAndRelease(OutputStream out) throws IOException {
Wait, are we just using this in tests?
Good catch — it was indeed only used in tests. I've merged writeAllToAndRelease into writeAllTo so the progressive buffer release is now the production implementation (called via ParquetFileWriter.writeColumnChunk → bytes.writeAllTo(out)). The separate method is gone.
    @@ -692,6 +692,11 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
        for (ColumnDescriptor path : schema.getColumns()) {
          ColumnChunkPageWriter pageWriter = writers.get(path);
Should we use the try-with-resources pattern here? (Fanboy of the pattern talking here)
Done. I am also a big fan of try-with-resources. This also ensures the column's buffers are released even if writeToFileWriter throws.
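A rough sketch of the per-column try-with-resources pattern agreed on here, showing that a column's buffers are released even when its write throws. `ColumnWriterSketch` and `FlushSketch` are hypothetical stand-ins, not the actual ColumnChunkPageWriter or flushToFileWriter code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-column writer; close() stands in for releasing the column's buffers. */
class ColumnWriterSketch implements AutoCloseable {
  final String name;
  final boolean failOnWrite;
  boolean released;

  ColumnWriterSketch(String name, boolean failOnWrite) {
    this.name = name;
    this.failOnWrite = failOnWrite;
  }

  void writeToFileWriter(List<String> sink) throws IOException {
    if (failOnWrite) {
      throw new IOException("simulated write failure for " + name);
    }
    sink.add(name);
  }

  @Override
  public void close() {
    released = true;
  }
}

class FlushSketch {
  /** Flushes the columns one by one, releasing each column right after its own write. */
  static void flush(List<ColumnWriterSketch> writers, List<String> sink) throws IOException {
    for (ColumnWriterSketch writer : writers) {
      try (ColumnWriterSketch pageWriter = writer) {
        pageWriter.writeToFileWriter(sink);
      } // close() runs here, whether or not the write threw
    }
  }

  public static void main(String[] args) {
    List<ColumnWriterSketch> writers = new ArrayList<>();
    writers.add(new ColumnWriterSketch("a", false));
    writers.add(new ColumnWriterSketch("b", true));
    try {
      flush(writers, new ArrayList<>());
    } catch (IOException e) {
      System.out.println(e.getMessage() + ", released anyway: " + writers.get(1).released);
    }
  }
}
```

Columns that come after the failing one are never reached in this sketch, so their release still depends on whatever cleanup the surrounding writer performs.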
…n writeAllTo
- Merge writeAllToAndRelease into writeAllTo so progressive buffer release is used in production (via ParquetFileWriter.writeColumnChunk).
- Add comment explaining why the WritableByteChannel is intentionally not closed (closing it would close the caller's OutputStream).
- Use try-with-resources in flushToFileWriter for idiomatic cleanup.
- Capture buf.size() before write in writeToFileWriter since writeAllTo now releases buffers progressively.
    LOG.debug(String.format(
        "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
        buf.size(),
        bytesWritten,
Why this change? We only need this when we have debug log enabled
buf.size() is captured before the write because writeColumnChunk internally calls buf.writeAllTo(out) which progressively releases each slab and resets size to 0 (see ConcatenatingByteBufferCollector.writeAllTo, line 103). If we called buf.size() inside the debug block after the write, it would always report 0.
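The ordering issue explained here can be reproduced with a toy buffer that releases its contents on write. This is only a sketch: `ReleasingBufferSketch` and `SizeCaptureSketch` are hypothetical names, and a single byte array stands in for the collector's slabs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Locale;

/** Hypothetical buffer whose writeAllTo() releases its contents, so size() drops to 0. */
class ReleasingBufferSketch {
  private byte[] data;

  ReleasingBufferSketch(byte[] data) {
    this.data = data.clone();
  }

  long size() {
    return data == null ? 0 : data.length;
  }

  void writeAllTo(OutputStream out) throws IOException {
    if (data != null) {
      out.write(data);
      data = null; // released as part of the write
    }
  }
}

class SizeCaptureSketch {
  /** Captures the size before the write, because the write releases the buffer. */
  static String writeAndDescribe(ReleasingBufferSketch buf, OutputStream out) throws IOException {
    long bytesWritten = buf.size(); // must happen before writeAllTo()
    buf.writeAllTo(out);
    return String.format(Locale.ROOT, "written %,dB (size() after write: %,dB)", bytesWritten, buf.size());
  }

  public static void main(String[] args) throws IOException {
    ReleasingBufferSketch buf = new ReleasingBufferSketch(new byte[] {1, 2, 3});
    System.out.println(writeAndDescribe(buf, new ByteArrayOutputStream()));
  }
}
```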
@Fokko All review comments addressed. Added baseline vs optimized benchmark comparison to the PR description: no throughput regression and identical peak memory. Ready for another look.
Part of #3530 — Apache Parquet Java Performance Improvements
Summary
Eagerly release column buffers during row group flush for correct resource management.
Call pageWriter.close() after each column in flushToFileWriter() instead of holding all buffers until the entire flush completes. Add writeAllToAndRelease() to ConcatenatingByteBufferCollector for progressive slab-by-slab memory release during write. Make close() idempotent (safe after eager release or double-close).
Changes
- Call pageWriter.close() after each column in flushToFileWriter()
- Add writeAllToAndRelease() to ConcatenatingByteBufferCollector for progressive slab-by-slab memory release during write
- Make close() idempotent (safe to call after eager release or double-close)
- Add RowGroupFlushBenchmark (20-column wide schema, PeakTrackingAllocator) and BlackHoleOutputFile for measuring flush performance and peak memory
Benchmark results
Environment: JDK 25.0.3 (Temurin), OpenJDK 64-Bit Server VM, JMH 1.37, Linux x86_64, -wi 3 -i 5 -f 2, -Xms512m -Xmx1g.
RowGroupFlushBenchmark (100K rows, 20 BINARY columns, 200 bytes each, UNCOMPRESSED):
No throughput regression. Peak memory is byte-for-byte identical with and without this change: the peak is reached during page compression (as pages are accumulated into ConcatenatingByteBufferCollector), not during flush. This is correct resource management that makes buffers GC-eligible sooner and guarantees release on error paths via try-with-resources, but is not a memory optimization.
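Why the peak is unchanged can be seen in a toy model. The sketch below is not the PR's PeakTrackingAllocator. `PeakTrackerSketch` and `PeakComparisonSketch` are hypothetical, and the model assumes that all buffers are allocated during the write phase and that flush itself allocates nothing.

```java
/** Hypothetical counter that records the high-water mark of outstanding bytes. */
class PeakTrackerSketch {
  private long current;
  private long peak;

  void allocate(long bytes) {
    current += bytes;
    peak = Math.max(peak, current);
  }

  void release(long bytes) {
    current -= bytes;
  }

  long current() { return current; }

  long peak() { return peak; }
}

class PeakComparisonSketch {
  /** Simulates a write phase followed by a flush and returns the peak outstanding bytes. */
  static long simulate(boolean eagerRelease, int columns, long bytesPerColumn) {
    PeakTrackerSketch tracker = new PeakTrackerSketch();
    // Write phase: every column accumulates its pages before flush starts.
    for (int i = 0; i < columns; i++) {
      tracker.allocate(bytesPerColumn);
    }
    // Flush phase: release per column (eager) or all at once at the end (baseline).
    for (int i = 0; i < columns; i++) {
      if (eagerRelease) {
        tracker.release(bytesPerColumn);
      }
    }
    if (!eagerRelease) {
      tracker.release(columns * bytesPerColumn);
    }
    return tracker.peak();
  }

  public static void main(String[] args) {
    System.out.println("baseline peak: " + simulate(false, 20, 1_000));
    System.out.println("eager peak:    " + simulate(true, 20, 1_000));
  }
}
```

Under these assumptions both variants reach the same high-water mark at the end of the write phase. Eager release only changes how quickly outstanding bytes fall during flush.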