Creating Memory Leaks with Parquet Metadata

parquet
How to create and patch a memory leak using the metadata of a Parquet file.
Published

March 1, 2025

# The Setup

In my day job, I develop computer vision models to detect methane emissions in satellite images. We have a training dataset of a few million chips (a subset of the full image from the satellite) that is about 2.5TB on disk stored in a couple thousand parquet files. Now, there’s not a lot of real labelled data for us to use, so a big part of my work is how to generate synthetic methane data.

Recently, our training jobs started running out of CPU memory. Each training epoch takes about 30 minutes, so we try to saturate the GPUs with large batches to train faster. The main bottleneck is data loading and keeping the GPUs saturated with data. To saturate the GPUs, we have multiple workers prefetching batches so the GPUs have high utilization. This means that the CPU needs to hold many extra batches in memory ready and waiting. Suffice to say, our training jobs run with fairly high memory usage; a small change to our training script can OOM (Out of Memory) the whole job.

# The Situation

Recently, our training jobs started OOMing in the first epoch, running out of CPU memory. We tried adjusting the obvious levers: batch size, the number of workers, and prefetching but the jobs were still running out of memory.

O.K. so what changed? Our training data had changed in two ways: 1. We started tracking additional metadata about our generated training dataset, going from ~15 columns to ~150 columns. The additional columns were mostly float64 data type with a few string or list of string columns. 2. The row group size in the parquet files decreased from 10 to 1, meaning, each row group has a single row.

The new columns aren’t loaded during the training loop, so this couldn’t be the cause of the memory explosion.

We decreased the row group size to 1 so that random sampling of training examples could be truly random. For performance reasons, we were randomly sampling row groups because Parquet does not support partial row group reads. It’s much faster to use the whole row group as a “sample” rather than an individual row because it avoids additional IO from loading the same row group multiple times. With row groups of 10 rows, we used all 10 rows, but then the model always saw those 10 rows together. This is actually even worse because due to how the data is generated, all the samples in a file are from the same MGRS tile. So the rows in a row group are correlated. With row groups of 1, we can still sample by row groups but now batches are composed of truly random samples.

We had benchmarked data loading with row groups of 10 and row groups of 1 and found them comparable. So why not switch and have better randomization?

# Finding the Source

# Experimenting

Let’s inspect the metadata for a file of 200 rows, 15 columns, and row group size of 10.

Code
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

num_cols = 15
num_rows = 1_000
data = {f"{x}": np.random.rand(num_rows) for x in range(num_cols)}
df = pd.DataFrame(data)
df.head()

file_path = "data.parquet"
df.to_parquet(
    file_path,
    compression="zstd",
    compression_level=9,
    row_group_size=10,
)

file = pq.ParquetFile(file_path)
print(file.metadata)
<pyarrow._parquet.FileMetaData object at 0x103c73ec0>
  created_by: parquet-cpp-arrow version 23.0.1
  num_columns: 15
  num_rows: 1000
  num_row_groups: 100
  format_version: 2.6
  serialized_size: 163754


The metadata for this parquet file is 14,895 bytes or ~0.14 MB. Now let’s see a file of 150 columns with row group size of 1.

Code
num_cols = 150
num_rows = 1_000
data = {f"{x}": np.random.rand(num_rows) for x in range(num_cols)}
df = pd.DataFrame(data)
df.head()

file_path = "data.parquet"
df.to_parquet(
    file_path,
    compression="zstd",
    compression_level=9,
    row_group_size=1,
)

file = pq.ParquetFile(file_path)
print(file.metadata)
<pyarrow._parquet.FileMetaData object at 0x1088eeb10>
  created_by: parquet-cpp-arrow version 23.0.1
  num_columns: 150
  num_rows: 1000
  num_row_groups: 1000
  format_version: 2.6
  serialized_size: 16139711


The metadata for this parquet file is 14,642,267 or 14MB, that’s massive for just metadata!

O.K. we’ve discovered that the metadata of our Parquet files had increased, but that should only be held in memory when reading from the file. To randomly sample from our dataset, we create an mapping of the index to the file and row group and then randomly sample the index. Each time a ParquetFile is initialized, it reads the file metadata into memory. This is expensive to do each time we read from the file, so we cache the object.

Code
import functools
from dataclasses import dataclass

from torch.utils.data import Dataset

@dataclass
class Partition:
    """Store information about the partition for faster retrieval."""

    file: str
    row_group: int


@functools.cache
def create_parquet_file(file: str) -> pq.ParquetFile:
    """Caches the creation of the ParquetFile."""
    return pq.ParquetFile(file)


class ArrowDataset(Dataset):

    def __init__(self, dataset: ds.FileSystemDataset) -> None:
    self.dataset = dataset
        self.partitions: list[Partition] = self.create_partition_mapping(self.dataset)

    def create_partition_mapping(self, dataset: ds.Dataset) -> list[Partition]:
        """Create a mapping of partition number to the file and row_group."""
        partitions = []

        for file in dataset.files:
            parquet_file = create_parquet_file(file)

            for row_group in range(parquet_file.num_row_groups):
                partitions.append(Partition(file, row_group))

        return partitions


    def __len__(self) -> int:
        """Return the number of row_groups in the dataset.  This should correspond to RecordBatches."""
        return len(self.partitions)


Our training data is roughly 2,000 files with 1,000 rows per file. At 14MB per file, that’s 27GB of memory just for metadata! But it gets worse! Since we train with 4 GPUs and have 2 workers per GPU and each of those workers has its own process, that means we’re holding roughly 27GB * 4 * 2 = 216GB of metadata in memory! That’s atrocious!

As it turns out, there are three variables that contribute to most of the metadata size (the number of rows being constant): the row group size, the number of columns, and the column name length.

Code
from pathlib import Path

import altair as alt

num_rows = 1_000
num_columns = []
num_rg = []
col_name_lengths = []
metadata_size = []

for num_cols in [1, 10, 50, 100, 150]:
    for name_length in [1, 10, 50, 100]:
        col_name = "x" *  name_length
        data = {f"{x}{col_name}": np.random.rand(num_rows) for x in range(num_cols)}
        df = pd.DataFrame(data)

        for row_group_size in [1_000, 100, 10, 1]:
            path = f"file_{num_cols}cols_{row_group_size}rgsize_{name_length}namelen.parquet"
            df.to_parquet(
                path,
                compression="zstd",
                compression_level=9,
                row_group_size=row_group_size,
            )
            file = pq.ParquetFile(path)

            num_columns.append(num_cols)
            num_rg.append(row_group_size)
            metadata_size.append(file.metadata.serialized_size)
            col_name_lengths.append(name_length)

df = pd.DataFrame({
    "num_cols": num_columns,
    "row_group_size": num_rg,
    "col_name_lengths": col_name_lengths,
    "metadata_size_b": metadata_size,
}).assign(
    metadata_size_mb=lambda df: df.metadata_size_b / 1024**2,
)


When adding the new columns to our dataset, we used long descriptive names which turns out to also be a contributing factor.

TODO: talk about chart

# Removing the Memory Landmines

Since statistics are calculated for each row group, increasing the number of row groups also increases the amount of statistics calculated and stored in the metadata. The statistics are used during predicate push down to check if the row group matches the predicate and if not, the row group can be skipped. Smaller row groups reduces the utility of the statistics, and storing statistics for a single row is just silly. So we can remove the statistics for our purposes.

Row groups are also encoded (actually the column chunks in a row group are encoded). Encoding small row groups doesn’t allow for great compression and for our use case, encoding the row groups increased the size of the metadata. So we can remove the encoding.

Parquet files created with Pandas or PyArrow, by default, include the Arrow schema as well. The Arrow schema includes additional information. to faithfully recreate the original Arrow data. We aren’t using complex types that would benefit from this, so we can remove the extra schema.

Shrinking the size of the metadata boils down to three lines when writing the parquet files:

Code
num_cols = 150
num_rows = 1_000
data = {f"{x}": np.random.rand(num_rows) for x in range(num_cols)}
df = pd.DataFrame(data)
df.head()

file_path = "data.parquet"
df.to_parquet(
    file_path,
    compression="zstd",
    compression_level=9,
    row_group_size=1,
    write_statistics=False,
    store_schema=False,
    use_dictionary=False
)

file = pq.ParquetFile(file_path)
print(file.metadata)
<pyarrow._parquet.FileMetaData object at 0x108d7d170>
  created_by: parquet-cpp-arrow version 23.0.1
  num_columns: 150
  num_rows: 1000
  num_row_groups: 1000
  format_version: 2.6
  serialized_size: 6934573


The size of the metadata is now 5,737,509 bytes or 5.4MB, that’s 40% of the initial 14MB.

And we can rerun the experiment with the new parameters.


So we were able to reduce the metadata size by 60% just by changing some parameters when writing the file. From the chart, we can see that by reducing the average column name length from 50 characters to 20 characters, we can shave off another few MB per file. That’s simple enough.

We can shave another another few MB off by reducing the number of columns. That’s not so simple. We want to store that extra data with our training data, it’s useful for analysis and debugging. Parquet isn’t great for storing wide data (lots of columns) but 150 columns doesn’t seem that wide. We don’t want to separate the extra data into separate files because that requires tracking two datasets and joining them; it’s doable but it’s extra work. We could also reduce the extra columns by grouping them all into a single column of JSON or pyarrow structs. This works really well for reducing the size of the metadata but it necessitates knowing exactly which columns to group together and this is variable as our generated training data is updated regularly and has a few configurations. So that would also be a bit of a headache.

# You’re Holding it Wrong

Now, you might be thinking, “Parquet isn’t intended for these workloads, why not use a more appropriate file format?” And to that I say, yes it isn’t a typical workload but I challenge you to find a better file format that is efficient for random access patterns and still support projection pushdowns. Our use case should be fairly common for training machine learning models. There is already a Pull Request in the Arrow project to make Parquet metadata reading faster for random access patterns.

The solution we’re now using, Parquet files without metadata nor encoding isn’t all that different from the Arrow format (A.K.A. Feather A.K.A. Arrow I.P.C.). We might consider storing our training data in Arrow format as PyArrow can directly read the bytes on disk without any additional processing. In theory, this could be faster than Parquet for our workloads but our initial benchmarks indicated that storing in Arrow format was no faster than storing in Parquet format.

We also tried selecting rows directly with pa.dataset.take() as that API is a bit cleaner and seems like the preferred way to select rows from a dataset. However, this was roughly 30% slower than rolling our solution.

# Resources