| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.verify; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.SecretManager; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.tez.client.TezApiVersionInfo; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.security.JobTokenIdentifier; |
| import org.apache.tez.common.security.JobTokenSecretManager; |
| import org.apache.tez.common.security.TokenCache; |
| import org.apache.tez.dag.api.DagTypeConverters; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezConstants; |
| import org.apache.tez.dag.api.records.DAGProtos; |
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; |
| import org.apache.tez.dag.app.dag.impl.DAGImpl; |
| import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestDAGAppMaster { |
| |
| private static final File TEST_DIR = new File( |
| System.getProperty("test.build.data", |
| System.getProperty("java.io.tmpdir")), |
| TestDAGAppMaster.class.getSimpleName()).getAbsoluteFile(); |
| |
| @Before |
| public void setup() { |
| FileUtil.fullyDelete(TEST_DIR); |
| TEST_DIR.mkdir(); |
| } |
| |
| @After |
| public void teardown() { |
| FileUtil.fullyDelete(TEST_DIR); |
| } |
| |
| @Test(timeout = 20000) |
| public void testInvalidSession() throws Exception { |
| // AM should fail if not the first attempt and in session mode and |
| // DAG recovery is disabled, otherwise the app can succeed without |
| // finishing an in-progress DAG. |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); |
| DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3); |
| TezConfiguration conf = new TezConfiguration(false); |
| conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); |
| dam.init(conf); |
| dam.start(); |
| verify(dam.mockScheduler).setShouldUnregisterFlag(); |
| verify(dam.mockShutdown).shutdown(); |
| List<String> diags = dam.getDiagnostics(); |
| boolean found = false; |
| for (String diag : diags) { |
| if (diag.contains(DAGAppMaster.INVALID_SESSION_ERR_MSG)) { |
| found = true; |
| break; |
| } |
| } |
| assertTrue("Missing invalid session diagnostics", found); |
| dam.stop(); |
| } |
| |
| @Test |
| public void testDagCredentialsWithoutMerge() throws Exception { |
| testDagCredentials(false); |
| } |
| |
| @Test |
| public void testDagCredentialsWithMerge() throws Exception { |
| testDagCredentials(true); |
| } |
| |
| private void testDagCredentials(boolean doMerge) throws IOException { |
| TezConfiguration conf = new TezConfiguration(); |
| conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, doMerge); |
| conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); |
| conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString()); |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); |
| |
| // create some sample AM credentials |
| Credentials amCreds = new Credentials(); |
| JobTokenSecretManager jtsm = new JobTokenSecretManager(); |
| JobTokenIdentifier identifier = new JobTokenIdentifier( |
| new Text(appId.toString())); |
| Token<JobTokenIdentifier> sessionToken = |
| new Token<JobTokenIdentifier>(identifier, jtsm); |
| sessionToken.setService(identifier.getJobId()); |
| TokenCache.setSessionToken(sessionToken, amCreds); |
| TestTokenSecretManager ttsm = new TestTokenSecretManager(); |
| Text tokenAlias1 = new Text("alias1"); |
| Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>( |
| new TestTokenIdentifier(new Text("amtoken1")), ttsm); |
| amCreds.addToken(tokenAlias1, amToken1); |
| Text tokenAlias2 = new Text("alias2"); |
| Token<TestTokenIdentifier> amToken2 = new Token<TestTokenIdentifier>( |
| new TestTokenIdentifier(new Text("amtoken2")), ttsm); |
| amCreds.addToken(tokenAlias2, amToken2); |
| |
| FileSystem fs = FileSystem.getLocal(conf); |
| FSDataOutputStream sessionJarsPBOutStream = |
| TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(), |
| TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); |
| DAGProtos.PlanLocalResourcesProto.getDefaultInstance() |
| .writeDelimitedTo(sessionJarsPBOutStream); |
| sessionJarsPBOutStream.close(); |
| DAGAppMaster am = new DAGAppMaster(attemptId, |
| ContainerId.newInstance(attemptId, 1), |
| "127.0.0.1", 0, 0, new SystemClock(), 1, true, |
| TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, |
| new String[] {TEST_DIR.toString()}, |
| new TezApiVersionInfo().getVersion(), 1, amCreds, |
| "someuser"); |
| am.init(conf); |
| am.start(); |
| |
| // create some sample DAG credentials |
| Credentials dagCreds = new Credentials(); |
| Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>( |
| new TestTokenIdentifier(new Text("dagtoken1")), ttsm); |
| dagCreds.addToken(tokenAlias2, dagToken1); |
| Text tokenAlias3 = new Text("alias3"); |
| Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>( |
| new TestTokenIdentifier(new Text("dagtoken2")), ttsm); |
| dagCreds.addToken(tokenAlias3, dagToken2); |
| |
| TezDAGID dagId = TezDAGID.getInstance(appId, 1); |
| DAGPlan dagPlan = DAGPlan.newBuilder() |
| .setName("somedag") |
| .setCredentialsBinary( |
| DagTypeConverters.convertCredentialsToProto(dagCreds)) |
| .build(); |
| DAGImpl dag = am.createDAG(dagPlan, dagId); |
| Credentials fetchedDagCreds = dag.getCredentials(); |
| am.stop(); |
| |
| Token<? extends TokenIdentifier> fetchedToken1 = |
| fetchedDagCreds.getToken(tokenAlias1); |
| if (doMerge) { |
| assertNotNull("AM creds missing from DAG creds", fetchedToken1); |
| compareTestTokens(amToken1, fetchedDagCreds.getToken(tokenAlias1)); |
| } else { |
| assertNull("AM creds leaked to DAG creds", fetchedToken1); |
| } |
| compareTestTokens(dagToken1, fetchedDagCreds.getToken(tokenAlias2)); |
| compareTestTokens(dagToken2, fetchedDagCreds.getToken(tokenAlias3)); |
| } |
| |
| private static void compareTestTokens( |
| Token<? extends TokenIdentifier> expected, |
| Token<? extends TokenIdentifier> actual) throws IOException { |
| TestTokenIdentifier expectedId = getTestTokenIdentifier(expected); |
| TestTokenIdentifier actualId = getTestTokenIdentifier(actual); |
| assertEquals("Token id not preserved", expectedId.getTestId(), |
| actualId.getTestId()); |
| } |
| |
| private static TestTokenIdentifier getTestTokenIdentifier( |
| Token<? extends TokenIdentifier> token) throws IOException { |
| ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); |
| DataInputStream in = new DataInputStream(buf); |
| TestTokenIdentifier tokenId = new TestTokenIdentifier(); |
| tokenId.readFields(in); |
| in.close(); |
| return tokenId; |
| } |
| |
| private static class TestTokenIdentifier extends TokenIdentifier { |
| private static Text KIND_NAME = new Text("test-token"); |
| |
| private Text testId; |
| |
| public TestTokenIdentifier() { |
| this(new Text()); |
| } |
| |
| public TestTokenIdentifier(Text id) { |
| testId = id; |
| } |
| |
| @Override |
| public void readFields(DataInput in) throws IOException { |
| testId.readFields(in); |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| testId.write(out); |
| } |
| |
| @Override |
| public Text getKind() { |
| return KIND_NAME; |
| } |
| |
| @Override |
| public UserGroupInformation getUser() { |
| return UserGroupInformation.createRemoteUser("token-user"); |
| } |
| |
| public Text getTestId() { |
| return testId; |
| } |
| } |
| |
| private static class TestTokenSecretManager extends |
| SecretManager<TestTokenIdentifier> { |
| @Override |
| public byte[] createPassword(TestTokenIdentifier id) { |
| return id.getBytes(); |
| } |
| |
| @Override |
| public byte[] retrievePassword(TestTokenIdentifier id) |
| throws InvalidToken { |
| return id.getBytes(); |
| } |
| |
| @Override |
| public TestTokenIdentifier createIdentifier() { |
| return new TestTokenIdentifier(); |
| } |
| } |
| |
| private static class DAGAppMasterForTest extends DAGAppMaster { |
| private DAGAppMasterShutdownHandler mockShutdown; |
| private TaskSchedulerEventHandler mockScheduler = mock(TaskSchedulerEventHandler.class); |
| |
| public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) { |
| super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, |
| new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(), |
| new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() }, |
| new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname"); |
| } |
| |
| private static Credentials createCredentials() { |
| Credentials creds = new Credentials(); |
| JobTokenSecretManager jtsm = new JobTokenSecretManager(); |
| JobTokenIdentifier jtid = new JobTokenIdentifier(new Text()); |
| Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm); |
| TokenCache.setSessionToken(token, creds); |
| return creds; |
| } |
| |
| private static void stubSessionResources() throws IOException { |
| FileOutputStream out = new FileOutputStream( |
| new File(TEST_DIR, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); |
| PlanLocalResourcesProto planProto = PlanLocalResourcesProto.getDefaultInstance(); |
| planProto.writeDelimitedTo(out); |
| out.close(); |
| } |
| |
| @Override |
| public synchronized void serviceInit(Configuration conf) throws Exception { |
| stubSessionResources(); |
| conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false); |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected DAGAppMasterShutdownHandler createShutdownHandler() { |
| mockShutdown = mock(DAGAppMasterShutdownHandler.class); |
| return mockShutdown; |
| } |
| |
| @Override |
| protected TaskSchedulerEventHandler createTaskSchedulerManager() { |
| return mockScheduler; |
| } |
| } |
| } |