Я использую Flink на Amazon EMR и хочу передавать результаты моего конвейера в корзину s3.
Я использую версию Flink => 1.11.2
Это фрагмент кода, показывающий, как переменная кода выглядит прямо сейчас:
val outputPath = new Path("s3://test/flinkStreamTest/failureLogs/dt=2021-04-15/")
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(outputPath, new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build()
)
.build()
val enrichedStream = AsyncDataStream
.unorderedWait(
resConsumer,
new AsyncElasticRequest(elasticIndexName, elasticHost, elasticPort),
asyncTimeOut.toInt, TimeUnit.MILLISECONDS,
asyncCapacity.toInt
) // this is my pipeline result. it returns a string
enrichedStream.addSink(sink)
env.execute("run pipeline") // this is just to run the pipeline
И это ошибка, которую я сейчас получаю;
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Я поместил файл jar s3-fs-hadoop в папку plugins/s3-fs-hadoop. У меня также есть такая же банка s3-fs-hadoop в /usr/lib/flink/lib на тот случай, если flink также будет искать банку s3-fs-hadoop в этой папке. Пожалуйста, кто-нибудь может мне помочь. Я искал и искал, но не могу решить эту проблему.
Спасибо