blob: 5d05df5844a69337f06ff6a6d611e2e699c31e59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import org.apache.samza.job.CommandBuilder;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class MockClusterResourceManager extends ClusterResourceManager {
final Set<SamzaResource> releasedResources = Collections.synchronizedSet(new HashSet<>());
final List<SamzaResource> resourceRequests = Collections.synchronizedList(new ArrayList<>());
final List<SamzaResourceRequest> cancelledRequests = Collections.synchronizedList(new ArrayList<>());
final List<SamzaResource> launchedResources = Collections.synchronizedList(new ArrayList<>());
final List<MockContainerListener> mockContainerListeners = Collections.synchronizedList(new ArrayList<>());
private final Semaphore requestCountSemaphore = new Semaphore(0);
private final Semaphore launchCountSemaphore = new Semaphore(0);
private final SamzaApplicationState state;
Throwable nextException = null;
MockClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
super(callback);
this.state = state;
}
@Override
public void start() {
}
@Override
public void requestResources(SamzaResourceRequest resourceRequest) {
SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(),
resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
resourceRequests.add(resource);
requestCountSemaphore.release();
clusterManagerCallback.onResourcesAvailable(ImmutableList.of(resource));
}
@Override
public void cancelResourceRequest(SamzaResourceRequest request) {
cancelledRequests.add(request);
}
public boolean awaitResourceRequests(int numExpectedRequests, long val, TimeUnit unit) throws Exception {
return requestCountSemaphore.tryAcquire(numExpectedRequests, val, unit);
}
public boolean awaitContainerLaunch(int numExpectedContainers, long val, TimeUnit unit) throws Exception {
return launchCountSemaphore.tryAcquire(numExpectedContainers, val, unit);
}
@Override
public void releaseResources(SamzaResource resource) {
releasedResources.add(resource);
}
@Override
public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
// assert that the resource is in "pending" state prior to invoking this method
Assert.assertTrue(state.pendingProcessors.values().contains(resource));
if (nextException != null) {
clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new ProcessorLaunchException(nextException));
} else {
launchedResources.add(resource);
clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
}
for (MockContainerListener listener : mockContainerListeners) {
listener.postRunContainer(launchedResources.size());
}
launchCountSemaphore.release();
}
@Override
public void stopStreamProcessor(SamzaResource resource) {
stopStreamProcessor(resource, SamzaResourceStatus.PREEMPTED);
}
@VisibleForTesting
void stopStreamProcessor(SamzaResource resource, int exitCode) {
SamzaResourceStatus status = new SamzaResourceStatus(resource.getContainerId(), "diagnostics", exitCode);
List<SamzaResourceStatus> statList = new ArrayList<>();
statList.add(status);
clusterManagerCallback.onResourcesCompleted(statList);
}
@Override
public boolean isResourceExpired(SamzaResource resource) {
Duration yarnAllocatedResourceExpiry = Duration.ofMinutes(10).minus(Duration.ofSeconds(30));
return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
}
public void registerContainerListener(MockContainerListener listener) {
mockContainerListeners.add(listener);
}
public void clearContainerListeners() {
mockContainerListeners.clear();
}
public boolean containsReleasedResource(SamzaResource resource) {
return releasedResources.contains(resource);
}
@Override
public void stop(SamzaApplicationState.SamzaAppStatus status) {
}
}