作者:京东物流 王北永 姚再毅 李振
1 背景
2 技术依赖
1Jsf:京东RPC框架,用作机器之间的通讯工具
3ZK/Curator: Zookeeper,用作配置信息的存储 和redis二选一
3 实现原理
1)接入方:
2)统一配置服务端:
整个接入方和统一配置服务端的架构如下图
4 实现步骤
this.client= new BroadCastClient(this;
this.proxyInvoker = new ClientProxyInvoker(this;
ProtocolFactory.check(Constants.ProtocolType.valueOf(this.getProtocol(, Constants.CodecType.valueOf(this.getSerialization(;
this.proxyIns = (T ProxyFactory.buildProxy(this.getProxy(, this.getProxyClass(, this.proxyInvoker;
2广播调用客户端方法分别获取当前注册中心有心跳的服务提供者和已失去连接的机器列表。对统一配置来讲,要么同时失败,要么同时成功,判断如果存在不正常的服务提供方,则不同步。只有全部提供方存在才可以开始广播配置信息
ConcurrentHashMap<Provider, ClientTransport> concurrentHashMap = this.connectionHolder.getAliveConnections(;
ConcurrentHashMap<Provider, ClientTransportConfig> deadConcurrentHashMap = this.connectionHolder.getDeadConnections(;
if(deadConcurrentHashMap!=null && deadConcurrentHashMap.size(>0{
log.warn("当前别名{}存在不正常服务提供方数量{},请关注!",msg.getAlias(,deadConcurrentHashMap.size(;
throw new RpcException(String.format("当前别名%s存在不正常服务提供方数量%s,请关注!",msg.getAlias(,deadConcurrentHashMap.size(;
}
if(concurrentHashMap.isEmpty({
log.info("当前别名{}不存在正常服务提供方",msg.getAlias(;
throw new RpcException(String.format("当前别名%s不存在正常服务提供方",msg.getAlias(;
}
Iterator aliveConnections = concurrentHashMap.entrySet(.iterator(;
log.info("当前别名{}存在正常服务提供方数量{}",msg.getAlias(,concurrentHashMap.size(;
while (aliveConnections.hasNext( {
Entry<Provider, ClientTransport> entry = (Entry aliveConnections.next(;
Provider provider = (Provider entry.getKey(;
log.info("当前连接ip={}、port={}、datacenterCode={}",provider.getIp(,provider.getPort(,provider.getDatacenterCode(;
ClientTransport connection = (ClientTransport entry.getValue(;
if (connection != null && connection.isOpen( {
try {
result = super.sendMsg0(new Connection(provider, connection, msg;
} catch (RpcException rpc {
exception = rpc;
log.warn(rpc.getMessage(, rpc;
} catch (Throwable e {
exception = new RpcException(e.getMessage(, e;
log.warn(e.getMessage(, e;
}
}
}
3服务配置端,当业务人员配置及时生效或者任务达到时,则根据配置,生成服务调用方,通过统一刷新接口将配置同步刷新到对应的接入系统中,如下图为操作界面,当增删改查时,会将属性增量同步。
public static ExcuteAction createJsfConsumer(String alias, String token {
RegistryConfig jsfRegistry = new RegistryConfig(;
jsfRegistry.setIndex("i.jsf.jd.com";
BroadCastConsumerConfig consumerConfig = new BroadCastConsumerConfig<>(;
Map<String, String> parameters = new HashMap<>(;
parameters.put(".token",token;
consumerConfig.setParameters(parameters;
consumerConfig.setInterfaceId(RefreshRemoteService.class.getName(;
consumerConfig.setRegistry(jsfRegistry;
consumerConfig.setProtocol("jsf";
consumerConfig.setAlias(alias;
consumerConfig.setRetries(2;
return new ExcuteAction(consumerConfig;
}
通过以上的配置的客户端,调用服务提供方方法refreshRemoteService#refresh,将配置信息进行同步到各个接入系统
public void call(Map<String,Object> propertiesValue{
try{
RefreshRemoteService refreshRemoteService = (RefreshRemoteServiceconsumerConfig.refer(;
if(refreshRemoteService!=null{
refreshRemoteService.refresh(propertiesValue;
}
}catch (Exception e{
log.error(e.getMessage(;
throw new EasyConfigException(e;
}finally {
consumerConfig.unrefer(; ;
}
}
4)接入方启动时,需要根据自己配置,将存在redis或者zk的配置一次加载到实例变量中。并注册刷新接口到JSF注册中心。
@Bean(name = "refreshPorpertiesService"
public ProviderConfig createJsfProvider( throws Exception {
ProviderConfig providerConfig = new ProviderConfig(;
providerConfig.setId("refreshPorpertiesService";
providerConfig.setInterfaceId(RefreshRemoteService.class.getName(;
providerConfig.setRef(new RefreshRemoteServiceDelage(applicationContext;
providerConfig.setTimeout(30000;
providerConfig.setAlias(EasyConfigure.getAppCode(+EasyConfigure.getEnv(;
providerConfig.setServer(serverConfig;
providerConfig.setRegistry(jsfRegistry;
providerConfig.setParameter("token", MD5Util.md5(EasyConfigure.getAppCode(;
providerConfig.export(;
return providerConfig;
}
其中
RefreshRemoteServiceDelage类提供刷新接口的实际逻辑如下,需判断当前实例是jdk动态代理还是cglib代理
if(AopUtils.isJdkDynamicProxy(object {
object= AopUtil.getJdkDynamicProxyTargetObject(object;
} else if(AopUtils.isCglibProxy(object{ //cglib
object= AopUtil.getCglibProxyTargetObject(object;
}
实例对象变量值根据自定义的参数转换方式转换后赋值实例变量
if(autoValue.convert(!=null && !autoValue.getClass(.isInterface({
if(!autoValue.convert(.newInstance(.getInClassType(.isAssignableFrom(newVal.getClass( {
continue;
}
newVal = autoValue.convert(.newInstance(.convert(newVal;
if(newVal!=null{
if(!autoValue.convert(.newInstance(.getOutClassType(.isAssignableFrom(newVal.getClass( {
continue;
}
field.setAccessible(true;
Object value = ReflectionUtils.getField(field,object;
log.info("change properties{} for object {} before value {}",field.getName(,object.getClass(.getName(,value;
ReflectionUtils.setField(field,object,newVal;
log.info("change properties{} for object {} after value {}",field.getName(,object.getClass(.getName(,newVal;
}
}
5 实践
1pom引入
<dependency>
<groupId>com.jdl</groupId>
<artifactId>easyconfig</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2配置存储配置(比如redis方式
refresh:
config:
appCode: zdzq-worker-appcode
redisUrl: redis://:@127.0.0.1
3)类全局变量需要实时刷新配置,需在类统一指定注解PropertiryChangeListener,实例变量需要增加注解AutoValue并指定数据格式转换器
@PropertiryChangeListener
public class ChanceServiceImpl implement ChanceService{
@AutoValue(convert = DateConvert.class,alias = "config-id"
private Date signDate;
@AutoValue(convert = SpmKaApply Convert.class,alias = "config-id"
private SpmKaApply spmKaApply;
}
以上convert方法自定义,支持各种复杂配置对象,举例数据转换为List如下
public class Convert2 implements Convert<Map<String, String>, Set<String>> {
public Convert2({}
@Override
public Set<String> convert(Map<String, String> siteInfoMap {
....你的对象值转换
}
@Override
public Class<?> getInClassType( {
return Map.class;
}
@Override
public Class<?> getOutClassType( {
return Set.class;
}
接入应用服务启动后,可访问/refreshUI 可查看应用在集群中为自动配置的实例,并显示当前实例中变量值参数。key为实例变量名。
6 总结
2、支持定时刷新配置
3、直接查看和验证应用集群中实例变量是否一致