fpmas 1.6
ghost_mode.h
Go to the documentation of this file.
1#ifndef FPMAS_GHOST_MODE_H
2#define FPMAS_GHOST_MODE_H
3
10#include "fpmas/utils/macros.h"
11#include "../data_update_pack.h"
12#include "../synchro.h"
13#include "single_thread_mutex.h"
14
15namespace fpmas { namespace synchro {
16 namespace ghost {
17
/**
 * Mutex implementation of the `ghost` namespace, derived from
 * SingleThreadMutex<T>.
 *
 * The visible overrides perform no locking and no communication: read() and
 * acquire() return the reference produced by this->data(), and the release /
 * synchronize operations have empty bodies.
 *
 * NOTE(review): original line 29 is not visible in this listing; it
 * presumably provides the constructor used by `new Mutex<T>(node->data())`
 * in GhostMode::buildMutex() - confirm against the real header.
 */
26 template<typename T>
27 class GhostMutex : public SingleThreadMutex<T> {
28 public:
30
// Read access: returns the data reference exposed by this->data().
31 const T& read() override {return this->data();};
// Nothing to release: read() did not take any lock.
32 void releaseRead() override {};
// Write access: returns the same data reference, as a mutable reference.
33 T& acquire() override {return this->data();};
// Nothing to release: acquire() did not take any lock.
34 void releaseAcquire() override {};
// Empty body: this mutex does nothing when synchronized.
35 void synchronize() override {};
36 };
37
/**
 * Declaration of the data synchronization component whose member functions
 * are defined below as GhostDataSync<T>::...
 *
 * NOTE(review): several lines of this declaration are not visible in this
 * listing: the `class` header (original line 45), the DataMpi / IdMpi type
 * aliases (original lines 50 and 54), the declaration of the `graph` member
 * (original line 60) and the first line of the constructor (original line
 * 78). According to the reference list at the end of the page:
 *   - DataMpi is api::communication::TypedMpi<NodeUpdatePack<T>>
 *   - IdMpi is api::communication::TypedMpi<DistributedId>
 *   - the constructor is GhostDataSync(DataMpi&, IdMpi&,
 *     api::graph::DistributedGraph<T>&)
 */
44 template<typename T>
46 public:
55
56 private:
// Used by _synchronize() to exchange NodeUpdatePack<T> instances.
57 DataMpi& data_mpi;
// Used by _synchronize() to exchange the requested node ids.
58 IdMpi& id_mpi;
59
// Builds requests for every node returned by
// graph.getLocationManager().getDistantNodes().
61 std::unordered_map<int, std::vector<DistributedId>> buildRequests();
// Builds requests only for the DISTANT nodes contained in `nodes`.
62 std::unordered_map<int, std::vector<DistributedId>> buildRequests(
63 std::unordered_set<api::graph::DistributedNode<T>*> nodes
64 );
// Performs the id / data exchange for the given requests and applies the
// received updates to the local graph.
65 void _synchronize(
66 std::unordered_map<int, std::vector<DistributedId>> requests
67 );
68
69 public:
// Constructor: only stores the three references it receives (the `graph`
// parameter line, original line 80, is not visible in this listing).
79 DataMpi& data_mpi, IdMpi& id_mpi,
81 )
82 : data_mpi(data_mpi), id_mpi(id_mpi), graph(graph) {}
83
// Synchronizes the data of all the distant nodes known by the graph.
88 void synchronize() override;
89
// Synchronizes only the nodes of the provided set.
90 void synchronize(
91 std::unordered_set<api::graph::DistributedNode<T>*> nodes
92 ) override;
93 };
94
/**
 * Exchanges node ids and node data through id_mpi / data_mpi, then applies
 * the received updates to the local graph.
 *
 * Steps, in the order written below:
 *  1. `requests` goes through id_mpi.migrate(); the returned map is iterated
 *     as (location, ids to export to that location).
 *  2. For each such id, a NodeUpdatePack is built from the id, the data and
 *     the weight of graph.getNode(id).
 *  3. The packs go through data_mpi.migrate(); for each received pack, the
 *     data and the weight of graph.getNode(pack.id) are updated.
 *
 * @param requests map from a location to the ids requested from it, as
 * built by buildRequests()
 */
95 template<typename T>
96 void GhostDataSync<T>::_synchronize(
97 std::unordered_map<int, std::vector<DistributedId>> requests) {
98 FPMAS_LOGI(
99 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
100 "Synchronizing graph data...", ""
101 );
// After this call `requests` holds what migrate() returned, i.e. the ids
// this process has to export, grouped by destination.
102 requests = id_mpi.migrate(std::move(requests));
103
104 std::unordered_map<int, std::vector<NodeUpdatePack<T>>> updated_data;
105 updated_data.reserve(graph.getMpiCommunicator().getSize());
// NOTE(review): `auto list` copies each (location, id vector) entry; a
// const reference would presumably avoid that copy.
106 for(auto list : requests) {
107 updated_data[list.first].reserve(list.second.size());
108 for(auto id : list.second) {
109 FPMAS_LOGV(
110 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
111 "Export %s to %i", FPMAS_C_STR(id), list.first
112 );
// NOTE(review): the returned node is dereferenced without any check;
// presumably every requested id exists in the local graph - confirm.
113 auto node = graph.getNode(id);
114 updated_data[list.first].emplace_back(
115 id, node->data(), node->getWeight()
116 );
117 }
118 }
119
120 // TODO : Should use data update packs.
// After this call `updated_data` holds the packs received by this process.
121 updated_data = data_mpi.migrate(std::move(updated_data));
// `list` is a copy of the map entry and `data` refers to an element of that
// copy, so the std::move below moves out of the copy, not out of
// `updated_data`.
122 for(auto list : updated_data) {
123 for(auto& data : list.second) {
124 auto local_node = graph.getNode(data.id);
// NOTE(review): the line opening this call (original line 125) is not
// visible in this listing. The two arguments below match the
// `update(T& local_data, T&& updated_data)` signature (synchro.h:35)
// given in the reference list at the end of the page.
126 local_node->data(),
127 std::move(data.updated_data)
128 );
129 local_node->setWeight(data.updated_weight);
130 }
131 }
132
133 FPMAS_LOGI(
134 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
135 "Graph data synchronized.", ""
136 );
137 }
138
/**
 * Builds the id requests for the DISTANT nodes of the provided set.
 *
 * Nodes whose state() is not fpmas::api::graph::DISTANT are ignored.
 *
 * @param nodes nodes to consider
 * @return map from node->location() to the ids of the DISTANT nodes of
 * `nodes` that report this location
 */
139 template<typename T>
140 std::unordered_map<int, std::vector<DistributedId>> GhostDataSync<T>
141 ::buildRequests(std::unordered_set<api::graph::DistributedNode<T>*> nodes) {
142 std::unordered_map<int, std::vector<DistributedId>> requests;
// Bucket count is reserved from the communicator size; keys are locations.
143 requests.reserve(graph.getMpiCommunicator().getSize());
144 for(auto node : nodes) {
145 if(node->state() == fpmas::api::graph::DISTANT) {
146 FPMAS_LOGV(
147 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
148 "Request %s from %i",
149 FPMAS_C_STR(node->getId()), node->location());
150
// operator[] creates the list for this location on first use.
151 requests[node->location()].emplace_back(node->getId());
152 }
153 }
154 return requests;
155 }
156
/**
 * Synchronizes the nodes of the provided set.
 *
 * The requests built by buildRequests(nodes) are passed to _synchronize(),
 * then mutex()->synchronize() is called on every node of `nodes`, whatever
 * its state.
 *
 * NOTE(review): the first line of this definition (original line 158) is not
 * visible in this listing; the parameter list matches the in-class
 * declaration `void synchronize(std::unordered_set<...> nodes) override`.
 */
157 template<typename T>
159 std::unordered_set<api::graph::DistributedNode<T>*> nodes) {
// NOTE(review): _synchronize() logs this same message again when it starts.
160 FPMAS_LOGI(
161 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
162 "Synchronizing graph data...", ""
163 );
164
165 _synchronize(buildRequests(nodes));
166 for(auto node : nodes)
167 node->mutex()->synchronize();
168 }
169
/**
 * Builds the id requests for every entry returned by
 * graph.getLocationManager().getDistantNodes().
 *
 * Each entry is used as an (id, node) pair: `first` is the requested id and
 * `second->location()` selects the list the id is appended to.
 *
 * NOTE(review): the line carrying the function name (original line 172) is
 * not visible in this listing; the return type and the absence of a
 * parameter match the in-class `buildRequests()` declaration.
 *
 * @return map from a location to the ids requested from it
 */
170 template<typename T>
171 std::unordered_map<int, std::vector<DistributedId>> GhostDataSync<T>
173 std::unordered_map<int, std::vector<DistributedId>> requests;
// Bucket count is reserved from the communicator size; keys are locations.
174 requests.reserve(graph.getMpiCommunicator().getSize());
175 for(auto node : graph.getLocationManager().getDistantNodes()) {
176 FPMAS_LOGV(
177 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
178 "Request %s from %i",
179 FPMAS_C_STR(node.first), node.second->location()
180 );
181 requests[node.second->location()].emplace_back(node.first);
182 }
183 return requests;
184 }
185
/**
 * Synchronizes the whole graph: the requests built by buildRequests() are
 * passed to _synchronize(), then mutex()->synchronize() is called on every
 * node returned by graph.getNodes().
 *
 * NOTE(review): the signature line (original line 187) is not visible in
 * this listing; the reference list at the end of the page gives
 * `void synchronize() override` for that line.
 */
186 template<typename T>
// NOTE(review): _synchronize() logs this same message again when it starts.
188 FPMAS_LOGI(
189 graph.getMpiCommunicator().getRank(), "GHOST_MODE",
190 "Synchronizing graph data...", "");
191
192 _synchronize(buildRequests());
// Entries of graph.getNodes() are used as pairs; `second` is the node.
193 for(auto node : graph.getNodes())
194 node.second->mutex()->synchronize();
195 }
196
197
/**
 * Declaration of the link synchronization base component. It buffers link,
 * unlink and node removal operations, which synchronize_links() later
 * exchanges through edge_mpi / id_mpi.
 *
 * NOTE(review): several lines of this declaration are not visible in this
 * listing: the `class` header (original line 215), the type aliases
 * (original lines 220-236), the `graph` member (original line 245) and the
 * constructor signature lines (original lines 263 and 265). According to the
 * reference list at the end of the page:
 *   - the class is GhostSyncLinkerBase
 *   - EdgeApi is api::graph::DistributedEdge<T>
 *   - NodeApi is api::graph::DistributedNode<T>
 *   - EdgePtr is api::utils::PtrWrapper<EdgeApi>
 *   - EdgeMpi is api::communication::TypedMpi<EdgePtr>
 *   - IdMpi is api::communication::TypedMpi<DistributedId>
 */
214 template<typename T>
216 public:
237 private:
// DISTANT edges registered by link(), waiting for synchronize_links().
238 std::vector<EdgePtr> link_buffer;
// Edge ids filled by unlink(), keyed by the location of a DISTANT end node.
239 std::unordered_map<int, std::vector<DistributedId>> unlink_migration;
// Ids of DISTANT nodes passed to removeNode(), keyed by node location.
240 std::unordered_map<int, std::vector<DistributedId>> remove_node_buffer;
// Nodes erased from the graph at the end of synchronize_links().
241 std::vector<NodeApi*> local_nodes_to_remove;
242
243 EdgeMpi& edge_mpi;
244 IdMpi& id_mpi;
246
247 protected:
// Commits all the buffered operations (see the definition below).
252 void synchronize_links();
253
254 public:
// Constructor: only stores the references it receives.
264 EdgeMpi& edge_mpi, IdMpi& id_mpi,
266 : edge_mpi(edge_mpi), id_mpi(id_mpi), graph(graph) {}
267
// Buffers `edge` if its state is DISTANT.
274 void link(EdgeApi* edge) override;
275
// Buffers the unlink of `edge` if its state is DISTANT.
285 void unlink(EdgeApi* edge) override;
286
// Schedules the removal of `node`.
287 void removeNode(NodeApi* node) override;
288 };
289
/**
 * Commits the operations buffered by link(), unlink() and removeNode().
 *
 * Phases, in the order written below:
 *  1. Each edge of link_buffer is queued for the location of each of its
 *     DISTANT end nodes, exchanged with edge_mpi.migrate(), and every
 *     received edge is passed to graph.importEdge().
 *  2. remove_node_buffer is exchanged with id_mpi.migrate(); all outgoing
 *     and incoming edges of each received node id are unlinked.
 *  3. unlink_migration is exchanged with id_mpi.migrate(); each received
 *     edge id found in graph.getEdges() is erased.
 *  4. Buffered edges whose two end nodes are DISTANT are erased locally.
 *  5. The nodes received in phase 2 are added to local_nodes_to_remove, and
 *     all the nodes of that list are erased from the graph.
 *
 * NOTE(review): the signature line (original line 291) is not visible in
 * this listing; the reference list at the end of the page gives
 * `void synchronize_links()` for that line.
 */
290 template<typename T>
292 FPMAS_LOGI(graph.getMpiCommunicator().getRank(),
293 "GHOST_MODE", "Synchronizing graph links...", "");
294 /*
295 * Migrate links
296 */
// Edges with two DISTANT end nodes, erased locally in phase 4.
297 std::vector<EdgePtr> edges_to_clear;
298 std::unordered_map<int, std::vector<EdgePtr>> link_migration;
299 for(auto edge : link_buffer) {
300 auto src = edge->getSourceNode();
301 if(src->state() == LocationState::DISTANT) {
302 link_migration[src->location()].push_back(edge);
303 }
304 auto tgt = edge->getTargetNode();
// When both end nodes are DISTANT and report the same location, the edge is
// appended a second time to that location's list.
// NOTE(review): confirm that importEdge() on the receiving side tolerates
// the same edge being received twice.
305 if(tgt->state() == LocationState::DISTANT) {
306 link_migration[tgt->location()].push_back(edge);
307 }
308 if(src->state() == LocationState::DISTANT
309 && tgt->state() == LocationState::DISTANT) {
310 edges_to_clear.push_back(edge);
311 }
312 }
// After this call link_migration holds the edges received by this process.
313 link_migration = edge_mpi.migrate(std::move(link_migration));
314
315 for(auto import_list : link_migration) {
316 for (auto edge : import_list.second) {
317 graph.importEdge(edge);
318 }
319 }
320 link_buffer.clear();
321
322 /*
323 * Migrate node removal
324 */
// After this call remove_node_buffer holds the ids received by this process;
// it is read again in phase 5 and only cleared there.
325 remove_node_buffer = id_mpi.migrate(std::move(remove_node_buffer));
326 for(auto import_list : remove_node_buffer) {
327 for(DistributedId node_id : import_list.second) {
328 auto* node = graph.getNode(node_id);
// getOutgoingEdges() / getIncomingEdges() return vectors by value (see the
// reference list at the end of the page), so each loop iterates over a copy
// while graph.unlink() is called.
// NOTE(review): if graph.unlink() calls back into unlink() of this linker
// (not visible here), entries may be added to unlink_migration before it
// is migrated below - confirm.
329 for(auto edge : node->getOutgoingEdges())
330 graph.unlink(edge);
331 for(auto edge : node->getIncomingEdges())
332 graph.unlink(edge);
333 }
334 }
335
336 /*
337 * Migrate unlinks
338 */
339 unlink_migration = id_mpi.migrate(std::move(unlink_migration));
340 for(auto import_list : unlink_migration) {
341 for(DistributedId id : import_list.second) {
// Received ids that are not found in graph.getEdges() are silently ignored.
342 const auto& edges = graph.getEdges();
343 auto it = edges.find(id);
344 if(it != edges.end())
345 graph.erase(it->second);
346 }
347 }
348 unlink_migration.clear();
349
// Phase 4: erase the buffered edges that had two DISTANT end nodes.
350 for(auto edge : edges_to_clear)
351 graph.erase(edge);
352
// Phase 5: nodes whose removal was received in phase 2 join the nodes
// already scheduled by removeNode().
353 for(auto import_list : remove_node_buffer)
354 for(auto node_id : import_list.second)
355 local_nodes_to_remove.push_back(graph.getNode(node_id));
356 remove_node_buffer.clear();
357
358 for(auto node : local_nodes_to_remove) {
359 graph.erase(node);
360 }
361 local_nodes_to_remove.clear();
362
363 FPMAS_LOGI(graph.getMpiCommunicator().getRank(),
364 "GHOST_MODE", "Graph links synchronized.", "");
365 }
366
/**
 * Registers `edge` in link_buffer when its state is DISTANT; edges in any
 * other state are ignored here.
 *
 * NOTE(review): the signature line (original line 368) is not visible in
 * this listing; the reference list at the end of the page gives
 * `void link(EdgeApi *edge) override` for that line.
 */
367 template<typename T>
369 if(edge->state() == LocationState::DISTANT) {
// NOTE(review): the in-class declaration takes a non-const EdgeApi*, so this
// const_cast looks redundant - confirm against the real signature.
370 link_buffer.push_back(const_cast<EdgeApi*>(edge));
371 }
372 }
373
/**
 * Buffers the unlink of `edge` when its state is DISTANT; edges in any other
 * state are ignored here.
 *
 * The edge is first removed from link_buffer, then its id is appended to
 * unlink_migration for the location of each DISTANT end node.
 *
 * NOTE(review): the signature line (original line 375) is not visible in
 * this listing; the reference list at the end of the page gives
 * `void unlink(EdgeApi *edge) override` for that line.
 */
374 template<typename T>
376 if(edge->state() == LocationState::DISTANT) {
// Erase-remove: drops every element of link_buffer equal to `edge`, so a
// link that is still pending is not migrated later.
377 link_buffer.erase(
378 std::remove(link_buffer.begin(), link_buffer.end(), edge),
379 link_buffer.end()
380 );
381 auto src = edge->getSourceNode();
382 if(src->state() == LocationState::DISTANT) {
383 unlink_migration[src->location()].push_back(edge->getId());
384 }
385 auto tgt = edge->getTargetNode();
// When both end nodes are DISTANT and report the same location, the id is
// appended twice to that location's list.
386 if(tgt->state() == LocationState::DISTANT) {
387 unlink_migration[tgt->location()].push_back(edge->getId());
388 }
389 }
390 }
391
/**
 * Schedules the removal of `node`.
 *
 * For a DISTANT node, its id is buffered in remove_node_buffer under the
 * node's location. Otherwise, all its outgoing and incoming edges are
 * unlinked immediately. In both cases the node is appended to
 * local_nodes_to_remove, which synchronize_links() erases from the graph.
 *
 * NOTE(review): the signature line (original line 393) is not visible in
 * this listing; the reference list at the end of the page gives
 * `void removeNode(NodeApi *node) override` for that line.
 */
392 template<typename T>
394 if(node->state() == LocationState::DISTANT) {
395 remove_node_buffer[node->location()].push_back(node->getId());
396 } else {
// getOutgoingEdges() / getIncomingEdges() return vectors by value (see the
// reference list at the end of the page), so each loop iterates over a copy
// while graph.unlink() is called.
397 for(auto edge : node->getOutgoingEdges())
398 graph.unlink(edge);
399 for(auto edge : node->getIncomingEdges())
400 graph.unlink(edge);
401 }
// Executed for DISTANT and non-DISTANT nodes alike.
402 local_nodes_to_remove.push_back(node);
403 }
404
/**
 * Declaration of the sync linker type returned by GhostMode::getSyncLinker().
 * Its synchronize() override is defined below and forwards to
 * synchronize_links().
 *
 * NOTE(review): the `class` header (original line 415) and original line 417
 * are not visible in this listing, so the base class and the constructor
 * cannot be confirmed from here.
 */
414 template<typename T>
416 public:
418
419 void synchronize() override;
420 };
421
/**
 * Synchronizes links by forwarding to synchronize_links().
 *
 * NOTE(review): the signature line (original line 423) is not visible in
 * this listing; the reference list at the end of the page gives
 * `void synchronize() override` for that line.
 */
422 template<typename T>
424 this->synchronize_links();
425 }
426
/**
 * Declaration of the ghost synchronization mode, parameterized by the node
 * data type T and by the Mutex template instantiated in buildMutex().
 *
 * It owns a GhostDataSync<T> and a GhostSyncLinker<T>, both built from the
 * id_mpi / data_mpi / edge_mpi members and the graph.
 *
 * NOTE(review): the `class` header and the declarations of id_mpi, data_mpi
 * and edge_mpi (original lines 450-453), as well as the constructor
 * signature (original lines 465-467), are not visible in this listing. The
 * reference list at the end of the page gives the constructor as
 * GhostMode(api::graph::DistributedGraph<T>& graph,
 * api::communication::MpiCommunicator& comm).
 */
449 template<typename T, template<typename> class Mutex>
454
455 GhostDataSync<T> data_sync;
456 GhostSyncLinker<T> sync_linker;
457
458 public:
// The three Mpi members are built from `comm`; data_sync and sync_linker
// then receive references to them and to `graph`.
468 : id_mpi(comm), data_mpi(comm), edge_mpi(comm),
469 data_sync(data_mpi, id_mpi, graph), sync_linker(edge_mpi, id_mpi, graph) {}
470
// Allocates a new Mutex<T> built from the data of `node`.
// NOTE(review): the mutex is created with a raw `new`; the caller is
// presumably responsible for deleting it - confirm who owns it.
476 Mutex<T>* buildMutex(api::graph::DistributedNode<T>* node) override {
477 return new Mutex<T>(node->data());
478 };
479
// Returns a reference to the data synchronization component of this mode.
485 GhostDataSync<T>& getDataSync() override {return data_sync;}
486
// Returns a reference to the link synchronization component of this mode.
492 GhostSyncLinker<T>& getSyncLinker() override {return sync_linker;}
493 };
494 }
495
541 template<typename T>
543}}
544#endif
Definition: communication.h:251
Definition: communication.h:637
Definition: distributed_edge.h:91
virtual LocationState state() const =0
Definition: distributed_graph.h:169
Definition: distributed_id.h:89
Definition: distributed_node.h:28
virtual LocationState state() const =0
virtual int location() const =0
virtual IdType getId() const =0
virtual NodeType * getSourceNode() const =0
virtual NodeType * getTargetNode() const =0
virtual const std::vector< EdgeType * > getIncomingEdges() const =0
virtual IdType getId() const =0
virtual const std::vector< EdgeType * > getOutgoingEdges() const =0
Definition: sync_mode.h:26
Definition: sync_mode.h:79
Definition: sync_mode.h:120
Definition: ptr_wrapper.h:21
Definition: ghost_mode.h:45
void synchronize() override
Definition: ghost_mode.h:187
api::communication::TypedMpi< DistributedId > IdMpi
Definition: ghost_mode.h:54
GhostDataSync(DataMpi &data_mpi, IdMpi &id_mpi, api::graph::DistributedGraph< T > &graph)
Definition: ghost_mode.h:78
api::communication::TypedMpi< NodeUpdatePack< T > > DataMpi
Definition: ghost_mode.h:50
Definition: ghost_mode.h:450
GhostDataSync< T > & getDataSync() override
Definition: ghost_mode.h:485
Mutex< T > * buildMutex(api::graph::DistributedNode< T > *node) override
Definition: ghost_mode.h:476
GhostSyncLinker< T > & getSyncLinker() override
Definition: ghost_mode.h:492
GhostMode(api::graph::DistributedGraph< T > &graph, api::communication::MpiCommunicator &comm)
Definition: ghost_mode.h:465
Definition: ghost_mode.h:27
const T & read() override
Definition: ghost_mode.h:31
void synchronize() override
Definition: ghost_mode.h:35
void releaseAcquire() override
Definition: ghost_mode.h:34
void releaseRead() override
Definition: ghost_mode.h:32
T & acquire() override
Definition: ghost_mode.h:33
Definition: ghost_mode.h:215
void unlink(EdgeApi *edge) override
Definition: ghost_mode.h:375
api::communication::TypedMpi< DistributedId > IdMpi
Definition: ghost_mode.h:236
void removeNode(NodeApi *node) override
Definition: ghost_mode.h:393
api::graph::DistributedNode< T > NodeApi
Definition: ghost_mode.h:224
void link(EdgeApi *edge) override
Definition: ghost_mode.h:368
api::utils::PtrWrapper< EdgeApi > EdgePtr
Definition: ghost_mode.h:228
api::graph::DistributedEdge< T > EdgeApi
Definition: ghost_mode.h:220
void synchronize_links()
Definition: ghost_mode.h:291
api::communication::TypedMpi< EdgePtr > EdgeMpi
Definition: ghost_mode.h:232
GhostSyncLinkerBase(EdgeMpi &edge_mpi, IdMpi &id_mpi, api::graph::DistributedGraph< T > &graph)
Definition: ghost_mode.h:263
Definition: ghost_mode.h:415
void synchronize() override
Definition: ghost_mode.h:423
Definition: single_thread_mutex.h:17
T & data() override
Definition: single_thread_mutex.h:33
#define FPMAS_C_STR(arg)
Definition: macros.h:24
LocationState
Definition: location_state.h:15
@ DISTANT
Definition: location_state.h:28
Definition: fpmas.cpp:3
static void update(T &local_data, T &&updated_data)
Definition: synchro.h:35