Cyclops Tensor Framework
parallel arithmetic on multidimensional arrays
btwn_central.cxx
Go to the documentation of this file.
1 
8 #include <float.h>
9 #include "btwn_central.h"
10 
11 using namespace CTF;
12 
13 
14 //overwrite printfs to make it possible to print matrices of mpaths
15 namespace CTF {
16  template <>
17  inline void Set<mpath>::print(char const * a, FILE * fp) const {
18  fprintf(fp,"(w=%d m=%d)",((mpath*)a)[0].w,((mpath*)a)[0].m);
19  }
20  template <>
21  inline void Set<cpath>::print(char const * a, FILE * fp) const {
22  fprintf(fp,"(w=%d m=%f c=%lf)",((cpath*)a)[0].w,((cpath*)a)[0].m,((cpath*)a)[0].c);
23  }
24 }
25 
35 void btwn_cnt_fast(Matrix<int> A, int b, Vector<double> & v, int nbatches=0, bool sp_B=true, bool sp_C=true){
36  assert(sp_B || !sp_C);
37  World dw = *A.wrld;
38  int n = A.nrow;
39 
42 
43 
44  Matrix<mpath> speye(n,n,SP,dw,p);
45  Scalar<mpath> sm(mpath(0,1),dw,p);
46  speye["ii"] = sm[""];
47  Transform<int>([=](int& w){ w = INT_MAX/2; })(A["ii"]);
48  for (int ib=0; ib<n && (nbatches == 0 || ib/b<nbatches); ib+=b){
49  int k = std::min(b, n-ib);
50 
51  //initialize shortest mpath vectors from the next k sources to the corresponding columns of the adjacency matrices and loops with weight 0
52  //Transform<int>([=](int& w){ w = 0; })(A["ii"]);
53  Tensor<int> iA = A.slice(ib*n, (ib+k-1)*n+n-1);
54 
55  //let shortest mpaths vectors be mpaths
56  int atr_C = 0;
57  if (sp_C) atr_C = atr_C | SP;
58  Matrix<mpath> B(n, k, atr_C, dw, p, "B");
59  Matrix<mpath> all_B(n, k, dw, p, "all_B");
60  B["ij"] = Function<int,mpath>([](int w){ return mpath(w, 1); })(iA["ij"]);
61 
63 
64 
65  //compute Bellman Ford
66  int nbl = 0;
67 #ifndef TEST_SUITE
68  double sbl = MPI_Wtime();
69 #endif
70  all_B["ij"] = B["ij"];
71  for (int i=0; i<n; i++, nbl++){
72  Matrix<mpath> C(B);
73  B.set_zero();
74  if (sp_B || sp_C){
75  C.sparsify([](mpath p){ return p.w < INT_MAX/2; });
76 // if (dw.rank == 0) printf("Bellman nnz_tot = %ld\n",C.nnz_tot);
77  if (C.nnz_tot == 0){ nbl--; break; }
78  }
79  CTF::Timer tbl("Bellman");
80  tbl.start();
81  (*Bellman)(A["ik"],C["kj"],B["ij"]);
82  tbl.stop();
83  CTF::Timer tblp("Bellman_post_tform");
84  tblp.start();
85  Transform<mpath,mpath>([](mpath p, mpath & q){ if (p.w<q.w || (p.w==q.w && q.m==0)) q.w = INT_MAX/2; } )(all_B["ij"],B["ij"]);
86  // Transform<mpath,mpath>([](mpath p, mpath & q){ if (p.w <= q.w){ if (p.w < q.w){ q=p; } else if (p.m > 0){ q.m+=p.m; } } })(B["ij"],all_B["ij"]);
87  all_B["ij"] += B["ij"];
88  tblp.stop();
89  if (!sp_B && !sp_C){
90  Scalar<int> num_changed(dw);
91  num_changed[""] += Function<mpath,int>([](mpath p){ return p.w<INT_MAX/2; })(B["ij"]);
92  if (num_changed.get_val() == 0) break;
93  }
94  }
95  delete Bellman;
96  Tensor<mpath> ispeye = speye.slice(ib*n, (ib+k-1)*n+n-1);
97  all_B["ij"] += ispeye["ij"];
98 
99 #ifndef TEST_SUITE
100  double tbl = MPI_Wtime() - sbl;
101 #endif
102 
103  //transfer shortest mpath data to Matrix of cpaths to compute c centrality scores
104  Matrix<cpath> cB(n, k, atr_C, dw, cp, "cB");
105  cB["ij"] = Function<mpath,cpath>([](mpath p){ return cpath(p.w, 1./p.m, 1.); })(all_B["ij"]);
106  Matrix<cpath> all_cB(n, k, dw, cp, "all_cB");
108  //compute centrality scores by propagating them backwards from the furthest nodes (reverse Bellman Ford)
109  int nbr = 0;
110 #ifndef TEST_SUITE
111  double sbr = MPI_Wtime();
112 #endif
113  Transform<mpath,cpath>([](mpath p, cpath & cp){ cp = cpath(p.w, 1./p.m, 0.); })(all_B["ij"],all_cB["ij"]);
114 
115  for (int i=0; i<n; i++, nbr++){
116  Matrix<cpath> C(cB);
117  if (sp_B || sp_C){
118  C.sparsify([](cpath p){ return p.w >= 0 && p.c != 0.0; });
119 // if (dw.rank == 0) printf("Brandes nnz_tot = %ld\n",C.nnz_tot);
120  if (C.nnz_tot == 0){ nbr--; break; }
121  }
122  cB.set_zero();
123  CTF::Timer tbr("Brandes");
124  tbr.start();
125  cB["ij"] += (*Brandes)(A["ki"],C["kj"]);
126  tbr.stop();
127  CTF::Timer tbrp("Brandes_post_tform");
128  tbrp.start();
129  Transform<mpath,cpath>([](mpath p, cpath & cp){ if (p.w == cp.w){ cp = cpath(p.w, 1./p.m, cp.c*p.m); } else { cp = cpath(p.w, 1./p.m, 0.0); } })(all_B["ij"],cB["ij"]);
130  tbrp.stop();
131  CTF::Timer tbra("Brandes_post_add");
132  tbra.start();
133  all_cB["ij"] += cB["ij"];
134  tbra.stop();
135 
136  if (!sp_B && !sp_C){
137  Scalar<int> num_changed = Scalar<int>();
138  num_changed[""] += Function<cpath,int>([](cpath p){ return p.w >= 0 && p.c!=0.0; })(cB["ij"]);
139  if (num_changed.get_val() == 0) break;
140  }
141  }
142  delete Brandes;
143  Transform<mpath,cpath>([](mpath p, cpath & cp){ if (p.w == cp.w){ cp = cpath(p.w, 1./p.m, cp.c); } else { cp = cpath(p.w, 1./p.m, 0.0); } })(all_B["ij"],cB["ij"]);
144 #ifndef TEST_SUITE
145  double tbr = MPI_Wtime() - sbr;
146  if (dw.rank == 0)
147  printf("(%d, %d) iter (%lf, %lf) sec\n", nbl, nbr, tbl, tbr);
148 #endif
149  //set self-centrality scores to zero
150  //FIXME: assumes loops are zero edges and there are no others zero edges in A
151  Transform<cpath>([](cpath & p){ if (p.w == 0) p.c=0; })(all_cB["ij"]);
152 
153  //accumulate centrality scores
154  v["i"] += Function<cpath,double>([](cpath a){ return a.c; })(all_cB["ij"]);
155  }
156 }
157 
164  World dw = *A.wrld;
165  int n = A.nrow;
166 
169  //mpath matrix to contain distance matrix
170  Matrix<mpath> P(n, n, dw, p, "P");
171 
172  Function<int,mpath> setw([](int w){ return mpath(w, 1); });
173 
174  P["ij"] = setw(A["ij"]);
175 
176  Transform<mpath>([=](mpath& w){ w = mpath(INT_MAX/2, 1); })(P["ii"]);
177 
178  Matrix<mpath> Pi(n, n, dw, p);
179  Pi["ij"] = P["ij"];
180 
181  //compute all shortest mpaths by Bellman Ford
182  for (int i=0; i<n; i++){
183  Transform<mpath>([=](mpath & p){ p = mpath(0,1); })(P["ii"]);
184  P["ij"] = Pi["ik"]*P["kj"];
185  }
186  Transform<mpath>([=](mpath& p){ p = mpath(INT_MAX/2, 1); })(P["ii"]);
187 
188  int lenn[3] = {n,n,n};
189  Tensor<cpath> postv(3, lenn, dw, cp, "postv");
190 
191  //set postv_ijk = shortest mpath from i to k (d_ik)
192  postv["ijk"] += Function<mpath,cpath>([](mpath p){ return cpath(p.w, p.m, 0.0); })(P["ik"]);
193 
194  //set postv_ijk =
195  // for all nodes j on the shortest mpath from i to k (d_ik=d_ij+d_jk)
196  // let multiplicity of shortest mpaths from i to j is a, from j to k is b, and from i to k is c
197  // then postv_ijk = a*b/c
199  [=](mpath a, mpath b, cpath & c){
200  if (c.w<INT_MAX/2 && a.w+b.w == c.w){ c.c = ((double)a.m*b.m)/c.m; }
201  else { c.c = 0; }
202  }
203  )(P["ij"],P["jk"],postv["ijk"]);
204 
205  //sum multiplicities v_j = sum(i,k) postv_ijk
206  v["j"] += Function<cpath,double>([](cpath p){ return p.c; })(postv["ijk"]);
207 }
208 
209 // calculate betweenness centrality a graph of n nodes distributed on World (communicator) dw
210 int btwn_cnt(int n,
211  World & dw,
212  double sp=.20,
213  int bsize=2,
214  int nbatches=1,
215  int test=0,
216  bool sp_B=1,
217  bool sp_C=1){
218 
219  //tropical semiring, define additive identity to be INT_MAX/2 to prevent integer overflow
220  Semiring<int> s(INT_MAX/2,
221  [](int a, int b){ return std::min(a,b); },
222  MPI_MIN,
223  0,
224  [](int a, int b){ return a+b; });
225 
226  //random adjacency matrix
227  Matrix<int> A(n, n, SP, dw, s, "A");
228 
229  //fill with values in the range of [1,min(n*n,100)]
230  srand(dw.rank+1);
231 // A.fill_random(1, std::min(n*n,100));
232  int64_t nmy = ((int64_t)std::max((int64_t)(n*sp),(int64_t)1))*((int64_t)((n+dw.np-1)/dw.np));
233  int64_t inds[nmy];
234  int vals[nmy];
235  int64_t i=0;
236  for (int64_t row=dw.rank*n/dw.np; row<(int64_t)(dw.rank+1)*n/dw.np; row++){
237  int64_t cols[std::max((int64_t)(n*sp),(int64_t)1)];
238  for (int64_t col=0; col<std::max((int64_t)(n*sp),(int64_t)1); col++){
239  bool is_rep;
240  do {
241  cols[col] = rand()%n;
242  is_rep = 0;
243  for (int64_t c=0; c<col; c++){
244  if (cols[c] == cols[col]) is_rep = 1;
245  }
246  } while (is_rep);
247  inds[i] = cols[col]*n+row;
248  vals[i] = (rand()%std::min(n*n,20))+1;
249  i++;
250  }
251  }
252  A.write(i,inds,vals);
253 
254  A["ii"] = 0;
255 
256 
257  //keep only values smaller than 20 (about 20% sparsity)
258  //A.sparsify([=](int a){ return a<sp*100; });
259 
260 
261  Vector<double> v1(n,dw);
262  Vector<double> v2(n,dw);
263 
264  double st_time = MPI_Wtime();
265 
266 
267 
268  if (test || n<= 20){
269  btwn_cnt_fast(A, bsize, v2, 0, sp_B, sp_C);
270  btwn_cnt_naive(A, v1);
271  //compute centrality scores by Bellman Ford with block size bsize
272 // v1.print();
273 // v2.print();
274  v1["i"] -= v2["i"];
275  int pass = v1.norm2() <= n*1.E-6;
276 
277  if (dw.rank == 0){
278  MPI_Reduce(MPI_IN_PLACE, &pass, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
279  if (pass)
280  printf("{ betweenness centrality } passed \n");
281  else
282  printf("{ betweenness centrality } failed \n");
283  } else
284  MPI_Reduce(&pass, MPI_IN_PLACE, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
285  return pass;
286  } else {
287  if (dw.rank == 0)
288  printf("Executing warm-up batch\n");
289  btwn_cnt_fast(A, bsize, v2, 1);
290  if (dw.rank == 0)
291  printf("Starting benchmarking\n");
292  Timer_epoch tbtwn("Betweenness centrality");
293  tbtwn.begin();
294  btwn_cnt_fast(A, bsize, v2, nbatches);
295  tbtwn.end();
296  if (dw.rank == 0){
297  if (nbatches == 0) printf("Completed all batches in time %lf sec, projected total %lf sec.\n", MPI_Wtime()-st_time, MPI_Wtime()-st_time);
298  else printf("Completed %d batches in time %lf sec, projected total %lf sec.\n", nbatches, MPI_Wtime()-st_time, (n/(bsize*nbatches))*(MPI_Wtime()-st_time));
299  }
300  return 1;
301  }
302 }
303 
304 
305 #ifndef TEST_SUITE
306 char* getCmdOption(char ** begin,
307  char ** end,
308  const std::string & option){
309  char ** itr = std::find(begin, end, option);
310  if (itr != end && ++itr != end){
311  return *itr;
312  }
313  return 0;
314 }
315 
316 
317 int main(int argc, char ** argv){
318  int rank, np, n, pass, bsize, nbatches, test;
319  bool sp_B, sp_C;
320  double sp;
321  int const in_num = argc;
322  char ** input_str = argv;
323 
324  MPI_Init(&argc, &argv);
325  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
326  MPI_Comm_size(MPI_COMM_WORLD, &np);
327 
328  if (getCmdOption(input_str, input_str+in_num, "-n")){
329  n = atoi(getCmdOption(input_str, input_str+in_num, "-n"));
330  if (n < 0) n = 7;
331  } else n = 7;
332 
333  if (getCmdOption(input_str, input_str+in_num, "-sp")){
334  sp = atof(getCmdOption(input_str, input_str+in_num, "-sp"));
335  if (sp < 0) sp = .2;
336  } else sp = .2;
337  if (getCmdOption(input_str, input_str+in_num, "-bsize")){
338  bsize = atoi(getCmdOption(input_str, input_str+in_num, "-bsize"));
339  if (bsize < 0) bsize = 2;
340  } else bsize = 2;
341  if (getCmdOption(input_str, input_str+in_num, "-nbatches")){
342  nbatches = atoi(getCmdOption(input_str, input_str+in_num, "-nbatches"));
343  if (nbatches < 0) nbatches = 1;
344  } else nbatches = 1;
345  if (getCmdOption(input_str, input_str+in_num, "-test")){
346  test = atoi(getCmdOption(input_str, input_str+in_num, "-test"));
347  if (test < 0) test = 0;
348  } else test = 0;
349  if (getCmdOption(input_str, input_str+in_num, "-sp_B")){
350  sp_B = (bool)atoi(getCmdOption(input_str, input_str+in_num, "-sp_B"));
351  } else sp_B = 1;
352  if (getCmdOption(input_str, input_str+in_num, "-sp_C")){
353  sp_C = (bool)atoi(getCmdOption(input_str, input_str+in_num, "-sp_C"));
354  } else sp_C = 1;
355 
356 
357 
358  {
359  World dw(argc, argv);
360 
361  if (rank == 0){
362  printf("Computing betweenness centrality for graph with %d nodes, with %lf percent sparsity, and batch size %d, second operand sparsity set to %d, output sparsity set to %d\n",n,sp,bsize,sp_B,sp_C);
363  }
364  pass = btwn_cnt(n, dw, sp, bsize, nbatches, test, sp_B, sp_C);
365  //assert(pass);
366  }
367 
368  MPI_Finalize();
369  return 0;
370 }
376 #endif
Matrix class which encapsulates a 2D tensor.
Definition: matrix.h:18
def rank(self)
Definition: core.pyx:312
Tensor< dtype > slice(int const *offsets, int const *ends) const
cuts out a slice (block) of this tensor A[offsets,ends) result will always be fully nonsymmetric ...
Definition: tensor.cxx:643
Definition: common.h:37
dtype get_val()
returns scalar value
void stop()
Definition: int_timer.cxx:151
local process walltime measurement
Definition: timer.h:50
int main(int argc, char **argv)
Semiring is a Monoid with an addition multiplicaton function addition must have an identity and be as...
Definition: semiring.h:359
Vector class which encapsulates a 1D tensor.
Definition: vector.h:14
custom bivariate function on two tensors: e.g. C["ij"] = f(A["ik"],B["kj"])
Definition: functions.h:137
int btwn_cnt(int n, World &dw, double sp=.20, int bsize=2, int nbatches=1, int test=0, bool sp_B=1, bool sp_C=1)
an instance of the CTF library (world) on a MPI communicator
Definition: world.h:19
string
Definition: core.pyx:456
CTF::Monoid< cpath > get_cpath_monoid()
dtype norm2()
computes the frobenius norm of the tensor (needs sqrt()!)
Definition: tensor.h:811
CTF::World * wrld
distributed processor context on which tensor is defined
Scalar class which encapsulates a 0D tensor.
Definition: scalar.h:13
int rank
rank of local processor
Definition: world.h:24
void btwn_cnt_naive(Matrix< int > &A, Vector< double > &v)
naive algorithm for betweenness centrality using 3D tensor of counts
double c
Definition: btwn_central.h:31
void print(char const *a, FILE *fp=stdout) const
prints the value
Definition: set.h:386
void sparsify()
reduce tensor to sparse format, storing only nonzero data, or data above a specified threshold...
Definition: tensor.cxx:449
void btwn_cnt_fast(Matrix< int > A, int b, Vector< double > &v, int nbatches=0, bool sp_B=true, bool sp_C=true)
fast algorithm for betweenness centrality using Bellman Ford
CTF::Bivar_Function< int, cpath, cpath > * get_Brandes_kernel()
int64_t nnz_tot
maximum number of local nonzero elements over all procs
epoch during which to measure timers
Definition: timer.h:69
int speye(int n, int order, World &dw)
Definition: speye.cxx:11
int set_zero()
elects a mapping and sets tensor data to zero
int nrow
Definition: matrix.h:20
char * getCmdOption(char **begin, char **end, const std::string &option)
void start()
Definition: int_timer.cxx:141
Definition: apsp.cxx:17
A Monoid is a Set equipped with a binary addition operator &#39;+&#39; or a custom function addition must hav...
Definition: monoid.h:69
an instance of a tensor within a CTF world
Definition: tensor.h:74
int w
Definition: btwn_central.h:18
int w
Definition: btwn_central.h:33
CTF::Bivar_Function< int, mpath, mpath > * get_Bellman_kernel()
int np
number of processors
Definition: world.h:26
def np(self)
Definition: core.pyx:315
CTF::Semiring< mpath > get_mpath_semiring()