// blob: ed61efb03a3826314cc4a9cb9bffff646a4f9d53 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.client;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Properties;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.ConfigUtils;
/**
 * Unit tests for StramClientUtils: expression and variable evaluation on properties and
 * configurations, resource manager web address lookup, and resolution of the Apex DFS root
 * directory.
 *
 * NOTE(review): the root directory tests replace the static Hadoop login user through
 * UserGroupInformation.setLoginUser and never restore it - confirm that no other test running
 * in the same JVM relies on the real login user.
 */
public class StramClientUtilsTest
{
  /**
   * Script expressions enclosed in {% %} inside property values are evaluated against the
   * configuration, and the configuration values themselves stay untouched.
   */
  @Test
  public void testEvalExpression() throws Exception
  {
    Configuration conf = new Configuration();
    conf.set("a.b.c", "123");
    conf.set("d.e.f", "456");
    conf.set("x.y.z", "foobar");

    Properties prop = new Properties();
    prop.put("product.result", "Product result is {% (_prop[\"a.b.c\"] * _prop[\"d.e.f\"]).toFixed(0) %}...");
    prop.put("concat.result", "Concat result is {% _prop[\"x.y.z\"] %} ... {% _prop[\"a.b.c\"] %} blah");

    StramClientUtils.evalProperties(prop, conf);

    Assert.assertEquals("Product result is " + (123 * 456) + "...", prop.get("product.result"));
    Assert.assertEquals("Concat result is foobar ... 123 blah", prop.get("concat.result"));
    // the source configuration must not be modified by the evaluation
    Assert.assertEquals("123", conf.get("a.b.c"));
    Assert.assertEquals("456", conf.get("d.e.f"));
    Assert.assertEquals("foobar", conf.get("x.y.z"));
  }

  /**
   * ${name} references inside property values are replaced with the configuration values.
   */
  @Test
  public void testVariableSubstitution() throws Exception
  {
    Configuration conf = new Configuration();
    conf.set("a.b.c", "123");
    conf.set("x.y.z", "foobar");

    Properties prop = new Properties();
    prop.put("var.result", "1111 ${a.b.c} xxx ${x.y.z} yyy");

    StramClientUtils.evalProperties(prop, conf);

    Assert.assertEquals("1111 123 xxx foobar yyy", prop.get("var.result"));
  }

  /**
   * Both ${name} references and {% %} script expressions are evaluated in place when a
   * configuration is evaluated against itself.
   */
  @Test
  public void testEvalConfiguration() throws Exception
  {
    Configuration conf = new Configuration();
    conf.set("a.b.c", "123");
    conf.set("x.y.z", "foobar");
    conf.set("sub.result", "1111 ${a.b.c} xxx ${x.y.z} yyy");
    conf.set("script.result", "1111 {% (_prop[\"a.b.c\"] * _prop[\"a.b.c\"]).toFixed(0) %} xxx");

    StramClientUtils.evalConfiguration(conf);

    Assert.assertEquals("1111 123 xxx foobar yyy", conf.get("sub.result"));
    Assert.assertEquals("1111 15129 xxx", conf.get("script.result"));
  }

  /**
   * Returns the host name the tests expect in a connect string for the given host.
   *
   * @param host host name or IP address literal
   * @return the canonical host name for a wildcard or loopback address, otherwise the plain host name
   * @throws UnknownHostException if the host cannot be resolved
   */
  private String getHostString(String host) throws UnknownHostException
  {
    InetAddress address = InetAddress.getByName(host);
    boolean local = address.isAnyLocalAddress() || address.isLoopbackAddress();
    return local ? address.getCanonicalHostName() : address.getHostName();
  }

  /**
   * Looks up the resource manager web address for the given RM id and renders it as a connect string.
   *
   * @param conf configuration holding the web address settings
   * @param rmId resource manager id, or null when HA is not used
   * @return the connect string produced by StramClientUtils
   * @throws UnknownHostException declared so callers keep their existing throws clause
   */
  private static String rmWebConnectString(Configuration conf, String rmId) throws UnknownHostException
  {
    return StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, rmId));
  }

  /**
   * Resource manager web address lookup: plain HTTP, HTTPS, unknown host, loopback and HA with
   * two resource managers.
   *
   * @throws UnknownHostException if an expected host name cannot be resolved
   */
  @Test
  public void testRMWebAddress() throws UnknownHostException
  {
    Configuration conf = new YarnConfiguration(new Configuration(false))
    {
      @Override
      public InetSocketAddress getSocketAddr(String name, String defaultAddress, int defaultPort)
      {
        // when an RM id is present in the configuration, look up the id-suffixed key instead
        String rmId = get(ConfigUtils.RM_HA_ID);
        String key = (rmId == null) ? name : name + "." + rmId;
        return super.getSocketAddr(key, defaultAddress, defaultPort);
      }
    };

    final String firstRm = getHostString("192.168.1.1") + ":8032";
    final String secondRm = getHostString("192.168.1.2") + ":8032";

    // basic test
    conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, false);
    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "192.168.1.1:8032");
    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "192.168.1.2:8032");
    Assert.assertEquals(firstRm, rmWebConnectString(conf, null));
    List<InetSocketAddress> addresses = StramClientUtils.getRMAddresses(conf);
    Assert.assertEquals(1, addresses.size());
    Assert.assertEquals(firstRm, StramClientUtils.getSocketConnectString(addresses.get(0)));

    // with SSL enabled the HTTPS address is expected instead
    conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, true);
    Assert.assertEquals(secondRm, rmWebConnectString(conf, null));
    addresses = StramClientUtils.getRMAddresses(conf);
    Assert.assertEquals(1, addresses.size());
    Assert.assertEquals(secondRm, StramClientUtils.getSocketConnectString(addresses.get(0)));

    final String localHost = InetAddress.getLocalHost().getCanonicalHostName() + ":8032";

    // set localhost if host is unknown
    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "someunknownhost.:8032");
    Assert.assertEquals(localHost, rmWebConnectString(conf, null));

    // set localhost
    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "127.0.0.1:8032");
    Assert.assertEquals(localHost, rmWebConnectString(conf, null));

    // test when HA is enabled
    conf.setBoolean(ConfigUtils.RM_HA_ENABLED, true);
    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm1", "192.168.1.1:8032");
    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm2", "192.168.1.2:8032");
    conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
    Assert.assertEquals(firstRm, rmWebConnectString(conf, "rm1"));
    Assert.assertEquals(secondRm, rmWebConnectString(conf, "rm2"));
    addresses = StramClientUtils.getRMAddresses(conf);
    Assert.assertEquals(2, addresses.size());
    Assert.assertEquals(firstRm, StramClientUtils.getSocketConnectString(addresses.get(0)));
    Assert.assertEquals(secondRm, StramClientUtils.getSocketConnectString(addresses.get(1)));
  }

  /**
   * Builds the configuration shared by the root directory tests.
   *
   * @param apexDfsRootDir value for StramClientUtils.APEX_APP_DFS_ROOT_DIR, or null to leave it unset
   * @param pathImpersonated value for StramUserLogin.DT_APP_PATH_IMPERSONATED
   * @return a YarnConfiguration created without default resources
   */
  private static Configuration newRootDirConf(String apexDfsRootDir, boolean pathImpersonated)
  {
    Configuration conf = new YarnConfiguration(new Configuration(false));
    if (apexDfsRootDir != null) {
      conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, apexDfsRootDir);
    }
    conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, pathImpersonated);
    return conf;
  }

  /**
   * Installs the testing user "testUser1" as the Hadoop login user.
   */
  private static void loginAsTestUser()
  {
    UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""});
    UserGroupInformation.setLoginUser(testUser);
  }

  /**
   * Resolves the Apex DFS root directory while running as the given testing user.
   *
   * @param userName short name of the user the lookup runs as
   * @param fs file system handed to StramClientUtils
   * @param conf configuration handed to StramClientUtils
   * @return the resolved root directory
   * @throws IOException propagated from the doAs call
   * @throws InterruptedException propagated from the doAs call
   */
  private static Path resolveRootDirAs(String userName, final FileSystem fs, final Configuration conf)
      throws IOException, InterruptedException
  {
    UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting(userName, new String[]{""});
    return doAsUser.doAs(new PrivilegedExceptionAction<Path>()
    {
      @Override
      public Path run() throws Exception
      {
        return StramClientUtils.getApexDFSRootDir(fs, conf);
      }
    });
  }

  /**
   * Expected root directory when neither root directory setting takes effect: the home
   * directory of the file system followed by "/datatorrent".
   */
  private static String defaultRootDir(FileSystem fs)
  {
    return fs.getHomeDirectory() + "/datatorrent";
  }

  /**
   * apex.dfsRootDirectory not set, StramClientUtils.DT_DFS_ROOT_DIR set to /a/b/c and
   * impersonation disabled: the legacy setting is expected to be returned, qualified as file:/a/b/c.
   *
   * @throws IOException if the file system cannot be created or closed
   */
  @Test
  public void getApexDFSRootDirLegacy() throws IOException
  {
    Configuration conf = newRootDirConf(null, false);
    conf.set(StramClientUtils.DT_DFS_ROOT_DIR, "/a/b/c");
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
      Assert.assertEquals("file:/a/b/c", path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to the absolute path /x/y/z with impersonation disabled: the
   * setting is expected to be ignored and the home directory based default returned.
   *
   * @throws IOException if the file system cannot be created or closed
   */
  @Test
  public void getApexDFSRootDirAbsPath() throws IOException
  {
    Configuration conf = newRootDirConf("/x/y/z", false);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
      Assert.assertEquals(defaultRootDir(fs), path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to an absolute path with scheme (file:/p/q/r) with impersonation
   * disabled: the setting is expected to be ignored and the home directory based default returned.
   *
   * @throws IOException if the file system cannot be created or closed
   */
  @Test
  public void getApexDFSRootDirScheme() throws IOException
  {
    Configuration conf = newRootDirConf("file:/p/q/r", false);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = StramClientUtils.getApexDFSRootDir(fs, conf);
      Assert.assertEquals(defaultRootDir(fs), path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to an absolute path containing %USER_NAME%, impersonation
   * disabled, lookup running as user "impersonated": the setting is expected to be ignored and
   * the home directory based default returned.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirWithVar() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf("/x/%USER_NAME%/z", false);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("impersonated", fs, conf);
      Assert.assertEquals(defaultRootDir(fs), path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to an absolute path with scheme and %USER_NAME%
   * (file:/x/%USER_NAME%/z), impersonation enabled, lookup running as user "impersonated":
   * the variable is expected to be replaced with the doAs user name.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirWithSchemeAndVar() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf("file:/x/%USER_NAME%/z", true);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("impersonated", fs, conf);
      Assert.assertEquals("file:/x/impersonated/z", path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to the relative path "apex", impersonation disabled, lookup
   * running as user "impersonated": the setting is expected to be ignored and the home
   * directory based default returned.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirRelPath() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf("apex", false);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("impersonated", fs, conf);
      Assert.assertEquals(defaultRootDir(fs), path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to an absolute path with %USER_NAME% (/x/%USER_NAME%/z),
   * impersonation enabled, lookup running as user "impersonated": the variable is expected to
   * be replaced with the doAs user name and the path qualified with the file scheme.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirAbsPathAndVar() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf("/x/%USER_NAME%/z", true);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("impersonated", fs, conf);
      Assert.assertEquals("file:/x/impersonated/z", path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to the relative path "apex", impersonation enabled, lookup
   * running as user "testUser2": the path is expected under /user/testUser2.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirRelPathAndImpersonation() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf("apex", true);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("testUser2", fs, conf);
      Assert.assertEquals("file:/user/testUser2/apex", path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory not set, impersonation enabled, lookup running as user "testUser2":
   * the directory "datatorrent" under /user/testUser2 is expected.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirBlankPathAndImpersonation() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf(null, true);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("testUser2", fs, conf);
      Assert.assertEquals("file:/user/testUser2/datatorrent", path.toString());
    } finally {
      fs.close();
    }
  }

  /**
   * apex.dfsRootDirectory set to a relative path with %USER_NAME% (apex/%USER_NAME%/xyz),
   * impersonation enabled, lookup running as user "testUser2": the doAs user name is expected
   * to appear twice, once in the /user/testUser2 prefix and once for the variable.
   *
   * @throws IOException if the file system cannot be created or closed
   * @throws InterruptedException if the doAs call is interrupted
   */
  @Test
  public void getApexDFSRootDirRelPathVarAndImpersonation() throws IOException, InterruptedException
  {
    Configuration conf = newRootDirConf("apex/%USER_NAME%/xyz", true);
    FileSystem fs = FileSystem.newInstance(conf);
    try {
      loginAsTestUser();
      Path path = resolveRootDirAs("testUser2", fs, conf);
      Assert.assertEquals("file:/user/testUser2/apex/testUser2/xyz", path.toString());
    } finally {
      fs.close();
    }
  }
}