本文共 10242 字,大约阅读时间需要 34 分钟。
上期我们大概得了解了Eurake向Spring上下文中注册EurekaServiceRegistry和EurekaRegistration的详细过程,其中总调度类EurekaAutoServiceRegistration还专门采用lifeCycle的方式实现。昨天的分析中我们指出EurekaServiceRegistry是分析的重点,因此今天我们就重点突破一下这块的具体逻辑。
public void register(EurekaRegistration reg) { this.maybeInitializeClient(reg); if (log.isInfoEnabled()) { //打印服务的状态,UP表示正常启动 log.info("Registering application " + reg.getInstanceConfig().getAppname() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } //设置启动状态 reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); if (reg.getHealthCheckHandler() != null) { reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler()); }}
我们原来以为EurekaServiceRegistry有大量的动作,但是发现这个类中什么也没有。等于与一个壳子,重点全是EurekaRegistration既然如此我们就重新分析EurekaRegistration吧。
发现这里有两个Eurake的客户端,但是cloudEurekaClient并没有怎么用唉,好像这里的eurekaClient才是真的老大。现在这里注入进来了,我们看看它是如何初始化的。这块我们直接DiscoveryClient的实现。
在DiscoveryClient的构造方法中,有几个线程相关的,分别是定时任务处理器、心跳线程池、缓存线程池。我们做过SpringCloud的同志都知道服务注册是通过定时任务去拉取服务信息,通过心态检测是否有服务宕机的。除此之外如果注册中心宕机了也会采用缓存。
private void initScheduledTasks() { int renewalIntervalInSecs; int expBackOffBound; //判断是否拉取配置,默认为true if (this.clientConfig.shouldFetchRegistry()) { //多久拿一次,默认为30秒 renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds(); //超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒 expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //这里采用了代理模式,都是runnable接口哦,比较绕 //启动刷新缓存的线程,也就是获取服务信息,全部打偶放到了定时任务中。 this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); }//判断是否向eureka注册 默认为true if (this.clientConfig.shouldRegisterWithEureka()) { //多久获取一次服务信息 默认30秒 renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); //超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒 expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: renew interval is: " + renewalIntervalInSecs); //启动心跳线程 this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); //定义一个监听器,用来看获取定时任务还在吗 this.statusChangeListener = new StatusChangeListener() { public String getId() { return "statusChangeListener"; } public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) { DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent); } else { DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent); } DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate(); } }; if (this.clientConfig.shouldOnDemandUpdateStatusChange()) { this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener); } //启动监听 this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }}
上述代码只要用来获取服务的注册和发现。也对上述功能的线程进行了监听。上边的代理模式,我们打开看看TimedSupervisorTask
public void run() { Future future = null; try { //这里task是注册刷新的任务 future = this.executor.submit(this.task); //获取活动线程的个数 this.threadPoolLevelGauge.set((long)this.executor.getActiveCount()); //使用future任务类型,这里获取拿到的服务信息 future.get(this.timeoutMillis, TimeUnit.MILLISECONDS); this.delay.set(this.timeoutMillis); this.threadPoolLevelGauge.set((long)this.executor.getActiveCount()); } catch (TimeoutException var12) { //如果超时了,这里还会将当前的时间间隔扩大一倍 logger.error("task supervisor timed out", var12); this.timeoutCounter.increment(); long currentDelay = this.delay.get(); long newDelay = Math.min(this.maxDelay, currentDelay * 2L); this.delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException var13) { if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) { logger.error("task supervisor rejected the task", var13); } else { logger.warn("task supervisor shutting down, reject the task", var13); } this.rejectedCounter.increment(); } catch (Throwable var14) { //如果异常服务处理,就直接中断了 if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) { logger.error("task supervisor threw an exception", var14); } else { logger.warn("task supervisor shutting down, can't accept the task"); } this.throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } if (!this.scheduler.isShutdown()) { //如果没有启动的话就重新创建一个 this.scheduler.schedule(this, this.delay.get(), TimeUnit.MILLISECONDS); } }}
//刷新的任务 @VisibleForTesting void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = this.isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; //先会判断这个值是否为空,如果为空话就从这里获取配置 String latestRemoteRegions = this.clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = (String)this.remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { synchronized(this.instanceRegionChecker.getAzToRegionMapper()) { if (this.remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { //设置最新的服务地址,用来获取服务配置 String[] remoteRegions = latestRemoteRegions.split(","); this.remoteRegionsRef.set(remoteRegions); this.instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently, ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { this.instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } //查看从eureka服务上获取配置是否成功, 真正的获取服务信息 boolean success = this.fetchRegistry(remoteRegionsModified); if (success) { this.registrySize = ((Applications)this.localRegionApps.get()).size(); this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(((Applications)this.localRegionApps.get()).getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); Iterator var11 = this.remoteRegionVsApps.entrySet().iterator(); while(var11.hasNext()) { Entryentry = (Entry)var11.next(); allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append((String)entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(((Applications)entry.getValue()).getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString()); } } catch (Throwable var9) { logger.error("Cannot fetch registry from server", var9); } }
在从Eureka服务器上获取服务所有信息的时候代码是这样写的
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = this.fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; //这块就是从拉取服务所有信息的 EurekaHttpResponsehttpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null ? this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //处理成实体类型 apps = (Applications)httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) { this.localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
看到这里,我也蒙了。本来一串也走下来了,但是让我疑惑的是我的serviceurl在哪里设置进去的?找了一圈remoteRegionsRef也没找到。fetch-remote-regions-registry这个配置是找到了,当我把这两个都配置上的时候就报错了。但是用Idea看的时候确实没有serviceUrl的踪迹。所以我是觉得是反编译的问题?有知道的朋友可以给我留言哦!谢谢
转载地址:http://phkmi.baihongyu.com/