fpmas 1.6
mutex_client.h
Go to the documentation of this file.
1#ifndef FPMAS_MUTEX_CLIENT_H
2#define FPMAS_MUTEX_CLIENT_H
3
10#include "../data_update_pack.h"
11#include "server_pack.h"
12
13namespace fpmas { namespace synchro { namespace hard {
14 using api::Epoch;
15 using api::Tag;
16
// Client side of the mutex protocol in namespace fpmas::synchro::hard.
// Derives publicly from api::MutexClient<T> and overrides its read / acquire /
// lock / lockShared operations and the matching release / unlock operations.
// Every operation takes the DistributedId of the target and the `location`
// (an int passed as the destination of the sends issued by the definitions
// further down in this file).
// NOTE(review): listing line 21 (presumably the `class MutexClient :` header)
// is absent from this extract.
20 template<typename T>
22 public api::MutexClient<T> {
31
32 private:
// Used in this file for getRank() in log calls and to receive the LOCK /
// LOCK_SHARED responses.
33 MpiCommunicator& comm;
// Used to Issend the DistributedId of the requested node.
34 IdMpi& id_mpi;
// Used to receive the T value answering READ and ACQUIRE requests.
35 DataMpi& data_mpi;
// Used to Issend the DataUpdatePack<T> built by releaseAcquire().
36 DataUpdateMpi& data_update_mpi;
// Provides getEpoch(), waitSendRequest(), waitResponse() and
// waitVoidResponse(), which every operation below relies on.
37 ServerPackBase& server_pack;
38
39 public:
// Constructor: stores the five references received as arguments; nothing is
// copied or owned by this class.
// NOTE(review): the constructor's opening line (listing line 49) is absent
// from this extract.
50 MpiCommunicator& comm,
51 IdMpi& id_mpi, DataMpi& data_mpi, DataUpdateMpi& data_update_mpi,
52 ServerPackBase& server_pack) :
53 comm(comm),
54 id_mpi(id_mpi), data_mpi(data_mpi), data_update_mpi(data_update_mpi),
55 server_pack(server_pack) {}
56
// Read access: read() returns the received T; releaseRead() ends the read.
57 T read(DistributedId, int location) override;
58 void releaseRead(DistributedId, int location) override;
59
// Acquire access: acquire() returns the received T; releaseAcquire() sends
// `updated_data` back to `location`.
60 T acquire(DistributedId, int location) override;
61 void releaseAcquire(DistributedId, const T& updated_data, int location) override;
62
// Lock without data transfer.
63 void lock(DistributedId, int location) override;
64 void unlock(DistributedId, int location) override;
65
// Shared lock without data transfer.
66 void lockShared(DistributedId, int location) override;
67 void unlockShared(DistributedId, int location) override;
68 };
69
// Requests the data of node `id` from process `location` and returns it.
//
// Sequence, as written below:
//   1. Issend `id` to `location`, tagged with the current epoch OR-ed with
//      Tag::READ.
//   2. waitSendRequest(req): wait until that request has been received.
//   3. waitResponse(): wait for a Tag::READ_RESPONSE coming from `location`;
//      the resulting status is stored in read_response_status.
//   4. Receive the T value using the source and tag found in that status.
//
// NOTE(review): `req` is used below, but its declaration (listing line 74) is
// absent from this extract.
70 template<typename T>
71 T MutexClient<T>::read(DistributedId id, int location) {
72 FPMAS_LOGD(this->comm.getRank(), "MUTEX_CLIENT", "reading node %s from %i", FPMAS_C_STR(id), location);
73 // Starts non-blocking synchronous send
75 this->id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::READ, req);
76
77 // Keep responding to other READ / ACQUIRE request to avoid deadlock,
78 // until the request has been received
79 server_pack.waitSendRequest(req);
80
81 // The request has been received : it is assumed that the receiving proc is
82 // now responding so we can safely wait for response without deadlocking
83 // TODO : this is not thread safe.
84 fpmas::api::communication::Status read_response_status;
85 server_pack.waitResponse(data_mpi, location, Tag::READ_RESPONSE, read_response_status);
86
// Receive from exactly the source / tag reported by the status above.
87 return data_mpi.recv(read_response_status.source, read_response_status.tag);
88 }
89
// Release-read path (the log message below reads "releasing read node").
// Sends `id` to `location`, tagged with the current epoch OR-ed with
// Tag::UNLOCK_SHARED, then calls waitSendRequest() on the send request.
// No response is awaited afterwards.
//
// NOTE(review): the signature (listing line 91) and the declaration of `req`
// (listing line 94) are absent from this extract.
90 template<typename T>
92 FPMAS_LOGV(this->comm.getRank(), "MUTEX_CLIENT", "releasing read node %s from %i", FPMAS_C_STR(id), location);
93
// Tagged UNLOCK_SHARED rather than with a read-specific tag.
95 id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::UNLOCK_SHARED, req);
96
97 server_pack.waitSendRequest(req);
98 }
99
// Acquire path (the log message below reads "acquiring node").
// Same request / response sequence as read(), with different tags:
//   1. Issend `id` to `location`, tagged with the current epoch OR-ed with
//      Tag::ACQUIRE.
//   2. waitSendRequest(req).
//   3. waitResponse() for a Tag::ACQUIRE_RESPONSE coming from `location`.
//   4. Receive and return the T value using the source and tag of the status.
//
// NOTE(review): the signature (listing line 101) and the declaration of `req`
// (listing line 104) are absent from this extract.
100 template<typename T>
102 FPMAS_LOGD(this->comm.getRank(), "MUTEX_CLIENT", "acquiring node %s from %i", FPMAS_C_STR(id), location);
103 // Starts non-blocking synchronous send
105 this->id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::ACQUIRE, req);
106
107 server_pack.waitSendRequest(req);
108
109 // The request has been received : it is assumed that the receiving proc is
110 // now responding so we can safely wait for response without deadlocking
111 // TODO : this is not thread safe.
112 fpmas::api::communication::Status acquire_response_status;
113 server_pack.waitResponse(data_mpi, location, Tag::ACQUIRE_RESPONSE, acquire_response_status);
114
// Receive from exactly the source / tag reported by the status above.
115 return data_mpi.recv(acquire_response_status.source, acquire_response_status.tag);
116 }
117
// Sends `updated_data` for node `id` to process `location`.
// The id and the data are bundled into a single DataUpdatePack<T>, sent with
// the current epoch OR-ed with Tag::RELEASE_ACQUIRE; the function then calls
// waitSendRequest() on the send request. No response is awaited afterwards.
//
// NOTE(review): `req` is used below, but its declaration (listing line 123)
// is absent from this extract.
118 template<typename T>
119 void MutexClient<T>::releaseAcquire(DistributedId id, const T& updated_data, int location) {
120 FPMAS_LOGV(this->comm.getRank(), "MUTEX_CLIENT", "releasing acquired node %s from %i", FPMAS_C_STR(id), location);
// Single message carrying both the node id and its new data.
121 DataUpdatePack<T> update {id, updated_data};
122
124 data_update_mpi.Issend(update, location, server_pack.getEpoch() | Tag::RELEASE_ACQUIRE, req);
125
126 server_pack.waitSendRequest(req);
127 }
128
// Requests a lock on node `id` from process `location` and returns once the
// response has been received.
//
// Sequence, as written below:
//   1. Issend `id` to `location`, tagged with the current epoch OR-ed with
//      Tag::LOCK.
//   2. waitSendRequest(req).
//   3. waitVoidResponse() for a Tag::LOCK_RESPONSE coming from `location`.
//   4. comm.recv() on the source / tag of the resulting status; its result is
//      not used here.
//
// NOTE(review): `req` is used below, but its declaration (listing line 133)
// is absent from this extract.
129 template<typename T>
130 void MutexClient<T>::lock(DistributedId id, int location) {
131 FPMAS_LOGD(this->comm.getRank(), "MUTEX_CLIENT", "locking node %s from %i", FPMAS_C_STR(id), location);
132
134 this->id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::LOCK, req);
135
136 server_pack.waitSendRequest(req);
137
138 // The request has been received : it is assumed that the receiving proc is
139 // now responding so we can safely wait for response without deadlocking
140 // TODO : this is not thread safe.
141 fpmas::api::communication::Status lock_response_status;
142 server_pack.waitVoidResponse(comm, location, Tag::LOCK_RESPONSE, lock_response_status);
143
// Receive the response message itself; nothing is read from it.
144 comm.recv(lock_response_status.source, lock_response_status.tag);
145 }
146
// Sends an unlock request for node `id` to process `location`.
// The id is sent with the current epoch OR-ed with Tag::UNLOCK, then the
// function calls waitSendRequest() on the send request. No response is
// awaited afterwards.
//
// NOTE(review): `req` is used below, but its declaration (listing line 151)
// is absent from this extract.
147 template<typename T>
148 void MutexClient<T>::unlock(DistributedId id, int location) {
149 FPMAS_LOGV(this->comm.getRank(), "MUTEX_CLIENT", "unlocking node %s from %i", FPMAS_C_STR(id), location);
150
152 this->id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::UNLOCK, req);
153
154 server_pack.waitSendRequest(req);
155 }
156
// Shared-lock path (the log message below reads "share locking node").
// Same request / response sequence as lock(), with different tags:
//   1. Issend `id` to `location`, tagged with the current epoch OR-ed with
//      Tag::LOCK_SHARED.
//   2. waitSendRequest(req).
//   3. waitVoidResponse() for a Tag::LOCK_SHARED_RESPONSE from `location`.
//   4. comm.recv() on the source / tag of the resulting status; its result is
//      not used here.
//
// NOTE(review): the signature (listing line 158) and the declaration of `req`
// (listing line 161) are absent from this extract.
157 template<typename T>
159 FPMAS_LOGD(this->comm.getRank(), "MUTEX_CLIENT", "share locking node %s from %i", FPMAS_C_STR(id), location);
160
162 this->id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::LOCK_SHARED, req);
163
164 server_pack.waitSendRequest(req);
165
166 // The request has been received : it is assumed that the receiving proc is
167 // now responding so we can safely wait for response without deadlocking
// Despite its name, this status holds the LOCK_SHARED_RESPONSE, not a
// LOCK_RESPONSE.
168 fpmas::api::communication::Status lock_response_status;
169 server_pack.waitVoidResponse(comm, location, Tag::LOCK_SHARED_RESPONSE, lock_response_status);
170
// Receive the response message itself; nothing is read from it.
171 comm.recv(lock_response_status.source, lock_response_status.tag);
172 }
173
// Shared-unlock path (the log message below reads "share unlocking node").
// Sends `id` to `location`, tagged with the current epoch OR-ed with
// Tag::UNLOCK_SHARED, then calls waitSendRequest() on the send request.
// No response is awaited afterwards.
//
// NOTE(review): the signature (listing line 175) and the declaration of `req`
// (listing line 178) are absent from this extract.
174 template<typename T>
176 FPMAS_LOGV(this->comm.getRank(), "MUTEX_CLIENT", "share unlocking node %s from %i", FPMAS_C_STR(id), location);
177
179 this->id_mpi.Issend(id, location, server_pack.getEpoch() | Tag::UNLOCK_SHARED, req);
180
181 server_pack.waitSendRequest(req);
182 }
183}}}
184#endif
Definition: communication.h:251
Definition: communication.h:637
Definition: distributed_id.h:89
Definition: mutex_client.h:22
T acquire(DistributedId, int location) override
Definition: mutex_client.h:101
MutexClient(MpiCommunicator &comm, IdMpi &id_mpi, DataMpi &data_mpi, DataUpdateMpi &data_update_mpi, ServerPackBase &server_pack)
Definition: mutex_client.h:49
void releaseRead(DistributedId, int location) override
Definition: mutex_client.h:91
T read(DistributedId, int location) override
Definition: mutex_client.h:71
void lockShared(DistributedId, int location) override
Definition: mutex_client.h:158
void releaseAcquire(DistributedId, const T &updated_data, int location) override
Definition: mutex_client.h:119
void unlock(DistributedId, int location) override
Definition: mutex_client.h:148
void lock(DistributedId, int location) override
Definition: mutex_client.h:130
void unlockShared(DistributedId, int location) override
Definition: mutex_client.h:175
Definition: server_pack.h:39
Definition: hard_sync_mode.h:24
Definition: client_server.h:73
Definition: client_server.h:208
#define FPMAS_C_STR(arg)
Definition: macros.h:24
Epoch
Definition: enums.h:15
Tag
Definition: enums.h:23
Definition: fpmas.cpp:3
Definition: communication.h:162
Definition: communication.h:215
int tag
Definition: communication.h:236
int source
Definition: communication.h:232
Definition: data_update_pack.h:16
Definition: client_server.h:15