| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.ozone.om; |
| |
| import java.io.IOException; |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import org.apache.hadoop.ozone.OzoneAcl; |
| import org.apache.hadoop.ozone.OzoneConfigKeys; |
| import org.apache.hadoop.ozone.security.acl.OzoneObj; |
| import org.apache.hadoop.ozone.security.acl.OzoneObjInfo; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.Timeout; |
| import org.apache.log4j.Logger; |
| |
| import org.apache.commons.lang3.RandomStringUtils; |
| import org.apache.hadoop.hdds.client.ReplicationFactor; |
| import org.apache.hadoop.hdds.client.ReplicationType; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.StorageType; |
| import org.apache.hadoop.hdfs.LogVerificationAppender; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ozone.MiniOzoneCluster; |
| import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; |
| import org.apache.hadoop.ozone.OzoneTestUtils; |
| import org.apache.hadoop.ozone.client.BucketArgs; |
| import org.apache.hadoop.ozone.client.ObjectStore; |
| import org.apache.hadoop.ozone.client.OzoneBucket; |
| import org.apache.hadoop.ozone.client.OzoneClient; |
| import org.apache.hadoop.ozone.client.OzoneKeyDetails; |
| import org.apache.hadoop.ozone.client.io.OzoneInputStream; |
| import org.apache.hadoop.ozone.client.io.OzoneOutputStream; |
| import org.apache.hadoop.ozone.om.exceptions.OMException; |
| import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; |
| import org.apache.hadoop.ozone.om.ha.OMProxyInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; |
| import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.ozone.client.OzoneClientFactory; |
| import org.apache.hadoop.ozone.client.OzoneVolume; |
| import org.apache.hadoop.ozone.client.VolumeArgs; |
| import org.apache.hadoop.util.Time; |
| |
| |
| import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl |
| .NODE_FAILURE_TIMEOUT; |
| import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS; |
| import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys |
| .OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys |
| .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys |
| .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys |
| .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; |
| import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER; |
| import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ; |
| import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE; |
| import static org.junit.Assert.fail; |
| |
/**
 * Tests Ozone Manager operations against an HA cluster of several OMs,
 * including behavior when one or more of the OMs are stopped.
 */
| public class TestOzoneManagerHA { |
| |
| private MiniOzoneHAClusterImpl cluster = null; |
| private ObjectStore objectStore; |
| private OzoneConfiguration conf; |
| private String clusterId; |
| private String scmId; |
| private String omServiceId; |
| private int numOfOMs = 3; |
| private static final long SNAPSHOT_THRESHOLD = 50; |
| private static final int LOG_PURGE_GAP = 50; |
| |
| @Rule |
| public ExpectedException exception = ExpectedException.none(); |
| |
| @Rule |
| public Timeout timeout = new Timeout(300_000); |
| |
| /** |
| * Create a MiniDFSCluster for testing. |
| * <p> |
| * Ozone is made active by setting OZONE_ENABLED = true |
| * |
| * @throws IOException |
| */ |
| @Before |
| public void init() throws Exception { |
| conf = new OzoneConfiguration(); |
| clusterId = UUID.randomUUID().toString(); |
| scmId = UUID.randomUUID().toString(); |
| omServiceId = "om-service-test1"; |
| conf.setBoolean(OZONE_ACL_ENABLED, true); |
| conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, |
| OZONE_ADMINISTRATORS_WILDCARD); |
| conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); |
| conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10); |
| conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10); |
| conf.setLong( |
| OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, |
| SNAPSHOT_THRESHOLD); |
| conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); |
| cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) |
| .setClusterId(clusterId) |
| .setScmId(scmId) |
| .setOMServiceId(omServiceId) |
| .setNumOfOzoneManagers(numOfOMs) |
| .build(); |
| cluster.waitForClusterToBeReady(); |
| objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) |
| .getObjectStore(); |
| } |
| |
| /** |
| * Shutdown MiniDFSCluster. |
| */ |
| @After |
| public void shutdown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| |
| private OzoneVolume createAndCheckVolume(String volumeName) |
| throws Exception { |
| String userName = "user" + RandomStringUtils.randomNumeric(5); |
| String adminName = "admin" + RandomStringUtils.randomNumeric(5); |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner(userName) |
| .setAdmin(adminName) |
| .build(); |
| |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| |
| OzoneVolume retVolume = objectStore.getVolume(volumeName); |
| |
| Assert.assertTrue(retVolume.getName().equals(volumeName)); |
| Assert.assertTrue(retVolume.getOwner().equals(userName)); |
| Assert.assertTrue(retVolume.getAdmin().equals(adminName)); |
| |
| return retVolume; |
| } |
| @Test |
| public void testAllVolumeOperations() throws Exception { |
| |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| |
| createAndCheckVolume(volumeName); |
| |
| objectStore.deleteVolume(volumeName); |
| |
| OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND, |
| () -> objectStore.getVolume(volumeName)); |
| |
| OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND, |
| () -> objectStore.deleteVolume(volumeName)); |
| } |
| |
| |
| @Test |
| public void testAllBucketOperations() throws Exception { |
| |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| String bucketName = "volume" + RandomStringUtils.randomNumeric(5); |
| |
| OzoneVolume retVolume = createAndCheckVolume(volumeName); |
| |
| BucketArgs bucketArgs = |
| BucketArgs.newBuilder().setStorageType(StorageType.DISK) |
| .setVersioning(true).build(); |
| |
| |
| retVolume.createBucket(bucketName, bucketArgs); |
| |
| |
| OzoneBucket ozoneBucket = retVolume.getBucket(bucketName); |
| |
| Assert.assertEquals(volumeName, ozoneBucket.getVolumeName()); |
| Assert.assertEquals(bucketName, ozoneBucket.getName()); |
| Assert.assertTrue(ozoneBucket.getVersioning()); |
| Assert.assertEquals(StorageType.DISK, ozoneBucket.getStorageType()); |
| Assert.assertTrue(ozoneBucket.getCreationTime() <= Time.now()); |
| |
| |
| // Change versioning to false |
| ozoneBucket.setVersioning(false); |
| |
| ozoneBucket = retVolume.getBucket(bucketName); |
| Assert.assertFalse(ozoneBucket.getVersioning()); |
| |
| retVolume.deleteBucket(bucketName); |
| |
| OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND, |
| () -> retVolume.deleteBucket(bucketName)); |
| |
| |
| |
| } |
| |
| /** |
| * Test a client request when all OM nodes are running. The request should |
| * succeed. |
| * @throws Exception |
| */ |
| @Test |
| public void testAllOMNodesRunning() throws Exception { |
| createVolumeTest(true); |
| createKeyTest(true); |
| } |
| |
| /** |
| * Test client request succeeds even if one OM is down. |
| */ |
| @Test |
| public void testOneOMNodeDown() throws Exception { |
| cluster.stopOzoneManager(1); |
| Thread.sleep(NODE_FAILURE_TIMEOUT * 2); |
| |
| createVolumeTest(true); |
| |
| createKeyTest(true); |
| } |
| |
| /** |
| * Test client request fails when 2 OMs are down. |
| */ |
| @Test |
| public void testTwoOMNodesDown() throws Exception { |
| cluster.stopOzoneManager(1); |
| cluster.stopOzoneManager(2); |
| Thread.sleep(NODE_FAILURE_TIMEOUT * 2); |
| |
| createVolumeTest(false); |
| |
| createKeyTest(false); |
| |
| } |
| |
| private OzoneBucket setupBucket() throws Exception { |
| String userName = "user" + RandomStringUtils.randomNumeric(5); |
| String adminName = "admin" + RandomStringUtils.randomNumeric(5); |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner(userName) |
| .setAdmin(adminName) |
| .build(); |
| |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); |
| |
| Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); |
| Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); |
| Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); |
| |
| String bucketName = UUID.randomUUID().toString(); |
| retVolumeinfo.createBucket(bucketName); |
| |
| OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); |
| |
| Assert.assertTrue(ozoneBucket.getName().equals(bucketName)); |
| Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName)); |
| |
| return ozoneBucket; |
| } |
| |
| @Test |
| public void testMultipartUpload() throws Exception { |
| |
| // Happy scenario when all OM's are up. |
| OzoneBucket ozoneBucket = setupBucket(); |
| |
| String keyName = UUID.randomUUID().toString(); |
| String uploadID = initiateMultipartUpload(ozoneBucket, keyName); |
| |
| createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID); |
| |
| } |
| |
| |
| @Test |
| public void testFileOperationsWithRecursive() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| |
| String data = "random data"; |
| |
| // one level key name |
| String keyName = UUID.randomUUID().toString(); |
| testCreateFile(ozoneBucket, keyName, data, true, false); |
| |
| // multi level key name |
| keyName = "dir1/dir2/dir3/file1"; |
| testCreateFile(ozoneBucket, keyName, data, true, false); |
| |
| |
| data = "random data random data"; |
| |
| // multi level key name with over write set. |
| testCreateFile(ozoneBucket, keyName, data, true, true); |
| |
| |
| try { |
| testCreateFile(ozoneBucket, keyName, data, true, false); |
| fail("testFileOperationsWithRecursive"); |
| } catch (OMException ex) { |
| Assert.assertEquals(FILE_ALREADY_EXISTS, ex.getResult()); |
| } |
| |
| // Try now with a file name which is same as a directory. |
| try { |
| keyName = "folder/folder2"; |
| ozoneBucket.createDirectory(keyName); |
| testCreateFile(ozoneBucket, keyName, data, true, false); |
| fail("testFileOperationsWithNonRecursive"); |
| } catch (OMException ex) { |
| Assert.assertEquals(NOT_A_FILE, ex.getResult()); |
| } |
| |
| } |
| |
| |
| @Test |
| public void testFileOperationsWithNonRecursive() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| |
| String data = "random data"; |
| |
| // one level key name |
| String keyName = UUID.randomUUID().toString(); |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| |
| // multi level key name |
| keyName = "dir1/dir2/dir3/file1"; |
| |
| // Should fail, as this is non-recursive and no parent directories exist |
| try { |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| } catch (OMException ex) { |
| Assert.assertEquals(NOT_A_FILE, ex.getResult()); |
| } |
| |
| // create directory, now this should pass. |
| ozoneBucket.createDirectory("dir1/dir2/dir3"); |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| data = "random data random data"; |
| |
| // multi level key name with over write set. |
| testCreateFile(ozoneBucket, keyName, data, false, true); |
| |
| try { |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| fail("testFileOperationsWithRecursive"); |
| } catch (OMException ex) { |
| Assert.assertEquals(FILE_ALREADY_EXISTS, ex.getResult()); |
| } |
| |
| |
| // Try now with a file which already exists under the path |
| ozoneBucket.createDirectory("folder1/folder2/folder3/folder4"); |
| |
| keyName = "folder1/folder2/folder3/folder4/file1"; |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| |
| keyName = "folder1/folder2/folder3/file1"; |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| |
| // Try now with a file under path already. This should fail. |
| try { |
| keyName = "folder/folder2"; |
| ozoneBucket.createDirectory(keyName); |
| testCreateFile(ozoneBucket, keyName, data, false, false); |
| fail("testFileOperationsWithNonRecursive"); |
| } catch (OMException ex) { |
| Assert.assertEquals(NOT_A_FILE, ex.getResult()); |
| } |
| |
| } |
| |
| /** |
| * This method createFile and verifies the file is successfully created or |
| * not. |
| * @param ozoneBucket |
| * @param keyName |
| * @param data |
| * @param recursive |
| * @param overwrite |
| * @throws Exception |
| */ |
| public void testCreateFile(OzoneBucket ozoneBucket, String keyName, |
| String data, boolean recursive, boolean overwrite) |
| throws Exception { |
| |
| OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName, |
| data.length(), ReplicationType.RATIS, ReplicationFactor.ONE, |
| overwrite, recursive); |
| |
| ozoneOutputStream.write(data.getBytes(), 0, data.length()); |
| ozoneOutputStream.close(); |
| |
| OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName); |
| |
| Assert.assertEquals(keyName, ozoneKeyDetails.getName()); |
| Assert.assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName()); |
| Assert.assertEquals(ozoneBucket.getVolumeName(), |
| ozoneKeyDetails.getVolumeName()); |
| Assert.assertEquals(data.length(), ozoneKeyDetails.getDataSize()); |
| |
| OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName); |
| |
| byte[] fileContent = new byte[data.getBytes().length]; |
| ozoneInputStream.read(fileContent); |
| Assert.assertEquals(data, new String(fileContent)); |
| } |
| |
| @Test |
| public void testMultipartUploadWithOneOmNodeDown() throws Exception { |
| |
| OzoneBucket ozoneBucket = setupBucket(); |
| |
| String keyName = UUID.randomUUID().toString(); |
| String uploadID = initiateMultipartUpload(ozoneBucket, keyName); |
| |
| // After initiate multipartupload, shutdown leader OM. |
| // Stop leader OM, to see when the OM leader changes |
| // multipart upload is happening successfully or not. |
| |
| OMFailoverProxyProvider omFailoverProxyProvider = |
| objectStore.getClientProxy().getOMProxyProvider(); |
| |
| // The OMFailoverProxyProvider will point to the current leader OM node. |
| String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| |
| // Stop one of the ozone manager, to see when the OM leader changes |
| // multipart upload is happening successfully or not. |
| cluster.stopOzoneManager(leaderOMNodeId); |
| Thread.sleep(NODE_FAILURE_TIMEOUT * 2); |
| |
| createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID); |
| |
| String newLeaderOMNodeId = |
| omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| |
| Assert.assertTrue(leaderOMNodeId != newLeaderOMNodeId); |
| } |
| |
| |
| private String initiateMultipartUpload(OzoneBucket ozoneBucket, |
| String keyName) throws Exception { |
| |
| OmMultipartInfo omMultipartInfo = |
| ozoneBucket.initiateMultipartUpload(keyName, |
| ReplicationType.RATIS, |
| ReplicationFactor.ONE); |
| |
| String uploadID = omMultipartInfo.getUploadID(); |
| Assert.assertTrue(uploadID != null); |
| return uploadID; |
| } |
| |
| private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket, |
| String keyName, String uploadID) throws Exception { |
| |
| String value = "random data"; |
| OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey( |
| keyName, value.length(), 1, uploadID); |
| ozoneOutputStream.write(value.getBytes(), 0, value.length()); |
| ozoneOutputStream.close(); |
| |
| |
| Map<Integer, String> partsMap = new HashMap<>(); |
| partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getPartName()); |
| OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = |
| ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap); |
| |
| Assert.assertTrue(omMultipartUploadCompleteInfo != null); |
| Assert.assertTrue(omMultipartUploadCompleteInfo.getHash() != null); |
| |
| |
| OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName); |
| |
| byte[] fileContent = new byte[value.getBytes().length]; |
| ozoneInputStream.read(fileContent); |
| Assert.assertEquals(value, new String(fileContent)); |
| } |
| |
| |
| private void createKeyTest(boolean checkSuccess) throws Exception { |
| String userName = "user" + RandomStringUtils.randomNumeric(5); |
| String adminName = "admin" + RandomStringUtils.randomNumeric(5); |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner(userName) |
| .setAdmin(adminName) |
| .build(); |
| |
| try { |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| |
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); |
| |
| Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); |
| Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); |
| Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); |
| |
| String bucketName = UUID.randomUUID().toString(); |
| String keyName = UUID.randomUUID().toString(); |
| retVolumeinfo.createBucket(bucketName); |
| |
| OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); |
| |
| Assert.assertTrue(ozoneBucket.getName().equals(bucketName)); |
| Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName)); |
| |
| String value = "random data"; |
| OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, |
| value.length(), ReplicationType.STAND_ALONE, |
| ReplicationFactor.ONE, new HashMap<>()); |
| ozoneOutputStream.write(value.getBytes(), 0, value.length()); |
| ozoneOutputStream.close(); |
| |
| OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName); |
| |
| byte[] fileContent = new byte[value.getBytes().length]; |
| ozoneInputStream.read(fileContent); |
| Assert.assertEquals(value, new String(fileContent)); |
| |
| } catch (ConnectException | RemoteException e) { |
| if (!checkSuccess) { |
| // If the last OM to be tried by the RetryProxy is down, we would get |
| // ConnectException. Otherwise, we would get a RemoteException from the |
| // last running OM as it would fail to get a quorum. |
| if (e instanceof RemoteException) { |
| GenericTestUtils.assertExceptionContains( |
| "NotLeaderException", e); |
| } |
| } else { |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * Create a volume and test its attribute. |
| */ |
| private void createVolumeTest(boolean checkSuccess) throws Exception { |
| String userName = "user" + RandomStringUtils.randomNumeric(5); |
| String adminName = "admin" + RandomStringUtils.randomNumeric(5); |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner(userName) |
| .setAdmin(adminName) |
| .build(); |
| |
| try { |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| |
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); |
| |
| if (checkSuccess) { |
| Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); |
| Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); |
| Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); |
| } else { |
| // Verify that the request failed |
| fail("There is no quorum. Request should have failed"); |
| } |
| } catch (ConnectException | RemoteException e) { |
| if (!checkSuccess) { |
| // If the last OM to be tried by the RetryProxy is down, we would get |
| // ConnectException. Otherwise, we would get a RemoteException from the |
| // last running OM as it would fail to get a quorum. |
| if (e instanceof RemoteException) { |
| GenericTestUtils.assertExceptionContains( |
| "NotLeaderException", e); |
| } |
| } else { |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the |
| * cluster. |
| */ |
| @Test |
| public void testOMProxyProviderInitialization() throws Exception { |
| OzoneClient rpcClient = cluster.getRpcClient(); |
| OMFailoverProxyProvider omFailoverProxyProvider = |
| rpcClient.getObjectStore().getClientProxy().getOMProxyProvider(); |
| List<OMProxyInfo> omProxies = |
| omFailoverProxyProvider.getOMProxyInfos(); |
| |
| Assert.assertEquals(numOfOMs, omProxies.size()); |
| |
| for (int i = 0; i < numOfOMs; i++) { |
| InetSocketAddress omRpcServerAddr = |
| cluster.getOzoneManager(i).getOmRpcServerAddr(); |
| boolean omClientProxyExists = false; |
| for (OMProxyInfo omProxyInfo : omProxies) { |
| if (omProxyInfo.getAddress().equals(omRpcServerAddr)) { |
| omClientProxyExists = true; |
| break; |
| } |
| } |
| Assert.assertTrue("There is no OM Client Proxy corresponding to OM " + |
| "node" + cluster.getOzoneManager(i).getOMNodeId(), |
| omClientProxyExists); |
| } |
| } |
| |
| /** |
| * Test OMFailoverProxyProvider failover on connection exception to OM client. |
| */ |
| @Test |
| public void testOMProxyProviderFailoverOnConnectionFailure() |
| throws Exception { |
| OMFailoverProxyProvider omFailoverProxyProvider = |
| objectStore.getClientProxy().getOMProxyProvider(); |
| String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| |
| createVolumeTest(true); |
| |
| // On stopping the current OM Proxy, the next connection attempt should |
| // failover to a another OM proxy. |
| cluster.stopOzoneManager(firstProxyNodeId); |
| Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4); |
| |
| // Next request to the proxy provider should result in a failover |
| createVolumeTest(true); |
| Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT); |
| |
| // Get the new OM Proxy NodeId |
| String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| |
| // Verify that a failover occured. the new proxy nodeId should be |
| // different from the old proxy nodeId. |
| Assert.assertNotEquals("Failover did not occur as expected", |
| firstProxyNodeId, newProxyNodeId); |
| } |
| |
| /** |
| * Test OMFailoverProxyProvider failover when current OM proxy is not |
| * the current OM Leader. |
| */ |
| @Test |
| public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { |
| OMFailoverProxyProvider omFailoverProxyProvider = |
| objectStore.getClientProxy().getOMProxyProvider(); |
| |
| // Run couple of createVolume tests to discover the current Leader OM |
| createVolumeTest(true); |
| createVolumeTest(true); |
| |
| // The OMFailoverProxyProvider will point to the current leader OM node. |
| String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| |
| // Perform a manual failover of the proxy provider to move the |
| // currentProxyIndex to a node other than the leader OM. |
| omFailoverProxyProvider.performFailover( |
| (OzoneManagerProtocolPB) omFailoverProxyProvider.getProxy().proxy); |
| |
| String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId); |
| |
| // Once another request is sent to this new proxy node, the leader |
| // information must be returned via the response and a failover must |
| // happen to the leader proxy node. |
| createVolumeTest(true); |
| Thread.sleep(2000); |
| |
| String newLeaderOMNodeId = |
| omFailoverProxyProvider.getCurrentProxyOMNodeId(); |
| |
| // The old and new Leader OM NodeId must match since there was no new |
| // election in the Ratis ring. |
| Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId); |
| } |
| |
| @Test |
| public void testOMRetryProxy() throws Exception { |
| // Stop all the OMs. After making 5 (set maxRetries value) attempts at |
| // connection, the RpcClient should give up. |
| for (int i = 0; i < numOfOMs; i++) { |
| cluster.stopOzoneManager(i); |
| } |
| |
| final LogVerificationAppender appender = new LogVerificationAppender(); |
| final org.apache.log4j.Logger logger = Logger.getRootLogger(); |
| logger.addAppender(appender); |
| |
| try { |
| createVolumeTest(true); |
| fail("TestOMRetryProxy should fail when there are no OMs running"); |
| } catch (ConnectException e) { |
| // Each retry attempt tries upto 10 times to connect. So there should be |
| // 10*10 "Retrying connect to server" messages |
| Assert.assertEquals(100, |
| appender.countLinesWithMessage("Retrying connect to server:")); |
| |
| Assert.assertEquals(1, |
| appender.countLinesWithMessage("Failed to connect to OM. Attempted " + |
| "10 retries and 10 failovers")); |
| } |
| } |
| |
| @Test |
| public void testReadRequest() throws Exception { |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| objectStore.createVolume(volumeName); |
| |
| OMFailoverProxyProvider omFailoverProxyProvider = |
| objectStore.getClientProxy().getOMProxyProvider(); |
| String currentLeaderNodeId = omFailoverProxyProvider |
| .getCurrentProxyOMNodeId(); |
| |
| // A read request from any proxy should failover to the current leader OM |
| for (int i = 0; i < numOfOMs; i++) { |
| // Failover OMFailoverProxyProvider to OM at index i |
| OzoneManager ozoneManager = cluster.getOzoneManager(i); |
| String omHostName = ozoneManager.getOmRpcServerAddr().getHostName(); |
| int rpcPort = ozoneManager.getOmRpcServerAddr().getPort(); |
| |
| // Get the ObjectStore and FailoverProxyProvider for OM at index i |
| final ObjectStore store = OzoneClientFactory.getRpcClient( |
| omHostName, rpcPort, omServiceId, conf).getObjectStore(); |
| final OMFailoverProxyProvider proxyProvider = |
| store.getClientProxy().getOMProxyProvider(); |
| |
| // Failover to the OM node that the objectStore points to |
| omFailoverProxyProvider.performFailoverIfRequired( |
| ozoneManager.getOMNodeId()); |
| |
| // A read request should result in the proxyProvider failing over to |
| // leader node. |
| OzoneVolume volume = store.getVolume(volumeName); |
| Assert.assertEquals(volumeName, volume.getName()); |
| |
| Assert.assertEquals(currentLeaderNodeId, |
| proxyProvider.getCurrentProxyOMNodeId()); |
| } |
| } |
| |
| @Test |
| public void testAddBucketAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.BUCKET) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()).build(); |
| |
| testAddAcl(remoteUserName, ozoneObj, defaultUserAcl); |
| } |
| @Test |
| public void testRemoveBucketAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.BUCKET) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()).build(); |
| |
| testRemoveAcl(remoteUserName, ozoneObj, defaultUserAcl); |
| |
| } |
| |
| @Test |
| public void testSetBucketAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.BUCKET) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()).build(); |
| |
| testSetAcl(remoteUserName, ozoneObj, defaultUserAcl); |
| } |
| |
| private boolean containsAcl(OzoneAcl ozoneAcl, List<OzoneAcl> ozoneAcls) { |
| for (OzoneAcl acl : ozoneAcls) { |
| boolean result = compareAcls(ozoneAcl, acl); |
| if (result) { |
| // We found a match, return. |
| return result; |
| } |
| } |
| return false; |
| } |
| |
| private boolean compareAcls(OzoneAcl givenAcl, OzoneAcl existingAcl) { |
| if (givenAcl.getType().equals(existingAcl.getType()) |
| && givenAcl.getName().equals(existingAcl.getName()) |
| && givenAcl.getAclScope().equals(existingAcl.getAclScope())) { |
| BitSet bitSet = (BitSet) givenAcl.getAclBitSet().clone(); |
| bitSet.and(existingAcl.getAclBitSet()); |
| if (bitSet.equals(existingAcl.getAclBitSet())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Test |
| public void testAddKeyAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| String key = createKey(ozoneBucket); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.KEY) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()) |
| .setKeyName(key).build(); |
| |
| testAddAcl(remoteUserName, ozoneObj, userAcl); |
| } |
| |
| @Test |
| public void testRemoveKeyAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| String key = createKey(ozoneBucket); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.KEY) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()) |
| .setKeyName(key).build(); |
| |
| testRemoveAcl(remoteUserName, ozoneObj, userAcl); |
| |
| } |
| |
| @Test |
| public void testSetKeyAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| String key = createKey(ozoneBucket); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.KEY) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()) |
| .setKeyName(key).build(); |
| |
| testSetAcl(remoteUserName, ozoneObj, userAcl); |
| |
| } |
| |
| @Test |
| public void testAddPrefixAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| String prefixName = RandomStringUtils.randomAlphabetic(5) +"/"; |
| OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.PREFIX) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()) |
| .setPrefixName(prefixName).build(); |
| |
| testAddAcl(remoteUserName, ozoneObj, defaultUserAcl); |
| } |
| @Test |
| public void testRemovePrefixAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| String prefixName = RandomStringUtils.randomAlphabetic(5) +"/"; |
| OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName, |
| READ, ACCESS); |
| OzoneAcl userAcl1 = new OzoneAcl(USER, "remote", |
| READ, ACCESS); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.PREFIX) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()) |
| .setPrefixName(prefixName).build(); |
| |
| boolean result = objectStore.addAcl(ozoneObj, userAcl); |
| Assert.assertTrue(result); |
| |
| result = objectStore.addAcl(ozoneObj, userAcl1); |
| Assert.assertTrue(result); |
| |
| result = objectStore.removeAcl(ozoneObj, userAcl); |
| Assert.assertTrue(result); |
| |
| // try removing already removed acl. |
| result = objectStore.removeAcl(ozoneObj, userAcl); |
| Assert.assertFalse(result); |
| |
| result = objectStore.removeAcl(ozoneObj, userAcl1); |
| Assert.assertTrue(result); |
| |
| } |
| |
| @Test |
| public void testSetPrefixAcl() throws Exception { |
| OzoneBucket ozoneBucket = setupBucket(); |
| String remoteUserName = "remoteUser"; |
| String prefixName = RandomStringUtils.randomAlphabetic(5) +"/"; |
| OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName, |
| READ, DEFAULT); |
| |
| OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder() |
| .setResType(OzoneObj.ResourceType.PREFIX) |
| .setStoreType(OzoneObj.StoreType.OZONE) |
| .setVolumeName(ozoneBucket.getVolumeName()) |
| .setBucketName(ozoneBucket.getName()) |
| .setPrefixName(prefixName).build(); |
| |
| testSetAcl(remoteUserName, ozoneObj, defaultUserAcl); |
| } |
| |
| |
| private void testSetAcl(String remoteUserName, OzoneObj ozoneObj, |
| OzoneAcl userAcl) throws Exception { |
| // As by default create will add some default acls in RpcClient. |
| |
| if (!ozoneObj.getResourceType().name().equals( |
| OzoneObj.ResourceType.PREFIX.name())) { |
| List<OzoneAcl> acls = objectStore.getAcl(ozoneObj); |
| |
| Assert.assertTrue(acls.size() > 0); |
| } |
| |
| OzoneAcl modifiedUserAcl = new OzoneAcl(USER, remoteUserName, |
| WRITE, DEFAULT); |
| |
| List<OzoneAcl> newAcls = Collections.singletonList(modifiedUserAcl); |
| boolean setAcl = objectStore.setAcl(ozoneObj, newAcls); |
| Assert.assertTrue(setAcl); |
| |
| // Get acls and check whether they are reset or not. |
| List<OzoneAcl> getAcls = objectStore.getAcl(ozoneObj); |
| |
| Assert.assertTrue(newAcls.size() == getAcls.size()); |
| int i = 0; |
| for (OzoneAcl ozoneAcl : newAcls) { |
| Assert.assertTrue(compareAcls(getAcls.get(i++), ozoneAcl)); |
| } |
| |
| } |
| |
| private void testAddAcl(String remoteUserName, OzoneObj ozoneObj, |
| OzoneAcl userAcl) throws Exception { |
| boolean addAcl = objectStore.addAcl(ozoneObj, userAcl); |
| Assert.assertTrue(addAcl); |
| |
| List<OzoneAcl> acls = objectStore.getAcl(ozoneObj); |
| |
| Assert.assertTrue(containsAcl(userAcl, acls)); |
| |
| // Add an already existing acl. |
| addAcl = objectStore.addAcl(ozoneObj, userAcl); |
| Assert.assertFalse(addAcl); |
| |
| // Add an acl by changing acl type with same type, name and scope. |
| userAcl = new OzoneAcl(USER, remoteUserName, |
| WRITE, DEFAULT); |
| addAcl = objectStore.addAcl(ozoneObj, userAcl); |
| Assert.assertTrue(addAcl); |
| } |
| |
| private void testRemoveAcl(String remoteUserName, OzoneObj ozoneObj, |
| OzoneAcl userAcl) |
| throws Exception{ |
| // As by default create will add some default acls in RpcClient. |
| List<OzoneAcl> acls = objectStore.getAcl(ozoneObj); |
| |
| Assert.assertTrue(acls.size() > 0); |
| |
| // Remove an existing acl. |
| boolean removeAcl = objectStore.removeAcl(ozoneObj, acls.get(0)); |
| Assert.assertTrue(removeAcl); |
| |
| // Trying to remove an already removed acl. |
| removeAcl = objectStore.removeAcl(ozoneObj, acls.get(0)); |
| Assert.assertFalse(removeAcl); |
| |
| boolean addAcl = objectStore.addAcl(ozoneObj, userAcl); |
| Assert.assertTrue(addAcl); |
| |
| // Just changed acl type here to write, rest all is same as defaultUserAcl. |
| OzoneAcl modifiedUserAcl = new OzoneAcl(USER, remoteUserName, |
| WRITE, DEFAULT); |
| addAcl = objectStore.addAcl(ozoneObj, modifiedUserAcl); |
| Assert.assertTrue(addAcl); |
| |
| removeAcl = objectStore.removeAcl(ozoneObj, modifiedUserAcl); |
| Assert.assertTrue(removeAcl); |
| |
| removeAcl = objectStore.removeAcl(ozoneObj, userAcl); |
| Assert.assertTrue(removeAcl); |
| } |
| |
| |
| |
| @Test |
| public void testOMRatisSnapshot() throws Exception { |
| String userName = "user" + RandomStringUtils.randomNumeric(5); |
| String adminName = "admin" + RandomStringUtils.randomNumeric(5); |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); |
| |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner(userName) |
| .setAdmin(adminName) |
| .build(); |
| |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); |
| |
| retVolumeinfo.createBucket(bucketName); |
| OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); |
| |
| String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider() |
| .getCurrentProxyOMNodeId(); |
| OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId); |
| |
| // Send commands to ratis to increase the log index so that ratis |
| // triggers a snapshot on the state machine. |
| |
| long appliedLogIndex = 0; |
| while (appliedLogIndex <= SNAPSHOT_THRESHOLD) { |
| createKey(ozoneBucket); |
| appliedLogIndex = ozoneManager.getOmRatisServer() |
| .getStateMachineLastAppliedIndex(); |
| } |
| |
| GenericTestUtils.waitFor(() -> { |
| if (ozoneManager.getRatisSnapshotIndex() > 0) { |
| return true; |
| } |
| return false; |
| }, 1000, 100000); |
| |
| // The current lastAppliedLogIndex on the state machine should be greater |
| // than or equal to the saved snapshot index. |
| long smLastAppliedIndex = |
| ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex(); |
| long ratisSnapshotIndex = ozoneManager.getRatisSnapshotIndex(); |
| Assert.assertTrue("LastAppliedIndex on OM State Machine (" |
| + smLastAppliedIndex + ") is less than the saved snapshot index(" |
| + ratisSnapshotIndex + ").", |
| smLastAppliedIndex >= ratisSnapshotIndex); |
| |
| // Add more transactions to Ratis to trigger another snapshot |
| while (appliedLogIndex <= (smLastAppliedIndex + SNAPSHOT_THRESHOLD)) { |
| createKey(ozoneBucket); |
| appliedLogIndex = ozoneManager.getOmRatisServer() |
| .getStateMachineLastAppliedIndex(); |
| } |
| |
| GenericTestUtils.waitFor(() -> { |
| if (ozoneManager.getRatisSnapshotIndex() > 0) { |
| return true; |
| } |
| return false; |
| }, 1000, 100000); |
| |
| // The new snapshot index must be greater than the previous snapshot index |
| long ratisSnapshotIndexNew = ozoneManager.getRatisSnapshotIndex(); |
| Assert.assertTrue("Latest snapshot index must be greater than previous " + |
| "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex); |
| |
| } |
| |
| /** |
| * Create a key in the bucket. |
| * @return the key name. |
| */ |
| static String createKey(OzoneBucket ozoneBucket) throws IOException { |
| String keyName = "key" + RandomStringUtils.randomNumeric(5); |
| String data = "data" + RandomStringUtils.randomNumeric(5); |
| OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, |
| data.length(), ReplicationType.STAND_ALONE, |
| ReplicationFactor.ONE, new HashMap<>()); |
| ozoneOutputStream.write(data.getBytes(), 0, data.length()); |
| ozoneOutputStream.close(); |
| return keyName; |
| } |
| |
| @Test |
| public void testOMRestart() throws Exception { |
| // Get the leader OM |
| String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider() |
| .getCurrentProxyOMNodeId(); |
| OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); |
| |
| // Get follower OMs |
| OzoneManager followerOM1 = cluster.getOzoneManager( |
| leaderOM.getPeerNodes().get(0).getOMNodeId()); |
| OzoneManager followerOM2 = cluster.getOzoneManager( |
| leaderOM.getPeerNodes().get(1).getOMNodeId()); |
| |
| // Do some transactions so that the log index increases |
| String userName = "user" + RandomStringUtils.randomNumeric(5); |
| String adminName = "admin" + RandomStringUtils.randomNumeric(5); |
| String volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); |
| |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner(userName) |
| .setAdmin(adminName) |
| .build(); |
| |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); |
| |
| retVolumeinfo.createBucket(bucketName); |
| OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); |
| |
| for (int i = 0; i < 10; i++) { |
| createKey(ozoneBucket); |
| } |
| |
| long lastAppliedTxOnFollowerOM = |
| followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex(); |
| |
| // Stop one follower OM |
| followerOM1.stop(); |
| |
| // Do more transactions. Stopped OM should miss these transactions and |
| // the logs corresponding to atleast some of the missed transactions |
| // should be purged. This will force the OM to install snapshot when |
| // restarted. |
| long minNewTxIndex = lastAppliedTxOnFollowerOM + (LOG_PURGE_GAP * 10); |
| long leaderOMappliedLogIndex = leaderOM.getOmRatisServer() |
| .getStateMachineLastAppliedIndex(); |
| |
| List<String> missedKeys = new ArrayList<>(); |
| while (leaderOMappliedLogIndex < minNewTxIndex) { |
| missedKeys.add(createKey(ozoneBucket)); |
| leaderOMappliedLogIndex = leaderOM.getOmRatisServer() |
| .getStateMachineLastAppliedIndex(); |
| } |
| |
| // Restart the stopped OM. |
| followerOM1.restart(); |
| |
| // Get the latest snapshotIndex from the leader OM. |
| long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(); |
| |
| // The recently started OM should be lagging behind the leader OM. |
| long followerOMLastAppliedIndex = |
| followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex(); |
| Assert.assertTrue( |
| followerOMLastAppliedIndex < leaderOMSnaphsotIndex); |
| |
| // Wait for the follower OM to catch up |
| GenericTestUtils.waitFor(() -> { |
| long lastAppliedIndex = |
| followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex(); |
| if (lastAppliedIndex >= leaderOMSnaphsotIndex) { |
| return true; |
| } |
| return false; |
| }, 100, 200000); |
| |
| // Do more transactions. The restarted OM should receive the |
| // new transactions. It's last applied tx index should increase from the |
| // last snapshot index after more transactions are applied. |
| for (int i = 0; i < 10; i++) { |
| createKey(ozoneBucket); |
| } |
| long followerOM1lastAppliedIndex = followerOM1.getOmRatisServer() |
| .getStateMachineLastAppliedIndex(); |
| Assert.assertTrue(followerOM1lastAppliedIndex > |
| leaderOMSnaphsotIndex); |
| |
| // The follower OMs should be in sync. There can be a small lag between |
| // leader OM and follower OMs as txns are applied first on leader OM. |
| long followerOM2lastAppliedIndex = followerOM1.getOmRatisServer() |
| .getStateMachineLastAppliedIndex(); |
| Assert.assertEquals(followerOM1lastAppliedIndex, |
| followerOM2lastAppliedIndex); |
| |
| } |
| } |