| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.qe.cache; |
| |
| import org.apache.doris.common.Status; |
| import org.apache.doris.proto.InternalService; |
| import org.apache.doris.proto.Types; |
| import org.apache.doris.qe.SimpleScheduler; |
| import org.apache.doris.rpc.BackendServiceProxy; |
| import org.apache.doris.rpc.RpcException; |
| import org.apache.doris.system.Backend; |
| import org.apache.doris.thrift.TNetworkAddress; |
| import org.apache.doris.thrift.TStatusCode; |
| |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| import java.util.List; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| /** |
| * Encapsulates access to BE, including network and other exception handling |
| */ |
| public class CacheBeProxy extends CacheProxy { |
| private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class); |
| |
| public void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status) { |
| Types.PUniqueId sqlKey = request.getSqlKey(); |
| Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey); |
| if (backend == null) { |
| LOG.warn("update cache can't find backend, sqlKey {}", sqlKey); |
| return; |
| } |
| TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); |
| try { |
| Future<InternalService.PCacheResponse> future = BackendServiceProxy.getInstance() |
| .updateCache(address, request); |
| InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS); |
| if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) { |
| status.setStatus(new Status(TStatusCode.OK, "CACHE_OK")); |
| } else { |
| status.setStatus(response.getStatus().toString()); |
| } |
| } catch (Exception e) { |
| LOG.warn("update cache exception, sqlKey {}", sqlKey, e); |
| status.setRpcStatus(e.getMessage()); |
| SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); |
| } |
| } |
| |
| public InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request, |
| int timeoutMs, Status status) { |
| Types.PUniqueId sqlKey = request.getSqlKey(); |
| Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey); |
| if (backend == null) { |
| return null; |
| } |
| TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); |
| try { |
| Future<InternalService.PFetchCacheResult> future = BackendServiceProxy.getInstance() |
| .fetchCache(address, request); |
| return future.get(timeoutMs, TimeUnit.MILLISECONDS); |
| } catch (RpcException e) { |
| LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); |
| status.setRpcStatus(e.getMessage()); |
| SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); |
| } catch (InterruptedException e) { |
| LOG.warn("future get interrupted exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); |
| status.setStatus("interrupted exception"); |
| } catch (ExecutionException e) { |
| LOG.warn("future get execution exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); |
| status.setStatus("execution exception"); |
| } catch (TimeoutException e) { |
| LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e); |
| status.setStatus("query timeout"); |
| } |
| return null; |
| } |
| |
| public void clearCache(InternalService.PClearCacheRequest request) { |
| this.clearCache(request, CacheCoordinator.getInstance().getBackendList()); |
| } |
| |
| public void clearCache(InternalService.PClearCacheRequest request, List<Backend> beList) { |
| int retry; |
| Status status = new Status(); |
| for (Backend backend : beList) { |
| retry = 1; |
| while (retry < 3 && !this.clearCache(request, backend, CLEAR_TIMEOUT, status)) { |
| retry++; |
| try { |
| Thread.sleep(1000); //sleep 1 second |
| } catch (Exception e) { |
| // CHECKSTYLE IGNORE THIS LINE |
| } |
| } |
| if (retry >= 3) { |
| String errMsg = "clear cache timeout, backend " + backend.getId(); |
| LOG.warn(errMsg); |
| SimpleScheduler.addToBlacklist(backend.getId(), errMsg); |
| } |
| } |
| } |
| |
| protected boolean clearCache(InternalService.PClearCacheRequest request, |
| Backend backend, int timeoutMs, Status status) { |
| TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); |
| try { |
| request = request.toBuilder().setClearType(InternalService.PClearType.CLEAR_ALL).build(); |
| LOG.info("clear all backend cache, backendId {}", backend.getId()); |
| Future<InternalService.PCacheResponse> future |
| = BackendServiceProxy.getInstance().clearCache(address, request); |
| InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS); |
| if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) { |
| status.setStatus(new Status(TStatusCode.OK, "CACHE_OK")); |
| return true; |
| } else { |
| status.setStatus(response.getStatus().toString()); |
| return false; |
| } |
| } catch (Exception e) { |
| LOG.warn("clear cache exception, backendId {}", backend.getId(), e); |
| } |
| return false; |
| } |
| } |