2024年3月


引言


大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第十一篇内容:AQS( AbstractQueuedSynchronizer )。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!!

在现代多核CPU环境中,多线程编程已成为提升系统性能和并发处理能力的关键手段。然而,当多个线程共享同一资源或访问临界区时,如何有效地控制线程间的执行顺序以保证数据一致性及避免竞态条件变得至关重要。Java平台为解决这些问题提供了多种同步机制,如synchronized关键字、volatile变量以及更加灵活且功能强大的并发工具类库——java.util.concurrent包。

在这一庞大的并发工具箱中,AbstractQueuedSynchronizer(简称AQS)扮演了核心角色。作为Java并发框架中的基石,AQS是一个高度抽象的底层同步器,它不仅被广泛应用于诸如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等标准同步组件,还为开发者提供了一种便捷的方式来构建符合特定需求的自定义同步器。

AQS的设计理念是基于模板方法模式,通过封装复杂的同步状态管理和线程排队逻辑,使得子类只需关注并实现资源获取与释放的核心算法即可。它使用一个名为
state
的volatile变量来表示同步状态,并借助于FIFO双端队列结构来管理等待获取资源的线程。AQS内部维护的Node节点不仅包含了每个等待线程的信息,而且还通过waitStatus标志位巧妙地实现了独占式和共享式的两种资源共享模式。

例如,在ReentrantLock中,AQS负责记录当前持有锁的线程重入次数,而当线程尝试获取但无法立即获得锁时,会将该线程包装成Node节点并安全地插入到等待队列中。随后,线程会被优雅地阻塞,直至锁被释放或者其在等待队列中的位置变为可以获取资源的状态。这个过程涉及到一系列精心设计的方法调用,如tryAcquire(int)、acquireQueued(Node, int)和release(int)等。

// 示例代码:ReentrantLock基于AQS的简单应用
import java.util.concurrent.locks.ReentrantLock;

public class AQSExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void criticalSection() {
        lock.lock(); // 调用lock()即尝试获取AQS的资源

        try {
            // 临界区代码
            System.out.println("Thread " + Thread.currentThread().getName() + " is executing critical section.");
        } finally {
            lock.unlock(); // 释放资源
        }
    }

    public static void main(String[] args) {
        AQSExample example = new AQSExample();
        Thread t1 = new Thread(example::criticalSection, "Thread-1");
        Thread t2 = new Thread(example::criticalSection, "Thread-2");

        t1.start();
        t2.start();
    }
}

在这个简单的示例中,我们创建了一个ReentrantLock实例并在两个线程中分别调用lock方法进入临界区。如果第一个线程已经占有锁,第二个线程将会进入等待队列,直到锁被释放。这背后的机制正是由AQS提供的强大同步支持所驱动的。通过对AQS的深入探讨,读者将能更好地理解这些高级同步工具的内部工作原理,从而更高效地进行并发编程实践。


AQS简介


在Java多线程编程中,AbstractQueuedSynchronizer(简称AQS)作为J.U.C包下的一款核心同步框架,扮演了构建高效并发锁和同步器的重要角色。AQS的设计理念与实现机制极大地简化了开发人员创建自定义同步组件的工作量,同时提供了强大的底层支持以满足多样化的并发控制需求。

队列管理:
从数据结构层面看,AQS内部维护了一个基于先进先出(FIFO)原则的双端队列。该队列并非直接存储线程对象,而是使用Node节点表示等待资源的线程,并通过volatile变量state记录当前资源的状态。AQS利用两个指针head和tail精确地跟踪队列的首尾位置,确保线程在无法立即获取资源时能够安全且有序地进入等待状态。

同步功能:
AQS不仅实现了对资源的原子操作,例如通过
getState()

setState()
以及基于Unsafe的
compareAndSetState()
方法保证资源状态更新的原子性和可见性,还提供了线程排队和阻塞机制,包括线程等待队列的维护、入队与出队的逻辑,以及线程在资源未得到时如何正确地挂起和唤醒等核心功能。

应用实例:
AQS的强大之处在于它支撑了许多常见的并发工具类,诸如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock以及SynchronousQueue等,这些同步工具均是建立在AQS基础之上的,有效地解决了多线程环境下的互斥访问、信号量控制、倒计数等待、读写分离等多种同步问题。

下面是一个简单的代码示例,展示了如何使用基于AQS实现的ReentrantLock进行线程同步:

import java.util.concurrent.locks.ReentrantLock;

public class AQSExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void criticalSection() {
        lock.lock(); // 调用lock()方法尝试获取AQS管理的资源

        try {
            // 执行临界区代码
            System.out.println("Thread " + Thread.currentThread().getName() + " is in the critical section.");
        } finally {
            lock.unlock(); // 在finally块中确保资源始终会被释放
        }
    }

    public static void main(String[] args) {
        AQSExample example = new AQSExample();
        Thread t1 = new Thread(example::criticalSection, "Thread-1");
        Thread t2 = new Thread(example::criticalSection, "Thread-2");

        t1.start();
        t2.start();
    }
}

在这个例子中,当一个线程调用lock方法并成功获取到资源(即获得锁)时,另一个线程必须等待直至锁被释放。这一过程正是通过AQS所维护的线程等待队列和相应的同步算法得以实现的。此外,AQS也支持资源共享的两种模式,即独占模式(一次只有一个线程能获取资源)和共享模式(允许多个线程同时获取资源但数量有限制),并且灵活地支持可中断的资源请求操作,为复杂多样的并发场景提供了一站式的解决方案。


AQS的数据结构


在Java多线程编程中,AbstractQueuedSynchronizer(AQS)的数据结构设计是其高效实现同步功能的关键。AQS的核心数据结构主要包括以下几个部分:

volatile变量state

AQS内部维护了一个名为
state
的volatile整型变量,用于表示共享资源的状态。该状态值可以用来反映资源的数量、锁的持有状态等信息,具体含义由基于AQS构建的具体同步组件定义。由于state是volatile修饰的,因此确保了对它的修改能被其他线程及时看到,实现了跨线程的内存可见性。

protected volatile int state;

Node双端队列

AQS使用一个FIFO(先进先出)的双端队列来存储等待获取资源的线程。这里的节点并非直接存储线程对象,而是封装为
Node
类的对象,每个Node代表一个等待线程,并通过
prev

next
指针形成链表结构。头尾指针
head

tail
分别指向队列的首尾结点,便于进行快速插入和移除操作。

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    // 其他成员方法及属性...
}

waitStatus标志位

每个Node节点都有一个
waitStatus
字段,它是一个int类型的volatile变量,用以标识当前节点所对应的线程等待状态。例如,
CANCELLED
表示线程已经被取消,
SIGNAL
表示后继节点的线程需要被唤醒,
CONDITION
则表示线程在条件队列中等待某个条件满足,还有如
PROPAGATE
这样的状态值用于共享模式下的资源传播。

线程调度逻辑

当线程尝试获取资源失败时,会创建一个Node节点并将当前线程包装进去,然后利用CAS算法将其安全地加入到等待队列的尾部。而在释放资源时,AQS会根据资源管理策略从队列中选择合适的节点并唤醒对应线程。

资源共享模式支持

AQS内建了对独占模式和共享模式的支持,这两种模式的区别在于:独占模式下同一时刻只能有一个线程获取资源,典型的如ReentrantLock;而共享模式允许多个线程同时获取资源,如Semaphore和CountDownLatch。在Node节点的设计上,通过
SHARED

EXCLUSIVE
静态常量区分不同模式的节点。

尽管AQS提供了如
tryAcquire(int)

tryRelease(int)
等方法供子类覆盖以完成特定的资源控制逻辑,但具体的线程入队与出队、状态更新以及阻塞与唤醒等底层细节都是由AQS本身精心设计并实现的。这种机制使得基于AQS构建的同步工具能够有效地处理并发场景中的竞争问题,保证了线程间的安全协同执行。遗憾的是,由于篇幅限制,在此处无法提供完整的代码示例来展示AQS如何将线程包装成Node节点并维护其在线程等待队列中的位置变化。

总结AQS的数据结构如下图:


资源共享模式


在Java多线程同步框架AbstractQueuedSynchronizer(AQS)中,资源共享模式是其核心概念之一,用于定义并发环境中资源的访问方式。AQS支持两种主要的资源共享模式:独占模式(Exclusive)和共享模式(Share)。

独占模式

在独占模式下,同一时间只能有一个线程获取并持有资源,典型的例子就是ReentrantLock。当一个线程成功获取锁之后,其他试图获取锁的线程将被阻塞,直到持有锁的线程释放资源。通过AQS中的
tryAcquire(int)
方法实现对资源的尝试获取,以及
tryRelease(int)
方法来释放资源。例如:

import java.util.concurrent.locks.ReentrantLock;

public class ExclusiveModeExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void criticalSection() {
        lock.lock(); // 尝试以独占模式获取资源(即获取锁)

        try {
            // 在这里执行临界区代码
        } finally {
            lock.unlock(); // 释放资源(即释放锁)
        }
    }

    public static void main(String[] args) {
        ExclusiveModeExample example = new ExclusiveModeExample();
        Thread t1 = new Thread(example::criticalSection, "Thread-1");
        Thread t2 = new Thread(example::criticalSection, "Thread-2");

        t1.start();
        t2.start();
    }
}

在这个示例中,两个线程尝试进入临界区,但由于使用的是ReentrantLock(基于AQS),因此在同一时刻仅允许一个线程执行临界区代码。

共享模式

而在共享模式下,多个线程可以同时获取资源,但通常会限制可同时访问资源的线程数量。Semaphore和CountDownLatch就是采用共享模式的例子。例如,在Semaphore中,可以通过参数指定允许多少个线程同时访问某个资源:

import java.util.concurrent.Semaphore;

public class SharedModeExample {
    private final Semaphore semaphore = new Semaphore(3); // 只允许最多3个线程同时访问资源

    public void accessResource() {
        try {
            semaphore.acquire(); // 获取许可,如果当前可用许可数小于1,则线程会被阻塞
            // 在这里执行需要保护的共享资源操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release(); // 释放许可,使其他等待的线程有机会继续访问
        }
    }

    public static void main(String[] args) {
        SharedModeExample example = new SharedModeExample();
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(example::accessResource, "Thread-" + (i + 1));
            t.start();
        }
    }
}

此例中,Semaphore初始化为3个许可,这意味着最多三个线程可以同时执行
accessResource
方法中的共享资源操作。超过三个线程则需等待其他线程释放许可后才能继续执行。

总之,无论是独占模式还是共享模式,AQS都提供了底层机制来确保线程安全地进行资源的获取与释放,并利用双端队列结构及状态变量维护线程的等待、唤醒逻辑,使得这些高级同步工具能够在各种复杂的并发场景中表现得既高效又稳定。


AQS关键方法解析


在Java多线程同步框架AbstractQueuedSynchronizer(AQS)中,有几个关键方法是实现资源获取与释放的核心逻辑。这些方法由子类覆盖以满足特定的同步需求,并结合AQS提供的底层队列管理和状态更新机制,确保了线程间的同步操作正确且高效地执行。

tryAcquire(int arg)

tryRelease(int arg)

这两个方法分别对应资源的独占式获取和释放操作。在ReentrantLock等基于AQS构建的独占锁中,子类需要重写这两个方法来定义资源是否可以被当前线程获取或释放的条件。例如,在ReentrantLock中,tryAcquire会检查当前线程是否已经持有锁以及锁的状态是否允许重新获取;tryRelease则负责递减锁的计数并根据结果决定是否唤醒等待队列中的线程。

tryAcquireShared(int arg)

tryReleaseShared(int arg)

对于共享模式下的资源控制,AQS提供了这两个方法。在Semaphore、CountDownLatch等共享资源管理器中,tryAcquireShared将尝试获取指定数量的资源,并返回一个表示成功与否及剩余资源量的整数值;而tryReleaseShared则是释放资源,同样根据资源总量的变化判断是否有等待的线程可以被唤醒。

acquire(int arg)

release(int arg)

这是AQS对外暴露的主要接口,用于资源的获取和释放。acquire首先调用tryAcquire试图直接获取资源,若失败则通过addWaiter方法将当前线程包装成Node节点加入到等待队列尾部,并进一步调用acquireQueued进入自旋循环直至成功获取资源或被中断。acquireQueued内部包含parkAndCheckInterrupt方法,使用LockSupport.park挂起当前线程,直到其他线程释放资源后通过unpark唤醒它。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

acquireInterruptibly(int arg)

acquireSharedInterruptibly(int arg)

这两个方法扩展了acquire和acquireShared的功能,使其支持可中断的资源请求。如果在等待过程中线程被中断,将会抛出InterruptedException,而非一直阻塞。

isHeldExclusively()

这个方法仅在使用条件变量时有用,用于确定当前线程是否独占资源。在ReentrantLock的Condition实现中,该方法用于检测当前线程是否持有锁,以便决定能否执行signal/signalAll等操作。

综上所述,AQS通过提供一套模板方法供子类扩展,从而实现了灵活且高效的线程同步机制。在实际应用中,开发者可以根据具体场景重写相应的tryAcquire系列方法,利用AQS强大的底层队列和原子状态管理功能来实现复杂的并发控制逻辑。

总结AQS的流程如下图:


AQS
资源释放

在Java多线程同步框架AbstractQueuedSynchronizer(AQS)中,资源释放逻辑是同步机制中的重要一环。当一个线程完成了对共享资源的独占或共享操作后,需要通过调用相应的release方法来释放资源,使得等待队列中的其他线程有机会获取并使用这些资源。

资源释放入口:
资源释放的主要入口是
release(int arg)
方法,它接受一个参数arg,表示要释放的资源数量。此方法首先调用子类实现的
tryRelease(int arg)
方法尝试释放资源。如果该方法返回true,说明资源成功释放,此时AQS会进一步检查当前头节点的状态,并决定是否唤醒下一个等待的线程。

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 尝试释放资源
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 唤醒等待队列中的下一个线程
        return true;
    }
    return false;
}

唤醒后续结点:
在资源成功释放后,
unparkSuccessor(Node node)
方法会被调用来唤醒等待队列中合适的下一个线程。这个方法首先检查头结点的waitStatus状态,如果大于等于0,则遍历队列以找到首个可用的未取消结点,并使用LockSupport.unpark唤醒对应的线程。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    if (s != null)
        LockSupport.unpark(s.thread);
}

中断与资源管理:
对于支持可中断的同步器如ReentrantLock,其释放资源的过程还会考虑线程中断的情况。当一个线程在等待过程中被中断时,它的等待状态将被正确处理,并可能抛出InterruptedException异常,从而允许上层代码进行恰当的响应。

此外,在资源释放的过程中,AQS确保了操作的原子性和一致性,防止多个线程同时释放资源造成混乱。正是由于这种精心设计的资源释放逻辑,基于AQS构建的同步组件才能够高效、安全地协调多线程对共享资源的访问。

举例来说,在使用ReentrantLock时,线程在完成临界区代码后应调用lock对象的unlock()方法释放锁:

public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void criticalSection() {
        lock.lock();
        try {
            // 执行临界区代码
        } finally {
            lock.unlock(); // 释放锁,可能唤醒等待队列中的线程
        }
    }
}

在这个例子中,当执行到finally块的unlock()方法时,就触发了AQS内部的资源释放逻辑,从而有可能唤醒另一个之前因无法获取锁而进入等待状态的线程。


总结


AbstractQueuedSynchronizer(AQS)作为Java并发编程中至关重要的框架,为构建高效、安全的锁和其他同步器提供了基础结构。它巧妙地结合了数据结构和原子操作,实现了线程间的资源共享管理,并支持独占模式和共享模式两种主要的同步方式。

在AQS的设计中,volatile变量
state
是资源状态的核心表示,通过
tryAcquire(int)

tryRelease(int)
等protected方法,子类可以灵活定义资源获取和释放的具体逻辑。同时,AQS利用FIFO双端队列和Node节点结构来维护等待获取资源的线程队列,确保了线程间的公平性和互斥性。

对于资源的获取流程,AQS采用自旋+CAS的方式插入新的等待节点至队尾,当无法立即获取资源时,线程会进入等待状态并通过LockSupport.park阻塞自身。而在资源释放时,AQS则通过unparkSuccessor方法唤醒等待队列中的下一个合适节点,使得资源能够被有效地传递给其他线程。

例如,在ReentrantLock中,AQS用于实现可重入的锁功能,当线程调用lock()方法尝试获取锁时,如果当前锁已被占用,则线程将加入等待队列;而当线程调用unlock()方法释放锁时,AQS会自动处理后续线程的唤醒工作。

总的来说,AQS通过模板方法设计模式,简化了自定义同步组件的开发难度,开发者仅需关注资源访问策略的实现,即可构建出如ReentrantLock、Semaphore、CountDownLatch等多种广泛应用的同步工具类。AQS以其强大的内核机制,极大地提升了Java多线程环境下的同步性能和灵活性,成为并发编程库不可或缺的基石。

本文使用
markdown.com.cn
排版

3.4)Ansible 常用模块

Ansible 默认提供了很多模块来供我们使用。

我们可以通过 ansible-doc -l 命令查看到当前 ansible 都支持哪些模块
通过 ansible-doc -s 模块名 可以查看该模块有哪些参数可以使用。

目前 2023 为止:模块总量基本保存在 3387 个。
虽然模块众多,但最常用的模块也就 2 - 30 个而已,针对特定业务只用 10 几个模块。

[root@ansible ~] ansible-doc -l | wc -l
3387

常用模块帮助文档参考:
https://docs.ansible.com/ansible/2.9/modules/modules_by_category.html
https://docs.ansible.com/ansible/2.9/modules/list_of_all_modules.html

img

img

3.4.1)Command 模块

Comand 模块不会通过 Shell 处理命令,因此不支持像
$HOME
这样的变量。
以及
< > | ; 和 &
等都是无效的。

功能:
在远程主机执行
系统命令
,此为默认模块,可忽略 -m 选项
注意:
此命令不支持
$VARNAME < > | ; &
等,建议用 Shell 模块实现
注意:
此模块不具有幂等性
范例:

# command 模块不支持重定向
ansible websrvs -m command -a 'echo hello > /root/hello.log'
ansible websrvs -m command -a 'cat /root/hello.log'
 
# command 模块不支持变量引用  
ansible websrvs -m command -a "echo $HOSTNAME"

# command 模块不支持管道符
ansible websrvs -m command -a 'echo 123456 | passwd --stdin wangj'

image.png

# 仅支持简单的 Shell 命令, 且不具备幂等性
# 首行 WARNING 是告知我们所执行的操作可以使用其他模块代替
ansible websrvs -m command -a 'mkdir /data'
ansible websrvs -m command -a 'touch /data/1.log'
ansible websrvs -m command -a 'ls /data'

command 模块的部分参数演示

名称 必选 备注
chdir no 运行 command 命令前先 cd 到这个目录
creates no 如果这个参数对应的文件存在,就不运行 command
# chdir 参数
# 先切换到 /data 目录, 然后执行 ls -l 命令
ansible websrvs -m command -a 'chdir=/data ls -l'

# creates 参数
# 如果 /data/mysql 目录存在, 则跳过创建.
# 如果 /data/mysql 目录不存在, 则创建 /data/mysql 目录.
ansible websrvs -m command -a 'creates=/data/mysql mkdir /data/mysql'
ansible websrvs -m command -a 'creates=/data/mysql mkdir /data/mysql'

总结:

  • command 模块的命令不支持启动 Shell,直接通过 SSH 执行命令
  • command 不支持 Bash 的特性,如管道和重定向等功能
  • 若需要通过 Shell 运行一个命令,比如 < > | 等,你实际上需要使用 Shell 模块。
  • command 模块更安全,因为它不受用户环境的影响

3.4.2)Shell 模块

让远程主机在 Shell 进程下执行命令,从而支持 Shell 的特性,如管道等。
与 command 模块几乎相同,但在执行命令的时候使用的是 /bin/sh。
注意:
Command 和 Shell 模块都只能去执行一些非交互式的命令,不能去执行一些交互式的命令,比如 vim 或 top。

功能:
和 command 相似,用 Shell 执行命令,支持各种符号,比如:
*,$, >
注意:
此模块不具有幂等性
范例:

# Shell 模块支持变量引用
[root@ansible ~] ansible websrvs -m shell -a 'echo $HOSTNAME'

# Shell 模块支持管道符
[root@ansible ~] ansible websrvs -m shell -a 'useradd wangj'
[root@ansible ~] ansible websrvs -m shell -a 'echo 123456 | passwd --stdin wangj'
[root@ansible ~] ansible websrvs -m shell -a 'ls -l /etc/shadow'

image.png

# Shell 模块支持重定向
[root@ansible ~] ansible websrvs -m shell -a 'echo hello > /data/hello.log'
[root@ansible ~] ansible websrvs -m shell -a 'cat /data/hello.log'

注意:
即使是调用 bash 执行命令
类似
cat /tmp/test.md | awk -F'|' '{print $1,$2}' &> /tmp/example.txt
这些复杂命令
使用 Shell 模块也可能会失败。
解决办法:
建议写到脚本中,copy 到远程,执行,再把需要的结果拉回执行命令的机器

小技巧:
使用 Shell 模块替代 command,设为默认模块

[root@ansible ~] vim /etc/ansible/ansible.cfg
# 修改下面一行
module_name = shell
# 验证
ansible websrvs -a 'echo 123456 > /data/1.log'    # 默认是 shell 模块
ansible websrvs -m command -a 'echo 123456 > /data/1.log'    # 指定 command 模块 ( 不支持重定向等操作 )

3.4.3)Script 模块

功能:
在远程主机上运行 ansible 服务器上的脚本
注意:
此模块不具有幂等性

ansible websrvs -m script -a '/root/test.sh'

3.4.4)Copy 模块

功能:
从 ansible 服务器 主控端复制文件到远程主机
注意:
src=file 如果没指明绝对路径,则为当前目录或当前目录下的 files 目录下的 file 文件

参数解析

  • src:推送数据的源文件信息
  • dest:推送数据的目标路径
  • backup:对推送传输过去的文件,进行备份
  • content:直接批量在被管理端文件中添加内容
  • group:将本地文件推送到远端,指定文件属组信息
  • owner:将本地文件推送到远端,指定文件属主信息
  • mode:将本地文件推送到远端,指定文件权限信息
# backup 参数
# 如目标存在, 默认覆盖, 此处指定先备份
# ansible websrvs -m shell -a 'useradd wangj'
ansible websrvs -m copy -a 'src=/root/test.sh dest=/root/test01.sh owner=wangj group=bin mode=600 backup=yes'

# content 参数
# 指定内容, 直接生成目标文件    
ansible websrvs -m copy -a "content='test line1\ntest line2\n' dest=/tmp/test.txt"

copy 模块
小细节

# 复制 /etc 目录自身. ( 注意: /etc 后面没有 / )
# ansible websrvs -m shell -a "mkdir /backup"
ansible websrvs -m copy -a "src=/etc dest=/backup"

# 复制 /etc/ 下的文件, 不包括 /etc 目录自身. ( 注意: /etc/ 后面有 / )
ansible websrvs -m copy -a "src=/etc/ dest=/backup"

3.4.5)Get_url 模块

功能:
用于将文件从 http、https 或 ftp 下载到被管理机节点上。
注意:
被管理机节点必须要能够直接访问对应的远程资源。

常用参数如下:
url:下载文件的 URL,支持 HTTP, HTTPS 或 FTP 协议
dest:下载到目标路径(绝对路径),如果目标是一个目录,就用服务器上面文件的名称,如果目标设置了名称就用目标设置的名称
owner:指定属主
group:指定属组
mode:指定权限
force:如果 yes, dest 不是目录,将每次下载文件, 如果内容改变, 替换文件. 如果否, 则只有在目标不存在时才会下载该文件.
checksum:对目标文件在下载后计算摘要,以确保其完整性
    示例: checksum="sha256:D98291AC[...]B6DC7B97",
    checksum="sha256:http://example.com/path/sha256sum.txt"
url_username:用于 HTTP 基本认证的用户名。对于允许空密码的站点,可以不使用 url_password 参数
url_password:用于 HTTP 基本认证的密码。如果未指定 url_username 参数,则不会使用  url_password 参数
validate_certs:如果 no, SSL 证书将不会被验证. 适用于自签名证书在私有网站上使用
timeout:URL 请求的超时时间, 秒为单位
参数 说明
url 下载资源的URL(支持http、https、ftp协议)
dest 下载的资源在目标主机上的保存路径(绝对路径)
owner 指定属主
group 指定属组
mode 指定权限
force =yes|no,是否强制下载;默认为no。
设为yes,即强制下载,如果同名文件存在,则覆盖;设为no,则只有在文件不存在时才下载。
url_username 基于http basic认证的用户名(如果访问的URL需要的话);
url_password 基于http basic认证的密码,和url_username一起使用(如果URL允许使用空密码,则仅需提供url_username即可);
在没有指定url_username时,单独指定url_password也没有意义。
validate_certs yes|no,是否校验 SSL 证书,默认为yes;
当设为no时,SSL证书将不会被校验(建议仅在URL访问的站点使用可被信任的自签名证书的情况下使用validate_certs=no)
timeout URL 请求的超时时间,单位为 s
checksum 对下载后的文件计算校验和,与 checksum 指定的校验和进行比对,确保下载文件的完好性

范例:

# 提前下载好软件包验证哈希值
[root@ansible ~] wget http://nginx.org/download/nginx-1.18.0.tar.gz
# 使用如下两条命令验证哈希值
[root@ansible ~] openssl md5 nginx-1.18.0.tar.gz
[root@ansible ~] md5sum nginx-1.18.0.tar.gz 
b2d33d24d89b8b1f87ff5d251aa27eb8  nginx-1.18.0.tar.gz
# ansible get_url 模块使用
[root@ansible ~] ansible websrvs -m get_url -a 'url=http://nginx.org/download/nginx-1.18.0.tar.gz dest=/usr/local/src/nginx.tar.gz checksum="md5:b2d33d24d89b8b1f87ff5d251aa27eb8"'

3.4.6)Fetch 模块

功能:
从远程主机提取文件至 Ansible 的主控端
与 copy 模块相反,目前不支持目录( 建议:可以将目录打包,然后将压缩包拷贝至 ansible 主控端 )
常用于:
复制客户端的日志文件至 ansible 主控端

[root@ansible ~] ansible websrvs -m fetch -a 'src=/var/log/messages dest=/root/logs'
[root@ansible ~] tree

3.4.7)File 模块

功能:
文件管理模块,用于对文件或文件夹相关的操作
主要用来设置文件、链接、目录的属性,或者移除文件、链接、目录。
比如:
创建文件或目录,删除文件或目录,设置文件目录属性,创建目录软链接等等
幂等性:
任意次执行所产生的影响均与一次执行的影响相同

参数 说明
path 指定远程主机目录或文件信息
state directory:在远端创建目录
touch:在远端创建文件
link:link 或 hard 表示创建链接文件
absent:表示删除文件或目录
owner 设置所有者
group 设置所属的组
mode 权限 0000
recurse 递归 yes or no
# 创建空文件
ansible all -m file -a 'path=/data/test.txt state=touch'
ansible all -m file -a "path=/root/test.txt state=touch owner=wangj mode=755"

案例

# 创建目录 state=directory
ansible all -m file -a "path=/data/mysql state=directory owner=mysql group=mysql"

# 创建软链接 state=link
ansible all -m file -a 'src=/data/testfile dest=/data/testfile-link state=link'

# 递归修改目录属性, 但不递归至子目录
ansible all -m file -a "path=/data/mysql state=directory owner=mysql group=mysql"

# recurse 参数
# 递归修改目录及子目录的属性
ansible all -m file -a "path=/data/mysql state=directory owner=mysql group=mysql recurse=yes"

# 删除文件或目录 state=absent
ansible all -m file -a "path=/data/mysql state=absent"

3.4.8)stat 模块

stat 模块将获取指定文件或目录的信息,并使用 register 参数将其保存。

功能:
检查文件或文件系统的状态
注意:
对于 Windows 目标,请改用 win_stat 模块
选项:
path: 文件/对象的完整路径 (必须)

常用的返回值判断:
exists:判断是否存在
isuid:调用用户的 ID 与所有者 ID 是否匹配

范例:

[root@ansible ~] ansible 127.0.0.1 -m stat -a 'path=/etc/passwd'

image.png

案例:

- name: install | Check if file is already configured.
    stat: path={{ nginx_file_path }}
    connection: local
    register: nginx_file_result
    
- name: install | Download nginx file
    get_url: url={{ nginx_file_url }} dest={{ software_files_path }} validate_certs=no
    connection: local
    when:, not. nginx_file_result.stat.exists

范例:
使用 stat 模块验证文件状态,通过文件状态推进下一步实施动作

# 检查 websrvs 主机组中的所有主机上的 /data/mysql 路径是否存在
# 如果路径不存在, 它将在每个主机上输出一条调试信息, 说明该路径不存在
[root@ansible ansible] cat stat.yml
---
- hosts: websrvs
  tasks:
    - name: Check file
      stat: path=/data/mysql
      register: st
    - name: debug
      debug:
        msg: "/data/mysql is not exist"
      when: not st.stat.exists

 [root@ansible ansible] ansible-playbook stat.yml

3.4.9)unarchive 模块

功能:
解包解压缩
实现有两种用法:
1)将 ansible 主机上的压缩包传到远程主机后解压缩至特定目录,设置 copy=yes,
此为默认值,可省略
2)将远程主机上的某个压缩包解压缩到指定路径下,设置 copy=no

常见参数:
copy:默认为 yes,当 copy=yes,拷贝的文件是从 ansible 主机复制到远程主机上.
如果设置为 copy=no,会在远程主机上寻找 src 源文件
remote_src:和 copy 功能一样且互斥
yes 表示在远程主机,不在 ansible 主机
no 表示文件在 ansible 主机上
src:源路径,可以是 ansible 主机上的路径,也可以是远程主机(被管理端或者第三方主机)上的路径,
如果是远程主机上的路径,则需要设置 copy=no
dest:远程主机上的目标路径
mode:设置解压缩后的文件权限

范例:

# copy=yes ( 默认值 )
# 拷贝的文件是从 ansible 控制主机复制到远程主机上
ansible all -m unarchive -a 'src=/root/nginx-1.18.0.tar.gz dest=/usr/local/src owner=wangj group=bin'

# copy=no ( 在远程被控主机上寻找 src 源文件 )
# ansible websrvs -m get_url -a 'url=http://nginx.org/download/nginx-1.18.0.tar.gz dest=/root/nginx-1.18.0.tar.gz'
ansible all -m unarchive -a 'src=/root/nginx-1.18.0.tar.gz dest=/usr/local/src copy=no mode=0777'

# 下载压缩包并解压缩至指定目录 ( 需要添加参数 copy=no )
ansible websrvs -m unarchive -a 'src=http://nginx.org/download/nginx-1.18.0.tar.gz dest=/usr/local/src/ copy=no'

# remote_src=yes ( 表示内容在远程主机上 )
ansible websrvs -m unarchive -a 'src=https://releases.ansible.com/ansible/ansible-2.1.6.0-0.1.rc1.tar.gz dest=/usr/local/src owner=root remote_src=yes'

3.14.10)Archive 模块

功能:
打包压缩
保存在被管理节点
范例:

ansible websrvs -m archive  -a 'path=/var/log/ dest=/data/log.tar.bz2 format=bz2 owner=wangj mode=0600'

3.4.11)Hostname 模块

功能:
管理主机名
范例:

# 使用
ansible 192.168.80.18 -m hostname -a 'name=node18.wuhanjiayou.cn' 

# 验证
ansible 192.168.80.18 -m shell -a 'hostname'

image.png

# 注意:
# 千万别以分组来修改主机名 ( 不然整个组的主机名都是同一个 ) 
# 除非你确实有这个需求 
ansible websrvs -m hostname -a 'name=node.wuhanjiayou.cn'
ansible websrvs -m shell -a 'hostname'

image.png

3.4.12)Cron 模块

功能:
Cron 模块可以 帮助我们批量管理远程主机中的计划任务
支持时间:
minute,hour,day,month,weekday
分,时,日,月,周
范例:

# 创建任务
ansible 10.0.0.8 -m cron -a 'hour=2 minute=30 weekday=1-5 name="backup mysql" job=/root/mysql_backup.sh'
ansible websrvs -m cron -a "minute=*/5 job='/usr/sbin/ntpdate ntp.aliyun.com &>/dev/null' name=Synctime"

# 禁用计划任务
ansible websrvs -m cron -a "minute=*/5 job='/usr/sbin/ntpdate 172.20.0.1 &>/dev/null' name=Synctime disabled=yes"

# 启用计划任务
ansible websrvs -m cron -a "minute=*/5 job='/usr/sbin/ntpdate 172.20.0.1 &>/dev/null' name=Synctime disabled=no"

# 删除任务
ansible websrvs -m cron -a "name='backup mysql' state=absent"
ansible websrvs -m cron -a 'state=absent name=Synctime'

演示

ansible websrvs -m cron -a "minute=*/5 job='/usr/sbin/ntpdate ntp.aliyun.com &>/dev/null' name=Synctime"

image.png

3.4.13)Yum 和 Apt 模块

功能:
Yum 模块:管理软件包,支持 RHEL,CentOS,fedora,不支持 Ubuntu 其它版本
Apt 模块:管理 Debian 相关版本的软件包
后续:我们可以通过判断 Linux 系统版本来决定使用哪个模块。
范例:

// 安装: present
ansible websrvs -m yum -a 'name=httpd state=present'

// 卸载: absent
ansible websrvs -m yum -a 'name=httpd state=absent'

// 启用 epel 源进行安装
ansible websrvs -m yum -a 'name=nginx state=present enablerepo=epel'

// 升级除 kernel 和 foo 开头以外的所有包 ( 安装多个软件包 )
ansible websrvs -m yum -a 'name=* state=lastest exclude=kernel*,foo*'

// 一次安装多个软件包
[root@ansible ~] ansible websrvs -m yum -a 'name=sl,cowsay'

image.png

范例:

# 直接基于远程主机上的软件包安装
[root@ansible ~] ansible websrvs -m yum -a "name=https://mirror.tuna.tsinghua.edu.cn/zabbix/zabbix/5.2/rhel/7/x86_64/zabbix-agent-5.2.5-1.el7.x86_64.rpm state=present"

image.png

范例:

# 安装软件包
# 注意: RadHat 使用 yum 模块, Ubuntu 使用 apt 模块
[root@centos8 ~] ansible 192.168.80.18 -m apt -a 'name=bb,sl,cowsay,cmatrix,oneko,hollywood,boxes,libaa-bin,x11-apps state=present'

# 卸载软件包
# 注意: RadHat 使用 yum 模块, Ubuntu 使用 apt 模块
[root@centos8 ~] ansible websrvs -m apt -a 'name=sl,cowsay state=absent'

image.png

范例:
查看包

[root@ansible ~] ansible localhost -m yum -a "list=tree"

image.png

3.4.14)yum_repository 模块

功能:
可以帮助我们批量管理远程主机上的 Yum 仓库
image.png

- name: Add multiple repositories into the same file (1/2)
  yum_repository:
    name: epel
    description: EPEL YUM repo
    file: external_repos
    baseurl: https://download.fedoraproject.org/pub/epel/$releasever/$basearch/
    gpgcheck: no
    
- name: Add multiple repositories into the same file (2/2)
  yum_repository:
    name: rpmforge
    description: RPMforge YUM repo
    file: external_repos
    baseurl: http://apt.sw.be/redhat/el7/en/$basearch/rpmforge
    mirrorlist: http://mirrorlist.repoforge.org/el7/mirrors-rpmforge
    enabled: no
    
- name: Remove repository from a specific repo file
    yum_repository:
      name: epel
      file: external_repos
      state: absent                           

范例:
创建和删除仓库

[root@ansible ~] cat yum_repo.yml
- hosts: websrvs
  tasks:
    - name: Add multiple repositories into the same file 
      yum_repository:
      name: test
      description: EPEL YUM repo
      file: external_repos
      baseurl: https://download.fedoraproject.org/pub/epel/$releasever/$basearch/
      gpgcheck: no

[root@ansible ~] ansible-playbook yum_repo.yml
[root@web1 ~] cat /etc/yum.repos.d/external_repos.repo
[test]
baseurl = https://download.fedoraproject.org/pub/epel/$releasever/$basearch/
gpgcheck = 0
name = EPEL YUM repo

[root@ansible ~] cat remove_yum_repo.yml
- hosts: websrvs
  tasks:
    - name: remove repo 
      yum_repository:
        name: test
        file: external_repos
        state: absent
        
[root@ansible ~] ansible-playbook remove_yum_repo.yml        

3.4.15)Service 模块

功能:
可以帮助我们 批量管理远程主机上的服务
image.png
范例:

# 启动远程主机的 httpd 服务,并实现开机自启
ansible 192.168.80.18 -m service -a 'name=httpd state=started enabled=yes'

# 停止服务   
ansible websrvs -m service -a 'name=httpd state=stopped'

# 生效服务
ansible websrvs -m service -a 'name=httpd state=reloaded'

# 重启服务
ansible websrvs -m shell -a "sed -i 's/^Listen 80/Listen 8080/' /etc/httpd/conf/httpd.conf"
ansible websrvs -m service -a 'name=httpd state=restarted'

image.png

3.4.16)User 模块

功能:
管理用户
可以帮助我们 批量管理远程主机上的用户
比如创建用户、修改用户、删除用户、为用户创建密钥对等操作。
范例:

// 创建用户
ansible all -m user -a 'name=user1 comment="test user" uid=2048 home=/app/user1 group=root'
ansible all -m user -a 'name=nginx comment=nginx uid=88 group=nginx groups="root,daemon" shell=/sbin/nologin system=yes create_home=no home=/data/nginx non_unique=yes'

// remove=yes 表示删除用户及家目录等数据, 默认 remove=no
ansible all -m user -a 'name=nginx state=absent remove=yes'

// 生成 123456 加密的密码
ansible localhost -m debug -a "msg={{ '123456'| password_hash('sha512','salt')}}"
localhost | SUCCESS => {
    "msg": "$6$salt$MktMKPZJ6t59GfxcJU20DwcwQzfMvOlHFVZiOVD71w."
}

// 用上面创建的密码创建用户
ansible websrvs -m user -a 'name=test password="$6$salt$MktMKPZJ6t59GfxcJU20DwcwQzfMvOlHFVZiOVD71w."'

// 创建用户 test, 并生成 4096bit 的私钥
ansible websrvs -m user -a 'name=test generate_ssh_key=yes ssh_key_bits=4096 ssh_key_file=.ssh/id_rsa'

3.4.17)Group 模块

功能:
管理组
image.png
范例:

// 创建组
ansible websrvs -m group  -a 'name=nginx gid=88 system=yes'

// 删除组
ansible websrvs -m group  -a 'name=nginx state=absent'

3.4.18)Lineinfile 模块

参考:
https://www.cnblogs.com/breezey/p/9297252.html

info
功能:
相当于 sed,可以修改文件内容
ansible 在使用 sed 进行替换时,经常会遇到需要转义的问题,而且 ansible 在遇到特殊符号进行替换时,存在问题,无法正常进行替换 。
其实在 ansible 自身提供了两个模块:lineinfile 模块和 replace 模块,可以方便的进行替换
一般在 ansible 当中去修改某个文件的单行进行替换的时候需要使用 lineinfile 模块
幂等性
:重复执行不会创建多行内容,多次执行,依然只增加有最后一行。

范例:
regexp 参数:使用正则表达式匹配对应的行,当替换文本时,如果有多行文本都能被匹配,则只有最后面被匹配到的那行文本才会被替换,当删除文本时,如果有多行文本都能被匹配,这么这些行都会被删除。如果想进行多行匹配进行替换需要使用 replace 模块。
( 重点 )

# 将 Listen 开头的行,修改为 Listen 8080 ( regexp 参数 )
ansible 192.168.80.18 -m lineinfile -a "path=/etc/httpd/conf/httpd.conf regexp='^Listen' line='Listen 8080'"

# 批量禁用远程主机的 SELinux 功能
ansible all -m lineinfile -a "path=/etc/selinux/config regexp='^SELINUX=' line='SELINUX=disabled'"

# 将 # 开头的行都删除
ansible 192.168.80.18 -m lineinfile -a 'dest=/etc/fstab state=absent regexp="^#"'

image.png

3.4.19)Replace 模块

知识点:
Lineinfile 模块 与 replace 模块 区别

  1. Lineinfile 模块 regexp 参数:使用正则表达式匹配对应的行,当替换文本时,如果有多行文本都能被匹配,则只有最后面被匹配到的那行文本才会被替换,当删除文本时,如果有多行文本都能被匹配,这么这些行都会被删除。
  2. Replace 模块:可以根据我们指定的正则表达式替换文件中的字符串,文件中所有被匹配到的字符串都会被替换。

info
范例:

# 查找所有以 UUID 开头的行, 并将这些行注释掉
ansible all -m replace -a "path=/etc/fstab regexp='^(UUID.*)' replace='#\1'"

# 查找所有以 # 开头, 紧接着是 UUID 的 行 (这些行是被注释掉的)
# 并移除行首的 # 符号, 从而取消这些行的注释.
ansible all -m replace -a "path=/etc/fstab regexp='^#(UUID.*)' replace='\1'"

image.png

3.4.20)SELinux 模块

批量管理远端主机的 SELINUX 策略

info
范例:

# 启用
[root@ansible ~] ansible 192.168.80.18 -m selinux -a 'state=enforcing policy=targeted'

# 禁用
[root@ansible ~] ansible 192.168.80.18 -m selinux -a 'state=disabled'

# 验证
[root@centos8 ~] grep -v '#' /etc/selinux/config
SELINUX=disabled
SELINUXTYPE=targeted

[root@centos8 ~] getenforce 
Permissive

image.png

3.4.21)reboot 模块

注意:
对于 Windows 目标,请使用 win_reboot 模块

[root@ansible ~] ansible websrvs -m reboot

3.4.22)mount 模块

功能:
批量管理被控端设备挂载

参数 说明
src 本地或远程设备的路径
path 设备挂载至本地的路径
fstype 挂载的文件系统类型,xfs、nfs...
opts 挂载的参数,defaults、ro...
state 挂载的状态,absent、mounted、unmounted
// 临时挂载
mount websrvs -m mount -a 'src="UUID=b3e48f45-f933-4c8e-a700-22a159ec9077" path=/home fstype=xfs opts=noatime state=present'

// 临时取消挂载
mount websrvs -m mount -a 'path=/home fstype=xfs opts=noatime state=unmounted'

// 永久挂载
ansible websrvs -m mount -a 'src=10.0.0.8:/data/wordpress path=/var/www/html/wp-content/uploads opts="_netdev" state=mounted'

// 永久卸载
ansible websrvs -m mount -a 'src=10.0.0.8:/data/wordpress path=/var/www/html/wpcontent/uploads state=absent'

3.4.23)Setup 模块( 重要 )

功能:
setup 模块 用于收集主机的系统信息,这些 facts 信息可以直接以变量的形式使用
但是如果主机较多,会影响执行速度。我们可以使用 gather_facts: no 来禁止 Ansible 收集 facts 信息

info
范例:
filter 参数
:用于进行条件过滤。如果设置,仅返回匹配过滤条件的信息。

// 这条命令会收集 inventory 中 websrvs 组下所有主机的所有 facts
// 并将这些信息打印出来
ansible websrvs -m setup

# 主机的节点名
ansible websrvs -m setup -a "filter=ansible_nodename"

# 主机的主机名
ansible websrvs -m setup -a "filter=ansible_hostname"

# 主机所属的域名 
ansible websrvs -m setup -a "filter=ansible_domain"

# 主机的总内存量
ansible websrvs -m setup -a "filter=ansible_memtotal_mb"

# 主机的物理内存量
ansible websrvs -m setup -a "filter=ansible_memory_mb"

# 主机当前空闲的内存量
ansible websrvs -m setup -a "filter=ansible_memfree_mb"

# 主机操作系统的家族 // 例如 RedHat、Debian 等
ansible websrvs -m setup -a "filter=ansible_os_family"

# 主机操作系统的主版本号
ansible websrvs -m setup -a "filter=ansible_distribution_major_version"

# 主机操作系统的完整版本号
ansible websrvs -m setup -a "filter=ansible_distribution_version"

# 主机的虚拟 CPU 数量
ansible websrvs -m setup -a "filter=ansible_processor_vcpus"

# 主机的所有 IPv4 地址列表
ansible websrvs -m setup -a "filter=ansible_all_ipv4_addresses"

# 主机的架构类型
ansible websrvs -m setup -a "filter=ansible_architecture"

# 主机已运行的时间
ansible websrvs -m setup -a "filter=ansible_uptime_seconds"

# 以 ansible_processor 开头的所有 facts
ansible websrvs -m setup -a "filter=ansible_processor*"

# 主机的环境变量 
ansible websrvs -m setup -a 'filter=ansible_env'

范例:

[root@ansible ~] ansible all -m setup -a 'filter=ansible_python_version'

image.png

范例:
取 IP 地址

// 取所有 IP
ansible 192.168.80.18 -m setup -a 'filter=ansible_all_ipv4_addresses'

// 取默认 IP
ansible all -m setup -a 'filter="ansible_default_ipv4"'

image.png

3.4.24)debug 模块

此模块可以用于输出信息,并且通过 msg 定制输出的信息内容
注意:
msg 后面跟变量时,需要加 " " 引起来
范例:
debug 模块 默认输出 Hello world

# 默认输出 Hello world ( 默认没有指定 msg, 默认输出 "Hello world!" )
[root@ansible ~] ansible 192.168.80.18 -m debug

image.png

范例:
利用 debug 模块输出变量

[root@centos8 ~] vim debug.yaml
---
- hosts: websrvs
  tasks:
  - name: output variables
    debug:
      msg: Host "{{ ansible_nodename }}" Ip "{{ ansible_default_ipv4.address }}"
      
[root@centos8 ~] ansible-playbook debug.yaml

image.png

范例:
显示字符串特定字符

# cat debug.yml
- hosts: all
  gather_facts: n
  vars:
    a: "12345"
  tasks:
  - debug:
    msg: "{{a[2]}}"
    
// 定义了一个字符串变量 a, 如果想要获取 a 字符串的第 3 个字符
// 则可以使用 "a[2]" 获取, 索引从 0 开始, 执行上例 playbook, debug 的输出信息如下:
TASK [debug] *************************
ok: [test71] => {
    "msg": "3"
}    

概述

众所周知,rtsp的流是无法在浏览器中播放的,这就导致海康摄像头、海康ISC等平台的视频流无法直接在浏览器中播放。

当下是web最盛行的时代,恨不得所有功能、系统、平台。。。都装在浏览器中。

本文简单介绍如何间接实现在浏览器中播放rtsp的流,涉及技术点和工具较多,本文仅做功能实现思路的梳理和简单的代码实践,后续整理更深入的实现原理。

大致流程为:

  • 准备一个rtsp流,可以通过直连摄像头、vlc自己推流等
  • 使用node技术栈搭建一个基于express的应用
  • 在express应用中通过 fluent-ffmpeg(可以简单认为是ffmpeg的包装器) 和ffmpeg进行交互
  • 使用ffmpeg解码rtsp为flv视频格式,并返回数据流
  • 在express应用中通过 express-ws 创建一个WebsocketServer服务,此服务接收ffmpeg返回的数据流,并发送到连接此服务的WebsocketClient上
  • 使用flvjs(b站开源)播放flv格式的视频,即充当WebsocketClient
    ,实际是通过接收WebsocketServer发送的数据并渲染到video标签上面

环境

本文示例代码是在如下环境下调试的

  • IDE:VsCode
  • node:16.15.0
  • npm:8.5.5
  • 脚手架:未使用express脚手架工具
  • ffmpeg:6.1.0

项目目录清单


¦   package-lock.json
¦   package.json
¦   
+---client
¦       client.html
¦       flv-1.6.2.js           
+---public
¦       index.html
¦       yikepingguo.png
¦       
+---server
        server.js
        websocket.js

  • package.json :npm包管理器配置文件
  • client:WebsocketClient
    • client.html:一个单独的html页面,用来播放视频。独立于express,需要从windows资源管理器中访问,并在浏览器中打开
    • flv.js:当前最新版本1.6.2
  • public:express应用静态资源存放目录,当前有一个helloWorld的html页面和一张示例图片。仅作用验证express应用是否启动。
  • server:WebsocketServer
    • server.js:实例化express应用
    • websocker.js:实例化一个WebsockerServer;借助ffmpeg将rtsp转换为flv

项目搭建步骤

新建一个文件夹,假设取名 RtspToFlv,并在VsCode中打开此文件夹,后续所有操作均在此文件夹操作。

在RtspToFlv中创建server文件夹。

在RtspToFlv中创建client文件夹。

引入相关npm依赖

新建一个文件 package.json ,并填入如下内容:



{
    "name": "RtspToWebsocket",
    "version": "1.0.0",
    "main": "index.js",
    "scripts": {
        "start": "nodemon ./server/server.js"
    },
    "keywords": [],
    "author": {
        "email": "liangchen_beijing@163.com",
        "name": "一颗苹果",
        "url": ""
    },
    "license": "Apache-2.0",
    "dependencies": {
        "express": "4.18.3",
        "express-ws": "5.0.2",
        "nodemon": "3.1.0",
        "ws": "8.16.0",
        "websocket-stream": "5.5.2",       
        "fluent-ffmpeg": "2.1.2"
    }
}



  • script.start:配置启动脚本,执行server/server.js
  • nodemon:nodemon 是一种工具,可在检测到目录中的文件更改时通过自动重新启动 node 应用程序来帮助开发基于 node.js 的应用程序
  • dependencies:依赖包,均使用当前最新版本
  • fluent-ffmpeg:该库提供了一组函数和实用程序来抽象 ffmpeg 的命令行用法。简单来说就是包装了一层使用ffmpeg的胶水代码。要使用此库,需要已经安装了 ffmpeg。也就是说本机必须安装了ffmpeg并已经添加到了系统环境变量。

使用 npm install 安装所有的依赖。

实例化一个express应用

在server文件夹中新建一个server.js 文件,并填入如下内容:



const express = require('express')

const app = express()

const expressWs = require('express-ws')
const websocket = require('./websocket')


expressWs(app);
app.use(express.static('public'))
app.use('/websocket', websocket)
app.get('*', (req, res) => {})
app.listen(8009, () => {
  console.log('server is listening on port 8009')
})



  • app.use('/websocket', websocket):将后面创建的websocker.js模块加载进来

创建WebsocketServer并解析rtsp

在server文件夹中新建一个 websocket.js 文件,并填入如下内容:



const express = require('express');
const expressWs = require('express-ws')
var webSocketStream = require("websocket-stream/stream");
var ffmpeg = require("fluent-ffmpeg");


const router = express.Router()
expressWs(router);
router.ws('/test', (ws, req) => {
  ws.send('连接成功')
  //推视频流
  const stream = webSocketStream(ws, {
    binary: true,
    browserBufferTimeout: 1000000
  }, {
    browserBufferTimeout: 1000000
  });
   var url = "rtsp://xxx:xxx@192.168.1.xxx:xxx/ch1/main/av_stream";
  try {
    ffmpeg(url)
      .addInputOption("-rtsp_transport", "tcp", "-buffer_size", "102400") // 这里可以添加一些 RTSP 优化的参数
      .on("start", function () {
        console.log(url, "Stream started.");
      })
      .on("codecData", function () {
        console.log(url, "Stream codecData.")// 摄像机在线处理
      })
      .on("error", function (err) {
        console.log(url, "An error occured: ", err.message);
      })
      .on("end", function () {
        console.log(url, "Stream end!");// 摄像机断线的处理
      })
      .outputFormat("flv").videoCodec("copy").noAudio().pipe(stream);
  } catch (error) {
    console.log(error);
  }
  //接收到消息
  ws.on('message', msg => {
    console.log("收到消息");
    ws.send(msg)
  })
})
module.exports = router



  • 创建了一个Websocket端点(/websocket/test),提供给客户端连接
  • webSocketStream:创建了一个stream对象,将ffmpeg解析出来的数据流存入stream中,并实时发送到了WebsocketClient
  • ffmpeg有丰富的配置参数,可以通过参照官方文档进行调优

至此,WebsocketServer已经搭建好了,使用 npm start 命令尝试启动express应用。

使用flv播放

从flvjs官网下载flv.js文件,并放到client文件夹下面。

在client文件夹中新建一个文件 client.html ,并填入如下内容:




<html>
<head>
    <meta charset="UTF-8">
    <title>websocket客户端,使用flvjs播放视频</title>
    <style>
        .websocket {
            width: 500px;
            height: 500px;
            border: 1px solid #ccc;
            margin: 10px auto 0 auto;
            text-align: center;
        }
        #receive-box {
            width: 300px;
            height: 200px;
            overflow: auto;
            margin: 0 auto 10px auto;
            border: 1px solid #25d1ff;
        }
        #msg-need-send {
            width: 300px;
            height: 100px;
            resize: none;
            border-radius: 5px;
        }
        #send-btn {
            border: none;
            border-radius: 5px;
            background: #25d1ff;
            padding: 5px 10px;
            color: #fff;
            cursor: pointer;
        }
        #exit {
            border: 1px solid #ccc;
            border-radius: 5px;
            background: none;
            padding: 5px 10px;
            cursor: pointer;
        }
    </style>
</head>
<body>
    <h5>socket-client</h5>
    <div class="websocket">
        <div class="receive">
            <p>服务端返回的消息:</p>
            <div id="receive-box"></div>
        </div>
        <div class="send">
            <p>给服务端发送消息:</p>
            <textarea type="text" id="msg-need-send"></textarea>
            <p>
                <button id="send-btn">点击发消息给服务端</button>
            </p>
        </div>
        <button id="exit">关闭连接</button>
    </div>
    <h5>播放器</h5>
    <div>
        <video id="my-player" preload="auto" muted autoplay type="rtmp/flv">
            <source src="">
        </video>
    </div>
   
    <script src="./flv-1.6.2.js"></script>
    <script>
        window.onload = function () {
            // test();
            play();
           
        };
        //测试websocket收发消息
        function test() {
            const ws = new WebSocket('ws://127.0.0.1:8009/websocket/test')
            ws.onopen = e => {
                console.log(`WebSocket 连接状态: ${ws.readyState}`)
            }
            ws.onmessage = data => {
                console.log("接收到消息");
                const receiveBox = document.getElementById('receive-box')
                receiveBox.innerHTML += `<p>${data.data}</p>`
                receiveBox.scrollTo({
                    top: receiveBox.scrollHeight,
                    behavior: "smooth"
                })
            }
            ws.onclose = data => {
                console.log('WebSocket连接已关闭')
                console.log(data);
            }
            //发送msg
            var sendBtn = document.getElementById("send-btn");
            sendBtn.onclick = () => {
                ws.send(document.getElementById('msg-need-send').value)
            }
            //关闭websocket连接
            var exitBtn = document.getElementById("exit");
            exitBtn.onclick = () => {
                ws.close()
            }
        }
        //播放视频
        function play() {
            videoElement = document.getElementById('my-player');
            if (flvjs.isSupported()) {
                flvPlayer = flvjs.createPlayer({
                    type: 'flv',                    //媒体类型
                    //flv格式媒体URL,即ws-server地址
                    url: 'ws://127.0.0.1:8009/websocket/test',
                    isLive: true,                   //数据源是否为直播流
                    hasAudio: false,                //数据源是否包含有音频
                    hasVideo: true,                 //数据源是否包含有视频
                    enableStashBuffer: false        //是否启用缓存区
                }, {
                    enableWorker: false,            //不启用分离线程
                    enableStashBuffer: false,       //关闭IO隐藏缓冲区
                    autoCleanupSourceBuffer: true   //自动清除缓存
                });
                flvPlayer.attachMediaElement(videoElement); //将播放实例注册到节点
                flvPlayer.load();                   //加载数据流
                flvPlayer.play();                   //播放数据流
            } else {
                alert("not support flvjs");
            }
        }
       
    </script>
</body>



  • html文件中分两部分,第一部分是一个简单的Websocket功能测试,第二部分放了一个video的标签,flvjs将把视频流渲染到video中
  • js代码中有两个方法,test用于测试Websocket连接和通信,play方法用于播放flv视频流
  • url:注意url中path为/websocket/test,在服务的使用了
    express.Router,实际WebsocketServer对外提供服务的端点就是/websocket/test
  • flvjs参数:flvjs有丰富的各种参数和回调,可以参照官方文档做调优

浏览器中测试

进入windows的文件资源管理器并使用edge打开client.html,将可以看到rtsp视频流的画面已经渲染到了浏览器中,并且网络连接的地方不停的接收这WebSocketServer发送的flv视频流数据。

ws请求地址为:ws://127.0.0.1:8009/websocket/test

代码

引用

大家好,我是三友~~

RocketMQ作为阿里开源的消息中间件,深受广大开发者的喜爱

而这其中一个很重要原因就是,它处理消息和拉取消息的速度非常快

那么,问题来了,RocketMQ为什么这么快呢?

接下来,我将从以下10个方面来探讨一下RocketMQ这么快的背后原因

如果你对RocketMQ还不了解,可以从公众号后台菜单栏中查看我之前写的关于RocketMQ的几篇文章

如果你对RocketMQ源码也感兴趣,可以从下面这个仓库fork一下源码,我在源码中加了中文注释,并且后面我还会持续更新注释

https://github.com/sanyou3/rocketmq.git

本文是基于RocketMQ 4.9.x版本讲解


批量发送消息

RocketMQ在发送消息的时候支持一次性批量发送多条消息,如下代码所示:

public class Producer {
    public static void main(String[] args) throws Exception {
        //创建一个生产者,指定生产者组为 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 启动生产者
        producer.start();

        //用以及集合保存多个消息
        List<Message> messages = new ArrayList<>();
        messages.add(new Message("sanyouTopic""三友的java日记 0".getBytes()));
        messages.add(new Message("sanyouTopic""三友的java日记 1".getBytes()));
        messages.add(new Message("sanyouTopic""三友的java日记 2".getBytes()));
        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }

}

通过批量发送消息,减少了RocketMQ客户端与服务端,也就是Broker之间的网络通信次数,提高传输效率

不过在使用批量消息的时候,需要注意以下三点:

  • 每条消息的Topic必须都得是一样的
  • 不支持延迟消息和事务消息
  • 不论是普通消息还是批量消息,总大小默认不能超过4m


消息压缩

RocketMQ在发送消息的时候,当发现消息的大小超过4k的时候,就会对消息进行压缩

这是因为如果消息过大,会对网络带宽造成压力

不过需要注意的是,如果是批量消息的话,就不会进行压缩,如下所示:

压缩消息除了能够减少网络带宽造成压力之外,还能够节省消息存储空间

RocketMQ在往磁盘存消息的时候,并不会去解压消息,而是直接将压缩后的消息存到磁盘

消费者拉取到的消息其实也是压缩后的消息

不过消费者在拿到消息之后会对消息进行解压缩

当我们的业务系统拿到消息的时候,其实就是解压缩后的消息

虽然压缩消息能够减少带宽压力和磁盘存储压力

但是由于压缩和解压缩的过程都是在客户端(生产者、消费者)完成的

所以就会导致客户端消耗更多的CPU资源,对CPU造成一定的压力


高性能网络通信模型

当生产者处理好消息之后,就会将消息通过网络通信发送给服务端

而RocketMQ之所以快的一个非常重要原因就是它拥有高性能网络通信模型

RocketMQ网络通信这块底层是基于Netty来实现的

Netty是一款非常强大、非常优秀的网络应用程序框架,主要有以下几个优点:

  • 异步和事件驱动:Netty基于事件驱动的架构,使用了异步I/O操作,避免了阻塞式I/O调用的缺陷,能够更有效地利用系统资源,提高并发处理能力。
  • 高性能:Netty针对性能进行了优化,比如使用直接内存进行缓冲,减少垃圾回收的压力和内存拷贝的开销,提供了高吞吐量、低延迟的网络通讯能力。
  • 可扩展性:Netty的设计允许用户自定义各种Handler来处理协议编码、协议解码和业务逻辑等。并且,它的模块可插拔性设计使得用户可以根据需要轻松地添加或更换组件。
  • 简化API:与Java原生NIO库相比,Netty提供了更加简洁易用的API,大大降低了网络编程的复杂度。
  • 安全:Netty内置了对SSL/TLS协议的支持,使得构建安全通信应用变得容易。
  • 丰富的协议支持:Netty提供了HTTP、HTTP/2、WebSocket、Google Protocol Buffers等多种协议的编解码支持,满足不同网络应用需求。
  • ...

就是因为Netty如此的强大,所以不仅仅RocketMQ是基于Netty实现网络通信的

几乎绝大多数只要涉及到网络通信的Java类框架,底层都离不开Netty的身影

比如知名RPC框架Dubbo、Java gRPC实现、Redis的亲儿子Redisson、分布式任务调度平台xxl-job等等

它们底层在实现网络通信时,都是基于Netty框架


零拷贝技术

当消息达到RocketMQ服务端之后,为了能够保证服务端重启之后消息也不丢失,此时就需要将消息持久化到磁盘

由于涉及到消息持久化操作,就涉及到磁盘文件的读写操作

RocketMQ为了保证磁盘文件的高性能读写,使用到了一个叫
零拷贝
的技术


1、传统IO读写方式

说零拷贝之前,先说一下传统的IO读写方式。

比如现在有一个需求,
将磁盘文件通过网络传输出去

那么整个传统的IO读写模型如下图所示

传统的IO读写其实就是read + write的操作,整个过程会分为如下几步

  • 用户调用read()方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换,也就是图示的切换1
  • 将磁盘数据通过DMA拷贝到内核缓存区
  • 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据
  • read()方法返回,此时就会从内核态切换到用户态,也就是图示的切换2
  • 当我们拿到数据之后,就可以调用write()方法,此时上下文会从用户态切换到内核态,即图示切换3
  • CPU将用户缓冲区的数据拷贝到Socket缓冲区
  • 将Socket缓冲区数据拷贝至网卡
  • write()方法返回,上下文重新从内核态切换到用户态,即图示切换4

整个过程发生了4次上下文切换和4次数据的拷贝,这在高并发场景下肯定会严重影响读写性能。

所以为了减少上下文切换次数和数据拷贝次数,就引入了零拷贝技术。


2、零拷贝

零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。

实现零拷贝的有以下两种方式:

  • mmap()
  • sendfile()


mmap()

mmap(memory map)是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

简单地说就是
内核缓冲区和应用缓冲区
进行映射

用户在操作应用缓冲区时就
好像
在操作内核缓冲区

比如你往应用缓冲区写数据,就好像直接往内核缓冲区写数据,这个过程不涉及到CPU拷贝

而传统IO就需要将在写完应用缓冲区之后需要将数据通过CPU拷贝到内核缓冲区

同样地上述文件传输功能,如果使用mmap的话,由于我们可以直接操作内核缓冲区

此时我们就可以将内核缓冲区的数据直接CPU拷贝到Socket缓冲区

整个IO模型就会如下图所示:

基于mmap IO读写其实就变成mmap + write的操作,也就是用mmap替代传统IO中的read操作

  • 当用户发起mmap调用的时候会发生上下文切换1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap返回,发生上下文切换2
  • 随后用户调用write,发生上下文切换3,将内核缓冲区的数据拷贝到Socket缓冲区,write返回,发生上下文切换4。

上下文切换的次数仍然是4次,但是拷贝次数只有3次,少了一次CPU拷贝。

所以总的来说,
使用mmap就可以直接少一次CPU拷贝

说了这么多,那么在Java中,如何去实现mmap,也就是内核缓冲区和应用缓冲区映射呢?

其实在Java NIO类库中就提供了相应的API,当然底层也还是调用Linux系统的mmap()实现的,代码如下所示

FileChannel fileChannel = new RandomAccessFile("test.txt""rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());

MappedByteBuffer,你可以认为操作这个对象就好像直接操作内核缓冲区

比如可以通过MappedByteBuffer读写磁盘文件,此时就好像直接从内核缓冲区读写数据

当然也可以直接通过MappedByteBuffer将文件的数据拷贝到Socket缓冲区,实现上述文件传输的模型

这里我就不贴相应的代码了

RocketMQ在存储文件时,就是通过mmap技术来实现高效的文件读写

RocketMQ中使用mmap代码
RocketMQ中使用mmap代码

虽然前面一直说mmap不涉及CPU拷贝,但在某些特定场景下,尤其是在写操作或特定的系统优化策略下,还是可能涉及CPU拷贝。


sendfile()

sendfile()跟mmap()一样,也会减少一次CPU拷贝,但是它同时也会减少两次上下文切换。

sendfile()主要是用于文件传输,比如将文件传输到另一个文件,又或者是网络

当基于sendfile()时,一次文件传输的过程就如下图所示:

用户发起sendfile()调用时会发生切换1,之后数据通过DMA拷贝到内核缓冲区,之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区,最后拷贝到网卡,sendfile()返回,发生切换2。

同样地,Java NIO类库中也提供了相应的API实现sendfile

当然底层还是操作系统的sendfile()

FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//调用transferTo方法向目标数据传输
channel.transferTo(position, len, target);

FileChannel的transferTo方法底层就是基于sendfile来的

在如上代码中,并没有文件的读写操作,而是直接将文件的数据传输到target目标缓冲区

也就是说,sendfile传输文件时是无法知道文件的具体的数据的

但是mmap不一样,mmap可以来直接修改内核缓冲区的数据

假设如果需要对文件的内容进行修改之后再传输,mmap可以满足


小总结

在传统IO中,如果想将用户缓存区的数据放到内核缓冲区,需要经过CPU拷贝

而基于零拷贝技术可以减少CPU拷贝次数,常见的有两种:

  • mmap()
  • sendfile()

mmap()是将用户缓冲区和内核缓冲区共享,操作用户缓冲区就好像直接操作内核缓冲区,读写数据时不需要CPU拷贝

Java中可以使用MappedByteBuffer这个API来达到操作内核缓冲区的效果

sendfile()主要是用于文件传输,可以通过sendfile()将一个文件内容传输到另一个文件中或者是网络中

sendfile()在整个过程中是无法对文件内容进行修改的,如果想修改之后再传输,可以通过mmap来修改内容之后再传输

上面出现的API都是Java NIO标准类库中的

如果你看的还是很迷糊,那直接记住一个结论

之所以基于零拷贝技术能够高效的实现文件的读写操作,主要因为是减少了CPU拷贝次数和上下文切换次数

在RocketMQ中,底层是基于mmap()来实现文件的高效读写的


顺序写

RocketMQ在存储消息时,除了使用零拷贝技术来实现文件的高效读写之外

还使用顺序写的方式提高数据写入的速度

RocketMQ会将消息按照顺序一条一条地写入文件中

这种顺序写的方式由于减少了磁头的移动和寻道时间,在大规模数据写入的场景下,使得数据写入的速度更快


高效的数据存储结构


Topic和队列的关系

在RocketMQ中,默认会为每个Topic在每个服务端Broker实例上创建4个队列

如果有两个Broker,那么默认就会有8个队列

每个Broker上的队列上的编号(queueId)都是从0开始


CommitLog

前面一直说,当消息到达RocektMQ服务端时,需要将消息存到磁盘文件

RocketMQ给这个存消息的文件起了一个高大上的名字:
CommitLog

由于消息会很多,所以为了防止文件过大,CommitLog在物理磁盘文件上被分为多个磁盘文件,每个文件默认的固定大小是1G

消息在写入到文件时,除了包含消息本身的内容数据,也还会包含其它信息,比如

  • 消息的Topic
  • 消息所在队列的id,生产者发送消息时会携带这个队列id
  • 消息生产者的ip和端口
  • ...

这些数据会和消息本身按照一定的顺序同时写到CommitLog文件中

上图中黄色排列顺序和实际的存的内容并非实际情况,我只是举个例子


ConsumeQueue

除了CommitLog文件之外,RocketMQ还会为每个队列创建一个磁盘文件

RocketMQ给这个文件也起了一个高大上的名字:
ConsumeQueue

当消息被存到CommitLog之后,其实还会往这条消息所在队列的ConsumeQueue文件中插一条数据

每个队列的ConsumeQueue也是由多个文件组成,每个文件默认是存30万条数据

插入ConsumeQueue中的
每条数据
由20个字节组成,包含3部分信息

  • 消息在CommitLog的起始位置(8个字节),也被称为偏移量
  • 消息在CommitLog存储的长度(8个字节)
  • 消息tag的hashCode(4个字节)

每条数据也有自己的编号(offset),默认从0开始,依次递增

所以,通过ConsumeQueue中存的数据可以从CommitLog中找到对应的消息

那么这个ConsumeQueue有什么作用呢?

其实通过名字也能猜到,这其实跟消息消费有关

当消费者拉取消息的时候,会告诉服务端四个比较重要的信息

  • 自己需要拉取哪个Topic的消息
  • 从Topic中的哪个队列(queueId)拉取
  • 从队列的哪个位置(offset)拉取消息
  • 拉取多少条消息(默认32条)

服务端接收到消息之后,总共分为四步处理:

  • 首先会找到对应的Topic
  • 之后根据queueId找到对应的ConsumeQueue文件
  • 然后根据offset位置,从ConsumeQueue中读取跟拉取消息条数一样条数的数据

由于ConsumeQueue每条数据都是20个字节,所以根据offset的位置可以很快定位到应该从文件的哪个位置开始读取数据

  • 最后解析每条数据,根据偏移量和消息的长度到CommitLog文件查找真正的消息内容

整个过程如下图所示:

所以,从这可以看出,当消费者在拉取消息时,ConsumeQueue其实就相当于是一个索引文件,方便快速查找在CommitLog中的消息

并且无论CommitLog存多少消息,整个查找消息的时间复杂度都是O(1)

由于ConsumeQueue每条数据都是20个字节,所以如果需要找第n条数据,只需要从第
n * 20
个字节的位置开始读20个字节的数据即可,这个过程是O(1)的

当从ConsumeQueue找到数据之后,解析出消息在CommitLog存储的起始位置和大小,之后就直接根据这两个信息就可以从CommitLog中找到这条消息了,这个过程也是O(1)的

所以整个查找消息的过程就是O(1)的

所以从这就可以看出,ConsumeQueue和CommitLog相互配合,就能保证快速查找到消息,消费者从而就可以快速拉取消息


异步处理

RocketMQ在处理消息时,有很多异步操作,这里我举两个例子:

  • 异步刷盘
  • 异步主从复制


异步刷盘

前面说到,文件的内容都是先写到内核缓冲区,也可以说是PageCache

而写到PageCache并不能保证消息一定不丢失

因为如果服务器挂了,这部分数据还是可能会丢失的

所以为了解决这个问题,RocketMQ会开启一个后台线程

这个后台线程默认每隔0.5s会将消息从PageCache刷到磁盘中

这样就能保证消息真正的持久化到磁盘中


异步主从复制

在RocketMQ中,支持主从复制的集群模式

这种模式下,写消息都是写入到主节点,读消息一般也是从主节点读,但是有些情况下可能会从从节点读

从节点在启动的时候会跟主节点建立网络连接

当主节点将消息存储的CommitLog文件之后,会通过后台一个异步线程,不停地将消息发送给从节点

从节点接收到消息之后,就直接将消息存到CommitLog文件


小总结

就是因为有这些异步操作,大大提高了消息存储的效率

不过值得注意的,尽管异步可以提高效率,但是也增加了不确定性,比如丢消息等等

当然RocketMQ也支持同步等待消息刷盘和主从复制成功,但这肯定会导致性能降低

所以在项目中可以根据自己的业务需要选择对应的刷盘和主从复制的策略


批量处理

除了异步之外,RocketMQ还大量使用了批量处理机制

比如前面说过,消费者拉取消息的时候,可以指定拉取拉取消息的条数,批量拉取消息

这种批量拉取机制可以减少消费者跟RocketMQ服务端的网络通信次数,提高效率

除了批量拉取消息之外,RocketMQ在提交消费进度的时候也使用了批量处理机制

所谓的提交消费进度就是指

当消费者在成功消费消息之后,需要将所消费消息的offset(ConsumeQueue中的offset)提交给RocketMQ服务端

告诉RocketMQ,这个Queue的消息我已经消费到了这个位置了

这样一旦消费者重启了或者其它啥的要从这个Queue重新开始拉取消息的时候

此时他只需要问问RocketMQ服务端上次这个Queue消息消费到哪个位置了

之后消费者只需要从这个位置开始消费消息就行了,这样就解决了接着消费的问题

RocketMQ在提交消费进度的时候并不是说每消费一条消息就提交一下这条消息对应的offset

而是默认每隔5s定时去批量提交一次这5s钟消费消息的offset


锁优化

由于RocketMQ内部采用了很多线程异步处理机制

这就一定会产生并发情况下的线程安全问题

在这种情况下,RocketMQ进行了多方面的锁优化以提高性能和并发能力

就比如拿消息存储来说

为了保证消息是按照顺序一条一条地写入到CommitLog文件中,就需要对这个写消息的操作进行加锁

而RocketMQ默认使用ReentrantLock来加锁,并不是synchronized

当然除了默认情况外,RocketMQ还提供了一种基于CAS加锁的实现

这种实现可以在写消息压力较低的情况下使用

当然除了写消息之外,在一些其它的地方,RocketMQ也使用了基于CAS的原子操作来代替传统的锁机制

例如使用大量使用了AtomicInteger、AtomicLong等原子类来实现并发控制,避免了显式的锁竞争,提高了性能


线程池隔离

RocketMQ在处理请求的时候,会为不同的请求分配不同的线程池进行处理

比如对于消息存储请求和拉取消息请求来说

Broker会有专门为它们分配两个不同的线程池去分别处理这些请求

这种让不同的业务由不同的线程池去处理的方式,能够有效地隔离不同业务逻辑之间的线程资源的影响

比如消息存储请求处理过慢并不会影响处理拉取消息请求

所以RocketMQ通过线程隔离及时可以有效地提高系统的并发性能和稳定性


总结

到这我就从10个方面讲完了RocketMQ为什么这么快背后的原因

不知道你读完文章之后有什么感受

其实实际上RocketMQ快的原因远远不止我上面说的这几点

RocketMQ本身还做了很多其它的优化,比如拉取消息的长轮询机制、文件预热机制等等

正是因为有各种各样设计细节上的优化,才最终决定了RocketMQ出色的性能表现

好了,本文就讲到这里,如果觉得本文对你有点帮助,欢迎点赞、在看、收藏、转发分享给其他需要的人

你的支持就是我更新的最大动力,感谢感谢!

让我们下期再见,拜拜!

参考文章:

  • https://mp.weixin.qq.com/s/mOD9Z6pxSxBQuNx3YaUw3A


往期热门文章推荐

如何去阅读源码,我总结了18条心法

如何写出漂亮代码,我总结了45个小技巧

三万字盘点Spring/Boot的那些常用扩展点

三万字盘点Spring 9大核心基础功能

两万字盘点那些被玩烂了的设计模式

万字+20张图探秘Nacos注册中心核心实现原理

万字+20张图剖析Spring启动时12个核心步骤

1.5万字+30张图盘点索引常见的11个知识点

扫码或者搜索关注公众号
三友的java日记
,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题。

一、简介

在之前的多线程系列文章中,我们陆陆续续的介绍了
Thread
线程类相关的知识和用法,其实在
Thread
类上还有一层
ThreadGroup
类,也就是线程组。

今天我们就一起来简单的聊聊
线程组
相关的知识和用法。

二、什么是线程组

线程组,简单来说就是多个线程的集合,它的出现主要是为了更方便的管理线程。

从结构角度看,线程组与线程之间其实是一个父子结构,一个线程组可以拥有几个线程,同时也可以拥有几个线程组。整个组织结构像一棵树一样,每个线程一定有一个线程组,线程组可能又有一个父线程组,追溯到根节点就是一个系统线程组。

线程组与线程之间的关系,可以用如下图来描述。

比如,我们通常创建的
main
方法,对应的是
main
线程,它所属的是
main
线程组,
main
线程组的父级是是
system
系统线程组。

public static void main(String[] args) {
    Thread currentThread = Thread.currentThread();
    ThreadGroup currentThreadGroup = currentThread.getThreadGroup();
    ThreadGroup systemThreadGroup = currentThreadGroup.getParent();
    System.out.println("currentThread:" + currentThread.getName());
    System.out.println("currentThreadGroup:" + currentThreadGroup.getName());
    System.out.println("systemThreadGroup:" + systemThreadGroup.getName());
}

输出结果如下:

currentThread:main
currentThreadGroup:main
systemThreadGroup:system

其中
system
线程组就是根节点,再上一层就没有了,如果调用会抛空指针异常。

线程组最主要的作用是:可以实现批量管理线程或者线程组,有效的对线程或者线程组对象进行检查、尝试中断等操作。

下面我们就一起来看看
ThreadGroup
的常用方法和使用技巧。

三、线程组用法详解

3.1、构造方法介绍

ThreadGroup
提供了两个构造方法,内容如下:

方法 描述
ThreadGroup(String name) 根据线程组名称创建线程组,其父线程组为main线程组
ThreadGroup(ThreadGroup parent, String name) 根据线程组名称创建线程组,其父线程组为指定的 parent 线程组

其中支持指定父级线程组的方法,在实际的使用中比较常见。

下面,我们演示一下这两个构造函数的用法:

public static void main(String[] args) {
    ThreadGroup subThreadGroup1 = new ThreadGroup("sub1");
    ThreadGroup subThreadGroup2 = new ThreadGroup(subThreadGroup1, "sub2");
    System.out.println("sub1 parent thread group name:" + subThreadGroup1.getParent().getName());
    System.out.println("sub2 parent thread group name:" + subThreadGroup2.getParent().getName());
}

输出结果如下:

sub1 parent thread group name:main
sub2 parent thread group name:sub1

3.2、核心方法介绍

ThreadGroup
提供了很多有用的方法,下面整理了一些方法的简要介绍,内容如下:

方法 描述
public final String getName() 返回此线程组的名称
public final ThreadGroup getParent() 返回此线程组的父级
public final boolean parentOf(ThreadGroup g) 测试此线程组是线程组参数还是其父级线程组之一
public int activeCount() 返回此线程组及其子组中活动线程的数量的估计值,递归遍历该线程组中所有的子组,此方法主要用于调试和监视目的
public int activeGroupCount () 返回此线程组及其子组中活动组的数目的估计值。递归遍历该线程组中的所有子群,此方法主要用于调试和监视目的
public final void checkAccess() 确定当前运行的线程是否具有修改此线程组的权限
public int enumerate(Thread[] list) 将这个线程组复制到它所在的组及其子组中
public final void destroy() 销毁此线程组及其所有子组,当线程组还要子线程或者子线程组,会抛异常
public boolean isDestroyed() 测试此线程组是否已被销毁
public final int getMaxPriority() 返回此线程组的最大优先级
public final void setMaxPriority(int pri) 设置组的最大优先级。线程组中具有较高优先级的线程不会受到影响
public final boolean isDaemon() 测试此线程组是否是守护线程组
public final void setDaemon(boolean daemon) 修改此线程组的守护进程状态
public final void interrupt() 尝试中断此线程组中的所有线程
public void list() 将此线程组的信息打印到标准输出。此方法仅用于调试

下面我们抽取几个比较常见的方法,进行演示介绍。

3.2.1、activeCount 方法

activeCount()
方法用于返回此线程组及其子组中活动线程的数量的估计值,因为线程的数量是动态发生变化的,返回的值只是一个估计值。

我们看一个简单的例子就知道了。

public class MyThread extends Thread{

    public MyThread(ThreadGroup group, String name) {
        super(group, name);
    }

    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class MyThreadMainTest {

    public static void main(String[] args) throws Exception {
        ThreadGroup tg = new ThreadGroup("group1");
        MyThread t1 = new MyThread (tg, "t1");
        MyThread t2 = new MyThread (tg, "t2");
        t1.start();
        t2.start();

        System.out.println("线程组的名称:" +  tg.getName() + ",活动的线程数:" +  tg.activeCount());
        Thread.sleep(1000);
        System.out.println("线程组的名称:" +  tg.getName() + ",活动的线程数:" +  tg.activeCount());
    }
}

输出结果如下:

线程组的名称:group1,活动的线程数:2
线程组的名称:group1,活动的线程数:0

第一次检查线程都处于运行状态,因此活动的线程数为 2;过 1 秒之后,线程运行结束,活动的线程数为 0。

3.2.2、isDaemon 方法

setDaemon()
方法用于测试此线程组是否是守护线程组。

需要注意的是:后台线程组和后台线程是两个概念,后台线程组的特性是最后一个线程执行完或最后一个线程被销毁时,后台线程组自动销毁,线程组只是为了统一管理线程的一个方式,跟后台线程有区别!

例子如下:

public class MyThread extends Thread{

    public MyThread(ThreadGroup group, String name) {
        super(group, name);
    }

    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getName() + ",是否后台线程:" +  Thread.currentThread().isDaemon());
        System.out.println("当前线程组:" + Thread.currentThread().getThreadGroup().getName() + ",是否后台线程组:" +  Thread.currentThread().getThreadGroup().isDaemon());
    }
}

public class MyThreadMainTest4 {

    public static void main(String[] args) throws Exception {
        ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
        new MyThread(mainGroup, "t1").start();

        Thread.sleep(100);

        // 设置守护线程组
        ThreadGroup tg = new ThreadGroup("group1");
        tg.setDaemon(true);
        new MyThread(tg,"t2").start();
    }
}

输出结果如下:

当前线程:t1,是否后台线程:false
当前线程组:main,是否后台线程组:false
当前线程:t2,是否后台线程:false
当前线程组:group1,是否后台线程组:true
3.2.3、interrupt 方法

interrupt()
方法用于尝试中断此线程组中的所有线程。如果正在运行的线程没有进入阻塞,是无法中断的。

例子如下:

public class MyThreadA extends Thread{

    public MyThreadA(ThreadGroup group, String name) {
        super(group, name);
    }

    @Override
    public void run() {
        System.out.println("线程:" + Thread.currentThread().getName() + ",开始运行");
        String t;
        for (int i = 0; i < 1000000000; i++) {
            t = i + "";
        }
        System.out.println("线程:" + Thread.currentThread().getName() + ",停止运行");
    }
}
public class MyThreadB extends Thread{

    public MyThreadB(ThreadGroup group, String name) {
        super(group, name);
    }

    @Override
    public void run() {
        System.out.println("线程:" + Thread.currentThread().getName() + ",开始运行");
        while (!Thread.interrupted()){
        }
        System.out.println("线程:" + Thread.currentThread().getName() + ",停止运行");
    }
}
public class MyThreadC extends Thread{

    public MyThreadC(ThreadGroup group, String name) {
        super(group, name);
    }

    @Override
    public void run() {
        System.out.println("线程:" + Thread.currentThread().getName() + ",开始运行");
        try {
            Thread.sleep(1000);
        } catch (Exception e){
//            e.printStackTrace();
        }
        System.out.println("线程:" + Thread.currentThread().getName() + ",停止运行");
    }
}
public class MyThreadMainTest {

    public static void main(String[] args) throws Exception {
        ThreadGroup tg = new ThreadGroup("group1");
        new MyThreadA(tg,"t1").start();
        new MyThreadB(tg,"t2").start();
        new MyThreadC(tg,"t3").start();

        // 尝试中断线程组里面的线程
        tg.interrupt();
    }
}

输出结果如下:

线程:t1,开始运行
线程:t2,开始运行
线程:t2,停止运行
线程:t3,开始运行
线程:t3,停止运行

线程
t1
只有等它运行结束,通过
interrupt()
不能中断程序!

四、小结

本文主要围绕线程组的一些基本概念以及常用方法,并结合了一些简单示例进行介绍。

线程组的出现更多的是便于有组织的管理线程,比如 Java 的线程池就用到了线程组,更多的线程知识,我们在后续的文章中会进行介绍。

如果有描述不对的地方,欢迎网友留言指出。

五、参考

1、
https://www.cnblogs.com/xrq730/p/4856072.html

2、
https://cloud.tencent.com/developer/article/1633465