Cyclops Tensor Framework
parallel arithmetic on multidimensional arrays
schedule.cxx
#include "common.h"
#include "schedule.h"

using namespace CTF_int;

namespace CTF {

  ScheduleBase* global_schedule;

  void Schedule::record() {
    global_schedule = this;
  }

  void Schedule::schedule_op_successors(TensorOperation* op) {
    assert(op->dependency_left == 0);

    typename std::vector<TensorOperation* >::iterator it;
    for (it=op->successors.begin(); it!=op->successors.end(); it++) {
      (*it)->dependency_left--;
      assert((*it)->dependency_left >= 0);
      if ((*it)->dependency_left == 0) {
        ready_tasks.push_back(*it);
      }
    }
  }

  bool tensor_op_cost_greater(TensorOperation* A, TensorOperation* B) {
    return A->estimate_time() > B->estimate_time();
    //return A->successors.size() > B->successors.size();
  }

  /**
   * \brief Data structure containing what each partition is going to do.
   */
  struct PartitionOps {
    int color;
    World* world;

    std::vector<TensorOperation*> ops;  // operations to execute
    std::set<Idx_Tensor*, tensor_name_less > local_tensors;  // all local tensors used
    std::map<tensor*, tensor*> remap;  // mapping from global tensor -> local tensor

    std::set<Idx_Tensor*, tensor_name_less > global_tensors;  // all referenced tensors stored as global tensors
    std::set<Idx_Tensor*, tensor_name_less > output_tensors;  // tensors to be written back out, stored as global tensors
  };

  ScheduleTimer Schedule::partition_and_execute() {
    ScheduleTimer schedule_timer;
    schedule_timer.total_time = MPI_Wtime();

    int rank, size;
    MPI_Comm_rank(world->comm, &rank);
    MPI_Comm_size(world->comm, &size);

    // Partition operations into worlds, and do split
    std::vector<PartitionOps > comm_ops;  // operations for each subcomm
    int max_colors = size <= (int64_t)ready_tasks.size() ? size : ready_tasks.size();
    if (partitions > 0 && max_colors > partitions) {
      max_colors = partitions;
    }

    // Sort tasks by descending runtime
    std::sort(ready_tasks.begin(), ready_tasks.end(), tensor_op_cost_greater);

    // Maximum load imbalance algorithm:
    // keep adding the next available task until either max_colors is reached
    // (a user-specified limit or the number of processes) or adding the next task
    // would leave the cheapest task in the window with less than one processor's
    // worth of the combined compute
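    //
    // Worked example (illustrative numbers only, not from the source): with
    // size = 4 processors and sorted costs {9, 8, 7, 2}, the window {9, 8, 7} is
    // kept because its cheapest task (7) is still at least one processor's share
    // of the combined cost ((9+8+7)/4 = 6); adding the cost-2 task would drop the
    // cheapest cost to 2, below (24+2)/4 = 6.5, so that task is deferred to a
    // later partition_and_execute() call.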

    int max_starting_task = 0;
    int max_num_tasks = 0;
    double max_cost = 0;
    // Try to find the longest sequence of tasks that isn't too imbalanced
    for (int starting_task=0; starting_task<(int64_t)ready_tasks.size(); starting_task++) {
      double sum_cost = 0;
      double min_cost = 0;
      int num_tasks = 0;
      for (int i=starting_task; i<(int64_t)ready_tasks.size(); i++) {
        double this_cost = ready_tasks[i]->estimate_time();
        if (min_cost == 0 || this_cost < min_cost) {
          min_cost = this_cost;
        }
        if (min_cost < (this_cost + sum_cost) / size) {
          break;
        } else {
          num_tasks = i - starting_task + 1;
          sum_cost += this_cost;
        }
        if (num_tasks >= max_colors) {
          break;
        }
      }

      if (num_tasks > max_num_tasks) {
        max_num_tasks = num_tasks;
        max_starting_task = starting_task;
        max_cost = sum_cost;
      }
    }

    // Divide processors among the selected tasks according to estimated cost.
    // Algorithm: split the window's total cost (max_cost) into size equal blocks;
    // each processor samples the middle of its own block to determine which task it works on.
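    //
    // Worked example (illustrative numbers only): continuing the window above,
    // max_cost = 9+8+7 = 24 and size = 4, so each block spans 6 cost units and the
    // ranks sample points 3, 9, 15 and 21 along the concatenated task segments
    // [0,9), [9,17), [17,24). That yields colors 0, 1, 1, 2: the four processors
    // split into subworlds of 1, 2 and 1 ranks for the three tasks.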
    double color_sample_point = (max_cost / size) * rank + (max_cost / size / 2);
    int my_color = 0;
    for (int i=0; i<max_num_tasks; i++) {
      my_color = i;
      if (color_sample_point < ready_tasks[max_starting_task+i]->estimate_time()) {
        break;
      } else {
        color_sample_point -= ready_tasks[max_starting_task+i]->estimate_time();
      }
    }

    MPI_Comm my_comm;
    MPI_Comm_split(world->comm, my_color, rank, &my_comm);

    if (rank == 0) {
      std::cout << "Maxparts " << max_colors << ", start " << max_starting_task <<
                   ", tasks " << max_num_tasks << " // ";
      typename std::deque<TensorOperation*>::iterator ready_tasks_iter;
      for (ready_tasks_iter=ready_tasks.begin(); ready_tasks_iter!=ready_tasks.end(); ready_tasks_iter++) {
        std::cout << (*ready_tasks_iter)->name() << "(" << (*ready_tasks_iter)->estimate_time() << ") ";
      }
      std::cout << std::endl;
    }

    for (int color=0; color<max_num_tasks; color++) {
      comm_ops.push_back(PartitionOps());
      comm_ops[color].color = color;
      if (color == my_color) {
        comm_ops[color].world = new World(my_comm);
      } else {
        comm_ops[color].world = NULL;
      }
      comm_ops[color].ops.push_back(ready_tasks[max_starting_task + color]);
    }

    for (int color=0; color<max_num_tasks; color++) {
      ready_tasks.erase(ready_tasks.begin() + max_starting_task);
    }

    typename std::vector<PartitionOps >::iterator comm_op_iter;
    // Initialize local data structures
    for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
      // gather required tensors
      typename std::vector<TensorOperation*>::iterator op_iter;
      for (op_iter=comm_op_iter->ops.begin(); op_iter!=comm_op_iter->ops.end(); op_iter++) {
        assert(*op_iter != NULL);
        (*op_iter)->get_inputs(&comm_op_iter->global_tensors);
        (*op_iter)->get_outputs(&comm_op_iter->global_tensors);
        (*op_iter)->get_outputs(&comm_op_iter->output_tensors);
      }
    }

    // Create and communicate tensors to subworlds
    schedule_timer.comm_down_time = MPI_Wtime();
    for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
      typename std::set<Idx_Tensor*, tensor_name_less >::iterator global_tensor_iter;
      for (global_tensor_iter=comm_op_iter->global_tensors.begin(); global_tensor_iter!=comm_op_iter->global_tensors.end(); global_tensor_iter++) {
        Idx_Tensor* local_clone;
        if (comm_op_iter->world != NULL) {
          local_clone = new Idx_Tensor(*(*global_tensor_iter));//, *comm_op_iter->world);
        } else {
          local_clone = NULL;
        }
        comm_op_iter->local_tensors.insert(local_clone);
        comm_op_iter->remap[(*global_tensor_iter)->parent] = (local_clone != NULL) ? local_clone->parent : NULL;
        (*global_tensor_iter)->parent->add_to_subworld((local_clone != NULL) ? local_clone->parent : NULL,
                                                       (*global_tensor_iter)->sr->mulid(),
                                                       (*global_tensor_iter)->sr->addid());
      }
      typename std::set<Idx_Tensor*, tensor_name_less >::iterator output_tensor_iter;
      for (output_tensor_iter=comm_op_iter->output_tensors.begin(); output_tensor_iter!=comm_op_iter->output_tensors.end(); output_tensor_iter++) {
        assert(comm_op_iter->remap.find((*output_tensor_iter)->parent) != comm_op_iter->remap.end());
      }
    }
    schedule_timer.comm_down_time = MPI_Wtime() - schedule_timer.comm_down_time;


    // Run my tasks
    MPI_Barrier(world->comm);
    schedule_timer.exec_time = MPI_Wtime();
    if ((int64_t)comm_ops.size() > my_color) {
      typename std::vector<TensorOperation*>::iterator op_iter;
      for (op_iter=comm_ops[my_color].ops.begin(); op_iter!=comm_ops[my_color].ops.end(); op_iter++) {
        (*op_iter)->execute(&comm_ops[my_color].remap);
      }
    }
    double my_exec_time = MPI_Wtime() - schedule_timer.exec_time;
    MPI_Barrier(world->comm);
    schedule_timer.exec_time = MPI_Wtime() - schedule_timer.exec_time;

    // Instrument imbalance
    double min_exec, max_exec, my_imbal, accum_imbal;
    MPI_Allreduce(&my_exec_time, &min_exec, 1, MPI_DOUBLE, MPI_MIN, world->comm);
    MPI_Allreduce(&my_exec_time, &max_exec, 1, MPI_DOUBLE, MPI_MAX, world->comm);
    schedule_timer.imbalance_wall_time = max_exec - min_exec;

    my_imbal = my_exec_time - min_exec;
    MPI_Allreduce(&my_imbal, &accum_imbal, 1, MPI_DOUBLE, MPI_SUM, world->comm);
    schedule_timer.imbalance_acuum_time = accum_imbal;
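    // Example (illustrative numbers only): per-rank execution times of 2.0, 2.5,
    // 3.0 and 3.5 s give imbalance_wall_time = 3.5 - 2.0 = 1.5 s and
    // imbalance_acuum_time = 0 + 0.5 + 1.0 + 1.5 = 3.0 s of accumulated idle time.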

    // Communicate results back into global
    schedule_timer.comm_up_time = MPI_Wtime();
    for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
      typename std::set<Idx_Tensor*, tensor_name_less >::iterator output_tensor_iter;
      for (output_tensor_iter=comm_op_iter->output_tensors.begin(); output_tensor_iter!=comm_op_iter->output_tensors.end(); output_tensor_iter++) {
        (*output_tensor_iter)->parent->add_from_subworld(comm_op_iter->remap[(*output_tensor_iter)->parent],
                                                         (*output_tensor_iter)->sr->mulid(),
                                                         (*output_tensor_iter)->sr->addid());
      }
    }
    schedule_timer.comm_up_time = MPI_Wtime() - schedule_timer.comm_up_time;

    // Clean up local tensors & world
    if ((int64_t)comm_ops.size() > my_color) {
      typename std::set<Idx_Tensor*, tensor_name_less >::iterator local_tensor_iter;
      for (local_tensor_iter=comm_ops[my_color].local_tensors.begin(); local_tensor_iter!=comm_ops[my_color].local_tensors.end(); local_tensor_iter++) {
        delete *local_tensor_iter;
      }
      delete comm_ops[my_color].world;
    }

    // Update ready tasks
    for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
      typename std::vector<TensorOperation*>::iterator op_iter;
      for (op_iter=comm_op_iter->ops.begin(); op_iter!=comm_op_iter->ops.end(); op_iter++) {
        schedule_op_successors(*op_iter);
      }
    }

    schedule_timer.total_time = MPI_Wtime() - schedule_timer.total_time;
    return schedule_timer;
  }

  /*
  // The dead simple scheduler
  void Schedule::partition_and_execute() {
    while (ready_tasks.size() >= 1) {
      TensorOperation* op = ready_tasks.front();
      ready_tasks.pop_front();
      op->execute();
      schedule_op_successors(op);
    }
  }
  */

  ScheduleTimer Schedule::execute() {
    ScheduleTimer schedule_timer;

    global_schedule = NULL;

    typename std::deque<TensorOperation*>::iterator it;

    // Initialize all tasks & initial ready queue
    for (it = steps_original.begin(); it != steps_original.end(); it++) {
      (*it)->dependency_left = (*it)->dependency_count;
    }
    ready_tasks = root_tasks;

    // Preprocess dummy operations
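    // (dummy operations are the TENSOR_OP_NONE placeholders created by
    // add_operation_typed() as root dependencies; they do no work, so their
    // successors are released and they are dropped here instead of being
    // scheduled onto a subworld)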
    while (!ready_tasks.empty()) {
      if (ready_tasks.front()->is_dummy()) {
        schedule_op_successors(ready_tasks.front());
        ready_tasks.pop_front();
      } else {
        break;
      }
    }

    while (!ready_tasks.empty()) {
      int rank;
      MPI_Comm_rank(world->comm, &rank);
      ScheduleTimer iter_timer = partition_and_execute();
      if (rank == 0) {
        printf("Schedule imbalance, wall: %lf; accum: %lf\n", iter_timer.imbalance_wall_time, iter_timer.imbalance_acuum_time);
      }
      schedule_timer += iter_timer;
    }
    return schedule_timer;
  }

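  // Illustrative trace (hypothetical operations, not from this file): recording
  //   op1: C = A * B   and then   op2: A = D
  // makes op1 depend on dummy root writers of A and B (read-after-write), while
  // op2 picks up a dependency on op1 through the reads list of A's latest writer
  // (write-after-read), so op2 cannot clobber A before op1 has consumed it.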
  void Schedule::add_operation_typed(TensorOperation* op) {
    steps_original.push_back(op);

    std::set<Idx_Tensor*, tensor_name_less > op_lhs_set;
    op->get_outputs(&op_lhs_set);
    assert(op_lhs_set.size() == 1);  // limited case to make this a bit easier
    tensor* op_lhs = (*op_lhs_set.begin())->parent;

    std::set<Idx_Tensor*, tensor_name_less > op_deps;
    op->get_inputs(&op_deps);

    typename std::set<Idx_Tensor*, tensor_name_less >::iterator deps_iter;
    for (deps_iter = op_deps.begin(); deps_iter != op_deps.end(); deps_iter++) {
      tensor* dep = (*deps_iter)->parent;
      typename std::map<tensor*, TensorOperation*>::iterator dep_loc = latest_write.find(dep);
      TensorOperation* dep_op;
      if (dep_loc != latest_write.end()) {
        dep_op = dep_loc->second;
      } else {
        // create dummy operation to serve as a root dependency
        // TODO: this can be optimized away
        dep_op = new TensorOperation(TENSOR_OP_NONE, NULL, NULL);
        latest_write[dep] = dep_op;
        root_tasks.push_back(dep_op);
        steps_original.push_back(dep_op);
      }

      dep_op->successors.push_back(op);
      dep_op->reads.push_back(op);
      op->dependency_count++;
    }
    typename std::map<tensor*, TensorOperation*>::iterator prev_loc = latest_write.find(op_lhs);
    if (prev_loc != latest_write.end()) {
      // if there was a previous write, make each of its readers a predecessor of this op
      // so that this write does not clobber values an earlier reader still needs (WAR hazard)
      std::vector<TensorOperation*>* prev_reads = &(prev_loc->second->reads);
      typename std::vector<TensorOperation*>::iterator prev_iter;
      for (prev_iter = prev_reads->begin(); prev_iter != prev_reads->end(); prev_iter++) {
        if (*prev_iter != op) {
          (*prev_iter)->successors.push_back(op);
          op->dependency_count++;
        }
      }
    }

    latest_write[op_lhs] = op;
  }

  void Schedule::add_operation(TensorOperationBase* op) {
    TensorOperation* op_typed = dynamic_cast<TensorOperation* >(op);
    assert(op_typed != NULL);
    add_operation_typed(op_typed);
  }

  void TensorOperation::execute(std::map<tensor*, tensor*>* remap) {
    assert(global_schedule == NULL);  // ensure this isn't going into a record()

    Idx_Tensor* remapped_lhs = lhs;
    const Term* remapped_rhs = rhs;

    if (remap != NULL) {
      remapped_lhs = dynamic_cast<Idx_Tensor* >(remapped_lhs->clone(remap));
      assert(remapped_lhs != NULL);
      remapped_rhs = remapped_rhs->clone(remap);
    }

    switch (op) {
      case TENSOR_OP_NONE:
        break;
      case TENSOR_OP_SET:
        *remapped_lhs = *remapped_rhs;
        break;
      case TENSOR_OP_SUM:
        *remapped_lhs += *remapped_rhs;
        break;
      case TENSOR_OP_SUBTRACT:
        *remapped_lhs -= *remapped_rhs;
        break;
      case TENSOR_OP_MULTIPLY:
        *remapped_lhs *= *remapped_rhs;
        break;
      default:
        std::cerr << "TensorOperation::execute(): unexpected op: " << op << std::endl;
        assert(false);
    }
  }

  void TensorOperation::get_outputs(std::set<Idx_Tensor*, tensor_name_less >* outputs_set) const {
    assert(lhs->parent);
    assert(outputs_set != NULL);
    outputs_set->insert(lhs);
  }

  void TensorOperation::get_inputs(std::set<Idx_Tensor*, tensor_name_less >* inputs_set) const {
    rhs->get_inputs(inputs_set);

    switch (op) {
      case TENSOR_OP_SET:
        break;
      case TENSOR_OP_SUM:
      case TENSOR_OP_SUBTRACT:
      case TENSOR_OP_MULTIPLY:
        assert(lhs->parent != NULL);
        inputs_set->insert(lhs);
        break;
      default:
        std::cerr << "TensorOperation::get_inputs(): unexpected op: " << op << std::endl;
        assert(false);
    }
  }

  double TensorOperation::estimate_time() {
    if (cached_estimated_cost == 0) {
      assert(rhs != NULL);
      assert(lhs != NULL);
      cached_estimated_cost = rhs->estimate_time(*lhs);
      assert(cached_estimated_cost > 0);
    }
    return cached_estimated_cost;
  }
}
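
/*
  Usage sketch (illustrative only; the Schedule constructor signature and the way
  recorded expressions reach add_operation() are assumptions, not taken from this file):

    CTF::Schedule sched(&dw);           // assumed: constructed against an existing CTF::World dw
    sched.record();                     // tensor expressions issued after this are recorded
    ...                                 // (queued through add_operation) rather than executed
    ScheduleTimer t = sched.execute();  // builds the ready queue from the recorded dependency DAG
                                        // and repeatedly calls partition_and_execute() on it
*/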