// blob: 5f3863eb6297ee5899701461e2405dbacdc60de5
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CombineHandler.h"
namespace NativeTask {
// The literal "refill"; it is not referenced in the visible part of this file.
// NOTE(review): presumably a command/marker name shared with another component - confirm.
const char * REFILL = "refill";
// Number of characters in the "refill" literal above (terminator not counted).
const int LENGTH_OF_REFILL_STRING = 6;
// Definition of the static COMBINE command member, constructed from the value 4 and the
// label "Combine". CombineHandler::combine() passes it to call().
const Command CombineHandler::COMBINE(4, "Combine");
/**
 * Creates an idle handler: every pointer member is NULL, the key and value
 * types are UnknownType, no record is cached, and all four combine
 * counters start at zero.
 */
CombineHandler::CombineHandler()
    : _combineContext(NULL),
      _kvIterator(NULL),
      _writer(NULL),
      _kType(UnknownType),
      _vType(UnknownType),
      _config(NULL),
      _kvCached(false),
      _combineInputRecordCount(0),
      _combineInputBytes(0),
      _combineOutputRecordCount(0),
      _combineOutputBytes(0)
{
}
/**
 * Destructor. The body is empty: nothing is released or deleted here.
 */
CombineHandler::~CombineHandler() {
}
/**
 * Stores the configuration pointer, loads the map output specification
 * from it, and caches the key and value types found in that specification.
 *
 * @param config configuration to read from; the pointer itself is kept, not copied
 */
void CombineHandler::configure(Config * config) {
  this->_config = config;
  MapOutputSpec::getSpecFromConfig(this->_config, this->_mapOutputSpec);

  this->_kType = this->_mapOutputSpec.keyType;
  this->_vType = this->_mapOutputSpec.valueType;
}
/**
 * Fills the output buffer with as many key/value records as fit and
 * flushes it once.
 *
 * Each record is emitted as: byte-swapped key outer length, byte-swapped
 * value outer length, the key, then the value. The first record of a round
 * is always emitted without checking the free space; any later record that
 * does not fit in the remaining space is kept in _key/_value (flagged by
 * _kvCached) and emitted first on the next call.
 *
 * Note that a record which is read but deferred is added to the input
 * record counter in the round it was read, while its bytes are counted in
 * the round it is actually emitted.
 *
 * @return number of bytes emitted in this round (record payloads plus
 *         KVBuffer::headerLength() per record)
 */
uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
  uint32_t bytesEmitted = 0;
  uint32_t recordsRead = 0;
  bool emittedAny = false;

  _out.position(0);

  // A record deferred by the previous round goes out first.
  if (_kvCached) {
    const uint32_t cachedSize = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
    outputInt(bswap(_key.outerLength));
    outputInt(bswap(_value.outerLength));
    outputKeyOrValue(_key, _kType);
    outputKeyOrValue(_value, _vType);
    bytesEmitted += cachedSize;
    _kvCached = false;
    emittedAny = true;
  }

  while (nextKeyValue(_key, _value)) {
    _kvCached = false;
    ++recordsRead;

    const uint32_t recordSize = _key.outerLength + _value.outerLength + KVBuffer::headerLength();

    // Only records after the first are subject to the free-space check.
    if (emittedAny && recordSize > _out.remain()) {
      _kvCached = true;
      break;
    }

    emittedAny = true;
    // final key length and final value length precede the payloads
    outputInt(bswap(_key.outerLength));
    outputInt(bswap(_value.outerLength));
    outputKeyOrValue(_key, _kType);
    outputKeyOrValue(_value, _vType);
    bytesEmitted += recordSize;
  }

  if (_out.position() > 0) {
    flushOutput();
  }

  _combineInputRecordCount += recordsRead;
  _combineInputBytes += bytesEmitted;
  return bytesEmitted;
}
/**
 * Emits a single serialized field (a key or a value).
 *
 * A type-dependent prefix is written first, followed by the raw bytes of
 * KV.buffer:
 *  - TextType:  the leading (outerLength - buffer length) bytes of KV.varBytes
 *  - BytesType: the byte-swapped buffer length, written with outputInt
 *  - otherwise: no prefix
 *
 * @param KV   the key or value to emit
 * @param type serialization type that selects the prefix
 */
void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) {
  if (type == TextType) {
    output(KV.varBytes, KV.outerLength - KV.buffer.length());
  } else if (type == BytesType) {
    outputInt(bswap(KV.buffer.length()));
  }

  // The payload itself is written the same way for every type.
  output(KV.buffer.data(), KV.buffer.length());
}
/**
 * Pulls the next record from the key/value iterator and computes the
 * serialized ("outer") length of both fields.
 *
 * For TextType the outer length is the buffer length plus the size of the
 * variable-length integer written into varBytes; for BytesType it is the
 * buffer length plus 4; for every other type it equals the buffer length.
 *
 * @param key   receives the next key and its outer length
 * @param value receives the next value and its outer length
 * @return false when the iterator has no further record, true otherwise
 */
bool CombineHandler::nextKeyValue(SerializeInfo & key, SerializeInfo & value) {
  const bool hasRecord = _kvIterator->next(key.buffer, value.buffer);
  if (!hasRecord) {
    return false;
  }

  // final key length
  if (_kType == TextType) {
    uint32_t keyPrefixLength = 0;
    WritableUtils::WriteVInt(key.buffer.length(), key.varBytes, keyPrefixLength);
    key.outerLength = key.buffer.length() + keyPrefixLength;
  } else if (_kType == BytesType) {
    key.outerLength = key.buffer.length() + 4;
  } else {
    key.outerLength = key.buffer.length();
  }

  // final value length
  if (_vType == TextType) {
    uint32_t valuePrefixLength = 0;
    WritableUtils::WriteVInt(value.buffer.length(), value.varBytes, valuePrefixLength);
    value.outerLength = value.buffer.length() + valuePrefixLength;
  } else if (_vType == BytesType) {
    value.outerLength = value.buffer.length() + 4;
  } else {
    value.outerLength = value.buffer.length();
  }

  return true;
}
/**
 * Dispatches to the feeder for the requested serialization framework.
 *
 * @param serializationType framework to use; only WRITABLE_SERIALIZATION is handled
 * @return number of bytes emitted by the writable-serialization feeder
 * @throws IOException for any other serialization type
 */
uint32_t CombineHandler::feedDataToJava(SerializationFramework serializationType) {
  if (serializationType != WRITABLE_SERIALIZATION) {
    THROW_EXCEPTION(IOException, "Native Serialization not supported");
  }
  return feedDataToJavaInWritableSerialization();
}
/**
 * Consumes one chunk of incoming record data.
 *
 * Steps, in order:
 *  1. If the set-aside buffer still has free space, it is filled from the
 *     front of the chunk.
 *  2. If the set-aside buffer is non-empty and now full, its content is
 *     passed to write() and the buffer is reset to an empty wrapper.
 *  3. If bytes are left, the first record's length is read via
 *     lengthConvertEndium(). When that length exceeds the bytes left, the
 *     bytes are copied into a freshly sized set-aside buffer to be completed
 *     by later calls; otherwise all remaining bytes are passed to write().
 *
 * @param in chunk to consume, read from in.current() for in.remain() bytes
 * @throws IOException if the bytes left after steps 1-2 are fewer than a record header
 */
void CombineHandler::handleInput(ByteBuffer & in) {
  char * cursor = in.current();
  uint32_t unread = in.remain();

  // Step 1: top up a record left incomplete by an earlier chunk.
  if (_asideBuffer.remain() > 0) {
    const uint32_t taken = _asideBuffer.fill(cursor, unread);
    cursor += taken;
    unread -= taken;
  }

  // Step 2: emit the set-aside record once it is complete.
  const bool asideComplete = (_asideBuffer.size() > 0) && (_asideBuffer.remain() == 0);
  if (asideComplete) {
    _asideBuffer.position(0);
    write(_asideBuffer.current(), _asideBuffer.size());
    _asideBuffer.wrap(NULL, 0);
  }

  if (unread == 0) {
    return;
  }

  // Step 3: look at the first record in what is left of the chunk.
  KVBuffer * head = reinterpret_cast<KVBuffer *>(cursor);
  if (unlikely(unread < head->headerLength())) {
    THROW_EXCEPTION(IOException, "k/v meta information incomplete");
  }

  const uint32_t headLength = head->lengthConvertEndium();
  if (headLength <= unread) {
    write(cursor, unread);
    return;
  }

  // The record continues in a later chunk: stash what has arrived so far.
  _asideBytes.resize(headLength);
  _asideBuffer.wrap(_asideBytes.buff(), _asideBytes.size());
  _asideBuffer.fill(cursor, unread);
}
void CombineHandler::write(char * buf, uint32_t length) {
KVBuffer * kv = NULL;
char * pos = buf;
uint32_t remain = length;
uint32_t outputRecordCount = 0;
while (remain > 0) {
kv = (KVBuffer *)pos;
kv->keyLength = bswap(kv->keyLength);
kv->valueLength = bswap(kv->valueLength);
_writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
outputRecordCount++;
remain -= kv->length();
pos += kv->length();
}
_combineOutputRecordCount += outputRecordCount;
_combineOutputBytes += length;
}
/**
 * Returns a 4-character string holding the in-memory bytes of `length`
 * (host byte order, no conversion to digits).
 *
 * @param length value whose raw bytes are copied
 * @return string of exactly 4 bytes
 */
string toString(uint32_t length) {
  const char * rawBytes = reinterpret_cast<const char *>(&length);
  return string(rawBytes, sizeof(uint32_t));
}
/**
 * Load-data callback: feeds the next batch of records using writable
 * serialization. The byte count returned by the feeder is not used here.
 */
void CombineHandler::onLoadData() {
  (void) feedDataToJava(WRITABLE_SERIALIZATION);
}
/**
 * Command callback. This handler accepts no commands, so every call fails.
 *
 * @param command the command that was received (ignored)
 * @param param   the command's parameters (ignored)
 * @return never returns normally
 * @throws UnsupportException always
 */
ResultBuffer * CombineHandler::onCall(const Command& command, ParameterBuffer * param) {
  // The message previously named "RReducerHandler", which pointed at the wrong class.
  THROW_EXCEPTION(UnsupportException, "Command not supported by CombineHandler");
}
/**
 * Runs one combine pass: resets the four counters, records the iterator and
 * writer to use, issues the COMBINE command through call(), and logs the
 * counters afterwards.
 *
 * @param type       combine context for this pass; visible through
 *                   _combineContext only while this function is running
 * @param kvIterator source of the records to combine; stored, not owned here
 * @param writer     destination for the combined records; stored, not owned here
 */
void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
  _combineInputRecordCount = 0;
  _combineOutputRecordCount = 0;
  _combineInputBytes = 0;
  _combineOutputBytes = 0;

  // `type` is a by-value parameter, so its address is only valid until this
  // function exits. The member is cleared on every exit path to avoid
  // leaving a dangling pointer behind.
  this->_combineContext = &type;
  this->_kvIterator = kvIterator;
  this->_writer = writer;

  try {
    call(COMBINE, NULL);
  } catch (...) {
    this->_combineContext = NULL;
    throw;
  }
  this->_combineContext = NULL;

  // NOTE(review): the counters are printed with %d; confirm that matches their declared type.
  LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, "
      "output Record Count: %d, output Bytes: %d",
      _combineInputRecordCount, _combineInputBytes,
      _combineOutputRecordCount, _combineOutputBytes);
}
/**
 * Completion hook; currently a no-op for this handler.
 */
void CombineHandler::finish() {
}
} /* namespace NativeTask */