blob: 32d1fb604f2ce66ac4d3f2c9e094a77008a6e682 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.ServicePluginContextBase;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
public class ErrorPluginConfiguration {
public static final String REPORT_FATAL_ERROR_MESSAGE = "ReportedFatalError";
public static final String REPORT_NONFATAL_ERROR_MESSAGE = "ReportedError";
public static final String THROW_ERROR_EXCEPTION_STRING = "Simulated Error";
private static final String CONF_THROW_ERROR = "throw.error";
private static final String CONF_REPORT_ERROR = "report.error";
private static final String CONF_REPORT_ERROR_FATAL = "report.error.fatal";
private static final String CONF_REPORT_ERROR_DAG_NAME = "report.error.dag.name";
private final HashMap<String, String> kv;
private ErrorPluginConfiguration() {
this.kv = new HashMap<>();
}
private ErrorPluginConfiguration(HashMap<String, String> map) {
this.kv = map;
}
public static ErrorPluginConfiguration createThrowErrorConf() {
ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
conf.kv.put(CONF_THROW_ERROR, String.valueOf(true));
return conf;
}
public static ErrorPluginConfiguration createReportFatalErrorConf(String dagName) {
ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
conf.kv.put(CONF_REPORT_ERROR, String.valueOf(true));
conf.kv.put(CONF_REPORT_ERROR_FATAL, String.valueOf(true));
conf.kv.put(CONF_REPORT_ERROR_DAG_NAME, dagName);
return conf;
}
public static ErrorPluginConfiguration createReportNonFatalErrorConf(String dagName) {
ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
conf.kv.put(CONF_REPORT_ERROR, String.valueOf(true));
conf.kv.put(CONF_REPORT_ERROR_FATAL, String.valueOf(false));
conf.kv.put(CONF_REPORT_ERROR_DAG_NAME, dagName);
return conf;
}
public static UserPayload toUserPayload(ErrorPluginConfiguration conf) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(conf.kv);
oos.close();
UserPayload userPayload = UserPayload.create(ByteBuffer.wrap(baos.toByteArray()));
return userPayload;
}
@SuppressWarnings("unchecked")
public static ErrorPluginConfiguration toErrorPluginConfiguration(UserPayload userPayload) throws
IOException, ClassNotFoundException {
byte[] b = new byte[userPayload.getPayload().remaining()];
userPayload.getPayload().get(b);
ByteArrayInputStream bais = new ByteArrayInputStream(b);
ObjectInputStream ois = new ObjectInputStream(bais);
HashMap<String, String> map = (HashMap) ois.readObject();
ErrorPluginConfiguration conf = new ErrorPluginConfiguration(map);
return conf;
}
public boolean shouldThrowError() {
return (kv.containsKey(CONF_THROW_ERROR) && Boolean.parseBoolean(kv.get(CONF_THROW_ERROR)));
}
public boolean shouldReportFatalError(String dagName) {
if (kv.containsKey(CONF_REPORT_ERROR) && Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR)) &&
Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR_FATAL))) {
if (dagName == null || dagName.isEmpty() || kv.get(CONF_REPORT_ERROR_DAG_NAME).equals("*") ||
kv.get(CONF_REPORT_ERROR_DAG_NAME).equals(dagName)) {
return true;
}
}
return false;
}
public boolean shouldReportNonFatalError(String dagName) {
if (kv.containsKey(CONF_REPORT_ERROR) && Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR)) &&
Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR_FATAL)) == false) {
if (dagName == null || dagName.isEmpty() || kv.get(CONF_REPORT_ERROR_DAG_NAME).equals("*") ||
kv.get(CONF_REPORT_ERROR_DAG_NAME).equals(dagName)) {
return true;
}
}
return false;
}
public static void processError(ErrorPluginConfiguration conf, ServicePluginContextBase context) {
if (conf.shouldThrowError()) {
throw new RuntimeException(ErrorPluginConfiguration.THROW_ERROR_EXCEPTION_STRING);
} else if (conf.shouldReportFatalError(null)) {
context.reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE,
ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE,
context.getCurrentDagInfo());
} else if (conf.shouldReportNonFatalError(null)) {
context.reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE,
context.getCurrentDagInfo());
}
}
}