blob: 51d0cba532473bb54eb2e6d8d8c3fc093caaf80c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.stream.server.rest.controller;
import java.util.List;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.stream.core.exception.StreamingException;
import org.apache.kylin.stream.core.model.DataRequest;
import org.apache.kylin.stream.core.model.DataResponse;
import org.apache.kylin.stream.core.query.IStreamingSearchResult;
import org.apache.kylin.stream.core.query.StreamingCubeDataSearcher;
import org.apache.kylin.stream.core.query.StreamingSearchContext;
import org.apache.kylin.stream.core.query.StreamingQueryProfile;
import org.apache.kylin.stream.core.storage.StreamingSegmentManager;
import org.apache.kylin.stream.core.storage.Record;
import org.apache.kylin.stream.core.util.RecordsSerializer;
import org.apache.kylin.stream.server.StreamingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
@Controller
@RequestMapping(value = "/data")
public class DataController extends BasicController {
private static final Logger logger = LoggerFactory.getLogger(DataController.class);
private StreamingServer streamingServer;
public DataController() {
streamingServer = StreamingServer.getInstance();
}
@RequestMapping(value = "/query", method = RequestMethod.POST, produces = { "application/json" })
@ResponseBody
public DataResponse query(@RequestBody DataRequest dataRequest) {
IStreamingSearchResult searchResult = null;
String queryId = dataRequest.getQueryId();
StreamingQueryProfile queryProfile = new StreamingQueryProfile(queryId, dataRequest.getRequestSendTime());
if (dataRequest.isEnableDetailProfile()) {
queryProfile.enableDetailProfile();
}
if (dataRequest.getStorageBehavior() != null) {
queryProfile.setStorageBehavior(StorageSideBehavior.valueOf(dataRequest.getStorageBehavior()));
}
StreamingQueryProfile.set(queryProfile);
logger.info("receive query request queryId:{}", queryId);
try (SetThreadName changeName = new SetThreadName("Query %s", queryId)) {
final Stopwatch sw = new Stopwatch();
sw.start();
String cubeName = dataRequest.getCubeName();
long minSegmentTime = dataRequest.getMinSegmentTime();
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
Set<FunctionDesc> metrics = convertMetrics(cubeDesc, dataRequest.getMetrics());
byte[] tupleFilterBytes = Base64.decodeBase64(dataRequest.getTupleFilter());
TupleFilter tupleFilter = TupleFilterSerializer.deserialize(tupleFilterBytes, StringCodeSystem.INSTANCE);
TupleFilter havingFilter = null;
if (dataRequest.getHavingFilter() != null) {
byte[] havingFilterBytes = Base64.decodeBase64(dataRequest.getHavingFilter());
havingFilter = TupleFilterSerializer.deserialize(havingFilterBytes, StringCodeSystem.INSTANCE);
}
Set<TblColRef> dimensions = convertToTblColRef(dataRequest.getDimensions(), cubeDesc);
Set<TblColRef> groups = convertToTblColRef(dataRequest.getGroups(), cubeDesc);
StreamingSegmentManager segmentManager = streamingServer.getStreamingSegmentManager(cubeName);
StreamingCubeDataSearcher dataSearcher = segmentManager.getSearcher();
StreamingSearchContext gtSearchRequest = new StreamingSearchContext(cubeDesc, dimensions, groups,
metrics, tupleFilter, havingFilter);
gtSearchRequest.setDeadline(dataRequest.getDeadline());
searchResult = dataSearcher.doSearch(gtSearchRequest, minSegmentTime,
dataRequest.isAllowStorageAggregation());
if (StorageSideBehavior.RAW_SCAN == queryProfile.getStorageBehavior()) {
long counter = 0;
for (Record record : searchResult) {
counter ++;
}
logger.info("query-{}: scan {} rows", queryId, counter);
}
RecordsSerializer serializer = new RecordsSerializer(gtSearchRequest.getRespResultSchema());
Pair<byte[], Long> serializedRowsInfo = serializer.serialize(searchResult.iterator(),
dataRequest.getStoragePushDownLimit());
DataResponse dataResponse = new DataResponse();
dataResponse.setData(Base64.encodeBase64String(serializedRowsInfo.getFirst()));
sw.stop();
logger.info("query-{}: return response, took {} ms", queryId, sw.elapsedMillis());
long finalCnt = serializedRowsInfo.getSecond();
queryProfile.setFinalRows(finalCnt);
String profileInfo = queryProfile.toString();
dataResponse.setProfile(profileInfo);
logger.info("query-{}: profile: {}", queryId, profileInfo);
return dataResponse;
} catch (Exception e) {
throw new StreamingException(e);
} finally {
if (searchResult != null) {
try {
searchResult.close();
} catch (Exception e) {
logger.error("Fail to close result scanner, query id:" + queryId);
}
}
}
}
private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, List<FunctionDesc> metrics) {
Set<FunctionDesc> result = Sets.newHashSet();
for (FunctionDesc metric : metrics) {
result.add(findAggrFuncFromCubeDesc(cubeDesc, metric));
}
return result;
}
private FunctionDesc findAggrFuncFromCubeDesc(CubeDesc cubeDesc, FunctionDesc aggrFunc) {
aggrFunc.init(cubeDesc.getModel());
for (MeasureDesc measure : cubeDesc.getMeasures()) {
if (measure.getFunction().equals(aggrFunc))
return measure.getFunction();
}
return aggrFunc;
}
private Set<TblColRef> convertToTblColRef(Set<String> columnSet, CubeDesc cubeDesc) {
Set<TblColRef> result = Sets.newHashSet();
for (String columnName : columnSet) {
TblColRef tblColRef = cubeDesc.getModel().findColumn(columnName);
result.add(tblColRef);
}
return result;
}
}