Я пытаюсь читать файлы Avro с S3 и, как показано в этом документация по искре Я могу ее нормально прочитать. Мои файлы, как показано ниже, эти файлы состоят из 5000 записей каждый.
s3a://bucket/part-0.avro
s3a://bucket/part-1.avro
s3a://bucket/part-2.avro
val byteRDD: RDD[Array[Byte]] = sc.binaryFiles(s"$s3URL/*.avro").map{ case(file, pds) => {
val dis = pds.open()
val len = dis.available()
val buf = Array.ofDim[Byte](len)
pds.open().readFully(buf)
buf
}}
import org.apache.avro.io.DecoderFactory
val deserialisedAvroRDD = byteRDD.map(record => {
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(schemaJson)
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get.binaryDecoder(record, null)
var datum: GenericRecord = null
while (!decoder.isEnd()) {
datum = datumReader.read(datum, decoder)
}
datum
}
)
deserialisedAvroRDD.count() ---> 3
Я десериализую сообщения binaryAvro для создания GenericRecords, и я ожидал, что десериализованный RDD будет иметь 15 000 записей, поскольку каждый файл .avro имеет 5 000 записей, однако после десериализации я получаю только 3 записи. Может ли кто-нибудь помочь в поиске проблемы с моим кодом? Как я могу сериализовать одну запись за раз.