blob: 41d16c6651f063a30953f76f0174506d65c71380 [file] [log] [blame]
package org.apache.helix.integration.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.TestHelper;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.Workflow;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Integration test verifying that the task framework never schedules more concurrent tasks of the
 * same job on a single Participant than the job config allows (MaxConcurrentTaskPerInstance,
 * default 1), even while Participant connections are being broken and re-established.
 */
public class TestNoDoubleAssign extends TaskTestBase {
  private static final int THREAD_COUNT = 10;
  private static final long CONNECTION_DELAY = 100L;
  private static final long POLL_DELAY = 50L;
  private static final String TASK_DURATION = "200";
  private static final Random RANDOM = new Random();

  private ScheduledExecutorService _executorServicePoll;
  private ScheduledExecutorService _executorServiceConnection;
  // Set to true by the polling threads as soon as a double assignment is observed.
  private final AtomicBoolean _existsDoubleAssign = new AtomicBoolean(false);
  // Namespaced job names ("<workflow>_<job>") whose JobContexts are polled for double assignment.
  private final Set<String> _jobNames = new HashSet<>();

  @BeforeClass
  public void beforeClass() throws Exception {
    // No databases/partitions/replicas are needed: this test only exercises task scheduling.
    _numDbs = 0;
    _numPartitions = 0;
    _numReplicas = 0;
    super.beforeClass();
  }

  /**
   * Tests that no Participants have more tasks from the same job than is specified in the config.
   * (MaxConcurrentTaskPerInstance, default value = 1)
   * NOTE: this test is supposed to generate a lot of Participant-side ERROR message (ZkClient
   * already closed!) because we are disconnecting them on purpose.
   */
  @Test
  public void testNoDoubleAssign() throws InterruptedException {
    // Some arbitrary workload that creates a reasonably large amount of tasks
    int workload = 10;

    // Create a workflow with jobs and tasks
    String workflowName = TestHelper.getTestMethodName();
    Workflow.Builder builder = new Workflow.Builder(workflowName);
    for (int i = 0; i < workload; i++) {
      List<TaskConfig> taskConfigs = new ArrayList<>();
      for (int j = 0; j < workload; j++) {
        String taskID = "JOB_" + i + "_TASK_" + j;
        TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
        taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND)
            .addConfig(MockTask.JOB_DELAY, TASK_DURATION);
        taskConfigs.add(taskConfigBuilder.build());
      }
      String jobName = "JOB_" + i;
      _jobNames.add(workflowName + "_" + jobName); // Add the namespaced job name
      // Generous retry/failure thresholds so that tasks killed by the deliberate disconnects
      // below are retried until the workflow can complete.
      JobConfig.Builder jobBuilder =
          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(10000)
              .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true)
              .setFailureThreshold(100000)
              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, TASK_DURATION));
      builder.addJob(jobName, jobBuilder);
    }

    // Start the workflow
    _driver.start(builder.build());
    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
    breakConnection();
    pollForDoubleAssign();
    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);

    // Shut down thread pools
    _executorServicePoll.shutdown();
    _executorServiceConnection.shutdown();
    try {
      if (!_executorServicePoll.awaitTermination(180, TimeUnit.SECONDS)) {
        _executorServicePoll.shutdownNow();
      }
      if (!_executorServiceConnection.awaitTermination(180, TimeUnit.SECONDS)) {
        _executorServiceConnection.shutdownNow();
      }
    } catch (InterruptedException e) {
      _executorServicePoll.shutdownNow();
      _executorServiceConnection.shutdownNow();
      // Restore the interrupt status so callers up the stack can observe the interruption.
      Thread.currentThread().interrupt();
    }
    Assert.assertFalse(_existsDoubleAssign.get());
  }

  /**
   * Fetch the JobContext for all jobs in ZK and check that no two tasks are running on the same
   * Participant.
   */
  private void pollForDoubleAssign() {
    _executorServicePoll = Executors.newScheduledThreadPool(THREAD_COUNT);
    _executorServicePoll.scheduleAtFixedRate(() -> {
      if (!_existsDoubleAssign.get()) {
        // Get JobContexts and test that they are assigned to disparate Participants
        for (String job : _jobNames) {
          JobContext jobContext = _driver.getJobContext(job);
          if (jobContext == null) {
            // Context may not exist yet for jobs that have not started
            continue;
          }
          Set<String> instanceCache = new HashSet<>();
          for (int partition : jobContext.getPartitionSet()) {
            if (jobContext.getPartitionState(partition) == TaskPartitionState.RUNNING) {
              String assignedParticipant = jobContext.getAssignedParticipant(partition);
              // Set.add returns false if the element was already present, meaning two
              // tasks of this job are running on the same instance at the same time
              if (assignedParticipant != null && !instanceCache.add(assignedParticipant)) {
                _existsDoubleAssign.set(true);
                return;
              }
            }
          }
        }
      }
    }, 0L, POLL_DELAY, TimeUnit.MILLISECONDS);
  }

  /**
   * Randomly causes Participants to lose connection temporarily.
   */
  private void breakConnection() {
    _executorServiceConnection = Executors.newScheduledThreadPool(THREAD_COUNT);
    _executorServiceConnection.scheduleAtFixedRate(() -> {
      // Synchronized so that concurrent pool threads never stop/start the same
      // Participant at overlapping times
      synchronized (this) {
        int participantIndex = RANDOM.nextInt(_numNodes);
        stopParticipant(participantIndex);
        startParticipant(participantIndex);
      }
    }, 0L, CONNECTION_DELAY, TimeUnit.MILLISECONDS);
  }
}