// blob: 225fd2301dcf030d7d714c699efbb0bd4ee7ea1b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.state.spillable;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.google.common.collect.Lists;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.util.KryoCloneUtils;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
 * Exercises {@link SpillableArrayListImpl} through add / addAll / get / set across several
 * streaming windows, verifying both the values returned by the list and the values that end
 * up in the backing {@link SpillableStateStore}.
 *
 * <p>Each scenario is run against a fresh {@link InMemSpillableStateStore} and against the store
 * provided by the {@link SpillableTestUtils.TestMeta} rule (the "ManagedState" test variants).
 */
public class SpillableArrayListImplTest
{
  public static final byte[] ID1 = new byte[]{(byte)0};
  public static final byte[] ID2 = new byte[]{(byte)1};

  @Rule
  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();

  /** Runs the single-element-per-key scenario against an in-memory store. */
  @Test
  public void simpleAddGetAndSetTest1()
  {
    InMemSpillableStateStore store = new InMemSpillableStateStore();
    simpleAddGetAndSetTest1Helper(store);
  }

  /** Runs the single-element-per-key scenario against the store supplied by the test rule. */
  @Test
  public void simpleAddGetAndSetManagedStateTest1()
  {
    simpleAddGetAndSetTest1Helper(testMeta.store);
  }

  /**
   * Adds, reads and overwrites elements of a list constructed with a final argument of 1
   * (presumably the batch size: the stored values are verified one element per index).
   *
   * @param store the state store backing the list under test
   */
  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
  {
    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
        new StringSerde(), 1);

    store.setup(testMeta.operatorContext);
    list.setup(testMeta.operatorContext);

    // Window 0: populate the list and check bounds as it grows.
    long windowId = 0L;
    startWindow(store, windowId, list);

    SpillableTestUtils.checkOutOfBounds(list, 0);
    Assert.assertEquals(0, list.size());

    list.add("a");

    SpillableTestUtils.checkOutOfBounds(list, 1);
    Assert.assertEquals(1, list.size());
    Assert.assertEquals("a", list.get(0));

    list.addAll(Lists.newArrayList("a", "b", "c"));

    Assert.assertEquals(4, list.size());
    assertElements(list, 0, "a", "a", "b", "c");
    SpillableTestUtils.checkOutOfBounds(list, 4);

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    // Window 1: the previous window's data is in the store; append and overwrite.
    windowId++;
    startWindow(store, windowId, list);

    checkSingletonBatches(store, ID1, "a", "a", "b", "c");

    Assert.assertEquals(4, list.size());
    assertElements(list, 0, "a", "a", "b", "c");

    list.add("tt");
    list.add("ab");
    list.add("99");
    list.add("oo");

    assertElements(list, 4, "tt", "ab", "99", "oo");

    list.set(1, "111");

    assertElements(list, 0, "a", "111", "b", "c", "tt", "ab", "99", "oo");

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    // Window 2: everything written in window 1 must now be visible in the store.
    windowId++;
    startWindow(store, windowId, list);

    checkSingletonBatches(store, ID1, "a", "111", "b", "c", "tt", "ab", "99", "oo");

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    list.teardown();
    store.teardown();
  }

  /** Runs the three-elements-per-key scenario against an in-memory store. */
  @Test
  public void simpleAddGetAndSetTest3()
  {
    InMemSpillableStateStore store = new InMemSpillableStateStore();
    simpleAddGetAndSetTest3Helper(store);
  }

  /** Runs the three-elements-per-key scenario against the store supplied by the test rule. */
  @Test
  public void simpleAddGetAndSetManagedStateTest3()
  {
    simpleAddGetAndSetTest3Helper(testMeta.store);
  }

  /**
   * Adds, reads and overwrites elements of a list constructed with a final argument of 3
   * (presumably the batch size: the stored values are verified in groups of up to three).
   *
   * @param store the state store backing the list under test
   */
  private void simpleAddGetAndSetTest3Helper(SpillableStateStore store)
  {
    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
        new StringSerde(), 3);

    store.setup(testMeta.operatorContext);
    list.setup(testMeta.operatorContext);

    // Window 0: populate the list.
    long windowId = 0L;
    startWindow(store, windowId, list);

    SpillableTestUtils.checkOutOfBounds(list, 0);
    Assert.assertEquals(0, list.size());

    list.add("a");

    SpillableTestUtils.checkOutOfBounds(list, 1);
    Assert.assertEquals(1, list.size());
    Assert.assertEquals("a", list.get(0));

    list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));

    Assert.assertEquals(8, list.size());
    assertElements(list, 0, "a", "a", "b", "c", "d", "e", "f", "g");

    // Probe the exact boundary (index == size) as the other scenarios do, in addition to an
    // index far beyond the end of the list.
    SpillableTestUtils.checkOutOfBounds(list, 8);
    SpillableTestUtils.checkOutOfBounds(list, 20);

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    // Window 1: verify the stored groups, then append four more elements.
    windowId++;
    startWindow(store, windowId, list);

    checkBatch(store, ID1, 0, "a", "a", "b");
    checkBatch(store, ID1, 1, "c", "d", "e");
    checkBatch(store, ID1, 2, "f", "g");

    Assert.assertEquals(8, list.size());
    assertElements(list, 0, "a", "a", "b", "c", "d", "e", "f", "g");

    list.add("tt");
    list.add("ab");
    list.add("99");
    list.add("oo");

    assertElements(list, 8, "tt", "ab", "99", "oo");

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    // Window 2: verify the appended elements were stored, then overwrite several positions.
    windowId++;
    startWindow(store, windowId, list);

    checkBatch(store, ID1, 0, "a", "a", "b");
    checkBatch(store, ID1, 1, "c", "d", "e");
    checkBatch(store, ID1, 2, "f", "g", "tt");
    checkBatch(store, ID1, 3, "ab", "99", "oo");

    list.set(1, "111");
    list.set(3, "222");
    list.set(5, "333");
    list.set(11, "444");

    assertElements(list, 0,
        "a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444");

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    // Window 3: the overwritten values must now be visible in the store.
    windowId++;
    startWindow(store, windowId, list);

    checkBatch(store, ID1, 0, "a", "111", "b");
    checkBatch(store, ID1, 1, "222", "d", "333");
    checkBatch(store, ID1, 2, "f", "g", "tt");
    checkBatch(store, ID1, 3, "ab", "99", "444");

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    list.teardown();
    store.teardown();
  }

  /** Runs the two-lists-one-store scenario against an in-memory store. */
  @Test
  public void simpleMultiListTest()
  {
    InMemSpillableStateStore store = new InMemSpillableStateStore();
    simpleMultiListTestHelper(store);
  }

  /** Runs the two-lists-one-store scenario against the store supplied by the test rule. */
  @Test
  public void simpleMultiListManagedStateTest()
  {
    simpleMultiListTestHelper(testMeta.store);
  }

  /**
   * Drives two lists that share one store but use different identifiers ({@link #ID1} and
   * {@link #ID2}) and checks that their contents stay independent.
   *
   * @param store the state store backing both lists
   */
  public void simpleMultiListTestHelper(SpillableStateStore store)
  {
    SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store,
        new StringSerde(), 1);
    SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store,
        new StringSerde(), 1);

    store.setup(testMeta.operatorContext);
    list1.setup(testMeta.operatorContext);
    list2.setup(testMeta.operatorContext);

    // Window 0: populate both lists.
    long windowId = 0L;
    startWindow(store, windowId, list1, list2);

    SpillableTestUtils.checkOutOfBounds(list1, 0);
    Assert.assertEquals(0, list1.size());

    list1.add("a");

    SpillableTestUtils.checkOutOfBounds(list2, 0);

    list2.add("2a");

    SpillableTestUtils.checkOutOfBounds(list1, 1);
    SpillableTestUtils.checkOutOfBounds(list2, 1);

    Assert.assertEquals(1, list1.size());
    Assert.assertEquals(1, list2.size());

    Assert.assertEquals("a", list1.get(0));
    Assert.assertEquals("2a", list2.get(0));

    list1.addAll(Lists.newArrayList("a", "b", "c"));
    list2.addAll(Lists.newArrayList("2a", "2b"));

    Assert.assertEquals(4, list1.size());
    Assert.assertEquals(3, list2.size());

    assertElements(list1, 0, "a", "a", "b", "c");
    assertElements(list2, 0, "2a", "2a", "2b");

    SpillableTestUtils.checkOutOfBounds(list1, 4);
    SpillableTestUtils.checkOutOfBounds(list2, 3);

    finishWindow(store, list1, list2);
    checkpointAndCommit(store, windowId);

    // Stored values are inspected here between windows, before the next beginWindow.
    checkSingletonBatches(store, ID1, "a", "a", "b", "c");
    checkSingletonBatches(store, ID2, "2a", "2a", "2b");

    // Window 1: append to and overwrite both lists.
    windowId++;
    startWindow(store, windowId, list1, list2);

    Assert.assertEquals(4, list1.size());
    Assert.assertEquals(3, list2.size());

    assertElements(list1, 0, "a", "a", "b", "c");
    assertElements(list2, 0, "2a", "2a", "2b");

    list1.add("tt");
    list1.add("ab");
    list1.add("99");
    list1.add("oo");

    list2.add("2tt");
    list2.add("2ab");

    assertElements(list1, 4, "tt", "ab", "99", "oo");
    assertElements(list2, 3, "2tt", "2ab");

    list1.set(1, "111");
    list2.set(1, "2111");

    assertElements(list1, 0, "a", "111", "b", "c", "tt", "ab", "99", "oo");
    assertElements(list2, 0, "2a", "2111", "2b", "2tt", "2ab");

    finishWindow(store, list1, list2);
    checkpointAndCommit(store, windowId);

    // Window 2: everything written in window 1 must now be visible in the store.
    windowId++;
    startWindow(store, windowId, list1, list2);

    checkSingletonBatches(store, ID1, "a", "111", "b", "c", "tt", "ab", "99", "oo");
    checkSingletonBatches(store, ID2, "2a", "2111", "2b", "2tt", "2ab");

    finishWindow(store, list1, list2);
    checkpointAndCommit(store, windowId);

    list1.teardown();
    list2.teardown();
    store.teardown();
  }

  /**
   * Clones the list (via Kryo) at a checkpoint, keeps mutating the original afterwards, then sets
   * up the clone with the checkpoint window as its activation window and verifies that it sees
   * only the data written up to that checkpoint.
   */
  @Test
  public void recoveryManagedStateTest()
  {
    SpillableStateStore store = testMeta.store;

    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
        new StringSerde(), 3);

    store.setup(testMeta.operatorContext);
    list.setup(testMeta.operatorContext);

    // Window 0: initial eight elements; no checkpoint after this window.
    long windowId = 0L;
    startWindow(store, windowId, list);

    SpillableTestUtils.checkOutOfBounds(list, 0);
    list.add("a");
    list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
    Assert.assertEquals(8, list.size());

    finishWindow(store, list);

    // Window 1: four more elements, followed by a checkpoint.
    windowId++;
    startWindow(store, windowId, list);

    list.add("tt");
    list.add("ab");
    list.add("99");
    list.add("oo");

    finishWindow(store, list);
    checkpointAndCommit(store, windowId);

    // Window 2: overwrite several positions; no checkpoint after this window.
    windowId++;
    startWindow(store, windowId, list);

    list.set(1, "111");
    list.set(3, "222");
    list.set(5, "333");
    list.set(11, "444");

    finishWindow(store, list);

    // Window 3: empty window; the list is cloned between beforeCheckpoint and checkpointed.
    windowId++;
    startWindow(store, windowId, list);
    finishWindow(store, list);

    store.beforeCheckpoint(windowId);
    long activationWindow = windowId;
    SpillableArrayListImpl<String> clonedList = KryoCloneUtils.cloneObject(list);
    store.checkpointed(windowId);
    store.committed(windowId);

    // Window 4: changes made after the clone was taken; the assertions below expect the
    // recovered list not to contain them.
    windowId++;
    startWindow(store, windowId, list);

    list.set(1, "111111");
    list.set(3, "222222");
    list.add("xyz");

    finishWindow(store, list);

    list.teardown();
    store.teardown();

    // Recovery: set up the clone and its store with the checkpoint window as activation window.
    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
    OperatorContext context = mockOperatorContext(testMeta.operatorContext.getId(), attributes);

    list = clonedList;
    store = clonedList.getStore();

    store.setup(context);
    list.setup(context);

    windowId = activationWindow + 1L;
    startWindow(store, windowId, list);

    assertElements(list, 0,
        "a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444");
    Assert.assertEquals(12, list.size());

    finishWindow(store, list);

    list.teardown();
    store.teardown();
  }

  /** Begins {@code windowId} on the store first and then on every list, in the order given. */
  private static void startWindow(SpillableStateStore store, long windowId,
      SpillableArrayListImpl<?>... lists)
  {
    store.beginWindow(windowId);
    for (SpillableArrayListImpl<?> list : lists) {
      list.beginWindow(windowId);
    }
  }

  /** Ends the current window on every list, in the order given, and then on the store. */
  private static void finishWindow(SpillableStateStore store, SpillableArrayListImpl<?>... lists)
  {
    for (SpillableArrayListImpl<?> list : lists) {
      list.endWindow();
    }
    store.endWindow();
  }

  /** Calls beforeCheckpoint, checkpointed and committed on the store for {@code windowId}. */
  private static void checkpointAndCommit(SpillableStateStore store, long windowId)
  {
    store.beforeCheckpoint(windowId);
    store.checkpointed(windowId);
    store.committed(windowId);
  }

  /**
   * Asserts that {@code list.get(fromIndex + i)} equals {@code expected[i]} for every expected
   * value, reading the indices in ascending order.
   */
  private static void assertElements(SpillableArrayListImpl<String> list, int fromIndex,
      String... expected)
  {
    for (int offset = 0; offset < expected.length; offset++) {
      Assert.assertEquals(expected[offset], list.get(fromIndex + offset));
    }
  }

  /**
   * Checks, through {@link SpillableTestUtils#checkValue}, the value stored in bucket 0 for
   * {@code prefix} at {@code index} against the given expected elements.
   */
  private static void checkBatch(SpillableStateStore store, byte[] prefix, int index,
      String... expected)
  {
    SpillableTestUtils.checkValue(store, 0L, prefix, index, Lists.newArrayList(expected));
  }

  /**
   * Checks that index {@code i} under {@code prefix} holds a one-element list containing
   * {@code expected[i]}, for every expected value in ascending index order.
   */
  private static void checkSingletonBatches(SpillableStateStore store, byte[] prefix,
      String... expected)
  {
    for (int index = 0; index < expected.length; index++) {
      checkBatch(store, prefix, index, expected[index]);
    }
  }
}