1#ifndef FPMAS_MUTEX_SERVER_H
2#define FPMAS_MUTEX_SERVER_H
11#include "../data_update_pack.h"
12#include "fpmas/utils/log.h"
15namespace fpmas {
namespace synchro {
namespace hard {
// NOTE(review): this file is a damaged extraction of a generated code listing
// (original source line numbers are fused into the text; many statements and
// braces are missing). Comments below annotate the visible fragments only.

// Current communication epoch (starts EVEN). OR-ed into every MPI tag
// (e.g. `epoch | Tag::READ`) so messages from different synchronization
// phases cannot be confused.
36 Epoch epoch = Epoch::EVEN;
// All mutexes managed by this local server, indexed by node id.
37 std::unordered_map<DistributedId, HardSyncMutex*> mutex_map;
// Probes for and dispatches pending READ / ACQUIRE / LOCK / LOCK_SHARED
// requests (defined below).
44 void handleIncomingReadAcquireLock();
// Constructor member-initializer list (fragment): stores references to the
// communicator, the typed MPI helpers and the enclosing server pack.
85 comm(comm), id_mpi(id_mpi), data_mpi(data_mpi),
86 data_update_mpi(data_update_mpi),
87 server_pack(server_pack) {
// manage(id, mutex) body (fragment): registers `mutex` under `id`.
94 mutex_map[id] = mutex;
// Blocks until the given local request has been processed (see definition
// near the end of the file).
111 void wait(
const Request&)
override;
// Polls (non-blocking Iprobe) for incoming READ, ACQUIRE, LOCK and
// LOCK_SHARED requests tagged with the current epoch, and dispatches each to
// its handler with the probed source rank.
// NOTE(review): extraction is incomplete -- the id_mpi.recv() calls that
// bind `id` for each branch, plus closing braces, are missing from this view.
116 void MutexServer<T>::handleIncomingReadAcquireLock() {
// READ request: handleRead() either responds immediately or enqueues the
// request on the target mutex (see handleRead below).
120 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::READ, status)) {
122 FPMAS_LOGD(this->comm.getRank(),
"MUTEX_SERVER",
"receive read request %s from %i",
FPMAS_C_STR(
id), status.
source);
123 this->handleRead(
id, status.
source);
// ACQUIRE request: exclusive access with data sent back to the requester.
127 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::ACQUIRE, status)) {
129 FPMAS_LOGD(this->comm.getRank(),
"MUTEX_SERVER",
"receive acquire request %s from %i",
FPMAS_C_STR(
id), status.
source);
130 this->handleAcquire(
id, status.
source);
// LOCK request: exclusive lock without a data payload in the response.
134 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::LOCK, status)) {
136 FPMAS_LOGD(this->comm.getRank(),
"MUTEX_SERVER",
"receive lock request %s from %i",
FPMAS_C_STR(
id), status.
source);
137 this->handleLock(
id, status.
source);
// LOCK_SHARED request: shared (reader) lock.
141 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::LOCK_SHARED, status)) {
143 FPMAS_LOGD(this->comm.getRank(),
"MUTEX_SERVER",
"receive shared lock request %s from %i",
FPMAS_C_STR(
id), status.
source);
144 this->handleLockShared(
id, status.
source);
// handleIncomingRequests() body (fragment): first drains lock-taking
// requests, then handles lock-releasing messages (RELEASE_ACQUIRE, UNLOCK,
// UNLOCK_SHARED) for the current epoch.
// NOTE(review): recv() calls binding `update`/`id` and closing braces are
// missing from this extraction.
162 handleIncomingReadAcquireLock();
// RELEASE_ACQUIRE carries updated data back from the acquirer; the handler
// writes it into the mutex and unlocks (see handleReleaseAcquire below).
165 if(data_update_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::RELEASE_ACQUIRE, status)) {
167 FPMAS_LOGV(this->comm.getRank(),
"MUTEX_SERVER",
"receive release acquire %s from %i",
170 this->handleReleaseAcquire(update);
// UNLOCK releases an exclusive lock (no data payload).
174 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK, status)) {
176 FPMAS_LOGV(this->comm.getRank(),
"MUTEX_SERVER",
"receive unlock %s from %i",
179 this->handleUnlock(
id);
// UNLOCK_SHARED releases one shared lock.
183 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK_SHARED, status)) {
185 FPMAS_LOGV(this->comm.getRank(),
"MUTEX_SERVER",
"receive unlock shared %s from %i",
188 this->handleUnlockShared(
id);
// handleRead() body (fragment): if the mutex is exclusively locked, the READ
// request is queued for later; otherwise it is answered immediately.
// NOTE(review): mutex_map.find() assumes `id` is managed locally -- an
// unmanaged id would dereference end(); presumably guaranteed by the
// location manager. TODO confirm against callers.
200 auto* mutex = mutex_map.find(
id)->second;
201 if(mutex->locked()) {
202 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"Enqueueing READ request of node %s for %i",
204 mutex->pushRequest(Request(
id, source, MutexRequestType::READ));
206 respondToRead(
id, source);
// Responds to a READ request: takes a shared lock on the mutex, then sends
// the node data back asynchronously; the pending request is parked in
// server_pack.pendingRequests() until the send completes.
215 void MutexServer<T>::respondToRead(
DistributedId id,
int source) {
216 auto* mutex = mutex_map.find(
id)->second;
// Shared lock held while the response is in flight (released on UNLOCK_SHARED).
217 this->MutexServerBase::lockShared(mutex);
219 const T& data = mutex->data();
220 server_pack.pendingRequests().emplace_back(
223 data, source, epoch | Tag::READ_RESPONSE,
224 server_pack.pendingRequests().back());
// Handles an ACQUIRE request from `source` for node `id`: queued if the
// mutex is exclusively locked OR shared-locked (acquire needs exclusivity),
// answered immediately otherwise.
234 void MutexServer<T>::handleAcquire(
DistributedId id,
int source) {
235 auto* mutex = mutex_map.find(
id)->second;
// Unlike READ, ACQUIRE must also wait out shared locks.
236 if(mutex->locked() || mutex->sharedLockCount() > 0) {
237 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"Enqueueing ACQUIRE request of node %s for %i",
239 mutex->pushRequest(Request(
id, source, MutexRequestType::ACQUIRE));
241 respondToAcquire(
id, source);
// Responds to an ACQUIRE: takes the exclusive lock and sends the node data
// back asynchronously (released/updated later by RELEASE_ACQUIRE).
251 void MutexServer<T>::respondToAcquire(
DistributedId id,
int source) {
252 auto* mutex = mutex_map.find(
id)->second;
253 this->MutexServerBase::lock(mutex);
254 const T& data = mutex->data();
255 server_pack.pendingRequests().emplace_back(
258 data, source, epoch | Tag::ACQUIRE_RESPONSE,
259 server_pack.pendingRequests().back()
// Drains the mutex's queue of processable pending requests, responding to
// each according to its type. Called after an unlock frees the mutex.
// NOTE(review): `break` statements and requests.pop() are missing from this
// extraction -- presumably present in the original switch/loop; confirm.
264 void MutexServer<T>::respondToRequests(HardSyncMutex* mutex) {
265 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"Unqueueing requests...",
"");
// Snapshot of requests that can be served given the mutex's current state.
266 std::queue<Request> requests = mutex->requestsToProcess();
267 while(!requests.empty()) {
268 Request request = requests.front();
269 switch(request.type) {
270 case MutexRequestType::READ :
271 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->READ(%s)",
273 respondToRead(request.id, request.source);
275 case MutexRequestType::LOCK :
276 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->LOCK(%s)",
278 respondToLock(request.id, request.source);
280 case MutexRequestType::ACQUIRE :
281 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->ACQUIRE(%s)",
283 respondToAcquire(request.id, request.source);
285 case MutexRequestType::LOCK_SHARED :
286 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->LOCK_SHARED(%s)",
288 respondToLockShared(request.id, request.source);
// Handles a RELEASE_ACQUIRE message: releases the exclusive lock taken by
// respondToAcquire(), moves the updated data into the local node, then
// serves any queued requests that the release unblocked.
295 void MutexServer<T>::handleReleaseAcquire(DataUpdatePack<T>& update) {
296 auto* mutex = mutex_map.find(update.id)->second;
297 this->MutexServerBase::unlock(mutex);
// Write-back of the remote modification before anyone else reads the node.
298 mutex->data() = std::move(update.updated_data);
300 respondToRequests(mutex);
// Handles a LOCK request: queued while the mutex is exclusively or
// shared-locked, answered immediately otherwise.
310 void MutexServer<T>::handleLock(
DistributedId id,
int source) {
311 auto* mutex = mutex_map.find(
id)->second;
312 if(mutex->locked() || mutex->sharedLockCount() > 0) {
313 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"Enqueueing LOCK request of node %s for %i",
315 mutex->pushRequest(Request(
id, source, MutexRequestType::LOCK));
317 respondToLock(
id, source);
// Responds to a LOCK: takes the exclusive lock, then sends an empty
// LOCK_RESPONSE (no data payload, unlike ACQUIRE_RESPONSE) via Isend.
327 void MutexServer<T>::respondToLock(
DistributedId id,
int source) {
328 auto* mutex = mutex_map.find(
id)->second;
329 this->MutexServerBase::lock(mutex);
// Asynchronous send parked in the server pack's pending-request list.
331 server_pack.pendingRequests().emplace_back(
333 comm.Isend(source, epoch | Tag::LOCK_RESPONSE,
334 server_pack.pendingRequests().back());
// handleUnlock() body (fragment): releases the exclusive lock and serves
// newly unblocked queued requests.
339 auto* mutex = mutex_map.find(
id)->second;
340 this->MutexServerBase::unlock(mutex);
342 respondToRequests(mutex);
// Handles a LOCK_SHARED request: queued only while the mutex is EXCLUSIVELY
// locked (concurrent shared locks are allowed), answered immediately otherwise.
346 void MutexServer<T>::handleLockShared(
DistributedId id,
int source) {
347 auto* mutex = mutex_map.find(
id)->second;
348 if(mutex->locked()) {
349 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"Enqueueing LOCK_SHARED request of node %s for %i",
351 mutex->pushRequest(Request(
id, source, MutexRequestType::LOCK_SHARED));
353 respondToLockShared(
id, source);
// Responds to a LOCK_SHARED: takes a shared lock and sends an empty
// LOCK_SHARED_RESPONSE via Isend.
358 void MutexServer<T>::respondToLockShared(
DistributedId id,
int source) {
359 auto* mutex = mutex_map.find(
id)->second;
360 this->MutexServerBase::lockShared(mutex);
362 server_pack.pendingRequests().emplace_back(
364 comm.Isend(source, epoch | Tag::LOCK_SHARED_RESPONSE,
365 server_pack.pendingRequests().back());
// handleUnlockShared() body (fragment): releases one shared lock, then
// serves queued requests (e.g. a waiting ACQUIRE once the count hits zero).
370 auto* mutex = mutex_map.find(
id)->second;
371 this->MutexServerBase::unlockShared(mutex);
375 respondToRequests(mutex);
// Variant of respondToRequests() used by wait(): serves queued requests and
// additionally reports whether `request_to_wait` was among those processed
// (see the `request == request_to_wait` check at the end), so the waiting
// caller can stop spinning.
// NOTE(review): `break`s, requests.pop(), the return statements and closing
// braces are missing from this extraction.
382 bool MutexServer<T>::respondToRequests(HardSyncMutex* mutex,
const Request& request_to_wait) {
383 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"Unqueueing requests...",
"");
384 std::queue<Request> requests = mutex->requestsToProcess();
385 while(!requests.empty()) {
386 Request request = requests.front();
388 switch(request.type) {
389 case MutexRequestType::READ :
390 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->READ(%s)",
392 respondToRead(request.id, request.source);
394 case MutexRequestType::LOCK :
395 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->LOCK(%s)",
397 respondToLock(request.id, request.source);
399 case MutexRequestType::ACQUIRE :
400 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->ACQUIRE(%s)",
402 respondToAcquire(request.id, request.source);
404 case MutexRequestType::LOCK_SHARED :
405 FPMAS_LOGV(comm.getRank(),
"MUTEX_SERVER",
"%i->LOCK_SHARED(%s)",
407 respondToLockShared(request.id, request.source);
// True when the request the caller is waiting on has just been served.
411 if(request == request_to_wait) {
// wait()-aware variant of handleReleaseAcquire(): same unlock + data
// write-back, but returns whether `request_to_wait` got processed while
// draining the queue.
420 bool MutexServer<T>::handleReleaseAcquire(DataUpdatePack<T>& update,
const Request& request_to_wait) {
421 auto* mutex = mutex_map.find(update.id)->second;
422 this->MutexServerBase::unlock(mutex);
423 mutex->data() = std::move(update.updated_data);
425 return respondToRequests(mutex, request_to_wait);
// wait()-aware variant of handleUnlock(): returns true if request_to_wait
// was served as a consequence of this unlock.
429 bool MutexServer<T>::handleUnlock(
DistributedId id,
const Request& request_to_wait) {
430 auto* mutex = mutex_map.find(
id)->second;
431 this->MutexServerBase::unlock(mutex);
433 return respondToRequests(mutex, request_to_wait);
// wait()-aware variant of handleUnlockShared().
437 bool MutexServer<T>::handleUnlockShared(
DistributedId id,
const Request& request_to_wait) {
438 auto* mutex = mutex_map.find(
id)->second;
439 this->MutexServerBase::unlockShared(mutex);
441 return respondToRequests(mutex, request_to_wait);
// handleIncomingRequests(request_to_wait) body (fragment): like the plain
// overload, but each release-type handler reports whether request_to_wait
// was served; the early `if(...){` returns presumably short-circuit the poll.
// NOTE(review): recv() calls binding `id`, returns and braces are missing
// from this extraction.
446 handleIncomingReadAcquireLock();
451 if(data_update_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::RELEASE_ACQUIRE, status)) {
452 DataUpdatePack<T> update = data_update_mpi.recv(status.
source, status.
tag);
453 FPMAS_LOGV(this->comm.getRank(),
"MUTEX_SERVER",
"receive release acquire %s from %i",
455 if(this->handleReleaseAcquire(update, request_to_wait)){
461 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK, status)) {
463 FPMAS_LOGV(this->comm.getRank(),
"MUTEX_SERVER",
"receive unlock %s from %i",
465 if(this->handleUnlock(
id, request_to_wait)) {
471 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLOCK_SHARED, status)) {
474 FPMAS_LOGV(this->comm.getRank(),
"MUTEX_SERVER",
"receive unlock shared %s from %i",
476 if(this->handleUnlockShared(
id, request_to_wait)) {
// wait() body (fragment): busy-waits, alternating between this mutex
// server's incoming-request handling and the link server's, until the local
// request_to_wait has been processed. Keeps serving remote requests so the
// distributed system cannot deadlock while this process blocks.
485 FPMAS_LOGD(comm.getRank(),
"MUTEX_SERVER",
486 "Waiting for local request to node %s to complete...",
488 bool request_processed =
false;
489 while(!request_processed) {
490 request_processed = handleIncomingRequests(request_to_wait);
// Also service link/unlink traffic so the partner server can progress.
491 server_pack.linkServer().handleIncomingRequests();
493 FPMAS_LOGD(comm.getRank(),
"MUTEX_SERVER",
494 "Handling local request to node %s.",
// notify() body (fragment): serves the pending request queue of the mutex
// managing `id`, e.g. after a local operation changed its lock state.
500 FPMAS_LOGD(comm.getRank(),
"MUTEX_SERVER",
502 auto* mutex = mutex_map.find(
id)->second;
503 return respondToRequests(mutex);
Definition: communication.h:251
Definition: communication.h:637
Definition: distributed_id.h:89
Definition: mutex_server.h:26
void setEpoch(api::Epoch e) override
Definition: mutex_server.h:90
Epoch getEpoch() const override
Definition: mutex_server.h:91
void remove(DistributedId id) override
Definition: mutex_server.h:96
void wait(const Request &) override
Definition: mutex_server.h:484
const std::unordered_map< DistributedId, HardSyncMutex * > & getManagedMutexes() const
Definition: mutex_server.h:105
void notify(DistributedId) override
Definition: mutex_server.h:499
void manage(DistributedId id, HardSyncMutex *mutex) override
Definition: mutex_server.h:93
MutexServer(MpiComm &comm, IdMpi &id_mpi, DataMpi &data_mpi, DataUpdateMpi &data_update_mpi, ServerPackBase &server_pack)
Definition: mutex_server.h:80
void handleIncomingRequests() override
Definition: mutex_server.h:160
Definition: server_pack.h:39
Definition: hard_sync_mode.h:24
Definition: client_server.h:208
#define FPMAS_C_STR(arg)
Definition: macros.h:24
@ LOCAL
Definition: location_state.h:21
Epoch
Definition: enums.h:15
MutexRequestType
Definition: enums.h:54
Tag
Definition: enums.h:23
Definition: communication.h:162
Definition: communication.h:215
int tag
Definition: communication.h:236
int source
Definition: communication.h:232
Definition: data_update_pack.h:16
DistributedId id
Definition: data_update_pack.h:20
Definition: client_server.h:15
DistributedId id
Definition: client_server.h:24