Add join synchronization.

This commit is contained in:
Pavel Krajcevski 2012-08-31 17:33:54 -04:00
parent a6bbc3989f
commit 44884a18b3
2 changed files with 44 additions and 3 deletions

View file

@ -6,9 +6,14 @@
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp> #include <boost/thread/barrier.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
CmpThread::CmpThread() CmpThread::CmpThread()
: m_Barrier(NULL) : m_ParentCounter(NULL)
, m_ParentCounterLock(NULL)
, m_FinishCV(NULL)
, m_Barrier(NULL)
, m_Width(0) , m_Width(0)
, m_Height(0) , m_Height(0)
, m_CmpFunc(NULL) , m_CmpFunc(NULL)
@ -17,32 +22,51 @@ CmpThread::CmpThread()
{ } { }
void CmpThread::operator()() { void CmpThread::operator()() {
if(!m_Barrier || !m_CmpFunc || !m_OutBuf || !m_InBuf ) { if(!m_Barrier || !m_CmpFunc || !m_OutBuf || !m_InBuf
|| !m_ParentCounter || !m_ParentCounterLock
|| !m_FinishCV
) {
fprintf(stderr, "Incorrect thread initialization.\n"); fprintf(stderr, "Incorrect thread initialization.\n");
return; return;
} }
// Wait for all threads to be ready...
m_Barrier->wait(); m_Barrier->wait();
(*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height); (*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height);
{
boost::lock_guard<boost::mutex> lock(*m_ParentCounterLock);
(*m_ParentCounter)++;
}
m_FinishCV->notify_one();
} }
ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFunc func, unsigned char *outBuf ) ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFunc func, unsigned char *outBuf )
: m_Barrier(new boost::barrier(numThreads)) : m_Barrier(new boost::barrier(numThreads))
, m_FinishMutex(new boost::mutex())
, m_FinishCV(new boost::condition_variable())
, m_NumThreads(numThreads) , m_NumThreads(numThreads)
, m_ActiveThreads(0) , m_ActiveThreads(0)
, m_Func(func) , m_Func(func)
, m_Image(image) , m_Image(image)
, m_OutBuf(outBuf) , m_OutBuf(outBuf)
{ {
for(int i = 0; i < kMaxNumThreads; i++) for(int i = 0; i < kMaxNumThreads; i++) {
// Thread synchronization primitives
m_Threads[i].m_ParentCounterLock = m_FinishMutex;
m_Threads[i].m_FinishCV = m_FinishCV;
m_Threads[i].m_ParentCounter = &m_ThreadsFinished;
m_Threads[i].m_Barrier = m_Barrier; m_Threads[i].m_Barrier = m_Barrier;
}
} }
ThreadGroup::~ThreadGroup() { ThreadGroup::~ThreadGroup() {
delete m_Barrier; delete m_Barrier;
} }
unsigned int ThreadGroup::GetCompressedBlockSize() { unsigned int ThreadGroup::GetCompressedBlockSize() {
if(m_Func == BC7C::CompressImageBC7) return 16; if(m_Func == BC7C::CompressImageBC7) return 16;
if(m_Func == BC7C::CompressImageBC7SIMD) return 16; if(m_Func == BC7C::CompressImageBC7SIMD) return 16;
@ -71,6 +95,8 @@ void ThreadGroup::Start() {
int blocksProcessed = 0; int blocksProcessed = 0;
int blocksPerThread = (numBlocks/m_NumThreads) + ((numBlocks % m_NumThreads)? 1 : 0); int blocksPerThread = (numBlocks/m_NumThreads) + ((numBlocks % m_NumThreads)? 1 : 0);
// Currently no threads are finished...
m_ThreadsFinished = 0;
for(int i = 0; i < m_NumThreads; i++) { for(int i = 0; i < m_NumThreads; i++) {
if(m_ActiveThreads >= kMaxNumThreads) if(m_ActiveThreads >= kMaxNumThreads)
@ -101,6 +127,11 @@ void ThreadGroup::Start() {
void ThreadGroup::Join() { void ThreadGroup::Join() {
boost::unique_lock<boost::mutex> lock(*m_FinishMutex);
while(m_ThreadsFinished != m_ActiveThreads) {
m_FinishCV->wait(lock);
}
for(int i = 0; i < m_ActiveThreads; i++) { for(int i = 0; i < m_ActiveThreads; i++) {
m_ThreadHandles[i]->join(); m_ThreadHandles[i]->join();
delete m_ThreadHandles[i]; delete m_ThreadHandles[i];

View file

@ -8,12 +8,19 @@
namespace boost { namespace boost {
class barrier; class barrier;
class thread; class thread;
class mutex;
class condition_variable;
} }
struct CmpThread { struct CmpThread {
friend class ThreadGroup; friend class ThreadGroup;
private: private:
int *m_ParentCounter;
boost::mutex *m_ParentCounterLock;
boost::condition_variable *m_FinishCV;
boost::barrier *m_Barrier; boost::barrier *m_Barrier;
int m_Width; int m_Width;
@ -43,11 +50,14 @@ class ThreadGroup {
private: private:
boost::barrier *const m_Barrier; boost::barrier *const m_Barrier;
boost::mutex *const m_FinishMutex;
boost::condition_variable *const m_FinishCV;
static const int kMaxNumThreads = 256; static const int kMaxNumThreads = 256;
const int m_NumThreads; const int m_NumThreads;
int m_ActiveThreads; int m_ActiveThreads;
int m_ThreadsFinished;
CmpThread m_Threads[kMaxNumThreads]; CmpThread m_Threads[kMaxNumThreads];
boost::thread *m_ThreadHandles[kMaxNumThreads]; boost::thread *m_ThreadHandles[kMaxNumThreads];