From 7116cc8b89f63320c2557733614dacc33716675e Mon Sep 17 00:00:00 2001 From: Pavel Krajcevski Date: Sat, 15 Sep 2012 12:13:32 -0400 Subject: [PATCH] Redid thread synchonization in order to be able to signal threads to run without having to initialize them again. We can use this to average the running times. --- Core/src/TexComp.cpp | 9 ++++ Core/src/ThreadGroup.cpp | 110 ++++++++++++++++++++++++++++++--------- Core/src/ThreadGroup.h | 26 ++++++--- 3 files changed, 113 insertions(+), 32 deletions(-) diff --git a/Core/src/TexComp.cpp b/Core/src/TexComp.cpp index d288eab..8bf512a 100644 --- a/Core/src/TexComp.cpp +++ b/Core/src/TexComp.cpp @@ -4,6 +4,7 @@ #include #include +#include template static T min(const T &a, const T &b) { @@ -25,6 +26,7 @@ SCompressionSettings:: SCompressionSettings() , bUseSIMD(false) , iNumThreads(1) , iQuality(50) + , iNumCompressions(1) { clamp(iQuality, 0, 256); } @@ -90,15 +92,22 @@ CompressedImage * CompressImage( if(f) { StopWatch stopWatch = StopWatch(); + double cmpMSTime = 0.0; if(settings.iNumThreads > 1) { ThreadGroup tgrp (settings.iNumThreads, img, f, cmpData); + if(!(tgrp.PrepareThreads())) { + assert(!"Thread group failed to prepare threads?!"); + return NULL; + } tgrp.Start(); tgrp.Join(); stopWatch = tgrp.GetStopWatch(); + + tgrp.CleanUpThreads(); } else { stopWatch.Start(); diff --git a/Core/src/ThreadGroup.cpp b/Core/src/ThreadGroup.cpp index 3f2cabe..7926899 100644 --- a/Core/src/ThreadGroup.cpp +++ b/Core/src/ThreadGroup.cpp @@ -10,42 +10,50 @@ #include CmpThread::CmpThread() - : m_ParentCounter(NULL) + : m_StartBarrier(NULL) + , m_ParentCounter(NULL) , m_ParentCounterLock(NULL) , m_FinishCV(NULL) - , m_Barrier(NULL) , m_Width(0) , m_Height(0) , m_CmpFunc(NULL) , m_OutBuf(NULL) , m_InBuf(NULL) + , m_ParentExitFlag(NULL) { } void CmpThread::operator()() { - if(!m_Barrier || !m_CmpFunc || !m_OutBuf || !m_InBuf - || !m_ParentCounter || !m_ParentCounterLock - || !m_FinishCV + if(!m_CmpFunc || !m_OutBuf || !m_InBuf + || !m_ParentCounter || !m_ParentCounterLock || !m_FinishCV + || !m_StartBarrier + || !m_ParentExitFlag ) { fprintf(stderr, "Incorrect thread initialization.\n"); return; } - // Wait for all threads to be ready... - m_Barrier->wait(); + while(1) { + // Wait for signal to start work... + m_StartBarrier->wait(); - (*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height); + if(*m_ParentExitFlag) { + return; + } - { - boost::lock_guard lock(*m_ParentCounterLock); - (*m_ParentCounter)++; + (*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height); + + { + boost::lock_guard lock(*m_ParentCounterLock); + (*m_ParentCounter)++; + } + + m_FinishCV->notify_one(); } - - m_FinishCV->notify_one(); } ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFunc func, unsigned char *outBuf ) - : m_Barrier(new boost::barrier(numThreads)) + : m_StartBarrier(new boost::barrier(numThreads + 1)) , m_FinishMutex(new boost::mutex()) , m_FinishCV(new boost::condition_variable()) , m_NumThreads(numThreads) @@ -53,18 +61,21 @@ ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFun , m_Func(func) , m_Image(image) , m_OutBuf(outBuf) + , m_ThreadState(eThreadState_Done) + , m_ExitFlag(false) { for(int i = 0; i < kMaxNumThreads; i++) { // Thread synchronization primitives m_Threads[i].m_ParentCounterLock = m_FinishMutex; m_Threads[i].m_FinishCV = m_FinishCV; m_Threads[i].m_ParentCounter = &m_ThreadsFinished; - m_Threads[i].m_Barrier = m_Barrier; + m_Threads[i].m_StartBarrier = m_StartBarrier; + m_Threads[i].m_ParentExitFlag = &m_ExitFlag; } } ThreadGroup::~ThreadGroup() { - delete m_Barrier; + delete m_StartBarrier; delete m_FinishMutex; delete m_FinishCV; } @@ -83,11 +94,17 @@ unsigned int ThreadGroup::GetUncompressedBlockSize() { #endif } -void ThreadGroup::Start() { +bool ThreadGroup::PrepareThreads() { + + // Make sure that threads aren't running. + if(m_ThreadState != eThreadState_Done) { + return false; + } // Have we already activated the thread group? if(m_ActiveThreads > 0) { - return; + m_ThreadState = eThreadState_Waiting; + return true; } // Make sure that the image dimensions are multiples of 4 @@ -127,8 +144,55 @@ void ThreadGroup::Start() { m_ActiveThreads++; } + m_ThreadState = eThreadState_Waiting; + return true; +} + +bool ThreadGroup::Start() { + + if(m_ActiveThreads <= 0) { + return false; + } + + if(m_ThreadState != eThreadState_Waiting) { + return false; + } + m_StopWatch.Reset(); m_StopWatch.Start(); + + // Last thread to activate the barrier is this one. + m_ThreadState = eThreadState_Running; + m_StartBarrier->wait(); + + return true; +} + +bool ThreadGroup::CleanUpThreads() { + + // Are the threads currently running? + if(m_ThreadState == eThreadState_Running) { + // ... if so, wait for them to finish + Join(); + } + + assert(m_ThreadState == eThreadState_Done || m_ThreadState == eThreadState_Waiting); + + // Mark all threads for exit + m_ExitFlag = true; + + // Hit the barrier to signal them to go. + m_StartBarrier->wait(); + + // Clean up. + for(int i = 0; i < m_ActiveThreads; i++) { + m_ThreadHandles[i]->join(); + delete m_ThreadHandles[i]; + } + + // Reset active number of threads... + m_ActiveThreads = 0; + m_ExitFlag = false; } void ThreadGroup::Join() { @@ -139,12 +203,6 @@ void ThreadGroup::Join() { } m_StopWatch.Stop(); - - for(int i = 0; i < m_ActiveThreads; i++) { - m_ThreadHandles[i]->join(); - delete m_ThreadHandles[i]; - } - - // Reset active number of threads... - m_ActiveThreads = 0; + m_ThreadState = eThreadState_Done; + m_ThreadsFinished = 0; } diff --git a/Core/src/ThreadGroup.h b/Core/src/ThreadGroup.h index d9536b2..5435ffc 100644 --- a/Core/src/ThreadGroup.h +++ b/Core/src/ThreadGroup.h @@ -6,9 +6,9 @@ // forward declare namespace boost { - class barrier; class thread; class mutex; + class barrier; class condition_variable; } @@ -16,13 +16,13 @@ struct CmpThread { friend class ThreadGroup; private: - int *m_ParentCounter; + boost::barrier *m_StartBarrier; + int *m_ParentCounter; + boost::mutex *m_ParentCounterLock; boost::condition_variable *m_FinishCV; - boost::barrier *m_Barrier; - int m_Width; int m_Height; @@ -31,6 +31,8 @@ private: unsigned char *m_OutBuf; const unsigned char *m_InBuf; + bool *m_ParentExitFlag; + CmpThread(); public: @@ -43,13 +45,22 @@ class ThreadGroup { ThreadGroup( int numThreads, const ImageFile &, CompressionFunc func, unsigned char *outBuf ); ~ThreadGroup(); - void Start(); + bool PrepareThreads(); + bool Start(); void Join(); + bool CleanUpThreads(); const StopWatch &GetStopWatch() const { return m_StopWatch; } + enum EThreadState { + eThreadState_Waiting, + eThreadState_Running, + eThreadState_Done + }; + private: - boost::barrier *const m_Barrier; + boost::barrier *const m_StartBarrier; + boost::mutex *const m_FinishMutex; boost::condition_variable *const m_FinishCV; @@ -71,6 +82,9 @@ class ThreadGroup { unsigned int GetUncompressedBlockSize(); StopWatch m_StopWatch; + + EThreadState m_ThreadState; + bool m_ExitFlag; }; #endif // _THREAD_GROUP_H_