DRILL-7929: Update Kafka unit tests to use ClusterTest
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index 4bfd5d4..757f6e3 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -59,8 +58,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
 
     runKafkaSQLVerifyCount(queryString, expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -75,8 +77,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
 
     runKafkaSQLVerifyCount(queryString, expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -91,8 +96,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -108,8 +116,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowInPlan)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowInPlan))
+        .match();
   }
 
   /**
@@ -124,8 +135,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -140,8 +154,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -157,8 +174,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -174,48 +194,66 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"equal" such that value < startOffset
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"greater_than" such that value = endOffset-1
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"greater_than_or_equal" such that value = endOffset
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"less_than" such that value = startOffset
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"less_than_or_equal" such that value < startOffset
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -231,24 +269,33 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
 
     runKafkaSQLVerifyCount(queryString, expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"greater_than" such that value = endOffset-2
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
 
     //"greater_than_or_equal" such that value = endOffset-1
     queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -265,8 +312,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -283,8 +333,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowInPlan)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowInPlan))
+        .match();
   }
 
   /**
@@ -301,8 +354,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+        .match();
   }
 
   /**
@@ -321,8 +377,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan))
+        .match();
   }
 
   /**
@@ -340,8 +399,11 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan))
+        .match();
   }
 
   /**
@@ -360,7 +422,10 @@
         TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
 
     runKafkaSQLVerifyCount(queryString,expectedRowCount);
-    PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
-        new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
+    queryBuilder()
+        .sql(queryString)
+        .jsonPlanMatcher()
+        .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan))
+        .match();
   }
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 62d1b66..cc7649f 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -58,7 +58,7 @@
   }
 
   @Test
-  public void testResultCount() throws Exception {
+  public void testResultCount() {
     String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
     runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
   }
@@ -94,7 +94,7 @@
   @Test
   public void testInformationSchema() throws Exception {
     String query = "select * from information_schema.`views`";
-    runSQL(query);
+    queryBuilder().sql(query).run();
   }
 
   private Map<TopicPartition, Long> fetchOffsets(int flag) {
@@ -136,7 +136,8 @@
   @Test
   public void testPhysicalPlanSubmission() throws Exception {
     String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
-    testPhysicalPlanExecutionBasedOnQuery(query);
+    String plan = queryBuilder().sql(query).explainJson();
+    queryBuilder().physical(plan).run();
   }
 
   @Test
@@ -162,15 +163,15 @@
       KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
       generator.populateMessages(topicName, "Test");
 
-      alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
+      client.alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
       try {
-        test("select * from kafka.`%s`", topicName);
+        queryBuilder().sql("select * from kafka.`%s`", topicName).run();
         fail();
       } catch (UserException e) {
         // expected
       }
 
-      alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true);
+      client.alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true);
       testBuilder()
         .sqlQuery("select * from kafka.`%s`", topicName)
         .expectsEmptyResultSet();
@@ -185,7 +186,7 @@
         .baselineValues(2L)
         .go();
     } finally {
-      resetSessionOption(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
+      client.resetSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
     }
   }
 
@@ -197,15 +198,15 @@
       KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
       generator.populateMessages(topicName, "{\"nan_col\":NaN, \"inf_col\":Infinity}");
 
-      alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false);
+      client.alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false);
       try {
-        test("select nan_col, inf_col from kafka.`%s`", topicName);
+        queryBuilder().sql("select nan_col, inf_col from kafka.`%s`", topicName).run();
         fail();
       } catch (UserException e) {
         // expected
       }
 
-      alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true);
+      client.alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true);
       testBuilder()
         .sqlQuery("select nan_col, inf_col from kafka.`%s`", topicName)
         .unOrdered()
@@ -213,7 +214,7 @@
         .baselineValues(Double.NaN, Double.POSITIVE_INFINITY)
         .go();
     } finally {
-      resetSessionOption(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
+      client.resetSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
     }
   }
 
@@ -225,15 +226,15 @@
       KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
       generator.populateMessages(topicName, "{\"name\": \"AB\\\"\\C\"}");
 
-      alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false);
+      client.alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false);
       try {
-        test("select name from kafka.`%s`", topicName);
+        queryBuilder().sql("select name from kafka.`%s`", topicName).run();
         fail();
       } catch (UserException e) {
         // expected
       }
 
-      alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true);
+      client.alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true);
       testBuilder()
         .sqlQuery("select name from kafka.`%s`", topicName)
         .unOrdered()
@@ -241,7 +242,7 @@
         .baselineValues("AB\"C")
         .go();
     } finally {
-      resetSessionOption(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
+      client.resetSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
     }
   }
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index f3c24e6..56e8138 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -17,15 +17,15 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.drill.exec.store.kafka.decoders.JsonMessageReader;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -34,47 +34,42 @@
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
-public class KafkaTestBase extends PlanTestBase {
+public class KafkaTestBase extends ClusterTest {
   protected static KafkaStoragePluginConfig storagePluginConfig;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // Make sure this test is only running as part of the suit
     Assume.assumeTrue(TestKafkaSuit.isRunningSuite());
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
     TestKafkaSuit.initKafka();
     initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster);
   }
 
   public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
-    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
     Map<String, String> kafkaConsumerProps = Maps.newHashMap();
     kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
     kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "drill-test-consumer");
     storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
     storagePluginConfig.setEnabled(true);
     pluginRegistry.put(KafkaStoragePluginConfig.NAME, storagePluginConfig);
-    testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER,
-        "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
-    testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 5000));
+    client.alterSession(ExecConstants.KAFKA_RECORD_READER, JsonMessageReader.class.getName());
+    client.alterSession(ExecConstants.KAFKA_POLL_TIMEOUT, 5000);
   }
 
-  public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
-    return testSqlWithResults(sql);
-  }
-
-  public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
-    List<QueryDataBatch> results = runKafkaSQLWithResults(sql);
-    logResultAndVerifyRowCount(results, expectedRowCount);
-  }
-
-  public void logResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount)
-      throws SchemaChangeException {
-    int rowCount = logResult(results);
+  public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) {
+    long rowCount = queryBuilder().sql(sql).log();
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);
     }
   }
 
+  public static long testSql(String sql) {
+    return client.queryBuilder().sql(sql).log();
+  }
+
   @AfterClass
   public static void tearDownKafkaTestBase() {
     if (TestKafkaSuit.isRunningSuite()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 0be54f3..fb1fe15 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -672,6 +672,18 @@
     return new PlanMatcher(plan);
   }
 
+  /**
+   * Submits explain plan statement
+   * and creates plan matcher instance based on return query plan.
+   *
+   * @return plan matcher
+   * @throws Exception if the query fails
+   */
+  public PlanMatcher jsonPlanMatcher() throws Exception {
+    String plan = explainJson();
+    return new PlanMatcher(plan);
+  }
+
   private QuerySummary produceSummary(BufferingQueryEventListener listener) throws Exception {
     long start = System.currentTimeMillis();
     int recordCount = 0;