/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.ii;

import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.security.User;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI;

/**
 * Hadoop job that creates the HBase table backing an inverted index,
 * pre-split into one region per shard.
 *
 * @author George Song (ysong1)
 */
public class IICreateHTableJob extends AbstractHadoopJob {

    @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();
        Admin admin = null;
        try {
            options.addOption(OPTION_II_NAME);
            options.addOption(OPTION_HTABLE_NAME);
            parseOptions(options, args);

            String tableName = getOptionValue(OPTION_HTABLE_NAME);
            String iiName = getOptionValue(OPTION_II_NAME);

            KylinConfig config = KylinConfig.getInstanceFromEnv();
            IIManager iiManager = IIManager.getInstance(config);
            IIInstance ii = iiManager.getII(iiName);
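            // the shard count in the II descriptor fixes how many regions the
            // HTable is pre-split into (see getSplits below)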
            int sharding = ii.getDescriptor().getSharding();

            Configuration conf = HBaseConfiguration.create(getConf());
            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());

            // check if the table already exists
            admin = conn.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                if (admin.isTableEnabled(TableName.valueOf(tableName))) {
                    logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
                    return 0;
                } else {
                    logger.error("Table " + tableName + " is disabled, cannot append data");
                    return 1;
                }
            }
            // table doesn't exist, need to create
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
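            // keep only the latest version of each cell; older versions are of
            // no use to the inverted index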
            cf.setMaxVersions(1);

            String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase();
            switch (hbaseDefaultCC) {
            case "snappy": {
                logger.info("hbase will use snappy to compress data");
                cf.setCompressionType(Compression.Algorithm.SNAPPY);
                break;
            }
            case "lzo": {
                logger.info("hbase will use lzo to compress data");
                cf.setCompressionType(Compression.Algorithm.LZO);
                break;
            }
            case "gz":
            case "gzip": {
                logger.info("hbase will use gzip to compress data");
                cf.setCompressionType(Compression.Algorithm.GZ);
                break;
            }
            case "lz4": {
                logger.info("hbase will use lz4 to compress data");
                cf.setCompressionType(Compression.Algorithm.LZ4);
                break;
            }
            default: {
                logger.info("hbase will not use any compression codec to compress data");
            }
            }
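
            // FAST_DIFF delta-encodes row keys within a data block, which helps
            // when adjacent row keys share long prefixes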
            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
            tableDesc.addFamily(cf);
            tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
            tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
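
            // disable automatic region splits so the one-region-per-shard
            // layout stays fixed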
            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());

            if (User.isHBaseSecurityEnabled(conf)) {
                // add coprocessor for bulk load
                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
            }

            IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
            // create table
            byte[][] splitKeys = getSplits(sharding);
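            // a single shard yields no split keys; pass null so HBase creates
            // the table with one region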
            if (splitKeys.length == 0)
                splitKeys = null;

            admin.createTable(tableDesc, splitKeys);
            if (splitKeys != null) {
                for (int i = 0; i < splitKeys.length; i++) {
                    logger.info("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
                }
            }
            logger.info("create hbase table " + tableName + " done.");
            return 0;
        } catch (Exception e) {
            printUsage(options);
            throw e;
        } finally {
            if (admin != null)
                admin.close();
        }
    }

    // one region for one shard
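    // split keys mark the start row of every region after the first, so a table
    // with N shards needs N - 1 of them; assuming IIKeyValueCodec.SHARD_LEN is
    // 2 bytes, sharding = 4 yields the splits 0x0001, 0x0002 and 0x0003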
    private byte[][] getSplits(int shard) {
        byte[][] result = new byte[shard - 1][];
        for (int i = 1; i < shard; ++i) {
            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
            result[i - 1] = split;
        }
        return result;
    }
}