From 25c0f5cd7cd3df810b0141bed8e3c8c3246652de Mon Sep 17 00:00:00 2001 From: Q8Webmaster Date: Sun, 14 Jun 2026 02:27:25 +0200 Subject: [PATCH 1/5] [parquet] use MICROS annotation for TIMESTAMP(n<=3) columns (Iceberg v2 compatibility) Paimon emits TIMESTAMP(MILLIS) for precision <= 3 columns. The Iceberg v2 spec requires INT64 MICROS for timestamp/timestamptz; MILLIS is only valid under Iceberg v3. This causes Iceberg-aware engines (Athena, Trino, Spark) to reject Parquet files with a schema compatibility error. - ParquetSchemaConverter.createTimestampWithLogicalType: emit MICROS for precision <= 3 instead of MILLIS. - ParquetRowDataWriter.TimestampMillsWriter.writeTimestamp: call value.toMicros() so the stored INT64 matches the MICROS annotation unit. The reader path (MILLIS -> precision=3, MICROS -> precision=6) is left unchanged so files written by older versions remain readable. Existing tables with precision<=3 columns should be rebuilt after upgrading. Tests: testLowPrecisionTimestampUseMicrosAnnotation verifies MICROS annotation for precision 0-3; testPaimonParquetSchemaConvert updated for the widened round-trip precision. 
--- .../parquet/ParquetSchemaConverter.java | 2 +- .../parquet/writer/ParquetRowDataWriter.java | 2 +- .../parquet/ParquetSchemaConverterTest.java | 116 +++++++++++++++++- 3 files changed, 117 insertions(+), 3 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 640081cd5002..e81b47b7c8b0 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -253,7 +253,7 @@ public static Type createTimestampWithLogicalType( return Types.primitive(INT64, repetition) .as( LogicalTypeAnnotation.timestampType( - isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MILLIS)) + isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MICROS)) .named(name); } else if (precision > 6) { return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition).named(name); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java index 80b788733342..9ab960b7498e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java @@ -380,7 +380,7 @@ public void write(InternalArray arrayData, int ordinal) { } private void writeTimestamp(Timestamp value) { - recordConsumer.addLong(value.getMillisecond()); + recordConsumer.addLong(value.toMicros()); } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java index 2808cc535abb..41d97624a780 100644 --- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java @@ -21,20 +21,25 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Arrays; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.createTimestampWithLogicalType; import static org.apache.paimon.types.DataTypesTest.assertThat; +import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; /** Test for {@link ParquetSchemaConverter}. */ @@ -139,6 +144,115 @@ public void testParquetTimestampNanosSchemaConvert() { public void testPaimonParquetSchemaConvert() { MessageType messageType = convertToParquetMessageType(ALL_TYPES); RowType rowType = convertToPaimonRowType(messageType); - assertThat(ALL_TYPES).isEqualTo(rowType); + // TIMESTAMP(n<=3) is written with a MICROS annotation (for Iceberg v2 compatibility) and + // therefore reads back as TIMESTAMP(6). All other types round-trip exactly. 
+ RowType expected = + new RowType( + Arrays.asList( + new DataField(0, "string", DataTypes.STRING()), + new DataField(1, "stringNotNull", DataTypes.STRING().notNull()), + new DataField(2, "boolean", DataTypes.BOOLEAN()), + new DataField(3, "bytes", DataTypes.BYTES()), + new DataField(4, "decimal(9,2)", DataTypes.DECIMAL(9, 2)), + new DataField(5, "decimal(18,2)", DataTypes.DECIMAL(18, 2)), + new DataField(6, "decimal(27,2)", DataTypes.DECIMAL(27, 2)), + new DataField(7, "tinyint", DataTypes.TINYINT()), + new DataField(8, "smallint", DataTypes.SMALLINT()), + new DataField(9, "int", DataTypes.INT()), + new DataField(10, "bigint", DataTypes.BIGINT()), + new DataField(11, "float", DataTypes.FLOAT()), + new DataField(12, "double", DataTypes.DOUBLE()), + new DataField(13, "date", DataTypes.DATE()), + new DataField(14, "time", DataTypes.TIME()), + new DataField(15, "timestamp(3)", new TimestampType(6)), + new DataField(16, "timestamp", DataTypes.TIMESTAMP()), + new DataField( + 17, "timestampLtz(3)", new LocalZonedTimestampType(6)), + new DataField( + 18, + "timestampLtz", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()), + new DataField(19, "array", new ArrayType(DataTypes.STRING())), + new DataField( + 20, + "map", + new MapType(DataTypes.STRING(), DataTypes.STRING())), + new DataField( + 21, + "row", + new RowType( + Arrays.asList( + new DataField( + 22, + "f1", + DataTypes.INT().notNull()), + new DataField( + 23, "f2", DataTypes.STRING())))), + new DataField( + 24, + "nested", + new RowType( + Arrays.asList( + new DataField( + 25, + "f1", + new MapType( + DataTypes.STRING(), + new ArrayType( + DataTypes + .STRING()))), + new DataField( + 26, + "f2", + new RowType( + Arrays.asList( + new DataField( + 27, + "f1", + DataTypes + .INT() + .notNull()), + new DataField( + 28, + "f2", + DataTypes + .STRING()))) + .notNull())))))); + assertThat(expected).isEqualTo(rowType); + } + + @Test + public void testLowPrecisionTimestampUseMicrosAnnotation() { + // TIMESTAMP(n<=3) must 
emit a MICROS Parquet annotation, not MILLIS, so that Iceberg v2 + // readers (e.g. Athena, Trino) can interpret the column as "timestamp"/"timestamptz". + // The Iceberg v2 spec only allows INT64 MICROS for those logical types; MILLIS is + // Iceberg v3 only (https://iceberg.apache.org/spec/#parquet). + for (int precision = 0; precision <= 3; precision++) { + Type tsType = + createTimestampWithLogicalType( + "ts", precision, Type.Repetition.OPTIONAL, false); + Type tsLtzType = + createTimestampWithLogicalType( + "ts_ltz", precision, Type.Repetition.OPTIONAL, true); + + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnnotation = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) + tsType.getLogicalTypeAnnotation(); + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsLtzAnnotation = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) + tsLtzType.getLogicalTypeAnnotation(); + + Assertions.assertThat(tsAnnotation.getUnit()) + .as("TIMESTAMP(%d) should use MICROS annotation", precision) + .isEqualTo(MICROS); + Assertions.assertThat(tsLtzAnnotation.getUnit()) + .as( + "TIMESTAMP_WITH_LOCAL_TIME_ZONE(%d) should use MICROS annotation", + precision) + .isEqualTo(MICROS); + Assertions.assertThat(tsLtzAnnotation.isAdjustedToUTC()) + .as("TIMESTAMP_WITH_LOCAL_TIME_ZONE(%d) should be UTC-adjusted", precision) + .isTrue(); + } } } From 714293251f6b67f144fc8405a4078943a57ace2b Mon Sep 17 00:00:00 2001 From: Q8Webmaster Date: Sun, 14 Jun 2026 02:27:30 +0200 Subject: [PATCH 2/5] [parquet] fix TIMESTAMP(n<=3) footer stats decoded as millis instead of micros ParquetSimpleStatsExtractor.toTimestampStats called fromEpochMillis for precision <= 3, but footer statistics for those columns now contain INT64 microseconds (matching the MICROS annotation). Switch to fromMicros so that Parquet column bounds are decoded correctly. 
--- .../paimon/format/parquet/ParquetSimpleStatsExtractor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java index f286cfce5dfe..e1575773e24b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java @@ -223,8 +223,8 @@ private SimpleColStats toTimestampStats(Statistics stats, int precision) { if (precision <= 3) { LongStatistics longStats = (LongStatistics) stats; return new SimpleColStats( - Timestamp.fromEpochMillis(longStats.getMin()), - Timestamp.fromEpochMillis(longStats.getMax()), + Timestamp.fromMicros(longStats.getMin()), + Timestamp.fromMicros(longStats.getMax()), stats.getNumNulls()); } else if (precision <= 6) { LongStatistics longStats = (LongStatistics) stats; From f9315e0d00698476258c637c5aabbf1da636e24c Mon Sep 17 00:00:00 2001 From: Q8Webmaster Date: Sun, 14 Jun 2026 02:27:38 +0200 Subject: [PATCH 3/5] [parquet] fix lazy dictionary decoding for TIMESTAMP(n<=3) columns VectorizedColumnReader has a lazy dictionary fast path for INT64/LongColumnVector: the raw Parquet dictionary is stored on the vector directly, bypassing LongTimestampUpdater.longTimestamp() which normalises on-disk microseconds to the milliseconds that ParquetTimestampVector.getTimestamp expects. The result is timestamps ~1000x too far in the future for any dictionary-encoded page (triggered when rowGroupSize is large enough to activate dictionary encoding). Exclude precision <= 3 timestamp types from lazy decoding via a new isLowPrecisionTimestamp helper so the eager path (decodeDictionaryIds) is always taken, applying the correct /1000 normalisation. 
--- .../reader/ParquetTimestampVector.java | 4 +-- .../reader/VectorizedColumnReader.java | 26 ++++++++++++++----- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java index f280d03bd9dc..61d132d58923 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java @@ -26,8 +26,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; /** - * Parquet write timestamp precision 0-3 as int64 mills, 4-6 as int64 micros, 7-9 as int96, this - * class wrap the real vector to provide {@link TimestampColumnVector} interface. + * Parquet writes timestamp precision 0-6 as int64 micros, 7-9 as int96; this class wraps the real + * vector to provide the {@link TimestampColumnVector} interface. 
*/ public class ParquetTimestampVector implements TimestampColumnVector { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java index 53b11138da0f..c90e48c2815e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java @@ -28,6 +28,8 @@ import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.TimestampType; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.VersionParser.ParsedVersion; @@ -115,14 +117,18 @@ public VectorizedColumnReader( } private boolean isLazyDecodingSupported( - PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) { + PrimitiveType.PrimitiveTypeName typeName, + DataType dataType, + ColumnVector columnVector) { boolean isSupported = false; switch (typeName) { case INT32: isSupported = columnVector instanceof IntColumnVector; break; case INT64: - isSupported = columnVector instanceof LongColumnVector; + isSupported = + columnVector instanceof LongColumnVector + && !isLowPrecisionTimestamp(dataType); break; case FLOAT: isSupported = columnVector instanceof FloatColumnVector; @@ -139,6 +145,17 @@ private boolean isLazyDecodingSupported( return isSupported; } + private static boolean isLowPrecisionTimestamp(DataType dataType) { + switch (dataType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + return ((TimestampType) dataType).getPrecision() <= 3; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return ((LocalZonedTimestampType) dataType).getPrecision() <= 3; + default: + return false; + } + } + /** Reads 
`total` rows from this columnReader into column. */ void readBatch( int total, @@ -198,12 +215,9 @@ void readBatch( (VectorizedValuesReader) dataColumn); } - // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post - // process - // the values to add microseconds precision. if (column.hasDictionary() || (startRowId == pageFirstRowIndex - && isLazyDecodingSupported(typeName, column))) { + && isLazyDecodingSupported(typeName, type, column))) { column.setDictionary(new ParquetDictionary(dictionary)); } else { updater.decodeDictionaryIds( From fb82382fc48f52d986135f1d733d569c112462e1 Mon Sep 17 00:00:00 2001 From: Q8Webmaster Date: Sun, 14 Jun 2026 23:06:47 +0200 Subject: [PATCH 4/5] =?UTF-8?q?[parquet]=20fix=20TIMESTAMP(n<=3D3)=20Parqu?= =?UTF-8?q?et=20row-group=20filter=20uses=20epoch=5Fms=20vs=20epoch=5F?= =?UTF-8?q?=C2=B5s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the MICROS annotation change, ParquetRowDataWriter stores TIMESTAMP(n<=3) values as epoch microseconds. ParquetFilters.toParquetObject was still using getMillisecond() (epoch_ms) for those columns, so the Parquet row-group statistics comparison always failed against the new epoch_µs statistics — causing WHERE predicates on low-precision timestamp columns to filter out all row groups and return empty results. Fix: use toMicros() for all INT64 timestamp precisions (0-6) in ParquetFilters.toParquetObject, matching the storage unit written by the writer. Update ParquetFiltersTest assertions accordingly. 
--- .../filter2/predicate/ParquetFilters.java | 7 +--- .../format/parquet/ParquetFiltersTest.java | 40 +++++++++---------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index 29feeb1b5ecd..a897d2d70ce1 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -299,11 +299,8 @@ private static Comparable toParquetObject( } else if (value instanceof Timestamp) { Timestamp timestamp = (Timestamp) value; int precision = getTimestampPrecision(type); - if (precision <= 3) { - // milliseconds - return timestamp.getMillisecond(); - } else if (precision <= 6) { - // microseconds + if (precision <= 6) { + // microseconds (precision 0-3 also stored as MICROS since the writer uses toMicros()) return timestamp.toMicros(); } // precision > 6 uses INT96, not supported diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java index 4fdd1e3927aa..129f0d8fa3c0 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java @@ -447,7 +447,7 @@ public void testInFilterDecimal64Bit() { @Test public void testTimestampMillis() { - // precision <= 3 uses milliseconds (INT64) + // precision <= 3 now uses microseconds (MICROS annotation, matching the writer) int precision = 3; PredicateBuilder builder = new PredicateBuilder( @@ -456,16 +456,16 @@ public void testTimestampMillis() { new DataField(0, "ts1", new TimestampType(precision))))); Timestamp value = Timestamp.fromEpochMillis(1704067200000L); // 2024-01-01 00:00:00 - 
long expectedMillis = value.getMillisecond(); + long expectedMicros = value.toMicros(); test(builder.isNull(0), "eq(ts1, null)", true); test(builder.isNotNull(0), "noteq(ts1, null)", true); - test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true); - test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true); - test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true); - test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMillis + ")", true); - test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true); - test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMillis + ")", true); + test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true); + test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true); + test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true); + test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMicros + ")", true); + test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true); + test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMicros + ")", true); } @Test @@ -493,7 +493,7 @@ public void testTimestampMicros() { @Test public void testLocalZonedTimestampMillis() { - // precision <= 3 uses milliseconds (INT64) + // precision <= 3 now uses microseconds (MICROS annotation, matching the writer) int precision = 3; PredicateBuilder builder = new PredicateBuilder( @@ -505,14 +505,14 @@ public void testLocalZonedTimestampMillis() { new LocalZonedTimestampType(precision))))); Timestamp value = Timestamp.fromEpochMillis(1704067200000L); - long expectedMillis = value.getMillisecond(); + long expectedMicros = value.toMicros(); test(builder.isNull(0), "eq(ts1, null)", true); test(builder.isNotNull(0), "noteq(ts1, null)", true); - test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true); - test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true); - 
test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true); - test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true); + test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true); + test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true); + test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true); + test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true); } @Test @@ -555,22 +555,22 @@ public void testInFilterTimestampMillis() { test( builder.in(0, Arrays.asList(v1, v2, v3)), "or(or(eq(ts1, " - + v1.getMillisecond() + + v1.toMicros() + "), eq(ts1, " - + v2.getMillisecond() + + v2.toMicros() + ")), eq(ts1, " - + v3.getMillisecond() + + v3.toMicros() + "))", true); test( builder.notIn(0, Arrays.asList(v1, v2, v3)), "and(and(noteq(ts1, " - + v1.getMillisecond() + + v1.toMicros() + "), noteq(ts1, " - + v2.getMillisecond() + + v2.toMicros() + ")), noteq(ts1, " - + v3.getMillisecond() + + v3.toMicros() + "))", true); } From 5bc2d622d6831d277141aace9d088f3bfe8cf784 Mon Sep 17 00:00:00 2001 From: Q8Webmaster Date: Sun, 14 Jun 2026 23:38:45 +0200 Subject: [PATCH 5/5] [parquet] remove over-long comment that broke Spotless line-length check --- .../org/apache/parquet/filter2/predicate/ParquetFilters.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index a897d2d70ce1..8719aee5cba6 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -300,7 +300,6 @@ private static Comparable toParquetObject( Timestamp timestamp = (Timestamp) value; int precision = getTimestampPrecision(type); if (precision <= 6) { - // microseconds (precision 0-3 also stored as MICROS since the 
writer uses toMicros()) return timestamp.toMicros(); } // precision > 6 uses INT96, not supported