fpmas 1.6
distributed_graph.h
Go to the documentation of this file.
1#ifndef FPMAS_DISTRIBUTED_GRAPH_H
2#define FPMAS_DISTRIBUTED_GRAPH_H
3
8#include <set>
9
14
15#include "fpmas/graph/graph.h"
16
17#define DIST_GRAPH_PARAMS\
18 typename T,\
19 template<typename> class SyncMode,\
20 template<typename> class DistNodeImpl,\
21 template<typename> class DistEdgeImpl,\
22 template<typename> class TypedMpi,\
23 template<typename> class LocationManagerImpl
24
25#define DIST_GRAPH_PARAMS_SPEC\
26 T,\
27 SyncMode,\
28 DistNodeImpl,\
29 DistEdgeImpl,\
30 TypedMpi,\
31 LocationManagerImpl
32
33namespace fpmas { namespace graph {
34
38 namespace detail {
39
43 template<DIST_GRAPH_PARAMS>
45 public Graph<
46 api::graph::DistributedNode<T>,
47 api::graph::DistributedEdge<T>>,
49 {
50 public:
51 static_assert(
52 std::is_base_of<api::graph::DistributedNode<T>, DistNodeImpl<T>>::value,
53 "DistNodeImpl must implement api::graph::DistributedNode"
54 );
55 static_assert(
56 std::is_base_of<api::graph::DistributedEdge<T>, DistEdgeImpl<T>>::value,
57 "DistEdgeImpl must implement api::graph::DistributedEdge"
58 );
59
60 public:
69
73
75 = delete;
76 DistributedGraph& operator=(
78 = delete;
79
88
89 private:
90 class EraseNodeCallback : public api::utils::Callback<NodeType*> {
91 private:
93 bool enabled = true;
94 public:
95 EraseNodeCallback(DistributedGraph<DIST_GRAPH_PARAMS_SPEC>& graph)
96 : graph(graph) {}
97
98 void disable() {
99 enabled = false;
100 }
101 void call(NodeType* node) {
102 if(enabled) {
103 graph.location_manager.remove(node);
104 graph.unsynchronized_nodes.erase(node);
105 }
106 }
107 };
108
109 api::communication::MpiCommunicator* mpi_communicator;
110 TypedMpi<DistributedId> id_mpi {*mpi_communicator};
111 TypedMpi<std::pair<DistributedId, int>> location_mpi {*mpi_communicator};
112
113 LocationManagerImpl<T> location_manager;
114 SyncMode<T> sync_mode;
115
116 std::vector<SetLocalNodeCallback*> set_local_callbacks;
117 std::vector<SetDistantNodeCallback*> set_distant_callbacks;
118
119 EraseNodeCallback* erase_node_callback;
120
121 NodeType* _buildNode(NodeType*);
122
123 void setLocal(
124 api::graph::DistributedNode<T>* node,
126 );
127 void setDistant(
128 api::graph::DistributedNode<T>* node,
130 );
131
132 void triggerSetLocalCallbacks(
133 const api::graph::SetLocalNodeEvent<T>& event) {
134 for(auto callback : set_local_callbacks)
135 callback->call(event);
136 }
137
138 void triggerSetDistantCallbacks(
139 const api::graph::SetDistantNodeEvent<T>& event) {
140 for(auto callback : set_distant_callbacks)
141 callback->call(event);
142 }
143
144 void clearDistantNodes();
145 void clearNode(NodeType*);
146 void _distribute(api::graph::PartitionMap);
147
148 DistributedId node_id;
149 DistributedId edge_id;
150
151 std::unordered_set<api::graph::DistributedNode<T>*> unsynchronized_nodes;
152
153 public:
160 mpi_communicator(&comm), location_manager(*mpi_communicator, id_mpi, location_mpi),
161 sync_mode(*this, *mpi_communicator),
162 // Graph base takes ownership of the erase_node_callback
163 erase_node_callback(new EraseNodeCallback(*this)),
164 node_id(mpi_communicator->getRank(), 0),
165 edge_id(mpi_communicator->getRank(), 0) {
166 this->addCallOnEraseNode(erase_node_callback);
167 }
168
175 return *mpi_communicator;
176 };
177
182 return *mpi_communicator;
183 };
184
185 DistributedId currentNodeId() const override {return node_id;}
186 void setCurrentNodeId(DistributedId id) override {this->node_id = id;}
187 DistributedId currentEdgeId() const override {return edge_id;}
188 void setCurrentEdgeId(DistributedId id) override {this->edge_id = id;}
189
190 NodeType* importNode(NodeType* node) override;
191 EdgeType* importEdge(EdgeType* edge) override;
192 std::unordered_set<api::graph::DistributedNode<T>*> getUnsyncNodes() const override {
193 return unsynchronized_nodes;
194 }
195
196
205 const SyncMode<T>& getSyncMode() const {return sync_mode;}
206
207 SyncMode<T>& synchronizationMode() override {return sync_mode;}
208
209 const LocationManagerImpl<T>&
210 getLocationManager() const override {return location_manager;}
211 LocationManagerImpl<T>&
212 getLocationManager() override {return location_manager;}
213
214 void balance(api::graph::LoadBalancing<T>& load_balancing) override {
215 balance(load_balancing, api::graph::PARTITION);
216 };
217
219 api::graph::LoadBalancing<T>& load_balancing,
220 api::graph::PartitionMode partition_mode
221 ) override {
222 FPMAS_LOGI(
223 getMpiCommunicator().getRank(), "LB",
224 "Balancing graph (%lu nodes, %lu edges)",
225 this->getNodes().size(), this->getEdges().size());
226
227 sync_mode.getSyncLinker().synchronize();
228 this->_distribute(load_balancing.balance(
229 this->location_manager.getLocalNodes(),
230 partition_mode
231 ));
232
233 // Data synchronization of newly imported DISTANT nodes.
234 // In GhostMode, this import DISTANT nodes data, so that it can
235 // be used directly following the _distribute() operation
236 // without any additional and complete synchronize().
237 this->synchronize(this->unsynchronized_nodes, false);
238
239 FPMAS_LOGI(
240 getMpiCommunicator().getRank(), "LB",
241 "Graph balanced : %lu nodes, %lu edges",
242 this->getNodes().size(), this->getEdges().size());
243 }
244
247 api::graph::PartitionMap fixed_vertices
248 ) override {
249 balance(load_balancing, fixed_vertices, api::graph::PARTITION);
250 };
251
254 api::graph::PartitionMap fixed_vertices,
255 api::graph::PartitionMode partition_mode
256 ) override {
257
258 FPMAS_LOGI(
259 getMpiCommunicator().getRank(), "LB",
260 "Balancing graph (%lu nodes, %lu edges)",
261 this->getNodes().size(), this->getEdges().size());
262
263 sync_mode.getSyncLinker().synchronize();
264 this->_distribute(load_balancing.balance(
265 this->location_manager.getLocalNodes(),
266 fixed_vertices,
267 partition_mode));
268
269 // Same as above
270 this->synchronize(this->unsynchronized_nodes, false);
271
272 FPMAS_LOGI(
273 getMpiCommunicator().getRank(), "LB",
274 "Graph balanced : %lu nodes, %lu edges",
275 this->getNodes().size(), this->getEdges().size());
276 }
277
278 void distribute(api::graph::PartitionMap partition) override;
279
280 void synchronize() override;
281 void synchronize(
282 std::unordered_set<NodeType*> nodes,
283 bool synchronize_links = true
284 ) override;
285
286 NodeType* buildNode(T&& = std::move(T())) override;
287 NodeType* buildNode(const T&) override;
289
290 EdgeType* link(NodeType* const src, NodeType* const tgt, api::graph::LayerId layer) override;
291
293 void removeNode(DistributedId id) override {
294 this->removeNode(this->getNode(id));
295 }
296
297 void unlink(EdgeType*) override;
298 void unlink(DistributedId id) override {
299 this->unlink(this->getEdge(id));
300 }
301
302 void switchLayer(EdgeType* edge, api::graph::LayerId layer_id) override;
303
304 void addCallOnSetLocal(SetLocalNodeCallback* callback) override {
305 set_local_callbacks.push_back(callback);
306 };
307 std::vector<SetLocalNodeCallback*> onSetLocalCallbacks() const override {
308 return set_local_callbacks;
309 }
310
312 set_distant_callbacks.push_back(callback);
313 };
314 std::vector<SetDistantNodeCallback*> onSetDistantCallbacks() const override {
315 return set_distant_callbacks;
316 }
317
319 };
320
321 template<DIST_GRAPH_PARAMS>
323 // Calls base Graph move constructor
324 Graph<api::graph::DistributedNode<T>, api::graph::DistributedEdge<T>>(std::move(graph)),
325 // Moves DistributedGraph specific fields
326 mpi_communicator(graph.mpi_communicator),
327 location_manager(std::move(graph.location_manager)),
328 sync_mode(std::move(graph.sync_mode)),
329 set_local_callbacks(std::move(graph.set_local_callbacks)),
330 set_distant_callbacks(std::move(graph.set_distant_callbacks)),
331 erase_node_callback(graph.erase_node_callback),
332 node_id(std::move(graph.node_id)), edge_id(std::move(graph.edge_id)) {
333 // TODO: Refactor callbacks system
334 // Prevents erase_node_callback->disable() when `graph` is
335 // deleted
336 graph.erase_node_callback = nullptr;
337 }
338
339 template<DIST_GRAPH_PARAMS>
341 // Calls base Graph move assignment
344 // Reassigns MPI communicators
345 //
346 // Notice that TypedMpi instances, location_manager and
347 // sync_mode have been constructed in `graph` using
348 // `graph.mpi_communicator`. Since we reassign
349 // `this->mpi_communicator` to `graph.mpi_communicator`, we can
350 // safely move those member fields from `graph` to `this`
351 // without introducing inconsistencies.
352 this->mpi_communicator = graph.mpi_communicator;
353 this->id_mpi = std::move(graph.id_mpi);
354 this->location_mpi = std::move(graph.location_mpi);
355
356 this->location_manager = std::move(graph.location_manager);
357 this->sync_mode = std::move(graph.sync_mode);
358 // The obsolete `this->erase_node_callback` is
359 // automatically deleted by the base Graph move assignment,
360 // when the "erase_node_callback" list is cleared.
361 this->erase_node_callback = graph.erase_node_callback;
362 // Prevents erase_node_callback->disable() when `graph` is
363 // deleted
364 graph.erase_node_callback = nullptr;
365
366 this->node_id = std::move(graph.node_id);
367 this->edge_id = std::move(graph.edge_id);
368 for(auto callback : set_local_callbacks)
369 delete callback;
370 for(auto callback : set_distant_callbacks)
371 delete callback;
372 this->set_local_callbacks = std::move(graph.set_local_callbacks);
373 this->set_distant_callbacks = std::move(graph.set_distant_callbacks);
374
375 return *this;
376 }
377
378 template<DIST_GRAPH_PARAMS>
382 location_manager.setLocal(node);
383 triggerSetLocalCallbacks({node, context});
384 }
385
386 template<DIST_GRAPH_PARAMS>
387 void DistributedGraph<DIST_GRAPH_PARAMS_SPEC>::setDistant(
390 location_manager.setDistant(node);
391 triggerSetDistantCallbacks({node, context});
392 }
393
394 template<DIST_GRAPH_PARAMS>
396 sync_mode.getSyncLinker().unlink(edge);
397 this->erase(edge);
398 }
399
400 template<DIST_GRAPH_PARAMS>
402 EdgeType* edge, api::graph::LayerId layer_id) {
403 assert(edge->state() == api::graph::LOCAL);
404
405 edge->getSourceNode()->unlinkOut(edge);
406 edge->getTargetNode()->unlinkIn(edge);
407 edge->setLayer(layer_id);
408 edge->getSourceNode()->linkOut(edge);
409 edge->getTargetNode()->linkIn(edge);
410 }
411
412 template<DIST_GRAPH_PARAMS>
415 FPMAS_LOGD(getMpiCommunicator().getRank(), "DIST_GRAPH", "Importing node %s...", FPMAS_C_STR(node->getId()));
416 // The input node must be a temporary dynamically allocated object.
417 // A representation of the node might already be contained in the
418 // graph, if it were already built as a "distant" node.
419
420 NodeType* local_node;
421 const auto& nodes = this->getNodes();
422 auto it = nodes.find(node->getId());
423 if(it == nodes.end()) {
424 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Inserting new LOCAL node %s.", FPMAS_C_STR(node->getId()));
425 // The node is not contained in the graph, we need to build a new
426 // one.
427 // But instead of completely building a new node, we can re-use the
428 // temporary input node.
429 this->insert(node);
431 node->setMutex(sync_mode.buildMutex(node));
432 local_node = node;
433 } else {
434 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Replacing existing DISTANT node %s.", FPMAS_C_STR(node->getId()));
435 // A representation of the node was already contained in the graph.
436 // We just need to update its state.
437
438 // Set local representation as local
439 local_node = it->second;
441 local_node->data(), std::move(node->data())
442 );
443 local_node->setWeight(node->getWeight());
445
446 // Deletes unused temporary input node
447 delete node;
448 }
449 for(auto& edge : local_node->getIncomingEdges())
450 if(edge->state() == api::graph::DISTANT
451 && edge->getSourceNode()->state() == api::graph::LOCAL)
453 for(auto& edge : local_node->getOutgoingEdges())
454 if(edge->state() == api::graph::DISTANT
455 && edge->getTargetNode()->state() == api::graph::LOCAL)
456 edge->setState(api::graph::LOCAL);
457
458
459 return local_node;
460 }
461
462 template<DIST_GRAPH_PARAMS>
465 std::unique_ptr<api::graph::TemporaryNode<T>> temp_src
466 = edge->getTempSourceNode();
467 std::unique_ptr<api::graph::TemporaryNode<T>> temp_tgt
468 = edge->getTempTargetNode();
469
470 FPMAS_LOGD(getMpiCommunicator().getRank(), "DIST_GRAPH", "Importing edge %s (from %s to %s)...",
471 FPMAS_C_STR(edge->getId()),
472 FPMAS_C_STR(temp_src->getId()),
473 FPMAS_C_STR(temp_tgt->getId())
474 );
475 // The input edge must be a dynamically allocated object, with temporary
476 // dynamically allocated nodes as source and target.
477 // A representation of the imported edge might already be present in the
478 // graph, for example if it has already been imported as a "distant"
479 // edge with other nodes at other epochs.
480
481 const auto& nodes = this->getNodes();
482 const auto& edges = this->getEdges();
483 auto edge_it = edges.find(edge->getId());
484 if(edge_it == edges.end()) {
485 // The edge does not belong to the graph : a new one must be built.
486
487 DistributedId src_id = temp_src->getId();
488 NodeType* src;
489 DistributedId tgt_id = temp_tgt->getId();
490 NodeType* tgt;
491
492 LocationState edgeLocationState = LocationState::LOCAL;
493
494 auto src_it = nodes.find(src_id);
495 if(src_it != nodes.end()) {
496 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Linking existing source %s", FPMAS_C_STR(src_id));
497 // The source node is already contained in the graph
498 src = src_it->second;
499 if(src->state() == LocationState::DISTANT) {
500 // At least src is DISTANT, so the imported edge is
501 // necessarily DISTANT.
502 edgeLocationState = LocationState::DISTANT;
503 }
504 // Deletes the temporary source node
505 temp_src.reset();
506
507 // Links the temporary edge with the src contained in the graph
508 edge->setSourceNode(src);
509 src->linkOut(edge);
510 } else {
511 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Creating DISTANT source %s", FPMAS_C_STR(src_id));
512 // The source node is not contained in the graph : it must be
513 // built as a DISTANT node.
514 edgeLocationState = LocationState::DISTANT;
515
516 // Instead of building a new node, we re-use the temporary
517 // source node.
518 src = temp_src->build();
519 temp_src.reset();
520
521 edge->setSourceNode(src);
522 src->linkOut(edge);
523 this->insert(src);
525 src->setMutex(sync_mode.buildMutex(src));
526 unsynchronized_nodes.insert(src);
527 }
528 auto tgt_it = nodes.find(tgt_id);
529 if(tgt_it != nodes.end()) {
530 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Linking existing target %s", FPMAS_C_STR(tgt_id));
531 // The target node is already contained in the graph
532 tgt = tgt_it->second;
533 if(tgt->state() == LocationState::DISTANT) {
534 // At least src is DISTANT, so the imported edge is
535 // necessarily DISTANT.
536 edgeLocationState = LocationState::DISTANT;
537 }
538 // Deletes the temporary target node
539 temp_tgt.reset();
540
541 // Links the temporary edge with the tgt contained in the graph
542 edge->setTargetNode(tgt);
543 tgt->linkIn(edge);
544 } else {
545 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Creating DISTANT target %s", FPMAS_C_STR(tgt_id));
546 // The target node is not contained in the graph : it must be
547 // built as a DISTANT node.
548 edgeLocationState = LocationState::DISTANT;
549
550 // Instead of building a new node, we re-use the temporary
551 // target node.
552 tgt = temp_tgt->build();
553 temp_tgt.reset();
554
555 edge->setTargetNode(tgt);
556 tgt->linkIn(edge);
557 this->insert(tgt);
559 tgt->setMutex(sync_mode.buildMutex(tgt));
560 unsynchronized_nodes.insert(tgt);
561 }
562 // Finally, insert the temporary edge into the graph.
563 edge->setState(edgeLocationState);
564 this->insert(edge);
565 return edge;
566 } // if (graph.count(edge_id) > 0)
567
568 // A representation of the edge is already present in the graph : it is
569 // useless to insert it again. We just need to update its state.
570
571 auto local_edge = edge_it->second;
572 if(local_edge->getSourceNode()->state() == LocationState::LOCAL
573 && local_edge->getTargetNode()->state() == LocationState::LOCAL) {
574 local_edge->setState(LocationState::LOCAL);
575 }
576
577 // Completely deletes temporary items, nothing is re-used
578 // Temporary nodes are automatically deleted
579 delete edge;
580
581 return local_edge;
582 }
583
584 template<DIST_GRAPH_PARAMS>
587 this->insert(node);
589 location_manager.addManagedNode(node->getId(), mpi_communicator->getRank());
590 node->setMutex(sync_mode.buildMutex(node));
591 return node;
592 }
593
594 template<DIST_GRAPH_PARAMS>
597 return _buildNode(new DistNodeImpl<T>(
598 this->node_id++, std::move(data)
599 ));
600 }
601
602 template<DIST_GRAPH_PARAMS>
605 return _buildNode(new DistNodeImpl<T>(
606 this->node_id++, data
607 ));
608 }
609 template<DIST_GRAPH_PARAMS>
612 FPMAS_LOGD(getMpiCommunicator().getRank(),
613 "GRAPH", "Inserting temporary distant node: %s",
614 FPMAS_C_STR(node->getId()));
615 const auto& nodes = this->getNodes();
616 auto node_it = nodes.find(node->getId());
617 if(node_it == nodes.end()) {
618 this->insert(node);
620 node->setMutex(sync_mode.buildMutex(node));
621 unsynchronized_nodes.insert(node);
622 return node;
623 } else {
624 delete node;
625 return node_it->second;
626 }
627 }
628
629 template<DIST_GRAPH_PARAMS>
632 sync_mode.getSyncLinker().removeNode(node);
633 }
634
635 template<DIST_GRAPH_PARAMS>
638 NodeType* const src, NodeType* const tgt, api::graph::LayerId layer) {
639 // Builds the new edge
640 auto edge = new DistEdgeImpl<T>(
641 this->edge_id++, layer
642 );
643 edge->setSourceNode(src);
644 src->linkOut(edge);
645 edge->setTargetNode(tgt);
646 tgt->linkIn(edge);
647
648 edge->setState(
652 );
653 sync_mode.getSyncLinker().link(edge);
654
655 // Inserts the edge in the Graph
656 this->insert(edge);
657
658 return edge;
659 }
660
661 template<DIST_GRAPH_PARAMS>
664
665 sync_mode.getSyncLinker().synchronize();
666
667 _distribute(partition);
668
669 sync_mode.getDataSync().synchronize();
670 }
671
672
673 template<DIST_GRAPH_PARAMS>
676 FPMAS_LOGI(getMpiCommunicator().getRank(), "DIST_GRAPH",
677 "Distributing graph...", "");
678 std::string partition_str = "\n";
679 for(auto item : partition) {
680 std::string str = FPMAS_C_STR(item.first);
681 str.append(" : " + std::to_string(item.second) + "\n");
682 partition_str.append(str);
683 }
684 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH", "Partition : %s", partition_str.c_str());
685
686 std::unordered_map<int, communication::DataPack> serial_nodes;
687 std::unordered_map<int, communication::DataPack> serial_edges;
688
689 std::vector<NodeType*> exported_nodes;
690 {
691 // Builds node and edges export maps
692 std::unordered_map<int, std::vector<NodePtrWrapper<T>>> node_export_map;
693 std::unordered_map<int, std::set<DistributedId>> edge_ids_to_export;
694 std::unordered_map<int, std::vector<EdgePtrWrapper<T>>> edge_export_map;
695 for(auto item : partition) {
696 if(item.second != mpi_communicator->getRank()) {
697 auto node = this->getNodes().find(item.first);
698 if(node != this->getNodes().end()
699 && node->second->state() == api::graph::LOCAL) {
700 FPMAS_LOGV(getMpiCommunicator().getRank(), "DIST_GRAPH",
701 "Exporting node %s to %i", FPMAS_C_STR(item.first), item.second);
702 auto node_to_export = node->second;
703 exported_nodes.push_back(node_to_export);
704 node_export_map[item.second].emplace_back(node_to_export);
705 for(auto edge : node_to_export->getIncomingEdges()) {
706 // If the target node is local on the
707 // destination process, it already owns this
708 // edge so no need to transmit it
709 if(edge->getSourceNode()->location() != item.second) {
710 // Insert or replace in the IDs set
711 edge_ids_to_export[item.second].insert(edge->getId());
712 }
713 }
714 for(auto edge : node_to_export->getOutgoingEdges()) {
715 // Same as above
716 if(edge->getTargetNode()->location() != item.second) {
717 // Insert or replace in the IDs set
718 edge_ids_to_export[item.second].insert(edge->getId());
719 }
720 }
721 }
722 }
723 }
724 // Ensures that each edge is exported once to each process
725 for(auto list : edge_ids_to_export) {
726 for(auto id : list.second) {
727 edge_export_map[list.first].emplace_back(this->getEdge(id));
728 }
729 }
730
731 // Serializes nodes and edges
732 for(auto& item : node_export_map)
733 serial_nodes.emplace(
734 item.first,
735 io::datapack::ObjectPack(item.second).dump()
736 );
737 for(auto& item : edge_export_map)
738 serial_edges.emplace(
739 item.first,
740 io::datapack::LightObjectPack(item.second).dump()
741 );
742
743 // Once serialized nodes and associated edges can
744 // eventually be cleared. This might save memory for the
745 // next import.
746
747 for(auto node : exported_nodes) {
748 setDistant(node, api::graph::SetDistantNodeEvent<T>::EXPORT_DISTANT);
749 }
750
751 // Clears buffers required for serialization
752 }
753
754 {
755 // Node import
756 std::unordered_map<int, std::vector<NodePtrWrapper<T>>> node_import;
757 for(auto& item : mpi_communicator->allToAll(std::move(serial_nodes), MPI_CHAR))
758 node_import.emplace(
759 item.first, io::datapack::ObjectPack::parse(
760 std::move(item.second)
761 ).get<std::vector<NodePtrWrapper<T>>>()
762 );
763
764 for(auto& import_node_list_from_proc : node_import) {
765 for(auto& imported_node : import_node_list_from_proc.second) {
766 this->importNode(imported_node);
767 }
768 import_node_list_from_proc.second.clear();
769 }
770 // node_import is deleted at the end of this scope
771 }
772
773 {
774 // Edge import
775 std::unordered_map<int, std::vector<EdgePtrWrapper<T>>> edge_import;
776
777 for(auto& item : mpi_communicator->allToAll(std::move(serial_edges), MPI_CHAR))
778 edge_import.emplace(
779 item.first, io::datapack::LightObjectPack::parse(
780 std::move(item.second)
781 ).get<std::vector<EdgePtrWrapper<T>>>()
782 );
783
784 for(auto& import_edge_list_from_proc : edge_import) {
785 for(auto& imported_edge : import_edge_list_from_proc.second) {
786 this->importEdge(imported_edge);
787 }
788 import_edge_list_from_proc.second.clear();
789 }
790 // edge_import is deleted at the end of this scope
791 }
792
793
794 FPMAS_LOGD(getMpiCommunicator().getRank(), "DIST_GRAPH", "Clearing exported nodes...", "");
795 for(auto node : exported_nodes) {
796 clearNode(node);
797 }
798
799 FPMAS_LOGD(getMpiCommunicator().getRank(), "DIST_GRAPH", "Exported nodes cleared.", "");
800
801
802 location_manager.updateLocations();
803
804 FPMAS_LOGI(getMpiCommunicator().getRank(), "DIST_GRAPH",
805 "End of distribution.", "");
806 }
807
808 template<DIST_GRAPH_PARAMS>
809 void DistributedGraph<DIST_GRAPH_PARAMS_SPEC>
810 ::synchronize() {
811 FPMAS_LOGI(getMpiCommunicator().getRank(), "DIST_GRAPH",
812 "Synchronizing graph...", "");
813
814 sync_mode.getSyncLinker().synchronize();
815
816 clearDistantNodes();
817
818 sync_mode.getDataSync().synchronize();
819
820 FPMAS_LOGI(getMpiCommunicator().getRank(), "DIST_GRAPH",
821 "End of graph synchronization.", "");
822 }
823
824 template<DIST_GRAPH_PARAMS>
826 ::synchronize(std::unordered_set<NodeType*> nodes, bool synchronize_links) {
827 FPMAS_LOGI(getMpiCommunicator().getRank(), "DIST_GRAPH",
828 "Partially synchronizing graph...", "");
829
830 if(synchronize_links) {
831 sync_mode.getSyncLinker().synchronize();
832 clearDistantNodes();
833 }
834
835 sync_mode.getDataSync().synchronize(nodes);
836
837 for(auto node : nodes)
838 unsynchronized_nodes.erase(node);
839
840 FPMAS_LOGI(getMpiCommunicator().getRank(), "DIST_GRAPH",
841 "End of partial graph synchronization.", "");
842 }
843
844 template<DIST_GRAPH_PARAMS>
847 NodeMap distant_nodes = location_manager.getDistantNodes();
848 for(auto node : distant_nodes)
849 clearNode(node.second);
850 }
851
852 template<DIST_GRAPH_PARAMS>
854 ::clearNode(NodeType* node) {
855 DistributedId id = node->getId();
856 FPMAS_LOGD(
857 mpi_communicator->getRank(),
858 "DIST_GRAPH", "Clearing node %s...",
859 FPMAS_C_STR(id)
860 );
861
862 bool eraseNode = true;
863 std::set<EdgeType*> obsoleteEdges;
864 for(auto edge : node->getIncomingEdges()) {
865 if(edge->getSourceNode()->state()==LocationState::LOCAL) {
866 eraseNode = false;
867 } else {
868 obsoleteEdges.insert(edge);
869 }
870 }
871 for(auto edge : node->getOutgoingEdges()) {
872 if(edge->getTargetNode()->state()==LocationState::LOCAL) {
873 eraseNode = false;
874 } else {
875 obsoleteEdges.insert(edge);
876 }
877 }
878 if(eraseNode) {
879 FPMAS_LOGD(
880 mpi_communicator->getRank(),
881 "DIST_GRAPH", "Erasing obsolete node %s.",
882 FPMAS_C_STR(node->getId())
883 );
884 this->erase(node);
885 } else {
886 for(auto edge : obsoleteEdges) {
887 FPMAS_LOGD(
888 mpi_communicator->getRank(),
889 "DIST_GRAPH", "Erasing obsolete edge %s (%p) (from %s to %s)",
890 FPMAS_C_STR(node->getId()), edge,
891 FPMAS_C_STR(edge->getSourceNode()->getId()),
892 FPMAS_C_STR(edge->getTargetNode()->getId())
893 );
894 this->erase(edge);
895 }
896 }
897 FPMAS_LOGD(
898 mpi_communicator->getRank(),
899 "DIST_GRAPH", "Node %s cleared.",
900 FPMAS_C_STR(id)
901 );
902 }
903
904 template<DIST_GRAPH_PARAMS>
905 DistributedGraph<DIST_GRAPH_PARAMS_SPEC>::~DistributedGraph() {
906 //TODO: Refactor callback system
907 if(erase_node_callback != nullptr)
908 // This DistributedGraph has not been moved
909 erase_node_callback->disable();
910 for(auto callback : set_local_callbacks)
911 delete callback;
912 for(auto callback : set_distant_callbacks)
913 delete callback;
914 }
915
916 }
917
923 template<typename T, template<typename> class SyncMode>
927
928}}
929
930namespace nlohmann {
940 template<typename T>
941 class adl_serializer<fpmas::api::graph::DistributedGraph<T>> {
942 private:
944
945 class NullTemporaryNode : public fpmas::api::graph::TemporaryNode<T> {
946 private:
947 DistributedId id;
948 public:
949 NullTemporaryNode(DistributedId id) : id(id) {
950 }
951
952 DistributedId getId() const override {
953 return id;
954 }
955
956 int getLocation() const override {
957 assert(false);
958 return -1;
959 }
960
961 fpmas::api::graph::DistributedNode<T>* build() override {
962 assert(false);
963 return nullptr;
964 }
965 };
966
967 public:
968
977 static void to_json(json& j, const fpmas::api::graph::DistributedGraph<T>& graph) {
978 // Specifies the rank on which the local distributed graph
979 // was built
980 j["rank"] = graph.getMpiCommunicator().getRank();
981 for(auto node : graph.getNodes())
982 j["graph"]["nodes"].push_back({
983 NodePtrWrapper<T>(node.second),
984 node.second->location()
985 });
986 for(auto edge : graph.getEdges()) {
987 j["graph"]["edges"].push_back({
988 {"id", edge.first},
989 {"layer", edge.second->getLayer()},
990 {"weight", edge.second->getWeight()},
991 {"src", edge.second->getSourceNode()->getId()},
992 {"tgt", edge.second->getTargetNode()->getId()}
993 });
994 }
995 j["edge_id"] = graph.currentEdgeId();
996 j["node_id"] = graph.currentNodeId();
997 j["loc_manager"] = graph.getLocationManager();
998 nlohmann::json::json_serializer<fpmas::api::graph::LocationManager<T>, void>
999 ::to_json(j["loc_manager"], graph.getLocationManager());
1000 }
1001
1015 static void from_json(const json& j, fpmas::api::graph::DistributedGraph<T>& graph) {
1016 auto j_graph = j["graph"];
1017 for(auto j_node : j_graph["nodes"]) {
1018 auto location = j_node[1].get<int>();
1020 = j_node[0].get<NodePtrWrapper<T>>();
1021 if(location == graph.getMpiCommunicator().getRank())
1022 node = graph.importNode(node);
1023 else
1024 node = graph.insertDistant(node);
1025
1026 node->setLocation(location);
1027 }
1028 for(auto j_edge : j_graph["edges"]) {
1029 auto edge = new fpmas::graph::DistributedEdge<T>(
1030 j_edge["id"].get<fpmas::graph::DistributedId>(),
1031 j_edge["layer"].get<fpmas::graph::LayerId>()
1032 );
1033 edge->setWeight(j_edge["weight"].get<float>());
1034 // Temp source and target are never used (expect for
1035 // their ids) since source and target are necessarily
1036 // contained in the graph (see previous step)
1037 auto src_id = j_edge["src"].get<DistributedId>();
1038 edge->setTempSourceNode(
1039 std::unique_ptr<fpmas::api::graph::TemporaryNode<T>>(
1040 new NullTemporaryNode(src_id)
1041 )
1042 );
1043 auto tgt_id = j_edge["tgt"].get<DistributedId>();
1044 edge->setTempTargetNode(
1045 std::unique_ptr<fpmas::api::graph::TemporaryNode<T>>(
1046 new NullTemporaryNode(tgt_id)
1047 )
1048 );
1049 assert(graph.getNodes().count(src_id) > 0);
1050 assert(graph.getNodes().count(tgt_id) > 0);
1051 graph.importEdge(edge);
1052 }
1053 graph.setCurrentNodeId(j.at("node_id").get<DistributedId>());
1054 graph.setCurrentEdgeId(j.at("edge_id").get<DistributedId>());
1055
1056 nlohmann::json::json_serializer<fpmas::api::graph::LocationManager<T>, void>
1057 ::from_json(j["loc_manager"], graph.getLocationManager());
1058 }
1059 };
1060}
1061#endif
Definition: communication.h:251
Definition: distributed_edge.h:91
virtual void setState(LocationState state)=0
virtual std::unique_ptr< TemporaryNode< T > > getTempSourceNode()=0
virtual LocationState state() const =0
virtual std::unique_ptr< TemporaryNode< T > > getTempTargetNode()=0
Definition: distributed_graph.h:169
virtual api::communication::MpiCommunicator & getMpiCommunicator()=0
virtual DistributedEdge< T > * importEdge(DistributedEdge< T > *edge)=0
virtual DistributedId currentEdgeId() const =0
virtual DistributedId currentNodeId() const =0
virtual void setCurrentNodeId(DistributedId id)=0
virtual DistributedNode< T > * insertDistant(DistributedNode< T > *node)=0
virtual DistributedNode< T > * importNode(DistributedNode< T > *node)=0
virtual void setCurrentEdgeId(DistributedId id)=0
virtual LocationManager< T > & getLocationManager()=0
Definition: distributed_id.h:89
Definition: distributed_node.h:28
virtual LocationState state() const =0
virtual void setMutex(synchro::Mutex< T > *mutex)=0
virtual void setState(LocationState state)=0
virtual IdType getId() const =0
virtual void setTargetNode(NodeType *const tgt)=0
virtual void setLayer(LayerId layer)=0
virtual void setSourceNode(NodeType *const src)=0
virtual NodeType * getSourceNode() const =0
virtual NodeType * getTargetNode() const =0
Definition: load_balancing.h:47
virtual PartitionMap balance(NodeMap< T > nodes, PartitionMap fixed_vertices)=0
virtual void erase(NodeType *node)=0
virtual const NodeMap & getNodes() const =0
virtual const EdgeMap & getEdges() const =0
std::unordered_map< NodeIdType, DistributedNode< T > *, NodeIdHash > NodeMap
Definition: graph.h:50
Definition: load_balancing.h:92
virtual PartitionMap balance(NodeMap< T > nodes)=0
virtual void unlinkOut(EdgeType *edge)=0
virtual void setWeight(float weight)=0
virtual const std::vector< EdgeType * > getIncomingEdges() const =0
virtual void linkIn(EdgeType *edge)=0
virtual float getWeight() const =0
virtual void unlinkIn(EdgeType *edge)=0
virtual IdType getId() const =0
virtual const std::vector< EdgeType * > getOutgoingEdges() const =0
virtual void linkOut(EdgeType *edge)=0
Definition: distributed_edge.h:47
Definition: callback.h:16
Definition: distributed_edge.h:22
Definition: distributed_node.h:25
Definition: graph.h:21
void addCallOnEraseNode(api::utils::Callback< api::graph::DistributedNode< T > * > *callback) override
Definition: graph.h:72
api::graph::DistributedEdge< T > * getEdge(EdgeIdType) override
Definition: graph.h:246
api::graph::DistributedNode< T > * getNode(NodeIdType) override
Definition: graph.h:228
Definition: location_manager.h:22
Definition: distributed_graph.h:49
void balance(api::graph::LoadBalancing< T > &load_balancing) override
Definition: distributed_graph.h:214
LocationManagerImpl< T > & getLocationManager() override
Definition: distributed_graph.h:212
DistributedGraph(api::communication::MpiCommunicator &comm)
Definition: distributed_graph.h:159
void setCurrentEdgeId(DistributedId id) override
Definition: distributed_graph.h:188
void balance(api::graph::FixedVerticesLoadBalancing< T > &load_balancing, api::graph::PartitionMap fixed_vertices) override
Definition: distributed_graph.h:245
EdgeType * importEdge(EdgeType *edge) override
Definition: distributed_graph.h:464
api::graph::DistributedNode< T > * insertDistant(api::graph::DistributedNode< T > *) override
Definition: distributed_graph.h:610
void addCallOnSetDistant(SetDistantNodeCallback *callback) override
Definition: distributed_graph.h:311
api::communication::MpiCommunicator & getMpiCommunicator() override
Definition: distributed_graph.h:174
NodeType * importNode(NodeType *node) override
Definition: distributed_graph.h:414
void setCurrentNodeId(DistributedId id) override
Definition: distributed_graph.h:186
const api::communication::MpiCommunicator & getMpiCommunicator() const override
Definition: distributed_graph.h:181
void switchLayer(EdgeType *edge, api::graph::LayerId layer_id) override
Definition: distributed_graph.h:401
void balance(api::graph::FixedVerticesLoadBalancing< T > &load_balancing, api::graph::PartitionMap fixed_vertices, api::graph::PartitionMode partition_mode) override
Definition: distributed_graph.h:252
void removeNode(api::graph::DistributedNode< T > *) override
Definition: distributed_graph.h:630
EdgeType * link(NodeType *const src, NodeType *const tgt, api::graph::LayerId layer) override
Definition: distributed_graph.h:637
void synchronize() override
Definition: distributed_graph.h:810
SyncMode< T > & synchronizationMode() override
Definition: distributed_graph.h:207
std::unordered_set< api::graph::DistributedNode< T > * > getUnsyncNodes() const override
Definition: distributed_graph.h:192
api::graph::DistributedNode< T > NodeType
Definition: distributed_graph.h:54
void removeNode(DistributedId id) override
Definition: distributed_graph.h:293
void addCallOnSetLocal(SetLocalNodeCallback *callback) override
Definition: distributed_graph.h:304
const SyncMode< T > & getSyncMode() const
Definition: distributed_graph.h:205
std::vector< SetLocalNodeCallback * > onSetLocalCallbacks() const override
Definition: distributed_graph.h:307
void unlink(DistributedId id) override
Definition: distributed_graph.h:298
NodeType * buildNode(T &&=std::move(T())) override
Definition: distributed_graph.h:596
void unlink(EdgeType *) override
Definition: distributed_graph.h:395
const LocationManagerImpl< T > & getLocationManager() const override
Definition: distributed_graph.h:210
std::vector< SetDistantNodeCallback * > onSetDistantCallbacks() const override
Definition: distributed_graph.h:314
api::graph::DistributedEdge< T > EdgeType
Definition: distributed_graph.h:68
DistributedId currentNodeId() const override
Definition: distributed_graph.h:185
DistributedId currentEdgeId() const override
Definition: distributed_graph.h:187
void distribute(api::graph::PartitionMap partition) override
Definition: distributed_graph.h:663
void balance(api::graph::LoadBalancing< T > &load_balancing, api::graph::PartitionMode partition_mode) override
Definition: distributed_graph.h:218
static void to_json(json &j, const fpmas::api::graph::DistributedGraph< T > &graph)
Definition: distributed_graph.h:977
static void from_json(const json &j, fpmas::api::graph::DistributedGraph< T > &graph)
Definition: distributed_graph.h:1015
#define FPMAS_C_STR(arg)
Definition: macros.h:24
LocationState
Definition: location_state.h:15
@ DISTANT
Definition: location_state.h:28
@ LOCAL
Definition: location_state.h:21
std::unordered_map< DistributedId, int, api::graph::IdHash< DistributedId > > PartitionMap
Definition: load_balancing.h:19
int LayerId
Definition: edge.h:13
PartitionMode
Definition: load_balancing.h:30
@ PARTITION
Definition: load_balancing.h:35
typename graph::Graph< graph::DistributedNode< T >, graph::DistributedEdge< T > >::NodeMap NodeMap
Definition: load_balancing.h:25
detail::DistributedGraph< T, SyncMode, graph::DistributedNode, graph::DistributedEdge, communication::TypedMpi, graph::LocationManager > DistributedGraph
Definition: distributed_graph.h:926
BasicObjectPack< LightSerializer > LightObjectPack
Definition: datapack.h:1395
BasicObjectPack< Serializer > ObjectPack
Definition: datapack.h:1393
Definition: fpmas.cpp:3
Definition: distributed_graph.h:99
Context
Definition: distributed_graph.h:103
Definition: distributed_graph.h:42
Context
Definition: distributed_graph.h:46
Definition: callback.h:33
Definition: communication.h:585
Definition: synchro.h:20