fpmas 1.6
mutex_server.h
#ifndef FPMAS_MUTEX_SERVER_H
#define FPMAS_MUTEX_SERVER_H

#include "../data_update_pack.h"
#include "fpmas/utils/log.h"
#include "server_pack.h"

namespace fpmas { namespace synchro { namespace hard {
    using api::Epoch;
    using api::Tag;

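    // The MutexServer is the server-side half of the hard synchronization
    // mutex protocol: for every DistributedId it manages, it answers remote
    // READ / ACQUIRE / LOCK / LOCK_SHARED requests and the matching
    // RELEASE_ACQUIRE / UNLOCK / UNLOCK_SHARED notifications, queueing
    // requests on the corresponding HardSyncMutex whenever the resource is
    // currently held.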

    template<typename T>
    class MutexServer : public api::MutexServer<T> {
        // NOTE: the type aliases used below (MutexServerBase, HardSyncMutex,
        // Request, MutexRequestType, MpiComm, IdMpi, DataMpi, DataUpdateMpi,
        // ServerPackBase) are defined elsewhere in fpmas; their declarations
        // are elided from this listing.

        private:
            Epoch epoch = Epoch::EVEN;
            std::unordered_map<DistributedId, HardSyncMutex*> mutex_map;
            MpiComm& comm;
            IdMpi& id_mpi;
            DataMpi& data_mpi;
            DataUpdateMpi& data_update_mpi;
            ServerPackBase& server_pack;

            void handleIncomingReadAcquireLock();

            void handleRead(DistributedId id, int source);
            void respondToRead(DistributedId id, int source);

            void handleAcquire(DistributedId id, int source);
            void respondToAcquire(DistributedId id, int source);
            void handleReleaseAcquire(DataUpdatePack<T>& update);

            void respondToRequests(HardSyncMutex*);

            void handleLock(DistributedId id, int source);
            void respondToLock(DistributedId id, int source);
            void handleUnlock(DistributedId id);

            void handleLockShared(DistributedId id, int source);
            void respondToLockShared(DistributedId id, int source);
            void handleUnlockShared(DistributedId id);

            bool respondToRequests(HardSyncMutex*, const Request& request_to_wait);
            bool handleIncomingRequests(const Request& request_to_wait);
            bool handleReleaseAcquire(DataUpdatePack<T>& update, const Request& request_to_wait);
            bool handleUnlock(DistributedId id, const Request& request_to_wait);
            bool handleUnlockShared(DistributedId id, const Request& request_to_wait);

        public:
            MutexServer(
                    MpiComm& comm, IdMpi& id_mpi, DataMpi& data_mpi,
                    DataUpdateMpi& data_update_mpi,
                    ServerPackBase& server_pack
                    ) :
                comm(comm), id_mpi(id_mpi), data_mpi(data_mpi),
                data_update_mpi(data_update_mpi),
                server_pack(server_pack) {
                }

            void setEpoch(api::Epoch e) override {this->epoch = e;}
            Epoch getEpoch() const override {return this->epoch;}

            void manage(DistributedId id, HardSyncMutex* mutex) override {
                mutex_map[id] = mutex;
            }
            void remove(DistributedId id) override {
                mutex_map.erase(id);
            }

            const std::unordered_map<DistributedId, HardSyncMutex*>& getManagedMutexes() const {
                return mutex_map;
            }

            void handleIncomingRequests() override;

            void wait(const Request&) override;
            void notify(DistributedId) override;
    };

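    // Usage sketch (illustrative only, not part of the header): a hard
    // synchronization runtime would typically register the mutex of every
    // local node and then poll the server between computations, e.g.:
    //
    //   MutexServer<Data> server(comm, id_mpi, data_mpi, data_update_mpi, server_pack);
    //   server.setEpoch(Epoch::EVEN);
    //   server.manage(node_id, &node_mutex);   // hypothetical node_id / node_mutex
    //   while(!done)                           // hypothetical termination condition
    //       server.handleIncomingRequests();
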
    template<typename T>
    void MutexServer<T>::handleIncomingReadAcquireLock() {
        // Status of the last probed message (type assumed; the original
        // declaration is elided from this listing)
        fpmas::api::communication::Status status;

        // Check read request
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::READ, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGD(this->comm.getRank(), "MUTEX_SERVER", "receive read request %s from %i", FPMAS_C_STR(id), status.source);
            this->handleRead(id, status.source);
        }

        // Check acquire request
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::ACQUIRE, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGD(this->comm.getRank(), "MUTEX_SERVER", "receive acquire request %s from %i", FPMAS_C_STR(id), status.source);
            this->handleAcquire(id, status.source);
        }

        // Check lock
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::LOCK, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGD(this->comm.getRank(), "MUTEX_SERVER", "receive lock request %s from %i", FPMAS_C_STR(id), status.source);
            this->handleLock(id, status.source);
        }

        // Check shared lock
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::LOCK_SHARED, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGD(this->comm.getRank(), "MUTEX_SERVER", "receive shared lock request %s from %i", FPMAS_C_STR(id), status.source);
            this->handleLockShared(id, status.source);
        }
    }
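
    // Note that Iprobe is non-blocking: each call above consumes at most one
    // pending message per tag, so the server is expected to be polled
    // repeatedly (which both handleIncomingRequests() variants below do).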

    template<typename T>
    void MutexServer<T>::handleIncomingRequests() {
        // Status of the last probed message (type assumed; the original
        // declaration is elided from this listing)
        fpmas::api::communication::Status status;

        handleIncomingReadAcquireLock();

        // Check release acquire
        if(data_update_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::RELEASE_ACQUIRE, status)) {
            DataUpdatePack<T> update = data_update_mpi.recv(status.source, status.tag);
            FPMAS_LOGV(this->comm.getRank(), "MUTEX_SERVER", "receive release acquire %s from %i",
                    FPMAS_C_STR(update.id), status.source);

            this->handleReleaseAcquire(update);
        }

        // Check unlock
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGV(this->comm.getRank(), "MUTEX_SERVER", "receive unlock %s from %i",
                    FPMAS_C_STR(id), status.source);

            this->handleUnlock(id);
        }

        // Check shared unlock
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK_SHARED, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGV(this->comm.getRank(), "MUTEX_SERVER", "receive unlock shared %s from %i",
                    FPMAS_C_STR(id), status.source);

            this->handleUnlockShared(id);
        }
    }
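
    // All probes match on `epoch | Tag::...`: the current Epoch (EVEN or ODD)
    // is OR-ed into the MPI tag, presumably so that messages from two
    // consecutive synchronization phases are not mixed up when the epoch is
    // toggled through setEpoch().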

    /*
     * Handles a read request.
     * The request is served immediately if the resource is available, or
     * pushed to the waiting queue of the corresponding HardSyncMutex
     * otherwise.
     */
    template<typename T>
    void MutexServer<T>::handleRead(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        if(mutex->locked()) {
            FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "Enqueueing READ request of node %s for %i",
                    FPMAS_C_STR(id), source);
            mutex->pushRequest(Request(id, source, MutexRequestType::READ));
        } else {
            respondToRead(id, source);
        }
    }

    /*
     * Sends a read response to the source process, reading data from the
     * local mutex.
     */
    template<typename T>
    void MutexServer<T>::respondToRead(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::lockShared(mutex);
        // Perform the response
        const T& data = mutex->data();
        // The emplace_back argument is elided from this listing; a
        // default-constructed fpmas::api::communication::Request is assumed.
        server_pack.pendingRequests().emplace_back(
                fpmas::api::communication::Request());
        data_mpi.Isend(
                data, source, epoch | Tag::READ_RESPONSE,
                server_pack.pendingRequests().back());
    }
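
    // The response is sent with a non-blocking Isend; the associated MPI
    // request object lives in server_pack.pendingRequests() so that the
    // ServerPack can test and complete it later, after this handler returns.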

    /*
     * Handles an acquire request.
     * The request is served immediately if the resource is available, or
     * pushed to the waiting queue of the corresponding HardSyncMutex
     * otherwise.
     */
    template<typename T>
    void MutexServer<T>::handleAcquire(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        if(mutex->locked() || mutex->sharedLockCount() > 0) {
            FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "Enqueueing ACQUIRE request of node %s for %i",
                    FPMAS_C_STR(id), source);
            mutex->pushRequest(Request(id, source, MutexRequestType::ACQUIRE));
        } else {
            respondToAcquire(id, source);
        }
    }
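
    // Unlike a READ (blocked only by an exclusive lock), an ACQUIRE needs
    // exclusive access, so it is queued as soon as the mutex is locked or
    // shared-locked by at least one reader.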

    /*
     * Sends an acquire response to the source process, reading data from the
     * local mutex.
     */
    template<typename T>
    void MutexServer<T>::respondToAcquire(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::lock(mutex);
        const T& data = mutex->data();
        // As in respondToRead, the elided emplace_back argument is assumed to
        // be a default-constructed fpmas::api::communication::Request.
        server_pack.pendingRequests().emplace_back(
                fpmas::api::communication::Request());
        data_mpi.Isend(
                data, source, epoch | Tag::ACQUIRE_RESPONSE,
                server_pack.pendingRequests().back()
                );
    }

    template<typename T>
    void MutexServer<T>::respondToRequests(HardSyncMutex* mutex) {
        FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "Unqueueing requests...", "");
        std::queue<Request> requests = mutex->requestsToProcess();
        while(!requests.empty()) {
            Request request = requests.front();
            switch(request.type) {
                case MutexRequestType::READ :
                    FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->READ(%s)",
                            request.source, FPMAS_C_STR(request.id));
                    respondToRead(request.id, request.source);
                    break;
                case MutexRequestType::LOCK :
                    FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->LOCK(%s)",
                            request.source, FPMAS_C_STR(request.id));
                    respondToLock(request.id, request.source);
                    break;
                case MutexRequestType::ACQUIRE :
                    FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->ACQUIRE(%s)",
                            request.source, FPMAS_C_STR(request.id));
                    respondToAcquire(request.id, request.source);
                    break;
                case MutexRequestType::LOCK_SHARED :
                    FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->LOCK_SHARED(%s)",
                            request.source, FPMAS_C_STR(request.id));
                    respondToLockShared(request.id, request.source);
            }
            requests.pop();
        }
    }
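
    // Requests that were queued while the mutex was held are drained here,
    // in the order returned by mutex->requestsToProcess(), each time the
    // mutex is released or unlocked.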

    template<typename T>
    void MutexServer<T>::handleReleaseAcquire(DataUpdatePack<T>& update) {
        auto* mutex = mutex_map.find(update.id)->second;
        this->MutexServerBase::unlock(mutex);
        mutex->data() = std::move(update.updated_data);

        respondToRequests(mutex);
    }
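
    // RELEASE_ACQUIRE both unlocks the mutex and commits the data written by
    // the remote acquirer (moved from the DataUpdatePack), before serving any
    // queued requests against the updated value.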

    /*
     * Handles a lock request.
     * The request is served immediately if the resource is available, or
     * pushed to the waiting queue of the corresponding HardSyncMutex
     * otherwise.
     */
    template<typename T>
    void MutexServer<T>::handleLock(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        if(mutex->locked() || mutex->sharedLockCount() > 0) {
            FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "Enqueueing LOCK request of node %s for %i",
                    FPMAS_C_STR(id), source);
            mutex->pushRequest(Request(id, source, MutexRequestType::LOCK));
        } else {
            respondToLock(id, source);
        }
    }

    /*
     * Sends a lock response to the source process.
     */
    template<typename T>
    void MutexServer<T>::respondToLock(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::lock(mutex);

        // As in respondToRead, the elided emplace_back argument is assumed to
        // be a default-constructed fpmas::api::communication::Request.
        server_pack.pendingRequests().emplace_back(
                fpmas::api::communication::Request());
        comm.Isend(source, epoch | Tag::LOCK_RESPONSE,
                server_pack.pendingRequests().back());
    }
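
    // A LOCK response carries no data payload: comm.Isend is used directly
    // rather than data_mpi.Isend, since the requesting process only waits for
    // the acknowledgement.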

    template<typename T>
    void MutexServer<T>::handleUnlock(DistributedId id) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::unlock(mutex);

        respondToRequests(mutex);
    }

    template<typename T>
    void MutexServer<T>::handleLockShared(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        if(mutex->locked()) {
            FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "Enqueueing LOCK_SHARED request of node %s for %i",
                    FPMAS_C_STR(id), source);
            mutex->pushRequest(Request(id, source, MutexRequestType::LOCK_SHARED));
        } else {
            respondToLockShared(id, source);
        }
    }

    template<typename T>
    void MutexServer<T>::respondToLockShared(DistributedId id, int source) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::lockShared(mutex);

        // As in respondToRead, the elided emplace_back argument is assumed to
        // be a default-constructed fpmas::api::communication::Request.
        server_pack.pendingRequests().emplace_back(
                fpmas::api::communication::Request());
        comm.Isend(source, epoch | Tag::LOCK_SHARED_RESPONSE,
                server_pack.pendingRequests().back());
    }

    template<typename T>
    void MutexServer<T>::handleUnlockShared(DistributedId id) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::unlockShared(mutex);

        // No requests will be processed if there is still at least one
        // shared lock. See mutex::requestsToProcess.
        respondToRequests(mutex);
    }

    /* Wait variants */

    template<typename T>
    bool MutexServer<T>::respondToRequests(HardSyncMutex* mutex, const Request& request_to_wait) {
        FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "Unqueueing requests...", "");
        std::queue<Request> requests = mutex->requestsToProcess();
        while(!requests.empty()) {
            Request request = requests.front();
            if(request.source != Request::LOCAL) {
                switch(request.type) {
                    case MutexRequestType::READ :
                        FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->READ(%s)",
                                request.source, FPMAS_C_STR(request.id));
                        respondToRead(request.id, request.source);
                        break;
                    case MutexRequestType::LOCK :
                        FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->LOCK(%s)",
                                request.source, FPMAS_C_STR(request.id));
                        respondToLock(request.id, request.source);
                        break;
                    case MutexRequestType::ACQUIRE :
                        FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->ACQUIRE(%s)",
                                request.source, FPMAS_C_STR(request.id));
                        respondToAcquire(request.id, request.source);
                        break;
                    case MutexRequestType::LOCK_SHARED :
                        FPMAS_LOGV(comm.getRank(), "MUTEX_SERVER", "%i->LOCK_SHARED(%s)",
                                request.source, FPMAS_C_STR(request.id));
                        respondToLockShared(request.id, request.source);
                }
                requests.pop();
            } else {
                if(request == request_to_wait) {
                    return true;
                }
            }
        }
        return false;
    }
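
    // This overload is used while a local request is pending (see wait()
    // below): remote requests are answered as usual, but when the local
    // request_to_wait is reached in the queue the method returns true, so the
    // caller knows its own request can now be satisfied locally, without an
    // MPI response.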

    template<typename T>
    bool MutexServer<T>::handleReleaseAcquire(DataUpdatePack<T>& update, const Request& request_to_wait) {
        auto* mutex = mutex_map.find(update.id)->second;
        this->MutexServerBase::unlock(mutex);
        mutex->data() = std::move(update.updated_data);

        return respondToRequests(mutex, request_to_wait);
    }

    template<typename T>
    bool MutexServer<T>::handleUnlock(DistributedId id, const Request& request_to_wait) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::unlock(mutex);

        return respondToRequests(mutex, request_to_wait);
    }

    template<typename T>
    bool MutexServer<T>::handleUnlockShared(DistributedId id, const Request& request_to_wait) {
        auto* mutex = mutex_map.find(id)->second;
        this->MutexServerBase::unlockShared(mutex);

        return respondToRequests(mutex, request_to_wait);
    }

    template<typename T>
    bool MutexServer<T>::handleIncomingRequests(const Request& request_to_wait) {
        handleIncomingReadAcquireLock();

        // Status of the last probed message (type assumed; the original
        // declaration is elided from this listing)
        fpmas::api::communication::Status status;

        // Check release acquire
        if(data_update_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::RELEASE_ACQUIRE, status)) {
            DataUpdatePack<T> update = data_update_mpi.recv(status.source, status.tag);
            FPMAS_LOGV(this->comm.getRank(), "MUTEX_SERVER", "receive release acquire %s from %i",
                    FPMAS_C_STR(update.id), status.source);
            if(this->handleReleaseAcquire(update, request_to_wait)){
                return true;
            }
        }

        // Check unlock
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);
            FPMAS_LOGV(this->comm.getRank(), "MUTEX_SERVER", "receive unlock %s from %i",
                    FPMAS_C_STR(id), status.source);
            if(this->handleUnlock(id, request_to_wait)) {
                return true;
            }
        }

        // Check shared unlock
        if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK_SHARED, status)) {
            DistributedId id = id_mpi.recv(status.source, status.tag);

            FPMAS_LOGV(this->comm.getRank(), "MUTEX_SERVER", "receive unlock shared %s from %i",
                    FPMAS_C_STR(id), status.source);
            if(this->handleUnlockShared(id, request_to_wait)) {
                return true;
            }
        }
        return false;
    }

    template<typename T>
    void MutexServer<T>::wait(const Request& request_to_wait) {
        FPMAS_LOGD(comm.getRank(), "MUTEX_SERVER",
                "Waiting for local request to node %s to complete...",
                FPMAS_C_STR(request_to_wait.id));
        bool request_processed = false;
        while(!request_processed) {
            request_processed = handleIncomingRequests(request_to_wait);
            server_pack.linkServer().handleIncomingRequests();
        }
        FPMAS_LOGD(comm.getRank(), "MUTEX_SERVER",
                "Handling local request to node %s.",
                FPMAS_C_STR(request_to_wait.id));
    }
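
    // While blocked on a local request, the process keeps serving incoming
    // mutex requests and, through server_pack.linkServer(), incoming link
    // requests. This is presumably what prevents two processes that are
    // waiting on each other's nodes from deadlocking.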

    template<typename T>
    void MutexServer<T>::notify(DistributedId id) {
        FPMAS_LOGD(comm.getRank(), "MUTEX_SERVER",
                "Notifying released node %s", FPMAS_C_STR(id));
        auto* mutex = mutex_map.find(id)->second;
        return respondToRequests(mutex);
    }
}}}
#endif