minor changes to couchbase partitions.
diff --git a/benchmark/src/main/resources/META-INF/properties.xml b/benchmark/src/main/resources/META-INF/properties.xml
index c1f33d7..9c69b42 100644
--- a/benchmark/src/main/resources/META-INF/properties.xml
+++ b/benchmark/src/main/resources/META-INF/properties.xml
@@ -14,5 +14,95 @@
<name>dt.application.PerformanceBenchmarkForFixedNumberOfTuples.operator.WordGenerator.count</name>
<value>1000</value>
</property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseOuput.port.*.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ <description>Specify queue capacity for all output ports</description>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseInput.port.*.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ <description>Specify queue capacity for all input ports</description>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseUpdate.port.*.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ <description>Specify queue capacity for all update ports</description>
+ </property>
+
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.uriString</name>
+ <value>127.0.0.1:8091</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.bucket</name>
+ <value>default</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.password</name>
+ <value>""</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.bucketMeta</name>
+ <value>default</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.passwordMeta</name>
+ <value>""</value>
+ </property>
+ <property>
+ <name>dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.max_tuples</name>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.queueSize</name>
+ <value>100</value>
+ </property>
+ <property>
+ <name>dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.blocktime</name>
+ <value>10000</value>
+ </property>
+ <property>
+ <name>dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.timeout</name>
+ <value>10000</value>
+ </property>
+ <property>
+ <name>dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.userConfig</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.passwordConfig</name>
+ <value>prerna123</value>
+ </property>
+
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.uriString</name>
+ <value>127.0.0.1:8091</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.blocktime</name>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.timeout</name>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.bucket</name>
+ <value>default</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.password</name>
+ <value>""</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.bucketMeta</name>
+ <value>default</value>
+ </property>
+ <property>
+ <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.passwordMeta</name>
+ <value>""</value>
+ </property>
+
</configuration>
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
index b47b69d..d71d070 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
@@ -27,7 +27,6 @@
import com.datatorrent.api.Partitioner;
import com.datatorrent.common.util.DTThrowable;
-import static com.datatorrent.contrib.couchbase.CouchBaseStore.logger;
import com.esotericsoftware.kryo.Kryo;
import org.apache.commons.io.output.ByteArrayOutputStream;
import com.esotericsoftware.kryo.io.Input;
@@ -49,9 +48,7 @@
*/
public abstract class AbstractCouchBaseInputOperator<T> extends AbstractStoreInputOperator<T, CouchBaseStore> implements Partitioner<AbstractCouchBaseInputOperator<T>>
{
-
- //need to save this,hence non transient.
- protected static final Logger logger = LoggerFactory.getLogger(CouchBaseStore.class);
+ private static final Logger logger = LoggerFactory.getLogger(CouchBaseStore.class);
protected transient CouchbaseClient clientPartition = null;
private int serverIndex;
@@ -87,6 +84,7 @@
@Override
public void setup(Context.OperatorContext context)
{
+ super.setup(context);
if (conf == null) {
conf = store.getConf();
}
@@ -109,7 +107,6 @@
DTThrowable.rethrow(e);
}
}
-
}
@Override
@@ -126,7 +123,6 @@
{
List<String> keys = getKeys();
for (String key: keys) {
- //if(store.conf.getMaster(store.conf.getVbucketByKey(key))){
int master = conf.getMaster(conf.getVbucketByKey(key));
if (master == getServerIndex()) {
logger.info("master is {}", master);
@@ -140,7 +136,6 @@
}
}
- // }
}
@@ -173,7 +168,6 @@
oper.setServerIndex(i);
oper.setUrlString(list.get(i).toString());
logger.info("oper {} urlstring is {}", i, oper.getUrlString());
- // oper.setStore(this.store);
newPartitions.add(new DefaultPartition<AbstractCouchBaseInputOperator<T>>(oper));
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchBaseStore.java b/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchBaseStore.java
index 43161df..a7be931 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchBaseStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchBaseStore.java
@@ -45,8 +45,8 @@
public class CouchBaseStore implements Connectable
{
- protected static final Logger logger = LoggerFactory.getLogger(CouchBaseStore.class);
- protected transient ConfigurationProvider configurationProvider;
+ private static final Logger logger = LoggerFactory.getLogger(CouchBaseStore.class);
+ private transient ConfigurationProvider configurationProvider;
@Nonnull
protected String bucket;
@@ -62,6 +62,31 @@
@Nonnull
protected String password;
@Nonnull
+ protected String userConfig;
+
+ public String getUserConfig()
+ {
+ return userConfig;
+ }
+
+ public void setUserConfig(String userConfig)
+ {
+ this.userConfig = userConfig;
+ }
+
+ public String getPasswordConfig()
+ {
+ return passwordConfig;
+ }
+
+ public void setPasswordConfig(String passwordConfig)
+ {
+ this.passwordConfig = passwordConfig;
+ }
+ @Nonnull
+ protected String passwordConfig;
+
+ @Nonnull
protected String uriString;
protected transient CouchbaseClient client;
@@ -144,28 +169,6 @@
return client;
}
- /*public CouchbaseClient getPartitionInstance(String serverURL)
- {
- ArrayList<URI> nodes = new ArrayList<URI>();
- CouchbaseClient clientPartition = null;
- serverURL = serverURL.replace("default", "pools");
- try {
- nodes.add(new URI(serverURL));
- }
- catch (URISyntaxException ex) {
- DTThrowable.rethrow(ex);
- }
-
- try {
- clientPartition = new CouchbaseClient(nodes, "default", "");
- }
- catch (IOException e) {
- logger.error("Error connecting to Couchbase: " + e.getMessage());
- DTThrowable.rethrow(e);
- }
- return clientPartition;
- }*/
-
public void addNodes(URI url)
{
baseURIs.add(url);
@@ -195,7 +198,7 @@
catch (IOException ex) {
DTThrowable.rethrow(ex);
}
- this.configurationProvider = new ConfigurationProviderHTTP(baseURIs, "root", "prerna123");
+ this.configurationProvider = new ConfigurationProviderHTTP(baseURIs, userConfig, passwordConfig);
Bucket configBucket = this.configurationProvider.getBucketConfiguration(bucket);
Config conf = configBucket.getConfig();
//List<InetSocketAddress> addrs=AddrUtil.getAddressesFromURL(cfb.getVBucketConfig().getCouchServers());
diff --git a/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseInputOperatorTest.java
index 27f7a29..ba67308 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseInputOperatorTest.java
@@ -52,9 +52,9 @@
private static int OPERATOR_ID = 0;
protected static ArrayList<URI> nodes = new ArrayList<URI>();
protected static ArrayList<String> keyList;
- private static String uri = "node13.morado.com:8091,node14.morado.com:8091";
+ private static String uri = "127.0.0.1:8091";
-
+
public void TestCouchBaseInputOperator()
{
CouchBaseWindowStore store = new CouchBaseWindowStore();
@@ -104,7 +104,6 @@
}
store.getInstance().flush();
- //store.getInstance().flush();
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
@@ -133,7 +132,6 @@
sink.clear();
int wid = 0;
for(int i = 0; i < 10; i++) {
- //for(int i = 0; i < 10; i++) {
for(AbstractCouchBaseInputOperator<String> o : opers) {
o.beginWindow(wid);
o.emitTuples();
@@ -141,9 +139,6 @@
}
wid++;
}
- // wid++;
- //}
-
Assert.assertEquals("Tuples read should be same ", 100, sink.collectedTuples.size());
}