FlinkSQL自定义函数开发

科技资讯 投稿 7200 0 评论

FlinkSQL自定义函数开发

一、FlinkSQL自定义函数分类

Flink SQL 的自定义函数是用户可以自行编写的一种函数,用于扩展 Flink SQL 的功能。自定义函数可以在 SQL 查询中被调用,以完成用户自定义的数据处理逻辑。 在 Flink SQL 中,自定义函数分为标量函数、表函数和聚合函数三种类型。

1、标量函数(Scalar Function)

2、表函数(Table Function)

表函数接受一行输入,返回多行输出。在 Flink SQL 中,表函数可以使用 LATERAL TABLE 语法进行调用。用户可以通过继承 TableFunction 类或实现 TableFunction 接口的方式来实现自定义的表函数。

3、聚合函数(Aggregate Function)

自定义聚合函数是通过扩展 AggregateFunction 来实现的。AggregateFunction 的工作过程如下。首先,它需要一个 accumulator,它是一个数据结构,存储了聚合的中间结果。通过调用 AggregateFunction 的 createAccumulator( 方法创建一个空的 accumulator。接下来,对于每一行数据,会调用 accumulate( 方法来更新 accumulator。当所有的数据都处理完了之后,通过调用 getValue 方法来计算和返回最终的结果。
下面几个方法是每个 AggregateFunction 必须要实现的:

    createAccumulator(
  • accumulate(
  • getValue(

4、表值聚合函数

用户自定义表值聚合函数是通过扩展 TableAggregateFunction 类来实现的。一个 TableAggregateFunction 的工作过程如下。首先,它需要一个 accumulator,这个 accumulator 负责存储聚合的中间结果。 通过调用 TableAggregateFunction 的 createAccumulator 方法来构造一个空的 accumulator。接下来,对于每一行数据,会调用 accumulate 方法来更新 accumulator。当所有数据都处理完之后,调用 emitValue 方法来计算和返回最终的结果。
下面几个 TableAggregateFunction 的方法是必须要实现的:

    createAccumulator(
  • accumulate(

5、异步表值函数

二、需求场景

1、需求描述

基于Flink1.14.4集群,有一批基于某个主键生成的collect函数结果数据,需要转换为字符串传到下游Kafka。由于collect(函数生成的结果是一个多行的集合MutiSet<varchar(100>,FlinkSQL中暂未支持concat_ws或者concat函数,因此无法将collect生成的多行结果直接通过现有SQL函数转换为一行字符串。基于以上原因,需要开发一个自定义函数实现。

2、数据样例

CREATE TABLE "air_data_source_result" (
  "id" int NOT NULL DEFAULT '0' COMMENT '主键',
  "airlineLogo" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "airlineShortCompany" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrActCross" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrActTime" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrAirport" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrCode" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrOntimeRate" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrPlanCross" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrPlanTime" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "arrTerminal" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  "checkInTable" varchar(100 CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,

编程笔记 » FlinkSQL自定义函数开发

赞同 (36) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽