| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.hbase; |
| |
| import static org.apache.hadoop.hbase.HConstants.MASTER_INFO_PORT; |
| import static org.apache.hadoop.hbase.HConstants.REGIONSERVER_INFO_PORT; |
| import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_CLIENT_PORT; |
| import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; |
| import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.KRB_PRINCIPAL; |
| import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.MASTER_KRB_PRINCIPAL; |
| import static org.apache.hadoop.hbase.security.User.HBASE_SECURITY_CONF_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; |
| import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PRINCIPAL; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.security.HBaseKerberosUtils; |
| import org.apache.hadoop.hbase.security.token.TokenProvider; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.http.HttpConfig; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.sqoop.infrastructure.kerberos.KerberosConfigurationProvider; |
| import org.junit.After; |
| import org.junit.Before; |
| |
| import org.apache.sqoop.testutil.CommonArgs; |
| import org.apache.sqoop.testutil.HsqldbTestServer; |
| import org.apache.sqoop.testutil.ImportJobTestCase; |
| |
| /** |
| * Utility methods that facilitate HBase import tests. |
| */ |
| public abstract class HBaseTestCase extends ImportJobTestCase { |
| |
| public static final Log LOG = LogFactory.getLog( |
| HBaseTestCase.class.getName()); |
| private static final String INFO_PORT_DISABLE_WEB_UI = "-1"; |
| private static final String DEFAULT_DFS_HTTPS_ADDRESS = "localhost:0"; |
| |
| private final KerberosConfigurationProvider kerberosConfigurationProvider; |
| private HBaseTestingUtility hbaseTestUtil; |
| |
| public HBaseTestCase() { |
| this(null); |
| } |
| |
| public HBaseTestCase(KerberosConfigurationProvider kerberosConfigurationProvider) { |
| this.kerberosConfigurationProvider = kerberosConfigurationProvider; |
| } |
| |
| /** |
| * Create the argv to pass to Sqoop. |
| * @return the argv as an array of strings. |
| */ |
| protected String [] getArgv(boolean includeHadoopFlags, |
| String hbaseTable, String hbaseColFam, boolean hbaseCreate, |
| String queryStr) { |
| |
| ArrayList<String> args = new ArrayList<String>(); |
| |
| if (includeHadoopFlags) { |
| CommonArgs.addHadoopFlags(args); |
| String zookeeperPort = hbaseTestUtil.getConfiguration().get(ZOOKEEPER_CLIENT_PORT); |
| args.add("-D"); |
| args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort); |
| args.addAll(getKerberosFlags()); |
| } |
| |
| if (null != queryStr) { |
| args.add("--query"); |
| args.add(queryStr); |
| } else { |
| args.add("--table"); |
| args.add(getTableName()); |
| } |
| args.add("--split-by"); |
| args.add(getColName(0)); |
| args.add("--connect"); |
| args.add(HsqldbTestServer.getUrl()); |
| args.add("--num-mappers"); |
| args.add("1"); |
| args.add("--column-family"); |
| args.add(hbaseColFam); |
| args.add("--hbase-table"); |
| args.add(hbaseTable); |
| if (hbaseCreate) { |
| args.add("--hbase-create-table"); |
| } |
| return args.toArray(new String[0]); |
| } |
| |
| /** |
| * Create the argv to pass to Sqoop as incremental options. |
| * @return the argv as an array of strings. |
| */ |
| protected String [] getIncrementalArgv(boolean includeHadoopFlags, |
| String hbaseTable, String hbaseColFam, boolean hbaseCreate, |
| String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn, String nullMode) { |
| |
| String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr); |
| List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray)); |
| |
| if (isAppend) { |
| args.add("--incremental"); |
| args.add("append"); |
| if (!appendTimestamp) { |
| args.add("--check-column"); |
| args.add(checkColumn);//"ID"); |
| } else { |
| args.add("--check-column"); |
| args.add(lastModifiedColumn);//LAST_MODIFIED"); |
| } |
| } else { |
| args.add("--incremental"); |
| args.add("lastmodified"); |
| args.add("--check-column"); |
| args.add(checkColumn); |
| args.add("--last-value"); |
| args.add(checkValue); |
| } |
| |
| // Set --hbase-null-incremental-mode (default is 'ignore') |
| if (nullMode == null) { |
| nullMode = "ignore"; |
| } |
| args.add("--hbase-null-incremental-mode"); |
| args.add(nullMode); |
| |
| return args.toArray(new String[0]); |
| } |
| |
| @Override |
| @Before |
| public void setUp() { |
| try { |
| hbaseTestUtil = new HBaseTestingUtility(); |
| // We set the port for the hbase master and regionserver web UI to -1 because we do not want the info server to run. |
| hbaseTestUtil.getConfiguration().set(MASTER_INFO_PORT, INFO_PORT_DISABLE_WEB_UI); |
| hbaseTestUtil.getConfiguration().set(REGIONSERVER_INFO_PORT, INFO_PORT_DISABLE_WEB_UI); |
| |
| setupKerberos(); |
| |
| hbaseTestUtil.startMiniCluster(); |
| super.setUp(); |
| } catch (Throwable e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private void setupKerberos() { |
| if (!isKerberized()){ |
| return; |
| } |
| |
| HBaseKerberosUtils.setPrincipalForTesting(kerberosConfigurationProvider.getTestPrincipal()); |
| HBaseKerberosUtils.setKeytabFileForTesting(kerberosConfigurationProvider.getKeytabFilePath()); |
| |
| Configuration configuration = hbaseTestUtil.getConfiguration(); |
| HBaseKerberosUtils.setSecuredConfiguration(configuration); |
| UserGroupInformation.setConfiguration(configuration); |
| configuration.setStrings(REGION_COPROCESSOR_CONF_KEY, TokenProvider.class.getName()); |
| |
| setupKerberosForHdfs(kerberosConfigurationProvider.getTestPrincipal(), configuration); |
| } |
| |
| private void setupKerberosForHdfs(String servicePrincipal, Configuration configuration) { |
| configuration.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, servicePrincipal); |
| configuration.set(DFS_NAMENODE_KEYTAB_FILE_KEY, kerberosConfigurationProvider.getKeytabFilePath()); |
| configuration.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, servicePrincipal); |
| configuration.set(DFS_DATANODE_KEYTAB_FILE_KEY, kerberosConfigurationProvider.getKeytabFilePath()); |
| configuration.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); |
| configuration.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, servicePrincipal); |
| configuration.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name()); |
| configuration.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, DEFAULT_DFS_HTTPS_ADDRESS); |
| configuration.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, DEFAULT_DFS_HTTPS_ADDRESS); |
| configuration.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true); |
| } |
| |
| public void shutdown() throws Exception { |
| LOG.info("In shutdown() method"); |
| LOG.info("Shutting down HBase cluster"); |
| hbaseTestUtil.shutdownMiniCluster(); |
| hbaseTestUtil = null; |
| LOG.info("shutdown() method returning."); |
| } |
| |
| @Override |
| @After |
| public void tearDown() { |
| try { |
| shutdown(); |
| } catch (Exception e) { |
| LOG.warn("Error shutting down HBase minicluster: " |
| + StringUtils.stringifyException(e)); |
| } |
| super.tearDown(); |
| } |
| |
| protected void verifyHBaseCell(String tableName, String rowKey, |
| String colFamily, String colName, String val) throws IOException { |
| Get get = new Get(Bytes.toBytes(rowKey)); |
| get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName)); |
| try ( |
| Connection hbaseConnection = createHBaseConnection(); |
| Table table = getHBaseTable(hbaseConnection, tableName) |
| ) { |
| Result r = table.get(get); |
| byte [] actualVal = r.getValue(Bytes.toBytes(colFamily), |
| Bytes.toBytes(colName)); |
| if (null == val) { |
| assertNull("Got a result when expected null", actualVal); |
| } else { |
| assertNotNull("No result, but we expected one", actualVal); |
| assertEquals(val, Bytes.toString(actualVal)); |
| } |
| } |
| } |
| |
| protected int countHBaseTable(String tableName, String colFamily) |
| throws IOException { |
| int count = 0; |
| try ( |
| Connection hbaseConnection = createHBaseConnection(); |
| Table table = getHBaseTable(hbaseConnection, tableName) |
| ) { |
| ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily)); |
| for(Result result = scanner.next(); |
| result != null; |
| result = scanner.next()) { |
| count++; |
| } |
| } |
| return count; |
| } |
| |
| private Connection createHBaseConnection() throws IOException { |
| return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration())); |
| } |
| |
| private Table getHBaseTable(Connection connection, String tableName) throws IOException { |
| return connection.getTable(TableName.valueOf(tableName)); |
| } |
| |
| protected boolean isKerberized() { |
| return kerberosConfigurationProvider != null; |
| } |
| |
| private String createFlagWithValue(String flag, String value) { |
| return String.format("%s=%s", flag, value); |
| } |
| |
| private List<String> getKerberosFlags() { |
| if (!isKerberized()) { |
| return Collections.emptyList(); |
| } |
| List<String> result = new ArrayList<>(); |
| |
| String principalForTesting = HBaseKerberosUtils.getPrincipalForTesting(); |
| result.add("-D"); |
| result.add(createFlagWithValue(HBASE_SECURITY_CONF_KEY, "kerberos")); |
| result.add("-D"); |
| result.add(createFlagWithValue(MASTER_KRB_PRINCIPAL, principalForTesting)); |
| result.add("-D"); |
| result.add(createFlagWithValue(KRB_PRINCIPAL, principalForTesting)); |
| result.add("-D"); |
| result.add(createFlagWithValue(RM_PRINCIPAL, principalForTesting)); |
| |
| return result; |
| } |
| } |