blob: ae9cb88d7d4931066a905cd56a3894a01cd06527 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.UUID;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
/**
* Test for XceiverClientManager caching and eviction.
*/
@Timeout(300)
public class TestXceiverClientManager {
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@BeforeAll
public static void init() throws Exception {
config = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(config)
.setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
storageContainerLocationClient = cluster
.getStorageContainerLocationClient();
}
@AfterAll
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
}
@Test
public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
String metaDir = GenericTestUtils.getTempPath(
TestXceiverClientManager.class.getName() + UUID.randomUUID());
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.ONE,
OzoneConsts.OZONE);
XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline());
Assertions.assertEquals(1, client1.getRefcount());
ContainerWithPipeline container2 = storageContainerLocationClient
.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.THREE,
OzoneConsts.OZONE);
XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline());
Assertions.assertEquals(1, client2.getRefcount());
XceiverClientSpi client3 = clientManager
.acquireClient(container1.getPipeline());
Assertions.assertEquals(2, client3.getRefcount());
Assertions.assertEquals(2, client1.getRefcount());
Assertions.assertEquals(client1, client3);
clientManager.releaseClient(client1, false);
clientManager.releaseClient(client2, false);
clientManager.releaseClient(client3, false);
}
@Test
public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
clientConfig.setMaxSize(1);
String metaDir = GenericTestUtils.getTempPath(
TestXceiverClientManager.class.getName() + UUID.randomUUID());
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
XceiverClientManager clientManager =
new XceiverClientManager(conf, clientConfig, null);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.ONE,
OzoneConsts.OZONE);
XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline());
Assertions.assertEquals(1, client1.getRefcount());
Assertions.assertEquals(container1.getPipeline(),
client1.getPipeline());
ContainerWithPipeline container2 =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.THREE,
OzoneConsts.OZONE);
XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline());
Assertions.assertEquals(1, client2.getRefcount());
Assertions.assertNotEquals(client1, client2);
// least recent container (i.e containerName1) is evicted
XceiverClientSpi nonExistent1 = cache.getIfPresent(
container1.getContainerInfo().getPipelineID().getId().toString()
+ container1.getContainerInfo().getReplicationType());
Assertions.assertEquals(null, nonExistent1);
// However container call should succeed because of refcount on the client.
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), null);
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1, false);
// Create container should throw exception on closed client
Throwable t = Assertions.assertThrows(IOException.class,
() -> ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), null));
Assertions.assertTrue(
t.getMessage().contains("This channel is not connected"));
clientManager.releaseClient(client2, false);
}
@Test
public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
clientConfig.setMaxSize(1);
String metaDir = GenericTestUtils.getTempPath(
TestXceiverClientManager.class.getName() + UUID.randomUUID());
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
XceiverClientManager clientManager =
new XceiverClientManager(conf, clientConfig, null);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.ONE,
OzoneConsts.OZONE);
XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline());
Assertions.assertEquals(1, client1.getRefcount());
clientManager.releaseClient(client1, false);
Assertions.assertEquals(0, client1.getRefcount());
ContainerWithPipeline container2 =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.THREE,
OzoneConsts.OZONE);
XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline());
Assertions.assertEquals(1, client2.getRefcount());
Assertions.assertNotEquals(client1, client2);
// now client 1 should be evicted
XceiverClientSpi nonExistent = cache.getIfPresent(
container1.getContainerInfo().getPipelineID().getId().toString()
+ container1.getContainerInfo().getReplicationType());
Assertions.assertEquals(null, nonExistent);
// Any container operation should now fail
Throwable t = Assertions.assertThrows(IOException.class,
() -> ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), null));
Assertions.assertTrue(
t.getMessage().contains("This channel is not connected"));
clientManager.releaseClient(client2, false);
}
@Test
public void testFreeByRetryFailure() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
clientConfig.setMaxSize(1);
XceiverClientManager clientManager =
new XceiverClientManager(conf, clientConfig, null);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
// client is added in cache
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
SCMTestUtils.getReplicationFactor(conf),
OzoneConsts.OZONE);
XceiverClientSpi client1 =
clientManager.acquireClient(container1.getPipeline());
clientManager.acquireClient(container1.getPipeline());
Assertions.assertEquals(2, client1.getRefcount());
// client should be invalidated in the cache
clientManager.releaseClient(client1, true);
Assertions.assertEquals(1, client1.getRefcount());
Assertions.assertNull(cache.getIfPresent(
container1.getContainerInfo().getPipelineID().getId().toString()
+ container1.getContainerInfo().getReplicationType()));
// new client should be added in cache
XceiverClientSpi client2 =
clientManager.acquireClient(container1.getPipeline());
Assertions.assertNotEquals(client1, client2);
Assertions.assertEquals(1, client2.getRefcount());
// on releasing the old client the entry in cache should not be invalidated
clientManager.releaseClient(client1, true);
Assertions.assertEquals(0, client1.getRefcount());
Assertions.assertNotNull(cache.getIfPresent(
container1.getContainerInfo().getPipelineID().getId().toString()
+ container1.getContainerInfo().getReplicationType()));
}
}