1#ifndef FPMAS_HARD_SYNC_LINKER_H
2#define FPMAS_HARD_SYNC_LINKER_H
18namespace fpmas {
namespace synchro {
namespace hard {
43 Epoch epoch = Epoch::EVEN;
49 std::set<DistributedId> locked_unlink_edges;
50 std::set<DistributedId> locked_remove_nodes;
65 comm(comm), graph(graph),
66 id_mpi(id_mpi), edge_mpi(edge_mpi) {}
75 locked_unlink_edges.insert(edge_id);
79 return locked_unlink_edges.count(edge_id) > 0;
83 locked_unlink_edges.erase(edge_id);
87 locked_remove_nodes.insert(node_id);
91 return locked_remove_nodes.count(node_id);
95 locked_remove_nodes.erase(node_id);
103 if(edge_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::LINK, status)) {
105 FPMAS_LOGD(this->comm.getRank(),
"LINK_SERVER",
106 "receive link request from %i", status.
source);
107 graph.importEdge(edge);
109 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::UNLINK, status)) {
111 FPMAS_LOGD(this->comm.getRank(),
"LINK_SERVER",
112 "receive unlink request %s from %i",
114 if(!isLockedUnlink(unlink_id)) {
116 const auto& edges = graph.getEdges();
117 auto it = edges.find(unlink_id);
118 if(it != edges.end()) {
121 auto* edge = it->second;
126 if(!(isLockedRemoveNode(edge->getSourceNode()->getId())
127 || isLockedRemoveNode(edge->getTargetNode()->getId()))) {
133 if(id_mpi.Iprobe(MPI_ANY_SOURCE, epoch | Tag::REMOVE_NODE, status)) {
135 FPMAS_LOGD(this->comm.getRank(),
"LINK_SERVER",
136 "receive remove node request %s from %i",
141 graph.removeNode(graph.getNode(node_id));
190 comm(comm), id_mpi(id_mpi), edge_mpi(edge_mpi),
191 server_pack(server_pack) {}
217 server_pack.getEpoch() | Tag::LINK, req_src
223 server_pack.getEpoch() | Tag::LINK, req_tgt
230 server_pack.waitSendRequest(req_src);
233 server_pack.waitSendRequest(req_tgt);
243 server_pack.getEpoch() | Tag::LINK, req
245 server_pack.waitSendRequest(req);
264 server_pack.getEpoch() | Tag::UNLINK, req_src
270 server_pack.getEpoch() | Tag::UNLINK, req_tgt
277 server_pack.waitSendRequest(req_src);
280 server_pack.waitSendRequest(req_tgt);
289 server_pack.getEpoch() | Tag::REMOVE_NODE, req
292 server_pack.waitSendRequest(req);
322 std::vector<EdgeApi*> ghost_edges;
326 std::vector<fpmas::api::graph::DistributedNode<T>*> nodes_to_remove;
331 nodes_to_remove.push_back(node);
346 graph(graph), link_client(link_client), server_pack(server_pack) {
363 link_client.
link(edge);
367 ghost_edges.push_back(
const_cast<EdgeApi*
>(edge));
451 "HARD_SYNC_LINKER",
"Synchronizing sync linker...",
"");
452 for(
auto edge : ghost_edges)
458 for(
auto node : nodes_to_remove) {
461 nodes_to_remove.clear();
464 "HARD_SYNC_LINKER",
"Synchronized.",
"");
469 namespace ghost_link {
502 color_mpi(graph.getMpiCommunicator()),
503 termination(graph.getMpiCommunicator(), color_mpi),
504 server_pack(server_pack) {
Definition: communication.h:251
virtual int getRank() const =0
Definition: communication.h:637
Definition: distributed_edge.h:91
virtual LocationState state() const =0
Definition: distributed_graph.h:169
virtual api::communication::MpiCommunicator & getMpiCommunicator()=0
virtual void unlink(DistributedEdge< T > *edge)=0
Definition: distributed_id.h:89
Definition: distributed_node.h:28
virtual LocationState state() const =0
virtual int location() const =0
virtual IdType getId() const =0
virtual NodeType * getSourceNode() const =0
virtual NodeType * getTargetNode() const =0
virtual void erase(NodeType *node)=0
virtual const std::vector< EdgeType * > getIncomingEdges() const =0
virtual IdType getId() const =0
virtual const std::vector< EdgeType * > getOutgoingEdges() const =0
Definition: ptr_wrapper.h:21
Definition: ghost_mode.h:215
void synchronize_links()
Definition: ghost_mode.h:291
GhostSyncLinkerBase(EdgeMpi &edge_mpi, IdMpi &id_mpi, api::graph::DistributedGraph< T > &graph)
Definition: ghost_mode.h:263
Definition: server_pack.h:39
void terminate()
Definition: server_pack.h:119
Definition: server_pack.h:257
LinkServer & linkServer() override
Definition: server_pack.h:285
Definition: termination.h:26
void terminate(api::Server &server) override
Definition: termination.cpp:15
Definition: hard_sync_mode.h:59
Definition: client_server.h:294
virtual void unlink(const fpmas::api::graph::DistributedEdge< T > *edge)=0
virtual void removeNode(const fpmas::api::graph::DistributedNode< T > *node)=0
virtual void link(const fpmas::api::graph::DistributedEdge< T > *edge)=0
Definition: client_server.h:379
Definition: client_server.h:208
Definition: hard_sync_linker.h:480
void synchronize() override
Definition: hard_sync_linker.h:508
HardSyncLinker(communication::TypedMpi< graph::EdgePtrWrapper< T > > &edge_mpi, communication::TypedMpi< DistributedId > &id_mpi, fpmas::api::graph::DistributedGraph< T > &graph, ServerPackBase &server_pack)
Definition: hard_sync_linker.h:496
Definition: hard_sync_linker.h:310
fpmas::api::graph::DistributedNode< T > NodeApi
Definition: hard_sync_linker.h:319
void unlink(EdgeApi *edge) override
Definition: hard_sync_linker.h:381
fpmas::api::graph::DistributedEdge< T > EdgeApi
Definition: hard_sync_linker.h:315
void removeNode(NodeApi *node) override
Definition: hard_sync_linker.h:412
void link(EdgeApi *edge) override
Definition: hard_sync_linker.h:361
void registerNodeToRemove(fpmas::api::graph::DistributedNode< T > *node) override
Definition: hard_sync_linker.h:329
void synchronize() override
Definition: hard_sync_linker.h:449
HardSyncLinker(fpmas::api::graph::DistributedGraph< T > &graph, api::LinkClient< T > &link_client, ServerPack< api::MutexServer< T >, api::LinkServer > &server_pack)
Definition: hard_sync_linker.h:342
Definition: hard_sync_linker.h:149
void removeNode(const NodeApi *) override
Definition: hard_sync_linker.h:285
fpmas::api::communication::TypedMpi< graph::EdgePtrWrapper< T > > EdgeMpi
Definition: hard_sync_linker.h:162
fpmas::api::graph::DistributedNode< T > NodeApi
Definition: hard_sync_linker.h:158
void link(const EdgeApi *) override
Definition: hard_sync_linker.h:199
void unlink(const EdgeApi *) override
Definition: hard_sync_linker.h:250
LinkClient(fpmas::api::communication::MpiCommunicator &comm, IdMpi &id_mpi, EdgeMpi &edge_mpi, ServerPackBase &server_pack)
Definition: hard_sync_linker.h:186
fpmas::api::communication::TypedMpi< DistributedId > IdMpi
Definition: hard_sync_linker.h:166
fpmas::api::graph::DistributedEdge< T > EdgeApi
Definition: hard_sync_linker.h:154
Definition: hard_sync_linker.h:28
LinkServer(fpmas::api::communication::MpiCommunicator &comm, fpmas::api::graph::DistributedGraph< T > &graph, IdMpi &id_mpi, EdgeMpi &edge_mpi)
Definition: hard_sync_linker.h:61
void handleIncomingRequests() override
Definition: hard_sync_linker.h:100
bool isLockedRemoveNode(DistributedId node_id) override
Definition: hard_sync_linker.h:90
fpmas::api::graph::DistributedEdge< T > EdgeApi
Definition: hard_sync_linker.h:33
void unlockUnlink(DistributedId edge_id) override
Definition: hard_sync_linker.h:82
void setEpoch(api::Epoch epoch) override
Definition: hard_sync_linker.h:69
bool isLockedUnlink(DistributedId edge_id) override
Definition: hard_sync_linker.h:78
void lockUnlink(DistributedId edge_id) override
Definition: hard_sync_linker.h:74
fpmas::api::communication::TypedMpi< graph::EdgePtrWrapper< T > > EdgeMpi
Definition: hard_sync_linker.h:37
void lockRemoveNode(DistributedId node_id) override
Definition: hard_sync_linker.h:86
void unlockRemoveNode(DistributedId node_id) override
Definition: hard_sync_linker.h:94
fpmas::api::communication::TypedMpi< DistributedId > IdMpi
Definition: hard_sync_linker.h:41
Epoch getEpoch() const override
Definition: hard_sync_linker.h:68
#define FPMAS_C_STR(arg)
Definition: macros.h:24
LocationState
Definition: location_state.h:15
@ DISTANT
Definition: location_state.h:28
Epoch
Definition: enums.h:15
Tag
Definition: enums.h:23
Definition: communication.h:162
Definition: communication.h:215
int tag
Definition: communication.h:236
int source
Definition: communication.h:232