Google Cloud Dataflow 可提供全代管式数据处理系统,以便用户在 Google Cloud 上以高度可扩展的方式运行 Apache Beam 流水线。由于是全代管式服务, Dataflow 用户无需担心任何服务端回归和版本控制。我们承诺,您只需关心您的流水线逻辑,而 Google 将为您搞定服务基础设施。毋庸置疑,Apache Beam 本身就是一个功能非常全面的 SDK,可提供许多由简到繁的转换,供您在其流水线中使用。例如,Apache Beam 提供了不少 I/O 连接器,其中许多连接器是 Apache Beam 复合转换,转换步骤由数十步至数百步不等。在过去,尽管这些代码不是由用户编写或维护的,但从服务的角度来看,它们一直被视为“用户代码”。在使用 I/O 连接器等复杂 Beam 转换时,客户会遇到一些常见的复杂问题。
为了缓解上述三个问题所带来的困扰,Dataflow 最近推出了名为代管式 I/O 的新产品。通过代管式 I/O,服务仅靠自身便能够代您管理这些复杂问题。因此,您可以真正专注于流水线业务逻辑,而不是为了满足相应需求,去专注于处理与特定连接器的使用和配置相关的琐碎小事。下面我们将详细介绍如何通过代管式 I/O 来解决上述的每一个复杂问题。
Apache Beam 是一个成熟的 SDK,具有许多转换、功能和优化。与许多大型软件一样,将 Beam 升级到新版本可能是一个重大工程。通常,在升级 Beam 时,您需要升级流水线的所有部分,包括所有 I/O 连接器。但有时,您只需要采用流水线中使用的一个或多个 I/O 连接器的最新版本中提供的关键错误修复或改进。
Managed I/O with Dataflow simplifies this by completely taking over the management of the Beam I/O connector version. With Managed I/O, Dataflow will make sure that I/O connectors used by pipelines are always up to date. Dataflow performs this by always upgrading I/O connectors to the latest vetted version during job submission and streaming update via replacement.
For example, assume that you use a Beam pipeline that uses Beam 2.x.0 and assume that you use the Managed Apache Iceberg I/O source in your pipeline. Also, assume that the latest vetted version of the Iceberg I/O source supported by Dataflow is 2.y.0. During job submission, Dataflow will replace this specific connector with version 2.y.0 and will keep the rest of the Beam pipeline including any standard (non-managed) I/O connectors at version 2.x.0.
替换后,Dataflow 会优化更新后的流水线并在 GCE 中加以执行。为了实现不同 Beam 版本连接器之间的隔离,Dataflow 会在 GCE 虚拟机中部署一个额外的 Beam SDK 容器。在这种情况下,2.x.0 和 2.y.0 版本的 Beam SDK 容器将在 Dataflow 作业使用的各个 GCE 虚拟机中单独运行。
有了代管式 I/O,您便可以确信流水线中使用的 I/O 连接器始终都是最新的。这让您能够专注于改进流水线的业务逻辑,而无需单单为了获取 I/O 连接器更新而忧心 Beam 版本升级的问题。
不同 Beam I/O 连接器之间的 API 差异很大。这意味着每当您尝试使用新的 Beam I/O 连接器时,您都必须学习该连接器的专用 API。有些 API 可能非常庞大而且不直观。这可能是出于以下原因:
以上几点原因导致某些连接器的 API Surface 非常庞大,新客户无法直观高效地使用。
Managed I/O offers standardized Java and Python APIs for supported I/O connectors. For example, with Beam Java SDK an I/O connector source can be instantiated in the following standardized form.
Managed.read(SOURCE).withConfig(sourceConfig)
An I/O connector sink can be instantiated in the following form.
Managed.write(SINK).withConfig(sinkConfig)
Here SOURCE
and SINK
are keys specifically identifying the connector while sourceConfig
and sinkConfig
are maps of configurations used to instantiate the connector source or sink. The map of configurations may also be provided as YAML files available locally or in Google Cloud Storage. Please see the Managed I/O website for more complete examples for supported sources and sinks.
Beam Python SDK 可提供类似的简化版 API。
这意味着您能够以非常标准的方式实例化具有不同 API 的各种 Beam I/O 连接器。例如,
// Create a Java BigQuery I/O source
Map<String, Object> bqReadConfig = ImmutableMap.of("query", "<query>", ...);
Managed.read(Managed.BIGQUERY).withConfig(bqReadConfig)
// Create a Java Kafka I/O source.
Map<String, Object> kafkaReadConfig = ImmutableMap.of("bootstrap_servers", "<server>", "topic", "<topic>", ...);
Managed.read(Managed.KAFKA).withConfig(kafkaReadConfig)
// Create a Java Kafka I/O source but with a YAML based config available in Google Cloud Storage.
String kafkaReadYAMLConfig = "gs://path/to/config.yaml"
Managed.read(Managed.KAFKA).withConfigUrl(kafkaReadYAMLConfig)
// Create a Python Iceberg I/O source.
iceberg_config = {"table": "<table>", ...}
managed.Read(managed.ICEBERG, config=iceberg_config)
许多 Beam 连接器都提供全面的 API,用于配置和优化连接器,以适应给定的流水线和给定的 Beam 运行程序。这其中的一个缺点是,如果您想专门在 Dataflow 上运行连接器,就可能需要了解最适合 Dataflow 的特定配置,并在设置流水线时加以应用。连接器的相关文档可能会很长很详细,而您所需的具体更改可能并不是那么一目了然。这可能会导致 Dataflow 流水线中使用的连接器以次优方式执行。
代管式 I/O 连接器可以自动重新配置连接器以采用最佳实践,并将连接器配置为最适合 Dataflow 的形式,以此来缓解次优执行的情况。这种重新配置可能会在提交作业或通过替换进行串流更新期间发生。
例如,Dataflow 串流提供了两种模式,分别为“exactly-once”和“at-least-once”,而带有 Storage Write API 的 BigQuery I/O 接收器也提供了两种类似的交付语义,分别为“exactly-once”和“at-least-once”。使用“at-least-once”交付语义的 BigQuery 接收器通常成本较低,并且延迟较低。对于标准 BigQuery I/O 连接器,您需负责确保在使用 BigQuery I/O 时使用适当的模式。对于代管式 BigQuery I/O 接收器,系统会自动为您配置适当的模式。这意味着如果您的串流流水线以“at-least-once”模式运行,您的代管式 I/O BigQuery 接收器将自动配置为使用“at-least-once”交付语义。
我们运行了几个流水线,这些流水线使用由部署在 GCS 中的 Hadoop 目录提供支持的代管式 Iceberg I/O 接收器来写入数据(有关其他受支持的目录,请参阅此处)。提交的流水线使用 Beam 2.61.0,Dataflow 自动将代管式 I/O 接收器升级到了受支持的最新版本。所有基准测试都使用 n1-standard-4 虚拟机,流水线使用的虚拟机数量固定为 100 个。请注意,此处的执行时间不包括启动和关闭时间。
正如基准测试所显示的那样,代管式 Iceberg I/O 很好地扩大了规模,并且两项指标都随数据量增长呈现出线性增长趋势。
我们还运行了一个从 Google Pub/Sub 读取内容的串流流水线,并使用代管式 I/O Kafka 接收器将消息推送到托管在 GCP 中的 Kafka 集群。流水线使用 Beam 2.61.0,Dataflow 将代管式 Kafka 接收器升级到了受支持的最新版本。在稳定状态下,流水线使用了 10 个 n1-standard-4 虚拟机(最多 20 个虚拟机)。流水线在所有步骤中一直以 250k msgs/sec 的吞吐量处理消息,并运行了 2 小时。
下图显示了流水线各个步骤的数据吞吐量。请注意,这里的吞吐量是不同的,因为元素大小在步骤之间发生了变化。流水线以 75 MiB/秒(红线)的速率从 Pub/Sub 读取内容,并以 40 MiB/秒(绿线)的速率写入 Kafka。
在流水线执行期间,延迟和积压都很低。
该流水线高效地使用了虚拟机 CPU 和内存。
这些结果表明,可以通过 Dataflow 服务提供的升级版代管式 I/O Kafka 接收器高效执行流水线。
使用代管式 I/O 的方法并不难,例如在流水线中使用其中一种受支持源和接收器,并使用 Dataflow Runner v2 运行流水线。当您运行流水线时,Dataflow 会确保在作业提交和通过替换进行串流更新期间启用源和接收器经过审核的最新版本,即便您在流水线中使用的是较旧的 Beam 版本。