博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringCloud源码学习(二) 面试官问我Eurake服务注册的实现细节?
阅读量:4210 次
发布时间:2019-05-26

本文共 10242 字,大约阅读时间需要 34 分钟。

上期我们大概得了解了Eurake向Spring上下文中注册EurekaServiceRegistry和EurekaRegistration的详细过程,其中总调度类EurekaAutoServiceRegistration还专门采用lifeCycle的方式实现。昨天的分析中我们指出EurekaServiceRegistry是分析的重点,因此今天我们就重点突破一下这块的具体逻辑。

public void register(EurekaRegistration reg) {    this.maybeInitializeClient(reg);    if (log.isInfoEnabled()) {    //打印服务的状态,UP表示正常启动        log.info("Registering application " + reg.getInstanceConfig().getAppname() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());    }    //设置启动状态    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());    if (reg.getHealthCheckHandler() != null) {        reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());    }}

我们原来以为EurekaServiceRegistry有大量的动作,但是发现这个类中什么也没有。等于与一个壳子,重点全是EurekaRegistration既然如此我们就重新分析EurekaRegistration吧。

发现这里有两个Eurake的客户端,但是cloudEurekaClient并没有怎么用唉,好像这里的eurekaClient才是真的老大。现在这里注入进来了,我们看看它是如何初始化的。这块我们直接DiscoveryClient的实现。

 在DiscoveryClient的构造方法中,有几个线程相关的,分别是定时任务处理器、心跳线程池、缓存线程池。我们做过SpringCloud的同志都知道服务注册是通过定时任务去拉取服务信息,通过心态检测是否有服务宕机的。除此之外如果注册中心宕机了也会采用缓存。

private void initScheduledTasks() {    int renewalIntervalInSecs;    int expBackOffBound;    //判断是否拉取配置,默认为true    if (this.clientConfig.shouldFetchRegistry()) {    //多久拿一次,默认为30秒        renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();        //超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒        expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();        //这里采用了代理模式,都是runnable接口哦,比较绕        //启动刷新缓存的线程,也就是获取服务信息,全部打偶放到了定时任务中。        this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs,         TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);    }//判断是否向eureka注册 默认为true    if (this.clientConfig.shouldRegisterWithEureka()) {    //多久获取一次服务信息 默认30秒        renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();         //超时容许时间的倍数,和获取心跳或者服务信息失败的时间差的扩容有关系 默认10秒        expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();        logger.info("Starting heartbeat executor: renew interval is: " + renewalIntervalInSecs);        //启动心跳线程        this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);        this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);        //定义一个监听器,用来看获取定时任务还在吗        this.statusChangeListener = new StatusChangeListener() {            public String getId() {                return "statusChangeListener";            }            public void notify(StatusChangeEvent statusChangeEvent) {                if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {                    DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);                } else {                    DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);                }                DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();            }        };        if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {            this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);        }        //启动监听        this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());    } else {        logger.info("Not registering with Eureka server per configuration");    }}

上述代码只要用来获取服务的注册和发现。也对上述功能的线程进行了监听。上边的代理模式,我们打开看看TimedSupervisorTask

public void run() {    Future future = null;    try {    //这里task是注册刷新的任务         future = this.executor.submit(this.task);        //获取活动线程的个数        this.threadPoolLevelGauge.set((long)this.executor.getActiveCount());        //使用future任务类型,这里获取拿到的服务信息        future.get(this.timeoutMillis, TimeUnit.MILLISECONDS);        this.delay.set(this.timeoutMillis);        this.threadPoolLevelGauge.set((long)this.executor.getActiveCount());    } catch (TimeoutException var12) {    //如果超时了,这里还会将当前的时间间隔扩大一倍        logger.error("task supervisor timed out", var12);        this.timeoutCounter.increment();        long currentDelay = this.delay.get();        long newDelay = Math.min(this.maxDelay, currentDelay * 2L);        this.delay.compareAndSet(currentDelay, newDelay);    } catch (RejectedExecutionException var13) {        if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) {            logger.error("task supervisor rejected the task", var13);        } else {            logger.warn("task supervisor shutting down, reject the task", var13);        }        this.rejectedCounter.increment();    } catch (Throwable var14) {    //如果异常服务处理,就直接中断了        if (!this.executor.isShutdown() && !this.scheduler.isShutdown()) {            logger.error("task supervisor threw an exception", var14);        } else {            logger.warn("task supervisor shutting down, can't accept the task");        }        this.throwableCounter.increment();    } finally {        if (future != null) {            future.cancel(true);        }        if (!this.scheduler.isShutdown()) {        //如果没有启动的话就重新创建一个            this.scheduler.schedule(this, this.delay.get(), TimeUnit.MILLISECONDS);        }    }}
    //刷新的任务    @VisibleForTesting    void refreshRegistry() {        try {            boolean isFetchingRemoteRegionRegistries = this.isFetchingRemoteRegionRegistries();            boolean remoteRegionsModified = false;            //先会判断这个值是否为空,如果为空话就从这里获取配置            String latestRemoteRegions = this.clientConfig.fetchRegistryForRemoteRegions();            if (null != latestRemoteRegions) {                String currentRemoteRegions = (String)this.remoteRegionsToFetch.get();                if (!latestRemoteRegions.equals(currentRemoteRegions)) {                    synchronized(this.instanceRegionChecker.getAzToRegionMapper()) {                        if (this.remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {                            //设置最新的服务地址,用来获取服务配置                            String[] remoteRegions = latestRemoteRegions.split(",");                            this.remoteRegionsRef.set(remoteRegions);                            this.instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);                            remoteRegionsModified = true;                        } else {                            logger.info("Remote regions to fetch modified concurrently, ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);                        }                    }                } else {                    this.instanceRegionChecker.getAzToRegionMapper().refreshMapping();                }            }           //查看从eureka服务上获取配置是否成功, 真正的获取服务信息            boolean success = this.fetchRegistry(remoteRegionsModified);            if (success) {                this.registrySize = ((Applications)this.localRegionApps.get()).size();                this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();            }            if (logger.isDebugEnabled()) {                StringBuilder allAppsHashCodes = new StringBuilder();                allAppsHashCodes.append("Local region apps hashcode: ");                allAppsHashCodes.append(((Applications)this.localRegionApps.get()).getAppsHashCode());                allAppsHashCodes.append(", is fetching remote regions? ");                allAppsHashCodes.append(isFetchingRemoteRegionRegistries);                Iterator var11 = this.remoteRegionVsApps.entrySet().iterator();                while(var11.hasNext()) {                    Entry
entry = (Entry)var11.next(); allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append((String)entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(((Applications)entry.getValue()).getAppsHashCode());                } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString()); } } catch (Throwable var9) { logger.error("Cannot fetch registry from server", var9);        } }

在从Eureka服务器上获取服务所有信息的时候代码是这样写的

private void getAndStoreFullRegistry() throws Throwable {        long currentUpdateGeneration = this.fetchRegistryGeneration.get();        logger.info("Getting all instance registry info from the eureka server");        Applications apps = null;        //这块就是从拉取服务所有信息的        EurekaHttpResponse
httpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null ?        this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //处理成实体类型 apps = (Applications)httpResponse.getEntity();        } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) { this.localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already");        } }

看到这里,我也蒙了。本来一串也走下来了,但是让我疑惑的是我的serviceurl在哪里设置进去的?找了一圈remoteRegionsRef也没找到。fetch-remote-regions-registry这个配置是找到了,当我把这两个都配置上的时候就报错了。但是用Idea看的时候确实没有serviceUrl的踪迹。所以我是觉得是反编译的问题?有知道的朋友可以给我留言哦!谢谢

转载地址:http://phkmi.baihongyu.com/

你可能感兴趣的文章
Android Camera架构浅析
查看>>
Android display架构分析
查看>>
S3C2440上LCD驱动 (FrameBuffer)实例开发讲解
查看>>
Linux音频编程指南
查看>>
针对windows编程-linux驱动编程-usb编程的号文章--推荐
查看>>
USB OTG的工作原理-相互切换和交互流程
查看>>
usb-otg-调试心得
查看>>
嵌入式开发指导博客-刘洪涛
查看>>
Linux启动流程-bootloader至kernel的过程--android系统启动流程
查看>>
USB2.0速度识别--区分低速-高速-全速
查看>>
NandFlash驱动超详细分析
查看>>
inf文件解析
查看>>
MOSFET结构及其工作原理详解
查看>>
android开源社区
查看>>
手机摄像头工作原理
查看>>
手机电容触摸屏技术简介
查看>>
led子系统 及 内核中led触发器实例
查看>>
Android USB Tethering的实现以及代码流程
查看>>
有关电池充电和LCD驱动的好地方
查看>>
USB规范浏览--设备和主机规范
查看>>