// Source blob: 0f614a8defbce8197a7c5fba7dc05548903bf152
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.wal;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.apache.apex.malhar.lib.util.TestUtils;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.Pair;
import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
 * Tests for {@link FSWindowDataManager}.
 *
 * <p>Every test gets its own application path under {@code target/}, wired into the
 * operator context attributes by the {@link TestMeta} rule.
 */
public class FSWindowDataManagerTest
{
  /**
   * JUnit rule that prepares a per-test application path and the attribute map
   * handed to the mocked operator contexts, and deletes the test class folder
   * before and after each test.
   */
  private static class TestMeta extends TestWatcher
  {
    String applicationPath;
    Attribute.AttributeMap.DefaultAttributeMap attributes;

    @Override
    protected void starting(Description description)
    {
      // Start from a clean folder so leftovers of a previous run cannot influence this test.
      TestUtils.deleteTargetTestClassFolder(description);
      super.starting(description);
      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
      attributes = new Attribute.AttributeMap.DefaultAttributeMap();
      attributes.put(DAG.APPLICATION_PATH, applicationPath);
    }

    @Override
    protected void finished(Description description)
    {
      TestUtils.deleteTargetTestClassFolder(description);
    }
  }

  @Rule
  public TestMeta testMeta = new TestMeta();

  /**
   * A manager that has never saved anything must report {@link Stateless#WINDOW_ID}
   * as its largest completed window.
   */
  @Test
  public void testLargestRecoveryWindow()
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair = createManagerAndContextFor(1);
    pair.second.setup(pair.first);
    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, pair.second.getLargestCompletedWindow());
    pair.second.teardown();
  }

  /**
   * Saves an artifact for window 1, sets the manager up again and checks that the
   * same artifact is retrieved for that window.
   */
  @Test
  public void testSave() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair = createManagerAndContextFor(1);
    pair.second.setup(pair.first);
    Map<Integer, String> data = Maps.newHashMap();
    data.put(1, "one");
    data.put(2, "two");
    data.put(3, "three");
    pair.second.save(data, 1);

    // Second setup on the same instance, as in a restart, before reading the data back.
    pair.second.setup(pair.first);
    @SuppressWarnings("unchecked")
    Map<Integer, String> artifact = (Map<Integer, String>)pair.second.retrieve(1);
    Assert.assertEquals("dataOf1", data, artifact);
    pair.second.teardown();
  }

  /**
   * Two managers (operator ids 1 and 2) save different artifacts for the same window;
   * each must retrieve only its own artifact.
   */
  @Test
  public void testRetrieve() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1);
    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2);
    pair1.second.setup(pair1.first);
    pair2.second.setup(pair2.first);

    Map<Integer, String> dataOf1 = Maps.newHashMap();
    dataOf1.put(1, "one");
    dataOf1.put(2, "two");
    dataOf1.put(3, "three");

    Map<Integer, String> dataOf2 = Maps.newHashMap();
    dataOf2.put(4, "four");
    dataOf2.put(5, "five");
    dataOf2.put(6, "six");

    pair1.second.save(dataOf1, 1);
    pair2.second.save(dataOf2, 1);

    pair1.second.setup(pair1.first);
    Object artifact1 = pair1.second.retrieve(1);
    Assert.assertEquals("data of 1", dataOf1, artifact1);

    pair2.second.setup(pair2.first);
    Object artifact2 = pair2.second.retrieve(1);
    Assert.assertEquals("data of 2", dataOf2, artifact2);

    pair1.second.teardown();
    pair2.second.teardown();
  }

  /**
   * After two operators have saved artifacts for window 1, a manager obtained through
   * {@code partition(3, null)} must return both artifacts, keyed by operator id, from
   * {@code retrieveAllPartitions(1)}.
   */
  @Test
  public void testRetrieveAllPartitions() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1);
    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2);
    pair1.second.setup(pair1.first);
    pair2.second.setup(pair2.first);

    Map<Integer, String> dataOf1 = Maps.newHashMap();
    dataOf1.put(1, "one");
    dataOf1.put(2, "two");
    dataOf1.put(3, "three");

    Map<Integer, String> dataOf2 = Maps.newHashMap();
    dataOf2.put(4, "four");
    dataOf2.put(5, "five");
    dataOf2.put(6, "six");

    pair1.second.save(dataOf1, 1);
    pair2.second.save(dataOf2, 1);

    pair1.second.teardown();
    pair2.second.teardown();

    List<WindowDataManager> managers = pair1.second.partition(3, null);
    managers.get(0).setup(pair1.first);
    Map<Integer, Object> artifacts = managers.get(0).retrieveAllPartitions(1);
    Assert.assertEquals("num artifacts", 2, artifacts.size());
    Assert.assertEquals("artifact 1", dataOf1, artifacts.get(1));
    Assert.assertEquals("artifact 2", dataOf2, artifacts.get(2));
    managers.get(0).teardown();
  }

  /**
   * Operator 1 saves window 1 and operator 2 saves window 2. Each manager must report
   * its own window as the largest completed one, and a manager re-partitioned to also
   * cover operator 2 is expected to report window 1.
   */
  @Test
  public void testRecovery() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1);
    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2);
    pair1.second.setup(pair1.first);
    pair2.second.setup(pair2.first);

    Map<Integer, String> dataOf1 = Maps.newHashMap();
    dataOf1.put(1, "one");
    dataOf1.put(2, "two");
    dataOf1.put(3, "three");

    Map<Integer, String> dataOf2 = Maps.newHashMap();
    dataOf2.put(4, "four");
    dataOf2.put(5, "five");
    dataOf2.put(6, "six");

    pair1.second.save(dataOf1, 1);
    pair2.second.save(dataOf2, 2);

    pair1.second.setup(pair1.first);
    Assert.assertEquals("largest recovery window", 1, pair1.second.getLargestCompletedWindow());

    pair2.second.setup(pair2.first);
    Assert.assertEquals("largest recovery window", 2, pair2.second.getLargestCompletedWindow());

    pair1.second.teardown();
    pair2.second.teardown();

    WindowDataManager manager = pair1.second.partition(1, Sets.newHashSet(2)).get(0);
    manager.setup(pair1.first);
    Assert.assertEquals("largest recovery window", 1, manager.getLargestCompletedWindow());
    manager.teardown();
  }

  /**
   * Saves windows 1..9, commits window 3, then sets up a fresh manager for the same
   * operator id and checks that windows 1 and 3 are no longer retrievable while
   * window 4 still is.
   */
  @Test
  public void testDelete() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1);
    // NOTE(review): a small WAL max length presumably spreads the nine windows over several
    // WAL parts so that committing can drop whole parts - confirm against the WAL implementation.
    pair1.second.getWal().setMaxLength(2);
    pair1.second.setup(pair1.first);

    Map<Integer, String> dataOf1 = Maps.newHashMap();
    dataOf1.put(1, "one");
    dataOf1.put(2, "two");
    dataOf1.put(3, "three");

    for (int i = 1; i <= 9; ++i) {
      pair1.second.save(dataOf1, i);
    }

    pair1.second.committed(3);
    pair1.second.teardown();

    Pair<Context.OperatorContext, FSWindowDataManager> pair1AfterRecovery = createManagerAndContextFor(1);
    testMeta.attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
    pair1AfterRecovery.second.setup(pair1AfterRecovery.first);

    Assert.assertNull("window 1 deleted", pair1AfterRecovery.second.retrieve(1));
    Assert.assertNull("window 3 deleted", pair1AfterRecovery.second.retrieve(3));
    Assert.assertEquals("window 4 exists", dataOf1, pair1AfterRecovery.second.retrieve(4));

    // pair1 was already torn down above; the recovered manager is the one still set up.
    pair1AfterRecovery.second.teardown();
  }

  /**
   * Three operators save 9, 6 and 3 windows respectively. A manager re-partitioned to
   * cover all of them must report window 3 as the largest completed window and return
   * three artifacts for window 1. After committing window 3 and setting the manager up
   * again with activation window 3, it is expected to report {@link Stateless#WINDOW_ID}.
   */
  @Test
  public void testDeleteDoesNotRemoveTmpFiles() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1);
    pair1.second.setup(pair1.first);

    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2);
    pair2.second.setup(pair2.first);

    Pair<Context.OperatorContext, FSWindowDataManager> pair3 = createManagerAndContextFor(3);
    pair3.second.setup(pair3.first);

    Map<Integer, String> dataOf1 = Maps.newHashMap();
    dataOf1.put(1, "one");
    dataOf1.put(2, "two");
    dataOf1.put(3, "three");

    Map<Integer, String> dataOf2 = Maps.newHashMap();
    dataOf2.put(4, "four");
    dataOf2.put(5, "five");
    dataOf2.put(6, "six");

    // Entries 7..9 belong to operator 3's payload (they used to be added to dataOf2 by mistake).
    Map<Integer, String> dataOf3 = Maps.newHashMap();
    dataOf3.put(7, "seven");
    dataOf3.put(8, "eight");
    dataOf3.put(9, "nine");

    for (int i = 1; i <= 9; ++i) {
      pair1.second.save(dataOf1, i);
    }
    for (int i = 1; i <= 6; ++i) {
      pair2.second.save(dataOf2, i);
    }
    for (int i = 1; i <= 3; ++i) {
      pair3.second.save(dataOf3, i);
    }

    pair1.second.teardown();
    pair2.second.teardown();
    pair3.second.teardown();

    FSWindowDataManager fsManager = (FSWindowDataManager)pair1.second.partition(1, Sets.newHashSet(2, 3)).get(0);
    fsManager.setup(pair1.first);

    Assert.assertEquals("recovery window", 3, fsManager.getLargestCompletedWindow());

    Map<Integer, Object> artifacts = fsManager.retrieveAllPartitions(1);
    Assert.assertEquals("num artifacts", 3, artifacts.size());

    fsManager.committed(3);
    fsManager.teardown();

    testMeta.attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 3L);
    fsManager.setup(pair1.first);
    Assert.assertEquals("recovery window", Stateless.WINDOW_ID, fsManager.getLargestCompletedWindow());
    fsManager.teardown();
  }

  /**
   * With the state path configured as not relative to the application path, saving a
   * window must create the configured directory itself.
   */
  @Test
  public void testAbsoluteRecoveryPath() throws IOException
  {
    Pair<Context.OperatorContext, FSWindowDataManager> pair = createManagerAndContextFor(1);
    pair.second.setStatePathRelativeToAppPath(false);
    // The timestamp gives each run its own directory name under target/.
    long time = System.currentTimeMillis();
    pair.second.setStatePath("target/" + time);
    pair.second.setup(pair.first);

    Map<Integer, String> data = Maps.newHashMap();
    data.put(1, "one");
    data.put(2, "two");
    data.put(3, "three");
    pair.second.save(data, 1);

    File recoveryDir = new File("target/" + time);
    Assert.assertTrue("recover filePath exist", recoveryDir.isDirectory());
    pair.second.teardown();
  }

  /**
   * Creates a new {@link FSWindowDataManager} together with a mocked operator context
   * for the given operator id, backed by the current test's attribute map.
   *
   * @param operatorId id passed to the mocked operator context
   * @return pair of (context, manager); the manager has not been set up yet
   */
  private Pair<Context.OperatorContext, FSWindowDataManager> createManagerAndContextFor(int operatorId)
  {
    FSWindowDataManager dataManager = new FSWindowDataManager();
    OperatorContext context = mockOperatorContext(operatorId, testMeta.attributes);
    return new Pair<>(context, dataManager);
  }
}