blob: 8c1c4af60419c5f4cc38c35ec25ef8c0fe9a6cef [file] [log] [blame]
package org.apache.helix.integration.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.helix.HelixException;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.UserContentStore;
public class MockTask extends UserContentStore implements Task {
public static final String TASK_COMMAND = "Reindex";
public static final String JOB_DELAY = "Delay";
public static final String TASK_RESULT_STATUS = "TaskResultStatus";
public static final String THROW_EXCEPTION = "ThrowException";
public static final String ERROR_MESSAGE = "ErrorMessage";
public static final String FAILURE_COUNT_BEFORE_SUCCESS = "FailureCountBeforeSuccess";
public static final String SUCCESS_COUNT_BEFORE_FAIL = "SuccessCountBeforeFail";
public static final String NOT_ALLOW_TO_CANCEL = "NotAllowToCancel";
public static final String TARGET_PARTITION_CONFIG = "TargetPartitionConfig";
private long _delay;
private volatile boolean _notAllowToCancel;
private volatile boolean _canceled;
private TaskResult.Status _taskResultStatus;
private boolean _throwException;
private int _numOfFailBeforeSuccess;
private int _numOfSuccessBeforeFail;
private String _errorMsg;
public static boolean _signalFail;
public MockTask(TaskCallbackContext context) {
Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
if (cfg == null) {
cfg = new HashMap<>();
}
TaskConfig taskConfig = context.getTaskConfig();
Map<String, String> taskConfigMap = taskConfig.getConfigMap();
if (taskConfigMap != null) {
cfg.putAll(taskConfigMap);
}
_delay = cfg.containsKey(JOB_DELAY) ? Long.parseLong(cfg.get(JOB_DELAY)) : 100L;
_notAllowToCancel = cfg.containsKey(NOT_ALLOW_TO_CANCEL)
? Boolean.parseBoolean(cfg.get(NOT_ALLOW_TO_CANCEL))
: false;
_taskResultStatus = cfg.containsKey(TASK_RESULT_STATUS) ?
TaskResult.Status.valueOf(cfg.get(TASK_RESULT_STATUS)) :
TaskResult.Status.COMPLETED;
_throwException = cfg.containsKey(THROW_EXCEPTION) ?
Boolean.valueOf(cfg.containsKey(THROW_EXCEPTION)) :
false;
_numOfFailBeforeSuccess =
cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(
cfg.get(FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
_numOfSuccessBeforeFail = cfg.containsKey(SUCCESS_COUNT_BEFORE_FAIL) ? Integer
.parseInt(cfg.get(SUCCESS_COUNT_BEFORE_FAIL)) : Integer.MAX_VALUE;
_errorMsg = cfg.containsKey(ERROR_MESSAGE) ? cfg.get(ERROR_MESSAGE) : null;
setTargetPartitionsConfigs(cfg, taskConfig.getTargetPartition());
}
// Override configs if there's config for specific target partitions
private void setTargetPartitionsConfigs(Map<String, String> cfg, String targetPartition) {
if (cfg.containsKey(TARGET_PARTITION_CONFIG)) {
Map<String, Map<String, String>> targetPartitionConfigs =
deserializeTargetPartitionConfig(cfg.get(TARGET_PARTITION_CONFIG));
if (targetPartitionConfigs.containsKey(targetPartition)) {
Map<String, String> targetPartitionConfig = targetPartitionConfigs.get(targetPartition);
if (targetPartitionConfig.containsKey(JOB_DELAY)) {
_delay = Long.parseLong(targetPartitionConfig.get(JOB_DELAY));
}
if (targetPartitionConfig.containsKey(TASK_RESULT_STATUS)) {
_taskResultStatus = TaskResult.Status.valueOf(targetPartitionConfig.get(TASK_RESULT_STATUS));
}
}
}
}
public static String serializeTargetPartitionConfig(Map<String, Map<String, String>> config) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(config);
} catch (IOException e) {
throw new HelixException(e);
}
}
private static Map<String, Map<String, String>> deserializeTargetPartitionConfig(String configString) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(configString, Map.class);
} catch (IOException e) {
throw new HelixException(e);
}
}
@Override
public TaskResult run() {
long expiry = System.currentTimeMillis() + _delay;
long timeLeft;
while (System.currentTimeMillis() < expiry) {
if (_canceled && !_notAllowToCancel) {
timeLeft = expiry - System.currentTimeMillis();
return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
: timeLeft));
}
if (_signalFail) {
return new TaskResult(TaskResult.Status.FAILED, "Signaled to fail.");
}
sleep(10);
}
timeLeft = expiry - System.currentTimeMillis();
if (_throwException) {
_numOfFailBeforeSuccess--;
if (_errorMsg == null) {
_errorMsg = "Test failed";
}
throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
}
if (getUserContent(SUCCESS_COUNT_BEFORE_FAIL, Scope.WORKFLOW) != null) {
_numOfSuccessBeforeFail =
Integer.parseInt(getUserContent(SUCCESS_COUNT_BEFORE_FAIL, Scope.WORKFLOW));
}
putUserContent(SUCCESS_COUNT_BEFORE_FAIL, "" + --_numOfSuccessBeforeFail, Scope.WORKFLOW);
if (_numOfFailBeforeSuccess > 0 || _numOfSuccessBeforeFail < 0){
_numOfFailBeforeSuccess--;
throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
}
return new TaskResult(_taskResultStatus,
_errorMsg != null ? _errorMsg : String.valueOf(timeLeft < 0 ? 0 : timeLeft));
}
@Override
public void cancel() {
_canceled = true;
}
private static void sleep(long d) {
try {
Thread.sleep(d);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}