/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.kylin.storage.stream.rpc; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.zip.DataFormatException; |
| |
| import org.apache.commons.codec.binary.Base64; |
| import org.apache.kylin.common.KylinConfig; |
| import org.apache.kylin.common.QueryContext; |
| import org.apache.kylin.common.QueryContextFacade; |
| import org.apache.kylin.common.debug.BackdoorToggles; |
| import org.apache.kylin.common.util.JsonUtil; |
| import org.apache.kylin.cube.CubeInstance; |
| import org.apache.kylin.cube.model.CubeDesc; |
| import org.apache.kylin.metadata.filter.StringCodeSystem; |
| import org.apache.kylin.metadata.filter.TupleFilter; |
| import org.apache.kylin.metadata.filter.TupleFilterSerializer; |
| import org.apache.kylin.metadata.model.FunctionDesc; |
| import org.apache.kylin.metadata.model.TblColRef; |
| import org.apache.kylin.metadata.tuple.ITuple; |
| import org.apache.kylin.metadata.tuple.ITupleIterator; |
| import org.apache.kylin.metadata.tuple.TupleInfo; |
| import org.apache.kylin.stream.coordinator.assign.AssignmentsCache; |
| import org.apache.kylin.stream.core.model.ReplicaSet; |
| import org.apache.kylin.stream.core.model.DataRequest; |
| import org.apache.kylin.stream.core.model.DataResponse; |
| import org.apache.kylin.stream.core.model.Node; |
| import org.apache.kylin.stream.core.query.ResponseResultSchema; |
| import org.apache.kylin.stream.core.query.StreamingTupleConverter; |
| import org.apache.kylin.stream.core.query.StreamingTupleIterator; |
| import org.apache.kylin.stream.core.storage.Record; |
| import org.apache.kylin.stream.core.util.NamedThreadFactory; |
| import org.apache.kylin.stream.core.util.RecordsSerializer; |
| import org.apache.kylin.stream.core.util.RestService; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Stopwatch; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * TODO use long connection rather than short connection |
| */ |
| public class HttpStreamDataSearchClient implements IStreamDataSearchClient { |
| public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class); |
| |
| private static ExecutorService executorService; |
| static { |
| executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t")); |
| } |
| private AssignmentsCache assignmentsCache; |
| private RestService restService; |
| private Map<Node, Long> failedReceivers = Maps.newConcurrentMap(); |
| |
| public HttpStreamDataSearchClient() { |
| assignmentsCache = AssignmentsCache.getInstance(); |
| KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| int connectionTimeout = kylinConfig.getStreamingRPCHttpConnTimeout(); |
| int readTimeout = kylinConfig.getStreamingRPCHttpReadTimeout(); |
| restService = new RestService(connectionTimeout, readTimeout); |
| } |
| |
| @Override |
| public ITupleIterator search(final long minSegmentTime, final CubeInstance cube, final TupleInfo tupleInfo, |
| final TupleFilter tupleFilter, final Set<TblColRef> dimensions, final Set<TblColRef> groups, |
| final Set<FunctionDesc> metrics, final int storagePushDownLimit, final boolean allowStorageAggregation) { |
| List<ReplicaSet> replicaSetsOfCube = assignmentsCache.getReplicaSetsByCube(cube.getName()); |
| int timeout = 120 * 1000; // timeout should be configurable |
| final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout); |
| final QueryContext query = QueryContextFacade.current(); |
| |
| final CubeDesc cubeDesc = cube.getDescriptor(); |
| final ResponseResultSchema schema = new ResponseResultSchema(cubeDesc, dimensions, metrics); |
| final StreamingTupleConverter tupleConverter = new StreamingTupleConverter(schema, tupleInfo); |
| final RecordsSerializer recordsSerializer = new RecordsSerializer(schema); |
| final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo, |
| tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation); |
| |
| logger.info("Query-{}:send request to stream receivers", query.getQueryId()); |
| for (final ReplicaSet rs : replicaSetsOfCube) { |
| executorService.submit(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Iterator<ITuple> tuplesBlock = search(dataRequest, cube, tupleConverter, recordsSerializer, rs, |
| tupleInfo); |
| result.addBlock(tuplesBlock); |
| } catch (Exception e) { |
| result.setEndpointException(e); |
| } |
| } |
| }); |
| } |
| |
| return result; |
| } |
| |
| public Iterator<ITuple> search(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, |
| RecordsSerializer recordsSerializer, ReplicaSet rs, TupleInfo tupleInfo) throws Exception { |
| List<Node> receivers = Lists.newArrayList(rs.getNodes()); |
| Node queryReceiver = findBestReceiverServeQuery(receivers, cube.getName()); |
| IOException exception; |
| try { |
| return doSearch(dataRequest, cube, tupleConverter, recordsSerializer, queryReceiver, tupleInfo); |
| } catch (IOException e) { |
| exception = e; |
| failedReceivers.put(queryReceiver, System.currentTimeMillis()); |
| logger.error("exception throws for receiver:" + queryReceiver + " retry another receiver"); |
| } |
| |
| for (int i = 0; i < receivers.size(); i++) { |
| Node receiver = receivers.get(i); |
| if (receiver.equals(queryReceiver)) { |
| continue; |
| } |
| try { |
| return doSearch(dataRequest, cube, tupleConverter, recordsSerializer, receiver, tupleInfo); |
| } catch (IOException e) { |
| exception = e; |
| failedReceivers.put(receiver, System.currentTimeMillis()); |
| logger.error("exception throws for receiver:" + receiver + " retry another receiver"); |
| } |
| |
| } |
| throw exception; |
| } |
| |
| private Node findBestReceiverServeQuery(List<Node> receivers, String cubeName) { |
| // stick to one receiver according to cube name |
| int receiversSize = receivers.size(); |
| int receiverNo = Math.abs(cubeName.hashCode()) % receiversSize; |
| Node foundReceiver = receivers.get(receiverNo); |
| Long lastFailTime = failedReceivers.get(foundReceiver); |
| if (lastFailTime == null) { |
| return foundReceiver; |
| } |
| |
| if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // retry every 2 minutes |
| return foundReceiver; |
| } |
| |
| return receivers.get((receiverNo + 1) % receiversSize); |
| } |
| |
| public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, |
| RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception { |
| String queryId = dataRequest.getQueryId(); |
| logger.info("send query to receiver " + receiver + " with query id:" + queryId); |
| String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query"; |
| |
| try { |
| String content = JsonUtil.writeValueAsString(dataRequest); |
| Stopwatch sw = new Stopwatch(); |
| sw.start(); |
| int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout(); |
| int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout(); |
| String msg = restService.postRequest(url, content, connTimeout, readTimeout); |
| |
| logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsedMillis()); |
| if (failedReceivers.containsKey(receiver)) { |
| failedReceivers.remove(receiver); |
| } |
| DataResponse response = JsonUtil.readValue(msg, DataResponse.class); |
| logger.info("query-{}: receiver {} profile info:{}", queryId, receiver, response.getProfile()); |
| return deserializeResponse(tupleConverter, recordsSerializer, cube.getName(), tupleInfo, response); |
| } catch (Exception e) { |
| logger.error("error when search data from receiver:" + url, e); |
| throw e; |
| } |
| } |
| |
| public Iterator<ITuple> deserializeResponse(final StreamingTupleConverter tupleConverter, |
| final RecordsSerializer recordsSerializer, String cubeName, TupleInfo tupleInfo, DataResponse response) |
| throws IOException, DataFormatException { |
| final Iterator<Record> records = recordsSerializer.deserialize(Base64.decodeBase64(response.getData())); |
| return new StreamingTupleIterator(records, tupleConverter, tupleInfo); |
| } |
| |
| private DataRequest createDataRequest(String queryId, String cubeName, long minSegmentTime, TupleInfo tupleInfo, |
| TupleFilter tupleFilter, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<FunctionDesc> metrics, |
| int storagePushDownLimit, boolean allowStorageAggregation) { |
| DataRequest request = new DataRequest(); |
| request.setCubeName(cubeName); |
| request.setQueryId(queryId); |
| request.setMinSegmentTime(minSegmentTime); |
| request.setTupleFilter(Base64.encodeBase64String(TupleFilterSerializer.serialize(tupleFilter, |
| StringCodeSystem.INSTANCE))); |
| request.setStoragePushDownLimit(storagePushDownLimit); |
| request.setAllowStorageAggregation(allowStorageAggregation); |
| request.setRequestSendTime(System.currentTimeMillis()); |
| request.setEnableDetailProfile(BackdoorToggles.isStreamingProfileEnable()); |
| request.setStorageBehavior(BackdoorToggles.getCoprocessorBehavior()); |
| |
| Set<String> dimensionSet = Sets.newHashSet(); |
| for (TblColRef dimension : dimensions) { |
| dimensionSet.add(dimension.getCanonicalName()); |
| } |
| request.setDimensions(dimensionSet); |
| |
| Set<String> groupSet = Sets.newHashSet(); |
| for (TblColRef group : groups) { |
| groupSet.add(group.getCanonicalName()); |
| } |
| request.setGroups(groupSet); |
| |
| request.setMetrics(Lists.newArrayList(metrics)); |
| |
| return request; |
| } |
| |
| public static class QueuedStreamingTupleIterator implements ITupleIterator { |
| private BlockingQueue<Iterator<ITuple>> queue; |
| |
| private Iterator<ITuple> currentBlock = Iterators.emptyIterator(); |
| |
| private int totalBlockNum; |
| private int numConsumeBlocks = 0; |
| |
| private int timeout; |
| private long timeoutTS; |
| private volatile Exception endpointException; |
| |
| public QueuedStreamingTupleIterator(int blockNum, int timeout) { |
| this.queue = new LinkedBlockingQueue<>(blockNum); |
| this.totalBlockNum = blockNum; |
| this.timeout *= 1.1; |
| this.timeoutTS = System.currentTimeMillis() + timeout; |
| } |
| |
| public void addBlock(Iterator<ITuple> tuples) { |
| try { |
| queue.put(tuples); |
| } catch (InterruptedException e) { |
| logger.error("interrupted", e); |
| throw new RuntimeException("interrupted", e); |
| } |
| } |
| |
| public void setEndpointException(Exception e) { |
| this.endpointException = e; |
| } |
| |
| private boolean hasEndpointFail() { |
| return endpointException != null; |
| } |
| |
| @Override |
| public void close() { |
| // do nothing |
| } |
| |
| @Override |
| public boolean hasNext() { |
| try { |
| if (currentBlock.hasNext()) { |
| return true; |
| } else if (numConsumeBlocks < totalBlockNum) { |
| while (numConsumeBlocks < totalBlockNum) { |
| if (hasEndpointFail()) { |
| throw new RuntimeException("endpoint fail", endpointException); |
| } |
| Iterator<ITuple> ret = null; |
| while (ret == null && endpointException == null && timeoutTS > System.currentTimeMillis()) { |
| ret = queue.poll(1000, TimeUnit.MILLISECONDS); |
| } |
| currentBlock = ret; |
| if (currentBlock == null) { |
| throw new RuntimeException("timeout when call stream rpc"); |
| } |
| numConsumeBlocks++; |
| if (currentBlock.hasNext()) { |
| return true; |
| } |
| } |
| |
| } |
| } catch (InterruptedException e) { |
| logger.error("interrupted", e); |
| throw new RuntimeException("interrupted", e); |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public ITuple next() { |
| return currentBlock.next(); |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| } |