分类 其它 下的文章

升讯威在线客服与营销系统是基于 .net core / WPF 开发的一款在线客服软件,宗旨是:
开放、开源、共享。努力打造 .net 社区的一款优秀开源产品。

背景

我在业余时间开发的这个客服系统,有一个问题始终让我饱受困扰,360以及各种国产安全管家十分容易报威胁。即便ESET、卡巴斯基、Windows Defender、小红伞全部一切正常,但360、国产管家就是倔强的报威胁。而这些国产管家的用户基数又很大,非常让人恼火。难道真的必须交保护费吗?

经过长时间的尝试和测试,终于在一定程度上缓解了这个问题(还不敢保证不会再报,继续观察)。

对软件进行数字签名

如果软件被360和国产管家报威胁,可以为其进行数字签名,即使用代码签名证书,用户下载软件时,能通过数字签名验证软件来源,确认软件、代码没有被非法篡改或植入病毒,减少被误报的可能。

Signcode.exe

Signcode.exe 是微软提供的一款用于对代码和软件文件进行数字签名的工具:

  • 支持多种文件类型签名:能够对常见的可执行文件(如.exe、.dll、.ocx)、Cabinet 打包文件(.cab)以及目录文件(.cat)等进行数字签名,确保这些文件的完整性和来源可靠性,防止文件被篡改.
  • 增强软件安全性和可信度:通过数字签名,用户在下载和运行软件时,可以验证软件是否来自合法的开发者,以及软件在传输过程中是否被修改过,从而提高软件的安全性和可信度,减少用户对未知来源软件的担忧.
  • 提供时间戳服务:在签名过程中可添加时间戳,即使代码签名证书过期,只要软件是在证书有效期内签名的,时间戳服务仍能保证该软件的签名在一定时间内继续有效,用户依旧可以放心下载和使用,无需开发者重新对软件进行签名.

生成数字签名证书

生成证书:可以使用 New-SelfSignedCertificate 命令生成自签名证书,例如

New-SelfSignedCertificate -Type Custom -Subject "CN=YourCompanyName" -KeyUsage DigitalSignature -FriendlyName "YourFriendlyName" -CertStoreLocation "Cert:\CurrentUser\My" -TextExtension @("2.5.29.37={text}1.3.6.1.5.5.7.3.3", "2.5.29.19={text}") -NotAfter (Get-Date).AddYears(10)

其中CN为证书的主题名称,可根据实际情况修改,此命令会在当前用户的证书存储区生成一个自签名证书.

导出证书:使用 Export-PfxCertificate 命令将证书导出为pfx格式,如

Export-PfxCertificate -cert "Cert:\CurrentUser\My\YourCertificateThumbprint" -FilePath yourcertificate.pfx -Password (ConvertTo-SecureString -String "YourPassword" -Force -AsPlainText)

注意将YourCertificateThumbprint替换为实际的证书指纹,YourPassword替换为设置的密码.

使用 Signtool 签名:找到Signtool.exe工具的路径,一般在 C:\Program Files (x86)\Microsoft SDKs\ClickOnce\SignTool\signtool.exe,打开命令提示符,进入该目录,然后执行签名命令,如

C:\Program Files (x86)\Microsoft SDKs\ClickOnce\SignTool\signtool.exe" sign /f yourcertificate.pfx /p YourPassword /t http://timestamp.digicert.com /v "YourSoftware.exe"

将yourcertificate.pfx、YourPassword和YourSoftware.exe分别替换为实际的证书文件、密码和要签名的软件文件名.

使用 Signcode.exe 工具

制作根证书:在命令提示符中输入

makecert -sv myroot.pvk -ss mysubjectname -n CN=mycompany -r myroot.cer

其中-sv指定私钥文件名,-ss指定主题的证书存储名称,-n指定证书颁发对象,-r指定证书存储位置,回车后设置密码并记住.

制作子证书:输入

makecert -sv test.pvk -iv myroot.pvk -n CN=mycompany -$ commercial -ic myroot.cer test.cer

同样设置并记住密码,此命令中的参数含义与制作根证书类似,只是多了-iv指定根证书的私钥文件和-$指定授权范围.

生成 spc 发行者证书:可选操作,输入

cert2spc test.cer test.spc

可在相应目录下生成test.spc文件.

使用 Signcode 签名:运行 signcode.exe,按照向导依次选择需要签名的文件、证书类型为 “自定义”,然后从文件中选择子证书和私钥文件,并输入相应密码,还可根据需要设置哈希算法、其他证书、数据描述以及时间戳等选项,最后完成数字签名.

注意事项

证书的安全性:制作和使用数字证书时,需妥善保管私钥文件和密码,防止证书被盗用或泄露,以免造成软件被恶意签名或其他安全问题.

准确填写描述信息:填写功能描述和 Web 位置时,必须准确无误且指向证书签名单位网站上有关此软件的介绍页面,否则可能导致证书被吊销.

简介下这个 .net 开发的客服系统

可全天候 7 × 24 小时挂机运行,不掉线不丢消息,欢迎实测。

https://kf.shengxunwei.com/

希望能够打造:
开放、开源、共享。努力打造 .net 社区的一款优秀开源产品。

钟意的话请给个赞支持一下吧,谢谢~

序言:重新训练人工智能大型模型是一项复杂且高成本的任务,尤其对于当前的LLM(大型语言模型)来说,全球99.99%的企业难以承担。这是因为模型训练需要巨大的资源投入、复杂的技术流程以及大量的人力支持。因此,无论在科学研究还是实际应用中,人们通常依赖开源的预训练模型及其已经学习到的各种特征信息,就像使用开源的Linux一样。本节将讲解如何利用这些预训练模型中的“嵌入”信息来解决实际问题。

使用预训练嵌入与RNN

在之前的所有示例中,我们收集了训练集中要使用的完整单词集,然后用它们训练了嵌入。这些嵌入最初是聚合在一起的,然后输入到密集网络中,而在最近的章节中,我们探讨了如何使用RNN来改进结果。在此过程中,我们被限制在数据集中已经存在的单词,以及如何使用该数据集中的标签来学习它们的嵌入。回想一下在前面有一章,我们讨论了迁移学习。如果,您可以不自己学习嵌入,而是使用已经预先学习的嵌入,研究人员已经完成了将单词转化为向量的艰苦工作,并且这些向量是经过验证的呢?其中一个例子是Stanford大学的Jeffrey Pennington、Richard Socher和Christopher Manning开发的GloVe(Global Vectors for Word Representation)模型。

在这种情况下,研究人员分享了他们为各种数据集预训练的单词向量:

• 一个包含60亿个标记、40万个单词的词汇集,维度有50、100、200和300,单词来自维基百科和Gigaword

• 一个包含420亿个标记、190万个单词的词汇集,维度为300,来自通用爬虫

• 一个包含8400亿个标记、220万个单词的词汇集,维度为300,来自通用爬虫

• 一个包含270亿个标记、120万个单词的词汇集,维度为25、50、100和200,来自对20亿条推文的Twitter爬虫

考虑到这些向量已经预训练,我们可以轻松地在TensorFlow代码中重复使用它们,而不必从头开始学习。首先,我们需要下载GloVe数据。这里选择使用Twitter数据集,包含270亿个标记和120万个单词的词汇集。下载的是一个包含25、50、100和200维度的归档文件。

为了让整个过程稍微方便一些,我已经托管了25维版本,您可以像这样将其下载到Colab笔记本中:

!wget --no-check-certificate \

https://storage.googleapis.com/laurencemoroney-blog.appspot.com/glove.twitter.27B.25d.zip
\

-O /tmp/glove.zip

这是一个ZIP文件,您可以像这样解压缩,得到一个名为glove.twitter.27B.25d.txt的文件:

解压GloVe嵌入

import os

import zipfile

local_zip = '/tmp/glove.zip'

zip_ref = zipfile.ZipFile(local_zip, 'r')

zip_ref.extractall('/tmp/glove')

zip_ref.close()

文件中的每一行都是一个单词,后面跟着为其学习到的维度系数。最简单的使用方式是创建一个字典,其中键是单词,值是嵌入。您可以这样设置这个字典:

glove_embeddings = dict()

f = open('/tmp/glove/glove.twitter.27B.25d.txt')

for line in f:

values = line.split()

word = values[0]

coefs = np.asarray(values[1:], dtype='float32')

glove_embeddings[word] = coefs

f.close()

此时,您可以简单地通过使用单词作为键来查找任何单词的系数集。例如,要查看“frog”的嵌入,您可以使用:

glove_embeddings['frog']

有了这个资源,您可以像以前一样使用分词器获取语料库的单词索引——但现在,您可以创建一个新的矩阵,我称之为嵌入矩阵。这个矩阵将使用GloVe集中的嵌入(从glove_embeddings获取)作为其值。因此,如果您检查数据集中单词索引中的单词,如下所示:

{'
': 1, 'new': 2, … 'not': 5, 'just': 6, 'will': 7}

那么嵌入矩阵的第一行应该是GloVe中“
”的系数,接下来的行是“new”的系数,依此类推。

您可以使用以下代码创建该矩阵:

embedding_matrix = np.zeros((vocab_size, embedding_dim))

for word, index in tokenizer.word_index.items():

if index > vocab_size - 1:

break

else:

embedding_vector = glove_embeddings.get(word)

if embedding_vector is not None:

embedding_matrix[index] = embedding_vector

这只是创建了一个矩阵,矩阵的维度是您所需的词汇大小和嵌入维度。然后,对于分词器的每个词汇索引项,您会查找GloVe中的系数(从glove_embeddings中获取),并将这些值添加到矩阵中。

接着,您需要修改嵌入层,使用预训练的嵌入,通过设置weights参数,并指定不希望该层被训练,通过设置trainable=False:

model = tf.keras.Sequential([

tf.keras.layers.Embedding(vocab_size, embedding_dim,

weights=[embedding_matrix], trainable=False),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim, return_sequences=True)),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim)),

tf.keras.layers.Dense(24, activation='relu'),

tf.keras.layers.Dense(1, activation='sigmoid')

])

现在,您可以像之前一样进行训练。然而,您需要考虑您的词汇大小。在上一章中,您为了避免过拟合,做了一些优化,目的是防止嵌入过多地学习低频单词;您通过使用更小的词汇表、仅包含常用单词来避免过拟合。在这种情况下,由于单词嵌入已经通过GloVe为您学习过,您可以扩展词汇表——但扩展多少呢?

首先要探索的是,您的语料库中有多少单词实际上在GloVe集中。GloVe有120万个单词,但不能保证它包含您的所有单词。所以,这里有一些代码,可以快速对比,让您探索您的词汇表应该多大。

首先,整理数据。创建一个包含Xs和Ys的列表,其中X是词汇索引,Y=1表示该单词在嵌入中,0则表示不在。此外,您可以创建一个累计集,在每个时间步计算单词的比例。例如,索引为0的单词“OOV”不在GloVe中,所以它的累计Y值为0。下一个索引的单词“new”在GloVe中,所以它的累计Y值为0.5(即,到目前为止看到的单词中有一半在GloVe中),然后您会继续这样计算整个数据集:

xs = []

ys = []

cumulative_x = []

cumulative_y = []

total_y = 0

for word, index in tokenizer.word_index.items():

xs.append(index)

cumulative_x.append(index)

if glove_embeddings.get(word) is not None:

total_y = total_y + 1

ys.append(1)

else:

ys.append(0)

cumulative_y.append(total_y / index)

然后,您可以使用以下代码绘制Xs与Ys的关系图:

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(12, 2))

ax.spines['top'].set_visible(False)

plt.margins(x=0, y=None, tight=True)

plt.axis([13000, 14000, 0, 1])

plt.fill(ys)

这将给您一个单词频率图,看起来像图7-17。

                                                      图7-17. 单词频率图

如图表所示,密度在10,000到15,000之间发生变化。这让您直观地看到,大约在13,000标记的位置,未在GloVe嵌入中的单词的频率开始超过那些已经在GloVe嵌入中的单词。

如果您再绘制累计的cumulative_x与cumulative_y的关系,您将能更好地理解这个变化。以下是代码:

import matplotlib.pyplot as plt

plt.plot(cumulative_x, cumulative_y)

plt.axis([0, 25000, .915, .985])

您可以看到图7-18中的结果。


图7-18. 绘制单词索引频率与GloVe的关系

现在,您可以调整plt.axis中的参数,放大查看拐点,看看未出现在GloVe中的单词是如何开始超过那些在GloVe中的单词的。这是设置词汇大小的一个不错起点。

使用这种方法,我选择了一个词汇大小为13,200(而不是之前为了避免过拟合而使用的2,000),并使用了以下模型架构,其中embedding_dim是25,因为我使用的是GloVe集:

model = tf.keras.Sequential([

tf.keras.layers.Embedding(vocab_size, embedding_dim,

weights=[embedding_matrix], trainable=False),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim, return_sequences=True)),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim)),

tf.keras.layers.Dense(24, activation='relu'),

tf.keras.layers.Dense(1, activation='sigmoid')

])

然后,使用Adam优化器:

adam = tf.keras.optimizers.Adam(learning_rate=0.00001, beta_1=0.9, beta_2=0.999, amsgrad=False)

model.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy'])

训练30个epoch后,得到了很好的结果。准确率如图7-19所示。验证准确率与训练准确率非常接近,表明我们不再过拟合。


图7-19. 使用GloVe嵌入的堆叠LSTM准确率

这一点通过损失曲线得到进一步验证,如图7-20所示。验证损失不再发散,这表明尽管我们的准确率只有大约73%,我们可以有信心认为模型的准确性达到了这个程度。

                                                      图7-20. 使用GloVe嵌入的堆叠LSTM损失

训练模型更长时间会得到非常相似的结果,并且表明,尽管大约在第80个epoch左右开始出现过拟合,模型仍然非常稳定。

准确率指标(图7-21)显示模型训练得很好。

损失指标(图7-22)显示大约在第80个epoch左右开始出现发散,但模型仍然拟合得很好。


图7-21. 使用GloVe的堆叠LSTM在150个epoch上的准确率


图7-22. 使用GloVe的堆叠LSTM在150个epoch上的损失

这告诉我们,这个模型是早停的好候选者,您只需要训练它75到80个epoch,就能得到最佳结果。

我用来自《洋葱报》的标题(《洋葱报》是讽刺性标题的来源,也是讽刺数据集的来源),与其他句子进行了测试,测试代码如下:

test_sentences = [

"It Was, For, Uh, Medical Reasons, Says Doctor To Boris Johnson, Explaining Why They Had To Give Him Haircut",

"It's a beautiful sunny day",

"I lived in Ireland, so in high school they made me learn to speak and write in Gaelic",

"Census Foot Soldiers Swarm Neighborhoods, Kick Down Doors To Tally Household Sizes"

]

这些标题的结果如下——记住,接近50%(0.5)的值被认为是中立的,接近0的是非讽刺的,接近1的是讽刺的:

[[0.8170955 ]

[0.08711044]

[0.61809343]

[0.8015281 ]]

来自《洋葱报》的第一句和第四句显示了80%以上的讽刺概率。关于天气的陈述则显得非常非讽刺(9%),而关于在爱尔兰上高中这句话被认为可能是讽刺的,但信心不高(62%)。

总结

本节中我们介绍了循环(递归)神经网络(RNN),它们在设计中使用面向序列的逻辑,可以帮助您理解句子的情感,不仅基于其中的单词,还基于它们出现的顺序。了解了基本的RNN如何工作,以及LSTM如何在此基础上改进,保留长期上下文。您使用这些技术改进了您一直在做的情感分析模型。接着,您研究了RNN的过拟合问题以及改善它们的技术,包括使用从预训练嵌入中进行迁移学习。在接下来的章节中,我们将使用前面全部所学内容探索如何预测单词,进而创建一个能够生成文本的模型,甚至为您写诗!

数据库优化方案

1.对查询进行优化,要
尽量避免全表扫描
,首先应考虑在 where 及 order by 涉及的列上建立索引。

2.应尽量避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索引而进行全表扫描,如:

select id from t where num is null

最好不要给数据库留NULL,尽可能的使用 NOT NULL填充数据库.

备注、描述、评论之类的可以设置为 NULL,其他的,最好不要使用NULL。

不要以为 NULL 不需要空间,比如:char(100) 型,在字段建立时,空间就固定了, 不管是否插入值(NULL也包含在内),都是占用 100个字符的空间的,如果是varchar这样的变长字段, null 不占用空间。

可以在num上设置默认值0,确保表中num列没有null值,然后这样查询:

select id from t where num = 0

3.应尽量避免在 where 子句中使用 != 或 <> 操作符,否则将引擎放弃使用索引而进行全表扫描。

4.应尽量避免在 where 子句中使用 or 来连接条件,如果一个字段有索引,一个字段没有索引,将导致引擎放弃使用索引而进行全表扫描,如:

select id from t where num=10 or Name = 'admin'
可以这样查询:

select id from t where num = 10 union allselect id from t where Name = 'admin'

5.in和 not in 也要慎用,否则会导致全表扫描,如:
select id from t where num in(1,2,3)
对于连续的数值,能用 between就不要用 in 了:

select id from t where num between 1 and 3
很多时候用 exists 代替 in 是一个好的选择:

select num from a where num in(select num from b)
用下面的语句替换:

select num from a where exists(select 1 from b where num=a.num)

6.下面的查询也将导致全表扫描:

select id from t where name like ‘%abc%
若要提高效率,可以考虑全文检索。

7.如果在 where 子句中使用参数,也会导致全表扫描。因为SQL只有在运行时才会解析局部变量,但优化程序不能将访问计划的选择推迟到运行时;它必须在编译时进行选择。然 而,如果在编译时建立访问计划,变量的值还是未知的,因而无法作为索引选择的输入项。如下面语句将进行全表扫描:

select id from t where num = @num
可以改为
强制查询使用索引

select id from t with(index(索引名)) where num = @num

应尽量避免在 where子句中对字段进行表达式操作,这将导致引擎放弃使用索引而进行全表扫描。如:

select id from t where num/2 = 100
应改为:

select id from t wherenum = 100*2

9.应尽量避免在where子句中对字段进行函数操作,这将导致引擎放弃使用索引而进行全表扫描。如:

select id from t where substring(name,1,3) = ’abc’       -–name以abc开头的id
select id from t where datediff(day,createdate,’2005-11-30′) = 0    -–‘2005-11-30’    --生成的id

应改为:

select id from t where name like 'abc%'
select id from t where createdate >= '2005-11-30' and createdate < '2005-12-1'

10.不要在 where 子句中的“=”左边进行函数、算术运算或其他表达式运算,否则系统将可能无法正确使用索引。
11.在使用索引字段作为条件时,如果该索引是复合索引,那么必须使用到该索引中的第一个字段作为条件时才能保证系统使用该索引,否则该索引将不会被使用,并且应尽可能的让字段顺序与索引顺序相一致。

12.不要写一些没有意义的查询,如需要生成一个空表结构:

select col1,col2 into #t from t where 1=0
这类代码不会返回任何结果集,但是会消耗系统资源的,应改成这样:

create table #t(…)

13.Update 语句,如果只更改1、2个字段,
不要Update全部字段
,否则频繁调用会引起明显的性能消耗,同时带来大量日志。

14.对于多张大数据量(这里几百条就算大了)的表JOIN,要先分页再JOIN,否则逻辑读会很高,性能很差。

15.select count(*) from table;这样
不带任何条件的count会引起全表扫描
,并且没有任何业务意义,是一定要杜绝的。

16.
索引并不是越多越好
,索引固然可以提高相应的 select 的效率,但同时也降低了 insert 及 update 的效率,因为 insert 或 update 时有可能会重建索引,所以怎样建索引需要慎重考虑,视具体情况而定。一个表的索引数最好不要超过6个,若太多则应考虑一些不常使用到的列上建的索引是否有 必要。

17.
应尽可能的避免更新 clustered 索引数据列
,因为 clustered 索引数据列的顺序就是表记录的物理存储顺序,一旦该列值改变将导致整个表记录的顺序的调整,会耗费相当大的资源。若应用系统需要频繁更新 clustered 索引数据列,那么需要考虑是否应将该索引建为 clustered 索引。

18.
尽量使用数字型字段
,若只含数值信息的字段尽量不要设计为字符型,这会降低查询和连接的性能,并会增加存储开销。这是因为引擎在处理查询和连 接时会逐个比较字符串中每一个字符,而对于数字型而言只需要比较一次就够了。

19.尽可能的使用
varchar/nvarchar代替 char/nchar
,因为首先变长字段存储空间小,可以节省存储空间,其次对于查询来说,在一个相对较小的字段内搜索效率显然要高些。

20.任何地方都不要使用
select * from t
,用具体的字段列表代替“*”,不要返回用不到的任何字段。

21.
尽量使用表变量来代替临时表
。如果表变量包含大量数据,请注意索引非常有限(只有主键索引)。

  1. 避免频繁创建和删除临时表,以减少系统表资源的消耗。
    临时表并不是不可使用,适当地使用它们可以使某些例程更有效,例如,当需要重复引用大型表或常用表中的某个数据集时。但是,对于一次性事件, 最好使用导出表。

23.在新建临时表时,如果一次性插入数据量很大,那么可以使用
select into 代替 create table
,避免造成大量 log ,以提高速度;如果数据量不大,为了缓和系统表的资源,应先create table,然后insert。

24.如果使用到了临时表,在存储过程的最后务必将所有的临时表显式删除,先 truncate table ,然后 drop table ,这样可以避免系统表的较长时间锁定。

25.尽量避免使用游标,因为游标的效率较差,如果游标操作的数据超过1万行,那么就应该考虑改写。

26.使用基于游标的方法或临时表方法之前,应先寻找基于集的解决方案来解决问题,基于集的方法通常更有效。

27.与临时表一样,游标并不是不可使用。对小型数据集使用 FAST_FORWARD 游标通常要优于其他逐行处理方法,尤其是在必须引用几个表才能获得所需的数据时。在结果集中包括“合计”的例程通常要比使用游标执行的速度快。如果开发时 间允许,基于游标的方法和基于集的方法都可以尝试一下,看哪一种方法的效果更好。

28.在所有的存储过程和触发器的开始处设置 SET NOCOUNT ON ,在结束时设置 SET NOCOUNT OFF 。无需在执行存储过程和触发器的每个语句后向客户端发送 DONE_IN_PROC 消息。

29.尽量避免大事务操作,提高系统并发能力。

30.尽量避免向客户端返回大数据量,若数据量过大,应该考虑相应需求是否合理。

本地缓存带来的挑战

分布式缓存相比于本地缓存,在实现层面需要关注的点有哪些不同。梳理如下:

维度 本地缓存 集中式缓存
缓存量 受限于单机内存大小,存储数据有限 需要提供给分布式系统里面所有节点共同使用,对于大型系统而言,对集中式缓存的容量诉求非常的大,远超单机内存的容量大小。
可靠性 影响有限,只有本进程使用,不会影响其他进程的可靠性。 作为整个系统扛压屏障,系统内所有节点共同依赖的通用服务,一旦集中式缓存出问题,会影响与其对接的所有业务节点,对系统的影响是
致命性
的。
承压性 承载单机节点的压力,请求量有限 承载整个分布式集群所有节点的流量,系统内业务分布式节点部署数量越多、业务体量越大,会导致集中缓存要承载的压力就越大,甚至是上不封顶的。

从上述几个维度的对比可以发现,同样是缓存,但
集中式缓存
所承担的使命是完全不一样的,业务对集中式缓存的
存储容量

可靠性

承压性
等方面的诉求也是天壤之别,不可等同视之。以
Redis
为例:

  • 如何打破redis缓存容量受限于机器单机内存大小的问题?
  • 如何使得redis能够扛住多方过来的请求压力?
  • 如何保证redis不会成为单点故障源?

其实答案很简单,加机器!通过多台机器的叠加使用,达到比单机更优的效果 —— 现在业务系统的集群化部署,也都是采用的这个思路。Redis的分布式之路亦是如此,但相比于常规的业务系统分布式集群化构建更加复杂:

  1. 很多业务实现集群化部署会很简单,因为每个业务进程节点都是
    无状态
    的,只需要部署下然后通过负载均衡的方式对外提供请求应答即可。
  2. Redis作为一个集中式缓存数据库,它是
    有状态
    的,不仅需要将进程分别部署在多个节点上,还需要将数据也分散存储在各个节点上,同时还得保证整个Redis集群对外是一个统一整体。

所以对于一个集中式缓存的分布式能力构建,必须要额外提供一些机制,来保障数据在各个节点上的安全与
一致性
,还需要将分散在各个节点上的数据都组成一个逻辑上的整体。

主从复制简介

主从复制是什么

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master),后者称为从节点(slave);数据的复制是单向的,只能由主节点到从节点,而对于redis来说,
一主两从
是比较常见的搭配。

主从模式按照读写分离的策略来提升整体的请求处理能力:

  1. 主节点(Master)同时对外提供
    读和写
    操作
  2. 从节点(Slave)通过
    replicate
    同步的方式,从主节点复制数据,保持自身数据与主节点一致
  3. 从节点只能对外提供
    读操作

当然,对于读多写少类的操作,为了提升整体读请求的处理能力,可以采用
一主多从
的方式。所有的从节点都从主节点进行数据同步,这样会导致主节点的同步处理压力过大而成为瓶颈。为了解决这个问题,redis还支持了从slave节点分发的能力,也就是从服务器也可以有自己的从服务器, 多个从服务器之间可以构成一个主从链。这样可以分摊主服务器压力。

主从复制的作用

  • 数据备份:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  • 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复。
  • 读写分离:由主节点提供写服务,由从节点提供读服务,提高Redis服务器的并发量。

主从复制流程

全量复制

在第一次同步时会进行全量复制(但并非只有第一次同步时全量复制,其他情况看后文)

第一次同步时流程:

第一阶段:建立链接、协商同步

从服务器向主服务器发送PSYNC ? -1 命令,主动请求进行完整重同步

psync 命令包含两个参数,分别是主服务器的 runID 和复制进度 offset。

  • runID,每个 Redis 服务器在启动时都会自动生产一个随机的 ID 来唯一标识自己。当从服务器和主服务器第一次同步时,因为不知道主服务器的 run ID,所以将其设置为 "?"。
  • offset,表示复制的进度,(也叫复制偏移量),主要为增量复制服务,这里因为是全量复制,所以使用-1表示。

主服务器收到 psync 命令后,会向从服务器发送FULLRESYNC响应命令并带上两个参数:主服务器的 runID 和主服务器目前的复制进度 offset。从服务器收到响应后,会记录这两个值。

FULLRESYNC
响应命令的意图是采用全量复制的方式,也就是主服务器会把所有的数据都同步给从服务器。

第二阶段:主服务器同步数据给从服务器

接着,主服务器会执行 bgsave 命令来生成 RDB 文件,然后把文件发送给从服务器(数据持久化)。

从服务器收到 RDB 文件后,
会先清空当前的数据,然后载入 RDB 文件
。这是因为从服务器在通过 replicaof 命令开始和主服务器同步前,可能保存了其他数据。为了避免之前数据的影响,从服务器需要先把当前数据库清空。

这里有一点要注意,主服务器生成 RDB 这个过程是不会阻塞主线程的,因为 bgsave 命令是产生了一个子进程来做生成 RDB 文件的工作,是异步工作的,这样 Redis 依然可以正常处理命令。

就像RDB文件生成过程中Redis不停止提供服务一样,从服务器在接收并载入RDB文件的过程中,主服务器仍然可以写入数据,那怎么将这部分数据传给从服务器呢?

第三阶段:主服务器发送新写操作命令给从服务器

为了保证主从服务器的数据一致性,主服务器为每个连接进来的从服务器准备了一个replication buffer缓冲区,这段时间内写入的数据都会被存入这个replication buffer中,从服务器完成 RDB 的载入后,会回复一个确认消息给主服务器。主服务器就将replication buffer中的数据推送过去。

长连接传播

主从服务器在完成第一次同步后,双方之间就会维护一个 TCP 连接,这个TCP连接是长连接

之后就会基于这个
长连接
进行命令传播。通过这种方式来保证第一次同步后的主从服务器的数据一致性。

增量复制

实际上,生成RDB文件是比较耗费资源的,同时,主服务器传输 RDB 文件给从服务器,这个操作会耗费主从服务器大量的网络资源,并对主服务器响应时延产生影响。而对从服务器而言,载入 RDB 文件期间,会阻塞其他命令请求,这也会导致响应效率的降低。并且,当从服务器断开后重新连接,主从数据不一致,在数据少量不一致的情况下,也不需要全量复制。因此,就提供了增量复制

复制偏移量(replication offset)

主服务器和从服务器会分别维护一个复制偏移量。如果主从服务器的复制偏移量相同,则说明二者的数据库状态一致;反之,则说明二者的数据库状态不一致,此时从服务器需要使用增量复制来同步缺失的这一部分数据。

复制积压缓冲区(replication backlog)

主服务器的写命令,除了传给从服务器后,还会写入replication backlog(全局唯一),这是一个固定长度的先进先出(FIFO)队列,默认大小为 1MB。其在内存中是一个环形结构。

  1. 主服务器按照顺时针方向写命令,主服务器最新写入的位置即为上文提到的主服务器的偏移量,这里叫master offset。
  2. 假设从服务器在set key2 2后断开连接,也就是上图中slave offset的位置,当它重连时,再次给主服务器发送psync指令时,会带上自己的offset(注意和全量复制的区别,全量复制时offset设置为-1,此时是从服务器真实的offset值)。
  3. 接着,主服务器发现从服务器的偏移量与自己不一致,需要进行增量复制。此时主服务器会计算出master offset与slave offset之间的指令,并发送给该为从服务器准备的replication buffer中,进而发送给从服务器。
  4. 从服务器进行写入后便又恢复到和主服务器一致的状态。

断开重连并不一定总是增量复制

网络断开后,当从服务器重新连上主服务器时,从服务器会通过 psync 命令将自己的复制偏移量 slave_repl_offset 发送给主服务器,主服务器根据自己的 master_repl_offset 和 slave_repl_offset 之间的差距,然后来决定对从服务器执行哪种同步操作:

  1. 整个replication backlog是个环形结构,也就是说最新的写命令会将最老的写命令覆盖。换句话说,如果从服务器断开时间太久,环形缓冲区被主服务器的写命令覆盖了,那么从服务器连上主服务器后只能通过
    全量复制
    来获取数据了。所以replication backlog配置要尽量大一些,可以降低主从断开后全量复制的概率。
    • 如果判断出从服务器要读取的数据还在 repl_backlog_buffer 缓冲区里,那么主服务器将采用增量同步的方式;
    • 相反,如果判断出从服务器要读取的数据已经不存在 repl_backlog_buffer 缓冲区里,那么主服务器将采用全量同步的方式。
  2. 上文中有提到每个实例有自己的RunID,这个值在服务器启动时自动生成,由 40 个随机的十六进制字符组成。从服务器断开重连时会将之前主服务器的RunID一起发送过去(这里注意和第一次连接的区别,第一次连接时发送的RunID是“?”),主服务器会判断这个RunID是否为自己,如果不是(比如出现脑裂,出现两个主服务器),则会和全量复制时一样返回FULLRESYNC响应命令,告知从服务器需要进行全量复制。

总结

主从服务器第一次同步的时候,就是采用全量复制。

第一次同步完成后,主从服务器都会维护着一个长连接,主服务器在接收到写操作命令后,就会通过这个连接将写命令传播给从服务器,来保证主从服务器的数据一致性。

如果遇到网络断开,就需要进行增量复制(当然不一定是增量复制,具体还需要看replication backlog的大小,以及对应的主服务器RunID)。

面试题专栏

Java面试题专栏
已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;

那么可以私信我,我会尽我所能帮助你。

书接上回,我们今天继续讲解实现对象集合与DataTable的相互转换。

01
、把表格转换为对象集合

该方法是将表格的列名称作为类的属性名,将表格的行数据转为类的对象。从而实现表格转换为对象集合。同时我们约定如果类的属性设置了DescriptionAttribute特性,则特性值和表格列名一一对应,如果没有设置特性则取属性名称和列名一一对应。

同时我们需要约束类只能为结构体或类,而不能是枚举、基础类型、以及集合类型、委托、接口等。

类的类型校验成功后,我们还需要校验表格是否能转换为对象,即判断表格列名和类的属性名称或者Description特性值是否存在一致,如果没有一个表格列名和类的属性能对应上,则报表格列名无法映射至对象属性,无法完成转换异常。

当这些校验成功后,开始循环处理表格行记录,把每一行都转换为一个对象。

我们可以通过反射动态实例化对象,再通过反射对对象的属性动态赋值。因为我们的对象即支持类也支持结构体,因此这里面就会遇到一个技术问题,正常的property.SetValue方法并没有办法给结构体动态赋值。

这是因为结构体是值类型,而property.SetValue方法的参数都是object,因此这里面就涉及到装箱拆箱,因此SetValue是设置了装箱以后的对象,而并不能改变原对象。

而解决办法就是先把结构体赋值给object变量,然后对object变量进行SetValue设置值,最后再把object变量转为结构体。

下面我们一起看看具体实现代码:

//把表格转换为对象集合
//如果设置DescriptionAttribute,则将特性值作为表格的列名称
//否则将属性名作为表格的列名称
public static IEnumerable<T> ToModels<T>(DataTable dataTable)
{
    //T必须是结构体或类,并且不能是集合类型
    AssertTypeValid<T>();
    if (0 == dataTable.Rows.Count)
    {
        return [];
    }
    //获取T所有可写入属性
    var properties = typeof(T).GetProperties().Where(u => u.CanWrite);
    //校验表格是否能转换为对象
    var isCanParse = IsCanMapDataTableToModel(dataTable, properties);
    if (!isCanParse)
    {
        throw new NotSupportedException("The column name of the table cannot be mapped to an object property, and the conversion cannot be completed.");
    }
    var models = new List<T>();
    foreach (DataRow dr in dataTable.Rows)
    {
        //通过反射实例化T
        var model = Activator.CreateInstance<T>();
        //把行数据映射到对象上
        if (typeof(T).IsClass)
        {
            //处理T为类的情况
            MapRowToModel<T>(dr, model, properties);
        }
        else
        {
            //处理T为结构体的情况
            object boxed = model!;
            MapRowToModel<object>(dr, boxed, properties);
            model = (T)boxed;
        }
        models.Add(model);
    }
    return models;
}
//校验表格是否能转换为对象
private static bool IsCanMapDataTableToModel(DataTable dataTable, IEnumerable<PropertyInfo> properties)
{
    var isCanParse = false;
    foreach (var property in properties)
    {
        //根据属性获取列名
        var columnName = GetColumnName(property);
        if (!dataTable.Columns.Contains(columnName))
        {
            continue;
        }
        isCanParse = true;
    }
    return isCanParse;
}
//把行数据映射到对象上
private static void MapRowToModel<T>(DataRow dataRow, T model, IEnumerable<PropertyInfo> properties)
{
    foreach (var property in properties)
    {
        //根据属性获取列名
        var columnName = GetColumnName(property);
        if (!dataRow.Table.Columns.Contains(columnName))
        {
            continue;
        }
        //获取单元格值
        var value = dataRow[columnName];
        if (value != DBNull.Value)
        {
            //给对象属性赋值
            property.SetValue(model, Convert.ChangeType(value, property.PropertyType));
        }
    }
}

我们做个简单的单元测试:

[Fact]
public void ToModels()
{
    //验证正常情况
    var table = TableHelper.Create<Student<double>>();
    var row1 = table.NewRow();
    row1[0] = "Id-11";
    row1[1] = "名称-12";
    row1[2] = 33.13;
    table.Rows.Add(row1);
    var row2 = table.NewRow();
    row2[0] = "Id-21";
    row2[1] = "名称-22";
    row2[2] = 33.23;
    table.Rows.Add(row2);
    var students = TableHelper.ToModels<Student<double>>(table);
    Assert.Equal(2, students.Count());
    Assert.Equal("Id-11", students.ElementAt(0).Id);
    Assert.Equal("名称-12", students.ElementAt(0).Name);
    Assert.Equal(33.13, students.ElementAt(0).Age);
    Assert.Equal("Id-21", students.ElementAt(1).Id);
    Assert.Equal("名称-22", students.ElementAt(1).Name);
    Assert.Equal(33.23, students.ElementAt(1).Age);
}

02
、把对象集合转换为表格

该方法首先会调用根据对象创建表格方法得到一个空白表格,然后通过反射获取对象的所有属性,然后循环处理对象集合,把一个对象的所有属性值一个一个添加行的所有列中,这样就完成了一个对象映射成表的一行记录,直至所有对象转换完成即可得到一个表格。

代码如下:

//把对象集合转为表格
//如果设置DescriptionAttribute,则将特性值作为表格的列名称
//否则将属性名作为表格的列名称
public static DataTable ToDataTable<T>(IEnumerable<T> models, string? tableName = null)
{
    //创建表格
    var dataTable = Create<T>(tableName);
    if (models == null || !models.Any())
    {
        return dataTable;
    }
    //获取所有属性
    var properties = typeof(T).GetProperties().Where(u => u.CanRead);
    foreach (var model in models)
    {
        //创建行
        var dataRow = dataTable.NewRow();
        foreach (var property in properties)
        {
            //根据属性获取列名
            var columnName = GetColumnName(property);
            //填充行数据
            dataRow[columnName] = property.GetValue(model);
        }
        dataTable.Rows.Add(dataRow);
    }
    return dataTable;
}

进行如下单元测试:

[Fact]
public void ToDataTable()
{
    //验证正常情况
    var students = new List<Student<double>>();
    var student1 = new Student<double>
    {
        Id = "Id-11",
        Name = "名称-12",
        Age = 33.13
    };
    students.Add(student1);
    var student2 = new Student<double>
    {
        Id = "Id-21",
        Name = "名称-22",
        Age = 33.23
    };
    students.Add(student2);
    var table = TableHelper.ToDataTable<Student<double>>(students, "学生表");
    Assert.Equal("学生表", table.TableName);
    Assert.Equal(2, table.Rows.Count);
    Assert.Equal("Id-11", table.Rows[0][0]);
    Assert.Equal("名称-12", table.Rows[0][1]);
    Assert.Equal("33.13", table.Rows[0][2].ToString());
    Assert.Equal("Id-21", table.Rows[1][0]);
    Assert.Equal("名称-22", table.Rows[1][1]);
    Assert.Equal("33.23", table.Rows[1][2].ToString());
}

03
、把一维数组作为一列转换为表格

该方法比较简单就是把一个一维数组作为一列数据创建一张表格,同时可以选择是否填写表名和列名。具体代码如下:

//把一维数组作为一列转换为表格
public static DataTable ToDataTableWithColumnArray<TColumn>(TColumn[] array, string? tableName = null, string? columnName = null)
{
    var dataTable = new DataTable(tableName);
    //创建列
    dataTable.Columns.Add(columnName, typeof(TColumn));
    //添加行数据
    foreach (var item in array)
    {
        var dataRow = dataTable.NewRow();
        dataRow[0] = item;
        dataTable.Rows.Add(dataRow);
    }
    return dataTable;
}

单元测试如下:

[Fact]
public void ToDataTableWithColumnArray()
{
    //验证正常情况
    var columns = new string[] { "A", "B" };
    var table = TableHelper.ToDataTableWithColumnArray<string>(columns, "学生表");
    Assert.Equal("学生表", table.TableName);
    Assert.Equal("Column1", table.Columns[0].ColumnName);
    Assert.Equal(2, table.Rows.Count);
    Assert.Equal("A", table.Rows[0][0]);
    Assert.Equal("B", table.Rows[1][0]);
    table = TableHelper.ToDataTableWithColumnArray<string>(columns, "学生表", "列");
    Assert.Equal("列", table.Columns[0].ColumnName);
}

04
、把一维数组作为一行转换为表格

该方法也比较简单就是把一个一维数组作为一行数据创建一张表格,同时可以选择是否填写表名。具体代码如下:

//把一维数组作为一行转换为表格
public static DataTable ToDataTableWithRowArray<TRow>(TRow[] array, string? tableName = null)
{
    var dataTable = new DataTable(tableName);
    //创建列
    for (var i = 0; i < array.Length; i++)
    {
        dataTable.Columns.Add(null, typeof(TRow));
    }
    //添加行数据
    var dataRow = dataTable.NewRow();
    for (var i = 0; i < array.Length; i++)
    {
        dataRow[i] = array[i];
    }
    dataTable.Rows.Add(dataRow);
    return dataTable;
}

05
、行列转置

该方法是指把DataTable中的行和列互换,就是行的数据变成列,列的数据变成行。如下图示例:

这个示例转换,第一个表格中列名并没有作为数据进行转换,因此我们会提供一个可选项参数用来指示要不要把类目作为数据进行转换。

整个方法实现逻辑也很简单,就是以原表格行数为列数创建一个新表格,然后在循环处理原表格列,并把原表格一列数据填充至新表格的一行数据中,直至原表格所有列处理完成则完成行列转置。具体代码如下:

//行列转置
public static DataTable Transpose(DataTable dataTable, bool isColumnNameAsData = true)
{
    var transposed = new DataTable(dataTable.TableName);
    //如果列名作为数据,则需要多加一列
    if (isColumnNameAsData)
    {
        transposed.Columns.Add();
    }
    //转置后,行数即为新的列数
    for (int i = 0; i < dataTable.Rows.Count; i++)
    {
        transposed.Columns.Add();
    }
    //以列为单位,一次处理一列数据
    for (var column = 0; column < dataTable.Columns.Count; column++)
    {
        //创建新行
        var newRow = transposed.NewRow();
        //如果列名作为数据,则先把列名加入第一列
        if (isColumnNameAsData)
        {
            newRow[0] = dataTable.Columns[column].ColumnName;
        }
        //把一列数据转为一行数据
        for (var row = 0; row < dataTable.Rows.Count; row++)
        {
            //如果列名作为数据,则行数据从第二列开始填充
            var rowIndex = isColumnNameAsData ? row + 1 : row;
            newRow[rowIndex] = dataTable.Rows[row][column];
        }
        transposed.Rows.Add(newRow);
    }
    return transposed;
}

下面进行简单的单元测试:

[Fact]
public void Transpose_ColumnNameAsData()
{
    DataTable originalTable = new DataTable("测试");
    originalTable.Columns.Add("A", typeof(string));
    originalTable.Columns.Add("B", typeof(int));
    originalTable.Columns.Add("C", typeof(int));
    originalTable.Rows.Add("D", 1, 2);
    //列名作为数据的情况
    var table = TableHelper.Transpose(originalTable, true);
    Assert.Equal(originalTable.TableName, table.TableName);
    Assert.Equal("Column1", table.Columns[0].ColumnName);
    Assert.Equal("Column2", table.Columns[1].ColumnName);
    Assert.Equal(3, table.Rows.Count);
    Assert.Equal("A", table.Rows[0][0]);
    Assert.Equal("D", table.Rows[0][1]);
    Assert.Equal("B", table.Rows[1][0]);
    Assert.Equal("1", table.Rows[1][1].ToString());
    Assert.Equal("C", table.Rows[2][0]);
    Assert.Equal("2", table.Rows[2][1].ToString());
}


:测试方法代码以及示例源码都已经上传至代码库,有兴趣的可以看看。
https://gitee.com/hugogoos/Ideal