DRILL-8235: Add Storage Plugin for Google Sheets (#2585)

* Getting there...

* Added Limit unit test

* Code fixes

* SerDe and Agg unit tests now passing

* Fixed type conversion bug

* Various fixes

* Fix LGTM errors

* Fix LGTM, Add unit tests

* Fixed checkstyle

* Revised unit tests

* Addressed Doc Updates

* Fix unit test

* Moved Typifier to common

* Updated comments

* Removed duplicate OAuthToken class

* Consolidated column writers

* Fixed writer bug

* Added note to writer test

* Removed case insensitive config

* Remove unused import

* Updated Readme for Windows
diff --git a/.gitignore b/.gitignore
index fe5a3c0..76a5687 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,4 @@
 .*.html
 venv/
 tools/venv/
+
diff --git a/common/src/main/java/org/apache/drill/common/Typifier.java b/common/src/main/java/org/apache/drill/common/Typifier.java
new file mode 100644
index 0000000..1c4b570
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/Typifier.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.common;
+
+import java.nio.CharBuffer;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeParseException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.AbstractMap.SimpleEntry;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * This class attempts to infer the Java data type of a value supplied as a String. It is
+ * somewhat configurable.  This was sourced from <a href="https://gist.github.com/awwsmm/56b8164410c89c719ebfca7b3d85870b">this code on github</a>.
+ */
+public class Typifier {
+
+  private static final Locale defaultLocale = new Locale("en");
+
+  private static final HashSet<DateTimeFormatter> formats = new HashSet<>(
+    Arrays.asList(
+      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", defaultLocale),
+      DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SS", defaultLocale),
+      DateTimeFormatter.ofPattern("MM/dd/yyyy hh:mm:ss a", defaultLocale),
+      DateTimeFormatter.ofPattern("M/d/yy H:mm", defaultLocale),
+      DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss", defaultLocale)));
+
+  private static final HashSet<DateTimeFormatter> dateFormats = new HashSet<>(
+    Arrays.asList(
+      DateTimeFormatter.ofPattern("yyyy-MM-dd", defaultLocale),
+      DateTimeFormatter.ofPattern("MM/dd/yyyy", defaultLocale),
+      DateTimeFormatter.ofPattern("M/d/yy", defaultLocale),
+      DateTimeFormatter.ofPattern("dd/MM/yyyy", defaultLocale),
+      DateTimeFormatter.ofPattern("yyyy/MM/dd", defaultLocale),
+      DateTimeFormatter.ofPattern("M d, yyyy", defaultLocale)
+    ));
+
+  // Only Strings contain these characters -- skip all numeric processing
+  // arranged roughly by frequency in ~130MB of sample DASGIP files:
+  //   $ awk -vFS="" '{for(i=1;i<=NF;i++)w[$i]++}END{for(i in w) print i,w[i]}' file.txt
+  private static final char[] StringCharacters = new
+    char[] {' ', ':', 'n', 'a', 't', 'r', 'o', 'C', 'i', 'P', 'D', 's', 'c', 'S', 'u', 'A', 'm', '=', 'O', '\\', 'd', 'p', 'T', 'M', 'g', 'I', 'b', 'U', 'h', 'H'};
+
+  // Typify looks for the above characters in an input String before it makes
+  // any attempt at parsing that String. If it finds any of the above characters,
+  // it immediately skips to the String-processing section, because no numerical
+  // type can contain those characters.
+
+  // Adding more characters means that there are more characters to look for in
+  // the input String every time a piece of data is parsed, but it also reduces
+  // the likelihood that an Exception will be thrown when String data is attempted
+  // to be parsed as numerical data (which saves time).
+
+  // The characters below can also be added to the list, but the list above
+  // seems to be near-optimal
+
+  //  'J', '+', 'V', 'B', 'G',   'R', 'y', '(', ')', 'v',   '_', ',', '[', ']', '/',
+  //  'N', 'k', 'w', '}', '{',   'X', '%', '>', 'x', '\'',  'W', '<', 'K', 'Q', 'q',
+  //  'z', 'Y', 'j', 'Z', '!',   '#', '$', '&', '*', ',',   ';', '?', '@', '^', '`',
+  //  '|', '~'};
+
+  private static final String[] falseAliases = new String[]{"false", "False", "FALSE"};
+
+  private static final String[] trueAliases = new String[]{"true", "True", "TRUE"};
+
+  // If a String contains any of these, try to evaluate it as an equation
+  private static final char[] MathCharacters = new char[]{'+', '-', '/', '*', '='};
+
+  // default is:
+  //   > don't interpret "0" and "1" as true and false
+  //   > restrict interpretation to common types
+  //   > don't allow f/F/l/L postfixes for float/long numbers
+  //   > attempt to parse dates
+
+  /**
+   * Attempts to classify String input as double, int, char, etc.
+   * Common types are: boolean, double, string, timestamp.
+   * This is the default entry point for the simplest use case.  The defaults are not
+   * to interpret 0,1 as true/false, to restrict interpretation to common types,
+   * not to allow f/F/l/L postfixes for floats and longs, and to attempt to parse
+   * dates.
+   * @param data Input string of data
+   * @return An Entry of the Class and the original value
+   */
+  public static Entry<Class, String> typify(String data) {
+    return typify(data, false, true, true, true);
+  }
+
+  /**
+   * Attempts to determine the best data type for an unknown bit of text.  This
+   * overload allows you to configure a few aspects of the type inference:
+   * @param data The unknown data string
+   * @param bool01 True, if you want 0/1 to be marked as boolean, false if not
+   * @param commonTypes Limit typifier to boolean, double, string, timestamp
+   * @param postfixFL Allow typifier to consider f/F/l/L postfixes for float and longs
+   * @param parseDates Attempt to parse timestamps
+   * @return An {@link Entry} consisting of the object class and the original value as a String
+   */
+  public static Entry<Class, String> typify(String data,
+                                            boolean bool01,
+                                            boolean commonTypes,
+                                            boolean postfixFL,
+                                            boolean parseDates) {
+
+    // -2. if the input data has 0 length, return as null object
+    if (data == null || data.length() == 0) {
+      return new SimpleEntry<>(Object.class, null);
+    }
+
+    String s = data.trim();
+    int slen = s.length();
+
+    // -1. if the input data is only whitespace, return "String" and input as-is
+    if (slen == 0) {
+      return new SimpleEntry<>(String.class, data);
+    }
+
+    // In most data, numerical values are more common than true/false values. So,
+    // if we want to speed up data parsing, we can move this block to the end when
+    // looking only for common types.
+
+    // Check if the data is Boolean (true or false); when commonTypes is set, this check is deferred until after numeric parsing
+    if (!commonTypes) {
+      if (contains(falseAliases, s)) {
+        return new SimpleEntry<>(Boolean.class, "false");
+      } else if (contains(trueAliases, s)) {
+        return new SimpleEntry<>(Boolean.class, "true");
+      }
+    }
+
+    // Check for any String-only characters; if we find them, don't bother trying to parse this as a number
+    if (!containsAny(s, StringCharacters)) {
+
+      // try again for boolean -- need to make sure it's not parsed as Byte
+      if (bool01) {
+        if (s.equals("0")) {
+          return new SimpleEntry<>(Boolean.class, "false");
+        } else if (s.equals("1")) {
+          return new SimpleEntry<>(Boolean.class, "true");
+        }
+      }
+
+      char lastChar = s.charAt(slen - 1);
+      boolean lastCharF = (lastChar == 'f' || lastChar == 'F');
+      boolean lastCharL = (lastChar == 'l' || lastChar == 'L');
+
+      // If we're not restricted to common types, look for anything
+      if (!commonTypes) {
+        // 1. Check if data is a Byte (1-byte integer with range [-(2^7) = -128, ((2^7)-1) = 127])
+        try {
+          byte b = Byte.parseByte(s);
+          return new SimpleEntry<>(Byte.class, Byte.toString(b));
+        } catch (NumberFormatException ex) {
+          // Okay, guess it's not a Byte
+        }
+
+        // 2. Check if data is a Short (2-byte integer with range [-(2^15) = -32768, ((2^15)-1) = 32767])
+        try {
+          short h = Short.parseShort(s);
+          return new SimpleEntry<>(Short.class, Short.toString(h));
+        } catch (NumberFormatException ex) {
+          // Okay, guess it's not a Short
+        }
+
+        // 3. Check if data is an Integer (4-byte integer with range [-(2^31), (2^31)-1])
+        try {
+          int i = Integer.parseInt(s);
+          return new SimpleEntry<>(Integer.class, Integer.toString(i));
+        } catch (NumberFormatException ex) {
+          // okay, guess it's not an Integer
+        }
+        String s_L_trimmed = s;
+
+        // 4. Check if data is a Long (8-byte integer with range [-(2^63), (2^63)-1])
+        //    ...first, see if the last character of the string is "L" or "l"
+        //    ... Java parses "3.3F", etc. fine as a float, but throws an error with "3L", etc.
+        if (postfixFL && slen > 1 && lastCharL) {
+          s_L_trimmed = s.substring(0, slen - 1);
+        }
+
+        try {
+          long l = Long.parseLong(s_L_trimmed);
+          return new SimpleEntry<>(Long.class, Long.toString(l));
+        } catch (NumberFormatException ex) {
+          // okay, guess it's not a Long
+        }
+
+        // 5. Check if data is a Float (32-bit IEEE 754 floating point with approximate extents +/- 3.4028235e38)
+        if (postfixFL || !lastCharF) {
+          try {
+            float f = Float.parseFloat(s);
+            // If it's beyond the range of Float, maybe it's not beyond the range of Double
+            if (!Float.isInfinite(f)) {
+              return new SimpleEntry<>(Float.class, Float.toString(f));
+            }
+          } catch (NumberFormatException ex) {
+            // okay, guess it's not a Float
+          }
+        }
+      }
+
+      // 6. Check if data is a Double (64-bit IEEE 754 floating point with approximate extents +/- 1.797693134862315e308 )
+      if (postfixFL || !lastCharF) {
+        try {
+          double d = Double.parseDouble(s);
+          if (!Double.isInfinite(d)) {
+            return new SimpleEntry<>(Double.class, Double.toString(d));
+          } else {
+            return new SimpleEntry<>(String.class, s);
+          }
+        } catch (NumberFormatException ex) {
+          // okay, guess it's not a Double
+        }
+      }
+    }
+
+    // Check for either Boolean or String
+    if (commonTypes) {
+      if (contains(falseAliases, s)) {
+        return new SimpleEntry<>(Boolean.class, "false");
+      } else if (contains(trueAliases, s)) {
+        return new SimpleEntry<>(Boolean.class, "true");
+      }
+    }
+
+    // 7. revert to String by default, with caveats...
+
+    // 7a. if string has length 1, it is a single character
+    if (!commonTypes && slen == 1) {
+      return new SimpleEntry<>(Character.class, s); // end uncommon types 2/2
+    }
+
+    // 7b. attempt to parse String as a LocalDateTime
+    if (parseDates && stringAsDateTime(s) != null) {
+      return new SimpleEntry<>(LocalDateTime.class, s);
+    }
+
+    // 7c. Attempt to parse the String as a LocalDate
+    if (parseDates && stringAsDate(s) != null) {
+      return new SimpleEntry<>(LocalDate.class, s);
+    }
+
+    // ...if we've made it all the way to here without returning, give up and return "String" and input as-is
+    return new SimpleEntry<>(String.class, data);
+  }
+
+  /**
+   * Helper function that attempts to parse a String as a LocalDateTime.  If the
+   * string cannot be parsed, return null.
+   * @param date Input date string
+   * @return LocalDateTime representation of the input String.
+   */
+  private static LocalDateTime stringAsDateTime(String date) {
+    for (DateTimeFormatter format : formats) {
+      try {
+        return LocalDateTime.parse(date, format);
+      } catch (DateTimeParseException ex) {
+        // can't parse it as this format, but maybe the next one...?
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Helper function that attempts to parse a String as a LocalDate.  If the
+   * string cannot be parsed, return null.
+   * @param date Input date string
+   * @return LocalDate representation of the input String, or null if it cannot be parsed.
+   */
+  private static LocalDate stringAsDate(String date) {
+    for (DateTimeFormatter format : dateFormats) {
+      try {
+        return LocalDate.parse(date, format);
+      } catch (DateTimeParseException ex) {
+        // can't parse it as this format, but maybe the next one...?
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns true if any of the source characters are found in the target.
+   * @param target The target character sequence AKA the haystack.
+   * @param source The source characters, AKA the needle
+   * @return True if the needle is in the haystack, false if not
+   */
+  public static boolean containsAny(CharSequence target, CharSequence source) {
+    if (target == null || target.length() == 0 || source == null || source.length() == 0) {
+      return false;
+    }
+
+    for (int aa = 0; aa < target.length(); ++aa) {
+      for (int bb = 0; bb < source.length(); ++bb) {
+        if (source.charAt(bb) == target.charAt(aa)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public static boolean containsAny(CharSequence target, char[] source) {
+    return containsAny(target, CharBuffer.wrap(source));
+  }
+
+  /**
+   * Checks if a target array contains the source term.
+   * @param target The target array
+   * @param source The source term
+   * @param <T> Unknown class
+   * @return True if the target array contains the source term, false if not
+   */
+  public static <T> boolean contains(T[] target, T source) {
+    if (source == null) {
+      return false;
+    }
+    for (T t : target) {
+      if (t != null && t.equals(source)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java b/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java
index 131ea48..15da1da 100644
--- a/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java
+++ b/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java
@@ -105,7 +105,7 @@
                     "          `request_user-agent_device__name`,                 " +
                     "          `request_user-agent_agent__name__version__major`   " +
                     "FROM       table(                                            " +
-                    "             cp.`httpd/typeremap.log`                        " +
+                    "             cp.`httpd/typeremap.httpd`                        " +
                     "                 (                                           " +
                     "                   type => 'httpd',                          " +
                     "                   logFormat => 'common\ncombined\n%h %l %u %t \"%r\" %>s %b %{RequestId}o\n',\n" +
@@ -140,7 +140,7 @@
             "          `request_user-agent_device__name`,                 " +
             "          `request_user-agent_agent__name__version__major`   " +
             "FROM       table(                                            " +
-            "             cp.`httpd/typeremap.log`                        " +
+            "             cp.`httpd/typeremap.httpd`                        " +
             "                 (                                           " +
             "                   type => 'httpd',                          " +
             "                   logFormat => 'common\ncombined\n%h %l %u %t \"%r\" %>s %b %{RequestId}o\n',\n" +
@@ -182,7 +182,7 @@
             "        , `response_header_requestid_epoch`                                      \n" +
 //            "        , *                                                                     \n"+
             "FROM       table(                                                                \n" +
-            "             cp.`httpd/typeremap.log`                                            \n" +
+            "             cp.`httpd/typeremap.httpd`                                            \n" +
             "                 (                                                               \n" +
             "                   type => 'httpd',                                              \n" +
             //                  LogFormat: Mind the leading and trailing spaces! Empty lines are ignored
diff --git a/contrib/format-httpd/src/test/resources/httpd/typeremap.log b/contrib/format-httpd/src/test/resources/httpd/typeremap.httpd
similarity index 100%
rename from contrib/format-httpd/src/test/resources/httpd/typeremap.log
rename to contrib/format-httpd/src/test/resources/httpd/typeremap.httpd
diff --git a/contrib/native/client/patches/zookeeper-3.4.6-x64.patch b/contrib/native/client/patches/zookeeper-3.4.6-x64.patch
deleted file mode 100644
index 96f2d10..0000000
--- a/contrib/native/client/patches/zookeeper-3.4.6-x64.patch
+++ /dev/null
@@ -1,163 +0,0 @@
-From 64697ddd8a90f29d1693658f04e975e435e3c869 Mon Sep 17 00:00:00 2001
-From: unknown <norrisl@NorrisL.simba.ad>
-Date: Thu, 5 Jun 2014 16:40:48 -0700
-Subject: [PATCH] Allow zookeeper to build in x64
-
----
- src/c/include/winstdint.h |  4 ++++
- src/c/src/mt_adaptor.c    | 54 +++++++++++++++++++++++------------------------
- 2 files changed, 30 insertions(+), 28 deletions(-)
-
-diff --git a/src/c/include/winstdint.h b/src/c/include/winstdint.h
-index d02608a..df405f7 100644
---- a/src/c/include/winstdint.h
-+++ b/src/c/include/winstdint.h
-@@ -40,6 +40,9 @@
- #pragma once
- #endif
- 
-+#if (_MSC_VER > 1500) // Visual Studio 2010 and Beyond
-+#include <stdint.h>
-+#else 
- #include <limits.h>
- 
- // For Visual Studio 6 in C++ mode and for many Visual Studio versions when
-@@ -244,4 +247,5 @@ typedef uint64_t  uintmax_t;
- #endif // __STDC_CONSTANT_MACROS ]
- 
- 
-+#endif
- #endif // _MSC_STDINT_H_ ]
-diff --git a/src/c/src/mt_adaptor.c b/src/c/src/mt_adaptor.c
-index 974063f..5ce0fd9 100644
---- a/src/c/src/mt_adaptor.c
-+++ b/src/c/src/mt_adaptor.c
-@@ -114,7 +114,7 @@ int process_async(int outstanding_sync)
- unsigned __stdcall do_io( void * );
- unsigned __stdcall do_completion( void * );
- 
--int handle_error(SOCKET sock, char* message)
-+int handle_error(zhandle_t* zh, SOCKET sock, char* message)
- {
-        LOG_ERROR(("%s. %d",message, WSAGetLastError()));
-        closesocket (sock);
-@@ -122,7 +122,7 @@ int handle_error(SOCKET sock, char* message)
- }
- 
- //--create socket pair for interupting selects.
--int create_socket_pair(SOCKET fds[2]) 
-+int create_socket_pair(zhandle_t* zh, SOCKET fds[2]) 
- { 
-     struct sockaddr_in inaddr; 
-     struct sockaddr addr; 
-@@ -141,23 +141,23 @@ int create_socket_pair(SOCKET fds[2])
-     inaddr.sin_port = 0; //--system assigns the port
- 
-     if ( setsockopt(lst,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(yes)) == SOCKET_ERROR  ) {
--       return handle_error(lst,"Error trying to set socket option.");          
-+       return handle_error(zh, lst,"Error trying to set socket option.");          
-     }  
-     if (bind(lst,(struct sockaddr *)&inaddr,sizeof(inaddr)) == SOCKET_ERROR){
--       return handle_error(lst,"Error trying to bind socket.");                
-+       return handle_error(zh, lst,"Error trying to bind socket.");                
-     }
-     if (listen(lst,1) == SOCKET_ERROR){
--       return handle_error(lst,"Error trying to listen on socket.");
-+       return handle_error(zh, lst,"Error trying to listen on socket.");
-     }
-     len=sizeof(inaddr); 
-     getsockname(lst, &addr,&len); 
-     fds[0]=socket(AF_INET, SOCK_STREAM,0); 
-     if (connect(fds[0],&addr,len) == SOCKET_ERROR){
--       return handle_error(lst, "Error while connecting to socket.");
-+       return handle_error(zh, lst, "Error while connecting to socket.");
-     }
-     if ((fds[1]=accept(lst,0,0)) == INVALID_SOCKET){
-        closesocket(fds[0]);
--       return handle_error(lst, "Error while accepting socket connection.");
-+       return handle_error(zh, lst, "Error while accepting socket connection.");
-     }
-     closesocket(lst);  
-     return 0;
-@@ -238,11 +238,11 @@ int adaptor_init(zhandle_t *zh)
- 
-     /* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
- #ifdef WIN32   
--    if (create_socket_pair(adaptor_threads->self_pipe) == -1){
-+    if (create_socket_pair(zh, adaptor_threads->self_pipe) == -1){
-        LOG_ERROR(("Can't make a socket."));
- #else
-     if(pipe(adaptor_threads->self_pipe)==-1) {
--        LOG_ERROR(("Can't make a pipe %d",errno));
-+        LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
- #endif
-         free(adaptor_threads);
-         return -1;
-@@ -255,6 +255,7 @@ int adaptor_init(zhandle_t *zh)
-     zh->adaptor_priv = adaptor_threads;
-     pthread_mutex_init(&zh->to_process.lock,0);
-     pthread_mutex_init(&adaptor_threads->zh_lock,0);
-+    pthread_mutex_init(&adaptor_threads->zh_lock,0);
-     // to_send must be recursive mutex    
-     pthread_mutexattr_init(&recursive_mx_attr);
-     pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
-@@ -364,7 +365,7 @@ void *do_io(void *v)
- 
-     api_prolog(zh);
-     notify_thread_ready(zh);
--    LOG_DEBUG(("started IO thread"));
-+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
-     fds[0].fd=adaptor_threads->self_pipe[0];
-     fds[0].events=POLLIN;
-     while(!zh->close_requested) {
-@@ -483,25 +484,9 @@ int32_t inc_ref_counter(zhandle_t* zh,int i)
- int32_t fetch_and_add(volatile int32_t* operand, int incr)
- {
- #ifndef WIN32
--    int32_t result;
--    asm __volatile__(
--         "lock xaddl %0,%1\n"
--         : "=r"(result), "=m"(*(int *)operand)
--         : "0"(incr)
--         : "memory");
--   return result;
-+    return __sync_fetch_and_add(operand, incr);
- #else
--    volatile int32_t result;
--    _asm
--    {
--        mov eax, operand; //eax = v;
--       mov ebx, incr; // ebx = i;
--        mov ecx, 0x0; // ecx = 0;
--        lock xadd dword ptr [eax], ecx; 
--       lock xadd dword ptr [eax], ebx; 
--        mov result, ecx; // result = ebx;        
--     }
--     return result;    
-+    return InterlockedExchangeAdd(operand, incr);
- #endif
- }
- 
-@@ -515,6 +500,19 @@ __attribute__((constructor)) int32_t get_xid()
-     return fetch_and_add(&xid,1);
- }
- 
-+void lock_reconfig(struct _zhandle *zh)
-+{
-+    struct adaptor_threads *adaptor = zh->adaptor_priv;
-+    if(adaptor)
-+        pthread_mutex_lock(&adaptor->zh_lock);
-+}
-+void unlock_reconfig(struct _zhandle *zh)
-+{
-+    struct adaptor_threads *adaptor = zh->adaptor_priv;
-+    if(adaptor)
-+        pthread_mutex_lock(&adaptor->zh_lock);
-+}
-+
- void enter_critical(zhandle_t* zh)
- {
-     struct adaptor_threads *adaptor = zh->adaptor_priv;
--- 
-1.9.2.msysgit.0
-
diff --git a/contrib/native/client/readme.win.txt b/contrib/native/client/readme.win.txt
index c4ed704..01f118d 100644
--- a/contrib/native/client/readme.win.txt
+++ b/contrib/native/client/readme.win.txt
@@ -129,12 +129,12 @@
     d) Build the protobuf project first (not the solution)
     e) Build the solution!
 
-2.3 Zookeeper (3.4.6) 
+2.3 Zookeeper (3.5.7)
     a) Set the ZOOKEEPER_HOME environment variable
-    b) The 3.4.6 release of Zookeeper does not build correctly on 64 bit windows. To
-    fix that for the 64 bit build, apply patch zookeeper-3.4.6-x64.patch
-    For example in Msysgit 
-        $ cd <ZOOKEEPER_HOME> && git apply <DRILL_HOME>/contrib/native/client/patches/zookeeper-3.4.6-x64.patch
+    b) The 3.5.7 release of ZooKeeper is the current version of ZooKeeper and will build on Windows.  Note:
+    there is a patch in the Git history which allowed Drill to build using ZooKeeper 3.4.6.
+    However, that version is ancient; ZooKeeper 3.5.7 is current and does not need any patch to build on Windows,
+    so the patch remains in the Git history only.
     c) In Visual Studio 2010 Express open <ZOOKEEPER_HOME>/src/c/zookeeper.sln
         i) Add a 64 bit project configuration for each project. (Make sure the
             platform toolset is set to Windows7.1SDK)
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 44c1e03..d77bfa2 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -56,6 +56,7 @@
     <module>format-image</module>
     <module>format-pcapng</module>
     <module>storage-phoenix</module>
+    <module>storage-googlesheets</module>
     <module>storage-hive</module>
     <module>storage-mongo</module>
     <module>storage-jdbc</module>
diff --git a/contrib/storage-googlesheets/.gitignore b/contrib/storage-googlesheets/.gitignore
new file mode 100644
index 0000000..9f011a5
--- /dev/null
+++ b/contrib/storage-googlesheets/.gitignore
@@ -0,0 +1,2 @@
+# Directory to store oauth tokens for testing Googlesheets Storage plugin
+src/test/resources/tokens/*
diff --git a/contrib/storage-googlesheets/README.md b/contrib/storage-googlesheets/README.md
new file mode 100644
index 0000000..14a367e
--- /dev/null
+++ b/contrib/storage-googlesheets/README.md
@@ -0,0 +1,156 @@
+# Google Sheets Connector for Apache Drill
+This connector enables you to query and write to Google Sheets.  
+
+### Usage Notes:
+This feature should be considered experimental as Google's API for Sheets is quite complex and amazingly 
+poorly documented.
+
+## Setup Step 1:  Obtain Credential Information from Google
+Ok... this is a pain.  Google Sheets uses OAuth2.0 (may it be quickly deprecated) for authorization. In order to query Google Sheets, you will first need to obtain three artifacts:
+
+* Your `clientID`:  This is an identifier which uniquely identifies your application to Google
+* Your `client_secret`: You can think of this as your password for your application to access Google Sheets
+* Your redirect URL:  This is the URL to which Google will send the various access tokens and which you will need later.  For a local installation of Drill, it will be: 
+  `http://localhost:8047/credentials/<plugin name>/update_oauth2_authtoken`.
+
+1. To obtain the `clientID` and `clientSecret` you will need to obtain the Google keys, open the [Google Sheets API](https://console.cloud.google.com/apis/library/sheets.googleapis.com) and click on the `Enable` button. 
+2. Once you've enabled the API, you will be taken to the API Manager.  Either select an existing project or create a new one.
+3. Next, navigate to the `Credentials` in the left panel.
+4. Click on `+Create Credentials` at the top of the page.  Select `OAuth client ID` and select `Web Application` or `Desktop` as the type.  Follow the instructions and download 
+   the JSON file that Google provides.
+
+Drill does not use the JSON file, but you will be cutting and pasting values from the JSON file into the Drill configuration.
+
+## Setup Step 2:  Configure Drill
+Create a storage plugin following the normal procedure for doing so.  You can use the example below as a template.  Cut and paste the `clientID` and `client_secret` from the 
+JSON file into your Drill configuration as shown below.  Once you've done that, save the configuration.
+
+```json
+{
+  "type": "googlesheets",
+  "allTextMode": true,
+  "extractHeaders": true,
+  "oAuthConfig": {
+    "callbackURL": "http://localhost:8047/credentials/googlesheets/update_oauth2_authtoken",
+    "authorizationURL": "https://accounts.google.com/o/oauth2/auth",
+    "authorizationParams": {
+      "response_type": "code",
+      "scope": "https://www.googleapis.com/auth/spreadsheets"
+    }
+  },
+  "credentialsProvider": {
+    "credentialsProviderType": "PlainCredentialsProvider",
+    "credentials": {
+      "clientID": "<YOUR CLIENT ID>",
+      "clientSecret": "<YOUR CLIENT SECRET>",
+      "tokenURI": "https://oauth2.googleapis.com/token"
+    },
+    "userCredentials": {}
+  },
+  "enabled": true,
+  "authMode": "SHARED_USER"
+}
+```
+
+With the exception of the `clientID`, `clientSecret` and callback URL, you should not have to modify any of the other parameters in the configuration.
+
+### Other Configuration Parameters
+
+There are two configuration parameters which you may want to adjust:
+* `allTextMode`:  This parameter when `true` disables Drill's data type inferencing for your files.  If your data has inconsistent data types, set this to `true`.  Default is 
+  `true`. 
+* `extractHeaders`:  When `true`, Drill will treat the first row of your data as headers.  When `false` Drill will assign column names like `field_n` for each column.
+
+### Authenticating with Google
+Once you have configured Drill to query Google Sheets, there is one final step before you can access data.  You must authenticate the application (Drill) with Google Sheets.  After you have saved your Google Sheets configuration, navigate back to the configuration screen for your plugin and click on `Authorize`. A new window should appear which will prompt you to authenticate with Google services.  Once you have done that, you should be able to query Google Sheets!  See, that wasn't so hard!
+
+### Authentication Modes:
+The Google Sheets plugin supports the `SHARED_USER` and `USER_TRANSLATION` authentication modes. `SHARED_USER` is as the name implies, one user for everyone. `USER_TRANSLATION` 
+uses different credentials for each individual user.  In this case, the credentials are the OAuth2.0 access tokens.  
+
+At the time of writing, we have not yet documented `USER_TRANSLATION` fully, however we will update this readme once that is complete.
+
+## Querying Data
+Once you have configured Drill to connect to Google Sheets, querying is very straightforward.
+
+### Obtaining the SpreadsheetID
+The URL below is a public spreadsheet hosted on Google Sheets:
+[https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/](https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/)
+
+In this URL, the portion `1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms` is the spreadsheetID. Thus, 
+if you wanted to query this sheet in Drill, after configuring Drill, you could do so with the following
+query:
+
+```sql
+SELECT * 
+FROM googlesheets.`1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms`.`Class Data`
+```
+
+The format for the `FROM` clause for Google Sheets is:
+```sql
+FROM <plugin name>.<sheet ID>.<tab name>
+```
+Note that you must specify the tab name to successfully query Google Sheets.
+
+### Using Aliases
+Since the sheet IDs from Google are not human readable, one way to make your life easier is to use Drill's aliasing features to provide a better name for the actual sheet name. 
+
+### Data Types
+Drill's Google Sheets reader will attempt to infer the data types of the incoming data.  As with other connectors, this is an imperfect process since Google Sheets does not 
+supply a schema or other information to allow Drill to identify the data types of a column.  At present, here is how Drill will map your data:
+* Numbers:  All numeric columns will be mapped to `DOUBLE` data types
+* Boolean:  Columns containing `true/false` will be mapped to the `BOOLEAN` type
+* Time, Date, Timestamp:  Temporal fields will be mapped to the correct type.
+* Text:  Anything else will be projected as `VARCHAR`
+
+If the data type inference is not working for you, you can set the `allTextMode` to `true` and Drill will read everything as a `VARCHAR`.
+
+#### Schema Provisioning
+As with other plugins, you can provide a schema inline as shown in the example query below.
+
+```sql
+SELECT * 
+FROM table(`googlesheets`.`<your google sheet>`.`MixedSheet` 
+    (schema => 'inline=(`Col1` VARCHAR, `Col2` INTEGER, `Col3` VARCHAR)')) 
+LIMIT 5
+```
+
+
+### Column Headers
+When Drill reads Google Sheets, it is assumed that the first row contains column headers.  
+If this is incorrect, you can set the `extractHeaders` parameter to `false` and Drill will name each field `field_n` where `n` is the column index. 
+
+# Writing Data To Google Sheets
+When Drill is connected to Google Sheets, you can also write data to Google Sheets. The basic procedure is 
+the same as with any other data source.  Simply write a `CREATE TABLE AS` (CTAS) query and your data will be
+written to Google Sheets.
+
+One challenge is that once you have created the new sheet, you will have to manually retrieve the spreadsheet ID 
+from Google Sheets in order to query your new data.
+
+### Dropping Tables
+At the time of implementation, it is only possible to delete tables from within a Google Sheets document. You may encounter errors if you try to delete tables from documents 
+that only have one table in them.  The format for deleting a table is:
+
+```sql
+DROP TABLE googlesheets.<sheet id>.<tab name>
+```
+
+# Possible Future Work
+
+### Auto-Aliasing
+As of Drill 1.20, Drill allows you to create user and public aliases for tables and storage plugins. Google Sheets
+requires you to use a non-human-readable ID to identify the Sheet.  One possible idea to make the Drill connection to Google Sheets
+much more usable would be to automatically create an alias (either user or public) mapping the unreadable sheetID to the document title.
+This could be accomplished after the first query or after a CTAS query.
+
+### Google Drive Integration
+Integrating with Google Drive may allow additional functionality such as getting the actual document name, deleting documents and a few other basic functions. However, the 
+Google Drive permissions require additional validation from Google. 
+
+### Additional Pushdowns
+The current implementation supports pushdowns for projection and limit.  
+The Google Sheets API is quite complex and incredibly poorly documented. In this author's opinion, it is quite possibly one of the worst APIs he has ever seen.
+In any event, it may be possible to add filter, sort and perhaps other pushdowns.  
+The current implementation keeps the logic to push filters down to the batch reader, but does not act on these filters.  
+If someone figures out how to add the filter pushdowns and wishes to do so, the query planning logic is all there.
diff --git a/contrib/storage-googlesheets/pom.xml b/contrib/storage-googlesheets/pom.xml
new file mode 100644
index 0000000..1d7f880
--- /dev/null
+++ b/contrib/storage-googlesheets/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--

+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at

+    http://www.apache.org/licenses/LICENSE-2.0

+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.

+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <!-- NOTE: removed a stray <test.elasticsearch.version> property that was a
+       copy-paste leftover from the elasticsearch storage plugin and is unused here. -->
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-googlesheets</artifactId>
+
+  <name>Drill : Contrib : Storage : GoogleSheets</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- Google Dependencies -->
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+      <version>1.35.2</version>
+      <exclusions>
+        <!-- Drill uses SLF4J; exclude commons-logging to avoid duplicate logging backends. -->
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client-jackson2</artifactId>
+      <version>1.35.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.oauth-client</groupId>
+      <artifactId>google-oauth-client-jetty</artifactId>
+      <version>1.34.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-oauth2-http</artifactId>
+      <version>1.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-sheets</artifactId>
+      <version>v4-rev20220606-1.32.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/googlesheets
+              </outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/googlesheets</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/DrillDataStore.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/DrillDataStore.java
new file mode 100644
index 0000000..ec147de
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/DrillDataStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.google.api.client.auth.oauth2.StoredCredential;
+import com.google.api.client.util.store.AbstractMemoryDataStore;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.oauth.OAuthTokenProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.oauth.TokenRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class DrillDataStore<V extends Serializable> extends AbstractMemoryDataStore<V> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DrillDataStore.class);
+  private final PersistentTokenTable tokenTable;
+
+  private final DrillDataStoreFactory drillDataStoreFactory;
+
+  DrillDataStore(OAuthTokenProvider tokenProvider, String pluginName, String userID, DrillDataStoreFactory dataStoreFactory) {
+    super(dataStoreFactory, userID);
+    this.drillDataStoreFactory = dataStoreFactory;
+    TokenRegistry tokenRegistry = tokenProvider.getOauthTokenRegistry(userID);
+    this.tokenTable = tokenRegistry.getTokenTable(pluginName);
+    if (hasValidTokens(tokenTable)) {
+      keyValueMap.put(tokenTable.ACCESS_TOKEN_KEY, tokenTable.getAccessToken().getBytes(StandardCharsets.UTF_8));
+      keyValueMap.put(tokenTable.REFRESH_TOKEN_KEY, tokenTable.getRefreshToken().getBytes(StandardCharsets.UTF_8));
+
+      if (tokenTable.getExpiresIn() != null) {
+        keyValueMap.put(tokenTable.EXPIRES_IN_KEY, tokenTable.getExpiresIn().getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  /**
+   * Updates credentials in Drill's persistent store.
+   */
+  @Override
+  public void save() {
+    logger.debug("Saving credentials to token table");
+    tokenTable.setAccessToken(Arrays.toString(keyValueMap.get(tokenTable.ACCESS_TOKEN_KEY)));
+    tokenTable.setRefreshToken(Arrays.toString(keyValueMap.get(tokenTable.REFRESH_TOKEN_KEY)));
+  }
+
+  /**
+   * Returns a {@link StoredCredential} containing the given user's access and refresh tokens.  This method
+   * must only be called AFTER the tokenTable has been initialized.
+   * @return A {@link StoredCredential with the user's access and refresh tokens.}
+   */
+  public StoredCredential getStoredCredential() {
+    if (tokenTable == null) {
+      logger.debug("Token table is null. Please be sure to initialize token table before calling getStoredCredentials.");
+      return null;
+    }
+
+    StoredCredential storedCredential = new StoredCredential();
+    storedCredential.setAccessToken(tokenTable.getAccessToken());
+    storedCredential.setRefreshToken(tokenTable.getRefreshToken());
+
+    if (StringUtils.isNotEmpty(tokenTable.getExpiresIn())) {
+      storedCredential.setExpirationTimeMilliseconds(Long.valueOf(tokenTable.getExpiresIn()));
+    }
+    return storedCredential;
+  }
+
+  @Override
+  public DrillDataStoreFactory getDataStoreFactory() {
+    return drillDataStoreFactory;
+  }
+
+  private boolean hasValidTokens(PersistentTokenTable tokenTable) {
+    return StringUtils.isNotEmpty(tokenTable.getAccessToken()) && StringUtils.isNotEmpty(tokenTable.getRefreshToken());
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("tokenTable", tokenTable)
+      .field("data store factory", drillDataStoreFactory)
+      .toString();
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/DrillDataStoreFactory.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/DrillDataStoreFactory.java
new file mode 100644
index 0000000..92ff534
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/DrillDataStoreFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.google.api.client.util.store.AbstractDataStoreFactory;
+import com.google.api.client.util.store.DataStore;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.oauth.OAuthTokenProvider;
+
+import java.io.Serializable;
+
+/**
+ * A Google API client {@code DataStoreFactory} that produces {@link DrillDataStore}
+ * instances, routing Google OAuth credential storage through Drill's persistent
+ * token tables for the named storage plugin.
+ */
+public class DrillDataStoreFactory extends AbstractDataStoreFactory {
+  // Provides access to Drill's OAuth token registry.
+  private final OAuthTokenProvider tokenProvider;
+  // Name of the storage plugin instance whose token table backs the created stores.
+  private final String pluginName;
+
+  public DrillDataStoreFactory(OAuthTokenProvider tokenProvider, String pluginName) {
+    this.tokenProvider = tokenProvider;
+    this.pluginName = pluginName;
+  }
+
+  /**
+   * Creates a {@link DrillDataStore} for the given store id.  The id supplied by the
+   * Google client is passed through as the user id for the token registry lookup.
+   */
+  @Override
+  protected <V extends Serializable> DataStore<V> createDataStore(String id) {
+    return new DrillDataStore<>(tokenProvider, pluginName, id, this);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("tokenProvider", tokenProvider)
+      .field("pluginName", pluginName)
+      .toString();
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java
new file mode 100644
index 0000000..7b6ae18
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.google.api.services.sheets.v4.Sheets;
+import com.google.api.services.sheets.v4.model.Sheet;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsBigIntegerColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsBooleanColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsDateColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsFloatColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsIntegerColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsNumericColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsTimeColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsTimestampColumnWriter;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsVarcharColumnWriter;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsRangeBuilder;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+import org.apache.drill.exec.util.Utilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class GoogleSheetsBatchReader implements ManagedReader<SchemaNegotiator> {
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsBatchReader.class);
+
+  // The default batch size is 1k rows.  It appears that Google sets the maximum batch size at 1000
+  // rows. There is conflicting information about this online, but during testing, ranges with more than
+  // 1000 rows would throw invalid request errors.
+  private static final int BATCH_SIZE = 1000;
+
+  private final GoogleSheetsStoragePluginConfig config;
+  private final GoogleSheetsSubScan subScan;
+  private final List<SchemaPath> projectedColumns;
+  private final Sheet sheet;
+  private final Sheets service;
+  private final GoogleSheetsRangeBuilder rangeBuilder;
+  private final String sheetID;
+  private CustomErrorContext errorContext;
+  private Map<String, GoogleSheetsColumn> columnMap;
+  private RowSetLoader rowWriter;
+
+  public GoogleSheetsBatchReader(GoogleSheetsStoragePluginConfig config, GoogleSheetsSubScan subScan, GoogleSheetsStoragePlugin plugin) {
+    this.config = config;
+    this.subScan = subScan;
+    this.projectedColumns = subScan.getColumns();
+    this.service = plugin.getSheetsService(subScan.getUserName());
+    this.sheetID = subScan.getScanSpec().getSheetID();
+    try {
+      List<Sheet> sheetList = GoogleSheetsUtils.getSheetList(service, sheetID);
+      this.sheet = sheetList.get(subScan.getScanSpec().getTabIndex());
+    } catch (IOException e) {
+      throw UserException.connectionError(e)
+        .message("Could not find tab with index " + subScan.getScanSpec().getTabIndex())
+        .build(logger);
+    }
+
+    int maxRecords = subScan.getMaxRecords();
+    this.rangeBuilder = new GoogleSheetsRangeBuilder(subScan.getScanSpec().getTableName(), BATCH_SIZE)
+      .addRowCount(sheet.getProperties().getGridProperties().getRowCount());
+    if (maxRecords > 0) {
+      // Since the headers are in the first row, add one more row of records to the batch.
+      if (config.getExtractHeaders()) {
+        rangeBuilder.addLimit(maxRecords + 1);
+      } else {
+        rangeBuilder.addLimit(maxRecords);
+      }
+    }
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    logger.debug("Opening Google Sheet {}", subScan.getScanSpec().getTableName());
+    this.errorContext = negotiator.parentErrorContext();
+
+    // Build Schema
+    String tableName = subScan.getScanSpec().getTableName();
+    String pluginName = subScan.getScanSpec().getSheetID();
+    try {
+      columnMap = GoogleSheetsUtils.getColumnMap(GoogleSheetsUtils.getFirstRows(service, pluginName, tableName), projectedColumns, config.allTextMode());
+    } catch (IOException e) {
+      throw UserException.validationError(e)
+        .message("Error building schema: " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    // For a star query, we can assume that all columns are projected, and thus,
+    // we can construct a range with the first and last column letters.  IE: A:F
+    // For additional fun, column indexing starts at one rather than zero, so the columnIndex,
+    // which starts at zero needs to be incremented by one to match the letter notation.  IE:
+    // A = 1, B = 2...
+    // We cannot assume that the columns are in order, so it is necessary to iterate over the
+    // list of columns.
+    if (isStarQuery()) {
+      int minIndex = 1;
+      int maxIndex = 1;
+      for (GoogleSheetsColumn column : columnMap.values()) {
+        if ((column.getColumnIndex() + 1) < minIndex) {
+          minIndex = column.getColumnIndex();
+        } else if ((column.getColumnIndex() + 1) > maxIndex ) {
+          maxIndex = column.getColumnIndex() + 1;
+        }
+      }
+      logger.debug("Min index: {}, max index: {}", minIndex, maxIndex);
+      rangeBuilder.isStarQuery(true);
+      rangeBuilder.addFirstColumn(GoogleSheetsUtils.columnToLetter(minIndex))
+        .addLastColumn(GoogleSheetsUtils.columnToLetter(maxIndex));
+    } else {
+      // For non-star queries, we need to build a range which consists of
+      // multiple columns.  For example, let's say that we wanted to project
+      // columns 1-3,5,7-9.  We'd need to construct ranges like this:
+      // A-C,E,G-I
+      rangeBuilder.isStarQuery(false);
+      rangeBuilder.addProjectedRanges(GoogleSheetsUtils.getProjectedRanges(tableName, columnMap));
+    }
+
+    // Add the max row count from the sheet metadata
+    rangeBuilder.addRowCount(sheet.getProperties().getGridProperties().getRowCount());
+    logger.debug(rangeBuilder.toString());
+
+    // Add provided schema if present.
+    TupleMetadata schema;
+    if (negotiator.hasProvidedSchema()) {
+      schema = negotiator.providedSchema();
+    } else {
+      schema = GoogleSheetsUtils.buildSchema(columnMap);
+    }
+    negotiator.tableSchema(schema, true);
+    ResultSetLoader resultLoader = negotiator.build();
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      setColumnWritersFromProvidedSchema(schema);
+    } else {
+      // Build writers
+      MinorType dataType;
+      for (GoogleSheetsColumn column : columnMap.values()) {
+        dataType = column.getDrillDataType();
+        if (dataType == MinorType.FLOAT8) {
+          column.setWriter(new GoogleSheetsNumericColumnWriter(rowWriter, column.getColumnName()));
+        } else if (dataType == MinorType.VARCHAR) {
+          column.setWriter(new GoogleSheetsVarcharColumnWriter(rowWriter, column.getColumnName()));
+        } else if (dataType == MinorType.DATE) {
+          column.setWriter(new GoogleSheetsDateColumnWriter(rowWriter, column.getColumnName()));
+        } else if (dataType == MinorType.TIMESTAMP) {
+          column.setWriter(new GoogleSheetsTimestampColumnWriter(rowWriter, column.getColumnName()));
+        } else if (dataType == MinorType.TIME) {
+          column.setWriter(new GoogleSheetsTimeColumnWriter(rowWriter, column.getColumnName()));
+        } else if (dataType == MinorType.BIT) {
+          column.setWriter(new GoogleSheetsBooleanColumnWriter(rowWriter, column.getColumnName()));
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    logger.debug("Processing batch.");
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean processRow() {
+    List<List<Object>> data;
+    try {
+      if (isStarQuery()) {
+        // Get next range
+        String range = rangeBuilder.next();
+        if (range == null) {
+          return false;
+        }
+        data = GoogleSheetsUtils.getDataFromRange(service, sheetID, range);
+      } else {
+        List<String> batches = rangeBuilder.nextBatch();
+        data = GoogleSheetsUtils.getBatchData(service, sheetID, batches);
+      }
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .message("Error reading Google Sheet: " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    int colIndex;
+    List<Object> row;
+    int startIndex = 0;
+    Object value;
+    if (config.getExtractHeaders()) {
+      startIndex = 1;
+    }
+    for (int rowIndex = startIndex; rowIndex < data.size(); rowIndex++) {
+      rowWriter.start();
+      row = data.get(rowIndex);
+      for (GoogleSheetsColumn column : columnMap.values()) {
+        colIndex = column.getDrillColumnIndex();
+        try {
+          value = row.get(colIndex);
+        } catch (IndexOutOfBoundsException e) {
+          // This is a bit of an edge case.  In some circumstances, if there is a null value at the end of
+          // a row, instead of returning an empty string, Google Sheets will shorten the row. This does not
+          // occur if the null value appears in the middle or beginning of a row, or even all the time. This check
+          // prevents out of bounds errors and moves on to the next row.
+          continue;
+        }
+        column.load(value);
+      }
+      rowWriter.save();
+    }
+
+    // If the results contained less than the batch size, stop iterating.
+    if (rowWriter.rowCount() < BATCH_SIZE) {
+      rangeBuilder.lastBatch();
+      return false;
+    }
+    return true;
+  }
+
+  private void setColumnWritersFromProvidedSchema(TupleMetadata schema) {
+    List<MaterializedField> fieldList = schema.toFieldList();
+
+    MinorType dataType;
+    GoogleSheetsColumn column;
+    for (MaterializedField field: fieldList) {
+      dataType = field.getType().getMinorType();
+      column = columnMap.get(field.getName());
+
+      // Get the field
+      if (dataType == MinorType.FLOAT8) {
+        column.setWriter(new GoogleSheetsNumericColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.FLOAT4) {
+        column.setWriter(new GoogleSheetsFloatColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.VARCHAR) {
+        column.setWriter(new GoogleSheetsVarcharColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.INT) {
+        column.setWriter(new GoogleSheetsIntegerColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.BIGINT) {
+        column.setWriter(new GoogleSheetsBigIntegerColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.DATE) {
+        column.setWriter(new GoogleSheetsDateColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.TIMESTAMP) {
+        column.setWriter(new GoogleSheetsTimestampColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.TIME) {
+        column.setWriter(new GoogleSheetsTimeColumnWriter(rowWriter, column.getColumnName()));
+      } else if (dataType == MinorType.BIT) {
+        column.setWriter(new GoogleSheetsBooleanColumnWriter(rowWriter, column.getColumnName()));
+      } else {
+        throw UserException.validationError()
+          .message(dataType + " is not supported for GoogleSheets.")
+          .build(logger);
+      }
+    }
+  }
+
+  /**
+   * This function is necessary for star or aggregate queries.
+   * @return True if the query is a star or aggregate query, false if not.
+   */
+  private boolean isStarQuery() {
+    return (Utilities.isStarQuery(projectedColumns)) || projectedColumns.size() == 0;
+  }
+
+  @Override
+  public void close() {
+    // Do nothing
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchWriter.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchWriter.java
new file mode 100644
index 0000000..6741a31
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchWriter.java
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.google.api.services.sheets.v4.Sheets;
+import com.google.api.services.sheets.v4.model.Spreadsheet;
+import com.google.api.services.sheets.v4.model.SpreadsheetProperties;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Record writer for CTAS into GoogleSheets.  Drill drives this writer through the
+ * {@link EventBasedRecordWriter} lifecycle: {@link #updateSchema(VectorAccessible)} is
+ * called once with the outgoing schema (used here to create the spreadsheet, its tab,
+ * and a header row of column names), then each record produces a
+ * {@link #startRecord()} / per-field converter / {@link #endRecord()} cycle which
+ * accumulates rows in memory, and finally {@link #cleanup()} pushes all accumulated
+ * values to the Google Sheets API in a single write.
+ *
+ * <p>Null handling convention: every converter appends exactly one entry per field,
+ * using a real {@code null} placeholder for unset values, so that cells within a row
+ * never shift out of alignment with their columns.</p>
+ */
+public class GoogleSheetsBatchWriter extends AbstractRecordWriter {
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsBatchWriter.class);
+
+  private final Sheets service;
+  private final String tabName;
+  private final String sheetName;
+  // Accumulated output: one inner list per row, first row is the header.
+  private final List<List<Object>> values;
+  // Cells of the row currently being written; reset by startRecord().
+  private List<Object> rowList;
+  // Assigned when the spreadsheet document is created in updateSchema().
+  private String spreadsheetID;
+
+  public GoogleSheetsBatchWriter(OperatorContext context, String name, GoogleSheetsWriter config) {
+    GoogleSheetsStoragePlugin plugin = config.getPlugin();
+
+    this.service = plugin.getSheetsService(config.getQueryUser());
+
+    // GoogleSheets has three different identifiers to track:
+    // 1.  The spreadsheetID is a non-human readable ID for the actual document which can contain
+    //     one or more tabs of data.  This ID can be found in the URL when viewing a GoogleSheet.
+    // 2.  The sheetName is the human readable name of the document.  When you have the spreadsheetID,
+    //      you can obtain the sheetName, however Google did not provide any obvious way to list available
+    //      GoogleSheet documents, nor to look up a sheetID from a title.
+    // 3.  The tabName refers to the tab within a GoogleSheet document.
+    this.tabName = name;
+    this.sheetName = config.getSheetName();
+    values = new ArrayList<>();
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+    // No writer options are supported; document creation happens in updateSchema().
+  }
+
+  /**
+   * Creates the destination GoogleSheet document and tab, then records the schema's
+   * column names as the first row of output.  GoogleSheets has no concept of column
+   * metadata, so the header row is plain data.
+   */
+  @Override
+  public void updateSchema(VectorAccessible batch) throws IOException {
+    // Create the new GoogleSheet doc
+    Spreadsheet spreadsheet = new Spreadsheet()
+      .setProperties(new SpreadsheetProperties().setTitle(sheetName));
+
+    spreadsheet = service.spreadsheets().create(spreadsheet)
+      .setFields("spreadsheetId")
+      .execute();
+
+    this.spreadsheetID = spreadsheet.getSpreadsheetId();
+    // Now add the tab
+    GoogleSheetsUtils.addTabToGoogleSheet(service, spreadsheetID, tabName);
+
+    // Add the column names to the values list.  GoogleSheets does not have any concept
+    // of column names, so we just insert the column names as the first row of data.
+    BatchSchema schema = batch.getSchema();
+    List<Object> columnNames = new ArrayList<>();
+    for (MaterializedField field : schema) {
+      columnNames.add(field.getName());
+    }
+    values.add(columnNames);
+  }
+
+  @Override
+  public void startRecord() {
+    rowList = new ArrayList<>();
+  }
+
+  @Override
+  public void endRecord() {
+    values.add(rowList);
+  }
+
+  @Override
+  public void abort() {
+    // Nothing to roll back: rows are buffered in memory and only written in cleanup().
+  }
+
+  /**
+   * Flushes all accumulated rows (header plus data) to the GoogleSheet in one call.
+   * @throws UserException wrapping any IOException raised by the Sheets API
+   */
+  @Override
+  public void cleanup() {
+    try {
+      GoogleSheetsUtils.writeDataToGoogleSheet(service, spreadsheetID, tabName, values);
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+        .message("Error writing to GoogleSheets " + e.getMessage())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableBigIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableBigIntGSConverter extends FieldConverter {
+    private final NullableBigIntHolder holder = new NullableBigIntHolder();
+
+    public NullableBigIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new BigIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class BigIntGSConverter extends FieldConverter {
+    private final BigIntHolder holder = new BigIntHolder();
+
+    public BigIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableIntGSConverter extends FieldConverter {
+    private final NullableIntHolder holder = new NullableIntHolder();
+
+    public NullableIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new IntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class IntGSConverter extends FieldConverter {
+    private final IntHolder holder = new IntHolder();
+
+    public IntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableSmallIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableSmallIntGSConverter extends FieldConverter {
+    private final NullableSmallIntHolder holder = new NullableSmallIntHolder();
+
+    public NullableSmallIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new SmallIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class SmallIntGSConverter extends FieldConverter {
+    private final SmallIntHolder holder = new SmallIntHolder();
+
+    public SmallIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableTinyIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableTinyIntGSConverter extends FieldConverter {
+    private final NullableTinyIntHolder holder = new NullableTinyIntHolder();
+
+    public NullableTinyIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new TinyIntGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class TinyIntGSConverter extends FieldConverter {
+    private final TinyIntHolder holder = new TinyIntHolder();
+
+    public TinyIntGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableFloat8GSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableFloat8GSConverter extends FieldConverter {
+    private final NullableFloat8Holder holder = new NullableFloat8Holder();
+
+    public NullableFloat8GSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new Float8GSConverter(fieldId, fieldName, reader);
+  }
+
+  public class Float8GSConverter extends FieldConverter {
+    private final Float8Holder holder = new Float8Holder();
+
+    public Float8GSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableFloat4GSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableFloat4GSConverter extends FieldConverter {
+    private final NullableFloat4Holder holder = new NullableFloat4Holder();
+
+    public NullableFloat4GSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new Float4GSConverter(fieldId, fieldName, reader);
+  }
+
+  public class Float4GSConverter extends FieldConverter {
+    private final Float4Holder holder = new Float4Holder();
+
+    public Float4GSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableVardecimalGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableVardecimalGSConverter extends FieldConverter {
+    private final NullableVarDecimalHolder holder = new NullableVarDecimalHolder();
+
+    public NullableVardecimalGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        // Use a real null, not the string "null", so the cell is empty rather
+        // than containing literal text.
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      BigDecimal value = DecimalUtility.getBigDecimalFromDrillBuf(holder.buffer,
+        holder.start, holder.end - holder.start, holder.scale);
+      rowList.add(value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new VardecimalGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class VardecimalGSConverter extends FieldConverter {
+    private final VarDecimalHolder holder = new VarDecimalHolder();
+
+    public VardecimalGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      reader.read(holder);
+      BigDecimal value = DecimalUtility.getBigDecimalFromDrillBuf(holder.buffer,
+        holder.start, holder.end - holder.start, holder.scale);
+      rowList.add(value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableVarCharGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableVarCharGSConverter extends FieldConverter {
+    private final NullableVarCharHolder holder = new NullableVarCharHolder();
+
+    public NullableVarCharGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        // A placeholder must be appended even when the value is unset;
+        // skipping the cell would shift every later column in this row.
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      String input = StringFunctionHelpers.toStringFromUTF8(holder.start, holder.end, holder.buffer);
+      rowList.add(input);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new VarCharGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class VarCharGSConverter extends FieldConverter {
+    private final VarCharHolder holder = new VarCharHolder();
+
+    public VarCharGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        // Keep the row aligned: always append one entry per field.
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      String input = StringFunctionHelpers.toStringFromUTF8(holder.start, holder.end, holder.buffer);
+      rowList.add(input);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableDateConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableDateGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableDateGSConverter extends FieldConverter {
+    private final NullableDateHolder holder = new NullableDateHolder();
+
+    public NullableDateGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new DateGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class DateGSConverter extends FieldConverter {
+    private final DateHolder holder = new DateHolder();
+
+    public DateGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableTimeConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableTimeGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableTimeGSConverter extends FieldConverter {
+    private final NullableTimeHolder holder = new NullableTimeHolder();
+
+    public NullableTimeGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewTimeConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new TimeGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class TimeGSConverter extends FieldConverter {
+    private final TimeHolder holder = new TimeHolder();
+
+    public TimeGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableTimeStampGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableTimeStampGSConverter extends FieldConverter {
+    private final NullableTimeStampHolder holder = new NullableTimeStampHolder();
+
+    public NullableTimeStampGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new TimeStampGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class TimeStampGSConverter extends FieldConverter {
+    private final TimeStampHolder holder = new TimeStampHolder();
+
+    public TimeStampGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      reader.read(holder);
+      rowList.add(holder.value);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewNullableBitConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableBitGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class NullableBitGSConverter extends FieldConverter {
+    private final NullableBitHolder holder = new NullableBitHolder();
+
+    public NullableBitGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      if (!reader.isSet()) {
+        rowList.add(null);
+        return;
+      }
+      reader.read(holder);
+      // BIT values are rendered as the text "true"/"false" in the sheet.
+      String booleanValue = holder.value == 1 ? "true" : "false";
+      rowList.add(booleanValue);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewBitConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new BitGSConverter(fieldId, fieldName, reader);
+  }
+
+  public class BitGSConverter extends FieldConverter {
+    private final BitHolder holder = new BitHolder();
+
+    public BitGSConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      reader.read(holder);
+      // BIT values are rendered as the text "true"/"false" in the sheet.
+      String booleanValue = holder.value == 1 ? "true" : "false";
+      rowList.add(booleanValue);
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java
new file mode 100644
index 0000000..d348a97
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+
+import java.util.Objects;
+
+
+/**
+ * This class is the representation of a GoogleSheets column. Since column
+ * metadata appears to be unavailable at either by accessing the master document
+ * or the sheet itself, this class gathers it and represents that as a Java object.
+ *
+ * Additionally, GoogleSheets does not allow you to name columns or access them by position,
+ * instead using A1 notation (or other equally useless forms of accessing columns). In order to facilitate
+ * the projection pushdown we have to track the column's: name, data type and also be able
+ * to translate that position into a letter.  Note that Google sheets has a limit of approx 18k
+ * columns.
+ */
+public class GoogleSheetsColumn {
+  private final String columnName;
+  private final GoogleSheetsUtils.DATA_TYPES dataType;
+  private final MinorType drillDataType;
+  private final int columnIndex;
+  private final int drillColumnIndex;
+  private final String columnLetter;
+  private GoogleSheetsColumnWriter writer;
+
+  public GoogleSheetsColumn(String columnName, GoogleSheetsUtils.DATA_TYPES dataType, int googleColumnIndex, int drillColumnIndex) {
+    this.columnName = columnName;
+    this.columnIndex = googleColumnIndex;
+    this.drillColumnIndex = drillColumnIndex;
+    this.dataType = dataType;
+    this.columnLetter = GoogleSheetsUtils.columnToLetter(googleColumnIndex + 1);
+    this.drillDataType = getDrillDataType(dataType);
+  }
+
+  private MinorType getDrillDataType(GoogleSheetsUtils.DATA_TYPES dataType) {
+    switch (dataType) {
+      case NUMERIC:
+        return MinorType.FLOAT8;
+      case DATE:
+        return MinorType.DATE;
+      case TIME:
+        return MinorType.TIME;
+      case TIMESTAMP:
+        return MinorType.TIMESTAMP;
+      case UNKNOWN:
+      case VARCHAR:
+      default:
+        return MinorType.VARCHAR;
+    }
+  }
+
+  public void setWriter(GoogleSheetsColumnWriter writer) {
+    this.writer = writer;
+  }
+
+  public MinorType getDrillDataType() {
+    return drillDataType;
+  }
+
+  public int getColumnIndex() {
+    return columnIndex;
+  }
+
+  public int getDrillColumnIndex() { return drillColumnIndex; }
+
+  public String getColumnLetter() { return columnLetter; }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  public void load(Object value) {
+    writer.load(value);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("columnName", columnName)
+      .field("columnIndex", columnIndex)
+      .field("columnLetter", columnLetter)
+      .field("data type", dataType)
+      .toString();
+  }
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    GoogleSheetsColumn otherColumn  = (GoogleSheetsColumn) that;
+    return Objects.equals(columnName, otherColumn.columnName) &&
+      Objects.equals(columnIndex, otherColumn.columnIndex) &&
+      Objects.equals(columnLetter, otherColumn.columnLetter) &&
+      Objects.equals(dataType, otherColumn.dataType);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(columnName, columnIndex, columnLetter, dataType);
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsGroupScan.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsGroupScan.java
new file mode 100644
index 0000000..dc04837
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsGroupScan.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.TableMetadata;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName("googlesheets-group-scan")
+public class GoogleSheetsGroupScan extends AbstractGroupScan {
+
+  private final GoogleSheetsScanSpec scanSpec;
+  private final GoogleSheetsStoragePluginConfig config;
+  private final List<SchemaPath> columns;
+  private final String pluginName;
+  private final Map<String, ExprNode.ColRelOpConstNode> filters;
+  private final ScanStats scanStats;
+  private final double filterSelectivity;
+  private final int maxRecords;
+  private final GoogleSheetsStoragePlugin plugin;
+  // Cached lazily by hashCode(); 0 means "not yet computed".
+  private int hashCode;
+  // Null when this scan was deserialized via the @JsonCreator constructor.
+  private MetadataProviderManager metadataProviderManager;
+
+  /**
+   * Initial constructor, used when the scan is first created from a table
+   * reference. No columns, filters or limit have been pushed down yet.
+   */
+  public GoogleSheetsGroupScan(String userName,
+                               GoogleSheetsScanSpec scanSpec,
+                               GoogleSheetsStoragePlugin plugin,
+                               MetadataProviderManager metadataProviderManager) {
+    super(userName);
+    this.scanSpec = scanSpec;
+    this.config = scanSpec.getConfig();
+    this.columns = ALL_COLUMNS;
+    this.pluginName = plugin.getName();
+    this.filters = null;
+    this.filterSelectivity = 0.0;
+    this.maxRecords = -1;
+    this.scanStats = computeScanStats();
+    this.plugin = plugin;
+    this.metadataProviderManager = metadataProviderManager;
+  }
+
+  /**
+   * Copy constructor. Reuses the cached scan stats of {@code that}.
+   */
+  public GoogleSheetsGroupScan(GoogleSheetsGroupScan that) {
+    super(that);
+    this.scanSpec = that.scanSpec;
+    this.config = that.config;
+    this.columns = that.columns;
+    this.filters = that.filters;
+    this.pluginName = that.pluginName;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = that.scanStats;
+    this.maxRecords = that.maxRecords;
+    this.plugin = that.plugin;
+    this.metadataProviderManager = that.metadataProviderManager;
+    this.hashCode = hashCode();
+  }
+
+  /**
+   * Constructor for applying a limit.
+   * @param that The previous group scan without the limit.
+   * @param maxRecords  The desired limit, pushed down from Calcite
+   */
+  public GoogleSheetsGroupScan(GoogleSheetsGroupScan that, int maxRecords) {
+    super(that);
+    this.scanSpec = that.scanSpec;
+    this.config = that.config;
+    this.columns = that.columns;
+    this.pluginName = that.pluginName;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = maxRecords;
+    this.plugin = that.plugin;
+    this.metadataProviderManager = that.metadataProviderManager;
+    // Recompute stats: the limit changes the estimated row count.
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Constructor for applying columns (Projection pushdown).
+   * @param that The previous GroupScan, without the columns
+   * @param columns The list of columns to push down
+   */
+  public GoogleSheetsGroupScan(GoogleSheetsGroupScan that, List<SchemaPath> columns) {
+    super(that);
+    this.scanSpec = that.scanSpec;
+    this.config = scanSpec.getConfig();
+    this.columns = columns;
+    this.filters = that.filters;
+    this.pluginName = that.pluginName;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.plugin = that.plugin;
+    this.metadataProviderManager = that.metadataProviderManager;
+    // Recompute stats: the column count changes the estimated value count.
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Constructor for applying a filter
+   * @param that Previous group scan w/o filters
+   * @param filters The list of filters
+   * @param filterSelectivity  The filter selectivity
+   */
+  public GoogleSheetsGroupScan(GoogleSheetsGroupScan that,
+                               Map<String, ExprNode.ColRelOpConstNode> filters,
+                               double filterSelectivity) {
+    super(that);
+    this.scanSpec = that.scanSpec;
+    this.config = that.config;
+    this.columns = that.columns;
+    this.filters = filters;
+    this.pluginName = that.pluginName;
+    this.filterSelectivity = filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.plugin = that.plugin;
+    this.metadataProviderManager = that.metadataProviderManager;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Deserialization constructor used when the physical plan is read back.
+   * The plugin instance is resolved from the registry via its config.
+   * Note that {@code metadataProviderManager} is not serialized, so
+   * {@link #getSchema()} returns null on a deserialized scan.
+   */
+  @JsonCreator
+  public GoogleSheetsGroupScan(
+    @JsonProperty("userName") String userName,
+    @JsonProperty("scanSpec") GoogleSheetsScanSpec scanSpec,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
+    @JsonProperty("filterSelectivity") double selectivity,
+    @JsonProperty("maxRecords") int maxRecords,
+    @JacksonInject StoragePluginRegistry plugins
+  ) {
+    super(userName);
+    this.scanSpec = scanSpec;
+    this.config = scanSpec.getConfig();
+    this.columns = columns;
+    this.filters = filters;
+    this.filterSelectivity = selectivity;
+    this.maxRecords = maxRecords;
+    this.scanStats = computeScanStats();
+    this.plugin = plugins.resolve(config, GoogleSheetsStoragePlugin.class);
+    this.pluginName = plugin.getName();
+  }
+
+  @JsonProperty("scanSpec")
+  public GoogleSheetsScanSpec scanSpec() {
+    return scanSpec;
+  }
+
+  @JsonProperty("config")
+  public GoogleSheetsStoragePluginConfig config() {
+    return config;
+  }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> columns() {
+    return columns;
+  }
+
+  @JsonProperty("filters")
+  public Map<String, ExprNode.ColRelOpConstNode> filters() {
+    return filters;
+  }
+
+  @JsonProperty("maxRecords")
+  public int maxRecords() {
+    return maxRecords;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    // No-op: this scan is not distributed (see getMaxParallelizationWidth).
+  }
+
+  /**
+   * Returns the provided schema, if any, or null when no schema was
+   * provided or it cannot be read.
+   */
+  public TupleMetadata getSchema() {
+    if (metadataProviderManager == null) {
+      return null;
+    }
+    try {
+      return metadataProviderManager.getSchemaProvider().read().getSchema();
+    } catch (IOException | NullPointerException e) {
+      // The schema provider or its content may be absent; treat any such
+      // failure as "no provided schema".
+      return null;
+    }
+  }
+
+  @Override
+  public TableMetadata getTableMetadata() {
+    if (getMetadataProvider() == null) {
+      return null;
+    }
+    return getMetadataProvider().getTableMetadata();
+  }
+
+  @Override
+  public TableMetadataProvider getMetadataProvider() {
+    if (metadataProviderManager == null) {
+      return null;
+    }
+    return metadataProviderManager.getTableMetadataProvider();
+  }
+
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @JsonIgnore
+  public boolean allowsFilters() {
+    return true;
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    return new GoogleSheetsSubScan(userName, config, scanSpec, columns, filters, maxRecords, getSchema());
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    // Single reader; the scan runs in one fragment.
+    return 1;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return new GoogleSheetsGroupScan(this, columns);
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    // Returning null tells the planner the limit is already applied.
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return new GoogleSheetsGroupScan(this, maxRecords);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+
+    // Since this class is immutable, compute stats once and cache
+    // them. If the scan changes (adding columns, adding filters), we
+    // get a new scan without cached stats.
+    return scanStats;
+  }
+
+  private ScanStats computeScanStats() {
+
+    // If this config allows filters, then make the default
+    // cost very high to force the planner to choose the version
+    // with filters.
+    if (!hasFilters()) {
+      // (row count, cpu cost, io cost). The CPU cost was "1E112",
+      // an obvious typo for 1E12.
+      return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
+        1E9, 1E12, 1E12);
+    }
+
+    // No good estimates at all, just make up something.
+    // TODO It may be possible to obtain this from the Google SDK.
+    double estRowCount = 10_000;
+    if (maxRecords > 0) {
+      estRowCount = maxRecords;
+    }
+
+    // NOTE this was important! if the predicates don't make the query more
+    // efficient they won't get pushed down
+    if (hasFilters()) {
+      estRowCount *= filterSelectivity;
+    }
+
+    double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
+    double valueCount = estRowCount * estColCount;
+    double cpuCost = valueCount;
+    double ioCost = valueCount;
+
+    // Force the caller to use our costs rather than the
+    // defaults (which sets IO cost to zero).
+    return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
+      estRowCount, cpuCost, ioCost);
+  }
+
+  @JsonIgnore
+  public boolean hasFilters() {
+    return filters != null;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new GoogleSheetsGroupScan(this);
+  }
+
+  @Override
+  public int hashCode() {
+    if (hashCode == 0) {
+      hashCode = Objects.hash(scanSpec, config, columns, filters, maxRecords, pluginName);
+    }
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    GoogleSheetsGroupScan other = (GoogleSheetsGroupScan) obj;
+    // Compare the same fields hashCode() hashes (pluginName included) so
+    // the equals/hashCode contract holds.
+    return Objects.equals(scanSpec, other.scanSpec) &&
+      Objects.equals(config, other.config) &&
+      Objects.equals(columns, other.columns) &&
+      Objects.equals(filters, other.filters) &&
+      Objects.equals(maxRecords, other.maxRecords) &&
+      Objects.equals(pluginName, other.pluginName);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("scanSpec", scanSpec)
+      .field("filters", filters)
+      .field("columns", columns)
+      .field("field selectivity", filterSelectivity)
+      .field("maxRecords", maxRecords)
+      .toString();
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsPushDownListener.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsPushDownListener.java
new file mode 100644
index 0000000..c1fb069
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsPushDownListener.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.AndNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.ColRelOpConstNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.OrNode;
+import org.apache.drill.exec.store.base.filter.FilterPushDownListener;
+import org.apache.drill.exec.store.base.filter.FilterPushDownStrategy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The GoogleSheets storage plugin accepts filters which are:
+ * <ul>
+ * <li>A single column = value expression </li>
+ * <li>An AND'ed set of such expressions,</li>
+ * <li>If the value is one with an unambiguous conversion to
+ * a string. (That is, not dates, binary, maps, etc.)</li>
+ * </ul>
+ *
+ * Note, at the moment, no filters are pushed down. Once we figure out the Google SDK for this,
+ * we can easily uncomment out the lines below and the filters will be pushed down.
+ */
+public class GoogleSheetsPushDownListener implements FilterPushDownListener {
+
+  // The context parameter is unused but required by the registration hook.
+  public static Set<StoragePluginOptimizerRule> rulesFor(OptimizerRulesContext optimizerRulesContext) {
+    return FilterPushDownStrategy.rulesFor(new GoogleSheetsPushDownListener());
+  }
+
+  @Override
+  public String prefix() {
+    // Prefix used to name the generated planner rules.
+    return "GoogleSheets";
+  }
+
+  @Override
+  public boolean isTargetScan(GroupScan groupScan) {
+    return groupScan instanceof GoogleSheetsGroupScan;
+  }
+
+  @Override
+  public ScanPushDownListener builderFor(GroupScan groupScan) {
+    GoogleSheetsGroupScan gsScan = (GoogleSheetsGroupScan) groupScan;
+    // Returning null tells the strategy there is nothing to push down
+    // (filters already applied, or the scan rejects filters).
+    if (gsScan.hasFilters() || !gsScan.allowsFilters()) {
+      return null;
+    } else {
+      return new GoogleSheetsScanPushDownListener(gsScan);
+    }
+  }
+
+  private static class GoogleSheetsScanPushDownListener implements ScanPushDownListener {
+
+    private final GoogleSheetsGroupScan groupScan;
+    // Maps each projected column name to itself, case-insensitively, so
+    // acceptColumn() and transform() can match filter columns to the scan.
+    private final Map<String, String> filterParams = CaseInsensitiveMap.newHashMap();
+
+    GoogleSheetsScanPushDownListener(GoogleSheetsGroupScan groupScan) {
+      this.groupScan = groupScan;
+      for (SchemaPath field : groupScan.columns()) {
+        filterParams.put(field.getAsUnescapedPath(), field.getAsUnescapedPath());
+      }
+    }
+
+    @Override
+    public ExprNode accept(ExprNode node) {
+      if (node instanceof OrNode) {
+        return null;
+      } else if (node instanceof ColRelOpConstNode) {
+        // TODO Implement Filter Pushdowns
+        // This entire method always returns null.  Google Sheets SDK may allow
+        // filter pushdowns, however the SDK was so complicated and poorly documented, I was not able
+        // to figure out how to implement filter pushdowns. If and when we figure that out, uncomment
+        // the line below, and then the filter rules will be pushed down.
+        return null;
+        //return acceptRelOp((ColRelOpConstNode) node);
+      } else {
+        return null;
+      }
+    }
+
+    // Currently unreachable: only invoked by the commented-out line in
+    // accept() above. Kept for when filter pushdown is implemented.
+    private ColRelOpConstNode acceptRelOp(ColRelOpConstNode relOp) {
+      return acceptColumn(relOp.colName) && acceptType(relOp.value.type) ? relOp : null;
+    }
+
+    /**
+     * Only accept columns in the filter params list.
+     */
+    private boolean acceptColumn(String colName) {
+      return filterParams.containsKey(colName);
+    }
+
+    /**
+     * Only accept types which have an unambiguous mapping to
+     * String.
+     */
+    private boolean acceptType(MinorType type) {
+      switch (type) {
+        case BIGINT:
+        case BIT:
+        case FLOAT4:
+        case FLOAT8:
+        case INT:
+        case SMALLINT:
+        case VARCHAR:
+        case VARDECIMAL:
+          return true;
+        default:
+          return false;
+      }
+    }
+
+    /**
+     * Convert the nodes to a map of param/string pairs using
+     * the case specified in the storage plugin config.
+     */
+    @Override
+    public Pair<GroupScan, List<RexNode>> transform(AndNode andNode) {
+      Map<String, ExprNode.ColRelOpConstNode> filters = new HashMap<>();
+      // Combined selectivity is the product of each predicate's selectivity.
+      double selectivity = 1;
+      for (ExprNode expr : andNode.children) {
+        ColRelOpConstNode relOp = (ColRelOpConstNode) expr;
+        filters.put(filterParams.get(relOp.colName), relOp);
+        selectivity *= relOp.op.selectivity();
+      }
+      GoogleSheetsGroupScan newScan = new GoogleSheetsGroupScan(groupScan, filters, selectivity);
+      return Pair.of(newScan, Collections.emptyList());
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsScanBatchCreator.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsScanBatchCreator.java
new file mode 100644
index 0000000..e6163cd
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsScanBatchCreator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.googlesheets;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+// Creates the scan operator (EVF managed-scan framework) for a
+// GoogleSheets sub-scan at execution time.
+public class GoogleSheetsScanBatchCreator implements BatchCreator<GoogleSheetsSubScan> {
+
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+                                       GoogleSheetsSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    // Re-resolve the plugin instance from the registry by name; the plugin
+    // itself is not serialized with the sub-scan.
+    StoragePluginRegistry registry = context.getDrillbitContext().getStorage();
+    GoogleSheetsStoragePlugin plugin;
+    try {
+      plugin = (GoogleSheetsStoragePlugin) registry.getPlugin(subScan.getScanSpec().getPluginName());
+    } catch (PluginException e) {
+      throw UserException.internalError(e)
+        .message("Unable to locate GoogleSheets storage plugin")
+        .build(logger);
+    }
+
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan, plugin);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  // Configures the managed-scan framework: projection, provided schema,
+  // user name, reader factory, and VARCHAR as the type for null columns.
+  private ScanFrameworkBuilder createBuilder(OptionManager options, GoogleSheetsSubScan subScan, GoogleSheetsStoragePlugin plugin) {
+    GoogleSheetsStoragePluginConfig config = subScan.getConfig();
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.providedSchema(subScan.getSchema());
+    builder.setUserName(subScan.getUserName());
+
+    // Reader
+    ReaderFactory readerFactory = new GoogleSheetsReaderFactory(config, subScan, plugin);
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  // Hands the framework exactly one batch reader per sub-scan.
+  private static class GoogleSheetsReaderFactory implements ReaderFactory {
+
+    private final GoogleSheetsStoragePluginConfig config;
+    private final GoogleSheetsSubScan subScan;
+    private final GoogleSheetsStoragePlugin plugin;
+    // Number of readers handed out so far; capped at one in next().
+    private int count;
+
+    public GoogleSheetsReaderFactory(GoogleSheetsStoragePluginConfig config, GoogleSheetsSubScan subScan, GoogleSheetsStoragePlugin plugin) {
+      this.config = config;
+      this.subScan = subScan;
+      this.plugin = plugin;
+    }
+
+    @Override
+    public void bind(ManagedScanFramework framework) {
+    }
+
+    @Override
+    public ManagedReader<SchemaNegotiator> next() {
+      // Only a single scan (in a single thread)
+      if (count++ == 0) {
+        return new GoogleSheetsBatchReader(config, subScan, plugin);
+      }
+      return null;
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsScanSpec.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsScanSpec.java
new file mode 100644
index 0000000..adcfdc3
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsScanSpec.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.planner.logical.DrillTableSelection;
+
+import java.util.Objects;
+
+@JsonTypeName("googlesheets-scan-spec")
+public class GoogleSheetsScanSpec implements DrillTableSelection {
+
+  private final String sheetID;
+  private final GoogleSheetsStoragePluginConfig config;
+  private final String tableName;
+  private final int tabIndex;
+  private final String pluginName;
+
+  public GoogleSheetsScanSpec(@JsonProperty("sheetID") String sheetID,
+                              @JsonProperty("config") GoogleSheetsStoragePluginConfig config,
+                              @JsonProperty("tableName") String tableName,
+                              @JsonProperty("pluginName") String pluginName,
+                              @JsonProperty("tabIndex") int tabIndex) {
+    this.sheetID = sheetID;
+    this.config = config;
+    this.pluginName = pluginName;
+    this.tableName = tableName;
+    this.tabIndex = tabIndex;
+  }
+
+  @JsonProperty("sheetID")
+  public String getSheetID(){
+    return sheetID;
+  }
+
+  @JsonProperty("config")
+  public GoogleSheetsStoragePluginConfig getConfig() {
+    return config;
+  }
+
+  @JsonProperty("tableName")
+  public String getTableName() {
+    return tableName;
+  }
+
+  @JsonProperty("pluginName")
+  public String getPluginName() {
+    return pluginName;
+  }
+
+  @JsonProperty("tabIndex")
+  public int getTabIndex() {
+    return tabIndex;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    GoogleSheetsScanSpec other = (GoogleSheetsScanSpec) obj;
+    return Objects.equals(sheetID, other.sheetID) &&
+      Objects.equals(config, other.config) &&
+      Objects.equals(tableName, other.tableName) &&
+      Objects.equals(pluginName, other.pluginName) &&
+      Objects.equals(tabIndex, other.tabIndex);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sheetID, config, tableName, tabIndex, pluginName);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("pluginName", sheetID)
+      .field("config", config)
+      .field("tableName", tableName)
+      .field("pluginName", pluginName)
+      .field("tabIndex", tabIndex)
+      .toString();
+  }
+
+  @Override
+  public String digest() {
+    return toString();
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePlugin.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePlugin.java
new file mode 100644
index 0000000..379f610
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePlugin.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.api.client.auth.oauth2.StoredCredential;
+import com.google.api.client.util.store.DataStore;
+import com.google.api.services.sheets.v4.Sheets;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.oauth.OAuthTokenProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.oauth.TokenRegistry;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.base.filter.FilterPushDownUtils;
+import org.apache.drill.exec.store.googlesheets.schema.GoogleSheetsSchemaFactory;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Storage plugin for Google Sheets. Registers the GoogleSheets schema,
+ * produces group scans for queries, and manages the OAuth credentials
+ * required by the Google {@link Sheets} client.
+ */
+public class GoogleSheetsStoragePlugin extends AbstractStoragePlugin {
+  // Conventional Java modifier order is "static final", not "final static".
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsStoragePlugin.class);
+  // Username under which tokens are stored when user translation is disabled.
+  private static final String SHARED_USERNAME = "anonymous";
+
+  private final GoogleSheetsStoragePluginConfig config;
+  private final GoogleSheetsSchemaFactory schemaFactory;
+  private final OAuthTokenProvider tokenProvider;
+  // Lazily initialized state; see getDataStore() and getSheetsService().
+  private DataStore<StoredCredential> dataStore;
+  private Sheets service;
+  private TokenRegistry tokenRegistry;
+  private String username;
+
+  public GoogleSheetsStoragePlugin(GoogleSheetsStoragePluginConfig configuration, DrillbitContext context, String name) {
+    super(context, name);
+    this.config = configuration;
+    this.tokenProvider = context.getoAuthTokenProvider();
+    this.schemaFactory = new GoogleSheetsSchemaFactory(this);
+  }
+
+  /**
+   * Creates the OAuth token table for the querying user and loads the stored
+   * tokens into a Google {@link DataStore}.
+   * <p>
+   * A word about how GoogleSheets (GS) handles authorization and authentication.
+   * GS uses OAuth 2.0 for authorization.
+   * The GS Sheets object is the client which interacts with the actual data, however
+   * it does not provide a straightforward way of passing credentials into this object.
+   * GS has three objects: the credential, storedCredential, and the credential dataStore.
+   * <p>
+   * The Credential Object:
+   * The credential really should be called the applicationCredential or something like that, as
+   * it stores the OAuth credentials for the application such as the clientID, clientSecret.
+   * <p>
+   * The Stored Credential Object:
+   * This object has no relation to the Credential object, and it stores the user's credentials,
+   * specifically the access and refresh tokens.
+   * <p>
+   * The DataStore Object is a synchronized store of storedCredential objects.
+   * The approach we take here is to use Drill's existing OAuth infrastructure
+   * to store the tokens in PersistentTokenStores, just like the HTTP plugin. When
+   * the plugin is loaded, we read the tokens from the persistent store into a GS dataStore.
+   * This happens when the plugin is registered.
+   */
+  public void initializeOauthTokenTable(SchemaConfig schemaConfig) {
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      // Per-user tokens: each Drill user gets their own token table.
+      this.username = schemaConfig.getUserName();
+      tokenRegistry = tokenProvider.getOauthTokenRegistry(this.username);
+    } else {
+      // Shared tokens: all users read from the anonymous token table.
+      this.username = SHARED_USERNAME;
+      tokenRegistry = tokenProvider.getOauthTokenRegistry(null);
+    }
+    tokenRegistry.createTokenTable(getName());
+    this.dataStore = new DrillDataStoreFactory(tokenProvider, getName()).createDataStore(this.username);
+  }
+
+  /**
+   * Returns the credential data store for the given user, creating it on
+   * first use.
+   */
+  public DataStore<StoredCredential> getDataStore(String username) {
+    if (this.dataStore == null) {
+      this.dataStore = new DrillDataStoreFactory(tokenProvider, getName()).createDataStore(username);
+    }
+    return dataStore;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    // The token table must exist before schemas can be resolved.
+    initializeOauthTokenTable(schemaConfig);
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  /** Returns the persistent OAuth token table for this plugin instance. */
+  public PersistentTokenTable getTokenTable() {
+    return tokenRegistry.getTokenTable(getName());
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+      options, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+      options, metadataProviderManager);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options,
+                                           MetadataProviderManager metadataProviderManager) throws IOException {
+    // Deserialize the scan spec from the logical plan and wrap it in a group scan.
+    GoogleSheetsScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<GoogleSheetsScanSpec>() {});
+    return new GoogleSheetsGroupScan(this.username, scanSpec, this, metadataProviderManager);
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    // Push-down planning is done at the logical phase so it can
+    // influence parallelization in the physical phase. Note that many
+    // existing plugins perform filter push-down at the physical
+    // phase, which also works fine if push-down is independent of
+    // parallelization.
+    if (FilterPushDownUtils.isFilterPushDownPhase(phase)) {
+      return GoogleSheetsPushDownListener.rulesFor(optimizerContext);
+    } else {
+      return ImmutableSet.of();
+    }
+  }
+
+  @Override
+  public StoragePluginConfig getConfig() {
+    return config;
+  }
+
+  /**
+   * This function is only used for testing and creates the necessary token tables.  Note that
+   * the token tables still need to be populated.
+   */
+  @VisibleForTesting
+  public void initializeTokenTableForTesting() {
+    // Use the tokenProvider field directly; the previous local variable
+    // shadowed it with an identical value fetched from the context.
+    tokenRegistry = tokenProvider.getOauthTokenRegistry(null);
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    // CTAS is supported via GoogleSheetsWriter.
+    return true;
+  }
+
+  /**
+   * This method gets (and caches) the Google Service needed for API calls.
+   * @param queryUser the name of the user executing the query
+   * @return An authenticated {@link Sheets} Google Sheets service.
+   */
+  public Sheets getSheetsService(String queryUser) {
+    if (service != null && dataStore != null) {
+      return service;
+    }
+    // Check if datastore is null and initialize if so.
+    if (dataStore == null) {
+      this.dataStore = getDataStore(queryUser);
+    }
+
+    try {
+      if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+        service = GoogleSheetsUtils.getSheetsService(config, dataStore, queryUser);
+      } else {
+        service = GoogleSheetsUtils.getSheetsService(config, dataStore, SHARED_USERNAME);
+      }
+      return service;
+    } catch (IOException | GeneralSecurityException e) {
+      throw UserException.connectionError(e)
+        .message("Error connecting to Googlesheets Service: " + e.getMessage())
+        .build(logger);
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
new file mode 100644
index 0000000..7f2f2f1
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
+import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets.Details;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.OAuthConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.CredentialProviderUtils;
+import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+@JsonTypeName(GoogleSheetsStoragePluginConfig.NAME)
+public class GoogleSheetsStoragePluginConfig extends StoragePluginConfig {
+  // Google's public OAuth 2.0 endpoints, used when none are configured.
+  private static final String AUTH_URI = "https://accounts.google.com/o/oauth2/auth";
+  private static final String TOKEN_URI = "https://oauth2.googleapis.com/token";
+  // Scopes: read/write access to spreadsheets plus read-only Drive access
+  // (needed to list and resolve spreadsheet files).
+  private static final String GOOGLE_SHEET_SCOPE = "https://www.googleapis.com/auth/spreadsheets";
+  private static final String GOOGLE_DRIVE_SCOPE = "https://www.googleapis.com/auth/drive.readonly";
+  private static final String DEFAULT_SCOPE = GOOGLE_SHEET_SCOPE + " " + GOOGLE_DRIVE_SCOPE;
+  private static final String DEFAULT_RESPONSE_TYPE = "code";
+  public static final String NAME = "googlesheets";
+
+  private final List<String> redirectUris;
+  private final String authUri;
+  private final String tokenUri;
+  private final Boolean extractHeaders;
+  private final Boolean allTextMode;
+  private final OAuthConfig oAuthConfig;
+
+  @JsonCreator
+  public GoogleSheetsStoragePluginConfig(@JsonProperty("clientID") String clientID,
+                                         @JsonProperty("clientSecret") String clientSecret,
+                                         @JsonProperty("redirectUris") List<String> redirectUris,
+                                         @JsonProperty("authUri") String authUri,
+                                         @JsonProperty("tokenUri") String tokenUri,
+                                         @JsonProperty("allTextMode") Boolean allTextMode,
+                                         @JsonProperty("extractHeaders") Boolean extractHeaders,
+                                         @JsonProperty("oAuthConfig") OAuthConfig oAuthConfig,
+                                         @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(CredentialProviderUtils.getCredentialsProvider(clientID, clientSecret, null, null,
+        null, null, null, credentialsProvider),
+      false);
+    this.redirectUris = redirectUris == null ? new ArrayList<>() : redirectUris;
+    // Fall back to Google's public endpoints when none are supplied.
+    this.authUri = StringUtils.isEmpty(authUri) ? AUTH_URI : authUri;
+    this.tokenUri = StringUtils.isEmpty(tokenUri) ? TOKEN_URI : tokenUri;
+
+    // Both flags default to false when absent from the JSON configuration.
+    this.extractHeaders = extractHeaders != null && extractHeaders;
+    this.allTextMode = allTextMode != null && allTextMode;
+
+    if (oAuthConfig == null) {
+      // Build a default OAuth config.  The callback URL is the comma-separated
+      // list of redirect URIs.  Google requires the response_type and scope
+      // authorization parameters regardless of whether redirect URIs were
+      // supplied, so they are always set (previously they were skipped when
+      // redirectUris was null).
+      Map<String, String> authParams = new HashMap<>();
+      authParams.put("response_type", DEFAULT_RESPONSE_TYPE);
+      authParams.put("scope", DEFAULT_SCOPE);
+
+      this.oAuthConfig = OAuthConfig.builder()
+        // Fix: honor a user-supplied authUri instead of always using the default.
+        .authorizationURL(this.authUri)
+        .callbackURL(String.join(",", this.redirectUris))
+        .authorizationParams(authParams)
+        .build();
+    } else {
+      this.oAuthConfig = oAuthConfig;
+    }
+  }
+
+  /**
+   * Copy constructor used by {@link #updateCredentialProvider(CredentialsProvider)}
+   * to swap the credential provider while keeping all other settings.
+   */
+  public GoogleSheetsStoragePluginConfig(GoogleSheetsStoragePluginConfig that, CredentialsProvider credentialsProvider) {
+    super(credentialsProvider, false, that.authMode);
+    this.redirectUris = that.redirectUris;
+    this.authUri = that.authUri;
+    this.tokenUri = that.tokenUri;
+    this.extractHeaders = that.extractHeaders;
+    this.allTextMode = that.allTextMode;
+    this.oAuthConfig = that.oAuthConfig;
+  }
+
+  @JsonIgnore
+  public static GoogleSheetsStoragePluginConfigBuilder builder() {
+    return new GoogleSheetsStoragePluginConfigBuilder();
+  }
+
+  /** Resolves the OAuth client credentials from the credential provider. */
+  @JsonIgnore
+  public Optional<OAuthTokenCredentials> getOAuthCredentials() {
+    return new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+  }
+
+  @JsonProperty("clientID")
+  public String getClientID() {
+    // Single credential lookup instead of the double isPresent()/get() call.
+    return getOAuthCredentials()
+      .map(OAuthTokenCredentials::getClientID)
+      .orElse(null);
+  }
+
+  @JsonProperty("clientSecret")
+  public String getClientSecret() {
+    return getOAuthCredentials()
+      .map(OAuthTokenCredentials::getClientSecret)
+      .orElse(null);
+  }
+
+  // Getters for redirectUris, authUri and tokenUri were missing, so these
+  // @JsonCreator properties were silently dropped on serialization and lost
+  // on a SerDe round trip of the plugin configuration.
+  @JsonProperty("redirectUris")
+  public List<String> getRedirectUris() {
+    return redirectUris;
+  }
+
+  @JsonProperty("authUri")
+  public String getAuthUri() {
+    return authUri;
+  }
+
+  @JsonProperty("tokenUri")
+  public String getTokenUri() {
+    return tokenUri;
+  }
+
+  @JsonProperty("allTextMode")
+  public Boolean allTextMode() {
+    return allTextMode;
+  }
+
+  @JsonProperty("extractHeaders")
+  public Boolean getExtractHeaders() {
+    return extractHeaders;
+  }
+
+  @JsonProperty("oAuthConfig")
+  public OAuthConfig getoAuthConfig() {
+    return oAuthConfig;
+  }
+
+  /** Builds the Google client-secrets object consumed by the Google API client. */
+  @JsonIgnore
+  public GoogleClientSecrets getSecrets() {
+    Details details = new Details()
+      .setClientId(getClientID())
+      .setClientSecret(getClientSecret())
+      .setRedirectUris(redirectUris)
+      .setAuthUri(oAuthConfig.getAuthorizationURL())
+      .setTokenUri(tokenUri);
+
+    return new GoogleClientSecrets().setInstalled(details);
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    GoogleSheetsStoragePluginConfig thatConfig = (GoogleSheetsStoragePluginConfig) that;
+    // authUri was previously missing from both equals() and hashCode().
+    return Objects.equals(credentialsProvider, thatConfig.credentialsProvider) &&
+      Objects.equals(redirectUris, thatConfig.redirectUris) &&
+      Objects.equals(authUri, thatConfig.authUri) &&
+      Objects.equals(tokenUri, thatConfig.tokenUri) &&
+      Objects.equals(allTextMode, thatConfig.allTextMode) &&
+      Objects.equals(oAuthConfig, thatConfig.oAuthConfig) &&
+      Objects.equals(extractHeaders, thatConfig.extractHeaders);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(credentialsProvider, redirectUris, authUri, tokenUri, allTextMode, extractHeaders, oAuthConfig);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("credentialProvider", credentialsProvider)
+      .field("redirectUris", redirectUris.toArray())
+      .field("extractHeaders", extractHeaders)
+      .field("allTextMode", allTextMode)
+      .field("authUri", authUri)
+      .field("tokenUri", tokenUri)
+      .field("oauthConfig", oAuthConfig)
+      .toString();
+  }
+
+  @Override
+  public StoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new GoogleSheetsStoragePluginConfig(this, credentialsProvider);
+  }
+
+  /** Fluent builder, mainly used by tests to assemble a config programmatically. */
+  public static class GoogleSheetsStoragePluginConfigBuilder {
+    private String clientID;
+
+    private String clientSecret;
+
+    private List<String> redirectUris;
+
+    private String authUri;
+
+    private String tokenUri;
+
+    private OAuthConfig oAuthConfig;
+
+    private Boolean allTextMode;
+
+    private Boolean extractHeaders;
+
+    private CredentialsProvider credentialsProvider;
+
+    GoogleSheetsStoragePluginConfigBuilder() {
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder clientID(String clientID) {
+      this.clientID = clientID;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder clientSecret(String clientSecret) {
+      this.clientSecret = clientSecret;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder redirectUris(List<String> redirectUris) {
+      this.redirectUris = redirectUris;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder authUri(String authUri) {
+      this.authUri = authUri;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder tokenUri(String tokenUri) {
+      this.tokenUri = tokenUri;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder OAuthConfig(OAuthConfig oAuthConfig) {
+      this.oAuthConfig = oAuthConfig;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder allTextMode(Boolean allTextMode) {
+      this.allTextMode = allTextMode;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder extractHeaders(Boolean extractHeaders) {
+      this.extractHeaders = extractHeaders;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfigBuilder credentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
+    }
+
+    public GoogleSheetsStoragePluginConfig build() {
+      return new GoogleSheetsStoragePluginConfig(clientID, clientSecret, redirectUris, authUri, tokenUri, allTextMode, extractHeaders, oAuthConfig, credentialsProvider);
+    }
+
+    @Override
+    public String toString() {
+      return new PlanStringBuilder(this)
+        .field("clientID", clientID)
+        .maskedField("clientSecret", clientSecret)
+        .field("allTextMode", allTextMode)
+        .field("extractHeaders", extractHeaders)
+        .field("redirectUris", redirectUris)
+        .field("authUri", authUri)
+        .field("tokenUri", tokenUri)
+        .field("oAuthConfig", oAuthConfig)
+        .toString();
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsSubScan.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsSubScan.java
new file mode 100644
index 0000000..88c8df0
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsSubScan.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.store.base.filter.ExprNode.ColRelOpConstNode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName("googlesheets-sub-scan")
+public class GoogleSheetsSubScan extends AbstractSubScan {
+
+  public static final String OPERATOR_TYPE = "GOOGLESHEETS_SUB_SCAN";
+
+  private final GoogleSheetsStoragePluginConfig config;
+  private final GoogleSheetsScanSpec scanSpec;
+  private final List<SchemaPath> columns;
+  private final Map<String, ColRelOpConstNode> filters;
+  private final int maxRecords;
+  private final TupleMetadata schema;
+
+  @JsonCreator
+  public GoogleSheetsSubScan(
+    @JsonProperty("userName") String username,
+    @JsonProperty("config") GoogleSheetsStoragePluginConfig config,
+    @JsonProperty("tableSpec") GoogleSheetsScanSpec scanSpec,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("filters") Map<String, ColRelOpConstNode> filters,
+    @JsonProperty("maxRecords") int maxRecords,
+    @JsonProperty("schema") TupleMetadata schema) {
+    super(username);
+    this.config = config;
+    this.scanSpec = scanSpec;
+    this.columns = columns;
+    this.filters = filters;
+    this.schema = schema;
+    this.maxRecords = maxRecords;
+  }
+
+  @JsonProperty("config")
+  public GoogleSheetsStoragePluginConfig getConfig() {
+    return config;
+  }
+
+  @JsonProperty("tableSpec")
+  public GoogleSheetsScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("filters")
+  public Map<String, ExprNode.ColRelOpConstNode> getFilters() {
+    return filters;
+  }
+
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
+  @JsonProperty("schema")
+  public TupleMetadata getSchema() {
+    return schema;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(
+    PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return ImmutableSet.<PhysicalOperator>of().iterator();
+  }
+
+  @Override
+  @JsonIgnore
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("config", config)
+      .field("tableSpec", scanSpec)
+      .field("columns", columns)
+      .field("filters", filters)
+      .field("maxRecords", maxRecords)
+      .field("schema", schema)
+      .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(config, scanSpec, columns, filters, maxRecords, schema);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    GoogleSheetsSubScan other = (GoogleSheetsSubScan) obj;
+    return Objects.equals(scanSpec, other.scanSpec)
+      && Objects.equals(config, other.config)
+      && Objects.equals(columns, other.columns)
+      && Objects.equals(filters, other.filters)
+      && Objects.equals(schema, other.schema)
+      && Objects.equals(maxRecords, other.maxRecords);
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsWriter.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsWriter.java
new file mode 100644
index 0000000..54370f4
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class GoogleSheetsWriter extends AbstractWriter {
+
+  public static final String OPERATOR_TYPE = "GOOGLESHEETS_WRITER";
+
+  private final GoogleSheetsStoragePlugin plugin;
+  private final String tableName;
+  private final String sheetName;
+  private final String queryUser;
+
+  @JsonCreator
+  public GoogleSheetsWriter(
+    @JsonProperty("child") PhysicalOperator child,
+    @JsonProperty("sheetName") String sheetName,
+    @JsonProperty("name") String name,
+    @JsonProperty("storage") StoragePluginConfig storageConfig,
+    @JsonProperty("queryUser") String queryUser,
+    @JacksonInject StoragePluginRegistry engineRegistry) {
+    super(child);
+    this.plugin = engineRegistry.resolve(storageConfig, GoogleSheetsStoragePlugin.class);
+    this.sheetName = sheetName;
+    this.queryUser = queryUser;
+    this.tableName = name;
+  }
+
+  public GoogleSheetsWriter(PhysicalOperator child, String sheetName, String name, String queryUser, GoogleSheetsStoragePlugin plugin) {
+    super(child);
+    this.tableName = name;
+    this.sheetName = sheetName;
+    this.queryUser = queryUser;
+    this.plugin = plugin;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new GoogleSheetsWriter(child, sheetName, tableName, queryUser, plugin);
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getSheetName() {
+    return sheetName;
+  }
+
+  public String getQueryUser() {
+    return queryUser;
+  }
+
+  public StoragePluginConfig getStorage() {
+    return plugin.getConfig();
+  }
+
+  @JsonIgnore
+  public GoogleSheetsStoragePlugin getPlugin() {
+    return plugin;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("tableName", tableName)
+      .field("sheetName", sheetName)
+      .field("queryUser", queryUser)
+      .field("storageStrategy", getStorageStrategy())
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    GoogleSheetsWriter otherWriter  = (GoogleSheetsWriter) that;
+    return Objects.equals(tableName, otherWriter.tableName) &&
+      Objects.equals(sheetName, otherWriter.sheetName) &&
+      Objects.equals(queryUser, otherWriter.queryUser);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tableName, sheetName, queryUser);
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsWriterBatchCreator.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsWriterBatchCreator.java
new file mode 100644
index 0000000..cd92ffd
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsWriterBatchCreator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+/**
+ * Builds the record batch that executes INSERT/CTAS writes for the GoogleSheets
+ * plugin.  Drill locates this class through its BatchCreator service mechanism
+ * and invokes it with the {@link GoogleSheetsWriter} physical operator.
+ */
+public class GoogleSheetsWriterBatchCreator implements BatchCreator<GoogleSheetsWriter> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, GoogleSheetsWriter config, List<RecordBatch> children)
+    throws ExecutionSetupException {
+    // A writer always sits above exactly one incoming batch.
+    assert children != null && children.size() == 1;
+
+    // NOTE(review): the first constructor argument is passed as null -- confirm
+    // GoogleSheetsBatchWriter tolerates a null value in that position.
+    GoogleSheetsBatchWriter recordWriter = new GoogleSheetsBatchWriter(null, config.getTableName(), config);
+    return new WriterRecordBatch(config, children.get(0), context, recordWriter);
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/columns/GoogleSheetsColumnRange.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/columns/GoogleSheetsColumnRange.java
new file mode 100644
index 0000000..d6752a6
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/columns/GoogleSheetsColumnRange.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.columns;
+
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+
+/**
+ * Describes a contiguous run of projected column indexes within one sheet so the
+ * reader can request the whole run as a single A1-notation range.  Indexes are
+ * zero-based internally; the letter accessors convert to the one-based column
+ * numbering that GoogleSheets expects.
+ */
+public class GoogleSheetsColumnRange {
+  private Integer startColIndex;
+  private Integer endColIndex;
+  // Retained for context/debugging; no accessor is currently exposed.
+  private final String sheetName;
+
+  public GoogleSheetsColumnRange(String sheetName) {
+    this.sheetName = sheetName;
+  }
+
+  public GoogleSheetsColumnRange setStartIndex(int startColIndex) {
+    this.startColIndex = startColIndex;
+    return this;
+  }
+
+  public GoogleSheetsColumnRange setEndIndex(int endColIndex) {
+    this.endColIndex = endColIndex;
+    return this;
+  }
+
+  public Integer getStartColIndex() {
+    return startColIndex;
+  }
+
+  public Integer getEndColIndex() {
+    return endColIndex;
+  }
+
+  // NOTE(review): unboxes startColIndex -- throws NPE if setStartIndex was never
+  // called.  Callers appear to always set both ends before reading; confirm.
+  public String getStartColumnLetter() {
+    return GoogleSheetsUtils.columnToLetter(startColIndex + 1);
+  }
+
+  public String getEndColumnLetter() {
+    return GoogleSheetsUtils.columnToLetter(endColIndex + 1);
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/columns/GoogleSheetsColumnWriter.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/columns/GoogleSheetsColumnWriter.java
new file mode 100644
index 0000000..471106c
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/columns/GoogleSheetsColumnWriter.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.columns;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeParseException;
+
+/**
+ * Converts the string cell values returned by the GoogleSheets API into typed
+ * Drill vector values.  One concrete subclass exists per supported Drill type;
+ * all of them treat null/empty cells as "leave the column unset" (null).
+ */
+public abstract class GoogleSheetsColumnWriter {
+  protected static final Logger logger = LoggerFactory.getLogger(GoogleSheetsColumnWriter.class);
+  protected final RowSetLoader rowWriter;
+  protected final ScalarWriter columnWriter;
+
+  public GoogleSheetsColumnWriter(RowSetLoader rowWriter, String colName) {
+    this.rowWriter = rowWriter;
+    this.columnWriter = rowWriter.scalar(colName);
+  }
+
+  /**
+   * Writes one raw cell value into this writer's column.
+   * @param value the raw cell value from the Sheets API (a String in practice);
+   *              null or empty values leave the column unset
+   */
+  public abstract void load(Object value);
+
+  public static class GoogleSheetsBigIntegerColumnWriter extends GoogleSheetsColumnWriter {
+    public GoogleSheetsBigIntegerColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        try {
+          // Unparseable cells are logged and left unset rather than failing the query.
+          columnWriter.setLong(Long.parseLong(stringValue));
+        } catch (NumberFormatException e) {
+          logger.info("Could not parse {} into long from Googlesheets.", stringValue);
+        }
+      }
+    }
+  }
+
+  public static class GoogleSheetsBooleanColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsBooleanColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        // cleanUpIncomingString() cannot throw, so no exception handling is needed
+        // here.  (A previous version caught NumberFormatException, which none of
+        // these calls can raise.)
+        columnWriter.setBoolean(cleanUpIncomingString(stringValue));
+      }
+    }
+
+    // Maps the spreadsheet conventions "1"/"0" to true/false; anything else is
+    // delegated to Boolean.parseBoolean (only "true", case-insensitive, is true).
+    private boolean cleanUpIncomingString(String incoming) {
+      if (StringUtils.isEmpty(incoming)) {
+        return false;
+      } else if (incoming.equalsIgnoreCase("1")) {
+        return true;
+      } else if (incoming.equalsIgnoreCase("0")) {
+        return false;
+      } else {
+        return Boolean.parseBoolean(incoming);
+      }
+    }
+  }
+
+  public static class GoogleSheetsDateColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsDateColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        LocalDate finalValue;
+        try {
+          finalValue = LocalDate.parse(stringValue);
+        } catch (DateTimeParseException e) {
+          // LocalDate.parse throws DateTimeParseException, not NumberFormatException;
+          // catching the wrong type previously let parse failures escape uncaught.
+          finalValue = null;
+        }
+        columnWriter.setDate(finalValue);
+      }
+    }
+  }
+
+  public static class GoogleSheetsFloatColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsFloatColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        float finalValue;
+        try {
+          finalValue = Float.parseFloat(stringValue);
+        } catch (NumberFormatException e) {
+          // Unparseable numeric cells become NaN rather than failing the query.
+          finalValue = Float.NaN;
+        }
+        columnWriter.setFloat(finalValue);
+      }
+    }
+  }
+
+  public static class GoogleSheetsIntegerColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsIntegerColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        try {
+          // Sheets often reports whole numbers as "3.0"; parse as double and floor
+          // so such values still load as integers.
+          int finalValue = (int) Math.floor(Double.parseDouble(stringValue));
+          columnWriter.setInt(finalValue);
+        } catch (NumberFormatException e) {
+          logger.info("Could not parse {} into integer from Googlesheets.", stringValue);
+        }
+      }
+    }
+  }
+
+  public static class GoogleSheetsNumericColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsNumericColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        double finalValue;
+        try {
+          finalValue = Double.parseDouble(stringValue);
+        } catch (NumberFormatException e) {
+          // Unparseable numeric cells become NaN rather than failing the query.
+          finalValue = Double.NaN;
+        }
+        columnWriter.setDouble(finalValue);
+      }
+    }
+  }
+
+  public static class GoogleSheetsTimeColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsTimeColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        LocalTime finalValue;
+        try {
+          finalValue = LocalTime.parse(stringValue);
+        } catch (DateTimeParseException e) {
+          // LocalTime.parse throws DateTimeParseException, not NumberFormatException;
+          // catching the wrong type previously let parse failures escape uncaught.
+          finalValue = null;
+        }
+        columnWriter.setTime(finalValue);
+      }
+    }
+  }
+
+  public static class GoogleSheetsTimestampColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsTimestampColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        Instant finalValue;
+        try {
+          // Instant.parse expects ISO-8601 instant text, e.g. 2007-12-03T10:15:30Z.
+          finalValue = Instant.parse(stringValue);
+        } catch (DateTimeParseException e) {
+          // Instant.parse throws DateTimeParseException, not NumberFormatException;
+          // catching the wrong type previously let parse failures escape uncaught.
+          finalValue = null;
+        }
+        columnWriter.setTimestamp(finalValue);
+      }
+    }
+  }
+
+  public static class GoogleSheetsVarcharColumnWriter extends GoogleSheetsColumnWriter {
+
+    public GoogleSheetsVarcharColumnWriter(RowSetLoader rowWriter, String colName) {
+      super(rowWriter, colName);
+    }
+
+    @Override
+    public void load(Object rawValue) {
+      String stringValue = (String) rawValue;
+      if (StringUtils.isNotEmpty(stringValue)) {
+        columnWriter.setString(stringValue);
+      }
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsDrillSchema.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsDrillSchema.java
new file mode 100644
index 0000000..531337f
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsDrillSchema.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.schema;
+
+import com.google.api.services.sheets.v4.model.Sheet;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsScanSpec;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePlugin;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePluginConfig;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class represents the actual tab within a GoogleSheets document.
+ */
+public class GoogleSheetsDrillSchema extends AbstractSchema {
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsDrillSchema.class);
+
+  // Tables (tabs) are registered eagerly in the constructor; lookups are case-insensitive.
+  private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+  private final GoogleSheetsStoragePlugin plugin;
+
+  private final SchemaConfig schemaConfig;
+
+  public GoogleSheetsDrillSchema(AbstractSchema parent, String name,
+                                 GoogleSheetsStoragePlugin plugin,
+                                 List<Sheet> subSchemas, SchemaConfig schemaConfig) {
+    super(parent.getSchemaPath(), name);
+    this.plugin = plugin;
+    this.schemaConfig = schemaConfig;
+
+    // Register one Drill table per tab in the GoogleSheets document.  The tab's
+    // position within the document is recorded in the scan spec so the reader
+    // can also address it by index.
+    for (Sheet sheet : subSchemas) {
+      registerTable(sheet.getProperties().getTitle(),
+        new DynamicDrillTable(plugin, plugin.getName(),
+        new GoogleSheetsScanSpec(
+          name,
+          (GoogleSheetsStoragePluginConfig) plugin.getConfig(),
+          sheet.getProperties().getTitle(),
+          plugin.getName(),
+          subSchemas.indexOf(sheet))
+        )
+      );
+    }
+  }
+
+  // Recursively attaches sub-schemas to the Calcite schema tree.  NOTE(review):
+  // getSubSchema() below always returns null, so this loop appears to be a
+  // no-op today; kept for interface symmetry with the root schema.
+  public void setHolder(SchemaPlus plusOfThis) {
+    for (String s : getSubSchemaNames()) {
+      GoogleSheetsDrillSchema inner = getSubSchema(s);
+      SchemaPlus holder = plusOfThis.add(s, inner);
+      inner.setHolder(holder);
+    }
+  }
+
+  @Override
+  public String getTypeName() {
+    return GoogleSheetsStoragePluginConfig.NAME;
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+    logger.debug("Getting table: {}", tableName);
+    // Plain lookup: all tables were registered in the constructor.  The previous
+    // computeIfAbsent(tableName, this::getDrillTable) indirection could never add
+    // a mapping, because its mapping function read the same absent key back out
+    // of this map (always null, and null results are not stored).
+    DynamicDrillTable table = activeTables.get(tableName);
+    if (table != null) {
+      logger.debug("Found table: {}", table.getJdbcTableType().jdbcName);
+    } else {
+      logger.debug("Oh no! {} not found and returning null!", tableName);
+      return null;
+    }
+    return table;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return activeTables.keySet();
+  }
+
+  @Override
+  public GoogleSheetsDrillSchema getSubSchema(String name) {
+    // Tabs are modeled as tables, not sub-schemas; this schema is a leaf.
+    return null;
+  }
+
+  @Override
+  public boolean isMutable() {
+    return plugin.supportsWrite();
+  }
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName,
+                                         List<String> partitionColumns,
+                                         StorageStrategy storageStrategy) {
+    if (!plugin.supportsWrite()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+    // The schema name is the GoogleSheets document; the new table becomes a tab in it.
+    String documentName = this.name;
+    return new CreateTableEntry() {
+      @Override
+      public Writer getWriter(PhysicalOperator child) {
+        return new GoogleSheetsWriter(child, documentName, tableName, schemaConfig.getUserName(), plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        // GoogleSheets has no notion of partitioning.
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+    activeTables.put(name, table);
+    return table;
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsRootSchema.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsRootSchema.java
new file mode 100644
index 0000000..494a08c
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsRootSchema.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.schema;
+
+import com.google.api.services.sheets.v4.Sheets;
+import com.google.api.services.sheets.v4.model.Sheet;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePlugin;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePluginConfig;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Root of the GoogleSheets schema tree.  Each sub-schema corresponds to one
+ * GoogleSheets document (addressed by its document ID), and each document's
+ * tabs become tables within that sub-schema.
+ */
+public class GoogleSheetsRootSchema extends AbstractSchema {
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsRootSchema.class);
+
+  // NOTE(review): nothing in this class ever populates activeTables, so
+  // getTable() below always reports a miss; tables resolve through the
+  // per-document sub-schemas instead.
+  private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
+  // Cache of resolved document sub-schemas, keyed by document ID.
+  private final Map<String, GoogleSheetsDrillSchema> schemas = new HashMap<>();
+
+  private final GoogleSheetsStoragePlugin plugin;
+  private final SchemaConfig schemaConfig;
+
+  public GoogleSheetsRootSchema(GoogleSheetsStoragePlugin plugin, SchemaConfig schemaConfig) {
+    super(Collections.emptyList(), plugin.getName());
+    this.schemaConfig = schemaConfig;
+    this.plugin = plugin;
+  }
+
+  // Attaches all currently cached document sub-schemas to the Calcite schema tree.
+  void setHolder(SchemaPlus plusOfThis) {
+    for (String s : getSubSchemaNames()) {
+      GoogleSheetsDrillSchema inner = getSubSchema(s);
+      SchemaPlus holder = plusOfThis.add(s, inner);
+      inner.setHolder(holder);
+    }
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return schemas.keySet();
+  }
+
+  @Override
+  public GoogleSheetsDrillSchema getSubSchema(String name) {
+    GoogleSheetsDrillSchema schema = schemas.get(name);
+    // This level represents an actual GoogleSheets document.  Sub-schemas are
+    // resolved lazily on first access and cached for the rest of the query.
+    //
+    // TODO In the future, we could add a check here to see whether the user has the DRIVE permission, and if so,
+    // retrieve the actual "file" name to use in the query instead of the non-readable ID.
+    if (schema == null) {
+      Sheets service = plugin.getSheetsService(schemaConfig.getUserName());
+      // While we are not impersonating users, separate access tokens are stored
+      // per user, so the service above is resolved for the querying user.
+      logger.debug("Accessing credentials for {}", schemaConfig.getUserName());
+
+      // A local list replaces the old sheetList instance field so that a failed
+      // lookup for one document can no longer reuse the tab list of a previously
+      // resolved document.
+      List<Sheet> sheetList = new ArrayList<>();
+      try {
+        sheetList = GoogleSheetsUtils.getSheetList(service, name);
+      } catch (IOException e) {
+        // Previously swallowed silently.  Log it so unreadable or nonexistent
+        // documents are diagnosable; the schema is still created (with no tabs)
+        // to preserve existing planner behavior.
+        logger.warn("Could not retrieve sheet list for GoogleSheets document {}.", name, e);
+      }
+      schema = new GoogleSheetsDrillSchema(this, name, plugin, sheetList, schemaConfig);
+      schemas.put(name, schema);
+    }
+    return schema;
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+    logger.debug("Getting table in root schema: {}", tableName);
+    // Plain lookup (the former computeIfAbsent round-trip could never add a mapping,
+    // because its mapping function read the same absent key back out of this map).
+    DynamicDrillTable table = activeTables.get(tableName);
+    if (table != null) {
+      logger.debug("Found table: {}", table.getJdbcTableType().jdbcName);
+    } else {
+      logger.debug("Oh no! {} not found and returning null!", tableName);
+    }
+    return table;
+  }
+
+  @Override
+  public String getTypeName() {
+    return GoogleSheetsStoragePluginConfig.NAME;
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsSchemaFactory.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsSchemaFactory.java
new file mode 100644
index 0000000..8835fcb
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/schema/GoogleSheetsSchemaFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePlugin;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePluginConfig;
+
+/**
+ * Registers the GoogleSheets schema tree with Calcite at query-planning time.
+ */
+public class GoogleSheetsSchemaFactory extends AbstractSchemaFactory {
+  private final GoogleSheetsStoragePlugin plugin;
+
+  public GoogleSheetsSchemaFactory(GoogleSheetsStoragePlugin plugin) {
+    super(GoogleSheetsStoragePluginConfig.NAME);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    // Attach a fresh root schema under this plugin's name, then hand it the
+    // Calcite holder so it can register its document sub-schemas.
+    GoogleSheetsRootSchema rootSchema = new GoogleSheetsRootSchema(plugin, schemaConfig);
+    rootSchema.setHolder(parent.add(getName(), rootSchema));
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java
new file mode 100644
index 0000000..18b0531
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This class is used to construct a range with the GoogleSheet reader in Drill.  This
+ * builder generates ranges and can apply the projection and limit pushdowns to the ranges.
+ *
+ * GoogleSheets uses A1 notation for defining columns and ranges. An example would be:
+ * 'Sheet1'!A11:F20
+ *
+ */
+public class GoogleSheetsRangeBuilder implements Iterator<String> {
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsRangeBuilder.class);
+
+  // Explicit column letters added via addColumn(); only consulted in build().
+  private final List<String> columns;
+  private final String sheetName;
+  // Number of rows requested per generated range.
+  private final int batchSize;
+  // Contiguous runs of projected columns (projection pushdown); null when unset.
+  private List<GoogleSheetsColumnRange> projectedRanges;
+  // Limit pushdown; 0 means "no limit".
+  private int limit;
+  // Latch cleared by lastBatch() or when the next range would start past the end.
+  private boolean hasMore;
+  // Zero-based batch counter; starts at -1 so the first next() produces batch 0.
+  private int batchIndex;
+  private boolean isStarQuery;
+  // Optional explicit column bounds for star queries, in A1 letter form.
+  private String firstColumn;
+  private String lastColumn;
+  // Exact row count when known; 0 means "unknown".
+  private int rowCount;
+
+  public GoogleSheetsRangeBuilder(String sheetName, int batchSize) {
+    this.sheetName = sheetName;
+    this.batchSize = batchSize;
+    columns = new ArrayList<>();
+    batchIndex = -1;
+    limit = 0;
+    hasMore = true;
+    rowCount = 0;
+    isStarQuery = false;
+  }
+
+  public GoogleSheetsRangeBuilder addColumn(String columnLetter) {
+    columns.add(columnLetter);
+    return this;
+  }
+
+  public GoogleSheetsRangeBuilder addColumn(int columnIndex) {
+    columns.add(GoogleSheetsUtils.columnToLetter(columnIndex));
+    return this;
+  }
+
+  /**
+   * Adds a limit to the range builder.
+   * @param limit The maximum number of total results returned
+   * @return The range builder with the limit applied
+   */
+  public GoogleSheetsRangeBuilder addLimit(int limit) {
+    this.limit = limit;
+
+    return this;
+  }
+
+  public GoogleSheetsRangeBuilder addFirstColumn(String column) {
+    this.firstColumn = column;
+    return this;
+  }
+
+  public GoogleSheetsRangeBuilder addLastColumn(String column) {
+    this.lastColumn = column;
+    return this;
+  }
+
+  public GoogleSheetsRangeBuilder addRowCount(int rowCount) {
+    this.rowCount = rowCount;
+    return this;
+  }
+
+  // Installing projected ranges implies a non-star query.
+  public GoogleSheetsRangeBuilder addProjectedRanges(List<GoogleSheetsColumnRange> projectedRanges) {
+    this.projectedRanges = projectedRanges;
+    this.isStarQuery = false;
+    return this;
+  }
+
+  public GoogleSheetsRangeBuilder isStarQuery(boolean isStarQuery) {
+    this.isStarQuery = isStarQuery;
+    return this;
+  }
+
+  // 1-based first row of the current batch (GoogleSheets rows are 1-based).
+  private int getStartIndex() {
+    return (batchIndex * batchSize) + 1;
+  }
+
+  // 1-based last row of the current batch, clamped by limit and/or rowCount.
+  private int getEndIndex() {
+    // We have a few cases here:
+    int end;
+    if (limit == 0 && rowCount == 0) {
+      // Case 1.  We have no limit or exact row count
+      return (batchIndex + 1) * batchSize;
+    } else if (limit > 0 && rowCount == 0) {
+      // Case 2:  We have a limit but no exact row count
+      end = Math.min(((batchIndex + 1) * batchSize), limit);
+    } else if (rowCount > 0 && limit == 0) {
+      // Case 3:  We have a rowCount but no limit
+      end = Math.min(((batchIndex + 1) * batchSize), rowCount);
+    } else {
+      // We have both a rowCount and a limit
+      end = Math.min(((batchIndex + 1) * batchSize), rowCount);
+      end = Math.min(end, limit);
+    }
+    return end;
+  }
+
+  /**
+   * When a limit is not present, the BatchReader must call this method
+   * to indicate when there are no more results and to stop generating new
+   * ranges.
+   */
+  public void lastBatch() {
+    hasMore = false;
+  }
+
+  // Builds the A1-notation range string for the current batch, or null when
+  // exhausted.  Called by next() after batchIndex has been advanced.
+  private String build() {
+    if (!hasMore) {
+      return null;
+    } else if (getStartIndex() > getEndIndex()) {
+      // The clamped end has fallen behind the start: limit/rowCount is exhausted.
+      hasMore = false;
+      return null;
+    }
+
+    StringBuilder range = new StringBuilder();
+    // In the case of a star query, without columns provided all columns are projected.
+    // In this case, the range is <SheetName>!>StartIndex:EndIndex
+    if ((columns.size() == 0 || isStarQuery) &&
+      projectedRanges == null &&
+      StringUtils.isEmpty(firstColumn) &&
+      StringUtils.isEmpty(lastColumn)) {
+      range.append("'")
+        .append(sheetName)
+        .append("'!")
+        .append(getStartIndex())
+        .append(":")
+        .append(getEndIndex());
+    // Star query bounded by explicit first/last columns.  Only reachable when
+    // firstColumn/lastColumn are set (otherwise the branch above matches first).
+    } else if (columns.size() == 0 && isStarQuery) {
+      range.append("'")
+        .append(sheetName)
+        .append("'!")
+        .append(firstColumn)
+        .append(getStartIndex())
+        .append(":")
+        .append(lastColumn)
+        .append(getEndIndex());
+    // Projection pushdown: one sub-range per contiguous run of projected columns,
+    // comma-separated.  NOTE(review): a single A1 range normally holds one span --
+    // confirm downstream code accepts or splits this comma-joined form.
+    } else if (projectedRanges != null && projectedRanges.size() > 0) {
+      range.append("'")
+        .append(sheetName)
+        .append("'!");
+      int rangeCount = 0;
+      for (GoogleSheetsColumnRange columnRange : projectedRanges) {
+        if (rangeCount > 0) {
+          range.append(",");
+        }
+        range.append(columnRange.getStartColumnLetter())
+          .append(getStartIndex())
+          .append(":")
+          .append(columnRange.getEndColumnLetter())
+          .append(getEndIndex());
+        rangeCount++;
+      }
+    }
+
+    // NOTE(review): if no branch above matched (e.g. columns were added via
+    // addColumn() but no projected ranges), an empty string is returned rather
+    // than null -- confirm callers treat "" as end-of-data.
+    logger.debug("Range built: {}", range);
+    return range.toString();
+  }
+
+  // Builds one A1 range string per projected column run, for use with the
+  // spreadsheets.values batchGet API.  NOTE(review): assumes projectedRanges is
+  // non-null whenever this is called for a non-star query; NPE otherwise.
+  private List<String> buildBatchList() {
+    if (isStarQuery) {
+      return null;
+    }
+
+    List<String> batchList = new ArrayList<>();
+    StringBuilder batch = new StringBuilder();
+
+    for (GoogleSheetsColumnRange columnRange : projectedRanges) {
+      batch.append("'")
+        .append(sheetName)
+        .append("'!")
+        .append(columnRange.getStartColumnLetter())
+        .append(getStartIndex())
+        .append(":")
+        .append(columnRange.getEndColumnLetter())
+        .append(getEndIndex());
+      batchList.add(batch.toString());
+      batch = new StringBuilder();
+    }
+    return batchList;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return hasMore;
+  }
+
+  // Advances to the next batch and returns its range, or null when exhausted.
+  @Override
+  public String next() {
+    batchIndex++;
+    return build();
+  }
+
+  // Batch-list variant of next(); advances the same shared batchIndex.
+  public List<String> nextBatch() {
+    batchIndex++;
+    return buildBatchList();
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("Sheet name", sheetName)
+      .field("Batch size", batchSize)
+      .field("Limit", limit)
+      .field("isStarQuery", isStarQuery)
+      .field("First Column", firstColumn)
+      .field("Last Column", lastColumn)
+      .field("Row Count", rowCount)
+      .toString();
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java
new file mode 100644
index 0000000..1788fbd
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets.utils;
+
+import com.google.api.client.auth.oauth2.AuthorizationCodeFlow;
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.auth.oauth2.DataStoreCredentialRefreshListener;
+import com.google.api.client.auth.oauth2.StoredCredential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow;
+import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.client.util.store.DataStore;
+import com.google.api.services.sheets.v4.Sheets;
+import com.google.api.services.sheets.v4.Sheets.Spreadsheets.Values.BatchGet;
+import com.google.api.services.sheets.v4.SheetsScopes;
+import com.google.api.services.sheets.v4.model.AddSheetRequest;
+import com.google.api.services.sheets.v4.model.BatchUpdateSpreadsheetRequest;
+import com.google.api.services.sheets.v4.model.Request;
+import com.google.api.services.sheets.v4.model.Sheet;
+import com.google.api.services.sheets.v4.model.SheetProperties;
+import com.google.api.services.sheets.v4.model.Spreadsheet;
+import com.google.api.services.sheets.v4.model.UpdateValuesResponse;
+import com.google.api.services.sheets.v4.model.ValueRange;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.Typifier;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.googlesheets.DrillDataStore;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsColumn;
+import org.apache.drill.exec.store.googlesheets.GoogleSheetsStoragePluginConfig;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnRange;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static com.google.api.client.util.Strings.isNullOrEmpty;
+
+
+// NOTE(review): utility class with only static members; consider a private
+// constructor to prevent instantiation.
+public class GoogleSheetsUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(GoogleSheetsUtils.class);
+  // Number of leading rows sampled when inferring column data types.
+  private static final int SAMPLE_SIZE = 5;
+  private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
+  // Prefix for auto-generated names given to columns with blank headers (field_0, field_1, ...).
+  private static final String UNKNOWN_HEADER = "field_";
+
+  /**
+   * Represents the possible data types found in a GoogleSheets document.
+   * Columns start as {@link #UNKNOWN} and are refined per observed value
+   * (see updateDataType).
+   */
+  public enum DATA_TYPES {
+    /**
+     * Represents a field before the datatype is known
+     */
+    UNKNOWN,
+    /**
+     * A numeric data type, either a float or an int.  These are all
+     * converted to Doubles when projected.
+     */
+    NUMERIC,
+    /**
+     * A string data type
+     */
+    VARCHAR,
+    /**
+     * A field containing a date
+     */
+    DATE,
+    /**
+     * A field containing a time
+     */
+    TIME,
+    /**
+     * A field containing timestamps.
+     */
+    TIMESTAMP
+  }
+
+  /**
+   * Creates an authorized {@link Credential} for use in GoogleSheets queries.
+   * @param config The {@link GoogleSheetsStoragePluginConfig} to be authorized
+   * @param dataStore A {@link DrillDataStore} containing the user's tokens
+   * @param queryUser The current query user's ID.  This should be set to anonymous if user translation is disabled.
+   * @return A validated {@link Credential} object.
+   * @throws IOException If anything goes wrong
+   * @throws GeneralSecurityException If the credentials are invalid
+   */
+  public static Credential authorize(GoogleSheetsStoragePluginConfig config,
+                                     DataStore<StoredCredential> dataStore,
+                                     String queryUser) throws IOException, GeneralSecurityException {
+    GoogleClientSecrets clientSecrets = config.getSecrets();
+    GoogleAuthorizationCodeFlow flow;
+    // Only the spreadsheets scope is requested.
+    List<String> scopes = Collections.singletonList(SheetsScopes.SPREADSHEETS);
+
+    // Fail fast with a clear message when the token store was not wired up.
+    if (dataStore == null) {
+      logger.debug("Datastore is null");
+      throw UserException.connectionError()
+        .message("The DrillDataStore is null.  This should not happen.")
+        .build(logger);
+    } else if (dataStore.getDataStoreFactory() == null) {
+      logger.debug("Datastore factory is null");
+      throw UserException.connectionError()
+        .message("The DrillDataStoreFactory is null.  This should not happen.")
+        .build(logger);
+    }
+
+    // "offline" access is requested so that Google also issues a refresh token.
+    flow = new GoogleAuthorizationCodeFlow.Builder
+      (GoogleNetHttpTransport.newTrustedTransport(), JSON_FACTORY, clientSecrets, scopes)
+        .setDataStoreFactory(dataStore.getDataStoreFactory())
+        .setAccessType("offline")
+        .build();
+
+    return loadCredential(queryUser, flow, dataStore);
+  }
+
+  /**
+   * Loads a previously stored credential for the given user from the supplied
+   * data store.
+   *
+   * @param userId the query user's id; no lookup is performed when blank
+   * @param flow the authorization flow used to construct the credential
+   * @param credentialDataStore the store holding the user's tokens
+   * @return a {@link Credential} populated with the stored tokens, or null when
+   *         the user id or data store is missing, or no tokens are stored
+   */
+  public static Credential loadCredential(String userId, GoogleAuthorizationCodeFlow flow, DataStore<StoredCredential> credentialDataStore) {
+    // No requests need to be performed when userId or the data store is absent.
+    if (isNullOrEmpty(userId) || credentialDataStore == null) {
+      return null;
+    }
+
+    // Guard the downcast: only DrillDataStore exposes getStoredCredential(),
+    // and an unchecked cast would throw ClassCastException for other stores.
+    if (!(credentialDataStore instanceof DrillDataStore)) {
+      logger.debug("Unexpected data store type: {}", credentialDataStore.getClass().getName());
+      return null;
+    }
+
+    StoredCredential stored = ((DrillDataStore<StoredCredential>) credentialDataStore).getStoredCredential();
+    if (stored == null) {
+      return null;
+    }
+
+    // Build the credential only after we know tokens exist.
+    Credential credential = newCredential(userId, flow, credentialDataStore);
+    credential.setAccessToken(stored.getAccessToken());
+    credential.setRefreshToken(stored.getRefreshToken());
+    credential.setExpirationTimeMilliseconds(stored.getExpirationTimeMilliseconds());
+    return credential;
+  }
+
+  /**
+   * Returns a new credential instance based on the given user ID, wired to the
+   * supplied flow's transport, JSON factory, token endpoint and clock.
+   *
+   * @param userId user ID or {@code null} if not using a persisted credential store
+   * @param flow the authorization flow supplying transport and endpoint settings
+   * @param credentialDataStore the persisted token store, or null if none
+   * @return an unpopulated {@link Credential}; tokens are set by the caller
+   */
+  private static Credential newCredential(String userId, AuthorizationCodeFlow flow, DataStore<StoredCredential> credentialDataStore) {
+    Credential.Builder builder =
+      new Credential.Builder(flow.getMethod())
+        .setTransport(flow.getTransport())
+        .setJsonFactory(flow.getJsonFactory())
+        .setTokenServerEncodedUrl(flow.getTokenServerEncodedUrl())
+        .setClientAuthentication(flow.getClientAuthentication())
+        .setRequestInitializer(flow.getRequestInitializer())
+        .setClock(flow.getClock());
+
+    // Persist refreshed tokens back to the data store when one is present.
+    if (credentialDataStore != null) {
+      builder.addRefreshListener(
+        new DataStoreCredentialRefreshListener(userId, credentialDataStore));
+    }
+    builder.getRefreshListeners().addAll(flow.getRefreshListeners());
+    return builder.build();
+  }
+
+
+
+  /**
+   * Builds an authenticated {@link Sheets} service for the given plugin config
+   * and query user.
+   * @param config the storage plugin configuration holding the client secrets
+   * @param dataStore the store containing the user's OAuth tokens
+   * @param queryUser the current query user's id
+   * @return a {@link Sheets} service authorized for the user
+   * @throws IOException if the token store cannot be read
+   * @throws GeneralSecurityException if the trusted transport cannot be created
+   */
+  public static Sheets getSheetsService(GoogleSheetsStoragePluginConfig config,
+                                        DataStore<StoredCredential> dataStore,
+                                        String queryUser)
+    throws IOException, GeneralSecurityException {
+    Credential credential = GoogleSheetsUtils.authorize(config, dataStore, queryUser);
+    return new Sheets.Builder(
+      GoogleNetHttpTransport.newTrustedTransport(), GsonFactory.getDefaultInstance(), credential)
+      .setApplicationName("Drill")
+      .build();
+  }
+
+  /**
+   * Returns the list of {@link Sheet} tabs (the full Sheet objects, not just
+   * their titles) contained in a given GoogleSheets document.
+   * @param service The Google Sheets service
+   * @param sheetID The sheetID for the Google sheet.  This can be obtained from the URL of your Google sheet
+   * @return A list of {@link Sheet} objects within the given Google Sheet
+   * @throws IOException If the Google sheet is unreachable or invalid.
+   */
+  public static List<Sheet> getSheetList(Sheets service, String sheetID) throws IOException {
+    Spreadsheet spreadsheet = service.spreadsheets().get(sheetID).execute();
+    return spreadsheet.getSheets();
+  }
+
+  /**
+   * Converts a column index to A1 notation. Google sheets has a limitation of approx 18k
+   * columns, but that is not enforced here. The column index must be greater than zero or
+   * the function will return null.
+   *
+   * References code found here:
+   * <a href="https://stackoverflow.com/questions/21229180/convert-column-index-into-corresponding-column-letter">Stack Overflow Article</a>
+   * @param column The column index for the desired column. Must be greater than zero
+   * @return The A1 representation of the column index.
+   */
+  public static String columnToLetter(int column) {
+    if (column <= 0) {
+      return null;
+    }
+
+    int temp;
+    StringBuilder letter = new StringBuilder();
+    while (column > 0) {
+      temp = (column - 1) % 26;
+      letter.insert(0, (char) (temp + 65));
+      column = (column - temp - 1) / 26;
+    }
+    return letter.toString();
+  }
+
+  /**
+   * Given a column reference in A1 notation, this function will
+   * return the column numeric index. GoogleSheets has a limit of approx
+   * 18k columns, but that is not enforced here.
+   *
+   * References code found here:
+   * <a href="https://stackoverflow.com/questions/21229180/convert-column-index-into-corresponding-column-letter">Stack Overflow Article</a>
+   * @param letter The desired column in A1 notation
+   * @return The index of the supplied column
+   */
+  public static int letterToColumnIndex(String letter) {
+    // Make sure the letters are all upper case.
+    letter = letter.toUpperCase();
+    int column = 0;
+    int length = letter.length();
+    for (int i = 0; i < length; i++) {
+      column += (Character.codePointAt(letter, i) - 64) * (int)Math.pow(26, length - i - 1);
+    }
+    return column;
+  }
+
+  /**
+   * Retrieves up to the first {@code SAMPLE_SIZE} rows of the given tab.
+   * This sample is used to build the schema for Drill, since GoogleSheets
+   * itself provides no column type metadata.
+   * @param service An authenticated Google Sheets Service
+   * @param sheetID The Sheet ID for the Google Sheet (Can be found in the Sheet URL)
+   * @param tabName The tab name of the actual spreadsheet you want to query
+   * @return A nested list of the first five rows of the dataset.
+   * @throws IOException If the request fails, throw an IOException
+   */
+  public static List<List<Object>> getFirstRows (Sheets service, String sheetID, String tabName) throws IOException {
+    // Row-only A1 range, e.g. MyTab!1:5.
+    // NOTE(review): tab names containing spaces may need single quotes
+    // ('My Tab'!1:5) for the API to accept the range — confirm.
+    String range = tabName + "!1:" + SAMPLE_SIZE;
+    return service.spreadsheets().values().get(sheetID, range).execute().getValues();
+  }
+
+  /**
+   * Returns a 2D table of Objects representing the given range in A1 notation. Note that this
+   * function cannot be used with multiple ranges.  If you are trying to retrieve multiple groups
+   * of columns, you must use the getBatchData function.
+   * @param service The Authenticated GoogleSheets service
+   * @param sheetID The GoogleSheet ID.  This can be found in the Sheet URL
+   * @param range The range in A1 notation.
+   * @return  A 2D table of Objects representing the given range.
+   * @throws IOException If the request fails, throw an IOException.
+   */
+  public static List<List<Object>> getDataFromRange(Sheets service, String sheetID, String range) throws IOException {
+    return service.spreadsheets().values().get(sheetID, range).execute().getValues();
+  }
+
+  /**
+   * This function is used to get data when projection is pushed down to Google Sheets.
+   * @param service The Authenticated GoogleSheets service
+   * @param sheetID The GoogleSheet ID.  This can be found in the Sheet URL
+   * @param ranges The list of ranges
+   * @return A 2D table of rows reassembled from the per-range (column-oriented) results
+   * @throws IOException If anything goes wrong, IOException will be thrown
+   */
+  public static List<List<Object>> getBatchData(Sheets service, String sheetID, List<String> ranges) throws IOException {
+    logger.debug("Getting ranges: {}", ranges);
+    BatchGet request = service.spreadsheets().values().batchGet(sheetID).setRanges(ranges);
+    List<ValueRange> response = request.execute().getValueRanges();
+
+    List<List<Object>> results  = new ArrayList<>();
+    // In Google's infinite wisdom when designing this API, the results
+    // are returned in a completely different fashion than when projection is not
+    // pushed down to Google Sheets. Specifically, if you use the regular values() to retrieve
+    // values from a GoogleSheet, you get a List of rows.  Whereas if you use the BatchGet,
+    // you get a list of columns, sort of.  Except these columns are embedded in a bunch of
+    // other debris from which you must extract the actual data.
+    //
+    // It should be noted that the GoogleSheets API does not accept multiple ranges in the
+    // request, so it is necessary to use the batch request.
+    //
+    // The row count is taken from the first range; each row is then rebuilt by
+    // pulling the value at rowIndex from every range in turn.
+    for (int rowIndex = 0; rowIndex < ((ArrayList<?>) response.get(0).get("values")).size(); rowIndex++) {
+      List<Object> row = new ArrayList<>();
+      for (int colIndex = 0; colIndex < response.size(); colIndex++) {
+        try {
+          Object value = ((ArrayList<?>) ((ArrayList<?>) response.get(colIndex).get("values")).get(rowIndex)).get(0);
+          row.add(value);
+        } catch (IndexOutOfBoundsException | NullPointerException e) {
+          // Ragged or empty cells are deliberately padded with null rather than failing.
+          row.add(null);
+        }
+      }
+      results.add(row);
+    }
+    return results;
+  }
+
+  /**
+   * Infers a column map from sample data.  The first sample row is treated as
+   * the header row; blank headers are named {@code field_n}.  Unless
+   * allTextMode is set, the remaining sample rows are used to infer each
+   * column's data type.
+   * @param sampleData This represents a sample of the first few rows of data which will be used to build the schema.
+   * @param projectedColumns A list of projected columns
+   * @param allTextMode If true, the columns will all be of the VARCHAR type
+   * @return A map of the column name and {@link GoogleSheetsColumn} column for every projected column.
+   */
+  public static Map<String, GoogleSheetsColumn> getColumnMap(List<List<Object>> sampleData, List<SchemaPath> projectedColumns, boolean allTextMode) {
+    // For now, we assume that the column headers are in the first row
+    int emptyColumnCount = 0;
+    List<String> headers = new ArrayList<>();
+    Map<String, DATA_TYPES> dataTypes = new HashMap<>();
+
+    for (Object rawHeader : sampleData.get(0)) {
+      String header = (String) rawHeader;
+
+      // If the header row is empty, assign a value of `field_n` where n is the unknown header count.
+      if (Strings.isNullOrEmpty(header)) {
+        header = UNKNOWN_HEADER + emptyColumnCount;
+        emptyColumnCount++;
+      }
+      headers.add(header);
+      dataTypes.put(header, allTextMode ? DATA_TYPES.VARCHAR : DATA_TYPES.UNKNOWN);
+    }
+
+    if (!allTextMode) {
+      for (int rowIndex = 1; rowIndex < sampleData.size(); rowIndex++) {
+        List<Object> row = sampleData.get(rowIndex);
+        // Guard against rows wider than the header row, and skip null cells,
+        // rather than failing the whole inference.
+        for (int colIndex = 0; colIndex < row.size() && colIndex < headers.size(); colIndex++) {
+          Object cell = row.get(colIndex);
+          if (cell != null) {
+            updateDataType(headers.get(colIndex), dataTypes, cell.toString());
+          }
+        }
+      }
+    }
+    // At this point, we have inferred the columns.  We will return a list of {@link GoogleSheetsColumn} which
+    // we will need later for projection pushdown and other schema creation activities.
+    Map<String, GoogleSheetsColumn> columnMap = new LinkedHashMap<>();
+    int colCount = 0;
+    // Iterate with an explicit index: the previous headers.indexOf(header)
+    // lookup returned the first occurrence, mis-numbering duplicate header
+    // names, and made this loop O(n^2).
+    for (int headerIndex = 0; headerIndex < headers.size(); headerIndex++) {
+      String header = headers.get(headerIndex);
+      // When building the schema map, we only want to include projected columns.  This will be important later
+      // when we build the range request.
+      if (Utilities.isStarQuery(projectedColumns) || isProjected(projectedColumns, header)) {
+        GoogleSheetsColumn column = new GoogleSheetsColumn(header, dataTypes.get(header), headerIndex, colCount);
+        columnMap.put(header, column);
+        colCount++;
+      }
+    }
+    return columnMap;
+  }
+
+  /**
+   * Groups projected columns into ranges of consecutive column indexes so
+   * each contiguous run can be fetched with a single A1 range.
+   * NOTE(review): assumes columnMap iterates in ascending column-index order
+   * (it is a LinkedHashMap built in that order by getColumnMap) — confirm for
+   * other callers.
+   * @param sheetName the tab name each range is scoped to
+   * @param columnMap the projected columns, keyed by column name
+   * @return one {@link GoogleSheetsColumnRange} per contiguous run of indexes
+   */
+  public static List<GoogleSheetsColumnRange> getProjectedRanges(String sheetName, Map<String, GoogleSheetsColumn> columnMap) {
+    List<GoogleSheetsColumnRange> projectedRanges = new ArrayList<>();
+    int lastIndex = -1;
+    int currentIndex;
+    GoogleSheetsColumnRange currentRange = new GoogleSheetsColumnRange(sheetName);
+    for (GoogleSheetsColumn column : columnMap.values()) {
+      currentIndex = column.getColumnIndex();
+
+      // Edge case for first range
+      if (currentRange.getStartColIndex() == null) {
+        currentRange = currentRange.setStartIndex(currentIndex);
+      }
+
+      // End the range and create a new one.
+      if (currentIndex != (lastIndex + 1) && lastIndex != -1) {
+        currentRange.setEndIndex(lastIndex);
+        projectedRanges.add(currentRange);
+        currentRange = new GoogleSheetsColumnRange(sheetName)
+          .setStartIndex(currentIndex);
+      }
+      lastIndex = currentIndex;
+    }
+    // Close out the final (still-open) range.
+    currentRange = currentRange.setEndIndex(lastIndex);
+    projectedRanges.add(currentRange);
+    return projectedRanges;
+  }
+
+  /**
+   * Returns true if the column is projected, false if not.
+   * @param projectedColumns A list of projected columns AKA the haystack.
+   * @param columnName The column name AKA the needle
+   * @return True if the needle is in the haystack, false if not.
+   */
+  public static boolean isProjected(List<SchemaPath> projectedColumns, String columnName) {
+    // Star queries project everything, so return true.  Technically this
+    // might not always be correct, in the case that a query projects a non-existent column.
+    if (Utilities.isStarQuery(projectedColumns)) {
+      return true;
+    }
+    for (SchemaPath path : projectedColumns) {
+      // Compare whole names rather than String.contains(): a substring test
+      // would wrongly report "Col1" as projected when only "Col10" is
+      // requested.  The comparison is case-insensitive to follow SQL
+      // identifier semantics.
+      if (path.getAsNamePart().getName().equalsIgnoreCase(columnName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Translates a map of {@link GoogleSheetsColumn} into a Drill schema.  Every
+   * column is added as nullable with its inferred Drill type.
+   * @param columnMap A map of {@link GoogleSheetsColumn} containing the schema info.
+   * @return A populated {@link TupleMetadata} schema
+   */
+  public static TupleMetadata buildSchema(Map<String, GoogleSheetsColumn> columnMap) {
+    SchemaBuilder builder = new SchemaBuilder();
+    columnMap.values()
+      .forEach(column -> builder.addNullable(column.getColumnName(), column.getDrillDataType()));
+    return builder.build();
+  }
+
+  /**
+   * Infers the datatype of an unknown string.
+   * @param data An input String of unknown type.
+   * @return The {@link DATA_TYPES} of the unknown string.
+   */
+  public static DATA_TYPES inferDataType (String data) {
+    Entry<Class, String> result = Typifier.typify(data);
+    String dataType = result.getKey().getSimpleName();
+
+    // If the string is empty, return UNKNOWN
+    if (StringUtils.isEmpty(data)) {
+      return DATA_TYPES.UNKNOWN;
+    } else if (dataType.equalsIgnoreCase("Double")) {
+      return DATA_TYPES.NUMERIC;
+    } else if(dataType.equalsIgnoreCase("LocalDateTime")) {
+      return DATA_TYPES.TIMESTAMP;
+    } else if (dataType.equalsIgnoreCase("LocalDate")) {
+      return DATA_TYPES.DATE;
+    } else if (dataType.equalsIgnoreCase("LocalTime")) {
+      return DATA_TYPES.TIME;
+    } else {
+      return DATA_TYPES.VARCHAR;
+    }
+  }
+
+  /**
+   * Refines the inferred data type of a column given one more observed value.
+   * Empty values are ignored.  An UNKNOWN column takes on the type of the
+   * first non-empty value; a NUMERIC column degrades to VARCHAR when a
+   * non-numeric value is seen.
+   * @param columnName the column whose type is being refined
+   * @param dataTypesMap the working map of column name to inferred type
+   * @param value the newly observed raw cell value
+   */
+  public static void updateDataType(String columnName, Map<String, DATA_TYPES> dataTypesMap, String value) {
+    if (StringUtils.isEmpty(value)) {
+      return;
+    }
+
+    // Get the data type of the unknown value
+    DATA_TYPES probableDataType = inferDataType(value);
+    DATA_TYPES columnDataType = dataTypesMap.get(columnName);
+
+    // If the column data type matches the new value's data type, make no changes
+    if (probableDataType == columnDataType) {
+      return;
+    }
+
+    // If the column was unknown assign it the data type that we found
+    if (columnDataType == DATA_TYPES.UNKNOWN) {
+      dataTypesMap.put(columnName, probableDataType);
+    } else if (columnDataType == DATA_TYPES.NUMERIC && probableDataType == DATA_TYPES.VARCHAR) {
+      // If we have a column that is thought to be numeric, we will continue to consider it numeric unless
+      // we encounter a string, at which point we will convert it to a String.
+      dataTypesMap.put(columnName, DATA_TYPES.VARCHAR);
+    }
+    // NOTE(review): conflicts involving DATE/TIME/TIMESTAMP (e.g. a DATE
+    // column that later contains text) are not downgraded to VARCHAR here —
+    // confirm the reader tolerates such mixed columns.
+  }
+
+  /**
+   * Adds a new tab to an existing GoogleSheet document.  The new tab is
+   * inserted at index 0 (first position).
+   *
+   * @param service   An authenticated GoogleSheet service
+   * @param sheetName The GoogleSheet name of the document.
+   *                  NOTE(review): this value is passed to batchUpdate as the
+   *                  spreadsheet id, not a display name — confirm and consider
+   *                  renaming the parameter.
+   * @param tabName   The name of the tab you wish to add to the GoogleSheet document
+   * @throws IOException Throws an IOException if anything goes wrong.
+   */
+  public static void addTabToGoogleSheet(Sheets service, String sheetName, String tabName)
+    throws IOException {
+    List<Request> requests = new ArrayList<>();
+    requests.add(new Request()
+      .setAddSheet(new AddSheetRequest().setProperties(new SheetProperties()
+        .setTitle(tabName)
+        .setIndex(0))));
+    BatchUpdateSpreadsheetRequest body = new BatchUpdateSpreadsheetRequest().setRequests(requests);
+    service.spreadsheets().batchUpdate(sheetName, body).execute();
+  }
+
+  /**
+   * Accepts a list of data and writes this data to a GoogleSheet document,
+   * starting at cell A1 of the given tab.  Values are written "RAW", i.e.
+   * without GoogleSheets-side parsing or formatting.
+   * @param service An authenticated GoogleSheet service
+   * @param sheetID The SheetID.  This can be obtained from the URL of the GoogleSheet Document
+   * @param tabName The tab name within the aforementioned GoogleSheet
+   * @param data A list of rows of the data to be inserted.
+   * @throws IOException If anything goes wrong, throw an IO exception
+   */
+  public static void writeDataToGoogleSheet(Sheets service, String sheetID, String tabName, List<List<Object>> data)
+    throws IOException {
+    String range = tabName + "!A1";
+    ValueRange body = new ValueRange()
+      .setValues(data)
+      .setMajorDimension("ROWS");
+
+    // The response was previously assigned to an unused local; surface the
+    // update count in the debug log instead.
+    UpdateValuesResponse result = service.spreadsheets().values().update(sheetID, range, body)
+        .setValueInputOption("RAW")
+        .execute();
+    logger.debug("Wrote {} cells to range {}", result.getUpdatedCells(), range);
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-googlesheets/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..92d9eb4
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,28 @@
+{
+  "storage":{
+    "googlesheets" : {
+      "type": "googlesheets",
+      "allTextMode": true,
+      "extractHeaders": true,
+      "oAuthConfig": {
+        "callbackURL": "http://localhost:8047/credentials/googlesheets/update_oauth2_authtoken",
+        "authorizationURL": "https://accounts.google.com/o/oauth2/auth",
+        "authorizationParams": {
+          "response_type": "code",
+          "scope": "https://www.googleapis.com/auth/spreadsheets"
+        }
+      },
+      "credentialsProvider": {
+        "credentialsProviderType": "PlainCredentialsProvider",
+        "credentials": {
+          "clientID": "<YOUR CLIENT ID>",
+          "clientSecret": "<YOUR CLIENT SECRET>",
+          "tokenURI": "https://oauth2.googleapis.com/token"
+        },
+        "userCredentials": {}
+      },
+      "enabled": false,
+      "authMode": "SHARED_USER"
+    }
+  }
+}
diff --git a/contrib/storage-googlesheets/src/main/resources/drill-module.conf b/contrib/storage-googlesheets/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..979b1f4
--- /dev/null
+++ b/contrib/storage-googlesheets/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.googlesheets"
+}
diff --git a/contrib/storage-googlesheets/src/test/README.md b/contrib/storage-googlesheets/src/test/README.md
new file mode 100644
index 0000000..b8d5ab8
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/README.md
@@ -0,0 +1,31 @@
+# Testing Procedures for Google Sheets Plugin
+The GoogleSheets (GS) plugin is a little tricky to test because it relies heavily on the Google APIs. For that reason, the plugin is designed around static functions in the `GoogleSheetsUtils` class, which can be tested without a live connection to Google Sheets.
+
+The functions which do not require a live connection to GoogleSheets are mostly covered by the unit tests in `TestGoogleSheetUtils` and `TestRangeBuilder`.  
+
+# Testing Actual Queries
+
+## Step One:  Obtaining Credentials
+To run the end to end tests in `TestGoogleSheetsQueries` and `TestGoogleSheetsWriter` you first have to provide credentials in the form of the client id, client secret, access token, and refresh token.  The `client_id` and `client_secret` tokens can be obtained by following the instructions in the main `README` in the root directory. 
+
+To obtain the access and refresh tokens, build Drill, add a GoogleSheets plugin and use the `clientID` and `clientSecret` to authorize your Drill.  Then look in your Drill folder for the files where Drill stores OAuth tokens and you will find your access and refresh tokens.
+
+## Step Two:  Saving Your Tokens
+Now that you have the actual tokens, the next step is to actually save them in a file called `oauth_tokens.json` in the `/test/resources/token` directory.  Create the file and copy the JSON below, filling in your tokens. 
+
+```json
+{
+  "client_id": "<your client id>",
+  "client_secret": "<your client secret>",
+  "access_token":"<your access token>",
+  "refresh_token":"<your refresh token>",
+  "sheet_id": "<your sheet id>"
+}
+
+```
+
+## Step 3:  Populating Data
+The final step is to create the actual test GoogleSheet.  In the `test/resources` directory, there is a file called `Drill Test Data.xlsx`.  Upload this file to GoogleSheets.  Once you have done so, simply add your sheet ID to the `oauth_tokens.json` file that you created earlier.
+
+Once this is done, you should be able to run the tests in `TestGoogleSheetsWriter` and `TestGoogleSheetsQueries`. 
+
diff --git a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetUtils.java b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetUtils.java
new file mode 100644
index 0000000..9814ccf
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnRange;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestGoogleSheetUtils {
+
+  @Test
+  public void testSchemaInference() {
+    // Header row plus four sample rows of mixed types; Col1 should infer as
+    // VARCHAR, Col2 as numeric (FLOAT8) and Col3 as DATE.
+    // (The previously unused local SchemaPath has been removed.)
+    List<List<Object>> data = new ArrayList<>();
+    List<Object> row = new ArrayList<>(Arrays.asList("Col1", "Col2", "Col3"));
+    List<SchemaPath> projectedColumns = new ArrayList<>(
+      Arrays.asList(
+        new SchemaPath(SchemaPath.parseFromString("Col1")),
+        new SchemaPath(SchemaPath.parseFromString("Col2")),
+        new SchemaPath(SchemaPath.parseFromString("Col3"))
+      )
+    );
+    data.add(row);
+
+    // Deliberately short row: Col3 has no value here.
+    row = new ArrayList<>(Arrays.asList("Rosaline Thales", 1));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("Abdolhossein Detlev", "2.0001", "2020-04-30"));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("Yosuke  Simon", "", "2020-05-22"));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("", "4", "2020-06-30"));
+    data.add(row);
+
+    Map<String, GoogleSheetsColumn> columnMap = GoogleSheetsUtils.getColumnMap(data, projectedColumns, false);
+    assertEquals(3, columnMap.size());
+    assertEquals(MinorType.VARCHAR, columnMap.get("Col1").getDrillDataType());
+    assertEquals(MinorType.FLOAT8, columnMap.get("Col2").getDrillDataType());
+    assertEquals(MinorType.DATE, columnMap.get("Col3").getDrillDataType());
+  }
+
+  @Test
+  public void testBuildSchema() {
+    // Infers a column map from sample data and verifies the Drill schema
+    // produced from it.
+    List<List<Object>> data = new ArrayList<>();
+    List<SchemaPath> projectedColumns = new ArrayList<>(
+      Arrays.asList(
+        new SchemaPath(SchemaPath.parseFromString("Col1")),
+        new SchemaPath(SchemaPath.parseFromString("Col2")),
+        new SchemaPath(SchemaPath.parseFromString("Col3"))
+      )
+    );
+    List<Object> row = new ArrayList<>(Arrays.asList("Col1", "Col2", "Col3"));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("Rosaline Thales", 1));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("Abdolhossein Detlev", "2.0001", "2020-04-30"));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("Yosuke  Simon", "", "2020-05-22"));
+    data.add(row);
+
+    row = new ArrayList<>(Arrays.asList("", "4", "2020-06-30"));
+    data.add(row);
+
+    Map<String, GoogleSheetsColumn> columnMap = GoogleSheetsUtils.getColumnMap(data, projectedColumns, false);
+    TupleMetadata actualSchema = GoogleSheetsUtils.buildSchema(columnMap);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("Col1", MinorType.VARCHAR)
+      .addNullable("Col2", MinorType.FLOAT8)
+      .addNullable("Col3", MinorType.DATE)
+      .build();
+
+    // JUnit convention: expected value first, actual second, so failure
+    // messages read correctly (arguments were previously reversed).
+    assertEquals(expectedSchema, actualSchema);
+  }
+
+  @Test
+  public void testColumnProjector() {
+    // Column indexes 0-3, 5, and 8-9 form three contiguous runs, so three
+    // projected ranges are expected.
+    Map<String, GoogleSheetsColumn> columnMap = new LinkedHashMap<>();
+    columnMap.put("f1", new GoogleSheetsColumn("f1", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 0, 0));
+    columnMap.put("f2", new GoogleSheetsColumn("f2", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 1, 1));
+    columnMap.put("f3", new GoogleSheetsColumn("f3", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 2, 2));
+    columnMap.put("f4", new GoogleSheetsColumn("f4", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 3, 3));
+    columnMap.put("f6", new GoogleSheetsColumn("f6", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 5, 4));
+    columnMap.put("f9", new GoogleSheetsColumn("f9", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 8, 5));
+    columnMap.put("f10", new GoogleSheetsColumn("f10", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 9, 6));
+
+    List<GoogleSheetsColumnRange> results = GoogleSheetsUtils.getProjectedRanges("Sheet1", columnMap);
+    assertEquals(3, results.size());
+  }
+
+  @Test
+  public void testColumnProjectorWithSingleColumns() {
+    // Column indexes 0, 2, 4 and 6-9 form four runs ([0], [2], [4], [6-9]),
+    // so four projected ranges are expected.
+    Map<String, GoogleSheetsColumn> columnMap = new LinkedHashMap<>();
+    columnMap.put("f1", new GoogleSheetsColumn("f1", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 0,0));
+    columnMap.put("f2", new GoogleSheetsColumn("f2", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 2,1));
+    columnMap.put("f3", new GoogleSheetsColumn("f3", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 4,2));
+    columnMap.put("f4", new GoogleSheetsColumn("f4", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 6, 3));
+    columnMap.put("f6", new GoogleSheetsColumn("f6", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 7, 4));
+    columnMap.put("f9", new GoogleSheetsColumn("f9", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 8, 5));
+    columnMap.put("f10", new GoogleSheetsColumn("f10", GoogleSheetsUtils.DATA_TYPES.UNKNOWN, 9, 6));
+
+    List<GoogleSheetsColumnRange> results = GoogleSheetsUtils.getProjectedRanges("Sheet1", columnMap);
+    assertEquals(4, results.size());
+  }
+
+  @Test
+  public void testColumnConversion() {
+    // 1-based index to A1 column letters (bijective base 26).
+    assertEquals("A", GoogleSheetsUtils.columnToLetter(1));
+    assertEquals("B", GoogleSheetsUtils.columnToLetter(2));
+    assertEquals("AA", GoogleSheetsUtils.columnToLetter(27));
+    assertEquals("CV", GoogleSheetsUtils.columnToLetter(100));
+    // Close to largest possible column index
+    assertEquals("ZWZ", GoogleSheetsUtils.columnToLetter(18200));
+  }
+
+  @Test
+  public void testA1toIntResolution() {
+    // Inverse of columnToLetter: A1 letters back to the 1-based index.
+    assertEquals(1, GoogleSheetsUtils.letterToColumnIndex("A"));
+    assertEquals(2, GoogleSheetsUtils.letterToColumnIndex("B"));
+    assertEquals(27, GoogleSheetsUtils.letterToColumnIndex("AA"));
+    assertEquals(100, GoogleSheetsUtils.letterToColumnIndex("CV"));
+    // Close to largest possible column index
+    assertEquals(18200, GoogleSheetsUtils.letterToColumnIndex("ZWZ"));
+  }
+}
diff --git a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsLimitPushdown.java b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsLimitPushdown.java
new file mode 100644
index 0000000..b644285
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsLimitPushdown.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.drill.exec.store.googlesheets;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.oauth.PersistentTokenTable;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.io.Files;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Verifies that LIMIT (and LIMIT ... OFFSET) clauses are pushed down into the
 * GoogleSheets group scan.  These tests only inspect the generated plan, but
 * they still need live OAuth credentials because the plugin resolves sheet
 * metadata during planning.
 */
@Ignore("This test requires a live connection to Google Sheets.  Please run tests manually.")
public class TestGoogleSheetsLimitPushdown extends ClusterTest {

  private static final String AUTH_URI = "https://accounts.google.com/o/oauth2/auth";
  private static final String TOKEN_URI = "https://oauth2.googleapis.com/token";
  private static final List<String> REDIRECT_URI = new ArrayList<>(Arrays.asList("urn:ietf:wg:oauth:2.0:oob", "http://localhost"));

  private static StoragePluginRegistry pluginRegistry;
  private static String accessToken;
  private static String refreshToken;
  private static String sheetID;

  @BeforeClass
  public static void init() throws Exception {
    // OAuth credentials are not checked in; they are read from a local resource file.
    String oauthJson = Files.asCharSource(DrillFileUtils.getResourceAsFile("/tokens/oauth_tokens.json"), Charsets.UTF_8).read();

    ObjectMapper mapper = new ObjectMapper();
    // TypeReference avoids the unchecked conversion of the raw Map.class overload.
    Map<String, String> tokenMap = mapper.readValue(oauthJson, new TypeReference<Map<String, String>>() { });

    String clientID = tokenMap.get("client_id");
    String clientSecret = tokenMap.get("client_secret");
    accessToken = tokenMap.get("access_token");
    refreshToken = tokenMap.get("refresh_token");
    sheetID = tokenMap.get("sheet_id");

    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
      .configProperty(ExecConstants.HTTP_ENABLE, true)
      .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
      .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);

    startCluster(builder);

    pluginRegistry = cluster.drillbit().getContext().getStorage();
    GoogleSheetsStoragePluginConfig config = GoogleSheetsStoragePluginConfig.builder()
      .clientID(clientID)
      .clientSecret(clientSecret)
      .redirectUris(REDIRECT_URI)
      .authUri(AUTH_URI)
      .tokenUri(TOKEN_URI)
      .allTextMode(false)
      .extractHeaders(true)
      .build();

    config.setEnabled(true);
    pluginRegistry.validatedPut("googlesheets", config);
  }

  @Test
  public void testLimit() throws Exception {
    // Let PluginException propagate: JUnit reports it with the full stack
    // trace, unlike fail(e.getMessage()) which discards it.
    initializeTokens();

    String sql = String.format("SELECT * FROM googlesheets.`%s`.`MixedSheet` LIMIT 5", sheetID);
    queryBuilder()
      .sql(sql)
      .planMatcher()
      .include("Limit", "maxRecords=5")
      .match();
  }

  @Test
  public void testLimitWithOrderBy() throws Exception {
    initializeTokens();

    // Limit should not be pushed down for this example due to the sort
    String sql = String.format("SELECT * FROM googlesheets.`%s`.`MixedSheet` ORDER BY Col2 LIMIT 4", sheetID);
    queryBuilder()
      .sql(sql)
      .planMatcher()
      .include("Limit", "maxRecords=-1")
      .match();
  }

  @Test
  public void testLimitWithOffset() throws Exception {
    initializeTokens();

    // Limit should be pushed down and include the offset (4 + 5 = 9 records)
    String sql = String.format("SELECT * FROM googlesheets.`%s`.`MixedSheet` LIMIT 4 OFFSET 5", sheetID);
    queryBuilder()
      .sql(sql)
      .planMatcher()
      .include("Limit", "maxRecords=9")
      .match();
  }

  /**
   * This function is used for testing only.  It initializes a {@link PersistentTokenTable} and populates it
   * with a valid access and refresh token.
   * @throws PluginException If anything goes wrong
   */
  private void initializeTokens() throws PluginException {
    GoogleSheetsStoragePlugin plugin = (GoogleSheetsStoragePlugin) pluginRegistry.getPlugin("googlesheets");
    plugin.initializeTokenTableForTesting();
    PersistentTokenTable tokenTable = plugin.getTokenTable();
    tokenTable.setAccessToken(accessToken);
    tokenTable.setRefreshToken(refreshToken);
    tokenTable.setExpiresIn("50000");
  }
}
diff --git a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java
new file mode 100644
index 0000000..d1e6a61
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.drill.exec.store.googlesheets;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.oauth.PersistentTokenTable;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.io.Files;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

/**
 * This class tests the Google Sheets plugin. Since GoogleSheets is essentially an API, these tests
 * must be run with a live internet connection.  These tests use test data which can be found in the
 * resources directory.
 */
@Ignore("Requires live connection to Google Sheets.  Please run tests manually.")
public class TestGoogleSheetsQueries extends ClusterTest {

  private static final String AUTH_URI = "https://accounts.google.com/o/oauth2/auth";
  private static final String TOKEN_URI = "https://oauth2.googleapis.com/token";
  private static final List<String> REDIRECT_URI = new ArrayList<>(Arrays.asList("urn:ietf:wg:oauth:2.0:oob", "http://localhost"));

  private static StoragePluginRegistry pluginRegistry;
  private static String accessToken;
  private static String refreshToken;
  private static String sheetID;
  private static String clientID;
  private static String clientSecret;

  @BeforeClass
  public static void init() throws Exception {
    // OAuth credentials are not checked in; they are read from a local resource file.
    String oauthJson = Files.asCharSource(DrillFileUtils.getResourceAsFile("/tokens/oauth_tokens.json"), Charsets.UTF_8).read();

    ObjectMapper mapper = new ObjectMapper();
    // TypeReference avoids the unchecked conversion of the raw Map.class overload.
    Map<String, String> tokenMap = mapper.readValue(oauthJson, new TypeReference<Map<String, String>>() { });

    clientID = tokenMap.get("client_id");
    clientSecret = tokenMap.get("client_secret");
    accessToken = tokenMap.get("access_token");
    refreshToken = tokenMap.get("refresh_token");
    sheetID = tokenMap.get("sheet_id");

    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
      .configProperty(ExecConstants.HTTP_ENABLE, true)
      .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
      .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);

    startCluster(builder);

    pluginRegistry = cluster.drillbit().getContext().getStorage();
    pluginRegistry.validatedPut("googlesheets", sheetsPluginConfig(false));
  }

  /**
   * Builds an enabled plugin config from the credentials loaded in {@link #init()}.
   * Centralizes the builder calls that were previously duplicated between
   * {@code init()} and {@code testAllTextMode()}.
   *
   * @param allTextMode whether the plugin should read every column as VARCHAR
   * @return an enabled {@link GoogleSheetsStoragePluginConfig}
   */
  private static GoogleSheetsStoragePluginConfig sheetsPluginConfig(boolean allTextMode) {
    GoogleSheetsStoragePluginConfig config = GoogleSheetsStoragePluginConfig.builder()
      .clientID(clientID)
      .clientSecret(clientSecret)
      .redirectUris(REDIRECT_URI)
      .authUri(AUTH_URI)
      .tokenUri(TOKEN_URI)
      .allTextMode(allTextMode)
      .extractHeaders(true)
      .build();
    config.setEnabled(true);
    return config;
  }

  @Test
  public void testStarQuery() throws Exception {
    // Let PluginException propagate: JUnit reports it with the full stack
    // trace, unlike fail(e.getMessage()) which discards it.
    initializeTokens("googlesheets");

    String sql = String.format("SELECT * FROM googlesheets.`%s`.`MixedSheet` WHERE `Col2` < 6.0", sheetID);
    RowSet results = queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
      .addNullable("Col1", MinorType.VARCHAR)
      .addNullable("Col2", MinorType.FLOAT8)
      .addNullable("Col3", MinorType.DATE)
      .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
      .addRow("Rosaline  Thales", 1.0, null)
      .addRow("Abdolhossein  Detlev", 2.0001, LocalDate.parse("2020-04-30"))
      .addRow(null, 4.0, LocalDate.parse("2020-06-30"))
      .addRow("Yunus  Elena", 3.5, LocalDate.parse("2021-01-15"))
      .addRow("Swaran  Ohiyesa", -63.8, LocalDate.parse("2021-04-08"))
      .addRow("Kalani  Godabert", 0.0, LocalDate.parse("2021-06-28"))
      .addRow("Caishen  Origenes", 5.0E-7, LocalDate.parse("2021-07-09"))
      .addRow("Toufik  Gurgen", 2.0, LocalDate.parse("2021-11-05"))
      .build();

    new RowSetComparison(expected).verifyAndClearAll(results);
  }

  @Test
  public void testProjectPushdown() throws Exception {
    initializeTokens("googlesheets");

    String sql = String.format("SELECT Col1, Col3 FROM googlesheets.`%s`.`MixedSheet` LIMIT 5", sheetID);
    queryBuilder()
      .sql(sql)
      .planMatcher()
      .include("Project", "columns=\\[`Col1`, `Col3`\\]", "Limit", "maxRecords=5")
      .match();
  }

  @Test
  public void testWithExplicitColumns() throws Exception {
    initializeTokens("googlesheets");

    String sql = String.format("SELECT Col1, Col3 FROM googlesheets.`%s`.`MixedSheet` WHERE `Col2` < 6.0", sheetID);
    RowSet results = queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
      .addNullable("Col1", MinorType.VARCHAR)
      .addNullable("Col3", MinorType.DATE)
      .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
      .addRow("Rosaline  Thales", null)
      .addRow("Abdolhossein  Detlev", LocalDate.parse("2020-04-30"))
      .addRow(null, LocalDate.parse("2020-06-30"))
      .addRow("Yunus  Elena", LocalDate.parse("2021-01-15"))
      .addRow("Swaran  Ohiyesa", LocalDate.parse("2021-04-08"))
      .addRow("Kalani  Godabert", LocalDate.parse("2021-06-28"))
      .addRow("Caishen  Origenes", LocalDate.parse("2021-07-09"))
      .addRow("Toufik  Gurgen", LocalDate.parse("2021-11-05"))
      .build();

    new RowSetComparison(expected).verifyAndClearAll(results);
  }

  @Test
  public void testAggregateQuery() throws Exception {
    initializeTokens("googlesheets");

    String sql = String.format("SELECT EXTRACT(YEAR FROM Col3) AS event_year, COUNT(*) AS event_count FROM googlesheets.`%s`.`MixedSheet` GROUP BY event_year", sheetID);
    List<QueryDataBatch> results = queryBuilder().sql(sql).results();

    // Release the batch buffers before asserting; the batch count itself
    // remains valid after release.
    for (QueryDataBatch b : results) {
      b.release();
    }
    assertEquals(4, results.size());
  }

  @Test
  public void testSerDe() throws Exception {
    initializeTokens("googlesheets");

    // Round-trips the physical plan through JSON to verify the group scan
    // serializes and deserializes correctly.
    String sql = String.format("SELECT COUNT(*) FROM googlesheets.`%s`.`MixedSheet`", sheetID);
    String plan = queryBuilder().sql(sql).explainJson();
    long cnt = queryBuilder().physical(plan).singletonLong();
    assertEquals("Counts should match", 25L, cnt);
  }

  @Test
  public void testAllTextMode() throws Exception {
    initializeTokens("googlesheets");

    // Switch the plugin into all-text mode for this test only.
    pluginRegistry.validatedPut("googlesheets", sheetsPluginConfig(true));

    String sql = String.format("SELECT * FROM googlesheets.`%s`.`MixedSheet` LIMIT 5", sheetID);
    RowSet results = queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
      .addNullable("Col1", MinorType.VARCHAR)
      .addNullable("Col2", MinorType.VARCHAR)
      .addNullable("Col3", MinorType.VARCHAR)
      .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
      .addRow("Rosaline  Thales", "1", null)
      .addRow("Abdolhossein  Detlev", "2.0001", "2020-04-30")
      .addRow("Yosuke  Simon", null, "2020-05-22")
      .addRow(null, "4", "2020-06-30")
      .addRow("Avitus  Stribog", "5.00E+05", "2020-07-27")
      .build();

    new RowSetComparison(expected).verifyAndClearAll(results);

    // Restore the default (typed) configuration for subsequent tests.
    pluginRegistry.validatedPut("googlesheets", sheetsPluginConfig(false));
  }

  @Test
  public void testSchemaProvisioning() throws Exception {
    initializeTokens("googlesheets");

    String sql = String.format("SELECT * FROM table(`googlesheets`.`%s`.`MixedSheet` (schema => 'inline=(`Col1` VARCHAR, `Col2` INTEGER, `Col3` VARCHAR)')) LIMIT 5", sheetID);
    RowSet results = queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
      .addNullable("Col1", MinorType.VARCHAR)
      .addNullable("Col2", MinorType.INT)
      .addNullable("Col3", MinorType.VARCHAR)
      .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
      .addRow("Rosaline  Thales", 1, null)
      .addRow("Abdolhossein  Detlev", 2, "2020-04-30")
      .addRow("Yosuke  Simon", null, "2020-05-22")
      .addRow(null, 4, "2020-06-30")
      .addRow("Avitus  Stribog", 500000, "2020-07-27")
      .build();

    new RowSetComparison(expected).verifyAndClearAll(results);
  }

  /**
   * This function is used for testing only.  It initializes a {@link PersistentTokenTable} and populates it
   * with a valid access and refresh token.
   * @param pluginName name under which the plugin was registered
   * @throws PluginException If anything goes wrong
   */
  private void initializeTokens(String pluginName) throws PluginException {
    GoogleSheetsStoragePlugin plugin = (GoogleSheetsStoragePlugin) pluginRegistry.getPlugin(pluginName);
    plugin.initializeTokenTableForTesting();
    PersistentTokenTable tokenTable = plugin.getTokenTable();
    tokenTable.setAccessToken(accessToken);
    tokenTable.setRefreshToken(refreshToken);
    tokenTable.setExpiresIn("50000");
  }
}
diff --git a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsWriter.java b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsWriter.java
new file mode 100644
index 0000000..daaba71
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.drill.exec.store.googlesheets;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.oauth.PersistentTokenTable;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.io.Files;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertTrue;

/**
 * Tests the CTAS (writer) path of the GoogleSheets plugin.  Requires live
 * OAuth credentials; see the note below on why verification is manual.
 */
@Category(RowSetTest.class)
@Ignore("These tests require a live Google Sheets connection.  Please run manually.")
public class TestGoogleSheetsWriter extends ClusterTest {
  private static final String AUTH_URI = "https://accounts.google.com/o/oauth2/auth";
  private static final String TOKEN_URI = "https://oauth2.googleapis.com/token";
  private static final List<String> REDIRECT_URI = new ArrayList<>(Arrays.asList("urn:ietf:wg:oauth:2.0:oob", "http://localhost"));

  private static StoragePluginRegistry pluginRegistry;
  private static String accessToken;
  private static String refreshToken;

  // Note on testing:  Testing the writing capabilities of this plugin is challenging.
  // The primary issue is that when you execute a CTAS query, you do so using the file name.
  // However, it does not seem possible to retrieve the created file's ID which is what you
  // need to actually verify that the query successfully wrote the results.  Therefore, at this
  // juncture, I can only recommend manual tests for the writing capabilities of this plugin.

  @BeforeClass
  public static void init() throws Exception {
    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
    dirTestWatcher.copyResourceToRoot(Paths.get(""));

    // OAuth credentials are not checked in; they are read from a local resource file.
    String oauthJson = Files.asCharSource(DrillFileUtils.getResourceAsFile("/tokens/oauth_tokens.json"), Charsets.UTF_8).read();

    ObjectMapper mapper = new ObjectMapper();
    // TypeReference avoids the unchecked conversion of the raw Map.class overload.
    Map<String, String> tokenMap = mapper.readValue(oauthJson, new TypeReference<Map<String, String>>() { });

    String clientID = tokenMap.get("client_id");
    String clientSecret = tokenMap.get("client_secret");
    accessToken = tokenMap.get("access_token");
    refreshToken = tokenMap.get("refresh_token");

    pluginRegistry = cluster.drillbit().getContext().getStorage();
    GoogleSheetsStoragePluginConfig config = GoogleSheetsStoragePluginConfig.builder()
      .clientID(clientID)
      .clientSecret(clientSecret)
      .redirectUris(REDIRECT_URI)
      .authUri(AUTH_URI)
      .tokenUri(TOKEN_URI)
      .build();

    config.setEnabled(true);
    pluginRegistry.validatedPut("googlesheets", config);
  }

  @Test
  public void testBasicCTAS() throws Exception {
    // Let PluginException propagate: JUnit reports it with the full stack
    // trace, unlike fail(e.getMessage()) which discards it.
    initializeTokens();

    String query = "CREATE TABLE googlesheets.`test_sheet`.`test_table` (ID, NAME) AS " +
      "SELECT * FROM (VALUES(1,2), (3,4))";
    // Create the table and insert the values
    QuerySummary insertResults = queryBuilder().sql(query).run();
    assertTrue(insertResults.succeeded());
  }

  /**
   * This function is used for testing only.  It initializes a {@link PersistentTokenTable} and populates it
   * with a valid access and refresh token.
   * @throws PluginException If anything goes wrong
   */
  private void initializeTokens() throws PluginException {
    GoogleSheetsStoragePlugin plugin = (GoogleSheetsStoragePlugin) pluginRegistry.getPlugin("googlesheets");
    plugin.initializeTokenTableForTesting();
    PersistentTokenTable tokenTable = plugin.getTokenTable();
    tokenTable.setAccessToken(accessToken);
    tokenTable.setRefreshToken(refreshToken);
    tokenTable.setExpiresIn("50000");
  }
}
diff --git a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestRangeBuilder.java b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestRangeBuilder.java
new file mode 100644
index 0000000..23ee093
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestRangeBuilder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.googlesheets;
+
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsRangeBuilder;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestRangeBuilder {
+
+  @Test
+  public void testBasicRange() {
+    GoogleSheetsRangeBuilder rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 10_000);
+    assertEquals("'Sheet1'!1:10000", rangeBuilder.next());
+    assertEquals("'Sheet1'!10001:20000", rangeBuilder.next());
+    assertEquals("'Sheet1'!20001:30000", rangeBuilder.next());
+    rangeBuilder.lastBatch();
+    assertNull(rangeBuilder.next());
+  }
+
+  @Test
+  public void testRangeWithLimit() {
+    GoogleSheetsRangeBuilder rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 100)
+      .addLimit(204);
+
+    assertEquals("'Sheet1'!1:100", rangeBuilder.next());
+    assertEquals("'Sheet1'!101:200", rangeBuilder.next());
+    assertEquals("'Sheet1'!201:204", rangeBuilder.next());
+    // Limit reached... no more results.
+    assertNull(rangeBuilder.next());
+  }
+
+  @Test
+  public void testRangeWithColumnsAndRowCount() {
+    GoogleSheetsRangeBuilder rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 10);
+    rangeBuilder.addFirstColumn("A")
+      .addLastColumn("F")
+      .isStarQuery(true)
+      .addRowCount(25);
+    assertEquals("'Sheet1'!A1:F10", rangeBuilder.next());
+    assertEquals("'Sheet1'!A11:F20", rangeBuilder.next());
+    assertEquals("'Sheet1'!A21:F25", rangeBuilder.next());
+    assertNull(rangeBuilder.next());
+
+
+
+  }
+
+  @Test
+  public void testRangeWithColumnsAndLimitAndRowCount() {
+    GoogleSheetsRangeBuilder rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 100);
+    rangeBuilder.addFirstColumn("A")
+      .addLastColumn("F")
+      .isStarQuery(true)
+      .addRowCount(25);
+    assertEquals("'Sheet1'!A1:F25", rangeBuilder.next());
+    // Row count reached... no more records
+    assertNull(rangeBuilder.next());
+
+    rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 100);
+    rangeBuilder.addFirstColumn("A")
+      .addLastColumn("F")
+      .isStarQuery(true)
+      .addRowCount(25)
+      .addLimit(17);
+    assertEquals("'Sheet1'!A1:F17", rangeBuilder.next());
+    assertNull(rangeBuilder.next());
+
+    rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 100);
+    rangeBuilder.addFirstColumn("A")
+      .addLastColumn("F")
+      .isStarQuery(true)
+      .addRowCount(150)
+      .addLimit(125);
+    assertEquals("'Sheet1'!A1:F100", rangeBuilder.next());
+    assertEquals("'Sheet1'!A101:F125", rangeBuilder.next());
+    assertNull(rangeBuilder.next());
+
+    rangeBuilder = new GoogleSheetsRangeBuilder("Sheet1", 100);
+    rangeBuilder.addFirstColumn("A")
+      .addLastColumn("F")
+      .isStarQuery(true)
+      .addRowCount(125)
+      .addLimit(150);
+    assertEquals("'Sheet1'!A1:F100", rangeBuilder.next());
+    assertEquals("'Sheet1'!A101:F125", rangeBuilder.next());
+    assertNull(rangeBuilder.next());
+  }
+}
diff --git a/contrib/storage-googlesheets/src/test/resources/data/Drill_Test_Data.xlsx b/contrib/storage-googlesheets/src/test/resources/data/Drill_Test_Data.xlsx
new file mode 100644
index 0000000..19a924c
--- /dev/null
+++ b/contrib/storage-googlesheets/src/test/resources/data/Drill_Test_Data.xlsx
Binary files differ
diff --git a/distribution/pom.xml b/distribution/pom.xml
index dd60eb1..8ed79a0 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -377,6 +377,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-storage-googlesheets</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-storage-phoenix</artifactId>
           <version>${project.version}</version>
         </dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 853793d..6927b05 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -59,6 +59,7 @@
         <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
         <include>org.apache.drill.contrib:drill-storage-phoenix:jar</include>
         <include>org.apache.drill.contrib:drill-storage-splunk:jar</include>
+        <include>org.apache.drill.contrib:drill-storage-googlesheets:jar</include>
         <include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
         <include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
         <include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/PersistentTokenTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/PersistentTokenTable.java
index a586ff0..7350fe2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/PersistentTokenTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/PersistentTokenTable.java
@@ -34,6 +34,7 @@
 public class PersistentTokenTable implements Tokens {
   public final String ACCESS_TOKEN_KEY = "access_token";
   public final String REFRESH_TOKEN_KEY = "refresh_token";
+  public final String EXPIRES_IN_KEY = "expires_in";
 
   private final Map<String, String> tokens;
 
@@ -88,6 +89,20 @@
 
   @Override
   @JsonIgnore
+  public String getExpiresIn() {
+    return get(EXPIRES_IN_KEY);
+  }
+
+  @Override
+  @JsonIgnore
+  public void setExpiresIn(String expiresIn) {
+    if (!tokens.containsKey(EXPIRES_IN_KEY) || !expiresIn.equals(getAccessToken())) {
+      put(EXPIRES_IN_KEY, expiresIn, true);
+    }
+  }
+
+  @Override
+  @JsonIgnore
   public void setAccessToken(String token) {
     // Only update the access token if it is not the same as the previous token
     if (!tokens.containsKey(ACCESS_TOKEN_KEY) || !token.equals(getAccessToken())) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/Tokens.java b/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/Tokens.java
index 574eb8a..fcac97f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/Tokens.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/Tokens.java
@@ -41,6 +41,11 @@
 
   void setRefreshToken(String refreshToken);
 
+  void setExpiresIn(String expiresIn);
+
+  String getExpiresIn();
+
+
   /**
    * Returns value from tokens table that corresponds to provided plugin.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 23030cc..4c9f963 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -197,6 +197,7 @@
   @Deprecated
   public Response updateRefreshToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
     // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    logger.warn("Deprecated endpoint call: {}", "/storage/" + name + "/update_refresh_token.");
     return OAuthRequests.updateRefreshToken(name, tokens, storage, authEnabled, sc);
   }
 
@@ -207,6 +208,8 @@
   @Deprecated
   public Response updateAccessToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
     // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    logger.warn("Deprecated endpoint call: {}", "/storage/" + name + "/update_access_token.");
+
     return OAuthRequests.updateAccessToken(name, tokens, storage, authEnabled, sc);
   }
 
@@ -218,6 +221,7 @@
   public Response updateOAuthTokens(@PathParam("name") String name,
                                     OAuthTokenContainer tokenContainer) {
     // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    logger.warn("Deprecated endpoint call: {}", "/storage/" + name + "/update_oauth_tokens.");
     return OAuthRequests.updateOAuthTokens(name, tokenContainer, storage, authEnabled, sc);
   }
 
@@ -227,6 +231,7 @@
   @Deprecated
   public Response updateAuthToken(@PathParam("name") String name, @QueryParam("code") String code) {
     // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    logger.warn("Deprecated endpoint call: {}", "/storage/" + name + "/update_oauth2_authtoken.");
     return OAuthRequests.updateAuthToken(name, code, request, storage, authEnabled, sc);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index e589a47..d8a1a0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -82,7 +82,7 @@
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param userName User whom to impersonate when when reading the contents as part of Scan.
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
    * @param selection The configured storage engine specific selection.
    * @param options (optional) session options
    * @return The physical scan operator for the particular GroupScan (read) node.
@@ -93,7 +93,7 @@
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param userName        User whom to impersonate when when reading the contents as part of Scan.
+   * @param userName        User whom to impersonate when reading the contents as part of Scan.
    * @param selection       The configured storage engine specific selection.
    * @param options         (optional) session options
    * @param providerManager manager for handling metadata providers
@@ -105,7 +105,7 @@
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param userName User whom to impersonate when when reading the contents as part of Scan.
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
    * @param selection The configured storage engine specific selection.
    * @param columns (optional) The list of column names to scan from the data source.
    * @return The physical scan operator for the particular GroupScan (read) node.
@@ -116,7 +116,7 @@
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param userName User whom to impersonate when when reading the contents as part of Scan.
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
    * @param selection The configured storage engine specific selection.
    * @param columns (optional) The list of column names to scan from the data source.
    * @param options (optional) session options
@@ -128,7 +128,7 @@
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param userName        User whom to impersonate when when reading the contents as part of Scan.
+   * @param userName        User whom to impersonate when reading the contents as part of Scan.
    * @param selection       The configured storage engine specific selection.
    * @param columns         (optional) The list of column names to scan from the data source.
    * @param options         (optional) session options
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
index 08217dc..94bfa84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
@@ -30,6 +30,7 @@
   public static final String CLIENT_ID = "clientID";
   public static final String CLIENT_SECRET = "clientSecret";
   public static final String ACCESS_TOKEN = "accessToken";
+  public static final String EXPIRES_IN = "expiresIn";
   public static final String REFRESH_TOKEN = "refreshToken";
   public static final String TOKEN_URI = "tokenURI";
   public static final String PROXY_USERNAME = "proxyUsername";
@@ -38,6 +39,7 @@
   private final String clientID;
   private final String clientSecret;
   private final String tokenURI;
+
   private Optional<PersistentTokenTable> tokenTable;
 
   /**
@@ -121,6 +123,10 @@
     return tokenTable.map(PersistentTokenTable::getRefreshToken).orElse(null);
   }
 
+  public String getExpiresIn() {
+    return tokenTable.map(PersistentTokenTable::getExpiresIn).orElse(null);
+  }
+
   public String getTokenUri() {
     return tokenURI;
   }
diff --git a/exec/java-exec/src/main/resources/rest/storage/update.ftl b/exec/java-exec/src/main/resources/rest/storage/update.ftl
index 972890d..63d3812 100644
--- a/exec/java-exec/src/main/resources/rest/storage/update.ftl
+++ b/exec/java-exec/src/main/resources/rest/storage/update.ftl
@@ -44,7 +44,7 @@
   <#else>
       <a id="enabled" class="btn btn-success text-white">Enable</a>
   </#if>
-  <#if model.getType() == "HttpStoragePluginConfig" && model.getPlugin().isOauth() >
+  <#if model.getPlugin().isOauth() >
       <a id="getOauth" class="btn btn-success text-white">Authorize</a>
   </#if>
     <button type="button" class="btn btn-secondary export" name="${model.getPlugin().getName()}" data-toggle="modal"
@@ -137,7 +137,6 @@
       }
     });
 
-  <#if model.getType() == "HttpStoragePluginConfig" >
     $("#getOauth").click(function() {
       var field = document.getElementById("config");
       try {
@@ -194,7 +193,6 @@
       window.alert("Cannot parse JSON.");
     }
   });
-  </#if>
 
     function doUpdate() {
       $("#updateForm").ajaxForm({