blob: 5410ef52eb7a4580880f506e1dbbeeeb141a9009 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.cache.query;
import java.io.ObjectStreamException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformJavaObjectFactoryProxy;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
/**
* Interop continuous query handle.
*/
public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
/** */
private static final long serialVersionUID = 0L;
/** Context. */
protected final PlatformContext platformCtx;
/** Whether filter exists. */
private final boolean hasFilter;
/** Native filter in serialized form. If null, then filter is either not set, or this is local query. */
protected final Object filter;
/** Java filter. */
protected final CacheEntryEventFilter javaFilter;
/** Pointer to native counterpart; zero if closed. */
private long ptr;
/** Cursor to handle filter close. */
private QueryCursor cursor;
/** Lock for concurrency control. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/** Wrapped initial qry cursor. */
private PlatformQueryCursor initialQryCur;
/**
* Constructor.
*
* @param platformCtx Context.
* @param ptr Pointer to native counterpart.
* @param hasFilter Whether filter exists.
* @param filter Filter.
*/
public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) {
assert ptr != 0L;
this.platformCtx = platformCtx;
this.ptr = ptr;
this.hasFilter = hasFilter;
this.filter = filter;
javaFilter = getJavaFilter(filter, platformCtx.kernalContext());
}
/**
* Gets the Java filter if present.
*
* @param filter Filter object.
* @param ctx Context.
* @return Java filter or null.
*/
private static CacheEntryEventFilter getJavaFilter(Object filter, GridKernalContext ctx) {
if (filter instanceof BinaryObjectImpl) {
BinaryObjectImpl bo = (BinaryObjectImpl)filter;
if (bo.typeId() == GridBinaryMarshaller.PLATFORM_JAVA_OBJECT_FACTORY_PROXY) {
PlatformJavaObjectFactoryProxy prx = bo.deserialize();
return (CacheEntryEventFilter)prx.factory(ctx).create();
}
}
return null;
}
/**
* Start query execution.
*
* @param cache Cache.
* @param loc Local flag.
* @param bufSize Buffer size.
* @param timeInterval Time interval.
* @param autoUnsubscribe Auto-unsubscribe flag.
* @param initialQry Initial query.
*/
@SuppressWarnings("unchecked")
@Override public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval,
boolean autoUnsubscribe, Query initialQry) throws IgniteCheckedException {
lock.writeLock().lock();
try {
try {
ContinuousQuery qry = new ContinuousQuery();
qry.setLocalListener(this);
qry.setRemoteFilter(this); // Filter must be set always for correct resource release.
qry.setPageSize(bufSize);
qry.setTimeInterval(timeInterval);
qry.setAutoUnsubscribe(autoUnsubscribe);
qry.setInitialQuery(initialQry);
cursor = cache.query(qry.setLocal(loc));
if (initialQry != null)
initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx<Cache.Entry>() {
@Override public Iterator<Cache.Entry> iterator() {
return cursor.iterator();
}
@Override public List<Cache.Entry> getAll() {
return cursor.getAll();
}
@Override public void close() {
// No-op: do not close whole continuous query when initial query cursor closes.
}
@Override public void getAll(Consumer<Cache.Entry> clo) throws IgniteCheckedException {
for (Cache.Entry t : this)
clo.consume(t);
}
@Override public List<GridQueryFieldMetadata> fieldsMeta() {
return null;
}
}, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
}
catch (Exception e) {
try
{
close0();
}
catch (Exception ignored)
{
// Ignore
}
throw PlatformUtils.unwrapQueryException(e);
}
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onUpdated(Iterable evts) throws CacheEntryListenerException {
lock.readLock().lock();
try {
if (ptr == 0)
throw new CacheEntryListenerException("Failed to notify listener because it has been closed.");
PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
if (javaFilter != null)
return javaFilter.evaluate(evt);
lock.readLock().lock();
try {
if (ptr == 0)
throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void onQueryUnregister() {
close();
}
/** {@inheritDoc} */
@Override public void close() {
lock.writeLock().lock();
try {
close0();
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@SuppressWarnings({"UnusedDeclaration", "unchecked"})
@Override public PlatformTarget getInitialQueryCursor() {
return initialQryCur;
}
/**
* Internal close routine.
*/
private void close0() {
if (ptr != 0) {
long ptr0 = ptr;
ptr = 0;
if (cursor != null)
cursor.close();
platformCtx.gateway().continuousQueryFilterRelease(ptr0);
}
}
/**
* Replacer for remote filter.
*
* @return Filter to be deployed on remote node.
* @throws ObjectStreamException If failed.
*/
Object writeReplace() throws ObjectStreamException {
if (javaFilter != null)
return javaFilter;
return filter == null ? null : platformCtx.createContinuousQueryFilter(filter);
}
}