| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.session; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.compute.ComputeJobAdapter; |
| import org.apache.ignite.compute.ComputeJobResult; |
| import org.apache.ignite.compute.ComputeTaskFuture; |
| import org.apache.ignite.compute.ComputeTaskMapAsync; |
| import org.apache.ignite.compute.ComputeTaskName; |
| import org.apache.ignite.compute.ComputeTaskSession; |
| import org.apache.ignite.compute.ComputeTaskSessionFullSupport; |
| import org.apache.ignite.compute.ComputeTaskSessionScope; |
| import org.apache.ignite.compute.ComputeTaskSplitAdapter; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.resources.TaskSessionResource; |
| import org.apache.ignite.spi.checkpoint.CheckpointSpi; |
| import org.apache.ignite.testframework.junits.IgniteTestResources; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.testframework.junits.common.GridCommonTest; |
| |
| /** |
| * Grid session checkpoint self test. |
| */ |
| @GridCommonTest(group = "Task Session") |
| public abstract class GridSessionCheckpointAbstractSelfTest extends GridCommonAbstractTest { |
| /** */ |
| protected static CheckpointSpi spi; |
| |
| /** */ |
| private static final int SPLIT_COUNT = 5; |
| |
| /** */ |
| private static volatile CountDownLatch taskLatch; |
| |
| /** */ |
| protected GridSessionCheckpointAbstractSelfTest() { |
| super(/*start grid*/false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| taskLatch = null; |
| |
| super.beforeTest(); |
| } |
| |
| /** |
| * @param sesKey Session key. |
| * @param globalKey Global key. |
| * @param globalState Global state. |
| * @throws Exception If check failed. |
| */ |
| private void checkFinishedState(String sesKey, String globalKey, String globalState) throws Exception { |
| byte[] serState = spi.loadCheckpoint(sesKey); |
| |
| assert serState == null : "Session scope variable is not null: " + Arrays.toString(serState); |
| |
| serState = spi.loadCheckpoint(globalKey); |
| |
| Marshaller marshaller = IgniteTestResources.getMarshaller(); |
| |
| assert marshaller != null; |
| |
| String state = marshaller.unmarshal(serState, getClass().getClassLoader()); |
| |
| assert state != null : "Global state is missing: " + globalKey; |
| assert state.equals(globalState) : "Invalid state value: " + state; |
| |
| spi.removeCheckpoint(globalKey); |
| |
| Object cp = spi.loadCheckpoint(globalKey); |
| |
| assert cp == null; |
| } |
| |
| /** |
| * @param sesKey Session key. |
| * @param sesState Session state. |
| * @param globalKey Global key. |
| * @param globalState Global state. |
| * @param marsh Marshaller. |
| * @param cl Class loader. |
| * @throws Exception If check failed. |
| */ |
| private static void checkRunningState(String sesKey, String sesState, String globalKey, String globalState, |
| Marshaller marsh, ClassLoader cl) throws Exception { |
| assert marsh != null; |
| assert cl != null; |
| |
| byte[] serState = spi.loadCheckpoint(sesKey); |
| |
| String state = marsh.unmarshal(serState, cl); |
| |
| assert state != null : "Session state is missing: " + sesKey; |
| assert state.equals(sesState) : "Invalid state value: " + state; |
| |
| serState = spi.loadCheckpoint(globalKey); |
| |
| state = marsh.unmarshal(serState, cl); |
| |
| assert state != null : "Global state is missing: " + globalKey; |
| assert state.equals(globalState) : "Invalid state value: " + state; |
| } |
| |
| /** |
| * @param cfg Configuration. |
| * @throws Exception If check failed. |
| */ |
| protected void checkCheckpoints(IgniteConfiguration cfg) throws Exception { |
| Ignite ignite = G.start(cfg); |
| |
| try { |
| taskLatch = new CountDownLatch(1); |
| |
| ignite.compute().localDeployTask(GridCheckpointTestTask.class, GridCheckpointTestTask.class.getClassLoader()); |
| |
| ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), "GridCheckpointTestTask", null); |
| |
| fut.getTaskSession().saveCheckpoint("future:session:key", "future:session:testval"); |
| fut.getTaskSession().saveCheckpoint("future:global:key", "future:global:testval", |
| ComputeTaskSessionScope.GLOBAL_SCOPE, 0); |
| |
| taskLatch.countDown(); |
| |
| int res = (Integer)fut.get(); |
| |
| assert res == SPLIT_COUNT : "Invalid result: " + res; |
| |
| // Check fut states. |
| checkFinishedState("future:session:key", "future:global:key", "future:global:testval"); |
| |
| // Check states saved by jobs. |
| for (int i = 0; i < SPLIT_COUNT; i++) |
| checkFinishedState("job:session:key:" + i, "job:global:key:" + i, "job:global:testval:" + i); |
| // Check states saved by map(..). |
| for (int i = 0; i < SPLIT_COUNT; i++) |
| checkFinishedState("map:session:key:" + i, "map:global:key:" + i, "map:global:testval:" + i); |
| // Check states saved by reduce(..). |
| for (int i = 0; i < SPLIT_COUNT; i++) |
| checkFinishedState("reduce:session:key:" + i, "reduce:global:key:" + i, "reduce:global:testval:" + i); |
| } |
| finally { |
| G.stop(getTestIgniteInstanceName(), false); |
| } |
| } |
| |
| /** */ |
| @ComputeTaskName("GridCheckpointTestTask") |
| @ComputeTaskSessionFullSupport |
| @ComputeTaskMapAsync |
| private static class GridCheckpointTestTask extends ComputeTaskSplitAdapter<Object, Object> { |
| /** */ |
| @TaskSessionResource |
| private ComputeTaskSession ses; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override protected Collection<ComputeJobAdapter> split(int gridSize, Object arg) { |
| for (int i = 0; i < SPLIT_COUNT; i++) { |
| ses.saveCheckpoint("map:session:key:" + i, "map:session:testval:" + i); |
| ses.saveCheckpoint("map:global:key:" + i, "map:global:testval:" + i, |
| ComputeTaskSessionScope.GLOBAL_SCOPE, 0); |
| } |
| |
| Collection<ComputeJobAdapter> jobs = new ArrayList<>(SPLIT_COUNT); |
| |
| for (int i = 0; i < SPLIT_COUNT; i++) { |
| jobs.add(new ComputeJobAdapter(i) { |
| /** */ |
| private static final long serialVersionUID = -9118687978815477993L; |
| |
| /** {@inheritDoc} */ |
| @Override public Serializable execute() { |
| ses.saveCheckpoint("job:session:key:" + argument(0), "job:session:testval:" + argument(0)); |
| ses.saveCheckpoint("job:global:key:" + argument(0), "job:global:testval:" + argument(0), |
| ComputeTaskSessionScope.GLOBAL_SCOPE, 0); |
| |
| return 1; |
| } |
| }); |
| } |
| |
| return jobs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object reduce(List<ComputeJobResult> results) { |
| int res = 0; |
| |
| for (ComputeJobResult result : results) |
| res += (Integer)result.getData(); |
| |
| for (int i = 0; i < SPLIT_COUNT; i++) { |
| ses.saveCheckpoint("reduce:session:key:" + i, "reduce:session:testval:" + i); |
| ses.saveCheckpoint("reduce:global:key:" + i, "reduce:global:testval:" + i, |
| ComputeTaskSessionScope.GLOBAL_SCOPE, 0); |
| } |
| |
| try { |
| if (taskLatch != null) |
| taskLatch.await(30, TimeUnit.SECONDS); |
| |
| // Check task and job states. |
| for (int i = 0; i < SPLIT_COUNT; i++) { |
| // Check task map state. |
| checkRunningState("map:session:key:" + i, "map:session:testval:" + i, "map:global:key:" + i, |
| "map:global:testval:" + i, ignite.configuration().getMarshaller(), getClass().getClassLoader()); |
| |
| // Check task reduce state. |
| checkRunningState("reduce:session:key:" + i, "reduce:session:testval:" + i, |
| "reduce:global:key:" + i, "reduce:global:testval:" + i, |
| ignite.configuration().getMarshaller(), getClass().getClassLoader()); |
| |
| // Check task map state. |
| checkRunningState("job:session:key:" + i, "job:session:testval:" + i, "job:global:key:" + i, |
| "job:global:testval:" + i, ignite.configuration().getMarshaller(), getClass().getClassLoader()); |
| } |
| } |
| catch (Exception e) { |
| throw new IgniteException("Running state check failure.", e); |
| } |
| |
| return res; |
| } |
| } |
| } |