Dubbo服务发布 Dubbo服务发布影响流程的主要包括三个部分,依次是: 服务暴露 心跳 服务注册 服务暴露是对外提供服务及暴露端口,以便消费端可以正常调通服务。心跳机制保证服务器端及客户端。..
一、Dubbo服务发布 Dubbo服务发布影响流程的主要包括三个部分,依次是:
服务暴露 心跳 服务注册
服务暴露是对外提供服务及暴露端口,以便消费端可以正常调通服务。心跳机制保证服务器端及客户端正常长连接的保持,服务注册是向注册中心注册服务暴露服务的过程。
1.1 Dubbo服务暴露 此处只记录主要代码部分以便能快速定位到主要的核心代码: ServiceConfig.java中代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register" , true )) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic" , registryURL.getParameter("dynamic" )); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null ) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } ``` 循环注册中心,对每个注册中心都执行代码块中的执行过程 1 。如果url中没有dynamic 参数,则从registerUrl中取值,并赋予url dynamic是服务动态注册的标识,默认为true ,如果设置为false ,则服务注册后显示disable状态,需人工启动 2 。加载注册中心对应的监控中心配置 3 。如果注册中心不为空则设置url的 monitor参数 ```java 4. Invoker proxyFactory.getInvoker proxyFactory 默认为JavassistProxyFactory对象,这段代码为创建 ref 服务对象的代理对象。 proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 获取ref的代理对象并在registryURL 中添加export属性,代理对象中属性参数如下  为暴露服务的核心实现部分,协议的调用链如下:
其中DubboProtocol 实现了服务暴露及心跳检测功能 RegistryProtocol 调用了DubboProtocol及注册服务
1 2 3 4 5 接下来经过两个扩展类(包装器) ProtocolFilterWrapper和ProtocolListenerWrapper 进入RegistryProtocol 核心代码如下: public <T> Exporter<T> export (final Invoker<T> originInvoker) throws RpcException { final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
1 2 3 4 //registry provider final Registry registry = getRegistry(originInvoker); // 获得服务提供者 URL final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
1 2 3 4 5 6 7 8 9 10 11 registry.register(registedProviderUrl); final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener (overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new Exporter <T>() { public Invoker<T> getInvoker () { return exporter.getInvoker();
}
1 2 3 4 5 public void unexport () { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t);
}
1 2 try { registry.unregister(registedProviderUrl);
}
1 2 3 try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
};
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 /** * 暴露服务。 * * 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。 * @param originInvoker * @param <T> * @return */ @SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { // 获得在 `bounds` 中的缓存 Key //dubbo://192.168.20.218:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false& // interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=1760&side=provider×tamp=1530150456618 String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); // 未暴露过,进行暴露服务 // InvokerDelegete 实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper 类,主要增加了 #getInvoker() 方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。 // 因为,可能会存在 InvokerDelegete.invoker 也是 InvokerDelegete 类型的情况。 getProviderUrl 同上 key = getCacheKey final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 暴露服务,创建 Exporter 对象 Exporter<T> export = (Exporter<T>) protocol.export(invokerDelegete); // 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象 exporter = new ExporterChangeableWrapper<T>(export, originInvoker); bounds.put(key, exporter); } return exporter; } ``` 1。代用同步锁+double-check的方式来保证同样的服务不重复暴露。
2.new InvokerDelegete(originInvoker, getProviderUrl(originInvoker)); InvokerDelegete 实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper(invoke) 类,主要增加了 #getInvoker() 方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 3。调用protocol.export接 经过ProtocolFilterWrapper.invoker方法 创过滤器链再暴露服务: ```java protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); /** * 构建过滤器链 * @param invoker injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=9932&side=provider×tamp=1527930395583 * @param key service.filter 该参数用于获得 ServiceConfig 或 ReferenceConfig 配置的自定义过滤器 * 以 ServiceConfig 举例子,例如 url = injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.3.17&bind.port=20880&default.delay=-1&default.retries=0&default.service.filter=demo&delay=-1&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=81844&qos.port=22222&service.filter=demo&side=provider×tamp=1520682156043 中, * service.filter=demo,这是笔者配置自定义的 DemoFilter 过滤器。 * <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" filter="demo" /> * @param group provider 属性,分组 * 在暴露服务时,group = provider 。 * 在引用服务时,group = consumer 。
*/
1 2 3 4 5 private static <T> Invoker<T> buildInvokerChain (final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
GenericFilter
ContextFilter
TraceFilter
TimeoutFilter
MonitorFilter
ExceptionFilter
DemoFilter 【自定义】*/
1 2 3 4 5 6 //倒序循环 Filter ,创建带 Filter 链的 Invoker 对象。因为是通过嵌套声明匿名类循环调用的方式,所以要倒序。可以手工模拟下这个过程。通过这样的方式,实际过滤的顺序,还是我们上面看到的正序 if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() {
1 2 3 @Override public Class<T> getInterface () { return invoker.getInterface();
}
1 2 public URL getUrl () { return invoker.getUrl();
}
1 2 public boolean isAvailable () { return invoker.isAvailable();
}
1 2 public Result invoke (Invocation invocation) throws RpcException { return filter.invoke(next, invocation);
}
1 2 public void destroy () { invoker.destroy();
}
1 2 public String toString () { return invoker.toString();
}
};
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 接下来执行DubboProrocol进行服务暴露的过程。 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); // 创建 DubboExporter 对象,并添加到 `exporterMap` 。 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } // 启动服务器 openServer(url); } ``` 1。获取invoker的 URL信息 2。获取key信息 为URL中interface与暴露端口的拼装字符串:com.alibaba.dubbo.demo.DemoService:20880 3。创建DubboExporter对象 并且入参为exporterMap 4。将exporter对象添加到exporterMap中
/**
启动服务器 @param url URL1 2 3 4 5 6 7 8 9 10 11 12 13 */ ```java private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); //server支持reset,配合override功能使用 server.reset(url);
}
`调用createServer()方法 并存入DubboProtocol的serverMap中```java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private ExchangeServer createServer (URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException ("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException ("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
1 2 3 4 5 str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0 ) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException ("Unsupported client type: " + str);
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 1。默认开启server 关闭时发送readonly事件:channel.readonly.sent : true 2。默认开启 heartbeat 3。获取服务暴露的 server 传输 , 默认为netty 4。设置编码器为Dubbo也就是 DubboCountCodec 5.Exchangers#bind(url, requestHandler) 启动服务器,requestHandler结构如下  具体实现代码如下: private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); //如果是callback 需要处理高版本调用低版本的问题 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the API interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } public void received(Channel channel, Object message) throws RemotingException { reply((ExchangeChannel) channel, message); super.received(channel, message); } public void connected(Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } public void disconnected(Channel channel) throws RemotingException { logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { } RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } }; ``` ```java Exchangeers.bind(URL url, ExchangeHandler handler) public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null");
}
1 2 if (handler == null) { throw new IllegalArgumentException("handler == null");
}
1 2 url = url.addParameterIfAbsent (Constants .CODEC_KEY , "exchange" ); return getExchanger (url).bind (url, handler);
}
1 2 3 public static Exchanger getExchanger (URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type);
}
1 2 public static Exchanger getExchanger (String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 接口作用是设置exchanger params为header 并且获取Exchanger.class的header扩展接口HeaderExchanger, 并调用bind方法: public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } ``` ```java 先将 DubboProtocol入参 传过来的ExchangeHandler对象ExchangeHandlerAdapter() 进行包装组成handler链:最后返回ChannelHandler对象,接下来调用:Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) Server Transporters.bind(URL url, ChannelHandler... handlers) Transpoter$Adaptive.bind() 数据透传 NettyTransporter.java Server NettyTransporter.bind(URL url, ChannelHandler listener) public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener);
}
1 2 3 4 5 6 7 8 9 10 作用是: 返回一个NettyServer实例: public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } ``` ```java ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)) 只用是生成获取ThreadName的名称 为URL添加threadname的param ChannelHandlers.wrap(ChannelHandler handler, URL url) 代码如下: public class ChannelHandlers {
1 private static ChannelHandlers INSTANCE = new ChannelHandlers ();
1 protected ChannelHandlers() {
}
1 2 public static ChannelHandler wrap (ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
1 2 protected static ChannelHandlers getInstance() { return INSTANCE;
}
1 2 static void setTestingChannelHandlers(ChannelHandlers instance) { INSTANCE = instance;
}
1 2 3 protected ChannelHandler wrapInternal (ChannelHandler handler, URL url) { return new MultiMessageHandler (new HeartbeatHandler (ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url)));
}
1 2 3 4 5 6 7 8 9 ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url):  获取到AllDispatcher分发器进行透传: public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } ```
结构如图所示:
1 2 3 4 5 6 调用WrappedChannelHandler的构造方法: ```java public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
1 2 3 String componentKey = Constants .EXECUTOR_SERVICE_COMPONENT_KEY ;if (Constants .CONSUMER_SIDE .equalsIgnoreCase (url.getParameter (Constants .SIDE_KEY ))) { componentKey = Constants .CONSUMER_SIDE ;
}
1 2 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 这段代码的功能为: 1。将 之前头创的DecoderHandler对象再进包装 包装为AllChannelHandler 2。生成线程池对象Executor对象 3。获取默认的DataStore对象,并将线程池对象放入DataStore 中 key为 : java.util.concurrent.ExecutorService 字符串和服务暴露的端口 值为线程池对象 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); 接下来将返回的AllChannelHandler对象用HeartbeatHandler 和 MultiMessageHandler 进行包装处理并返回ChannelHandler.wrap() 的上一端。 NettyTransporter.bind(URL url, ChannelHandler listener) -> new NettyServer(URL url, ChannelHandler handler) -> super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); 接下来是创建NettyServer对象的最后一步: NettyServer ==> } AbstractServer==> public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); } AbstractEndpoint ==> public AbstractEndpoint(URL url, ChannelHandler handler) { this.codec = getChannelCodec(url); this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); } AbstractPeer==> public AbstractPeer(URL url, ChannelHandler handler) { } } } ``` ```javascript 调用栈如上所示: 因为之前设置了codec为dubbo 所以返回DubboCountCodec实例 获取超时时间timeout ,和链接的超时时间connectTimeout localAddress为本地IP:PORT port为服务暴露的端口 host 为0.0.0.0 bindAddress为 host:port port为服务暴露的端口 this.accept 为默认获取最大连接数 idleTimeout为 url中 idle.timeout 核心代码:doOpen() protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final NettyHandler nettyHandler = new NettyHandler (getUrl(), this );channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory () { public ChannelPipeline getPipeline () { NettyCodecAdapter adapter = new NettyCodecAdapter (getCodec(), getUrl(), NettyServer.this ); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder" , adapter.getDecoder()); pipeline.addLast("encoder" , adapter.getEncoder()); pipeline.addLast("handler" , nettyHandler); return pipeline;
}
1 2 3 }); // bind channel = bootstrap.bind(getBindAddress());
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 1。首先进行Netty的日志配置 接下来先生成 NettyCodecAdapter 入参为之前生成的codec , URL信息(主要用到buffer属性配置Netty缓冲区)及 this (Handler) 对象 接下来就是设置Netty的Encoder Decoder 来进行数据的编码与解码 其会调用 this的handler链来进行数据处理。Dubbo2.5.6采用的是Netty3来进行通讯的,此处就不进行赘述。 AbstractServer 接下来获取到从DataStore对象中获取之前缓存的线程池 ,设置 NettyServer的 executor属性。 自此,Dubbo服务暴露的代码解析完毕,NettyServer的类结构图如下: #### 1.2 心跳服务 Dubbo provider的心跳服务是 HeaderExchanger bind代码执行的最后一步:参数是上面生成的Server对象 (NettyServer)。 public HeaderExchangeServer(Server server) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } startHeatbeatTimer(); } ``` ```java 1。初始化 server信息 2。获取server URL中heartbeat信息 及心跳超时信息,默认为heartbeat的三倍 3。执行心跳代码 startHeatbeatTimer() private void startHeatbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels());
}
1 2 }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 1。停止定时任务——首先停止定时器中所有任务,置空 beatbeatTimer; 2。重新设置定时器 , 环测 接下来在DubboProtocol的openServer(URL) 方法中将创建的ExchangeServer对象放入 DubboProtocol的 serverMap 集合对象中 key为服务的ip:port 如 192.168.20.218:20880 value为之前创建的ExchangeServer对象 DubboProtocol export方法到此执行完毕,最终返回的是 DubboExporter对象包装了入参的invoker对象,serviceKey信息,及服务暴露的 exporterMap对象。 #### 1.3 服务注册 我们接着来看RegistryProtocol 接下来的执行代码: //registry provider 添加定时任务 ping request response } try { } try { } try { } }; ```
1.ExporterChangeableWrapperdoLocalExport(final InvokeroriginInvoker) 为暴露服务的执行过程,上面流程已经走过。 返回的数据格式如下: 2。根据originInvoker中注册中心信息获取对应的Registry对象,因为这里是zookeeper协议,所以为ZookeeperRegistry对象 3。从注册中心的URL中获得 export 参数对应的值,即服务提供者的URL. 4.registry.register(registedProviderUrl); 用之前创建的注册中心对象注册服务 5. // TODO
1 2 3 4 5 6 7 8 9 ```java 上面提到 Registry getRegistry(final Invoker<?> originInvoker) 是根据invoker的地址获取registry实例代码如下: private Registry getRegistry(final Invoker<?> originInvoker) { // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.20.218%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26default.accepts%3D1000%26default.threadpool%3Dfixed%26default.threads%3D100%26default.timeout%3D5000%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26owner%3Duce%26pid%3D12028%26side%3Dprovider%26timestamp%3D1531912729429&owner=uce&pid=12028®istry=zookeeper×tamp=1531912729343 URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
1 2 // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.20.218%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26default.accepts%3D1000%26default.threadpool%3Dfixed%26default.threads%3D100%26default.timeout%3D5000%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26owner%3Duce%26pid%3D12028%26side%3Dprovider%26timestamp%3D1531912729429&owner=uce&pid=12028×tamp=1531912729343 return registryFactory.getRegistry(registryUrl);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 上面代码的意思是: 1。获取originalInvoker中的URL信息 (注册中心的配置信息) 2。将URL中信息中Param中registry参数获取到,并替换URL中的protocol属性,并删除Param中的registry信息,上面代码中的注释为执行前和执行后的的结果。 3。获取protocol 为 zookeeper对应的RegistryFactory接口的扩展对象 ZookeeperRegistryFactory 并执行getRegistry 方法: ZookeeperRegistryFactory的继承结构和对应类中属性如下图所示: 其中REGISTRIES = new ConcurrentHashMap<String, Registry>(); 代表注册中心的配置,其中可以有多个注册中心配置 AbstractRegistryFactory.getRegistry执行代码如下: public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // zookeeper://192.168.1.157:2181/com.alibaba.dubbo.registry.RegistryService // 锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); } finally { // 释放锁 LOCK.unlock(); } ``` ```java 1。设置Path属性,添加interface参数信息,及移除export 和 refer 参数信息。执行结果如下: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&owner=uce&pid=12028×tamp=1531912729343 2。获取url对应的serviceString信息:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService,由于我使用的是本地的zookeeper 所以IP为 127.0.0.1 3。顺序地创建注册中心:Registry ZookeeperRegistryFactory.createRegistry(URL url); public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter);
}
1 2 3 4 5 public ZookeeperRegistry (URL url, ZookeeperTransporter zookeeperTransporter) { super (url); if (url.isAnyHost()) { throw new IllegalStateException ("registry address == null" );
}
1 2 3 String group = url.getParameter (Constants .GROUP_KEY , DEFAULT_ROOT );if (!group.startsWith (Constants .PATH_SEPARATOR )) { group = Constants .PATH_SEPARATOR + group;
}
1 2 3 4 5 6 7 8 9 this .root = group;zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener () { public void stateChanged (int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e);
}
1 2 3 4 5 6 7 8 9 10 }); public FailbackRegistry (URL url) { int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this .retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable () { public void run () { try { retry(); } catch (Throwable t) { logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
1 }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
1 2 3 4 5 6 7 8 9 10 11 public AbstractRegistry (URL url) { setUrl(url); syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false ); String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home" ) + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache" ); File file = null ; if (ConfigUtils.isNotEmpty(filename)) { file = new File (filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException ("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!" );
}
1 2 3 this.file = file; loadProperties(); notify(url.getBackupUrls());
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ZookeeperRegistry 的类继承结构图如图所示:  ZooKeeperRegistry.FailbackRegistry.AbstractRegistry中 1.setUrl设置url属性信息 2.是否启用文件的异步保存 3.注册中心对应的本地文件保存的位置信息:如C:\Users\Administrator/.dubbo/dubbo-registry-127.0.0.1.cache 4.给file赋值 并且加载文件信息到properties属性中 5.notify(url.getBackupUrls) 这段代码不知道什么意思。 ZooKeeperRegistry.FailbackRegistry中 1。获取定时任务的时间间隔。 2。开启定时任务定时检测失败的注册,并重新注册。 ZooKeeperRegistry 中 1。获取注册中心的group参数 ,默认为/dubbo , 并未root赋予group值 2.zkClient = zookeeperTransporter.connect(url); 链接zookeeper信息并添加状态监听事件,具体再更文详述吧,代码如下: public ZkclientZookeeperClient(URL url) { client = new ZkClient(url.getBackupAddress()); client.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; if (state == KeeperState.Disconnected) { stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } public void handleNewSession() throws Exception { stateChanged(StateListener.RECONNECTED); } }); ```
3。添加重连状态的状态监听事件 调用 recover()方法。 至此 ZookeeperRegistry创建完毕。
1 ZookeeperRegistryFactory中最后将registry放入 ZookeeperRegistryFactory.REGISTRIES中 key 为zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService value 为之前创建的ZookeeperRegistry对象。
接着返回RegistryProtocol 的export方法 , 1。上面说到了调用doLocalExport(originInvoker);进行服务暴露的过程及调用getRegistry(originInvoker)方法通过ZookeeperRegistryFactory 工厂生成 ZookeeperRegistry 方法,然后加入到工厂REGISTRIES 缓存中,并返回ZookeeperRegistry 实例的过程。 2。接下来RegistryProtocol 的export方法中调用 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 获取服务提供者的URL信息 , 它是从注册中心的URL中获得export参数对应的值转换的URL信息。(去除掉不需要在注册中心上看到的字段) 3。接下来调用registry.register(registedProviderUrl); 进行服务的注册将暴露的服务信息注册到注册中心,并且将已经注册的服务URL缓存到ZookeeperRegistry.registered 已注册服务的缓存中。
1 FailbackRegistry.register
/**
进行服务注册逻辑的实现1 2 3 4 5 */ ```java public void register(URL url) { if (destroyed.get()){ return;
}1 2 3 4 5 6 7 8 // 调用AbstractRegistry.register进行服务对应URL的缓存 super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服务器端发送注册请求,将服务注册到注册中心,可以使用各个注册协议(注册中心)的实现 此处使用zookeeper ZookeeperRegistry.doRegister doRegister(url); Throwable t = e;
1 2 3 4 5 6 7 8 boolean check = getUrl ().getParameter (Constants .CHECK_KEY , true ) && url.getParameter (Constants .CHECK_KEY , true ) && !Constants .CONSUMER_PROTOCOL .equals (url.getProtocol ()); boolean skipFailback = t instanceof SkipFailbackWrapperException ; if (check || skipFailback) { if (skipFailback) { t = t.getCause ();
}
1 2 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
1 2 // 将失败的注册请求记录到失败列表,定时重试 failedRegistered.add(url);
}
1 2 AbstractRegistry.register throw new IllegalArgumentException("register url == null");
}
1 logger.info("Register: " + url);
}
1 2 // 缓存已经注册的服务 registered.add(url);
} ZookeeperRegistry.doRegister
1 2 3 4 5 6 protected void doRegister (URL url ) { try { zkClient.create (toUrlPath (url), url.getParameter (Constants .DYNAMIC_KEY , true )); } catch (Throwable e) { throw new RpcException ("Failed to register " + url + " to zookeeper " + getUrl () + ", cause: " + e.getMessage (), e);
}
4。由registryProviderUrl获取overrideSubscribeUrl 再构建OverrideListener
#### 1.4 赞赏支持
本文标题: Dubbo服务发布之服务
发布时间: 2019年10月03日 00:00
最后更新: 2025年12月30日 08:54
原始链接: https://haoxiang.eu.org/b3a49a5b/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0 许可协议,转载请注明出处!