blob: 05c5bda76be39547cf6050e4b50507cbc01da60d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.multi.driver;
import org.apache.reef.runtime.common.driver.api.*;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
import org.apache.reef.runtime.local.driver.*;
import org.apache.reef.runtime.multi.client.MultiRuntimeDefinitionBuilder;
import org.apache.reef.runtime.multi.client.parameters.SerializedRuntimeDefinition;
import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer;
import org.apache.reef.runtime.multi.utils.avro.AvroMultiRuntimeDefinition;
import org.apache.reef.tang.*;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.runtime.event.RuntimeStart;
import org.apache.reef.wake.time.runtime.event.RuntimeStop;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.inject.Inject;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Tests for RuntimesHost.
 *
 * <p>Every test injects a {@link RuntimesHost} from a Tang injector that carries a
 * {@link SerializedRuntimeDefinition} plus some subset of the four {@link RuntimeParameters}
 * event handlers, and then delivers a {@link RuntimeStart} event to it. The handlers and the
 * stand-in runtime declared at the bottom of this class record every event they receive in a
 * shared static queue so that tests can assert on what reached them.
 */
public class RuntimesHostTest {

  /** Events recorded by the test handlers below; emptied before every test by {@link #setUp()}. */
  private static final Queue<Object> COMMANDS_QUEUE = new LinkedBlockingQueue<>();

  /** Injector with all four RuntimeParameters handlers bound; rebuilt before every test. */
  private Injector injector;

  /**
   * Builds the fully-bound injector used by the positive tests and empties the event queue.
   *
   * @throws InjectionException declared for compatibility with the Tang calls made here
   */
  @SuppressWarnings("unchecked")
  @Before
  public void setUp() throws InjectionException {
    final Configuration allHandlers = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(RuntimeParameters.NodeDescriptorHandler.class, TestNodeDescriptorHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceStatusHandler.class, TestResourceStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.RuntimeStatusHandler.class, TestRuntimeStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceAllocationHandler.class, TestResourceAllocationHandler.class)
        .build();
    this.injector = Tang.Factory.getTang().newInjector(allHandlers);
    COMMANDS_QUEUE.clear();
  }

  /**
   * A runtime definition string that is not a serialized definition ("corrupted avro") is
   * expected to end in an {@link InjectionException}.
   */
  @Test(expected = InjectionException.class)
  public void testRuntimesHostInitializedWithCorruptedRuntimesList() throws InjectionException {
    startRuntimesHost(this.injector, "corrupted avro");
  }

  /**
   * Binds every handler except {@code NodeDescriptorHandler}; starting the host with a local
   * runtime definition is expected to raise a {@link RuntimeException}.
   */
  @Test(expected = RuntimeException.class)
  public void testRuntimesHostInitializedWithMissingNodeDescriptorHandler() throws InjectionException {
    final String definition = serializedLocalRuntimeDefinition();
    final Configuration handlers = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(RuntimeParameters.ResourceStatusHandler.class, TestResourceStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.RuntimeStatusHandler.class, TestRuntimeStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceAllocationHandler.class, TestResourceAllocationHandler.class)
        .build();
    startRuntimesHost(Tang.Factory.getTang().newInjector(handlers), definition);
  }

  /**
   * Binds every handler except {@code ResourceStatusHandler}; starting the host with a local
   * runtime definition is expected to raise a {@link RuntimeException}.
   */
  @Test(expected = RuntimeException.class)
  public void testRuntimesHostInitializedWithMissingResourceStatusHandler() throws InjectionException {
    final String definition = serializedLocalRuntimeDefinition();
    final Configuration handlers = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(RuntimeParameters.NodeDescriptorHandler.class, TestNodeDescriptorHandler.class)
        .bindNamedParameter(RuntimeParameters.RuntimeStatusHandler.class, TestRuntimeStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceAllocationHandler.class, TestResourceAllocationHandler.class)
        .build();
    startRuntimesHost(Tang.Factory.getTang().newInjector(handlers), definition);
  }

  /**
   * Binds every handler except {@code RuntimeStatusHandler}; starting the host with a local
   * runtime definition is expected to raise a {@link RuntimeException}.
   */
  @Test(expected = RuntimeException.class)
  public void testRuntimesHostInitializedWithMissingRuntimeStatusHandler() throws InjectionException {
    final String definition = serializedLocalRuntimeDefinition();
    // RuntimeStatusHandler is deliberately the only handler left unbound here.
    final Configuration handlers = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(RuntimeParameters.NodeDescriptorHandler.class, TestNodeDescriptorHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceStatusHandler.class, TestResourceStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceAllocationHandler.class, TestResourceAllocationHandler.class)
        .build();
    startRuntimesHost(Tang.Factory.getTang().newInjector(handlers), definition);
  }

  /**
   * Binds every handler except {@code ResourceAllocationHandler}; starting the host with a local
   * runtime definition is expected to raise a {@link RuntimeException}.
   */
  @Test(expected = RuntimeException.class)
  public void testRuntimesHostInitializedWithMissingResourceAllocationHandler() throws InjectionException {
    final String definition = serializedLocalRuntimeDefinition();
    // ResourceAllocationHandler is deliberately the only handler left unbound here.
    final Configuration handlers = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(RuntimeParameters.NodeDescriptorHandler.class, TestNodeDescriptorHandler.class)
        .bindNamedParameter(RuntimeParameters.ResourceStatusHandler.class, TestResourceStatusHandler.class)
        .bindNamedParameter(RuntimeParameters.RuntimeStatusHandler.class, TestRuntimeStatusHandler.class)
        .build();
    startRuntimesHost(Tang.Factory.getTang().newInjector(handlers), definition);
  }

  /**
   * With a single runtime named "test" defined and all handlers bound, delivering a
   * {@link RuntimeStart} to the host must leave exactly one recorded event, and that event must
   * itself be a {@link RuntimeStart}.
   */
  @Test
  public void testRuntimesHostRoutedToDefaultRuntime() throws InjectionException {
    final String definition = getRuntimeDefinition(
        new MultiRuntimeDefinitionBuilder().addRuntime(TestDriverConfiguration.CONF.build(), "test").build());

    startRuntimesHost(this.injector, definition);

    Assert.assertEquals(1, COMMANDS_QUEUE.size());
    final Object recorded = COMMANDS_QUEUE.poll();
    Assert.assertTrue(recorded instanceof RuntimeStart);
  }

  /**
   * Forks {@code parent} with the given serialized runtime definition bound to
   * {@link SerializedRuntimeDefinition}, injects a {@link RuntimesHost} from the fork and
   * delivers a {@link RuntimeStart} stamped with the current time to it.
   *
   * @param parent               injector supplying the RuntimeParameters handler bindings
   * @param serializedDefinition value to bind to SerializedRuntimeDefinition
   * @throws InjectionException if the RuntimesHost cannot be injected
   */
  private static void startRuntimesHost(final Injector parent, final String serializedDefinition)
      throws InjectionException {
    final JavaConfigurationBuilder definition = Tang.Factory.getTang().newConfigurationBuilder();
    definition.bindNamedParameter(SerializedRuntimeDefinition.class, serializedDefinition);
    final RuntimesHost host = parent.forkInjector(definition.build()).getInstance(RuntimesHost.class);
    host.onRuntimeStart(new RuntimeStart(System.currentTimeMillis()));
  }

  /**
   * Builds a multi-runtime definition containing a single local runtime named "local" and
   * returns it in serialized form.
   *
   * @return the serialized definition, suitable for binding to SerializedRuntimeDefinition
   */
  private String serializedLocalRuntimeDefinition() {
    final Configuration localRuntime = LocalDriverConfiguration.CONF
        .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, 1)
        .set(LocalDriverConfiguration.ROOT_FOLDER, "")
        .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 1024)
        .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, "ID")
        .set(LocalDriverConfiguration.JOB_IDENTIFIER, "ID")
        .set(LocalDriverConfiguration.RACK_NAMES, "")
        .build();
    return getRuntimeDefinition(new MultiRuntimeDefinitionBuilder().addRuntime(localRuntime, "local").build());
  }

  /**
   * Serializes a multi-runtime definition to a string.
   *
   * @param rd the definition to serialize
   * @return the string produced by {@link MultiRuntimeDefinitionSerializer#toString}
   */
  private String getRuntimeDefinition(final AvroMultiRuntimeDefinition rd) {
    return new MultiRuntimeDefinitionSerializer().toString(rd);
  }

  /** Records every {@link ResourceStatusEvent} it receives in the shared event queue. */
  static class TestResourceStatusHandler implements EventHandler<ResourceStatusEvent> {
    @Inject
    TestResourceStatusHandler() {
    }

    @Override
    public void onNext(final ResourceStatusEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link RuntimeStatusEvent} it receives in the shared event queue. */
  static class TestRuntimeStatusHandler implements EventHandler<RuntimeStatusEvent> {
    @Inject
    TestRuntimeStatusHandler() {
    }

    @Override
    public void onNext(final RuntimeStatusEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link NodeDescriptorEvent} it receives in the shared event queue. */
  static class TestNodeDescriptorHandler implements EventHandler<NodeDescriptorEvent> {
    @Inject
    TestNodeDescriptorHandler() {
    }

    @Override
    public void onNext(final NodeDescriptorEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link ResourceAllocationEvent} it receives in the shared event queue. */
  static class TestResourceAllocationHandler implements EventHandler<ResourceAllocationEvent> {
    @Inject
    TestResourceAllocationHandler() {
    }

    @Override
    public void onNext(final ResourceAllocationEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /**
   * Configuration module for the stand-in "test" runtime: binds the launch, request, release,
   * start and stop handler interfaces to the recording implementations declared below.
   */
  static class TestDriverConfiguration extends ConfigurationModuleBuilder {
    public static final ConfigurationModule CONF = new TestDriverConfiguration()
        .bindImplementation(ResourceLaunchHandler.class, TestResourceLaunchHandler.class)
        .bindImplementation(ResourceRequestHandler.class, TestResourceRequestHandler.class)
        .bindImplementation(ResourceReleaseHandler.class, TestResourceReleaseHandler.class)
        .bindImplementation(ResourceManagerStartHandler.class, TestResourceManagerStartHandler.class)
        .bindImplementation(ResourceManagerStopHandler.class, TestResourceManagerStopHandler.class)
        .build();
  }

  /** Records every {@link ResourceLaunchEvent} it receives in the shared event queue. */
  private static class TestResourceLaunchHandler implements ResourceLaunchHandler {
    @Inject
    TestResourceLaunchHandler() {
    }

    @Override
    public void onNext(final ResourceLaunchEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link ResourceRequestEvent} it receives in the shared event queue. */
  private static class TestResourceRequestHandler implements ResourceRequestHandler {
    @Inject
    TestResourceRequestHandler() {
    }

    @Override
    public void onNext(final ResourceRequestEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link ResourceReleaseEvent} it receives in the shared event queue. */
  private static class TestResourceReleaseHandler implements ResourceReleaseHandler {
    @Inject
    TestResourceReleaseHandler() {
    }

    @Override
    public void onNext(final ResourceReleaseEvent value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link RuntimeStart} it receives in the shared event queue. */
  private static class TestResourceManagerStartHandler implements ResourceManagerStartHandler {
    @Inject
    TestResourceManagerStartHandler() {
    }

    @Override
    public void onNext(final RuntimeStart value) {
      COMMANDS_QUEUE.add(value);
    }
  }

  /** Records every {@link RuntimeStop} it receives in the shared event queue. */
  private static class TestResourceManagerStopHandler implements ResourceManagerStopHandler {
    @Inject
    TestResourceManagerStopHandler() {
    }

    @Override
    public void onNext(final RuntimeStop value) {
      COMMANDS_QUEUE.add(value);
    }
  }
}