| /* |
| * Copyright 2009-2012 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.asterix.external.dataset.adapter; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Map; |
| |
| import edu.uci.ics.asterix.common.exceptions.AsterixException; |
| import edu.uci.ics.asterix.om.types.ARecordType; |
| import edu.uci.ics.asterix.om.types.ATypeTag; |
| import edu.uci.ics.asterix.om.types.IAType; |
| import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory; |
| import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory; |
| import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; |
| import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException; |
| import edu.uci.ics.hyracks.api.comm.IFrameWriter; |
| import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; |
| import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory; |
| import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser; |
| import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory; |
| |
| public abstract class FileSystemBasedAdapter extends AbstractDatasourceAdapter { |
| |
| private static final long serialVersionUID = 1L; |
| |
| protected ITupleParserFactory parserFactory; |
| protected ITupleParser parser; |
| |
| public static final String KEY_DELIMITER = "delimiter"; |
| public static final String KEY_PATH = "path"; |
| |
| public abstract InputStream getInputStream(int partition) throws IOException; |
| |
| public FileSystemBasedAdapter(IAType atype) { |
| this.atype = atype; |
| } |
| |
| @Override |
| public void start(int partition, IFrameWriter writer) throws Exception { |
| InputStream in = getInputStream(partition); |
| parser = getTupleParser(); |
| parser.parse(in, writer); |
| } |
| |
| @Override |
| public abstract void initialize(IHyracksTaskContext ctx) throws Exception; |
| |
| @Override |
| public abstract void configure(Map<String, String> arguments) throws Exception; |
| |
| @Override |
| public abstract AdapterType getAdapterType(); |
| |
| @Override |
| public abstract AlgebricksPartitionConstraint getPartitionConstraint() throws Exception; |
| |
| protected ITupleParser getTupleParser() throws Exception { |
| return parserFactory.createTupleParser(ctx); |
| } |
| |
| protected void configureFormat() throws Exception { |
| String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY); |
| if (parserFactoryClassname == null) { |
| String specifiedFormat = configuration.get(KEY_FORMAT); |
| if (specifiedFormat == null) { |
| throw new IllegalArgumentException(" Unspecified data format"); |
| } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) { |
| parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype); |
| } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) { |
| parserFactory = getADMDataTupleParserFactory((ARecordType) atype); |
| } else { |
| throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported"); |
| } |
| } else { |
| parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname).newInstance(); |
| } |
| |
| } |
| |
| protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType) throws AsterixException { |
| int n = recordType.getFieldTypes().length; |
| IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n]; |
| for (int i = 0; i < n; i++) { |
| ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag(); |
| IValueParserFactory vpf = typeToValueParserFactMap.get(tag); |
| if (vpf == null) { |
| throw new NotImplementedException("No value parser factory for delimited fields of type " + tag); |
| } |
| fieldParserFactories[i] = vpf; |
| } |
| String delimiterValue = (String) configuration.get(KEY_DELIMITER); |
| if (delimiterValue != null && delimiterValue.length() > 1) { |
| throw new AsterixException("improper delimiter"); |
| } |
| |
| Character delimiter = delimiterValue.charAt(0); |
| return new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter); |
| } |
| |
| protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType) throws AsterixException { |
| try { |
| return new AdmSchemafullRecordParserFactory(recordType); |
| } catch (Exception e) { |
| throw new AsterixException(e); |
| } |
| |
| } |
| } |