blob: 838c8233df564c08febe954ea37e06c36ba46f61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.cube;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.common.util.AutoReadWriteLock;
import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.SnapshotTableDesc;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationProvider;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.SourcePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
/**
* @author yangli9
*/
public class CubeManager implements IRealizationProvider {
// Alphabet from which the random part of a generated storage location is drawn.
private static String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
// Number of random characters generated for engines other than IEngineAware.ID_SPARK_II.
private static int HBASE_TABLE_LENGTH = 10;
// Number of random characters generated when the engine type is IEngineAware.ID_SPARK_II.
private static int PARQUET_IDENTIFIER_LENGTH = 3;
// JSON serializer for CubeInstance objects.
public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<>(CubeInstance.class);
private static final Logger logger = LoggerFactory.getLogger(CubeManager.class);
/**
 * Returns the CubeManager that the given config manages.
 *
 * @param config the Kylin configuration to look the manager up in
 * @return the manager obtained from {@code config.getManager}
 */
public static CubeManager getInstance(KylinConfig config) {
    final CubeManager manager = config.getManager(CubeManager.class);
    return manager;
}
// Called by reflection: builds a fresh manager bound to the given config.
static CubeManager newInstance(KylinConfig config) throws IOException {
    final CubeManager created = new CubeManager(config);
    return created;
}
// ============================================================================
// configuration this manager was created with
private KylinConfig config;
// cube name ==> CubeInstance
private CaseInsensitiveStringCache<CubeInstance> cubeMap;
// CRUD helper built over the resource store and cubeMap (see constructor)
private CachedCrudAssist<CubeInstance> crud;
// protects concurrent operations around the cached map, to avoid for example
// writing an entity in the middle of reloading it (dirty read)
private AutoReadWriteLock cubeMapLock = new AutoReadWriteLock();
// segment uuid ==> storage location identifier; checked when generating the
// storage location (e.g. hbase table name) of a new segment to avoid reusing one
private ConcurrentMap<String, String> usedStorageLocation = new ConcurrentHashMap<>();
// a few inner classes to group related methods
private SegmentAssist segAssist = new SegmentAssist();
// randomness source for generated storage location identifiers
private Random ran = new Random();
/**
 * Creates the manager for the given config: sets up the cube cache and its CRUD
 * helper, loads all cubes, and finally registers a listener for "cube" events.
 *
 * @param cfg the Kylin configuration to bind to
 * @throws IOException declared by the calls made here (e.g. the initial reload)
 */
private CubeManager(KylinConfig cfg) throws IOException {
logger.info("Initializing CubeManager with config {}", cfg);
this.config = cfg;
this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
this.crud = new CachedCrudAssist<CubeInstance>(getStore(), ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class,
cubeMap) {
// Post-reload hook (per its name): initializes the cube and records the storage
// location identifier of each of its segments, keyed by segment uuid.
@Override
protected CubeInstance initEntityAfterReload(CubeInstance cube, String resourceName) {
cube.init(config);
for (CubeSegment segment : cube.getSegments()) {
usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier());
}
return cube;
}
};
// NOTE(review): presumably makes the helper verify that saved entities are
// copies rather than cached instances -- confirm in CachedCrudAssist
this.crud.setCheckCopyOnWrite(true);
// touch lower level metadata before registering my listener
crud.reloadAll();
Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
}
/**
 * Broadcaster listener that refreshes the locally cached cubes when a project
 * schema or a single cube entity changes.
 */
private class CubeSyncListener extends Broadcaster.Listener {

    /**
     * Reloads every cube realization of the project, then reloads the
     * project's L2 cache.
     */
    @Override
    public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
        ProjectManager prjMgr = ProjectManager.getInstance(config);
        for (IRealization realization : prjMgr.listAllRealizations(project)) {
            if (!(realization instanceof CubeInstance)) {
                continue;
            }
            reloadCubeQuietly(realization.getName());
        }
        prjMgr.reloadProjectL2Cache(project);
    }

    /**
     * Drops or reloads the cube named by the cache key, then notifies a data
     * update for every project found for that cube.
     */
    @Override
    public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
            throws IOException {
        // the cache key is used as the cube name
        final String changedCube = cacheKey;

        if (event == Event.DROP) {
            removeCubeLocal(changedCube);
        } else {
            reloadCubeQuietly(changedCube);
        }

        ProjectManager prjMgr = ProjectManager.getInstance(config);
        for (ProjectInstance owner : prjMgr.findProjects(RealizationType.CUBE, changedCube)) {
            broadcaster.notifyProjectDataUpdate(owner.getName());
        }
    }
}
/**
 * Lists all cubes held in the cache. Note the metadata may be out of date.
 *
 * @return a new list containing the cached cube instances
 */
public List<CubeInstance> listAllCubes() {
    try (AutoLock readGuard = cubeMapLock.lockForRead()) {
        List<CubeInstance> snapshot = new ArrayList<CubeInstance>(cubeMap.values());
        return snapshot;
    }
}
/**
 * Reloads all cubes through the CRUD helper and then lists the cached cubes.
 *
 * @return the cubes in the cache after the reload
 * @throws IOException if the reload fails
 */
public List<CubeInstance> reloadAndListAllCubes() throws IOException {
    // the write lock covers only the reload; listing takes its own read lock
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        crud.reloadAll();
    }
    final List<CubeInstance> refreshed = listAllCubes();
    return refreshed;
}
/**
 * Looks up a cube in the cache by name, under the read lock.
 *
 * @param cubeName name of the cube
 * @return whatever the cache holds for that name
 */
public CubeInstance getCube(String cubeName) {
    try (AutoLock readGuard = cubeMapLock.lockForRead()) {
        final CubeInstance cached = cubeMap.get(cubeName);
        return cached;
    }
}
/**
 * Scans the cached cubes for the one whose uuid equals the given uuid.
 *
 * @param uuid uuid to search for
 * @return the first matching cube, or {@code null} when none matches
 */
public CubeInstance getCubeByUuid(String uuid) {
    try (AutoLock readGuard = cubeMapLock.lockForRead()) {
        CubeInstance match = null;
        for (CubeInstance candidate : cubeMap.values()) {
            if (uuid.equals(candidate.getUuid())) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
/**
 * Returns the entities that the CRUD helper reports as having failed to load.
 */
public List<String> getErrorCubes() {
    final List<String> failed = crud.getLoadFailedEntities();
    return failed;
}
/**
 * Gets the cubes whose desc name equals the given name, ignoring case.
 *
 * @param descName CubeDesc name
 * @return the matching cubes; empty when there is none
 */
public List<CubeInstance> getCubesByDesc(String descName) {
    try (AutoLock readGuard = cubeMapLock.lockForRead()) {
        List<CubeInstance> allCubes = listAllCubes();
        List<CubeInstance> matches = new ArrayList<CubeInstance>();
        for (CubeInstance candidate : allCubes) {
            if (!descName.equalsIgnoreCase(candidate.getDescName())) {
                continue;
            }
            matches.add(candidate);
        }
        return matches;
    }
}
/**
 * Creates a cube from a descriptor, saves it, and moves it into the project.
 *
 * @param cubeName    name of the new cube
 * @param projectName project the cube realization is moved to
 * @param desc        descriptor the cube is created from
 * @param owner       owner set on the cube and passed to the project manager
 * @return the created cube instance
 * @throws IOException if saving or the project update fails
 */
public CubeInstance createCube(String cubeName, String projectName, CubeDesc desc, String owner)
        throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        logger.info("Creating cube '{}-->{}' from desc '{}'", projectName, cubeName, desc.getName());

        // save cube resource
        CubeInstance created = CubeInstance.create(cubeName, desc);
        created.setOwner(owner);
        CubeUpdate initialSave = new CubeUpdate(created);
        updateCubeWithRetry(initialSave, 0);

        ProjectManager prjMgr = ProjectManager.getInstance(config);
        prjMgr.moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
        return created;
    }
}
/**
 * Saves an already built cube instance and moves it into the project.
 *
 * @param cube        the instance to save
 * @param projectName project the cube realization is moved to
 * @param owner       owner set on the cube and passed to the project manager
 * @return the same cube instance that was passed in
 * @throws IOException if saving or the project update fails
 */
public CubeInstance createCube(CubeInstance cube, String projectName, String owner) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        logger.info("Creating cube '{}-->{}' from instance object. '", projectName, cube.getName());

        // save cube resource
        cube.setOwner(owner);
        CubeUpdate initialSave = new CubeUpdate(cube);
        updateCubeWithRetry(initialSave, 0);

        ProjectManager prjMgr = ProjectManager.getInstance(config);
        prjMgr.moveRealizationToProject(RealizationType.CUBE, cube.getName(), projectName, owner);
        return cube;
    }
}
/**
 * Removes every segment of the cube. Since all segments go away, the cuboid
 * maps, the snapshot-path update, the create time and the last-optimized
 * marker are reset in the same update.
 *
 * @param cube the cube to clear; its latest writable copy is what gets updated
 * @return the cube returned by the update
 * @throws IOException if the update fails
 */
public CubeInstance clearSegments(CubeInstance cube) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = cube.latestCopyForWrite(); // get a latest copy

        CubeUpdate reset = new CubeUpdate(latest);
        CubeSegment[] everySegment = latest.getSegments().toArray(new CubeSegment[latest.getSegments().size()]);
        reset.setToRemoveSegs(everySegment);
        reset.setCuboids(Maps.<Long, Long> newHashMap());
        reset.setCuboidsRecommend(Sets.<Long> newHashSet());
        reset.setUpdateTableSnapshotPath(Maps.<String, String> newHashMap());
        reset.setCreateTimeUTC(System.currentTimeMillis());
        reset.setCuboidLastOptimized(0L);

        return updateCube(reset);
    }
}
/**
 * Applies the update with {@code isLocal} set to {@code false}.
 *
 * @param update the changes to apply
 * @return the cube returned by the two-argument overload
 * @throws IOException if the update fails
 */
public CubeInstance updateCube(CubeUpdate update) throws IOException {
    final boolean isLocal = false;
    return updateCube(update, isLocal);
}
// try to minimize the use of this method, use updateCubeXXX() instead
/**
 * Applies the update under the write lock, starting the retry count at zero.
 *
 * @param update  the changes to apply
 * @param isLocal forwarded to the save performed by the retry routine
 * @return the cube produced by the save
 * @throws IOException if the update fails
 */
public CubeInstance updateCube(CubeUpdate update, boolean isLocal) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        return updateCubeWithRetry(update, 0, isLocal);
    }
}
/**
 * Sets a new status on the latest writable copy of the cube, touching the
 * cube's project before the update is applied.
 *
 * @param cube      the cube to change
 * @param newStatus the status to set
 * @return the cube returned by the update
 * @throws IOException if the project touch or the update fails
 */
public CubeInstance updateCubeStatus(CubeInstance cube, RealizationStatusEnum newStatus) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = cube.latestCopyForWrite(); // get a latest copy

        CubeUpdate statusChange = new CubeUpdate(latest);
        statusChange.setStatus(newStatus);

        ProjectManager prjMgr = ProjectManager.getInstance(config);
        prjMgr.touchProject(latest.getProject());
        return updateCube(statusChange);
    }
}
/**
 * Sets a new owner on the latest writable copy of the cube, touching the
 * cube's project before the update is applied.
 *
 * @param cube  the cube to change
 * @param owner the owner to set
 * @return the cube returned by the update
 * @throws IOException if the project touch or the update fails
 */
public CubeInstance updateCubeOwner(CubeInstance cube, String owner) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = cube.latestCopyForWrite(); // get a latest copy

        CubeUpdate ownerChange = new CubeUpdate(latest);
        ownerChange.setOwner(owner);

        ProjectManager prjMgr = ProjectManager.getInstance(config);
        prjMgr.touchProject(latest.getProject());
        return updateCube(ownerChange);
    }
}
/**
 * Collection flavour of the drop: copies the segments into an array and
 * delegates to the varargs overload.
 *
 * @param cube       the cube to drop segments from
 * @param segsToDrop the segments to drop
 * @return the cube returned by the varargs overload
 * @throws IOException if the update fails
 */
public CubeInstance updateCubeDropSegments(CubeInstance cube, Collection<CubeSegment> segsToDrop)
        throws IOException {
    CubeSegment[] doomed = segsToDrop.toArray(new CubeSegment[segsToDrop.size()]);
    return updateCubeDropSegments(cube, doomed);
}
/**
 * Removes the given segments from the latest writable copy of the cube.
 *
 * @param cube       the cube to drop segments from
 * @param segsToDrop the segments to drop
 * @return the cube returned by the update
 * @throws IOException if the update fails
 */
public CubeInstance updateCubeDropSegments(CubeInstance cube, CubeSegment... segsToDrop) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = cube.latestCopyForWrite(); // get a latest copy
        CubeUpdate removal = new CubeUpdate(latest);
        removal.setToRemoveSegs(segsToDrop);
        return updateCube(removal);
    }
}
/**
 * Removes the given segments and resets the recommended cuboids of the
 * latest writable copy of the cube.
 *
 * @param cube       the cube to change
 * @param segsToDrop the segments to drop
 * @return the cube returned by the update
 * @throws IOException if the update fails
 */
public CubeInstance dropOptmizingSegments(CubeInstance cube, CubeSegment... segsToDrop) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = cube.latestCopyForWrite(); // get a latest copy
        CubeUpdate removal = new CubeUpdate(latest);
        removal.setToRemoveSegs(segsToDrop);
        // reset the recommended cuboids to an empty set
        removal.setCuboidsRecommend(Sets.<Long> newHashSet());
        return updateCube(removal);
    }
}
/**
 * Changes the status of one segment. The segment is looked up again by uuid
 * in the latest writable copy of its cube, and that copy is what is updated.
 *
 * @param seg    the segment whose status changes
 * @param status the status to set
 * @return the cube returned by the update
 * @throws IOException if the update fails
 */
public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum status) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = seg.getCubeInstance().latestCopyForWrite();
        CubeSegment target = latest.getSegmentById(seg.getUuid());

        CubeUpdate statusChange = new CubeUpdate(latest);
        target.setStatus(status);
        statusChange.setToUpdateSegs(target);
        return updateCube(statusChange);
    }
}
/**
 * Records a new snapshot resource path for one lookup table on the latest
 * writable copy of the cube.
 *
 * @param cube               the cube to change
 * @param lookupTableName    lookup table the snapshot belongs to
 * @param newSnapshotResPath resource path of the new snapshot
 * @return the cube returned by the update
 * @throws IOException if the update fails
 */
public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath)
        throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance latest = cube.latestCopyForWrite();

        Map<String, String> snapshotPaths = Maps.newHashMap();
        snapshotPaths.put(lookupTableName, newSnapshotResPath);

        CubeUpdate snapshotChange = new CubeUpdate(latest);
        snapshotChange.setUpdateTableSnapshotPath(snapshotPaths);
        return updateCube(snapshotChange);
    }
}
/**
 * Two-argument convenience form: same as the three-argument overload with
 * {@code isLocal} set to {@code false}.
 */
private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
    final boolean isLocal = false;
    return updateCubeWithRetry(update, retry, isLocal);
}
/**
 * Applies a CubeUpdate to its cube instance and saves it, retrying on write
 * conflicts.
 *
 * The segment list is rebuilt (add, remove, update, sort, validate), the
 * remaining members are copied from the update, and the cube is saved. On a
 * WriteConflictException the cube is reloaded and the same update is applied
 * to a fresh writable copy, keeping the caller's {@code isLocal} choice.
 * After a successful save the statistics resources of the removed segments
 * are deleted on a best-effort basis.
 *
 * @param update  the changes to apply; must carry a cube instance
 * @param retry   number of attempts made so far
 * @param isLocal forwarded to {@code crud.save} on every attempt
 * @return the cube instance returned by the save
 * @throws IllegalStateException  if the update or its cube instance is null
 * @throws WriteConflictException if the save still conflicts once {@code retry} has reached 7
 * @throws IOException            if saving or reloading fails
 */
private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry, boolean isLocal) throws IOException {
    if (update == null || update.getCubeInstance() == null)
        throw new IllegalStateException();

    CubeInstance cube = update.getCubeInstance();
    logger.info("Updating cube instance '{}'", cube.getName());

    // work on a clone so the cube's own list stays untouched until setSegments
    Segments<CubeSegment> newSegs = (Segments) cube.getSegments().clone();

    if (update.getToAddSegs() != null)
        newSegs.addAll(Arrays.asList(update.getToAddSegs()));

    List<String> toRemoveResources = Lists.newArrayList();
    if (update.getToRemoveSegs() != null) {
        processToRemoveSegments(update, newSegs, toRemoveResources);
    }

    if (update.getToUpdateSegs() != null) {
        processToUpdateSegments(update, newSegs);
    }

    Collections.sort(newSegs);
    newSegs.validate();
    cube.setSegments(newSegs);

    setCubeMember(cube, update);

    try {
        cube = crud.save(cube, isLocal);
    } catch (WriteConflictException ise) {
        logger.warn("Write conflict to update cube {} at try {}, will retry...", cube.getName(), retry);
        if (retry >= 7) {
            logger.error("Retried 7 times till got error, abandoning...", ise);
            throw ise;
        }

        cube = crud.reload(cube.getName());
        update.setCubeInstance(cube.latestCopyForWrite());
        // keep isLocal on the retry; the two-argument overload would reset it to false
        return updateCubeWithRetry(update, retry + 1, isLocal);
    }

    // best effort: a failed delete is logged and must not fail the update
    for (String resource : toRemoveResources) {
        try {
            getStore().deleteResource(resource);
        } catch (IOException ioe) {
            logger.error("Failed to delete resource {}", resource, ioe);
        }
    }

    //this is a duplicate call to take care of scenarios where REST cache service unavailable
    ProjectManager.getInstance(cube.getConfig()).clearL2Cache(cube.getProject());

    return cube;
}
/**
 * Copies onto the cube every member that the update actually carries.
 * Object members are applied when non-null, the cost when positive, and the
 * two timestamps when non-negative.
 */
private void setCubeMember(CubeInstance cube, CubeUpdate update) {
    if (null != update.getStatus()) {
        cube.setStatus(update.getStatus());
    }

    if (null != update.getOwner()) {
        cube.setOwner(update.getOwner());
    }

    if (0 < update.getCost()) {
        cube.setCost(update.getCost());
    }

    if (null != update.getCuboids()) {
        cube.setCuboids(update.getCuboids());
    }

    if (null != update.getCuboidsRecommend()) {
        cube.setCuboidsRecommend(update.getCuboidsRecommend());
    }

    // snapshot paths are put entry by entry rather than replaced wholesale
    if (null != update.getUpdateTableSnapshotPath()) {
        for (Map.Entry<String, String> pathByTable : update.getUpdateTableSnapshotPath().entrySet()) {
            String lookupTable = pathByTable.getKey();
            String snapshotPath = pathByTable.getValue();
            cube.putSnapshotResPath(lookupTable, snapshotPath);
        }
    }

    if (0 <= update.getCreateTimeUTC()) {
        cube.setCreateTimeUTC(update.getCreateTimeUTC());
    }

    if (0 <= update.getCuboidLastOptimized()) {
        cube.setCuboidLastOptimized(update.getCuboidLastOptimized());
    }
}
/**
 * Replaces, in {@code newSegs}, the first segment sharing a uuid with each
 * segment to update. A segment to update with no uuid match is ignored.
 */
private void processToUpdateSegments(CubeUpdate update, Segments<CubeSegment> newSegs) {
    for (CubeSegment replacement : update.getToUpdateSegs()) {
        int idx = 0;
        while (idx < newSegs.size()) {
            if (newSegs.get(idx).getUuid().equals(replacement.getUuid())) {
                newSegs.set(idx, replacement);
                break;
            }
            idx++;
        }
    }
}
/**
 * Removes from {@code newSegs} every segment whose uuid matches one of the
 * segments to remove, and collects the statistics resource path of each
 * removed segment into {@code toRemoveResources}.
 */
private void processToRemoveSegments(CubeUpdate update, Segments<CubeSegment> newSegs,
        List<String> toRemoveResources) {
    for (Iterator<CubeSegment> cursor = newSegs.iterator(); cursor.hasNext();) {
        CubeSegment candidate = cursor.next();
        for (CubeSegment doomed : update.getToRemoveSegs()) {
            if (!candidate.getUuid().equals(doomed.getUuid())) {
                continue;
            }
            logger.info("Remove segment {}", candidate);
            toRemoveResources.add(candidate.getStatisticsResourcePath());
            cursor.remove();
            break;
        }
    }
}
// for test
/**
 * Reloads one cube through the CRUD helper, under the write lock.
 */
public CubeInstance reloadCube(String cubeName) {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        final CubeInstance reloaded = crud.reload(cubeName);
        return reloaded;
    }
}
/**
 * Reloads one cube with the helper's quiet variant and, when a cube comes
 * back, clears the cuboid cache for it.
 *
 * @return the reloaded cube, or {@code null} when the reload returned none
 */
public CubeInstance reloadCubeQuietly(String cubeName) {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance reloaded = crud.reloadQuietly(cubeName);
        if (reloaded == null) {
            return null;
        }
        Cuboid.clearCache(reloaded);
        return reloaded;
    }
}
/**
 * Removes a cube from the local cache only, forgets the storage locations
 * recorded for its segments and clears its cuboid cache. Does nothing when
 * the cube is not cached.
 */
public void removeCubeLocal(String cubeName) {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        CubeInstance cached = cubeMap.get(cubeName);
        if (cached == null) {
            return;
        }

        cubeMap.removeLocal(cubeName);
        for (CubeSegment seg : cached.getSegments()) {
            usedStorageLocation.remove(seg.getUuid());
        }
        Cuboid.clearCache(cached);
    }
}
/**
 * Deletes a cube, optionally its descriptor as well, and removes the cube
 * from its projects.
 *
 * @param cubeName   name of the cube to drop
 * @param deleteDesc whether the cube's descriptor, if any, is removed too
 * @return the cube instance that was dropped
 * @throws IOException if a delete or the project update fails
 */
public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        logger.info("Dropping cube '{}'", cubeName);

        CubeInstance dropped = getCube(cubeName);

        // remove cube and update cache
        crud.delete(dropped);
        Cuboid.clearCache(dropped);

        // the descriptor goes only on request, and only when the cube has one
        if (deleteDesc && dropped.getDescriptor() != null) {
            CubeDescManager descMgr = CubeDescManager.getInstance(config);
            descMgr.removeCubeDesc(dropped.getDescriptor());
        }

        // delete cube from project
        ProjectManager prjMgr = ProjectManager.getInstance(config);
        prjMgr.removeRealizationsFromProjects(RealizationType.CUBE, cubeName);

        return dropped;
    }
}
/**
 * Resolves the snapshot resource path of a lookup table. The path is read
 * from the segment unless the snapshot table desc is global, in which case
 * it is read from the cube instance.
 *
 * @param cubeSegment       segment (and, through it, cube) to read from
 * @param tableName         lookup table name
 * @param snapshotTableDesc snapshot settings of the table; may be null
 * @return the snapshot resource path, never null
 * @throws IllegalStateException if no snapshot path is recorded for the table
 */
private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
    String snapshotResPath;
    if (snapshotTableDesc == null || !snapshotTableDesc.isGlobal()) {
        snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
    } else {
        snapshotResPath = cubeSegment.getCubeInstance().getSnapshotResPath(tableName);
    }
    if (snapshotResPath == null) {
        // a space after "segment" keeps the cube name from running into the word
        throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment "
                + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
    }
    return snapshotResPath;
}
/**
 * Generates a storage location identifier that is not among the values of
 * {@code usedStorageLocation}.
 *
 * For engines other than {@link IEngineAware#ID_SPARK_II} the identifier is
 * the optional {@code namespace:} (omitted for "default" or an empty
 * namespace), the configured table name prefix, and HBASE_TABLE_LENGTH
 * random characters. For ID_SPARK_II it is PARQUET_IDENTIFIER_LENGTH random
 * characters only. Random characters are drawn from ALPHA_NUM, and
 * generation repeats until an unused identifier comes up.
 *
 * @param engineType engine type of the cube the location is for
 * @return a storage location identifier not currently recorded as used
 */
@VisibleForTesting
/*private*/ String generateStorageLocation(int engineType) {
    String namePrefix = config.getHBaseTableNamePrefix();
    String namespace = config.getHBaseStorageNameSpace();

    // the fixed part does not change between attempts, so build it once
    final int identifierLength;
    final String fixedPart;
    if (engineType != IEngineAware.ID_SPARK_II) {
        identifierLength = HBASE_TABLE_LENGTH;
        StringBuilder head = new StringBuilder();
        if (!(namespace.equals("default") || namespace.equals(""))) {
            head.append(namespace).append(":");
        }
        head.append(namePrefix);
        fixedPart = head.toString();
    } else {
        identifierLength = PARQUET_IDENTIFIER_LENGTH;
        fixedPart = "";
    }

    String tableName;
    do {
        // single-threaded local buffer: StringBuilder, no synchronization needed
        StringBuilder sb = new StringBuilder(fixedPart.length() + identifierLength);
        sb.append(fixedPart);
        for (int i = 0; i < identifierLength; i++) {
            sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
        }
        tableName = sb.toString();
    } while (this.usedStorageLocation.containsValue(tableName));
    return tableName;
}
/**
 * Delegates to the CRUD helper's {@code copyForWrite} for the given cube.
 */
public CubeInstance copyForWrite(CubeInstance cube) {
    final CubeInstance writable = crud.copyForWrite(cube);
    return writable;
}
/** Tells whether the segment's status is READY. */
private boolean isReady(CubeSegment seg) {
    final SegmentStatusEnum current = seg.getStatus();
    return current == SegmentStatusEnum.READY;
}
/** Returns the TableMetadataManager for this manager's config. */
private TableMetadataManager getTableManager() {
    final TableMetadataManager tableMgr = TableMetadataManager.getInstance(config);
    return tableMgr;
}
/** Returns the ResourceStore for this manager's config. */
private ResourceStore getStore() {
    final ResourceStore store = ResourceStore.getStore(this.config);
    return store;
}
/** This provider serves realizations of type CUBE. */
@Override
public RealizationType getRealizationType() {
    final RealizationType served = RealizationType.CUBE;
    return served;
}
/** Returns the cached cube of the given name as a realization. */
@Override
public IRealization getRealization(String name) {
    final CubeInstance cube = getCube(name);
    return cube;
}
// ============================================================================
// Segment related methods
// ============================================================================
/**
 * Appends a full build segment: no time range, no segment range and no
 * source partition offsets are given.
 */
public CubeSegment appendSegment(CubeInstance cube) throws IOException {
    final TSRange noTsRange = null;
    final SegmentRange noSegRange = null;
    return appendSegment(cube, noTsRange, noSegRange, null, null);
}
/**
 * Appends a segment for the given time range, without a segment range or
 * source partition offsets.
 */
public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
    final SegmentRange noSegRange = null;
    return appendSegment(cube, tsRange, noSegRange, null, null);
}
/**
 * Appends a segment using the ranges and partition offsets carried by the
 * source partition.
 */
public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
    return appendSegment(cube, //
            src.getTSRange(), //
            src.getSegRange(), //
            src.getSourcePartitionOffsetStart(), //
            src.getSourcePartitionOffsetEnd());
}
/**
 * Appends a segment under the write lock; the work itself is done by the
 * segment assistant.
 */
CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
        throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        final CubeSegment appended = segAssist.appendSegment(cube, tsRange, segRange,
                sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
        return appended;
    }
}
/**
 * Creates a refresh segment under the write lock; the work itself is done
 * by the segment assistant.
 */
public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        final CubeSegment refreshing = segAssist.refreshSegment(cube, tsRange, segRange);
        return refreshing;
    }
}
/**
 * Creates optimize segments under the write lock; the work itself is done
 * by the segment assistant.
 */
public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        final CubeSegment[] optimizing = segAssist.optimizeSegments(cube, cuboidsRecommend);
        return optimizing;
    }
}
/**
 * Creates a merge segment under the write lock; the work itself is done by
 * the segment assistant.
 */
public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
        throws IOException {
    try (AutoLock writeGuard = cubeMapLock.lockForWrite()) {
        final CubeSegment merged = segAssist.mergeSegments(cube, tsRange, segRange, force);
        return merged;
    }
}
/**
 * Promotes a newly built segment under the write lock; the work itself is
 * done by the segment assistant.
 */
public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
    AutoLock writeGuard = cubeMapLock.lockForWrite();
    try (AutoLock held = writeGuard) {
        segAssist.promoteNewlyBuiltSegments(cube, newSegment);
    }
}
/**
 * Promotes newly optimized segments under the write lock; the work itself
 * is done by the segment assistant.
 */
public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
    AutoLock writeGuard = cubeMapLock.lockForWrite();
    try (AutoLock held = writeGuard) {
        segAssist.promoteNewlyOptimizeSegments(cube, optimizedSegments);
    }
}
/**
 * Promotes checkpoint-optimized segments under the write lock; the work
 * itself is done by the segment assistant.
 */
public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
        CubeSegment... optimizedSegments) throws IOException {
    AutoLock writeGuard = cubeMapLock.lockForWrite();
    try (AutoLock held = writeGuard) {
        segAssist.promoteCheckpointOptimizeSegments(cube, recommendCuboids, optimizedSegments);
    }
}
/**
 * Delegates to the segment assistant's {@code calculateHoles} for the named
 * cube. No lock is taken here.
 */
public List<CubeSegment> calculateHoles(String cubeName) {
    final List<CubeSegment> holes = segAssist.calculateHoles(cubeName);
    return holes;
}
private class SegmentAssist {
/**
 * Creates a new segment on the latest writable copy of the cube and saves
 * it through an update.
 *
 * For a cube without a partitioned model both ranges are discarded (full
 * build). For a partitioned cube, a time range starting at 0 gets its start
 * moved: to the descriptor's partition date start when the cube has no
 * segment yet, otherwise to the end of the last segment unless that segment
 * is an offset-cube segment.
 *
 * @throws IllegalArgumentException if both a time range and a segment range are given
 */
CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
        throws IOException {
    CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy

    checkInputRanges(tsRange, segRange);

    PartitionDesc partitionDesc = cubeCopy.getModel().getPartitionDesc();
    boolean partitioned = partitionDesc != null && partitionDesc.isPartitioned();
    if (!partitioned) {
        // full build
        tsRange = null;
        segRange = null;
    } else if (tsRange != null && tsRange.start.v == 0) {
        // missing start: continue from where the previous segment ended
        CubeDesc cubeDesc = cubeCopy.getDescriptor();
        CubeSegment previous = cubeCopy.getLastSegment();
        if (previous == null) {
            tsRange = new TSRange(cubeDesc.getPartitionDateStart(), tsRange.end.v);
        } else if (!previous.isOffsetCube()) {
            tsRange = new TSRange(previous.getTSRange().end.v, tsRange.end.v);
        }
    }

    CubeSegment appended = newSegment(cubeCopy, tsRange, segRange);
    appended.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
    appended.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
    validateNewSegments(cubeCopy, appended);

    CubeUpdate addition = new CubeUpdate(cubeCopy);
    addition.setToAddSegs(appended);
    updateCube(addition);
    return appended;
}
/**
 * Creates a segment that refreshes an existing range of the latest writable
 * copy of the cube and saves it through an update.
 *
 * Without a partitioned model both ranges are discarded (full build). The
 * new segment must fit the existing segments at both ends. When a segment
 * range is in effect it must equal the range of exactly one existing
 * segment, whose source partition offsets are copied to the new segment.
 *
 * @throws IllegalArgumentException if both ranges are given, if the new
 *         segment does not fit existing segments, or if no existing segment
 *         has the given segment range
 */
public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange)
        throws IOException {
    CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy

    checkInputRanges(tsRange, segRange);

    PartitionDesc partitionDesc = cubeCopy.getModel().getPartitionDesc();
    if (partitionDesc == null || !partitionDesc.isPartitioned()) {
        // full build
        tsRange = null;
        segRange = null;
    }

    CubeSegment refreshing = newSegment(cubeCopy, tsRange, segRange);

    Pair<Boolean, Boolean> fit = cubeCopy.getSegments().fitInSegments(refreshing);
    if (!fit.getFirst() || !fit.getSecond()) {
        throw new IllegalArgumentException("The new refreshing segment " + refreshing
                + " does not match any existing segment in cube " + cubeCopy);
    }

    if (segRange != null) {
        CubeSegment original = null;
        for (CubeSegment existing : cubeCopy.getSegments()) {
            if (existing.getSegRange().equals(segRange)) {
                original = existing;
                break;
            }
        }

        if (original == null) {
            throw new IllegalArgumentException(
                    "For streaming cube, only one segment can be refreshed at one time");
        }

        refreshing.setSourcePartitionOffsetStart(original.getSourcePartitionOffsetStart());
        refreshing.setSourcePartitionOffsetEnd(original.getSourcePartitionOffsetEnd());
    }

    CubeUpdate addition = new CubeUpdate(cubeCopy);
    addition.setToAddSegs(refreshing);
    updateCube(addition);

    return refreshing;
}
/**
 * Creates one new segment per READY segment of the latest writable copy of
 * the cube, each covering the same time range as its READY counterpart, and
 * saves them together with the recommended cuboids.
 *
 * @return the new segments, in the order of the READY segments
 */
public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
    CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy

    List<CubeSegment> readySegments = cubeCopy.getSegments(SegmentStatusEnum.READY);
    CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
    for (int idx = 0; idx < optimizeSegments.length; idx++) {
        CubeSegment ready = readySegments.get(idx);
        CubeSegment counterpart = newSegment(cubeCopy, ready.getTSRange(), null);
        validateNewSegments(cubeCopy, counterpart);
        optimizeSegments[idx] = counterpart;
    }

    CubeUpdate addition = new CubeUpdate(cubeCopy);
    addition.setCuboidsRecommend(cuboidsRecommend);
    addition.setToAddSegs(optimizeSegments);
    updateCube(addition);

    return optimizeSegments;
}
/**
 * Creates a merged segment covering two or more existing segments of the
 * latest writable copy of the cube and saves it through an update.
 *
 * @param cube     the cube to merge segments of
 * @param tsRange  time range to merge; mutually exclusive with segRange
 * @param segRange segment range to merge; mutually exclusive with tsRange
 * @param force    when true, the readiness checks of checkReadyForMerge are skipped
 * @return the new segment, flagged as merged
 * @throws IllegalArgumentException if the cube has no segments, if both ranges
 *         are given, or if the range covers fewer than two segments
 * @throws IllegalStateException if the cube's model is not partitioned
 */
public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
throws IOException {
CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
if (cubeCopy.getSegments().isEmpty())
throw new IllegalArgumentException("Cube " + cubeCopy + " has no segments");
checkInputRanges(tsRange, segRange);
checkCubeIsPartitioned(cubeCopy);
// the first segment decides whether the cube is treated as an offset cube
if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) {
// offset cube, merge by date range?
segRange = getOffsetCubeSegRange(cubeCopy, tsRange, segRange);
tsRange = null;
Preconditions.checkArgument(segRange != null);
} else {
/**In case of non-streaming segment,
* tsRange is the same as segRange,
* either could fulfill the merge job,
* so it needs to convert segRange to tsRange if tsRange is null.
**/
// NOTE(review): if both tsRange and segRange are null here, segRange.start
// below throws a NullPointerException -- confirm callers never pass both null
if (tsRange == null) {
tsRange = new TSRange((Long) segRange.start.v, (Long) segRange.end.v);
}
segRange = null;
}
CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
newSegment.setMerged(true);
Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment);
if (mergingSegments.size() <= 1)
throw new IllegalArgumentException("Range " + newSegment.getSegRange()
+ " must contain at least 2 segments, but there is " + mergingSegments.size());
CubeSegment first = mergingSegments.get(0);
CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
if (!force) {
checkReadyForMerge(mergingSegments);
}
// re-align the new segment's range to the segments that are actually merged
if (first.isOffsetCube()) {
newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
newSegment.setTSRange(null);
} else {
newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
newSegment.setSegRange(null);
}
validateNewSegments(cubeCopy, newSegment);
CubeUpdate update = new CubeUpdate(cubeCopy);
update.setToAddSegs(newSegment);
updateCube(update);
return newSegment;
}
/**
 * Verifies that the segments can be merged without forcing: neighbours must
 * connect, no segment may exceed the configured maximum merge span (when
 * that span is positive), and no segment may be empty (size and input
 * records both zero).
 *
 * @throws IllegalStateException    if two neighbouring segments do not connect
 * @throws IllegalArgumentException if a segment exceeds the maximum merge
 *         span or an empty segment is found
 */
private void checkReadyForMerge(Segments<CubeSegment> mergingSegments) {
    // check if the segments to be merged are continuous
    final int lastIdx = mergingSegments.size() - 1;
    for (int idx = 0; idx < lastIdx; idx++) {
        CubeSegment current = mergingSegments.get(idx);
        CubeSegment following = mergingSegments.get(idx + 1);
        if (current.getSegRange().connects(following.getSegRange())) {
            continue;
        }
        throw new IllegalStateException("Merging segments must not have gaps between "
                + current + " and " + following);
    }

    // collect the empty segments; they are reported only after the span check
    List<String> emptyNames = Lists.newArrayList();
    for (CubeSegment candidate : mergingSegments) {
        boolean empty = candidate.getSizeKB() == 0 && candidate.getInputRecords() == 0;
        if (empty) {
            emptyNames.add(candidate.getName());
        }
    }

    long maxSegMergeSpan = KylinConfig.getInstanceFromEnv().getMaxSegmentMergeSpan();
    for (CubeSegment candidate : mergingSegments) {
        if (maxSegMergeSpan > 0 && candidate.getTSRange().duration() > maxSegMergeSpan) {
            throw new IllegalArgumentException(
                    "Segment range is larger than the max segement merge span, couldn't merge unless 'forceMergeEmptySegment' set to true: "
                            + candidate);
        }
    }

    if (!emptyNames.isEmpty()) {
        throw new IllegalArgumentException(
                "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
                        + emptyNames);
    }
}
/**
 * Returns the segment range to merge for an offset cube. A given segment
 * range is returned as is. When only a time range is given, the range is
 * derived from the READY segments found for that time range.
 *
 * @return the segment range, or {@code null} when neither range is given
 * @throws IllegalArgumentException if no segments are found for the time range
 */
private SegmentRange getOffsetCubeSegRange(CubeInstance cubeCopy, TSRange tsRange, SegmentRange segRange) {
    if (segRange != null || tsRange == null) {
        return segRange;
    }

    Pair<CubeSegment, CubeSegment> bounds = cubeCopy.getSegments(SegmentStatusEnum.READY)
            .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
    if (bounds == null) {
        throw new IllegalArgumentException(
                "Find no segments to merge by " + tsRange + " for cube " + cubeCopy);
    }
    return new SegmentRange(bounds.getFirst().getSegRange().start, bounds.getSecond().getSegRange().end);
}
/**
 * Rejects a request that names both a time range and a segment range.
 *
 * @throws IllegalArgumentException if both ranges are non-null
 */
private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
    if (tsRange == null || segRange == null) {
        return;
    }
    throw new IllegalArgumentException(
            "Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
}
/**
 * Requires the partition desc of the cube's model to be partitioned.
 *
 * @throws IllegalStateException if it is not
 */
private void checkCubeIsPartitioned(CubeInstance cube) {
    if (!cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
        throw new IllegalStateException(
                "there is no partition date column specified, only full build is supported");
    }
}
/**
 * Builds a NEW-status segment for the cube with a random uuid, a name made
 * from the given ranges, the current time as create time, a freshly
 * generated storage location, and the cube's storage type recorded under
 * "storageType" in the additional info. When neither range is given the
 * time range becomes [0, Long.MAX_VALUE]. The segment is validated before
 * it is returned.
 */
private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
    DataModelDesc model = cube.getModel();

    CubeSegment created = new CubeSegment();
    created.setUuid(RandomUtil.randomUUID().toString());
    // the name is made from the ranges as passed in, before any defaulting below
    created.setName(CubeSegment.makeSegmentName(tsRange, segRange, model));
    created.setCreateTimeUTC(System.currentTimeMillis());
    created.setCubeInstance(cube);

    // let full build range be backward compatible
    if (tsRange == null && segRange == null) {
        tsRange = new TSRange(0L, Long.MAX_VALUE);
    }
    created.setTSRange(tsRange);
    created.setSegRange(segRange);
    created.setStatus(SegmentStatusEnum.NEW);
    created.setStorageLocationIdentifier(generateStorageLocation(cube.getEngineType()));

    Map<String, String> info = created.getAdditionalInfo();
    info.put("storageType", "" + cube.getStorageType());
    created.setAdditionalInfo(info);

    created.setCubeInstance(cube);
    created.validate();
    return created;
}
/**
 * Promotes a newly built segment to READY on the latest writable copy of
 * the cube, and removes every existing segment that is not part of the
 * to-be segment list calculated for the new segment. When the cube's config
 * enables it, the cube status is set to READY in the same update.
 *
 * @throws IllegalStateException if the segment's cube is a cached shared
 *         instance, if the segment has no storage location identifier or no
 *         last build job id, or if it is missing from the to-be list
 */
public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegCopy) throws IOException {
    // double check the updating objects are not on cache
    if (newSegCopy.getCubeInstance().isCachedAndShared()) {
        throw new IllegalStateException();
    }

    CubeInstance cubeCopy = getCube(cube.getName()).latestCopyForWrite();

    if (StringUtils.isBlank(newSegCopy.getStorageLocationIdentifier())) {
        String problem = String.format(Locale.ROOT, "For cube %s, segment %s missing StorageLocationIdentifier",
                cubeCopy.toString(), newSegCopy.toString());
        throw new IllegalStateException(problem);
    }

    if (StringUtils.isBlank(newSegCopy.getLastBuildJobID())) {
        String problem = String.format(Locale.ROOT, "For cube %s, segment %s missing LastBuildJobID",
                cubeCopy.toString(), newSegCopy.toString());
        throw new IllegalStateException(problem);
    }

    // a READY segment is unexpected here but only warned about
    if (isReady(newSegCopy)) {
        logger.warn("For cube {}, segment {} state should be NEW but is READY", cubeCopy, newSegCopy);
    }

    List<CubeSegment> tobe = cubeCopy.calculateToBeSegments(newSegCopy);
    if (!tobe.contains(newSegCopy)) {
        String problem = String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
                cubeCopy.toString(), newSegCopy.toString(), tobe.toString());
        throw new IllegalStateException(problem);
    }

    newSegCopy.setStatus(SegmentStatusEnum.READY);

    List<CubeSegment> obsolete = Lists.newArrayList();
    for (CubeSegment existing : cubeCopy.getSegments()) {
        if (tobe.contains(existing)) {
            continue;
        }
        obsolete.add(existing);
    }

    logger.info("Promoting cube {}, new segment {}, to remove segments {}", cubeCopy, newSegCopy, obsolete);

    CubeSegment[] obsoleteArr = obsolete.toArray(new CubeSegment[obsolete.size()]);
    CubeUpdate promotion = new CubeUpdate(cubeCopy);
    promotion.setToRemoveSegs(obsoleteArr).setToUpdateSegs(newSegCopy);
    if (cube.getConfig().isJobAutoReadyCubeEnabled()) {
        promotion.setStatus(RealizationStatusEnum.READY);
    }
    updateCube(promotion);
}
/**
 * Marks the given optimized segments as {@code READY_PENDING} and persists the change
 * through {@code updateCube}.
 *
 * @param cube              the cube that owns the segments; a writable copy is taken from it
 * @param optimizedSegments the optimized segments to promote
 * @throws IOException propagated from {@code updateCube}
 */
public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments)
        throws IOException {
    CubeInstance cubeCopy = cube.latestCopyForWrite();
    // Re-fetch the segments from the writable copy (as promoteCheckpointOptimizeSegments does), so the
    // status change below is applied to the copy being updated and never to the caller's instance.
    CubeSegment[] segCopy = cubeCopy.regetSegments(optimizedSegments);
    for (CubeSegment seg : segCopy) {
        seg.setStatus(SegmentStatusEnum.READY_PENDING);
    }
    CubeUpdate update = new CubeUpdate(cubeCopy);
    update.setToUpdateSegs(segCopy);
    updateCube(update);
}
/**
 * Completes a checkpoint optimization: switches the optimized segments to {@code READY},
 * removes the original segments they were optimized from, and installs the recommended
 * cuboids while resetting the recommendation set to empty.
 *
 * @param cube              the cube being optimized; a writable copy is taken from it
 * @param recommendCuboids  the cuboids to set on the cube
 * @param optimizedSegments the optimized segments to promote
 * @throws IOException           propagated from {@code updateCube}
 * @throws IllegalStateException if the cube does not hold exactly twice as many segments as there
 *                               are optimized segments, or an optimized segment lacks a storage
 *                               location identifier or a last build job ID
 */
public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
        CubeSegment... optimizedSegments) throws IOException {
    final CubeInstance cubeCopy = cube.latestCopyForWrite();
    final CubeSegment[] optSegCopy = cubeCopy.regetSegments(optimizedSegments);

    // the cube is expected to hold two segments per optimized segment
    final int expectedSegmentCount = optSegCopy.length * 2;
    if (cubeCopy.getSegments().size() != expectedSegmentCount) {
        final String msg = String.format(Locale.ROOT,
                "For cube %s, every READY segment should be optimized and all segments should be READY before optimizing",
                cubeCopy.toString());
        throw new IllegalStateException(msg);
    }

    final CubeSegment[] originalSegments = new CubeSegment[optSegCopy.length];
    for (int idx = 0; idx < optSegCopy.length; idx++) {
        final CubeSegment optimized = optSegCopy[idx];
        originalSegments[idx] = cubeCopy.getOriginalSegmentToOptimize(optimized);

        if (StringUtils.isBlank(optimized.getStorageLocationIdentifier())) {
            final String msg = String.format(Locale.ROOT,
                    "For cube %s, segment %s missing StorageLocationIdentifier", cubeCopy.toString(),
                    optimized.toString());
            throw new IllegalStateException(msg);
        }

        if (StringUtils.isBlank(optimized.getLastBuildJobID())) {
            final String msg = String.format(Locale.ROOT, "For cube %s, segment %s missing LastBuildJobID",
                    cubeCopy.toString(), optimized.toString());
            throw new IllegalStateException(msg);
        }

        optimized.setStatus(SegmentStatusEnum.READY);
    }

    logger.info("Promoting cube {}, new segments {}, to remove segments {}", cubeCopy,
            Arrays.toString(optSegCopy), originalSegments);

    final CubeUpdate update = new CubeUpdate(cubeCopy);
    update.setToRemoveSegs(originalSegments);
    update.setToUpdateSegs(optSegCopy);
    update.setCuboids(recommendCuboids);
    update.setCuboidsRecommend(Sets.<Long> newHashSet());

    if (cube.getConfig().isJobAutoReadyCubeEnabled()) {
        update.setStatus(RealizationStatusEnum.READY);
    }

    updateCube(update);
}
/**
 * Verifies that the given new segment is part of the cube's calculated to-be segments.
 *
 * @param cube        the cube the segment would be added to
 * @param newSegments the candidate segment
 * @throws IllegalStateException if the candidate is not contained in the to-be segments
 */
private void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
    final List<CubeSegment> expected = cube.calculateToBeSegments(newSegments);
    final List<CubeSegment> candidates = Collections.singletonList(newSegments);

    if (expected.containsAll(candidates)) {
        return;
    }

    final String msg = String.format(Locale.ROOT,
            "For cube %s, the new segments %s do not fit in its current %s; the resulted tobe is %s",
            cube.toString(), candidates.toString(), cube.getSegments().toString(), expected.toString());
    throw new IllegalStateException(msg);
}
/**
 * Calculates the holes (gaps) between consecutive segments of a cube.
 * <p>
 * The segments are sorted, then each adjacent pair is examined: a pair whose ranges connect
 * is skipped, and a pair where the first range lies apart before the second yields a
 * placeholder segment covering the gap.
 *
 * @param cubeName name of the cube to inspect
 * @return the placeholder segments describing the gaps; empty when the cube has no segments or no gaps
 * @throws NullPointerException if no cube is found for {@code cubeName}
 */
public List<CubeSegment> calculateHoles(String cubeName) {
    List<CubeSegment> holes = Lists.newArrayList();
    final CubeInstance cube = getCube(cubeName);
    // validate before the first dereference; checking after cube.getModel() could never fire
    Preconditions.checkNotNull(cube, "cube %s not found", cubeName);
    DataModelDesc modelDesc = cube.getModel();
    // work on a private copy so that sorting does not reorder the cube's own segment list
    final List<CubeSegment> segments = Lists.newArrayList(cube.getSegments());
    logger.info("totally {} cubeSegments", segments.size());
    if (segments.isEmpty()) {
        return holes;
    }
    Collections.sort(segments);
    for (int i = 0; i < segments.size() - 1; ++i) {
        CubeSegment first = segments.get(i);
        CubeSegment second = segments.get(i + 1);
        if (first.getSegRange().connects(second.getSegRange())) {
            continue;
        }
        if (first.getSegRange().apartBefore(second.getSegRange())) {
            CubeSegment hole = new CubeSegment();
            hole.setCubeInstance(cube);
            if (first.isOffsetCube()) {
                // offset-based cube: the hole spans the segment range and the source partition offsets
                hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
                hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
                hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
                hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc));
            } else {
                // otherwise the hole spans the time range between the two segments
                hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
                hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc));
            }
            holes.add(hole);
        }
    }
    return holes;
}
}
// ============================================================================
// Dictionary/Snapshot related methods
// ============================================================================
/**
 * Chooses the cube that should answer a lookup-table query such as
 * "select * from LOOKUP_TABLE", so that the result is consistent and as recent as possible.
 * <p>
 * Among the given realizations, only ready cubes whose descriptor has a dimension on the
 * lookup table and that have a latest ready segment are considered; the one whose latest
 * ready segment has the greatest last build time wins.
 *
 * @param realizationEntries the candidate realizations to inspect
 * @param lookupTableName    name of the lookup table being queried
 * @param cubeInstance       the fallback cube, returned when no candidate qualifies
 * @return the qualifying cube with the most recently built ready segment, or {@code cubeInstance}
 *         if there is none
 */
public CubeInstance findLatestSnapshot(List<RealizationEntry> realizationEntries, String lookupTableName,
        CubeInstance cubeInstance) {
    CubeInstance cube = null;
    try {
        if (!realizationEntries.isEmpty()) {
            long maxBuildTime = Long.MIN_VALUE;
            RealizationRegistry registry = RealizationRegistry.getInstance(config);
            for (RealizationEntry entry : realizationEntries) {
                IRealization realization = registry.getRealization(entry.getType(), entry.getRealization());
                if (realization == null || !realization.isReady() || !(realization instanceof CubeInstance)) {
                    continue;
                }
                CubeInstance current = (CubeInstance) realization;
                if (current.getDescriptor().findDimensionByTable(lookupTableName) == null) {
                    continue;
                }
                CubeSegment segment = current.getLatestReadySegment();
                if (segment == null) {
                    continue;
                }
                long latestBuildTime = segment.getLastBuildTime();
                if (latestBuildTime > maxBuildTime) {
                    maxBuildTime = latestBuildTime;
                    cube = current;
                }
            }
        }
    } catch (Exception e) {
        logger.info("Unexpected error.", e);
        throw e;
    }
    // nothing qualified: fall back silently instead of logging "Picked cube null over ..."
    if (cube == null) {
        return cubeInstance;
    }
    // compare from the non-null side so a null fallback cannot cause a NullPointerException
    if (!cube.equals(cubeInstance)) {
        logger.debug("Picked cube {} over {} as it provides a more recent snapshot of the lookup table {}", cube,
                cubeInstance, lookupTableName);
    }
    return cube;
}
}