blob: 830c5c441ea2fdef0d9147eb29fe651b9c995ede [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor.transfer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.nemo.runtime.common.comm.ControlMessage.ByteTransferDataDirection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Objects;
/**
* {@link ByteInputContext} and {@link ByteOutputContext}.
*/
public abstract class ByteTransferContext {
private static final Logger LOG = LoggerFactory.getLogger(ByteTransferContext.class);
private final String remoteExecutorId;
private final ContextId contextId;
private final byte[] contextDescriptor;
private final ChannelWriteFutureListener channelWriteFutureListener = new ChannelWriteFutureListener();
private final ContextManager contextManager;
private final AtomicReference<Throwable> exception = new AtomicReference<>();
/**
* Creates a transfer context.
*
* @param remoteExecutorId id of the remote executor
* @param contextId identifier for this context
* @param contextDescriptor user-provided context descriptor
* @param contextManager to de-register context when this context expires
*/
ByteTransferContext(final String remoteExecutorId,
final ContextId contextId,
final byte[] contextDescriptor,
final ContextManager contextManager) {
this.remoteExecutorId = remoteExecutorId;
this.contextId = contextId;
this.contextDescriptor = contextDescriptor;
this.contextManager = contextManager;
}
/**
* @return the remote executor id.
*/
public final String getRemoteExecutorId() {
return remoteExecutorId;
}
/**
* @return the identifier for this transfer context.
*/
public final ContextId getContextId() {
return contextId;
}
/**
* @return user-provided context descriptor.
*/
public final byte[] getContextDescriptor() {
return contextDescriptor;
}
/**
* @return Whether this context has exception or not.
*/
public final boolean hasException() {
return exception.get() != null;
}
/**
* @return The exception involved with this context, or {@code null}.
*/
public final Throwable getException() {
return exception.get();
}
@Override
public final String toString() {
return contextId.toString();
}
/**
* @return Listener for channel write.
*/
final ChannelFutureListener getChannelWriteListener() {
return channelWriteFutureListener;
}
/**
* Handles exception.
*
* @param cause the cause of exception handling
*/
public abstract void onChannelError(@Nullable Throwable cause);
/**
* Sets exception.
*
* @param cause the exception to set
*/
protected final void setChannelError(@Nullable final Throwable cause) {
if (hasException()) {
return;
}
LOG.error(String.format("A channel exception set on %s", toString())); // Not logging throwable, which isn't useful
exception.set(cause);
}
/**
* De-registers this context from {@link ContextManager}.
*/
protected final void deregister() {
contextManager.onContextExpired(this);
}
/**
* Globally unique identifier of transfer context.
*/
static final class ContextId {
private final String initiatorExecutorId;
private final String partnerExecutorId;
private final ByteTransferDataDirection dataDirection;
private final int transferIndex;
private final boolean isPipe;
/**
* Create {@link ContextId}.
*
* @param initiatorExecutorId id of the executor who initiated this context and issued context id
* @param partnerExecutorId the other executor
* @param dataDirection the direction of the data flow
* @param transferIndex an index issued by the initiator
* @param isPipe is a pipe context
*/
ContextId(final String initiatorExecutorId,
final String partnerExecutorId,
final ByteTransferDataDirection dataDirection,
final int transferIndex,
final boolean isPipe) {
this.initiatorExecutorId = initiatorExecutorId;
this.partnerExecutorId = partnerExecutorId;
this.dataDirection = dataDirection;
this.transferIndex = transferIndex;
this.isPipe = isPipe;
}
public String getInitiatorExecutorId() {
return initiatorExecutorId;
}
public String getPartnerExecutorId() {
return partnerExecutorId;
}
public boolean isPipe() {
return isPipe;
}
public ByteTransferDataDirection getDataDirection() {
return dataDirection;
}
public int getTransferIndex() {
return transferIndex;
}
@Override
public String toString() {
if (dataDirection == ByteTransferDataDirection.INITIATOR_SENDS_DATA) {
return String.format("%s--%d->%s", initiatorExecutorId, transferIndex, partnerExecutorId);
} else {
return String.format("%s<-%d--%s", initiatorExecutorId, transferIndex, partnerExecutorId);
}
}
@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
final ContextId contextId = (ContextId) other;
return transferIndex == contextId.transferIndex
&& Objects.equals(initiatorExecutorId, contextId.initiatorExecutorId)
&& Objects.equals(partnerExecutorId, contextId.partnerExecutorId)
&& dataDirection == contextId.dataDirection;
}
@Override
public int hashCode() {
return Objects.hash(initiatorExecutorId, partnerExecutorId, dataDirection, transferIndex);
}
}
/**
* Listener for channel write.
*/
private final class ChannelWriteFutureListener implements ChannelFutureListener {
@Override
public void operationComplete(final ChannelFuture future) {
if (future.isSuccess()) {
return;
}
onChannelError(future.cause());
}
}
}