2023年3月

安全-简介

前一章中我们已经创建了第一个打算用于存储业务数据的表。在odoo这样的一个商业应用中,第一个考虑的问题就是谁(Odoo 用户(或者组用户))可以访问数据。odoo为指定用户组用户提供了一个安全的数据访问机制。

更多关于安全主题的详细信息可以查看
Advanced B: ACL and Record Rules
。本章目标是覆盖我们新模块的最小安全需求。

Data Files (CSV)

Odoo是一个高度数据驱动的系统。虽然通过Python代码定制行为,但是模块的部分值在加载时设置的数据中。 加载数据的方式之一是通过一个CSV文件,比如在安装
base
模块时加载的
country states列表

"id","country_id:id","name","code"
state_us_1,us,"Alabama","AL"
state_us_2,us,"Alaska","AK"
state_us_3,us,"Arizona","AZ"
state_us_4,us,"Arkansas","AR"
...
  • id

    外部标识
    。可用于引用记录(不需要知道在数据库中的唯一标识)。
  • country_id:id
    通过使用外部标识指向国家.
  • name
    州的名称
  • code
    州的代码


res.country.state
model中
定义
了这三个字段

按约定,导入数据的文件存放在模块的
data
目录中。当数据和安全相关时,存放在
security
目录中,当数据和视图和action相关时,存放在
views
目录中. 此外,所有这些文件必须在
__manifest__.py
中的
data
列表中定义

查看示例文件

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.


{
    'name': 'Base',
    'version': '1.3',
    'category': 'Hidden',
    'description': """
The kernel of Odoo, needed for all installation.
===================================================
""",
    'depends': [],
    'data': [
        'data/res.lang.csv',
        'data/res_lang_data.xml',
        'data/res_partner_data.xml',
        'data/res_company_data.xml',
        'data/res_users_data.xml',
        'data/report_paperformat_data.xml',
        'data/res_currency_data.xml',
        'data/res_country_data.xml',
        'data/ir_demo_data.xml',
        'security/base_groups.xml',
        'security/base_security.xml',
        'views/base_menus.xml',
        'views/decimal_precision_views.xml',
        'views/res_config_views.xml',
        'data/res.country.state.csv',
        'views/ir_actions_views.xml',
        'views/ir_config_parameter_views.xml',
        'views/ir_cron_views.xml',
        'views/ir_filters_views.xml',
        'views/ir_mail_server_views.xml',
        'views/ir_model_views.xml',
        'views/ir_attachment_views.xml',
        'views/ir_rule_views.xml',
        'views/ir_sequence_views.xml',
        'views/ir_translation_views.xml',
        'views/ir_ui_menu_views.xml',
        'views/ir_ui_view_views.xml',
        'views/ir_default_views.xml',
        'data/ir_cron_data.xml',
        'report/ir_model_report.xml',
        'report/ir_model_templates.xml',
        'views/ir_logging_views.xml',
        'views/ir_qweb_widget_templates.xml',
        'views/ir_module_views.xml',
        'data/ir_module_category_data.xml',
        'data/ir_module_module.xml',
        'report/ir_module_reports.xml',
        'report/ir_module_report_templates.xml',
        'wizard/base_module_update_views.xml',
        'wizard/base_language_install_views.xml',
        'wizard/base_import_language_views.xml',
        'wizard/base_module_upgrade_views.xml',
        'wizard/base_module_uninstall_views.xml',
        'wizard/base_export_language_views.xml',
        'wizard/base_update_translations_views.xml',
        'wizard/base_partner_merge_views.xml',
        'wizard/base_document_layout_views.xml',
        'data/ir_actions_data.xml',
        'data/ir_demo_failure_data.xml',
        'views/res_company_views.xml',
        'views/res_lang_views.xml',
        'views/res_partner_views.xml',
        'views/res_bank_views.xml',
        'views/res_country_views.xml',
        'views/res_currency_views.xml',
        'views/res_users_views.xml',
        'views/ir_property_views.xml',
        'views/res_config_settings_views.xml',
        'views/report_paperformat_views.xml',
        'views/onboarding_views.xml',
        'security/ir.model.access.csv',
    ],
    'demo': [
        'data/res_company_demo.xml',
        'data/res_users_demo.xml',
        'data/res_partner_bank_demo.xml',
        'data/res_currency_rate_demo.xml',
        'data/res_bank_demo.xml',
        'data/res_partner_demo.xml',
        'data/res_partner_image_demo.xml',
    ],
    'test': [],
    'installable': True,
    'auto_install': True,
    'post_init_hook': 'post_init',
}

同时需要注意的是,
当前数据文件仅在模块被安装或者更新时才被加载。

注意:

按数据文件在
__manifest__.py
中的顺序加载文件数据。这意味着数据
A
引用
B
, 则必须确保
B
放在
A
之前。

在country states的案例中, 你会发现country列表优先于country states列表被加载。这是因为country states 引用了country.

为什么这对于安全来说很重要,因为模块的所有安全配置,都是通过数据文件加载的。

访问权限(Access Rights)

参考
: 该主题相关文档可以查看
Access Rights
.

本节目标是解决类似以下告警:

WARNING rd-demo odoo.modules.loading: The model estate.property has no access rules...

当模型中没有定义任何访问权限时,odoo会认为没有任何用户可以访问数据,并在日志中打印:

2022-12-14 09:01:38,994 32508 WARNING odoo odoo.modules.loading: The model estate.property has no access rules, consider adding one. E.g. access_estate_property,access_estate_property,model_estate_property,base.group_user,1,0,0,0 

访问权限被定义为
ir.model.access
模型记录。每个访问权限关联一个model,一个group(针对全局访问,没有组) 和一系列权限:
create
,
read
,
write

unlink
(等同于
delete
)。这些访问权限通常定义在一个名为
ir.model.access.csv
的CSV文件中。

之前
test.model
的一个示例

id,name,model_id/id,group_id/id,perm_read,perm_write,perm_create,perm_unlink
access_test_model,access_test_model,model_test_model,base.group_user,1,0,0,0
  • id
    为外部标识
  • name
    ir.model.access
    的名称
  • model_id/id
    代指需要应用访问权限的model。标准格式为
    model_<model_name>
    ,其中,
    <model_name>
    为模块中
    _name
    替换
    .

    _
    后的
    _name
    的值
  • group_id/id
    代指需应用访问权限的组。
  • perm_read,perm_write,perm_create,perm_unlink
    : 分别代表
    create
    ,
    read
    ,
    write

    unlink
    权限,1表示有访问权限,0-表示无权限

练习

添加访问权限

在合适的目录中创建
ir.model.access.csv
文件(本例为
odoo14/custom/estate/security/ir.model.access.csv
),并在
__manifest__.py
文件中进行定义。


base.group_user
授予
read
,
write
,
create

unlink
权限

id,name,model_id/id,group_id/id,perm_read,perm_write,perm_create,perm_unlink
access_estate_model,access_estate_model,model_estate_property,base.group_user,1,1,1,1

修改
__manifest__.py
文件(本例为
odoo14/custom/estate/__manifest__.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
{
    'name': 'estate',
    'depends': ['base'],
    'data':['security/ir.model.access.csv']
}

重启odoo服务,查看日志是否还存在告警。


Redis 是一种高性能、高可靠的内存数据存储和处理系统,它支持多种数据结构和协议,可以用于各种不同的应用场景。本文将介绍 Redis 的高级特性,包括持久化、事务、Lua 脚本等方面,以及如何使用这些特性实现高性能、高可靠的数据存储和处理。

高性能、高可用、高可扩展性的原理

  1. 基于内存的数据结构:Redis将数据存储在内存中,而不是硬盘中,因此可以实现非常高速的读写操作。
  2. 单线程的模型:Redis采用单线程的模型,避免了多线程之间的竞争问题,也减少了线程切换的开销。
  3. 高效的网络通信:Redis采用自己设计的简单协议进行通信,协议本身非常轻量级,减少了网络传输的开销。
  4. 异步非阻塞式IO:Redis采用异步非阻塞的IO模型,当IO操作完成后才会通知应用程序,避免了IO阻塞对性能的影响。
  5. 高效的持久化机制:Redis支持多种持久化机制,包括快照和AOF,可以满足不同的业务需求,同时也可以提高数据的安全性。
学会与人相处,建立良好的人际关系,
这是在职场中获得成功的关键。

持久化

Redis是一种内存数据库,它将数据存储在内存中,因此它非常快速。但是,如果Redis进程意外终止,所有数据将丢失。为了解决这个问题,Redis提供了持久化功能,它可以将内存中的数据异步写入磁盘,以便在Redis重启后可以恢复数据。

RDB持久化

RDB持久化是将Redis在某个时间点上的数据保存到硬盘上,可以看作是对Redis内存中的数据做一个快照。RDB持久化可以通过配置Redis服务器的时间间隔来自动触发,也可以手动执行。

AOF持久化

AOF持久化是将Redis的写操作以文本形式追加到文件中。AOF文件中的每个写操作都是一个Redis命令,当Redis服务器重启时,可以通过执行AOF文件中的所有命令来恢复数据。

持久化的配置

RDB配置

RDB持久化的配置文件为redis.conf。在配置文件中,可以通过以下配置项来控制RDB持久化的行为:

  • save:指定Redis自动触发RDB持久化的条件,格式为 save ,其中seconds表示时间间隔,changes表示数据变化的次数。例如,save 900 1 表示如果900秒内有至少1个键被修改,则触发RDB持久化。
  • stop-writes-on-bgsave-error:如果设置为yes,则如果RDB持久化失败,Redis服务器将停止接受写请求,直到RDB持久化成功为止。
    Redis还提供了以下与RDB持久化相关的命令:
  • save:手动触发RDB持久化。
  • bgsave:在后台异步执行RDB持久化。

AOF配置

AOF持久化的配置文件为redis.conf。在配置文件中,可以通过以下配置项来控制AOF持久化的行为:

  • appendonly:如果设置为yes,则开启AOF持久化。
  • appendfsync:指定AOF缓冲区何时将数据同步到硬盘,有以下三个选项:
    • always:每个Redis命令都会立即同步到硬盘。
    • everysec:每秒将AOF缓冲区中的数据同步到硬盘。
    • no:完全依赖操作系统将数据同步到硬盘。
  • no-appendfsync-on-rewrite:如果设置为yes,则当Redis执行AOF重写时,不将数据同步到硬盘。
    Redis还提供了以下与AOF持久化相关的命令:
  • bgrewriteaof:在后台异步执行AOF重写。
  • bgappendonly:在后台异步执行AOF缓冲区的数据同步到硬盘。

持久化的恢复

RDB的恢复

RDB持久化的恢复比较简单,只需将RDB文件复制到Redis服务器的工作目录,并在redis.conf文件中指定RDB文件的路径即可。Redis服务器启动时会自动加载RDB文件,并恢复数据。

人生如一场旅程,不要只看到目的地,
更要享受旅途中的美好。

AOF的恢复

AOF持久化的恢复相对复杂。首先,需要将AOF文件加载到Redis服务器中:

plaintextCopy code
redis-cli
CONFIG SET appendonly yes
BGREWRITEAOF

然后,需要清空Redis服务器中的数据:

plaintextCopy code
redis-cli
FLUSHALL

最后,执行AOF文件中的所有命令,恢复数据:

plaintextCopy code
redis-cli
CONFIG SET appendonly yes
BGREWRITEAOF

RDB和AOF的选择

在选择持久化方式时,需要根据实际的业务场景和需求来选择RDB或AOF持久化。如果对数据完整性要求较高,可以选择AOF持久化;如果对数据完整性要求不高,可以选择RDB持久化。

持久化对性能的影响

持久化会对Redis服务器的性能产生一定的影响,特别是在执行RDB持久化时,由于需要fork出子进程,会占用一定的CPU和内存资源。因此,在配置持久化时,需要根据实际情况来平衡数据安全和性能的需求。

数据的丢失问题

由于Redis的持久化是异步的,因此在Redis意外终止时,可能会丢失部分数据。为了最小化数据丢失的风险,可以使用AOF持久化,并将appendfsync设置为always。这将确保每个写操作都同步到磁盘上的AOF文件中。

事务

Redis事务是指在一次操作中执行多个命令,并且这些命令要么全部被执行,要么全部不执行。Redis事务可以保证一系列命令的原子性执行。

职场中最重要的能力并不是技术或知识,
而是沟通和协作的能力。

事务的优点

  • 原子性:Redis事务可以保证多个命令的原子性执行,即要么全部执行,要么全部不执行。
  • 性能:Redis事务可以将多个命令打包成一个批量操作,从而减少网络通信的开销,提高性能。
  • 一致性:Redis事务可以保证多个命令的一致性,即在执行事务期间,其他客户端不会对这些命令进行修改。

实现方式

  • MULTI:开始一个事务。
  • EXEC:执行事务中的所有命令。
  • DISCARD:取消事务。
  • WATCH:监视一个或多个键,如果在事务执行期间这些键被修改,事务将被取消。

示例:

/**
     * 事务操作
     * @param isOpenError 是否开启异常
     */
    public void transactionalMethod(boolean isOpenError) {
        redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            public List<Object> execute(RedisOperations operations) {
                operations.multi(); // 开启事务
                ValueOperations<String, String> valueOps = operations.opsForValue();
                valueOps.set("key1", "value1");
                if(isOpenError){
                    int i = 1 / 0;
                }
                valueOps.set("key2", "value2");
                List exec = operations.exec();//提交事务
                return exec;
            }
        });
    }

注意事项

  • Redis事务不支持回滚操作。
  • 如果在执行事务期间,键被其他客户端修改,那么事务将被取消。
  • Redis事务不支持嵌套事务。
  • Redis事务中的命令不能使用事务外的数据。
  • Redis事务中的命令不支持乐观锁。

应用场景

  • 批量操作:将多个命令打包成一个事务,从而减少网络通信的开销,提高性能。
  • 保证数据一致性:在需要保证数据一致性的场景中使用Redis事务可以避免因为并发操作导致数据不一致的问题。

发布订阅

发布和订阅是 Redis 的一种消息传递机制,它可以实现多个客户端之间的消息通信。下面是一个简单的 Redis 发布和订阅的示例

实现消息订阅者

public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("收到消息: " + message.toString());
    }
}

注册消息订阅者

@Bean
    public RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate().getConnectionFactory());
        container.addMessageListener(new RedisMessageListener(), new ChannelTopic("pubsub:example"));
        return container;
    }

发送消息

public void publish(String message) {
        redisTemplate.convertAndSend("pubsub:example", message);
    }

lua脚本

永远保持谦虚和学习的心态,
才能不断提升自己,赢得更多的机会和尊重。

Redis 支持 Lua 脚本,可以通过编写 Lua 脚本来实现复杂的数据操作和处理。Redis 的 Lua 脚本可以访问 Redis 数据库,Redis 提供的各种命令和函数。

下面是一个使用lua脚本实现redis自增计数器的示例

public Long increment(String key) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/increment.lua")));
        script.setResultType(Long.class);
        List<String> keys = Collections.singletonList(key);
        return redisTemplate.execute(script, keys);
    }

lua脚本

local key = KEYS[1]
local value = redis.call('INCR', key)
return value

使用 Lua 脚本可以将多个 Redis 命令封装在一个脚本中,减少网络开销和服务器负载。此外,Lua 脚本还可以实现 Redis 不支持的数据结构和算法,可以扩展 Redis 的功能和应用范围。

管道操作

管道(pipeline)是一种高效的Redis命令执行方式,它可以在一次通信中发送多个Redis命令,并一次性获取所有命令的响应结果。这种方式可以有效地降低Redis服务器的网络延迟和通信开销,提高Redis的性能。

下面是一个使用管道操作的示例

/**
     * 管道操作
     * 注意管道操作不支持事务和watch命令,需要谨慎使用。
     * 管道操作会将多个命令打包成一个请求发送给Redis服务器,
     * 如果其中一个命令执行失败,那么整个管道操作都会失败
     */
    public void pipelineExample() {
        List<Object> results = redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                connection.stringCommands().set("key1".getBytes(), "value1".getBytes());
                connection.stringCommands().set("key2".getBytes(), "value2".getBytes());
                connection.stringCommands().set("key3".getBytes(), "value3".getBytes());
                return null;
            }
        });
        System.out.println(results); // 打印结果
    }

完整代码地址

https://gitee.com/youlaiorg/youlai-learning.git

总结

本文介绍了Redis的高级特性,包括持久化、事务、发布订阅、lua脚本和管道操作。其中,持久化可以实现数据的持久化存储,事务可以保证一系列命令的原子性执行,发布订阅可以实现多个客户端之间的消息通信,lua脚本可以实现复杂的数据操作和处理,管道操作可以在一次通信中发送多个Redis命令。此外,本文还介绍了Redis高性能、高可用、高可扩展性的原理,包括基于内存的数据结构、单线程的模型、高效的网络通信、异步非阻塞式IO和高效的持久化机制。

做事要讲求团队合作,相互支持,共同进步。

我们都用过 vue 的cli ,或者 react的cli,  亦或是其他的cli 如 vite 等。他们都是提供了一个全局命令,然后在终端执行这个全局命令就可以创建出模板项目。今天我们就自己做一个,给自己用的脚手架项目,帮助自己开发一些项目。

现在我们来造点需求。

背景:我们要在nextjs 项目中,添加页面路由,用过nextjs 的同学应该知道,这个pages 下面的文件就是页面路由。我们一般添加页面都在pages文件夹下新建文件 ,有时候还会在pages同级文件夹下新建components 文件夹,来放我们页面的组件。

要求:

    1. 可以单独创建一个页面路由
    2. 如果页面复杂,那我需要在pages同级目录下创建一个components文件夹下创建同名的组件文件夹
    3. 可以控制是否生成style 文件

分析:我们回忆下@vue/cli 这样的脚手架是如何使用的

npm install -g @vue/cli

vue create my-project

全局安装之后 开始使用这个全局命名。

那么我们就按照这个思路倒着打。

因此我们也需要搞一个全局命令,然后执行我们全局命令后,就可以执行我们想要的动作,创建文件夹,写文件等。

  1. 创建空项目,next-project-add-page
  2. npm init
  3. 修改 package.json, 添加 bin 字段
  4. 本地调试,先生成软连接 来让我们可以使用 next-add-page 命名,
    npm  link
  5. 核心代码编写
{
  "name": "next-project-add-page",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "bin": {
    "next-add-page": "index.js"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "commander": "^10.0.0"
  }
}

先上一个能走完大概流程的核心代码,里面一些细节的代码逻辑,我们后面再补充。

// index.js
// console.log(process.argv)
// [
//     'C:\\Program Files\\nodejs\\node.exe',
//     'C:\\Users\\Administrator\\AppData\\Roaming\\npm\\node_modules\\next-project-add-page\\index.js',
//     'add', // 想要执行的命令
//     'list', // 新增的页面
//     '-components', // 是否添加到pages同级的components 文件夹下为 组件
//     '-style', // 是否添加样式文件
// ]
const { program } = require('commander');
const child_process = require('child_process');
const { clearInterval } = require('timers');
const fs = require('fs');
const path = require('path');
program
  .option('-components')
  .option('-style');

program.parse();
console.log(program.opts()) // { Components: true, Style: true }

;


function intervalProgress() {
    const readline = require('readline');
    const unloadChar='-';
    const loadedChar='=';

    let i = 0;
    let time = setInterval(()=>{
        if(i>10){
            clearInterval(time);
            console.log(`创建完成,请查收`);
            process.exit(0);
        }
        readline.cursorTo(process.stdout,0,1);
        readline.clearScreenDown(process.stdout);
        renderProgress('文件创建中',i);
        i++;
    },200);

    function renderProgress(text,step){
        const PERCENT = Math.round(step*10);
        const COUNT = 2;
        const unloadStr = new Array(COUNT*(10-step)).fill(unloadChar).join('');
        const loadedStr = new Array(COUNT*(step)).fill(loadedChar).join('');
        process.stdout.write(`${text}:【${loadedStr}${unloadStr}|${PERCENT}%】`);
    }

}




function main(){
    if(process.argv[2] && process.argv[2] == 'add') {
        if(process.argv[3]) {
            // 页面名称
            // 判断pages 文件夹是否存在
            if(fs.existsSync(`./pages`) && !fs.existsSync(`./pages/${process.argv[3]}`)) {
                let subProcess= child_process.exec("cd ./pages && mkdir " + process.argv[3],  function(err,stdout){
                    if(err)console.log(err);
                        // 读取 tempalte/index.tsx 的文件
                        let tempalteStr = fs.readFileSync( path.resolve(__dirname, './template/index.tsx'));
                        fs.writeFile(`./pages/${process.argv[3]}/index.tsx`, tempalteStr, (err) =>{
                            console.log(err);
                        });
                   // 创建style less
                    if(program.opts().Style && process.argv[3]) {
                        fs.writeFile(`./pages/${process.argv[3]}/index.less`,'', (err) =>{
                            console.log(err);
                        });
                    }
                    subProcess.kill();
                });
            }
          
        }
    }
    
     if(program.opts().Components && process.argv[3]) {
         // 添加component组件
         // 页面名称
         // 判断components 文件夹是否存在
         if(!fs.existsSync(`./components`)) {
            fs.mkdirSync('./components');
         }
         if(!fs.existsSync(`./components/${process.argv[3]}`)){
            let subProcess=child_process.exec("cd ./components && mkdir " + process.argv[3], function(err,stdout){
                if(err)console.log(err);
                subProcess.kill();
           });
         }
     }

     // 展示进度条
     intervalProgress();
     // 精确点: 实时查询新建的几个文件(文件夹)是否创建成功,如果创建完毕,应该提前结束进度条
   
}

main();

代码大概解说:

  • 使用nodejs 的fs 模块 api,对文件的读写以及文件存在的判断。
  • commander 对命令行的解析,得到命令行具体的值后 继续做文件的读写
  • 使用 child_process 子进程 对文件的读写
  • process.stdout.write 来写终端的输出内容
  • 用template 文件夹 放 具体的模板文件,具体生成文件的时候,可以将模板的内容给到相应的文件中
上述代码不是很复杂,一些比较细节的内容我没有做细节处理,比如进度条的展示,这里是用一个 定时器 来模拟进度的一个过程,如果需要精确点可以监听文件的生成进度,从而调优进度条的展示。

现在可以nextjs 项目下 执行命令:next-add-page add list -components -style

next-add-page:是全局的一个命令

add: 是添加页面的 一个命令

list: 是添加页面的名称

-components: 在 components 文件夹下创建同名的组件 (可选)

-style:  创建样式文件 (可选)

是否可选,取决与代码对这个命令行参数的解析以及操作。

现在执行这个命令行

就可以看到项目中 生成了对应的文件。

到这里基本的需求已经完成了。

接下里是发布,

具体发布到npm 的步骤就不多说了,没有发布过的可以参考下这里,https://cnodejs.org/topic/5823c4466666620be9438b02a31

好了,到这里应该知道怎木去开发自己的一个cli 脚手架了。

参考

https://juejin.cn/post/6844903702453556666661

https://juejin.cn/post/6857842033084760071

https://www.runoob.com/nodejs/nodejs-fs.html

1. 简单介绍

三月份的一个主要任务是学一门计算机科班的基础课程————《计算机组成原理》,在学之前对这部分的知识似懂非懂,没系统学过所以也不清楚它到底包含哪些知识点,但学完之后又能和本科时期学过的一些课程有了联系,比如最基础的模电和数电,想起之前课设的时候大家熬夜在宿舍做LED时钟显示器的情形,那个时候真的是平时不学习、期末抱佛脚,稀里糊涂四年时间就过去了,算是过的最舒服的几年时光了。但出来混,迟早要还的,现在开始学这些基础课程了。
计算机是一个复杂的结构,深挖计算机的第一性,从最基础的物理知识开始:

  1. 电路:I=U/R,涉及的硬件有电阻、电感、电容,也有电磁学相关的内容,是信息时代快速发展的基石。
  2. 模电: 用连续电信号作为信息载体,处理的是连续变化的模拟量电信号(即其幅值可以是任何值),如无线电、收音机等。不做深入了解。但基础元器件还是三极管。
  3. 数电:主要是针对数字信号处理的模块。数字集成电路的运行以开关状态进行运算,它的精度高,适合复杂的计算。数字电路只要能区分高电平和低电平即可,数字电路不仅能完成数值运算,还可以进行逻辑运算和判断,因此数字电路又称为数字逻辑电路。基础元器件还是三极管,也即晶体管,现在芯片中的基本单位就是这个玩意儿,其尺寸已经达到纳米尺度了。数电中我们需要理解的是与或非门电路,这是构建复杂计算逻辑组合电路的基础。
  4. 计算机组成原理:在数电原理基础上,构建的复杂电路。主要包含的硬件模块有:总线、CPU、主存、I/O设备。下面分别进行简要总结

2. 简单分模块总结

  1. 总线。可以理解为分布在计算机中用于各模块之间通讯、数据传输的线。各模块发出或者可以接受从总线上传输的数据,数据传输的速度和总线的频率和宽度相关。一般我们说的总线是系统总线,这里面又分为控制总线、地址总线和数据总线。此外,CPU内部也可能有用于在cpu内部进行数据和命令传输的总线,可称为内部总线。此外,I/O设备也可以拥有独立的io总线。总之,这些总线都是为传输信号工作的,为了使传输信号更快,总线的连接通路会更多,导致系统看起来非常复杂。
  2. 主存:在计算机运行期间存储数据和代码,CPU在运行期间直接从主存中存取指令和数据。将其理解为一个非常大的数组即可,且可以根据地址高速随机访问。
  3. CPU:这是整个计算机的控制计算中心,所有的任务都是在这里执行的。逻辑上主要包含四个组件:寄存器、ALU、CU和中断系统。
  • 寄存器是用来临时存储一些从内存中拿来或者ALU计算结果的存储单元,如通用寄存器,还有一些是专用寄存器,如PC用于存储下一条指令在内存中的地址,还有状态寄存器。各种cpu其寄存器配置不尽相同,功能都是用来存储的,是计算机中最快的存储器;
  • ALU用做算数逻辑运算,包括浮点、定点的加减乘除和逻辑与或非等能力,这个模块只要理解为按照既定的电路做算数逻辑计算即可,内部设计的组合逻辑电路各个芯片大同小异,有的还复用电路即可用做浮点计算也可以做整形计算,至于对输入的数据做何种运算,是由CU给出的信号(+-*/ &|!)决定的。
  • CU是核心部件,用于将PC中的指令进行解析,讲指令由操作码+操作数组成,CU会将操作码解析为硬件命令信号,将操作数从内存中加载到约定好的寄存器并按照命令运算。这个过程细分为取指+间址(option)+执行+中断(option)。其中执行过程最复杂也最耗时。这些步骤细分还会分为微指令等更细微的操作,涉及到时钟、流水化,微程序控制器等较细的模块,简单了解有这个东西就行。简言之,最简单的计算机就是一直在循环做取指+执行。汇编指令和机器指令是一一对应的,如果有需要做指令级优化的话,掌握相应CPU架构的指令集,写汇编代码即可。
  • 中断系统:中断系统是为相应系统的中断信号设计的,一般由硬件实现和软件实现两种,是速度和灵活性之间权衡的结果。在系统运行过程中,运行的程序本身或者IO设备都有可能发出中断请求,中断程序根据请求的种类不同,在保护现场后,去调用对应的中断程序来处理请求,结束后再返回到之前的程序断点继续执行,类似于函数调用。

3. 杂项

  1. 数的表示和计算。
    计算机顾名思义是用来计算的,那就涉及到数据的存储和计算,数据是如何编码为2进制并存储到内存或硬盘上的,大端法小端法等。ALU里面的加法器是如何实现的,乘法是如何利用加法器工作的等等。其实这些都是组合逻辑电路实现的,和电子时钟原理差不多。当实现了最基本的加减乘除逻辑电路后,复杂的超越函数可以通过泰勒展开等近似方法简化为加减乘除运算,只不过不能一次性在ALU中完成计算,需要通过软件的方式,运行很多个指令才能求解出一个近似值。对于那些专用的计算芯片,可能会将常用的非线性函数如sin/cos/log/e^x等以硬件实现模拟,计算速度会大大提高。当然需要有配套的编译器和指令支持才可以。
  2. 微程序控制单元。取到一条指令后,会被解析为很多粒度更细的指令按照严格顺序执行,是在cpu内部硬件实现的,相当于一个指令解码部件。
  3. 流水线。多个指令并行执行且不干扰执行结果,基本原则是下一条指令使用上一条指令使用过的内部运算或存储体,该存储体或运算单元是当前指令使用完并在指令结束前不会再使用。跳转指令和耗时不均匀的指令会打乱流水、影响效率。流水逻辑是严格保证逻辑和串行执行一致的,在写汇编代码的时候按照串行写即可。

4. 结语

这门课是在
计算机组成原理-刘宏伟-B站
跟着视频学的,中间太深太细的没有深究,毕竟工作中用不到,只把组成原理的整体结构和知识点看完了,也做了这篇总结,主要是为了建立计算机组成原理的基本知识点范畴,并把硬件知识和上层代码联系起来,也把计算机硬件和之前学的数电建立了链接。接下来,还有一门操作系统,是重中之重。下个月主攻CS,道阻且长行则将至。

KafkaConsumer 的概念

消费者 & 消费者群组

消费者读取消息。在其他基于发布与订阅的消息系统中,消费者可能被称为订阅者 或 读者。

消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。

偏移量是一种元数据,它是一个不断递增的整数值,在创建消息时, Kafka 会把偏移量添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。


消费者群组

消费者是消费者群组的一部分。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题的一部分分区的消息。消费者群组保证每个分区只能被一个消费者使用 。消费者与分区之间的映射通常被称为消费者对分区的所有权关系。

通过消费者群组的方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,消费者群组里的其他消费者可以接管失效消费者的工作。

往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。

image-20230325085315950.png

分区再均衡

当一个消费者被关闭或发生崩溃时,这个消费者就离开群组,原本由它读取的分区将由消费者群组里的其他消费者来读取。

当一个新的消费者加入消费者群组时,这个新的消费者读取的是原本由其他消费者读取的消息。

在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。分区再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生分区再均衡。原因如下:

  • 在分区再均衡期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。
  • 另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。


分区再均衡的过程

消费者通过向被指派为群组协调器的 broker(不同的消费者群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。

  • 只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。
  • 如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为消费者已经死亡,就会触发一次分区再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认消费者已经死亡了才会触发分区再均衡。在清理消费者时,消费者会通知群组协调器它自己将要离开消费者群组,群组协调器会立即触发一次分区再均衡,尽量降低处理停顿。


分配分区的过程:

  • 当消费者要加入消费者群组时,消费者会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。

  • 群主从群组协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者,Kafka 内置了两种分区分配策略。

  • 群主将分区分配完毕之后,群主把分区的分配情况列表发送给群组协调器,群组协调器再把这些信息发送给所有消费者。

每个消费者只能看到自己的分区分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次分区再均衡时重复发生。消费者群组的群主应该保证在分配分区时,尽可能少的改变原有的分区和消费者的映射关系。

订阅主题 & 轮询

应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接收消息。

应用程序调用 kafkaConsumer 的 subscribe() 方法订阅主题:

  • 我们可以在调用 subscribe() 方法时传入一个主题列表作为参数。
  • 我们也可以在调用 subscribe() 方法时传入一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题了。


轮询

消费者通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括消费者群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。

轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了分区再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。

提交 & 偏移量

我们把更新分区当前位置的操作叫作提交。那么消费者是如何提交偏移量的呢?消费者往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。

如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发分区再均衡,完成分区再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

  • 如果消费者提交的偏移量 小于 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理
  • 如果消费者提交的偏移量 大于 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

所以,处理偏移量的方式对客户端会有很大的影响。KafkaConsumer API 提供了很多种方式来提交偏移量:自动提交偏移量、手动提交偏移量。

自动提交

如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交的时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。

与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否应该提交偏移量了,如果距离上次的提交时间已经超过了配置参数 auto.commit.interval.ms 指定的值,那么就会提交上一次轮询返回的偏移量。

在调用 close() 方法之前也会进行自动提交。


让消费者自动提交偏移量是最简单的方式。不过,在使用这种简便的方式之前,需要知道自动提交将会带来怎样的结果。

假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了分区再均衡,分区再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s 内消费者已经处理过的消息会再被重复处理。我们可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗口,不过这种情况是无法完全避免的。

手动提交

手动提交指的是,把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。应用程序可以使用 commitSync()、commitAsync() 方法手动提交偏移量

  • commitSync 同步提交偏移量:手动提交偏移量之后,同步等待 broker 响应。commitSync() 方法会提交由 poll() 方法返回的最新偏移量,只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败就抛出异常,我们也只能把异常记录到错误日志里。
  • commitAsync 异步提交偏移量:手动提交偏移量之后,不等待 broker 响应,而是在提交偏移量时指定一个回调方法,在 broker 作出响应时会执行这个回调方法。回调经常被用于记录提交错误或生成度量指标。在成功提交或碰到无怯恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会重试。

消费者也可以提交特定的偏移量
:消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map,这样我们就可以提交特定的偏移量。需要使用期望处理的下一个消息的偏移量更新 map 里的偏移量。

异步提交
:同步提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了分区再均衡,会增加重复消息的数量。这个时候我们可以使用异步提交,我们只管发送提交请求,无需等待 broker 的响应。

再均衡监听器

在【分区再均衡前后】、【消费者开始读取消息之前】、【消费者停止读取消息之后】我们可以通过消费者 API 执行一些应用程序代码,在调用 kafkaConsumer 的 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。

再均衡监听器 ConsumerRebalanceListener 有两个需要实现的方法。

  1. public void onPartitionsRevoked(Collection< TopicPartition > partitions):该方法会在【分区再均衡开始之前】和【消费者停止读取消息之后】被调用。我们可以在消费者失去分区所有权之前,通过 onPartitionsRevoked() 方法来提交偏移量。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取消息了。
  2. public void onPartitionsAssigned(Collection< TopicPartition > partitions):该方法会在【重新分配分区之后】和【消费者开始读取消息之前】被调用。我们可以在消费者获取分区所有权之后,通过 onPartitionsAssigned() 方法来指定读取消息的起始偏移量。保证消费者总是能够从正确的位置开始读取消息。

如何退出

如果消费者确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。

consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。

调用 consumer.wakeup() 可以退出 poll(),并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。

独立消费者

我们可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和分区再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。

如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。

独立消费者除了不会发生分区再均衡,也不需要手动查找分区,其他的看起来一切正常。不过要记住,如果主题增加了新的分区,消费者并不会收到通知。所以,要么周期性地调用 consumer.partitionsFor() 方法来检查是否有新分区加入,要么在添加新分区后重启应用程序。

public void singleCustomer() {
    // 向集群请求主题可用的分区。如果只打算读取特定分区,可以跳过这一步
    List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
    ArrayList<TopicPartition> partitions = new ArrayList<>();

    if (partitionInfos != null) {
        for (PartitionInfo partition : partitionInfos) {
            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        // 为自己分配分区
        consumer.assign(partitions);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}

消费者的示例代码

再均衡监听器

public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

    private KafkaConsumer consumer;

    public void MyConsumerRebalanceListener(KafkaConsumer consumer) {
        this.consumer = consumer;
    }

    public static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    /**
     * 在消费者失去分区所有权之前,提交偏移量
     *
     * @param partitions
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }

    /**
     * 在消费者获取分区所有权之后,指定读取消息的起始偏移量
     *
     * @param partitions
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        long offset = 0;
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offset);
        }
    }
}

消费者读取消息

public void customer() {
    consumer.subscribe(Collections.singletonList("MyTopic"), new MyConsumerRebalanceListener(consumer));

    // 如果不需要手动指定消费者读取消息的起始偏移量,下面的代码不是必须的
    consumer.poll(0);
    long offset = 0;
    for (TopicPartition partition : consumer.assignment()) {
        consumer.seek(partition, offset);
    }

    try {
        while (true) {
            // 参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。
            // 如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量、消息以及消息键。
                System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ", record.topic(), record.partition(), record.offset(), record.key(), record.value());

                // 将记录保存到数据存储系统里
                System.out.println(record);

                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
            }

            // 如果一切正常,我们使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。
            // 使用 commitAsync() 方法只会执行一次提交,不会重试
            consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if (e != null) {
                        log.error("Commit failed for offsets {}", map, e);
                    }
                }
            });
        }
    } catch (WakeupException e) {
        // 我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            // 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。
            consumer.commitSync();
        } finally {
            // 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭
            // 并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }
    }
}

参考资料

《Kafka 权威指南》第 4 章:Kafka 消费者——从 Kafka 读取数据