blob: a7e5ddf49fd3eeabeb46d47fa37dbe5897fe1fea [file] [log] [blame]
package com.a.eye.skywalking.collector.actor;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.log.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The <code>AbstractClusterWorker</code> implementations represent workers,
* which receive remote messages.
* <p>
* Usually, the implementations are doing persistent, or aggregate works.
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractClusterWorker extends AbstractWorker {
/**
* Construct an <code>AbstractClusterWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
/**
* This method use for message producer to call for send message.
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception {
onWork(message);
}
/**
* This method use for message receiver to analyse message.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception;
static class WorkerWithAkka extends UntypedActor {
private Logger logger = LogManager.INSTANCE.getFormatterLogger(WorkerWithAkka.class);
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
public WorkerWithAkka(AbstractClusterWorker ownerWorker) {
this.ownerWorker = ownerWorker;
cluster = Cluster.get(getContext().system());
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
}
@Override
public void postStop() throws Exception {
cluster.unsubscribe(getSelf());
}
/**
* Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
* cluster event, when event send from the member of {@link WorkersListener} then tell
* the sender to register self.
*/
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState)message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp)message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
ownerWorker.allocateJob(message);
}
}
/**
* When member role is {@link WorkersListener#WORK_NAME} then Select actor from context
* and send register message to {@link WorkersListener}
*
* @param member is the new created or restart worker
*/
void register(Member member) {
if (member.hasRole(WorkersListener.WORK_NAME)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WORK_NAME).tell(registerMessage, getSelf());
}
}
}
}