diff --git a/Core/src/TexComp.cpp b/Core/src/TexComp.cpp index 41bdaea..ce79d3c 100644 --- a/Core/src/TexComp.cpp +++ b/Core/src/TexComp.cpp @@ -6,10 +6,13 @@ #include #include "BC7Compressor.h" +#include "WorkerQueue.h" #include "ThreadGroup.h" + #include "ImageFile.h" #include "Image.h" + template static T min(const T &a, const T &b) { return (a < b)? a : b; @@ -125,12 +128,22 @@ static double CompressImageWithThreads( } static double CompressImageWithWorkerQueue( - const ImageFile &img, + const unsigned char *imgData, + const unsigned int imgDataSz, const SCompressionSettings &settings, const CompressionFunc f, unsigned char *outBuf ) { - return 0.0; + WorkerQueue wq ( + settings.iNumThreads, + settings.iJobSize, + imgData, + imgDataSz, + f, + outBuf + ); + + wq.Run(); } bool CompressImageData( @@ -178,7 +191,10 @@ bool CompressImageData( double cmpMSTime = 0.0; if(settings.iNumThreads > 1) { - cmpMSTime = CompressImageWithThreads(data, dataSz, settings, f, cmpData); + if(settings.iJobSize > 0) + cmpMSTime = CompressImageWithWorkerQueue(data, dataSz, settings, f, cmpData); + else + cmpMSTime = CompressImageWithThreads(data, dataSz, settings, f, cmpData); } else { cmpMSTime = CompressImageInSerial(data, dataSz, settings, f, cmpData); diff --git a/Core/src/WorkerQueue.cpp b/Core/src/WorkerQueue.cpp index ea7ec5c..cdb22a7 100644 --- a/Core/src/WorkerQueue.cpp +++ b/Core/src/WorkerQueue.cpp @@ -6,9 +6,22 @@ #include #include -#include -#include -#include + +template +static inline T max(const T &a, const T &b) { + return (a > b)? a : b; +} + +template +static inline T min(const T &a, const T &b) { + return (a < b)? a : b; +} + +template +static inline void clamp(T &x, const T &min, const T &max) { + if(x < min) x = min; + else if(x > max) x = max; +} WorkerThread::WorkerThread(WorkerQueue * parent, uint32 idx) : m_ThreadIdx(idx) @@ -28,159 +41,131 @@ void WorkerThread::operator()() { return; } + while(1) { + + EAction action = m_Parent->AcceptThreadData(m_ThreadIdx); + if(eAction_Quit == action) { + break; + } + + const uint8 *src = m_Parent->GetSrcForThread(m_ThreadIdx); + uint8 *dst = m_Parent->GetDstForThread(m_ThreadIdx); + (*f)(src, dst, 4 * m_Parent->GetNumBlocksForThread(m_ThreadIdx), 4); + } + + m_Parent->NotifyWorkerFinished(); + return; } -#if 0 -ThreadGroup::ThreadGroup( int numThreads, const unsigned char *inBuf, unsigned int inBufSz, CompressionFunc func, unsigned char *outBuf ) - : m_StartBarrier(new boost::barrier(numThreads + 1)) - , m_FinishMutex(new boost::mutex()) - , m_FinishCV(new boost::condition_variable()) - , m_NumThreads(numThreads) +WorkerQueue::WorkerQueue( + uint32 numThreads, + uint32 jobSize, + const uint8 *inBuf, + uint32 inBufSz, + CompressionFunc func, + uint8 *outBuf +) + : m_NumThreads(numThreads) , m_ActiveThreads(0) - , m_Func(func) - , m_ImageDataSz(inBufSz) - , m_ImageData(inBuf) + , m_JobSize(max(uint32(1), jobSize)) + , m_InBufSz(inBufSz) + , m_InBuf(inBuf) , m_OutBuf(outBuf) - , m_ThreadState(eThreadState_Done) - , m_ExitFlag(false) -{ - for(int i = 0; i < kMaxNumThreads; i++) { - // Thread synchronization primitives - m_Threads[i].m_ParentCounterLock = m_FinishMutex; - m_Threads[i].m_FinishCV = m_FinishCV; - m_Threads[i].m_ParentCounter = &m_ThreadsFinished; - m_Threads[i].m_StartBarrier = m_StartBarrier; - m_Threads[i].m_ParentExitFlag = &m_ExitFlag; + , m_CompressionFunc(func) +{ + clamp(m_NumThreads, uint32(1), kMaxNumWorkerThreads); + +#ifndef NDEBUG + if(m_InBufSz % 64) { + fprintf(stderr, "WorkerQueue.cpp -- WARNING: InBufSz not a multiple of 64. Are you sure that your image dimensions are correct?"); } -} - -ThreadGroup::~ThreadGroup() { - delete m_StartBarrier; - delete m_FinishMutex; - delete m_FinishCV; -} - -unsigned int ThreadGroup::GetCompressedBlockSize() { - if(m_Func == BC7C::CompressImageBC7) return 16; -#ifdef HAS_SSE_41 - if(m_Func == BC7C::CompressImageBC7SIMD) return 16; #endif } -unsigned int ThreadGroup::GetUncompressedBlockSize() { - if(m_Func == BC7C::CompressImageBC7) return 64; -#ifdef HAS_SSE_41 - if(m_Func == BC7C::CompressImageBC7SIMD) return 64; -#endif -} - -bool ThreadGroup::PrepareThreads() { - - // Make sure that threads aren't running. - if(m_ThreadState != eThreadState_Done) { - return false; - } - - // Have we already activated the thread group? - if(m_ActiveThreads > 0) { - m_ThreadState = eThreadState_Waiting; - return true; - } - - // We can assume that the image data is in block stream order - // so, the size of the data given to each thread will be (nb*4)x4 - int numBlocks = m_ImageDataSz / 64; - - int blocksProcessed = 0; - int blocksPerThread = (numBlocks/m_NumThreads) + ((numBlocks % m_NumThreads)? 1 : 0); - - // Currently no threads are finished... - m_ThreadsFinished = 0; +void WorkerQueue::Run() { + + // Spawn a bunch of threads... + boost::unique_lock lock(m_Mutex); for(int i = 0; i < m_NumThreads; i++) { - - if(m_ActiveThreads >= kMaxNumThreads) - break; - - int numBlocksThisThread = blocksPerThread; - if(blocksProcessed + numBlocksThisThread > numBlocks) { - numBlocksThisThread = numBlocks - blocksProcessed; - } - - CmpThread &t = m_Threads[m_ActiveThreads]; - t.m_Height = 4; - t.m_Width = numBlocksThisThread * 4; - t.m_CmpFunc = m_Func; - t.m_OutBuf = m_OutBuf + (blocksProcessed * GetCompressedBlockSize()); - t.m_InBuf = m_ImageData + (blocksProcessed * GetUncompressedBlockSize()); - - blocksProcessed += numBlocksThisThread; - + WorkerThread t (this, i); m_ThreadHandles[m_ActiveThreads] = new boost::thread(t); - m_ActiveThreads++; } - m_ThreadState = eThreadState_Waiting; - return true; -} - -bool ThreadGroup::Start() { - - if(m_ActiveThreads <= 0) { - return false; + // Wait for them to finish... + while(m_ActiveThreads > 0) { + m_CV.wait(lock); } - if(m_ThreadState != eThreadState_Waiting) { - return false; - } - - m_StopWatch.Reset(); - m_StopWatch.Start(); - - // Last thread to activate the barrier is this one. - m_ThreadState = eThreadState_Running; - m_StartBarrier->wait(); - - return true; -} - -bool ThreadGroup::CleanUpThreads() { - - // Are the threads currently running? - if(m_ThreadState == eThreadState_Running) { - // ... if so, wait for them to finish - Join(); - } - - assert(m_ThreadState == eThreadState_Done || m_ThreadState == eThreadState_Waiting); - - // Mark all threads for exit - m_ExitFlag = true; - - // Hit the barrier to signal them to go. - m_StartBarrier->wait(); - - // Clean up. - for(int i = 0; i < m_ActiveThreads; i++) { + // Join them all together.. + for(int i = 0; i < m_NumThreads; i++) { m_ThreadHandles[i]->join(); delete m_ThreadHandles[i]; } - - // Reset active number of threads... - m_ActiveThreads = 0; - m_ExitFlag = false; } -void ThreadGroup::Join() { +void WorkerQueue::NotifyWorkerFinished() { + { + boost::lock_guard lock(m_Mutex); + m_ActiveThreads--; + } + m_CV.notify_one(); +} - boost::unique_lock lock(*m_FinishMutex); - while(m_ThreadsFinished != m_ActiveThreads) { - m_FinishCV->wait(lock); +WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) { + if(threadIdx < 0 || threadIdx >= m_ActiveThreads) { + return WorkerThread::eAction_Quit; } - m_StopWatch.Stop(); - m_ThreadState = eThreadState_Done; - m_ThreadsFinished = 0; + // How many blocks total do we have? + const uint32 totalBlocks = m_InBufSz / 64; + + // Make sure we have exclusive access... + boost::lock_guard lock(m_Mutex); + + // If we've completed all blocks, then mark the thread for + // completion. + if(m_NextBlock >= totalBlocks) { + return WorkerThread::eAction_Quit; + } + + // Otherwise, this thread's offset is the current block... + m_Offsets[threadIdx] = m_NextBlock; + + // The number of blocks to process is either the job size + // or the number of blocks remaining. + int blocksProcessed = min(m_JobSize, totalBlocks - m_NextBlock); + m_NumBlocks[threadIdx] = blocksProcessed; + + // Make sure the next block is updated. + m_NextBlock += blocksProcessed; + + return WorkerThread::eAction_DoWork; +} + +const uint8 *WorkerQueue::GetSrcForThread(const int threadIdx) const { + assert(m_Offsets[threadIdx] >= 0); + assert(threadIdx >= 0); + assert(threadIdx < m_NumThreads); + + const uint32 inBufBlockSz = 16 * 4; + return m_InBuf + m_Offsets[threadIdx] * inBufBlockSz; +} + +uint8 *WorkerQueue::GetDstForThread(const int threadIdx) const { + assert(m_Offsets[threadIdx] >= 0); + assert(threadIdx >= 0); + assert(threadIdx < m_NumThreads); + + const uint32 outBufBlockSz = 16; + return m_OutBuf + m_Offsets[threadIdx] * outBufBlockSz; +} + +uint32 WorkerQueue::GetNumBlocksForThread(const int threadIdx) const { + assert(m_Offsets[threadIdx] >= 0); + assert(threadIdx >= 0); + assert(threadIdx < m_NumThreads); + + return m_NumBlocks[threadIdx]; } -#endif diff --git a/Core/src/WorkerQueue.h b/Core/src/WorkerQueue.h index 2b4079d..648719d 100644 --- a/Core/src/WorkerQueue.h +++ b/Core/src/WorkerQueue.h @@ -1,18 +1,18 @@ #ifndef __TEXCOMP_WORKDER_QUEUE_H__ #define __TEXCOMP_WORKDER_QUEUE_H__ -#include "TexCompTypes.h" -#include "TexComp.h" - // Forward declare... class WorkerQueue; namespace boost { class thread; - class mutex; - class barrier; - class condition_variable; } +// Necessary includes... +#include "TexCompTypes.h" +#include "TexComp.h" +#include +#include + struct WorkerThread { friend class WorkerQueue; public: @@ -20,6 +20,13 @@ public: WorkerThread(WorkerQueue *, uint32 idx); void operator ()(); + enum EAction { + eAction_DoWork, + eAction_Quit, + + kNumWorkerThreadActions + }; + private: uint32 m_ThreadIdx; WorkerQueue *const m_Parent; @@ -39,21 +46,37 @@ class WorkerQueue { ~WorkerQueue() { } - // Runs the + // Runs the workers void Run(); private: - static const int kMaxNumWorkerThreads = 256; - int m_Offsets[kMaxNumWorkerThreads]; + uint32 m_NumThreads; + uint32 m_ActiveThreads; + uint32 m_JobSize; + uint32 m_InBufSz; + const uint8 *m_InBuf; + uint8 *m_OutBuf; - int GetOffsetForThread(const int threadIdx) const; + boost::condition_variable m_CV; + boost::mutex m_Mutex; + uint32 m_NextBlock; + + static const uint32 kMaxNumWorkerThreads = 256; + uint32 m_Offsets[kMaxNumWorkerThreads]; + uint32 m_NumBlocks[kMaxNumWorkerThreads]; + + boost::thread *m_ThreadHandles[kMaxNumWorkerThreads]; + + const uint8 *GetSrcForThread(const int threadIdx) const; + uint8 *GetDstForThread(const int threadIdx) const; + uint32 GetNumBlocksForThread(const int threadIdx) const; const CompressionFunc m_CompressionFunc; CompressionFunc GetCompressionFunc() const { return m_CompressionFunc; } - void SignalThreadReady(int threadIdx); - bool AcceptThreadData(int threadIdx) const; + WorkerThread::EAction AcceptThreadData(uint32 threadIdx); + void NotifyWorkerFinished(); }; #endif //__TEXCOMP_WORKDER_QUEUE_H__