For more accurate timing results with the worker queue, make sure that all threads are done processing an image before restarting the compression when testing multiple compressions.

This commit is contained in:
Pavel Krajcevski 2012-09-25 17:05:52 -04:00
parent 54cb951f71
commit 01c01b96cd
2 changed files with 51 additions and 13 deletions

View file

@ -41,16 +41,39 @@ void WorkerThread::operator()() {
return; return;
} }
while(1) { bool quitFlag = false;
while(!quitFlag) {
EAction action = m_Parent->AcceptThreadData(m_ThreadIdx); switch(m_Parent->AcceptThreadData(m_ThreadIdx))
if(eAction_Quit == action) { {
break;
}
const uint8 *src = m_Parent->GetSrcForThread(m_ThreadIdx); case eAction_Quit:
uint8 *dst = m_Parent->GetDstForThread(m_ThreadIdx); {
(*f)(src, dst, 4 * m_Parent->GetNumBlocksForThread(m_ThreadIdx), 4); quitFlag = true;
break;
}
case eAction_Wait:
{
boost::thread::yield();
break;
}
case eAction_DoWork:
{
const uint8 *src = m_Parent->GetSrcForThread(m_ThreadIdx);
uint8 *dst = m_Parent->GetDstForThread(m_ThreadIdx);
(*f)(src, dst, 4 * m_Parent->GetNumBlocksForThread(m_ThreadIdx), 4);
break;
}
default:
{
fprintf(stderr, "Unrecognized thread command!\n");
quitFlag = true;
break;
}
}
} }
m_Parent->NotifyWorkerFinished(); m_Parent->NotifyWorkerFinished();
@ -70,6 +93,7 @@ WorkerQueue::WorkerQueue(
: m_NumCompressions(0) : m_NumCompressions(0)
, m_TotalNumCompressions(max(uint32(1), numCompressions)) , m_TotalNumCompressions(max(uint32(1), numCompressions))
, m_NumThreads(numThreads) , m_NumThreads(numThreads)
, m_WaitingThreads(0)
, m_ActiveThreads(0) , m_ActiveThreads(0)
, m_JobSize(max(uint32(1), jobSize)) , m_JobSize(max(uint32(1), jobSize))
, m_InBufSz(inBufSz) , m_InBufSz(inBufSz)
@ -99,6 +123,8 @@ void WorkerQueue::Run() {
m_StopWatch.Reset(); m_StopWatch.Reset();
m_StopWatch.Start(); m_StopWatch.Start();
m_WaitingThreads = 0;
// Wait for them to finish... // Wait for them to finish...
while(m_ActiveThreads > 0) { while(m_ActiveThreads > 0) {
m_CV.wait(lock); m_CV.wait(lock);
@ -134,8 +160,18 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) {
// If we've completed all blocks, then mark the thread for // If we've completed all blocks, then mark the thread for
// completion. // completion.
if(m_NextBlock >= totalBlocks) { if(m_NextBlock == totalBlocks) {
return WorkerThread::eAction_Quit; if(m_NumCompressions < m_TotalNumCompressions) {
if(++m_WaitingThreads == m_ActiveThreads) {
m_NextBlock = 0;
m_WaitingThreads = 0;
} else {
return WorkerThread::eAction_Wait;
}
}
else {
return WorkerThread::eAction_Quit;
}
} }
// Otherwise, this thread's offset is the current block... // Otherwise, this thread's offset is the current block...
@ -149,8 +185,8 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) {
// Make sure the next block is updated. // Make sure the next block is updated.
m_NextBlock += blocksProcessed; m_NextBlock += blocksProcessed;
if(m_NextBlock == totalBlocks && ++m_NumCompressions < m_TotalNumCompressions) { if(m_NextBlock == totalBlocks) {
m_NextBlock = 0; ++m_NumCompressions;
} }
return WorkerThread::eAction_DoWork; return WorkerThread::eAction_DoWork;

View file

@ -22,6 +22,7 @@ public:
void operator ()(); void operator ()();
enum EAction { enum EAction {
eAction_Wait,
eAction_DoWork, eAction_DoWork,
eAction_Quit, eAction_Quit,
@ -53,10 +54,10 @@ class WorkerQueue {
const StopWatch &GetStopWatch() const { return m_StopWatch; } const StopWatch &GetStopWatch() const { return m_StopWatch; }
private: private:
uint32 m_NumCompressions; uint32 m_NumCompressions;
const uint32 m_TotalNumCompressions; const uint32 m_TotalNumCompressions;
uint32 m_NumThreads; uint32 m_NumThreads;
uint32 m_WaitingThreads;
uint32 m_ActiveThreads; uint32 m_ActiveThreads;
uint32 m_JobSize; uint32 m_JobSize;
uint32 m_InBufSz; uint32 m_InBufSz;
@ -65,6 +66,7 @@ class WorkerQueue {
boost::condition_variable m_CV; boost::condition_variable m_CV;
boost::mutex m_Mutex; boost::mutex m_Mutex;
uint32 m_NextBlock; uint32 m_NextBlock;
static const int kMaxNumWorkerThreads = 256; static const int kMaxNumWorkerThreads = 256;