/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
 * Tests for XceiverClientGrpc, to ensure topology-aware reads select the
 * closest node, and that connections are re-used after a getBlock call.
 */
public class TestXceiverClientGrpc {
private Pipeline pipeline;
private List<DatanodeDetails> dns;
private List<DatanodeDetails> dnsInOrder;
private OzoneConfiguration conf = new OzoneConfiguration();
@BeforeEach
public void setup() {
dns = new ArrayList<>();
dns.add(MockDatanodeDetails.randomDatanodeDetails());
dns.add(MockDatanodeDetails.randomDatanodeDetails());
dns.add(MockDatanodeDetails.randomDatanodeDetails());
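    // Build the "in order" list in reverse, simulating a topology-sorted
    // ordering in which the closest node differs from the pipeline's first
    // node.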
dnsInOrder = new ArrayList<>();
for (int i = 2; i >= 0; i--) {
dnsInOrder.add(dns.get(i));
}
pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setReplicationConfig(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
.setState(Pipeline.PipelineState.CLOSED)
.setNodes(dns)
.setNodesInOrder(dnsInOrder)
.build();
}
@Test
public void testCorrectDnsReturnedFromPipeline() throws IOException {
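    // dnsInOrder was built in reverse, so the topology-closest node and the
    // pipeline's first node must be different datanodes.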
assertEquals(dnsInOrder.get(0), pipeline.getClosestNode());
assertEquals(dns.get(0), pipeline.getFirstNode());
assertNotEquals(dns.get(0), dnsInOrder.get(0));
}
@Test
@Timeout(5)
public void testRandomFirstNodeIsCommandTarget() throws IOException {
final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, false);
// Using a new Xceiver Client, call it repeatedly until all DNs in the
// pipeline have been the target of the command, indicating it is shuffling
// the DNs on each call with a new client. This test will timeout if this
// is not happening.
while (allDNs.size() > 0) {
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
@Override
public XceiverClientReply sendCommandAsync(
ContainerProtos.ContainerCommandRequestProto request,
DatanodeDetails dn) {
allDNs.remove(dn);
return buildValidResponse();
}
}) {
invokeXceiverClientGetBlock(client);
}
}
}
@Test
@Timeout(5)
  public void testGetBlockRetryAllNodes() {
final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
assertThat(allDNs.size()).isGreaterThan(1);
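    // Every sendCommandAsync call below throws, so the client should retry
    // the getBlock request against each datanode in the pipeline until none
    // remain in allDNs.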
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
@Override
public XceiverClientReply sendCommandAsync(
ContainerProtos.ContainerCommandRequestProto request,
DatanodeDetails dn) throws IOException {
allDNs.remove(dn);
throw new IOException("Failed " + dn);
}
}) {
invokeXceiverClientGetBlock(client);
} catch (IOException e) {
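      // Expected: every datanode threw, so the final getBlock attempt fails.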
e.printStackTrace();
}
assertEquals(0, allDNs.size());
}
@Test
@Timeout(5)
public void testReadChunkRetryAllNodes() {
final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
assertThat(allDNs.size()).isGreaterThan(1);
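    // Every sendCommandAsync call below throws, so the client should retry
    // the readChunk request against each datanode in the pipeline until none
    // remain in allDNs.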
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
@Override
public XceiverClientReply sendCommandAsync(
ContainerProtos.ContainerCommandRequestProto request,
DatanodeDetails dn) throws IOException {
allDNs.remove(dn);
throw new IOException("Failed " + dn);
}
}) {
invokeXceiverClientReadChunk(client);
} catch (IOException e) {
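      // Expected: every datanode threw, so the final readChunk attempt fails.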
e.printStackTrace();
}
assertEquals(0, allDNs.size());
}
@Test
public void testFirstNodeIsCorrectWithTopologyForCommandTarget()
throws IOException {
final Set<DatanodeDetails> seenDNs = new HashSet<>();
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
// With a new Client, make 100 calls and ensure the first sortedDN is used
// each time. The logic should always use the sorted node, so we can check
// only a single DN is ever seen after 100 calls.
for (int i = 0; i < 100; i++) {
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
@Override
public XceiverClientReply sendCommandAsync(
ContainerProtos.ContainerCommandRequestProto request,
DatanodeDetails dn) {
seenDNs.add(dn);
return buildValidResponse();
}
}) {
invokeXceiverClientGetBlock(client);
}
}
assertEquals(1, seenDNs.size());
}
@Test
public void testConnectionReusedAfterGetBlock() throws IOException {
// With a new Client, make 100 calls. On each call, ensure that only one
// DN is seen, indicating the same DN connection is reused.
for (int i = 0; i < 100; i++) {
final Set<DatanodeDetails> seenDNs = new HashSet<>();
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
@Override
public XceiverClientReply sendCommandAsync(
ContainerProtos.ContainerCommandRequestProto request,
DatanodeDetails dn) {
seenDNs.add(dn);
return buildValidResponse();
}
}) {
invokeXceiverClientGetBlock(client);
invokeXceiverClientGetBlock(client);
invokeXceiverClientReadChunk(client);
invokeXceiverClientReadSmallFile(client);
}
assertEquals(1, seenDNs.size());
}
}
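  // Issues a getBlock call through ContainerProtocolCalls with dummy IDs.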
private void invokeXceiverClientGetBlock(XceiverClientSpi client)
throws IOException {
ContainerProtocolCalls.getBlock(client,
ContainerProtos.DatanodeBlockID.newBuilder()
.setContainerID(1)
.setLocalID(1)
.setBlockCommitSequenceId(1)
.build(), null);
}
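  // Issues a readChunk call with a minimal dummy ChunkInfo and block ID.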
private void invokeXceiverClientReadChunk(XceiverClientSpi client)
throws IOException {
BlockID bid = new BlockID(1, 1);
bid.setBlockCommitSequenceId(1);
ContainerProtocolCalls.readChunk(client,
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName("Anything")
.setChecksumData(ContainerProtos.ChecksumData.newBuilder()
.setBytesPerChecksum(512)
.setType(ContainerProtos.ChecksumType.CRC32)
.build())
.setLen(-1)
.setOffset(0)
.build(),
bid,
null, null);
}
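  // Issues a readSmallFile call for a dummy block.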
private void invokeXceiverClientReadSmallFile(XceiverClientSpi client)
throws IOException {
BlockID bid = new BlockID(1, 1);
bid.setBlockCommitSequenceId(1);
ContainerProtocolCalls.readSmallFile(client, bid, null);
}
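  // Builds a successful GetBlock response wrapped in a completed future.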
private XceiverClientReply buildValidResponse() {
ContainerProtos.ContainerCommandResponseProto resp =
ContainerProtos.ContainerCommandResponseProto.newBuilder()
.setCmdType(ContainerProtos.Type.GetBlock)
.setResult(ContainerProtos.Result.SUCCESS).build();
final CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
replyFuture = new CompletableFuture<>();
replyFuture.complete(resp);
return new XceiverClientReply(replyFuture);
}
}