/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.testutils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.sources.TestDataSource;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Abstract test base that provides DFS and Spark contexts to utility tests.
 */
public class UtilitiesTestBase {
protected static String dfsBasePath;
protected static HdfsTestService hdfsTestService;
protected static MiniDFSCluster dfsCluster;
protected static DistributedFileSystem dfs;
protected transient JavaSparkContext jsc = null;
protected transient HoodieSparkEngineContext context = null;
protected transient SparkSession sparkSession = null;
protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer;
protected static HiveTestService hiveTestService;
private static final ObjectMapper mapper = new ObjectMapper();
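/**
 * Tames Spark logging and brings up the shared HDFS and Hive test services once per test class.
 */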
@BeforeAll
public static void initClass() throws Exception {
// Keep Spark logging at WARN (root logger at ERROR) to avoid exceeding the log size limit in Travis CI
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.ERROR);
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
initClass(true);
}
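/**
 * Brings up the shared HDFS mini-cluster and, optionally, the Hive test service.
 *
 * @param startHiveService whether to also start HiveServer2 and reset the test Hive database.
 */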
public static void initClass(boolean startHiveService) throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
if (startHiveService) {
hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
hiveServer = hiveTestService.start();
clearHiveDb();
}
}
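/**
 * Shuts down the HDFS and Hive test services started in {@link #initClass(boolean)}.
 */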
@AfterAll
public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
if (hiveServer != null) {
hiveServer.stop();
}
if (hiveTestService != null) {
hiveTestService.stop();
}
}
@BeforeEach
public void setup() throws Exception {
TestDataSource.initDataGen();
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
context = new HoodieSparkEngineContext(jsc);
sqlContext = new SQLContext(jsc);
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}
@AfterEach
public void teardown() throws Exception {
TestDataSource.resetDataGen();
if (jsc != null) {
jsc.stop();
}
if (context != null) {
context = null;
}
}
/**
 * Helper to build a {@link HiveSyncConfig} pointed at the local test HiveServer2.
 *
 * @param basePath base path of the Hudi table to sync.
 * @param tableName name of the Hive table to sync into.
 * @return the populated {@link HiveSyncConfig}.
 */
protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb1";
hiveSyncConfig.tableName = tableName;
hiveSyncConfig.basePath = basePath;
hiveSyncConfig.assumeDatePartitioning = false;
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("datestr");
return hiveSyncConfig;
}
/**
 * Resets the test Hive database by dropping and recreating it.
 *
 * @throws IOException if initializing the dummy table or talking to Hive fails.
 */
private static void clearHiveDb() throws IOException {
HiveConf hiveConf = new HiveConf();
// Create Dummy hive sync config
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
hiveConf.addResource(hiveServer.getHiveConf());
HoodieTableMetaClient.initTableType(dfs.getConf(), hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName, null);
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveConf, dfs);
client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName);
client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
client.close();
}
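/**
 * Static helpers for staging test fixtures (text, properties, CSV, Parquet) on the test DFS.
 *
 * <p>Illustrative usage from a subclass (mirrors {@link #setupSchemaOnDFS()}):
 * <pre>{@code
 * Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
 * }</pre>
 */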
public static class Helpers {
// to get hold of resources bundled with jar
private static ClassLoader classLoader = Helpers.class.getClassLoader();
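/**
 * Reads a resource bundled on the classpath into a newline-joined string.
 */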
public static String readFile(String testResourcePath) throws IOException {
  try (BufferedReader reader =
      new BufferedReader(new InputStreamReader(classLoader.getResourceAsStream(testResourcePath)))) {
    StringBuilder sb = new StringBuilder();
    String line;
    while ((line = reader.readLine()) != null) {
      sb.append(line).append("\n");
    }
    return sb.toString();
  }
}
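/**
 * Reads a file from an absolute filesystem path into a newline-joined string.
 */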
public static String readFileFromAbsolutePath(String absolutePathForResource) throws IOException {
  try (BufferedReader reader =
      new BufferedReader(new InputStreamReader(new FileInputStream(absolutePathForResource)))) {
    StringBuilder sb = new StringBuilder();
    String line;
    while ((line = reader.readLine()) != null) {
      sb.append(line).append("\n");
    }
    return sb.toString();
  }
}
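/**
 * Copies a classpath resource to the given path on DFS, overwriting any existing file.
 */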
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
  try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
    os.print(readFile(testResourcePath));
    os.flush();
  }
}
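/**
 * Copies a local file, identified by its absolute path, to the given path on DFS, overwriting any existing file.
 */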
public static void copyToDFSFromAbsolutePath(String absolutePathForResource, FileSystem fs, String targetPath)
    throws IOException {
  try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
    os.print(readFileFromAbsolutePath(absolutePathForResource));
    os.flush();
  }
}
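/**
 * Serializes the given properties as {@code key=value} lines and writes them to DFS.
 */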
public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException {
String[] lines = props.keySet().stream().map(k -> String.format("%s=%s", k, props.get(k))).toArray(String[]::new);
saveStringsToDFS(lines, fs, targetPath);
}
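/**
 * Writes the given strings to a file on DFS, one per line.
 */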
public static void saveStringsToDFS(String[] lines, FileSystem fs, String targetPath) throws IOException {
  try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
    for (String l : lines) {
      os.println(l);
    }
    os.flush();
  }
}
/**
* Converts the json records into CSV format and writes to a file.
*
* @param hasHeader whether the CSV file should have a header line.
* @param sep the column separator to use.
* @param lines the records in JSON format.
* @param fs {@link FileSystem} instance.
* @param targetPath File path.
* @throws IOException if converting the JSON records or writing the CSV file fails.
*/
public static void saveCsvToDFS(
boolean hasHeader, char sep,
String[] lines, FileSystem fs, String targetPath) throws IOException {
Builder csvSchemaBuilder = CsvSchema.builder();
ArrayNode arrayNode = mapper.createArrayNode();
Arrays.stream(lines).forEachOrdered(
line -> {
try {
arrayNode.add(mapper.readValue(line, ObjectNode.class));
} catch (IOException e) {
throw new HoodieIOException(
"Error converting json records into CSV format: " + e.getMessage());
}
});
arrayNode.get(0).fieldNames().forEachRemaining(csvSchemaBuilder::addColumn);
ObjectWriter csvObjWriter = new CsvMapper()
.writerFor(JsonNode.class)
.with(csvSchemaBuilder.setUseHeader(hasHeader).setColumnSeparator(sep).build());
try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
  csvObjWriter.writeValue(os, arrayNode);
  os.flush();
}
}
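/**
 * Writes the given Avro records as a Parquet file at the target path, using the test trip schema.
 */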
public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
.withConf(HoodieTestUtils.getDefaultHadoopConf())
.withWriteMode(Mode.OVERWRITE)
.build()) {
for (GenericRecord record : records) {
writer.write(record);
}
}
}
public static TypedProperties setupSchemaOnDFS() throws IOException {
return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
}
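/**
 * Copies a bundled schema file to DFS and returns properties pointing the DeltaStreamer schema provider at it.
 */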
public static TypedProperties setupSchemaOnDFS(String scope, String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, dfs, dfsBasePath + "/" + filename);
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename);
return props;
}
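/**
 * Same as {@link #setupSchemaOnDFS(String, String)}, but reads the schema file from an absolute path.
 */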
public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String scope, String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + filename, dfs, dfsBasePath + "/" + filename);
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename);
return props;
}
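/**
 * Extracts the Avro {@link GenericRecord} payload from a {@link HoodieRecord}, or returns null on read failure.
 */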
public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
}
}
public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords) {
List<GenericRecord> records = new ArrayList<>();
for (HoodieRecord hoodieRecord : hoodieRecords) {
records.add(toGenericRecord(hoodieRecord));
}
return records;
}
public static String toJsonString(HoodieRecord hr) {
try {
return ((RawTripTestPayload) hr.getData()).getJsonData();
} catch (IOException ioe) {
return null;
}
}
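/**
 * Converts the given records to their raw JSON payloads.
 */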
public static String[] jsonifyRecords(List<HoodieRecord> records) {
return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
}
}
}