ARGoS  3
A parallel, multi-engine simulator for swarm robotics
core/simulator/space/space_multi_thread_balance_length.cpp
Go to the documentation of this file.
00001 
00007 #include "space_multi_thread_balance_length.h"
00008 #include <argos3/core/simulator/simulator.h>
00009 #include <argos3/core/utility/profiler/profiler.h>
00010 
00011 #include <cstdio>
00012 
00013 namespace argos {
00014 
00015    /****************************************/
00016    /****************************************/
00017 
00018    struct SCleanupThreadData {
00019       pthread_mutex_t* StartSenseControlPhaseMutex;
00020       pthread_mutex_t* StartActPhaseMutex;
00021       pthread_mutex_t* StartPhysicsPhaseMutex;
00022       pthread_mutex_t* FetchTaskMutex;
00023    };
00024 
00025    static void CleanupThread(void* p_data) {
00026       CSimulator& cSimulator = CSimulator::GetInstance();
00027       if(cSimulator.IsProfiling()) {
00028          cSimulator.GetProfiler().CollectThreadResourceUsage();
00029       }
00030       SCleanupThreadData& sData =
00031          *reinterpret_cast<SCleanupThreadData*>(p_data);
00032       pthread_mutex_unlock(sData.FetchTaskMutex);
00033       pthread_mutex_unlock(sData.StartSenseControlPhaseMutex);
00034       pthread_mutex_unlock(sData.StartActPhaseMutex);
00035       pthread_mutex_unlock(sData.StartPhysicsPhaseMutex);
00036    }
00037 
00038    void* LaunchThreadBalanceLength(void* p_data) {
00039       /* Set up thread-safe buffers for this new thread */
00040       LOG.AddThreadSafeBuffer();
00041       LOGERR.AddThreadSafeBuffer();
00042       /* Make this thread cancellable */
00043       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00044       pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
00045       /* Get a handle to the thread launch data */
00046       CSpaceMultiThreadBalanceLength::SThreadLaunchData* psData = reinterpret_cast<CSpaceMultiThreadBalanceLength::SThreadLaunchData*>(p_data);
00047       /* Create cancellation data */
00048       SCleanupThreadData sCancelData;
00049       sCancelData.StartSenseControlPhaseMutex = &(psData->Space->m_tStartSenseControlPhaseMutex);
00050       sCancelData.StartActPhaseMutex = &(psData->Space->m_tStartActPhaseMutex);
00051       sCancelData.StartPhysicsPhaseMutex = &(psData->Space->m_tStartPhysicsPhaseMutex);
00052       sCancelData.FetchTaskMutex = &(psData->Space->m_tFetchTaskMutex);
00053       pthread_cleanup_push(CleanupThread, &sCancelData);
00054       if(psData->ThreadId == 0) {
00055          /* Execute the code for the dispatch thread */
00056          psData->Space->DispatchThread(psData->ThreadId);
00057       }
00058       else {
00059          /* Execute the code for a slave thread */
00060          psData->Space->SlaveThread(psData->ThreadId);
00061       }
00062       /* Dispose of cancellation data */
00063       pthread_cleanup_pop(1);
00064       return NULL;
00065    }
00066 
00067    /****************************************/
00068    /****************************************/
00069 
00070    void CSpaceMultiThreadBalanceLength::STaskData::Reset() {
00071       Index = 0;
00072       Used = false;
00073       Done = false;
00074    }
00075 
00076    /****************************************/
00077    /****************************************/
00078 
00079    void CSpaceMultiThreadBalanceLength::Init(TConfigurationNode& t_tree) {
00080       /* Initialize the space */
00081       CSpace::Init(t_tree);
00082       /* Initialize thread related structures */
00083       int nErrors;
00084       /* Init mutexes */
00085       if((nErrors = pthread_mutex_init(&m_tStartSenseControlPhaseMutex, NULL)) ||
00086          (nErrors = pthread_mutex_init(&m_tStartActPhaseMutex, NULL)) ||
00087          (nErrors = pthread_mutex_init(&m_tStartPhysicsPhaseMutex, NULL)) ||
00088          (nErrors = pthread_mutex_init(&m_tFetchTaskMutex, NULL))) {
00089          THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
00090       }
00091       /* Init conditionals */
00092       if((nErrors = pthread_cond_init(&m_tStartSenseControlPhaseCond, NULL)) ||
00093          (nErrors = pthread_cond_init(&m_tStartActPhaseCond, NULL)) ||
00094          (nErrors = pthread_cond_init(&m_tStartPhysicsPhaseCond, NULL)) ||
00095          (nErrors = pthread_cond_init(&m_tFetchTaskCond, NULL))) {
00096          THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
00097       }
00098       /* Reset the idle thread count */
00099       m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
00100       m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
00101       m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
00102       /* Start threads */
00103       StartThreads();
00104    }
00105 
00106    /****************************************/
00107    /****************************************/
00108 
00109    void CSpaceMultiThreadBalanceLength::Destroy() {
00110       /* Destroy the threads to update the controllable entities */
00111       int nErrors;
00112       if(m_ptThreads != NULL) {
00113          for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) {
00114             if((nErrors = pthread_cancel(m_ptThreads[i]))) {
00115                THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors));
00116             }
00117          }
00118          void** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()+1];
00119          for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) {
00120             if((nErrors = pthread_join(m_ptThreads[i], ppJoinResult + i))) {
00121                THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors));
00122             }
00123             if(ppJoinResult[i] != PTHREAD_CANCELED) {
00124                LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl;
00125             }
00126          }
00127          delete[] ppJoinResult;
00128       }
00129       delete[] m_ptThreads;
00130       /* Destroy the thread launch info */
00131       if(m_psThreadData != NULL) {
00132          for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) {
00133             delete m_psThreadData[i];
00134          }
00135       }
00136       delete[] m_psThreadData;
00137       pthread_mutex_destroy(&m_tStartSenseControlPhaseMutex);
00138       pthread_mutex_destroy(&m_tStartActPhaseMutex);
00139       pthread_mutex_destroy(&m_tStartPhysicsPhaseMutex);
00140       pthread_mutex_destroy(&m_tFetchTaskMutex);
00141       pthread_cond_destroy(&m_tStartSenseControlPhaseCond);
00142       pthread_cond_destroy(&m_tStartActPhaseCond);
00143       pthread_cond_destroy(&m_tStartPhysicsPhaseCond);
00144       pthread_cond_destroy(&m_tFetchTaskCond);
00145 
00146       /* Destroy the base space */
00147       CSpace::Destroy();
00148    }
00149 
00150    /****************************************/
00151    /****************************************/
00152 
/*
 * Main thread: starts phase PHASE.
 * While holding the phase mutex, sets the phase idle counter to zero,
 * resets the task data and broadcasts on the phase condition.
 * Wrapped in do { } while(0) so that the macro expands to a single
 * statement; it must be followed by a semicolon.
 */
#define MAIN_START_PHASE(PHASE)                                \
   do {                                                        \
      pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex);    \
      m_un ## PHASE ## PhaseIdleCounter = 0;                   \
      m_sTaskData.Reset();                                     \
      pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
      pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);  \
   } while(0)
00159 
/*
 * Main thread: waits for the end of phase PHASE.
 * Blocks on the phase condition until the phase idle counter is no
 * longer smaller than the number of threads.
 * Wrapped in do { } while(0) so that the macro expands to a single
 * statement; it must be followed by a semicolon.
 */
#define MAIN_WAIT_FOR_END_OF(PHASE)                                                            \
   do {                                                                                        \
      pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex);                                    \
      while(m_un ## PHASE ## PhaseIdleCounter < CSimulator::GetInstance().GetNumThreads()) {   \
         pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
      }                                                                                        \
      pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);                                  \
   } while(0)
00166 
00167    void CSpaceMultiThreadBalanceLength::UpdateControllableEntities() {
00168       /* Reset the idle thread count */
00169       m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
00170       m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
00171       m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
00172       /* Sense/control phase */
00173       MAIN_START_PHASE(SenseControl);
00174       MAIN_WAIT_FOR_END_OF(SenseControl);
00175       /* Act phase */
00176       MAIN_START_PHASE(Act);
00177       MAIN_WAIT_FOR_END_OF(Act);
00178    }
00179 
00180    /****************************************/
00181    /****************************************/
00182 
00183    void CSpaceMultiThreadBalanceLength::UpdatePhysics() {
00184       /* Physics phase */
00185       MAIN_START_PHASE(Physics);
00186       MAIN_WAIT_FOR_END_OF(Physics);
00187       /* Perform entity transfer from engine to engine, if needed */
00188       for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
00189          if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
00190             (*m_ptPhysicsEngines)[i]->TransferEntities();
00191          }
00192       }
00193    }
00194 
00195    /****************************************/
00196    /****************************************/
00197 
00198    void CSpaceMultiThreadBalanceLength::StartThreads() {
00199       int nErrors;
00200       /* Create the threads to update the controllable entities */
00201       m_ptThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads() + 1];
00202       m_psThreadData = new SThreadLaunchData*[CSimulator::GetInstance().GetNumThreads() + 1];
00203       for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) {
00204          /* Create the struct with the info to launch the thread */
00205          m_psThreadData[i] = new SThreadLaunchData(i, this);
00206          /* Create the thread */
00207          if((nErrors = pthread_create(m_ptThreads + i,
00208                                       NULL,
00209                                       LaunchThreadBalanceLength,
00210                                       reinterpret_cast<void*>(m_psThreadData[i])))) {
00211             THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
00212          }
00213       }
00214    }
00215 
00216    /****************************************/
00217    /****************************************/
00218 
/*
 * Worker thread: waits for the start of phase PHASE.
 * Blocks on the phase condition for as long as the phase idle counter
 * equals the number of threads, then passes a cancellation point.
 * Wrapped in do { } while(0) so that the macro expands to a single
 * statement; it must be followed by a semicolon.
 */
#define THREAD_WAIT_FOR_START_OF(PHASE)                                                        \
   do {                                                                                        \
      pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex);                                    \
      while(m_un ## PHASE ## PhaseIdleCounter == CSimulator::GetInstance().GetNumThreads()) {  \
         pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
      }                                                                                        \
      pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);                                  \
      pthread_testcancel();                                                                    \
   } while(0)
00226 
/*
 * Dispatch thread: hands out the tasks of TASKVEC one index at a time.
 *
 * Non-empty TASKVEC: waits until the current task is marked as used,
 * advances the index, and either publishes the next task (Used = false)
 * or marks the task data as done when the index reaches the vector size.
 *
 * Empty TASKVEC: marks the task data as done. This is done while holding
 * the fetch mutex, followed by a broadcast; the mutex must be locked
 * before it is unlocked, and threads waiting on the fetch condition must
 * be told that Done changed.
 *
 * NOTE(review): with an empty TASKVEC a slave may still fetch index 0
 * before Done is set here - confirm whether callers can have empty
 * task vectors.
 *
 * Wrapped in do { } while(0) so that the macro expands to a single
 * statement; it must be followed by a semicolon.
 */
#define THREAD_DISPATCH_TASK(TASKVEC)                                    \
   do {                                                                  \
      if(! (TASKVEC).empty()) {                                          \
         while(! m_sTaskData.Done) {                                     \
            pthread_mutex_lock(&m_tFetchTaskMutex);                      \
            while(! m_sTaskData.Used) {                                  \
               pthread_cond_wait(&m_tFetchTaskCond, &m_tFetchTaskMutex); \
            }                                                            \
            ++m_sTaskData.Index;                                         \
            if(m_sTaskData.Index < (TASKVEC).size()) {                   \
               m_sTaskData.Used = false;                                 \
            }                                                            \
            else {                                                       \
               m_sTaskData.Done = true;                                  \
            }                                                            \
            pthread_cond_broadcast(&m_tFetchTaskCond);                   \
            pthread_mutex_unlock(&m_tFetchTaskMutex);                    \
            pthread_testcancel();                                        \
         }                                                               \
      }                                                                  \
      else {                                                             \
         pthread_mutex_lock(&m_tFetchTaskMutex);                         \
         m_sTaskData.Done = true;                                        \
         pthread_cond_broadcast(&m_tFetchTaskCond);                      \
         pthread_mutex_unlock(&m_tFetchTaskMutex);                       \
         pthread_testcancel();                                           \
      }                                                                  \
   } while(0)
00251 
00252    /****************************************/
00253    /****************************************/
00254 
00255    void CSpaceMultiThreadBalanceLength::DispatchThread(UInt32 un_id) {
00256       while(1) {
00257          THREAD_WAIT_FOR_START_OF(SenseControl);
00258          THREAD_DISPATCH_TASK(m_vecControllableEntities);
00259          THREAD_WAIT_FOR_START_OF(Act);
00260          THREAD_DISPATCH_TASK(m_vecControllableEntities);
00261          THREAD_WAIT_FOR_START_OF(Physics);
00262          THREAD_DISPATCH_TASK(*m_ptPhysicsEngines);
00263       }
00264    }
00265 
00266    /****************************************/
00267    /****************************************/
00268 
/*
 * Slave thread: fetches and executes tasks until the task data is done.
 *
 * Each iteration waits while the current task is already used and the
 * task data is not done. If a task is available, its index is copied
 * into unTaskIndex (which must be declared by the caller), the task is
 * marked as used and SNIPPET is executed outside the fetch mutex.
 * Once the task data is done, the idle counter of phase PHASE is
 * incremented under the phase mutex, the phase condition is broadcast
 * and the loop ends.
 *
 * Wrapped in do { } while(0) so that the macro expands to a single
 * statement; it must be followed by a semicolon.
 */
#define THREAD_PERFORM_TASK(PHASE, SNIPPET)                            \
   do {                                                                \
      while(1) {                                                       \
         pthread_mutex_lock(&m_tFetchTaskMutex);                       \
         while(m_sTaskData.Used && ! m_sTaskData.Done) {               \
            pthread_cond_wait(&m_tFetchTaskCond, &m_tFetchTaskMutex);  \
         }                                                             \
         if(! m_sTaskData.Done) {                                      \
            unTaskIndex = m_sTaskData.Index;                           \
            m_sTaskData.Used = true;                                   \
            pthread_cond_broadcast(&m_tFetchTaskCond);                 \
            pthread_mutex_unlock(&m_tFetchTaskMutex);                  \
            pthread_testcancel();                                      \
            {                                                          \
               SNIPPET;                                                \
            }                                                          \
            pthread_testcancel();                                      \
         }                                                             \
         else {                                                        \
            pthread_mutex_unlock(&m_tFetchTaskMutex);                  \
            pthread_testcancel();                                      \
            pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex);      \
            ++m_un ## PHASE ## PhaseIdleCounter;                       \
            pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond);   \
            pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);    \
            pthread_testcancel();                                      \
            break;                                                     \
         }                                                             \
      }                                                                \
      pthread_testcancel();                                            \
   } while(0)
00298 
00299    void CSpaceMultiThreadBalanceLength::SlaveThread(UInt32 un_id) {
00300       /* Task index */
00301       size_t unTaskIndex;
00302       while(1) {
00303          THREAD_WAIT_FOR_START_OF(SenseControl);
00304          THREAD_PERFORM_TASK(
00305             SenseControl,
00306             m_vecControllableEntities[unTaskIndex]->Sense();
00307             m_vecControllableEntities[unTaskIndex]->ControlStep();
00308             );
00309          THREAD_WAIT_FOR_START_OF(Act);
00310          THREAD_PERFORM_TASK(
00311             Act,
00312             m_vecControllableEntities[unTaskIndex]->Act();
00313             );
00314          THREAD_WAIT_FOR_START_OF(Physics);
00315          THREAD_PERFORM_TASK(
00316             Physics,
00317             (*m_ptPhysicsEngines)[unTaskIndex]->Update();
00318             );
00319       }
00320    }
00321 
00322    /****************************************/
00323    /****************************************/
00324 
00325 }