ARGoS  3
A parallel, multi-engine simulator for swarm robotics
core/simulator/space/space_multi_thread_balance_quantity.cpp
Go to the documentation of this file.
00001 
00007 #include <unistd.h>
00008 #include <cstring>
00009 #include <argos3/core/simulator/simulator.h>
00010 #include <argos3/core/utility/profiler/profiler.h>
00011 #include "space_multi_thread_balance_quantity.h"
00012 
00013 namespace argos {
00014 
00015    /****************************************/
00016    /****************************************/
00017 
/**
 * Argument bundle for CleanupUpdateThread().
 *
 * Holds one pointer per phase mutex so that the cleanup handler, which only
 * receives an opaque void*, can reach all four of them.
 */
struct SCleanupUpdateThreadData {
   /** Mutex guarding the sense+control phase counter */
   pthread_mutex_t* SenseControlStepConditionalMutex;
   /** Mutex guarding the act phase counter */
   pthread_mutex_t* ActConditionalMutex;
   /** Mutex guarding the physics phase counter */
   pthread_mutex_t* PhysicsConditionalMutex;
   /** Mutex guarding the media phase counter */
   pthread_mutex_t* MediaConditionalMutex;
};
00024 
00025    static void CleanupUpdateThread(void* p_data) {
00026       CSimulator& cSimulator = CSimulator::GetInstance();
00027       if(cSimulator.IsProfiling()) {
00028          cSimulator.GetProfiler().CollectThreadResourceUsage();
00029       }
00030       SCleanupUpdateThreadData& sData =
00031          *reinterpret_cast<SCleanupUpdateThreadData*>(p_data);
00032       pthread_mutex_unlock(sData.SenseControlStepConditionalMutex);
00033       pthread_mutex_unlock(sData.ActConditionalMutex);
00034       pthread_mutex_unlock(sData.PhysicsConditionalMutex);
00035       pthread_mutex_unlock(sData.MediaConditionalMutex);
00036    }
00037 
00038    void* LaunchUpdateThreadBalanceQuantity(void* p_data) {
00039       LOG.AddThreadSafeBuffer();
00040       LOGERR.AddThreadSafeBuffer();
00041       CSpaceMultiThreadBalanceQuantity::SUpdateThreadData* psData = reinterpret_cast<CSpaceMultiThreadBalanceQuantity::SUpdateThreadData*>(p_data);
00042       psData->Space->UpdateThread(psData->ThreadId);
00043       return NULL;
00044    }
00045 
00046    /****************************************/
00047    /****************************************/
00048 
00049    void CSpaceMultiThreadBalanceQuantity::Init(TConfigurationNode& t_tree) {
00050       /* Initialize the space */
00051       CSpace::Init(t_tree);
00052       /* Initialize thread related structures */
00053       int nErrors;
00054       /* First the counters */
00055       m_unSenseControlStepPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
00056       m_unActPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
00057       m_unPhysicsPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
00058       m_unMediaPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
00059       /* Then the mutexes */
00060       if((nErrors = pthread_mutex_init(&m_tSenseControlStepConditionalMutex, NULL)) ||
00061          (nErrors = pthread_mutex_init(&m_tActConditionalMutex, NULL)) ||
00062          (nErrors = pthread_mutex_init(&m_tPhysicsConditionalMutex, NULL)) ||
00063          (nErrors = pthread_mutex_init(&m_tMediaConditionalMutex, NULL))) {
00064          THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
00065       }
00066       /* Finally the conditionals */
00067       if((nErrors = pthread_cond_init(&m_tSenseControlStepConditional, NULL)) ||
00068          (nErrors = pthread_cond_init(&m_tActConditional, NULL)) ||
00069          (nErrors = pthread_cond_init(&m_tPhysicsConditional, NULL)) ||
00070          (nErrors = pthread_cond_init(&m_tMediaConditional, NULL))) {
00071          THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
00072       }
00073       /* Start threads */
00074       StartThreads();
00075    }
00076 
00077    /****************************************/
00078    /****************************************/
00079 
00080    void CSpaceMultiThreadBalanceQuantity::StartThreads() {
00081       int nErrors;
00082       /* Create the threads to update the controllable entities */
00083       m_ptUpdateThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
00084       m_psUpdateThreadData = new SUpdateThreadData*[CSimulator::GetInstance().GetNumThreads()];
00085       for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
00086          /* Create the struct with the info to launch the thread */
00087          m_psUpdateThreadData[i] = new SUpdateThreadData(i, this);
00088          /* Create the thread */
00089          if((nErrors = pthread_create(m_ptUpdateThreads + i,
00090                                       NULL,
00091                                       LaunchUpdateThreadBalanceQuantity,
00092                                       reinterpret_cast<void*>(m_psUpdateThreadData[i])))) {
00093             THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
00094          }
00095       }
00096    }
00097 
00098    /****************************************/
00099    /****************************************/
00100 
00101    void CSpaceMultiThreadBalanceQuantity::Destroy()
00102    {
00103       /* Destroy the threads to update the controllable entities */
00104       int nErrors;
00105       if(m_ptUpdateThreads != NULL) {
00106          for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
00107             if((nErrors = pthread_cancel(m_ptUpdateThreads[i]))) {
00108                THROW_ARGOSEXCEPTION("Error canceling controllable entities update threads " << ::strerror(nErrors));
00109             }
00110          }
00111          void** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
00112          for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
00113             if((nErrors = pthread_join(m_ptUpdateThreads[i], ppJoinResult + i))) {
00114                THROW_ARGOSEXCEPTION("Error joining controllable entities update threads " << ::strerror(nErrors));
00115             }
00116             if(ppJoinResult[i] != PTHREAD_CANCELED) {
00117                LOGERR << "[WARNING] Controllable entities update thread #" << i<< " not canceled" << std::endl;
00118             }
00119          }
00120          delete[] ppJoinResult;
00121       }
00122       delete[] m_ptUpdateThreads;
00123       /* Destroy the thread launch info */
00124       if(m_psUpdateThreadData != NULL) {
00125          for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
00126             delete m_psUpdateThreadData[i];
00127          }
00128       }
00129       delete[] m_psUpdateThreadData;
00130       pthread_mutex_destroy(&m_tSenseControlStepConditionalMutex);
00131       pthread_mutex_destroy(&m_tActConditionalMutex);
00132       pthread_cond_destroy(&m_tSenseControlStepConditional);
00133       pthread_cond_destroy(&m_tActConditional);
00134       pthread_cond_destroy(&m_tPhysicsConditional);
00135       pthread_cond_destroy(&m_tMediaConditional);
00136       /* Destroy the base space */
00137       CSpace::Destroy();
00138    }
00139 
00140    /****************************************/
00141    /****************************************/
00142    
00143    void CSpaceMultiThreadBalanceQuantity::AddControllableEntity(CControllableEntity& c_entity) {
00144       m_bIsControllableEntityAssignmentRecalculationNeeded = true;
00145       CSpace::AddControllableEntity(c_entity);
00146    }
00147 
00148    /****************************************/
00149    /****************************************/
00150    
00151    void CSpaceMultiThreadBalanceQuantity::RemoveControllableEntity(CControllableEntity& c_entity) {
00152       m_bIsControllableEntityAssignmentRecalculationNeeded = true;
00153       CSpace::RemoveControllableEntity(c_entity);
00154    }
00155 
00156    /****************************************/
00157    /****************************************/
00158    
/*
 * Main thread: start phase PHASE.
 * Flushes both log streams, then - holding the phase mutex - resets the
 * phase counter to zero and wakes every thread waiting on the phase
 * condition variable.
 * Wrapped in do { } while(0) so that the macro expands to a single statement
 * and stays correct under an unbraced if/else; invoke it with a trailing ';'.
 */
#define MAIN_SEND_GO_FOR_PHASE(PHASE)                          \
   do {                                                        \
      LOG.Flush();                                             \
      LOGERR.Flush();                                          \
      pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex);   \
      m_un ## PHASE ## PhaseDoneCounter = 0;                   \
      pthread_cond_broadcast(&m_t ## PHASE ## Conditional);    \
      pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
   } while(0)
00166 
/*
 * Main thread: wait for the end of phase PHASE.
 * Holding the phase mutex, waits on the phase condition variable until the
 * phase counter is no longer smaller than the number of threads.
 * Wrapped in do { } while(0) so that the macro expands to a single statement
 * and stays correct under an unbraced if/else; invoke it with a trailing ';'.
 */
#define MAIN_WAIT_FOR_PHASE_END(PHASE)                                                        \
   do {                                                                                       \
      pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex);                                  \
      while(m_un ## PHASE ## PhaseDoneCounter < CSimulator::GetInstance().GetNumThreads()) {  \
         pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex);  \
      }                                                                                       \
      pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);                                \
   } while(0)
00173    
00174    void CSpaceMultiThreadBalanceQuantity::UpdateControllableEntities() {
00175       MAIN_SEND_GO_FOR_PHASE(SenseControlStep);
00176       MAIN_WAIT_FOR_PHASE_END(SenseControlStep);
00177       MAIN_SEND_GO_FOR_PHASE(Act);
00178       MAIN_WAIT_FOR_PHASE_END(Act);
00179       /* Avoid recalculation at the next time step */
00180       m_bIsControllableEntityAssignmentRecalculationNeeded = false;
00181    }
00182 
00183    /****************************************/
00184    /****************************************/
00185 
00186    void CSpaceMultiThreadBalanceQuantity::UpdatePhysics() {
00187       /* Update the physics engines */
00188       MAIN_SEND_GO_FOR_PHASE(Physics);
00189       MAIN_WAIT_FOR_PHASE_END(Physics);
00190       /* Perform entity transfer from engine to engine, if needed */
00191       for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
00192          if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
00193             (*m_ptPhysicsEngines)[i]->TransferEntities();
00194          }
00195       }
00196    }
00197 
00198    /****************************************/
00199    /****************************************/
00200 
00201    void CSpaceMultiThreadBalanceQuantity::UpdateMedia() {
00202       /* Update the media */
00203       MAIN_SEND_GO_FOR_PHASE(Media);
00204       MAIN_WAIT_FOR_PHASE_END(Media);
00205    }
00206 
00207    /****************************************/
00208    /****************************************/
00209 
/*
 * Update thread: wait for the go signal of phase PHASE.
 * Holding the phase mutex, waits on the phase condition variable for as long
 * as the phase counter equals the number of threads, then tests for a pending
 * cancellation request.
 * Wrapped in do { } while(0) so that the macro expands to a single statement
 * and stays correct under an unbraced if/else; invoke it with a trailing ';'.
 */
#define THREAD_WAIT_FOR_GO_SIGNAL(PHASE)                                                      \
   do {                                                                                       \
      pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex);                                  \
      while(m_un ## PHASE ## PhaseDoneCounter == CSimulator::GetInstance().GetNumThreads()) { \
         pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex);  \
      }                                                                                       \
      pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);                                \
      pthread_testcancel();                                                                   \
   } while(0)
00217    
/*
 * Update thread: report that this thread is done with phase PHASE.
 * Holding the phase mutex, increments the phase counter and wakes every
 * thread waiting on the phase condition variable, then tests for a pending
 * cancellation request.
 * Wrapped in do { } while(0) so that the macro expands to a single statement
 * and stays correct under an unbraced if/else; invoke it with a trailing ';'.
 */
#define THREAD_SIGNAL_PHASE_DONE(PHASE)                        \
   do {                                                        \
      pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex);   \
      ++m_un ## PHASE ## PhaseDoneCounter;                     \
      pthread_cond_broadcast(&m_t ## PHASE ## Conditional);    \
      pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
      pthread_testcancel();                                    \
   } while(0)
00224 
00225    CRange<size_t> CalculatePluginRangeForThread(size_t un_id,
00226                                                 size_t un_tot_plugins) {
00227       /* This is the minimum number of plugins assigned to a thread */
00228       size_t unMinPortion = un_tot_plugins / CSimulator::GetInstance().GetNumThreads();
00229       /* If the division has a remainder, the extra plugins must be assigned too */
00230       size_t unExtraPortion = un_tot_plugins % CSimulator::GetInstance().GetNumThreads();
00231       /* Calculate the range */
00232       if(unMinPortion == 0) {
00233          /* Not all threads get a plugin */
00234          if(un_id < unExtraPortion) {
00235             /* This thread does */
00236             return CRange<size_t>(un_id, un_id+1);
00237          }
00238          else {
00239             /* This thread does not */
00240             return CRange<size_t>();
00241          }
00242       }
00243       else {
00244          /* For sure this thread will get unMinPortion plugins, does it get an extra too? */
00245          if(un_id < unExtraPortion) {
00246             /* Yes, it gets an extra */
00247             return CRange<size_t>( un_id    * (unMinPortion+1),
00248                                   (un_id+1) * (unMinPortion+1));
00249          }
00250          else {
00251             /* No, it doesn't get an extra */
00252             return CRange<size_t>(unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion)   * unMinPortion,
00253                                   unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion+1) * unMinPortion);
00254          }
00255       }
00256    }
00257 
00258    void CSpaceMultiThreadBalanceQuantity::UpdateThread(UInt32 un_id) {
00259       /* Copy the id */
00260       UInt32 unId = un_id;
00261       /* Create cancellation data */
00262       SCleanupUpdateThreadData sCancelData;
00263       sCancelData.SenseControlStepConditionalMutex = &m_tSenseControlStepConditionalMutex;
00264       sCancelData.ActConditionalMutex = &m_tActConditionalMutex;
00265       sCancelData.PhysicsConditionalMutex = &m_tPhysicsConditionalMutex;
00266       sCancelData.MediaConditionalMutex = &m_tMediaConditionalMutex;
00267       pthread_cleanup_push(CleanupUpdateThread, &sCancelData);
00268       /* Id range for the physics engines assigned to this thread */
00269       CRange<size_t> cPhysicsRange = CalculatePluginRangeForThread(unId, m_ptPhysicsEngines->size());
00270       /* Id range for the physics engines assigned to this thread */
00271       CRange<size_t> cMediaRange = CalculatePluginRangeForThread(unId, m_ptMedia->size());
00272       /* Variables storing the portion of entities to update */
00273       CRange<size_t> cEntityRange;
00274       while(1) {
00275          THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep);
00276          /* Calculate the portion of entities to update, if needed */
00277          if(m_bIsControllableEntityAssignmentRecalculationNeeded) {
00278             cEntityRange = CalculatePluginRangeForThread(unId, m_vecControllableEntities.size());
00279          }
00280          /* Cope with the fact that there may be less entities than threads */
00281          if(cEntityRange.GetSpan() > 0) {
00282             /* This thread has entities */
00283             /* Update sensor readings and call controllers */
00284             for(size_t i = cEntityRange.GetMin(); i < cEntityRange.GetMax(); ++i) {
00285                m_vecControllableEntities[i]->Sense();
00286                m_vecControllableEntities[i]->ControlStep();
00287             }
00288             pthread_testcancel();
00289             THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
00290             /* Actuate control choices */
00291             THREAD_WAIT_FOR_GO_SIGNAL(Act);
00292             for(size_t i = cEntityRange.GetMin(); i < cEntityRange.GetMax(); ++i) {
00293                m_vecControllableEntities[i]->Act();
00294             }
00295             pthread_testcancel();
00296             THREAD_SIGNAL_PHASE_DONE(Act);
00297          }
00298          else {
00299             /* This thread has no entities -> dummy computation */
00300             /* Update sensor readings */
00301             /* Call controllers */
00302             THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep);
00303             THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
00304             /* Actuate control choices */
00305             THREAD_WAIT_FOR_GO_SIGNAL(Act);
00306             THREAD_SIGNAL_PHASE_DONE(Act);
00307          }
00308          /* Update physics engines, if this thread has been assigned to them */
00309          THREAD_WAIT_FOR_GO_SIGNAL(Physics);
00310          if(cPhysicsRange.GetSpan() > 0) {
00311             /* This thread has engines, update them */
00312             for(size_t i = cPhysicsRange.GetMin(); i < cPhysicsRange.GetMax(); ++i) {
00313                (*m_ptPhysicsEngines)[i]->Update();
00314             }
00315             pthread_testcancel();
00316             THREAD_SIGNAL_PHASE_DONE(Physics);
00317          }
00318          else {
00319             /* This thread has no engines -> dummy computation */
00320             THREAD_SIGNAL_PHASE_DONE(Physics);
00321          }
00322          /* Update media, if this thread has been assigned to them */
00323          THREAD_WAIT_FOR_GO_SIGNAL(Media);
00324          if(cMediaRange.GetSpan() > 0) {
00325             /* This thread has media, update them */
00326             for(size_t i = cMediaRange.GetMin(); i < cMediaRange.GetMax(); ++i) {
00327                (*m_ptMedia)[i]->Update();
00328             }
00329             pthread_testcancel();
00330             THREAD_SIGNAL_PHASE_DONE(Media);
00331          }
00332          else {
00333             /* This thread has no media -> dummy computation */
00334             THREAD_SIGNAL_PHASE_DONE(Media);
00335          }
00336       }
00337       pthread_cleanup_pop(1);
00338    }
00339 
00340    /****************************************/
00341    /****************************************/
00342 
00343 }