SpikeStream Application Library
0.2
|
00001 #include "AnalysisRunner.h" 00002 #include "Globals.h" 00003 #include "SpikeStreamException.h" 00004 using namespace spikestream; 00005 00006 //Qt includes 00007 #include <QMutexLocker> 00008 00009 //Other includes 00010 #include <iostream> 00011 using namespace std; 00012 00013 00015 AnalysisRunner::AnalysisRunner(const DBInfo& netDBInfo, const DBInfo& archDBInfo, const DBInfo& anaDBInfo){ 00016 //Store information about databases 00017 this->networkDBInfo = netDBInfo; 00018 this->archiveDBInfo = archDBInfo; 00019 this->analysisDBInfo = anaDBInfo; 00020 00021 //Initialize variables 00022 stopThread = true; 00023 } 00024 00025 00027 AnalysisRunner::~AnalysisRunner(){ 00028 } 00029 00030 00031 /*-------------------------------------------------------------*/ 00032 /*------- PUBLIC METHODS ------*/ 00033 /*-------------------------------------------------------------*/ 00034 00036 void AnalysisRunner::prepareAnalysisTask(const AnalysisInfo& analysisInfo, int firstTimeStep, int lastTimeStep){ 00037 if(subThreadsRunning()) 00038 throw SpikeStreamException("Cannot prepare analysis task when sub threads are running."); 00039 if(stopThread == false) 00040 throw SpikeStreamException("Cannot prepare analysis task when analysis thread is running."); 00041 00042 //Reset class 00043 this->reset(); 00044 00045 //Check variables 00046 if(analysisInfo.getID() == 0) 00047 throw SpikeStreamException("Analysis ID has not been set."); 00048 if(firstTimeStep > lastTimeStep) 00049 throw SpikeStreamException("First time step is greater than the last time step."); 00050 00051 //Store variables 00052 this->analysisInfo = analysisInfo; 00053 this->firstTimeStep = firstTimeStep; 00054 this->lastTimeStep = lastTimeStep; 00055 nextTimeStep = firstTimeStep; 00056 } 00057 00058 00063 void AnalysisRunner::run(){ 00064 stopThread = false; 00065 00066 //Launch the first batch of threads 00067 unsigned int threadCount = 0; 00068 while(threadCount < analysisInfo.getNumberOfThreads()){ 00069 int tmpTimeStep = getNextTimeStep(); 00070 00071 //No more time steps, break out of the launching loop 00072 if(tmpTimeStep == -1) 00073 break; 00074 00075 //Start the thread running to analyze this time step 00076 try{ 00077 startAnalysisTimeStepThread(tmpTimeStep); 00078 ++threadCount; 00079 } 00080 catch(SpikeStreamException& ex){ 00081 setError(ex.getMessage()); 00082 } 00083 } 00084 00085 /* Wait for the analysis to complete 00086 When all of the sub threads have finished, stopThread should be set to true. */ 00087 while(!stopThread){ 00088 usleep(500000);//Sleep for half a second 00089 } 00090 } 00091 00092 00094 void AnalysisRunner::stop(){ 00095 //Halt all of the other threads and wait for them to finish 00096 for(QHash<int, AbstractAnalysisTimeStepThread*>::iterator iter = subThreadMap.begin(); iter != subThreadMap.end(); ++iter){ 00097 iter.value()->stopThread(); 00098 iter.value()->wait(); 00099 delete iter.value(); 00100 } 00101 00102 subThreadMap.clear(); 00103 stopThread = true; 00104 } 00105 00106 00108 void AnalysisRunner::setTimeStepThreadCreationFunction(AbstractAnalysisTimeStepThread* (*createAnalysisTimeStepThread)(const DBInfo& netDBInfo, const DBInfo& archDBInfo, const DBInfo& anaDBInfo)){ 00109 this->createAnalysisTimeStepThread = createAnalysisTimeStepThread; 00110 } 00111 00112 00113 /*-------------------------------------------------------------*/ 00114 /*------- PRIVATE SLOTS ------*/ 00115 /*-------------------------------------------------------------*/ 00116 00118 void AnalysisRunner::updateProgress(const QString& msg, unsigned int timeStep, unsigned int stepsCompleted, unsigned int totalSteps){ 00119 emit progress(msg, timeStep, stepsCompleted, totalSteps); 00120 } 00121 00122 00125 void AnalysisRunner::threadFinished(){ 00126 //Do nothing if thread is trying to stop 00127 if(stopThread) 00128 return; 00129 00130 /* Multiple sub threads could call this method simultaneously and mess things up 00131 Lock mutex to make sure we only process one thread finished event at a time. */ 00132 QMutexLocker(&this->mutex); 00133 00134 //Get a reference to the thread that has stopped 00135 AbstractAnalysisTimeStepThread* tmpSubThread = (AbstractAnalysisTimeStepThread*) sender(); 00136 00137 //Remove it from the map of currently running threads 00138 int tmpTimeStep = tmpSubThread->getTimeStep(); 00139 if(!subThreadMap.contains(tmpTimeStep)){ 00140 setError("Thread with time step " + QString::number(tmpTimeStep) + " is not recorded as running."); 00141 return; 00142 } 00143 00144 //Check for errors - set error will delete all of the threads 00145 if(tmpSubThread->isError()){ 00146 setError(tmpSubThread->getErrorMessage()); 00147 return; 00148 } 00149 00150 //Delete the thread class and remove it from the map 00151 delete tmpSubThread; 00152 subThreadMap.remove(tmpTimeStep); 00153 00154 //Inform listening classes that the analysis of this time step is now complete. 00155 emit timeStepComplete(tmpTimeStep); 00156 00157 //Determine if any time steps need to be analyzed 00158 int tmpNextTimeStep = getNextTimeStep(); 00159 if(tmpNextTimeStep >= 0){ 00160 try{ 00161 startAnalysisTimeStepThread(tmpNextTimeStep); 00162 } 00163 catch(SpikeStreamException& ex){ 00164 setError(ex.getMessage()); 00165 } 00166 } 00167 00168 //Next time step is -1. All of the time step threads have been launched and may have all finished 00169 else if(!subThreadsRunning()){//See if any threads are still running 00170 //Stop this thread when no sub threads are running 00171 stop(); 00172 } 00173 } 00174 00175 00177 void AnalysisRunner::updateResults(){ 00178 emit newResultsFound(); 00179 } 00180 00181 00182 /*-------------------------------------------------------------*/ 00183 /*------- PRIVATE METHODS ------*/ 00184 /*-------------------------------------------------------------*/ 00185 00187 void AnalysisRunner::clearError(){ 00188 error = false; 00189 errorMessage = ""; 00190 } 00191 00192 00195 int AnalysisRunner::getNextTimeStep(){ 00196 if(nextTimeStep == -1) 00197 return -1; 00198 00199 //Store current value of next time step. This is what will be returned 00200 int oldNextTimeStep = nextTimeStep; 00201 00202 //Increase the next time step and check to see if it has exceeded the range 00203 ++nextTimeStep; 00204 if(nextTimeStep > lastTimeStep) 00205 nextTimeStep = -1;//No more time steps left. -1 will be returned next time this method is called 00206 00207 //Return the original value of the next time step before it was increased or invalidated 00208 return oldNextTimeStep; 00209 } 00210 00211 00213 void AnalysisRunner::reset(){ 00214 subThreadMap.clear(); 00215 stopThread = true; 00216 clearError(); 00217 firstTimeStep = -1; 00218 lastTimeStep = -1; 00219 nextTimeStep = -1; 00220 } 00221 00222 00224 void AnalysisRunner::setError(const QString& message){ 00225 error = true; 00226 errorMessage = message; 00227 00228 //exit this thread 00229 stop(); 00230 } 00231 00232 00234 void AnalysisRunner::startAnalysisTimeStepThread(int timeStep){ 00235 //Run some checks on the time step 00236 if(timeStep == -1){ 00237 throw SpikeStreamException("Trying to start a new thread with an invalid time step."); 00238 } 00239 if(subThreadMap.contains(timeStep)){ 00240 throw SpikeStreamException("Trying to launch new thread to analyze time step " + QString::number(timeStep) + " which is already running."); 00241 } 00242 00243 //Lanch the new thread and store its address in the map 00244 AbstractAnalysisTimeStepThread* newThread = createAnalysisTimeStepThread(networkDBInfo, archiveDBInfo, analysisDBInfo); 00245 connect(newThread, SIGNAL(newResultsFound()), this, SLOT(updateResults())); 00246 connect(newThread, SIGNAL(finished()), this, SLOT(threadFinished())); 00247 connect(newThread, SIGNAL(progress(const QString&, unsigned int, unsigned int, unsigned int)), this, SLOT(updateProgress(const QString&, unsigned int, unsigned int, unsigned int))); 00248 newThread->prepareTimeStepAnalysis(analysisInfo, timeStep); 00249 subThreadMap[timeStep] = newThread; 00250 newThread->start(); 00251 } 00252 00253 00255 bool AnalysisRunner::subThreadsRunning(){ 00256 if(subThreadMap.isEmpty()) 00257 return false; 00258 return true; 00259 } 00260