一个基于Java线程池管理的开源框架Hippo4j实践

科技资讯 投稿 7000 0 评论

一个基于Java线程池管理的开源框架Hippo4j实践

目录
    概述
    • 定义
    • 线程池痛点
    • 功能
    • 框架概览
    • 架构
  • 部署
      Docker安装
  • 二进制安装
  • 运行模式
      依赖配置中心
      • 接入流程
      • 个性化配置
      • 线程池监控
  • 无中间件依赖
      接入流程
  • 服务端配置
  • 三方框架线程池适配
  • 拒绝策略自定义
  • 概述

    定义

    Hippo4j 官网文档地址 https://hippo4j.cn/docs/user_docs/intro

    Hippo4j是一个动态可观测线程池框架,通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力。

    线程池痛点

      线程池随便定义,线程资源过多,造成服务器高负载。
    • 线程池参数不易评估,随着业务的并发提升,业务面临出现故障的风险。
    • 线程池任务执行时间超过平均执行周期,开发人员无法感知。
    • 线程池任务堆积,触发拒绝策略,影响既有业务正常运行。
    • 当业务出现超时、熔断等问题时,因为没有监控,无法确定是不是线程池引起。
    • 原生线程池不支持运行时变量的传递,比如 MDC 上下文遇到线程池就 GG。
    • 无法执行优雅关闭,当项目关闭时,大量正在运行的线程池任务被丢弃。
    • 线程池运行中,任务执行停止,怀疑发生死锁或执行耗时操作,但是无从下手。

    功能

      动态变更:应用运行时动态变更线程池参数,包括不限于核心、最大线程、阻塞队列大小和拒绝策略等,支持应用集群下不同节点线程池配置差异化。
    • 自定义报警:应用线程池运行时埋点,提供四种报警维度,线程池过载、阻塞队列容量、运行超长以及拒绝策略报警,并支持自定义时间内不重复报警。
    • 运行监控:管理应用线程池实例;支持自定义时长线程池运行数据采集存储,同时也支持 Prometheus、InfluxDB 等采集监控,通过 Grafana 或内置监控页面提供可视化大屏监控运行指标。实时查看线程池运行时数据,最近半小时线程池运行数据图表展示。
    • 功能扩展 - 支持线程池任务传递上下文;项目关闭时,支持等待线程池在指定时间内完成任务。
    • 多种模式 - 内置两种使用模式:依赖配置中心和无中间件依赖。
    • 容器管理 - Tomcat、Jetty、Undertow 容器线程池运行时查看和线程数变更。
    • 框架适配 - Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。

    框架概览

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4bRHnVlQ-1681738378627(null]

      hippo4j-adapter:适配对第三方框架中的线程池进行监控,如 Dubbo、RocketMQ、Hystrix 等;
    • hippo4j-auth:用户、角色、权限等;
    • hippo4j-common:多个模块公用代码实现;
    • hippo4j-config:提供线程池准实时参数更新功能;
    • hippo4j-console:对接前端控制台;
    • hippo4j-core:核心的依赖,包括配置、核心包装类等;
    • hippo4j-discovery:提供线程池项目实例注册、续约、下线等功能;
    • hippo4j-example :示例工程;
    • hippo4j-message :配置变更以及报警通知发送;
    • hippo4j-monitor :线程池运行时监控;
    • hippo4j-server :Server 端发布需要的模块聚合;
    • hippo4j-spring-boot:SpringBoot Starter。

    架构

    简单来说,Hippo4j 从部署的角度上分为两种角色:Server 端和 Client 端。Server 端是 Hippo4j 项目打包出的 Java 进程,功能包括用户权限、线程池监控以及执行持久化的动作。Client 端指的是我们 SpringBoot 应用,通过引入 Hippo4j Starter Jar 包负责与 Server 端进行交互。比如拉取 Server 端线程池数据、动态更新线程池配置以及采集上报线程池运行时数据等。总体功能架构如下图

      基础组件
      • 配置中心(Config):配置中心位于 Server 端,它的主要作用是监控 Server 端线程池配置变更,实时通知到 Client 实例执行线程池变更流程。代码设计基于 Nacos 1.x 版本的 长轮询以及异步 Servlet 机制 实现。
      • 注册中心(Discovery):负责管理 Client 端(单机或集群)注册到 Server 端的实例,包括不限于实例注册、续约、过期剔除 等操作,代码基于 Eureka 源码实现。
      • 控制台(Console):对接前端项目,包括不限于以下模块管理:
      消息通知:Hippo4j 内置了很多需要通知的事件,比如:线程池参数变更通知、线程池活跃度报警、拒绝策略执行报警以及阻塞队列容量报警等。目前 Notify 已经接入了钉钉、企业微信和飞书,后续持续集成邮件、短信等通知渠道;并且Notify 模块提供了消息事件的 SPI 方案,可以接受三方自定义的推送。
    • Hippo4j-Spring-Boot-Starter:Hippo4j 提供以 Starter Jar 包的形式嵌套在应用内,负责与 Server 端完成交互。

    部署

    Docker安装

    docker run -d -p 6691:6691 --name hippo4j-server hippo4j/hippo4j-server
    

    或者底层存储数据库切换为 MySQL。DATASOURCE_HOST 需要切换为本地 IP,不能使用 127.0.0.1localhost

    docker run -d -p 6691:6691 --name hippo4j-server \
    -e DATASOURCE_MODE=mysql \
    -e DATASOURCE_HOST=192.168.3.200 \
    -e DATASOURCE_PORT=3306 \
    -e DATASOURCE_DB=hippo4j_manager \
    -e DATASOURCE_USERNAME=root \
    -e DATASOURCE_PASSWORD=root \
    hippo4j/hippo4j-server
    

    访问 Server 控制台,路径 http://hadoop3:6691/index.html,默认用户名密码:admin / 123456

    二进制安装

    # 下载hippo4j-server1.5.0最新版本二进制文件,
    wget https://github.com/opengoofy/hippo4j/releases/download/v1.5.0/hippo4j-server-1.5.0.tar.gz
    # 解压文件
    tar -xvf hippo4j-server-1.5.0.tar.gz
    # 进入目录
    cd hippo4j-server/
    # 创建数据库用户并执行conf/hippo4j_manager.sql创建和初始hippo4j_manager数据库,按需修改conf/application.properties数据库连接信息
    # 授权startup.sh执行权限后,启动hippo4j-server
    ./bin/startup.sh
    

    如果不下载二进制也可以使用源码编译的方式,修改resources目录下的application.properties数据库连接信息,启动 Hippo4j-Server/Hippo4j-Bootstrap模块下 ServerApplication 应用类。

    http://hadoop3:6691/index.html,默认用户名密码:admin / 123456

    运行模式

      轻量级动态线程池管理,依赖 Nacos、Apollo、Zookeeper、ETCD、Polaris、Consul 等三方配置中心(任选其一)完成线程池参数动态变更,支持运行时报警、监控等功能。

    • 部署 Hippo4j server 服务,通过可视化 Web 界面完成线程池的创建、变更以及查看,不依赖三方中间件。相比较 Hippo4j config,功能会更强大,但同时也引入了一定的复杂性。需要部署一个 Java 服务,以及依赖 MySQL 数据库。

    Hippo4j config Hippo4j server
    依赖 Nacos、Apollo、Zookeeper、ETCD、Polaris、Consul 配置中心(任选其一) 部署 Hippo4j server(内部无依赖中间件)
    使用 配置中心补充线程池相关参数 Hippo4j server web 控制台添加线程池记录
    功能 包含基础功能:参数动态化、运行时监控、报警等 基础功能之外扩展控制台界面、线程池堆栈查看、线程池运行信息实时查看、历史运行信息查看、线程池配置集群个性化等
      使用建议:根据公司情况选择,如果基本功能可以满足使用,选择 Hippo4j config 使用即可;如果希望更多的功能,可以选择 Hippo4j server。两者在进行替换的时候,无需修改业务代码

    依赖配置中心

    接入流程

      引入依赖
    <dependency>
        <groupId>cn.hippo4j</groupId>
        <artifactId>hippo4j-config-spring-boot-starter</artifactId>
        <version>1.5.0</version>
    </dependency>
    
      启动类上添加注解 @EnableDynamicThreadPool
    • 创建Nacos配置文件。

    在示例工程hippo4j-config-nacos-spring-boot-starter-example中配置Nacos的地址,并在Nacos对应的空间和组下创建hippo4j-nacos.properties文件,将原来在bootstrap.properties中的配置转移到Nacos中

    spring.dynamic.thread-pool.enable=true
    spring.dynamic.thread-pool.banner=true
    spring.dynamic.thread-pool.check-state-interval=5
    spring.dynamic.thread-pool.monitor.enable=true
    spring.dynamic.thread-pool.monitor.collect-types=micrometer
    spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic,web
    spring.dynamic.thread-pool.monitor.initial-delay=10000
    spring.dynamic.thread-pool.monitor.collect-interval=5000
    
    spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT
    spring.dynamic.thread-pool.notify-platforms[0].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
    spring.dynamic.thread-pool.notify-platforms[1].platform=DING
    spring.dynamic.thread-pool.notify-platforms[1].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
    spring.dynamic.thread-pool.notify-platforms[2].platform=LARK
    spring.dynamic.thread-pool.notify-platforms[2].token=2cbf2808-3839-4c26-a04d-fd201dd51f9e
    
    spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume
    spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
    spring.dynamic.thread-pool.executors[0].core-pool-size=4
    spring.dynamic.thread-pool.executors[0].maximum-pool-size=6
    spring.dynamic.thread-pool.executors[0].queue-capacity=512
    spring.dynamic.thread-pool.executors[0].blocking-queue=ResizableCapacityLinkedBlockingQueue
    spring.dynamic.thread-pool.executors[0].execute-time-out=800
    spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
    spring.dynamic.thread-pool.executors[0].keep-alive-time=6691
    spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
    spring.dynamic.thread-pool.executors[0].alarm=true
    spring.dynamic.thread-pool.executors[0].active-alarm=80
    spring.dynamic.thread-pool.executors[0].capacity-alarm=80
    spring.dynamic.thread-pool.executors[0].notify.interval=8
    spring.dynamic.thread-pool.executors[0].notify.receives=chen.ma
    spring.dynamic.thread-pool.executors[1].thread-pool-id=message-produce
    spring.dynamic.thread-pool.executors[1].thread-name-prefix=message-produce
    spring.dynamic.thread-pool.executors[1].core-pool-size=2
    spring.dynamic.thread-pool.executors[1].maximum-pool-size=4
    spring.dynamic.thread-pool.executors[1].queue-capacity=1024
    spring.dynamic.thread-pool.executors[1].blocking-queue=ResizableCapacityLinkedBlockingQueue
    spring.dynamic.thread-pool.executors[1].execute-time-out=800
    spring.dynamic.thread-pool.executors[1].rejected-handler=AbortPolicy
    spring.dynamic.thread-pool.executors[1].keep-alive-time=6691
    spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out=true
    spring.dynamic.thread-pool.executors[1].alarm=true
    spring.dynamic.thread-pool.executors[1].active-alarm=80
    spring.dynamic.thread-pool.executors[1].capacity-alarm=80
    spring.dynamic.thread-pool.executors[1].notify.interval=8
    spring.dynamic.thread-pool.executors[1].notify.receives=chen.ma
    

    @DynamicThreadPool 注解修饰。threadPoolId 为服务端创建的线程池 ID。这个也是前面配置的spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume

    个性化配置

    客户端集群个性化配置,能够针对单独的客户端进行配置变更。

      容器及三方框架线程池自定义启用
    spring:
      dynamic:
        thread-pool:
          tomcat:
            enable: true
          executors:
            - thread-pool-id: message-consume
              enable: false
          adapter-executors:
            - threadPoolKey: 'input'
              enable: true
    
      客户端集群个性化配置:分别在动态线程池、容器线程池以及三方框架线程池配置下增加 nodes 配置节点,通过该配置可匹配需要变更的节点。
    spring:
      dynamic:
        thread-pool:
          tomcat:
            nodes: 192.168.1.5:*,192.168.1.6:8080
          executors:
          - thread-pool-id: message-consume
            nodes: 192.168.1.5:*
          adapter-executors:
            - threadPoolKey: 'input'
              nodes: 192.168.1.5:*
    

    线程池监控

      添加依赖
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
      添加配置,上面Nccos配置已添加
    • 项目启动,访问 http://localhost:29999/actuator/prometheus 出现 dynamic_thread_pool_ 前缀的指标,即为成功。

    无中间件依赖

    接入流程

    前面部署章节主要演示无中间件依赖的,大体流程和依赖配置中心相似。通过 ThreadPoolBuilder 构建动态线程池,只有 threadFactory、threadPoolId 为必填项,其它参数会从 hippo4j-server 服务拉取。项目中使用上述定义的动态线程池,如下所示:

    @Resourceprivate ThreadPoolExecutor messageConsumeDynamicExecutor;messageConsumeDynamicExecutor.execute(( -> xxx;@Resourceprivate ThreadPoolExecutor messageProduceDynamicExecutor;messageProduceDynamicExecutor.execute(( -> xxx;
    

    服务端配置

    hippo4j.core.clean-history-data-enable
    

    是否开启线程池历史数据清洗,默认开启。

    hippo4j.core.clean-history-data-period
    

    线程池历史数据保留时间,默认值:30,单位分钟。

    hippo4j.core.monitor.report-type
    

    客户端监控上报服务端类型,可选值:http、netty,默认 http。服务端开启 netty 配置后,需要在客户端对应开启才可生效。用来应对大量动态线程池监控场景。

    三方框架线程池适配

      Dubbo
    • Hystrix
    • RabbitMQ
    • RocketMQ
    • AlibabaDubbo
    • RocketMQSpringCloudStream
    • RabbitMQSpringCloudStream

    引入 Hippo4j Server 或 Core 的 Maven Jar 坐标后,还需要引入对应的框架适配 Jar:

    <dependency>    <groupId>cn.hippo4j</groupId>    <!-- Dubbo -->    <artifactId>hippo4j-spring-boot-starter-adapter-dubbo</artifactId>    <!-- Alibaba Dubbo -->    <artifactId>hippo4j-spring-boot-starter-adapter-alibaba-dubbo</artifactId>    <!-- Hystrix -->    <artifactId>hippo4j-spring-boot-starter-adapter-hystrix</artifactId>    <!-- RabbitMQ -->    <artifactId>hippo4j-spring-boot-starter-adapter-rabbitmq</artifactId>    <!-- RocketMQ -->    <artifactId>hippo4j-spring-boot-starter-adapter-rocketmq</artifactId>    <!-- SpringCloud Stream RocketMQ -->    <artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq</artifactId>    <!-- SpringCloud Stream RabbitMQ -->    <artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq</artifactId>    <version>1.5.0</version></dependency>
    

    如果省事仅需引入一个全量包,框架底层会根据条件判断加载具体线程池适配器。

    <dependency>    <groupId>cn.hippo4j</groupId>    <artifactId>hippo4j-spring-boot-starter-adapter-all</artifactId>    <version>1.5.0</version></dependency>
    

    在官方示例中也提供集中线程池适配示例

    spring:  dynamic:    thread-pool:      # 省略其它配置      adapter-executors:        # threadPoolKey 代表线程池标识        - threadPoolKey: 'input'          # mark 为三方线程池框架类型,参见文初已支持框架集合          mark: 'RocketMQSpringCloudStream'          corePoolSize: 10          maximumPoolSize: 10
    

    拒绝策略自定义

    Hippo4j 通过 SPI 的方式对拒绝策略进行扩展,可以让用户在 Hippo4j 中完成自定义拒绝策略实现。自定义拒绝策略,实现 CustomRejectedExecutionHandler 接口,在hippo4j-example-core中添加MyDemoRejectedExecutionHandler.java,内容如下:

    package cn.hippo4j.example.core.handler;import cn.hippo4j.common.executor.support.CustomRejectedExecutionHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;public class MyDemoRejectedExecutionHandler implements CustomRejectedExecutionHandler {    @Override    public Integer getType( {        return 15;    }    @Override    public String getName( {        return null;    }    @Override    public RejectedExecutionHandler generateRejected( {        return new CustomMyDemoRejectedExecutionHandler(;    }    public static class CustomMyDemoRejectedExecutionHandler implements RejectedExecutionHandler {        @Override        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor {            Logger logger = LoggerFactory.getLogger(this.getClass(;            logger.error("线程池抛出拒绝策略MyDemoRejected";        }    }}
    

    在hippo4j-spring-boot-starter-example模块中src/main/resources/META-INF/services 目录,创建 SPI 自定义拒绝策略文件 cn.hippo4j.common.executor.support.CustomRejectedExecutionHandler,文件内仅放一行自定义拒绝策略全限定名即可,原本已有这个文件,我们修改内容即可

    cn.hippo4j.example.core.handler.MyDemoRejectedExecutionHandler
    

    启动hippo4j-spring-boot-starter-example,修改实例线程池配置

    拒绝策略触发时,完成上述代码效果,仅打印异常日志提示。

    2023-04-17 19:17:33.324 ERROR 29977 --- [ateHandler.test] r$CustomMyDemoRejectedExecutionHandler : 线程池抛出拒绝策略MyDemoRejected
    
    - **本人博客网站**[**IT小神**](http://www.itxiaoshen.com   www.itxiaoshen.com
    

    编程笔记 » 一个基于Java线程池管理的开源框架Hippo4j实践

    赞同 (29) or 分享 (0)
    游客 发表我的评论   换个身份
    取消评论

    表情
    (0)个小伙伴在吐槽