blob: b654c3e7d8e118ad6fe7506d3e37f4b2c4eeba28 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.couchbase;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.couchbase.mock.Bucket.BucketType;
import org.couchbase.mock.BucketConfiguration;
import org.couchbase.mock.CouchbaseMock;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
import org.apache.apex.malhar.lib.testbench.CollectorTestSink;
import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.CouchbaseConnectionFactory;
import com.couchbase.client.CouchbaseConnectionFactoryBuilder;
import com.google.common.collect.Lists;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.netlet.util.DTThrowable;
public class CouchBaseInputOperatorTest
{
private static final Logger logger = LoggerFactory.getLogger(CouchBaseInputOperatorTest.class);
private static final String APP_ID = "CouchBaseInputOperatorTest";
private static final String password = "";
protected static ArrayList<String> keyList;
protected static CouchbaseClient client = null;
private final int numNodes = 2;
private final int numReplicas = 3;
protected CouchbaseConnectionFactory connectionFactory;
protected CouchbaseMock createMock(String name, String password,BucketConfiguration bucketConfiguration) throws Exception
{
bucketConfiguration.numNodes = numNodes;
bucketConfiguration.numReplicas = numReplicas;
bucketConfiguration.name = name;
bucketConfiguration.type = BucketType.COUCHBASE;
bucketConfiguration.password = password;
bucketConfiguration.hostname = "localhost";
ArrayList<BucketConfiguration> configList = new ArrayList<BucketConfiguration>();
configList.add(bucketConfiguration);
CouchbaseMock mockCouchbase = new CouchbaseMock(0, configList);
return mockCouchbase;
}
@Test
public void TestCouchBaseInputOperator() throws Exception
{
BucketConfiguration bucketConfiguration = new BucketConfiguration();
CouchbaseConnectionFactoryBuilder cfb = new CouchbaseConnectionFactoryBuilder();
CouchbaseMock mockCouchbase1 = createMock("default", "",bucketConfiguration);
CouchbaseMock mockCouchbase2 = createMock("default", "",bucketConfiguration);
mockCouchbase1.start();
mockCouchbase1.waitForStartup();
List<URI> uriList = new ArrayList<URI>();
int port1 = mockCouchbase1.getHttpPort();
logger.debug("port is {}", port1);
mockCouchbase2.start();
mockCouchbase2.waitForStartup();
int port2 = mockCouchbase2.getHttpPort();
logger.debug("port is {}", port2);
uriList.add(new URI("http", null, "localhost", port1, "/pools", "", ""));
connectionFactory = cfb.buildCouchbaseConnection(uriList, bucketConfiguration.name, bucketConfiguration.password);
client = new CouchbaseClient(connectionFactory);
CouchBaseStore store = new CouchBaseStore();
keyList = new ArrayList<String>();
store.setBucket(bucketConfiguration.name);
store.setPasswordConfig(password);
store.setPassword(bucketConfiguration.password);
store.setUriString("localhost:" + port1 + "," + "localhost:" + port1);
// couchbaseBucket.getCouchServers();
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
inputOperator.insertEventsInTable(10);
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
List<Partition<AbstractCouchBaseInputOperator<String>>> partitions = Lists.newArrayList();
Collection<Partition<AbstractCouchBaseInputOperator<String>>> newPartitions = inputOperator.definePartitions(partitions, new PartitioningContextImpl(null, 0));
Assert.assertEquals(2, newPartitions.size());
for (Partition<AbstractCouchBaseInputOperator<String>> p: newPartitions) {
Assert.assertNotSame(inputOperator, p.getPartitionedInstance());
}
//Collect all operators in a list
List<AbstractCouchBaseInputOperator<String>> opers = Lists.newArrayList();
for (Partition<AbstractCouchBaseInputOperator<String>> p: newPartitions) {
TestInputOperator oi = (TestInputOperator)p.getPartitionedInstance();
oi.setServerURIString("localhost:" + port1);
oi.setStore(store);
oi.setup(null);
oi.outputPort.setSink(sink);
opers.add(oi);
port1 = port2;
}
sink.clear();
int wid = 0;
for (int i = 0; i < 10; i++) {
for (AbstractCouchBaseInputOperator<String> o: opers) {
o.beginWindow(wid);
o.emitTuples();
o.endWindow();
}
wid++;
}
Assert.assertEquals("Tuples read should be same ", 10, sink.collectedTuples.size());
for (AbstractCouchBaseInputOperator<String> o: opers) {
o.teardown();
}
mockCouchbase1.stop();
mockCouchbase2.stop();
}
public static class TestInputOperator extends AbstractCouchBaseInputOperator<String>
{
@SuppressWarnings("unchecked")
@Override
public String getTuple(Object entry)
{
String tuple = entry.toString();
logger.debug("returned tuple is {}", tuple);
return tuple;
}
@Override
public ArrayList<String> getKeys()
{
return keyList;
}
public void insertEventsInTable(int numEvents)
{
String key;
Integer value;
logger.debug("number of events is {}", numEvents);
for (int i = 0; i < numEvents; i++) {
key = String.valueOf("Key" + i * 10);
keyList.add(key);
value = i * 100;
try {
client.set(key, value).get();
} catch (InterruptedException ex) {
DTThrowable.rethrow(ex);
} catch (ExecutionException ex) {
DTThrowable.rethrow(ex);
}
}
client.shutdown();
client = null;
}
}
}