2024年2月

数据治理可以有效保障数据建设过程在一个合理高效的监管体系下进行,最终提供高质量、安全、流程可追溯的业务数据。

1 序:数据治理体系

企业数据治理体系
包括
元数据管理

主数据管理

数据资产管理

数据质量管理

数据安全

数据标准
等内容。

2 最新一代数据治理开源软件

2.0 一站式数据开发集成平台

DataSphere Studio : 982 fork / 2.9k star | Since : Nov 24, 2019

  • DataSphere Studio
  • https://github.com/WeBankFinTech/DataSphereStudio
    DataSphere Studio(简称 DSS)是微众银行自研的数据应用开发管理集成框架。
    基于插拔式的集成框架设计,及计算中间件 Linkis ,可轻松接入上层各种数据应用系统,让数据开发变得简洁又易用。在统一的 UI 下,DataSphere Studio 以工作流式的图形化拖拽开发体验,将满足从数据交换、脱敏清洗、分析挖掘、质量检测、可视化展现、定时调度到数据输出应用等,数据应用开发全流程场景需求。DSS 通过插拔式的集成框架设计,让用户可以根据需要,简单快速替换 DSS 已集成的各种功能组件,或新增功能组件。借助于 Linkis 计算中间件的连接、复用与简化能力,DSS 天生便具备了金融级高并发、高可用、多租户隔离和资源管控等执行与调度能力。
  • 主要编程语言:Java / Scala

  • 社区活跃情况

2.1 元数据

Open Metadata : 753 fork / 3.7k star | Since : Aug 1, 2021 【推荐】

  • Open Metadata | 元数据管理

  • 主要编程语言 : TypeScript / Java / Python

  • 社区活跃情况

Commits

Amundsen : 945 fork / 4.2k star | Since : Feb 3, 2019

  • Amundsen | 数据发现、元数据引擎
  • 主要编程语言: Python / TypeScript

  • 社区活跃情况

Commits

Marquez : 279 fork / 1.6k star | Since : Jul 1, 2018

  • Marquez
  • https://marquezproject.ai/
  • https://github.com/MarquezProject/marquez
    Marquez 是一款WeWork发布并开源的元数据服务,用于数据生态系统元数据的收集、汇总及可视化。它维护着数据集的消费和生产,为作业运行时和数据集访问频率提供全局可见性,提供集中的数据集生命周期管理等。

  • 主要编程语言:Java / TypeScript

  • 社区活跃度

Commits

Data Hub : 2.6K fork / 9K star | Since : Nov 15, 2015

  • Data Hub |

DataHub 是由Linkedin开源的,官方Slogan:The Metadata Platform for the Modern Data Stack - 为现代数据栈而生的元数据平台。
目的就是为了解决多种多样数据生态系统的元数据管理问题
它提供元数据检索、数据发现、数据监测和数据监管能力,帮助大家解决数据管理的复杂性。

  • 主要编程语言:Java / Python / TypeScript

  • 社区活跃情况:

Commits

Apache Atlas : 817 fork / 1.7k star | Since : Nov 16, 2014

  • Apache Atlas | 元数据、数据血缘

Apache Atlas是Apache Hadoop的数据和元数据治理的框架,是Hortonworks 公司联合其他厂商与用户于2015年发起数据治理倡议,2015年5月5日进入Apache孵化,2017年6月21日成为Apache顶级项目。
是为解决Hadoop生态系统的元数据治理问题而产生的开源项目。它为
Hadoop集群
提供了包括数据分类、集中策略引擎、数据血缘、安全和生命周期管理在内的元数据治理核心登能力。

  • 主要编程语言 : Java / JavaScript

  • 社区活跃度

Commits

Dataedo [闭源]

  • Dataedo | 数据字典、元数据管理

Dataedo是一个开源的数据字典和元数据管理工具。它可以帮助用户创建和维护数据字典,并对数据进行元数据建模和文档化。

ERD Online [闭源]

  • ERD Online

ERD(Entity-Relationship Diagram) Online

全球第一个开源、免费在线数据建模、元数据管理平台
(口号)。提供简单易用的元数据设计、关系图设计、SQL查询等功能,辅以版本、导入、导出、数据源、SQL解析、审计、团队协作等功能、方便我们快速、安全的管理数据库中的元数据。

2.2 数据集成

Sea Tunnel : 1.5k fork / 7k star | Since : Jul 30, 2017 【推荐】

  • Sea Tunnel

  • 主要编程语言:Java

  • 社区活跃情况

Kettle : 3.2k fork / 7.2k star | Since : Oct 6, 2013

  • Kettle (全名 : Pentaho Data Integration - Kettle)
  • 主要编程语言:Java

  • 社区活跃情况

ChunJun : 1.7k / 3.9k | Since : Apr 29, 2018

  • ChunJun(纯均)
  • https://dtstack.github.io/chunjun/
  • https://github.com/DTStack/chunjun
  • https://www.dtstack.com/resources?src=dsyzh
    ChunJun 是易用、稳定、高效的批流一体的数据集成框架。
    该项目最早启动的初衷是为【袋鼠云】的核心业务一站式大数据基础软件 - 数栈 ,打造一款具有 “袋鼠特色 “的核心计算引擎,承载实时平台、离线平台、数据资产平台等多个应用的底层数据同步及计算任务。
    ChunJun 基于 Flink 并采用插件式架构,将源数据库抽象成 Reader 插件,将目的数据库抽象成 Writer 插件。

  • 核心特点
  • 基于 json、sql 构建任务
  • 支持多种异构数据源之间数据传输
  • 支持断点续传、增量同步
  • 支持任务脏数据存储管理
  • 支持 Schema 同步
  • 支持 RDBS 数据源实时采集

  • 主要编程语言:Java

  • 社区活跃情况

DataX : 5.2k fork / 14.8k star

  • DataX => DataWorks (商业版)
  • https://github.com/alibaba/DataX
    DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。
  • 主要编程语言:Java(97.6%)、Python (2.3%)

其他成熟度低的产品

  • Cannal : 7.5 fork / 27.4k star | Since : Sep 21, 2014
  • 主要编程语言:Java

  • 社区活跃情况

2.3 数据开发

  • Apache Flink

Apache Spark 【推荐】

  • Apache Spark

2.4 数据质量

2.5 数据标准

2.6 数据模型 /数据建模 / 数据资产

2.7 数据分析与可视化

Superset 【推荐】

  • Superset | 开源BI

由Airbnb贡献的轻量级BI产品;
数据源方面,Superset支持CSV、MySQL、Oracle、Redshift、Drill、Hive、Impala、Elasticsearch等27种数据源,并深度支持Druid。

Grafana

  • Grafana

Grafana 主要用于对接
时序数据库
,分析展示
监控数据

目前支持的数据源包括 InfluxDB、Elasticsearch、Graphite、Prometheus 等,同时也支持 MySQL、MSSQL、PG 等关系数据库。

Metabase

  • Metabase

数据源方面,Metabase 支持 Redshift、Druid、Google BigQuery、MongoDB、MySQL、PG 等 15 种数据源。

DataEase

  • DataEase |

理念:人人可用的开源数据可视化分析工具。

ECharts

  • ECharts | 基于 JavaScript 的开源可视化图表库

2.8 调度系统 & 工作流系统

Apache Dolphi Scheduler 【推荐】

  • Apache Dolphi Scheduler

一个分布式和可扩展的开源工作流协调平台,具有强大的DAG可视化界面

XXL-JOB 【推荐】

  • XXL-Job

X 参考文献


0. 前言

Kubernetes:kube-scheduler 源码分析
介绍了
kube-scheduler
调度 Pod 的逻辑。文中有一点未提的是,在
Kubernetes
集群中,
kube-scheduler
组件是多副本,单实例运行。仅有一个副本作为 leader 运行,当发生故障时,其它副本会抢占为 leader 继续运行。这种机制通过 leader election 实现,本文将从源码角度分析 leader election 选举的实现。并且,在此基础上,在回头看
kube-scheduler
是怎么结合
leader elction
实现多副本,单实例运行的。

1. leader election

leader election 通过多个副本抢占资源锁的方式实现单实例运行。在 Kubernetes 中,[Configmap|Lease|Endpoint] 可以作为资源锁的实现。

1.1 示例

直接看 leader election 代码容易晕,这里从示例入手,看 leader election 是怎么运行的。

// lease.go
/*
package main

func buildConfig(kubeconfig string) (*rest.Config, error) {
	...
}

func main() {
	klog.InitFlags(nil)
	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "kube-scheduler", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "kube-system", "the lease lock resource namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)

	run := func(ctx context.Context) {
		klog.Info("Controller loop...")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			OnStoppedLeading: func() {
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

详细代码戳 这里

运行 id 为 1 的进程抢占 lease:

# go run lease.go --kubeconfig=/root/.kube/config -id=1 -lease-lock-name=example -lease-lock-namespace=default 
I0222 09:44:40.389093 4054932 leaderelection.go:250] attempting to acquire leader lease default/example...
I0222 09:44:40.400607 4054932 leaderelection.go:260] successfully acquired lease default/example
I0222 09:44:40.402998 4054932 lease.go:87] Controller loop...

可以看到,id 为 1 的进程(简称进程 1)抢占到 lease,接着进入进程的运行逻辑。

继续运行 id 为 2 的进程(简称进程 2)抢占 lease:

# go run lease.go --kubeconfig=/root/.kube/config -id=2 -lease-lock-name=example -lease-lock-namespace=default
I0222 10:26:53.162990 4070458 leaderelection.go:250] attempting to acquire leader lease default/example...
I0222 10:26:53.170046 4070458 lease.go:151] new leader elected: 1

进程 2 抢占不到 lease,因为此时进程 1 是 leader,且 lease 未被释放或者过期。

kill 掉进程 1(注:此 id 不是进程的 PID),查看进程 2 能否抢占到 lease:

# go run lease.go --kubeconfig=/root/.kube/config -id=2 -lease-lock-name=example -lease-lock-namespace=default
I0222 10:26:53.162990 4070458 leaderelection.go:250] attempting to acquire leader lease default/example...
I0222 10:26:53.170046 4070458 lease.go:151] new leader elected: 1
I0222 10:31:54.704714 4070458 leaderelection.go:260] successfully acquired lease default/example
I0222 10:31:54.704931 4070458 lease.go:87] Controller loop...

可以看到,进程 2 成功抢占 lease。

1.2 源码分析

1.2.1 lease 资源

lease 作为 leader election 的资源锁存在,多副本抢占 lease 实际是对 lease 进行 [Get|Create|Update] 操作的过程。我们看 lease 在 Kubernetes 的资源表示:

# kubectl describe lease example -n default
Name:         example
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  ...
Spec:
  Acquire Time:            2024-02-22T08:31:54.696415Z
  Holder Identity:         2
  Lease Duration Seconds:  60
  Lease Transitions:       5
  Renew Time:              2024-02-22T08:51:47.060020Z
Events:                    <none>


Spec
域中有五个字段,分别介绍如下:

  • Acquire Time: 首次获取 lease 的时间。
  • Holder Identity: 当前持有 lease 的用户身份。
  • Lease Duration Seconds: lease 过期时间。
  • Lease Transitions: lease 被多少 Holder 持有过。
  • Renew Time: Holder 刷新 lease 的时间。lease 的持有者需要 renew lease,否则其它副本将抢占过期的 lease。

多副本抢占 lease 的信息将被记录到这几个字段中。lease 存储在 etcd 中,通过 Raft 算法实现 lease 资源的唯一性和原子性,以保证多副本仅可抢占唯一的 lease。

1.2.2 抢占状态图

多副本抢占 lease 的状态图如下,结合状态图看源码会更加清晰。

image

1.2.3 创建 lease

我们从创建 lease 开始分析。

首先,删掉 lease 资源。

# kubectl get lease
NAME      HOLDER   AGE
example   2        23h
# kubectl delete lease example
lease.coordination.k8s.io "example" deleted

然后,调式模式下运行 lease.go,进入
leaderelection.RunOrDie
查看函数做了什么。

// k8s.io/client-go/tools/leaderelection/leaderelection.go
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	// 创建 leader elector 实例 le
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	...

	// 运行 leader elector 实例
	le.Run(ctx)
}

进入
LeaderElector.Run

// k8s.io/client-go/tools/leaderelection/leaderelection.go
func (le *LeaderElector) Run(ctx context.Context) {
	...

	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	
	...

	le.renew(ctx)
}

LeaderElector.Run
中有两个重要的函数
LeaderElector.acquire

LeaderElector.renew

LeaderElector.acquire
负责获取(抢占)锁资源。
LeaderElector.renew
负责更新锁资源,从而一直持有锁。

进入
LeaderElector.acquire

// k8s.io/client-go/tools/leaderelection/leaderelection.go
func (le *LeaderElector) acquire(ctx context.Context) bool {
	...
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)

	wait.JitterUntil(func() {
		// 重点是这里,tryAcquireOrRenew 负责获取锁,或者更新已经获取锁的信息
		succeeded = le.tryAcquireOrRenew(ctx)
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

LeaderElector.acquire
函数的重点是
LeaderElector.tryAcquireOrRenew

// k8s.io/client-go/tools/leaderelection/leaderelection.go
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.NewTime(le.clock.Now())
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}

		le.setObservedRecord(&leaderElectionRecord)

		return true
	}

	...
	return true
}

LeaderElector.tryAcquireOrRenew
实现的就是状态转移图中的逻辑,这里我们仅关注创建锁这一流程。

首先,
LeaderElector.config.Lock.Get
通过
client-go
获取 etcd 中锁的信息。

// k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
	// 获取锁(lease)
	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}

	...
	return record, recordByte, nil
}

没有找到锁,进入
LeaderElector.config.Lock.Create
创建锁。

// k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
	var err error
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:      ll.LeaseMeta.Name,
			Namespace: ll.LeaseMeta.Namespace,
		},
		Spec: LeaderElectionRecordToLeaseSpec(&ler),
	}, metav1.CreateOptions{})
	return err
}

查看创建锁的信息:

# kubectl describe lease example -n default
Name:         example
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  ...
Spec:
  Acquire Time:            2024-02-23T05:42:07.781552Z
  Holder Identity:         1
  Lease Duration Seconds:  60
  Lease Transitions:       0
  Renew Time:              2024-02-23T05:42:07.781552Z

1.2.4 更新 lease

接着看状态转移图中的更新 lease 的流程,场景是 lease 资源已存在并过期,holder 是 id 为 1 的进程。

进入
LeaderElector.tryAcquireOrRenew

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	...
	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		...
	}

	// 2. Record obtained, check the Identity & Time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)

		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
}


LeaderElector.tryAcquireOrRenew
中获取锁,记录锁信息,比较锁 holder 是否是当前请求的 holder。如果是,记录锁的
AcquireTime

LeaderTransitions
。最后,进入
LeaderElector.config.Lock.Update
更新锁:

func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
	...

	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
	if err != nil {
		return err
	}

	ll.lease = lease
	return nil
}

查看更新的 lease 信息:

# kubectl describe lease example -n default
Name:         example
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  ...
Spec:
  Acquire Time:            2024-02-23T05:42:07.781552Z
  Holder Identity:         1
  Lease Duration Seconds:  60
  Lease Transitions:       0
  Renew Time:              2024-02-23T06:00:52.802923Z

可以看到,holder 更新了 Renew Time,并且这里的 Lease Transitions 未更新。

1.2.5 renew lease

当 holder 拿到 lease 后需要 renew lease,以一直持有 lease。renew 是在获取锁之后,实现如下:

func (le *LeaderElector) Run(ctx context.Context) {
	...

	if !le.acquire(ctx) {
		return // ctx signalled done
	}

	...
	le.renew(ctx)
}

func (le *LeaderElector) renew(ctx context.Context) {
	...
	wait.Until(func() {
		...
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			// 获取并且 renew lease
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())

		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}

renew lease 也是通过
LeaderElector.tryAcquireOrRenew
更新 lease 信息实现的,这里就不多赘述了。

2. kube-scheduler 和 leader election

kube-scheduler 中调度和 leader election 结合的部分在 kube-schduler 的
Run
函数:

func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	...
	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	// 配置 leader election
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			// OnStartedLeading 回调函数,当抢占到 lease 时回调
			OnStartedLeading: func(ctx context.Context) {
				...
				// 作为 leader 运行调度器的调度逻辑
				sched.Run(ctx)
			},
			OnStoppedLeading: func() {
				...
				}
			},
		}

		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		// 运行 leader election 抢占 lease
		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	close(waitingForLeader)
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

3. 小结

本文从源码角度分析 leader election 的实现,并且介绍了 leader election 和 kube-scheduler 结合实现是怎么实现多副本,单实例运行的。


一、程序员不善言词

在大家的印象中,程序员好像是一群不善言词的理工男。为什么大家会有这种刻板的印象呢?

因为程序员的工作,只需要一台电脑,一根网线,就可以开始工作了。好像不需要与人打交道。一段进入到工作状态,编码的时候还不希望被别人打扰,其实是编码思路不愿意被打断。
这时候,他们就可以两耳不闻窗外事,一心只编俺的代码。此时编写程序,是不需要与其它相关人员沟通,只需要和电脑打交道。

但是,实际的情况是不是都不需要与人沟通了呢?当然不是。上面说的情况只是开发一款软件产品或产品功能过程中的一个步骤。
一款软件产品的开发或产品的一项功能开发,到最后上线运营,中间要经历很多步骤。

如下图,一款产品开发的简单步骤:

image

上图中,编码开发还包括美工、UI、前端、测试、后端等。在最后面还有一个步骤 - 产品运营。所以开发一款产品,涉及到的利益相关人员很多,需要沟通的人也很多。
在这里,程序员最需要沟通的人员就是产品经理。

二、与产品经理的沟通

程序员与产品经理是 2 种不同的职业,主要职能和思维方式都不相同,专注的点也不同。

产品经理vs程序员职能对比

产品经理专注于用户需求,竞品分析,产品设计,产品运营指标等等。程序员一般专注于接口设计,程序设计,技术架构,解决方案设计,编码实现,产品落地等等。

image

产品经理vs程序员思维方式对比

产品经理与程序员的思维方式也不相同。

产品经理主要想的是用户需求,也就是用户需要什么,在什么场景下需要。用户内心需要什么。竞品有哪些,产品体验怎么做才更好,就像 iphone 的使用体验。怎么让用户不需要思考很顺畅的使用产品,用户怎么增长。产品怎么挣钱,商业模式是什么等。比较关注上层和大的方面。

程序员想的是功能怎么实现才好,下次需求变动少改代码。技术选型,哪些技术比较适合现在的项目。技术怎么设计和架构,才能应对未来的变化。有哪些好的解决方案。有什么新技术。关注技术细节。功能怎么实现落地。比较关注技术细节。

image

鸡同鸭讲的问题

鸡同鸭讲最主要的问题,就是沟通双方没有共同的语言。
产品经理和程序员,从上面 2 组图片里的内容就可以看出来,它们各自的职能和思维方式都有很大的不同。在工作上,大家各自职责内的专业术语都没有多少相同。这就有可能在沟通时,导致鸡同鸭讲的问题。产品经理说的内容,程序员不能很好的理解。

怎么办?

  • 第一:产品经理能学一些技术相关的内容,不需要学得太深,能顺利沟通就好。毕竟产品经理也不需要编码实现产品功能。这也是为什么有很多技术转产品的一个原因,就是有技术基础,沟通起来会比较有共同语言。
  • 第二:技术人员学一点产品知识,利于彼此沟通。

这个大家可以相互开讲座,培训彼此所需要的知识。还可以增加彼此之间的了解。

只有彼此了解,才会理解,沟通才能更加顺利。

不合理的需求

先看一个流传很广的需求故事,就是产品经理给程序员提了一个需求:

要求app的主题颜色可以随着用户手机壳颜色改变而变化。
然后,程序员和产品经理就干了起来。

这种看起来,就目前阶段的技术来说,就是一个让人很无语的需求。属于无理需求的范围了。

在需求开发时,如果程序员认为是一个不合理的需求,一定要及时与产品经理沟通,询问这个做这个需求的目的,对产品有哪些好处,能带来什么收益?最好能有一些具体的指标,比如用户可能增加多少?用户停留页面时间增加多少?等等。而且上线之后,要和产品经理一起复盘数据,是否达到了当初预期效果。登记起来,既可以做考核指标,提出需求的有效比例,也是对产品经理的警醒。

当然其它需求也要做同样的操作,但是你认为不合理的需求更要重点关注,以此警醒产品经理提需求时要更加深入思考,提出合理的需求。

需求理解不一致

最能理解这个主题的意思,莫过于一幅树秋千的漫画:

image

(图片来自于网络,最原始图来自于:www.projectcartoon.com)

客户自己真正需要的和客户描述的往往不一致,在经过相关人员的层层描述,离客户真正需要的越来越远。用户表达出来的需求和自身真正需要的相差甚远。
同理,产品经理获得的用户需求可能和用户需要的相差甚远,即使差的不远,到了开发人员(程序员)这里,开发实现也可能与产品经理描述的需求差的很远,最终导致产品没能满足用户需求。

怎么办?

这就需要开发人员(程序员)与产品经理不断的进行沟通,持续修复错误的认知,慢慢才能接近本质。
同样,开发人员(程序员)有时不可能一开始就有直达本质的解决方案,需要在开发过程中,渐渐弄明白本质的解决方案。

最重要的 3 点:

  • 一是从不同的角度来理解用户的需求,尽量对需求有深入的理解。
  • 二是不断的沟通,修复错误的认知,加快正确的认知过程。
  • 三是程序设计的灵活性,出现问题时能尽快调整程序。

三、与上级领导的沟通

与上级领导的沟通,一般有如下几种:

  • 项目任务分配及对任务的预期
  • 一些会议,比如周会
  • 1 对 1 沟通

项目任务分配和做任务时:
做任务前要做好是,任务完成的预估时间表。
在做的过程中,如果遇到了困难,要及时向领导反馈情况。每隔一段时间向领导汇报项目的进度情况,让他对项目的进展做到心里有数。

件件有着落,事事有回音。
及时反馈任务完成进度情况。

完成后,复盘总结。

1 对 1 沟通:

1 对 1 的沟通,是与领导沟通的好时机,你可以把在工作中遇到的困惑问题,学习方向等等问题,都可以与领导好好沟通,寻求它的意见或建议。

在沟通前,一定要好好准备,可以把想要沟通的问题都写在纸张上。

四、技术提问

现在随着 AI 技术的发展,出现了 ChatGPT ,能智能回答问题的应用,它可以直接给出答案,这给技术人带来了不少福音。

在还有就是向搜索引擎提问,一种常见的是百度、谷歌、Bing 这种传统的搜素引擎。
还有一种是结合了 ChatGPT、Bard 等的 AI 型搜索引擎,也是搜索问题答案的很好的方式。

最后一种常见的就是在问答社区、群里进行提问。这种提问,有一个很好的提问指引文章,就是
提问的智慧
这篇文章,文章很长,但值得一看,会有收获的。

举一个例子,比如编码出现了 bug,不知道这个 bug 怎么解决,我要怎么提问才能更好的得到别人的回答:

  1. 交代编码环境:使用编程语言的版本,使用的操作系统是 Linux 还是 Win 等和它的版本
  2. 代码背景:我想用代码实现一个什么样的功能
  3. 代码实现:把你编写的代码 Copy 一份出来,给别人检查,有注释更好,别人能理解你代码实现的步骤和逻辑。最好能 Copy 一份出来在线运行。当然敏感信息可以去掉。比如 Go 编程语言,有
    Go Playground
  4. bug 报错信息:给出 bug 报错的详尽信息,这是修正 bug 所需的最重要的线索
  5. 反馈:修复 bug 后,最好能给出你修复了的反馈信息。比如 bug 具体错误在哪?怎么修复的等等
  6. 感谢:最后要表达你的感谢,感谢别人的帮助

还有一点,就是谦虚的请教。因为群里聊天有时容易情绪化,都隔着一条长长的网线。

五、信息传递过程的损耗

我们想传递信息时,说话表达是最普遍的方式。

我们想给某人传递一条信息时的过程,首先自己心里想说的,然后嘴巴用语言表达出来的,把信息传递给对方,对方接收到信息被听到的信息,理解你的语言信息,听懂了的信息(可能只有一部分),最后才是执行。

信息从传递到被接收到最后执行,有这么一个过程。信息在这个传递的过程中是会损耗。我们在用户增长中有一个漏斗模型 AARRR。同理,在信息传递过程中,也有一个信息漏斗效应。

信息传递漏斗效应,模型示意图:

image

这也是上面的小节中,需求理解不一致导致的原因,信息在传递过程中是会损耗的,到最后需求理解就出现了很大的偏差。

六、沟通的环境和话语的判断

沟通的环境对于沟通双方来说都很重要,总体来说要有一个安全的环境。

沟通的地方,一般在公司里的话,会选择独立的会议室,这样就不会被人打扰,也不会被人听到沟通的信息。

沟通双方的情绪也要是“安全”。如果是对立的话,要想法设法降低对立情绪。

话语的判断:

对于别人说的一句话,是事实,是观点,还是个人感受,一定要分辨清楚。

七、更好的表达方法

金字塔原理

使用金字塔原理来进行条理分明,逻辑清晰的表达。
金字塔原理这本书里有一条沟通的方法:
自上而下,结论先行

image

金字塔中的结构关系:

  • 纵向关系:总结论与份论点、论据之间的纵向关系。

上一层的结论或思想是下一层的概括总结。自上而下结论先行

  • 横向关系:分论点之间的横向关系。

横向关系有,
演绎关系
从因到果,层层递进,推理的结论就是上一层的结论或思想。还有一个,
归纳关系
就是事物或思想之间有共性,上一层就是对下一层的概括总结。

叙述的方式:可以从 背景、冲突、疑问、回答 四个要素来展开。

当然书里面还有其它的方法,建议大家多看看这本书,然后运用它们。

非暴力沟通

非暴力沟通
是一本关于沟通方面很好的书籍,这本书描述了一种平和的沟通方式 ,通过观察、感受、需要和请求这四步来进行沟通,而不是用道德评判、比较、回避责任和强人所难的方式进行沟通。

导致非暴力沟通的一个重要原因:就是忽视了对方的
感受

比如夫妻双方沟通时,要多关注感受,而不是讲了多大的道理。

说下 2 种典型的很不好的沟通方式,道德评判和比较。

道德评判
:当一个人的话语和行为不符合我们的价值观时,我们会习惯给出道德评判。比如这个人没素质,这个人太自私等等。

比较
:比较是更容易发生的。比如父母眼中别人家的孩子怎么样怎么样。还有我有一个朋友xxx怎么样。反正都是别人家的好。

非暴力沟通的 4 个步骤:

  • 第一步:观察事实,而不是评论
  • 第二步:表达自己的感受
  • 第三步:表达自己的需要
  • 第四步:提出明确的诉求

我们在听别人说话时,不仅要理解说话的内容,还要感知对方说话时的情绪。

更多内容请好好看看这本书。

欢迎大家评论,点推荐。如果文中有不足之处,也欢迎大家留言批评并指正。

大家在工作中,遇到什么沟通的问题,或沟通的好方法,也欢迎大家留言评论。

八、参考

image

说明

曾经在学习java面向对象时,你是否会为面向对象的封装-继承-抽象-多态-组合等各种概念搞得稀里糊涂,乃至反复阅读,背诵其相关概念,结果一段时间过后又还给了时间。。。
这种经历简直令人发指,让人无法忍受,难道就没有哪个地方能把它一次说清楚,老百姓看了以后纷纷醍醐灌顶,不再重蹈覆辙???答案就在本文,请各位耐心观看,也可无脑收藏,用时瞟一眼再次醍醐灌顶即可。

先引用一句祖师爷《java编程思想》的教诲:
有一条通用准则:使用继承表达行为的差异,使用组合表达状态的变化。
当你不知道该用
继承
还是
组合
的时候可以参考这句话。

封装

封装是指将对象的属性和行为(即数据和方法)结合在一个独立的单元中,并隐藏对象的内部细节,只对外提供公共的访问方式。封装有助于保护对象内部的数据不被外部随意访问和修改,同时提高了代码的安全性和可维护性。

public class Person {  
    // 私有属性,封装了人的姓名和年龄  
    private String name;  
    private int age;  
  
    // 公共的构造方法,用于创建Person对象并初始化属性  
    public Person(String name, int age) {  
        this.name = name;  
        this.age = age;  
    }  
  
    // 公共的getter方法,用于获取私有属性的值  
    public String getName() {  
        return name;  
    }  
  
    // 公共的setter方法,用于设置私有属性的值  
    public void setAge(int age) {  
        this.age = age;  
    }  
  
    // 公有的方法,描述人的行为  
    public void introduce() {  
        System.out.println("My name is " + name + " and I am " + age + " years old.");  
    }  
}  
  
public class Main {  
    public static void main(String[] args) {  
        Person person = new Person("Alice", 25);  
        person.introduce(); // 输出:My name is Alice and I am 25 years old.  
        // person.name = "Bob"; // 错误!不能直接访问私有属性  
        // System.out.println(person.name); // 错误!不能直接访问私有属性  
        person.setAge(26);  
        person.introduce(); // 输出:My name is Alice and I am 26 years old.  
    }  
}

抽象

抽象是指只展示对象的必要信息,而隐藏不必要的细节。在Java中,抽象类是一种不能被实例化的类,它通常包含抽象方法(没有实现的方法),这些抽象方法由子类来具体实现。抽象类和抽象方法主要用于定义接口和实现多态。

// 抽象类:动物
abstract class Animal {
    // 抽象方法,没有具体实现
    abstract void makeSound();
}

class Dog extends Animal {
    // 实现父类的抽象方法  
    @Override
    void makeSound() {
        System.out.println("The dog barks");
    }
}

// 子类:猫,实现动物的抽象方法  
class Cat extends Animal {
    // 实现父类的抽象方法  
    @Override
    void makeSound() {
        System.out.println("The cat meows");
    }
}

public class AbstractClassTest {
    public static void main(String[] args) {
        Animal myDog = new Dog(); // 多态:Animal引用指向Dog对象
        myDog.makeSound();
        Animal myCat = new Cat(); // 多态:Animal引用指向Cat对象
        myCat.makeSound();
    }
}

继承

继承是指一个类(子类)可以继承另一个类(父类)的属性和方法,并可以添加或覆盖父类的属性和方法。继承允许我们创建分等级层次的类,减少了代码冗余,提高了代码的可重用性。
继承使得代码可以重用,同时能体现类与类之间的
is-a
关系

// 父类:动物  
class Animal {  
    void makeSound() {  
        System.out.println("The animal makes a sound");  
    }  
}  
  
// 子类:狗,继承自动物  
class Dog extends Animal {  
    // 覆盖父类的方法  
    @Override  
    void makeSound() {  
        System.out.println("The dog barks");  
    }  
  
    void wagTail() {  
        System.out.println("The dog wags its tail");  
    }  
}  
  
public class Main {  
    public static void main(String[] args) {  
        Dog dog = new Dog();  
        dog.makeSound(); // 输出:The dog barks  
        dog.wagTail(); // 输出:The dog wags its tail  
    }  
}

组合

组合是将多个对象组合到一起,形成一个新的对象。组合体现的是
has-a
关系,即一个类中包含另一个类的对象。组合是一种强耦合关系,体现了严格的部分和整体的关系,部分和整体的生命周期一样。

  1. 例子1
// 组件类:轮子  
class Wheel {  
    void rotate() {  
        System.out.println("The wheel is rotating");  
    }  
}  
  
// 组合类:汽车,包含轮子对象  
class Car {  
    private Wheel wheel;  
  
    public Car() {  
        this.wheel = new Wheel();  
    }  
  
    void move() {  
        System.out.println("The car is moving");  
        wheel.rotate();  // 调用轮子对象的方法  
    }  
}  
  
public class Main {  
    public static void main(String[] args) {  
        Car car = new Car();  
        car.move();  // 调用汽车对象的方法,汽车对象内部会调用轮子对象的方法  
    }  
}
  1. 更经典的例子,可以体现继承和组合结合的范例
// polymorphism/Transmogrify.java
// Dynamically changing the behavior of an object
// via composition (the "State" design pattern)
class Actor {
    public void act() {}
}

class HappyActor extends Actor {
    @Override
    public void act() {
        System.out.println("HappyActor");
    }
}

class SadActor extends Actor {
    @Override
    public void act() {
        System.out.println("SadActor");
    }
}

class Stage {
    private Actor actor = new HappyActor();
    
    public void change() {
        actor = new SadActor();
    }
    
    public void performPlay() {
        actor.act();
    }
}

public class Transmogrify {
    public static void main(String[] args) {
        Stage stage = new Stage();
        stage.performPlay();
        stage.change();
        stage.performPlay();
    }
}

输出:

HappyActor
SadActor

多态

多态(Polymorphism)是面向对象编程的四大基本特性之一,其他三个是封装(Encapsulation)、继承(Inheritance)和抽象(Abstraction)。多态字面上理解就是“多种形态”,在Java中,多态指的是允许一个接口或父类引用指向其子类对象,并且在运行时能够自动调用实际指向对象的子类方法。

简单来说,
多态就是同一个方法调用可以有不同的实现方式
,具体实现取决于运行时对象的实际类型。Java通过方法重写(Override)和向上转型(Upcasting)来实现多态。

// 动物类,定义了一个makeSound方法  
class Animal {  
    void makeSound() {  
        System.out.println("The animal makes a sound");  
    }  
}  
  
// 狗类,继承自动物类并重写了makeSound方法  
class Dog extends Animal {  
    @Override  
    void makeSound() {  
        System.out.println("The dog barks");  
    }  
}  
  
// 测试类  
public class TestPolymorphism {  
    public static void main(String[] args) {  
        // 向上转型,父类引用指向子类对象  
        Animal animal = new Dog();  
          
        // 调用makeSound方法,实际执行的是Dog类的makeSound方法  
        animal.makeSound(); // 输出:The dog barks  
          
        // 如果我们有一个Animal类型的数组,我们也可以将不同类型的Animal对象加入其中  
        Animal[] animals = new Animal[2];  
        animals[0] = new Dog();  
        animals[1] = new Animal();  
          
        // 遍历数组并调用makeSound方法,每个对象会根据其实际类型调用相应的方法  
        for (Animal a : animals) {  
            a.makeSound();  
        }  
        // 输出:  
        // The dog barks  
        // The animal makes a sound  
    }  
}

觉得有用的点赞:)

嘿嘿嘿

进程和线程

进程
(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基 本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进 程都有一个自己的地址空间。一个进程至少有 5 种基本状态,它们是:初始态,执行态, 等待状态,就绪状态,终止状态,通俗的讲进程就是一个正在执行的程序。

线程
是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基 本单位

一个进程可以创建多个线程,同一个进程中的多个线程可以并发执行,一个程序要运行的话至少有一个进程

并发和并行

并发
:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执

行。

并行
:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。

通俗的讲
多线程程序在单核 CPU 上面运行就是
并发
,多线程程序在多核 CUP 上运行就是
并行,
如果线程数大于 CPU 核数,则多线程程序在多个 CPU 上面运行既有并行又有并发

image-20240222190919472

协程goroutine

golang
中的主线程:
(可以理解为线程/也可以理解为进程),在一个 Golang 程序的主线程 上可以起多个协程。Golang 中多协程可以实现并行或者并发

协程:
可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是 完全由用户自己的程序进行调度的。Golang 的一大特色就是从语言层面原生支持协程,在 函数或者方法前面加 go 关键字就可创建一个协程。可以说 Golang 中的协程就是 goroutine

多协程和多线程:
Golang 中每个 goroutine (协程) 默认占用内存远比 Java 、C 的线程少

OS 线程(操作系统线程)一般都有固定的栈内存(通常为 2MB 左右)

一个 goroutine (协程) 占用内存非常小,只有 2KB 左右,多协程 goroutine 切换调度开销方面远比线程要少

使用协程
func test() {
	for i := 0; i <= 10; i++ {
		fmt.Println(i)
	}
}

func main() {
	
	// go关键字声明这是一个协程,test方法代码和主进程代码同时指向
	// 如果主进程执行完了,整个程序就结束了,协程没有执行完也不执行了,如果协程执行完主进程没有执行完,主进程会继续执行
	go test()

	for i := 0; i <= 10; i++ {
		fmt.Println(i)
	}

}
sync.WaitGroup 等待协程
import (
	"fmt"
	"sync"
)

// 需要导入sync包
// 声明WaitGroup
var wg sync.WaitGroup

func test() {
	for i := 0; i <= 10; i++ {
		fmt.Println(i, "123")
	}
	wg.Done() // 协程计数器-1
}

func main() {

	wg.Add(1) // 协程计数器+1,
	go test()
	for i := 0; i <= 10; i++ {
		fmt.Println(i)
	}

	wg.Wait() // 等待协程执行完毕,启动一个协程登记+1,结束就-1,等于0 的时候就等待结束

}

并行运行时使用的CPU核数

Go 运行时的调度器使用 GOMAXPROCS 参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 核心数。例如在一个 8 核心的机器上,调度器会把 Go 代码同 时调度到 8 个 OS 线程上。

Go 语言中可以通过 runtime.GOMAXPROCS()函数设置当前程序并发时占用的 CPU 逻辑核心数。

Go1.5 版本之前,默认使用的是单核心执行。Go1.5 版本之后,默认使用全部的 CPU 逻辑核 心数

import (
	"fmt"
	"runtime"
)

func main() {
	// 获取当下计数器上面的CPU个数
	cpuNUm := runtime.NumCPU()
	fmt.Println(cpuNUm)

	// ky自己设置使用多少CPU
	runtime.GOMAXPROCS(cpuNUm - 1)

}

channel管道

管道是 Golang 在语言级别上提供的 goroutine 间的通讯方式,我们可以使用 channel 在

多个 goroutine 之间传递消息。如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们

之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

Golang 的并发模型是 CSP(Communicating Sequential Processes),提倡
通过通信共享内

而不是
通过共享内存而实现通信

Go 语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循
先入先出
的规则,保证收发数据的顺序。每一个管道都是一个具体类 型的导管,也就是声明 channel 的时候需要为其指定元素类型

管道被设计为一种并发安全的数据结构,可以在多个协程之间安全地进行通信

channel类型

channel是一种类型,一种引用类型

/*
声明管道类型的语法:
var 管道名 chan 管道传递的数据类型

*/

var int1 chan int  // 传递int类型的管道
var str chan string
var b chan bool
var s chan []int
channel操作

管道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号

创建channel

向管道中存储数据,管道就必须有容量,所以需要使用make初始化分配容量

	// 创建一个管道,传递int类型,容量为3
	var ch = make(chan int, 3)
向管道存储数据
	// 管道名 <- 数据
	ch <- 10
从管道获取数据
	// 接收变量 := <-管道名
	a := <-ch
	fmt.Println(a) // 10

管道先入先出
	var ch = make(chan int, 3)
	// 向管道存储多条数据
	ch <- 10
	ch <- 11
	ch <- 12

	// 取出第一条数据
	a := <-ch
	fmt.Println(a) //10
	// 取出第二条数据,不赋值给任何
	<-ch
	// 第三天数据
	b := <-ch
	fmt.Println(b) //12

管道的值、长度和容量
	var ch = make(chan int, 3)
	// 管道的值是一个地址,长度是0是因为现在还没有写入值
	// 值-0xc0000b2000-长度-0,容量-310
	fmt.Printf("值-%v-长度-%v,容量-%v", ch, len(ch), cap(ch))
管道阻塞
  • 无缓冲管道:如果创建管道的时候没有指定容量,那么我们可以叫这个管道为无缓冲的管道,无缓冲的管道又称为阻塞的管道

  • 有缓冲管道:我们在使用 make 函数初始化 管道的时候为其指定管道的容量,有容量的管道就是有缓冲管道

  • 只要管道的容量大于零,那么该管道就是有缓冲的管道


    • 容量为1有一条数据的管道,再存储,阻塞管道
    • 容量为1有一条数据的管道,取一条数据后,再次取值,阻塞管道

image-20240223110224787

管道遍历和关闭管道

当向管道中发送完数据时,我们可以通过 close 函数来关闭管道。

当管道被关闭时,再往该管道发送值会引发 panic,从该管道取值的操作会先取完管道中的值,再然后取到的值一直都是对应类型的零值

func main() {

	var ch = make(chan int, 10)

	for i := 0; i < 10; i++ {
		ch <- i

	}

	// 在 Go 语言中,只有在接收端需要明确知道通道已经关闭的情况下才需要关闭通道。关闭通道是为了通知接收端不再有更多的数据发送过来,从而避免接收端陷入阻塞状态
	// 当管道被关闭时,再往该管道发送值会引发 panic
	close(ch)

	// 管道没有key,遍历管道的时候只用一个变量
	// 通过for range 遍历管道,如果没有关闭管道就会报错

	for v := range ch {
		fmt.Println(v) // 先进先出取值

	}

	/*
				 如果通过单纯的for循环变量管道,管道可以不关闭

			     在for range遍历通道时,当通道被关闭时,for range 循环会自动判断通道是否关闭,并在所有元素被接收后退出循环。
			     这是 for range 的一种特性,用于在遍历通道时方便地处理通道的关闭情况。
			     当通道未关闭时,for range 循环会一直等待接收通道中的元素。如果没有其他 goroutine 在向通道发送数据,或者没有明确的					 关闭通道的操作,那么 for range 循环就会一直阻塞在接收操作上,从而导致死锁

		 		   普通的 for 循环中,手动控制循环的条件,包括接收通道的操作。在这种情况下,可以根据自己的逻辑判断何时退出循环


         // 可以不关闭管道
				 for i:0;i<10;i++{ 
				    fmt.Println(<-ch)
				}
	*/

}

协程Goroutine 结合 Channel 管道 同步操作
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

// 定义一个向管道写入数据的方法
func WriteData(ch chan int) {
	for i := 0; i < 10; i++ {
		fmt.Println("WriteData")
		ch <- i
	}
	close(ch)
	wg.Done()

}

// 定义一个从管道读取数据的方法
func GetData(ch chan int) {
	for i := range ch {
		fmt.Println(i)
	}

	wg.Done()

}
func main() {
	/*
		两个协程方法同时执行,一个写入数据,一个读取数据
	
	当一个协程向管道写入数据而另一个协程从管道读取数据时,管道提供了内置的同步机制,即发送操作和接收操作会自动进行同步等待。
	这意味着,写入操作会在管道有足够的空间时立即完成,否则会阻塞直到有空间可用。
	类似地,读取操作会在管道有数据可读时立即完成,否则会阻塞直到有数据可读
	
	range 循环会在管道被关闭且其中的所有元素都被接收后自动退出。
	如果 WriteData 协程完成并关闭了管道,GetData 中的 range 循环会自动退出,不会导致死锁



	*/

	var ch = make(chan int, 10)

	wg.Add(1)
	go WriteData(ch)
	wg.Add(1)
	go GetData(ch)
	wg.Wait()

}

/*
多个协程打印素数
1.协程存放数字
2.多个协程计算是否素数
3.协程打印素数
*/

var wg = sync.WaitGroup{}

// 向intChan放入数字
func PutNum(intChan chan int) {

	for i := 2; i < 100; i++ {
		intChan <- i
	}

	close(intChan)
	wg.Done()
}

// 从intChan取出数据,判断是否是素数,如果是,存储到primeChan
func PrimeNum(intChan chan int, primeChan chan int, exitChan chan bool) {

	for num := range intChan {
		var flag = true
		for i := 2; i < num; i++ {
			// 如果=0 说明不是素数
			if num%i == 0 {
				flag = false
				break

			}
		}
		// num是素数
		if flag {
			primeChan <- num
		}

	}
	wg.Done()
	// 每执行完一次 向exitChan存储一个true
	exitChan <- true

}

// 打印素数
func PrintPrime(primeChan chan int) {

	for v := range primeChan {
		fmt.Println(v)
	}
	wg.Done()

}

func main() {

	var intChan = make(chan int, 1000)
	var primeChan = make(chan int, 1000)
	var exitChan = make(chan bool, 16)

	// 存入数字
	wg.Add(1)
	go PutNum(intChan)

	// 统计素数 启动16个协程
	for i := 0; i < 16; i++ {
		wg.Add(1)
		// 启动16个协程,操作同一个channel,所以不能在方法里直接关闭管道,需要16个协程都执行完再关闭管道
		go PrimeNum(intChan, primeChan, exitChan)
	}

	// 打印素数
	wg.Add(1)
	go PrintPrime(primeChan)

	// 匿名自执行函数-关闭primeChan
	// 管道循环
	wg.Add(1)
	go func() {
		for i := 0; i < 16; i++ {
			<-exitChan
		}
		// 关闭primeChan
		close(primeChan)
		wg.Done()
	}()

	wg.Wait()

}

单向管道

有的时候我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中 使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收,默认情况下,管道是双向管道,可读可写

	// 双向管道 可读可写
	var ch = make(chan int, 10)
	ch <- 1
	<-ch

	// 只写管道,只可以写入不能读取
	// 声明的时候在chan 和类型之间加 <-
	var ch1 = make(chan<- int, 10)
	ch1 <- 1

	// 只读管道,只能读取不能写入
	// 在chan前面 + <-
	var ch2 = make(<-chan int, 10)
// 写入数据方法,类型为只写管道
func WriteData(ch chan<- int) {
	ch <- 1
}

// 读取数据方法,类型为只读管道
func GetData(ch <-chan int) {
	fmt.Println(<-ch)
}

func main() {

	// 定义一个双向管道
	var ch = make(chan int, 10)

	WriteData(ch)
	GetData(ch)

}

select多路复用

遍历管道时,如果不关闭会阻塞而导致 deadlock,在实际开发中,可能我们不好确定什么关闭该管道

select 的使用类似于 switch 语句,它有一系列 case 分支和一个默认的分支。每个 case 会对应一个管道的通信(接收或发送)过程。select 会一直等待,直到某个 case 的通信操作完成 时,就会执行 case 分支对应的语句

  • 可处理一个或多个 channel 的发送/接收操作。

  • 如果多个 case 同时满足,select 会随机选择一个。

  • 对于没有 case 的 select{}会一直等待,可用于阻塞 main 函数

func main() {

	// 管道一 10个数字
	intChan := make(chan int, 10)
	for i := 0; i < 10; i++ {
		intChan <- i
	}

	// 管道二 10个字符串
	stringChan := make(chan string, 10)
	for i := 0; i < 10; i++ {
		stringChan <- strconv.Itoa(i)
	}

	/*

			死循环过程中,每次循环会随机从一个管道中读取数据,可能这次intChan,下次stringChan
		    直到所有数据读取完毕,会执行default,每次循环后是一个并发的操作,循环一次选择一个channel执行

	*/

	// 某些场景下,我们需要同时从多个管道接受数据,可以使用死循环和select
	// 使用select获取数据的时候,不需要关闭channel,如果关闭chanel,会一直执行读取,读取对应类型的零值
	for {
		select {
		case v := <-intChan:
			fmt.Println(v)
		case v := <-stringChan:
			fmt.Println(v)

		default:
			fmt.Println("数据全部读取完毕")
			return // 退出循环

		}
	}

}


并发安全和锁

互斥锁

互斥锁
是一种常用的控制共享资源访问的方法,它能够保证同时只有一个 goroutine 可以访问共享资源。Go 语言中使用 sync 包的 Mutex 类型来实现互斥锁

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等 待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等 待一个锁时,唤醒的策略是随机的

  • Lock:锁定共享资源
  • Unlock:解锁共享资源
import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

// 声明一个互斥锁
var mutex sync.Mutex
var count int = 0

func test() {
	// 多个协程同时访问时,先获取锁,然后执行代码,最后解锁,同时只有一个协程能获取到锁,获取不到锁的协程就等待
	
	// 加锁
	mutex.Lock()
	count++
	fmt.Println(count)
	// 解锁
	mutex.Unlock()

}

func main() {

	for i := 0; i < 20; i++ {
		wg.Add(1)
		go test()
	}

	wg.Wait()
}

读写互斥锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。

读写锁在 Go 语言中使用 sync 包中的 RWMutex 类型。

读写锁分为两种:读锁和写锁。当一个 goroutine 获取读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个 goroutine 获取写锁之后,其他的goroutine 无论是获取读锁还是写锁都会等待

var wg sync.WaitGroup

// 声明读写锁
var mutex sync.RWMutex

// 写的方法 互斥的
func WriteData() { 
	mutex.Lock() // +写的互斥锁
	fmt.Println("执行写操作")
	mutex.Unlock() // 解写的互斥锁
	wg.Done()

}

// 读的方法 并行的
func ReadData() {
	mutex.RLock() // +读的互斥锁
	fmt.Println("执行读操作")
	mutex.RUnlock() // 解读的互斥锁
	wg.Done()

}
func main() {

	// 开启10个协程执行写操作
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go WriteData()

	}

	// 开启10个协程执行写操作
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go ReadData()
	}
	wg.Wait()
}

image