Redid thread synchonization in order to be able to signal threads to run without having to initialize them again. We can use this to average the running times.

This commit is contained in:
Pavel Krajcevski 2012-09-15 12:13:32 -04:00
parent e25e5bae90
commit 7116cc8b89
3 changed files with 113 additions and 32 deletions

View file

@ -4,6 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <assert.h>
template <typename T> template <typename T>
static T min(const T &a, const T &b) { static T min(const T &a, const T &b) {
@ -25,6 +26,7 @@ SCompressionSettings:: SCompressionSettings()
, bUseSIMD(false) , bUseSIMD(false)
, iNumThreads(1) , iNumThreads(1)
, iQuality(50) , iQuality(50)
, iNumCompressions(1)
{ {
clamp(iQuality, 0, 256); clamp(iQuality, 0, 256);
} }
@ -90,15 +92,22 @@ CompressedImage * CompressImage(
if(f) { if(f) {
StopWatch stopWatch = StopWatch(); StopWatch stopWatch = StopWatch();
double cmpMSTime = 0.0;
if(settings.iNumThreads > 1) { if(settings.iNumThreads > 1) {
ThreadGroup tgrp (settings.iNumThreads, img, f, cmpData); ThreadGroup tgrp (settings.iNumThreads, img, f, cmpData);
if(!(tgrp.PrepareThreads())) {
assert(!"Thread group failed to prepare threads?!");
return NULL;
}
tgrp.Start(); tgrp.Start();
tgrp.Join(); tgrp.Join();
stopWatch = tgrp.GetStopWatch(); stopWatch = tgrp.GetStopWatch();
tgrp.CleanUpThreads();
} }
else { else {
stopWatch.Start(); stopWatch.Start();

View file

@ -10,42 +10,50 @@
#include <boost/thread/condition_variable.hpp> #include <boost/thread/condition_variable.hpp>
CmpThread::CmpThread() CmpThread::CmpThread()
: m_ParentCounter(NULL) : m_StartBarrier(NULL)
, m_ParentCounter(NULL)
, m_ParentCounterLock(NULL) , m_ParentCounterLock(NULL)
, m_FinishCV(NULL) , m_FinishCV(NULL)
, m_Barrier(NULL)
, m_Width(0) , m_Width(0)
, m_Height(0) , m_Height(0)
, m_CmpFunc(NULL) , m_CmpFunc(NULL)
, m_OutBuf(NULL) , m_OutBuf(NULL)
, m_InBuf(NULL) , m_InBuf(NULL)
, m_ParentExitFlag(NULL)
{ } { }
void CmpThread::operator()() { void CmpThread::operator()() {
if(!m_Barrier || !m_CmpFunc || !m_OutBuf || !m_InBuf if(!m_CmpFunc || !m_OutBuf || !m_InBuf
|| !m_ParentCounter || !m_ParentCounterLock || !m_ParentCounter || !m_ParentCounterLock || !m_FinishCV
|| !m_FinishCV || !m_StartBarrier
|| !m_ParentExitFlag
) { ) {
fprintf(stderr, "Incorrect thread initialization.\n"); fprintf(stderr, "Incorrect thread initialization.\n");
return; return;
} }
// Wait for all threads to be ready... while(1) {
m_Barrier->wait(); // Wait for signal to start work...
m_StartBarrier->wait();
(*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height); if(*m_ParentExitFlag) {
return;
}
{ (*m_CmpFunc)(m_InBuf, m_OutBuf, m_Width, m_Height);
boost::lock_guard<boost::mutex> lock(*m_ParentCounterLock);
(*m_ParentCounter)++; {
boost::lock_guard<boost::mutex> lock(*m_ParentCounterLock);
(*m_ParentCounter)++;
}
m_FinishCV->notify_one();
} }
m_FinishCV->notify_one();
} }
ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFunc func, unsigned char *outBuf ) ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFunc func, unsigned char *outBuf )
: m_Barrier(new boost::barrier(numThreads)) : m_StartBarrier(new boost::barrier(numThreads + 1))
, m_FinishMutex(new boost::mutex()) , m_FinishMutex(new boost::mutex())
, m_FinishCV(new boost::condition_variable()) , m_FinishCV(new boost::condition_variable())
, m_NumThreads(numThreads) , m_NumThreads(numThreads)
@ -53,18 +61,21 @@ ThreadGroup::ThreadGroup( int numThreads, const ImageFile &image, CompressionFun
, m_Func(func) , m_Func(func)
, m_Image(image) , m_Image(image)
, m_OutBuf(outBuf) , m_OutBuf(outBuf)
, m_ThreadState(eThreadState_Done)
, m_ExitFlag(false)
{ {
for(int i = 0; i < kMaxNumThreads; i++) { for(int i = 0; i < kMaxNumThreads; i++) {
// Thread synchronization primitives // Thread synchronization primitives
m_Threads[i].m_ParentCounterLock = m_FinishMutex; m_Threads[i].m_ParentCounterLock = m_FinishMutex;
m_Threads[i].m_FinishCV = m_FinishCV; m_Threads[i].m_FinishCV = m_FinishCV;
m_Threads[i].m_ParentCounter = &m_ThreadsFinished; m_Threads[i].m_ParentCounter = &m_ThreadsFinished;
m_Threads[i].m_Barrier = m_Barrier; m_Threads[i].m_StartBarrier = m_StartBarrier;
m_Threads[i].m_ParentExitFlag = &m_ExitFlag;
} }
} }
ThreadGroup::~ThreadGroup() { ThreadGroup::~ThreadGroup() {
delete m_Barrier; delete m_StartBarrier;
delete m_FinishMutex; delete m_FinishMutex;
delete m_FinishCV; delete m_FinishCV;
} }
@ -83,11 +94,17 @@ unsigned int ThreadGroup::GetUncompressedBlockSize() {
#endif #endif
} }
void ThreadGroup::Start() { bool ThreadGroup::PrepareThreads() {
// Make sure that threads aren't running.
if(m_ThreadState != eThreadState_Done) {
return false;
}
// Have we already activated the thread group? // Have we already activated the thread group?
if(m_ActiveThreads > 0) { if(m_ActiveThreads > 0) {
return; m_ThreadState = eThreadState_Waiting;
return true;
} }
// Make sure that the image dimensions are multiples of 4 // Make sure that the image dimensions are multiples of 4
@ -127,8 +144,55 @@ void ThreadGroup::Start() {
m_ActiveThreads++; m_ActiveThreads++;
} }
m_ThreadState = eThreadState_Waiting;
return true;
}
bool ThreadGroup::Start() {
if(m_ActiveThreads <= 0) {
return false;
}
if(m_ThreadState != eThreadState_Waiting) {
return false;
}
m_StopWatch.Reset(); m_StopWatch.Reset();
m_StopWatch.Start(); m_StopWatch.Start();
// Last thread to activate the barrier is this one.
m_ThreadState = eThreadState_Running;
m_StartBarrier->wait();
return true;
}
bool ThreadGroup::CleanUpThreads() {
// Are the threads currently running?
if(m_ThreadState == eThreadState_Running) {
// ... if so, wait for them to finish
Join();
}
assert(m_ThreadState == eThreadState_Done || m_ThreadState == eThreadState_Waiting);
// Mark all threads for exit
m_ExitFlag = true;
// Hit the barrier to signal them to go.
m_StartBarrier->wait();
// Clean up.
for(int i = 0; i < m_ActiveThreads; i++) {
m_ThreadHandles[i]->join();
delete m_ThreadHandles[i];
}
// Reset active number of threads...
m_ActiveThreads = 0;
m_ExitFlag = false;
} }
void ThreadGroup::Join() { void ThreadGroup::Join() {
@ -139,12 +203,6 @@ void ThreadGroup::Join() {
} }
m_StopWatch.Stop(); m_StopWatch.Stop();
m_ThreadState = eThreadState_Done;
for(int i = 0; i < m_ActiveThreads; i++) { m_ThreadsFinished = 0;
m_ThreadHandles[i]->join();
delete m_ThreadHandles[i];
}
// Reset active number of threads...
m_ActiveThreads = 0;
} }

View file

@ -6,9 +6,9 @@
// forward declare // forward declare
namespace boost { namespace boost {
class barrier;
class thread; class thread;
class mutex; class mutex;
class barrier;
class condition_variable; class condition_variable;
} }
@ -16,13 +16,13 @@ struct CmpThread {
friend class ThreadGroup; friend class ThreadGroup;
private: private:
int *m_ParentCounter; boost::barrier *m_StartBarrier;
int *m_ParentCounter;
boost::mutex *m_ParentCounterLock; boost::mutex *m_ParentCounterLock;
boost::condition_variable *m_FinishCV; boost::condition_variable *m_FinishCV;
boost::barrier *m_Barrier;
int m_Width; int m_Width;
int m_Height; int m_Height;
@ -31,6 +31,8 @@ private:
unsigned char *m_OutBuf; unsigned char *m_OutBuf;
const unsigned char *m_InBuf; const unsigned char *m_InBuf;
bool *m_ParentExitFlag;
CmpThread(); CmpThread();
public: public:
@ -43,13 +45,22 @@ class ThreadGroup {
ThreadGroup( int numThreads, const ImageFile &, CompressionFunc func, unsigned char *outBuf ); ThreadGroup( int numThreads, const ImageFile &, CompressionFunc func, unsigned char *outBuf );
~ThreadGroup(); ~ThreadGroup();
void Start(); bool PrepareThreads();
bool Start();
void Join(); void Join();
bool CleanUpThreads();
const StopWatch &GetStopWatch() const { return m_StopWatch; } const StopWatch &GetStopWatch() const { return m_StopWatch; }
enum EThreadState {
eThreadState_Waiting,
eThreadState_Running,
eThreadState_Done
};
private: private:
boost::barrier *const m_Barrier; boost::barrier *const m_StartBarrier;
boost::mutex *const m_FinishMutex; boost::mutex *const m_FinishMutex;
boost::condition_variable *const m_FinishCV; boost::condition_variable *const m_FinishCV;
@ -71,6 +82,9 @@ class ThreadGroup {
unsigned int GetUncompressedBlockSize(); unsigned int GetUncompressedBlockSize();
StopWatch m_StopWatch; StopWatch m_StopWatch;
EThreadState m_ThreadState;
bool m_ExitFlag;
}; };
#endif // _THREAD_GROUP_H_ #endif // _THREAD_GROUP_H_