/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.rest.service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
import org.apache.kylin.dict.lookup.LookupProviderFactory;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.response.TableDescResponse;
import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableSignature;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
@Component("tableService")
public class TableService extends BasicService {
private static final Logger logger = LoggerFactory.getLogger(TableService.class);
@Autowired
@Qualifier("modelMgmtService")
private ModelService modelService;
@Autowired
@Qualifier("streamingMgmtService")
private StreamingService streamingService;
@Autowired
@Qualifier("kafkaMgmtService")
private KafkaConfigService kafkaConfigService;
@Autowired
private AclEvaluate aclEvaluate;
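/**
* List the tables that are loaded into a project.
* @param project project name
* @param withExt if true, also attach table ext info (cardinality, data source properties); requires write permission
* @return the project's table descriptors, or an empty list if none are loaded
*/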
public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
aclEvaluate.checkProjectReadPermission(project);
List<TableDesc> tables = getProjectManager().listDefinedTables(project);
if (null == tables) {
return Collections.emptyList();
}
if (withExt) {
aclEvaluate.checkProjectWritePermission(project);
tables = cloneTableDesc(tables, project);
}
return tables;
}
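/**
* Look up a single table descriptor by its identity within a project.
* @param tableName table identity, e.g. DB.TABLE
* @param withExt if true, also attach table ext info; requires write permission
* @param prj project name
* @return the table descriptor, or null if the table is not loaded in the project
*/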
public TableDesc getTableDescByName(String tableName, boolean withExt, String prj) {
aclEvaluate.checkProjectReadPermission(prj);
TableDesc table = getTableManager().getTableDesc(tableName, prj);
if (withExt) {
aclEvaluate.checkProjectWritePermission(prj);
table = cloneTableDesc(table, prj);
}
return table;
}
/**
* @return all loaded table names
* @throws Exception on error
*/
public String[] loadHiveTablesToProject(String[] hiveTables, String project) throws Exception {
aclEvaluate.checkProjectAdminPermission(project);
List<Pair<TableDesc, TableExtDesc>> allMeta = extractHiveTableMeta(hiveTables, project);
return loadTablesToProject(allMeta, project);
}
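// Illustrative usage sketch (not part of the original class); assumes a caller that already holds a
// TableService reference and ADMIN permission on the target project, with example table/project names:
//   String[] loaded = tableService.loadHiveTablesToProject(
//           new String[] { "DEFAULT.KYLIN_SALES", "DEFAULT.KYLIN_CAL_DT" }, "learn_kylin");
//   tableService.calculateCardinalityIfNotPresent(loaded, "ADMIN", "learn_kylin");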
/**
* @return all loaded table names
* @throws IOException on error
*/
public String[] loadTableToProject(TableDesc tableDesc, TableExtDesc extDesc, String project) throws IOException {
return loadTablesToProject(Lists.newArrayList(Pair.newPair(tableDesc, extDesc)), project);
}
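// Two-pass load: first validate every table against existing models/cubes via TableSchemaUpdateChecker,
// then persist the TableDesc/TableExtDesc pairs and register the tables with the project.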
private String[] loadTablesToProject(List<Pair<TableDesc, TableExtDesc>> allMeta, String project)
throws IOException {
aclEvaluate.checkProjectAdminPermission(project);
// do schema check
TableMetadataManager metaMgr = getTableManager();
CubeManager cubeMgr = getCubeManager();
TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager());
for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
TableDesc tableDesc = pair.getFirst();
TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);
result.raiseExceptionWhenInvalid();
}
// save table meta
List<String> saved = Lists.newArrayList();
for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
TableDesc tableDesc = pair.getFirst();
TableExtDesc extDesc = pair.getSecond();
TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity(), project);
if (origTable == null || origTable.getProject() == null) {
tableDesc.setUuid(RandomUtil.randomUUID().toString());
tableDesc.setLastModified(0);
} else {
tableDesc.setUuid(origTable.getUuid());
tableDesc.setLastModified(origTable.getLastModified());
}
metaMgr.saveSourceTable(tableDesc, project);
if (extDesc != null) {
TableExtDesc origExt = metaMgr.getTableExt(tableDesc.getIdentity(), project);
if (origExt == null || origExt.getProject() == null) {
extDesc.setUuid(UUID.randomUUID().toString());
extDesc.setLastModified(0);
} else {
extDesc.setUuid(origExt.getUuid());
extDesc.setLastModified(origExt.getLastModified());
}
extDesc.init(project);
metaMgr.saveTableExt(extDesc, project);
}
saved.add(tableDesc.getIdentity());
}
String[] result = saved.toArray(new String[saved.size()]);
addTableToProject(result, project);
return result;
}
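/**
* Load table and table-ext metadata from the project's source (e.g. Hive) for the given "db.table" names.
*/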
public List<Pair<TableDesc, TableExtDesc>> extractHiveTableMeta(String[] tables, String project) throws Exception {
// de-duplicate db/table pairs while preserving input order
SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
for (String fullTableName : tables) {
String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
db2tables.put(parts[0], parts[1]);
}
// load all tables first
List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
ProjectInstance projectInstance = getProjectManager().getProject(project);
ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer();
for (Map.Entry<String, String> entry : db2tables.entries()) {
Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project);
TableDesc tableDesc = pair.getFirst();
Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase(Locale.ROOT)));
Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase(Locale.ROOT)));
Preconditions.checkState(tableDesc.getIdentity()
.equals(entry.getKey().toUpperCase(Locale.ROOT) + "." + entry.getValue().toUpperCase(Locale.ROOT)));
TableExtDesc extDesc = pair.getSecond();
Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity()));
allMeta.add(pair);
}
return allMeta;
}
private void addTableToProject(String[] tables, String project) throws IOException {
getProjectManager().addTableDescToProject(tables, project);
}
protected void removeTableFromProject(String tableName, String projectName) throws IOException {
tableName = normalizeHiveTableName(tableName);
getProjectManager().removeTableDescFromProject(tableName, projectName);
}
/**
* A table may be referenced by several projects, and Kylin keeps only one copy of the metadata
* for each table; that is why there are two checks here.
* @param tableName
* @param project
* @return true if the table was removed from the project
*/
public boolean unloadHiveTable(String tableName, String project) throws IOException {
aclEvaluate.checkProjectAdminPermission(project);
Message msg = MsgPicker.getMsg();
boolean rtn = false;
int tableType = 0;
tableName = normalizeHiveTableName(tableName);
TableDesc desc = getTableManager().getTableDesc(tableName, project);
// unload of legacy global table is not supported for now
if (desc == null || desc.getProject() == null) {
logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName,
project);
return false;
}
if (!modelService.isTableInModel(desc, project)) {
removeTableFromProject(tableName, project);
rtn = true;
} else {
List<String> models = modelService.getModelsUsingTable(desc, project);
throw new BadRequestException(String.format(Locale.ROOT, msg.getTABLE_IN_USE_BY_MODEL(), models));
}
// it is a project local table, ready to remove since no model is using it within the project
TableMetadataManager metaMgr = getTableManager();
metaMgr.removeTableExt(tableName, project);
metaMgr.removeSourceTable(tableName, project);
// let the table's source (e.g. streaming/Kafka) clean up its own metadata for this table
SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
ISource source = sourceManager.getCachedSource(desc);
source.unloadTable(tableName, project);
return rtn;
}
/**
* @param project
* @return the database names visible in the project's source
* @throws Exception on error
*/
public List<String> getSourceDbNames(String project) throws Exception {
ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
.getSourceMetadataExplorer();
return explr.listDatabases();
}
/**
* @param project
* @param database
* @return the table names in the given database of the project's source
* @throws Exception on error
*/
public List<String> getSourceTableNames(String project, String database) throws Exception {
ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
.getSourceMetadataExplorer();
return explr.listTables(database);
}
private TableDescResponse cloneTableDesc(TableDesc table, String prj) {
TableExtDesc tableExtDesc = getTableManager().getTableExt(table.getIdentity(), prj);
// Clone TableDesc
TableDescResponse rtableDesc = new TableDescResponse(table);
Map<String, Long> cardinality = new HashMap<>();
Map<String, String> dataSourceProp = new HashMap<>();
String scard = tableExtDesc.getCardinality();
if (!StringUtils.isEmpty(scard)) {
String[] cards = StringUtils.split(scard, ",");
ColumnDesc[] cdescs = rtableDesc.getColumns();
for (int i = 0; i < cdescs.length; i++) {
ColumnDesc columnDesc = cdescs[i];
if (cards.length > i) {
cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
} else {
logger.error("The result cardinality is not identical with hive table metadata, cardinality : "
+ scard + " column array length: " + cdescs.length);
break;
}
}
rtableDesc.setCardinality(cardinality);
}
dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
rtableDesc.setDescExd(dataSourceProp);
return rtableDesc;
}
private List<TableDesc> cloneTableDesc(List<TableDesc> tables, String prj) throws IOException {
List<TableDesc> descs = new ArrayList<>();
for (TableDesc table : tables) {
descs.add(cloneTableDesc(table, prj));
}
return descs;
}
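/**
* Submit cardinality calculation for the given tables unless a calculation job is already running.
* Only applies to projects whose source is Hive.
*/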
public void calculateCardinalityIfNotPresent(String[] tables, String submitter, String prj) throws Exception {
// calculate cardinality for Hive source
ProjectInstance projectInstance = getProjectManager().getProject(prj);
if (projectInstance == null || projectInstance.getSourceType() != ISourceAware.ID_HIVE) {
return;
}
TableMetadataManager metaMgr = getTableManager();
ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
for (String table : tables) {
TableExtDesc tableExtDesc = metaMgr.getTableExt(table, prj);
String jobID = tableExtDesc.getJodID();
if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
calculateCardinality(table, submitter, prj);
}
}
}
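/**
* Rebuild the local cache of the given ext table snapshot on this node.
*/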
public void updateSnapshotLocalCache(String project, String tableName, String snapshotID) {
ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
if (extTableSnapshotInfo == null) {
throw new IllegalArgumentException(
"cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
}
LookupProviderFactory.rebuildLocalCache(tableDesc, extTableSnapshotInfo);
}
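/**
* Remove the local cache of the given ext table snapshot from this node.
*/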
public void removeSnapshotLocalCache(String tableName, String snapshotID) {
ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
if (extTableSnapshotInfo == null) {
throw new IllegalArgumentException(
"cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
}
LookupProviderFactory.removeLocalCache(extTableSnapshotInfo);
}
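/**
* @return the name of the snapshot's local cache state, see IExtLookupTableCache.CacheState
*/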
public String getSnapshotLocalCacheState(String tableName, String snapshotID) {
ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
if (extTableSnapshotInfo == null) {
throw new IllegalArgumentException(
"cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
}
CacheState cacheState = LookupProviderFactory.getCacheState(extTableSnapshotInfo);
return cacheState.name();
}
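/**
* List all snapshots of a lookup table, both ext snapshots and metastore snapshots,
* together with the cubes/segments that use them.
*/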
public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
TableSignature signature = hiveTable.getSignature();
return internalGetLookupTableSnapshots(tableName, signature);
}
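// Assembles the response list from ext snapshots and metastore snapshots of the table.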
List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
throws IOException {
ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
List<ExtTableSnapshotInfo> extTableSnapshots = extSnapshotInfoManager.getSnapshots(tableName);
List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
List<TableSnapshotResponse> result = Lists.newArrayList();
for (ExtTableSnapshotInfo extTableSnapshot : extTableSnapshots) {
TableSnapshotResponse response = new TableSnapshotResponse();
response.setSnapshotID(extTableSnapshot.getId());
response.setSnapshotType(TableSnapshotResponse.TYPE_EXT);
response.setLastBuildTime(extTableSnapshot.getLastBuildTime());
response.setStorageType(extTableSnapshot.getStorageType());
response.setSourceTableSize(extTableSnapshot.getSignature().getSize());
response.setSourceTableLastModifyTime(extTableSnapshot.getSignature().getLastModifiedTime());
response.setCubesAndSegmentsUsage(snapshotUsageMap.get(extTableSnapshot.getResourcePath()));
result.add(response);
}
for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
TableSnapshotResponse response = new TableSnapshotResponse();
response.setSnapshotID(metaStoreTableSnapshot.getId());
response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
result.add(response);
}
return result;
}
/**
* @return map from snapshot resource path to the list of cube names or "cube:segment" names using it
*/
private Map<String, List<String>> getSnapshotUsages() {
CubeManager cubeManager = CubeManager.getInstance(getConfig());
Map<String, List<String>> snapshotCubeSegmentMap = Maps.newHashMap();
for (CubeInstance cube : cubeManager.listAllCubes()) {
// snapshots referenced at the cube level
Collection<String> cubeSnapshots = cube.getSnapshots().values();
for (String cubeSnapshot : cubeSnapshots) {
snapshotCubeSegmentMap.computeIfAbsent(cubeSnapshot, k -> Lists.newArrayList()).add(cube.getName());
}
// snapshots referenced at the segment level
for (CubeSegment segment : cube.getSegments()) {
Collection<String> segmentSnapshots = segment.getSnapshotPaths();
for (String segmentSnapshot : segmentSnapshots) {
snapshotCubeSegmentMap.computeIfAbsent(segmentSnapshot, k -> Lists.newArrayList())
.add(cube.getName() + ":" + segment.getName());
}
}
}
return snapshotCubeSegmentMap;
}
/**
* Generate column cardinality for a table. This triggers a Hadoop job and the result is merged
* into the table ext info.
*
* @param tableName
* @param submitter
* @param prj
*/
public void calculateCardinality(String tableName, String submitter, String prj) throws Exception {
aclEvaluate.checkProjectWritePermission(prj);
Message msg = MsgPicker.getMsg();
tableName = normalizeHiveTableName(tableName);
TableDesc table = getTableManager().getTableDesc(tableName, prj);
final TableExtDesc tableExt = getTableManager().getTableExt(tableName, prj);
if (table == null) {
BadRequestException e = new BadRequestException(
String.format(Locale.ROOT, msg.getTABLE_DESC_NOT_FOUND(), tableName));
logger.error("Cannot find table descriptor " + tableName, e);
throw e;
}
DefaultChainedExecutable job = new DefaultChainedExecutable();
// make sure the job can be scheduled when the DistributedScheduler is enabled
job.setParam("segmentId", tableName);
job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
job.setSubmitter(submitter);
String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
String param = "-table " + tableName + " -output " + outPath + " -project " + prj;
MapReduceExecutable step1 = new MapReduceExecutable();
step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
step1.setMapReduceParams(param);
step1.setParam("segmentId", tableName);
job.addTask(step1);
HadoopShellExecutable step2 = new HadoopShellExecutable();
step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
step2.setJobParams(param);
step2.setParam("segmentId", tableName);
job.addTask(step2);
tableExt.setJodID(job.getId());
getTableManager().saveTableExt(tableExt, prj);
getExecutableManager().addJob(job);
}
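/**
* Normalize a Hive table name to the canonical upper-case "DB.TABLE" form,
* e.g. "mydb.my_table" becomes "MYDB.MY_TABLE".
*/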
public String normalizeHiveTableName(String tableName) {
String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
return (dbTableName[0] + "." + dbTableName[1]).toUpperCase(Locale.ROOT);
}
}