/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.io.hcatalog;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.ReadableData;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.test.CrunchTestSupport;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
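
/**
 * Integration tests for reading Hive tables through HCatalog-backed Crunch
 * sources: unpartitioned and partitioned reads, client-side materialization,
 * ReadableData access, group-by shuffles, and a non-native (HBase-backed)
 * table.
 */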
public class HCatSourceITSpec extends CrunchTestSupport {
private static IMetaStoreClient client;
private static TemporaryPath temporaryPath;
private static Configuration conf;
@Rule
public TestName testName = new TestName();
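
// The enclosing HCatTestSuiteIT owns the shared metastore client, root
// temporary path, and configuration used by every test in this class.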
@BeforeClass
public static void setUp() throws Throwable {
HCatTestSuiteIT.startTest();
client = HCatTestSuiteIT.getClient();
temporaryPath = HCatTestSuiteIT.getRootPath();
conf = HCatTestSuiteIT.getConf();
}
@AfterClass
public static void tearDown() throws Exception {
HCatTestSuiteIT.endTest();
}
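
// Reads an unpartitioned managed table through an HCatSource and verifies the
// records survive a parallelDo transform into (int, string) pairs.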
@Test
public void testBasic() throws Exception {
String tableName = testName.getMethodName();
Path tableRootLocation = temporaryPath.getPath(tableName);
String data = "17,josh\n29,indiana\n";
writeDataToHdfs(data, tableRootLocation, conf);
HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
HCatSchema schema = src.getTableSchema(p.getConfiguration());
PCollection<HCatRecord> records = p.read(src);
List<Pair<Integer, String>> mat = Lists.newArrayList(
records.parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings()))
.materialize());
p.done();
assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), mat);
}
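
// Verifies that the records of an HCatSource can be consumed through the
// ReadableData interface rather than full pipeline materialization.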
@Test
public void testReadable() throws Exception {
String tableName = testName.getMethodName();
Path tableRootLocation = temporaryPath.getPath(tableName);
String data = "17,josh\n29,indiana\n";
writeDataToHdfs(data, tableRootLocation, conf);
HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
HCatSchema schema = src.getTableSchema(p.getConfiguration());
PCollection<HCatRecord> records = p.read(src);
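// obtain a ReadableData view of the collection; the mocked
// TaskInputOutputContext below stands in for a task-side caller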
ReadableData<HCatRecord> readable = records.asReadable(true);
TaskInputOutputContext mockTIOC = Mockito.mock(TaskInputOutputContext.class);
when(mockTIOC.getConfiguration()).thenReturn(conf);
readable.configure(conf);
Iterator<HCatRecord> iterator = readable.read(mockTIOC).iterator();
HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema);
List<Pair<Integer, String>> results = new ArrayList<>();
while (iterator.hasNext()) {
results.add(fn.map(iterator.next()));
}
p.done();
assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results);
}
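
// Verifies that a PCollection of HCatRecords can be materialized directly on
// the client and transformed outside the pipeline.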
@Test
public void testMaterialize() throws Exception {
String tableName = testName.getMethodName();
Path tableRootLocation = temporaryPath.getPath(tableName);
String data = "17,josh\n29,indiana\n";
writeDataToHdfs(data, tableRootLocation, conf);
HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
HCatSchema schema = src.getTableSchema(p.getConfiguration());
PCollection<HCatRecord> records = p.read(src);
// force materialization of the HCatRecords themselves here, then transform
// them locally
Iterable<HCatRecord> materialize = records.materialize();
HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema);
List<Pair<Integer, String>> results = new ArrayList<>();
for (final HCatRecord record : materialize) {
results.add(fn.map(record));
}
p.done();
assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results);
}
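
// Reads an external partitioned table with a filter selecting two partitions
// and verifies records from both are returned.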
@Test
public void testMaterialize_partitionedTable_multiplePartitionsRequested() throws Exception {
String tableName = testName.getMethodName();
Path tableRootLocation = temporaryPath.getPath(tableName);
String part1Data = "17,josh\n29,indiana\n";
String part1Value = "1234";
Path partition1Location = new Path(tableRootLocation, part1Value);
String part2Data = "42,jackie\n17,ohio\n";
String part2Value = "5678";
Path partition2Location = new Path(tableRootLocation, part2Value);
writeDataToHdfs(part1Data, partition1Location, conf);
writeDataToHdfs(part2Data, partition2Location, conf);
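// register the table with a single string partition column ("timestamp") and
// add a metastore partition for each of the two data directories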
FieldSchema partitionSchema = new FieldSchema();
partitionSchema.setName("timestamp");
partitionSchema.setType("string");
Table table = HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, tableRootLocation,
Collections.singletonList(partitionSchema));
client
.add_partition(HCatTestUtils.createPartition(table, partition1Location, Collections.singletonList(part1Value)));
client
.add_partition(HCatTestUtils.createPartition(table, partition2Location, Collections.singletonList(part2Value)));
Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
String filter = "timestamp=\"" + part1Value + "\" or timestamp=\"" + part2Value + "\"";
HCatSourceTarget src = (HCatSourceTarget) FromHCat.table("default", tableName, filter);
HCatSchema schema = src.getTableSchema(p.getConfiguration());
PCollection<HCatRecord> records = p.read(src);
// force materialization of the HCatRecords themselves here, then transform
// them locally
Iterable<HCatRecord> materialize = records.materialize();
HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema);
List<Pair<Integer, String>> results = new ArrayList<>();
for (final HCatRecord record : materialize) {
results.add(fn.map(record));
}
p.done();
assertEquals(
ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana"), Pair.of(42, "jackie"), Pair.of(17, "ohio")),
results);
}
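
// Sends HCatRecords through a group-by, exercising Writable serialization of
// DefaultHCatRecord as an intermediate shuffle value.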
@Test
public void testGroupBy() throws Exception {
String tableName = testName.getMethodName();
Path tableRootLocation = temporaryPath.getPath(tableName);
String data = "17,josh\n29,indiana\n";
writeDataToHdfs(data, tableRootLocation, conf);
HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
HCatSchema schema = src.getTableSchema(p.getConfiguration());
PCollection<HCatRecord> records = p.read(src);
// DefaultHCatRecord must be used as the value type here rather than HCatRecord:
// the intermediate output is serialized by Hadoop, which explicitly checks that
// the type being written matches the declared output type. DefaultHCatRecord !=
// HCatRecord, so declaring HCatRecord would trigger an exception.
PTable<String, DefaultHCatRecord> table = records.parallelDo(new HCatTestUtils.Fns.GroupByHCatRecordFn(),
Writables.tableOf(Writables.strings(), Writables.writables(DefaultHCatRecord.class)));
PTable<Integer, String> finalTable = table.groupByKey().parallelDo(new HCatTestUtils.Fns.HCatRecordMapFn(schema),
Avros.tableOf(Avros.ints(), Avros.strings()));
List<Pair<Integer, String>> results = new ArrayList<>();
for (final Map.Entry<Integer, String> entry : finalTable.materializeToMap().entrySet()) {
results.add(Pair.of(entry.getKey(), entry.getValue()));
}
p.done();
assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results);
}
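
// Reads a non-native Hive table backed by HBase through the HBase storage
// handler, using an in-process HBase cluster.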
@Test
public void test_HCatRead_NonNativeTable_HBase() throws Exception {
HBaseTestingUtility hbaseTestUtil = null;
try {
String db = "default";
String hiveTable = "test";
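// spin up an in-process HBase cluster (mini ZooKeeper quorum plus a single
// master/region server) and seed an HBase table with two rows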
Configuration hbaseConf = HBaseConfiguration.create(conf);
hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
hbaseTestUtil.startMiniZKCluster();
hbaseTestUtil.startMiniHBaseCluster(1, 1);
org.apache.hadoop.hbase.client.Table table = hbaseTestUtil.createTable(TableName.valueOf("test-table"), "fam");
String key1 = "this-is-a-key";
Put put = new Put(Bytes.toBytes(key1));
put.addColumn("fam".getBytes(), "foo".getBytes(), "17".getBytes());
table.put(put);
String key2 = "this-is-a-key-too";
Put put2 = new Put(Bytes.toBytes(key2));
put2.addColumn(Bytes.toBytes("fam"), Bytes.toBytes("foo"), Bytes.toBytes("29"));
table.put(put2);
table.close();
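// define an external Hive table over the HBase table via the HBase storage
// handler; the serde mapping "fam:foo,:key" binds column "foo" to fam:foo and
// column "key" to the HBase row key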
org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, hiveTable);
tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
tbl.setTableType(TableType.EXTERNAL_TABLE);
FieldSchema f1 = new FieldSchema();
f1.setName("foo");
f1.setType("int");
FieldSchema f2 = new FieldSchema();
f2.setName("key");
f2.setType("string");
tbl.setProperty("hbase.table.name", "test-table");
tbl.setProperty("hbase.mapred.output.outputtable", "test-table");
tbl.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler");
tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
tbl.setFields(ImmutableList.of(f1, f2));
tbl.setSerdeParam("hbase.columns.mapping", "fam:foo,:key");
client.createTable(tbl.getTTable());
Pipeline p = new MRPipeline(HCatSourceITSpec.class, hbaseConf);
HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(hiveTable);
HCatSchema schema = src.getTableSchema(p.getConfiguration());
PCollection<HCatRecord> records = p.read(src);
List<Pair<String, Integer>> mat = Lists.newArrayList(
records.parallelDo(new HCatTestUtils.Fns.KeyMapPairFn(schema), Avros.tableOf(Avros.strings(), Avros.ints()))
.materialize());
p.done();
assertEquals(ImmutableList.of(Pair.of(key1, 17), Pair.of(key2, 29)), mat);
} finally {
if (hbaseTestUtil != null) {
hbaseTestUtil.shutdownMiniHBaseCluster();
hbaseTestUtil.shutdownMiniZKCluster();
}
}
}
// Writes the given data to a uniquely named file under the specified location,
// ensuring the directory exists first, and returns the path to the written file.
private Path writeDataToHdfs(String data, Path location, Configuration conf) throws IOException {
FileSystem fs = location.getFileSystem(conf);
Path writeLocation = new Path(location, UUID.randomUUID().toString());
fs.mkdirs(location);
ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes("UTF-8"));
try (FSDataOutputStream fos = fs.create(writeLocation)) {
IOUtils.copy(in, fos);
}
}
return writeLocation;
}
}