From ed6325551461fae2fd4a8e7a8bedc665b56b9b27 Mon Sep 17 00:00:00 2001 From: Pavel Krajcevski Date: Sat, 29 Sep 2012 15:36:42 -0400 Subject: [PATCH] Move worker queue implementation over to new abstracted scheme. --- Core/src/Thread.h | 15 +++++++++++---- Core/src/ThreadBoost.cpp | 31 ++++++++++++++++++++++--------- Core/src/WorkerQueue.cpp | 25 +++++++++++++------------ Core/src/WorkerQueue.h | 18 ++++++++---------- 4 files changed, 54 insertions(+), 35 deletions(-) diff --git a/Core/src/Thread.h b/Core/src/Thread.h index 79cdfb4..e0075e1 100644 --- a/Core/src/Thread.h +++ b/Core/src/Thread.h @@ -21,6 +21,15 @@ class TCThreadBase { TCThreadBaseImpl *m_Impl; }; +// The base class for a thread implementation +class TCCallable { + protected: + TCCallable() { } + public: + virtual ~TCCallable() { } + virtual void operator()() = 0; +}; + //////////////////////////////////////////////////////////////////////////////// // // Thread implementation @@ -30,12 +39,10 @@ class TCThreadBase { class TCThread : public TCThreadBase { public: - template - TCThread(C &); - ~TCThread(); + TCThread(TCCallable &); void Join(); - + static void Yield(); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/Core/src/ThreadBoost.cpp b/Core/src/ThreadBoost.cpp index 84cff2b..eb24d38 100644 --- a/Core/src/ThreadBoost.cpp +++ b/Core/src/ThreadBoost.cpp @@ -63,11 +63,22 @@ TCThreadBase::~TCThreadBase() { //////////////////////////////////////////////////////////////////////////////// class TCThreadImpl : public TCThreadBaseImpl { +private: + class Instance { + private: + TCCallable &m_Callable; + public: + Instance(TCCallable &c) : m_Callable(c) { } + + void operator()() { + m_Callable(); + } + }; + boost::thread m_Thread; public: - template - TCThreadImpl(C &callable) - : m_Thread(callable) + TCThreadImpl(TCCallable &callable) + : m_Thread(Instance(callable)) { } void Join() { @@ -75,20 +86,18 @@ public: } }; -template class TCThreadImplFactory : public TCThreadBaseImplFactory { - C &m_Callable; + TCCallable &m_Callable; public: - TCThreadImplFactory(C &callable) : m_Callable(callable) { } + TCThreadImplFactory(TCCallable &callable) : m_Callable(callable) { } virtual ~TCThreadImplFactory() { } virtual TCThreadBaseImpl *CreateImpl() const { return new TCThreadImpl(m_Callable); } }; -template -TCThread::TCThread(C &callable) - : TCThreadBase(TCThreadImplFactory(callable)) +TCThread::TCThread(TCCallable &callable) + : TCThreadBase(TCThreadImplFactory(callable)) { } void TCThread::Join() { @@ -96,6 +105,10 @@ void TCThread::Join() { ((TCThreadImpl *)m_Impl)->Join(); } +void TCThread::Yield() { + boost::thread::yield(); +} + //////////////////////////////////////////////////////////////////////////////// // // Mutex Implementation diff --git a/Core/src/WorkerQueue.cpp b/Core/src/WorkerQueue.cpp index 2075415..d3c8ee2 100644 --- a/Core/src/WorkerQueue.cpp +++ b/Core/src/WorkerQueue.cpp @@ -4,8 +4,7 @@ #include #include - -#include +#include template static inline T max(const T &a, const T &b) { @@ -24,7 +23,8 @@ static inline void clamp(T &x, const T &min, const T &max) { } WorkerThread::WorkerThread(WorkerQueue * parent, uint32 idx) - : m_ThreadIdx(idx) + : TCCallable() + , m_ThreadIdx(idx) , m_Parent(parent) { } @@ -55,7 +55,7 @@ void WorkerThread::operator()() { case eAction_Wait: { - boost::thread::yield(); + TCThread::Yield(); break; } @@ -114,10 +114,10 @@ WorkerQueue::WorkerQueue( void WorkerQueue::Run() { // Spawn a bunch of threads... - boost::unique_lock lock(m_Mutex); + TCLock lock(m_Mutex); for(int i = 0; i < m_NumThreads; i++) { - WorkerThread t (this, i); - m_ThreadHandles[m_ActiveThreads] = new boost::thread(t); + m_Workers[i] = new WorkerThread(this, i); + m_ThreadHandles[m_ActiveThreads] = new TCThread(*m_Workers[i]); m_ActiveThreads++; } @@ -129,24 +129,25 @@ void WorkerQueue::Run() { // Wait for them to finish... while(m_ActiveThreads > 0) { - m_CV.wait(lock); + m_CV.Wait(lock); } m_StopWatch.Stop(); // Join them all together.. for(int i = 0; i < m_NumThreads; i++) { - m_ThreadHandles[i]->join(); + m_ThreadHandles[i]->Join(); delete m_ThreadHandles[i]; + delete m_Workers[i]; } } void WorkerQueue::NotifyWorkerFinished() { { - boost::lock_guard lock(m_Mutex); + TCLock lock(m_Mutex); m_ActiveThreads--; } - m_CV.notify_one(); + m_CV.NotifyOne(); } WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) { @@ -158,7 +159,7 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) { const uint32 totalBlocks = m_InBufSz / 64; // Make sure we have exclusive access... - boost::lock_guard lock(m_Mutex); + TCLock lock(m_Mutex); // If we've completed all blocks, then mark the thread for // completion. diff --git a/Core/src/WorkerQueue.h b/Core/src/WorkerQueue.h index b6cac35..c8aad39 100644 --- a/Core/src/WorkerQueue.h +++ b/Core/src/WorkerQueue.h @@ -3,23 +3,20 @@ // Forward declare... class WorkerQueue; -namespace boost { - class thread; -} // Necessary includes... #include "TexCompTypes.h" #include "TexComp.h" -#include -#include +#include "Thread.h" #include "StopWatch.h" -struct WorkerThread { +struct WorkerThread : public TCCallable { friend class WorkerQueue; public: WorkerThread(WorkerQueue *, uint32 idx); - void operator ()(); + virtual ~WorkerThread() { } + virtual void operator ()(); enum EAction { eAction_Wait, @@ -64,8 +61,8 @@ class WorkerQueue { const uint8 *m_InBuf; uint8 *m_OutBuf; - boost::condition_variable m_CV; - boost::mutex m_Mutex; + TCConditionVariable m_CV; + TCMutex m_Mutex; uint32 m_NextBlock; @@ -73,7 +70,8 @@ class WorkerQueue { uint32 m_Offsets[kMaxNumWorkerThreads]; uint32 m_NumBlocks[kMaxNumWorkerThreads]; - boost::thread *m_ThreadHandles[kMaxNumWorkerThreads]; + WorkerThread *m_Workers[kMaxNumWorkerThreads]; + TCThread *m_ThreadHandles[kMaxNumWorkerThreads]; const uint8 *GetSrcForThread(const int threadIdx) const; uint8 *GetDstForThread(const int threadIdx) const;