/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hive.bolt;

import backtype.storm.Config;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;

import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hive.hcatalog.streaming.*;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

public class TestHiveBolt {
    final static String dbName = "testdb";
    final static String tblName = "test_table";
    final static String dbName1 = "testdb1";
    final static String tblName1 = "test_table1";
    final static String PART1_NAME = "city";
    final static String PART2_NAME = "state";
    final static String[] partNames = { PART1_NAME, PART2_NAME };
    final String partitionVals = "sunnyvale,ca";
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    final String[] colNames = { COL1, COL2 };
    final String[] colNames1 = { COL2, COL1 };
    private String[] colTypes = { serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME };
    private final HiveConf conf;
    private final Driver driver;
    private final int port;
    final String metaStoreURI;
    private String dbLocation;
    private Config config = new Config();
    private HiveBolt bolt;
    private final static boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();

    @Mock
    private OutputCollector collector;

    private static final Logger LOG = LoggerFactory.getLogger(TestHiveBolt.class);
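
    // Points the Hive client at an embedded metastore (metaStoreURI = null) backed by a
    // local Derby transaction database, and creates the Driver used to query test results.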
    public TestHiveBolt() throws Exception {
        port = 9083;
        dbLocation = "";
        // A null URI makes HiveEndPoint use the local embedded metastore from the conf.
        metaStoreURI = null;
        conf = HiveSetupUtil.getHiveConf();
        TxnDbUtil.setConfValues(conf);
        TxnDbUtil.cleanDb();
        TxnDbUtil.prepDb();
        SessionState.start(new CliSessionState(conf));
        driver = new Driver(conf);
    }
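
    // Drops and recreates the test database and partitioned table before each test.
    // On Windows the warehouse path is used directly; elsewhere it is prefixed with
    // the raw:// scheme set up by HiveSetupUtil.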
    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks(this);
        HiveSetupUtil.dropDB(conf, dbName);
        if (WINDOWS) {
            dbLocation = dbFolder.newFolder(dbName + ".db").getCanonicalPath();
        } else {
            dbLocation = "raw://" + dbFolder.newFolder(dbName + ".db").getCanonicalPath();
        }
        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals.split(",")),
                                       colNames, colTypes, partNames, dbLocation);
        System.out.println("done creating " + dbName + "." + tblName);
    }
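
    // Opening a streaming connection should succeed both with explicit partition values
    // and with the partition left unspecified.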
    @Test
    public void testEndpointConnection() throws Exception {
        // 1) Basic
        HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
                                              Arrays.asList(partitionVals.split(",")));
        StreamingConnection connection = endPt.newConnection(false, null); // should not throw
        connection.close();
        // 2) Leave partition unspecified
        endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
        endPt.newConnection(false, null).close(); // should not throw
    }
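
    // Four identical tuples with batch size 2 should produce two committed batches:
    // every tuple is acked and all four records land in the table.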
    @Test
    public void testWithByteArrayIdandMessage() throws Exception {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields(colNames))
            .withPartitionFields(new Fields(partNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(2);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, collector);
        Integer id = 100;
        String msg = "test-123";
        String city = "sunnyvale";
        String state = "ca";
        checkRecordCountInTable(tblName, dbName, 0);
        Set<Tuple> tupleSet = new HashSet<Tuple>();
        for (int i = 0; i < 4; i++) {
            Tuple tuple = generateTestTuple(id, msg, city, state);
            bolt.execute(tuple);
            tupleSet.add(tuple);
        }
        for (Tuple t : tupleSet) {
            verify(collector).ack(t);
        }
        checkRecordCountInTable(tblName, dbName, 4);
        bolt.cleanup();
    }
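
    // Writes to an unpartitioned table; partition auto-creation is disabled because
    // there are no partition columns to create.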
    @Test
    public void testWithoutPartitions() throws Exception {
        HiveSetupUtil.dropDB(conf, dbName1);
        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null,
                                       colNames, colTypes, null, dbLocation);
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields(colNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, tblName1, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(2)
            .withAutoCreatePartitions(false);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, collector);
        Integer id = 100;
        String msg = "test-123";
        String city = "sunnyvale";
        String state = "ca";
        checkRecordCountInTable(tblName1, dbName1, 0);
        Set<Tuple> tupleSet = new HashSet<Tuple>();
        for (int i = 0; i < 4; i++) {
            Tuple tuple = generateTestTuple(id, msg, city, state);
            bolt.execute(tuple);
            tupleSet.add(tuple);
        }
        for (Tuple t : tupleSet) {
            verify(collector).ack(t);
        }
        bolt.cleanup();
        checkRecordCountInTable(tblName1, dbName1, 4);
    }
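
    // Uses the current time, formatted with the given pattern, as the partition value
    // instead of taking partition fields from the tuple.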
    @Test
    public void testWithTimeformat() throws Exception {
        String[] partNames1 = { "date" };
        String timeFormat = "yyyy/MM/dd";
        HiveSetupUtil.dropDB(conf, dbName1);
        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null,
                                       colNames, colTypes, partNames1, dbLocation);
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields(colNames))
            .withTimeAsPartitionField(timeFormat);
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, tblName1, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(1);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, collector);
        Integer id = 100;
        String msg = "test-123";
        Date d = new Date();
        SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
        String today = parseDate.format(d.getTime());
        checkRecordCountInTable(tblName1, dbName1, 0);
        Set<Tuple> tupleSet = new HashSet<Tuple>();
        for (int i = 0; i < 2; i++) {
            Tuple tuple = generateTestTuple(id, msg, null, null);
            tupleSet.add(tuple);
            bolt.execute(tuple);
        }
        for (Tuple t : tupleSet) {
            verify(collector).ack(t);
        }
        checkDataWritten(tblName1, dbName1, "100,test-123," + today, "100,test-123," + today);
        bolt.cleanup();
    }
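
    // A single delimited tuple should be flushed immediately (batch size 1),
    // acked, and readable back from the table.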
    @Test
    public void testData() throws Exception {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields(colNames))
            .withPartitionFields(new Fields(partNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(1);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, new OutputCollector(collector));
        Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
        bolt.execute(tuple1);
        verify(collector).ack(tuple1);
        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
        bolt.cleanup();
    }

    @Test
    public void testJsonWriter() throws Exception {
        // JSON records do not need their columns in the same order as the Hive table.
        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
            .withColumnFields(new Fields(colNames1))
            .withPartitionFields(new Fields(partNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(1);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, collector);
        Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
        bolt.execute(tuple1);
        verify(collector).ack(tuple1);
        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
        bolt.cleanup();
    }
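
    // With batch size 2, the first tuple must not be acked (or otherwise touch the
    // collector) until the second tuple fills the batch and triggers a flush.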
    @Test
    public void testNoAcksUntilFlushed() {
        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
            .withColumnFields(new Fields(colNames1))
            .withPartitionFields(new Fields(partNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(2);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, new OutputCollector(collector));
        Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
        Tuple tuple2 = generateTestTuple(2, "SFO", "San Jose", "CA");
        bolt.execute(tuple1);
        verifyZeroInteractions(collector);
        bolt.execute(tuple2);
        verify(collector).ack(tuple1);
        verify(collector).ack(tuple2);
        bolt.cleanup();
    }
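
    // A spy on the bolt forces every flush attempt to fail, so no tuple may ever be acked.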
    @Test
    public void testNoAcksIfFlushFails() throws Exception {
        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
            .withColumnFields(new Fields(colNames1))
            .withPartitionFields(new Fields(partNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(2)
            .withBatchSize(2);
        HiveBolt spyBolt = Mockito.spy(new HiveBolt(hiveOptions));
        // This forces a failure of all the flush attempts
        doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
        doThrow(new Exception()).when(spyBolt).flushAndCloseWriters();
        spyBolt.prepare(config, null, new OutputCollector(collector));
        Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
        Tuple tuple2 = generateTestTuple(2, "SFO", "San Jose", "CA");
        spyBolt.execute(tuple1);
        spyBolt.execute(tuple2);
        verify(collector, never()).ack(tuple1);
        verify(collector, never()).ack(tuple2);
        spyBolt.cleanup();
    }
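
    // Writes 100 tuples in batches of 10 and verifies that all records are committed
    // and every tuple acked.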
    @Test
    public void testMultiPartitionTuples() throws Exception {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields(colNames))
            .withPartitionFields(new Fields(partNames));
        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(10)
            .withBatchSize(10);
        bolt = new HiveBolt(hiveOptions);
        bolt.prepare(config, null, new OutputCollector(collector));
        Integer id = 1;
        String msg = "test";
        String city = "San Jose";
        String state = "CA";
        checkRecordCountInTable(tblName, dbName, 0);
        Set<Tuple> tupleSet = new HashSet<Tuple>();
        for (int i = 0; i < 100; i++) {
            Tuple tuple = generateTestTuple(id, msg, city, state);
            tupleSet.add(tuple);
            bolt.execute(tuple);
        }
        checkRecordCountInTable(tblName, dbName, 100);
        for (Tuple t : tupleSet) {
            verify(collector).ack(t);
        }
        bolt.cleanup();
    }
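
    // Helpers: query the table through the embedded Hive Driver and assert on its contents.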
    private void checkRecordCountInTable(String tableName, String dbName, int expectedCount)
        throws CommandNeedRetryException, IOException {
        int count = listRecordsInTable(tableName, dbName).size();
        assertEquals(expectedCount, count);
    }

    private ArrayList<String> listRecordsInTable(String tableName, String dbName)
        throws CommandNeedRetryException, IOException {
        driver.compile("select * from " + dbName + "." + tableName);
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        return res;
    }
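
    // Asserts that the table contains exactly the given rows, in order; Hive's
    // tab-delimited output is normalized to commas before comparing.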
    private void checkDataWritten(String tableName, String dbName, String... row)
        throws CommandNeedRetryException, IOException {
        ArrayList<String> results = listRecordsInTable(tableName, dbName);
        // An empty or short result set must fail the test rather than skip the comparison.
        assertEquals(row.length, results.size());
        for (int i = 0; i < row.length; i++) {
            String resultRow = results.get(i).replace("\t", ",");
            System.out.println(resultRow);
            assertEquals(row[i], resultRow);
        }
    }
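
    // Builds a Tuple against a minimal GeneralTopologyContext whose only job is to
    // declare the output fields (id, msg, city, state) for the test stream.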
    private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
            @Override
            public Fields getComponentOutputFields(String componentId, String streamId) {
                return new Fields("id", "msg", "city", "state");
            }
        };
        return new TupleImpl(topologyContext, new Values(id, msg, city, state), 1, "");
    }
}