IGNITE-12099 Avoid writing metadata to disk in discovery threads - Fixes #7396.
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 76448a8..45e8b57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -188,7 +188,11 @@
/** {@inheritDoc} */
@Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects()).binaryContext();
+ CacheObjectBinaryProcessorImpl binaryProc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
+
+ this.ctx = binaryProc.binaryContext();
+
+ binaryProc.waitMetadataWriteIfNeeded(typeId());
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index bee4099..924cf14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -18,26 +18,35 @@
import java.io.File;
import java.io.FileInputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
/**
* Class handles saving/restoring binary metadata to/from disk.
*
- * Current implementation needs to be rewritten as it issues IO operations from discovery thread
- * which may lead to segmentation of nodes from cluster.
+ * Current implementation needs to be rewritten as it issues IO operations from discovery thread which may lead to
+ * segmentation of nodes from cluster.
*/
class BinaryMetadataFileStore {
/** Link to resolved binary metadata directory. Null for non persistent mode */
@@ -50,16 +59,23 @@
private final GridKernalContext ctx;
/** */
+ private final boolean isPersistenceEnabled;
+
+ /** */
private FileIOFactory fileIOFactory;
/** */
private final IgniteLogger log;
+ /** */
+ private BinaryMetadataAsyncWriter writer;
+
/**
* @param metadataLocCache Metadata locale cache.
* @param ctx Context.
* @param log Logger.
- * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta and consistentId
+ * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta
+ * and consistentId
*/
BinaryMetadataFileStore(
final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache,
@@ -69,6 +85,7 @@
) throws IgniteCheckedException {
this.metadataLocCache = metadataLocCache;
this.ctx = ctx;
+ this.isPersistenceEnabled = CU.isPersistenceEnabled(ctx.config());
this.log = log;
if (!CU.isPersistenceEnabled(ctx.config()))
@@ -90,13 +107,23 @@
}
U.ensureDirectory(workDir, "directory for serialized binary metadata", log);
+
+ writer = new BinaryMetadataAsyncWriter();
+ new IgniteThread(writer).start();
+ }
+
+ /**
+ * Stops worker for async writing of binary metadata.
+ */
+ void stop() {
+ U.cancel(writer);
}
/**
* @param binMeta Binary metadata to be written to disk.
*/
void writeMetadata(BinaryMetadata binMeta) {
- if (!CU.isPersistenceEnabled(ctx.config()))
+ if (!isPersistenceEnabled)
return;
try {
@@ -118,6 +145,8 @@
U.error(log, msg);
+ writer.cancel();
+
ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw new IgniteException(msg, e);
@@ -128,7 +157,7 @@
* Restores metadata on startup of {@link CacheObjectBinaryProcessorImpl} but before starting discovery.
*/
void restoreMetadata() {
- if (!CU.isPersistenceEnabled(ctx.config()))
+ if (!isPersistenceEnabled)
return;
for (File file : workDir.listFiles()) {
@@ -145,9 +174,8 @@
}
/**
- * Checks if binary metadata for the same typeId is already presented on disk.
- * If so merges it with new metadata and stores the result.
- * Otherwise just writes new metadata.
+ * Checks if binary metadata for the same typeId is already presented on disk. If so merges it with new metadata and
+ * stores the result. Otherwise just writes new metadata.
*
* @param binMeta new binary metadata to write to disk.
*/
@@ -158,7 +186,8 @@
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(existingMeta, binMeta);
writeMetadata(mergedMeta);
- } else
+ }
+ else
writeMetadata(binMeta);
}
@@ -183,4 +212,317 @@
return null;
}
+
+ /**
+ *
+ */
+ void prepareMetadataWriting(BinaryMetadata meta, int typeVer) {
+ if (!isPersistenceEnabled)
+ return;
+
+ writer.prepareWriteFuture(meta, typeVer);
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param typeVer Type version.
+ */
+ void writeMetadataAsync(int typeId, int typeVer) {
+ if (!isPersistenceEnabled)
+ return;
+
+ writer.startWritingAsync(typeId, typeVer);
+ }
+
+ /**
+ * {@code typeVer} parameter is always non-negative except one special case
+ * (see {@link CacheObjectBinaryProcessorImpl#addMeta(int, BinaryType, boolean)} for context):
+ * if request for bin meta update arrives right at the moment when node is stopping
+ * {@link MetadataUpdateResult} of special type is generated: UPDATE_DISABLED.
+ *
+ * @param typeId
+ * @param typeVer
+ * @throws IgniteCheckedException
+ */
+ void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException {
+ if (!isPersistenceEnabled)
+ return;
+
+ writer.waitForWriteCompletion(typeId, typeVer);
+ }
+
+ /**
+ * @param typeId Binary metadata type id.
+ * @param typeVer Type version.
+ */
+ void finishWrite(int typeId, int typeVer) {
+ if (!isPersistenceEnabled)
+ return;
+
+ writer.finishWriteFuture(typeId, typeVer);
+ }
+
+ /**
+ *
+ */
+ private class BinaryMetadataAsyncWriter extends GridWorker {
+ /**
+ * Queue of write tasks submitted for execution.
+ */
+ private final BlockingQueue<WriteOperationTask> queue = new LinkedBlockingQueue<>();
+
+ /**
+ * Write operation tasks prepared for writing (but not yet submitted to execution (actual writing).
+ */
+ private final ConcurrentMap<OperationSyncKey, WriteOperationTask> preparedWriteTasks = new ConcurrentHashMap<>();
+
+ /** */
+ BinaryMetadataAsyncWriter() {
+ super(ctx.igniteInstanceName(), "binary-metadata-writer", BinaryMetadataFileStore.this.log, ctx.workersRegistry());
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param typeVer Type version.
+ */
+ synchronized void startWritingAsync(int typeId, int typeVer) {
+ if (isCancelled())
+ return;
+
+ WriteOperationTask task = preparedWriteTasks.get(new OperationSyncKey(typeId, typeVer));
+
+ if (task != null) {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Submitting task for async write for" +
+ " [typeId=" + typeId +
+ ", typeVersion=" + typeVer + ']'
+ );
+
+ queue.add(task);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Task for async write for" +
+ " [typeId=" + typeId +
+ ", typeVersion=" + typeVer + "] not found"
+ );
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void cancel() {
+ super.cancel();
+
+ queue.clear();
+
+ IgniteCheckedException err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+
+ for (Map.Entry<OperationSyncKey, WriteOperationTask> e : preparedWriteTasks.entrySet()) {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Cancelling future for write operation for" +
+ " [typeId=" + e.getKey().typeId +
+ ", typeVer=" + e.getKey().typeVer + ']'
+ );
+
+ e.getValue().future.onDone(err);
+ }
+
+ preparedWriteTasks.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ while (!isCancelled()) {
+ try {
+ body0();
+ }
+ catch (InterruptedException e) {
+ if (!isCancelled) {
+ ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));
+
+ throw e;
+ }
+ }
+ }
+ }
+
+ /** */
+ private void body0() throws InterruptedException {
+ WriteOperationTask task;
+
+ blockingSectionBegin();
+
+ try {
+ task = queue.take();
+
+ if (log.isDebugEnabled())
+ log.debug(
+ "Starting write operation for" +
+ " [typeId=" + task.meta.typeId() +
+ ", typeVer=" + task.typeVer + ']'
+ );
+
+ writeMetadata(task.meta);
+ }
+ finally {
+ blockingSectionEnd();
+ }
+
+ finishWriteFuture(task.meta.typeId(), task.typeVer);
+ }
+
+ /**
+ * @param typeId Binary metadata type id.
+ * @param typeVer Type version.
+ */
+ void finishWriteFuture(int typeId, int typeVer) {
+ WriteOperationTask task = preparedWriteTasks.remove(new OperationSyncKey(typeId, typeVer));
+
+ if (task != null) {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Future for write operation for" +
+ " [typeId=" + typeId +
+ ", typeVer=" + typeVer + ']' +
+ " completed."
+ );
+
+ task.future.onDone();
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Future for write operation for" +
+ " [typeId=" + typeId +
+ ", typeVer=" + typeVer + ']' +
+ " not found."
+ );
+ }
+ }
+
+ /**
+ * @param meta Binary metadata.
+ * @param typeVer Type version.
+ */
+ synchronized void prepareWriteFuture(BinaryMetadata meta, int typeVer) {
+ if (isCancelled())
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug(
+ "Prepare task for async write for" +
+ "[typeName=" + meta.typeName() +
+ ", typeId=" + meta.typeId() +
+ ", typeVersion=" + typeVer + ']'
+ );
+
+ preparedWriteTasks.putIfAbsent(new OperationSyncKey(meta.typeId(), typeVer), new WriteOperationTask(meta, typeVer));
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param typeVer Type version.
+ * @throws IgniteCheckedException If write operation failed.
+ */
+ void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException {
+ //special case, see javadoc of {@link BinaryMetadataFileStore#waitForWriteCompletion}
+ if (typeVer == -1) {
+ if (log.isDebugEnabled())
+ log.debug("No need to wait for " + typeId + ", negative typeVer was passed.");
+
+ return;
+ }
+
+ WriteOperationTask task = preparedWriteTasks.get(new OperationSyncKey(typeId, typeVer));
+
+ if (task != null) {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Waiting for write completion of" +
+ " [typeId=" + typeId +
+ ", typeVer=" + typeVer + "]"
+ );
+
+ try {
+ task.future.get();
+ }
+ finally {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Released for write completion of" +
+ " [typeId=" + typeId +
+ ", typeVer=" + typeVer + ']'
+ );
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Task for async write for" +
+ " [typeId=" + typeId +
+ ", typeVersion=" + typeVer + "] not found"
+ );
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static final class WriteOperationTask {
+ /** */
+ private final BinaryMetadata meta;
+
+ /** */
+ private final int typeVer;
+
+ /** */
+ private final GridFutureAdapter future = new GridFutureAdapter();
+
+ /** */
+ private WriteOperationTask(BinaryMetadata meta, int ver) {
+ this.meta = meta;
+ typeVer = ver;
+ }
+ }
+
+ /**
+ *
+ */
+ private static final class OperationSyncKey {
+ /** */
+ private final int typeId;
+
+ /** */
+ private final int typeVer;
+
+ /** */
+ private OperationSyncKey(int typeId, int typeVer) {
+ this.typeId = typeId;
+ this.typeVer = typeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * typeId + typeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof OperationSyncKey))
+ return false;
+
+ OperationSyncKey that = (OperationSyncKey)obj;
+
+ return (that.typeId == typeId) && (that.typeVer == typeVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(OperationSyncKey.class, this);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 3a95586..7b89632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -142,7 +142,7 @@
if (clientNode)
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+ DiscoveryEvent evt0 = (DiscoveryEvent)evt;
if (!ctx.isStopping()) {
for (ClientMetadataRequestFuture fut : clientReqSyncMap.values())
@@ -198,13 +198,13 @@
.map(BinaryMetadataHolder::metadata)
.orElse(null);
- Set<Integer> changedSchemas = new LinkedHashSet<>();
+ Set<Integer> changedSchemas = new LinkedHashSet<>();
//Ensure after putting pending future, metadata still has difference.
BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, changedSchemas);
if (mergedMeta == oldMeta) {
- resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ resFut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
return null;
}
@@ -282,13 +282,14 @@
BinaryMetadataHolder holder = metaLocCache.get(typeId);
if (holder.acceptedVersion() >= ver)
- resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ resFut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
return resFut;
}
/**
* Await specific schema update.
+ *
* @param typeId Type id.
* @param schemaId Schema id.
* @return Future which will be completed when schema is received.
@@ -303,8 +304,8 @@
}
/**
- * Allows client node to request latest version of binary metadata for a given typeId from the cluster
- * in case client is able to detect that it has obsolete metadata in its local cache.
+ * Allows client node to request latest version of binary metadata for a given typeId from the cluster in case
+ * client is able to detect that it has obsolete metadata in its local cache.
*
* @param typeId ID of binary type.
* @return future to wait for request arrival on.
@@ -354,7 +355,8 @@
private final class MetadataUpdateProposedListener implements CustomEventListener<MetadataUpdateProposedMessage> {
/** {@inheritDoc} */
- @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) {
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+ MetadataUpdateProposedMessage msg) {
if (log.isDebugEnabled())
log.debug("Received MetadataUpdateProposedListener [typeId=" + msg.typeId() +
", typeName=" + msg.metadata().typeName() +
@@ -392,10 +394,10 @@
if (log.isDebugEnabled())
log.debug("Versions are stamped on coordinator" +
- " [typeId=" + typeId +
- ", changedSchemas=" + changedSchemas +
- ", pendingVer=" + pendingVer +
- ", acceptedVer=" + acceptedVer + "]"
+ " [typeId=" + typeId +
+ ", changedSchemas=" + changedSchemas +
+ ", pendingVer=" + pendingVer +
+ ", acceptedVer=" + acceptedVer + "]"
);
msg.metadata(mergedMeta);
@@ -429,13 +431,13 @@
holder = metaLocCache.get(typeId);
if (obsoleteUpdate(
- holder.pendingVersion(),
- holder.acceptedVersion(),
- pendingVer,
- acceptedVer)) {
+ holder.pendingVersion(),
+ holder.acceptedVersion(),
+ pendingVer,
+ acceptedVer)) {
obsoleteUpd = true;
- fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
break;
}
@@ -457,6 +459,8 @@
log.debug("Updated metadata on originating node: " + newHolder);
metaLocCache.put(typeId, newHolder);
+
+ metadataFileStore.prepareMetadataWriting(msg.metadata(), pendingVer);
}
}
}
@@ -479,13 +483,14 @@
holder = metaLocCache.get(typeId);
if (obsoleteUpdate(
- holder.pendingVersion(),
- holder.acceptedVersion(),
- pendingVer,
- acceptedVer))
+ holder.pendingVersion(),
+ holder.acceptedVersion(),
+ pendingVer,
+ acceptedVer))
break;
- } while (!metaLocCache.replace(typeId, holder, newHolder));
+ }
+ while (!metaLocCache.replace(typeId, holder, newHolder));
}
}
else {
@@ -494,6 +499,8 @@
", changedSchemas=" + changedSchemas + ']');
metaLocCache.put(typeId, newHolder);
+
+ metadataFileStore.prepareMetadataWriting(mergedMeta, pendingVer);
}
}
catch (BinaryObjectException ignored) {
@@ -537,7 +544,8 @@
private final class MetadataUpdateAcceptedListener implements CustomEventListener<MetadataUpdateAcceptedMessage> {
/** {@inheritDoc} */
- @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateAcceptedMessage msg) {
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+ MetadataUpdateAcceptedMessage msg) {
if (log.isDebugEnabled())
log.debug("Received MetadataUpdateAcceptedMessage " + msg);
@@ -554,7 +562,7 @@
if (clientNode) {
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(),
- holder.pendingVersion(), newAcceptedVer);
+ holder.pendingVersion(), newAcceptedVer);
do {
holder = metaLocCache.get(typeId);
@@ -577,10 +585,12 @@
//this is duplicate ack
msg.duplicated(true);
+ metadataFileStore.finishWrite(typeId, newAcceptedVer);
+
return;
}
- metadataFileStore.writeMetadata(holder.metadata());
+ metadataFileStore.writeMetadataAsync(typeId, newAcceptedVer);
metaLocCache.put(typeId, new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
}
@@ -612,13 +622,13 @@
}
if (fut != null)
- fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(newAcceptedVer));
}
}
/**
- * Future class responsible for blocking threads until particular events with metadata updates happen,
- * e.g. arriving {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
+ * Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving
+ * {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
*/
public final class MetadataUpdateResultFuture extends GridFutureAdapter<MetadataUpdateResult> {
/** */
@@ -657,6 +667,11 @@
this.key = key;
}
+ /** */
+ public int typeVersion() {
+ return key.ver;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MetadataUpdateResultFuture.class, this);
@@ -664,8 +679,8 @@
}
/**
- * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages
- * to {@link MetadataUpdateResultFuture}s other threads may be waiting on.
+ * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages to {@link MetadataUpdateResultFuture}s
+ * other threads may be waiting on.
*/
private static final class SyncKey {
/** */
@@ -710,7 +725,7 @@
if (!(o instanceof SyncKey))
return false;
- SyncKey that = (SyncKey) o;
+ SyncKey that = (SyncKey)o;
return (typeId == that.typeId) && (ver == that.ver);
}
@@ -739,7 +754,7 @@
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MetadataRequestMessage : msg;
- MetadataRequestMessage msg0 = (MetadataRequestMessage) msg;
+ MetadataRequestMessage msg0 = (MetadataRequestMessage)msg;
int typeId = msg0.typeId();
@@ -783,7 +798,7 @@
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MetadataResponseMessage : msg;
- MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;
+ MetadataResponseMessage msg0 = (MetadataResponseMessage)msg;
int typeId = msg0.typeId();
@@ -795,7 +810,7 @@
return;
if (msg0.metadataNotFound()) {
- fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
return;
}
@@ -811,16 +826,16 @@
// typeId metadata cannot be removed after initialization.
if (obsoleteUpdate(
- oldHolder.pendingVersion(),
- oldHolder.acceptedVersion(),
- newHolder.pendingVersion(),
- newHolder.acceptedVersion()))
+ oldHolder.pendingVersion(),
+ oldHolder.acceptedVersion(),
+ newHolder.pendingVersion(),
+ newHolder.acceptedVersion()))
break;
}
while (!metaLocCache.replace(typeId, oldHolder, newHolder));
}
- fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
}
catch (IgniteCheckedException e) {
fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e)));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ac2d237..b5dabcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -324,6 +324,9 @@
@Override public void stop(boolean cancel) {
if (transport != null)
transport.stop();
+
+ if (metadataFileStore != null)
+ metadataFileStore.stop();
}
/** {@inheritDoc} */
@@ -575,6 +578,8 @@
if (res.rejected())
throw res.error();
+ else if (!ctx.clientNode())
+ metadataFileStore.waitForWriteCompletion(typeId, res.typeVersion());
}
catch (IgniteCheckedException e) {
IgniteCheckedException ex = e;
@@ -649,6 +654,30 @@
}
/**
+ * Forces caller thread to wait for binary metadata write operation for given type ID.
+ *
+ * In case of in-memory mode this method becomes a No-op as no binary metadata is written to disk in this mode.
+ *
+ * @param typeId ID of binary type to wait for metadata write operation.
+ */
+ public void waitMetadataWriteIfNeeded(final int typeId) {
+ if (metadataFileStore == null)
+ return;
+
+ BinaryMetadataHolder hldr = metadataLocCache.get(typeId);
+
+ if (hldr != null) {
+ try {
+ metadataFileStore.waitForWriteCompletion(typeId, hldr.pendingVersion());
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
+ ", typeVer=" + hldr.acceptedVersion() + ']', e);
+ }
+ }
+ }
+
+ /**
* @param typeId Type ID.
* @return Metadata.
* @throws IgniteException In case of error.
@@ -691,6 +720,17 @@
// No-op.
}
}
+ else if (metadataFileStore != null) {
+ try {
+ metadataFileStore.waitForWriteCompletion(typeId, holder.pendingVersion());
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
+ ", typeVer=" + holder.acceptedVersion() + ']', e);
+
+ return null;
+ }
+ }
return holder.metadata();
}
@@ -805,6 +845,18 @@
}
}
+ if (holder != null && metadataFileStore != null) {
+ try {
+ metadataFileStore.waitForWriteCompletion(typeId, holder.pendingVersion());
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
+ ", typeVer=" + holder.acceptedVersion() + ']', e);
+
+ return null;
+ }
+ }
+
return holder != null ? holder.metadata().wrap(binaryCtx) : null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
index 6c299ab..f6d0d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
@@ -28,13 +28,18 @@
/** */
private final BinaryObjectException error;
+ /** */
+ private final int typeVer;
+
/**
* @param resType Response type.
* @param error Error.
+ * @param typeVer Accepted version of updated type.
*/
- private MetadataUpdateResult(ResultType resType, BinaryObjectException error) {
+ private MetadataUpdateResult(ResultType resType, BinaryObjectException error, int typeVer) {
this.resType = resType;
this.error = error;
+ this.typeVer = typeVer;
}
/**
@@ -51,11 +56,16 @@
return error;
}
+ /** */
+ int typeVersion() {
+ return typeVer;
+ }
+
/**
- *
+ * @param typeVer Accepted version of updated BinaryMetadata type or <code>-1</code> if not applicable.
*/
- static MetadataUpdateResult createSuccessfulResult() {
- return new MetadataUpdateResult(ResultType.SUCCESS, null);
+ static MetadataUpdateResult createSuccessfulResult(int typeVer) {
+ return new MetadataUpdateResult(ResultType.SUCCESS, null, typeVer);
}
/**
@@ -64,14 +74,14 @@
static MetadataUpdateResult createFailureResult(BinaryObjectException err) {
assert err != null;
- return new MetadataUpdateResult(ResultType.REJECT, err);
+ return new MetadataUpdateResult(ResultType.REJECT, err, -1);
}
/**
*
*/
static MetadataUpdateResult createUpdateDisabledResult() {
- return new MetadataUpdateResult(ResultType.UPDATE_DISABLED, null);
+ return new MetadataUpdateResult(ResultType.UPDATE_DISABLED, null, -1);
}
/**
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index db821f1..6abb02e 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -123,6 +123,7 @@
org.apache.ignite.client.SslMode
org.apache.ignite.client.SslProtocol
org.apache.ignite.cluster.ClusterGroupEmptyException
+org.apache.ignite.cluster.ClusterState
org.apache.ignite.cluster.ClusterTopologyException
org.apache.ignite.compute.ComputeExecutionRejectedException
org.apache.ignite.compute.ComputeJob
@@ -1313,6 +1314,7 @@
org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$BaselineStateAndHistoryData
org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$CheckGlobalStateComputeRequest
org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$ClientChangeGlobalStateComputeRequest
+org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$ClientSetGlobalStateComputeRequest
org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustStatus$TaskState
org.apache.ignite.internal.processors.configuration.distributed.DetachedPropertyException
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor$AllowableAction
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
new file mode 100644
index 0000000..219f9ae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.suppressException;
+
+/**
+ * Tests for verification of binary metadata async writing to disk.
+ */
+public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractTest {
+ /** */
+ private static final AtomicReference<CountDownLatch> fileWriteLatchRef = new AtomicReference<>(null);
+
+ /** */
+ private FileIOFactory specialFileIOFactory;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (igniteInstanceName.contains("client")) {
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ )
+ .setFileIOFactory(
+ specialFileIOFactory != null ? specialFileIOFactory : new RandomAccessFileIOFactory()
+ )
+ );
+
+ cfg.setCacheConfiguration(
+ new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 16))
+ );
+
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ if (fileWriteLatchRef != null && fileWriteLatchRef.get() != null)
+ fileWriteLatchRef.get().countDown();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Verifies that registration of new binary meta does not block discovery thread
+ * and new node can join the cluster when binary metadata is in the process of writing.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeJoinIsNotBlockedByAsyncMetaWriting() throws Exception {
+ final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+
+ Ignite ig = startGrid(0);
+ ig.cluster().active(true);
+
+ IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+ GridTestUtils.runAsync(() -> cache.put(0, new TestAddress(0, "USA", "NYC", "Park Ave")));
+
+ specialFileIOFactory = null;
+
+ startGrid(1);
+ waitForTopology(2);
+
+ fileWriteLatch.countDown();
+ }
+
+ /**
+ * Verifies that metadata is restored on node join even if it was deleted when the node was down.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryMetadataIsRestoredAfterDeletionOnNodeJoin() throws Exception {
+ IgniteEx ig0 = startGrid(0);
+ IgniteEx ig1 = startGrid(1);
+
+ ig0.cluster().active(true);
+
+ IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
+ int key = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
+ cache.put(key, new TestAddress(0, "USA", "NYC", "Park Ave"));
+
+ String ig1ConsId = ig1.localNode().consistentId().toString();
+ stopGrid(1);
+ cleanBinaryMetaFolderForNode(ig1ConsId);
+
+ ig1 = startGrid(1);
+ stopGrid(0);
+
+ cache = ig1.cache(DEFAULT_CACHE_NAME);
+ TestAddress addr = (TestAddress)cache.get(key);
+ assertNotNull(addr);
+ assertEquals("USA", addr.country);
+ }
+
+ /**
+ * Verifies that request adding/modifying binary metadata (e.g. put to cache a new value)
+ * is blocked until write to disk is finished.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThreadRequestingUpdateBlockedTillWriteCompletion() throws Exception {
+ final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+
+ Ignite ig = startGrid();
+
+ ig.cluster().active(true);
+
+ IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+ GridTestUtils.runAsync(() -> cache.put(1, new TestPerson(0, "John", "Oliver")));
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ fileWriteLatch.countDown();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> cache.size(CachePeekMode.PRIMARY) == 1, 10_000));
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testDiscoveryIsNotBlockedOnMetadataWrite() throws Exception {
+ final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+
+ IgniteKernal ig = (IgniteKernal)startGrid();
+
+ ig.cluster().active(true);
+
+ IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+ TestAddress addr = new TestAddress(0, "RUS", "Spb", "Nevsky");
+ TestPerson person = new TestPerson(0, "John", "Oliver");
+ person.address(addr);
+ TestAccount account = new TestAccount(person, 0, 1000);
+
+ GridTestUtils.runAsync(() -> cache.put(0, addr));
+ GridTestUtils.runAsync(() -> cache.put(0, person));
+ GridTestUtils.runAsync(() -> cache.put(0, account));
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ Map locCache = GridTestUtils.getFieldValue(
+ (CacheObjectBinaryProcessorImpl)ig.context().cacheObjects(), "metadataLocCache");
+
+ assertTrue(GridTestUtils.waitForCondition(() -> locCache.size() == 3, 5_000));
+
+ fileWriteLatch.countDown();
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeIsStoppedOnExceptionDuringStoringMetadata() throws Exception {
+ IgniteEx ig0 = startGrid(0);
+
+ specialFileIOFactory = new FailingFileIOFactory(new RandomAccessFileIOFactory());
+
+ IgniteEx ig1 = startGrid(1);
+
+ ig0.cluster().active(true);
+
+ int ig1Key = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
+
+ IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
+
+ cache.put(ig1Key, new TestAddress(0, "USA", "NYC", "6th Ave"));
+
+ waitForTopology(1);
+ }
+
+ /**
+ * Verifies that no updates are applied to cache on node until all metadata write operations
+ * for updated type are fully written to disk.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testParallelUpdatesToBinaryMetadata() throws Exception {
+ IgniteEx ig0 = startGrid(0);
+
+ final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+ IgniteEx ig1 = startGrid(1);
+
+ specialFileIOFactory = null;
+ IgniteEx ig2 = startGrid(2);
+
+ ig0.cluster().active(true);
+
+ int key0 = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
+ int key1 = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode(), key0);
+
+ assertTrue(key0 != key1);
+
+ GridTestUtils.runAsync(() -> ig0.cache(DEFAULT_CACHE_NAME).put(key0, new TestAddress(key0, "Russia", "Moscow")));
+ GridTestUtils.runAsync(() -> ig2.cache(DEFAULT_CACHE_NAME).put(key1, new TestAddress(key1, "USA", "NYC", "Park Ave")));
+
+ assertEquals(0, ig0.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY));
+
+ fileWriteLatch.countDown();
+
+ assertTrue(GridTestUtils.
+ waitForCondition(() -> ig0.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY) == 2, 10_000));
+
+ stopGrid(0);
+ stopGrid(2);
+
+ IgniteCache<Object, Object> cache = ig1.cache(DEFAULT_CACHE_NAME);
+ TestAddress addr0 = (TestAddress)cache.get(key0);
+ TestAddress addr1 = (TestAddress)cache.get(key1);
+
+ assertEquals("Russia", addr0.country);
+ assertEquals("USA", addr1.country);
+ }
+
+ /**
+ * Verifies that put(key) method called from client on cache in FULL_SYNC mode returns only when
+ * all affinity nodes for this key finished writing binary metadata.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPutRequestFromClientIsBlockedIfBinaryMetaWriteIsHanging() throws Exception {
+ String cacheName = "testCache";
+
+ CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+ .setBackups(2)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setWriteSynchronizationMode(FULL_SYNC);
+
+ IgniteEx ig0 = startGrid(0);
+ IgniteEx cl0 = startGrid("client0");
+
+ CountDownLatch fileWriteLatch = new CountDownLatch(1);
+ IgniteEx ig1 = startGrid(1);
+
+ ig1.context().discovery().setCustomEventListener(
+ MetadataUpdateAcceptedMessage.class,
+ (topVer, snd, msg) -> suppressException(fileWriteLatch::await)
+ );
+
+ ListeningTestLogger listeningLog = new ListeningTestLogger(true, log);
+ LogListener waitingForWriteLsnr = LogListener.matches("Waiting for write completion of").build();
+ listeningLog.registerListener(waitingForWriteLsnr);
+
+ startGrid(2);
+
+ listeningLog = null;
+
+ ig0.cluster().active(true);
+ IgniteCache cache0 = cl0.createCache(testCacheCfg);
+
+ int key0 = findAffinityKeyForNode(ig0.affinity(cacheName), ig0.localNode());
+
+ AtomicBoolean putFinished = new AtomicBoolean(false);
+
+ GridTestUtils.runAsync(() -> {
+ cache0.put(key0, new TestAddress(key0, "Russia", "Saint-Petersburg"));
+
+ putFinished.set(true);
+ });
+
+ assertFalse(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+
+ fileWriteLatch.countDown();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+ }
+
+ /**
+ * Verifies that put(key) method called from non-affinity server on cache in FULL_SYNC mode returns only when
+ * all affinity nodes for this key finished writing binary metadata.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPutRequestFromServerIsBlockedIfBinaryMetaWriteIsHanging() throws Exception {
+ putRequestFromServer(true);
+ }
+
+ /**
+ * Verifies that put from client to ATOMIC cache in PRIMARY_SYNC mode is not blocked
+ * if binary metadata async write operation hangs on backup node and not on primary.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPutRequestFromClientCompletesIfMetadataWriteHangOnBackup() throws Exception {
+ String cacheName = "testCache";
+
+ CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+ .setBackups(2)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setWriteSynchronizationMode(PRIMARY_SYNC);
+
+ IgniteEx ig0 = startGrid(0);
+
+ CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+ IgniteEx ig1 = startGrid(1);
+
+ specialFileIOFactory = null;
+ IgniteEx ig2 = startGrid(2);
+
+ ig0.cluster().active(true);
+
+ IgniteEx cl0 = startGrid("client0");
+
+ IgniteCache cache = cl0.createCache(testCacheCfg);
+ Affinity<Object> aff = cl0.affinity(cacheName);
+
+ AtomicBoolean putCompleted = new AtomicBoolean(false);
+ int key = findAffinityKeyForNode(aff, ig0.localNode());
+ GridTestUtils.runAsync(() -> {
+ cache.put(key, new TestAddress(key, "USA", "NYC"));
+
+ putCompleted.set(true);
+ });
+
+ assertTrue(GridTestUtils.waitForCondition(() -> putCompleted.get(), 5_000));
+
+ fileWriteLatch.countDown();
+ }
+
+ /**
+ * Verifies that put from server to ATOMIC cache in PRIMARY_SYNC mode is not blocked
+ * if binary metadata async write operation hangs on backup node and not on primary.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPutRequestFromServerCompletesIfMetadataWriteHangOnBackup() throws Exception {
+ putRequestFromServer(false);
+ }
+
+ /**
+ * Verifies that metadata write hanging on non-affinity node doesn't block on-going put operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPutRequestFromClientCompletesIfMetadataWriteHangOnNonAffinityNode() throws Exception {
+ String cacheName = "testCache";
+
+ CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+ .setBackups(1)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setWriteSynchronizationMode(FULL_SYNC);
+
+ IgniteEx ig0 = startGrid(0);
+
+ CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+ IgniteEx ig1 = startGrid(1);
+
+ specialFileIOFactory = null;
+ IgniteEx ig2 = startGrid(2);
+
+ IgniteEx cl0 = startGrid("client0");
+ cl0.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache cache = cl0.createCache(testCacheCfg);
+ Affinity<Object> aff = cl0.affinity(cacheName);
+ int nonAffKey = findNonAffinityKeyForNode(aff, ig1.localNode(), 0);
+
+ AtomicBoolean putCompleted = new AtomicBoolean(false);
+
+ GridTestUtils.runAsync(() -> {
+ cache.put(nonAffKey, new TestAddress(nonAffKey, "USA", "NYC"));
+
+ putCompleted.set(true);
+ });
+
+ assertTrue(GridTestUtils.waitForCondition(() -> putCompleted.get(), 5_000));
+
+ //internal map in BinaryMetadataFileStore with futures awaiting write operations
+ Map map = GridTestUtils.getFieldValue(
+ (CacheObjectBinaryProcessorImpl)ig1.context().cacheObjects(), "metadataFileStore", "writer", "preparedWriteTasks");
+
+ assertTrue(!map.isEmpty());
+
+ fileWriteLatch.countDown();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> map.isEmpty(), 5_000));
+ }
+
+ /**
+ * @param expectedBlocked
+ * @throws Exception
+ */
+ private void putRequestFromServer(boolean expectedBlocked) throws Exception {
+ String cacheName = "testCache";
+
+ CacheWriteSynchronizationMode syncMode = expectedBlocked ? FULL_SYNC : PRIMARY_SYNC;
+
+ CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+ .setBackups(2)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setWriteSynchronizationMode(syncMode);
+
+ IgniteEx ig0 = startGrid(0);
+
+ startGrid(1);
+ final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+ IgniteEx ig2 = startGrid(2);
+
+ specialFileIOFactory = null;
+ startGrid(3);
+
+ ig0.cluster().active(true);
+ IgniteCache cache = ig0.createCache(testCacheCfg);
+
+ int key = 0;
+ Affinity<Object> aff = ig0.affinity(cacheName);
+
+ while (true) {
+ key = findNonAffinityKeyForNode(aff, ig0.localNode(), key);
+
+ if (aff.isBackup(ig2.localNode(), key))
+ break;
+ else
+ key++;
+ }
+
+ AtomicBoolean putFinished = new AtomicBoolean(false);
+
+ int key0 = key;
+ GridTestUtils.runAsync(() -> {
+ cache.put(key0, new TestAddress(key0, "USA", "NYC"));
+
+ putFinished.set(true);
+ });
+
+ if (expectedBlocked) {
+ assertFalse(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+
+ fileWriteLatch.countDown();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+ }
+ else
+ assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+ }
+
+ /**
+ * Initializes special FileIOFactory emulating slow write to disk.
+ *
+ * @return Latch to release write operation.
+ */
+ private CountDownLatch initSlowFileIOFactory() {
+ CountDownLatch cdl = new CountDownLatch(1);
+
+ specialFileIOFactory = new SlowFileIOFactory(new RandomAccessFileIOFactory());
+ fileWriteLatchRef.set(cdl);
+
+ return cdl;
+ }
+
+ /**
+ * Deletes directory with persisted binary metadata for a node with given Consistent ID.
+ */
+ private void cleanBinaryMetaFolderForNode(String consId) throws IgniteCheckedException {
+ String dfltWorkDir = U.defaultWorkDirectory();
+ File metaDir = U.resolveWorkDirectory(dfltWorkDir, "binary_meta", false);
+
+ for (File subDir : metaDir.listFiles()) {
+ if (subDir.getName().contains(consId)) {
+ U.delete(subDir);
+
+ return;
+ }
+ }
+ }
+
+ /** Finds a key that target node is neither primary or backup. */
+ private int findNonAffinityKeyForNode(Affinity aff, ClusterNode targetNode, int startFrom) {
+ int key = startFrom;
+
+ while (true) {
+ if (!aff.isPrimaryOrBackup(targetNode, key))
+ return key;
+
+ key++;
+ }
+ }
+
+ /** Finds a key that target node is a primary node for. */
+ private int findAffinityKeyForNode(Affinity aff, ClusterNode targetNode, Integer... excludeKeys) {
+ int key = 0;
+
+ while (true) {
+ if (aff.isPrimary(targetNode, key)
+ && (excludeKeys != null ? !Arrays.asList(excludeKeys).contains(Integer.valueOf(key)) : true))
+ return key;
+
+ key++;
+ }
+ }
+
+ /** */
+ static final class TestPerson {
+ /** */
+ private final int id;
+
+ /** */
+ private final String firstName;
+
+ /** */
+ private final String surname;
+
+ /** */
+ private TestAddress addr;
+
+ /** */
+ TestPerson(int id, String firstName, String surname) {
+ this.id = id;
+ this.firstName = firstName;
+ this.surname = surname;
+ }
+
+ /** */
+ void address(TestAddress addr) {
+ this.addr = addr;
+ }
+ }
+
+ /** */
+ static final class TestAddress {
+ /** */
+ private final int id;
+
+ /** */
+ private final String country;
+
+ /** */
+ private final String city;
+
+ /** */
+ private final String address;
+
+ /** */
+ TestAddress(int id, String country, String city) {
+ this.id = id;
+ this.country = country;
+ this.city = city;
+ this.address = null;
+ }
+
+ /** */
+ TestAddress(int id, String country, String city, String street) {
+ this.id = id;
+ this.country = country;
+ this.city = city;
+ this.address = street;
+ }
+ }
+
+ /** */
+ static final class TestAccount {
+ /** */
+ private final TestPerson person;
+
+ /** */
+ private final int accountId;
+
+ /** */
+ private final long accountBalance;
+
+ /** */
+ TestAccount(TestPerson person, int id, long balance) {
+ this.person = person;
+ accountId = id;
+ accountBalance = balance;
+ }
+ }
+
+ /** */
+ private static boolean isBinaryMetaFile(File file) {
+ return file.getPath().contains("binary_meta");
+ }
+
+ /** */
+ static final class SlowFileIOFactory implements FileIOFactory {
+ /** */
+ private final FileIOFactory delegateFactory;
+
+ /**
+ * @param delegateFactory Delegate factory.
+ */
+ SlowFileIOFactory(FileIOFactory delegateFactory) {
+ this.delegateFactory = delegateFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO delegate = delegateFactory.create(file, modes);
+
+ if (isBinaryMetaFile(file))
+ return new SlowFileIO(delegate, fileWriteLatchRef.get());
+
+ return delegate;
+ }
+ }
+
+ /** */
+ static class SlowFileIO extends FileIODecorator {
+ /** */
+ private final CountDownLatch fileWriteLatch;
+
+ /**
+ * @param delegate File I/O delegate
+ */
+ public SlowFileIO(FileIO delegate, CountDownLatch fileWriteLatch) {
+ super(delegate);
+
+ this.fileWriteLatch = fileWriteLatch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(byte[] buf, int off, int len) throws IOException {
+ try {
+ fileWriteLatch.await();
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+
+ return super.write(buf, off, len);
+ }
+ }
+
+ /** */
+ static final class FailingFileIOFactory implements FileIOFactory {
+ /** */
+ private final FileIOFactory delegateFactory;
+
+ /**
+ * @param factory Delegate factory.
+ */
+ FailingFileIOFactory(FileIOFactory factory) {
+ delegateFactory = factory;
+ }
+
+ /** {@inheritDoc}*/
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO delegate = delegateFactory.create(file, modes);
+
+ if (isBinaryMetaFile(file))
+ return new FailingFileIO(delegate);
+
+ return delegate;
+ }
+ }
+
+ /** */
+ static final class FailingFileIO extends FileIODecorator {
+ /**
+ * @param delegate File I/O delegate
+ */
+ public FailingFileIO(FileIO delegate) {
+ super(delegate);
+ }
+
+ /** {@inheritDoc}*/
+ @Override public int write(byte[] buf, int off, int len) throws IOException {
+ throw new IOException("Error occured during write of binary metadata");
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 7a26606..2384b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -17,6 +17,11 @@
package org.apache.ignite.testframework;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -66,11 +71,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -2380,4 +2380,11 @@
}
}
}
+
+ /**
+ * @param runnableX Runnable with exception.
+ */
+ public static void suppressException(RunnableX runnableX) {
+ runnableX.run();
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 539f5aa..d58948d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -18,6 +18,7 @@
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheHistoricalRebalancingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataAsyncWritingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataOnClusterRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest;
@@ -80,6 +81,7 @@
IgnitePdsCacheDestroyDuringCheckpointTest.class,
IgnitePdsBinaryMetadataOnClusterRestartTest.class,
+ IgnitePdsBinaryMetadataAsyncWritingTest.class,
IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class,
IgnitePdsThreadInterruptionTest.class,
IgnitePdsBinarySortObjectFieldsTest.class,