blob: 8e1e6405e770e3d1f7c558503c12f5fa6dc265be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.io.OutputStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.util.Collection;
import java.util.Collections;
/**
* Helper class used inside {@link BlockOutputStream}.
* */
public final class BlockOutputStreamEntry extends OutputStream {
private OutputStream outputStream;
private BlockID blockID;
private final String key;
private final XceiverClientManager xceiverClientManager;
private final Pipeline pipeline;
private final ChecksumType checksumType;
private final int bytesPerChecksum;
private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
private Token<OzoneBlockTokenIdentifier> token;
private final int streamBufferSize;
private final long streamBufferFlushSize;
private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long watchTimeout;
private BufferPool bufferPool;
@SuppressWarnings({"parameternumber", "squid:S00107"})
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
long length, int streamBufferSize, long streamBufferFlushSize,
boolean streamBufferFlushDelay, long streamBufferMaxSize,
long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum,
Token<OzoneBlockTokenIdentifier> token) {
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.pipeline = pipeline;
this.chunkSize = chunkSize;
this.token = token;
this.length = length;
this.currentPosition = 0;
this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferFlushDelay = streamBufferFlushDelay;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferPool = bufferPool;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
}
long getLength() {
return length;
}
Token<OzoneBlockTokenIdentifier> getToken() {
return token;
}
long getRemaining() {
return length - currentPosition;
}
/**
* BlockOutputStream is initialized in this function. This makes sure that
* xceiverClient initialization is not done during preallocation and only
* done when data is written.
* @throws IOException if xceiverClient initialization fails
*/
private void checkStream() throws IOException {
if (this.outputStream == null) {
if (getToken() != null) {
UserGroupInformation.getCurrentUser().addToken(getToken());
}
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
pipeline, streamBufferSize, streamBufferFlushSize,
streamBufferFlushDelay, streamBufferMaxSize, bufferPool,
checksumType, bytesPerChecksum);
}
}
@Override
public void write(int b) throws IOException {
checkStream();
outputStream.write(b);
this.currentPosition += 1;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkStream();
outputStream.write(b, off, len);
this.currentPosition += len;
}
@Override
public void flush() throws IOException {
if (this.outputStream != null) {
this.outputStream.flush();
}
}
@Override
public void close() throws IOException {
if (this.outputStream != null) {
this.outputStream.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
this.blockID = ((BlockOutputStream) outputStream).getBlockID();
}
}
boolean isClosed() {
if (outputStream != null) {
return ((BlockOutputStream) outputStream).isClosed();
}
return false;
}
long getTotalAckDataLength() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
blockID = out.getBlockID();
return out.getTotalAckDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
}
Collection<DatanodeDetails> getFailedServers() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getFailedServers();
}
return Collections.emptyList();
}
long getWrittenDataLength() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getWrittenDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
}
void cleanup(boolean invalidateClient) throws IOException {
checkStream();
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.cleanup(invalidateClient);
}
void writeOnRetry(long len) throws IOException {
checkStream();
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.writeOnRetry(len);
this.currentPosition += len;
}
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
public static class Builder {
private BlockID blockID;
private String key;
private XceiverClientManager xceiverClientManager;
private Pipeline pipeline;
private String requestId;
private int chunkSize;
private long length;
private int streamBufferSize;
private long streamBufferFlushSize;
private boolean streamBufferFlushDelay;
private long streamBufferMaxSize;
private long watchTimeout;
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private ChecksumType checksumType;
private int bytesPerChecksum;
public Builder setChecksumType(ChecksumType type) {
this.checksumType = type;
return this;
}
public Builder setBytesPerChecksum(int bytes) {
this.bytesPerChecksum = bytes;
return this;
}
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
}
public Builder setKey(String keys) {
this.key = keys;
return this;
}
public Builder setXceiverClientManager(XceiverClientManager
xClientManager) {
this.xceiverClientManager = xClientManager;
return this;
}
public Builder setPipeline(Pipeline ppln) {
this.pipeline = ppln;
return this;
}
public Builder setRequestId(String request) {
this.requestId = request;
return this;
}
public Builder setChunkSize(int cSize) {
this.chunkSize = cSize;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
}
public Builder setStreamBufferSize(int bufferSize) {
this.streamBufferSize = bufferSize;
return this;
}
public Builder setStreamBufferFlushSize(long bufferFlushSize) {
this.streamBufferFlushSize = bufferFlushSize;
return this;
}
public Builder setStreamBufferFlushDelay(boolean bufferFlushDelay) {
this.streamBufferFlushDelay = bufferFlushDelay;
return this;
}
public Builder setStreamBufferMaxSize(long bufferMaxSize) {
this.streamBufferMaxSize = bufferMaxSize;
return this;
}
public Builder setWatchTimeout(long timeout) {
this.watchTimeout = timeout;
return this;
}
public Builder setbufferPool(BufferPool pool) {
this.bufferPool = pool;
return this;
}
public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, pipeline, requestId, chunkSize,
length, streamBufferSize, streamBufferFlushSize,
streamBufferFlushDelay, streamBufferMaxSize, watchTimeout,
bufferPool, checksumType, bytesPerChecksum, token);
}
}
@VisibleForTesting
public OutputStream getOutputStream() {
return outputStream;
}
public BlockID getBlockID() {
return blockID;
}
public String getKey() {
return key;
}
public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}
public Pipeline getPipeline() {
return pipeline;
}
public int getChunkSize() {
return chunkSize;
}
public long getCurrentPosition() {
return currentPosition;
}
public int getStreamBufferSize() {
return streamBufferSize;
}
public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}
public boolean getStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}
public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}
public long getWatchTimeout() {
return watchTimeout;
}
public BufferPool getBufferPool() {
return bufferPool;
}
public void setCurrentPosition(long curPosition) {
this.currentPosition = curPosition;
}
}