COMBINATORIAL_BLAS 1.6
 
Reductions.h
#ifndef _REDUCTIONS_H_
#define _REDUCTIONS_H_

#include <mpi.h>
#include <sys/time.h>
#include <iostream>
#include <iomanip>
#include <functional>
#include <algorithm>
#include <vector>
#include <string>
#include <sstream>

#include "CombBLAS/CombBLAS.h"
#include "Glue.h"
#include "CCGrid.h"

namespace combblas {

/***************************************************************************
 * Distribute a local m/sqrt(p) x n/sqrt(p) matrix (represented by a list of tuples) across layers,
 * so that each processor along the third dimension receives m/sqrt(p) x n/(c*sqrt(p)) submatrices.
 * After receiving c submatrices, they are merged to create one m/sqrt(p) x n/(c*sqrt(p)) matrix.
 * Note: the input tuples are deleted (the input is consumed by this function).
 * Inputs:
 *   fibWorld: communicator along the third dimension
 *   localmerged: input array of tuples, which will be distributed across layers
 * Output: array of tuples, after distribution across layers
 *   and local merging on the receiving processor
 *
 ***************************************************************************/
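//
// A worked example of the layout above (illustrative numbers, not taken from the source):
// with sqrt(p) = 2 (a 2 x 2 process grid in each layer) and c = 2 layers, an 8192 x 8192
// matrix leaves each processor with a local 4096 x 4096 tuple list. The all-to-all below
// splits it column-wise into two 4096 x 2048 blocks, and after the exchange each processor
// on the fiber merges the two blocks it received (which cover the same column range) into
// a single 4096 x 2048 result.
//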

template <typename SR, typename IT, typename NT>
SpTuples<IT,NT> * ParallelReduce_Alltoall_threaded(MPI_Comm & fibWorld, SpTuples<IT,NT> * & localmerged)
{
    double comp_begin, comm_begin, comp_time = 0, comm_time = 0;
    int fprocs, fibrank;
    MPI_Comm_size(fibWorld, &fprocs);
    MPI_Comm_rank(fibWorld, &fibrank);
    IT mdim = localmerged->getnrow();
    IT ndim = localmerged->getncol();
    if(fprocs == 1)
    {
        return localmerged;
    }

    // ------------ find splitters to distribute across layers -----------
    comp_begin = MPI_Wtime();
    std::vector<int> send_sizes(fprocs);
    std::vector<int> recv_sizes(fprocs);
    std::vector<int> recv_offsets(fprocs);
    std::vector<int> send_offsets = findColSplitters<int>(localmerged, fprocs);
    for(int i = 0; i < fprocs; i++)
    {
        send_sizes[i] = send_offsets[i+1] - send_offsets[i];
    }
    comp_time += (MPI_Wtime() - comp_begin);

    // ------------ Communicate counts -----------
    comm_begin = MPI_Wtime();
    MPI_Alltoall( send_sizes.data(), 1, MPI_INT, recv_sizes.data(), 1, MPI_INT, fibWorld);
    comm_time += (MPI_Wtime() - comm_begin);

    // a contiguous datatype covering the raw bytes of one (row, column, value) tuple
    MPI_Datatype MPI_triple;
    MPI_Type_contiguous(sizeof(std::tuple<IT,IT,NT>), MPI_CHAR, &MPI_triple);
    MPI_Type_commit(&MPI_triple);

    // ------------ Allocate memory to receive data -----------
    comp_begin = MPI_Wtime();
    int recv_count = 0;
    for( int i = 0; i < fprocs; i++ )
    {
        recv_count += recv_sizes[i];
    }
    std::tuple<IT,IT,NT> * recvbuf = static_cast<std::tuple<IT,IT,NT>*> (::operator new (sizeof(std::tuple<IT,IT,NT>[recv_count])));

    recv_offsets[0] = 0;
    for( int i = 1; i < fprocs; i++ )
    {
        recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1];
    }
    comp_time += (MPI_Wtime() - comp_begin);

    // ------------ Communicate split tuples -----------
    comm_begin = MPI_Wtime();
    MPI_Alltoallv( localmerged->tuples, send_sizes.data(), send_offsets.data(), MPI_triple, recvbuf, recv_sizes.data(), recv_offsets.data(), MPI_triple, fibWorld); // WARNING: is this big enough?
    comm_time += (MPI_Wtime() - comm_begin);
    MPI_Type_free(&MPI_triple); // the committed datatype is no longer needed after the exchange

    // -------- update column indices of split tuples ----------
    comp_begin = MPI_Wtime();
    IT ndimSplit = ndim / fprocs;
    IT coloffset = fibrank * ndimSplit;   // offset is based on the uniform split width
    if(fibrank == (fprocs-1))
        ndimSplit = ndim - ndimSplit * fibrank;   // the last layer also keeps the remainder columns
#pragma omp parallel for
    for(int k = 0; k < recv_count; k++)
    {
        std::get<1>(recvbuf[k]) = std::get<1>(recvbuf[k]) - coloffset;
    }


    // -------- create vector of SpTuples for MultiwayMerge ----------
    std::vector< SpTuples<IT,NT>* > lists;
    for(int i = 0; i < fprocs; ++i)
    {
        SpTuples<IT,NT>* spTuples = new SpTuples<IT,NT> (recv_sizes[i], mdim, ndimSplit, &recvbuf[recv_offsets[i]], true); // If needed pass an empty object of proper dimension
        lists.push_back(spTuples);
    }

    // -------- merge received tuples ----------
    SpTuples<IT,NT> * globalmerged = MultiwayMerge<SR>(lists, mdim, ndimSplit, false);

    comp_time += (MPI_Wtime() - comp_begin);
    comp_reduce_layer += comp_time;
    comm_reduce += comm_time;

    ::operator delete(recvbuf);
    delete localmerged; // not sure if we can call ::operator delete here

    return globalmerged;
}


template <typename NT, typename IT>
SpDCCols<IT,NT> * ReduceAll_threaded(std::vector< SpTuples<IT,NT>* > & unreducedC, CCGrid & CMG)
{
    IT mdim = unreducedC[0]->getnrow();
    IT ndim = unreducedC[0]->getncol();

    // ------ merge the lists of tuples from the n/sqrt(p) stages of SUMMA -------
    double loc_beg1 = MPI_Wtime();
    //SpTuples<IT, NT>* localmerged = multiwayMerge(unreducedC, true);
    SpTuples<IT,NT>* localmerged = MultiwayMerge<PTDD>(unreducedC, mdim, ndim, true);
    comp_reduce += (MPI_Wtime() - loc_beg1);

    // scatter the locally merged tuples across layers and merge again
    SpTuples<IT,NT> * mergedSpTuples = ParallelReduce_Alltoall_threaded<PTDD>(CMG.fiberWorld, localmerged);

    loc_beg1 = MPI_Wtime();
    // TODO: this is not a good constructor. Change it back to the SpTuples-based constructor
    SpDCCols<IT,NT> * reducedC = new SpDCCols<IT,NT>(mergedSpTuples->getnrow(), mergedSpTuples->getncol(), mergedSpTuples->getnnz(), mergedSpTuples->tuples, false);
    comp_result += (MPI_Wtime() - loc_beg1);
    delete mergedSpTuples; // too expensive
    return reducedC;
}

} // namespace combblas

#endif
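
The sketch below shows how the two routines in this header might be driven from one layer of the 3D SpGEMM pipeline. It is a minimal, hypothetical usage example rather than CombBLAS code: the ReduceStages wrapper and the assumption that unreducedC already holds one tuple list per SUMMA stage are illustrative; ReduceAll_threaded, ParallelReduce_Alltoall_threaded, and the CCGrid/SpTuples/SpDCCols types come from the library.

// Hypothetical usage sketch (not part of Reductions.h).
#include <vector>
#include "CombBLAS/CombBLAS.h"
#include "CCGrid.h"
#include "Reductions.h"

template <typename IT, typename NT>
combblas::SpDCCols<IT,NT> * ReduceStages(std::vector< combblas::SpTuples<IT,NT>* > & unreducedC,
                                         combblas::CCGrid & CMG)
{
    // unreducedC is assumed to hold one tuple list per SUMMA stage, produced earlier by
    // this layer's local multiplications. ReduceAll_threaded merges them locally, scatters
    // the merged tuples across the c layers via ParallelReduce_Alltoall_threaded, and
    // returns this processor's final piece of C as an SpDCCols matrix. The intermediate
    // merged list is consumed (deleted) along the way, as documented above.
    return combblas::ReduceAll_threaded(unreducedC, CMG);
}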