/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.binary;
import java.io.File;
import java.io.FileInputStream;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
/**
* Class that handles saving/restoring binary metadata to/from disk.
*
* The current implementation needs to be rewritten as it issues I/O operations from the discovery thread, which may
* lead to segmentation of nodes from the cluster.
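*
* The asynchronous write path works in three steps: {@link #prepareMetadataWriting(BinaryMetadata, int)} registers
* a write future, {@link #writeMetadataAsync(int, int)} submits the prepared task to the background writer, and
* {@link #waitForWriteCompletion(int, int)} blocks until that future is completed either by the writer thread or by
* {@link #finishWrite(int, int)}.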
*/
class BinaryMetadataFileStore {
/** Link to the resolved binary metadata directory. {@code null} in non-persistent mode. */
private File workDir;
/** Local cache of binary metadata (type ID -> metadata holder). */
private final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache;
/** Kernal context. */
private final GridKernalContext ctx;
/** Whether native persistence is enabled. */
private final boolean isPersistenceEnabled;
/** File I/O factory. */
private FileIOFactory fileIOFactory;
/** Logger. */
private final IgniteLogger log;
/** Worker performing asynchronous writes of binary metadata. */
private BinaryMetadataAsyncWriter writer;
/**
* @param metadataLocCache Local metadata cache.
* @param ctx Context.
* @param log Logger.
* @param binaryMetadataFileStoreDir Path to the binary metadata store configured by the user; should include
* {@code binary_meta} and the consistent ID folder.
*/
BinaryMetadataFileStore(
final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache,
final GridKernalContext ctx,
final IgniteLogger log,
@Nullable final File binaryMetadataFileStoreDir
) throws IgniteCheckedException {
this.metadataLocCache = metadataLocCache;
this.ctx = ctx;
this.isPersistenceEnabled = CU.isPersistenceEnabled(ctx.config());
this.log = log;
if (!isPersistenceEnabled)
return;
fileIOFactory = ctx.config().getDataStorageConfiguration().getFileIOFactory();
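// Use the directory configured by the user if provided; otherwise resolve <workDir>/binary_meta/<PDS folder name>.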
if (binaryMetadataFileStoreDir != null)
workDir = binaryMetadataFileStoreDir;
else {
final String subFolder = ctx.pdsFolderResolver().resolveFolders().folderName();
workDir = new File(U.resolveWorkDirectory(
ctx.config().getWorkDirectory(),
"binary_meta",
false
),
subFolder);
}
U.ensureDirectory(workDir, "directory for serialized binary metadata", log);
writer = new BinaryMetadataAsyncWriter();
new IgniteThread(writer).start();
}
/**
* Stops worker for async writing of binary metadata.
*/
void stop() {
U.cancel(writer);
}
/**
* @param binMeta Binary metadata to be written to disk.
*/
void writeMetadata(BinaryMetadata binMeta) {
if (!isPersistenceEnabled)
return;
try {
File file = new File(workDir, binMeta.typeId() + ".bin");
byte[] marshalled = U.marshal(ctx, binMeta);
try (final FileIO out = fileIOFactory.create(file)) {
int left = marshalled.length;
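// writeFully() returns the number of bytes written; repeat until the whole marshalled array has been flushed.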
while ((left -= out.writeFully(marshalled, 0, Math.min(marshalled.length, left))) > 0)
;
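// Force the written data to disk so the metadata survives a node failure.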
out.force();
}
}
catch (Exception e) {
final String msg = "Failed to save metadata for typeId: " + binMeta.typeId() +
"; exception was thrown: " + e.getMessage();
U.error(log, msg);
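// A failed metadata write is treated as critical: stop the async writer and notify the failure processor before rethrowing.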
writer.cancel();
ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw new IgniteException(msg, e);
}
}
/**
* Restores metadata on startup of {@link CacheObjectBinaryProcessorImpl} but before starting discovery.
*/
void restoreMetadata() {
if (!isPersistenceEnabled)
return;
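// Load every previously persisted metadata file from the work directory into the local cache.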
for (File file : workDir.listFiles()) {
try (FileInputStream in = new FileInputStream(file)) {
BinaryMetadata meta = U.unmarshal(ctx.config().getMarshaller(), in, U.resolveClassLoader(ctx.config()));
metadataLocCache.put(meta.typeId(), new BinaryMetadataHolder(meta, 0, 0));
}
catch (Exception e) {
U.warn(log, "Failed to restore metadata from file: " + file.getName() +
"; exception was thrown: " + e.getMessage());
}
}
}
/**
* Checks if binary metadata for the same typeId is already present on disk. If so, merges it with the new metadata
* and stores the result. Otherwise just writes the new metadata.
*
* @param binMeta New binary metadata to write to disk.
*/
void mergeAndWriteMetadata(BinaryMetadata binMeta) {
BinaryMetadata existingMeta = readMetadata(binMeta.typeId());
if (existingMeta != null) {
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(existingMeta, binMeta);
writeMetadata(mergedMeta);
}
else
writeMetadata(binMeta);
}
/**
* Reads binary metadata for given typeId.
*
* @param typeId typeId of the BinaryMetadata to be read.
* @return Restored binary metadata or {@code null} if the file does not exist or could not be read.
*/
private BinaryMetadata readMetadata(int typeId) {
File file = new File(workDir, Integer.toString(typeId) + ".bin");
if (!file.exists())
return null;
try (FileInputStream in = new FileInputStream(file)) {
return U.unmarshal(ctx.config().getMarshaller(), in, U.resolveClassLoader(ctx.config()));
}
catch (Exception e) {
U.warn(log, "Failed to restore metadata from file: " + file.getName() +
"; exception was thrown: " + e.getMessage());
}
return null;
}
/**
* Prepares a write future for the given metadata so that a subsequent asynchronous write can be awaited.
*
* @param meta Binary metadata.
* @param typeVer Type version.
*/
void prepareMetadataWriting(BinaryMetadata meta, int typeVer) {
if (!isPersistenceEnabled)
return;
writer.prepareWriteFuture(meta, typeVer);
}
/**
* @param typeId Type ID.
* @param typeVer Type version.
*/
void writeMetadataAsync(int typeId, int typeVer) {
if (!isPersistenceEnabled)
return;
writer.startWritingAsync(typeId, typeVer);
}
/**
* The {@code typeVer} parameter is always non-negative except for one special case
* (see {@link CacheObjectBinaryProcessorImpl#addMeta(int, BinaryType, boolean)} for context):
* if a request for a binary metadata update arrives right at the moment when the node is stopping,
* a {@link MetadataUpdateResult} of the special type UPDATE_DISABLED is generated.
*
* @param typeId Type ID.
* @param typeVer Type version.
* @throws IgniteCheckedException If the write operation failed.
*/
void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException {
if (!isPersistenceEnabled)
return;
writer.waitForWriteCompletion(typeId, typeVer);
}
/**
* @param typeId Binary metadata type id.
* @param typeVer Type version.
*/
void finishWrite(int typeId, int typeVer) {
if (!isPersistenceEnabled)
return;
writer.finishWriteFuture(typeId, typeVer);
}
/**
* Worker that writes binary metadata to disk asynchronously, off the discovery thread.
*/
private class BinaryMetadataAsyncWriter extends GridWorker {
/**
* Queue of write tasks submitted for execution.
*/
private final BlockingQueue<WriteOperationTask> queue = new LinkedBlockingQueue<>();
/**
* Write operation tasks prepared for writing but not yet submitted for execution (actual writing).
*/
private final ConcurrentMap<OperationSyncKey, WriteOperationTask> preparedWriteTasks = new ConcurrentHashMap<>();
/** */
BinaryMetadataAsyncWriter() {
super(ctx.igniteInstanceName(), "binary-metadata-writer", BinaryMetadataFileStore.this.log, ctx.workersRegistry());
}
/**
* @param typeId Type ID.
* @param typeVer Type version.
*/
synchronized void startWritingAsync(int typeId, int typeVer) {
if (isCancelled())
return;
WriteOperationTask task = preparedWriteTasks.get(new OperationSyncKey(typeId, typeVer));
if (task != null) {
if (log.isDebugEnabled())
log.debug(
"Submitting task for async write for" +
" [typeId=" + typeId +
", typeVersion=" + typeVer + ']'
);
queue.add(task);
}
else {
if (log.isDebugEnabled())
log.debug(
"Task for async write for" +
" [typeId=" + typeId +
", typeVersion=" + typeVer + "] not found"
);
}
}
/** {@inheritDoc} */
@Override public synchronized void cancel() {
super.cancel();
queue.clear();
IgniteCheckedException err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
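// Complete all prepared write futures with the error so threads blocked in waitForWriteCompletion() are released.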
for (Map.Entry<OperationSyncKey, WriteOperationTask> e : preparedWriteTasks.entrySet()) {
if (log.isDebugEnabled())
log.debug(
"Cancelling future for write operation for" +
" [typeId=" + e.getKey().typeId +
", typeVer=" + e.getKey().typeVer + ']'
);
e.getValue().future.onDone(err);
}
preparedWriteTasks.clear();
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
try {
body0();
}
catch (InterruptedException e) {
if (!isCancelled) {
ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));
throw e;
}
}
}
}
/** */
private void body0() throws InterruptedException {
WriteOperationTask task;
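// Blocking section: waiting on the queue is expected and must not be reported as a blocked worker.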
blockingSectionBegin();
try {
task = queue.take();
if (log.isDebugEnabled())
log.debug(
"Starting write operation for" +
" [typeId=" + task.meta.typeId() +
", typeVer=" + task.typeVer + ']'
);
writeMetadata(task.meta);
}
finally {
blockingSectionEnd();
}
finishWriteFuture(task.meta.typeId(), task.typeVer);
}
/**
* @param typeId Binary metadata type id.
* @param typeVer Type version.
*/
void finishWriteFuture(int typeId, int typeVer) {
WriteOperationTask task = preparedWriteTasks.remove(new OperationSyncKey(typeId, typeVer));
if (task != null) {
if (log.isDebugEnabled())
log.debug(
"Future for write operation for" +
" [typeId=" + typeId +
", typeVer=" + typeVer + ']' +
" completed."
);
task.future.onDone();
}
else {
if (log.isDebugEnabled())
log.debug(
"Future for write operation for" +
" [typeId=" + typeId +
", typeVer=" + typeVer + ']' +
" not found."
);
}
}
/**
* @param meta Binary metadata.
* @param typeVer Type version.
*/
synchronized void prepareWriteFuture(BinaryMetadata meta, int typeVer) {
if (isCancelled())
return;
if (log.isDebugEnabled())
log.debug(
"Prepare task for async write for" +
"[typeName=" + meta.typeName() +
", typeId=" + meta.typeId() +
", typeVersion=" + typeVer + ']'
);
preparedWriteTasks.putIfAbsent(new OperationSyncKey(meta.typeId(), typeVer), new WriteOperationTask(meta, typeVer));
}
/**
* @param typeId Type ID.
* @param typeVer Type version.
* @throws IgniteCheckedException If write operation failed.
*/
void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException {
// Special case, see the javadoc of {@link BinaryMetadataFileStore#waitForWriteCompletion}.
if (typeVer == -1) {
if (log.isDebugEnabled())
log.debug("No need to wait for " + typeId + ", negative typeVer was passed.");
return;
}
WriteOperationTask task = preparedWriteTasks.get(new OperationSyncKey(typeId, typeVer));
if (task != null) {
if (log.isDebugEnabled())
log.debug(
"Waiting for write completion of" +
" [typeId=" + typeId +
", typeVer=" + typeVer + "]"
);
try {
task.future.get();
}
finally {
if (log.isDebugEnabled())
log.debug(
"Released for write completion of" +
" [typeId=" + typeId +
", typeVer=" + typeVer + ']'
);
}
}
else {
if (log.isDebugEnabled())
log.debug(
"Task for async write for" +
" [typeId=" + typeId +
", typeVersion=" + typeVer + "] not found"
);
}
}
}
/**
* Task for a single asynchronous write of binary metadata.
*/
private static final class WriteOperationTask {
/** */
private final BinaryMetadata meta;
/** */
private final int typeVer;
/** */
private final GridFutureAdapter future = new GridFutureAdapter();
/** */
private WriteOperationTask(BinaryMetadata meta, int ver) {
this.meta = meta;
typeVer = ver;
}
}
/**
* Composite key (type ID + type version) used to match prepared write tasks with write and wait requests.
*/
private static final class OperationSyncKey {
/** */
private final int typeId;
/** */
private final int typeVer;
/** */
private OperationSyncKey(int typeId, int typeVer) {
this.typeId = typeId;
this.typeVer = typeVer;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return 31 * typeId + typeVer;
}
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
if (!(obj instanceof OperationSyncKey))
return false;
OperationSyncKey that = (OperationSyncKey)obj;
return (that.typeId == typeId) && (that.typeVer == typeVer);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(OperationSyncKey.class, this);
}
}
}