blob: a237201fb9c0cf4b3f7297b71313cef2ea630ac5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.hbase;
import java.util.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.datatorrent.lib.util.TableInfo;
import com.datatorrent.contrib.util.TestPOJO;
import com.datatorrent.contrib.util.TupleGenerator;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator.ProcessingMode;
import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID;
import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
public class HBasePOJOPutOperatorTest
{
private static final Logger logger = LoggerFactory.getLogger(HBasePOJOPutOperatorTest.class);
public static final int TEST_SIZE = 15000;
public static final int WINDOW_SIZE = 1500;
private HBasePOJOPutOperator operator;
private final long startWindowId = Calendar.getInstance().getTimeInMillis();
public HBasePOJOPutOperatorTest()
{
}
@Before
public void prepare() throws Exception
{
operator = new HBasePOJOPutOperator();
setupOperator(operator);
createOrDeleteTable(operator.getStore(), false );
}
@After
public void cleanup() throws Exception
{
createOrDeleteTable(operator.getStore(), true );
}
/**
* this test case only test if HBasePojoPutOperator can save data to the
* HBase. it doesn't test connection to the other operators
*/
@Test
public void testPutInternal() throws Exception
{
writeRecords();
readRecordsAndVerify();
}
protected void writeRecords()
{
long windowId = startWindowId;
try
{
int countInWindow = 0;
for (int i = 0; i < TEST_SIZE; ++i)
{
if( countInWindow == 0 )
operator.beginWindow(windowId++);
operator.processTuple(getNextTuple());
if( ++countInWindow == WINDOW_SIZE )
{
operator.endWindow();
countInWindow = 0;
}
}
Thread.sleep(30000);
}
catch (Exception e)
{
logger.error("testPutInternal() exception.", e);
Assert.fail(e.getMessage());
}
}
protected void createOrDeleteTable(HBaseStore store, boolean isDelete ) throws Exception
{
HBaseAdmin admin = null;
try
{
admin = new HBaseAdmin(store.getConfiguration());
final String tableName = store.getTableName();
if (!admin.isTableAvailable(tableName) && !isDelete )
{
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor("f0"));
tableDescriptor.addFamily(new HColumnDescriptor("f1"));
admin.createTable(tableDescriptor);
}
else if( isDelete )
{
admin.disableTable(tableName);
admin.deleteTable( tableName );
}
}
catch (Exception e)
{
logger.error("exception", e);
throw e;
}
finally
{
if (admin != null)
{
try
{
admin.close();
}
catch (Exception e)
{
logger.warn("close admin exception. ", e);
}
}
}
}
protected void setupOperator(HBasePOJOPutOperator operator)
{
configure(operator);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContext context = mockOperatorContext(OPERATOR_ID, attributeMap);
operator.setup(context);
}
protected void configure(HBasePOJOPutOperator operator)
{
TableInfo<HBaseFieldInfo> tableInfo = new TableInfo<HBaseFieldInfo>();
tableInfo.setRowOrIdExpression("row");
List<HBaseFieldInfo> fieldsInfo = new ArrayList<HBaseFieldInfo>();
fieldsInfo.add( new HBaseFieldInfo( "name", "name", SupportType.STRING, "f0") );
fieldsInfo.add( new HBaseFieldInfo( "age", "age", SupportType.INTEGER, "f1") );
fieldsInfo.add( new HBaseFieldInfo( "address", "address", SupportType.STRING, "f1") );
tableInfo.setFieldsInfo(fieldsInfo);
operator.setTableInfo(tableInfo);
HBaseStore store = new HBaseStore();
store.setTableName("test");
store.setZookeeperQuorum("localhost");
store.setZookeeperClientPort(2181);
operator.setStore(store);
}
private TupleGenerator<TestPOJO> tupleGenerator;
protected Object getNextTuple()
{
if( tupleGenerator == null )
tupleGenerator = new TupleGenerator<TestPOJO>( TestPOJO.class );
return tupleGenerator.getNextTuple();
}
protected void resetTupleGenerator()
{
if( tupleGenerator == null )
tupleGenerator = new TupleGenerator<TestPOJO>( TestPOJO.class );
else
tupleGenerator.reset();
}
protected void readRecordsAndVerify()
{
int[] rowIds = new int[ TEST_SIZE ];
for( int i=1; i<=TEST_SIZE; ++i )
rowIds[i-1] = 1;
try
{
HTable table = operator.getStore().getTable();
Scan scan = new Scan();
ResultScanner resultScanner = table.getScanner(scan);
int recordCount = 0;
while( true )
{
Result result = resultScanner.next();
if( result == null )
break;
int rowId = Integer.valueOf( Bytes.toString( result.getRow() ) );
Assert.assertTrue( "rowId="+rowId+" aut of range" , ( rowId > 0 && rowId <= TEST_SIZE ) );
Assert.assertTrue( "the rowId="+rowId+" already processed.", rowIds[rowId-1] == 1 );
rowIds[rowId-1]=0;
List<Cell> cells = result.listCells();
Map<String, byte[]> map = new HashMap<String,byte[]>();
for( Cell cell : cells )
{
String columnName = Bytes.toString( CellUtil.cloneQualifier(cell) );
byte[] value = CellUtil.cloneValue(cell);
map.put(columnName, value);
}
TestPOJO read = TestPOJO.from(map);
read.setRowId((long)rowId);
TestPOJO expected = new TestPOJO( rowId );
Assert.assertTrue( String.format( "expected %s, get %s ", expected.toString(), read.toString() ), expected.completeEquals(read) );
recordCount++;
}
int missedCount = 0;
if( recordCount != TEST_SIZE )
{
logger.error( "unsaved records: " );
StringBuilder sb = new StringBuilder();
for( int i=0; i<TEST_SIZE; ++i )
{
if( rowIds[i] != 0 )
{
sb.append(i+1).append(", ");
missedCount++;
}
if( missedCount>0 && ( missedCount%20 == 0 ) )
{
logger.error( sb.toString() );
sb.delete( 0, sb.length() );
}
}
logger.error( sb.toString() );
logger.error( "End of unsaved records" );
}
Assert.assertTrue( "expected total records = " + TEST_SIZE + ", got " + recordCount + ", missed " + missedCount, recordCount==TEST_SIZE );
}
catch( Exception e )
{
throw new RuntimeException( "exception", e );
}
}
public HBasePOJOPutOperator getOperator()
{
return operator;
}
}