From 44884a18b37d75aa946ece3f0ed4c0f3c25e4f9a Mon Sep 17 00:00:00 2001 From: Pavel Krajcevski Date: Fri, 31 Aug 2012 17:33:54 -0400 Subject: [PATCH] Add join synchronization. --- Core/src/ThreadGroup.cpp | 37 ++++++++++++++++++++++++++++++++++--- Core/src/ThreadGroup.h | 10 ++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/Core/src/ThreadGroup.cpp b/Core/src/ThreadGroup.cpp index 83c4bd1..d114ccf 100644 --- a/Core/src/ThreadGroup.cpp +++ b/Core/src/ThreadGroup.cpp @@ -6,9 +6,14 @@ #include #include +#include +#include CmpThread::CmpThread() - : m_Barrier(NULL) + : m_ParentCounter(NULL) + , m_ParentCounterLock(NULL) + , m_FinishCV(NULL) + , m_Barrier(NULL) , m_Width(0) , m_Height(0) , m_CmpFunc(NULL) @@ -17,32 +22,51 @@ CmpThread::CmpThread() { } void CmpThread::operator()() { - if(!m_Barrier || !m_CmpFunc || !m_OutBuf || !m_InBuf ) { + if(!m_Barrier || !m_CmpFunc || !m_OutBuf || !m_InBuf + || !m_ParentCounter || !m_ParentCounterLock + || !m_FinishCV + ) { fprintf(stderr, "Incorrect thread initialization.\n"); return; } + // Wait for all threads to be ready... m_Barrier->wait(); (*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height); + + { + boost::lock_guard lock(*m_ParentCounterLock); + (*m_ParentCounter)++; + } + + m_FinishCV->notify_one(); } ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFunc func, unsigned char *outBuf ) : m_Barrier(new boost::barrier(numThreads)) + , m_FinishMutex(new boost::mutex()) + , m_FinishCV(new boost::condition_variable()) , m_NumThreads(numThreads) , m_ActiveThreads(0) , m_Func(func) , m_Image(image) , m_OutBuf(outBuf) { - for(int i = 0; i < kMaxNumThreads; i++) + for(int i = 0; i < kMaxNumThreads; i++) { + // Thread synchronization primitives + m_Threads[i].m_ParentCounterLock = m_FinishMutex; + m_Threads[i].m_FinishCV = m_FinishCV; + m_Threads[i].m_ParentCounter = &m_ThreadsFinished; m_Threads[i].m_Barrier = m_Barrier; + } } ThreadGroup::~ThreadGroup() { delete m_Barrier; } + unsigned int ThreadGroup::GetCompressedBlockSize() { if(m_Func == BC7C::CompressImageBC7) return 16; if(m_Func == BC7C::CompressImageBC7SIMD) return 16; @@ -71,6 +95,8 @@ void ThreadGroup::Start() { int blocksProcessed = 0; int blocksPerThread = (numBlocks/m_NumThreads) + ((numBlocks % m_NumThreads)? 1 : 0); + // Currently no threads are finished... + m_ThreadsFinished = 0; for(int i = 0; i < m_NumThreads; i++) { if(m_ActiveThreads >= kMaxNumThreads) @@ -101,6 +127,11 @@ void ThreadGroup::Start() { void ThreadGroup::Join() { + boost::unique_lock lock(*m_FinishMutex); + while(m_ThreadsFinished != m_ActiveThreads) { + m_FinishCV->wait(lock); + } + for(int i = 0; i < m_ActiveThreads; i++) { m_ThreadHandles[i]->join(); delete m_ThreadHandles[i]; diff --git a/Core/src/ThreadGroup.h b/Core/src/ThreadGroup.h index bb2a5ce..d9536b2 100644 --- a/Core/src/ThreadGroup.h +++ b/Core/src/ThreadGroup.h @@ -8,12 +8,19 @@ namespace boost { class barrier; class thread; + class mutex; + class condition_variable; } struct CmpThread { friend class ThreadGroup; private: + int *m_ParentCounter; + + boost::mutex *m_ParentCounterLock; + boost::condition_variable *m_FinishCV; + boost::barrier *m_Barrier; int m_Width; @@ -43,11 +50,14 @@ class ThreadGroup { private: boost::barrier *const m_Barrier; + boost::mutex *const m_FinishMutex; + boost::condition_variable *const m_FinishCV; static const int kMaxNumThreads = 256; const int m_NumThreads; int m_ActiveThreads; + int m_ThreadsFinished; CmpThread m_Threads[kMaxNumThreads]; boost::thread *m_ThreadHandles[kMaxNumThreads];