Move worker queue implementation over to new abstracted scheme.

This commit is contained in:
Pavel Krajcevski 2012-09-29 15:36:42 -04:00
parent bb9370adaf
commit ed63255514
4 changed files with 54 additions and 35 deletions

View file

@ -21,6 +21,15 @@ class TCThreadBase {
TCThreadBaseImpl *m_Impl; TCThreadBaseImpl *m_Impl;
}; };
// The base class for a thread implementation
class TCCallable {
protected:
TCCallable() { }
public:
virtual ~TCCallable() { }
virtual void operator()() = 0;
};
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// //
// Thread implementation // Thread implementation
@ -30,12 +39,10 @@ class TCThreadBase {
class TCThread : public TCThreadBase { class TCThread : public TCThreadBase {
public: public:
template<typename C> TCThread(TCCallable &);
TCThread(C &);
~TCThread();
void Join(); void Join();
static void Yield();
}; };
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View file

@ -63,11 +63,22 @@ TCThreadBase::~TCThreadBase() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
class TCThreadImpl : public TCThreadBaseImpl { class TCThreadImpl : public TCThreadBaseImpl {
private:
class Instance {
private:
TCCallable &m_Callable;
public:
Instance(TCCallable &c) : m_Callable(c) { }
void operator()() {
m_Callable();
}
};
boost::thread m_Thread; boost::thread m_Thread;
public: public:
template<typename C> TCThreadImpl(TCCallable &callable)
TCThreadImpl(C &callable) : m_Thread(Instance(callable))
: m_Thread(callable)
{ } { }
void Join() { void Join() {
@ -75,20 +86,18 @@ public:
} }
}; };
template<typename C>
class TCThreadImplFactory : public TCThreadBaseImplFactory { class TCThreadImplFactory : public TCThreadBaseImplFactory {
C &m_Callable; TCCallable &m_Callable;
public: public:
TCThreadImplFactory(C &callable) : m_Callable(callable) { } TCThreadImplFactory(TCCallable &callable) : m_Callable(callable) { }
virtual ~TCThreadImplFactory() { } virtual ~TCThreadImplFactory() { }
virtual TCThreadBaseImpl *CreateImpl() const { virtual TCThreadBaseImpl *CreateImpl() const {
return new TCThreadImpl(m_Callable); return new TCThreadImpl(m_Callable);
} }
}; };
template<typename C> TCThread::TCThread(TCCallable &callable)
TCThread::TCThread(C &callable) : TCThreadBase(TCThreadImplFactory(callable))
: TCThreadBase(TCThreadImplFactory<C>(callable))
{ } { }
void TCThread::Join() { void TCThread::Join() {
@ -96,6 +105,10 @@ void TCThread::Join() {
((TCThreadImpl *)m_Impl)->Join(); ((TCThreadImpl *)m_Impl)->Join();
} }
void TCThread::Yield() {
boost::thread::yield();
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// //
// Mutex Implementation // Mutex Implementation

View file

@ -4,8 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <assert.h>
#include <boost/thread/thread.hpp>
template <typename T> template <typename T>
static inline T max(const T &a, const T &b) { static inline T max(const T &a, const T &b) {
@ -24,7 +23,8 @@ static inline void clamp(T &x, const T &min, const T &max) {
} }
WorkerThread::WorkerThread(WorkerQueue * parent, uint32 idx) WorkerThread::WorkerThread(WorkerQueue * parent, uint32 idx)
: m_ThreadIdx(idx) : TCCallable()
, m_ThreadIdx(idx)
, m_Parent(parent) , m_Parent(parent)
{ } { }
@ -55,7 +55,7 @@ void WorkerThread::operator()() {
case eAction_Wait: case eAction_Wait:
{ {
boost::thread::yield(); TCThread::Yield();
break; break;
} }
@ -114,10 +114,10 @@ WorkerQueue::WorkerQueue(
void WorkerQueue::Run() { void WorkerQueue::Run() {
// Spawn a bunch of threads... // Spawn a bunch of threads...
boost::unique_lock<boost::mutex> lock(m_Mutex); TCLock lock(m_Mutex);
for(int i = 0; i < m_NumThreads; i++) { for(int i = 0; i < m_NumThreads; i++) {
WorkerThread t (this, i); m_Workers[i] = new WorkerThread(this, i);
m_ThreadHandles[m_ActiveThreads] = new boost::thread(t); m_ThreadHandles[m_ActiveThreads] = new TCThread(*m_Workers[i]);
m_ActiveThreads++; m_ActiveThreads++;
} }
@ -129,24 +129,25 @@ void WorkerQueue::Run() {
// Wait for them to finish... // Wait for them to finish...
while(m_ActiveThreads > 0) { while(m_ActiveThreads > 0) {
m_CV.wait(lock); m_CV.Wait(lock);
} }
m_StopWatch.Stop(); m_StopWatch.Stop();
// Join them all together.. // Join them all together..
for(int i = 0; i < m_NumThreads; i++) { for(int i = 0; i < m_NumThreads; i++) {
m_ThreadHandles[i]->join(); m_ThreadHandles[i]->Join();
delete m_ThreadHandles[i]; delete m_ThreadHandles[i];
delete m_Workers[i];
} }
} }
void WorkerQueue::NotifyWorkerFinished() { void WorkerQueue::NotifyWorkerFinished() {
{ {
boost::lock_guard<boost::mutex> lock(m_Mutex); TCLock lock(m_Mutex);
m_ActiveThreads--; m_ActiveThreads--;
} }
m_CV.notify_one(); m_CV.NotifyOne();
} }
WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) { WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) {
@ -158,7 +159,7 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) {
const uint32 totalBlocks = m_InBufSz / 64; const uint32 totalBlocks = m_InBufSz / 64;
// Make sure we have exclusive access... // Make sure we have exclusive access...
boost::lock_guard<boost::mutex> lock(m_Mutex); TCLock lock(m_Mutex);
// If we've completed all blocks, then mark the thread for // If we've completed all blocks, then mark the thread for
// completion. // completion.

View file

@ -3,23 +3,20 @@
// Forward declare... // Forward declare...
class WorkerQueue; class WorkerQueue;
namespace boost {
class thread;
}
// Necessary includes... // Necessary includes...
#include "TexCompTypes.h" #include "TexCompTypes.h"
#include "TexComp.h" #include "TexComp.h"
#include <boost/thread/mutex.hpp> #include "Thread.h"
#include <boost/thread/condition_variable.hpp>
#include "StopWatch.h" #include "StopWatch.h"
struct WorkerThread { struct WorkerThread : public TCCallable {
friend class WorkerQueue; friend class WorkerQueue;
public: public:
WorkerThread(WorkerQueue *, uint32 idx); WorkerThread(WorkerQueue *, uint32 idx);
void operator ()(); virtual ~WorkerThread() { }
virtual void operator ()();
enum EAction { enum EAction {
eAction_Wait, eAction_Wait,
@ -64,8 +61,8 @@ class WorkerQueue {
const uint8 *m_InBuf; const uint8 *m_InBuf;
uint8 *m_OutBuf; uint8 *m_OutBuf;
boost::condition_variable m_CV; TCConditionVariable m_CV;
boost::mutex m_Mutex; TCMutex m_Mutex;
uint32 m_NextBlock; uint32 m_NextBlock;
@ -73,7 +70,8 @@ class WorkerQueue {
uint32 m_Offsets[kMaxNumWorkerThreads]; uint32 m_Offsets[kMaxNumWorkerThreads];
uint32 m_NumBlocks[kMaxNumWorkerThreads]; uint32 m_NumBlocks[kMaxNumWorkerThreads];
boost::thread *m_ThreadHandles[kMaxNumWorkerThreads]; WorkerThread *m_Workers[kMaxNumWorkerThreads];
TCThread *m_ThreadHandles[kMaxNumWorkerThreads];
const uint8 *GetSrcForThread(const int threadIdx) const; const uint8 *GetSrcForThread(const int threadIdx) const;
uint8 *GetDstForThread(const int threadIdx) const; uint8 *GetDstForThread(const int threadIdx) const;