blob: 2cdb34bc8658117b4adc7b746873b0be0cfb00d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Test for {@link CatalogLoader} and {@link TableLoader}.
 *
 * <p>Each loader is round-tripped through Java serialization before it is used, and the table it yields is
 * checked to carry the custom Hadoop configuration entry set in {@link #createWarehouse()}.
 *
 * <p>Every test creates the tables it needs and removes them again, so the tests do not depend on the order
 * in which JUnit runs them.
 */
public class TestCatalogTableLoader extends FlinkTestBase {

  private static File warehouse = null;
  private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));

  /**
   * Reserves a unique warehouse path and registers the marker configuration entry that the tests look for.
   *
   * @throws IOException if the temporary file cannot be created
   */
  @BeforeClass
  public static void createWarehouse() throws IOException {
    // Create a temp file only to obtain a unique path, then remove it so the path is free to become a directory.
    warehouse = File.createTempFile("warehouse", null);
    Assert.assertTrue(warehouse.delete());
    // NOTE(review): hiveConf is not declared in this class (presumably inherited from FlinkTestBase), so this
    // entry stays visible to anything else sharing that configuration object.
    hiveConf.set("my_key", "my_value");
  }

  /**
   * Removes the warehouse directory together with everything the tests wrote below it.
   */
  @AfterClass
  public static void dropWarehouse() {
    if (warehouse != null && warehouse.exists()) {
      // File.delete() refuses to remove a non-empty directory, so the tree has to be emptied bottom-up.
      Assert.assertTrue("Failed to delete " + warehouse, deleteRecursively(warehouse));
    }
  }

  @Test
  public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
    Map<String, String> properties = Maps.newHashMap();
    properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + warehouse);
    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, properties);
    validateCatalogLoader(loader);
  }

  @Test
  public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
    validateCatalogLoader(loader);
  }

  @Test
  public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
    String location = "file:" + warehouse + "/my_table";
    new HadoopTables(hiveConf).create(SCHEMA, location);
    validateTableLoader(TableLoader.fromHadoopTable(location, hiveConf));
  }

  @Test
  public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
    CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
    // Create the table here instead of relying on another test having left it behind in the catalog.
    Catalog catalog = catalogLoader.loadCatalog();
    catalog.createTable(IDENTIFIER, SCHEMA);
    try {
      validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
    } finally {
      catalog.dropTable(IDENTIFIER);
    }
  }

  /**
   * Serializes and deserializes the loader, creates {@link #IDENTIFIER} through the restored copy and checks
   * the Hadoop configuration of the resulting table. The table is dropped again before returning.
   *
   * @param loader the catalog loader under test
   * @throws IOException if the serialization round trip fails
   * @throws ClassNotFoundException if the serialized loader cannot be resolved on read
   */
  private static void validateCatalogLoader(CatalogLoader loader) throws IOException, ClassNotFoundException {
    Catalog catalog = javaSerAndDeSer(loader).loadCatalog();
    Table table = catalog.createTable(IDENTIFIER, SCHEMA);
    try {
      validateHadoopConf(table);
    } finally {
      // Leave the catalog as it was found so that other tests can create the same identifier.
      catalog.dropTable(IDENTIFIER);
    }
  }

  /**
   * Serializes and deserializes the loader, opens the restored copy and checks the Hadoop configuration of
   * the table it loads. The copy is closed even when validation fails.
   *
   * @param loader the table loader under test; the table it points at must already exist
   * @throws IOException if the serialization round trip or closing the loader fails
   * @throws ClassNotFoundException if the serialized loader cannot be resolved on read
   */
  private static void validateTableLoader(TableLoader loader) throws IOException, ClassNotFoundException {
    TableLoader copied = javaSerAndDeSer(loader);
    copied.open();
    try {
      validateHadoopConf(copied.loadTable());
    } finally {
      copied.close();
    }
  }

  /**
   * Asserts that the table's {@link FileIO} is a {@link HadoopFileIO} whose configuration contains the
   * {@code my_key=my_value} entry set in {@link #createWarehouse()}.
   *
   * @param table the table whose IO configuration is inspected
   */
  private static void validateHadoopConf(Table table) {
    FileIO io = table.io();
    Assert.assertTrue("FileIO should be a HadoopFileIO", io instanceof HadoopFileIO);
    HadoopFileIO hadoopIO = (HadoopFileIO) io;
    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
  }

  /**
   * Round-trips an object through Java serialization using an in-memory buffer.
   *
   * @param object the object to serialize
   * @param <T> the static type of the object
   * @return the deserialized copy
   * @throws IOException if writing or reading the object stream fails
   * @throws ClassNotFoundException if a class in the stream cannot be resolved on read
   */
  @SuppressWarnings("unchecked")
  private static <T> T javaSerAndDeSer(T object) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    }

    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      // The stream holds exactly the object written above, so the unchecked cast restores its original type.
      return (T) in.readObject();
    }
  }

  /**
   * Deletes a file, or a directory together with all of its contents.
   *
   * @param file the file or directory to remove
   * @return {@code true} only if the entry and everything below it were deleted
   */
  private static boolean deleteRecursively(File file) {
    boolean childrenDeleted = true;
    // listFiles() returns null for anything that is not a readable directory.
    File[] children = file.listFiles();
    if (children != null) {
      for (File child : children) {
        childrenDeleted &= deleteRecursively(child);
      }
    }

    return file.delete() && childrenDeleted;
  }
}