changes.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
index ce83aa7..795cd53 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
@@ -192,23 +192,23 @@
String getterExpression = expression.get(i);
if (type == Types.CHAR) {
GetterChar getChar = PojoUtils.createGetterChar(fqcn, getterExpression);
- getters.add(getChar.get(tuple));
+ getters.add(getChar);
}
else if (type == Types.VARCHAR) {
GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
- getters.add(getVarchar.get(tuple));
+ getters.add(getVarchar);
}
else if (type == Types.BOOLEAN || type == Types.TINYINT) {
GetterBoolean getBoolean = PojoUtils.createGetterBoolean(fqcn, getterExpression);
- getters.add(getBoolean.get(tuple));
+ getters.add(getBoolean);
}
else if (type == Types.SMALLINT) {
GetterShort getShort = PojoUtils.createGetterShort(fqcn, getterExpression);
- getters.add(getShort.get(tuple));
+ getters.add(getShort);
}
else if (type == Types.INTEGER) {
GetterInt getInt = PojoUtils.createGetterInt(fqcn, getterExpression);
- getters.add(getInt.get(tuple));
+ getters.add(getInt);
}
else if (type == Types.BIGINT) {
GetterLong getLong = PojoUtils.createExpressionGetterLong(fqcn, getterExpression);
@@ -216,31 +216,31 @@
}
else if (type == Types.DECIMAL) {
GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
- getters.add((Number)getObject.get(tuple));
+ getters.add(getObject);
}
else if (type == Types.FLOAT) {
GetterFloat getFloat = PojoUtils.createGetterFloat(fqcn, getterExpression);
- getters.add(getFloat.get(tuple));
+ getters.add(getFloat);
}
else if (type == Types.DOUBLE) {
GetterDouble getDouble = PojoUtils.createGetterDouble(fqcn, getterExpression);
- getters.add(getDouble.get(tuple));
+ getters.add(getDouble);
}
else if (type == Types.DATE) {
GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
- getters.add((Date)getObject.get(tuple));
+ getters.add(getObject);
}
else if (type == Types.TIME) {
GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
- getters.add((Time)getObject.get(tuple));
+ getters.add(getObject);
}
else if (type == Types.ARRAY) {
GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
- getters.add((Array)getObject.get(tuple));
+ getters.add(getObject);
}
else if (type == Types.OTHER) {
GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
- getters.add(getObject.get(tuple));
+ getters.add(getObject);
}
}
@@ -256,9 +256,52 @@
   @Override
   protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
   {
+    // Each getter is cast to the accessor type that matches the column's SQL
+    // type, invoked on the tuple, and the value is bound to the 1-based
+    // statement parameter for that column.
-    int size = dataColumns.size();
+    int size = columnDataTypes.size();
+    Object value;
     for (int i = 0; i < size; i++) {
-      statement.setObject(i + 1, getters.get(i));
+      int type = columnDataTypes.get(i);
+      switch (type) {
+        case (Types.CHAR):
+          value = ((GetterChar)getters.get(i)).get(tuple);
+          break;
+        case (Types.VARCHAR):
+          value = ((GetterString)getters.get(i)).get(tuple);
+          break;
+        // TINYINT columns are given a boolean getter, exactly like BOOLEAN.
+        case (Types.BOOLEAN):
+        case (Types.TINYINT):
+          value = ((GetterBoolean)getters.get(i)).get(tuple);
+          break;
+        case (Types.SMALLINT):
+          value = ((GetterShort)getters.get(i)).get(tuple);
+          break;
+        case (Types.INTEGER):
+          value = ((GetterInt)getters.get(i)).get(tuple);
+          break;
+        case (Types.BIGINT):
+          value = ((GetterLong)getters.get(i)).get(tuple);
+          break;
+        case (Types.FLOAT):
+          value = ((GetterFloat)getters.get(i)).get(tuple);
+          break;
+        case (Types.DOUBLE):
+          value = ((GetterDouble)getters.get(i)).get(tuple);
+          break;
+        case (Types.DECIMAL):
+        case (Types.DATE):
+        case (Types.TIME):
+        case (Types.ARRAY):
+        case (Types.OTHER):
+          value = ((GetterObject)getters.get(i)).get(tuple);
+          break;
+        default:
+          // No getter cast is known for this type; fail with a clear message.
+          throw new IllegalArgumentException("SQL type " + type + " is not handled");
+      }
+      statement.setObject(i + 1, value);
     }
   }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkInputOperator.java
new file mode 100644
index 0000000..0aa510a
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkInputOperator.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2012-2015 Malhar, Inc.
+ * All Rights Reserved.
+ */
+package com.datatorrent.contrib.splunk;
+
+import javax.validation.constraints.NotNull;
+
+public class SplunkInputOperator extends AbstractSplunkInputOperator<String>
+{
+  @NotNull
+  private String query = "search * | head 100";
+
+  /** Returns the query used to retrieve data from Splunk. */
+  public String getQuery()
+  {
+    return this.query;
+  }
+
+  public void setQuery(String query)
+  {
+    this.query = query;
+  }
+
+  /** Returns the configured query string. */
+  @Override
+  public String queryToRetrieveData()
+  {
+    return this.query;
+  }
+
+  /** Returns the given value unchanged. */
+  @Override
+  public String getTuple(String value)
+  {
+    return value;
+  }
+}