blob: c7f3f608bef527926d6a6ff4749448e856cb055f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.couchbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.testbench.CollectorTestSink;
import com.couchbase.client.protocol.views.DesignDocument;
import com.couchbase.client.protocol.views.ViewDesign;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.netlet.util.DTThrowable;
import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
 * Integration test for {@link CouchBasePOJOInputOperator}.
 *
 * <p>The test connects to the Couchbase bucket {@code default} at {@code localhost:8091},
 * flushes it, stores three documents, publishes a development view over them and then
 * checks that the operator emits the two JSON documents as {@code TestComplexPojoInput}
 * instances. A reachable Couchbase server is therefore required to run it.
 */
public class CouchBasePOJOTest
{
  private static final Logger logger = LoggerFactory.getLogger(CouchBasePOJOTest.class);
  private static final String APP_ID = "CouchBaseInputOperatorTest";
  private static final String bucket = "default";
  private static final String password = "";
  private static final int OPERATOR_ID = 0;
  protected static ArrayList<String> keyList;
  private static final String uri = "localhost:8091";
  private static final String DESIGN_DOC_ID1 = "dev_test1";
  private static final String TEST_VIEW1 = "testView1";

  /**
   * Stores the fixture documents, creates the view, runs one window of the operator and
   * verifies the emitted POJOs. The design document is removed and the operator torn down
   * even when a step in between fails.
   */
  @Test
  public void TestCouchBaseInputOperator()
  {
    CouchBaseWindowStore store = new CouchBaseWindowStore();
    System.setProperty("viewmode", "development");
    keyList = new ArrayList<String>();
    store.setBucket(bucket);
    store.setPassword(password);
    store.setUriString(uri);
    try {
      store.connect();
    } catch (IOException ex) {
      DTThrowable.rethrow(ex);
    }
    // Start from an empty bucket so that only the documents inserted below are in the view.
    store.getInstance().flush();

    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
    OperatorContext context = mockOperatorContext(OPERATOR_ID, attributeMap);

    TestInputOperator inputOperator = new TestInputOperator();
    inputOperator.setStore(store);
    inputOperator.setOutputClass("org.apache.apex.malhar.contrib.couchbase.TestComplexPojoInput");
    inputOperator.insertEventsInTable(2);
    // NOTE(review): presumably gives the server time to persist the documents before the
    // view is created - confirm whether a shorter or event-based wait is possible.
    pause(10000);
    inputOperator.createAndFetchViewQuery1();

    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
    boolean operatorSetUp = false;
    try {
      pause(1000);
      inputOperator.outputPort.setSink(sink);
      inputOperator.setup(context);
      operatorSetUp = true;
      inputOperator.setDesignDocumentName(DESIGN_DOC_ID1);
      inputOperator.setViewName(TEST_VIEW1);
      inputOperator.beginWindow(0);
      inputOperator.emitTuples();
      inputOperator.endWindow();
      logger.debug("collected tuples are {}", sink.collectedTuples.size());

      // Fail loudly when nothing (or something unexpected) was emitted instead of
      // silently skipping the per-tuple checks.
      Assert.assertEquals("number of collected tuples", 2, sink.collectedTuples.size());

      TestComplexPojoInput first = (TestComplexPojoInput)sink.collectedTuples.get(0);
      Assert.assertEquals("name set in testpojo", "test", first.getName());
      Assert.assertEquals("map in testpojo", "{test=12345}", first.getMap().toString());
      Assert.assertEquals("age in testpojo", "23", first.getAge().toString());

      TestComplexPojoInput second = (TestComplexPojoInput)sink.collectedTuples.get(1);
      Assert.assertEquals("name set in testpojo", "test1", second.getName());
      Assert.assertEquals("map in testpojo", "{test2=12345}", second.getMap().toString());
      Assert.assertEquals("age in testpojo", "12", second.getAge().toString());
    } finally {
      sink.clear();
      try {
        // Delete the design document before teardown, in the same order the test has
        // always used, so the client is still usable for the delete.
        store.client.deleteDesignDoc(DESIGN_DOC_ID1);
      } finally {
        if (operatorSetUp) {
          inputOperator.teardown();
        }
      }
    }
  }

  /**
   * Sleeps for the given time; an interruption restores the thread's interrupt flag and
   * is reported as a {@link RuntimeException}.
   *
   * @param millis time to sleep in milliseconds
   */
  private static void pause(long millis)
  {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Operator under test, extended with helpers that write the fixture documents and the
   * view definition through the operator's own store.
   */
  public static class TestInputOperator extends CouchBasePOJOInputOperator
  {
    /**
     * Stores the three fixture documents ({@code Key1}..{@code Key3}) and waits for each
     * write to complete.
     *
     * @param numEvents only logged; the set of documents written is fixed
     */
    private void insertEventsInTable(int numEvents)
    {
      logger.info("number of events is {}", numEvents);
      try {
        store.client.set("Key1", 431).get();
        store.client.set("Key2", "{\"name\":\"test\",\"map\":{\"test\":12345},\"age\":23}").get();
        store.client.set("Key3", "{\"name\":\"test1\",\"map\":{\"test2\":12345},\"age\":12}").get();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        DTThrowable.rethrow(ex);
      } catch (ExecutionException ex) {
        DTThrowable.rethrow(ex);
      }
    }

    /**
     * Creates the design document {@code dev_test1} containing the view {@code testView1},
     * whose map function emits every document whose {@code meta.type} is {@code "json"}.
     */
    public void createAndFetchViewQuery1()
    {
      DesignDocument designDoc = new DesignDocument(DESIGN_DOC_ID1);
      String viewName = TEST_VIEW1;
      String mapFunction
          = "function (doc, meta) {\n"
          + " if( meta.type == \"json\") {\n"
          + " emit(doc.key,doc);\n"
          + " }\n"
          + " }";
      ViewDesign viewDesign = new ViewDesign(viewName, mapFunction);
      designDoc.getViews().add(viewDesign);
      store.client.createDesignDoc(designDoc);
    }
  }
}