blob: 032fd38ad0ff13372ae70be47e38db8c4ba8ef8f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hbase;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.ProcessingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * SqoopRecordProcessor that performs an HBase "put" operation
 * that contains all the fields of the record.
 *
 * <p>The processor is set up through {@link #setConf(Configuration)}, which
 * instantiates the configured {@link PutTransformer} and opens the target
 * table with auto-flush disabled. Callers must invoke {@link #close()} when
 * done so that pending operations are flushed and the table is released.</p>
 */
public class HBasePutProcessor implements Closeable, Configurable,
    FieldMapProcessor {

  public static final Log LOG = LogFactory.getLog(
      HBasePutProcessor.class.getName());

  /** Configuration key specifying the table to insert into. */
  public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";

  /** Configuration key specifying the column family to insert into. */
  public static final String COL_FAMILY_KEY =
      "sqoop.hbase.insert.column.family";

  /** Configuration key specifying the column of the input whose value
   * should be used as the row id.
   */
  public static final String ROW_KEY_COLUMN_KEY =
      "sqoop.hbase.insert.row.key.column";

  /**
   * Configuration key specifying the PutTransformer implementation to use.
   */
  public static final String TRANSFORMER_CLASS_KEY =
      "sqoop.hbase.insert.put.transformer.class";

  /**
   * Configuration key to enable/disable hbase bulkLoad.
   */
  public static final String BULK_LOAD_ENABLED_KEY =
      "sqoop.hbase.bulk.load.enabled";

  /** Configuration key to specify whether to add the row key column into
   * HBase. Set to false by default.
   */
  public static final String ADD_ROW_KEY = "sqoop.hbase.add.row.key";

  /** Default value for {@link #ADD_ROW_KEY}. */
  public static final boolean ADD_ROW_KEY_DEFAULT = false;

  private Configuration conf;

  // An object that can transform a map of fieldName->object
  // into a Put command.
  private PutTransformer putTransformer;

  // Name of the table read from TABLE_NAME_KEY, and the handle opened on it.
  private String tableName;
  private HTable table;

  public HBasePutProcessor() {
  }

  /**
   * Stores the configuration, instantiates and initializes the
   * PutTransformer named by {@link #TRANSFORMER_CLASS_KEY} (defaulting to
   * ToStringPutTransformer), and opens the table named by
   * {@link #TABLE_NAME_KEY} with auto-flush disabled.
   *
   * @param config the configuration to read the settings from.
   * @throws RuntimeException if the transformer cannot be instantiated,
   *     no table name is configured, or the table cannot be opened.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void setConf(Configuration config) {
    this.conf = config;

    // Get the implementation of PutTransformer to use.
    // By default, we call toString() on every non-null field.
    Class<? extends PutTransformer> xformerClass =
        (Class<? extends PutTransformer>)
        this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
    this.putTransformer = (PutTransformer)
        ReflectionUtils.newInstance(xformerClass, this.conf);
    if (null == putTransformer) {
      throw new RuntimeException("Could not instantiate PutTransformer.");
    }
    putTransformer.init(conf);

    this.tableName = conf.get(TABLE_NAME_KEY, null);
    if (null == this.tableName) {
      // Fail with an actionable message instead of letting a null table
      // name surface as an opaque NullPointerException from the client.
      throw new RuntimeException("No HBase table name configured; set "
          + TABLE_NAME_KEY);
    }

    try {
      this.table = new HTable(conf, this.tableName);
    } catch (IOException ioe) {
      throw new RuntimeException("Could not access HBase table " + tableName,
          ioe);
    }
    this.table.setAutoFlush(false);
  }

  /**
   * @return the configuration last passed to
   *     {@link #setConf(Configuration)}, or null if none was set.
   */
  @Override
  public Configuration getConf() {
    return this.conf;
  }

  /**
   * Processes a record by extracting its field map and converting
   * it into a list of Put commands into HBase.
   *
   * <p>Null entries in the mutation list are skipped. Empty Put or Delete
   * mutations are not sent; a warning is logged instead. Mutations that are
   * neither a Put nor a Delete are ignored.</p>
   *
   * @param record the record whose field map is handed to the transformer.
   * @throws IOException if applying a mutation to the table fails.
   * @throws ProcessingException declared by the FieldMapProcessor contract.
   */
  @Override
  public void accept(FieldMappable record)
      throws IOException, ProcessingException {
    Map<String, Object> fields = record.getFieldMap();
    List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
    if (null == mutationList) {
      return;
    }

    for (Mutation mutation : mutationList) {
      if (mutation instanceof Put) {
        Put putObject = (Put) mutation;
        if (putObject.isEmpty()) {
          LOG.warn("Could not insert row with no columns "
              + "for row-key column: " + Bytes.toString(putObject.getRow()));
        } else {
          this.table.put(putObject);
        }
      } else if (mutation instanceof Delete) {
        Delete deleteObject = (Delete) mutation;
        if (deleteObject.isEmpty()) {
          LOG.warn("Could not delete row with no columns "
              + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
        } else {
          this.table.delete(deleteObject);
        }
      }
      // instanceof is false for null, so null entries fall through here
      // together with any mutation type other than Put/Delete.
    }
  }

  /**
   * Closes the HBase table and commits all pending operations.
   *
   * <p>The table is closed even when flushing fails, so its resources are
   * not leaked. Does nothing if no table was ever opened.</p>
   *
   * @throws IOException if flushing or closing the table fails.
   */
  @Override
  public void close() throws IOException {
    if (null == this.table) {
      // setConf() was never called or failed before the table was opened.
      return;
    }
    try {
      this.table.flushCommits();
    } finally {
      this.table.close();
    }
  }
}