blob: bed00acf98b1cc829434374b812d80e0fad8a228 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.binary;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.junit.Assert.assertArrayEquals;
/**
*
*/
public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
/** */
private static final String SEQ_NUM_FLD = "f0";
/** */
private volatile DiscoveryHook discoveryHook;
/** */
private static final int UPDATES_COUNT = 1_000;
/** */
private static final int RESTART_DELAY = 1_000;
/** */
private static final int GRID_CNT = 5;
/** */
private static final String BINARY_TYPE_NAME = "TestBinaryType";
/** */
private static final int BINARY_TYPE_ID = 708045005;
/** */
private final Queue<BinaryUpdateDescription> updatesQueue = new ConcurrentLinkedQueue<>();
/** */
private final List<BinaryUpdateDescription> updatesList = new ArrayList<>(UPDATES_COUNT);
/** */
private final CountDownLatch startLatch = new CountDownLatch(1);
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
for (int i = 0; i < UPDATES_COUNT; i++) {
FieldType fType = null;
Object fVal = null;
switch (i % 4) {
case 0:
fType = FieldType.NUMBER;
fVal = getNumberFieldVal();
break;
case 1:
fType = FieldType.STRING;
fVal = getStringFieldVal();
break;
case 2:
fType = FieldType.ARRAY;
fVal = getArrayFieldVal();
break;
case 3:
fType = FieldType.OBJECT;
fVal = new Object();
}
BinaryUpdateDescription desc = new BinaryUpdateDescription(i, "f" + (i + 1), fType, fVal);
updatesQueue.add(desc);
updatesList.add(desc);
}
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setPeerClassLoadingEnabled(false);
if (discoveryHook != null) {
((TestTcpDiscoverySpi)cfg.getDiscoverySpi()).discoveryHook(discoveryHook);
cfg.setMetricsUpdateFrequency(1000);
}
cfg.setMarshaller(new BinaryMarshaller());
cfg.setClientMode("client".equals(gridName) || getTestIgniteInstanceIndex(gridName) >= GRID_CNT);
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(CacheMode.REPLICATED);
cfg.setCacheConfiguration(ccfg);
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
/**
* Starts computation job.
*
* @param idx Grid index on which computation job should start.
* @param restartIdx The index of the node to be restarted.
* @param workersCntr The current number of computation threads.
*/
private void startComputation(int idx, AtomicInteger restartIdx, AtomicInteger workersCntr) {
Ignite ignite = grid(idx);
ClusterGroup cg = ignite.cluster().forLocal();
ignite.compute(cg).callAsync(new BinaryObjectAdder(startLatch, idx, updatesQueue, restartIdx, workersCntr));
}
/**
* @throws Exception If failed.
*/
@Test
public void testFlowNoConflicts() throws Exception {
startGridsMultiThreaded(GRID_CNT);
doTestFlowNoConflicts();
awaitPartitionMapExchange();
Ignite randomNode = G.allGrids().get(0);
IgniteCache<Object, Object> cache = randomNode.cache(DEFAULT_CACHE_NAME);
int cacheEntries = cache.size(CachePeekMode.PRIMARY);
assertTrue("Cache cannot contain more entries than were put in it;", cacheEntries <= UPDATES_COUNT);
assertEquals("There are less than expected entries, data loss occurred;", UPDATES_COUNT, cacheEntries);
validateCache(randomNode);
}
/**
* @throws Exception If failed.
*/
@Test
public void testFlowNoConflictsWithClients() throws Exception {
startGridsMultiThreaded(GRID_CNT);
if (!tcpDiscovery())
return;
discoveryHook = new DiscoveryHook() {
@Override public void beforeDiscovery(DiscoveryCustomMessage customMsg) {
if (customMsg instanceof MetadataUpdateProposedMessage) {
if (((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID)
GridTestUtils.setFieldValue(customMsg, "typeId", 1);
}
else if (customMsg instanceof MetadataUpdateAcceptedMessage) {
if (((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID)
GridTestUtils.setFieldValue(customMsg, "typeId", 1);
}
}
};
Ignite deafClient = startGrid(GRID_CNT);
discoveryHook = null;
Ignite regClient = startGrid(GRID_CNT + 1);
doTestFlowNoConflicts();
awaitPartitionMapExchange();
validateCache(deafClient);
validateCache(regClient);
}
/**
* Validates that all updates are readable on the specified node.
*
* @param ignite Ignite instance.
*/
private void validateCache(Ignite ignite) {
String name = ignite.name();
for (Cache.Entry entry : ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary()) {
BinaryObject binObj = (BinaryObject)entry.getValue();
Integer idx = binObj.field(SEQ_NUM_FLD);
BinaryUpdateDescription desc = updatesList.get(idx - 1);
Object val = binObj.field(desc.fieldName);
String errMsg = "Field " + desc.fieldName + " has unexpeted value (index=" + idx + ", node=" + name + ")";
if (desc.fieldType == FieldType.OBJECT)
assertTrue(errMsg, val instanceof BinaryObject);
else if (desc.fieldType == FieldType.ARRAY)
assertArrayEquals(errMsg, (byte[])desc.val, (byte[])val);
else
assertEquals(errMsg, desc.val, binObj.field(desc.fieldName));
}
}
/**
* @throws Exception If failed.
*/
private void doTestFlowNoConflicts() throws Exception {
final AtomicBoolean stopFlag = new AtomicBoolean();
final AtomicInteger restartIdx = new AtomicInteger(-1);
final AtomicInteger workersCntr = new AtomicInteger(0);
try {
for (int i = 0; i < GRID_CNT; i++)
startComputation(i, restartIdx, workersCntr);
IgniteInternalFuture fut =
GridTestUtils.runAsync(new NodeRestarter(stopFlag, restartIdx, workersCntr), "worker");
startLatch.countDown();
fut.get();
GridTestUtils.waitForCondition(() -> workersCntr.get() == 0, 5_000);
}
finally {
stopFlag.set(true);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentMetadataUpdates() throws Exception {
startGrid(0);
final Ignite client = startGrid(getConfiguration("client"));
final IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME).withKeepBinary();
int threadsNum = 10;
final int updatesNum = 2000;
List<IgniteInternalFuture> futs = new ArrayList<>();
for (int i = 0; i < threadsNum; i++) {
final int threadId = i;
IgniteInternalFuture fut = runAsync(new Runnable() {
@Override public void run() {
try {
for (int j = 0; j < updatesNum; j++) {
BinaryObjectBuilder bob = client.binary().builder(BINARY_TYPE_NAME);
bob.setField("field" + j, threadId);
cache.put(threadId, bob.build());
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}, "updater-" + i);
futs.add(fut);
}
for (IgniteInternalFuture fut : futs)
fut.get();
}
/**
* Instruction for node to perform <b>add new binary object</b> action on cache in <b>keepBinary</b> mode.
*
* Instruction includes id the object should be added under, new field to add to binary schema
* and {@link FieldType type} of the field.
*/
private static final class BinaryUpdateDescription {
/** */
private int itemId;
/** */
private String fieldName;
/** */
private FieldType fieldType;
/** */
private Object val;
/**
* @param itemId Item id.
* @param fieldName Field name.
* @param fieldType Field type.
* @param val Field value.
*/
private BinaryUpdateDescription(int itemId, String fieldName, FieldType fieldType, Object val) {
this.itemId = itemId;
this.fieldName = fieldName;
this.fieldType = fieldType;
this.val = val;
}
}
/**
*
*/
private enum FieldType {
/** */
NUMBER,
/** */
STRING,
/** */
ARRAY,
/** */
OBJECT
}
/**
* Generates random number to use when creating binary object with field of numeric {@link FieldType type}.
*/
private static int getNumberFieldVal() {
return ThreadLocalRandom.current().nextInt(100);
}
/**
* Generates random string to use when creating binary object with field of string {@link FieldType type}.
*/
private static String getStringFieldVal() {
return "str" + (100 + ThreadLocalRandom.current().nextInt(9));
}
/**
* Generates random array to use when creating binary object with field of array {@link FieldType type}.
*/
private static byte[] getArrayFieldVal() {
byte[] res = new byte[3];
ThreadLocalRandom.current().nextBytes(res);
return res;
}
/**
* @param builder Builder.
* @param desc Descriptor with parameters of BinaryObject to build.
* @return BinaryObject built by provided description
*/
private static BinaryObject newBinaryObject(BinaryObjectBuilder builder, BinaryUpdateDescription desc) {
builder.setField(SEQ_NUM_FLD, desc.itemId + 1);
builder.setField(desc.fieldName, desc.val);
return builder.build();
}
/**
* Compute job executed on each node in cluster which constantly adds new entries to ignite cache
* according to {@link BinaryUpdateDescription descriptions} it reads from shared queue.
*/
private static final class BinaryObjectAdder implements IgniteCallable<Object> {
/** */
private final CountDownLatch startLatch;
/** */
private final int idx;
/** */
private final Queue<BinaryUpdateDescription> updatesQueue;
/** */
private final AtomicInteger restartIdx;
/** */
private final AtomicInteger workersCntr;
/** */
@IgniteInstanceResource
private Ignite ignite;
/**
* @param startLatch Startup latch.
* @param idx Ignite instance index.
* @param updatesQueue Updates queue.
* @param restartIdx The index of the node to be restarted.
* @param workersCntr The number of active computation threads.
*/
BinaryObjectAdder(
CountDownLatch startLatch,
int idx,
Queue<BinaryUpdateDescription> updatesQueue,
AtomicInteger restartIdx,
AtomicInteger workersCntr
) {
this.startLatch = startLatch;
this.idx = idx;
this.updatesQueue = updatesQueue;
this.restartIdx = restartIdx;
this.workersCntr = workersCntr;
}
/** {@inheritDoc} */
@Override public Object call() throws Exception {
startLatch.await();
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary();
workersCntr.incrementAndGet();
try {
while (!updatesQueue.isEmpty()) {
BinaryUpdateDescription desc = updatesQueue.poll();
if (desc == null)
break;
BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
BinaryObject bo = newBinaryObject(builder, desc);
cache.put(desc.itemId, bo);
if (restartIdx.get() == idx)
break;
}
}
finally {
workersCntr.decrementAndGet();
if (restartIdx.get() == idx)
restartIdx.set(-1);
}
return null;
}
}
/**
* Restarts random server node and computation job.
*/
private final class NodeRestarter implements Runnable {
/** Stop thread flag. */
private final AtomicBoolean stopFlag;
/** The index of the node to be restarted. */
private final AtomicInteger restartIdx;
/** The current number of computation threads. */
private final AtomicInteger workersCntr;
/**
* @param stopFlag Stop thread flag.
* @param restartIdx The index of the node to be restarted.
* @param workersCntr The current number of computation threads.
*/
NodeRestarter(AtomicBoolean stopFlag, AtomicInteger restartIdx, AtomicInteger workersCntr) {
this.stopFlag = stopFlag;
this.restartIdx = restartIdx;
this.workersCntr = workersCntr;
}
/** {@inheritDoc} */
@Override public void run() {
try {
startLatch.await();
while (!shouldStop()) {
int idx = ThreadLocalRandom.current().nextInt(5);
restartIdx.set(idx);
while (restartIdx.get() != -1) {
if (shouldStop())
return;
Thread.sleep(10);
}
stopGrid(idx);
if (shouldStop())
return;
startGrid(idx);
startComputation(idx, restartIdx, workersCntr);
Thread.sleep(RESTART_DELAY);
}
}
catch (Exception ignore) {
// No-op.
}
}
/** */
private boolean shouldStop() {
return updatesQueue.isEmpty() || stopFlag.get() || Thread.currentThread().isInterrupted();
}
}
}