| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.client; |
| |
| import static org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkState; |
| import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL; |
| import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.Vector; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.drill.common.DrillAutoCloseables; |
| import org.apache.drill.common.Version; |
| import org.apache.drill.common.config.DrillConfig; |
| import org.apache.drill.common.config.DrillProperties; |
| import org.apache.drill.common.exceptions.DrillRuntimeException; |
| import org.apache.drill.common.exceptions.UserException; |
| import org.apache.drill.exec.ExecConstants; |
| import org.apache.drill.exec.coord.ClusterCoordinator; |
| import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.memory.RootAllocatorFactory; |
| import org.apache.drill.exec.proto.BitControl.PlanFragment; |
| import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; |
| import org.apache.drill.exec.proto.GeneralRPCProtos; |
| import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; |
| import org.apache.drill.exec.proto.UserBitShared; |
| import org.apache.drill.exec.proto.UserBitShared.QueryId; |
| import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; |
| import org.apache.drill.exec.proto.UserBitShared.QueryType; |
| import org.apache.drill.exec.proto.UserProtos; |
| import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq; |
| import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp; |
| import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq; |
| import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp; |
| import org.apache.drill.exec.proto.UserProtos.GetColumnsReq; |
| import org.apache.drill.exec.proto.UserProtos.GetColumnsResp; |
| import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments; |
| import org.apache.drill.exec.proto.UserProtos.GetSchemasReq; |
| import org.apache.drill.exec.proto.UserProtos.GetSchemasResp; |
| import org.apache.drill.exec.proto.UserProtos.GetServerMetaReq; |
| import org.apache.drill.exec.proto.UserProtos.GetServerMetaResp; |
| import org.apache.drill.exec.proto.UserProtos.GetTablesReq; |
| import org.apache.drill.exec.proto.UserProtos.GetTablesResp; |
| import org.apache.drill.exec.proto.UserProtos.LikeFilter; |
| import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle; |
| import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments; |
| import org.apache.drill.exec.proto.UserProtos.RpcEndpointInfos; |
| import org.apache.drill.exec.proto.UserProtos.RpcType; |
| import org.apache.drill.exec.proto.UserProtos.RunQuery; |
| import org.apache.drill.exec.proto.helper.QueryIdHelper; |
| import org.apache.drill.exec.rpc.ChannelClosedException; |
| import org.apache.drill.exec.rpc.ConnectionThrottle; |
| import org.apache.drill.exec.rpc.DrillRpcFuture; |
| import org.apache.drill.exec.rpc.NamedThreadFactory; |
| import org.apache.drill.exec.rpc.NonTransientRpcException; |
| import org.apache.drill.exec.rpc.RpcException; |
| import org.apache.drill.exec.rpc.TransportCheck; |
| import org.apache.drill.exec.rpc.user.QueryDataBatch; |
| import org.apache.drill.exec.rpc.user.UserClient; |
| import org.apache.drill.exec.rpc.user.UserResultsListener; |
| import org.apache.drill.exec.rpc.user.UserRpcUtils; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.node.ArrayNode; |
| import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; |
| import org.apache.drill.shaded.guava.com.google.common.base.Strings; |
| import org.apache.drill.shaded.guava.com.google.common.util.concurrent.SettableFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.channel.EventLoopGroup; |
| |
| /** |
| * Thin wrapper around a UserClient that handles connect/close and transforms |
| * String into ByteBuf. |
| */ |
| public class DrillClient implements Closeable, ConnectionThrottle { |
| private static Logger logger = LoggerFactory.getLogger(DrillClient.class); |
| public static final String DEFAULT_CLIENT_NAME = "Apache Drill Java client"; |
| private static final ObjectMapper objectMapper = new ObjectMapper(); |
| |
| private final DrillConfig config; |
| private UserClient client; |
| private DrillProperties properties; |
| private volatile ClusterCoordinator clusterCoordinator; |
| private volatile boolean connected; |
| private final BufferAllocator allocator; |
| private final int reconnectTimes; |
| private final int reconnectDelay; |
| private boolean supportComplexTypes; |
| private final boolean ownsZkConnection; |
| private final boolean ownsAllocator; |
| private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit |
| private EventLoopGroup eventLoopGroup; |
| private ExecutorService executor; |
| private String clientName = DEFAULT_CLIENT_NAME; |
| |
| public DrillClient() throws OutOfMemoryException { |
| this(DrillConfig.create(), false); |
| } |
| |
| public DrillClient(boolean isDirect) throws OutOfMemoryException { |
| this(DrillConfig.create(), isDirect); |
| } |
| |
| public DrillClient(String fileName) throws OutOfMemoryException { |
| this(DrillConfig.create(fileName), false); |
| } |
| |
| public DrillClient(DrillConfig config) throws OutOfMemoryException { |
| this(config, null, false); |
| } |
| |
| public DrillClient(DrillConfig config, boolean isDirect) |
| throws OutOfMemoryException { |
| this(config, null, isDirect); |
| } |
| |
| public DrillClient(DrillConfig config, ClusterCoordinator coordinator) |
| throws OutOfMemoryException { |
| this(config, coordinator, null, false); |
| } |
| |
| public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) |
| throws OutOfMemoryException { |
| this(config, coordinator, null, isDirect); |
| } |
| |
| public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) |
| throws OutOfMemoryException { |
| this(config, coordinator, allocator, false); |
| } |
| |
| public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, boolean isDirect) { |
| // if isDirect is true, the client will connect directly to the drillbit instead of |
| // going thru the zookeeper |
| this.isDirectConnection = isDirect; |
| this.ownsZkConnection = coordinator == null && !isDirect; |
| this.ownsAllocator = allocator == null; |
| this.allocator = ownsAllocator ? RootAllocatorFactory.newRoot(config) : allocator; |
| this.config = config; |
| this.clusterCoordinator = coordinator; |
| this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); |
| this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY); |
| this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES); |
| } |
| |
| public DrillConfig getConfig() { |
| return config; |
| } |
| |
| @Override |
| public void setAutoRead(boolean enableAutoRead) { |
| client.setAutoRead(enableAutoRead); |
| } |
| |
| /** |
| * Sets the client name. |
| * |
| * If not set, default is {@code DrillClient#DEFAULT_CLIENT_NAME}. |
| * |
| * @param name the client name |
| * |
| * @throws IllegalStateException if called after a connection has been established. |
| * @throws NullPointerException if client name is null |
| */ |
| public void setClientName(String name) { |
| if (connected) { |
| throw new IllegalStateException("Attempted to modify client connection property after connection has been established."); |
| } |
| this.clientName = checkNotNull(name, "client name should not be null"); |
| } |
| |
| /** |
| * Sets whether the application is willing to accept complex types (Map, |
| * Arrays) in the returned result set. Default is {@code true}. If set to |
| * {@code false}, the complex types are returned as JSON encoded VARCHAR type. |
| * |
| * @throws IllegalStateException |
| * if called after a connection has been established. |
| */ |
| public void setSupportComplexTypes(boolean supportComplexTypes) { |
| if (connected) { |
| throw new IllegalStateException("Attempted to modify client connection property after connection has been established."); |
| } |
| this.supportComplexTypes = supportComplexTypes; |
| } |
| |
| /** |
| * Connects the client to a Drillbit server |
| * |
| * @throws RpcException |
| */ |
| public void connect() throws RpcException { |
| connect(null, new Properties()); |
| } |
| |
| /** |
| * Start's a connection from client to server |
| * @param props - not null {@link Properties} filled with connection url parameters |
| * @throws RpcException |
| */ |
| public void connect(Properties props) throws RpcException { |
| connect(null, props); |
| } |
| |
| /** |
| * Populates the endpointlist with drillbits information provided in the connection string by client. |
| * For direct connection we can have connection string with drillbit property as below: |
| * <dl> |
| * <dt>drillbit=ip</dt> |
| * <dd>use the ip specified as the Foreman ip with default port in config file</dd> |
| * <dt>drillbit=ip:port</dt> |
| * <dd>use the ip and port specified as the Foreman ip and port</dd> |
| * <dt>drillbit=ip1:port1,ip2:port2,...</dt> |
| * <dd>randomly select the ip and port pair from the specified list as the Foreman ip and port.</dd> |
| * </dl> |
| * |
| * @param drillbits string with drillbit value provided in connection string |
| * @param defaultUserPort string with default userport of drillbit specified in config file |
| * @return list of drillbit endpoints parsed from connection string |
| * @throws InvalidConnectionInfoException if the connection string has invalid or no drillbit information |
| */ |
| static List<DrillbitEndpoint> parseAndVerifyEndpoints(String drillbits, String defaultUserPort) |
| throws InvalidConnectionInfoException { |
| // If no drillbits is provided then throw exception |
| drillbits = drillbits.trim(); |
| if (drillbits.isEmpty()) { |
| throw new InvalidConnectionInfoException("No drillbit information specified in the connection string"); |
| } |
| |
| final List<DrillbitEndpoint> endpointList = new ArrayList<>(); |
| final String[] connectInfo = drillbits.split(","); |
| |
| // Fetch ip address and port information for each drillbit and populate the list |
| for (String drillbit : connectInfo) { |
| |
| // Trim all the empty spaces and check if the entry is empty string. |
| // Ignore the empty ones. |
| drillbit = drillbit.trim(); |
| |
| if (!drillbit.isEmpty()) { |
| // Verify if we have only ":" or only ":port" pattern |
| if (drillbit.charAt(0) == ':') { |
| // Invalid drillbit information |
| throw new InvalidConnectionInfoException("Malformed connection string with drillbit hostname or " + |
| "hostaddress missing for an entry: " + drillbit); |
| } |
| |
| // We are now sure that each ip:port entry will have both the values atleast once. |
| // Split each drillbit connection string to get ip address and port value |
| final String[] drillbitInfo = drillbit.split(":"); |
| |
| // Check if we have more than one port |
| if (drillbitInfo.length > 2) { |
| throw new InvalidConnectionInfoException("Malformed connection string with more than one port in a " + |
| "drillbit entry: " + drillbit); |
| } |
| |
| // At this point we are sure that drillbitInfo has atleast hostname or host address |
| // trim all the empty spaces which might be present in front of hostname or |
| // host address information |
| final String ipAddress = drillbitInfo[0].trim(); |
| String port = defaultUserPort; |
| |
| if (drillbitInfo.length == 2) { |
| // We have a port value also given by user. trim all the empty spaces between : and port value before |
| // validating the correctness of value. |
| port = drillbitInfo[1].trim(); |
| } |
| |
| try { |
| final DrillbitEndpoint endpoint = DrillbitEndpoint.newBuilder() |
| .setAddress(ipAddress) |
| .setUserPort(Integer.parseInt(port)) |
| .build(); |
| |
| endpointList.add(endpoint); |
| } catch (NumberFormatException e) { |
| throw new InvalidConnectionInfoException("Malformed port value in entry: " + ipAddress + ":" + port + " " + |
| "passed in connection string"); |
| } |
| } |
| } |
| if (endpointList.size() == 0) { |
| throw new InvalidConnectionInfoException("No valid drillbit information specified in the connection string"); |
| } |
| return endpointList; |
| } |
| |
| /** |
| * Start's a connection from client to server |
| * @param connect - Zookeeper connection string provided at connection URL |
| * @param props - not null {@link Properties} filled with connection url parameters |
| * @throws RpcException |
| */ |
| public synchronized void connect(String connect, Properties props) throws RpcException { |
| if (connected) { |
| return; |
| } |
| |
| properties = DrillProperties.createFromProperties(props); |
| final List<DrillbitEndpoint> endpoints = new ArrayList<>(); |
| |
| if (isDirectConnection) { |
| // Populate the endpoints list with all the drillbit information provided in the connection string |
| endpoints.addAll(parseAndVerifyEndpoints(properties.getProperty(DrillProperties.DRILLBIT_CONNECTION), |
| config.getString(ExecConstants.INITIAL_USER_PORT))); |
| } else { |
| if (ownsZkConnection) { |
| try { |
| this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect); |
| this.clusterCoordinator.start(10000); |
| } catch (Exception e) { |
| throw new RpcException("Failure setting up ZK for client.", e); |
| } |
| } |
| // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are |
| // in QUIESCENT state. This avoids the clients connecting to drillbits that are |
| // shutting down thereby avoiding reducing the chances of query failures. |
| endpoints.addAll(clusterCoordinator.getOnlineEndPoints()); |
| // Make sure we have at least one endpoint in the list |
| checkState(!endpoints.isEmpty(), "No active Drillbit endpoint found from ZooKeeper. Check connection parameters?"); |
| } |
| |
| // shuffle the collection then get the first endpoint |
| Collections.shuffle(endpoints); |
| |
| eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"); |
| executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, |
| new SynchronousQueue<Runnable>(), |
| new NamedThreadFactory("drill-client-executor-")) { |
| @Override |
| protected void afterExecute(final Runnable r, final Throwable t) { |
| if (t != null) { |
| logger.error("{}.run() leaked an exception.", r.getClass().getName(), t); |
| } |
| super.afterExecute(r, t); |
| } |
| }; |
| |
| final String connectTriesConf = properties.getProperty(DrillProperties.TRIES, "5"); |
| int connectTriesVal; |
| try { |
| connectTriesVal = Math.min(endpoints.size(), Integer.parseInt(connectTriesConf)); |
| } catch (NumberFormatException e) { |
| throw new InvalidConnectionInfoException("Invalid tries value: " + connectTriesConf + " specified in " + |
| "connection string"); |
| } |
| |
| // If the value provided in the connection string is <=0 then override with 1 since we want to try connecting |
| // at least once |
| connectTriesVal = Math.max(1, connectTriesVal); |
| |
| int triedEndpointIndex = 0; |
| DrillbitEndpoint endpoint; |
| |
| while (triedEndpointIndex < connectTriesVal) { |
| endpoint = endpoints.get(triedEndpointIndex); |
| |
| // Set in both props and properties since props is passed to UserClient |
| // TODO: Logically here it's doing putIfAbsent, please change to use that api once JDK 8 is minimum required |
| // version |
| if (!properties.containsKey(DrillProperties.SERVICE_HOST)) { |
| properties.setProperty(DrillProperties.SERVICE_HOST, endpoint.getAddress()); |
| props.setProperty(DrillProperties.SERVICE_HOST, endpoint.getAddress()); |
| } |
| |
| // Note: the properties member is a DrillProperties instance which lower cases names of |
| // properties. That does not work too well with properties that are mixed case. |
| // For user client severla properties are mixed case so we do not use the properties member |
| // but instead pass the props parameter. |
| client = new UserClient(clientName, config, props, supportComplexTypes, allocator, eventLoopGroup, executor, endpoint); |
| logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); |
| |
| try { |
| connect(endpoint); |
| connected = true; |
| logger.info("Successfully connected to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); |
| break; |
| } catch (NonTransientRpcException ex) { |
| logger.error("Connection to {}:{} failed with error {}. Not retrying anymore", endpoint.getAddress(), |
| endpoint.getUserPort(), ex.getMessage()); |
| throw ex; |
| } catch (RpcException ex) { |
| ++triedEndpointIndex; |
| logger.error("Attempt {}: Failed to connect to server {}:{}", triedEndpointIndex, endpoint.getAddress(), |
| endpoint.getUserPort()); |
| |
| // Throw exception when we have exhausted all the tries without having a successful connection |
| if (triedEndpointIndex == connectTriesVal) { |
| throw ex; |
| } |
| |
| // Close the connection here to avoid calling close twice in case when all tries are exhausted. |
| // Since DrillClient.close is also calling client.close |
| client.close(); |
| } |
| } |
| } |
| |
| protected static EventLoopGroup createEventLoop(int size, String prefix) { |
| return TransportCheck.createEventLoopGroup(size, prefix); |
| } |
| |
| public synchronized boolean reconnect() { |
| if (client.isActive()) { |
| return true; |
| } |
| int retry = reconnectTimes; |
| while (retry > 0) { |
| retry--; |
| try { |
| Thread.sleep(this.reconnectDelay); |
| // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are |
| // in QUIESCENT state. This avoids the clients connecting to drillbits that are |
| // shutting down thereby reducing the chances of query failures. |
| final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getOnlineEndPoints()); |
| if (endpoints.isEmpty()) { |
| continue; |
| } |
| client.close(); |
| Collections.shuffle(endpoints); |
| connect(endpoints.iterator().next()); |
| return true; |
| } catch (Exception e) { |
| } |
| } |
| return false; |
| } |
| |
| private void connect(DrillbitEndpoint endpoint) throws RpcException { |
| client.connect(endpoint, properties, getUserCredentials()); |
| logger.info("Foreman drillbit is {}", endpoint.getAddress()); |
| } |
| |
| public BufferAllocator getAllocator() { |
| return allocator; |
| } |
| |
| /** |
| * Closes this client's connection to the server |
| */ |
| @Override |
| public void close() { |
| if (client != null) { |
| client.close(); |
| } |
| if (ownsAllocator && allocator != null) { |
| DrillAutoCloseables.closeNoChecked(allocator); |
| } |
| if (ownsZkConnection) { |
| if (clusterCoordinator != null) { |
| try { |
| clusterCoordinator.close(); |
| clusterCoordinator = null; |
| } catch (Exception e) { |
| logger.warn("Error while closing Cluster Coordinator.", e); |
| } |
| } |
| } |
| if (eventLoopGroup != null) { |
| eventLoopGroup.shutdownGracefully(); |
| eventLoopGroup = null; |
| } |
| |
| if (executor != null) { |
| executor.shutdownNow(); |
| executor = null; |
| } |
| |
| connected = false; |
| } |
| |
| /** |
| * Return the server infos. Only available after connecting |
| * |
| * The result might be null if the server doesn't provide the informations. |
| * |
| * @return the server informations, or null if not connected or if the server |
| * doesn't provide the information |
| * @deprecated use {@code DrillClient#getServerVersion()} |
| */ |
| @Deprecated |
| public RpcEndpointInfos getServerInfos() { |
| return client != null ? client.getServerInfos() : null; |
| } |
| |
| /** |
| * Return the server name. Only available after connecting |
| * |
| * The result might be null if the server doesn't provide the name information. |
| * |
| * @return the server name, or null if not connected or if the server |
| * doesn't provide the name |
| * @return The server name. |
| */ |
| public String getServerName() { |
| return (client != null && client.getServerInfos() != null) ? client.getServerInfos().getName() : null; |
| } |
| |
| /** |
| * Return the server version. Only available after connecting |
| * |
| * The result might be null if the server doesn't provide the version information. |
| * |
| * @return the server version, or null if not connected or if the server |
| * doesn't provide the version |
| * @return The server version. |
| */ |
| public Version getServerVersion() { |
| return (client != null && client.getServerInfos() != null) ? UserRpcUtils.getVersion(client.getServerInfos()) : null; |
| } |
| |
| /** |
| * Get server meta information |
| * |
| * Get meta information about the server like the the available functions |
| * or the identifier quoting string used by the current session |
| * |
| * @return a future to the server meta response |
| */ |
| public DrillRpcFuture<GetServerMetaResp> getServerMeta() { |
| return client.send(RpcType.GET_SERVER_META, GetServerMetaReq.getDefaultInstance(), GetServerMetaResp.class); |
| } |
| |
| /** |
| * Returns the list of methods supported by the server based on its advertised information. |
| * |
| * @return an immutable set of capabilities |
| */ |
| public Set<ServerMethod> getSupportedMethods() { |
| return client != null ? ServerMethod.getSupportedMethods(client.getSupportedMethods(), client.getServerInfos()) : null; |
| } |
| |
| /** |
| * Submits a string based query plan for execution and returns the result batches. Supported query types are: |
| * <p><ul> |
| * <li>{@link QueryType#LOGICAL} |
| * <li>{@link QueryType#PHYSICAL} |
| * <li>{@link QueryType#SQL} |
| * </ul> |
| * |
| * @param type Query type |
| * @param plan Query to execute |
| * @return a handle for the query result |
| * @throws RpcException |
| */ |
| public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException { |
| checkArgument(type == QueryType.LOGICAL || type == QueryType.PHYSICAL || type == QueryType.SQL, |
| String.format("Only query types %s, %s and %s are supported in this API", |
| QueryType.LOGICAL, QueryType.PHYSICAL, QueryType.SQL)); |
| final UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(); |
| final ListHoldingResultsListener listener = new ListHoldingResultsListener(query); |
| client.submitQuery(listener, query); |
| return listener.getResults(); |
| } |
| |
| /** |
| * API to just plan a query without execution |
| * |
| * @param type |
| * @param query |
| * @param isSplitPlan |
| * option to tell whether to return single or split plans for a query |
| * @return list of PlanFragments that can be used later on in |
| * {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType, |
| * java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)} |
| * to run a query without additional planning |
| */ |
| public DrillRpcFuture<QueryPlanFragments> planQuery(QueryType type, String query, boolean isSplitPlan) { |
| GetQueryPlanFragments runQuery = GetQueryPlanFragments.newBuilder().setQuery(query).setType(type).setSplitPlan(isSplitPlan).build(); |
| return client.planQuery(runQuery); |
| } |
| |
| /** |
| * Run query based on list of fragments that were supposedly produced during query planning phase. Supported |
| * query type is {@link QueryType#EXECUTION} |
| * @param type |
| * @param planFragments |
| * @param resultsListener |
| * @throws RpcException |
| */ |
| public void runQuery(QueryType type, List<PlanFragment> planFragments, UserResultsListener resultsListener) |
| throws RpcException { |
| // QueryType can be only executional |
| checkArgument((QueryType.EXECUTION == type), "Only EXECUTION type query is supported with PlanFragments"); |
| // setting Plan on RunQuery will be used for logging purposes and therefore can not be null |
| // since there is no Plan string provided we will create a JsonArray out of individual fragment Plans |
| ArrayNode jsonArray = objectMapper.createArrayNode(); |
| for (PlanFragment fragment : planFragments) { |
| try { |
| jsonArray.add(objectMapper.readTree(fragment.getFragmentJson())); |
| } catch (IOException e) { |
| logger.error("Exception while trying to read PlanFragment JSON for {}", fragment.getHandle().getQueryId(), e); |
| throw new RpcException(e); |
| } |
| } |
| final String fragmentsToJsonString; |
| try { |
| fragmentsToJsonString = objectMapper.writeValueAsString(jsonArray); |
| } catch (JsonProcessingException e) { |
| logger.error("Exception while trying to get JSONString from Array of individual Fragments Json for %s", e); |
| throw new RpcException(e); |
| } |
| final UserProtos.RunQuery query = newBuilder().setType(type).addAllFragments(planFragments) |
| .setPlan(fragmentsToJsonString) |
| .setResultsMode(STREAM_FULL).build(); |
| client.submitQuery(resultsListener, query); |
| } |
| |
| /** |
| * Helper method to generate the UserCredentials message from the properties. |
| */ |
| private UserBitShared.UserCredentials getUserCredentials() { |
| String userName = properties.getProperty(DrillProperties.USER); |
| if (Strings.isNullOrEmpty(userName)) { |
| userName = "anonymous"; // if username is not propagated as one of the properties |
| } |
| return UserBitShared.UserCredentials.newBuilder() |
| .setUserName(userName) |
| .build(); |
| } |
| |
| public DrillRpcFuture<Ack> cancelQuery(QueryId id) { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id)); |
| } |
| return client.send(RpcType.CANCEL_QUERY, id, Ack.class); |
| } |
| |
| public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId)); |
| } |
| return client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class); |
| } |
| |
| /** |
| * Get the list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters. |
| * |
| * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. |
| * @return The list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters. |
| */ |
| public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) { |
| final GetCatalogsReq.Builder reqBuilder = GetCatalogsReq.newBuilder(); |
| if (catalogNameFilter != null) { |
| reqBuilder.setCatalogNameFilter(catalogNameFilter); |
| } |
| |
| return client.send(RpcType.GET_CATALOGS, reqBuilder.build(), GetCatalogsResp.class); |
| } |
| |
| /** |
| * Get the list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters. |
| * |
| * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. |
| * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter. |
| * @return The list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters. |
| */ |
| public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) { |
| final GetSchemasReq.Builder reqBuilder = GetSchemasReq.newBuilder(); |
| if (catalogNameFilter != null) { |
| reqBuilder.setCatalogNameFilter(catalogNameFilter); |
| } |
| |
| if (schemaNameFilter != null) { |
| reqBuilder.setSchemaNameFilter(schemaNameFilter); |
| } |
| |
| return client.send(RpcType.GET_SCHEMAS, reqBuilder.build(), GetSchemasResp.class); |
| } |
| |
| /** |
| * Get the list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters. |
| * |
| * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. |
| * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter. |
| * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter. |
| * @param tableTypeFilter Filter in {@code table type}. Pass null to apply no filter |
| * @return The list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters. |
| */ |
| public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter, |
| LikeFilter tableNameFilter, List<String> tableTypeFilter) { |
| final GetTablesReq.Builder reqBuilder = GetTablesReq.newBuilder(); |
| if (catalogNameFilter != null) { |
| reqBuilder.setCatalogNameFilter(catalogNameFilter); |
| } |
| |
| if (schemaNameFilter != null) { |
| reqBuilder.setSchemaNameFilter(schemaNameFilter); |
| } |
| |
| if (tableNameFilter != null) { |
| reqBuilder.setTableNameFilter(tableNameFilter); |
| } |
| |
| if (tableTypeFilter != null) { |
| reqBuilder.addAllTableTypeFilter(tableTypeFilter); |
| } |
| |
| return client.send(RpcType.GET_TABLES, reqBuilder.build(), GetTablesResp.class); |
| } |
| |
| /** |
| * Get the list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters. |
| * |
| * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. |
| * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter. |
| * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter. |
| * @param columnNameFilter Filter in {@code column name}. Pass null to apply no filter. |
| * @return The list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters. |
| */ |
| public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter, |
| LikeFilter tableNameFilter, LikeFilter columnNameFilter) { |
| final GetColumnsReq.Builder reqBuilder = GetColumnsReq.newBuilder(); |
| if (catalogNameFilter != null) { |
| reqBuilder.setCatalogNameFilter(catalogNameFilter); |
| } |
| |
| if (schemaNameFilter != null) { |
| reqBuilder.setSchemaNameFilter(schemaNameFilter); |
| } |
| |
| if (tableNameFilter != null) { |
| reqBuilder.setTableNameFilter(tableNameFilter); |
| } |
| |
| if (columnNameFilter != null) { |
| reqBuilder.setColumnNameFilter(columnNameFilter); |
| } |
| |
| return client.send(RpcType.GET_COLUMNS, reqBuilder.build(), GetColumnsResp.class); |
| } |
| |
| /** |
| * Create a prepared statement for given the {@code query}. |
| * |
| * @param query |
| * @return The prepared statement for given the {@code query}. |
| */ |
| public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) { |
| final CreatePreparedStatementReq req = |
| CreatePreparedStatementReq.newBuilder() |
| .setSqlQuery(query) |
| .build(); |
| |
| return client.send(RpcType.CREATE_PREPARED_STATEMENT, req, CreatePreparedStatementResp.class); |
| } |
| |
| /** |
| * Execute the given prepared statement. |
| * |
| * @param preparedStatementHandle Prepared statement handle returned in response to |
| * {@link #createPreparedStatement(String)}. |
| * @param resultsListener {@link UserResultsListener} instance for listening for query results. |
| */ |
| public void executePreparedStatement(final PreparedStatementHandle preparedStatementHandle, |
| final UserResultsListener resultsListener) { |
| final RunQuery runQuery = newBuilder() |
| .setResultsMode(STREAM_FULL) |
| .setType(QueryType.PREPARED_STATEMENT) |
| .setPreparedStatementHandle(preparedStatementHandle) |
| .build(); |
| client.submitQuery(resultsListener, runQuery); |
| } |
| |
| /** |
| * Execute the given prepared statement and return the results. |
| * |
| * @param preparedStatementHandle Prepared statement handle returned in response to |
| * {@link #createPreparedStatement(String)}. |
| * @return List of {@link QueryDataBatch}s. It is responsibility of the caller to release query data batches. |
| * @throws RpcException |
| */ |
| @VisibleForTesting |
| public List<QueryDataBatch> executePreparedStatement(final PreparedStatementHandle preparedStatementHandle) |
| throws RpcException { |
| final RunQuery runQuery = newBuilder() |
| .setResultsMode(STREAM_FULL) |
| .setType(QueryType.PREPARED_STATEMENT) |
| .setPreparedStatementHandle(preparedStatementHandle) |
| .build(); |
| |
| final ListHoldingResultsListener resultsListener = new ListHoldingResultsListener(runQuery); |
| |
| client.submitQuery(resultsListener, runQuery); |
| |
| return resultsListener.getResults(); |
| } |
| |
| /** |
| * Submits a Logical plan for direct execution (bypasses parsing) |
| * |
| * @param plan the plan to execute |
| */ |
| public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) { |
| client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build()); |
| } |
| |
| /** |
| * @return true if client has connection and it is connected, false otherwise |
| */ |
| public boolean connectionIsActive() { |
| return client.isActive(); |
| } |
| |
| /** |
| * Verify connection with request-answer. |
| * |
| * @param timeoutSec time in seconds to wait answer receiving. If 0 then won't wait. |
| * @return true if {@link GeneralRPCProtos.RpcMode#PONG PONG} received until timeout, false otherwise |
| */ |
| public boolean hasPing(long timeoutSec) throws DrillRuntimeException { |
| if (timeoutSec < 0) { |
| timeoutSec = 0; |
| } |
| return client.hasPing(timeoutSec); |
| } |
| |
| private class ListHoldingResultsListener implements UserResultsListener { |
| private final Vector<QueryDataBatch> results = new Vector<>(); |
| private final SettableFuture<List<QueryDataBatch>> future = SettableFuture.create(); |
| private final UserProtos.RunQuery query; |
| |
| public ListHoldingResultsListener(UserProtos.RunQuery query) { |
| logger.debug( "Listener created for query \"{}\"", query ); |
| this.query = query; |
| } |
| |
| @Override |
| public void submissionFailed(UserException ex) { |
| // or !client.isActive() |
| if (ex.getCause() instanceof ChannelClosedException) { |
| if (reconnect()) { |
| try { |
| client.submitQuery(this, query); |
| } catch (Exception e) { |
| fail(e); |
| } |
| } else { |
| fail(ex); |
| } |
| } else { |
| fail(ex); |
| } |
| } |
| |
| @Override |
| public void queryCompleted(QueryState state) { |
| future.set(results); |
| } |
| |
| private void fail(Exception ex) { |
| logger.debug("Submission failed.", ex); |
| future.setException(ex); |
| future.set(results); |
| } |
| |
| @Override |
| public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { |
| logger.debug("Result arrived: Row count: {}", result.getHeader().getRowCount()); |
| logger.trace("Result batch: {}", result); |
| results.add(result); |
| } |
| |
| public List<QueryDataBatch> getResults() throws RpcException { |
| try { |
| return future.get(); |
| } catch (Throwable t) { |
| /* |
| * Since we're not going to return the result to the caller |
| * to clean up, we have to do it. |
| */ |
| for(final QueryDataBatch queryDataBatch : results) { |
| queryDataBatch.release(); |
| } |
| |
| throw RpcException.mapException(t); |
| } |
| } |
| |
| @Override |
| public void queryIdArrived(QueryId queryId) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Query ID arrived: {}", QueryIdHelper.getQueryId(queryId)); |
| } |
| } |
| } |
| } |