blob: 7b1fb757004377789aed9b2d98f85186d4bd5111 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator;
public class AbstractGFRecordReader
extends
com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.AbstractGFRecordReader
implements RecordReader<GFKey, PersistedEventImpl> {
/**
* Initializes instance of record reader using file split and job
* configuration
*
* @param split
* @param conf
* @throws IOException
*/
public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
CombineFileSplit cSplit = (CombineFileSplit) split;
Path[] path = cSplit.getPaths();
long[] start = cSplit.getStartOffsets();
long[] len = cSplit.getLengths();
FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
}
@Override
public boolean next(GFKey key, PersistedEventImpl value) throws IOException {
/*
* if there are more records in the hoplog, iterate to the next record. Set
* key object as is.
*/
if (!super.hasNext()) {
key.setKey(null);
// TODO make value null;
return false;
}
super.next();
key.setKey(super.getKey().getKey());
PersistedEventImpl usersValue = super.getValue();
value.copy(usersValue);
return true;
}
@Override
public GFKey createKey() {
return new GFKey();
}
@Override
public PersistedEventImpl createValue() {
if(this.isSequential) {
return new UnsortedHoplogPersistedEvent();
} else {
return new SortedHoplogPersistedEvent();
}
}
@Override
public long getPos() throws IOException {
// there is no efficient way to find the position of key in hoplog file.
return 0;
}
@Override
public void close() throws IOException {
super.close();
}
@Override
public float getProgress() throws IOException {
return super.getProgressRatio();
}
}