blob: b35dbb0162280914bad87b6600527caccf6ffb09 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.task;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.internals.api.TezTrapEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Handles the events that arrive after the trap has been activated. At
 * that point data movement events should no longer be produced, and
 * receiving one here is treated as a bug: such events are logged as errors
 * and dropped. Every other event type is still forwarded through the
 * umbilical. If events arrive here, the task will probably be restarted
 * because it has failed.
 */
public class TezTrapEventHandler implements EventHandler<TezTrapEvent> {
  /**
   * Logger, registered under this class so that trap messages are
   * attributed to the handler that emits them.
   */
  private static final Logger
      LOG = LoggerFactory.getLogger(TezTrapEventHandler.class);
  /**
   * Output context that will report the events. It is stored at
   * construction time; nothing in this class reads it afterwards.
   */
  private final OutputContext outputContext;
  /**
   * Protocol to send the events.
   */
  private final TezUmbilical tezUmbilical;

  /**
   * @param output context that will report the events.
   * @param umbilical used to send the events to the AM.
   */
  TezTrapEventHandler(final OutputContext output,
      final TezUmbilical umbilical) {
    this.outputContext = output;
    this.tezUmbilical = umbilical;
  }

  /**
   * Decide what to do with the events. Data movement events (plain and
   * composite) are logged as errors and dropped; all other events are
   * collected, in their original order, and handed to the umbilical in a
   * single call.
   *
   * @param tezTrapEvent event holding the tez events.
   * @throws IllegalArgumentException if the event carries a null event list.
   */
  @Override
  public final void handle(final TezTrapEvent tezTrapEvent) {
    Preconditions.checkArgument(tezTrapEvent.getTezEvents() != null);
    // Presized for the case where every incoming event is forwarded.
    List<TezEvent> tezEvents = new ArrayList<TezEvent>(
        tezTrapEvent.getTezEvents().size());
    for (TezEvent tezEvent : tezTrapEvent.getTezEvents()) {
      switch (tezEvent.getEventType()) {
      case COMPOSITE_DATA_MOVEMENT_EVENT:
      case DATA_MOVEMENT_EVENT:
        String errorMsg = "Some events won't be sent to the AM because all"
            + " the events should have been sent at this point. Most likely"
            + " this would result in a bug. "
            + " event:" + tezEvent.toString();
        // Never thrown: created only so the log records the stack trace
        // of whoever emitted the late event.
        Throwable throwable = new Throwable(errorMsg);
        LOG.error(errorMsg, throwable);
        break;
      default:
        LOG.info("Event of type {} will be sent"
            + " to the AM after the task was closed ",
            tezEvent.getEventType());
        tezEvents.add(tezEvent);
      }
    }
    tezUmbilical.addEvents(tezEvents);
  }
}