Nano Hash - криптовалюты, майнинг, программирование

Redis в Spark: задача не сериализуема

Мы используем Redis в Spark для кэширования наших пар ключ-значение. Это код:

import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
    val arr = x.split(" ")
    val readId = arr(0).toInt
    val refId = arr(1).toInt
    val start = arr(2).toInt
    val end = arr(3).toInt
    val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
    val readStr = r.hmget("readStr", readId).get(readId)
    val realend = if(end > refStr.length - 1) refStr.length - 1 else end
    val refOneStr = refStr.substring(start, realend)
      (readStr, refOneStr, refId, start, realend, readId)
 })

Но компилятор дал мне такой отзыв:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.ynu.App$.main(App.scala:511)
    at com.ynu.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

Может ли кто-нибудь сказать мне, как сериализовать данные, полученные от Redis. Большое спасибо.

18.01.2015

Ответы:


1

В Spark функции на RDDs (например, map здесь) сериализуются и отправляются исполнителям для обработки. Это означает, что все элементы, содержащиеся в этих операциях, должны быть сериализуемыми.

Соединение Redis здесь не сериализуемо, поскольку оно открывает TCP-соединения с целевой БД, которые привязаны к машине, на которой оно создано.

Решение состоит в том, чтобы создать эти соединения на исполнителях в локальном контексте выполнения. Есть несколько способов сделать это. Два, которые приходят на ум:

  • rdd.mapPartitions: позволяет обрабатывать сразу весь раздел и, следовательно, амортизировать стоимость создания подключений)
  • Диспетчеры соединений Singleton: создайте соединение один раз для каждого исполнителя.

mapPartitions проще, так как для этого требуется лишь небольшое изменение в структуре программы:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Диспетчер одноэлементных соединений можно смоделировать с помощью объекта, который содержит ленивую ссылку на соединение (примечание: изменяемая ссылка также будет работать).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Затем этот объект можно использовать для создания экземпляра 1 соединения для каждой рабочей JVM и использовать как объект Serializable при закрытии операции.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

Преимущество использования одноэлементного объекта заключается в меньших накладных расходах, поскольку JVM создает соединения только один раз (в отличие от одного на раздел RDD).

Есть и недостатки:

  • очистка соединений сложна (крючок / таймеры выключения)
  • необходимо обеспечить потокобезопасность общих ресурсов

(*) код предоставлен для иллюстрации. Не компилируется и не тестируется.

19.01.2015
  • Спасибо за ответ! Я использую эту библиотеку github.com/debasishg/scala-redis. У него нет метода с именем close, вместо этого он является разъединением. Я понятия не имею, работает ли он. Не могли бы вы сказать мне, какую библиотеку вы используете сейчас для работы с данными Redis? 20.01.2015
  • Плюс 1 за решение Singleton. Можете ли вы привести пример того, как управлять закрытием соединения? 04.12.2015
  • @Sohaib, учитывая, что это объект, привязанный к виртуальной машине, вам необходимо зарегистрировать перехватчик выключения, чтобы полностью закрыть соединения. 11.12.2015

  • 2

    Вы пытаетесь сериализовать клиент. У вас есть один RedisClient, r, который вы пытаетесь использовать внутри map, который будет выполняться на разных узлах кластера. Либо получите нужные данные из Redis отдельно перед выполнением задачи кластера, либо создайте клиент отдельно для каждой задачи кластера внутри вашего блока map (возможно, используя mapPartitions, а не map, как создание нового Redis). клиент для каждой отдельной строки, вероятно, плохая идея).

    18.01.2015
  • Спасибо за ответ, но не могли бы вы сказать мне, как использовать mapPartitions в этой ситуации? 18.01.2015
  • Вызовите mapPartitions, передавая блок, который принимает итерируемый объект (как видно из подписи mapPartitions), создает RedisClient внутри блока, а затем использует его для map Iterable, как вы делали. Дело в том, что RedisClient создается внутри обработки для одного раздела. Что вы пробовали и где застряли? 19.01.2015
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..