// blob: 85d9ce94cddc2cbaf3ab02532dee514d7744e22a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.FrontendsProcNode;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.plsql.metastore.PlsqlManager;
import org.apache.doris.plsql.metastore.PlsqlProcedureKey;
import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBackendsMetadataParams;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TIcebergMetadataParams;
import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TJobsMetadataParams;
import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TSchemaTableRequestParams;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTasksMetadataParams;
import org.apache.doris.thrift.TUserIdentity;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.iceberg.Snapshot;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.jetbrains.annotations.NotNull;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class MetadataGenerator {
// Logger shared by all static helpers of this class.
private static final Logger LOG = LogManager.getLogger(MetadataGenerator.class);
// Lookup tables from lower-cased column name to the position of that column inside a full row.
// They are populated once by the static initializer and used to project rows down to the
// columns requested by the caller of getSchemaTableData().
// Columns of the "active_queries" schema table.
private static final ImmutableMap<String, Integer> ACTIVE_QUERIES_COLUMN_TO_INDEX;
// Titles listed in WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.
private static final ImmutableMap<String, Integer> WORKLOAD_GROUPS_COLUMN_TO_INDEX;
// Titles listed in PlsqlManager.ROUTINE_INFO_TITLE_NAMES.
private static final ImmutableMap<String, Integer> ROUTINE_INFO_COLUMN_TO_INDEX;
// Columns of the "workload_schedule_policy" schema table.
private static final ImmutableMap<String, Integer> WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX;
// Populates the lower-cased column-name -> column-position lookup tables.
static {
    ACTIVE_QUERIES_COLUMN_TO_INDEX = indexSchemaTableColumns("active_queries");
    WORKLOAD_GROUPS_COLUMN_TO_INDEX = indexNames(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES);
    ROUTINE_INFO_COLUMN_TO_INDEX = indexNames(PlsqlManager.ROUTINE_INFO_TITLE_NAMES);
    WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX = indexSchemaTableColumns("workload_schedule_policy");
}

/**
 * Maps every column of the named schema table (lower-cased name) to its position in the full schema.
 *
 * @param schemaTableName key into {@code SchemaTable.TABLE_MAP}
 * @return immutable name -> position map
 */
private static ImmutableMap<String, Integer> indexSchemaTableColumns(String schemaTableName) {
    List<Column> columns = SchemaTable.TABLE_MAP.get(schemaTableName).getFullSchema();
    ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
    for (int i = 0; i < columns.size(); i++) {
        builder.put(columns.get(i).getName().toLowerCase(), i);
    }
    return builder.build();
}

/**
 * Maps every name of the given list (lower-cased) to its position in that list.
 *
 * @param names ordered column titles
 * @return immutable name -> position map
 */
private static ImmutableMap<String, Integer> indexNames(List<String> names) {
    ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
    for (int i = 0; i < names.size(); i++) {
        builder.put(names.get(i).toLowerCase(), i);
    }
    return builder.build();
}
/**
 * Produces the rows of a metadata table-valued function.
 *
 * <p>The request's metadata type selects which generator builds the full rows. When the
 * generator succeeds and the request names the columns it wants, every row is projected
 * down to those columns, in the requested order.
 *
 * @param request fetch request carrying the metadata table params
 * @return the generated rows with status OK, or a result with status INTERNAL_ERROR
 *         when the params are missing, the type is not handled here, or a generator fails
 * @throws TException if a requested column cannot be resolved while projecting
 */
public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("getMetadataTable() start.");
    }
    if (!request.isSetMetadaTableParams() || !request.getMetadaTableParams().isSetMetadataType()) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Metadata table params is not set.");
        }
        return errorResult("Metadata table params is not set. ");
    }
    TMetadataTableRequestParams params = request.getMetadaTableParams();
    TMetadataType metadataType = params.getMetadataType();
    TFetchSchemaTableDataResult result;
    switch (metadataType) {
        case ICEBERG:
            result = icebergMetadataResult(params);
            break;
        case BACKENDS:
            result = backendsMetadataResult(params);
            break;
        case FRONTENDS:
            result = frontendsMetadataResult(params);
            break;
        case FRONTENDS_DISKS:
            result = frontendsDisksMetadataResult(params);
            break;
        case CATALOGS:
            result = catalogsMetadataResult(params);
            break;
        case MATERIALIZED_VIEWS:
            result = mtmvMetadataResult(params);
            break;
        case JOBS:
            result = jobMetadataResult(params);
            break;
        case TASKS:
            result = taskMetadataResult(params);
            break;
        default:
            // The params are set here; the type is simply one this generator does not handle.
            return errorResult("Unsupported metadata type: " + metadataType);
    }
    // Only project when the caller actually listed columns; an unset list would otherwise
    // be iterated as null inside filterColumns().
    if (params.isSetColumnsName() && result.getStatus().getStatusCode() == TStatusCode.OK) {
        filterColumns(result, params.getColumnsName(), metadataType, params);
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("getMetadataTable() end.");
    }
    return result;
}
/**
 * Produces the rows of a schema table that is served by the frontend.
 *
 * <p>The schema table name selects the generator and the matching column-index table.
 * When the generator succeeds and the request names columns, every row is projected
 * down to those columns.
 *
 * @param request fetch request carrying the schema table name and params
 * @return the generated rows with status OK, or a result with status INTERNAL_ERROR
 *         when the params are missing or the table name is unset or not handled here
 * @throws TException if projecting the rows to the requested columns fails
 */
public static TFetchSchemaTableDataResult getSchemaTableData(TFetchSchemaTableDataRequest request)
        throws TException {
    if (!request.isSetSchemaTableParams()) {
        return errorResult("schema table params is not set.");
    }
    // Switching on a null enum value would throw NullPointerException; report it instead.
    if (request.getSchemaTableName() == null) {
        return errorResult("invalid schema table name.");
    }
    TFetchSchemaTableDataResult result;
    TSchemaTableRequestParams schemaTableParams = request.getSchemaTableParams();
    ImmutableMap<String, Integer> columnIndex;
    switch (request.getSchemaTableName()) {
        case ACTIVE_QUERIES:
            result = queriesMetadataResult(schemaTableParams, request);
            columnIndex = ACTIVE_QUERIES_COLUMN_TO_INDEX;
            break;
        case WORKLOAD_GROUPS:
            result = workloadGroupsMetadataResult(schemaTableParams);
            columnIndex = WORKLOAD_GROUPS_COLUMN_TO_INDEX;
            break;
        case ROUTINES_INFO:
            result = routineInfoMetadataResult(schemaTableParams);
            columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX;
            break;
        case WORKLOAD_SCHEDULE_POLICY:
            result = workloadSchedPolicyMetadataResult(schemaTableParams);
            columnIndex = WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX;
            break;
        default:
            return errorResult("invalid schema table name.");
    }
    if (schemaTableParams.isSetColumnsName() && result.getStatus().getStatusCode() == TStatusCode.OK) {
        filterColumns(result, schemaTableParams.getColumnsName(), columnIndex);
    }
    return result;
}
/**
 * Creates a result that carries no rows, only an INTERNAL_ERROR status with one message.
 *
 * @param msg error message to attach to the status
 * @return a new result whose status code is INTERNAL_ERROR
 */
@NotNull
public static TFetchSchemaTableDataResult errorResult(String msg) {
    TStatus failure = new TStatus(TStatusCode.INTERNAL_ERROR);
    failure.addToErrorMsgs(msg);
    TFetchSchemaTableDataResult errorResult = new TFetchSchemaTableDataResult();
    errorResult.setStatus(failure);
    return errorResult;
}
/**
 * Builds the rows for an iceberg metadata query. Only the SNAPSHOTS query type is handled.
 *
 * <p>Each snapshot row holds, in order: commit time (encoded by convertToDateTimeV2 in the
 * time zone returned by TimeUtils), snapshot id, parent id (-1 when there is no parent),
 * operation, manifest list location and the summary serialized as JSON.
 *
 * @param params metadata table params; must carry iceberg metadata params
 * @return rows with status OK, or an INTERNAL_ERROR result when the params or the query type
 *         are missing, the query type is unsupported, or the snapshot list cannot be loaded
 */
private static TFetchSchemaTableDataResult icebergMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetIcebergMetadataParams()) {
        return errorResult("Iceberg metadata params is not set.");
    }
    TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams();
    TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType();
    // A switch on a null enum value throws NullPointerException; turn it into an error result.
    if (icebergQueryType == null) {
        return errorResult("Unsupported iceberg inspect type: " + icebergQueryType);
    }
    IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
    List<TRow> dataBatch = Lists.newArrayList();
    switch (icebergQueryType) {
        case SNAPSHOTS:
            List<Snapshot> snapshotList;
            try {
                snapshotList = icebergMetadataCache.getSnapshotList(icebergMetadataParams);
            } catch (UserException e) {
                return errorResult(e.getMessage());
            }
            // One serializer is enough for all rows.
            Gson gson = new Gson();
            for (Snapshot snapshot : snapshotList) {
                TRow trow = new TRow();
                LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli(
                        snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId());
                long encodedDatetime = convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(),
                        committedAt.getDayOfMonth(), committedAt.getHour(), committedAt.getMinute(),
                        committedAt.getSecond(), committedAt.getNano() / 1000);
                trow.addToColumnValue(new TCell().setLongVal(encodedDatetime));
                trow.addToColumnValue(new TCell().setLongVal(snapshot.snapshotId()));
                if (snapshot.parentId() == null) {
                    // -1 marks a snapshot without parent.
                    trow.addToColumnValue(new TCell().setLongVal(-1L));
                } else {
                    trow.addToColumnValue(new TCell().setLongVal(snapshot.parentId()));
                }
                trow.addToColumnValue(new TCell().setStringVal(snapshot.operation()));
                trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation()));
                trow.addToColumnValue(new TCell().setStringVal(gson.toJson(snapshot.summary())));
                dataBatch.add(trow);
            }
            break;
        default:
            return errorResult("Unsupported iceberg inspect type: " + icebergQueryType);
    }
    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(dataBatch);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds one row per known backend for the backends metadata table.
 *
 * <p>Backend ids whose backend object can no longer be found are skipped. The five port
 * columns are emitted only when the request's cluster name is null or empty.
 *
 * @param params metadata table params; must carry backends metadata params
 * @return rows with status OK, or an INTERNAL_ERROR result when the backends params are missing
 */
private static TFetchSchemaTableDataResult backendsMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetBackendsMetadataParams()) {
        return errorResult("backends metadata param is not set.");
    }
    TBackendsMetadataParams backendsParam = params.getBackendsMetadataParams();
    final SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
    List<Long> allBackendIds = clusterInfo.getAllBackendIds(false);

    long beginMs = System.currentTimeMillis();
    // Accumulates only the time spent counting tablets, across all backends.
    Stopwatch tabletNumTimer = Stopwatch.createUnstarted();
    Gson gson = new Gson();
    List<TRow> rows = Lists.newArrayList();
    for (long backendId : allBackendIds) {
        Backend be = clusterInfo.getBackend(backendId);
        if (be == null) {
            continue;
        }
        tabletNumTimer.start();
        Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
        tabletNumTimer.stop();

        TRow row = new TRow();
        row.addToColumnValue(new TCell().setLongVal(backendId));
        row.addToColumnValue(new TCell().setStringVal(be.getHost()));
        if (Strings.isNullOrEmpty(backendsParam.cluster_name)) {
            row.addToColumnValue(new TCell().setIntVal(be.getHeartbeatPort()));
            row.addToColumnValue(new TCell().setIntVal(be.getBePort()));
            row.addToColumnValue(new TCell().setIntVal(be.getHttpPort()));
            row.addToColumnValue(new TCell().setIntVal(be.getBrpcPort()));
            row.addToColumnValue(new TCell().setIntVal(be.getArrowFlightSqlPort()));
        }
        row.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(be.getLastStartTime())));
        row.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(be.getLastUpdateMs())));
        row.addToColumnValue(new TCell().setBoolVal(be.isAlive()));
        row.addToColumnValue(new TCell().setBoolVal(be.isDecommissioned()));
        row.addToColumnValue(new TCell().setLongVal(tabletNum));

        // capacity columns: data used, trash used, available, total
        row.addToColumnValue(new TCell().setLongVal(be.getDataUsedCapacityB()));
        row.addToColumnValue(new TCell().setLongVal(be.getTrashUsedCapacityB()));
        long availableBytes = be.getAvailableCapacityB();
        row.addToColumnValue(new TCell().setLongVal(availableBytes));
        long totalBytes = be.getTotalCapacityB();
        row.addToColumnValue(new TCell().setLongVal(totalBytes));

        // used percent; reported as 0 when the total capacity is not positive
        double usedPercent = totalBytes <= 0
                ? 0.0
                : (double) (totalBytes - availableBytes) * 100 / totalBytes;
        row.addToColumnValue(new TCell().setDoubleVal(usedPercent));
        row.addToColumnValue(new TCell().setDoubleVal(be.getMaxDiskUsedPct() * 100));

        // remote used capacity
        row.addToColumnValue(new TCell().setLongVal(be.getRemoteUsedCapacityB()));
        // tags
        row.addToColumnValue(new TCell().setStringVal(be.getTagMapString()));
        // heartbeat error message
        row.addToColumnValue(new TCell().setStringVal(be.getHeartbeatErrMsg()));
        // version
        row.addToColumnValue(new TCell().setStringVal(be.getVersion()));
        // status, serialized as JSON
        row.addToColumnValue(new TCell().setStringVal(gson.toJson(be.getBackendStatus())));
        // heartbeat failure counter
        row.addToColumnValue(new TCell().setIntVal(be.getHeartbeatFailureCounter()));
        // node role is shown only while the backend is alive
        String nodeRole = be.isAlive() ? be.getNodeRoleTag().value : "";
        row.addToColumnValue(new TCell().setStringVal(nodeRole));
        rows.add(row);
    }

    // Building this table can be slow, so the cost is logged for observation.
    if (LOG.isDebugEnabled()) {
        LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
                tabletNumTimer.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - beginMs));
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the frontends metadata table from FrontendsProcNode.getFrontendsInfo().
 * Every item of an info line becomes one string cell, in the same order.
 *
 * @param params metadata table params; must carry frontends metadata params
 * @return rows with status OK, or an INTERNAL_ERROR result when the frontends params are missing
 */
private static TFetchSchemaTableDataResult frontendsMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetFrontendsMetadataParams()) {
        return errorResult("frontends metadata param is not set.");
    }
    List<List<String>> frontendInfos = Lists.newArrayList();
    FrontendsProcNode.getFrontendsInfo(Env.getCurrentEnv(), frontendInfos);

    List<TRow> rows = Lists.newArrayList();
    for (List<String> frontendInfo : frontendInfos) {
        TRow row = new TRow();
        for (String field : frontendInfo) {
            TCell cell = new TCell();
            cell.setStringVal(field);
            row.addToColumnValue(cell);
        }
        rows.add(row);
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the frontends disks metadata table from
 * FrontendsProcNode.getFrontendsDiskInfo(). Every item of an info line becomes one string cell.
 *
 * @param params metadata table params; the frontends metadata params must be set
 * @return rows with status OK, or an INTERNAL_ERROR result when the frontends params are missing
 */
private static TFetchSchemaTableDataResult frontendsDisksMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetFrontendsMetadataParams()) {
        return errorResult("frontends metadata param is not set.");
    }
    List<List<String>> diskInfos = Lists.newArrayList();
    FrontendsProcNode.getFrontendsDiskInfo(Env.getCurrentEnv(), diskInfos);

    List<TRow> rows = Lists.newArrayList();
    for (List<String> diskInfo : diskInfos) {
        TRow row = new TRow();
        for (String field : diskInfo) {
            TCell cell = new TCell();
            cell.setStringVal(field);
            row.addToColumnValue(cell);
        }
        rows.add(row);
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the catalogs metadata table.
 *
 * <p>A catalog yields one row per property: id, name, type, property key, property value.
 * A catalog without properties yields a single row whose key and value are the string "NULL".
 *
 * @param params metadata table params (not read by this generator)
 * @return rows with status OK
 */
private static TFetchSchemaTableDataResult catalogsMetadataResult(TMetadataTableRequestParams params) {
    List<TRow> rows = Lists.newArrayList();
    List<CatalogIf> catalogs = Env.getCurrentEnv().getCatalogMgr().listCatalogs();
    for (CatalogIf catalog : catalogs) {
        // Cells shared by every row of this catalog.
        TRow catalogPrefix = new TRow();
        catalogPrefix.addToColumnValue(new TCell().setLongVal(catalog.getId()));
        catalogPrefix.addToColumnValue(new TCell().setStringVal(catalog.getName()));
        catalogPrefix.addToColumnValue(new TCell().setStringVal(catalog.getType()));

        Map<String, String> properties = catalog.getProperties();
        if (properties.isEmpty()) {
            catalogPrefix.addToColumnValue(new TCell().setStringVal("NULL"));
            catalogPrefix.addToColumnValue(new TCell().setStringVal("NULL"));
            rows.add(catalogPrefix);
            continue;
        }
        for (Map.Entry<String, String> property : properties.entrySet()) {
            // Copy the shared cells, then append this property's key and value.
            TRow propertyRow = new TRow(catalogPrefix);
            propertyRow.addToColumnValue(new TCell().setStringVal(property.getKey()));
            propertyRow.addToColumnValue(new TCell().setStringVal(property.getValue()));
            rows.add(propertyRow);
        }
    }
    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the workload groups schema table from the info lines returned by
 * WorkloadGroupMgr.getResourcesInfo() for the requesting user.
 *
 * <p>Each info line is a list of strings; positions 0, 2, 5-7 and 9-11 are parsed as long
 * values, all other positions are passed through as strings.
 *
 * @param params schema table params; must carry the current user identity
 * @return rows with status OK, or an INTERNAL_ERROR result when the user identity is missing
 */
private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TSchemaTableRequestParams params) {
    if (!params.isSetCurrentUserIdent()) {
        return errorResult("current user ident is not set.");
    }
    TUserIdentity requestingUser = params.getCurrentUserIdent();
    List<List<String>> groupInfos = Env.getCurrentEnv().getWorkloadGroupMgr()
            .getResourcesInfo(requestingUser);

    List<TRow> rows = Lists.newArrayList();
    for (List<String> info : groupInfos) {
        TRow row = new TRow();
        // id
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(0))));
        // name
        row.addToColumnValue(new TCell().setStringVal(info.get(1)));
        // cpu_share
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(2))));
        // mem_limit
        row.addToColumnValue(new TCell().setStringVal(info.get(3)));
        // mem overcommit
        row.addToColumnValue(new TCell().setStringVal(info.get(4)));
        // max concurrent
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(5))));
        // max queue size
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(6))));
        // queue timeout
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(7))));
        // cpu hard limit
        row.addToColumnValue(new TCell().setStringVal(info.get(8)));
        // scan thread num
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(9))));
        // max remote scan thread num
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(10))));
        // min remote scan thread num
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(info.get(11))));
        // spill low watermark
        row.addToColumnValue(new TCell().setStringVal(info.get(12)));
        // spill high watermark
        row.addToColumnValue(new TCell().setStringVal(info.get(13)));
        // tag
        row.addToColumnValue(new TCell().setStringVal(info.get(14)));
        // running query num
        row.addToColumnValue(new TCell().setStringVal(info.get(15)));
        // waiting query num
        row.addToColumnValue(new TCell().setStringVal(info.get(16)));
        rows.add(row);
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the workload schedule policy schema table from the info lines returned by
 * the workload schedule policy manager for the requesting user.
 *
 * <p>Column order: id (long), name, condition, action, priority (int), enabled (bool),
 * version (int), workload group id (string).
 *
 * @param params schema table params; must carry the current user identity
 * @return rows with status OK, or an INTERNAL_ERROR result when the user identity is missing
 */
private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TSchemaTableRequestParams params) {
    if (!params.isSetCurrentUserIdent()) {
        return errorResult("current user ident is not set.");
    }
    TUserIdentity requestingUser = params.getCurrentUserIdent();
    List<List<String>> policies = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
            .getWorkloadSchedPolicyTvfInfo(requestingUser);

    List<TRow> rows = Lists.newArrayList();
    for (List<String> policy : policies) {
        TRow row = new TRow();
        // id
        row.addToColumnValue(new TCell().setLongVal(Long.parseLong(policy.get(0))));
        // name
        row.addToColumnValue(new TCell().setStringVal(policy.get(1)));
        // condition
        row.addToColumnValue(new TCell().setStringVal(policy.get(2)));
        // action
        row.addToColumnValue(new TCell().setStringVal(policy.get(3)));
        // priority
        row.addToColumnValue(new TCell().setIntVal(Integer.parseInt(policy.get(4))));
        // enabled
        row.addToColumnValue(new TCell().setBoolVal(Boolean.parseBoolean(policy.get(5))));
        // version
        row.addToColumnValue(new TCell().setIntVal(Integer.parseInt(policy.get(6))));
        // workload group id
        row.addToColumnValue(new TCell().setStringVal(policy.get(7)));
        rows.add(row);
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the active queries schema table.
 *
 * <p>One row is produced for every entry of the local query info map. When the params ask
 * for it, the same request (with relaying switched off) is forwarded to the other frontends
 * and their rows are appended. A relayed result that carries no data batch is skipped with a
 * warning instead of failing the whole request.
 *
 * <p>Row layout: query id, start time, elapsed ms (-1 if not started), workload group id
 * (-1 unless exactly one group), database, frontend host, queue start time, queue end time,
 * queue status (RUNNING/QUEUED), sql. Unknown times and status are emitted as unset cells.
 *
 * @param tSchemaTableParams schema table params of the request
 * @param parentRequest the original request, used as template when relaying
 * @return rows with status OK
 */
private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableRequestParams tSchemaTableParams,
        TFetchSchemaTableDataRequest parentRequest) {
    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    String selfNode = Env.getCurrentEnv().getSelfNode().getHost();
    // Prefer the address the client is actually connected to, when known.
    if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
        selfNode = ConnectContext.get().getCurrentConnectedFEIp();
    }
    selfNode = NetUtils.getHostnameByIp(selfNode);
    List<TRow> dataBatch = Lists.newArrayList();
    Map<String, QueryInfo> queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap();
    // Local to this call, so the non-thread-safe formatter is never shared.
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    for (Map.Entry<String, QueryInfo> entry : queryInfoMap.entrySet()) {
        String queryId = entry.getKey();
        QueryInfo queryInfo = entry.getValue();
        TRow trow = new TRow();
        trow.addToColumnValue(new TCell().setStringVal(queryId));
        long queryStartTime = queryInfo.getStartExecTime();
        if (queryStartTime > 0) {
            trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queryStartTime))));
            trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryStartTime));
        } else {
            trow.addToColumnValue(new TCell());
            trow.addToColumnValue(new TCell().setLongVal(-1));
        }
        List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
        if (tgroupList != null && tgroupList.size() == 1) {
            trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id));
        } else {
            trow.addToColumnValue(new TCell().setLongVal(-1));
        }
        if (queryInfo.getConnectContext() != null) {
            trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
        } else {
            trow.addToColumnValue(new TCell().setStringVal(""));
        }
        trow.addToColumnValue(new TCell().setStringVal(selfNode));
        long queueStartTime = queryInfo.getQueueStartTime();
        if (queueStartTime > 0) {
            trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueStartTime))));
        } else {
            trow.addToColumnValue(new TCell());
        }
        long queueEndTime = queryInfo.getQueueEndTime();
        if (queueEndTime > 0) {
            trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueEndTime))));
        } else {
            trow.addToColumnValue(new TCell());
        }
        TokenState tokenState = queryInfo.getQueueStatus();
        if (tokenState == null) {
            trow.addToColumnValue(new TCell());
        } else if (tokenState == TokenState.READY_TO_RUN) {
            trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
        } else {
            trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
        }
        trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
        dataBatch.add(trow);
    }
    /* Get the query results from other FE also */
    if (tSchemaTableParams.isReplayToOtherFe()) {
        // Switch relaying off in the forwarded copy so the other frontends do not relay again.
        TSchemaTableRequestParams replaySchemaTableParams = new TSchemaTableRequestParams(tSchemaTableParams);
        replaySchemaTableParams.setReplayToOtherFe(false);
        TFetchSchemaTableDataRequest replayFetchSchemaTableReq = new TFetchSchemaTableDataRequest(parentRequest);
        replayFetchSchemaTableReq.setSchemaTableParams(replaySchemaTableParams);
        List<TFetchSchemaTableDataResult> relayResults = forwardToOtherFrontends(replayFetchSchemaTableReq);
        for (TFetchSchemaTableDataResult relayResult : relayResults) {
            List<TRow> relayRows = relayResult.getDataBatch();
            if (relayRows == null) {
                // e.g. the other frontend answered with an error result that has no rows
                LOG.warn("Skip relayed active queries result without data batch, status: {}",
                        relayResult.getStatus());
                continue;
            }
            dataBatch.addAll(relayRows);
        }
    }
    result.setDataBatch(dataBatch);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Sends the request to every frontend returned by FrontendsProcNode.getFrontendWithRpcPort()
 * and collects the answers.
 *
 * <p>This is best effort: a frontend whose client cannot be borrowed, or whose call fails,
 * is logged and skipped. A client is returned to the pool only after a successful call and
 * invalidated otherwise.
 *
 * @param request request to forward unchanged
 * @return the results of the frontends that answered; possibly empty, never null
 */
private static List<TFetchSchemaTableDataResult> forwardToOtherFrontends(TFetchSchemaTableDataRequest request) {
    List<TFetchSchemaTableDataResult> results = new ArrayList<>();
    List<Pair<String, Integer>> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(), false);
    // Seconds; falls back to 300 when there is no connect context.
    int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
    // Convert to milliseconds in long arithmetic and clamp, so a large timeout cannot overflow int.
    int borrowTimeoutMs = (int) Math.min((long) waitTimeOut * 1000L, (long) Integer.MAX_VALUE);
    for (Pair<String, Integer> fe : frontends) {
        TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value());
        FrontendService.Client client;
        try {
            client = ClientPool.frontendPool.borrowObject(thriftAddress, borrowTimeoutMs);
        } catch (Exception e) {
            LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e);
            continue;
        }
        boolean isReturnToPool = false;
        try {
            TFetchSchemaTableDataResult result = client.fetchSchemaTableData(request);
            results.add(result);
            isReturnToPool = true;
        } catch (Exception e) {
            LOG.warn("Failed to finish forward fetch operation to fe: {} . exception: {}", fe.key(), e);
        } finally {
            if (isReturnToPool) {
                ClientPool.frontendPool.returnObject(thriftAddress, client);
            } else {
                // The connection may be broken after a failed call; do not reuse it.
                ClientPool.frontendPool.invalidateObject(thriftAddress, client);
            }
        }
    }
    return results;
}
/**
 * Projects every row of the result down to the requested columns of a metadata table,
 * replacing the result's data batch in place. Cells appear in the order of {@code columnNames}.
 *
 * <p>Column positions are resolved once for all rows. As before, nothing is resolved when
 * the result has no rows.
 *
 * @param result result whose data batch is replaced by the projected rows
 * @param columnNames requested column names
 * @param type metadata type used to resolve column positions
 * @param params request params passed through to the position lookup
 * @throws TException if a column cannot be resolved for the given metadata type
 */
private static void filterColumns(TFetchSchemaTableDataResult result,
        List<String> columnNames, TMetadataType type, TMetadataTableRequestParams params) throws TException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("filterColumns() start.");
    }
    List<TRow> fullColumnsRow = result.getDataBatch();
    List<TRow> filterColumnsRows = Lists.newArrayList();
    if (!fullColumnsRow.isEmpty()) {
        // The name -> position mapping does not depend on the row, so resolve it only once.
        int[] positions = new int[columnNames.size()];
        int next = 0;
        try {
            for (String columnName : columnNames) {
                Integer index = MetadataTableValuedFunction.getColumnIndexFromColumnName(type, columnName, params);
                if (index == null) {
                    throw new TException("Unknown column name: " + columnName);
                }
                positions[next++] = index;
            }
        } catch (AnalysisException e) {
            throw new TException(e);
        }
        for (TRow row : fullColumnsRow) {
            TRow filterRow = new TRow();
            for (int position : positions) {
                filterRow.addToColumnValue(row.getColumnValue().get(position));
            }
            filterColumnsRows.add(filterRow);
        }
    }
    result.setDataBatch(filterColumnsRows);
    if (LOG.isDebugEnabled()) {
        LOG.debug("filterColumns() end.");
    }
}
/**
 * Projects every row of the result down to the requested columns of a schema table,
 * replacing the result's data batch in place. Cells appear in the order of {@code columnNames}.
 *
 * <p>Names are matched case-insensitively by lower-casing them before the lookup. Column
 * positions are resolved once for all rows; nothing is resolved when the result has no rows.
 *
 * @param result result whose data batch is replaced by the projected rows
 * @param columnNames requested column names
 * @param columnIndex lower-cased column name -> position in a full row
 * @throws TException if a requested column is unknown or a row cannot be projected
 */
private static void filterColumns(TFetchSchemaTableDataResult result,
        List<String> columnNames,
        ImmutableMap<String, Integer> columnIndex) throws TException {
    List<TRow> fullColumnsRow = result.getDataBatch();
    List<TRow> filterColumnsRows = Lists.newArrayList();
    if (!fullColumnsRow.isEmpty()) {
        try {
            // The name -> position mapping does not depend on the row, so resolve it only once.
            int[] positions = new int[columnNames.size()];
            int next = 0;
            for (String columnName : columnNames) {
                Integer index = columnName == null ? null : columnIndex.get(columnName.toLowerCase());
                if (index == null) {
                    LOG.warn("unknown column {} when filter columns.", columnName);
                    throw new TException("Unknown column name: " + columnName);
                }
                positions[next++] = index;
            }
            for (TRow row : fullColumnsRow) {
                TRow filterRow = new TRow();
                for (int position : positions) {
                    filterRow.addToColumnValue(row.getColumnValue().get(position));
                }
                filterColumnsRows.add(filterRow);
            }
        } catch (RuntimeException e) {
            // Malformed rows (e.g. fewer cells than expected) still surface as TException,
            // but JVM Errors are no longer caught.
            LOG.warn("error happens when filter columns.", e);
            throw new TException(e);
        }
    }
    result.setDataBatch(filterColumnsRows);
}
/**
 * Packs date-time components into a single long by OR-ing bit fields.
 *
 * <p>Field offsets (from the least significant bit): microsecond at 0, second at 20,
 * minute at 26, hour at 32, day at 37, month at 42, year at 46. Values are not range-checked;
 * an out-of-range component spills into the neighbouring field.
 *
 * @return the packed value
 */
private static long convertToDateTimeV2(
        int year, int month, int day, int hour, int minute, int second, int microsecond) {
    long packed = (long) microsecond;
    packed |= ((long) second) << 20;
    packed |= ((long) minute) << 26;
    packed |= ((long) hour) << 32;
    packed |= ((long) day) << 37;
    packed |= ((long) month) << 42;
    packed |= ((long) year) << 46;
    return packed;
}
/**
 * Builds the rows of the materialized views metadata table for one database of the
 * internal catalog.
 *
 * <p>Only tables that are MTMV instances are reported, and only those on which the
 * requesting user passes the SHOW privilege check.
 *
 * @param params metadata table params; must carry materialized views metadata params
 * @return rows with status OK, or an INTERNAL_ERROR result when the params are missing
 *         or the catalog/database lookup fails
 */
private static TFetchSchemaTableDataResult mtmvMetadataResult(TMetadataTableRequestParams params) {
    final boolean debug = LOG.isDebugEnabled();
    if (debug) {
        LOG.debug("mtmvMetadataResult() start");
    }
    if (!params.isSetMaterializedViewsMetadataParams()) {
        if (debug) {
            LOG.debug("MaterializedViews metadata params is not set.");
        }
        return errorResult("MaterializedViews metadata params is not set.");
    }

    TMaterializedViewsMetadataParams mvParams = params.getMaterializedViewsMetadataParams();
    String dbName = mvParams.getDatabase();
    if (debug) {
        LOG.debug("dbName: " + dbName);
    }
    UserIdentity userIdentity = UserIdentity.fromThrift(mvParams.getCurrentUserIdent());
    if (debug) {
        LOG.debug("userIdentity: " + userIdentity);
    }

    List<Table> candidates;
    try {
        candidates = Env.getCurrentEnv().getCatalogMgr()
                .getCatalogOrAnalysisException(InternalCatalog.INTERNAL_CATALOG_NAME)
                .getDbOrAnalysisException(dbName).getTables();
    } catch (AnalysisException e) {
        LOG.warn(e.getMessage());
        return errorResult(e.getMessage());
    }

    List<TRow> rows = Lists.newArrayList();
    for (Table candidate : candidates) {
        if (!(candidate instanceof MTMV)) {
            continue;
        }
        boolean visible = Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(userIdentity, InternalCatalog.INTERNAL_CATALOG_NAME,
                        candidate.getQualifiedDbName(), candidate.getName(),
                        PrivPredicate.SHOW);
        if (!visible) {
            continue;
        }
        MTMV mv = (MTMV) candidate;
        if (debug) {
            LOG.debug("mv: " + mv);
        }
        TRow row = new TRow();
        row.addToColumnValue(new TCell().setLongVal(mv.getId()));
        row.addToColumnValue(new TCell().setStringVal(mv.getName()));
        row.addToColumnValue(new TCell().setStringVal(mv.getJobInfo().getJobName()));
        row.addToColumnValue(new TCell().setStringVal(mv.getStatus().getState().name()));
        row.addToColumnValue(new TCell().setStringVal(mv.getStatus().getSchemaChangeDetail()));
        row.addToColumnValue(new TCell().setStringVal(mv.getStatus().getRefreshState().name()));
        row.addToColumnValue(new TCell().setStringVal(mv.getRefreshInfo().toString()));
        row.addToColumnValue(new TCell().setStringVal(mv.getQuerySql()));
        row.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString()));
        row.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString()));
        row.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
        row.addToColumnValue(new TCell().setBoolVal(MTMVPartitionUtil.isMTMVSync(mv)));
        if (debug) {
            LOG.debug("mvend: " + mv.getName());
        }
        rows.add(row);
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    if (debug) {
        LOG.debug("mtmvMetadataResult() end");
    }
    return result;
}
/**
 * Builds the rows of the jobs metadata table for one job type.
 *
 * <p>Every job returned by the job manager contributes the row from its getTvfInfo().
 * MTMV jobs on which the requesting user lacks the SHOW privilege are skipped.
 *
 * @param params metadata table params; must carry jobs metadata params
 * @return rows with status OK, or an INTERNAL_ERROR result when the params are missing
 *         or the job type is unset or not a known JobType name
 */
private static TFetchSchemaTableDataResult jobMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetJobsMetadataParams()) {
        return errorResult("Jobs metadata params is not set.");
    }
    TJobsMetadataParams jobsMetadataParams = params.getJobsMetadataParams();
    String type = jobsMetadataParams.getType();
    if (type == null) {
        return errorResult("Jobs metadata type is not set.");
    }
    JobType jobType;
    try {
        jobType = JobType.valueOf(type);
    } catch (IllegalArgumentException e) {
        // Report a bad type through the result instead of an unchecked exception.
        return errorResult("Invalid job type: " + type);
    }
    TUserIdentity currentUserIdent = jobsMetadataParams.getCurrentUserIdent();
    UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent);
    List<TRow> dataBatch = Lists.newArrayList();
    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
    for (org.apache.doris.job.base.AbstractJob job : jobList) {
        if (job instanceof MTMVJob) {
            MTMVJob mtmvJob = (MTMVJob) job;
            if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
                continue;
            }
        }
        dataBatch.add(job.getTvfInfo());
    }
    result.setDataBatch(dataBatch);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the tasks metadata table for one job type.
 *
 * <p>For every job returned by the job manager, each of its tasks contributes the row from
 * its getTvfInfo(); tasks that return null are skipped. MTMV jobs on which the requesting
 * user lacks the SHOW privilege are skipped together with their tasks.
 *
 * @param params metadata table params; must carry tasks metadata params
 * @return rows with status OK, or an INTERNAL_ERROR result when the params are missing
 *         or the job type is unset or not a known JobType name
 */
private static TFetchSchemaTableDataResult taskMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetTasksMetadataParams()) {
        return errorResult("Tasks metadata params is not set.");
    }
    TTasksMetadataParams tasksMetadataParams = params.getTasksMetadataParams();
    String type = tasksMetadataParams.getType();
    if (type == null) {
        return errorResult("Tasks metadata type is not set.");
    }
    JobType jobType;
    try {
        jobType = JobType.valueOf(type);
    } catch (IllegalArgumentException e) {
        // Report a bad type through the result instead of an unchecked exception.
        return errorResult("Invalid job type: " + type);
    }
    TUserIdentity currentUserIdent = tasksMetadataParams.getCurrentUserIdent();
    UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent);
    List<TRow> dataBatch = Lists.newArrayList();
    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
    for (org.apache.doris.job.base.AbstractJob job : jobList) {
        if (job instanceof MTMVJob) {
            MTMVJob mtmvJob = (MTMVJob) job;
            if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
                continue;
            }
        }
        List<AbstractTask> tasks = job.queryAllTasks();
        for (AbstractTask task : tasks) {
            TRow tvfInfo = task.getTvfInfo();
            if (tvfInfo != null) {
                dataBatch.add(tvfInfo);
            }
        }
    }
    result.setDataBatch(dataBatch);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Builds the rows of the routines info schema table, one row per stored PL/SQL procedure.
 *
 * <p>Most columns are fixed placeholder strings. ROUTINE_SCHEMA is the full name of the
 * procedure's database, or an empty string when its catalog or database cannot be found.
 *
 * @param params schema table params; must carry the current user identity
 * @return rows with status OK, or an INTERNAL_ERROR result when the user identity is missing
 */
private static TFetchSchemaTableDataResult routineInfoMetadataResult(TSchemaTableRequestParams params) {
    if (!params.isSetCurrentUserIdent()) {
        return errorResult("current user ident is not set.");
    }
    PlsqlManager plsqlManager = Env.getCurrentEnv().getPlsqlManager();
    Map<PlsqlProcedureKey, PlsqlStoredProcedure> procedures = plsqlManager.getAllPlsqlStoredProcedures();

    List<TRow> rows = Lists.newArrayList();
    for (PlsqlStoredProcedure proc : procedures.values()) {
        TRow row = new TRow();
        // SPECIFIC_NAME
        row.addToColumnValue(new TCell().setStringVal(proc.getName()));
        // ROUTINE_CATALOG
        row.addToColumnValue(new TCell().setStringVal(Long.toString(proc.getCatalogId())));

        // ROUTINE_SCHEMA: stays empty unless both the catalog and the database resolve
        String routineSchema = "";
        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(proc.getCatalogId());
        if (catalog != null) {
            DatabaseIf db = catalog.getDbNullable(proc.getDbId());
            if (db != null) {
                routineSchema = db.getFullName();
            }
        }
        row.addToColumnValue(new TCell().setStringVal(routineSchema));

        // ROUTINE_NAME
        row.addToColumnValue(new TCell().setStringVal(proc.getName()));
        // ROUTINE_TYPE
        row.addToColumnValue(new TCell().setStringVal("PROCEDURE"));
        // DTD_IDENTIFIER
        row.addToColumnValue(new TCell().setStringVal(""));
        // ROUTINE_BODY
        row.addToColumnValue(new TCell().setStringVal(proc.getSource()));
        // ROUTINE_DEFINITION
        row.addToColumnValue(new TCell().setStringVal(""));
        // EXTERNAL_NAME
        row.addToColumnValue(new TCell().setStringVal("NULL"));
        // EXTERNAL_LANGUAGE
        row.addToColumnValue(new TCell().setStringVal(""));
        // PARAMETER_STYLE
        row.addToColumnValue(new TCell().setStringVal("SQL"));
        // IS_DETERMINISTIC
        row.addToColumnValue(new TCell().setStringVal(""));
        // SQL_DATA_ACCESS
        row.addToColumnValue(new TCell().setStringVal(""));
        // SQL_PATH
        row.addToColumnValue(new TCell().setStringVal("NULL"));
        // SECURITY_TYPE
        row.addToColumnValue(new TCell().setStringVal("DEFINER"));
        // CREATED
        row.addToColumnValue(new TCell().setStringVal(proc.getCreateTime()));
        // LAST_ALTERED
        row.addToColumnValue(new TCell().setStringVal(proc.getModifyTime()));
        // SQL_MODE
        row.addToColumnValue(new TCell().setStringVal(""));
        // ROUTINE_COMMENT
        row.addToColumnValue(new TCell().setStringVal(""));
        // DEFINER
        row.addToColumnValue(new TCell().setStringVal(proc.getOwnerName()));
        // CHARACTER_SET_CLIENT
        row.addToColumnValue(new TCell().setStringVal(""));
        // COLLATION_CONNECTION
        row.addToColumnValue(new TCell().setStringVal(""));
        // DATABASE_COLLATION
        row.addToColumnValue(new TCell().setStringVal(""));
        rows.add(row);
    }

    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
    result.setDataBatch(rows);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
}