/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.engine.spark;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayPrimitiveWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ByteBufferBackedInputStream;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryInfoSerializer;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
import org.apache.kylin.engine.mr.SortedColumnDFSFile;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.SourceManager;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.Tuple3;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
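/**
 * Spark application that builds dimension dictionaries and lookup table snapshots for a cube segment.
 * Dictionary columns and dimensions are parallelized as RDDs so the work runs on executors; the
 * resulting dictionary and snapshot resource paths are collected back to the driver and written
 * into the cube segment metadata.
 */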
public class SparkBuildDictionary extends AbstractApplication implements Serializable {
protected static final Logger logger = LoggerFactory.getLogger(SparkBuildDictionary.class);
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
.isRequired(true).withDescription("Cube dictionary output path").create(BatchConstants.ARG_DICT_PATH);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
.withDescription("Cube Segment Id").create("segmentId");
public static final Option OPTION_CUBING_JOB_ID = OptionBuilder
.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(true)
.withDescription("Cubing job id").create(BatchConstants.ARG_CUBING_JOB_ID);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
.withDescription("HDFS metadata url").create("metaUrl");
public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT).hasArg()
.isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
private Options options;
public SparkBuildDictionary() {
options = new Options();
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_DICT_PATH);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_CUBING_JOB_ID);
options.addOption(OPTION_META_URL);
options.addOption(OPTION_COUNTER_PATH);
}
@Override
protected Options getOptions() {
return options;
}
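// Driver-side entry point: parses the job arguments, builds dictionaries and snapshots on executors,
// merges the collected resource paths into a segment copy, and finally writes job counters to HDFS.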
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String dictPath = optionsHelper.getOptionValue(OPTION_DICT_PATH);
String factColumnsInputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
final String jobId = optionsHelper.getOptionValue(OPTION_CUBING_JOB_ID);
final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") ,
Class.forName("scala.collection.mutable.WrappedArray$ofRef")};
SparkConf sparkConf = new SparkConf().setAppName("Build Dimension Dictionary for: " + cubeName + " segment " + segmentId);
//serialization conf
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
sparkConf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
KylinSparkJobListener jobListener = new KylinSparkJobListener();
try(JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictPath));
// calculate source record bytes size
final LongAccumulator bytesWritten = sc.sc().longAccumulator();
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment cubeSegment = cube.getSegmentById(segmentId);
Set<TblColRef> tblColRefs = cubeSegment.getCubeDesc().getAllColumnsNeedDictionaryBuilt();
JavaRDD<TblColRef> tblColRefRDD = sc.parallelize(Lists.newArrayList(tblColRefs));
logger.info("Dimensions all is :" + cubeSegment.getCubeDesc().getDimensions().toString());
List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns();
logger.info("Spark build dict uhc columns is " + uhcColumns.size());
List<Tuple2<String, Tuple3<String, Integer, Integer>>> dictsInfoMap = tblColRefRDD
.mapToPair(new DimensionDictsBuildFunction(cubeName, segmentId, config, factColumnsInputPath, dictPath, uhcColumns))
.collect();
cubeSegment = CubeManager.getInstance(config).reloadCube(cubeName).getSegmentById(segmentId);
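// Build snapshots for the segment's lookup tables; each task returns (table identity -> snapshot resource path),
// and empty paths are filtered out before collecting.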
JavaRDD<DimensionDesc> dimensionDescRDD = sc.parallelize(cubeSegment.getCubeDesc().getDimensions());
List<Tuple2<String, String>> snapshotPathMap = dimensionDescRDD.filter(new SnapshotFilterFunction(cubeName, segmentId, config))
.mapToPair(dimensionDesc -> new Tuple2<>(dimensionDesc.getTableRef().getTableIdentity(), dimensionDesc.getTableRef()))
.groupByKey()
.mapToPair(new SnapshotBuildFunction(cubeName, segmentId, jobId, config))
.filter(tuple -> !tuple._2.isEmpty())
.collect();
// work on copy instead of cached objects
CubeInstance cubeCopy = cubeSegment.getCubeInstance().latestCopyForWrite(); // get a latest copy
CubeSegment segCopy = cubeCopy.getSegmentById(cubeSegment.getUuid());
for (Tuple2<String, Tuple3<String, Integer, Integer>> tuple2 : dictsInfoMap) {
Tuple3<String, Integer, Integer> dictInfo = tuple2._2;
segCopy.getDictionaries().put(tuple2._1, dictInfo._1());
segCopy.getRowkeyStats().add(new Object[] { tuple2._1, dictInfo._2(), dictInfo._3() });
}
for (Tuple2<String, String> tuple2 : snapshotPathMap) {
segCopy.putSnapshotResPath(tuple2._1, tuple2._2);
}
CubeUpdate update = new CubeUpdate(cubeCopy);
update.setToUpdateSegs(segCopy);
CubeManager.getInstance(config).updateCube(update);
checkSnapshot(CubeManager.getInstance(config), CubeManager.getInstance(config).getCube(cubeName).getSegmentById(segmentId));
boolean dictsAndSnapshotsBuildState = isAllDictsAndSnapshotsReady(config, cubeName, segmentId);
if (!dictsAndSnapshotsBuildState) {
logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentId);
} else {
logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentId);
}
long recordCount = tblColRefRDD.count();
logger.info("Map input records={}", recordCount);
logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
Map<String, String> counterMap = Maps.newHashMap();
counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
counterMap.put(ExecutableConstants.SPARK_DIMENSION_DIC_SEGMENT_ID, segmentId);
// save counter to hdfs
HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
}
}
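/**
 * Executor-side function that builds (or loads a pre-built) dictionary for a single column and
 * returns (column identity, (dictionary resource path, dictionary size, size of id)).
 */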
static class DimensionDictsBuildFunction implements PairFunction<TblColRef, String, Tuple3<String, Integer, Integer>> {
private transient volatile boolean initialized = false;
private String cubeName;
private String segmentId;
private CubeSegment cubeSegment;
private DictionaryManager dictManager;
private KylinConfig config;
private String factColumnsInputPath;
private String dictPath;
private List<TblColRef> uhcColumns;
public DimensionDictsBuildFunction(String cubeName, String segmentId, KylinConfig config, String factColumnsInputPath, String dictPath, List<TblColRef> uhcColumns) {
this.cubeName = cubeName;
this.segmentId = segmentId;
this.config = config;
this.factColumnsInputPath = factColumnsInputPath;
this.dictPath = dictPath;
this.uhcColumns = uhcColumns;
logger.info("Cube name is {}, segment id is {}", cubeName, segmentId);
logger.info("Fact columns input path is " + factColumnsInputPath);
logger.info("Fact columns input path is " + dictPath);
}
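// CubeSegment and DictionaryManager are not serializable, so they are initialized lazily on the executor.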
private void init() {
try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
.setAndUnsetThreadLocalConfig(config)) {
cubeSegment = CubeManager.getInstance(config).getCube(cubeName).getSegmentById(segmentId);
dictManager = DictionaryManager.getInstance(config);
}
initialized = true;
}
@Override
public Tuple2<String, Tuple3<String, Integer, Integer>> call(TblColRef tblColRef) throws Exception {
if (!initialized) {
synchronized (SparkBuildDictionary.class) {
if (!initialized) {
init();
}
}
}
logger.info("Building dictionary for column {}", tblColRef);
IReadableTable inpTable = getDistinctValuesFor(tblColRef);
Dictionary<String> preBuiltDict;
DictionaryInfo dictInfo;
try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
.setAndUnsetThreadLocalConfig(config)) {
preBuiltDict = getDictionary(tblColRef);
if (preBuiltDict != null) {
logger.info("Dict for '{}' has already been built, save it", tblColRef.getName());
dictInfo = dictManager.saveDictionary(tblColRef, inpTable, preBuiltDict);
} else {
logger.info("Dict for '{}' not pre-built, build it from {}", tblColRef.getName(), inpTable);
String builderClass = cubeSegment.getCubeDesc().getDictionaryBuilderClass(tblColRef);
dictInfo = dictManager.buildDictionary(tblColRef, inpTable, builderClass);
preBuiltDict = dictInfo.getDictionaryObject();
}
}
return new Tuple2<>(tblColRef.getIdentity(),
new Tuple3<>(dictInfo.getResourcePath(), preBuiltDict.getSize(), preBuiltDict.getSizeOfId()));
}
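// Distinct values for a column are read from the fact-distinct-columns step output, sorted on DFS.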
public IReadableTable getDistinctValuesFor(TblColRef col) {
return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType());
}
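/**
 * Tries to load a dictionary that an earlier step already built (e.g. the UHC dictionary job or the
 * fact distinct columns reducer) from a sequence file; returns null if no pre-built dictionary exists.
 */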
public Dictionary<String> getDictionary(TblColRef col) throws IOException {
Path colDir;
if (config.isBuildUHCDictWithMREnabled() && uhcColumns.contains(col)) {
colDir = new Path(dictPath, col.getIdentity());
} else {
colDir = new Path(factColumnsInputPath, col.getIdentity());
}
FileSystem fs = HadoopUtil.getWorkingFileSystem();
//FileSystem fs = colDir.getFileSystem(HadoopUtil.getCurrentConfiguration());
Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
if (dictFile == null) {
logger.info("Dict for '{}' not pre-built.", col.getName());
return null;
}
try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(),
SequenceFile.Reader.file(dictFile))) {
NullWritable key = NullWritable.get();
ArrayPrimitiveWritable value = new ArrayPrimitiveWritable();
reader.next(key, value);
ByteBuffer buffer = new ByteArray((byte[]) value.get()).asBuffer();
try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(buffer))) {
String dictClassName = is.readUTF();
Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
dict.readFields(is);
logger.info("DictionaryProvider read dict from file: {}", dictFile);
return dict;
}
}
}
}
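/**
 * Executor-side function that builds a snapshot for a lookup table and returns
 * (table identity, snapshot resource path); non-lookup or external-snapshot tables yield an empty path,
 * which the caller filters out.
 */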
static class SnapshotBuildFunction implements PairFunction<Tuple2<String, Iterable<TableRef>>, String, String> {
private String cubeName;
private String segmentId;
private String jobId;
private KylinConfig config;
private CubeManager cubeManager;
private CubeSegment cubeSegment;
private transient volatile boolean initialized = false;
public SnapshotBuildFunction(String cubeName, String segmentId, String jobId, KylinConfig config) {
this.cubeName = cubeName;
this.segmentId = segmentId;
this.jobId = jobId;
this.config = config;
}
private void init() {
try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
.setAndUnsetThreadLocalConfig(config)) {
cubeManager = CubeManager.getInstance(config);
cubeSegment = cubeManager.getCube(cubeName).getSegmentById(segmentId);
}
initialized = true;
}
@Override
public Tuple2<String, String> call(Tuple2<String, Iterable<TableRef>> snapShot) throws Exception {
if (!initialized) {
synchronized (SparkBuildDictionary.class) {
if (!initialized) {
init();
}
}
}
String tableIdentity = snapShot._1();
String snapshotPath = "";
try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
.setAndUnsetThreadLocalConfig(config)) {
logger.info("Building snapshot of {}", tableIdentity);
if (cubeSegment.getModel().isLookupTable(tableIdentity) && !cubeSegment.getCubeDesc().isExtSnapshotTable(tableIdentity)) {
try {
snapshotPath = buildSnapshotTable(config, cubeSegment, tableIdentity, jobId);
} catch (IOException e) {
logger.error("Error while build snapshot table " + tableIdentity + ", " + e.getMessage());
return new Tuple2<>(tableIdentity, snapshotPath);
}
}
}
return new Tuple2<>(tableIdentity, snapshotPath);
}
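// Reads the lookup table from its source and persists it via SnapshotManager, returning the snapshot resource path.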
private String buildSnapshotTable(KylinConfig config, CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException{
CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
TableMetadataManager metaMgr = TableMetadataManager.getInstance(config);
SnapshotManager snapshotMgr = SnapshotManager.getInstance(config);
TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
return snapshot.getResourcePath();
}
}
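/**
 * Keeps only dimensions whose lookup tables are not configured as external snapshots,
 * i.e. the tables for which Kylin builds and manages the snapshot itself.
 */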
static class SnapshotFilterFunction implements Function<DimensionDesc, Boolean> {
private String cubeName;
private String segmentId;
private KylinConfig config;
private CubeSegment cubeSegment;
private transient volatile boolean initialized = false;
public SnapshotFilterFunction(String cubeName, String segmentId, KylinConfig config) {
this.cubeName = cubeName;
this.segmentId = segmentId;
this.config = config;
}
private void init() {
try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
.setAndUnsetThreadLocalConfig(config)) {
cubeSegment = CubeManager.getInstance(config).getCube(cubeName).getSegmentById(segmentId);
}
initialized = true;
}
@Override
public Boolean call(DimensionDesc dimensionDesc) throws Exception {
if (!initialized) {
synchronized (SparkBuildDictionary.class) {
if (!initialized) {
init();
}
}
}
return !cubeSegment.getCubeDesc().isExtSnapshotTable(dimensionDesc.getTableRef().getTableIdentity());
}
}
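// Verifies that every internally managed lookup table snapshot of the segment can be opened as a lookup table.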
private void checkSnapshot(CubeManager cubeManager, CubeSegment cubeSegment) {
List<DimensionDesc> dimensionDescs = cubeSegment.getCubeDesc().getDimensions();
for (DimensionDesc dimensionDesc : dimensionDescs) {
TableRef lookup = dimensionDesc.getTableRef();
String tableIdentity = lookup.getTableIdentity();
if (cubeSegment.getModel().isLookupTable(tableIdentity) && !cubeSegment.getCubeDesc().isExtSnapshotTable(tableIdentity)) {
logger.info("Checking snapshot of {}", lookup);
try {
JoinDesc join = cubeSegment.getModel().getJoinsTree().getJoinByPKSide(lookup);
ILookupTable table = cubeManager.getLookupTable(cubeSegment, join);
if (table != null) {
IOUtils.closeStream(table);
}
} catch (Throwable th) {
throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
}
}
}
}
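/**
 * Checks the resource store to confirm that every dictionary and snapshot recorded on the segment
 * actually exists; returns false as soon as one is missing or unreadable.
 */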
private boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) {
final CubeManager cubeManager = CubeManager.getInstance(config);
CubeInstance cube = cubeManager.reloadCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
ResourceStore store = ResourceStore.getStore(config);
// check dicts
logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID);
Map<String, String> dictionaries = segment.getDictionaries();
logger.info("Get dictionaries number: {}", dictionaries.size());
for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
String dictResPath = entry.getValue();
String dictKey = entry.getKey();
try {
DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER);
if (dictInfo == null) {
logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey,
dictResPath);
return false;
}
} catch (IOException e) {
logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e);
return false;
}
}
// check snapshots
logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID);
Map<String, String> snapshots = segment.getSnapshots();
logger.info("Get snapshot number: {}", snapshots.size());
for (Map.Entry<String, String> entry : snapshots.entrySet()) {
String snapshotKey = entry.getKey();
String snapshotResPath = entry.getValue();
try {
SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER);
if (snapshot == null) {
logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store",
snapshotKey, snapshotResPath);
return false;
}
} catch (IOException e) {
logger.warn("SnapshotTable=[key: {}, resource path: {}] failed to check, details: {}", snapshotKey,
snapshotResPath, e);
return false;
}
}
logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID);
return true;
}
}