2024年3月

一、摘要

在上篇文章中,我们介绍了
Future
相关的用法,使用它可以获取异步任务执行的返回值。

我们再次回顾一下
Future
相关的用法。

public class FutureTest {

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        // 创建一个线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);

        // 提交任务并获得Future的实例
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 执行下载某文件任务,并返回文件名称
                System.out.println("thread name:" +  Thread.currentThread().getName() + " 开始执行下载任务");
                Thread.sleep(200);
                return "xxx.png";
            }
        });

        //模拟主线程其它操作耗时
        Thread.sleep(300);

        // 通过阻塞方式,从Future中获取异步执行返回的结果
        String result = future.get();
        System.out.println("任务执行结果:" +  result);
        System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms");

        // 任务执行完毕之后,关闭线程池
        executor.shutdown();
    }
}

运行结果如下:

thread name:pool-1-thread-1 开始执行下载任务
任务执行结果:xxx.png
总共用时:308ms

如果不采用线程执行,那么总共用时应该会是 200 + 300 = 500 ms,而采用线程来异步执行,总共用时是 308 ms。不难发现,通过
Future
和线程池的搭配使用,可以有效的提升程序的执行效率。

但是
Future
对异步执行结果的获取并不是很友好,要么调用阻塞方法
get()
获取结果,要么轮训调用
isDone()
方法是否等于
true
来判断任务是否执行完毕来获取结果,这两种方法都不算很好,因为主线程会被迫等待。

因此,从 Java 8 开始引入了
CompletableFuture
,它针对
Future
做了很多的改进,在实现
Future
接口相关功能之外,还支持传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象方法。

下面我们一起来看看
CompletableFuture
相关的用法!

二、CompletableFuture 用法介绍

我们还是以上面的例子为例,改用
CompletableFuture
来实现,内容如下:

public class FutureTest2 {

    public static void main(String[] args) throws Exception {
        // 创建异步执行任务
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(FutureTest2::download);

        // 如果执行成功,回调此方法
        cf.thenAccept((result) -> {
            System.out.println("任务执行成功,返回结果值:" +  result);
        });

        // 如果执行异常,回调此方法
        cf.exceptionally((e) -> {
            System.out.println("任务执行失败,原因:" +  e.getMessage());
            return null;
        });

        //模拟主线程其它操作耗时
        Thread.sleep(300);
    }

    /**
     * 下载某个任务
     * @return
     */
    private static String download(){
        // 执行下载某文件任务,并返回文件名称
        System.out.println("thread name:" +  Thread.currentThread().getName() + " 开始执行下载任务");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "xxx.png";
    }
}

运行结果如下:

thread name:ForkJoinPool.commonPool-worker-1 开始执行下载任务
任务执行成功,返回结果值:xxx.png

可以发现,采用
CompletableFuture
类的
supplyAsync()
方法进行异步编程,代码上简洁了很多,不需要单独创建线程池。

实际上,
CompletableFuture
也使用了线程池来执行任务,部分核心源码如下:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

    // 判断当前机器 cpu 可用逻辑核心数是否大于1
    private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
    
    // 默认采用的线程池
    // 如果useCommonPool = true,采用 ForkJoinPool.commonPool 线程池
    // 如果useCommonPool = false,采用 ThreadPerTaskExecutor 执行器
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    // ThreadPerTaskExecutor执行器类
    static final class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
    }


    // 异步执行任务的方法
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    // 异步执行任务的方法,支持传入自定义线程池
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
}

从源码上可以分析出如下几点:

  • 当前机器 cpu 可用逻辑核心数大于 1
    ,默认会采用
    ForkJoinPool.commonPool()
    线程池来执行任务
  • 当前机器 cpu 可用逻辑核心数等于 1
    ,默认会采用
    ThreadPerTaskExecutor
    类来执行任务,它是个一对一执行器,每提交一个任务会创建一个新的线程来执行
  • 同时也支持用户传入自定义线程池来异步执行任务

其中
ForkJoinPool
线程池是从 JDK 1.7 版本引入的,它是一个全新的线程池,后面在介绍
Fork/Join
框架文章中对其进行介绍。

除此之外,
CompletableFuture
为开发者还提供了几十种方法,以便满足更多的异步任务执行的场景。这些方法包括
创建异步任务、任务异步回调、多个任务组合处理
等内容,下面我们就一起来学习一下相关的使用方式。

2.1、创建异步任务

CompletableFuture
创建异步任务,常用的方法有两个。

  • runAsync()
    :执行异步任务时,没有返回值
  • supplyAsync()
    :执行异步任务时,可以带返回值

runAsync()

supplyAsync()
方法相关的源码如下:

// 使用默认内置线程池执行任务,根据runnable构建执行任务,无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)

// 使用自定义线程池执行任务,根据runnable构建执行任务,无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

// 使用默认内置线程池执行任务,根据supplyAsync构建执行任务,可以带返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 使用自定义线程池执行任务,根据supplyAsync构建执行任务,可以带返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

两者都支持使用自定义的线程池来执行任务,稍有不同的是
supplyAsync()
方法的入参使用的是
Supplier
接口,它表示
结果的提供者,该结果返回一个对象且不接受任何参数,支持通过 lambda 语法简写

下面我们一起来看看相关的使用示例!

2.1.1、runAsync 使用示例
public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<Void> cf = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
            System.out.println("runAsync,执行完毕");
        }
    });
    System.out.println("runAsync,任务执行结果:" + cf.get());
}

输出结果:

runAsync,执行完毕
runAsync,任务执行结果:null
2.1.2、supplyAsync 使用示例
public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        return "hello world";
    });
    System.out.println("supplyAsync,任务执行结果:" + cf.get());
}

输出结果:

supplyAsync,执行完毕
supplyAsync,任务执行结果:hello world

2.2、任务异步回调

当创建的异步任务执行完毕之后,我们希望拿着上一个任务的执行结果,继续执行后续的任务,此时就可以采用回调方法来处理。

CompletableFuture
针对任务异步回调做了很多的支持,常用的方法如下:

  • thenRun()/thenRunAsync()
    :它表示上一个任务执行成功后的回调方法,无入参,无返回值
  • thenAccept()/thenAcceptAsync()
    :它表示上一个任务执行成功后的回调方法,有入参,无返回值
  • thenApply()/thenApplyAsync()
    :它表示上一个任务执行成功后的回调方法,有入参,有返回值
  • whenComplete()/whenCompleteAsync()
    :它表示任务执行完成后的回调方法,有入参,无返回值
  • handle()/handleAsync()
    :它表示任务执行完成后的回调方法,有入参,有返回值
  • exceptionally()
    :它表示任务执行异常后的回调方法

下面我们一起来看看相关的使用示例!

2.2.1、thenRun/thenRunAsync

thenRun()/thenRunAsync()
方法,都表示上一个任务执行成功后的回调处理,无入参,无返回值。稍有不同的是,
thenRunAsync()
方法会采用独立的线程池来执行任务。

相关的源码方法如下:

// 默认线程池
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

// 采用与上一个任务的线程池来执行任务
public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

// 采用默认线程池来执行任务
public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

从源码上可以清晰的看到,
thenRun()/thenRunAsync()
方法都调用了
uniRunStage()
方法,不同的是
thenRunAsync()
使用了
asyncPool
参数,也就是默认的线程池;而
thenRun()
方法使用的是
null
,底层采用上一个任务的线程池来执行,总结下来就是:

  • 当调用
    thenRun()
    方法执行任务时,当前任务和上一个任务都共用同一个线程池
  • 当调用
    thenRunAsync()
    方法执行任务时,上一个任务采用自己的线程池来执行;而当前任务会采用默认线程池来执行,比如
    ForkJoinPool

thenAccept()/thenAcceptAsync()

thenApply()/thenApplyAsync()

whenComplete()/whenCompleteAsync()

handle()/handleAsync()
方法之间的区别也类似,下文不再重复讲解。

下面我们一起来看看
thenRun()
方法的使用示例。

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        return "hello world";
    });

    // 当上一个任务执行成功,会继续回调当前方法
    CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
        System.out.println("thenRun1,执行完毕");

    });

    CompletableFuture<Void> cf3 = cf2.thenRun(() -> {
        System.out.println("thenRun2,执行完毕");
    });


    System.out.println("任务执行结果:" + cf3.get());
}

输出结果:

supplyAsync,执行完毕
thenRun1,执行完毕
thenRun2,执行完毕
任务执行结果:null

如果上一个任务执行异常,
是不会回调
thenRun()
方法的

,示例如下:

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        if(1 == 1){
            throw new RuntimeException("执行异常");
        }
        return "hello world";
    });

    // 当上一个任务执行成功,会继续回调当前方法
    CompletableFuture<Void> cf1 = cf.thenRun(() -> {
        System.out.println("thenRun1,执行完毕");

    });

    // 监听执行时异常的回调方法
    CompletableFuture<Void> cf2 = cf1.exceptionally((e) -> {
        System.out.println("发生异常,错误信息:" + e.getMessage());
        return null;
    });

    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync,执行完毕
发生异常,错误信息:java.lang.RuntimeException: 执行异常
任务执行结果:null

可以清晰的看到,
thenRun()
方法没有回调。

thenAccept()

thenAcceptAsync()

thenApply()

thenApplyAsync()
方法也类似,当上一个任务执行异常,不会回调这些方法。

2.2.2、thenAccept/thenAcceptAsync

thenAccept()/thenAcceptAsync()
方法,表示上一个任务执行成功后的回调方法,有入参,无返回值。

相关的示例如下。

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        return "hello world";
    });

    // 当上一个任务执行成功,会继续回调当前方法
    CompletableFuture<Void> cf2 = cf1.thenAccept((r) -> {
        System.out.println("thenAccept,执行完毕,上一个任务执行结果值:" + r);

    });

    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync,执行完毕
thenAccept,执行完毕,上一个任务执行结果值:hello world
任务执行结果:null
2.2.3、thenApply/thenApplyAsync

thenApply()/thenApplyAsync()
方法,表示上一个任务执行成功后的回调方法,有入参,有返回值。

相关的示例如下。

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        return "hello world";
    });

    // 当上一个任务执行成功,会继续回调当前方法
    CompletableFuture<String> cf2 = cf1.thenApply((r) -> {
        System.out.println("thenApply,执行完毕,上一个任务执行结果值:" + r);
        return "gogogo";
    });

    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync,执行完毕
thenApply,执行完毕,上一个任务执行结果值:hello world
任务执行结果:gogogo
2.2.4、whenComplete/whenCompleteAsync

whenComplete()/whenCompleteAsync()
方法,表示任务执行完成后的回调方法,有入参,无返回值。

稍有不同的是:无论任务执行成功还是失败,它都会回调。

相关的示例如下。

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        if(1 == 1){
            throw new RuntimeException("执行异常");
        }
        return "hello world";
    });

    // 当任务执行完成,会继续回调当前方法
    CompletableFuture<String> cf2 = cf1.whenComplete((r, e) -> {
        System.out.println("whenComplete,执行完毕,上一个任务执行结果值:" + r + ",异常信息:" + e.getMessage());
    });

    // 监听执行时异常的回调方法
    CompletableFuture<String> cf3 = cf2.exceptionally((e) -> {
        System.out.println("发生异常,错误信息:" + e.getMessage());
        return e.getMessage();
    });

    System.out.println("任务执行结果:" + cf3.get());
}

输出结果:

supplyAsync,执行完毕
whenComplete,执行完毕,上一个任务执行结果值:null,异常信息:java.lang.RuntimeException: 执行异常
发生异常,错误信息:java.lang.RuntimeException: 执行异常
任务执行结果:java.lang.RuntimeException: 执行异常
2.2.5、handle/handleAsync

handle()/handleAsync()
方法,表示任务执行完成后的回调方法,有入参,有返回值。

同样的,无论任务执行成功还是失败,它都会回调。

相关的示例如下。

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行完毕");
        if(1 == 1){
            throw new RuntimeException("执行异常");
        }
        return "hello world";
    });

    // 当任务执行完成,会继续回调当前方法
    CompletableFuture<String> cf2 = cf1.handle((r, e) -> {
        System.out.println("handle,执行完毕,上一个任务执行结果值:" + r + ",异常信息:" + e.getMessage());
        return "handle";
    });
    
    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync,执行完毕
handle,执行完毕,上一个任务执行结果值:null,异常信息:java.lang.RuntimeException: 执行异常
任务执行结果:handle
2.2.6、exceptionally

exceptionally()
方法,表示任务执行异常后的回调方法。在上文的示例中有所介绍。

最后我们还是简单的看下示例。

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync,执行开始");
        if(1 == 1){
            throw new RuntimeException("执行异常");
        }
        return "hello world";
    });

    // 监听执行时异常的回调方法
    CompletableFuture<String> cf2 = cf1.exceptionally((e) -> {
        System.out.println("发生异常,错误信息:" + e.getMessage());
        return e.getMessage();
    });

    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync,执行开始
发生异常,错误信息:java.lang.RuntimeException: 执行异常
任务执行结果:java.lang.RuntimeException: 执行异常

2.3、多个任务组合处理

某些场景下,如果希望获取两个不同的异步执行结果进行组合处理,可以采用多个任务组合处理方式。

CompletableFuture
针对多个任务组合处理做了很多的支持,常用的组合方式有以下几种。

  • AND组合
    :表示将两个
    CompletableFuture
    任务组合起来,只有这两个任务都正常执行完了,才会继续执行回调任务,比如
    thenCombine()
    方法
  • OR组合
    :表示将两个
    CompletableFuture
    任务组合起来,只要其中一个正常执行完了,就会继续执行回调任务,比如
    applyToEither
    方法
  • AllOf组合
    :可以将多个
    CompletableFuture
    任务组合起来,只有所有的任务都正常执行完了,才会继续执行回调任务,比如
    allOf()
    方法
  • AnyOf组合
    :可以将多个
    CompletableFuture
    任务组合起来,只要其中一个任务正常执行完了,就会继续执行回调任务,比如
    anyOf()
    方法

下面我们一起来看看相关的使用示例!

2.3.1、AND组合

实现
AND组合
的操作方法有很多,比如
runAfterBoth()

thenAcceptBoth()

thenCombine()
等方法,它们之间的区别在于:是否带有入参、是否带有返回值。

其中
thenCombine()
方法支持传入参、带返回值。

相关示例如下:

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync1,执行完毕");
        return "supplyAsync1";
    });

    CompletableFuture<String> cf2 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("supplyAsync2,执行完毕");
                return "supplyAsync2";
            })
            .thenCombine(cf1, (r1, r2) -> {
                System.out.println("r1任务执行结果:" + r1);
                System.out.println("r2任务执行结果:" + r2);
                return r1 + "_" + r2;
            });

    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync1,执行完毕
supplyAsync2,执行完毕
r1任务执行结果:supplyAsync2
r2任务执行结果:supplyAsync1
任务执行结果:supplyAsync2_supplyAsync1
2.3.2、OR组合

实现
OR组合
的操作方法有很多,比如
runAfterEither()

acceptEither()

applyToEither()
等方法,区别同上。

其中
applyToEither()
方法支持传入参、带返回值。

相关示例如下:

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync1,执行完毕");
        return "supplyAsync1";
    });

    CompletableFuture<String> cf2 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("supplyAsync2,执行完毕");
                return "supplyAsync2";
            })
            .applyToEither(cf1, (r) -> {
                System.out.println("第一个执行成功的任务结果:" + r);
                return r + "_applyToEither";
            });

    System.out.println("任务执行结果:" + cf2.get());
}

输出结果:

supplyAsync1,执行完毕
supplyAsync2,执行完毕
第一个执行成功的任务结果:supplyAsync2
任务执行结果:supplyAsync2_applyToEither
2.3.2、AllOf组合

实现
AllOf组合
的操作就一个方法
allOf()
,可以将多个任务进行组合,只有都执行成功才会回调,回调入参为空值。

相关示例如下:

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync1,执行完毕");
        return "supplyAsync1";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync2,执行完毕");
        return "supplyAsync2";
    });

    // 将多个任务,进行AND组合
    CompletableFuture<String> cf3 = CompletableFuture
            .allOf(cf1, cf2)
            .handle((r, e) -> {
                System.out.println("所有任务都执行成功,result:" +  r);
                return "over";
            });
    System.out.println(cf3.get());
}

输出结果:

supplyAsync1,执行完毕
supplyAsync2,执行完毕
所有任务都执行成功,result:null
over
2.3.3、AnyOf组合

实现
AnyOf组合
的操作,同样就一个方法
anyOf()
,可以将多个任务进行组合,只要一个执行成功就会回调,回调入参有值。

相关示例如下:

public static void main(String[] args) throws Exception {
    // 创建异步执行任务
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync1,执行完毕");
        return "supplyAsync1";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync2,执行完毕");
        return "supplyAsync2";
    });

    // 将多个任务,进行AND组合
    CompletableFuture<String> cf3 = CompletableFuture
            .anyOf(cf1, cf2)
            .handle((r, e) -> {
                System.out.println("某个任务执行成功,返回值:" + r);
                return "over";
            });
    System.out.println(cf3.get());
}

输出结果:

supplyAsync1,执行完毕
supplyAsync2,执行完毕
某个任务执行成功,返回值:supplyAsync1
over

三、小结

本文主要围绕
CompletableFuture
类相关用法进行了一次知识总结,通过
CompletableFuture
类可以简化异步编程,同时支持多种异步任务,按照条件组合处理,相比其它的并发工具类,操作更加强大、实用。

本篇内容比较多,如果有描述不对的地方,欢迎网友留言指出,希望本文知识总结能帮助到大家。

四、参考

1.
https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650

2.
https://juejin.cn/post/6970558076642394142

本文分享自华为云社区《
Bokeh图形魔法:掌握绘图基础与高级技巧,定制炫目可视化
》,作者: 柠檬味拥抱。

Bokeh是一个用于创建交互式可视化图形的强大Python库。它不仅易于使用,而且功能强大,适用于各种数据可视化需求。本文将介绍Bokeh库的绘图可视化基础入门,重点说明常用的参数,并通过实例演示如何在实际项目中应用这些技术。

安装Bokeh库

首先,确保你已经安装了Bokeh库。如果没有安装,可以使用以下命令进行安装:

pip install bokeh

Bokeh绘图基础

Bokeh支持多种图形类型,包括散点图、线图、柱状图等。在绘制这些图形时,我们需要关注一些重要的参数。

基础图形绘制

首先,我们来看一个简单的例子,绘制一个散点图:

frombokeh.plotting import figure, showfrombokeh.io import output_notebook

# 准备数据
x
= [1, 2, 3, 4, 5]
y
= [6, 7, 2, 4, 5]

# 创建绘图对象
p
= figure(title="简单散点图", x_axis_label='X轴', y_axis_label='Y轴')

# 绘制散点图
p.circle(x, y, size
=10, color="navy", alpha=0.5)

# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在上述代码中,我们使用
figure
函数创建了一个绘图对象,并使用
circle
方法绘制了一个散点图。
size
参数控制点的大小,
color
参数定义颜色,
alpha
参数设置透明度。

完善图形

除了基本的绘图,Bokeh还支持添加轴标签、图例等元素,使图形更加完整。以下是一个例子:

# 添加轴标签
p.xaxis.axis_label_standoff
= 15p.yaxis.axis_label_standoff= 15# 添加图例
p.legend.label_text_font_size
= "12pt"p.legend.location= "top_left"# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

通过以上代码,我们在图形上方添加了X轴和Y轴的标签,并在左上角添加了图例。

实例演示

让我们通过一个实际案例,展示如何使用Bokeh进行更复杂的可视化。

案例:股票走势图

import yfinance asyf

# 获取股票数据
stock_data
= yf.download("AAPL", start="2023-01-01", end="2024-01-01")

# 创建绘图对象
p
= figure(title="AAPL股票走势", x_axis_label='日期', y_axis_label='股价(美元)', x_axis_type="datetime")

# 绘制线图
p.line(stock_data.index, stock_data[
'Close'], line_width=2, color="orange", legend_label="收盘价")

# 添加图例和轴标签
p.legend.location
= "top_left"p.xaxis.axis_label_standoff= 15p.yaxis.axis_label_standoff= 15# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在这个例子中,我们使用了
yfinance
库获取了苹果公司(AAPL)股票在指定日期范围内的数据,并使用Bokeh绘制了股票的收盘价走势图。

Bokeh库高级功能探索

在了解了Bokeh的基础绘图技术后,我们将深入探讨一些高级功能,使得你能够更灵活、更创造性地进行数据可视化。

1. 工具栏和交互性

Bokeh提供了丰富的工具栏,可以让用户与图形进行交互。下面是一个包含工具栏的例子:

frombokeh.models import HoverTool

# 创建绘图对象
p
= figure(title="交互式散点图", x_axis_label='X轴', y_axis_label='Y轴', tools="pan,box_zoom,reset,save")

# 添加悬停工具
hover
= HoverTool(tooltips=[("数值", "@x, @y")])
p.add_tools(hover)

# 绘制散点图
p.circle(x, y, size
=10, color="navy", alpha=0.5)

# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在上述例子中,我们通过
tools
参数添加了平移、缩放、重置和保存工具,并使用
HoverTool
添加了悬停提示。

2. 高级图形元素

Bokeh支持绘制一些高级图形元素,例如矩形、椭圆等。以下是一个绘制矩形和椭圆的例子:

# 创建绘图对象
p
= figure(title="矩形和椭圆示例", x_axis_label='X轴', y_axis_label='Y轴')

# 绘制矩形
p.rect(x
=[1, 2, 3], y=[4, 5, 6], width=0.2, height=0.2, color="green", alpha=0.7)

# 绘制椭圆
p.ellipse(x
=[4, 5, 6], y=[7, 8, 9], width=0.2, height=0.1, color="blue", alpha=0.7)

# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

3. 数据链接和动态更新

Bokeh允许你动态地更新图形,可以根据用户的输入或外部事件来实现。以下是一个简单的例子,演示如何通过滑块动态更新散点图:

frombokeh.models import Sliderfrombokeh.layouts import column

# 创建绘图对象
p
= figure(title="动态散点图", x_axis_label='X轴', y_axis_label='Y轴')

# 创建滑块
slider
= Slider(start=1, end=10, step=1, value=1, title="选择散点大小")

# 回调函数,根据滑块值更新散点大小
def update_size(attr, old,
new):
p.circle(x, y, size
=new, color="navy", alpha=0.5)

slider.on_change(
'value', update_size)

# 将图形和滑块组合在一起
layout
=column(p, slider)

# 在Jupyter Notebook中显示图形
output_notebook()
show(layout)

在这个例子中,滑块的值变化时,通过回调函数更新了散点的大小,实现了动态更新效果。,你可以创建出更具吸引力和实用性的可视化图形,更好地展示和解释数据。继续探索Bokeh的文档和示例,发挥其潜力,提升你的数据可视化技能。

Bokeh库与其他库的整合

Bokeh可以与其他Python库无缝整合,进一步拓展其功能。在本节中,我们将介绍Bokeh与Pandas、Matplotlib等库的整合,以及如何在Web应用中使用Bokeh。

1. 与Pandas整合

Pandas是一个强大的数据分析库,而Bokeh可以轻松地与Pandas进行整合,实现更便捷的数据可视化。

import pandas aspd

# 创建一个Pandas DataFrame
data
= pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [6, 7, 2, 4, 5]})

# 使用Bokeh绘制散点图
p
= figure(title="Pandas整合示例", x_axis_label='X轴', y_axis_label='Y轴')
p.circle(
'x', 'y', size=10, color="navy", alpha=0.5, source=data)

# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在这个例子中,我们创建了一个Pandas DataFrame,并使用Bokeh的
circle
函数绘制了散点图,其中的数据直接来自于DataFrame。

2. 与Matplotlib整合

如果你已经熟悉Matplotlib,并且希望结合Bokeh的交互性和Matplotlib的绘图功能,你可以使用
bokeh.plotting

from_bokeh
函数将Bokeh图形转换为Matplotlib图形。

frombokeh.plotting import figure, showfrombokeh.io import output_notebookfrombokeh.plotting import from_bokeh

# 创建Bokeh绘图对象
p
= figure(title="Bokeh与Matplotlib整合示例", x_axis_label='X轴', y_axis_label='Y轴')
p.circle([
1, 2, 3, 4, 5], [6, 7, 2, 4, 5], size=10, color="navy", alpha=0.5)

# 在Jupyter Notebook中显示Bokeh图形
output_notebook()
show(p)

# 转换为Matplotlib图形
mpl_fig
=from_bokeh(p)

# 在Matplotlib中显示图形
import matplotlib.pyplot
asplt
plt.show(mpl_fig)

这样,你可以在使用Bokeh的同时,充分利用Matplotlib的丰富绘图功能。

3. 在Web应用中使用Bokeh

Bokeh提供了
bokeh.server
模块,使得你能够将Bokeh图形嵌入到Web应用中。这样,你可以创建交互式、动态的可视化应用。

frombokeh.models import ColumnDataSourcefrombokeh.models.widgets import Sliderfrombokeh.layouts import columnfrombokeh.io import curdoc

# 创建绘图对象和数据源
source
= ColumnDataSource(data={'x': [1, 2, 3, 4, 5], 'y': [6, 7, 2, 4, 5]})
p
= figure(title="动态散点图", x_axis_label='X轴', y_axis_label='Y轴')
p.circle(
'x', 'y', size=10, color="navy", alpha=0.5, source=source)

# 创建滑块和回调函数
slider
= Slider(start=1, end=10, step=1, value=1, title="选择散点大小")

def update_size(attr, old,
new):
source.data
= {'x': [1, 2, 3, 4, 5], 'y': [6, 7, 2, 4, 5], 'size': [new]*5}

slider.on_change(
'value', update_size)

# 将图形和滑块组合在一起
layout
=column(p, slider)

# 将布局添加到文档
curdoc().add_root(layout)

通过上述代码,你可以使用Bokeh创建一个交互式Web应用。运行此脚本后,可以通过访问
http://localhost:5006/
在本地查看应用。

Bokeh库的主题和样式定制

Bokeh不仅提供了丰富的绘图功能,还允许用户根据需求自定义图形的主题和样式,以便更好地与项目风格或品牌一致。在本节中,我们将学习如何定制Bokeh图形的主题和样式。

1. 主题定制

Bokeh提供了一系列内置主题,用户可以轻松切换以改变图形的整体外观。以下是一个主题定制的简单例子:

frombokeh.themes import built_in_themes

# 创建绘图对象
p
= figure(title="主题定制示例", x_axis_label='X轴', y_axis_label='Y轴')

# 选择主题
p.theme
= built_in_themes['dark_minimal']

# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在这个例子中,我们选择了内置主题
'dark_minimal'
,你可以根据实际需求选择其他主题,如
'light_minimal'

'caliber'
等。

2. 样式定制

Bokeh还允许用户直接调整图形的样式,包括颜色、线型、字体等。以下是一个简单的样式定制例子:

# 创建绘图对象
p
= figure(title="样式定制示例", x_axis_label='X轴', y_axis_label='Y轴')

# 绘制线图
p.line([
1, 2, 3, 4, 5], [6, 7, 2, 4, 5], line_width=2, line_color="green", line_dash="dashed")

# 设置字体样式
p.title.text_font
= "times"p.title.text_font_style= "italic"# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在这个例子中,我们使用
line_color
参数设置线的颜色为绿色,使用
line_dash
参数设置线型为虚线,通过设置
title.text_font

title.text_font_style
调整标题的字体和样式。

3. 自定义工具提示

Bokeh允许用户自定义工具提示的内容和样式,以更好地满足项目需求。以下是一个自定义工具提示的例子:

frombokeh.models import HoverTool

# 创建绘图对象
p
= figure(title="自定义工具提示示例", x_axis_label='X轴', y_axis_label='Y轴')

# 绘制散点图
scatter
= p.circle([1, 2, 3, 4, 5], [6, 7, 2, 4, 5], size=10, color="navy", alpha=0.5)

# 自定义工具提示
hover
= HoverTool(tooltips=[("数值", "@x, @y"), ("额外信息", "自定义信息")], renderers=[scatter])
p.add_tools(hover)

# 在Jupyter Notebook中显示图形
output_notebook()
show(p)

在这个例子中,我们通过
HoverTool

tooltips
参数定义了工具提示的内容,并通过
renderers
参数指定了应用工具提示的图形元素。

总结

Bokeh库是一个功能强大、灵活且易于使用的Python可视化库,适用于各种数据可视化需求。本文从基础入门开始,介绍了Bokeh的基本绘图技术,包括散点图、线图、柱状图等,详细说明了常用的参数和实例演示。随后,我们深入探讨了Bokeh的高级功能,包括工具栏、交互性、高级图形元素和动态更新,使得读者能够更灵活地进行数据可视化。

进一步地,我们探讨了Bokeh库与其他常用库的整合,包括与Pandas、Matplotlib的结合,以及在Web应用中使用Bokeh的方法。这使得Bokeh不仅可以独立使用,还可以与其他库协同工作,充分发挥各个库的优势。

最后,我们了解了Bokeh库的主题和样式定制,学习了如何选择内置主题、调整样式和自定义工具提示,以便更好地满足个性化的可视化需求。Bokeh的主题和样式定制功能为用户提供了更多定制化的空间,使得可视化图形更符合项目的整体风格和品牌。

通过本文的内容,读者可以建立起对Bokeh库全面的认识,从基础到高级,从整合到定制,为数据科学家和工程师提供了强大的工具,助力更好地理解、展示和传达数据。继续深入学习Bokeh的文档和示例,将有助于更加熟练地运用这一强大的可视化工具。

点击关注,第一时间了解华为云新鲜技术~

很多人都听说过爬虫,我也不例外。曾看到别人编写的爬虫代码,虽然没有深入研究,但感觉非常强大。因此,今天我决定从零开始,花费仅5分钟学习入门爬虫技术,以后只需轻轻一爬就能查看所有感兴趣的网站内容。广告?不存在的,因为我看不见。爬虫只会获取我感兴趣的信息,不需要的内容对我而言只是一堆代码。我们不在乎网站的界面,爬取完数据后只会关注最核心的内容。

在这个过程中,技术方面实际上没有太多复杂的内容,实际上就是一项耐心细致的工作。因此才会有那么多人选择从事爬虫兼职工作,因为虽然耗时较长,但技术要求并不是很高。今天学完之后,你就不会像我一样认为爬虫很困难了。或许在未来你会需要考虑如何保持会话(session)或者绕过验证等问题,因为网站越难爬取,说明对方并不希望被爬取。实际上,这部分内容是最具挑战性的,有机会的话我们可以在以后的学习中深入讨论。

今天我们以选择菜谱为案例,来解决我们在吃饭时所面临的“吃什么”的生活难题。

爬虫解析

爬虫的工作原理类似于模拟用户在浏览网站时的操作:首先访问官方网站,检查是否有需要点击的链接,若有,则继续点击查看。当直接发现所需的图片或文字时,即可进行下载或复制。这种爬虫的基本架构如图所示,希望这样的描述能帮助你更好地理解。

image

爬网页HTML

在进行爬虫工作时,我们通常从第一步开始,即发送一个HTTP请求以获取返回的数据。在我们的工作中,通常会请求一个链接以获取JSON格式的信息,以便进行业务处理。然而,爬虫的工作方式略有不同,因为我们需要首先获取网页内容,因此这一步通常返回的是HTML页面。在Python中,有许多请求库可供选择,我只举一个例子作为参考,但你可以根据实际需求选择其他第三方库,只要能够完成任务即可。

在开始爬虫工作之前,首先需要安装所需的第三方库依赖。这部分很简单,只需根据需要安装相应的库即可,没有太多复杂的步骤。

让我们不多废话,直接看下面的代码示例:

from urllib.request import urlopen,Request
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
req = Request("https://www.meishij.net/?from=space_block",headers=headers)
# 发出请求,获取html
# 获取的html内容是字节,将其转化为字符串
html = urlopen(req)
html_text = bytes.decode(html.read())
print(html_text)

通常情况下,我们可以获取这个菜谱网页的完整内容,就像我们在浏览器中按下F12查看的网页源代码一样。

解析元素

最笨的方法是使用字符串解析,但由于Python有许多第三方库可以解决这个问题,因此我们可以使用BeautifulSoup来解析HTML。其他更多的解析方法就不一一介绍了,我们需要用到什么就去搜索即可,不需要经常使用的也没必要死记硬背。

热搜菜谱

在这里,让我们对热门搜索中的菜谱进行解析和分析。

from urllib.request import urlopen,Request
from bs4 import BeautifulSoup as bf
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
req = Request("https://www.meishij.net/?from=space_block",headers=headers)
# 发出请求,获取html
# 获取的html内容是字节,将其转化为字符串
html = urlopen(req)
html_text = bytes.decode(html.read())
# print(html_text)
 # 用BeautifulSoup解析html
obj = bf(html_text,'html.parser')
# print(html_text)
# 使用find_all函数获取所有图片的信息
index_hotlist = obj.find_all('a',class_='sancan_item')
# 分别打印每个图片的信息
for ul in index_hotlist:
    for li in ul.find_all('strong',class_='title'):
        print(li.get_text())

主要步骤是,首先在上一步中打印出HTML页面,然后通过肉眼观察确定所需内容位于哪个元素下,接着利用BeautifulSoup定位该元素并提取出所需信息。在我的情况下,我提取的是文字内容,因此成功提取了所有li列表元素。

随机干饭

在生活中,实际上干饭并不复杂,难点在于选择吃什么。因此,我们可以将所有菜谱解析并存储在一个列表中,然后让程序随机选择菜谱。这样,就能更轻松地解决每顿饭吃什么的难题了。

随机选取一道菜时,可以使用以下示例代码:

from urllib.request import urlopen,Request
from bs4 import BeautifulSoup as bf
for i in range(3):
    url = f"https://www.meishij.net/chufang/diy/jiangchangcaipu/?&page={i}"
    html = urlopen(url)
    # 获取的html内容是字节,将其转化为字符串
    html_text = bytes.decode(html.read())
    # print(html_text)
    obj = bf(html_text,'html.parser')
    index_hotlist = obj.find_all('img')
    for p in index_hotlist:
        if p.get('alt'):
            print(p.get('alt'))

这里我们在这个网站上找到了新的链接地址,我已经获取了前三页的数据,并进行了随机选择,你可以选择全部获取。

菜谱教程

其实上一步已经完成了,接下来只需下单外卖了。外卖种类繁多,但对于像我这样的顾家奶爸来说并不合适,因此我必须自己动手做饭。这时候教程就显得尤为重要了。

我们现在继续深入解析教程内容:

from urllib.request import urlopen,Request
import urllib,string
from bs4 import BeautifulSoup as bf

url = f"https://so.meishij.net/index.php?q=红烧排骨"
url = urllib.parse.quote(url, safe=string.printable)
html = urlopen(url)
# 获取的html内容是字节,将其转化为字符串
html_text = bytes.decode(html.read())
obj = bf(html_text,'html.parser')
index_hotlist = obj.find_all('a',class_='img')
# 分别打印每个图片的信息
url = index_hotlist[0].get('href')
html = urlopen(url)
html_text = bytes.decode(html.read())
obj = bf(html_text,'html.parser')
index_hotlist = obj.find_all('div',class_='step_content')
for div in index_hotlist:
    for p in div.find_all('p'):
        print(p.get_text())

包装一下

上面提到的方法已经满足了我们的需求,但是重复手动执行每个步骤并不是一个高效的方式。因此,我将这些步骤封装成一个简单的应用程序。这个应用程序使用控制台作为用户界面,不需要依赖任何第三方库。让我们一起来看一下这个应用程序吧:

# 导入urllib库的urlopen函数
from urllib.request import urlopen,Request
import urllib,string
# 导入BeautifulSoup
from bs4 import BeautifulSoup as bf
from random import choice,sample
from colorama import init
from os import system
from termcolor import colored
from readchar import  readkey


FGS = ['green', 'yellow', 'blue', 'cyan', 'magenta', 'red']
print(colored('搜索食谱中.....',choice(FGS)))
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
req = Request("https://www.meishij.net/?from=space_block",headers=headers)
# 发出请求,获取html
# 获取的html内容是字节,将其转化为字符串
html = urlopen(req)
html_text = bytes.decode(html.read())
hot_list = []
all_food = []
food_page = 3


# '\n'.join(pos(y, OFFSET[1]) + ' '.join(color(i) for i in l)
def draw_menu(menu_list):
    clear()
    for idx,i in enumerate(menu_list):
        print(colored(f'{idx}:{i}',choice(FGS)))
    print(colored('8:随机选择',choice(FGS)))


def draw_word(word_list):
    clear()
    for i in word_list:
        print(colored(i,choice(FGS)))

def clear():
    system("CLS")

def hot_list_func() :
    global html_text
    # 用BeautifulSoup解析html
    obj = bf(html_text,'html.parser')
    # print(html_text)
    # 使用find_all函数获取所有图片的信息
    index_hotlist = obj.find_all('a',class_='sancan_item')
    # 分别打印每个图片的信息
    for ul in index_hotlist:
        for li in ul.find_all('strong',class_='title'):
            hot_list.append(li.get_text())
            # print(li.get_text())

def search_food_detail(food) :
    print('正在搜索详细教程,请稍等30秒左右!')
    url = f"https://so.meishij.net/index.php?q={food}"
    # print(url)
    url = urllib.parse.quote(url, safe=string.printable)
    html = urlopen(url)
    # 获取的html内容是字节,将其转化为字符串
    html_text = bytes.decode(html.read())
    obj = bf(html_text,'html.parser')
    index_hotlist = obj.find_all('a',class_='img')
    # 分别打印每个图片的信息
    url = index_hotlist[0].get('href')
    # print(url)
    html = urlopen(url)
    html_text = bytes.decode(html.read())
    # print(html_text)
    obj = bf(html_text,'html.parser')
    random_color = choice(FGS)
    print(colored(f"{food}做法:",random_color))
    index_hotlist = obj.find_all('div',class_='step_content')
    # print(index_hotlist)
    random_color = choice(FGS)
    for div in index_hotlist:
        for p in div.find_all('p'):
            print(colored(p.get_text(),random_color))



def get_random_food():
    global food_page
    if not all_food :
        for i in range(food_page):
            url = f"https://www.meishij.net/chufang/diy/jiangchangcaipu/?&page={i}"
            html = urlopen(url)
            # 获取的html内容是字节,将其转化为字符串
            html_text = bytes.decode(html.read())
            # print(html_text)
            obj = bf(html_text,'html.parser')
            index_hotlist = obj.find_all('img')
            for p in index_hotlist:
                if p.get('alt'):
                    all_food.append(p.get('alt'))
    my_food = choice(all_food)
    print(colored(f'随机选择,今天吃:{my_food}',choice(FGS)))
    return my_food


init() ## 命令行输出彩色文字
hot_list_func()
print(colored('已搜索完毕!',choice(FGS)))
my_array = list(range(0, 9))
my_key = ['q','c','d','m']
my_key.extend(my_array)
print(colored('m:代表今日菜谱',choice(FGS)))
print(colored('c:代表清空控制台',choice(FGS)))
print(colored('d:代表菜谱教程',choice(FGS)))
print(colored('q:退出菜谱',choice(FGS)))
print(colored('0~8:选择菜谱中的菜',choice(FGS)))
while True:
    while True:
        move = readkey()
        if move in my_key or (move.isdigit() and int(move) <= len(random_food)):
            break
    if move == 'q': ## 键盘‘Q’是退出
        break
    if move == 'c': ## 键盘‘C’是清空控制台
        clear()
    if move == 'm':
        random_food = sample(hot_list,8)
        draw_menu(random_food)
    if move.isdigit() and int(move) <= len(random_food):
        if int(move) == 8:
            my_food = get_random_food()
        else:
            my_food = random_food[int(move)]
        print(my_food)
    if move == 'd' and my_food : ## 键盘‘D’是查看教程
        search_food_detail(my_food)
        my_food = ''

完成一个简单的小爬虫其实并不复杂,如果不考虑额外的封装步骤,仅需5分钟即可完成,这已经足够快速让你入门爬虫技术。开始爬取某个网站的数据实际上是一项细致的工作。只需在网上搜索相关技术信息,找到适合的方法即可,如果有效就继续使用,不行就试试其他方法。

总结

本文的重点在于引导读者如何初步掌握爬虫技术。初步掌握爬虫技术并不难,但是在实际操作中可能会遇到一些困难,比如一些网站不允许直接访问,需要登录或者进行各种人机验证等。因此,最好先从爬取一些新闻资讯类的网站开始,因为这样相对容易。涉及用户支付等敏感信息的网站就不那么容易获取了。因此,在入门阶段,建议不要纠结于选择一个复杂的网站,先尝试入门即可。一旦理解了基本原理,遇到问题时就可以考虑添加组件或者使用第三方库来解决。

最终,我真诚地希望本文对你有所帮助。如果你觉得内容有趣或有用,不妨动动小手,点个关注支持一下,嘻嘻。

.NET 6 引入了 LoggerMessageAttribute 类型。 使用时,它会以source-generators的方式生成高性能的日志记录 API。

source-generators可在编译代码时,可以提供其他源代码作为编译的输入。

LoggerMessageAttribute依赖于 ILogger 接口和 LoggerMessage.Define 功能。

在 partial 日志记录方法上使用 LoggerMessageAttribute 时,系统会触发源生成器。 触发后,它既可以自动生成其修饰的 partial 方法的实现,也可以生成包含正确用法提示的编译时诊断。

与现有的日志记录方法相比,编译时日志记录解决方案在运行时通常要快得多。 这是因为它最大限度地消除了装箱、临时分配和副本。

基本用法

使用 LoggerMessageAttribute 时,类和方法必须为 partial。 真实记录日志的代码生成器在编译时触发,并生成 partial 方法的实现。

public static partial classLog
{
[LoggerMessage(
EventId
= 0,
Level
=LogLevel.Error,
Message
= "Can not open SQL connection {err}")]public static partial void CouldNotOpenConnection(this ILogger logger, stringerr);
}

在上面的示例中,日志记录方法为 static,日志级别在属性定义中指定,并使用 this 关键字将方法定义为扩展方法。

在调用时,可按常规方式调用即可

internal classProgram
{
private static async Task Main(string[] args)
{
using ILoggerFactory loggerFactory =LoggerFactory.Create(
builder
=>builder.AddJsonConsole(
options
=>options.JsonWriterOptions= newJsonWriterOptions()
{
Indented
= true}));

ILogger logger
= loggerFactory.CreateLogger("Program");

logger.CouldNotOpenConnection(
"network err");
}
}

使用规则

在日志记录方法上使用 LoggerMessageAttribute 时,必须遵循一些规则:

  • 日志记录方法必须为 partial 并返回 void。
  • 日志记录方法名称不得以下划线开头。
  • 日志记录方法的参数名称不得以下划线开头。
  • 日志记录方法不得在嵌套类型中定义。
  • 日志记录方法不能是泛型方法。
  • 如果日志记录方法是 static,则需要 ILogger 实例作为参数。

代码生成模型依赖于使用新式 C# 编译器 9 或更高版本编译的代码。 .NET 5 提供了 C# 9.0 编译器。 若要升级到新式 C# 编译器,请编辑项目文件以面向 C# 9.0。

好处

使用源生成器方法有几个主要好处:

  • 允许保留日志记录结构,并启用消息模板所需的确切格式语法。
  • 允许为模板占位符提供替代名称,允许使用格式说明符。
  • 允许按原样传递所有原始数据,在对其进行处理之前,不需要进行任何复杂的存储(除了创建 string)。
  • 提供特定于日志记录的诊断,针对重复的事件 ID 发出警告。

https://devblogs.microsoft.com/dotnet/introducing-c-source-generators/

https://learn.microsoft.com/zh-cn/dotnet/csharp/language-reference/keywords/partial-method

https://learn.microsoft.com/zh-cn/dotnet/core/extensions/logger-message-generator

语义索引(可通俗理解为向量索引)技术是搜索引擎、推荐系统、广告系统在召回阶段的核心技术之一。语义索引模型的目标是:给定输入文本,模型可以从海量候选召回库中
快速、准确
地召回一批语义相关文本。语义索引模型的效果直接决定了语义相关的物料能否被成功召回进入系统参与上层排序,从基础层面影响整个系统的效果。

In-batch negatives

我们采用百度paddleNLP里提到的
In-batch Negatives
方案。

In-batch Negatives 策略的训练数据为语义相似的 Pair 对,策略核心是在 1 个 Batch 内同时基于 N 个负例进行梯度更新,将Batch 内除自身之外其它所有 Source Text 的相似文本 Target Text 作为负例,例如: 上例中“我手机丢了,我想换个手机” 有 1 个正例(”我想买个新手机,求推荐“),3 个负例(1.求秋色之空全集漫画,2.手机学日语的软件,3.侠盗飞车罪恶都市怎么改车)。

具体来说,In-batch negatives策略的实施步骤如下:

  1. 选择正样本
    :首先从当前批次中选择出一个正样本,这个样本是模型需要正确识别的目标样本。
  2. 选择负样本
    :然后从同一批次中随机选择或根据特定规则选择一些负样本。这些负样本可以是与正样本相似但被错误标记的样本,也可以是完全不相关的样本。
  3. 模型训练
    :将正样本和负样本一起输入模型进行训练。模型需要学会区分正样本和负样本,从而提高推荐或检索的准确性。

In-batch negatives策略的优势在于:

  • 提高模型的区分能力
    :通过在每个批次中引入负样本,模型被迫学习如何区分正样本和负样本,这有助于提高模型的泛化能力和区分度。
  • 利用现有数据
    :不需要额外的负样本库,可以直接利用当前批次中的数据作为负样本,这在数据有限的情况下尤其有用。
  • 减少计算资源消耗
    :与从全局样本集中采样负样本相比,In-batch negatives可以减少计算资源的消耗,因为它避免了在整个数据集上进行负采样的需要。

然而,In-batch negatives策略也存在一些潜在的问题,例如:

  • 批次大小的限制
    :如果批次大小较小,可能无法提供足够多样化的负样本,这可能影响模型的学习效果。
  • 偏差问题
    :由于负样本是在同一个批次中选择的,可能会出现某些样本被频繁选为负样本的情况,这可能导致模型学习到的表示存在偏差。

一般通过 Recall@1,Recall@5 ,Recall@10 ,Recall@20 和 Recall@50 指标来评估语义索引模型的召回效果。按照paddleNLP给出的基线:

策略 模型 Recall@1 Recall@5 Recall@10 Recall@20 Recall@50
In-batch Negatives ernie 1.0 51.301 65.309 69.878 73.996 78.881
In-batch Negatives rocketqa-zh-base-query-encoder 59.622 75.089 79.668 83.404 87.773

rocketqa作为打底transformer模型效果更好。

总结,为什么为采用In-batch negatives,一方面能充分利用现有数据,不用单独准备负样例,减少投入,另外一方面模型的区分能力也比较好。

模型数据方案

流传一句话,用1亿条数据,训练10个epoch,不如用10亿数据训练一个epoch,也就是见多识广,大力出奇迹。

我们要训练一个给搜索用的向量召回模型,核心就是让准备足够多的正样例数据。正样例数据,一方面网上有较多的开源数据,可以直接利用。另外一方面,之间了解
SimBERT
时,他们的数据很多也源自于搜索数据,所以可以通过搜索引擎将query和召回结果的doc作为相似句对。

作为试验,我们构造了8000万的一个小训练集,用
rocketqa-zh-mini-query-encoder
作为打底模型,训练256维的embedding模型。

root_path=inbatch
python -u -m paddle.distributed.launch --gpus "0" \
    train_batch_neg.py \
    --device gpu \
    --save_dir ./checkpoints/${root_path} \
    --batch_size 64 \
    --learning_rate 5E-5 \
    --epochs 3 \
    --output_emb_size 256 \
    --model_name_or_path rocketqa-zh-mini-query-encoder \
    --save_steps 5000 \
    --max_seq_length 128 \
    --margin 0.2 \
    --train_set_file recall/train.csv \
    --recall_result_dir "recall_result_dir" \
    --recall_result_file "recall_result.txt" \
    --hnsw_m 100 \
    --hnsw_ef 100 \
    --recall_num 50 \
    --similar_text_pair_file "recall/dev.csv" \
    --corpus_file "recall/corpus.csv"

训练完成导出onnx模型:

def convert_model(model_path):

    try:
        import onnx
        import onnxruntime as ort
        import paddle2onnx
        from onnxconverter_common import float16
    except ImportError:
        print(
            "The inference precision is change to 'fp32', please install the dependencies that required for 'fp16' inference, pip install onnxruntime-gpu onnx onnxconverter-common"
        )
    onnx_dir = os.path.join(model_path, "onnx")

    if not os.path.exists(onnx_dir):
        os.mkdir(onnx_dir)
    float_onnx_file = os.path.join(onnx_dir, "model.onnx")
    if not os.path.exists(float_onnx_file):
        onnx_model = paddle2onnx.command.c_paddle_to_onnx(
            model_file=os.path.join(model_path, "inference.pdmodel"),
            params_file=os.path.join(model_path, "inference.pdiparams"),
            opset_version=13,
            enable_onnx_checker=True,
        )
        with open(float_onnx_file, "wb") as f:
            f.write(onnx_model)
    fp16_model_file = os.path.join(onnx_dir, "fp16_model.onnx")
    if not os.path.exists(fp16_model_file):
        onnx_model = onnx.load_model(float_onnx_file)
        trans_model = float16.convert_float_to_float16(onnx_model, keep_io_types=True)
        onnx.save_model(trans_model, fp16_model_file)

加载测试:

class MiniRocketQAEmbedding():

    def __init__(self, model_file: str = model_file, use_gpu: bool = True):

        providers = ['CUDAExecutionProvider'] if use_gpu else ['CPUExecutionProvider']
        sess_options = ort.SessionOptions()
        self.predictor = ort.InferenceSession( 
            model_file, sess_options=sess_options, providers=providers)
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)


    def embeding(self, embeding_text):
        features = self.tokenizer(embeding_text, max_seq_len=128,
                                  pad_to_max_seq_len=True, truncation_strategy="longest_first")

        vecs = self.predictor.run(None, features.data)
        return vecs[0]


    def similarity(self, pairs):
        query = pairs[0][0]
        texts = [item[1] for item in pairs]
        emdbeding_text = [query]
        emdbeding_text.extend(texts)
        features = self.tokenizer(emdbeding_text, max_seq_len=128,
                                  pad_to_max_seq_len=True, truncation_strategy="longest_first")

        vecs = self.predictor.run(None, features.data)

        # print(vecs)

        query_embeding = vecs[0][0]
        vecs_text1 = query_embeding / (query_embeding**2).sum() ** 0.5

        result = []
        for i in range(1, len(vecs[0])):
            vecs_text2 = vecs[0][i]
            vecs_text2 = vecs_text2 / (vecs_text2**2).sum() ** 0.5
            similarity = (vecs_text1 * vecs_text2).sum()
            result.append({"similarity": float(similarity)})

        return result

if __name__ == "__main__":
    bert = MiniRocketQAEmbedding(use_gpu=False)
    import time
    start = time.time()
    bert.embeding(["双鱼座性格特点","双鱼座性格特点"])
    print((time.time() - start) * 1000)

通过
MTEB框架
来测试自建搜索测试集效果:

if __name__ == '__main__':

    model = MyModel()
    task_names = ["SSRetrieval"]

    for task in task_names:
        model.query_instruction_for_retrieval = None
        evaluation = MTEB(tasks=[task], task_langs=['zh', 'zh-CN'])
        evaluation.run(model, output_folder=f"zh_results/256_model", batch_size=64)

测试结果:

{
  "dataset_revision": null,
  "dev": {
    "evaluation_time": 251.86,
    "map_at_1": 0.13427,
    "map_at_10": 0.62859,
    "map_at_100": 0.72526,
    "map_at_1000": 0.72564,
    "map_at_3": 0.31398,
    "map_at_5": 0.45025,
    "mrr_at_1": 0.71863,
    "mrr_at_10": 0.81982,
    "mrr_at_100": 0.82077,
    "mrr_at_1000": 0.82078,
    "mrr_at_3": 0.80707,
    "mrr_at_5": 0.81587,
    "ndcg_at_1": 0.71803,
    "ndcg_at_10": 0.77357,
    "ndcg_at_100": 0.83634,
    "ndcg_at_1000": 0.83907,
    "ndcg_at_3": 0.72048,
    "ndcg_at_5": 0.73003,
    "precision_at_1": 0.71803,
    "precision_at_10": 0.53373,
    "precision_at_100": 0.07386,
    "precision_at_1000": 0.00747,
    "precision_at_3": 0.68889,
    "precision_at_5": 0.65699,
    "recall_at_1": 0.13427,
    "recall_at_10": 0.78675,
    "recall_at_100": 0.98082,
    "recall_at_1000": 0.99181,
    "recall_at_3": 0.35371,
    "recall_at_5": 0.53211
  },
  "mteb_dataset_name": "SSRetrieval",
  "mteb_version": "1.1.1"
}

同样的数据集,用
peg模型
测试:

{
  "dataset_revision": null,
  "dev": {
    "evaluation_time": 1036.11,
    "map_at_1": 0.09911,
    "map_at_10": 0.42835,
    "map_at_100": 0.49497,
    "map_at_1000": 0.49681,
    "map_at_3": 0.2277,
    "map_at_5": 0.31901,
    "mrr_at_1": 0.56794,
    "mrr_at_10": 0.67666666,
    "mrr_at_100": 0.6737,
    "mrr_at_1000": 0.67386,
    "mrr_at_3": 0.65495,
    "mrr_at_5": 0.66559,
    "ndcg_at_1": 0.56794,
    "ndcg_at_10": 0.56275,
    "ndcg_at_100": 0.62991,
    "ndcg_at_1000": 0.64939,
    "ndcg_at_3": 0.66666664,
    "ndcg_at_5": 0.54815,
    "precision_at_1": 0.56794,
    "precision_at_10": 0.38468,
    "precision_at_100": 0.05755,
    "precision_at_1000": 0.00641,
    "precision_at_3": 0.53329,
    "precision_at_5": 0.49464,
    "recall_at_1": 0.09911,
    "recall_at_10": 0.55328,
    "recall_at_100": 0.7634,
    "recall_at_1000": 0.84758,
    "recall_at_3": 0.25931,
    "recall_at_5": 0.38263
  },
  "mteb_dataset_name": "SSRetrieval",
  "mteb_version": "1.1.1"
}
模型 Recall@1 Recall@10 Recall@100 Recall@1000
peg模型 9.911 55.328 76.34 84.758
微调256模型 13.427 78.675 98.082 99.181

可以看到,微调的模型,用更小的参数,见多识广后,整体效果明显优于未经历大规模数据训练的更大尺寸的模型。

参考