ARGoS
3
A parallel, multi-engine simulator for swarm robotics
|
00001 00007 #include "space_multi_thread_balance_length.h" 00008 #include <argos3/core/simulator/simulator.h> 00009 #include <argos3/core/utility/profiler/profiler.h> 00010 00011 #include <cstdio> 00012 00013 namespace argos { 00014 00015 /****************************************/ 00016 /****************************************/ 00017 00018 struct SCleanupThreadData { 00019 pthread_mutex_t* StartSenseControlPhaseMutex; 00020 pthread_mutex_t* StartActPhaseMutex; 00021 pthread_mutex_t* StartPhysicsPhaseMutex; 00022 pthread_mutex_t* FetchTaskMutex; 00023 }; 00024 00025 static void CleanupThread(void* p_data) { 00026 CSimulator& cSimulator = CSimulator::GetInstance(); 00027 if(cSimulator.IsProfiling()) { 00028 cSimulator.GetProfiler().CollectThreadResourceUsage(); 00029 } 00030 SCleanupThreadData& sData = 00031 *reinterpret_cast<SCleanupThreadData*>(p_data); 00032 pthread_mutex_unlock(sData.FetchTaskMutex); 00033 pthread_mutex_unlock(sData.StartSenseControlPhaseMutex); 00034 pthread_mutex_unlock(sData.StartActPhaseMutex); 00035 pthread_mutex_unlock(sData.StartPhysicsPhaseMutex); 00036 } 00037 00038 void* LaunchThreadBalanceLength(void* p_data) { 00039 /* Set up thread-safe buffers for this new thread */ 00040 LOG.AddThreadSafeBuffer(); 00041 LOGERR.AddThreadSafeBuffer(); 00042 /* Make this thread cancellable */ 00043 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 00044 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); 00045 /* Get a handle to the thread launch data */ 00046 CSpaceMultiThreadBalanceLength::SThreadLaunchData* psData = reinterpret_cast<CSpaceMultiThreadBalanceLength::SThreadLaunchData*>(p_data); 00047 /* Create cancellation data */ 00048 SCleanupThreadData sCancelData; 00049 sCancelData.StartSenseControlPhaseMutex = &(psData->Space->m_tStartSenseControlPhaseMutex); 00050 sCancelData.StartActPhaseMutex = &(psData->Space->m_tStartActPhaseMutex); 00051 sCancelData.StartPhysicsPhaseMutex = &(psData->Space->m_tStartPhysicsPhaseMutex); 00052 sCancelData.FetchTaskMutex = &(psData->Space->m_tFetchTaskMutex); 00053 pthread_cleanup_push(CleanupThread, &sCancelData); 00054 if(psData->ThreadId == 0) { 00055 /* Execute the code for the dispatch thread */ 00056 psData->Space->DispatchThread(psData->ThreadId); 00057 } 00058 else { 00059 /* Execute the code for a slave thread */ 00060 psData->Space->SlaveThread(psData->ThreadId); 00061 } 00062 /* Dispose of cancellation data */ 00063 pthread_cleanup_pop(1); 00064 return NULL; 00065 } 00066 00067 /****************************************/ 00068 /****************************************/ 00069 00070 void CSpaceMultiThreadBalanceLength::STaskData::Reset() { 00071 Index = 0; 00072 Used = false; 00073 Done = false; 00074 } 00075 00076 /****************************************/ 00077 /****************************************/ 00078 00079 void CSpaceMultiThreadBalanceLength::Init(TConfigurationNode& t_tree) { 00080 /* Initialize the space */ 00081 CSpace::Init(t_tree); 00082 /* Initialize thread related structures */ 00083 int nErrors; 00084 /* Init mutexes */ 00085 if((nErrors = pthread_mutex_init(&m_tStartSenseControlPhaseMutex, NULL)) || 00086 (nErrors = pthread_mutex_init(&m_tStartActPhaseMutex, NULL)) || 00087 (nErrors = pthread_mutex_init(&m_tStartPhysicsPhaseMutex, NULL)) || 00088 (nErrors = pthread_mutex_init(&m_tFetchTaskMutex, NULL))) { 00089 THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors)); 00090 } 00091 /* Init conditionals */ 00092 if((nErrors = pthread_cond_init(&m_tStartSenseControlPhaseCond, NULL)) || 00093 (nErrors = pthread_cond_init(&m_tStartActPhaseCond, NULL)) || 00094 (nErrors = pthread_cond_init(&m_tStartPhysicsPhaseCond, NULL)) || 00095 (nErrors = pthread_cond_init(&m_tFetchTaskCond, NULL))) { 00096 THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors)); 00097 } 00098 /* Reset the idle thread count */ 00099 m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads(); 00100 m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads(); 00101 m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads(); 00102 /* Start threads */ 00103 StartThreads(); 00104 } 00105 00106 /****************************************/ 00107 /****************************************/ 00108 00109 void CSpaceMultiThreadBalanceLength::Destroy() { 00110 /* Destroy the threads to update the controllable entities */ 00111 int nErrors; 00112 if(m_ptThreads != NULL) { 00113 for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) { 00114 if((nErrors = pthread_cancel(m_ptThreads[i]))) { 00115 THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors)); 00116 } 00117 } 00118 void** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()+1]; 00119 for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) { 00120 if((nErrors = pthread_join(m_ptThreads[i], ppJoinResult + i))) { 00121 THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors)); 00122 } 00123 if(ppJoinResult[i] != PTHREAD_CANCELED) { 00124 LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl; 00125 } 00126 } 00127 delete[] ppJoinResult; 00128 } 00129 delete[] m_ptThreads; 00130 /* Destroy the thread launch info */ 00131 if(m_psThreadData != NULL) { 00132 for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) { 00133 delete m_psThreadData[i]; 00134 } 00135 } 00136 delete[] m_psThreadData; 00137 pthread_mutex_destroy(&m_tStartSenseControlPhaseMutex); 00138 pthread_mutex_destroy(&m_tStartActPhaseMutex); 00139 pthread_mutex_destroy(&m_tStartPhysicsPhaseMutex); 00140 pthread_mutex_destroy(&m_tFetchTaskMutex); 00141 pthread_cond_destroy(&m_tStartSenseControlPhaseCond); 00142 pthread_cond_destroy(&m_tStartActPhaseCond); 00143 pthread_cond_destroy(&m_tStartPhysicsPhaseCond); 00144 pthread_cond_destroy(&m_tFetchTaskCond); 00145 00146 /* Destroy the base space */ 00147 CSpace::Destroy(); 00148 } 00149 00150 /****************************************/ 00151 /****************************************/ 00152 00153 #define MAIN_START_PHASE(PHASE) \ 00154 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \ 00155 m_un ## PHASE ## PhaseIdleCounter = 0; \ 00156 m_sTaskData.Reset(); \ 00157 pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \ 00158 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); 00159 00160 #define MAIN_WAIT_FOR_END_OF(PHASE) \ 00161 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \ 00162 while(m_un ## PHASE ## PhaseIdleCounter < CSimulator::GetInstance().GetNumThreads()) { \ 00163 pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \ 00164 } \ 00165 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); 00166 00167 void CSpaceMultiThreadBalanceLength::UpdateControllableEntities() { 00168 /* Reset the idle thread count */ 00169 m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads(); 00170 m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads(); 00171 m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads(); 00172 /* Sense/control phase */ 00173 MAIN_START_PHASE(SenseControl); 00174 MAIN_WAIT_FOR_END_OF(SenseControl); 00175 /* Act phase */ 00176 MAIN_START_PHASE(Act); 00177 MAIN_WAIT_FOR_END_OF(Act); 00178 } 00179 00180 /****************************************/ 00181 /****************************************/ 00182 00183 void CSpaceMultiThreadBalanceLength::UpdatePhysics() { 00184 /* Physics phase */ 00185 MAIN_START_PHASE(Physics); 00186 MAIN_WAIT_FOR_END_OF(Physics); 00187 /* Perform entity transfer from engine to engine, if needed */ 00188 for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) { 00189 if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) { 00190 (*m_ptPhysicsEngines)[i]->TransferEntities(); 00191 } 00192 } 00193 } 00194 00195 /****************************************/ 00196 /****************************************/ 00197 00198 void CSpaceMultiThreadBalanceLength::StartThreads() { 00199 int nErrors; 00200 /* Create the threads to update the controllable entities */ 00201 m_ptThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads() + 1]; 00202 m_psThreadData = new SThreadLaunchData*[CSimulator::GetInstance().GetNumThreads() + 1]; 00203 for(UInt32 i = 0; i <= CSimulator::GetInstance().GetNumThreads(); ++i) { 00204 /* Create the struct with the info to launch the thread */ 00205 m_psThreadData[i] = new SThreadLaunchData(i, this); 00206 /* Create the thread */ 00207 if((nErrors = pthread_create(m_ptThreads + i, 00208 NULL, 00209 LaunchThreadBalanceLength, 00210 reinterpret_cast<void*>(m_psThreadData[i])))) { 00211 THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors)); 00212 } 00213 } 00214 } 00215 00216 /****************************************/ 00217 /****************************************/ 00218 00219 #define THREAD_WAIT_FOR_START_OF(PHASE) \ 00220 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \ 00221 while(m_un ## PHASE ## PhaseIdleCounter == CSimulator::GetInstance().GetNumThreads()) { \ 00222 pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \ 00223 } \ 00224 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \ 00225 pthread_testcancel(); 00226 00227 #define THREAD_DISPATCH_TASK(TASKVEC) \ 00228 if(! (TASKVEC).empty()) { \ 00229 while(! m_sTaskData.Done) { \ 00230 pthread_mutex_lock(&m_tFetchTaskMutex); \ 00231 while(! m_sTaskData.Used) { \ 00232 pthread_cond_wait(&m_tFetchTaskCond, &m_tFetchTaskMutex); \ 00233 } \ 00234 ++m_sTaskData.Index; \ 00235 if(m_sTaskData.Index < (TASKVEC).size()) { \ 00236 m_sTaskData.Used = false; \ 00237 } \ 00238 else { \ 00239 m_sTaskData.Done = true; \ 00240 } \ 00241 pthread_cond_broadcast(&m_tFetchTaskCond); \ 00242 pthread_mutex_unlock(&m_tFetchTaskMutex); \ 00243 pthread_testcancel(); \ 00244 } \ 00245 } \ 00246 else { \ 00247 m_sTaskData.Done = true; \ 00248 pthread_mutex_unlock(&m_tFetchTaskMutex); \ 00249 pthread_testcancel(); \ 00250 } 00251 00252 /****************************************/ 00253 /****************************************/ 00254 00255 void CSpaceMultiThreadBalanceLength::DispatchThread(UInt32 un_id) { 00256 while(1) { 00257 THREAD_WAIT_FOR_START_OF(SenseControl); 00258 THREAD_DISPATCH_TASK(m_vecControllableEntities); 00259 THREAD_WAIT_FOR_START_OF(Act); 00260 THREAD_DISPATCH_TASK(m_vecControllableEntities); 00261 THREAD_WAIT_FOR_START_OF(Physics); 00262 THREAD_DISPATCH_TASK(*m_ptPhysicsEngines); 00263 } 00264 } 00265 00266 /****************************************/ 00267 /****************************************/ 00268 00269 #define THREAD_PERFORM_TASK(PHASE, SNIPPET) \ 00270 while(1) { \ 00271 pthread_mutex_lock(&m_tFetchTaskMutex); \ 00272 while(m_sTaskData.Used && ! m_sTaskData.Done) { \ 00273 pthread_cond_wait(&m_tFetchTaskCond, &m_tFetchTaskMutex); \ 00274 } \ 00275 if(! m_sTaskData.Done) { \ 00276 unTaskIndex = m_sTaskData.Index; \ 00277 m_sTaskData.Used = true; \ 00278 pthread_cond_broadcast(&m_tFetchTaskCond); \ 00279 pthread_mutex_unlock(&m_tFetchTaskMutex); \ 00280 pthread_testcancel(); \ 00281 { \ 00282 SNIPPET; \ 00283 } \ 00284 pthread_testcancel(); \ 00285 } \ 00286 else { \ 00287 pthread_mutex_unlock(&m_tFetchTaskMutex); \ 00288 pthread_testcancel(); \ 00289 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \ 00290 ++m_un ## PHASE ## PhaseIdleCounter; \ 00291 pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \ 00292 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \ 00293 pthread_testcancel(); \ 00294 break; \ 00295 } \ 00296 } \ 00297 pthread_testcancel(); 00298 00299 void CSpaceMultiThreadBalanceLength::SlaveThread(UInt32 un_id) { 00300 /* Task index */ 00301 size_t unTaskIndex; 00302 while(1) { 00303 THREAD_WAIT_FOR_START_OF(SenseControl); 00304 THREAD_PERFORM_TASK( 00305 SenseControl, 00306 m_vecControllableEntities[unTaskIndex]->Sense(); 00307 m_vecControllableEntities[unTaskIndex]->ControlStep(); 00308 ); 00309 THREAD_WAIT_FOR_START_OF(Act); 00310 THREAD_PERFORM_TASK( 00311 Act, 00312 m_vecControllableEntities[unTaskIndex]->Act(); 00313 ); 00314 THREAD_WAIT_FOR_START_OF(Physics); 00315 THREAD_PERFORM_TASK( 00316 Physics, 00317 (*m_ptPhysicsEngines)[unTaskIndex]->Update(); 00318 ); 00319 } 00320 } 00321 00322 /****************************************/ 00323 /****************************************/ 00324 00325 }