blob: b1e490de9d08050cf17a56560d7db1ff06953a2d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
/**
* Tests {@link TimelineV2ClientImpl}.
*/
public class TestTimelineClientV2Impl {
private static final Log LOG =
LogFactory.getLog(TestTimelineClientV2Impl.class);
private TestV2TimelineClient client;
private static final long TIME_TO_SLEEP = 150L;
private static final String EXCEPTION_MSG = "Exception in the content";
@Before
public void setup() {
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
if (!currTestName.getMethodName()
.contains("testRetryOnConnectionFailure")) {
client = createTimelineClient(conf);
}
}
@Rule
public TestName currTestName = new TestName();
private YarnConfiguration conf;
private TestV2TimelineClient createTimelineClient(YarnConfiguration config) {
ApplicationId id = ApplicationId.newInstance(0, 0);
TestV2TimelineClient tc = new TestV2TimelineClient(id);
tc.init(config);
tc.start();
return tc;
}
private class TestV2TimelineClientForExceptionHandling
extends TimelineV2ClientImpl {
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
super(id);
}
private boolean throwYarnException;
public void setThrowYarnException(boolean throwYarnException) {
this.throwYarnException = throwYarnException;
}
public boolean isThrowYarnException() {
return throwYarnException;
}
@Override
protected void putObjects(URI base, String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (throwYarnException) {
throw new YarnException(EXCEPTION_MSG);
} else {
throw new IOException(
"Failed to get the response from the timeline server.");
}
}
}
private class TestV2TimelineClient
extends TestV2TimelineClientForExceptionHandling {
private boolean sleepBeforeReturn;
private List<TimelineEntities> publishedEntities;
public TimelineEntities getPublishedEntities(int putIndex) {
Assert.assertTrue("Not So many entities Published",
putIndex < publishedEntities.size());
return publishedEntities.get(putIndex);
}
public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
this.sleepBeforeReturn = sleepBeforeReturn;
}
public int getNumOfTimelineEntitiesPublished() {
return publishedEntities.size();
}
public TestV2TimelineClient(ApplicationId id) {
super(id);
publishedEntities = new ArrayList<TimelineEntities>();
}
protected void putObjects(String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (isThrowYarnException()) {
throw new YarnException("ActualException");
}
publishedEntities.add((TimelineEntities) obj);
if (sleepBeforeReturn) {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
@Test
public void testExceptionMultipleRetry() {
TestV2TimelineClientForExceptionHandling c =
new TestV2TimelineClientForExceptionHandling(
ApplicationId.newInstance(0, 0));
int maxRetries = 2;
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
maxRetries);
c.init(conf);
c.start();
c.setTimelineCollectorInfo(CollectorInfo.newInstance("localhost:12345"));
try {
c.putEntities(new TimelineEntity());
} catch (IOException e) {
Assert.fail("YARN exception is expected");
} catch (YarnException e) {
Throwable cause = e.getCause();
Assert.assertTrue("IOException is expected",
cause instanceof IOException);
Assert.assertTrue("YARN exception is expected",
cause.getMessage().contains(
"TimelineClient has reached to max retry times : " + maxRetries));
}
c.setThrowYarnException(true);
try {
c.putEntities(new TimelineEntity());
} catch (IOException e) {
Assert.fail("YARN exception is expected");
} catch (YarnException e) {
Throwable cause = e.getCause();
Assert.assertTrue("YARN exception is expected",
cause instanceof YarnException);
Assert.assertTrue("YARN exception is expected",
cause.getMessage().contains(EXCEPTION_MSG));
}
c.stop();
}
@Test
public void testPostEntities() throws Exception {
try {
client.putEntities(generateEntity("1"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testASyncCallMerge() throws Exception {
client.setSleepBeforeReturn(true);
try {
client.putEntitiesAsync(generateEntity("1"));
Thread.sleep(TIME_TO_SLEEP / 2);
// by the time first put response comes push 2 entities in the queue
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
for (int i = 0; i < 4; i++) {
if (client.getNumOfTimelineEntitiesPublished() == 2) {
break;
}
Thread.sleep(TIME_TO_SLEEP);
}
Assert.assertEquals("two merged TimelineEntities needs to be published", 2,
client.getNumOfTimelineEntitiesPublished());
TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
Assert.assertEquals(
"Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2,
secondPublishedEntities.getEntities().size());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
secondPublishedEntities.getEntities().get(0).getId());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
secondPublishedEntities.getEntities().get(1).getId());
}
@Test
public void testSyncCall() throws Exception {
try {
// sync entity should not be be merged with Async
client.putEntities(generateEntity("1"));
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
// except for the sync call above 2 should be merged
client.putEntities(generateEntity("4"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
for (int i = 0; i < 4; i++) {
if (client.getNumOfTimelineEntitiesPublished() == 3) {
break;
}
Thread.sleep(TIME_TO_SLEEP);
}
printReceivedEntities();
Assert.assertEquals("TimelineEntities not published as desired", 3,
client.getNumOfTimelineEntitiesPublished());
TimelineEntities firstPublishedEntities = client.getPublishedEntities(0);
Assert.assertEquals("sync entities should not be merged with async", 1,
firstPublishedEntities.getEntities().size());
// test before pushing the sync entities asyncs are merged and pushed
TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
Assert.assertEquals(
"async entities should be merged before publishing sync", 2,
secondPublishedEntities.getEntities().size());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
secondPublishedEntities.getEntities().get(0).getId());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
secondPublishedEntities.getEntities().get(1).getId());
// test the last entity published is sync put
TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2);
Assert.assertEquals("sync entities had to be published at the last", 1,
thirdPublishedEntities.getEntities().size());
Assert.assertEquals("Expected last sync Event is not proper", "4",
thirdPublishedEntities.getEntities().get(0).getId());
}
@Test
public void testExceptionCalls() throws Exception {
client.setThrowYarnException(true);
try {
client.putEntitiesAsync(generateEntity("1"));
} catch (YarnException e) {
Assert.fail("Async calls are not expected to throw exception");
}
try {
client.putEntities(generateEntity("2"));
Assert.fail("Sync calls are expected to throw exception");
} catch (YarnException e) {
Assert.assertEquals("Same exception needs to be thrown",
"ActualException", e.getCause().getMessage());
}
}
@Test
public void testConfigurableNumberOfMerges() throws Exception {
client.setSleepBeforeReturn(true);
try {
// At max 3 entities need to be merged
client.putEntitiesAsync(generateEntity("1"));
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
client.putEntitiesAsync(generateEntity("4"));
client.putEntities(generateEntity("5"));
client.putEntitiesAsync(generateEntity("6"));
client.putEntitiesAsync(generateEntity("7"));
client.putEntitiesAsync(generateEntity("8"));
client.putEntitiesAsync(generateEntity("9"));
client.putEntitiesAsync(generateEntity("10"));
} catch (YarnException e) {
Assert.fail("No exception expected");
}
// not having the same logic here as it doesn't depend on how many times
// events are published.
Thread.sleep(2 * TIME_TO_SLEEP);
printReceivedEntities();
for (TimelineEntities publishedEntities : client.publishedEntities) {
Assert.assertTrue(
"Number of entities should not be greater than 3 for each publish,"
+ " but was " + publishedEntities.getEntities().size(),
publishedEntities.getEntities().size() <= 3);
}
}
@Test
public void testSetTimelineToken() throws Exception {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
assertEquals(0, ugi.getTokens().size());
assertNull("Timeline token in v2 client should not be set",
client.currentTimelineToken);
Token token = Token.newInstance(
new byte[0], "kind", new byte[0], "service");
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
assertNull("Timeline token in v2 client should not be set as token kind " +
"is unexepcted.", client.currentTimelineToken);
assertEquals(0, ugi.getTokens().size());
token = Token.newInstance(new byte[0], TimelineDelegationTokenIdentifier.
KIND_NAME.toString(), new byte[0], null);
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
assertNull("Timeline token in v2 client should not be set as serice is " +
"not set.", client.currentTimelineToken);
assertEquals(0, ugi.getTokens().size());
TimelineDelegationTokenIdentifier ident =
new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
new Text("renewer"), null);
ident.setSequenceNumber(1);
token = Token.newInstance(ident.getBytes(),
TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
"localhost:1234");
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
assertEquals(1, ugi.getTokens().size());
assertNotNull("Timeline token should be set in v2 client.",
client.currentTimelineToken);
assertEquals(token, client.currentTimelineToken);
ident.setSequenceNumber(20);
Token newToken = Token.newInstance(ident.getBytes(),
TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
"localhost:1234");
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, newToken));
assertEquals(1, ugi.getTokens().size());
assertNotEquals(token, client.currentTimelineToken);
assertEquals(newToken, client.currentTimelineToken);
}
@Test
public void testAfterStop() throws Exception {
client.setSleepBeforeReturn(true);
try {
// At max 3 entities need to be merged
client.putEntities(generateEntity("1"));
for (int i = 2; i < 20; i++) {
client.putEntitiesAsync(generateEntity("" + i));
}
client.stop();
try {
client.putEntitiesAsync(generateEntity("50"));
Assert.fail("Exception expected");
} catch (YarnException e) {
// expected
}
} catch (YarnException e) {
Assert.fail("No exception expected");
}
// not having the same logic here as it doesn't depend on how many times
// events are published.
for (int i = 0; i < 5; i++) {
TimelineEntities publishedEntities =
client.publishedEntities.get(client.publishedEntities.size() - 1);
TimelineEntity timelineEntity = publishedEntities.getEntities()
.get(publishedEntities.getEntities().size() - 1);
if (!timelineEntity.getId().equals("19")) {
Thread.sleep(2 * TIME_TO_SLEEP);
}
}
printReceivedEntities();
TimelineEntities publishedEntities =
client.publishedEntities.get(client.publishedEntities.size() - 1);
TimelineEntity timelineEntity = publishedEntities.getEntities()
.get(publishedEntities.getEntities().size() - 1);
Assert.assertEquals("", "19", timelineEntity.getId());
}
private void printReceivedEntities() {
for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
TimelineEntities publishedEntities = client.getPublishedEntities(i);
StringBuilder entitiesPerPublish = new StringBuilder();
for (TimelineEntity entity : publishedEntities.getEntities()) {
entitiesPerPublish.append(entity.getId());
entitiesPerPublish.append(",");
}
LOG.info("Entities Published @ index " + i + " : "
+ entitiesPerPublish.toString());
}
}
private static TimelineEntity generateEntity(String id) {
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType("testEntity");
entity.setCreatedTime(System.currentTimeMillis());
return entity;
}
@After
public void tearDown() {
if (client != null) {
client.stop();
}
}
}