原文:towardsdatascience.com/apache-beam-data-processing-data-pipelines-dataflow-and-flex-templates-2902224aabf3

在这篇文章中,我们将探索 Apache Beam,从简单的管道到更复杂的管道,使用 GCP Dataflow。让我们了解PTransformPCollectionGroupByKey和 Dataflow Flex Template 的含义

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/47cc88eac13f9e443e204944010ce405.png

图片由 Faruk KaymakUnsplash 提供

Apache Beam 简介

毫无疑问,在安全的环境中处理数据、创建特征、移动数据以及以稳定和计算高效的方式执行所有这些操作,对于当今所有的 AI 任务都极其相关。在很久以前,Google 开始开发一个开源项目,以启动批处理和流数据处理操作,名为 Beam。随后,Apache 软件基金会开始为此项目做出贡献,将 Apache Beam 扩展到更大的规模。

Apache Beam 的相关关键在于其灵活性,使其成为构建数据处理管道的最佳编程 SDK 之一。我认为 Apache Beam 中有 4 个主要概念,使其成为无价的数据工具:

  • 批处理/流处理统一模型:Beam 是一个统一的编程模型,也就是说,使用相同的 Beam 代码,您可以决定是批量处理数据还是流处理数据,并且可以将管道用作其他新处理单元的模板。Beam 可以自动摄取连续的数据流或对给定批次的数据执行特定操作。

  • 并行处理:高效且可扩展的数据处理核心从数据处理管道的并行执行开始,将工作负载分配到多个“工作者”上——工作者可以被视为一个节点。并行执行的关键概念被称为“ParDo转换”,它接受一个处理单个元素的函数,并在多个工作者上并发应用。这个实现的好处是您无需担心如何分割数据或创建批量加载器。Apache Beam 会为您完成所有这些工作。

  • 数据管道:鉴于上述两个方面,可以通过几行代码轻松创建数据管道,从数据摄取到输出结果。

Python、Java 以及一个简单的入门示例

Apache Beam SDK 以两种语言存在:Python 和 Java。Java SDK 自项目开始以来一直存在,它保证了最佳的可扩展性、功能丰富性和来自社区的强大支持,用于编写数据管道。Python SDK 提供与 Java 对应版本相同的特性,尽管一些实现尚未从 Java 端迁移。然而,Python 以一种可扩展的方式提供了创建高效数据管道的能力,利用巨大的计算能力,以执行任何困难的数据处理操作。

为了亲自动手并更好地理解 Apache Beam 的功能,让我们从一个简单的例子开始——这是每个人开始时最著名的例子,一个单词计数器,针对输入文本文件 “King Lear”。在这篇文章中,我们只看 Python SDK,但如果你感兴趣,并且有足够的请求,我会非常乐意为 Java 部分设置一篇文章。

在开始之前,创建你的虚拟环境,并安装 pip install apache-beampip install apache-beam[gcp]。后者将授予你所有 GCP 包,以便从 Google Cloud Storage(或 GCS,本例中需要,以下载 “King Lear” 数据)。然后,只需复制并粘贴此代码:

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class WordExtractingDoFn(beam.DoFn):
  """ This is the transform, that extracts the input words

  Args: 
      element: A worker's element 
  Return: 
      words in the element text 
  """
  def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text.  If the line is blank, note that, too.

    Args:
      element: the element being processed

    Returns:
      The processed element.
    """
    return re.findall(r'[w']+', element, re.UNICODE)

def run(argv=None, save_main_session=True):
  """ Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow relies on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=pipeline_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(known_args.input)

    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a collection of strings.
    def format_result(word, count):
      return '%s: %d' % (word, count)

    output = counts | 'Format' >> beam.MapTuple(format_result)

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | 'Write' >> WriteToText(known_args.output)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run() 

然后,你可以在本地环境中运行此代码:python wordcount.py --output "output_from_beam.txt"。在看到一个奇怪的输出信息后,你会看到 output_from_beam.txt 正在被填充:

KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
BURGUNDY: 8
CORNWALL: 63
ALBANY: 67
EARL: 2
KENT: 156
GLOUCESTER: 141
...

让我们从上面的代码中突出显示主要的 Beam 概念

  • 有一个用于调用管道并使其生效的上下文管理器,并且从那里开始,管道的步骤通过竖线分隔:
with beam.Pipeline(options=pipeline_options) as p:
    # start the pipeline p 
    lines = p | 'Read' >> ReadFromText(known_args.input)
    # call a second step 
    second_step = lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
    ...
  • 每个管道的步骤都有一个名称,例如 p | 'Read'Read 是组件的名称,并通过 >> 调用一个特定的函数。

  • 管道中的每个函数都返回一个特定的 Beam 对象,称为 PCollectionPCollection 是由定义的函数处理的数据的一部分。例如,在 p | 'Read' >> ReadFromText(known_args.input) 中返回给定输入文件的一行文本。每个输出都是一个 String,因此这一步的输出是 PCollection<String>

  • 所有 PCollection,或数据集合,都可以由管道中的下一步读取。为了正确且高效地读取所有这些输入数据,我们需要并行处理它们。对于自定义函数,例如 WordExtractingDoFn,我们可以使用两个元素。首先,包装器 beam.ParDo,或并行-do。这是一个 “Beam 转换”,它将 PCollection 中的每个输入元素映射到某个处理函数。第二个元素是 beam.DoFn,或 do 函数。从这里:

  • 每个处理数据的自定义函数都称为“转换”。函数必须定义为 Python 对象,而不是函数,因此使用class。每个类都必须有一个process方法,其中实现了处理逻辑。process方法接收element,即输入元素,并返回一个包含函数输出值的可迭代对象。

一个更复杂的例子

让我们关注一个更复杂的例子,以欣赏 Apache Beam 的力量。特别是,我们将创建一个读取日志文件的代码。该日志文件有 100 万个条目。这些条目被解析和分析,按键分组,并计算每个日志事件的平均持续时间作为输出。

要创建日志文件,可以使用此脚本:

import random
from datetime import datetime, timedelta

def generate_log_entry():
    timestamp = datetime(2022, 1, 1, 12, 0, 0) + timedelta(minutes=random.randint(0, 1440))
    event_type = f"event_type_{random.choice(['A', 'B', 'C'])}"
    duration = round(random.uniform(5.0, 30.0), 2)
    return f"{timestamp.isoformat()},{event_type},{duration}"

if __name__ == '__main__':
    output_file = 'log_entries_large.txt'

    with open(output_file, 'w') as file:
        for _ in range(1000000):
            file.write(generate_log_entry() + 'n')

    print(f"Generated log entries file: {output_file}") 

脚本保存了一个名为log_entries_large.txt的文件。它创建了 100 万行。每一行代表一个已记录的事件,包括时间戳、事件类型和事件持续时间:

2022-01-02T00:36:00,event_type_A,18.52
2022-01-01T17:31:00,event_type_B,25.5
2022-01-02T03:22:00,event_type_C,26.79
2022-01-01T23:26:00,event_type_C,17.98
2022-01-02T01:15:00,event_type_C,29.54
2022-01-01T19:43:00,event_type_C,19.68
2022-01-02T01:30:00,event_type_B,5.65
2022-01-01T23:33:00,event_type_C,25.4
2022-01-02T07:20:00,event_type_A,16.59
2022-01-01T14:49:00,event_type_C,23.62

这些输入数据可以轻松地被 Beam 流程摄取,以生成每个日志事件的平均持续时间:

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from structlog import get_logger

logger = get_logger()

class ParseLogEntry(beam.DoFn):
    def process(self, element):
        # Assuming log entries are in CSV format: timestamp,event_type,duration
        timestamp, event_type, duration = element.split(',')
        return [{'event_type': event_type, 'duration': float(duration)}]

class CalculateAverageDuration(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, input):
        total_duration, count = accumulator
        for duration in input:
            total_duration += duration
            count += 1
        return total_duration, count

    def merge_accumulators(self, accumulators):
        total_duration, count = zip(*accumulators)
        return sum(total_duration), sum(count)

    def extract_output(self, accumulator):
        total_duration, count = accumulator
        return total_duration / count if count != 0 else 0

def run_pipeline(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-file', dest='input', required=True)
    parser.add_argument('--output-file', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    options = PipelineOptions(['--runner=DirectRunner'])

    with beam.Pipeline(options=options) as pipeline:
        results = (
            pipeline
            | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
            | 'ParseLogEntry' >> beam.ParDo(ParseLogEntry())
            | 'WithKeys' >> beam.Map(lambda element: (element['event_type'], element['duration']))
            | 'GroupByKey' >> beam.GroupByKey()
            | 'CalculateAverageDuration' >> beam.CombinePerKey(CalculateAverageDuration())
            | 'FormatOutput' >> beam.Map(lambda kv: f'Event Type: {kv[0]}, Average Duration: {kv[1]:.2f}')
            | 'WriteToText' >> beam.io.WriteToText(known_args.output)
        )

if __name__ == '__main__':
    run_pipeline()

让我们分析这个脚本中发生的事情:

  • 主要函数是run_pipeline。像 Beam 命名约定所希望的那样,**流程接收输入参数,我们将主会话设置为所有工作进程共享。**这允许我们在代码的主会话中保存全局环境的状态。然后,这个环境可以在进程的所有工作节点之间共享,这样流程的转换(例如ParDo),由流程的工作进程执行,就可以访问与主函数相同的全局环境。

  • 然后我们编写一个典型的 Beam 流程设置,定义输入参数和流程选项

parser = argparse.ArgumentParser()
parser.add_argument('--input-file', dest='input', required=True)
parser.add_argument('--output-file', dest='output', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
options = PipelineOptions(['--runner=DirectRunner'])

值得注意的是,Beam 有不同的runner选项。特别是,我们可以有三个主要运行器

  1. DirectRunner:用于本地流程执行的运行器

  2. DataflowRunner:这是一个设计用于在 Google Cloud Dataflow 上执行流程的运行器,它是一个完全托管的无服务器数据处理服务

  3. FlinkRunner:这是一个允许 Beam 流程在 Apache Flink 上运行的运行器,Apache Flink 是一个流处理框架。AWS Kinesis Analytics 可以运行 Apache Flink,因此我们可以说这是DataflowRunner的 AWS 对应物

  • 主流程分为 7 个步骤。一旦数据被读取,我们执行第一个转换,使用beam.ParDo(ParseLogEntry())。对象ParseLogEntryReadFromText步骤输出的PCollection执行简单的解析操作

  • WithKeys步骤非常重要,因为它创建了一个PCollection<K,V>输出,即键和值的集合。前一个步骤的输出是一个字符串,例如:{'event_type':'eventy_type_A', 'duration':18.52}。我们想知道的是,获取所有事件类型,将它们转换为键,并使用持续时间值作为键的值。因此,输出将是event_type_A: 18.52

  • 一旦我们使用键映射了值,我们就可以对它们进行分组。GroupByKey 步骤使用 beam.GroupByKey() 将所有具有相同键的集合排序到组中。

  • 从这里,我们计算每个事件类型的平均持续时间。这可以通过另一种类型的 Beam 转换完成,称为 beam.CombineFn

class CalculateAverageDuration(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, input):
        total_duration, count = accumulator
        for duration in input:
            total_duration += duration
            count += 1
        return total_duration, count

    def merge_accumulators(self, accumulators):
        total_duration, count = zip(*accumulators)
        return sum(total_duration), sum(count)

    def extract_output(self, accumulator):
        total_duration, count = accumulator
        return total_duration / count if count != 0 else 0

beam.CombineFn 创建一个组合转换。每个组合转换需要以下函数:

  1. create_accumulator: 这个函数创建一个累加器。在这种情况下,我们将有一个包含两个值的元组,第一个值表示持续时间,第二个值表示我们跨元素计数的数量。

  2. add_input: 这个函数接收累加器和输入。在这里,我们计算特定键的所有事件的持续时间总和,然后计算到目前为止我们计数的元素总数。

  3. merge_accumulators: 这个函数执行所有累积值的合并。对于每个我们已分组的键,我们将有三个不同的累加器,包含总持续时间总和和计数的元素总数。

  4. extract_output: 这个函数返回我们想要的组合(平均)值,执行累加器的算术平均值。

  • 管道中的最后两个步骤将输出转换为可写格式并将其写入输出文本。

Apache Beam,即使是本地,也能很好地处理文件缩放的大小,正如表.1 所示。平均时间是在 3 次不同的运行中计算的,报告了平均值及其标准误差(作为参考,这些计算是在 Mac M2 Pro,16 GB 上进行的)。值得注意的是,输入文件本身就不大,因为它只有三列,但如果考虑到我们在简单的笔记本电脑上运行这些计算,结果确实是令人瞩目的。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐