blob: 32f18879698a1629cbe8333a72aff84b26887f01 [file] [log] [blame]
/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#include "singa/neuralnet/input_layer.h"
#include "singa/utils/context.h"
#include "singa/utils/singleton.h"
namespace singa {
using std::thread;
StoreInputLayer::~StoreInputLayer() {
  // A prefetch thread may still be running; wait for it before
  // releasing any state it touches.
  if (thread_ != nullptr) {
    thread_->join();
    delete thread_;
  }
  delete store_;  // delete on nullptr is a no-op
}
void StoreInputLayer::Setup(const LayerProto& conf,
const vector<Layer*>& srclayers) {
InputLayer::Setup(conf, srclayers);
const auto& batchsize = conf.store_conf().batchsize();
CHECK(batchsize.size());
if (conf.partition_dim() == 0) {
if (batchsize.size() == 1) // equal partition
batchsize_ = batchsize.Get(0) / conf.num_partitions();
else // manual partition
batchsize_ = batchsize.Get(conf.partition_id());
} else {
batchsize_ = conf.store_conf().batchsize(0);
}
vector<int> shape {batchsize_};
for (int s : conf.store_conf().shape())
shape.push_back(s);
data_.Reshape(shape);
aux_data_.resize(batchsize_);
}
// Fill buf_keys_/buf_vals_ with one batch of records from the store.
// The store is opened lazily on first call; if random_skip is
// configured, a random number of leading records is discarded so that
// different workers start reading at different offsets.
void StoreInputLayer::fetch_data() {
  if (store_ == nullptr) {
    store_ = io::OpenStore(layer_conf_.store_conf().backend(),
                           layer_conf_.store_conf().path(),
                           io::kRead);
    if (layer_conf_.store_conf().random_skip() > 0) {
      auto generator = Singleton<Context>::Instance()->rand_generator(
          std::this_thread::get_id());
      // bounds are inclusive: skip anywhere in [0, random_skip]
      std::uniform_int_distribution<int>
          distribution(0, layer_conf_.store_conf().random_skip());
      random_skip_ = distribution(*generator);
    }
    string key, val;
    for (; random_skip_ > 0; random_skip_--) {
      if (!store_->Read(&key, &val)) {
        // reached the end of the store; wrap around
        store_->SeekToFirst();
        CHECK(store_->Read(&key, &val));
      }
    }
    buf_keys_.resize(batchsize_);
    buf_vals_.resize(batchsize_);
  }
  // Read batchsize_ records, wrapping to the start on EOF.
  for (int i = 0; i < batchsize_; ++i) {
    if (!store_->Read(&buf_keys_[i], &buf_vals_[i])) {
      store_->SeekToFirst();
      CHECK(store_->Read(&buf_keys_[i], &buf_vals_[i]));
    }
  }
}
// Produce one batch: obtain raw records (synchronously, or by joining
// the prefetch thread), parse them into the data blob, and — when
// prefetching — start fetching the next batch in the background.
void StoreInputLayer::ComputeFeature(int flag,
    const vector<Layer*>& srclayers) {
  const bool prefetch = layer_conf_.store_conf().prefetching();
  if (prefetch) {
    // On the very first call no background fetch has been started yet,
    // so launch one and immediately wait for it.
    if (thread_ == nullptr)
      thread_ = new thread(&StoreInputLayer::fetch_data, this);
    thread_->join();
    delete thread_;
    thread_ = nullptr;
  } else {
    fetch_data();
  }
  for (int i = 0; i < batchsize_; ++i)
    Parse(i, flag, buf_keys_[i], buf_vals_[i]);
  // Overlap the next fetch with downstream computation.
  if (prefetch)
    thread_ = new thread(&StoreInputLayer::fetch_data, this);
}
// No configuration beyond the base store layer is needed here; the
// mean/std blobs are initialized lazily in ComputeFeature.
void SingleLabelRecordLayer::Setup(const LayerProto& conf,
    const vector<Layer*>& srclayers) {
  StoreInputLayer::Setup(conf, srclayers);
}
// Fetch and parse one batch via the base class, then normalize it:
// subtract the per-feature mean and divide by the per-feature std.
// The mean_/std_ blobs are initialized lazily on the first batch,
// either from a record file or from a scalar configured value.
void SingleLabelRecordLayer::ComputeFeature(int flag,
    const vector<Layer*>& srclayers) {
  StoreInputLayer::ComputeFeature(flag, srclayers);
  auto& store_conf = layer_conf_.store_conf();
  const int dim = data_.count() / batchsize_;  // features per record
  if (store_conf.has_mean_file() && mean_.count() == 0) {
    mean_.Reshape(vector<int>{dim});
    LoadRecord(store_conf.backend(), store_conf.mean_file(), &mean_);
  } else if (store_conf.has_mean_value() && mean_.count() == 0) {
    mean_.Reshape(vector<int>{dim});
    float* mptr = mean_.mutable_cpu_data();
    for (int i = 0; i < dim; i++)
      mptr[i] = store_conf.mean_value();
  }
  if (store_conf.has_std_file() && std_.count() == 0) {
    std_.Reshape(vector<int>{dim});
    LoadRecord(store_conf.backend(), store_conf.std_file(), &std_);
    // A zero std would silently produce inf/NaN in the division below,
    // so reject it up front (same contract as the std_value branch).
    const float* sptr = std_.cpu_data();
    for (int i = 0; i < dim; i++)
      CHECK_NE(sptr[i], 0);
  } else if (store_conf.has_std_value() && std_.count() == 0) {
    std_.Reshape(vector<int>{dim});
    CHECK_NE(store_conf.std_value(), 0);
    float* sptr = std_.mutable_cpu_data();
    for (int i = 0; i < dim; i++)
      sptr[i] = store_conf.std_value();
  }
  if (mean_.count()) {
    const float* mean = mean_.cpu_data();
    float* data = data_.mutable_cpu_data();  // hoisted out of the loops
    for (int k = 0; k < batchsize_; k++) {
      float* dptr = data + k * mean_.count();
      for (int i = 0; i < mean_.count(); i++)
        dptr[i] -= mean[i];
    }
  }
  if (std_.count()) {
    const float* std = std_.cpu_data();
    float* data = data_.mutable_cpu_data();
    for (int k = 0; k < batchsize_; k++) {
      float* dptr = data + k * std_.count();
      for (int i = 0; i < std_.count(); i++)
        dptr[i] /= std[i];
    }
  }
}
} // namespace singa