Polars и Pyspak: мощная комбинация для эффективной обработки данных и манипулирования ими

В этой статье мы исследуем использование Python Polars и Apache Spark для обработки и объединения больших наборов данных. Хотя приведенное ниже упражнение предназначено почти только для развлечения, оно имеет практическое применение для аналитиков данных и инженеров, которые часто работают с большими наборами данных.

Если вы еще этого не сделали, ознакомьтесь с моими предыдущими статьями для краткого ознакомления с библиотекой Polars (Часть 1, Часть 2 и Часть 3). Следите за новыми статьями из этой серии.

С увеличением объема и разнообразия данных, доступных сегодня, масштабируемые облачные решения часто используются для таких задач. Однако не у всех есть доступ к этим решениям или они хотели бы избежать связанных с ними затрат на вычисления. Вот где пригодятся такие инструменты, как Polars и Spark.

  • Polars — это библиотека Python для обработки данных, в которой в качестве базовой структуры данных используется Apache Arrow. С помощью Polars пользователи могут эффективно работать с большими наборами данных в памяти, используя такие инструменты, как фильтрация, агрегирование, группировка и обработка. Apache Arrow предназначен для эффективной обработки в памяти и передачи данных между различными системами и языками. Столбчатый формат данных обеспечивает эффективную обработку данных, а поддержка выделения памяти с нулевым копированием гарантирует, что Polars может обрабатывать очень большие наборы данных с минимальным использованием памяти.
  • Apache Parquet, столбчатый формат хранения, идеально подходит для эффективного хранения и извлечения больших наборов данных. Сжатый столбчатый формат Parquet поддерживает различные типы данных и сложные структуры, что делает его подходящим для многих задач обработки данных.
  • Apache Spark – это платформа распределенной обработки данных, которая может работать как с Apache Arrow, так и с Apache Parquet. С помощью Spark пользователи могут работать с большими наборами данных параллельно на нескольких узлах в кластере.

Вместе Polars, Spark и Parquet обеспечивают мощную комбинацию для работы с большими наборами данных в памяти и для хранения, обеспечивая эффективную обработку данных и манипулирование ими для широкого круга приложений, интенсивно использующих данные.

К веселью. В этом упражнении мы создаем фальшивый набор данных из 100 миллионов строк, используя Polars и Spark. Пример кода ниже имеет две функции:

  • Функция create_fake_data создает фрейм данных из 20 миллионов строк, используя Polars и библиотеку faker, и материализует его в виде файла Parquet.
  • Затем функция «spark_union» берет 20 миллионов строк и добавляет к ним 5 раз, чтобы с помощью Spark создать фрейм данных из 100 миллионов строк.

На моем ноутбуке (Intel i7–1260P 12-го поколения и 16 ГБ ОЗУ) выполнение двух функций в среднем занимает около 2 минут.

import os
from datetime import datetime, timedelta
import time
import random

import polars as pl
from faker import Faker

from large_parquet_spark import spark_union

# optional, in case your files are not in the working directory
os.chdir("/path/")
file_path = "/path/"

Faker.seed(0) #optional, seed value if you want to return consistent data
fake = Faker("en_CA") #change this to your locale
random.seed(10) #optional, seed value if you want to return consistent data

# added to measure elapsed time
start = time.perf_counter()

# Set number of rows and chunks for the Polars to create
n_rows = 20_000_000
n_chunks = 15
chunk_size = int(n_rows / n_chunks)

# Set date range for the dataset
start_date = datetime(2022, 1, 1)
end_date = datetime(2022, 12, 31)

def create_fake_data():
    '''
    Generates a fake dataset with 5 columns. 
    Number of rows can be defined. Data generated in chunks using 
    Python Polars library (faster for initial smaller number of rows).
    Returns a dataframe materialized as a parquet file.
    '''
    #Generate random dates between the date range
    days = (end_date - start_date).days
    dates = [
        start_date + timedelta(days=random.randint(0, days))
        for i in range(chunk_size)
    ]
    random.shuffle(dates)
    
    # Create a list of 10 random alpha-numeric values for the 'Item' column
    items = [
        "".join(
            random.choices(
                "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890",
                k=10,
            )
        )
        for i in range(100)
    ]
    
    # Create a list of 20 fake company names
    company = [fake.company() for i in range(20)]
    
    # Create a DataFrame with five columns
    # loop through each chunk and append it to the Parquet file
    
    df = None
    df_chunk = None
    
    for i in range(1, n_chunks + 1):
    
        # Generate random dates between the date range
        days = (end_date - start_date).days
        chunk_dates = [
            start_date + timedelta(days=random.randint(0, days))
            for i in range(chunk_size)
        ]
        random.shuffle(chunk_dates)
    
        # Create a DataFrame with five columns
        df_chunk = pl.DataFrame(
            {
                "Date": [d.date() for d in chunk_dates],
                "Company": [random.choice(company) for i in range(chunk_size)],
                "Item": [random.choice(items) for i in range(chunk_size)],
                "On_hand": [random.randint(0, 100) for i in range(chunk_size)],
                "sales": [random.randint(0, 1000) for i in range(chunk_size)],
            }
        ).with_columns(pl.col("Date").cast(pl.Date)) #
    
        if i == 1:
            df = df_chunk
        else:
            df = df.collect().vstack(df_chunk)
    
        df = df.lazy()
        
    
        if df is not None:  
            df.sink_parquet(
                file_path + "chunk.parquet",
                row_group_size=chunk_size,
                slice_pushdown=True,
                compression="zstd",
            )

create_fake_data()

stop = time.perf_counter()
elapsed = stop - start
print(f"creating dataframe with {n_rows} rows took {elapsed}")

# See next section for details of this block
# take the 20M row df created by Polars and append that 5 times using
# Spark to create a 100M dataframe materialized as a parquet file
start = time.perf_counter()

row_count = spark_union()

stop = time.perf_counter()
elapsed = stop - start
print(f"creating dataframe with {row_count} rows took {elapsed}")

Функция «create_fake_data» возвращает 20 миллионов строк (примерно 165 МБ) за 111 секунд. Следующий раздел — модуль pyspark.

import sys
from datetime import datetime, timedelta
import time
import os

from pyspark.sql import SparkSession

os.chdir("/path/")
file_path = "/path/"

start = time.perf_counter()

def spark_union():
    # create a SparkSession
    spark = SparkSession.builder.appName("Append Parquet Files").getOrCreate()

    df1 = spark.read.parquet(file_path + "chunk.parquet")
    df2 = spark.read.parquet(file_path + "chunk.parquet")
    df3 = spark.read.parquet(file_path + "chunk.parquet")
    df4 = spark.read.parquet(file_path + "chunk.parquet")
    df5 = spark.read.parquet(file_path + "chunk.parquet")

    # union the two dataframes
    df_combined = df1.union(df2).union(df3).union(df4).union(df5)

    # write out the combined dataframe to a new Parquet file
    # replace 'overwrite' with 'append', even faster but creates multiple files
    df_combined.write.mode("overwrite").parquet(
        file_path + "large.parquet"
    )  

    # read in the Parquet file lazily
    df = spark.read.parquet(file_path + "large.parquet")

    # view the contents of the DataFrame
    df.show()
    df.printSchema()

    # get the row count of the DataFrame
    row_count = df.count()

    # print the row count
    return row_count

    # stop the SparkSession
    spark.stop()

Функция «spark_union» возвращает 100 миллионов строк (примерно 460 МБ) за 16 секунд.

В заключение, хотя это упражнение было сделано для развлечения, я надеюсь, что оно также демонстрирует практическое применение Polars и Spark. Оба инструмента являются мощными и входят в набор инструментов предприятий и частных лиц, работающих с большими наборами данных.

Я надеюсь, что вы найдете эту серию полезной, дорогой читатель. Если у вас есть опыт работы с Polars и Spark, но вы предпочитаете разные инструменты для работы с большими наборами данных, поделитесь своими мыслями в комментариях. Ваши идеи могут помочь другим аналитикам данных и инженерам выбрать лучшие инструменты для своих конкретных нужд.