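Notice

Original article. Please credit the source when reposting.
https://www.cnblogs.com/boycelee/p/18055933
《码头工人的一千零一夜》 is a blogger focused on sharing practical technical content. By following the posts you can keep up with current industry trends and pick up hands-on experience in Java development and security. Whether you are a developer or a reverse-engineering enthusiast, you will find useful knowledge and ideas in 《码头工人的一千零一夜》.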

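Configuration Center Series

"[Architect's Perspective Series] Apollo Configuration Center: Architecture Design (Part 1)"
https://www.cnblogs.com/boycelee/p/17967590
"[Architect's Perspective Series] Apollo Configuration Center: Client Side (Part 2)"
https://www.cnblogs.com/boycelee/p/17978027
"[Architect's Perspective Series] Apollo Configuration Center: Server Side (ConfigService) (Part 3)"
https://www.cnblogs.com/boycelee/p/18005318
"[Architect's Perspective Series] QConfig Configuration Center Series: Architecture Design (Part 1)"
https://www.cnblogs.com/boycelee/p/18013653
"[Architect's Perspective Series] QConfig Configuration Center Series: Client Side (Part 2)"
https://www.cnblogs.com/boycelee/p/18033286
"[Architect's Perspective Series] QConfig Configuration Center Series: Server Side (Part 3)"
https://www.cnblogs.com/boycelee/p/18055933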

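I. Notification and Configuration Pull

II. Design Considerations

1. How does Admin notify every Server instance that a configuration has changed?

2. How does the Server notify the Client that a configuration has changed?

3. How does the Client pull the configuration?

III. Source Code Analysis

1. Admin Configuration Push

1.1 Active Push

1.1.1 Logic

The QConfig Server learns of configuration changes in two ways: active push and passive scan.

Active push means Admin (the management platform) obtains the IP and port of every registered Server instance from the registry, then iterates over them and calls a Server endpoint to tell each instance that a configuration has been updated.

Passive scan means each Server instance periodically scans the database on its own and, when it finds a new version, notifies the Client that the configuration has changed.

1.1.2 Sequence Diagram

1.1.3 Code Location

1.1.3.1 NotifyServiceImpl#notifyPush

When a user modifies a configuration on the admin platform, this method is called to push the change. Because every deployed Server instance must learn about the update, the method first fetches the Server hosts from the registry and then notifies them one by one.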

@Service
public class NotifyServiceImpl implements NotifyService, InitializingBean {

    /**
     * Configuration change notification triggered by an operation on the admin platform
     */
    @Override
    public void notifyPush(final ConfigMeta meta, final long version, List<PushItemWithHostName> destinations) {
        // Fetch the hosts of the Server instances from the registry (Eureka)
        List<String> serverUrls = getServerUrls();
        if (serverUrls.isEmpty()) {
            logger.warn("notify push server, {}, version: {}, but no server, {}", meta, version, destinations);
            return;
        }

        // URL of the Server endpoint that receives change pushes
        String uri = this.notifyPushUrl;
        logger.info("notify push server, {}, version: {}, uri: {}, servers: {}, {}", meta, version, uri, serverUrls, destinations);
        StringBuilder sb = new StringBuilder();
        for (PushItemWithHostName item : destinations) {
            sb.append(item.getHostname()).append(',')
                    .append(item.getIp()).append(',')
                    .append(item.getPort()).append(Constants.LINE);
        }
        final String destinationsStr = sb.toString();
        
        // Notify every registered Server host, passing the config meta, the version and the destinations
        doNotify(serverUrls, uri, "push", new Function<String, Request>() {
            @Override
            public Request apply(String url) {
                AsyncHttpClient.BoundRequestBuilder builder = getBoundRequestBuilder(url, meta, version, destinationsStr);
                return builder.build();
            }
        });
    }

    /**
     * Returns the hosts of the Server instances currently registered in the registry
     */
    private List<String> getServerUrls() {
        return serverListService.getOnlineServerHosts();
    }

    private void doNotify(List<String> serverUrls, String uri, String type, Function<String, Request> requestBuilder) {
        List<ListenableFuture<Response>> futures = Lists.newArrayListWithCapacity(serverUrls.size());
        for (String oneServer : serverUrls) {
            String url = "http://" + oneServer + "/" + uri;
            Request request = requestBuilder.apply(url);
            ListenableFuture<Response> future = HttpListenableFuture.wrap(httpClient.executeRequest(request));
            futures.add(future);
        }

        dealResult(futures, serverUrls, type);
    }

    
}
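
The fan-out performed by doNotify can be pictured with a small, self-contained sketch. This is not QConfig's code: the class NotifyFanOutSketch, the method notifyAll, and the sender callback (a stand-in for the real asynchronous HTTP client) are hypothetical names introduced only for illustration. The sketch sends one request per registered host in parallel and then collects the hosts whose notification failed, which is roughly the role dealResult appears to play above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class NotifyFanOutSketch {

    /**
     * Sends one notification per server host in parallel and returns the hosts that failed.
     * The sender callback is a hypothetical stand-in for an HTTP call; it returns true on success.
     */
    public static List<String> notifyAll(List<String> serverHosts, String uri,
                                         Function<String, Boolean> sender) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverHosts.size());
        for (String host : serverHosts) {
            String url = "http://" + host + "/" + uri;
            // Fire the request asynchronously; an exception counts as a failed notification.
            futures.add(CompletableFuture.supplyAsync(() -> sender.apply(url))
                    .exceptionally(e -> false));
        }

        // Wait for every request and remember which hosts did not acknowledge.
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            if (!Boolean.TRUE.equals(futures.get(i).join())) {
                failed.add(serverHosts.get(i));
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        // Pretend that the second server is unreachable.
        List<String> failed = notifyAll(hosts, "push", url -> !url.contains("10.0.0.2"));
        System.out.println("failed hosts: " + failed);
    }
}
```

A failed host is not fatal in this design, because each Server also scans the database periodically and will pick up the change on its next scan.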
1.1.3.2 LongPollingStoreImpl#manualPush
@Service
public class LongPollingStoreImpl implements LongPollingStore {

    private static final ConcurrentMap<ConfigMeta, Cache<Listener, Listener>> listenerMappings = Maps.newConcurrentMap();

    private static final int DEFAULT_THREAD_COUNT = 4;

    private static final long DEFAULT_TIMEOUT = 60 * 1000L;

    private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(
            DEFAULT_THREAD_COUNT, new NamedThreadFactory("qconfig-config-listener-push"));

    private static ExecutorService onChangeExecutor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("config-on-change"));

    @Override
    public void manualPush(ConfigMeta meta, long version, final Set<IpAndPort> ipAndPorts) {
        logger.info("push client file: {}, version {}, {}", meta, version, ipAndPorts);
        Set<String> ips = Sets.newHashSetWithExpectedSize(ipAndPorts.size());
        for (IpAndPort ipAndPort : ipAndPorts) {
            ips.add(ipAndPort.getIp());
        }

        manualPushIps(meta, version, ips);
    }

    @Override
    public void manualPushIps(ConfigMeta meta, long version, final Set<String> ips) {
        logger.info("push client file: {}, version {}, {}", meta, version, ips);
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            doChange(meta, version, Constants.PULL, new Predicate<Listener>() {
                @Override
                public boolean apply(Listener input) {
                    return ips.contains(input.getContextHolder().getIp());
                }
            });
        } finally {
            Monitor.filePushOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void onChange(final ConfigMeta meta, final long version) {
        logger.info("file change: {}, version {}", meta, version);
        onChangeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                Stopwatch stopwatch = Stopwatch.createStarted();
                try {
                    doChange(meta, version, Constants.UPDATE, Predicates.<Listener>alwaysTrue());
                } finally {
                    Monitor.fileOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
                }
            }
        });
    }

    private void doChange(ConfigMeta meta, long newVersion, String type, Predicate<Listener> needChange) {
        List<Listener> listeners = getListeners(meta, needChange);
        if (listeners.isEmpty()) {
            return;
        }

        Changed change = new Changed(meta, newVersion);
        // At or below the direct-push limit: push to all listeners immediately
        if (listeners.size() <= pushConfig.getDirectPushLimit()) {
            directDoChange(listeners, change, type);
        } else {
            // Above the limit: hand off to the scheduled executor and push in paced batches to avoid a thundering herd
            PushItem pushItem = new PushItem(listeners, type, change);
            scheduledExecutor.execute(new PushRunnable(pushItem));
        }
    }

    private void directDoChange(List<Listener> listeners, Changed change, String type) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            for (Listener listener : listeners) {
                logger.debug("return {}, {}", listener, change);
                returnChange(change, listener, type);
            }
        } catch (Exception e) {
            Monitor.batchReturnChangeFailCounter.inc();
            logger.error("batch direct return changes error, type {}, change {}", type, change, e);
        } finally {
            Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private static class PushRunnable implements Runnable {

        private final PushItem pushItem;

        private PushRunnable(PushItem pushItem) {
            this.pushItem = pushItem;
        }

        @Override
        public void run() {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                long start = System.currentTimeMillis();
                PushConfig config = pushConfig;
                int num = Math.min(pushItem.getListeners().size(), config.getPushMax());
                for (int i = 0; i < num; ++i) {
                    Listener listener = pushItem.getListeners().poll();
                    returnChange(pushItem.getChange(), listener, pushItem.getType());
                }

                if (!pushItem.getListeners().isEmpty()) {
                    long elapsed = System.currentTimeMillis() - start;
                    long delay;
                    if (elapsed >= config.getPushInterval()) {
                        delay = 0;
                    } else {
                        delay = config.getPushInterval() - elapsed;
                    }
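                    // After a batch, wait out the rest of the push interval (measured from the start of this batch),
                    // then schedule a new PushRunnable for the remaining listeners; it reschedules itself until the queue is empty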
                    scheduledExecutor.schedule(new PushRunnable(pushItem), delay, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                Monitor.batchReturnChangeFailCounter.inc();
                logger.error("batch return changes error, {}", pushItem, e);
            } finally {
                Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }

    private static void returnChange(Changed change, Listener listener, String type) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            // Notify the registered listener, which responds to the client with the new version
            listener.onChange(change, type);
        } finally {
            Monitor.returnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

}
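
The pacing logic in PushRunnable boils down to two small calculations: how many listeners go into the next batch, and how long to wait before that batch starts. The sketch below isolates them. PacedPushSketch, nextBatch and nextDelayMillis are hypothetical names, and plain strings stand in for Listener objects, so treat it as an illustration of the idea rather than the project's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class PacedPushSketch {

    /** Removes up to pushMax entries from the pending queue and returns them as one batch. */
    public static <T> List<T> nextBatch(Queue<T> pending, int pushMax) {
        int num = Math.min(pending.size(), pushMax);
        List<T> batch = new ArrayList<>(num);
        for (int i = 0; i < num; i++) {
            batch.add(pending.poll());
        }
        return batch;
    }

    /**
     * Delay before the next batch so that batches start roughly one push interval apart.
     * If the batch itself took longer than the interval, the next one starts immediately.
     */
    public static long nextDelayMillis(long elapsedMillis, long pushIntervalMillis) {
        return Math.max(0L, pushIntervalMillis - elapsedMillis);
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>(Arrays.asList("c1", "c2", "c3", "c4", "c5"));
        while (!pending.isEmpty()) {
            // A real server would schedule each batch with the computed delay instead of looping.
            System.out.println("push batch " + nextBatch(pending, 2));
        }
        System.out.println("delay after a 30 ms batch with a 100 ms interval: "
                + nextDelayMillis(30, 100) + " ms");
    }
}
```

Spreading the responses over time matters because every notified client immediately comes back to pull the new configuration; releasing thousands of long polls at once would turn one change into a burst of pull requests.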

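1.2 Passive Push

1.2.1 Logic

On startup and every 3 minutes afterwards, the Server refreshes its cache of the latest configuration versions from the database. If a newer version appears, the push logic is triggered and the new version is pushed to the Client.

1.2.2 Code Location

1.2.2.1 CacheConfigVersionServiceImpl#freshConfigVersionCache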
@Service
public class CacheConfigVersionServiceImpl implements CacheConfigVersionService {

    private volatile ConcurrentMap<ConfigMeta, Long> cache = Maps.newConcurrentMap();

    /**
     * Refreshes the latest configuration versions once at startup and every 3 minutes afterwards
     */
    @PostConstruct
    public void init() {
        freshConfigVersionCache();

        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        // Refresh the cache every 3 minutes and check whether any config has a newer version
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName("fresh-config-version-thread");
                try {
                    freshConfigVersionCache();
                } catch (Throwable e) {
                    logger.error("fresh config version error", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);
    }

    @Override
    public Optional<Long> getVersion(ConfigMeta meta) {
        return Optional.fromNullable(cache.get(meta));
    }

    /**
     * Periodically refreshes the latest versions; a newer version triggers the push logic
     */
    private void freshConfigVersionCache() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            logger.info("fresh config version cache");
            List<VersionData<ConfigMeta>> configIds = configDao.loadAll();

            ConcurrentMap<ConfigMeta, Long> newCache = new ConcurrentHashMap<ConfigMeta, Long>(configIds.size());
            ConcurrentMap<ConfigMeta, Long> oldCache = this.cache;

            // Build the new cache, checking each config for a newer version
            synchronized (this) {
                for (VersionData<ConfigMeta> configId : configIds) {
                    long newVersion = configId.getVersion();
                    Long oldVersion = cache.get(configId.getData());
                    // Deletions are not handled for now
                    // The DB load may predate a config update, so never move a cached version backwards
                    if (oldVersion != null && oldVersion > newVersion) {
                        newVersion = oldVersion;
                    }
                    // Record the newest known version in the new cache
                    newCache.put(configId.getData(), newVersion);
                }

                this.cache = newCache;
            }

            logger.info("fresh config version cache successOf, count [{}]", configIds.size());
            int updates = 0;
            for (Map.Entry<ConfigMeta, Long> oldEntry : oldCache.entrySet()) {
                ConfigMeta meta = oldEntry.getKey();
                Long oldVersion = oldEntry.getValue();
                Long newVersion = newCache.get(meta);
                if (newVersion != null && newVersion > oldVersion) {
                    updates += 1;
                    // The config changed: notify the Client
                    longPollingStore.onChange(meta, newVersion);
                }
            }
            logger.info("fresh size={} config version cache from db", updates);
        } finally {
            Monitor.freshConfigVersionCacheTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }
}
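
The refresh above has two steps that are easy to blur together: merging the freshly loaded versions with the cache so that no version ever goes backwards, and diffing the old cache against the new one to decide which configs need a notification. The following sketch separates the two. VersionDiffSketch, merge and changedKeys are hypothetical names, and a String key stands in for ConfigMeta.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VersionDiffSketch {

    /** Builds the new cache from the loaded versions, keeping the cached version when it is newer. */
    public static Map<String, Long> merge(Map<String, Long> cached, Map<String, Long> loaded) {
        Map<String, Long> merged = new HashMap<>(loaded);
        for (Map.Entry<String, Long> entry : loaded.entrySet()) {
            Long old = cached.get(entry.getKey());
            if (old != null && old > entry.getValue()) {
                merged.put(entry.getKey(), old);
            }
        }
        return merged;
    }

    /** Returns the keys that were already cached and whose version increased. */
    public static List<String> changedKeys(Map<String, Long> oldCache, Map<String, Long> newCache) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : oldCache.entrySet()) {
            Long newVersion = newCache.get(entry.getKey());
            if (newVersion != null && newVersion > entry.getValue()) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Long> cached = new HashMap<>();
        cached.put("app/a.properties", 3L);
        cached.put("app/b.properties", 5L);

        Map<String, Long> loaded = new HashMap<>();
        loaded.put("app/a.properties", 4L); // updated in the database
        loaded.put("app/b.properties", 2L); // stale read, the cache is newer
        loaded.put("app/c.properties", 1L); // brand-new config

        Map<String, Long> merged = merge(cached, loaded);
        System.out.println("notify: " + changedKeys(cached, merged));
    }
}
```

As in the original loop, a config that appears for the first time does not trigger a notification, since only entries of the old cache are compared.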

2. Change Listening

2.1.1 Logic

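The Client establishes a long poll with the Server. Once the request is accepted, the Server creates a listener for it. When the configuration changes, the listener fires, ends the long poll, and returns the latest configuration version. If no version changes, the long poll times out and is re-established about once a minute (the server-side timeout can be overridden through longPolling.server.timeout).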
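
The long-polling handshake described above can be modelled without a servlet container. In the sketch below a CompletableFuture plays the role of the parked request: it completes with the new version when a change arrives, or with the client's own version when the timeout expires. LongPollSketch, poll and onChange are hypothetical names, and the sketch deliberately ignores details a real server must handle, such as cleaning up timed-out entries and the race between the version check and the listener registration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class LongPollSketch {

    // Parked requests per config key; each future is one waiting client.
    private final Map<String, List<CompletableFuture<Long>>> waiting = new ConcurrentHashMap<>();
    // Latest known version per config key.
    private final Map<String, Long> latest = new ConcurrentHashMap<>();

    /** Answers at once if the server is ahead; otherwise parks the request until a change or the timeout. */
    public CompletableFuture<Long> poll(String configKey, long clientVersion, long timeoutMillis) {
        Long current = latest.get(configKey);
        if (current != null && current > clientVersion) {
            return CompletableFuture.completedFuture(current);
        }
        CompletableFuture<Long> pending = new CompletableFuture<>();
        waiting.computeIfAbsent(configKey, k -> new CopyOnWriteArrayList<>()).add(pending);
        // On timeout, answer with the client's own version, meaning "no change, poll again".
        return pending.completeOnTimeout(clientVersion, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Records the new version and releases every request parked on this config. */
    public void onChange(String configKey, long newVersion) {
        latest.merge(configKey, newVersion, Long::max);
        List<CompletableFuture<Long>> listeners = waiting.remove(configKey);
        if (listeners != null) {
            for (CompletableFuture<Long> listener : listeners) {
                listener.complete(newVersion);
            }
        }
    }

    public static void main(String[] args) {
        LongPollSketch server = new LongPollSketch();
        CompletableFuture<Long> parked = server.poll("app/db.properties", 1L, 60_000L);
        server.onChange("app/db.properties", 2L);
        System.out.println("client is told to fetch version " + parked.join());
    }
}
```

Note that the response carries only the version. The configuration content is fetched in a separate request, which keeps the notification path cheap.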

2.1.2 Sequence Diagram

2.1.3 Code Location

2.1.3.1 AbstractCheckVersionServlet#doPost
public abstract class AbstractCheckVersionServlet extends AbstractServlet {

    private static final long serialVersionUID = -8278568383506314625L;

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        ...
        
        checkVersion(requests, req, resp);
    }
}
2.1.3.2 LongPollingCheckServlet#checkVersion
public class LongPollingCheckServlet extends AbstractCheckVersionServlet {

    @Override
    protected void checkVersion(List<CheckRequest> checkRequests, HttpServletRequest req, HttpServletResponse resp)
    throws ServletException, IOException {
        ...
        try {
            // Switch the request to asynchronous processing
            AsyncContext context = req.startAsync();
            // Core flow: run the version check (long polling)
            getLongPollingProcessService().process(context, checkRequests);
        } catch (Throwable e) {
            // never come here !!!
            logger.error("服务异常", e);
        }
    }
}
2.1.3.3 LongPollingProcessServiceImpl#process
@Service
public class LongPollingProcessServiceImpl implements LongPollingProcessService {

    @PostConstruct
    public void init() {
        MapConfig config = MapConfig.get("config.properties");
        config.asMap();
        // Add a listener to config so the long-polling timeout can be updated at runtime
        config.addListener(new Configuration.ConfigListener<Map<String, String>>() {
            @Override
            public void onLoad(Map<String, String> conf) {
                String newTimeout = conf.get("longPolling.server.timeout");
                if (!Strings.isNullOrEmpty(newTimeout)) {
                    timeout = Numbers.toLong(newTimeout, DEFAULT_TIMEOUT);
                }
            }
        });
    }

    // Core logic
    @Override
    public void process(AsyncContext context, List<CheckRequest> requests) {
        IpAndPort address = new IpAndPort(clientInfoService.getIp(), clientInfoService.getPort());
        AsyncContextHolder contextHolder = new AsyncContextHolder(context, address);
        // Set the long-polling timeout
        context.setTimeout(timeout);
        // Register the listener that handles the timeout
        context.addListener(new TimeoutServletListener(contextHolder));
        processCheckRequests(requests, clientInfoService.getIp(), contextHolder);
    }

    private void processCheckRequests(List<CheckRequest> requests, String ip, AsyncContextHolder contextHolder) {
        CheckResult result = checkService.check(requests, ip, qFileFactory);
        logger.info("profile:{}, result change list {} for check request {}", clientInfoService.getProfile(), result.getChanges(), requests);

        if (!result.getChanges().isEmpty()) {
            returnChanges(AbstractCheckConfigServlet.formatOutput(CheckUtil.processStringCase(result.getChanges())), contextHolder, Constants.UPDATE);
            return;
        }
        // No change yet: register listeners for this request and store them in longPollingStore
        addListener(result.getRequestsNoChange(), contextHolder);
        // Register the client
        registerOnlineClients(result, contextHolder);
    }

    private void addListener(Map<CheckRequest, QFile> requests, AsyncContextHolder contextHolder) {
        for (Map.Entry<CheckRequest, QFile> noChangeEntry : requests.entrySet()) {
            CheckRequest request = noChangeEntry.getKey();
            QFile qFile = noChangeEntry.getValue();
            if (!contextHolder.isComplete()) {
                // Create a listener for the request
                Listener listener = qFile.createListener(request, contextHolder);
                // Store the listener in longPollingStore
                longPollingStore.addListener(listener);
            }
        }
    }

    private void registerOnlineClients(CheckResult result, AsyncContextHolder contextHolder) {
        Map<CheckRequest, QFile> noChanges = Maps.newHashMapWithExpectedSize(
                result.getRequestsNoChange().size() + result.getRequestsLockByFixVersion().size());
        noChanges.putAll(result.getRequestsNoChange());
        noChanges.putAll(result.getRequestsLockByFixVersion());

        for (Map.Entry<CheckRequest, QFile> noChangeEntry : noChanges.entrySet()) {
            CheckRequest request = noChangeEntry.getKey();
            QFile qFile = noChangeEntry.getValue();
            if (!contextHolder.isComplete()) {
                long version = request.getVersion();
                ConfigMeta meta = qFile.getRealMeta();
                String ip = contextHolder.getIp();
                if (qFile instanceof InheritQFileV2) {
                    InheritQFileV2 inheritQFile = (InheritQFileV2) qFile;
                    Optional<Long> optional = inheritQFile.getCacheConfigInfoService().getVersion(inheritQFile.getRealMeta());
                    version = optional.isPresent() ? optional.get() : version;
                    onlineClientListService.register(inheritQFile.getRealMeta(), ip, version);
                } else {
                    // Register the client so that Admin (the management platform) can list connected clients, including IP and config version
                    onlineClientListService.register(meta, ip, version);
                }
            }
        }
    }

    /**
     * The configuration changed: complete the request and return the change
     */
    private void returnChanges(String change, AsyncContextHolder contextHolder, String type) {
        contextHolder.completeRequest(new ChangeReturnAction(change, type));
    }
}
2.1.3.4 CheckServiceImpl#check
@Service
public class CheckServiceImpl implements CheckService {
    ...

    @Override
    public CheckResult check(List<CheckRequest> requests, String ip, QFileFactory qFileFactory) {
        List<CheckRequest> requestsNoFile = Lists.newArrayList();
        Map<CheckRequest, Changed> changes = Maps.newHashMap();
        Map<CheckRequest, QFile> requestNoChange = Maps.newHashMap();
        Map<CheckRequest, QFile> requestsLockByFixVersion = Maps.newHashMap();
        for (CheckRequest request : requests) {
            ConfigMeta meta = new ConfigMeta(request.getGroup(), request.getDataId(), request.getProfile());
            Optional<QFile> qFileOptional = qFileFactory.create(meta, cacheConfigInfoService);
            if (!qFileOptional.isPresent()) {
                requestsNoFile.add(request);
                continue;
            }

            QFile qFile = qFileOptional.get();
            // Core logic: compare versions
            Optional<Changed> changedOptional = qFile.checkChange(request, ip);
            if (changedOptional.isPresent()) {
                Optional<Changed> resultChange = repairChangeWithFixVersion(qFile, request, ip, changedOptional.get());
                if (resultChange.isPresent()) {
                    changes.put(request, resultChange.get());
                } else {
                    requestsLockByFixVersion.put(request, qFile);
                }
            } else {
                requestNoChange.put(request, qFile);
            }
        }
        return new CheckResult(requestsNoFile, changes, requestNoChange, requestsLockByFixVersion);
    }
}
2.1.3.5 QFileEntityV1#checkChange
public class QFileEntityV1 extends AbstractQFileEntity implements QFile {

    public QFileEntityV1(ConfigMeta meta,
                         CacheConfigInfoService cacheConfigInfoService,
                         ConfigStore configStore,
                         LogService logService,
                         ClientInfoService clientInfoService) {
        super(meta, cacheConfigInfoService, configStore, logService, clientInfoService);
    }

    @Override
    public Optional<Changed> checkChange(CheckRequest request, String ip) {
        ConfigMeta meta = getSourceMeta();
        // Read the latest version of the config file from the cache
        Optional<Long> version = getCacheConfigInfoService().getVersion(meta, ip);
        if (!version.isPresent()) {
            return Optional.absent();
        }

        if (version.get() <= request.getVersion()) {
            return Optional.absent();
        }

        return Optional.of(new Changed(meta.getGroup(), meta.getDataId(), meta.getProfile(), version.get()));
    }
}
2.1.3.6 CacheConfigInfoService#getVersion
@Service("cacheConfigInfoService")
public class CacheConfigInfoService implements ConfigInfoService {
    ... 
    @Override
    public Optional<Long> getVersion(ConfigMeta meta, String ip) {
        // Latest published version of the config
        Optional<Long> publishVersion = getVersion(meta);
        // Latest gray (canary) version pushed to this IP
        Optional<Long> pushVersion = getPushVersion(meta, ip);
        return VersionUtil.getLoadVersion(publishVersion, pushVersion);
    }
}
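
Taken together, checkChange and getVersion answer one question: is there a version this particular client should load that is newer than the one it reported? The sketch below illustrates that decision. The names VersionCheckSketch, effectiveVersion and checkChange are hypothetical. In particular, the body of VersionUtil.getLoadVersion is not shown in this article, so combining the published and pushed versions by taking the larger one is an assumption made here for illustration only.

```java
import java.util.Optional;

public class VersionCheckSketch {

    /**
     * ASSUMPTION: the version a client should load is the larger of the published version
     * and the gray version pushed to its IP. The real VersionUtil.getLoadVersion may differ.
     */
    public static Optional<Long> effectiveVersion(Optional<Long> published, Optional<Long> pushed) {
        if (!published.isPresent()) {
            return pushed;
        }
        if (!pushed.isPresent()) {
            return published;
        }
        return Optional.of(Math.max(published.get(), pushed.get()));
    }

    /** Reports a change only when the server-side version is strictly newer than the client's. */
    public static Optional<Long> checkChange(Optional<Long> serverVersion, long clientVersion) {
        return serverVersion.filter(version -> version > clientVersion);
    }

    public static void main(String[] args) {
        Optional<Long> version = effectiveVersion(Optional.of(5L), Optional.of(7L));
        System.out.println("client at 6 -> " + checkChange(version, 6L));
        System.out.println("client at 7 -> " + checkChange(version, 7L));
    }
}
```

An empty result corresponds to the "no change" branch above, where the request is parked and a listener is registered instead of answering immediately.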

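3. Client Configuration Pull

3.1.1 Logic

Using the latest version it obtained through long polling, the Client requests the corresponding configuration data. The lookup checks the in-memory cache first, then the local files, and finally the database. This ordering effectively relieves pressure on the database.

3.1.2 Code Location

3.1.2.1 ConfigStoreImpl#findConfig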
@Service
public class ConfigStoreImpl implements ConfigStore {

    private LoadingCache<VersionData<ConfigMeta>, ChecksumData<String>> configCache;

    @PostConstruct
    private void init() {
        configCache = CacheBuilder.newBuilder()
                .maximumSize(5000) // maximum number of entries
                .expireAfterAccess(10, TimeUnit.SECONDS) // expire 10 seconds after the last access
                .recordStats()
                .build(new CacheLoader<VersionData<ConfigMeta>, ChecksumData<String>>() {
                    @Override
                    public ChecksumData<String> load(VersionData<ConfigMeta> configId) throws ConfigNotFoundException {
                        
                        return loadConfig(configId);
                    }
                });

        Metrics.gauge("configFile_notFound_cache_hitRate", new Supplier<Double>() {
            @Override
            public Double get() {
                return configCache.stats().hitRate();
            }
        });
    }

    /**
     * Looks up the local Guava cache; on a miss the CacheLoader falls back to loadConfig
     */
    @Override
    public ChecksumData<String> findConfig(VersionData<ConfigMeta> configId) throws ConfigNotFoundException {
        try {
            return configCache.get(configId);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ConfigNotFoundException) {
                throw (ConfigNotFoundException) e.getCause();
            } else {
                log.error("find config error, configId:{}", configId, e);
                throw new RuntimeException(e.getCause());
            }
        }
    }

    /**
     * Loads the configuration from the local file or, failing that, from the database
     */
    private ChecksumData<String> loadConfig(VersionData<ConfigMeta> configId) throws ConfigNotFoundException {
        // Look for the configuration in the local config file
        ChecksumData<String> config = findFromDisk(configId);
        if (config != null) {
            return config;
        }

        String groupId = configId.getData().getGroup();
        Monitor.notFoundConfigFileFromDiskCounterInc(groupId);
        log.warn("config not found from disk: {}", configId);
        // Load the configuration from the database
        config = findFromDb(configId);
        if (config != null) {
            return config;
        }
        Monitor.notFoundConfigFileFromDbCounterInc(groupId);

        throw new ConfigNotFoundException();
    }

    private ChecksumData<String> findFromDb(VersionData<ConfigMeta> configId) {
        ChecksumData<String> config = configDao.loadFromCandidateSnapshot(configId);
        if (config != null) {
            saveToFile(configId, config);
        }
        return config;
    }
}
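
The lookup order (memory, then disk, then database, with a write-back to disk after a database hit) can be reproduced in a few lines. In the sketch below plain maps stand in for the Guava cache, the local files and the database. TieredLookupSketch and all of its members are hypothetical names, and the dbReads counter exists only to make the effect of the write-back visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TieredLookupSketch {

    private final Map<String, String> memory = new HashMap<>(); // stands in for the Guava cache
    private final Map<String, String> disk = new HashMap<>();   // stands in for local files
    private final Map<String, String> db;                       // stands in for the database
    private int dbReads = 0;

    public TieredLookupSketch(Map<String, String> db) {
        this.db = db;
    }

    /** Looks up memory first, then disk, then the database. */
    public Optional<String> find(String key) {
        String value = memory.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        value = disk.get(key);
        if (value == null) {
            dbReads++;
            value = db.get(key);
            if (value != null) {
                disk.put(key, value); // write back so later misses stop at the disk tier
            }
        }
        if (value == null) {
            return Optional.empty();
        }
        memory.put(key, value);
        return Optional.of(value);
    }

    /** Simulates the in-memory entries expiring. */
    public void evictMemory() {
        memory.clear();
    }

    public int getDbReads() {
        return dbReads;
    }

    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        db.put("app/db.properties@3", "pool.size=10");
        TieredLookupSketch store = new TieredLookupSketch(db);

        store.find("app/db.properties@3"); // falls through to the database
        store.evictMemory();
        store.find("app/db.properties@3"); // served from disk after the write-back
        System.out.println("database reads: " + store.getDbReads());
    }
}
```

Because a configuration at a given version never changes, each tier can cache it without any invalidation logic, which is what makes this layering safe.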

IV. Closing


I know little and have done even less. Criticism and corrections are welcome.
