blob: 9679029c02fc4349f8a15a7195ccda715ce1ac58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.messaging;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
* Convenience actor-base adapter for {@link IgniteMessaging#localListen(Object, IgniteBiPredicate)}
* method.
*/
public abstract class MessagingListenActor<T> implements IgniteBiPredicate<UUID, T> {
/** */
private static final long serialVersionUID = 0L;
/** */
private boolean keepGoing = true;
/** */
private UUID nodeId;
/** */
@IgniteInstanceResource
private transient Ignite ignite;
/** */
@LoggerResource
private transient IgniteLogger log;
/**
* Gets grid instance associated with this actor.
*
* @return Grid instance associated with this actor.
*/
protected final Ignite ignite() {
assert ignite != null;
return ignite;
}
/** {@inheritDoc} */
@Override public final boolean apply(UUID nodeId, T rcvMsg) {
assert nodeId != null;
assert rcvMsg != null;
if (!keepGoing)
return false;
this.nodeId = nodeId;
try {
receive(nodeId, rcvMsg);
}
catch (Throwable e) {
onError(e);
if (e instanceof Error)
throw (Error)e;
}
return keepGoing;
}
/**
* This method is called in case when method {@link #receive(UUID, Object)} threw an exception.
* Insides of this method the implementation should call any of the {@code respond}, {@code stop}
* or {@code skip} methods. If overriding method does nothing - than return value of method
* {@link #receive(UUID, Object)} is undefined.
* <p>
* Default implementation simply calls method {@link #stop()}.
*
* @param e Exception thrown from method {@link #receive(UUID, Object)}.
*/
protected void onError(Throwable e) {
U.error(log, "Listener operation failed.", e);
stop();
}
/**
* This method receives the message. This is the only method that subclass needs to override.
* Insides of this method the implementation should call any of the {@code respond}, {@code stop}
* or {@code skip} methods. Note that if none of these methods are called - listener will continue
* listen for the new messages.
*
* @param nodeId ID of the sender node.
* @param rcvMsg Received message.
* @throws Throwable Thrown in case of any errors. Method {@link #onError(Throwable)}} will
* be called right before returning from this method.
*/
protected abstract void receive(UUID nodeId, T rcvMsg) throws Throwable;
/**
* This method instructs underlying implementation to stop receiving new messages and unregister
* the message listener.
* <p>
* Note that subclasses can call any of {@code respond}, {@code stop} or {@code skip} methods any
* number of times. Only the last executed method will determine whether or not the implementation will
* continue listen for the new messages.
*/
protected final void stop() {
keepGoing = false;
}
/**
* This method sends the response message to the original sender node and instructs underlying
* implementation to stop receiving new messages and unregister the message listener.
* <p>
* Note that subclasses can call any of {@code respond}, {@code stop} or {@code skip} methods any
* number of times. Only the last executed method will determine whether or not the implementation will
* continue listen for the new messages.
*
* @param respMsg Optional response message. If not {@code null} - it will be sent to the original
* sender node.
* @throws IgniteException Thrown in case of any errors.
*/
protected final void stop(@Nullable Object respMsg) throws IgniteException {
keepGoing = false;
send(nodeId, respMsg);
}
/**
* Skips current message and continues to listen for new message. This method simply calls
* {@code respond(null)}.
* <p>
* Note that subclasses can call any of {@code respond}, {@code stop} or {@code skip} methods any
* number of times. Only the last executed method will determine whether or not the implementation will
* continue listen for the new messages.
*/
protected final void skip() {
checkReversing();
keepGoing = true;
}
/**
* Responds to the original sender node with given message and continues to listen for the new messages.
* <p>
* Note that subclasses can call any of {@code respond}, {@code stop} or {@code skip} methods any
* number of times. Only the last executed method will determine whether or not the implementation will
* continue listen for the new messages.
*
* @param respMsg Optional response message. If not {@code null} - it will be sent to the original
* sender node.
* @throws IgniteException Thrown in case of any errors.
*/
protected final void respond(@Nullable Object respMsg) throws IgniteException {
checkReversing();
keepGoing = true;
send(nodeId, respMsg);
}
/**
* Responds to the provided node with given message and continues to listen for the new messages.
* <p>
* Note that subclasses can call any of {@code respond}, {@code stop} or {@code skip} methods any
* number of times. Only the last executed method will determine whether or not the implementation will
* continue listen for the new messages.
*
* @param id ID of the node to send the message to, if any.
* @param respMsg Optional response message. If not {@code null} - it will be sent to the original
* sender node.
* @throws IgniteException Thrown in case of any errors.
*/
protected final void respond(UUID id, @Nullable Object respMsg) throws IgniteException {
checkReversing();
keepGoing = true;
send(id, respMsg);
}
/**
* Checks reversing.
*/
private void checkReversing() {
if (!keepGoing)
U.warn(log, "Suspect logic - reversing listener return status (was 'true', then 'false', " +
"now 'true' again).");
}
/**
* Sends optional message. If message is {@code null} - it's no-op.
*
* @param nodeId ID of the node to send message to.
* @param respMsg Message to send.
* @throws IgniteException Thrown in case of any errors.
*/
private void send(UUID nodeId, @Nullable Object respMsg) throws IgniteException {
assert nodeId != null;
if (respMsg != null) {
ClusterNode node = ignite.cluster().node(nodeId);
if (node != null)
ignite.message(ignite.cluster().forNode(node)).send(null, respMsg); // Can still fail.
else
throw new IgniteException("Failed to send message since destination node has " +
"left topology (ignoring) [nodeId=" + nodeId + ", respMsg=" + respMsg + ']');
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MessagingListenActor.class, this);
}
}