blob: ba67308a2f647dda6b3a21b00f23119f593334ec [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.couchbase;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.common.util.DTThrowable;
import com.datatorrent.lib.io.fs.AbstractFSDirectoryInputOperator;
import com.datatorrent.lib.io.fs.AbstractFSDirectoryInputOperatorTest.TestFSDirectoryInputOperator;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
public class CouchBaseInputOperatorTest
{
private static final Logger logger = LoggerFactory.getLogger(CouchBaseInputOperatorTest.class);
private static String APP_ID = "CouchBaseInputOperatorTest";
private static String bucket = "default";
private static String password = "";
private static int OPERATOR_ID = 0;
protected static ArrayList<URI> nodes = new ArrayList<URI>();
protected static ArrayList<String> keyList;
private static String uri = "127.0.0.1:8091";
public void TestCouchBaseInputOperator()
{
CouchBaseWindowStore store = new CouchBaseWindowStore();
keyList = new ArrayList<String>();
store.setBucket(bucket);
store.setPassword(password);
store.setUriString(uri);
try {
store.connect();
}
catch (IOException ex) {
DTThrowable.rethrow(ex);
}
store.getInstance().flush();
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
inputOperator.insertEventsInTable(100);
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
inputOperator.setup(context);
inputOperator.beginWindow(0);
inputOperator.emitTuples();
inputOperator.endWindow();
Assert.assertEquals("tuples in couchbase", 100, sink.collectedTuples.size());
}
@Test
public void TestCouchBaseInputOperatorWithPartitions()
{
CouchBaseWindowStore store = new CouchBaseWindowStore();
keyList = new ArrayList<String>();
store.setBucket(bucket);
store.setPassword(password);
store.setUriString(uri);
try {
store.connect();
}
catch (IOException ex) {
DTThrowable.rethrow(ex);
}
store.getInstance().flush();
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
List<Partition<AbstractCouchBaseInputOperator<String>>> partitions = Lists.newArrayList();
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
inputOperator.insertEventsInTable(100);
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
partitions.add(new DefaultPartition<AbstractCouchBaseInputOperator<String>>(inputOperator));
Collection<Partition<AbstractCouchBaseInputOperator<String>>> newPartitions = inputOperator.definePartitions(partitions, 1);
Assert.assertEquals(2, newPartitions.size());
for (Partition<AbstractCouchBaseInputOperator<String>> p : newPartitions) {
Assert.assertNotSame(inputOperator, p.getPartitionedInstance());
}
/* Collect all operators in a list */
List<AbstractCouchBaseInputOperator<String>> opers = Lists.newArrayList();
for (Partition<AbstractCouchBaseInputOperator<String>> p : newPartitions) {
TestInputOperator oi = (TestInputOperator)p.getPartitionedInstance();
oi.setStore(store);
oi.setup(null);
oi.outputPort.setSink(sink);
opers.add(oi);
}
sink.clear();
int wid = 0;
for(int i = 0; i < 10; i++) {
for(AbstractCouchBaseInputOperator<String> o : opers) {
o.beginWindow(wid);
o.emitTuples();
o.endWindow();
}
wid++;
}
Assert.assertEquals("Tuples read should be same ", 100, sink.collectedTuples.size());
}
public static class TestInputOperator extends AbstractCouchBaseInputOperator<String>
{
@SuppressWarnings("unchecked")
@Override
public String getTuple(Object entry)
{
String tuple = entry.toString();
return tuple;
}
@Override
public ArrayList<String> getKeys()
{
return keyList;
}
public void insertEventsInTable(int numEvents)
{
String key = null;
Integer value = null;
logger.info("number of events is" + numEvents);
for (int i = 0; i < numEvents; i++) {
key = String.valueOf("Key" + i * 10);
keyList.add(key);
value = i * 100;
try {
store.client.set(key, value).get();
}
catch (InterruptedException ex) {
DTThrowable.rethrow(ex);
}
catch (ExecutionException ex) {
DTThrowable.rethrow(ex);
}
}
}
}
}