blob: c1e751ecf53bcbb18ad9a1d25778d93d2ddcc71e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
/**
* Output stream to store data into grid cache with separate blocks.
*/
abstract class IgfsAbstractOutputStream extends IgfsOutputStream {
/** IGFS context. */
protected final IgfsContext igfsCtx;
/** Path to file. */
protected final IgfsPath path;
/** Buffer size. */
protected final int bufSize;
/** File worker batch. */
protected final IgfsFileWorkerBatch batch;
/** Mutex for synchronization. */
protected final Object mux = new Object();
/** Flag for this stream open/closed state. */
protected boolean closed;
/** Local buffer to store stream data as consistent block. */
protected ByteBuffer buf;
/** Bytes written. */
protected long bytes;
/** Time consumed by write operations. */
protected long time;
/**
* Constructs file output stream.
*
* @param igfsCtx IGFS context.
* @param path Path to stored file.
* @param bufSize The size of the buffer to be used.
* @param batch Optional secondary file system batch.
*/
IgfsAbstractOutputStream(IgfsContext igfsCtx, IgfsPath path, int bufSize, @Nullable IgfsFileWorkerBatch batch) {
synchronized (mux) {
this.path = path;
this.bufSize = optimizeBufferSize(bufSize);
this.igfsCtx = igfsCtx;
this.batch = batch;
}
igfsCtx.metrics().incrementFilesOpenedForWrite();
}
/**
* Optimize buffer size.
*
* @param bufSize Original byffer size.
* @return Optimized buffer size.
*/
protected abstract int optimizeBufferSize(int bufSize);
/** {@inheritDoc} */
@Override public void write(int b) throws IOException {
synchronized (mux) {
checkClosed(null, 0);
b &= 0xFF;
long startTime = System.nanoTime();
if (buf == null)
buf = allocateNewBuffer();
buf.put((byte)b);
sendBufferIfFull();
time += System.nanoTime() - startTime;
}
}
/** {@inheritDoc} */
@SuppressWarnings("NullableProblems")
@Override public void write(byte[] b, int off, int len) throws IOException {
A.notNull(b, "b");
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
", length=" + len + ']');
}
synchronized (mux) {
checkClosed(null, 0);
// Check if there is anything to write.
if (len == 0)
return;
long startTime = System.nanoTime();
if (buf == null) {
if (len >= bufSize) {
// Send data right away.
ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
send(tmpBuf, tmpBuf.remaining());
}
else {
buf = allocateNewBuffer();
buf.put(b, off, len);
}
}
else {
// Re-allocate buffer if needed.
if (buf.remaining() < len)
buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
buf.put(b, off, len);
sendBufferIfFull();
}
time += System.nanoTime() - startTime;
}
}
/** {@inheritDoc} */
@Override public void transferFrom(DataInput in, int len) throws IOException {
synchronized (mux) {
checkClosed(in, len);
long startTime = System.nanoTime();
// Clean-up local buffer before streaming.
sendBufferIfNotEmpty();
// Perform transfer.
send(in, len);
time += System.nanoTime() - startTime;
}
}
/**
* Validate this stream is open.
*
* @param in Data input.
* @param len Data len in bytes.
* @throws IOException If this stream is closed.
*/
protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
assert Thread.holdsLock(mux);
if (closed) {
// Must read data from stream before throwing exception.
if (in != null)
in.skipBytes(len);
throw new IOException("Stream has been closed: " + this);
}
}
/**
* Send local buffer if it full.
*
* @throws IOException If failed.
*/
private void sendBufferIfFull() throws IOException {
if (buf.position() >= bufSize)
sendBuffer();
}
/**
* Send local buffer if at least something is stored there.
*
* @throws IOException If failed.
*/
void sendBufferIfNotEmpty() throws IOException {
if (buf != null && buf.position() > 0)
sendBuffer();
}
/**
* Send all local-buffered data to server.
*
* @throws IOException In case of IO exception.
*/
private void sendBuffer() throws IOException {
buf.flip();
send(buf, buf.remaining());
buf = null;
}
/**
* Store data block.
*
* @param data Block.
* @param writeLen Write length.
* @throws IOException If failed.
*/
protected abstract void send(Object data, int writeLen) throws IOException;
/**
* Allocate new buffer.
*
* @return New buffer.
*/
private ByteBuffer allocateNewBuffer() {
return ByteBuffer.allocate(bufSize);
}
/**
* Updates IGFS metrics when the stream is closed.
*/
protected void updateMetricsOnClose() {
IgfsLocalMetrics metrics = igfsCtx.metrics();
metrics.addWrittenBytesTime(bytes, time);
metrics.decrementFilesOpenedForWrite();
GridEventStorageManager evts = igfsCtx.kernalContext().event();
if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
evts.record(new IgfsEvent(path, igfsCtx.localNode(),
EVT_IGFS_FILE_CLOSED_WRITE, bytes));
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgfsAbstractOutputStream.class, this);
}
}