Merge pull request #3 from apache/testing
Unreleased, not yet ready for actual inspection. Will start requesting reviews shortly.
diff --git a/LICENSE b/LICENSE
index f927b76..861f99b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -203,8 +203,3 @@
See the License for the specific language governing permissions and
limitations under the License.
-------------------------------------------------------------
-
-
-
-APPENDIX B: Additional licenses relevant to this product:
- (none)
diff --git a/NOTICE b/NOTICE
index 01aca2a..8ff8dcf 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
-Apache DataSketches Server
-Copyright 2020 - The Apache Software Foundation
+Apache DataSketches-server
+Copyright 2021 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 9b03b92..14f5224 100644
--- a/README.md
+++ b/README.md
@@ -10,9 +10,22 @@
The repository is still in an early state and lacks both unit tests and
robust documentation.
+## Installation and Running
+
+The server requires Java 8. There is no pre-built jar currently, so the server must be built
+locally. Running `mvn package` will download the relevant dependencies, including the
+DataSketches Java library and the Jetty web server. Once built, a stand-alone server is launched
+by running
+```
+java -cp <path_to_jar> org.apache.datasketches.server.SketchServer <json_config_file>
+```
+where the most convenient option will likely be the jar-with-dependencies. The configuration
+file format is described below.
+
+
## Interaction
-Configuration and interaction with the server are done via JSON. We will
+Configuration and interaction with the server happen via JSON. We will
demonstrate the main features of the server by creating a simple configuration
and using that for some example requests and responses.
@@ -21,7 +34,7 @@
of an error partway through an array the server will return an error, but the
effects of any requests processed up to that point will be retained.
-JSON input may be passed in via either POST or GET.
+The server accepts JSON input presented via either POST or GET.
### Sketch Families
diff --git a/pom.xml b/pom.xml
index 0f066bc..720cf15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,13 @@
<version>9.4.31.v20200723</version>
</dependency>
+ <!-- Annotation -->
+ <dependency>
+ <groupId>org.checkerframework</groupId>
+ <artifactId>checker-qual</artifactId>
+ <version>3.10.0</version>
+ </dependency>
+
<!--Testing-->
<dependency>
<groupId>org.testng</groupId>
diff --git a/src/main/java/org/apache/datasketches/server/BaseSketchesQueryHandler.java b/src/main/java/org/apache/datasketches/server/BaseSketchesQueryHandler.java
index a3b1b8e..211733a 100644
--- a/src/main/java/org/apache/datasketches/server/BaseSketchesQueryHandler.java
+++ b/src/main/java/org/apache/datasketches/server/BaseSketchesQueryHandler.java
@@ -103,15 +103,6 @@
*/
protected abstract JsonObject processQuery(JsonObject query);
- /**
- * Internal method to synchronize calls to subclasses
- * @param query A JSON query to process
- * @return A JSON response
- */
- final synchronized JsonObject callProcessQuery(final JsonObject query) {
- return processQuery(query);
- }
-
@Override
public void handle(final String target,
final Request baseRequest,
@@ -130,10 +121,10 @@
try {
if (query == null) {
- result = callProcessQuery(null);
+ result = processQuery(null);
} else if (query.isJsonArray()) {
for (final JsonElement subQuery : query.getAsJsonArray()) {
- final JsonObject subResult = callProcessQuery(subQuery.getAsJsonObject());
+ final JsonObject subResult = processQuery(subQuery.getAsJsonObject());
if (subResult != null) {
// lazy initialization to avoid possibly empty array
if (result == null) {
@@ -143,7 +134,8 @@
}
}
} else {
- result = callProcessQuery((JsonObject) query);
+ //result = callProcessQuery((JsonObject) query);
+ result = processQuery((JsonObject) query);
}
if (result != null) {
diff --git a/src/main/java/org/apache/datasketches/server/DataQueryHandler.java b/src/main/java/org/apache/datasketches/server/DataQueryHandler.java
index 42f9aca..cac8f58 100644
--- a/src/main/java/org/apache/datasketches/server/DataQueryHandler.java
+++ b/src/main/java/org/apache/datasketches/server/DataQueryHandler.java
@@ -59,35 +59,39 @@
throw new IllegalArgumentException("Invalid sketch name: " + key);
}
- final SketchStorage.SketchEntry se = sketches.getSketch(key);
JsonObject result = new JsonObject();
- // pre-populate with the sketch name, but may be overwritten with
- // null depending on the queyr
- result.addProperty(QUERY_NAME_FIELD, key);
+ // we do need to lock the sketch even for query processing
+ synchronized (key.intern()) {
+ final SketchStorage.SketchEntry se = sketches.getSketch(key);
- switch (se.family) {
- case UNION:
- case HLL:
- case CPC:
- result = processDistinctQuery(result, query, se.family, se.sketch);
- break;
+ // pre-populate with the sketch name, but may be overwritten with
+ // null depending on the query
+ result.addProperty(QUERY_NAME_FIELD, key);
- case KLL:
- result = processQuantilesQuery(result, query, se.family, se.sketch);
- break;
+ switch (se.family_) {
+ case UNION:
+ case HLL:
+ case CPC:
+ result = processDistinctQuery(result, query, se.family_, se.sketch_);
+ break;
- case FREQUENCY:
- result = processFrequencyQuery(result, query, se.family, se.sketch);
- break;
+ case KLL:
+ result = processQuantilesQuery(result, query, se.family_, se.sketch_);
+ break;
- case RESERVOIR:
- case VAROPT:
- result = processSamplingQuery(result, query, se.family, se.sketch);
- break;
+ case FREQUENCY:
+ result = processFrequencyQuery(result, query, se.family_, se.sketch_);
+ break;
- default:
- throw new IllegalStateException("Unexpected sketch family: " + se.family);
+ case RESERVOIR:
+ case VAROPT:
+ result = processSamplingQuery(result, query, se.family_, se.sketch_);
+ break;
+
+ default:
+ throw new IllegalStateException("Unexpected sketch family: " + se.family_);
+ }
}
return result;
diff --git a/src/main/java/org/apache/datasketches/server/MergeHandler.java b/src/main/java/org/apache/datasketches/server/MergeHandler.java
index f3c7fcf..d75fad1 100644
--- a/src/main/java/org/apache/datasketches/server/MergeHandler.java
+++ b/src/main/java/org/apache/datasketches/server/MergeHandler.java
@@ -64,6 +64,20 @@
super(sketches);
}
+ /**
+ * Holds an entry in the list of sketches to merge.
+ * <tt>name.intern()</tt> can be used to synchronize the sketch to avoid concurrency issues
+ */
+ static class MergeEntry {
+ final String name_;
+ final Object sketch_;
+
+ MergeEntry(final String name, final Object sketch) {
+ name_ = name;
+ sketch_ = sketch;
+ }
+ }
+
@Override
protected JsonObject processQuery(final JsonObject query) {
// optional targets:
@@ -95,15 +109,23 @@
Family dstFamily = null;
if (dst != null) {
se = sketches.getSketch(dst);
- dstFamily = se.family;
+ dstFamily = se.family_;
}
// we'll process (and dedup) any stored sketches before we handle encoded inputs
// but we'll run through all of them before doing anything
- final ArrayList<Object> srcSketches = new ArrayList<>(srcList.size());
+ final ArrayList<MergeEntry> srcSketches = new ArrayList<>(srcList.size());
dstFamily = prepareSketches(srcList, dstFamily, dst, srcSketches);
- final byte[] skBytes = mergeSketches(dstFamily, k, se, srcSketches);
+ final byte[] skBytes;
+ // need to synchronize if we have a named sketch
+ if (se == null) {
+ skBytes = mergeSketches(dstFamily, k, null, srcSketches);
+ } else {
+ synchronized (se.name_.intern()) {
+ skBytes = mergeSketches(dstFamily, k, se, srcSketches);
+ }
+ }
// skBytes == null if merging into another sketch; only non-null if returning a serialized image
if (skBytes != null) {
@@ -115,7 +137,7 @@
}
}
- private Family prepareSketches(final JsonArray sources, Family family, final String dst, final ArrayList<Object> sketchList) {
+ private Family prepareSketches(final JsonArray sources, Family family, final String dst, final ArrayList<MergeEntry> sketchList) {
final HashSet<String> namedSet = new HashSet<>();
// TODO: Check for sketch value types with distinct counting?
@@ -131,7 +153,7 @@
// check family
final String key = elmt.getAsString();
final SketchStorage.SketchEntry entry = sketches.getSketch(key);
- if (entry == null || (family != null && family != entry.family)) {
+ if (entry == null || (family != null && family != entry.family_)) {
throw new SketchesException("Input sketches must exist and be of the same family as the target");
}
@@ -139,14 +161,14 @@
if (!namedSet.contains(key)) {
namedSet.add(key);
if (family == null) {
- family = entry.family;
+ family = entry.family_;
}
// if we have a theta Union we need to get the result first
- if (entry.family == Family.UNION) {
- sketchList.add(((Union) entry.sketch).getResult());
+ if (entry.family_ == Family.UNION) {
+ sketchList.add(new MergeEntry(key, ((Union) entry.sketch_).getResult()));
} else {
- sketchList.add(entry.sketch);
+ sketchList.add(new MergeEntry(key, entry.sketch_));
}
}
} else { // is JsonObject
@@ -166,8 +188,9 @@
}
// add to list, save family if we didn't have one yet
+ // use hashcode of sketch as name -- we'll later create a needless lock but cleaner than conditional locking
final Object sketch = deserializeSketch(skFamily, skString);
- sketchList.add(sketch);
+ sketchList.add(new MergeEntry(Integer.toString(sketch.hashCode()), sketch));
if (family == null) {
family = skFamily;
}
@@ -213,7 +236,7 @@
@SuppressWarnings("unchecked")
private static byte[] mergeSketches(final Family family, final int k,
- final SketchStorage.SketchEntry dstEntry, final ArrayList<Object> sketchList) {
+ final SketchStorage.SketchEntry dstEntry, final ArrayList<MergeEntry> sketchList) {
if (family == null || sketchList.size() == 0) {
return null;
}
@@ -223,15 +246,17 @@
case QUICKSELECT: {
// for theta, the destination is already a union so no need to add explicitly
final Union dst = dstEntry == null ? new SetOperationBuilder().setNominalEntries(1 << k).buildUnion()
- : (Union) dstEntry.sketch;
- for (final Object obj : sketchList) {
- dst.union((CompactSketch) obj);
+ : (Union) dstEntry.sketch_;
+ for (final MergeEntry me : sketchList) {
+ synchronized (me.name_.intern()) {
+ dst.union((CompactSketch) me.sketch_);
+ }
}
if (dstEntry == null) {
return dst.getResult().toByteArray();
} else {
- dstEntry.sketch = dst;
+ dstEntry.sketch_ = dst;
return null;
}
}
@@ -239,7 +264,7 @@
case HLL: {
final org.apache.datasketches.hll.Union union = new org.apache.datasketches.hll.Union(k);
if (dstEntry != null) {
- union.update((HllSketch) dstEntry.sketch);
+ union.update((HllSketch) dstEntry.sketch_);
}
for (final Object obj : sketchList) {
union.update((HllSketch) obj);
@@ -248,7 +273,7 @@
if (dstEntry == null) {
return union.getResult().toCompactByteArray();
} else {
- dstEntry.sketch = union.getResult();
+ dstEntry.sketch_ = union.getResult();
return null;
}
}
@@ -256,7 +281,7 @@
case CPC: {
final CpcUnion union = new CpcUnion(k);
if (dstEntry != null) {
- union.update((CpcSketch) dstEntry.sketch);
+ union.update((CpcSketch) dstEntry.sketch_);
}
for (final Object obj : sketchList) {
union.update((CpcSketch) obj);
@@ -265,14 +290,14 @@
if (dstEntry == null) {
return union.getResult().toByteArray();
} else {
- dstEntry.sketch = union.getResult();
+ dstEntry.sketch_ = union.getResult();
return null;
}
}
case KLL: {
// Only merge(), no separate union. Slightly abusing terminology to call it union
- final KllFloatsSketch union = dstEntry == null ? new KllFloatsSketch(k) : (KllFloatsSketch) dstEntry.sketch;
+ final KllFloatsSketch union = dstEntry == null ? new KllFloatsSketch(k) : (KllFloatsSketch) dstEntry.sketch_;
for (final Object obj : sketchList) {
union.merge((KllFloatsSketch) obj);
@@ -281,14 +306,14 @@
if (dstEntry == null) {
return union.toByteArray();
} else {
- dstEntry.sketch = union;
+ dstEntry.sketch_ = union;
return null;
}
}
case FREQUENCY: {
// Only merge(), no separate union. Slightly abusing terminology to call it union
- final ItemsSketch<String> union = dstEntry == null ? new ItemsSketch<>(k) : (ItemsSketch<String>) dstEntry.sketch;
+ final ItemsSketch<String> union = dstEntry == null ? new ItemsSketch<>(k) : (ItemsSketch<String>) dstEntry.sketch_;
for (final Object obj : sketchList) {
union.merge((ItemsSketch<String>) obj);
@@ -297,7 +322,7 @@
if (dstEntry == null) {
return union.toByteArray(new ArrayOfStringsSerDe());
} else {
- dstEntry.sketch = union;
+ dstEntry.sketch_ = union;
return null;
}
}
@@ -305,7 +330,7 @@
case RESERVOIR: {
final ReservoirItemsUnion<String> union = ReservoirItemsUnion.newInstance(k);
if (dstEntry != null) {
- union.update((ReservoirItemsSketch<String>) dstEntry.sketch);
+ union.update((ReservoirItemsSketch<String>) dstEntry.sketch_);
}
for (final Object obj : sketchList) {
@@ -315,7 +340,7 @@
if (dstEntry == null) {
return union.getResult().toByteArray(new ArrayOfStringsSerDe());
} else {
- dstEntry.sketch = union.getResult();
+ dstEntry.sketch_ = union.getResult();
return null;
}
}
@@ -323,7 +348,7 @@
case VAROPT: {
final VarOptItemsUnion<String> union = VarOptItemsUnion.newInstance(k);
if (dstEntry != null) {
- union.update((VarOptItemsSketch<String>) dstEntry.sketch);
+ union.update((VarOptItemsSketch<String>) dstEntry.sketch_);
}
for (final Object obj : sketchList) {
@@ -333,7 +358,7 @@
if (dstEntry == null) {
return union.getResult().toByteArray(new ArrayOfStringsSerDe());
} else {
- dstEntry.sketch = union.getResult();
+ dstEntry.sketch_ = union.getResult();
return null;
}
}
diff --git a/src/main/java/org/apache/datasketches/server/ResetHandler.java b/src/main/java/org/apache/datasketches/server/ResetHandler.java
index ec22a47..493d9ab 100644
--- a/src/main/java/org/apache/datasketches/server/ResetHandler.java
+++ b/src/main/java/org/apache/datasketches/server/ResetHandler.java
@@ -50,30 +50,32 @@
throw new IllegalArgumentException("Invalid sketch name: " + key);
}
- final SketchStorage.SketchEntry se = sketches.getSketch(key);
+ synchronized (key.intern()) {
+ final SketchStorage.SketchEntry se = sketches.getSketch(key);
- switch (se.family) {
- case UNION:
- ((Union) se.sketch).reset();
- break;
- case KLL:
- se.sketch = new KllFloatsSketch(se.configK);
- break;
- case FREQUENCY:
- ((ItemsSketch<String>) se.sketch).reset();
- break;
- case HLL:
- ((HllSketch) se.sketch).reset();
- break;
- case CPC:
- ((CpcSketch) se.sketch).reset();
- break;
- case RESERVOIR:
- ((ReservoirItemsSketch<String>) se.sketch).reset();
- break;
- case VAROPT:
- ((VarOptItemsSketch<String>) se.sketch).reset();
- break;
+ switch (se.family_) {
+ case UNION:
+ ((Union) se.sketch_).reset();
+ break;
+ case KLL:
+ se.sketch_ = new KllFloatsSketch(se.configK_);
+ break;
+ case FREQUENCY:
+ ((ItemsSketch<String>) se.sketch_).reset();
+ break;
+ case HLL:
+ ((HllSketch) se.sketch_).reset();
+ break;
+ case CPC:
+ ((CpcSketch) se.sketch_).reset();
+ break;
+ case RESERVOIR:
+ ((ReservoirItemsSketch<String>) se.sketch_).reset();
+ break;
+ case VAROPT:
+ ((VarOptItemsSketch<String>) se.sketch_).reset();
+ break;
+ }
}
// nothing to return from reset calls
diff --git a/src/main/java/org/apache/datasketches/server/SerializationHandler.java b/src/main/java/org/apache/datasketches/server/SerializationHandler.java
index 8c285db..464667b 100644
--- a/src/main/java/org/apache/datasketches/server/SerializationHandler.java
+++ b/src/main/java/org/apache/datasketches/server/SerializationHandler.java
@@ -60,42 +60,47 @@
throw new IllegalArgumentException("Invalid sketch name: " + name);
}
- final SketchStorage.SketchEntry se = sketches.getSketch(name);
-
final byte[] bytes;
- switch (se.family) {
- case UNION:
- bytes = ((Union) se.sketch).getResult().toByteArray();
- break;
- case KLL:
- bytes = ((KllFloatsSketch) se.sketch).toByteArray();
- break;
- case FREQUENCY:
- bytes = ((ItemsSketch<String>) se.sketch).toByteArray(new ArrayOfStringsSerDe());
- break;
- case HLL:
- bytes = ((HllSketch) se.sketch).toCompactByteArray();
- break;
- case CPC:
- bytes = ((CpcSketch) se.sketch).toByteArray();
- break;
- case RESERVOIR:
- bytes = ((ReservoirItemsSketch<String>) se.sketch).toByteArray(new ArrayOfStringsSerDe());
- break;
- case VAROPT:
- bytes = ((VarOptItemsSketch<String>) se.sketch).toByteArray(new ArrayOfStringsSerDe());
- break;
- default:
- throw new IllegalStateException("Unexpected value: " + se.family);
+ final SketchStorage.SketchEntry se;
+
+ // need to lock the sketch even when just reading
+ synchronized (name.intern()) {
+ se = sketches.getSketch(name);
+
+ switch (se.family_) {
+ case UNION:
+ bytes = ((Union) se.sketch_).getResult().toByteArray();
+ break;
+ case KLL:
+ bytes = ((KllFloatsSketch) se.sketch_).toByteArray();
+ break;
+ case FREQUENCY:
+ bytes = ((ItemsSketch<String>) se.sketch_).toByteArray(new ArrayOfStringsSerDe());
+ break;
+ case HLL:
+ bytes = ((HllSketch) se.sketch_).toCompactByteArray();
+ break;
+ case CPC:
+ bytes = ((CpcSketch) se.sketch_).toByteArray();
+ break;
+ case RESERVOIR:
+ bytes = ((ReservoirItemsSketch<String>) se.sketch_).toByteArray(new ArrayOfStringsSerDe());
+ break;
+ case VAROPT:
+ bytes = ((VarOptItemsSketch<String>) se.sketch_).toByteArray(new ArrayOfStringsSerDe());
+ break;
+ default:
+ throw new IllegalStateException("Unexpected value: " + se.family_);
+ }
}
final String b64Sketch = Base64.getUrlEncoder().encodeToString(bytes);
final JsonObject result = new JsonObject();
result.addProperty(QUERY_NAME_FIELD, name);
- result.addProperty(CONFIG_FAMILY_FIELD, se.family.getFamilyName());
- if (se.type != null)
- result.addProperty(CONFIG_TYPE_FIELD, se.type.getTypeName());
+ result.addProperty(CONFIG_FAMILY_FIELD, se.family_.getFamilyName());
+ if (se.type_ != null)
+ result.addProperty(CONFIG_TYPE_FIELD, se.type_.getTypeName());
result.addProperty(QUERY_SKETCH_FIELD, b64Sketch);
return result;
diff --git a/src/main/java/org/apache/datasketches/server/SketchConstants.java b/src/main/java/org/apache/datasketches/server/SketchConstants.java
index 6ef32c8..1e4059b 100644
--- a/src/main/java/org/apache/datasketches/server/SketchConstants.java
+++ b/src/main/java/org/apache/datasketches/server/SketchConstants.java
@@ -21,12 +21,12 @@
public final class SketchConstants {
// API call paths, relative to root
- public static final String UPDATE_PATH = "/update";
- public static final String SERIALIZE_PATH = "/serialize";
- public static final String STATUS_PATH = "/status";
- public static final String QUERY_PATH = "/query";
- public static final String MERGE_PATH = "/merge";
- public static final String RESET_PATH = "/reset";
+ public static final String UPDATE_PATH = "update";
+ public static final String SERIALIZE_PATH = "serialize";
+ public static final String STATUS_PATH = "status";
+ public static final String QUERY_PATH = "query";
+ public static final String MERGE_PATH = "merge";
+ public static final String RESET_PATH = "reset";
// JSON Query/Update/Merge Field Names
public static final String QUERY_NAME_FIELD = "name";
@@ -76,6 +76,7 @@
public static final String RESPONSE_RESULT_MASS = "mass";
public static final String RESPONSE_QUANTILE_LIST = "estimatedQuantiles";
public static final String RESPONSE_RESULT_QUANTILE = "quantile";
+ public static final String RESPONSE_SKETCH_COUNT_FIELD = "count";
// JSON Config Field Names
public static final String CONFIG_PORT_FIELD = "port";
diff --git a/src/main/java/org/apache/datasketches/server/SketchServer.java b/src/main/java/org/apache/datasketches/server/SketchServer.java
index 4fefb9a..e0a8a0e 100644
--- a/src/main/java/org/apache/datasketches/server/SketchServer.java
+++ b/src/main/java/org/apache/datasketches/server/SketchServer.java
@@ -21,6 +21,7 @@
import java.io.IOException;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
@@ -43,36 +44,48 @@
* @param configFile Path to a configuration file following the <tt>SketchServerConfig</tt> format
* @throws IOException on parse errors
*/
- public SketchServer(final String configFile) throws IOException {
+ public SketchServer(@NonNull final String configFile) throws IOException {
config = new SketchServerConfig(configFile);
}
// defines paths and registers the relevant handlers
private void createServer() {
- server = new Server(config.getPort());
+ server = new Server();
+
+ // configure port
+ final ServerConnector http = new ServerConnector(server);
+ http.setHost("localhost");
+ http.setPort(config.getPort());
+ server.addConnector(http);
// Error page unless you have a correct URL
final ContextHandler contextRoot = new ContextHandler("/");
- contextRoot.setContextPath("/");
contextRoot.setErrorHandler(new ErrorHandler());
- final ContextHandler contextStatus = new ContextHandler(STATUS_PATH);
+ // Add specific handlers
+ final ContextHandler contextStatus = new ContextHandler("/" + STATUS_PATH);
contextStatus.setHandler(new StatusHandler(sketches));
+ contextStatus.setAllowNullPathInfo(true);
- final ContextHandler contextSerialize = new ContextHandler(SERIALIZE_PATH);
+ final ContextHandler contextSerialize = new ContextHandler("/" + SERIALIZE_PATH);
contextSerialize.setHandler(new SerializationHandler(sketches));
+ contextSerialize.setAllowNullPathInfo(true);
- final ContextHandler contextUpdate = new ContextHandler(UPDATE_PATH);
+ final ContextHandler contextUpdate = new ContextHandler("/" + UPDATE_PATH);
contextUpdate.setHandler(new UpdateHandler(sketches));
+ contextUpdate.setAllowNullPathInfo(true);
- final ContextHandler contextMerge = new ContextHandler(MERGE_PATH);
+ final ContextHandler contextMerge = new ContextHandler("/" + MERGE_PATH);
contextMerge.setHandler(new MergeHandler(sketches));
+ contextMerge.setAllowNullPathInfo(true);
- final ContextHandler contextQuery = new ContextHandler(QUERY_PATH);
+ final ContextHandler contextQuery = new ContextHandler("/" + QUERY_PATH);
contextQuery.setHandler(new DataQueryHandler(sketches));
+ contextQuery.setAllowNullPathInfo(true);
- final ContextHandler contextReset = new ContextHandler(RESET_PATH);
+ final ContextHandler contextReset = new ContextHandler("/" + RESET_PATH);
contextReset.setHandler(new ResetHandler(sketches));
+ contextReset.setAllowNullPathInfo(true);
final ContextHandlerCollection contexts =
new ContextHandlerCollection(contextRoot,
@@ -116,6 +129,34 @@
return -1;
}
+ /**
+ * Returns the server's running status
+ * @return True for a running server, otherwise false
+ */
+ public boolean isRunning() {
+ return server != null && server.isRunning();
+ }
+
+ /**
+ * Stops the server from running. Cannot be be restarted without creating new sketches.
+ * @throws Exception Upon underlying server throwing an Exception
+ */
+ public void stop() throws Exception {
+ if (server != null) {
+ server.stop();
+ server.isStarted();
+ }
+ }
+
+ /**
+ * Package-private test method to get a specific SketchEntry
+ * @param name The name of the desired sketch
+ * @return The SketchEntry containing the sketch and type info
+ */
+ SketchStorage.SketchEntry getSketch(@NonNull final String name) {
+ return sketches.getSketch(name);
+ }
+
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: SketchServer <config_file>");
diff --git a/src/main/java/org/apache/datasketches/server/SketchServerConfig.java b/src/main/java/org/apache/datasketches/server/SketchServerConfig.java
index 31a2daf..447e36e 100644
--- a/src/main/java/org/apache/datasketches/server/SketchServerConfig.java
+++ b/src/main/java/org/apache/datasketches/server/SketchServerConfig.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +37,8 @@
import static org.apache.datasketches.server.SketchConstants.*;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
/**
* A class to hold the server configuration, along with a supporting subclass and file-parsing methods.
*
@@ -54,25 +57,16 @@
this.family = family;
this.type = type;
}
-
- SketchInfo(final String name, final int k, final String family) {
- this(name, k, family, null);
- }
-
}
private int port = DEFAULT_PORT;
private ArrayList<SketchInfo> sketchList;
- SketchServerConfig(final String configFile) throws IOException {
+ SketchServerConfig(@NonNull final String configFile) throws IOException {
final JsonElement config = readJsonFromFile(configFile);
parseConfig(config);
}
- SketchServerConfig(final JsonElement config) throws IOException {
- parseConfig(config);
- }
-
int getPort() {
return port;
}
@@ -83,16 +77,10 @@
// output should have a list with full info per sketch, even if input allows a
// more condensed format
- private static JsonElement readJsonFromFile(final String configFile) {
- JsonElement config = null;
-
- try (final Reader reader = Files.newBufferedReader(Paths.get(configFile))) {
- config = JsonParser.parseReader(reader);
- } catch (final IOException e) {
- e.printStackTrace();
- }
-
- return config;
+ private static JsonElement readJsonFromFile(final String configFile) throws IOException {
+ final Path configPath = Paths.get(configFile);
+ final Reader reader = Files.newBufferedReader(configPath);
+ return JsonParser.parseReader(reader);
}
private void parseConfig(final JsonElement config) throws IOException {
diff --git a/src/main/java/org/apache/datasketches/server/SketchStorage.java b/src/main/java/org/apache/datasketches/server/SketchStorage.java
index c6c8c89..e8bc491 100644
--- a/src/main/java/org/apache/datasketches/server/SketchStorage.java
+++ b/src/main/java/org/apache/datasketches/server/SketchStorage.java
@@ -31,6 +31,8 @@
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.datasketches.sampling.VarOptItemsSketch;
import org.apache.datasketches.theta.SetOperationBuilder;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
@@ -43,7 +45,8 @@
* order to ensure that data is presented in a consistent way.
*/
public class SketchStorage {
- private static final String SKETCH_COUNT_NAME = "count";
+ // the set of SketchEntries held by this object
+ HashMap<String, SketchEntry> sketchMap;
/**
* Returns true if the sketch family is for distinct counting.
@@ -55,38 +58,44 @@
}
static class SketchEntry {
- public final Family family;
- public final ValueType type;
- public Object sketch;
- public int configK;
+ public final Family family_;
+ public final ValueType type_;
+ public Object sketch_;
+ public final String name_;
+ public int configK_;
- SketchEntry(final Family family, final ValueType type, final Object sketch, final int configK) throws IllegalArgumentException {
+ SketchEntry(@NonNull final Family family,
+ final ValueType type,
+ @NonNull final Object sketch,
+ @NonNull final String name,
+ final int configK) throws IllegalArgumentException {
if (isDistinctCounting(family) && type == null)
throw new IllegalArgumentException("Must specify a value type for distinct counting sketches");
- this.family = family;
- this.type = type;
- this.sketch = sketch;
- this.configK = configK;
+ family_ = family;
+ type_ = type;
+ sketch_ = sketch;
+ name_ = name;
+ configK_ = configK;
}
- SketchEntry(final Family family, final Object sketch, final int configK) throws IllegalArgumentException {
+ SketchEntry(@NonNull final Family family,
+ @NonNull final Object sketch,
+ @NonNull final String name,
+ final int configK) throws IllegalArgumentException {
if (isDistinctCounting(family))
throw new IllegalArgumentException("Must specify a value type for distinct counting sketches");
- this.family = family;
- this.type = null;
- this.sketch = sketch;
- this.configK = configK;
+ family_ = family;
+ type_ = null;
+ sketch_ = sketch;
+ name_ = name;
+ configK_ = configK;
}
}
- HashMap<String, SketchEntry> sketchMap;
-
- SketchStorage(final List<SketchServerConfig.SketchInfo> sketchList) {
- if (sketchList != null) {
- createSketches(sketchList);
- }
+ SketchStorage(@NonNull final List<SketchServerConfig.SketchInfo> sketchList) {
+ createSketches(sketchList);
}
JsonObject listSketches() {
@@ -96,17 +105,17 @@
for (final Map.Entry<String, SketchEntry> e : sketchMap.entrySet()) {
final JsonObject item = new JsonObject();
item.addProperty(CONFIG_SKETCH_NAME_FIELD, e.getKey());
- switch (e.getValue().family) {
+ switch (e.getValue().family_) {
case UNION:
- item.addProperty(CONFIG_TYPE_FIELD, e.getValue().type.getTypeName());
+ item.addProperty(CONFIG_TYPE_FIELD, e.getValue().type_.getTypeName());
item.addProperty(CONFIG_FAMILY_FIELD, SKETCH_FAMILY_THETA);
break;
case CPC:
- item.addProperty(CONFIG_TYPE_FIELD, e.getValue().type.getTypeName());
+ item.addProperty(CONFIG_TYPE_FIELD, e.getValue().type_.getTypeName());
item.addProperty(CONFIG_FAMILY_FIELD, SKETCH_FAMILY_CPC);
break;
case HLL:
- item.addProperty(CONFIG_TYPE_FIELD, e.getValue().type.getTypeName());
+ item.addProperty(CONFIG_TYPE_FIELD, e.getValue().type_.getTypeName());
item.addProperty(CONFIG_FAMILY_FIELD, SKETCH_FAMILY_HLL);
break;
case FREQUENCY:
@@ -125,7 +134,7 @@
sketchList.add(item);
}
- summary.addProperty(SKETCH_COUNT_NAME, sketchMap.size());
+ summary.addProperty(RESPONSE_SKETCH_COUNT_FIELD, sketchMap.size());
summary.add(SketchConstants.CONFIG_SKETCHES_PREFIX, sketchList); // bare prefix, sketches fully qualified
return summary;
@@ -150,38 +159,39 @@
SketchEntry sketchEntry = null;
final Family family = BaseSketchesQueryHandler.familyFromString(info.family);
+ final int k = info.k; // to reduce derferences in code later
switch (family) {
case QUICKSELECT:
// make a Union so we can handle merges later
sketchEntry = new SketchEntry(Family.UNION, ValueType.stringToType(info.type),
- new SetOperationBuilder().setNominalEntries(1 << info.k).buildUnion(), info.k);
+ new SetOperationBuilder().setNominalEntries(1 << k).buildUnion(), info.name, k);
break;
case HLL:
sketchEntry = new SketchEntry(Family.HLL, ValueType.stringToType(info.type),
- new HllSketch(info.k), info.k);
+ new HllSketch(k), info.name, k);
break;
case CPC:
sketchEntry = new SketchEntry(Family.CPC, ValueType.stringToType(info.type),
- new CpcSketch(info.k), info.k);
+ new CpcSketch(k), info.name, k);
break;
case KLL:
- sketchEntry = new SketchEntry(Family.KLL, new KllFloatsSketch(info.k), info.k);
+ sketchEntry = new SketchEntry(Family.KLL, new KllFloatsSketch(k), info.name, k);
break;
case FREQUENCY:
- sketchEntry = new SketchEntry(Family.FREQUENCY, new ItemsSketch<String>(info.k), info.k);
+ sketchEntry = new SketchEntry(Family.FREQUENCY, new ItemsSketch<String>(k), info.name, k);
break;
case RESERVOIR:
- sketchEntry = new SketchEntry(Family.RESERVOIR, ReservoirItemsSketch.<String>newInstance(info.k), info.k);
+ sketchEntry = new SketchEntry(Family.RESERVOIR, ReservoirItemsSketch.<String>newInstance(k), info.name, k);
break;
case VAROPT:
- sketchEntry = new SketchEntry(Family.VAROPT, VarOptItemsSketch.<String>newInstance(info.k), info.k);
+ sketchEntry = new SketchEntry(Family.VAROPT, VarOptItemsSketch.<String>newInstance(k), info.name, k);
break;
}
diff --git a/src/main/java/org/apache/datasketches/server/UpdateHandler.java b/src/main/java/org/apache/datasketches/server/UpdateHandler.java
index eb8c97a..84dc15a 100644
--- a/src/main/java/org/apache/datasketches/server/UpdateHandler.java
+++ b/src/main/java/org/apache/datasketches/server/UpdateHandler.java
@@ -80,10 +80,12 @@
throw new IllegalArgumentException("Attempt to call update with missing name or sketch not found");
}
- if (data.isJsonArray()) {
- processBatchUpdate(se, data.getAsJsonArray());
- } else {
- processSingleUpdate(se, data);
+ synchronized (name.intern()) {
+ if (data.isJsonArray()) {
+ processBatchUpdate(se, data.getAsJsonArray());
+ } else {
+ processSingleUpdate(se, data);
+ }
}
}
@@ -94,54 +96,54 @@
@SuppressWarnings("unchecked")
private static void processBatchUpdate(final SketchStorage.SketchEntry entry,
final JsonArray data) {
- switch (entry.family) {
+ switch (entry.family_) {
case UNION: // theta
- assert(entry.type != null);
- switch (entry.type) {
+ assert(entry.type_ != null);
+ switch (entry.type_) {
case FLOAT: case DOUBLE:
- for (final JsonElement e : data) { ((Union) entry.sketch).update(e.getAsDouble()); }
+ for (final JsonElement e : data) { ((Union) entry.sketch_).update(e.getAsDouble()); }
break;
case INT: case LONG:
- for (final JsonElement e : data) { ((Union) entry.sketch).update(e.getAsLong()); }
+ for (final JsonElement e : data) { ((Union) entry.sketch_).update(e.getAsLong()); }
break;
case STRING: default:
- for (final JsonElement e : data) { ((Union) entry.sketch).update(e.getAsString()); }
+ for (final JsonElement e : data) { ((Union) entry.sketch_).update(e.getAsString()); }
break;
}
break;
case CPC:
- assert(entry.type != null);
- switch (entry.type) {
+ assert(entry.type_ != null);
+ switch (entry.type_) {
case FLOAT: case DOUBLE:
- for (final JsonElement e : data) { ((CpcSketch) entry.sketch).update(e.getAsDouble()); }
+ for (final JsonElement e : data) { ((CpcSketch) entry.sketch_).update(e.getAsDouble()); }
break;
case INT: case LONG:
- for (final JsonElement e : data) { ((CpcSketch) entry.sketch).update(e.getAsLong()); }
+ for (final JsonElement e : data) { ((CpcSketch) entry.sketch_).update(e.getAsLong()); }
break;
case STRING: default:
- for (final JsonElement e : data) { ((CpcSketch) entry.sketch).update(e.getAsString()); }
+ for (final JsonElement e : data) { ((CpcSketch) entry.sketch_).update(e.getAsString()); }
break;
}
break;
case HLL:
- assert(entry.type != null);
- switch (entry.type) {
+ assert(entry.type_ != null);
+ switch (entry.type_) {
case FLOAT: case DOUBLE:
- for (final JsonElement e : data) { ((HllSketch) entry.sketch).update(e.getAsDouble()); }
+ for (final JsonElement e : data) { ((HllSketch) entry.sketch_).update(e.getAsDouble()); }
break;
case INT: case LONG:
- for (final JsonElement e : data) { ((CpcSketch) entry.sketch).update(e.getAsLong()); }
+ for (final JsonElement e : data) { ((CpcSketch) entry.sketch_).update(e.getAsLong()); }
break;
case STRING: default:
- for (final JsonElement e : data) { ((CpcSketch) entry.sketch).update(e.getAsString()); }
+ for (final JsonElement e : data) { ((CpcSketch) entry.sketch_).update(e.getAsString()); }
break;
}
break;
case KLL:
- for (final JsonElement e : data) { ((KllFloatsSketch) entry.sketch).update(e.getAsFloat()); }
+ for (final JsonElement e : data) { ((KllFloatsSketch) entry.sketch_).update(e.getAsFloat()); }
break;
case FREQUENCY:
@@ -154,15 +156,15 @@
}
final String item = inputPair.get(QUERY_PAIR_ITEM_FIELD).getAsString();
final int weight = inputPair.get(QUERY_PAIR_WEIGHT_FIELD).getAsInt();
- ((ItemsSketch<String>) entry.sketch).update(item, weight);
+ ((ItemsSketch<String>) entry.sketch_).update(item, weight);
} else {
- ((ItemsSketch<String>) entry.sketch).update(e.getAsString());
+ ((ItemsSketch<String>) entry.sketch_).update(e.getAsString());
}
}
break;
case RESERVOIR:
- for (final JsonElement e : data) { ((ReservoirItemsSketch<String>) entry.sketch).update(e.getAsString()); }
+ for (final JsonElement e : data) { ((ReservoirItemsSketch<String>) entry.sketch_).update(e.getAsString()); }
break;
case VAROPT:
@@ -175,69 +177,69 @@
}
final String item = inputPair.get(QUERY_PAIR_ITEM_FIELD).getAsString();
final double weight = inputPair.get(QUERY_PAIR_WEIGHT_FIELD).getAsDouble();
- ((VarOptItemsSketch<String>) entry.sketch).update(item, weight);
+ ((VarOptItemsSketch<String>) entry.sketch_).update(item, weight);
} else {
- ((VarOptItemsSketch<String>) entry.sketch).update(e.getAsString(), 1.0);
+ ((VarOptItemsSketch<String>) entry.sketch_).update(e.getAsString(), 1.0);
}
}
break;
default:
- throw new IllegalArgumentException("Unsupported sketch type: " + entry.family);
+ throw new IllegalArgumentException("Unsupported sketch type: " + entry.family_);
}
}
@SuppressWarnings("unchecked")
private static void processSingleUpdate(final SketchStorage.SketchEntry entry,
final JsonElement data) {
- switch (entry.family) {
+ switch (entry.family_) {
case UNION:
- assert(entry.type != null);
- switch (entry.type) {
+ assert(entry.type_ != null);
+ switch (entry.type_) {
case FLOAT: case DOUBLE:
- ((Union) entry.sketch).update(data.getAsDouble());
+ ((Union) entry.sketch_).update(data.getAsDouble());
break;
case INT: case LONG:
- ((Union) entry.sketch).update(data.getAsLong());
+ ((Union) entry.sketch_).update(data.getAsLong());
break;
case STRING: default:
- ((Union) entry.sketch).update(data.getAsString());
+ ((Union) entry.sketch_).update(data.getAsString());
break;
}
break;
case CPC:
- assert(entry.type != null);
- switch (entry.type) {
+ assert(entry.type_ != null);
+ switch (entry.type_) {
case FLOAT: case DOUBLE:
- ((CpcSketch) entry.sketch).update(data.getAsDouble());
+ ((CpcSketch) entry.sketch_).update(data.getAsDouble());
break;
case INT: case LONG:
- ((CpcSketch) entry.sketch).update(data.getAsLong());
+ ((CpcSketch) entry.sketch_).update(data.getAsLong());
break;
case STRING: default:
- ((CpcSketch) entry.sketch).update(data.getAsString());
+ ((CpcSketch) entry.sketch_).update(data.getAsString());
break;
}
break;
case HLL:
- assert(entry.type != null);
- switch (entry.type) {
+ assert(entry.type_ != null);
+ switch (entry.type_) {
case FLOAT: case DOUBLE:
- ((HllSketch) entry.sketch).update(data.getAsDouble());
+ ((HllSketch) entry.sketch_).update(data.getAsDouble());
break;
case INT: case LONG:
- ((HllSketch) entry.sketch).update(data.getAsLong());
+ ((HllSketch) entry.sketch_).update(data.getAsLong());
break;
case STRING: default:
- ((HllSketch) entry.sketch).update(data.getAsString());
+ ((HllSketch) entry.sketch_).update(data.getAsString());
break;
}
break;
case KLL:
- ((KllFloatsSketch) entry.sketch).update(data.getAsFloat());
+ ((KllFloatsSketch) entry.sketch_).update(data.getAsFloat());
break;
case FREQUENCY:
@@ -249,14 +251,14 @@
}
final String item = inputPair.get(QUERY_PAIR_ITEM_FIELD).getAsString();
final int weight = inputPair.get(QUERY_PAIR_WEIGHT_FIELD).getAsInt();
- ((ItemsSketch<String>) entry.sketch).update(item, weight);
+ ((ItemsSketch<String>) entry.sketch_).update(item, weight);
} else {
- ((ItemsSketch<String>) entry.sketch).update(data.getAsString());
+ ((ItemsSketch<String>) entry.sketch_).update(data.getAsString());
}
break;
case RESERVOIR:
- ((ReservoirItemsSketch<String>) entry.sketch).update(data.getAsString());
+ ((ReservoirItemsSketch<String>) entry.sketch_).update(data.getAsString());
break;
case VAROPT:
@@ -268,14 +270,14 @@
}
final String item = inputPair.get(QUERY_PAIR_ITEM_FIELD).getAsString();
final double weight = inputPair.get(QUERY_PAIR_WEIGHT_FIELD).getAsDouble();
- ((VarOptItemsSketch<String>) entry.sketch).update(item, weight);
+ ((VarOptItemsSketch<String>) entry.sketch_).update(item, weight);
} else {
- ((VarOptItemsSketch<String>) entry.sketch).update(data.getAsString(), 1.0);
+ ((VarOptItemsSketch<String>) entry.sketch_).update(data.getAsString(), 1.0);
}
break;
default:
- throw new IllegalArgumentException("Unsupported sketch type: " + entry.family);
+ throw new IllegalArgumentException("Unsupported sketch type: " + entry.family_);
}
}
diff --git a/src/test/java/org/apache/datasketches/server/ServerTestBase.java b/src/test/java/org/apache/datasketches/server/ServerTestBase.java
new file mode 100644
index 0000000..8c0d7f5
--- /dev/null
+++ b/src/test/java/org/apache/datasketches/server/ServerTestBase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.server;
+
+import static org.testng.Assert.fail;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.Objects;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class ServerTestBase {
+ final static String RESPONSE_FIELD = "response";
+
+ SketchServer server_ = null;
+ String serverUri_ = null;
+
+ @BeforeClass
+ public void launchServer() {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ server_ = new SketchServer(Objects.requireNonNull(classLoader.getResource("test_config.json")).getFile());
+ server_.start();
+ serverUri_ = server_.getURI();
+ } catch (final Exception e) {
+ fail();
+ }
+ }
+
+ @AfterClass
+ public void shutdownServer() {
+ if (server_ != null) {
+ try {
+ server_.stop();
+ } catch (final Exception e) {
+ fail();
+ }
+ }
+ }
+
+ int postData(@NonNull final String path,
+ @NonNull final JsonObject data,
+ @NonNull final JsonObject response) {
+ HttpURLConnection http = null;
+ int status = -1;
+
+ try {
+ // set up the POST
+ final URL url = new URL(serverUri_ + path);
+ http = (HttpURLConnection) url.openConnection();
+ http.setDoInput(true);
+ http.setDoOutput(true);
+ http.setRequestMethod("POST");
+ http.setRequestProperty("Content-Type", "application/json");
+ http.setRequestProperty("Accept", "application/json");
+
+ final byte[] jsonBytes = data.toString().getBytes(StandardCharsets.UTF_8);
+ http.setRequestProperty("Content-length", Integer.toString(jsonBytes.length));
+
+ // write JSON data to to stream
+ try (final DataOutputStream os = new DataOutputStream(http.getOutputStream())) {
+ os.write(jsonBytes);
+ }
+
+ status = http.getResponseCode();
+ if (status == HttpServletResponse.SC_OK) {
+ // read response, if any, and put into a JSON element
+ try (final InputStreamReader isr = new InputStreamReader(http.getInputStream())) {
+ response.add(RESPONSE_FIELD, JsonParser.parseReader(isr));
+ }
+ }
+ } catch (final IOException e) {
+ fail();
+ } finally {
+ if (http != null)
+ http.disconnect();
+ }
+
+ return status;
+ }
+
+ int getData(@NonNull final String path,
+ @NonNull final JsonObject data,
+ @NonNull final JsonObject response) {
+ HttpURLConnection http = null;
+ int status = -1;
+
+ try {
+ // set up the POST
+ final URL url = new URL(serverUri_ + path + "?" + data);
+ http = (HttpURLConnection) url.openConnection();
+ http.setDoInput(true);
+ http.setRequestProperty("Content-Type", "application/json");
+ http.connect();
+
+ status = http.getResponseCode();
+ if (status == HttpServletResponse.SC_OK) {
+ // read response, if any, and put into a JSON element
+ try (final InputStreamReader isr = new InputStreamReader(http.getInputStream())) {
+ response.add(RESPONSE_FIELD, JsonParser.parseReader(isr));
+ }
+ }
+ } catch (final IOException e) {
+ fail();
+ } finally {
+ if (http != null)
+ http.disconnect();
+ }
+
+ return status;
+ }
+}
diff --git a/src/test/java/org/apache/datasketches/server/SketchServerConfigTest.java b/src/test/java/org/apache/datasketches/server/SketchServerConfigTest.java
new file mode 100644
index 0000000..861d58a
--- /dev/null
+++ b/src/test/java/org/apache/datasketches/server/SketchServerConfigTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.server;
+
+import static org.apache.datasketches.server.SketchConstants.DEFAULT_PORT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.testng.annotations.Test;
+
+public class SketchServerConfigTest {
+
+ @Test
+ public void invalidFile() {
+ try {
+ new SketchServerConfig("fileDoesNotExist");
+ fail();
+ } catch (final IOException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void parseTestConfig() {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ final SketchServerConfig serverConf =
+ new SketchServerConfig(Objects.requireNonNull(classLoader.getResource("config_with_port.json")).getFile());
+ assertEquals(serverConf.getSketchList().size(), 2);
+ assertEquals(serverConf.getPort(), 8080);
+ } catch (final IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void parseJsonArrayConfig() {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ final SketchServerConfig serverConf =
+ new SketchServerConfig(Objects.requireNonNull(classLoader.getResource("array_config.json")).getFile());
+ assertEquals(serverConf.getSketchList().size(), 2);
+ assertEquals(serverConf.getPort(), DEFAULT_PORT);
+ } catch (final IOException e) {
+ fail();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/datasketches/server/SketchServerTest.java b/src/test/java/org/apache/datasketches/server/SketchServerTest.java
new file mode 100644
index 0000000..4a2a15d
--- /dev/null
+++ b/src/test/java/org/apache/datasketches/server/SketchServerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.server;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.testng.annotations.Test;
+
+public class SketchServerTest {
+ @Test
+ public void createServer() {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ SketchServer server = null;
+ try {
+ server = new SketchServer(Objects.requireNonNull(classLoader.getResource("config_with_port.json")).getFile());
+ } catch (final IOException e) {
+ fail();
+ }
+
+ // check that port and URI are invalid before starting the server
+ assertNotNull(server);
+ assertFalse(server.isRunning());
+ assertNull(server.getURI());
+ assertEquals(server.getPort(), -1);
+ try {
+ server.start();
+ assertTrue(server.isRunning());
+
+ // add the few tests in the try block for code simplicity
+ assertEquals(server.getPort(), 8080);
+ // initial testing suggests it's just using the host's IP address so just checking that the port
+ // is working correctly
+ assertTrue(server.getURI().endsWith(":" + server.getPort() + "/"));
+
+ server.stop();
+ assertFalse(server.isRunning());
+ } catch (final Exception e) {
+ fail();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/datasketches/server/SketchStorageTest.java b/src/test/java/org/apache/datasketches/server/SketchStorageTest.java
new file mode 100644
index 0000000..03f4b5c
--- /dev/null
+++ b/src/test/java/org/apache/datasketches/server/SketchStorageTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.server;
+
+import static org.apache.datasketches.server.SketchConstants.RESPONSE_SKETCH_COUNT_FIELD;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.datasketches.Family;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.hll.HllSketch;
+import org.testng.annotations.Test;
+
+
+import com.google.gson.JsonObject;
+
+public class SketchStorageTest {
+
+ @Test
+ public void invalidSketchEntry() {
+ try {
+ new SketchStorage.SketchEntry(Family.CPC, null, new CpcSketch(12), "cpcSketch", 12);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ new SketchStorage.SketchEntry(Family.HLL, new HllSketch(10), "hllSketch", 10);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void loadSketches() {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ SketchServerConfig serverConfig = null;
+ try {
+ serverConfig = new SketchServerConfig(Objects.requireNonNull(classLoader.getResource("test_config.json")).getFile());
+ } catch (final IOException e) {
+ fail();
+ }
+ assertNotNull(serverConfig);
+
+ final SketchStorage storage = new SketchStorage(serverConfig.getSketchList());
+ final JsonObject sketches = storage.listSketches();
+ assertTrue(sketches.has(RESPONSE_SKETCH_COUNT_FIELD));
+ assertEquals(sketches.get(RESPONSE_SKETCH_COUNT_FIELD).getAsInt(), 15);
+ assertTrue(storage.contains("cpcOfNumbers"));
+ }
+}
diff --git a/src/test/java/org/apache/datasketches/server/UpdateHandlerTest.java b/src/test/java/org/apache/datasketches/server/UpdateHandlerTest.java
new file mode 100644
index 0000000..a77369b
--- /dev/null
+++ b/src/test/java/org/apache/datasketches/server/UpdateHandlerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.server;
+
+import static org.apache.datasketches.server.SketchConstants.QUERY_PAIR_ITEM_FIELD;
+import static org.apache.datasketches.server.SketchConstants.QUERY_PAIR_WEIGHT_FIELD;
+import static org.apache.datasketches.server.SketchConstants.UPDATE_PATH;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.datasketches.cpc.CpcSketch;
+import org.testng.annotations.Test;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+
+public class UpdateHandlerTest extends ServerTestBase {
+ /* The tests here are going to be structured very similarly. It might be possible
+ * to find a common framework and reduce the amount o repetition? But not clear with
+ * type erasure as opposed to C++-style templates.
+ */
+
+ @Test
+ public void cpcUpdate() {
+ final JsonObject response = new JsonObject();
+ final String sketchName = "cpcOfNumbers";
+ final int nPoints = 1000;
+
+ // testing using both GET and POST
+ int status;
+
+ JsonObject request = new JsonObject();
+ JsonArray data = new JsonArray();
+ for (int i = 0; i < nPoints; ++i)
+ data.add(i);
+ request.add(sketchName, data);
+ status = postData(UPDATE_PATH, request, response);
+ assertEquals(status, HttpServletResponse.SC_OK);
+
+ request = new JsonObject();
+ data = new JsonArray();
+ for (int i = nPoints; i < 2 * nPoints; ++i)
+ data.add(i);
+ request.add(sketchName, data);
+ status = getData(UPDATE_PATH, request, response);
+ assertEquals(status, HttpServletResponse.SC_OK);
+
+ final JsonElement element = response.get(RESPONSE_FIELD);
+ assertTrue(element.isJsonNull());
+
+ final SketchStorage.SketchEntry entry = server_.getSketch(sketchName);
+ final CpcSketch sk = (CpcSketch) entry.sketch_;
+ assertEquals(sk.getEstimate(), 2 * nPoints, 2 * nPoints * 1e-2);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void fiUpdate() {
+ final JsonObject response = new JsonObject();
+ final String sketchName = "topItems";
+
+ // single item
+ JsonObject request = new JsonObject();
+ request.addProperty(sketchName, "item1");
+ assertEquals(postData(UPDATE_PATH, request, response), HttpServletResponse.SC_OK);
+
+ // item with weight
+ request = new JsonObject();
+ JsonObject data = new JsonObject();
+ data.addProperty(QUERY_PAIR_ITEM_FIELD, "item2");
+ data.addProperty(QUERY_PAIR_WEIGHT_FIELD, 5);
+ request.add(sketchName, data);
+ assertEquals(postData(UPDATE_PATH, request, response), HttpServletResponse.SC_OK);
+
+ // array of items with and without weights
+ request = new JsonObject();
+ final JsonArray dataArray = new JsonArray();
+ dataArray.add("item1"); // increases count to 2
+ data = new JsonObject();
+ data.addProperty(QUERY_PAIR_ITEM_FIELD, "item3");
+ data.addProperty(QUERY_PAIR_WEIGHT_FIELD, 10);
+ dataArray.add(data);
+ request.add(sketchName, dataArray);
+ assertEquals(postData(UPDATE_PATH, request, response), HttpServletResponse.SC_OK);
+
+ final JsonElement element = response.get(RESPONSE_FIELD);
+ assertTrue(element.isJsonNull());
+
+ final SketchStorage.SketchEntry entry = server_.getSketch(sketchName);
+ final org.apache.datasketches.frequencies.ItemsSketch<String> sk = (org.apache.datasketches.frequencies.ItemsSketch<String>) entry.sketch_;
+ assertEquals(sk.getEstimate("item1"), 2);
+ assertEquals(sk.getEstimate("item2"), 5);
+ assertEquals(sk.getEstimate("item3"), 10);
+ }
+
+ @Test
+ public void hllUpdate() {
+ // update multiple sketches from an array
+ }
+
+ @Test
+ public void kllUpdate() {
+
+ }
+
+ @Test
+ public void thetaUpdate() {
+
+ }
+
+ @Test
+ public void reservoirUpdate() {
+
+ }
+
+ @Test
+ public void voUpdate() {
+
+ }
+
+}
diff --git a/src/test/resources/array_config.json b/src/test/resources/array_config.json
new file mode 100644
index 0000000..7c718ee
--- /dev/null
+++ b/src/test/resources/array_config.json
@@ -0,0 +1,12 @@
+[
+ { "name": "cpcOfNumbers",
+ "k": 12,
+ "type": "long",
+ "family": "cpc"
+ },
+ { "name": "cpcOfStrings",
+ "k": 14,
+ "type": "string",
+ "family": "cpc"
+ }
+]
\ No newline at end of file
diff --git a/src/test/resources/config_with_port.json b/src/test/resources/config_with_port.json
new file mode 100644
index 0000000..f148049
--- /dev/null
+++ b/src/test/resources/config_with_port.json
@@ -0,0 +1,15 @@
+{
+ "port": 8080,
+ "sketches_A": [
+ { "name": "cpcOfNumbers",
+ "k": 12,
+ "type": "long",
+ "family": "cpc"
+ },
+ { "name": "cpcOfStrings",
+ "k": 14,
+ "type": "string",
+ "family": "cpc"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/example/example_config.json b/src/test/resources/test_config.json
similarity index 64%
rename from example/example_config.json
rename to src/test/resources/test_config.json
index 2c0b24c..a685c17 100644
--- a/example/example_config.json
+++ b/src/test/resources/test_config.json
@@ -1,20 +1,20 @@
{
- "port": 8080,
- "bookstoreSketches": [
- { "name": "cpc_int",
+ "port": 0,
+ "sketches_A": [
+ { "name": "cpcOfNumbers",
"k": 12,
- "type": "int",
+ "type": "long",
"family": "cpc"
},
- { "name": "cpc_string",
- "k": 12,
+ { "name": "cpcOfStrings",
+ "k": 14,
"type": "string",
"family": "cpc"
- }
+ }
],
- "bookGenreDurationSet": {
+ "set1": {
"family": "hll",
- "type": "double",
+ "type": "string",
"k": 14,
"names": [
"hll1",
@@ -26,7 +26,7 @@
"set2": {
"k": 12,
"family": "theta",
- "type": "float",
+ "type": "int",
"names": [
"theta0",
"theta1",
@@ -36,16 +36,11 @@
]
},
"sketches_B": [
- { "name": "kll",
+ { "name": "duration",
"k": "160",
"family": "kll"
},
- { "name": "cpc",
- "k": "14",
- "type": "string",
- "family": "cpc"
- },
- { "name": "fi",
+ { "name": "topItems",
"k": "128",
"family": "frequency"
},
@@ -59,4 +54,3 @@
}
]
}
-