Nano Hash - криптовалюты, майнинг, программирование

Проблема с потоковой передачей файлов Spark

Я пробую простой пример потоковой передачи файлов, используя Sparkstreaming (spark-streaming_2.10, версия: 1.5.1)

public class DStreamExample {

    public static void main(final String[] args) {

        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("SparkJob");
        sparkConf.setMaster("local[4]"); // for local

        final JavaSparkContext sc = new JavaSparkContext(sparkConf);

        final JavaStreamingContext ssc = new JavaStreamingContext(sc,
            new Duration(2000));

        final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
        lines.print();

        ssc.start();
        ssc.awaitTermination();
    }
}

Когда я запускаю этот код в одном файле или директории, он ничего не печатает из файла, я вижу в журналах его постоянный опрос, но ничего не печатается. Я попытался переместить файл в каталог, когда эта программа работала.

Есть ли что-то, что мне не хватает? Я попытался применить функцию карты к строкам RDD, которая также не работает.


Ответы:


1

API textFileStream не предназначен для чтения существующего содержимого каталога, вместо этого он предназначен для отслеживания заданного пути файловой системы, совместимого с Hadoop, на наличие изменений, файлы должны быть записаны в отслеживаемое местоположение путем «перемещения» их из другого расположение в той же файловой системе. Короче говоря, вы подписываетесь на изменения каталога и будете получать содержимое вновь появившихся файлов в отслеживаемом расположении — в том состоянии, в котором файл(ы) появляются на момент снимка мониторинга (то есть 2000 мс в вашем случае), и любые дальнейшие обновления файлов не будут достигать потока, будут выполняться только обновления каталога (новые файлы).

Вы можете эмулировать обновления, создавая новый файл во время сеанса мониторинга:

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class DStreamExample {

public static void main(final String[] args) throws IOException {

    final SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("SparkJob");
    sparkConf.setMaster("local[4]"); // for local

    final JavaSparkContext sc = new JavaSparkContext(sparkConf);

    final JavaStreamingContext ssc = new JavaStreamingContext(sc,
            new Duration(2000));

    final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");

    // spawn the thread which will create new file within the monitored directory soon
    Runnable r = () -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            FileUtils.write(new File("/opt/test/newfile1"), "whatever");
        } catch (IOException e) {
            e.printStackTrace();
        }
    };

    new Thread(r).start();


    lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> {
        List<String> lines1 = rdd.collect();
        lines1.stream().forEach(l -> System.out.println(l));
        return null;
    });

    ssc.start();
    ssc.awaitTermination();
}

}

20.11.2015
  • Спасибо за ответ! Теперь, когда я изменяю содержимое файла и перехожу в каталог мониторинга, Sparkstreaming выбирает файл для обработки. 21.11.2015
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..