global_schedule = this;
typename std::vector<TensorOperation*>::iterator it;
// a dependency of *it has completed; once none remain, the operation becomes ready
(*it)->dependency_left--;
assert((*it)->dependency_left >= 0);
if ((*it)->dependency_left == 0) {
std::vector<TensorOperation*> ops;
std::map<tensor*, tensor*> remap;
std::vector<PartitionOps> comm_ops;
int max_starting_task = 0;
int max_num_tasks = 0;
for (int starting_task=0; starting_task<(int64_t)ready_tasks.size(); starting_task++) {
for (int i=starting_task; i<(int64_t)ready_tasks.size(); i++) {
if (min_cost == 0 || this_cost < min_cost) {
if (min_cost < (this_cost + sum_cost) / size) {
num_tasks = i - starting_task + 1;
sum_cost += this_cost;
if (num_tasks >= max_colors) {
if (num_tasks > max_num_tasks) {
max_num_tasks = num_tasks;
max_starting_task = starting_task;
// each rank samples a point in the cumulative estimated cost of the selected
// tasks; the task whose cost bucket contains that point becomes this rank's color
int color_sample_point = (max_cost / size) * rank + (max_cost / size / 2);
for (int i=0; i<max_num_tasks; i++) {
if (color_sample_point < ready_tasks[max_starting_task+i]->estimate_time()) {
color_sample_point -= ready_tasks[max_starting_task+i]->estimate_time();
MPI_Comm_split(world->comm, my_color, rank, &my_comm);
std::cout << "Maxparts " << max_colors << ", start " << max_starting_task << ", tasks " << max_num_tasks << " // ";
typename std::deque<TensorOperation*>::iterator ready_tasks_iter;
std::cout << (*ready_tasks_iter)->name() << "(" << (*ready_tasks_iter)->estimate_time() << ") ";
std::cout << std::endl;
for (int color=0; color<max_num_tasks; color++) {
comm_ops[color].color = color;
if (color == my_color) {
comm_ops[color].world = new World(my_comm);
comm_ops[color].world = NULL;
comm_ops[color].ops.push_back(ready_tasks[max_starting_task + color]);
for (int color=0; color<max_num_tasks; color++) {
typename std::vector<PartitionOps>::iterator comm_op_iter;
for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
typename std::vector<TensorOperation*>::iterator op_iter;
for (op_iter=comm_op_iter->ops.begin(); op_iter!=comm_op_iter->ops.end(); op_iter++) {
assert(*op_iter != NULL);
(*op_iter)->get_inputs(&comm_op_iter->global_tensors);
(*op_iter)->get_outputs(&comm_op_iter->global_tensors);
(*op_iter)->get_outputs(&comm_op_iter->output_tensors);
for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
typename std::set<Idx_Tensor*, tensor_name_less>::iterator global_tensor_iter;
for (global_tensor_iter=comm_op_iter->global_tensors.begin(); global_tensor_iter!=comm_op_iter->global_tensors.end(); global_tensor_iter++) {
if (comm_op_iter->world != NULL) {
// clone each global tensor onto this partition's subworld and remember the mapping
local_clone = new Idx_Tensor(*(*global_tensor_iter));
comm_op_iter->local_tensors.insert(local_clone);
comm_op_iter->remap[(*global_tensor_iter)->parent] = local_clone->parent;
(*global_tensor_iter)->parent->add_to_subworld(local_clone->parent, (*global_tensor_iter)->sr->mulid(), (*global_tensor_iter)->sr->addid());
typename std::set<Idx_Tensor*, tensor_name_less>::iterator output_tensor_iter;
for (output_tensor_iter=comm_op_iter->output_tensors.begin(); output_tensor_iter!=comm_op_iter->output_tensors.end(); output_tensor_iter++) {
assert(comm_op_iter->remap.find((*output_tensor_iter)->parent) != comm_op_iter->remap.end());
if ((int64_t)comm_ops.size() > my_color) {
typename std::vector<TensorOperation*>::iterator op_iter;
for (op_iter=comm_ops[my_color].ops.begin(); op_iter!=comm_ops[my_color].ops.end(); op_iter++) {
(*op_iter)->execute(&comm_ops[my_color].remap);
double my_exec_time = MPI_Wtime() - schedule_timer.exec_time;
// measure the spread of per-rank execution times to report load imbalance
double min_exec, max_exec, my_imbal, accum_imbal;
MPI_Allreduce(&my_exec_time, &min_exec, 1, MPI_DOUBLE, MPI_MIN, world->comm);
MPI_Allreduce(&my_exec_time, &max_exec, 1, MPI_DOUBLE, MPI_MAX, world->comm);
my_imbal = my_exec_time - min_exec;
MPI_Allreduce(&my_imbal, &accum_imbal, 1, MPI_DOUBLE, MPI_SUM, world->comm);
// fold each partition's output tensors back into the corresponding global tensors
for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
typename std::set<Idx_Tensor*, tensor_name_less>::iterator output_tensor_iter;
for (output_tensor_iter=comm_op_iter->output_tensors.begin(); output_tensor_iter!=comm_op_iter->output_tensors.end(); output_tensor_iter++) {
(*output_tensor_iter)->parent->add_from_subworld(comm_op_iter->remap[(*output_tensor_iter)->parent], (*output_tensor_iter)->sr->mulid(), (*output_tensor_iter)->sr->addid());
if ((int64_t)comm_ops.size() > my_color) {
typename std::set<Idx_Tensor*, tensor_name_less>::iterator local_tensor_iter;
for (local_tensor_iter=comm_ops[my_color].local_tensors.begin(); local_tensor_iter!=comm_ops[my_color].local_tensors.end(); local_tensor_iter++) {
delete *local_tensor_iter;
delete comm_ops[my_color].world;
for (comm_op_iter=comm_ops.begin(); comm_op_iter!=comm_ops.end(); comm_op_iter++) {
typename std::vector<TensorOperation*>::iterator op_iter;
for (op_iter=comm_op_iter->ops.begin(); op_iter!=comm_op_iter->ops.end(); op_iter++) {
return schedule_timer;
global_schedule = NULL;
typename std::deque<TensorOperation*>::iterator it;
(*it)->dependency_left = (*it)->dependency_count;
schedule_timer += iter_timer;
return schedule_timer;
std::set<Idx_Tensor*, tensor_name_less> op_lhs_set;
assert(op_lhs_set.size() == 1);
tensor* op_lhs = (*op_lhs_set.begin())->parent;
std::set<Idx_Tensor*, tensor_name_less> op_deps;
typename std::set<Idx_Tensor*, tensor_name_less>::iterator deps_iter;
for (deps_iter = op_deps.begin(); deps_iter != op_deps.end(); deps_iter++) {
tensor* dep = (*deps_iter)->parent;
typename std::map<tensor*, TensorOperation*>::iterator dep_loc = latest_write.find(dep);
dep_op = dep_loc->second;
dep_op->reads.push_back(op);
typename std::map<tensor*, TensorOperation*>::iterator prev_loc = latest_write.find(op_lhs);
std::vector<TensorOperation*>* prev_reads = &(prev_loc->second->reads);
typename std::vector<TensorOperation*>::iterator prev_iter;
for (prev_iter = prev_reads->begin(); prev_iter != prev_reads->end(); prev_iter++) {
if (*prev_iter != op) {
(*prev_iter)->successors.push_back(op);
assert(op_typed != NULL);
assert(global_schedule == NULL);
const Term* remapped_rhs = rhs;
assert(remapped_lhs != NULL);
remapped_rhs = remapped_rhs->clone(remap);
// apply the recorded operation to the (possibly remapped) operands
*remapped_lhs = *remapped_rhs;
*remapped_lhs += *remapped_rhs;
*remapped_lhs -= *remapped_rhs;
*remapped_lhs *= *remapped_rhs;
std::cerr << "TensorOperation::execute(): unexpected op: " << op << std::endl;
assert(outputs_set != NULL);
outputs_set->insert(lhs);
rhs->get_inputs(inputs_set);
assert(lhs->parent != NULL);
inputs_set->insert(lhs);
std::cerr << "TensorOperation::get_inputs(): unexpected op: " << op << std::endl;
// lazily compute and cache the estimated cost of the right-hand side
if (cached_estimated_cost == 0) {
cached_estimated_cost = rhs->estimate_time(*lhs);
assert(cached_estimated_cost > 0);
return cached_estimated_cost;
A Term is an abstract object representing some expression of tensors.
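For example (illustrative, not from this listing), an expression such as B["ik"]*C["kj"] builds a Term that can later be assigned or accumulated into an Idx_Tensor.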
void get_outputs(std::set< Idx_Tensor *, CTF_int::tensor_name_less > *outputs_set) const
appends the tensors that this operation writes to the given output set
double estimate_time()
provides an estimated runtime cost
void record()
Starts recording all tensor operations to this schedule (instead of executing them immediately).
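A minimal usage sketch, assuming the Schedule/ScheduleTimer interface shown in this listing; the world dw and the tensors A, B, C, D are hypothetical names:
  CTF::Schedule sched(&dw);
  sched.record();                               // divert subsequent tensor operations into the schedule
  A["ij"] += B["ik"] * C["kj"];                 // recorded, not executed yet
  D["ij"]  = A["ij"];                           // recorded; depends on the operation above
  CTF::ScheduleTimer timer = sched.execute();   // partitions ready operations and runs them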
bool tensor_op_cost_greater(TensorOperation *A, TensorOperation *B)
void schedule_op_successors(TensorOperation *op)
Called when a tensor op finishes; this adds newly enabled ops to the ready queue.
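A condensed sketch of the countdown logic, assuming op->successors holds the dependent operations (the iterator-based version appears near the top of the listing):
  for (TensorOperation* succ : op->successors) {
    succ->dependency_left--;               // one of succ's dependencies just completed
    assert(succ->dependency_left >= 0);
    if (succ->dependency_left == 0)
      ready_tasks.push_back(succ);         // no dependencies left: succ may now be scheduled
  }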
std::set< Idx_Tensor *, tensor_name_less > output_tensors
A tensor operation, containing all the data (op, lhs, rhs) required to run it. Also provides methods ...
an instance of the CTF library (world) on a MPI communicator
std::deque< TensorOperation * > root_tasks
std::map< tensor *, tensor * > remap
void add_operation(TensorOperationBase *op)
std::deque< TensorOperation * > steps_original
std::vector< TensorOperation * > successors
void add_operation_typed(TensorOperation *op)
Adds a tensor operation to this schedule. THIS IS CALL ORDER DEPENDENT - operations will appear to execute in the order they were added.
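A condensed sketch of the dependency bookkeeping visible in the listing; the helpers inputs_of/output_of stand in for get_inputs/get_outputs, and the exact handling of dependency_count is simplified:
  for (tensor* dep : inputs_of(op)) {            // read-after-write: the last writer must finish first
    auto w = latest_write.find(dep);
    if (w != latest_write.end()) {
      w->second->successors.push_back(op);
      w->second->reads.push_back(op);            // op consumes w's result
      op->dependency_count++;
    }
  }
  auto prev = latest_write.find(output_of(op));  // write-after-read: earlier readers run first
  if (prev != latest_write.end())
    for (TensorOperation* r : prev->second->reads)
      if (r != op) r->successors.push_back(op);
  latest_write[output_of(op)] = op;              // op is now the latest writer of its output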
virtual Term * clone(std::map< tensor *, tensor * > *remap=NULL) const =0
derived classes must implement this copy function to return a pointer to a copy
std::map< CTF_int::tensor *, TensorOperation * > latest_write
algstrct * sr
algstrct on which tensor elements and operations are defined
std::vector< TensorOperation * > ops
CTF_int::Term * clone(std::map< CTF_int::tensor *, CTF_int::tensor * > *remap=NULL) const
derived classes must implement this copy function to return a pointer to a copy
Provides an untemplated base class for tensor operations.
void execute(std::map< CTF_int::tensor *, CTF_int::tensor * > *remap=NULL)
runs this operation, but does NOT handle dependency scheduling; optionally takes a remapping of tensors
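In comment form, what the remap amounts to, as inferred from the clone(remap) call and the assignment cases at the end of the listing:
  // remapped_lhs: the lhs Idx_Tensor with its parent tensor substituted through *remap
  // remapped_rhs: rhs->clone(remap), so every tensor in the expression is substituted too
  // the recorded assignment is then applied to the substituted operands, e.g.
  //   *remapped_lhs += *remapped_rhs;   // for the += case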
double imbalance_acuum_time
ScheduleTimer execute()
Executes the schedule and implicitly terminates recording.
void add_to_subworld(tensor *tsr_sub, char const *alpha, char const *beta)
accumulates this tensor to a tensor object defined on a different world
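A sketch of the subworld round-trip used by partition_and_execute; global_tsr and local_clone are placeholder names, and with alpha = mulid() and beta = addid() the calls amount to a copy, as used in the listing:
  global_tsr->add_to_subworld(local_clone, sr->mulid(), sr->addid());   // push global data into the subworld clone
  /* ... the partition's operations run against local_clone via the remap ... */
  global_tsr->add_from_subworld(local_clone, sr->mulid(), sr->addid()); // pull results back into the global tensor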
std::vector< TensorOperation * > reads
internal distributed tensor class
double imbalance_wall_time
ScheduleBase * global_schedule
std::set< Idx_Tensor *, tensor_name_less > global_tensors
virtual char const * mulid() const
identity element for multiplication, i.e. 1
std::set< Idx_Tensor *, tensor_name_less > local_tensors
std::deque< TensorOperation * > ready_tasks
void get_inputs(std::set< Idx_Tensor *, CTF_int::tensor_name_less > *inputs_set) const
appends the tensors this depends on (reads from, including the output if a previous value is required) to the given input set
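For example (illustrative names): for a recorded operation A["ij"] += B["ik"]*C["kj"], get_inputs() yields {A, B, C} since += needs A's previous value, while get_outputs() yields {A}.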
a tensor with an index map associated with it (necessary for overloaded operators) ...
Data structure containing what each partition is going to do.
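Its fields, as they appear throughout the listing (a summary, not the exact declaration):
  struct PartitionOps {
    int color;                                                // index of the sub-communicator this partition uses
    World* world;                                             // subworld; non-NULL only on the ranks that own it
    std::vector<TensorOperation*> ops;                        // operations assigned to this partition
    std::set<Idx_Tensor*, tensor_name_less> global_tensors;   // every tensor the ops read or write
    std::set<Idx_Tensor*, tensor_name_less> local_tensors;    // subworld clones of those tensors
    std::set<Idx_Tensor*, tensor_name_less> output_tensors;   // tensors whose results are folded back
    std::map<tensor*, tensor*> remap;                         // global tensor -> subworld clone
  };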
MPI_Comm comm
set of processors making up this world
ScheduleTimer partition_and_execute()
Executes a slice of the ready_queue, partitioning it among the processors in the grid.
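A condensed sketch of the partitioning step, assuming size ranks, max_cost as the total estimated cost of the selected task slice, and the color-sampling scheme from the listing:
  int my_color = 0;
  int color_sample_point = (max_cost / size) * rank + (max_cost / size / 2);
  for (int i = 0; i < max_num_tasks; i++) {
    if (color_sample_point < ready_tasks[max_starting_task + i]->estimate_time()) {
      my_color = i;                        // this rank's sample point falls in task i's cost bucket
      break;
    }
    color_sample_point -= ready_tasks[max_starting_task + i]->estimate_time();
  }
  MPI_Comm my_comm;
  MPI_Comm_split(world->comm, my_color, rank, &my_comm);   // one sub-communicator per selected task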