Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,46 @@ public org.apache.parquet.column.statistics.Statistics fromParquetStatistics(
return fromParquetStatisticsInternal(createdBy, statistics, type, expectedOrder);
}

// Overload that uses a pre-computed shouldIgnoreCorruptStats flag to avoid redundant parsing
private org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of duplicating the entire fromParquetStatisticsInternal body, the existing method can simply delegate to a new overload and this eliminates duplicate code

static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal(
      String createdBy, Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder) {
    return fromParquetStatisticsInternal(
        formatStats, 
        type, 
        typeSortOrder, 
        CorruptStatistics.fileHasCorruptStatistics(createdBy) // This is a new method in CorruptStatistics
    );
}


// overloaded method
static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal(
      Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder, boolean fileHasCorruptStats) {

String createdBy, Statistics formatStats, PrimitiveType type, boolean shouldIgnoreCorruptStats) {
SortOrder typeSortOrder = overrideSortOrderToSigned(type) ? SortOrder.SIGNED : sortOrder(type);
org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);

if (formatStats != null) {
if (formatStats.isSetMin_value() && formatStats.isSetMax_value()) {
byte[] min = formatStats.min_value.array();
byte[] max = formatStats.max_value.array();
if (isMinMaxStatsSupported(type) || Arrays.equals(min, max)) {
statsBuilder.withMin(min);
statsBuilder.withMax(max);
}
} else {
boolean isSet = formatStats.isSetMax() && formatStats.isSetMin();
boolean maxEqualsMin = isSet ? Arrays.equals(formatStats.getMin(), formatStats.getMax()) : false;
boolean sortOrdersMatch = SortOrder.SIGNED == typeSortOrder;
// The shouldIgnoreCorruptStats flag applies only to BINARY and FIXED_LEN_BYTE_ARRAY.
// For other types, shouldIgnoreStatistics always returns false, so we only guard those.
PrimitiveTypeName primitiveTypeName = type.getPrimitiveTypeName();
boolean ignoreForThisColumn = shouldIgnoreCorruptStats
&& (primitiveTypeName == PrimitiveTypeName.BINARY
|| primitiveTypeName == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
if (!ignoreForThisColumn && (sortOrdersMatch || maxEqualsMin)) {
Comment on lines +995 to +998

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check in shouldIgnoreStatistics is dead code with current changes as we always pass BINARY

if (columnType != PrimitiveTypeName.BINARY && columnType !=PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)

We can utilize the new methods here. Please refer to below comments for context.

if (!(fileHasCorruptStats && CorruptStatistics.isCorruptStatisticsColumnType(type.getPrimitiveTypeName()))

Instead of calling or moving the PrimitiveTypeName checks to ParquetMetadataConverter and leave it as the responsibility of CorruptStatistics

if (!CorruptStatistics.shouldIgnoreStatistics(createdBy, type.getPrimitiveTypeName())

if (isSet) {
statsBuilder.withMin(formatStats.min.array());
statsBuilder.withMax(formatStats.max.array());
}
}
}

if (formatStats.isSetNull_count()) {
statsBuilder.withNumNulls(formatStats.null_count);
}
}
return statsBuilder.build();
}

GeospatialStatistics toParquetGeospatialStatistics(
org.apache.parquet.column.statistics.geospatial.GeospatialStatistics geospatialStatistics) {
if (geospatialStatistics == null) {
Expand Down Expand Up @@ -1794,13 +1834,24 @@ public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throw

public ColumnChunkMetaData buildColumnChunkMetaData(
ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
boolean shouldIgnoreCorruptStats =
CorruptStatistics.shouldIgnoreStatistics(createdBy, PrimitiveTypeName.BINARY);
return buildColumnChunkMetaData(metaData, columnPath, type, createdBy, shouldIgnoreCorruptStats);
Comment on lines +1837 to +1839
}

ColumnChunkMetaData buildColumnChunkMetaData(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildColumnChunkMetaData can delegate to a package-private overload that takes the boolean similar to what you have done but with few changes,

public ColumnChunkMetaData buildColumnChunkMetaData(
      ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
    return buildColumnChunkMetaData(
        metaData, columnPath, type, CorruptStatistics.fileHasCorruptStatistics(createdBy));
}

ColumnChunkMetaData buildColumnChunkMetaData(
      ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, boolean fileHasCorruptStats) {
    SortOrder expectedOrder = overrideSortOrderToSigned(type) ? SortOrder.SIGNED : sortOrder(type);
    return ColumnChunkMetaData.get(...,
        fromParquetStatisticsInternal(metaData.statistics, type, expectedOrder, fileHasCorruptStats), ...);
}

No need to pass createdBy downstream, the boolean is all the internal overload needs. SortOrder computation moves here since we bypass fromParquetStatistics to avoid re-parsing createdBy as you have already done by replacing fromParquetStatisticsInternal with fromParquetStatistics.

Also notice how the new public methods we extracted in CorruptStatistics are being used in each delegate method here

ColumnMetaData metaData,
ColumnPath columnPath,
PrimitiveType type,
String createdBy,
boolean shouldIgnoreCorruptStats) {
return ColumnChunkMetaData.get(
columnPath,
type,
fromFormatCodec(metaData.codec),
convertEncodingStats(metaData.getEncoding_stats()),
fromFormatEncodings(metaData.encodings),
fromParquetStatistics(createdBy, metaData.statistics, type),
fromParquetStatisticsInternal(createdBy, metaData.statistics, type, shouldIgnoreCorruptStats),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
Expand Down Expand Up @@ -1829,6 +1880,10 @@ public ParquetMetadata fromParquetMetadata(
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
// Compute once per file: the result is the same for BINARY and FIXED_LEN_BYTE_ARRAY
// (the only types affected by PARQUET-251), and always false for other types.
Comment on lines +1883 to +1884
boolean shouldIgnoreCorruptStats =
CorruptStatistics.shouldIgnoreStatistics(parquetMetadata.getCreated_by(), PrimitiveTypeName.BINARY);
Comment on lines +1883 to +1886

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is calling shouldIgnoreStatistics with PrimitiveTypeName.BINARY hardcoded which is incorrect.

Instead we can refactor shouldIgnoreStatistics by adding public methods.

public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) {
    if (!isCorruptStatisticsColumnType(columnType)) {
      // the bug only applies to binary columns
      return false;
    }
    return fileHasCorruptStatistics(createdBy);
}

public static boolean isCorruptStatisticsColumnType(PrimitiveTypeName columnType) {
    return columnType == PrimitiveTypeName.BINARY || columnType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
}

public static boolean fileHasCorruptStatistics(String createdBy) {
    // rest of the logic from shouldIgnoreStatistics
}


if (row_groups != null) {
for (RowGroup rowGroup : row_groups) {
Expand Down Expand Up @@ -1909,7 +1964,8 @@ public ParquetMetadata fromParquetMetadata(
metaData,
columnPath,
messageType.getType(columnPath.toArray()).asPrimitiveType(),
createdBy);
createdBy,
shouldIgnoreCorruptStats);
column.setRowGroupOrdinal(rowGroup.getOrdinal());
if (metaData.isSetBloom_filter_offset()) {
column.setBloomFilterOffset(metaData.getBloom_filter_offset());
Expand Down