blob: 598212bd04e3abce1c43ce44e62efdadba1fb08c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridTestUtils;
/**
* Test to validate https://issues.apache.org/jira/browse/IGNITE-2310
*/
public class IgniteCacheLockPartitionOnAffinityRunTest extends IgniteCacheLockPartitionOnAffinityRunAbstractTest {
/**
* @param ignite Ignite.
* @param log Logger.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
* @throws Exception If failed.
*/
private static int getPersonsCountFromPartitionMapCheckBothCaches(final IgniteEx ignite, IgniteLogger log,
int orgId) throws Exception {
assertEquals(1, getOrganizationCountFromPartitionMap(ignite, orgId));
return getPersonsCountFromPartitionMap(ignite, orgId);
}
/**
* @param ignite Ignite.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
* @throws Exception If failed.
*/
private static int getOrganizationCountFromPartitionMap(final IgniteEx ignite,
int orgId) throws Exception {
int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
GridCacheAdapter<?, ?> cacheAdapterOrg = ignite.context().cache()
.internalCache(Organization.class.getSimpleName());
GridDhtLocalPartition pOrgs = cacheAdapterOrg.context().topology()
.localPartition(part, AffinityTopologyVersion.NONE, false);
int cnt = 0;
GridCursor<? extends CacheDataRow> c = pOrgs.dataStore().cursor();
CacheObjectContext ctx = cacheAdapterOrg.context().cacheObjectContext();
while (c.next()) {
CacheDataRow e = c.get();
Integer k = e.key().value(ctx, false);
Organization org = e.value().value(ctx, false);
if (org != null && org.getId() == orgId)
++cnt;
}
return cnt;
}
/**
* @param ignite Ignite.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
* @throws Exception If failed.
*/
private static int getPersonsCountFromPartitionMap(final IgniteEx ignite, int orgId)
throws Exception {
int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
GridCacheAdapter<?, ?> cacheAdapterPers = ignite.context().cache()
.internalCache(Person.class.getSimpleName());
GridDhtLocalPartition pPers = cacheAdapterPers.context().topology()
.localPartition(part, AffinityTopologyVersion.NONE, false);
int cnt = 0;
GridCursor<? extends CacheDataRow> c = pPers.dataStore().cursor();
CacheObjectContext ctx = cacheAdapterPers.context().cacheObjectContext();
while (c.next()) {
CacheDataRow e = c.get();
Person.Key k = e.key().value(ctx, false);
Person p = e.value().value(ctx, false);
if (p != null && p.getOrgId() == orgId && k.orgId == orgId)
++cnt;
}
return cnt;
}
/**
* @param ignite Ignite.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
*/
private static int getPersonsCountBySqlFieldLocalQuery(final IgniteEx ignite, int orgId) {
List res = ignite.cache(Person.class.getSimpleName())
.query(new SqlFieldsQuery(
String.format("SELECT p.id FROM \"%s\".Person as p " +
"WHERE p.orgId = " + orgId,
Person.class.getSimpleName())).setLocal(true))
.getAll();
return res.size();
}
/**
* @param ignite Ignite.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
*/
private static int getPersonsCountBySqlFieledLocalQueryJoinOrgs(final IgniteEx ignite,
int orgId) {
List res = ignite.cache(Person.class.getSimpleName())
.query(new SqlFieldsQuery(
String.format("SELECT p.id FROM \"%s\".Person as p, \"%s\".Organization as o " +
"WHERE p.orgId = o.id " +
"AND p.orgId = " + orgId,
Person.class.getSimpleName(),
Organization.class.getSimpleName())).setLocal(true))
.getAll();
return res.size();
}
/**
* @param ignite Ignite.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
*/
private static int getPersonsCountBySqlLocalQuery(final IgniteEx ignite, int orgId) {
List res = ignite.cache(Person.class.getSimpleName())
.query(new SqlQuery<Person.Key, Person>(Person.class, "orgId = ?").setArgs(orgId).setLocal(true))
.getAll();
return res.size();
}
/**
* @param ignite Ignite.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
*/
private static int getPersonsCountByScanLocalQuery(final IgniteEx ignite, final int orgId) {
List res = ignite.cache(Person.class.getSimpleName())
.query(new ScanQuery<>(new IgniteBiPredicate<Person.Key, Person>() {
@Override public boolean apply(Person.Key key, Person person) {
return person.getOrgId() == orgId;
}
}).setLocal(true)).getAll();
return res.size();
}
/**
* @param ignite Ignite instance.
* @param orgId Organization ID.
* @return {@code true} if partition for the given organization ID is primary on the given node.
*/
private static boolean primaryPartition(IgniteEx ignite, int orgId) {
int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
GridCacheAdapter<?, ?> cacheAdapterPers = ignite.context().cache()
.internalCache(Person.class.getSimpleName());
GridDhtLocalPartition pPers = cacheAdapterPers.context().topology()
.localPartition(part, AffinityTopologyVersion.NONE, false);
return pPers.primary(AffinityTopologyVersion.NONE);
}
/**
* @param ignite Ignite.
* @param log Logger.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
* @throws Exception If failed.
*/
private static int getPersonsCountSingleCache(final IgniteEx ignite, IgniteLogger log, final int orgId)
throws Exception {
int sqlCnt = getPersonsCountBySqlLocalQuery(ignite, orgId);
int sqlFieldCnt = getPersonsCountBySqlFieldLocalQuery(ignite, orgId);
int scanCnt = getPersonsCountByScanLocalQuery(ignite, orgId);
int partCnt = getPersonsCountFromPartitionMap(ignite, orgId);
assertEquals(PERS_AT_ORG_CNT, partCnt);
assertEquals(partCnt, scanCnt);
// TODO this comparison should be switched back to assertEquals
// TODO when https://issues.apache.org/jira/browse/IGNITE-7692 is fixed.
if (partCnt != sqlFieldCnt)
assertFalse("Partition is primary, but size check failed [expected=" + partCnt +
", actual=" + sqlFieldCnt + ']', primaryPartition(ignite, orgId));
if (partCnt != sqlCnt)
assertFalse("Partition is primary, but size check failed [expected=" + partCnt +
", actual=" + sqlCnt + ']', primaryPartition(ignite, orgId));
return partCnt;
}
/**
* @param ignite Ignite.
* @param log Logger.
* @param orgId Organization id.
* @return Count of found Person object with specified orgId
* @throws Exception If failed.
*/
private static int getPersonsCountMultipleCache(final IgniteEx ignite, IgniteLogger log, final int orgId)
throws Exception {
int sqlFieldCnt = getPersonsCountBySqlFieledLocalQueryJoinOrgs(ignite, orgId);
int partCnt = getPersonsCountFromPartitionMapCheckBothCaches(ignite, log, orgId);
assertEquals(partCnt, sqlFieldCnt);
return partCnt;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
// Workaround for initial update job metadata.
grid(0).compute().affinityCall(
Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
0,
new TestAffinityCall(new PersonsCountGetter() {
@Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
return PERS_AT_ORG_CNT;
}
}, 0));
grid(0).compute().affinityRun(
Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
0,
new TestAffinityRun(new PersonsCountGetter() {
@Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
return PERS_AT_ORG_CNT;
}
}, 0));
}
/**
* @throws Exception If failed.
*/
public void testSingleCache() throws Exception {
final PersonsCountGetter personsCntGetter = new PersonsCountGetter() {
@Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
return getPersonsCountSingleCache(ignite, log, orgId);
}
};
// Run restart threads: start re-balancing.
beginNodesRestart();
IgniteInternalFuture<Long> affFut = null;
try {
final AtomicInteger threadNum = new AtomicInteger(0);
affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
if (threadNum.getAndIncrement() % 2 == 0) {
while (System.currentTimeMillis() < endTime) {
for (final int orgId : orgIds) {
if (System.currentTimeMillis() >= endTime)
break;
grid(0).compute().affinityRun(Person.class.getSimpleName(),
new Person(0, orgId).createKey(),
new TestAffinityRun(personsCntGetter, orgId));
}
}
}
else {
while (System.currentTimeMillis() < endTime) {
for (final int orgId : orgIds) {
if (System.currentTimeMillis() >= endTime)
break;
int personsCnt = grid(0).compute().affinityCall(Person.class.getSimpleName(),
new Person(0, orgId).createKey(),
new TestAffinityCall(personsCntGetter, orgId));
assertEquals(PERS_AT_ORG_CNT, personsCnt);
}
}
}
}
}, AFFINITY_THREADS_CNT, "affinity-run");
}
finally {
if (affFut != null)
affFut.get();
}
}
/**
* @throws Exception If failed.
*/
public void testMultipleCaches() throws Exception {
final PersonsCountGetter personsCntGetter = new PersonsCountGetter() {
@Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
return getPersonsCountMultipleCache(ignite, log, orgId);
}
};
// Run restart threads: start re-balancing.
beginNodesRestart();
IgniteInternalFuture<Long> affFut = null;
try {
final AtomicInteger threadNum = new AtomicInteger(0);
affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
if (threadNum.getAndIncrement() % 2 == 0) {
while (System.currentTimeMillis() < endTime) {
for (final int orgId : orgIds) {
if (System.currentTimeMillis() >= endTime)
break;
grid(0).compute().affinityRun(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new TestAffinityRun(personsCntGetter, orgId));
}
}
}
else {
while (System.currentTimeMillis() < endTime) {
for (final int orgId : orgIds) {
if (System.currentTimeMillis() >= endTime)
break;
int personsCnt = grid(0).compute().affinityCall(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new TestAffinityCall(personsCntGetter, orgId));
assertEquals(PERS_AT_ORG_CNT, personsCnt);
}
}
}
}
}, AFFINITY_THREADS_CNT, "affinity-run");
}
finally {
if (affFut != null)
affFut.get();
}
}
/**
* @throws Exception If failed.
*/
public void testCheckReservePartitionException() throws Exception {
int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
try {
grid(0).compute().affinityRun(
Arrays.asList(Organization.class.getSimpleName(), OTHER_CACHE_NAME),
new Integer(orgId),
new IgniteRunnable() {
@Override public void run() {
// No-op.
}
});
fail("Exception is expected");
}
catch (Exception e) {
assertTrue(e.getMessage()
.startsWith("Failed partition reservation. Partition is not primary on the node."));
}
try {
grid(0).compute().affinityCall(
Arrays.asList(Organization.class.getSimpleName(), OTHER_CACHE_NAME),
new Integer(orgId),
new IgniteCallable<Object>() {
@Override public Object call() throws Exception {
return null;
}
});
fail("Exception is expected");
}
catch (Exception e) {
assertTrue(e.getMessage()
.startsWith("Failed partition reservation. Partition is not primary on the node."));
}
}
/**
* @throws Exception If failed.
*/
public void testReleasePartitionJobCompletesNormally() throws Exception {
final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
grid(0).compute().affinityRun(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteRunnable() {
@IgniteInstanceResource
IgniteEx ignite;
@Override public void run() {
try {
checkPartitionsReservations(ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
}
});
checkPartitionsReservations(grid(1), orgId, 0);
grid(0).compute().affinityCall(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteCallable<Object>() {
@IgniteInstanceResource
IgniteEx ignite;
@Override public Object call() {
try {
checkPartitionsReservations(ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
return null;
}
});
checkPartitionsReservations(grid(1), orgId, 0);
}
/**
* @throws Exception If failed.
*/
public void testReleasePartitionJobThrowsException() throws Exception {
final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
try {
grid(0).compute().affinityRun(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteRunnable() {
@IgniteInstanceResource
IgniteEx ignite;
@Override public void run() {
try {
checkPartitionsReservations(ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
throw new RuntimeException("Test job throws exception");
}
});
fail("Exception must be thrown");
}
catch (Exception ignored) {
checkPartitionsReservations(grid(1), orgId, 0);
}
try {
grid(0).compute().affinityCall(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteCallable<Object>() {
@IgniteInstanceResource
IgniteEx ignite;
@Override public Object call() {
try {
checkPartitionsReservations(ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
throw new RuntimeException("Test job throws exception");
}
});
fail("Exception must be thrown");
}
catch (Exception ignored) {
checkPartitionsReservations(grid(1), orgId, 0);
}
}
/**
* @throws Exception If failed.
*/
public void testReleasePartitionJobThrowsError() throws Exception {
final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
try {
grid(0).compute().affinityRun(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteRunnable() {
@IgniteInstanceResource
IgniteEx ignite;
@Override public void run() {
try {
checkPartitionsReservations(ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
throw new Error("Test job throws error");
}
});
fail("Error must be thrown");
}
catch (Throwable ignored) {
checkPartitionsReservations(grid(1), orgId, 0);
}
try {
grid(0).compute().affinityCall(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteCallable<Object>() {
@IgniteInstanceResource
IgniteEx ignite;
@Override public Object call() {
try {
checkPartitionsReservations(ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
throw new Error("Test job throws error");
}
});
fail("Error must be thrown");
}
catch (Throwable ignored) {
checkPartitionsReservations(grid(1), orgId, 0);
}
}
/**
* @throws Exception If failed.
*/
public void testReleasePartitionJobUnmarshalingFails() throws Exception {
final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
try {
grid(0).compute().affinityRun(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new JobFailUnmarshaling());
fail("Unmarshaling exception must be thrown");
}
catch (Exception ignored) {
checkPartitionsReservations(grid(1), orgId, 0);
}
}
/**
* @throws Exception If failed.
*/
public void testReleasePartitionJobMasterLeave() throws Exception {
final int orgId = primaryKey(grid(0).cache(Organization.class.getSimpleName()));
try {
grid(1).compute().affinityRunAsync(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteRunnable() {
@IgniteInstanceResource
private Ignite ignite;
@Override public void run() {
try {
checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
try {
Thread.sleep(1000);
}
catch (InterruptedException ignored) {
// No-op.
}
}
});
stopGrid(1, true);
Thread.sleep(3000);
awaitPartitionMapExchange();
checkPartitionsReservations(grid(0), orgId, 0);
}
finally {
startGrid(1);
awaitPartitionMapExchange();
}
try {
grid(1).compute().affinityCallAsync(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new IgniteCallable<Object>() {
@IgniteInstanceResource
private Ignite ignite;
@Override public Object call() {
try {
checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
try {
Thread.sleep(1000);
}
catch (InterruptedException ignored) {
// No-op.
}
return null;
}
});
stopGrid(1, true);
Thread.sleep(3000);
awaitPartitionMapExchange();
checkPartitionsReservations(grid(0), orgId, 0);
}
finally {
startGrid(1);
awaitPartitionMapExchange();
}
}
/**
* @throws Exception If failed.
*/
public void testReleasePartitionJobImplementMasterLeave() throws Exception {
final int orgId = primaryKey(grid(0).cache(Organization.class.getSimpleName()));
try {
grid(1).compute().affinityRunAsync(
Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
new Integer(orgId),
new RunnableWithMasterLeave() {
@IgniteInstanceResource
private Ignite ignite;
@Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException {
// No-op.
}
@Override public void run() {
try {
checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
}
catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
try {
Thread.sleep(1000);
}
catch (InterruptedException ignored) {
// No-op.
}
}
});
stopGrid(1, true);
Thread.sleep(3000);
awaitPartitionMapExchange();
checkPartitionsReservations(grid(0), orgId, 0);
}
finally {
startGrid(1);
awaitPartitionMapExchange();
}
}
/** */
private interface PersonsCountGetter {
/**
* @param ignite Ignite.
* @param log Logger.
* @param orgId Org id.
* @return Count of found Person object with specified orgId
* @throws Exception If failed.
*/
int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception;
}
/** */
interface RunnableWithMasterLeave extends IgniteRunnable, ComputeJobMasterLeaveAware {
}
/** */
private static class TestAffinityCall implements IgniteCallable<Integer> {
/** Persons count getter. */
PersonsCountGetter personsCntGetter;
/** Org id. */
int orgId;
/** */
@IgniteInstanceResource
private IgniteEx ignite;
/** */
@LoggerResource
private IgniteLogger log;
/** */
public TestAffinityCall() {
// No-op.
}
/**
* @param personsCntGetter Object to count Person.
* @param orgId Organization Id.
*/
public TestAffinityCall(PersonsCountGetter personsCntGetter, int orgId) {
this.personsCntGetter = personsCntGetter;
this.orgId = orgId;
}
/** {@inheritDoc} */
@Override public Integer call() throws Exception {
log.info("Begin call. orgId=" + orgId);
return personsCntGetter.getPersonsCount(ignite, log, orgId);
}
}
/** */
private static class TestAffinityRun implements IgniteRunnable {
/** Persons count getter. */
PersonsCountGetter personsCntGetter;
/** Org id. */
int orgId;
/** */
@IgniteInstanceResource
private IgniteEx ignite;
/** */
@LoggerResource
private IgniteLogger log;
/** */
public TestAffinityRun() {
// No-op.
}
/**
* @param personsCntGetter Object to count Person.
* @param orgId Organization Id.
*/
public TestAffinityRun(PersonsCountGetter personsCntGetter, int orgId) {
this.personsCntGetter = personsCntGetter;
this.orgId = orgId;
}
/** {@inheritDoc} */
@Override public void run() {
try {
log.info("Begin run. orgId=" + orgId);
int cnt = personsCntGetter.getPersonsCount(ignite, log, orgId);
assertEquals(PERS_AT_ORG_CNT, cnt);
}
catch (Exception e) {
throw new IgniteException(e);
}
}
}
/** */
static class JobFailUnmarshaling implements Externalizable, IgniteRunnable {
/**
* Default constructor (required by Externalizable).
*/
public JobFailUnmarshaling() {
// No-op.
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
//No-op.
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
throw new IOException("Test job unmarshaling fails");
}
/** {@inheritDoc} */
@Override public void run() {
fail("Must not be executed");
}
}
}