Eureka服务端 - 969251639/study GitHub Wiki

服务端也主要分接收服务注册,心跳续租,下线等部分浅析

服务注册:

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
	handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
	super.register(info, isReplication);
    }

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        ...
        super.register(info, leaseDuration, isReplication);
        ...
    }

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            //private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();保存所有的服务信息的map
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {//如果找不到,则注册到map中
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            ...
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {//如果存在租约,则刷新租约的最新时间
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            //启动时开始计算租约时间
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            ...
        } finally {
            read.unlock();
        }
    }

简单理解,注册就是把服务存放到map中,然后开始计算租约

心跳续租:

    @Override
    public boolean renew(final String appName, final String serverId,
	boolean isReplication) {
	...
	return super.renew(appName, serverId, isReplication);
    }
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

    public boolean renew(String appName, String id, boolean isReplication) {
            ...
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
                leaseToRenew = gMap.get(id);//获取租约信息
            }
            ...
            leaseToRenew.renew();//刷新续租时间
            return true;
        }
    }
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

下线:

    @Override
    public boolean cancel(String appName, String serverId, boolean isReplication) {
	handleCancelation(appName, serverId, isReplication);
	return super.cancel(appName, serverId, isReplication);
    }

    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            ...
            return true;
        }
        return false;
    }

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//获取服务信息
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);//将该服务从map中删掉
            }
            ...
        } finally {
            read.unlock();
        }
    }

下线简单的来说就是将保存在map中的服务信息从map中删除掉

服务过期:
Eureka服务器在启动时会启动一个定时任务,定期扫描过期的服务
Eureka的启动类EurekaBootStrap中,实现了ServletContextListener接口,那么服务启动时会调用contextInitialized方法,这个contextInitialized就是整个Eureka启动的开始

    @Override
    public void contextInitialized(ServletContextEvent event) {
            ...
            initEurekaServerContext();
      ...
    }
    protected void initEurekaServerContext() throws Exception {
        ...
        registry.openForTraffic(applicationInfoManager, registryCount);
        ...
    }

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
	super.openForTraffic(applicationInfoManager,
	    count == 0 ? this.defaultOpenForTrafficCount : count);
    }

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        ...
        super.postInit();
    }

    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {//启动前清除之前的任务,如果有的话
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());//设置任务
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());//开始调度
    }

evictionTimer是一个Timer,所以这个定时器会在serverConfig.getEvictionIntervalTimerInMs()之后以每serverConfig.getEvictionIntervalTimerInMs()时间定时运行 serverConfig.getEvictionIntervalTimerInMs()默认是60秒,也就是每分钟检查一次
接下来看EvictionTask的run方法

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {//逐个检查
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {//过期判断
                        expiredLeases.add(lease);//如果过期,将该租约服务放入到expiredLeases中
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        //计算最大可以清楚的数量,与自我保护的阈值有关,这里是分批清除
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);//获取可以清除的数量
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);//随机删除?
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);//下线
            }
        }
    }