/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kylin.provision;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.hive.HiveCmdBuilder;
import org.apache.kylin.source.hive.HiveTableReader;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
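/**
 * Sandbox integration test that builds the two test inverted indexes
 * ("test_kylin_ii_left_join" and "test_kylin_ii_inner_join") in a
 * streaming fashion: rows from the intermediate Hive flat table are
 * sorted by timestamp, grouped into slices, and loaded directly into HBase.
 */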
public class BuildIIWithStream {
private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStream.class);
private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
private IIManager iiManager;
private KylinConfig kylinConfig;
public static void main(String[] args) throws Exception {
try {
beforeClass();
BuildIIWithStream buildIIWithStream = new BuildIIWithStream();
buildIIWithStream.before();
buildIIWithStream.build();
logger.info("Build is done");
afterClass();
logger.info("Going to exit");
System.exit(0);
} catch (Exception e) {
logger.error("error", e);
System.exit(1);
}
}
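// Adds the sandbox test-data directory to the classpath, requires hdp.version,
// and points Kylin at the sandbox test metadata.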
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
protected void deployEnv() throws Exception {
DeployUtil.overrideJobJarLocations();
}
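// Resolves KylinConfig and IIManager from the environment, then disables the
// test II instances so each build starts from a clean, non-serving state.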
public void before() throws Exception {
deployEnv();
kylinConfig = KylinConfig.getInstanceFromEnv();
iiManager = IIManager.getInstance(kylinConfig);
for (String iiInstance : II_NAME) {
IIInstance ii = iiManager.getII(iiInstance);
if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
ii.setStatus(RealizationStatusEnum.DISABLED);
iiManager.updateII(ii);
}
}
}
public static void afterClass() throws Exception {
cleanupOldStorage();
HBaseMetadataTestCase.staticCleanupTestMetadata();
}
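// Generates the USE/DROP/CREATE/INSERT HQL for the II flat table and executes
// it as a single hive command via the CLI executor; returns the table name.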
private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
final String uuid = UUID.randomUUID().toString();
final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
String insertDataHql;
try {
insertDataHql = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
} catch (IOException e) {
throw new RuntimeException("Failed to generate insert data SQL for the intermediate table.", e);
}
ShellExecutable step = new ShellExecutable();
HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(useDatabaseHql);
hiveCmdBuilder.addStatement(dropTableHql);
hiveCmdBuilder.addStatement(createTableHql);
hiveCmdBuilder.addStatement(insertDataHql);
step.setCmd(hiveCmdBuilder.build());
logger.info(step.getCmd());
step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
return intermediateTableDesc.getTableName();
}
private void clearSegment(String iiName) throws Exception {
IIInstance ii = iiManager.getII(iiName);
ii.getSegments().clear();
iiManager.updateII(ii);
}
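// Clears any existing segments and builds a single segment covering [epoch, 2015-01-01) GMT.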
private IISegment createSegment(String iiName) throws Exception {
clearSegment(iiName);
SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
f.setTimeZone(TimeZone.getTimeZone("GMT"));
long date1 = 0;
long date2 = f.parse("2015-01-01").getTime();
return buildSegment(iiName, date1, date2);
}
private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
IIInstance iiInstance = iiManager.getII(iiName);
IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
iiInstance.getSegments().add(segment);
iiManager.updateII(iiInstance);
return segment;
}
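// End-to-end build of one II: materialize the intermediate Hive table, create
// the segment and its HTable, sort the rows by timestamp, slice them in
// batches, and load each slice into HBase.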
private void buildII(String iiName) throws Exception {
final IIDesc desc = iiManager.getII(iiName).getDescriptor();
final String tableName = createIntermediateTable(desc, kylinConfig);
logger.info("intermediate table name:" + tableName);
HiveTableReader reader = new HiveTableReader("default", tableName);
final List<TblColRef> tblColRefs = desc.listAllColumns();
for (TblColRef tblColRef : tblColRefs) {
if (desc.isMetricsCol(tblColRef)) {
logger.info("metric:" + tblColRef.getName());
} else {
logger.info("dimension:" + tblColRef.getName());
}
}
final IISegment segment = createSegment(iiName);
// Create the segment's HTable first, then obtain a client-side handle to it.
String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
ToolRunner.run(new IICreateHTableJob(), args);
final Table htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(segment.getStorageLocationIdentifier()));
final IIDesc iiDesc = segment.getIIDesc();
final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
int count = sorted.size();
ArrayList<StreamingMessage> messages = Lists.newArrayList();
for (String[] row : sorted) {
messages.add(parse(row));
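// Flush a full slice to HBase whenever the configured slice size is reached.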
if (messages.size() >= iiDesc.getSliceSize()) {
build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
messages.clear();
}
}
if (!messages.isEmpty()) {
build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
}
reader.close();
logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
}
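// Builds each test II and marks it READY so it becomes queryable.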
public void build() throws Exception {
for (String iiName : II_NAME) {
buildII(iiName);
IIInstance ii = iiManager.getII(iiName);
if (ii.getStatus() != RealizationStatusEnum.READY) {
ii.setStatus(RealizationStatusEnum.READY);
iiManager.updateII(ii);
}
}
}
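// Encodes one batch of messages into a Slice and writes it to the segment's HTable.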
private void build(SliceBuilder sliceBuilder, StreamingBatch batch, Table htable) throws IOException {
final Slice slice = sliceBuilder.buildSlice(batch);
try {
loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
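// Translates every encoded IIRow into a Put carrying the value cell plus its
// dictionary cell, then writes them to HBase in one batch.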
private void loadToHBase(Table hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
List<Put> data = Lists.newArrayList();
for (IIRow row : codec.encodeKeyValue(slice)) {
final byte[] key = row.getKey().get();
final byte[] value = row.getValue().get();
Put put = new Put(key);
put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
final ImmutableBytesWritable dictionary = row.getDictionary();
final byte[] dictBytes = dictionary.get();
if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
} else {
throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
}
data.add(put);
}
hTable.put(data);
// No explicit flush needed: unlike the legacy HTableInterface, Table.put(List<Put>) sends the batch synchronously.
}
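// Wraps one flat-table row in a StreamingMessage, using the current wall-clock
// time for both long arguments.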
private StreamingMessage parse(String[] row) {
return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap());
}
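// Reads the entire flat table into memory and sorts the rows by the II
// timestamp column so slices are built in time order.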
private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
List<String[]> unsorted = Lists.newArrayList();
while (reader.next()) {
unsorted.add(reader.getRow());
}
Collections.sort(unsorted, new Comparator<String[]>() {
@Override
public int compare(String[] o1, String[] o2) {
long t1 = DateFormat.stringToMillis(o1[tsCol]);
long t2 = DateFormat.stringToMillis(o2[tsCol]);
return Long.compare(t1, t2);
}
});
return unsorted;
}
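// Best-effort sandbox cleanup: deletes leftover HBase tables and intermediate
// Hive tables from earlier runs.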
private static int cleanupOldStorage() throws Exception {
String[] args = { "--delete", "true" };
int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
return exitCode;
}
}