fpmas 1.6
communication.h
Go to the documentation of this file.
1#ifndef FPMAS_COMMUNICATION_H
2#define FPMAS_COMMUNICATION_H
3
8#include <cstdint>
9#include <string>
10#include <unordered_map>
11
12#include "fpmas/utils/log.h"
15
16namespace fpmas {
17 void init(int argc, char** argv);
18}
19
20namespace fpmas { namespace communication {
21 using api::communication::DataPack;
22 using api::communication::Request;
23 using api::communication::Status;
24
30 protected:
34 int size;
38 int rank;
39
43 MPI_Group group;
47 MPI_Comm comm;
48
49 private:
50 static void convertStatus(MPI_Status&, Status&, MPI_Datatype datatype);
51
52 public:
58 MPI_Comm getMpiComm() const;
59
65 MPI_Group getMpiGroup() const;
66
72 int getRank() const override;
73
80 int getSize() const override;
81
91 void send(
92 const void* data, int count, MPI_Datatype datatype,
93 int destination, int tag) override;
94
95 void send(
96 const DataPack& data, MPI_Datatype datatype,
97 int destination, int tag) override;
98
105 void send(int destination, int tag) override;
106
107
108 void Isend(
109 const void* data, int count, MPI_Datatype datatype,
110 int destination, int tag, Request& req) override;
111
112 void Isend(
113 const DataPack& data, MPI_Datatype datatype,
114 int destination, int tag, Request& req) override;
115
116 void Isend(int destination, int tag, Request& req) override;
117
128 void Issend(
129 const void* data, int count, MPI_Datatype datatype,
130 int destination, int tag, Request& req) override;
131
132 void Issend(
133 const DataPack& data, MPI_Datatype datatype,
134 int destination, int tag, Request& req) override;
142 void Issend(int destination, int tag, Request& req) override;
143
151 void recv(int source, int tag, Status& status = Status::IGNORE) override;
152
163 void recv(
164 void* buffer, int count, MPI_Datatype datatype,
165 int source, int tag, Status& status = Status::IGNORE) override;
166
167 void recv(
168 DataPack& data, MPI_Datatype datatype,
169 int source, int tag, Status& status = Status::IGNORE) override;
170
182 void probe(MPI_Datatype type, int source, int tag, Status&) override;
183
196 bool Iprobe(MPI_Datatype type, int source, int tag, Status&) override;
197
204 bool test(Request& req) override;
205
211 void wait(Request& req) override;
212
213 void waitAll(std::vector<Request>& req) override;
214
222 std::unordered_map<int, DataPack>
223 allToAll(std::unordered_map<int, DataPack> export_map, MPI_Datatype datatype) override;
224
234 std::vector<DataPack>
235 gather(DataPack data, MPI_Datatype datatype, int root) override;
236
244 std::vector<DataPack>
245 allGather(DataPack data, MPI_Datatype datatype) override;
246
255 DataPack bcast(DataPack data, MPI_Datatype datatype, int root) override;
256
260 void barrier() override;
261 };
262
263
268 public:
276
277 MpiCommunicator(const MpiCommunicator&) = delete;
278 MpiCommunicator& operator=(const MpiCommunicator&) = delete;
279
280
287 };
288
294 public:
303 void init() {
304 this->comm = MPI_COMM_WORLD;
305 MPI_Comm_group(MPI_COMM_WORLD, &this->group);
306 MPI_Comm_size(MPI_COMM_WORLD, &this->size);
307 MPI_Comm_rank(MPI_COMM_WORLD, &this->rank);
308 };
309 };
310
314 extern MpiCommWorld WORLD;
315
316 namespace detail {
336 template<typename T, typename PackType>
338 private:
340 public:
344 typedef PackType pack_type;
345
360
364 std::unordered_map<int, std::vector<T>>
365 migrate(std::unordered_map<int, std::vector<T>> export_map) override;
366
370 std::unordered_map<int, T>
371 allToAll(std::unordered_map<int, T> export_map) override;
372
376 std::vector<T> gather(const T&, int root) override;
380 std::vector<T> allGather(const T&) override;
384 T bcast(const T&, int root) override;
385
389 void send(const T&, int, int) override;
390
394 void Isend(const T&, int, int, Request&) override;
395
399 void Issend(const T&, int, int, Request&) override;
400
404 void probe(int source, int tag, Status& status) override;
408 bool Iprobe(int source, int tag, Status& status) override;
409
413 T recv(int source, int tag, Status& status = Status::IGNORE) override;
414 };
415
416 template<typename T, typename PackType> std::unordered_map<int, std::vector<T>>
417 TypedMpi<T, PackType>::migrate(std::unordered_map<int, std::vector<T>> export_map) {
418 std::unordered_map<int, DataPack> import_data_pack;
419
420 {
421 // Pack
422 std::unordered_map<int, DataPack> export_data_pack;
423 for(auto& item : export_map)
424 export_data_pack.emplace(item.first, PackType(item.second).dump());
425
426 // export_data_pack buffers are moved to the temporary allToAll
427 // argument, and automatically freed by the allToAll
428 // implementation
429 import_data_pack = comm.allToAll(
430 std::move(export_data_pack), MPI_CHAR
431 );
432 }
433
434 std::unordered_map<int, std::vector<T>> import_map;
435 for(auto& item : import_data_pack)
436 import_map.emplace(
437 item.first, PackType::parse(std::move(item.second))
438 .template get<std::vector<T>>()
439 );
440
441 // Should perform "copy elision"
442 return import_map;
443 }
444
445 template<typename T, typename PackType> std::unordered_map<int, T>
446 TypedMpi<T, PackType>::allToAll(std::unordered_map<int, T> export_map) {
447 // Pack
448 std::unordered_map<int, DataPack> import_data_pack;
449
450 {
451 std::unordered_map<int, DataPack> export_data_pack;
452 for(auto& item : export_map)
453 export_data_pack.emplace(item.first, PackType(item.second).dump());
454
455 // export_data_pack buffers are moved to the temporary allToAll
456 // argument, and automatically freed by the allToAll
457 // implementation
458 import_data_pack = comm.allToAll(
459 std::move(export_data_pack), MPI_CHAR
460 );
461 }
462
463 std::unordered_map<int, T> import_map;
464 for(auto& item : import_data_pack)
465 import_map.emplace(
466 item.first, PackType::parse(std::move(item.second))
467 .template get<T>()
468 );
469
470 // Should perform "copy elision"
471 return import_map;
472 }
473
474 template<typename T, typename PackType> std::vector<T>
475 TypedMpi<T, PackType>::gather(const T& data, int root) {
476 DataPack data_pack = PackType(data).dump();
477
478 std::vector<DataPack> import_data_pack
479 = comm.gather(data_pack, MPI_CHAR, root);
480
481 std::vector<T> import_data;
482 for(std::size_t i = 0; i < import_data_pack.size(); i++) {
483 import_data.emplace_back(
484 PackType::parse(
485 std::move(import_data_pack[i])).template get<T>()
486 );
487 }
488 return import_data;
489 }
490
491 template<typename T, typename PackType> std::vector<T>
493 // Pack
494 DataPack data_pack = PackType(data).dump();
495
496 std::vector<DataPack> import_data_pack
497 = comm.allGather(data_pack, MPI_CHAR);
498
499 std::vector<T> import_data;
500 for(std::size_t i = 0; i < import_data_pack.size(); i++) {
501 import_data.emplace_back(
502 PackType::parse(
503 std::move(import_data_pack[i])).template get<T>()
504 );
505 }
506 return import_data;
507 }
508
509 template<typename T, typename PackType>
510 T TypedMpi<T, PackType>::bcast(const T& data, int root) {
511 DataPack data_pack = PackType(data).dump();
512
513 DataPack recv_data_pack = comm.bcast(data_pack, MPI_CHAR, root);
514
515 return PackType::parse(std::move(recv_data_pack)).template get<T>();
516 }
517
518 template<typename T, typename PackType>
519 void TypedMpi<T, PackType>::send(const T& data, int destination, int tag) {
520 //FPMAS_LOGD(comm.getRank(), "TYPED_MPI", "Send JSON to process %i : %s", destination, str.c_str());
521 DataPack data_pack = PackType(data).dump();
522 comm.send(data_pack, MPI_CHAR, destination, tag);
523 }
524
525 template<typename T, typename PackType>
526 void TypedMpi<T, PackType>::Isend(const T& data, int destination, int tag, Request& req) {
527 //FPMAS_LOGD(comm.getRank(), "TYPED_MPI", "Issend JSON to process %i : %s", destination, str.c_str());
528 DataPack data_pack = PackType(data).dump();
529 comm.Isend(data_pack, MPI_CHAR, destination, tag, req);
530 }
531
532
533 template<typename T, typename PackType>
534 void TypedMpi<T, PackType>::Issend(const T& data, int destination, int tag, Request& req) {
535 //FPMAS_LOGD(comm.getRank(), "TYPED_MPI", "Issend JSON to process %i : %s", destination, str.c_str());
536 DataPack data_pack = PackType(data).dump();
537 comm.Issend(data_pack, MPI_CHAR, destination, tag, req);
538 }
539
540 template<typename T, typename PackType>
541 void TypedMpi<T, PackType>::probe(int source, int tag, Status &status) {
542 return comm.probe(MPI_CHAR, source, tag, status);
543 }
544 template<typename T, typename PackType>
545 bool TypedMpi<T, PackType>::Iprobe(int source, int tag, Status &status) {
546 return comm.Iprobe(MPI_CHAR, source, tag, status);
547 }
548
549 template<typename T, typename PackType>
550 T TypedMpi<T, PackType>::recv(int source, int tag, Status& status) {
551 Status message_to_receive_status;
552 this->probe(source, tag, message_to_receive_status);
553 int count = message_to_receive_status.item_count;
554 DataPack data_pack(count, sizeof(char));
555 comm.recv(data_pack, MPI_CHAR, source, tag, status);
556
557 //FPMAS_LOGD(comm.getRank(), "TYPED_MPI", "Receive JSON from process %i : %s", source, data.c_str());
558 return PackType::parse(std::move(data_pack)).template get<T>();
559 }
560 }
561
584 template<typename T>
585 struct TypedMpi : public detail::TypedMpi<T, io::datapack::ObjectPack> {
587 };
588
589
608 template<typename T, typename BinaryOp = std::plus<T>>
610 api::communication::TypedMpi<T>& mpi, int root,
611 const T& data, BinaryOp binary_op = BinaryOp()) {
612 std::vector<T> data_vec = mpi.gather(data, root);
613 if(data_vec.size() > 0)
614 return std::accumulate(std::next(data_vec.begin()), data_vec.end(), data_vec[0], binary_op);
615 return data;
616 }
633 template<typename T, typename BinaryOp = std::plus<T>>
636 const T& data, BinaryOp binary_op = BinaryOp()) {
637 std::vector<T> data_vec = mpi.allGather(data);
638 return std::accumulate(std::next(data_vec.begin()), data_vec.end(), data_vec[0], binary_op);
639 }
640}}
641#endif
Definition: communication.h:16
Definition: communication.h:251
Definition: communication.h:637
virtual std::vector< T > gather(const T &data, int root)=0
virtual std::vector< T > allGather(const T &data)=0
Definition: communication.h:293
void init()
Definition: communication.h:303
Definition: communication.h:29
void send(const void *data, int count, MPI_Datatype datatype, int destination, int tag) override
Definition: communication.cpp:39
int getSize() const override
Definition: communication.cpp:35
MPI_Comm getMpiComm() const
Definition: communication.cpp:23
int rank
Definition: communication.h:38
MPI_Group getMpiGroup() const
Definition: communication.cpp:27
void wait(Request &req) override
Definition: communication.cpp:149
MPI_Comm comm
Definition: communication.h:47
bool Iprobe(MPI_Datatype type, int source, int tag, Status &) override
Definition: communication.cpp:130
std::vector< DataPack > allGather(DataPack data, MPI_Datatype datatype) override
Definition: communication.cpp:293
std::unordered_map< int, DataPack > allToAll(std::unordered_map< int, DataPack > export_map, MPI_Datatype datatype) override
Definition: communication.cpp:167
int size
Definition: communication.h:34
int getRank() const override
Definition: communication.cpp:31
void probe(MPI_Datatype type, int source, int tag, Status &) override
Definition: communication.cpp:124
std::vector< DataPack > gather(DataPack data, MPI_Datatype datatype, int root) override
Definition: communication.cpp:243
void barrier() override
Definition: communication.cpp:347
void Isend(const void *data, int count, MPI_Datatype datatype, int destination, int tag, Request &req) override
Definition: communication.cpp:55
bool test(Request &req) override
Definition: communication.cpp:139
void waitAll(std::vector< Request > &req) override
Definition: communication.cpp:155
void Issend(const void *data, int count, MPI_Datatype datatype, int destination, int tag, Request &req) override
Definition: communication.cpp:81
void recv(int source, int tag, Status &status=Status::IGNORE) override
Definition: communication.cpp:104
DataPack bcast(DataPack data, MPI_Datatype datatype, int root) override
Definition: communication.cpp:330
MPI_Group group
Definition: communication.h:43
Definition: communication.h:267
MpiCommunicator()
Definition: communication.cpp:351
~MpiCommunicator()
Definition: communication.cpp:363
Definition: communication.h:337
bool Iprobe(int source, int tag, Status &status) override
Definition: communication.h:545
T recv(int source, int tag, Status &status=Status::IGNORE) override
Definition: communication.h:550
std::vector< T > allGather(const T &) override
Definition: communication.h:492
T bcast(const T &, int root) override
Definition: communication.h:510
std::unordered_map< int, T > allToAll(std::unordered_map< int, T > export_map) override
Definition: communication.h:446
TypedMpi(api::communication::MpiCommunicator &comm)
Definition: communication.h:359
std::unordered_map< int, std::vector< T > > migrate(std::unordered_map< int, std::vector< T > > export_map) override
Definition: communication.h:417
std::vector< T > gather(const T &, int root) override
Definition: communication.h:475
void Issend(const T &, int, int, Request &) override
Definition: communication.h:534
void probe(int source, int tag, Status &status) override
Definition: communication.h:541
PackType pack_type
Definition: communication.h:344
void Isend(const T &, int, int, Request &) override
Definition: communication.h:526
void send(const T &, int, int) override
Definition: communication.h:519
MpiCommWorld WORLD
Definition: communication.cpp:11
T all_reduce(api::communication::TypedMpi< T > &mpi, const T &data, BinaryOp binary_op=BinaryOp())
Definition: communication.h:634
T reduce(api::communication::TypedMpi< T > &mpi, int root, const T &data, BinaryOp binary_op=BinaryOp())
Definition: communication.h:609
Definition: fpmas.cpp:3
void init(int argc, char **argv)
Definition: fpmas.cpp:6
Definition: communication.h:162
Definition: communication.h:215
static Status IGNORE
Definition: communication.h:220
int item_count
Definition: communication.h:228
Definition: communication.h:585