Построение динамических конвейеров ML с конфигурацией Prefect, Dask и YAML и
Введение
В области машинного обучения (ML) поиск оптимального сочетания алгоритмов и шагов предварительной обработки для данной задачи имеет решающее значение. Однако оценка нескольких комбинаций конвейеров может потребовать много времени и больших вычислительных ресурсов. В этой статье мы представляем решение Swiss Army Knife, которое использует конфигурацию YAML, Prefect и Dask для параллельной оценки нескольких комбинаций конвейеров машинного обучения. Это решение предлагает масштабируемый и эффективный подход для изучения широкого диапазона комбинаций алгоритмов и шагов предварительной обработки.
Конфигурация YAML для комбинаций конвейеров
YAML (еще один язык разметки) предоставляет гибкий и удобочитаемый формат для указания комбинаций конвейеров. С конфигурацией YAML мы можем определять различные алгоритмы, этапы предварительной обработки и их параметры структурированным образом. Это позволяет нам легко модифицировать и экспериментировать с различными комбинациями конвейеров без изменения базового кода.
Префект и Даск для параллельной оценки
Prefect, мощная система управления рабочими процессами, в сочетании с Dask, инфраструктурой параллельных вычислений, позволяет нам параллельно выполнять комбинации конвейеров машинного обучения. Prefect предоставляет интуитивно понятный интерфейс для определения и организации сложных рабочих процессов, а Dask позволяет выполнять распределенные вычисления, эффективно используя доступные ресурсы. Вместе Prefect и Dask повышают масштабируемость и скорость оценки комбинаций нескольких конвейеров.
Внедрение швейцарского армейского ножа
Для реализации решения Swiss Army Knife мы разработали фрагмент кода Python, который включает конфигурацию YAML, Prefect и Dask. Фрагмент кода включает определения задач для каждого этапа конвейера машинного обучения, таких как загрузка данных, предварительная обработка, обучение модели и оценка.
Основной функционал выглядит следующим образом:
- Чтение файлов YAML. Решение считывает файлы YAML, содержащие различные комбинации алгоритмов и этапов предварительной обработки.
- Генерация комбинаций: Решение генерирует все возможные комбинации алгоритмов и шагов препроцессинга, указанные в файлах YAML.
в этом случае генерируются и работают параллельно 2 пайплайна:
один со скалером и один без него. - Параллельная оценка. Используя возможности параллельных вычислений Dask, решение оценивает каждую комбинацию конвейеров параллельно. Это значительно ускоряет процесс оценки.
- Агрегирование результатов. Решение собирает результаты оценки и определяет наилучшую комбинацию конвейеров на основе указанной метрики.
Вот фрагмент кода:
# Import necessary libraries and modules import itertools import yaml from collections import OrderedDict from prefect import flow, task, unmapped from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score from pathlib import Path from sklearn.calibration import CalibratedClassifierCV import os from typing import Any from prefect_dask.task_runners import DaskTaskRunner import pandas as pd def load_data(shared_pipeline_memory, **parameters): #currently its a dummy param, cause we are loading the iris dataset #data_path = parameters['data_path'] #print(data_path) iris = load_iris(as_frame=True) X = iris.data y = iris.target shared_pipeline_memory['X'] = X shared_pipeline_memory['y'] = y return shared_pipeline_memory def imputer(shared_pipeline_memory, **parameters): fill_value = parameters.get('fill_value') shared_pipeline_memory['X'].fillna(fill_value, axis=1, inplace=True) return shared_pipeline_memory def split_train_test(shared_pipeline_memory, **parameters): test_ratio = parameters.get('test_ratio') X_train, X_test, y_train, y_test = train_test_split(shared_pipeline_memory.get('X'), shared_pipeline_memory.get('y'), test_size=test_ratio, random_state=42) shared_pipeline_memory['X_train'] = X_train shared_pipeline_memory['X_test'] = X_test shared_pipeline_memory['y_train'] = y_train shared_pipeline_memory['y_test'] = y_test return shared_pipeline_memory def scaler_method(shared_pipeline_memory, **parameters): with_mean = parameters.get('with_mean') with_std = parameters.get('with_std') scaler = StandardScaler(with_mean=with_mean,with_std=with_std) X_train_scaled = scaler.fit_transform(shared_pipeline_memory.get('X_train')) X_test_scaled = scaler.transform(shared_pipeline_memory.get('X_test')) shared_pipeline_memory['X_train'] = X_train_scaled shared_pipeline_memory['X_test'] = X_test_scaled return shared_pipeline_memory def passthrough_method(shared_pipeline_memory, **parameters): return shared_pipeline_memory def train_model(shared_pipeline_memory, **parameters): model = CalibratedClassifierCV(LogisticRegression()) model.fit(shared_pipeline_memory.get('X_train'), shared_pipeline_memory.get('y_train')) shared_pipeline_memory['model'] = model return shared_pipeline_memory def evaluate_model(shared_pipeline_memory, **parameters): model = shared_pipeline_memory.get('model') X_test = shared_pipeline_memory.get('X_test') y_test = shared_pipeline_memory.get('y_test') y_pred = model.predict(X_test) accuracy = accuracy_score(y_test, y_pred) return accuracy def main_flow(): # List of YAML files files = ["train.yaml"] files = Path(os.path.dirname(__file__)+"/").glob('train.yaml') for file in files: with open(file, 'r') as stream: try: inputdict = OrderedDict(yaml.safe_load(stream)) except BaseException as exc: print(exc) # Extract steps from the current YAML file steps = [inputdict[key] for key in inputdict] combinations = list(itertools.product(*steps)) metricHolder = {} best_pipeline = distributed_flow(combinations, metricHolder) print("printing best pipeline and it's metric") print(best_pipeline) @flow(task_runner=DaskTaskRunner()) def distributed_flow(combinations , metricHolder): perfect_future = run_pipeline_logic.map(combinations, unmapped(metricHolder),unmapped(globals())) pipelines_overall_results = pd.DataFrame([x.result() for x in perfect_future], columns=['pipeline', 'accuracy']) res = pipelines_overall_results.sort_values('accuracy', ascending=False) res = res.reset_index(drop=True) return [res.head(1)] @task(name="train-model") def run_pipeline_logic(combination: Any, metricHolder,globals_ctx): return train_single_pipe(combination, metricHolder,globals_ctx) def train_single_pipe(config: Any,metricHolder,globals_ctx): result = None shared_pipeline_memory = {} for step in config: task_to_run = globals_ctx.get(step.get('task')) parameters = step.get('parameters') shared_pipeline_memory = task_to_run(shared_pipeline_memory, **parameters) print(f"flow : {config} has accuracy : {shared_pipeline_memory}") return config , shared_pipeline_memory if __name__ == "__main__": main_flow()
файл определения train.yaml:
0: - name: Load Data task: load_data parameters: data_path: path/to/raw_data.csv 1: - name: Impute Data task: imputer parameters: fill_value: 0.8 2: - name: Split Data task: split_train_test parameters: test_ratio: 0.2 3: - name: Scale Data task: scaler_method parameters: with_mean: True with_std: True - name: No Scaling Data task: passthrough_method parameters: {} 4: - name: Cross-Validation of model task: train_model parameters: num_folds: 5 5: - name: evaluate model task: evaluate_model parameters: num_folds: 5
Заключение
Решение Swiss Army Knife, сочетающее конфигурацию YAML, Prefect и Dask, предлагает масштабируемый и эффективный подход для оценки нескольких комбинаций конвейеров машинного обучения. Благодаря распараллеливанию процесса оценки это решение значительно сокращает время, необходимое для изучения различных комбинаций алгоритмов и этапов предварительной обработки. Благодаря гибкости конфигурации YAML, возможностям оркестровки Prefect и распределенной вычислительной мощности Dask специалисты по машинному обучению могут эффективно определить наилучший конвейер для своей конкретной задачи.
если вам нравится это решение — хлопайте в ладоши :-)