blob: 0172ee86ce9f1a85bf16781fdcf62f3fcd06efdd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "singa/core/scheduler.h"
#include <algorithm>
#include <functional>
#include <iomanip>
#include <sstream>
#include <thread>
#include "singa/core/device.h"
#include "singa/utils/safe_queue.h"
namespace singa {
void Node::AddInEdge(Edge *in_edge) { in_edges_.push_back(in_edge); }
void Node::AddOutEdge(Edge *out_edge) { out_edges_.push_back(out_edge); }
void Edge::SetBlock(Block *blk) { blk_ = blk; }
void Edge::SetSrcNode(Node *src_node) { src_node_ = src_node; }
void Edge::SetDstNode(Node *dst_node) { dst_node_ = dst_node; }
Graph::Graph(Device *device) : device_(device) {}
Graph::~Graph() { Reset(); }
Node *BlkInfo::used_node(const size_t idx) const {
CHECK_LT(idx, used_nodes_.size());
return used_nodes_[idx];
}
Node *Graph::node(const size_t idx) const {
CHECK_LT(idx, nodes_.size());
return nodes_[idx];
}
Edge *Graph::edge(const size_t idx) const {
CHECK_LT(idx, edges_.size());
return edges_[idx];
}
BlkInfo *Graph::block(Block *blk) const {
auto it = blocks_.find(blk);
CHECK(it != blocks_.end());
return it->second;
}
Node *Graph::begin_node(const size_t idx) const {
CHECK_LT(idx, begin_nodes_.size());
return begin_nodes_[idx];
}
const NodeVec &Graph::next_nodes(const size_t idx) const {
CHECK_LT(idx, next_nodes_.size());
return next_nodes_[idx];
}
const BlockVec &Graph::free_blocks(const size_t idx) const {
CHECK_LT(idx, free_blocks_.size());
return free_blocks_[idx];
}
void Graph::Reset() {
for (auto it : nodes_) {
delete it;
}
nodes_.clear();
for (auto it : edges_) {
delete it;
}
edges_.clear();
for (auto it : blocks_) {
delete it.second;
}
blocks_.clear();
leaf_blocks_.clear();
iteration_ = 0;
time_elapsed_ = 0;
dirty_ = false;
in_serial_ = false;
}
void Graph::Debug() {
if (dirty_) Analyze();
int w = 0;
size_t max_in_num = 0, max_out_num = 0, max_next_num = 0, max_free_num = 0;
for (auto &it : nodes_) {
if (it->op_name_ == "Waiting") continue;
max_in_num = std::max(max_in_num, it->in_edges_.size());
max_out_num = std::max(max_out_num, it->out_edges_.size());
}
for (auto &it : next_nodes_) {
max_next_num = std::max(max_next_num, it.size());
}
for (auto &it : free_blocks_) {
max_free_num = std::max(max_free_num, it.size());
}
size_t max_size = std::max(nodes_.size(), blocks_.size());
for (size_t i = max_size; i > 0; i /= 10, ++w) {
}
std::stringstream ss;
ss << "begin nodes:[";
for (size_t i = 0; i < begin_nodes_.size(); ++i) {
ss << begin_nodes_[i]->id_ << " ";
}
ss << "]" << std::endl;
size_t size = 0;
for (size_t i = 0; i < nodes_.size(); ++i) {
ss << "OP[" << std::setw(w) << i;
auto node = nodes_[i];
string name;
if (node->op_name_.size() > 16) {
name = node->op_name_.substr(0, 13) + "...";
} else {
name = node->op_name_;
}
ss << "] Inputs:[";
size = node->in_edges_.size();
for (size_t j = 0; j < std::max(max_in_num, size); ++j) {
if (j < size)
ss << std::setw(w) << blocks_[node->in_edges_[j]->blk_]->id_ << " ";
else
ss << std::setw(w + 1) << " ";
}
ss << "] Outputs:[";
size = node->out_edges_.size();
for (size_t j = 0; j < std::max(max_out_num, size); ++j) {
if (j < size)
ss << std::setw(w) << blocks_[node->out_edges_[j]->blk_]->id_ << " ";
else
ss << std::setw(w + 1) << " ";
}
ss << "] Next nodes:[";
size = next_nodes_[i].size();
for (size_t j = 0; j < max_next_num; ++j) {
if (j < size)
ss << std::setw(w) << next_nodes_[i][j]->id_ << " ";
else
ss << std::setw(w + 1) << " ";
}
ss << "] Free blocks:[";
size = free_blocks_[i].size();
for (size_t j = 0; j < max_free_num; ++j) {
if (j < size)
ss << std::setw(w) << blocks_[free_blocks_[i][j]]->id_ << " ";
else
ss << std::setw(w + 1) << " ";
}
ss << "]" << std::endl;
}
size_t max_used_num = 0;
std::vector<BlkInfo *> blkInfos;
blkInfos.resize(blocks_.size());
for (auto it : blocks_) {
blkInfos[it.second->id_] = it.second;
max_used_num = std::max(max_used_num, it.second->used_nodes_.size());
}
for (auto it : blkInfos) {
auto blkInfo = it;
ss << "Block[" << std::setw(w) << blkInfo->id_ << "] addr[" << std::setw(10)
<< blkInfo->blk_ << "] size[" << std::setw(10) << blkInfo->blk_->size()
<< "] graph_ref[" << std::setw(w) << blkInfo->graph_ref_
<< "] ref_count[" << std::setw(w) << blkInfo->blk_->ref_count() << "] ";
switch (blkInfo->type_) {
case BlockType::kInput:
ss << "type[input] ";
break;
case BlockType::kParam:
ss << "type[param] ";
break;
case BlockType::kInter:
ss << "type[inter] ";
break;
case BlockType::kEnd:
ss << "type[_end_] ";
break;
default:
break;
}
int id = -1;
if (blkInfo->write_edge_) {
id = blkInfo->write_edge_->src_node_->id_;
}
ss << " write_node[" << std::setw(w) << id;
ss << "] used_nodes[";
size = blkInfo->used_nodes_.size();
for (size_t i = 0; i < max_used_num; ++i) {
if (i < size)
ss << std::setw(w) << blkInfo->used_nodes_[i]->id_ << " ";
else
ss << std::setw(w + 1) << " ";
}
ss << "]" << std::endl;
}
printf("%s", ss.str().c_str());
}
void Graph::PrintTimeProfiling() {
std::stringstream ss;
// verbosity level: 1 -> forward and backward propagation time
if (device_->verbosity() == 1) {
bool forward = true;
float forward_time = 0;
float backward_time = 0;
float time_elapsed;
for (size_t i = 0; i < nodes_.size(); ++i)
if (nodes_[i]->time_elapsed() > 0) {
if (forward == true)
// check the op of cross entropy backward, after that are backward ops
// note that the function is more accurate when either
// SoftmaxCrossEntropy or Softmax is used
if (nodes_[i]->op_name().find("Backward") != std::string::npos)
forward = false;
// when forward becomes false, it starts the backward propagation
time_elapsed = (nodes_[i]->time_elapsed()) /
(iteration_ - device_->skip_iteration());
if (forward == true) forward_time += time_elapsed;
}
backward_time = (time_elapsed_ / (iteration_ - device_->skip_iteration())) -
forward_time;
ss << std::endl << "Time Profiling:" << std::endl;
ss << "Forward Propagation Time : " << forward_time << " sec" << std::endl;
ss << "Backward Propagation Time : " << backward_time << " sec"
<< std::endl;
}
// verbosity level: 2 -> each operation time (OP_ID, operation name, time)
if (device_->verbosity() == 2) {
ss << std::endl << "Time Profiling:" << std::endl;
for (size_t i = 0; i < nodes_.size(); ++i)
if (nodes_[i]->time_elapsed() > 0)
ss << "OP_ID" << nodes_[i]->id_ << ". " << nodes_[i]->op_name() << " : "
<< (nodes_[i]->time_elapsed()) / (iteration_) << " sec" << std::endl;
}
// verbosity level: 3 -> Distributed training operations
if (device_->verbosity() == 3) {
ss << std::endl << "Time Profiling:" << std::endl;
for (size_t i = 0; i < nodes_.size(); ++i)
if ((nodes_[i]->op_name().find("Dist") != std::string::npos) &&
(nodes_[i]->time_elapsed() > 0))
ss << "OP_ID" << nodes_[i]->id_ << ". " << nodes_[i]->op_name() << " : "
<< (nodes_[i]->time_elapsed()) / (iteration_) << " sec" << std::endl;
}
printf("%s", ss.str().c_str());
}
void Graph::TimeProfilingDoExec(Node *curNode) {
if ((device_->verbosity() > 0) && (curNode->op_name_ != "Waiting") &&
(iteration_ >= device_->skip_iteration()))
device_->TimeProfilingDoExec(std::move(curNode->op_), 0, curNode);
else
device_->DoExec(std::move(curNode->op_), 0);
}
void Graph::EvaluateTimeElapsed(const TimePoint &start) {
if ((device_->verbosity() > 0) && (iteration_ > device_->skip_iteration())) {
device_->Sync();
std::chrono::duration<float> duration =
std::chrono::high_resolution_clock::now() - start;
time_elapsed_inc(duration.count());
for (size_t i = 0; i < nodes_.size(); ++i) {
Node *curNode = nodes_[i];
if (curNode->op_name_ != "Waiting") {
device_->EvaluateTimeElapsed(curNode);
}
}
}
}
void Graph::TakeStartTime(TimePoint &start) {
if ((device_->verbosity() > 0) && (iteration_ >= device_->skip_iteration())) {
device_->Sync();
start = std::chrono::high_resolution_clock::now();
}
}
void Graph::RunGraph() {
in_serial_ = false;
if (dirty_) Analyze();
TimePoint start;
SafeQueue<Node *> node_queue;
// activate nodes
for (auto it : begin_nodes_) {
node_queue.Push(it);
}
TakeStartTime(start);
// run graph
while (node_queue.Size()) {
// step 1: pop the first element, get the node corresponding to the index
Node *curNode = nullptr;
node_queue.Pop(curNode);
int curIndex = curNode->id_;
// step 2: execute the operation
TimeProfilingDoExec(curNode);
// step 3: release some blocks' data that won't be used later
for (auto it : free_blocks_[curIndex]) {
it->free_data();
}
/*
if (free_blocks_[curIndex].size()) {
CBData *cb_data = new CBData(this, curNode);
cudaStreamAddCallback(device_->ctx_.stream, Graph::Callback, (void
*)(cb_data), 0);
}
*/
// step 4: activate the following nodes
for (auto it : next_nodes_[curIndex]) {
node_queue.Push(it);
}
}
// increment iteration counter
step();
EvaluateTimeElapsed(start);
}
void Graph::RunInSerial() {
in_serial_ = true;
if (dirty_) Analyze();
TimePoint start;
TakeStartTime(start);
for (size_t i = 0; i < nodes_.size(); ++i) {
Node *curNode = nodes_[i];
// step 1: execute the operation
TimeProfilingDoExec(curNode);
// step 2: release some blocks' data that won't be used later
for (auto it : free_blocks_[i]) {
it->free_data();
}
/*
// Wait for calculation to complete and then recyle the data
CBData *cb_data = new CBData(this, curNode);
CHECK(cudaStreamAddCallback(device_->ctx_.stream, Graph::Callback, (void
*)(cb_data), 0));
*/
}
// increment iteration counter
step();
EvaluateTimeElapsed(start);
}
void Graph::AddOperation(OpFunc &&op, const BlockVec &read_blocks,
const BlockVec &write_blocks, string op_name) {
dirty_ = true;
// if the size of both read_blocks and write_blocks is zero,
// this operation is used for synchronization
if (read_blocks.size() == 0 && write_blocks.size() == 0) {
AddSyncOp(std::move(op), op_name);
return;
}
// create new node
Node *node = new Node(nodes_.size(), std::move(op), op_name);
// create edges for read_blocks
for (size_t i = 0; i < read_blocks.size(); ++i) {
Block *blk = read_blocks[i];
Node *src_node = nullptr;
BlkInfo *blkInfo = nullptr;
// update leaf blocks
auto iter = leaf_blocks_.find(blk);
if (iter != leaf_blocks_.end()) {
leaf_blocks_.erase(iter);
}
// check if the block is already in the computational graph
auto it = blocks_.find(blk);
if (it == blocks_.end()) {
blkInfo = new BlkInfo(blocks_.size(), blk, BlockType::kInput);
blocks_[blk] = blkInfo;
} else {
blkInfo = it->second;
if (blkInfo->type_ == BlockType::kEnd) {
blkInfo->type_ = BlockType::kInter;
}
// update the existing edge, update dst node and create new edge
Edge *write_edge = blkInfo->write_edge_;
if (write_edge) {
if (!write_edge->dst_node_) {
// change the dst node of the write_edge
write_edge->dst_node_ = node;
node->AddInEdge(write_edge);
blkInfo->graph_ref_ += 1;
continue;
} else {
src_node = write_edge->src_node_;
}
}
}
// create new edge for new block
Edge *edge = new Edge(edges_.size(), blk, src_node, node);
blkInfo->graph_ref_ += 1;
if (src_node) {
src_node->AddOutEdge(edge);
}
node->AddInEdge(edge);
edges_.push_back(edge);
}
// update last node for write_blocks
for (size_t i = 0; i < write_blocks.size(); ++i) {
Block *blk = write_blocks[i];
BlkInfo *blkInfo = nullptr;
// update leaf blocks
leaf_blocks_.insert(blk);
auto it = blocks_.find(blk);
if (it == blocks_.end()) {
blkInfo = new BlkInfo(blocks_.size(), blk, BlockType::kEnd);
blocks_[blk] = blkInfo;
} else {
blkInfo = it->second;
if (blkInfo->type_ == BlockType::kInput) {
blkInfo->type_ = BlockType::kParam;
}
Edge *write_edge = blkInfo->write_edge_;
if (write_edge) {
if (!write_edge->dst_node_) {
write_edge->dst_node_ = node;
node->AddInEdge(write_edge);
} else {
Node *lastNode = write_edge->src_node_;
auto outEdges = lastNode->out_edges();
for (auto outEdge : outEdges) {
if (outEdge->blk_ == blk && outEdge->dst_node_ != node) {
Edge *edge =
new Edge(edges_.size(), blk, outEdge->dst_node_, node);
outEdge->dst_node_->AddOutEdge(edge);
node->AddInEdge(edge);
}
}
}
}
}
// create new edge for new block
Edge *edge = new Edge(edges_.size(), blk, node, nullptr);
blkInfo->write_edge_ = edge;
blkInfo->graph_ref_ += 1;
node->AddOutEdge(edge);
edges_.push_back(edge);
}
// add node into nodes
nodes_.push_back(node);
}
void Graph::AddSyncOp(function<void(Context *)> &&op, string op_name) {
// create new node
Node *node = new Node(nodes_.size(), std::move(op), op_name);
for (auto it : leaf_blocks_) {
Block *blk = it;
BlkInfo *blkInfo = blocks_[blk];
Edge *edge = nullptr;
if (blkInfo->type_ == BlockType::kEnd) {
blkInfo->type_ = BlockType::kInter;
}
Edge *write_edge = blkInfo->write_edge_;
if (!write_edge->dst_node_) {
// change the dst node of the write_edge
write_edge->dst_node_ = node;
edge = write_edge;
} else {
Node *src_node = write_edge->src_node_;
edge = new Edge(edges_.size(), blk, src_node, node);
src_node->AddOutEdge(edge);
edges_.push_back(edge);
}
node->AddInEdge(edge);
// fake edges, no need to add the graph ref
edge = new Edge(edges_.size(), blk, node, nullptr);
blkInfo->write_edge_ = edge;
node->AddOutEdge(edge);
edges_.push_back(edge);
}
// add node into nodes
nodes_.push_back(node);
}
void Graph::Analyze() {
begin_nodes_.clear();
next_nodes_.clear();
next_nodes_.resize(nodes_.size());
free_blocks_.clear();
free_blocks_.resize(nodes_.size());
for (auto &it : blocks_) {
it.second->used_nodes_.clear();
}
AnalyzeNodes();
AnalyzeEdges();
dirty_ = false;
// Debug();
}
void Graph::AnalyzeNodes() {
if (in_serial_) {
begin_nodes_.push_back(nodes_[0]);
for (size_t i = 0; i < nodes_.size(); ++i) {
Node *curNode = nodes_[i];
next_nodes_[i].clear();
if (i + 1 < nodes_.size()) {
next_nodes_[i].push_back(nodes_[i + 1]);
}
BlockSet blks;
for (size_t j = 0; j < curNode->in_edges_.size(); ++j) {
blks.insert(curNode->in_edges_[j]->blk_);
}
for (size_t j = 0; j < curNode->out_edges_.size(); ++j) {
blks.insert(curNode->out_edges_[j]->blk_);
}
for (auto &it : blks) {
blocks_[it]->used_nodes_.push_back(curNode);
}
}
} else {
// init node ref
std::vector<int> node_ref_;
node_ref_.resize(nodes_.size());
for (size_t i = 0; i < nodes_.size(); ++i) {
node_ref_[i] = nodes_[i]->in_edges_.size();
}
// find all input edges and decrease ref count of nodes
for (size_t i = 0; i < edges_.size(); ++i) {
Node *src_node = edges_[i]->src_node_;
if (!src_node) {
Node *node = edges_[i]->dst_node_;
int nodeId = node->id_;
node_ref_[nodeId] -= 1;
}
}
// activate nodes
SafeQueue<Node *> node_queue;
for (size_t i = 0; i < node_ref_.size(); ++i) {
if (node_ref_[i] == 0) {
begin_nodes_.push_back(nodes_[i]);
node_queue.Push(nodes_[i]);
}
}
// run graph
while (node_queue.Size()) {
// step 1: pop the first element, get the node corresponding to the index
Node *curNode = nullptr;
node_queue.Pop(curNode);
int curIndex = curNode->id_;
// step 2: decrease ref count of nodes and activate nodes
next_nodes_[curIndex].clear();
for (size_t i = 0; i < curNode->out_edges_.size(); ++i) {
Edge *edge = curNode->out_edges_[i];
Node *nextNode = edge->dst_node_;
if (!nextNode) {
continue;
}
int nodeId = nextNode->id_;
node_ref_[nodeId] -= 1;
if (node_ref_[nodeId] <= 0) {
node_queue.Push(nextNode);
next_nodes_[curIndex].push_back(nextNode);
}
}
// step 3: push_back curNode to the used_nodes_ of relevant blocks
BlockSet blks;
for (size_t j = 0; j < curNode->in_edges_.size(); ++j) {
blks.insert(curNode->in_edges_[j]->blk_);
}
for (size_t j = 0; j < curNode->out_edges_.size(); ++j) {
blks.insert(curNode->out_edges_[j]->blk_);
}
for (auto &it : blks) {
blocks_[it]->used_nodes_.push_back(curNode);
}
}
}
}
void Graph::AnalyzeEdges() {
for (auto &it : blocks_) {
Block *blk = it.first;
BlkInfo *blkInfo = it.second;
if (blkInfo->used_nodes_.size()) {
int node_id = blkInfo->used_nodes_.back()->id_;
BlockType type = blkInfo->type_;
// if the block belongs to a inter tensor
// and isn't refered on the Python Side
if ((type == BlockType::kInter || type == BlockType::kEnd) &&
blkInfo->graph_ref_ >= blk->ref_count()) {
free_blocks_[node_id].push_back(blk);
}
}
}
}
void Graph::FreeLoop() {
int id = 0;
for (;;) {
free_queue_.Pop(id);
if (id == -1) {
break;
} else {
for (auto it : free_blocks_[id]) {
it->free_data();
}
}
}
}
/*
void CUDART_CB Graph::Callback(cudaStream_t stream, cudaError_t status,
void *data) {
CBData *cb_data = (CBData *)data;
Graph *graph = cb_data->graph_;
graph->free_queue_.Push(cb_data->node_->id_);
delete cb_data;
}
*/
} // namespace singa