Cyclops Tensor Framework
parallel arithmetic on multidimensional arrays
dgtog_redist_ror.h
//to be included with proper ifdefs and inside a namespace

using namespace CTF_int;

#ifdef PUT_NOTIFY
typedef foMPI_Request CTF_Request;
#define MPI_Waitany(...) foMPI_Waitany(__VA_ARGS__)
#else
typedef MPI_Request CTF_Request;
#endif

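/*
 * isendrecv recursively walks the replication phase of each tensor dimension,
 * accumulating bucket and processor offsets; the <0> base case below posts one
 * nonblocking receive (dir != 0) or send (dir == 0) per bucket, or, when
 * PUT_NOTIFY is defined, initializes a notified-put request instead.
 */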
template <int idim>
void isendrecv(int * const *    pe_offset,
               int * const *    bucket_offset,
               int const *      rep_phase,
               int64_t const *  counts,
               int64_t const *  displs,
               CTF_Request *    reqs,
#ifdef PUT_NOTIFY
               foMPI_Win &      cm,
#else
               MPI_Comm         cm,
#endif
               char *           buffer,
               algstrct const * sr,
               int              bucket_off,
               int              pe_off,
               int              dir){
  for (int r=0; r<rep_phase[idim]; r++){
    int rec_bucket_off = bucket_off+bucket_offset[idim][r];
    int rec_pe_off     = pe_off+pe_offset[idim][r];
    isendrecv<idim-1>(pe_offset, bucket_offset, rep_phase, counts, displs, reqs, cm, buffer, sr, rec_bucket_off, rec_pe_off, dir);
  }
}

template <>
void isendrecv<0>
              (int * const *    pe_offset,
               int * const *    bucket_offset,
               int const *      rep_phase,
               int64_t const *  counts,
               int64_t const *  displs,
               CTF_Request *    reqs,
#ifdef PUT_NOTIFY
               foMPI_Win &      cm,
#else
               MPI_Comm         cm,
#endif
               char *           buffer,
               algstrct const * sr,
               int              bucket_off,
               int              pe_off,
               int              dir){
  for (int r=0; r<rep_phase[0]; r++){
    int bucket = bucket_off+r;
    int pe     = pe_off+pe_offset[0][r];
#ifdef PUT_NOTIFY
    ASSERT(dir);
    foMPI_Notify_init(cm, pe, MTAG, 1, reqs+bucket);
    foMPI_Start(reqs+bucket);
#else
    if (dir)
      MPI_Irecv(buffer+displs[bucket]*sr->el_size, counts[bucket], sr->mdtype(), pe, MTAG, cm, reqs+bucket);
    else
      MPI_Isend(buffer+displs[bucket]*sr->el_size, counts[bucket], sr->mdtype(), pe, MTAG, cm, reqs+bucket);
#endif
  }
}

#ifdef ROR
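/*
 * redist_bucket_ror recursively loops over the local virtual indices of
 * dimension idim that belong to a fixed replication index (rep_idx),
 * accumulating data offsets; the <0> specialization dispatches to
 * redist_bucket or redist_bucket_r0 to move elements between the local data
 * array and the per-destination buckets.
 */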
template <int idim>
void redist_bucket_ror(int * const *        bucket_offset,
                       int64_t * const *    data_offset,
                       int * const *        ivmax_pre,
                       int const *          rep_phase,
                       int const *          rep_idx,
                       int                  virt_dim0,
                       bool                 data_to_buckets,
                       char * __restrict__  data,
                       char ** __restrict__ buckets,
                       int64_t *            counts,
                       algstrct const *     sr,
                       int64_t              data_off=0,
                       int                  bucket_off=0,
                       int                  prev_idx=0){
  int ivmax = ivmax_pre[idim][prev_idx];
  for (int iv=rep_idx[idim]; iv<=ivmax; iv+=rep_phase[idim]){
    int64_t rec_data_off = data_off + data_offset[idim][iv];
    redist_bucket_ror<idim-1>(bucket_offset, data_offset, ivmax_pre, rep_phase, rep_idx, virt_dim0, data_to_buckets, data, buckets, counts, sr, rec_data_off, bucket_off, iv);
  }
}

template <>
void redist_bucket_ror<0>
                      (int * const *        bucket_offset,
                       int64_t * const *    data_offset,
                       int * const *        ivmax_pre,
                       int const *          rep_phase,
                       int const *          rep_idx,
                       int                  virt_dim0,
                       bool                 data_to_buckets,
                       char * __restrict__  data,
                       char ** __restrict__ buckets,
                       int64_t *            counts,
                       algstrct const *     sr,
                       int64_t              data_off,
                       int                  bucket_off,
                       int                  prev_idx){
  if (rep_idx[0] == -1)
    redist_bucket<0>(bucket_offset, data_offset, ivmax_pre, rep_phase[0], virt_dim0, data_to_buckets, data, buckets, counts, sr, data_off, bucket_off, prev_idx);
  else
    CTF_int::redist_bucket_r0(bucket_offset, data_offset, ivmax_pre, rep_phase[0], rep_idx[0], virt_dim0, data_to_buckets, data, buckets, counts, sr, data_off, bucket_off, prev_idx);
}

#ifdef PUTREDIST
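/*
 * put_buckets recursively accumulates processor and bucket offsets over the
 * higher dimensions; the <0> base case issues one one-sided MPI_Put (or
 * foMPI_Put_notify when PUT_NOTIFY is defined) per bucket into the target
 * window at the precomputed displacement.
 */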
template <int idim>
void put_buckets(int const *                 rep_phase,
                 int * const *               pe_offset,
                 int * const *               bucket_offset,
                 char * const * __restrict__ buckets,
                 int64_t const *             counts,
                 algstrct const *            sr,
                 int64_t const *             put_displs,
                 CTF_Win &                   win,
                 int                         bucket_off,
                 int                         pe_off){
  for (int r=0; r<rep_phase[idim]; r++){
    int rec_bucket_off = bucket_off+bucket_offset[idim][r];
    int rec_pe_off     = pe_off+pe_offset[idim][r];
    put_buckets<idim-1>(rep_phase, pe_offset, bucket_offset, buckets, counts, sr, put_displs, win, rec_bucket_off, rec_pe_off);
  }
}

template <>
void put_buckets<0>(
                 int const *                 rep_phase,
                 int * const *               pe_offset,
                 int * const *               bucket_offset,
                 char * const * __restrict__ buckets,
                 int64_t const *             counts,
                 algstrct const *            sr,
                 int64_t const *             put_displs,
                 CTF_Win &                   win,
                 int                         bucket_off,
                 int                         pe_off){
  for (int r=0; r<rep_phase[0]; r++){
    int rec_pe_off     = pe_off + pe_offset[0][r];
    int rec_bucket_off = bucket_off + bucket_offset[0][r];
#ifdef PUT_NOTIFY
    foMPI_Put_notify(buckets[rec_bucket_off], counts[rec_bucket_off], sr->mdtype(), rec_pe_off, put_displs[rec_bucket_off], counts[rec_bucket_off], sr->mdtype(), win, MTAG);
#else
    MPI_Put(buckets[rec_bucket_off], counts[rec_bucket_off], sr->mdtype(), rec_pe_off, put_displs[rec_bucket_off], counts[rec_bucket_off], sr->mdtype(), win);
#endif
  }
}
#endif

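/*
 * redist_bucket_isr drives one layer of the bucketing (data_to_buckets) or
 * de-bucketing pass. The recursion over the higher dimensions is either
 * unrolled into a flat OpenMP-parallel loop (USE_OMP) or descended one
 * dimension at a time; the <0> base case calls redist_bucket_ror and then,
 * depending on the configuration macros, posts nonblocking sends (IREDIST)
 * or one-sided puts (PUTREDIST) of the filled buckets.
 */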
template <int idim>
void redist_bucket_isr(int                  order,
                       int * const *        pe_offset,
                       int * const *        bucket_offset,
                       int64_t * const *    data_offset,
                       int * const *        ivmax_pre,
                       int const *          rep_phase,
                       int *                rep_idx,
                       int                  virt_dim0,
#ifdef IREDIST
                       CTF_Request *        rep_reqs,
                       MPI_Comm             cm,
#endif
#ifdef PUTREDIST
                       int64_t const *      put_displs,
                       CTF_Win &            win,
#endif
                       bool                 data_to_buckets,
                       char * __restrict__  data,
                       char ** __restrict__ buckets,
                       int64_t *            counts,
                       algstrct const *     sr,
                       int                  bucket_off=0,
                       int                  pe_off=0){
#ifdef USE_OMP
  int tothi_rep_phase = 1;
  for (int id=1; id<=idim; id++){
    tothi_rep_phase *= rep_phase[id];
  }
  #pragma omp parallel for
  for (int t=0; t<tothi_rep_phase; t++){
    int rep_idx2[order];
    memcpy(rep_idx2, rep_idx, sizeof(int)*order);
    rep_idx[0] = -1;
    int rec_bucket_off = bucket_off;
    int rec_pe_off     = pe_off;
    int tleft = t;
    for (int id=1; id<=idim; id++){
      int r = tleft%rep_phase[id];
      tleft = tleft / rep_phase[id];
      rep_idx2[id] = r;
      rec_bucket_off += bucket_offset[id][r];
      rec_pe_off     += pe_offset[id][r];
    }
    redist_bucket_isr<0>(order, pe_offset, bucket_offset, data_offset, ivmax_pre, rep_phase, rep_idx2, virt_dim0,
#ifdef IREDIST
                         rep_reqs, cm,
#endif
#ifdef PUTREDIST
                         put_displs, win,
#endif
                         data_to_buckets, data, buckets, counts, sr, rec_bucket_off, rec_pe_off);

  }
#else
  if (rep_phase[idim] == 1){
    int rec_bucket_off = bucket_off + bucket_offset[idim][0];
    int rec_pe_off     = pe_off + pe_offset[idim][0];
    redist_bucket_isr<idim-1>(order, pe_offset, bucket_offset, data_offset, ivmax_pre, rep_phase, rep_idx, virt_dim0,
#ifdef IREDIST
                              rep_reqs, cm,
#endif
#ifdef PUTREDIST
                              put_displs, win,
#endif
                              data_to_buckets, data, buckets, counts, sr, rec_bucket_off, rec_pe_off);
  } else {
    for (int r=0; r<rep_phase[idim]; r++){
      int rep_idx2[order];
      memcpy(rep_idx2, rep_idx, sizeof(int)*order);
      rep_idx[0] = -1;
      rep_idx2[idim] = r;
      int rec_bucket_off = bucket_off + bucket_offset[idim][r];
      int rec_pe_off     = pe_off + pe_offset[idim][r];
      redist_bucket_isr<idim-1>(order, pe_offset, bucket_offset, data_offset, ivmax_pre, rep_phase, rep_idx2, virt_dim0,
#ifdef IREDIST
                                rep_reqs, cm,
#endif
#ifdef PUTREDIST
                                put_displs, win,
#endif
                                data_to_buckets, data, buckets, counts, sr, rec_bucket_off, rec_pe_off);
    }
  }
#endif
}

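/*
 * Base case: for IREDIST de-bucketing, wait for the pending receives of this
 * bucket range (unless WAITANY drains them elsewhere), then move the data via
 * redist_bucket_ror; when bucketing, issue the nonblocking sends (IREDIST)
 * and/or one-sided puts (PUTREDIST) of the freshly filled buckets.
 */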
template <>
void redist_bucket_isr<0>
                      (int                  order,
                       int * const *        pe_offset,
                       int * const *        bucket_offset,
                       int64_t * const *    data_offset,
                       int * const *        ivmax_pre,
                       int const *          rep_phase,
                       int *                rep_idx,
                       int                  virt_dim0,
#ifdef IREDIST
                       CTF_Request *        rep_reqs,
                       MPI_Comm             cm,
#endif
#ifdef PUTREDIST
                       int64_t const *      put_displs,
                       CTF_Win &            win,
#endif
                       bool                 data_to_buckets,
                       char * __restrict__  data,
                       char ** __restrict__ buckets,
                       int64_t *            counts,
                       algstrct const *     sr,
                       int                  bucket_off,
                       int                  pe_off){
#ifndef WAITANY
#ifdef IREDIST
  if (!data_to_buckets){
    MPI_Waitall(rep_phase[0], rep_reqs+bucket_off, MPI_STATUSES_IGNORE);
  }
#endif
#endif
  SWITCH_ORD_CALL(redist_bucket_ror, order-1, bucket_offset, data_offset, ivmax_pre, rep_phase, rep_idx, virt_dim0, data_to_buckets, data, buckets, counts, sr, 0, bucket_off, 0)
  if (data_to_buckets){
#ifdef IREDIST
#ifndef PUT_NOTIFY
    for (int r=0; r<rep_phase[0]; r++){
      int bucket = bucket_off + bucket_offset[0][r];
      int pe     = pe_off + pe_offset[0][r];
      MPI_Isend(buckets[bucket], counts[bucket], sr->mdtype(), pe, MTAG, cm, rep_reqs+bucket);
    }
    //encourage progress of the outstanding sends
    if (bucket_off > 0){
      int flag;
      MPI_Testall(bucket_off, rep_reqs, &flag, MPI_STATUSES_IGNORE);
    }
#endif
#endif
#ifdef PUTREDIST
    put_buckets<0>(rep_phase, pe_offset, bucket_offset, buckets, counts, sr, put_displs, win, bucket_off, pe_off);
#endif
  }
}
#endif

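/*
 * dgtog_reshuffle redistributes a tensor from old_dist to new_dist. It
 * precomputes, per dimension, the replication phases (ratios of the LCM of
 * the old and new physical phases), the bucket/processor/data offsets, and
 * per-bucket counts and displacements, then bucketizes the local data,
 * exchanges the buckets (nonblocking point-to-point, one-sided puts, or a
 * plain isend/irecv + waitall exchange, depending on the configuration
 * macros), and finally de-bucketizes the received data into the new layout.
 */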
void dgtog_reshuffle(int const *          sym,
                     int const *          edge_len,
                     distribution const & old_dist,
                     distribution const & new_dist,
                     char **              ptr_tsr_data,
                     char **              ptr_tsr_new_data,
                     algstrct const *     sr,
                     CommData             ord_glb_comm){
  int order = old_dist.order;

  char * tsr_data     = *ptr_tsr_data;
  char * tsr_new_data = *ptr_tsr_new_data;

  if (order == 0){
    tsr_new_data = sr->alloc(1);
    if (ord_glb_comm.rank == 0){
      sr->copy(tsr_new_data, tsr_data);
    } else {
      sr->copy(tsr_new_data, sr->addid());
    }
    *ptr_tsr_new_data = tsr_new_data;
    sr->dealloc(tsr_data);
    return;
  }
#ifdef TUNE
  MPI_Barrier(ord_glb_comm.cm);
#endif
  double st_time = MPI_Wtime();

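  // Compute strides (LDAs) over the virtual blocking of both distributions,
  // determine whether this rank is in the root processor layer of each
  // distribution (idx_lyr == 0), and derive the number and size of the local
  // virtual blocks as well as the per-dimension physical and virtual edge
  // lengths.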
  int * old_virt_lda, * new_virt_lda;
  alloc_ptr(order*sizeof(int), (void**)&old_virt_lda);
  alloc_ptr(order*sizeof(int), (void**)&new_virt_lda);

  new_virt_lda[0] = 1;
  old_virt_lda[0] = 1;

  int old_idx_lyr = ord_glb_comm.rank - old_dist.perank[0]*old_dist.pe_lda[0];
  int new_idx_lyr = ord_glb_comm.rank - new_dist.perank[0]*new_dist.pe_lda[0];
  int new_nvirt=new_dist.virt_phase[0], old_nvirt=old_dist.virt_phase[0];
  for (int i=1; i<order; i++) {
    new_virt_lda[i] = new_nvirt;
    old_virt_lda[i] = old_nvirt;
    old_nvirt = old_nvirt*old_dist.virt_phase[i];
    new_nvirt = new_nvirt*new_dist.virt_phase[i];
    old_idx_lyr -= old_dist.perank[i]*old_dist.pe_lda[i];
    new_idx_lyr -= new_dist.perank[i]*new_dist.pe_lda[i];
  }
  int64_t old_virt_nelem = old_dist.size/old_nvirt;
  int64_t new_virt_nelem = new_dist.size/new_nvirt;

  int * old_phys_edge_len; alloc_ptr(sizeof(int)*order, (void**)&old_phys_edge_len);
  for (int dim = 0;dim < order;dim++)
    old_phys_edge_len[dim] = old_dist.pad_edge_len[dim]/old_dist.phys_phase[dim];

  int * new_phys_edge_len; alloc_ptr(sizeof(int)*order, (void**)&new_phys_edge_len);
  for (int dim = 0;dim < order;dim++)
    new_phys_edge_len[dim] = new_dist.pad_edge_len[dim]/new_dist.phys_phase[dim];

  int * old_virt_edge_len; alloc_ptr(sizeof(int)*order, (void**)&old_virt_edge_len);
  for (int dim = 0;dim < order;dim++)
    old_virt_edge_len[dim] = old_phys_edge_len[dim]/old_dist.virt_phase[dim];

  int * new_virt_edge_len; alloc_ptr(sizeof(int)*order, (void**)&new_virt_edge_len);
  for (int dim = 0;dim < order;dim++)
    new_virt_edge_len[dim] = new_phys_edge_len[dim]/new_dist.virt_phase[dim];

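  // Along each dimension, the replication phase is the LCM of the old and new
  // physical phases divided by the local physical phase: it counts how many
  // distinct remote processor slots a local slice is split across (send side)
  // or gathered from (receive side); the products give the bucket counts.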
  int nold_rep = 1;
  int * old_rep_phase; alloc_ptr(sizeof(int)*order, (void**)&old_rep_phase);
  for (int i=0; i<order; i++){
    old_rep_phase[i] = lcm(old_dist.phys_phase[i], new_dist.phys_phase[i])/old_dist.phys_phase[i];
    nold_rep *= old_rep_phase[i];
  }

  int nnew_rep = 1;
  int * new_rep_phase; alloc_ptr(sizeof(int)*order, (void**)&new_rep_phase);
  for (int i=0; i<order; i++){
    new_rep_phase[i] = lcm(new_dist.phys_phase[i], old_dist.phys_phase[i])/new_dist.phys_phase[i];
    nnew_rep *= new_rep_phase[i];
  }

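  // Count how many elements go into each of the nold_rep send buckets and
  // arrive from each of the nnew_rep receive buckets, then lay the receive
  // buckets out contiguously via a prefix sum over the counts.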
  int64_t * send_counts = (int64_t*)alloc(sizeof(int64_t)*nold_rep);
  std::fill(send_counts, send_counts+nold_rep, 0);
  calc_drv_displs(sym, edge_len, old_dist, new_dist, send_counts, old_idx_lyr);

  int64_t * recv_counts = (int64_t*)alloc(sizeof(int64_t)*nnew_rep);
  std::fill(recv_counts, recv_counts+nnew_rep, 0);
  calc_drv_displs(sym, edge_len, new_dist, old_dist, recv_counts, new_idx_lyr);
  int64_t * recv_displs = (int64_t*)alloc(sizeof(int64_t)*nnew_rep);

#ifdef IREDIST
  CTF_Request * recv_reqs = (CTF_Request*)alloc(sizeof(CTF_Request)*nnew_rep);
  CTF_Request * send_reqs = (CTF_Request*)alloc(sizeof(CTF_Request)*nold_rep);
#endif

  for (int i=0; i<nnew_rep; i++){
    if (i==0)
      recv_displs[0] = 0;
    else
      recv_displs[i] = recv_displs[i-1] + recv_counts[i-1];
  }

  int ** recv_bucket_offset; alloc_ptr(sizeof(int*)*order, (void**)&recv_bucket_offset);
  int ** recv_pe_offset;     alloc_ptr(sizeof(int*)*order, (void**)&recv_pe_offset);
  int ** recv_ivmax_pre;     alloc_ptr(sizeof(int*)*order, (void**)&recv_ivmax_pre);
  int64_t ** recv_data_offset; alloc_ptr(sizeof(int64_t*)*order, (void**)&recv_data_offset);
  precompute_offsets(new_dist, old_dist, sym, edge_len, new_rep_phase, new_phys_edge_len, new_virt_edge_len, new_dist.virt_phase, new_virt_lda, new_virt_nelem, recv_pe_offset, recv_bucket_offset, recv_data_offset, recv_ivmax_pre);

  int ** send_bucket_offset; alloc_ptr(sizeof(int*)*order, (void**)&send_bucket_offset);
  int ** send_pe_offset;     alloc_ptr(sizeof(int*)*order, (void**)&send_pe_offset);
  int ** send_ivmax_pre;     alloc_ptr(sizeof(int*)*order, (void**)&send_ivmax_pre);
  int64_t ** send_data_offset; alloc_ptr(sizeof(int64_t*)*order, (void**)&send_data_offset);

  precompute_offsets(old_dist, new_dist, sym, edge_len, old_rep_phase, old_phys_edge_len, old_virt_edge_len, old_dist.virt_phase, old_virt_lda, old_virt_nelem, send_pe_offset, send_bucket_offset, send_data_offset, send_ivmax_pre);

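  // Set up the exchange according to the configured strategy: with neither
  // IREDIST nor PUTREDIST, compute send displacements for a plain
  // isend/irecv + waitall exchange; with PUTREDIST, exchange target
  // displacements via MPI_Alltoall and create an RMA window over the receive
  // buffer so buckets can be deposited directly with one-sided puts; with
  // IREDIST, post the nonblocking receives up front.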
#if !defined(IREDIST) && !defined(PUTREDIST)
  int64_t * send_displs = (int64_t*)alloc(sizeof(int64_t)*nold_rep);
  send_displs[0] = 0;
  for (int i=1; i<nold_rep; i++){
    send_displs[i] = send_displs[i-1] + send_counts[i-1];
  }
#elif defined(PUTREDIST)
  int64_t * all_recv_displs = (int64_t*)alloc(sizeof(int64_t)*ord_glb_comm.np);
  SWITCH_ORD_CALL(CTF_int::calc_cnt_from_rep_cnt, order-1, new_rep_phase, recv_pe_offset, recv_bucket_offset, recv_displs, all_recv_displs, 0, 0, 1);

  int64_t * all_put_displs = (int64_t*)alloc(sizeof(int64_t)*ord_glb_comm.np);
  MPI_Alltoall(all_recv_displs, 1, MPI_INT64_T, all_put_displs, 1, MPI_INT64_T, ord_glb_comm.cm);
  CTF_int::cdealloc(all_recv_displs);

  int64_t * put_displs = (int64_t*)alloc(sizeof(int64_t)*nold_rep);
  SWITCH_ORD_CALL(CTF_int::calc_cnt_from_rep_cnt, order-1, old_rep_phase, send_pe_offset, send_bucket_offset, all_put_displs, put_displs, 0, 0, 0);

  CTF_int::cdealloc(all_put_displs);

  char * recv_buffer;
  //mst_alloc_ptr(new_dist.size*sr->el_size, (void**)&recv_buffer);
  recv_buffer = sr->alloc(new_dist.size);

  CTF_Win win;
  int suc = MPI_Win_create(recv_buffer, new_dist.size*sr->el_size, sr->el_size, MPI_INFO_NULL, ord_glb_comm.cm, &win);
  ASSERT(suc == MPI_SUCCESS);
#ifndef USE_FOMPI
  MPI_Win_fence(0, win);
#endif
#endif

#ifdef IREDIST
#ifdef PUTREDIST
  if (new_idx_lyr == 0)
    SWITCH_ORD_CALL(isendrecv, order-1, recv_pe_offset, recv_bucket_offset, new_rep_phase, recv_counts, recv_displs, recv_reqs, win, recv_buffer, sr, 0, 0, 1);
#else
  char * recv_buffer;
  recv_buffer = sr->alloc(new_dist.size);
  if (new_idx_lyr == 0)
    SWITCH_ORD_CALL(isendrecv, order-1, recv_pe_offset, recv_bucket_offset, new_rep_phase, recv_counts, recv_displs, recv_reqs, ord_glb_comm.cm, recv_buffer, sr, 0, 0, 1);
#endif
#endif

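  // Send-side bucketization: if this rank holds source data, permute the
  // local tensor into per-destination buckets (and, under ROR, overlap this
  // with posting the corresponding sends or puts as each bucket completes).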
  if (old_idx_lyr == 0){
    char * aux_buf = sr->alloc(old_dist.size);
    char * tmp = aux_buf;
    aux_buf  = tsr_data;
    tsr_data = tmp;
    char ** buckets = (char**)alloc(sizeof(char**)*nold_rep);

    buckets[0] = tsr_data;
    for (int i=1; i<nold_rep; i++){
      buckets[i] = buckets[i-1] + sr->el_size*send_counts[i-1];
    }
#if DEBUG >= 1
    int64_t save_counts[nold_rep];
    memcpy(save_counts, send_counts, sizeof(int64_t)*nold_rep);
#endif
    std::fill(send_counts, send_counts+nold_rep, 0);
#ifdef ROR
    int * old_rep_idx; alloc_ptr(sizeof(int)*order, (void**)&old_rep_idx);
    memset(old_rep_idx, 0, sizeof(int)*order);
    old_rep_idx[0]=-1;
    SWITCH_ORD_CALL(redist_bucket_isr, order-1, order, send_pe_offset, send_bucket_offset, send_data_offset,
                    send_ivmax_pre, old_rep_phase, old_rep_idx, old_dist.virt_phase[0],
#ifdef IREDIST
                    send_reqs, ord_glb_comm.cm,
#endif
#ifdef PUTREDIST
                    put_displs, win,
#endif
                    1, aux_buf, buckets, send_counts, sr);
    CTF_int::cdealloc(old_rep_idx);
#else
    SWITCH_ORD_CALL(redist_bucket, order-1, send_bucket_offset, send_data_offset,
                    send_ivmax_pre, old_rep_phase[0], old_dist.virt_phase[0], 1, aux_buf, buckets, send_counts, sr);
#endif
    CTF_int::cdealloc(buckets);

#if DEBUG >= 1
    bool pass = true;
    for (int i=0; i<nold_rep; i++){
      if (save_counts[i] != send_counts[i]) pass = false;
    }
    if (!pass){
      for (int i=0; i<nold_rep; i++){
        printf("[%d] send_counts[%d] = %ld, redist_bucket counts[%d] = %ld\n", ord_glb_comm.rank, i, save_counts[i], i, send_counts[i]);
      }
    }
    ASSERT(pass);
#endif
    sr->dealloc(aux_buf);
  }
#ifndef WAITANY
#ifndef IREDIST
#ifndef PUTREDIST
  char * recv_buffer = sr->alloc(new_dist.size);

  /* Communicate data */
  TAU_FSTART(COMM_RESHUFFLE);

  CTF_Request * reqs = (CTF_Request*)alloc(sizeof(CTF_Request)*(nnew_rep+nold_rep));
  int nrecv = 0;
  if (new_idx_lyr == 0){
    nrecv = nnew_rep;
    SWITCH_ORD_CALL(isendrecv, order-1, recv_pe_offset, recv_bucket_offset, new_rep_phase, recv_counts, recv_displs, reqs, ord_glb_comm.cm, recv_buffer, sr, 0, 0, 1);
  }
  int nsent = 0;
  if (old_idx_lyr == 0){
    nsent = nold_rep;
    SWITCH_ORD_CALL(isendrecv, order-1, send_pe_offset, send_bucket_offset, old_rep_phase, send_counts, send_displs, reqs+nrecv, ord_glb_comm.cm, tsr_data, sr, 0, 0, 0);
  }
  if (nrecv+nsent > 0){
//    MPI_Status * stat = (MPI_Status*)alloc(sizeof(MPI_Status)*(nrecv+nsent));
    MPI_Waitall(nrecv+nsent, reqs, MPI_STATUSES_IGNORE);
  }
  cdealloc(reqs);
  //ord_glb_comm.all_to_allv(tsr_data, send_counts, send_displs, sr->el_size,
  //                         recv_buffer, recv_counts, recv_displs);
  TAU_FSTOP(COMM_RESHUFFLE);
  CTF_int::cdealloc(send_displs);
#else
  CTF_int::cdealloc(put_displs);
  TAU_FSTART(redist_fence);
  MPI_Win_fence(0, win);
  TAU_FSTOP(redist_fence);
  MPI_Win_free(&win);
#endif
  sr->dealloc(tsr_data);
#endif
#endif
  CTF_int::cdealloc(send_counts);

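  // Receive-side de-bucketization: if this rank holds target data, scatter
  // the received buckets into the new layout (with WAITANY, buckets are
  // processed as their receives complete); ranks outside the target layer
  // simply zero-initialize their output.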
  if (new_idx_lyr == 0){
    char * aux_buf = sr->alloc(new_dist.size);
    sr->init(new_dist.size, aux_buf);

    char ** buckets = (char**)alloc(sizeof(char**)*nnew_rep);

    buckets[0] = recv_buffer;
    //printf("[%d] size of %dth bucket is %ld\n", ord_glb_comm.rank, 0, send_counts[0]);
    for (int i=1; i<nnew_rep; i++){
      buckets[i] = buckets[i-1] + sr->el_size*recv_counts[i-1];
      //printf("[%d] size of %dth bucket is %ld\n", ord_glb_comm.rank, i, send_counts[i]);
    }

#if DEBUG >= 1
    int64_t save_counts[nnew_rep];
    memcpy(save_counts, recv_counts, sizeof(int64_t)*nnew_rep);
#endif
    std::fill(recv_counts, recv_counts+nnew_rep, 0);

    TAU_FSTART(redist_debucket);

#ifdef WAITANY
    for (int nb=0; nb<nnew_rep; nb++){
      //int source;
/*#ifdef PUT_NOTIFY
      foMPI_Request req;
      foMPI_Wait(&req, MPY_ANY_TAG);
      source = req.source;
#else*/
      MPI_Status stat;
      int bucket_off;
      MPI_Waitany(nnew_rep, recv_reqs, &bucket_off, &stat);
//      foMPI_Start(recv_reqs+bucket_off);
//      ASSERT(ret == MPI_SUCCESS);
      ASSERT(bucket_off != MPI_UNDEFINED);
      ASSERT(bucket_off >= 0 && bucket_off < nnew_rep);
      ASSERT(recv_counts[bucket_off] == 0);
      //source = stat.source;
//#endif
      int rep_idx[order];
      int iboff=bucket_off;
      for (int i=0; i<order; i++){
/*      //FIXME: lame
        int pe_offi = ((source/MAX(1,old_dist.pe_lda[i]))%old_dist.phys_phase[i])*MAX(1,old_dist.pe_lda[i]);
        int pidx = -1;
        for (int j=0; j<new_rep_phase[i]; j++){
          printf("source = %d i = %d pe_offi=%d recv_pe_offset[i][%d]=%d\n",source,i,pe_offi,j,recv_pe_offset[i][j]);
          if (pe_offi == recv_pe_offset[i][j]) pidx=j;
        }
        ASSERT(pidx!=-1);
        rep_idx[i] = pidx;
        bucket_off += recv_bucket_offset[i][pidx];*/
        rep_idx[i] = iboff%new_rep_phase[i];
        iboff = iboff/new_rep_phase[i];
      }

      SWITCH_ORD_CALL(redist_bucket_ror, order-1, recv_bucket_offset, recv_data_offset, recv_ivmax_pre, new_rep_phase, rep_idx, new_dist.virt_phase[0], 0, aux_buf, buckets, recv_counts, sr, 0, bucket_off, 0)
      //printf("recv_counts[%d]=%d, saved_counts[%d]=%d\n",bucket_off,recv_counts[bucket_off],bucket_off,save_counts[bucket_off]);

    }
#ifdef PUT_NOTIFY
    for (int nb=0; nb<nnew_rep; nb++){
      foMPI_Request_free(recv_reqs+nb);
    }
#endif
#else
#ifdef ROR
    int * new_rep_idx; alloc_ptr(sizeof(int)*order, (void**)&new_rep_idx);
    memset(new_rep_idx, 0, sizeof(int)*order);
    new_rep_idx[0] = -1;
    SWITCH_ORD_CALL(redist_bucket_isr, order-1, order, recv_pe_offset, recv_bucket_offset, recv_data_offset,
                    recv_ivmax_pre, new_rep_phase, new_rep_idx, new_dist.virt_phase[0],
#ifdef IREDIST
                    recv_reqs, ord_glb_comm.cm,
#endif
#ifdef PUTREDIST
                    NULL, win,
#endif
                    0, aux_buf, buckets, recv_counts, sr);
    CTF_int::cdealloc(new_rep_idx);
#else
    SWITCH_ORD_CALL(redist_bucket, order-1,
                    recv_bucket_offset, recv_data_offset, recv_ivmax_pre,
                    new_rep_phase[0], new_dist.virt_phase[0], 0, aux_buf, buckets, recv_counts, sr);
#endif
#endif
    TAU_FSTOP(redist_debucket);
    CTF_int::cdealloc(buckets);
#if DEBUG >= 1
    bool pass = true;
    for (int i=0; i<nnew_rep; i++){
      if (save_counts[i] != recv_counts[i]) pass = false;
    }
    if (!pass){
      for (int i=0; i<nnew_rep; i++){
        printf("[%d] recv_counts[%d] = %ld, redist_bucket counts[%d] = %ld\n", ord_glb_comm.rank, i, save_counts[i], i, recv_counts[i]);
      }
    }
    ASSERT(pass);
#endif
    *ptr_tsr_new_data = aux_buf;
    sr->dealloc(recv_buffer);
  } else {
    if (sr->addid() != NULL)
      sr->set(recv_buffer, sr->addid(), new_dist.size);
    *ptr_tsr_new_data = recv_buffer;
  }
  //printf("[%d] reached final barrier %d\n",ord_glb_comm.rank, MTAG);
#ifdef IREDIST

  CTF_int::cdealloc(recv_reqs);
  CTF_int::cdealloc(send_reqs);
#endif
  for (int i=0; i<order; i++){
    CTF_int::cdealloc(recv_pe_offset[i]);
    CTF_int::cdealloc(recv_bucket_offset[i]);
    CTF_int::cdealloc(recv_data_offset[i]);
    CTF_int::cdealloc(recv_ivmax_pre[i]);
  }
  CTF_int::cdealloc(recv_pe_offset);
  CTF_int::cdealloc(recv_bucket_offset);
  CTF_int::cdealloc(recv_data_offset);
  CTF_int::cdealloc(recv_ivmax_pre);

  for (int i=0; i<order; i++){
    CTF_int::cdealloc(send_pe_offset[i]);
    CTF_int::cdealloc(send_bucket_offset[i]);
    CTF_int::cdealloc(send_data_offset[i]);
    CTF_int::cdealloc(send_ivmax_pre[i]);
  }
  CTF_int::cdealloc(send_pe_offset);
  CTF_int::cdealloc(send_bucket_offset);
  CTF_int::cdealloc(send_data_offset);
  CTF_int::cdealloc(send_ivmax_pre);

  CTF_int::cdealloc(old_virt_lda);
  CTF_int::cdealloc(new_virt_lda);
  CTF_int::cdealloc(recv_counts);
  CTF_int::cdealloc(recv_displs);
  CTF_int::cdealloc(old_phys_edge_len);
  CTF_int::cdealloc(new_phys_edge_len);
  CTF_int::cdealloc(old_virt_edge_len);
  CTF_int::cdealloc(new_virt_edge_len);
  CTF_int::cdealloc(old_rep_phase);
  CTF_int::cdealloc(new_rep_phase);
#ifdef IREDIST
#ifdef PUT_NOTIFY
  foMPI_Win_flush_all(win);
  foMPI_Win_free(&win);
#else
  TAU_FSTART(barrier_after_dgtog_reshuffle);
  MPI_Barrier(ord_glb_comm.cm);
  TAU_FSTOP(barrier_after_dgtog_reshuffle);
#endif
  sr->dealloc(tsr_data);
#endif
#ifdef TUNE
  MPI_Barrier(ord_glb_comm.cm);
#endif
  double exe_time = MPI_Wtime()-st_time;
  double tps[] = {exe_time, 1.0, (double)log2(ord_glb_comm.np), (double)std::max(old_dist.size, new_dist.size)*log2(ord_glb_comm.np)*sr->el_size};

  // feed the observed execution time into the redistribution performance model
  dgtog_res_mdl.observe(tps);
}
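/*
 * Illustrative usage sketch (not part of the original header): as the comment
 * at the top notes, this file is meant to be textually #included inside a
 * namespace after the configuration macros have been chosen. The macro names
 * below are the ones tested in this file; the surrounding source file and the
 * particular macro combination are assumptions for illustration only.
 *
 *   #define ROR
 *   #define IREDIST
 *   namespace CTF_int {
 *   #include "dgtog_redist_ror.h"
 *   }
 */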