/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite;
import java.util.Collection;
import java.util.Map;
import javax.cache.CacheException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
/**
* Data streamer is responsible for streaming external data into cache. It achieves it by
* properly buffering updates and properly mapping keys to nodes responsible for the data
* to make sure that there is the least amount of data movement possible and optimal
* network and memory utilization.
* <p>
* Note that data streamer data manipulation methods do not support transactions.
* When updating data with {@link #allowOverwrite()} set to {@code false} new entry
* is created on primary and backups if it has not existed. If {@link #allowOverwrite()}
* is {@code true} then batches are applied with regular {@code cache.put(..)} methods
* starting implicit transactions if streamer is targeted to a transactional cache.
* <p>
* However, explicit transactional updates inside are possible with custom {@link StreamReceiver}.
* This way batches can be applied within transaction(s) on target node.
* See {@link #receiver(StreamReceiver)} for details.
* <p>
* Note that streamer will stream data concurrently by multiple internal threads, so the
* data may get to remote nodes in different order from which it was added to
* the streamer.
* <p>
* Also note that {@code IgniteDataStreamer} is not the only way to add data into cache.
* Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
* method to add data from underlying data store. You can also use standard
* cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
* likely will not perform as well as this class for adding data. And finally,
* data can be added from underlying data store on demand, whenever it is accessed -
* for this no explicit data adding step is needed.
* <p>
* {@code IgniteDataStreamer} supports the following configuration properties:
* <ul>
* <li>
* {@link #perNodeBufferSize(int)} - when entries are added to data streamer via
* {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
* away and are buffered internally for better performance and network utilization.
* This setting controls the size of internal per-node buffer before buffered data
* is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
* value.
* </li>
* <li>
* {@link #perNodeParallelOperations(int)} - sometimes data may be added
* to the data streamer via {@link #addData(Object, Object)} method faster than it can
* be put in cache. In this case, new buffered stream messages are sent to remote nodes
* before responses from previous ones are received. This could cause unlimited heap
* memory utilization growth on local and remote nodes. To control memory utilization,
* this setting limits maximum allowed number of parallel buffered stream messages that
* are being processed on remote nodes. If this number is exceeded, then
* {@link #addData(Object, Object)} method will block to control memory utilization.
* Default is equal to the data streamer thread pool size on remote node multiplied by
* {@link #DFLT_PARALLEL_OPS_MULTIPLIER}.
* </li>
* <li>
* {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
* this is the time after which the streamer will make an attempt to submit all data
* added so far to remote nodes. Note that there is no guarantee that data will be
* delivered after this concrete attempt (e.g., it can fail when topology is
* changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
* </li>
* <li>
* {@link #allowOverwrite(boolean)} - Sets flag enabling overwriting existing values in cache.
* Data streamer will perform better if this flag is disabled, which is the default setting.
* </li>
* <li>
* {@link #receiver(StreamReceiver)} - defines how cache will be updated with added entries.
* It allows providing user-defined custom logic to update the cache in the most effective and flexible way.
* </li>
* <li>
* {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
* streamed by a data streamer must be class-loadable from the same class-loader.
* Ignite will make the best effort to detect the most suitable class-loader
* for data loading. However, in complex cases, where compound or deeply nested
* class-loaders are used, it is best to specify a deploy class which can be any
* class loaded by the class-loader for given data.
* </li>
* </ul>
*/
public interface IgniteDataStreamer<K, V> extends AutoCloseable {
/**
* Default max concurrent put operations count.
* @deprecated Is not used anymore.
*/
@Deprecated
public static final int DFLT_MAX_PARALLEL_OPS = 16;
/**
* Default multiplier for data streamer pool size to get concurrent batches count for each remote node.
*
* @see IgniteConfiguration#getDataStreamerThreadPoolSize()
* @see #perNodeParallelOperations()
*/
public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 8;
/** Default operations batch size to send to remote node for loading. */
public static final int DFLT_PER_NODE_BUFFER_SIZE = 512;
/** Default batch size per thread to send to buffer on node. */
public static final int DFLT_PER_THREAD_BUFFER_SIZE = 4096;
/**
* Default timeout for streamer's operations ({@code -1}, i.e. the timeout is disabled).
*
* @see #timeout(long)
*/
public static final long DFLT_UNLIMIT_TIMEOUT = -1;
/**
* Name of cache to stream data to.
*
* @return Cache name or {@code null} for default cache.
*/
public String cacheName();
/**
* Gets flag enabling overwriting existing values in cache.
* Data streamer will perform better if this flag is disabled.
* <p>
* This flag is disabled by default (default is {@code false}).
*
* @return {@code True} if overwriting is allowed, {@code false} otherwise.
*/
public boolean allowOverwrite();
/**
* Sets flag enabling overwriting existing values in cache.
* Data streamer will perform better if this flag is disabled.
* Note that when this flag is {@code false}, updates will not be propagated to the cache store
* (i.e. {@link #skipStore()} flag will be set to {@code true} implicitly).
* <p>
* This flag is disabled by default (default is {@code false}).
* <p>
* The flag has no effect when custom cache receiver is set using {@link #receiver(StreamReceiver)} method.
*
* @param allowOverwrite Flag value.
* @throws CacheException If failed.
*/
public void allowOverwrite(boolean allowOverwrite) throws CacheException;
/**
* Gets flag indicating that write-through behavior should be disabled for data streaming.
* Default is {@code false}.
*
* @return Skip store flag.
*/
public boolean skipStore();
/**
* Sets flag indicating that write-through behavior should be disabled for data streaming.
* Default is {@code false}.
*
* @param skipStore Skip store flag.
*/
public void skipStore(boolean skipStore);
/**
* Gets flag indicating that objects should be kept in binary format when passed to the stream receiver.
* Default is {@code false}.
*
* @return Keep binary flag.
*/
public boolean keepBinary();
/**
* Sets flag indicating that objects should be kept in binary format when passed to the stream receiver.
* Default is {@code false}.
*
* @param keepBinary Keep binary flag.
*/
public void keepBinary(boolean keepBinary);
/**
* Gets size of per node key-value pairs buffer.
*
* @return Per node buffer size.
*/
public int perNodeBufferSize();
/**
* Sets size of per node key-value pairs buffer.
* <p>
* This method should be called prior to {@link #addData(Object, Object)} call.
* <p>
* If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
*
* @param bufSize Per node buffer size.
*/
public void perNodeBufferSize(int bufSize);
/**
* Gets maximum number of parallel stream operations for a single node.
*
* @return Maximum number of parallel stream operations for a single node.
*/
public int perNodeParallelOperations();
/**
* Sets maximum number of parallel stream operations for a single node.
* <p>
* This method should be called prior to {@link #addData(Object, Object)} call.
* <p>
* If not provided, default value is calculated as follows:
* {@link #DFLT_PARALLEL_OPS_MULTIPLIER} * {@code DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE}.
*
* @param parallelOps Maximum number of parallel stream operations for a single node.
* @see IgniteConfiguration#getDataStreamerThreadPoolSize()
*/
public void perNodeParallelOperations(int parallelOps);
/**
* Sets the per-thread buffer size used when data is streamed via {@link #addData(Object, Object)} call.
*
* @param size Size of buffer.
* @see #DFLT_PER_THREAD_BUFFER_SIZE
*/
public void perThreadBufferSize(int size);
/**
* Gets buffer size set by {@link #perThreadBufferSize(int)}.
*
* @return Buffer size.
*/
public int perThreadBufferSize();
/**
* Sets the timeout that is used in the following cases:
* <ul>
* <li>any data addition method can be blocked when all per node parallel operations are exhausted.
* The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data
* into the streamer;</li>
* <li>Total timeout time for {@link #flush()} operation;</li>
* <li>Total timeout time for {@link #close()} operation.</li>
* </ul>
* By default the timeout is disabled.
*
* @param timeout Timeout in milliseconds.
* @throws IllegalArgumentException If timeout is zero or less than {@code -1}.
* @see #DFLT_UNLIMIT_TIMEOUT
*/
public void timeout(long timeout);
/**
* Gets timeout set by {@link #timeout(long)}.
*
* @return Timeout in milliseconds.
*/
public long timeout();
/**
* Gets automatic flush frequency. Essentially, this is the time after which the
* streamer will make an attempt to submit all data added so far to remote nodes.
* Note that there is no guarantee that data will be delivered after this concrete
* attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
* <p>
* If set to {@code 0}, automatic flush is disabled.
* <p>
* Automatic flush is disabled by default (default value is {@code 0}).
*
* @return Flush frequency or {@code 0} if automatic flush is disabled.
* @see #flush()
*/
public long autoFlushFrequency();
/**
* Sets automatic flush frequency. Essentially, this is the time after which the
* streamer will make an attempt to submit all data added so far to remote nodes.
* Note that there is no guarantee that data will be delivered after this concrete
* attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
* <p>
* If set to {@code 0}, automatic flush is disabled.
* <p>
* Automatic flush is disabled by default (default value is {@code 0}).
*
* @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
* @see #flush()
*/
public void autoFlushFrequency(long autoFlushFreq);
/**
* Gets future for this streaming process. This future completes whenever method
* {@link #close(boolean)} completes. By attaching listeners to this future
* it is possible to get asynchronous notifications for completion of this
* streaming process.
*
* @return Future for this streaming process.
*/
public IgniteFuture<?> future();
/**
* Optional deploy class for peer deployment. All classes added by a data streamer
* must be class-loadable from the same class-loader. Ignite will make the best
* effort to detect the most suitable class-loader for data loading. However,
* in complex cases, where compound or deeply nested class-loaders are used,
* it is best to specify a deploy class which can be any class loaded by
* the class-loader for given data.
*
* @param depCls Any class loaded by the class-loader for given data.
*/
public void deployClass(Class<?> depCls);
/**
* Sets custom stream receiver to this data streamer.
*
* @param rcvr Stream receiver.
*/
public void receiver(StreamReceiver<K, V> rcvr);
/**
* Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
*
* @param key Key.
* @return Future for this operation.
* Note: It may never complete unless {@link #flush()} or {@link #close()} is explicitly called.
* @throws CacheException If failed to map key to node.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
*/
public IgniteFuture<?> removeData(K key) throws CacheException, IgniteInterruptedException, IllegalStateException;
/**
* Adds data for streaming on remote node. This method can be called from multiple
* threads in parallel to speed up streaming if needed.
* <p>
* Note that streamer will stream data concurrently by multiple internal threads, so the
* data may get to remote nodes in different order from which it was added to
* the streamer. The data may not be sent until {@link #flush()} or {@link #close()} are called.
* <p>
* Note: if {@link IgniteDataStreamer#allowOverwrite()} is set to {@code false} (by default)
* then data streamer will not overwrite existing cache entries for better performance
* (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}).
*
* @param key Key.
* @param val Value or {@code null} if respective entry must be removed from cache.
* @return Future for this operation.
* Note: It may never complete unless {@link #flush()} or {@link #close()} is explicitly called.
* @throws CacheException If failed to map key to node.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
* @see #allowOverwrite()
*/
public IgniteFuture<?> addData(K key, @Nullable V val) throws CacheException, IgniteInterruptedException,
IllegalStateException, IgniteDataStreamerTimeoutException;
/**
* Adds data for streaming on remote node. This method can be called from multiple
* threads in parallel to speed up streaming if needed.
* <p>
* Note that streamer will stream data concurrently by multiple internal threads, so the
* data may get to remote nodes in different order from which it was added to
* the streamer. The data may not be sent until {@link #flush()} or {@link #close()} are called.
* <p>
* Note: if {@link IgniteDataStreamer#allowOverwrite()} is set to {@code false} (by default)
* then data streamer will not overwrite existing cache entries for better performance
* (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}).
*
* @param entry Entry.
* @return Future for this operation.
* Note: It may never complete unless {@link #flush()} or {@link #close()} is explicitly called.
* @throws CacheException If failed to map key to node.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
* @see #allowOverwrite()
*/
public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws CacheException, IgniteInterruptedException,
IllegalStateException, IgniteDataStreamerTimeoutException;
/**
* Adds data for streaming on remote node. This method can be called from multiple
* threads in parallel to speed up streaming if needed.
* <p>
* Note that streamer will stream data concurrently by multiple internal threads, so the
* data may get to remote nodes in different order from which it was added to
* the streamer. The data may not be sent until {@link #flush()} or {@link #close()} are called.
* <p>
* Note: if {@link IgniteDataStreamer#allowOverwrite()} is set to {@code false} (by default)
* then data streamer will not overwrite existing cache entries for better performance
* (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}).
*
* @param entries Collection of entries to be streamed.
* @return Future for this stream operation.
* Note: It may never complete unless {@link #flush()} or {@link #close()} is explicitly called.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
* @see #allowOverwrite()
*/
public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException,
IgniteDataStreamerTimeoutException;
/**
* Adds data for streaming on remote node. This method can be called from multiple
* threads in parallel to speed up streaming if needed.
* <p>
* Note that streamer will stream data concurrently by multiple internal threads, so the
* data may get to remote nodes in different order from which it was added to
* the streamer. The data may not be sent until {@link #flush()} or {@link #close()} are called.
* <p>
* Note: if {@link IgniteDataStreamer#allowOverwrite()} is set to {@code false} (by default)
* then data streamer will not overwrite existing cache entries for better performance
* (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}).
*
* @param entries Map to be streamed.
* @return Future for this stream operation.
* Note: It may never complete unless {@link #flush()} or {@link #close()} is explicitly called.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
* @see #allowOverwrite()
*/
public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException,
IgniteDataStreamerTimeoutException;
/**
* Streams any remaining data, but doesn't close the streamer. Data can be still added after
* flush is finished. This method blocks and doesn't allow adding any data until all data
* is streamed.
* <p>
* If another thread is already performing flush, this method will block, wait for
* another thread to complete flush and exit. If you don't want to wait in this case,
* use {@link #tryFlush()} method.
* <p>
* Note that {@code flush()} guarantees completion of all futures returned by
* {@link #addData(Object, Object)}; listeners should be tracked separately.
*
* @throws CacheException If failed to load data from buffer.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
* @see #tryFlush()
*/
public void flush() throws CacheException, IgniteInterruptedException, IllegalStateException,
IgniteDataStreamerTimeoutException;
/**
* Makes an attempt to stream remaining data. This method is mostly similar to {@link #flush()},
* with the difference that it won't wait and will exit immediately.
*
* @throws CacheException If failed to load data from buffer.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
* @see #flush()
*/
public void tryFlush() throws CacheException, IgniteInterruptedException, IllegalStateException;
/**
* Streams any remaining data and closes this streamer.
*
* @param cancel {@code True} to cancel ongoing streaming operations.
* @throws CacheException If failed to close data streamer.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded, only if cancel is {@code false}.
*/
public void close(boolean cancel) throws CacheException, IgniteInterruptedException,
IgniteDataStreamerTimeoutException;
/**
* Closes data streamer. This method is identical to calling {@link #close(boolean) close(false)} method.
* <p>
* The method is invoked automatically on objects managed by the
* {@code try-with-resources} statement.
*
* @throws CacheException If failed to close data streamer.
* @throws IgniteInterruptedException If thread has been interrupted.
* @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
*/
@Override public void close() throws CacheException, IgniteInterruptedException,
IgniteDataStreamerTimeoutException;
}