1#ifndef FPMAS_COMMUNICATION_H
2#define FPMAS_COMMUNICATION_H
10#include <unordered_map>
12#include "fpmas/utils/log.h"
// Declaration of init(argc, argv). The cross-reference list at the end of this
// listing places its definition in fpmas.cpp. The leading "17" is line-number
// residue from the source listing, not part of the code.
// NOTE(review): presumably forwards the program arguments to MPI initialization — confirm in fpmas.cpp.
17 void init(
int argc,
char** argv);
20namespace fpmas {
namespace communication {
// Bring the api::communication vocabulary types into the current namespace so
// the declarations below can name DataPack, Request and Status unqualified.
21 using api::communication::DataPack;
22 using api::communication::Request;
23 using api::communication::Status;
// Static helper taking a raw MPI_Status, a Status and the MPI_Datatype of the
// message. Only the declaration is visible here.
// NOTE(review): presumably fills the Status from the MPI_Status (the datatype
// would be needed to compute an item count) — confirm against communication.cpp.
50 static void convertStatus(MPI_Status&,
Status&, MPI_Datatype datatype);
// send() overloads. The opening "void send(" lines of the first two overloads
// are missing from this listing; the name is taken from the cross-reference
// entry "void send(const void *data, int count, MPI_Datatype datatype,
// int destination, int tag) override" further down.
//
// Overload 1: raw buffer of `count` items of `datatype`, sent to `destination`
// with message tag `tag`.
92 const void* data,
int count, MPI_Datatype datatype,
93 int destination,
int tag)
override;
// Overload 2: same, but the payload is a DataPack.
96 const DataPack& data, MPI_Datatype datatype,
97 int destination,
int tag)
override;
// Overload 3: no payload argument at all.
// NOTE(review): presumably sends an empty message used as a pure signal — confirm.
105 void send(
int destination,
int tag)
override;
// Isend() overloads. Each mirrors a send() overload and adds a Request&
// parameter. The opening lines of the first two overloads are missing from this
// listing; the name is inferred from the explicit "void Isend(" overload below
// and the matching cross-reference entry.
// NOTE(review): presumably `req` is the handle later passed to wait()/test()/waitAll() — confirm.
//
// Overload 1: raw buffer of `count` items of `datatype`.
109 const void* data,
int count, MPI_Datatype datatype,
110 int destination,
int tag,
Request& req)
override;
// Overload 2: DataPack payload.
113 const DataPack& data, MPI_Datatype datatype,
114 int destination,
int tag,
Request& req)
override;
// Overload 3: no payload argument.
116 void Isend(
int destination,
int tag,
Request& req)
override;
// Issend() overloads, with the same three parameter shapes as Isend(). The
// opening lines of the first two overloads are missing from this listing; the
// name is inferred from the explicit "void Issend(" overload below and the
// matching cross-reference entry.
// NOTE(review): presumably the synchronous-mode counterpart of Isend (MPI_Issend) — confirm in communication.cpp.
//
// Overload 1: raw buffer of `count` items of `datatype`.
129 const void* data,
int count, MPI_Datatype datatype,
130 int destination,
int tag,
Request& req)
override;
// Overload 2: DataPack payload.
133 const DataPack& data, MPI_Datatype datatype,
134 int destination,
int tag,
Request& req)
override;
// Overload 3: no payload argument.
142 void Issend(
int destination,
int tag,
Request& req)
override;
164 void* buffer,
int count, MPI_Datatype datatype,
168 DataPack& data, MPI_Datatype datatype,
// probe(): takes the expected datatype, a source, a tag and a Status out-parameter.
// NOTE(review): presumably blocks until a matching message is available and
// reports it through the Status (MPI_Probe semantics) — confirm in communication.cpp.
182 void probe(MPI_Datatype type,
int source,
int tag,
Status&)
override;
// Iprobe(): same parameters as probe(), but returns a bool.
// NOTE(review): presumably the non-blocking variant, returning true when a
// matching message is available — confirm in communication.cpp.
196 bool Iprobe(MPI_Datatype type,
int source,
int tag,
Status&)
override;
// waitAll(): takes a mutable vector of Request handles.
// NOTE(review): presumably blocks until every request in `req` has completed — confirm.
213 void waitAll(std::vector<Request>& req)
override;
// allToAll(): takes a map from int to DataPack (by value) plus the MPI datatype
// of the packed items, and returns a map of the same shape.
// NOTE(review): presumably keys are process ranks (destinations on input,
// sources on output) — confirm.
222 std::unordered_map<int, DataPack>
223 allToAll(std::unordered_map<int, DataPack> export_map, MPI_Datatype datatype)
override;
234 std::vector<DataPack>
244 std::vector<DataPack>
// Body statements of init() (the cross-reference list places "void init()" at
// communication.h:303; the enclosing signature and braces are missing here).
// Binds this communicator to MPI_COMM_WORLD and caches its group, size and rank.
// NOTE(review): these MPI calls require MPI to be initialized first — confirm call order at the call site.
304 this->
comm = MPI_COMM_WORLD;
// Cache the group associated with MPI_COMM_WORLD.
305 MPI_Comm_group(MPI_COMM_WORLD, &this->
group);
// Cache the number of processes in MPI_COMM_WORLD.
306 MPI_Comm_size(MPI_COMM_WORLD, &this->
size);
// Cache the rank of the calling process in MPI_COMM_WORLD.
307 MPI_Comm_rank(MPI_COMM_WORLD, &this->
rank);
// Declaration of the global MpiCommWorld instance; the cross-reference list
// places its definition in communication.cpp.
314 extern MpiCommWorld
WORLD;
336 template<
typename T,
typename PackType>
// migrate(): takes, by value, a map from int to a vector of T and returns a map
// of the same shape.
// NOTE(review): presumably keys are process ranks — confirm.
364 std::unordered_map<int, std::vector<T>>
365 migrate(std::unordered_map<
int, std::vector<T>> export_map)
override;
// allToAll(): like migrate(), but each map entry carries a single T instead of
// a vector of T.
370 std::unordered_map<int, T>
371 allToAll(std::unordered_map<int, T> export_map)
override;
// gather(): takes one T and a root, returns a vector of T.
// NOTE(review): presumably the result is only meaningful on `root` — confirm.
376 std::vector<T>
gather(
const T&,
int root)
override;
// bcast(): takes one T and a root, returns a T.
// NOTE(review): presumably every process receives the value supplied on `root` — confirm.
384 T
bcast(
const T&,
int root)
override;
// send(): takes a T and two ints. The out-of-class definition further down
// uses them as `destination` and `tag`, in that order.
389 void send(
const T&,
int,
int)
override;
// TypedMpi<T, PackType>::migrate — fragment. The signature line, the braces and
// the call that inserts into import_map are missing from this listing; the name
// comes from the cross-reference entry for communication.h:417.
// Visible steps:
//   1. serialize every export_map entry with PackType(...).dump();
//   2. exchange the serialized packs through comm.allToAll as MPI_CHAR;
//   3. parse each received pack back into a std::vector<T>.
416 template<
typename T,
typename PackType> std::unordered_map<int, std::vector<T>>
418 std::unordered_map<int, DataPack> import_data_pack;
422 std::unordered_map<int, DataPack> export_data_pack;
// Step 1: one serialized DataPack per key of export_map.
423 for(
auto& item : export_map)
424 export_data_pack.emplace(item.first, PackType(item.second).dump());
// Step 2: the export packs are moved into the call, so they are not reused afterwards.
429 import_data_pack = comm.allToAll(
430 std::move(export_data_pack), MPI_CHAR
434 std::unordered_map<int, std::vector<T>> import_map;
// Step 3: each received pack is moved into PackType::parse and read as a vector of T.
// NOTE(review): presumably emplaced into import_map under item.first — the call itself is not visible.
435 for(
auto& item : import_data_pack)
437 item.first, PackType::parse(std::move(item.second))
438 .template get<std::vector<T>>()
// TypedMpi<T, PackType>::allToAll — fragment. The signature line, the braces
// and the tail of the import loop are missing from this listing; the name comes
// from the cross-reference entry for communication.h:446.
// Same visible pipeline as migrate(), with a single T per key: serialize with
// PackType(...).dump(), exchange through comm.allToAll as MPI_CHAR, then parse
// each received pack with PackType::parse.
445 template<
typename T,
typename PackType> std::unordered_map<int, T>
448 std::unordered_map<int, DataPack> import_data_pack;
451 std::unordered_map<int, DataPack> export_data_pack;
// One serialized DataPack per key of export_map.
452 for(
auto& item : export_map)
453 export_data_pack.emplace(item.first, PackType(item.second).dump());
// The export packs are moved into the call.
458 import_data_pack = comm.allToAll(
459 std::move(export_data_pack), MPI_CHAR
463 std::unordered_map<int, T> import_map;
// NOTE(review): the conversion applied to the parsed pack (presumably
// .template get<T>()) and the insertion into import_map are not visible here.
464 for(
auto& item : import_data_pack)
466 item.first, PackType::parse(std::move(item.second))
// TypedMpi<T, PackType>::gather — fragment. The signature line, the parse call
// head and the closing braces are missing from this listing; the name comes
// from the cross-reference entry for communication.h:475.
// Visible steps: serialize `data` with PackType(...).dump(), gather the packs
// through comm.gather as MPI_CHAR towards `root`, then convert each received
// pack to a T and append it to the result vector in index order.
474 template<
typename T,
typename PackType> std::vector<T>
476 DataPack data_pack = PackType(data).dump();
478 std::vector<DataPack> import_data_pack
479 = comm.gather(data_pack, MPI_CHAR, root);
481 std::vector<T> import_data;
// Each received pack is moved from, so import_data_pack[i] must not be reused afterwards.
482 for(std::size_t i = 0; i < import_data_pack.size(); i++) {
483 import_data.emplace_back(
485 std::move(import_data_pack[i])).
template get<T>()
// TypedMpi<T, PackType>::allGather — fragment. The signature line, the parse
// call head and the closing braces are missing from this listing; the name
// comes from the cross-reference entry for communication.h:492.
// Same visible pipeline as gather(), but through comm.allGather, which takes
// no root argument.
491 template<
typename T,
typename PackType> std::vector<T>
494 DataPack data_pack = PackType(data).dump();
496 std::vector<DataPack> import_data_pack
497 = comm.allGather(data_pack, MPI_CHAR);
499 std::vector<T> import_data;
// Each received pack is moved from while being converted back to a T.
500 for(std::size_t i = 0; i < import_data_pack.size(); i++) {
501 import_data.emplace_back(
503 std::move(import_data_pack[i])).
template get<T>()
// TypedMpi<T, PackType>::bcast — fragment. The signature line and the braces
// are missing from this listing; the name comes from the cross-reference entry
// for communication.h:510.
// Serializes `data`, passes it through comm.bcast as MPI_CHAR with `root`, and
// returns the T parsed from the DataPack that comm.bcast returns.
// Note that `data` is serialized on every caller before the call.
509 template<
typename T,
typename PackType>
511 DataPack data_pack = PackType(data).dump();
513 DataPack recv_data_pack = comm.bcast(data_pack, MPI_CHAR, root);
515 return PackType::parse(std::move(recv_data_pack)).template get<T>();
// TypedMpi<T, PackType>::send — fragment. The signature line and the braces are
// missing from this listing; the name comes from the cross-reference entry for
// communication.h:519.
// Serializes `data` with PackType(...).dump() and forwards the resulting
// DataPack to comm.send as MPI_CHAR, with the given destination and tag.
518 template<
typename T,
typename PackType>
521 DataPack data_pack = PackType(data).dump();
522 comm.send(data_pack, MPI_CHAR, destination, tag);
// TypedMpi<T, PackType>::Isend — fragment. The signature line and the braces
// are missing from this listing; the name comes from the cross-reference entry
// for communication.h:526.
// Serializes `data` and forwards the DataPack to comm.Isend as MPI_CHAR,
// passing `req` through.
// NOTE(review): data_pack is a local that goes out of scope while the request
// may still be pending; presumably comm.Isend copies or retains the buffer via
// `req` — confirm in communication.cpp.
525 template<
typename T,
typename PackType>
528 DataPack data_pack = PackType(data).dump();
529 comm.Isend(data_pack, MPI_CHAR, destination, tag, req);
// TypedMpi<T, PackType>::Issend — fragment. The signature line and the braces
// are missing from this listing; the name comes from the cross-reference entry
// for communication.h:534.
// Serializes `data` and forwards the DataPack to comm.Issend as MPI_CHAR,
// passing `req` through.
// NOTE(review): same local-buffer lifetime question as Isend — confirm that
// comm.Issend keeps the payload alive until the request completes.
533 template<
typename T,
typename PackType>
536 DataPack data_pack = PackType(data).dump();
537 comm.Issend(data_pack, MPI_CHAR, destination, tag, req);
// TypedMpi<T, PackType>::probe — fragment. The signature line and the braces
// are missing from this listing; the name comes from the cross-reference entry
// for communication.h:541.
// Forwards to comm.probe with MPI_CHAR as the datatype, matching the MPI_CHAR
// datatype the send paths in this file use for serialized payloads.
540 template<
typename T,
typename PackType>
542 return comm.probe(MPI_CHAR, source, tag, status);
// TypedMpi<T, PackType>::Iprobe — fragment. The signature line and the braces
// are missing from this listing; the name comes from the cross-reference entry
// for communication.h:545.
// Forwards to comm.Iprobe with MPI_CHAR as the datatype and returns its bool result.
544 template<
typename T,
typename PackType>
546 return comm.Iprobe(MPI_CHAR, source, tag, status);
// TypedMpi<T, PackType>::recv — fragment. The signature line and the braces are
// missing from this listing; the name comes from the cross-reference entry for
// communication.h:550.
// Visible steps: probe for the incoming message to learn its item count,
// allocate a DataPack of that many chars, receive into it through comm.recv,
// then parse the pack and return it as a T.
549 template<
typename T,
typename PackType>
// A separate local Status is used for the probe; the caller's `status` is only
// passed to the actual comm.recv call below.
551 Status message_to_receive_status;
552 this->probe(source, tag, message_to_receive_status);
553 int count = message_to_receive_status.
item_count;
// Buffer of `count` items, each sizeof(char) wide, matching the MPI_CHAR datatype used below.
554 DataPack data_pack(count,
sizeof(
char));
// NOTE(review): probe and recv use the same `source`/`tag` as given; if these
// can be wildcards, the received message may differ from the probed one — confirm.
555 comm.recv(data_pack, MPI_CHAR, source, tag, status);
558 return PackType::parse(std::move(data_pack)).template get<T>();
// reduce — fragment. The first signature line, the fallback return and the
// closing brace are missing from this listing; the cross-reference list gives
// the full signature as
//   T reduce(api::communication::TypedMpi<T>& mpi, int root, const T& data,
//            BinaryOp binary_op = BinaryOp()).
// Gathers `data` through mpi.gather(data, root) and, when the gathered vector
// is non-empty, folds it with `binary_op` (std::plus<T> by default).
// NOTE(review): what is returned when the gathered vector is empty is not visible here.
608 template<
typename T,
typename BinaryOp = std::plus<T>>
611 const T& data, BinaryOp binary_op = BinaryOp()) {
612 std::vector<T> data_vec = mpi.
gather(data, root);
// The fold is seeded with the first element and starts at the second, so no
// identity value of T is needed.
613 if(data_vec.size() > 0)
614 return std::accumulate(std::next(data_vec.begin()), data_vec.end(), data_vec[0], binary_op);
// all_reduce — fragment. The first signature line and the closing brace are
// missing from this listing; the cross-reference list gives the full signature as
//   T all_reduce(api::communication::TypedMpi<T>& mpi, const T& data,
//                BinaryOp binary_op = BinaryOp()).
// Collects `data` through mpi.allGather(data) and folds the result with
// `binary_op` (std::plus<T> by default), seeding the fold with the first element.
633 template<
typename T,
typename BinaryOp = std::plus<T>>
636 const T& data, BinaryOp binary_op = BinaryOp()) {
637 std::vector<T> data_vec = mpi.
allGather(data);
// NOTE(review): unlike reduce(), there is no size check before data_vec[0] and
// std::next(begin()). Presumably allGather always returns at least the local
// contribution — confirm.
638 return std::accumulate(std::next(data_vec.begin()), data_vec.end(), data_vec[0], binary_op);
Definition: communication.h:16
Definition: communication.h:251
Definition: communication.h:637
virtual std::vector< T > gather(const T &data, int root)=0
virtual std::vector< T > allGather(const T &data)=0
Definition: communication.h:293
void init()
Definition: communication.h:303
Definition: communication.h:29
void send(const void *data, int count, MPI_Datatype datatype, int destination, int tag) override
Definition: communication.cpp:39
int getSize() const override
Definition: communication.cpp:35
MPI_Comm getMpiComm() const
Definition: communication.cpp:23
int rank
Definition: communication.h:38
MPI_Group getMpiGroup() const
Definition: communication.cpp:27
void wait(Request &req) override
Definition: communication.cpp:149
MPI_Comm comm
Definition: communication.h:47
bool Iprobe(MPI_Datatype type, int source, int tag, Status &) override
Definition: communication.cpp:130
std::vector< DataPack > allGather(DataPack data, MPI_Datatype datatype) override
Definition: communication.cpp:293
std::unordered_map< int, DataPack > allToAll(std::unordered_map< int, DataPack > export_map, MPI_Datatype datatype) override
Definition: communication.cpp:167
int size
Definition: communication.h:34
int getRank() const override
Definition: communication.cpp:31
void probe(MPI_Datatype type, int source, int tag, Status &) override
Definition: communication.cpp:124
std::vector< DataPack > gather(DataPack data, MPI_Datatype datatype, int root) override
Definition: communication.cpp:243
void barrier() override
Definition: communication.cpp:347
void Isend(const void *data, int count, MPI_Datatype datatype, int destination, int tag, Request &req) override
Definition: communication.cpp:55
bool test(Request &req) override
Definition: communication.cpp:139
void waitAll(std::vector< Request > &req) override
Definition: communication.cpp:155
void Issend(const void *data, int count, MPI_Datatype datatype, int destination, int tag, Request &req) override
Definition: communication.cpp:81
void recv(int source, int tag, Status &status=Status::IGNORE) override
Definition: communication.cpp:104
DataPack bcast(DataPack data, MPI_Datatype datatype, int root) override
Definition: communication.cpp:330
MPI_Group group
Definition: communication.h:43
Definition: communication.h:267
MpiCommunicator()
Definition: communication.cpp:351
~MpiCommunicator()
Definition: communication.cpp:363
Definition: communication.h:337
bool Iprobe(int source, int tag, Status &status) override
Definition: communication.h:545
T recv(int source, int tag, Status &status=Status::IGNORE) override
Definition: communication.h:550
std::vector< T > allGather(const T &) override
Definition: communication.h:492
T bcast(const T &, int root) override
Definition: communication.h:510
std::unordered_map< int, T > allToAll(std::unordered_map< int, T > export_map) override
Definition: communication.h:446
TypedMpi(api::communication::MpiCommunicator &comm)
Definition: communication.h:359
std::unordered_map< int, std::vector< T > > migrate(std::unordered_map< int, std::vector< T > > export_map) override
Definition: communication.h:417
std::vector< T > gather(const T &, int root) override
Definition: communication.h:475
void Issend(const T &, int, int, Request &) override
Definition: communication.h:534
void probe(int source, int tag, Status &status) override
Definition: communication.h:541
PackType pack_type
Definition: communication.h:344
void Isend(const T &, int, int, Request &) override
Definition: communication.h:526
void send(const T &, int, int) override
Definition: communication.h:519
MpiCommWorld WORLD
Definition: communication.cpp:11
T all_reduce(api::communication::TypedMpi< T > &mpi, const T &data, BinaryOp binary_op=BinaryOp())
Definition: communication.h:634
T reduce(api::communication::TypedMpi< T > &mpi, int root, const T &data, BinaryOp binary_op=BinaryOp())
Definition: communication.h:609
void init(int argc, char **argv)
Definition: fpmas.cpp:6
Definition: communication.h:162
Definition: communication.h:215
static Status IGNORE
Definition: communication.h:220
int item_count
Definition: communication.h:228
Definition: communication.h:585