blob: 8381be08da4b1c193e835f8990b12e56fc30313f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.io.OutputStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.util.Collection;
/**
* Helper class used inside {@link BlockOutputStream}.
* */
public final class BlockOutputStreamEntry extends OutputStream {
private OutputStream outputStream;
private BlockID blockID;
private final String key;
private final XceiverClientManager xceiverClientManager;
private final Pipeline pipeline;
private final ChecksumType checksumType;
private final int bytesPerChecksum;
private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
private Token<OzoneBlockTokenIdentifier> token;
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private final long watchTimeout;
private BufferPool bufferPool;
@SuppressWarnings("parameternumber")
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum,
Token<OzoneBlockTokenIdentifier> token) {
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.pipeline = pipeline;
this.chunkSize = chunkSize;
this.token = token;
this.length = length;
this.currentPosition = 0;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferPool = bufferPool;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
}
long getLength() {
return length;
}
Token<OzoneBlockTokenIdentifier> getToken() {
return token;
}
long getRemaining() {
return length - currentPosition;
}
/**
* BlockOutputStream is initialized in this function. This makes sure that
* xceiverClient initialization is not done during preallocation and only
* done when data is written.
* @throws IOException if xceiverClient initialization fails
*/
private void checkStream() throws IOException {
if (this.outputStream == null) {
if (getToken() != null) {
UserGroupInformation.getCurrentUser().addToken(getToken());
}
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
pipeline, chunkSize, streamBufferFlushSize,
streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
bytesPerChecksum);
}
}
@Override
public void write(int b) throws IOException {
checkStream();
outputStream.write(b);
this.currentPosition += 1;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkStream();
outputStream.write(b, off, len);
this.currentPosition += len;
}
@Override
public void flush() throws IOException {
if (this.outputStream != null) {
this.outputStream.flush();
}
}
@Override
public void close() throws IOException {
if (this.outputStream != null) {
this.outputStream.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
this.blockID = ((BlockOutputStream) outputStream).getBlockID();
}
}
boolean isClosed() {
if (outputStream != null) {
return ((BlockOutputStream) outputStream).isClosed();
}
return false;
}
long getTotalAckDataLength() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
blockID = out.getBlockID();
return out.getTotalAckDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
}
Collection<DatanodeDetails> getFailedServers() throws IOException {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getFailedServers();
}
return null;
}
long getWrittenDataLength() throws IOException {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getWrittenDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
}
void cleanup(boolean invalidateClient) throws IOException {
checkStream();
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.cleanup(invalidateClient);
}
void writeOnRetry(long len) throws IOException {
checkStream();
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.writeOnRetry(len);
this.currentPosition += len;
}
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
public static class Builder {
private BlockID blockID;
private String key;
private XceiverClientManager xceiverClientManager;
private Pipeline pipeline;
private String requestId;
private int chunkSize;
private long length;
private long streamBufferFlushSize;
private long streamBufferMaxSize;
private long watchTimeout;
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private ChecksumType checksumType;
private int bytesPerChecksum;
public Builder setChecksumType(ChecksumType type) {
this.checksumType = type;
return this;
}
public Builder setBytesPerChecksum(int bytes) {
this.bytesPerChecksum = bytes;
return this;
}
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
}
public Builder setKey(String keys) {
this.key = keys;
return this;
}
public Builder setXceiverClientManager(XceiverClientManager
xClientManager) {
this.xceiverClientManager = xClientManager;
return this;
}
public Builder setPipeline(Pipeline ppln) {
this.pipeline = ppln;
return this;
}
public Builder setRequestId(String request) {
this.requestId = request;
return this;
}
public Builder setChunkSize(int cSize) {
this.chunkSize = cSize;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
}
public Builder setStreamBufferFlushSize(long bufferFlushSize) {
this.streamBufferFlushSize = bufferFlushSize;
return this;
}
public Builder setStreamBufferMaxSize(long bufferMaxSize) {
this.streamBufferMaxSize = bufferMaxSize;
return this;
}
public Builder setWatchTimeout(long timeout) {
this.watchTimeout = timeout;
return this;
}
public Builder setbufferPool(BufferPool pool) {
this.bufferPool = pool;
return this;
}
public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, pipeline, requestId, chunkSize,
length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
bufferPool, checksumType, bytesPerChecksum, token);
}
}
@VisibleForTesting
public OutputStream getOutputStream() {
return outputStream;
}
public BlockID getBlockID() {
return blockID;
}
public String getKey() {
return key;
}
public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}
public Pipeline getPipeline() {
return pipeline;
}
public int getChunkSize() {
return chunkSize;
}
public long getCurrentPosition() {
return currentPosition;
}
public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}
public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}
public long getWatchTimeout() {
return watchTimeout;
}
public BufferPool getBufferPool() {
return bufferPool;
}
public void setCurrentPosition(long curPosition) {
this.currentPosition = curPosition;
}
}