dubbo导出服务 - litter-fish/ReadSource GitHub Wiki
服务导出的入口方法是 ServiceBean 的 onApplicationEvent。 onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行服务导出操作
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是否有延迟导出 && 是否已导出 && 是不是已被取消导出
if (isDelay() && !isExported() && !isUnexported()) {
// 导出服务
export();
}
}
先判断是否要延迟导出或者导出未被取消,接着进行服务导出
isDelay 方法,当方法返回 true 时,表示无需延迟导出,返回 false 时,表示需要延迟导出。
// -☆- ServiceBean
private boolean isDelay() {
// 获取 delay
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
// 如果前面获取的 delay 为空,这里继续获取
delay = provider.getDelay();
}
// 判断 delay 是否为空,或者等于 -1
return supportedApplicationListener && (delay == null || delay == -1);
}
supportedApplicationListener: 变量用于表示当前的 Spring 容器是否支持 ApplicationListener,这个值初始为 false。 在 Spring 容器将自己设置到 ServiceBean 中时,ServiceBean 的 setApplicationContext 方法会检测 Spring 容器是否支持 ApplicationListener。 若支持,则将 supportedApplicationListener 置为 true。
导出服务时序图:
public synchronized void export() {
if (provider != null) {
// 获取 export 和 delay 配置
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
// 如果 export 为 false,则不导出服务
if (export != null && !export) {
return;
}
// delay > 0,延时导出服务
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
// 立即导出服务
} else {
doExport();
}
}
首先获取export 和 delay 配置,根据export 判断是否要导出服务, 其次根据 delay 配置判断是否是延迟导出还是需要立即导出。 对于延迟导出直接使用带有时间的连接池,不管是延迟还是立即导出都会调用doExport方法进行导出操作
/**
- 检测 <dubbo:service> 标签的 interface 属性合法性,不合法则抛出异常
- 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
- 检测并处理泛化服务和普通服务类
- 检测本地存根配置,并进行相应的处理
- 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常
*/
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
// 检测 interfaceName 是否合法
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("interface not allow null!");
}
// 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化
checkDefault();
// 下面几个 if 语句用于检测 provider、application 等核心配置类对象是否为空,
// 若为空,则尝试从其他配置类对象中获取相应的实例。
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {...}
if (monitor == null) {...}
if (protocols == null) {...}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {...}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {...}
}
// 检测 ref 是否为泛化服务类型
if (ref instanceof GenericService) {
// 设置 interfaceClass 为 GenericService.class
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
// 设置 generic = "true"
generic = Boolean.TRUE.toString();
}
// ref 非 GenericService 类型
} else {
try {
interfaceClass =
Class
.forName(
interfaceName,
true,
Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 对 interfaceClass,以及 <dubbo:method> 标签中的必要字段进行检查
checkInterfaceAndMethods(interfaceClass, methods);
// 对 ref 合法性进行检测
checkRef();
// 设置 generic = "false"
generic = Boolean.FALSE.toString();
}
// local 和 stub 在功能应该是一致的,用于配置本地存根
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
// 获取本地存根类
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 检测本地存根类是否可赋值给接口类,若不可赋值则会抛出异常,提醒使用者本地存根类类型不合法
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
// 此处的代码和上一个 if 分支的代码基本一致,这里省略
}
// 检测各种对象是否为空,为空则新建,或者抛出异常
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 导出服务
doExportUrls();
// ProviderModel 表示服务提供者模型,此对象中存储了与服务提供者相关的信息。
// 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。
// ApplicationModel 持有所有的 ProviderModel。
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
首先进行一些导出服务的检查操作,如:服务是否是否支持导出操作;接口名称是否正常等, 接着检测 provider、application 等核心配置类对象是否为空,为空则尝试从其他配置获取。 然后检测各种对象是否为空,为空则新建,或者抛出异常 最后进行导出url
private void doExportUrls() {
// 加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
// 遍历 protocols,并在每个协议下导出服务
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
首先加载注册中心链接, 接着遍历每个协议,并在每个协议下导出服务
/**
- 1. 检测是否存在注册中心配置类,不存在则抛出异常
- 2. 构建参数映射集合,也就是 map
- 3. 构建注册中心链接列表
- 4. 遍历链接列表,并根据条件决定是否将其添加到 registryList 中
*/
protected List<URL> loadRegistries(boolean provider) {
// 检测是否存在注册中心配置类,不存在则抛出异常
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && !registries.isEmpty()) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
// 若 address 为空,则将其设为 0.0.0.0
address = Constants.ANYHOST_VALUE;
}
// 从系统属性中加载注册中心地址
String sysaddress =
System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
// 检测 address 是否合法
if (
address.length() > 0
&& !RegistryConfig
.NO_AVAILABLE
.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 添加 ApplicationConfig 中的字段信息到 map 中
appendParameters(map, application);
// 添加 RegistryConfig 字段信息到 map 中
appendParameters(map, config);
// 添加 path、pid,protocol 等信息到 map 中
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY,
String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY,
String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (
ExtensionLoader
.getExtensionLoader(RegistryFactory.class)
.hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
// 解析得到 URL 列表,address 可能包含多个注册中心 ip,
// 因此解析得到的是一个 URL 列表
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(
Constants.REGISTRY_KEY, url.getProtocol());
// 将 URL 协议头设置为 registry
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
// 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
// (服务提供者 && register = true 或 null)
// || (非服务提供者 && subscribe = true 或 null)
if (
(provider
&& url.getParameter(
Constants.REGISTER_KEY, true))
||
(!provider
&& url.getParameter(
Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
首先检测是否存在注册中心配置类,不存在则抛出异常, 接着遍历配置的注册中心配置,处理address,然后构造一个 URL 列表,通过判断条件,决定是否添加 url 到 registryList 中
首先组装 URL
private void doExportUrlsFor1Protocol(P
rotocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 如果协议名为空,或空串,则将协议名变量设置为 dubbo
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
// 添加 side、版本、时间戳以及进程号等信息到 map 中
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// 通过反射将对象的字段信息添加到 map 中
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (methods != null && !methods.isEmpty()) {
// 这段代码用于添加 Callback 配置到 map 中,代码太长,待会单独分析
}
// 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
String[] methods =
Wrapper.getWrapper(interfaceClass).getMethodNames();
// 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 method = init,destroy
if (methods.length == 0) {
logger.warn("NO method found in service interface ...");
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
map.put(
Constants.METHODS_KEY,
StringUtils.join(
new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 添加 token 到 map 中
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
// 随机生成 token
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// 判断协议名是否为 injvm
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// 获取上下文路径
String contextPath = protocolConfig.getContextpath();
if (
(contextPath == null || contextPath.length() == 0)
&& provider != null) {
contextPath = provider.getContextpath();
}
// 获取 host 和 port
String host =
this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 组装 URL
URL url =
new URL(
name,
host,
port,
(contextPath == null || contextPath.length() == 0
? ""
: contextPath + "/") + path,
map);
// 省略无关代码
}
首先根据配置组装一个map对象,对象包括side、版本、时间戳以及进程号等信息, 接着根据map及host 和 port 组装一个URL对象。
接着检测 dubbo:method 标签中的配置信息,并将相关配置添加到 map 中
private void doExportUrlsFor1Protocol(
ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略组装 URL
// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
// 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
// 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
appendParameters(map, method, method.getName());
// ????
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// 获取 ArgumentConfig 列表
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// 检测 type 属性是否为空,或者空串(分支1 ⭐️)
if (
argument.getType() != null
&& argument.getType().length() > 0) {
// 获取接口中定义的方法
Method[] methods = interfaceClass.getMethods();
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// 比对方法名,查找目标方法
if (methodName.equals(method.getName())) {
// 获取方法参数类型数组
Class<?>[] argtypes =
methods[i].getParameterTypes();
if (argument.getIndex() != -1) {
// 检测 ArgumentConfig 中的 type 属性与方法参数列表
// 中的参数名称是否一致,不一致则抛出异常(分支2 ⭐️)
if (
argtypes[argument.getIndex()]
.getName()
.equals(
argument.getType())) {
// 添加 ArgumentConfig 字段信息到 map 中,
// 键前缀 = 方法名.index,比如:
// map = {"sayHello.3": true}
appendParameters(
map,
argument,
method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error: ...");
}
} else { // 分支3 ⭐️
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// 从参数类型列表中查找类型名称为 argument.type 的参数
if (argclazz.getName().equals(argument.getType())) {
appendParameters(
map,
argument,
method.getName() + "." + j);
if (
argument.getIndex() != -1
&& argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error: ...");
}
}
}
}
}
}
}
// 用户未配置 type 属性,但配置了 index 属性,且 index != -1
} else if (argument.getIndex() != -1) { // 分支4 ⭐️
// 添加 ArgumentConfig 字段信息到 map 中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type");
}
}
}
}
}
// 省略导出 Dubbo 服务
}
伪代码解释
// 获取 ArgumentConfig 列表
for (遍历 ArgumentConfig 列表) {
if (type 不为 null,也不为空串) { // 分支1
1. 通过反射获取 interfaceClass 的方法列表
for (遍历方法列表) {
1. 比对方法名,查找目标方法
2. 通过反射获取目标方法的参数类型数组 argtypes
if (index != -1) { // 分支2
1. 从 argtypes 数组中获取下标 index 处的元素 argType
2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常
} else { // 分支3
1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
2. 添加 ArgumentConfig 字段信息到 map 中
}
}
} else if (index != -1) { // 分支4
1. 添加 ArgumentConfig 字段信息到 map 中
}
}
导出 Dubbo 服务
private void doExportUrlsFor1Protocol(
ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略无关代码
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果 scope = none,则什么都不做
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// scope != remote,导出到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope != local,导出到远程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将监视器链接作为参数添加到 url 中
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
// 不存在注册中心,仅导出服务
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
首先加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url, 接着根据url的 SCOPE_KEY 属性判断是导出到本地还是远程。 对于远程服务的导出,首先会为服务提供类(ref)生成 Invoker,接着导出服务,并生成 Exporter
创建Invoker过程时序图:
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory
先看Invoker的获取过程
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 为目标类创建 Wrapper
final Wrapper wrapper =
Wrapper
.getWrapper(
proxy.getClass().getName().indexOf('$') < 0
? proxy.getClass()
: type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(
T proxy,
String methodName,
Class<?>[] parameterTypes,
Object[] arguments)
throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper
.invokeMethod(
proxy, methodName, parameterTypes, arguments);
}
};
}
首先根据目标类的class对象创建 Wrapper 实例, 接着创建匿名 Invoker 类对象,并实现 doInvoke 方法。
获取 Wrapper 实例
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c))
c = c.getSuperclass();
if (c == Object.class)
return OBJECT_WRAPPER;
// 从缓存中获取 Wrapper 实例
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
// 缓存未命中,创建 Wrapper
ret = makeWrapper(c);
// 写入缓存
WRAPPER_MAP.put(c, ret);
}
return ret;
}
首先从缓存中获取 Wrapper 实例,获取不到则创建 Wrapper并写入缓存
创建 Wrapper
private static Wrapper makeWrapper(Class<?> c) {
// 检测 c 是否为基本类型,若是则抛出异常
if (c.isPrimitive())
throw new IllegalArgumentException(
"Can not create wrapper for primitive type: " + c);
String name = c.getName();
ClassLoader cl = ClassHelper.getClassLoader(c);
// c1 用于存储 setPropertyValue 方法代码
StringBuilder c1 = new StringBuilder(
"public void setPropertyValue(Object o, String n, Object v){ ");
// c2 用于存储 getPropertyValue 方法代码
StringBuilder c2 = new StringBuilder(
"public Object getPropertyValue(Object o, String n){ ");
// c3 用于存储 invokeMethod 方法代码
StringBuilder c3 = new StringBuilder(
"public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws "
* InvocationTargetException.class.getName()
* "{ ");
// 生成类型转换代码及异常捕捉代码,比如:
// DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
c1
.append(name)
.append(" w; try{ w = ((")
.append(name)
.append(")$1); }catch(Throwable e){ throw new
IllegalArgumentException(e); }");
c2
.append(name)
.append(" w; try{ w = ((")
.append(name)
.append(")$1); }catch(Throwable e){ throw new
IllegalArgumentException(e); }");
c3
.append(name)
.append(" w; try{ w = ((")
.append(name)
.append(")$1); }catch(Throwable e){ throw new
IllegalArgumentException(e); }");
// pts 用于存储成员变量名和类型
Map<String, Class<?>> pts = new HashMap<String, Class<?>>();
// ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例
Map<String, Method> ms = new LinkedHashMap<String, Method>();
// mns 为方法名列表
List<String> mns = new ArrayList<String>();
// dmns 用于存储“定义在当前类中的方法”的名称
List<String> dmns = new ArrayList<String>();
// --------------------------------✨ 分割线1 ✨-------------------------------------
// 获取 public 访问级别的字段,并为所有字段生成条件判断语句
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
if (
Modifier.isStatic(f.getModifiers())
|| Modifier.isTransient(f.getModifiers()))
// 忽略关键字 static 或 transient 修饰的变量
continue;
// 生成条件判断及赋值语句,比如:
// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
c1
.append(" if( $2.equals(\"")
.append(fn)
.append("\") ){ w.")
.append(fn)
.append("=")
.append(arg(ft, "$3"))
.append("; return; }");
// 生成条件判断及返回语句,比如:
// if( $2.equals("name") ) { return ($w)w.name; }
c2
.append(" if( $2.equals(\"")
.append(fn)
.append("\") ){ return ($w)w.")
.append(fn)
.append("; }");
// 存储 <字段名, 字段类型> 键值对到 pts 中
pts.put(fn, ft);
}
// --------------------------------✨ 分割线2 ✨-------------------------------------
Method[] methods = c.getMethods();
// 检测 c 中是否包含在当前类中声明的方法
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
}
for (Method m : methods) {
if (m.getDeclaringClass() == Object.class)
// 忽略 Object 中定义的方法
continue;
String mn = m.getName();
// 生成方法名判断语句,比如:
// if ( "sayHello".equals( $2 )
c3
.append(" if( \"")
.append(mn)
.append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
// 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
// && $3.length == 2
c3
.append(" && ")
.append(" $3.length == ")
.append(len);
boolean override = false;
for (Method m2 : methods) {
// 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
// 对重载方法进行处理,考虑下面的方法:
// 1. void sayHello(Integer, String)
// 2. void sayHello(Integer, Integer)
// 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
// 需要进一步判断方法的参数类型
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
// 生成参数类型进行检测代码,比如:
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")
c3
.append(" && ")
.append(" $3[")
.append(l)
.append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName())
.append("\")");
}
}
}
// 添加 ) {,完成方法判断语句,此时生成的代码可能如下(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
c3.append(" ) { ");
// 根据返回值类型生成目标方法调用语句
if (m.getReturnType() == Void.TYPE)
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
c3
.append(" w.")
.append(mn)
.append('(')
.append(args(m.getParameterTypes(), "$4"))
.append(");")
.append(" return null;");
else
// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
c3
.append(" return ($w)w.")
.append(mn)
.append('(')
.append(args(m.getParameterTypes(), "$4"))
.append(");");
// 添加 }, 生成的代码形如(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
//
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
// return null;
// }
c3.append(" }");
// 添加方法名到 mns 集合中
mns.add(mn);
// 检测当前方法是否在 c 中被声明的
if (m.getDeclaringClass() == c)
// 若是,则将当前方法名添加到 dmns 中
dmns.add(mn);
ms.put(ReflectUtils.getDesc(m), m);
}
if (hasMethod) {
// 添加异常捕捉语句
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}
// 添加 NoSuchMethodException 异常抛出代码
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
// --------------------------------✨ 分割线3 ✨-------------------------------------
Matcher matcher;
// 处理 get/set 方法
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = (Method) entry.getValue();
// 匹配以 get 开头的方法
if (
(matcher =
ReflectUtils
.GETTER_METHOD_DESC_PATTERN
.matcher(md)).matches()) {
// 获取属性名
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("name") ) { return ($w).w.getName(); }
c2
.append(" if( $2.equals(\"")
.append(pn)
.append("\") ){ return ($w)w.")
.append(method.getName())
.append("(); }");
pts.put(pn, method.getReturnType());
// 匹配以 is/has/can 开头的方法
} else if (
(matcher =
ReflectUtils
.IS_HAS_CAN_METHOD_DESC_PATTERN
.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("dream") ) { return ($w).w.hasDream(); }
c2
.append(" if( $2.equals(\"")
.append(pn)
.append("\") ){ return ($w)w.")
.append(method.getName())
.append("(); }");
pts.put(pn, method.getReturnType());
// 匹配以 set 开头的方法
} else if (
(matcher =
ReflectUtils
.SETTER_METHOD_DESC_PATTERN
.matcher(md)).matches()) {
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
// 生成属性判断以及 setter 调用语句,示例如下:
// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
c1
.append(" if( $2.equals(\"")
.append(pn)
.append("\") ){ w.")
.append(method.getName())
.append("(")
.append(arg(pt, "$3"))
.append("); return; }");
pts.put(pn, pt);
}
}
// 添加 NoSuchPropertyException 异常抛出代码
c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
// --------------------------------✨ 分割线4 ✨-------------------------------------
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
// 创建类生成器
ClassGenerator cc = ClassGenerator.newInstance(cl);
// 设置类名及超类
cc
.setClassName(
(Modifier.isPublic(c.getModifiers())
? Wrapper.class.getName()
: c.getName() + "$sw") + id);
cc.setSuperClass(Wrapper.class);
// 添加默认构造方法
cc.addDefaultConstructor();
// 添加字段
cc.addField("public static String[] pns;");
cc.addField("public static " + Map.class.getName() + " pts;");
cc.addField("public static String[] mns;");
cc.addField("public static String[] dmns;");
for (int i = 0, len = ms.size(); i < len; i++)
cc.addField("public static Class[] mts" + i + ";");
// 添加方法代码
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
// 生成类
Class<?> wc = cc.toClass();
// 设置字段值
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values())
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
// 创建 Wrapper 实例
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
生成代码示例
public class XXXXProxy {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts1;
public static Class[] mts2;
public String[] getPropertyNames(){ return pns; }
public boolean hasProperty(String n){ return pts.containsKey(n); }
public Class getPropertyType(String n){ return (Class)pts.get(n); }
public String[] getMethodNames(){ return mns; }
public String[] getDeclaredMethodNames(){ return dmns; }
public void setPropertyValue(Object o, String n, Object v){
DemoService w;
try {
w = ((DemoServcie) o);
} catch(Throwable e) {
throw new IllegalArgumentException(e);
}
if( n.equals("name") ) { w.setName((java.lang.String)v); return; }
throw new NoSuchPropertyException("Not found property xxx filed or setter method in class .");
}
public Object getPropertyValue(Object o, String n){
DemoService w;
try {
w = ((DemoServcie) o);
} catch(Throwable e) {
throw new IllegalArgumentException(e);
}
if( n.equals("name") ) { return ($w).w.getName(); }
if( n.equals("dream") ) { return ($w).w.hasDream(); }
throw new NoSuchPropertyException("Not found property xxx filed or setter method in class .");
}
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws Exception {
DemoService w;
try {
w = ((DemoServcie) o);
} catch(Throwable e) {
throw new IllegalArgumentException(e);
}
try{
if ( "sayHello".equals( n )
&& p.length == 2
&& p[0].getName().equals("java.lang.Integer")
&& p[1].getName().equals("java.lang.String")) {
w.sayHello((java.lang.Integer)v[0], (java.lang.String)v[1]); return null;
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new NoSuchMethodException("Not found method .......");
}
}
/**
- 首先根据 URL 协议头决定是否导出服务。
- 若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。
- 然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务
*/
private void exportLocal(URL url) {
// 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // 设置协议头为 injvm
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder
.getInstance()
.pushServiceClass(getServiceClass(ref));
// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter =
protocol
.export(
proxyFactory
.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
首先根据URL 的协议头是否等于injvm判断服务是否已经导出到本地,如果未导出设置协议头为 injvm, 然后创建 Invoker,并导出服务
使用 InjvmProtocol 的 export 导出AbstractProxyInvoker到本地
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 创建 InjvmExporter
return
new InjvmExporter<T>(invoker, invoker.getUrl()
.getServiceKey(), exporterMap);
}
/**
- 1. 调用 doLocalExport 导出服务
- 2. 向注册中心注册服务
- 3. 向注册中心进行订阅 override 数据
- 4. 创建并返回 DestroyableExporter
*/
public <T> Exporter<T> export(final Invoker<T> originInvoker)
throws RpcException {
// 导出服务
final ExporterChangeableWrapper<T> exporter =
doLocalExport(originInvoker);
// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl =
getRegisteredProviderUrl(originInvoker);
// 获取 register 参数
boolean register =
registeredProviderUrl.getParameter("register", true);
// 向服务提供者与消费者注册表中注册服务提供者
ProviderConsumerRegTable
.registerProvider(
originInvoker, registryUrl, registeredProviderUrl);
// 根据 register 的值决定是否注册服务
if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable
.getProviderWrapper(originInvoker)
.setReg(true);
}
// 获取订阅 URL,比如:
// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL overrideSubscribeUrl =
getSubscribedOverrideUrl(registeredProviderUrl);
// 创建监听器
final OverrideListener overrideSubscribeListener =
new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 创建并返回 DestroyableExporter
return
new DestroyableExporter<T>(
exporter,
originInvoker,
overrideSubscribeUrl,
registeredProviderUrl);
}
- 首先调用 doLocalExport 导出服务,
- 接着向注册中心注册服务,
- 然后向注册中心进行订阅 override 数据,
- 最后创建并返回 DestroyableExporter
doLocalExport 导出服务
private <T> ExporterChangeableWrapper<T> doLocalExport(
final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
// 访问缓存
ExporterChangeableWrapper<T> exporter =
(ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) { // 双重检查锁
// 创建 Invoker 为委托类对象
final Invoker<?> invokerDelegete =
new InvokerDelegete<T>(
originInvoker, getProviderUrl(originInvoker));
// 调用 protocol 的 export 方法导出服务
exporter =
new ExporterChangeableWrapper<T>(
(Exporter<T>) protocol.export(invokerDelegete),
originInvoker);
// 写缓存
bounds.put(key, exporter);
}
}
}
return exporter;
}
首先尝试从缓存中获取,有则返回,如果没有,则调用 protocol 的 export 方法导出服务
DubboProtocol export 方法导出服务
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 创建 DubboExporter 实例
DubboExporter<T> exporter =
new DubboExporter<T>(invoker, key, exporterMap);
// 将 <key, exporter> 键值对放入缓存中
exporterMap.put(key, exporter);
// 本地存根相关代码
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods =
url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (
stubServiceMethods == null
|| stubServiceMethods.length() == 0) {
// 省略日志打印代码
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}
- 首先获取由服务组名,服务名,服务版本号以及端口组成服务标识,理解成服务坐标也行,
- 接着创建 DubboExporter 实例 exporter
- 然后将上面创建的exporter实例保存到缓存中,key为第一步的服务标识
- 接着会根据url启动服务器,打开一个TCP长连接
打开服务
创建netty服务时序图:
/**
- 打开服务
*/
private void openServer(URL url) {
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 访问缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 创建服务器实例
serverMap.put(key, createServer(url));
} else {
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
首先也是先访问缓存获取服务器实例,如果服务器已创建,则根据 url 中的配置重置服务器,否则创建服务器实例
创建服务器实例
/**
- 创建服务器实例
- 第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。
- 第二是创建服务器实例。
- 第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。
*/
private ExchangeServer createServer(URL url) {
url =
url.addParameterIfAbsent(
Constants.CHANNEL_READONLYEVENT_SENT_KEY,
Boolean.TRUE.toString());
// 添加心跳检测配置到 url 中
url =
url.addParameterIfAbsent(
Constants.HEARTBEAT_KEY,
String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 获取 server 参数,默认为 netty
String str =
url.getParameter(
Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (
str != null && str.length() > 0
&& !ExtensionLoader
.getExtensionLoader(Transporter.class)
.hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加编码解码器参数
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server...");
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes =
ExtensionLoader
.getExtensionLoader(Transporter.class)
.getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type...");
}
}
return server;
}
创建 ExchangeServer
public static ExchangeServer bind(URL url, ExchangeHandler handler)
throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}
//
public ExchangeServer bind(URL url, ExchangeHandler handler)
throws RemotingException {
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return
new HeaderExchangeServer(
Transporters
.bind(
url,
new DecodeHandler(
new HeaderExchangeHandler(handler))));
}
public static Server bind(URL url, ChannelHandler... handlers)
throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,并调用实例方法
return getTransporter().bind(url, handler);
}
getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。 TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。
public Server bind(URL url, ChannelHandler listener)
throws RemotingException {
// 创建 NettyServer
return new NettyServer(url, listener);
}
创建 NettyServer
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler)
throws RemotingException {
// 调用父类构造方法
super(url,
ChannelHandlers.wrap(
handler,
ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler)
throws RemotingException {
// 调用父类构造方法
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 获取 ip 和端口
String bindIp =
getUrl()
.getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort =
getUrl()
.getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (
url.getParameter(Constants.ANYHOST_KEY, false)
|| NetUtils.isInvalidLocalHost(bindIp)) {
// 设置 ip 为 0.0.0.0
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 获取最大可接受连接数
this.accepts =
url
.getParameter(
Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout =
url
.getParameter(
Constants.IDLE_TIMEOUT_KEY,
Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 调用模板方法 doOpen 启动服务器
doOpen();
} catch (Throwable t) {
throw new RemotingException("Failed to bind ");
}
DataStore dataStore =
ExtensionLoader
.getExtensionLoader(DataStore.class)
.getDefaultExtension();
executor =
(ExecutorService) dataStore
.get(
Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
}
Netty创建TCP连接
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 创建 boss 和 worker 线程池
ExecutorService boss =
Executors.newCachedThreadPool(
new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker =
Executors.newCachedThreadPool(
new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory =
new NioServerSocketChannelFactory(
boss,
worker,
getUrl()
.getPositiveParameter(
Constants.IO_THREADS_KEY,
Constants.DEFAULT_IO_THREADS));
// 创建 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
// 设置 PipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter =
new NettyCodecAdapter(
getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 绑定到指定的 ip 和端口上
channel = bootstrap.bind(getBindAddress());
}
注册服务时序图:
public <T> Exporter<T> export(final Invoker<T> originInvoker)
throws RpcException {
// ${导出服务}
// 省略其他代码
boolean register =
registeredProviderUrl.getParameter("register", true);
if (register) {
// 注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable
.getProviderWrapper(originInvoker).setReg(true);
}
final URL overrideSubscribeUrl =
getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener =
new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 省略部分代码
}
注册服务
/**
- 第一步是获取注册中心实例,第二步是向注册中心注册服务
*/
public void register(URL registryUrl, URL registedProviderUrl) {
// 获取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注册服务
registry.register(registedProviderUrl);
}
获取注册中心实例
public Registry getRegistry(URL url) {
url =
url
.setPath(RegistryService.class.getName())
.addParameter(
Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
LOCK.lock();
try {
// 访问缓存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 缓存未命中,创建 Registry 实例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry...");
}
// 写入缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
创建 Registry 实例
protected abstract Registry createRegistry(URL url);
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
// zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 创建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(
URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
}
Zookeeper 客户端的创建过程
public ZookeeperClient connect(URL url) {
// 创建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
Curator模式创建ZookeeperClient
public class CuratorZookeeperClient
extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory
.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
// 客户端连接是否需要认证
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder =
builder
.authorization(
"digest", authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
// 添加监听器
client
.getConnectionStateListenable()
.addListener(new ConnectionStateListener() {
@Override
public void stateChanged(
CuratorFramework client,
ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient
.this
.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient
.this
.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient
.this
.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
注册register(URL)
// FailbackRegistry
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 模板方法,由子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 获取 check 参数,若 check = true 将会直接抛出异常
boolean check =
getUrl()
.getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants
.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback =
t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
} else {
logger.error("Failed to register");
}
// 记录注册失败的链接
failedRegistered.add(url);
}
}
ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。 节点路径由 toUrlPath 方法生成
protected void doRegister(URL url) {
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient
.create(
toUrlPath(url),
url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}
递归创建节点
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建上一级路径
create(path.substring(0, i), false);
}
// 根据 ephemeral 的值创建临时或持久节点
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
订阅时序图: