blob: 4a9fccd4663b5f45ab80ec9e8d4967dc67b66685 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Unit tests for {@link SupervisorManager}. The metadata store and the individual supervisors are
 * EasyMock mocks; each test records expectations, replays them, exercises the manager and then
 * verifies the recorded interactions.
 */
@RunWith(EasyMockRunner.class)
public class SupervisorManagerTest extends EasyMockSupport
{
// Mocked metadata store that is handed to the manager under test in setUp().
@Mock
private MetadataSupervisorManager metadataSupervisorManager;
// Mock supervisors that the tests wrap in TestSupervisorSpec instances.
@Mock
private Supervisor supervisor1;
@Mock
private Supervisor supervisor2;
@Mock
private Supervisor supervisor3;
// Manager under test; re-created before every test by setUp().
private SupervisorManager manager;
// JUnit rule used by the negative tests to declare the exception type they expect.
@Rule
public final ExpectedException exception = ExpectedException.none();
/**
 * Builds a fresh manager around the mocked metadata store before each test. The manager is not
 * started here; tests that need a running manager call {@code start()} themselves.
 */
@Before
public void setUp()
{
  this.manager = new SupervisorManager(this.metadataSupervisorManager);
}
/**
 * Walks supervisor id "id1" through creation, replacement by a second spec and removal, while a
 * supervisor loaded from metadata ("id3") keeps running until the manager itself is stopped.
 */
@Test
public void testCreateUpdateAndRemoveSupervisor()
{
  final SupervisorSpec initialSpec = new TestSupervisorSpec("id1", supervisor1);
  final SupervisorSpec replacementSpec = new TestSupervisorSpec("id1", supervisor2);
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id3", new TestSupervisorSpec("id3", supervisor3)
  );

  Assert.assertTrue(manager.getSupervisorIds().isEmpty());

  // Phase 1: starting the manager loads id3 from metadata; id1 is then created on top of it.
  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  metadataSupervisorManager.insert("id1", initialSpec);
  supervisor3.start();
  supervisor1.start();
  replayAll();

  manager.start();
  Assert.assertEquals(1, manager.getSupervisorIds().size());

  manager.createOrUpdateAndStartSupervisor(initialSpec);
  Assert.assertEquals(2, manager.getSupervisorIds().size());
  Assert.assertEquals(initialSpec, manager.getSupervisorSpec("id1").get());
  verifyAll();

  // Phase 2: submitting a second spec under the same id stops supervisor1 and starts supervisor2.
  resetAll();
  metadataSupervisorManager.insert("id1", replacementSpec);
  supervisor2.start();
  supervisor1.stop(true);
  replayAll();

  manager.createOrUpdateAndStartSupervisor(replacementSpec);
  Assert.assertEquals(2, manager.getSupervisorIds().size());
  Assert.assertEquals(replacementSpec, manager.getSupervisorSpec("id1").get());
  verifyAll();

  // Phase 3: removing id1 stops supervisor2 and inserts a NoopSupervisorSpec under that id.
  resetAll();
  metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class));
  supervisor2.stop(true);
  replayAll();

  Assert.assertTrue(manager.stopAndRemoveSupervisor("id1"));
  Assert.assertEquals(1, manager.getSupervisorIds().size());
  Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1"));
  verifyAll();

  // Phase 4: stopping the manager stops the remaining supervisor (id3).
  resetAll();
  supervisor3.stop(false);
  replayAll();

  manager.stop();
  verifyAll();

  Assert.assertTrue(manager.getSupervisorIds().isEmpty());
}
/**
 * Expects an {@link IllegalStateException} when a spec is submitted to a manager on which
 * {@code start()} has not been called.
 */
@Test
public void testCreateOrUpdateAndStartSupervisorNotStarted()
{
  final SupervisorSpec specForUnstartedManager = new TestSupervisorSpec("id", null);

  exception.expect(IllegalStateException.class);
  manager.createOrUpdateAndStartSupervisor(specForUnstartedManager);
}
/**
 * Expects a {@link NullPointerException} when a {@code null} spec is submitted to a started manager.
 */
@Test
public void testCreateOrUpdateAndStartSupervisorNullSpec()
{
  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
  replayAll();

  manager.start();

  // Arm the expectation only after start() so that an NPE raised during startup cannot satisfy it.
  exception.expect(NullPointerException.class);
  try {
    manager.createOrUpdateAndStartSupervisor(null);
  }
  finally {
    // The call above is expected to throw, so verification has to run from a finally block;
    // a statement placed after the call would never execute.
    verifyAll();
  }
}
/**
 * Expects a {@link NullPointerException} when a spec whose id is {@code null} is submitted to a
 * started manager.
 */
@Test
public void testCreateOrUpdateAndStartSupervisorNullSpecId()
{
  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
  replayAll();

  manager.start();

  // Arm the expectation only after start() so that an NPE raised during startup cannot satisfy it.
  exception.expect(NullPointerException.class);
  try {
    manager.createOrUpdateAndStartSupervisor(new TestSupervisorSpec(null, null));
  }
  finally {
    // The call above is expected to throw, so verification has to run from a finally block;
    // a statement placed after the call would never execute.
    verifyAll();
  }
}
/**
 * Expects an {@link IllegalStateException} when a supervisor is removed from a manager on which
 * {@code start()} has not been called.
 */
@Test
public void testStopAndRemoveSupervisorNotStarted()
{
  final String supervisorId = "id";

  exception.expect(IllegalStateException.class);
  manager.stopAndRemoveSupervisor(supervisorId);
}
/**
 * Expects a {@link NullPointerException} when a {@code null} supervisor id is passed to
 * {@code stopAndRemoveSupervisor} on a started manager.
 */
@Test
public void testStopAndRemoveSupervisorNullSpecId()
{
  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
  replayAll();

  manager.start();

  // Arm the expectation only after start() so that an NPE raised during startup cannot satisfy it.
  exception.expect(NullPointerException.class);
  try {
    manager.stopAndRemoveSupervisor(null);
  }
  finally {
    // The call above is expected to throw, so verification has to run from a finally block;
    // a statement placed after the call would never execute.
    verifyAll();
  }
}
/**
 * Checks that {@code getSupervisorHistory()} returns what the metadata store's {@code getAll()}
 * returns.
 */
@Test
public void testGetSupervisorHistory()
{
  final Map<String, List<VersionedSupervisorSpec>> expectedHistory = ImmutableMap.of();
  EasyMock.expect(metadataSupervisorManager.getAll()).andReturn(expectedHistory);
  replayAll();

  final Map<String, List<VersionedSupervisorSpec>> actualHistory = manager.getSupervisorHistory();
  verifyAll();

  Assert.assertEquals(expectedHistory, actualHistory);
}
/**
 * Checks that {@code getSupervisorHistoryForId(id)} returns what the metadata store's
 * {@code getAllForId(id)} returns for the same id.
 */
@Test
public void testGetSupervisorHistoryForId()
{
  final String supervisorId = "test-supervisor-1";
  final List<VersionedSupervisorSpec> expectedHistory = ImmutableList.of();
  EasyMock.expect(metadataSupervisorManager.getAllForId(supervisorId)).andReturn(expectedHistory);
  replayAll();

  final List<VersionedSupervisorSpec> actualHistory = manager.getSupervisorHistoryForId(supervisorId);
  verifyAll();

  Assert.assertEquals(expectedHistory, actualHistory);
}
/**
 * Checks that the status of a known supervisor is the report returned by that supervisor, and that
 * an unknown id yields {@code Optional.absent()}.
 */
@Test
public void testGetSupervisorStatus()
{
  final SupervisorReport<Void> expectedReport = new SupervisorReport<>("id1", DateTimes.nowUtc(), null);
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id1", new TestSupervisorSpec("id1", supervisor1)
  );

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  supervisor1.start();
  EasyMock.expect(supervisor1.getStatus()).andReturn(expectedReport);
  replayAll();

  manager.start();

  Assert.assertEquals(Optional.absent(), manager.getSupervisorStatus("non-existent-id"));
  Assert.assertEquals(expectedReport, manager.getSupervisorStatus("id1").get());
  verifyAll();
}
/**
 * Expects an {@link IllegalStateException} from the second of two consecutive {@code start()} calls.
 */
@Test
public void testStartAlreadyStarted()
{
  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
  replayAll();

  // The first start must succeed; any exception here fails the test because nothing is expected yet.
  manager.start();

  // Only the repeated start is allowed to throw.
  exception.expect(IllegalStateException.class);
  manager.start();
}
/**
 * Starts the manager with two persisted specs, one of whose supervisors throws from
 * {@code start()}, and requires that {@code manager.start()} itself does not throw.
 */
@Test
public void testStartIndividualSupervisorsFailStart()
{
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id1", new TestSupervisorSpec("id1", supervisor1),
      "id3", new TestSupervisorSpec("id3", supervisor3)
  );

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  supervisor3.start();
  supervisor1.start();
  EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor explosion"));
  replayAll();

  // Reaching the end of this call without an exception is the assertion: the manager is insulated
  // from a supervisor that blows up while starting.
  manager.start();
}
/**
 * Submits a working spec followed by a spec whose supervisor fails to start, and asserts that the
 * spec captured from {@code insert} at the time of the failure is the first (working) one.
 */
@Test
public void testNoPersistOnFailedStart()
{
  exception.expect(RuntimeException.class);

  final SupervisorSpec workingSpec = new TestSupervisorSpec("id1", supervisor1);
  final SupervisorSpec failingSpec = new TestSupervisorSpec("id1", supervisor2);
  final Capture<TestSupervisorSpec> insertedSpec = Capture.newInstance();

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
  metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(insertedSpec));
  supervisor1.start();
  supervisor1.stop(true);
  supervisor2.start();
  EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor failed to start"));
  replayAll();

  manager.start();
  try {
    manager.createOrUpdateAndStartSupervisor(workingSpec);
    manager.createOrUpdateAndStartSupervisor(failingSpec);
  }
  catch (Exception e) {
    // Check what was persisted, then rethrow so the ExpectedException rule sees the failure.
    Assert.assertEquals(workingSpec, insertedSpec.getValue());
    throw e;
  }
}
/**
 * Requires that {@code manager.stop()} completes without throwing even though the only running
 * supervisor throws a {@link RuntimeException} from {@code stop(false)}.
 */
@Test
public void testStopThrowsException()
{
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id1", new TestSupervisorSpec("id1", supervisor1)
  );

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  supervisor1.start();
  supervisor1.stop(false);
  EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
  replayAll();

  manager.start();
  manager.stop();

  verifyAll();
}
/**
 * Resets with {@code null} metadata: a known supervisor id must return {@code true} and trigger
 * {@code reset(...)} on the supervisor, an unknown id must return {@code false}.
 */
@Test
public void testResetSupervisor()
{
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id1", new TestSupervisorSpec("id1", supervisor1)
  );

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  supervisor1.start();
  supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class));
  replayAll();

  manager.start();

  final boolean knownIdReset = manager.resetSupervisor("id1", null);
  Assert.assertTrue("resetValidSupervisor", knownIdReset);

  final boolean unknownIdReset = manager.resetSupervisor("nobody_home", null);
  Assert.assertFalse("resetInvalidSupervisor", unknownIdReset);

  verifyAll();
}
/**
 * Resets with explicit metadata: a known supervisor id must return {@code true} and trigger
 * {@code resetOffsets(...)} with that metadata, an unknown id must return {@code false}.
 */
@Test
public void testResetSupervisorWithSpecificOffsets()
{
  final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
      new SeekableStreamStartSequenceNumbers<>(
          "topic",
          ImmutableMap.of("0", "10", "1", "20", "2", "30"),
          ImmutableSet.of()
      )
  );
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id1", new TestSupervisorSpec("id1", supervisor1)
  );

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  supervisor1.start();
  supervisor1.resetOffsets(resetMetadata);
  replayAll();

  manager.start();

  final boolean knownIdReset = manager.resetSupervisor("id1", resetMetadata);
  Assert.assertTrue("resetValidSupervisor", knownIdReset);

  final boolean unknownIdReset = manager.resetSupervisor("nobody_home", resetMetadata);
  Assert.assertFalse("resetInvalidSupervisor", unknownIdReset);

  verifyAll();
}
/**
 * Walks supervisor id "id1" through create, suspend, resume and removal next to a supervisor loaded
 * from metadata ("id3"), checking after each step which supervisor was started or stopped and which
 * spec was written to the metadata store.
 */
@Test
public void testCreateSuspendResumeAndStopSupervisor()
{
  final Capture<TestSupervisorSpec> insertedSpec = Capture.newInstance();
  final SupervisorSpec initialSpec = new TestSupervisorSpec("id1", supervisor1, false, supervisor2);
  final Map<String, SupervisorSpec> persistedSpecs = ImmutableMap.of(
      "id3", new TestSupervisorSpec("id3", supervisor3)
  );

  Assert.assertTrue(manager.getSupervisorIds().isEmpty());

  // Step 1: start the manager with an existing supervisor (id3), then add id1.
  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(persistedSpecs);
  metadataSupervisorManager.insert("id1", initialSpec);
  supervisor3.start();
  supervisor1.start();
  replayAll();

  manager.start();
  Assert.assertEquals(1, manager.getSupervisorIds().size());

  manager.createOrUpdateAndStartSupervisor(initialSpec);
  Assert.assertEquals(2, manager.getSupervisorIds().size());
  Assert.assertEquals(initialSpec, manager.getSupervisorSpec("id1").get());
  verifyAll();

  // Step 2: suspend. supervisor1 is stopped and the suspended spec is written to metadata;
  // TestSupervisorSpec.createSuspendedSpec flips the spec over to supervisor2, which is started.
  resetAll();
  metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(insertedSpec));
  supervisor2.start();
  supervisor1.stop(true);
  replayAll();

  manager.suspendOrResumeSupervisor("id1", true);
  Assert.assertEquals(2, manager.getSupervisorIds().size());
  Assert.assertEquals(insertedSpec.getValue(), manager.getSupervisorSpec("id1").get());
  Assert.assertTrue(insertedSpec.getValue().suspended);
  verifyAll();

  // Step 3: resume. supervisor2 is stopped and a non-suspended spec is written to metadata;
  // TestSupervisorSpec.createRunningSpec flips the spec back to supervisor1, which is started.
  resetAll();
  metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(insertedSpec));
  supervisor2.stop(true);
  supervisor1.start();
  replayAll();

  manager.suspendOrResumeSupervisor("id1", false);
  Assert.assertEquals(2, manager.getSupervisorIds().size());
  Assert.assertEquals(insertedSpec.getValue(), manager.getSupervisorSpec("id1").get());
  Assert.assertFalse(insertedSpec.getValue().suspended);
  verifyAll();

  // Step 4: remove the supervisor that was suspended and then resumed.
  resetAll();
  metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class));
  supervisor1.stop(true);
  replayAll();

  Assert.assertTrue(manager.stopAndRemoveSupervisor("id1"));
  Assert.assertEquals(1, manager.getSupervisorIds().size());
  Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1"));
  verifyAll();

  // Step 5: shut the manager down and make sure supervisor3 is stopped.
  resetAll();
  supervisor3.stop(false);
  replayAll();

  manager.stop();
  verifyAll();

  Assert.assertTrue(manager.getSupervisorIds().isEmpty());
}
/**
 * Checks which registered supervisors are reported by
 * {@code getActiveSupervisorIdForDatasourceWithAppendLock}:
 * <ul>
 * <li>absent for an unknown datasource, a no-op spec, a suspended streaming spec, and an active
 * streaming spec without a context;</li>
 * <li>present for an active streaming spec whose context sets the APPEND lock type or enables
 * concurrent locks;</li>
 * <li>absent for an active streaming spec that explicitly disables concurrent locks, even though
 * its context also names the APPEND lock type.</li>
 * </ul>
 */
@Test
public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
{
  // Shared between the spec definition and the final assertion so both refer to the same datasource.
  final String useConcurrentLocksFalseDataSource = "dsWithUseConcurrentLocksFalse";

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());

  final NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS"));
  final SeekableStreamSupervisorSpec suspendedSpec =
      createReplayedStreamSpec("suspendedSpec", "suspendedDS", true, null);
  final SeekableStreamSupervisorSpec activeSpec =
      createReplayedStreamSpec("activeSpec", "activeDS", false, null);
  final SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = createReplayedStreamSpec(
      "activeSpecWithConcurrentLocks",
      "activeConcurrentLocksDS",
      false,
      ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)
  );
  final SeekableStreamSupervisorSpec activeAppendSpec = createReplayedStreamSpec(
      "activeAppendSpec",
      "activeAppendDS",
      false,
      ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
  );
  // A supervisor with useConcurrentLocks set to false explicitly must not use an append lock
  final SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = createReplayedStreamSpec(
      "useConcurrentLocksFalse",
      useConcurrentLocksFalseDataSource,
      false,
      ImmutableMap.of(
          Tasks.USE_CONCURRENT_LOCKS,
          false,
          Tasks.TASK_LOCK_TYPE,
          TaskLockType.APPEND.name()
      )
  );

  // One insert is expected for each of the six createOrUpdateAndStartSupervisor calls below.
  metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
  EasyMock.expectLastCall().times(6);

  replayAll();
  manager.start();

  Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("nonExistent").isPresent());

  manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec);
  Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("noopDS").isPresent());

  manager.createOrUpdateAndStartSupervisor(suspendedSpec);
  Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("suspendedDS").isPresent());

  manager.createOrUpdateAndStartSupervisor(activeSpec);
  Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeDS").isPresent());

  manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
  Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());

  manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks);
  Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent());

  manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse);
  Assert.assertFalse(
      manager.getActiveSupervisorIdForDatasourceWithAppendLock(useConcurrentLocksFalseDataSource).isPresent()
  );

  verifyAll();
}

/**
 * Creates a mock {@link SeekableStreamSupervisorSpec}, already switched to replay mode, that
 * reports the given id, suspension flag, single datasource and context any number of times. Its
 * {@code createSupervisor()} returns a mock {@link SeekableStreamSupervisor} and its
 * {@code createAutoscaler(...)} returns {@code null}.
 *
 * @param id         value returned by {@code getId()}
 * @param dataSource sole element of the list returned by {@code getDataSources()}
 * @param suspended  value returned by {@code isSuspended()}
 * @param context    value returned by {@code getContext()}; may be {@code null}
 * @return the replayed mock spec
 */
private static SeekableStreamSupervisorSpec createReplayedStreamSpec(
    String id,
    String dataSource,
    boolean suspended,
    Map<String, Object> context
)
{
  final SeekableStreamSupervisorSpec spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
  // The supervisor mock is intentionally left in record mode, so calls made on it are not checked.
  final Supervisor supervisor = EasyMock.mock(SeekableStreamSupervisor.class);

  EasyMock.expect(spec.getId()).andReturn(id).anyTimes();
  EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
  EasyMock.expect(spec.getDataSources()).andReturn(ImmutableList.of(dataSource)).anyTimes();
  EasyMock.expect(spec.createSupervisor()).andReturn(supervisor).anyTimes();
  EasyMock.expect(spec.createAutoscaler(supervisor)).andReturn(null).anyTimes();
  EasyMock.expect(spec.getContext()).andReturn(context).anyTimes();
  EasyMock.replay(spec);

  return spec;
}
/**
 * Checks that registering an upgraded pending segment returns {@code false} for a supervisor
 * created from a {@link NoopSupervisorSpec} and {@code true} for one created from a streaming spec.
 */
@Test
public void testRegisterUpgradedPendingSegmentOnSupervisor()
{
  final PendingSegmentRecord upgradedSegment = new PendingSegmentRecord(
      new SegmentIdWithShardSpec(
          "DS",
          Intervals.ETERNITY,
          "version",
          new NumberedShardSpec(0, 0)
      ),
      "sequenceName",
      "prevSegmentId",
      "upgradedFromSegmentId",
      "taskAllocatorId"
  );

  EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());

  // Expectation for persisting the no-op spec.
  final NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS"));
  metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

  final SeekableStreamSupervisorSpec streamSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
  final SeekableStreamSupervisor streamSupervisorMock = EasyMock.mock(SeekableStreamSupervisor.class);
  // NOTE(review): this mock is created with the static EasyMock.mock and is never passed to
  // EasyMock.replay/verify in this test, so the once() expectation below does not appear to be
  // enforced - confirm whether that is intended.
  streamSupervisorMock.registerNewVersionOfPendingSegment(EasyMock.anyObject());
  EasyMock.expectLastCall().once();

  EasyMock.expect(streamSpec.getId()).andReturn("sss").anyTimes();
  EasyMock.expect(streamSpec.isSuspended()).andReturn(false).anyTimes();
  EasyMock.expect(streamSpec.getDataSources()).andReturn(ImmutableList.of("DS")).anyTimes();
  EasyMock.expect(streamSpec.createSupervisor()).andReturn(streamSupervisorMock).anyTimes();
  EasyMock.expect(streamSpec.createAutoscaler(streamSupervisorMock)).andReturn(null).anyTimes();
  EasyMock.expect(streamSpec.getContext()).andReturn(null).anyTimes();
  EasyMock.replay(streamSpec);

  // Expectation for persisting the streaming spec.
  metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
  EasyMock.expectLastCall().once();

  replayAll();
  manager.start();

  manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec);
  Assert.assertFalse(manager.registerUpgradedPendingSegmentOnSupervisor("noop", upgradedSegment));

  manager.createOrUpdateAndStartSupervisor(streamSpec);
  Assert.assertTrue(manager.registerUpgradedPendingSegmentOnSupervisor("sss", upgradedSegment));

  verifyAll();
}
/**
 * Minimal {@link SupervisorSpec} for these tests. It hands out a pre-built supervisor, and its
 * suspend/resume factories return a new spec in which the active and the alternate supervisor have
 * traded places.
 */
private static class TestSupervisorSpec implements SupervisorSpec
{
  private final String id;
  private final Supervisor supervisor;
  private final boolean suspended;
  private final Supervisor suspendedSupervisor;

  /**
   * Creates a non-suspended spec that has no alternate supervisor.
   */
  TestSupervisorSpec(String id, Supervisor supervisor)
  {
    this(id, supervisor, false, null);
  }

  /**
   * @param id                  spec id, may be {@code null} in negative tests
   * @param supervisor          supervisor returned by {@link #createSupervisor()}
   * @param suspended           value returned by {@link #isSuspended()}
   * @param suspendedSupervisor supervisor that becomes active after a suspend or resume
   */
  TestSupervisorSpec(String id, Supervisor supervisor, boolean suspended, Supervisor suspendedSupervisor)
  {
    this.id = id;
    this.supervisor = supervisor;
    this.suspended = suspended;
    this.suspendedSupervisor = suspendedSupervisor;
  }

  @Override
  public String getId()
  {
    return this.id;
  }

  @Override
  public String getType()
  {
    return null;
  }

  @Override
  public String getSource()
  {
    return null;
  }

  @Override
  public List<String> getDataSources()
  {
    return new ArrayList<>();
  }

  @Override
  public boolean isSuspended()
  {
    return this.suspended;
  }

  @Override
  public Supervisor createSupervisor()
  {
    return this.supervisor;
  }

  @Override
  public SupervisorSpec createSuspendedSpec()
  {
    return withSwappedSupervisors(true);
  }

  @Override
  public SupervisorSpec createRunningSpec()
  {
    return withSwappedSupervisors(false);
  }

  /**
   * Returns a new spec with the same id in which the alternate supervisor is the active one and
   * the currently active supervisor becomes the alternate.
   */
  private TestSupervisorSpec withSwappedSupervisors(boolean suspendedFlag)
  {
    return new TestSupervisorSpec(this.id, this.suspendedSupervisor, suspendedFlag, this.supervisor);
  }
}
}