blob: ee809e062e21bd5428d3c2bb6ba504294efd1a92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
 * Work batch is an abstraction of the logically grouped tasks.
 * <p>
 * Producers enqueue byte chunks through {@link #write(byte[])} and terminate the batch with either
 * {@link #finish()} or {@link #cancel()}. {@link #run()} drains the queue into the output stream, closes
 * the stream, invokes {@link #onDone()} and finally completes the future {@link #await()} waits on.
 */
public abstract class IgfsFileWorkerBatch implements Runnable {
    /** Stop marker. Recognized by reference identity, not by content. */
    private static final byte[] FINISH_MARKER = new byte[0];

    /** Cancel marker. Recognized by reference identity, not by content. */
    private static final byte[] CANCEL_MARKER = new byte[0];

    /** How long a single queue poll waits for the next request, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 1000;

    /** Tasks queue. */
    private final BlockingDeque<byte[]> queue = new LinkedBlockingDeque<>();

    /** Future which completes when batch processing is finished. */
    private final GridFutureAdapter fut = new GridFutureAdapter();

    /** Path to the file in the primary file system. */
    private final IgfsPath path;

    /** Output stream to the file. */
    private final OutputStream out;

    /** Finishing flag. Set once a finish or cancel marker has been enqueued. */
    private volatile boolean finishing;

    /**
     * Constructor.
     *
     * @param path Path to the file in the primary file system.
     * @param out Output stream opened to that file.
     */
    IgfsFileWorkerBatch(IgfsPath path, OutputStream out) {
        assert path != null;
        assert out != null;

        this.path = path;
        this.out = out;
    }

    /**
     * Perform write if batch is not finishing yet.
     *
     * @param data Data to be written.
     * @return {@code True} in case write was enqueued.
     */
    boolean write(final byte[] data) {
        // No extra locking here: offer() is synchronized on this instance.
        return offer(data, false, false);
    }

    /**
     * Add the last task to that batch which will release all the resources.
     *
     * @return {@code True} if finish was signalled, {@code false} if the batch was already finishing.
     */
    boolean finish() {
        return offer(FINISH_MARKER, false, true);
    }

    /**
     * Cancel batch processing. The cancel marker is placed at the head of the queue, so it is picked up
     * before any data that is still pending.
     *
     * @return {@code True} if cancel was signalled, {@code false} if the batch was already finishing.
     */
    boolean cancel() {
        return offer(CANCEL_MARKER, true, true);
    }

    /**
     * Add request to queue.
     *
     * @param data Data.
     * @param head Whether to add to head.
     * @param finish Whether this is the last request to be accepted.
     * @return {@code True} if request was added to queue.
     */
    private synchronized boolean offer(byte[] data, boolean head, boolean finish) {
        if (finishing)
            return false;

        if (head)
            queue.addFirst(data);
        else
            queue.addLast(data);

        if (finish)
            finishing = true;

        return true;
    }

    /**
     * @return {@code True} if finish was called on this batch.
     */
    boolean finishing() {
        return finishing;
    }

    /**
     * Process the batch.
     * <p>
     * Polls the queue until a finish marker, a cancel marker or an error is encountered. Whatever the
     * outcome, the stream is closed, {@link #onDone()} is invoked and the future is completed with the
     * error (if any), so that {@link #await()} never blocks indefinitely.
     */
    @Override @SuppressWarnings("unchecked")
    public void run() {
        Throwable err = null;

        try {
            while (true) {
                try {
                    // Timed poll returns null when nothing arrived; in that case we simply poll again.
                    byte[] data = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);

                    if (data == FINISH_MARKER) {
                        assert queue.isEmpty();

                        break;
                    }
                    else if (data == CANCEL_MARKER)
                        throw new IgfsFileWorkerBatchCancelledException(path);
                    else if (data != null) {
                        try {
                            out.write(data);
                        }
                        catch (IOException e) {
                            throw new IgniteCheckedException("Failed to write data to the file due to secondary " +
                                "file system exception: " + path, e);
                        }
                    }
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();

                    err = e;

                    break;
                }
                catch (Exception e) {
                    err = e;

                    break;
                }
            }
        }
        catch (Throwable e) {
            // Safety. This should never happen under normal conditions.
            err = e;

            if (e instanceof Error)
                throw e;
        }
        finally {
            // Order of events is very important here. First, we close the stream so that metadata locks are released.
            // This action must be the very first because otherwise a writer thread could interfere with itself.
            U.closeQuiet(out);

            try {
                // Next, we invoke callback so that IgfsImpl is able to enqueue new requests. This is safe because
                // at this point file processing is completed.
                onDone();
            }
            catch (RuntimeException | Error e) {
                // Callback failure must be visible to waiters, but must not hide an earlier processing error.
                if (err == null)
                    err = e;

                throw e;
            }
            finally {
                // Finally, we complete the future, so that waiting threads could resume. This is done even
                // when the callback throws; otherwise threads blocked in await() would hang forever.
                assert !fut.isDone();

                fut.onDone(null, err);
            }
        }
    }

    /**
     * Await for that worker batch to complete.
     *
     * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing.
     */
    void await() throws IgniteCheckedException {
        fut.get();
    }

    /**
     * Get primary file system path.
     *
     * @return Primary file system path.
     */
    IgfsPath path() {
        return path;
    }

    /**
     * Callback invoked when execution finishes. It is called from {@link #run()} after the output stream
     * has been closed and before the completion future is finished.
     */
    protected abstract void onDone();
}