Построение динамических конвейеров ML с конфигурацией Prefect, Dask и YAML и

Введение

В области машинного обучения (ML) поиск оптимального сочетания алгоритмов и шагов предварительной обработки для данной задачи имеет решающее значение. Однако оценка нескольких комбинаций конвейеров может потребовать много времени и больших вычислительных ресурсов. В этой статье мы представляем решение Swiss Army Knife, которое использует конфигурацию YAML, Prefect и Dask для параллельной оценки нескольких комбинаций конвейеров машинного обучения. Это решение предлагает масштабируемый и эффективный подход для изучения широкого диапазона комбинаций алгоритмов и шагов предварительной обработки.

Конфигурация YAML для комбинаций конвейеров

YAML (еще один язык разметки) предоставляет гибкий и удобочитаемый формат для указания комбинаций конвейеров. С конфигурацией YAML мы можем определять различные алгоритмы, этапы предварительной обработки и их параметры структурированным образом. Это позволяет нам легко модифицировать и экспериментировать с различными комбинациями конвейеров без изменения базового кода.

Префект и Даск для параллельной оценки

Prefect, мощная система управления рабочими процессами, в сочетании с Dask, инфраструктурой параллельных вычислений, позволяет нам параллельно выполнять комбинации конвейеров машинного обучения. Prefect предоставляет интуитивно понятный интерфейс для определения и организации сложных рабочих процессов, а Dask позволяет выполнять распределенные вычисления, эффективно используя доступные ресурсы. Вместе Prefect и Dask повышают масштабируемость и скорость оценки комбинаций нескольких конвейеров.

Внедрение швейцарского армейского ножа

Для реализации решения Swiss Army Knife мы разработали фрагмент кода Python, который включает конфигурацию YAML, Prefect и Dask. Фрагмент кода включает определения задач для каждого этапа конвейера машинного обучения, таких как загрузка данных, предварительная обработка, обучение модели и оценка.

Основной функционал выглядит следующим образом:

  1. Чтение файлов YAML. Решение считывает файлы YAML, содержащие различные комбинации алгоритмов и этапов предварительной обработки.
  2. Генерация комбинаций: Решение генерирует все возможные комбинации алгоритмов и шагов препроцессинга, указанные в файлах YAML.
    в этом случае генерируются и работают параллельно 2 пайплайна:
    один со скалером и один без него.
  3. Параллельная оценка. Используя возможности параллельных вычислений Dask, решение оценивает каждую комбинацию конвейеров параллельно. Это значительно ускоряет процесс оценки.
  4. Агрегирование результатов. Решение собирает результаты оценки и определяет наилучшую комбинацию конвейеров на основе указанной метрики.

Вот фрагмент кода:

# Import necessary libraries and modules
import itertools
import yaml
from collections import OrderedDict
from prefect import flow, task, unmapped
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
import os
from typing import Any
from prefect_dask.task_runners import DaskTaskRunner
import pandas as pd
def load_data(shared_pipeline_memory, **parameters):
    #currently its a dummy param, cause we are loading the iris dataset
    #data_path = parameters['data_path']
    #print(data_path)
    iris = load_iris(as_frame=True)
    X = iris.data
    y = iris.target
    shared_pipeline_memory['X'] = X
    shared_pipeline_memory['y'] = y
    return shared_pipeline_memory

def imputer(shared_pipeline_memory, **parameters):
    fill_value = parameters.get('fill_value')
    shared_pipeline_memory['X'].fillna(fill_value, axis=1, inplace=True)
    return shared_pipeline_memory

def split_train_test(shared_pipeline_memory, **parameters):
    test_ratio = parameters.get('test_ratio')
    X_train, X_test, y_train, y_test = train_test_split(shared_pipeline_memory.get('X'), shared_pipeline_memory.get('y'), test_size=test_ratio, random_state=42)
    shared_pipeline_memory['X_train'] = X_train
    shared_pipeline_memory['X_test'] = X_test
    shared_pipeline_memory['y_train'] = y_train
    shared_pipeline_memory['y_test'] = y_test
    return shared_pipeline_memory

def scaler_method(shared_pipeline_memory, **parameters):
    with_mean = parameters.get('with_mean')
    with_std = parameters.get('with_std')
    scaler = StandardScaler(with_mean=with_mean,with_std=with_std)
    X_train_scaled = scaler.fit_transform(shared_pipeline_memory.get('X_train'))
    X_test_scaled = scaler.transform(shared_pipeline_memory.get('X_test'))
    shared_pipeline_memory['X_train'] = X_train_scaled
    shared_pipeline_memory['X_test'] = X_test_scaled
    return shared_pipeline_memory

def passthrough_method(shared_pipeline_memory, **parameters):
    return shared_pipeline_memory

def train_model(shared_pipeline_memory, **parameters):
    model = CalibratedClassifierCV(LogisticRegression())
    model.fit(shared_pipeline_memory.get('X_train'), shared_pipeline_memory.get('y_train'))
    shared_pipeline_memory['model'] = model
    return shared_pipeline_memory


def evaluate_model(shared_pipeline_memory, **parameters):
    model = shared_pipeline_memory.get('model')
    X_test = shared_pipeline_memory.get('X_test')
    y_test = shared_pipeline_memory.get('y_test')
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

def main_flow():
    # List of YAML files
    files = ["train.yaml"]
    files = Path(os.path.dirname(__file__)+"/").glob('train.yaml')
    for file in files:
        with open(file, 'r') as stream:
            try:
                inputdict = OrderedDict(yaml.safe_load(stream))
            except BaseException as exc:
                print(exc)

        # Extract steps from the current YAML file
        steps = [inputdict[key] for key in inputdict]
        combinations = list(itertools.product(*steps))

    metricHolder = {}
    best_pipeline = distributed_flow(combinations, metricHolder)
    print("printing best pipeline and it's metric")
    print(best_pipeline)


@flow(task_runner=DaskTaskRunner())
def distributed_flow(combinations , metricHolder):
    perfect_future = run_pipeline_logic.map(combinations, unmapped(metricHolder),unmapped(globals()))
    pipelines_overall_results = pd.DataFrame([x.result() for x in perfect_future], columns=['pipeline', 'accuracy'])
    res = pipelines_overall_results.sort_values('accuracy', ascending=False)
    res = res.reset_index(drop=True)
    return [res.head(1)]

@task(name="train-model")
def run_pipeline_logic(combination: Any, metricHolder,globals_ctx):
    return train_single_pipe(combination, metricHolder,globals_ctx)

def train_single_pipe(config: Any,metricHolder,globals_ctx):
    result = None
    shared_pipeline_memory = {}
    for step in config:
        task_to_run = globals_ctx.get(step.get('task'))
        parameters = step.get('parameters')
        shared_pipeline_memory = task_to_run(shared_pipeline_memory, **parameters)

    print(f"flow : {config} has accuracy : {shared_pipeline_memory}")
    return config , shared_pipeline_memory

if __name__ == "__main__":

    main_flow()

файл определения train.yaml:

0:
  - name: Load Data
    task: load_data
    parameters:
      data_path: path/to/raw_data.csv
1:
  - name: Impute Data
    task: imputer
    parameters:
      fill_value: 0.8
2:
  - name: Split Data
    task: split_train_test
    parameters:
      test_ratio: 0.2
3:
  - name: Scale Data
    task: scaler_method
    parameters:
      with_mean: True
      with_std: True
  - name: No Scaling Data
    task: passthrough_method
    parameters: {}
4:
  - name: Cross-Validation of model
    task: train_model
    parameters:
      num_folds: 5
5:
  - name: evaluate model
    task: evaluate_model
    parameters:
      num_folds: 5

Заключение

Решение Swiss Army Knife, сочетающее конфигурацию YAML, Prefect и Dask, предлагает масштабируемый и эффективный подход для оценки нескольких комбинаций конвейеров машинного обучения. Благодаря распараллеливанию процесса оценки это решение значительно сокращает время, необходимое для изучения различных комбинаций алгоритмов и этапов предварительной обработки. Благодаря гибкости конфигурации YAML, возможностям оркестровки Prefect и распределенной вычислительной мощности Dask специалисты по машинному обучению могут эффективно определить наилучший конвейер для своей конкретной задачи.
если вам нравится это решение — хлопайте в ладоши :-)