blob: bcca21ee66ee26e52394ddff8b15dfb0ae3bd426 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.recon;
import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import org.junit.Rule;
import org.junit.rules.Timeout;
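/**
 * Integration test that checks Recon stays in sync with Ozone Manager
 * metadata, using a MiniOzoneCluster started with Recon enabled.
 */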
public class TestReconWithOzoneManager {
/**
* Set a timeout for each test.
*/
@Rule
public Timeout timeout = Timeout.seconds(300);
private static MiniOzoneCluster cluster = null;
private static OzoneConfiguration conf;
private static OMMetadataManager metadataManager;
private static CloseableHttpClient httpClient;
private static String containerKeyServiceURL;
private static String taskStatusURL;
/**
 * Starts a single-datanode {@link MiniOzoneCluster} with Recon enabled,
 * resolves the Recon REST endpoints used by the tests and builds the shared
 * HTTP client.
 *
 * @throws Exception if the cluster cannot be built or does not become ready.
 */
@BeforeClass
public static void init() throws Exception {
  conf = new OzoneConfiguration();

  // Each timeout falls back from the "ozone.recon" key to the legacy key and
  // finally to the compiled-in default.
  int socketTimeout = (int) conf.getTimeDuration(
      OZONE_RECON_OM_SOCKET_TIMEOUT,
      conf.get(
          ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT,
          OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT),
      TimeUnit.MILLISECONDS);
  int connectionTimeout = (int) conf.getTimeDuration(
      OZONE_RECON_OM_CONNECTION_TIMEOUT,
      conf.get(
          ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT,
          OZONE_RECON_OM_CONNECTION_TIMEOUT_DEFAULT),
      TimeUnit.MILLISECONDS);
  int connectionRequestTimeout = (int) conf.getTimeDuration(
      OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT,
      conf.get(
          ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT,
          OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
      TimeUnit.MILLISECONDS);

  // Wire every timeout to its matching setter; they were previously
  // cross-assigned (socket -> connect, connect -> request, request -> socket).
  RequestConfig config = RequestConfig.custom()
      .setConnectTimeout(connectionTimeout)
      .setConnectionRequestTimeout(connectionRequestTimeout)
      .setSocketTimeout(socketTimeout)
      .build();

  cluster =
      MiniOzoneCluster.newBuilder(conf)
          .setNumDatanodes(1)
          .includeRecon(true)
          .build();
  cluster.waitForClusterToBeReady();
  metadataManager = cluster.getOzoneManager().getMetadataManager();

  cluster.getStorageContainerManager().exitSafeMode();

  InetSocketAddress address =
      cluster.getReconServer().getHttpServer().getHttpAddress();
  String reconHTTPAddress = address.getHostName() + ":" + address.getPort();
  containerKeyServiceURL = "http://" + reconHTTPAddress +
      "/api/v1/containers";
  taskStatusURL = "http://" + reconHTTPAddress + "/api/v1/task/status";

  // initialize HTTPClient
  httpClient = HttpClientBuilder
      .create()
      .setDefaultRequestConfig(config)
      .build();
}
/**
 * Releases the shared HTTP client and stops the mini cluster.
 *
 * The cluster is shut down in a {@code finally} block so that a failure
 * while closing the HTTP client cannot leave the cluster running.
 *
 * @throws IOException if closing the HTTP client fails.
 */
@AfterClass
public static void shutdown() throws IOException {
  try {
    if (httpClient != null) {
      httpClient.close();
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Issues an HTTP GET against the given URL using the shared HTTP client and
 * returns the response body.
 *
 * @param url URL to request.
 * @return the response body as a String when the status is 200 or 201.
 * @throws IOException if the request fails, the response carries no entity,
 *                     or the status code is anything other than 200 or 201.
 */
private String makeHttpCall(String url)
    throws IOException {
  HttpGet httpGet = new HttpGet(url);
  try {
    HttpResponse response = httpClient.execute(httpGet);
    int statusCode = response.getStatusLine().getStatusCode();
    HttpEntity entity = response.getEntity();

    // Without a payload there is nothing to return or to report, whatever
    // the status code was.
    if (entity == null) {
      throw new IOException("Unexpected null in http payload," +
          " while processing request");
    }

    String body = EntityUtils.toString(entity);
    if ((statusCode == HTTP_OK) || (statusCode == HTTP_CREATED)) {
      return body;
    }
    throw new IOException("Unexpected exception when trying to reach " +
        "Recon Server, " + body);
  } finally {
    // Always hand the connection back, including on the failure paths.
    httpGet.releaseConnection();
  }
}
/**
 * Walks Recon through three sync rounds against OM: an initial sync with one
 * key, a second sync after four more keys, and a third sync after Recon has
 * been restarted and five more keys were added. After each round the
 * container count, a sample container entry and the last updated sequence
 * number reported by Recon are compared with OM.
 */
@Test
public void testOmDBSyncing() throws Exception {
  // ---- Round 1: one key, initial sync ----
  addKeys(0, 1);

  // OM itself must hold /vol0/bucket0/key0 before Recon is synced.
  String firstOzoneKey =
      metadataManager.getOzoneKey("vol0", "bucket0", "key0");
  OmKeyInfo firstKeyInfo = metadataManager.getKeyTable().get(firstOzoneKey);
  TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
      omKeyIterator = metadataManager.getKeyTable().iterator();
  long omKeyCount = getTableKeyCount(omKeyIterator);
  Assert.assertEquals("vol0", firstKeyInfo.getVolumeName());
  Assert.assertEquals("bucket0", firstKeyInfo.getBucketName());

  OzoneManagerServiceProviderImpl omServiceProvider =
      (OzoneManagerServiceProviderImpl)
          cluster.getReconServer().getOzoneManagerServiceProvider();
  omServiceProvider.syncDataFromOM();

  // Containers endpoint: one container per key is expected.
  String containersJson = makeHttpCall(containerKeyServiceURL);
  long reconContainerCount = getReconContainerCount(containersJson);
  Assert.assertEquals(omKeyCount, reconContainerCount);
  assertContainerHoldsSingleKey(containersJson, 0);

  // Task status endpoint: sequence number recorded for the snapshot task.
  RDBStore omStore = (RDBStore) metadataManager.getStore();
  long omSeqNumber = omStore.getDb().getLatestSequenceNumber();
  String taskStatusJson = makeHttpCall(taskStatusURL);
  long reconSeqNumber = getReconTaskAttributeFromJson(
      taskStatusJson, OmSnapshotRequest.name(), "lastUpdatedSeqNumber");
  Assert.assertEquals(omSeqNumber, reconSeqNumber);

  // ---- Round 2: four more keys, delta sync ----
  addKeys(1, 5);
  omKeyIterator = metadataManager.getKeyTable().iterator();
  omKeyCount = getTableKeyCount(omKeyIterator);

  omServiceProvider.syncDataFromOM();

  containersJson = makeHttpCall(containerKeyServiceURL);
  reconContainerCount = getReconContainerCount(containersJson);
  Assert.assertEquals(omKeyCount, reconContainerCount);
  assertContainerHoldsSingleKey(containersJson, 3);

  omStore = (RDBStore) metadataManager.getStore();
  omSeqNumber = omStore.getDb().getLatestSequenceNumber();
  taskStatusJson = makeHttpCall(taskStatusURL);
  reconSeqNumber = getReconTaskAttributeFromJson(
      taskStatusJson, OmDeltaRequest.name(), "lastUpdatedSeqNumber");
  Assert.assertEquals(omSeqNumber, reconSeqNumber);

  // Remember when the snapshot task last ran, to compare after the restart.
  long snapshotTimestampBeforeRestart = getReconTaskAttributeFromJson(
      taskStatusJson, OmSnapshotRequest.name(), "lastUpdatedTimestamp");

  // ---- Round 3: restart Recon, five more keys, delta sync ----
  cluster.restartReconServer();
  omServiceProvider = (OzoneManagerServiceProviderImpl)
      cluster.getReconServer().getOzoneManagerServiceProvider();

  addKeys(5, 10);
  omKeyIterator = metadataManager.getKeyTable().iterator();
  omKeyCount = getTableKeyCount(omKeyIterator);

  omServiceProvider.syncDataFromOM();

  containersJson = makeHttpCall(containerKeyServiceURL);
  reconContainerCount = getReconContainerCount(containersJson);
  Assert.assertEquals(omKeyCount, reconContainerCount);
  assertContainerHoldsSingleKey(containersJson, 7);

  omStore = (RDBStore) metadataManager.getStore();
  omSeqNumber = omStore.getDb().getLatestSequenceNumber();
  taskStatusJson = makeHttpCall(taskStatusURL);
  reconSeqNumber = getReconTaskAttributeFromJson(
      taskStatusJson, OmDeltaRequest.name(), "lastUpdatedSeqNumber");
  long snapshotTimestampAfterRestart = getReconTaskAttributeFromJson(
      taskStatusJson, OmSnapshotRequest.name(), "lastUpdatedTimestamp");

  // An unchanged snapshot timestamp means only delta updates were applied
  // after the restart.
  Assert.assertEquals(snapshotTimestampBeforeRestart,
      snapshotTimestampAfterRestart);
  Assert.assertEquals(omSeqNumber, reconSeqNumber);
}

/**
 * Asserts that the entry at position {@code containerId} of the containers
 * response reports that same container ID and exactly one key.
 *
 * @param containersJson JSON body returned by the containers endpoint.
 * @param containerId list position to inspect, also the expected ID.
 */
private void assertContainerHoldsSingleKey(String containersJson,
    int containerId) {
  LinkedTreeMap container =
      getContainerResponseMap(containersJson, containerId);
  Assert.assertEquals(containerId,
      (long)(double) container.get("ContainerID"));
  Assert.assertEquals(1,
      (long)(double) container.get("NumberOfKeys"));
}
/**
 * Extracts a numeric attribute of one task from the task status JSON.
 *
 * The response is parsed as a list of task entries; the first entry whose
 * "taskName" equals {@code taskName} is used, and the test fails if there is
 * no such entry.
 *
 * @param taskStatusResponse JSON body returned by the task status endpoint.
 * @param taskName name of the task to look up.
 * @param entityAttribute attribute of the task entry to read.
 * @return the attribute value truncated to a long.
 */
private long getReconTaskAttributeFromJson(String taskStatusResponse,
    String taskName, String entityAttribute) {
  ArrayList<LinkedTreeMap> tasks = new Gson()
      .fromJson(taskStatusResponse, ArrayList.class);

  LinkedTreeMap matchingTask = null;
  for (LinkedTreeMap task : tasks) {
    if (task.get("taskName").equals(taskName)) {
      matchingTask = task;
      break;
    }
  }

  Assert.assertTrue(matchingTask != null);
  Object attributeValue = matchingTask.get(entityAttribute);
  return (long)(double) attributeValue;
}
/**
 * Reads the "totalCount" field under "data" from the containers response.
 *
 * @param containerResponse JSON body returned by the containers endpoint.
 * @return the total count truncated to a long.
 */
private long getReconContainerCount(String containerResponse) {
  Gson gson = new Gson();
  Map parsedResponse = gson.fromJson(containerResponse, HashMap.class);
  LinkedTreeMap data = (LinkedTreeMap) parsedResponse.get("data");
  Object totalCount = data.get("totalCount");
  return (long)(double) totalCount;
}
/**
 * Returns one entry of the "containers" list found under "data" in the
 * containers response.
 *
 * @param containerResponse JSON body returned by the containers endpoint.
 * @param expectedContainerID position of the entry within the list.
 * @return the container entry at that position.
 */
private LinkedTreeMap getContainerResponseMap(String containerResponse,
    int expectedContainerID) {
  Gson gson = new Gson();
  Map parsedResponse = gson.fromJson(containerResponse, HashMap.class);
  LinkedTreeMap data = (LinkedTreeMap) parsedResponse.get("data");
  ArrayList containerEntries = (ArrayList) data.get("containers");
  Object entry = containerEntries.get(expectedContainerID);
  return (LinkedTreeMap) entry;
}
/**
 * Writes one key per index in {@code [start, end)} to the OM key table, named
 * vol&lt;i&gt;/bucket&lt;i&gt;/key&lt;i&gt;. Each key gets a single block
 * whose container ID is the index, so every container holds exactly one key.
 *
 * @param start first index, inclusive.
 * @param end last index, exclusive.
 */
private void addKeys(int start, int end) throws Exception {
  int index = start;
  while (index < end) {
    Pipeline randomPipeline = TestUtils.getRandomPipeline();
    BlockID block = new BlockID(index, 1);

    List<OmKeyLocationInfo> locations = new ArrayList<>();
    locations.add(getOmKeyLocationInfo(block, randomPipeline));
    OmKeyLocationInfoGroup locationGroup =
        new OmKeyLocationInfoGroup(0, locations);

    writeDataToOm("key" + index, "bucket" + index, "vol" + index,
        Collections.singletonList(locationGroup));
    index++;
  }
}
/**
 * Counts the entries remaining in the given table iterator and then closes
 * it, so the resources backing the iterator are released.
 *
 * @param iterator iterator over the table; it is drained and closed here
 *                 and must not be used afterwards.
 * @return the number of entries that were iterated.
 * @throws IOException if closing the iterator fails.
 */
private long getTableKeyCount(TableIterator<String, ? extends
    Table.KeyValue<String, OmKeyInfo>> iterator) throws IOException {
  try {
    long keyCount = 0;
    while (iterator.hasNext()) {
      keyCount++;
      iterator.next();
    }
    return keyCount;
  } finally {
    iterator.close();
  }
}
/**
 * Builds an {@link OmKeyLocationInfo} from the given block and pipeline.
 *
 * @param blockID block the location refers to.
 * @param pipeline pipeline to attach to the location.
 * @return the built location info.
 */
private static OmKeyLocationInfo getOmKeyLocationInfo(BlockID blockID,
    Pipeline pipeline) {
  OmKeyLocationInfo.Builder locationBuilder =
      new OmKeyLocationInfo.Builder()
          .setBlockID(blockID)
          .setPipeline(pipeline);
  return locationBuilder.build();
}
/**
 * Puts a key entry straight into the OM key table.
 *
 * The entry is created with replication factor ONE and replication type
 * STAND_ALONE.
 *
 * @param key key name.
 * @param bucket bucket name.
 * @param volume volume name.
 * @param omKeyLocationInfoGroupList block location groups for the key.
 * @throws IOException if the table write fails.
 */
private static void writeDataToOm(String key, String bucket, String volume,
    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList)
    throws IOException {
  String tableKey = metadataManager.getOzoneKey(volume, bucket, key);
  OmKeyInfo keyInfo = new OmKeyInfo.Builder()
      .setBucketName(bucket)
      .setVolumeName(volume)
      .setKeyName(key)
      .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
      .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
      .setOmKeyLocationInfos(omKeyLocationInfoGroupList)
      .build();
  metadataManager.getKeyTable().put(tableKey, keyInfo);
}
}