| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.compute.ComputeJob; |
| import org.apache.ignite.compute.ComputeJobAdapter; |
| import org.apache.ignite.compute.ComputeJobResult; |
| import org.apache.ignite.compute.ComputeJobResultPolicy; |
| import org.apache.ignite.compute.ComputeTaskFuture; |
| import org.apache.ignite.compute.ComputeTaskMapAsync; |
| import org.apache.ignite.compute.ComputeTaskSession; |
| import org.apache.ignite.compute.ComputeTaskSessionFullSupport; |
| import org.apache.ignite.compute.ComputeTaskSplitAdapter; |
| import org.apache.ignite.compute.ComputeTaskTimeoutException; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.CheckpointEvent; |
| import org.apache.ignite.events.DeploymentEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.events.JobEvent; |
| import org.apache.ignite.events.TaskEvent; |
| import org.apache.ignite.internal.util.IgniteUtils; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.resources.TaskSessionResource; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.testframework.junits.common.GridCommonTest; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER; |
| import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT; |
| import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_LOADED; |
| import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_REMOVED; |
| import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_SAVED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_FAILED_OVER; |
| import static org.apache.ignite.events.EventType.EVT_JOB_FINISHED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_QUEUED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_RESULTED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_STARTED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_TIMEDOUT; |
| import static org.apache.ignite.events.EventType.EVT_TASK_DEPLOYED; |
| import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; |
| import static org.apache.ignite.events.EventType.EVT_TASK_REDUCED; |
| import static org.apache.ignite.events.EventType.EVT_TASK_STARTED; |
| import static org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT; |
| import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED; |
| |
| /** |
| * Test event storage. |
| */ |
| @GridCommonTest(group = "Kernal Self") |
| public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTest { |
| /** */ |
| private static Ignite ignite; |
| |
| /** |
| * |
| */ |
| public GridEventStorageCheckAllEventsSelfTest() { |
| super(/*start grid*/true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration() throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(); |
| |
| // TODO: IGNITE-3099 (hotfix the test to check the event order in common case). |
| cfg.setPublicThreadPoolSize(1); |
| |
| cfg.setIncludeEventTypes(EventType.EVTS_ALL); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| long tstamp = startTimestamp(); |
| |
| ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader()); |
| |
| List<Event> evts = pullEvents(tstamp, 1); |
| |
| assertEvent(evts.get(0).type(), EVT_TASK_DEPLOYED, evts); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTestsStopped() throws Exception { |
| ignite = null; |
| } |
| |
| /** |
| * @param evtType Actual event type. |
| * @param expType Expected event type. |
| * @param evts Full list of events. |
| */ |
| private void assertEvent(int evtType, int expType, List<Event> evts) { |
| assert evtType == expType : "Invalid event [evtType=" + evtType + ", expectedType=" + expType + |
| ", evts=" + evts + ']'; |
| } |
| |
| /** |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testCheckpointEvents() throws Exception { |
| long tstamp = startTimestamp(); |
| |
| generateEvents(null, new GridAllCheckpointEventsTestJob()).get(); |
| |
| List<Event> evts = pullEvents(tstamp, 11); |
| |
| assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); |
| assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); |
| assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); |
| assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); |
| assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); |
| assertEvent(evts.get(5).type(), EVT_CHECKPOINT_LOADED, evts); |
| assertEvent(evts.get(6).type(), EVT_CHECKPOINT_REMOVED, evts); |
| assertEvent(evts.get(7).type(), EVT_JOB_RESULTED, evts); |
| assertEvent(evts.get(8).type(), EVT_TASK_REDUCED, evts); |
| assertEvent(evts.get(9).type(), EVT_TASK_FINISHED, evts); |
| assertEvent(evts.get(10).type(), EVT_JOB_FINISHED, evts); |
| |
| assertNotNull(((JobEvent)evts.get(7)).resultPolicy()); |
| assertEquals(WAIT, ((JobEvent)evts.get(7)).resultPolicy()); |
| } |
| |
| /** |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testTaskUndeployEvents() throws Exception { |
| final long tstamp = startTimestamp(); |
| |
| generateEvents(null, new GridAllEventsSuccessTestJob()).get(); |
| |
| // TODO: IGNITE-3099 (hotfix the test to check the event order in common case). |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| try { |
| List<Event> evts = pullEvents(tstamp, 10); |
| return evts.get(evts.size() - 1).type() == EVT_JOB_FINISHED; |
| } |
| catch (Exception ignored) { |
| return false; |
| } |
| } |
| }, 500); |
| |
| ignite.compute().undeployTask(GridAllEventsTestTask.class.getName()); |
| ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader()); |
| |
| List<Event> evts = pullEvents(tstamp, 12); |
| |
| assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); |
| assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); |
| assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); |
| assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); |
| assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); |
| assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts); |
| assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts); |
| assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); |
| assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); |
| assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); |
| assertEvent(evts.get(10).type(), EVT_TASK_UNDEPLOYED, evts); |
| assertEvent(evts.get(11).type(), EVT_TASK_DEPLOYED, evts); |
| |
| assertNotNull(((JobEvent)evts.get(6)).resultPolicy()); |
| assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy()); |
| } |
| |
| /** |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testSuccessTask() throws Exception { |
| generateEvents(null, new GridAllEventsSuccessTestJob()).get(); |
| |
| long tstamp = startTimestamp(); |
| |
| generateEvents(null, new GridAllEventsSuccessTestJob()).get(); |
| |
| List<Event> evts = pullEvents(tstamp, 10); |
| |
| assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); |
| assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); |
| assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); |
| assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); |
| assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); |
| assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts); |
| assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts); |
| assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); |
| assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); |
| assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); |
| |
| assertNotNull(((JobEvent)evts.get(6)).resultPolicy()); |
| assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy()); |
| } |
| |
| /** |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testFailoverJobTask() throws Exception { |
| startGrid(0); |
| |
| try { |
| generateEvents(null, new GridAllEventsSuccessTestJob()).get(); |
| |
| ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob()); |
| |
| long tstamp = startTimestamp(); |
| |
| ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob()); |
| |
| List<Event> evts = pullEvents(tstamp, 12, GridFailoverTestTask.class.getName()); |
| |
| int cnt = 0; |
| |
| assertEvent(evts.get(cnt++).type(), EVT_TASK_STARTED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_MAPPED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts); |
| |
| assertEquals(((JobEvent)evts.get(cnt - 1)).resultPolicy(), FAILOVER); |
| |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_FAILED_OVER, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_QUEUED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_STARTED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_SAVED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_REMOVED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_TASK_REDUCED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_TASK_FINISHED, evts); |
| assertEvent(evts.get(cnt++).type(), EVT_JOB_FINISHED, evts); |
| } |
| finally { |
| stopGrid(0); |
| } |
| } |
| |
| |
| /** |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testFailTask() throws Exception { |
| long tstamp = startTimestamp(); |
| |
| ComputeTaskFuture<?> fut = generateEvents(null, new GridAllEventsFailTestJob()); |
| |
| try { |
| fut.get(); |
| |
| assert false : "Grid with locally executed job with timeout should throw ComputeTaskTimeoutException."; |
| } |
| catch (IgniteException e) { |
| info("Expected exception caught [taskFuture=" + fut + ", exception=" + e + ']'); |
| } |
| |
| List<Event> evts = pullEvents(tstamp, 7); |
| |
| assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); |
| assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); |
| assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); |
| assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); |
| assertEvent(evts.get(4).type(), EVT_JOB_RESULTED, evts); |
| assertEvent(evts.get(5).type(), EVT_TASK_FAILED, evts); |
| assertEvent(evts.get(6).type(), EVT_JOB_FAILED, evts); |
| |
| // Exception was thrown, so policy is null. |
| assertNull(((JobEvent)evts.get(4)).resultPolicy()); |
| } |
| |
| /** |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testTimeoutTask() throws Exception { |
| long tstamp = startTimestamp(); |
| |
| ComputeTaskFuture<?> fut = generateEvents(1000L, new GridAllEventsTimeoutTestJob()); |
| |
| try { |
| fut.get(); |
| |
| assert false : "Task should fail."; |
| } |
| catch (ComputeTaskTimeoutException e) { |
| info("Expected timeout exception caught [taskFuture=" + fut + ", exception=" + e + ']'); |
| } |
| |
| List<Event> evts = pullEvents(tstamp, 6); |
| |
| assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); |
| assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); |
| assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); |
| assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); |
| |
| boolean isTaskTimeout = false; |
| boolean isTaskFailed = false; |
| |
| for (int i = 4; i < evts.size(); i++) { |
| int evtType = evts.get(i).type(); |
| |
| if (evtType == EVT_TASK_TIMEDOUT) { |
| assert !isTaskTimeout; |
| assert !isTaskFailed; |
| |
| isTaskTimeout = true; |
| } |
| else if (evtType == EVT_TASK_FAILED) { |
| assert isTaskTimeout; |
| assert !isTaskFailed; |
| |
| isTaskFailed = true; |
| } |
| else { |
| assert evtType == EVT_JOB_CANCELLED |
| || evtType == EVT_JOB_TIMEDOUT |
| || evtType == EVT_JOB_FAILED |
| || evtType == EVT_JOB_FINISHED : |
| "Unexpected event: " + evts.get(i); |
| } |
| } |
| |
| assert isTaskTimeout; |
| assert isTaskFailed; |
| } |
| |
| /** |
| * Returns timestamp at the method call moment, but sleeps before return, |
| * to allow pass {@link IgniteUtils#currentTimeMillis()}. |
| * |
| * @return Call timestamp. |
| * @throws Exception If failed. |
| */ |
| private long startTimestamp() throws Exception { |
| final long tstamp = U.currentTimeMillis(); |
| |
| Thread.sleep(20); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return U.currentTimeMillis() > tstamp; |
| } |
| }, 5000); |
| |
| assert U.currentTimeMillis() > tstamp; |
| |
| return U.currentTimeMillis(); |
| } |
| |
| /** |
| * Pull all test task related events since the given moment. |
| * |
| * @param since Earliest time to pulled events. |
| * @param evtCnt Expected event count |
| * @return List of events. |
| * @throws Exception If failed. |
| */ |
| private List<Event> pullEvents(long since, int evtCnt) throws Exception { |
| return pullEvents(since, evtCnt, GridAllEventsTestTask.class.getName()); |
| } |
| |
| /** |
| * Pull all test task related events since the given moment. |
| * |
| * @param since Earliest time to pulled events. |
| * @param evtCnt Expected event count. |
| * @param taskName Name of the task. |
| * @return List of events. |
| * @throws Exception If failed. |
| */ |
| private List<Event> pullEvents(long since, int evtCnt, String taskName) throws Exception { |
| IgnitePredicate<Event> filter = new CustomEventFilter(taskName, since); |
| |
| for (int i = 0; i < 3; i++) { |
| List<Event> evts = new ArrayList<>(ignite.events().localQuery((filter))); |
| |
| info("Filtered events [size=" + evts.size() + ", evts=" + evts + ']'); |
| |
| if (evtCnt != evts.size() && i < 2) { |
| U.warn(log, "Invalid event count (will retry in 1000 ms) [actual=" + evts.size() + |
| ", expected=" + evtCnt + ", evts=" + evts + ']'); |
| |
| U.sleep(1000); |
| |
| continue; |
| } |
| |
| assert evtCnt <= evts.size() : "Invalid event count [actual=" + evts.size() + ", expected=" + evtCnt + |
| ", evts=" + evts + ']'; |
| |
| return evts; |
| } |
| |
| assert false; |
| |
| return null; |
| } |
| |
| /** |
| * @param timeout Timeout. |
| * @param job Job. |
| * @return Task future. |
| * @throws Exception If failed. |
| */ |
| private ComputeTaskFuture<?> generateEvents(@Nullable Long timeout, ComputeJob job) throws Exception { |
| return timeout == null |
| ? ignite.compute().executeAsync(GridAllEventsTestTask.class.getName(), job) |
| : ignite.compute().withTimeout(timeout).executeAsync(GridAllEventsTestTask.class.getName(), job); |
| } |
| |
| /** |
| * |
| */ |
| private static class CustomEventFilter implements IgnitePredicate<Event> { |
| /** */ |
| private final String taskName; |
| |
| /** */ |
| private final long tstamp; |
| |
| /** |
| * @param taskName Task name. |
| * @param tstamp Timestamp. |
| */ |
| CustomEventFilter(String taskName, long tstamp) { |
| assert taskName != null; |
| assert tstamp > 0; |
| |
| this.taskName = taskName; |
| this.tstamp = tstamp; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(Event evt) { |
| if (evt.timestamp() >= tstamp) { |
| if (evt instanceof TaskEvent) |
| return taskName.equals(((TaskEvent)evt).taskName()); |
| else if (evt instanceof JobEvent) |
| return taskName.equals(((JobEvent)evt).taskName()); |
| else if (evt instanceof DeploymentEvent) |
| return taskName.equals(((DeploymentEvent)evt).alias()); |
| else if (evt instanceof CheckpointEvent) |
| return true; |
| } |
| |
| return false; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class GridAllEventsSuccessTestJob extends ComputeJobAdapter { |
| /** */ |
| @TaskSessionResource |
| private ComputeTaskSession taskSes; |
| |
| /** {@inheritDoc} */ |
| @Override public String execute() { |
| assert taskSes != null; |
| |
| taskSes.saveCheckpoint("testCheckpoint", "TestState"); |
| taskSes.removeCheckpoint("testCheckpoint"); |
| |
| return "GridAllEventsSuccessTestJob-test-event-success."; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class GridAllEventsFailTestJob extends ComputeJobAdapter { |
| /** {@inheritDoc} */ |
| @Override public String execute() { |
| throw new RuntimeException("GridAllEventsFailTestJob expected test exception."); |
| } |
| } |
| |
| /** |
| */ |
| private static class GridAllEventsTimeoutTestJob extends ComputeJobAdapter { |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("BusyWait") |
| @Override public String execute() { |
| try { |
| while (!isCancelled()) |
| Thread.sleep(5000); |
| } |
| catch (InterruptedException ignored) { |
| if (log.isInfoEnabled()) |
| log.info("GridAllEventsTimeoutTestJob was interrupted."); |
| |
| return "GridAllEventsTimeoutTestJob-test-event-timeout."; |
| } |
| |
| return "GridAllEventsTimeoutTestJob-test-event-timeout."; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class GridAllCheckpointEventsTestJob extends ComputeJobAdapter { |
| /** */ |
| @TaskSessionResource |
| private ComputeTaskSession taskSes; |
| |
| /** {@inheritDoc} */ |
| @Override public String execute() { |
| assert taskSes != null; |
| |
| taskSes.saveCheckpoint("testAllCheckpoint", "CheckpointTestState"); |
| taskSes.loadCheckpoint("testAllCheckpoint"); |
| taskSes.removeCheckpoint("testAllCheckpoint"); |
| |
| return "GridAllCheckpointEventsSuccess-test-all-checkpoint-event-success."; |
| } |
| } |
| |
| /** |
| * |
| */ |
| @ComputeTaskSessionFullSupport |
| @ComputeTaskMapAsync // TODO: IGNITE-3099 (hotfix the test to check the event order in common case). |
| private static class GridAllEventsTestTask extends ComputeTaskSplitAdapter<Object, Object> { |
| /** {@inheritDoc} */ |
| @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) { |
| return Collections.singleton((ComputeJob)arg); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Serializable reduce(List<ComputeJobResult> results) { |
| assert results != null; |
| assert results.size() == 1; |
| |
| return (Serializable)results; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class GridFailoverTestTask extends GridAllEventsTestTask { |
| /** */ |
| private final AtomicBoolean failed = new AtomicBoolean(); |
| |
| /** {@inheritDoc} */ |
| @Override public ComputeJobResultPolicy result(ComputeJobResult res, |
| List<ComputeJobResult> rcvd) throws IgniteException { |
| if (failed.compareAndSet(false, true)) |
| return FAILOVER; |
| |
| return super.result(res, rcvd); |
| } |
| } |
| } |