fpmas 1.6
server_pack.h
Go to the documentation of this file.
1#ifndef FPMAS_SERVER_PACK_H
2#define FPMAS_SERVER_PACK_H
3
10#include "fpmas/utils/log.h"
11
12namespace fpmas { namespace synchro { namespace hard {
13
17 class VoidServer : public api::Server {
18 public:
19 void setEpoch(api::Epoch) override {};
20 api::Epoch getEpoch() const override {
21 return {};
22 };
23 void handleIncomingRequests() override {};
24 };
25
39 class ServerPackBase : public api::Server {
40 typedef api::Epoch Epoch;
41
42 private:
43 std::vector<fpmas::api::communication::Request> pending_requests;
44
46 api::TerminationAlgorithm& termination;
47 api::Server& mutex_server;
48 api::Server& link_server;
49 Epoch epoch;
50
51 public:
64 api::TerminationAlgorithm& termination,
65 api::Server& mutex_server,
66 api::Server& link_server)
67 : comm(comm), termination(termination), mutex_server(mutex_server), link_server(link_server) {
68 setEpoch(Epoch::EVEN);
69 }
70
75 return mutex_server;
76 }
77
82 return link_server;
83 }
84
89 void handleIncomingRequests() override {
90 mutex_server.handleIncomingRequests();
91 link_server.handleIncomingRequests();
92 }
93
100 void setEpoch(Epoch epoch) override {
101 this->epoch = epoch;
102 mutex_server.setEpoch(epoch);
103 link_server.setEpoch(epoch);
104 }
105
112 Epoch getEpoch() const override {
113 return epoch;
114 }
115
119 void terminate() {
120 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "wait all...", "");
121 // Applies the termination algorithm: keeps handling requests
122 termination.terminate(*this);
123
124 // Completes Isend responses performed while handling requests
125 // since the last terminate() call.
126 comm.waitAll(pending_requests);
127 pending_requests.clear();
128 }
129
148 std::vector<fpmas::api::communication::Request>& pendingRequests() {
149 return pending_requests;
150 }
151
164 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "wait for send...", "");
165 bool sent = comm.test(req);
166
167 while(!sent) {
169 sent = comm.test(req);
170 }
171 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "Request sent.", "");
172 }
173
195 template<typename T>
198 int source, api::Tag tag,
200 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "wait for response...", "");
201 bool response_available = mpi.Iprobe(source, getEpoch() | tag, status);
202
203 while(!response_available) {
205 response_available = mpi.Iprobe(source, getEpoch() | tag, status);
206 }
207 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "Response available.", "");
208 }
209
233 int source, api::Tag tag,
235 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "wait for response...", "");
236 bool response_available = comm.Iprobe(
238 source, getEpoch() | tag, status);
239
240 while(!response_available) {
242 response_available = comm.Iprobe(
244 source, getEpoch() | tag, status);
245 }
246 FPMAS_LOGV(comm.getRank(), "SERVER_PACK", "Response available.", "");
247 }
248
249 };
250
251
256 template<typename MutexServer, typename LinkServer>
257 class ServerPack : public ServerPackBase {
258 private:
259 MutexServer& mutex_server;
260 LinkServer& link_server;
261
262 public:
275 api::TerminationAlgorithm& termination,
276 MutexServer& mutex_server, LinkServer& link_server) :
277 ServerPackBase(comm, termination, mutex_server, link_server),
278 mutex_server(mutex_server), link_server(link_server) {
279 }
280
282 return mutex_server;
283 }
284
285 LinkServer& linkServer() override {
286 return link_server;
287 }
288 };
289}}}
290#endif
Definition: communication.h:251
virtual bool test(Request &request)=0
virtual void waitAll(std::vector< Request > &requests)=0
static MPI_Datatype IGNORE_TYPE
Definition: communication.h:259
virtual bool Iprobe(MPI_Datatype type, int source, int tag, Status &status)=0
Definition: communication.h:637
virtual bool Iprobe(int source, int tag, Status &status)=0
Definition: mutex_server.h:26
Definition: server_pack.h:39
ServerPackBase(fpmas::api::communication::MpiCommunicator &comm, api::TerminationAlgorithm &termination, api::Server &mutex_server, api::Server &link_server)
Definition: server_pack.h:62
Epoch getEpoch() const override
Definition: server_pack.h:112
void terminate()
Definition: server_pack.h:119
void setEpoch(Epoch epoch) override
Definition: server_pack.h:100
void waitVoidResponse(fpmas::api::communication::MpiCommunicator &comm, int source, api::Tag tag, fpmas::api::communication::Status &status)
Definition: server_pack.h:231
virtual api::Server & linkServer()
Definition: server_pack.h:81
std::vector< fpmas::api::communication::Request > & pendingRequests()
Definition: server_pack.h:148
void waitResponse(fpmas::api::communication::TypedMpi< T > &mpi, int source, api::Tag tag, fpmas::api::communication::Status &status)
Definition: server_pack.h:196
virtual api::Server & mutexServer()
Definition: server_pack.h:74
void handleIncomingRequests() override
Definition: server_pack.h:89
void waitSendRequest(fpmas::api::communication::Request &req)
Definition: server_pack.h:163
Definition: server_pack.h:257
ServerPack(fpmas::api::communication::MpiCommunicator &comm, api::TerminationAlgorithm &termination, MutexServer &mutex_server, LinkServer &link_server)
Definition: server_pack.h:273
LinkServer & linkServer() override
Definition: server_pack.h:285
MutexServer & mutexServer() override
Definition: server_pack.h:281
Definition: server_pack.h:17
api::Epoch getEpoch() const override
Definition: server_pack.h:20
void handleIncomingRequests() override
Definition: server_pack.h:23
void setEpoch(api::Epoch) override
Definition: server_pack.h:19
Definition: client_server.h:161
virtual void handleIncomingRequests()=0
virtual void setEpoch(Epoch epoch)=0
virtual void terminate(Server &server)=0
Epoch
Definition: enums.h:15
Tag
Definition: enums.h:23
Definition: fpmas.cpp:3
Definition: communication.h:162
Definition: communication.h:215