blob: d5eac2c38d7499dddf7e45955d2fbd9d391c6e94 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.instance.bolt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.heron.api.Config;
import org.apache.heron.api.bolt.IRichBolt;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.serializer.JavaSerializer;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.instance.InstanceControlMsg;
import org.apache.heron.instance.SlaveTester;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.resource.Constants;
import org.apache.heron.resource.MockPhysicalPlansBuilder;
import org.apache.heron.resource.TestSpout;
import org.apache.heron.resource.TestStatefulBolt;
import org.apache.heron.resource.TestTwoPhaseStatefulBolt;
import org.apache.heron.resource.UnitTestHelper;
import static org.junit.Assert.*;
/**
* Test if stateful bolt is able to respond to incoming control/data tuples as expected.
*/
public class BoltStatefulInstanceTest {
private SlaveTester slaveTester;
private static IPluggableSerializer serializer = new JavaSerializer();
@Before
public void before() {
slaveTester = new SlaveTester();
slaveTester.start();
}
@After
public void after() throws NoSuchFieldException, IllegalAccessException {
slaveTester.stop();
}
@Test
public void testPreSaveAndPostSave() throws Exception {
CountDownLatch preSaveLatch = new CountDownLatch(1);
CountDownLatch postSaveLatch = new CountDownLatch(1);
SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
// initially non of preSave or postSave are invoked yet
assertEquals(1, preSaveLatch.getCount());
assertEquals(1, postSaveLatch.getCount());
// this should invoke preSave
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
assertEquals(1, postSaveLatch.getCount());
// this should invoke postSave
slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
assertEquals(0, postSaveLatch.getCount());
}
@Test
public void testPreRestore() throws InterruptedException {
CountDownLatch preRestoreLatch = new CountDownLatch(1);
SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch);
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
assertEquals(1, preRestoreLatch.getCount());
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preRestoreLatch.getCount());
}
/**
* Ensure that for ITwoPhaseStatefulComponent bolts, after a preSave, execute will not be invoked
* unless the corresponding postSave is called.
*/
@Test
public void testPostSaveBlockExecute() throws Exception {
CountDownLatch preSaveLatch = new CountDownLatch(1);
CountDownLatch postSaveLatch = new CountDownLatch(1);
CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch);
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
// initially non of preSave or postSave are invoked yet
assertEquals(1, preSaveLatch.getCount());
assertEquals(1, postSaveLatch.getCount());
assertEquals(1, executeLatch.getCount());
// this should invoke preSave
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
// put a data tuple into the inStreamQueue
slaveTester.getInStreamQueue().offer(buildTupleSet());
assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
assertEquals(1, postSaveLatch.getCount());
assertEquals(1, executeLatch.getCount());
// Wait for a bounded amount of time, assert that the tuple will not execute as it is
// blocked on postSave. This is because we only want to allow one uncommitted "transaction" on
// each task. See the design doc for more details.
assertFalse(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
assertEquals(1, postSaveLatch.getCount());
assertEquals(1, executeLatch.getCount());
// this should invoke postSave
slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
assertEquals(0, postSaveLatch.getCount());
assertEquals(0, executeLatch.getCount());
}
/**
* Ensure that the aforementioned behaviour does not apply for bolts that don't implement
* ITwoPhaseStatefulComponent
*/
@Test
public void testExecuteNotBlocked() throws Exception {
CountDownLatch preSaveLatch = new CountDownLatch(1);
CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch);
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulBolt());
// initially non of preSave or postSave are invoked yet
assertEquals(1, preSaveLatch.getCount());
assertEquals(1, executeLatch.getCount());
// this should invoke preSave
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
// put a data tuple into the inStreamQueue
slaveTester.getInStreamQueue().offer(buildTupleSet());
assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
// no need to wait for postSave as the bolt doesn't implement ITwoPhaseStatefulComponent
assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
assertEquals(0, preSaveLatch.getCount());
assertEquals(0, executeLatch.getCount());
}
// build a tuple set that contains one data tuple
private HeronTuples.HeronTupleSet buildTupleSet() {
HeronTuples.HeronTupleSet.Builder heronTupleSet = HeronTuples.HeronTupleSet.newBuilder();
heronTupleSet.setSrcTaskId(1);
HeronTuples.HeronDataTupleSet.Builder dataTupleSet = HeronTuples.HeronDataTupleSet.newBuilder();
TopologyAPI.StreamId.Builder streamId = TopologyAPI.StreamId.newBuilder();
streamId.setComponentName("test-spout");
streamId.setId("default");
dataTupleSet.setStream(streamId);
HeronTuples.HeronDataTuple.Builder dataTuple = HeronTuples.HeronDataTuple.newBuilder();
dataTuple.setKey(0);
HeronTuples.RootId.Builder rootId = HeronTuples.RootId.newBuilder();
rootId.setKey(0);
rootId.setTaskid(0);
dataTuple.addRoots(rootId);
dataTuple.addValues(ByteString.copyFrom(serializer.serialize("A")));
dataTupleSet.addTuples(dataTuple);
heronTupleSet.setData(dataTupleSet);
return heronTupleSet.build();
}
private InstanceControlMsg buildPhysicalPlanMessageFor2PCBolt() {
return buildPhysicalPlanMessage(new TestTwoPhaseStatefulBolt());
}
private InstanceControlMsg buildPhysicalPlanMessageForStatefulBolt() {
return buildPhysicalPlanMessage(new TestStatefulBolt());
}
private InstanceControlMsg buildPhysicalPlanMessage(IRichBolt bolt) {
PhysicalPlans.PhysicalPlan physicalPlan =
MockPhysicalPlansBuilder
.newBuilder()
.withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
.withTopologyState(TopologyAPI.TopologyState.RUNNING)
.withSpoutInstance(
"test-spout",
0,
"spout-id",
new TestSpout()
)
.withBoltInstance(
"test-bolt",
1,
"bolt-id",
"test-spout",
bolt
)
.build();
PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "bolt-id");
return InstanceControlMsg.newBuilder()
.setNewPhysicalPlanHelper(ph)
.build();
}
}