Apache Beam:数据处理、数据管道、Dataflow 和 Flex 模板
毫无疑问,在安全的环境中处理数据、创建特征、移动数据以及以稳定和计算高效的方式执行所有这些操作,对于当今所有的 AI 任务都极其相关。在很久以前,Google 开始开发一个开源项目,以启动批处理和流数据处理操作,名为 Beam。随后,Apache 软件基金会开始为此项目做出贡献,将 Apache Beam 扩展到更大的规模。Apache Beam 的相关关键在于其灵活性,使其成为构建数据处理管道的
在这篇文章中,我们将探索 Apache Beam,从简单的管道到更复杂的管道,使用 GCP Dataflow。让我们了解PTransform、PCollection、GroupByKey和 Dataflow Flex Template 的含义
图片由 Faruk Kaymak 在 Unsplash 提供
Apache Beam 简介
毫无疑问,在安全的环境中处理数据、创建特征、移动数据以及以稳定和计算高效的方式执行所有这些操作,对于当今所有的 AI 任务都极其相关。在很久以前,Google 开始开发一个开源项目,以启动批处理和流数据处理操作,名为 Beam。随后,Apache 软件基金会开始为此项目做出贡献,将 Apache Beam 扩展到更大的规模。
Apache Beam 的相关关键在于其灵活性,使其成为构建数据处理管道的最佳编程 SDK 之一。我认为 Apache Beam 中有 4 个主要概念,使其成为无价的数据工具:
-
批处理/流处理统一模型:Beam 是一个统一的编程模型,也就是说,使用相同的 Beam 代码,您可以决定是批量处理数据还是流处理数据,并且可以将管道用作其他新处理单元的模板。Beam 可以自动摄取连续的数据流或对给定批次的数据执行特定操作。
-
并行处理:高效且可扩展的数据处理核心从数据处理管道的并行执行开始,将工作负载分配到多个“工作者”上——工作者可以被视为一个节点。并行执行的关键概念被称为“
ParDo转换”,它接受一个处理单个元素的函数,并在多个工作者上并发应用。这个实现的好处是您无需担心如何分割数据或创建批量加载器。Apache Beam 会为您完成所有这些工作。 -
数据管道:鉴于上述两个方面,可以通过几行代码轻松创建数据管道,从数据摄取到输出结果。
Python、Java 以及一个简单的入门示例
Apache Beam SDK 以两种语言存在:Python 和 Java。Java SDK 自项目开始以来一直存在,它保证了最佳的可扩展性、功能丰富性和来自社区的强大支持,用于编写数据管道。Python SDK 提供与 Java 对应版本相同的特性,尽管一些实现尚未从 Java 端迁移。然而,Python 以一种可扩展的方式提供了创建高效数据管道的能力,利用巨大的计算能力,以执行任何困难的数据处理操作。
为了亲自动手并更好地理解 Apache Beam 的功能,让我们从一个简单的例子开始——这是每个人开始时最著名的例子,一个单词计数器,针对输入文本文件 “King Lear”。在这篇文章中,我们只看 Python SDK,但如果你感兴趣,并且有足够的请求,我会非常乐意为 Java 部分设置一篇文章。
在开始之前,创建你的虚拟环境,并安装 pip install apache-beam 和 pip install apache-beam[gcp]。后者将授予你所有 GCP 包,以便从 Google Cloud Storage(或 GCS,本例中需要,以下载 “King Lear” 数据)。然后,只需复制并粘贴此代码:
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
""" This is the transform, that extracts the input words
Args:
element: A worker's element
Return:
words in the element text
"""
def process(self, element):
"""Returns an iterator over the words of this element.
The element is a line of text. If the line is blank, note that, too.
Args:
element: the element being processed
Returns:
The processed element.
"""
return re.findall(r'[w']+', element, re.UNICODE)
def run(argv=None, save_main_session=True):
""" Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument(
'--output',
dest='output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow relies on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# The pipeline will be run on exiting the with block.
with beam.Pipeline(options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection.
lines = p | 'Read' >> ReadFromText(known_args.input)
counts = (
lines
| 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a collection of strings.
def format_result(word, count):
return '%s: %d' % (word, count)
output = counts | 'Format' >> beam.MapTuple(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'Write' >> WriteToText(known_args.output)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
然后,你可以在本地环境中运行此代码:python wordcount.py --output "output_from_beam.txt"。在看到一个奇怪的输出信息后,你会看到 output_from_beam.txt 正在被填充:
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
BURGUNDY: 8
CORNWALL: 63
ALBANY: 67
EARL: 2
KENT: 156
GLOUCESTER: 141
...
让我们从上面的代码中突出显示主要的 Beam 概念
- 有一个用于调用管道并使其生效的上下文管理器,并且从那里开始,管道的步骤通过竖线分隔:
with beam.Pipeline(options=pipeline_options) as p:
# start the pipeline p
lines = p | 'Read' >> ReadFromText(known_args.input)
# call a second step
second_step = lines
| 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
...
-
每个管道的步骤都有一个名称,例如
p | 'Read'。Read是组件的名称,并通过>>调用一个特定的函数。 -
管道中的每个函数都返回一个特定的 Beam 对象,称为
PCollection。PCollection是由定义的函数处理的数据的一部分。例如,在p | 'Read' >> ReadFromText(known_args.input)中返回给定输入文件的一行文本。每个输出都是一个String,因此这一步的输出是PCollection<String>。 -
所有
PCollection,或数据集合,都可以由管道中的下一步读取。为了正确且高效地读取所有这些输入数据,我们需要并行处理它们。对于自定义函数,例如WordExtractingDoFn,我们可以使用两个元素。首先,包装器beam.ParDo,或并行-do。这是一个 “Beam 转换”,它将PCollection中的每个输入元素映射到某个处理函数。第二个元素是beam.DoFn,或 do 函数。从这里: -
每个处理数据的自定义函数都称为“转换”。函数必须定义为 Python 对象,而不是函数,因此使用
class。每个类都必须有一个process方法,其中实现了处理逻辑。process方法接收element,即输入元素,并返回一个包含函数输出值的可迭代对象。
一个更复杂的例子
让我们关注一个更复杂的例子,以欣赏 Apache Beam 的力量。特别是,我们将创建一个读取日志文件的代码。该日志文件有 100 万个条目。这些条目被解析和分析,按键分组,并计算每个日志事件的平均持续时间作为输出。
要创建日志文件,可以使用此脚本:
import random
from datetime import datetime, timedelta
def generate_log_entry():
timestamp = datetime(2022, 1, 1, 12, 0, 0) + timedelta(minutes=random.randint(0, 1440))
event_type = f"event_type_{random.choice(['A', 'B', 'C'])}"
duration = round(random.uniform(5.0, 30.0), 2)
return f"{timestamp.isoformat()},{event_type},{duration}"
if __name__ == '__main__':
output_file = 'log_entries_large.txt'
with open(output_file, 'w') as file:
for _ in range(1000000):
file.write(generate_log_entry() + 'n')
print(f"Generated log entries file: {output_file}")
脚本保存了一个名为log_entries_large.txt的文件。它创建了 100 万行。每一行代表一个已记录的事件,包括时间戳、事件类型和事件持续时间:
2022-01-02T00:36:00,event_type_A,18.52
2022-01-01T17:31:00,event_type_B,25.5
2022-01-02T03:22:00,event_type_C,26.79
2022-01-01T23:26:00,event_type_C,17.98
2022-01-02T01:15:00,event_type_C,29.54
2022-01-01T19:43:00,event_type_C,19.68
2022-01-02T01:30:00,event_type_B,5.65
2022-01-01T23:33:00,event_type_C,25.4
2022-01-02T07:20:00,event_type_A,16.59
2022-01-01T14:49:00,event_type_C,23.62
这些输入数据可以轻松地被 Beam 流程摄取,以生成每个日志事件的平均持续时间:
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from structlog import get_logger
logger = get_logger()
class ParseLogEntry(beam.DoFn):
def process(self, element):
# Assuming log entries are in CSV format: timestamp,event_type,duration
timestamp, event_type, duration = element.split(',')
return [{'event_type': event_type, 'duration': float(duration)}]
class CalculateAverageDuration(beam.CombineFn):
def create_accumulator(self):
return (0.0, 0)
def add_input(self, accumulator, input):
total_duration, count = accumulator
for duration in input:
total_duration += duration
count += 1
return total_duration, count
def merge_accumulators(self, accumulators):
total_duration, count = zip(*accumulators)
return sum(total_duration), sum(count)
def extract_output(self, accumulator):
total_duration, count = accumulator
return total_duration / count if count != 0 else 0
def run_pipeline(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument('--input-file', dest='input', required=True)
parser.add_argument('--output-file', dest='output', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
options = PipelineOptions(['--runner=DirectRunner'])
with beam.Pipeline(options=options) as pipeline:
results = (
pipeline
| 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
| 'ParseLogEntry' >> beam.ParDo(ParseLogEntry())
| 'WithKeys' >> beam.Map(lambda element: (element['event_type'], element['duration']))
| 'GroupByKey' >> beam.GroupByKey()
| 'CalculateAverageDuration' >> beam.CombinePerKey(CalculateAverageDuration())
| 'FormatOutput' >> beam.Map(lambda kv: f'Event Type: {kv[0]}, Average Duration: {kv[1]:.2f}')
| 'WriteToText' >> beam.io.WriteToText(known_args.output)
)
if __name__ == '__main__':
run_pipeline()
让我们分析这个脚本中发生的事情:
-
主要函数是
run_pipeline。像 Beam 命名约定所希望的那样,**流程接收输入参数,我们将主会话设置为所有工作进程共享。**这允许我们在代码的主会话中保存全局环境的状态。然后,这个环境可以在进程的所有工作节点之间共享,这样流程的转换(例如ParDo),由流程的工作进程执行,就可以访问与主函数相同的全局环境。 -
然后我们编写一个典型的 Beam 流程设置,定义输入参数和流程选项
parser = argparse.ArgumentParser()
parser.add_argument('--input-file', dest='input', required=True)
parser.add_argument('--output-file', dest='output', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
options = PipelineOptions(['--runner=DirectRunner'])
值得注意的是,Beam 有不同的runner选项。特别是,我们可以有三个主要运行器:
-
DirectRunner:用于本地流程执行的运行器 -
DataflowRunner:这是一个设计用于在 Google Cloud Dataflow 上执行流程的运行器,它是一个完全托管的无服务器数据处理服务 -
FlinkRunner:这是一个允许 Beam 流程在 Apache Flink 上运行的运行器,Apache Flink 是一个流处理框架。AWS Kinesis Analytics 可以运行 Apache Flink,因此我们可以说这是DataflowRunner的 AWS 对应物
-
主流程分为 7 个步骤。一旦数据被读取,我们执行第一个转换,使用
beam.ParDo(ParseLogEntry())。对象ParseLogEntry对ReadFromText步骤输出的PCollection执行简单的解析操作 -
WithKeys步骤非常重要,因为它创建了一个PCollection<K,V>输出,即键和值的集合。前一个步骤的输出是一个字符串,例如:{'event_type':'eventy_type_A', 'duration':18.52}。我们想知道的是,获取所有事件类型,将它们转换为键,并使用持续时间值作为键的值。因此,输出将是event_type_A: 18.52 -
一旦我们使用键映射了值,我们就可以对它们进行分组。
GroupByKey步骤使用beam.GroupByKey()将所有具有相同键的集合排序到组中。 -
从这里,我们计算每个事件类型的平均持续时间。这可以通过另一种类型的 Beam 转换完成,称为
beam.CombineFn:
class CalculateAverageDuration(beam.CombineFn):
def create_accumulator(self):
return (0.0, 0)
def add_input(self, accumulator, input):
total_duration, count = accumulator
for duration in input:
total_duration += duration
count += 1
return total_duration, count
def merge_accumulators(self, accumulators):
total_duration, count = zip(*accumulators)
return sum(total_duration), sum(count)
def extract_output(self, accumulator):
total_duration, count = accumulator
return total_duration / count if count != 0 else 0
beam.CombineFn 创建一个组合转换。每个组合转换需要以下函数:
-
create_accumulator: 这个函数创建一个累加器。在这种情况下,我们将有一个包含两个值的元组,第一个值表示持续时间,第二个值表示我们跨元素计数的数量。 -
add_input: 这个函数接收累加器和输入。在这里,我们计算特定键的所有事件的持续时间总和,然后计算到目前为止我们计数的元素总数。 -
merge_accumulators: 这个函数执行所有累积值的合并。对于每个我们已分组的键,我们将有三个不同的累加器,包含总持续时间总和和计数的元素总数。 -
extract_output: 这个函数返回我们想要的组合(平均)值,执行累加器的算术平均值。
- 管道中的最后两个步骤将输出转换为可写格式并将其写入输出文本。
Apache Beam,即使是本地,也能很好地处理文件缩放的大小,正如表.1 所示。平均时间是在 3 次不同的运行中计算的,报告了平均值及其标准误差(作为参考,这些计算是在 Mac M2 Pro,16 GB 上进行的)。值得注意的是,输入文件本身就不大,因为它只有三列,但如果考虑到我们在简单的笔记本电脑上运行这些计算,结果确实是令人瞩目的。
更多推荐



所有评论(0)