Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,31 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib.metadata
import io
import math
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
from pyroaring import BitMap, FrozenBitMap

from pyiceberg.io import OutputFile
from pyiceberg.typedef import IcebergBaseModel

if TYPE_CHECKING:
import pyarrow as pa

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
# Reserved field id of the row position (_pos) metadata column, referenced by
# deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION)
ROW_POSITION_FIELD_ID = 2147483645


def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
Expand Down Expand Up @@ -62,6 +71,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
return bitmaps


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
"""
Serialize a dictionary of bitmaps into a byte array.

The format is:
- 8 bytes: number of bitmaps (little-endian)
- For each bitmap:
- 4 bytes: key (little-endian)
- n bytes: serialized bitmap
"""
with io.BytesIO() as out:
sorted_keys = sorted(bitmaps.keys())

# number of bitmaps
out.write(len(sorted_keys).to_bytes(8, "little"))

for key in sorted_keys:
if key < 0:
raise ValueError(f"Invalid unsigned key: {key}")
if key > MAX_JAVA_SIGNED:
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

# key
out.write(key.to_bytes(4, "little"))
# bitmap
out.write(bitmaps[key].serialize())
return out.getvalue()


class PuffinBlobMetadata(IcebergBaseModel):
type: Literal["deletion-vector-v1"] = Field()
fields: list[int] = Field()
Expand Down Expand Up @@ -114,3 +152,99 @@ def __init__(self, puffin: bytes) -> None:

def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}


class PuffinWriter:
"""Writes a Puffin file containing a single deletion-vector-v1 blob to an output file."""

_output_file: OutputFile
_blobs: list[PuffinBlobMetadata]
_blob_payloads: list[bytes]
_created_by: str

def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None:
self._output_file = output_file
self._blobs = []
self._blob_payloads = []
self._created_by = (
created_by if created_by is not None else f"PyIceberg version {importlib.metadata.version('pyiceberg')}"
)

def set_blob(
self,
positions: Iterable[int],
referenced_data_file: str,
) -> None:
"""Set the deletion vector blob for a data file, replacing any previously set blob.

Args:
positions: Zero-based positions of the deleted rows in the referenced data file.
referenced_data_file: Location of the data file the deletion vector applies to.
"""
# We only support one blob at the moment
self._blobs = []
self._blob_payloads = []

bitmaps: dict[int, BitMap] = {}
for pos in positions:
if pos < 0:
raise ValueError(f"Invalid position: {pos}, positions must be non-negative")
key = pos >> 32
low_bits = pos & 0xFFFFFFFF
if key not in bitmaps:
bitmaps[key] = BitMap()
bitmaps[key].add(low_bits)

if not bitmaps:
raise ValueError("Deletion vector must contain at least one position")

cardinality = sum(len(bm) for bm in bitmaps.values())
vector_payload = _serialize_bitmaps(bitmaps)

# deletion-vector-v1 blob layout: combined length of magic and vector (4 bytes, big-endian),
# the DV magic bytes, the serialized vector, and a CRC-32 checksum of magic + vector (4 bytes, big-endian)
blob_content = DELETION_VECTOR_MAGIC + vector_payload
self._blob_payloads.append(
len(blob_content).to_bytes(4, "big") + blob_content + zlib.crc32(blob_content).to_bytes(4, "big")
)

self._blobs.append(
PuffinBlobMetadata(
type="deletion-vector-v1",
fields=[ROW_POSITION_FIELD_ID],
# -1 means the snapshot id and sequence number are inherited at commit time
snapshot_id=-1,
sequence_number=-1,
# offset and length are placeholders; finish() fills them in when assembling the file
offset=0,
length=0,
properties={PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)},
compression_codec=None,
)
)

def finish(self) -> int:
"""Write the Puffin file to the output file and return its size in bytes."""
with io.BytesIO() as out:
out.write(MAGIC_BYTES)

blobs_metadata: list[PuffinBlobMetadata] = []
for blob_metadata, blob_payload in zip(self._blobs, self._blob_payloads, strict=True):
blobs_metadata.append(blob_metadata.model_copy(update={"offset": out.tell(), "length": len(blob_payload)}))
out.write(blob_payload)

footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by})
footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

out.write(MAGIC_BYTES)
out.write(footer_payload_bytes)
out.write(len(footer_payload_bytes).to_bytes(4, "little"))
out.write((0).to_bytes(4, "little")) # flags
out.write(MAGIC_BYTES)

puffin_bytes = out.getvalue()

with self._output_file.create(overwrite=True) as output_stream:
output_stream.write(puffin_bytes)

return len(puffin_bytes)
152 changes: 151 additions & 1 deletion tests/table/test_puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib.metadata
import zlib
from os import path
from pathlib import Path

import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.table.puffin import (
DELETION_VECTOR_MAGIC,
MAGIC_BYTES,
PROPERTY_REFERENCED_DATA_FILE,
PuffinFile,
PuffinWriter,
_deserialize_bitmap,
)


def _open_file(file: str) -> bytes:
Expand Down Expand Up @@ -71,3 +82,142 @@ def test_map_high_vals() -> None:

with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
_ = _deserialize_bitmap(puffin)


def _new_writer(tmp_path: Path, created_by: str | None = None) -> tuple[PuffinWriter, Path]:
puffin_path = tmp_path / "test.puffin"
return PuffinWriter(PyArrowFileIO().new_output(str(puffin_path)), created_by=created_by), puffin_path


def test_puffin_round_trip(tmp_path: Path) -> None:
# Define some deletion positions for a file
deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate

file_path = "path/to/data.parquet"

# Write the Puffin file
writer, puffin_path = _new_writer(tmp_path, created_by="my-test-app")
writer.set_blob(positions=deletions, referenced_data_file=file_path)
size = writer.finish()

# Read the Puffin file back
puffin_bytes = puffin_path.read_bytes()
assert size == len(puffin_bytes)
reader = PuffinFile(puffin_bytes)

# Assert footer metadata
assert reader.footer.properties["created-by"] == "my-test-app"
assert len(reader.footer.blobs) == 1

blob_meta = reader.footer.blobs[0]
assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
assert blob_meta.properties["cardinality"] == str(len(set(deletions)))

# Assert the content of deletion vectors
read_vectors = reader.to_vector()

assert file_path in read_vectors
assert read_vectors[file_path].to_pylist() == sorted(set(deletions))


def test_puffin_round_trip_with_sparse_bitmap_keys(tmp_path: Path) -> None:
# High bits 0 and 2 are present while 1 is absent; the writer must emit sorted keys
# and the reader pads the missing key with an empty bitmap.
positions = [3, (2 << 32) + 4]
writer, puffin_path = _new_writer(tmp_path)
writer.set_blob(positions=positions, referenced_data_file="file.parquet")
writer.finish()

vectors = PuffinFile(puffin_path.read_bytes()).to_vector()
assert vectors["file.parquet"].to_pylist() == positions


def test_write_and_read_puffin_file(tmp_path: Path) -> None:
writer, puffin_path = _new_writer(tmp_path)
writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
writer.finish()

reader = PuffinFile(puffin_path.read_bytes())

assert len(reader.footer.blobs) == 1
blob = reader.footer.blobs[0]

assert blob.properties["referenced-data-file"] == "file2.parquet"
assert blob.properties["cardinality"] == "3"
assert blob.type == "deletion-vector-v1"
# Reserved field id of the row position column (Java MetadataColumns.ROW_POSITION, INT_MAX - 2);
# required for Java/Spark interoperability.
assert blob.fields == [2147483645]
assert blob.snapshot_id == -1
assert blob.sequence_number == -1
assert blob.compression_codec is None

vectors = reader.to_vector()
assert len(vectors) == 1
assert "file1.parquet" not in vectors
assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


def test_deletion_vector_blob_framing_is_spec_compliant(tmp_path: Path) -> None:
# PuffinFile reads only the serialized vector, skipping the blob's length prefix,
# deletion-vector magic and CRC-32. Assert that framing directly at the byte level so
# the bytes an external reader (Java/Spark) relies on stay spec-compliant.
positions = [0, 1, 5, (1 << 32) + 7]
writer, puffin_path = _new_writer(tmp_path)
writer.set_blob(positions=positions, referenced_data_file="file.parquet")
writer.finish()
puffin_bytes = puffin_path.read_bytes()

# The Puffin file begins with the magic.
assert puffin_bytes[:4] == MAGIC_BYTES

blob = PuffinFile(puffin_bytes).footer.blobs[0]
blob_bytes = puffin_bytes[blob.offset : blob.offset + blob.length]

# Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian),
# where the length and CRC-32 both cover the magic bytes plus the vector.
length_prefix = int.from_bytes(blob_bytes[0:4], "big")
dv_magic = blob_bytes[4:8]
vector = blob_bytes[8 : 4 + length_prefix]
crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big")

assert dv_magic == DELETION_VECTOR_MAGIC
assert length_prefix == len(dv_magic) + len(vector)
assert blob.length == 4 + length_prefix + 4
assert crc == zlib.crc32(dv_magic + vector)


def test_puffin_file_with_no_blobs(tmp_path: Path) -> None:
writer, puffin_path = _new_writer(tmp_path)
writer.finish()

reader = PuffinFile(puffin_path.read_bytes())
assert len(reader.footer.blobs) == 0
assert len(reader.to_vector()) == 0


def test_puffin_writer_default_created_by(tmp_path: Path) -> None:
writer, puffin_path = _new_writer(tmp_path)
writer.finish()

reader = PuffinFile(puffin_path.read_bytes())
assert reader.footer.properties["created-by"] == f"PyIceberg version {importlib.metadata.version('pyiceberg')}"


def test_set_blob_rejects_negative_positions(tmp_path: Path) -> None:
writer, _ = _new_writer(tmp_path)
with pytest.raises(ValueError, match="Invalid position: -1"):
writer.set_blob(positions=[1, -1], referenced_data_file="file.parquet")


def test_set_blob_rejects_empty_positions(tmp_path: Path) -> None:
writer, _ = _new_writer(tmp_path)
with pytest.raises(ValueError, match="Deletion vector must contain at least one position"):
writer.set_blob(positions=[], referenced_data_file="file.parquet")


def test_set_blob_rejects_position_exceeding_java_key_range(tmp_path: Path) -> None:
writer, _ = _new_writer(tmp_path)
with pytest.raises(ValueError, match="Key 2147483648 is too large, max 2147483647"):
writer.set_blob(positions=[(2**31) << 32], referenced_data_file="file.parquet")