SpikeStream Application Library  0.2
AnalysisRunner.cpp
Go to the documentation of this file.
00001 #include "AnalysisRunner.h"
00002 #include "Globals.h"
00003 #include "SpikeStreamException.h"
00004 using namespace spikestream;
00005 
00006 //Qt includes
00007 #include <QMutexLocker>
00008 
00009 //Other includes
00010 #include <iostream>
00011 using namespace std;
00012 
00013 
00015 AnalysisRunner::AnalysisRunner(const DBInfo& netDBInfo, const DBInfo& archDBInfo, const DBInfo& anaDBInfo){
00016         //Store information about databases
00017         this->networkDBInfo = netDBInfo;
00018         this->archiveDBInfo = archDBInfo;
00019         this->analysisDBInfo = anaDBInfo;
00020 
00021         //Initialize variables
00022         stopThread = true;
00023 }
00024 
00025 
00027 AnalysisRunner::~AnalysisRunner(){
00028 }
00029 
00030 
00031 /*-------------------------------------------------------------*/
00032 /*-------                 PUBLIC METHODS                 ------*/
00033 /*-------------------------------------------------------------*/
00034 
00036 void AnalysisRunner::prepareAnalysisTask(const AnalysisInfo& analysisInfo, int firstTimeStep, int lastTimeStep){
00037         if(subThreadsRunning())
00038                 throw SpikeStreamException("Cannot prepare analysis task when sub threads are running.");
00039         if(stopThread == false)
00040                 throw SpikeStreamException("Cannot prepare analysis task when analysis thread is running.");
00041 
00042         //Reset class
00043         this->reset();
00044 
00045         //Check variables
00046         if(analysisInfo.getID() == 0)
00047                 throw SpikeStreamException("Analysis ID has not been set.");
00048         if(firstTimeStep > lastTimeStep)
00049                 throw SpikeStreamException("First time step is greater than the last time step.");
00050 
00051         //Store variables
00052         this->analysisInfo = analysisInfo;
00053         this->firstTimeStep = firstTimeStep;
00054         this->lastTimeStep = lastTimeStep;
00055         nextTimeStep = firstTimeStep;
00056 }
00057 
00058 
00063 void AnalysisRunner::run(){
00064         stopThread = false;
00065 
00066         //Launch the first batch of threads
00067         unsigned int threadCount = 0;
00068         while(threadCount < analysisInfo.getNumberOfThreads()){
00069                 int tmpTimeStep = getNextTimeStep();
00070 
00071                 //No more time steps, break out of the launching loop
00072                 if(tmpTimeStep == -1)
00073                         break;
00074 
00075                 //Start the thread running to analyze this time step
00076                 try{
00077                         startAnalysisTimeStepThread(tmpTimeStep);
00078                         ++threadCount;
00079                 }
00080                 catch(SpikeStreamException& ex){
00081                         setError(ex.getMessage());
00082                 }
00083         }
00084 
00085         /* Wait for the analysis to complete
00086         When all of the sub threads have finished, stopThread should be set to true. */
00087         while(!stopThread){
00088                 usleep(500000);//Sleep for half a second
00089         }
00090 }
00091 
00092 
00094 void AnalysisRunner::stop(){
00095         //Halt all of the other threads and wait for them to finish
00096         for(QHash<int, AbstractAnalysisTimeStepThread*>::iterator iter = subThreadMap.begin(); iter != subThreadMap.end(); ++iter){
00097                 iter.value()->stopThread();
00098                 iter.value()->wait();
00099                 delete iter.value();
00100         }
00101 
00102         subThreadMap.clear();
00103         stopThread = true;
00104 }
00105 
00106 
00108 void AnalysisRunner::setTimeStepThreadCreationFunction(AbstractAnalysisTimeStepThread* (*createAnalysisTimeStepThread)(const DBInfo& netDBInfo, const DBInfo& archDBInfo, const DBInfo& anaDBInfo)){
00109         this->createAnalysisTimeStepThread = createAnalysisTimeStepThread;
00110 }
00111 
00112 
00113 /*-------------------------------------------------------------*/
00114 /*-------                  PRIVATE SLOTS                 ------*/
00115 /*-------------------------------------------------------------*/
00116 
00118 void AnalysisRunner::updateProgress(const QString& msg, unsigned int timeStep, unsigned int stepsCompleted, unsigned int totalSteps){
00119         emit progress(msg, timeStep, stepsCompleted, totalSteps);
00120 }
00121 
00122 
00125 void AnalysisRunner::threadFinished(){
00126         //Do nothing if thread is trying to stop
00127         if(stopThread)
00128                 return;
00129 
00130         /* Multiple sub threads could call this method simultaneously and mess things up
00131         Lock mutex to make sure we only process one thread finished event at a time. */
00132         QMutexLocker(&this->mutex);
00133 
00134         //Get a reference to the thread that has stopped
00135         AbstractAnalysisTimeStepThread* tmpSubThread = (AbstractAnalysisTimeStepThread*) sender();
00136 
00137         //Remove it from the map of currently running threads
00138         int tmpTimeStep = tmpSubThread->getTimeStep();
00139         if(!subThreadMap.contains(tmpTimeStep)){
00140                 setError("Thread with time step " + QString::number(tmpTimeStep) + " is not recorded as running.");
00141                 return;
00142         }
00143 
00144         //Check for errors - set error will delete all of the threads
00145         if(tmpSubThread->isError()){
00146                 setError(tmpSubThread->getErrorMessage());
00147                 return;
00148         }
00149 
00150         //Delete the thread class and remove it from the map
00151         delete tmpSubThread;
00152         subThreadMap.remove(tmpTimeStep);
00153 
00154         //Inform listening classes that the analysis of this time step is now complete.
00155         emit timeStepComplete(tmpTimeStep);
00156 
00157         //Determine if any time steps need to be analyzed
00158         int tmpNextTimeStep = getNextTimeStep();
00159         if(tmpNextTimeStep >= 0){
00160                 try{
00161                         startAnalysisTimeStepThread(tmpNextTimeStep);
00162                 }
00163                 catch(SpikeStreamException& ex){
00164                         setError(ex.getMessage());
00165                 }
00166         }
00167 
00168         //Next time step is -1. All of the time step threads have been launched and may have all finished
00169         else if(!subThreadsRunning()){//See if any threads are still running
00170                 //Stop this thread when no sub threads are running
00171                 stop();
00172         }
00173 }
00174 
00175 
00177 void AnalysisRunner::updateResults(){
00178         emit newResultsFound();
00179 }
00180 
00181 
00182 /*-------------------------------------------------------------*/
00183 /*-------                PRIVATE METHODS                 ------*/
00184 /*-------------------------------------------------------------*/
00185 
00187 void AnalysisRunner::clearError(){
00188         error = false;
00189         errorMessage = "";
00190 }
00191 
00192 
00195 int AnalysisRunner::getNextTimeStep(){
00196         if(nextTimeStep == -1)
00197                 return -1;
00198 
00199         //Store current value of next time step. This is what will be returned
00200         int oldNextTimeStep = nextTimeStep;
00201 
00202         //Increase the next time step and check to see if it has exceeded the range
00203         ++nextTimeStep;
00204         if(nextTimeStep > lastTimeStep)
00205                 nextTimeStep = -1;//No more time steps left. -1 will be returned next time this method is called
00206 
00207         //Return the original value of the next time step before it was increased or invalidated
00208         return oldNextTimeStep;
00209 }
00210 
00211 
00213 void AnalysisRunner::reset(){
00214         subThreadMap.clear();
00215         stopThread = true;
00216         clearError();
00217         firstTimeStep = -1;
00218         lastTimeStep = -1;
00219         nextTimeStep = -1;
00220 }
00221 
00222 
00224 void AnalysisRunner::setError(const QString& message){
00225         error = true;
00226         errorMessage = message;
00227 
00228         //exit this thread
00229         stop();
00230 }
00231 
00232 
00234 void AnalysisRunner::startAnalysisTimeStepThread(int timeStep){
00235         //Run some checks on the time step
00236         if(timeStep == -1){
00237                 throw SpikeStreamException("Trying to start a new thread with an invalid time step.");
00238         }
00239         if(subThreadMap.contains(timeStep)){
00240                 throw SpikeStreamException("Trying to launch new thread to analyze time step " + QString::number(timeStep) + " which is already running.");
00241         }
00242 
00243         //Lanch the new thread and store its address in the map
00244         AbstractAnalysisTimeStepThread* newThread = createAnalysisTimeStepThread(networkDBInfo, archiveDBInfo, analysisDBInfo);
00245         connect(newThread, SIGNAL(newResultsFound()), this, SLOT(updateResults()));
00246         connect(newThread, SIGNAL(finished()), this, SLOT(threadFinished()));
00247         connect(newThread, SIGNAL(progress(const QString&, unsigned int, unsigned int, unsigned int)), this, SLOT(updateProgress(const QString&, unsigned int, unsigned int, unsigned int)));
00248         newThread->prepareTimeStepAnalysis(analysisInfo, timeStep);
00249         subThreadMap[timeStep] = newThread;
00250         newThread->start();
00251 }
00252 
00253 
00255 bool AnalysisRunner::subThreadsRunning(){
00256         if(subThreadMap.isEmpty())
00257                 return false;
00258         return true;
00259 }
00260 
 All Classes Files Functions Variables Typedefs Friends Defines