2023年4月

一、问题描述:目前找不到任何关于opengauuss Datakit安装部署的文档,自己来尝试踩坑。

DataKit是一个以资源(物理机,数据库)为底座的开发运维工具,将上层的开发运维工具插件化,各插件之间相互独立,方便用户按需引入。各插件围绕DataKit的资源中心进行扩展开,完成数据库的运维,监控,迁移,开发,建模等复杂的操作。

Datakit安装部署在服务器上,是一个自动化运维的平台。可以部署不同类型的插件来实现不同的功能,是跟随opengauss5.0发布的新软件,也可以用来监控小于5.0的opengauss版本

二、环境准备:

Datakit官方文档:https://docs.opengauss.org/zh/docs/5.0.0/docs/ToolandCommandReference/DataKit.html

DataKit使用文档和开发文档:
https://gitee.com/opengauss/openGauss-workbench/tree/master/openGauss-visualtool/doc

openGauss-workbench下载链接:https://gitee.com/opengauss/openGauss-workbench.git

Datakit下载链接:https://opengauss.org/zh/download/

JDK下载链接:https://www.oracle.com/in/java/technologies/javase/jdk11-archive-downloads.html#license-lightbox

linux操作系统的jdk版本要与datakit打的jar包jdk版本保持一致,要不然通过不了

三、安装部署

部署环境:redhat7,opengauss3.0.3,Datakit5.0

1.上传压缩包

# 此时如果没有/ops/server/openGauss-visualtool目录,可以临时手动创建,也可以把这一步在初始化环境中进行解压
[root@test01 tmp]# tar -xvf Datakit-5.0.0.tar.gz -C /ops/server/openGauss-visualtool

jdk手动安装
workbench-master有启动和初始化的脚本可以拿来用
datakit除了visualtool-main.jar 放在/ops/server/openGauss-visualtool目录中,其余几个打包好的插件需要放在/ops/server/openGauss-visualtool/visualtool-plugin/ 目录中

2.更新jdk版本,如果需要

# 检查
rpm -qa |grep java
rpm
-qa |grep jdk
# 卸载
rpm
-qa | grep java | xargs rpm -e --nodeps
rpm
-qa | grep jdk | xargs rpm -e --nodeps
# 安装
rpm -ivh jdk-11.0.17_linux-x64_bin.rpm
# 验证
[root@test01 tmp]# java -version
java version "11.0.17" 2022-10-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.17+10-LTS-269)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.17+10-LTS-269, mixed mode)

3.创建远程用户

# 创建用户
openGauss=# CREATE USER jack WITH MONADMIN password "xxxxxxxx";
openGauss=# alter user jack sysadmin;
# 写入pg_hba.conf文件
[omm@test01 ~]$ gs_guc set -N all -I all -h "host all jack 192.168.1.0/24 sha256"

4.编辑安装和启动脚本

安装脚本在\openGauss-workbench-master\openGauss-workbench-master\openGauss-visualtool中

使用安装脚本初始化环境,或者使用https://docs.opengauss.org/zh/docs/5.0.0/docs/ToolandCommandReference/DataKit.html文档中初始化环境,安装脚本需要修改一下。如果脚本执行不顺畅,也可以手动跑脚本中的内容,保障目录正常,以及加密文件生成

# 创建ops用户
useradd
-m ops
vim install.sh
#!/usr/bin/env bash
echo
"begin install..."#sh ./uninstall.sh
read
-p "Do you want to automatically install dependencies (JDK, maven, node) ? (Press y|Y for Yes, any other key for No)."install_depencyif [ "$install_depency" = "Y" -o "$install_depency" = "y"]; thensh ./install-depency.sh elseecho"Please install the dependencies required by the system by yourself, including openjdk (11), maven (3), and node (16.15.1)."exit1fi

read
-p "Please enter the host of openGauss, Please ensure that the current host IP is in the whitelist of openGauss:"hostif [ ! -n "$host"]; then
echo
"Host cannot be empty."exit1fi
read
-p "Please enter the port of openGauss.:"portif [ ! -n "$port"]; then
echo
"Port cannot be empty."exit1fi
read
-p "Please enter the database of openGauss.:"databaseif [ ! -n "$database"]; then
echo
"Database cannot be empty."exit1fi
read
-p "Please enter the username of openGauss.:"usernameif [ ! -n "$username"]; then
echo
"Username cannot be empty."exit1fi
stty
-echo
read
-p "Please enter the password of openGauss.:"passwordif [ ! -n "$password"]; then
echo
"Password cannot be empty."exit1fi
stty echo

echo
"host: $host, port: $port username: $username database: $database"cp config/application-temp.yml config/application-cus.yml
sed
-i "23s/ip:port/$host:$port/" config/application-cus.yml
sed
-i "23s/database/$database/" config/application-cus.yml
sed
-i "24s/dbuser/$username/" config/application-cus.yml
sed
-i "25s/dbpassword/$password/" config/application-cus.yml

#mvn clean install
-P prod -Dmaven.test.skip=truemkdir-p /ops/server/openGauss-visualtool/mkdir-p /ops/files/mkdir-p /ops/server/openGauss-visualtool/logs/mkdir-p /ops/server/openGauss-visualtool/config/mkdir-p /ops/ssl/ if [ ! -f "/ops/ssl/keystore.p12"];then
keytool
-genkey -noprompt \-dname "CN=opengauss, OU=opengauss, O=opengauss, L=Beijing, S=Beijing, C=CN"\-alias opengauss\-storetype PKCS12 \-keyalg RSA \-keysize 2048\-keystore /ops/ssl/keystore.p12 \-validity 3650\-storepass 123456fi
echo
"SSL is enabled, you can replace the keystore file at /ops/ssl/ folder and config the ssl options at file /ops/server/openGauss-visualtool/config/application-cus.yml"touch/ops/server/openGauss-visualtool/logs/visualtool-main.outcp visualtool-api/target/visualtool-main.jar /ops/server/openGauss-visualtool/mv config/application-cus.yml /ops/server/openGauss-visualtool/config/chown-R ops:ops /ops
echo
"end install"

编辑启动脚本

#!/usr/bin/env bash

SERVER_HOME
=/ops/server/openGauss-visualtool
cd $SERVER_HOME
API_NAME
=visualtool-main
JAR_NAME
=$SERVER_HOME/$API_NAME\.jar
LOG
=$SERVER_HOME/logs/$API_NAME\.outPID=$SERVER_HOME/$API_NAME\.pid

usage() {
echo
"Usage: sh server.sh [start|stop|restart|status]"exit1}

is_exist(){
pid
=`ps -ef|grep $JAR_NAME|grep -v grep|awk '{print $2}'`if [ -z "${pid}"]; thenreturn 1 else return 0fi
}

start(){
is_exist
if [ $? -eq "0"]; then
echo
">>> ${JAR_NAME} is already running PID=${pid} <<<" elseecho'' >$LOG
nohup
java -Xms2048m -Xmx4096m -jar $JAR_NAME --spring.profiles.active=cus >$LOG 2>&1 &echo $! >$PID
echo
">>> start $JAR_NAME successed PID=$! <<<"fi
}

stop(){
pidf
=$(cat $PID)
echo
">>> ${API_NAME} PID = $pidf begin kill $pidf <<<"kill $pidf
rm
-rf $PID
sleep
2is_existif [ $? -eq "0"]; then
echo
">>> ${API_NAME} 2 PID = $pid begin kill -9 $pid <<<"kill-9$pid
sleep
2echo">>> $JAR_NAME process stopped <<<" elseecho">>> ${JAR_NAME} is not running <<<"fi
}

status(){
is_exist
if [ $? -eq "0"]; then
echo
">>> ${JAR_NAME} is running PID is ${pid} <<<" elseecho">>> ${JAR_NAME} is not running <<<"fi
}

restart(){
stop
start
}
case "$1" in "start")
start
;;
"stop")
stop
;;
"status")
status
;;
"restart")
restart
;;
*)
usage
;;
esac
exit
0}

5.初始化环境

[root@test01 openGauss-visualtool]# mv application-temp.yml ./config/[root@test01 openGauss-visualtool]# ./install.sh
begin install...
sh: .
/uninstall.sh: No such file or directory
Do you want to automatically install dependencies (JDK, maven, node)
? (Press y|Y for Yes, any other key forNo). Y
sh: .
/install-depency.sh: No such file or directory
Please enter the host of openGauss, Please ensure that the current host IP
is in the whitelist of openGauss: 10.83.239.211Please enter the port of openGauss.:26000Please enter the database of openGauss.: postgres
Please enter the username of openGauss.: jack
Please enter the password of openGauss.: host:
192.168.163.21, port: 26000username: jack database: postgres
SSL
is enabled, you can replace the keystore file at /ops/ssl/ folder and config the ssl options at file /ops/server/openGauss-visualtool/config/application-cus.yml
cp: cannot stat ‘visualtool
-api/target/visualtool-main.jar’: No such file or directory
mv: ‘config
/application-cus.yml’ and ‘/ops/server/openGauss-visualtool/config/application-cus.yml’ are the same file
end install

[root@hktestmysqldb01 openGauss-visualtool]# ll config/total8
-rw------- 1 ops ops 873 Apr 11 16:42 application-cus.yml-rw-r--r-- 1 ops ops 865 Mar 28 20:47 application-temp.yml
[root@hktestmysqldb01 openGauss
-visualtool]# pwd/ops/server/openGauss-visualtool

6. 启动服务

# 把模板移动到插件目录下。也可以后期启动好平台后手动补入插件
[ops@test01 openGauss
-visualtool]$ mkdir -p /ops/server/openGauss-visualtool/visualtool-plugin
[ops@test01 openGauss
-visualtool]$ mv base-ops-5.0.0-repackage.jar ./visualtool-plugin/[ops@test01 openGauss-visualtool]$ mv data-migration-5.0.0-repackage.jar ./visualtool-plugin/[ops@test01 openGauss-visualtool]$ mv observability-instance-5.0.0-repackage.jar ./visualtool-plugin/[ops@test01 openGauss-visualtool]$ mv observability-log-search-5.0.0-repackage.jar ./visualtool-plugin/[ops@test01 openGauss-visualtool]$ mv observability-sql-diagnosis-5.0.0-repackage.jar ./visualtool-plugin/[ops@test01 openGauss-visualtool]$ mv webds-plugin-5.0.0-repackage.jar ./visualtool-plugin/# ./server.sh start/stop/restart
[ops@test01 openGauss
-visualtool]$ ./server.sh restart>>> visualtool-main PID = 45148 begin kill 45148 <<< >>> /ops/server/openGauss-visualtool/visualtool-main.jar is not running <<< >>> start /ops/server/openGauss-visualtool/visualtool-main.jar successed PID=46530 <<<

7.检查是否启动成功

/ops/server/openGauss-visualtool/logs/visualtool-main.out 会记录实时日志

[root@test01 ~]# netstat -ntpl | grep 9494tcp0      0 0.0.0.0:9494            0.0.0.0:*               LISTEN      46530/java

前台访问链接:https://192.168.163.21:9494/

插件管理,如果没有做第6部在服务器上移动插件,也可以在前台手动导入一次

首先我们应该知道,写到数据库里的时间,主要和你的mysql时区
system_time_zone
有关,而把mysql里的数据取出来,以json形式响应到浏览器上,这个时间会经过反序列化的过程,这时时间和注解
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
有关。

和我们相关的几个时区

  • UTC: Coordinated Universal Time, 国际协调时间,也称世界标准时间。
  • GMT:Greenwich Mean Time, 格林尼治时间
  • CST:中国标准时间(China Standard Time),为GMT+8
  • BST:英国夏令时间,为GMT+1
  • CST:美国中部时间(Central Standard Time),为GMT-6,正常比北京慢14小时,夏令时慢13小时
  • 东八区:GMT+8

数据库时区

1、首先查看MySQL当前的时间

select curtime();
show variables like "%time_zone%";

time_zone说明mysql使用system的时区,system_time_zone说明system使用CST时区

2、进行修改

set global time_zone = '+8:00'; #修改mysql全局时区为北京时间,也就是我们所在的东8区
set time_zone = '+8:00'; #修改当前会话时区
flush privileges;

直接在数据库连接串上添加时区

serverTimezone=GMT%2B8 #表示东八区

为代码添加对象的时区注解

@Column(name = "EXPIRE_DATE")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date expireDate;

修改k8s中pod的时区

...
  containers:
  - name: xxx
    env: 
      - name: TZ
        value: Asia/Shanghai
...
    volumeMounts:
      - name: timezone
        mountPath: /etc/localtime
  volumes:
    - name: timezone
      hostPath:
        path: /usr/share/zoneinfo/Asia/Shanghai

总结

事实上,数据库里的时间与显示出现的时间不一致,与pod的时间没关系,主要还是看你的数据库时区与@JsonFormat注解的时区。

  • 如果@JsonFormat如果是GMT+8,而连接串里是GMT+0,会出现下面截图

  • 如果@JsonFormat如果是GMT+8,连接串里也是GMT+8,会出现我们想要的截图

  • 最后,如果@JsonFormat如果是GMT+8,连接串里也是CST(可能被认为是美国中部时间,GMT-6),那么它将会比北京时间慢8+6小时

基于chunjun纯钧的增量数据同步

目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步

chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方
根据文档我编写了一个SQL脚本

create table `source` (
        `sfzh` STRING COMMENT '',
        `xm` STRING COMMENT '',
        `xb` STRING COMMENT '',
        `xbdm` STRING COMMENT '',
        `jzdz` STRING COMMENT '',
        `fzrq` DATE COMMENT '',
        `dsc_biz_record_id` STRING COMMENT ''
) with (
        'connector' = 'mysql-x',
        'url' = 'jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
        'table-name' = '',
        'username' = '',
        'password' = '',
        'scan.fetch-size' = '1024',
        'scan.increment.column' = 'fzrq',
        --'scan.increment.column-type' = 'date',
        'scan.start-location' = '1659974400000'
);

create table `sink` (
        `sfzh` STRING COMMENT '',
        `xm` STRING COMMENT '',
        `xb` STRING COMMENT '',
        `xbdm` STRING COMMENT '',
        `jzdz` STRING COMMENT '',
        `fzrq` DATE COMMENT '',
        `dsc_biz_record_id` STRING COMMENT '',
        PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED
) with (
        'connector' = 'stream-x'
);

然后提交任务的时候发现已经记录了
start-location

start-location
的指标信息了,但是并没有上报到Prometheus!

在本地调试源码解决问题的大致过程

在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量

/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
protected transient CustomReporter customReporter;

该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

    @Override
    public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true

 /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
    @Override
    protected boolean useCustomReporter() {
        return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
    }

    /** 增量同步或者间隔轮询时,是否初始化外部存储 */
    protected Boolean initReporter = true;

经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式

尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了

那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

后续问题

打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader

public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

    public static CustomReporter discoverMetric(
            ChunJunCommonConf commonConf,
            RuntimeContext context,
            boolean makeTaskFailedWhenReportFailed) {
        try {
            String pluginName = commonConf.getMetricPluginName();
            // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);
            MetricParam metricParam =
                    new MetricParam(
                            context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());

            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            Class<?> clazz = classLoader.loadClass(pluginClassName);
            Constructor<?> constructor = clazz.getConstructor(MetricParam.class);

            return (CustomReporter) constructor.newInstance(metricParam);
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
    }

在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务
问题得以解决!

前言

在学习Spring框架源码时,记住一句话:源码并不难,只需要给你各种业务场景或者项目经理,你也能实现自己的Spring。虽然你的实现可能无法与开源团队相媲美,但是你肯定可以实现一个0.0.1版本。因此,初次阅读源码时,不要陷入太深的细节中。先了解大体逻辑,再仔细研读。

实现功能

本文将带领大家实现一个简易版的Spring框架,并介绍以下功能点:

  1. 了解Spring的底层源码启动过程
  2. 了解BeanDefinition的概念
  3. 了解Spring解析配置类等底层源码工作流程
  4. 了解依赖注入,Aware回调等底层源码工作流程
  5. 了解Spring AOP的底层源码工作流程

以上功能点将使我们对Spring框架的实现有所了解,但我们并不会一下子实现整个Spring框架的业务。我们将从上述功能点入手,通过手写模拟Spring框架来实现这些功能。

首先,我们像使用Spring一样,传入配置类获取applicationContext,再通过getBean方法获取具体对象。最后,我们调用方法并打印日志。如果你对这些基本流程不熟悉,可以查看我的入门系列文章:
Spring入门系列:浅析知识点

详细流程如下:

  1. 解析配置类上的ComponentScan注解,获取扫描的基本路径
  2. 开始解析各个被扫描到的文件,是否是需要被Spring管理,如果是则暂存到list集合中
  3. 开始遍历被Spring管理list集合,解析各个类上的注解,比如是否是懒加载,然后将这些属性都封装到applicationContext中的以beanName为key的BeanDefineMap中
  4. 针对已经解析好的bean定义进行创建对象并实例化,并将其放入以beanName为key的singletonMap实例化缓存池中。
  5. 在实例化时,如果发现有依赖注入的对象,则将实例化缓存池中的对象存入。如果缓存池中没有该对象,则进行创建后再注入。
  6. 判断对象是否实现了各个Aware接口,如果实现,则进行回调。
  7. 判断对象是否属于增强类。在这里,我们模拟了事务注解。如果有事务注解,则创建一个代理对象,并为所有方法增加拦截器。然后将该代理对象存入单例缓存池中。

详细解析

项目结构

基本路径:com.user目录

各个注解及上下文类:config目录

需要被管理的Bean:service目录

启动类:com.user根目录

截图

源码分析

@Component
public class UserService implements ApplicationContextAware, BeanNameAware {

    @AutoWired
    ServiceDemo serviceDemo;

    private XiaoyuApplicationContext applicationContext;
    private String beanName;

    public void test() {
        serviceDemo.say();
//        System.out.println(serviceDemo);
        System.out.println("userService:"+applicationContext.getBean("userService"));
        System.out.println("beanName:"+beanName);
    }

    @Override
    public void setApplicationContext(XiaoyuApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }
}

UserService类主要用于测试是否Spring已经管理了相关对象并生成了代理对象,是我们的日常业务类,我们仔细看下XiaoyuApplicationContext类,主要的流程在这边:

public class XiaoyuApplicationContext {


    //配置类
    private Class config;
    //初始的bean定义
    private Map<String,BeanDefinition> beanDefineMap = new HashMap<>();
    //单例缓存池
    private Map<String,Object> singleBean = new HashMap<>();

    public XiaoyuApplicationContext(Class myDemoConfigClass) {
        config = myDemoConfigClass;
        //解析配置类
        scan();

    }

    public void scan(){
        ComponentScan declaredAnnotation = (ComponentScan) config.getDeclaredAnnotation(ComponentScan.class);
        String value = declaredAnnotation.basePackages();
        doScan(value);
        //将bean定义Map生成具体的Bean对象
        beanDefineMap.entrySet().stream().forEach(item->{
            String beanName = item.getKey();
            BeanDefinition beanDefinition = item.getValue();
            if (!beanDefinition.isLazy() && "singleton".equals(beanDefinition.getScope())) {
                Object bean = createBean(beanName);
                singleBean.put(beanName,bean);
            }
        });
    }

    /**
     *     解析配置类
     */
    private void doScan(String value) {
        String path = value.replace(".","/");
        //正常走文件解析
        ClassLoader classLoader = this.getClass().getClassLoader();
        URL resource = classLoader.getResource(path);
        File file = new File(resource.getFile());

        List<File> classFile = new ArrayList<>();
        //简单点直接双层解析即可
        if (file.isDirectory()) {
            for (File f : file.listFiles()) {
                if (f.isDirectory()) {
                    for (File f1 : f.listFiles()) {
                        if (!f1.isDirectory()) {
                            classFile.add(f1);
                        }
                    }
                } else {
                    classFile.add(f);
                }
            }
        }
        //遍历所有解析文件
        for (File cFile : classFile) {
            String absolutePath = cFile.getAbsolutePath();
            String className = absolutePath.substring(absolutePath.indexOf("com"), absolutePath.indexOf(".class"))
                    .replace("\\", ".");

            try {
                Class<?> clazz = classLoader.loadClass(className);
                //是否需要被Spring管理
                if (clazz.isAnnotationPresent(Component.class)) {
                    //将bean上的注解封装到bean定义中
                    BeanDefinition beanDefinition = new BeanDefinition();
                    beanDefinition.setType(clazz);
                    beanDefinition.setLazy(clazz.isAnnotationPresent(Lazy.class));
                    if (clazz.isAnnotationPresent(Scope.class)) {
                        beanDefinition.setScope(clazz.getAnnotation(Scope.class).value());
                    } else {
                        beanDefinition.setScope("singleton");
                    }

                    String beanName = clazz.getAnnotation(Component.class).value();
                    if (beanName.isEmpty()) {
                        //如果不设置beanName会默认生产唯一一个name,Spring底层也是这样做的
                        beanName = Introspector.decapitalize(clazz.getSimpleName());
                    }

                    beanDefineMap.put(beanName, beanDefinition);

                }
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }

        }
    }

    public Object createBean(String beanName){

        BeanDefinition beanDefinition = beanDefineMap.get(beanName);
        Class type = beanDefinition.getType();

        try {
            Object instance = type.newInstance();
            //属性填充,依赖注入
            populateBean(instance);

            if (instance instanceof ApplicationContextAware){
                ((ApplicationContextAware) instance).setApplicationContext(this);
            }
            if (instance instanceof BeanNameAware) {
                ((BeanNameAware) instance).setBeanName(beanName);
            }
            //是否需要AOP增强
            if (type.isAnnotationPresent(Transaction.class)) {
                Enhancer enhancer = new Enhancer();
                enhancer.setSuperclass(type);
                //简单的方法切面
                enhancer.setCallback(new MethodInterceptor() {
                    @Override
                    public Object intercept(Object proxy, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
                        //开启事务,关闭自动提交
                        System.out.println("事务已开启");
                        Object res = method.invoke(instance, objects);
                        //提交事务
                        System.out.println("事务已提交");
                        return res;
                    }
                });
                return enhancer.create();
            }
            return instance;
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }

        return null;
    }

    private void populateBean(Object instance) {
        Field[] declaredFields = instance.getClass().getDeclaredFields();

        Arrays.stream(declaredFields).forEach(item->{
            //寻找注入点
            if (item.isAnnotationPresent(AutoWired.class)) {
                Object bean = getBean(item.getName());
                item.setAccessible(true);
                try {
                    item.set(instance,bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        });

    }


    public Object getBean(String beanName){
        if (!beanDefineMap.containsKey(beanName)) {
           throw new NullPointerException();
        }
        if ("singleton".equals(beanDefineMap.get(beanName).getScope())) {
            if (singleBean.containsKey(beanName)) {
                return singleBean.get(beanName);
            } else {
                Object bean = createBean(beanName);
                singleBean.put(beanName,bean);
                return bean;
            }
        }
        return createBean(beanName);
    }
}

以上即为整个流程的基本梳理。我们在实现过程中没有涉及Bean循环依赖以及其他各种创建缓存,但Spring在实现Bean的创建过程中确实用到了各种本地缓存和同步锁(synchronized)。在学习源码时,不要太关注这些细节。首先要理解整个流程,再深入研究。

结语

最后,我们在gitee上提供了项目源码。如果需要,可以查看
spring-xiaoyu
。虽然我认为写一遍自己的代码更好,因为这是最简单的流程,有助于理解Spring源码。
公众号

一、贝叶斯定理

贝叶斯定理是关于随机事件A和B的条件概率,生活中,我们可能很容易知道P(A|B),但是我需要求解P(B|A),学习了贝叶斯定理,就可以解决这类问题,计算公式如下:

  • P(A)是A的先验概率
  • P(B)是B的先验概率
  • P(A|B)是A的后验概率(已经知道B发生过了)
  • P(B|A)是B的后验概率(已经知道A发生过了)

二、朴素贝叶斯分类

朴素贝叶斯的思想是,对于给出的待分类项,求解在此项出现的条件下,各个类别出现的概率,哪个最大,那么就是那个分类。

  • x={a_{1},a_{2},...,a_{m}}
    是一个待分类的数据,有m个特征
  • C=y_{1},y_{2},...,y_{n}
    是类别,计算每个类别出现的先验概率
    p(y_{i})
  • 在各个类别下,每个特征属性的条件概率计算
    p(x|y_{i})
  • 计算每个分类器的概率
    p(y_{i}|x)=\frac{p(x|y_{i})p(y_{i})}{p(x)}
  • 概率最大的分类器就是样本
    x
    的分类

三、java样例代码开发步骤

首先,需要在pom.xml文件中添加以下依赖项:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.12</artifactId>
    <version>3.2.0</version>
</dependency>

然后,在Java代码中,可以执行以下步骤来实现朴素贝叶斯算法:

1、创建一个SparkSession对象,如下所示:

importorg.apache.spark.sql.SparkSession;

SparkSession spark
=SparkSession.builder()
.appName(
"NaiveBayesExample")
.master(
"local[*]")
.getOrCreate();

2、加载训练数据和测试数据:

importorg.apache.spark.ml.feature.LabeledPoint;importorg.apache.spark.ml.linalg.Vectors;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.types.DataTypes;import static org.apache.spark.sql.functions.*;//读取训练数据
Dataset<Row> trainingData =spark.read()
.option(
"header", true)
.option(
"inferSchema", true)
.csv(
"path/to/training_data.csv");//将训练数据转换为LabeledPoint格式 Dataset<LabeledPoint> trainingLP =trainingData
.select(col(
"label"), col("features"))
.map(row
-> newLabeledPoint(
row.getDouble(
0),
Vectors.dense((
double[])row.get(1))),
Encoders.bean(LabeledPoint.
class));//读取测试数据 Dataset<Row> testData =spark.read()
.option(
"header", true)
.option(
"inferSchema", true)
.csv(
"path/to/test_data.csv");//将测试数据转换为LabeledPoint格式 Dataset<LabeledPoint> testLP =testData
.select(col(
"label"), col("features"))
.map(row
-> newLabeledPoint(
row.getDouble(
0),
Vectors.dense((
double[])row.get(1))),
Encoders.bean(LabeledPoint.
class));

请确保训练数据和测试数据均包含
"label"

"features"
两列,其中
"label"
是标签列,
"features"
是特征列。

3、创建一个朴素贝叶斯分类器:
importorg.apache.spark.ml.classification.NaiveBayes;importorg.apache.spark.ml.classification.NaiveBayesModel;

NaiveBayes nb
= newNaiveBayes()
.setSmoothing(
1.0) //设置平滑参数 .setModelType("multinomial"); //设置模型类型 NaiveBayesModel model= nb.fit(trainingLP); //拟合模型

在这里,我们创建了一个NaiveBayes对象,并设置了平滑参数和模型类型。然后,我们使用
fit()
方法将模型拟合到训练数据上。

4、使用模型进行预测:
Dataset<Row> predictions =model.transform(testLP);//查看前10条预测结果
predictions.show(10);

在这里,我们使用
transform()
方法对测试数据进行预测,并将结果存储在一个DataFrame中。可以通过调用
show()
方法查看前10条预测结果。

5、关闭SparkSession:

spark.close();

以下是完整代码的示例。请注意,需要替换数据文件的路径以匹配您的实际文件路径:

importorg.apache.spark.ml.classification.NaiveBayes;importorg.apache.spark.ml.classification.NaiveBayesModel;importorg.apache.spark.ml.feature.LabeledPoint;importorg.apache.spark.ml.linalg.Vectors;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.apache.spark.sql.Encoders;import static org.apache.spark.sql.functions.*;public classNaiveBayesExample {public static voidmain(String[] args) {//创建SparkSession对象
        SparkSession spark =SparkSession.builder()
.appName(
"NaiveBayesExample")
.master(
"local[*]")
.getOrCreate();
try{//读取很抱歉,我刚才的回答被意外截断了。以下是完整的Java代码示例: ```javaimportorg.apache.spark.ml.classification.NaiveBayes;importorg.apache.spark.ml.classification.NaiveBayesModel;importorg.apache.spark.ml.feature.LabeledPoint;importorg.apache.spark.ml.linalg.Vectors;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.apache.spark.sql.Encoders;import static org.apache.spark.sql.functions.*;public classNaiveBayesExample {public static voidmain(String[] args) {//创建SparkSession对象 SparkSession spark =SparkSession.builder()
.appName(
"NaiveBayesExample")
.master(
"local[*]")
.getOrCreate();
try{//读取训练数据 Dataset<Row> trainingData =spark.read()
.option(
"header", true)
.option(
"inferSchema", true)
.csv(
"path/to/training_data.csv");//将训练数据转换为LabeledPoint格式 Dataset<LabeledPoint> trainingLP =trainingData
.select(col(
"label"), col("features"))
.map(row
-> newLabeledPoint(
row.getDouble(
0),
Vectors.dense((
double[])row.get(1))),
Encoders.bean(LabeledPoint.
class));//读取测试数据 Dataset<Row> testData =spark.read()
.option(
"header", true)
.option(
"inferSchema", true)
.csv(
"path/to/test_data.csv");//将测试数据转换为LabeledPoint格式 Dataset<LabeledPoint> testLP =testData
.select(col(
"label"), col("features"))
.map(row
-> newLabeledPoint(
row.getDouble(
0),
Vectors.dense((
double[])row.get(1))),
Encoders.bean(LabeledPoint.
class));//创建朴素贝叶斯分类器 NaiveBayes nb = newNaiveBayes()
.setSmoothing(
1.0)
.setModelType(
"multinomial");//拟合模型 NaiveBayesModel model =nb.fit(trainingLP);//进行预测 Dataset<Row> predictions =model.transform(testLP);//查看前10条预测结果 predictions.show(10);

}
finally{//关闭SparkSession spark.close();
}
}
}

请注意替换代码中的数据文件路径,以匹配实际路径。另外,如果在集群上运行此代码,则需要更改master地址以指向正确的集群地址。