blob: 1381b5495ae24694b9684cc8f2c73a430253ede2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.hive;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.hadoop.Util;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
import static java.nio.file.attribute.PosixFilePermissions.fromString;
/**
 * An in-process Hive metastore for tests: starts a Derby-backed HMS behind an embedded
 * Thrift server on a random local port, and exposes the matching {@link HiveConf} and a
 * {@link HiveClientPool} for test code. Call {@link #stop()} to tear everything down.
 *
 * <p>Not thread-safe: start/stop/reset are expected to be driven by a single test thread.
 */
public class TestHiveMetastore {

  private static final String DEFAULT_DATABASE_NAME = "default";
  private static final int DEFAULT_POOL_SIZE = 5;

  // create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies
  // we need to do this because there is a breaking API change between Hive2 and Hive3
  private static final DynConstructors.Ctor<HiveMetaStore.HMSHandler> HMS_HANDLER_CTOR = DynConstructors.builder()
      .impl(HiveMetaStore.HMSHandler.class, String.class, Configuration.class)
      .impl(HiveMetaStore.HMSHandler.class, String.class, HiveConf.class)
      .build();

  private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER = DynMethods.builder("getProxy")
      .impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class)
      .impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class)
      .buildStatic();

  // Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various cleanup duties. These
  // threads are scheduled and executed in a static thread pool (org.apache.hadoop.hive.metastore.ThreadPool).
  // This thread pool is shut down normally as part of the JVM shutdown hook, but since we're creating and tearing down
  // multiple metastore instances within the same JVM, we have to call this cleanup method manually, otherwise
  // threads from our previous test suite will be stuck in the pool with stale config, and keep on being scheduled.
  // This can lead to issues, e.g. accidental Persistence Manager closure by ScheduledQueryExecutionsMaintTask.
  // orNoop() keeps this working against Hive2, where the ThreadPool class does not exist.
  private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN = DynMethods.builder("shutdown")
      .impl("org.apache.hadoop.hive.metastore.ThreadPool")
      .orNoop()
      .buildStatic();

  // temp dir acting as the warehouse root; also holds derby.log and the metastore_db
  private File hiveLocalDir;
  private HiveConf hiveConf;
  private ExecutorService executorService;
  private TServer server;
  private HiveMetaStore.HMSHandler baseHandler;
  private HiveClientPool clientPool;

  /**
   * Starts a TestHiveMetastore with the default connection pool size (5) and the default HiveConf.
   */
  public void start() {
    start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE);
  }

  /**
   * Starts a TestHiveMetastore with the default connection pool size (5) with the provided HiveConf.
   * @param conf The hive configuration to use
   */
  public void start(HiveConf conf) {
    start(conf, DEFAULT_POOL_SIZE);
  }

  /**
   * Starts a TestHiveMetastore with a provided connection pool size and HiveConf.
   * @param conf The hive configuration to use
   * @param poolSize The number of threads in the executor pool
   */
  public void start(HiveConf conf, int poolSize) {
    try {
      this.hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
      File derbyLogFile = new File(hiveLocalDir, "derby.log");
      System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
      setupMetastoreDB("jdbc:derby:" + getDerbyPath() + ";create=true");

      // bind to port 0 so the OS picks a free port, then advertise that port in the conf
      TServerSocket socket = new TServerSocket(0);
      int port = socket.getServerSocket().getLocalPort();
      initConf(conf, port);

      this.hiveConf = conf;
      this.server = newThriftServer(socket, poolSize, hiveConf);
      this.executorService = Executors.newSingleThreadExecutor();
      this.executorService.submit(() -> server.serve());

      // in Hive3, setting this as a system prop ensures that it will be picked up whenever a new HiveConf is created
      System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));

      this.clientPool = new HiveClientPool(1, hiveConf);
    } catch (Exception e) {
      throw new RuntimeException("Cannot start TestHiveMetastore", e);
    }
  }

  /**
   * Stops the embedded metastore and releases its resources: client pool, Thrift server,
   * executor, warehouse directory, HMS handler, and the Hive3 static metastore thread pool.
   */
  public void stop() {
    if (clientPool != null) {
      clientPool.close();
    }
    if (server != null) {
      server.stop();
    }
    if (executorService != null) {
      executorService.shutdown();
    }
    if (hiveLocalDir != null) {
      // File.delete() only removes empty directories; the warehouse dir always contains
      // derby.log and metastore_db, so it must be removed recursively
      deleteRecursively(hiveLocalDir);
    }
    if (baseHandler != null) {
      baseHandler.shutdown();
    }
    METASTORE_THREADS_SHUTDOWN.invoke();
  }

  public HiveConf hiveConf() {
    return hiveConf;
  }

  public HiveClientPool clientPool() {
    return clientPool;
  }

  /**
   * Returns the local filesystem path backing the given database in the warehouse directory.
   * @param dbName database name
   * @return path of the {@code <dbName>.db} directory under the warehouse root
   */
  public String getDatabasePath(String dbName) {
    File dbDir = new File(hiveLocalDir, dbName + ".db");
    return dbDir.getPath();
  }

  /**
   * Resets the metastore to a clean state: drops every table in every database, drops every
   * non-default database (cascade), and removes leftover warehouse files except the Derby
   * artifacts (derby.log, metastore_db) that back the metastore itself.
   * @throws Exception if any metastore call or filesystem operation fails
   */
  public void reset() throws Exception {
    for (String dbName : clientPool.run(client -> client.getAllDatabases())) {
      for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) {
        clientPool.run(client -> {
          client.dropTable(dbName, tblName, true, true, true);
          return null;
        });
      }

      if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
        // Drop cascade, functions dropped by cascade
        clientPool.run(client -> {
          client.dropDatabase(dbName, true, true, true);
          return null;
        });
      }
    }

    Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath());
    FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
    for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
      if (!fileStatus.getPath().getName().equals("derby.log") &&
          !fileStatus.getPath().getName().equals("metastore_db")) {
        fs.delete(fileStatus.getPath(), true);
      }
    }
  }

  // Builds the Thrift server hosting the (Hive2- or Hive3-flavored) HMS handler on the given socket.
  private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf) throws Exception {
    HiveConf serverConf = new HiveConf(conf);
    serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true");
    baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf);
    IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false);

    TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
        .processor(new TSetIpAddressProcessor<>(handler))
        .transportFactory(new TTransportFactory())
        .protocolFactory(new TBinaryProtocol.Factory())
        .minWorkerThreads(poolSize)
        .maxWorkerThreads(poolSize);

    return new TThreadPoolServer(args);
  }

  // Points the conf at this instance's URI and warehouse dir; direct SQL is disabled because
  // the Derby-backed datanucleus schema does not support it reliably in tests.
  private void initConf(HiveConf conf, int port) {
    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath());
    conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
    conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
    conf.set("iceberg.hive.client-pool-size", "2");
  }

  // Initializes the Derby metastore schema from the bundled Hive 3.1.0 DDL script.
  private void setupMetastoreDB(String dbURL) throws SQLException, IOException {
    ClassLoader classLoader = ClassLoader.getSystemClassLoader();
    InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql");
    if (inputStream == null) {
      // fail fast with a clear message instead of an NPE from InputStreamReader
      throw new IllegalStateException("Cannot find hive-schema-3.1.0.derby.sql on the classpath");
    }

    // close the JDBC connection along with the reader; the original leaked the connection
    try (Connection connection = DriverManager.getConnection(dbURL);
        Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
      ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);
      scriptRunner.runScript(reader);
    }
  }

  private String getDerbyPath() {
    File metastoreDB = new File(hiveLocalDir, "metastore_db");
    return metastoreDB.getPath();
  }

  // Depth-first recursive delete; File.delete() alone cannot remove non-empty directories.
  // Best-effort: failures to delete individual entries are ignored, matching the original
  // behavior of not failing stop() on cleanup problems.
  private static void deleteRecursively(File file) {
    File[] children = file.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    file.delete();
  }
}