| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hcatalog.hbase; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.MasterNotRunningException; |
| import org.apache.hadoop.hbase.ZooKeeperConnectionException; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.mapred.TableOutputFormat; |
| import org.apache.hadoop.hbase.mapreduce.TableInputFormat; |
| import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hive.hbase.HBaseSerDe; |
| import org.apache.hadoop.hive.metastore.HiveMetaHook; |
| import org.apache.hadoop.hive.metastore.MetaStoreUtils; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.hadoop.hive.metastore.api.Table; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.plan.TableDesc; |
| import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; |
| import org.apache.hadoop.hive.serde2.SerDe; |
| import org.apache.hadoop.mapred.InputFormat; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.OutputFormat; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.common.HCatUtil; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter; |
| import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter; |
| import org.apache.hcatalog.hbase.snapshot.RevisionManager; |
| import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration; |
| import org.apache.hcatalog.hbase.snapshot.Transaction; |
| import org.apache.hcatalog.mapreduce.HCatOutputFormat; |
| import org.apache.hcatalog.mapreduce.HCatTableInfo; |
| import org.apache.hcatalog.mapreduce.InputJobInfo; |
| import org.apache.hcatalog.mapreduce.OutputJobInfo; |
| import org.apache.hcatalog.mapreduce.HCatStorageHandler; |
| import org.apache.thrift.TBase; |
| import org.apache.zookeeper.ZooKeeper; |
| |
| import com.facebook.fb303.FacebookBase; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * This class HBaseHCatStorageHandler provides functionality to create HBase |
| * tables through HCatalog. The implementation is very similar to the |
| * HiveHBaseStorageHandler, with more details to suit HCatalog. |
| */ |
| public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable { |
| |
| public final static String DEFAULT_PREFIX = "default."; |
| private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation"; |
| |
| private Configuration hbaseConf; |
| private Configuration jobConf; |
| private HBaseAdmin admin; |
| |
| @Override |
| public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { |
| // Populate jobProperties with input table name, table columns, RM snapshot, |
| // hbase-default.xml and hbase-site.xml |
| Map<String, String> tableJobProperties = tableDesc.getJobProperties(); |
| String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO); |
| try { |
| InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString); |
| HCatTableInfo tableInfo = inputJobInfo.getTableInfo(); |
| String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo); |
| jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName); |
| |
| Configuration jobConf = getJobConf(); |
| addResources(jobConf, jobProperties); |
| JobConf copyOfConf = new JobConf(jobConf); |
| HBaseConfiguration.addHbaseResources(copyOfConf); |
| //Getting hbase delegation token in getInputSplits does not work with PIG. So need to |
| //do it here |
| if (jobConf instanceof JobConf) { //Should be the case |
| HBaseUtil.addHBaseDelegationToken(copyOfConf); |
| ((JobConf) jobConf).getCredentials().addAll(copyOfConf.getCredentials()); |
| } |
| |
| String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); |
| jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema)); |
| |
| String serSnapshot = (String) inputJobInfo.getProperties().get( |
| HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); |
| if (serSnapshot == null) { |
| HCatTableSnapshot snapshot = |
| HBaseRevisionManagerUtil.createSnapshot( |
| RevisionManagerConfiguration.create(copyOfConf), |
| qualifiedTableName, tableInfo); |
| jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, |
| HCatUtil.serialize(snapshot)); |
| } |
| |
| //This adds it directly to the jobConf. Setting in jobProperties does not get propagated |
| //to JobConf as of now as the jobProperties is maintained per partition |
| //TODO: Remove when HCAT-308 is fixed |
| addOutputDependencyJars(jobConf); |
| jobProperties.put("tmpjars", jobConf.get("tmpjars")); |
| |
| } catch (IOException e) { |
| throw new IllegalStateException("Error while configuring job properties", e); |
| } |
| } |
| |
| @Override |
| public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { |
| // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo |
| // Populate RM transaction in OutputJobInfo |
| // In case of bulk mode, populate intermediate output location |
| Map<String, String> tableJobProperties = tableDesc.getJobProperties(); |
| String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO); |
| try { |
| OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString); |
| HCatTableInfo tableInfo = outputJobInfo.getTableInfo(); |
| String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo); |
| jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName); |
| jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName); |
| |
| Configuration jobConf = getJobConf(); |
| addResources(jobConf, jobProperties); |
| |
| Configuration copyOfConf = new Configuration(jobConf); |
| HBaseConfiguration.addHbaseResources(copyOfConf); |
| |
| String txnString = outputJobInfo.getProperties().getProperty( |
| HBaseConstants.PROPERTY_WRITE_TXN_KEY); |
| Transaction txn = null; |
| if (txnString == null) { |
| txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, |
| RevisionManagerConfiguration.create(copyOfConf)); |
| String serializedTxn = HCatUtil.serialize(txn); |
| outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, |
| serializedTxn); |
| } else { |
| txn = (Transaction) HCatUtil.deserialize(txnString); |
| } |
| if (isBulkMode(outputJobInfo)) { |
| String tableLocation = tableInfo.getTableLocation(); |
| String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber()) |
| .toString(); |
| outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location); |
| // We are writing out an intermediate sequenceFile hence |
| // location is not passed in OutputJobInfo.getLocation() |
| // TODO replace this with a mapreduce constant when available |
| jobProperties.put("mapred.output.dir", location); |
| jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName()); |
| } else { |
| jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName()); |
| } |
| |
| jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); |
| addOutputDependencyJars(jobConf); |
| jobProperties.put("tmpjars", jobConf.get("tmpjars")); |
| |
| } catch (IOException e) { |
| throw new IllegalStateException("Error while configuring job properties", e); |
| } |
| } |
| |
| /* |
| * @return instance of HiveAuthorizationProvider |
| * |
| * @throws HiveException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler# |
| * getAuthorizationProvider() |
| */ |
| @Override |
| public HiveAuthorizationProvider getAuthorizationProvider() |
| throws HiveException { |
| |
| HBaseAuthorizationProvider hbaseAuth = new HBaseAuthorizationProvider(); |
| hbaseAuth.init(getConf()); |
| return hbaseAuth; |
| } |
| |
| /* |
| * @param table |
| * |
| * @throws MetaException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler |
| * #commitCreateTable(org.apache.hadoop.hive.metastore.api.Table) |
| */ |
| @Override |
| public void commitCreateTable(Table table) throws MetaException { |
| } |
| |
| /* |
| * @param instance of table |
| * |
| * @param deleteData |
| * |
| * @throws MetaException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler |
| * #commitDropTable(org.apache.hadoop.hive.metastore.api.Table, boolean) |
| */ |
| @Override |
| public void commitDropTable(Table tbl, boolean deleteData) |
| throws MetaException { |
| checkDeleteTable(tbl); |
| |
| } |
| |
| /* |
| * @param instance of table |
| * |
| * @throws MetaException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler |
| * #preCreateTable(org.apache.hadoop.hive.metastore.api.Table) |
| */ |
| @Override |
| public void preCreateTable(Table tbl) throws MetaException { |
| boolean isExternal = MetaStoreUtils.isExternalTable(tbl); |
| |
| hbaseConf = getConf(); |
| |
| if (tbl.getSd().getLocation() != null) { |
| throw new MetaException("LOCATION may not be specified for HBase."); |
| } |
| |
| try { |
| String tableName = getFullyQualifiedHBaseTableName(tbl); |
| String hbaseColumnsMapping = tbl.getParameters().get( |
| HBaseSerDe.HBASE_COLUMNS_MAPPING); |
| |
| if (hbaseColumnsMapping == null) { |
| throw new MetaException( |
| "No hbase.columns.mapping defined in table" |
| + " properties."); |
| } |
| |
| List<String> hbaseColumnFamilies = new ArrayList<String>(); |
| List<String> hbaseColumnQualifiers = new ArrayList<String>(); |
| List<byte[]> hbaseColumnFamiliesBytes = new ArrayList<byte[]>(); |
| int iKey = HBaseUtil.parseColumnMapping(hbaseColumnsMapping, |
| hbaseColumnFamilies, hbaseColumnFamiliesBytes, |
| hbaseColumnQualifiers, null); |
| |
| HTableDescriptor tableDesc; |
| Set<String> uniqueColumnFamilies = new HashSet<String>(); |
| if (!getHBaseAdmin().tableExists(tableName)) { |
| // if it is not an external table then create one |
| if (!isExternal) { |
| // Create the column descriptors |
| tableDesc = new HTableDescriptor(tableName); |
| uniqueColumnFamilies.addAll(hbaseColumnFamilies); |
| uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey)); |
| |
| for (String columnFamily : uniqueColumnFamilies) { |
| HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes |
| .toBytes(columnFamily)); |
| familyDesc.setMaxVersions(Integer.MAX_VALUE); |
| tableDesc.addFamily(familyDesc); |
| } |
| |
| getHBaseAdmin().createTable(tableDesc); |
| } else { |
| // an external table |
| throw new MetaException("HBase table " + tableName |
| + " doesn't exist while the table is " |
| + "declared as an external table."); |
| } |
| |
| } else { |
| if (!isExternal) { |
| throw new MetaException("Table " + tableName |
| + " already exists within HBase." |
| + " Use CREATE EXTERNAL TABLE instead to" |
| + " register it in HCatalog."); |
| } |
| // make sure the schema mapping is right |
| tableDesc = getHBaseAdmin().getTableDescriptor( |
| Bytes.toBytes(tableName)); |
| |
| for (int i = 0; i < hbaseColumnFamilies.size(); i++) { |
| if (i == iKey) { |
| continue; |
| } |
| |
| if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) { |
| throw new MetaException("Column Family " |
| + hbaseColumnFamilies.get(i) |
| + " is not defined in hbase table " + tableName); |
| } |
| } |
| } |
| |
| // ensure the table is online |
| new HTable(hbaseConf, tableDesc.getName()); |
| |
| //Set up table in revision manager. |
| RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf); |
| rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies)); |
| |
| } catch (MasterNotRunningException mnre) { |
| throw new MetaException(StringUtils.stringifyException(mnre)); |
| } catch (IOException ie) { |
| throw new MetaException(StringUtils.stringifyException(ie)); |
| } catch (IllegalArgumentException iae) { |
| throw new MetaException(StringUtils.stringifyException(iae)); |
| } |
| |
| } |
| |
| /* |
| * @param table |
| * |
| * @throws MetaException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler |
| * #preDropTable(org.apache.hadoop.hive.metastore.api.Table) |
| */ |
| @Override |
| public void preDropTable(Table table) throws MetaException { |
| } |
| |
| /* |
| * @param table |
| * |
| * @throws MetaException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler |
| * #rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table) |
| */ |
| @Override |
| public void rollbackCreateTable(Table table) throws MetaException { |
| checkDeleteTable(table); |
| } |
| |
| /* |
| * @param table |
| * |
| * @throws MetaException |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler |
| * #rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table) |
| */ |
| @Override |
| public void rollbackDropTable(Table table) throws MetaException { |
| } |
| |
| /* |
| * @return instance of HiveMetaHook |
| * |
| * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#getMetaHook() |
| */ |
| @Override |
| public HiveMetaHook getMetaHook() { |
| return this; |
| } |
| |
| private HBaseAdmin getHBaseAdmin() throws MetaException { |
| try { |
| if (admin == null) { |
| admin = new HBaseAdmin(this.getConf()); |
| } |
| return admin; |
| } catch (MasterNotRunningException mnre) { |
| throw new MetaException(StringUtils.stringifyException(mnre)); |
| } catch (ZooKeeperConnectionException zkce) { |
| throw new MetaException(StringUtils.stringifyException(zkce)); |
| } |
| } |
| |
| private String getFullyQualifiedHBaseTableName(Table tbl) { |
| String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME); |
| if (tableName == null) { |
| tableName = tbl.getSd().getSerdeInfo().getParameters() |
| .get(HBaseSerDe.HBASE_TABLE_NAME); |
| } |
| if (tableName == null) { |
| if (tbl.getDbName().equals(MetaStoreUtils.DEFAULT_DATABASE_NAME)) { |
| tableName = tbl.getTableName(); |
| } else { |
| tableName = tbl.getDbName() + "." + tbl.getTableName(); |
| } |
| tableName = tableName.toLowerCase(); |
| } |
| return tableName; |
| } |
| |
| static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) { |
| String qualifiedName = tableInfo.getStorerInfo().getProperties() |
| .getProperty(HBaseSerDe.HBASE_TABLE_NAME); |
| if (qualifiedName == null) { |
| String databaseName = tableInfo.getDatabaseName(); |
| String tableName = tableInfo.getTableName(); |
| if ((databaseName == null) |
| || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) { |
| qualifiedName = tableName; |
| } else { |
| qualifiedName = databaseName + "." + tableName; |
| } |
| qualifiedName = qualifiedName.toLowerCase(); |
| } |
| return qualifiedName; |
| } |
| |
| @Override |
| public Class<? extends InputFormat> getInputFormatClass() { |
| return HBaseInputFormat.class; |
| } |
| |
| @Override |
| public Class<? extends OutputFormat> getOutputFormatClass() { |
| return HBaseBaseOutputFormat.class; |
| } |
| |
| /* |
| * @return subclass of SerDe |
| * |
| * @throws UnsupportedOperationException |
| * |
| * @see |
| * org.apache.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass() |
| */ |
| @Override |
| public Class<? extends SerDe> getSerDeClass() |
| throws UnsupportedOperationException { |
| return HBaseSerDe.class; |
| } |
| |
| public Configuration getJobConf() { |
| return jobConf; |
| } |
| |
| @Override |
| public Configuration getConf() { |
| |
| if (hbaseConf == null) { |
| hbaseConf = HBaseConfiguration.create(); |
| } |
| return hbaseConf; |
| } |
| |
| @Override |
| public void setConf(Configuration conf) { |
| //setConf is called both during DDL operations and mapred read/write jobs. |
| //Creating a copy of conf for DDL and adding hbase-default and hbase-site.xml to it. |
| //For jobs, maintaining a reference instead of cloning as we need to |
| // 1) add hbase delegation token to the Credentials. |
| // 2) set tmpjars on it. Putting in jobProperties does not get propagated to JobConf |
| // in case of InputFormat as they are maintained per partition. |
| //Not adding hbase-default.xml and hbase-site.xml to jobConf as it will override any |
| //hbase properties set in the JobConf by the user. In configureInputJobProperties and |
| //configureOutputJobProperties, we take care of adding the default properties |
| //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed. |
| jobConf = conf; |
| hbaseConf = RevisionManagerConfiguration.create(HBaseConfiguration.create(conf)); |
| } |
| |
| private void checkDeleteTable(Table table) throws MetaException { |
| boolean isExternal = MetaStoreUtils.isExternalTable(table); |
| String tableName = getFullyQualifiedHBaseTableName(table); |
| RevisionManager rm = null; |
| try { |
| if (!isExternal && getHBaseAdmin().tableExists(tableName)) { |
| // we have created an HBase table, so we delete it to roll back; |
| if (getHBaseAdmin().isTableEnabled(tableName)) { |
| getHBaseAdmin().disableTable(tableName); |
| } |
| getHBaseAdmin().deleteTable(tableName); |
| |
| //Drop table in revision manager. |
| rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf); |
| rm.dropTable(tableName); |
| } |
| } catch (IOException ie) { |
| throw new MetaException(StringUtils.stringifyException(ie)); |
| } finally { |
| HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm); |
| } |
| } |
| |
| /** |
| * Helper method for users to add the required depedency jars to distributed cache. |
| * @param conf |
| * @throws IOException |
| */ |
| private void addOutputDependencyJars(Configuration conf) throws IOException { |
| TableMapReduceUtil.addDependencyJars(conf, |
| //ZK |
| ZooKeeper.class, |
| //HBase |
| HTable.class, |
| //Hive |
| HiveException.class, |
| //HCatalog jar |
| HCatOutputFormat.class, |
| //hcat hbase storage handler jar |
| HBaseHCatStorageHandler.class, |
| //hive hbase storage handler jar |
| HBaseSerDe.class, |
| //hive jar |
| Table.class, |
| //libthrift jar |
| TBase.class, |
| //hbase jar |
| Bytes.class, |
| //thrift-fb303 .jar |
| FacebookBase.class, |
| //guava jar |
| ThreadFactoryBuilder.class); |
| } |
| |
| /** |
| * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map |
| * if they are not already present in the jobConf. |
| * @param jobConf Job configuration |
| * @param newJobProperties Map to which new properties should be added |
| */ |
| private void addResources(Configuration jobConf, |
| Map<String, String> newJobProperties) { |
| Configuration conf = new Configuration(false); |
| HBaseConfiguration.addHbaseResources(conf); |
| RevisionManagerConfiguration.addResources(conf); |
| for (Entry<String, String> entry : conf) { |
| if (jobConf.get(entry.getKey()) == null) |
| newJobProperties.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| public static boolean isBulkMode(OutputJobInfo outputJobInfo) { |
| //Default is false |
| String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties() |
| .getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY, |
| "false"); |
| return "true".equals(bulkMode); |
| } |
| |
| private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException { |
| StringBuilder builder = new StringBuilder(); |
| String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties() |
| .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING); |
| if (outputColSchema == null) { |
| String[] splits = hbaseColumnMapping.split("[,]"); |
| for (int i = 0; i < splits.length; i++) { |
| if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL)) |
| builder.append(splits[i]).append(" "); |
| } |
| } else { |
| HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema); |
| HCatSchema tableSchema = tableInfo.getDataColumns(); |
| List<String> outputFieldNames = outputSchema.getFieldNames(); |
| List<Integer> outputColumnMapping = new ArrayList<Integer>(); |
| for (String fieldName : outputFieldNames) { |
| int position = tableSchema.getPosition(fieldName); |
| outputColumnMapping.add(position); |
| } |
| List<String> columnFamilies = new ArrayList<String>(); |
| List<String> columnQualifiers = new ArrayList<String>(); |
| HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies, null, |
| columnQualifiers, null); |
| for (int i = 0; i < outputColumnMapping.size(); i++) { |
| int cfIndex = outputColumnMapping.get(i); |
| String cf = columnFamilies.get(cfIndex); |
| // We skip the key column. |
| if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) { |
| String qualifier = columnQualifiers.get(i); |
| builder.append(cf); |
| builder.append(":"); |
| if (qualifier != null) { |
| builder.append(qualifier); |
| } |
| builder.append(" "); |
| } |
| } |
| } |
| //Remove the extra space delimiter |
| builder.deleteCharAt(builder.length() - 1); |
| return builder.toString(); |
| } |
| |
| } |