/*
* Copyright 2009-2011 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.external.dataset.adapter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
import edu.uci.ics.asterix.external.data.parser.IDataParser;
import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
import edu.uci.ics.asterix.runtime.util.AsterixRuntimeUtil;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
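
/**
 * A pull-based adapter that makes data stored in HDFS available as an external
 * datasource. The adapter resolves a Hadoop input format and a record parser from
 * its configuration, derives a partition constraint from the locations of the
 * HDFS input splits, and serves one IDataParser per partition.
 *
 * Illustrative configuration keys (the names are the string constants declared
 * below; the record-format and parser keys, KEY_FORMAT and KEY_PARSER, are
 * inherited from AbstractDatasourceAdapter):
 *
 *   "hdfs"         -> URL of the HDFS namenode
 *   "path"         -> input path within HDFS
 *   "input-format" -> "text-input-format" or "sequence-input-format"
 */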
public class HDFSAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
private String inputFormatClassName;
private Object[] inputSplits;
private transient JobConf conf;
private IHyracksTaskContext ctx;
private boolean isDelimited;
private Character delimiter;
private InputSplitsProxy inputSplitsProxy;
private String parserClass;
private static final Map<String, String> formatClassNames = new HashMap<String, String>();
public static final String KEY_HDFS_URL = "hdfs";
public static final String KEY_HDFS_PATH = "path";
public static final String KEY_INPUT_FORMAT = "input-format";
public static final String INPUT_FORMAT_TEXT = "text-input-format";
public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
static {
formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
}
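
    /**
     * Configures the adapter from its key/value arguments: resolves the record
     * parser and Hadoop input format, builds the JobConf used to read from HDFS,
     * and derives the partition constraint from the input split locations.
     */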
@Override
public void configure(Map<String, String> arguments, IAType atype) throws Exception {
configuration = arguments;
configureFormat();
configureJobConf();
configurePartitionConstraint();
this.atype = atype;
}
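
    /**
     * Resolves the Hadoop input format class from the "input-format" argument and,
     * when no parser class is given explicitly, selects a default parser based on
     * the configured record format (delimited text or ADM).
     */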
private void configureFormat() throws Exception {
String format = configuration.get(KEY_INPUT_FORMAT);
inputFormatClassName = formatClassNames.get(format);
if (inputFormatClassName == null) {
throw new Exception("format " + format + " not supported");
}
parserClass = configuration.get(KEY_PARSER);
if (parserClass == null) {
if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
} else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
parserClass = formatToParserMap.get(FORMAT_ADM);
}
}
}
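
    /**
     * Reflectively instantiates the configured parser class and configures it with
     * the adapter's arguments.
     */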
private IDataParser createDataParser() throws Exception {
IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
dataParser.configure(configuration);
return dataParser;
}
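
    /**
     * Derives the partition constraint from the HDFS input splits. For each split,
     * a node controller co-located with one of the split's datanode locations is
     * chosen (at random among the candidates) so that reads stay local. If any
     * split has no co-located node controller, or host resolution fails, the
     * adapter falls back to a count constraint equal to the number of splits.
     * The splits are wrapped in an InputSplitsProxy so they can be re-created at
     * the task sites.
     */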
private void configurePartitionConstraint() throws Exception {
List<String> locations = new ArrayList<String>();
Random random = new Random();
boolean couldConfigureLocationConstraints = true;
if (inputSplitsProxy == null) {
InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
try {
for (InputSplit inputSplit : inputSplits) {
String[] dataNodeLocations = inputSplit.getLocations();
for (String datanodeLocation : dataNodeLocations) {
Set<String> nodeControllersAtLocation = AsterixRuntimeUtil
.getNodeControllersOnHostName(datanodeLocation);
if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.log(Level.INFO, "No node controller found at " + datanodeLocation
+ " will look at replica location");
}
couldConfigureLocationConstraints = false;
} else {
int locationIndex = random.nextInt(nodeControllersAtLocation.size());
String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
locations.add(chosenLocation);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :"
+ chosenLocation);
}
couldConfigureLocationConstraints = true;
break;
}
}
                    if (!couldConfigureLocationConstraints) {
                        if (LOGGER.isLoggable(Level.INFO)) {
                            LOGGER.log(Level.INFO, "No local node controller found to process split " + inputSplit
                                    + "; will use a count constraint instead");
                        }
                        break;
                    }
}
if (couldConfigureLocationConstraints) {
partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
} else {
partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
}
} catch (UnknownHostException e) {
partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
}
inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
}
}
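
    /**
     * Builds a tuple parser factory for the given record type: a delimited-text
     * tuple parser with one value parser per field, or the schema-full ADM record
     * parser otherwise.
     */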
private ITupleParserFactory createTupleParserFactory(ARecordType recType) {
if (isDelimited) {
int n = recType.getFieldTypes().length;
IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
for (int i = 0; i < n; i++) {
ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
if (vpf == null) {
throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
}
fieldParserFactories[i] = vpf;
}
return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter);
} else {
return new AdmSchemafullRecordParserFactory(recType);
}
}
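
    /**
     * Builds the Hadoop JobConf used to enumerate and read the input splits,
     * pointing it at the configured HDFS namenode, input path, and input format.
     */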
private JobConf configureJobConf() throws Exception {
conf = new JobConf();
conf.set("fs.default.name", configuration.get(KEY_HDFS_URL));
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.setClassLoader(HDFSAdapter.class.getClassLoader());
conf.set("mapred.input.dir", configuration.get(KEY_HDFS_PATH));
conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT)));
return conf;
}
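
    /** Data is pulled from HDFS by the adapter rather than pushed to it. */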
public AdapterDataFlowType getAdapterDataFlowType() {
return AdapterDataFlowType.PULL;
}
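
    /** The adapter is registered as READ_WRITE; this class implements the read side. */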
public AdapterType getAdapterType() {
return AdapterType.READ_WRITE;
}
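
    /**
     * Called at each task site: records the task context and re-creates the input
     * splits from the proxy built during configuration.
     */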
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
inputSplits = inputSplitsProxy.toInputSplits(conf);
}
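
    /** Returns a no-op Reporter, required by the mapred record-reader API but otherwise unused. */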
private Reporter getReporter() {
Reporter reporter = new Reporter() {
@Override
public Counter getCounter(Enum<?> arg0) {
return null;
}
@Override
public Counter getCounter(String arg0, String arg1) {
return null;
}
@Override
public InputSplit getInputSplit() throws UnsupportedOperationException {
return null;
}
@Override
public void incrCounter(Enum<?> arg0, long arg1) {
}
@Override
public void incrCounter(String arg0, String arg1, long arg2) {
}
@Override
public void setStatus(String arg0) {
}
@Override
public void progress() {
}
};
return reporter;
}
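
    /**
     * Creates the parser for one partition: obtains a record reader for the
     * partition's input split (sequence-file or text input format), wraps it in an
     * HDFSStream, and attaches the stream to a newly created stream parser.
     */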
@Override
public IDataParser getDataParser(int partition) throws Exception {
Path path = new Path(inputSplits[partition].toString());
FileSystem fs = FileSystem.get(conf);
InputStream inputStream;
if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[partition],
conf, getReporter());
inputStream = new HDFSStream(reader, ctx);
} else {
try {
TextInputFormat format = (TextInputFormat) conf.getInputFormat();
RecordReader reader = format.getRecordReader(
(org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
inputStream = new HDFSStream(reader, ctx);
} catch (FileNotFoundException e) {
throw new HyracksDataException(e);
}
}
IDataParser dataParser = createDataParser();
if (dataParser instanceof IDataStreamParser) {
((IDataStreamParser) dataParser).setInputStream(inputStream);
} else {
            throw new IllegalArgumentException("Configured parser " + parserClass
                    + " does not implement IDataStreamParser");
}
dataParser.configure(configuration);
dataParser.initialize((ARecordType) atype, ctx);
return dataParser;
}
}
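
/**
 * Adapts a Hadoop RecordReader to a java.io.InputStream so that the values of a
 * split can be consumed by a stream-based parser. One record at a time is staged
 * in a frame-sized ByteBuffer, with a newline appended as the record separator.
 * Only org.apache.hadoop.io.Text values are supported.
 */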
class HDFSStream extends InputStream {
private ByteBuffer buffer;
private int capacity;
private RecordReader reader;
private boolean readNext = true;
private final Object key;
private final Text value;
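
    /**
     * @param reader record reader over the split backing this stream
     * @param ctx    task context; its frame size determines the buffer capacity
     */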
public HDFSStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
capacity = ctx.getFrameSize();
buffer = ByteBuffer.allocate(capacity);
this.reader = reader;
key = reader.createKey();
try {
value = (Text) reader.createValue();
} catch (ClassCastException cce) {
throw new Exception("context is not of type org.apache.hadoop.io.Text"
+ " type not supported in sequence file format", cce);
}
initialize();
}
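
    /** Primes the buffer with the first record, or marks the stream as empty if there is none. */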
    private void initialize() throws Exception {
        boolean hasMore = reader.next(key, value);
        if (!hasMore) {
            buffer.limit(0);
        } else {
            buffer.position(0);
            buffer.limit(capacity);
            // Text.getBytes() returns the backing array, which may be larger than the record;
            // copy only the valid bytes and append a newline as the record separator.
            buffer.put(value.getBytes(), 0, value.getLength());
            buffer.put("\n".getBytes());
            buffer.flip();
        }
}
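
    /**
     * Returns the next byte of the current record, refilling the buffer from the
     * record reader when the record is exhausted; returns -1 at the end of the split.
     */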
@Override
    public int read() throws IOException {
        if (!buffer.hasRemaining()) {
            // Current record is exhausted; pull the next one from the record reader.
            boolean hasMore = reader.next(key, value);
            if (!hasMore) {
                return -1;
            }
            buffer.position(0);
            buffer.limit(capacity);
            // Copy only the valid bytes of the Text value (its backing array may be longer)
            // and append a newline as the record separator.
            buffer.put(value.getBytes(), 0, value.getLength());
            buffer.put("\n".getBytes());
            buffer.flip();
        }
        // Mask to an unsigned value so bytes >= 0x80 are not mistaken for the -1 end-of-stream marker.
        return buffer.get() & 0xFF;
    }
}