Dubbo 学习笔记

2020-04-04

Deep in dubbo

使用篇

服务发布

provider启动流程

1.ServiceConfig#export

服务提供方在启动部署时,dubbo会调用ServiceConfig#export来激活服务发布流程,如下所示:

  • Java API:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 创建ServiceConfig实例
ServiceConfig<IGreetingService> serviceConfig = new ServiceConfig<>();
// 2. 设置应用程序配置
serviceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-provider"));
// 3. 设置注册中心
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181/");
serviceConfig.setRegistry(registryConfig);
// 4. 设置接口和实现类
// 5. 设置服务分组和版本
// dubbo中,服务接口+服务分组+服务版本 唯一的确定一个服务,同一个接口可以有不同版本,方便维护升级
serviceConfig.setInterface(IGreetingService.class);
serviceConfig.setRef(new GreetingServiceImpl());
serviceConfig.setVersion("1.0.0");
serviceConfig.setGroup("dubbo-sxzhongf-group");
RpcContext.getContext().setAttachment("age","18");

// 7. 导出服务,启动Netty监听链接请求,并将服务注册到注册中心
serviceConfig.export();

// 8. 挂起线程,避免服务停止
System.out.println("api provider service is started...");
System.in.read();
  • XML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="first-xml-provider"/>
<!-- use multicast registry center to export service -->
<dubbo:registry address="zookeeper://127.0.0.1:2181/"/>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.sxzhongf.deep.in.dubbo.provider.service.impl.GreetingServiceImpl"/>
<!-- declare the service interface to be exported -->
<dubbo:service interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService"
ref="demoService" version="1.0.0" group="dubbo-sxzhongf-group">
<dubbo:method name="sayHello" async="false" timeout="0" retries="3"></dubbo:method>
<dubbo:method name="testGeneric" async="false" timeout="10000" retries="3"></dubbo:method>
</dubbo:service>
</beans>

查看export源码可知,总共有三种服务导出选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void export() {
//1. 是否导出
if (!shouldExport()) {
return;
}
...
//2.延迟导出
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
//3.立刻导出
doExport();
}
}
2.ServiceConfig#doExport

此方法主要是根据设置的属性进行合法性检查,主要包含是否已被导出,doExportUrls();

3.doExportUrls
4.ConfigValidationUtils#loadRegistries

此方法用来加载所有的服务注册中心对象,在dubbo中,一个service可以被注册到多个注册中心。

通过doExportUrlsFor1Protocol(protocolConfig, registryURLs);

5.doExportUrlsFor1Protocol

在此方法中会将所有的参数封装成org.apache.dubbo.common.URL对象,然后执行具体的服务导出。

具体过程分为:

  • 1.解析MethodConfig配置(单独的方法调用参数设置)

  • 2.泛型调用类型设置

  • 3.拼接URL参数

  • 4.导出具体服务

    导出又分为四种范围(scope):

    • SCOPE_NONE = “none”,如果设定为none,表示该服务不导出。

    • SCOPE_LOCAL = “local” ,如果设定为local,表示该服务导出到本地(injvm–伪协议,实现类为:org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol

      • SCOPE_REMOTE = “remote”,如果设定为remote,表示该服务导出到远程。
    • 如果有注册中心,发布到注册中心

    • 如果没有注册中心,则表示服务是直连方式

    • dubbo-2.7.0开始,新增加了WritableMetadataService 来存储dubbo 服务的元数据,元数据可以存储在远端配置中心和本地,默认是存储在本地,通过设置:METADATA_KEY = "metadata"

      • DEFAULT_METADATA_STORAGE_TYPE = “local”
      • REMOTE_METADATA_STORAGE_TYPE = “remote”
      1
      2
      3
      WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
      if (metadataService != null) { metadataService.publishServiceDefinition(url);
      }
      • 不设置,导出到本地和远端
    • 最终执行导出的代码如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 扩展适配类
      private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

      // 扩展适配类
      private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
      ...

      Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
      DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

      Exporter<?> exporter = protocol.export(wrapperInvoker);
      exporters.add(exporter);

      由于protocolPROXY_FACTORY都是扩展适配类,跟踪代码我们可以发现:

      • 执行PROXY_FACTORY.getInvoker的时候实际上首先执行扩展接口ProxyFactory的适配类ProxyFactory$AdaptivegetInvoker方法,根据URL中参数proxy的设置类型选择具体的代理工厂,默认使用的是javassist,,因此又调用了org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker来获取代理实现类,代码如下:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19

        public class JavassistProxyFactory extends AbstractProxyFactory {
        ...
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        // 这里使用javassist动态代理生成serviceImpl实现类的包装类`Wraaper...`
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
        Class<?>[] parameterTypes,
        Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
        };
        }
        ...
        }

        上面代码有2个目的:

      1. inal Wrapper wrapper = Wrapper.getWrapper(...);用来生成具体serviceImpl的包装类,减少反射的性能损耗;
      2. return new AbstractProxyInvoker<T>... 返回了一个抽象的代理invoker,并且重写了doInvoker方法,重写之后使用包装类中的invokeMethod来调用方法。

经过上述2步,服务提供方就将具体的实现类转换为Invoker代理。

  • 然后,当执行protocol.export(),实际上也是调用了Protocol$Adaptive#export()方法,同时也分为两种情况

  • 如果为远程暴露,则执行RegistryProtocol#export

  • 如果为本地暴露,则只需InjvmProtocol#export
    由于dubbo的增强SPI特性支持,injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));,则在调用之前会一层一层调用,ProtocolFilterWrapper->ProtocolListenerWrapper->QosProtocolWrapper,最后会调用export方法,此方法会将Invoker转换为Exporter对象,在org.apache.dubbo.registry.integration.RegistryProtocol#export方法中,org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport方法启NettyServer来监听服务,org.apache.dubbo.registry.integration.RegistryProtocol#register将当前的服务注册到注册中心。

  • doLocalExport 是如何启动NettyServer呢?

1
2
3
4
5
6
7
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}

此时URL中的protocol类型为默认的dubbo,因此会执行DubboProtocol#export进行转换,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
// invoker->exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}

} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//创建server
openServer(url);
//序列化提示
optimizeSerialization(url);
return exporter;
}

可以看到代码执行到openServer,因为key=getAddress()=ip+port,因此,同一台机器只会开启一个NettyServer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}

对于org.apache.dubbo.remoting.Transporter 的适配类选择有三种:MinaTransporterNettyTransporterGrizzlyTransporter,关于JavaNIO:Apache Mina、JBoss Netty、Sun Grizzly 框架对比:传送门

  • NettyServer启动之后,回到org.apache.dubbo.registry.integration.RegistryProtocol#export方法,继续执行将服务注册到注册中心,我们以Zookeeper为例:

  • 1.首先查找所有注册中心

1
2
3
4
5
6
final Registry registry = getRegistry(originInvoker);
...
protected Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}

因为RegistryFactory是一个SPI扩展接口,代码中设置的为zookeeper,因此这里调用的是ZookeeperRegistryFactory,继承自:org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL),在此方法中调用了createRegistry,但是ZookeeperRegistryFactory重写了createRegistry,因此具体调用的是ZookeeperRegistryFactory#createRegistry,该方法返回了一个new ZookeeperRegistry(url, zookeeperTransporter)实例对象。

  • 2.开始注册,RegistryProtocol#register方法执行注册动作,首先获取到我们在上一步找到的注册中心ZookeeperRegistry,ZookeeperRegistry 执行父类org.apache.dubbo.registry.support.FailbackRegistry#register,在该方法中会调用抽象方法:doRegister,ZookeeperRegistry 重写了改方法,则执行ZookeeperRegistry#doRegister ,如下:
1
2
3
4
5
6
7
8
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
  • 3.toUrlPath方法会把org.apache.dubbo.common.URL转换格式后存储到zookeeper,如下:
1
2
3
4
5
dubbo://172.16.44.21:20880/com.sxzhongf.deep.in.dubbo.api.service.IGreetingService?anyhost=true&application=deep-in-dubbo-first-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=dubbo-sxzhongf-group&interface=com.sxzhongf.deep.in.dubbo.api.service.IGreetingService&methods=sayHello,testGeneric&pid=8480&release=2.7.5&revision=1.0.0&side=provider&timestamp=1582872610313&version=1.0.0

-----------------------转换------------------------

/dubbo/com.sxzhongf.deep.in.dubbo.api.service.IGreetingService/providers/dubbo%3A%2F%2F172.16.44.21%3A20880%2Fcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%3Fanyhost%3Dtrue%26application%3Ddeep-in-dubbo-first-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D8480%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1582872610313%26version%3D1.0.0

转换之后的格式其实就是我们在zookeeper中看到的一样了,不过有几个目录:

  • dubbo

  • com.sxzhongf.deep.in.dubbo.api.service.IGreetingService

  • providers

    1
    2
    [zk: localhost:2181(CONNECTED) 2] ls  /dubbo/com.sxzhongf.deep.in.dubbo.api.service.IGreetingService/providers
    [dubbo%3A%2F%2F172.16.44.21%3A20880%2Fcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%3Fanyhost%3Dtrue%26application%3Ddeep-in-dubbo-first-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D15716%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1582872850187%26version%3D1.0.0]

至此,服务消费端就可以从注册中心获取服务提供service进行调用了,下节我们继续来分析,消费端是如何从注册中心拉取service进行处理的。

消费调用

consumer启动流程

消费者在启动之后,会通过ReferenceConfig#get()来生成远程调用代理类。在get方法中,会启动一系列调用函数,我们来一个个解析。

配置同样包含2种:

  • XML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

<dubbo:application name="first-consumer-xml"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181/"/>
<dubbo:reference proxy="javassist" scope="remote"
id="demoService"
generic="false"
check="false"
async="false"
group="dubbo-sxzhongf-group"
version="1.0.0"
interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService">
<dubbo:method name="sayHello" retries="3" timeout="5000" mock="false" />
<dubbo:method name="testGeneric" retries="3" timeout="5000" mock="false" />
</dubbo:reference>
</beans>
  • Java API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ApiConsumerApplication {
public static void main(String[] args) {
// 1. 创建服务引用对象实例
ReferenceConfig<IGreetingService> referenceConfig = new ReferenceConfig<IGreetingService>();
// 2. 设置应用程序信息
referenceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-consumer"));
// 3. 设置注册中心
referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181/"));
// 4. 设置服务接口和超时时间
referenceConfig.setInterface(IGreetingService.class);
// 默认重试3次
referenceConfig.setTimeout(5000);
// 5 设置服务分组和版本
referenceConfig.setGroup("dubbo-sxzhongf-group");
referenceConfig.setVersion("1.0.0");
// 6. 引用服务
IGreetingService greetingService = referenceConfig.get();
// 7. 设置隐式参数
RpcContext.getContext().setAttachment("company", "sxzhongf");
// 获取provider端传递的隐式参数(FIXME: 需要后续追踪)
// System.out.println("年龄是:" + RpcContext.getContext().getAttachment("age"));
//8. 调用服务
System.out.println(greetingService.sayHello("pan"));
}
}
1. new ReferenceConfig

在此阶段,会初始化org.apache.dubbo.config.AbstractConfig & org.apache.dubbo.config.ReferenceConfig的静态变量以及静态代码块。

2. ReferenceConfig#get
  • ReferenceConfig#init
  1. 通过DubboBootstrap启动dubbo。
  2. 继而初始化服务的元数据信息,URL.buildKey(interfaceName, group, version)这段用来生成唯一服务的key,所以我们之前说dubbo的唯一标识是通过全路径和group以及version来决定的。
  3. 接下来通过org.apache.dubbo.config.utils.ConfigValidationUtils#checkMock来检查我们mock是否设置正确。
  4. 设置一系列要用的参数(系统运行参数、是否为consumer、是否为泛型调用等等),检查dubbo的注册地址,默认为当前主机IP
  • ReferenceConfig#createProxy 创建调用代理开始
  1. ReferenceConfig#shouldJvmRefer首先判断是否为Injvm调用

  2. 如果不为injvm,判断是否为peer to peer端对端设置,如果为p2p,那么就直连url

  3. 检查注册中心是否存在(注册中心有可能有多个)

  4. 循环检查注册中心是否有效

  5. 配置转换URL

1
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=deep-in-dubbo-first-consumer&dubbo=2.0.2&pid=9780&refer=application%3Ddeep-in-dubbo-first-consumer%26dubbo%3D2.0.2%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D9780%26register.ip%3D192.168.14.99%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D5000%26timestamp%3D1582959441066%26version%3D1.0.0&registry=zookeeper&release=2.7.5&timestamp=1582961922459
  1. 如果只有一个注册中心,执行REF_PROTOCOL.refer(interfaceClass, urls.get(0));来将URL转为Invoker对象,因为private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();是扩展是Adaptive,因此在这里会执行Protocol$Adaptive#refer方法,又由于protocol参数值为registry,因此会只是RegistryProtocol#refer,又由于被Wrapper类装配,因此会先执行三个Wrapper类,最后才能执行到RegistryProtocol#refer -> RegistryProtocol#doRefer,在doRefer方法中会订阅服务提供者地址,最后返回Invoker对象。1582964279895)1582977260255
    那么究竟是如何生成的Invoker对象呢?我们来看下具体代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 1.可以查寻RegistryDirectory & StaticDirectory 区别
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//2. 封装订阅所用URL
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
//3.build路由规则链
directory.buildRouterChain(subscribeUrl);
//4.订阅服务提供者地址
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//5.封装集群策略到虚拟invoker
Invoker invoker = cluster.join(directory);
return invoker;
}

上述代码中,步骤1根据URL生成了一个RegistryDirectory(关于Directory接口的作用,可以自行查询一些,直白一些就是将一堆Invoker对象封装成一个List<Invoker>,只有2种实现RegistryDirectory & StaticDirectory,从命名可看出一个是动态可变,一个不可变),代码2 封装了订阅所要使用的参数信息,代码3则是封装绑定路由规则链,代码4订阅。代码5调用 Cluster$Adaptive#join方法生成Invoker对象。

在代码2种从zk获取服务提供者信息:

1582978175936 一旦zk返回服务提供者列表之后,就会调用RegistryDirectory#notify,如下:

1582978319055

org.apache.dubbo.common.utils.UrlUtils#isMatch中对provider和consumer的api进行匹配校验。继续跟踪:RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokerstoInvokers正式将URL转换为Invoker,通过invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); 在这里protocol#refer同样执行顺序如:

(dubbo 2.7.5) protocol#refer -> protocol$Adaptive#refer -> QosProtocolWrapper#refer -> ProtocolListenerWrapper#refer -> ProtocolFilterWrapper#refer ->AbstractProtocol#refer->DubboProtocol#protocolBindingRefer,调用代码如下:

1
2
3
4
5
6
7
8
9
10
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}

关注getClients,其中执行了DubboProtocol#getSharedClient -> DubboProtocol#initClient 创建netty client进行连接。

因为这里使用的是明确的DubboInvoker,在回调的时候,Wrapper会对该Invoker进行包装,执行效果如下:

1582984563602

因此,会执行到ProtocolFilterWrapper#buildInvokerChain,该函数会对服务进行调用链跟踪:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获取所有在MATA-INF文件中配置的激活的责任链接口
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {

@Override
public Class<T> getInterface() {
return invoker.getInterface();
}

@Override
public URL getUrl() {
return invoker.getUrl();
}

@Override
public boolean isAvailable() {
return invoker.isAvailable();
}

@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {

}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
} else {// Deprecated!
filter.onResponse(r, invoker, invocation);
}
});
}

@Override
public void destroy() {
invoker.destroy();
}

@Override
public String toString() {
return invoker.toString();
}
};
}
}

return last;
}

所有的负载均衡、容错策略等都是在这里绑定的。
7. 如果有多个注册中心,将会循环执行步骤6,将URL转换为Invoker对象,然后添加到一个List,分别进行注册之后,然后判断最后一个注册中心url是否有效,针对多订阅的场景,URL中添加cluster参数,默认使用org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster策略,使用org.apache.dubbo.rpc.cluster.Cluster#join将多个Invoker对象封装一个虚拟的Invoker中,否则如果最后一个注册中心是无效的,直接封装Invoker对象。
8. 创建服务代理ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>),因为ProxyFactory是一个适配类。那么同样这里会调用ProxyFactory$Adaptive#getProxy,这里最终就是返回一个代理服务的Invoker对象。

至此,我们的消费端的绑定远程zk的服务就已经结束了。
那么,我们在调用服务器方法的时候服务器端和客户端都是如何处理的呢?下节我们将继续分析。

consumer发起远程调用过程

在上一节我们有提到,当消费端通过ReferenceConfig#get 获取到一个代理服务类,我们来看JavassistProxyFactory在生成返回对象之前的代码:

1
2
3
4
5
6
7
8
9
public class JavassistProxyFactory extends AbstractProxyFactory {

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
...
}

在返回对象之前,传入了一个org.apache.dubbo.rpc.proxy.InvokerInvocationHandler实例,这个实例是一个拦截器实例,所以当消费者调用提供者接口方法的时候,会被该InvokerInvocationHandler拦截,具体流程如下:
consumer-req-provider-workflow.

组件篇

DubboBootstrap

since 2.7.5

ApplicationModel

ExtensionLoader

ConfigManager

Environment

ServiceRepository

ServiceDescriptor

Invoker

Node

Node 这个接口继承者比较多,像 Registry、Monitor、Invoker、Directory 等均继承了这个接口。这个接口包含了一个获取配置信息的方法 getUrl,实现该接口的类可以向外提供配置信息。

Directory

directory-inherit-hierarchy
org.apache.dubbo.rpc.cluster.Directory 是一个SPI扩展接口,继承自org.apache.dubbo.common.Node,由抽象类org.apache.dubbo.rpc.cluster.directory.AbstractDirectory实现,目前(2.7.5)有2个具体实现类:

  • RegistryDirectory 顾名思义,这个是代表会根据注册中心的服务变化而变化。根据他们的声明也能看出。
1
private volatile List<Invoker<T>> invokers;

另外,RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,RegistryDirectory 可以通过此接口方法得到变更信息,并根据变更信息动态调整内部 Invoker 列表。

  • StaticDirectory 初始化所有的invoker之后不在改变。
1
private final List<Invoker<T>> invokers;

一个Directory代表一组Invoker对象,对于消费端来说,一个invoker对象代表一个服务提供者。Directory内部通过List来维护invoker,并且list的内容是动态变化的,通过Directory,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。

RegistryDirectory

  • 图解
    RegistryDirectory Diagram

  • 创建RegistryDirectory
    create workflow
    从上图中可以看出,在调用RegistryProtocol#refer方法时,由于它是一个SPI,因此这里通过Protocol$Adaptive#refer来调用,在consumer启动的时候,我们通过增强SPI获取到了Protocol有3个Wrapper包装类,因此,这里会一步一步执行QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->RegistryProtocol,进到org.apache.dubbo.registry.integration.RegistryProtocol#doRefer之后,我们可以看到创建了RegistryDirectory对象。

  • 创建 / 更新List<Invoker>
    如前文所述,Directory 核心其实就是封装一个List<Invoker>,那该属性值的变动就比较重要了,当服务消费者启动时,会调用ReferenceConfig#get来获取实现代理类,参考上小结请求方法,最终会调用到RegistryProtocol#doRefer方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建RegistryDirectory对象
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}

// 根据URL创建调用规则链路
directory.buildRouterChain(subscribeUrl);
// 订阅服务提供者信息
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

Invoker invoker = cluster.join(directory);
return invoker;
}

继续跟踪directory.subscribe方法,一直执行到ZookeeperRegistry#doSubscribe之中,发现zkClient添加了一个针对path=/dubbo/com.sxzhongf.deep.in.dubbo.api.service.IGreetingService/providers节点的监听器org.apache.dubbo.remoting.zookeeper.ChildListener,如下图:
add-listener-to-provider,一旦服务提供段发现变化,zkClient就会获取到最新的节点信息,如下图:
listener-response-urls
一旦服务端发生变化,zkClient会通知到
notify
继而执行更换替换的动作:...-> RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker,如下图:
refresh invoker
然后在toInvoker方法中调用protocol$Adaptive$refer来生成Invoker对象,因为使用的是适配类对象,因此具体执行的是RegistryProtocol#refer,同样是经过QosProtocolWrapperProtocolFilterWrapper以及ProtocolListenerWrapper装配过处理。
assets/url-to-invoker-registryprotocol
至此,Url -> Invoker流程就清晰了,如何赋值和修改List<Invoker>也就知道了。

StaticDirectory

从名称上看,为静态服务资源目录,内部存放的List<Invoker>是不会变,我们主要看该类中的2个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

//该方法我们之前有提过,调用生成路由链路
public void buildRouterChain() {
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
routerChain.setInvokers(invokers);
this.setRouterChain(routerChain);
}

// 该方法是主方法,处理选择list
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
List<Invoker<T>> finalInvokers = invokers;
if (routerChain != null) {
try {
finalInvokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
return finalInvokers == null ? Collections.emptyList() : finalInvokers;
}

目前(dubbo 2.7.5)StaticDirectory仅用于多注册中心合并invoker的时候使用,判断代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));

this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;

private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
List<Invoker<T>> mergedInvokers = new ArrayList<>();
Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
// 必须是多注册中心,否则这里即便设置了多组也只会添加一个组
for (Invoker<T> invoker : invokers) {
String group = invoker.getUrl().getParameter(GROUP_KEY, "");
groupMap.computeIfAbsent(group, k -> new ArrayList<>());
groupMap.get(group).add(invoker);
}

if (groupMap.size() == 1) {
mergedInvokers.addAll(groupMap.values().iterator().next());
} else if (groupMap.size() > 1) {
for (List<Invoker<T>> groupList : groupMap.values()) {
//这里
StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
staticDirectory.buildRouterChain();
mergedInvokers.add(CLUSTER.join(staticDirectory));
}
} else {
mergedInvokers = invokers;
}
return mergedInvokers;
}

Router

服务路由是什么?
服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。

Directory 在更新List<Invoker>的时候,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者,
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer方法中我们可以看到有一段代码directory.buildRouterChain(subscribeUrl);,它是根据URL来生成调用规则链的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//org.apache.dubbo.registry.integration.RegistryDirectory#buildRouterChain
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(url));
}

//org.apache.dubbo.rpc.cluster.RouterChain#buildChain
public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}

//org.apache.dubbo.rpc.cluster.RouterChain#RouterChain
private RouterChain(URL url) {
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, "router");

List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());

initWithRouters(routers);
}

// org.apache.dubbo.rpc.cluster.RouterChain#initWithRouters
public void initWithRouters(List<Router> builtinRouters) {
this.builtinRouters = builtinRouters;
this.routers = new ArrayList<>(builtinRouters);
this.sort();
}

另外,在得到zkClient#notify之后,会执行org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
...
} else {
...
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;

try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

//org.apache.dubbo.rpc.cluster.RouterChain#setInvokers
public void setInvokers(List<Invoker<T>> invokers) {
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
routers.forEach(router -> router.notify(this.invokers));
}

在消费方根据集群策略,比如默认的FailoverClusterInboker获取服务提供者对应的List<invoker>的时候,会执行FailoverClusterInvoker#doInvoke -> AbstractClusterInvoker#list -> Directory#list -> AbstractDirectory#list -> RegistryDirectory#doList -> RouterChain#route对服务进行路由。