2022年8月

Bond简介

生产环境必须提供 7×24 小时的网络传输服务。借助于网卡绑定技术,不仅可以提高网络传输速度,更重要的是,还可以确保在其中一块网卡出现故障时,依然可以正常提供网络服务。假设我们对两块网卡实施了绑定技术,这样在正常工作中它们会共同传输数据,使得网络传输的速度变得更快;而且即使有一块网卡突然出现了故障,另外一块网卡便会立即自动顶替上去,保证数据传输不会中断。

Bond内核网卡绑定驱动模式

常用:
mode0(平衡负载模式):平时两块网卡均工作,且自动备援,但需要在与服务器本地网卡相连的交换机设备上进行端口聚合来支持绑定技术。
mode1(自动备援模式):平时只有一块网卡工作,在它故障后自动替换为另外的网卡。
mode6(平衡负载模式):平时两块网卡均工作,且自动备援,无须交换机设备提供辅助支持。

1.  mode=0  round-robin2.  mode=1  active-backup3.  mode=2 load balancing (xor)4.  mode=3  fault-tolerance (broadcast)5.  mode=4  lacp6.  mode=5  transmit load balancing7.  mode=6  adaptive load balancing

分别对应以下七种策略:(1)轮询策略(Round-robin policy),模式代号是0。该策略是按照设备顺序依次传输数据包,直到最后一个设备。这种模式提供负载均衡和容错能力。(2)活动备份策略(Active-backup policy),模式代号是1。该策略只有一个设备处理数据,当它宕机的时候就会由备份代替,仅提供容错能力。(3)异或策略(XOR policy),模式代号是2。该策略是根据MAC地址异或运算的结果来选择传输设备,提供负载均衡和容错能力。(4)广播策略(Broadcast policy),模式代号是3。该策略通过全部设备来传输所有数据,提供容错能力。(5)IEEE 802.3ad 动态链接聚合(IEEE 802.3ad Dynamic link aggregation),模式代号是4。该策略通过创建聚合组来共享相同的传输速度,需要交换机也支持 802.3ad 模式,提供容错能力。该模式下,网卡带宽最高可以翻倍(如从1Gbps翻到2Gbps)。(6)适配器传输负载均衡(Adaptive transmit load balancing),模式代号是5。该策略是根据当前的负载把发出的数据分给每一个设备,由当前使用的设备处理收到的数据,如果当前正用于接收数据的网卡发生故障,则由其它网卡接管,要求所用的网卡及网卡驱动可通过ethtool命令得到speed信息。本策略的通道联合不需要专用的交换机支持,提供负载均衡和容错能力。(7)适配器负载均衡(Adaptive load balancing),模式代号是6。该策略在IPV4情况下包含适配器传输负载均衡策略,由ARP协商完成接收的负载,通道联合驱动程序截获ARP在本地系统发送出的请求,用其中一个设备的硬件地址覆盖从属设备的原地址。即在策略6的基础之上,在接收数据的同时实现负载均衡,除要求ethtool命令可得到speed信息外,还要求支持对网卡MAC地址的动态修改功能。

注意:
Mode参数中的0、2、3、4模式要求交换机支持“ports group”功能并能进行相应的设置,例如在cisco中要将所连接的端口设为“port-channel”。
如果系统流量不超过单个网卡的带宽,请不要选择使用mode1之外的模式,因为负载均衡需要对流量进行计算,这对系统性能会有所损耗。
如果交换机及网卡都确认支持802.3ab,则实现负载均衡时尽量使用模式4以提高系统性能。
由于我们项目上的交换机型号不可控,且可能会更换,所以我们项目中设备绑定一般都使用mode5。

配置bond(单IP双网卡)

1、配置两块网卡配置信息

[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
TYPE=EthernetBOOTPROTO=none
ONBOOT=yes
DEVICE=ens33
MASTER=bond0
SLAVE=yes[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens37
TYPE=EthernetBOOTPROTO=none
ONBOOT=yes
DEVICE=ens37
MASTER=bond0
SLAVE=yes
2、配置bond网卡信息

[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-bond0 
TYPE=EthernetBOOTPROTO=none
ONBOOT=yes
DEVICE=bond0
IPV6INIT=no   # 关闭IPv6PEERDNS=yes   # 运行网卡在启动时向DHCP服务器查询DNS信息,并自动覆盖/etc/resolv.conf配置文件
IPADDR=192.168.1.210NETMASK=255.255.255.0GATEWAY=192.168.1.2DNS1=114.114.114.114DNS2=223.5.5.5
3、配置内核网卡驱动模式

[root@CentOS ~]# vim /etc/modprobe.d/bond.conf 
alias bond0 bonding
options bonding mode=6 miimon=200# miimon参数:指定网卡故障时切换时间间隔时间,以ms为单位# mode参数:bonding模式
4、重启网络

[root@CentOS ~]# systemctl restart network

配置bond(双IP双网卡,不同VLAN)

1、配置两块网卡配置信息

[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
TYPE=EthernetPROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=no
DEFROUTE=yes
NAME=ens33
DEVICE=ens33
ONBOOT=yes
MASTER=bond0
SLAVE=yes[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens37
TYPE=EthernetPROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=no
DEFROUTE=yes
NAME=ens37
DEVICE=ens37
ONBOOT=yes
MASTER=bond0
SLAVE=yes
2、配置bond网卡信息

[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-bond0 
DEVICE=bond0
BOOTPROTO=none
IPV6INIT=no
NM_CONTROLLED=no
ONBOOT=yes
TYPE=EthernetBONDING_OPTS="mode=4 miimon=100"    # 绑定模式选择
3、配置bond子网卡信息

[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-bond0.30DEVICE=bond0.30BOOTPROTO=none
NM_CONTROLLED=no
ONBOOT=yes
IPADDR=10.0.0.102NETMASK=255.255.255.0GATEWAY=10.0.0.1VLAN=yes
DNS1=223.5.5.5DNS2=114.114.114.114[root@CentOS ~]# vim /etc/sysconfig/network-scripts/ifcfg-bond0.51DEVICE=bond0.51BOOTPROTO=none
NM_CONTROLLED=no
ONBOOT=yes
IPADDR=157.0.0.202NETMASK=255.255.255.0GATEWAY=157.0.0.1VLAN=yes
DNS1=223.5.5.5DNS2=114.114.114.114
4、重启网络

[root@CentOS ~]# systemctl restart network



作者:运维猿Winter
链接:https://www.jianshu.com/p/2d5773b8e981
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


一、引子

上一篇文章,我们讲述了:《MySQL 如何保证数据不丢失?》,介绍了 binlogredo log 的工作流程。
那么,MySQL 怎么保证高可用呢?
为了提高 MySQL 的读写性能,我们往往采用 MySQL 一主多从的方案。
即一个主库(主要负责写),多个从库(只负责读)。
因为单实例有性能瓶颈,多从库能优先解决 MySQL 的读负载压力。

二、主从同步

主从同步(简化)

原理:

MySQL 设计成一主多从模式。

简单来说,主要分为三步:

  • 第一步:所有增删改的 DML 语句都在 master 节点的示例上完成。

  • 第二步:将处理完成的 binlog 日志传输到各个 slave 节点。

  • 第三步:多个 slave 节点处理 binlog,从而保持主从一致。

详细来说,

主从同步(详细)

MasterSlave 之间会维护一个长连接,专门用来同步binlog

创建从库的过程:

  1. Slave 机器上,通过 change master 命令,设置主库的 IP、端口号、用户名、密码,以及binlog 从哪里开始获取等信息(具体binlog文件名 + 文件偏移量)。

  2. Slave 机器上,执行start slave命令,启动 io_threadsql_thread 线程。
    其中 io_thread 用于接收主库的 binlogsql_thread 用于处理主库的 binlog

  3. Slave 开始尝试连接 MasterMaster 校验完用户名密码后,dump_thread 根据 Slave 设置的 binlog 文件和偏移量,开始读取 binlog 发送给 Slave

  4. Slaveio_thread 将接收到的 binlog 写到 relay log (中转日志)。

  5. sql_thread 读取中转日志,执行对应SQL,同步完成。

问题:

1. 主从延迟

即“同步延迟”。
表示同一个事务下,主库执行完成到备库执行完成的时间差值。

主从延迟时间

时间线:

  1. Master 执行一个事务,成功写入binlog —— 这个时刻,我们记为 T1

  2. Slaveio_thread 接收到binlog —— 这个时刻,我们记为 T2

  3. Slave执行完这个事务。—— 这个时刻,我们记为 T3

所谓主从延迟,就是 T3-T1 的时间。

如果在这段时间里,在从库上查询主库刚插入/修改的数据,会出现主从不一致的现象。
这时,一些对可靠性要求比较高的业务场景里,就会出现错误。
我们可以在从库上执行:

show slave status;

其中,seconds_behind_master 就是从库延迟的时间(T3-T1

主从延迟的根本原因是:从库消费中转日志(relay log)的速度比主库生产 binlog 的速度慢。

2. 主从切换

在实际场景下,可能会遇到主库所在机器异常、掉电、或者机房升级等等。
这就会涉及到“主库”与“从库”之间的切换问题。
由于主从延迟的存在,在主从切换的时候,就会有不同的策略。

主从切换

可靠性优先策略(推荐):

  1. 查询 slaveseconds_behind_master,如果小于预定的某个值(比如3秒),就下一步。
    否则就一直轮训,直到出现满足条件的Slave。(选未来主库)

  2. masterreadonly = true,降为从库。

  3. 查询该 slave(未来主库) 的 seconds_behind_master 值变成 0。(即无主从延迟)

  4. 将该 slave (未来主库)的状态变成读写。readonly = false,升成主库。

  5. 将请求流量切到新主库。

  • 优点:可靠性高,数据可靠。

  • 缺点:会有一小段不可用的时间。

因此,得选择 seconds_behond_master 比较短的 slavemaster

可用性优先策略:

  1. 直接将 slave (未来主库)的状态变成读写。readonly = false,升成主库。

  2. 将请求流量切到新主库。

  3. 将老主库的 readonly = true,降为从库。

  • 优点:可用性高,没有真空期。

  • 缺点:可能会出现数据不一致的情况。

三、如何保证高可用

MySQL 如果要保证高可用,就要满足三个条件。

  1. 数据不丢失。(双1策略)

  2. 主从最终一致性。(主库所有binlog,备库都执行了)

  3. 无主从延迟。

主从延迟的来源:

1. Slave 所在机器性能问题。(部署在同一机器上)

我就遇到过这种 case:
我们的数据库和飞书的数据库部署在同一个机器上,
他们在大量的做一些DML操作,删除/归档很多老数据。
导致于我们的Slave资源被一直抢占,进而出现主从延迟。

解决思路:

  1. 如果成本允许,按服务,分开独立部署。

2. Slave 压力大,查询耗费了大量CPU资源,影响了同步速度。

这种也比较常见,表/索引设计不合理、或者有临时任务在拖库,导致慢慢查询,耗费了大量CPU资源。导致 io_threadsql_thread 抢占不到资源进而同步缓慢。

解决思路:
1.优化表设计、索引设计。解决慢 SQL 问题。
2.增加从库,分担现有从库的压力。
3.对于一些临时/定时任务:可用 Binlog -> Hadoop。转移让另外一个系统来提供查询能力。

3. 大事务

这种也比较好理解,主库上执行一个大事务花了n分钟,那么大概率就会导致从库延迟n分钟。
比如,磁盘空间快满了,需要归档一些历史数据,需要一次性删除大量历史数据。这时候和就会出现主从延迟。

解决思路:
1.业务允许的话,控制每个事务的数据量,分成多次操作。



作者:齐舞647
链接:https://www.jianshu.com/p/4f640003027e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


一、前言:

  • Socket的使用在 Android网络编程中非常重要

  • 今天我将带大家全面了解  Socket 及 其使用方法

    目录.png

二、详解:

1、网络基础

2、Socket定义

  • 即套接字,是应用层 与 TCP/IP 协议族通信的中间软件抽象层,表现为一个封装了 TCP / IP协议族 的编程接口(API)

    图片.png

1、Socket不是一种协议,而是一个编程调用接口(API),属于传输层(主要解决数据如何在网络中传输);
2、即:通过Socket,我们才能在Andorid平台上通过 TCP/IP协议进行开发;
3、对用户来说,只需调用Socket去组织数据,以符合指定的协议,即可通信;

  • 成对出现,一对套接字:

Socket ={(IP地址1:PORT端口号),(IP地址2:PORT端口号)}
  • 一个 Socket 实例 唯一代表一个主机上的一个应用程序的通信链路

3、建立Socket连接过程

图片.png

4、原理

Socket的使用类型主要有两种:

  • 流套接字(streamsocket) :基于 TCP协议,采用 流的方式 提供可靠的字节流服务

  • 数据报套接字(datagramsocket):基于 UDP协议,采用 数据报文 提供数据打包发送的服务

具体原理图如下:

图片.png

5、Socket 与 Http 对比

  • Socket属于传输层,因为 TCP / IP协议属于传输层,解决的是数据如何在网络中传输的问题

  • HTTP协议属于 应用层,解决的是如何包装数据

由于二者不属于同一层面,所以本来是没有可比性的。但随着发展,默认的Http里封装了下面几层的使用,所以才会出现Socket & HTTP协议的对比:(主要是工作方式的不同):

  • Http:采用 请求—响应 方式。

1、即建立网络连接后,当 客户端 向 服务器 发送请求后,服务器端才能向客户端返回数据。
2、可理解为:是客户端有需要才进行通信

  • Socket:采用 服务器主动发送数据 的方式

1、即建立网络连接后,服务器可主动发送消息给客户端,而不需要由客户端向服务器发送请求
2、可理解为:是服务器端有需要才进行通信

6、使用步骤

  • Socket可基于TCP或者UDP协议,但TCP更加常用

  • 所以下面的使用步骤 & 实例的Socket将基于TCP协议

// 步骤1:创建客户端 & 服务器的连接

   // 创建Socket对象 & 指定服务端的IP及端口号 
   Socket socket = new Socket("192.168.1.32", 1989);  

   // 判断客户端和服务器是否连接成功  
   socket.isConnected());

                    // 步骤2:客户端 & 服务器 通信// 通信包括:客户端 接收服务器的数据 & 发送数据 到 服务器

   <-- 操作1:接收服务器的数据 -->
       
           // 步骤1:创建输入流对象InputStream
           InputStream is = socket.getInputStream() 

           // 步骤2:创建输入流读取器对象 并传入输入流对象
           // 该对象作用:获取服务器返回的数据
           InputStreamReader isr = new InputStreamReader(is);
           BufferedReader br = new BufferedReader(isr);

           // 步骤3:通过输入流读取器对象 接收服务器发送过来的数据
           br.readLine();   <-- 操作2:发送数据 到 服务器 -->                  

           // 步骤1:从Socket 获得输出流对象OutputStream
           // 该对象作用:发送数据
           OutputStream outputStream = socket.getOutputStream(); 

           // 步骤2:写入需要发送的数据到输出流对象中
           outputStream.write(("Carson_Ho"+"\n").getBytes("utf-8"));           // 特别注意:数据的结尾加上换行符才可让服务器端的readline()停止阻塞

           // 步骤3:发送数据到服务端 
           outputStream.flush();  // 步骤3:断开客户端 & 服务器 连接

            os.close();
           // 断开 客户端发送到服务器 的连接,即关闭输出流对象OutputStream

           br.close();
           // 断开 服务器发送到客户端 的连接,即关闭输入流读取器对象BufferedReader

           socket.close();
           // 最终关闭整个Socket连接

三、具体实例

  • 实例 Demo 代码包括:客户端 & 服务器

  • 本文着重讲解客户端,服务器仅采用最简单的写法进行展示

1、客户端 实现

步骤1:加入网络权限

<uses-permission android:name="android.permission.INTERNET" />

步骤2:主布局界面设置

包括创建Socket连接、客户端 & 服务器通信的按钮

    <Button
        android:id="@+id/connect"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:text="connect" />

    <Button
        android:id="@+id/disconnect"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:text="disconnect" />

    <TextView
        android:id="@+id/receive_message"
        android:layout_width="match_parent"
        android:layout_height="wrap_content" />

    <Button
        android:id="@+id/Receive"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:text="Receive from message" />

    <EditText
        android:id="@+id/edit"
        android:layout_width="match_parent"
        android:layout_height="wrap_content" />

    <Button
        android:id="@+id/send"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:text="send"/>

步骤3:创建Socket连接、客户端 & 服务器通信

MainActivity.java

package scut.carson_ho.socket_carson;import android.os.Bundle;import android.os.Handler;import android.os.Message;import android.support.v7.app.AppCompatActivity;import android.view.View;import android.widget.Button;import android.widget.EditText;import android.widget.TextView;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.OutputStream;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class MainActivity extends AppCompatActivity {

    /**
     * 主 变量
     */

    // 主线程Handler
    // 用于将从服务器获取的消息显示出来
    private Handler mMainHandler;

    // Socket变量
    private Socket socket;

    // 线程池
    // 为了方便展示,此处直接采用线程池进行线程管理,而没有一个个开线程
    private ExecutorService mThreadPool;

    /**
     * 接收服务器消息 变量
     */
    // 输入流对象
    InputStream is;

    // 输入流读取器对象
    InputStreamReader isr ;
    BufferedReader br ;

    // 接收服务器发送过来的消息
    String response;


    /**
     * 发送消息到服务器 变量
     */
    // 输出流对象
    OutputStream outputStream;

    /**
     * 按钮 变量
     */

    // 连接 断开连接 发送数据到服务器 的按钮变量
    private Button btnConnect, btnDisconnect, btnSend;

    // 显示接收服务器消息 按钮
    private TextView Receive,receive_message;

    // 输入需要发送的消息 输入框
    private EditText mEdit;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        /**
         * 初始化操作
         */

        // 初始化所有按钮
        btnConnect = (Button) findViewById(R.id.connect);
        btnDisconnect = (Button) findViewById(R.id.disconnect);
        btnSend = (Button) findViewById(R.id.send);
        mEdit = (EditText) findViewById(R.id.edit);
        receive_message = (TextView) findViewById(R.id.receive_message);
        Receive = (Button) findViewById(R.id.Receive);

        // 初始化线程池
        mThreadPool = Executors.newCachedThreadPool();


        // 实例化主线程,用于更新接收过来的消息
        mMainHandler = new Handler() {
            @Override
            public void handleMessage(Message msg) {
                switch (msg.what) {
                    case 0:
                        receive_message.setText(response);
                        break;
                }
            }
        };


        /**
         * 创建客户端 & 服务器的连接
         */
        btnConnect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {

                // 利用线程池直接开启一个线程 & 执行该线程
                mThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {

                        try {

                            // 创建Socket对象 & 指定服务端的IP 及 端口号
                            socket = new Socket("192.168.1.172", 8989);

                            // 判断客户端和服务器是否连接成功
                            System.out.println(socket.isConnected());

                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                });

            }
        });

        /**
         * 接收 服务器消息
         */
        Receive.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {

                // 利用线程池直接开启一个线程 & 执行该线程
                mThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {

                          try {
                            // 步骤1:创建输入流对象InputStream
                            is = socket.getInputStream();

                              // 步骤2:创建输入流读取器对象 并传入输入流对象
                              // 该对象作用:获取服务器返回的数据
                              isr = new InputStreamReader(is);
                              br = new BufferedReader(isr);

                              // 步骤3:通过输入流读取器对象 接收服务器发送过来的数据
                              response = br.readLine();

                              // 步骤4:通知主线程,将接收的消息显示到界面
                              Message msg = Message.obtain();
                              msg.what = 0;
                              mMainHandler.sendMessage(msg);

                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                });

            }
        });


        /**
         * 发送消息 给 服务器
         */
        btnSend.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {

                // 利用线程池直接开启一个线程 & 执行该线程
                mThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {

                        try {
                            // 步骤1:从Socket 获得输出流对象OutputStream
                            // 该对象作用:发送数据
                            outputStream = socket.getOutputStream();

                            // 步骤2:写入需要发送的数据到输出流对象中
                            outputStream.write((mEdit.getText().toString()+"\n").getBytes("utf-8"));
                            // 特别注意:数据的结尾加上换行符才可让服务器端的readline()停止阻塞

                            // 步骤3:发送数据到服务端
                            outputStream.flush();

                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                });

            }
        });


        /**
         * 断开客户端 & 服务器的连接
         */
        btnDisconnect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {

                try {
                    // 断开 客户端发送到服务器 的连接,即关闭输出流对象OutputStream
                    outputStream.close();

                    // 断开 服务器发送到客户端 的连接,即关闭输入流读取器对象BufferedReader
                    br.close();

                    // 最终关闭整个Socket连接
                    socket.close();

                    // 判断客户端和服务器是否已经断开连接
                    System.out.println(socket.isConnected());

                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        });


    }}

2、服务器 实现

  • 因本文主要讲解客户端,所以服务器仅仅是为了配合客户端展示;

  • 为了简化服务器使用,此处采用Mina框架

  1. 服务器代码请在eclipse平台运行

  2. 按照我的步骤一步步实现就可以无脑运行了

步骤1:导入Mina

请直接移步到百度网盘:下载链接(密码: q73e)

图片.png

步骤2:创建服务器线程
TestHandler.java

package mina;// 导入包public class TestHandler extends IoHandlerAdapter {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        System.out.println("exceptionCaught: " + cause);
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        System.out.println("recieve : " + (String) message);
        session.write("hello I am server");
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {

    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("sessionClosed");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        System.out.println("sessionOpen");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    }}

步骤3:创建服务器主代码
TestHandler.java

package mina;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.textline.TextLineCodecFactory;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class TestServer {
    public static void main(String[] args) {
        NioSocketAcceptor acceptor = null;
        try {
            acceptor = new NioSocketAcceptor();
            acceptor.setHandler(new TestHandler());
            acceptor.getFilterChain().addLast("mFilter", new ProtocolCodecFilter(new TextLineCodecFactory()));
            acceptor.setReuseAddress(true);
            acceptor.bind(new InetSocketAddress(8989));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }}

至此,客户端 & 服务器的代码均实现完毕。

3、测试结果

  • 点击 Connect按钮: 连接成功


    图片.png

  • 输入发送的消息,点击 Send 按钮发送

图片.png

  • 服务器接收到客户端发送的消息

图片.png

  • 点击 Receive From Message按钮,客户端 读取 服务器返回的消息


    图片.png

  • 点击 DisConnect按钮,断开 客户端 & 服务器的连接

图片.png

图片.png

4、源码地址

Carson_Ho的Github地址:Socket具体实例



作者:北风吹过
链接:https://www.jianshu.com/p/964d8a955a21
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


大家好,今天我们来聊一个比较实用的话题,动态可监控的线程池实践,全新开源项目(DynamicTp)地址在下方,欢迎star交流学习。


项目地址

gitee地址https://gitee.com/yanhom/dynamic-tp

github地址https://github.com/lyh200/dynamic-tp


系列文章

动态线程池框架(DynamicTp),监控及源码解析篇

动态线程池(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇


写在前面

稍微有些Java编程经验的小伙伴都知道,Java的精髓在juc包,这是大名鼎鼎的Doug Lea老爷
子的杰作,评价一个程序员Java水平怎么样,一定程度上看他对juc包下的一些技术掌握的怎么样,这也是面试中的基本上必问的一些技术点之一。

juc包主要包括:

1.原子类(AtomicXXX)

2.锁类(XXXLock)

3.线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)

4.任务执行器类(Executor体系类,包括今天的主角ThreadPoolExecutor)

5.并发集合类(ConcurrentXXX、CopyOnWriteXXX)相关集合类

6.阻塞队列类(BlockingQueue继承体系类)

7.Future相关类

8.其他一些辅助工具类

多线程编程场景下,这些类都是必备技能,会这些可以帮助我们写出高质量、高性能、少bug的代码,同时这些也是Java中比较难啃的一些技术,需要持之以恒,学以致用,在使用中感受他们带来的奥妙。

上边简单罗列了下juc包下功能分类,这篇文章我们主要来介绍动态可监控线程池的,所以具体内容也就不展开讲了,以后有时间单独来聊吧。看这篇文章前,希望读者最好有一定的线程池ThreadPoolExecutor使用经验,不然看起来会有点懵。

如果你对ThreadPoolExecutor不是很熟悉,推荐阅读下面两篇文章

javadoop: https://www.javadoop.com/post/java-thread-pool

美团技术博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html


背景

使用ThreadPoolExecutor过程中你是否有以下痛点呢?

1.代码中创建了一个ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适

2.凭经验设置参数值,上线后发现需要调整,改代码重启服务,非常麻烦

3.线程池相对开发人员来说是个黑盒,运行情况不能感知到,直到出现问题

如果你有以上痛点,这篇文章要介绍的动态可监控线程池(DynamicTp)或许能帮助到你。

如果看过ThreadPoolExecutor的源码,大概可以知道其实它有提供一些set方法,可以在运行时动态去修改相应的值,这些方法有:

public void setCorePoolSize(int corePoolSize);public void setMaximumPoolSize(int maximumPoolSize);public void setKeepAliveTime(long time, TimeUnit unit);public void setThreadFactory(ThreadFactory threadFactory);public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。

综上,我们总结出以下的背景

  • 广泛性:在Java开发中,想要提高系统性能,线程池已经是一个90%以上的人都会选择使用的基础工具

  • 不确定性:项目中可能会创建很多线程池,既有IO密集型的,也有CPU密集型的,但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数

  • 无感知性,线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理

  • 高可用性,配置变更需要及时推送到客户端;需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以大幅度减少开发量及接入难度


简介

我们基于配置中心对线程池ThreadPoolExecutor做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包括(队列容量、线程池活性、拒绝触发等);同时也会定时采集线程池指标数据供监控平台可视化使用。使我们能时刻感知到线程池的负载,根据情况及时调整,避免出现问题影响线上业务。

    |  __ \                            (_) |__   __|
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool ::

特性

  • 参考美团线程池实践 ,对线程池参数动态化管理,增加监控、报警功能

  • 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入starter即可食用

  • 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,默认支持Nacos、Apollo,同时也提供SPI接口可自定义扩展实现

  • 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警),默认支持企业微信、钉钉报警,同时提供SPI接口可自定义扩展实现

  • 内置线程池指标采集功能,支持通过MicroMeter、JsonLog日志输出、Endpoint三种方式,可通过SPI接口自定义扩展实现

  • 集成管理常用第三方组件的线程池,已集成SpringBoot内置WebServer(Tomcat、Undertow、Jetty)的线程池管理


架构设计

主要分四大模块

  • 配置变更监听模块:

    1.监听特定配置中心的指定配置文件(默认实现Nacos、Apollo),可通过内部提供的SPI接口扩展其他实现

    2.解析配置文件内容,内置实现yml、properties配置文件的解析,可通过内部提供的SPI接口扩展其他实现

    3.通知线程池管理模块实现刷新

  • 线程池管理模块:

    1.服务启动时从配置中心拉取配置信息,生成线程池实例注册到内部线程池注册中心中

    2.监听模块监听到配置变更时,将变更信息传递给管理模块,实现线程池参数的刷新

    3.代码中通过getExecutor()方法根据线程池名称来获取线程池对象实例

  • 监控模块:

    实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的SPI接口扩展其他实现

    1.默认实现Json log输出到磁盘

    2.MicroMeter采集,引入MicroMeter相关依赖

    3.暴雷Endpoint端点,可通过http方式访问

  • 通知告警模块:

    对接办公平台,实现通告告警功能,默认实现钉钉、企微,可通过内部提供的SPI接口扩展其他实现,通知告警类型如下

    1.线程池参数变更通知

    2.阻塞队列容量达到设置阈值告警

    3.线程池活性达到设置阈值告警

    4.触发拒绝策略告警

image


使用

  • maven依赖

  1. apollo应用用接入用此依赖

        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
            <version>1.0.2</version>
        </dependency>
  2. spring-cloud场景下的nacos应用接入用此依赖

        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
            <version>1.0.2</version>
        </dependency>
  3. 非spring-cloud场景下的nacos应用接入用此依赖

        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
            <version>1.0.2</version>
        </dependency>
  • 线程池配置

    spring:
      dynamic:
        tp:
          enabled: true
          enabledBanner: true        # 是否开启banner打印,默认true
          enabledCollect: false      # 是否开启监控指标采集,默认false
          collectorType: logging     # 监控数据采集器类型(JsonLog | MicroMeter),默认logging
          logPath: /home/logs        # 监控日志数据路径,默认${user.home}/logs
          monitorInterval: 5         # 监控时间间隔(报警判断、指标采集),默认5s
          nacos:                     # nacos配置,不配置有默认值(规则name-dev.yml这样)
            dataId: dynamic-tp-demo-dev.yml        group: DEFAULT_GROUP      apollo:                    # apollo配置,不配置默认拿apollo配置第一个namespace
            namespace: dynamic-tp-demo-dev.yml      configType: yml            # 配置文件类型
          platforms:                 # 通知报警平台配置
            - platform: wechat          urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替换
              receivers: test1,test2                   # 接受人企微名称
            - platform: ding          urlKey: f80dad441fcd655438f4a08dcd6a     # 替换
              secret: SECb5441fa6f375d5b9d21           # 替换,非sign模式可以没有此值
              receivers: 15810119805                   # 钉钉账号手机号    
          tomcatTp:                                    # tomcat web server线程池配置
              minSpare: 100
              max: 400      
          jettyTp:                                     # jetty web server线程池配置
              min: 100
              max: 400     
          undertowTp:                                  # undertow web server线程池配置
              ioThreads: 100
              workerThreads: 400      
          executors:                                   # 动态线程池配置
            - threadPoolName: dynamic-tp-test-1
              corePoolSize: 6
              maximumPoolSize: 8
              queueCapacity: 200
              queueType: VariableLinkedBlockingQueue   # 任务队列,查看源码QueueTypeEnum枚举类
              rejectedHandlerType: CallerRunsPolicy    # 拒绝策略,查看RejectedTypeEnum枚举类
              keepAliveTime: 50
              allowCoreThreadTimeOut: false
              threadNamePrefix: test           # 线程名前缀
              notifyItems:                     # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
                - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类
                  enabled: true
                  threshold: 80                # 报警阈值
                  platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台
                  interval: 120                # 报警间隔(单位:s)
                - type: change              enabled: true
                - type: liveness              enabled: true
                  threshold: 80
                - type: reject              enabled: true
                  threshold: 1
  • 代码方式生成,服务启动会自动注册

    @Configurationpublic class DtpConfig {
    
       @Bean
       public DtpExecutor demo1Executor() {
           return DtpCreator.createDynamicFast("demo1-executor");
      }
    
       @Bean
       public ThreadPoolExecutor demo2Executor() {
           return ThreadPoolBuilder.newBuilder()
                  .threadPoolName("demo2-executor")
                  .corePoolSize(8)
                  .maximumPoolSize(16)
                  .keepAliveTime(50)
                  .allowCoreThreadTimeOut(true)
                  .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                  .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                  .buildDynamic();
      }}
  • 代码调用,根据线程池名称获取

    public static void main(String[] args) {
           DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
           dtpExecutor.execute(() -> System.out.println("test"));}


注意事项

  1. 配置文件配置的参数会覆盖通过代码生成方式配置的参数

  2. 阻塞队列只有VariableLinkedBlockingQueue类型可以修改capacity,该类型功能和LinkedBlockingQueue相似,只是capacity不是final类型,可以修改,
    VariableLinkedBlockingQueue参考RabbitMq的实现

  3. 启动看到如下日志输出证明接入成功

    |  __ \                            (_) |__   __|   
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 
    
    DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
  1. 配置变更会推送通知消息,且会高亮变更的字段

    DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]


通知报警

触发报警阈值会推送相应报警消息,且会高亮显示相关字段,活性告警、容量告警、拒绝告警

image

配置变更会推送通知消息,且会高亮变更的字段

image


监控日志

通过主配置文件collectType属性配置指标采集类型,默认值:logging

  • micrometer方式:通过引入micrometer相关依赖采集到相应的平台
    (如Prometheus,InfluxDb...)

  • logging:指标数据以json格式输出日志到磁盘,地址{logPath}/ dynamictp/{appName}.monitor.log

    2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
  • 暴露EndPoint端点(dynamic-tp),可以通过http方式请求

    [
        {
            "dtp_name": "remoting-call",
            "core_pool_size": 8,
            "maximum_pool_size": 16,
            "queue_type": "SynchronousQueue",
            "queue_capacity": 0,
            "queue_size": 0,
            "fair": false,
            "queue_remaining_capacity": 0,
            "active_count": 2,
            "task_count": 2760,
            "completed_task_count": 2760,
            "largest_pool_size": 16,
            "pool_size": 8,
            "wait_task_count": 0,
            "reject_count": 12462,
            "reject_handler_name": "CallerRunsPolicy"
        },
        {
            "max_memory": "220 MB",
            "total_memory": "140 MB",
            "free_memory": "44 MB",
            "usable_memory": "125 MB"
        }]



作者:CodeFox
链接:https://www.jianshu.com/p/e5c931119f5a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


1 启用Hyper-V功能

打开设置-应用,开启Hyper-V


image.png

2 下载安装WindowsSubsystemForAndroid

下载
打开网站 https://store.rg-adguard.net/,在搜索框中输入https://www.microsoft.com/store/productid/9p3395vx91nr,下载下图两个文件:

image.png


安装
windows搜索框搜索powershell,右键以管理员身份运行

image.png


进入到文件下载目录中,依次执行下面两条命令


Add-AppxPackage "Microsoft.UI.Xaml.2.6_2.62112.3002.0_x64__8wekyb3d8bbwe.Appx"

Add-AppxPackage "MicrosoftCorporationII.WindowsSubsystemForAndroid_2204.40000.19.0_neutral___8wekyb3d8bbwe.Msixbundle"

配置
打开安装好的WindowsSubsystemForAndroid

image.png


打开开发人员模式,然后点击刷新按钮,记录红框中的IP+端口号

image.png


3 安装配置ADB(安卓命令行调试工具)

下载&解压到磁盘
下载链接:https://www.jianeryi.com/download?post=1346
解压到磁盘上记住解压位置路径

配置环境变量
打开系统-系统信息-高级设置-环境变量

image.png


新建环境变量adb

image.png


在Path中添加环境变量%adb%

image.png


保存后关闭配置页面。打开命令行工具输入adb -version会打印出adb的版本信息。


Android Debug Bridge version 1.0.41
Version 31.0.3-7562133

4 安装安卓程序

打开cmd输入adb connect 127.0.0.1:58526连接安卓设备,127.0.0.1:58526是前面打开WindowsSubsystemForAndroid开发人员模式那一步得到的IP和端口号。

> adb connect 127.0.0.1:58526
connected to 127.0.0.1:58526

安装一个腾讯TIM

> adb install tim.apk

打开安装的apk文件:



作者:茂茂的小破号
链接:https://www.jianshu.com/p/b899bd053a38
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。