blob: 1bcb1bd4ac134c53cb4944e05c7e3a9c8ea01082 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Tests scenario for too early metadata update completion in case of multiple concurrent updates for the same schema.
* <p>
* Scenario is the following:
*
* <ul>
* <li>Start 4 nodes, connect client to node 2 in topology order (starting from 1).</li>
* <li>Start two concurrent transactions from client node producing same schema update.</li>
* <li>Delay second update until first update will return to client with stamped propose message and writes new
* schema to local metadata cache</li>
* <li>Unblock second update. It should correctly wait until the metadata is applied on all
* nodes or tx will fail on commit.</li>
* </ul>
*/
public class BinaryMetadataConcurrentUpdateWithIndexesTest extends GridCommonAbstractTest {
/** */
private static final int FIELDS = 2;
/** */
private static final int MB = 1024 * 1024;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setIncludeEventTypes(EventType.EVTS_DISCOVERY);
BlockTcpDiscoverySpi spi = new BlockTcpDiscoverySpi();
Field rndAddrsField = U.findField(BlockTcpDiscoverySpi.class, "skipAddrsRandomization");
assertNotNull(rndAddrsField);
rndAddrsField.set(spi, true);
cfg.setDiscoverySpi(spi.setIpFinder(sharedStaticIpFinder));
QueryEntity qryEntity = new QueryEntity("java.lang.Integer", "Value");
LinkedHashMap<String, String> fields = new LinkedHashMap<>();
Collection<QueryIndex> indexes = new ArrayList<>(FIELDS);
for (int i = 0; i < FIELDS; i++) {
String name = "s" + i;
fields.put(name, "java.lang.String");
indexes.add(new QueryIndex(name, QueryIndexType.SORTED));
}
qryEntity.setFields(fields);
qryEntity.setIndexes(indexes);
cfg.setDataStorageConfiguration(new DataStorageConfiguration().
setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(50 * MB)));
cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
setBackups(0).
setQueryEntities(Collections.singleton(qryEntity)).
setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).
setCacheMode(CacheMode.PARTITIONED));
return cfg;
}
/** Flag to start syncing metadata requests. Should skip on exchange. */
private volatile boolean syncMeta;
/** Metadata init latch. Both threads must request initial metadata. */
private CountDownLatch initMetaReq = new CountDownLatch(2);
/** Thread local flag for need of waiting local metadata update. */
private ThreadLocal<Boolean> delayMetadataUpdateThreadLoc = new ThreadLocal<>();
/** Latch for waiting local metadata update. */
public static final CountDownLatch localMetaUpdatedLatch = new CountDownLatch(1);
/** */
@Test
public void testMissingSchemaUpdate() throws Exception {
// Start order is important.
Ignite node0 = startGrid("node0");
Ignite node1 = startGrid("node1");
IgniteEx client0 = startClientGrid("client0");
CacheObjectBinaryProcessorImpl.TestBinaryContext clientCtx =
(CacheObjectBinaryProcessorImpl.TestBinaryContext)((CacheObjectBinaryProcessorImpl)client0.context().
cacheObjects()).binaryContext();
clientCtx.addListener(new CacheObjectBinaryProcessorImpl.TestBinaryContext.TestBinaryContextListener() {
@Override public void onAfterMetadataRequest(int typeId, BinaryType type) {
if (syncMeta) {
try {
initMetaReq.countDown();
initMetaReq.await();
}
catch (Exception e) {
throw new BinaryObjectException(e);
}
}
}
@Override public void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata) {
// Delay one of updates until schema is locally updated on propose message.
if (delayMetadataUpdateThreadLoc.get() != null)
await(localMetaUpdatedLatch, 5000);
}
});
Ignite node2 = startGrid("node2");
Ignite node3 = startGrid("node3");
startGrid("node4");
node0.cluster().active(true);
awaitPartitionMapExchange();
syncMeta = true;
CountDownLatch clientProposeMsgBlockedLatch = new CountDownLatch(1);
AtomicBoolean clientWait = new AtomicBoolean();
final Object clientMux = new Object();
AtomicBoolean srvWait = new AtomicBoolean();
final Object srvMux = new Object();
((BlockTcpDiscoverySpi)node1.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
if (msg instanceof MetadataUpdateProposedMessage) {
if (Thread.currentThread().getName().contains("client")) {
log.info("Block custom message to client0: [locNode=" + snd + ", msg=" + msg + ']');
clientProposeMsgBlockedLatch.countDown();
// Message to client
synchronized (clientMux) {
while (!clientWait.get())
try {
clientMux.wait();
}
catch (InterruptedException e) {
fail();
}
}
}
}
return null;
});
((BlockTcpDiscoverySpi)node2.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
if (msg instanceof MetadataUpdateProposedMessage) {
MetadataUpdateProposedMessage msg0 = (MetadataUpdateProposedMessage)msg;
int pendingVer = U.field(msg0, "pendingVer");
// Should not block propose messages until they reach coordinator.
if (pendingVer == 0)
return null;
log.info("Block custom message to next server: [locNode=" + snd + ", msg=" + msg + ']');
// Message to client
synchronized (srvMux) {
while (!srvWait.get())
try {
srvMux.wait();
}
catch (InterruptedException e) {
fail();
}
}
}
return null;
});
Integer key = primaryKey(node3.cache(DEFAULT_CACHE_NAME));
IgniteInternalFuture fut0 = runAsync(() -> {
try (Transaction tx = client0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
tx.commit();
}
catch (Throwable t) {
log.error("err", t);
}
});
// Implements test logic.
IgniteInternalFuture fut1 = runAsync(() -> {
// Wait for initial metadata received. It should be initial version: pending=0, accepted=0
await(initMetaReq, 5000);
// Wait for blocking proposal message to client node.
await(clientProposeMsgBlockedLatch, 5000);
// Unblock proposal message to client.
clientWait.set(true);
synchronized (clientMux) {
clientMux.notify();
}
// Give some time to apply update.
doSleep(3000);
// Unblock second metadata update.
localMetaUpdatedLatch.countDown();
// Give some time for tx to complete (success or fail). fut2 will throw an error if tx has failed on commit.
doSleep(3000);
// Unblock metadata message and allow for correct version acceptance.
srvWait.set(true);
synchronized (srvMux) {
srvMux.notify();
}
});
IgniteInternalFuture fut2 = runAsync(() -> {
delayMetadataUpdateThreadLoc.set(true);
try (Transaction tx = client0.transactions().
txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
tx.commit();
}
});
fut0.get();
fut1.get();
fut2.get();
}
/**
* @param latch Latch.
* @param timeout Timeout.
*/
private void await(CountDownLatch latch, long timeout) {
try {
latch.await(5000, MILLISECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
long cnt = initMetaReq.getCount();
if (cnt != 0)
throw new RuntimeException("Invalid latch count after wait: " + cnt);
}
/**
* @param ignite Ignite.
* @param prefix Value prefix.
* @param fields Fields.
*/
protected BinaryObject build(Ignite ignite, String prefix, int... fields) {
BinaryObjectBuilder builder = ignite.binary().builder("Value");
for (int field : fields) {
assertTrue(field < FIELDS);
builder.setField("i" + field, field);
builder.setField("s" + field, prefix + field);
}
return builder.build();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
CacheObjectBinaryProcessorImpl.useTestBinaryCtx = true;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
CacheObjectBinaryProcessorImpl.useTestBinaryCtx = false;
stopAllGrids();
}
}