diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 640081cd5002..e81b47b7c8b0 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -253,7 +253,7 @@ public static Type createTimestampWithLogicalType( return Types.primitive(INT64, repetition) .as( LogicalTypeAnnotation.timestampType( - isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MILLIS)) + isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MICROS)) .named(name); } else if (precision > 6) { return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition).named(name); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java index f286cfce5dfe..e1575773e24b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java @@ -223,8 +223,8 @@ private SimpleColStats toTimestampStats(Statistics stats, int precision) { if (precision <= 3) { LongStatistics longStats = (LongStatistics) stats; return new SimpleColStats( - Timestamp.fromEpochMillis(longStats.getMin()), - Timestamp.fromEpochMillis(longStats.getMax()), + Timestamp.fromMicros(longStats.getMin()), + Timestamp.fromMicros(longStats.getMax()), stats.getNumNulls()); } else if (precision <= 6) { LongStatistics longStats = (LongStatistics) stats; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java index f280d03bd9dc..61d132d58923 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java @@ -26,8 +26,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; /** - * Parquet write timestamp precision 0-3 as int64 mills, 4-6 as int64 micros, 7-9 as int96, this - * class wrap the real vector to provide {@link TimestampColumnVector} interface. + * Parquet write timestamp precision 0-6 as int64 micros, 7-9 as int96, this class wrap the real + * vector to provide {@link TimestampColumnVector} interface. */ public class ParquetTimestampVector implements TimestampColumnVector { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java index 53b11138da0f..c90e48c2815e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java @@ -28,6 +28,8 @@ import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.TimestampType; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.VersionParser.ParsedVersion; @@ -115,14 +117,18 @@ public VectorizedColumnReader( } private boolean isLazyDecodingSupported( - PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) { + PrimitiveType.PrimitiveTypeName typeName, + DataType dataType, + ColumnVector columnVector) { boolean isSupported = false; switch (typeName) { case INT32: isSupported = columnVector instanceof IntColumnVector; break; case INT64: - isSupported = columnVector instanceof LongColumnVector; + isSupported = + columnVector instanceof LongColumnVector + && !isLowPrecisionTimestamp(dataType); break; case FLOAT: isSupported = columnVector instanceof FloatColumnVector; @@ -139,6 +145,17 @@ private boolean isLazyDecodingSupported( return isSupported; } + private static boolean isLowPrecisionTimestamp(DataType dataType) { + switch (dataType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + return ((TimestampType) dataType).getPrecision() <= 3; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return ((LocalZonedTimestampType) dataType).getPrecision() <= 3; + default: + return false; + } + } + /** Reads `total` rows from this columnReader into column. */ void readBatch( int total, @@ -198,12 +215,9 @@ void readBatch( (VectorizedValuesReader) dataColumn); } - // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post - // process - // the values to add microseconds precision. if (column.hasDictionary() || (startRowId == pageFirstRowIndex - && isLazyDecodingSupported(typeName, column))) { + && isLazyDecodingSupported(typeName, type, column))) { column.setDictionary(new ParquetDictionary(dictionary)); } else { updater.decodeDictionaryIds( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java index 80b788733342..9ab960b7498e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java @@ -380,7 +380,7 @@ public void write(InternalArray arrayData, int ordinal) { } private void writeTimestamp(Timestamp value) { - recordConsumer.addLong(value.getMillisecond()); + recordConsumer.addLong(value.toMicros()); } } diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index 29feeb1b5ecd..8719aee5cba6 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -299,11 +299,7 @@ private static Comparable toParquetObject( } else if (value instanceof Timestamp) { Timestamp timestamp = (Timestamp) value; int precision = getTimestampPrecision(type); - if (precision <= 3) { - // milliseconds - return timestamp.getMillisecond(); - } else if (precision <= 6) { - // microseconds + if (precision <= 6) { return timestamp.toMicros(); } // precision > 6 uses INT96, not supported diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java index 4fdd1e3927aa..129f0d8fa3c0 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java @@ -447,7 +447,7 @@ public void testInFilterDecimal64Bit() { @Test public void testTimestampMillis() { - // precision <= 3 uses milliseconds (INT64) + // precision <= 3 now uses microseconds (MICROS annotation, matching the writer) int precision = 3; PredicateBuilder builder = new PredicateBuilder( @@ -456,16 +456,16 @@ public void testTimestampMillis() { new DataField(0, "ts1", new TimestampType(precision))))); Timestamp value = Timestamp.fromEpochMillis(1704067200000L); // 2024-01-01 00:00:00 - long expectedMillis = value.getMillisecond(); + long expectedMicros = value.toMicros(); test(builder.isNull(0), "eq(ts1, null)", true); test(builder.isNotNull(0), "noteq(ts1, null)", true); - test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true); - test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true); - test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true); - test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMillis + ")", true); - test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true); - test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMillis + ")", true); + test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true); + test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true); + test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true); + test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMicros + ")", true); + test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true); + test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMicros + ")", true); } @Test @@ -493,7 +493,7 @@ public void testTimestampMicros() { @Test public void testLocalZonedTimestampMillis() { - // precision <= 3 uses milliseconds (INT64) + // precision <= 3 now uses microseconds (MICROS annotation, matching the writer) int precision = 3; PredicateBuilder builder = new PredicateBuilder( @@ -505,14 +505,14 @@ public void testLocalZonedTimestampMillis() { new LocalZonedTimestampType(precision))))); Timestamp value = Timestamp.fromEpochMillis(1704067200000L); - long expectedMillis = value.getMillisecond(); + long expectedMicros = value.toMicros(); test(builder.isNull(0), "eq(ts1, null)", true); test(builder.isNotNull(0), "noteq(ts1, null)", true); - test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true); - test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true); - test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true); - test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true); + test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true); + test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true); + test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true); + test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true); } @Test @@ -555,22 +555,22 @@ public void testInFilterTimestampMillis() { test( builder.in(0, Arrays.asList(v1, v2, v3)), "or(or(eq(ts1, " - + v1.getMillisecond() + + v1.toMicros() + "), eq(ts1, " - + v2.getMillisecond() + + v2.toMicros() + ")), eq(ts1, " - + v3.getMillisecond() + + v3.toMicros() + "))", true); test( builder.notIn(0, Arrays.asList(v1, v2, v3)), "and(and(noteq(ts1, " - + v1.getMillisecond() + + v1.toMicros() + "), noteq(ts1, " - + v2.getMillisecond() + + v2.toMicros() + ")), noteq(ts1, " - + v3.getMillisecond() + + v3.toMicros() + "))", true); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java index 2808cc535abb..41d97624a780 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java @@ -21,20 +21,25 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Arrays; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.createTimestampWithLogicalType; import static org.apache.paimon.types.DataTypesTest.assertThat; +import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; /** Test for {@link ParquetSchemaConverter}. */ @@ -139,6 +144,115 @@ public void testParquetTimestampNanosSchemaConvert() { public void testPaimonParquetSchemaConvert() { MessageType messageType = convertToParquetMessageType(ALL_TYPES); RowType rowType = convertToPaimonRowType(messageType); - assertThat(ALL_TYPES).isEqualTo(rowType); + // TIMESTAMP(n<=3) is written with a MICROS annotation (for Iceberg v2 compatibility) and + // therefore reads back as TIMESTAMP(6). All other types round-trip exactly. + RowType expected = + new RowType( + Arrays.asList( + new DataField(0, "string", DataTypes.STRING()), + new DataField(1, "stringNotNull", DataTypes.STRING().notNull()), + new DataField(2, "boolean", DataTypes.BOOLEAN()), + new DataField(3, "bytes", DataTypes.BYTES()), + new DataField(4, "decimal(9,2)", DataTypes.DECIMAL(9, 2)), + new DataField(5, "decimal(18,2)", DataTypes.DECIMAL(18, 2)), + new DataField(6, "decimal(27,2)", DataTypes.DECIMAL(27, 2)), + new DataField(7, "tinyint", DataTypes.TINYINT()), + new DataField(8, "smallint", DataTypes.SMALLINT()), + new DataField(9, "int", DataTypes.INT()), + new DataField(10, "bigint", DataTypes.BIGINT()), + new DataField(11, "float", DataTypes.FLOAT()), + new DataField(12, "double", DataTypes.DOUBLE()), + new DataField(13, "date", DataTypes.DATE()), + new DataField(14, "time", DataTypes.TIME()), + new DataField(15, "timestamp(3)", new TimestampType(6)), + new DataField(16, "timestamp", DataTypes.TIMESTAMP()), + new DataField( + 17, "timestampLtz(3)", new LocalZonedTimestampType(6)), + new DataField( + 18, + "timestampLtz", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()), + new DataField(19, "array", new ArrayType(DataTypes.STRING())), + new DataField( + 20, + "map", + new MapType(DataTypes.STRING(), DataTypes.STRING())), + new DataField( + 21, + "row", + new RowType( + Arrays.asList( + new DataField( + 22, + "f1", + DataTypes.INT().notNull()), + new DataField( + 23, "f2", DataTypes.STRING())))), + new DataField( + 24, + "nested", + new RowType( + Arrays.asList( + new DataField( + 25, + "f1", + new MapType( + DataTypes.STRING(), + new ArrayType( + DataTypes + .STRING()))), + new DataField( + 26, + "f2", + new RowType( + Arrays.asList( + new DataField( + 27, + "f1", + DataTypes + .INT() + .notNull()), + new DataField( + 28, + "f2", + DataTypes + .STRING()))) + .notNull())))))); + assertThat(expected).isEqualTo(rowType); + } + + @Test + public void testLowPrecisionTimestampUseMicrosAnnotation() { + // TIMESTAMP(n<=3) must emit a MICROS Parquet annotation, not MILLIS, so that Iceberg v2 + // readers (e.g. Athena, Trino) can interpret the column as "timestamp"/"timestamptz". + // The Iceberg v2 spec only allows INT64 MICROS for those logical types; MILLIS is + // Iceberg v3 only (https://iceberg.apache.org/spec/#parquet). + for (int precision = 0; precision <= 3; precision++) { + Type tsType = + createTimestampWithLogicalType( + "ts", precision, Type.Repetition.OPTIONAL, false); + Type tsLtzType = + createTimestampWithLogicalType( + "ts_ltz", precision, Type.Repetition.OPTIONAL, true); + + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnnotation = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) + tsType.getLogicalTypeAnnotation(); + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsLtzAnnotation = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) + tsLtzType.getLogicalTypeAnnotation(); + + Assertions.assertThat(tsAnnotation.getUnit()) + .as("TIMESTAMP(%d) should use MICROS annotation", precision) + .isEqualTo(MICROS); + Assertions.assertThat(tsLtzAnnotation.getUnit()) + .as( + "TIMESTAMP_WITH_LOCAL_TIME_ZONE(%d) should use MICROS annotation", + precision) + .isEqualTo(MICROS); + Assertions.assertThat(tsLtzAnnotation.isAdjustedToUTC()) + .as("TIMESTAMP_WITH_LOCAL_TIME_ZONE(%d) should be UTC-adjusted", precision) + .isTrue(); + } } }