blob: 2e3e93513e2b487b2460f8e65fc2ec6cff990397 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.session;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
/**
* Tests waiting for session attributes.
*/
@GridCommonTest(group = "Task Session")
@SuppressWarnings({"PublicInnerClass"})
public class GridSessionWaitAttributeSelfTest extends GridCommonAbstractTest {
/** Number of attributes each writer (the test thread and every job) publishes on the task session. */
private static final int ATTR_NUM = 100;
/** Number of jobs the test task is split into; the public thread pool is sized to twice this value. */
private static final int JOB_NUM = 10;
/**
 * Timeout handed to the wait calls of the {@code *_TIMEOUT} scenarios.
 * NOTE(review): presumably milliseconds - confirm against the {@code ComputeTaskSession} contract.
 */
private static final long WAIT_TIMEOUT = 20000;
/**
 * Selects which {@code ComputeTaskSession} wait call a test run exercises. In
 * {@code checkSessionAttributes} the constants without the {@code _TIMEOUT} suffix are driven with a
 * timeout argument of {@code 0}, while the {@code _TIMEOUT} constants pass {@code WAIT_TIMEOUT}.
 */
private enum WaitAttributeType {
/** {@code waitForAttribute(key, 0)}; the returned value is compared with the expected one. */
WAIT_FOR_ATTRIBUTE_KEY,
/** {@code waitForAttribute(key, val, 0)}; the boolean result must be {@code true}. */
WAIT_FOR_ATTRIBUTE_KEY_VAL,
/** {@code waitForAttribute(key, WAIT_TIMEOUT)}; the returned value is compared with the expected one. */
WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT,
/** {@code waitForAttribute(key, val, WAIT_TIMEOUT)}; the boolean result must be {@code true}. */
WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT,
/** {@code waitForAttributes(keys, 0)}; every requested key must map to the expected value in the result. */
WAIT_FOR_ATTRIBUTES_KEYS,
/** {@code waitForAttributes(attrs, 0)}; the boolean result must be {@code true}. */
WAIT_FOR_ATTRIBUTES_ATTRS,
/** {@code waitForAttributes(keys, WAIT_TIMEOUT)}; every requested key must map to the expected value. */
WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT,
/** {@code waitForAttributes(attrs, WAIT_TIMEOUT)}; the boolean result must be {@code true}. */
WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT
}
/**
 * Extends the inherited configuration with a TCP discovery SPI backed by a shared VM IP finder and a
 * public thread pool of {@code JOB_NUM * 2} threads.
 *
 * @param igniteInstanceName Ignite instance name.
 * @return Configuration for the node being started.
 * @throws Exception If the parent configuration could not be created.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    TcpDiscoverySpi disco = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    disco.setIpFinder(ipFinder);

    cfg.setDiscoverySpi(disco);
    cfg.setPublicThreadPoolSize(JOB_NUM * 2);

    return cfg;
}
/**
 * Starts the grid nodes with indexes 1 and 2; the tests later look them up by instance name.
 *
 * @throws Exception If a node failed to start.
 */
@Override protected void beforeTestsStarted() throws Exception {
    for (int idx = 1; idx <= 2; idx++)
        startGrid(idx);
}
/**
 * Builds a session attribute key of the form {@code <prefix>test.key.<METHOD>.<i>}.
 *
 * @param prefix Prefix identifying the writer of the attribute; must not be {@code null}.
 * @param mtd Wait method under test; its {@link Enum#name()} becomes part of the key. Must not be {@code null}.
 * @param i Index.
 * @return Session attribute key.
 */
private static String createKey(String prefix, Enum<?> mtd, int i) {
    assert prefix != null;
    assert mtd != null;

    return prefix + "test.key." + mtd.name() + '.' + i;
}
/**
 * Builds a session attribute value of the form {@code <prefix>test.value.<METHOD>.<i>}.
 *
 * @param prefix Prefix identifying the writer of the attribute; must not be {@code null}.
 * @param mtd Wait method under test; its {@link Enum#name()} becomes part of the value. Must not be {@code null}.
 * @param i Index.
 * @return Session attribute value.
 */
private static String createValue(String prefix, Enum<?> mtd, int i) {
    assert prefix != null;
    assert mtd != null;

    return prefix + "test.value." + mtd.name() + '.' + i;
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTE_KEY} scenario: each attribute is awaited by key with a timeout
 * argument of {@code 0} and the returned value is compared with the expected one.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttribute() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT} scenario: each attribute is awaited by key with
 * {@code WAIT_TIMEOUT} and the returned value is compared with the expected one.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributeWithTimeout() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTE_KEY_VAL} scenario: each key/value pair is awaited with a timeout
 * argument of {@code 0} and the wait must report success.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributeValue() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY_VAL);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT} scenario: each key/value pair is awaited with
 * {@code WAIT_TIMEOUT} and the wait must report success.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributeValueWithTimeout() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTES_ATTRS} scenario: the whole expected key/value map is awaited in
 * one call with a timeout argument of {@code 0} and the wait must report success.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributeValues() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_ATTRS);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT} scenario: the whole expected key/value map is
 * awaited in one call with {@code WAIT_TIMEOUT} and the wait must report success.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributeValuesWithTimeout() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTES_KEYS} scenario: all expected keys are awaited in one call with a
 * timeout argument of {@code 0} and the returned map is checked entry by entry.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributes() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_KEYS);
}
/**
 * Runs the {@code WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT} scenario: all expected keys are awaited in one call
 * with {@code WAIT_TIMEOUT} and the returned map is checked entry by entry.
 *
 * @throws Exception If failed.
 */
@Test
public void testWaitAttributesWithTimeout() throws Exception {
checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT);
}
/**
 * Runs one end-to-end wait scenario: starts {@link TestSessionTask} asynchronously on the first node,
 * publishes {@code ATTR_NUM} attributes under the {@code "fut"} prefix, verifies the attributes
 * published by every job sibling and by this thread, then sets the {@code "done"} attribute the jobs
 * are waiting for and waits for the task to finish.
 * <p>
 * If any step between mapping and the final signal fails, {@code "done"} is still published before the
 * failure is rethrown, so that jobs blocked on it are not left waiting on the grids.
 *
 * @param type Wait method under test.
 * @throws Exception If failed.
 */
private void checkWaitAttributeMethod(WaitAttributeType type) throws Exception {
    assert type != null;

    Ignite ignite1 = G.ignite(getTestIgniteInstanceName() + '1');
    Ignite ignite2 = G.ignite(getTestIgniteInstanceName() + '2');

    assert ignite1 != null;
    assert ignite2 != null;

    ignite1.compute().localDeployTask(TestSessionTask.class, TestSessionTask.class.getClassLoader());

    ComputeTaskFuture<?> fut = ignite1.compute().executeAsync(TestSessionTask.class.getName(), type);

    ComputeTaskSession ses = fut.getTaskSession();

    // Wait for the map phase to complete before inspecting the job siblings.
    ses.mapFuture().get();

    try {
        info("Task job siblings [size=" + ses.getJobSiblings().size() + ", siblings=" + ses.getJobSiblings() + ']');

        for (int i = 0; i < ATTR_NUM; i++) {
            String key = createKey("fut", type, i);
            String val = createValue("fut", type, i);

            ses.setAttribute(key, val);
        }

        // Check all job attributes.
        for (ComputeJobSibling sibling : ses.getJobSiblings()) {
            info("Checking session attributes for sibling: " + sibling);

            checkSessionAttributes(ses, sibling.getJobId().toString(), type);
        }

        // Check that fut attributes have been set.
        checkSessionAttributes(ses, "fut", type);
    }
    catch (Throwable e) {
        // Jobs wait for 'done' with a zero timeout argument; release them even though the test failed.
        try {
            ses.setAttribute("done", true);
        }
        catch (IgniteException releaseErr) {
            // Keep the original failure as the primary one.
            e.addSuppressed(releaseErr);
        }

        throw e;
    }

    // Signal finish.
    ses.setAttribute("done", true);

    fut.get();
}
/**
 * Waits on the given session, using the wait call selected by {@code type}, for the {@code ATTR_NUM}
 * attributes produced by {@link #createKey} / {@link #createValue} for {@code prefix}, and asserts
 * that the expected values were observed.
 * <p>
 * Scenarios without the {@code _TIMEOUT} suffix pass {@code 0} as the timeout argument; the others
 * pass {@code WAIT_TIMEOUT}.
 *
 * @param ses Session.
 * @param prefix Prefix identifying the writer whose attributes are awaited.
 * @param type Wait method under test.
 * @throws IgniteException If the waiting thread was interrupted (the interrupt status is restored).
 */
private static void checkSessionAttributes(ComputeTaskSession ses, String prefix, WaitAttributeType type) {
    assert ses != null;
    assert type != null;

    try {
        switch (type) {
            case WAIT_FOR_ATTRIBUTE_KEY:
                checkWaitForKey(ses, prefix, type, 0);

                break;

            case WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT:
                checkWaitForKey(ses, prefix, type, WAIT_TIMEOUT);

                break;

            case WAIT_FOR_ATTRIBUTE_KEY_VAL:
                checkWaitForKeyValue(ses, prefix, type, 0);

                break;

            case WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT:
                checkWaitForKeyValue(ses, prefix, type, WAIT_TIMEOUT);

                break;

            case WAIT_FOR_ATTRIBUTES_ATTRS:
                checkWaitForAttributeMap(ses, prefix, type, 0);

                break;

            case WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT:
                checkWaitForAttributeMap(ses, prefix, type, WAIT_TIMEOUT);

                break;

            case WAIT_FOR_ATTRIBUTES_KEYS:
                checkWaitForKeys(ses, prefix, type, 0);

                break;

            case WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT:
                checkWaitForKeys(ses, prefix, type, WAIT_TIMEOUT);

                break;

            default:
                assert false : "Unknown session wait type.";
        }
    }
    catch (InterruptedException e) {
        // Preserve the interrupt for code further up the stack before converting the exception.
        Thread.currentThread().interrupt();

        throw new IgniteException("Got interrupted while waiting for session attributes.", e);
    }
}

/** Builds the full key-to-value map expected for the given prefix and wait type. */
private static Map<String, String> expectedAttributes(String prefix, WaitAttributeType type) {
    Map<String, String> map = new HashMap<>();

    for (int i = 0; i < ATTR_NUM; i++)
        map.put(createKey(prefix, type, i), createValue(prefix, type, i));

    return map;
}

/** Waits for each key individually and compares the returned value with the expected one. */
private static void checkWaitForKey(ComputeTaskSession ses, String prefix, WaitAttributeType type,
    long timeout) throws InterruptedException {
    for (int i = 0; i < ATTR_NUM; i++) {
        String key = createKey(prefix, type, i);
        String val = createValue(prefix, type, i);

        Serializable obj = ses.waitForAttribute(key, timeout);

        assert obj != null :
            "Failed to wait for attribute [key=" + key + ", val=" + val + ", receivedVal=" + obj + ']';

        assert val.equals(obj) :
            "Failed to wait for attribute [key=" + key + ", val=" + val + ", receivedVal=" + obj + ']';
    }
}

/** Waits for each key/value pair individually and asserts that the wait reported success. */
private static void checkWaitForKeyValue(ComputeTaskSession ses, String prefix, WaitAttributeType type,
    long timeout) throws InterruptedException {
    for (int i = 0; i < ATTR_NUM; i++) {
        String key = createKey(prefix, type, i);
        String val = createValue(prefix, type, i);

        boolean attr = ses.waitForAttribute(key, val, timeout);

        assert attr : "Failed to wait for attribute [key=" + key + ", val=" + val + ']';
    }
}

/** Waits for the whole expected key/value map in one call and asserts that the wait reported success. */
private static void checkWaitForAttributeMap(ComputeTaskSession ses, String prefix, WaitAttributeType type,
    long timeout) throws InterruptedException {
    Map<String, String> map = expectedAttributes(prefix, type);

    boolean attrs = ses.waitForAttributes(map, timeout);

    assert attrs : "Failed to wait for attribute [attrs=" + map + ']';
}

/** Waits for all expected keys in one call and checks every entry of the returned map. */
private static void checkWaitForKeys(ComputeTaskSession ses, String prefix, WaitAttributeType type,
    long timeout) throws InterruptedException {
    Map<String, String> map = expectedAttributes(prefix, type);

    Map<?, ?> res = ses.waitForAttributes(map.keySet(), timeout);

    assert res != null : "Failed to wait for attribute [keys=" + map.keySet() + ']';

    for (Map.Entry<String, String> entry : map.entrySet()) {
        Object obj = res.get(entry.getKey());

        assert obj != null : "Failed to get value from result map [key=" + entry.getKey() + ']';

        assert entry.getValue().equals(obj) : "Found unexpected value [key=" + entry.getKey()
            + ", val=" + obj + ", expected=" + entry.getValue() + ']';
    }
}
/**
 * Task that splits into {@code JOB_NUM} {@link TestSessionJob}s, all carrying the same wait type, and
 * produces no result.
 */
@ComputeTaskSessionFullSupport
public static class TestSessionTask extends ComputeTaskSplitAdapter<WaitAttributeType, Object> {
    /**
     * Creates {@code JOB_NUM} jobs for the requested wait type; the grid size is not taken into account.
     *
     * @param gridSize Grid size (unused).
     * @param type Wait method under test.
     * @return Jobs to execute.
     */
    @Override protected Collection<TestSessionJob> split(int gridSize, WaitAttributeType type) {
        assert type != null;

        List<TestSessionJob> res = new ArrayList<>(JOB_NUM);

        while (res.size() < JOB_NUM)
            res.add(new TestSessionJob(type));

        return res;
    }

    /**
     * Ignores the job results.
     *
     * @param results Job results (unused).
     * @return Always {@code null}.
     */
    @Override public Object reduce(List<ComputeJobResult> results) {
        return null;
    }
}
/**
 * Job that waits for the attributes published under the {@code "fut"} prefix, publishes its own
 * {@code ATTR_NUM} attributes under its job ID, verifies its own and its siblings' attributes, and
 * then blocks until the {@code "done"} attribute is set to {@code true}.
 */
public static class TestSessionJob extends ComputeJobAdapter {
    /** Task session, annotated for injection with {@code @TaskSessionResource}. */
    @TaskSessionResource
    private ComputeTaskSession taskSes;

    /** Job context, annotated for injection with {@code @JobContextResource}; supplies this job's ID. */
    @JobContextResource
    private ComputeJobContext jobCtx;

    /** Logger. */
    @LoggerResource
    private IgniteLogger log;

    /**
     * @param arg Wait attribute type.
     */
    public TestSessionJob(WaitAttributeType arg) {
        super(arg);
    }

    /**
     * Executes the wait scenario described on the class.
     *
     * @return Always {@code null}.
     * @throws IgniteException If interrupted while waiting for the {@code "done"} attribute
     *      (the interrupt status is restored before throwing).
     */
    @Override public Serializable execute() {
        WaitAttributeType m = argument(0);

        checkSessionAttributes(taskSes, "fut", m);

        IgniteUuid jobId = jobCtx.getJobId();

        // The job ID string is the prefix under which this job publishes its attributes.
        String prefix = jobId.toString();

        for (int i = 0; i < ATTR_NUM; i++) {
            String key = createKey(prefix, m, i);
            String val = createValue(prefix, m, i);

            taskSes.setAttribute(key, val);
        }

        // Check that attributes just set are present.
        checkSessionAttributes(taskSes, prefix, m);

        Collection<ComputeJobSibling> siblings = taskSes.getJobSiblings();

        if (log.isInfoEnabled())
            log.info("Got siblings from job [size=" + siblings.size() + ", siblings=" + siblings + ']');

        // Check attributes from siblings, reusing the collection obtained above.
        for (ComputeJobSibling sibling : siblings) {
            if (!sibling.getJobId().equals(jobId))
                checkSessionAttributes(taskSes, sibling.getJobId().toString(), m);
        }

        try {
            taskSes.waitForAttribute("done", true, 0);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the executing thread's owner before converting the exception.
            Thread.currentThread().interrupt();

            throw new IgniteException("Got interrupted while waiting for 'done' attribute.", e);
        }

        return null;
    }
}
}