// blob: 86ccb923e8bcaa304d14708affd2a751574e3c18 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.appdata.snapshot;
import java.util.List;
import java.util.Map;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KryoCloneUtils;
/**
 * Unit test for {@link AppDataSnapshotServerMap}.
 *
 * <p>The test pushes a small snapshot (a list of word/count maps) into the operator, issues a
 * data query for both fields, and verifies the JSON message collected from the query result port.
 */
public class AppDataSnapshotServerMapTest
{
  private static final Logger LOG = LoggerFactory.getLogger(AppDataSnapshotServerMapTest.class);

  /**
   * A sample result message carrying the same id, type and countdown as {@link #SIMPLE_QUERY}.
   * Not referenced within this class.
   */
  public static final String SIMPLE_RESULT = "{\"id\":\"1\"," +
      "\"type\":\"dataQuery\",\"data\":[{\"count\":\"2\",\"word\":\"a\"},{\"count\":\"3\"," +
      "\"word\":\"b\"}],\"countdown\":10}";

  /** Data query requesting the {@code word} and {@code count} fields with a countdown of 10. */
  public static final String SIMPLE_QUERY = "{\"id\": \"1\",\n"
      + "\"type\": \"dataQuery\",\n"
      + "\"data\": {\n"
      + "\"fields\": [ \"word\", \"count\" ]\n"
      + "},\n"
      + "\"countdown\":10\n"
      + "}";

  /** Snapshot schema declaring a string {@code word} value and an integer {@code count} value. */
  public static final String SIMPLE_SCHEMA = "{\n"
      + " \"values\": [{\"name\": \"word\", \"type\": \"string\"},\n"
      + " {\"name\": \"count\", \"type\": \"integer\"}]\n"
      + "}";

  /**
   * Feeds two rows to the operator in one window, queries them in the next window, and checks
   * that exactly one result is emitted whose countdown and row contents match the input.
   *
   * @throws Exception if the emitted result is not parseable JSON or the operator fails
   */
  @Test
  public void simpleTest() throws Exception
  {
    AppDataSnapshotServerMap snapshotServer = new AppDataSnapshotServerMap();
    snapshotServer.setSnapshotSchemaJSON(SIMPLE_SCHEMA);

    // Input data: two rows in the order the assertions below expect them back.
    List<Map<String, Object>> dataList = Lists.newArrayList();
    dataList.add(newRow("a", 2));
    dataList.add(newRow("b", 3));

    // Collect as Object so the sink can be attached without a raw-type cast.
    CollectorTestSink<Object> resultSink = new CollectorTestSink<Object>();
    snapshotServer.queryResult.setSink(resultSink);

    snapshotServer.setup(null);

    // Window 0 delivers the snapshot; window 1 delivers the query against it.
    snapshotServer.beginWindow(0L);
    snapshotServer.input.put(dataList);
    snapshotServer.endWindow();

    snapshotServer.beginWindow(1L);
    snapshotServer.query.put(SIMPLE_QUERY);
    snapshotServer.endWindow();

    // Check the tuple count before indexing, so an empty sink fails with this message
    // rather than an IndexOutOfBoundsException.
    Assert.assertEquals("Should get only 1 result back", 1, resultSink.collectedTuples.size());

    String result = (String)resultSink.collectedTuples.get(0);
    JSONObject resultJson = new JSONObject(result);

    Assert.assertEquals("Countdown incorrect", 10, resultJson.getInt("countdown"));
    Assert.assertEquals("Num results", 2, resultJson.getJSONArray("data").length());

    JSONObject firstRow = resultJson.getJSONArray("data").getJSONObject(0);
    JSONObject secondRow = resultJson.getJSONArray("data").getJSONObject(1);

    Assert.assertEquals(2, firstRow.getInt("count"));
    Assert.assertEquals("a", firstRow.getString("word"));
    Assert.assertEquals(3, secondRow.getInt("count"));
    Assert.assertEquals("b", secondRow.getString("word"));

    // Test serialization: the call is made only to exercise cloning of the operator;
    // the returned copy is not inspected.
    KryoCloneUtils.cloneObject(snapshotServer);
  }

  /**
   * Builds one input row for the snapshot.
   *
   * @param word value stored under the {@code "word"} key
   * @param count value stored under the {@code "count"} key
   * @return a new mutable map holding both entries
   */
  private static Map<String, Object> newRow(String word, int count)
  {
    Map<String, Object> row = Maps.newHashMap();
    row.put("word", word);
    row.put("count", count);
    return row;
  }
}