Я хочу использовать Flink для чтения из входного файла, выполнения некоторой агрегации и записи результата в выходной файл. Работа находится в пакетном режиме. См. wordcount.py
ниже:
from pyflink.table import EnvironmentSettings, BatchTableEnvironment
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
my_source_ddl = """
create table mySource (
word VARCHAR
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/input'
)
"""
my_sink_ddl = """
create table mySink (
word VARCHAR,
`count` BIGINT
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
"""
transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""
table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()
# before run: echo -e "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output
Перед запуском python wordcount.py
я запускаю echo -e "flink\npyflink\nflink" > /tmp/input
, чтобы убедиться, что данные существуют в /tmp/input. Однако после запуска в /tmp/output есть два файла:
> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0 part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2
Пока я ожидаю один файл /tmp/output с содержимым:
pyflink,1
flink,2
На самом деле, я получил указанную выше программу Python, изменив приведенную ниже, которая создает один файл /tmp/output.
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
.select(tab.word, lit(1).count) \
.execute_insert('mySink').wait()
Запуск этой версии создаст файл /tmp/output. Обратите внимание, что это не идет с разделителем запятой.
> cat /tmp/output
flink 2
pyflink 1
Есть идеи, почему? Спасибо!
table_env.get_config().get_configuration().set_string("parallelism.default", "1")
) уменьшает количество выходных файлов до 1, а содержимое ожидается, но оно по-прежнему имеет длинное имя и находится в /tmp/output, а не в /tmp /output сам файл. Интересно, почему. Спасибо 15.03.2021