fpmas 1.6
hard_sync_mutex.h
Go to the documentation of this file.
1#ifndef FPMAS_HARD_SYNC_MUTEX_H
2#define FPMAS_HARD_SYNC_MUTEX_H
3
8#include "mutex_client.h"
9#include "mutex_server.h"
10#include "../synchro.h"
11
12namespace fpmas { namespace synchro { namespace hard {
13
16
24 template<typename T>
26 private:
30
32 bool _locked = false;
33 int _locked_shared = 0;
34
35 MutexClient& mutex_client;
36 MutexServer& mutex_server;
37
38 std::queue<Request> lock_requests;
39 std::queue<Request> lock_shared_requests;
40
41 void _lock() override {_locked=true;}
42 void _lockShared() override {_locked_shared++;}
43 void _unlock() override {_locked=false;}
44 void _unlockShared() override {_locked_shared--;}
45 public:
57 MutexClient& mutex_client,
58 MutexServer& mutex_server)
59 : node(node), mutex_client(mutex_client), mutex_server(mutex_server) {}
60
61 void pushRequest(Request request) override;
62 std::queue<Request> requestsToProcess() override;
63
67 T& data() override {return node->data();}
71 const T& data() const override {return node->data();}
72
73 const T& read() override;
74 void releaseRead() override;
75
76 T& acquire() override;
77 void releaseAcquire() override;
78
79 void lock() override;
80 void unlock() override;
84 bool locked() const override {return _locked;}
85
86 void lockShared() override;
87 void unlockShared() override;
91 int sharedLockCount() const override {return _locked_shared;}
92
93 void synchronize() override {};
94 };
95
110 template<typename T>
112 if(node->state() == LocationState::LOCAL) {
113 if(_locked) {
114 Request req = Request(node->getId(), Request::LOCAL, MutexRequestType::READ);
115 pushRequest(req);
116 mutex_server.wait(req);
117 }
118 _locked_shared++;
119 return node->data();
120 }
122 node->data(),
123 mutex_client.read(node->getId(), node->location())
124 );
125 return node->data();
126 }
127
142 template<typename T>
144 if(node->state() == LocationState::LOCAL) {
145 _locked_shared--;
146 if(_locked_shared==0) {
147 mutex_server.notify(node->getId());
148 }
149 return;
150 }
151 mutex_client.releaseRead(node->getId(), node->location());
152 }
153
168 template<typename T>
170 if(node->state()==LocationState::LOCAL) {
171 if(_locked || _locked_shared > 0) {
172 Request req = Request(node->getId(), Request::LOCAL, MutexRequestType::ACQUIRE);
173 pushRequest(req);
174 mutex_server.wait(req);
175 }
176 this->_locked = true;
177 return node->data();
178 }
179
181 node->data(),
182 mutex_client.acquire(node->getId(), node->location())
183 );
184 return node->data();
185 }
186
201 template<typename T>
203 if(node->state()==LocationState::LOCAL) {
204 this->_locked = false;
205 mutex_server.notify(node->getId());
206 return;
207 }
208 mutex_client.releaseAcquire(node->getId(), node->data(), node->location());
209 }
210
225 template<typename T>
227 if(node->state()==LocationState::LOCAL) {
228 if(_locked || _locked_shared > 0) {
229 Request req = Request(node->getId(), Request::LOCAL, MutexRequestType::LOCK);
230 pushRequest(req);
231 mutex_server.wait(req);
232 }
233 this->_locked = true;
234 return;
235 }
236 mutex_client.lock(node->getId(), node->location());
237 }
238
252 template<typename T>
254 if(node->state()==LocationState::LOCAL) {
255 // TODO : this is NOT thread safe
256 this->_locked = false;
257 mutex_server.notify(node->getId());
258 return;
259 }
260 mutex_client.unlock(node->getId(), node->location());
261 }
262
277 template<typename T>
279 if(node->state()==LocationState::LOCAL) {
280 if(_locked) {
281 Request req = Request(node->getId(), Request::LOCAL, MutexRequestType::LOCK_SHARED);
282 pushRequest(req);
283 mutex_server.wait(req);
284 }
285 _locked_shared++;
286 return;
287 }
288 mutex_client.lockShared(node->getId(), node->location());
289 }
290
305 template<typename T>
307 if(node->state()==LocationState::LOCAL) {
308 _locked_shared--;
309 if(_locked_shared==0) {
310 mutex_server.notify(node->getId());
311 }
312 return;
313 }
314 mutex_client.unlockShared(node->getId(), node->location());
315 }
316
317 template<typename T>
319 switch(request.type) {
320 case MutexRequestType::READ :
321 lock_shared_requests.push(request);
322 break;
323 case MutexRequestType::LOCK :
324 lock_requests.push(request);
325 break;
326 case MutexRequestType::ACQUIRE :
327 lock_requests.push(request);
328 break;
329 case MutexRequestType::LOCK_SHARED:
330 lock_shared_requests.push(request);
331 }
332 }
333
334 // TODO : this must be the job of a ReadersWriters component
335 template<typename T>
336 std::queue<typename HardSyncMutex<T>::Request> HardSyncMutex<T>::requestsToProcess() {
337 std::queue<Request> requests;
338 if(_locked) {
339 return requests;
340 }
341 while(!lock_shared_requests.empty()) {
342 Request lock_shared_request = lock_shared_requests.front();
343 requests.push(lock_shared_request);
344 lock_shared_requests.pop();
345 if(lock_shared_request.source == Request::LOCAL) {
346 // Immediately returns, so that the last request processed
347 // is the local request
348 return requests;
349 }
350 }
351 if(!lock_requests.empty()) {
352 requests.push(lock_requests.front());
353 lock_requests.pop();
354 }
355 return requests;
356 }
357}}}
358#endif
Definition: distributed_node.h:28
Definition: hard_sync_mutex.h:25
T & data() override
Definition: hard_sync_mutex.h:67
HardSyncMutex(fpmas::api::graph::DistributedNode< T > *node, MutexClient &mutex_client, MutexServer &mutex_server)
Definition: hard_sync_mutex.h:55
void lockShared() override
Definition: hard_sync_mutex.h:278
std::queue< Request > requestsToProcess() override
Definition: hard_sync_mutex.h:336
void lock() override
Definition: hard_sync_mutex.h:226
void pushRequest(Request request) override
Definition: hard_sync_mutex.h:318
int sharedLockCount() const override
Definition: hard_sync_mutex.h:91
void releaseRead() override
Definition: hard_sync_mutex.h:143
const T & data() const override
Definition: hard_sync_mutex.h:71
void unlock() override
Definition: hard_sync_mutex.h:253
void synchronize() override
Definition: hard_sync_mutex.h:93
void releaseAcquire() override
Definition: hard_sync_mutex.h:202
void unlockShared() override
Definition: hard_sync_mutex.h:306
const T & read() override
Definition: hard_sync_mutex.h:111
bool locked() const override
Definition: hard_sync_mutex.h:84
T & acquire() override
Definition: hard_sync_mutex.h:169
Definition: hard_sync_mode.h:24
Definition: client_server.h:73
Definition: client_server.h:208
LocationState
Definition: location_state.h:15
@ LOCAL
Definition: location_state.h:21
MutexRequestType
Definition: enums.h:54
Definition: fpmas.cpp:3
static void update(T &local_data, T &&updated_data)
Definition: synchro.h:35
Definition: client_server.h:15
MutexRequestType type
Definition: client_server.h:32
int source
Definition: client_server.h:28