от Мак Макой

В недавней публикации мы рассказали, почему выбрали Apache Airflow для организации конвейеров машинного обучения в Chick-fil-A. В этом посте мы углубимся в то, как мы улучшили Airflow для поддержки управления версиями данных для машинного обучения.

Воздушный поток в его ядре

По своей сути Airflow — это инструмент оркестровки рабочего процесса, и в этом он прекрасен. Существуют сотни интеграций с открытым исходным кодом, которые упрощают создание рабочего процесса без большого количества кода.

Хотя большая часть пользователей Airflow создает конвейеры данных, Airflow изначально не поддерживает перемещение больших объемов данных между задачами. Он поддерживает XCom, но они не предназначены для передачи наборов данных. Решение состоит в том, чтобы передавать ссылки на пути хранилища облачных объектов (например, S3) между задачами.

Пример:

# 1: Define S3 path
athena_output_location = "s3://my_bucket/source_data/athena_task/"

athena_task = AthenaOperator(
    task_id="athena_task",
    query="SELECT * FROM locations",
    database="enterprise",
    # 2: Define where to write query output
    output_location=athena_output_location
)
sagemaker_training_task = SageMakerTrainingOperator(
    task_id="sagemaker_training_task",
    config={
        "InputDataConfig": {
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    # 3: Define where to read training input
                    "S3Uri": athena_output_location,
                    ...
                },
                ...
            }
        },
        ...
    }
)
athena_task >> sagemaker_training_task

Это выполняет свою работу, но нам нужно еще несколько улучшений для конвейеров машинного обучения.

Данные должны быть версионными

Инженеры и специалисты по данным знают о важности контроля версий своего кода с помощью таких инструментов, как GitHub. При разработке моделей ваши данные так же важны, как и ваш код, если не важнее.

Знание точного набора данных, на котором обучалась модель, важно для объяснения и воспроизведения вашей модели.

Заинтересованные стороны, клиенты или правительства могут попросить вас объяснить, почему ваша модель приняла решение. Знание состава ваших тренировочных данных может помочь вам ответить на них.

В любой момент вам может потребоваться перезапустить конвейер обучения. Например, представьте, что задание по обучению провалилось на выходных. Вы обнаружили проблему и хотите повторно запустить задание обучения в понедельник. Данные обучения должны оставаться в своем состоянии с выходных. Любые обновления исходного набора данных после запланированного запуска не должны использоваться, поскольку они могут испортить модель.

Версии данных в Airflow

До того, как мы внедрили Airflow, некоторые из наших специалистов по данным использовали DVC (Контроль версий данных) для оркестровки конвейеров своих моделей. Как следует из названия, DVC поддерживает управление версиями данных.

Чтобы включить управление версиями данных в Airflow, мы используем макросы Airflow для параметризации путей S3 на основе текущего запуска и задачи DAG. Изменение примера из предыдущего…

athena_output_location = "s3://my_bucket/{{ run_id }}/source_data/{{ task.task_id }}/"

Это гарантирует, что каждый запуск DAG будет иметь собственный путь S3. Пользователи могут легко найти данные конкретного запуска конвейера в S3, выполнив поиск идентификатора запуска в пути S3.

Специалистам по данным не нужно управлять путями S3

Параметризация путей S3 с помощью макросов Airflow позволяет управлять версиями данных, но мы хотим абстрагироваться от необходимости постоянно генерировать и передавать пути нашим специалистам по данным.

Что, если бы вам вообще не нужно было ссылаться на пути S3? Вместо этого вам нужно было ссылаться на каждый набор данных только по простому подходящему имени? Вот почему мы создали Data Conduits.

Data Conduit — это простая абстракция вокруг хранилища данных, такого как путь S3. У него есть имя, и на него ссылаются в DAG.

Вот пример определения Data Conduit. Вы передаете имя и корзину S3 для хранения данных.

menu_items = S3Conduit(
  dc_name="menu_items", 
  bucket="my-bucket"
)

Внутри S3Conduit он создает URL-адрес S3, шаблон которого основан на идентификаторе запуска DAG:

class S3Conduit:

def __init__(self, dc_name, bucket, url=None):
    self.dc_name = dc_name
    self.bucket = bucket
    if url is None:
      self.url = "s3://" + self.bucket + "/{{ run_id }}/" + self.dc_name + "/"
    else:
      self.url = url

Здесь путь передачи данных menu_items будет s3://my-bucket/{{ run_id }}/menu_items/.

Используемые каналы передачи данных

Мы создали версии Chick-fil-A наших наиболее часто используемых операторов Airflow. Каждый из них наследует свою версию с открытым исходным кодом, чтобы не изобретать велосипед и получать выгоду от будущих улучшений с открытым исходным кодом.

Мы добавили несколько улучшений для каждого из наших операторов Airflow, и одно из них — поддержка каналов передачи данных.

Каждый из наших операторов может быть классифицирован как источник данных, приемник данных или преобразователь данных. Каждый оператор также может быть классифицирован как узел на графе, а каналы передачи данных — это ребра узлов, которые их соединяют.

CFARedshiftToS3Operator — это источник данных, который выгружает данные из Redshift в S3.

query_menu_items = CFARedshiftToS3Operator(
  query="""
    SELECT *
    FROM menu.menu_items
    WHERE business_date = date '{{ ds }}''
  """,
  destination=S3Conduit(
    dc_name="menu_items", 
    bucket="my-bucket"
  ),
  ...
)

Обратите внимание, что пункт назначения определяется S3Conduit. Оператор выгрузит данные в шаблонный путь S3, определенный в файле S3Conduit.

CFAProcessingOperator — оператор преобразователя данных. Он запускает скрипт Python или R в контейнере Docker с помощью Sagemaker Processing. Он также автоматически монтирует все восходящие потоки данных в контейнер. Это означает, что специалисты по данным могут ссылаться на неизменяемый путь к локальному файлу в своем скрипте для чтения каждого набора данных вместо управления постоянно меняющимися путями к файлам S3.

Вот пример CFAProcessingOperator:

feature_engineering = CFAProcessingOperator(
  script="src/scripts/feature_engineering.py",
  image="12345678.dkr.ecr.us-east-1.amazonaws.com/my-image:tag",
  outputs=[
    S3Conduit(
      dc_name="training_set", 
      bucket="my-bucket"
    )
  ]
)

[query_menu_items, query_customer_orders] >> feature_engineering

Поскольку query_menu_items и query_customer_orders являются восходящими задачами feature_engineering, каналы данных menu_items и customer_orders будут доступны для сценария feature_engineering.py в контейнере Docker через монтирование Docker.

menu_items_df      = pd.read_parquet("/opt/ml/processing/input/menu_items/")
customer_orders_df = pd.read_parquet("/opt/ml/processing/input/customer_orders/")

Выходные данные разработки признаков также могут быть записаны в монтирование выходных данных.

training_df.to_parquet("/opt/ml/processing/output/training_set/")

Sagemaker Processing заботится о копировании выходных данных из локального пути в путь S3, определенный выходными данными S3Conduit training_set.

Наконец, чтобы завершить пример, мы можем записать training_set обратно в Redshift, используя CFAS3ToRedshiftOperator.

training_set_to_redshift = CFAS3ToRedshiftOperator(
  dc_input="training_set",
  schema="menu",
  table="training_set"
  ...
)

[query_menu_items, query_customer_orders] >> feature_engineering
feature_engineering >> training_set_to_redshift

Краткое содержание

Включение управления версиями данных по умолчанию и беспрепятственное перемещение данных между задачами с одной ответственностью — важные компоненты конвейерной инфраструктуры машинного обучения.

Благодаря функциям, которые мы добавили, наши специалисты по данным и инженеры меньше внимания уделяют управлению версиями и перемещению данных, а больше — обеспечению ценности для бизнеса!

Помогут ли эти шаблоны и усовершенствования Airflow вашим командам? Или как вы обеспечиваете управление версиями данных и беспрепятственное перемещение данных в своих командах? Дайте нам знать об этом в комментариях!