| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.inject.Inject; |
| import com.google.inject.Provider; |
| import org.apache.calcite.avatica.remote.ProtobufTranslation; |
| import org.apache.calcite.avatica.remote.ProtobufTranslationImpl; |
| import org.apache.calcite.avatica.remote.Service; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.druid.client.selector.Server; |
| import org.apache.druid.guice.annotations.Json; |
| import org.apache.druid.guice.annotations.Smile; |
| import org.apache.druid.guice.http.DruidHttpClientConfig; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.jackson.JacksonUtils; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.query.DruidMetrics; |
| import org.apache.druid.query.GenericQueryMetricsFactory; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryMetrics; |
| import org.apache.druid.query.QueryToolChestWarehouse; |
| import org.apache.druid.server.log.RequestLogger; |
| import org.apache.druid.server.metrics.QueryCountStatsProvider; |
| import org.apache.druid.server.router.QueryHostFinder; |
| import org.apache.druid.server.router.Router; |
| import org.apache.druid.server.security.AuthConfig; |
| import org.apache.druid.server.security.AuthenticationResult; |
| import org.apache.druid.server.security.Authenticator; |
| import org.apache.druid.server.security.AuthenticatorMapper; |
| import org.apache.druid.sql.http.SqlQuery; |
| import org.eclipse.jetty.client.HttpClient; |
| import org.eclipse.jetty.client.api.Request; |
| import org.eclipse.jetty.client.api.Response; |
| import org.eclipse.jetty.client.api.Result; |
| import org.eclipse.jetty.client.util.BytesContentProvider; |
| import org.eclipse.jetty.http.HttpHeader; |
| import org.eclipse.jetty.http.HttpMethod; |
| import org.eclipse.jetty.proxy.AsyncProxyServlet; |
| |
| import javax.servlet.ServletException; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response.Status; |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * This class does async query processing and should be merged with QueryResource at some point |
| */ |
| public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider |
| { |
| private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); |
| @Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE |
| private static final String APPLICATION_SMILE = "application/smile"; |
| |
| private static final String AVATICA_CONNECTION_ID = "connectionId"; |
| private static final String AVATICA_STATEMENT_HANDLE = "statementHandle"; |
| |
| private static final String HOST_ATTRIBUTE = "org.apache.druid.proxy.to.host"; |
| private static final String SCHEME_ATTRIBUTE = "org.apache.druid.proxy.to.host.scheme"; |
| private static final String QUERY_ATTRIBUTE = "org.apache.druid.proxy.query"; |
| private static final String AVATICA_QUERY_ATTRIBUTE = "org.apache.druid.proxy.avaticaQuery"; |
| private static final String SQL_QUERY_ATTRIBUTE = "org.apache.druid.proxy.sqlQuery"; |
| private static final String OBJECTMAPPER_ATTRIBUTE = "org.apache.druid.proxy.objectMapper"; |
| |
| private static final String PROPERTY_SQL_ENABLE = "druid.router.sql.enable"; |
| private static final String PROPERTY_SQL_ENABLE_DEFAULT = "false"; |
| |
| private static final int CANCELLATION_TIMEOUT_MILLIS = 500; |
| |
| private final AtomicLong successfulQueryCount = new AtomicLong(); |
| private final AtomicLong failedQueryCount = new AtomicLong(); |
| private final AtomicLong interruptedQueryCount = new AtomicLong(); |
| |
| private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception) |
| throws IOException |
| { |
| if (!response.isCommitted()) { |
| final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage(); |
| |
| response.resetBuffer(); |
| response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); |
| objectMapper.writeValue( |
| response.getOutputStream(), |
| ImmutableMap.of("error", errorMessage) |
| ); |
| } |
| response.flushBuffer(); |
| } |
| |
| private final QueryToolChestWarehouse warehouse; |
| private final ObjectMapper jsonMapper; |
| private final ObjectMapper smileMapper; |
| private final QueryHostFinder hostFinder; |
| private final Provider<HttpClient> httpClientProvider; |
| private final DruidHttpClientConfig httpClientConfig; |
| private final ServiceEmitter emitter; |
| private final RequestLogger requestLogger; |
| private final GenericQueryMetricsFactory queryMetricsFactory; |
| private final AuthenticatorMapper authenticatorMapper; |
| private final ProtobufTranslation protobufTranslation; |
| |
| private final boolean routeSqlQueries; |
| |
| private HttpClient broadcastClient; |
| |
| @Inject |
| public AsyncQueryForwardingServlet( |
| QueryToolChestWarehouse warehouse, |
| @Json ObjectMapper jsonMapper, |
| @Smile ObjectMapper smileMapper, |
| QueryHostFinder hostFinder, |
| @Router Provider<HttpClient> httpClientProvider, |
| @Router DruidHttpClientConfig httpClientConfig, |
| ServiceEmitter emitter, |
| RequestLogger requestLogger, |
| GenericQueryMetricsFactory queryMetricsFactory, |
| AuthenticatorMapper authenticatorMapper, |
| Properties properties |
| ) |
| { |
| this.warehouse = warehouse; |
| this.jsonMapper = jsonMapper; |
| this.smileMapper = smileMapper; |
| this.hostFinder = hostFinder; |
| this.httpClientProvider = httpClientProvider; |
| this.httpClientConfig = httpClientConfig; |
| this.emitter = emitter; |
| this.requestLogger = requestLogger; |
| this.queryMetricsFactory = queryMetricsFactory; |
| this.authenticatorMapper = authenticatorMapper; |
| this.protobufTranslation = new ProtobufTranslationImpl(); |
| this.routeSqlQueries = Boolean.parseBoolean( |
| properties.getProperty(PROPERTY_SQL_ENABLE, PROPERTY_SQL_ENABLE_DEFAULT) |
| ); |
| } |
| |
| @Override |
| public void init() throws ServletException |
| { |
| super.init(); |
| |
| // Note that httpClientProvider is setup to return same HttpClient instance on each get() so |
| // it is same http client as that is used by parent ProxyServlet. |
| broadcastClient = newHttpClient(); |
| try { |
| broadcastClient.start(); |
| } |
| catch (Exception e) { |
| throw new ServletException(e); |
| } |
| } |
| |
| @Override |
| public void destroy() |
| { |
| super.destroy(); |
| try { |
| broadcastClient.stop(); |
| } |
| catch (Exception e) { |
| log.warn(e, "Error stopping servlet"); |
| } |
| } |
| |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(request.getContentType()) |
| || APPLICATION_SMILE.equals(request.getContentType()); |
| final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; |
| request.setAttribute(OBJECTMAPPER_ATTRIBUTE, objectMapper); |
| |
| final String requestURI = request.getRequestURI(); |
| final String method = request.getMethod(); |
| final Server targetServer; |
| |
| // The Router does not have the ability to look inside SQL queries and route them intelligently, so just treat |
| // them as a generic request. |
| final boolean isNativeQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql"); |
| final boolean isSqlQueryEndpoint = requestURI.startsWith("/druid/v2/sql"); |
| |
| final boolean isAvaticaJson = requestURI.startsWith("/druid/v2/sql/avatica"); |
| final boolean isAvaticaPb = requestURI.startsWith("/druid/v2/sql/avatica-protobuf"); |
| |
| if (isAvaticaPb) { |
| byte[] requestBytes = IOUtils.toByteArray(request.getInputStream()); |
| Service.Request protobufRequest = this.protobufTranslation.parseRequest(requestBytes); |
| String connectionId = getAvaticaProtobufConnectionId(protobufRequest); |
| targetServer = hostFinder.findServerAvatica(connectionId); |
| request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); |
| } else if (isAvaticaJson) { |
| Map<String, Object> requestMap = objectMapper.readValue( |
| request.getInputStream(), |
| JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT |
| ); |
| String connectionId = getAvaticaConnectionId(requestMap); |
| targetServer = hostFinder.findServerAvatica(connectionId); |
| byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap); |
| request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); |
| } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) { |
| // query cancellation request |
| targetServer = hostFinder.pickDefaultServer(); |
| broadcastQueryCancelRequest(request, targetServer); |
| } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) { |
| // query request |
| try { |
| Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); |
| if (inputQuery != null) { |
| targetServer = hostFinder.pickServer(inputQuery); |
| if (inputQuery.getId() == null) { |
| inputQuery = inputQuery.withId(UUID.randomUUID().toString()); |
| } |
| } else { |
| targetServer = hostFinder.pickDefaultServer(); |
| } |
| request.setAttribute(QUERY_ATTRIBUTE, inputQuery); |
| } |
| catch (IOException e) { |
| handleQueryParseException(request, response, objectMapper, e, true); |
| return; |
| } |
| catch (Exception e) { |
| handleException(response, objectMapper, e); |
| return; |
| } |
| } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.POST.is(method)) { |
| try { |
| SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class); |
| request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery); |
| targetServer = hostFinder.findServerSql(inputSqlQuery); |
| } |
| catch (IOException e) { |
| handleQueryParseException(request, response, objectMapper, e, false); |
| return; |
| } |
| catch (Exception e) { |
| handleException(response, objectMapper, e); |
| return; |
| } |
| } else { |
| targetServer = hostFinder.pickDefaultServer(); |
| } |
| |
| request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost()); |
| request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme()); |
| |
| doService(request, response); |
| } |
| |
| /** |
| * Issues async query cancellation requests to all Brokers (except the given |
| * targetServer). Query cancellation on the targetServer is handled by the |
| * proxy servlet. |
| */ |
| private void broadcastQueryCancelRequest(HttpServletRequest request, Server targetServer) |
| { |
| // send query cancellation to all brokers this query may have gone to |
| // to keep the code simple, the proxy servlet will also send a request to the default targetServer. |
| for (final Server server : hostFinder.getAllServers()) { |
| if (server.getHost().equals(targetServer.getHost())) { |
| continue; |
| } |
| |
| // issue async requests |
| Response.CompleteListener completeListener = result -> { |
| if (result.isFailed()) { |
| log.warn( |
| result.getFailure(), |
| "Failed to forward cancellation request to [%s]", |
| server.getHost() |
| ); |
| } |
| }; |
| |
| Request broadcastReq = broadcastClient |
| .newRequest(rewriteURI(request, server.getScheme(), server.getHost())) |
| .method(HttpMethod.DELETE) |
| .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); |
| |
| copyRequestHeaders(request, broadcastReq); |
| broadcastReq.send(completeListener); |
| } |
| |
| interruptedQueryCount.incrementAndGet(); |
| } |
| |
| private void handleQueryParseException( |
| HttpServletRequest request, |
| HttpServletResponse response, |
| ObjectMapper objectMapper, |
| IOException parseException, |
| boolean isNativeQuery |
| ) throws IOException |
| { |
| log.warn(parseException, "Exception parsing query"); |
| |
| // Log the error message |
| final String errorMessage = parseException.getMessage() == null |
| ? "no error message" : parseException.getMessage(); |
| if (isNativeQuery) { |
| requestLogger.logNativeQuery( |
| RequestLogLine.forNative( |
| null, |
| DateTimes.nowUtc(), |
| request.getRemoteAddr(), |
| new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) |
| ) |
| ); |
| } else { |
| requestLogger.logSqlQuery( |
| RequestLogLine.forSql( |
| null, |
| null, |
| DateTimes.nowUtc(), |
| request.getRemoteAddr(), |
| new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) |
| ) |
| ); |
| } |
| |
| // Write to the response |
| response.setStatus(HttpServletResponse.SC_BAD_REQUEST); |
| response.setContentType(MediaType.APPLICATION_JSON); |
| objectMapper.writeValue( |
| response.getOutputStream(), |
| ImmutableMap.of("error", errorMessage) |
| ); |
| } |
| |
| protected void doService( |
| HttpServletRequest request, |
| HttpServletResponse response |
| ) throws ServletException, IOException |
| { |
| // Just call the superclass service method. Overriden in tests. |
| super.service(request, response); |
| } |
| |
| @Override |
| protected void sendProxyRequest( |
| HttpServletRequest clientRequest, |
| HttpServletResponse proxyResponse, |
| Request proxyRequest |
| ) |
| { |
| proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); |
| proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); |
| |
| byte[] avaticaQuery = (byte[]) clientRequest.getAttribute(AVATICA_QUERY_ATTRIBUTE); |
| if (avaticaQuery != null) { |
| proxyRequest.content(new BytesContentProvider(avaticaQuery)); |
| } |
| |
| final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); |
| final SqlQuery sqlQuery = (SqlQuery) clientRequest.getAttribute(SQL_QUERY_ATTRIBUTE); |
| if (query != null) { |
| setProxyRequestContent(proxyRequest, clientRequest, query); |
| } else if (sqlQuery != null) { |
| setProxyRequestContent(proxyRequest, clientRequest, sqlQuery); |
| } |
| |
| // Since we can't see the request object on the remote side, we can't check whether the remote side actually |
| // performed an authorization check here, so always set this to true for the proxy servlet. |
| // If the remote node failed to perform an authorization check, PreResponseAuthorizationCheckFilter |
| // will log that on the remote node. |
| clientRequest.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); |
| |
| // Check if there is an authentication result and use it to decorate the proxy request if needed. |
| AuthenticationResult authenticationResult = (AuthenticationResult) clientRequest.getAttribute( |
| AuthConfig.DRUID_AUTHENTICATION_RESULT); |
| if (authenticationResult != null && authenticationResult.getAuthenticatedBy() != null) { |
| Authenticator authenticator = authenticatorMapper.getAuthenticatorMap() |
| .get(authenticationResult.getAuthenticatedBy()); |
| if (authenticator != null) { |
| authenticator.decorateProxyRequest( |
| clientRequest, |
| proxyResponse, |
| proxyRequest |
| ); |
| } else { |
| log.error("Can not find Authenticator with Name [%s]", authenticationResult.getAuthenticatedBy()); |
| } |
| } |
| super.sendProxyRequest( |
| clientRequest, |
| proxyResponse, |
| proxyRequest |
| ); |
| } |
| |
| private void setProxyRequestContent(Request proxyRequest, HttpServletRequest clientRequest, Object content) |
| { |
| final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE); |
| try { |
| byte[] bytes = objectMapper.writeValueAsBytes(content); |
| proxyRequest.content(new BytesContentProvider(bytes)); |
| proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, String.valueOf(bytes.length)); |
| } |
| catch (JsonProcessingException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response) |
| { |
| final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE); |
| if (query != null) { |
| return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime()); |
| } else { |
| return super.newProxyResponseListener(request, response); |
| } |
| } |
| |
| @Override |
| protected String rewriteTarget(HttpServletRequest request) |
| { |
| return rewriteURI( |
| request, |
| (String) request.getAttribute(SCHEME_ATTRIBUTE), |
| (String) request.getAttribute(HOST_ATTRIBUTE) |
| ); |
| } |
| |
| protected String rewriteURI(HttpServletRequest request, String scheme, String host) |
| { |
| return makeURI(scheme, host, request.getRequestURI(), request.getQueryString()); |
| } |
| |
| @VisibleForTesting |
| static String makeURI(String scheme, String host, String requestURI, String rawQueryString) |
| { |
| return JettyUtils.concatenateForRewrite( |
| scheme + "://" + host, |
| requestURI, |
| rawQueryString |
| ); |
| } |
| |
| @Override |
| protected HttpClient newHttpClient() |
| { |
| return httpClientProvider.get(); |
| } |
| |
| @Override |
| protected HttpClient createHttpClient() throws ServletException |
| { |
| HttpClient client = super.createHttpClient(); |
| // override timeout set in ProxyServlet.createHttpClient |
| setTimeout(httpClientConfig.getReadTimeout().getMillis()); |
| return client; |
| } |
| |
| private Response.Listener newMetricsEmittingProxyResponseListener( |
| HttpServletRequest request, |
| HttpServletResponse response, |
| Query query, |
| long startNs |
| ) |
| { |
| return new MetricsEmittingProxyResponseListener(request, response, query, startNs); |
| } |
| |
| @Override |
| public long getSuccessfulQueryCount() |
| { |
| return successfulQueryCount.get(); |
| } |
| |
| @Override |
| public long getFailedQueryCount() |
| { |
| return failedQueryCount.get(); |
| } |
| |
| @Override |
| public long getInterruptedQueryCount() |
| { |
| return interruptedQueryCount.get(); |
| } |
| |
| @Override |
| public long getTimedOutQueryCount() |
| { |
| // Query timeout metric is not relevant here and this metric is already being tracked in the Broker and the |
| // data nodes using QueryResource |
| return 0L; |
| } |
| |
| @VisibleForTesting |
| static String getAvaticaConnectionId(Map<String, Object> requestMap) |
| { |
| // avatica commands always have a 'connectionId'. If commands are not part of a prepared statement, this appears at |
| // the top level of the request, but if it is part of a statement, then it will be nested in the 'statementHandle'. |
| // see https://calcite.apache.org/avatica/docs/json_reference.html#requests for more details |
| Object connectionIdObj = requestMap.get(AVATICA_CONNECTION_ID); |
| if (connectionIdObj == null) { |
| Object statementHandle = requestMap.get(AVATICA_STATEMENT_HANDLE); |
| if (statementHandle != null && statementHandle instanceof Map) { |
| connectionIdObj = ((Map) statementHandle).get(AVATICA_CONNECTION_ID); |
| } |
| } |
| |
| if (connectionIdObj == null) { |
| throw new IAE("Received an Avatica request without a %s.", AVATICA_CONNECTION_ID); |
| } |
| if (!(connectionIdObj instanceof String)) { |
| throw new IAE("Received an Avatica request with a non-String %s.", AVATICA_CONNECTION_ID); |
| } |
| |
| return (String) connectionIdObj; |
| } |
| |
| static String getAvaticaProtobufConnectionId(Service.Request request) |
| { |
| if (request instanceof Service.CatalogsRequest) { |
| return ((Service.CatalogsRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.SchemasRequest) { |
| return ((Service.SchemasRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.TablesRequest) { |
| return ((Service.TablesRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.TypeInfoRequest) { |
| return ((Service.TypeInfoRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.ColumnsRequest) { |
| return ((Service.ColumnsRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.ExecuteRequest) { |
| return ((Service.ExecuteRequest) request).statementHandle.connectionId; |
| } |
| |
| if (request instanceof Service.TableTypesRequest) { |
| return ((Service.TableTypesRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.PrepareRequest) { |
| return ((Service.PrepareRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.PrepareAndExecuteRequest) { |
| return ((Service.PrepareAndExecuteRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.FetchRequest) { |
| return ((Service.FetchRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.CreateStatementRequest) { |
| return ((Service.CreateStatementRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.CloseStatementRequest) { |
| return ((Service.CloseStatementRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.OpenConnectionRequest) { |
| return ((Service.OpenConnectionRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.CloseConnectionRequest) { |
| return ((Service.CloseConnectionRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.ConnectionSyncRequest) { |
| return ((Service.ConnectionSyncRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.DatabasePropertyRequest) { |
| return ((Service.DatabasePropertyRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.SyncResultsRequest) { |
| return ((Service.SyncResultsRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.CommitRequest) { |
| return ((Service.CommitRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.RollbackRequest) { |
| return ((Service.RollbackRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.PrepareAndExecuteBatchRequest) { |
| return ((Service.PrepareAndExecuteBatchRequest) request).connectionId; |
| } |
| |
| if (request instanceof Service.ExecuteBatchRequest) { |
| return ((Service.ExecuteBatchRequest) request).connectionId; |
| } |
| |
| throw new IAE("Received an unknown Avatica protobuf request"); |
| } |
| |
| private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener |
| { |
| private final HttpServletRequest req; |
| private final HttpServletResponse res; |
| private final Query<T> query; |
| private final long startNs; |
| |
| public MetricsEmittingProxyResponseListener( |
| HttpServletRequest request, |
| HttpServletResponse response, |
| Query<T> query, |
| long startNs |
| ) |
| { |
| super(request, response); |
| |
| this.req = request; |
| this.res = response; |
| this.query = query; |
| this.startNs = startNs; |
| } |
| |
| @Override |
| public void onComplete(Result result) |
| { |
| final long requestTimeNs = System.nanoTime() - startNs; |
| try { |
| boolean success = result.isSucceeded(); |
| if (success) { |
| successfulQueryCount.incrementAndGet(); |
| } else { |
| failedQueryCount.incrementAndGet(); |
| } |
| emitQueryTime(requestTimeNs, success); |
| requestLogger.logNativeQuery( |
| RequestLogLine.forNative( |
| query, |
| DateTimes.nowUtc(), |
| req.getRemoteAddr(), |
| new QueryStats( |
| ImmutableMap.of( |
| "query/time", |
| TimeUnit.NANOSECONDS.toMillis(requestTimeNs), |
| "success", |
| success |
| && result.getResponse().getStatus() == Status.OK.getStatusCode() |
| ) |
| ) |
| ) |
| ); |
| } |
| catch (Exception e) { |
| log.error(e, "Unable to log query [%s]!", query); |
| } |
| |
| super.onComplete(result); |
| } |
| |
| @Override |
| public void onFailure(Response response, Throwable failure) |
| { |
| try { |
| final String errorMessage = failure.getMessage(); |
| failedQueryCount.incrementAndGet(); |
| emitQueryTime(System.nanoTime() - startNs, false); |
| requestLogger.logNativeQuery( |
| RequestLogLine.forNative( |
| query, |
| DateTimes.nowUtc(), |
| req.getRemoteAddr(), |
| new QueryStats( |
| ImmutableMap.of( |
| "success", |
| false, |
| "exception", |
| errorMessage == null ? "no message" : errorMessage |
| ) |
| ) |
| ) |
| ); |
| } |
| catch (IOException logError) { |
| log.error(logError, "Unable to log query [%s]!", query); |
| } |
| |
| log.makeAlert(failure, "Exception handling request") |
| .addData("exception", failure.toString()) |
| .addData("query", query) |
| .addData("peer", req.getRemoteAddr()) |
| .emit(); |
| |
| super.onFailure(response, failure); |
| } |
| |
| private void emitQueryTime(long requestTimeNs, boolean success) |
| { |
| QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( |
| queryMetricsFactory, |
| warehouse.getToolChest(query), |
| query, |
| req.getRemoteAddr() |
| ); |
| queryMetrics.success(success); |
| queryMetrics.reportQueryTime(requestTimeNs).emit(emitter); |
| } |
| } |
| } |