2023年3月

本文介绍在
Anaconda
环境下,安装
Python
中的一个高级地理空间数据分析库
whitebox
的方法。

首先,我们打开“
Anaconda Prompt (Anaconda)
”软件。

随后,将弹出如下所示的命令输入窗口。

在上述弹出的命令输入窗口中,输入以下代码:

conda install -c conda-forge whitebox

随后,系统将自动搜索
whitebox
这一模块,并准备安装。

在这里有一点需要注意的是:如果我们开启了网络代理软件,则可能会导致系统找不到
whitebox
这一模块元数据的下载地址,出现如下所示的错误提示。

针对这种情况,我们将网络代理软件关闭后,重新输入前述代码,即可解决问题。

待系统找到
whitebox
这一模块的元数据后,我们输入
y
即可开始下载、安装的过程。

稍等片刻;成功完成下载与安装后,会出现如下所示的界面。

为了验证我们
whitebox
模块的安装是否成功,我们可以在编译器中尝试加载这一模块;若发现可以成功加载,则说明
whitebox
模块安装无误。

此外,在第一次使用
whitebox
这个模块时,程序还将自动进行一些预处理操作,包括下载该模块的预编译二进制文件、下载测试数据等。

预处理操作完毕后,将自动开始执行接下来的其它代码。

至此,大功告成。

本文示例代码已上传至我的
Github
仓库
https://github.com/CNFeffery/dash-master

大家好我是费老师,就在昨晚,
Dash
框架发布了其2.9.0版本更新,在一众更新内容中,有两条新特性在我看来
尤为重要
,可以大幅度提升我们开发
Dash
应用的效率,下面我就将带大家一起了解它们的具体内容:

1 允许多个回调函数重复Output

在之前版本的
Dash
中,严格限制了不同的回调函数不可以对相同的
id.属性
目标进行输出,以下面的示例应用为例:

import dash
from dash import html
import feffery_antd_components as fac
from dash.dependencies import Input, Output

app = dash.Dash(__name__)

app.layout = html.Div(
    [
        fac.AntdSpace(
            [
                fac.AntdButton(
                    '按钮1',
                    id='button-demo1'
                ),
                fac.AntdButton(
                    '按钮2',
                    id='button-demo2'
                )
            ]
        ),
        fac.AntdParagraph(
            id='output-demo'
        )
    ],
    style={
        'padding': '50px 100px'
    }
)


@app.callback(
    Output('output-demo', 'children'),
    Input('button-demo1', 'nClicks'),
    prevent_initial_call=True
)
def trigger1(nClicks):

    return f'按钮1: {nClicks}'


@app.callback(
    Output('output-demo', 'children'),
    Input('button-demo2', 'nClicks'),
    prevent_initial_call=True
)
def trigger2(nClicks):

    return f'按钮2: {nClicks}'


if __name__ == '__main__':
    app.run(debug=True)

如果我们希望两个
AntdButton
分别点击后,可以通过两个不同的回调函数对同一
AntdPargraph
的内容进行输出,在之前的版本中默认会报下图所示的
Duplicate callback outputs
错误:

在之前的版本中遇到这种情况解决方式也有很多,常用的如将多个回调函数整合为一个并在回调函数中,再基于
dash.ctx.triggered_id
判断每次回调函数究竟是由哪个
Input
触发的,这在较复杂回调功能的编写中就不太方便了。

而从
Dash
2.9.0版本开始,为
Output()
引入了
bool
型新参数
allow_duplicate
,默认为
False
,当设置为
True
后,当前
Output
便可以允许通过多个回调函数共同输出,将上面的例子回调部分进行改造,对后续重复的
Output
设置
allow_duplicate=True

@app.callback(
    Output('output-demo', 'children', allow_duplicate=True),
    Input('button-demo2', 'nClicks'),
    prevent_initial_call=True
)
def trigger2(nClicks):

    return f'按钮2: {nClicks}'

就可以不受限制啦~

当然,虽然有了这个新特性帮助我们解除了不少限制,但是我的建议还是不要滥用,它不一定可以使得我们的代码更简洁,基于
dash.ctx.triggered_id
的分支判断在很多场景下还是更合适。

作为一个新的功能,
allow_duplicate
目前在常规的
服务端
回调函数中运作正常,但在
浏览器端
回调函数中暂时无法使用,静待后续
Dash
官方的更新。

2 新增Patch()操作模式

Dash
2.9.0版本中新增参数局部快捷更新操作
Patch()
,使得我们可以在回调函数中对目标属性进行局部更新,这样说起来还是比较抽象,我们举例说明:

假如我们的应用中要实现这样的交互逻辑:每点击一次
AntdButton
,就会在下方
AntdSpace
中新增一行文字内容,在
以前
的版本中,要实现这个功能,我们需要在回调函数中额外将目标
AntdSpace

children
属性作为
State
传入,从而在每次回调执行时,将新的一行内容追加到先前状态的
children
列表中,再进行输出:

import dash
import uuid
from dash import html
import feffery_antd_components as fac
from dash.dependencies import Input, Output, State

app = dash.Dash(__name__)

app.layout = html.Div(
    [
        fac.AntdButton(
            '新增一行',
            id='add-new-line'
        ),
        fac.AntdSpace(
            [],
            id='target-container',
            direction='vertical',
            style={
                'width': '100%'
            }
        )
    ],
    style={
        'padding': '50px 100px'
    }
)


@app.callback(
    Output('target-container', 'children'),
    Input('add-new-line', 'nClicks'),
    State('target-container', 'children'),
    prevent_initial_call=True
)
def add_new_line(nClicks, origin_children):

    return [
        *origin_children,
        str(uuid.uuid4())
    ]


if __name__ == '__main__':
    app.run(debug=True)

这样做的弊端很明显——我们每次更新都需要先取回目标属性的现有状态,这带来了多余的资源消耗,而有了
Patch()
模式,我们就可以将回调函数改写为下面的形式,实现相同的效果:

@app.callback(
    Output('target-container', 'children'),
    Input('add-new-line', 'nClicks'),
    prevent_initial_call=True
)
def add_new_line(nClicks):

    patch = dash.Patch()
    patch.append(str(uuid.uuid4()))

    return patch

相当于在回调函数中通过实例化
Patch
,创建了针对目标
Output
的远程代理对象,在回调函数中针对该代理对象的各种常用操作,都会在回调函数执行后落实到用户浏览器中的目标属性上,这听起来可能有些抽象,我用下面的例子展示了基于
Patch
可以实现的常用局部值操作(对应代码受篇幅限制,请在文章开头的
github
仓库中查看):


以上就是本文的全部内容,对
Dash
应用开发感兴趣的朋友,欢迎添加微信号
CNFeffery
,备注“dash学习”加入我的技术交流群,一起成长一起进步。

您可以使用以下 SQL 语句删除 MS SQL Server 表中重复的行:

WITH CTE AS (
  SELECT ROW_NUMBER() OVER(PARTITION BY column1, column2, ... columnN ORDER BY (SELECT 0)) RN
  FROM table_name
)
DELETE FROM CTE WHERE RN > 1;

您需要将
table_name
替换为要删除重复行的表名,并将
column1, column2, ... columnN
替换为用于检查重复的列名。该语句使用
ROW_NUMBER()
函数和
PARTITION BY
子句来标识重复的行,然后使用
DELETE
语句删除其中一个副本。

这样说有些抽象,下面举一个例子:

比如我有一个deadUrlRecord_copy1 表,存的数据如下格式。

img

这个表存在一个问题,url列有一部分是重复的。用group by语句可以查出来,有挺多重复的,那么,如何删除多余的数据,只保留一条呢?

img

这就要采用文章开头给出的语句了。

WITH cte AS (
    SELECT url,
           ROW_NUMBER() OVER (PARTITION BY url ORDER BY url) AS rn
    FROM deadUrlRecord_copy1
    WHERE status = 'NotFound'
)
DELETE FROM cte WHERE rn > 1;

乍一看一脸懵逼,但是执行发现竟然成功删除了重复数据,达到了预期效果,为什么呢?

这要解释下这一行代码:

ROW_NUMBER() OVER (PARTITION BY url ORDER BY url) AS rn  

这是一种 SQL 语法,用于对一个查询结果集的行进行编号,并且可以根据特定列来分组编号。

具体来说,
ROW_NUMBER()
是一个窗口函数,它会为查询结果集中每一行计算一个行号。而
OVER
子句则是指定如何定义窗口(window),也就是要给哪些行计算行号。在这个例子中,
PARTITION BY url
表示按照
url
这一列进行分组,也就是说对于每个不同的
url
分别计算行号;
ORDER BY url
则表示按照
url
这一列进行排序,这样同一个
url
中的行就会按照
url
的值依次排列。最后,
AS rn
则是给这个新的行号列起个名字,即
rn

例如,假设有如下表格:

id url
1 www.example.com
2 www.example.com
3 www.example.com/foo
4 www.example.com/bar
5 www.google.com

如果执行以下 SQL 查询:

SELECT id, url, ROW_NUMBER() OVER (PARTITION BY url ORDER BY url) AS rn FROM my_table;

则会得到以下结果:

id url rn
1 www.example.com 1
2 www.example.com 2
3 www.example.com/foo 1
4 www.example.com/bar 1
5 www.google.com 1

其中,同一个
url
中的行拥有相同的行号,同时这个行号是按照
url
的值进行排序的。


然后执行刚才那段代码的
片段
试一下,可能更好理解:

img

url不同的,行号都是1。相同的,会从1开始排序,所有就出现了2.

然后用 DELETE FROM cte WHERE rn > 1;  删除行号>1的数据,就成功把多余的数据删除了,非常巧妙。

一、概述

量子随机游走是一种基于量子力学的随机游走模型,其具有良好的随机性和不可预测性,因此被广泛应用于密码学中。基于量子随机游走的图像加密算法是一种新兴的加密算法,其基本思路是将明文图像转换为量子态,通过量子随机游走对量子态进行加密,最后将加密后的量子态转换为密文图像。

二、算法流程

  1. 将明文图像转换为量子态:首先将明文图像转换为二进制编码,然后将编码转换为对应的量子态。这可以通过一些常见的量子态制备方法来实现,例如 Hadamard 变换、相位旋转门等。

  2. 进行量子随机游走加密:将生成的量子态输入到哈密顿量中进行量子随机游走,以实现加密。其中哈密顿量的构造和具体实现方法可能有多种选择,例如可以利用 Grover 算法来进行哈密顿量的构造。

  3. 将加密后的量子态转换为密文图像:将加密后的量子态进行测量,然后将测量结果转换为对应的二进制编码,最后将编码还原为密文图像。

需要注意的是,量子随机游走过程的成功与否取决于构造的哈密顿量,而构造哈密顿量是该算法的关键。此外,加密和解密过程中需要保证密钥的安全性,否则会导致算法失效。

该算法具有很强的随机性和不可预测性,能够有效保护图像的安全性和隐私性。同时,由于量子随机游走具有高效的性质,因此该算法的加密效率也较高。但需要注意的是,该算法还处于研究阶段,其安全性和可靠性有待进一步验证和完善。

三、算法实现

  1. 图像像素置换的实现方式:可以通过使用置换矩阵的方式实现像素置换,也可以通过使用置换函数的方式实现像素置换。其中,置换矩阵可以通过随机生成一个置换矩阵来实现,置换函数可以通过使用一个随机的函数来实现。

  2. 图像像素值的量子态转换:需要将图像像素值转换成量子态。可以使用qiskit中的qubit来表示量子态,将图像像素值转换成qubit,然后通过量子门的操作,实现像素值的量子态转换。

  3. 量子随机游走的实现方式:可以通过使用量子随机游走的概率转移矩阵来实现,也可以通过使用量子随机游走的电路来实现。其中,概率转移矩阵可以通过随机生成一个概率转移矩阵来实现,量子随机游走的电路可以通过qiskit中的量子门和量子电路实现。

  4. 加密解密的实现方式:基于量子随机游走的图像加密算法可以通过对图像像素值进行加密,然后通过相反的过程对图像像素值进行解密。在实现过程中,需要注意加密和解密的过程是相反的,即解密过程需要使用加密过程中使用的参数和密钥。

  5. 算法的性能和安全性:需要考虑算法的性能和安全性。算法的性能可以通过对加密和解密的时间和空间复杂度进行分析来评估。算法的安全性可以通过对算法的加密强度进行分析来评估,例如,对算法的置换矩阵和概率转移矩阵进行分析,以及对算法的安全性进行模拟和实验验证。

四、部分功能代码实现

1.置换矩阵

假设需要对一个n×n的图像进行像素置换,可以随机生成一个n×n的置换矩阵P来实现像素置换。置换矩阵P中的每一个元素都是0或1,且每一行和每一列都有且仅有一个元素为1,其余元素都为0。在像素置换的过程中,可以将原始图像的每一个像素的位置与置换矩阵P中对应的位置进行置换,从而实现像素的混淆。

MATLAB

function P = generate_permutation_matrix(n)
% 生成一个n×n的置换矩阵,用于图像像素置换

P = zeros(n);
for i = 1:n
    row_idx = randperm(n, 1);
    P(i, row_idx) = 1;
end

end

该函数使用MATLAB内置函数
randperm
生成一个n×1的随机排列向量
row_idx
,并将其赋值给置换矩阵P中的第i行中的第
row_idx
个元素。最后,将生成的置换矩阵P返回。

该函数可以通过输入置换矩阵的维度n来生成一个n×n的置换矩阵。例如,通过执行以下代码可以生成一个10×10的置换矩阵P:

P = generate_permutation_matrix(10);

Python

import numpy as np

def generate_permutation_matrix(n):
    """
    生成一个n×n的置换矩阵,用于图像像素置换

    Args:
        n: int, 置换矩阵的维度

    Returns:
        P: np.ndarray, n×n的置换矩阵
    """
    P = np.zeros((n, n), dtype=int)
    for i in range(n):
        row_idx = np.random.choice(n, size=1, replace=False)
        P[i][row_idx] = 1
    return P

该函数使用numpy库生成一个n×n的全0矩阵P,并随机生成每一行中的一个元素为1,从而生成置换矩阵P。最后,将生成的置换矩阵P返回。

2.置换函数

在基于量子随机游走的图像加密算法中,需要使用置换函数对加密后的图像进行进一步混淆。置换函数可以将图像的每一个像素位置进行置换,从而增加破解的难度。

MATLAB

function permuted_image = permutation(image, P)
% 对图像进行像素置换

[n, ~] = size(image);
permuted_image = zeros(n);

for i = 1:n
    for j = 1:n
        new_i = find(P(:, i));
        new_j = find(P(:, j));
        permuted_image(new_i, new_j) = image(i, j);
    end
end

end

该函数接受两个输入参数:待置换的图像和置换矩阵P。在函数内部,通过
find
函数查找P矩阵中值为1的元素所在的行和列,然后根据这些行和列将原始图像中的像素进行置换。最后,将置换后的图像返回。

需要注意的是,该函数实现的置换是对灰度图像进行的,对于彩色图像,需要对每个颜色通道分别进行置换。

Python

import numpy as np

def permutation(image, P):
    """
    对图像进行像素置换

    Args:
        image: np.ndarray, 待置换的图像
        P: np.ndarray, 置换矩阵

    Returns:
        permuted_image: np.ndarray, 置换后的图像
    """
    n = image.shape[0]
    permuted_image = np.zeros_like(image)
    for i in range(n):
        for j in range(n):
            new_i, new_j = np.where(P == 1)[1][i], np.where(P == 1)[1][j]
            permuted_image[new_i, new_j] = image[i, j]
    return permuted_image

该函数接受两个输入参数:待置换的图像和置换矩阵P。在函数内部,使用
np.where
函数查找P矩阵中值为1的元素所在的位置,并根据这些位置将原始图像中的像素进行置换。最后,将置换后的图像返回。

需要注意的是,该函数实现的置换是对灰度图像进行的,对于彩色图像,需要对每个颜色通道分别进行置换。

3.图像像素值转换成量子态

在量子图像加密算法中,需要将图像的像素值转换成对应的量子态,以便进行后续的量子操作。这个过程可以通过将每个像素值表示为二进制数,然后将每一位作为一个量子比特,转换成对应的量子态

MATLAB

function qc = pixel_to_quantum(image)
% 将图像的像素值转换成量子态

n = size(image, 1);
q = qubit(n);
qc = quantumCircuit(q);

for i = 1:n
    for j = 1:n
        % 将像素值表示为二进制数
        binary = dec2bin(image(i, j), 8) - '0';
        % 将每一位作为一个量子比特,转换成对应的量子态
        for k = 1:length(binary)
            if binary(k) == 1
                qc.x(q(k));
            end
        end
        qc.barrier();
    end
end

end

在MATLAB中,可以使用Quantum Computing Toolbox来实现将图像像素值转换成量子态。

该函数接受一个输入参数:待转换的图像。在函数内部,首先获取图像的大小n,然后创建一个包含n个量子比特的量子电路对象。接下来,对于每一个像素值,将其表示为8位二进制数,并将每一位作为一个量子比特,通过在量子电路中添加X门将其转换成对应的量子态。最后,将量子电路对象返回。

需要注意的是,该示例代码中假设图像是8位灰度图像,对于其他位数的灰度图像或彩色图像,需要相应地进行修改。另外,在转换过程中,也可以对像素值进行量子编码,以便在加密过程中加强图像的保密性。

Python

import numpy as np
from qiskit import QuantumCircuit, QuantumRegister

def pixel_to_quantum(image):
    """
    将图像的像素值转换成量子态

    Args:
        image: np.ndarray, 待转换的图像

    Returns:
        qc: QuantumCircuit, 量子电路对象
    """
    n = image.shape[0]
    qc = QuantumCircuit(n, name='PixelToQuantum')
    for i in range(n):
        for j in range(n):
            # 将像素值表示为二进制数
            binary = bin(image[i, j])[2:].zfill(8)
            # 将每一位作为一个量子比特,转换成对应的量子态
            for k, bit in enumerate(binary):
                if bit == '1':
                    qc.x(k)
            qc.barrier()
    return qc

在量子图像加密算法中,需要将图像的像素值转换成对应的量子态,以便进行后续的量子操作。这个过程可以通过将每个像素值表示为二进制数,然后将每一位作为一个量子比特,转换成对应的量子态。

该函数接受一个输入参数:待转换的图像。在函数内部,首先获取图像的大小n,然后创建一个包含n个量子比特的量子电路对象。接下来,对于每一个像素值,将其表示为8位二进制数,并将每一位作为一个量子比特,通过在量子电路中添加X门将其转换成对应的量子态。最后,将量子电路对象返回。

需要注意的是,该示例代码中假设图像是8位灰度图像,对于其他位数的灰度图像或彩色图像,需要相应地进行修改。另外,在转换过程中,也可以对像素值进行量子编码,以便在加密过程中加强图像的保密性。

4.量子随机游走的代码实现

量子随机游走是一个重要的量子算法,可以用于图像加密中的置换操作。在MATLAB中,可以使用Quantum Computing Toolbox来实现量子随机游走。

function qc = quantum_random_walk(n)
% 量子随机游走

q = qubit(n);
qc = quantumCircuit(q);

% 初始化量子比特为均匀叠加态
for i = 1:n
    qc.h(q(i));
end
qc.barrier();

% 定义角度和目标状态
angle = pi / 4;
target = zeros(1, n);
target(1) = 1;

% 执行量子随机游走
for i = 1:n
    % 应用哈达玛门
    qc.h(q(i));
    qc.barrier();
    % 应用相位门
    for j = 1:n
        if j == i
            continue;
        end
        qc.cphase(angle, q(j), q(i));
    end
    qc.barrier();
    % 应用反射门
    qc.mct(q(1:n-1), q(n), [], 'inverse');
    qc.barrier();
end

% 应用逆哈达玛门
for i = 1:n
    qc.h(q(i));
end
qc.barrier();

end

该函数接受一个输入参数n,表示量子比特的数量。在函数内部,首先创建一个包含n个量子比特的量子电路对象,并将所有量子比特初始化为均匀叠加态。接下来,定义角度和目标状态,并按照量子随机游走算法的步骤依次应用哈达玛门、相位门、反射门和逆哈达玛门。最后,将量子电路对象返回。

需要注意的是,该示例代码中仅实现了一次量子随机游走,而在实际应用中通常需要多次执行量子随机游走以增强图像的置换效果。此外,量子随机游走的参数设置对于图像加密的效果也有重要影响,需要根据具体情况进行调整。

5.使用量子随机游走的概率转移矩阵来实现量子随机游走

在图像加密中,可以使用量子随机游走的概率转移矩阵来实现置换操作。假设有n个像素点,可以将每个像素点表示为一个n维的列向量,其中每个维度表示像素点的取值。量子随机游走的概率转移矩阵可以定义为:

P = (1/2n) * (I - D),

其中I是n维的单位矩阵,D是度数矩阵,定义为:

D(i, i) = sum(P(i, j)) (i = 1, 2, ..., n)

可以使用MATLAB来实现量子随机游走的概率转移矩阵,代码如下:

function [P, D] = quantum_random_walk_matrix(n)
% 量子随机游走的概率转移矩阵

% 初始化单位矩阵和度数矩阵
I = eye(n);
D = zeros(n);

% 计算度数矩阵
for i = 1:n
    for j = 1:n
        if i == j
            continue;
        end
        D(i, i) = D(i, i) + 1/norm(i-j, 2);
    end
end

% 计算概率转移矩阵
P = (1/2/n) * (I - D);

end

该函数接受一个输入参数n,表示像素点的数量。在函数内部,首先初始化单位矩阵和度数矩阵,并计算度数矩阵的每个元素。接下来,按照概率转移矩阵的定义计算概率转移矩阵P,并将度数矩阵D和概率转移矩阵P作为输出返回。

需要注意的是,在实际应用中,量子随机游走的概率转移矩阵需要与图像进行对应,即将像素点的n维列向量按照概率转移矩阵进行置换。此外,量子随机游走的概率转移矩阵的参数设置也对图像加密的效果有重要影响,需要根据具体情况进行调整。

6.使用qiskit中的量子门和量子电路实现量子随机游走

在qiskit中,可以通过使用qiskit.quantum_info库中的量子门和量子电路来实现量子随机游走。

示例代码

首先,需要导入必要的库和模块:

from qiskit import QuantumCircuit, QuantumRegister, ClassicalRegister
from qiskit.quantum_info import Operator

然后,需要构造一个初始的量子电路,其中包含每个像素值对应的量子比特。假设我们要处理的是一个4比特的图像,那么可以这样构造电路:

qr = QuantumRegister(4, 'q')
cr = ClassicalRegister(4, 'c')
circuit = QuantumCircuit(qr, cr)

接下来,需要将每个像素值转换为一个复数,并将其作为量子比特的初始态。可以使用qiskit.quantum_info.Statevector.from_label()函数来实现这一步骤:

import numpy as np

# 假设像素值为[1, 2, 3, 4]
initial_state = np.array([1, 2, 3, 4]) / np.sqrt(np.sum(np.square([1, 2, 3, 4])))
circuit.initialize(initial_state, [qr[0], qr[1], qr[2], qr[3]])

接下来,需要构造量子随机游走的概率转移矩阵。可以使用qiskit.quantum_info.Operator.from_label()函数来实现这一步骤。例如,可以使用Grover的搜索算法来构造一个概率转移矩阵:

# 构造一个包含两个目标状态的搜索算法
grover_op = Operator.from_label('10')
diffusion_op = Operator.from_label('H' * 4) * Operator.from_label('X' * 4) * Operator.from_label('H' * 4) * Operator.from_label('Z' * 4) * Operator.from_label('H' * 4)
grover_op = diffusion_op * grover_op * diffusion_op

# 将概率转移矩阵作为一个量子门添加到电路中
circuit.append(grover_op, [qr[0], qr[1], qr[2], qr[3]])

最后,可以测量电路的输出结果,并将结果转换为像素值。可以使用qiskit的simulator模块来模拟电路的输出结果,例如:

from qiskit import Aer, execute

simulator = Aer.get_backend('qasm_simulator')
result = execute(circuit, simulator).result()
counts = result.get_counts(circuit)

# 将测量结果转换为像素值
pixel_values = []
for key in counts.keys():
    pixel_value = int(key, 2)
    pixel_values.append(pixel_value)

这样就可以实现量子随机游走,并将量子态转换为像素值。

通过 qiskit 框架来实现量子随机游走的电路

导入必要的包和模块:

from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit
from qiskit.circuit.library import QFT
from qiskit.quantum_info.operators import Operator
import numpy as np


定义量子寄存器和量子电路:

# 定义量子寄存器
qr = QuantumRegister(n, name='q')

# 定义经典寄存器
cr = ClassicalRegister(n, name='c')

# 定义量子电路
qc = QuantumCircuit(qr, cr, name='qrw')

其中,
n
表示量子比特数。接着,我们需要定义概率转移矩阵和其对应的量子门:

# 定义概率转移矩阵
p = np.zeros((2 ** n, 2 ** n))
for i in range(2 ** n):
    for j in range(2 ** n):
        if i == j:
            p[i][j] = 1 / (2 ** n)
        else:
            p[i][j] = (1 - gamma) / (2 ** n - 1)
    p[i] = p[i] / np.linalg.norm(p[i])

# 将概率转移矩阵转换为量子门
p_gate = Operator(p)

其中,
gamma
表示随机游走参数。接下来,我们需要将概率转移矩阵作用于量子电路上:

qc.unitary(p_gate, qr)

然后,我们需要添加逆量子傅里叶变换和测量操作:

qc.append(QFT(n, do_swaps=False).inverse(), qr)
qc.measure(qr, cr)

最后,我们可以通过 qiskit 提供的模拟器进行模拟和测试:

from qiskit import Aer, execute

backend = Aer.get_backend('qasm_simulator')
job = execute(qc, backend, shots=1024)
result = job.result()
counts = result.get_counts(qc)
print(counts)

前言

最近一直在忙(2月份沉迷steam,3月开始工作各种忙),好久没更新博客了,不过也积累了一些,忙里偷闲记录一下。

这个需求是这样的,我之前做了个工单系统,现在要对登录、注册、发起工单这些功能做限流,不能让用户请求太频繁。

从 .Net7 开始,已经有内置的限流功能了,但目前我们的项目还在使用 .Net6 LTS 版本,下一个 LTS 没发布之前,暂时不考虑使用 .Net7 这种非 LTS 版本。

然后我找到了这个
AspNetCoreRateLimit
组件,在 Github 上有接近三千个星星,看了一下文档使用也简单灵活,于是决定尝试一下~

AspNetCoreRateLimit 组件

项目主页:
https://github.com/stefanprodan/AspNetCoreRateLimit

这是官方的介绍:

AspNetCoreRateLimit is an ASP.NET Core rate limiting solution designed to control the rate of requests that clients can make to a Web API or MVC app based on IP address or client ID.

The AspNetCoreRateLimit
NuGet package
contains an
IpRateLimitMiddleware
and a
ClientRateLimitMiddleware
, with each middleware you can set multiple limits for different scenarios like allowing an IP or Client to make a maximum number of calls in a time interval like per second, 15 minutes, etc. You can define these limits to address all requests made to an API or you can scope the limits to each API URL or HTTP verb and path.

用最近很厉害的 ChatGPT 翻译一下:

AspNetCoreRateLimit是一个ASP.NET Core速率限制解决方案,旨在基于IP地址或客户端ID控制客户端对Web API或MVC应用程序发出请求的速率。

AspNetCoreRateLimit
NuGet包
包含一个
IpRateLimitMiddleware
和一个
ClientRateLimitMiddleware
,每个中间件都可以为不同的场景设置多个限制,比如允许IP或客户端在时间间隔内进行最大数量的调用,比如每秒、15分钟等。您可以定义这些限制以处理对API发出的所有请求,也可以将限制范围限定为每个API URL或HTTP动词和路径。

这个组件使用起来挺灵活的,直接在
AspNetCore配置
里定义规则,意味着可以不重新编译程序就修改限流规则,官方给的例子是直接在
appsettings.json
里配置,但使用其他配置源理论上也没问题(配置中心用起来)。

简单介绍下这个组件的思路

首先它有两种模式:

  • 根据IP地址限流
  • 根据 ClientID 限流

IP地址很容易理解,ClientID 我一开始以为是用户ID,不过看了说明,是一个放在请求头里的参数,比如
X-ClientId
,这个要自己实现,可以直接用用户ID。

为了方便使用,我这个项目里面直接用IP地址模式。

RateLimit 组件可以配置全局的限流,也可以配置对某个IP地址(段)进行限流。

配置服务

为了从
appsettings.json
读取数据,先在
Program.cs
注册配置服务

builder.Services.AddOptions();

然后写个扩展方法来注册 RateLimit 的相关服务

引入命名空间

using AspNetCoreRateLimit;
using AspNetCoreRateLimit.Redis;
using StackExchange.Redis;

写个静态类

public static class ConfigureRateLimit {
    public static void AddRateLimit(this IServiceCollection services, IConfiguration conf) {
        //load general configuration from appsettings.json
        services.Configure<IpRateLimitOptions>(conf.GetSection("IpRateLimiting"));

        var redisOptions = ConfigurationOptions.Parse(conf.GetConnectionString("Redis"));
        services.AddSingleton<IConnectionMultiplexer>(provider => ConnectionMultiplexer.Connect(redisOptions));
        services.AddRedisRateLimiting();

        services.AddSingleton<IRateLimitConfiguration, RateLimitConfiguration>();
    }

    public static IApplicationBuilder UseRateLimit(this IApplicationBuilder app) {
        app.UseIpRateLimiting();

        return app;
    }
}

来解析一下配置的代码。

我暂时不需要对不同的IP地址段应用不同的限流规则

所以直接用
IpRateLimitOptions

services.Configure<IpRateLimitOptions>(conf.GetSection("IpRateLimiting"));

要做根据IP限流,就得记录每个IP访问了多少次,RateLimit 组件支持多种存储方式,最简单的可以直接存内存里,不过为了稳定我还是选择 Redis。

这几行代码就是配置 Redis 的。

var redisOptions = ConfigurationOptions.Parse(conf.GetConnectionString("Redis"));
services.AddSingleton<IConnectionMultiplexer>(provider => ConnectionMultiplexer.Connect(redisOptions));
services.AddRedisRateLimiting();

最后注入一下
IRateLimitConfiguration
,我猜应该是中间件要用到的。至少我目前在 Controller 代码里不需要用到任何跟 RateLimit 有关的代码。

写完了扩展方法,回到
Program.cs

注册服务

builder.Services.AddRateLimit(builder.Configuration);

添加中间件

var app = builder.Build();

app.UseExceptionless();
app.UseStaticFiles(new StaticFileOptions {
    ServeUnknownFileTypes = true
});
app.UseRateLimit();

// ...

app.Run();

我这里把
UseRateLimit
放在
UseStaticFiles
后面,不然页面里的静态文件都被算进去访问次数,很快就被限流了。

配置


appsettings.json
里写具体的限流规则。

官网提供的配置规则不能照抄,要理解一下他的文档

  • EnableEndpointRateLimiting
    - 这个选项要设置为 true ,不然设置的限流是全局的,不能根据某个路径单独设置限流
  • StackBlockedRequests
    - 按照默认的设置为 false 就行,设置成 true 的话,一个接口被限流之后再重复请求还会计算到访问次数里面,这样有可能导致限流到天荒地老。

其他的配置顾名思义,懂的都懂。

GeneralRules
是对具体路径的限流规则

如果全局限流,把
EnableEndpointRateLimiting
设置为 false 的话,那就这样设置,1分钟只能访问5次

{
    "Endpoint": "*",
    "Period": "1m",
    "Limit": 5
}

Endpoint
可以设置
HTTP方法:路径
的形式,比如
post:/account/login
具体看文档吧(参考文档第三条)

附上我的配置文件,对添加工单、登录、注册接口进行限流。

{
  "IpRateLimiting": {
    "EnableEndpointRateLimiting": true,
    "StackBlockedRequests": false,
    "RealIpHeader": "X-Real-IP",
    "ClientIdHeader": "X-ClientId",
    "HttpStatusCode": 429,
    "IpWhitelist": [],
    "EndpointWhitelist": [
      "get:/api/license",
      "*:/api/status"
    ],
    "ClientWhitelist": [
      "dev-id-1",
      "dev-id-2"
    ],
    "GeneralRules": [
      {
        "Endpoint": "*:/ticket/add",
        "Period": "1m",
        "Limit": 5
      },
      {
        "Endpoint": "post:/account/login",
        "Period": "1m",
        "Limit": 5
      },
      {
        "Endpoint": "post:/account/SignUp",
        "Period": "1m",
        "Limit": 5
      }
    ],
    "QuotaExceededResponse": {
      "Content": "{{ \"message\": \"先别急,你访问得太快了!\", \"details\": \"已经触发限流。限流规则: 每 {1} 只能访问 {0} 次。请 {2} 秒后再重试。\" }}",
      "ContentType": "application/json",
      "StatusCode": 429
    }
  }
}

同时自定义了被限流时的提示。

效果如下

{
  "message": "先别急,你访问得太快了!",
  "details": "已经触发限流。限流规则: 每 1m 只能访问 5 次。请 16 秒后再重试。"
}

参考资料