对dubbo的DubboReference.check的参数进行剖析

科技资讯 投稿 5200 0 评论

对dubbo的DubboReference.check的参数进行剖析

背景

注册中心使用都是nacos

例子

接口

public interface DemoService {
    String sayHello(;
}

提供者

@DubboService
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello( {
        return "hello";
    }
}

@EnableDubbo
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}
public class ReferenceCheckProviderStarter {
    public static void main(String[] args {
        new SpringApplicationBuilder(ReferenceCheckProviderStarter.class
                .web(WebApplicationType.NONE // .REACTIVE, .SERVLET
                .run(args;
        System.out.println("dubbo service started";
    }
}

消费者

@EnableDubbo
@RestController
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}
public class ReferenceCheckConsumerStarter {

    @DubboReference
    private DemoService demoService;

    @GetMapping("/dubbo/nacos/test"
    public Object test( {
        return demoService.sayHello(;
    }

    public static void main(String[] args {
        SpringApplication.run(ReferenceCheckConsumerStarter.class, args;
    }
}

1. 先启动provider,再启动consumer

nacos出现provider的服务

nacos出现consumer的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

hello

c. 终止provider
nacos上provider的服务消失了

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

No provider available from registry

d. 重新启动provider
nacos出现provider的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

hello

可以看出:先启动provider,再启动consumer,整个过程是没问题。

2. 先启动consumer,再启动provider

nacos出现consumer的服务,但立即又消失了

nacos出现provider的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

Directory already destroyed .

可以看出:当consumer先启动时,如果provider此时没有启动,consumer就再也访问不到provider的服务了。

3. 先启动consumer,再启动provider (check=false

@DubboRefere的参数

@DubboReference(check = false
private DemoService demoService;

a. 启动consumer
nacos出现consumer的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
No provider available from registry

b. 启动provider
nacos出现provider的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
hello

可以看出:即使是consumer先启动,当provider启动后,consumer还是能够访问到provider的服务的。

关于报错

org.apache.dubbo.rpc.RpcException: No provider available from registry

public class RegistryDirectory<T> extends DynamicDirectory<T> {
@Override
    public List<Invoker<T>> doList(Invocation invocation {
        if (forbidden {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                    getUrl(.getAddress( + " for service " + getConsumerUrl(.getServiceKey( + " on consumer " +
                    NetUtils.getLocalHost( + " use dubbo version " + Version.getVersion( +
                    ", please check status of providers(disabled, not registered or in blacklist.";
        }

        // ......
    }
}
public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
    String EMPTY_PROTOCOL = "empty";

    private void refreshInvoker(List<URL> invokerUrls {
        Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address.";
        this.originalUrls = invokerUrls;

        if (invokerUrls.size( == 0 {
            logger.info("Received empty url list...";
            this.forbidden = true; // Forbid to access // 这里
            this.invokers = Collections.emptyList(;
            routerChain.setInvokers(this.invokers;
            destroyAllInvokers(; // Close all invokers
        } else {
            this.forbidden = false; // Allow accessing // 这里
            if (CollectionUtils.isEmpty(invokerUrls {
                return;
            }

            // can't use local reference because this.urlInvokerMap might be accessed at isAvailable( by main thread concurrently.
            Map<String, Invoker<T>> oldUrlInvokerMap = null;
            if (this.urlInvokerMap != null {
                // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size( / DEFAULT_HASHMAP_LOAD_FACTOR;
                this.urlInvokerMap.forEach(oldUrlInvokerMap::put;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls;// Translate url list to Invoker map // 这里
            logger.info("Refreshed invoker size " + newUrlInvokerMap.size(;

            if (CollectionUtils.isEmptyMap(newUrlInvokerMap {
                logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size( + "";
                return;
            }
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values(;
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap( will wrap some invokers having different groups, those wrapped invokers not should be routed.
            routerChain.setInvokers(newInvokers;
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap; // 这里

            if (oldUrlInvokerMap != null {
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap; // Close the unused Invoker
                } catch (Exception e {
                    logger.warn("destroyUnusedInvokers error. ", e;
                }
            }
        }

        // notify invokers refreshed
        this.invokersChanged(;
    }

    private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls {
        // mock zookeeper://xxx?mock=return null
        if (enableConfigurationListen {
            overrideDirectoryUrl(;
        }
        refreshInvoker(instanceUrls; // 这里
    }
}
public class RegistryDirectory<T> extends DynamicDirectory<T> {
    @Override
    public synchronized void notify(List<URL> urls {
        if (isDestroyed( {
            return;
        }

        Map<String, List<URL>> categoryUrls = urls.stream(
                .filter(Objects::nonNull
                .filter(this::isValidCategory
                .filter(this::isNotCompatibleFor26x
                .collect(Collectors.groupingBy(this::judgeCategory;

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList(;
        this.configurators = Configurator.toConfigurators(configuratorURLs.orElse(this.configurators;

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList(;
        toRouters(routerURLs.ifPresent(this::addRouters;

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList(;

        // 3.x added for extend URL address
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl(.getOrDefaultModuleModel(.getExtensionLoader(AddressListener.class;
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(, (String[] null;
        if (supportedListeners != null && !supportedListeners.isEmpty( {
            for (AddressListener addressListener : supportedListeners {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(,this;
            }
        }

        refreshOverrideAndInvoker(providerURLs; // 这里
    }
}

public abstract class AbstractRegistry implements Registry {
    /**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls {
        if (url == null {
            throw new IllegalArgumentException("notify url == null";
        }
        if (listener == null {
            throw new IllegalArgumentException("notify listener == null";
        }
        if ((CollectionUtils.isEmpty(urls
            && !ANY_VALUE.equals(url.getServiceInterface( {
            logger.warn("Ignore empty notify urls for subscribe url " + url;
            return;
        }
        if (logger.isInfoEnabled( {
            logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size(;
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>(; // 这里
        for (URL u : urls {
            if (UrlUtils.isMatch(url, u {
                String category = u.getCategory(DEFAULT_CATEGORY;
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>(; // 这里
                categoryList.add(u; // 这里
            }
        }
        if (result.size( == 0 {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>(;
        for (Map.Entry<String, List<URL>> entry : result.entrySet( {
            String category = entry.getKey(;
            List<URL> categoryList = entry.getValue(;
            categoryNotified.put(category, categoryList;
            listener.notify(categoryList; // 这里
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            if (localCacheEnabled {
                saveProperties(url;
            }
        }
    }
}
public class NacosRegistry extends FailbackRegistry {
    private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances {
        List<Instance> enabledInstances = new LinkedList<>(instances;
        if (enabledInstances.size( > 0 {
            //  Instances
            filterEnabledInstances(enabledInstances;
        }
        List<URL> urls = toUrlWithEmpty(url, enabledInstances;
        NacosRegistry.this.notify(url, listener, urls; // 这里
    }

    String EMPTY_PROTOCOL = "empty";

    private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances {
        List<URL> urls = buildURLs(consumerURL, instances;
        if (urls.size( == 0 { // 这里
            URL empty = URLBuilder.from(consumerURL
                .setProtocol(EMPTY_PROTOCOL
                .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY
                .build(;
            urls.add(empty;
        }
        return urls;
    }
}

当没有可用的服务时,instances是空的

是怎么通知的

public class ServiceInfoHolder implements Closeable {
    public ServiceInfo processServiceInfo(ServiceInfo serviceInfo {
        String serviceKey = serviceInfo.getKey(;
        if (serviceKey == null {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey(;
        if (isEmptyOrErrorPush(serviceInfo {
            //empty or error push, just ignore
            return oldService;
        }
        serviceInfoMap.put(serviceInfo.getKey(, serviceInfo;
        boolean changed = isChangedServiceInfo(oldService, serviceInfo;
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer( {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo;
        }
        MetricsMonitor.getServiceInfoMapSizeMonitor(.set(serviceInfoMap.size(;
        if (changed { // 这里
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount( + " service: " + serviceInfo.getKey( + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts(;
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(, serviceInfo.getGroupName(,
                    serviceInfo.getClusters(, serviceInfo.getHosts(; // 这里
            DiskCache.write(serviceInfo, cacheDir;
        }
        return serviceInfo;
    }
}

public class DefaultPublisher extends Thread implements EventPublisher {
    private BlockingQueue<Event> queue;

    @Override
    public void init(Class<? extends Event> type, int bufferSize {
        setDaemon(true;
        setName("nacos.publisher-" + type.getName(;
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        this.queue = new ArrayBlockingQueue<>(bufferSize; // 这里
        start(;
    }

    @Override
    public boolean publish(Event event {
        checkIsStart(;
        boolean success = this.queue.offer(event; // 这里
        if (!success {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event;
            receiveEvent(event;
            return true;
        }
        return true;
    }

    @Override
    public void run( {
        openEventHandler(;
    }
    
    void openEventHandler( {
        try {
            
            // This variable is defined to resolve the problem which message overstock in the queue.
            int waitTimes = 60;
            // To ensure that messages are not lost, enable EventHandler when
            // waiting for the first Subscriber to register
            for (; ;  {
                if (shutdown || hasSubscriber( || waitTimes <= 0 {
                    break;
                }
                ThreadUtils.sleep(1000L;
                waitTimes--;
            }
            
            for (; ;  {
                if (shutdown {
                    break;
                }
                final Event event = queue.take(; // 这里
                receiveEvent(event;  // 这里
                UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence(;
            }
        } catch (Throwable ex {
            LOGGER.error("Event listener exception : ", ex;
        }
    }

    void receiveEvent(Event event {
        final long currentEventSequence = event.sequence(;
        
        if (!hasSubscriber( {
            LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.";
            return;
        }
        
        // Notification single event listener
        for (Subscriber subscriber : subscribers {
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent( && lastEventSequence > currentEventSequence {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass(;
                continue;
            }
            
            // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
            // Remove original judge part of codes.
            notifySubscriber(subscriber, event; // 这里
        }
    }

    @Override
    public void notifySubscriber(final Subscriber subscriber, final Event event {
        
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber;
        
        final Runnable job = ( -> subscriber.onEvent(event;
        final Executor executor = subscriber.executor(; 
        
        if (executor != null {
            executor.execute(job; // 这里
        } else {
            try {
                job.run(; // 这里
            } catch (Throwable e {
                LOGGER.error("Event callback exception: ", e;
            }
        }
    }
}

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    @Override
    public void onEvent(InstancesChangeEvent event {
        String key = ServiceInfo
                .getKey(NamingUtils.getGroupedName(event.getServiceName(, event.getGroupName(, event.getClusters(;
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key;
        if (CollectionUtils.isEmpty(eventListeners {
            return;
        }
        for (final EventListener listener : eventListeners {
            final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event;
            if (listener instanceof AbstractEventListener && ((AbstractEventListener listener.getExecutor( != null {
                ((AbstractEventListener listener.getExecutor(.execute(( -> listener.onEvent(namingEvent; // 这里
            } else {
                listener.onEvent(namingEvent; // 这里
            }
        }
    }
}

public class NacosRegistry extends FailbackRegistry {
        @Override
        public void onEvent(Event event {
            if (event instanceof NamingEvent {
                NamingEvent e = (NamingEvent event;
                notifier.notify(e.getInstances(; // 这里
            }
        }
}

public abstract class RegistryNotifier {
    public synchronized void notify(Object rawAddresses {
        this.rawAddresses = rawAddresses;
        long notifyTime = System.currentTimeMillis(;
        this.lastEventTime = notifyTime;

        long delta = (System.currentTimeMillis( - lastExecuteTime - delayTime;

        // more than 10 calls && next execute time is in the future
        boolean delay = shouldDelay.get( && delta < 0;
        if (delay {
            scheduler.schedule(new NotificationTask(this, notifyTime, -delta, TimeUnit.MILLISECONDS; // 这里
        } else {
            // check if more than 10 calls
            if (!shouldDelay.get( && executeTime.incrementAndGet( > DEFAULT_DELAY_EXECUTE_TIMES {
                shouldDelay.set(true;
            }
            scheduler.submit(new NotificationTask(this, notifyTime; // 这里
        }
    }

    public static class NotificationTask implements Runnable {
        private final RegistryNotifier listener;
        private final long time;

        public NotificationTask(RegistryNotifier listener, long time {
            this.listener = listener;
            this.time = time;
        }

        @Override
        public void run( {
            try {
                if (this.time == listener.lastEventTime {
                    listener.doNotify(listener.rawAddresses; // 这里
                    listener.lastExecuteTime = System.currentTimeMillis(;
                    synchronized (listener {
                        if (this.time == listener.lastEventTime {
                            listener.rawAddresses = null;
                        }
                    }
                }
            } catch (Throwable t {
                logger.error("Error occurred when notify directory. ", t;
            }
        }
    }}
}

public class NacosRegistry extends FailbackRegistry {

    private class RegistryChildListenerImpl implements EventListener {
        private RegistryNotifier notifier;

        public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener {
            notifier = new RegistryNotifier(getUrl(, NacosRegistry.this.getDelay( {
                @Override
                protected void doNotify(Object rawAddresses {
                    List<Instance> instances = (List<Instance> rawAddresses;
                    if (isServiceNamesWithCompatibleMode(consumerUrl {
                        /**
                         * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
                         * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                         */
                        NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances;
                        instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName;
                    }
                    NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances; // 这里
                }
            };
        }
}

然后就调用了上面的👆🏻

什么时候添加监听器的?

public class NacosRegistry extends FailbackRegistry {

    private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener
        throws NacosException {
        EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener;  // 这里
        namingService.subscribe(serviceName,
            getUrl(.getGroup(Constants.DEFAULT_GROUP,
            eventListener; // 这里
    }

    private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames {
        try {
            if (isServiceNamesWithCompatibleMode(url {
                List<Instance> allCorrespondingInstanceList = Lists.newArrayList(;

                /**
                 * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
                 * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                 *
                 * namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
                 * default {@link DEFAULT_GROUP}
                 *
                 * in https://github.com/apache/dubbo/issues/5978
                 */
                for (String serviceName : serviceNames {
                    List<Instance> instances = namingService.getAllInstances(serviceName,
                        getUrl(.getGroup(Constants.DEFAULT_GROUP;
                    NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances;
                    allCorrespondingInstanceList.addAll(instances;
                }
                notifySubscriber(url, listener, allCorrespondingInstanceList; 
                for (String serviceName : serviceNames {
                    subscribeEventListener(serviceName, url, listener; // 这里
                }
            } else {
                for (String serviceName : serviceNames {
                    List<Instance> instances = new LinkedList<>(;
                    instances.addAll(namingService.getAllInstances(serviceName
                        , getUrl(.getGroup(Constants.DEFAULT_GROUP;
                    String serviceInterface = serviceName;
                    String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1;
                    if (segments.length == 4 {
                        serviceInterface = segments[SERVICE_INTERFACE_INDEX];
                    }
                    URL subscriberURL = url.setPath(serviceInterface.addParameters(INTERFACE_KEY, serviceInterface,
                        CHECK_KEY, String.valueOf(false;
                    notifySubscriber(subscriberURL, listener, instances;
                    subscribeEventListener(serviceName, subscriberURL, listener;
                }
            }
        } catch (Throwable cause {
            throw new RpcException("Failed to subscribe " + url + " to nacos " + getUrl( + ", cause: " + cause.getMessage(, cause;
        }
    }
}

org.apache.dubbo.rpc.RpcException: Directory already destroyed

public abstract class AbstractDirectory<T> implements Directory<T> {
    @Override
    public List<Invoker<T>> list(Invocation invocation throws RpcException {
        if (destroyed {
            throw new RpcException("Directory already destroyed .url: " + getUrl(;
        }

        return doList(invocation;
    }

    @Override
    public void destroy( {
        destroyed = true; // 这里
    }
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private void checkInvokerAvailable( throws IllegalStateException {
        if (shouldCheck( && !invoker.isAvailable( {
            invoker.destroy(; // 这里
            throw new IllegalStateException("Should has at least one way to know which services this interface belongs to," +
                " subscription url: " + invoker.getUrl(;
        }
    }

    protected synchronized void init( {
        // ......

        checkInvokerAvailable(; // 这里
    }

}
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
    public boolean shouldCheck( {
        checkDefault(;
        Boolean shouldCheck = isCheck(; // 这里
        if (shouldCheck == null && getConsumer( != null {
            shouldCheck = getConsumer(.isCheck(; 
        }
        if (shouldCheck == null {
            // default true // 这里
            shouldCheck = true;
        }
        return shouldCheck;
    }
}
public class RegistryDirectory<T> extends DynamicDirectory<T> {
    @Override
    public boolean isAvailable( {
        if (isDestroyed( || this.forbidden { // 这里
            return false;
        }
        Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; // 这里
        return CollectionUtils.isNotEmptyMap(localUrlInvokerMap
                && localUrlInvokerMap.values(.stream(.anyMatch(Invoker::isAvailable;
    }
}

如果没有设置check字段,那么就会在启动的时候检查提供方是否可用,如果不可用,就销毁了。

编程笔记 » 对dubbo的DubboReference.check的参数进行剖析

赞同 (24) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽