blob: 219f9ae0e64c96b8bce5f150ad533fd86af4d54f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.suppressException;
/**
* Tests for verification of binary metadata async writing to disk.
*/
public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractTest {
/**
* Latch that currently gates slow binary metadata writes. It is set by {@link #initSlowFileIOFactory()},
* read by {@code SlowFileIOFactory} when a file I/O is created, and counted down in {@link #afterTest()}.
*/
private static final AtomicReference<CountDownLatch> fileWriteLatchRef = new AtomicReference<>(null);
/**
* File I/O factory for the nodes started next. When {@code null}, {@link #getConfiguration(String)}
* falls back to {@link RandomAccessFileIOFactory}.
*/
private FileIOFactory specialFileIOFactory;
/**
 * Builds the node configuration. Instances whose name contains {@code "client"} are started as clients
 * without any further settings. Every other instance gets a persistent default data region, the file I/O
 * factory currently selected by the test, the default cache (1 backup, 16 partitions) and a
 * {@link StopNodeFailureHandler}.
 *
 * {@inheritDoc}
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName);

    boolean clientNode = igniteInstanceName.contains("client");

    if (clientNode) {
        nodeCfg.setClientMode(true);

        return nodeCfg;
    }

    // Use the factory requested by the test, or the regular one when nothing special is requested.
    FileIOFactory ioFactory = specialFileIOFactory;

    if (ioFactory == null)
        ioFactory = new RandomAccessFileIOFactory();

    DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration().setPersistenceEnabled(true);

    DataStorageConfiguration storageCfg = new DataStorageConfiguration()
        .setDefaultDataRegionConfiguration(dfltRegionCfg)
        .setFileIOFactory(ioFactory);

    nodeCfg.setDataStorageConfiguration(storageCfg);

    CacheConfiguration dfltCacheCfg = new CacheConfiguration(DEFAULT_CACHE_NAME)
        .setBackups(1)
        .setAffinity(new RendezvousAffinityFunction(false, 16));

    nodeCfg.setCacheConfiguration(dfltCacheCfg);

    nodeCfg.setFailureHandler(new StopNodeFailureHandler());

    return nodeCfg;
}
/**
* Stops all nodes and removes persistence files so that every test starts from a clean state.
*
* {@inheritDoc}
*/
@Override protected void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/**
 * Opens the write latch registered by the test (if any) so that threads waiting on it can proceed,
 * then stops all nodes and removes persistence files.
 *
 * {@inheritDoc}
 */
@Override protected void afterTest() throws Exception {
    // The reference is a static final field and is never null itself, so only its content is checked.
    // It is read once: two separate get() calls could observe two different latches.
    CountDownLatch latch = fileWriteLatchRef.get();

    if (latch != null)
        latch.countDown();

    stopAllGrids();

    cleanPersistenceDir();
}
/**
* Verifies that registration of new binary meta does not block discovery thread
* and new node can join the cluster when binary metadata is in the process of writing.
* <p>
* Node 0 is started with the slow file I/O factory, a put of a {@code TestAddress} value is issued
* asynchronously, and node 1 (regular file I/O) has to join before the write latch is released.
*
* @throws Exception If failed.
*/
@Test
public void testNodeJoinIsNotBlockedByAsyncMetaWriting() throws Exception {
// The slow factory is picked up by getConfiguration() for the node started next.
final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
Ignite ig = startGrid(0);
ig.cluster().active(true);
IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
// The put is executed in another thread so that the test thread can go on while the latch is closed.
GridTestUtils.runAsync(() -> cache.put(0, new TestAddress(0, "USA", "NYC", "Park Ave")));
// Node 1 is started with the regular file I/O factory.
specialFileIOFactory = null;
startGrid(1);
// The join has to complete while the write latch is still closed.
waitForTopology(2);
fileWriteLatch.countDown();
}
/**
* Verifies that metadata is restored on node join even if it was deleted when the node was down.
* <p>
* A value is stored under a key whose primary is node 1; node 1 is stopped, its binary metadata folder
* is deleted, and after node 1 rejoins and node 0 is stopped the value must still be readable from node 1.
*
* @throws Exception If failed.
*/
@Test
public void testBinaryMetadataIsRestoredAfterDeletionOnNodeJoin() throws Exception {
IgniteEx ig0 = startGrid(0);
IgniteEx ig1 = startGrid(1);
ig0.cluster().active(true);
IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
// Key for which node 1 is primary.
int key = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
cache.put(key, new TestAddress(0, "USA", "NYC", "Park Ave"));
// The consistent ID is captured before the stop; it is used to locate the metadata folder of node 1.
String ig1ConsId = ig1.localNode().consistentId().toString();
stopGrid(1);
cleanBinaryMetaFolderForNode(ig1ConsId);
ig1 = startGrid(1);
// With node 0 stopped the value can only be served by node 1.
stopGrid(0);
cache = ig1.cache(DEFAULT_CACHE_NAME);
TestAddress addr = (TestAddress)cache.get(key);
assertNotNull(addr);
assertEquals("USA", addr.country);
}
/**
* Verifies that request adding/modifying binary metadata (e.g. put to cache a new value)
* is blocked until write to disk is finished.
* <p>
* The cache must be empty while the write latch is closed and must contain one primary entry
* within 10 seconds after the latch is released.
*
* @throws Exception If failed.
*/
@Test
public void testThreadRequestingUpdateBlockedTillWriteCompletion() throws Exception {
final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
Ignite ig = startGrid();
ig.cluster().active(true);
IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
GridTestUtils.runAsync(() -> cache.put(1, new TestPerson(0, "John", "Oliver")));
// Checked right after the async put is submitted, while the latch is still closed.
assertEquals(0, cache.size(CachePeekMode.PRIMARY));
fileWriteLatch.countDown();
assertTrue(GridTestUtils.waitForCondition(() -> cache.size(CachePeekMode.PRIMARY) == 1, 10_000));
}
/**
* Verifies that metadata of three new types gets into the {@code metadataLocCache} map of the binary
* processor while the write of binary metadata files is still blocked by the latch.
* NOTE(review): the map is presumably filled from the discovery thread (see the test name) - confirm.
*
* @throws Exception If failed.
*/
@Test
public void testDiscoveryIsNotBlockedOnMetadataWrite() throws Exception {
final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
IgniteKernal ig = (IgniteKernal)startGrid();
ig.cluster().active(true);
IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
TestAddress addr = new TestAddress(0, "RUS", "Spb", "Nevsky");
TestPerson person = new TestPerson(0, "John", "Oliver");
person.address(addr);
TestAccount account = new TestAccount(person, 0, 1000);
// Three concurrent puts of values of three different classes; all of them use the same key.
GridTestUtils.runAsync(() -> cache.put(0, addr));
GridTestUtils.runAsync(() -> cache.put(0, person));
GridTestUtils.runAsync(() -> cache.put(0, account));
assertEquals(0, cache.size(CachePeekMode.PRIMARY));
// The map is a private field of the binary processor, hence the reflective access.
Map locCache = GridTestUtils.getFieldValue(
(CacheObjectBinaryProcessorImpl)ig.context().cacheObjects(), "metadataLocCache");
// All three types have to show up within 5 seconds although the latch is still closed.
assertTrue(GridTestUtils.waitForCondition(() -> locCache.size() == 3, 5_000));
fileWriteLatch.countDown();
}
/**
* Starts node 0 with the regular file I/O factory and node 1 with {@link FailingFileIOFactory}, puts
* a value of a new type under a key whose primary is node 1 and waits until the topology consists of
* a single node.
* NOTE(review): node 1 is presumably stopped by the configured {@link StopNodeFailureHandler} once
* the metadata write fails - confirm.
*
* @throws Exception If failed.
*/
@Test
public void testNodeIsStoppedOnExceptionDuringStoringMetadata() throws Exception {
IgniteEx ig0 = startGrid(0);
// Only the node started next gets the failing factory.
specialFileIOFactory = new FailingFileIOFactory(new RandomAccessFileIOFactory());
IgniteEx ig1 = startGrid(1);
ig0.cluster().active(true);
int ig1Key = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
cache.put(ig1Key, new TestAddress(0, "USA", "NYC", "6th Ave"));
waitForTopology(1);
}
/**
* Verifies that no updates are applied to cache on node until all metadata write operations
* for updated type are fully written to disk.
* <p>
* Node 1 uses the slow file I/O factory. Two values are put concurrently from nodes 0 and 2 under keys
* whose primary is node 1; the cache must stay empty until the latch is released and both values must
* be readable from node 1 after nodes 0 and 2 are stopped.
*
* @throws Exception If failed.
*/
@Test
public void testParallelUpdatesToBinaryMetadata() throws Exception {
IgniteEx ig0 = startGrid(0);
// Only node 1 is started with the slow factory.
final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
IgniteEx ig1 = startGrid(1);
specialFileIOFactory = null;
IgniteEx ig2 = startGrid(2);
ig0.cluster().active(true);
// Two different keys, both having node 1 as primary.
int key0 = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
int key1 = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode(), key0);
assertTrue(key0 != key1);
GridTestUtils.runAsync(() -> ig0.cache(DEFAULT_CACHE_NAME).put(key0, new TestAddress(key0, "Russia", "Moscow")));
GridTestUtils.runAsync(() -> ig2.cache(DEFAULT_CACHE_NAME).put(key1, new TestAddress(key1, "USA", "NYC", "Park Ave")));
assertEquals(0, ig0.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY));
fileWriteLatch.countDown();
assertTrue(GridTestUtils.
waitForCondition(() -> ig0.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY) == 2, 10_000));
// Only node 1 stays alive, so both reads below are served by it.
stopGrid(0);
stopGrid(2);
IgniteCache<Object, Object> cache = ig1.cache(DEFAULT_CACHE_NAME);
TestAddress addr0 = (TestAddress)cache.get(key0);
TestAddress addr1 = (TestAddress)cache.get(key1);
assertEquals("Russia", addr0.country);
assertEquals("USA", addr1.country);
}
/**
 * Verifies that put(key) method called from client on cache in FULL_SYNC mode returns only when
 * all affinity nodes for this key finished writing binary metadata.
 * <p>
 * The hang is emulated by a custom event listener for {@link MetadataUpdateAcceptedMessage} on node 1
 * which waits on a latch. The put must not finish within 5 seconds while the latch is closed and must
 * finish within 5 seconds after it is released.
 *
 * @throws Exception If failed.
 */
@Test
public void testPutRequestFromClientIsBlockedIfBinaryMetaWriteIsHanging() throws Exception {
    String cacheName = "testCache";

    CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
        .setBackups(2)
        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
        .setCacheMode(CacheMode.PARTITIONED)
        .setWriteSynchronizationMode(FULL_SYNC);

    IgniteEx ig0 = startGrid(0);

    IgniteEx cl0 = startGrid("client0");

    CountDownLatch fileWriteLatch = new CountDownLatch(1);

    // Publish the latch so that afterTest() opens it even if an assertion below fails;
    // otherwise the listener on node 1 would stay parked on it.
    fileWriteLatchRef.set(fileWriteLatch);

    IgniteEx ig1 = startGrid(1);

    ig1.context().discovery().setCustomEventListener(
        MetadataUpdateAcceptedMessage.class,
        (topVer, snd, msg) -> suppressException(fileWriteLatch::await)
    );

    // NOTE(review): this logger is not passed to any node configuration and the listener is never
    // checked, so these lines have no effect on the test outcome - confirm whether they can be dropped.
    ListeningTestLogger listeningLog = new ListeningTestLogger(true, log);

    LogListener waitingForWriteLsnr = LogListener.matches("Waiting for write completion of").build();

    listeningLog.registerListener(waitingForWriteLsnr);

    startGrid(2);

    ig0.cluster().active(true);

    IgniteCache cache0 = cl0.createCache(testCacheCfg);

    int key0 = findAffinityKeyForNode(ig0.affinity(cacheName), ig0.localNode());

    AtomicBoolean putFinished = new AtomicBoolean(false);

    GridTestUtils.runAsync(() -> {
        cache0.put(key0, new TestAddress(key0, "Russia", "Saint-Petersburg"));

        putFinished.set(true);
    });

    // The put must still be pending after the whole timeout has elapsed.
    assertFalse(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));

    fileWriteLatch.countDown();

    assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
}
/**
* Verifies that put(key) method called from non-affinity server on cache in FULL_SYNC mode returns only when
* all affinity nodes for this key finished writing binary metadata.
* <p>
* Delegates to {@link #putRequestFromServer(boolean)} with {@code true}, which selects FULL_SYNC
* and expects the put to be blocked until the write latch is released.
*
* @throws Exception If failed.
*/
@Test
public void testPutRequestFromServerIsBlockedIfBinaryMetaWriteIsHanging() throws Exception {
putRequestFromServer(true);
}
/**
 * Checks that a put issued from a client into an ATOMIC cache in PRIMARY_SYNC mode completes even though
 * the asynchronous write of binary metadata hangs on a backup node (node 1) and not on the primary (node 0).
 *
 * @throws Exception If failed.
 */
@Test
public void testPutRequestFromClientCompletesIfMetadataWriteHangOnBackup() throws Exception {
    final String cacheName = "testCache";

    CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
        .setBackups(2)
        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
        .setCacheMode(CacheMode.PARTITIONED)
        .setWriteSynchronizationMode(PRIMARY_SYNC);

    IgniteEx primary = startGrid(0);

    // Only node 1 is started with the slow file I/O factory.
    CountDownLatch writeLatch = initSlowFileIOFactory();

    startGrid(1);

    specialFileIOFactory = null;

    startGrid(2);

    primary.cluster().active(true);

    IgniteEx client = startGrid("client0");

    IgniteCache clientCache = client.createCache(testCacheCfg);

    // Key for which node 0 (regular file I/O) is primary.
    int key = findAffinityKeyForNode(client.affinity(cacheName), primary.localNode());

    AtomicBoolean putDone = new AtomicBoolean(false);

    GridTestUtils.runAsync(() -> {
        clientCache.put(key, new TestAddress(key, "USA", "NYC"));

        putDone.set(true);
    });

    // The put has to finish while the latch on the backup is still closed.
    assertTrue(GridTestUtils.waitForCondition(() -> putDone.get(), 5_000));

    writeLatch.countDown();
}
/**
* Verifies that put from server to ATOMIC cache in PRIMARY_SYNC mode is not blocked
* if binary metadata async write operation hangs on backup node and not on primary.
* <p>
* Delegates to {@link #putRequestFromServer(boolean)} with {@code false}, which selects PRIMARY_SYNC
* and expects the put to complete while the write latch is still closed.
*
* @throws Exception If failed.
*/
@Test
public void testPutRequestFromServerCompletesIfMetadataWriteHangOnBackup() throws Exception {
putRequestFromServer(false);
}
/**
* Verifies that metadata write hanging on non-affinity node doesn't block on-going put operation.
* <p>
* Node 1 uses the slow file I/O factory and the key is chosen so that node 1 is neither primary nor
* backup for it. The put must complete within 5 seconds; node 1 must still hold a prepared write task
* until the latch is released and must get rid of it within 5 seconds afterwards.
*
* @throws Exception If failed.
*/
@Test
public void testPutRequestFromClientCompletesIfMetadataWriteHangOnNonAffinityNode() throws Exception {
String cacheName = "testCache";
CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
.setBackups(1)
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setCacheMode(CacheMode.PARTITIONED)
.setWriteSynchronizationMode(FULL_SYNC);
IgniteEx ig0 = startGrid(0);
// Only node 1 is started with the slow factory.
CountDownLatch fileWriteLatch = initSlowFileIOFactory();
IgniteEx ig1 = startGrid(1);
specialFileIOFactory = null;
IgniteEx ig2 = startGrid(2);
IgniteEx cl0 = startGrid("client0");
cl0.cluster().state(ClusterState.ACTIVE);
IgniteCache cache = cl0.createCache(testCacheCfg);
Affinity<Object> aff = cl0.affinity(cacheName);
// Key for which node 1 is neither primary nor backup.
int nonAffKey = findNonAffinityKeyForNode(aff, ig1.localNode(), 0);
AtomicBoolean putCompleted = new AtomicBoolean(false);
GridTestUtils.runAsync(() -> {
cache.put(nonAffKey, new TestAddress(nonAffKey, "USA", "NYC"));
putCompleted.set(true);
});
assertTrue(GridTestUtils.waitForCondition(() -> putCompleted.get(), 5_000));
// Internal map in BinaryMetadataFileStore with futures awaiting write operations,
// reached reflectively through the fields metadataFileStore -> writer -> preparedWriteTasks.
Map map = GridTestUtils.getFieldValue(
(CacheObjectBinaryProcessorImpl)ig1.context().cacheObjects(), "metadataFileStore", "writer", "preparedWriteTasks");
// The latch is still closed, so the write on node 1 must be pending.
assertTrue(!map.isEmpty());
fileWriteLatch.countDown();
assertTrue(GridTestUtils.waitForCondition(() -> map.isEmpty(), 5_000));
}
/**
 * Puts a value of a new type from server node 0 under a key for which node 0 is neither primary nor backup
 * while node 2 (started with the slow file I/O factory) is a backup, and checks whether the put is blocked.
 *
 * @param expectedBlocked If {@code true} the cache uses FULL_SYNC and the put must stay pending until
 *      the write latch is released; otherwise the cache uses PRIMARY_SYNC and the put must complete
 *      while the latch is still closed.
 * @throws Exception If failed.
 */
private void putRequestFromServer(boolean expectedBlocked) throws Exception {
    final String cacheName = "testCache";

    CacheWriteSynchronizationMode syncMode = PRIMARY_SYNC;

    if (expectedBlocked)
        syncMode = FULL_SYNC;

    CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
        .setBackups(2)
        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
        .setCacheMode(CacheMode.PARTITIONED)
        .setWriteSynchronizationMode(syncMode);

    IgniteEx requester = startGrid(0);

    startGrid(1);

    // Only node 2 is started with the slow file I/O factory.
    final CountDownLatch writeLatch = initSlowFileIOFactory();

    IgniteEx slowNode = startGrid(2);

    specialFileIOFactory = null;

    startGrid(3);

    requester.cluster().active(true);

    IgniteCache cache = requester.createCache(testCacheCfg);

    Affinity<Object> aff = requester.affinity(cacheName);

    // Walk through keys not owned by node 0 until one is found that has node 2 as a backup.
    int candidate = findNonAffinityKeyForNode(aff, requester.localNode(), 0);

    while (!aff.isBackup(slowNode.localNode(), candidate))
        candidate = findNonAffinityKeyForNode(aff, requester.localNode(), candidate + 1);

    final int key = candidate;

    AtomicBoolean putFinished = new AtomicBoolean(false);

    GridTestUtils.runAsync(() -> {
        cache.put(key, new TestAddress(key, "USA", "NYC"));

        putFinished.set(true);
    });

    if (!expectedBlocked) {
        assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));

        return;
    }

    // The put must still be pending after the whole timeout has elapsed.
    assertFalse(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));

    writeLatch.countDown();

    assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
}
/**
 * Installs a {@link SlowFileIOFactory} for the nodes started next and registers a fresh latch
 * that gates writes of binary metadata files.
 *
 * @return Latch which has to be counted down to let the blocked writes proceed.
 */
private CountDownLatch initSlowFileIOFactory() {
    final CountDownLatch releaseLatch = new CountDownLatch(1);

    fileWriteLatchRef.set(releaseLatch);

    specialFileIOFactory = new SlowFileIOFactory(new RandomAccessFileIOFactory());

    return releaseLatch;
}
/**
 * Deletes directory with persisted binary metadata for a node with given Consistent ID.
 * Only the first subdirectory of {@code binary_meta} whose name contains the ID is removed.
 *
 * @param consId Consistent ID of the node, in string form.
 * @throws IgniteCheckedException If the work directory cannot be resolved or the content
 *      of the {@code binary_meta} directory cannot be listed.
 */
private void cleanBinaryMetaFolderForNode(String consId) throws IgniteCheckedException {
    File metaDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", false);

    File[] subDirs = metaDir.listFiles();

    // listFiles() returns null when the path is not a directory or an I/O error occurs;
    // report that explicitly instead of failing with a NullPointerException in the loop.
    if (subDirs == null)
        throw new IgniteCheckedException("Failed to list binary metadata directory: " + metaDir);

    for (File subDir : subDirs) {
        if (subDir.getName().contains(consId)) {
            U.delete(subDir);

            return;
        }
    }
}
/**
 * Finds the first key, starting from the given one, for which the target node is neither primary nor backup.
 *
 * @param aff Cache affinity.
 * @param targetNode Node that must not own the key.
 * @param startFrom First key to check.
 * @return Found key.
 */
private int findNonAffinityKeyForNode(Affinity aff, ClusterNode targetNode, int startFrom) {
    int candidate = startFrom;

    while (aff.isPrimaryOrBackup(targetNode, candidate))
        candidate++;

    return candidate;
}
/**
 * Finds the smallest non-negative key for which the target node is primary and which is not excluded.
 *
 * @param aff Cache affinity.
 * @param targetNode Node that must be primary for the key.
 * @param excludeKeys Keys that must not be returned; may be {@code null} or empty.
 * @return Found key.
 */
private int findAffinityKeyForNode(Affinity aff, ClusterNode targetNode, Integer... excludeKeys) {
    for (int candidate = 0; ; candidate++) {
        if (!aff.isPrimary(targetNode, candidate))
            continue;

        if (excludeKeys == null || !Arrays.asList(excludeKeys).contains(Integer.valueOf(candidate)))
            return candidate;
    }
}
/** Person value object used as a cache value in the tests. */
static final class TestPerson {
/** Person ID. */
private final int id;
/** First name. */
private final String firstName;
/** Surname. */
private final String surname;
/** Address; stays {@code null} until {@link #address(TestAddress)} is called. */
private TestAddress addr;
/**
* @param id Person ID.
* @param firstName First name.
* @param surname Surname.
*/
TestPerson(int id, String firstName, String surname) {
this.id = id;
this.firstName = firstName;
this.surname = surname;
}
/**
* @param addr Address to assign to this person.
*/
void address(TestAddress addr) {
this.addr = addr;
}
}
/** Address value object used as a cache value in the tests. */
static final class TestAddress {
    /** Address ID. */
    private final int id;

    /** Country. */
    private final String country;

    /** City. */
    private final String city;

    /** Street address, {@code null} when the three-argument constructor is used. */
    private final String address;

    /**
     * Creates an address without a street.
     *
     * @param id Address ID.
     * @param country Country.
     * @param city City.
     */
    TestAddress(int id, String country, String city) {
        this(id, country, city, null);
    }

    /**
     * @param id Address ID.
     * @param country Country.
     * @param city City.
     * @param street Street address.
     */
    TestAddress(int id, String country, String city, String street) {
        this.id = id;
        this.country = country;
        this.city = city;

        address = street;
    }
}
/** Account value object used as a cache value in the tests. */
static final class TestAccount {
/** Account owner. */
private final TestPerson person;
/** Account ID. */
private final int accountId;
/** Account balance. */
private final long accountBalance;
/**
* @param person Account owner.
* @param id Account ID.
* @param balance Account balance.
*/
TestAccount(TestPerson person, int id, long balance) {
this.person = person;
accountId = id;
accountBalance = balance;
}
}
/**
 * @param file File to check.
 * @return {@code true} if the path of the file contains the substring {@code binary_meta}.
 */
private static boolean isBinaryMetaFile(File file) {
    String path = file.getPath();

    return path.indexOf("binary_meta") >= 0;
}
/**
 * File I/O factory that wraps the I/O of binary metadata files into {@link SlowFileIO} and returns
 * the I/O created by the delegate unchanged for all other files.
 */
static final class SlowFileIOFactory implements FileIOFactory {
    /** Factory that creates the actual file I/O. */
    private final FileIOFactory delegateFactory;

    /**
     * @param delegateFactory Delegate factory.
     */
    SlowFileIOFactory(FileIOFactory delegateFactory) {
        this.delegateFactory = delegateFactory;
    }

    /** {@inheritDoc} */
    @Override public FileIO create(File file, OpenOption... modes) throws IOException {
        FileIO fileIo = delegateFactory.create(file, modes);

        // The latch is taken at creation time, so the I/O is bound to the latch registered at that moment.
        return isBinaryMetaFile(file) ? new SlowFileIO(fileIo, fileWriteLatchRef.get()) : fileIo;
    }
}
/** File I/O whose {@code write(byte[], int, int)} waits for a latch before delegating. */
static class SlowFileIO extends FileIODecorator {
    /** Latch that has to be opened before a write is passed to the delegate. */
    private final CountDownLatch fileWriteLatch;

    /**
     * @param delegate File I/O delegate
     * @param fileWriteLatch Latch gating the writes.
     */
    public SlowFileIO(FileIO delegate, CountDownLatch fileWriteLatch) {
        super(delegate);

        this.fileWriteLatch = fileWriteLatch;
    }

    /** {@inheritDoc} */
    @Override public int write(byte[] buf, int off, int len) throws IOException {
        awaitRelease();

        return super.write(buf, off, len);
    }

    /** Waits until the latch is opened; an interrupt ends the wait and is not propagated. */
    private void awaitRelease() {
        try {
            fileWriteLatch.await();
        }
        catch (InterruptedException ignored) {
            // No-op.
        }
    }
}
/**
 * File I/O factory that wraps the I/O of binary metadata files into {@link FailingFileIO} and returns
 * the I/O created by the delegate unchanged for all other files.
 */
static final class FailingFileIOFactory implements FileIOFactory {
    /** Factory that creates the actual file I/O. */
    private final FileIOFactory delegateFactory;

    /**
     * @param factory Delegate factory.
     */
    FailingFileIOFactory(FileIOFactory factory) {
        delegateFactory = factory;
    }

    /** {@inheritDoc} */
    @Override public FileIO create(File file, OpenOption... modes) throws IOException {
        FileIO fileIo = delegateFactory.create(file, modes);

        return isBinaryMetaFile(file) ? new FailingFileIO(fileIo) : fileIo;
    }
}
/** File I/O whose {@code write(byte[], int, int)} always fails with an {@link IOException}. */
static final class FailingFileIO extends FileIODecorator {
    /**
     * @param delegate File I/O delegate
     */
    public FailingFileIO(FileIO delegate) {
        super(delegate);
    }

    /** {@inheritDoc} */
    @Override public int write(byte[] buf, int off, int len) throws IOException {
        final String errMsg = "Error occured during write of binary metadata";

        throw new IOException(errMsg);
    }
}
}