blob: e397f90545c68f8ad2303731a8197a124c63ff17 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef IFILE_H_
#define IFILE_H_
#include "util/Checksum.h"
#include "lib/Buffers.h"
#include "util/WritableUtils.h"
#include "lib/SpillInfo.h"
#include "lib/MapOutputSpec.h"
namespace NativeTask {
/**
* IFileReader
*/
class IFileReader {
private:
InputStream * _stream;
ChecksumInputStream * _source;
ReadBuffer _reader;
ChecksumType _checksumType;
KeyValueType _kType;
KeyValueType _vType;
string _codec;
int32_t _segmentIndex;
SingleSpillInfo * _spillInfo;
const char * _valuePos;
uint32_t _valueLen;
bool _deleteSourceStream;
public:
IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteSourceStream = false);
virtual ~IFileReader();
/**
* @return 0 if have next partition, none 0 if no more partition
*/
bool nextPartition();
/**
* get next key
* NULL if no more, then next_partition() need to be called
* NOTICE: before value() is called, the return pointer value is
* guaranteed to be valid
*/
const char * nextKey(uint32_t & keyLen) {
int64_t t1 = _reader.readVLong();
int64_t t2 = _reader.readVLong();
if (t1 == -1) {
return NULL;
}
const char * kvbuff = _reader.get((uint32_t)(t1 + t2));
uint32_t len;
switch (_kType) {
case TextType:
keyLen = WritableUtils::ReadVInt(kvbuff, len);
break;
case BytesType:
keyLen = bswap(*(uint32_t*)kvbuff);
len = 4;
break;
default:
keyLen = t1;
len = 0;
}
const char * kbuff = kvbuff + len;
const char * vbuff = kvbuff + (uint32_t)t1;
switch (_vType) {
case TextType:
_valueLen = WritableUtils::ReadVInt(vbuff, len);
_valuePos = vbuff + len;
break;
case BytesType:
_valueLen = bswap(*(uint32_t*)vbuff);
_valuePos = vbuff + 4;
break;
default:
_valueLen = t2;
_valuePos = vbuff;
}
return kbuff;
}
/**
* length of current value part of IFile entry
*/
uint32_t valueLen() {
return _valueLen;
}
/**
* get current value
*/
const char * value(uint32_t & valueLen) {
valueLen = _valueLen;
return _valuePos;
}
};
/**
* IFile Writer
*/
class IFileWriter : public Collector {
protected:
OutputStream * _stream;
ChecksumOutputStream * _dest;
ChecksumType _checksumType;
KeyValueType _kType;
KeyValueType _vType;
string _codec;
AppendBuffer _appendBuffer;
vector<IFileSegment> _spillFileSegments;
Counter * _recordCounter;
uint64_t _recordCount;
bool _deleteTargetStream;
private:
IFileSegment * toArray(std::vector<IFileSegment> *segments);
public:
static IFileWriter * create(const std::string & filepath, const MapOutputSpec & spec,
Counter * spilledRecords);
IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype,
KeyValueType vtype, const string & codec, Counter * recordCounter,
bool deleteTargetStream = false);
virtual ~IFileWriter();
void startPartition();
void endPartition();
virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen);
SingleSpillInfo * getSpillInfo();
void getStatistics(uint64_t & offset, uint64_t & realOffset, uint64_t & recordCount);
virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
write((const char*)key, keyLen, (const char*)value, valueLen);
}
};
} // namespace NativeTask
#endif /* IFILE_H_ */