blob: 3120be143ea08603ddd841c70b774dc96db6f567 [file] [log] [blame]
/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#include "singa/io/kvfile.h"
#include <glog/logging.h>
namespace singa {
namespace io {
KVFile::KVFile(const std::string& path, Mode mode, int capacity) :
path_(path), mode_(mode), capacity_(capacity) {
buf_ = new char[capacity];
switch (mode) {
case KVFile::kRead:
fdat_.open(path_, std::ios::in | std::ios::binary);
if (!fdat_.is_open()) {
// path may be a directory
path_ = path + "/shard.dat";
fdat_.open(path_, std::ios::in | std::ios::binary);
}
CHECK(fdat_.is_open()) << "Cannot create file " << path_;
break;
case KVFile::kCreate:
fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
CHECK(fdat_.is_open()) << "Cannot create file " << path_;
break;
case KVFile::kAppend:
fdat_.open(path_, std::ios::in | std::ios::binary);
if (!fdat_.is_open()) {
// path may be a directory
path_ = path + "/shard.dat";
fdat_.open(path_, std::ios::in | std::ios::binary);
}
CHECK(fdat_.is_open()) << "Cannot open file " << path_;
fdat_.close();
{
int last_tuple = PrepareForAppend(path_);
fdat_.open(path_, std::ios::binary | std::ios::out
| std::ios::in | std::ios::ate);
fdat_.seekp(last_tuple);
}
break;
default:
LOG(FATAL) << "unknown model to open KVFile " << mode;
break;
}
}
KVFile::~KVFile() {
if (mode_ != kRead)
Flush();
delete[] buf_;
fdat_.close();
}
#ifdef USE_PROTOBUF
bool KVFile::Next(std::string* key, google::protobuf::Message* val) {
int vallen = Next(key);
if (vallen == 0) return false;
val->ParseFromArray(buf_ + offset_, vallen);
offset_ += vallen;
return true;
}
bool KVFile::Insert(const std::string& key,
const google::protobuf::Message& val) {
std::string str;
val.SerializeToString(&str);
return Insert(key, str);
}
#endif
bool KVFile::Next(std::string *key, std::string* val) {
int vallen = Next(key);
if (vallen == 0) return false;
val->clear();
for (int i = 0; i < vallen; ++i)
val->push_back(buf_[offset_ + i]);
offset_ += vallen;
return true;
}
// insert one complete tuple
bool KVFile::Insert(const std::string& key, const std::string& val) {
if (keys_.find(key) != keys_.end() || val.size() == 0)
return false;
int size = key.size() + val.size() + 2*sizeof(size_t);
if (bufsize_ + size > capacity_) {
fdat_.write(buf_, bufsize_);
bufsize_ = 0;
CHECK_LE(size, capacity_) << "Tuple size is larger than capacity "
<< "Try a larger capacity size";
}
*reinterpret_cast<size_t*>(buf_ + bufsize_) = key.size();
bufsize_ += sizeof(size_t);
memcpy(buf_ + bufsize_, key.data(), key.size());
bufsize_ += key.size();
*reinterpret_cast<size_t*>(buf_ + bufsize_) = val.size();
bufsize_ += sizeof(size_t);
memcpy(buf_ + bufsize_, val.data(), val.size());
bufsize_ += val.size();
return true;
}
void KVFile::SeekToFirst() {
CHECK_EQ(mode_, kRead);
bufsize_ = 0;
offset_ = 0;
fdat_.clear();
fdat_.seekg(0);
CHECK(fdat_.is_open()) << "Cannot create file " << path_;
}
void KVFile::Flush() {
fdat_.write(buf_, bufsize_);
fdat_.flush();
bufsize_ = 0;
}
int KVFile::Count() {
std::ifstream fin(path_, std::ios::in | std::ios::binary);
CHECK(fdat_.is_open()) << "Cannot create file " << path_;
int count = 0;
while (true) {
size_t len;
fin.read(reinterpret_cast<char*>(&len), sizeof(len));
if (!fin.good()) break;
fin.seekg(len, std::ios_base::cur);
if (!fin.good()) break;
fin.read(reinterpret_cast<char*>(&len), sizeof(len));
if (!fin.good()) break;
fin.seekg(len, std::ios_base::cur);
if (!fin.good()) break;
count++;
}
fin.close();
return count;
}
int KVFile::Next(std::string *key) {
key->clear();
int ssize = sizeof(size_t);
if (!PrepareNextField(ssize)) return 0;
int keylen = *reinterpret_cast<size_t*>(buf_ + offset_);
offset_ += ssize;
if (!PrepareNextField(keylen)) return 0;
for (int i = 0; i < keylen; ++i)
key->push_back(buf_[offset_ + i]);
offset_ += keylen;
if (!PrepareNextField(ssize)) return 0;
int vallen = *reinterpret_cast<size_t*>(buf_ + offset_);
offset_ += ssize;
if (!PrepareNextField(vallen)) return 0;
return vallen;
}
int KVFile::PrepareForAppend(const std::string& path) {
std::ifstream fin(path, std::ios::in | std::ios::binary);
if (!fin.is_open()) return 0;
int last_tuple_offset = 0;
char buf[256];
size_t len;
while (true) {
fin.read(reinterpret_cast<char*>(&len), sizeof(len));
if (!fin.good()) break;
fin.read(buf, len);
buf[len] = '\0';
if (!fin.good()) break;
fin.read(reinterpret_cast<char*>(&len), sizeof(len));
if (!fin.good()) break;
fin.seekg(len, std::ios_base::cur);
if (!fin.good()) break;
keys_.insert(std::string(buf));
last_tuple_offset = fin.tellg();
}
fin.close();
return last_tuple_offset;
}
// if the buf does not have the next complete field, read data from disk
bool KVFile::PrepareNextField(int size) {
if (offset_ + size > bufsize_) {
bufsize_ -= offset_;
// wangsh: commented, not sure what this check does
// CHECK_LE(bufsize_, offset_);
for (int i = 0; i < bufsize_; ++i)
buf_[i] = buf_[i + offset_];
offset_ = 0;
if (fdat_.eof()) {
return false;
} else {
fdat_.read(buf_ + bufsize_, capacity_ - bufsize_);
bufsize_ += fdat_.gcount();
if (size > bufsize_) return false;
}
}
return true;
}
} // namespace io
} // namespace singa