blob: 20ae203b9af27f984788b2a3f46367af50f45a9d [file] [log] [blame]
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.select;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.ObjectArrays;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.lookup.LookupExtractionFn;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
@RunWith(Parameterized.class)
public class SelectQueryRunnerTest
{
// copied from druid.sample.tsv
public static final String[] V_0112 = {
"2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
"2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000",
"2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000",
"2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000",
"2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000",
"2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
"2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000",
"2011-01-12T00:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value",
"2011-01-12T00:00:00.000Z upfront premium preferred ppreferred 800.000000 value"
};
public static final String[] V_0113 = {
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403",
"2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
"2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683",
"2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011",
"2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672",
"2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928",
"2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
"2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875",
"2011-01-13T00:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value",
"2011-01-13T00:00:00.000Z upfront premium preferred ppreferred 1564.617729 value"
};
public static final QuerySegmentSpec I_0112_0114 = new LegacySegmentSpec(
new Interval("2011-01-12/2011-01-14")
);
public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);
private static final SelectQueryQueryToolChest toolChest = new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
@Parameterized.Parameters(name = "{0}:descending={1}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(
QueryRunnerTestHelper.makeQueryRunners(
new SelectQueryRunnerFactory(
toolChest,
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
),
Arrays.asList(false, true)
);
}
private final QueryRunner runner;
private final boolean descending;
public SelectQueryRunnerTest(QueryRunner runner, boolean descending)
{
this.runner = runner;
this.descending = descending;
}
private Druids.SelectQueryBuilder newTestQuery() {
return Druids.newSelectQueryBuilder()
.dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
.dimensionSpecs(DefaultDimensionSpec.toSpec(Arrays.<String>asList()))
.metrics(Arrays.<String>asList())
.intervals(QueryRunnerTestHelper.fullOnInterval)
.granularity(QueryRunnerTestHelper.allGran)
.pagingSpec(PagingSpec.newSpec(3))
.descending(descending);
}
@Test
public void testFullOnSelect()
{
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, context),
Lists.<Result<SelectResultValue>>newArrayList()
);
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
toEvents(new String[]{EventHolder.timestampKey + ":TIME"}, V_0112_0114),
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
}
@Test
public void testSequentialPaging()
{
int[] asc = {2, 5, 8, 11, 14, 17, 20, 23, 25};
int[] dsc = {-3, -6, -9, -12, -15, -18, -21, -24, -26};
int[] expected = descending ? dsc : asc;
SelectQuery query = newTestQuery().intervals(I_0112_0114).build();
for (int offset : expected) {
List<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, ImmutableMap.of()),
Lists.<Result<SelectResultValue>>newArrayList()
);
Assert.assertEquals(1, results.size());
SelectResultValue result = results.get(0).getValue();
Map<String, Integer> pagingIdentifiers = result.getPagingIdentifiers();
Assert.assertEquals(offset, pagingIdentifiers.get(QueryRunnerTestHelper.segmentId).intValue());
Map<String, Integer> next = PagingSpec.next(pagingIdentifiers, descending);
query = query.withPagingSpec(new PagingSpec(next, 3));
}
query = newTestQuery().intervals(I_0112_0114).build();
for (int offset : expected) {
List<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, ImmutableMap.of()),
Lists.<Result<SelectResultValue>>newArrayList()
);
Assert.assertEquals(1, results.size());
SelectResultValue result = results.get(0).getValue();
Map<String, Integer> pagingIdentifiers = result.getPagingIdentifiers();
Assert.assertEquals(offset, pagingIdentifiers.get(QueryRunnerTestHelper.segmentId).intValue());
// use identifier as-is but with fromNext=true
query = query.withPagingSpec(new PagingSpec(pagingIdentifiers, 3, true));
}
}
@Test
public void testFullOnSelectWithDimensionSpec()
{
Map<String, String> map = new HashMap<>();
map.put("automotive", "automotive0");
map.put("business", "business0");
map.put("entertainment", "entertainment0");
map.put("health", "health0");
map.put("mezzanine", "mezzanine0");
map.put("news", "news0");
map.put("premium", "premium0");
map.put("technology", "technology0");
map.put("travel", "travel0");
SelectQuery query = newTestQuery()
.dimensionSpecs(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(QueryRunnerTestHelper.marketDimension, "mar"),
new ExtractionDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
"qual",
new LookupExtractionFn(new MapLookupExtractor(map, true), false, null, true, false)
),
new DefaultDimensionSpec(QueryRunnerTestHelper.placementDimension, "place")
)
)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, context),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResultsAsc = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
0,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put("mar", "spot")
.put("qual", "automotive0")
.put("place", "preferred")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
1,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put("mar", "spot")
.put("qual", "business0")
.put("place", "preferred")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
2,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put("mar", "spot")
.put("qual", "entertainment0")
.put("place", "preferred")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
List<Result<SelectResultValue>> expectedResultsDsc = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, -3),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
-1,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-04-15T00:00:00.000Z"))
.put("mar", "upfront")
.put("qual", "premium0")
.put("place", "preferred")
.put(QueryRunnerTestHelper.indexMetric, 780.27197265625F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
-2,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-04-15T00:00:00.000Z"))
.put("mar", "upfront")
.put("qual", "mezzanine0")
.put("place", "preferred")
.put(QueryRunnerTestHelper.indexMetric, 962.731201171875F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
-3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-04-15T00:00:00.000Z"))
.put("mar", "total_market")
.put("qual", "premium0")
.put("place", "preferred")
.put(QueryRunnerTestHelper.indexMetric, 1029.0570068359375F)
.build()
)
)
)
)
);
verify(descending ? expectedResultsDsc : expectedResultsAsc, results);
}
@Test
public void testSelectWithDimsAndMets()
{
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.marketDimension))
.metrics(Arrays.asList(QueryRunnerTestHelper.indexMetric))
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, context),
Lists.<Result<SelectResultValue>>newArrayList()
);
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
toEvents(
new String[]{
EventHolder.timestampKey + ":TIME",
QueryRunnerTestHelper.marketDimension + ":STRING",
null,
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
V_0112_0114
),
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
}
@Test
public void testSelectPagination()
{
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.qualityDimension))
.metrics(Arrays.asList(QueryRunnerTestHelper.indexMetric))
.pagingSpec(new PagingSpec(toPagingIdentifier(3, descending), 3))
.build();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<Result<SelectResultValue>>newArrayList()
);
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
toEvents(
new String[]{
EventHolder.timestampKey + ":TIME",
"foo:NULL",
"foo2:NULL"
},
V_0112_0114
),
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
}
@Test
public void testFullOnSelectWithFilter()
{
// startDelta + threshold pairs
for (int[] param : new int[][]{{3, 3}, {0, 1}, {5, 5}, {2, 7}, {3, 0}}) {
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.granularity(QueryRunnerTestHelper.dayGran)
.dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.qualityDimension))
.metrics(Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric))
.pagingSpec(new PagingSpec(toPagingIdentifier(param[0], descending), param[1]))
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, context),
Lists.<Result<SelectResultValue>>newArrayList()
);
final List<List<Map<String, Object>>> events = toEvents(
new String[]{
EventHolder.timestampKey + ":TIME",
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
// filtered values with day granularity
new String[]{
"2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
"2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000",
"2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000",
"2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000",
"2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000"
},
new String[]{
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403",
"2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
"2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683",
"2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011",
"2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672",
"2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928"
}
);
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
events,
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
}
}
@Test
public void testSelectWithFilterLookupExtractionFn () {
Map<String, String> extractionMap = new HashMap<>();
extractionMap.put("total_market","replaced");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "replaced", lookupExtractionFn))
.granularity(QueryRunnerTestHelper.dayGran)
.dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.qualityDimension))
.metrics(Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric))
.build();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<Result<SelectResultValue>>newArrayList()
);
Iterable<Result<SelectResultValue>> resultsOptimize = Sequences.toList(
toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))).
run(query, Maps.<String, Object>newHashMap()), Lists.<Result<SelectResultValue>>newArrayList()
);
final List<List<Map<String, Object>>> events = toEvents(
new String[]{
EventHolder.timestampKey + ":TIME",
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
// filtered values with day granularity
new String[]{
"2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
"2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000"
},
new String[]{
"2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
"2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875"
}
);
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
events,
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
verify(expectedResults, resultsOptimize);
}
@Test
public void testFullSelectNoResults()
{
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(
new AndDimFilter(
Arrays.<DimFilter>asList(
new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null),
new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "foo", null)
)
)
)
.build();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Lists.<EventHolder>newArrayList()
)
)
);
verify(expectedResults, results);
}
@Test
public void testFullSelectNoDimensionAndMetric()
{
SelectQuery query = newTestQuery()
.intervals(I_0112_0114)
.dimensionSpecs(DefaultDimensionSpec.toSpec("foo"))
.metrics(Lists.<String>newArrayList("foo2"))
.build();
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<Result<SelectResultValue>>newArrayList()
);
final List<List<Map<String, Object>>> events = toEvents(
new String[]{
EventHolder.timestampKey + ":TIME",
"foo:NULL",
"foo2:NULL"
},
V_0112_0114
);
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
events,
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
}
private Map<String, Integer> toPagingIdentifier(int startDelta, boolean descending)
{
return ImmutableMap.of(
QueryRunnerTestHelper.segmentId,
PagingOffset.toOffset(startDelta, descending)
);
}
private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, final String[]... valueSet)
{
List<List<Map<String, Object>>> events = Lists.newArrayList();
for (String[] values : valueSet) {
events.add(
Lists.newArrayList(
Iterables.transform(
Arrays.asList(values), new Function<String, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(String input)
{
Map<String, Object> event = Maps.newHashMap();
String[] values = input.split("\\t");
for (int i = 0; i < dimSpecs.length; i++) {
if (dimSpecs[i] == null || i >= dimSpecs.length) {
continue;
}
String[] specs = dimSpecs[i].split(":");
event.put(
specs[0],
specs[1].equals("TIME") ? new DateTime(values[i]) :
specs[1].equals("FLOAT") ? Float.valueOf(values[i]) :
specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) :
specs[1].equals("LONG") ? Long.valueOf(values[i]) :
specs[1].equals("NULL") ? null :
values[i]
);
}
return event;
}
}
)
)
);
}
return events;
}
private List<Result<SelectResultValue>> toExpected(
List<List<Map<String, Object>>> targets,
final int offset,
final int threshold
)
{
if (offset < 0) {
targets = Lists.reverse(targets);
}
List<Result<SelectResultValue>> expected = Lists.newArrayListWithExpectedSize(targets.size());
for (List<Map<String, Object>> group : targets) {
List<EventHolder> holders = Lists.newArrayListWithExpectedSize(threshold);
int newOffset = offset;
if (offset < 0) {
int start = group.size() + offset;
int end = Math.max(-1, start - threshold);
for (int i = start; i > end; i--) {
holders.add(new EventHolder(QueryRunnerTestHelper.segmentId, newOffset--, group.get(i)));
}
} else {
int end = Math.min(group.size(), offset + threshold);
for (int i = offset; i < end; i++) {
holders.add(new EventHolder(QueryRunnerTestHelper.segmentId, newOffset++, group.get(i)));
}
}
int lastOffset = holders.isEmpty() ? offset : holders.get(holders.size() - 1).getOffset();
expected.add(
new Result(
new DateTime(group.get(0).get(EventHolder.timestampKey)),
new SelectResultValue(ImmutableMap.of(QueryRunnerTestHelper.segmentId, lastOffset), holders)
)
);
}
return expected;
}
private static void verify(
Iterable<Result<SelectResultValue>> expectedResults,
Iterable<Result<SelectResultValue>> actualResults
)
{
Iterator<Result<SelectResultValue>> expectedIter = expectedResults.iterator();
Iterator<Result<SelectResultValue>> actualIter = actualResults.iterator();
while (expectedIter.hasNext()) {
Result<SelectResultValue> expected = expectedIter.next();
Result<SelectResultValue> actual = actualIter.next();
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
for (Map.Entry<String, Integer> entry : expected.getValue().getPagingIdentifiers().entrySet()) {
Assert.assertEquals(entry.getValue(), actual.getValue().getPagingIdentifiers().get(entry.getKey()));
}
Iterator<EventHolder> expectedEvts = expected.getValue().getEvents().iterator();
Iterator<EventHolder> actualEvts = actual.getValue().getEvents().iterator();
while (expectedEvts.hasNext()) {
EventHolder exHolder = expectedEvts.next();
EventHolder acHolder = actualEvts.next();
Assert.assertEquals(exHolder.getTimestamp(), acHolder.getTimestamp());
Assert.assertEquals(exHolder.getOffset(), acHolder.getOffset());
for (Map.Entry<String, Object> ex : exHolder.getEvent().entrySet()) {
Object actVal = acHolder.getEvent().get(ex.getKey());
// work around for current II limitations
if (acHolder.getEvent().get(ex.getKey()) instanceof Double) {
actVal = ((Double) actVal).floatValue();
}
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}
}
if (actualEvts.hasNext()) {
throw new ISE("This event iterator should be exhausted!");
}
}
if (actualIter.hasNext()) {
throw new ISE("This iterator should be exhausted!");
}
}
}