fpmas 1.6
hard_sync_linker.h
Go to the documentation of this file.
1#ifndef FPMAS_HARD_SYNC_LINKER_H
2#define FPMAS_HARD_SYNC_LINKER_H
3
8#include <set>
9
10#include "fpmas/utils/macros.h"
11
15#include "server_pack.h"
16#include "termination.h"
17
18namespace fpmas { namespace synchro { namespace hard {
19 using api::Tag;
20 using api::Epoch;
22
23 namespace hard_link {
27 template<typename T>
28 class LinkServer : public api::LinkServer {
29 public:
42 private:
43 Epoch epoch = Epoch::EVEN;
44
47 IdMpi& id_mpi;
48 EdgeMpi& edge_mpi;
49 std::set<DistributedId> locked_unlink_edges;
50 std::set<DistributedId> locked_remove_nodes;
51
52 public:
64 IdMpi& id_mpi, EdgeMpi& edge_mpi) :
65 comm(comm), graph(graph),
66 id_mpi(id_mpi), edge_mpi(edge_mpi) {}
67
68 Epoch getEpoch() const override {return epoch;}
69 void setEpoch(api::Epoch epoch) override {this->epoch = epoch;}
70
71 void handleIncomingRequests() override;
72
73
74 void lockUnlink(DistributedId edge_id) override {
75 locked_unlink_edges.insert(edge_id);
76 }
77
78 bool isLockedUnlink(DistributedId edge_id) override {
79 return locked_unlink_edges.count(edge_id) > 0;
80 }
81
82 void unlockUnlink(DistributedId edge_id) override {
83 locked_unlink_edges.erase(edge_id);
84 }
85
86 void lockRemoveNode(DistributedId node_id) override {
87 locked_remove_nodes.insert(node_id);
88 }
89
90 bool isLockedRemoveNode(DistributedId node_id) override {
91 return locked_remove_nodes.count(node_id);
92 }
93
94 void unlockRemoveNode(DistributedId node_id) override {
95 locked_remove_nodes.erase(node_id);
96 }
97 };
98
99 template<typename T>
102 // Check read request
103 if(edge_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::LINK, status)) {
104 EdgeApi* edge = edge_mpi.recv(status.source, status.tag);
105 FPMAS_LOGD(this->comm.getRank(), "LINK_SERVER",
106 "receive link request from %i", status.source);
107 graph.importEdge(edge);
108 }
109 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLINK, status)) {
110 DistributedId unlink_id = id_mpi.recv(status.source, status.tag);
111 FPMAS_LOGD(this->comm.getRank(), "LINK_SERVER",
112 "receive unlink request %s from %i",
113 FPMAS_C_STR(unlink_id), status.source);
114 if(!isLockedUnlink(unlink_id)) {
115 // The edge is not being unlinked by the local process
116 const auto& edges = graph.getEdges();
117 auto it = edges.find(unlink_id);
118 if(it != edges.end()) {
119 // The edge has not been unlinked by an other UNLINK
120 // operation
121 auto* edge = it->second;
122 // Source or target node is not being removed by the local
123 // process. In this case, the local process is responsible
124 // for all the required unlink operations, so the incoming
125 // request is ignored.
126 if(!(isLockedRemoveNode(edge->getSourceNode()->getId())
127 || isLockedRemoveNode(edge->getTargetNode()->getId()))) {
128 graph.erase(edge);
129 }
130 }
131 }
132 }
133 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::REMOVE_NODE, status)) {
134 DistributedId node_id = id_mpi.recv(status.source, status.tag);
135 FPMAS_LOGD(this->comm.getRank(), "LINK_SERVER",
136 "receive remove node request %s from %i",
137 FPMAS_C_STR(node_id), status.source);
138
139 // Initiates a removeNode operation from the local process,
140 // that will trigger all required UNLINK operations
141 graph.removeNode(graph.getNode(node_id));
142 }
143 }
144
148 template<typename T>
149 class LinkClient : public api::LinkClient<T> {
150 public:
167
168 private:
170 IdMpi& id_mpi;
171 EdgeMpi& edge_mpi;
172 ServerPackBase& server_pack;
173
174 public:
188 IdMpi& id_mpi, EdgeMpi& edge_mpi,
189 ServerPackBase& server_pack) :
190 comm(comm), id_mpi(id_mpi), edge_mpi(edge_mpi),
191 server_pack(server_pack) {}
192
193 void link(const EdgeApi*) override;
194 void unlink(const EdgeApi*) override;
195 void removeNode(const NodeApi*) override;
196 };
197
198 template<typename T>
199 void LinkClient<T>::link(const EdgeApi* edge) {
200 bool distant_src =
202 bool distant_tgt =
204
205 if(edge->getSourceNode()->location() != edge->getTargetNode()->location()) {
206 // The edge is DISTANT, so at least of the two nodes is
207 // DISTANT. In this case, if the two nodes are
208 // DISTANT, they don't have the same location, so two
209 // requests must be performed.
212 // Simultaneously initiate the two requests, that are potentially
213 // made to different procs
214 if(distant_src) {
215 edge_mpi.Issend(
216 const_cast<EdgeApi*>(edge), edge->getSourceNode()->location(),
217 server_pack.getEpoch() | Tag::LINK, req_src
218 );
219 }
220 if(distant_tgt) {
221 edge_mpi.Issend(
222 const_cast<EdgeApi*>(edge), edge->getTargetNode()->location(),
223 server_pack.getEpoch() | Tag::LINK, req_tgt
224 );
225 }
226 // Sequentially waits for each request : if req_tgt completes
227 // before req_src, the second waitSendRequest will immediatly return
228 // so there is no performance issue.
229 if(distant_src) {
230 server_pack.waitSendRequest(req_src);
231 }
232 if(distant_tgt) {
233 server_pack.waitSendRequest(req_tgt);
234 }
235 } else {
236 // The edge is DISTANT, and its source and target nodes
237 // locations are the same, so the two nodes are necessarily
238 // located on the same DISTANT proc : only need to perform
239 // one request
241 edge_mpi.Issend(
242 const_cast<EdgeApi*>(edge), edge->getSourceNode()->location(),
243 server_pack.getEpoch() | Tag::LINK, req
244 );
245 server_pack.waitSendRequest(req);
246 }
247 }
248
249 template<typename T>
250 void LinkClient<T>::unlink(const EdgeApi* edge) {
251
252 bool distant_src
254 bool distant_tgt
256
259 // Simultaneously initiate the two requests, that are potentially
260 // made to different procs
261 if(distant_src) {
262 this->id_mpi.Issend(
263 edge->getId(), edge->getSourceNode()->location(),
264 server_pack.getEpoch() | Tag::UNLINK, req_src
265 );
266 }
267 if(distant_tgt) {
268 this->id_mpi.Issend(
269 edge->getId(), edge->getTargetNode()->location(),
270 server_pack.getEpoch() | Tag::UNLINK, req_tgt
271 );
272 }
273 // Sequentially waits for each request : if req_tgt completes
274 // before req_src, the second waitSendRequest will immediatly return
275 // so there is no performance issue.
276 if(distant_src) {
277 server_pack.waitSendRequest(req_src);
278 }
279 if(distant_tgt) {
280 server_pack.waitSendRequest(req_tgt);
281 }
282 }
283
284 template<typename T>
287 this->id_mpi.Issend(
288 node->getId(), node->location(),
289 server_pack.getEpoch() | Tag::REMOVE_NODE, req
290 );
291
292 server_pack.waitSendRequest(req);
293 }
294
295
309 template<typename T>
311 public:
320
321 private:
322 std::vector<EdgeApi*> ghost_edges;
324 api::LinkClient<T>& link_client;
326 std::vector<fpmas::api::graph::DistributedNode<T>*> nodes_to_remove;
327
328 public:
331 nodes_to_remove.push_back(node);
332 }
333
344 api::LinkClient<T>& link_client,
346 graph(graph), link_client(link_client), server_pack(server_pack) {
347 }
348
361 void link(EdgeApi* edge) override {
362 if(edge->state() == LocationState::DISTANT) {
363 link_client.link(edge);
364
366 && edge->getTargetNode()->state() == LocationState::DISTANT) {
367 ghost_edges.push_back(const_cast<EdgeApi*>(edge));
368 }
369 }
370 };
371
381 void unlink(EdgeApi* edge) override {
382 // Prevents other processes to unlink the edge while the local
383 // process is unlinking it. In this case, incoming unlink requests
384 // have no effect. (see LinkServer::handleIncomingRequests())
385 server_pack.linkServer().lockUnlink(edge->getId());
386
387 if(edge->state() == LocationState::DISTANT) {
388 link_client.unlink(edge);
389 }
390
391 // Unlocks the unlink operation. The edge will actually be erased
392 // from the graph upon return, in the DistributedGraph::unlink()
393 // method.
394 server_pack.linkServer().unlockUnlink(edge->getId());
395 };
396
412 void removeNode(NodeApi* node) override {
413 if(node->state() == LocationState::DISTANT) {
414 link_client.removeNode(node);
415 } else {
416 server_pack.linkServer().lockRemoveNode(node->getId());
417 for(auto edge : node->getOutgoingEdges())
418 graph.unlink(edge);
419 for(auto edge : node->getIncomingEdges())
420 graph.unlink(edge);
421 server_pack.linkServer().unlockRemoveNode(node->getId());
422 }
424 }
425
449 void synchronize() override {
450 FPMAS_LOGI(graph.getMpiCommunicator().getRank(),
451 "HARD_SYNC_LINKER", "Synchronizing sync linker...", "");
452 for(auto edge : ghost_edges)
453 graph.erase(edge);
454 ghost_edges.clear();
455
456 server_pack.terminate();
457
458 for(auto node : nodes_to_remove) {
459 graph.erase(node);
460 }
461 nodes_to_remove.clear();
462
463 FPMAS_LOGI(graph.getMpiCommunicator().getRank(),
464 "HARD_SYNC_LINKER", "Synchronized.", "");
465 };
466 };
467 }
468
469 namespace ghost_link {
479 template<typename T>
481 private:
483 TerminationAlgorithm termination;
484 ServerPackBase& server_pack;
485
486 public:
500 ServerPackBase& server_pack) :
501 ghost::GhostSyncLinkerBase<T>(edge_mpi, id_mpi, graph),
502 color_mpi(graph.getMpiCommunicator()),
503 termination(graph.getMpiCommunicator(), color_mpi),
504 server_pack(server_pack) {
505 }
506
507
508 void synchronize() override {
509 /*
510 * As specified by the
511 * fpmas::api::graph::DistributedGraph::synchronize()
512 * method, when a graph synchronization is performed,
513 * SyncLinker::synchronize() is called before
514 * DataSync::synchronize().
515 *
516 * But, in the context of this component, DataSync
517 * operations are performed on the fly using point-to-point
518 * communications, so it is necessary to finish those
519 * operations before initializing the collective
520 * synchronize_links() communications.
521 */
522 termination.terminate(server_pack);
523 this->synchronize_links();
524 }
525 };
526
527 }
528}}}
529#endif
Definition: communication.h:251
Definition: communication.h:637
Definition: distributed_edge.h:91
virtual LocationState state() const =0
Definition: distributed_graph.h:169
virtual api::communication::MpiCommunicator & getMpiCommunicator()=0
virtual void unlink(DistributedEdge< T > *edge)=0
Definition: distributed_id.h:89
Definition: distributed_node.h:28
virtual LocationState state() const =0
virtual int location() const =0
virtual IdType getId() const =0
virtual NodeType * getSourceNode() const =0
virtual NodeType * getTargetNode() const =0
virtual void erase(NodeType *node)=0
virtual const std::vector< EdgeType * > getIncomingEdges() const =0
virtual IdType getId() const =0
virtual const std::vector< EdgeType * > getOutgoingEdges() const =0
Definition: ptr_wrapper.h:21
Definition: ghost_mode.h:215
void synchronize_links()
Definition: ghost_mode.h:291
GhostSyncLinkerBase(EdgeMpi &edge_mpi, IdMpi &id_mpi, api::graph::DistributedGraph< T > &graph)
Definition: ghost_mode.h:263
Definition: server_pack.h:39
void terminate()
Definition: server_pack.h:119
Definition: server_pack.h:257
LinkServer & linkServer() override
Definition: server_pack.h:285
Definition: termination.h:26
void terminate(api::Server &server) override
Definition: termination.cpp:15
Definition: hard_sync_mode.h:59
Definition: client_server.h:294
virtual void unlink(const fpmas::api::graph::DistributedEdge< T > *edge)=0
virtual void removeNode(const fpmas::api::graph::DistributedNode< T > *node)=0
virtual void link(const fpmas::api::graph::DistributedEdge< T > *edge)=0
Definition: client_server.h:379
Definition: client_server.h:208
#define FPMAS_C_STR(arg)
Definition: macros.h:24
LocationState
Definition: location_state.h:15
@ DISTANT
Definition: location_state.h:28
Epoch
Definition: enums.h:15
Tag
Definition: enums.h:23
Definition: fpmas.cpp:3
Definition: communication.h:162
Definition: communication.h:215
int tag
Definition: communication.h:236
int source
Definition: communication.h:232