/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.storage;

import com.google.protobuf.ByteString;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.UUID;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putKey;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
    .writeChunk;

/**
* An {@link OutputStream} used by the REST service in combination with the
* SCMClient to write the value of a key to a sequence
* of container chunks. Writes are buffered locally and periodically written to
* the container as a new chunk. In order to preserve the semantics that
* replacement of a pre-existing key is atomic, each instance of the stream has
* an internal unique identifier. This unique identifier and a monotonically
* increasing chunk index form a composite key that is used as the chunk name.
* After all data is written, a putKey call creates or updates the corresponding
* container key, and this call includes the full list of chunks that make up
 * the key data. Because the chunk list is replaced in a single update, a
 * concurrent reader can never observe an intermediate state in which chunks
 * from different versions of the key data are interleaved.
* This class encapsulates all state management for buffering and writing
* through to the container.
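 * <p>
 * A rough usage sketch (illustrative only; acquiring the xceiver client and
 * choosing the parameter values are assumed to happen elsewhere):
 * <pre>{@code
 *   ChunkOutputStream out = new ChunkOutputStream(containerKey, key,
 *       xceiverClientManager, xceiverClient, traceID, chunkSize);
 *   out.write(valueBytes);   // buffered locally, flushed as full chunks
 *   out.close();             // writes the last partial chunk, then putKey
 * }</pre>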
*/
public class ChunkOutputStream extends OutputStream {
private final String containerKey;
private final String key;
private final String traceID;
private final KeyData.Builder containerKeyData;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private ByteBuffer buffer;
private final String streamId;
private int chunkIndex;
  private int chunkSize;

/**
* Creates a new ChunkOutputStream.
*
 * @param containerKey container key name
 * @param key key name used for chunk naming and container calls
 * @param xceiverClientManager client manager that controls the client
 * @param xceiverClient client to perform container calls
 * @param traceID trace ID used for container protocol calls
 * @param chunkSize chunk size in bytes
*/
public ChunkOutputStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
String traceID, int chunkSize) {
this.containerKey = containerKey;
this.key = key;
this.traceID = traceID;
this.chunkSize = chunkSize;
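    // Metadata marker stored with the key data, identifying it as a key.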
KeyValue keyValue = KeyValue.newBuilder()
.setKey("TYPE").setValue("KEY").build();
this.containerKeyData = KeyData.newBuilder()
.setContainerName(xceiverClient.getPipeline().getContainerName())
.setName(containerKey)
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
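    // Writes are staged in a chunk-sized buffer; a random stream ID combined
    // with an increasing chunk index gives each chunk a unique name.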
this.buffer = ByteBuffer.allocate(chunkSize);
this.streamId = UUID.randomUUID().toString();
this.chunkIndex = 0;
  }

@Override
public synchronized void write(int b) throws IOException {
checkOpen();
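    // Capture buffer state so a failed chunk write can be rolled back.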
int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit();
buffer.put((byte)b);
if (buffer.position() == chunkSize) {
flushBufferToChunk(rollbackPosition, rollbackLimit);
}
  }

@Override
  public synchronized void write(byte[] b, int off, int len)
      throws IOException {
if (b == null) {
throw new NullPointerException();
}
if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return;
}
checkOpen();
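    // Copy the input in pieces, flushing a chunk whenever the buffer fills.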
while (len > 0) {
int writeLen = Math.min(chunkSize - buffer.position(), len);
int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit();
buffer.put(b, off, writeLen);
if (buffer.position() == chunkSize) {
flushBufferToChunk(rollbackPosition, rollbackLimit);
}
off += writeLen;
len -= writeLen;
}
  }

@Override
public synchronized void flush() throws IOException {
checkOpen();
if (buffer.position() > 0) {
int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit();
flushBufferToChunk(rollbackPosition, rollbackLimit);
}
  }

@Override
public synchronized void close() throws IOException {
if (xceiverClientManager != null && xceiverClient != null &&
buffer != null) {
try {
if (buffer.position() > 0) {
writeChunkToContainer();
}
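        // Commit the key: putKey publishes the full chunk list in one call.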
putKey(xceiverClient, containerKeyData.build(), traceID);
} catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
} finally {
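        // Always release the client and clear state so later calls fail fast
        // in checkOpen().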
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager = null;
xceiverClient = null;
buffer = null;
}
}
  }

/**
* Checks if the stream is open. If not, throws an exception.
*
* @throws IOException if stream is closed
*/
private synchronized void checkOpen() throws IOException {
if (xceiverClient == null) {
throw new IOException("ChunkOutputStream has been closed.");
}
  }

/**
* Attempts to flush buffered writes by writing a new chunk to the container.
* If successful, then clears the buffer to prepare to receive writes for a
* new chunk.
*
* @param rollbackPosition position to restore in buffer if write fails
* @param rollbackLimit limit to restore in buffer if write fails
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void flushBufferToChunk(int rollbackPosition,
int rollbackLimit) throws IOException {
boolean success = false;
try {
writeChunkToContainer();
success = true;
} finally {
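      // On success start a fresh chunk; on failure restore the caller's
      // pre-write buffer position and limit.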
if (success) {
buffer.clear();
} else {
buffer.position(rollbackPosition);
buffer.limit(rollbackLimit);
}
}
  }

/**
 * Writes buffered data as a new chunk to the container and records the
 * chunk information for use in the later putKey call.
*
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void writeChunkToContainer() throws IOException {
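    // Switch the buffer to read mode and copy its contents into the chunk
    // payload.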
buffer.flip();
ByteString data = ByteString.copyFrom(buffer);
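    // Chunk name: MD5 of the key plus this stream's unique ID plus a
    // monotonically increasing chunk index.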
ChunkInfo chunk = ChunkInfo
.newBuilder()
.setChunkName(
DigestUtils.md5Hex(key) + "_stream_"
+ streamId + "_chunk_" + ++chunkIndex)
.setOffset(0)
.setLen(data.size())
.build();
try {
writeChunk(xceiverClient, chunk, key, data, traceID);
} catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
}
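    // Remember the chunk so close() can include it in the putKey request.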
containerKeyData.addChunks(chunk);
}
}