怎樣做電商網(wǎng)站社群營銷案例
rtc::Thread介紹
rtc::Thread類不僅僅實現(xiàn)了線程這個執(zhí)行器(比如posix底層調(diào)用pthread相關(guān)接口創(chuàng)建線程,管理線程等),還包括消息隊列(message_queue)的實現(xiàn),rtc::Thread啟動后就作為一個永不停止的event loop,沒有任務(wù)待執(zhí)行就阻塞等待,添加任務(wù)
后就喚醒event loop,去執(zhí)行任務(wù),周而復(fù)始,直到調(diào)用stop退出event loop,退出線程(線程join)。
在WebRTC內(nèi)部,可以將消息隊列等同于event loop,消息隊列為空,就進行阻塞等待。
class RTC_LOCKABLE Thread : public MessageQueue {
Thread關(guān)鍵接口
public:// Starts the execution of the thread.bool Start(Runnable* runnable = nullptr);// Tells the thread to stop and waits until it is joined.// Never call Stop on the current thread. Instead use the inherited Quit// function which will exit the base MessageQueue without terminating the// underlying OS thread.virtual void Stop();virtual void Send(const Location& posted_from,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);// Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result = thread.Invoke<bool>(RTC_FROM_HERE,// &MyFunctionReturningBool);// NOTE: This function can only be called when synchronous calls are allowed.// See ScopedDisallowBlockingCalls for details.template <class ReturnT, class FunctorT>ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {FunctorMessageHandler<ReturnT, FunctorT> handler(std::forward<FunctorT>(functor));InvokeInternal(posted_from, &handler);return handler.MoveResult();}// ProcessMessages will process I/O and dispatch messages until:// 1) cms milliseconds have elapsed (returns true)// 2) Stop() is called (returns false)bool ProcessMessages(int cms);protected:// Blocks the calling thread until this thread has terminated.void Join();
MessageQueue關(guān)鍵接口
public:
virtual void Quit();// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
// 3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,int cmsWait = kForever,bool process_io = true);virtual void Post(const Location& posted_from,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr,bool time_sensitive = false);
virtual void PostDelayed(const Location& posted_from,int cmsDelay,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);
virtual void PostAt(const Location& posted_from,int64_t tstamp,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);virtual void Dispatch(Message* pmsg);
virtual void ReceiveSends();protected:
void WakeUpSocketServer();MessageList msgq_ RTC_GUARDED_BY(crit_);
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
線程啟動Start
調(diào)用Start接口啟動底層線程,同時進入一個永不停止的event loop(除非調(diào)用Stop接口)
流程如下:
Start->pthread_create->PreRun->Run
void Thread::Run() {ProcessMessages(kForever);
}
最終通過Get接口獲取消息去執(zhí)行(Dispatch),Get獲取不到消息就是進入阻塞狀態(tài)(wait),等待有消息后被喚醒。
線程消息隊列處理消息的流程ProcessMessage
- 1、處理從其他線程發(fā)送的要在本線程去執(zhí)行的消息,即同步調(diào)用
接收者線程處理流程:
發(fā)送者線程流程:
-
2、處理延遲消息(存儲在優(yōu)先級隊列)
延遲消息是通過PostDelayed和PostAt接口調(diào)用然后push到優(yōu)先級隊列中(dmsgq_,小根堆)
-
3、異步消息(存儲在普通隊列里)
延遲消息是通過Pos接口調(diào)用然后push到普通隊列中(msgq_)
任務(wù)提交方式(Invoke/Post)
webrtc內(nèi)部消息其實是對待執(zhí)行任務(wù)的封裝,消息和任務(wù)可以認為是一個意思
消息要繼承MessageHandler,實現(xiàn)OnMessage
class MessageHandler {public:virtual ~MessageHandler();virtual void OnMessage(Message* msg) = 0;protected:MessageHandler() {}private:RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};
因為執(zhí)行消息,實際上就是執(zhí)行OnMessage(詳見Dispatch接口實現(xiàn))
上一章節(jié)其實已經(jīng)把三種任務(wù)提交方式介紹過了
1、同步阻塞調(diào)用(Send,Invoke)
Invoke其實最終也是調(diào)用Send,Invoke是個函數(shù)模版,可以非常方便在目標執(zhí)行線程執(zhí)行函數(shù)然后獲得返回值,Invoke實現(xiàn)如下:
// Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result = thread.Invoke<bool>(RTC_FROM_HERE,// &MyFunctionReturningBool);// NOTE: This function can only be called when synchronous calls are allowed.// See ScopedDisallowBlockingCalls for details.template <class ReturnT, class FunctorT>ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {FunctorMessageHandler<ReturnT, FunctorT> handler(std::forward<FunctorT>(functor));InvokeInternal(posted_from, &handler);return handler.MoveResult();}void Thread::InvokeInternal(const Location& posted_from,MessageHandler* handler) {TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line",posted_from.file_and_line(), "src_func",posted_from.function_name());Send(posted_from, handler);
}
調(diào)用方式舉例:
bool result = thread.Invoke<bool>(RTC_FROM_HERE, &MyFunctionReturningBool);
2、異步非阻塞延遲調(diào)用
PostDelayed和PostAt
3、異步非阻塞調(diào)用
Post
線程退出Stop
void Thread::Stop() {MessageQueue::Quit();Join();
}void MessageQueue::Quit() {AtomicOps::ReleaseStore(&stop_, 1);WakeUpSocketServer();
}void Thread::Join() {if (!IsRunning())return;RTC_DCHECK(!IsCurrent());if (Current() && !Current()->blocking_calls_allowed_) {RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "<< "but blocking calls have been disallowed";}#if defined(WEBRTC_WIN)RTC_DCHECK(thread_ != nullptr);WaitForSingleObject(thread_, INFINITE);CloseHandle(thread_);thread_ = nullptr;thread_id_ = 0;
#elif defined(WEBRTC_POSIX)pthread_join(thread_, nullptr);thread_ = 0;
#endif
}