/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
 * Unit test for BlockOutputStream.
 * <p>
 * Compares the bytes written to the stream with the bytes received in the
 * WriteChunk requests.
 */
public class TestBlockOutputStreamCorrectness {
private static final long SEED = 18480315L;
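
  /** Size of each write in bytes; 1 exercises the single-byte write path. */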
private int writeUnitSize = 1;
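
  /**
   * Streams pseudo-random data through a BlockOutputStream; the mocked
   * client regenerates the same sequence and verifies every WriteChunk
   * payload it receives.
   */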
@Test
public void test() throws IOException {
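    // Buffer pool shared across all blocks: 8 buffers of 4 MB each, matching
    // the stream buffer configuration in createBlockOutputStream().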
final BufferPool bufferPool = new BufferPool(4 * 1024 * 1024, 32 / 4);
for (int block = 0; block < 10; block++) {
BlockOutputStream outputStream =
createBlockOutputStream(bufferPool);
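      // Same seed as the mock client, so both sides produce identical bytes.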
Random random = new Random(SEED);
int max = 256 * 1024 * 1024 / writeUnitSize;
byte[] writeBuffer = new byte[writeUnitSize];
for (int t = 0; t < max; t++) {
if (writeUnitSize > 1) {
for (int i = 0; i < writeBuffer.length; i++) {
writeBuffer[i] = (byte) random.nextInt();
}
outputStream.write(writeBuffer, 0, writeBuffer.length);
} else {
outputStream.write((byte) random.nextInt());
}
}
outputStream.close();
}
}
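
  /**
   * Builds a BlockOutputStream whose XceiverClientManager is mocked to hand
   * out the in-memory MockXceiverClientSpi instead of a real client.
   */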
@NotNull
private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
throws IOException {
final Pipeline pipeline = MockPipeline.createRatisPipeline();
final XceiverClientManager xcm = Mockito.mock(XceiverClientManager.class);
Mockito.when(xcm.acquireClient(Mockito.any()))
.thenReturn(new MockXceiverClientSpi(pipeline));
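    // 4 MB stream buffers, flush every 16 MB, cap at 32 MB; checksums are
    // disabled so only the raw payload is compared.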
OzoneClientConfig config = new OzoneClientConfig();
config.setStreamBufferSize(4 * 1024 * 1024);
config.setStreamBufferMaxSize(32 * 1024 * 1024);
config.setStreamBufferFlushDelay(true);
config.setStreamBufferFlushSize(16 * 1024 * 1024);
config.setChecksumType(ChecksumType.NONE);
config.setBytesPerChecksum(256 * 1024);
BlockOutputStream outputStream = new BlockOutputStream(
new BlockID(1L, 1L),
xcm,
pipeline,
bufferPool,
config,
null);
return outputStream;
}
  /**
   * Mock XceiverClient that simulates successful responses and verifies the
   * payload of every WriteChunk request against the expected byte stream.
   */
private class MockXceiverClientSpi extends XceiverClientSpi {
private final Pipeline pipeline;
    private final Random expectedRandomStream = new Random(SEED);
    private final AtomicInteger counter = new AtomicInteger();
MockXceiverClientSpi(Pipeline pipeline) {
super();
this.pipeline = pipeline;
}
@Override
public void connect() throws Exception {
}
@Override
public void connect(String encodedToken) throws Exception {
}
@Override
public void close() {
}
@Override
public Pipeline getPipeline() {
return pipeline;
}
@Override
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request
)
throws IOException, ExecutionException, InterruptedException {
final ContainerCommandResponseProto.Builder builder =
ContainerCommandResponseProto.newBuilder()
.setResult(Result.SUCCESS)
.setCmdType(request.getCmdType());
switch (request.getCmdType()) {
case PutBlock:
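        // Acknowledge the PutBlock by echoing back the block ID and size from
        // the request as the committed block length.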
builder.setPutBlock(PutBlockResponseProto.newBuilder()
.setCommittedBlockLength(
GetCommittedBlockLengthResponseProto.newBuilder()
.setBlockID(
request.getPutBlock().getBlockData().getBlockID())
.setBlockLength(
request.getPutBlock().getBlockData().getSize())
.build())
.build());
break;
case WriteChunk:
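        // Verify the received payload byte-by-byte against the expected
        // pseudo-random stream.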
ByteString data = request.getWriteChunk().getData();
final byte[] writePayload = data.toByteArray();
for (int i = 0; i < writePayload.length; i++) {
byte expectedByte = (byte) expectedRandomStream.nextInt();
Assert.assertEquals(expectedByte,
writePayload[i]);
}
break;
default:
//no-op
}
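      // Every reply carries a monotonically increasing log index.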
final XceiverClientReply result = new XceiverClientReply(
CompletableFuture.completedFuture(builder.build()));
result.setLogIndex(counter.incrementAndGet());
return result;
}
@Override
public ReplicationType getPipelineType() {
return null;
}
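
    // Commit watching is simulated: the requested index is acknowledged
    // immediately with a successful WriteChunk response.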
@Override
public XceiverClientReply watchForCommit(long index)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
final ContainerCommandResponseProto.Builder builder =
ContainerCommandResponseProto.newBuilder()
.setCmdType(Type.WriteChunk)
.setResult(Result.SUCCESS);
final XceiverClientReply xceiverClientReply = new XceiverClientReply(
CompletableFuture.completedFuture(builder.build()));
xceiverClientReply.setLogIndex(index);
return xceiverClientReply;
}
@Override
public long getReplicatedMinCommitIndex() {
return 0;
}
@Override
public Map<DatanodeDetails, ContainerCommandResponseProto>
sendCommandOnAllNodes(ContainerCommandRequestProto request
) throws IOException, InterruptedException {
return null;
}
}
}