Robert's Data Science Blog

Creating Parquet files

Parquet is a great format for storing dataframes. But we can leverage it even more by using row groups. If you are unfamiliar with these concepts then check out a tutorial on the workings of Parquet.

To my knowledge there are two benefits of using row groups: They can be processed in parallel, and they carry metadata that can, for example, allow us to skip row groups that don’t contain the data we need.

Packages

I am using Polars for everything related to dataframes and pyarrow to get more out of Parquet statistics.

Both packages are capable of writing row group metadata, but I’m using pyarrow to inspect it from within Python. More specifically, I want to see the min_value/max_value of a column. These fields were previously called min/max. Polars writes min_value/max_value, while pyarrow writes both but only reads min/max when showing the metadata statistics.
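
As a small preview, inspecting the statistics with pyarrow looks roughly like this (assuming a Parquet file data.parquet already exists – we will do exactly this below):

from pyarrow.parquet import ParquetFile

# Statistics of the first column in the first row group; note that pyarrow
# reports the values under the legacy names min/max.
print(ParquetFile("data.parquet").metadata.row_group(0).column(0).statistics)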

Dataset

Consider an example where we have a dataset with 20 million rows and 2 columns. The first column is the one we will filter on and the second column holds the values.

from pprint import pprint
from random import uniform
from uuid import uuid4

import polars as pl
import polars.selectors as cs

number_of_ids = 100_000
unique_ids = [str(uuid4()) for _ in range(number_of_ids)]
values_per_id = 200

df = pl.DataFrame({
    "id": [id for _ in range(values_per_id) for id in unique_ids],
    "value": [100 * uniform(0, 1) for _ in range(values_per_id * number_of_ids)]
})

with pl.Config(fmt_str_lengths=50):
    pprint(df.head(3))
shape: (3, 2)
┌──────────────────────────────────────┬───────────┐
│ id                                   ┆ value     │
│ ---                                  ┆ ---       │
│ str                                  ┆ f64       │
╞══════════════════════════════════════╪═══════════╡
│ fadb6f25-cd72-43a3-bf8b-2b70451ffc51 ┆ 4.799587  │
│ 23814f93-62eb-41f2-8ae7-0b0764d8d99c ┆ 49.216103 │
│ f2865c7a-eacc-411e-a520-e951ae9e8587 ┆ 76.454863 │
└──────────────────────────────────────┴───────────┘

Measuring performance

I am going to demonstrate the performance improvements using a single filtering operation on the id column, where the number of matching rows is small. Materializing the result into a dataframe is therefore not too time consuming.

Measuring performance can be tricky, and with a task involving I/O like in this post, caching can be deceptive. In particular, the first run can be substantially slower than any remaining runs because the dataset gets cached in main memory. So if we make repeated measurements and consider the average run time, the first, un-cached run is swamped by its subsequent, cached counterparts.

Here I only measure the filtering operation once (per file creation/update). Running the measurements in a different order does not affect the results.
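
Concretely, every timing reported below comes from a single timeit run along these lines, where the file name and some_id are placeholders for the concrete values used later:

from timeit import timeit

import polars as pl

some_id = "00000000-0000-0000-0000-000000000000"  # placeholder value

# number=1 means a single run, so the first, possibly un-cached run is not
# averaged away against cached repetitions.
query = pl.scan_parquet("data.parquet")
print(timeit(lambda: query.filter(pl.col("id") == some_id).collect(), number=1))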

For more on this topic, see for instance Matt Dowle’s talk on the original H2O benchmarks.

Baseline performance

First we use pyarrow’s default Parquet writer to also write statistics.

from pyarrow.parquet import ParquetFile, ParquetWriter, write_table

filename = "data.parquet"
write_table(df.to_arrow(), filename, compression='lz4', write_statistics=True)

There is only a single row group containing all the data.

parquet_file = ParquetFile(filename)
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x126a596c0>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 2
  num_rows: 20000000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 637

Let us see how fast it is to filter on the id column.

from timeit import timeit

id = df.item(0, "id")
query = pl.scan_parquet(filename)
timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1)
0.48111404199153185

Multiple row groups

Let us see how performance is affected by writing multiple row groups. A row group should be neither too small nor too big. I have seen recommendations for sizes between 100_000 and 1_000_000 rows, but try it out to see the effect.
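
For example, a quick sweep over a few candidate sizes could look something like this (a rough sketch – the throwaway file name size_test.parquet and the candidate sizes are just examples):

# Write the same data with different row group sizes and time the same filter
# against each version of the file.
for size in [50_000, 100_000, 500_000, 1_000_000]:
    write_table(df.to_arrow(), "size_test.parquet", compression="lz4", row_group_size=size)
    query = pl.scan_parquet("size_test.parquet")
    print(size, timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1))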

from pyarrow.parquet import ParquetFile, ParquetWriter, write_table

write_table(df.to_arrow(), filename, compression='lz4', row_group_size=100_000)

Now we have multiple row groups.

parquet_file = ParquetFile(filename)
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x10673dcb0>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 2
  num_rows: 20000000
  num_row_groups: 200
  format_version: 2.6
  serialized_size: 57877

Let us see an example of what the row group statistics look like for the id column:

parquet_file.metadata.row_group(0).column(0).statistics
<pyarrow._parquet.Statistics object at 0x14a6c49a0>
  has_min_max: True
  min: 00004554-a342-49a8-98df-131d65a4a730
  max: ffff23a0-8662-41ca-ad83-4376e64fcf20
  null_count: 0
  distinct_count: 0
  num_values: 100000
  physical_type: BYTE_ARRAY
  logical_type: String
  converted_type (legacy): UTF8

When filtering, Polars can read the metadata of each row group and use the min and max statistics to decide whether or not to read the actual data in the row group. This provides quite a speed-up:

query = pl.scan_parquet(filename)
timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1)
0.11861637502443045

Not bad from adding an extra argument. But wait! There is more.

Controlled row groups

When writing row groups in an ad hoc manner, there is no guarantee that ids that are lexicographically close end up in the same row group. That is, a single row group may cover ids from a to z, which does not help narrow down which groups to read and which to skip.
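
We can see this in the file written above by printing the id range of the first few row groups – the ranges overlap heavily (the exact values will differ from run to run):

# Min/max id of the first three row groups; each covers almost the full uuid range.
for i in range(3):
    stats = parquet_file.metadata.row_group(i).column(0).statistics
    print(i, stats.min, stats.max)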

We can facilitate more relevant grouping by explicitly creating row groups. I would like the row groups to be of comparable sizes – again containing around 100_000 rows each. (In this example every id has the same number of observations, but to generalize this I do a bit of rounding below.)

row_group_association = (
    df.group_by('id')
    .agg(pl.count())
    .sort('id')
    .with_columns(
        (pl.col("count").cum_sum() // 100_000).alias("row_group")
    )
    .select(~cs.by_name("count"))
)

Breaking this down, I compute the number of rows for each id and sort by the id column. Then roughly every 100_000 consecutive rows are binned into a row group. Here it is important to sort before binning.
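
Before splitting the data, we can sanity check that the bins really do contain roughly 100_000 rows each (a quick check, reusing only operations already shown above):

# Count how many rows end up in each row group bin.
row_group_sizes = (
    df.join(row_group_association, on="id")
    .group_by("row_group")
    .agg(pl.count())
    .sort("row_group")
)
print(row_group_sizes.head(3))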

Here the dataframe fits in my computer’s memory, so we can split it into smaller chunks based on the row group binning.

split_df_by_group = (
    df
    .join(row_group_association, on='id')
    .partition_by("row_group", include_key=False)
)
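
As a quick check, we can see how many chunks – and hence row groups – we will end up with:

# Number of chunks, each of which becomes one row group below.
print(len(split_df_by_group))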

With pyarrow we can now write each of the smaller dataframes as a separate row group, replacing the previous file.

save_schema = split_df_by_group[0].to_arrow().schema
with ParquetWriter(filename, save_schema, compression="LZ4", version="2.6") as writer:
    for frame in split_df_by_group:
        # Each chunk is small enough that write_table stores it as a single row group
        writer.write_table(frame.to_arrow())

Now we can inspect the statistics again.

parquet_file = ParquetFile(filename)
print(parquet_file.metadata.row_group(0).column(0).statistics)
<pyarrow._parquet.Statistics object at 0x14a6c7ce0>
  has_min_max: True
  min: fab07261-63f9-4e92-8176-5317da0511cd
  max: fc103817-c993-4dc5-ac9f-c7a946592ea5
  null_count: 0
  distinct_count: 0
  num_values: 100000
  physical_type: BYTE_ARRAY
  logical_type: String
  converted_type (legacy): UTF8

Note that a single row group now covers a narrow range of ids, and by construction each id appears in only one row group. This has a big relative impact on the filtering:

query = pl.scan_parquet(filename)
timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1)
0.002844249946065247

Closing thoughts

So the Parquet format can be tweaked to offer better performance without any change in your Polars code. But this kind of effort is probably only worth it if you are going to read the file multiple times.

Finally, if the data is already split into multiple files organized with Hive partitioning, then there is no point in trying to collect everything into a single big Parquet file – Polars can leverage the Hive partitioning directly.
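
For the curious, scanning such a layout might look roughly like this (a sketch only – the directory layout, the partition column region, and the hive_partitioning option in recent Polars versions are assumptions, not something from this post):

import polars as pl

# Hypothetical layout: dataset/region=eu/part-0.parquet, dataset/region=us/part-0.parquet, ...
lazy_df = pl.scan_parquet("dataset/**/*.parquet", hive_partitioning=True)
print(lazy_df.filter(pl.col("region") == "eu").collect())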