Polars и Pyspak: мощная комбинация для эффективной обработки данных и манипулирования ими
В этой статье мы исследуем использование Python Polars и Apache Spark для обработки и объединения больших наборов данных. Хотя приведенное ниже упражнение предназначено почти только для развлечения, оно имеет практическое применение для аналитиков данных и инженеров, которые часто работают с большими наборами данных.
Если вы еще этого не сделали, ознакомьтесь с моими предыдущими статьями для краткого ознакомления с библиотекой Polars (Часть 1, Часть 2 и Часть 3). Следите за новыми статьями из этой серии.
С увеличением объема и разнообразия данных, доступных сегодня, масштабируемые облачные решения часто используются для таких задач. Однако не у всех есть доступ к этим решениям или они хотели бы избежать связанных с ними затрат на вычисления. Вот где пригодятся такие инструменты, как Polars и Spark.
- Polars — это библиотека Python для обработки данных, в которой в качестве базовой структуры данных используется Apache Arrow. С помощью Polars пользователи могут эффективно работать с большими наборами данных в памяти, используя такие инструменты, как фильтрация, агрегирование, группировка и обработка. Apache Arrow предназначен для эффективной обработки в памяти и передачи данных между различными системами и языками. Столбчатый формат данных обеспечивает эффективную обработку данных, а поддержка выделения памяти с нулевым копированием гарантирует, что Polars может обрабатывать очень большие наборы данных с минимальным использованием памяти.
- Apache Parquet, столбчатый формат хранения, идеально подходит для эффективного хранения и извлечения больших наборов данных. Сжатый столбчатый формат Parquet поддерживает различные типы данных и сложные структуры, что делает его подходящим для многих задач обработки данных.
- Apache Spark – это платформа распределенной обработки данных, которая может работать как с Apache Arrow, так и с Apache Parquet. С помощью Spark пользователи могут работать с большими наборами данных параллельно на нескольких узлах в кластере.
Вместе Polars, Spark и Parquet обеспечивают мощную комбинацию для работы с большими наборами данных в памяти и для хранения, обеспечивая эффективную обработку данных и манипулирование ими для широкого круга приложений, интенсивно использующих данные.
К веселью. В этом упражнении мы создаем фальшивый набор данных из 100 миллионов строк, используя Polars и Spark. Пример кода ниже имеет две функции:
- Функция create_fake_data создает фрейм данных из 20 миллионов строк, используя Polars и библиотеку faker, и материализует его в виде файла Parquet.
- Затем функция «spark_union» берет 20 миллионов строк и добавляет к ним 5 раз, чтобы с помощью Spark создать фрейм данных из 100 миллионов строк.
На моем ноутбуке (Intel i7–1260P 12-го поколения и 16 ГБ ОЗУ) выполнение двух функций в среднем занимает около 2 минут.
import os from datetime import datetime, timedelta import time import random import polars as pl from faker import Faker from large_parquet_spark import spark_union # optional, in case your files are not in the working directory os.chdir("/path/") file_path = "/path/" Faker.seed(0) #optional, seed value if you want to return consistent data fake = Faker("en_CA") #change this to your locale random.seed(10) #optional, seed value if you want to return consistent data # added to measure elapsed time start = time.perf_counter() # Set number of rows and chunks for the Polars to create n_rows = 20_000_000 n_chunks = 15 chunk_size = int(n_rows / n_chunks) # Set date range for the dataset start_date = datetime(2022, 1, 1) end_date = datetime(2022, 12, 31) def create_fake_data(): ''' Generates a fake dataset with 5 columns. Number of rows can be defined. Data generated in chunks using Python Polars library (faster for initial smaller number of rows). Returns a dataframe materialized as a parquet file. ''' #Generate random dates between the date range days = (end_date - start_date).days dates = [ start_date + timedelta(days=random.randint(0, days)) for i in range(chunk_size) ] random.shuffle(dates) # Create a list of 10 random alpha-numeric values for the 'Item' column items = [ "".join( random.choices( "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=10, ) ) for i in range(100) ] # Create a list of 20 fake company names company = [fake.company() for i in range(20)] # Create a DataFrame with five columns # loop through each chunk and append it to the Parquet file df = None df_chunk = None for i in range(1, n_chunks + 1): # Generate random dates between the date range days = (end_date - start_date).days chunk_dates = [ start_date + timedelta(days=random.randint(0, days)) for i in range(chunk_size) ] random.shuffle(chunk_dates) # Create a DataFrame with five columns df_chunk = pl.DataFrame( { "Date": [d.date() for d in chunk_dates], "Company": [random.choice(company) for i in range(chunk_size)], "Item": [random.choice(items) for i in range(chunk_size)], "On_hand": [random.randint(0, 100) for i in range(chunk_size)], "sales": [random.randint(0, 1000) for i in range(chunk_size)], } ).with_columns(pl.col("Date").cast(pl.Date)) # if i == 1: df = df_chunk else: df = df.collect().vstack(df_chunk) df = df.lazy() if df is not None: df.sink_parquet( file_path + "chunk.parquet", row_group_size=chunk_size, slice_pushdown=True, compression="zstd", ) create_fake_data() stop = time.perf_counter() elapsed = stop - start print(f"creating dataframe with {n_rows} rows took {elapsed}") # See next section for details of this block # take the 20M row df created by Polars and append that 5 times using # Spark to create a 100M dataframe materialized as a parquet file start = time.perf_counter() row_count = spark_union() stop = time.perf_counter() elapsed = stop - start print(f"creating dataframe with {row_count} rows took {elapsed}")
Функция «create_fake_data» возвращает 20 миллионов строк (примерно 165 МБ) за 111 секунд. Следующий раздел — модуль pyspark.
import sys from datetime import datetime, timedelta import time import os from pyspark.sql import SparkSession os.chdir("/path/") file_path = "/path/" start = time.perf_counter() def spark_union(): # create a SparkSession spark = SparkSession.builder.appName("Append Parquet Files").getOrCreate() df1 = spark.read.parquet(file_path + "chunk.parquet") df2 = spark.read.parquet(file_path + "chunk.parquet") df3 = spark.read.parquet(file_path + "chunk.parquet") df4 = spark.read.parquet(file_path + "chunk.parquet") df5 = spark.read.parquet(file_path + "chunk.parquet") # union the two dataframes df_combined = df1.union(df2).union(df3).union(df4).union(df5) # write out the combined dataframe to a new Parquet file # replace 'overwrite' with 'append', even faster but creates multiple files df_combined.write.mode("overwrite").parquet( file_path + "large.parquet" ) # read in the Parquet file lazily df = spark.read.parquet(file_path + "large.parquet") # view the contents of the DataFrame df.show() df.printSchema() # get the row count of the DataFrame row_count = df.count() # print the row count return row_count # stop the SparkSession spark.stop()
Функция «spark_union» возвращает 100 миллионов строк (примерно 460 МБ) за 16 секунд.
В заключение, хотя это упражнение было сделано для развлечения, я надеюсь, что оно также демонстрирует практическое применение Polars и Spark. Оба инструмента являются мощными и входят в набор инструментов предприятий и частных лиц, работающих с большими наборами данных.