/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.HadoopDruidIndexerJob;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.MetadataStorageUpdaterJobHandler;
import org.apache.druid.indexer.TaskMetricsGetter;
import org.apache.druid.indexer.TaskMetricsUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.util.ToolRunner;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
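/**
 * Task that runs Hadoop-based batch ingestion from within the indexing service. The work happens in two
 * phases (determine partitions/configuration, then build segments); each phase is executed by a runner class
 * loaded through a Hadoop-capable classloader and invoked via reflection, with results passed back as JSON.
 */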
public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
private static final Logger log = new Logger(HadoopIndexTask.class);
private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
private static final String TYPE = "index_hadoop";
private TaskConfig taskConfig = null;
private static String getTheDataSource(HadoopIngestionSpec spec)
{
return spec.getDataSchema().getDataSource();
}
@JsonIgnore
private HadoopIngestionSpec spec;
@JsonIgnore
private final String classpathPrefix;
@JsonIgnore
private final ObjectMapper jsonMapper;
@JsonIgnore
private final AuthorizerMapper authorizerMapper;
@JsonIgnore
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@JsonIgnore
private InnerProcessingStatsGetter determinePartitionsStatsGetter;
@JsonIgnore
private InnerProcessingStatsGetter buildSegmentsStatsGetter;
@JsonIgnore
private IngestionState ingestionState;
@JsonIgnore
private HadoopDetermineConfigInnerProcessingStatus determineConfigStatus = null;
@JsonIgnore
private HadoopIndexGeneratorInnerProcessingStatus buildSegmentsStatus = null;
@JsonIgnore
private String errorMsg;
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
* <p/>
* Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
* job does not push a list of published segments to the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database.
*/
@JsonCreator
public HadoopIndexTask(
@JsonProperty("id") String id,
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix,
@JacksonInject ObjectMapper jsonMapper,
@JsonProperty("context") Map<String, Object> context,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider
)
{
super(
getOrMakeId(id, TYPE, getTheDataSource(spec)),
getTheDataSource(spec),
hadoopDependencyCoordinates == null
? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
: hadoopDependencyCoordinates,
context
);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.spec = spec;
// Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(
this.spec.getIOConfig().getSegmentOutputPath() == null,
"segmentOutputPath must be absent"
);
Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(
this.spec.getIOConfig().getMetadataUpdateSpec() == null,
"metadataUpdateSpec must be absent"
);
this.classpathPrefix = classpathPrefix;
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper");
this.ingestionState = IngestionState.NOT_STARTED;
}
@Override
public String getType()
{
return "index_hadoop";
}
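// If the granularity spec has explicit bucket intervals, try to lock their umbrella interval up front;
// otherwise the intervals are determined at runtime and the lock is acquired in runInternal() instead.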
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
Optional<SortedSet<Interval>> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals();
if (intervals.isPresent()) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
intervals.get()
)
);
return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null;
} else {
return true;
}
}
@Override
public boolean requireLockExistingSegments()
{
throw new UnsupportedOperationException();
}
@Override
public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
{
throw new UnsupportedOperationException();
}
@Override
public boolean isPerfectRollup()
{
return true;
}
@Nullable
@Override
public Granularity getSegmentGranularity()
{
final GranularitySpec granularitySpec = spec.getDataSchema().getGranularitySpec();
if (granularitySpec instanceof ArbitraryGranularitySpec) {
return null;
} else {
return granularitySpec.getSegmentGranularity();
}
}
@JsonProperty("spec")
public HadoopIngestionSpec getSpec()
{
return spec;
}
@Override
@JsonProperty
public List<String> getHadoopDependencyCoordinates()
{
return super.getHadoopDependencyCoordinates();
}
@JsonProperty
@Override
public String getClasspathPrefix()
{
return classpathPrefix;
}
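// The inner runners record the Hadoop MapReduce job id in this file inside the task directory so that
// killHadoopJob() can find and kill the job if the task exits abnormally.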
private String getHadoopJobIdFileName()
{
return getHadoopJobIdFile().getAbsolutePath();
}
private boolean hadoopJobIdFileExists()
{
return getHadoopJobIdFile().exists();
}
private File getHadoopJobIdFile()
{
return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME);
}
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
try {
taskConfig = toolbox.getConfig();
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this, false);
} else {
log.warn("No chat handler detected");
}
return runInternal(toolbox);
}
catch (Exception e) {
Throwable effectiveException;
if (e instanceof RuntimeException && e.getCause() instanceof InvocationTargetException) {
InvocationTargetException ite = (InvocationTargetException) e.getCause();
effectiveException = ite.getCause();
log.error(effectiveException, "Got invocation target exception in run()");
} else {
effectiveException = e;
log.error(e, "Encountered exception in run()");
}
errorMsg = Throwables.getStackTraceAsString(effectiveException);
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.failure(
getId(),
errorMsg
);
}
finally {
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(getId());
}
}
}
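// Phase 1 (determine partitions) and phase 2 (build segments) are both executed by loading a runner class
// in the Hadoop-capable classloader and calling its runTask(String[]) method reflectively; results come
// back as JSON strings and are deserialized into the *InnerProcessingStatus classes below.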
@SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec,
jsonMapper,
new OverlordActionBasedUsedSegmentsRetriever(toolbox)
);
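// Phase 1: determine partitions/configuration. The runner is instantiated in the Hadoop classloader,
// so it is only reachable from this class through reflection.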
Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
loader
);
determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
String[] determinePartitionsInput = new String[]{
toolbox.getJsonMapper().writeValueAsString(spec),
toolbox.getConfig().getHadoopWorkingPath(),
toolbox.getSegmentPusher().getPathForHadoop(),
hadoopJobIdFile
};
HadoopIngestionSpec indexerSchema;
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
"runTask",
determinePartitionsInput.getClass()
);
try {
Thread.currentThread().setContextClassLoader(loader);
ingestionState = IngestionState.DETERMINE_PARTITIONS;
final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
determinePartitionsInnerProcessingRunner,
new Object[]{determinePartitionsInput}
);
determineConfigStatus = toolbox
.getJsonMapper()
.readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);
indexerSchema = determineConfigStatus.getSchema();
if (indexerSchema == null) {
errorMsg = determineConfigStatus.getErrorMsg();
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.failure(
getId(),
errorMsg
);
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
// We should have a lock from before we started running only if the interval was explicitly specified;
// otherwise, acquire one now for the determined interval.
String version;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
),
"Cannot acquire a lock for interval[%s]", interval
);
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
final TaskLock myLock = Iterables.getOnlyElement(locks);
version = myLock.getVersion();
}
final String specVersion = indexerSchema.getTuningConfig().getVersion();
if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
if (specVersion.compareTo(version) < 0) {
version = specVersion;
} else {
log.error(
"Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
specVersion,
version
);
toolbox.getTaskReportFileWriter().write(getId(), null);
return TaskStatus.failure(getId());
}
}
log.info("Setting version to: %s", version);
Object innerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
loader
);
buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);
String[] buildSegmentsInput = new String[]{
toolbox.getJsonMapper().writeValueAsString(indexerSchema),
version,
hadoopJobIdFile
};
Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
try {
Thread.currentThread().setContextClassLoader(loader);
ingestionState = IngestionState.BUILD_SEGMENTS;
final String jobStatusString = (String) innerProcessingRunTask.invoke(
innerProcessingRunner,
new Object[]{buildSegmentsInput}
);
buildSegmentsStatus = toolbox.getJsonMapper().readValue(
jobStatusString,
HadoopIndexGeneratorInnerProcessingStatus.class
);
if (buildSegmentsStatus.getDataSegments() != null) {
ingestionState = IngestionState.COMPLETED;
toolbox.publishSegments(buildSegmentsStatus.getDataSegments());
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.success(getId());
} else {
errorMsg = buildSegmentsStatus.getErrorMsg();
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.failure(
getId(),
errorMsg
);
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
}
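// Invoked via registerResourceCloserOnAbnormalExit(); kills the underlying MapReduce job (identified by the
// job id file written by the inner runners) unless ingestion has already completed.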
private void killHadoopJob()
{
// Avoid issuing a kill once the ingestion task has actually completed
if (hadoopJobIdFileExists() && !ingestionState.equals(IngestionState.COMPLETED)) {
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
String hadoopJobIdFile = getHadoopJobIdFileName();
try {
ClassLoader loader = HadoopTask.buildClassLoader(
getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates()
);
Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
loader
);
String[] buildKillJobInput = new String[]{hadoopJobIdFile};
Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
Thread.currentThread().setContextClassLoader(loader);
final String[] killStatusString = (String[]) innerProcessingRunTask.invoke(
killMRJobInnerProcessingRunner,
new Object[]{buildKillJobInput}
);
log.info("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1]);
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
}
}
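/**
 * Chat handler endpoint exposing live row stats for the determine-partitions and build-segments phases.
 */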
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req,
@QueryParam("windows") List<Integer> windows
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
if (determinePartitionsStatsGetter != null) {
totalsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsStatsGetter.getTotalMetrics());
}
if (buildSegmentsStatsGetter != null) {
totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsStatsGetter.getTotalMetrics());
}
returnMap.put("totals", totalsMap);
return Response.ok(returnMap).build();
}
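// Builds the ingestion stats/errors report that is written alongside the final task status.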
private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
new IngestionStatsAndErrorsTaskReportData(
ingestionState,
null,
getTaskCompletionRowStats(),
errorMsg
)
)
);
}
private Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = new HashMap<>();
if (determineConfigStatus != null) {
metrics.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
determineConfigStatus.getMetrics()
);
}
if (buildSegmentsStatus != null) {
metrics.put(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsStatus.getMetrics()
);
}
return metrics;
}
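/**
 * Reads row-stat totals from a runner that lives in the Hadoop classloader by calling its getStats()
 * method reflectively, since the runner's class is not visible to this classloader.
 */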
public static class InnerProcessingStatsGetter implements TaskMetricsGetter
{
static final List<String> KEYS = ImmutableList.of(
TaskMetricsUtils.ROWS_PROCESSED,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS,
TaskMetricsUtils.ROWS_THROWN_AWAY,
TaskMetricsUtils.ROWS_UNPARSEABLE
);
private final Method getStatsMethod;
private final Object innerProcessingRunner;
public InnerProcessingStatsGetter(
Object innerProcessingRunner
)
{
try {
Class<?> aClazz = innerProcessingRunner.getClass();
this.getStatsMethod = aClazz.getMethod("getStats");
this.innerProcessingRunner = innerProcessingRunner;
}
catch (NoSuchMethodException nsme) {
throw new RuntimeException(nsme);
}
}
@Override
public List<String> getKeys()
{
return KEYS;
}
@Nullable
@Override
public Map<String, Number> getTotalMetrics()
{
try {
Map<String, Object> statsMap = (Map<String, Object>) getStatsMethod.invoke(innerProcessingRunner);
if (statsMap == null) {
return null;
}
long curProcessed = (Long) statsMap.get(TaskMetricsUtils.ROWS_PROCESSED);
long curProcessedWithErrors = (Long) statsMap.get(TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS);
long curThrownAway = (Long) statsMap.get(TaskMetricsUtils.ROWS_THROWN_AWAY);
long curUnparseable = (Long) statsMap.get(TaskMetricsUtils.ROWS_UNPARSEABLE);
return ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, curProcessed,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, curProcessedWithErrors,
TaskMetricsUtils.ROWS_THROWN_AWAY, curThrownAway,
TaskMetricsUtils.ROWS_UNPARSEABLE, curUnparseable
);
}
catch (Exception e) {
log.error(e, "Got exception from getTotalMetrics()");
return null;
}
}
}
/** Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. */
@SuppressWarnings("unused")
public static class HadoopDetermineConfigInnerProcessingRunner
{
private HadoopDruidDetermineConfigurationJob job;
public String runTask(String[] args) throws Exception
{
final String schema = args[0];
final String workingPath = args[1];
final String segmentOutputPath = args[2];
final String hadoopJobIdFile = args[3];
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
.readValue(
schema,
HadoopIngestionSpec.class
);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(
theSchema
.withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath))
.withTuningConfig(theSchema.getTuningConfig().withWorkingPath(workingPath))
);
job = new HadoopDruidDetermineConfigurationJob(config);
job.setHadoopJobIdFile(hadoopJobIdFile);
log.info("Starting a hadoop determine configuration job...");
if (job.run()) {
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopDetermineConfigInnerProcessingStatus(config.getSchema(), job.getStats(), null)
);
} else {
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopDetermineConfigInnerProcessingStatus(null, job.getStats(), job.getErrorMessage())
);
}
}
public Map<String, Object> getStats()
{
if (job == null) {
return null;
}
return job.getStats();
}
}
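/**
 * Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. Runs the index generator job and reports
 * the published segments (or the error message) back as a JSON-serialized
 * {@link HadoopIndexGeneratorInnerProcessingStatus}.
 */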
@SuppressWarnings("unused")
public static class HadoopIndexGeneratorInnerProcessingRunner
{
private HadoopDruidIndexerJob job;
public String runTask(String[] args) throws Exception
{
final String schema = args[0];
String version = args[1];
final String hadoopJobIdFile = args[2];
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
.readValue(
schema,
HadoopIngestionSpec.class
);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(
theSchema
.withTuningConfig(theSchema.getTuningConfig().withVersion(version))
);
// MetadataStorageUpdaterJobHandler is only needed when running standalone, without the indexing service.
// In that case, whatever runs the Hadoop Index Task must ensure that MetadataStorageUpdaterJobHandler
// can be injected based on the configuration given in config.getSchema().getIOConfig().getMetadataUpdateSpec().
final MetadataStorageUpdaterJobHandler maybeHandler;
if (config.isUpdaterJobSpecSet()) {
maybeHandler = INJECTOR.getInstance(MetadataStorageUpdaterJobHandler.class);
} else {
maybeHandler = null;
}
job = new HadoopDruidIndexerJob(config, maybeHandler);
job.setHadoopJobIdFile(hadoopJobIdFile);
log.info("Starting a hadoop index generator job...");
try {
if (job.run()) {
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopIndexGeneratorInnerProcessingStatus(
job.getPublishedSegments(),
job.getStats(),
null
)
);
} else {
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopIndexGeneratorInnerProcessingStatus(
null,
job.getStats(),
job.getErrorMessage()
)
);
}
}
catch (Exception e) {
log.error(e, "Encountered exception in HadoopIndexGeneratorInnerProcessing.");
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopIndexGeneratorInnerProcessingStatus(
null,
job.getStats(),
e.getMessage()
)
);
}
}
public Map<String, Object> getStats()
{
if (job == null) {
return null;
}
return job.getStats();
}
}
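/**
 * Called indirectly from killHadoopJob(). Reads the Hadoop job id recorded by the other runners and issues
 * a "-kill" through JobClient/ToolRunner, returning the job id and a Success/Fail status.
 */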
@SuppressWarnings("unused")
public static class HadoopKillMRJobIdProcessingRunner
{
public String[] runTask(String[] args) throws Exception
{
File hadoopJobIdFile = new File(args[0]);
String jobId = null;
try {
if (hadoopJobIdFile.exists()) {
jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
}
}
catch (Exception e) {
log.warn(e, "exeption while reading hadoop job id from: [%s]", hadoopJobIdFile);
}
if (jobId != null) {
int res = ToolRunner.run(new JobClient(), new String[]{
"-kill",
jobId
});
return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
}
return new String[] {jobId, "Fail"};
}
}
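/**
 * JSON-serializable result of the build-segments phase: the published segments on success, or an error
 * message on failure, plus the job's row-count metrics.
 */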
public static class HadoopIndexGeneratorInnerProcessingStatus
{
private final List<DataSegment> dataSegments;
private final Map<String, Object> metrics;
private final String errorMsg;
@JsonCreator
public HadoopIndexGeneratorInnerProcessingStatus(
@JsonProperty("dataSegments") List<DataSegment> dataSegments,
@JsonProperty("metrics") Map<String, Object> metrics,
@JsonProperty("errorMsg") String errorMsg
)
{
this.dataSegments = dataSegments;
this.metrics = metrics;
this.errorMsg = errorMsg;
}
@JsonProperty
public List<DataSegment> getDataSegments()
{
return dataSegments;
}
@JsonProperty
public Map<String, Object> getMetrics()
{
return metrics;
}
@JsonProperty
public String getErrorMsg()
{
return errorMsg;
}
}
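/**
 * JSON-serializable result of the determine-config phase: the resolved ingestion schema on success, or an
 * error message on failure, plus the job's row-count metrics.
 */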
public static class HadoopDetermineConfigInnerProcessingStatus
{
private final HadoopIngestionSpec schema;
private final Map<String, Object> metrics;
private final String errorMsg;
@JsonCreator
public HadoopDetermineConfigInnerProcessingStatus(
@JsonProperty("schema") HadoopIngestionSpec schema,
@JsonProperty("metrics") Map<String, Object> metrics,
@JsonProperty("errorMsg") String errorMsg
)
{
this.schema = schema;
this.metrics = metrics;
this.errorMsg = errorMsg;
}
@JsonProperty
public HadoopIngestionSpec getSchema()
{
return schema;
}
@JsonProperty
public Map<String, Object> getMetrics()
{
return metrics;
}
@JsonProperty
public String getErrorMsg()
{
return errorMsg;
}
}
}