[LIVY-491][LIVY-492][LIVY-493] Add Thriftserver module and implementation

## What changes were proposed in this pull request?

This PR adds a JDBC API for the Livy server based on the Hive Thriftserver; the implementation targets version 3.0 of the Hive Thriftserver.

This initial PR adds the thriftserver module to Livy together with its implementation. It doesn't contain any binding for starting the server; that will be added later.
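
For context, once a startup binding is added, clients are expected to connect with the standard Hive JDBC driver over the HiveServer2-compatible protocol. The snippet below is only an illustrative sketch: host, port, and credentials are placeholders, since this PR does not yet provide a way to start the server.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LivyThriftJdbcExample {
  public static void main(String[] args) throws Exception {
    // Explicitly load the Hive JDBC driver (newer hive-jdbc versions also
    // auto-register it via the ServiceLoader mechanism).
    Class.forName("org.apache.hive.jdbc.HiveDriver");

    // HiveServer2-style JDBC URL; host and port are placeholders, not values
    // defined by this PR.
    String url = "jdbc:hive2://localhost:10090/default";

    try (Connection conn = DriverManager.getConnection(url, "someUser", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT 1")) {
      while (rs.next()) {
        System.out.println(rs.getInt(1));
      }
    }
  }
}
```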

Some Hive classes have been ported here because they needed some modifications in order to work properly in Livy. The long-term plan is to get rid of them all by re-implementing the needed parts in the Livy thriftserver itself, without relying on Hive code (other than the RPC interface) anymore. Those classes/changes can be summarized in three categories:

 1. Changes to make the Hive classes easier to extend: for instance, some visibility modifiers were changed, moving from `private` or `package private` to `protected` or `public` (a minimal sketch of what this enables follows the list);
 2. Changes to reduce the dependencies on Hive modules/classes: for instance, all the classes in the `operation` package were modified to get rid of the usage of `HiveSession`, and `HiveServer2` was changed so that it no longer uses `CLIService`;
 3. The UGI management currently performed in Livy is very different from the Hive one; this required changes to `HiveAuthFactory` so that it does not interfere with the existing codebase.
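
As a minimal, hypothetical illustration of the first category (the class and member names below are simplified for the example and are not the actual ported code):

```java
// Simplified shape of a ported Hive class: a member and a setter that were
// private/package-private upstream are widened to protected so that Livy-side
// code can extend the class instead of copying it.
abstract class PortedHiveOperation {
  protected volatile String state = "INITIALIZED";   // was package-private in Hive

  protected void setState(String newState) {          // was private in Hive
    this.state = newState;
  }
}

// A Livy-side subclass that only becomes possible after widening the visibility.
class LivyBackedOperation extends PortedHiveOperation {
  void markRunning() {
    setState("RUNNING");
  }
}
```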

## How was this patch tested?

Added integration tests as UTs.

Author: Marco Gaido <mgaido@hortonworks.com>

Closes #104 from mgaido91/LIVY-491.
diff --git a/.travis.yml b/.travis.yml
index e62a8d9..6c5eee3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,13 +31,14 @@
 matrix:
   include:
       # Spark 2.2+ will only be verified using JDK8
-    - env: MVN_FLAG='-Pspark-2.2-it -DskipTests'
+      # Thriftserver requires JDK8
+    - env: MVN_FLAG='-Pthriftserver -Pspark-2.2-it -DskipTests'
       jdk: oraclejdk8
-    - env: MVN_FLAG='-Pspark-2.2 -DskipITs'
+    - env: MVN_FLAG='-Pthriftserver -Pspark-2.2 -DskipITs'
       jdk: oraclejdk8
-    - env: MVN_FLAG='-Pspark-2.3-it -DskipTests'
+    - env: MVN_FLAG='-Pthriftserver -Pspark-2.3-it -DskipTests'
       jdk: oraclejdk8
-    - env: MVN_FLAG='-Pspark-2.3 -DskipITs'
+    - env: MVN_FLAG='-Pthriftserver -Pspark-2.3 -DskipITs'
       jdk: oraclejdk8
 
 
diff --git a/checkstyle-suppressions.xml b/checkstyle-suppressions.xml
index 4b8f4f2..84e7667 100644
--- a/checkstyle-suppressions.xml
+++ b/checkstyle-suppressions.xml
@@ -19,4 +19,5 @@
   "-//Puppy Crawl//DTD Suppressions 1.1//EN"
   "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
 <suppressions>
+  <suppress checks="." files="java[\\/]org[\\/]apache[\\/]hive[\\/]service"/>
 </suppressions>
diff --git a/pom.xml b/pom.xml
index 85adaa1..4ce3435 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
     <spark.scala-2.11.version>1.6.2</spark.scala-2.11.version>
     <spark.scala-2.10.version>1.6.2</spark.scala-2.10.version>
     <spark.version>${spark.scala-2.11.version}</spark.version>
+    <hive.version>3.0.0</hive.version>
     <commons-codec.version>1.9</commons-codec.version>
     <guava.version>15.0</guava.version>
     <httpclient.version>4.5.3</httpclient.version>
@@ -92,6 +93,7 @@
     <jetty.version>9.2.16.v20160414</jetty.version>
     <json4s.version>3.2.10</json4s.version>
     <junit.version>4.11</junit.version>
+    <libthrift.version>0.9.3</libthrift.version>
     <kryo.version>2.22</kryo.version>
     <metrics.version>3.1.0</metrics.version>
     <mockito.version>1.9.5</mockito.version>
@@ -570,10 +572,11 @@
         <version>${py4j.version}</version>
       </dependency>
 
+      <!-- we need a version > 1.7.13 because of SLF4J-324 -->
       <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
-        <version>1.7.0</version>
+        <version>1.7.25</version>
       </dependency>
 
     </dependencies>
@@ -1077,6 +1080,39 @@
   </distributionManagement>
 
   <profiles>
+    <profile>
+      <id>thriftserver</id>
+      <modules>
+        <module>thriftserver/server</module>
+      </modules>
+      <properties>
+        <jetty.version>9.3.8.v20160314</jetty.version>
+      </properties>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>enforce-java</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <requireJavaVersion>
+                      <version>1.8</version>
+                    </requireJavaVersion>
+                  </rules>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <!-- Spark version profile -->
     <profile>
       <id>spark-1.6</id>
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index edee79a..73544c8 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -99,6 +99,10 @@
   val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h")
   val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)
 
+  val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false)
+  val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m")
+  val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null)
+
   /**
    * Recovery mode of Livy. Possible values:
    * off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index a97747c..8bb1641 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -360,7 +360,7 @@
     id: Int,
     appIdHint: Option[String],
     appTag: String,
-    client: Option[RSCClient],
+    val client: Option[RSCClient],
     initialState: SessionState,
     val kind: Kind,
     heartbeatTimeoutS: Int,
diff --git a/thriftserver/server/pom.xml b/thriftserver/server/pom.xml
new file mode 100644
index 0000000..ef5870c
--- /dev/null
+++ b/thriftserver/server/pom.xml
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>livy-main</artifactId>
+    <groupId>org.apache.livy</groupId>
+    <version>0.6.0-incubating-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>livy-thriftserver</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-service-rpc</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-service</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-1.2-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.livy</groupId>
+      <artifactId>livy-rsc</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.livy</groupId>
+      <artifactId>livy-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.livy</groupId>
+      <artifactId>livy-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.livy</groupId>
+      <artifactId>livy-server</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-server</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>${libthrift.version}</version>
+    </dependency>
+
+    <!-- needed for compiling successfully when using JobContext -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- needed for testing -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <excludeArtifactIds>
+                jetty-util,
+                jetty-xml,
+                jetty-webapp
+              </excludeArtifactIds>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <outputDirectory>${project.build.directory}/jars</outputDirectory>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>download-spark-files</id>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <readTimeOut>60000</readTimeOut>
+              <retries>5</retries>
+              <url>${spark.bin.download.url}</url>
+              <outputDirectory>${project.build.directory}</outputDirectory>
+              <unpack>true</unpack>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <executions>
+          <!-- Unbind integration test from test phase and bind it to integration-test phase -->
+          <execution>
+            <id>test</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>integration-test</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <environmentVariables>
+            <LIVY_HOME>${execution.root}</LIVY_HOME>
+            <LIVY_TEST>false</LIVY_TEST>
+            <LIVY_INTEGRATION_TEST>true</LIVY_INTEGRATION_TEST>
+            <SPARK_HOME>${project.build.directory}/${spark.bin.name}</SPARK_HOME>
+          </environmentVariables>
+          <skipTests>${skipITs}</skipTests>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java b/thriftserver/server/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
new file mode 100644
index 0000000..f2041e3
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthenticationException;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.metastore.security.DBTokenStore;
+import org.apache.hadoop.hive.metastore.security.DelegationTokenStore;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.security.MemoryTokenStore;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
+import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class helps in some aspects of authentication. It creates the proper Thrift classes for the
+ * given configuration as well as helps with authenticating requests.
+ */
+public class HiveAuthFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveAuthFactory.class);
+
+  private HadoopThriftAuthBridge.Server saslServer;
+  private String authTypeStr;
+  private final String transportMode;
+  private final HiveConf conf;
+  private String hadoopAuth;
+  private MetastoreDelegationTokenManager delegationTokenManager = null;
+
+  public HiveAuthFactory(HiveConf conf) throws TTransportException {
+    this.conf = conf;
+    transportMode = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
+    authTypeStr = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION);
+
+    // ShimLoader.getHadoopShims().isSecurityEnabled() will only check that
+    // hadoopAuth is not simple, it does not guarantee it is kerberos
+    hadoopAuth = conf.get(HADOOP_SECURITY_AUTHENTICATION, "simple");
+
+    // In http mode we use NOSASL as the default auth type
+    if (authTypeStr == null) {
+      if ("http".equalsIgnoreCase(transportMode)) {
+        authTypeStr = HiveAuthConstants.AuthTypes.NOSASL.getAuthName();
+      } else {
+        authTypeStr = HiveAuthConstants.AuthTypes.NONE.getAuthName();
+      }
+    }
+    if (isSASLWithKerberizedHadoop()) {
+      saslServer = new HadoopThriftAuthBridge.Server();
+
+      // Start delegation token manager
+      delegationTokenManager = new MetastoreDelegationTokenManager();
+      try {
+        Object baseHandler = null;
+        String tokenStoreClass = SecurityUtils.getTokenStoreClassName(conf);
+
+        if (tokenStoreClass.equals(DBTokenStore.class.getName())) {
+          // IMetaStoreClient is needed to access token store if DBTokenStore is to be used. It
+          // will be got via Hive.get(conf).getMSC in a thread where the DelegationTokenStore
+          // is called. To avoid the cyclic reference, we pass the Hive class to DBTokenStore where
+          // it is used to get a threadLocal Hive object with a synchronized MetaStoreClient using
+          // Java reflection.
+          // Note: there will be two HS2 life-long opened MSCs, one is stored in HS2 thread local
+          // Hive object, the other is in a daemon thread spawned in DelegationTokenSecretManager
+          // to remove expired tokens.
+          baseHandler = Hive.class;
+        }
+
+        delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler,
+            HadoopThriftAuthBridge.Server.ServerMode.HIVESERVER2);
+        saslServer.setSecretManager(delegationTokenManager.getSecretManager());
+      }
+      catch (IOException e) {
+        throw new TTransportException("Failed to start token manager", e);
+      }
+    }
+  }
+
+  public Map<String, String> getSaslProperties() {
+    Map<String, String> saslProps = new HashMap<String, String>();
+    SaslQOP saslQOP = SaslQOP.fromString(conf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP));
+    saslProps.put(Sasl.QOP, saslQOP.toString());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    return saslProps;
+  }
+
+  public TTransportFactory getAuthTransFactory() throws LoginException {
+    TTransportFactory transportFactory;
+    TSaslServerTransport.Factory serverTransportFactory;
+
+    if (isSASLWithKerberizedHadoop()) {
+      try {
+        serverTransportFactory = saslServer.createSaslServerTransportFactory(
+            getSaslProperties());
+      } catch (TTransportException e) {
+        throw new LoginException(e.getMessage());
+      }
+      if (authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.KERBEROS.getAuthName())) {
+        // no-op
+      } else if (authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NONE.getAuthName()) ||
+          authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.LDAP.getAuthName()) ||
+          authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.PAM.getAuthName()) ||
+          authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.CUSTOM.getAuthName())) {
+        try {
+          serverTransportFactory.addServerDefinition("PLAIN",
+              authTypeStr, null, new HashMap<String, String>(),
+              new PlainSaslHelper.PlainServerCallbackHandler(authTypeStr));
+        } catch (AuthenticationException e) {
+          throw new LoginException ("Error setting callback handler" + e);
+        }
+      } else {
+        throw new LoginException("Unsupported authentication type " + authTypeStr);
+      }
+      transportFactory = saslServer.wrapTransportFactory(serverTransportFactory);
+    } else if (authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NONE.getAuthName()) ||
+          authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.LDAP.getAuthName()) ||
+          authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.PAM.getAuthName()) ||
+          authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.CUSTOM.getAuthName())) {
+       transportFactory = PlainSaslHelper.getPlainTransportFactory(authTypeStr);
+    } else if (authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.getAuthName())) {
+      transportFactory = new TTransportFactory();
+    } else {
+      throw new LoginException("Unsupported authentication type " + authTypeStr);
+    }
+    return transportFactory;
+  }
+
+  /**
+   * Returns the thrift processor factory for HiveServer2 running in binary mode
+   * @param service
+   * @return
+   * @throws LoginException
+   */
+  public TProcessorFactory getAuthProcFactory(ThriftCLIService service) throws LoginException {
+    if (isSASLWithKerberizedHadoop()) {
+      return KerberosSaslHelper.getKerberosProcessorFactory(saslServer, service);
+    } else {
+      return PlainSaslHelper.getPlainProcessorFactory(service);
+    }
+  }
+
+  public String getRemoteUser() {
+    return saslServer == null ? null : saslServer.getRemoteUser();
+  }
+
+  public String getIpAddress() {
+    if (saslServer == null || saslServer.getRemoteAddress() == null) {
+      return null;
+    } else {
+      return saslServer.getRemoteAddress().getHostAddress();
+    }
+  }
+
+  public String getUserAuthMechanism() {
+    return saslServer == null ? null : saslServer.getUserAuthMechanism();
+  }
+
+  public boolean isSASLWithKerberizedHadoop() {
+    return "kerberos".equalsIgnoreCase(hadoopAuth)
+        && !authTypeStr.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.getAuthName());
+  }
+
+  public boolean isSASLKerberosUser() {
+    return AuthMethod.KERBEROS.getMechanismName().equals(getUserAuthMechanism())
+            || AuthMethod.TOKEN.getMechanismName().equals(getUserAuthMechanism());
+  }
+
+  // Perform SPNEGO login using the hadoop shim API if the configuration is available
+  public static UserGroupInformation loginFromSpnegoKeytabAndReturnUGI(HiveConf hiveConf)
+    throws IOException {
+    String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL);
+    String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB);
+    if (principal.isEmpty() || keyTabFile.isEmpty()) {
+      throw new IOException("HiveServer2 SPNEGO principal or keytab is not correctly configured");
+    } else {
+      return UserGroupInformation.loginUserFromKeytabAndReturnUGI(SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keyTabFile);
+    }
+  }
+
+  // retrieve delegation token for the given user
+  public String getDelegationToken(String owner, String renewer, String remoteAddr)
+      throws HiveSQLException {
+    if (delegationTokenManager == null) {
+      throw new HiveSQLException(
+          "Delegation token only supported over kerberos authentication", "08S01");
+    }
+
+    try {
+      String tokenStr = delegationTokenManager.getDelegationTokenWithService(owner, renewer,
+          HiveAuthConstants.HS2_CLIENT_TOKEN, remoteAddr);
+      if (tokenStr == null || tokenStr.isEmpty()) {
+        throw new HiveSQLException(
+            "Received empty retrieving delegation token for user " + owner, "08S01");
+      }
+      return tokenStr;
+    } catch (IOException e) {
+      throw new HiveSQLException(
+          "Error retrieving delegation token for user " + owner, "08S01", e);
+    } catch (InterruptedException e) {
+      throw new HiveSQLException("delegation token retrieval interrupted", "08S01", e);
+    }
+  }
+
+  // cancel given delegation token
+  public void cancelDelegationToken(String delegationToken) throws HiveSQLException {
+    if (delegationTokenManager == null) {
+      throw new HiveSQLException(
+          "Delegation token only supported over kerberos authentication", "08S01");
+    }
+    try {
+      delegationTokenManager.cancelDelegationToken(delegationToken);
+    } catch (IOException e) {
+      throw new HiveSQLException(
+          "Error canceling delegation token " + delegationToken, "08S01", e);
+    }
+  }
+
+  public void renewDelegationToken(String delegationToken) throws HiveSQLException {
+    if (delegationTokenManager == null) {
+      throw new HiveSQLException(
+          "Delegation token only supported over kerberos authentication", "08S01");
+    }
+    try {
+      delegationTokenManager.renewDelegationToken(delegationToken);
+    } catch (IOException e) {
+      throw new HiveSQLException(
+          "Error renewing delegation token " + delegationToken, "08S01", e);
+    }
+  }
+
+  public String verifyDelegationToken(String delegationToken) throws HiveSQLException {
+    if (delegationTokenManager == null) {
+      throw new HiveSQLException(
+          "Delegation token only supported over kerberos authentication", "08S01");
+    }
+    try {
+      return delegationTokenManager.verifyDelegationToken(delegationToken);
+    } catch (IOException e) {
+      String msg =  "Error verifying delegation token " + delegationToken;
+      LOG.error(msg, e);
+      throw new HiveSQLException(msg, "08S01", e);
+    }
+  }
+
+  public String getUserFromToken(String delegationToken) throws HiveSQLException {
+    if (delegationTokenManager == null) {
+      throw new HiveSQLException(
+          "Delegation token only supported over kerberos authentication", "08S01");
+    }
+    try {
+      return delegationTokenManager.getUserFromToken(delegationToken);
+    } catch (IOException e) {
+      throw new HiveSQLException(
+          "Error extracting user from delegation token " + delegationToken, "08S01", e);
+    }
+  }
+
+  public static void verifyProxyAccess(String realUser, String proxyUser, String ipAddress,
+    HiveConf hiveConf) throws HiveSQLException {
+    try {
+      UserGroupInformation sessionUgi;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        KerberosNameShim kerbName = ShimLoader.getHadoopShims().getKerberosNameShim(realUser);
+        sessionUgi = UserGroupInformation.createProxyUser(
+            kerbName.getServiceName(), UserGroupInformation.getLoginUser());
+      } else {
+        sessionUgi = UserGroupInformation.createRemoteUser(realUser);
+      }
+      if (!proxyUser.equalsIgnoreCase(realUser)) {
+        ProxyUsers.refreshSuperUserGroupsConfiguration(hiveConf);
+        ProxyUsers.authorize(UserGroupInformation.createProxyUser(proxyUser, sessionUgi),
+            ipAddress, hiveConf);
+      }
+    } catch (IOException e) {
+      throw new HiveSQLException(
+        "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, "08S01", e);
+    }
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/ColumnBasedSet.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/ColumnBasedSet.java
new file mode 100644
index 0000000..0dbd239
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/ColumnBasedSet.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ColumnBasedSet.
+ */
+public class ColumnBasedSet implements RowSet {
+
+  private long startOffset;
+
+  private final TypeDescriptor[] descriptors; // non-null only for writing (server-side)
+  private final List<ColumnBuffer> columns;
+  private byte[] blob;
+  private boolean isBlobBased = false;
+  public static final Logger LOG = LoggerFactory.getLogger(ColumnBasedSet.class);
+
+  public ColumnBasedSet(TableSchema schema) {
+    descriptors = schema.toTypeDescriptors();
+    columns = new ArrayList<ColumnBuffer>();
+    for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
+      columns.add(new ColumnBuffer(colDesc.getType()));
+    }
+  }
+
+  public ColumnBasedSet(TRowSet tRowSet) throws TException {
+    descriptors = null;
+    columns = new ArrayList<ColumnBuffer>();
+    // Use TCompactProtocol to read serialized TColumns
+    if (tRowSet.isSetBinaryColumns()) {
+      TProtocol protocol =
+          new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream(
+              tRowSet.getBinaryColumns())));
+      // Read from the stream using the protocol for each column in final schema
+      for (int i = 0; i < tRowSet.getColumnCount(); i++) {
+        TColumn tvalue = new TColumn();
+        try {
+          tvalue.read(protocol);
+        } catch (TException e) {
+          LOG.error(e.getMessage(), e);
+          throw new TException("Error reading column value from the row set blob", e);
+        }
+        columns.add(new ColumnBuffer(tvalue));
+      }
+    }
+    else {
+      if (tRowSet.getColumns() != null) {
+        for (TColumn tvalue : tRowSet.getColumns()) {
+          columns.add(new ColumnBuffer(tvalue));
+        }
+      }
+    }
+    startOffset = tRowSet.getStartRowOffset();
+  }
+
+  public ColumnBasedSet(TypeDescriptor[] descriptors, List<ColumnBuffer> columns, long startOffset) {
+    this.descriptors = descriptors;
+    this.columns = columns;
+    this.startOffset = startOffset;
+  }
+
+  public ColumnBasedSet(TableSchema schema, boolean isBlobBased) {
+    this(schema);
+    this.isBlobBased = isBlobBased;
+  }
+
+  @Override
+  public ColumnBasedSet addRow(Object[] fields) {
+    if (isBlobBased) {
+      this.blob = (byte[]) fields[0];
+    } else {
+      for (int i = 0; i < fields.length; i++) {
+        TypeDescriptor descriptor = descriptors[i];
+        Object field = fields[i];
+        if (field != null && descriptor.getType() == Type.DECIMAL_TYPE) {
+          int scale = descriptor.getDecimalDigits();
+          field = ((HiveDecimal) field).toFormatString(scale);
+        }
+        columns.get(i).addValue(descriptor.getType(), field);
+      }
+    }
+    return this;
+  }
+
+  public List<ColumnBuffer> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public int numColumns() {
+    return columns.size();
+  }
+
+  @Override
+  public int numRows() {
+    return columns.isEmpty() ? 0 : columns.get(0).size();
+  }
+
+  @Override
+  public ColumnBasedSet extractSubset(int maxRows) {
+    int numRows = Math.min(numRows(), maxRows);
+
+    List<ColumnBuffer> subset = new ArrayList<ColumnBuffer>();
+    for (int i = 0; i < columns.size(); i++) {
+      subset.add(columns.get(i).extractSubset(numRows));
+    }
+    ColumnBasedSet result = new ColumnBasedSet(descriptors, subset, startOffset);
+    startOffset += numRows;
+    return result;
+  }
+
+  @Override
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  @Override
+  public void setStartOffset(long startOffset) {
+    this.startOffset = startOffset;
+  }
+
+  public TRowSet toTRowSet() {
+    TRowSet tRowSet = new TRowSet(startOffset, new ArrayList<TRow>());
+    if (isBlobBased) {
+      tRowSet.setColumns(null);
+      tRowSet.setBinaryColumns(blob);
+      tRowSet.setColumnCount(numColumns());
+    } else {
+      for (int i = 0; i < columns.size(); i++) {
+        tRowSet.addToColumns(columns.get(i).toTColumn());
+      }
+    }
+    return tRowSet;
+  }
+
+  @Override
+  public Iterator<Object[]> iterator() {
+    return new Iterator<Object[]>() {
+
+      private int index;
+      private final Object[] convey = new Object[numColumns()];
+
+      @Override
+      public boolean hasNext() {
+        return index < numRows();
+      }
+
+      @Override
+      public Object[] next() {
+        for (int i = 0; i < columns.size(); i++) {
+          convey[i] = columns.get(i).get(index);
+        }
+        index++;
+        return convey;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("remove");
+      }
+    };
+  }
+
+  public Object[] fill(int index, Object[] convey) {
+    for (int i = 0; i < columns.size(); i++) {
+      convey[i] = columns.get(i).get(index);
+    }
+    return convey;
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/JobProgressUpdate.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/JobProgressUpdate.java
new file mode 100644
index 0000000..5c3d8ce
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/JobProgressUpdate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+
+import java.util.List;
+
+public class JobProgressUpdate {
+  public final double progressedPercentage;
+  public final String footerSummary;
+  public final long startTimeMillis;
+  private final List<String> headers;
+  private final List<List<String>> rows;
+  public final String status;
+
+
+  public JobProgressUpdate(ProgressMonitor monitor) {
+    this(monitor.headers(), monitor.rows(), monitor.footerSummary(), monitor.progressedPercentage(),
+        monitor.startTime(), monitor.executionStatus());
+  }
+
+  private JobProgressUpdate(List<String> headers, List<List<String>> rows, String footerSummary,
+      double progressedPercentage, long startTimeMillis, String status) {
+    this.progressedPercentage = progressedPercentage;
+    this.footerSummary = footerSummary;
+    this.startTimeMillis = startTimeMillis;
+    this.headers = headers;
+    this.rows = rows;
+    this.status = status;
+  }
+
+  public List<String> headers() {
+    return headers;
+  }
+
+  public List<List<String>> rows() {
+    return rows;
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/OperationStatus.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/OperationStatus.java
new file mode 100644
index 0000000..663be4b
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/OperationStatus.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+/**
+ * OperationStatus
+ *
+ */
+public class OperationStatus {
+
+  private final OperationState state;
+  private final String taskStatus;
+  private final long operationStarted;
+  private final long operationCompleted;
+  private final boolean hasResultSet;
+  private final HiveSQLException operationException;
+  private JobProgressUpdate jobProgressUpdate;
+
+  public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, boolean hasResultSet, HiveSQLException operationException) {
+    this.state = state;
+    this.taskStatus = taskStatus;
+    this.operationStarted = operationStarted;
+    this.operationCompleted = operationCompleted;
+    this.hasResultSet = hasResultSet;
+    this.operationException = operationException;
+  }
+
+  public OperationState getState() {
+    return state;
+  }
+
+  public String getTaskStatus() {
+    return taskStatus;
+  }
+
+  public long getOperationStarted() {
+    return operationStarted;
+  }
+
+  public long getOperationCompleted() {
+    return operationCompleted;
+  }
+
+  public boolean getHasResultSet() {
+    return hasResultSet;
+  }
+
+  public HiveSQLException getOperationException() {
+    return operationException;
+  }
+
+  public void setJobProgressUpdate(JobProgressUpdate jobProgressUpdate){
+    this.jobProgressUpdate = jobProgressUpdate;
+  }
+
+  public JobProgressUpdate jobProgressUpdate(){
+    return jobProgressUpdate;
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
new file mode 100644
index 0000000..02b7bf3
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hive.service.cli.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GetCatalogsOperation.
+ *
+ */
+public class GetCatalogsOperation extends MetadataOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GetCatalogsOperation.class.getName());
+
+  private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
+  .addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.");
+
+  private final RowSet rowSet;
+
+  public GetCatalogsOperation(SessionHandle sessionHandle) {
+    super(sessionHandle, OperationType.GET_CATALOGS);
+    rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
+    LOG.info("Starting GetCatalogsOperation");
+  }
+
+  @Override
+  public void runInternal() throws HiveSQLException {
+    setState(OperationState.RUNNING);
+    try {
+      // catalogs are actually not supported in hive, so this is a no-op
+      setState(OperationState.FINISHED);
+      LOG.info("Fetching catalog metadata has been successfully finished");
+    } catch (HiveSQLException e) {
+      setState(OperationState.ERROR);
+      throw e;
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
+   */
+  @Override
+  public TableSchema getResultSetSchema() throws HiveSQLException {
+    return RESULT_SET_SCHEMA;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
+   */
+  @Override
+  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
+    validateDefaultFetchOrientation(orientation);
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
+    return rowSet.extractSubset((int)maxRows);
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
new file mode 100644
index 0000000..0f3cc24
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hive.service.cli.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.thriftserver.LivyThriftServer$;
+
+/**
+ * GetTableTypesOperation.
+ *
+ */
+public class GetTableTypesOperation extends MetadataOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GetTableTypesOperation.class.getName());
+
+  protected static TableSchema RESULT_SET_SCHEMA = new TableSchema()
+  .addStringColumn("TABLE_TYPE", "Table type name.");
+
+  private final RowSet rowSet;
+  private final TableTypeMapping tableTypeMapping;
+
+  public GetTableTypesOperation(SessionHandle sessionHandle) {
+    super(sessionHandle, OperationType.GET_TABLE_TYPES);
+    String tableMappingStr = LivyThriftServer$.MODULE$.getInstance().get().getHiveConf()
+        .getVar(HiveConf.ConfVars.HIVE_SERVER2_TABLE_TYPE_MAPPING);
+    tableTypeMapping = TableTypeMappingFactory.getTableTypeMapping(tableMappingStr);
+    rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
+    LOG.info("Starting GetTableTypesOperation");
+  }
+
+  @Override
+  public void runInternal() throws HiveSQLException {
+    setState(OperationState.RUNNING);
+    LOG.info("Fetching table type metadata");
+    try {
+      for (TableType type : TableType.values()) {
+        String tableType = tableTypeMapping.mapToClientType(type.toString());
+        rowSet.addRow(new String[] {tableType});
+        if (LOG.isDebugEnabled()) {
+          String debugMessage = getDebugMessage("table type", RESULT_SET_SCHEMA);
+          LOG.debug(debugMessage, tableType);
+        }
+      }
+      if (LOG.isDebugEnabled() && rowSet.numRows() == 0) {
+        LOG.debug("No table type metadata has been returned.");
+      }
+      setState(OperationState.FINISHED);
+      LOG.info("Fetching table type metadata has been successfully finished");
+    } catch (Exception e) {
+      setState(OperationState.ERROR);
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
+   */
+  @Override
+  public TableSchema getResultSetSchema() throws HiveSQLException {
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
+    return RESULT_SET_SCHEMA;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
+   */
+  @Override
+  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
+    validateDefaultFetchOrientation(orientation);
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
+    return rowSet.extractSubset((int)maxRows);
+  }
+
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
new file mode 100644
index 0000000..501a2a9
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.cli.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GetTypeInfoOperation.
+ *
+ */
+public class GetTypeInfoOperation extends MetadataOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GetTypeInfoOperation.class.getName());
+
+  private final static TableSchema RESULT_SET_SCHEMA = new TableSchema()
+  .addPrimitiveColumn("TYPE_NAME", Type.STRING_TYPE,
+      "Type name")
+  .addPrimitiveColumn("DATA_TYPE", Type.INT_TYPE,
+      "SQL data type from java.sql.Types")
+  .addPrimitiveColumn("PRECISION", Type.INT_TYPE,
+      "Maximum precision")
+  .addPrimitiveColumn("LITERAL_PREFIX", Type.STRING_TYPE,
+      "Prefix used to quote a literal (may be null)")
+  .addPrimitiveColumn("LITERAL_SUFFIX", Type.STRING_TYPE,
+      "Suffix used to quote a literal (may be null)")
+  .addPrimitiveColumn("CREATE_PARAMS", Type.STRING_TYPE,
+      "Parameters used in creating the type (may be null)")
+  .addPrimitiveColumn("NULLABLE", Type.SMALLINT_TYPE,
+      "Can you use NULL for this type")
+  .addPrimitiveColumn("CASE_SENSITIVE", Type.BOOLEAN_TYPE,
+      "Is it case sensitive")
+  .addPrimitiveColumn("SEARCHABLE", Type.SMALLINT_TYPE,
+      "Can you use \"WHERE\" based on this type")
+  .addPrimitiveColumn("UNSIGNED_ATTRIBUTE", Type.BOOLEAN_TYPE,
+      "Is it unsigned")
+  .addPrimitiveColumn("FIXED_PREC_SCALE", Type.BOOLEAN_TYPE,
+      "Can it be a money value")
+  .addPrimitiveColumn("AUTO_INCREMENT", Type.BOOLEAN_TYPE,
+      "Can it be used for an auto-increment value")
+  .addPrimitiveColumn("LOCAL_TYPE_NAME", Type.STRING_TYPE,
+      "Localized version of type name (may be null)")
+  .addPrimitiveColumn("MINIMUM_SCALE", Type.SMALLINT_TYPE,
+      "Minimum scale supported")
+  .addPrimitiveColumn("MAXIMUM_SCALE", Type.SMALLINT_TYPE,
+      "Maximum scale supported")
+  .addPrimitiveColumn("SQL_DATA_TYPE", Type.INT_TYPE,
+      "Unused")
+  .addPrimitiveColumn("SQL_DATETIME_SUB", Type.INT_TYPE,
+      "Unused")
+  .addPrimitiveColumn("NUM_PREC_RADIX", Type.INT_TYPE,
+      "Usually 2 or 10");
+
+  private final RowSet rowSet;
+
+  public GetTypeInfoOperation(SessionHandle sessionHandle) {
+    super(sessionHandle, OperationType.GET_TYPE_INFO);
+    rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
+    LOG.info("Starting GetTypeInfoOperation");
+  }
+
+  @Override
+  public void runInternal() throws HiveSQLException {
+    setState(OperationState.RUNNING);
+    LOG.info("Fetching type info metadata");
+    try {
+      for (Type type : Type.values()) {
+        Object[] rowData = new Object[] {
+            type.getName(), // TYPE_NAME
+            type.toJavaSQLType(), // DATA_TYPE
+            type.getMaxPrecision(), // PRECISION
+            type.getLiteralPrefix(), // LITERAL_PREFIX
+            type.getLiteralSuffix(), // LITERAL_SUFFIX
+            type.getCreateParams(), // CREATE_PARAMS
+            type.getNullable(), // NULLABLE
+            type.isCaseSensitive(), // CASE_SENSITIVE
+            type.getSearchable(), // SEARCHABLE
+            type.isUnsignedAttribute(), // UNSIGNED_ATTRIBUTE
+            type.isFixedPrecScale(), // FIXED_PREC_SCALE
+            type.isAutoIncrement(), // AUTO_INCREMENT
+            type.getLocalizedName(), // LOCAL_TYPE_NAME
+            type.getMinimumScale(), // MINIMUM_SCALE
+            type.getMaximumScale(), // MAXIMUM_SCALE
+            null, // SQL_DATA_TYPE, unused
+            null, // SQL_DATETIME_SUB, unused
+            type.getNumPrecRadix() //NUM_PREC_RADIX
+        };
+        rowSet.addRow(rowData);
+        if (LOG.isDebugEnabled()) {
+          String debugMessage = getDebugMessage("type info", RESULT_SET_SCHEMA);
+          LOG.debug(debugMessage, rowData);
+        }
+      }
+      if (LOG.isDebugEnabled() && rowSet.numRows() == 0) {
+        LOG.debug("No type info metadata has been returned.");
+      }
+      setState(OperationState.FINISHED);
+      LOG.info("Fetching type info metadata has been successfully finished");
+    } catch (Exception e) {
+      setState(OperationState.ERROR);
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
+   */
+  @Override
+  public TableSchema getResultSetSchema() throws HiveSQLException {
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
+    return RESULT_SET_SCHEMA;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
+   */
+  @Override
+  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
+    validateDefaultFetchOrientation(orientation);
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
+    return rowSet.extractSubset((int)maxRows);
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/HiveTableTypeMapping.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/HiveTableTypeMapping.java
new file mode 100644
index 0000000..6bf2507
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/HiveTableTypeMapping.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HiveTableTypeMapping.
+ * Default table type mapping
+ *
+ */
+public class HiveTableTypeMapping implements TableTypeMapping {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveTableTypeMapping.class);
+
+  @Override
+  public String[] mapToHiveType(String clientTypeName) {
+    return new String[] {mapToClientType(clientTypeName)};
+  }
+
+  @Override
+  public String mapToClientType(String hiveTypeName) {
+    try {
+      TableType hiveType = TableType.valueOf(hiveTypeName.toUpperCase());
+      return hiveType.name();
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Invalid hive table type " + hiveTypeName);
+      return hiveTypeName;
+    }
+  }
+
+  @Override
+  public Set<String> getTableTypeNames() {
+    Set<String> typeNameSet = new HashSet<String>();
+    for (TableType typeNames : TableType.values()) {
+      typeNameSet.add(typeNames.name());
+    }
+    return typeNameSet;
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java
new file mode 100644
index 0000000..553a283
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import org.apache.hive.service.cli.ColumnDescriptor;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+
+/**
+ * MetadataOperation.
+ *
+ */
+public abstract class MetadataOperation extends Operation {
+
+  protected static final String DEFAULT_HIVE_CATALOG = "";
+
+  protected MetadataOperation(SessionHandle sessionHandle, OperationType opType) {
+    super(sessionHandle, opType);
+    setHasResultSet(true);
+  }
+
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#close()
+   */
+  @Override
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+  }
+
+  /**
+   * Convert wildcards and escape sequences from JDBC format to datanucleus/regex.
+   */
+  protected String convertIdentifierPattern(final String pattern, boolean datanucleusFormat) {
+    if (pattern == null) {
+      return convertPattern("%", true);
+    } else {
+      return convertPattern(pattern, datanucleusFormat);
+    }
+  }
+
+  /**
+   * Convert wildcards and escape sequences of a schema pattern from JDBC format to datanucleus/regex.
+   * The schema pattern also treats the empty string as a wildcard.
+   */
+  protected String convertSchemaPattern(final String pattern) {
+    if ((pattern == null) || pattern.isEmpty()) {
+      return convertPattern("%", true);
+    } else {
+      return convertPattern(pattern, true);
+    }
+  }
+
+  /**
+   * Convert a pattern containing JDBC catalog search wildcards into
+   * Java regex patterns.
+   *
+   * @param pattern input which may contain '%' or '_' wildcard characters, or
+   * these characters escaped with a backslash.
+   * @return the pattern with %/_ replaced by the corresponding search wildcards and with
+   * escaped characters unescaped.
+   *
+   * The datanucleus module expects '*' as the wildcard. The column search, on the
+   * other hand, is done locally inside the Hive code and requires the regex wildcard
+   * format '.*'. This is driven by the datanucleusFormat flag.
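+   *
+   * For example, with datanucleusFormat=true the pattern "db\_%" becomes "db_*" and
+   * "%tab_" becomes "*tab."; with datanucleusFormat=false the same inputs become
+   * "db_.*" and ".*tab.".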
+   */
+  private String convertPattern(String pattern, boolean datanucleusFormat) {
+    String wStr;
+    if (datanucleusFormat) {
+      wStr = "*";
+    } else {
+      wStr = ".*";
+    }
+    pattern = replaceAll(pattern, "([^\\\\])%", "$1" + wStr);
+    pattern = replaceAll(pattern, "\\\\%", "%");
+    pattern = replaceAll(pattern, "^%", wStr);
+    pattern = replaceAll(pattern, "([^\\\\])_", "$1.");
+    pattern = replaceAll(pattern, "\\\\_", "_");
+    pattern = replaceAll(pattern, "^_", ".");
+    return pattern;
+  }
+
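+  // Applies the replacement repeatedly until a fixed point is reached, so that adjacent
+  // wildcards whose matches overlap (e.g. the second '%' in "a%%") are also rewritten.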
+  private String replaceAll(String input, final String pattern, final String replace) {
+    while (true) {
+      String replaced = input.replaceAll(pattern, replace);
+      if (replaced.equals(input)) {
+        return replaced;
+      }
+      input = replaced;
+    }
+  }
+
+  @Override
+  public void cancel(OperationState stateAfterCancel) throws HiveSQLException {
+    throw new UnsupportedOperationException("MetadataOperation.cancel()");
+  }
+
+  protected String getDebugMessage(final String type, final TableSchema resultSetSchema) {
+    StringBuilder debugMessage = new StringBuilder();
+    debugMessage.append("Returning ");
+    debugMessage.append(type);
+    debugMessage.append(" metadata: ");
+    boolean firstColumn = true;
+    for (ColumnDescriptor column : resultSetSchema.getColumnDescriptors()) {
+      if (!firstColumn) {
+        debugMessage.append(", ");
+      }
+      debugMessage.append(column.getName());
+      debugMessage.append("={}");
+      firstColumn = false;
+    }
+    return debugMessage.toString();
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/Operation.java
new file mode 100644
index 0000000..9a3b516
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.livy.thriftserver.LivyThriftServer$;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class Operation {
+  protected final SessionHandle sessionHandle;
+  private volatile OperationState state = OperationState.INITIALIZED;
+  private final OperationHandle opHandle;
+  public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
+  public static final Logger LOG = LoggerFactory.getLogger(Operation.class.getName());
+  protected boolean hasResultSet;
+  protected volatile HiveSQLException operationException;
+  protected volatile Future<?> backgroundHandle;
+  private ScheduledExecutorService scheduledExecutorService;
+
+  private long operationTimeout;
+  private volatile long lastAccessTime;
+  private final long beginTime;
+
+  protected long operationStart;
+  protected long operationComplete;
+
+  protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
+      EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST);
+
+
+  protected Operation(SessionHandle sessionHandle, OperationType opType) {
+    this(sessionHandle, null, opType);
+  }
+
+  protected Operation(SessionHandle sessionHandle,
+      Map<String, String> confOverlay, OperationType opType) {
+    this.sessionHandle = sessionHandle;
+    this.opHandle = new OperationHandle(opType, sessionHandle.getProtocolVersion());
+    beginTime = System.currentTimeMillis();
+    lastAccessTime = beginTime;
+    HiveConf conf = LivyThriftServer$.MODULE$.getInstance().get().getHiveConf();
+    operationTimeout = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
+    scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+  }
+
+  public Future<?> getBackgroundHandle() {
+    return backgroundHandle;
+  }
+
+  protected void setBackgroundHandle(Future<?> backgroundHandle) {
+    this.backgroundHandle = backgroundHandle;
+  }
+
+  public boolean shouldRunAsync() {
+    return false; // Most operations cannot run asynchronously.
+  }
+
+  public SessionHandle getSessionHandle() {
+    return sessionHandle;
+  }
+
+  public OperationHandle getHandle() {
+    return opHandle;
+  }
+
+  public TProtocolVersion getProtocolVersion() {
+    return opHandle.getProtocolVersion();
+  }
+
+  public OperationType getType() {
+    return opHandle.getOperationType();
+  }
+
+  public OperationStatus getStatus() {
+    String taskStatus = null;
+    try {
+      taskStatus = getTaskStatus();
+    } catch (HiveSQLException sqlException) {
+      LOG.error("Error getting task status for " + opHandle.toString(), sqlException);
+    }
+    return new OperationStatus(state, taskStatus, operationStart, operationComplete, hasResultSet, operationException);
+  }
+
+  public boolean hasResultSet() {
+    return hasResultSet;
+  }
+
+  protected void setHasResultSet(boolean hasResultSet) {
+    this.hasResultSet = hasResultSet;
+    opHandle.setHasResultSet(hasResultSet);
+  }
+
+  protected final OperationState setState(OperationState newState) throws HiveSQLException {
+    state.validateTransition(newState);
+    OperationState prevState = state;
+    this.state = newState;
+    onNewState(state, prevState);
+    this.lastAccessTime = System.currentTimeMillis();
+    return this.state;
+  }
+
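+  /**
+   * Checks whether this operation has been idle for longer than the configured timeout:
+   * a timeout of 0 disables the check, a positive timeout only applies to operations that
+   * are already in a terminal state, and a negative timeout applies regardless of state.
+   */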
+  public boolean isTimedOut(long current) {
+    if (operationTimeout == 0) {
+      return false;
+    }
+    if (operationTimeout > 0) {
+      // check only when it's in terminal state
+      return state.isTerminal() && lastAccessTime + operationTimeout <= current;
+    }
+    return lastAccessTime + -operationTimeout <= current;
+  }
+
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  public long getOperationTimeout() {
+    return operationTimeout;
+  }
+
+  public void setOperationTimeout(long operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
+  protected void setOperationException(HiveSQLException operationException) {
+    this.operationException = operationException;
+  }
+
+  protected final void assertState(List<OperationState> states) throws HiveSQLException {
+    if (!states.contains(state)) {
+      throw new HiveSQLException("Expected states: " + states.toString() + ", but found "
+          + this.state);
+    }
+    this.lastAccessTime = System.currentTimeMillis();
+  }
+
+  public boolean isDone() {
+    return state.isTerminal();
+  }
+
+  /**
+   * Invoked before runInternal().
+   * Set up some preconditions, or configurations.
+   */
+  protected void beforeRun() {
+    // For Livy operations, this currently does nothing
+  }
+
+  /**
+   * Invoked after runInternal(), even if an exception is thrown in runInternal().
+   * Clean up resources that were set up in beforeRun().
+   */
+  protected void afterRun() {
+    // For Livy operations, this currently does nothing
+  }
+
+  /**
+   * Implemented by subclasses of Operation to execute their specific behavior.
+   * @throws HiveSQLException
+   */
+  protected abstract void runInternal() throws HiveSQLException;
+
+  public void run() throws HiveSQLException {
+    beforeRun();
+    try {
+      runInternal();
+    } finally {
+      afterRun();
+    }
+  }
+
+  public abstract void cancel(OperationState stateAfterCancel) throws HiveSQLException;
+
+  public abstract void close() throws HiveSQLException;
+
+  public abstract TableSchema getResultSetSchema() throws HiveSQLException;
+
+  public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
+
+  public String getTaskStatus() throws HiveSQLException {
+    return null;
+  }
+
+  /**
+   * Verify if the given fetch orientation is part of the default orientation types.
+   * @param orientation
+   * @throws HiveSQLException
+   */
+  protected void validateDefaultFetchOrientation(FetchOrientation orientation)
+      throws HiveSQLException {
+    validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET);
+  }
+
+  /**
+   * Verify if the given fetch orientation is part of the supported orientation types.
+   * @param orientation
+   * @param supportedOrientations
+   * @throws HiveSQLException
+   */
+  protected void validateFetchOrientation(FetchOrientation orientation,
+      EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
+    if (!supportedOrientations.contains(orientation)) {
+      throw new HiveSQLException("The fetch type " + orientation.toString() +
+          " is not supported for this resultset", "HY106");
+    }
+  }
+
+  protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) {
+    HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(),
+        response.getSQLState(), response.getResponseCode());
+    if (response.getException() != null) {
+      ex.initCause(response.getException());
+    }
+    return ex;
+  }
+
+  public long getBeginTime() {
+    return beginTime;
+  }
+
+  protected OperationState getState() {
+    return state;
+  }
+
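+  /**
+   * Hook invoked on every state transition; it records the operation start and completion
+   * timestamps that are reported back through getStatus().
+   */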
+  protected void onNewState(OperationState state, OperationState prevState) {
+    switch(state) {
+      case RUNNING:
+        markOperationStartTime();
+        break;
+      case ERROR:
+      case FINISHED:
+      case CANCELED:
+        markOperationCompletedTime();
+        break;
+    }
+  }
+
+  public long getOperationComplete() {
+    return operationComplete;
+  }
+
+  public long getOperationStart() {
+    return operationStart;
+  }
+
+  protected void markOperationStartTime() {
+    operationStart = System.currentTimeMillis();
+  }
+
+  protected void markOperationCompletedTime() {
+    operationComplete = System.currentTimeMillis();
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
new file mode 100644
index 0000000..7873e33
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+
+public class ThriftBinaryCLIService extends ThriftCLIService {
+  private final Runnable oomHook;
+  protected TServer server;
+
+  public ThriftBinaryCLIService(ICLIService cliService, Runnable oomHook) {
+    super(cliService, ThriftBinaryCLIService.class.getSimpleName());
+    this.oomHook = oomHook;
+  }
+
+  @Override
+  protected void initServer() {
+    try {
+      // Server thread pool
+      String threadPoolName = "HiveServer2-Handler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook);
+
+      // Thrift configs
+      hiveAuthFactory = new HiveAuthFactory(hiveConf);
+      TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
+      TServerSocket serverSocket = null;
+      List<String> sslVersionBlacklist = new ArrayList<String>();
+      for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
+        sslVersionBlacklist.add(sslVersion);
+      }
+      if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
+        serverSocket = HiveAuthUtils.getServerSocket(hiveHost, portNum);
+      } else {
+        String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+        if (keyStorePath.isEmpty()) {
+          throw new IllegalArgumentException(
+              ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + " is not configured for SSL connection");
+        }
+        String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
+            HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+        serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath, keyStorePassword,
+            sslVersionBlacklist);
+      }
+
+      // Server args
+      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
+      int requestTimeout = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT,
+          TimeUnit.SECONDS);
+      int beBackoffSlotLength = (int) hiveConf
+          .getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
+      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket).processorFactory(processorFactory)
+          .transportFactory(transportFactory).protocolFactory(new TBinaryProtocol.Factory())
+          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
+          .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS).beBackoffSlotLength(beBackoffSlotLength)
+          .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS).executorService(executorService);
+
+      // TCP Server
+      server = new TThreadPoolServer(sargs);
+      server.setServerEventHandler(new TServerEventHandler() {
+        @Override
+        public ServerContext createContext(TProtocol input, TProtocol output) {
+          return new ThriftCLIServerContext();
+        }
+
+        @Override
+        public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+          ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
+          SessionHandle sessionHandle = context.getSessionHandle();
+          if (sessionHandle != null) {
+            LOG.info("Session disconnected without closing properly. ");
+            try {
+              boolean close = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT);
+              LOG.info("Closing the session: " + sessionHandle);
+              if (close) {
+                cliService.closeSession(sessionHandle);
+              }
+            } catch (HiveSQLException e) {
+              LOG.warn("Failed to close session: " + e, e);
+            }
+          }
+        }
+
+        @Override
+        public void preServe() {
+        }
+
+        @Override
+        public void processContext(ServerContext serverContext, TTransport input, TTransport output) {
+          currentServerContext.set(serverContext);
+        }
+      });
+      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with "
+          + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+      LOG.info(msg);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to init thrift server", e);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      server.serve();
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        // This is likely a shutdown
+        LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down.");
+      } else {
+        LOG.error("Exception caught by " + this.getClass().getSimpleName() +
+            ". Exiting.", t);
+      }
+      System.exit(-1);
+    }
+  }
+
+  @Override
+  protected void stopServer() {
+    server.stop();
+    server = null;
+    LOG.info("Thrift server has stopped");
+  }
+
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
new file mode 100644
index 0000000..e16c6de
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.hive.service.rpc.thrift.TSetClientInfoReq;
+import org.apache.hive.service.rpc.thrift.TSetClientInfoResp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.login.LoginException;
+import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.ServiceUtils;
+import org.apache.hive.service.auth.HiveAuthConstants;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.TSetIpAddressProcessor;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.JobProgressUpdate;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.ProgressMonitorStatusMapper;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
+import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
+import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.thriftserver.LivyCLIService;
+import org.apache.livy.thriftserver.SessionInfo;
+
+
+/**
+ * ThriftCLIService.
+ *
+ */
+public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ThriftCLIService.class.getName());
+
+  protected ICLIService cliService;
+  private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+  protected static HiveAuthFactory hiveAuthFactory;
+
+  protected int portNum;
+  protected InetAddress serverIPAddress;
+  protected String hiveHost;
+  private boolean isStarted = false;
+  protected boolean isEmbedded = false;
+
+  protected HiveConf hiveConf;
+
+  protected int minWorkerThreads;
+  protected int maxWorkerThreads;
+  protected long workerKeepAliveTime;
+  private Thread serverThread;
+
+  protected ThreadLocal<ServerContext> currentServerContext;
+
+  static class ThriftCLIServerContext implements ServerContext {
+    private SessionHandle sessionHandle = null;
+
+    public void setSessionHandle(SessionHandle sessionHandle) {
+      this.sessionHandle = sessionHandle;
+    }
+
+    public SessionHandle getSessionHandle() {
+      return sessionHandle;
+    }
+  }
+
+  public ThriftCLIService(ICLIService service, String serviceName) {
+    super(serviceName);
+    this.cliService = service;
+    currentServerContext = new ThreadLocal<ServerContext>();
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+
+    hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+    if (hiveHost == null) {
+      hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+    }
+    try {
+      serverIPAddress = ServerUtils.getHostAddress(hiveHost);
+    } catch (UnknownHostException e) {
+      throw new ServiceException(e);
+    }
+
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
+    // HTTP mode
+    if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+      workerKeepAliveTime =
+          hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+              TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+      if (portString != null) {
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+      }
+    }
+    // Binary mode
+    else {
+      workerKeepAliveTime =
+          hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+      if (portString != null) {
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+      }
+    }
+    minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+    maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+    super.init(hiveConf);
+  }
+
+  protected abstract void initServer();
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    if (!isStarted && !isEmbedded) {
+      initServer();
+      serverThread = new Thread(this);
+      serverThread.setName("Thrift Server");
+      serverThread.start();
+      isStarted = true;
+    }
+  }
+
+  protected abstract void stopServer();
+
+  @Override
+  public synchronized void stop() {
+    if (isStarted && !isEmbedded) {
+      if (serverThread != null) {
+        serverThread.interrupt();
+        serverThread = null;
+      }
+      stopServer();
+      isStarted = false;
+    }
+    super.stop();
+  }
+
+  public int getPortNumber() {
+    return portNum;
+  }
+
+  public InetAddress getServerIPAddress() {
+    return serverIPAddress;
+  }
+
+  @Override
+  public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
+      throws TException {
+    TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
+
+    if (hiveAuthFactory == null || !hiveAuthFactory.isSASLKerberosUser()) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        String token = cliService.getDelegationToken(
+            new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getOwner(), req.getRenewer());
+        resp.setDelegationToken(token);
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error obtaining delegation token", e);
+        TStatus tokenErrorStatus = HiveSQLException.toTStatus(e);
+        tokenErrorStatus.setSqlState("42000");
+        resp.setStatus(tokenErrorStatus);
+      }
+    }
+    return resp;
+  }
+
+  @Override
+  public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
+      throws TException {
+    TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
+
+    if (hiveAuthFactory == null || !hiveAuthFactory.isSASLKerberosUser()) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getDelegationToken());
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error canceling delegation token", e);
+        resp.setStatus(HiveSQLException.toTStatus(e));
+      }
+    }
+    return resp;
+  }
+
+  @Override
+  public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
+      throws TException {
+    TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
+    if (hiveAuthFactory == null || !hiveAuthFactory.isSASLKerberosUser()) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getDelegationToken());
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error obtaining renewing token", e);
+        resp.setStatus(HiveSQLException.toTStatus(e));
+      }
+    }
+    return resp;
+  }
+
+  private TStatus unsecureTokenErrorStatus() {
+    TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
+    errorStatus.setErrorMessage("Delegation token only supported over remote " +
+        "client with kerberos authentication");
+    return errorStatus;
+  }
+
+  @Override
+  public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+    LOG.info("Client protocol version: " + req.getClient_protocol());
+    TOpenSessionResp resp = new TOpenSessionResp();
+    try {
+      SessionHandle sessionHandle = getSessionHandle(req, resp);
+      resp.setSessionHandle(sessionHandle.toTSessionHandle());
+      Map<String, String> configurationMap = new HashMap<String, String>();
+      // Set the updated fetch size from the server into the configuration map for the client
+      String defaultFetchSize = Integer.toString(
+          hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE));
+      configurationMap.put(
+        HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
+          defaultFetchSize);
+      resp.setConfiguration(configurationMap);
+      resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(sessionHandle);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error opening session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TSetClientInfoResp SetClientInfo(TSetClientInfoReq req) throws TException {
+    // TODO: We don't do anything for now, just log this for debugging.
+    //       We may be able to make use of this later, e.g. for workload management.
+    TSetClientInfoResp resp = null;
+    if (req.isSetConfiguration()) {
+      StringBuilder sb = null;
+      SessionHandle sh = null;
+      for (Map.Entry<String, String> e : req.getConfiguration().entrySet()) {
+        if (sb == null) {
+          sh = new SessionHandle(req.getSessionHandle());
+          sb = new StringBuilder("Client information for ").append(sh).append(": ");
+        } else {
+          sb.append(", ");
+        }
+        sb.append(e.getKey()).append(" = ").append(e.getValue());
+        if ("ApplicationName".equals(e.getKey())) {
+          try {
+            cliService.setApplicationName(sh, e.getValue());
+          } catch (Exception ex) {
+            LOG.warn("Error setting application name", ex);
+            resp = new TSetClientInfoResp(HiveSQLException.toTStatus(ex));
+          }
+        }
+      }
+      if (sb != null) {
+        LOG.info("{}", sb);
+      }
+    }
+    return resp == null ? new TSetClientInfoResp(OK_STATUS) : resp;
+  }
+
+  private String getIpAddress() {
+    String clientIpAddress;
+    // Http transport mode.
+    // We set the thread local ip address, in ThriftHttpServlet.
+    if (hiveConf.getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      clientIpAddress = SessionInfo.getIpAddress();
+    }
+    else {
+      if (hiveAuthFactory != null && hiveAuthFactory.isSASLWithKerberizedHadoop()) {
+        clientIpAddress = hiveAuthFactory.getIpAddress();
+      }
+      // NOSASL
+      else {
+        clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
+      }
+    }
+    LOG.debug("Client's IP Address: " + clientIpAddress);
+    return clientIpAddress;
+  }
+
+  /**
+   * Returns the effective username.
+   * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
+   * 2. If hive.server2.allow.user.substitution = true: the username of the end user
+   * that the connecting user is trying to proxy for.
+   * This includes a check whether the connecting user is allowed to proxy for the end user.
+   * @param req
+   * @return
+   * @throws HiveSQLException
+   */
+  private String getUserName(TOpenSessionReq req) throws HiveSQLException, IOException {
+    String userName = null;
+
+    if (hiveAuthFactory != null && hiveAuthFactory.isSASLWithKerberizedHadoop()) {
+      userName = hiveAuthFactory.getRemoteUser();
+    }
+    // NOSASL
+    if (userName == null) {
+      userName = TSetIpAddressProcessor.getUserName();
+    }
+    // Http transport mode.
+    // We set the thread local username, in ThriftHttpServlet.
+    if (hiveConf.getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      userName = SessionInfo.getUserName();
+    }
+    if (userName == null) {
+      userName = req.getUsername();
+    }
+
+    userName = getShortName(userName);
+    String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+    LOG.debug("Client's username: " + effectiveClientUser);
+    return effectiveClientUser;
+  }
+
+  private String getShortName(String userName) throws IOException {
+    String ret = null;
+
+    if (userName != null) {
+      if (hiveAuthFactory != null && hiveAuthFactory.isSASLKerberosUser()) {
+        // KerberosName.getShortName can only be used for Kerberos users, not for users
+        // logged in via other authentication mechanisms such as LDAP.
+        KerberosNameShim fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(userName);
+        ret = fullKerberosName.getShortName();
+      } else {
+        int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
+        ret = (indexOfDomainMatch <= 0) ? userName : userName.substring(0, indexOfDomainMatch);
+      }
+    }
+
+    return ret;
+  }
+
+  /**
+   * Create a session handle
+   * @param req
+   * @param res
+   * @return
+   * @throws HiveSQLException
+   * @throws LoginException
+   * @throws IOException
+   */
+  SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
+      throws HiveSQLException, LoginException, IOException {
+    String userName = getUserName(req);
+    String ipAddress = getIpAddress();
+    TProtocolVersion protocol = getMinVersion(LivyCLIService.SERVER_VERSION(),
+        req.getClient_protocol());
+    SessionHandle sessionHandle;
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+        (userName != null)) {
+      sessionHandle = ((LivyCLIService) cliService).openSessionWithImpersonation(protocol,
+          userName, req.getPassword(), ipAddress, req.getConfiguration(), null);
+    } else {
+      sessionHandle = ((LivyCLIService) cliService).openSession(protocol, userName,
+          req.getPassword(), ipAddress, req.getConfiguration());
+    }
+    res.setServerProtocolVersion(protocol);
+    return sessionHandle;
+  }
+
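+  // Job progress is not tracked for executed statements here, so this always reports 0%.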
+  private double getProgressedPercentage(OperationHandle opHandle) throws HiveSQLException {
+    checkArgument(OperationType.EXECUTE_STATEMENT.equals(opHandle.getOperationType()));
+    return 0.0;
+  }
+
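+  /**
+   * Negotiates the protocol version to use: the lowest of the given versions (typically the
+   * server's maximum supported version and the version requested by the client).
+   */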
+  private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
+    TProtocolVersion[] values = TProtocolVersion.values();
+    int current = values[values.length - 1].getValue();
+    for (TProtocolVersion version : versions) {
+      if (current > version.getValue()) {
+        current = version.getValue();
+      }
+    }
+    for (TProtocolVersion version : values) {
+      if (version.getValue() == current) {
+        return version;
+      }
+    }
+    throw new IllegalArgumentException("never");
+  }
+
+  @Override
+  public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
+    TCloseSessionResp resp = new TCloseSessionResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      cliService.closeSession(sessionHandle);
+      resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(null);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+    TGetInfoResp resp = new TGetInfoResp();
+    try {
+      GetInfoValue getInfoValue =
+          cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+              GetInfoType.getGetInfoType(req.getInfoType()));
+      resp.setInfoValue(getInfoValue.toTGetInfoValue());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
+    TExecuteStatementResp resp = new TExecuteStatementResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      String statement = req.getStatement();
+      Map<String, String> confOverlay = req.getConfOverlay();
+      Boolean runAsync = req.isRunAsync();
+      long queryTimeout = req.getQueryTimeout();
+      OperationHandle operationHandle =
+          runAsync ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay,
+              queryTimeout) : cliService.executeStatement(sessionHandle, statement, confOverlay,
+              queryTimeout);
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Note: it's rather important that this (and other methods) catch Exception, not Throwable;
+      // in combination with the HiveSessionProxy.invoke code, perhaps unintentionally, it used
+      // to also catch all errors; now only OOMs are allowed to propagate.
+      LOG.warn("Error executing statement: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+    TGetTypeInfoResp resp = new TGetTypeInfoResp();
+    try {
+      OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting type info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+    TGetCatalogsResp resp = new TGetCatalogsResp();
+    try {
+      OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting catalogs: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+    TGetSchemasResp resp = new TGetSchemasResp();
+    try {
+      OperationHandle opHandle = cliService.getSchemas(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting schemas: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+    TGetTablesResp resp = new TGetTablesResp();
+    try {
+      OperationHandle opHandle = cliService
+          .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+              req.getSchemaName(), req.getTableName(), req.getTableTypes());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting tables: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
+    TGetTableTypesResp resp = new TGetTableTypesResp();
+    try {
+      OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting table types: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+    TGetColumnsResp resp = new TGetColumnsResp();
+    try {
+      OperationHandle opHandle = cliService.getColumns(
+          new SessionHandle(req.getSessionHandle()),
+          req.getCatalogName(),
+          req.getSchemaName(),
+          req.getTableName(),
+          req.getColumnName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting columns: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
+    TGetFunctionsResp resp = new TGetFunctionsResp();
+    try {
+      OperationHandle opHandle = cliService.getFunctions(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+          req.getSchemaName(), req.getFunctionName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting functions: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
+    TGetOperationStatusResp resp = new TGetOperationStatusResp();
+    OperationHandle operationHandle = new OperationHandle(req.getOperationHandle());
+    try {
+      OperationStatus operationStatus =
+          cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate());
+
+      resp.setOperationState(operationStatus.getState().toTOperationState());
+      resp.setErrorMessage(operationStatus.getState().getErrorMessage());
+      HiveSQLException opException = operationStatus.getOperationException();
+      resp.setTaskStatus(operationStatus.getTaskStatus());
+      resp.setOperationStarted(operationStatus.getOperationStarted());
+      resp.setOperationCompleted(operationStatus.getOperationCompleted());
+      resp.setHasResultSet(operationStatus.getHasResultSet());
+      JobProgressUpdate progressUpdate = operationStatus.jobProgressUpdate();
+      ProgressMonitorStatusMapper mapper = ProgressMonitorStatusMapper.DEFAULT;
+
+      TJobExecutionStatus executionStatus =
+          mapper.forStatus(progressUpdate.status);
+      resp.setProgressUpdateResponse(new TProgressUpdateResp(
+          progressUpdate.headers(),
+          progressUpdate.rows(),
+          progressUpdate.progressedPercentage,
+          executionStatus,
+          progressUpdate.footerSummary,
+          progressUpdate.startTimeMillis
+      ));
+      if (opException != null) {
+        resp.setSqlState(opException.getSQLState());
+        resp.setErrorCode(opException.getErrorCode());
+        if (opException.getErrorCode() == 29999) {
+          resp.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(opException));
+        } else {
+          resp.setErrorMessage(opException.getMessage());
+        }
+      } else if (executionStatus == TJobExecutionStatus.NOT_AVAILABLE
+          && OperationType.EXECUTE_STATEMENT.equals(operationHandle.getOperationType())) {
+        resp.getProgressUpdateResponse().setProgressedPercentage(
+            getProgressedPercentage(operationHandle));
+      }
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting operation status: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
+    TCancelOperationResp resp = new TCancelOperationResp();
+    try {
+      cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error cancelling operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
+    TCloseOperationResp resp = new TCloseOperationResp();
+    try {
+      cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error closing operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
+      throws TException {
+    TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+    try {
+      TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+      resp.setSchema(schema.toTTableSchema());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting result set metadata: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
+    TFetchResultsResp resp = new TFetchResultsResp();
+    try {
+      // Set fetch size
+      int maxFetchSize =
+        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+      if (req.getMaxRows() > maxFetchSize) {
+        req.setMaxRows(maxFetchSize);
+      }
+      RowSet rowSet = cliService.fetchResults(
+          new OperationHandle(req.getOperationHandle()),
+          FetchOrientation.getFetchOrientation(req.getOrientation()),
+          req.getMaxRows(),
+          FetchType.getFetchType(req.getFetchType()));
+      resp.setResults(rowSet.toTRowSet());
+      resp.setHasMoreRows(false);
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error fetching results: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq req)
+      throws TException {
+    TGetPrimaryKeysResp resp = new TGetPrimaryKeysResp();
+    try {
+      OperationHandle opHandle = cliService.getPrimaryKeys(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+          req.getSchemaName(), req.getTableName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting primary keys: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req)
+      throws TException {
+    TGetCrossReferenceResp resp = new TGetCrossReferenceResp();
+    try {
+      OperationHandle opHandle = cliService.getCrossReference(
+          new SessionHandle(req.getSessionHandle()), req.getParentCatalogName(),
+          req.getParentSchemaName(), req.getParentTableName(),
+          req.getForeignCatalogName(), req.getForeignSchemaName(), req.getForeignTableName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting cross reference: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
+    try {
+      return new TGetQueryIdResp(cliService.getQueryId(req.getOperationHandle()));
+    } catch (HiveSQLException e) {
+      throw new TException(e);
+    }
+  }
+
+  @Override
+  public abstract void run();
+
+  /**
+   * If the proxy user name is provided then check privileges to substitute the user.
+   * @param realUser
+   * @param sessionConf
+   * @param ipAddress
+   * @return
+   * @throws HiveSQLException
+   */
+  private String getProxyUser(String realUser, Map<String, String> sessionConf,
+      String ipAddress) throws HiveSQLException {
+    String proxyUser = null;
+    // Http transport mode.
+    // We set the thread local proxy username, in ThriftHttpServlet.
+    if (hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      proxyUser = SessionInfo.getProxyUserName();
+      LOG.debug("Proxy user from query string: " + proxyUser);
+    }
+
+    if (proxyUser == null && sessionConf != null && sessionConf.containsKey(HiveAuthConstants.HS2_PROXY_USER)) {
+      String proxyUserFromThriftBody = sessionConf.get(HiveAuthConstants.HS2_PROXY_USER);
+      LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
+      proxyUser = proxyUserFromThriftBody;
+    }
+
+    if (proxyUser == null) {
+      return realUser;
+    }
+
+    // check whether substitution is allowed
+    if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
+      throw new HiveSQLException("Proxy user substitution is not allowed");
+    }
+
+    // If there's no authentication, then directly substitute the user
+    if (HiveAuthConstants.AuthTypes.NONE.toString().
+        equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
+      return proxyUser;
+    }
+
+    // Verify proxy user privilege of the realUser for the proxyUser
+    HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hiveConf);
+    LOG.debug("Verified proxy user: " + proxyUser);
+    return proxyUser;
+  }
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
new file mode 100644
index 0000000..4e48cc5
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.HttpMethod;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.livy.thriftserver.LivyCLIService;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+
+
+public class ThriftHttpCLIService extends ThriftCLIService {
+  private static final String APPLICATION_THRIFT = "application/x-thrift";
+  protected org.eclipse.jetty.server.Server server;
+
+  private final Runnable oomHook;
+
+  public ThriftHttpCLIService(ICLIService cliService, Runnable oomHook) {
+    super(cliService, ThriftHttpCLIService.class.getSimpleName());
+    this.oomHook = oomHook;
+  }
+
+  /**
+   * Configure Jetty to serve http requests. Example of a client connection URL:
+   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target
+   * URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/
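+   * A JDBC client would typically reach this endpoint with a URL such as
+   * jdbc:hive2://host:port/;transportMode=http;httpPath=cliservice (illustrative example;
+   * the actual path is taken from HIVE_SERVER2_THRIFT_HTTP_PATH).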
+   */
+  @Override
+  protected void initServer() {
+    try {
+      // Server thread pool
+      // Start with minWorkerThreads, expand till maxWorkerThreads and reject
+      // subsequent requests
+      String threadPoolName = "HiveServer2-HttpHandler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads,
+          maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS,
+          new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName),
+          oomHook);
+      ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
+
+      // HTTP Server
+      server = new Server(threadPool);
+
+      ServerConnector connector;
+
+      final HttpConfiguration conf = new HttpConfiguration();
+      // Configure header size
+      int requestHeaderSize =
+          hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_REQUEST_HEADER_SIZE);
+      int responseHeaderSize =
+          hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_RESPONSE_HEADER_SIZE);
+      conf.setRequestHeaderSize(requestHeaderSize);
+      conf.setResponseHeaderSize(responseHeaderSize);
+      final HttpConnectionFactory http = new HttpConnectionFactory(conf);
+
+      boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
+      String schemeName = useSsl ? "https" : "http";
+
+      // Change connector if SSL is used
+      if (useSsl) {
+        String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+        String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
+            HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+        if (keyStorePath.isEmpty()) {
+          throw new IllegalArgumentException(
+              ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+              + " is not configured for SSL connection");
+        }
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
+        LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols));
+        sslContextFactory.addExcludeProtocols(excludedProtocols);
+        LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = "
+            + Arrays.toString(sslContextFactory.getExcludeProtocols()));
+        sslContextFactory.setKeyStorePath(keyStorePath);
+        sslContextFactory.setKeyStorePassword(keyStorePassword);
+        connector = new ServerConnector(server, sslContextFactory, http);
+      } else {
+        connector = new ServerConnector(server, http);
+      }
+
+      connector.setPort(portNum);
+      // Address reuse is desirable on Linux but not on Windows
+      connector.setReuseAddress(true);
+      int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+          TimeUnit.MILLISECONDS);
+      connector.setIdleTimeout(maxIdleTime);
+
+      server.addConnector(connector);
+
+      // Thrift configs
+      hiveAuthFactory = new HiveAuthFactory(hiveConf);
+      TProcessor processor = new TCLIService.Processor<Iface>(this);
+      TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+      // Set during the init phase of HiveServer2 if auth mode is kerberos
+      // UGI for the hive/_HOST (kerberos) principal
+      UserGroupInformation serviceUGI = ((LivyCLIService) cliService).getServiceUGI();
+      // UGI for the http/_HOST (SPNego) principal
+      UserGroupInformation httpUGI = ((LivyCLIService) cliService).getHttpUGI();
+      String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType, serviceUGI, httpUGI,
+          hiveAuthFactory);
+
+      // Context handler
+      final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+      context.setContextPath("/");
+      if (hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)) {
+        // context.addFilter(Utils.getXSRFFilterHolder(null, null), "/" ,
+        // FilterMapping.REQUEST);
+        // Filtering does not work here currently, doing filter in ThriftHttpServlet
+        LOG.debug("XSRF filter enabled");
+      } else {
+        LOG.warn("XSRF filter disabled");
+      }
+
+      final String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+
+      if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_SERVER2_THRIFT_HTTP_COMPRESSION_ENABLED)) {
+        final GzipHandler gzipHandler = new GzipHandler();
+        gzipHandler.setHandler(context);
+        gzipHandler.addIncludedMethods(HttpMethod.POST);
+        gzipHandler.addIncludedMimeTypes(APPLICATION_THRIFT);
+        server.setHandler(gzipHandler);
+      } else {
+        server.setHandler(context);
+      }
+      context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
+
+      // TODO: check defaults: maxTimeout, keepalive, maxBodySize,
+      // bodyReceiveDuration, etc.
+      // Finally, start the server
+      server.start();
+      String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+          + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+          + maxWorkerThreads + " worker threads";
+      LOG.info(msg);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to init HttpServer", e);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      server.join();
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        // This is likely a shutdown
+        LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down thrift server.");
+      } else {
+        LOG.error("Exception caught by " + ThriftHttpCLIService.class.getSimpleName() +
+            ". Exiting.", t);
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on.
+   * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*"
+   * @param httpPath the raw value of HIVE_SERVER2_THRIFT_HTTP_PATH
+   * @return the normalized servlet mapping path, always ending in "/*"
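+   * For example (illustrative): "cliservice" is normalized to "/cliservice/*".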
+   */
+  private String getHttpPath(String httpPath) {
+    if (httpPath == null || httpPath.isEmpty()) {
+      httpPath = "/*";
+    } else {
+      if (!httpPath.startsWith("/")) {
+        httpPath = "/" + httpPath;
+      }
+      if (httpPath.endsWith("/")) {
+        httpPath = httpPath + "*";
+      }
+      if (!httpPath.endsWith("/*")) {
+        httpPath = httpPath + "/*";
+      }
+    }
+    return httpPath;
+  }
+
+  @Override
+  protected void stopServer() {
+    if ((server != null) && server.isStarted()) {
+      try {
+        server.stop();
+        server = null;
+        LOG.info("Thrift HTTP server has been stopped");
+      } catch (Exception e) {
+        LOG.error("Error stopping HTTP server: ", e);
+      }
+    }
+  }
+
+}
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
new file mode 100644
index 0000000..33c942f
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.PrivilegedExceptionAction;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.NewCookie;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.CookieSigner;
+import org.apache.hive.service.auth.AuthenticationProviderFactory;
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.HiveAuthConstants;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.HttpAuthUtils;
+import org.apache.hive.service.auth.HttpAuthenticationException;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.thriftserver.SessionInfo;
+
+/**
+ * ThriftHttpServlet: serves Thrift RPC calls over HTTP. Each request is authenticated via
+ * cookies, Kerberos/SPNego, a Hive delegation token, or a password based mechanism before
+ * being dispatched to the wrapped Thrift processor.
+ */
+public class ThriftHttpServlet extends TServlet {
+
+  private static final long serialVersionUID = 1L;
+  public static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
+  private final String authType;
+  private final UserGroupInformation serviceUGI;
+  private final UserGroupInformation httpUGI;
+  private HiveConf hiveConf = new HiveConf();
+
+  // Class members for cookie based authentication.
+  private CookieSigner signer;
+  public static final String AUTH_COOKIE = "hive.server2.auth";
+  private static final SecureRandom RAN = new SecureRandom();
+  private boolean isCookieAuthEnabled;
+  private String cookieDomain;
+  private String cookiePath;
+  private int cookieMaxAge;
+  private boolean isCookieSecure;
+  private boolean isHttpOnlyCookie;
+  private final HiveAuthFactory hiveAuthFactory;
+  private static final String HIVE_DELEGATION_TOKEN_HEADER =  "X-Hive-Delegation-Token";
+  private static final String X_FORWARDED_FOR = "X-Forwarded-For";
+
+  public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
+      String authType, UserGroupInformation serviceUGI, UserGroupInformation httpUGI,
+      HiveAuthFactory hiveAuthFactory) {
+    super(processor, protocolFactory);
+    this.authType = authType;
+    this.serviceUGI = serviceUGI;
+    this.httpUGI = httpUGI;
+    this.hiveAuthFactory = hiveAuthFactory;
+    this.isCookieAuthEnabled = hiveConf.getBoolVar(
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED);
+    // Initialize the cookie based authentication related variables.
+    if (isCookieAuthEnabled) {
+      // Generate the signer with secret.
+      String secret = Long.toString(RAN.nextLong());
+      LOG.debug("Using the random number as the secret for cookie generation " + secret);
+      this.signer = new CookieSigner(secret.getBytes());
+      this.cookieMaxAge = (int) hiveConf.getTimeVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE, TimeUnit.SECONDS);
+      this.cookieDomain = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_DOMAIN);
+      this.cookiePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_PATH);
+      // always send secure cookies for SSL mode
+      this.isCookieSecure = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
+      this.isHttpOnlyCookie = hiveConf.getBoolVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_IS_HTTPONLY);
+    }
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    String clientUserName = null;
+    String clientIpAddress;
+    boolean requireNewCookie = false;
+
+    try {
+      if (hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)) {
+        boolean continueProcessing = Utils.doXsrfFilter(request, response, null, null);
+        if (!continueProcessing) {
+          LOG.warn("Request did not have valid XSRF header, rejecting.");
+          return;
+        }
+      }
+      // If the cookie based authentication is already enabled, parse the
+      // request and validate the request cookies.
+      if (isCookieAuthEnabled) {
+        clientUserName = validateCookie(request);
+        requireNewCookie = (clientUserName == null);
+        if (requireNewCookie) {
+          LOG.info("Could not validate cookie sent, will try to generate a new cookie");
+        }
+      }
+      // If the cookie based authentication is not enabled or the request does
+      // not have a valid cookie, use the kerberos or password based authentication
+      // depending on the server setup.
+      if (clientUserName == null) {
+        // For a kerberos setup
+        if (isKerberosAuthMode(authType)) {
+          String delegationToken = request.getHeader(HIVE_DELEGATION_TOKEN_HEADER);
+          // Each http request must have an Authorization header
+          if ((delegationToken != null) && (!delegationToken.isEmpty())) {
+            clientUserName = doTokenAuth(request, response);
+          } else {
+            clientUserName = doKerberosAuth(request);
+          }
+        }
+        // For password based authentication
+        else {
+          clientUserName = doPasswdAuth(request, authType);
+        }
+      }
+      LOG.debug("Client username: " + clientUserName);
+
+      // Set the thread local username to be used for doAs if true
+      SessionInfo.setUserName(clientUserName);
+
+      // find proxy user if any from query param
+      String doAsQueryParam = getDoAsQueryParam(request.getQueryString());
+      if (doAsQueryParam != null) {
+        SessionInfo.setProxyUserName(doAsQueryParam);
+      }
+
+      clientIpAddress = request.getRemoteAddr();
+      LOG.debug("Client IP Address: " + clientIpAddress);
+      // Set the thread local ip address
+      SessionInfo.setIpAddress(clientIpAddress);
+
+      // get forwarded hosts address
+      String forwardedFor = request.getHeader(X_FORWARDED_FOR);
+      if (forwardedFor != null) {
+        LOG.debug("{}:{}", X_FORWARDED_FOR, forwardedFor);
+        List<String> forwardedAddresses = Arrays.asList(forwardedFor.split(","));
+        SessionInfo.setForwardedAddresses(forwardedAddresses);
+      } else {
+        SessionInfo.setForwardedAddresses(Collections.<String>emptyList());
+      }
+
+      // Generate new cookie and add it to the response
+      if (requireNewCookie &&
+          !authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.toString())) {
+        String cookieToken = HttpAuthUtils.createCookieToken(clientUserName);
+        Cookie hs2Cookie = createCookie(signer.signCookie(cookieToken));
+
+        if (isHttpOnlyCookie) {
+          response.setHeader("SET-COOKIE", getHttpOnlyCookieHeader(hs2Cookie));
+        } else {
+          response.addCookie(hs2Cookie);
+        }
+        LOG.info("Cookie added for clientUserName " + clientUserName);
+      }
+      super.doPost(request, response);
+    }
+    catch (HttpAuthenticationException e) {
+      LOG.error("Error: ", e);
+      // Send a 401 to the client
+      response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+      if (isKerberosAuthMode(authType)) {
+        response.addHeader(HttpAuthUtils.WWW_AUTHENTICATE, HttpAuthUtils.NEGOTIATE);
+      }
+      response.getWriter().println("Authentication Error: " + e.getMessage());
+    }
+    finally {
+      // Clear the thread locals
+      SessionInfo.clearUserName();
+      SessionInfo.clearIpAddress();
+      SessionInfo.clearProxyUserName();
+      SessionInfo.clearForwardedAddresses();
+    }
+  }
+
+  /**
+   * Retrieves the client name from the request cookies. If no cookie
+   * corresponds to a valid client, the function returns null.
+   * @param cookies HTTP request cookies.
+   * @return the client username if the cookies contain a currently valid HS2 generated
+   * cookie, otherwise null.
+   */
+  private String getClientNameFromCookie(Cookie[] cookies) {
+    // Current Cookie Name, Current Cookie Value
+    String currName, currValue;
+
+    // The following is the main loop, which iterates through all the cookies sent by the client.
+    // The HS2 generated cookies are of the format hive.server2.auth=<value>
+    // A cookie which is identified as a hiveserver2 generated cookie is validated
+    // by calling signer.verifyAndExtract(). If the validation passes, send the
+    // username for which the cookie is validated to the caller. If no client side
+    // cookie passes the validation, return null to the caller.
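+    // (Illustrative note: the <value> is the token built by HttpAuthUtils.createCookieToken and
+    // signed via signer.signCookie() in doPost; verifyAndExtract() below reverses that step.)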
+    for (Cookie currCookie : cookies) {
+      // Get the cookie name
+      currName = currCookie.getName();
+      if (!currName.equals(AUTH_COOKIE)) {
+        // Not a HS2 generated cookie, continue.
+        continue;
+      }
+      // If we reached here, we have match for HS2 generated cookie
+      currValue = currCookie.getValue();
+      // Validate the value.
+      currValue = signer.verifyAndExtract(currValue);
+      // Retrieve the user name, do the final validation step.
+      if (currValue != null) {
+        String userName = HttpAuthUtils.getUserNameFromCookieToken(currValue);
+
+        if (userName == null) {
+          LOG.warn("Invalid cookie token " + currValue);
+          continue;
+        }
+        //We have found a valid cookie in the client request.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Validated the cookie for user " + userName);
+        }
+        return userName;
+      }
+    }
+    // No valid HS2 generated cookies found, return null
+    return null;
+  }
+
+  /**
+   * Convert cookie array to human readable cookie string
+   * @param cookies Cookie Array
+   * @return String containing all the cookies separated by a newline character.
+   * Each cookie is of the format [key]=[value]
+   */
+  private String toCookieStr(Cookie[] cookies) {
+    String cookieStr = "";
+
+    for (Cookie c : cookies) {
+      cookieStr += c.getName() + "=" + c.getValue() + " ;\n";
+    }
+    return cookieStr;
+  }
+
+  /**
+   * Validate the request cookie. This function iterates over the request cookie headers
+   * and finds a cookie that represents a valid client/server session. If it finds one, it
+   * returns the client name associated with the session. Else, it returns null.
+   * @param request The HTTP Servlet Request sent by the client
+   * @return Client Username if the request has valid HS2 cookie, else returns null
+   * @throws UnsupportedEncodingException
+   */
+  private String validateCookie(HttpServletRequest request) throws UnsupportedEncodingException {
+    // Find all the valid cookies associated with the request.
+    Cookie[] cookies = request.getCookies();
+
+    if (cookies == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No valid cookies associated with the request " + request);
+      }
+      return null;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received cookies: " + toCookieStr(cookies));
+    }
+    return getClientNameFromCookie(cookies);
+  }
+
+  /**
+   * Generate a server side cookie given the cookie value as the input.
+   * @param str Input string token.
+   * @return The generated cookie.
+   * @throws UnsupportedEncodingException
+   */
+  private Cookie createCookie(String str) throws UnsupportedEncodingException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cookie name = " + AUTH_COOKIE + " value = " + str);
+    }
+    Cookie cookie = new Cookie(AUTH_COOKIE, str);
+
+    cookie.setMaxAge(cookieMaxAge);
+    if (cookieDomain != null) {
+      cookie.setDomain(cookieDomain);
+    }
+    if (cookiePath != null) {
+      cookie.setPath(cookiePath);
+    }
+    cookie.setSecure(isCookieSecure);
+    return cookie;
+  }
+
+  /**
+   * Generate httponly cookie from HS2 cookie
+   * @param cookie HS2 generated cookie
+   * @return The httponly cookie
+   */
+  private static String getHttpOnlyCookieHeader(Cookie cookie) {
+    NewCookie newCookie = new NewCookie(cookie.getName(), cookie.getValue(),
+      cookie.getPath(), cookie.getDomain(), cookie.getVersion(),
+      cookie.getComment(), cookie.getMaxAge(), cookie.getSecure());
+    return newCookie + "; HttpOnly";
+  }
+
+  /**
+   * Do the LDAP/PAM authentication.
+   * @param request the HTTP request carrying the Basic Authorization header
+   * @param authType the configured authentication type
+   * @return the authenticated username
+   * @throws HttpAuthenticationException if the credentials cannot be validated
+   */
+  private String doPasswdAuth(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String userName = getUsername(request, authType);
+    // No-op when authType is NOSASL
+    if (!authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.toString())) {
+      try {
+        AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType);
+        PasswdAuthenticationProvider provider =
+            AuthenticationProviderFactory.getAuthenticationProvider(authMethod, hiveConf);
+        provider.Authenticate(userName, getPassword(request, authType));
+
+      } catch (Exception e) {
+        throw new HttpAuthenticationException(e);
+      }
+    }
+    return userName;
+  }
+
+  private String doTokenAuth(HttpServletRequest request, HttpServletResponse response)
+      throws HttpAuthenticationException {
+    String tokenStr = request.getHeader(HIVE_DELEGATION_TOKEN_HEADER);
+    try {
+      return hiveAuthFactory.verifyDelegationToken(tokenStr);
+    } catch (HiveSQLException e) {
+      throw new HttpAuthenticationException(e);
+    }
+  }
+
+  /**
+   * Do the GSS-API Kerberos authentication.
+   * We already have a logged in subject in the form of serviceUGI,
+   * which GSS-API will extract information from.
+   * In case of a SPNego request we use the httpUGI to authenticate the service ticket.
+   * @param request the HTTP request carrying the Negotiate Authorization header
+   * @return the short name of the authenticated client principal
+   * @throws HttpAuthenticationException if authentication fails with both principals
+   */
+  private String doKerberosAuth(HttpServletRequest request)
+      throws HttpAuthenticationException {
+    // Try authenticating with the http/_HOST principal
+    if (httpUGI != null) {
+      try {
+        return httpUGI.doAs(new HttpKerberosServerAction(request, httpUGI));
+      } catch (Exception e) {
+        LOG.info("Failed to authenticate with http/_HOST kerberos principal, " +
+            "trying with hive/_HOST kerberos principal");
+      }
+    }
+    // Now try with hive/_HOST principal
+    try {
+      return serviceUGI.doAs(new HttpKerberosServerAction(request, serviceUGI));
+    } catch (Exception e) {
+      LOG.error("Failed to authenticate with hive/_HOST kerberos principal");
+      throw new HttpAuthenticationException(e);
+    }
+
+  }
+
+  class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
+    HttpServletRequest request;
+    UserGroupInformation serviceUGI;
+
+    HttpKerberosServerAction(HttpServletRequest request,
+        UserGroupInformation serviceUGI) {
+      this.request = request;
+      this.serviceUGI = serviceUGI;
+    }
+
+    @Override
+    public String run() throws HttpAuthenticationException {
+      // Get own Kerberos credentials for accepting connection
+      GSSManager manager = GSSManager.getInstance();
+      GSSContext gssContext = null;
+      String serverPrincipal = getPrincipalWithoutRealm(
+          serviceUGI.getUserName());
+      try {
+        // This Oid for Kerberos GSS-API mechanism.
+        Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
+        // Oid for SPNego GSS-API mechanism.
+        Oid spnegoMechOid = new Oid("1.3.6.1.5.5.2");
+        // Oid for kerberos principal name
+        Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+        // GSS name for server
+        GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+        // GSS credentials for server
+        GSSCredential serverCreds = manager.createCredential(serverName,
+            GSSCredential.DEFAULT_LIFETIME,
+            new Oid[]{kerberosMechOid, spnegoMechOid},
+            GSSCredential.ACCEPT_ONLY);
+
+        // Create a GSS context
+        gssContext = manager.createContext(serverCreds);
+        // Get service ticket from the authorization header
+        String serviceTicketBase64 = getAuthHeader(request, authType);
+        byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+        gssContext.acceptSecContext(inToken, 0, inToken.length);
+        // Authenticate or deny based on its context completion
+        if (!gssContext.isEstablished()) {
+          throw new HttpAuthenticationException("Kerberos authentication failed: " +
+              "unable to establish context with the service ticket " +
+              "provided by the client.");
+        }
+        else {
+          return getPrincipalWithoutRealmAndHost(gssContext.getSrcName().toString());
+        }
+      }
+      catch (GSSException e) {
+        throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
+      }
+      finally {
+        if (gssContext != null) {
+          try {
+            gssContext.dispose();
+          } catch (GSSException e) {
+            // No-op
+          }
+        }
+      }
+    }
+
+    private String getPrincipalWithoutRealm(String fullPrincipal)
+        throws HttpAuthenticationException {
+      KerberosNameShim fullKerberosName;
+      try {
+        fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+      } catch (IOException e) {
+        throw new HttpAuthenticationException(e);
+      }
+      String serviceName = fullKerberosName.getServiceName();
+      String hostName = fullKerberosName.getHostName();
+      String principalWithoutRealm = serviceName;
+      if (hostName != null) {
+        principalWithoutRealm = serviceName + "/" + hostName;
+      }
+      return principalWithoutRealm;
+    }
+
+    private String getPrincipalWithoutRealmAndHost(String fullPrincipal)
+        throws HttpAuthenticationException {
+      KerberosNameShim fullKerberosName;
+      try {
+        fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+        return fullKerberosName.getShortName();
+      } catch (IOException e) {
+        throw new HttpAuthenticationException(e);
+      }
+    }
+  }
+
+  private String getUsername(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String[] creds = getAuthHeaderTokens(request, authType);
+    // Username must be present
+    if (creds[0] == null || creds[0].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain a username.");
+    }
+    return creds[0];
+  }
+
+  private String getPassword(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String[] creds = getAuthHeaderTokens(request, authType);
+    // Password must be present
+    if (creds[1] == null || creds[1].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain a password.");
+    }
+    return creds[1];
+  }
+
+  private String[] getAuthHeaderTokens(HttpServletRequest request,
+      String authType) throws HttpAuthenticationException {
+    String authHeaderBase64 = getAuthHeader(request, authType);
+    String authHeaderString = StringUtils.newStringUtf8(
+        Base64.decodeBase64(authHeaderBase64.getBytes()));
+    String[] creds = authHeaderString.split(":");
+    return creds;
+  }
+
+  /**
+   * Returns the base64 encoded auth header payload.
+   * @param request the HTTP request
+   * @param authType the configured authentication type
+   * @return the base64 encoded payload of the Authorization header
+   * @throws HttpAuthenticationException if the Authorization header is missing or empty
+   */
+  private String getAuthHeader(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+    // Each http request must have an Authorization header
+    if (authHeader == null || authHeader.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client is empty.");
+    }
+
+    String authHeaderBase64String;
+    int beginIndex;
+    if (isKerberosAuthMode(authType)) {
+      beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+    }
+    else {
+      beginIndex = (HttpAuthUtils.BASIC + " ").length();
+    }
+    authHeaderBase64String = authHeader.substring(beginIndex);
+    // Authorization header must have a payload
+    if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain any data.");
+    }
+    return authHeaderBase64String;
+  }
+
+  private boolean isKerberosAuthMode(String authType) {
+    return authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.KERBEROS.toString());
+  }
+
+  private static String getDoAsQueryParam(String queryString) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("URL query string:" + queryString);
+    }
+    if (queryString == null) {
+      return null;
+    }
+    Map<String, String[]> params = javax.servlet.http.HttpUtils.parseQueryString(queryString);
+    Set<String> keySet = params.keySet();
+    for (String key: keySet) {
+      if (key.equalsIgnoreCase("doAs")) {
+        return params.get(key)[0];
+      }
+    }
+    return null;
+  }
+
+}
+
+
diff --git a/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java b/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java
new file mode 100644
index 0000000..3db0590
--- /dev/null
+++ b/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
+import org.apache.hadoop.hive.common.cli.CommonCliOptions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HiveServer2.
+ *
+ */
+public class HiveServer2 extends CompositeService {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
+  public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri";
+  protected ICLIService cliService;
+  protected ThriftCLIService thriftCLIService;
+
+  public HiveServer2() {
+    super(HiveServer2.class.getSimpleName());
+    HiveConf.setLoadHiveServer2Config(true);
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    assert cliService != null;
+    assert cliService instanceof CompositeService;
+    addService((CompositeService) cliService);
+    final HiveServer2 hiveServer2 = this;
+    Runnable oomHook = new Runnable() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    };
+    if (isHTTPTransportMode(hiveConf)) {
+      thriftCLIService = new ThriftHttpCLIService(cliService, oomHook);
+    } else {
+      thriftCLIService = new ThriftBinaryCLIService(cliService, oomHook);
+    }
+    addService(thriftCLIService);
+    super.init(hiveConf);
+    // Add a shutdown hook for catching SIGTERM & SIGINT
+    ShutdownHookManager.addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    });
+  }
+
+  public static boolean isHTTPTransportMode(Configuration hiveConf) {
+    String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
+    if (transportMode == null) {
+      transportMode = hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
+    }
+    if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isKerberosAuthMode(Configuration hiveConf) {
+    String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname);
+    if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) {
+      return true;
+    }
+    return false;
+  }
+
+  public String getServerHost() throws Exception {
+    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
+      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+    }
+    return thriftCLIService.getServerIPAddress().getHostName();
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOG.info("Shutting down HiveServer2");
+    super.stop();
+  }
+
+  private static void startHiveServer2() throws Throwable {
+    long attempts = 0, maxAttempts = 1;
+    while (true) {
+      LOG.info("Starting HiveServer2");
+      HiveConf hiveConf = new HiveConf();
+      maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
+      long retrySleepIntervalMs = hiveConf
+          .getTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS,
+              TimeUnit.MILLISECONDS);
+      HiveServer2 server = null;
+      try {
+        server = new HiveServer2();
+        server.init(hiveConf);
+        server.start();
+
+        try {
+          JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
+          pauseMonitor.start();
+        } catch (Throwable t) {
+          LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
+            "warned upon.", t);
+        }
+        break;
+      } catch (Throwable throwable) {
+        if (server != null) {
+          try {
+            server.stop();
+          } catch (Throwable t) {
+            LOG.info("Exception caught when calling stop of HiveServer2 before retrying start", t);
+          } finally {
+            server = null;
+          }
+        }
+        if (++attempts >= maxAttempts) {
+          throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
+        } else {
+          LOG.warn("Error starting HiveServer2 on attempt " + attempts
+              + ", will retry in " + retrySleepIntervalMs + "ms", throwable);
+          try {
+            Thread.sleep(retrySleepIntervalMs);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    HiveConf.setLoadHiveServer2Config(true);
+    try {
+      ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
+      ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
+
+      // NOTE: It is critical to do this here so that log4j is reinitialized
+      // before any of the other core hive classes are loaded
+      String initLog4jMessage = LogUtils.initHiveLog4j();
+      LOG.debug(initLog4jMessage);
+      HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
+
+      // Log the debug message from "oproc" after log4j has been initialized properly
+      LOG.debug(oproc.getDebugMessage().toString());
+
+      // Call the executor which will execute the appropriate command based on the parsed options
+      oprocResponse.getServerOptionsExecutor().execute();
+    } catch (LogInitializationException e) {
+      LOG.error("Error initializing log: " + e.getMessage(), e);
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * ServerOptionsProcessor.
+   * Processes the arguments given to HiveServer2 (-hiveconf property=value),
+   * sets them as System properties, and creates an appropriate response object
+   * whose executor runs the appropriate command based on the parsed options.
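+   * Illustrative example invocation (not part of the original code):
+   *   hiveserver2 --hiveconf hive.server2.transport.mode=http \
+   *     --hiveconf hive.server2.thrift.http.port=10001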
+   */
+  public static class ServerOptionsProcessor {
+    private final Options options = new Options();
+    private org.apache.commons.cli.CommandLine commandLine;
+    private final String serverName;
+    private final StringBuilder debugMessage = new StringBuilder();
+
+    @SuppressWarnings("static-access")
+    public ServerOptionsProcessor(String serverName) {
+      this.serverName = serverName;
+      // -hiveconf x=y
+      options.addOption(OptionBuilder
+          .withValueSeparator()
+          .hasArgs(2)
+          .withArgName("property=value")
+          .withLongOpt("hiveconf")
+          .withDescription("Use value for given property")
+          .create());
+      options.addOption(new Option("H", "help", false, "Print help information"));
+    }
+
+    public ServerOptionsProcessorResponse parse(String[] argv) {
+      try {
+        commandLine = new GnuParser().parse(options, argv);
+        // Process --hiveconf
+        // Get hiveconf param values and set the System property values
+        Properties confProps = commandLine.getOptionProperties("hiveconf");
+        for (String propKey : confProps.stringPropertyNames()) {
+          // save the logging message for log4j output later, after log4j initializes properly
+          debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
+          if (propKey.equalsIgnoreCase("hive.root.logger")) {
+            CommonCliOptions.splitAndSetLogger(propKey, confProps);
+          } else {
+            System.setProperty(propKey, confProps.getProperty(propKey));
+          }
+        }
+
+        // Process --help
+        if (commandLine.hasOption('H')) {
+          return new ServerOptionsProcessorResponse(new HelpOptionExecutor(serverName, options));
+        }
+      } catch (ParseException e) {
+        // Error out & exit - we were not able to parse the args successfully
+        System.err.println("Error starting HiveServer2 with given arguments: ");
+        System.err.println(e.getMessage());
+        System.exit(-1);
+      }
+      // Default executor, when no option is specified
+      return new ServerOptionsProcessorResponse(new StartOptionExecutor());
+    }
+
+    StringBuilder getDebugMessage() {
+      return debugMessage;
+    }
+  }
+
+  /**
+   * The response sent back from {@link ServerOptionsProcessor#parse(String[])}
+   */
+  static class ServerOptionsProcessorResponse {
+    private final ServerOptionsExecutor serverOptionsExecutor;
+
+    ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) {
+      this.serverOptionsExecutor = serverOptionsExecutor;
+    }
+
+    ServerOptionsExecutor getServerOptionsExecutor() {
+      return serverOptionsExecutor;
+    }
+  }
+
+  /**
+   * The executor interface for running the appropriate HiveServer2 command based on parsed options
+   */
+  interface ServerOptionsExecutor {
+    void execute();
+  }
+
+  /**
+   * HelpOptionExecutor: executes the --help option by printing out the usage
+   */
+  static class HelpOptionExecutor implements ServerOptionsExecutor {
+    private final Options options;
+    private final String serverName;
+
+    HelpOptionExecutor(String serverName, Options options) {
+      this.options = options;
+      this.serverName = serverName;
+    }
+
+    @Override
+    public void execute() {
+      new HelpFormatter().printHelp(serverName, options);
+      System.exit(0);
+    }
+  }
+
+  /**
+   * StartOptionExecutor: starts HiveServer2.
+   * This is the default executor, when no option is specified.
+   */
+  static class StartOptionExecutor implements ServerOptionsExecutor {
+    @Override
+    public void execute() {
+      try {
+        startHiveServer2();
+      } catch (Throwable t) {
+        LOG.error("Error starting HiveServer2", t);
+        System.exit(-1);
+      }
+    }
+  }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
new file mode 100644
index 0000000..5289354
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.io.IOException
+import java.util
+import java.util.concurrent.{CancellationException, ExecutionException, TimeoutException, TimeUnit}
+import javax.security.auth.login.LoginException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.parse.ParseUtils
+import org.apache.hadoop.hive.shims.Utils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.{CompositeService, ServiceException}
+import org.apache.hive.service.auth.HiveAuthFactory
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.Operation
+import org.apache.hive.service.rpc.thrift.{TOperationHandle, TProtocolVersion}
+
+import org.apache.livy.{LIVY_VERSION, Logging}
+
+class LivyCLIService(server: LivyThriftServer)
+  extends CompositeService(classOf[LivyCLIService].getName) with ICLIService with Logging {
+  import LivyCLIService._
+
+  private var sessionManager: LivyThriftSessionManager = _
+  private var defaultFetchRows: Int = _
+  private var serviceUGI: UserGroupInformation = _
+  private var httpUGI: UserGroupInformation = _
+
+  override def init(hiveConf: HiveConf): Unit = {
+    sessionManager = new LivyThriftSessionManager(server, hiveConf)
+    addService(sessionManager)
+    defaultFetchRows =
+      hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)
+    //  If the hadoop cluster is secure, do a kerberos login for the service from the keytab
+    if (UserGroupInformation.isSecurityEnabled) {
+      try {
+        serviceUGI = Utils.getUGI
+      } catch {
+        case e: IOException =>
+          throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+        case e: LoginException =>
+          throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+      }
+      // Also try creating a UGI object for the SPNego principal
+      val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL)
+      val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB)
+      if (principal.isEmpty || keyTabFile.isEmpty) {
+        info(s"SPNego httpUGI not created, SPNegoPrincipal: $principal, ketabFile: $keyTabFile")
+      } else try {
+        httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf)
+        info("SPNego httpUGI successfully created.")
+      } catch {
+        case e: IOException =>
+          warn("SPNego httpUGI creation failed: ", e)
+      }
+    }
+    super.init(hiveConf)
+  }
+
+  def getServiceUGI: UserGroupInformation = this.serviceUGI
+
+  def getHttpUGI: UserGroupInformation = this.httpUGI
+
+  def getSessionManager: LivyThriftSessionManager = sessionManager
+
+  @throws[HiveSQLException]
+  override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
+    getInfoType match {
+      case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Livy JDBC")
+      case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Livy JDBC")
+      case GetInfoType.CLI_DBMS_VER => new GetInfoValue(LIVY_VERSION)
+      // below values are copied from Hive
+      case GetInfoType.CLI_MAX_COLUMN_NAME_LEN => new GetInfoValue(128)
+      case GetInfoType.CLI_MAX_SCHEMA_NAME_LEN => new GetInfoValue(128)
+      case GetInfoType.CLI_MAX_TABLE_NAME_LEN => new GetInfoValue(128)
+      case GetInfoType.CLI_ODBC_KEYWORDS =>
+        new GetInfoValue(ParseUtils.getKeywords(LivyCLIService.ODBC_KEYWORDS))
+      case _ => throw new HiveSQLException(s"Unrecognized GetInfoType value: $getInfoType")
+    }
+  }
+
+  @throws[HiveSQLException]
+  def openSession(
+      protocol: TProtocolVersion,
+      username: String,
+      password: String,
+      ipAddress: String,
+      configuration: util.Map[String, String]): SessionHandle = {
+    val sessionHandle = sessionManager.openSession(
+      protocol, username, password, ipAddress, configuration, false, null)
+    debug(sessionHandle + ": openSession()")
+    sessionHandle
+  }
+
+  @throws[HiveSQLException]
+  def openSessionWithImpersonation(
+      protocol: TProtocolVersion,
+      username: String,
+      password: String,
+      ipAddress: String,
+      configuration: util.Map[String, String],
+      delegationToken: String): SessionHandle = {
+    val sessionHandle = sessionManager.openSession(
+      protocol, username, password, ipAddress, configuration, true, delegationToken)
+    debug(sessionHandle + ": openSession()")
+    sessionHandle
+  }
+
+  @throws[HiveSQLException]
+  override def openSession(
+      username: String,
+      password: String,
+      configuration: util.Map[String, String]): SessionHandle = {
+    val sessionHandle = sessionManager.openSession(
+      SERVER_VERSION, username, password, null, configuration, false, null)
+    debug(sessionHandle + ": openSession()")
+    sessionHandle
+  }
+
+  @throws[HiveSQLException]
+  override def openSessionWithImpersonation(
+      username: String,
+      password: String,
+      configuration: util.Map[String, String], delegationToken: String): SessionHandle = {
+    val sessionHandle = sessionManager.openSession(
+      SERVER_VERSION, username, password, null, configuration, true, delegationToken)
+    debug(sessionHandle + ": openSession()")
+    sessionHandle
+  }
+
+  @throws[HiveSQLException]
+  override def closeSession(sessionHandle: SessionHandle): Unit = {
+    sessionManager.closeSession(sessionHandle)
+    debug(sessionHandle + ": closeSession()")
+  }
+
+  @throws[HiveSQLException]
+  override def executeStatement(
+      sessionHandle: SessionHandle,
+      statement: String,
+      confOverlay: util.Map[String, String]): OperationHandle = {
+    executeStatement(sessionHandle, statement, confOverlay, 0)
+  }
+
+  /**
+   * Execute statement on the server with a timeout. This is a blocking call.
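+   * (A queryTimeout of 0, as passed by the no-timeout overload above, presumably disables the
+   * timeout, following the Hive semantics this API is ported from.)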
+   */
+  @throws[HiveSQLException]
+  override def executeStatement(
+      sessionHandle: SessionHandle,
+      statement: String,
+      confOverlay: util.Map[String, String],
+      queryTimeout: Long): OperationHandle = {
+    val opHandle: OperationHandle = sessionManager.operationManager.executeStatement(
+      sessionHandle, statement, confOverlay, runAsync = false, queryTimeout)
+    debug(sessionHandle + ": executeStatement()")
+    opHandle
+  }
+
+  @throws[HiveSQLException]
+  override def executeStatementAsync(
+      sessionHandle: SessionHandle,
+      statement: String,
+      confOverlay: util.Map[String, String]): OperationHandle = {
+    executeStatementAsync(sessionHandle, statement, confOverlay, 0)
+  }
+
+  /**
+   * Execute statement asynchronously on the server with a timeout. This is a non-blocking call
+   */
+  @throws[HiveSQLException]
+  override def executeStatementAsync(
+      sessionHandle: SessionHandle,
+      statement: String,
+      confOverlay: util.Map[String, String],
+      queryTimeout: Long): OperationHandle = {
+    val opHandle = sessionManager.operationManager.executeStatement(
+      sessionHandle, statement, confOverlay, runAsync = true, queryTimeout)
+    debug(sessionHandle + ": executeStatementAsync()")
+    opHandle
+  }
+
+  @throws[HiveSQLException]
+  override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
+    debug(sessionHandle + ": getTypeInfo()")
+    sessionManager.operationManager.getTypeInfo(sessionHandle)
+  }
+
+  @throws[HiveSQLException]
+  override def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
+    debug(sessionHandle + ": getCatalogs()")
+    sessionManager.operationManager.getCatalogs(sessionHandle)
+  }
+
+  @throws[HiveSQLException]
+  override def getSchemas(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String): OperationHandle = {
+    // TODO
+    throw new HiveSQLException("Operation GET_SCHEMAS is not yet supported")
+  }
+
+  @throws[HiveSQLException]
+  override def getTables(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: util.List[String]): OperationHandle = {
+    // TODO
+    throw new HiveSQLException("Operation GET_TABLES is not yet supported")
+  }
+
+  @throws[HiveSQLException]
+  override def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
+    debug(sessionHandle + ": getTableTypes()")
+    sessionManager.operationManager.getTableTypes(sessionHandle)
+  }
+
+  @throws[HiveSQLException]
+  override def getColumns(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      columnName: String): OperationHandle = {
+    // TODO
+    throw new HiveSQLException("Operation GET_COLUMNS is not yet supported")
+  }
+
+  @throws[HiveSQLException]
+  override def getFunctions(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      functionName: String): OperationHandle = {
+    // TODO
+    throw new HiveSQLException("Operation GET_FUNCTIONS is not yet supported")
+  }
+
+  @throws[HiveSQLException]
+  override def getPrimaryKeys(
+      sessionHandle: SessionHandle,
+      catalog: String,
+      schema: String,
+      table: String): OperationHandle = {
+    // TODO
+    throw new HiveSQLException("Operation GET_PRIMARY_KEYS is not yet supported")
+  }
+
+  @throws[HiveSQLException]
+  override def getCrossReference(
+      sessionHandle: SessionHandle,
+      primaryCatalog: String,
+      primarySchema: String,
+      primaryTable: String,
+      foreignCatalog: String,
+      foreignSchema: String,
+      foreignTable: String): OperationHandle = {
+    // TODO
+    throw new HiveSQLException("Operation GET_CROSS_REFERENCE is not yet supported")
+  }
+
+  @throws[HiveSQLException]
+  override def getOperationStatus(
+      opHandle: OperationHandle,
+      getProgressUpdate: Boolean): OperationStatus = {
+    val operation: Operation = sessionManager.operationManager.getOperation(opHandle)
+    /**
+     * If this is a background operation run asynchronously, we block for a duration determined
+     * by a step function before we return. However, if the background operation is already
+     * complete, we return immediately.
+     */
+    if (operation.shouldRunAsync) {
+      val maxTimeout: Long = HiveConf.getTimeVar(
+        getHiveConf,
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT,
+        TimeUnit.MILLISECONDS)
+      val elapsed: Long = System.currentTimeMillis - operation.getBeginTime
+      // A step function to increase the polling timeout by 500 ms every 10 sec,
+      // starting from 500 ms up to HIVE_SERVER2_LONG_POLLING_TIMEOUT
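+      // For example, after 25 seconds of elapsed time this yields min(maxTimeout, 3 * 500) ms,
+      // i.e. 1500 ms (assuming maxTimeout is at least that large).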
+      val timeout: Long = Math.min(maxTimeout, (elapsed / TimeUnit.SECONDS.toMillis(10) + 1) * 500)
+      try {
+        operation.getBackgroundHandle.get(timeout, TimeUnit.MILLISECONDS)
+      } catch {
+        case e: TimeoutException =>
+          // No Op, return to the caller since long polling timeout has expired
+          trace(opHandle + ": Long polling timed out")
+        case e: CancellationException =>
+          // The background operation thread was cancelled
+          trace(opHandle + ": The background operation was cancelled", e)
+        case e: ExecutionException =>
+          // Note: Hive ops do not use the normal Future failure path, so this will not happen
+          //       in case of actual failure; the Future will just be done.
+          // The background operation thread was aborted
+          warn(opHandle + ": The background operation was aborted", e)
+        case _: InterruptedException =>
+          // No op, this thread was interrupted.
+          // In this case, the call might return sooner than the long polling timeout.
+      }
+    }
+    val opStatus: OperationStatus = operation.getStatus
+    debug(opHandle + ": getOperationStatus()")
+    opStatus.setJobProgressUpdate(new JobProgressUpdate(ProgressMonitor.NULL))
+    opStatus
+  }
+
+  @throws[HiveSQLException]
+  override def cancelOperation(opHandle: OperationHandle): Unit = {
+    sessionManager.operationManager.cancelOperation(opHandle)
+    debug(opHandle + ": cancelOperation()")
+  }
+
+  @throws[HiveSQLException]
+  override def closeOperation(opHandle: OperationHandle): Unit = {
+    sessionManager.operationManager.closeOperation(opHandle)
+    debug(opHandle + ": closeOperation")
+  }
+
+  @throws[HiveSQLException]
+  override def getResultSetMetadata(opHandle: OperationHandle): TableSchema = {
+    debug(opHandle + ": getResultSetMetadata()")
+    sessionManager.operationManager.getOperation(opHandle).getResultSetSchema
+  }
+
+  @throws[HiveSQLException]
+  override def fetchResults(opHandle: OperationHandle): RowSet = {
+    fetchResults(
+      opHandle, Operation.DEFAULT_FETCH_ORIENTATION, defaultFetchRows, FetchType.QUERY_OUTPUT)
+  }
+
+  @throws[HiveSQLException]
+  override def fetchResults(
+      opHandle: OperationHandle,
+      orientation: FetchOrientation,
+      maxRows: Long,
+      fetchType: FetchType): RowSet = {
+    debug(opHandle + ": fetchResults()")
+    sessionManager.operationManager.fetchResults(opHandle, orientation, maxRows, fetchType)
+  }
+
+  @throws[HiveSQLException]
+  override def getDelegationToken(
+      sessionHandle: SessionHandle,
+      authFactory: HiveAuthFactory,
+      owner: String,
+      renewer: String): String = {
+    throw new HiveSQLException("Operation not yet supported.")
+  }
+
+  @throws[HiveSQLException]
+  override def setApplicationName(sh: SessionHandle, value: String): Unit = {
+    throw new HiveSQLException("Operation not yet supported.")
+  }
+
+  override def cancelDelegationToken(
+      sessionHandle: SessionHandle,
+      authFactory: HiveAuthFactory,
+      tokenStr: String): Unit = {
+    throw new HiveSQLException("Operation not yet supported.")
+  }
+
+  override def renewDelegationToken(
+      sessionHandle: SessionHandle,
+      authFactory: HiveAuthFactory,
+      tokenStr: String): Unit = {
+    throw new HiveSQLException("Operation not yet supported.")
+  }
+
+  @throws[HiveSQLException]
+  override def getQueryId(opHandle: TOperationHandle): String = {
+    throw new HiveSQLException("Operation not yet supported.")
+  }
+}
+
+
+object LivyCLIService {
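+  // Advertise the most recent Thrift protocol version known to the Hive RPC classes in use.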
+  val SERVER_VERSION: TProtocolVersion = TProtocolVersion.values().last
+
+  // scalastyle:off line.size.limit
+  // From https://docs.microsoft.com/en-us/sql/t-sql/language-elements/reserved-keywords-transact-sql#odbc-reserved-keywords
+  // scalastyle:on line.size.limit
+  private val ODBC_KEYWORDS = Set("ABSOLUTE", "ACTION", "ADA", "ADD", "ALL", "ALLOCATE", "ALTER",
+    "AND", "ANY", "ARE", "AS", "ASC", "ASSERTION", "AT", "AUTHORIZATION", "AVG", "BEGIN",
+    "BETWEEN", "BIT_LENGTH", "BIT", "BOTH", "BY", "CASCADE", "CASCADED", "CASE", "CAST", "CATALOG",
+    "CHAR_LENGTH", "CHAR", "CHARACTER_LENGTH", "CHARACTER", "CHECK", "CLOSE", "COALESCE",
+    "COLLATE", "COLLATION", "COLUMN", "COMMIT", "CONNECT", "CONNECTION", "CONSTRAINT",
+    "CONSTRAINTS", "CONTINUE", "CONVERT", "CORRESPONDING", "COUNT", "CREATE", "CROSS",
+    "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "CURRENT", "CURSOR",
+    "DATE", "DAY", "DEALLOCATE", "DEC", "DECIMAL", "DECLARE", "DEFAULT", "DEFERRABLE", "DEFERRED",
+    "DELETE", "DESC", "DESCRIBE", "DESCRIPTOR", "DIAGNOSTICS", "DISCONNECT", "DISTINCT", "DOMAIN",
+    "DOUBLE", "DROP", "ELSE", "END", "ESCAPE", "EXCEPT", "EXCEPTION", "EXEC", "EXECUTE", "EXISTS",
+    "EXTERNAL", "EXTRACT", "FALSE", "FETCH", "FIRST", "FLOAT", "FOR", "FOREIGN", "FORTRAN",
+    "FOUND", "FROM", "FULL", "GET", "GLOBAL", "GO", "GOTO", "GRANT", "GROUP", "HAVING", "HOUR",
+    "IDENTITY", "IMMEDIATE", "IN", "INCLUDE", "INDEX", "INDICATOR", "INITIALLY", "INNER", "INPUT",
+    "INSENSITIVE", "INSERT", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IS", "ISOLATION",
+    "JOIN", "KEY", "LANGUAGE", "LAST", "LEADING", "LEFT", "LEVEL", "LIKE", "LOCAL", "LOWER",
+    "MATCH", "MAX", "MIN", "MINUTE", "MODULE", "MONTH", "NAMES", "NATIONAL", "NATURAL", "NCHAR",
+    "NEXT", "NO", "NONE", "NOT", "NULL", "NULLIF", "NUMERIC", "OCTET_LENGTH", "OF", "ON", "ONLY",
+    "OPEN", "OPTION", "OR", "ORDER", "OUTER", "OUTPUT", "OVERLAPS", "PAD", "PARTIAL", "PASCAL",
+    "POSITION", "PRECISION", "PREPARE", "PRESERVE", "PRIMARY", "PRIOR", "PRIVILEGES", "PROCEDURE",
+    "PUBLIC", "READ", "REAL", "REFERENCES", "RELATIVE", "RESTRICT", "REVOKE", "RIGHT", "ROLLBACK",
+    "ROWS", "SCHEMA", "SCROLL", "SECOND", "SECTION", "SELECT", "SESSION_USER", "SESSION", "SET",
+    "SIZE", "SMALLINT", "SOME", "SPACE", "SQL", "SQLCA", "SQLCODE", "SQLERROR", "SQLSTATE",
+    "SQLWARNING", "SUBSTRING", "SUM", "SYSTEM_USER", "TABLE", "TEMPORARY", "THEN", "TIME",
+    "TIMESTAMP", "TIMEZONE_HOUR", "TIMEZONE_MINUTE", "TO", "TRAILING", "TRANSACTION", "TRANSLATE",
+    "TRANSLATION", "TRIM", "TRUE", "UNION", "UNIQUE", "UNKNOWN", "UPDATE", "UPPER", "USAGE",
+    "USER", "USING", "VALUE", "VALUES", "VARCHAR", "VARYING", "VIEW", "WHEN", "WHENEVER", "WHERE",
+    "WITH", "WORK", "WRITE", "YEAR", "ZONE").asJava
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
new file mode 100644
index 0000000..142eebf
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.security.PrivilegedExceptionAction
+import java.util
+import java.util.{Map => JMap}
+import java.util.concurrent.{ConcurrentLinkedQueue, RejectedExecutionException}
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer
+import org.apache.hadoop.hive.shims.Utils
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.SessionStates._
+import org.apache.livy.thriftserver.rpc.RpcClient
+import org.apache.livy.thriftserver.types.DataTypeUtils._
+
+class LivyExecuteStatementOperation(
+    sessionHandle: SessionHandle,
+    statement: String,
+    confOverlay: JMap[String, String],
+    runInBackground: Boolean = true,
+    sessionManager: LivyThriftSessionManager)
+  extends Operation(sessionHandle, confOverlay, OperationType.EXECUTE_STATEMENT)
+    with Logging {
+
+  /**
+   * Contains the messages which have to be sent to the client.
+   */
+  private val operationMessages = new ConcurrentLinkedQueue[String]
+
+  // The initialization needs to be lazy so that it does not block when the instance is created.
+  private lazy val rpcClient = {
+    val sessionState = sessionManager.livySessionState(sessionHandle)
+    if (sessionState == CREATION_IN_PROGRESS) {
+      operationMessages.offer(
+        "Livy session has not yet started. Please wait for it to be ready...")
+    }
+    // This call is blocking: it waits for the session to be ready.
+    new RpcClient(sessionManager.getLivySession(sessionHandle))
+  }
+  private var rowOffset = 0L
+
+  private def statementId: String = getHandle.getHandleIdentifier.toString
+
+  private def rpcClientValid: Boolean =
+    sessionManager.livySessionState(sessionHandle) == CREATION_SUCCESS && rpcClient.isValid
+
+  override def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+    validateDefaultFetchOrientation(order)
+    assertState(util.Arrays.asList(OperationState.FINISHED))
+    setHasResultSet(true)
+
+    // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
+    val maxRows = maxRowsL.toInt
+    val jsonSchema = rpcClient.fetchResultSchema(statementId).get()
+    val types = getInternalTypes(jsonSchema)
+    val livyColumnResultSet = rpcClient.fetchResult(statementId, types, maxRows).get()
+
+    val thriftColumns = livyColumnResultSet.columns.map { col =>
+      new ColumnBuffer(toHiveThriftType(col.dataType), col.getNulls, col.getColumnValues)
+    }
+    val result = new ColumnBasedSet(tableSchemaFromSparkJson(jsonSchema).toTypeDescriptors,
+      thriftColumns.toList.asJava,
+      rowOffset)
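+    // Advance the row offset by the number of rows returned in this batch, so that the next
+    // fetch reports the correct starting row to the client.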
+    livyColumnResultSet.columns.headOption.foreach { c =>
+      rowOffset += c.size
+    }
+    result
+  }
+
+  override def runInternal(): Unit = {
+    setState(OperationState.PENDING)
+    setHasResultSet(true) // avoid returning "no result set" for async runs
+
+    if (!runInBackground) {
+      execute()
+    } else {
+      val livyServiceUGI = Utils.getUGI
+
+      // Runnable implementation that executes the statement asynchronously
+      // from a different thread.
+      val backgroundOperation = new Runnable() {
+
+        override def run(): Unit = {
+          val doAsAction = new PrivilegedExceptionAction[Unit]() {
+            override def run(): Unit = {
+              try {
+                execute()
+              } catch {
+                case e: HiveSQLException =>
+                  setOperationException(e)
+                  error("Error running hive query: ", e)
+              }
+            }
+          }
+
+          try {
+            livyServiceUGI.doAs(doAsAction)
+          } catch {
+            case e: Exception =>
+              setOperationException(new HiveSQLException(e))
+              error("Error running hive query as user : " +
+                livyServiceUGI.getShortUserName, e)
+          }
+        }
+      }
+      try {
+        // This submit blocks if no background threads are available to run this operation
+        val backgroundHandle = sessionManager.submitBackgroundOperation(backgroundOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        case rejected: RejectedExecutionException =>
+          setState(OperationState.ERROR)
+          throw new HiveSQLException("The background threadpool cannot accept" +
+            " new task for execution, please retry the operation", rejected)
+        case NonFatal(e) =>
+          error(s"Error executing query in background", e)
+          setState(OperationState.ERROR)
+          throw e
+      }
+    }
+  }
+
+  protected def execute(): Unit = {
+    if (logger.isDebugEnabled) {
+      debug(s"Running query '$statement' with id $statementId (session = " +
+        s"${sessionHandle.getSessionId})")
+    }
+    setState(OperationState.RUNNING)
+
+    try {
+      rpcClient.executeSql(sessionHandle, statementId, statement).get()
+    } catch {
+      case e: Throwable =>
+        val currentState = getStatus.getState
+        info(s"Error executing query, currentState $currentState, ", e)
+        setState(OperationState.ERROR)
+        throw new HiveSQLException(e)
+    }
+    setState(OperationState.FINISHED)
+  }
+
+  def close(): Unit = {
+    info(s"Close $statementId")
+    cleanup(OperationState.CLOSED)
+  }
+
+  override def cancel(state: OperationState): Unit = {
+    info(s"Cancel $statementId with state $state")
+    cleanup(state)
+  }
+
+  def getResultSetSchema: TableSchema = {
+    val tableSchema = tableSchemaFromSparkJson(rpcClient.fetchResultSchema(statementId).get())
+    // Workaround for operations returning an empty schema (e.g. CREATE, INSERT, ...)
+    if (tableSchema.getSize == 0) {
+      tableSchema.addStringColumn("Result", "")
+    }
+    tableSchema
+  }
+
+  private def cleanup(state: OperationState): Unit = {
+    if (statementId != null && rpcClientValid) {
+      rpcClient.cleanupStatement(statementId).get()
+    }
+    setState(state)
+  }
+
+  /**
+   * Returns the messages that should be sent to the client and removes them from the queue in
+   * order not to send them twice.
+   */
+  def getOperationMessages: Seq[String] = {
+    def fetchNext(acc: mutable.ListBuffer[String]): Boolean = {
+      val m = operationMessages.poll()
+      if (m == null) {
+        false
+      } else {
+        acc += m
+        true
+      }
+    }
+    val res = new mutable.ListBuffer[String]
+    while (fetchNext(res)) {}
+    res
+  }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
new file mode 100644
index 0000000..c71171a
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.util
+import java.util.{Map => JMap}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.mutable
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.{GetCatalogsOperation, GetTableTypesOperation, GetTypeInfoOperation, Operation}
+
+import org.apache.livy.Logging
+
+class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManager)
+  extends Logging {
+
+  private val handleToOperation = new ConcurrentHashMap[OperationHandle, Operation]()
+  private val sessionToOperationHandles =
+    new mutable.HashMap[SessionHandle, mutable.Set[OperationHandle]]()
+
+  private def addOperation(operation: Operation, sessionHandle: SessionHandle): Unit = {
+    handleToOperation.put(operation.getHandle, operation)
+    sessionToOperationHandles.synchronized {
+      val set = sessionToOperationHandles.getOrElseUpdate(sessionHandle,
+        new mutable.HashSet[OperationHandle])
+      set += operation.getHandle
+    }
+  }
+
+  @throws[HiveSQLException]
+  private def removeOperation(operationHandle: OperationHandle): Operation = {
+    val operation = handleToOperation.remove(operationHandle)
+    if (operation == null) {
+      throw new HiveSQLException(s"Operation does not exist: $operationHandle")
+    }
+    val sessionHandle = operation.getSessionHandle
+    sessionToOperationHandles.synchronized {
+      sessionToOperationHandles(sessionHandle) -= operationHandle
+      if (sessionToOperationHandles(sessionHandle).isEmpty) {
+        sessionToOperationHandles.remove(sessionHandle)
+      }
+    }
+    operation
+  }
+
+  def getOperations(sessionHandle: SessionHandle): Set[OperationHandle] = {
+    sessionToOperationHandles.synchronized {
+      sessionToOperationHandles(sessionHandle).toSet
+    }
+  }
+
+  def getTimedOutOperations(sessionHandle: SessionHandle): Set[Operation] = {
+    val opHandles = getOperations(sessionHandle)
+    val currentTime = System.currentTimeMillis()
+    opHandles.flatMap { handle =>
+      // Some operations may have finished and been removed since we got them.
+      Option(handleToOperation.get(handle))
+    }.filter(_.isTimedOut(currentTime))
+  }
+
+  @throws[HiveSQLException]
+  def getOperation(operationHandle: OperationHandle): Operation = {
+    val operation = handleToOperation.get(operationHandle)
+    if (operation == null) {
+      throw new HiveSQLException(s"Invalid OperationHandle: $operationHandle")
+    }
+    operation
+  }
+
+  def newExecuteStatementOperation(
+      sessionHandle: SessionHandle,
+      statement: String,
+      confOverlay: JMap[String, String],
+      runAsync: Boolean,
+      queryTimeout: Long): Operation = {
+    val op = new LivyExecuteStatementOperation(
+      sessionHandle,
+      statement,
+      confOverlay,
+      runAsync,
+      livyThriftSessionManager)
+    addOperation(op, sessionHandle)
+    debug(s"Created Operation for $statement with session=$sessionHandle, " +
+      s"runInBackground=$runAsync")
+    op
+  }
+
+  def getOperationLogRowSet(
+      opHandle: OperationHandle,
+      orientation: FetchOrientation,
+      maxRows: Long): RowSet = {
+    val tableSchema = new TableSchema(LivyOperationManager.LOG_SCHEMA)
+    val session = livyThriftSessionManager.getSessionInfo(getOperation(opHandle).getSessionHandle)
+    val logs = RowSetFactory.create(tableSchema, session.protocolVersion, false)
+
+    if (!livyThriftSessionManager.getHiveConf.getBoolVar(
+        ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      warn("Try to get operation log when " +
+        ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname +
+        " is false, no log will be returned. ")
+    } else {
+      // Get the operation log. This is implemented only for LivyExecuteStatementOperation.
+      getOperation(opHandle) match {
+        case op: LivyExecuteStatementOperation =>
+          op.getOperationMessages.foreach { l =>
+            logs.addRow(Array(l))
+          }
+        case _ => // No operation log for other operation types.
+      }
+    }
+    logs
+  }
+
+  @throws[HiveSQLException]
+  def executeStatement(
+      sessionHandle: SessionHandle,
+      statement: String,
+      confOverlay: util.Map[String, String],
+      runAsync: Boolean,
+      queryTimeout: Long): OperationHandle = {
+    executeOperation(sessionHandle, {
+      newExecuteStatementOperation(sessionHandle, statement, confOverlay, runAsync, queryTimeout)
+    })
+  }
+
+  @throws[HiveSQLException]
+  private def executeOperation(
+      sessionHandle: SessionHandle,
+      operationCreator: => Operation): OperationHandle = {
+    var opHandle: OperationHandle = null
+    try {
+      val operation = operationCreator
+      opHandle = operation.getHandle
+      operation.run()
+      opHandle
+    } catch {
+      case e: HiveSQLException =>
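+        // Make sure a failed operation is removed and closed, so that it does not leak in
+        // handleToOperation.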
+        if (opHandle != null) {
+          closeOperation(opHandle)
+        }
+        throw e
+    }
+  }
+
+  @throws[HiveSQLException]
+  def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetTypeInfoOperation(sessionHandle)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
+  @throws[HiveSQLException]
+  def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetCatalogsOperation(sessionHandle)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
+  @throws[HiveSQLException]
+  def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetTableTypesOperation(sessionHandle)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
+  /**
+   * Cancel the running operation unless it is already in a terminal state
+   */
+  @throws[HiveSQLException]
+  def cancelOperation(opHandle: OperationHandle, errMsg: String): Unit = {
+    val operation = getOperation(opHandle)
+    val opState = operation.getStatus.getState
+    if (opState.isTerminal) {
+      // Cancel should be a no-op
+      debug(s"$opHandle: Operation is already aborted in state - $opState")
+    } else {
+      debug(s"$opHandle: Attempting to cancel from state - $opState")
+      val operationState = OperationState.CANCELED
+      operationState.setErrorMessage(errMsg)
+      operation.cancel(operationState)
+    }
+  }
+
+  @throws[HiveSQLException]
+  def cancelOperation(opHandle: OperationHandle): Unit = {
+    cancelOperation(opHandle, "")
+  }
+
+  @throws[HiveSQLException]
+  def closeOperation(opHandle: OperationHandle): Unit = {
+    info("Closing operation: " + opHandle)
+    val operation = removeOperation(opHandle)
+    operation.close()
+  }
+
+  @throws[HiveSQLException]
+  def fetchResults(
+      opHandle: OperationHandle,
+      orientation: FetchOrientation,
+      maxRows: Long,
+      fetchType: FetchType): RowSet = {
+    if (fetchType == FetchType.QUERY_OUTPUT) {
+      getOperation(opHandle).getNextRowSet(orientation, maxRows)
+    } else {
+      getOperationLogRowSet(opHandle, orientation, maxRows)
+    }
+  }
+}
+
+object LivyOperationManager {
+  val LOG_SCHEMA: Schema = {
+    val schema = new Schema
+    val fieldSchema = new FieldSchema
+    fieldSchema.setName("operation_log")
+    fieldSchema.setType("string")
+    schema.addToFieldSchemas(fieldSchema)
+    schema
+  }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
new file mode 100644
index 0000000..d34c1c0
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.server.HiveServer2
+import org.scalatra.ScalatraServlet
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.interactive.InteractiveSession
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.InteractiveSessionManager
+
+/**
+ * The main entry point for the Livy thrift server leveraging HiveServer2. Starts up a
+ * `HiveThriftServer2` thrift server.
+ */
+object LivyThriftServer extends Logging {
+
+  // Visible for testing
+  private[thriftserver] var thriftServerThread: Thread = _
+  private var thriftServer: LivyThriftServer = _
+
+  private def hiveConf(livyConf: LivyConf): HiveConf = {
+    val conf = new HiveConf()
+    // Remove all the configs coming from hive-site.xml, which may be on the classpath so that
+    // Spark applications can use it.
+    conf.getAllProperties.asScala.filter(_._1.startsWith("hive.")).foreach { case (key, _) =>
+      conf.unset(key)
+    }
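+    // Livy configuration entries prefixed with "livy.hive" are forwarded to the Hive conf with
+    // the "livy." prefix stripped (e.g. "livy.hive.server2.thrift.port" becomes
+    // "hive.server2.thrift.port").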
+    livyConf.asScala.foreach {
+      case nameAndValue if nameAndValue.getKey.startsWith("livy.hive") =>
+        conf.set(nameAndValue.getKey.stripPrefix("livy."), nameAndValue.getValue)
+      case _ => // Ignore
+    }
+    conf
+  }
+
+  def start(
+      livyConf: LivyConf,
+      livySessionManager: InteractiveSessionManager,
+      sessionStore: SessionStore,
+      accessManager: AccessManager): Unit = synchronized {
+    if (thriftServerThread == null) {
+      info("Starting LivyThriftServer")
+      val runThriftServer = new Runnable {
+        override def run(): Unit = {
+          try {
+            thriftServer = new LivyThriftServer(
+              livyConf,
+              livySessionManager,
+              sessionStore,
+              accessManager)
+            thriftServer.init(hiveConf(livyConf))
+            thriftServer.start()
+            info("LivyThriftServer started")
+          } catch {
+            case e: Exception =>
+              error("Error starting LivyThriftServer", e)
+          }
+        }
+      }
+      thriftServerThread =
+        new Thread(new ThreadGroup("thriftserver"), runThriftServer, "Livy-Thriftserver")
+      thriftServerThread.start()
+    } else {
+      error("Livy Thriftserver is already started")
+    }
+  }
+
+  private[thriftserver] def getInstance: Option[LivyThriftServer] = {
+    Option(thriftServer)
+  }
+
+  // Used in testing
+  def stopServer(): Unit = {
+    if (thriftServerThread != null) {
+      thriftServerThread.join()
+    }
+    thriftServerThread = null
+    thriftServer.stop()
+    thriftServer = null
+  }
+}
+
+
+class LivyThriftServer(
+    private[thriftserver] val livyConf: LivyConf,
+    private[thriftserver] val livySessionManager: InteractiveSessionManager,
+    private[thriftserver] val sessionStore: SessionStore,
+    private val accessManager: AccessManager) extends HiveServer2 {
+  override def init(hiveConf: HiveConf): Unit = {
+    this.cliService = new LivyCLIService(this)
+    super.init(hiveConf)
+  }
+
+  private[thriftserver] def getSessionManager(): LivyThriftSessionManager = {
+    this.cliService.asInstanceOf[LivyCLIService].getSessionManager
+  }
+
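+  // A user is allowed to attach to an interactive session if they own it or have modify
+  // permissions.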
+  def isAllowedToUse(user: String, session: InteractiveSession): Boolean = {
+    session.owner == user || accessManager.checkModifyPermissions(user)
+  }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
new file mode 100644
index 0000000..ec987c5
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.net.URI
+import java.security.PrivilegedExceptionAction
+import java.util
+import java.util.{Date, Map => JMap, UUID}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.shims.Utils
+import org.apache.hive.service.CompositeService
+import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup
+
+import org.apache.livy.LivyConf
+import org.apache.livy.Logging
+import org.apache.livy.server.interactive.{CreateInteractiveRequest, InteractiveSession}
+import org.apache.livy.sessions.Spark
+import org.apache.livy.thriftserver.SessionStates._
+import org.apache.livy.thriftserver.rpc.RpcClient
+import org.apache.livy.utils.LivySparkUtils
+
+class LivyThriftSessionManager(val server: LivyThriftServer, hiveConf: HiveConf)
+  extends CompositeService(classOf[LivyThriftSessionManager].getName) with Logging {
+
+  private[thriftserver] val operationManager = new LivyOperationManager(this)
+  private val sessionHandleToLivySession =
+    new ConcurrentHashMap[SessionHandle, Future[InteractiveSession]]()
+  // A map tracking how many incoming connections are open for each Livy session.
+  // It tracks only the sessions created by the Livy thriftserver and not those which have been
+  // created through the REST API, as the latter should not be stopped even when there are no
+  // more active connections.
+  private val managedLivySessionActiveUsers = new mutable.HashMap[Int, Int]()
+
+  // Contains metadata about a session
+  private val sessionInfo = new ConcurrentHashMap[SessionHandle, SessionInfo]()
+
+  // Map the number of incoming connections for IP, user. It is used in order to check
+  // that the configured limits are not exceeded.
+  private val connectionsCount = new ConcurrentHashMap[String, AtomicLong]
+
+  // Timeout for a Spark session creation
+  private val maxSessionWait = Duration(
+    server.livyConf.getTimeAsMs(LivyConf.THRIFT_SESSION_CREATION_TIMEOUT),
+    scala.concurrent.duration.MILLISECONDS)
+
+  // Flag indicating whether the Spark version being used supports the USE database statement
+  val supportUseDatabase: Boolean = {
+    val sparkVersion = server.livyConf.get(LivyConf.LIVY_SPARK_VERSION)
+    val (sparkMajorVersion, _) = LivySparkUtils.formatSparkVersion(sparkVersion)
+    sparkMajorVersion > 1 || server.livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
+  }
+
+  // Configs from Hive
+  private val userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER)
+  private val ipAddressLimit =
+    hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS)
+  private val userIpAddressLimit =
+    hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS)
+  private val checkInterval = HiveConf.getTimeVar(
+    hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS)
+  private val sessionTimeout = HiveConf.getTimeVar(
+    hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS)
+  private val checkOperation = HiveConf.getBoolVar(
+    hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION)
+
+  private var backgroundOperationPool: ThreadPoolExecutor = _
+
+  def getLivySession(sessionHandle: SessionHandle): InteractiveSession = {
+    val future = sessionHandleToLivySession.get(sessionHandle)
+    assert(future != null, s"Looking for a non-existent session: $sessionHandle.")
+
+    if (!future.isCompleted) {
+      Try(Await.result(future, maxSessionWait)) match {
+        case Success(session) => session
+        case Failure(e) => throw e.getCause
+      }
+    } else {
+      future.value match {
+        case Some(Success(session)) => session
+        case Some(Failure(e)) => throw e.getCause
+        case None => throw new RuntimeException("Future cannot be None when it is completed")
+      }
+    }
+  }
+
+  def livySessionId(sessionHandle: SessionHandle): Option[Int] = {
+    sessionHandleToLivySession.get(sessionHandle).value.filter(_.isSuccess).map(_.get.id)
+  }
+
+  def livySessionState(sessionHandle: SessionHandle): SessionStates = {
+    sessionHandleToLivySession.get(sessionHandle).value match {
+      case Some(Success(_)) => CREATION_SUCCESS
+      case Some(Failure(_)) => CREATION_FAILED
+      case None => CREATION_IN_PROGRESS
+    }
+  }
+
+  def onLivySessionOpened(livySession: InteractiveSession): Unit = {
+    server.livySessionManager.register(livySession)
+    synchronized {
+      managedLivySessionActiveUsers += livySession.id -> 0
+    }
+  }
+
+  def onUserSessionClosed(sessionHandle: SessionHandle, livySession: InteractiveSession): Unit = {
+    val closeSession = synchronized[Boolean] {
+      managedLivySessionActiveUsers.get(livySession.id) match {
+        case Some(1) =>
+          // it was the last user, so we can close the LivySession
+          managedLivySessionActiveUsers -= livySession.id
+          true
+        case Some(activeUsers) =>
+          managedLivySessionActiveUsers(livySession.id) = activeUsers - 1
+          false
+        case None =>
+          // This case can happen when we don't track the number of active users because the
+          // session was not created by the thriftserver (i.e. it was created through the
+          // REST API).
+          false
+      }
+    }
+    if (closeSession) {
+      server.livySessionManager.delete(livySession)
+    } else {
+      // We unregister the session only if we don't close it, as unregistering is unnecessary
+      // when the whole Livy session is being stopped.
+      val rpcClient = new RpcClient(livySession)
+      try {
+        rpcClient.executeUnregisterSession(sessionHandle).get()
+      } catch {
+        case e: Exception => warn(s"Unable to unregister session $sessionHandle", e)
+      }
+    }
+  }
+
+  /**
+   * If the user specified an existing sessionId to use, the corresponding session is returned,
+   * otherwise a new session is created and returned.
+   */
+  private def getOrCreateLivySession(
+      sessionHandle: SessionHandle,
+      sessionId: Option[Int],
+      username: String,
+      createLivySession: () => InteractiveSession): InteractiveSession = {
+    sessionId match {
+      case Some(id) =>
+        server.livySessionManager.get(id) match {
+          case None =>
+            warn(s"InteractiveSession $id doesn't exist.")
+            throw new IllegalArgumentException(s"Session $id doesn't exist.")
+          case Some(session) if !server.isAllowedToUse(username, session) =>
+            warn(s"$username has no modify permissions to InteractiveSession $id.")
+            throw new IllegalAccessException(
+              s"$username is not allowed to use InteractiveSession $id.")
+          case Some(session) =>
+            if (session.state.isActive) {
+              info(s"Reusing Session $id for $sessionHandle.")
+              session
+            } else {
+              warn(s"InteractiveSession $id is not active anymore.")
+              throw new IllegalArgumentException(s"Session $id is not active anymore.")
+            }
+        }
+      case None =>
+        createLivySession()
+    }
+  }
+
+  /**
+   * Performs the initialization of the new Thriftserver session:
+   *  - adds the Livy thriftserver JAR to the Spark application;
+   *  - registers the new Thriftserver session in the Spark application;
+   *  - runs the initialization statements.
+   */
+  private def initSession(
+      sessionHandle: SessionHandle,
+      livySession: InteractiveSession,
+      initStatements: List[String]): Unit = {
+    // Add the thriftserver jar to the Spark application, as the classes which handle the job
+    // submission need to be deserialized there.
+    // Note: if this is an already existing session, adding the JARs multiple times is not a
+    // problem as Spark ignores JARs which have already been added.
+    try {
+      livySession.addJar(LivyThriftSessionManager.thriftserverJarLocation(server.livyConf))
+    } catch {
+      case e: java.util.concurrent.ExecutionException
+          if Option(e.getCause).forall(_.getMessage.contains("has already been uploaded")) =>
+        // We have already uploaded the jar to this session, we can ignore this error
+        debug(e.getMessage, e)
+    }
+
+    val rpcClient = new RpcClient(livySession)
+    rpcClient.executeRegisterSession(sessionHandle).get()
+    initStatements.foreach { statement =>
+      val statementId = UUID.randomUUID().toString
+      try {
+        rpcClient.executeSql(sessionHandle, statementId, statement).get()
+      } finally {
+        Try(rpcClient.cleanupStatement(statementId).get()).failed.foreach { e =>
+          error(s"Failed to close init operation $statementId", e)
+        }
+      }
+    }
+  }
+
+  def openSession(
+      protocol: TProtocolVersion,
+      username: String,
+      password: String,
+      ipAddress: String,
+      sessionConf: JMap[String, String],
+      withImpersonation: Boolean,
+      delegationToken: String): SessionHandle = {
+    val sessionHandle = new SessionHandle(protocol)
+    incrementConnections(username, ipAddress, SessionInfo.getForwardedAddresses)
+    sessionInfo.put(sessionHandle,
+      new SessionInfo(username, ipAddress, SessionInfo.getForwardedAddresses, protocol))
+    val (initStatements, createInteractiveRequest, sessionId) =
+      LivyThriftSessionManager.processSessionConf(sessionConf, supportUseDatabase)
+    val createLivySession = () => {
+      createInteractiveRequest.kind = Spark
+      val newSession = InteractiveSession.create(
+        server.livySessionManager.nextId(),
+        username,
+        None,
+        server.livyConf,
+        createInteractiveRequest,
+        server.sessionStore)
+      onLivySessionOpened(newSession)
+      newSession
+    }
+    val futureLivySession = Future {
+      val livyServiceUGI = Utils.getUGI
+      var livySession: InteractiveSession = null
+      try {
+        livyServiceUGI.doAs(new PrivilegedExceptionAction[InteractiveSession] {
+          override def run(): InteractiveSession = {
+            livySession =
+              getOrCreateLivySession(sessionHandle, sessionId, username, createLivySession)
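+            // Count this connection against the Livy session only when the session is managed
+            // by the thriftserver (i.e. it is tracked in managedLivySessionActiveUsers).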
+            synchronized {
+              managedLivySessionActiveUsers.get(livySession.id).foreach { numUsers =>
+                managedLivySessionActiveUsers(livySession.id) = numUsers + 1
+              }
+            }
+            initSession(sessionHandle, livySession, initStatements)
+            livySession
+          }
+        })
+      } catch {
+        case e: UndeclaredThrowableException =>
+          throw new ThriftSessionCreationException(Option(livySession), e.getCause)
+        case e: Throwable =>
+          throw new ThriftSessionCreationException(Option(livySession), e)
+      }
+    }
+    sessionHandleToLivySession.put(sessionHandle, futureLivySession)
+    sessionHandle
+  }
+
+  def closeSession(sessionHandle: SessionHandle): Unit = {
+    val removedSession = sessionHandleToLivySession.remove(sessionHandle)
+    val removedSessionInfo = sessionInfo.remove(sessionHandle)
+    try {
+      removedSession.value match {
+        case Some(Success(interactiveSession)) =>
+          onUserSessionClosed(sessionHandle, interactiveSession)
+        case Some(Failure(e: ThriftSessionCreationException)) =>
+          e.livySession.foreach(onUserSessionClosed(sessionHandle, _))
+        case None =>
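+          // The Livy session is still being created: register a callback so that the user
+          // connection is released as soon as the creation completes.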
+          removedSession.onComplete {
+            case Success(interactiveSession) =>
+              onUserSessionClosed(sessionHandle, interactiveSession)
+            case Failure(e: ThriftSessionCreationException) =>
+              e.livySession.foreach(onUserSessionClosed(sessionHandle, _))
+          }
+        case _ => // We should never get here
+      }
+    } finally {
+      decrementConnections(removedSessionInfo)
+    }
+  }
+
+  // Taken from Hive
+  override def init(hiveConf: HiveConf): Unit = {
+    createBackgroundOperationPool(hiveConf)
+    info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}",
+      userLimit, ipAddressLimit, userIpAddressLimit)
+    super.init(hiveConf)
+  }
+
+  // Taken from Hive
+  private def createBackgroundOperationPool(hiveConf: HiveConf): Unit = {
+    val poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    info("HiveServer2: Background operation thread pool size: " + poolSize)
+    val poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE)
+    info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize)
+    val keepAliveTime = HiveConf.getTimeVar(
+      hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS)
+    info(s"HiveServer2: Background operation thread keepalive time: $keepAliveTime seconds")
+    // Create a thread pool with #poolSize threads
+    // Threads terminate when they are idle for more than the keepAliveTime
+    // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
+    val threadPoolName = "LivyServer2-Background-Pool"
+    val queue = new LinkedBlockingQueue[Runnable](poolQueueSize)
+    backgroundOperationPool = new ThreadPoolExecutor(
+      poolSize,
+      poolSize,
+      keepAliveTime,
+      TimeUnit.SECONDS,
+      queue,
+      new ThreadFactoryWithGarbageCleanup(threadPoolName))
+    backgroundOperationPool.allowCoreThreadTimeOut(true)
+  }
+
+  // Taken from Hive
+  override def start(): Unit = {
+    super.start()
+    if (checkInterval > 0) startTimeoutChecker()
+  }
+
+  private val timeoutCheckerLock: Object = new Object
+  @volatile private var shutdown: Boolean = false
+
+  // Taken from Hive
+  private def startTimeoutChecker(): Unit = {
+    // Use a minimum check interval of 3 seconds.
+    val interval: Long = Math.max(checkInterval, 3000L)
+    val timeoutChecker: Runnable = new Runnable() {
+      override def run(): Unit = {
+        sleepFor(interval)
+        while (!shutdown) {
+          val current: Long = System.currentTimeMillis
+          val iterator = sessionHandleToLivySession.entrySet().iterator()
+          while (iterator.hasNext && !shutdown) {
+            val entry = iterator.next()
+            val sessionHandle = entry.getKey
+            entry.getValue.value.flatMap(_.toOption).foreach { livySession =>
+
+              if (sessionTimeout > 0 && livySession.lastActivity + sessionTimeout <= current &&
+                 (!checkOperation || getNoOperationTime(sessionHandle) > sessionTimeout)) {
+                warn(s"Session $sessionHandle is Timed-out (last access : " +
+                  new Date(livySession.lastActivity) + ") and will be closed")
+                try {
+                  closeSession(sessionHandle)
+                } catch {
+                  case e: HiveSQLException =>
+                    warn(s"Exception is thrown closing session $sessionHandle", e)
+                }
+              } else {
+                val operations = operationManager.getTimedOutOperations(sessionHandle)
+                if (operations.nonEmpty) {
+                  operations.foreach { op =>
+                    try {
+                      warn(s"Operation ${op.getHandle} is timed-out and will be closed")
+                      operationManager.closeOperation(op.getHandle)
+                    } catch {
+                      case e: Exception =>
+                        warn("Exception is thrown closing timed-out operation: " + op.getHandle, e)
+                    }
+                  }
+                }
+              }
+            }
+          }
+          sleepFor(interval)
+        }
+      }
+
+      private def sleepFor(interval: Long): Unit = {
+        timeoutCheckerLock.synchronized {
+          try {
+            timeoutCheckerLock.wait(interval)
+          } catch {
+            case _: InterruptedException =>
+              // Ignore, and break.
+          }
+        }
+      }
+    }
+    backgroundOperationPool.execute(timeoutChecker)
+  }
+
+  // Taken from Hive
+  private def shutdownTimeoutChecker(): Unit = {
+    shutdown = true
+    timeoutCheckerLock.synchronized { timeoutCheckerLock.notify() }
+  }
+
+  // Taken from Hive
+  override def stop(): Unit = {
+    super.stop()
+    shutdownTimeoutChecker()
+    if (backgroundOperationPool != null) {
+      backgroundOperationPool.shutdown()
+      val timeout =
+        hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)
+      try {
+        backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS)
+      } catch {
+        case e: InterruptedException =>
+          warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
+            " seconds has been exceeded. RUNNING background operations will be shut down", e)
+      }
+      backgroundOperationPool = null
+    }
+  }
+
+  // Taken from Hive
+  @throws[HiveSQLException]
+  private def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val violation = anyViolations(username, clientIpAddress)
+    // increment the counters only when there are no violations
+    if (violation.isEmpty) {
+      if (trackConnectionsPerUser(username)) incrementConnectionsCount(username)
+      if (trackConnectionsPerIpAddress(clientIpAddress)) incrementConnectionsCount(clientIpAddress)
+      if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
+        incrementConnectionsCount(username + ":" + clientIpAddress)
+      }
+    } else {
+      error(violation.get)
+      throw new HiveSQLException(violation.get)
+    }
+  }
+
+  // Taken from Hive
+  private def incrementConnectionsCount(key: String): Unit = {
+    if (connectionsCount.containsKey(key)) connectionsCount.get(key).incrementAndGet
+    else connectionsCount.put(key, new AtomicLong(1))
+  }
+
+  // Taken from Hive
+  private def decrementConnectionsCount(key: String): Unit = {
+    if (connectionsCount.containsKey(key)) connectionsCount.get(key).decrementAndGet
+  }
+
+  // Taken from Hive
+  private def getOriginClientIpAddress(ipAddress: String, forwardedAddresses: util.List[String]) = {
+    if (forwardedAddresses == null || forwardedAddresses.isEmpty) {
+      ipAddress
+    } else {
+      // order of forwarded ips per X-Forwarded-For http spec (client, proxy1, proxy2)
+      forwardedAddresses.get(0)
+    }
+  }
+
+  // Taken from Hive
+  private def anyViolations(username: String, ipAddress: String): Option[String] = {
+    val userAndAddress = username + ":" + ipAddress
+    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
+      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
+    } else if (trackConnectionsPerIpAddress(ipAddress) &&
+        !withinLimits(ipAddress, ipAddressLimit)) {
+      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
+        s"$ipAddressLimit)")
+    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
+        !withinLimits(userAndAddress, userIpAddressLimit)) {
+      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
+        s"limit: $userIpAddressLimit)")
+    } else {
+      None
+    }
+  }
+
+  // Taken from Hive
+  private def trackConnectionsPerUserIpAddress(username: String, ipAddress: String): Boolean = {
+    userIpAddressLimit > 0 && username != null && !username.isEmpty && ipAddress != null &&
+      !ipAddress.isEmpty
+  }
+
+  // Taken from Hive
+  private def trackConnectionsPerIpAddress(ipAddress: String): Boolean = {
+    ipAddressLimit > 0 && ipAddress != null && !ipAddress.isEmpty
+  }
+
+  // Taken from Hive
+  private def trackConnectionsPerUser(username: String): Boolean = {
+    userLimit > 0 && username != null && !username.isEmpty
+  }
+
+  // Taken from Hive
+  private def withinLimits(track: String, limit: Int): Boolean = {
+    !(connectionsCount.containsKey(track) && connectionsCount.get(track).intValue >= limit)
+  }
+
+  private def decrementConnections(sessionInfo: SessionInfo): Unit = {
+    val username = sessionInfo.username
+    val clientIpAddress = getOriginClientIpAddress(
+      sessionInfo.ipAddress, sessionInfo.forwardedAddresses)
+    if (trackConnectionsPerUser(username)) {
+      decrementConnectionsCount(username)
+    }
+    if (trackConnectionsPerIpAddress(clientIpAddress)) {
+      decrementConnectionsCount(clientIpAddress)
+    }
+    if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
+      decrementConnectionsCount(username + ":" + clientIpAddress)
+    }
+  }
+
+  def submitBackgroundOperation(r: Runnable): util.concurrent.Future[_] = {
+    backgroundOperationPool.submit(r)
+  }
+
+  def getNoOperationTime(sessionHandle: SessionHandle): Long = {
+    if (operationManager.getOperations(sessionHandle).isEmpty) {
+      System.currentTimeMillis() - getLivySession(sessionHandle).lastActivity
+    } else {
+      0
+    }
+  }
+
+  def getSessions: Set[SessionHandle] = {
+    sessionInfo.keySet().asScala.toSet
+  }
+
+  def getSessionInfo(sessionHandle: SessionHandle): SessionInfo = {
+    sessionInfo.get(sessionHandle)
+  }
+}
+
+object LivyThriftSessionManager extends Logging {
+  // Users can explicitly set the id of the Livy session they want to connect to using this
+  // hiveconf variable.
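+  // The Hive JDBC driver typically sends the entries in the "?" section of the connection URL
+  // as "set:hiveconf:"-prefixed keys, so an existing session can be selected with a URL like
+  // jdbc:hive2://<host>:<port>/?livy.server.sessionId=<id>.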
+  private val livySessionIdConfigKey = "set:hiveconf:livy.server.sessionId"
+  private val livySessionConfRegexp = "set:hiveconf:livy.session.conf.(.*)".r
+  private val hiveVarPattern = "set:hivevar:(.*)".r
+  private val JAR_LOCATION = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
+
+  def thriftserverJarLocation(livyConf: LivyConf): URI = {
+    Option(livyConf.get(LivyConf.THRIFT_SERVER_JAR_LOCATION)).map(new URI(_))
+      .getOrElse(JAR_LOCATION)
+  }
+
+  private def convertConfValueToInt(key: String, value: String) = {
+    val res = Try(value.toInt)
+    if (res.isFailure) {
+      warn(s"Ignoring $key = $value as it is not a valid integer")
+      None
+    } else {
+      Some(res.get)
+    }
+  }
+
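+  // The configuration map received in the Thrift OpenSession request typically contains entries
+  // such as "use:<database>", "set:hiveconf:livy.session.driverMemory" -> "2g" or
+  // "set:hivevar:myVar" -> "1"; keys which are not recognized are logged and ignored.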
+  private def processSessionConf(
+      sessionConf: JMap[String, String],
+      supportUseDatabase: Boolean): (List[String], CreateInteractiveRequest, Option[Int]) = {
+    if (null != sessionConf && !sessionConf.isEmpty) {
+      val statements = new mutable.ListBuffer[String]
+      val extraLivyConf = new mutable.ListBuffer[(String, String)]
+      val createInteractiveRequest = new CreateInteractiveRequest
+      sessionConf.asScala.foreach {
+        case (key, value) =>
+          key match {
+            case v if v.startsWith("use:") && supportUseDatabase =>
+              statements += s"use $value"
+            // Process session configs for Livy session creation request
+            case "set:hiveconf:livy.session.driverMemory" =>
+              createInteractiveRequest.driverMemory = Some(value)
+            case "set:hiveconf:livy.session.driverCores" =>
+              createInteractiveRequest.driverCores = convertConfValueToInt(key, value)
+            case "set:hiveconf:livy.session.executorMemory" =>
+              createInteractiveRequest.executorMemory = Some(value)
+            case "set:hiveconf:livy.session.executorCores" =>
+              createInteractiveRequest.executorCores = convertConfValueToInt(key, value)
+            case "set:hiveconf:livy.session.queue" =>
+              createInteractiveRequest.queue = Some(value)
+            case "set:hiveconf:livy.session.name" =>
+              createInteractiveRequest.name = Some(value)
+            case "set:hiveconf:livy.session.heartbeatTimeoutInSecond" =>
+              convertConfValueToInt(key, value).foreach { heartbeatTimeoutInSecond =>
+                createInteractiveRequest.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
+              }
+            case livySessionConfRegexp(livyConfKey) => extraLivyConf += (livyConfKey -> value)
+            // set the hivevars specified by the user
+            case hiveVarPattern(confKey) => statements += s"set hivevar:${confKey.trim}=$value"
+            case _ if key == livySessionIdConfigKey => // Ignore it, we handle it later
+            case _ =>
+              info(s"Ignoring key: $key = '$value'")
+          }
+      }
+      createInteractiveRequest.conf = extraLivyConf.toMap
+      val sessionId = Option(sessionConf.get(livySessionIdConfigKey)).flatMap { id =>
+        val res = Try(id.toInt)
+        if (res.isFailure) {
+          warn(s"Ignoring $livySessionIdConfigKey=$id as it is not an int.")
+          None
+        } else {
+          Some(res.get)
+        }
+      }
+      (statements.toList, createInteractiveRequest, sessionId)
+    } else {
+      (List(), new CreateInteractiveRequest, None)
+    }
+  }
+}
+
+/**
+ * Exception thrown when an error occurs during session creation and/or initialization. It
+ * contains the `livySession` (if it was created) where the error occurred and the `cause` of
+ * the error.
+ */
+class ThriftSessionCreationException(val livySession: Option[InteractiveSession], cause: Throwable)
+  extends Exception(cause)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala
new file mode 100644
index 0000000..4ebf867
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.util
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.livy.Logging
+
+case class SessionInfo(username: String,
+    ipAddress: String,
+    forwardedAddresses: util.List[String],
+    protocolVersion: TProtocolVersion) {
+  val creationTime: Long = System.currentTimeMillis()
+}
+
+/**
+ * Mirrors Hive behavior which stores thread local information in its session manager.
+ */
+object SessionInfo extends Logging {
+
+  private val threadLocalIpAddress = new ThreadLocal[String]
+
+  def setIpAddress(ipAddress: String): Unit = {
+    threadLocalIpAddress.set(ipAddress)
+  }
+
+  def clearIpAddress(): Unit = {
+    threadLocalIpAddress.remove()
+  }
+
+  def getIpAddress: String = threadLocalIpAddress.get
+
+  private val threadLocalForwardedAddresses = new ThreadLocal[util.List[String]]
+
+  def setForwardedAddresses(ipAddress: util.List[String]): Unit = {
+    threadLocalForwardedAddresses.set(ipAddress)
+  }
+
+  def clearForwardedAddresses(): Unit = {
+    threadLocalForwardedAddresses.remove()
+  }
+
+  def getForwardedAddresses: util.List[String] = threadLocalForwardedAddresses.get
+
+  private val threadLocalUserName = new ThreadLocal[String]() {
+    override protected def initialValue: String = null
+  }
+
+  def setUserName(userName: String): Unit = {
+    threadLocalUserName.set(userName)
+  }
+
+  def clearUserName(): Unit = {
+    threadLocalUserName.remove()
+  }
+
+  def getUserName: String = threadLocalUserName.get
+
+  private val threadLocalProxyUserName = new ThreadLocal[String]() {
+    override protected def initialValue: String = null
+  }
+
+  def setProxyUserName(userName: String): Unit = {
+    debug("setting proxy user name based on query param to: " + userName)
+    threadLocalProxyUserName.set(userName)
+  }
+
+  def getProxyUserName: String = threadLocalProxyUserName.get
+
+  def clearProxyUserName(): Unit = {
+    threadLocalProxyUserName.remove()
+  }
+}
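+
+// Hypothetical usage from a request-handling thread (illustration only, mirroring what Hive's
+// session manager does); the values are cleared once the request has been served:
+//
+//   SessionInfo.setUserName("alice")
+//   SessionInfo.setIpAddress("10.0.0.1")
+//   try {
+//     // ... open the session / run the operation ...
+//   } finally {
+//     SessionInfo.clearUserName()
+//     SessionInfo.clearIpAddress()
+//   }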
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionStates.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionStates.scala
new file mode 100644
index 0000000..c4fd248
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionStates.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+object SessionStates extends Enumeration {
+  type SessionStates = Value
+
+  val CREATION_SUCCESS = Value("success")
+  val CREATION_FAILED = Value("failed")
+  val CREATION_IN_PROGRESS = Value("in_progress")
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
new file mode 100644
index 0000000..75bab0b
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.rpc
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.immutable.HashMap
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.hive.service.cli.SessionHandle
+
+import org.apache.livy._
+import org.apache.livy.server.interactive.InteractiveSession
+import org.apache.livy.thriftserver.serde.ColumnOrientedResultSet
+import org.apache.livy.thriftserver.types.DataType
+import org.apache.livy.utils.LivySparkUtils
+
+class RpcClient(livySession: InteractiveSession) extends Logging {
+  import RpcClient._
+
+  private val isSpark1 = {
+    val (sparkMajorVersion, _) =
+      LivySparkUtils.formatSparkVersion(livySession.livyConf.get(LivyConf.LIVY_SPARK_VERSION))
+    sparkMajorVersion == 1
+  }
+  private val defaultIncrementalCollect =
+    livySession.livyConf.getBoolean(LivyConf.THRIFT_INCR_COLLECT_ENABLED).toString
+
+  private val rscClient = livySession.client.get
+
+  def isValid: Boolean = rscClient.isAlive
+
+  private def sessionId(sessionHandle: SessionHandle): String = {
+    sessionHandle.getSessionId.toString
+  }
+
+  @throws[Exception]
+  def executeSql(
+      sessionHandle: SessionHandle,
+      statementId: String,
+      statement: String): JobHandle[_] = {
+    info(s"RSC client is executing SQL query: $statement, statementId = $statementId, session = " +
+      sessionHandle)
+    require(null != statementId, s"Invalid statementId specified. StatementId = $statementId")
+    require(null != statement, s"Invalid statement specified. Statement = $statement")
+    livySession.recordActivity()
+    rscClient.submit(executeSqlJob(sessionId(sessionHandle),
+      statementId,
+      statement,
+      isSpark1,
+      defaultIncrementalCollect,
+      s"spark.${LivyConf.THRIFT_INCR_COLLECT_ENABLED}"))
+  }
+
+  @throws[Exception]
+  def fetchResult(statementId: String,
+      types: Array[DataType],
+      maxRows: Int): JobHandle[ColumnOrientedResultSet] = {
+    info(s"RSC client is fetching result for statementId $statementId with $maxRows maxRows.")
+    require(null != statementId, s"Invalid statementId specified. StatementId = $statementId")
+    livySession.recordActivity()
+    rscClient.submit(fetchResultJob(statementId, types, maxRows))
+  }
+
+  @throws[Exception]
+  def fetchResultSchema(statementId: String): JobHandle[String] = {
+    info(s"RSC client is fetching result schema for statementId = $statementId")
+    require(null != statementId, s"Invalid statementId specified. statementId = $statementId")
+    livySession.recordActivity()
+    rscClient.submit(fetchResultSchemaJob(statementId))
+  }
+
+  @throws[Exception]
+  def cleanupStatement(statementId: String, cancelJob: Boolean = false): JobHandle[_] = {
+    info(s"Cleaning up remote session for statementId = $statementId")
+    require(null != statementId, s"Invalid statementId specified. statementId = $statementId")
+    livySession.recordActivity()
+    rscClient.submit(cleanupStatementJob(statementId))
+  }
+
+  /**
+   * Creates a new Spark session for the specified Thrift session and stores it in a shared
+   * variable, so that each incoming session uses its own. This is needed to avoid interference
+   * between different users working on the same remote Livy session (e.g. setting a property,
+   * changing the current database, etc.).
+   */
+  @throws[Exception]
+  def executeRegisterSession(sessionHandle: SessionHandle): JobHandle[_] = {
+    info(s"RSC client is executing register session $sessionHandle")
+    livySession.recordActivity()
+    rscClient.submit(registerSessionJob(sessionId(sessionHandle), isSpark1))
+  }
+
+  /**
+   * Removes the Spark session created for the specified session from the shared variable.
+   */
+  @throws[Exception]
+  def executeUnregisterSession(sessionHandle: SessionHandle): JobHandle[_] = {
+    info(s"RSC client is executing unregister session $sessionHandle")
+    livySession.recordActivity()
+    rscClient.submit(unregisterSessionJob(sessionId(sessionHandle)))
+  }
+}
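+
+// A hypothetical usage sketch of the statement lifecycle driven through RpcClient (assuming a
+// ready `livySession` and a valid `sessionHandle`/`statementId`); the actual call sites live in
+// the thriftserver operation classes:
+//
+//   val client = new RpcClient(livySession)
+//   client.executeRegisterSession(sessionHandle).get()
+//   client.executeSql(sessionHandle, statementId, "select 1").get()
+//   val types = DataTypeUtils.getInternalTypes(client.fetchResultSchema(statementId).get())
+//   val rows = client.fetchResult(statementId, types, maxRows = 100).get()
+//   client.cleanupStatement(statementId).get()
+//   client.executeUnregisterSession(sessionHandle).get()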
+
+/**
+ * Since no instance of this class exists on the remote side, all the job definitions are placed
+ * here in order to enforce that they do not access any instance attribute.
+ */
+object RpcClient {
+  // Maps a session ID to its SparkSession (or HiveContext/SQLContext according to the Spark
+  // version used)
+  val SESSION_SPARK_ENTRY_MAP = "livy.thriftserver.rpc_sessionIdToSparkSQLSession"
+  val STATEMENT_RESULT_ITER_MAP = "livy.thriftserver.rpc_statementIdToResultIter"
+  val STATEMENT_SCHEMA_MAP = "livy.thriftserver.rpc_statementIdToSchema"
+
+  private def registerSessionJob(sessionId: String, isSpark1: Boolean): Job[_] = new Job[Boolean] {
+    override def call(jc: JobContext): Boolean = {
+      val spark: Any = if (isSpark1) {
+        Option(jc.hivectx()).getOrElse(jc.sqlctx())
+      } else {
+        jc.sparkSession()
+      }
+      val sessionSpecificSpark = spark.getClass.getMethod("newSession").invoke(spark)
+      jc.sc().synchronized {
+        val existingMap =
+          Try(jc.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP))
+            .getOrElse(new HashMap[String, AnyRef]())
+        jc.setSharedObject(SESSION_SPARK_ENTRY_MAP,
+          existingMap + ((sessionId, sessionSpecificSpark)))
+        Try(jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP))
+          .failed.foreach { _ =>
+          jc.setSharedObject(STATEMENT_SCHEMA_MAP, new HashMap[String, String]())
+        }
+        Try(jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP))
+          .failed.foreach { _ =>
+          jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, new HashMap[String, Iterator[_]]())
+        }
+      }
+      true
+    }
+  }
+
+  private def unregisterSessionJob(sessionId: String): Job[_] = new Job[Boolean] {
+    override def call(jobContext: JobContext): Boolean = {
+      jobContext.sc().synchronized {
+        val existingMap =
+          jobContext.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP)
+        jobContext.setSharedObject(SESSION_SPARK_ENTRY_MAP, existingMap - sessionId)
+      }
+      true
+    }
+  }
+
+  private def cleanupStatementJob(statementId: String): Job[_] = new Job[Boolean] {
+    override def call(jc: JobContext): Boolean = {
+      val sparkContext = jc.sc()
+      sparkContext.cancelJobGroup(statementId)
+      sparkContext.synchronized {
+        // Clear the job group only if the current job group is the same as the expected one.
+        if (sparkContext.getLocalProperty("spark.jobGroup.id") == statementId) {
+          sparkContext.clearJobGroup()
+        }
+        val iterMap = jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)
+        jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, iterMap - statementId)
+        val schemaMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)
+        jc.setSharedObject(STATEMENT_SCHEMA_MAP, schemaMap - statementId)
+      }
+      true
+    }
+  }
+
+  private def fetchResultSchemaJob(statementId: String): Job[String] = new Job[String] {
+    override def call(jobContext: JobContext): String = {
+      jobContext.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)(statementId)
+    }
+  }
+
+  private def fetchResultJob(statementId: String,
+      types: Array[DataType],
+      maxRows: Int): Job[ColumnOrientedResultSet] = new Job[ColumnOrientedResultSet] {
+    override def call(jobContext: JobContext): ColumnOrientedResultSet = {
+      val statementIterMap =
+        jobContext.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)
+      val iter = statementIterMap(statementId)
+
+      if (null == iter) {
+        // Previous query execution failed.
+        throw new NoSuchElementException("No successful query executed for output")
+      }
+
+      val resultSet = new ColumnOrientedResultSet(types)
+      val numOfColumns = types.length
+      if (!iter.hasNext) {
+        resultSet
+      } else {
+        var curRow = 0
+        while (curRow < maxRows && iter.hasNext) {
+          val sparkRow = iter.next()
+          val row = ArrayBuffer[Any]()
+          var curCol: Integer = 0
+          while (curCol < numOfColumns) {
+            row += sparkRow.getClass.getMethod("get", classOf[Int]).invoke(sparkRow, curCol)
+            curCol += 1
+          }
+          resultSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+          curRow += 1
+        }
+        resultSet
+      }
+    }
+  }
+
+  private def executeSqlJob(sessionId: String,
+      statementId: String,
+      statement: String,
+      isSpark1: Boolean,
+      defaultIncrementalCollect: String,
+      incrementalCollectEnabledProp: String): Job[_] = new Job[Boolean] {
+    override def call(jc: JobContext): Boolean = {
+      val sparkContext = jc.sc()
+      sparkContext.synchronized {
+        sparkContext.setJobGroup(statementId, statement)
+      }
+      val spark = jc.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP)(sessionId)
+      try {
+        val result = spark.getClass.getMethod("sql", classOf[String]).invoke(spark, statement)
+        val schema = result.getClass.getMethod("schema").invoke(result)
+        val jsonString = schema.getClass.getMethod("json").invoke(schema).asInstanceOf[String]
+
+        // Set the schema in the shared map
+        sparkContext.synchronized {
+          val existingMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)
+          jc.setSharedObject(STATEMENT_SCHEMA_MAP, existingMap + ((statementId, jsonString)))
+        }
+
+        val incrementalCollect = {
+          if (isSpark1) {
+            spark.getClass.getMethod("getConf", classOf[String], classOf[String])
+              .invoke(spark,
+                incrementalCollectEnabledProp,
+                defaultIncrementalCollect)
+              .asInstanceOf[String].toBoolean
+          } else {
+            val conf = spark.getClass.getMethod("conf").invoke(spark)
+            conf.getClass.getMethod("get", classOf[String], classOf[String])
+              .invoke(conf,
+                incrementalCollectEnabledProp,
+                defaultIncrementalCollect)
+              .asInstanceOf[String].toBoolean
+          }
+        }
+
+        val iter = if (incrementalCollect) {
+          val rdd = result.getClass.getMethod("rdd").invoke(result)
+          rdd.getClass.getMethod("toLocalIterator").invoke(rdd).asInstanceOf[Iterator[_]]
+        } else {
+          result.getClass.getMethod("collect").invoke(result).asInstanceOf[Array[_]].iterator
+        }
+
+        // Set the iterator in the shared map
+        sparkContext.synchronized {
+          val existingMap =
+            jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)
+          jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, existingMap + ((statementId, iter)))
+        }
+      } catch {
+        case e: InvocationTargetException => throw e.getCause
+      }
+
+      true
+    }
+  }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala
new file mode 100644
index 0000000..248a77d
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.serde
+
+import java.nio.ByteBuffer
+import java.util
+
+import scala.collection.mutable
+
+import org.apache.livy.thriftserver.types.{DataType, DataTypeUtils}
+
+object ColumnBuffer {
+  private val DEFAULT_SIZE = 100
+  private val EMPTY_BINARY = ByteBuffer.allocate(0)
+  private val EMPTY_STRING = ""
+  private val HANDLED_TYPES =
+    Set("boolean", "byte", "short", "integer", "long", "float", "double", "binary")
+}
+
+class ColumnBuffer(val dataType: DataType) {
+  private val nulls = new mutable.BitSet()
+  private var currentSize = 0
+  private var boolVars: Array[Boolean] = _
+  private var byteVars: Array[Byte] = _
+  private var shortVars: Array[Short] = _
+  private var intVars: Array[Int] = _
+  private var longVars: Array[Long] = _
+  private var doubleVars: Array[Double] = _
+  private var stringVars: util.List[String] = _
+  private var binaryVars: util.List[ByteBuffer] = _
+
+  dataType.name match {
+    case "boolean" =>
+      boolVars = new Array[Boolean](ColumnBuffer.DEFAULT_SIZE)
+    case "byte" =>
+      byteVars = new Array[Byte](ColumnBuffer.DEFAULT_SIZE)
+    case "short" =>
+      shortVars = new Array[Short](ColumnBuffer.DEFAULT_SIZE)
+    case "integer" =>
+      intVars = new Array[Int](ColumnBuffer.DEFAULT_SIZE)
+    case "long" =>
+      longVars = new Array[Long](ColumnBuffer.DEFAULT_SIZE)
+    case "float" | "double" =>
+      doubleVars = new Array[Double](ColumnBuffer.DEFAULT_SIZE)
+    case "binary" =>
+      binaryVars = new util.ArrayList[ByteBuffer]
+    case "void" => // all NULLs, nothing to do
+    case _ =>
+      stringVars = new util.ArrayList[String]
+  }
+
+  def get(index: Int): Any = {
+    if (this.nulls(index)) {
+      null
+    } else {
+      dataType.name match {
+        case "boolean" =>
+          boolVars(index)
+        case "byte" =>
+          byteVars(index)
+        case "short" =>
+          shortVars(index)
+        case "integer" =>
+          intVars(index)
+        case "long" =>
+          longVars(index)
+        case "float" | "double" =>
+          doubleVars(index)
+        case "binary" =>
+          binaryVars.get(index).array()
+        case _ =>
+          stringVars.get(index)
+      }
+    }
+  }
+
+  def size: Int = currentSize
+
+  def addValue(field: Any): Unit = {
+    if (field == null) {
+      nulls += currentSize
+      if (!ColumnBuffer.HANDLED_TYPES.contains(dataType.name)) {
+        stringVars.add(ColumnBuffer.EMPTY_STRING)
+      } else if (dataType.name == "binary") {
+        binaryVars.add(ColumnBuffer.EMPTY_BINARY)
+      }
+    } else {
+      dataType.name match {
+        case "boolean" =>
+          ensureBoolVarsSize()
+          boolVars(currentSize) = field.asInstanceOf[Boolean]
+        case "byte" =>
+          ensureByteVarsSize()
+          byteVars(currentSize) = field.asInstanceOf[Byte]
+        case "short" =>
+          ensureShortVarsSize()
+          shortVars(currentSize) = field.asInstanceOf[Short]
+        case "integer" =>
+          ensureIntVarsSize()
+          intVars(currentSize) = field.asInstanceOf[Int]
+        case "long" =>
+          ensureLongVarsSize()
+          longVars(currentSize) = field.asInstanceOf[Long]
+        case "float" =>
+          ensureDoubleVarsSize()
+          // Convert the float to a string and then to a double: widening a Float directly to a
+          // Double would introduce precision artifacts due to Float's limited precision.
+          doubleVars(currentSize) = field.toString.toDouble
+        case "double" =>
+          ensureDoubleVarsSize()
+          doubleVars(currentSize) = field.asInstanceOf[Double]
+        case "binary" =>
+          binaryVars.add(ByteBuffer.wrap(field.asInstanceOf[Array[Byte]]))
+        case _ =>
+          stringVars.add(DataTypeUtils.toHiveString(field, dataType))
+      }
+    }
+
+    currentSize += 1
+  }
+
+  private def ensureBoolVarsSize(): Unit = if (boolVars.length == currentSize) {
+    val newVars = new Array[Boolean](currentSize << 1)
+    System.arraycopy(boolVars, 0, newVars, 0, currentSize)
+    boolVars = newVars
+  }
+
+  private def ensureByteVarsSize(): Unit = if (byteVars.length == currentSize) {
+    val newVars = new Array[Byte](currentSize << 1)
+    System.arraycopy(byteVars, 0, newVars, 0, currentSize)
+    byteVars = newVars
+  }
+
+  private def ensureShortVarsSize(): Unit = if (shortVars.length == currentSize) {
+    val newVars = new Array[Short](currentSize << 1)
+    System.arraycopy(shortVars, 0, newVars, 0, currentSize)
+    shortVars = newVars
+  }
+
+  private def ensureIntVarsSize(): Unit = if (intVars.length == currentSize) {
+    val newVars = new Array[Int](currentSize << 1)
+    System.arraycopy(intVars, 0, newVars, 0, currentSize)
+    intVars = newVars
+  }
+
+  private def ensureLongVarsSize(): Unit = if (longVars.length == currentSize) {
+    val newVars = new Array[Long](currentSize << 1)
+    System.arraycopy(longVars, 0, newVars, 0, currentSize)
+    longVars = newVars
+  }
+
+  private def ensureDoubleVarsSize(): Unit = if (doubleVars.length == currentSize) {
+    val newVars = new Array[Double](currentSize << 1)
+    System.arraycopy(doubleVars, 0, newVars, 0, currentSize)
+    doubleVars = newVars
+  }
+
+  private[thriftserver] def getColumnValues: Any = dataType.name match {
+    case "boolean" => boolVars.take(currentSize)
+    case "byte" => byteVars.take(currentSize)
+    case "short" => shortVars.take(currentSize)
+    case "integer" => intVars.take(currentSize)
+    case "long" => longVars.take(currentSize)
+    case "float" => doubleVars.take(currentSize)
+    case "double" => doubleVars.take(currentSize)
+    case "binary" => binaryVars
+    case _ => stringVars
+  }
+
+  private[thriftserver] def getNulls: util.BitSet = util.BitSet.valueOf(nulls.toBitMask)
+}
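+
+// Minimal usage sketch (assumed, for illustration only): a buffer for an integer column grows as
+// values are appended and tracks NULLs in a separate bitset.
+//
+//   val col = new ColumnBuffer(BasicDataType("integer"))
+//   col.addValue(1)
+//   col.addValue(null)   // recorded in getNulls, no value stored
+//   col.addValue(3)
+//   assert(col.size == 3)
+//   assert(col.get(0) == 1 && col.get(1) == null && col.get(2) == 3)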
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala
new file mode 100644
index 0000000..64b6e4e
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.serde
+
+import org.apache.livy.thriftserver.types.DataType
+
+/**
+ * Utility class for (de-)serializing the results exchanged between the Spark application and the
+ * Livy thriftserver.
+ */
+class ColumnOrientedResultSet(val types: Array[DataType]) {
+  val columns: Array[ColumnBuffer] = types.map(new ColumnBuffer(_))
+  def addRow(fields: Array[AnyRef]): Unit = {
+    var i = 0
+    while (i < fields.length) {
+      val field = fields(i)
+      columns(i).addValue(field)
+      i += 1
+    }
+  }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala
new file mode 100644
index 0000000..c942b46
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.types
+
+private[thriftserver] trait DataType {
+  def name: String
+}
+
+private[thriftserver] case class BasicDataType(name: String) extends DataType
+
+private[thriftserver] case class StructField(name: String, dataType: DataType)
+
+private[thriftserver] case class StructType(fields: Array[StructField]) extends DataType {
+  val name = "struct"
+}
+
+private[thriftserver] case class ArrayType(elementsType: DataType) extends DataType {
+  val name = "array"
+}
+
+private[thriftserver] case class MapType(keyType: DataType, valueType: DataType) extends DataType {
+  val name = "map"
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
new file mode 100644
index 0000000..1644fdb
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.types
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.serde2.thrift.Type
+import org.apache.hive.service.cli.TableSchema
+import org.json4s.{DefaultFormats, JValue}
+import org.json4s.JsonAST.{JObject, JString}
+import org.json4s.jackson.JsonMethods.parse
+
+import org.apache.livy.Logging
+
+/**
+ * Utility class for converting and representing Spark and Hive data types.
+ */
+object DataTypeUtils extends Logging {
+  // Used for JSON conversion
+  private implicit val formats = DefaultFormats
+
+  private def toHive(jValue: JValue): String = {
+    jValue match {
+      case JString(t) => primitiveToHive(t)
+      case o: JObject => complexToHive(o)
+      case _ => throw new IllegalArgumentException(
+        s"Spark type was neither a string nor a object. It was: $jValue.")
+    }
+  }
+
+  private def getInternalType(jValue: JValue): DataType = {
+    jValue match {
+      case JString(t) => BasicDataType(t)
+      case o: JObject => complexToInternal(o)
+      case _ => throw new IllegalArgumentException(
+        s"Spark type was neither a string nor a object. It was: $jValue.")
+    }
+  }
+
+  private def primitiveToHive(sparkType: String): String = {
+    sparkType match {
+      case "integer" => "int"
+      case "long" => "bigint"
+      case "short" => "smallint"
+      case "byte" => "tinyint"
+      case "null" => "void"
+      // boolean, string, float, double, decimal, date, timestamp are the same
+      case other => other
+    }
+  }
+
+  private def complexToHive(sparkType: JObject): String = {
+    (sparkType \ "type").extract[String] match {
+      case "array" => s"array<${toHive(sparkType \ "elementType")}>"
+      case "struct" =>
+        val fields = (sparkType \ "fields").children.map { f =>
+          s"${(f \ "name").extract[String]}:${toHive(f \ "type")}"
+        }
+        s"struct<${fields.mkString(",")}>"
+      case "map" => s"map<${toHive(sparkType \ "keyType")}, ${toHive(sparkType \ "valueType")}>"
+      case "udt" => toHive(sparkType \ "sqlType")
+    }
+  }
+
+  private def complexToInternal(sparkType: JObject): DataType = {
+    (sparkType \ "type").extract[String] match {
+      case "array" => ArrayType(getInternalType(sparkType \ "elementType"))
+      case "struct" =>
+        val fields = (sparkType \ "fields").children.map { f =>
+          StructField((f \ "name").extract[String], getInternalType(f \ "type"))
+        }
+        StructType(fields.toArray)
+      case "map" =>
+        MapType(getInternalType(sparkType \ "keyType"), getInternalType(sparkType \ "valueType"))
+      case "udt" => getInternalType(sparkType \ "sqlType")
+    }
+  }
+
+  /**
+   * Converts a JSON representing the Spark schema (the one returned by `df.schema.json`) into
+   * a Hive [[TableSchema]] instance.
+   *
+   * @param sparkJson a [[String]] containing the JSON representation of a Spark Dataframe schema
+   * @return a [[TableSchema]] representing the schema provided as input
+   */
+  def tableSchemaFromSparkJson(sparkJson: String): TableSchema = {
+    val schema = parse(sparkJson) \ "fields"
+    val fields = schema.children.map { field =>
+      val name = (field \ "name").extract[String]
+      val hiveType = toHive(field \ "type")
+      new FieldSchema(name, hiveType, "")
+    }
+    new TableSchema(fields.asJava)
+  }
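+
+  // For illustration (an assumed example, not taken from a real run): a Spark schema JSON like
+  //   {"type":"struct","fields":[
+  //     {"name":"id","type":"long","nullable":false,"metadata":{}},
+  //     {"name":"tags","type":{"type":"array","elementType":"string","containsNull":true},
+  //      "nullable":true,"metadata":{}}]}
+  // is mapped to the Hive column types `bigint` and `array<string>` respectively.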
+
+  /**
+   * Extracts the main type of each column contained in the JSON. This means that complex types
+   * are not returned in their full representation with the nested types: e.g. for an array of
+   * any kind of element it returns `"array"`.
+   *
+   * @param sparkJson a [[String]] containing the JSON representation of a Spark Dataframe schema
+   * @return an [[Array]] of the principal types of the columns in the schema.
+   */
+  def getInternalTypes(sparkJson: String): Array[DataType] = {
+    val schema = parse(sparkJson) \ "fields"
+    schema.children.map { field =>
+      getInternalType(field \ "type")
+    }.toArray
+  }
+
+  /**
+   * Returns the Hive [[Type]] used in the Thrift communication for the given `thriftDt`.
+   */
+  def toHiveThriftType(thriftDt: DataType): Type = {
+    thriftDt.name match {
+      case "boolean" => Type.BOOLEAN_TYPE
+      case "byte" => Type.TINYINT_TYPE
+      case "short" => Type.SMALLINT_TYPE
+      case "integer" => Type.INT_TYPE
+      case "long" => Type.BIGINT_TYPE
+      case "float" => Type.FLOAT_TYPE
+      case "double" => Type.DOUBLE_TYPE
+      case "binary" => Type.BINARY_TYPE
+      case _ => Type.STRING_TYPE
+    }
+  }
+
+  def toHiveString(value: Any, dt: DataType): String = (value, dt) match {
+    case (null, _) => "NULL"
+    case (struct: Any, StructType(fields)) =>
+      val values = struct.getClass.getMethod("toSeq").invoke(struct).asInstanceOf[Seq[Any]]
+      values.zip(fields).map {
+        case (v, t) => s""""${t.name}":${toHiveComplexTypeFieldString((v, t.dataType))}"""
+      }.mkString("{", ",", "}")
+    case (seq: Seq[_], ArrayType(t)) =>
+      seq.map(v => (v, t)).map(toHiveComplexTypeFieldString).mkString("[", ",", "]")
+    case (map: Map[_, _], MapType(kType, vType)) =>
+      map.map { case (k, v) =>
+        s"${toHiveComplexTypeFieldString((k, kType))}:${toHiveComplexTypeFieldString((v, vType))}"
+      }.toSeq.sorted.mkString("{", ",", "}")
+    case (decimal: java.math.BigDecimal, t) if t.name.startsWith("decimal") =>
+      decimal.stripTrailingZeros.toString
+    case (other, _) => other.toString
+  }
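+
+  // For example (illustrative only), a value of Map(1 -> "a", 2 -> "b") with a
+  // MapType(BasicDataType("integer"), BasicDataType("string")) is rendered as {1:"a",2:"b"},
+  // matching the string form the JDBC tests expect for complex types.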
+
+  def toHiveComplexTypeFieldString(a: (Any, DataType)): String = a match {
+    case (null, _) => "null"
+    case (struct: Any, StructType(fields)) =>
+      val values = struct.getClass.getMethod("toSeq").invoke(struct).asInstanceOf[Seq[Any]]
+      values.zip(fields).map {
+        case (v, t) => s""""${t.name}":${toHiveComplexTypeFieldString((v, t.dataType))}"""
+      }.mkString("{", ",", "}")
+    case (seq: Seq[_], ArrayType(t)) =>
+      seq.map(v => (v, t)).map(toHiveComplexTypeFieldString).mkString("[", ",", "]")
+    case (map: Map[_, _], MapType(kType, vType)) =>
+      map.map { case (k, v) =>
+        s"${toHiveComplexTypeFieldString((k, kType))}:${toHiveComplexTypeFieldString((v, vType))}"
+      }.toSeq.sorted.mkString("{", ",", "}")
+    case (s: String, t) if t.name == "string" => s""""$s""""
+    case (other, _) => other.toString
+  }
+}
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
new file mode 100644
index 0000000..d1ffd12
--- /dev/null
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.io.File
+import java.sql.{Connection, DriverManager, Statement}
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.jdbc.HiveDriver
+import org.apache.hive.service.Service.STATE
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.livy.LIVY_VERSION
+import org.apache.livy.LivyConf
+import org.apache.livy.LivyConf.{LIVY_SPARK_SCALA_VERSION, LIVY_SPARK_VERSION}
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.sessions.InteractiveSessionManager
+import org.apache.livy.utils.LivySparkUtils.{formatSparkVersion, sparkScalaVersion, sparkSubmitVersion, testSparkVersion}
+
+object ServerMode extends Enumeration {
+  val binary, http = Value
+}
+
+abstract class ThriftServerBaseTest extends FunSuite with BeforeAndAfterAll {
+  def mode: ServerMode.Value
+  def port: Int
+
+  val THRIFT_SERVER_STARTUP_TIMEOUT = 30000 // ms
+
+  val livyConf = new LivyConf()
+  val (sparkVersion, scalaVersionFromSparkSubmit) = sparkSubmitVersion(livyConf)
+  val formattedSparkVersion: (Int, Int) = {
+    formatSparkVersion(sparkVersion)
+  }
+
+  def jdbcUri(defaultDb: String, sessionConf: String*): String = if (mode == ServerMode.http) {
+    s"jdbc:hive2://localhost:$port/$defaultDb?hive.server2.transport.mode=http;" +
+      s"hive.server2.thrift.http.path=cliservice;${sessionConf.mkString(";")}"
+  } else {
+    s"jdbc:hive2://localhost:$port/$defaultDb?${sessionConf.mkString(";")}"
+  }
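+
+  // For instance (assuming binary mode and port 20000), jdbcUri("default", "k=v") yields
+  // "jdbc:hive2://localhost:20000/default?k=v", while in http mode the transport-mode and
+  // http-path parameters are prepended to the session conf.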
+
+  override def beforeAll(): Unit = {
+    Class.forName(classOf[HiveDriver].getCanonicalName)
+    livyConf.set(s"livy.${HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE}", mode.toString)
+    val portConfKey = if (mode == ServerMode.http) {
+      s"livy.${HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}"
+    } else {
+      s"livy.${HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT}"
+    }
+    livyConf.set(portConfKey, port.toString)
+    val home = sys.env("LIVY_HOME")
+    val thriftserverJarName = s"livy-thriftserver-${LIVY_VERSION}.jar"
+    val thriftserverJarFile = Option(new File(home, s"jars/$thriftserverJarName"))
+      .filter(_.exists())
+      .getOrElse(new File(home, s"thriftserver/server/target/jars/$thriftserverJarName"))
+    livyConf.set(LivyConf.THRIFT_SERVER_JAR_LOCATION, thriftserverJarFile.getAbsolutePath)
+    livyConf.set(LivyConf.LOCAL_FS_WHITELIST, thriftserverJarFile.getParent)
+
+    // Set formatted Spark and Scala version into livy configuration, this will be used by
+    // session creation.
+    livyConf.set(LIVY_SPARK_VERSION.key, formattedSparkVersion.productIterator.mkString("."))
+    livyConf.set(LIVY_SPARK_SCALA_VERSION.key,
+      sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf))
+    StateStore.init(livyConf)
+
+    val ss = new SessionStore(livyConf)
+    val sessionManager = new InteractiveSessionManager(livyConf, ss)
+    val accessManager = new AccessManager(livyConf)
+    LivyThriftServer.start(livyConf, sessionManager, ss, accessManager)
+    LivyThriftServer.thriftServerThread.join(THRIFT_SERVER_STARTUP_TIMEOUT)
+    assert(LivyThriftServer.getInstance.isDefined)
+    assert(LivyThriftServer.getInstance.get.getServiceState == STATE.STARTED)
+  }
+
+  override def afterAll(): Unit = {
+    LivyThriftServer.stopServer()
+  }
+
+  def withJdbcConnection(f: (Connection => Unit)): Unit = {
+    withJdbcConnection("default", Seq.empty)(f)
+  }
+
+  def withJdbcConnection(db: String, sessionConf: Seq[String])(f: (Connection => Unit)): Unit = {
+    withJdbcConnection(jdbcUri(db, sessionConf: _*))(f)
+  }
+
+  def withJdbcConnection(uri: String)(f: (Connection => Unit)): Unit = {
+    val user = System.getProperty("user.name")
+    val connection = DriverManager.getConnection(uri, user, "")
+    try {
+      f(connection)
+    } finally {
+      connection.close()
+    }
+  }
+
+  def withJdbcStatement(f: (Statement => Unit)): Unit = {
+    withJdbcConnection { connection =>
+      val s = connection.createStatement()
+      try {
+        f(s)
+      } finally {
+        s.close()
+      }
+    }
+  }
+}
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
new file mode 100644
index 0000000..6eea6f3
--- /dev/null
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.sql.{Date, Statement}
+
+import org.apache.livy.LivyConf
+
+
+trait CommonThriftTests {
+  def hiveSupportEnabled(sparkMajorVersion: Int, livyConf: LivyConf): Boolean = {
+    sparkMajorVersion > 1 || livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
+  }
+
+  def dataTypesTest(statement: Statement, mapSupported: Boolean): Unit = {
+    val resultSet = statement.executeQuery(
+      "select 1, 'a', cast(null as int), 1.2345, CAST('2018-08-06' as date)")
+    resultSet.next()
+    assert(resultSet.getInt(1) == 1)
+    assert(resultSet.getString(2) == "a")
+    assert(resultSet.getInt(3) == 0)
+    assert(resultSet.wasNull())
+    assert(resultSet.getDouble(4) == 1.2345)
+    assert(resultSet.getDate(5) == Date.valueOf("2018-08-06"))
+    assert(!resultSet.next())
+
+    val resultSetWithNulls = statement.executeQuery("select cast(null as string), " +
+      "cast(null as decimal), cast(null as double), cast(null as date), null")
+    resultSetWithNulls.next()
+    assert(resultSetWithNulls.getString(1) == null)
+    assert(resultSetWithNulls.wasNull())
+    assert(resultSetWithNulls.getBigDecimal(2) == null)
+    assert(resultSetWithNulls.wasNull())
+    assert(resultSetWithNulls.getDouble(3) == 0.0)
+    assert(resultSetWithNulls.wasNull())
+    assert(resultSetWithNulls.getDate(4) == null)
+    assert(resultSetWithNulls.wasNull())
+    assert(resultSetWithNulls.getString(5) == null)
+    assert(resultSetWithNulls.wasNull())
+    assert(!resultSetWithNulls.next())
+
+    val complexTypesQuery = if (mapSupported) {
+      "select array(1.5, 2.4, 1.3), struct('a', 1, 1.5), map(1, 'a', 2, 'b')"
+    } else {
+      "select array(1.5, 2.4, 1.3), struct('a', 1, 1.5)"
+    }
+
+    val resultSetComplex = statement.executeQuery(complexTypesQuery)
+    resultSetComplex.next()
+    assert(resultSetComplex.getString(1) == "[1.5,2.4,1.3]")
+    assert(resultSetComplex.getString(2) == "{\"col1\":\"a\",\"col2\":1,\"col3\":1.5}")
+    if (mapSupported) {
+      assert(resultSetComplex.getString(3) == "{1:\"a\",2:\"b\"}")
+    }
+    assert(!resultSetComplex.next())
+  }
+}
+
+class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
+  override def mode: ServerMode.Value = ServerMode.binary
+  override def port: Int = 20000
+
+  test("Reuse existing session") {
+    withJdbcConnection { _ =>
+      val sessionManager = LivyThriftServer.getInstance.get.getSessionManager()
+      val sessionHandle = sessionManager.getSessions.head
+      // Blocks until the session is ready
+      val session = sessionManager.getLivySession(sessionHandle)
+      withJdbcConnection("default", Seq(s"livy.server.sessionId=${session.id}")) { _ =>
+        val it = sessionManager.getSessions.iterator
+        // Blocks until all the sessions are ready
+        while (it.hasNext) {
+          sessionManager.getLivySession(it.next())
+        }
+        assert(LivyThriftServer.getInstance.get.livySessionManager.size() == 1)
+      }
+    }
+  }
+
+  test("fetch different data types") {
+    val supportMap = hiveSupportEnabled(formattedSparkVersion._1, livyConf)
+    withJdbcStatement { statement =>
+      dataTypesTest(statement, supportMap)
+    }
+  }
+
+  test("support default database in connection URIs") {
+    assume(hiveSupportEnabled(formattedSparkVersion._1, livyConf))
+    val db = "new_db"
+    withJdbcConnection { c =>
+      val s1 = c.createStatement()
+      s1.execute(s"create database $db")
+      s1.close()
+      val sessionManager = LivyThriftServer.getInstance.get.getSessionManager()
+      val sessionHandle = sessionManager.getSessions.head
+      // Blocks until the session is ready
+      val session = sessionManager.getLivySession(sessionHandle)
+      withJdbcConnection(db, Seq(s"livy.server.sessionId=${session.id}")) { c =>
+        val statement = c.createStatement()
+        val resultSet = statement.executeQuery("select current_database()")
+        resultSet.next()
+        assert(resultSet.getString(1) === db)
+        statement.close()
+      }
+      val s2 = c.createStatement()
+      s2.execute(s"drop database $db")
+      s2.close()
+    }
+  }
+
+  test("support hivevar") {
+    assume(hiveSupportEnabled(formattedSparkVersion._1, livyConf))
+    withJdbcConnection(jdbcUri("default") + "#myVar1=val1;myVar2=val2") { c =>
+      val statement = c.createStatement()
+      val selectRes = statement.executeQuery("select \"${myVar1}\", \"${myVar2}\"")
+      selectRes.next()
+      assert(selectRes.getString(1) === "val1")
+      assert(selectRes.getString(2) === "val2")
+      statement.close()
+    }
+  }
+}
+
+class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
+  override def mode: ServerMode.Value = ServerMode.http
+  override def port: Int = 20001
+
+  test("fetch different data types") {
+    val supportMap = hiveSupportEnabled(formattedSparkVersion._1, livyConf)
+    withJdbcStatement { statement =>
+      dataTypesTest(statement, supportMap)
+    }
+  }
+}