使用Apache Flink 和 Apache Hudi 创建低延迟数据湖管道

科技资讯 投稿 89800 0 评论

使用Apache Flink 和 Apache Hudi 创建低延迟数据湖管道

AWS 提供复制工具,例如 AWS Database Migration Service (AWS DMS,用于将数据更改从各种源数据库复制到各种目标,包括 Amazon Simple Storage Service (Amazon S3。但是需要将数据湖中的数据与源系统上的更新和删除同步的客户仍然面临一些挑战:

  • 当记录存储在 Amazon S3 上的开放数据格式文件(例如 JSON、ORC 或 Parquet)中时,很难应用记录级更新或删除。

  • 在流式使用案例中,作业需要以低延迟写入数据,JSON 和 Avro 等基于行的格式最适合。但是使用这些格式扫描许多小文件会降低读取查询性能。

  • 在源数据模式频繁更改的用例中,通过自定义代码维护目标数据集的模式既困难又容易出错。

  • Copy-On-Write (COW – 这些表在批处理中很常见。在这种类型中,数据以列格式(Parquet)存储,每次更新(或删除)都会在写入过程中创建一个新版本的文件。

  • Merge-On-Read (MOR – 使用列(例如 Parquet)和基于行(例如 Avro)文件格式的组合存储数据,旨在让数据更加实时。

存储在 Amazon S3 中的 Hudi 数据集提供与其他 AWS 服务的原生集成。例如可以使用 AWS Glue(请参阅使用 AWS Glue 自定义连接器写入 Apache Hudi 表)或 Amazon EMR(请参阅 Amazon EMR 中提供的 Apache Hudi 的新功能)写入 Apache Hudi 表。但这些方法需要对 Hudi 的 Spark API 和编程技能有深入的掌握,才能构建和维护数据管道。
这篇文章中将展示一种以最少编码处理流数据的不同方式。本文中的步骤演示了如何在没有 Flink 或 Hudi 知识的情况下使用 SQL 语言构建完全可扩展的管道。可以通过编写熟悉的 SELECT 查询来查询和探索多个数据流中的数据,还可以连接来自多个流的数据并将结果物化到 Amazon S3 上的 Hudi 数据集。

解决方案概述

2021 年 9 月,AWS 宣布 MSK Connect 用于运行完全托管的 Kafka Connect 集群。只需单击几下,MSK Connect 即可轻松部署、监控和扩展连接器,将数据从数据库、文件系统和搜索索引等外部系统移入和移出 Apache Kafka 和 MSK 集群。用户现在可以使用 MSK Connect 构建从许多数据库源到 MSK 集群的完整 CDC 管道。
Amazon MSK 是一项完全托管的服务,可以轻松构建和运行使用 Apache Kafka 处理流数据的应用程序。使用 Apache Kafka 可以从数据库更改事件或网站点击流等来源捕获实时数据。然后构建管道(使用流处理框架,如 Apache Flink)将它们交付到目标,如持久存储或 Amazon S3。
Apache Flink 是一个流行的框架,用于构建有状态的流和批处理管道。 Flink 带有不同级别的抽象,以涵盖广泛的用例。
Flink 还根据选择的资源提供者(Hadoop YARN、Kubernetes 或独立)提供不同的部署模式。
这篇文章将使用 SQL 客户端工具作为一种交互式方式以 SQL 语法创建 Flink 作业。 将作业编译并提交到 Amazon EMR 上长时间运行的 Flink 集群(session 模式)。根据脚本,要么实时显示作业的表格格式输出,要么返回长时间运行的作业的作业 ID。
可以通过以下步骤实施解决方案:

  • 创建 EMR 集群

  • 使用 Kafka 和 Hudi 表连接器配置 Flink

  • 开发实时提取、转换和加载 (ETL 作业

  • 将管道部署到生产环境

先决条件

  • Aurora MySQL 托管数据库。这篇文章中将使用示例数据库 。

  • 在 MSK Connect 上运行的 Debezium MySQL 连接器,在 Amazon Virtual Private Cloud (Amazon VPC 中以 Amazon MSK 结尾。

  • 在 VPC 中运行的 MSK 集群

如果没有 MSK Connect,请按照 MSK Connect 实验室设置中的说明进行操作,并验证源连接器是否将数据更改复制到 MSK 主题。
还需要能够直接连接到 EMR  Leader节点。Session Manager 是 AWS Systems Manager 的一项功能,可提供基于浏览器的交互式一键式 shell 窗口。会话管理器还允许对受管节点进行受控访问的公司策略。
如果不使用 Session Manager,也可以使用 Amazon Elastic Compute Cloud (Amazon EC2 私有密钥对,但需要在公有子网中启动集群并提供入站 SSH 访问。

创建 EMR 集群

  1. 创建一个文件,configurations.json,包含以下内容:

[
  {
    "Classification": "flink-conf",
    "Properties": {
      "taskmanager.numberOfTaskSlots":"4"
    }
  }
]
  1. 在私有子网(推荐)或托管 MSK 集群的同一 VPC 的公有子网中创建 EMR 集群。 使用  选项输入集群的名称,并使用  选项指定 EC2 密钥对的名称以及子网 ID。 请参阅以下代码:

aws emr create-cluster --release-label emr-6.4.0 \
--applications Name=Flink \
--name FlinkHudiCluster \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://yourLogUri \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole, SubnetId=A SubnetID of Amazon MSK VPC
  1. 等到集群状态变更为 Running。

  2. 使用 Amazon EMR 控制台或 AWS CLI 检索Leader节点的 DNS 名称。

  3. 通过 Session Manager 或在 Linux、Unix 和 Mac OS X 上使用 SSH 和 EC2 私钥连接到Leader节点。

  4. 使用 SSH 连接时,领导节点的安全组必须允许端口 22。

  5. 确保 MSK 集群的安全组具有接收来自 EMR 集群安全组的流量的入站规则。

使用 Kafka 和 Hudi 表连接器配置 Flink

Flink 表连接器允许使用 Table API 编程流操作时连接到外部系统。源连接器提供对流服务的访问,包括作为数据源的 Kinesis 或 Apache Kafka。Sink 连接器允许 Flink 将流处理结果发送到外部系统或 Amazon S3 等存储服务。
在 Amazon EMR Leader节点上下载以下连接器并将它们保存在 /lib/flink/lib 目录中:

  • 源连接器——从 Apache 仓库下载 。 Apache Kafka SQL 连接器允许 Flink 从 Kafka 主题中读取数据。

  • 接收器连接器 – Amazon EMR 发布版本 emr-6.4.0 随附 Hudi 发布版本 0.8.0。但是在这篇文章中需要 Hudi Flink 捆绑连接器发布版本 0.10.0,它与 Flink 发布版本 1.13 兼容。从 Apache 仓库下载 。它还包含多个文件系统客户端,包括用于与 Amazon S3 集成的 S3A。

开发实时 ETL 作业

使用 Flink SQL API 构建一个新作业。这些 API 允许使用流数据,类似于关系数据库中的表。此方法中指定的 SQL 查询在源流中的数据事件上连续运行。因为 Flink 应用程序从流中消费无限数据,所以输出不断变化。为了将输出发送到另一个系统,Flink 向下游 sink 操作员发出更新或删除事件。因此当使用 CDC 数据或编写需要更新或删除输出行的 SQL 查询时,必须提供支持这些操作的接收器连接器。否则Flink 作业将出现如下错误信息

Target Table doesn't support consuming update or delete changes which is produced by {your query statement} …

使用之前在配置文件中指定的配置在 EMR 集群上启动 Flink YARN 应用程序:

cd /lib/flink && ./bin/yarn-session.sh --detached

命令成功运行后就可以创建第一个作业了。运行以下命令以启动 sql-client:

./bin/sql-client.sh

终端窗口类似于以下屏幕截图。

设置作业参数

SET execution.checkpointing.interval = 1min;

定义源表

从概念上讲使用 SQL 查询处理流需要将事件解释为表中的逻辑记录。 因此使用 SQL API 读取或写入数据之前的第一步是创建源表和目标表。 表定义包括连接设置和配置以及定义流中对象的结构和序列化格式的模式。
这篇文章中将创建三个源表,每个对应于 Amazon MSK 中的一个主题。还可以创建一个目标表,将输出数据记录写入存储在 Amazon S3 上的 Hudi 数据集。
在选项中将 替换为自己的 Amazon MSK 集群信息,并在 终端中运行以下命令:

CREATE TABLE CustomerKafka (
      `event_time` TIMESTAMP(3 METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3 METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `NAME` STRING,
      `MKTSEGMENT` STRING,
       WATERMARK FOR event_time AS event_time
     WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER', -- created by debezium connector, corresponds to CUSTOMER table in Amazon Aurora database. 
      'properties.bootstrap.servers' = '<PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup1',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    ;

CREATE TABLE CustomerSiteKafka (
      `event_time` TIMESTAMP(3 METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3 METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `SITE_ID` BIGINT,
      `STATE` STRING,
      `CITY` STRING,
       WATERMARK FOR event_time AS event_time
     WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER_SITE',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup2',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    ;

CREATE TABLE SalesOrderAllKafka (
      `event_time` TIMESTAMP(3 METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3 METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `ORDER_ID` BIGINT,
      `SITE_ID` BIGINT,
      `ORDER_DATE` BIGINT,
      `SHIP_MODE` STRING,
       WATERMARK FOR event_time AS event_time
     WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.SALES_ORDER_ALL',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup3',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    ;

默认情况下 将这些表存储在内存中,它们仅在活动会话期间存在,每当  会话到期或退出时都需要重新创建表。

定义目标Sink表

CREATE TABLE CustomerHudi (
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3,
      PRIMARY KEY (`customer_id` NOT Enforced
    
    PARTITIONED BY (`mktsegment`
    WITH (
      'connector' = 'hudi',
      'write.tasks' = '4',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
      'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    ;

对于 查询,将作业提交到 Flink 集群,然后将结果实时显示在屏幕上。 运行以下选择查询以查看 Amazon MSK 数据:

SELECT Count(O.order_id AS order_count,
       C.cust_id,
       C.NAME,
       C.mktsegment
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment;

此查询连接三个流并聚合按每个客户记录分组的客户订单计数,几秒钟后会在终端中看到结果。 请注意终端输出如何随着 Flink 作业从源流中消耗更多事件而发生变化。

将结果写入 Hudi 数据集

INSERT INTO customerhudi
SELECT Count(O.order_id,
       C.cust_id,
       C.NAME,
       C.mktsegment,
       Proctime(
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment;

这一次 提交作业后与集群断开连接,客户端不必等待作业的结果,因为它会将结果写入 Hudi 数据集。即使停止了 会话,该作业也会继续在 Flink 集群上运行。
等待几分钟,直到作业将 Hudi 提交日志文件生成到 Amazon S3。然后导航到为 CustomerHudi 表指定的 Amazon S3 中的位置,其中包含按 MKTSEGMENT 列分区的 Hudi 数据集。在每个分区中,您还可以找到 Hudi 提交日志文件。这是因为表类型定义为 MERGE_ON_READ。在此模式下使用默认配置,Hudi 会在出现五个 delta 提交日志后将提交日志合并到更大的 Parquet 文件中。可以通过将表类型更改为 COPY_ON_WRITE 或指定自定义压缩配置来更改此设置。

查询 Hudi 数据集

CREATE TABLE `CustomerHudiReadonly` (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3,
      PRIMARY KEY (`customer_id` NOT Enforced
    
    PARTITIONED BY (`mktsegment`
    WITH (
      'connector' = 'hudi',
      'hoodie.datasource.query.type' = 'snapshot',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
     'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    ;

请注意以 为前缀的附加列名,这些列是 Hudi 在写入过程中添加的,用于维护每条记录的元数据。另请注意在表定义的 WITH 部分中传递的额外读取配置,这可确保从 Hudi 数据集的实时视图中读取数据。运行以下命令:

select * from CustomerHudiReadonly where customer_id <= 5;

终端会在 30 秒内显示结果。导航到 Flink Web 界面可以在其中观察由 select 查询启动的新 Flink 作业(有关如何找到 Flink Web 界面,请参见下文)。它扫描 Hudi 数据集中已提交的文件,并将结果返回给 Flink SQL 客户端。
使用 或其他 IDE 连接到托管在 Aurora MySQL 上的  数据库。针对 表运行一些插入语句:

insert into SALES_ORDER_ALL values (29001, 2, now(, 'STANDARD';
insert into SALES_ORDER_ALL values (29002, 2, now(, 'TWO-DAY';
insert into SALES_ORDER_ALL values (29003, 2, now(, 'STANDARD';
insert into SALES_ORDER_ALL values (29004, 2, now(, 'TWO-DAY';
insert into SALES_ORDER_ALL values (29005, 2, now(, 'STANDARD';

几秒钟后一个新的提交日志文件会出现在 Amazon S3 上的 Hudi 数据集中。 Debezium for MySQL Kafka 连接器捕获更改并为 MSK 主题生成事件。 Flink 应用程序使用来自主题的新事件并相应地更新 customer_count 列。然后它将更改的记录发送到 Hudi 连接器以与 Hudi 数据集合并。
Hudi 支持不同的写操作类型。默认操作是 upsert,它最初在数据集中插入记录。当具有现有键的记录到达流程时,它被视为更新。此操作在希望将数据集与源数据库同步且不希望出现重复记录的情况下很有用。

将管道部署到生产环境

GitHub 上提供了一个 Flink Hudi 应用程序,其中包含这篇文章中的脚本,用户可以访问此存储库,并比较在 和 中运行之间的差异。

清理

  • 停止 EMR 集群

  • 删除 MSK Connect Lab 设置创建的 AWS CloudFormation

结论

构建数据湖是打破数据孤岛和运行分析以从所有数据中获取洞察力的第一步。在数据湖上的事务数据库和数据文件之间同步数据并非易事,而且需要大量工作。在 Hudi 添加对 Flink SQL API 的支持之前,Hudi 客户必须具备编写 Apache Spark 代码并在 AWS Glue 或 Amazon EMR 上运行它的必要技能。在这篇文章中展示了一种新方法,可以使用 SQL 查询以交互方式探索流服务中的数据,并加快数据管道的开发过程。

编程笔记 » 使用Apache Flink 和 Apache Hudi 创建低延迟数据湖管道

赞同 (79) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽