blob: 25d6d3d3b3e7a9cccdfa4b97bffc382c2a00b4fe [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.impala.authorization.User;
import org.apache.impala.common.ByteUnits;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPoolConfig;
import org.apache.impala.thrift.TResolveRequestPoolParams;
import org.apache.impala.thrift.TResolveRequestPoolResult;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
/**
* Unit tests for the user to pool resolution, authorization, and getting configuration
* parameters via {@link RequestPoolService}. Sets a configuration file and ensures the
* appropriate user to pool resolution, authentication, and pool configs are returned.
* This also tests that updating the files after startup causes them to be reloaded and
* the updated values are returned.
* TODO: Move tests to C++ to test the API that's actually used.
*/
public class TestRequestPoolService {
// Pool definitions and includes memory resource limits, copied to a temporary file
private static final String ALLOCATION_FILE = "fair-scheduler-test.xml";
// A second allocation file which overwrites the temporary file to check for changes.
private static final String ALLOCATION_FILE_MODIFIED = "fair-scheduler-test2.xml";
private static final String ALLOCATION_FILE_EMPTY = "fair-scheduler-empty.xml";
private static final String ALLOCATION_FILE_GROUP_RULE = "fair-scheduler-group-rule.xml";
// Contains per-pool configurations for maximum number of running queries and queued
// requests.
private static final String LLAMA_CONFIG_FILE = "llama-site-test.xml";
// A second Llama config which overwrites the temporary file to check for changes.
private static final String LLAMA_CONFIG_FILE_MODIFIED = "llama-site-test2.xml";
private static final String LLAMA_CONFIG_FILE_EMPTY = "llama-site-empty.xml";
// Set the file check interval to something short so we don't have to wait long after
// changing the file.
private static final long CHECK_INTERVAL_MS = 100L;
// Temp folder where the config files are copied so we can modify them in place.
// The JUnit @Rule creates and removes the temp folder between every test.
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
private RequestPoolService poolService_;
private File allocationConfFile_;
private File llamaConfFile_;
/**
* Creates the poolService_ with the specified configuration.
* @param allocationFile The file on the classpath of the allocation conf.
* @param llamaConfFile The file on the classpath of the Llama conf. May be null to
* create a RequestPoolService with no llama-conf.xml as it is
* not required.
*/
private void createPoolService(String allocationFile, String llamaConfFile)
throws Exception {
allocationConfFile_ = tempFolder.newFile("fair-scheduler-temp-file.xml");
Files.copy(getClasspathFile(allocationFile), allocationConfFile_);
String llamaConfPath = null;
if (llamaConfFile != null) {
llamaConfFile_ = tempFolder.newFile("llama-conf-temp-file.xml");
Files.copy(getClasspathFile(llamaConfFile), llamaConfFile_);
llamaConfPath = llamaConfFile_.getAbsolutePath();
}
poolService_ = new RequestPoolService(allocationConfFile_.getAbsolutePath(),
llamaConfPath);
// Lower the wait times on the AllocationFileLoaderService and RequestPoolService so
// the test doesn't have to wait very long to test that file changes are reloaded.
Field f = AllocationFileLoaderService.class.getDeclaredField("reloadIntervalMs");
f.setAccessible(true);
f.set(poolService_.allocLoader_, CHECK_INTERVAL_MS);
if (llamaConfFile != null) {
poolService_.confWatcher_.setCheckIntervalMs(CHECK_INTERVAL_MS);
}
poolService_.start();
// Make sure that the Hadoop configuration from classpath is used for the underlying
// QueuePlacementPolicy.
QueuePlacementPolicy policy = poolService_.getAllocationConfig().getPlacementPolicy();
Configuration conf = policy.getConf();
Assert.assertTrue(conf.getBoolean("impala.core-site.overridden", false));
}
@BeforeClass
public static void setUpClass() throws Exception {
RuntimeEnv.INSTANCE.setTestEnv(true);
User.setRulesForTesting(
new Configuration().get(HADOOP_SECURITY_AUTH_TO_LOCAL, "DEFAULT"));
}
@AfterClass
public static void cleanUpClass() {
RuntimeEnv.INSTANCE.reset();
}
@After
public void cleanUp() throws Exception {
if (poolService_ != null) poolService_.stop();
}
/**
* Returns a {@link File} for the file on the classpath.
*/
private File getClasspathFile(String filename) throws URISyntaxException {
return new File(getClass().getClassLoader().getResource(filename).toURI());
}
@Test
public void testPoolResolution() throws Exception {
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
Assert.assertEquals("root.queueA", poolService_.assignToPool("root.queueA", "userA"));
Assert.assertNull(poolService_.assignToPool("nonexistentQueue", "userA"));
}
@Test
public void testResolvePrincipalName() throws Exception {
// Tests that we can resolve user names that are Kerberos principals/LDAP users.
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
TResolveRequestPoolResult result = poolService_.resolveRequestPool(
new TResolveRequestPoolParams("userA@abc.com", "root.queueA"));
Assert.assertEquals(TErrorCode.OK, result.getStatus().getStatus_code());
Assert.assertEquals("root.queueA", result.getResolved_pool());
result = poolService_.resolveRequestPool(
new TResolveRequestPoolParams("userA/a.qualified.domain@abc.com", "root.queueA"));
Assert.assertEquals(TErrorCode.OK, result.getStatus().getStatus_code());
Assert.assertEquals("root.queueA", result.getResolved_pool());
}
@Test
public void testUserNoGroupsError() throws Exception {
// Test fix for IMPALA-922: "Return helpful errors with Yarn group rules"
createPoolService(ALLOCATION_FILE_GROUP_RULE, LLAMA_CONFIG_FILE);
TResolveRequestPoolResult result = poolService_.resolveRequestPool(
new TResolveRequestPoolParams("userA", "root.NOT_A_POOL"));
Assert.assertEquals(false, result.isSetResolved_pool());
Assert.assertEquals(false, result.isSetHas_access());
Assert.assertEquals(TErrorCode.INTERNAL_ERROR, result.getStatus().getStatus_code());
String expectedMessage = "Failed to resolve user 'userA' to a pool while " +
"evaluating the 'primaryGroup' or 'secondaryGroup' queue placement rules because " +
"no groups were found for the user. This is likely because the user does not " +
"exist on the local operating system.";
Assert.assertEquals(expectedMessage,
Iterables.getOnlyElement(result.getStatus().getError_msgs()));
}
@Test
public void testPoolAcls() throws Exception {
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
Assert.assertTrue(poolService_.hasAccess("root.queueA", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userB"));
Assert.assertFalse(poolService_.hasAccess("root.queueB", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "root"));
}
@Test
public void testPoolLimitConfigs() throws Exception {
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
checkPoolConfigResult("root", 15, 50, -1, 30000L, "mem_limit=1024m");
checkPoolConfigResult("root.queueA", 10, 30, 1024 * ByteUnits.MEGABYTE,
10000L, "mem_limit=1024m,query_timeout_s=10");
checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L,
"mem_limit=1024m", 1000, 10, false, 0.0, 0.0, 0);
checkPoolConfigResult("root.queueD", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L,
"mem_limit=1024m", 0, 0, true, 0.5, 2.5, 25);
}
@Test
public void testDefaultConfigs() throws Exception {
createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY);
Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.userA", "userA"));
checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0.0, 0.0, 0);
}
@Ignore("IMPALA-4868") @Test
public void testUpdatingConfigs() throws Exception {
// Tests updating the config files and then checking the pool resolution, ACLs, and
// pool limit configs. This tests all three together rather than separating into
// separate test cases because we updateConfigFiles() will end up waiting around 7
// seconds, so this helps cut down on the total test execution time.
// A one second pause is necessary to ensure the file timestamps are unique if the
// test gets here within one second.
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
Thread.sleep(1000L);
Files.copy(getClasspathFile(ALLOCATION_FILE_MODIFIED), allocationConfFile_);
Files.copy(getClasspathFile(LLAMA_CONFIG_FILE_MODIFIED), llamaConfFile_);
// Need to wait for the YARN AllocationFileLoaderService (for the
// allocationConfFile_) as well as the FileWatchService (for the llamaConfFile_). If
// the system is busy this may take even longer, so we need to try a few times.
Thread.sleep(CHECK_INTERVAL_MS + AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS);
int numAttempts = 20;
while (true) {
try {
checkModifiedConfigResults();
break;
} catch (AssertionError e) {
if (numAttempts == 0) throw e;
--numAttempts;
Thread.sleep(1000L);
}
}
}
@Test
public void testModifiedConfigs() throws Exception {
// Tests the results are the same as testUpdatingConfigs() as when we create the
// pool service with the same modified configs initially (i.e. not updating).
createPoolService(ALLOCATION_FILE_MODIFIED, LLAMA_CONFIG_FILE_MODIFIED);
checkModifiedConfigResults();
}
@Test
public void testNullLlamaSite() throws Exception {
createPoolService(ALLOCATION_FILE_MODIFIED, null);
// Test pool resolution
Assert.assertEquals("root.queueA", poolService_.assignToPool("queueA", "userA"));
Assert.assertNull(poolService_.assignToPool("queueX", "userA"));
Assert.assertEquals("root.queueC", poolService_.assignToPool("queueC", "userA"));
// Test pool ACLs
Assert.assertTrue(poolService_.hasAccess("root.queueA", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userB"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userA"));
Assert.assertFalse(poolService_.hasAccess("root.queueC", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueC", "root"));
// Test pool limits
checkPoolConfigResult("root", -1, 200, -1);
checkPoolConfigResult("root.queueA", -1, 200, 100000 * ByteUnits.MEGABYTE);
checkPoolConfigResult("root.queueB", -1, 200, -1);
checkPoolConfigResult("root.queueC", -1, 200, 128 * ByteUnits.MEGABYTE);
}
private void checkModifiedConfigResults()
throws InternalException, IOException {
// Test pool resolution: now there's a queueC
Assert.assertEquals("root.queueA", poolService_.assignToPool("queueA", "userA"));
Assert.assertNull(poolService_.assignToPool("queueX", "userA"));
Assert.assertEquals("root.queueC", poolService_.assignToPool("queueC", "userA"));
// Test pool ACL changes
Assert.assertTrue(poolService_.hasAccess("root.queueA", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userB"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userA"));
Assert.assertFalse(poolService_.hasAccess("root.queueC", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueC", "root"));
// Test pool limit changes
checkPoolConfigResult("root", 15, 100, -1, 30000L, "");
// not_a_valid_option=foo.bar gets filtered out when parsing the query options on
// the backend, but it should be observed coming from the test file here.
checkPoolConfigResult("root.queueA", 1, 30, 100000 * ByteUnits.MEGABYTE,
50L, "mem_limit=128m,query_timeout_s=5,not_a_valid_option=foo.bar");
checkPoolConfigResult("root.queueB", 5, 10, -1, 600000L, "");
checkPoolConfigResult("root.queueC", 10, 30, 128 * ByteUnits.MEGABYTE,
30000L, "mem_limit=2048m,query_timeout_s=60");
}
/**
* Helper method to verify the per-pool limits.
*/
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
String expectedQueryOptions, long max_query_mem_limit, long min_query_mem_limit,
boolean clamp_mem_limit_query_option, double max_running_queries_multiple,
double max_queued_queries_multiple, long max_memory_multiple) {
TPoolConfig expectedResult = new TPoolConfig();
expectedResult.setMax_requests(expectedMaxRequests);
expectedResult.setMax_queued(expectedMaxQueued);
expectedResult.setMax_mem_resources(expectedMaxMem);
expectedResult.setMax_query_mem_limit(max_query_mem_limit);
expectedResult.setMin_query_mem_limit(min_query_mem_limit);
expectedResult.setClamp_mem_limit_query_option(clamp_mem_limit_query_option);
expectedResult.setMax_running_queries_multiple(max_running_queries_multiple);
expectedResult.setMax_queued_queries_multiple(max_queued_queries_multiple);
expectedResult.setMax_memory_multiple(max_memory_multiple);
if (expectedQueueTimeoutMs != null) {
expectedResult.setQueue_timeout_ms(expectedQueueTimeoutMs);
}
if (expectedQueryOptions != null) {
expectedResult.setDefault_query_options(expectedQueryOptions);
}
Assert.assertEquals("Unexpected config values for pool " + pool,
expectedResult, poolService_.getPoolConfig(pool));
}
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
String expectedQueryOptions) {
checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued, expectedMaxMem,
expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true, 0.0, 0.0, 0);
}
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
long expectedMaxQueued, long expectedMaxMemUsage) {
checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
expectedMaxMemUsage, null, "");
}
}