Nano Hash - криптовалюты, майнинг, программирование

искра scala: удалить последовательные (по дате) дубликаты записей из фрейма данных

Вопрос по работе с датафреймами, хочу удалить полностью повторяющиеся записи за исключением некоторых полей (дат). Я пытался использовать windowFunction (WindowSpec) как:

val wFromDupl: WindowSpec = Window
  .partitionBy(comparateFields: _*)
  .orderBy(asc(orderField))

В переменной comparateFields я храню все поля, которые мне нужно проверить (в примере это будут DESC1 и DESC2), чтобы устранить дубликаты, следуя логике, согласно которой, если есть повторяющаяся запись, мы отбрасываем те, у которых более высокая дата.

В переменной orderField я просто сохраняю поле Effective_date.

Поэтому, применяя оконную функцию, я вычисляю временный столбец, назначая наименьшую дату всем дублирующимся записям, а затем фильтрую dataFrame следующим образом:

 val dfFinal: DataFrame = dfInicial
    .withColumn("w_eff_date", min(col("effective_date")).over(wFromDupl))
  .filter(col("effective_date") === col("w_eff_date")) 
  .drop("w_eff_date")
  .distinct()
  .withColumn("effective_end_date", lead(orderField, 1, "9999-12-31").over(w))

Для следующего случая он работает правильно:

KEY EFFECTIVE_DATE  DESC 1  DESC 2  W_EFF_DATE (tmp)
E2  2000            A       B       2000
E2  2001            A       B       2000
E2  2002            AA      B       2002

Код удалит вторую запись:

E2  2001            A       B       2000

Но логика должна применяться для ПОСЛЕДОВАТЕЛЬНЫХ записей (по дате), например, для следующего случая, по мере реализации кода, мы удаляем третью запись (DESC1 и DESC2 одинаковы, а дата min eff 2000) , но мы не хотим этого, потому что у нас есть (по eff_date) запись в середине (2001 AA B), поэтому мы хотим сохранить 3 записи

KEY EFFECTIVE_DATE  DESC1   DESC2   W_EFF_DATE (tmp)
E1     2000         A       B       2000
E1     2001         AA      B       2001
E1     2002         A       B       2000

Любые советы по этому поводу? Спасибо вам всем!


Ответы:


1

Один из подходов — использовать when/otherwise вместе с оконной функцией lag, чтобы определить, какие строки сохранить, как показано ниже:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("E1", "2000", "A",  "B"),
  ("E1", "2001", "AA", "B"),
  ("E1", "2002", "A",  "B"),
  ("E1", "2003", "A",  "B"),
  ("E1", "2004", "A",  "B"),
  ("E2", "2000", "C",  "D"),
  ("E2", "2001", "C",  "D"),
  ("E2", "2002", "CC", "D"),
  ("E2", "2003", "C",  "D")
).toDF("key", "effective_date", "desc1", "desc2")

val compareCols = List("desc1", "desc2")

val win1 = Window.partitionBy("key").orderBy("effective_date")

val df2 = df.
  withColumn("compCols", struct(compareCols.map(col): _*)).
  withColumn("rowNum", row_number.over(win1)).
  withColumn("toKeep",
    when($"rowNum" === 1 || $"compCols" =!= lag($"compCols", 1).over(win1), true).
      otherwise(false)
  )

// +---+--------------+-----+-----+--------+------+------+
// |key|effective_date|desc1|desc2|compCols|rowNum|toKeep|
// +---+--------------+-----+-----+--------+------+------+
// | E1|          2000|    A|    B|   [A,B]|     1|  true|
// | E1|          2001|   AA|    B|  [AA,B]|     2|  true|
// | E1|          2002|    A|    B|   [A,B]|     3|  true|
// | E1|          2003|    A|    B|   [A,B]|     4| false|
// | E1|          2004|    A|    B|   [A,B]|     5| false|
// | E2|          2000|    C|    D|   [C,D]|     1|  true|
// | E2|          2001|    C|    D|   [C,D]|     2| false|
// | E2|          2002|   CC|    D|  [CC,D]|     3|  true|
// | E2|          2003|    C|    D|   [C,D]|     4|  true|
// +---+--------------+-----+-----+--------+------+------+

df2.where($"toKeep").select(df.columns.map(col): _*).
  show
// +---+--------------+-----+-----+
// |key|effective_date|desc1|desc2|
// +---+--------------+-----+-----+
// | E1|          2000|    A|    B|
// | E1|          2001|   AA|    B|
// | E1|          2002|    A|    B|
// | E2|          2000|    C|    D|
// | E2|          2002|   CC|    D|
// | E2|          2003|    C|    D|
// +---+--------------+-----+-----+
01.10.2018
  • Большое спасибо, LeoC, это простое и хорошее решение, оно работает правильно с полумиллионом записей в моей локальной среде. Единственное изменение, которое мне пришлось внести, это то, что поле структуры вышло за пределы диапазона (с моими реальными данными). Но я решил это с помощью массива: 02.10.2018
  • Рад, что вы можете найти решение для удовлетворения конкретных потребностей. 02.10.2018
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..