/*
 * ARGoS 3
 * A parallel, multi-engine simulator for swarm robotics
 */
00001 00007 #include <unistd.h> 00008 #include <cstring> 00009 #include <argos3/core/simulator/simulator.h> 00010 #include <argos3/core/utility/profiler/profiler.h> 00011 #include "space_multi_thread_balance_quantity.h" 00012 00013 namespace argos { 00014 00015 /****************************************/ 00016 /****************************************/ 00017 00018 struct SCleanupUpdateThreadData { 00019 pthread_mutex_t* SenseControlStepConditionalMutex; 00020 pthread_mutex_t* ActConditionalMutex; 00021 pthread_mutex_t* PhysicsConditionalMutex; 00022 pthread_mutex_t* MediaConditionalMutex; 00023 }; 00024 00025 static void CleanupUpdateThread(void* p_data) { 00026 CSimulator& cSimulator = CSimulator::GetInstance(); 00027 if(cSimulator.IsProfiling()) { 00028 cSimulator.GetProfiler().CollectThreadResourceUsage(); 00029 } 00030 SCleanupUpdateThreadData& sData = 00031 *reinterpret_cast<SCleanupUpdateThreadData*>(p_data); 00032 pthread_mutex_unlock(sData.SenseControlStepConditionalMutex); 00033 pthread_mutex_unlock(sData.ActConditionalMutex); 00034 pthread_mutex_unlock(sData.PhysicsConditionalMutex); 00035 pthread_mutex_unlock(sData.MediaConditionalMutex); 00036 } 00037 00038 void* LaunchUpdateThreadBalanceQuantity(void* p_data) { 00039 LOG.AddThreadSafeBuffer(); 00040 LOGERR.AddThreadSafeBuffer(); 00041 CSpaceMultiThreadBalanceQuantity::SUpdateThreadData* psData = reinterpret_cast<CSpaceMultiThreadBalanceQuantity::SUpdateThreadData*>(p_data); 00042 psData->Space->UpdateThread(psData->ThreadId); 00043 return NULL; 00044 } 00045 00046 /****************************************/ 00047 /****************************************/ 00048 00049 void CSpaceMultiThreadBalanceQuantity::Init(TConfigurationNode& t_tree) { 00050 /* Initialize the space */ 00051 CSpace::Init(t_tree); 00052 /* Initialize thread related structures */ 00053 int nErrors; 00054 /* First the counters */ 00055 m_unSenseControlStepPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads(); 00056 
m_unActPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads(); 00057 m_unPhysicsPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads(); 00058 m_unMediaPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads(); 00059 /* Then the mutexes */ 00060 if((nErrors = pthread_mutex_init(&m_tSenseControlStepConditionalMutex, NULL)) || 00061 (nErrors = pthread_mutex_init(&m_tActConditionalMutex, NULL)) || 00062 (nErrors = pthread_mutex_init(&m_tPhysicsConditionalMutex, NULL)) || 00063 (nErrors = pthread_mutex_init(&m_tMediaConditionalMutex, NULL))) { 00064 THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors)); 00065 } 00066 /* Finally the conditionals */ 00067 if((nErrors = pthread_cond_init(&m_tSenseControlStepConditional, NULL)) || 00068 (nErrors = pthread_cond_init(&m_tActConditional, NULL)) || 00069 (nErrors = pthread_cond_init(&m_tPhysicsConditional, NULL)) || 00070 (nErrors = pthread_cond_init(&m_tMediaConditional, NULL))) { 00071 THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors)); 00072 } 00073 /* Start threads */ 00074 StartThreads(); 00075 } 00076 00077 /****************************************/ 00078 /****************************************/ 00079 00080 void CSpaceMultiThreadBalanceQuantity::StartThreads() { 00081 int nErrors; 00082 /* Create the threads to update the controllable entities */ 00083 m_ptUpdateThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()]; 00084 m_psUpdateThreadData = new SUpdateThreadData*[CSimulator::GetInstance().GetNumThreads()]; 00085 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) { 00086 /* Create the struct with the info to launch the thread */ 00087 m_psUpdateThreadData[i] = new SUpdateThreadData(i, this); 00088 /* Create the thread */ 00089 if((nErrors = pthread_create(m_ptUpdateThreads + i, 00090 NULL, 00091 LaunchUpdateThreadBalanceQuantity, 00092 reinterpret_cast<void*>(m_psUpdateThreadData[i])))) { 00093 
THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors)); 00094 } 00095 } 00096 } 00097 00098 /****************************************/ 00099 /****************************************/ 00100 00101 void CSpaceMultiThreadBalanceQuantity::Destroy() 00102 { 00103 /* Destroy the threads to update the controllable entities */ 00104 int nErrors; 00105 if(m_ptUpdateThreads != NULL) { 00106 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) { 00107 if((nErrors = pthread_cancel(m_ptUpdateThreads[i]))) { 00108 THROW_ARGOSEXCEPTION("Error canceling controllable entities update threads " << ::strerror(nErrors)); 00109 } 00110 } 00111 void** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()]; 00112 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) { 00113 if((nErrors = pthread_join(m_ptUpdateThreads[i], ppJoinResult + i))) { 00114 THROW_ARGOSEXCEPTION("Error joining controllable entities update threads " << ::strerror(nErrors)); 00115 } 00116 if(ppJoinResult[i] != PTHREAD_CANCELED) { 00117 LOGERR << "[WARNING] Controllable entities update thread #" << i<< " not canceled" << std::endl; 00118 } 00119 } 00120 delete[] ppJoinResult; 00121 } 00122 delete[] m_ptUpdateThreads; 00123 /* Destroy the thread launch info */ 00124 if(m_psUpdateThreadData != NULL) { 00125 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) { 00126 delete m_psUpdateThreadData[i]; 00127 } 00128 } 00129 delete[] m_psUpdateThreadData; 00130 pthread_mutex_destroy(&m_tSenseControlStepConditionalMutex); 00131 pthread_mutex_destroy(&m_tActConditionalMutex); 00132 pthread_cond_destroy(&m_tSenseControlStepConditional); 00133 pthread_cond_destroy(&m_tActConditional); 00134 pthread_cond_destroy(&m_tPhysicsConditional); 00135 pthread_cond_destroy(&m_tMediaConditional); 00136 /* Destroy the base space */ 00137 CSpace::Destroy(); 00138 } 00139 00140 /****************************************/ 00141 
/****************************************/ 00142 00143 void CSpaceMultiThreadBalanceQuantity::AddControllableEntity(CControllableEntity& c_entity) { 00144 m_bIsControllableEntityAssignmentRecalculationNeeded = true; 00145 CSpace::AddControllableEntity(c_entity); 00146 } 00147 00148 /****************************************/ 00149 /****************************************/ 00150 00151 void CSpaceMultiThreadBalanceQuantity::RemoveControllableEntity(CControllableEntity& c_entity) { 00152 m_bIsControllableEntityAssignmentRecalculationNeeded = true; 00153 CSpace::RemoveControllableEntity(c_entity); 00154 } 00155 00156 /****************************************/ 00157 /****************************************/ 00158 00159 #define MAIN_SEND_GO_FOR_PHASE(PHASE) \ 00160 LOG.Flush(); \ 00161 LOGERR.Flush(); \ 00162 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \ 00163 m_un ## PHASE ## PhaseDoneCounter = 0; \ 00164 pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \ 00165 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); 00166 00167 #define MAIN_WAIT_FOR_PHASE_END(PHASE) \ 00168 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \ 00169 while(m_un ## PHASE ## PhaseDoneCounter < CSimulator::GetInstance().GetNumThreads()) { \ 00170 pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \ 00171 } \ 00172 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); 00173 00174 void CSpaceMultiThreadBalanceQuantity::UpdateControllableEntities() { 00175 MAIN_SEND_GO_FOR_PHASE(SenseControlStep); 00176 MAIN_WAIT_FOR_PHASE_END(SenseControlStep); 00177 MAIN_SEND_GO_FOR_PHASE(Act); 00178 MAIN_WAIT_FOR_PHASE_END(Act); 00179 /* Avoid recalculation at the next time step */ 00180 m_bIsControllableEntityAssignmentRecalculationNeeded = false; 00181 } 00182 00183 /****************************************/ 00184 /****************************************/ 00185 00186 void CSpaceMultiThreadBalanceQuantity::UpdatePhysics() { 00187 /* Update the 
physics engines */ 00188 MAIN_SEND_GO_FOR_PHASE(Physics); 00189 MAIN_WAIT_FOR_PHASE_END(Physics); 00190 /* Perform entity transfer from engine to engine, if needed */ 00191 for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) { 00192 if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) { 00193 (*m_ptPhysicsEngines)[i]->TransferEntities(); 00194 } 00195 } 00196 } 00197 00198 /****************************************/ 00199 /****************************************/ 00200 00201 void CSpaceMultiThreadBalanceQuantity::UpdateMedia() { 00202 /* Update the media */ 00203 MAIN_SEND_GO_FOR_PHASE(Media); 00204 MAIN_WAIT_FOR_PHASE_END(Media); 00205 } 00206 00207 /****************************************/ 00208 /****************************************/ 00209 00210 #define THREAD_WAIT_FOR_GO_SIGNAL(PHASE) \ 00211 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \ 00212 while(m_un ## PHASE ## PhaseDoneCounter == CSimulator::GetInstance().GetNumThreads()) { \ 00213 pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \ 00214 } \ 00215 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \ 00216 pthread_testcancel(); 00217 00218 #define THREAD_SIGNAL_PHASE_DONE(PHASE) \ 00219 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \ 00220 ++m_un ## PHASE ## PhaseDoneCounter; \ 00221 pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \ 00222 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \ 00223 pthread_testcancel(); 00224 00225 CRange<size_t> CalculatePluginRangeForThread(size_t un_id, 00226 size_t un_tot_plugins) { 00227 /* This is the minimum number of plugins assigned to a thread */ 00228 size_t unMinPortion = un_tot_plugins / CSimulator::GetInstance().GetNumThreads(); 00229 /* If the division has a remainder, the extra plugins must be assigned too */ 00230 size_t unExtraPortion = un_tot_plugins % CSimulator::GetInstance().GetNumThreads(); 00231 /* Calculate the range */ 00232 if(unMinPortion == 0) { 00233 /* Not all 
threads get a plugin */ 00234 if(un_id < unExtraPortion) { 00235 /* This thread does */ 00236 return CRange<size_t>(un_id, un_id+1); 00237 } 00238 else { 00239 /* This thread does not */ 00240 return CRange<size_t>(); 00241 } 00242 } 00243 else { 00244 /* For sure this thread will get unMinPortion plugins, does it get an extra too? */ 00245 if(un_id < unExtraPortion) { 00246 /* Yes, it gets an extra */ 00247 return CRange<size_t>( un_id * (unMinPortion+1), 00248 (un_id+1) * (unMinPortion+1)); 00249 } 00250 else { 00251 /* No, it doesn't get an extra */ 00252 return CRange<size_t>(unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion) * unMinPortion, 00253 unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion+1) * unMinPortion); 00254 } 00255 } 00256 } 00257 00258 void CSpaceMultiThreadBalanceQuantity::UpdateThread(UInt32 un_id) { 00259 /* Copy the id */ 00260 UInt32 unId = un_id; 00261 /* Create cancellation data */ 00262 SCleanupUpdateThreadData sCancelData; 00263 sCancelData.SenseControlStepConditionalMutex = &m_tSenseControlStepConditionalMutex; 00264 sCancelData.ActConditionalMutex = &m_tActConditionalMutex; 00265 sCancelData.PhysicsConditionalMutex = &m_tPhysicsConditionalMutex; 00266 sCancelData.MediaConditionalMutex = &m_tMediaConditionalMutex; 00267 pthread_cleanup_push(CleanupUpdateThread, &sCancelData); 00268 /* Id range for the physics engines assigned to this thread */ 00269 CRange<size_t> cPhysicsRange = CalculatePluginRangeForThread(unId, m_ptPhysicsEngines->size()); 00270 /* Id range for the physics engines assigned to this thread */ 00271 CRange<size_t> cMediaRange = CalculatePluginRangeForThread(unId, m_ptMedia->size()); 00272 /* Variables storing the portion of entities to update */ 00273 CRange<size_t> cEntityRange; 00274 while(1) { 00275 THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep); 00276 /* Calculate the portion of entities to update, if needed */ 00277 if(m_bIsControllableEntityAssignmentRecalculationNeeded) { 00278 cEntityRange = 
CalculatePluginRangeForThread(unId, m_vecControllableEntities.size()); 00279 } 00280 /* Cope with the fact that there may be less entities than threads */ 00281 if(cEntityRange.GetSpan() > 0) { 00282 /* This thread has entities */ 00283 /* Update sensor readings and call controllers */ 00284 for(size_t i = cEntityRange.GetMin(); i < cEntityRange.GetMax(); ++i) { 00285 m_vecControllableEntities[i]->Sense(); 00286 m_vecControllableEntities[i]->ControlStep(); 00287 } 00288 pthread_testcancel(); 00289 THREAD_SIGNAL_PHASE_DONE(SenseControlStep); 00290 /* Actuate control choices */ 00291 THREAD_WAIT_FOR_GO_SIGNAL(Act); 00292 for(size_t i = cEntityRange.GetMin(); i < cEntityRange.GetMax(); ++i) { 00293 m_vecControllableEntities[i]->Act(); 00294 } 00295 pthread_testcancel(); 00296 THREAD_SIGNAL_PHASE_DONE(Act); 00297 } 00298 else { 00299 /* This thread has no entities -> dummy computation */ 00300 /* Update sensor readings */ 00301 /* Call controllers */ 00302 THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep); 00303 THREAD_SIGNAL_PHASE_DONE(SenseControlStep); 00304 /* Actuate control choices */ 00305 THREAD_WAIT_FOR_GO_SIGNAL(Act); 00306 THREAD_SIGNAL_PHASE_DONE(Act); 00307 } 00308 /* Update physics engines, if this thread has been assigned to them */ 00309 THREAD_WAIT_FOR_GO_SIGNAL(Physics); 00310 if(cPhysicsRange.GetSpan() > 0) { 00311 /* This thread has engines, update them */ 00312 for(size_t i = cPhysicsRange.GetMin(); i < cPhysicsRange.GetMax(); ++i) { 00313 (*m_ptPhysicsEngines)[i]->Update(); 00314 } 00315 pthread_testcancel(); 00316 THREAD_SIGNAL_PHASE_DONE(Physics); 00317 } 00318 else { 00319 /* This thread has no engines -> dummy computation */ 00320 THREAD_SIGNAL_PHASE_DONE(Physics); 00321 } 00322 /* Update media, if this thread has been assigned to them */ 00323 THREAD_WAIT_FOR_GO_SIGNAL(Media); 00324 if(cMediaRange.GetSpan() > 0) { 00325 /* This thread has media, update them */ 00326 for(size_t i = cMediaRange.GetMin(); i < cMediaRange.GetMax(); ++i) { 00327 
(*m_ptMedia)[i]->Update(); 00328 } 00329 pthread_testcancel(); 00330 THREAD_SIGNAL_PHASE_DONE(Media); 00331 } 00332 else { 00333 /* This thread has no media -> dummy computation */ 00334 THREAD_SIGNAL_PHASE_DONE(Media); 00335 } 00336 } 00337 pthread_cleanup_pop(1); 00338 } 00339 00340 /****************************************/ 00341 /****************************************/ 00342 00343 }