Эрик Сюй - специалист по данным, разработчик Rails в Outbrain. Он участвовал в семинаре Insight Spark Lab в Нью-Йорке.

Мы все страдали от повторного открытия проекта машинного обучения и попыток отследить наш мыслительный процесс. Часто это похоже на джунгли, где десятки этапов разработки функций пересекаются с сумкой отрегулированных вручную моделей. Если мы не можем легко следовать нашему собственному коду, как могут другие?

Вот почему я был взволнован, когда во время Insight Spark Lab узнал о конвейерах машинного обучения (ML) Spark. Pipeline API, представленный в Spark 1.2, представляет собой высокоуровневый API для MLlib. Вдохновленный популярной реализацией в scikit-learn, концепция Pipelines состоит в том, чтобы облегчить создание, настройку и проверку практических рабочих процессов машинного обучения. Другими словами, это позволяет нам больше сосредоточиться на решении задачи машинного обучения, вместо того, чтобы тратить время на организацию кода.

Зачем нужны трубопроводы?

Обычно на исследовательских этапах задачи машинного обучения мы перебираем десятки, если не сотни, функций и комбинаций моделей. Наш мыслительный процесс может выглядеть примерно так:

Вскоре наш блокнот Jupyter наполняется спагетти-кодом, занимающим сотни ячеек. Попытка гарантировать, что наши обучающие и тестовые данные проходят идентичный процесс, управляема, но также имеет тенденцию быть утомительной и подверженной ошибкам.

Лучшее решение - заключать каждую комбинацию шагов в Pipeline. Это дает нам декларативный интерфейс, в котором легко увидеть весь рабочий процесс извлечения, преобразования и обучения модели.

Spark Pipeline определяется как последовательность этапов, и каждый этап представляет собой либо Transformer, либо Estimator. Эти этапы выполняются по порядку, и вход DataFrame преобразуется по мере прохождения каждого этапа.

Приведенный ниже код демонстрирует, как несколько Transformers и Estimators могут быть объединены для создания сложного рабочего процесса. В Insight Data Labs мне предоставили несколько источников данных размером от нескольких сотен гигабайт до терабайта. Я решил работать с набором данных отзывов Amazon. Необработанные данные состоят из обзоров ресторанов (строка) и рейтингов (целые числа), например:

В процессе разработки функций текстовые функции извлекаются из необработанных обзоров с использованием алгоритмов HashingTF и Word2Vec. Данные рейтингов представлены в двоичном формате с OneHotEncoder. Затем результаты проектирования функций объединяются с использованием VectorAssembler перед передачей в модель логистической регрессии.

from pyspark.ml import Pipeline 
from pyspark.ml.feature import * 
from pyspark.ml.classification import LogisticRegression 
# Configure pipeline stages 
tok = Tokenizer(inputCol="review", outputCol="words") 
htf = HashingTF(inputCol="words", outputCol="tf", numFeatures=200) 
w2v = Word2Vec(inputCol="review", outputCol="w2v") 
ohe = OneHotEncoder(inputCol="rating", outputCol="rc") 
va = VectorAssembler(inputCols=["tf", "w2v", "rc"], outputCol="features") 
lr = LogisticRegression(maxIter=10, regParam=0.01) 
# Build the pipeline 
pipeline = Pipeline(stages=[tok, htf, w2v, ohe, va, lr]) 
# Fit the pipeline 
model = pipeline.fit(train_df) 
# Make a prediction 
prediction = model.transform(test_df)

Эта диаграмма DAG визуализирует структуру Pipeline и все его этапы. Синие пятиугольники представляют Transformers, желтый ромб представляет Estimator, а прямоугольники представляют DataFrames с текущими именами столбцов, проходящими через Pipeline.

Изготовленные на заказ трансформаторы

Сообщество Spark быстро добавляет новые преобразователи функций и алгоритмы для API конвейера с каждым выпуском версии. Но что, если бы мы хотели сделать что-то нестандартное, например, подсчитать количество смайликов в блоке текста? Оказывается, не так уж и сложно расширить класс Transformer и создать свои собственные преобразователи.

Основные правила, которым нужно следовать, заключаются в том, что Transformer должен:
1. реализовать метод transform
2. указать inputCol и outputCol
3. принять DataFrame в качестве ввода и вернуть DataFrame в качестве вывода

Следующий фрагмент кода демонстрирует простую реализацию подсчета слов Transformer.

from pyspark.ml.util import keyword_only 
from pyspark.ml.pipeline import Transformer 
from pyspark.ml.param.shared import HasInputCol, HasOutputCol 
# Create a custom word count transformer class 
class MyWordCounter(Transformer, HasInputCol, HasOutputCol): 
    @keyword_only 
    def __init__(self, inputCol=None, outputCol=None): 
        super(WordCounter, self).__init__() 
        kwargs = self.__init__._input_kwargs 
        self.setParams(**kwargs) 
    @keyword_only 
    def setParams(self, inputCol=None, outputCol=None): 
        kwargs = self.setParams._input_kwargs 
        return self._set(**kwargs) 
    def _transform(self, dataset): 
        out_col = self.getOutputCol() 
        in_col = dataset[self.getInputCol()] 
    # Define transformer 
    logic def f(s): 
        return len(s.split(' ')) 
    t = LongType() 
    return dataset.withColumn(out_col, udf(f, t)(in_col)) 
# Instantiate the new word count transformer 
wc = MyWordCounter(inputCol="review", outputCol="wc")

Теперь мы можем относиться к MyWordCounter как к любому другому Transformer и добавлять его как сцену к нашему Pipeline.

Нижняя линия

Pipelines - это простой и эффективный способ управлять сложными рабочими процессами машинного обучения, не вырывая себе волос. Его мощность становится еще более заметной, когда мы дойдем до перекрестной проверки настройки гиперпараметров. В целом, Pipeline API - важный шаг к тому, чтобы машинное обучение стало масштабируемым, простым и приятным.

Заинтересованы в том, чтобы перейти к карьере инженера данных?
Узнайте больше о программе Insight Data Engineering Fellows Program в Нью-Йорке и Кремниевой долине, подайте заявку сегодня или подпишитесь на обновления программы

Первоначально опубликовано на blog.insightdatalabs.com 22 марта 2016 г.