Nano Hash - криптовалюты, майнинг, программирование

Почему приемник Flink FileSystem разбивается на несколько файлов

Я хочу использовать Flink для чтения из входного файла, выполнения некоторой агрегации и записи результата в выходной файл. Работа находится в пакетном режиме. См. wordcount.py ниже:

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e  "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output

Перед запуском python wordcount.py я запускаю echo -e "flink\npyflink\nflink" > /tmp/input, чтобы убедиться, что данные существуют в /tmp/input. Однако после запуска в /tmp/output есть два файла:

> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0 part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2

Пока я ожидаю один файл /tmp/output с содержимым:

pyflink,1
flink,2

На самом деле, я получил указанную выше программу Python, изменив приведенную ниже, которая создает один файл /tmp/output.

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

Запуск этой версии создаст файл /tmp/output. Обратите внимание, что это не идет с разделителем запятой.

> cat /tmp/output
flink   2
pyflink 1

Есть идеи, почему? Спасибо!

15.03.2021

Ответы:


1

В первый раз, когда вы запустили его без указания параллелизма, вы получили параллелизм по умолчанию, который больше 1 (вероятно, 4 или 8, в зависимости от того, сколько ядер у вашего компьютера).

Flink предназначен для масштабирования, и для этого параллельные экземпляры оператора, такие как приемник, отделены друг от друга. Представьте, например, большой кластер с сотнями или тысячами узлов. Чтобы это работало хорошо, каждый экземпляр должен писать в свой собственный файл.

Запятые были заменены на табуляции, потому что вы указали .field_delimiter('\t').

15.03.2021
  • Привет, Дэвид, да, установка параллелизма по умолчанию на 1 (table_env.get_config().get_configuration().set_string("parallelism.default", "1")) уменьшает количество выходных файлов до 1, а содержимое ожидается, но оно по-прежнему имеет длинное имя и находится в /tmp/output, а не в /tmp /output сам файл. Интересно, почему. Спасибо 15.03.2021
  • Интерфейс предназначен для поддержки как пакетных, так и потоковых конвейеров. Для поддержки вариантов использования потоковой передачи необходимо иметь подход к именованию выходных файлов, которые могут работать неограниченно долго, т. е. в течение многих лет, с очень большими объемами, например, петабайтами в день. 15.03.2021
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..