/*
 * Copyright 2009-2012 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.asterix.external.dataset.adapter;

import java.util.Map;

import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

/**
 * Fetches data, in the form of ADM records, from a Hive table. Hive stores a
 * table as files under its warehouse directory in HDFS, so this adapter
 * resolves the table's path and delegates all reading to an {@link HDFSAdapter}.
 */
public class HiveAdapter extends AbstractDatasourceAdapter {

    private static final long serialVersionUID = 1L;
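
    // Names of the configuration parameters understood by this adapter.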
    public static final String HIVE_DATABASE = "database";
    public static final String HIVE_TABLE = "table";
    public static final String HIVE_HOME = "hive-home";
    public static final String HIVE_METASTORE_URI = "metastore-uri";
    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
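
    // All actual I/O is delegated to this HDFSAdapter, which is pointed at
    // the table's files by configureHadoopAdapter().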
    private HDFSAdapter hdfsAdapter;

    public HiveAdapter(IAType atype) {
        this.hdfsAdapter = new HDFSAdapter(atype);
        this.atype = atype;
    }

    @Override
    public AdapterType getAdapterType() {
        return AdapterType.READ;
    }
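
    /**
     * Expects the parameters named by the HIVE_* constants above, together
     * with the format keys inherited from {@link AbstractDatasourceAdapter}.
     * As a hypothetical example, a map carrying database=sales, table=users
     * and warehouse-dir=/user/hive/warehouse resolves to the HDFS path
     * /user/hive/warehouse/sales.db/users.
     */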
    @Override
    public void configure(Map<String, String> arguments) throws Exception {
        configuration = arguments;
        configureHadoopAdapter();
    }
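
    /**
     * Derives the HDFS path of the Hive table and configures the wrapped
     * {@link HDFSAdapter} with it. A table in the default database lives
     * directly under the warehouse directory; a table in any other database
     * lives under a {@code <database>.db} subdirectory.
     */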
    private void configureHadoopAdapter() throws Exception {
        String database = configuration.get(HIVE_DATABASE);
        String tablePath = null;
        if (database == null) {
            // Default database: the table directory sits directly under the warehouse.
            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
        } else {
            // Named database: the table lives under a <database>.db subdirectory.
            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + database + ".db" + "/"
                    + configuration.get(HIVE_TABLE);
        }
        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
            throw new IllegalArgumentException("format " + configuration.get(KEY_FORMAT) + " is not supported");
        }
        if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
                .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
            throw new IllegalArgumentException("file input format " + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
                    + " is not supported");
        }
        // Create a fresh HDFSAdapter so it is configured with the derived path.
        hdfsAdapter = new HDFSAdapter(atype);
        hdfsAdapter.configure(configuration);
    }
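
    // The methods below forward to the wrapped HDFSAdapter, which performs
    // the actual scan of the table's files.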

    @Override
    public void initialize(IHyracksTaskContext ctx) throws Exception {
        hdfsAdapter.initialize(ctx);
    }

    @Override
    public void start(int partition, IFrameWriter writer) throws Exception {
        hdfsAdapter.start(partition, writer);
    }

    @Override
    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
        return hdfsAdapter.getPartitionConstraint();
    }
}