Nano Hash - криптовалюты, майнинг, программирование

Потоковая передача файлов GCS с использованием потока данных (apachebeam python)

У меня есть GCS, где я получаю файл каждую минуту. Я создал поток потоковых данных, используя apache beam python sdk. I создал тему pub / sub для входного ведра gcs и вывода ведра gcs. Мой поток данных передается, но мой вывод не сохраняется в выходном ведре. это мой следующий код,

from __future__ import absolute_import

    import os
    import logging
    import argparse
    from google.cloud import language
    from google.cloud.language import enums
    from google.cloud.language import types
    from datetime import datetime
    import apache_beam as beam 
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.io.textio import ReadFromText, WriteToText

    #dataflow_options = ['--project=****','--job_name=*****','--temp_location=gs://*****','--setup_file=./setup.py']
    #dataflow_options.append('--staging_location=gs://*****')
    #dataflow_options.append('--requirements_file ./requirements.txt')
    #options=PipelineOptions(dataflow_options)
    #gcloud_options=options.view_as(GoogleCloudOptions)


    # Dataflow runner
    #options.view_as(StandardOptions).runner = 'DataflowRunner'
    #options.view_as(SetupOptions).save_main_session = True

    def run(argv=None):
        """Build and run the pipeline."""
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--output_topic', required=True,
            help=('Output PubSub topic of the form '
                '"projects/***********".'))
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument(
            '--input_topic',
            help=('Input PubSub topic of the form '
                '"projects/************".'))
        group.add_argument(
            '--input_subscription',
            help=('Input PubSub subscription of the form '
                '"projects/***********."'))
        known_args, pipeline_args = parser.parse_known_args(argv)

      # We use the save_main_session option because one or more DoFn's in this
      # workflow rely on global context (e.g., a module imported at module level).
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True
        pipeline_options.view_as(StandardOptions).streaming = True
        p = beam.Pipeline(options=pipeline_options)


        # Read from PubSub into a PCollection.
        if known_args.input_subscription:
            messages = (p
                        | beam.io.ReadFromPubSub(
                            subscription=known_args.input_subscription)
                        .with_output_types(bytes))
        else:
            messages = (p
                        | beam.io.ReadFromPubSub(topic=known_args.input_topic)
                        .with_output_types(bytes))

        lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

        class Split(beam.DoFn):
            def process(self,element):
                element = element.rstrip("\n").encode('utf-8')
                text = element.split(',') 
                result = []
                for i in range(len(text)):
                    dat = text[i]
                    #print(dat)
                    client = language.LanguageServiceClient()
                    document = types.Document(content=dat,type=enums.Document.Type.PLAIN_TEXT)
                    sent_analysis = client.analyze_sentiment(document=document)
                    sentiment = sent_analysis.document_sentiment
                    data = [
                    (dat,sentiment.score)
                    ] 
                    result.append(data)
                return result

        class WriteToCSV(beam.DoFn):
            def process(self, element):
                return [
                    "{},{}".format(
                        element[0][0],
                        element[0][1]
                    )
                ]

        Transform = (lines
                    | 'split' >> beam.ParDo(Split())
                    | beam.io.WriteToPubSub(known_args.output_topic)
        )
        result = p.run()
        result.wait_until_finish()

    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()

что я делаю не так, пожалуйста, объясните мне это.


Ответы:


1

WriteToPubSub записывает данные в тему PubSub, а не в корзину GCS. Что вы хотите сделать, это, возможно, использовать WriteToText или DoFn, который записывает ваши данные в корзину с помощью apache_beam.io.filesystems.

Дополнительное замечание: не похоже, что ваше WriteToCsv преобразование где-либо используется.

07.03.2019
  • спасибо за обратную связь, но я подумал, что я создал тему корзины, в которой я получаю входящие файлы. Поэтому, когда я использую ReadFromPubSub, что именно он делает? выводится ли это имя файла корзины? если да, то могу ли я использовать этот вывод pubsub и вводить как gs: // bucketname / outputof pubsub? или readfrompubsub напрямую передает новые файлы один за другим, и мне не нужно указывать имя входного файла? пожалуйста, помогите сэр 08.03.2019
  • 1) Я использовал apache_beam.io.WriteToText для записи потоковых данных (из ReadFromPubSub) в GCS .. но потоковые сообщения просто остаются во временной папке (в целевом сегменте). Пока я не опорожняю конвейер, и только тогда я увижу количество осколков с фактическими данными, появляющимися в желаемом месте назначения. Есть ли какие-нибудь известные проблемы? 2) Также я хотел бы уточнить, в GCS записывается только оконный поток? каково было бы поведение, если бы я записывал каждый опубликованный поток сообщений (без окон) в GCS? каждое сообщение создает один файл? 08.10.2019
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..