Linux:基于信号量的环形队列与生产者消费者模型(一)

📅 发布时间:2026/7/5 4:09:31 👁️ 浏览次数:
Linux:基于信号量的环形队列与生产者消费者模型(一)
前言在 Linux/C 后端开发的学习路径中生产者消费者模型是必须掌握的核心多线程同步场景而信号量Semaphore是实现该模型最经典、最直观的方案之一。相比于互斥锁只解决 “临界资源互斥访问”信号量可以直接通过 “计数” 完成线程间的同步控制完美适配 “生产速度与消费速度不匹配” 的场景。本文将从零开始基于 POSIX 信号量、环形队列封装一套完整的多生产者、多消费者模型所有代码均可直接编译运行全程聚焦原理讲解与代码实现帮助大家彻底吃透信号量的核心用法。一、核心理论铺垫1. 什么是生产者消费者模型简单来说生产者消费者模型包含三类角色生产者线程负责生产数据放入共享缓冲区消费者线程负责从共享缓冲区取出数据进行消费共享缓冲区线程间共享的数据容器本文使用环形队列实现。该模型要解决两个核心问题同步问题缓冲区为空时消费者不能消费缓冲区满时生产者不能生产互斥问题多个生产者 / 消费者不能同时操作缓冲区的同一块内存。2. 什么是信号量信号量是一种基于计数的同步工具本质是一个计数器用于控制对共享资源的访问核心只有两个操作P 操作wait计数器-1如果计数器≤0线程阻塞等待V 操作post计数器1如果有线程阻塞唤醒等待的线程。在本文中我们使用两个信号量_blank_sem记录环形队列的空闲空间数量生产者使用_data_sem记录环形队列的有效数据数量消费者使用。3. 什么是环形队列环形队列是一种固定大小、循环复用内存的队列结构通过下标取模实现空间复用避免普通队列频繁扩容的开销是生产者消费者模型的最优缓冲区选择。二、完整代码分模块详解本文代码共 5 个文件Sem.hpp、Mutex.hpp、RingQueue.hpp、Main.cc、Makefile所有代码严格遵循学习版规范不考虑冗余健壮性只聚焦核心逻辑。1. 信号量封装Sem.hpp这是对 POSIX 原生信号量的极简封装只保留核心的初始化、销毁、P/V 操作#include iostream #include semaphore.h class Sem { public: // 初始化信号量pshared0表示线程间共享value为初始计数器 Sem(unsigned int value 1) { sem_init(_sem, 0, value); } ~Sem() { sem_destroy(_sem); // 释放信号量资源 } void P() // wait申请资源计数-1不足则阻塞 { sem_wait(_sem); } void V() // post释放资源计数1唤醒等待线程 { sem_post(_sem); } private: sem_t _sem; // POSIX原生信号量对象 };关键细节sem_init第二个参数为 0代表信号量用于同一进程内的线程间共享这是多线程编程的标准用法。2. 互斥锁封装Mutex.hpp信号量只解决同步问题多线程同时修改队列下标时必须用互斥锁保证互斥访问这里使用 RAII 机制自动管理锁#pragma once #include iostream #include pthread.h // 原生互斥锁封装 class Mutex { public: Mutex() { pthread_mutex_init(_lock, nullptr); } ~Mutex() { pthread_mutex_destroy(_lock); } void Lock() { pthread_mutex_lock(_lock); } void UnLock() { pthread_mutex_unlock(_lock); } private: pthread_mutex_t _lock; }; // RAII风格锁守卫构造加锁析构解锁避免忘记解锁 class MutexGuard { public: MutexGuard(Mutex mutex) : _mutex(mutex) { _mutex.Lock(); } ~MutexGuard() { _mutex.UnLock(); } private: Mutex _mutex; };作用保证同一时间只有一个线程修改队列的下标防止多线程竞争导致的数据错乱。3. 环形队列封装RingQueue.hpp这是整个模型的核心信号量 互斥锁 环形队列三者协作的载体#pragma once #include Sem.hpp #include vector #include Mutex.hpp templateclass T class RingQueue { public: // 初始化队列容量空闲空间信号量容量数据信号量0 RingQueue(int cap 5) : _rq(cap) , _cap(cap) , _blank_sem(cap) , _step_c(0) , _data_sem(0) , _step_p(0) {} ~RingQueue() {} // 生产者生产数据 void push(T in) { _blank_sem.P(); // 先申请空闲空间满则阻塞 MutexGuard mg(_mutex_p); // 加锁保护下标修改 _rq[_step_p] in; _step_p % _cap; // 下标环形复用 _data_sem.V(); // 生产完成通知消费者有新数据 } // 消费者消费数据 T pop() { _data_sem.P(); // 先申请有效数据空则阻塞 MutexGuard mg(_mutex_p); // 加锁保护下标修改 T out _rq[_step_c]; _step_c % _cap; // 下标环形复用 _blank_sem.V(); // 消费完成通知生产者有空闲空间 return out; } private: std::vectorT _rq; // 队列底层存储 int _cap; // 队列最大容量 Sem _blank_sem; // 空闲空间信号量 int _step_p; // 生产者下标 Mutex _mutex_p; // 生产者互斥锁 Sem _data_sem; // 有效数据信号量 int _step_c; // 消费者下标 Mutex _mutex_c; // 消费者互斥锁 };核心逻辑生产者先通过P操作申请空闲空间再加锁修改队列消费者先通过P操作申请有效数据再加锁读取队列操作完成后通过V操作释放资源唤醒对方线程。4. 主测试文件Main.cc创建3 个生产者线程 2 个消费者线程模拟真实多线程场景cpp运行#include RingQueue.hpp #include pthread.h #include unistd.h #include string // 线程参数结构体传递队列指针线程名称 struct rq_pthread_name { RingQueueint* rq; std::string name; }; // 生产者线程例程 void* routine_p(void* args) { rq_pthread_name* rqn (rq_pthread_name*)args; std::string name rqn-name; int data 1; while (true) { sleep(2); // 模拟生产耗时 std::cout name 放入了 : data std::endl; rqn-rq-push(data); data; } } // 消费者线程例程 void* routine_c(void* args) { rq_pthread_name* rqn (rq_pthread_name*)args; std::string name rqn-name; while (true) { sleep(1); // 模拟消费耗时 int data rqn-rq-pop(); std::cout name 拿到了 : data std::endl; } } int main() { pthread_t c[2]; // 2个消费者 pthread_t p[3]; // 3个生产者 RingQueueint* rq new RingQueueint(); // 创建消费者线程 rq_pthread_name* rqn new rq_pthread_name; rqn-rq rq; rqn-name pthread - c0; pthread_create(c, nullptr, routine_c, rqn); rqn new rq_pthread_name; rqn-rq rq; rqn-name pthread - c1; pthread_create(c 1, nullptr, routine_c, rqn); // 创建生产者线程 rqn new rq_pthread_name; rqn-rq rq; rqn-name pthread - p0; pthread_create(p, nullptr, routine_p, rqn); rqn new rq_pthread_name; rqn-rq rq; rqn-name pthread - p1; pthread_create(p 1, nullptr, routine_p, rqn); rqn new rq_pthread_name; rqn-rq rq; rqn-name pthread - p2; pthread_create(p 2, nullptr, routine_p, rqn); // 等待所有线程退出 pthread_join(c[0], nullptr); pthread_join(c[1], nullptr); pthread_join(p[0], nullptr); pthread_join(p[1], nullptr); pthread_join(p[2], nullptr); return 0; }5. 编译脚本Makefilecode : Main.cc g $^ -o $ -lpthread -stdc11 .PHONY : clean clean : rm -f code编译命令make运行./code三、运行效果与原理分析运行后控制台会持续输出如下内容节选pthread - p0放入了 : 1 pthread - p1放入了 : 1 pthread - p2放入了 : 1 pthread - c0拿到了 : 1 pthread - c1拿到了 : 1 pthread - c0拿到了 : 1 pthread - p0放入了 : 2 pthread - c1拿到了 : 2效果说明3 个生产者每 2 秒生产一个数据2 个消费者每 1 秒消费一个数据当队列满时生产者会自动阻塞不会继续生产当队列空时消费者会自动阻塞不会继续消费所有线程安全运行无数据覆盖、无数据丢失。四、基础篇总结通过本文的实现我们完成了基于信号量的环形队列生产者消费者模型核心知识点可以总结为 3 点信号量负责同步通过计数控制生产 / 消费的节奏解决缓冲区空 / 满的问题互斥锁负责互斥保护队列下标的修改解决多线程竞争问题环形队列负责存储固定大小 循环复用高效适配生产者消费者模型。对于初学者而言先理解 “信号量计数和队列资源的对应关系”是掌握该模型的第一步。下一篇进阶博客我们将深入分析信号量与互斥锁的协作细节优化代码并讲解多线程场景的核心坑点。