| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.ozone.client.rpc; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.commons.lang3.RandomUtils; |
| import org.apache.hadoop.hdds.client.ReplicationFactor; |
| import org.apache.hadoop.hdds.client.ReplicationType; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.conf.StorageUnit; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; |
| import org.apache.hadoop.hdds.scm.OzoneClientConfig; |
| import org.apache.hadoop.hdds.scm.XceiverClientManager; |
| import org.apache.hadoop.hdds.scm.XceiverClientMetrics; |
| import org.apache.hadoop.hdds.scm.storage.BufferPool; |
| import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream; |
| import org.apache.hadoop.ozone.ClientConfigForTesting; |
| import org.apache.hadoop.ozone.MiniOzoneCluster; |
| import org.apache.hadoop.ozone.client.ObjectStore; |
| import org.apache.hadoop.ozone.client.OzoneClient; |
| import org.apache.hadoop.ozone.client.OzoneClientFactory; |
| import org.apache.hadoop.ozone.client.io.KeyOutputStream; |
| import org.apache.hadoop.ozone.client.io.OzoneOutputStream; |
| import org.apache.hadoop.ozone.container.TestHelper; |
| |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.TestInstance; |
| import org.junit.jupiter.api.Timeout; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.ValueSource; |
| |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; |
| import static org.apache.hadoop.ozone.container.TestHelper.validateData; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertInstanceOf; |
| |
| @TestInstance(TestInstance.Lifecycle.PER_CLASS) |
| @Timeout(300) |
| class TestBlockOutputStream { |
| |
| static final int CHUNK_SIZE = 100; |
| static final int FLUSH_SIZE = 2 * CHUNK_SIZE; |
| static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; |
| static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; |
| static final String VOLUME = "testblockoutputstream"; |
| static final String BUCKET = VOLUME; |
| |
| private MiniOzoneCluster cluster; |
| |
| static MiniOzoneCluster createCluster() throws IOException, |
| InterruptedException, TimeoutException { |
| |
| OzoneConfiguration conf = new OzoneConfiguration(); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS); |
| conf.setQuietMode(false); |
| conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); |
| conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 3); |
| |
| DatanodeRatisServerConfig ratisServerConfig = |
| conf.getObject(DatanodeRatisServerConfig.class); |
| ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3)); |
| ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3)); |
| conf.setFromObject(ratisServerConfig); |
| |
| RatisClientConfig.RaftConfig raftClientConfig = |
| conf.getObject(RatisClientConfig.RaftConfig.class); |
| raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3)); |
| raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3)); |
| conf.setFromObject(raftClientConfig); |
| |
| RatisClientConfig ratisClientConfig = |
| conf.getObject(RatisClientConfig.class); |
| ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30)); |
| ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30)); |
| conf.setFromObject(ratisClientConfig); |
| |
| ClientConfigForTesting.newBuilder(StorageUnit.BYTES) |
| .setBlockSize(BLOCK_SIZE) |
| .setChunkSize(CHUNK_SIZE) |
| .setStreamBufferFlushSize(FLUSH_SIZE) |
| .setStreamBufferMaxSize(MAX_FLUSH_SIZE) |
| .applyTo(conf); |
| |
| MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) |
| .setNumDatanodes(5) |
| .build(); |
| cluster.waitForClusterToBeReady(); |
| |
| cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE, |
| 180000); |
| |
| try (OzoneClient client = cluster.newClient()) { |
| ObjectStore objectStore = client.getObjectStore(); |
| objectStore.createVolume(VOLUME); |
| objectStore.getVolume(VOLUME).createBucket(BUCKET); |
| } |
| |
| return cluster; |
| } |
| |
| @BeforeAll |
| void init() throws Exception { |
| cluster = createCluster(); |
| } |
| |
| @AfterAll |
| void shutdown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| static OzoneClientConfig newClientConfig(ConfigurationSource source, |
| boolean flushDelay) { |
| OzoneClientConfig clientConfig = source.getObject(OzoneClientConfig.class); |
| clientConfig.setChecksumType(ChecksumType.NONE); |
| clientConfig.setStreamBufferFlushDelay(flushDelay); |
| return clientConfig; |
| } |
| |
| static OzoneClient newClient(OzoneConfiguration conf, |
| OzoneClientConfig config) throws IOException { |
| OzoneConfiguration copy = new OzoneConfiguration(conf); |
| copy.setFromObject(config); |
| return OzoneClientFactory.getRpcClient(copy); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testWriteLessThanChunkSize(boolean flushDelay) throws Exception { |
| OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); |
| try (OzoneClient client = newClient(cluster.getConf(), config)) { |
| XceiverClientMetrics metrics = |
| XceiverClientManager.getXceiverClientMetrics(); |
| long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk); |
| long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock); |
| long pendingWriteChunkCount = |
| metrics.getPendingContainerOpCountMetrics(WriteChunk); |
| long pendingPutBlockCount = |
| metrics.getPendingContainerOpCountMetrics(PutBlock); |
| long totalOpCount = metrics.getTotalOpCount(); |
| String keyName = getKeyName(); |
| OzoneOutputStream key = createKey(client, keyName); |
| int dataLength = 50; |
| final int totalWriteLength = dataLength * 2; |
| byte[] data1 = RandomUtils.nextBytes(dataLength); |
| key.write(data1); |
| KeyOutputStream keyOutputStream = |
| assertInstanceOf(KeyOutputStream.class, key.getOutputStream()); |
| |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| RatisBlockOutputStream blockOutputStream = |
| assertInstanceOf(RatisBlockOutputStream.class, |
| keyOutputStream.getStreamEntries().get(0).getOutputStream()); |
| |
| // we have written data less than a chunk size, the data will just sit |
| // in the buffer, with only one buffer being allocated in the buffer pool |
| |
| BufferPool bufferPool = blockOutputStream.getBufferPool(); |
| assertEquals(1, bufferPool.getSize()); |
| //Just the writtenDataLength will be updated here |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| // no data will be flushed till now |
| assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| |
| // commitIndex2FlushedData Map will be empty here |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| // Total write data greater than or equal one chunk |
| // size to make sure flush will sync data. |
| key.write(data1); |
| // This will flush the data and update the flush length and the map. |
| key.flush(); |
| |
| // flush is a sync call, all pending operations will complete |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| // we have written data less than a chunk size, the data will just sit |
| // in the buffer, with only one buffer being allocated in the buffer pool |
| |
| assertEquals(1, bufferPool.getSize()); |
| assertEquals(0, bufferPool.getBuffer(0).position()); |
| assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength()); |
| assertEquals(totalWriteLength, |
| blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, |
| blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| // flush ensures watchForCommit updates the total length acknowledged |
| assertEquals(totalWriteLength, blockOutputStream.getTotalAckDataLength()); |
| |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| // now close the stream, It will update ack length after watchForCommit |
| key.close(); |
| |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| assertEquals(writeChunkCount + 1, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 2, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); |
| |
| // make sure the bufferPool is empty |
| assertEquals(0, bufferPool.computeBufferData()); |
| assertEquals(totalWriteLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(0, keyOutputStream.getStreamEntries().size()); |
| |
| validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testWriteExactlyFlushSize(boolean flushDelay) throws Exception { |
| OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); |
| try (OzoneClient client = newClient(cluster.getConf(), config)) { |
| XceiverClientMetrics metrics = |
| XceiverClientManager.getXceiverClientMetrics(); |
| final long writeChunkCount = |
| metrics.getContainerOpCountMetrics(WriteChunk); |
| final long putBlockCount = |
| metrics.getContainerOpCountMetrics(PutBlock); |
| final long pendingWriteChunkCount = |
| metrics.getPendingContainerOpCountMetrics(WriteChunk); |
| final long pendingPutBlockCount = |
| metrics.getPendingContainerOpCountMetrics(PutBlock); |
| final long totalOpCount = metrics.getTotalOpCount(); |
| |
| String keyName = getKeyName(); |
| OzoneOutputStream key = createKey(client, keyName); |
| // write data equal to 2 chunks |
| int dataLength = FLUSH_SIZE; |
| byte[] data1 = RandomUtils.nextBytes(dataLength); |
| key.write(data1); |
| |
| assertEquals(writeChunkCount + 2, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 1, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| // The WriteChunk and PutBlock can be completed soon. |
| assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk)) |
| .isLessThanOrEqualTo(pendingWriteChunkCount + 2); |
| assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock)) |
| .isLessThanOrEqualTo(pendingPutBlockCount + 1); |
| |
| KeyOutputStream keyOutputStream = |
| assertInstanceOf(KeyOutputStream.class, key.getOutputStream()); |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| RatisBlockOutputStream blockOutputStream = |
| assertInstanceOf(RatisBlockOutputStream.class, |
| keyOutputStream.getStreamEntries().get(0).getOutputStream()); |
| |
| // we have just written data equal flush Size = 2 chunks, at this time |
| // buffer pool will have 2 buffers allocated worth of chunk size |
| |
| assertEquals(2, blockOutputStream.getBufferPool().getSize()); |
| // writtenDataLength as well flushedDataLength will be updated here |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, blockOutputStream.getTotalAckDataLength()); |
| |
| // Before flush, if there was no pending PutBlock which means it is complete. |
| // It put a commit index into commitIndexMap. |
| assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0, |
| blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| // Now do a flush. |
| key.flush(); |
| |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| // The previously written data is equal to flushSize, so no action is |
| // triggered when execute flush, if flushDelay is enabled. |
| // If flushDelay is disabled, it will call waitOnFlushFutures to wait all |
| // putBlocks finished. It was broken because WriteChunk and PutBlock |
| // can be complete regardless of whether the flush executed or not. |
| if (flushDelay) { |
| assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk)) |
| .isLessThanOrEqualTo(pendingWriteChunkCount + 2); |
| assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock)) |
| .isLessThanOrEqualTo(pendingWriteChunkCount + 1); |
| } else { |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| } |
| |
| // Since the data in the buffer is already flushed, flush here will have |
| // no impact on the counters and data structures |
| assertEquals(2, blockOutputStream.getBufferPool().getSize()); |
| |
| // No action is triggered when execute flush, BlockOutputStream will not |
| // be updated. |
| assertEquals(flushDelay ? dataLength : 0, |
| blockOutputStream.getBufferPool().computeBufferData()); |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength()); |
| // If the flushDelay feature is enabled, nothing happens. |
| // The assertions will be as same as those before flush. |
| // If it flushed, the Commit index will be removed. |
| assertEquals((flushDelay && |
| (metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount)) ? 1 : 0, |
| blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(flushDelay ? 0 : dataLength, |
| blockOutputStream.getTotalAckDataLength()); |
| |
| // now close the stream, It will update ack length after watchForCommit |
| key.close(); |
| |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| // make sure the bufferPool is empty |
| assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| assertEquals(writeChunkCount + 2, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 2, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + 4, metrics.getTotalOpCount()); |
| assertEquals(0, keyOutputStream.getStreamEntries().size()); |
| |
| validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception { |
| OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); |
| try (OzoneClient client = newClient(cluster.getConf(), config)) { |
| XceiverClientMetrics metrics = |
| XceiverClientManager.getXceiverClientMetrics(); |
| long writeChunkCount = metrics.getContainerOpCountMetrics( |
| WriteChunk); |
| long putBlockCount = metrics.getContainerOpCountMetrics( |
| PutBlock); |
| long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics( |
| WriteChunk); |
| long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics( |
| PutBlock); |
| long totalOpCount = metrics.getTotalOpCount(); |
| String keyName = getKeyName(); |
| OzoneOutputStream key = createKey(client, keyName); |
| // write data more than 1 chunk |
| int dataLength = CHUNK_SIZE + 50; |
| byte[] data1 = RandomUtils.nextBytes(dataLength); |
| key.write(data1); |
| assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); |
| KeyOutputStream keyOutputStream = |
| assertInstanceOf(KeyOutputStream.class, key.getOutputStream()); |
| |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| RatisBlockOutputStream blockOutputStream = |
| assertInstanceOf(RatisBlockOutputStream.class, |
| keyOutputStream.getStreamEntries().get(0).getOutputStream()); |
| |
| // we have just written data equal flush Size > 1 chunk, at this time |
| // buffer pool will have 2 buffers allocated worth of chunk size |
| |
| BufferPool bufferPool = blockOutputStream.getBufferPool(); |
| assertEquals(2, bufferPool.getSize()); |
| // writtenDataLength as well flushedDataLength will be updated here |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| // since data written is still less than flushLength, flushLength will |
| // still be 0. |
| assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, blockOutputStream.getTotalAckDataLength()); |
| |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| // This will flush the data and update the flush length and the map. |
| key.flush(); |
| assertEquals(writeChunkCount + 2, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 1, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| |
| assertEquals(2, bufferPool.getSize()); |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| // flush ensures watchForCommit updates the total length acknowledged |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| |
| // now close the stream, It will update ack length after watchForCommit |
| key.close(); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| // make sure the bufferPool is empty |
| assertEquals(0, bufferPool.computeBufferData()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| assertEquals(writeChunkCount + 2, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 2, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + 4, metrics.getTotalOpCount()); |
| assertEquals(0, keyOutputStream.getStreamEntries().size()); |
| |
| validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { |
| OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); |
| try (OzoneClient client = newClient(cluster.getConf(), config)) { |
| XceiverClientMetrics metrics = |
| XceiverClientManager.getXceiverClientMetrics(); |
| long writeChunkCount = metrics.getContainerOpCountMetrics( |
| WriteChunk); |
| long putBlockCount = metrics.getContainerOpCountMetrics( |
| PutBlock); |
| long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics( |
| WriteChunk); |
| long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics( |
| PutBlock); |
| long totalOpCount = metrics.getTotalOpCount(); |
| |
| String keyName = getKeyName(); |
| OzoneOutputStream key = createKey(client, keyName); |
| int dataLength = FLUSH_SIZE + 50; |
| byte[] data1 = RandomUtils.nextBytes(dataLength); |
| key.write(data1); |
| |
| assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); |
| KeyOutputStream keyOutputStream = |
| assertInstanceOf(KeyOutputStream.class, key.getOutputStream()); |
| |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| RatisBlockOutputStream blockOutputStream = |
| assertInstanceOf(RatisBlockOutputStream.class, |
| keyOutputStream.getStreamEntries().get(0).getOutputStream()); |
| |
| // we have just written data more than flush Size(2 chunks), at this time |
| // buffer pool will have 3 buffers allocated worth of chunk size |
| |
| assertEquals(3, blockOutputStream.getBufferPool().getSize()); |
| // writtenDataLength as well flushedDataLength will be updated here |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| assertEquals(FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, blockOutputStream.getTotalAckDataLength()); |
| |
| // Before flush, if there was no pending PutBlock which means it is complete. |
| // It put a commit index into commitIndexMap. |
| assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0, |
| blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| key.flush(); |
| if (flushDelay) { |
| // If the flushDelay feature is enabled, nothing happens. |
| // The assertions will be as same as those before flush. |
| assertEquals(FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0, |
| blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| assertEquals(0, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| } else { |
| assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| } |
| |
| |
| key.close(); |
| |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| assertEquals(writeChunkCount + 3, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| // If the flushDelay was disabled, it sends PutBlock with the data in the buffer. |
| assertEquals(putBlockCount + (flushDelay ? 2 : 3), |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + (flushDelay ? 5 : 6), metrics.getTotalOpCount()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| // make sure the bufferPool is empty |
| assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(0, keyOutputStream.getStreamEntries().size()); |
| |
| validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testWriteExactlyMaxFlushSize(boolean flushDelay) throws Exception { |
| OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); |
| try (OzoneClient client = newClient(cluster.getConf(), config)) { |
| XceiverClientMetrics metrics = |
| XceiverClientManager.getXceiverClientMetrics(); |
| long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk); |
| long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock); |
| long pendingWriteChunkCount = |
| metrics.getPendingContainerOpCountMetrics(WriteChunk); |
| long pendingPutBlockCount = |
| metrics.getPendingContainerOpCountMetrics(PutBlock); |
| long totalOpCount = metrics.getTotalOpCount(); |
| |
| String keyName = getKeyName(); |
| OzoneOutputStream key = createKey(client, keyName); |
| int dataLength = MAX_FLUSH_SIZE; |
| byte[] data1 = RandomUtils.nextBytes(dataLength); |
| key.write(data1); |
| |
| // since its hitting the full bufferCondition, it will call watchForCommit |
| // and completes atleast putBlock for first flushSize worth of data |
| assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk)) |
| .isLessThanOrEqualTo(pendingWriteChunkCount + 2); |
| assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock)) |
| .isLessThanOrEqualTo(pendingPutBlockCount + 1); |
| KeyOutputStream keyOutputStream = |
| assertInstanceOf(KeyOutputStream.class, key.getOutputStream()); |
| |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| RatisBlockOutputStream blockOutputStream = |
| assertInstanceOf(RatisBlockOutputStream.class, |
| keyOutputStream.getStreamEntries().get(0).getOutputStream()); |
| |
| |
| assertEquals(4, blockOutputStream.getBufferPool().getSize()); |
| // writtenDataLength as well flushedDataLength will be updated here |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| assertEquals(MAX_FLUSH_SIZE, |
| blockOutputStream.getTotalDataFlushedLength()); |
| |
| // since data equals to maxBufferSize is written, this will be a blocking |
| // call and hence will wait for atleast flushSize worth of data to get |
| // ack'd by all servers right here |
| assertThat(blockOutputStream.getTotalAckDataLength()) |
| .isGreaterThanOrEqualTo(FLUSH_SIZE); |
| |
| // watchForCommit will clean up atleast one entry from the map where each |
| // entry corresponds to flushSize worth of data |
| |
| assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size()) |
| .isLessThanOrEqualTo(1); |
| |
| // This will flush the data and update the flush length and the map. |
| key.flush(); |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); |
| |
| // Since the data in the buffer is already flushed, flush here will have |
| // no impact on the counters and data structures |
| |
| assertEquals(4, blockOutputStream.getBufferPool().getSize()); |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength()); |
| assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size()) |
| .isLessThanOrEqualTo(1); |
| |
| // now close the stream, it will update ack length after watchForCommit |
| key.close(); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| assertEquals(writeChunkCount + 4, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 3, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + 7, metrics.getTotalOpCount()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| // make sure the bufferPool is empty |
| assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(0, keyOutputStream.getStreamEntries().size()); |
| validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testWriteMoreThanMaxFlushSize(boolean flushDelay) throws Exception { |
| OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); |
| try (OzoneClient client = newClient(cluster.getConf(), config)) { |
| XceiverClientMetrics metrics = |
| XceiverClientManager.getXceiverClientMetrics(); |
| long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk); |
| long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock); |
| long pendingWriteChunkCount = |
| metrics.getPendingContainerOpCountMetrics(WriteChunk); |
| long pendingPutBlockCount = |
| metrics.getPendingContainerOpCountMetrics(PutBlock); |
| long totalOpCount = metrics.getTotalOpCount(); |
| String keyName = getKeyName(); |
| OzoneOutputStream key = createKey(client, keyName); |
| int dataLength = MAX_FLUSH_SIZE + 50; |
| // write data more than 1 chunk |
| byte[] data1 = RandomUtils.nextBytes(dataLength); |
| key.write(data1); |
| KeyOutputStream keyOutputStream = |
| assertInstanceOf(KeyOutputStream.class, key.getOutputStream()); |
| |
| // since it's hitting full-buffer, it will call watchForCommit |
| // and completes putBlock at least for first flushSize worth of data |
| assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk)) |
| .isLessThanOrEqualTo(pendingWriteChunkCount + 2); |
| assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock)) |
| .isLessThanOrEqualTo(pendingPutBlockCount + 1); |
| assertEquals(writeChunkCount + 4, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| assertEquals(putBlockCount + 2, |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| RatisBlockOutputStream blockOutputStream = |
| assertInstanceOf(RatisBlockOutputStream.class, |
| keyOutputStream.getStreamEntries().get(0).getOutputStream()); |
| |
| assertEquals(4, blockOutputStream.getBufferPool().getSize()); |
| // writtenDataLength as well flushedDataLength will be updated here |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| |
| assertEquals(MAX_FLUSH_SIZE, |
| blockOutputStream.getTotalDataFlushedLength()); |
| |
| // since data equals to maxBufferSize is written, this will be a blocking |
| // call and hence will wait for atleast flushSize worth of data to get |
| // ack'd by all servers right here |
| assertThat(blockOutputStream.getTotalAckDataLength()) |
| .isGreaterThanOrEqualTo(FLUSH_SIZE); |
| |
| // watchForCommit will clean up atleast one entry from the map where each |
| // entry corresponds to flushSize worth of data |
| assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size()) |
| .isLessThanOrEqualTo(1); |
| |
| // Now do a flush. |
| key.flush(); |
| assertEquals(1, keyOutputStream.getStreamEntries().size()); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| |
| // Since the data in the buffer is already flushed, flush here will have |
| // no impact on the counters and data structures |
| |
| assertEquals(4, blockOutputStream.getBufferPool().getSize()); |
| assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); |
| // dataLength > MAX_FLUSH_SIZE |
| assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength, |
| blockOutputStream.getTotalDataFlushedLength()); |
| assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size()) |
| .isLessThanOrEqualTo(2); |
| |
| // now close the stream, it will update ack length after watchForCommit |
| key.close(); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| // make sure the bufferPool is empty |
| assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); |
| assertEquals(pendingWriteChunkCount, |
| metrics.getPendingContainerOpCountMetrics(WriteChunk)); |
| assertEquals(pendingPutBlockCount, |
| metrics.getPendingContainerOpCountMetrics(PutBlock)); |
| assertEquals(writeChunkCount + 5, |
| metrics.getContainerOpCountMetrics(WriteChunk)); |
| // The previous flush did not trigger any action with flushDelay enabled |
| assertEquals(putBlockCount + (flushDelay ? 3 : 4), |
| metrics.getContainerOpCountMetrics(PutBlock)); |
| assertEquals(totalOpCount + (flushDelay ? 8 : 9), |
| metrics.getTotalOpCount()); |
| assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); |
| assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); |
| assertEquals(0, keyOutputStream.getStreamEntries().size()); |
| validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); |
| } |
| } |
| |
| static OzoneOutputStream createKey(OzoneClient client, String keyName) |
| throws Exception { |
| return createKey(client, keyName, 0, ReplicationFactor.THREE); |
| } |
| |
| static OzoneOutputStream createKey(OzoneClient client, String keyName, |
| long size, ReplicationFactor factor) throws Exception { |
| return TestHelper.createKey(keyName, ReplicationType.RATIS, factor, size, |
| client.getObjectStore(), VOLUME, BUCKET); |
| } |
| |
| static String getKeyName() { |
| return UUID.randomUUID().toString(); |
| } |
| |
| } |