From 01c01b96cd5a5fb841cbc423d12071adb7636df1 Mon Sep 17 00:00:00 2001 From: Pavel Krajcevski Date: Tue, 25 Sep 2012 17:05:52 -0400 Subject: [PATCH] For more accurate timing results with the worker queue, make sure that all threads are done processing an image before restarting the compression when testing multiple compressions. --- Core/src/WorkerQueue.cpp | 60 ++++++++++++++++++++++++++++++++-------- Core/src/WorkerQueue.h | 4 ++- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/Core/src/WorkerQueue.cpp b/Core/src/WorkerQueue.cpp index ae7ae6a..8cb54a5 100644 --- a/Core/src/WorkerQueue.cpp +++ b/Core/src/WorkerQueue.cpp @@ -41,16 +41,39 @@ void WorkerThread::operator()() { return; } - while(1) { + bool quitFlag = false; + while(!quitFlag) { - EAction action = m_Parent->AcceptThreadData(m_ThreadIdx); - if(eAction_Quit == action) { - break; - } + switch(m_Parent->AcceptThreadData(m_ThreadIdx)) + { - const uint8 *src = m_Parent->GetSrcForThread(m_ThreadIdx); - uint8 *dst = m_Parent->GetDstForThread(m_ThreadIdx); - (*f)(src, dst, 4 * m_Parent->GetNumBlocksForThread(m_ThreadIdx), 4); + case eAction_Quit: + { + quitFlag = true; + break; + } + + case eAction_Wait: + { + boost::thread::yield(); + break; + } + + case eAction_DoWork: + { + const uint8 *src = m_Parent->GetSrcForThread(m_ThreadIdx); + uint8 *dst = m_Parent->GetDstForThread(m_ThreadIdx); + (*f)(src, dst, 4 * m_Parent->GetNumBlocksForThread(m_ThreadIdx), 4); + break; + } + + default: + { + fprintf(stderr, "Unrecognized thread command!\n"); + quitFlag = true; + break; + } + } } m_Parent->NotifyWorkerFinished(); @@ -70,6 +93,7 @@ WorkerQueue::WorkerQueue( : m_NumCompressions(0) , m_TotalNumCompressions(max(uint32(1), numCompressions)) , m_NumThreads(numThreads) + , m_WaitingThreads(0) , m_ActiveThreads(0) , m_JobSize(max(uint32(1), jobSize)) , m_InBufSz(inBufSz) @@ -99,6 +123,8 @@ void WorkerQueue::Run() { m_StopWatch.Reset(); m_StopWatch.Start(); + m_WaitingThreads = 0; + // Wait for them to finish... while(m_ActiveThreads > 0) { m_CV.wait(lock); @@ -134,8 +160,18 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) { // If we've completed all blocks, then mark the thread for // completion. - if(m_NextBlock >= totalBlocks) { - return WorkerThread::eAction_Quit; + if(m_NextBlock == totalBlocks) { + if(m_NumCompressions < m_TotalNumCompressions) { + if(++m_WaitingThreads == m_ActiveThreads) { + m_NextBlock = 0; + m_WaitingThreads = 0; + } else { + return WorkerThread::eAction_Wait; + } + } + else { + return WorkerThread::eAction_Quit; + } } // Otherwise, this thread's offset is the current block... @@ -149,8 +185,8 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) { // Make sure the next block is updated. m_NextBlock += blocksProcessed; - if(m_NextBlock == totalBlocks && ++m_NumCompressions < m_TotalNumCompressions) { - m_NextBlock = 0; + if(m_NextBlock == totalBlocks) { + ++m_NumCompressions; } return WorkerThread::eAction_DoWork; diff --git a/Core/src/WorkerQueue.h b/Core/src/WorkerQueue.h index 3280ad1..b6cac35 100644 --- a/Core/src/WorkerQueue.h +++ b/Core/src/WorkerQueue.h @@ -22,6 +22,7 @@ public: void operator ()(); enum EAction { + eAction_Wait, eAction_DoWork, eAction_Quit, @@ -53,10 +54,10 @@ class WorkerQueue { const StopWatch &GetStopWatch() const { return m_StopWatch; } private: - uint32 m_NumCompressions; const uint32 m_TotalNumCompressions; uint32 m_NumThreads; + uint32 m_WaitingThreads; uint32 m_ActiveThreads; uint32 m_JobSize; uint32 m_InBufSz; @@ -65,6 +66,7 @@ class WorkerQueue { boost::condition_variable m_CV; boost::mutex m_Mutex; + uint32 m_NextBlock; static const int kMaxNumWorkerThreads = 256;