一、FlinkSQL自定义函数分类
Flink SQL 的自定义函数是用户可以自行编写的一种函数,用于扩展 Flink SQL 的功能。自定义函数可以在 SQL 查询中被调用,以完成用户自定义的数据处理逻辑。 在 Flink SQL 中,自定义函数分为标量函数、表函数和聚合函数三种类型。
1、标量函数(Scalar Function)
2、表函数(Table Function)
表函数接受一行输入,返回多行输出。在 Flink SQL 中,表函数可以使用 LATERAL TABLE 语法进行调用。用户可以通过继承 TableFunction 类或实现 TableFunction 接口的方式来实现自定义的表函数。
3、聚合函数(Aggregate Function)
自定义聚合函数是通过扩展 AggregateFunction 来实现的。AggregateFunction 的工作过程如下。首先,它需要一个 accumulator,它是一个数据结构,存储了聚合的中间结果。通过调用 AggregateFunction 的 createAccumulator( 方法创建一个空的 accumulator。接下来,对于每一行数据,会调用 accumulate( 方法来更新 accumulator。当所有的数据都处理完了之后,通过调用 getValue 方法来计算和返回最终的结果。
下面几个方法是每个 AggregateFunction 必须要实现的:
- createAccumulator(
- accumulate(
- getValue(
4、表值聚合函数
用户自定义表值聚合函数是通过扩展 TableAggregateFunction 类来实现的。一个 TableAggregateFunction 的工作过程如下。首先,它需要一个 accumulator,这个 accumulator 负责存储聚合的中间结果。 通过调用 TableAggregateFunction 的 createAccumulator 方法来构造一个空的 accumulator。接下来,对于每一行数据,会调用 accumulate 方法来更新 accumulator。当所有数据都处理完之后,调用 emitValue 方法来计算和返回最终的结果。
下面几个 TableAggregateFunction 的方法是必须要实现的:
- createAccumulator(
- accumulate(
5、异步表值函数
二、需求场景
1、需求描述
基于Flink1.14.4集群,有一批基于某个主键生成的collect函数结果数据,需要转换为字符串传到下游Kafka。由于collect(函数生成的结果是一个多行的集合MutiSet<varchar(100>,FlinkSQL中暂未支持concat_ws或者concat函数,因此无法将collect生成的多行结果直接通过现有SQL函数转换为一行字符串。基于以上原因,需要开发一个自定义函数实现。
2、数据样例
CREATE TABLE "air_data_source_result" (
"id" int NOT NULL DEFAULT '0' COMMENT '主键',
"airlineLogo" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"airlineShortCompany" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrActCross" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrActTime" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrAirport" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrCode" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrOntimeRate" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrPlanCross" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrPlanTime" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrTerminal" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"checkInTable" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,