DRILL-7730: Improve web query efficiency
Implements a direct transfer of batches from Screen to web client.
Cleans up web client query processing to avoid duplicate schema
info.
Much related code cleanup.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 237aba1..fdbb971 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -93,6 +93,8 @@
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoopGroup;
@@ -101,16 +103,15 @@
* String into ByteBuf.
*/
public class DrillClient implements Closeable, ConnectionThrottle {
+ private static Logger logger = LoggerFactory.getLogger(DrillClient.class);
public static final String DEFAULT_CLIENT_NAME = "Apache Drill Java client";
-
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
-
private static final ObjectMapper objectMapper = new ObjectMapper();
+
private final DrillConfig config;
private UserClient client;
private DrillProperties properties;
private volatile ClusterCoordinator clusterCoordinator;
- private volatile boolean connected = false;
+ private volatile boolean connected;
private final BufferAllocator allocator;
private final int reconnectTimes;
private final int reconnectDelay;
@@ -199,10 +200,12 @@
}
/**
- * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
- * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
+ * Sets whether the application is willing to accept complex types (Map,
+ * Arrays) in the returned result set. Default is {@code true}. If set to
+ * {@code false}, the complex types are returned as JSON encoded VARCHAR type.
*
- * @throws IllegalStateException if called after a connection has been established.
+ * @throws IllegalStateException
+ * if called after a connection has been established.
*/
public void setSupportComplexTypes(boolean supportComplexTypes) {
if (connected) {
@@ -468,10 +471,10 @@
*/
@Override
public void close() {
- if (this.client != null) {
- this.client.close();
+ if (client != null) {
+ client.close();
}
- if (this.ownsAllocator && allocator != null) {
+ if (ownsAllocator && allocator != null) {
DrillAutoCloseables.closeNoChecked(allocator);
}
if (ownsZkConnection) {
@@ -486,19 +489,17 @@
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
+ eventLoopGroup = null;
}
if (executor != null) {
executor.shutdownNow();
+ executor = null;
}
- // TODO: Did DRILL-1735 changes cover this TODO?:
- // TODO: fix tests that fail when this is called.
- //allocator.close();
connected = false;
}
-
/**
* Return the server infos. Only available after connecting
*
@@ -585,11 +586,15 @@
/**
* API to just plan a query without execution
+ *
* @param type
* @param query
- * @param isSplitPlan - option to tell whether to return single or split plans for a query
- * @return list of PlanFragments that can be used later on in {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType, java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)}
- * to run a query without additional planning
+ * @param isSplitPlan
+ * option to tell whether to return single or split plans for a query
+ * @return list of PlanFragments that can be used later on in
+ * {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType,
+ * java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)}
+ * to run a query without additional planning
*/
public DrillRpcFuture<QueryPlanFragments> planQuery(QueryType type, String query, boolean isSplitPlan) {
GetQueryPlanFragments runQuery = GetQueryPlanFragments.newBuilder().setQuery(query).setType(type).setSplitPlan(isSplitPlan).build();
@@ -632,7 +637,7 @@
client.submitQuery(resultsListener, query);
}
- /*
+ /**
* Helper method to generate the UserCredentials message from the properties.
*/
private UserBitShared.UserCredentials getUserCredentials() {
@@ -660,10 +665,10 @@
}
/**
- * Get the list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
+ * Get the list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters.
*
- * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
- * @return The list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
+ * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+ * @return The list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters.
*/
public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) {
final GetCatalogsReq.Builder reqBuilder = GetCatalogsReq.newBuilder();
@@ -675,11 +680,11 @@
}
/**
- * Get the list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
+ * Get the list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters.
*
- * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
- * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
- * @return The list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
+ * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+ * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter.
+ * @return The list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters.
*/
public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) {
final GetSchemasReq.Builder reqBuilder = GetSchemasReq.newBuilder();
@@ -695,13 +700,13 @@
}
/**
- * Get the list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters.
+ * Get the list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters.
*
- * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
- * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
- * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter.
- * @param tableTypeFilter Filter in <code>table type</code>. Pass null to apply no filter
- * @return The list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters.
+ * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+ * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter.
+ * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter.
+ * @param tableTypeFilter Filter in {@code table type}. Pass null to apply no filter
+ * @return The list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters.
*/
public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
LikeFilter tableNameFilter, List<String> tableTypeFilter) {
@@ -726,13 +731,13 @@
}
/**
- * Get the list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters.
+ * Get the list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters.
*
- * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
- * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
- * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter.
- * @param columnNameFilter Filter in <code>column name</code>. Pass null to apply no filter.
- * @return The list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters.
+ * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+ * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter.
+ * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter.
+ * @param columnNameFilter Filter in {@code column name}. Pass null to apply no filter.
+ * @return The list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters.
*/
public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
LikeFilter tableNameFilter, LikeFilter columnNameFilter) {
@@ -757,10 +762,10 @@
}
/**
- * Create a prepared statement for given the <code>query</code>.
+ * Create a prepared statement for given the {@code query}.
*
* @param query
- * @return The prepared statement for given the <code>query</code>.
+ * @return The prepared statement for given the {@code query}.
*/
public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) {
final CreatePreparedStatementReq req =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
index 47a364a..c751897 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -250,7 +250,7 @@
System.out.println(getBatchMetaInfo(vcSerializable).toString());
System.out.println("Schema Information");
- for (final VectorWrapper w : vectorContainer) {
+ for (final VectorWrapper<?> w : vectorContainer) {
final MaterializedField field = w.getValueVector().getField();
System.out.println (String.format("name : %s, minor_type : %s, data_mode : %s",
field.getName(),
@@ -279,7 +279,7 @@
selectedRows = vcSerializable.getSv2().getCount();
}
- for (final VectorWrapper w : vectorContainer) {
+ for (final VectorWrapper<?> w : vectorContainer) {
totalDataSize += w.getValueVector().getBufferSize();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java
index 19e72ff..e90cfae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java
@@ -22,6 +22,7 @@
/**
* Exception for malformed connection string from client
*/
+@SuppressWarnings("serial")
public class InvalidConnectionInfoException extends NonTransientRpcException {
public InvalidConnectionInfoException(String message) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
index 25e472f..25331da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
@@ -36,11 +36,13 @@
import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.netty.buffer.DrillBuf;
public class LoggingResultsListener implements UserResultsListener {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoggingResultsListener.class);
+ private static Logger logger = LoggerFactory.getLogger(LoggingResultsListener.class);
private final AtomicInteger count = new AtomicInteger();
private final Stopwatch w = Stopwatch.createUnstarted();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index d60f6ae..5a5f725 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -201,7 +201,5 @@
watch.reset();
}
return 0;
-
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
index c0c1e0b..7510cf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.ops;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.UserClientConnection;
@@ -31,14 +31,15 @@
private final SendingAccountor sendingAccountor;
private final RpcOutcomeListener<Ack> statusHandler;
- public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor, RpcOutcomeListener<Ack> statusHandler) {
+ public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor,
+ RpcOutcomeListener<Ack> statusHandler) {
this.connection = connection;
this.sendingAccountor = sendingAccountor;
this.statusHandler = statusHandler;
}
- public void sendData(QueryWritableBatch batch) {
+ public void sendData(QueryDataPackage data) {
sendingAccountor.increment();
- connection.sendData(statusHandler, batch);
+ connection.sendData(statusHandler, data);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 0f7ca13..3f71cbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -19,17 +19,14 @@
import java.util.List;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingUserConnection;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
-import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage.DataPackage;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage.EmptyResultsPackage;
import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.testing.ControlsInjector;
@@ -49,13 +46,18 @@
return new ScreenRoot(context, children.iterator().next(), config);
}
+ /**
+ * Transfer batches to a user connection. The user connection is typically a
+ * network connection, but may be internal for a web or REST client. Data is
+ * sent as a "package", allowing the network client to request serialization,
+ * and the internal client to just transfer buffer ownership.
+ */
public static class ScreenRoot extends BaseRootExec {
private static final Logger logger = LoggerFactory.getLogger(ScreenRoot.class);
private final RecordBatch incoming;
private final RootFragmentContext context;
private final AccountingUserConnection userConnection;
- private RecordMaterializer materializer;
-
+ private DataPackage dataPackage;
private boolean firstBatch = true;
public enum Metric implements MetricDef {
@@ -67,15 +69,11 @@
}
}
- public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+ public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) {
super(context, config);
this.context = context;
this.incoming = incoming;
- userConnection = context.getUserDataTunnel();
- }
-
- public RootFragmentContext getContext() {
- return context;
+ this.userConnection = context.getUserDataTunnel();
}
@Override
@@ -85,53 +83,40 @@
switch (outcome) {
case NONE:
if (firstBatch) {
- // this is the only data message sent to the client and may contain the schema
- QueryWritableBatch batch;
- QueryData header = QueryData.newBuilder()
- .setQueryId(context.getHandle().getQueryId())
- .setRowCount(0)
- .setDef(RecordBatchDef.getDefaultInstance())
- .build();
- batch = new QueryWritableBatch(header);
stats.startWait();
try {
- userConnection.sendData(batch);
+ // This is the only data message sent to the client and does not contain the schema
+ userConnection.sendData(new EmptyResultsPackage(context.getHandle().getQueryId()));
} finally {
stats.stopWait();
}
firstBatch = false; // we don't really need to set this. But who knows!
}
-
return false;
+
case OK_NEW_SCHEMA:
- materializer = new VectorRecordMaterializer(context, oContext, incoming);
+ dataPackage = new DataPackage(new VectorRecordMaterializer(context, oContext, incoming), stats);
//$FALL-THROUGH$
case OK:
injector.injectPause(context.getExecutionControls(), "sending-data", logger);
- final QueryWritableBatch batch = materializer.convertNext();
- updateStats(batch);
stats.startWait();
try {
- userConnection.sendData(batch);
+ // Stats updated if connection serializes the batch
+ userConnection.sendData(dataPackage);
} finally {
stats.stopWait();
}
firstBatch = false;
-
return true;
+
default:
throw new UnsupportedOperationException(outcome.name());
}
}
- public void updateStats(QueryWritableBatch queryBatch) {
- stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
- }
-
- RecordBatch getIncoming() {
- return incoming;
- }
+ public RootFragmentContext getContext() { return context; }
+ protected RecordBatch getIncoming() { return incoming; }
@Override
public void close() throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index a3d03c2..ec22632 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -24,9 +24,12 @@
import org.apache.drill.exec.record.TransferPair;
public interface Filterer {
- TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
- TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
+ TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 =
+ new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
+ TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 =
+ new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
- void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
+ void setup(FragmentContext context, RecordBatch incoming,
+ RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
void filterBatch(int recordCount) throws SchemaChangeException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java
new file mode 100644
index 0000000..f9a933d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.materialize;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot.Metric;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+
+/**
+ * Packages a batch from the Screen operator to send to its
+ * user connection. In the original Drill, that connection was always a
+ * network connection, and so the outgoing batch is serialized to a set
+ * of buffers ready to send. However, the REST server runs in the same process.
+ * The original REST query implementation serialized the data to buffers, then
+ * copied the data to a big buffer to be deserialized, causing significant memory
+ * pressure. This version allows the user connection to elect for serialization,
+ * or just to access the original source batch.
+ */
+public interface QueryDataPackage {
+
+ QueryId queryId();
+ QueryWritableBatch toWritableBatch();
+ VectorContainer batch();
+ List<SerializedField> fields();
+
+ /**
+ * Package that contains only a query ID. Send for a query that
+ * finishes with no data. The results are null: no data, no schema.
+ */
+ public static class EmptyResultsPackage implements QueryDataPackage {
+
+ private final QueryId queryId;
+
+ public EmptyResultsPackage(QueryId queryId) {
+ this.queryId = queryId;
+ }
+
+ @Override
+ public QueryId queryId() { return queryId; }
+
+ /**
+ * Creates a message that sends only the query ID to the
+ * client.
+ */
+ @Override
+ public QueryWritableBatch toWritableBatch() {
+ QueryData header = QueryData.newBuilder()
+ .setQueryId(queryId)
+ .setRowCount(0)
+ .setDef(RecordBatchDef.getDefaultInstance())
+ .build();
+ return new QueryWritableBatch(header);
+ }
+
+ @Override
+ public VectorContainer batch() { return null; }
+
+ @Override
+ public List<SerializedField> fields() {
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * Represents a batch of data with a schema.
+ */
+ public static class DataPackage implements QueryDataPackage {
+ private final RecordMaterializer materializer;
+ private final OperatorStats stats;
+
+ public DataPackage(RecordMaterializer materializer, OperatorStats stats) {
+ this.materializer = materializer;
+ this.stats = stats;
+ }
+
+ @Override
+ public QueryId queryId() { return materializer.queryId(); }
+
+ @Override
+ public QueryWritableBatch toWritableBatch() {
+ QueryWritableBatch batch = materializer.convertNext();
+ stats.addLongStat(Metric.BYTES_SENT, batch.getByteCount());
+ return batch;
+ }
+
+ @Override
+ public VectorContainer batch() {
+ return materializer.incoming();
+ }
+
+ @Override
+ public List<SerializedField> fields() {
+ List<SerializedField> metadata = new ArrayList<>();
+ for (VectorWrapper<?> vw : batch()) {
+ metadata.add(vw.getValueVector().getMetadata());
+ }
+ return metadata;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index e69bd51..f9537b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -24,7 +24,6 @@
import org.apache.drill.exec.proto.UserBitShared.QueryData;
public class QueryWritableBatch {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
private final QueryData header;
private final ByteBuf[] buffers;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
index 75de592..9c2f7ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -17,9 +17,14 @@
*/
package org.apache.drill.exec.physical.impl.materialize;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.VectorContainer;
public interface RecordMaterializer {
public QueryWritableBatch convertNext();
+ public QueryId queryId();
+
+ public VectorContainer incoming();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index 7cdf9b3..c294774 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -25,16 +25,16 @@
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
public class VectorRecordMaterializer implements RecordMaterializer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class);
- private QueryId queryId;
- private RecordBatch batch;
- private BufferAllocator allocator;
- private OptionManager options;
+ private final QueryId queryId;
+ private final RecordBatch batch;
+ private final BufferAllocator allocator;
+ private final boolean resultResultsForDDL;
public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) {
this.queryId = context.getHandle().getQueryId();
@@ -42,19 +42,27 @@
this.allocator = oContext.getAllocator();
BatchSchema schema = batch.getSchema();
assert schema != null : "Schema must be defined.";
- options = context.getOptions();
+ OptionSet options = context.getOptions();
+ this.resultResultsForDDL = options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL);
}
+ @Override
public QueryWritableBatch convertNext() {
WritableBatch w = batch.getWritableBatch().transfer(allocator);
QueryData.Builder builder = QueryData.newBuilder()
.setQueryId(queryId)
.setRowCount(batch.getRecordCount())
.setDef(w.getDef());
- if (!options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL)) {
+ if (!resultResultsForDDL) {
int count = w.getDef().getAffectedRowsCount();
builder.setAffectedRowsCount(count == -1 ? 0 : count);
}
return new QueryWritableBatch(builder.build(), w.getBuffers());
}
+
+ @Override
+ public QueryId queryId() { return queryId; }
+
+ @Override
+ public VectorContainer incoming() { return batch.getContainer(); }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index cb835e7..d4087fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -25,7 +25,7 @@
* Implements an AbstractUnaryRecordBatch where the incoming record batch is
* known at the time of creation
*
- * @param <T>
+ * @param <T> the plan definition of the operator
*/
public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractUnaryRecordBatch<T> {
@@ -43,12 +43,15 @@
}
/**
- * Based on lastKnownOutcome and if there are more records to be output for current record boundary detected by
- * EMIT outcome, this method returns EMIT or OK outcome.
+ * Based on lastKnownOutcome and if there are more records to be output for
+ * current record boundary detected by EMIT outcome, this method returns EMIT
+ * or OK outcome.
+ *
* @param hasMoreRecordInBoundary
- * @return - EMIT - If the lastknownOutcome was EMIT and output records corresponding to all the incoming records in
- * current record boundary is already produced.
- * - OK - otherwise
+ * @return EMIT - If the lastknownOutcome was EMIT and output records
+ * corresponding to all the incoming records in current record
+ * boundary is already produced.
+ * OK - otherwise
*/
protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
final IterOutcome lastOutcome = getLastKnownOutcome();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 6d8865d..2ee1047 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -30,7 +30,6 @@
* {@link org.apache.drill.exec.record.metadata.TupleMetadata} instead.
*/
public class BatchSchema implements Iterable<MaterializedField> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
private final SelectionVectorMode selectionVectorMode;
private final List<MaterializedField> fields;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 064c601..1eb2ac0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -51,7 +51,7 @@
private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
private final BufferAllocator allocator;
- private VectorContainer container = new VectorContainer();
+ private VectorContainer container;
private int valueCount;
private BatchSchema schema;
@@ -60,6 +60,7 @@
*/
public RecordBatchLoader(BufferAllocator allocator) {
this.allocator = Preconditions.checkNotNull(allocator);
+ this.container = new VectorContainer(allocator);
}
public BufferAllocator allocator() { return allocator; }
@@ -116,12 +117,10 @@
vector = TypeHelper.getNewVector(fieldDef, allocator);
// If the field is a map or a dict, check if the schema changed.
-
} else if ((vector.getField().getType().getMinorType() == MinorType.MAP || vector.getField().getType().getMinorType() == MinorType.DICT) &&
! isSameSchema(vector.getField().getChildren(), field.getChildList())) {
// The schema changed. Discard the old one and create a new one.
-
schemaChanged = true;
vector.clear();
vector = TypeHelper.getNewVector(fieldDef, allocator);
@@ -155,8 +154,8 @@
container = newVectors;
container.setRecordCount(valueCount);
} catch (final Throwable cause) {
- // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should
- // adjudicate to call upper layer specific clean up logic.
+ // We have to clean up new vectors created here and pass over the actual cause.
+ // It is upper layer who should adjudicate to call upper layer specific clean up logic.
VectorAccessibleUtilities.clear(newVectors);
throw cause;
} finally {
@@ -190,7 +189,6 @@
// Column order can permute (see DRILL-5828). So, use a map
// for matching.
-
Map<String, MaterializedField> childMap = CaseInsensitiveMap.newHashMap();
for (MaterializedField currentChild : currentChildren) {
childMap.put(currentChild.getName(), currentChild);
@@ -199,13 +197,11 @@
MaterializedField currentChild = childMap.get(newChild.getNamePart().getName());
// New map member?
-
if (currentChild == null) {
return false;
}
// Changed data type?
-
if (! currentChild.getType().equals(newChild.getMajorType())) {
return false;
}
@@ -223,7 +219,6 @@
}
// Everything matches.
-
return true;
}
@@ -232,20 +227,6 @@
return container.getValueVectorId(path);
}
-//
-// @SuppressWarnings("unchecked")
-// public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> clazz) {
-// ValueVector v = container.get(fieldId);
-// assert v != null;
-// if (v.getClass() != clazz){
-// logger.warn(String.format(
-// "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
-// clazz.getCanonicalName(), v.getClass().getCanonicalName()));
-// return null;
-// }
-// return (T) v;
-// }
-
@Override
public int getRecordCount() { return valueCount; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 966ade7..1b30d78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -19,6 +19,7 @@
import io.netty.buffer.DrillBuf;
+import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -28,7 +29,6 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
* A specialized version of record batch that can moves out buffers and preps
@@ -52,7 +52,7 @@
}
public WritableBatch transfer(BufferAllocator allocator) {
- List<DrillBuf> newBuffers = Lists.newArrayList();
+ List<DrillBuf> newBuffers = new ArrayList<>();
for (DrillBuf buf : buffers) {
int writerIndex = buf.writerIndex();
DrillBuf newBuf = buf.transferOwnership(allocator).buffer;
@@ -135,7 +135,7 @@
}
public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
- List<ValueVector> vectors = Lists.newArrayList();
+ List<ValueVector> vectors = new ArrayList<>();
for (VectorWrapper<?> vw : vws) {
Preconditions.checkArgument(!vw.isHyper());
vectors.add(vw.getValueVector());
@@ -144,8 +144,8 @@
}
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
- List<DrillBuf> buffers = Lists.newArrayList();
- List<SerializedField> metadata = Lists.newArrayList();
+ List<DrillBuf> buffers = new ArrayList<>();
+ List<SerializedField> metadata = new ArrayList<>();
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
index 2934c34..0fd2862 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.rpc;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -29,13 +31,14 @@
import java.util.concurrent.TimeUnit;
/**
- * Helps to run a query and await on the results. All the inheriting sub-class manages the session/connection
- * state and submits query with respect to that state. The subclass instance lifetime is per query lifetime
- * and is not re-used.
+ * Helps to run a query and await on the results. All the inheriting sub-class
+ * manages the session/connection state and submits query with respect to that
+ * state. The subclass instance lifetime is per query lifetime and is not
+ * re-used.
*/
public abstract class AbstractDisposableUserClientConnection implements UserClientConnection {
- private static final org.slf4j.Logger logger =
- org.slf4j.LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class);
protected final CountDownLatch latch = new CountDownLatch(1);
@@ -72,7 +75,8 @@
final QueryId queryId = result.getQueryId();
if (logger.isDebugEnabled()) {
- logger.debug("Result arrived for QueryId: {} with QueryState: {}", QueryIdHelper.getQueryId(queryId), state);
+ logger.debug("Result arrived for QueryId: {} with QueryState: {}",
+ QueryIdHelper.getQueryId(queryId), state);
}
switch (state) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
index 1372b29..179cc7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
@@ -18,7 +18,8 @@
package org.apache.drill.exec.rpc;
import io.netty.channel.ChannelFuture;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.rpc.user.UserSession;
@@ -26,12 +27,14 @@
import java.net.SocketAddress;
/**
- * Interface for getting user session properties and interacting with user connection. Separating this interface from
- * {@link AbstractRemoteConnection} implementation for user connection:
+ * Interface for getting user session properties and interacting with user
+ * connection. Separating this interface from {@link AbstractRemoteConnection}
+ * implementation for user connection:
* <p><ul>
- * <li> Connection is passed to Foreman and Screen operators. Instead passing this interface exposes few details.
- * <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data
- * going to the actual client.
+ * <li>Connection is passed to Foreman and Screen operators. Instead passing
+ * this interface exposes few details.
+ * <li>Makes it easy to have wrappers around user connection which can be
+ * helpful to tap the messages and data going to the actual client.
* </ul>
*/
public interface UserClientConnection {
@@ -41,7 +44,7 @@
UserSession getSession();
/**
- * Send query result outcome to client. Outcome is returned through <code>listener</code>
+ * Send query result outcome to client. Outcome is returned through {@code listener}.
*
* @param listener
* @param result
@@ -49,12 +52,12 @@
void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result);
/**
- * Send query data to client. Outcome is returned through <code>listener</code>
+ * Send query data to client. Outcome is returned through {@code listener}.
*
* @param listener
* @param result
*/
- void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result);
+ void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data);
/**
* Returns the {@link ChannelFuture} which will be notified when this
@@ -66,4 +69,4 @@
* @return Return the client node address.
*/
SocketAddress getRemoteAddress();
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
index 461d8aa..25f1c68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
@@ -78,5 +78,4 @@
}
return count.get();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 99b0723..460dbb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -120,13 +120,13 @@
try {
if (isFailureResult) {
- // Failure case--pass on via submissionFailed(...).
+ // Failure case--pass on via submissionFailed(...).
resultsListener.submissionFailed(new UserRemoteException(queryResult.getError(0)));
// Note: Listener is removed in finally below.
} else if (isTerminalResult) {
- // A successful completion/canceled case--pass on via resultArrived
+ // A successful completion/canceled case--pass on via resultArrived
try {
resultsListener.queryCompleted(queryState);
} catch (Exception e) {
@@ -189,9 +189,9 @@
UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId);
logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener);
if (null == resultsListener) {
+
// WHO?? didn't get query ID response and set submission listener yet,
// so install a buffering listener for now
-
BufferingResultsListener bl = new BufferingResultsListener();
resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl);
// If we had a successful insertion, use that reference. Otherwise, just
@@ -272,8 +272,7 @@
}
@Override
- public void queryIdArrived(QueryId queryId) {
- }
+ public void queryIdArrived(QueryId queryId) { }
}
private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 1d864f2..190e9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -25,14 +25,21 @@
public interface UserResultsListener {
/**
- * QueryId is available. Called when a query is successfully submitted to the server.
- * @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK}
+ * QueryId is available. Called when a query is successfully submitted to the
+ * server.
+ *
+ * @param queryId
+ * sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK
+ * Acks.OK}
*/
void queryIdArrived(QueryId queryId);
/**
- * The query has failed. Most likely called when the server returns a FAILED query state. Can also be called if
- * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()} throws an exception
+ * The query has failed. Most likely called when the server returns a FAILED
+ * query state. Can also be called if
+ * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()}
+ * throws an exception
+ *
* @param ex exception describing the cause of the failure
*/
void submissionFailed(UserException ex);
@@ -45,10 +52,9 @@
void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);
/**
- * The query has completed (successsful completion or cancellation). The listener will not receive any other
- * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED)
- * @param state
+ * The query has completed (successful completion or cancellation). The
+ * listener will not receive any other data or result message. Called when the
+ * server returns a terminal-non failing- state (COMPLETED or CANCELLED)
*/
void queryCompleted(QueryState state);
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 276758e..cb1db13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -31,6 +31,7 @@
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
@@ -63,6 +64,7 @@
import org.apache.hadoop.security.HadoopKerberosName;
import org.joda.time.DateTime;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslException;
@@ -75,7 +77,7 @@
import java.util.concurrent.ConcurrentHashMap;
public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
+ private static final Logger logger = LoggerFactory.getLogger(UserServer.class);
private static final String SERVER_NAME = "Apache Drill Server";
private final UserConnectionConfig config;
@@ -84,7 +86,7 @@
private final UserWorker userWorker;
private static final ConcurrentHashMap<BitToUserConnection, BitToUserConnectionConfig> userConnectionMap;
- //Initializing the singleton map during startup
+ // Initialize the singleton map during startup
static {
userConnectionMap = new ConcurrentHashMap<>();
}
@@ -195,11 +197,13 @@
}
/**
- * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection
- * is used to get hold of {@link UserSession} which stores all session related information like session options
- * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a
- * UserSession. This connection object is also used to send query data and result back to the client submitted as part
- * of the session tied to this connection.
+ * Represents a client connection accepted by Foreman Drillbit's UserServer
+ * from a DrillClient. This connection is used to get hold of
+ * {@link UserSession} which stores all session related information like
+ * session options changed over the lifetime of this connection. There is a
+ * 1:1 mapping between a BitToUserConnection and a UserSession. This
+ * connection object is also used to send query data and result back to the
+ * client submitted as part of the session tied to this connection.
*/
public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection>
implements UserClientConnection {
@@ -240,7 +244,6 @@
* Sets the user on the session, and finalizes the session.
*
* @param userName user name to set on the session
- *
*/
void finalizeSession(String userName) {
// create a session
@@ -262,9 +265,10 @@
}
@Override
- public UserSession getSession(){
- return session;
- }
+ public UserSession getSession() { return session; }
+
+ @Override
+ protected Logger getLogger() { return logger; }
@Override
public void sendResult(final RpcOutcomeListener<Ack> listener, final QueryResult result) {
@@ -273,17 +277,13 @@
}
@Override
- public void sendData(final RpcOutcomeListener<Ack> listener, final QueryWritableBatch result) {
+ public void sendData(final RpcOutcomeListener<Ack> listener, final QueryDataPackage data) {
+ QueryWritableBatch result = data.toWritableBatch();
logger.trace("Sending data to client with {}", result);
send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, false, result.getBuffers());
}
@Override
- protected Logger getLogger() {
- return logger;
- }
-
- @Override
public ChannelFuture getChannelClosureFuture() {
return getChannel().closeFuture()
.addListener(new GenericFutureListener<Future<? super Void>>() {
@@ -504,10 +504,10 @@
* User Connection's config for System Table access
*/
public class BitToUserConnectionConfig {
- private DateTime established;
- private boolean isAuthEnabled;
- private boolean isEncryptionEnabled;
- private boolean isSSLEnabled;
+ private final DateTime established;
+ private final boolean isAuthEnabled;
+ private final boolean isEncryptionEnabled;
+ private final boolean isSSLEnabled;
public BitToUserConnectionConfig() {
established = new DateTime(); //Current Joda-based Time
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 0798dea..a7594f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -49,11 +49,13 @@
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class UserSession implements AutoCloseable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class);
+ private static final Logger logger = LoggerFactory.getLogger(UserSession.class);
- private boolean supportComplexTypes = false;
+ private boolean supportComplexTypes;
private UserCredentials credentials;
private DrillProperties properties;
private SessionOptionManager sessionOptions;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
new file mode 100644
index 0000000..42d93f2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import java.net.SocketAddress;
+
+import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import io.netty.channel.ChannelFuture;
+
+public abstract class BaseWebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
+
+ protected WebSessionResources webSessionResources;
+
+ public BaseWebUserConnection(WebSessionResources webSessionResources) {
+ this.webSessionResources = webSessionResources;
+ }
+
+ @Override
+ public UserSession getSession() {
+ return webSessionResources.getSession();
+ }
+
+ @Override
+ public ChannelFuture getChannelClosureFuture() {
+ return webSessionResources.getCloseFuture();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return webSessionResources.getRemoteAddress();
+ }
+
+ @Override
+ public void setAutoRead(boolean enableAutoRead) { }
+
+ public WebSessionResources resources() {
+ return webSessionResources;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 141c027..c345571 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -237,9 +237,7 @@
}
@Override
- public void dispose(WebUserConnection instance) {
-
- }
+ public void dispose(WebUserConnection instance) { }
}
public static class AnonWebUserConnectionProvider implements Factory<WebUserConnection> {
@@ -300,14 +298,14 @@
}
@Override
- public void dispose(WebUserConnection instance) {
-
- }
+ public void dispose(WebUserConnection instance) { }
/**
- * Creates session user principal. If impersonation is enabled without authentication and User-Name header is present and valid,
- * will create session user principal with provided user name, otherwise anonymous user name will be used.
- * In both cases session user principal will have admin rights.
+ * Creates session user principal. If impersonation is enabled without
+ * authentication and User-Name header is present and valid, will create
+ * session user principal with provided user name, otherwise anonymous user
+ * name will be used. In both cases session user principal will have admin
+ * rights.
*
* @param config drill config
* @param request client request
@@ -322,10 +320,12 @@
}
return new AnonDrillUserPrincipal();
}
-
}
- // Provider which injects DrillUserPrincipal directly instead of getting it from SecurityContext and typecasting
+ /**
+ * Provider which injects DrillUserPrincipal directly instead of getting it
+ * from SecurityContext and typecasting
+ */
public static class DrillUserPrincipalProvider implements Factory<DrillUserPrincipal> {
@Inject HttpServletRequest request;
@@ -336,9 +336,7 @@
}
@Override
- public void dispose(DrillUserPrincipal principal) {
- // No-Op
- }
+ public void dispose(DrillUserPrincipal principal) { }
}
// Provider which creates and cleanups DrillUserPrincipal for anonymous (auth disabled) mode
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
index 4aa2061..e29050a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
@@ -150,7 +150,7 @@
private File getFileByName(File folder, final String name) {
File[] files = folder.listFiles((dir, fileName) -> fileName.equals(name));
if (files.length == 0) {
- throw new DrillRuntimeException (name + " doesn't exist");
+ throw new DrillRuntimeException(name + " doesn't exist");
}
return files[0];
}
@@ -159,9 +159,9 @@
@XmlRootElement
public class Log implements Comparable<Log> {
- private String name;
- private long size;
- private DateTime lastModified;
+ private final String name;
+ private final long size;
+ private final DateTime lastModified;
@JsonCreator
public Log (@JsonProperty("name") String name, @JsonProperty("size") long size, @JsonProperty("lastModified") long lastModified) {
@@ -190,9 +190,9 @@
@XmlRootElement
public class LogContent {
- private String name;
- private Collection<String> lines;
- private int maxLines;
+ private final String name;
+ private final Collection<String> lines;
+ private final int maxLines;
@JsonCreator
public LogContent (@JsonProperty("name") String name, @JsonProperty("lines") Collection<String> lines, @JsonProperty("maxLines") int maxLines) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 94fcea9..be4c331 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -22,6 +22,7 @@
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.shaded.guava.com.google.common.base.CharMatcher;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.parquet.Strings;
@@ -98,7 +99,7 @@
public static final class RestQueryBuilder {
private String query;
- private String queryType = "SQL";
+ private String queryType = QueryType.SQL.name();
private int rowLimit;
private String userName;
private String defaultSchema;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
index e944386..69d81ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
@@ -62,7 +62,7 @@
this.options = webUserConnection.getSession().getOptions();
}
- public RestQueryRunner.QueryResult run() throws Exception {
+ public QueryResult run() throws Exception {
applyUserName();
applyOptions();
applyDefaultSchema();
@@ -131,7 +131,7 @@
return maxRows;
}
- public RestQueryRunner.QueryResult submitQuery(int maxRows) {
+ public QueryResult submitQuery(int maxRows) {
final RunQuery runQuery = RunQuery.newBuilder()
.setType(QueryType.valueOf(query.getQueryType()))
.setPlan(query.getQuery())
@@ -161,7 +161,7 @@
}
} while (!isComplete && !nearlyOutOfHeapSpace);
- //Fail if nearly out of heap space
+ // Fail if nearly out of heap space
if (nearlyOutOfHeapSpace) {
UserException almostOutOfHeapException = UserException.resourceError()
.message("There is not enough heap memory to run this query using the web interface. ")
@@ -185,7 +185,7 @@
return new QueryResult(queryId, webUserConnection, webUserConnection.results);
}
- //Detect possible excess heap
+ // Detect possible excess heap
private float getHeapUsage() {
return (float) memMXBean.getHeapMemoryUsage().getUsed() / memMXBean.getHeapMemoryUsage().getMax();
}
@@ -198,15 +198,16 @@
public final String queryState;
public final int attemptedAutoLimit;
- //DRILL-6847: Modified the constructor so that the method has access to all the properties in webUserConnection
+ // DRILL-6847: Modified the constructor so that the method has access
+ // to all the properties in webUserConnection
public QueryResult(QueryId queryId, WebUserConnection webUserConnection, List<Map<String, String>> rows) {
- this.queryId = QueryIdHelper.getQueryId(queryId);
- this.columns = webUserConnection.columns;
- this.metadata = webUserConnection.metadata;
- this.queryState = webUserConnection.getQueryState();
- this.rows = rows;
- this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount();
- }
+ this.queryId = QueryIdHelper.getQueryId(queryId);
+ this.columns = webUserConnection.columns;
+ this.metadata = webUserConnection.metadata;
+ this.queryState = webUserConnection.getQueryState();
+ this.rows = rows;
+ this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount();
+ }
public String getQueryId() {
return queryId;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 6819457..e00cb48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -262,12 +262,11 @@
responseHeadersSettingFilter.setInitParameters(ResponseHeadersSettingFilter.retrieveResponseHeaders(config));
servletContextHandler.addFilter(responseHeadersSettingFilter, "/*", EnumSet.of(DispatcherType.REQUEST));
-
return servletContextHandler;
}
/**
- * It creates A {@link SessionHandler} which contains a {@link HashSessionManager}
+ * Create a {@link SessionHandler} which contains a {@link HashSessionManager}
*
* @param securityHandler Set of init parameters that are used by the Authentication
* @return session handler
@@ -279,9 +278,7 @@
sessionManager.getSessionCookieConfig().setHttpOnly(true);
sessionManager.addEventListener(new HttpSessionListener() {
@Override
- public void sessionCreated(HttpSessionEvent se) {
-
- }
+ public void sessionCreated(HttpSessionEvent se) { }
@Override
public void sessionDestroyed(HttpSessionEvent se) {
@@ -338,8 +335,9 @@
}
/**
- * Create an HTTPS connector for given jetty server instance. If the admin has specified keystore/truststore settings
- * they will be used else a self-signed certificate is generated and used.
+ * Create an HTTPS connector for given jetty server instance. If the admin has
+ * specified keystore/truststore settings they will be used else a self-signed
+ * certificate is generated and used.
*
* @return Initialized {@link ServerConnector} for HTTPS connections.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
index 016278f..c06770e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -22,22 +22,23 @@
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.ChannelClosedException;
import org.apache.drill.exec.rpc.user.UserSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.SocketAddress;
/**
- * Class holding all the resources required for Web User Session. This class is responsible for the proper cleanup of
- * all the resources.
+ * Holds the resources required for Web User Session. This class is responsible
+ * for the proper cleanup of all the resources.
*/
public class WebSessionResources implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(WebSessionResources.class);
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResources.class);
-
- private BufferAllocator allocator;
+ private final BufferAllocator allocator;
private final SocketAddress remoteAddress;
- private UserSession webUserSession;
+ private final UserSession webUserSession;
private ChannelPromise closeFuture;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
index 0ca6abe..9427fc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -17,177 +17,128 @@
*/
package org.apache.drill.exec.server.rest;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.util.ValueVectorElementFormatter;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-import io.netty.channel.ChannelFuture;
-import org.apache.drill.common.exceptions.UserException;
+
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.vector.ValueVector.Accessor;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
-import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Set;
/**
- * WebUserConnectionWrapper which represents the UserClientConnection between WebServer and Foreman, for the WebUser
- * submitting the query. It provides access to the UserSession executing the query. There is no actual physical
+ * {@code WebUserConnectionWrapper} which represents the {@code UserClientConnection} between
+ * WebServer and Foreman, for the WebUser submitting the query. It provides
+ * access to the {@code UserSession} executing the query. There is no actual physical
* channel corresponding to this connection wrapper.
*
- * It returns a close future with no actual underlying {@link io.netty.channel.Channel} associated with it but do have an
- * EventExecutor out of BitServer EventLoopGroup. Since there is no actual connection established using this class,
- * hence the close event will never be fired by underlying layer and close future is set only when the
+ * It returns a close future with no actual underlying
+ * {@link io.netty.channel.Channel} associated with it but do have an
+ * {@code EventExecutor} out of BitServer EventLoopGroup. Since there is no actual
+ * connection established using this class, hence the close event will never be
+ * fired by underlying layer and close future is set only when the
* {@link WebSessionResources} are closed.
*/
-
-public class WebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
-
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebUserConnection.class);
-
- protected WebSessionResources webSessionResources;
+public class WebUserConnection extends BaseWebUserConnection {
public final List<Map<String, String>> results = Lists.newArrayList();
-
public final Set<String> columns = Sets.newLinkedHashSet();
-
public final List<String> metadata = new ArrayList<>();
-
private int autoLimitRowCount;
+ private int rowCount;
WebUserConnection(WebSessionResources webSessionResources) {
- this.webSessionResources = webSessionResources;
+ super(webSessionResources);
}
@Override
- public UserSession getSession() {
- return webSessionResources.getSession();
+ public void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data) {
+ processBatch(data.batch());
+ listener.success(Acks.OK, null);
}
- @Override
- public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
- // There can be overflow here but DrillBuf doesn't support allocating with
- // bytes in long. Hence we are just preserving the earlier behavior and logging debug log for the case.
- final int dataByteCount = (int) result.getByteCount();
-
- if (dataByteCount < 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("There is BufferOverflow in dataByteCount: {}",
- dataByteCount);
- }
- listener.success(Acks.OK, null);
+ private void processBatch(VectorContainer batch) {
+ if (batch == null) {
+ // Empty query: no data, no schema.
return;
}
- // Create a ByteBuf with all the data in it.
- final int rows = result.getHeader().getRowCount();
- final BufferAllocator allocator = webSessionResources.getAllocator();
- final DrillBuf bufferWithData = allocator.buffer(dataByteCount);
- try {
- final ByteBuf[] resultDataBuffers = result.getBuffers();
+ // Build metadata only on first batch, or if the schema changes
+ if (metadata.isEmpty() || batch.isSchemaChanged()) {
+ columns.clear();
+ metadata.clear();
+ buildMetadata(batch.getSchema());
+ }
+ addResults(batch.getRecordCount(), batch);
+ batch.zeroVectors();
+ }
- for (final ByteBuf buffer : resultDataBuffers) {
- bufferWithData.writeBytes(buffer);
- buffer.release();
- }
+ private void buildMetadata(BatchSchema schema) {
+ for (int i = 0; i < schema.getFieldCount(); ++i) {
+ // DRILL-6847: This section adds query metadata to the REST results
+ MaterializedField col = schema.getColumn(i);
+ columns.add(col.getName());
+ StringBuilder dataType = new StringBuilder(col.getType().getMinorType().name());
- final RecordBatchLoader loader = new RecordBatchLoader(allocator);
- try {
- loader.load(result.getHeader().getDef(), bufferWithData);
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
- for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
- //DRILL-6847: This section adds query metadata to the REST results
- MaterializedField col = loader.getSchema().getColumn(i);
- columns.add(col.getName());
- StringBuilder dataType = new StringBuilder(col.getType().getMinorType().name());
+ // For DECIMAL type
+ if (col.getType().hasPrecision()) {
+ dataType.append("(");
+ dataType.append(col.getType().getPrecision());
- //For DECIMAL type
- if (col.getType().hasPrecision()) {
- dataType.append("(");
- dataType.append(col.getType().getPrecision());
-
- if (col.getType().hasScale()) {
- dataType.append(", ");
- dataType.append(col.getType().getScale());
- }
-
- dataType.append(")");
- } else if (col.getType().hasWidth()) {
- //Case for VARCHAR columns with specified width
- dataType.append("(");
- dataType.append(col.getType().getWidth());
- dataType.append(")");
- }
- metadata.add(dataType.toString());
+ if (col.getType().hasScale()) {
+ dataType.append(", ");
+ dataType.append(col.getType().getScale());
}
- ValueVectorElementFormatter formatter = new ValueVectorElementFormatter(webSessionResources.getSession().getOptions());
- for (int i = 0; i < rows; ++i) {
- final Map<String, String> record = Maps.newHashMap();
- for (VectorWrapper<?> vw : loader) {
- final String field = vw.getValueVector().getMetadata().getNamePart().getName();
- final TypeProtos.MinorType fieldMinorType = vw.getValueVector().getMetadata().getMajorType().getMinorType();
- final Accessor accessor = vw.getValueVector().getAccessor();
- final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
- final String display = value == null ? null : formatter.format(value, fieldMinorType);
- record.put(field, display);
- }
- results.add(record);
- }
- } finally {
- loader.clear();
+
+ dataType.append(")");
+ } else if (col.getType().hasWidth()) {
+ // Case for VARCHAR columns with specified width
+ dataType.append("(");
+ dataType.append(col.getType().getWidth());
+ dataType.append(")");
}
- } catch (Exception e) {
- boolean verbose = webSessionResources.getSession().getOptions().getBoolean(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY);
- // Wrapping the exception into UserException and then into DrillPBError.
- // It will be thrown as exception in QueryWrapper class.
- // It's verbosity depends on system option "exec.errors.verbose".
- error = UserException.systemError(e).build(logger).getOrCreatePBError(verbose);
- } finally {
- // Notify the listener with ACK.OK both in error/success case because data was send successfully from Drillbit.
- bufferWithData.release();
- listener.success(Acks.OK, null);
+ metadata.add(dataType.toString());
}
}
- @Override
- public ChannelFuture getChannelClosureFuture() {
- return webSessionResources.getCloseFuture();
- }
-
- @Override
- public SocketAddress getRemoteAddress() {
- return webSessionResources.getRemoteAddress();
- }
-
- @Override
- public void setAutoRead(boolean enableAutoRead) {
- // no-op
+ private void addResults(int rows, VectorAccessible batch) {
+ ValueVectorElementFormatter formatter = new ValueVectorElementFormatter(webSessionResources.getSession().getOptions());
+ if (autoLimitRowCount > 0) {
+ rows = Math.max(0, Math.min(rows, autoLimitRowCount - rowCount));
+ }
+ for (int i = 0; i < rows; ++i) {
+ rowCount++;
+ final Map<String, String> record = Maps.newHashMap();
+ for (VectorWrapper<?> vw : batch) {
+ final String field = vw.getValueVector().getMetadata().getNamePart().getName();
+ final TypeProtos.MinorType fieldMinorType = vw.getValueVector().getMetadata().getMajorType().getMinorType();
+ final Accessor accessor = vw.getValueVector().getAccessor();
+ final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
+ final String display = value == null ? null : formatter.format(value, fieldMinorType);
+ record.put(field, display);
+ }
+ results.add(record);
+ }
}
/**
* For authenticated WebUser no cleanup of {@link WebSessionResources} is done since it's re-used
* for all the queries until lifetime of the web session.
*/
- public void cleanupSession() {
- // no-op
- }
+ public void cleanupSession() { }
public static class AnonWebUserConnection extends WebUserConnection {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index ac1b4cc..de49e28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,6 +21,8 @@
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -60,11 +62,11 @@
import java.util.concurrent.locks.ReentrantLock;
/**
- * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments
- * running elsewhere.
+ * Manages the running fragments in a Drillbit. Periodically requests run-time
+ * stats updates from fragments running elsewhere.
*/
public class WorkManager implements AutoCloseable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
private static final int EXIT_TIMEOUT_MS = 5000;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
index ef018f6..5de560a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -18,14 +18,14 @@
package org.apache.drill.exec.work.prepare;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
+
import io.netty.channel.ChannelFuture;
import org.apache.drill.common.exceptions.ErrorHelper;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
@@ -43,6 +43,7 @@
import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
@@ -53,6 +54,8 @@
import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
import org.apache.drill.exec.work.user.UserWorker;
import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.net.SocketAddress;
@@ -72,11 +75,11 @@
* Contains worker {@link Runnable} for creating a prepared statement and helper methods.
*/
public class PreparedStatementProvider {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
+ private static final Logger logger = LoggerFactory.getLogger(PreparedStatementProvider.class);
/**
- * Static list of mappings from {@link MinorType} to JDBC ResultSet class name (to be returned through
- * {@link ResultSetMetaData#getColumnClassName(int)}.
+ * Static list of mappings from {@link MinorType} to JDBC ResultSet class name
+ * (to be returned through {@link ResultSetMetaData#getColumnClassName(int)}.
*/
private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME = ImmutableMap.<MinorType, String>builder()
.put(MinorType.INT, Integer.class.getName())
@@ -186,9 +189,10 @@
}
/**
- * Helper method to create {@link DrillPBError} and set it in <code>respBuilder</code>
+ * Helper method to create {@link DrillPBError} and set it in {@code respBuilder}
*/
- private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
+ private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder,
+ final RequestStatus status,
final Throwable ex, final String message, final ErrorType errorType) {
respBuilder.setStatus(status);
final String errorId = UUID.randomUUID().toString();
@@ -211,7 +215,7 @@
}
/**
- * Helper method to log error and set given {@link DrillPBError} in <code>respBuilder</code>
+ * Helper method to log error and set given {@link DrillPBError} in {@code respBuilder}
*/
private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
final String message) {
@@ -250,14 +254,14 @@
}
@Override
- public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
+ public void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data) {
// Save the query results schema and release the buffers.
- if (fields == null) {
- fields = result.getHeader().getDef().getFieldList();
- }
-
- for (ByteBuf buf : result.getBuffers()) {
- buf.release();
+ VectorContainer batch = data.batch();
+ if (batch != null) {
+ if (fields == null) {
+ fields = data.fields();
+ }
+ batch.zeroVectors();
}
listener.success(Acks.OK, null);
@@ -287,16 +291,18 @@
builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME);
/**
- * Designated column's schema name. Empty string if not applicable. Initial implementation defaults to empty string
- * as we use LIMIT 0 queries to get the schema and schema info is lost. If we derive the schema from plan, we may
- * get the right value.
+ * Designated column's schema name. Empty string if not applicable. Initial
+ * implementation defaults to empty string as we use LIMIT 0 queries to get
+ * the schema and schema info is lost. If we derive the schema from plan, we
+ * may get the right value.
*/
builder.setSchemaName("");
/**
- * Designated column's table name. Not set if not applicable. Initial implementation defaults to empty string as
- * we use LIMIT 0 queries to get the schema and table info is lost. If we derive the table from plan, we may get
- * the right value.
+ * Designated column's table name. Not set if not applicable. Initial
+ * implementation defaults to empty string as we use LIMIT 0 queries to get
+ * the schema and table info is lost. If we derive the table from plan, we
+ * may get the right value.
*/
builder.setTableName("");
@@ -327,7 +333,8 @@
builder.setPrecision(Types.getPrecision(field.getMajorType()));
/**
- * Column's number of digits to right of the decimal point. 0 is returned for types where the scale is not applicable
+ * Column's number of digits to right of the decimal point. 0 is returned
+ * for types where the scale is not applicable
*/
builder.setScale(Types.getScale(majorType));
@@ -342,8 +349,8 @@
builder.setDisplaySize(Types.getJdbcDisplaySize(majorType));
/**
- * Is the column an aliased column. Initial implementation defaults to true as we derive schema from LIMIT 0 query and
- * not plan
+ * Is the column an aliased column. Initial implementation defaults to true
+ * as we derive schema from LIMIT 0 query and not plan
*/
builder.setIsAliased(true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 9c32b56..976820d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -41,9 +41,11 @@
import org.apache.drill.exec.work.metadata.MetadataProvider;
import org.apache.drill.exec.work.metadata.ServerMetaProvider.ServerMetaWorker;
import org.apache.drill.exec.work.prepare.PreparedStatementProvider.PreparedStatementWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class UserWorker{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
+ static final Logger logger = LoggerFactory.getLogger(UserWorker.class);
private final WorkerBee bee;
private final QueryCountIncrementer incrementer = new QueryCountIncrementer() {
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 3fc5f7c..9e90d06 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -42,8 +42,9 @@
import io.netty.util.internal.PlatformDependent;
/**
- * Drill data structure for accessing and manipulating data buffers. This class is integrated with the
- * Drill memory management layer for quota enforcement and buffer sharing.
+ * Drill data structure for accessing and manipulating data buffers. This class
+ * is integrated with the Drill memory management layer for quota enforcement
+ * and buffer sharing.
*/
@SuppressWarnings("unused")
public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@@ -121,18 +122,22 @@
}
/**
- * Create a new DrillBuf that is associated with an alternative allocator for the purposes of memory ownership and
- * accounting. This has no impact on the reference counting for the current DrillBuf except in the situation where the
- * passed in Allocator is the same as the current buffer.
+ * Create a new DrillBuf that is associated with an alternative allocator for
+ * the purposes of memory ownership and accounting. This has no impact on the
+ * reference counting for the current DrillBuf except in the situation where
+ * the passed in Allocator is the same as the current buffer.
*
- * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
- * reference count of 1 (in the case that this is the first time this memory is being associated with the new
- * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination
- * in the case that the provided allocator already had an association to this underlying memory.
+ * This operation has no impact on the reference count of this DrillBuf. The
+ * newly created DrillBuf with either have a reference count of 1 (in the case
+ * that this is the first time this memory is being associated with the new
+ * allocator) or the current value of the reference count + 1 for the other
+ * AllocationManager/BufferLedger combination in the case that the provided
+ * allocator already had an association to this underlying memory.
*
* @param target
* The target allocator to create an association with.
- * @return A new DrillBuf which shares the same underlying memory as this DrillBuf.
+ * @return A new DrillBuf which shares the same underlying memory as this
+ * DrillBuf.
*/
public DrillBuf retain(BufferAllocator target) {
@@ -148,28 +153,35 @@
}
/**
- * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf
- * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the
- * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the target allocator. If
- * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not
- * transfer any ownership to the newly created DrillBuf.
+ * Transfer the memory accounting ownership of this DrillBuf to another
+ * allocator. This will generate a new DrillBuf that carries an association
+ * with the underlying memory of this DrillBuf. If this DrillBuf is connected
+ * to the owning BufferLedger of this memory, that memory ownership/accounting
+ * will be transferred to the target allocator. If this DrillBuf does not
+ * currently own the memory underlying it (and is only associated with it),
+ * this does not transfer any ownership to the newly created DrillBuf.
* <p>
- * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
- * reference count of 1 (in the case that this is the first time this memory is being associated with the new
- * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in
- * the case that the provided allocator already had an association to this underlying memory.
+ * This operation has no impact on the reference count of this DrillBuf. The
+ * newly created DrillBuf with either have a reference count of 1 (in the case
+ * that this is the first time this memory is being associated with the new
+ * allocator) or the current value of the reference count for the other
+ * AllocationManager/BufferLedger combination in the case that the provided
+ * allocator already had an association to this underlying memory.
* <p>
- * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
- * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
- * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
- * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
- * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit
- * slightly sooner than is necessary.
+ * Transfers will always succeed, even if that puts the other allocator into
+ * an overlimit situation. This is possible due to the fact that the original
+ * owning allocator may have allocated this memory out of a local reservation
+ * whereas the target allocator may need to allocate new memory from a parent
+ * or RootAllocator. This operation is done in a mostly-lockless but
+ * consistent manner. As such, the overlimit==true situation could occur
+ * slightly prematurely to an actual overlimit==true condition. This is simply
+ * conservative behavior which means we may return overlimit slightly sooner
+ * than is necessary.
*
* @param target
* The allocator to transfer ownership to.
- * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly
- * created DrillBuf.
+ * @return A new transfer result with the impact of the transfer (whether it
+ * was overlimit) as well as the newly created DrillBuf.
*/
public TransferResult transferOwnership(BufferAllocator target) {
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
index bfce9a0..36550d0 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -43,11 +43,11 @@
* UDLE. Ensures that one allocator owns the memory that multiple allocators may
* be referencing. Manages a BufferLedger between each of its associated
* allocators. This class is also responsible for managing when memory is
- * allocated and returned to the Netty-based PooledByteBufAllocatorL.
- *
+ * allocated and returned to the Netty-based {code PooledByteBufAllocatorL}.
+ * <p>
* The only reason that this isn't package private is we're forced to put
* DrillBuf in Netty's package which need access to these objects or methods.
- *
+ * <p>
* Threading: AllocationManager manages thread-safety internally. Operations
* within the context of a single BufferLedger are lockless in nature and can be
* leveraged by multiple threads. Operations that cross the context of two
@@ -56,7 +56,6 @@
* allocation. As such, there will be thousands of these in a typical query. The
* contention of acquiring a lock on AllocationManager should be very low.
*/
-
public class AllocationManager {
private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
@@ -151,7 +150,6 @@
* AllocationManager that it now longer needs to hold a reference to
* particular piece of memory.
*/
-
private class ReleaseListener {
private final BufferAllocator allocator;
@@ -163,7 +161,6 @@
/**
* Can only be called when you already hold the writeLock.
*/
-
public void release() {
allocator.assertOpen();
@@ -200,7 +197,6 @@
* only reason this is public is due to DrillBuf being in io.netty.buffer
* package.
*/
-
public class BufferLedger {
private final IdentityHashMap<DrillBuf, Object> buffers =
@@ -223,8 +219,10 @@
}
/**
- * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
- * memory, no transfer is made to the new ledger.
+ * Transfer any balance the current ledger has to the target ledger. In the
+ * case that the current ledger holds no memory, no transfer is made to the
+ * new ledger.
+ *
* @param target
* The ledger to transfer ownership account to.
* @return Whether transfer fit within target ledgers limits.
@@ -241,8 +239,8 @@
return true;
}
- // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
- // that this won't happen by synchronizing on the allocator manager instance.
+ // since two balance transfers out from the allocator manager could cause incorrect accounting,
+ // we need to ensure that this won't happen by synchronizing on the allocator manager instance.
try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
if (owningLedger != this) {
return true;
@@ -316,7 +314,6 @@
* zero, this ledger should release its ownership back to the
* AllocationManager
*/
-
public int decrement(int decrement) {
allocator.assertOpen();
@@ -346,7 +343,6 @@
* @param allocator
* @return The ledger associated with a particular BufferAllocator.
*/
-
public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
return associate((BaseAllocator) allocator);
}
@@ -362,7 +358,6 @@
* @return A new DrillBuf that shares references with all DrillBufs
* associated with this BufferLedger
*/
-
public DrillBuf newDrillBuf(int offset, int length) {
allocator.assertOpen();
return newDrillBuf(offset, length, null);
@@ -411,7 +406,6 @@
*
* @return Size in bytes
*/
-
public int getSize() {
return size;
}
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 8514df0..9498cc8 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -20,20 +20,16 @@
import io.netty.buffer.ByteBuf;
public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
@Override
- public void failed(RpcException ex) {
- }
+ public void failed(RpcException ex) { }
@Override
- public void success(T value, ByteBuf buffer) {
- }
+ public void success(T value, ByteBuf buffer) { }
/**
* {@inheritDoc}
*/
@Override
- public void interrupted(final InterruptedException ex) {
- }
+ public void interrupted(final InterruptedException ex) { }
}
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
index 83380e2..cc2d9b9 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -23,6 +23,8 @@
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.procedures.IntObjectProcedure;
@@ -38,7 +40,7 @@
* else works via Atomic variables.
*/
class RequestIdMap {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestIdMap.class);
+ private static final Logger logger = LoggerFactory.getLogger(RequestIdMap.class);
private final AtomicInteger lastCoordinationId = new AtomicInteger();
private final AtomicBoolean isOpen = new AtomicBoolean(true);
@@ -72,9 +74,9 @@
@Override
public void apply(int key, RpcOutcome<?> value) {
- try{
+ try {
value.setException(exception);
- }catch (final Exception e){
+ } catch (final Exception e){
logger.warn("Failure while attempting to fail rpc response.", e);
}
}
@@ -158,6 +160,7 @@
return rpc;
}
+ @SuppressWarnings("unchecked")
public <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V> clazz) {
final RpcOutcome<?> rpc = removeFromMap(coordinationId);
@@ -172,16 +175,10 @@
clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
}
- @SuppressWarnings("unchecked")
- final
- RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
-
- // logger.debug("Returning casted future");
- return crpc;
+ return (RpcOutcome<V>) rpc;
}
public void recordRemoteFailure(int coordinationId, DrillPBError failure) {
- // logger.debug("Updating failed future.");
try {
final RpcOutcome<?> rpc = removeFromMap(coordinationId);
rpc.setException(new UserRemoteException(failure));
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index a1c125b..4afa159 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -27,13 +27,11 @@
*/
void failed(RpcException ex);
-
void success(V value, ByteBuf buffer);
/**
- * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query cancellations or
- * failures.
+ * Called when the sending thread is interrupted. Possible when the fragment
+ * is cancelled due to query cancellations or failures.
*/
void interrupted(final InterruptedException e);
-
}