Nano Hash - криптовалюты, майнинг, программирование

Потоковая передача данных с flink на S3

Я использую Flink на Amazon EMR и хочу передавать результаты моего конвейера в корзину s3.

Я использую версию Flink => 1.11.2

Это фрагмент кода, показывающий, как переменная кода выглядит прямо сейчас:

val outputPath = new Path("s3://test/flinkStreamTest/failureLogs/dt=2021-04-15/")

val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(outputPath, new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          .withMaxPartSize(1024 * 1024 * 1024)
          .build()
      )
      .build()

val enrichedStream = AsyncDataStream
      .unorderedWait(
        resConsumer,
        new AsyncElasticRequest(elasticIndexName, elasticHost, elasticPort),
        asyncTimeOut.toInt, TimeUnit.MILLISECONDS,
        asyncCapacity.toInt
      ) // this is my pipeline result. it returns a string

enrichedStream.addSink(sink)
   
env.execute("run pipeline") // this is just to run the pipeline

И это ошибка, которую я сейчас получаю;

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)

Я поместил файл jar s3-fs-hadoop в папку plugins/s3-fs-hadoop. У меня также есть такая же банка s3-fs-hadoop в /usr/lib/flink/lib на тот случай, если flink также будет искать банку s3-fs-hadoop в этой папке. Пожалуйста, кто-нибудь может мне помочь. Я искал и искал, но не могу решить эту проблему.

Спасибо


Ответы:


1

Я понял. Мне нужно было перезапустить все долго работающее приложение flink (не перезапускать задание). Также пришлось удалить банку s3-fs-hadoop, которую я поместил в каталог /usr/lib/flink/lib, но сохранил копию банки s3-fs-hadoop в папке plugins/s3-fs-hadoop.

12.05.2021
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..