2024年3月

声明

原创文章,转载请标注。
https://www.cnblogs.com/boycelee/p/18055933
《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。

配置中心系列文章

《【架构师视角系列】Apollo配置中心之架构设计(一)》
https://www.cnblogs.com/boycelee/p/17967590
《【架构师视角系列】Apollo配置中心之Client端(二)》
https://www.cnblogs.com/boycelee/p/17978027
《【架构师视角系列】Apollo配置中心之Server端(ConfigSevice)(三)》
https://www.cnblogs.com/boycelee/p/18005318
《【架构师视角系列】QConfig配置中心系列之架构设计(一)》
https://www.cnblogs.com/boycelee/p/18013653
《【架构师视角系列】QConfig配置中心系列之Client端(二)》
https://www.cnblogs.com/boycelee/p/18033286
《【架构师视角系列】QConfig配置中心系列之Server端(三)》
https://www.cnblogs.com/boycelee/p/18055933

一、通知与配置拉取

二、设计思考

1、Admin如何通知Server所有实例配置发生变更?

2、Server如何通知Client端配置发生变更?

3、Client如何拉取配置?

三、源码分析

1、Admin配置推送

1.1、主动推送

1.1.1、逻辑描述

QConfig的Server配置发现有两种方式,一种是主动推送,另一种是被动扫描。

主动发现是Admin(管理平台)通过注册中心获取到已经注册的Server实例相关IP与Port信息,然后通过遍历的方式调用Server接口通知实例此时有配置更新。

被动发现是Server实例中自主定时进行数据库扫描,当发现新版本时通知Client端有配置变更。

1.1.2、时序图

1.1.3、代码位置

1.1.3.1、NotifyServiceImpl#notifyPush

当用户在操作平台进行配置修改时,会调用该接口进行配置变更推送,由于需要通知所有已经部署的Servers有配置更新,所以需要从注册中心中获取到对应的Host信息,然后通过遍历的方式进行配置推送。

@Service
public class NotifyServiceImpl implements NotifyService, InitializingBean {

    /**
     * 管理平台操作,配置变更通知
     */
    @Override
    public void notifyPush(final ConfigMeta meta, final long version, List<PushItemWithHostName> destinations) {
        // 从注册中心(Eureka)获取Server实例的Hosts信息
        List<String> serverUrls = getServerUrls();
        if (serverUrls.isEmpty()) {
            logger.warn("notify push server, {}, version: {}, but no server, {}", meta, version, destinations);
            return;
        }

        // Server中接收变更推送的接口URL
        String uri = this.notifyPushUrl;
        logger.info("notify push server, {}, version: {}, uri: {}, servers: {}, {}", meta, version, uri, serverUrls, destinations);
        StringBuilder sb = new StringBuilder();
        for (PushItemWithHostName item : destinations) {
            sb.append(item.getHostname()).append(',')
                    .append(item.getIp()).append(',')
                    .append(item.getPort()).append(Constants.LINE);
        }
        final String destinationsStr = sb.toString();
        
        // 根据已注册Server的Host列表,配置信息、配置版本等信息,执行通知推送动作
        doNotify(serverUrls, uri, "push", new Function<String, Request>() {
            @Override
            public Request apply(String url) {
                AsyncHttpClient.BoundRequestBuilder builder = getBoundRequestBuilder(url, meta, version, destinationsStr);
                return builder.build();
            }
        });
    }

    /**
     * 获取注册中心中已注册的Server Hosts信息
     */
    private List<String> getServerUrls() {
        return serverListService.getOnlineServerHosts();
    }

    private void doNotify(List<String> serverUrls, String uri, String type, Function<String, Request> requestBuilder) {
        List<ListenableFuture<Response>> futures = Lists.newArrayListWithCapacity(serverUrls.size());
        for (String oneServer : serverUrls) {
            String url = "http://" + oneServer + "/" + uri;
            Request request = requestBuilder.apply(url);
            ListenableFuture<Response> future = HttpListenableFuture.wrap(httpClient.executeRequest(request));
            futures.add(future);
        }

        dealResult(futures, serverUrls, type);
    }

    
}
1.1.3.2、LongPollingStoreImpl#manualPush
@Service
public class LongPollingStoreImpl implements LongPollingStore {

    private static final ConcurrentMap<ConfigMeta, Cache<Listener, Listener>> listenerMappings = Maps.newConcurrentMap();

    private static final int DEFAULT_THREAD_COUNT = 4;

    private static final long DEFAULT_TIMEOUT = 60 * 1000L;

    private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(
            DEFAULT_THREAD_COUNT, new NamedThreadFactory("qconfig-config-listener-push"));

    private static ExecutorService onChangeExecutor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("config-on-change"));

    @Override
    public void manualPush(ConfigMeta meta, long version, final Set<IpAndPort> ipAndPorts) {
        logger.info("push client file: {}, version {}, {}", meta, version, ipAndPorts);
        Set<String> ips = Sets.newHashSetWithExpectedSize(ipAndPorts.size());
        for (IpAndPort ipAndPort : ipAndPorts) {
            ips.add(ipAndPort.getIp());
        }

        manualPushIps(meta, version, ips);
    }

    @Override
    public void manualPushIps(ConfigMeta meta, long version, final Set<String> ips) {
        logger.info("push client file: {}, version {}, {}", meta, version, ips);
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            doChange(meta, version, Constants.PULL, new Predicate<Listener>() {
                @Override
                public boolean apply(Listener input) {
                    return ips.contains(input.getContextHolder().getIp());
                }
            });
        } finally {
            Monitor.filePushOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void onChange(final ConfigMeta meta, final long version) {
        logger.info("file change: {}, version {}", meta, version);
        onChangeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                Stopwatch stopwatch = Stopwatch.createStarted();
                try {
                    doChange(meta, version, Constants.UPDATE, Predicates.<Listener>alwaysTrue());
                } finally {
                    Monitor.fileOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
                }
            }
        });
    }

    private void doChange(ConfigMeta meta, long newVersion, String type, Predicate<Listener> needChange) {
        List<Listener> listeners = getListeners(meta, needChange);
        if (listeners.isEmpty()) {
            return;
        }

        Changed change = new Changed(meta, newVersion);
        // 如果没超过直接推送数量,则直接推送
        if (listeners.size() <= pushConfig.getDirectPushLimit()) {
            directDoChange(listeners, change, type);
        } else {
            // 如果超过一定数量,则scheduled定时,通过一定节奏来推送,避免惊群
            PushItem pushItem = new PushItem(listeners, type, change);
            scheduledExecutor.execute(new PushRunnable(pushItem));
        }
    }

    private void directDoChange(List<Listener> listeners, Changed change, String type) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            for (Listener listener : listeners) {
                logger.debug("return {}, {}", listener, change);
                returnChange(change, listener, type);
            }
        } catch (Exception e) {
            Monitor.batchReturnChangeFailCounter.inc();
            logger.error("batch direct return changes error, type {}, change {}", type, change, e);
        } finally {
            Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private static class PushRunnable implements Runnable {

        private final PushItem pushItem;

        private PushRunnable(PushItem pushItem) {
            this.pushItem = pushItem;
        }

        @Override
        public void run() {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                long start = System.currentTimeMillis();
                PushConfig config = pushConfig;
                int num = Math.min(pushItem.getListeners().size(), config.getPushMax());
                for (int i = 0; i < num; ++i) {
                    Listener listener = pushItem.getListeners().poll();
                    returnChange(pushItem.getChange(), listener, pushItem.getType());
                }

                if (!pushItem.getListeners().isEmpty()) {
                    long elapsed = System.currentTimeMillis() - start;
                    long delay;
                    if (elapsed >= config.getPushInterval()) {
                        delay = 0;
                    } else {
                        delay = config.getPushInterval() - elapsed;
                    }
                    //一次推送后,以这次推送时间为起始时间,延迟一定时间后再次推送。这里的PushRunnable递归执行
                    scheduledExecutor.schedule(new PushRunnable(pushItem), delay, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                Monitor.batchReturnChangeFailCounter.inc();
                logger.error("batch return changes error, {}", pushItem, e);
            } finally {
                Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }

    private static void returnChange(Changed change, Listener listener, String type) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            // 通知注册的监听器,响应client,返回版本信息
            listener.onChange(change, type);
        } finally {
            Monitor.returnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

}

1.2、被动推送

1.2.1、逻辑描述

首次启动或启动后每3分钟,刷新一次配置的最新版本,如果出现最新版本,则触发推送逻辑,将配置最新的版本推送至Client端中。

1.2.2、代码位置

1.2.2.1、CacheConfigVersionServiceImpl#freshConfigVersionCache
@Service
public class CacheConfigVersionServiceImpl implements CacheConfigVersionService {

    private volatile ConcurrentMap<ConfigMeta, Long> cache = Maps.newConcurrentMap();

    /**
     * 首次启动或启动后每3分钟,刷新一次配置的最新版本
     */
    @PostConstruct
    public void init() {
        freshConfigVersionCache();

        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        // 每3分钟执行一次缓存刷新,判断配置是否有最新版本
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName("fresh-config-version-thread");
                try {
                    freshConfigVersionCache();
                } catch (Throwable e) {
                    logger.error("fresh config version error", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);
    }

    @Override
    public Optional<Long> getVersion(ConfigMeta meta) {
        return Optional.fromNullable(cache.get(meta));
    }

    /**
     * 定时刷新配置最新版本,如果出现最新版本,则触发推送逻辑
     */
    private void freshConfigVersionCache() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            logger.info("fresh config version cache");
            List<VersionData<ConfigMeta>> configIds = configDao.loadAll();

            ConcurrentMap<ConfigMeta, Long> newCache = new ConcurrentHashMap<ConfigMeta, Long>(configIds.size());
            ConcurrentMap<ConfigMeta, Long> oldCache = this.cache;

            // 判断是否有最新版本
            synchronized (this) {
                for (VersionData<ConfigMeta> configId : configIds) {
                    long newVersion = configId.getVersion();
                    Long oldVersion = cache.get(configId.getData());
                    // 暂时不考虑delete的情况
                    // 从数据库load数据先于配置更新
                    if (oldVersion != null && oldVersion > newVersion) {
                        newVersion = oldVersion;
                    }
                    // 如果有最新版本则刷新缓存
                    newCache.put(configId.getData(), newVersion);
                }

                this.cache = newCache;
            }

            logger.info("fresh config version cache successOf, count [{}]", configIds.size());
            int updates = 0;
            for (Map.Entry<ConfigMeta, Long> oldEntry : oldCache.entrySet()) {
                ConfigMeta meta = oldEntry.getKey();
                Long oldVersion = oldEntry.getValue();
                Long newVersion = newCache.get(meta);
                if (newVersion != null && newVersion > oldVersion) {
                    updates += 1;
                    // 配置变更,通知Client端
                    longPollingStore.onChange(meta, newVersion);
                }
            }
            logger.info("fresh size={} config version cache from db", updates);
        } finally {
            Monitor.freshConfigVersionCacheTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
        }
    }
}

2、变更监听

2.1.1、逻辑描述

Client端与Server端建立长轮询,长轮询建立完成之后会为当前请求建立一个监听器,当配置发生变变更时就会触发监听器,然后通过监听机制结束长轮询并返回最新的配置版本。如果没有版本变更,长轮询会每分钟断开重新建立一次。

2.1.2、时序图

2.1.3、代码位置

2.1.3.1、AbstractCheckVersionServlet#doPost
public abstract class AbstractCheckVersionServlet extends AbstractServlet {

    private static final long serialVersionUID = -8278568383506314625L;

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        ...
        
        checkVersion(requests, req, resp);
    }
}
2.1.3.2、LongPollingCheckServlet#checkVersion
public class LongPollingCheckServlet extends AbstractCheckVersionServlet {

    @Override
    protected void checkVersion(List<CheckRequest> checkRequests, HttpServletRequest req, HttpServletResponse resp)
    throws ServletException, IOException {
        ...
        try {
            // 异步
            AsyncContext context = req.startAsync();
            // (核心流程,重点关注),执行版本检查(长轮询)
            getLongPollingProcessService().process(context, checkRequests);
        } catch (Throwable e) {
            // never come here !!!
            logger.error("服务异常", e);
        }
    }
}
2.1.3.3、LongPollingProcessServiceImpl#process
@Service
public class LongPollingProcessServiceImpl implements LongPollingProcessService {

    @PostConstruct
    public void init() {
        MapConfig config = MapConfig.get("config.properties");
        config.asMap();
        // 向config中添加监听器
        config.addListener(new Configuration.ConfigListener<Map<String, String>>() {
            @Override
            public void onLoad(Map<String, String> conf) {
                String newTimeout = conf.get("longPolling.server.timeout");
                if (!Strings.isNullOrEmpty(newTimeout)) {
                    timeout = Numbers.toLong(newTimeout, DEFAULT_TIMEOUT);
                }
            }
        });
    }

    // 核心逻辑,重点关注
    @Override
    public void process(AsyncContext context, List<CheckRequest> requests) {
        IpAndPort address = new IpAndPort(clientInfoService.getIp(), clientInfoService.getPort());
        AsyncContextHolder contextHolder = new AsyncContextHolder(context, address);
        // 设置超时
        context.setTimeout(timeout);
        // 设置监听器
        context.addListener(new TimeoutServletListener(contextHolder));
        processCheckRequests(requests, clientInfoService.getIp(), contextHolder);
    }

    private void processCheckRequests(List<CheckRequest> requests, String ip, AsyncContextHolder contextHolder) {
        CheckResult result = checkService.check(requests, ip, qFileFactory);
        logger.info("profile:{}, result change list {} for check request {}", clientInfoService.getProfile(), result.getChanges(), requests);

        if (!result.getChanges().isEmpty()) {
            returnChanges(AbstractCheckConfigServlet.formatOutput(CheckUtil.processStringCase(result.getChanges())), contextHolder, Constants.UPDATE);
            return;
        }
        // 为该请求注册监听器,并存放至longPollingStore中
        addListener(result.getRequestsNoChange(), contextHolder);
        // 注册client
        registerOnlineClients(result, contextHolder);
    }

    private void addListener(Map<CheckRequest, QFile> requests, AsyncContextHolder contextHolder) {
        for (Map.Entry<CheckRequest, QFile> noChangeEntry : requests.entrySet()) {
            CheckRequest request = noChangeEntry.getKey();
            QFile qFile = noChangeEntry.getValue();
            if (!contextHolder.isComplete()) {
                // 根据请求创建监听器
                Listener listener = qFile.createListener(request, contextHolder);
                // 将监听器存储至longPollingStore
                longPollingStore.addListener(listener);
            }
        }
    }

    private void registerOnlineClients(CheckResult result, AsyncContextHolder contextHolder) {
        Map<CheckRequest, QFile> noChanges = Maps.newHashMapWithExpectedSize(
                result.getRequestsNoChange().size() + result.getRequestsLockByFixVersion().size());
        noChanges.putAll(result.getRequestsNoChange());
        noChanges.putAll(result.getRequestsLockByFixVersion());

        for (Map.Entry<CheckRequest, QFile> noChangeEntry : noChanges.entrySet()) {
            CheckRequest request = noChangeEntry.getKey();
            QFile qFile = noChangeEntry.getValue();
            if (!contextHolder.isComplete()) {
                long version = request.getVersion();
                ConfigMeta meta = qFile.getRealMeta();
                String ip = contextHolder.getIp();
                if (qFile instanceof InheritQFileV2) {
                    InheritQFileV2 inheritQFile = (InheritQFileV2) qFile;
                    Optional<Long> optional = inheritQFile.getCacheConfigInfoService().getVersion(inheritQFile.getRealMeta());
                    version = optional.isPresent() ? optional.get() : version;
                    onlineClientListService.register(inheritQFile.getRealMeta(), ip, version);
                } else {
                    // 注册client,admin(管理平台)获取已经连接的client信息,其中包括ip、配置版本
                    onlineClientListService.register(meta, ip, version);
                }
            }
        }
    }

    /**
     * 配置变化,执行返回
     */
    private void returnChanges(String change, AsyncContextHolder contextHolder, String type) {
        contextHolder.completeRequest(new ChangeReturnAction(change, type));
    }
}
2.1.3.4、CheckService#check
@Service
public class CheckServiceImpl implements CheckService {
    ...

    @Override
    public CheckResult check(List<CheckRequest> requests, String ip, QFileFactory qFileFactory) {
        List<CheckRequest> requestsNoFile = Lists.newArrayList();
        Map<CheckRequest, Changed> changes = Maps.newHashMap();
        Map<CheckRequest, QFile> requestNoChange = Maps.newHashMap();
        Map<CheckRequest, QFile> requestsLockByFixVersion = Maps.newHashMap();
        for (CheckRequest request : requests) {
            ConfigMeta meta = new ConfigMeta(request.getGroup(), request.getDataId(), request.getProfile());
            Optional<QFile> qFileOptional = qFileFactory.create(meta, cacheConfigInfoService);
            if (!qFileOptional.isPresent()) {
                requestsNoFile.add(request);
                continue;
            }

            QFile qFile = qFileOptional.get();
            // 核心逻辑,检测版本
            Optional<Changed> changedOptional = qFile.checkChange(request, ip);
            if (changedOptional.isPresent()) {
                Optional<Changed> resultChange = repairChangeWithFixVersion(qFile, request, ip, changedOptional.get());
                if (resultChange.isPresent()) {
                    changes.put(request, resultChange.get());
                } else {
                    requestsLockByFixVersion.put(request, qFile);
                }
            } else {
                requestNoChange.put(request, qFile);
            }
        }
        return new CheckResult(requestsNoFile, changes, requestNoChange, requestsLockByFixVersion);
    }
}
2.1.3.5、QFileEntityV1#checkChange
public class QFileEntityV1 extends AbstractQFileEntity implements QFile {

    public QFileEntityV1(ConfigMeta meta,
                         CacheConfigInfoService cacheConfigInfoService,
                         ConfigStore configStore,
                         LogService logService,
                         ClientInfoService clientInfoService) {
        super(meta, cacheConfigInfoService, configStore, logService, clientInfoService);
    }

    @Override
    public Optional<Changed> checkChange(CheckRequest request, String ip) {
        ConfigMeta meta = getSourceMeta();
        // 从缓存中获取配置文件的最新版本
        Optional<Long> version = getCacheConfigInfoService().getVersion(meta, ip);
        if (!version.isPresent()) {
            return Optional.absent();
        }

        if (version.get() <= request.getVersion()) {
            return Optional.absent();
        }

        return Optional.of(new Changed(meta.getGroup(), meta.getDataId(), meta.getProfile(), version.get()));
    }
}
2.1.3.6、CacheConfigInfoService#getVersion
@Service("cacheConfigInfoService")
public class CacheConfigInfoService implements ConfigInfoService {
    ... 
    @Override
    public Optional<Long> getVersion(ConfigMeta meta, String ip) {
        // 获取配置已发布的最新版本
        Optional<Long> publishVersion = getVersion(meta);
        // 获取推送给该IP的配置的最新灰度版本
        Optional<Long> pushVersion = getPushVersion(meta, ip);
        return VersionUtil.getLoadVersion(publishVersion, pushVersion);
    }
}

3、Client配置拉取

3.1.1、逻辑描述

根据长轮询后Client端获取到的配置文件对应的最新版本信息,查询最新的配置数据。查询顺序是先查询缓存,如果查找不到则通过本地文件查找,如果再查不到则查询数据库。这样可以有效缓解数据库压力。

3.1.2、代码位置

3.1.2.1、ConfigStoreImpl#findConfig
@Service
public class ConfigStoreImpl implements ConfigStore {

    private LoadingCache<VersionData<ConfigMeta>, ChecksumData<String>> configCache;

    @PostConstruct
    private void init() {
        configCache = CacheBuilder.newBuilder()
                .maximumSize(5000) // 最大数量
                .expireAfterAccess(10, TimeUnit.SECONDS) // 访问失效时间
                .recordStats()
                .build(new CacheLoader<VersionData<ConfigMeta>, ChecksumData<String>>() {
                    @Override
                    public ChecksumData<String> load(VersionData<ConfigMeta> configId) throws ConfigNotFoundException {
                        
                        return loadConfig(configId);
                    }
                });

        Metrics.gauge("configFile_notFound_cache_hitRate", new Supplier<Double>() {
            @Override
            public Double get() {
                return configCache.stats().hitRate();
            }
        });
    }

    /**
     * 查本地guava cache
     */
    @Override
    public ChecksumData<String> findConfig(VersionData<ConfigMeta> configId) throws ConfigNotFoundException {
        try {
            return configCache.get(configId);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ConfigNotFoundException) {
                throw (ConfigNotFoundException) e.getCause();
            } else {
                log.error("find config error, configId:{}", configId, e);
                throw new RuntimeException(e.getCause());
            }
        }
    }

    /**
     * 从本地文件或数据库中获取配置信息
     */
    private ChecksumData<String> loadConfig(VersionData<ConfigMeta> configId) throws ConfigNotFoundException {
        // 从本地配置文件中查询配置信息
        ChecksumData<String> config = findFromDisk(configId);
        if (config != null) {
            return config;
        }

        String groupId = configId.getData().getGroup();
        Monitor.notFoundConfigFileFromDiskCounterInc(groupId);
        log.warn("config not found from disk: {}", configId);
        // 从数据库中加载配置数据
        config = findFromDb(configId);
        if (config != null) {
            return config;
        }
        Monitor.notFoundConfigFileFromDbCounterInc(groupId);

        throw new ConfigNotFoundException();
    }

    private ChecksumData<String> findFromDb(VersionData<ConfigMeta> configId) {
        ChecksumData<String> config = configDao.loadFromCandidateSnapshot(configId);
        if (config != null) {
            saveToFile(configId, config);
        }
        return config;
    }
}

三、最后

《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。

懂得不多,做得太少。欢迎批评、指正。

简单概括

你应该使用自己的模型,还是使用 LLM API?创建你自己的模型可以让你完全控制,但需要数据收集、训练和部署方面的专业知识。LLM API 使用起来更简单,但会将数据发送给第三方,并对提供商有强烈依赖。这篇博客让你可以将 LLM 的便利性与定制模型的控制性和效率相结合。

在一个关于识别新闻报道中投资者情绪的案例研究中,我们展示了如何使用开源 LLM 创建合成数据,并在几个步骤中训练你的定制模型。我们定制的 RoBERTa 模型可以分析大型新闻数据集,与 GPT4 相比性能一致都是 (94% acc 和 0.94 的 F1 macro),我们只需 2.7 美元,排碳 0.12kg,延迟 0.13s ; 而 GPT4 要费 3061 美元,排碳约 735 到 1100 kg ,延迟多秒。这里提供了
notebooks
方便你用于自己的研究。

1. 问题: 你的使用案例没有数据

想象一下你的老板让你去建一个你公司的情感分析系统。你可以在 Hugging Face Hub 上找到 100,000+ 个数据集,这其中包含标题含有 “sentiment” 的字段的数据集, Twitter 上的情感数据集、诗歌中的情感或以希伯来语的情感数据集。这很好,但比如你在一家金融机构工作并且你追踪你投资组合中特定品牌的市场情绪,那么你可能会发现上面这些数据集没有一个有用的。虽然机器学习需要处理数百万的任务公司,但正巧别人已经收集并发布你公司的这个案例的数据集的可能性机会几乎为零。

由于对特定领域的数据集和模型的缺失,许多人尝试用更加通用的 LLM。这些模型都非常大和通用,以至于它们可以开箱即用,并实现令人印象深刻的准确度。它们的易于使用的 API 消除了对微调和对部署的专业知识的需求。但他们最主要的缺点是大小和可控性: 大小超过十亿到万亿的参数运行在计算集群中,控制权只在少数的几家公司手中。

2. 解决方案: 合成数据来高效蒸馏学生模型

在 2023 年,一个东西根本的改变了机器学习的蓝图,LLM 开始达到和人类数据标注者相同的水平。现在有大量的证据表明,最好的 LLM 比众包工人更胜一筹,并且在创建高质量 (合成的) 数据中部分达到了专家水平 (例如
Zheng et al. 2023
,
Gilardi et al. 2023
,
He et al. 2023
)。这一进展的重要性怎么强调都不为过。创建定制模型的关键瓶颈在于招募和协调人工工作者以创造定制训练数据的资金、时间和专业知识需求。随着大型语言模型 (LLMs) 开始达到人类水平,高质量的数据标注现在可以通过 API 获得; 可复制的注释指令可以作为提示 prompt 发送; 合成数据几乎可以立即返回,唯一的瓶颈就剩计算能力了。

在 2024 年,这种方法将变得具有商业可行性,并提升开源对大中小企业的重要性。在 2023 年的大部分时间里,由于 LLM API 提供商的限制性商业条款,LLMs 的商业用途在标注数据方面被阻止。随着像
Mistral

Mixtral-8x7B-Instruct-v0.1
这样的模型的推出,LLM 数据标注和合成数据现在对商业用途开放。
Mixtral 的表现与 GPT3.5 相当
,并且由于它的 Apache 2.0 许可证,其合成数据输出可以作为商业用例中较小、专业化的模型 (“学生”) 的训练数据。这篇博客提供了一个示例,这将显著加快你自己的定制模型的创建速度,同时大幅降低长期推理成本。

3. 案例分析: 监控金融情绪

想象你是一个数据科学家,正在为一家大型投资公司工作。你的任务是监控经济新闻情绪,以帮助公司做出投资决策。最近,你有两个主要选择:

  1. 你可以微调你自己的模型。这需要编写标注指令,创建标注接口,招人,引入质量保证措施以处理低质量数据,在这个数据上微调模型,并部署。
  2. 或者,你可以按照指令将数据发送到 LLM API。你完全跳过微调和部署步骤,将数据分析过程简化为编写指令 (提示),然后发送给 API 背后的“LLM 标注器”。在这种情况下,LLM API 就是你的最终推理解决方案,你直接使用 LLM 的输出进行分析。

尽管选项 2 在推理时间上更贵,并且需要你发送敏感数据到第三方,但选项 2 比选项 1 更容易设置,因此被许多开发人员使用。

在 2024 年,合成数据将提供第三个选项: 结合选项 1 的成本效益与选项 2 的简易性。你可以使用一个 LLM (老师模型) 去标注一个你的小数据样本,并在这个数据集上微调一个小的,高效的语言模型 (学生模型)。这种方法可以在几步内执行完成。

3.1 给 LLM 提示来标注你的数据

我们使用
financial_phrasebank
情感数据集作为示例,但你可以将代码适配到任何其他用例。financial_phrasebank 任务是一个 3 类分类任务,其中 16 位专家从投资者视角对芬兰公司金融新闻中的句子进行“积极”/“消极”/“中性”标注 (
Malo et al. 2013
)。例如,数据集中包含这样一句话: “对于 2010 年最后一个季度,Componenta 的净销售额翻倍,达到 1.31 亿欧元,而去年同期为 7600 万欧元”,标注者从投资者视角将其归类为“积极”。

我们首先安装一些必需的库。

!pip install datasets # for loading the example dataset
!pip install huggingface_hub # for secure token handling
!pip install requests # for making API requests
!pip install scikit-learn # for evaluation metrics
!pip install pandas # for post-processing some data
!pip install tqdm # for progress bars

然后,我们可以下载带有专家标注的示例数据集。

from datasets import load_dataset

dataset = load_dataset("financial_phrasebank", "sentences_allagree", split='train')

# create a new column with the numeric label verbalised as label_text (e.g. "positive" instead of "0")
label_map = {
    i: label_text
    for i, label_text in enumerate(dataset.features["label"].names)
}

def add_label_text(example):
    example["label_text"] = label_map[example["label"]]
    return example

dataset = dataset.map(add_label_text)

print(dataset)
# Dataset({
# features: ['sentence', 'label', 'label_text'],
# num_rows: 2264
#})

现在我们写一个短的标注指令,针对
financial_phrasebank
任务,并将其格式化为一个 LLM 提示。这个提示类似于你通常提供给众包工人的指令。

prompt_financial_sentiment = """\
You are a highly qualified expert trained to annotate machine learning training data.

Your task is to analyze the sentiment in the TEXT below from an investor perspective and label it with only one the three labels:
positive, negative, or neutral.

Base your label decision only on the TEXT and do not speculate e.g. based on prior knowledge about a company.

Do not provide any explanations and only respond with one of the labels as one word: negative, positive, or neutral

Examples:
Text: Operating profit increased, from EUR 7m to 9m compared to the previous reporting period.
Label: positive
Text: The company generated net sales of 11.3 million euro this year.
Label: neutral
Text: Profit before taxes decreased to EUR 14m, compared to EUR 19m in the previous period.	
Label: negative

Your TEXT to analyse:
TEXT: {text}
Label: """

这个标注指令现在可以被传递给 LLM API。对于这个例子,我们使用免费 Hugging Face
无服务的推理 API
。这个 API 是测试流行模型的理想选择。请注意,如果你发送次数过多,尤其是分享给过多用户,你可能会遇到速率限制。对于更大的工作流,我们推荐创建一个
专用推理端点
。专用推理端点对于你自己的付费 API 尤为重要,特别是你可以灵活的控制开和关。

我们登录
huggingface_hub
库,简单安全的填入我们的 API token。或者,你也可以定义你自己的 token 作为环境变量。(详情可以参考
文档
)。

# you need a huggingface account and create a token here: https://huggingface.co/settings/tokens
# we can then safely call on the token with huggingface_hub.get_token()
import huggingface_hub
huggingface_hub.login()

我么定义一个简单的
generate_text
函数,用于发送我们的提示 prompt 和数据到 API。

import os
import requests

# Choose your LLM annotator
# to find available LLMs see: https://huggingface.co/docs/huggingface_hub/main/en/package_reference/inference_client#huggingface_hub.InferenceClient.list_deployed_models
API_URL = "https://api-inference.huggingface.co/models/mistralai/Mixtral-8x7B-Instruct-v0.1"

# docs on different parameters: https://huggingface.co/docs/api-inference/detailed_parameters#text-generation-task
generation_params = dict(
    top_p=0.90,
    temperature=0.8,
    max_new_tokens=128,
    return_full_text=False,
    use_cache=False
)

def generate_text(prompt=None, generation_params=None):
    payload = {
        "inputs": prompt,
        "parameters": {**generation_params}
    }
    response = requests.post(
        API_URL,
        headers={"Authorization": f"Bearer {huggingface_hub.get_token()}"},
        json=payload
    )
    return response.json()[0]["generated_text"]

作为 LLM 可能不会总是返回标签相同的标准化格式,我们还可以定义一个短
clean_output
函数,将 LLM 从字符串输出映射到我们的三个可能标签。

labels = ["positive", "negative", "neutral"]

def clean_output(string, random_choice=True):
    for category in labels:
        if category.lower() in string.lower():
            return category
    # if the output string cannot be mapped to one of the categories, we either return "FAIL" or choose a random label
    if random_choice:
        return random.choice(labels)
    else:
        return "FAIL"

我们现在可以将我们的文本发送给 LLM 进行标注。下面的代码将每一段文本发送到 LLM API,并将文本输出映射到我们的三个清晰类别。注意: 在实际操作中,逐个文本迭代并将它们分别发送到 API 是非常低效的。API 可以同时处理多个文本,你可以异步地批量向 API 发送文本来显著加快 API 调用速度。你可以在本博客的
复现仓库
中找到优化后的代码。

output_simple = []
for text in dataset["sentence"]:
    # add text into the prompt template
    prompt_formatted = prompt_financial_sentiment.format(text=text)
    # send text to API
    output = generate_text(
        prompt=prompt_formatted, generation_params=generation_params
    )
    # clean output
    output_cl = clean_output(output, random_choice=True)
    output_simple.append(output_cl)

基于这个输出,我么可以计算指标来查看模型在不对其进行训练的情况下是否准确地完成了任务。

from sklearn.metrics import classification_report

def compute_metrics(label_experts, label_pred):
    # classification report gives us both aggregate and per-class metrics
    metrics_report = classification_report(
        label_experts, label_pred, digits=2, output_dict=True, zero_division='warn'
    )
    return metrics_report

label_experts = dataset["label_text"]
label_pred = output_simple

metrics = compute_metrics(label_experts, label_pred)

基于简单的提示 prompt,LLM 正确分类了 91.6% 的文本 (0.916 准确率和 0.916 F1 macro)。考虑到它没有训练来完成这个具体任务,这相当不错。

我们通过使用两个简单的提示 Prompt 技巧来进一步提升精度: 思维链 COT 和 自我一致 SC。CoT 要求模型首先对正确的标签进行推理,然后再做出标注决策,而不是立即决定正确的标签。SC 意味着多次向同一个 LLM 发送相同文本的相同提示。SC 有效地为 LLM 提供了针对每段文本的多条不同的推理路径,如果 LLM 回应“积极”两次和“中性”一次,我们选择多数 (“积极”) 作为正确的标签。这是我们为 CoT 和 SC 更新的提示:

prompt_financial_sentiment_cot = """\
You are a highly qualified expert trained to annotate machine learning training data.

Your task is to briefly analyze the sentiment in the TEXT below from an investor perspective and then label it with only one the three labels:
positive, negative, neutral.

Base your label decision only on the TEXT and do not speculate e.g. based on prior knowledge about a company.

You first reason step by step about the correct label and then return your label.

You ALWAYS respond only in the following JSON format: {{"reason": "...", "label": "..."}}
You only respond with one single JSON response.

Examples:
Text: Operating profit increased, from EUR 7m to 9m compared to the previous reporting period.
JSON response: {{"reason": "An increase in operating profit is positive for investors", "label": "positive"}}
Text: The company generated net sales of 11.3 million euro this year.
JSON response: {{"reason": "The text only mentions financials without indication if they are better or worse than before", "label": "neutral"}}
Text: Profit before taxes decreased to EUR 14m, compared to EUR 19m in the previous period.	
JSON response: {{"reason": "A decrease in profit is negative for investors", "label": "negative"}}

Your TEXT to analyse:
TEXT: {text}
JSON response: """

这是一个 JSON 提示,我们要求 LLM 返回一个结构化的 JSON 字符串,其中 “reason” 作为一个键,“label” 作为另一个键。JSON 的主要优点是我们可以将其解析为 Python 字典,然后提取 “label” 。如果我们想了解 LLM 选择这个标签的原因,我们也可以提取 “reason”。

process_output_cot
函数解析 LLM 返回的 JSON 字符串,并且如果 LLM 没有返回有效的 JSON,它会尝试使用上面定义的
clean_output
函数通过简单的字符串匹配来识别标签。

import ast

def process_output_cot(output):
    try:
        output_dic = ast.literal_eval(output)
        return output_dic
    except Exception as e:
        # if json/dict parse fails, do simple search for occurance of first label term
        print(f"Parsing failed for output: {output}, Error: {e}")
        output_cl = clean_output(output, random_choice=False)
        output_dic = {"reason": "FAIL", "label": output_cl}
        return output_dic

现在,我们可以使用上面新的提示重复使用我们的
generate_text
函数,用
process_output_cot
处理 JSON 的 COT 输出,并且为了 SC 多次发送每个提示。

self_consistency_iterations = 3

output_cot_multiple = []
for _ in range(self_consistency_iterations):
    output_lst_step = []
    for text in tqdm(dataset["sentence"]):
        prompt_formatted = prompt_financial_sentiment_cot.format(text=text)
        output = generate_text(
            prompt=prompt_formatted, generation_params=generation_params
        )
        output_dic = process_output_cot(output)
        output_lst_step.append(output_dic["label"])

    output_cot_multiple.append(output_lst_step)

对于每段文本,我们现在的 LLM 标注器有了三次尝试来识别正确标签,并采用了三种不同的推理路径。下面的代码从这三条路径中选择了多数标签。

import pandas as pd
from collections import Counter

def find_majority(row):
    # Count occurrences
    count = Counter(row)
    # Find majority
    majority = count.most_common(1)[0]
    # Check if it's a real majority or if all labels are equally frequent
    if majority[1] > 1:
        return majority[0]
    else: # in case all labels appear with equal frequency
        return random.choice(labels)

df_output = pd.DataFrame(data=output_cot_multiple).T

df_output['label_pred_cot_multiple'] = df_output.apply(find_majority, axis=1)

现在,我们可以比较我们的改进的 LLM 标签与专家标签,并计算指标。

label_experts = dataset["label_text"]
label_pred_cot_multiple = df_output['label_pred_cot_multiple']

metrics_cot_multiple = compute_metrics(label_experts, label_pred_cot_multiple)

CoT 和 SC 将性能提升到了 94.0% 的准确率和 0.94 的 F1 macro。通过给模型时间来考虑其标签决策,并给予它多次尝试,我们提升了性能。请注意,CoT 和 SC 需要额外的计算资源。我们本质上是在用计算资源购买标注的准确性。

现在,我们通过这些简单的 LLM API 调用创建了一个合成训练数据集。我们在做出标签决策之前,让 LLM 尝试了三种不同的推理路径来标注每段文本。结果是,这些标签与人类专家的高度一致,并且我们得到了一个高质量的数据集,可以用来训练更高效、更专业的模型。

df_train = pd.DataFrame({
    "text": dataset["sentence"],
    "labels": df_output['label_pred_cot_multiple']
})

df_train.to_csv("df_train.csv")

请注意,在这篇博客文章的
完整复现脚本
中,我们还将仅基于专家标注创建一个测试集,以评估所有模型的质量。所有指标始终基于这个人类专家测试集。

3.2 将开源模型与专有模型进行比较

使用开源的 Mixtral 模型创建的这种数据的主要优势在于,这些数据在商业上完全可用,且没有法律上的不确定性。例如,使用 OpenAI API 创建的数据受
OpenAI 商业条款
的约束,这些条款明确禁止将模型输出用于训练与他们的产品和服务竞争的模型。这些条款的法律价值和意义尚不明确,但它们为使用 OpenAI 模型合成的数据训练模型的商业使用引入了法律上的不确定性。任何使用合成数据训练的更小、更高效的模型都可能被视为竞争者,因为它减少了对 API 服务的依赖。

开源的 Mistral 的
Mixtral-8x7B-Instruct-v0.1
与 OpenAI 的 GPT3.5 和 GPT4 之间合成的数据质量如何比较呢?我们使用
gpt-3.5-turbo-0613

gpt-4-0125-preview
运行了上述相同的流程和提示,并在下表中报告了结果。我们看到,Mixtral 在这个任务上的表现优于 GPT3.5,并且与 GPT4 相当,这取决于提示类型。(我们没有显示新版本的 gpt-3.5-turbo-0125 的结果,因为不知何故,这个模型的表现比旧版本的默认 gpt-3.5-turbo-0613 要差)。

请注意,这并不意味着 Mixtral 总是比 GPT3.5 更好,与 GPT4 相当。GPT4 在多个基准测试上的表现更好。主要想表达的是,开源模型现在可以创建高质量的合成数据。

3.3 理解并验证你合成的数据

所有这些在实践中意味着什么呢?到目前为止,结果只是由一些黑盒 LLM 标注的数据。我们只能计算指标,因为我们有来自示例数据集的专家标注的参考数据。如果在真实世界的场景中没有专家标注,我们如何信任 LLM 的标注呢?

在实践中,无论你使用哪种标注器 (人类标注或 LLM ),你只能信任你自己验证过的数据。指令/提示总是包含一定程度的模糊性。即使是一个完美智能的标注也可能犯错误,并且在面对通常模糊的现实世界数据时,必须做出不明确的决定。

幸运的是,随着近年来开源工具的出现,数据验证变得更加简单:
Argilla
提供了一个免费的界面,用于验证和清理非结构化的 LLM 输出;
LabelStudio
使你能够以多种方式标注数据;
CleanLab
提供了一个用于标注和自动清理结构化数据的界面; 对于快速和简单的验证,仅在简单的 Excel 文件中标注也可能是可以的。

花些时间标注文本,以了解数据和其模糊性,这是非常重要的。你会很快发现模型犯了一些错误,但也会有几个例子,正确的标签是不明确的,有些文本你更同意 LLM 的决定,而不是创建数据集的专家。这些错误和模糊性是数据集创建的正常部分。实际上,只有极少数现实世界的任务中,人类专家的基线是完全一致的。这是一个古老的见解,最近被机器学习文献“重新发现”,即人类数据是一个有缺陷的金标准 (
Krippendorf 2004
,
Hosking et al. 2024
)。

在标注界面不到一个小时的时间里,我们更好地了解了我们的数据并纠正了一些错误。然而,为了可复现性,以及展示纯粹合成数据的质量,我们在下一步继续使用未清理的 LLM 标注。

3.4 使用 AutoTrain 调整你高效、专业的模型

到目前为止,我们已经经历了一个标准的流程,即通过 API 提示 LLM 并验证输出。现在,进入一个额外的步骤,以实现显著的资源节约: 我们将在 LLM 的合成数据上微调一个更小、更高效和专业化的 LM。这个过程也被称为“蒸馏”,其中较大模型的输出 (“教师”) 用于训练一个较小的模型 (“学生”)。虽然这听起来很复杂,但它本质上只意味着我们使用数据集中的原始
text
,并将 LLM 的预测作为我们微调的
labels
。如果你以前训练过分类器,你知道,使用
transformers

sklearn
或其他库,你只需要这两个列来训练一个分类器。

我们使用 Hugging Face 的
AutoTrain
解决方案使这个过程更加简单。AutoTrain 是一个无代码界面,它使你能够上传一个带有标记数据的
.csv
文件,该服务然后使用它为你自动微调模型。这消除了为训练你自己的模型编写代码或深入微调专业知识的需求。

在 Hugging Face 网站上,我们首先在顶部点击 “Spaces”,然后点击 “Create new Space”。然后选择 “Docker”>“AutoTrain” 并选择一个小型 A10G GPU,每小时成本为 $1.05。AutoTrain 的空间将然后初始化。然后,我们可以通过界面上传我们的合成训练数据和专家测试数据,并调整不同的字段,如下面的截图所示。填写所有内容后,我们可以点击 “Start Training”,并在 Space 的日志中跟踪训练过程。仅在 1811 个数据点上训练一个小型的 RoBERTa-base 模型 (~0.13 B 参数) 非常快,可能不需要超过几分钟。一旦训练完成,模型将自动上传到你的 HF 个人资料。一旦训练完成,space 就会停止,整个过程最多应该需要 15 分钟,成本不到 $1。

如果你愿意,你也可以完全在你自己的硬件上本地使用 AutoTrain,请参阅我们的
文档
。高级用户当然总是可以编写自己的训练脚本,但对于这些默认的超参数,AutoTrain 的结果对于许多分类任务来说应该足够了。

我们最终微调的约 0.13B 参数的 RoBERTa-base 模型与更大的 LLM 相比表现如何?下图显示,在 1811 个文本上微调的自定义模型达到了 94% 的准确率,与它的老师 Mixtral 和 GPT4 一样!一个小型模型当然无法与一个更大型的 LLM 出厂即战,但通过在一些高质量数据上进行微调,它可以达到在它所专长的任务上与大型 LLM 相同的性能水平。

3.5 不同方法的利弊

我们在开始时讨论的三种方法的总体优缺点是什么:(1) 手动创建你自己的数据和模型,(2) 仅使用 LLM API,或者 (3) 使用 LLM API 创建用于专业模型的合成数据?下面的表格显示了不同因素之间的权衡,我们将在下面根据我们的示例数据集讨论不同的指标。

让我们从任务性能开始。如上所示,专业模型与更大型的 LLM 表现相当。微调后的模型只能执行我们训练它执行的特定任务,但它在执行这个特定任务方面表现非常好。要创建更多训练数据来将模型适应到新的领域或更复杂的任务是轻而易举的。多亏了 LLM 的合成数据,由于缺乏专业数据而导致的低性能不再是问题。

其次,计算成本和推理速度。实际中的主要计算成本将是推理,即在训练后运行模型。假设在你的生产用例中,你需要在给定时间段内处理 100 万句话。我们的微调 RoBERTa-base 模型在一个带有 16GB RAM 的小型 T4 GPU 上运行效率很高,在
推理端点
上的成本为每小时 $0.6。它具有 0.13s 的延迟和每秒 61 句话的吞吐量 (
batch_size=8
)。这使得处理 100 万句话的总成本为 $2.7。

使用 GPT 模型,我们可以通过计算 token 来计算推理成本。处理 100 万句话的 toekn 将花费 GPT3.5 约 $153,GPT4 约 $3061。这些模型的延迟和吞吐量更加复杂,因为它们根据一天中的当前服务器负载而变化。任何使用 GPT4 的人都清楚,延迟通常可以是多秒,并且受到速率限制。请注意,速度是任何 LLM (API) 的问题,包括开源 LLM。许多生成型 LLM 由于过大而无法快速运行。

训练计算成本往往不太相关,因为 LLM 可以不经过微调直接使用,且小型模型的微调成本相对较小 (微调 RoBERTa-base 的成本不到 $1)。只有在需要将大型生成型 LLM 专门化以执行特定生成任务时,才需要投资从头开始预训练模型。当微调一个更大的生成型 LLM 以使其适应特定生成任务时,训练成本可能变得相关。

第三,在时间和专业知识方面的投资。这是 LLM API 的主要优势。与手动收集数据、微调定制模型和部署相比,向 API 发送指令要容易得多。这正是使用 LLM API 创建合成数据变得重要的地方。创建良好的训练数据变得显著更容易。然后,微调和部署可以由 AutoTrain 等服务和专业推理端点处理。

第四,控制。这可能是 LLM API 的主要缺点。按设计,LLM API 使你依赖于 LLM API 提供商。你需要将敏感数据发送到别人的服务器,并且你无法控制系统的可靠性和速度。自己训练模型可以让你选择如何和在哪里部署它。

最后,环境影响。由于缺乏有关模型架构和硬件基础设施的信息,很难估计 GPT4 等封闭模型的能源消耗和二氧化碳排放。我们找到的
最佳 (但非常粗略) 估计
显示,GPT4 查询的能源消耗约为 0.0017 至 0.0026 千瓦时。这将是分析 100 万句话的大致 1700 至 2600 千瓦时。根据
EPA 二氧化碳当量计算器
,这相当于 0.735 至 1.1 公吨二氧化碳,或平均汽车行驶 1885 至 2883 英里。请注意,实际二氧化碳排放可以根据 LLM 特定计算区域的能源混合而有很大差异。与我们的自定义模型相比,这个估计要容易得多。使用自定义模型分析 100 万句话,在一个 T4 GPU 上大约需要 4.52 小时,在 US East N. Virginia 的 AWS 服务器上,这相当于大约 0.12 公斤二氧化碳 (见
ML CO2 Impact calculator
)。与具有 (据称) 8x220B 参数的通用 LLM 相比,运行一个专门化的模型 (约 0.13B 参数) 的效率低下得多。

结论

我们已经展示了使用 LLM 创建合成数据来训练一个更小、更高效的模型的巨大好处。虽然这个例子只处理了投资者情绪分类,但同样的流程可以应用于许多其他任务,从其他分类任务 (例如,客户意图检测或有害内容检测),到 token 分类 (例如,命名实体识别或 PII 检测),或生成任务 (例如,总结或问答)。

在 2024 年,公司创建自己的高效模型、控制自己的数据和基础设施、减少二氧化碳排放、节省计算成本和时间,而不必妥协准确性的难度从未如此之低。

现在,亲自动手尝试一下!你可以在本博客文章中找到所有数字的完整复现代码,以及更高效的异步函数和批量 API 调用的代码,在
复现仓库
中。我们邀请你复制并适配我们的代码以应用于你的用例!


英文原文:
https://hf.co/blog/synthetic-data-save-costs

原文作者: Moritz Laurer

译者: innovation64

什么是订单履约系统?

订单履约系统用来管理从接收客户订单到将商品送达客户手中的全过程。

它连接了上游交易(客户在销售平台下单环)和下游仓储配送(如库存管理、物流配送),确保信息流顺畅、操作协同,提升整个供应链的效率和响应速度。

系统定位

Untitled

订单履约系统的目标是让订单处理更快、更清晰,提高客户体验。

履约过程需要快速处理订单,同时为客户提供订单、物流信息的实时更新。保证每个订单都能准时、正确地完成,不仅要提高库存和物流配送的效率,降低成本,还要提升客户对履约服务的满意度。

业务流程

Untitled

订单履约过程是一系列步骤,从客户下单到商品交给客户,包含很多步骤,例如客户在销售平台下订单,订单履约系统接收订单,仓库或门店备货和发货,配送小哥交付给客户。每一步都必须顺利进行,确保整个过程高效。

履约流程的关键是协同顺畅,只有各系统相互配合,订单能从头到尾顺利完成各个环节,才能确保在客户约定的时间内完成履约。任何一个环节出问题都会导致履约时间延长,降低客户满意度。

  • 接收订单:当客户在销售平台下单后,第一步是收订单。这个步骤需要收集和确认订单信息,包括销售店铺信息、客户信息、商品信息、收发货地址、交付信息、支付信息等。在这个阶段,系统会检查订单是否有效,确保所有的订单信息都是完整的。
  • 订单拆单:此环节的目标是把复杂的订单分解成更好管理的子订单。通常根据订单类型、商品类型、存放位置、履约要求等因素来分解。比如,需要从不同地方发货的商品、预售商品。拆分子订单可以提高我们处理的速度,减少物流的费用,每个小订单都可以根据最佳的履约流程来处理。
  • 派单:该步骤会基于物流配送的因素进行决策,比如物流公司对包裹的重量和体积有限制、客户需分批送达或特定时间送达,在派单环节,可能会进一步拆单,分配给合适的仓库或门店。
  • 预占库存:该环节可以防止在处理订单时,库存被其他订单占用,防止超卖情况发生,是库存管理的关键环节,确保了库存的准确性。
  • 改派:在履约过程中,可能会因为库存不足、配送地址问题或其他突发情况,需要把订单转给另一个仓库或门店。改派环节允许订单根据实际情况进行调整。这个过程有助于更好地利用资源,确保订单能快速准确地完成。
  • 拣货:指根据订单信息从库存中挑选出下单商品的过程。这个环节要求高度的准确性和效率,拣选错误,会直接影响客户满意度。仓库工作人员通常会使用手持终端设备,确保按照订单作业的准确性。
  • 打包:拣货后商品会被妥善包装,保证运输安全。包装时会贴上运输标签和配送信息,确保商品能顺利送达。
  • 出库:打包好的商品被快递员或配送小哥揽收后,会被记录为已出库,这就意味着商品已经离开门店/仓库。
  • 物流配送:商品出库后,将通过快递或同城配送方式进行运送。这一阶段,物流公司或配送公司负责将商品运送到客户指定的收货地址。
  • 确认收货:当客户收到并确认商品没有问题后,订单就算完成了。客户通常在网上确认收货。这个环节是记录服务时间、收集客户反馈的时机。

核心概念模型

Untitled

在整个订单履约过程中,订单是起始,子订单是订单拆分的结果,用于处理更细粒度的履约逻辑。

发货单则是具体的执行单据,指导商品从仓库到客户手中的具体操作任务。

这三个模型层层递进,确保整个履约链条的高效管理。

  • 订单:客户提交购物请求后,生成的买卖合同,通常包含客户信息、下单日期、所购买的商品或服务明细、价格、数量、收货地址以及支付方式等详细信息。
  • 子订单:为了更高效地进行履约,大订单可能会被拆分成多个子订单,子订单会根据商品类型、配送地址、仓库位置或供应商等因素进行拆分。
  • 发货单:根据子订单生成,指导完成订单的具体履约任务,如商品的拣选、包装、出库、配送等。

订单拆分场景

单门店履约场景

在连锁模式下,系统会自动根据用户的收货地址匹配最近的门店。

如果匹配到某个门店,且门店库存充足,能完成履约服务。在这种情况下,不会对订单进行拆分,直接分配给门店进行发货。

Untitled

多仓库履约场景

有些商家有多个仓库,不同的商品存放在不同的门店或仓库里。

当用户下单时,如果订单内的商品在不同的仓库,就需要拆分订单,把拆分后的子订单匹配到对应的仓库中,然后根据商品的数量进行备货和出库。

Untitled

按订单类型、商品类型拆分

由于订单和商品类型的差异,我们需要将其拆分成不同类型的子订单。

商品中包括跨境商品、分销商品等,我们会根据不同的商品类型自动拆分。

对于生鲜水果、冷链食品以及其他易碎物品,由于它们对快递的保护性和及时性有较高的要求,我们需要单独包装并发货。如果订单中包含这类商品,会对订单进行拆分处理。

Untitled

按物流场景拆分

物流公司通常对包裹的重量和体积有限制。如果订单中的商品超过这些限制,就需要将订单拆分为多个发货单来发货。

从成本的角度考虑,在某些情况下,将大量商品分成多个发货单可能会比一个大包裹发货更省钱。

客户可能会有特殊的物流要求,如分批送达或特定时间送达,需要将订单拆分为多个发货单。例如预售商品与其他商品一起下单,需要等到预售商品到货后再发货。

Untitled

系统的核心能力

通过分析订单履约的全流程和各个业务活动,我们可以梳理出订单履约流程所需的核心业务能力,分别为履约服务表达、履约调度和物流配送。

Untitled

  • 履约服务表达:负责清楚、准确地向客户传递履约服务的能力,包括订单处理时间、配送时间、费用计算和服务范围。确保客户下单时有明确的期待,并在整个订单过程中保持透明和一致。
  • 履约调度:涉及订单的接收、处理、门店/仓库分配。这一能力确保订单根据预定的规则和优先级,有效地分配给门店/仓库。提升内部操作的效率,减少履约时间,同时最大限度地减少延期情况。
  • 物流配送:确保下单商品从门店/仓库准时地运送到客户手中,这包括与第三方运力服务商的合作、配送管理、配送路径的优化以及送货执行。这部分能力由配送系统提供。

应用架构设计

Untitled

应用层定义软件的应用功能,它负责接收用户请求,协调领域层能力来执行任务,并将结果返回给用户,核心模块包括:

  • C端履约服务
    • 预计送达时间:为消费者提供订单的预计处理时间、配送时效等,通常基于订单处理时间、配送情况、配送距离等多种因素计算。
    • 实时订单状态查询:允许消费者实时查看他们的订单所处阶段。包括订单待接单、拣货、打包、已发货、配送中等状态。
    • 配送轨迹跟踪:提供订单从出库到最终送达的完整路径跟踪,消费者可以查看订单的当前位置和过往的配送节点,了解配送进度。
    • 配送信息修改:在订单还未最终发出之前,消费者可能需要更改配送信息,如地址或配送时间。
    • 配送费用明细:显示消费者的订单配送费用的详细分解,包括配送费、包装费、服务费等。
    • 确认收货:消费者可以通过系统确认收货,是完成订单流程的最后一步。
  • B端管理模块:
    • 订单派单:接收来自销售平台的订单,并按照既定规则自动分配给对应的门店/仓库。
    • 订单管理:全面管理订单的生命周期,包括订单的确认、处理、状态跟踪、修改和取消等管理操作。
    • 拣货管理:管理仓库内的拣货操作,确保商品被准确无误地从货架上拣选出来,并进行打包和发货。
    • 发货管理:全面管理发货单的生命周期,根据订单的地址、商品大小、重量和客户选择的履约方式,匹配合适的发货方式,并对发货流程进行跟踪。
    • 逆向履约:当客户不满意或需退换商品时,逆向履约模块负责处理退货请求,并管理退货退款和换货流程。

领域层是业务逻辑的核心,专注于表示业务概念、业务状态流转和业务规则,沉淀可复用的服务能力,模块包括:

  • 履约服务表达:负责向客户提供履约服务的明确信息。包括预计的送货时间、费用计算、服务选项(如定时达、次日达等)以及履约可达性要求。
  • 订单履约调度:提供订单履约调度的核心能力,确保订单被高效地处理和执行。它涉及订单从接收到最终准备配送的所有调度和处理过程,包括订单拆分、分配、拣货、包装、发货等。

订单履约系统与其他系统的依赖关系:

  • 商品管理系统:提供的商品信息,包括价格、规格、描述、分类、SKU等。
  • 中央库存系统:需要访问中央库存系统来确认下单商品的实物库存情况,包括库存数量和库存位置。
  • 配送系统:一旦商品打包完成,将依赖配送系统来处理商品的实际配送工作,包括配送安排、跟踪和状态更新。配送系统提供的配送状态和时间信息,对于订单履约系统中订单状态的更新至关重要。
  • 基础数据系统:提供组织机构信息、用户权限信息、服务商信息等基础数据。这些标准化的数据确保各个系统数据的一致性
  • 数据分析系统:订单履约系统将产生大量数据,包括订单数据、履约过程数据、配送时效数据等,这些数据需传输到数据分析系统。数据分析系统基于采集到的数据,提供分析与洞察,帮助优化订单履约流程,提升客户满意度,并提供预测分析,来辅助库存管理和需求预测。

写在最后

订单履约系统负责管理从接收客户订单到将商品送达客户手中的全过程。它连接上游交易和下游仓储配送,以提高供应链效率。

该系统的核心业务能力包括履约服务表达、履约调度和物流配送。

根据物流和商品类型等因素,订单会被拆分成子订单。这样做可以提高处理速度并减少物流费用。

在订单履约系统的应用架构中,应用层定义了软件的应用功能,包括C端履约服务和B端管理模块。领域层是业务逻辑的核心,专注于表示业务概念、业务状态流转和业务规则。

订单履约系统与商品管理系统、中央库存系统、配送系统、基础数据系统和数据分析系统等其他系统存在依赖关系。各系统通过相互协作来完成订单履约流程。

Qt 是一个跨平台C++图形界面开发库,利用Qt可以快速开发跨平台窗体应用程序,在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置,实现图形化开发极大的方便了开发效率,本章将重点介绍如何运用
QThread
组件实现多线程功能。

多线程技术在程序开发中尤为常用,Qt框架中提供了
QThread
库来实现多线程功能。当你需要使用
QThread
时,需包含
QThread
模块,以下是
QThread
类的一些主要成员函数和槽函数。

成员函数/槽函数 描述
QThread(QObject *parent = nullptr) 构造函数,创建一个QThread对象。
~QThread() 析构函数,释放QThread对象。
void start(QThread::Priority priority = InheritPriority) 启动线程。
void run() 默认的线程执行函数,需要在继承QThread的子类中重新实现以定义线程的操作。
void exit(int returnCode = 0) 请求线程退出,线程将在适当的时候退出。
void quit() 请求线程退出,与exit()类似。
void terminate() 立即终止线程的执行。这是一个危险的操作,可能导致资源泄漏和未完成的操作。
void wait() 等待线程完成。主线程将被阻塞,直到该线程退出。
bool isRunning() const 检查线程是否正在运行。
void setPriority(Priority priority) 设置线程的优先级。
Priority priority() const 获取线程的优先级。
QThread::Priority priority() 获取线程的优先级。
void setStackSize(uint stackSize) 设置线程的堆栈大小(以字节为单位)。
uint stackSize() const 获取线程的堆栈大小。
void msleep(unsigned long msecs) 使线程休眠指定的毫秒数。
void sleep(unsigned long secs) 使线程休眠指定的秒数。
static QThread *currentThread() 获取当前正在执行的线程的QThread对象。
void setObjectName(const QString &name) 为线程设置一个对象名。

当我们需要创建线程时,通常第一步则是要继承
QThread
类,并重写类内的
run()
方法,在
run()
方法中,你可以编写需要在新线程中执行的代码。当你创建一个
QThread
的实例并调用它的
start()
方法时,会自动调用
run()
来执行线程逻辑,如下这样一段代码展示了如何运用线程类。

#include <QCoreApplication>
#include <QThread>
#include <QDebug>

class MyThread : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < 5; ++i)
        {
            qDebug() << "Thread is running" << i;
            sleep(1);
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    MyThread thread;
    thread.start();
    thread.wait();

    qDebug() << "Main thread is done.";
    return a.exec();
}

上述代码运行后则会每隔1秒输出一段话,在主函数内通过调用
thread.start
方法启动这个线程,并通过
thread.wait
等待线程结束,如下图所示;

1.1 线程组与多线程

线程组是一种组织和管理多个线程的机制,允许将相关联的线程集中在一起,便于集中管理、协调和监控。通过线程组,可以对一组线程进行统一的生命周期管理,包括启动、停止、调度和资源分配等操作。

上述方法并未真正实现多线程功能,我们继续完善
MyThread
自定义类,在该类内增加两个标志,
is_run()
用于判断线程是否正在运行,
is_finish()
则用来判断线程是否已经完成,并在
run()
中增加打印当前线程对象名称的功能。

class MyThread: public QThread
{
protected:
    volatile bool m_to_stop;

protected:
    void run()
    {
        for(int x=0; !m_to_stop && (x <10); x++)
        {
            msleep(1000);
            std::cout << objectName().toStdString() << std::endl;
        }
    }

public:
    MyThread()
    {
        m_to_stop = false;
    }

    void stop()
    {
        m_to_stop = true;
    }

    void is_run()
    {
        std::cout << "Thread Running = " << isRunning() << std::endl;
    }

    void is_finish()
    {
        std::cout << "Thread Finished = " << isFinished() << std::endl;
    }

};

接着在主函数内调整,增加一个
MyThread thread[10]
用于存储线程组,线程组是一种用于组织和管理多个线程的概念。在不同的编程框架和操作系统中,线程组可能具有不同的实现和功能,但通常用于提供一种集中管理和协调一组相关线程的机制。

我们通过循环的方式依次对线程组进行赋值,通过调用
setObjectName
对每一个线程赋予一个不同的名称,当需要使用这些线程时则可以通过循环调用
run()
方法来实现,而结束调用同样如此,如下是调用的具体实现;

#include <QCoreApplication>
#include <iostream>
#include <QThread>

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    // 定义线程数组
    MyThread thread[10];

    // 设置线程对象名字
    for(int x=0;x<10;x++)
    {
        thread[x].setObjectName(QString("thread => %1").arg(x));
    }

    // 批量调用run执行
    for(int x=0;x<10;x++)
    {
        thread[x].start();
        thread[x].is_run();
        thread[x].isFinished();
    }

    // 批量调用stop关闭
    for(int x=0;x<10;x++)
    {
        thread[x].wait();
        thread[x].stop();

        thread[x].is_run();
        thread[x].is_finish();
    }

    return a.exec();
}

如下图则是运行后实现的多线程效果;

1.2 向线程中传递参数

向线程中传递参数是多线程编程中常见的需求,不同的编程语言和框架提供了多种方式来实现这个目标,在Qt中,由于使用的自定义线程类,所以可通过增加一个
set_value()
方法来向线程内传递参数,由于线程函数内的变量使用了
protected
属性,所以也就实现了线程间变量的隔离,当线程被执行结束后则可以通过
result()
方法获取到线程执行结果,这个线程函数如下所示;

class MyThread: public QThread
{
protected:
    int m_begin;
    int m_end;
    int m_result;

    void run()
    {
        m_result = m_begin + m_end;
    }

public:
    MyThread()
    {
        m_begin = 0;
        m_end = 0;
        m_result = 0;
    }

    // 设置参数给当前线程
    void set_value(int x,int y)
    {
        m_begin = x;
        m_end = y;
    }

    // 获取当前线程名
    void get_object_name()
    {
        std::cout << "this thread name => " << objectName().toStdString() << std::endl;
    }

    // 获取线程返回结果
    int result()
    {
        return m_result;
    }
};

在主函数中,我们通过
MyThread thread[3];
来定义3个线程组,并通过循环三次分别
thread[x].set_value()
设置三组不同的参数,当设置完成后则可以调用
thread[x].start()
方法运行这些线程,线程运行结束后则返回值将会被依次保存在
thread[x].result()
中,此时直接将其相加即可得到最终线程执行结果;

#include <QCoreApplication>
#include <iostream>
#include <QThread>

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    MyThread thread[3];

    // 分别将不同的参数传入到线程函数内
    for(int x=0; x<3; x++)
    {
        thread[x].set_value(1,2);
        thread[x].setObjectName(QString("thread -> %1").arg(x));
        thread[x].start();
    }

    // 等待所有线程执行结束
    for(int x=0; x<3; x++)
    {
        thread[x].get_object_name();
        thread[x].wait();
    }

    // 获取线程返回值并相加
    int result = thread[0].result() + thread[1].result() + thread[2].result();
    std::cout << "sum => " << result << std::endl;

    return a.exec();
}

程序运行后,则可以输出三个线程相加的和;

1.3 互斥同步线程锁

QMutex
是Qt框架中提供的用于线程同步的类,用于实现互斥访问共享资源。Mutex是“互斥锁(Mutual Exclusion)”的缩写,它能够确保在任意时刻,只有一个线程可以访问被保护的资源,从而避免了多线程环境下的数据竞争和不一致性。

在Qt中,
QMutex
提供了简单而有效的线程同步机制,其基本用法包括:

  • 锁定(Lock):
    线程在访问共享资源之前,首先需要获取
    QMutex
    的锁,这通过调用
    lock()
    方法来实现。
  • 解锁(Unlock):
    当线程使用完共享资源后,需要释放
    QMutex
    的锁,以允许其他线程访问,这通过调用
    unlock()
    方法来实现。

该锁
lock()
锁定与
unlock()
解锁必须配对使用,线程锁保证线程间的互斥,利用线程锁能够保证临界资源的安全性。

  • 线程锁解决的问题: 多个线程同时操作同一个全局变量,为了防止资源的无序覆盖现象,从而需要增加锁,来实现多线程抢占资源时可以有序执行。
  • 临界资源(Critical Resource): 每次只允许一个线程进行访问 (读/写)的资源。
  • 线程间的互斥(竞争): 多个线程在同一时刻都需要访问临界资源。
  • 一般性原则: 每一个临界资源都需要一个线程锁进行保护。

我们以生产者消费者模型为例来演示锁的使用方法,生产者消费者模型是一种并发编程中常见的同步机制,用于解决多线程环境下的协作问题。该模型基于两类角色:生产者(Producer)和消费者(Consumer),它们通过共享的缓冲区进行协作。

主要特点和工作原理如下:

  1. 生产者:
    • 生产者负责产生一些资源或数据,并将其放入共享的缓冲区中。生产者在生产资源后,需要通知消费者,以便它们可以取走资源。
  2. 消费者:
    • 消费者从共享的缓冲区中取走资源,并进行相应的处理。如果缓冲区为空,消费者需要等待,直到有新的资源可用。
  3. 共享缓冲区:
    • 作为生产者和消费者之间的交换介质,共享缓冲区存储被生产者产生的资源。它需要提供对资源的安全访问,以防止竞态条件和数据不一致性。
  4. 同步机制:
    • 生产者和消费者之间需要一些同步机制,以确保在正确的时机进行资源的生产和消费。典型的同步机制包括信号量、互斥锁、条件变量等。

生产者消费者模型的典型应用场景包括异步任务处理、事件驱动系统、数据缓存等。这种模型的实现可以通过多线程编程或使用消息队列等方式来完成。

首先在全局中引入
#include <QMutex>
库,并在全局定义
static QMutex
线程锁变量,接着我们分别定义两个自定义线程函数,其中
Producer
代表生产者,而
Customer
则是消费者,生产者中负责每次产出一个随机数并将其追加到
g_store
全局变量内保存,消费者则通过
g_store.remove
每次取出一个元素。

static QMutex g_mutex;      // 线程锁
static QString g_store;     // 定义全局变量

class Producer : public QThread
{
protected:
    void run()
    {
        int count = 0;

        while(true)
        {
            // 加锁
            g_mutex.lock();

            g_store.append(QString::number((count++) % 10));
            std::cout << "Producer -> "<< g_store.toStdString() << std::endl;

            // 释放锁
            g_mutex.unlock();
            msleep(900);
        }
    }
};

class Customer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            g_mutex.lock();
            if( g_store != "" )
            {
                g_store.remove(0, 1);
                std::cout << "Curstomer -> "<< g_store.toStdString() << std::endl;
            }

            g_mutex.unlock();
            msleep(1000);
        }
    }
};

在主函数中分别定义两个线程类,并依次运行它们;

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    Producer p;
    Customer c;

    p.setObjectName("producer");
    c.setObjectName("curstomer");

    p.start();
    c.start();

    return a.exec();
}

至此,生产者产生数据,消费者消费数据;如下图所示;

QMutexLocker
是Qt框架中提供的一个辅助类,它是在
QMutex
基础上简化版的线程锁,
QMutexLocker
会保护加锁区域,并自动实现互斥量的锁定和解锁操作,可以将其理解为是智能版的
QMutex
锁,通过
QMutexLocker
可以确保在作用域内始终持有锁,从而避免因为忘记释放锁而导致的问题。该锁只需要在上方代码中稍加修改即可。

使用
QMutexLocker
的一般流程如下:

  1. 创建一个
    QMutex
    对象。
  2. 创建一个
    QMutexLocker
    对象,传入需要锁定的
    QMutex

  3. QMutexLocker
    对象的作用域内进行需要互斥访问的操作。

  4. QMutexLocker
    对象超出作用域范围时,会自动释放锁。
static QMutex g_mutex;      // 线程锁
static QString g_store;     // 定义全局变量

class Producer : public QThread
{
protected:
    void run()
    {
        int count = 0;

        while(true)
        {
			// 增加智能线程锁
            QMutexLocker Locker(&g_mutex);

            g_store.append(QString::number((count++) % 10));
            std::cout << "Producer -> "<< g_store.toStdString() << std::endl;

            msleep(900);
        }
    }
};

1.4 读写同步线程锁

QReadWriteLock 是Qt框架中提供的用于实现读写锁的类。读写锁允许多个线程同时读取共享数据,但在写入数据时会互斥,确保数据的一致性和完整性。这对于大多数情况下读取频繁而写入较少的共享数据非常有用,可以提高程序的性能。

其提供了两种锁定操作:

  • 读取锁(Read Lock):
    允许多个线程同时获取读取锁,用于并行读取共享数据。在没有写入锁的情况下,多个线程可以同时持有读取锁。
  • 写入锁(Write Lock):
    写入锁是互斥的,当一个线程获取写入锁时,其他线程无法获取读取锁或写入锁。这确保了在写入数据时,不会有其他线程同时读取或写入。

互斥锁存在一个问题,每次只能有一个线程获得互斥量的权限,如果在程序中有多个线程来同时读取某个变量,那么使用互斥量必须排队,效率上会大打折扣,基于
QReadWriteLock
读写模式进行代码段锁定,即可解决互斥锁存在的问题。

#include <QCoreApplication>
#include <iostream>
#include <QThread>
#include <QMutex>
#include <QReadWriteLock>

static QReadWriteLock g_mutex;      // 线程锁
static QString g_store;             // 定义全局变量

class Producer : public QThread
{
protected:
    void run()
    {
        int count = 0;

        while(true)
        {
            // 以写入方式锁定资源
            g_mutex.lockForWrite();

            g_store.append(QString::number((count++) % 10));

            // 写入后解锁资源
            g_mutex.unlock();

            msleep(900);
        }
    }
};

class Customer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            // 以读取方式写入资源
            g_mutex.lockForRead();
            if( g_store != "" )
            {
                std::cout << "Curstomer -> "<< g_store.toStdString() << std::endl;
            }

            // 读取到后解锁资源
            g_mutex.unlock();
            msleep(1000);
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    Producer p1,p2;
    Customer c1,c2;

    p1.setObjectName("producer 1");
    p2.setObjectName("producer 2");

    c1.setObjectName("curstomer 1");
    c2.setObjectName("curstomer 2");

    p1.start();
    p2.start();

    c1.start();
    c2.start();

    return a.exec();
}

该锁允许用户以同步读
lockForRead()
或同步写
lockForWrite()
两种方式实现保护资源,但只要有一个线程在以写的方式操作资源,其他线程也会等待写入操作结束后才可继续读资源。

1.5 基于信号线程锁

QSemaphore 是Qt框架中提供的用于实现信号量的类。信号量是一种用于在线程之间进行同步和通信的机制,它允许多个线程在某个共享资源上进行协调,控制对该资源的访问。
QSemaphore
的主要作用是维护一个计数器,线程可以通过获取和释放信号量来改变计数器的值。

其主要方法包括:

  • QSemaphore(int n = 0)
    :构造函数,创建一个初始计数值为
    n
    的信号量。
  • void acquire(int n = 1)
    :获取信号量,将计数器减去
    n
    。如果计数器不足,线程将阻塞等待。
  • bool tryAcquire(int n = 1)
    :尝试获取信号量,如果计数器足够,立即获取并返回
    true
    ;否则返回
    false
  • void release(int n = 1)
    :释放信号量,将计数器加上
    n
    。如果有等待的线程,其中一个将被唤醒。

信号量是特殊的线程锁,信号量允许N个线程同时访问临界资源,通过
acquire()
获取到指定资源,
release()
释放指定资源。

#include <QCoreApplication>
#include <iostream>
#include <QThread>
#include <QSemaphore>

const int SIZE = 5;
unsigned char g_buff[SIZE] = {0};

QSemaphore g_sem_free(SIZE); // 5个可生产资源
QSemaphore g_sem_used(0);    // 0个可消费资源

// 生产者生产产品
class Producer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            int value = qrand() % 256;

            // 若无法获得可生产资源,阻塞在这里
            g_sem_free.acquire();

            for(int i=0; i<SIZE; i++)
            {
                if( !g_buff[i] )
                {
                    g_buff[i] = value;
                    std::cout << objectName().toStdString() << " --> " << value << std::endl;
                    break;
                }
            }

            // 可消费资源数+1
            g_sem_used.release();

            sleep(2);
        }
    }
};

// 消费者消费产品
class Customer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            // 若无法获得可消费资源,阻塞在这里
            g_sem_used.acquire();

            for(int i=0; i<SIZE; i++)
            {
                if( g_buff[i] )
                {
                    int value = g_buff[i];

                    g_buff[i] = 0;
                    std::cout << objectName().toStdString() << " --> " << value << std::endl;
                    break;
                }
            }

            // 可生产资源数+1
            g_sem_free.release();

            sleep(1);
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    Producer p1;
    Customer c1;

    p1.setObjectName("producer");
    c1.setObjectName("curstomer");

    p1.start();
    c1.start();

    return a.exec();
}

前言

在我们日常工作中常用的C#跳转语句有break、continue、return,但是还有一个C#跳转语句很多同学可能都比较的陌生就是goto,今天大姚带大家一起来认识一下goto语句及其它的优缺点。

goto语句介绍

  • goto 语句由关键字 goto 后跟一个标签名称组成,通过标签名称指定跳转的位置。
  • 可以在方法的任何地方放置标签,并且可以多次使用相同的标签。

goto代码使用示例

使用goto进行代码重试示例

        /// <summary>
        /// 使用goto进行代码重试示例
        /// </summary>
        public static void GotoRetryUseExample()
        {
            int retryCount = 0;
            for (int i = 0; i < 10; i++)
            {
            retryLogic:
                try
                {
                    //模拟可能出错的操作
                    Random random = new Random();
                    int result = random.Next(0, 2);

                    if (result == 0)
                    {
                        throw new Exception("Error occurred");
                    }

                    Console.WriteLine("Operation successful on attempt: " + retryCount);
                }
                catch (Exception ex)
                {
                    retryCount++;
                    if (retryCount < 3)
                    {
                        Console.WriteLine("Error occurred, retrying...");
                        goto retryLogic; //跳转到重试逻辑
                    }
                    else
                    {
                        Console.WriteLine("Max retry limit reached.");
                        return;
                    }
                }
            }
        }

不使用goto进行代码重试示例

        /// <summary>
        /// 不使用goto进行代码重试示例
        /// </summary>
        public static void NonGotoRetryUseExample()
        {
            int retryCount = 0;
            for (int i = 0; i < 10; i++)
            {
                while (retryCount < 3)
                {
                    try
                    {
                        //模拟可能出错的操作
                        Random random = new Random();
                        int result = random.Next(0, 2);

                        if (result == 0)
                        {
                            throw new Exception("Error occurred");
                        }

                        Console.WriteLine("Operation successful on attempt: " + retryCount);
                        break;
                    }
                    catch (Exception ex)
                    {
                        retryCount++;
                        Console.WriteLine("Error occurred, retrying...");
                    }
                }

                if (retryCount == 3)
                {
                    Console.WriteLine("Max retry limit reached.");
                    return;
                }
            }
        }

goto正常输出使用示例

        /// <summary>
        /// goto正常输出使用示例
        /// </summary>
        public static void GotoGeneralUseExample(int num)
        {
            if (num < 0)
            {
                goto LessThanZero;
            }
            else if (num == 0)
            {
                goto EqualToZero;
            }
            else
            {
                goto GreaterThanZero;
            }

        LessThanZero:
            Console.WriteLine("数字小于零");
            goto End;

        EqualToZero:
            Console.WriteLine("数字等于零");
            goto End;

        GreaterThanZero:
            Console.WriteLine("数字大于零");
            goto End;
        End:
            Console.WriteLine("End...");
        }

不使用goto正常输出使用示例

        /// <summary>
        /// 不使用goto正常输出使用示例
        /// </summary>
        public static void NonGotoGeneralUseExample(int num)
        {
            if (num < 0)
            {
                Console.WriteLine("数字小于零");
            }
            else if (num == 0)
            {
                Console.WriteLine("数字等于零");
            }
            else
            {
                Console.WriteLine("数字大于零");
            }
            Console.WriteLine("End...");
        }

goto语句的优缺点

通过上述代码示例我们可以总结如下goto语句的几大优缺点,大家可以根据自己的使用场景谨慎合理的使用。

优点:

  1. 简化复杂逻辑:
    在某些情况下,
    goto
    可以帮助简化复杂的逻辑流程,减少嵌套结构。
  2. 跳出多层循环:
    可以用于直接跳出多层循环,避免使用额外的标志变量。

缺点:

  1. 降低可读性:
    过度使用
    goto
    可能会导致代码难以理解,降低代码的可读性。
  2. 增加维护难度:
    goto
    可能使代码结构复杂化,增加代码的维护难度。
  3. 潜在引入bug:
    不当使用
    goto
    可能会引入潜在的错误,打破正常的控制流程。

DotNetGuide技术社区交流群

  • DotNetGuide技术社区是一个面向.NET开发者的开源技术社区,旨在为开发者们提供全面的C#/.NET/.NET Core相关学习资料、技术分享和咨询、项目推荐、招聘资讯和解决问题的平台。
  • 在这个社区中,开发者们可以分享自己的技术文章、项目经验、遇到的疑难技术问题以及解决方案,并且还有机会结识志同道合的开发者。
  • 我们致力于构建一个积极向上、和谐友善的.NET技术交流平台,为广大.NET开发者带来更多的价值和成长机会。





欢迎加入DotNetGuide技术社区微信交流群