做曖曖視頻網(wǎng)站安全嗎it培訓(xùn)機(jī)構(gòu)哪個(gè)好一點(diǎn)
- 生產(chǎn)者和消費(fèi)者概念
- 基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
- 全部代碼
生產(chǎn)者和消費(fèi)者概念
生產(chǎn)者消費(fèi)者模式就是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。
生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過這個(gè)容器來通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接將生產(chǎn)的數(shù)據(jù)放到這個(gè)容器當(dāng)中,消費(fèi)者也不用找生產(chǎn)者要數(shù)據(jù),而是直接從這個(gè)容器里取數(shù)據(jù),這個(gè)容器就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力,這個(gè)容器實(shí)際上就是用來給生產(chǎn)者和消費(fèi)者解耦的。
生產(chǎn)者消費(fèi)者模型的特點(diǎn)
生產(chǎn)者消費(fèi)者模型是多線程同步與互斥的一個(gè)經(jīng)典場景,其特點(diǎn)如下:
- 三種關(guān)系: 生產(chǎn)者和生產(chǎn)者(互斥關(guān)系)、消費(fèi)者和消費(fèi)者(互斥關(guān)系)、生產(chǎn)者和消費(fèi)者(互斥關(guān)系、同步關(guān)系)。
- 兩種角色: 生產(chǎn)者和消費(fèi)者。(通常由進(jìn)程或線程承擔(dān))
- 一個(gè)交易場所: 通常指的是內(nèi)存中的一段緩沖區(qū)。(可以自己通過某種方式組織起來)
生產(chǎn)者和生產(chǎn)者、消費(fèi)者和消費(fèi)者、生產(chǎn)者和消費(fèi)者,它們之間為什么會(huì)存在互斥關(guān)系?
因?yàn)樵谏a(chǎn)者和消費(fèi)者之間存在多種執(zhí)行流同時(shí)訪問的問題,,因此我們需要將他們同時(shí)訪問的臨界區(qū)進(jìn)行加互斥保護(hù)起來
其中,所有的生產(chǎn)者和消費(fèi)者都會(huì)競爭式的申請鎖,因此生產(chǎn)者和生產(chǎn)者、消費(fèi)者和消費(fèi)者、生產(chǎn)者和消費(fèi)者之間都存在互斥關(guān)系。
生產(chǎn)者和消費(fèi)者之間為什么會(huì)存在同步關(guān)系?
- 如果讓生產(chǎn)者一直生產(chǎn),那么當(dāng)生產(chǎn)者生產(chǎn)的數(shù)據(jù)將容器塞滿后,生產(chǎn)者再生產(chǎn)數(shù)據(jù)就會(huì)生產(chǎn)失敗。
- 反之,讓消費(fèi)者一直消費(fèi),那么當(dāng)容器當(dāng)中的數(shù)據(jù)被消費(fèi)完后,消費(fèi)者再進(jìn)行消費(fèi)就會(huì)消費(fèi)失敗。
雖然這樣不會(huì)造成任何數(shù)據(jù)不一致的問題,但是這樣會(huì)引起另一方的饑餓問題,是非常低效的。我們應(yīng)該讓生產(chǎn)者和消費(fèi)者訪問該容器時(shí)具有一定的順序性,比如讓生產(chǎn)者先生產(chǎn),然后再讓消費(fèi)者進(jìn)行消費(fèi)。
- 注意: 互斥關(guān)系保證的是數(shù)據(jù)的正確性,而同步關(guān)系是為了讓多線程之間協(xié)同起來。
基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
當(dāng)多個(gè)生產(chǎn)者,消費(fèi)者同時(shí)出現(xiàn)進(jìn)行搶占線程時(shí),我們可以使用BlockingQueue來進(jìn)行緩沖,如圖
其與普通的隊(duì)列的區(qū)別在于:
- 當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會(huì)被阻塞,直到隊(duì)列中放入了元素。
- 當(dāng)隊(duì)列滿時(shí),往隊(duì)列里存放元素的操作會(huì)被阻塞,直到有元素從隊(duì)列中取出。
知識聯(lián)系: 看到以上阻塞隊(duì)列的描述,我們很容易想到的就是管道,而阻塞隊(duì)列最典型的應(yīng)用場景實(shí)際上就是管道的實(shí)現(xiàn)。
put為生產(chǎn)者,take為消費(fèi)者
全部代碼
task.hpp 用于實(shí)現(xiàn)打印和計(jì)算
#pragma once
#include <iostream>
#include <string>class Task
{
public:Task(){}Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0){}void operator()(){switch(_op){case '+' :_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "="; }std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}~Task(){}
private:int _x;int _y;char _op;//輸入的符號int _result;int _exitCode;
};
blockQueue.hpp 維護(hù)線程之間的同步
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>const int gcap = 5;//最大容量
template<class T>
class BlockQueue
{
public:BlockQueue(const int cap = gcap):_cap(gcap){pthread_mutex_init(&_mutex,nullptr);//初始化互斥量//初始化用戶和生產(chǎn)者的條件變量pthread_cond_init(&_consumerCond,nullptr);pthread_cond_init(&_consumerCond,nullptr);}//判斷是否為慢bool isFull(){return _q.size() == _cap;}//判斷是否為空bool isEmpty(){return _q.empty();}//插入void push(const T &in){pthread_mutex_lock(&_mutex);//細(xì)節(jié)1:一定要保證,在任何時(shí)候,都要符合條件,才進(jìn)行生產(chǎn)while(isFull()){//1 我們只能在臨界區(qū)內(nèi)部,判斷臨界區(qū)資源是否就緒!注定了我們在當(dāng)前一定持有鎖。//2 要讓線程進(jìn)行休眠等待,不能持有鎖等待//3 注定了pthread_cond_wait要有鎖的釋放能力pthread_cond_wait(&_productCond,&_mutex);// 4. 當(dāng)線程醒來的時(shí)候,注定了繼續(xù)從臨界區(qū)內(nèi)部繼續(xù)運(yùn)行!因?yàn)槲沂窃谂R界區(qū)被切走的!// 5. 注定了當(dāng)線程被喚醒的時(shí)候,繼續(xù)在pthread_cond_wait函數(shù)出向后運(yùn)行,又要重新申請鎖,申請成功才會(huì)徹底返回}// 沒有滿,就要讓他繼續(xù)運(yùn)行_q.push(in);//加策略pthread_cond_signal(&_consumerCond);pthread_mutex_unlock(&_mutex);}//取出刪除void pop(T* out){pthread_mutex_lock(&_mutex);while(isEmpty()) {pthread_cond_wait(&_consumerCond, &_mutex);}*out = _q.front();_q.pop();// 加策略pthread_cond_signal(&_productCond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumerCond);pthread_cond_destroy(&_productCond);}
private:std::queue<T> _q;int _cap;//生產(chǎn)容量//為什么我們這份代碼只有一個(gè)鎖,根本原因在于//我們生產(chǎn)者和消費(fèi)者訪問的是同一個(gè)queue && queue 被當(dāng)作整體使用pthread_mutex_t _mutex;pthread_cond_t _consumerCond;//消費(fèi)者對應(yīng)的條件變量pthread_cond_t _productCond;//生產(chǎn)者對應(yīng)的條件變量
};
main.cc
#include <iostream>
#include "task.hpp"
#include "blockQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <unistd.h>void* consumer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(1){Task t;// 1. 將數(shù)據(jù)從blockqueue中獲取 -- 獲取到了數(shù)據(jù)bq->pop(&t);t();// 2. 結(jié)合某種業(yè)務(wù)邏輯,處理數(shù)據(jù)! -- TODOstd::cout << pthread_self() << " | consumer data: " << t.formatArg() << t.formatRes() << std::endl;}
}void *productor(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);std::string opers = "+-*/%";while(1){int x = rand()%20 +1;int y = rand()%20 +1;char op = opers[rand() % opers.size()];Task t(x,y,op);bq->push(t);std::cout << pthread_self() << " | productor Task: " << t.formatArg() << "?" << std::endl;}
}int main()
{BlockQueue<Task>*bq = new BlockQueue<Task>();pthread_t c[2], p[3];pthread_create(&c[0], nullptr, consumer, bq);pthread_create(&c[1], nullptr, consumer, bq);pthread_create(&p[0], nullptr, productor, bq);pthread_create(&p[1], nullptr, productor, bq);pthread_create(&p[2], nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);delete bq;return 0;return 0;
}