blob: 8b8a89d3a050bb9e14a7cbfb8630fa8715d5b831 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVTS_ALL;
/**
* Continuous routine handler for remote event listening.
*/
class GridEventConsumeHandler implements GridContinuousHandler {
/** */
private static final long serialVersionUID = 0L;
/** Default callback. */
private static final IgniteBiPredicate<UUID, Event> DFLT_CALLBACK = new P2<UUID, Event>() {
@Override public boolean apply(UUID uuid, Event e) {
return true;
}
};
/** Local callback. */
private IgniteBiPredicate<UUID, Event> cb;
/** Filter. */
private IgnitePredicate<Event> filter;
/** Serialized filter. */
private byte[] filterBytes;
/** Deployment class name. */
private String clsName;
/** Deployment info. */
private GridDeploymentInfo depInfo;
/** Types. */
private int[] types;
/** Listener. */
private GridLocalEventListener lsnr;
/** P2P unmarshalling future. */
private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
/**
* Required by {@link Externalizable}.
*/
public GridEventConsumeHandler() {
// No-op.
}
/**
* @param cb Local callback.
* @param filter Filter.
* @param types Types.
*/
GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, Event> cb, @Nullable IgnitePredicate<Event> filter,
@Nullable int[] types) {
this.cb = cb == null ? DFLT_CALLBACK : cb;
this.filter = filter;
this.types = types;
}
/** {@inheritDoc} */
@Override public boolean isEvents() {
return true;
}
/** {@inheritDoc} */
@Override public boolean isMessaging() {
return false;
}
/** {@inheritDoc} */
@Override public boolean isQuery() {
return false;
}
/** {@inheritDoc} */
@Override public boolean keepBinary() {
return false;
}
/** {@inheritDoc} */
@Override public String cacheName() {
throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
// No-op.
}
/** {@inheritDoc} */
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
return Collections.emptyMap();
}
/**
* Performs remote filter initialization.
*
* @param filter Remote filter.
* @param ctx Kernal context.
* @throws IgniteCheckedException In case if initialization failed.
*/
private void initFilter(IgnitePredicate<Event> filter, GridKernalContext ctx) throws IgniteCheckedException {
if (filter != null)
ctx.resource().injectGeneric(filter);
if (filter instanceof PlatformEventFilterListener)
((PlatformEventFilterListener)filter).initialize(ctx);
}
/** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
assert ctx != null;
if (cb != null)
ctx.resource().injectGeneric(cb);
final boolean loc = nodeId.equals(ctx.localNodeId());
lsnr = new GridLocalEventListener() {
/** node ID, routine ID, event */
private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();
private boolean notificationInProgress;
@Override public void onEvent(Event evt) {
if (filterDropsEvent(evt))
return;
if (loc) {
if (!cb.apply(nodeId, evt))
ctx.continuous().stopRoutine(routineId);
}
else {
if (ctx.discovery().node(nodeId) == null)
return;
synchronized (notificationQueue) {
notificationQueue.add(new T3<>(nodeId, routineId, evt));
if (!notificationInProgress) {
ctx.pools().getSystemExecutorService().execute(new Runnable() {
@Override public void run() {
if (!ctx.continuous().lockStopping())
return;
try {
while (true) {
T3<UUID, UUID, Event> t3;
synchronized (notificationQueue) {
t3 = notificationQueue.poll();
if (t3 == null) {
notificationInProgress = false;
return;
}
}
try {
Event evt = t3.get3();
EventWrapper wrapper = new EventWrapper(evt);
if (evt instanceof CacheEvent) {
String cacheName = ((CacheEvent)evt).cacheName();
ClusterNode node = ctx.discovery().node(t3.get1());
if (node == null)
continue;
if (ctx.config().isPeerClassLoadingEnabled() &&
ctx.discovery().cacheNode(node, cacheName)) {
GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
if (cache != null && cache.context().deploymentEnabled()) {
wrapper.p2pMarshal(ctx.config().getMarshaller());
wrapper.cacheName = cacheName;
cache.context().deploy().prepare(wrapper);
}
}
}
ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
false, false);
}
catch (ClusterTopologyCheckedException ignored) {
// No-op.
}
catch (Throwable e) {
U.error(ctx.log(GridEventConsumeHandler.class),
"Failed to send event notification to node: " + nodeId, e);
}
}
}
finally {
ctx.continuous().unlockStopping();
}
}
});
notificationInProgress = true;
}
}
}
}
};
if (F.isEmpty(types))
types = EVTS_ALL;
p2pUnmarshalFut.listen((fut) -> {
if (fut.error() == null) {
try {
initFilter(filter, ctx);
}
catch (IgniteCheckedException e) {
throw F.wrap(e);
}
ctx.event().addLocalEventListener(lsnr, types);
}
});
return RegisterStatus.REGISTERED;
}
/**
* Returns {@code true} if there is a filter and this filter filters the given event out.
*
* @param evt event to check
* @return {@code true} if there is a filter and this filter filters the given event out
*/
private boolean filterDropsEvent(Event evt) {
try {
return filter != null && !filter.apply(evt);
}
catch (NoClassDefFoundError e) {
// Filter might be installed using P2P class loading, so let's be careful and avoid a NCDFE from getting
// to a Failure Handler.
P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
return true;
}
}
/** {@inheritDoc} */
@Override public void unregister(UUID routineId, GridKernalContext ctx) {
assert routineId != null;
assert ctx != null;
if (lsnr != null)
ctx.event().removeLocalEventListener(lsnr, types);
RuntimeException err = null;
try {
if (filter instanceof PlatformEventFilterListener)
((PlatformEventFilterListener)filter).onClose();
}
catch (RuntimeException ex) {
err = ex;
}
try {
if (cb instanceof PlatformEventFilterListener)
((PlatformEventFilterListener)cb).onClose();
}
catch (RuntimeException ex) {
if (err == null)
err = ex;
}
if (err != null)
throw err;
}
/**
* @param nodeId Node ID.
* @param objs Notification objects.
*/
@Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
assert nodeId != null;
assert routineId != null;
assert objs != null;
assert ctx != null;
for (Object obj : objs) {
assert obj instanceof EventWrapper;
EventWrapper wrapper = (EventWrapper)obj;
if (wrapper.bytes != null) {
assert ctx.config().isPeerClassLoadingEnabled();
GridCacheAdapter cache = ctx.cache().internalCache(wrapper.cacheName);
ClassLoader ldr = null;
try {
if (cache != null) {
GridCacheDeploymentManager depMgr = cache.context().deploy();
GridDeploymentInfo depInfo = wrapper.depInfo;
if (depInfo != null) {
depMgr.p2pContext(
nodeId,
depInfo.classLoaderId(),
depInfo.userVersion(),
depInfo.deployMode(),
depInfo.participants()
);
}
ldr = depMgr.globalLoader();
}
else {
U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
"when peer class loading is enabled: " + wrapper.cacheName + ". Will try to unmarshal " +
"with default class loader.");
}
wrapper.p2pUnmarshal(ctx.config().getMarshaller(), U.resolveClassLoader(ldr, ctx.config()));
}
catch (IgniteCheckedException e) {
U.error(ctx.log(getClass()), "Failed to unmarshal event.", e);
}
}
if (!cb.apply(nodeId, wrapper.evt)) {
ctx.continuous().stopRoutine(routineId);
break;
}
}
}
/** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
assert ctx.config().isPeerClassLoadingEnabled();
if (filter != null) {
Class cls = U.detectClass(filter);
clsName = cls.getName();
GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to deploy event filter: " + filter);
depInfo = new GridDeploymentInfoBean(dep);
filterBytes = U.marshal(ctx.config().getMarshaller(), filter);
}
}
/** {@inheritDoc} */
@Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
assert nodeId != null;
assert ctx != null;
assert ctx.config().isPeerClassLoadingEnabled();
if (filterBytes != null) {
try {
GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
((GridFutureAdapter)p2pUnmarshalFut).onDone();
}
catch (IgniteCheckedException e) {
((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
throw e;
}
catch (ExceptionInInitializerError e) {
((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
throw new IgniteCheckedException("Failed to unmarshal deployable object.", e);
}
}
}
/** {@inheritDoc} */
@Override public GridContinuousBatch createBatch() {
return new GridContinuousBatchAdapter();
}
/** {@inheritDoc} */
@Override public void onClientDisconnected() {
// No-op.
}
/** {@inheritDoc} */
@Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
// No-op.
}
/** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return null;
}
/** {@inheritDoc} */
@Override public GridContinuousHandler clone() {
try {
return (GridContinuousHandler)super.clone();
}
catch (CloneNotSupportedException e) {
throw new IllegalStateException(e);
}
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
boolean b = filterBytes != null;
out.writeBoolean(b);
if (b) {
U.writeByteArray(out, filterBytes);
U.writeString(out, clsName);
out.writeObject(depInfo);
}
else
out.writeObject(filter);
out.writeObject(types);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
boolean b = in.readBoolean();
if (b) {
p2pUnmarshalFut = new GridFutureAdapter<>();
filterBytes = U.readByteArray(in);
clsName = U.readString(in);
depInfo = (GridDeploymentInfo)in.readObject();
}
else
filter = (IgnitePredicate<Event>)in.readObject();
types = (int[])in.readObject();
}
/**
* Event wrapper.
*/
private static class EventWrapper implements GridCacheDeployable, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Event. */
private Event evt;
/** Serialized event. */
private byte[] bytes;
/** Cache name (for cache events only). */
private String cacheName;
/** Deployment info. */
private GridDeploymentInfo depInfo;
/**
* Required by {@link Externalizable}.
*/
public EventWrapper() {
// No-op.
}
/**
* @param evt Event.
*/
EventWrapper(Event evt) {
assert evt != null;
this.evt = evt;
}
/**
* @param marsh Marshaller.
* @throws IgniteCheckedException In case of error.
*/
void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
assert marsh != null;
bytes = U.marshal(marsh, evt);
}
/**
* @param marsh Marshaller.
* @param ldr Class loader.
* @throws IgniteCheckedException In case of error.
*/
void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
assert marsh != null;
assert evt == null;
assert bytes != null;
evt = U.unmarshal(marsh, bytes, ldr);
}
/** {@inheritDoc} */
@Override public void prepare(GridDeploymentInfo depInfo) {
assert evt instanceof CacheEvent;
this.depInfo = depInfo;
}
/** {@inheritDoc} */
@Override public GridDeploymentInfo deployInfo() {
return depInfo;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
boolean b = bytes != null;
out.writeBoolean(b);
if (b) {
U.writeByteArray(out, bytes);
U.writeString(out, cacheName);
out.writeObject(depInfo);
}
else
out.writeObject(evt);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
boolean b = in.readBoolean();
if (b) {
bytes = U.readByteArray(in);
cacheName = U.readString(in);
depInfo = (GridDeploymentInfo)in.readObject();
}
else
evt = (Event)in.readObject();
}
}
}