Add command line options to invoke worker queue.

This commit is contained in:
Pavel Krajcevski 2012-09-21 18:43:35 -04:00
parent c7bb6170f3
commit 54cb951f71
5 changed files with 45 additions and 3 deletions

View file

@ -18,6 +18,7 @@ int main(int argc, char **argv) {
exit(1); exit(1);
} }
int numJobs = 0;
int quality = 50; int quality = 50;
int numThreads = 1; int numThreads = 1;
int numCompressions = 1; int numCompressions = 1;
@ -69,6 +70,18 @@ int main(int argc, char **argv) {
knowArg = true; knowArg = true;
} }
if(strcmp(argv[fileArg], "-j") == 0) {
fileArg++;
if(fileArg == argc || (numJobs = atoi(argv[fileArg])) < 0) {
PrintUsage();
exit(1);
}
fileArg++;
knowArg = true;
}
} while(knowArg); } while(knowArg);
if(fileArg == argc) { if(fileArg == argc) {
@ -88,6 +101,7 @@ int main(int argc, char **argv) {
settings.iNumThreads = numThreads; settings.iNumThreads = numThreads;
settings.iQuality = quality; settings.iQuality = quality;
settings.iNumCompressions = numCompressions; settings.iNumCompressions = numCompressions;
settings.iJobSize = numJobs;
CompressedImage *ci = img->Compress(settings); CompressedImage *ci = img->Compress(settings);
if(NULL == ci) { if(NULL == ci) {

View file

@ -1,3 +1,6 @@
#ifndef __TEXCOMP_STOP_WATCH_H__
#define __TEXCOMP_STOP_WATCH_H__
//-------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------
// Copyright 2011 Intel Corporation // Copyright 2011 Intel Corporation
// All Rights Reserved // All Rights Reserved
@ -41,3 +44,5 @@ public:
private: private:
StopWatchImpl *impl; StopWatchImpl *impl;
}; };
#endif // __TEXCOMP_STOP_WATCH_H__

View file

@ -135,6 +135,7 @@ static double CompressImageWithWorkerQueue(
unsigned char *outBuf unsigned char *outBuf
) { ) {
WorkerQueue wq ( WorkerQueue wq (
settings.iNumCompressions,
settings.iNumThreads, settings.iNumThreads,
settings.iJobSize, settings.iJobSize,
imgData, imgData,
@ -144,6 +145,9 @@ static double CompressImageWithWorkerQueue(
); );
wq.Run(); wq.Run();
return wq.GetStopWatch().TimeInMilliseconds() /
double(settings.iNumCompressions);
} }
bool CompressImageData( bool CompressImageData(

View file

@ -59,6 +59,7 @@ void WorkerThread::operator()() {
} }
WorkerQueue::WorkerQueue( WorkerQueue::WorkerQueue(
uint32 numCompressions,
uint32 numThreads, uint32 numThreads,
uint32 jobSize, uint32 jobSize,
const uint8 *inBuf, const uint8 *inBuf,
@ -66,7 +67,9 @@ WorkerQueue::WorkerQueue(
CompressionFunc func, CompressionFunc func,
uint8 *outBuf uint8 *outBuf
) )
: m_NumThreads(numThreads) : m_NumCompressions(0)
, m_TotalNumCompressions(max(uint32(1), numCompressions))
, m_NumThreads(numThreads)
, m_ActiveThreads(0) , m_ActiveThreads(0)
, m_JobSize(max(uint32(1), jobSize)) , m_JobSize(max(uint32(1), jobSize))
, m_InBufSz(inBufSz) , m_InBufSz(inBufSz)
@ -74,7 +77,7 @@ WorkerQueue::WorkerQueue(
, m_OutBuf(outBuf) , m_OutBuf(outBuf)
, m_CompressionFunc(func) , m_CompressionFunc(func)
{ {
clamp(m_NumThreads, uint32(1), kMaxNumWorkerThreads); clamp(m_NumThreads, uint32(1), uint32(kMaxNumWorkerThreads));
#ifndef NDEBUG #ifndef NDEBUG
if(m_InBufSz % 64) { if(m_InBufSz % 64) {
@ -93,11 +96,16 @@ void WorkerQueue::Run() {
m_ActiveThreads++; m_ActiveThreads++;
} }
m_StopWatch.Reset();
m_StopWatch.Start();
// Wait for them to finish... // Wait for them to finish...
while(m_ActiveThreads > 0) { while(m_ActiveThreads > 0) {
m_CV.wait(lock); m_CV.wait(lock);
} }
m_StopWatch.Stop();
// Join them all together.. // Join them all together..
for(int i = 0; i < m_NumThreads; i++) { for(int i = 0; i < m_NumThreads; i++) {
m_ThreadHandles[i]->join(); m_ThreadHandles[i]->join();
@ -141,6 +149,10 @@ WorkerThread::EAction WorkerQueue::AcceptThreadData(uint32 threadIdx) {
// Make sure the next block is updated. // Make sure the next block is updated.
m_NextBlock += blocksProcessed; m_NextBlock += blocksProcessed;
if(m_NextBlock == totalBlocks && ++m_NumCompressions < m_TotalNumCompressions) {
m_NextBlock = 0;
}
return WorkerThread::eAction_DoWork; return WorkerThread::eAction_DoWork;
} }

View file

@ -12,6 +12,7 @@ namespace boost {
#include "TexComp.h" #include "TexComp.h"
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp> #include <boost/thread/condition_variable.hpp>
#include "StopWatch.h"
struct WorkerThread { struct WorkerThread {
friend class WorkerQueue; friend class WorkerQueue;
@ -36,6 +37,7 @@ class WorkerQueue {
friend class WorkerThread; friend class WorkerThread;
public: public:
WorkerQueue( WorkerQueue(
uint32 numCompressions,
uint32 numThreads, uint32 numThreads,
uint32 jobSize, uint32 jobSize,
const uint8 *inBuf, const uint8 *inBuf,
@ -48,9 +50,12 @@ class WorkerQueue {
// Runs the workers // Runs the workers
void Run(); void Run();
const StopWatch &GetStopWatch() const { return m_StopWatch; }
private: private:
uint32 m_NumCompressions;
const uint32 m_TotalNumCompressions;
uint32 m_NumThreads; uint32 m_NumThreads;
uint32 m_ActiveThreads; uint32 m_ActiveThreads;
uint32 m_JobSize; uint32 m_JobSize;
@ -62,7 +67,7 @@ class WorkerQueue {
boost::mutex m_Mutex; boost::mutex m_Mutex;
uint32 m_NextBlock; uint32 m_NextBlock;
static const uint32 kMaxNumWorkerThreads = 256; static const int kMaxNumWorkerThreads = 256;
uint32 m_Offsets[kMaxNumWorkerThreads]; uint32 m_Offsets[kMaxNumWorkerThreads];
uint32 m_NumBlocks[kMaxNumWorkerThreads]; uint32 m_NumBlocks[kMaxNumWorkerThreads];
@ -75,6 +80,8 @@ class WorkerQueue {
const CompressionFunc m_CompressionFunc; const CompressionFunc m_CompressionFunc;
CompressionFunc GetCompressionFunc() const { return m_CompressionFunc; } CompressionFunc GetCompressionFunc() const { return m_CompressionFunc; }
StopWatch m_StopWatch;
WorkerThread::EAction AcceptThreadData(uint32 threadIdx); WorkerThread::EAction AcceptThreadData(uint32 threadIdx);
void NotifyWorkerFinished(); void NotifyWorkerFinished();
}; };