2024年8月

介绍

Webhook就是一种HTTP回调,用于在某种情况下执行某些动作,Webhook不是K8S独有的,很多场景下都可以进行Webhook,比如在提交完代码后调用一个Webhook自动构建docker镜像

准入 Webhook 是一种用于接收准入请求并对其进行处理的 HTTP 回调机制。 可以定义两种类型的准入 Webhook, 即验证性质的准入 Webhook 和变更性质的准入 Webhook。 变更性质的准入 Webhook 会先被调用。它们可以修改发送到 API 服务器的对象以执行自定义的设置默认值操作。

在完成了所有对象修改并且 API 服务器也验证了所传入的对象之后, 验证性质的 Webhook 会被调用,并通过拒绝请求的方式来强制实施自定义的策略。

Admission Webhook使用较多的场景如下

  1. 在资源持久化到ETCD之前进行修改(Mutating Webhook),比如增加init Container或者sidecar Container
  2. 在资源持久化到ETCD之前进行校验(Validating Webhook),不满足条件的资源直接拒绝并给出相应信息

组成

  1. webhook 服务
  2. webhook 配置
  3. webhook 证书

创建核心组件Pod的Webhook

使用kubebuilder新建webhook项目

kubebuilder init --domain test.com --repo gitlab.qima-inc.com/test-operator
(base) (⎈ |kubernetes-admin@qa-u03:qa)➜  test-operator kubebuilder init --domain test.com --repo gitlab.qima-inc.com/test-operator
INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.17.2
INFO Update dependencies:
$ go mod tidy
go: go.mod file indicates go 1.21, but maximum version supported by tidy is 1.19
Error: failed to initialize project: unable to run post-scaffold tasks of "base.go.kubebuilder.io/v4": exit status 1

因为我默认是是go1.19所以版本达不到要求,这里两种处理方式

  1. 指定 --plugins go/v3 --project-version 3
  2. 切换高版本golang 这里我切换了go1.22
(base) (⎈ |kubernetes-admin@qa-u03:qa)➜  test-operator kubebuilder init --domain test.com --repo gitlab.qima-inc.com/test-operator                                    
INFO Writing kustomize manifests for you to edit... 
INFO Writing scaffold for you to edit...          
INFO Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.17.2 
INFO Update dependencies:
$ go mod tidy           
Next: define a resource with:
$ kubebuilder create api

生成核心组件Pod的API

(base) (⎈ |kubernetes-admin@qa-u03:qa)➜  test-operator kubebuilder create api --group core --version v1 --kind Pod                                                    
INFO Create Resource [y/n]                        
n
INFO Create Controller [y/n]                      
n
INFO Writing kustomize manifests for you to edit... 
INFO Writing scaffold for you to edit...          
INFO Update dependencies:
$ go mod tidy        

这里有两个选项,创建资源和创建控制器
因为是内置资源Pod所以不需要创建资源,也不需要控制器
假如是自定义资源,需要创建资源,创建控制器

创建webhook

(base) (⎈ |kubernetes-admin@qa-u03:qa)➜  test-operator kubebuilder create webhook --group core --version v1 --kind Pod --defaulting --programmatic-validation
INFO Writing kustomize manifests for you to edit... 
ERRO Unable to find the target(s) #- path: patches/webhook/* to uncomment in the file config/crd/kustomization.yaml. 
ERRO Unable to find the target(s) #configurations:
#- kustomizeconfig.yaml to uncomment in the file config/crd/kustomization.yaml. 
INFO Writing scaffold for you to edit...          
INFO api/v1/pod_webhook.go                        
INFO api/v1/pod_webhook_test.go                   
INFO api/v1/webhook_suite_test.go                 
INFO Update dependencies:
$ go mod tidy           
INFO Running make:
$ make generate                
mkdir -p /Users/xxxx/test-operator/bin
Downloading sigs.k8s.io/controller-tools/cmd/controller-gen@v0.14.0
/Users/xxxx/test-operator/bin/controller-gen-v0.14.0 object:headerFile="hack/boilerplate.go.txt" paths="./..."
Next: implement your new Webhook and generate the manifests with:
$ make manifests

代码结构

.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1
│       ├── pod_webhook.go
│       ├── pod_webhook_test.go
│       └── webhook_suite_test.go
├── bin
│   └── controller-gen-v0.14.0
├── cmd
│   └── main.go
├── config
│   ├── certmanager
│   │   ├── certificate.yaml
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── crd
│   │   └── patches
│   │       ├── cainjection_in_pods.yaml
│   │       └── webhook_in_pods.yaml
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_config_patch.yaml
│   │   ├── manager_webhook_patch.yaml
│   │   └── webhookcainjection_patch.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role.yaml
│   │   ├── role_binding.yaml
│   │   └── service_account.yaml
│   └── webhook
│       ├── kustomization.yaml
│       ├── kustomizeconfig.yaml
│       ├── manifests.yaml
│       └── service.yaml
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── test
├── e2e
│   ├── e2e_suite_test.go
│   └── e2e_test.go
└── utils
└── utils.go

实现Webhook相关代码

因为只有Webhook,没有Controller 所以只需要实现Webhook相关代码即可,同时需要注释掉一些代码如:
Dockerfile中的

# COPY internal/controller/ internal/controller/

修改api/v1/xxx_suite_test.go
因为核心组件Pod的Webhook和一般的CRD的webhook不一样,此处生成的pod_webhook.go只有Default()这个function,因此,我们需要直接重写整个代码,最重要的是Handle()方法。

/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
	"fmt"
	"net/http"
	"sigs.k8s.io/controller-runtime/pkg/client"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// log is for logging in this package.
var podlog = logf.Log.WithName("pod-resource")

// 定义核心组件pod的webhook的主struct,类似于java的Class
type PodWebhookMutate struct {
	Client  client.Client
	decoder *admission.Decoder
}

// +kubebuilder:webhook:path=/mutate-core-v1-pod,mutating=true,failurePolicy=fail,sideEffects=None,groups=core,resources=pods,verbs=create;update,versions=v1,name=mpod.kb.io,admissionReviewVersions=v1
func (a *PodWebhookMutate) Handle(ctx context.Context, req admission.Request) admission.Response {
	pod := &corev1.Pod{}
	err := a.decoder.Decode(req, pod)
	if err != nil {
		return admission.Errored(http.StatusBadRequest, err)
	}

	// TODO: 变量marshaledPod是一个Map,可以直接修改pod的一些属性
	marshaledPod, err := json.Marshal(pod)
	if err != nil {
		return admission.Errored(http.StatusInternalServerError, err)
	}
	// 打印
	fmt.Println("======================================================")
	fmt.Println(string(marshaledPod))
	return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
}

func (a *PodWebhookMutate) InjectDecoder(d *admission.Decoder) error {
	a.decoder = d
	return nil
}

修改main.go文件:

if os.Getenv("ENABLE_WEBHOOKS") != "false" {
    //if err = (&corev1.Pod{}).SetupWebhookWithManager(mgr); err != nil {
    //	setupLog.Error(err, "unable to create webhook", "webhook", "Pod")
    //	os.Exit(1)
    //}
    mgr.GetWebhookServer().Register("/mutate-core-v1-pod", &webhook.Admission{Handler: &v1.PodWebhookMutate{Client: mgr.GetClient()}})
}

生成mainfests

make manifests generate

证书

手动签发证书

https://cuisongliu.github.io/2020/07/kubernetes/admission-webhook/

自动签发证书

webhook 服务启动时自动生成证书,授权证书

  1. 创建CA根证书以及服务的证书
  2. 将服务端、CA证书写入 k8s Secret,并且支持find or create
  3. 本地写入证书
  4. 获取MutatingWebhookConfiguration和ValidatingWebhookConfiguration将caCert写入webhook config中的ClientConfig.CABundle(这里有个问题是webhook需要提前创建,CABundle可以写个临时值,等webhook server 启动覆盖)

自动签发证书参考项目:
https://github.com/koordinator-sh/koordinator/blob/main/pkg/webhook/util/controller/webhook_controller.go#L187

存储区​

Android 一开始就将存储区分为
内部存储

外部存储
,对应手机自带的存储和可插拔的 sd 卡(可类比于 PC 的硬盘和 U盘)。

内部存储容量有限,Google 建议 App 数据尽量存储于外部存储中。

随着硬件技术发展,自带大容量空间的手机开始出现,关于内部存储的描述逐渐偏离现实了,于是从
Android 4.4(API 19)
开始,官方不再将机身存储等同于内部存储,而是从逻辑上将其一部分划到外部存储,限制剩下那部分的容量,也就是现在所谓的内部存储。这一操作,使得原本内部存储和外部存储的特性和使用场景得以延续。

当然,如果在 4.4 系统及以上的手机上插了 sd 卡,那么 sd 卡也属于外部存储。

我们可以使用如下代码打印出所有的外部存储:

File[] files;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT) {
    files = getExternalFilesDirs(Environment.MEDIA_MOUNTED);
    for(File file:files){
        Log.e("main",file);
    }
}

对于 4.4 以上的插了 sd 卡的大容量手机,应该会打印出如下信息:

/storage/emulated/0/Android/data/packname/files/mounted 
/storage/B3E4-1711/Android/data/packname/files/mounted 

文件

应用专属文件

仅供应用使用的文件,可以存储到内部存储或外部存储中的本应用专属目录,本应用访问时不需要任何权限。在数据安全方面,虽然都是专属目录,但是内部存储可以保证其它应用访问不到,而外部存储就比较复杂了。

在较低版本的 Android 系统中,只要声明
READ_EXTERNAL_STORAGE
权限就能访问位于外部存储空间中应用专属目录之外的任何文件;只要声明
WRITE_EXTERNAL_STORAGE
权限就能向应用专属目录以外的任何文件写入数据。

这实在是相当危险,谁也不希望自家应用中的数据被抓取或篡改。于是从
Android 10(API 29)
开始有了
分区存储
的概念,应用在默认情况下就能访问外部存储空间上自己的专属目录,以及本应用所创建的特定类型的媒体文件(使用
MediaStore API
,下面会讲到)。如此,除非特殊情况,应用不再需要声明上述权限了。

此时,如果应用在运行时请求与存储相关的权限,将会弹出请求对话框(动态申请)表明应用正在请求对外部存储空间的广泛访问权限。

Android 11(API 30)
开始更进一步,干脆将 WRITE_EXTERNAL_STORAGE 权限的作用抹除(即使声明了该权限也没用)。这将应用的写权限完全限制在了本应用相关目录(专属目录和本应用创建的媒体文件)中。

ps:Android 11 引入了
_
MANAGE_EXTERNAL_STORAGE
_
权限,该权限替代 WRITE_EXTERNAL_STORAGE,提供对应用专属目录和 MediaStore 之外文件的写入权限,但对使用的要求更严格。如需了解详情,请参阅有
管理存储设备上所有文件

共享文件

存储您的应用打算与其它应用共享的文件,包括媒体(图片、音频文件、视频)、其它类型文件。

媒体文件

使用 MediaStore API 访问。注意:即使您的应用已卸载,作为共享文件(保存在媒体库中)的媒体文件仍会保留在用户的设备上。

除访问自己的媒体文件外,访问其它应用的媒体文件需要权限——在 Android 11(API 30)或更高版本中,需要 READ_EXTERNAL_STORAGE;在 Android 10(API 29)中,需要 READ_EXTERNAL_STORAGE 或 WRITE_EXTERNAL_STORAGE;在更低版本中,访问所有文件均需要相关权限。——不过这也不是绝对的。

比如
照片选择器
,它提供了一个可浏览界面,为用户提供了一种安全的
内置授权
方式,让用户可以向应用授予限于所选图片和视频的访问权限,而非整个媒体库的访问权限,该权限保留至设备重启或应用停止运行。使用照片选择器可以看作定制的动态申请权限的界面,至少从
Android 13(API 32)
开始,无需事先声明 READ_EXTERNAL_STORAGE。

其它文件

自 Android 4.4(API 19)始,官方提供了
存储访问框架
,便于应用与外部存储卷和云端存储空间在内的文档提供器互动。此框架支持用户与系统选择器互动,从而选择文档提供器以及供您的应用创建、打开或修改的特定文档和其它文件。

同照片选择器类似,由于用户参与选择您的应用可以访问的文件或目录,因此该机制无需任何系统权限,同时用户控制和隐私保护也得到了增强。

这些文件存储在应用专属目录和媒体库之外,且在应用卸载后仍会保留在设备上。

使用存储访问框架涉及以下步骤:

  1. 应用调用包含存储相关操作的 intent(
    ACTION_CREATE_DOCUMENT
    保存文件;
    ACTION_OPEN_DOCUMENT
    打开文件;
    ACTION_OPEN_DOCUMENT_TREE
    授予应用对该目录中所有文件和子目录的访问权限)。
  2. 用户看到一个系统选择器,供其浏览文档提供器并选择将执行存储相关操作的位置或文档。
  3. 应用获得对代表用户所选位置或文档的 URI 的读写访问权限。利用该 URI,应用可以在选择的位置执行操作。

数据

应用配置项

不赘述,就是简单的键值对。值得一提的是,之前都是使用
SharedPreferences
进行应用配置项的操作,现在官方建议使用
Jetpack DataStore
,允许您使用协议缓冲区存储键值对或类型化对象。DataStore 基于
Kotlin 协程

Kotlin.Flow
以异步、一致的事务方式存储数据。

数据库

基于
SQLite
的数据存储,一般选择
Jetpack.Room
这个半 ORM 简化数据 CRUD 操作。卸载应用时数据库会跟着删除。


Kafka 从 2.6.0 开始,默认使用 Java 11 , 3.0.0 开始,不再支持 Java 8,详见:
https://kafka.apache.org/downloads

image

  • Producer:消息生产者,就是向 kafka broker 发消息的客户端:
  • Consumer:消息消费者,向 kafka broker 取消息的客户端;
  • ConsumerGroup:消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    不同的组可以消费同一个消息,且只能被消费组内的一个消费者消费
  • Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
  • Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  • Replica: 副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个 Leader 和 若干个 follower
  • leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  • follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

安装 JDK 11

[root@kafka1host ~]# mkdir /usr/local/java
# 解压JDK
[root@kafka1host ~]# tar -zxvf jdk-11.0.17_linux-x64_bin.tar.gz -C /usr/local/java
# 注意 由于jdk1.8版本之后无 jre. 需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre

# 配置环境变量
[root@localhost~]# vi /etc/profile
export JAVA_HOME=/usr/local/java/jdk-11.0.17
export JRE_HOME=${JAVA_HOME}
export PATH=$PATH:${JAVA_HOME}/bin
export CLASSPATH=./:${JAVA_HOME}/lib:${JAVA_HOME}/lib

# 让环境变更生效
[root@localhost~]# source /etc/profile
#  此时 java -version 仍然显示旧版本 【1. 删除旧版本、2. 切换Java版本】
[root@kafka1host kafka]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@kafka1host kafka]#

切换 JAVA 版本,(
如需要卸载旧版本,点击此处
)


# 查看已安装的Java版本及其路径,/etc/alternatives/java 是当前Java版本的符号链接。
[root@kafka1host ~]# ls -l /usr/bin/java
lrwxrwxrwx. 1 root root 22 Apr 27  2021 /usr/bin/java -> /etc/alternatives/java
# 查看当前Java版本的可执行文件路径: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java 是当前Java版本的可执行文件路径。
[root@kafka1host ~]# ls -l /etc/alternatives/java
lrwxrwxrwx. 1 root root 71 Apr 27  2021 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java
# alternatives 注册 Java 版本信息
[root@kafka1host ~]# alternatives --install /usr/bin/java java /usr/local/java/jdk-11.0.17/bin/java 1
	# alternatives --install <link> <name> <path> <priority>
	# install 表示安装
	# link 是符号链接
	# name 则是标识符
	# path 是执行文件的路径
	# priority 则表示优先级
# 选择Java配置版本
[ithoth@kafka1host ~]# sudo alternatives --config java

There are 3 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
*+ 1           java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java)
   2           java-1.7.0-openjdk.x86_64 (/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64/jre/bin/java)
   3           /usr/local/java/jdk-11.0.17/bin/java
# 输入对应版本的编号,然后按Enter键
Enter to keep the current selection[+], or type selection number: 3
# 查看Java版本
[ithoth@kafka1host ~]# java -version
java version "11.0.17" 2022-10-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.17+10-LTS-269)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.17+10-LTS-269, mixed mode)
[ithoth@kafka1host ~]# 

注意: 由于jdk1.8版本之后无 jre. 如果需要运行 tomcat ,需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre

安装 Kafka

下载 Kafka 2.8.2

https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz

image

防火墙

配置防火墙规则

# 查看防火墙状态
[root@kafka1host ~]# firewall-cmd --state 
running
# 查看默认作用域 -- 一般不需要查看
[root@kafka1host ~]# firewall-cmd --get-default-zone
public
# 添加防火墙规则,允许访问 2181、9092 端口
[root@kafka1host ~]# firewall-cmd --permanent --zone=public --add-port=2181/tcp
# –permanent : 表示使设置永久生效,不加的话机器重启之后失效,
# –add-port=2181/tcp : 表示添加一个端口和协议的规则,
# --zone=public: 作用域(默认为 public 可不加)
[root@kafka1host ~]# firewall-cmd --permanent --add-port=9092/tcp
# 更新防火墙规则
[root@kafka1host ~]# firewall-cmd --reload
# 查看所有打开的端口
[root@kafka1host ~]# firewall-cmd --list-port
2181/tcp 9092/tcp

修改配置

修改配置
server.properties
注意配置中的“=”前后不能有空格

# kafka broker 实际监听的地址和端口,集群间配置使用,如果不配置会使用 hostname 导致程序无法访问,如报:无法连接:kafka1host:9092
listeners=PLAINTEXT://172.0.30.100:9092

#允许删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

# 消息日志,默认日志会存放在 /tmp 目录下
# 程序目录下的 logs 是放kafka 运行的日志,不建议放程序目录下
log.dirs=/tmp/kafka-logs

# 以下可不配置-------更多详细配置百度
# 对外提供的地址,它会注册到 zookeeper上,如果不配置,会使用上面 listeners 的值
# advertised.listeners=PLAINTEXT://10.100.25.230:9092

运行测试

一般 zookeeper 单独部署。这边单节点部署,为了省事,直接使用 kafka 包中内置的 Zookeeper。

启动 zookeeper
窗口A

# 先启动 zookeeper
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties

启动 Kafka , 新开一个命令行
窗口B

# 再启动 Kafka
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties

创建 topic 再开一个命令行
窗口C
测试

# 创建 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --create --topic kafka-vipsoft --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Created topic kafka-vipsoft.

# 查看 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
kafka-vipsoft
# 删除 topic,如果 delete.topic.enable=true 没设的话,在kafka重启后才会生效
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --delete --topic kafka-vipsoft --zookeeper localhost:2181
Topic kafka-vipsoft is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

启动生产者
窗口C

# 启动生产者, 不能使用 localhost:9092 需要和 配置中的 listeners 保持一致
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-console-producer.sh --broker-list 172.0.30.100:9092 --topic kafka-vipsoft
> Hello 123

启动消费者,再开一个命令行
窗口D

#启动消费者
[root@kafka1host ~]# /usr/local/kafka-console-consumer.sh --bootstrap-server 172.16.3.203:9092 --topic kafka-vipsoft --from-beginning
Hello 123

自启动

创建启动命令
vi /usr/local/kafka_2.13-2.8.2/kafka_start.sh

#!/bin/sh

/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties &

sleep 5 #先启动 zookeeper,等5秒后启动 kafka 

#启动kafka
/usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties &

创建停止命令
vi /usr/local/kafka_2.13-2.8.2/kafka_stop.sh

#!/bin/sh

/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-stop.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties &

sleep 3 #先启动 zookeeper,等3秒后启动 kafka

/usr/local/kafka_2.13-2.8.2/bin/kafka-server-stop.sh /usr/local/kafka_2.13-2.8.2/config/server.properties &

修改脚本执行权限

chmod 775 kafka_start.sh
chmod 775 kafka_stop.sh

验证脚本

sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh

设置开机启动
vi /etc/rc.d/rc.local


# 添加如下脚本
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh &

验证端口

查看多端口状态

netstat -ntpl | grep ':2181\|:9020'
[root@kafka1host ~]# netstat -ntpl | grep ':2181\|:9092'
tcp6       0      0 172.16.3.203:9092       :::*                    LISTEN      5419/java
tcp6       0      0 :::2181                 :::*                    LISTEN      4182/java

《黑神话:悟空》爆火,在游戏的世界里,如同一颗璀璨的新星,吸引了全球玩家的目光。从产品经理的角度来看,这款游戏的成功并非偶然,而是一系列精心策划和执行的结果。

一、精准的市场定位

《黑神话:悟空》将目标受众锁定在热爱动作角色扮演游戏、对中国传统文化有浓厚兴趣的玩家群体。游戏以中国古典名著《西游记》为背景,深入挖掘其中的神话元素,打造出一个充满奇幻色彩的游戏世界。这种独特的市场定位,既满足了玩家对于高品质游戏的需求,又弘扬了中国传统文化,具有极高的辨识度和吸引力。

产品经理在进行市场定位时,需要深入了解目标用户的需求和喜好,找到市场空白点,打造出具有差异化竞争优势的产品。《黑神话:悟空》正是通过对市场的精准把握,成功地在众多游戏中脱颖而出。

二、卓越的产品品质

1、惊艳的画面表现

从首次公布的实机演示视频开始,《黑神话:悟空》的画面就给人带来了极大的震撼。精美的场景设计、细腻的角色建模、逼真的光影效果,无不展现出开发团队的高超技术水平。游戏中的每一个细节都经过精心打磨,无论是古老的寺庙、神秘的森林,还是激烈的战斗场景,都让人仿佛置身于一个真实的神话世界中。

2、流畅的动作设计

作为一款动作角色扮演游戏,《黑神话:悟空》的动作设计至关重要。游戏中的角色动作流畅自然,打击感十足,玩家可以通过各种技能组合和连招,体验到酣畅淋漓的战斗快感。同时,游戏还加入了变身、法术等元素,丰富了战斗的策略性和趣味性。

3、丰富的剧情内容

《黑神话:悟空》的剧情紧扣《西游记》的主题,同时又进行了大胆的创新和拓展。玩家将扮演孙悟空或者其他角色,踏上一段充满挑战和惊喜的冒险之旅。游戏中的剧情跌宕起伏,充满了悬念和反转,让玩家在游戏的过程中始终保持着高度的紧张感和好奇心。

产品经理要始终把产品品质放在首位,不断追求卓越。只有提供高品质的产品,才能赢得用户的信任和口碑,在激烈的市场竞争中立于不败之地

三、有效的营销推广

1、社交媒体的力量 《黑神话:悟空》在开发过程中,通过社交媒体平台不断发布游戏的开发进度、实机演示视频等内容,吸引了大量玩家的关注。开发团队还积极与玩家互动,听取玩家的意见和建议,不断改进游戏。这种互动式的营销方式,不仅增强了玩家的参与感和忠诚度,还为游戏的宣传推广起到了积极的作用。

2、口碑营销 《黑神话:悟空》的高品质和独特魅力,使得玩家们自愿成为游戏的宣传大使。他们通过各种渠道分享自己的游戏体验,推荐给身边的朋友和家人。这种口碑营销的方式,具有极高的可信度和影响力,能够迅速扩大游戏的知名度和用户群体。

四、持续地创新和改进

游戏行业是一个不断发展和变化的行业,玩家的需求和喜好也在不断变化。《黑神话:悟空》的开发团队深知这一点,持续倾听玩家的反馈,不断进行创新和改进。无论是增加新的游戏内容、优化游戏性能,还是推出新的玩法模式,都将为玩家带来更好的游戏体验。

产品经理要保持敏锐的市场洞察力和创新意识,不断关注行业动态和用户需求的变化,及时对产品进行调整和优化。只有不断创新和改进,才能让产品始终保持竞争力。

总之,《黑神话:悟空》的成功为产品经理们提供了一个很好的范例。通过精准的市场定位、卓越的产品品质、有效的营销推广和持续的创新改进,我们可以打造出具有强大竞争力的产品,在市场中取得成功。相信在未来,我们会看到更多像《黑神话:悟空》这样的优秀产品,为玩家带来更多的惊喜和感动。

在我们使用Python来和数据库打交道中,
SQLAlchemy
是一个非常不错的ORM工具,通过它我们可以很好的实现多种数据库的统一模型接入,而且它提供了非常多的特性,通过结合不同的数据库驱动,我们可以实现同步或者异步的处理封装。

1、SQLAlchemy介绍

SQLAlchemy
是一个功能强大且灵活的 Python SQL 工具包和对象关系映射(ORM)库。它被广泛用于在 Python 项目中处理关系型数据库的场景,既提供了高级的 ORM 功能,又保留了对底层 SQL 语句的强大控制力。
SQLAlchemy
允许开发者通过 Python 代码与数据库进行交互,而无需直接编写 SQL 语句,同时也支持直接使用原生 SQL 进行复杂查询。下面是
SQLAlchemy
和我们常规数据库对象的对应关系说明。
Engine    连接对象         驱动引擎
Session   连接池           事务  由此开始查询
Model     表                   类定义
Column     列
Query     若干行         可以链式添加多个条件
在使用SQLAlchemy时,通常会将其与数据库对象对应起来。以下是SQLAlchemy和常规数据库对象的对应关系说明:

1)
数据库表 (Database Table)

  • SQLAlchemy
    : 使用
    Table
    对象或
    Declarative Base
    中的类来表示。
  • 对应关系
    : 数据库中的每一个表对应于SQLAlchemy中的一个类,该类继承自
    declarative_base()
from sqlalchemy importColumn, Integer, String, create_enginefrom sqlalchemy.ext.declarative importdeclarative_base

Base
=declarative_base()classUser(Base):__tablename__ = 'users' #数据库表名 id = Column(Integer, primary_key=True)
name
=Column(String)
email
= Column(String)

2)
数据库列 (Database Column)

  • SQLAlchemy
    : 使用
    Column
    对象来表示。
  • 对应关系
    : 每个数据库表中的列在SQLAlchemy中表示为
    Column
    对象,并作为类的属性定义。
id = Column(Integer, primary_key=True)
name
= Column(String(50))

3)
数据库行 (Database Row)

  • SQLAlchemy
    : 每个数据库表的一个实例(对象)代表数据库表中的一行。
  • 对应关系
    : 在SQLAlchemy中,通过实例化模型类来表示数据库表中的一行。
new_user = User(id=1, name='John Doe', email='john@example.com')

4)
主键 (Primary Key)

  • SQLAlchemy
    : 使用
    primary_key=True
    参数定义主键。
  • 对应关系
    : 在数据库表中定义主键列,这列在SQLAlchemy中也需要明确标注。
id = Column(Integer, primary_key=True)

5)
外键 (Foreign Key)

  • SQLAlchemy
    : 使用
    ForeignKey
    对象来表示。
  • 对应关系
    : 在SQLAlchemy中使用
    ForeignKey
    指定关系,指向另一个表的主键列。
from sqlalchemy importForeignKeyfrom sqlalchemy.orm importrelationshipclassAddress(Base):__tablename__ = 'addresses'id= Column(Integer, primary_key=True)
user_id
= Column(Integer, ForeignKey('users.id'))
user
= relationship('User')

6)
关系 (Relationships)

  • SQLAlchemy
    : 使用
    relationship
    对象来表示。
  • 对应关系
    : 数据库中表与表之间的关系在SQLAlchemy中通过
    relationship
    来定义。
addresses = relationship("Address", back_populates="user")

7)
会话 (Session)

  • SQLAlchemy
    : 使用
    Session
    对象进行事务性操作(如查询、插入、更新、删除)。
  • 对应关系
    :
    Session
    对象类似于数据库连接对象,用于与数据库进行交互。
from sqlalchemy.orm importsessionmaker

Session
= sessionmaker(bind=engine)
session
=Session()

session.add(new_user)
session.commit()

通过以上对应关系,SQLAlchemy允许开发者以面向对象的方式与数据库交互,提供了一个Pythonic的接口来操作数据库。

2、SQLAlchemy 的同步操作

SQLAlchemy 提供了同步和异步两种操作方式,分别适用于不同的应用场景。以下是如何封装 SQLAlchemy 的同步和异步操作的方法说明:

在同步操作中,SQLAlchemy 使用传统的阻塞方式进行数据库操作。首先,定义一个基础的
Session

Engine
对象:

from sqlalchemy importcreate_enginefrom sqlalchemy.orm importdeclarative_base, sessionmakerfrom typing importGeneratorfrom core.config importsettings#常规同步处理
engine =create_engine(settings.DB_URI)
SessionLocal
= sessionmaker(autocommit=False, autoflush=False, bind=engine)def get_db() ->Generator:"""创建一个 SQLAlchemy 数据库会话-同步处理.""" try:
db
=SessionLocal()yielddbfinally:
db.close()

前面说了,使用SQLAlchemy可以实现不同数据库的统一模型的处理,我们可以对应创建不同数据库的连接(engine),如下是常规几种关系型数据库的连接处理。

#mysql 数据库引擎
engine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/WinFramework",
pool_recycle
=3600,#echo=True, )#Sqlite 数据库引擎 engine = create_engine("sqlite:///testdir//test.db")#PostgreSQL 数据库引擎 engine =create_engine("postgresql+psycopg2://postgres:123456@localhost:5432/winframework",#echo=True, )#SQLServer 数据库引擎 engine =create_engine("mssql+pymssql://sa:123456@localhost/WinFramework?tds_version=7.0",#echo=True, )

我们可以根据数据库的CRUD操作方式,封装一些操作,如下所示。

classCRUDOperations:def __init__(self, model):
self.model
=modeldefcreate(self, db, obj_in):
db_obj
= self.model(**obj_in.dict())
db.add(db_obj)
db.commit()
db.refresh(db_obj)
returndb_objdefget(self, db, id):return db.query(self.model).filter(self.model.id ==id).first()defupdate(self, db, db_obj, obj_in):
obj_data
= obj_in.dict(exclude_unset=True)for field inobj_data:
setattr(db_obj, field, obj_data[field])
db.commit()
db.refresh(db_obj)
returndb_objdefremove(self, db, id):
obj
=db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

使用时,构建数据访问类进行操作,如下测试代码所示。

crud_user =CRUDOperations(User)#Create
with get_db() as db:
user
=crud_user.create(db, user_data)#Read with get_db() as db:
user
=crud_user.get(db, user_id)#Update with get_db() as db:
user
=crud_user.update(db, user, user_data)#Delete with get_db() as db:
crud_user.remove(db, user_id)

3、SQLAlchemy 的异步操作封装

对于异步操作,SQLAlchemy 使用
AsyncSession
来管理异步事务。

首先,定义一个异步的
Session

Engine
对象:

from sqlalchemy importcreate_engine, URLfrom sqlalchemy.ext.asyncio importAsyncSession, async_sessionmaker, create_async_enginefrom typing importAsyncGeneratordef create_engine_and_session(url: str |URL):try:#数据库引擎
        engine = create_async_engine(url, pool_pre_ping=True)exceptException as e:print("❌ 数据库链接失败 {}", e)
sys.exit()
else:
db_session
=async_sessionmaker(
bind
=engine, autoflush=False, expire_on_commit=False
)
returnengine, db_session#异步处理 async_engine, async_session =create_engine_and_session(settings.DB_URI_ASYNC)


async
def get_db() ->AsyncGenerator[AsyncSession, None]:"""创建一个 SQLAlchemy 数据库会话-异步处理."""async with async_session() as session:yield session

和同步的处理类似,不过是换了一个对象来实现,并且函数使用了async await的组合来实现异步操作。

为了实现我的SQLSugar开发框架类似的封装模式,我们参考SQLSugar开发框架中基类CRUD的定义方式来实现多种接口的封装处理。

参照上面的实现方式,我们来看看Python中使用泛型的处理封装类的代码。

ModelType = TypeVar("ModelType", bound=Base)
PrimaryKeyType
= TypeVar("PrimaryKeyType", int, str, float) #限定主键的类型 PageDtoType = TypeVar("PageDtoType", bound=BaseModel)
DtoType
= TypeVar("DtoType", bound=BaseModel)classBaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]):"""基础CRUD操作类""" def __init__(self, model: Type[ModelType]):"""数据库访问操作的基类对象(CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
"""self.model= model

这样,我们就可以通过泛型定义不同的类型,以及相关的处理类的信息。

该基类函数中,异步定义get_all的返回所有的数据接口如下所示。

    async defget_all(
self, sorting: Optional[str], db: AsyncSession
)
-> List[ModelType] |None:"""根据ID字符串列表获取对象列表

:param sorting: 格式:name asc 或 name asc,age desc
"""query=select(self.model)ifsorting:
query
=self.apply_sorting(query, sorting)

result
=await db.execute(query)
items
=result.scalars().all()return items

而对应获得单个对象的操作函数,如下所示。

    async def get(self, id: PrimaryKeyType, db: AsyncSession) ->Optional[ModelType]:"""根据主键获取一个对象"""query= select(self.model).filter(self.model.id ==id)

result
=await db.execute(query)
item
=result.scalars().first()return item

而创建对象的操作函数,如下所示。

    async def create(self, obj_in: DtoType, db: AsyncSession, **kwargs) ->bool:"""创建对象,使用 kwargs 时可以扩展创建对象时的字段。

:param obj_in: 对象输入数据
:param kwargs: 扩展字段,如格式: is_deleted=0, is_active=1
""" try:ifkwargs:
instance
= self.model(**obj_in.model_dump(), **kwargs)else:
instance
= self.model(**obj_in.model_dump()) #type: ignore db.add(instance)
await db.commit()
returnTrueexceptSQLAlchemyError as e:print(e)
await db.rollback()
return False

这个异步函数
create
旨在通过 SQLAlchemy 在数据库中创建一个对象,同时允许通过
kwargs
参数动态扩展创建对象时的字段。

  • async def
    : 表明这是一个异步函数,可以与
    await
    一起使用。
  • self
    : 这是一个类的方法,因此
    self
    引用类的实例。
  • obj_in: DtoType
    :
    obj_in
    是一个数据传输对象(DTO),它包含了需要插入到数据库中的数据。
    DtoType
    是一个泛型类型,用于表示 DTO 对象。
  • db: AsyncSession
    :
    db
    是一个 SQLAlchemy 的异步会话(
    AsyncSession
    ),用于与数据库进行交互。
  • **kwargs
    : 接受任意数量的关键字参数,允许在对象创建时动态传入额外的字段。
  • obj_in.model_dump()
    : 假设
    obj_in
    是一个 Pydantic 模型或类似结构,它可以通过
    model_dump()
    方法转换为字典格式,用于创建 SQLAlchemy 模型实例。
  • self.model(**obj_in.model_dump(), **kwargs)
    : 使用
    obj_in
    中的字段以及通过
    kwargs
    传入的扩展字段来实例化 SQLAlchemy 模型对象。如果
    kwargs
    非空,它们会被解包并作为额外的字段传入模型构造函数。
  • db.add(instance)
    : 将新创建的对象添加到当前的数据库会话中。
  • await db.commit()
    : 提交事务,将新对象保存到数据库。
  • SQLAlchemyError
    : 捕获所有 SQLAlchemy 相关的错误。
  • await db.rollback()
    : 在发生异常时,回滚事务,以防止不完整或错误的数据被提交。

通过上面的封装,我们可以测试调用的处理例子

from crud.customer importcustomer as customer_crudfrom models.customer importCustomerfrom pydantic importBaseModelfrom schemas.customer importCustomerDto, CustomerPageDto

async
deftest_list_customer():
async with get_db() as db:
print("get_list")
totalCount, items
=await customer_crud.get_list(
CustomerPageDto(skipCount
=0, maxResultCount=10, name="test"),
db,
)
print(totalCount, items)for customer incustomers:print(customer.name, customer.age)print("get_by_name")
name
= "test"customer=await customer_crud.get_by_name(
name,
db,
)
ifcustomer:print(customer.name, customer.age)else:print(f"{name} not found")print("soft delete")
result
= await customer_crud.delete_byid(customer.id, db, is_deleted=1)print("操作结果:", result)print("soft delete_byids")
result
=await customer_crud.delete_byids(
[
"11122", "2C5F8672-2AA7-4B14-85AD-DF56F5BF7F1F"], db, is_deleted=1)print(f"Soft delete successful: {result}")print("update_by_column")
result
=await customer_crud.update_by_column("id", customer.id, {"age": 30}, db
)
print("操作结果:", result)

await db.close()

同步和异步处理的差异:

  • 同步操作
    适用于传统的阻塞式应用场景,比如命令行工具或简单的脚本。
  • 异步操作
    更适合异步框架如
    FastAPI
    ,可以提高高并发场景下的性能。

通过封装数据库操作,可以让代码更具复用性和可维护性,支持不同类型的操作场景。