HICSC
006 进程管理:信号量的简单应用

信号量可以用来解决很多经典的同步问题,比如:生产者-消费者问题、哲学家进餐问题、读者-写者问题、理发师问题等等;也可以用于解决实际开发中的很多问题,比如:限流、资源隔离、异步任务,同步返回等等。这些问题在后文都会有详细的讲解,本文从两个简单问题入手,说说信号量的应用。

一、信号量实现互斥

回想下前面的内容,让多个进程互斥的访问某临界资源,只需要设置资源的信号量数值为1,然后各个进程进入临界区之前先执行wait操作,若资源未被访问,wait成功,进入临界区,否则wait失败,进程阻塞,通过这样的机制来保证该临界资源可以被互斥的访问。当前进程退出临界区后,还要对信号量进行signal操作,以便释放该资源,大致的流程可以用如下的伪代码表示:

// 临界资源的信号量数值为1,即资源需要互斥的访问
Semaphore mutex = new Semaphore(1);
try {
  // 先尝试执行wait操作,如果成功进入临界区,最后执行signal操作释放资源
  mutex.wait();
  // 临界区
  critical section;
} finally {
   mutex.signal()
}
// 剩余区
remainder section;

利用信号量实现互斥时有两个必要条件:

这两点都比较好理解,第一点是需要开发者保证的,即在使用信号量的时候,PV操作务必成对出现,而第二点则需要具体的操作系统、语言平台或软件等来保证,比如使用Java语言中的信号量,就需要JVM来保证,而JVM则需要更底层的操作系统或者硬件来支持。

简单的原子操作,底层CPU会提供对应的单条指令来保证,比如INC(自增)和XHG(交换)操作。而复杂的原子操作,由于其包含多条指令,在执行过程中会出现上下文切换(比如任务切换和中断处理等),这些行为会影响这些操作的原子性。通常情况下,我们都是使用自旋锁(spinlock)来保证操作指令序列不会在执行中途受到干扰。

这里以Java信号量源码为例,来看看在高级语言中,如果通过软件方式来实现信号量。在Java中获取信号量的方式有种,公平和非公平的获取,其底层同步(Sync)的实现代码如下所示。

// 代码来自于:java.util.concurrent.Semaphore.Sync
final int nonfairTryAcquireShared (int acquires){
    for (; ; ) {
        // 在Java信号量中使用AQS的state来代表信号量的许可数量或者资源数量
        // 如果用信号量实现互斥锁的话,这里的state可能的最大值是1
        int available = getState();
        // 剩余资源数量或者许可数量
        int remaining = available - acquires;
        // 如果小于0,说明已经没有可用资源,也就不用继续往下执行
        // 如果等于或者大于0,说明还有可用资源
        // 这时候,需要去更新可用资源数量,在Java中使用CAS机制保证多个线程对数据安全的更新
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

上述代码中,如果compareAndSet失败,则会一直循环,直到成功。也就是说,当一个进程试图进入临界区时,如果发现资源正被访问,自己会陷入循环检查状态,即忙等待,这样的信号量也被称为自旋锁(spinlock)。它的主要优点是,进程在等待过程中不需要切换上下文。如果等待时间小于上下文切换时间的话,旋转锁会更有效。

二、信号量实现前趋关系

信号量还可以用来描述程序或语句之间的前趋关系。比如有两个并发的进程P1和P2,P1中有语句S1,P2中有语句S2,我们希望在S1执行之后在执行S2。为了实现这种前趋关系,必须使P1和P2共用一个信号量S,其初始值为0,将signal(S)放在S1后面,而在S2语句前面插入wait(S)操作,其大概的流程如下图所示。

进程前趋关系示意图

由于S被初始化为0,如果P2先执行,其必定会阻塞,只有在进程P1执行完S1语句并signal(S)后,P2才能执行S2语句。同样,我们可以利用信号量,实现一个更复杂的可并发执行程序,如下图所示,每个节点代表一段程序或者一个线程,为使各个节点按照正确的顺序执行,应设置若干个初始值为0的信号量,比如为保证S1—>S2,S1—>S3的前趋关系,应分别为其设置信号量a和b,以此类推,其整个执行流程如下所示。

进程或线程运行示意图

可以使用下面的伪代码来实现图中的前趋关系:

// 定义7个信号量,每一个信号量用于控制两个节点的前趋关系
// 比如t1t2这个信号量用于保证S1-->S2,其他类似
var t1t2,t1t3,t2t5,t3t5,t1t4,t4t6,t5t6 = new semaphore(0);
begin
    // S1执行完成后,对后续的3个信号量执行signal操作
    begin S1; signal(t1t2); signal(t1t3); signal(t1t4)
  // S2只能等S1执行完成并signal对应的信号量后,才能执行
  // 同理,当S2执行完成后,也需要对后续的信号量执行signal操作
    begin wait(t1t2); S2; signal(t2t5);
    begin wait(t1t3); S3; singal(t3t5);
    begin wait(t1t4); S4; singal(t4t6);
    begin wait(t2t5); wait(t3t5); S5; signal(t5t6);
    begin wait(t5t6); wait(t4t6); S6
end

如果图中的节点代表线程的话,在Java中,可以使用Semphore来实现图中各个线程的前趋关系,大家可以试试,也可以参考:

public class SemphoreTrend {

    public static void main(String[] args) {

        // 定义7个信号量,用于控制6个任务相互之间的前趋关系
        Semaphore t1t2 = new Semaphore(0);
        Semaphore t1t3 = new Semaphore(0);
        Semaphore t1t4 = new Semaphore(0);
        Semaphore t2t5 = new Semaphore(0);
        Semaphore t3t5 = new Semaphore(0);
        Semaphore t5t6 = new Semaphore(0);
        Semaphore t4t6 = new Semaphore(0);

        Task task1 = new Task(1, null, new Semaphore[]{t1t2,t1t3,t1t4});
        Task task2 = new Task(2, new Semaphore[]{t1t2}, new Semaphore[]{t2t5});
        Task task3 = new Task(3, new Semaphore[]{t1t3}, new Semaphore[]{t3t5});
        Task task4 = new Task(4, new Semaphore[]{t1t4}, new Semaphore[]{t4t6});
        Task task5 = new Task(5, new Semaphore[]{t2t5,t3t5}, new Semaphore[]{t5t6});
        Task task6 = new Task(6, new Semaphore[]{t5t6,t4t6}, null);

        new Thread(task1).start();
        new Thread(task2).start();
        new Thread(task3).start();
        new Thread(task4).start();
        new Thread(task5).start();
        new Thread(task6).start();
    }

    public static class Task implements Runnable {
        // 任务名称
        private int number;
        // 当前任务需要执行wait操作的信号量列表
        private Semaphore[] ps;
        // 当前任务执行完成后,需要执行signal的信号量列表
        private Semaphore[] vs;

        public Task(int number,Semaphore[] ps,Semaphore[] vs) {
            this.number = number;
            this.ps = ps;
            this.vs = vs;
        }

        @Override
        public void run() {
            try {
                if (ps != null) {
                    // 等待
                    while (true) {
                        for (Semaphore semaphore : ps) {
                            if (!semaphore.tryAcquire()) {
                                TimeUnit.MICROSECONDS.sleep(100);
                                break;
                            }
                        }
                        break;
                    }
                    // 执行wait操作
                    for (Semaphore semaphore : ps) {
                        semaphore.acquire();
                    }
                }
                runTask();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * 具体的任务
         */
        public void runTask() throws InterruptedException {
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            System.out.println(“task “ + number + “ done”);
            if (vs != null) {
                for (Semaphore semaphore : vs) {
                    semaphore.release();
                }
            }
        }
    }
}
封面图:Christian Lambert on Unsplash

Comments

Post a Message

人生在世,错别字在所难免,无需纠正。

提交评论