blob: 567c4bc56fd85b71c6a074e28524021ab8887dcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskMapAsync;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.compute.ComputeTaskTimeoutException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CheckpointEvent;
import org.apache.ignite.events.DeploymentEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT;
import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_LOADED;
import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_REMOVED;
import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_SAVED;
import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
import static org.apache.ignite.events.EventType.EVT_JOB_FAILED;
import static org.apache.ignite.events.EventType.EVT_JOB_FAILED_OVER;
import static org.apache.ignite.events.EventType.EVT_JOB_FINISHED;
import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
import static org.apache.ignite.events.EventType.EVT_JOB_QUEUED;
import static org.apache.ignite.events.EventType.EVT_JOB_RESULTED;
import static org.apache.ignite.events.EventType.EVT_JOB_STARTED;
import static org.apache.ignite.events.EventType.EVT_JOB_TIMEDOUT;
import static org.apache.ignite.events.EventType.EVT_TASK_DEPLOYED;
import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
import static org.apache.ignite.events.EventType.EVT_TASK_REDUCED;
import static org.apache.ignite.events.EventType.EVT_TASK_STARTED;
import static org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT;
import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
/**
* Test event storage.
*/
@GridCommonTest(group = "Kernal Self")
public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTest {
/** */
private static Ignite ignite;
/**
*
*/
public GridEventStorageCheckAllEventsSelfTest() {
super(/*start grid*/true);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration() throws Exception {
IgniteConfiguration cfg = super.getConfiguration();
// TODO: IGNITE-3099 (hotfix the test to check the event order in common case).
cfg.setPublicThreadPoolSize(1);
cfg.setIncludeEventTypes(EventType.EVTS_ALL);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
ignite = G.ignite(getTestIgniteInstanceName());
long tstamp = startTimestamp();
ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader());
List<Event> evts = pullEvents(tstamp, 1);
assertEvent(evts.get(0).type(), EVT_TASK_DEPLOYED, evts);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
ignite = null;
}
/**
* @param evtType Actual event type.
* @param expType Expected event type.
* @param evts Full list of events.
*/
private void assertEvent(int evtType, int expType, List<Event> evts) {
assert evtType == expType : "Invalid event [evtType=" + evtType + ", expectedType=" + expType +
", evts=" + evts + ']';
}
/**
* @throws Exception If test failed.
*/
@Test
public void testCheckpointEvents() throws Exception {
long tstamp = startTimestamp();
generateEvents(null, new GridAllCheckpointEventsTestJob()).get();
List<Event> evts = pullEvents(tstamp, 11);
assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts);
assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts);
assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts);
assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts);
assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts);
assertEvent(evts.get(5).type(), EVT_CHECKPOINT_LOADED, evts);
assertEvent(evts.get(6).type(), EVT_CHECKPOINT_REMOVED, evts);
assertEvent(evts.get(7).type(), EVT_JOB_RESULTED, evts);
assertEvent(evts.get(8).type(), EVT_TASK_REDUCED, evts);
assertEvent(evts.get(9).type(), EVT_TASK_FINISHED, evts);
assertEvent(evts.get(10).type(), EVT_JOB_FINISHED, evts);
assertNotNull(((JobEvent)evts.get(7)).resultPolicy());
assertEquals(WAIT, ((JobEvent)evts.get(7)).resultPolicy());
}
/**
* @throws Exception If test failed.
*/
@Test
public void testTaskUndeployEvents() throws Exception {
final long tstamp = startTimestamp();
generateEvents(null, new GridAllEventsSuccessTestJob()).get();
// TODO: IGNITE-3099 (hotfix the test to check the event order in common case).
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
List<Event> evts = pullEvents(tstamp, 10);
return evts.get(evts.size() - 1).type() == EVT_JOB_FINISHED;
}
catch (Exception ignored) {
return false;
}
}
}, 500);
ignite.compute().undeployTask(GridAllEventsTestTask.class.getName());
ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader());
List<Event> evts = pullEvents(tstamp, 12);
assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts);
assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts);
assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts);
assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts);
assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts);
assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts);
assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts);
assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts);
assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts);
assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts);
assertEvent(evts.get(10).type(), EVT_TASK_UNDEPLOYED, evts);
assertEvent(evts.get(11).type(), EVT_TASK_DEPLOYED, evts);
assertNotNull(((JobEvent)evts.get(6)).resultPolicy());
assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy());
}
/**
* @throws Exception If test failed.
*/
@Test
public void testSuccessTask() throws Exception {
generateEvents(null, new GridAllEventsSuccessTestJob()).get();
long tstamp = startTimestamp();
generateEvents(null, new GridAllEventsSuccessTestJob()).get();
List<Event> evts = pullEvents(tstamp, 10);
assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts);
assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts);
assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts);
assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts);
assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts);
assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts);
assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts);
assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts);
assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts);
assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts);
assertNotNull(((JobEvent)evts.get(6)).resultPolicy());
assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy());
}
/**
* @throws Exception If test failed.
*/
@Test
public void testFailoverJobTask() throws Exception {
startGrid(0);
try {
generateEvents(null, new GridAllEventsSuccessTestJob()).get();
ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob());
long tstamp = startTimestamp();
ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob());
List<Event> evts = pullEvents(tstamp, 12, GridFailoverTestTask.class.getName());
int cnt = 0;
assertEvent(evts.get(cnt++).type(), EVT_TASK_STARTED, evts);
assertEvent(evts.get(cnt++).type(), EVT_JOB_MAPPED, evts);
assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts);
assertEquals(((JobEvent)evts.get(cnt - 1)).resultPolicy(), FAILOVER);
assertEvent(evts.get(cnt++).type(), EVT_JOB_FAILED_OVER, evts);
assertEvent(evts.get(cnt++).type(), EVT_JOB_QUEUED, evts);
assertEvent(evts.get(cnt++).type(), EVT_JOB_STARTED, evts);
assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_SAVED, evts);
assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_REMOVED, evts);
assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts);
assertEvent(evts.get(cnt++).type(), EVT_TASK_REDUCED, evts);
assertEvent(evts.get(cnt++).type(), EVT_TASK_FINISHED, evts);
assertEvent(evts.get(cnt++).type(), EVT_JOB_FINISHED, evts);
}
finally {
stopGrid(0);
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testFailTask() throws Exception {
long tstamp = startTimestamp();
ComputeTaskFuture<?> fut = generateEvents(null, new GridAllEventsFailTestJob());
try {
fut.get();
assert false : "Grid with locally executed job with timeout should throw ComputeTaskTimeoutException.";
}
catch (IgniteException e) {
info("Expected exception caught [taskFuture=" + fut + ", exception=" + e + ']');
}
List<Event> evts = pullEvents(tstamp, 7);
assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts);
assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts);
assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts);
assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts);
assertEvent(evts.get(4).type(), EVT_JOB_RESULTED, evts);
assertEvent(evts.get(5).type(), EVT_TASK_FAILED, evts);
assertEvent(evts.get(6).type(), EVT_JOB_FAILED, evts);
// Exception was thrown, so policy is null.
assertNull(((JobEvent)evts.get(4)).resultPolicy());
}
/**
* @throws Exception If test failed.
*/
@Test
public void testTimeoutTask() throws Exception {
long tstamp = startTimestamp();
ComputeTaskFuture<?> fut = generateEvents(1000L, new GridAllEventsTimeoutTestJob());
try {
fut.get();
assert false : "Task should fail.";
}
catch (ComputeTaskTimeoutException e) {
info("Expected timeout exception caught [taskFuture=" + fut + ", exception=" + e + ']');
}
List<Event> evts = pullEvents(tstamp, 6);
assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts);
assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts);
assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts);
assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts);
boolean isTaskTimeout = false;
boolean isTaskFailed = false;
for (int i = 4; i < evts.size(); i++) {
int evtType = evts.get(i).type();
if (evtType == EVT_TASK_TIMEDOUT) {
assert !isTaskTimeout;
assert !isTaskFailed;
isTaskTimeout = true;
}
else if (evtType == EVT_TASK_FAILED) {
assert isTaskTimeout;
assert !isTaskFailed;
isTaskFailed = true;
}
else {
assert evtType == EVT_JOB_CANCELLED
|| evtType == EVT_JOB_TIMEDOUT
|| evtType == EVT_JOB_FAILED
|| evtType == EVT_JOB_FINISHED :
"Unexpected event: " + evts.get(i);
}
}
assert isTaskTimeout;
assert isTaskFailed;
}
/**
* Returns timestamp at the method call moment, but sleeps before return,
* to allow pass {@link IgniteUtils#currentTimeMillis()}.
*
* @return Call timestamp.
* @throws Exception If failed.
*/
private long startTimestamp() throws Exception {
final long tstamp = U.currentTimeMillis();
Thread.sleep(20);
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return U.currentTimeMillis() > tstamp;
}
}, 5000);
assert U.currentTimeMillis() > tstamp;
return U.currentTimeMillis();
}
/**
* Pull all test task related events since the given moment.
*
* @param since Earliest time to pulled events.
* @param evtCnt Expected event count
* @return List of events.
* @throws Exception If failed.
*/
private List<Event> pullEvents(long since, int evtCnt) throws Exception {
return pullEvents(since, evtCnt, GridAllEventsTestTask.class.getName());
}
/**
* Pull all test task related events since the given moment.
*
* @param since Earliest time to pulled events.
* @param evtCnt Expected event count.
* @param taskName Name of the task.
* @return List of events.
* @throws Exception If failed.
*/
private List<Event> pullEvents(long since, int evtCnt, String taskName) throws Exception {
IgnitePredicate<Event> filter = new CustomEventFilter(taskName, since);
for (int i = 0; i < 3; i++) {
List<Event> evts = new ArrayList<>(ignite.events().localQuery((filter)));
info("Filtered events [size=" + evts.size() + ", evts=" + evts + ']');
if (evtCnt != evts.size() && i < 2) {
U.warn(log, "Invalid event count (will retry in 1000 ms) [actual=" + evts.size() +
", expected=" + evtCnt + ", evts=" + evts + ']');
U.sleep(1000);
continue;
}
assert evtCnt <= evts.size() : "Invalid event count [actual=" + evts.size() + ", expected=" + evtCnt +
", evts=" + evts + ']';
return evts;
}
assert false;
return null;
}
/**
* @param timeout Timeout.
* @param job Job.
* @return Task future.
* @throws Exception If failed.
*/
private ComputeTaskFuture<?> generateEvents(@Nullable Long timeout, ComputeJob job) throws Exception {
return timeout == null
? ignite.compute().executeAsync(GridAllEventsTestTask.class.getName(), job)
: ignite.compute().withTimeout(timeout).executeAsync(GridAllEventsTestTask.class.getName(), job);
}
/**
*
*/
private static class CustomEventFilter implements IgnitePredicate<Event> {
/** */
private final String taskName;
/** */
private final long tstamp;
/**
* @param taskName Task name.
* @param tstamp Timestamp.
*/
CustomEventFilter(String taskName, long tstamp) {
assert taskName != null;
assert tstamp > 0;
this.taskName = taskName;
this.tstamp = tstamp;
}
/** {@inheritDoc} */
@Override public boolean apply(Event evt) {
if (evt.timestamp() >= tstamp) {
if (evt instanceof TaskEvent)
return taskName.equals(((TaskEvent)evt).taskName());
else if (evt instanceof JobEvent)
return taskName.equals(((JobEvent)evt).taskName());
else if (evt instanceof DeploymentEvent)
return taskName.equals(((DeploymentEvent)evt).alias());
else if (evt instanceof CheckpointEvent)
return true;
}
return false;
}
}
/**
*
*/
private static class GridAllEventsSuccessTestJob extends ComputeJobAdapter {
/** */
@TaskSessionResource
private ComputeTaskSession taskSes;
/** {@inheritDoc} */
@Override public String execute() {
assert taskSes != null;
taskSes.saveCheckpoint("testCheckpoint", "TestState");
taskSes.removeCheckpoint("testCheckpoint");
return "GridAllEventsSuccessTestJob-test-event-success.";
}
}
/**
*
*/
private static class GridAllEventsFailTestJob extends ComputeJobAdapter {
/** {@inheritDoc} */
@Override public String execute() {
throw new RuntimeException("GridAllEventsFailTestJob expected test exception.");
}
}
/**
*/
private static class GridAllEventsTimeoutTestJob extends ComputeJobAdapter {
/** */
@LoggerResource
private IgniteLogger log;
/** {@inheritDoc} */
@SuppressWarnings("BusyWait")
@Override public String execute() {
try {
while (!isCancelled())
Thread.sleep(5000);
}
catch (InterruptedException ignored) {
if (log.isInfoEnabled())
log.info("GridAllEventsTimeoutTestJob was interrupted.");
return "GridAllEventsTimeoutTestJob-test-event-timeout.";
}
return "GridAllEventsTimeoutTestJob-test-event-timeout.";
}
}
/**
*
*/
private static class GridAllCheckpointEventsTestJob extends ComputeJobAdapter {
/** */
@TaskSessionResource
private ComputeTaskSession taskSes;
/** {@inheritDoc} */
@Override public String execute() {
assert taskSes != null;
taskSes.saveCheckpoint("testAllCheckpoint", "CheckpointTestState");
taskSes.loadCheckpoint("testAllCheckpoint");
taskSes.removeCheckpoint("testAllCheckpoint");
return "GridAllCheckpointEventsSuccess-test-all-checkpoint-event-success.";
}
}
/**
*
*/
@ComputeTaskSessionFullSupport
@ComputeTaskMapAsync // TODO: IGNITE-3099 (hotfix the test to check the event order in common case).
private static class GridAllEventsTestTask extends ComputeTaskSplitAdapter<Object, Object> {
/** {@inheritDoc} */
@Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
return Collections.singleton((ComputeJob)arg);
}
/** {@inheritDoc} */
@Override public Serializable reduce(List<ComputeJobResult> results) {
assert results != null;
assert results.size() == 1;
return (Serializable)results;
}
}
/**
*
*/
private static class GridFailoverTestTask extends GridAllEventsTestTask {
/** */
private final AtomicBoolean failed = new AtomicBoolean();
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
if (failed.compareAndSet(false, true))
return FAILOVER;
return super.result(res, rcvd);
}
}
}