blob: 8ffb8cb81ab45e17e4be9bf796b1ac7bda0f2ac5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.query;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import javax.cache.*;
import javax.cache.event.*;
/**
* API for configuring continuous cache queries.
* <p>
* Continuous queries allow to register a remote filter and a local listener
* for cache updates. If an update event passes the filter, it will be sent to
* the node that executed the query and local listener will be notified.
* <p>
* Additionally, you can execute initial query to get currently existing data.
* Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)}
* method.
* <p>
* Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)}
* method, or only on the local node, if {@link Query#setLocal(boolean)} parameter is set to {@code true}.
* Note that in case query is distributed and a new node joins, it will get the remote
* filter for the query during discovery process before it actually joins topology,
* so no updates will be missed.
* <h1 class="header">Example</h1>
* As an example, suppose we have cache with {@code 'Person'} objects and we need
* to query all persons with salary above 1000.
* <p>
* Here is the {@code Person} class:
* <pre name="code" class="java">
* public class Person {
* // Name.
* private String name;
*
* // Salary.
* private double salary;
*
* ...
* }
* </pre>
* <p>
* You can create and execute continuous query like so:
* <pre name="code" class="java">
* // Create new continuous query.
* ContinuousQuery&lt;Long, Person&gt; qry = new ContinuousQuery&lt;&gt;();
*
* // Initial iteration query will return all persons with salary above 1000.
* qry.setInitialQuery(new ScanQuery&lt;&gt;((id, p) -> p.getSalary() &gt; 1000));
*
*
* // Callback that is called locally when update notifications are received.
* // It simply prints out information about all created persons.
* qry.setLocalListener((evts) -> {
* for (CacheEntryEvent&lt;? extends Long, ? extends Person&gt; e : evts) {
* Person p = e.getValue();
*
* System.out.println(p.getFirstName() + " " + p.getLastName() + "'s salary is " + p.getSalary());
* }
* });
*
* // Continuous listener will be notified for persons with salary above 1000.
* qry.setRemoteFilter(evt -> evt.getValue().getSalary() &gt; 1000);
*
* // Execute query and get cursor that iterates through initial data.
* QueryCursor&lt;Cache.Entry&lt;Long, Person&gt;&gt; cur = cache.query(qry);
* </pre>
* This will execute query on all nodes that have cache you are working with and
* listener will start to receive notifications for cache updates.
* <p>
* To stop receiving updates call {@link QueryCursor#close()} method:
* <pre name="code" class="java">
* cur.close();
* </pre>
* Note that this works even if you didn't provide initial query. Cursor will
* be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
* is called.
*/
public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private static final long serialVersionUID = 0L;
/**
* Default page size. Size of {@code 1} means that all entries
* will be sent to master node immediately (buffering is disabled).
*/
public static final int DFLT_PAGE_SIZE = 1;
/** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */
public static final long DFLT_TIME_INTERVAL = 0;
/**
* Default value for automatic unsubscription flag. Remote filters
* will be unregistered by default if master node leaves topology.
*/
public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
/** Initial query. */
private Query<Cache.Entry<K, V>> initQry;
/** Local listener. */
private CacheEntryUpdatedListener<K, V> locLsnr;
/** Remote filter. */
private CacheEntryEventSerializableFilter<K, V> rmtFilter;
/** Time interval. */
private long timeInterval = DFLT_TIME_INTERVAL;
/** Automatic unsubscription flag. */
private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
/**
* Creates new continuous query.
*/
public ContinuousQuery() {
setPageSize(DFLT_PAGE_SIZE);
}
/**
* Sets initial query.
* <p>
* This query will be executed before continuous listener is registered
* which allows to iterate through entries which already existed at the
* time continuous query is executed.
*
* @param initQry Initial query.
* @return {@code this} for chaining.
*/
public ContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> initQry) {
this.initQry = initQry;
return this;
}
/**
* Gets initial query.
*
* @return Initial query.
*/
public Query<Cache.Entry<K, V>> getInitialQuery() {
return initQry;
}
/**
* Sets local callback. This callback is called only in local node when new updates are received.
* <p>
* The callback predicate accepts ID of the node from where updates are received and collection
* of received entries. Note that for removed entries value will be {@code null}.
* <p>
* If the predicate returns {@code false}, query execution will be cancelled.
* <p>
* <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g.,
* synchronization or transactional cache operations), should be executed asynchronously without
* blocking the thread that called the callback. Otherwise, you can get deadlocks.
*
* @param locLsnr Local callback.
* @return {@code this} for chaining.
*/
public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
this.locLsnr = locLsnr;
return this;
}
/**
* Gets local listener.
*
* @return Local listener.
*/
public CacheEntryUpdatedListener<K, V> getLocalListener() {
return locLsnr;
}
/**
* Sets optional key-value filter. This filter is called before entry is sent to the master node.
* <p>
* <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
* (e.g., synchronization or transactional cache operations), should be executed asynchronously
* without blocking the thread that called the filter. Otherwise, you can get deadlocks.
*
* @param rmtFilter Key-value filter.
* @return {@code this} for chaining.
*/
public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
this.rmtFilter = rmtFilter;
return this;
}
/**
* Gets remote filter.
*
* @return Remote filter.
*/
public CacheEntryEventSerializableFilter<K, V> getRemoteFilter() {
return rmtFilter;
}
/**
* Sets time interval.
* <p>
* When a cache update happens, entry is first put into a buffer. Entries from buffer will
* be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)}
* method) or time provided via this method is exceeded.
* <p>
* Default time interval is {@code 0} which means that
* time check is disabled and entries will be sent only when buffer is full.
*
* @param timeInterval Time interval.
* @return {@code this} for chaining.
*/
public ContinuousQuery<K, V> setTimeInterval(long timeInterval) {
if (timeInterval < 0)
throw new IllegalArgumentException("Time interval can't be negative.");
this.timeInterval = timeInterval;
return this;
}
/**
* Gets time interval.
*
* @return Time interval.
*/
public long getTimeInterval() {
return timeInterval;
}
/**
* Sets automatic unsubscribe flag.
* <p>
* This flag indicates that query filters on remote nodes should be
* automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is
* {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be
* unregistered if master node leaves grid.
* <p>
* Default value for this flag is {@code true}.
*
* @param autoUnsubscribe Automatic unsubscription flag.
* @return {@code this} for chaining.
*/
public ContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) {
this.autoUnsubscribe = autoUnsubscribe;
return this;
}
/** {@inheritDoc} */
@Override public ContinuousQuery<K, V> setPageSize(int pageSize) {
return (ContinuousQuery<K, V>)super.setPageSize(pageSize);
}
/** {@inheritDoc} */
@Override public ContinuousQuery<K, V> setLocal(boolean loc) {
return (ContinuousQuery<K, V>)super.setLocal(loc);
}
/**
* Gets automatic unsubscription flag value.
*
* @return Automatic unsubscription flag.
*/
public boolean isAutoUnsubscribe() {
return autoUnsubscribe;
}
}