2024年3月


引言


大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第十二篇内容:线程池。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!!

在现代软件开发中,多线程编程已经成为应对高并发、高性能场景的必备技术。随着计算机硬件的发展,尤其是多核CPU的普及,利用多线程能够充分利用系统资源,提升程序执行效率和响应速度。然而,在直接使用原生线程创建与销毁的过程中,我们往往会遇到一些难以忽视的问题:

首先,线程的创建和销毁并非无成本操作。操作系统需要分配内存空间给线程栈,以及为线程调度维护上下文切换等信息,频繁地创建和销毁线程会导致系统资源被大量消耗。尤其在处理短生命周期任务时,这种开销可能远大于实际业务逻辑执行的耗时。

其次,过多的并发线程可能会引发资源竞争问题,导致服务器过载甚至崩溃。当并发数量不受控制时,系统内存、CPU资源乃至文件句柄等关键资源都可能因过度消耗而达到瓶颈,从而影响整个系统的稳定性与性能。

再者,对线程进行分散管理会增加代码复杂度和出错风险。没有一个统一管理和协调的机制,程序员很难准确预测和控制多线程间的交互行为,例如同步问题、死锁现象及资源争抢等问题,这些问题都会降低程序的质量和可维护性。

因此,Java提供了强大的线程池机制,通过Executor接口及其核心实现类ThreadPoolExecutor来解决上述挑战。线程池能有效地复用已存在的线程,避免了频繁创建和销毁线程带来的开销;同时,它允许我们预设并动态调整线程池大小以控制并发执行的任务数,确保系统资源合理利用而不至于过载。此外,线程池还能对线程进行统一管理和异常处理,简化了多线程编程的复杂性。

例如,我们可以直观地通过Java代码实例展示线程池的优势:

ExecutorService executor = Executors.newFixedThreadPool(10); // 创建固定大小的线程池

for (int i = 0; i < 1000; i++) {
    Runnable task = new Task(i); // 假设有Task是一个实现了Runnable接口的任务类
    executor.execute(task); // 将任务提交到线程池中执行
}

executor.shutdown(); // 当所有任务提交完毕后,关闭线程池,等待所有任务执行完成

通过上述代码片段可以看到,线程池负责管理这些待执行的任务,并根据预先设定的核心线程数来高效地调度执行,极大地提升了编程效率和系统的运行效能。接下来的文章将深入剖析Java多线程之线程池原理,从线程池构造方法参数的意义,到其内部状态机设计和任务处理流程,再到线程复用的具体实现细节,全面揭示这一重要组件的工作机制和应用场景。


为什么要使用线程池


在多线程编程中,使用线程池(Thread Pool)是提高程序并发处理能力和资源利用率的关键技术。以下是采用线程池的三个主要原因:

减少系统资源消耗
创建和销毁线程是一项昂贵的操作,涉及到内存分配、上下文切换等系统资源的大量消耗。频繁创建和销毁线程可能导致性能瓶颈。线程池通过预先创建并维护一定数量的线程来复用这些线程资源,当有新任务提交时直接将任务分配给空闲线程执行,从而避免了频繁创建线程的成本。例如,在Java中,通过
ExecutorService
接口及其实现类
ThreadPoolExecutor
可以方便地创建一个线程池,并利用其管理线程生命周期,如下所示:

ExecutorService executor = Executors.newFixedThreadPool(5); // 创建包含5个核心线程的线程池

控制并发数量以防止服务器过载
在高并发场景下,如果不加限制地创建线程,可能会导致并发数量过多,超出系统承受能力,引发如内存溢出、CPU使用率过高甚至服务器崩溃等问题。线程池通过设置核心线程数(corePoolSize)与最大线程数(maximumPoolSize),能够动态调整并发执行的任务数,确保系统的稳定性和资源的有效利用。比如,当核心线程已满负荷工作时,非核心线程会在任务队列饱和后才开始创建,且一旦超过最大线程数,线程池会根据配置的拒绝策略对新提交的任务进行合理处理。

便于统一管理和维护线程
线程池提供了统一的线程管理和异常处理机制,使得程序员无需关注每个线程的具体创建和销毁过程,简化了代码逻辑。线程池还可以为线程设置优先级、命名以及自定义线程工厂等特性,进一步增强了线程管理的灵活性和可定制性。此外,线程池还支持任务完成后的回调函数,如
beforeExecute()

afterExecute()
方法,用于执行特定的前后置操作,提升了程序的整体可控性和健壮性。

通过使用线程池,我们可以更有效地组织并发执行的任务,降低开发难度,同时提高了系统的响应速度和资源使用效率。以下是一个简单的示例,展示了如何利用线程池执行多个耗时任务并控制并发数量:

class MyTask implements Runnable {
    private int taskId;

    public MyTask(int id) {
        this.taskId = id;
    }

    @Override
    public void run() {
        System.out.println("Task " + taskId + " is running in thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 使用线程池执行多个任务
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
    executor.execute(new MyTask(i));
}
executor.shutdown();

在这个例子中,即使有10个任务需要执行,但由于线程池大小被限制为5,所以最多只有5个任务会同时被执行,有效避免了并发数量过大带来的潜在问题。同时,当所有任务完成后,调用
shutdown()
方法优雅关闭线程池,确保资源得到释放。


线程池接口与实现


在Java中,线程池的实现基于
java.util.concurrent
包中的Executor接口及其扩展接口。其中,
ThreadPoolExecutor
作为核心实现类,提供了丰富的配置选项和灵活的任务调度机制。


Java Executor 接口

Executor接口定义了一个统一的方法execute(Runnable command),用于执行提交给它的Runnable任务,简化了线程创建和管理的过程。通过实现这个接口,可以创建具有不同策略的线程池,例如:

Executor executor = Executors.newFixedThreadPool(10); // 创建固定大小线程池
executor.execute(new Runnable() {
    @Override
    public void run() {
        // 业务逻辑代码
    }
});


ThreadPoolExecutor 类

构造方法详解
ThreadPoolExecutor提供了一系列构造函数,允许开发者自定义线程池的核心参数。主要包含以下五个基本参数:

  • corePoolSize : 核心线程数,即使没有任务处理时也会保留在线程池中的线程数量。
  • maximumPoolSize : 线程池最大容量,当工作队列满载且仍有新任务到来时,线程池将尝试增加到此值。
  • keepAliveTime : 非核心线程空闲超时时长,在指定时间内无新任务分配给非核心线程,则会销毁这些线程。
  • unit : keepAliveTime的时间单位,如秒、毫秒等。
  • workQueue : 用于存储待执行任务的阻塞队列,类型可选为LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue或DelayQueue等。

例如:

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(4860L, TimeUnit.SECONDS, queue);

此处创建了一个初始核心线程数为4、最大线程数为8的线程池,闲置非核心线程超过60秒会被回收,并使用链式阻塞队列来存放任务。

此外,还有两个额外的可选参数:

  • threadFactory : 定义线程工厂,用于批量创建线程并设置其属性(如守护线程、优先级等)。默认采用 DefaultThreadFactory ,可以根据需求自定义实现。
  • handler : 拒绝策略,当线程池及任务队列饱和时,无法接收新的任务时所采取的动作,默认为AbortPolicy,即抛出RejectedExecutionException异常。还可以选择DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃最早进入队列的任务以腾出空间)或CallerRunsPolicy(由提交任务的线程自行执行该任务)。


线程池执行流程

一个典型的线程池实例化示例是创建一个仅有一个核心线程的线程池,确保所有任务按顺序执行,不创建非核心线程:

ExecutorService executor = new ThreadPoolExecutor(
    1,  // corePoolSize
    1,  // maximumPoolSize
    0L// keepAliveTime
    TimeUnit.MILLISECONDS, // unit
    new LinkedBlockingQueue<>() // workQueue
);

此线程池不会因为线程数量不足而创建额外的非核心线程,所有提交的任务都会按照FIFO原则添加到队列中等待唯一的核心线程执行。

总之,Java中的线程池接口与其实现(尤其是ThreadPoolExecutor)为开发者提供了强大的并发编程工具,允许我们根据应用场景灵活调整线程资源的分配和管理策略,从而有效地提升程序性能和系统稳定性。通过深入理解其内部构造原理和运行机制,我们可以更好地设计和优化多线程应用。


线程池状态


在Java的多线程编程中,线程池的状态与生命周期管理是其核心功能之一。
ThreadPoolExecutor
类通过维护一个volatile int类型的变量runState来表示线程池的状态,该状态包括RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED五个阶段。

  • RUNNING :线程池创建后默认处于此状态,能够接受新任务并处理阻塞队列中的任务。
  • SHUTDOWN :调用 shutdown() 方法后进入此状态,不再接受新的提交任务,但会继续处理已加入队列的任务直至执行完毕。
ExecutorService executor = Executors.newFixedThreadPool(5);
// ... 执行一系列任务
executor.shutdown();

  • STOP :调用 shutdownNow() 方法后变为STOP状态,不仅不接收新任务,还会尝试中断正在执行的任务,并且不会处理尚未开始执行的任务。
executor.shutdownNow(); // 立即停止所有正在运行的任务并拒绝后续任务

  • TIDYING :当所有的任务都已经终止并且workerCount(活动工作线程数)为0时,线程池将转换到TIDYING状态,并触发 terminated() 钩子方法。
  • TERMINATED terminated() 方法执行完毕后,线程池最终进入TERMINATED状态,表明线程池已经彻底关闭,无法再进行任何操作。

线程池状态的变化过程遵循严格的条件判断和转换逻辑,例如在任务执行流程中,添加任务或销毁线程时都会检查当前的runState。此外,线程池还通过ctl变量合并了workerCount(工作线程数量)和runState的信息,以原子方式更新线程池的整体状态。

下面是一个简单的示例,演示了如何监控线程池的状态变化:

public class ThreadPoolStatusMonitor {
    private final ThreadPoolExecutor executor;

    public ThreadPoolStatusMonitor(ThreadPoolExecutor executor) {
        this.executor = executor;
        executor.addThreadFactory(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("MonitoringThread");
                return thread;
            }
        });

        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("Current pool status: " + getStateString(executor));
            if (executor.isTerminated()) {
                monitor.shutdown();
            }
        }, 11, TimeUnit.SECONDS);
    }

    private String getStateString(ThreadPoolExecutor executor) {
        switch (executor.getRunState()) {
            case RUNNING:
                return "RUNNING";
            case SHUTDOWN:
                return "SHUTDOWN";
            case STOP:
                return "STOP";
            case TIDYING:
                return "TIDYING";
            case TERMINATED:
                return "TERMINATED";
            default:
                return "UNKNOWN";
        }
    }
}

// 使用示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2460L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
new ThreadPoolStatusMonitor(executor);

// 添加一些任务...
executor.execute(() -> { /* 业务逻辑 */ });

// 后续调用 shutdown 或 shutdownNow 方法
executor.shutdown();

通过上述代码片段可以看到,我们创建了一个线程池状态监测器,定期打印线程池状态,并在检测到线程池终止后自动停止监测线程。这样可以直观地观察到线程池从创建到最终关闭整个生命周期内的状态变化情况。


线程池处理流程


线程池任务处理流程是ThreadPoolExecutor类的核心功能,其主要通过
execute(Runnable command)
方法实现。下面我们将深入剖析该方法内部的任务调度逻辑。

创建核心线程执行任务(corePoolSize)
当调用
execute()
方法提交任务时,首先检查当前活跃线程数是否小于核心线程数(corePoolSize)。如果是,则直接创建新的核心线程来执行这个新任务。核心线程在没有任务可执行时不会被销毁,除非设置了允许核心线程超时的选项。

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true)) // 核心线程,并尝试添加到工作队列中
        return;
}

将任务添加到任务队列(workQueue)
如果当前活跃线程数不小于核心线程数,接下来会尝试将任务放入阻塞队列(workQueue)中等待空闲的核心线程去执行。在这个阶段,会进行两次线程池状态检查:一次是在入队前,另一次是在成功入队后。这是因为在多线程环境下,线程池的状态可能会随时发生变化,因此需要二次检查以确保任务能够在正确状态下被执行。

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command)) // 检查状态并移除任务
        reject(command); // 执行拒绝策略
    else if (workerCountOf(recheck) == 0// 如果此时没有活动线程则创建非核心线程
        addWorker(nullfalse);
}

创建非核心线程执行任务(maximumPoolSize)
若任务无法放入任务队列,这可能是因为队列已满或线程池配置不允许放入更多任务。在这种情况下,线程池试图创建非核心线程来执行任务,但仅在总线程数未达到最大值(maximumPoolSize)的情况下才创建。

else if (!addWorker(command, false)) // 创建非核心线程执行任务
    reject(command); // 若创建失败则执行拒绝策略

拒绝策略
当线程池无法再接受新任务时(例如超过最大线程数且任务队列已满),则触发拒绝策略。Java提供了四种预定义的拒绝策略:

  • AbortPolicy:默认策略,抛出RejectedExecutionException异常。
  • DiscardPolicy:默默地丢弃任务,不做任何处理。
  • DiscardOldestPolicy:丢弃队列中最旧的任务(即最先加入队列的任务),然后重新尝试执行新任务。
  • CallerRunsPolicy:由调用线程执行被拒绝的任务。

总结整个处理流程

  1. 当线程数量不足corePoolSize时,优先创建核心线程执行任务。
  2. 线程数量满足corePoolSize时,将任务加入workQueue等待执行。
  3. workQueue已满且线程数量未达maximumPoolSize时,创建非核心线程执行任务。
  4. 若所有条件均无法接纳新任务,则根据设定的拒绝策略处理被拒绝的任务。

以下是一个简化的示例代码,展示如何使用线程池执行任务:

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2// 核心线程数
        5// 最大线程数
        60L// 空闲线程存活时间
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>() // 使用无界链式阻塞队列
    );

    for (int i = 0; i < 10; i++) {
        Runnable task = () -> System.out.println("Task " + Thread.currentThread().getName() + " is running");
        executor.execute(task);
    }

    executor.shutdown(); // 提交完所有任务后关闭线程池
}

这段代码创建了一个线程池,并提交了10个任务,根据上述任务处理流程,线程池会按照合适的方式安排这些任务的执行。


线程复用机制原理


线程复用机制是Java线程池实现高效并发处理的核心技术之一,其主要通过
ThreadPoolExecutor
类中的
Worker
工作线程类来完成。
Worker
不仅实现了
Runnable
接口,还是一个封装了线程和任务队列交互的实体。

在创建线程池时,首先会创建一定数量的核心线程(corePoolSize),这些线程会一直存活在线程池中,即使没有任务执行,除非设置了允许核心线程超时策略。当有新任务提交到线程池时,首先尝试将任务分配给这些核心线程。如果所有核心线程都在忙碌,且任务队列非空,则新提交的任务会被放入阻塞队列等待执行。

Worker
类的构造函数初始化了一个与之关联的
Thread
对象,并将其自身作为该线程的任务,即当调用
t.start()
启动这个线程时,实际执行的是
Worker.run()
方法。在
run()
方法中,
Worker
会不断地从阻塞队列中获取新的任务来执行,这个过程如下:

final void runWorker(Worker w) {
    // 获取当前运行的线程以及初始任务
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;

    // 清除firstTask并解锁,以便执行后续任务
    w.firstTask = null;
    w.unlock(); // allow interrupts

    try {
        // 无限循环,直到线程池停止或worker退出
        while (task != null || (task = getTask()) != null) {
            // 加锁并检查线程池状态,若已关闭则中断线程
            w.lock();
            // ... 状态判断及中断操作

            try {
                // 执行前置钩子方法
                beforeExecute(wt, task);

                // 执行任务
                task.run();

                // 执行后置钩子方法
                afterExecute(task, null);
            } catch (...) { ... }

            // 更新已完成任务计数并解锁
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    } finally {
        // 工作线程退出时进行资源清理
        processWorkerExit(w, completedAbruptly);
    }
}

getTask()
方法负责从阻塞队列中取出下一个待执行的任务。根据线程池配置,核心线程会使用
workQueue.take()
方法阻塞等待新任务;而非核心线程在keepAliveTime内未获得新任务时,会调用
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
方法尝试获取,超时后线程可能会被销毁。

下面是一个简化的示例代码片段,展示了如何利用线程池执行任务并实现线程复用:

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5// 核心线程数
        10// 最大线程数
        60L// 空闲线程存活时间
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>() // 阻塞队列
    );

    for (int i = 0; i < 20; i++) {
        final int taskId = i;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Task " + taskId + " running in thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟耗时任务
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    executor.shutdown(); // 提交完所有任务后关闭线程池
}

在这个例子中,线程池会根据需要创建并复用线程,每个任务都由线程池中的一个线程执行,任务完成后线程并不会立即销毁,而是继续从队列中获取下一个任务执行,从而达到复用的目的。同时,线程池内部管理确保了线程生命周期的合理控制,避免了频繁创建和销毁线程带来的开销。


总结


Java线程池原理的核心在于ThreadPoolExecutor类的实现,它通过合理管理线程生命周期、任务队列以及线程池状态来高效地执行并发任务。线程池利用核心线程和非核心线程复用机制,有效降低了系统资源消耗,控制了并发数量,并简化了线程的统一管理和异常处理。

首先,线程池通过构造方法设置参数如corePoolSize(核心线程数)、maximumPoolSize(最大线程数)和keepAliveTime(空闲线程存活时间),以及选择合适的阻塞队列workQueue(例如LinkedBlockingQueue、ArrayBlockingQueue或SynchronousQueue等)。通过这些参数,开发者可以根据应用需求灵活调整线程池的行为特性。

在处理任务时,
execute()
方法作为入口点,根据当前线程池状态和线程数决定如何调度任务:优先使用核心线程执行任务,当核心线程已满载时将任务放入阻塞队列;若阻塞队列也已满且线程总数未达到最大值,则创建非核心线程执行新任务;若超过最大线程数则采用预定义的拒绝策略(AbortPolicy、DiscardPolicy、DiscardOldestPolicy或CallerRunsPolicy)。

线程复用的关键在于Worker类的设计。每个Worker对象封装了一个Thread实例并实现了Runnable接口,其run()方法会持续从工作队列中获取任务并执行,实现了线程在完成一个任务后能够立即投入下一个任务的执行,从而避免了频繁创建和销毁线程带来的开销。

此外,线程池的状态机设计至关重要,包含RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED五个状态,分别对应不同的行为模式。例如,调用shutdown()方法后,线程池进入SHUTDOWN状态,不再接受新提交的任务但继续执行已在队列中的任务,直至所有任务执行完毕并通过terminated()方法转换为TERMINATED状态。

总之,在多线程编程中,Java线程池为我们提供了一种强大而灵活的工具,通过合理配置和管理线程池,不仅能有效提升程序性能,还能确保系统的稳定性和资源的有效利用。实际开发中,应当根据业务需求定制化线程池参数,并充分理解线程池的工作原理与任务调度逻辑,以便于编写出高并发、低资源占用的健壮代码。

示例代码:

// 创建固定大小的线程池,核心线程数等于最大线程数,无界任务队列
ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
    final int taskId = i;
    Runnable task = () -> System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
    executor.execute(task);
}

executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // 等待所有任务完成

// 示例展示了线程池如何接收多个任务并分配给线程执行,最终关闭线程池并确保所有任务都已完成。

这段代码展示了如何创建一个固定大小的线程池,并提交多个任务到线程池进行异步执行。通过调用
awaitTermination()
方法,主程序可以等待所有任务完成后再结束运行,确保了任务的正确完成和线程池的有序关闭。

本文使用
markdown.com.cn
排版

Agent是大模型的重要应用方向,而ReACT是学术界提出的重要方法,本文介绍ReACT论文,然后通过llama_index ReActAgent来分析ReACT的执行过程。

ReACT

《REACT: SYNERGIZING REASONING AND ACTING IN LANGUAGE MODELS》,由Shunyu Yao等人撰写,发表在2023年的ICLR会议上。论文探讨了如何在大型语言模型(LLMs)中结合推理(reasoning)和行动(acting)的能力,以提高模型在语言理解和交互式决策制定任务中的性能。

主要观点:

  • 大型语言模型(LLMs)在语言理解和交互式决策制定任务中表现出色,但它们的推理(例如链式思维提示)和行动(例如行动计划生成)能力通常被分开研究。
  • 作者提出了一种新的方法ReAct,它通过交错生成推理轨迹和特定于任务的行动,使两者之间产生更大的协同效应。推理轨迹帮助模型诱导、跟踪和更新行动计划,以及处理异常情况,而行动则允许它与外部资源(如知识库或环境)接口并收集额外信息。

具体步骤:

ReAct(Reasoning and Acting)是一种结合了推理(reasoning)和行动(acting)的方法,旨在提高大型语言模型(LLMs)在解决复杂任务时的性能和可解释性。ReAct的具体步骤如下:

  1. 定义任务和目标
    :首先,明确模型需要解决的任务和目标。这可能包括问题回答、事实验证、文本游戏或网页导航等。
  2. 生成推理轨迹
    :模型生成一系列推理轨迹,这些轨迹是模型内部的思考过程,用于解决问题。这些推理轨迹可能包括分解任务目标、提取重要信息、执行常识推理、指导搜索重构、跟踪进度和处理异常等。
  3. 执行行动
    :模型根据推理轨迹执行一系列行动。在ReAct中,行动是通过与外部环境(例如Wikipedia或其他知识库)的交互来完成的。行动可以是搜索信息、选择产品、选择选项或购买等。
  4. 交替推理和行动
    :在解决任务的过程中,模型会交替进行推理和行动。这意味着模型在执行每个行动后可能会生成新的推理轨迹,然后再执行新的行动,以此往复。
  5. 更新上下文
    :每次行动后,模型会更新上下文信息,这包括之前的行动、观察结果和新生成的推理轨迹。这种上下文更新帮助模型跟踪任务的当前状态,并为未来的推理和行动提供信息。
  6. 生成任务解决轨迹
    :通过上述步骤,模型生成一个包含行动、观察和推理轨迹的任务解决轨迹。这个轨迹不仅展示了模型如何解决问题,而且提供了模型决策的透明度,使得人类用户可以理解和评估模型的行为。
  7. 评估和调整
    :在实际应用中,模型生成的任务解决轨迹可能会被人类用户评估和调整。用户提供的反馈可以用来进一步指导模型的行为,或者在模型自身无法正确解决问题时进行干预。

ReAct的核心在于通过交错的推理和行动步骤,使模型能够在执行任务时动态地进行推理和行动,从而提高任务解决的准确性和效率。这种方法特别适用于需要与外部环境交互并从中获取信息以支持决策的任务。

以llama_index ReActAgent来看ReAct 执行过程

我们编写一个简单的ReActAgent程序,计算乘法,同样的没有openai的账号,我们用google的Gemini。

from llama_index.core.agent import ReActAgent  
from llama_index.core.tools import FunctionTool  
from llama_index.llms.gemini import Gemini  
  
  
# define sample Tool  
def multiply(a: int, b: int) -> int:  
    """Multiply two integers and returns the result integer"""  
    return a * b  
  
  
multiply_tool = FunctionTool.from_defaults(fn=multiply)  
  
# initialize llm  
llm = Gemini(api_key="AI...", transport="rest")  
  
# initialize ReAct agent  
agent = ReActAgent.from_tools([multiply_tool], llm=llm, verbose=True)  
  
resp = agent.query("计算85乘以9")  
  
print(resp.response)
  • 第一步,将tools放入prompts,要求LLM按照要求做Thought,然后出书符合要求的response:
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, content='\nYou are designed to help with a variety of tasks, from answering questions     to providing summaries to other types of analyses.\n\n## Tools\nYou have access to a wide variety of tools. You are responsible for using\nthe tools in any sequence you deem appropriate to complete the task at hand.\nThis may require breaking the task into subtasks and using different tools\nto complete each subtask.\n\nYou have access to the following tools:\n> Tool Name: multiply\nTool Description: multiply(a: int, b: int) -> int\nMultiply two integers and returns the result integer\nTool Args: {"type": "object", "properties": {"a": {"title": "A", "type": "integer"}, "b": {"title": "B", "type": "integer"}}, "required": ["a", "b"]}\n\n\n## Output Format\nPlease answer in the same language as the question and use the following format:\n\n```\nThought: The current language of the user is: (user\'s language). I need to use a tool to help me answer the question.\nAction: tool name (one of multiply) if using a tool.\nAction Input: the input to the tool, in a JSON format representing the kwargs (e.g. {"input": "hello world", "num_beams": 5})\n```\n\nPlease ALWAYS start with a Thought.\n\nPlease use a valid JSON format for the Action Input. Do NOT do this {\'input\': \'hello world\', \'num_beams\': 5}.\n\nIf this format is used, the user will respond in the following format:\n\n```\nObservation: tool response\n```\n\nYou should keep repeating the above format until you have enough information\nto answer the question without using any more tools. At that point, you MUST respond\nin the one of the following two formats:\n\n```\nThought: I can answer without using any more tools. I\'ll use the user\'s language to answer\nAnswer: [your answer here (In the same language as the user\'s question)]\n```\n\n```\nThought: I cannot answer the question with the provided tools.\nAnswer: [your answer here (In the same language as the user\'s question)]\n```\n\n## Current Conversation\nBelow is the current conversation consisting of interleaving human and assistant messages.\n\n', additional_kwargs={}), ChatMessage(role=<MessageRole.USER: 'user'>, content='计算85乘以9', additional_kwargs={})]

上述数据没有格式化,我们格式化后来看:

大体上就是让LLM做COT思考,然后根据用户当前的question,选择合适的tool进行计算。

  • LLM 思考后,回复:
assistant: Thought: The current language of the user is: chinese. I need to use a tool to help me answer the question.
Action: multiply
Action Input: {"a": 85, "b": 9}

可以看到,给出了Action的tool,以及action的input

  • llama_index 会对上面的输出进行解析
    process_actions
    ,得到:
[ActionReasoningStep(thought='The current language of the user is: chinese. I need to use a tool to help me answer the question.', action='multiply', action_input={'a': 85, 'b': 9}), ObservationReasoningStep(observation='765')]
  • 得到action step后,会进行执行,调用函数计算
cur_step_output = 765
  • 然后,开始next_step:
[TaskStep(task_id='1d8db1d0-f5e3-4bfa-bcef-8fc405f958ca', step_id='91e9a97a-5ae1-4900-bf77-690865976902', input=None, step_state={'is_first': False}, next_steps={}, prev_steps={}, is_ready=True)]
  • 来看step如何执行,同样的拼接prompt,
    更新上下文
    ,然后让大模型COT:
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, content='\nYou are designed to help with a variety of tasks, from answering questions     to providing summaries to other types of analyses.\n\n## Tools\nYou have access to a wide variety of tools. You are responsible for using\nthe tools in any sequence you deem appropriate to complete the task at hand.\nThis may require breaking the task into subtasks and using different tools\nto complete each subtask.\n\nYou have access to the following tools:\n> Tool Name: multiply\nTool Description: multiply(a: int, b: int) -> int\nMultiply two integers and returns the result integer\nTool Args: {"type": "object", "properties": {"a": {"title": "A", "type": "integer"}, "b": {"title": "B", "type": "integer"}}, "required": ["a", "b"]}\n\n\n## Output Format\nPlease answer in the same language as the question and use the following format:\n\n```\nThought: The current language of the user is: (user\'s language). I need to use a tool to help me answer the question.\nAction: tool name (one of multiply) if using a tool.\nAction Input: the input to the tool, in a JSON format representing the kwargs (e.g. {"input": "hello world", "num_beams": 5})\n```\n\nPlease ALWAYS start with a Thought.\n\nPlease use a valid JSON format for the Action Input. Do NOT do this {\'input\': \'hello world\', \'num_beams\': 5}.\n\nIf this format is used, the user will respond in the following format:\n\n```\nObservation: tool response\n```\n\nYou should keep repeating the above format until you have enough information\nto answer the question without using any more tools. At that point, you MUST respond\nin the one of the following two formats:\n\n```\nThought: I can answer without using any more tools. I\'ll use the user\'s language to answer\nAnswer: [your answer here (In the same language as the user\'s question)]\n```\n\n```\nThought: I cannot answer the question with the provided tools.\nAnswer: [your answer here (In the same language as the user\'s question)]\n```\n\n## Current Conversation\nBelow is the current conversation consisting of interleaving human and assistant messages.\n\n', additional_kwargs={}), ChatMessage(role=<MessageRole.USER: 'user'>, content='计算85乘以9', additional_kwargs={}), ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, content="Thought: The current language of the user is: chinese. I need to use a tool to help me answer the question.\nAction: multiply\nAction Input: {'a': 85, 'b': 9}", additional_kwargs={}), ChatMessage(role=<MessageRole.USER: 'user'>, content='Observation: 765', additional_kwargs={})]
  • 大模型回复:
assistant: Thought: I can answer without using any more tools. I'll use the user's language to answer
Answer: 765
  • 上述的回复已经没有新的action了,因此解析不出来action,next_steps就为空了,计算结束,返回答案
  • 这里的计算比较简单,一步action就搞定了,对于需要多步action的,会按照下面的流程图,递归的执行,直到得到
    Final Answer
    • 思考Thought
    • 得到要执行的Action
    • 执行Action,得到观测结果 Observation
    • 将上下文,连同Observation,让大模型继续思考Thought
    • 直到没有Action,给出最后的Filnal Answer

数据库作为存储数据的组件,数据的一致性一定是要保证的前提,今天给出两个场景来分析数据不一致的原因。

binlog同步模式导致主从不一致

在MYSQL 中主库向从库同步数据是利用binlog记录修改操作,然后将binlog传递给从库进行复制,binlog的格式有3种,

row
在对update,delete,insert语句进行记录时会进行修改的行数据进行记录。
row
格式的坏处在于比较占用空间,比如更新十万行数据,那么
row
格式将会把10万数据记录下来。

statement
只会将原始的sql语句记录下来。但是这种格式可能会引起主备不一致。

mixed
是前面两种格式的混合,MYSQL会自己去判断这条sql是不是会造成主备不一致,将引起主备不一致的sql记录成
row
格式。

statement 为什么会主备不一致?

举一个例子来说明下,statement主备不一致的原因,例如下面的sql

update navigation.t_account set id = uuid();

当你使用 类似uuid或者now这种动态函数时,那么在主库的生成结果将会和从库不同。造成数据的主备不一致。

为什么大多数时候我们还是用row

大多时候,我们还是用
row
格式写入binlog,这样带来的好处是便于
恢复数据
,下面我举例说明下,

  • 当你执行错delete语句
    ,能够通过binlog日志找到删除行的所有字段信息,不过需要注意的是,需要将
    binlog_row_image
    参数设置为
    FULL
    ,才会记录所有字段信息,如果设置为
    MINIMAL
    则只会记录删除字段信息。
  • 当你执行错update语句
    ,通过binlog记录的修改前后的整行数据,对数据进行恢复。
  • 当你执行错insert语句
    ,能够通过binlog找到插入数据的id,对错误插入的数据进行删除。

所以,为了避免主从不一致,还是选用
row
格式记录binblog吧,或者至少还是选用
mixed

主备切换导致主从不一致

第二种主从不一致的场景是发生在主备切换时,我先直接说下结论,主备切换方式其实分可靠性优先方式与可用性优先方式。

可靠性优先方式,MYSQL服务可能会存在短暂的不可提供服务的时间段,可用性优先则是保证MYSQL在切换过程中都是可用的。

为了方便,下面我将主主数据库称为master,从数据库称为slave。

可靠性优先方式

1,判断slave是否已经
seconds_behind_master
,是否小于5s或更短,
seconds_behind_master
代表主从同步延迟的时间,如果小于5s,则继续下一步。

2,修改master的
readonly
参数为true, 将master变为只读状态。

3,判断slave的主从同步延迟是否变为0,即seconds_behind_master 等于0,等于0后,继续下一步。

4,修改 slave的
readonly
参数改为false,将slave变为可读可写状态。

5,将业务请求转发到slave,原先master,修改为新master的从库。

这个切换过程,数据库是有一段时间不可写的,必须等待slave主从延迟同步变为0以后才行,所以这也是为什么要在
seconds_behind_master
在一个较小的值才开始进行主备切换的原因。

可用性优先方式

接着看下保证可用性优先的主备切换方式,在上述主备切换步骤中,我们去掉第三个步骤,也就是不等到主从同步完成就去切换主备。

现在假设现在的binlog为
row
格式。表定义为

mysql> CREATE TABLE `t` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `c` int(11) unsigned DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;

insert into t(c) values(1),(2),(3);

业务此时进行插入操作,

insert into t(c) values(4);
insert into t(c) values(5);

当在执行完第一个sql时,进行主备切换,且假设此时备库并没有完成第一条sql的同步。如下图所示,在插入4这条数据时,将slave改为可写,接着业务系统后续的写就往slave写入了5这条数据。
注意此时master的4这条数据还没有同步到slave。

image.png

接着开始准备更改主备关系,如下图所示,更改关系前,有可能slave才会进行来自master的4这条数据的写入,但是因为slave中已经有id为4的数据了,所以会导致插入失败。

image.png
修改slave为master后,插入到之前从库的(4,5)这条数据 会同步到新的slave主机(即旧master),但是这个时候也会因为旧master有id为4的这条数据导致同步失败。主备同步就会自动停止。

可以看到,最后主从数据库中有id等于4这条数据不一样。

所以,可用性优先的主备切换方式是有可能导致主备不一致的。

数据库最重要还是数据的正确性
,拿许多业务场景来说,如果数据错乱了,是较难恢复的,但是如果业务失败了,还可以通过重试重新填充数据,怕就怕成功一半,失败一半。所以主备切换的时候尽量还是可靠性优先方式比较好。

最后,

自荐一波✅:


欢迎朋友们关注我的公众号

〇、什么是协程 Coroutines ?

进程和线程太常见,本文就不再赘述了,直接一起看下什么是协程。如下图,先看下协程的定位:

关于
用户空间和内核空间
:进程运行起来就涉及到对内存资源的管理,然而内存资源有限,因此操作系统采用虚拟内存技术,把
进程虚拟地址空间划分成用户空间和内核空间
。所有
用户空间要访问硬件或执行 I/O 操作,必须经过内核进行操作
,这样无论再多的请求,也可保证了系统的高效稳定。

操作系统在
线程等待 IO 的时候,会阻塞当前线程,切换到其它线程
,这样在当前线程等待 IO 的过程中,其它线程可以继续执行。当系统线程较少的时候没有什么问题,但是
当线程数量非常多的时候,就会出问题

  • 一是,系统线程会
    占用非常多的内存空间
  • 二是,过多的
    线程切换会占用大量的系统时间

协程
刚好可以解决上述两个问题。协程运行在线程之上,当一个协程执行完成后,会主动让出,让另一个协程运行在当前线程之上。
协程并没有增加线程数量,只是在线程的基础之上通过分时复用的方式运行多个协程
,而且协程的切换在
用户态
完成,
切换的代价比线程从用户态到内核态的代价小很多

因此,在有
大量 I/O 操作业务
的情况下,我们采用协程替换线程,可以到达很好的效果,
一是降低了系统内存,二是减少了系统切换开销
,同时系统的性能也会提升。
对于计算密集型操作
,一般无需频繁切换线程,此时协程就是累赘,白白增加代码复杂度。

另外,在协程中尽量不要调用阻塞 I/O 的方法,比如打印,读取文件,Socket 接口等,除非改为异步调用的方式,并且协程只有在 I/O 密集型的任务中才会发挥作用。

详细可参考: https://zhuanlan.zhihu.com/p/172471249
https://zhuanlan.zhihu.com/p/337978321

一、Goroutine 是啥?

Goroutine 就是 Go 语言中的协程,也可以叫做轻量级线程,由 Go 运行时(runtime)管理
,它会智能地将 Goroutine 中的任务合理地分配给每个 CPU,这主要靠的是 Go 在语言层面已经
内置了调度和上下文切换
的机制。

Go 协程调度器有三个重要数据结构,简称为 GMP,这也是其并发调度的核心:

  • G(Goroutine):它是一个
    待执行的任务
    。Goroutine 可以快速创建和销毁,它们被设计用于执行并发任务。Goroutine 在逻辑上可以看作是一个函数或者一段代码,它们被放入队列中等待执行。
  • M(Machine):即操作系统的
    工作线程,是实际执行 Goroutine 的实体
    。每个 M 都与一个 P 绑定,从 P 的本地队列中获取 Goroutine 来执行。如果本地队列为空,M 会尝试从全局队列获取 Goroutine 或者从其他 P 的本地队列中“窃取”Goroutine 来执行。
  • P(Processor):表示处理器,它可以被看做
    运行在线程上的本地调度器
    。P 的数量通常与系统的 CPU 核心数相同,但这个数量可以通过环境变量来调整。P 负责管理 Goroutine 的调度,将 Goroutine 分配给与之绑定的 M 来执行。

大概的运行流程就是:当一个 M 线程可以执行任务时,它会首先检查与其关联的 P 的本地队列。如果本地队列为空,M 会尝试从全局队列中获取一批 Goroutine 放入本地队列,或者从其他 P 的本地队列中偷取一半 Goroutine 放入自己的本地队列。当 Goroutine 执行完成后,M 会从 P 获取下一个 Goroutine 继续执行。

单从线程调度讲,Go 语言相比起其他语言的优势在于,
操作系统(OS)线程是由 OS 内核来调度的,Goroutine 则是由 Go 运行时(runtime)自己的调度器调度的
,这个调度器使用一个称为 m:n 调度的技术(调度 m 个 Goroutine 到 n 个 OS 线程),多对多。其一大特点是
Goroutine 的调度是在用户空间下完成的,不涉及与内核空间之间的频繁切换
,包括内存的分配与释放,都是在用户空间维护的一块大的内存池中,不直接调用系统的 malloc 函数(malloc 函数用于动态分配内存,并返回分配内存的首地址),除非内存池需要改变,成本比调度 OS 线程低很多。另一方面充分利用了多核的硬件资源,近似的
把若干 Goroutine 均分在物理线程上, 再加上本身 Goroutine 的超轻量(初始版本为 2kb,后续升到 4kb、8kb,在 1.4 版本又减小到 2kb,不断优化)
,以上种种保证了 Go 调度方面的性能。

Go 语言中使用 Goroutine 非常简单,只需要
在调用函数的时候在前面加上 go 关键字
,就可以为一个函数创建一个 Goroutine。一个 Goroutine 必定对应一个函数,可以创建多个 Goroutine 去执行相同的函数。

下面将通过简单的示例看下如何创建 Goroutine。

1.1
启动单个 Goroutine 的示例

情况一:顺序执行,程序开始运行时开启一个 Goroutine,然后按照代码顺序逐个执行。

package main

import "fmt"

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	fmt.Println("begin-------------------!")
	hello()
	fmt.Println("main goroutine done!")
	fmt.Println("end---------------------!")
}

输出结果:

情况二:执行 hello() 函数时,通过 go 关键字,另开一个 Goroutine,其后代码仍然在主协程中运行。

package main

import "fmt"

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	fmt.Println("begin-------------------!")
	go hello()
	fmt.Println("main goroutine done!")
	fmt.Println("end---------------------!")
}

这一次的执行结果只打印了“main goroutine done!”,当 main() 函数返回的时候,该 Goroutine 就结束了,所有在 main() 函数中启动的 Goroutine 会一同结束,main 函数所在的 Goroutine 就像是权利的游戏中的夜王,其他的 Goroutine 都是异鬼,夜王一死它转化的那些异鬼也就全部 GG 了。

为了让分支协程能够正常输出,主协程需要等一等,就是下边的第三种情况。

情况三:在另开协程后,通过主协程等待一定时间,让副协程执行完成。

package main

import (
	"fmt"
	"time"
)

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	fmt.Println("begin-------------------!")
	go hello()
	fmt.Println("main goroutine done!")
	time.Sleep(time.Second * 1)
	fmt.Println("end---------------------!")
}

可以看到,先打印了“main goroutine done!”,是因为我们在创建新的 Goroutine 的时候需要花时间,而此时 main 函数所在的 Goroutine 是继续执行的。

1.2
启动多个 Goroutine 的示例

如下示例中的 sync.WaitGroup 是为了实现 Goroutine 的同步,本文后续章节会另外详细介绍。

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // Goroutine 结束就登记-1
	fmt.Println("Hello Goroutine!", i)
}
func main() {
	for i := 0; i < 5; i++ {
		wg.Add(1) // 启动一个 Goroutine 就登记+1
		go hello(i)
	}
	wg.Wait() // 等待所有登记的 Goroutine 都结束
}

多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为多个 Goroutine 是并发执行的,而 Goroutine 的调度是随机的。

1.3 关于 Goroutine 池

下面是一个代码示例:(目的是:随机生成一串数字,并计算一个数字的各个位数之和,例如数字 123,结果为 1+2+3=6)

注:由于示例中涉及管道等内容,本文后续章节会介绍。

package main

import (
	"fmt"
	"math/rand"
	"runtime"
)

type Job struct {
	Id      int // id
	RandNum int // 需要计算的随机数
}

type Result struct {
	job *Job // 这里必须传对象实例
	sum int  // 求和
}

func main() {
	// 需要2个管道
	jobChan := make(chan *Job, 128)       // 1.job管道
	resultChan := make(chan *Result, 128) // 2.结果管道
	createPool(10, jobChan, resultChan)   // 3.创建工作池,允许十个协程同时操作
	go func(resultChan chan *Result) {    // 4.开个打印的协程
		for result := range resultChan { // 监控结果管道,并实时打印
			fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
				result.job.RandNum, result.sum)
		}
	}(resultChan)
	var id int
	for { // 循环创建job,输入到管道
		id++
		r_num := rand.Int() // 生成随机数
		job := &Job{
			Id:      id,
			RandNum: r_num,
		}
		jobChan <- job // 将新鲜的 Job 对象,填入管道
		if id == 10 {  // 仅输出前十个
			runtime.Goexit()
		}
	}
}

// 创建工作池  参数1:指定开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
	for i := 0; i < num; i++ { // 根据开协程个数,去跑运行
		go func(jobChan chan *Job, resultChan chan *Result) {
			// 执行运算
			for job := range jobChan { // 遍历 job 管道所有数据,进行相加
				r_num := job.RandNum // 随机数接过来
				var sum int          // 定义返回值
				for r_num != 0 {     // 随机数逐位相加
					tmp := r_num % 10
					sum += tmp
					r_num /= 10
				}
				r := &Result{ // 结果 Result
					job: job,
					sum: sum,
				}
				resultChan <- r //运算结果填入结果管道
			}
		}(jobChan, resultChan)
	}
}

输出结果:

1.4 主协程退出后,其他全部副协程也将中断

如下示例代码,在后边的主协程退出后,由于副协程睡的时间(5秒)超过的主协程运行时间(2秒),“Sleep-end”将不会被打印:

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("begin-------------------!")
	go func() { // 直接在 main 中写函数体
		i := 0
		for {
			i++
			fmt.Printf("new goroutine: i = %d\n", i)
			time.Sleep(time.Second * 5)
			fmt.Println("new goroutine: Sleep-end")
		}
	}()
	i := 0
	for {
		i++
		fmt.Printf("main goroutine: i = %d\n", i)
		time.Sleep(time.Second)
		if i == 2 {
			break
		}
	}
	fmt.Println("end---------------------!")
}

在 Go 语言中,主协程(main Goroutine)的退出通常会导致程序的终止,即使还有其他的 Goroutine 正在运行。这是因为 Go 语言的设计是为了让程序的生命周期与主协程的生命周期绑定。当主协程结束时,它不会等待其他 Goroutine 完成任务,因此程序会直接退出,除非有额外的同步机制(比如:sync.WaitGroup)来确保主协程等待其他 Goroutine 完成。

二、runtime 包

2.1 runtime.Gosched() 切换到下一个待执行的 Goroutine

由于 Goroutine 的调度是由 Go 运行时管理的,因此在某些情况下,一个
Goroutine 可能会长时间占用 CPU 资源
,导致其他 Goroutine 无法得到足够的执行时间。

为了解决这个问题,Go 语言提供了 runtime.Gosched() 函数。
当调用该函数时,当前 Goroutine 会主动放弃 CPU 的控制权,并将执行权交给其他正在等待的 Goroutine。
这样,其他 Goroutine 就可以获得更多的执行时间,从而提高程序的整体性能。

需要注意的是,
runtime.Gosched() 函数只是让出 CPU 的控制权,并不会阻塞当前 Goroutine 的执行
。也就是说,即使调用了 runtime.Gosched(),当前 Goroutine 仍然会继续执行下去,直到完成其任务或遇到阻塞操作。

下面是一个简单的示例代码:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("begin-----------------!")
	go func(s string) { // 开启副协程
		for i := 0; i < 2; i++ {
			fmt.Println(s)
		}
	}("new Goroutine!")
	for i := 0; i < 2; i++ { // 主协程
		runtime.Gosched() // 切换到下一个 Goroutine,当前 Goroutine 进入等待状态
		fmt.Println("main Goroutine!")
	}
	fmt.Println("end-------------------!")
}

由输出结果可知,副协程先执行,主协程暂停后最终也执行了。

2.2 runtime.Goexit() 终止当前 Goroutine

在某些情况下,一个 Goroutine 可能会遇到错误或异常情况,需要立即停止执行并释放资源。这时,可以使用 runtime.Goexit() 函数来终止当前 Goroutine 的执行。

当调用 runtime.Goexit() 函数时,当前 Goroutine 会立即退出,不再执行后续的代码。同时,该函数还会将当前 Goroutine 的状态设置为“已结束”,并将其从调度队列中移除。这样,其他正在等待的 Goroutine 就可以继续执行,而不需要等待已经出现异常的 Goroutine 完成。

需要注意的是,runtime.Goexit() 函数只会终止当前 Goroutine 的执行,并不会关闭整个程序。也就是说,如果还有其他正在运行的 Goroutine,它们仍然会继续执行直到完成或遇到阻塞操作。

下面是一个简单的示例代码:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("begin-----------------!")
	go func() {
		defer fmt.Println("A.defer") // defer:延迟执行,等协程退出当前代码块时执行
		fmt.Println("A")
		func() {
			defer fmt.Println("B.defer")
			fmt.Println("B")
			runtime.Goexit() // 结束协程
			defer fmt.Println("C.defer")
			fmt.Println("C")
		}()
	}()
	for { // 阻塞主协程,等待副协程运行完成
	}
}

由输出结果可知,在函数 runtime.Goexit() 之前的 A 和 B 都正常输出了,结束协程后的 C 未输出。

2.3 runtime.GOMAXPROCS() 设置程序运行时可以使用的最大 CPU 核心数

在某些情况下,一个 Goroutine 可能会长时间占用 CPU 资源,导致其他 Goroutine 无法得到足够的执行时间。为了解决这个问题,Go 语言提供了 runtime.GOMAXPROCS 参数。当设置该变量的值时,Go 运行时会限制同时运行的 Goroutine 数量,使其不超过指定的 CPU 核心数。这样,每个 CPU 核心都可以被充分利用,从而提高程序的整体性能。

需要注意的是,runtime.GOMAXPROCS 变量的值应该根据实际的硬件配置和程序需求来设置。如果设置得过高,可能会导致系统资源的浪费;如果设置得过低,则可能无法充分发挥多核处理器的性能优势。

Go1.5 版本之前,默认使用的是单核心执行,从 Go1.5 版本开始,默认使用全部的 CPU 逻辑核心数,但都手动配置。

下面是一个简单的示例代码:

package main

import (
	"fmt"
	"runtime"
	"time"
)

func a() {
	fmt.Println("begin--a!")
	for i := 1; i < 5; i++ {
		fmt.Println("A:", i)
	}
	fmt.Println("end--a!")
}

func b() {
	fmt.Println("begin--b!")
	for i := 1; i < 5; i++ {
		fmt.Println("B:", i)
	}
	fmt.Println("end--b!")
}

func main() {
	fmt.Println("begin-----------------!")
	runtime.GOMAXPROCS(1) // 通过配置数字,允许多个系统线程处理操作
	go a()
	go b()
	time.Sleep(time.Second * 11)
	fmt.Println("end-------------------!")
}

配置为 1 时的输出示例:(当 runtime.GOMAXPROCS 为 1 时,方法 a() 和 b() 是串行的,顺序执行)

配置为 2 时的输出示例:(当 runtime.GOMAXPROCS 为 2 时,方法 a() 和 b() 是并行的,同时开始执行)

注意:并行执行有时候不会生效。由于调度器的优化、操作系统的多任务处理能力以及运行时环境的调度策略,仍然可能观察到看似“按顺序”执行的输出。这是因为虽然指令并行发射和执行,但打印操作(fmt.Println)本身是同步的,并且多个 Goroutine 的输出可能在内部缓冲,然后以某种顺序刷新到标准输出。

三、Channel 通道

如果使用共享内存进行数据交换,那么共享内存存在不同的 Goroutine 中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go 语言的并发模型是 CSP(Communicating Sequential Processes 通信顺序进程),提倡
通过通信
共享内存,而
不是通过共享内存
而实现通信。
CSP 是一种并发编程模型,它强调了并发实体之间的通信顺序,而不是它们执行的具体时序。

关于 CSP:
在 CSP 模型中,进程是并发执行的基本单位,每个进程都有自己的独立执行序列。进程之间不共享内存,而是通过通道进行通信。通道是一种特殊的数据结构,用于在进程之间传递消息。进程通过发送和接收操作与通道进行交互,从而实现数据的交换和同步。CSP模型提供了一种清晰、简洁的并发编程范式,适用于构建各种类型的并发系统。它强调了并发实体之间的通信顺序,而不是它们执行的具体时序,这有助于提高并发程序的正确性和可靠性。

如果说 Goroutine 是 Go 程序并发的执行体,Channel 就是它们之间的连接。Channel 是可以让一个 Goroutine 发送特定值到另一个 Goroutine 的通信机制。

Go 语言中的通道(Channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明 Channel 的时候需要为其指定元素类型。

3.1 声明和创建 Channel

Channel 是一种类型,一种引用类型,空值是 nil。声明通道类型的格式如下:

// 声明
var 变量 chan 元素类型
// 示例
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递 int切片 的通道

fmt.Println(ch1) // 输出:<nil>

声明的通道后需要使用 make 函数初始化之后才能使用。

创建 Channel 的格式如下:

make(chan Type, value)

其中,Type 表示要传递的数据类型,value 表示 Channel 的缓冲区大小。如果不指定缓冲区大小,默认为 0,表示非缓冲通道,这个本文后续章节会详细介绍。

// 创建通道的几个示例:
ch1 := make(chan int) // 无缓冲
ch2 := make(chan int, 2) // 缓冲区为 2
ch3 := make(chan bool)
ch4 := make(chan []int)

3.2 Channel 的操作 send、receive、close

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用 <- 符号。

// 创建 int 类型的通道
ch := make(chan int)
// send 发送数据进入通道
ch <- 10 // 把 10 发送到 ch 中
// receive 接收通道中的数据
x := <- ch // 从 ch 中接收值,并赋值给变量 x
<-ch       // 从 ch 中接收值,忽略结果
// close 关闭通道,是指关闭了入口,取值不受影响
close(ch)

关于关闭通道,需要注意的是,只有在通知接收方 Goroutine 所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,所以关闭通道不是必须的。

关闭后的通道有以下特点:

  • 对一个关闭的通道再发送值就会导致 panic;
  • 对一个关闭的通道进行接收,会一直获取值,直到通道为空;
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值;
  • 关闭一个已经关闭的通道会导致panic。

下面是一个
关闭通道
的示例代码:

package main

import (
	"fmt"
	"time"
)

func recv(c chan int) {
	for {
		ret := <-c
		time.Sleep(time.Second) // 一秒钟接收一个值
		fmt.Println("接收成功", ret)
	}
}
func main() {
	fmt.Println("begin-----------------!")
	ch := make(chan int, 5)
	go recv(ch) // 启用goroutine从通道接收值
	for i := 1; i < 6; i++ {
		ch <- i
	}
	fmt.Println("发送成功")
	time.Sleep(time.Second * 2) // 两秒钟后关闭通道 ch
	close(ch)
	time.Sleep(time.Second * 6)
	fmt.Println("end-------------------!")
}

由输出结果可知,在关闭通道后,接收方仍可以从通道内取值,当通道中没值时,返回零值。

3.3 Channel 通道的有缓冲和无缓冲

3.3.1 无缓冲的通道

无缓冲的通道又称为阻塞的通道。

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../main.go:9 +0x28
exit status 2

为什么会报错?因为上边使用 ch := make(chan int) 创建的是无缓冲的通道,只有在有对象接收值的时候才能发送值。就像快递员给你送大件的货,是必须要有人接收的,家里没人的话他就不送了。

由于代码中只有发送没有接收,所以在 ch <- 10 这一行代码形成死锁。

若想不发生死锁,就得再开一个协程来接收通道的值:

package main

import (
	"fmt"
	"time"
)

func recv(c chan int) {
	ret := <-c
	time.Sleep(time.Second) // 模拟耗时操作
	fmt.Println("接收成功", ret)
}
func main() {
	fmt.Println("begin-----------------!")
	ch := make(chan int)
	go recv(ch) // 启用 Goroutine 从通道接收值
	ch <- 10
	fmt.Println("发送成功")
	time.Sleep(time.Second * 2) // 等待副协程操作完成
	fmt.Println("end-------------------!")
}

无缓冲的通道如下图,
在管道的两端必须一个发送,一个接收

无缓冲通道上的发送操作会阻塞,直到另一个 Goroutine 在该通道上执行接收操作,这时值才能发送成功,两个 Goroutine 将继续执行。相反,如果接收操作先执行,接收方的 Goroutine 将阻塞,直到另一个 Goroutine 在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

3.3.2 有缓冲的通道

在无解暂时无接收方的时候,要想不报错就可以创建有缓冲的通道。

在使用 make 函数初始化通道的时候为其指定通道的容量,例如:

package main

import (
	"fmt"
)

func main() {
	fmt.Println("begin-----------------!")
	ch := make(chan int, 1) // 创建一个容量为 1 的有缓冲区通道
	ch <- 10
	fmt.Println("发送成功")
	fmt.Println("end-------------------!")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。

另外,可以使用内置的 len 函数获取通道内元素的数量,使用 cap 函数获取通道的容量。

package main

import (
	"fmt"
)

func main() {
	fmt.Println("begin-----------------!")
	ch := make(chan int, 2) // 创建一个容量为 2 的有缓冲区通道
	ch <- 10
	fmt.Println("len:", len(ch))
	fmt.Println("cap:", cap(ch))
	fmt.Println("end-------------------!")
}

3.4 判断通道是否已经关闭

当通道已经关闭,再取值时,会返回零值,若程序仍然按照正常的流程去处理就会有问题,所以,就需要添加判断,来实时了解通道是否已经关闭。

下面是一段示例代码,其中包含了两种判断的方法:

package main

import (
	"fmt"
)

func main() {
	fmt.Println("begin-----------------!")
	ch1 := make(chan int)
	ch2 := make(chan int)
	// 开启 Goroutine 将 0~5 的数发送到 ch1 中
	go func() {
		for i := 0; i < 6; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	// 开启 Goroutine 从 ch1 中接收值,并将该值的平方发送到 ch2 中
	go func() {
		for {
			i, ok := <-ch1 // 【方法一】通道关闭后再取值 ok=false
			fmt.Println(ok)
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()
	// 在主 Goroutine 中从 ch2 中接收值打印
	for i := range ch2 { // 【方法二】通道关闭后会退出 for range 循环
		fmt.Println(i)
	}
	fmt.Println("end-------------------!")
}

其中 for range 方式更加常用。

3.5 单向通道

有的时候我们会
将通道作为参数
在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。Go 语言中提供了单向通道来处理这种情况。

下面是一段的示例代码:

package main

import (
	"fmt"
)

// chan<- int 是一个只能发送的通道,可以发送但是不能接收
func counter(out chan<- int) {
	for i := 0; i < 5; i++ {
		out <- i
	}
	close(out)
}
// <-chan int 是一个只能接收的通道,可以接收但是不能发送
func squarer(out chan<- int, in <-chan int) {
	for i := range in {
		out <- i * i
	}
	close(out)
}
func printer(in <-chan int) { // <-chan int 是一个只能接收的通道,可以接收但是不能发送
	for i := range in {
		fmt.Println(i)
	}
}

func main() {
	fmt.Println("begin-----------------!")
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)
	go squarer(ch2, ch1)
	printer(ch2)
	fmt.Println("end-------------------!")
}

在函数传参及任何赋值操作中,将双向通道转换为单向通道是可以的,但反过来是不可以的。

3.6 通道操作各种情况汇总

四、并发安全和锁

4.1 并发操作导致数据问题的出现

有时候在 Go 代码中可能会存在多个 Goroutine 同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。

下面是一段示例代码,新分配两个副协程一起执行 add() 函数:

package main

import (
	"fmt"
	"sync"
)

var x int64
var wg sync.WaitGroup

func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add() // 副协程一
	go add() // 副协程二
	wg.Wait()
	fmt.Println(x)
}

执行结果:

上述代码执行了三次,每次的结果都不同,明显已经偏离了目标。

在 add() 函数中,对全局变量 x 的访问和修改没有进行同步保护,这可能导致竞态条件(race condition)。当两个 Goroutine 同时访问和修改 x 时,它们的操作可能会相互干扰,导致结果不正确。如何解决呢,下面来介绍下互斥锁。

4.2 互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间,仅有一个 Goroutine 可以访问共享资源,其他 Goroutine 等待。当有多个 Goroutine 同时等待一个锁时,唤醒的策略是随机的。

Go 语言中的互斥锁(Mutex)是一种同步原语,用于保护共享资源的访问,可以使用 sync.Mutex 结构体来实现互斥锁。

下面代码就是对 4.1 中代码的优化,添加了 sync.Mutex 互斥锁:

package main

import (
	"fmt"
	"sync"
)

var x int64
var wg sync.WaitGroup
var mutex sync.Mutex // 互斥锁的声明

func add() {
	for i := 0; i < 5000; i++ {
		mutex.Lock() // 加锁
		x = x + 1
		mutex.Unlock() // 解锁
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

输出结果始终为正确数值:

4.3 互斥锁的另一种情况:读写锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用 sync.RWMutex 类型。

读写锁分为两种:读锁和写锁。

  • 当一个 Goroutine 获取
    读锁之后
    ,其他的 Goroutine 如果是获取读锁会继续获得锁,如果是
    获取写锁就会等待
  • 当一个 Goroutine 获取
    写锁之后
    ,其他的 Goroutine
    无论是获取读锁还是写锁都会等待

下面是一个可以操作互斥锁和读写锁操作效率对比的示例代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	x       int64
	wg      sync.WaitGroup
	mutex   sync.Mutex
	rwmutex sync.RWMutex
)

func write() {
	// mutex.Lock()   // 加互斥锁
	rwmutex.Lock() // 加写锁
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假设写操作耗时 10 毫秒
	rwmutex.Unlock()                  // 解写锁
	// mutex.Unlock()                     // 解互斥锁
	wg.Done()
}

func read() {
	// mutex.Lock()                  // 加互斥锁
	rwmutex.RLock()              // 加读锁
	time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
	rwmutex.RUnlock()            // 解读锁
	// mutex.Unlock()                // 解互斥锁
	wg.Done()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
	end := time.Now()
	// fmt.Println("互斥锁---------:",end.Sub(start))
	fmt.Println("互斥锁-读写锁--:", end.Sub(start))
}

如下图,分别是用互斥锁和读写锁的时效,明显看出,当读多写少(1000:10)时,效果还是比较明显的:

特别注意,读写锁非常适合
读多写少
的场景,如果
读和写的操作差别不大,读写锁的优势就发挥不出来

五、sync 包

5.1 sync.WaitGroup:等待一组 Goroutine 执行完毕

在代码中生硬的使用 time.Sleep 肯定是不合适的,Go 语言中可以使用 sync.WaitGroup 来实现并发任务的同步。

sync.WaitGroup有以下几个方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器 +delta
(wg *WaitGroup) Done() 计数器 -1
(wg *WaitGroup) Wait() 阻塞直到计数器变为 0

sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加 N。每个任务完成时通过调用 Done() 方法将计数器减 1。通过调用 Wait() 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经全部完成。

下面是一段示例代码:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello() {
	defer wg.Done() // 执行完后续代码后,计数器 -1
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)  // 计数器 +1
	go hello() // 启动另外一个 Goroutine 去执行 hello() 函数
	fmt.Println("main Goroutine Done!")
	wg.Wait() // 等待计数器为 0
}

另外,要注意 sync.WaitGroup 是一个结构体,传递的时候要传递指针。

5.2 sync.Once:使得指定函数在并发环境下仅执行一次

在编程的很多场景下,我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、连接数据库、只关闭一次通道等。

Go 语言中的 sync 包中提供了一个针对只执行一次场景的解决方案,就是:sync.Once。

sync.Once 只有一个 Do 方法,其签名如下:

func (o *Once) Do(f func()) {}

延迟执行一个开销很大的初始化操作,到真正用到它的时候再执行,是一个很好的思路。因为预先初始化一个变量(比如在 init 函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就是非必须的。

下面是一个加载配置文件的示例代码:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"sync"
)

var once sync.Once
var config map[string]string
var wait sync.WaitGroup

func main() {
	fmt.Println("begin-----------------!")
	for i := 0; i < 5; i++ {
		wait.Add(1)
		go loadConfig()
	}
	wait.Wait()
	fmt.Println("end-------------------!")
}

func loadConfig() {
	fmt.Println("尝试加载配置文件")
	once.Do(func() {
		file, err := os.Open("config.json") // 配置文件和 main.go 同目录,直接读取
		if err != nil {
			panic(err)
		}
		defer file.Close()

		bytes, err := io.ReadAll(file)
		if err != nil {
			panic(err)
		}
		config = make(map[string]string) // 声明保存配置文件的内容,键值对形式
		err = json.Unmarshal(bytes, &config)
		fmt.Println(config)
		fmt.Println("配置文件已加载")
		if err != nil {
			panic(err)
		}
	})
	wait.Done()
}

json 配置文件的内容示例:

{"key1":"value1","key2":"value2","key3":"value3"}

输出结果:(由结果可知,尽管有多次尝试加载配置文件,但是最后还是只加载了一次)

sync.Once 其实
内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成
。这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会被执行多次。

5.3 sync.Map:并发安全的无序键值对 map

Go 语言中内置的 map(map 是一种无序的键值对数据结构) 不是并发安全的。

请看下面关于
map 并发测试报异常
的示例:

package main

import (
	"fmt"
	"strconv"
	"sync"
)

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	fmt.Println("begin-----------------!")
	wg := sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println("end-------------------!")
}

如下输出结果:(当同时开三个协程时,有时可以正常输出,有时就报错了)

像上边这种场景下,就需要为 map 加锁来保证并发的安全性了,Go 语言的 sync 包中提供了一个
开箱即用的并发安全版 map:sync.Map
。开箱即用表示
不用像内置的 map 一样使用 make 函数初始化才能使用

同时 sync.Map 内置了必要的操作方法和属性,如下:

  • Store(key, value interface{}):
    存储
    键值对。如果键已经存在,更新其对应的值;
  • Load(key interface{}) (value interface{}, ok bool):
    获取
    键对应的值。如果键不存在,返回 nil 和一个布尔值 false;如果键存在,返回键对应的值和一个布尔值 true;
  • Delete(key interface{}):
    删除
    键及其对应的值;
  • Range(f func(key, value interface{}) bool):
    遍历
    ,对于每个键值对,调用传入的函数 f。如果函数返回 false,停止遍历;
  • Len() int:
    键值对的数量

下面是运用 sync.Map 对上部分代码的优化,确保了程序的稳定运行:

package main

import (
	"fmt"
	"strconv"
	"sync"
)

var m = sync.Map{}

func main() {
	fmt.Println("begin-----------------!")
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)         // Store 存储
			value, _ := m.Load(key) // Load 获取
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println("end-------------------!")
}

参考:
https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/

简介

egui
(发音为“e-gooey”)是一个简单、快速且高度可移植的 Rust
即时模式
GUI 库,跨平台、Rust原生,适合一些小工具和游戏引擎GUI:
文档:
https://docs.rs/egui/latest/egui/
演示:
https://www.egui.rs/#demo
github:
https://github.com/emilk/egui

关于即时模式GUI,可以参考
使用C++界面框架ImGUI开发一个简单程序
里面的介绍,
ImGUI
是C++的一个即时模式GUI库。

image

简单示例

创建项目

首先使用cargo工具快速构建项目:

cargo new eguitest

然后添加依赖:

cargo add eframe

egui只是一个图形库
,而不是图形界面开发框架,
eframe是与egui配套使用的图形框架

为了静态插入图片,还需要增加
egui_extras
依赖:

cargo add egui_extras

然后在
Cargo.toml
文件中编辑
features

egui_extras = { version = "0.26.2", features = ["all_loaders"] }

界面设计

打开src/main.rc,编写第一个eframe示例程序:

//隐藏Windows上的控制台窗口
#![windows_subsystem = "windows"]

use eframe::egui;

fn main() -> Result<(), eframe::Error> {
    // 创建视口选项,设置视口的内部大小为320x240像素
    let options = eframe::NativeOptions {
        viewport: egui::ViewportBuilder::default().with_inner_size([320.0, 240.0]),
        ..Default::default()
    };

    // 运行egui应用程序
    eframe::run_native(
        "My egui App", // 应用程序的标题
        options, // 视口选项
        Box::new(|cc| {
            // 为我们提供图像支持
            egui_extras::install_image_loaders(&cc.egui_ctx);
            // 创建并返回一个实现了eframe::App trait的对象
            Box::new(MyApp::new(cc))
        }),
    )
}

//定义 MyApp 结构体
struct MyApp {
    name: String,
    age: u32,
}

//MyApp 结构体 new 函数
impl MyApp {
    fn new(cc: &eframe::CreationContext<'_>) -> Self {        
        // 结构体赋初值
        Self {
            name: "Arthur".to_owned(),
            age: 42,
        }
    }
}

//实现 eframe::App trait 
impl eframe::App for MyApp {
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        // 在中央面板上显示egui界面
        egui::CentralPanel::default().show(ctx, |ui| {
            // 显示标题
            ui.heading("My egui Application"); 
            // 创建一个水平布局
            ui.horizontal(|ui| {
                // 显示姓名标签
                let name_label = ui.label("Your name: "); 
                // 显示姓名输入框(单行文本框)
                ui.text_edit_singleline(&mut self.name) 
                    .labelled_by(name_label.id); // 关联标签
            });

            // 显示年龄滑块
            ui.add(egui::Slider::new(&mut self.age, 0..=120).text("age")); 

            if ui.button("Increment").clicked() {
                // 点击按钮后将年龄加1
                self.age += 1;
            }

            // 显示问候语
            ui.label(format!("Hello '{}', age {}", self.name, self.age));            
            // 显示图片,图片放在main.rs的同级目录下(可以自定义到其它目录)
            ui.image(egui::include_image!("ferris.png")); 
        });
    }
}

运行结果如下:
image

切换主题

egui提供了明亮、暗黄两种主题,在APP结构体上添加
theme_switcher
方法:

impl MyApp {
    // 切换主题
    fn theme_switcher(&mut self, ui: &mut egui::Ui, ctx: &egui::Context) {
        ui.horizontal(|ui| {
            if ui.button("Dark").clicked() {
                ctx.set_visuals(egui::Visuals::dark());
            }
            if ui.button("Light").clicked() {
                ctx.set_visuals(egui::Visuals::light());
            }
        });
    }
}

然后在update函数中调用:

egui::CentralPanel::default().show(ctx, |ui| {
   //...
   // 切换主题
   self.theme_switcher(ui, ctx);
   // 显示图片
   ui.image(egui::include_image!("ferris.png")); 
});

egui的
Style
结构体可以自定义主题,不过一般默认主题就够用了。

自定义字体

egui默认不支持中文,实现一个
setup_custom_fonts
函数:

//自定义字体
fn setup_custom_fonts(ctx: &egui::Context) {
    // 创建一个默认的字体定义对象
    let mut fonts = egui::FontDefinitions::default();

    //安装的字体支持.ttf和.otf文件
    //文件放在main.rs的同级目录下(可以自定义到其它目录)
    fonts.font_data.insert(
        "my_font".to_owned(),
        egui::FontData::from_static(include_bytes!(
            "msyh.ttc"  
        )),
    );

    // 将字体添加到 Proportional 字体族的第一个位置
    fonts
        .families
        .entry(egui::FontFamily::Proportional)
        .or_default()
        .insert(0, "my_font".to_owned());

    // 将字体添加到 Monospace 字体族的末尾
    fonts
        .families
        .entry(egui::FontFamily::Monospace)
        .or_default()
        .push("my_font".to_owned());

    // 将加载的字体设置到 egui 的上下文中
    ctx.set_fonts(fonts);
}

然后再MyApp结构体的new方法中调用:

//...
impl MyApp {
    fn new(cc: &eframe::CreationContext<'_>) -> Self {
        //加载自定义字体
        setup_custom_fonts(&cc.egui_ctx);     
        //...
    }
}
//...

运行结果:
image

自定义图标

先导入image库,在终端中运行:

cargo add image

还需要导入std::sync::Arc、eframe::egui::IconData ,库引入区如下:

use eframe::egui;
use eframe::egui::IconData;
use std::sync::Arc;
use image;

在main()函数中将
native_options
的声明改为可变变量的声明,并加入改变图标代码:

fn main() -> Result<(), eframe::Error> {
    // 创建视口选项,设置视口的内部大小为320x240像素
    let mut options = eframe::NativeOptions {
        viewport: egui::ViewportBuilder::default().with_inner_size([320.0, 240.0]),
        ..Default::default()
    };

    //导入图标,图片就用上面的
    let icon_data = include_bytes!("ferris.png");
    let img = image::load_from_memory_with_format(icon_data, image::ImageFormat::Png).unwrap();
    let rgba_data = img.into_rgba8();
    let (width, height) =(rgba_data.width(),rgba_data.height());
    let rgba: Vec<u8> = rgba_data.into_raw();
    options.viewport.icon=Some(Arc::<IconData>::new(IconData { rgba, width, height}));
    
    // ...    
}

经典布局

在上面示例的基础上,实现一个上中下或左中右的经典三栏布局,main函数不需要修改,只需要修改MyApp结构体的定义即可。

定义导航变量

先定义一个导航枚举,用来在标记当前要显示的界面:

//导航枚举
enum Page {
    Test,
    Settings,
}

为了方便理解示例,在 MyApp 中只定义一个 page 字段,并同步修改new函数:

//定义 MyApp 结构体
struct MyApp {
    page:Page,
}
//MyApp 结构体 new 函数
impl MyApp {
    fn new(cc: &eframe::CreationContext<'_>) -> Self {
        setup_custom_fonts(&cc.egui_ctx);     
        // 结构体赋初值
        Self {
            page:Page::Test,
        }
    }
}

实现导航界面

在 MyApp 中定义导航栏的界面,

impl MyApp {  

    //左侧导航按钮,egui没有内置树控件,有需要可以自己实现
    fn left_ui(&mut self, ui: &mut egui::Ui)  {   
        //一个垂直布局的ui,内部控件水平居中并对齐(填充全宽)
        ui.vertical_centered_justified(|ui| {          
            
            if ui.button("测试").clicked() {
                self.page=Page::Test;
            }

            if ui.button("设置").clicked() {
                self.page=Page::Settings;
            }
            //根据需要定义其它按钮
        });
    }

    //...其它方法
}

实现导航逻辑

在 MyApp 中定义一个 show_page 方法来进行界面调度,每个界面再单独实现自己的UI函数

impl MyApp {  
    //...其它方法

    //根据导航显示页面
    fn show_page(&mut self, ui: &mut egui::Ui)  {   

        match self.page {
            Page::Test => {
                self.test_ui(ui);
            }
            Page::Settings => {
                //...
            }
        }       
    }

    //为了方便理解示例这里只显示一张图片
    fn test_ui(&mut self, ui: &mut egui::Ui)  {         
        ui.image(egui::include_image!("ferris.png"));
    }

    //...其它方法
}

实现主框架布局

在 MyApp 中间实现 main_ui 方法,可以根据自己的需要调整各个栏的位置:

impl MyApp {  
    //...其它方法
    //主框架布局
    fn main_ui(&mut self, ui: &mut egui::Ui)  {        
        // 添加面板的顺序非常重要,影响最终的布局
        egui::TopBottomPanel::top("top_panel")
        .resizable(true)
        .min_height(32.0)
        .show_inside(ui, |ui| {
            egui::ScrollArea::vertical().show(ui, |ui| {
                ui.vertical_centered(|ui| {
                    ui.heading("标题栏");
                });
                ui.label("标题栏内容");
            });
        });

        egui::SidePanel::left("left_panel")
        .resizable(true)
        .default_width(150.0)
        .width_range(80.0..=200.0)
        .show_inside(ui, |ui| {
            ui.vertical_centered(|ui| {
                ui.heading("左导航栏");
            });
            egui::ScrollArea::vertical().show(ui, |ui| {
                self.left_ui(ui);
            });
        });

        egui::SidePanel::right("right_panel")
        .resizable(true)
        .default_width(150.0)
        .width_range(80.0..=200.0)
        .show_inside(ui, |ui| {
            ui.vertical_centered(|ui| {
                ui.heading("右导航栏");
            });
            egui::ScrollArea::vertical().show(ui, |ui| {
                ui.label("右导航栏内容");
            });
        });

        egui::TopBottomPanel::bottom("bottom_panel")
        .resizable(false)
        .min_height(0.0)
        .show_inside(ui, |ui| {
            ui.vertical_centered(|ui| {
                ui.heading("状态栏");
            });
            ui.vertical_centered(|ui| {
                ui.label("状态栏内容");
            });
        });

        egui::CentralPanel::default().show_inside(ui, |ui| {
            ui.vertical_centered(|ui| {
                ui.heading("主面板");
            });
            egui::ScrollArea::vertical().show(ui, |ui| {
                ui.label("主面板内容");

                self.show_page(ui);
            });
        });
    }       
}

调试运行

在 main 函数中稍微调整一下窗口大小:

// 创建视口选项
let mut options = eframe::NativeOptions {
    viewport: egui::ViewportBuilder::default().with_inner_size([1000.0, 500.0]),
    ..Default::default()
};

在 update 函数中调用 main_ui 函数:

impl eframe::App for MyApp {
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        //设置主题
        ctx.set_visuals(egui::Visuals::dark());
        // 在中央面板上显示egui界面
       egui::CentralPanel::default().show(ctx, |ui| {
        self.main_ui(ui); 
       });        
    }
}

运行结果如下:
image

参考资料