2024年1月

MySQL提供了丰富的日期和时间函数,用于处理和操作日期时间数据。本篇博文将深入介绍一些常用的MySQL日期函数,通过详细的例子带你了解这些函数的用法和实际应用。

1.
CURDATE() - 获取当前日期

CURDATE()
函数返回当前日期,不包含时间信息。

SELECT CURDATE();

结果可能类似于:

+------------+
| CURDATE()  |
+------------+
| 2024-01-18 |
+------------+

2.
NOW() - 获取当前日期时间

NOW()
函数返回当前日期和时间。

SELECT NOW();

结果可能类似于:

+---------------------+
| NOW()               |
+---------------------+
| 2024-01-18 13:30:45 |
+---------------------+

3.
DATE_FORMAT() - 格式化日期

DATE_FORMAT()
函数用于将日期格式化为指定的字符串。

SELECT DATE_FORMAT(NOW(), '%Y-%m-%d %H:%i:%s') AS formatted_date;

结果可能类似于:

+---------------------+
| formatted_date      |
+---------------------+
| 2024-01-18 13:30:45 |
+---------------------+

4.
DATEDIFF() - 计算日期差

DATEDIFF()
函数用于计算两个日期之间的天数差。

SELECT DATEDIFF('2024-01-20', '2024-01-18') AS date_difference;

结果可能类似于:

+------------------+
| date_difference  |
+------------------+
| 2                |
+------------------+

5.
DATE_ADD() - 日期加法

DATE_ADD()
函数用于在日期上加上一定的时间间隔。

SELECT DATE_ADD(NOW(), INTERVAL 7 DAY) AS future_date;

结果可能类似于:

+---------------------+
| future_date         |
+---------------------+
| 2024-01-25 13:30:45 |
+---------------------+

6.
DATE_SUB() - 日期减法

DATE_SUB()
函数用于在日期上减去一定的时间间隔。

SELECT DATE_SUB(NOW(), INTERVAL 3 MONTH) AS past_date;

结果可能类似于:

+---------------------+
| past_date           |
+---------------------+
| 2023-10-18 13:30:45 |
+---------------------+

7.
DATE() - 提取日期部分

DATE()
函数用于从日期时间值中提取日期部分。

SELECT DATE(NOW()) AS extracted_date;

结果可能类似于:

+---------------------+
| extracted_date      |
+---------------------+
| 2024-01-18          |
+---------------------+

结语

通过本文的详细介绍,你现在应该对MySQL日期函数有了更深入的理解。这些函数在实际应用中可以帮助你轻松处理和操作日期时间数据,使得数据库操作更为灵活和便捷。在实际项目中,根据需求合理使用这些日期函数,可以提高数据处理的效率和精确性。

Flink系列文章

  1. 第01讲:Flink 的应用场景和架构模型
  2. 第02讲:Flink 入门程序 WordCount 和 SQL 实现
  3. 第03讲:Flink 的编程模型与其他框架比较
  4. 第04讲:Flink 常用的 DataSet 和 DataStream API
  5. 第05讲:Flink SQL & Table 编程和案例
  6. 第06讲:Flink 集群安装部署和 HA 配置
  7. 第07讲:Flink 常见核心概念分析
  8. 第08讲:Flink 窗口、时间和水印
  9. 第09讲:Flink 状态与容错
  10. 第10讲:Flink Side OutPut 分流
  11. 第11讲:Flink CEP 复杂事件处理
  12. 第12讲:Flink 常用的 Source 和 Connector
  13. 第13讲:如何实现生产环境中的 Flink 高可用配置
  14. 第14讲:Flink Exactly-once 实现原理解析
  15. 第15讲:如何排查生产环境中的反压问题
  16. 第16讲:如何处理Flink生产环境中的数据倾斜问题
  17. 第17讲:生产环境中的并行度和资源设置

本章教程对 Apache Flink 的基本概念进行了介绍,虽然省略了许多重要细节,但是如果你掌握了本章内容,就足以对Flink实现可扩展并行度的 ETL、数据分析以及事件驱动的流式应用程序,有一个大致的了解。

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,例如Hadoop YARN,但也可以设置作为独立集群甚至库运行。Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。

Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程./bin/flink run ...中运行。

可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为standalone 集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。

流处理

在自然环境中,数据的产生原本就是流式的。无论是来自 Web 服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕 有界流(bounded)或 无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment);
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行。

通常,你只需要使用 getExecutionEnvironment() 即可,因为该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行你的程序。

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

示例

如下是一个完整的、可运行的程序示例,它是基于流窗口的单词统计应用程序,计算 5 秒窗口内来自 Web 套接字的单词数。你可以复制并粘贴代码以在本地运行,需要的maven
依赖地址

package wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.20.130", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();
        System.out.println("parallelism -> " + env.getParallelism());

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

Linux安装nc工具:
yum install nc
,并且在命令行键入数据:

[root@hadoop-001 ~]# nc -lk 9999
flink flink spark
flink hadoop spark

程序执行结果:

# IDEA执行,默认flink并行度是8,可以env.setParallelism来设置
parallelism -> 8


1> (spark,1)
7> (flink,2)


1> (spark,1)
8> (hadoop,1)
7> (flink,1)

两个窗口的结果,可以看到,把
flink spark hadoop
三个单词的总次数一个不漏的算出来了。需要注意打印结果,
1>
表示编号为
1
的task打印的,代码的
gitee地址

我们知道了一个Flink程序通常有source -> transform -> sink,即 读取数据源,处理转换数据,结果保存 ,接下来将逐步介绍这些基本用法。

Data Sources

Source 是你的程序从中读取其输入的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。通过 StreamExecutionEnvironment 可以访问多种预定义的 stream source:

1 基于文件:

  • readTextFile(path) - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。

实现:

在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

重要提示:

  • 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

  • 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

2 基于套接字:

  • socketTextStream - 从套接字读取。元素可以由分隔符分隔。

3 基于集合:

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。

  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

  • fromElements(T ...) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。

  • generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

4 自定义:

  • addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

基本的stream source

这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。因此,你可以这样做:

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷方法是用 socket

DataStream<String> lines = env.socketTextStream("localhost", 9999)
    
    
    
public static void demo4() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * 1. linux安装nc工具:yum install nc
     * 2. 发送数据: nc -lk 9999
      */
    DataStream<Person> persons = senv.socketTextStream("192.168.20.130", 9999)
            .map(line -> new Person(line.split(",")[0], Integer.valueOf(line.split(",")[1])));

    persons.print();
    senv.execute("DataSourceDemo");
}

或读取文件

DataStream<String> lines = env.readTextFile("file:///path");

在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统,这将在后面的教程会经常使用Kafka Source。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。

由于篇幅,这里不会列出所有的代码,demo的
gitee地址

DataStream Transformations

转换主要常用的算子有map、flatMap、Filter、KeyBy、Window等,它们作用是对数据进行清洗、转换、分发等。这里列出几个常用算子,在以后的Flink程序编写中,这将是非常常用的。通常都需要用户自定义Function,可以通过1)实现接口;2)匿名类;3)Java8 Lambdas表达式;

1. Map算子 DataStream => DataStream

输入一个元素同时输出一个元素。下面是将输入流中元素数值加倍的 map function:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

2. FlatMap算子 DataStream => DataStream

输入一个元素同时产生零个、一个或多个元素。下面是将句子拆分为单词的 flatmap function:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

3. Filter算子 DataStream => DataStream

为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素。下面是过滤掉零值的 filter:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy算子 DataStream => KeyedStream

在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的,有多种指定 key 的方式,以下是通过Java8 Lambdas表达式:

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

还可以通过实现
KeySelector
接口,来指定key。

Rich Functions

至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunction, MapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

  • open(Configuration c)

  • close()

  • getRuntimeContext()

open() 仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等,比如从数据库读取配置。

getRuntimeContext() 为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

Data Sinks

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。

  • writeAsCsv(...) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。

  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。

  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。

  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

print() / printToErr() 主要是程序开发调试的时候,将一些中间结果打印到控制台,便于调试。

在实际业务开发中,通常会使用addSink ,里面传入一个SinkFunction对象,将结果保存到mysql等外部存储。

rows.addSink(new RichSinkFunction<Row>() {
    private Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if(conn == null) {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            conn = DriverManager.getConnection("jdbc:clickhouse://192.168.1.2:8123/test");
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if(conn != null) {
            conn.close();
        }
    }

    @Override
    public void invoke(Row row, Context context) throws Exception {
        String sql = "";
        PreparedStatement ps = null;
        sql = "insert into table ...";
        ps = conn.prepareStatement(sql);
        ps.setInt(1, ...);
      
        ps.execute();

        if(ps != null) {
            ps.close();
        }
    }
});

在sink里面拿到数据库连接,通常在open()方法,并且组装sql,invoke()将其写入到数据库。

Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。

image-20231226210537809

  • Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。

  • Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。

  • Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。

  • Flink API 第三层抽象是 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。

表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。

  • Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

容错处理

流式处理遇到程序中断是很常见的异常,如何恢复,这将是很关键的,那么Flink又是如何进行容错的呢?

Checkpoint Storage

Flink 定期对每个算子的所有状态进行持久化快照,并将这些快照复制到更持久的地方,例如分布式文件系统hdfs。 如果发生故障,Flink 可以恢复应用程序的完整状态并恢复处理,就好像没有出现任何问题一样。

这些快照的存储位置是通过作业_checkpoint storage_定义的。 有两种可用检查点存储实现:一种持久保存其状态快照 到一个分布式文件系统,另一种是使用 JobManager 的堆。

状态快照如何工作?

Flink 使用 Chandy-Lamport algorithm 算法的一种变体,称为异步 barrier 快照(asynchronous barrier snapshotting)。

当 checkpoint coordinator(job manager 的一部分)指示 task manager 开始 checkpoint 时,它会让所有 sources 记录它们的偏移量,并将编号的 checkpoint barriers 插入到它们的流中。这些 barriers 流经 job graph,标注每个 checkpoint 前后的流部分。

image-20231226213132036

Checkpoint n 将包含每个 operator 的 state,这些 state 是对应的 operator 消费了严格在 checkpoint barrier n 之前的所有事件,并且不包含在此(checkpoint barrier n)后的任何事件后而生成的状态。

当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

确保精确一次(exactly once)

当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:

  • Flink 不会从快照中进行恢复(at most once)

  • 没有任何丢失,但是你可能会得到重复冗余的结果(at least once)

  • 没有丢失或冗余重复(exactly once)

Flink 通过回退和重新发送 source 数据流从故障中恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着 每一个事件都会影响 Flink 管理的状态精确一次。

Barrier 只有在需要提供精确一次的语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置 CheckpointingMode.AT_LEAST_ONCE 关闭 Barrier 对齐来提高性能。

端到端精确一次

为了实现端到端的精确一次,以便 sources 中的每个事件都仅精确一次对 sinks 生效,必须满足以下条件:

  1. 你的 sources 必须是可重放的,并且

  2. 你的 sinks 必须是事务性的(或幂等的)

在Flink里面开启checkpoint只需要:

Job 升级与扩容

升级 Flink 作业一般都需要两步:第一,使用 Savepoint 优雅地停止 Flink Job。 Savepoint 是整个应用程序状态的一次快照(类似于 checkpoint ),该快照是在一个明确定义的、全局一致的时间点生成的。第二,从 Savepoint 恢复启动待升级的 Flink Job。 在此,“升级”包含如下几种含义:

  • 配置升级(比如 Job 并行度修改)

  • Job 拓扑升级(比如添加或者删除算子)

  • Job 的用户自定义函数升级

Step 1: 停止 Job
要优雅停止 Job,需要使用 JobID 通过 CLI 或 REST API 调用 “stop” 命令。 JobID 可以通过获取所有运行中的 Job 接口或 Flink WebUI 界面获取,拿到 JobID 后就可以继续停止作业了:

bin/flink stop <job-id>

client 预期输出

Suspending job "<job-id>" with a savepoint.
Suspended job "<job-id>" with a savepoint.

Savepoint 已保存在 state.savepoints.dir 指定的路径中,该配置在 flink-conf.yaml 中定义,flink-conf.yaml 挂载在本机的 /tmp/flink-savepoints-directory/ 目录下。 在下一步操作中我们会用到这个 Savepoint 路径,如果我们是通过 REST API 操作的, 那么 Savepoint 路径会随着响应结果一起返回,我们可以直接查看文件系统来确认 Savepoint 保存情况。

**Step 2: 重启 Job (不作任何变更) **

如果代码逻辑需要改变,现在你可以从这个 Savepoint 重新启动待升级的 Job。

flink run -s <savepoint-path> -p 3 -c MainClass -yid app_id /opt/ClickCountJob.jar

预期输出

Starting execution of program
Job has been submitted with JobID <job-id>

迟到的数据

对于数据延迟,Flink又是怎么处理的呢?这里先介绍2个概念。

Event Time and Watermarks

Flink 明确支持以下三种时间语义:

  • 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间;

  • 摄取时间(ingestion time): Flink 读取事件时记录的时间;

  • 处理时间(processing time): Flink pipeline 中具体算子处理事件的时间;

为了获得可重现的结果,例如在计算过去的特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数据或者测试新代码变得异常困难。

EventTime就是我们的数据时间,Flink把每条数据称为Event;Watermarks就是每条数据允许的最大延迟;

公司组织春游,规定周六早晨8:00 ~ 8:30清查人数,人齐则发车出发,可是总有那么个同学会睡懒觉迟到,这时候通常也会等待20分钟,但是不能一直等下去,最多等到8:50,不会继续等待了,直接出发。在这个例子中,最晚期限时间是8:50 - 20分钟,watermark就是8:30对应的时间戳。

在基于窗口的允许延迟的Flink程序中,窗口最大时间,减去允许延迟的时间,也就是watermark,如果watermark大于window 结束时间,则触发计算。

本文中使用到的工具是Intellij IDEA和JDK 8,需要安装两款工具的小伙伴请查看这两篇教程:
点我查看安装JDK8教程

点我查看安装Intellij IDEA教程

假设我想在某宝上买一点零食(没错,我承认我确实是个吃货),经过搜索后出现了如下结果,我们发现每一项都包含相同内容:图片、标题、价格、购买人数、所在店铺名称。要想将每一个数据项展现给用户,就需要一个特定的“容器”来存储每一个数据项。

在日常生活中,"容器"通常是指一种用于装载、储存物质的器具。例如:水杯可以装水,衣柜可以装衣物。

仿照日常生活中的“容器”的定义,我们可以给程序中的容器做个说明:多个数据项聚合在一起,组成了一个装载数据的容器。这个容器对数据项进行访问、修改等操作。

接下来要讲的数组就属于容器的一种。

一、数组的概念

数组的定义:数组是一种数据结构,它用于
存储相同类型的元素
(如整数、字符串等)的有序集合。

1.1 数组相关的概念

1. 数组名:基本数据类型变量存在变量名,那么数组也有数组变量,
数组的变量名就是这个数组的名称
。例如:
numbers
代表这个数组存储的是数字。

2. 索引:
数组中每一个元素都有唯一的索引
(类似我们的身份证号),我们访问数组元素的值时会用到索引。数组的索引是有序的,这样做的好处是访问数组的元素非常快。

3. 元素:
数组元素是数组的基本组成单位
,元素的数据类型决定了数组的数据类型,相反,数组的数据类型也决定了每个元素的数据类型。

4. 数组的长度:
数组中存储元素的数量

5. 数组本身是
引用数据类型
。但是数组中的元素既可以是基本数据类型,也可以是引用数据类型。

6.
数组的大小是固定的,一旦创建,其大小就不能改变

1.2 数组的划分

按照数据类型划分,数组可以分成基本类型数组、引用数据类型数组。以基本类型数组为例,每一个数组元素的数据类型都是基本数据类型。

按照维度划分,数组可以划分成一维数组、二维数组、三维数组……

  • 一维数组中存储一组数据:

  • 二维数组本质上就是一维数组的每个元素再存储一个数组,每个元素对应的数组存储元素个数可能也不尽相同(本质上就是一维数组再嵌套一维数组):

就我个人日常开发而言,使用一维数组的次数多一些,用到二维数组的次数就几乎很少了(在做算法题的时候能用的到)

备注:本文后续提到的数组默认都是一维数组。

二、定义并初始化数组

2.1 定义数组变量

定义数组变量有两种方式:

方式一:方括号写到数据类型的后面。

// Java风格定义数组变量
数据类型[] 变量名;

这里的数据类型既可以是基本数据类型,也可以是引用数据类型。例如,我想定义一个
int
类型数组
arr1

String
类型数组
arr2
,可以写成如下形式:

// int类型数组
int[] arr1;
// 引用数据类型数组
String[] arr2;

方式二:如果你之前学过C或者C++的数组,也可以把方括号写到变量名后面。

数据类型 变量名[];

定义数组变量完成后,就需要创建数组了。创建数组一共有两种方式:静态初始化和动态初始化。

2.2 创建数组方式——静态初始化

静态初始化:
数组中存储的内容已经确定
,可以使用以下两种方式静态初始化:

数据类型[] 变量名 = new 数据类型[] {变量值1, 变量值2, 变量值3,..., 变量值n};

上述方式有简化写法,赋值符号右侧的
new 数据类型[]
可以省略,只保留大括号中的内容,写法如下:

数据类型[] 变量名 = {变量值1, 变量值2, 变量值3,..., 变量值n};

下面代码展示了上述两种方式创建数组:

/**
 * 数组静态初始化两种方式
 *
 * @author iCode504
 * @date 2024-01-12
 */
public class MyArrayDemo1 {
    public static void main(String[] args) {
        // new 数据类型[] {...}方式初始化数组
        int[] array1 = new int[]{1, 2, 3, 4, 5};
        double[] array2 = new double[]{4.2, 5.6, 7.6, 2.33, 8.88};
        String[] tastyFoods = new String[]{"砂锅麻辣烫", "过桥米线", "土豆粉", "砂锅居", "火锅"};

        // {...}方式初始化数组
        float[] array3 = {2.3f, 89.66f, 34.2f, -20.33f};
        long[] array4 = {666, 888, 999, 648};
        String[] changchunAttractions = {"净月潭公园", "南湖公园", "伪满皇宫", "长影世纪城", "雕塑公园"};
    }
}

注意:
使用花括号
{}
创建数组必须先定义数组变量的同时就将创建好的数组赋值给数组变量

先定义数组变量,再使用花括号的方式赋值是错误的,无法通过编译。以下是错误写法:

// 定义数组变量
int[] arr;
// 以花括号的方式对应的数组值赋值给变量无法通过编译
arr = {1, 2, 3, 4, 5};

在IDEA中这样写代码时,也会给出错误提示:

2.3 创建数组方式——动态初始化

前面讲到的静态初始化的方式创建数组有一个前提就是你已经确定数组中要存储什么。但是在大多数情况下,我们也不确定数组要存储什么,而是在后续的时候修改数组中元素的值,这时候我们就可以使用动态初始化的方式创建数组。

动态初始化:数组的长度可以确定,但是数组里面具体写什么内容还不确定,语法格式如下:

数据类型[] 变量名 = new 数据类型[n];

其中n指的是数组的长度,即数组中存储元素的数量。具体内容可以看下一部分:数组的长度。

以下是动态初始化方式创建数组:

/**
 * 动态初始化方式创建数组
 *
 * @author iCode504
 * @date 2024-01-13
 */
public class MyArrayDemo2 {
    public static void main(String[] args) {
        // 动态初始化方式创建长度为4的char数组、String数组、int数组
        char[] array1 = new char[4];
        String[] array2 = new String[4];
        int[] array3 = new int[4];
    }
}

三、数组的长度

假设有一个数组
arr
,要想获取数组的长度,我们只需要使用
arr.length
获取数组的长度即可。

长度为0的数组称作空数组。

定义数组时,
数组的长度必须是整数且大于等于0
,否则系统会抛出
数组长度为负数

NegativeArraySizeException
)异常。

/**
 * 数组的长度获取
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ArrayLength {
    public static void main(String[] args) {
        int[] array = {1, 3, 5, 7, 9, 11, 13};
        System.out.println("array.length = " + array.length);

        // 空数组的长度
        int[] array1 = new int[0];
        System.out.println("array1.length = " + array1.length);
        int[] array2 = {};
        System.out.println("array2.length = " + array2.length);
        // 数组长度必须要>=0,如果定义的数组默认长度是负数的话会抛出NegativeArraySizeException异常
        int[] array3 = new int[-1];
        System.out.println("array3.length = " + array3.length);
    }
}

运行结果:

四、访问数组元素

通过
数组名[索引值]
可以访问到这个索引值下的值,其中索引值必须是整数并且处于
[0, 数组的长度值)
范围内,如果超出这个范围,系统就会抛出数组索引越界异常
ArrayIndexOutOfBoundsException

/**
 * 数组元素的访问
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ArrayElementsAccess {
    public static void main(String[] args) {
        int[] array = {1, 3, 5, 7, 9, 11, 13};

        System.out.println("数组的长度是: " + array.length);
        System.out.println("array[0] = " + array[0]);
        System.out.println("array[3] = " + array[3]);
        System.out.println("array[6] = " + array[6]);
        // 超出[0, array.length)的范围会抛出异常
        System.out.println("array[7] = " + array[7]);
    }
}

运行结果:

从运行结果不难看出,数组索引的范围是
[0, 6]
,一旦索引值超出这个范围,Java会为我们抛出一个数组索引值越界异常(
ArrayIndexOutOfBoundsException
),这种情况在以后编写代码的过程中要尽量避免。

当我们能获取到数组元素的值时,我们就可以修改数组中的元素了。

我们还是以上面的数组为例,我想修改索引值为2的元素值为88,索引值为5的元素值为66:

/**
 * 数组元素的修改
 *
 * @author iCode504
 * @date 2024-01-17
 */
public class ArrayElementsModification {
    public static void main(String[] args) {
        int[] array = {1, 3, 5, 7, 9, 11, 13};
        System.out.println("修改前各个元素的值: ");
        System.out.println("array[0] = " + array[0]);
        System.out.println("array[1] = " + array[1]);
        System.out.println("array[2] = " + array[2]);
        System.out.println("array[3] = " + array[3]);
        System.out.println("array[4] = " + array[4]);
        System.out.println("array[5] = " + array[5]);
        System.out.println("array[6] = " + array[6]);

        // 修改索引值为2的元素为88,索引值为3的元素为66
        array[2] = 88;
        array[3] = 66;

        System.out.println("--------------------");
        System.out.println("修改后各个元素的值: ");
        System.out.println("array[0] = " + array[0]);
        System.out.println("array[1] = " + array[1]);
        System.out.println("array[2] = " + array[2]);
        System.out.println("array[3] = " + array[3]);
        System.out.println("array[4] = " + array[4]);
        System.out.println("array[5] = " + array[5]);
        System.out.println("array[6] = " + array[6]);
    }
}

运行结果符合预期:

五、数组的特点

上边讲了这么多数组的用法,这一部分我们就来简单总结以下数组的特点。其实这一部分我本来想放到前面来讲,但是后来一想放到前面容易给初学者说的云里雾里,于是将数组的特点放到这一部分。数组一共有如下几个特点:

1.
相同数据类型:
所有数组元素必须是相同的数据类型,可以是基本数据类型(如整数、浮点数等)或引用数据类型(如对象、字符串等)。

例如:假设有一个长度为5的
int
数组,不管是静态初始化还是动态初始化,里面存储的都是
int
类型的值。

/**
 * 数组特点1:所有数组元素都是相同类型
 *
 * @author iCode504
 * @date 2024-01-13
 */
public class ArrayCharacteristic1 {
    public static void main(String[] args) {
        // 定义一个int类型的数组
        int[] array = new int[5];

        // 错误写法,因为int[]数组所有元素必须是int类型。
        // 而long和double本身就比int范围大,不能直接赋值给数组元素。
        // 如需赋值,就需要进行强制类型转换(但是这个过程中也可能会出现一些问题)
        long number1 = 8;
        // array[2] = number1;
        double number2 = 20.34;
        // array[3] = number2;
        // int[] array2 = {1, 2, 3, 4, 8.88};

        // 正确写法,因为character1赋值给array[4]时,会将值自动类型提升为int类型
        // byte、short也同理
        char character1 = 'i';
        array[4] = character1;
        byte number3 = 30;
        array[4] = number3;
        short number4 = 40;
        array[4] = number4;
    }
}

2.
固定大小:
数组在创建时需要指定固定的大小,这个大小通常在数组声明时确定,且无法在运行时改变。这意味着数组的长度是固定的,无法动态调整。

我们可以使用反证法对上述内容进行证明,假设
数组定义了以后,可以动态调整
。通过这个假设,我们来编写一段代码证明一下上述假设是否可行:

/**
 * 数组特点2--创建数组后,无法动态调整数组长度
 *
 * @author iCode504
 * @date 2024-01-13
 */
public class ArrayCharacteristic2 {
    public static void main(String[] args) {
        // 定义一个长度为5的数组
        int[] array = new int[5];
        System.out.println("数组初始长度: " + array.length);
        // 如果数组长度可以动态调整,那么我在数组范围外再给数组元素赋值,此时数组长度会动态调整到这个范围外的索引值
        array[5] = 20;
        System.out.println("给数组范围外元素赋值后的长度: " + array.length);
    }
}

在这个程序中,我们尝试将一个元素添加到数组的第六个位置(索引为5),而此时抛出了
ArrayIndexOutOfBoundsException
(数组越界异常)。很显然,上述假设并不成立。

尝试访问超出数组长度的索引会导致程序异常。

3.
连续内存空间:
数组的元素在内存中是连续存储的,这也是通过索引直接访问数组元素的原因。

4.
索引访问:
数组中的每个元素都有一个唯一的索引,通过该索引可以访问或修改对应位置的元素。数组的索引从0开始。

这一点我们在第四部分数组元素的访问已经提到,创建指定长度的数组时,会为每一个数组元素分配一个索引值(从0到
arr.length - 1
且都是整数)。要想访问到数组元素值,必须通过
数组名[索引值]
访问每一个数组元素。

六、遍历数组

使用循环来遍历数组可以获取到数组的每一个元素。遍历数组有两种方式:普通循环遍历和
foreach
循环(也称作增强for循环)遍历。

6.1 普通循环遍历

我们可以使用数组的索引值,通过循环来遍历数组,这里我使用普通的
for
循环来遍历一个
String
类型的数组(当然,使用
while

do-while
循环也OK):

/**
 * 普通循环遍历数组
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ForArray {
    public static void main(String[] args) {
        String[] array = {"刘备", "关羽", "张飞", "诸葛亮", "赵云"};
        // 遍历范围:[0, array.length)
        for (int i = 0; i < array.length; i++) {
            // 用数组名和索引值访问到这个元素的值
            System.out.println(array[i]);
        }
    }
}

运行结果:

在IDEA中,我们可以使用
数组名.fori
快速生成一个
for
循环:

同理,如果想逆序输出数组,可以使用
数组名.forr

6.2 foreach循环遍历

foreach
循环是JDK 5的新特性,它也是一种循环结构,这个循环主要用于遍历
数组

集合
(集合后续会学习到),也称作增强
for
循环。

说明:在JDK 8中,在迭代器
Iterator<T>
接口添加了默认方法
foreach()
专门用来遍历集合,后续会在集合部分讲到。

foreach
循环的语法结构如下所示:

for (数据类型 变量名 : 数组/集合名) {
	// 执行代码...
}

在这个语法结构中,如果我们使用的是数组
arr
,那么结构中的变量名就相当于数组中每一个元素对应的变量名称,即
arr[i]

例如:假设要使用
foreach
循环遍历一个
String
类型的数组,使用方式和普通循环遍历语法要简单一些。

/**
 * foreach循环遍历数组
 * 
 * @author iCode504
 * @date 2023-12-21
 */
public class ForeachArray {
    public static void main(String[] args) {
        String[] array = {"刘备", "关羽", "张飞", "诸葛亮", "赵云"};
        // foreach循环遍历数组
        for (String s : array) {
            System.out.println(s);
        }
    }
}

运行结果:

在IDEA中也为我们设置了快捷生成
foreach
循环的快捷键
数组名.for
,就能快速生成一个
foreach
循环:

七、数组元素的默认值

整数类型(
byte

short

int

long
)的数组初始化时,每一个数组元素的默认值是0。

/**
 * 基本数据类型--整数类型数组元素的默认值
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ArrayElementsDefaultValue1 {
    public static void main(String[] args) {
        // 基本数据类型--byte类型数组元素的默认值
        byte[] byteArray = new byte[5];
        System.out.println("byte[]元素的默认值是: ");
        for (byte b : byteArray) {
            System.out.print(b + "\t");
        }
        System.out.println();

        // 基本数据类型--short类型数组元素的默认值
        short[] shortArray = new short[5];
        System.out.println("short[]元素的默认值是: ");
        for (short s : shortArray) {
            System.out.print(s + "\t");
        }
        System.out.println();

        // 基本数据类型--int类型数组元素的默认值
        int[] intArray = new int[5];
        System.out.println("int[]元素的默认值是: ");
        for (int i : intArray) {
            System.out.print(i + "\t");
        }
        System.out.println();

        // 基本数据类型--long类型数组元素的默认值
        long[] longArray = new long[5];
        System.out.println("long[]元素的默认值是: ");
        for (long l : longArray) {
            System.out.print(l + "\t");
        }
        System.out.println();
    }
}

运行结果:

浮点类型(
float

double
)的数组初始化,每一个数组元素的默认值是0.0。

/**
 * 基本数据类型--浮点类型数组元素的默认值
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ArrayElementsDefaultValue2 {
    public static void main(String[] args) {
        // 基本数据类型--float类型数组元素的默认值
        float[] floatArray = new float[5];
        System.out.println("float[]元素的默认值是: ");
        for (float f : floatArray) {
            System.out.print(f + "\t");
        }
        System.out.println();

        // 基本数据类型--double类型数组元素的默认值
        double[] doubleArray = new double[5];
        System.out.println("double[]元素的默认值是: ");
        for (double d : doubleArray) {
            System.out.print(d + "\t");
        }
        System.out.println();
    }
}

字符类型(
char
)的数组初始化时,每一个数组元素的默认值是
\u0000
(即Unicode字符表的第一个字符)。

布尔类型(
boolean
)的数组初始化时,每一个数组元素的默认值是
false

/**
 * 基本数据类型--字符类型和布尔类型数组的默认值
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ArrayElementsDefaultValue3 {
    public static void main(String[] args) {

        char[] charArray = new char[5];
        System.out.println("char[]元素的默认值是: ");
        for (char c : charArray) {
            // 间接验证每一个char数组元素默认值是否是Unicode字符表的第一个元素
            System.out.print((c == 0) + "\t");
            // 写成下面的形式验证也OK
            // System.out.print((c == '\u0000') + "\t");
        }
        System.out.println();

        // 基本数据类型--boolean类型数组元素的默认值
        boolean[] booleanArray = new boolean[5];
        System.out.println("boolean[]元素的默认值是: ");
        for (boolean b : booleanArray) {
            System.out.print(b + "\t");
        }
        System.out.println();
    }
}

运行结果:

引用数据类型的数组初始化时,每一个数组元素的默认值是
null

import java.util.Random;

/**
 * 引用数据类型数组的默认值
 *
 * @author iCode504
 * @date 2023-12-21
 */
public class ArrayElementsDefaultValue4 {
    public static void main(String[] args) {
        // 引用数据类型数组元素的默认值
        String[] strArray = new String[5];
        System.out.println("String[]元素的默认值是: ");
        for (String s : strArray) {
            System.out.print(s + "\t");
        }
        System.out.println();

        Random[] randomArray = new Random[5];
        System.out.println("Random[]元素的默认值是: ");
        for (Random random : randomArray) {
            System.out.print(random + "\t");
        }
    }
}

运行结果:

八、知识点总结

数组的概念与一维数组知识点总结如下图所示:

如需高清大图,请点击右侧链接下载:
点我下载

数组篇-其之一-数组的概念与一维数组-知识点总结

1、准备材料

正点原子stm32f407探索者开发板V2.4

STM32CubeMX软件(
Version 6.10.0

keil µVision5 IDE(
MDK-Arm

ST-LINK/V2驱动

野火DAP仿真器

XCOM V2.6串口助手

一台示波器

2、实验目标

使用STM32CubeMX软件配置STM32F407开发板的
DAC OUT1实现输出三角波

3、实验流程

3.0、前提知识

STM32F407的DAC输出引脚除可以输出 DACoutput = VREF+ * DOR / 4095 的模拟电压之外,其DAC控制逻辑中还有两个重要的波形生成器 Wave generation mode ,
分别为三角波和噪声波,本小节的实验主要以生成三角波为例
,只会在“3.0、前提知识”中简单提到噪声波相关内容,在实际生成过程中两者的设置类似,且均简单易理解

使用DAC输出指定三角波/噪声波需要先指定DAC的输出触发源
Trigger
,DAC输出的触发源一共有7个,包括Timer 2/4/5/6/7/8 Trigger Out event和Software trigger,一般使用定时器的溢出时间作为DAC输出的触发源,本实验采用了TIM6的溢出更新事件作为DAC OUT1 三角波的触发源,所有可选的触发源如下图所示

当DAC输出三角波时需要设置参数
Maximum Triangle Amplitude
,当触发源定时器每次产生溢出更新事件时,DAC的输出值就会从基值增加1/减少1,因为TIM6基础定时器只能向上计数,因此当TIM6每次溢出时,DAC的输出会增加1,直到增加到设置的
Maximum Triangle Amplitude
参数值为止,然后逐渐减少直到基值,这个过程会反复执行从而生成三角波

上述过程如下图所示
(注释1)

当DAC输出伪噪声波时需要设置
Noise Amplitude
参数,其主要配置生成噪声波使用的12位LFSR寄存器解锁的位,如下图所示为DAC使用LFSR寄存器生成伪噪声的算法结构图,这里具体不做深究
(注释1)

3.1、CubeMX相关配置

3.1.0、工程基本配置

打开STM32CubeMX软件,单击ACCESS TO MCU SELECTOR选择开发板MCU(选择你使用开发板的主控MCU型号),选中MCU型号后单击页面右上角Start Project开始工程,具体如下图所示

开始工程之后在配置主页面System Core/RCC中配置HSE/LSE晶振,在System Core/SYS中配置Debug模式,具体如下图所示

详细工程建立内容读者可以阅读“
STM32CubeMX教程1 工程建立

3.1.1、时钟树配置

系统时钟使用8MHz外部高速时钟HSE,HCLK、PCLK1和PCLK2均设置为STM32F407能达到的最高时钟频率,具体如下图所示

3.1.2、外设参数配置

在Pinout & Configuration页面左边功能分类栏目Analog中单击其中DAC

在Mode中勾选OUT1 Configuration

将DAC OUT1的触发源选择为TIM6外部触发,最大三角波幅值设置为4095

具体配置如下图所示

在Pinout & Configuration页面左边功能分类栏目Timers中单击其中TIM6

勾选Activated激活定时器,配置其计数器参数溢出时间为0.1ms,具体参数解释请阅读“
STM32CubeMX教程5 TIM 定时器概述及基本定时器

外部事件触发选择更新事件Updata Event,具体配置如下图所示

3.1.3、外设中断配置

此实验无需开启DAC的任何中断

3.2、生成代码

3.2.0、配置Project Manager页面

单击进入Project Manager页面,在左边Project分栏中修改工程名称、工程目录和工具链,然后在Code Generator中勾选“Gnerate peripheral initialization as a pair of 'c/h' files per peripheral”,最后单击页面右上角GENERATE CODE生成工程,具体如下图所示

详细Project Manager配置内容读者可以阅读”
STM32CubeMX教程1 工程建立
“实验3.4.3小节

3.2.1、外设初始化调用流程

请阅读“
STM32CubeMX教程16 DAC - 输出3.3V内任意电压
”实验“3.2.1、外设初始化调用流程”小节

3.2.2、外设中断调用流程

此实验无需开启DAC的任何中断

3.2.3、添加其他必要代码

在主函数中启动DAC通道1输出,默认基值设置为0即可,源代码如下所示

/*启动DAC输出*/
HAL_DAC_Start(&hdac,DAC_CHANNEL_1);
/*设置DAC三角波输出基值*/
int32_t DacValue=0;
HAL_DAC_SetValue(&hdac,DAC_CHANNEL_1,DAC_ALIGN_12B_L,DacValue);
/*启动TIM6触发源*/
HAL_TIM_Base_Start(&htim6);
printf("Reset\r\n");

4、常用函数

请阅读“
STM32CubeMX教程16 DAC - 输出3.3V内任意电压
”实验

5、烧录验证

烧录程序,单片机上电后,将示波器的探头挂钩与DAC OUT1引脚PA4相连接,接地环与开发板上的GND引脚连接,将示波器每格电压幅值调节为1.00V,将每格子采集时间调节为400ms,然后开启示波器对DAC OU1输出的波形采集

设置Maximum Triangle Amplitud最大三角波幅值设置为2047时由示波器采集到的三角波如下图所示,
其中三角波的幅值电压为1.48V,大致为3.3V的一半,波形频率为2.446Hz,计算的周期大约为408.8ms,定时器溢出时间为0.1ms,期待的周期为0.1
2048
2=409.6ms

,与示波器采集结果大致一致

设置Maximum Triangle Amplitud最大三角波幅值设置为4095时由示波器采集到的三角波如下图所示,
其中三角波的幅值电压为2.96V,波形频率为1.207Hz,计算的周期大约为828.5ms,定时器溢出时间为0.1ms,期待的周期为0.1
4096
2=819.2ms

,与示波器采集结果大致一致

6、注释解析

注释1
:图片来源STM32F4xx 中文参考手册

参考资料

STM32Cube高效开发教程(基础篇)

更多内容请浏览
STM32CubeMX+STM32F4系列教程文章汇总贴

2023 是 AI 大爆发的一年,这一年我在我的生产力工具中(
一个叫 lowcode 的 vscode 插件
)接入了 ChatGPT API,插件也进行了重构,日常搬砖也因为 ChatGPT 的引入发生了很大的变化。

在介绍 ChatGPT 是如何与
lowcode
插件结合之前,先说说
lowcode
插件的发展历史,毕竟从 2020 年第一个版本发布到现在也迭代 3 年多了。

介绍

轮子的产生

一开始写这么个插件的目的为了拉取 YAPI 接口文档信息生成前端 API 请求方法,如下

export interface IFetchUserListResult {
  code: number;
  msg: string;
  result: {
    rows: {
      name: string;
      age: number;
      mobile: string;
      address: string;
      tags: string[];
      id: number;
    }[];
    total: number;
  };
}

export interface IFetchUserListParams {
  name?: string;
  page: number;
  size: number;
}

/**
 * 用户列表
 * http://yapi.smart-xwork.cn/project/129987/interface/api/1796953
 * @author 划水摸鱼糊屎工程师
 *
 * @param {IFetchUserListParams} params
 * @returns
 */
export function fetchUserList(params: IFetchUserListParams) {
  return request<IFetchUserListResult>(`${env.API_HOST}/api/user/page`, {
    method: 'GET',
    params,
  });
}

之后增加了根据 JSON 生成 API 请求、根据 JSON 生成 TS 类型等功能。

物料的概念

再之后就是引入了物料的概念:代码片段和区块。上面说的根据 YAPI 接口信息生成 API 请求方法、根据 JSON 生成 API 请求、根据 JSON 生成 TS 类型都属于代码片段,只在当前激活的文件里生成代码。而区块就是在多个文件里生成代码(或者说创建多个文件)。

代码片段

代码片段可以通过右键菜单、输入提示、可视化界面进行使用,区块只能通过可视化界面使用。

右键菜单

image.png

image.png

输入提示

image.png

输入提示类似 vscode 自带的代码片段功能,同时兼容 vscode 代码片段的语法

可视化界面

image.png

代码片段可视化的功能目前我也很少用到(可以不用,但不能没有)

区块

前面也说了,区块是为了在多个文件里生成代码(或者创建多个文件)。比如写一个 react 组件的时候,可能包含 js 文件和 css 文件。

image.png

image.png

不同区块的 Schema 表单不一样,产生不一样的模板数据,就可以达到模板数据 + 模板生成代码的目的。

内部细节

插件读取项目根目录下的 materials/blocks 作为区块,读取 materials/snippets 作为代码片段。

image.png

目前已经支持配置任意目录,在所有项目中共享物料

代码片段和区块目录内的内容如下

image.png

image.png

主要是 src 目录内的内容存在差异,代码片段的 src 目录内必须是 template.ejs 文件,区块的 src 目录内可以是任意内容。生成代码的时候,使用 ejs 模板引擎编译 ejs 文件。代码片段是将编译以后的内容插入到编辑器光标所在的位置,区块是将编译后的文件拷贝到指定的目录里(非 ejs 文件直接拷贝)。

model.json

默认模板数据

preview.json

物料相关配置

{
	"title": "",
	"description": "",
	"img": [
		"https://gitee.com/img-host/img-host/raw/master/2020/11/05/1604587962875.jpg"
	],
	"category": [],
	"notShowInCommand": true,
	"notShowInSnippetsList": true,
	"notShowInintellisense": true,
	"showInRunSnippetScript": true,
	"schema": "amis",
	"scripts": []
}

schema.json

可视化界面 Schema 表单配置,支持 form-render、formily、amis。

image.png

script/index.js

模板编译周期钩子函数

module.exports = {
  beforeCompile: context => {
    console.log(context)
  },
  afterCompile: context => {
    console.log(context)
  },
  complete: context => {
    console.log(context)
  },
}

重构之后会利用这个文件做更多有趣的事

缺陷

右键菜单使用代码片段的适用范围有限

export const generateCode = (context: vscode.ExtensionContext) => {
  context.subscriptions.push(
    vscode.commands.registerTextEditorCommand(
      'lowcode.generateCode',
      async () => {
        const rawClipboardText = getClipboardText();
        let clipboardText = rawClipboardText.trim();

        clipboardText = JSON.stringify(jsonParse(clipboardText));

        const validYapiId = isYapiId(clipboardText);

        const validJson = jsonIsValid(clipboardText);

        const valid = validJson || validYapiId;
        if (valid) {
          if (validYapiId) {
            await genCodeByYapi(clipboardText, rawClipboardText);
          } else {
            await genCodeByJson(clipboardText, rawClipboardText);
          }
          return;
        }
        try {
          await genCodeByTypescript(rawClipboardText, rawClipboardText);
        } catch {
          window.showErrorMessage('请复制Yapi接口ID或JSON字符串或TS类型');
        }
      },
    ),
  );
};

代码里写死了逻辑,只能处理 json 、ts 类型、YAPI 接口。

与 ChatGPT 的结合

引入 ChatGPT 的最初目的是为了翻译区块 Schema 表单对应的模板数据里的指定字段。

翻译物料模板数据指定字段

image.png

image.png

点击 Ask ChatGPT 就会打开 ChatGPT 的 WebView 界面,并自动发送预设的 Prompt。

image.png

image.png

预设的 Prompt 就是放在 viewPrompt.ejs 中

image.png

内容如下:

<%- model %> 将这段 json 中,filters 字段中的 key 字段翻译为英文,使用驼峰语法,label、placeholder
保留中文。columns 字段中的 key、dataIndex 字段翻译为英文,使用驼峰语法,title 字段保留中文。
返回翻译后的 markdown 语法的代码块

这种方式和 ChatGPT 交流会有各种玄学问题,比如翻译的字段不对或者所有的字段都翻译了,也可能是我写的 Prompt 有问题,这个功能也几乎不用了,后面会介绍另一种方式。

代码片段当作 Prompt 管理工具

看过几个 vscode 里 ChatGPT 的插件,大都是写死几个菜单,比如解释一段代码的意思、重构一段代码、给代码添加单元测试,说实话,有点 low low 的。

我是加了两个菜单:Ask ChatGPT、Ask ChatGPT With Template

image.png

Ask ChatGPT

逻辑很简单,直接把当前选中的代码或者剪贴板里的内容原封不动的发给 ChatGPT。

vscode.commands.registerCommand('lowcode.askChatGPT', () => {
      showChatGPTView({
        task: {
          task: 'askChatGPT',
          data: getSelectedText() || getClipboardText(),
        },
      });
    }),

其实这个菜单完全可以去掉,用 Ask ChatGPT With Template 也能实现

Ask ChatGPT With Template

顾名思义就是根据不同的场景使用不同的 Prompt 模版去问 ChatGPT。

image.png

只需要在代码片段的目录下添加 commandPrompt.ejs 文件即可

image.png

内容可能如下

下面我让你来充当翻译家,你的目标是把中文翻译成英文单词,请翻译时使用驼峰格式,小写字母开头,不要带翻译腔,而是要翻译得自然、流畅和地道,使用优美和高雅的表达方式。
请翻译下面的内容:“<%- rawSelectedText || rawClipboardText %>”

image.png

重构、优化

之前提到过右键菜单使用代码片段的适用范围有限,只能处理 json 、ts 类型、YAPI 接口。接入 ChatGPT 后,又加了两个菜单。如果之后要加什么新功能还得接着加菜单,那就太 low 了。

虽然引入了 ChatGPT,但是 ChatGPT 的交互页面是独立的,ChatGPT 返回结果后还需要手动复制,我这种懒人是无法接受的。

webview 界面调用 nodejs 脚本

可视化界面配置表单还是挺费时的,而且原来的 Ask ChatGPT 的功能也比较玄学。加了一个“执行脚本”的按钮,可以实现调用物料目录下 src/index.js 文件内指定方法。

image.png

image.png

image.png

在使用 ChatGPT 进行翻译的时候,使用了 TypeChat(关于 TypeChat 可以看
TypeChat、JSONSchemaChat实战 - 让ChatGPT更听你的话
),但是并不需要在插件内部引入 TypeChat。如下

image.png

export async function handleAskChatGPT() {
  const { lowcodeContext } = context;
  const schema = fs.readFileSync(
    path.join(lowcodeContext!.materialPath, 'config/schema.ts'),
    'utf8',
  );
  const typeName = 'PageConfig';
  const res = await translate<PageConfig>({
    schema,
    typeName,
    request: JSON.stringify(lowcodeContext!.model as PageConfig),
    completePrompt:
      `你是一个根据以下 TypeScript 类型定义将用户请求转换为 "${typeName}" 类型的 JSON 对象的服务,并且按照字段的注释进行处理:\n` +
      `\`\`\`\n${schema}\`\`\`\n` +
      `以下是用户请求:\n` +
      `"""\n${JSON.stringify(lowcodeContext!.model as PageConfig)}\n"""\n` +
      `The following is the user request translated into a JSON object with 2 spaces of indentation and no properties with the value undefined:\n`,
    createChatCompletion: lowcodeContext!.createChatCompletion,
    showWebview: true,
    extendValidate: (jsonObject) => ({ success: true, data: jsonObject }),
  });
  lowcodeContext!.outputChannel.appendLine(JSON.stringify(res, null, 2));
  if (res.success) {
    return { ...res.data };
  }
  return lowcodeContext!.model;
}

脚本方法执行完后将模版数据(model)返回,省去手动复制。

可以像写业务代码一样,根据自己需要添加各种处理方法,尝试各种新的技术。

如果 schema 表单用的是 amis,还可以在 schema 中配置执行脚本,比如:

{
	"type": "button",
	"label": "插入// lowcode-model-import-api",
	"onEvent": {
		"click": {
			"actions": [{
				"actionType": "runScript",
				"args": {
					"method": "insertPlaceholder",
					"params": "// lowcode-model-import-api"
				}
			}]
		}
	}
}

点击按钮的时候,会调用
insertPlaceholder
方法,参数为
// lowcode-model-import-api

Run Snippet Script

添加了
Run Snippet Script
菜单

image.png

image.png

选择对应的模版(代码片段)后,会执行模版目录下 src/index.js 的
onSelect
方法,方法里可以写任何逻辑。

只需要将代码片段目录下的
config/preview.json
文件里的
showInRunSnippetScript
设置为
true
,代码片段就会出现在菜单中。

{
	"title": "",
	"description": "",
	"img": [
		"https://gitee.com/img-host/img-host/raw/master/2020/11/05/1604587962875.jpg"
	],
	"category": [],
	"notShowInCommand": false,
	"notShowInSnippetsList": true,
	"notShowInintellisense": true,
	"showInRunSnippetScript": true,
	"schema": "amis",
	"scripts": []
}

这个功能的加入,可以做很多有趣的事情,如下:

axios-request-api

把插件内部根据 YAPI 接口文档信息生成前端 API 请求方法的代码挪到了外面,并且加了个有意思的功能,让 ChatGPT 生成请求方法的名称,部分代码如下:

const res = await fetchApiDetailInfo(domain, yapiId, token);
  if (!res.data.data) {
    throw res.data.errmsg;
  }
  funcName = await context.lowcodeContext!.createChatCompletion({
    messages: [
      {
        role: 'system',
        content: `你是一个代码专家,按照用户传给你的 api 接口地址,和接口请求方法,根据接口地址里的信息推测出一个生动形象的方法名称,驼峰格式,返回方法名称`,
      },
      {
        role: 'user',
        content: `api 地址:${res.data.data.query_path},${res.data.data.method} 方法,作用是${res.data.data.title}`,
      },
    ],
  });
  typeName = `I${funcName.charAt(0).toUpperCase() + funcName.slice(1)}Result`;

完整代码:
https://github.com/lowcode-scaffold/lowcode-materials/tree/master/materials/snippets/axios-request-api-外挂脚本/script

OCR

使用百度 OCR 识别图片文字

因为 nodejs 没法读取剪贴板里的图片,只能打开一个 webview 去读取,核心代码如下:

import { window, Range, env } from 'vscode';
import { generalBasic } from '../../../../../share/BaiduOCR/index';
import { context } from './context';

export async function bootstrap() {
  const { lowcodeContext } = context;
  const clipboardImage = await lowcodeContext?.getClipboardImage();
  const ocrRes = await generalBasic({ image: clipboardImage! });
  const words = ocrRes.words_result.map((s) => s.words).join(',');
  env.clipboard.writeText(words).then(() => {
    window.showInformationMessage('内容已经复制到剪贴板');
  });
  window.activeTextEditor?.edit((editBuilder) => {
    // editBuilder.replace(activeTextEditor.selection, content);
    if (window.activeTextEditor?.selection.isEmpty) {
      editBuilder.insert(window.activeTextEditor.selection.start, words);
    } else {
      editBuilder.replace(
        new Range(
          window.activeTextEditor!.selection.start,
          window.activeTextEditor!.selection.end,
        ),
        words,
      );
    }
  });
}

启动一个 nestjs 服务

image.png

image.png

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getMaterialPath() {
    return this.appService.getMaterialPath();
  }
}

import { Injectable } from '@nestjs/common';
import { context } from './context';

@Injectable()
export class AppService {
  getMaterialPath() {
    return context.lowcodeContext?.materialPath;
  }
}

完整代码:
https://github.com/lowcode-scaffold/lowcode-materials/tree/master/materials/snippets/start nest api server/script

生成 value-label 格式 JSON

使用了
TypeChat
,ChatGPT 返回的结果有提问,最终重试之后正确了。

完整代码:
https://github.com/lowcode-scaffold/lowcode-materials/tree/master/materials/snippets/生成 value-label 格式 JSON/script

翻译成驼峰格式

代码:

import { env, window, Range } from 'vscode';
import { context } from './context';

export async function bootstrap() {
  const clipboardText = await env.clipboard.readText();
  const { selection, document } = window.activeTextEditor!;
  const selectText = document.getText(selection).trim();
  let content = await context.lowcodeContext!.createChatCompletion({
    messages: [
      {
        role: 'system',
        content: `你是一个翻译家,你的目标是把中文翻译成英文单词,请翻译时使用驼峰格式,小写字母开头,不要带翻译腔,而是要翻译得自然、流畅和地道,使用优美和高雅的表达方式。请翻译下面用户输入的内容`,
      },
      {
        role: 'user',
        content: selectText || clipboardText,
      },
    ],
  });
  content = content.charAt(0).toLowerCase() + content.slice(1);
  window.activeTextEditor?.edit((editBuilder) => {
    if (window.activeTextEditor?.selection.isEmpty) {
      editBuilder.insert(window.activeTextEditor.selection.start, content);
    } else {
      editBuilder.replace(
        new Range(
          window.activeTextEditor!.selection.start,
          window.activeTextEditor!.selection.end,
        ),
        content,
      );
    }
  });
}

当前目录翻译成英文

代码:

import * as path from 'path';
import * as vscode from 'vscode';
import * as fs from 'fs-extra';
import { context } from './context';

export async function bootstrap() {
  const { lowcodeContext } = context;
  const explorerSelectedPath = path
    .join(lowcodeContext?.explorerSelectedPath || '')
    .replace(/\\/g, '/');
  const explorerSelectedPathArr = explorerSelectedPath.split('/');
  const name = explorerSelectedPathArr.pop();
  vscode.window.withProgress(
    {
      location: vscode.ProgressLocation.Notification,
    },
    async (progress) => {
      progress.report({
        message: `loading`,
      });

      let content = await context.lowcodeContext!.createChatCompletion({
        messages: [
          {
            role: 'system',
            content: `你是一个翻译家,你的目标是把中文翻译成英文单词,请翻译时使用驼峰格式,小写字母开头,不要带翻译腔,而是要翻译得自然、流畅和地道,使用优美和高雅的表达方式。请翻译下面用户输入的内容`,
          },
          {
            role: 'user',
            content: name || '',
          },
        ],
      });
      content = content.charAt(0).toLowerCase() + content.slice(1);
      fs.renameSync(
        path.join(lowcodeContext?.explorerSelectedPath || ''),
        path.join(explorerSelectedPathArr.join('/'), content),
      );
    },
  );
}

快速创建区块

代码:

import * as path from 'path';
import { window } from 'vscode';
import * as fs from 'fs-extra';
import { context } from './context';
import { renderEjsTemplates } from '../../../../../share/utils/ejs';

export async function bootstrap() {
  const { lowcodeContext } = context;
  const result = await window.showQuickPick(
    [
      'uniapp/vue3-mvp',
      'uniapp/vue3-mvp emit',
      'uniapp/vue3-mvp props',
      'uniapp/vue3-mvp props emit',
    ].map((s) => s),
    { placeHolder: '请选择模板' },
  );
  if (!result) {
    return;
  }
  const tempWorkPath = path.join(
    lowcodeContext?.env.rootPath || '',
    '.lowcode',
  );
  fs.copySync(path.join(lowcodeContext?.materialPath || ''), tempWorkPath);
  await renderEjsTemplates(
    {
      createBlockPath: path
        .join(lowcodeContext?.explorerSelectedPath || '')
        .replace(/\\/g, '/'),
    },
    path.join(tempWorkPath, 'src'),
  );
  fs.copySync(
    path.join(tempWorkPath, 'src', result),
    path.join(lowcodeContext?.explorerSelectedPath || ''),
  );
  fs.removeSync(tempWorkPath);
}

打开 WebView

右边 WebView 是一个独立的工程,部署在 vercel 上,主要为了学一下 UnoCSS,后续可能会把
screenshot-to-code
抄过来

完整代码:
https://github.com/lowcode-scaffold/lowcode-materials/tree/master/materials/snippets/打开webview/script

WebView 项目代码(Vue):
lowcode-webview-vue

WebView 项目代码(React):
lowcode-webview-react-vite

无限可能

上面列举了我常用的一些功能,以及正在尝试的东西,可以看出 lowcode 插件的自由度已经很高了,后续如果出现了什么好玩的技术可以立即接入玩一下。

遗憾

2023 对图片相关的 AI 研究的比较少,也想不到有什么使用场景。

2024 研究一下 Design to Code + AI 的落地。

源码

插件源码:
https://github.com/lowcoding/lowcode-vscode

物料源码:
https://github.com/lowcode-scaffold/lowcode-materials