blob: 4d5ebf39391eb055dd989d06920a784347042355 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.catalog;
import org.apache.falcon.FalconException;
import org.apache.falcon.resource.TestContext;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatCreateDBDesc;
import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
import org.apache.hive.hcatalog.api.HCatPartition;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Tests Hive Meta Store service.
*/
public class HiveCatalogServiceIT {
private static final String METASTORE_URL = "thrift://localhost:49083";
private static final String DATABASE_NAME = "falcon_db";
private static final String TABLE_NAME = "falcon_table";
private static final String EXTERNAL_TABLE_NAME = "falcon_external";
private static final String EXTERNAL_TABLE_LOCATION = "jail://global:00/falcon/staging/falcon_external";
private final Configuration conf = new Configuration(false);
private HiveCatalogService hiveCatalogService;
private HCatClient client;
@BeforeClass
public void setUp() throws Exception {
// setup a logged in user
CurrentUser.authenticate(TestContext.REMOTE_USER);
hiveCatalogService = new HiveCatalogService();
client = TestContext.getHCatClient(METASTORE_URL);
createDatabase();
createTable();
createExternalTable();
}
private void createDatabase() throws Exception {
HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(DATABASE_NAME)
.ifNotExists(true).build();
client.createDatabase(dbDesc);
}
public void createTable() throws Exception {
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
List<HCatFieldSchema> partitionSchema = Arrays.asList(
new HCatFieldSchema("ds", HCatFieldSchema.Type.STRING, ""),
new HCatFieldSchema("region", HCatFieldSchema.Type.STRING, "")
);
HCatCreateTableDesc tableDesc = HCatCreateTableDesc
.create(DATABASE_NAME, TABLE_NAME, cols)
.fileFormat("rcfile")
.ifNotExists(true)
.comments("falcon integration test")
.partCols(new ArrayList<HCatFieldSchema>(partitionSchema))
.build();
client.createTable(tableDesc);
}
public void createExternalTable() throws Exception {
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
List<HCatFieldSchema> partitionSchema = Arrays.asList(
new HCatFieldSchema("ds", HCatFieldSchema.Type.STRING, ""),
new HCatFieldSchema("region", HCatFieldSchema.Type.STRING, "")
);
HCatCreateTableDesc tableDesc = HCatCreateTableDesc
.create(DATABASE_NAME, EXTERNAL_TABLE_NAME, cols)
.fileFormat("rcfile")
.ifNotExists(true)
.comments("falcon integration test")
.partCols(new ArrayList<HCatFieldSchema>(partitionSchema))
.isTableExternal(true)
.location(EXTERNAL_TABLE_LOCATION)
.build();
client.createTable(tableDesc);
}
@AfterClass
public void tearDown() throws Exception {
dropTable(EXTERNAL_TABLE_NAME);
dropTable(TABLE_NAME);
dropDatabase();
TestContext.deleteEntitiesFromStore();
}
private void dropTable(String tableName) throws Exception {
client.dropTable(DATABASE_NAME, tableName, true);
}
private void dropDatabase() throws Exception {
client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
}
@BeforeMethod
private void addPartitions() throws Exception {
Map<String, String> firstPtn = new HashMap<String, String>();
firstPtn.put("ds", "20130903"); //yyyyMMDD
firstPtn.put("region", "us");
HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
DATABASE_NAME, TABLE_NAME, null, firstPtn).build();
client.addPartition(addPtn);
Map<String, String> secondPtn = new HashMap<String, String>();
secondPtn.put("ds", "20130903");
secondPtn.put("region", "in");
HCatAddPartitionDesc addPtn2 = HCatAddPartitionDesc.create(
DATABASE_NAME, TABLE_NAME, null, secondPtn).build();
client.addPartition(addPtn2);
Map<String, String> thirdPtn = new HashMap<String, String>();
thirdPtn.put("ds", "20130902");
thirdPtn.put("region", "in");
HCatAddPartitionDesc addPtn3 = HCatAddPartitionDesc.create(
DATABASE_NAME, TABLE_NAME, null, thirdPtn).build();
client.addPartition(addPtn3);
}
@AfterMethod
private void dropPartitions() throws Exception {
Map<String, String> partitionSpec = new HashMap<String, String>();
partitionSpec.put("ds", "20130903");
client.dropPartitions(DATABASE_NAME, TABLE_NAME, partitionSpec, true);
partitionSpec = new HashMap<String, String>();
partitionSpec.put("ds", "20130902");
client.dropPartitions(DATABASE_NAME, TABLE_NAME, partitionSpec, true);
}
@Test
public void testIsAlive() throws Exception {
Assert.assertTrue(hiveCatalogService.isAlive(conf, METASTORE_URL));
}
@Test (expectedExceptions = Exception.class)
public void testIsAliveNegative() throws Exception {
hiveCatalogService.isAlive(conf, "thrift://localhost:9999");
}
@Test
public void testTableExistsNegative() throws Exception {
Assert.assertFalse(hiveCatalogService.tableExists(conf, METASTORE_URL, DATABASE_NAME, "blah"));
}
@Test
public void testTableExists() throws Exception {
Assert.assertTrue(hiveCatalogService.tableExists(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME));
}
@Test
public void testIsTableExternalFalse() throws Exception {
Assert.assertFalse(hiveCatalogService.isTableExternal(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME));
}
@Test
public void testIsTableExternalTrue() throws Exception {
Assert.assertTrue(hiveCatalogService.isTableExternal(conf, METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME));
}
@Test
public void testListPartitionsByFilterNull() throws Exception {
List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, null);
Assert.assertEquals(filteredPartitions.size(), 3);
}
@DataProvider (name = "lessThanFilter")
public Object[][] createLessThanFilter() {
return new Object[][] {
{"ds < \"20130905\"", 3},
{"ds < \"20130904\"", 3},
{"ds < \"20130903\"", 1},
{"ds < \"20130902\"", 0},
};
}
@Test (dataProvider = "lessThanFilter")
public void testListPartitionsByFilterLessThan(String lessThanFilter, int expectedPartitionCount)
throws Exception {
List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, lessThanFilter);
Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount);
}
@DataProvider (name = "greaterThanFilter")
public Object[][] createGreaterThanFilter() {
return new Object[][] {
{"ds > \"20130831\"", 3},
{"ds > \"20130905\"", 0},
{"ds > \"20130904\"", 0},
{"ds > \"20130903\"", 0},
{"ds > \"20130902\"", 2},
};
}
@Test (dataProvider = "greaterThanFilter")
public void testListPartitionsByFilterGreaterThan(String greaterThanFilter, int expectedPartitionCount)
throws Exception {
List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount);
}
@Test
public void testListPartititions() throws FalconException {
List<String> filters = new ArrayList<String>();
filters.add("20130903");
List<CatalogPartition> partitions = hiveCatalogService.listPartitions(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, filters);
Assert.assertEquals(partitions.size(), 2);
filters.add("us");
partitions = hiveCatalogService.listPartitions(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, filters);
Assert.assertEquals(partitions.size(), 1);
}
@Test
public void testGetPartitionsFullSpec() throws Exception {
Map<String, String> partitionSpec = new HashMap<String, String>();
partitionSpec.put("ds", "20130902");
partitionSpec.put("region", "in");
HCatPartition ptn = client.getPartition(DATABASE_NAME, TABLE_NAME, partitionSpec);
Assert.assertTrue(ptn != null);
}
@Test
public void testGetPartitionsPartialSpec() throws Exception {
Map<String, String> partialPartitionSpec = new HashMap<String, String>();
partialPartitionSpec.put("ds", "20130903");
List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME, partialPartitionSpec);
Assert.assertEquals(partitions.size(), 2);
}
@Test
public void testDropPartition() throws Exception {
hiveCatalogService.dropPartition(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"), true);
List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
Assert.assertEquals(partitions.size(), 2, "Unexpected number of partitions");
Assert.assertEquals(new String[]{"20130903", "in"},
partitions.get(0).getValues().toArray(), "Mismatched partition");
hiveCatalogService.dropPartitions(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130903"), true);
partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
Assert.assertEquals(partitions.size(), 0, "Unexpected number of partitions");
}
@Test
public void testGetPartition() throws Exception {
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"));
Assert.assertNotNull(partition);
long createTime = partition.getCreateTime();
Assert.assertTrue(createTime > 0);
}
@Test
public void testReInstatePartition() throws Exception {
Map<String, String> partitionSpec = new LinkedHashMap<String, String>();
partitionSpec.put("ds", "20130918");
partitionSpec.put("region", "blah");
HCatAddPartitionDesc first = HCatAddPartitionDesc.create(
DATABASE_NAME, TABLE_NAME, null, partitionSpec).build();
client.addPartition(first);
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
Assert.assertNotNull(partition);
final long originalCreateTime = partition.getCreateTime();
Thread.sleep(1000); // sleep before deletion
client.dropPartitions(DATABASE_NAME, TABLE_NAME, partitionSpec, true);
Thread.sleep(1000); // sleep so the next add is delayed a bit
HCatAddPartitionDesc second = HCatAddPartitionDesc.create(
DATABASE_NAME, TABLE_NAME, null, partitionSpec).build();
client.addPartition(second);
CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
Assert.assertNotNull(reInstatedPartition);
final long reInstatedCreateTime = reInstatedPartition.getCreateTime();
Assert.assertTrue(reInstatedCreateTime > originalCreateTime);
}
@DataProvider (name = "tableName")
public Object[][] createTableName() {
return new Object[][] {
{TABLE_NAME},
{EXTERNAL_TABLE_NAME},
};
}
@Test
public void testGetPartitionColumns() throws FalconException {
AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
List<String> columns = catalogService.getPartitionColumns(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME);
Assert.assertEquals(columns, Arrays.asList("ds", "region"));
}
@Test
public void testAddPartition() throws FalconException {
AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
List<String> partitionValues = Arrays.asList("20130902", "us");
String location = EXTERNAL_TABLE_LOCATION + "/20130902";
catalogService.addPartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues, location);
CatalogPartition partition =
catalogService.getPartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues);
Assert.assertEquals(partition.getLocation(), location);
try {
catalogService.addPartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues, location);
} catch (FalconException e) {
if (!(e.getCause() instanceof AlreadyExistsException)) {
Assert.fail("Expected FalconException(AlreadyExistsException)");
}
}
}
@Test
public void testUpdatePartition() throws FalconException {
AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
List<String> partitionValues = Arrays.asList("20130902", "us");
String location = EXTERNAL_TABLE_LOCATION + "/20130902";
catalogService.addPartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues, location);
CatalogPartition partition =
catalogService.getPartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues);
Assert.assertEquals(partition.getLocation(), location);
String location2 = location + "updated";
catalogService.updatePartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues, location2);
partition = catalogService.getPartition(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionValues);
Assert.assertEquals(partition.getLocation(), location2);
}
}