1#ifndef FPMAS_GHOST_MODE_H
2#define FPMAS_GHOST_MODE_H
11#include "../data_update_pack.h"
12#include "../synchro.h"
15namespace fpmas {
namespace synchro {
31 const T&
read()
override {
return this->
data();};
61 std::unordered_map<int, std::vector<DistributedId>> buildRequests();
62 std::unordered_map<int, std::vector<DistributedId>> buildRequests(
66 std::unordered_map<
int, std::vector<DistributedId>> requests
82 : data_mpi(data_mpi), id_mpi(id_mpi), graph(graph) {}
96 void GhostDataSync<T>::_synchronize(
97 std::unordered_map<
int, std::vector<DistributedId>> requests) {
99 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
100 "Synchronizing graph data...",
""
102 requests = id_mpi.migrate(std::move(requests));
104 std::unordered_map<int, std::vector<NodeUpdatePack<T>>> updated_data;
105 updated_data.reserve(graph.getMpiCommunicator().getSize());
106 for(
auto list : requests) {
107 updated_data[list.first].reserve(list.second.size());
108 for(
auto id : list.second) {
110 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
113 auto node = graph.getNode(
id);
114 updated_data[list.first].emplace_back(
115 id, node->data(), node->getWeight()
121 updated_data = data_mpi.migrate(std::move(updated_data));
122 for(
auto list : updated_data) {
123 for(
auto& data : list.second) {
124 auto local_node = graph.getNode(data.id);
127 std::move(data.updated_data)
129 local_node->setWeight(data.updated_weight);
134 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
135 "Graph data synchronized.",
""
140 std::unordered_map<int, std::vector<DistributedId>> GhostDataSync<T>
141 ::buildRequests(std::unordered_set<api::graph::DistributedNode<T>*> nodes) {
142 std::unordered_map<int, std::vector<DistributedId>> requests;
143 requests.reserve(graph.getMpiCommunicator().getSize());
144 for(
auto node : nodes) {
147 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
148 "Request %s from %i",
151 requests[node->location()].emplace_back(node->getId());
161 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
162 "Synchronizing graph data...",
""
165 _synchronize(buildRequests(nodes));
166 for(
auto node : nodes)
167 node->mutex()->synchronize();
173 std::unordered_map<int, std::vector<DistributedId>> requests;
174 requests.reserve(graph.getMpiCommunicator().getSize());
175 for(
auto node : graph.getLocationManager().getDistantNodes()) {
177 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
178 "Request %s from %i",
181 requests[node.second->location()].emplace_back(node.first);
189 graph.getMpiCommunicator().getRank(),
"GHOST_MODE",
190 "Synchronizing graph data...",
"");
192 _synchronize(buildRequests());
193 for(
auto node : graph.getNodes())
194 node.second->mutex()->synchronize();
// Edges registered for linking. Entries are appended by
// `link_buffer.push_back(...)` and can be dropped again through
// `std::remove(...)`; the link synchronization pass iterates over this
// buffer to fill the per-location `link_migration` export map.
238 std::vector<EdgePtr> link_buffer;
// Ids of edges to unlink, grouped by the `location()` of the edge's source
// or target node. Exchanged with `id_mpi.migrate(...)` during link
// synchronization, then cleared.
239 std::unordered_map<int, std::vector<DistributedId>> unlink_migration;
// Ids of nodes to remove, grouped by `node->location()`. Exchanged with
// `id_mpi.migrate(...)` during link synchronization; the received ids are
// resolved with `graph.getNode(...)` and the map is then cleared.
240 std::unordered_map<int, std::vector<DistributedId>> remove_node_buffer;
// Nodes scheduled for removal from the local graph: filled directly with
// node pointers and with the nodes resolved from `remove_node_buffer`.
// Iterated and cleared at the end of link synchronization.
// NOTE(review): the loop body that processes each node is not visible
// here -- presumably it removes the node from the graph; confirm.
241 std::vector<NodeApi*> local_nodes_to_remove;
266 : edge_mpi(edge_mpi), id_mpi(id_mpi), graph(graph) {}
292 FPMAS_LOGI(graph.getMpiCommunicator().getRank(),
293 "GHOST_MODE",
"Synchronizing graph links...",
"");
297 std::vector<EdgePtr> edges_to_clear;
298 std::unordered_map<int, std::vector<EdgePtr>> link_migration;
299 for(
auto edge : link_buffer) {
300 auto src = edge->getSourceNode();
302 link_migration[src->location()].push_back(edge);
304 auto tgt = edge->getTargetNode();
306 link_migration[tgt->location()].push_back(edge);
310 edges_to_clear.push_back(edge);
313 link_migration = edge_mpi.migrate(std::move(link_migration));
315 for(
auto import_list : link_migration) {
316 for (
auto edge : import_list.second) {
317 graph.importEdge(edge);
325 remove_node_buffer = id_mpi.migrate(std::move(remove_node_buffer));
326 for(
auto import_list : remove_node_buffer) {
328 auto* node = graph.getNode(node_id);
329 for(
auto edge : node->getOutgoingEdges())
331 for(
auto edge : node->getIncomingEdges())
339 unlink_migration = id_mpi.migrate(std::move(unlink_migration));
340 for(
auto import_list : unlink_migration) {
342 const auto& edges = graph.getEdges();
343 auto it = edges.find(
id);
344 if(it != edges.end())
345 graph.erase(it->second);
348 unlink_migration.clear();
350 for(
auto edge : edges_to_clear)
353 for(
auto import_list : remove_node_buffer)
354 for(
auto node_id : import_list.second)
355 local_nodes_to_remove.push_back(graph.getNode(node_id));
356 remove_node_buffer.clear();
358 for(
auto node : local_nodes_to_remove) {
361 local_nodes_to_remove.clear();
363 FPMAS_LOGI(graph.getMpiCommunicator().getRank(),
364 "GHOST_MODE",
"Graph links synchronized.",
"");
370 link_buffer.push_back(
const_cast<EdgeApi*
>(edge));
378 std::remove(link_buffer.begin(), link_buffer.end(), edge),
383 unlink_migration[src->location()].push_back(edge->
getId());
387 unlink_migration[tgt->location()].push_back(edge->
getId());
395 remove_node_buffer[node->
location()].push_back(node->
getId());
402 local_nodes_to_remove.push_back(node);
424 this->synchronize_links();
449 template<
typename T,
template<
typename>
class Mutex>
468 : id_mpi(comm), data_mpi(comm), edge_mpi(comm),
469 data_sync(data_mpi, id_mpi, graph), sync_linker(edge_mpi, id_mpi, graph) {}
477 return new Mutex<T>(node->
data());
Definition: communication.h:251
Definition: communication.h:637
Definition: distributed_edge.h:91
virtual LocationState state() const =0
Definition: distributed_graph.h:169
Definition: distributed_id.h:89
Definition: distributed_node.h:28
virtual LocationState state() const =0
virtual int location() const =0
virtual IdType getId() const =0
virtual NodeType * getSourceNode() const =0
virtual NodeType * getTargetNode() const =0
virtual const std::vector< EdgeType * > getIncomingEdges() const =0
virtual IdType getId() const =0
virtual const std::vector< EdgeType * > getOutgoingEdges() const =0
Definition: sync_mode.h:26
Definition: sync_mode.h:79
Definition: sync_mode.h:120
Definition: ptr_wrapper.h:21
Definition: ghost_mode.h:45
void synchronize() override
Definition: ghost_mode.h:187
api::communication::TypedMpi< DistributedId > IdMpi
Definition: ghost_mode.h:54
GhostDataSync(DataMpi &data_mpi, IdMpi &id_mpi, api::graph::DistributedGraph< T > &graph)
Definition: ghost_mode.h:78
api::communication::TypedMpi< NodeUpdatePack< T > > DataMpi
Definition: ghost_mode.h:50
Definition: ghost_mode.h:450
GhostDataSync< T > & getDataSync() override
Definition: ghost_mode.h:485
Mutex< T > * buildMutex(api::graph::DistributedNode< T > *node) override
Definition: ghost_mode.h:476
GhostSyncLinker< T > & getSyncLinker() override
Definition: ghost_mode.h:492
GhostMode(api::graph::DistributedGraph< T > &graph, api::communication::MpiCommunicator &comm)
Definition: ghost_mode.h:465
Definition: ghost_mode.h:27
const T & read() override
Definition: ghost_mode.h:31
void synchronize() override
Definition: ghost_mode.h:35
void releaseAcquire() override
Definition: ghost_mode.h:34
void releaseRead() override
Definition: ghost_mode.h:32
T & acquire() override
Definition: ghost_mode.h:33
Definition: ghost_mode.h:215
void unlink(EdgeApi *edge) override
Definition: ghost_mode.h:375
api::communication::TypedMpi< DistributedId > IdMpi
Definition: ghost_mode.h:236
void removeNode(NodeApi *node) override
Definition: ghost_mode.h:393
api::graph::DistributedNode< T > NodeApi
Definition: ghost_mode.h:224
void link(EdgeApi *edge) override
Definition: ghost_mode.h:368
api::utils::PtrWrapper< EdgeApi > EdgePtr
Definition: ghost_mode.h:228
api::graph::DistributedEdge< T > EdgeApi
Definition: ghost_mode.h:220
void synchronize_links()
Definition: ghost_mode.h:291
api::communication::TypedMpi< EdgePtr > EdgeMpi
Definition: ghost_mode.h:232
GhostSyncLinkerBase(EdgeMpi &edge_mpi, IdMpi &id_mpi, api::graph::DistributedGraph< T > &graph)
Definition: ghost_mode.h:263
Definition: ghost_mode.h:415
void synchronize() override
Definition: ghost_mode.h:423
Definition: single_thread_mutex.h:17
T & data() override
Definition: single_thread_mutex.h:33
#define FPMAS_C_STR(arg)
Definition: macros.h:24
LocationState
Definition: location_state.h:15
@ DISTANT
Definition: location_state.h:28
static void update(T &local_data, T &&updated_data)
Definition: synchro.h:35