blob: e46705dfe202f9f244ed90f5b42742834a0b83ae [file] [log] [blame]
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.external.dataset.adapter;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
/**
 * Base class for dataset adapters that read a per-partition {@link InputStream} and convert its
 * contents into tuples with an {@link ITupleParser}.
 * <p>
 * Subclasses supply the stream via {@link #getInputStream(int)} and implement initialization,
 * configuration, adapter type and partition constraints. The tuple parser factory is selected from
 * the adapter configuration by {@link #configureFormat()}.
 */
public abstract class FileSystemBasedAdapter extends AbstractDatasourceAdapter {

    private static final long serialVersionUID = 1L;

    /** Factory used by {@link #getTupleParser()}; assigned by {@link #configureFormat()}. */
    protected ITupleParserFactory parserFactory;

    /** Parser created by the most recent call to {@link #start(int, IFrameWriter)}. */
    protected ITupleParser parser;

    /** Configuration key holding the single-character field delimiter for delimited text. */
    public static final String KEY_DELIMITER = "delimiter";

    /** Configuration key for the input path; not read in this class (presumably used by subclasses). */
    public static final String KEY_PATH = "path";

    /**
     * Opens the input for the given partition.
     *
     * @param partition the partition index being started
     * @return the stream whose contents are parsed into tuples; it is closed by
     *         {@link #start(int, IFrameWriter)} once parsing finishes or fails
     * @throws IOException if the input cannot be opened
     */
    public abstract InputStream getInputStream(int partition) throws IOException;

    /**
     * @param atype the declared type of the records produced by this adapter
     */
    public FileSystemBasedAdapter(IAType atype) {
        this.atype = atype;
    }

    /**
     * Parses the input of {@code partition} and pushes the resulting tuples to {@code writer}.
     * The input stream is always closed afterwards, even if creating the parser or parsing fails.
     */
    @Override
    public void start(int partition, IFrameWriter writer) throws Exception {
        InputStream in = getInputStream(partition);
        try {
            parser = getTupleParser();
            parser.parse(in, writer);
        } finally {
            // Release the underlying file/stream handle; previously it was never closed.
            if (in != null) {
                in.close();
            }
        }
    }

    @Override
    public abstract void initialize(IHyracksTaskContext ctx) throws Exception;

    @Override
    public abstract void configure(Map<String, String> arguments) throws Exception;

    @Override
    public abstract AdapterType getAdapterType();

    @Override
    public abstract AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;

    /**
     * Creates a tuple parser from {@link #parserFactory} using the task context {@code ctx}.
     */
    protected ITupleParser getTupleParser() throws Exception {
        return parserFactory.createTupleParser(ctx);
    }

    /**
     * Chooses {@link #parserFactory} from the configuration.
     * <p>
     * If {@code KEY_PARSER_FACTORY} is set, that class is instantiated reflectively through its
     * no-argument constructor. Otherwise {@code KEY_FORMAT} must name either delimited text or ADM
     * (compared case-insensitively). In that case {@code atype} is cast to {@link ARecordType}.
     *
     * @throws IllegalArgumentException if no format is specified or the format is unsupported
     * @throws Exception if the configured parser factory class cannot be loaded or instantiated
     */
    protected void configureFormat() throws Exception {
        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
        if (parserFactoryClassname == null) {
            String specifiedFormat = configuration.get(KEY_FORMAT);
            if (specifiedFormat == null) {
                throw new IllegalArgumentException(" Unspecified data format");
            } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
            } else if (FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
                parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
            } else {
                throw new IllegalArgumentException(" format " + specifiedFormat + " not supported");
            }
        } else {
            // Constructor.newInstance replaces the deprecated Class.newInstance, which could
            // propagate undeclared checked exceptions from the constructor.
            parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname)
                    .getDeclaredConstructor().newInstance();
        }
    }

    /**
     * Builds a parser factory for delimited text with one value parser per record field.
     *
     * @param recordType the record type whose field types select the value parsers
     * @return a delimited-data tuple parser factory using the configured delimiter
     * @throws NotImplementedException if a field type has no registered value parser factory
     * @throws AsterixException if the {@link #KEY_DELIMITER} parameter is missing or is not
     *         exactly one character long
     */
    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType) throws AsterixException {
        IAType[] fieldTypes = recordType.getFieldTypes();
        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[fieldTypes.length];
        for (int i = 0; i < fieldTypes.length; i++) {
            ATypeTag tag = fieldTypes[i].getTypeTag();
            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
            if (vpf == null) {
                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
            }
            fieldParserFactories[i] = vpf;
        }
        String delimiterValue = configuration.get(KEY_DELIMITER);
        // Previously a missing delimiter caused a NullPointerException, and an empty one caused a
        // StringIndexOutOfBoundsException, at charAt(0).
        if (delimiterValue == null) {
            throw new AsterixException("improper delimiter: no '" + KEY_DELIMITER + "' parameter specified");
        }
        if (delimiterValue.length() != 1) {
            throw new AsterixException("improper delimiter");
        }
        Character delimiter = delimiterValue.charAt(0);
        return new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
    }

    /**
     * Builds a parser factory for ADM input of the given record type.
     *
     * @throws AsterixException wrapping any failure raised while constructing the factory
     */
    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType) throws AsterixException {
        try {
            return new AdmSchemafullRecordParserFactory(recordType);
        } catch (Exception e) {
            throw new AsterixException(e);
        }
    }
}