/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.asterix.common;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;

import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.MetadataConstants;
import org.apache.asterix.common.metadata.NamespacePathResolver;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.asterix.utils.RebalanceUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.io.FileSplit;
import org.junit.Assert;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
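
/**
* Test utilities for creating, populating, and inspecting datasets through the
* {@link TestExecutor} and the metadata manager.
*/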
public class TestDataUtil {
private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private TestDataUtil() {
}

/**
* Creates a dataset whose type has a single field {@code id} of type {@code int},
* which is also its primary key.
*
* @param dataset the name of the dataset
* @throws Exception if creating the type or the dataset fails
*/
public static void createIdOnlyDataset(String dataset) throws Exception {
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE TYPE KeyType IF NOT EXISTS AS { id: int };", OUTPUT_FORMAT);
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + "(KeyType) PRIMARY KEY id;", OUTPUT_FORMAT);
}
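
/**
* Drops the dataset {@code dataset}.
*
* @param dataset the name of the dataset
* @throws Exception if dropping the dataset fails
*/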
public static void dropDataset(String dataset) throws Exception {
TEST_EXECUTOR.executeSqlppUpdateOrDdl("DROP DATASET " + dataset + ";", OUTPUT_FORMAT);
}

/**
* Creates a dataset with multiple fields.
*
* @param dataset the name of the dataset
* @param fields the fields of the dataset type, as a map from field name to field type
* @param pkName the name of the primary key field
* @throws Exception if creating the type or the dataset fails
*/
public static void createDataset(String dataset, Map<String, String> fields, String pkName) throws Exception {
StringBuilder stringBuilder = new StringBuilder();
fields.forEach((fName, fType) -> stringBuilder.append(fName).append(":").append(fType).append(","));
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE TYPE dsType AS {" + stringBuilder + "};", OUTPUT_FORMAT);
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + "(dsType) PRIMARY KEY " + pkName + ";",
OUTPUT_FORMAT);
}
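
// A minimal usage sketch (hypothetical dataset/field names; assumes the default
// dataverse is active and the type name dsType is not already defined):
// Map<String, String> fields = new java.util.LinkedHashMap<>();
// fields.put("id", "int");
// fields.put("name", "string");
// TestDataUtil.createDataset("Customers", fields, "id");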

/**
* Creates a dataset without a type specification.
*
* @param dataset the name of the dataset
* @param fields the fields composing the primary key, as a map from field name to field type
* @param pkAutogenerated whether the primary key is autogenerated
* @throws Exception if creating the dataset fails
*/
public static void createDatasetWithoutType(String dataset, Map<String, String> fields, boolean pkAutogenerated)
throws Exception {
StringBuilder stringBuilder = new StringBuilder();
fields.forEach((fName, fType) -> stringBuilder.append(fName).append(":").append(fType).append(","));
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + " PRIMARY KEY (" + stringBuilder + ")"
+ (pkAutogenerated ? " AUTOGENERATED;" : ";"), OUTPUT_FORMAT);
}
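
// A minimal usage sketch (hypothetical names; assumes the default dataverse is active):
// TestDataUtil.createDatasetWithoutType("Events", java.util.Collections.singletonMap("id", "uuid"), true);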

/**
* Creates a secondary index on the primary key (a "primary" index).
*
* @param dataset the name of the dataset
* @param indexName the name of the index
* @throws Exception if creating the index fails
*/
public static void createPrimaryIndex(String dataset, String indexName) throws Exception {
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE PRIMARY INDEX " + indexName + " ON " + dataset + ";",
OUTPUT_FORMAT);
}

/**
* Creates a secondary BTree index.
*
* @param dataset the name of the dataset
* @param indexName the name of the index
* @param skName the name of the secondary key field
* @throws Exception if creating the index fails
*/
public static void createSecondaryBTreeIndex(String dataset, String indexName, String skName) throws Exception {
TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE INDEX " + indexName + " ON " + dataset + "(" + skName + ");",
OUTPUT_FORMAT);
}
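
// A minimal usage sketch (hypothetical names): index the "name" field of dataset "ds".
// TestDataUtil.createSecondaryBTreeIndex("ds", "dsNameIdx", "name");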

/**
* Upserts {@code count} records with ids 0..count-1 into {@code dataset}.
*
* @param dataset the name of the dataset
* @param count the number of records to upsert
* @throws Exception if an upsert fails
*/
public static void upsertData(String dataset, long count) throws Exception {
for (long i = 0; i < count; i++) {
TEST_EXECUTOR.executeSqlppUpdateOrDdl("UPSERT INTO " + dataset + " ({\"id\": " + i + "});", OUTPUT_FORMAT);
}
}

/**
* Inserts {@code count} records, each containing a {@code name} field, using a single INSERT statement.
*
* @param dataset the name of the dataset
* @param count the number of records in the insert statement
* @throws Exception if the insert fails
*/
public static void insertBulkData(String dataset, long count) throws Exception {
StringBuilder stringBuilder = new StringBuilder();
for (long i = 0; i < count; i++) {
stringBuilder.append("{\"name\": \"name_").append(i).append("\"},");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
TEST_EXECUTOR.executeSqlppUpdateOrDdl("INSERT INTO " + dataset + "([" + stringBuilder + "]);", OUTPUT_FORMAT);
}
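
// A minimal usage sketch (hypothetical dataset name): insert ten "name" records in one statement.
// TestDataUtil.insertBulkData("ds", 10);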

/**
* Gets the number of records in dataset {@code datasetName}.
*
* @param datasetName the name of the dataset
* @return the number of records
* @throws Exception if the count query fails
*/
public static long getDatasetCount(String datasetName) throws Exception {
final String query = "SELECT VALUE COUNT(*) FROM `" + datasetName + "`;";
final InputStream responseStream = TEST_EXECUTOR.executeQueryService(query,
TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), OUTPUT_FORMAT, StandardCharsets.UTF_8);
final ObjectNode response = OBJECT_MAPPER.readValue(responseStream, ObjectNode.class);
final JsonNode result = response.get("results");
// make sure there is a single value in result
Assert.assertEquals(1, result.size());
return result.get(0).asLong();
}
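
// A minimal usage sketch (hypothetical dataset name and expected count):
// Assert.assertEquals(10, TestDataUtil.getDatasetCount("ds"));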

/**
* Rebalances a dataset to {@code targetNodes}.
*
* @param integrationUtil the integration util of the cluster
* @param database the database of the dataset
* @param dataverseName the dataverse of the dataset
* @param datasetName the name of the dataset
* @param targetNodes the nodes to rebalance the dataset to
* @throws Exception if the rebalance fails
*/
public static void rebalanceDataset(AsterixHyracksIntegrationUtil integrationUtil, String database,
DataverseName dataverseName, String datasetName, String[] targetNodes) throws Exception {
ICcApplicationContext ccAppCtx =
(ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(ccAppCtx);
try {
// suspend active entity events while the dataset is being rebalanced
ActiveNotificationHandler activeNotificationHandler =
(ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler();
activeNotificationHandler.suspend(metadataProvider);
try {
// take an exclusive modification lock on the dataset before rebalancing it
IMetadataLockManager lockManager = ccAppCtx.getMetadataLockManager();
lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), database,
dataverseName, datasetName);
RebalanceUtil.rebalance(database, dataverseName, datasetName,
new LinkedHashSet<>(Arrays.asList(targetNodes)), metadataProvider, ccAppCtx.getHcc(),
NoOpDatasetRebalanceCallback.INSTANCE, false);
} finally {
activeNotificationHandler.resume(metadataProvider);
}
} finally {
// release all metadata locks acquired through this provider
metadataProvider.getLocks().unlock();
}
}
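
// A minimal usage sketch (hypothetical dataset and node ids; assumes a running test cluster):
// TestDataUtil.rebalanceDataset(integrationUtil, MetadataConstants.DEFAULT_DATABASE,
//         MetadataConstants.DEFAULT_DATAVERSE_NAME, "ds", new String[] { "asterix_nc1", "asterix_nc2" });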

/**
* Gets the dataset {@code datasetName} from the metadata.
*
* @param integrationUtil the integration util of the cluster
* @param datasetName the name of the dataset
* @return the dataset if found, otherwise {@code null}
* @throws AlgebricksException if the metadata lookup fails
* @throws RemoteException if the metadata transaction fails
*/
public static Dataset getDataset(AsterixHyracksIntegrationUtil integrationUtil, String datasetName)
throws AlgebricksException, RemoteException {
final ICcApplicationContext appCtx =
(ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
final MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Dataset dataset;
try {
dataset = metadataProvider.findDataset(MetadataConstants.DEFAULT_DATABASE,
MetadataConstants.DEFAULT_DATAVERSE_NAME, datasetName);
} finally {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
metadataProvider.getLocks().unlock();
}
return dataset;
}

/**
* Gets the file splits of {@code dataset}.
*
* @param integrationUtil the integration util of the cluster
* @param dataset the dataset
* @return the file splits of the dataset
* @throws RemoteException if the metadata transaction fails
* @throws AlgebricksException if resolving the splits fails
*/
public static FileSplit[] getDatasetSplits(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset)
throws RemoteException, AlgebricksException {
final ICcApplicationContext ccAppCtx =
(ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
try {
String dvPath =
new NamespacePathResolver(false).resolve(dataset.getDatabaseName(), dataset.getDataverseName());
return SplitsAndConstraintsUtil.getIndexSplits(dataset, dataset.getDatasetName(), mdTxnCtx,
ccAppCtx.getClusterStateManager(), dvPath);
} finally {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
}
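
/**
* Gets the local path of the primary index of {@code dataset} on node {@code nodeId}.
*
* @param integrationUtil the integration util of the cluster
* @param dataset the dataset
* @param nodeId the id of the node
* @return the path of the index on the node
* @throws Exception if resolving the dataset splits fails
*/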
public static String getIndexPath(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset, String nodeId)
throws Exception {
final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset);
final Optional<FileSplit> nodeFileSplit =
Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst();
Assert.assertTrue(nodeFileSplit.isPresent());
return nodeFileSplit.get().getPath();
}
}