注册中心原理
分布式微服务架构中,服务注册中⼼⽤于存储服务提供者地址信息、服务发布相关 的属性信息,消费者通过主动查询和被动通知的⽅式获取服务提供者的地址信息, ⽽不再需要通过硬编码⽅式得到提供者的地址信息。消费者只需要知道当前系统发 布了那些服务,⽽不需要知道服务具体存在于什么位置,这就是透明化路由。
pull模式: 服务消费者可以主动拉取可用的服务提供者清单
push模式: 服务消费者订阅服务,当服务提供者有变化时,注册中心也会主动推送更新后的服务清单给消费者
注册中心需要完成对服务提供者的健康监控,当发现服务提供者失效时需要及时剔除
主流服务中心对比
- Zookeeper
Zookeeper它是⼀个分布式服务框架,是Apache Hadoop 的⼀个⼦项⽬,它主 要是⽤来解决分布式应 ⽤中经常遇到的⼀些数据管理问题,如:统⼀命名服务、状态同步服务、集群管理、分布式应⽤配置项的管理等。
Zookeeper本质=存储+监听通知,主要是通过znode监听实现服务注册的,只要服务节点发生变动,就会通知到监听端,另外,Zookeeper有选举机制,保证了可用性,只要有一半以上的选举节点存活即可.
- Eureka
由Netflix开源,并被Pivatal集成到SpringCloud体系中,它是基于 RestfulAPI ⻛格开发的服务注册与发现组件
- Consul
Consul是由HashiCorp基于Go语⾔开发的⽀持多数据中⼼分布式⾼可⽤的服务 发布和注册服务软件, 采⽤Raft算法保证服务的⼀致性,且⽀持健康检查
- Nacos
Nacos是⼀个更易于构建云原⽣应⽤的动态服务发现、配置管理和服务管理平 台。简单来说 Nacos 就是 注册中⼼ + 配置中⼼的组合,帮助我们解决微服务开 发必会涉及到的服务注册 与发现,服务配置,服务管理等问题。Nacos 是 Spring Cloud Alibaba 核⼼组件之⼀,负责服务注册与发现,还有配置。
P:分区容错性(⼀定的要满⾜的)
C:数据⼀致性
A:⾼可⽤
CAP不可能同时满⾜三个,要么是AP,要么是CP
Eureka组件
基础架构
交互流程
这个是官方的一个描述图
- 从图中可以看出,Eureka Server支持集群同步,异地同步.高可用性
- 微服务会定期向Eureka Server发送心跳,进行存活续约
- Eureka Server同时也是Eureka Client,多个Eureka Server通过复制的方式完成服务注册列表同步
- Eureka Client会缓存Eureka Server中的信息,即使所有Eureka Server节点宕机,服务消费者依然可以使用缓存查找到服务提供者
Eureka源码剖析
1.Eureka Server启动过程
SpringBoot启动时,会加载EurekaServerAutoConfiguration配置类
- EurekaServerAutoConfiguration类
- 只有添加了@EnableEurekaServer注解,才会引入EurekaServerMarkerConfiguration类
- EurekaServerAutoConfiguration做了哪些事
针对PeerEurekaNodes是如何实现节点更新的呢?可以查看类中的start方法
上面的updatePeerEurekaNodes是什么时候执行的呢?
可以看到在构建DefaultEurekaServerContext对象时,会执行start方法
在配置类中,我们还需要关注如下两个bea:
- EurekaServerInitializerConfiguration做了哪些事?
查看contextInitialized中都初始化了哪些内容?
继续查看initEurekaServerContext中都做了啥?
查看syncUp方法中,如何实现的注册信息同步?
再看一看同步过来的信息是如何注册到EurekaServer中的
我们再看一下如何实现的服务剔除
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
进入postInit方法查看
2.Eureka服务接口暴露策略
在第一步中,我们看到自动配置类中注册了jerseyFilterRegistration
注入Jersey的细节
具体要扫描那些包呢?
查看这些包下包含哪些接口服务
这些就是使⽤Jersey发布的供Eureka Client调⽤的Restful⻛格服务接⼝(完成服务 注册、⼼跳续约等接⼝
3.Eureka Server服务注册接口(接受客户端注册服务)
查看ApplicationResource类的addInstance()⽅法中代码
点击register方法:注册服务信息并且同步到其他Eureka节点,查看实现
点击super.register方法:实例信息存储到注册表是一个ConcurrentHashMap.查看实现
/**
* Registers a new instance with a given duration.
*
* @see
com.netflix.eureka.lease.LeaseManager#register(java.lang.Object,
int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration,
boolean isReplication) {
try {
read.lock(); //读锁
// registry是保存所有应⽤实例信息的Map:
ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
// 从registry中获取当前appName的所有实例信息
Map<String, Lease<InstanceInfo>> gMap =
registry.get(registrant.getAppName());
REGISTER.increment(isReplication); //注册统计+1
// 如果当前appName实例信息为空,新建Map
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>>
gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(),
gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 获取实例的Lease租约信息
Lease<InstanceInfo> existingLease =
gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it,
if there is already a lease
// 如果已经有租约,则保留最后⼀个脏时间戳⽽不覆盖它
// (⽐较当前请求实例租约 和 已有租约 的LastDirtyTimestamp,选择靠
后的)
if (existingLease != null && (existingLease.getHolder() !=
null)) {
Long existingLastDirtyTimestamp =
existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp =
registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={},
provided={}", existingLastDirtyTimestamp,
registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp >
registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the
existing lease's dirty timestamp {} is greater" +
" than the one that is being registered
{}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo
instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
}
else {
// The lease does not exist and hence it is a new
registration
// 如果之前不存在实例的租约,说明是新实例注册
// expectedNumberOfRenewsPerMin期待的每分钟续约数+2(因为
30s⼀个)
// 并更新numberOfRenewsPerMinThreshold每分钟续约阀值(85%)
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce
the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin =
this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int)
(this.expectedNumberOfRenewsPerMin *
serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it
is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>
(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp())
;
}
gMap.put(registrant.getId(), lease); //当前实例信息放到维护注册
信息的Map
// 同步维护最近注册队列
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" +
registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden
status happens
// 如果当前实例已经维护了OverriddenStatus,将其也放到此Eureka
Server的overriddenInstanceStatusMap中
if
(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus()))
{
logger.debug("Found overridden status {} for instance
{}. Checking to see if needs to be add to the "
+ "overrides",
registrant.getOverriddenStatus(), registrant.getId());
if
(!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence
adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(),
registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap =
overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map",
overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
// 根据overridden status规则,设置状态
InstanceStatus overriddenInstanceStatus
= getOverriddenInstanceStatus(registrant,
existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease
service up timestamp
// 如果租约以UP状态注册,设置租赁服务时间戳
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED); //ActionType为
ADD
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
//维护recentlyChangedQueue
registrant.setLastUpdatedTimestamp(); //更新最后更新时间
// 使当前应⽤的ResponseCache失效
invalidateCache(registrant.getAppName(),
registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {}
(replication={})",
registrant.getAppName(), registrant.getId(),
registrant.getStatus(), isReplication);
} finally {
read.unlock(); //读锁
}
}
我们再看一下如何复制的对等节点,PeerAwareInstanceRegistryImpl#replicateToPeers() :复制到Eureka对等节点
private void replicateToPeers(Action action, String appName, String
id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional
*/, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
// 如果是复制操作(针对当前节点,false)
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// 如果它已经是复制,请不要再次复制,直接return
if (peerEurekaNodes == Collections.EMPTY_LIST ||
isReplication) {
return;
}
// 遍历集群所有节点(除当前节点外)
for (final PeerEurekaNode node :
peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to
yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl()))
{
continue;
}
// 复制Instance实例操作到某个node节点
replicateInstanceActionsToPeers(action, appName, id,
info, newStatus, node);
}
}
finally {
tracer.stop();
}
}
查看PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers的具体实现
4.Eureka Server服务续约接口(接受客户端续约)
InstanceResource的renewLease⽅法中完成客户端的⼼跳(续约)处理,关键代码:registry.renew(app.getName(), id, isFromReplicaNode)
replicateInstanceActionsToPeers() 复制Instance实例操作到其它节点
private void replicateInstanceActionsToPeers(Action action, String
appName,
String id,
InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel: //取消
node.cancel(appName, id);
break;
case Heartbeat: //⼼跳
InstanceStatus overriddenStatus =
overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName,
id, false);
node.heartbeat(appName, id, infoFromRegistry,
overriddenStatus, false);
break;
case Register: //注册
node.register(info);
break;
case StatusUpdate: //状态更新
infoFromRegistry = getInstanceByAppAndId(appName,
id, false);
node.statusUpdate(appName, id, newStatus,
infoFromRegistry);
break;
case DeleteStatusOverride: //删除OverrideStatus
infoFromRegistry = getInstanceByAppAndId(appName,
id, false);
node.deleteStatusOverride(appName, id,
infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action
{}", node.getServiceUrl(), action.name(), t);
}
}
5.Eureka Client注册服务
SpringBoot启动时,会加载EurekaClientAutoConfiguration配置类
分析EurekaClientAutoConfiguration类的注解
1.读取配置文件
2.启动时从EurekaServer获取服务实例信息
观察父类DiscoveryClient()
3.注册自己到EurekaServer中
4.开启一些定时任务(心跳,刷新缓存)
刷新缓存
心跳续约
6. Eureka Client下架服务
查看com.netflix.discovery.DiscoveryClient#shutdown