blob: 5813340da288f00379720ea824898b2f944f387f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class TestTimelineClientV2Impl {
private static final Log LOG =
LogFactory.getLog(TestTimelineClientV2Impl.class);
private TestV2TimelineClient client;
private static final long TIME_TO_SLEEP = 150L;
private static final String EXCEPTION_MSG = "Exception in the content";
@Before
public void setup() {
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
if (!currTestName.getMethodName()
.contains("testRetryOnConnectionFailure")) {
client = createTimelineClient(conf);
}
}
@Rule
public TestName currTestName = new TestName();
private YarnConfiguration conf;
private TestV2TimelineClient createTimelineClient(YarnConfiguration config) {
ApplicationId id = ApplicationId.newInstance(0, 0);
TestV2TimelineClient tc = new TestV2TimelineClient(id);
tc.init(config);
tc.start();
return tc;
}
private class TestV2TimelineClientForExceptionHandling
extends TimelineClientImpl {
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
super(id);
}
private boolean throwYarnException;
public void setThrowYarnException(boolean throwYarnException) {
this.throwYarnException = throwYarnException;
}
public boolean isThrowYarnException() {
return throwYarnException;
}
@Override
protected void putObjects(URI base, String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (throwYarnException) {
throw new YarnException(EXCEPTION_MSG);
} else {
throw new IOException(
"Failed to get the response from the timeline server.");
}
}
}
private class TestV2TimelineClient
extends TestV2TimelineClientForExceptionHandling {
private boolean sleepBeforeReturn;
private List<TimelineEntities> publishedEntities;
public TimelineEntities getPublishedEntities(int putIndex) {
Assert.assertTrue("Not So many entities Published",
putIndex < publishedEntities.size());
return publishedEntities.get(putIndex);
}
public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
this.sleepBeforeReturn = sleepBeforeReturn;
}
public int getNumOfTimelineEntitiesPublished() {
return publishedEntities.size();
}
public TestV2TimelineClient(ApplicationId id) {
super(id);
publishedEntities = new ArrayList<TimelineEntities>();
}
protected void putObjects(String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (isThrowYarnException()) {
throw new YarnException("ActualException");
}
publishedEntities.add((TimelineEntities) obj);
if (sleepBeforeReturn) {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
@Test
public void testExceptionMultipleRetry() {
TestV2TimelineClientForExceptionHandling c =
new TestV2TimelineClientForExceptionHandling(
ApplicationId.newInstance(0, 0));
int maxRetries = 2;
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
maxRetries);
c.init(conf);
c.start();
c.setTimelineServiceAddress("localhost:12345");
try {
c.putEntities(new TimelineEntity());
} catch (IOException e) {
Assert.fail("YARN exception is expected");
} catch (YarnException e) {
Throwable cause = e.getCause();
Assert.assertTrue("IOException is expected",
cause instanceof IOException);
Assert.assertTrue("YARN exception is expected",
cause.getMessage().contains(
"TimelineClient has reached to max retry times : " + maxRetries));
}
c.setThrowYarnException(true);
try {
c.putEntities(new TimelineEntity());
} catch (IOException e) {
Assert.fail("YARN exception is expected");
} catch (YarnException e) {
Throwable cause = e.getCause();
Assert.assertTrue("YARN exception is expected",
cause instanceof YarnException);
Assert.assertTrue("YARN exception is expected",
cause.getMessage().contains(EXCEPTION_MSG));
}
c.stop();
}
@Test
public void testPostEntities() throws Exception {
try {
client.putEntities(generateEntity("1"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testASyncCallMerge() throws Exception {
client.setSleepBeforeReturn(true);
try {
client.putEntitiesAsync(generateEntity("1"));
Thread.sleep(TIME_TO_SLEEP / 2);
// by the time first put response comes push 2 entities in the queue
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
for (int i = 0; i < 4; i++) {
if (client.getNumOfTimelineEntitiesPublished() == 2) {
break;
}
Thread.sleep(TIME_TO_SLEEP);
}
Assert.assertEquals("two merged TimelineEntities needs to be published", 2,
client.getNumOfTimelineEntitiesPublished());
TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
Assert.assertEquals(
"Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2,
secondPublishedEntities.getEntities().size());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
secondPublishedEntities.getEntities().get(0).getId());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
secondPublishedEntities.getEntities().get(1).getId());
}
@Test
public void testSyncCall() throws Exception {
try {
// sync entity should not be be merged with Async
client.putEntities(generateEntity("1"));
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
// except for the sync call above 2 should be merged
client.putEntities(generateEntity("4"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
for (int i = 0; i < 4; i++) {
if (client.getNumOfTimelineEntitiesPublished() == 3) {
break;
}
Thread.sleep(TIME_TO_SLEEP);
}
printReceivedEntities();
Assert.assertEquals("TimelineEntities not published as desired", 3,
client.getNumOfTimelineEntitiesPublished());
TimelineEntities firstPublishedEntities = client.getPublishedEntities(0);
Assert.assertEquals("sync entities should not be merged with async", 1,
firstPublishedEntities.getEntities().size());
// test before pushing the sync entities asyncs are merged and pushed
TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
Assert.assertEquals(
"async entities should be merged before publishing sync", 2,
secondPublishedEntities.getEntities().size());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
secondPublishedEntities.getEntities().get(0).getId());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
secondPublishedEntities.getEntities().get(1).getId());
// test the last entity published is sync put
TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2);
Assert.assertEquals("sync entities had to be published at the last", 1,
thirdPublishedEntities.getEntities().size());
Assert.assertEquals("Expected last sync Event is not proper", "4",
thirdPublishedEntities.getEntities().get(0).getId());
}
@Test
public void testExceptionCalls() throws Exception {
client.setThrowYarnException(true);
try {
client.putEntitiesAsync(generateEntity("1"));
} catch (YarnException e) {
Assert.fail("Async calls are not expected to throw exception");
}
try {
client.putEntities(generateEntity("2"));
Assert.fail("Sync calls are expected to throw exception");
} catch (YarnException e) {
Assert.assertEquals("Same exception needs to be thrown",
"ActualException", e.getCause().getMessage());
}
}
@Test
public void testConfigurableNumberOfMerges() throws Exception {
client.setSleepBeforeReturn(true);
try {
// At max 3 entities need to be merged
client.putEntitiesAsync(generateEntity("1"));
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
client.putEntitiesAsync(generateEntity("4"));
client.putEntities(generateEntity("5"));
client.putEntitiesAsync(generateEntity("6"));
client.putEntitiesAsync(generateEntity("7"));
client.putEntitiesAsync(generateEntity("8"));
client.putEntitiesAsync(generateEntity("9"));
client.putEntitiesAsync(generateEntity("10"));
} catch (YarnException e) {
Assert.fail("No exception expected");
}
// not having the same logic here as it doesn't depend on how many times
// events are published.
Thread.sleep(2 * TIME_TO_SLEEP);
printReceivedEntities();
for (TimelineEntities publishedEntities : client.publishedEntities) {
Assert.assertTrue(
"Number of entities should not be greater than 3 for each publish,"
+ " but was " + publishedEntities.getEntities().size(),
publishedEntities.getEntities().size() <= 3);
}
}
@Test
public void testAfterStop() throws Exception {
client.setSleepBeforeReturn(true);
try {
// At max 3 entities need to be merged
client.putEntities(generateEntity("1"));
for (int i = 2; i < 20; i++) {
client.putEntitiesAsync(generateEntity("" + i));
}
client.stop();
try {
client.putEntitiesAsync(generateEntity("50"));
Assert.fail("Exception expected");
} catch (YarnException e) {
// expected
}
} catch (YarnException e) {
Assert.fail("No exception expected");
}
// not having the same logic here as it doesn't depend on how many times
// events are published.
for (int i = 0; i < 5; i++) {
TimelineEntities publishedEntities =
client.publishedEntities.get(client.publishedEntities.size() - 1);
TimelineEntity timelineEntity = publishedEntities.getEntities()
.get(publishedEntities.getEntities().size() - 1);
if (!timelineEntity.getId().equals("19")) {
Thread.sleep(2 * TIME_TO_SLEEP);
}
}
printReceivedEntities();
TimelineEntities publishedEntities =
client.publishedEntities.get(client.publishedEntities.size() - 1);
TimelineEntity timelineEntity = publishedEntities.getEntities()
.get(publishedEntities.getEntities().size() - 1);
Assert.assertEquals("", "19", timelineEntity.getId());
}
private void printReceivedEntities() {
for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
TimelineEntities publishedEntities = client.getPublishedEntities(i);
StringBuilder entitiesPerPublish = new StringBuilder();
for (TimelineEntity entity : publishedEntities.getEntities()) {
entitiesPerPublish.append(entity.getId());
entitiesPerPublish.append(",");
}
LOG.info("Entities Published @ index " + i + " : "
+ entitiesPerPublish.toString());
}
}
private static TimelineEntity generateEntity(String id) {
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType("testEntity");
entity.setCreatedTime(System.currentTimeMillis());
return entity;
}
@After
public void tearDown() {
if (client != null) {
client.stop();
}
}
}