| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.datastreamer; |
| |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Queue; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.DelayQueue; |
| import java.util.concurrent.Delayed; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.cache.CacheException; |
| import javax.cache.expiry.ExpiryPolicy; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteDataStreamerTimeoutException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteInterruptedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterTopologyException; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; |
| import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; |
| import org.apache.ignite.internal.managers.communication.GridIoPolicy; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.deployment.GridDeployment; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheObjectContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheGateway; |
| import org.apache.ignite.internal.processors.cache.GridCacheUtils; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; |
| import org.apache.ignite.internal.processors.dr.GridDrType; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; |
| import org.apache.ignite.internal.util.lang.GridPeerDeployAware; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.GPC; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.stream.StreamReceiver; |
| import org.jetbrains.annotations.Nullable; |
| import org.jsr166.ConcurrentHashMap8; |
| import org.jsr166.LongAdder8; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; |
| |
| /** |
| * Data streamer implementation. |
| */ |
| @SuppressWarnings("unchecked") |
| public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { |
/** Default IO policy resolver; initial value of {@link #ioPlcRslvr}. */
private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();

/**
 * Isolated receiver. This is the default {@link #rcvr}; {@link #allowOverwrite()} reports
 * {@code false} for as long as the receiver is this very instance.
 */
private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();

/**
 * Total number of permits of {@link #remapSem}. New data is processed only while
 * all of these permits are available.
 */
private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE;

/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

/** NOTE(review): not referenced in the visible part of the class; presumably the marshalled receiver - confirm. */
private byte[] updaterBytes;

/** IO policy resolver for data load request. */
private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;

/** Max remap count before issuing an error; initial value of {@code maxRemapCnt}. */
private static final int DFLT_MAX_REMAP_CNT = 32;

/** Log reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();

/** Logger, lazily initialized by the constructor and shared by all instances. */
private static IgniteLogger log;

/** Cache name ({@code null} for default cache). */
private final String cacheName;

/** Per-node buffer size. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;

/** Per-node parallel operations setting (see {@link #perNodeParallelOperations()}). */
private int parallelOps = DFLT_MAX_PARALLEL_OPS;

/** Timeout, in milliseconds, applied when waiting on remaps and flushes; {@code DFLT_UNLIMIT_TIMEOUT} disables it. */
private long timeout = DFLT_UNLIMIT_TIMEOUT;

/** Auto-flush frequency; {@code 0} means this streamer is not registered in the flush queue. */
private long autoFlushFreq;

/** Mapping of node ID to the buffer that accumulates entries for that node. */
@GridToStringInclude
private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();

/** Discovery listener; drops the buffer of a node that left or failed. */
private final GridLocalEventListener discoLsnr;

/** Context. */
private final GridKernalContext ctx;

/** Cache object processor, used to convert user keys and values to cache objects. */
private final IgniteCacheObjectProcessor cacheObjProc;

/** Cache object context of the target cache. */
private final CacheObjectContext cacheObjCtx;

/** Communication topic for responses. */
private final Object topic;

/** NOTE(review): not referenced in the visible part of the class; presumably the marshalled {@link #topic} - confirm. */
private byte[] topicBytes;

/**
 * {@code True} if data loader has been cancelled. Also set when any operation future
 * completes with an error other than a client disconnect.
 */
private volatile boolean cancelled;

/** Fail counter. */
private final LongAdder8 failCntr = new LongAdder8();

/** Active futures of this data loader. */
@GridToStringInclude
private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
| |
| /** Closure to remove from active futures. */ |
| @GridToStringExclude |
| private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> t) { |
| boolean rmv = activeFuts.remove(t); |
| |
| assert rmv; |
| |
| Throwable err = t.error(); |
| |
| if (err != null && !(err instanceof IgniteClientDisconnectedCheckedException)) { |
| LT.error(log, t.error(), "DataStreamer operation failed.", true); |
| |
| failCntr.increment(); |
| |
| cancelled = true; |
| } |
| } |
| }; |
| |
/** Job peer deploy aware; lazily created on the first load when deployment is enabled. */
private volatile GridPeerDeployAware jobPda;

/** Deployment class. */
private Class<?> depCls;

/** Future to track loading finish. */
private final GridFutureAdapter<?> fut;

/** Public API future to track loading finish. */
private final IgniteFuture<?> publicFut;

/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

/** Error rethrown by {@link #enterBusy()} when the busy lock cannot be entered and this field is set. */
private CacheException disconnectErr;

/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();

/** Time of the last flush attempt, as reported by {@code U.currentTimeMillis()}. */
private volatile long lastFlushTime = U.currentTimeMillis();

/** Flush queue this streamer is added to while auto-flush is enabled. */
private final DelayQueue<DataStreamerImpl<K, V>> flushQ;

/** Skip store flag. */
private boolean skipStore;

/** Keep binary flag. */
private boolean keepBinary;

/** Maximum number of remaps of a failed batch before the operation is failed. */
private int maxRemapCnt = DFLT_MAX_REMAP_CNT;

/** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed. */
private static boolean isWarningPrinted;

/** Allows to pause new data processing while failed data processing in progress. */
private final Semaphore remapSem = new Semaphore(REMAP_SEMAPHORE_PERMISSIONS_COUNT);

/** Queue of pending remap tasks for batches that failed and have to be re-sent. */
private final ConcurrentLinkedDeque<Runnable> dataToRemap = new ConcurrentLinkedDeque<>();

/** Ownership flag: {@code true} while some task is draining {@link #dataToRemap}. */
private final AtomicBoolean remapOwning = new AtomicBoolean();
| |
/**
 * Creates a data streamer for the given cache: resolves the cache object context,
 * subscribes to node left/failed events, registers a response listener on the streamer
 * topic and makes sure the target cache is available on this node.
 *
 * @param ctx Grid kernal context.
 * @param cacheName Cache name.
 * @param flushQ Flush queue.
 * @throws IgniteException If the cache object context cannot be initialized.
 * @throws CacheException If the cache is not started locally and is configured as {@code LOCAL}.
 */
public DataStreamerImpl(
    final GridKernalContext ctx,
    @Nullable final String cacheName,
    DelayQueue<DataStreamerImpl<K, V>> flushQ
) {
    assert ctx != null;

    this.ctx = ctx;
    this.cacheObjProc = ctx.cacheObjects();

    if (log == null)
        log = U.logger(ctx, logRef, DataStreamerImpl.class);

    CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);

    try {
        this.cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException("Failed to initialize cache context.", e);
    }

    this.cacheName = cacheName;
    this.flushQ = flushQ;

    discoLsnr = new GridLocalEventListener() {
        @Override public void onEvent(Event evt) {
            assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;

            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

            UUID id = discoEvt.eventNode().id();

            // Remap regular mappings.
            final Buffer buf = bufMappings.remove(id);

            // Only async notification is possible since
            // discovery thread may be trapped otherwise.
            if (buf != null) {
                waitAffinityAndRun(new Runnable() {
                    @Override public void run() {
                        buf.onNodeLeft();
                    }
                }, discoEvt.topologyVersion(), true);
            }
        }
    };

    ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);

    // Generate unique topic for this loader.
    topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));

    ctx.io().addMessageListener(topic, new GridMessageListener() {
        @Override public void onMessage(UUID nodeId, Object msg) {
            assert msg instanceof DataStreamerResponse;

            DataStreamerResponse res = (DataStreamerResponse)msg;

            if (log.isDebugEnabled())
                log.debug("Received data load response: " + res);

            Buffer buf = bufMappings.get(nodeId);

            if (buf != null)
                buf.onResponse(res, nodeId);

            else if (log.isDebugEnabled())
                // Message used to end with a dangling ", " and an unclosed bracket.
                log.debug("Ignoring response since node has left [nodeId=" + nodeId + ']');
        }
    });

    if (log.isDebugEnabled())
        log.debug("Added response listener within topic: " + topic);

    fut = new DataStreamerFuture(this);

    publicFut = new IgniteCacheFutureImpl<>(fut);

    GridCacheAdapter cache = ctx.cache().internalCache(cacheName);

    if (cache == null) { // Possible, cache is not configured on node.
        assert ccfg != null;

        if (ccfg.getCacheMode() == CacheMode.LOCAL)
            throw new CacheException("Impossible to load Local cache configured remotely.");

        ctx.grid().getOrCreateCache(ccfg);
    }
}
| |
| /** |
| * @param c Closure to run. |
| * @param topVer Topology version to wait for. |
| * @param async Async flag. |
| */ |
| private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) { |
| AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0); |
| |
| IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0); |
| |
| if (fut != null && !fut.isDone()) { |
| fut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> fut) { |
| ctx.closure().runLocalSafe(c, true); |
| } |
| }); |
| } |
| else { |
| if (async) |
| ctx.closure().runLocalSafe(c, true); |
| else |
| c.run(); |
| } |
| } |
| |
| /** |
| * @return Cache object context. |
| */ |
| public CacheObjectContext cacheObjectContext() { |
| return cacheObjCtx; |
| } |
| |
| /** |
| * Enters busy lock. |
| */ |
| private void enterBusy() { |
| if (!busyLock.enterBusy()) { |
| if (disconnectErr != null) |
| throw disconnectErr; |
| |
| throw new IllegalStateException("Data streamer has been closed."); |
| } |
| else if (cancelled) { |
| busyLock.leaveBusy(); |
| |
| throw new IllegalStateException("Data streamer has been closed."); |
| } |
| } |
| |
| /** |
| * Leaves busy lock. |
| */ |
| private void leaveBusy() { |
| busyLock.leaveBusy(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<?> future() { |
| return publicFut; |
| } |
| |
| /** |
| * @return Internal future. |
| */ |
| public IgniteInternalFuture<?> internalFuture() { |
| return fut; |
| } |
| |
/**
 * {@inheritDoc}
 * <p>
 * The class is only stored here; no validation is performed.
 */
@Override public void deployClass(Class<?> depCls) {
    this.depCls = depCls;
}
| |
/**
 * {@inheritDoc}
 * <p>
 * A {@code null} receiver is rejected by the argument check. Installing any receiver other than
 * the isolated updater makes {@link #allowOverwrite()} return {@code true}.
 */
@Override public void receiver(StreamReceiver<K, V> rcvr) {
    A.notNull(rcvr, "rcvr");

    this.rcvr = rcvr;
}
| |
| /** {@inheritDoc} */ |
| @Override public boolean allowOverwrite() { |
| return rcvr != ISOLATED_UPDATER; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void allowOverwrite(boolean allow) { |
| if (allow == allowOverwrite()) |
| return; |
| |
| ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); |
| |
| if (node == null) |
| throw new CacheException("Failed to get node for cache: " + cacheName); |
| |
| rcvr = allow ? DataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean skipStore() { |
| return skipStore; |
| } |
| |
/**
 * {@inheritDoc}
 * <p>
 * The flag is only stored here.
 */
@Override public void skipStore(boolean skipStore) {
    this.skipStore = skipStore;
}
| |
| /** {@inheritDoc} */ |
| @Override public boolean keepBinary() { |
| return keepBinary; |
| } |
| |
/**
 * {@inheritDoc}
 * <p>
 * The flag is only stored here.
 */
@Override public void keepBinary(boolean keepBinary) {
    this.keepBinary = keepBinary;
}
| |
| /** {@inheritDoc} */ |
| @Override @Nullable public String cacheName() { |
| return cacheName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int perNodeBufferSize() { |
| return bufSize; |
| } |
| |
/**
 * {@inheritDoc}
 * <p>
 * The size must be strictly positive; other values are rejected by the argument check.
 */
@Override public void perNodeBufferSize(int bufSize) {
    A.ensure(bufSize > 0, "bufSize > 0");

    this.bufSize = bufSize;
}
| |
| /** {@inheritDoc} */ |
| @Override public int perNodeParallelOperations() { |
| return parallelOps; |
| } |
| |
/**
 * {@inheritDoc}
 * <p>
 * NOTE(review): unlike {@link #perNodeBufferSize(int)}, the value is stored without any
 * range check - confirm that zero and negative values are acceptable to the consumers.
 */
@Override public void perNodeParallelOperations(int parallelOps) {
    this.parallelOps = parallelOps;
}
| |
| /** {@inheritDoc} */ |
| @Override public void timeout(long timeout) { |
| if (timeout < -1 || timeout == 0) |
| throw new IllegalArgumentException(); |
| |
| this.timeout = timeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long timeout() { |
| return this.timeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long autoFlushFrequency() { |
| return autoFlushFreq; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void autoFlushFrequency(long autoFlushFreq) { |
| A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0"); |
| |
| long old = this.autoFlushFreq; |
| |
| if (autoFlushFreq != old) { |
| this.autoFlushFreq = autoFlushFreq; |
| |
| if (autoFlushFreq != 0 && old == 0) |
| flushQ.add(this); |
| else if (autoFlushFreq == 0) |
| flushQ.remove(this); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { |
| A.notNull(entries, "entries"); |
| |
| return addData(entries.entrySet()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) { |
| A.notEmpty(entries, "entries"); |
| |
| checkSecurityPermission(SecurityPermission.CACHE_PUT); |
| |
| enterBusy(); |
| |
| try { |
| GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); |
| |
| resFut.listen(rmvActiveFut); |
| |
| activeFuts.add(resFut); |
| |
| Collection<KeyCacheObjectWrapper> keys = |
| new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); |
| |
| Collection<DataStreamerEntry> entries0 = new ArrayList<>(entries.size()); |
| |
| for (Map.Entry<K, V> entry : entries) { |
| KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true); |
| CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, entry.getValue(), true); |
| |
| keys.add(new KeyCacheObjectWrapper(key)); |
| |
| entries0.add(new DataStreamerEntry(key, val)); |
| } |
| |
| load0(entries0, resFut, keys, 0); |
| |
| return new IgniteCacheFutureImpl<>(resFut); |
| } |
| catch (IgniteDataStreamerTimeoutException e) { |
| throw e; |
| } |
| catch (IgniteException e) { |
| return new IgniteFinishedFutureImpl<>(e); |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @return Future. |
| */ |
| public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val) { |
| return addDataInternal(Collections.singleton(new DataStreamerEntry(key, val))); |
| } |
| |
| /** |
| * @param key Key. |
| * @return Future. |
| */ |
| public IgniteFuture<?> removeDataInternal(KeyCacheObject key) { |
| return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null))); |
| } |
| |
| /** |
| * @param entries Entries. |
| * @return Future. |
| */ |
| public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries) { |
| enterBusy(); |
| |
| GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); |
| |
| try { |
| resFut.listen(rmvActiveFut); |
| |
| activeFuts.add(resFut); |
| |
| Collection<KeyCacheObjectWrapper> keys = null; |
| |
| if (entries.size() > 1) { |
| keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); |
| |
| for (DataStreamerEntry entry : entries) |
| keys.add(new KeyCacheObjectWrapper(entry.getKey())); |
| } |
| |
| load0(entries, resFut, keys, 0); |
| |
| return new IgniteCacheFutureImpl<>(resFut); |
| } |
| catch (Throwable e) { |
| resFut.onDone(e); |
| |
| if (e instanceof Error || e instanceof IgniteDataStreamerTimeoutException) |
| throw e; |
| |
| return new IgniteFinishedFutureImpl<>(e); |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) { |
| A.notNull(entry, "entry"); |
| |
| return addData(F.asList(entry)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<?> addData(K key, V val) { |
| A.notNull(key, "key"); |
| |
| if (val == null) |
| checkSecurityPermission(SecurityPermission.CACHE_REMOVE); |
| else |
| checkSecurityPermission(SecurityPermission.CACHE_PUT); |
| |
| KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true); |
| CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true); |
| |
| return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0))); |
| } |
| |
/**
 * {@inheritDoc}
 * <p>
 * Implemented as {@link #addData(Object, Object)} with a {@code null} value.
 */
@Override public IgniteFuture<?> removeData(K key) {
    return addData(key, null);
}
| |
/**
 * Replaces the IO policy resolver. The value is stored as is, without a {@code null} check.
 *
 * @param ioPlcRslvr IO policy resolver.
 */
public void ioPolicyResolver(IgniteClosure<ClusterNode, Byte> ioPlcRslvr) {
    this.ioPlcRslvr = ioPlcRslvr;
}
| |
| /** |
| * |
| */ |
| private void acquireRemapSemaphore() throws IgniteInterruptedCheckedException { |
| try { |
| if (remapSem.availablePermits() != REMAP_SEMAPHORE_PERMISSIONS_COUNT) { |
| if (timeout == DFLT_UNLIMIT_TIMEOUT) { |
| // Wait until failed data being processed. |
| remapSem.acquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT); |
| |
| remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT); |
| } |
| else { |
| // Wait until failed data being processed. |
| boolean res = remapSem.tryAcquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT, timeout, TimeUnit.MILLISECONDS); |
| |
| if (res) |
| remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT); |
| else |
| throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout " + |
| "while was waiting for failed data resending finished."); |
| } |
| } |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedCheckedException(e); |
| } |
| } |
| |
/**
 * Maps the given entries to nodes and submits them to the per-node buffers.
 * <p>
 * The method never reports failures by throwing an {@link Exception}: every exception raised
 * inside is used to complete {@code resFut}. A batch that fails on a node is re-submitted
 * through this same method with an incremented {@code remaps} counter, up to {@code maxRemapCnt}.
 *
 * @param entries Entries.
 * @param resFut Result future.
 * @param activeKeys Active keys: the keys of the whole operation that are not acknowledged yet.
 *      {@code null} when the operation consists of a single key.
 * @param remaps Remaps count; {@code 0} for new data, positive for re-sent data.
 */
private void load0(
    Collection<? extends DataStreamerEntry> entries,
    final GridFutureAdapter<Object> resFut,
    @Nullable final Collection<KeyCacheObjectWrapper> activeKeys,
    final int remaps
) {
    try {
        assert entries != null;

        final boolean remap = remaps > 0;

        if (!remap) { // Failed data should be processed prior to new data.
            // NOTE(review): a timeout exception thrown here is caught by the outermost
            // catch (Exception) below and ends up in resFut instead of reaching the caller.
            acquireRemapSemaphore();
        }

        if (!isWarningPrinted) {
            // NOTE(review): the flag is static while the monitor is per instance. The flag is
            // also raised when overwrite is allowed, so the warning is then never printed later.
            synchronized (this) {
                if (!allowOverwrite() && !isWarningPrinted) {
                    U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
                        "(to change, set allowOverwrite to true)");
                }

                isWarningPrinted = true;
            }
        }

        Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();

        // Peer-deploy info is created at most once, from the first entry processed.
        boolean initPda = ctx.deploy().enabled() && jobPda == null;

        GridCacheAdapter cache = ctx.cache().internalCache(cacheName);

        if (cache == null)
            throw new IgniteCheckedException("Cache not created or already destroyed.");

        GridCacheContext cctx = cache.context();

        GridCacheGateway gate = null;

        if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx required.
            gate = cctx.gate();

            gate.enter();
        }

        try {
            AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ?
                ctx.cache().context().exchange().readyAffinityVersion() :
                cctx.topology().topologyVersion();

            // Phase 1: group entries by target node. Nothing is submitted yet, so a mapping
            // failure fails the whole batch without any partial sends.
            for (DataStreamerEntry entry : entries) {
                List<ClusterNode> nodes;

                try {
                    KeyCacheObject key = entry.getKey();

                    assert key != null;

                    if (initPda) {
                        if (cacheObjCtx.addDeploymentInfo())
                            jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
                                entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
                                rcvr);
                        else if (rcvr != null)
                            jobPda = new DataStreamerPda(rcvr);

                        initPda = false;
                    }

                    nodes = nodes(key, topVer, cctx);
                }
                catch (IgniteCheckedException e) {
                    resFut.onDone(e);

                    return;
                }

                if (F.isEmpty(nodes)) {
                    resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
                        "(no nodes with cache found in topology) [infos=" + entries.size() +
                        ", cacheName=" + cacheName + ']'));

                    return;
                }

                for (ClusterNode node : nodes) {
                    Collection<DataStreamerEntry> col = mappings.get(node);

                    if (col == null)
                        mappings.put(node, col = new ArrayList<>());

                    col.add(entry);
                }
            }

            // Phase 2: hand every per-node group to that node's buffer.
            for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
                final UUID nodeId = e.getKey().id();

                Buffer buf = bufMappings.get(nodeId);

                if (buf == null) {
                    Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));

                    // Another thread created the buffer first - use theirs.
                    if (old != null)
                        buf = old;
                }

                final Collection<DataStreamerEntry> entriesForNode = e.getValue();

                // Completion listener for this node's part of the batch.
                IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                    @Override public void apply(IgniteInternalFuture<?> t) {
                        try {
                            t.get();

                            if (activeKeys != null) {
                                for (DataStreamerEntry e : entriesForNode)
                                    activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));

                                // The operation is done once no key is left unacknowledged.
                                if (activeKeys.isEmpty())
                                    resFut.onDone();
                            }
                            else {
                                assert entriesForNode.size() == 1;

                                // That has been a single key,
                                // so complete result future right away.
                                resFut.onDone();
                            }
                        }
                        catch (IgniteClientDisconnectedCheckedException e1) {
                            if (log.isDebugEnabled())
                                log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');

                            resFut.onDone(e1);
                        }
                        catch (IgniteCheckedException e1) {
                            if (log.isDebugEnabled())
                                log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');

                            if (cancelled) {
                                resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
                                    DataStreamerImpl.this, e1));
                            }
                            else if (remaps + 1 > maxRemapCnt) {
                                resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
                                    + remaps, e1));
                            }
                            else {
                                try {
                                    // Holding a single permit is enough to make acquireRemapSemaphore()
                                    // block new data until the remap task releases it.
                                    remapSem.acquire();

                                    final Runnable r = new Runnable() {
                                        @Override public void run() {
                                            try {
                                                if (cancelled)
                                                    throw new IllegalStateException("DataStreamer closed.");

                                                load0(entriesForNode, resFut, activeKeys, remaps + 1);
                                            }
                                            catch (Throwable ex) {
                                                resFut.onDone(
                                                    new IgniteCheckedException("DataStreamer remapping failed. ", ex));
                                            }
                                            finally {
                                                remapSem.release();
                                            }
                                        }
                                    };

                                    dataToRemap.add(r);

                                    // Only the thread that wins the CAS starts a task draining the remap queue.
                                    if (!remapOwning.get() && remapOwning.compareAndSet(false, true)) {
                                        ctx.closure().callLocalSafe(new GPC<Boolean>() {
                                            @Override public Boolean call() {
                                                // True while this task owns the remapOwning flag.
                                                boolean locked = true;

                                                while (locked || !dataToRemap.isEmpty()) {
                                                    // Ownership was dropped on the previous iteration but new
                                                    // work arrived: try to take it back or leave it to the new owner.
                                                    if (!locked && !remapOwning.compareAndSet(false, true))
                                                        return false;

                                                    try {
                                                        Runnable r = dataToRemap.poll();

                                                        if (r != null)
                                                            r.run();
                                                    }
                                                    finally {
                                                        if (!dataToRemap.isEmpty())
                                                            locked = true;
                                                        else {
                                                            remapOwning.set(false);

                                                            locked = false;
                                                        }
                                                    }
                                                }

                                                return true;
                                            }
                                        }, true);
                                    }
                                }
                                catch (InterruptedException e2) {
                                    // NOTE(review): the thread's interrupt status is not restored here.
                                    resFut.onDone(e2);
                                }
                            }
                        }
                    }
                };

                final GridFutureAdapter<?> f;

                try {
                    f = buf.update(entriesForNode, topVer, lsnr, remap);
                }
                catch (IgniteInterruptedCheckedException e1) {
                    resFut.onDone(e1);

                    return;
                }

                // The node may have left while the batch was being added; the two-argument remove
                // ensures that only the thread that actually unmapped this buffer notifies it.
                if (ctx.discovery().node(nodeId) == null) {
                    if (bufMappings.remove(nodeId, buf)) {
                        final Buffer buf0 = buf;

                        waitAffinityAndRun(new Runnable() {
                            @Override public void run() {
                                buf0.onNodeLeft();

                                if (f != null)
                                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
                                        "(node has left): " + nodeId));
                            }
                        }, ctx.discovery().topologyVersion(), false);
                    }
                }
            }
        }
        finally {
            if (gate != null)
                gate.leave();
        }
    }
    catch (Exception ex) {
        resFut.onDone(new IgniteCheckedException("DataStreamer data loading failed.", ex));
    }
}
| |
| /** |
| * @param key Key to map. |
| * @param topVer Topology version. |
| * @param cctx Context. |
| * @return Nodes to send requests to. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private List<ClusterNode> nodes(KeyCacheObject key, |
| AffinityTopologyVersion topVer, |
| GridCacheContext cctx) throws IgniteCheckedException { |
| GridAffinityProcessor aff = ctx.affinity(); |
| |
| List<ClusterNode> res = null; |
| |
| if (!allowOverwrite()) |
| res = cctx.isLocal() ? |
| aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer) : |
| cctx.topology().nodes(cctx.affinity().partition(key), topVer); |
| else { |
| ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer); |
| |
| if (node != null) |
| res = Collections.singletonList(node); |
| } |
| |
| if (F.isEmpty(res)) |
| throw new ClusterTopologyServerNotFoundException("Failed to find server node for cache (all affinity " + |
| "nodes have left the grid or cache was stopped): " + cacheName); |
| |
| return res; |
| } |
| |
/**
 * Performs flush.
 * <p>
 * Takes a snapshot of the operation futures that are not completed yet, then repeatedly
 * flushes every node buffer and waits for the flush futures until all futures of the snapshot
 * are done. Operations added after the snapshot was taken are not waited for.
 *
 * @throws IgniteCheckedException If failed; in particular when one of the tracked
 *      operation futures completed with an error.
 * @throws IgniteDataStreamerTimeoutException If the streamer timeout was exceeded while
 *      waiting for a buffer flush.
 */
private void doFlush() throws IgniteCheckedException {
    lastFlushTime = U.currentTimeMillis();

    List<IgniteInternalFuture> activeFuts0 = null;

    // NOTE(review): the value accumulated in the first loop is never read; the counter is
    // reset to zero before its first use below.
    int doneCnt = 0;

    for (IgniteInternalFuture<?> f : activeFuts) {
        if (!f.isDone()) {
            if (activeFuts0 == null)
                activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));

            activeFuts0.add(f);
        }
        else {
            // Rethrows the error of an already failed operation.
            f.get();

            doneCnt++;
        }
    }

    // Nothing is in flight.
    if (activeFuts0 == null || activeFuts0.isEmpty())
        return;

    while (true) {
        Queue<IgniteInternalFuture<?>> q = null;

        for (Buffer buf : bufMappings.values()) {
            IgniteInternalFuture<?> flushFut = buf.flush();

            if (flushFut != null) {
                if (q == null)
                    q = new ArrayDeque<>(bufMappings.size() * 2);

                q.add(flushFut);
            }
        }

        if (q != null) {
            assert !q.isEmpty();

            boolean err = false;

            // The timeout budget is shared by all flush futures of this round.
            long startTimeMillis = U.currentTimeMillis();

            for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
                try {
                    if (timeout == DFLT_UNLIMIT_TIMEOUT)
                        fut.get();
                    else {
                        long timeRemain = timeout - U.currentTimeMillis() + startTimeMillis;

                        if (timeRemain <= 0)
                            throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout on flush.");

                        fut.get(timeRemain);
                    }
                }
                catch (IgniteClientDisconnectedCheckedException e) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to flush buffer: " + e);

                    throw CU.convertToCacheException(e);
                }
                catch (IgniteFutureTimeoutCheckedException e) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to flush buffer: " + e);

                    throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout on flush.", e);
                }
                catch (IgniteCheckedException e) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to flush buffer: " + e);

                    // Not fatal here: the remaining flush futures are still awaited.
                    err = true;
                }
            }

            if (err)
                // Remaps needed - flush buffers.
                continue;
        }

        doneCnt = 0;

        // Only the leading run of completed futures is counted: the scan stops at the first
        // future that is still pending. Counted futures are nulled out so that a later round
        // does not call get() on them again.
        for (int i = 0; i < activeFuts0.size(); i++) {
            IgniteInternalFuture f = activeFuts0.get(i);

            if (f == null)
                doneCnt++;
            else if (f.isDone()) {
                f.get();

                doneCnt++;

                activeFuts0.set(i, null);
            }
            else
                break;
        }

        if (doneCnt == activeFuts0.size())
            return;

        // NOTE(review): when no buffer returns a flush future and a tracked operation is
        // still pending, the loop starts over immediately without blocking.
    }
}
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| @Override public void flush() throws CacheException { |
| enterBusy(); |
| |
| try { |
| doFlush(); |
| } |
| catch (IgniteCheckedException e) { |
| throw CU.convertToCacheException(e); |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| |
| /** |
| * Flushes every internal buffer if buffer was flushed before passed in |
| * threshold. |
| * <p> |
| * Does not wait for result and does not fail on errors assuming that this method |
| * should be called periodically. |
| */ |
| @Override public void tryFlush() throws IgniteInterruptedException { |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| for (Buffer buf : bufMappings.values()) |
| buf.flush(); |
| |
| lastFlushTime = U.currentTimeMillis(); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw GridCacheUtils.convertToCacheException(e); |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param cancel {@code True} to close with cancellation. |
| * @throws CacheException If failed. |
| */ |
| @Override public void close(boolean cancel) throws CacheException { |
| try { |
| closeEx(cancel); |
| } |
| catch (IgniteCheckedException e) { |
| throw CU.convertToCacheException(e); |
| } |
| } |
| |
| /** |
| * @param cancel {@code True} to close with cancellation. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void closeEx(boolean cancel) throws IgniteCheckedException { |
| IgniteCheckedException err = closeEx(cancel, null); |
| |
| if (err != null) |
| throw err; // Throws at close(). |
| } |
| |
| /** |
| * @param cancel {@code True} to close with cancellation. |
| * @param err Error. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private IgniteCheckedException closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException { |
| if (!closed.compareAndSet(false, true)) |
| return null; |
| |
| busyLock.block(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']'); |
| |
| try { |
| // Assuming that no methods are called on this loader after this method is called. |
| if (cancel) { |
| cancelled = true; |
| |
| for (Buffer buf : bufMappings.values()) |
| buf.cancelAll(err); |
| } |
| else |
| doFlush(); |
| |
| ctx.event().removeLocalEventListener(discoLsnr); |
| |
| ctx.io().removeMessageListener(topic); |
| } |
| catch (IgniteCheckedException | IgniteDataStreamerTimeoutException e) { |
| fut.onDone(e); |
| throw e; |
| } |
| |
| long failed = failCntr.longValue(); |
| |
| if (failed > 0 && err == null) |
| err = new IgniteCheckedException("Some of DataStreamer operations failed [failedCount=" + failed + "]"); |
| |
| fut.onDone(err); |
| |
| return err; |
| } |
| |
| /** |
| * @param reconnectFut Reconnect future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { |
| IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, |
| "Data streamer has been closed, client node disconnected."); |
| |
| disconnectErr = (CacheException)CU.convertToCacheException(err); |
| |
| for (Buffer buf : bufMappings.values()) |
| buf.cancelAll(err); |
| |
| closeEx(true, err); |
| } |
| |
| /** |
| * @return {@code true} If the loader is closed. |
| */ |
| boolean isClosed() { |
| return fut.isDone(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() throws CacheException { |
| close(false); |
| } |
| |
| /** |
| * @return Max remap count. |
| */ |
| public int maxRemapCount() { |
| return maxRemapCnt; |
| } |
| |
| /** |
| * @param maxRemapCnt New max remap count. |
| */ |
| public void maxRemapCount(int maxRemapCnt) { |
| this.maxRemapCnt = maxRemapCnt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DataStreamerImpl.class, this); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getDelay(TimeUnit unit) { |
| return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * @return Next flush time. |
| */ |
| private long nextFlushTime() { |
| return lastFlushTime + autoFlushFreq; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int compareTo(Delayed o) { |
| return nextFlushTime() > ((DataStreamerImpl)o).nextFlushTime() ? 1 : -1; |
| } |
| |
| /** |
| * Check permissions for streaming. |
| * |
| * @param perm Security permission. |
| * @throws org.apache.ignite.plugin.security.SecurityException If permissions are not enough for streaming. |
| */ |
| private void checkSecurityPermission(SecurityPermission perm) |
| throws org.apache.ignite.plugin.security.SecurityException { |
| if (!ctx.security().enabled()) |
| return; |
| |
| ctx.security().authorize(cacheName, perm, null); |
| } |
| |
/**
 * Buffer of entries destined for a single node: accumulates entries and submits them in batches.
 */
| private class Buffer { |
/** Node this buffer collects entries for. */
private final ClusterNode node;

/** Futures of update jobs started on the local node that have not finished yet. */
private final Collection<IgniteInternalFuture<Object>> locFuts;

/** Buffered entries of the batch that is currently being accumulated. */
private List<DataStreamerEntry> entries;

/** Future of the batch that is currently being accumulated. */
@GridToStringExclude
private GridFutureAdapter<Object> curFut;

/** Local node flag. */
private final boolean isLocNode;

/** Generator of request IDs. */
private final AtomicLong idGen = new AtomicLong();

/** Futures of requests sent to the node, keyed by request ID. */
private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;

/** Semaphore limiting the number of parallel operations; a permit is released by {@link #signalC}. */
private final Semaphore sem;

/** Topology version of the first update of the current batch, {@code null} after the batch was renewed. */
private AffinityTopologyVersion batchTopVer;

/** Closure to signal on task finish. */
@GridToStringExclude
private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
    @Override public void apply(IgniteInternalFuture<Object> t) {
        signalTaskFinished(t);
    }
};
| |
/**
 * Creates an empty buffer for the given node.
 *
 * @param node Node.
 */
Buffer(ClusterNode node) {
    assert node != null;

    this.node = node;

    // Cache local node flag.
    isLocNode = node.equals(ctx.discovery().localNode());

    sem = new Semaphore(parallelOps);

    locFuts = new GridConcurrentHashSet<>();
    reqs = new ConcurrentHashMap8<>();

    // Start the first batch; its future releases a permit when it completes.
    entries = newEntries();
    curFut = new GridFutureAdapter<>();
    curFut.listen(signalC);
}
| |
| /** |
| * @param remap Remapping flag. |
| */ |
| private void renewBatch(boolean remap) { |
| entries = newEntries(); |
| curFut = new GridFutureAdapter<>(); |
| |
| batchTopVer = null; |
| |
| if (!remap) |
| curFut.listen(signalC); |
| } |
| |
| /** |
| * @param newEntries Infos. |
| * @param topVer Topology version. |
| * @param lsnr Listener for the operation future. |
| * @param remap Remapping flag. |
| * @return Future for operation. |
| * @throws IgniteInterruptedCheckedException If failed. |
| */ |
| @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries, |
| AffinityTopologyVersion topVer, |
| IgniteInClosure<IgniteInternalFuture<?>> lsnr, |
| boolean remap) throws IgniteInterruptedCheckedException { |
| List<DataStreamerEntry> entries0 = null; |
| |
| GridFutureAdapter<Object> curFut0; |
| |
| AffinityTopologyVersion curBatchTopVer; |
| |
| synchronized (this) { |
| curFut0 = curFut; |
| |
| curFut0.listen(lsnr); |
| |
| if (batchTopVer == null) |
| batchTopVer = topVer; |
| |
| curBatchTopVer = batchTopVer; |
| |
| for (DataStreamerEntry entry : newEntries) |
| entries.add(entry); |
| |
| if (entries.size() >= bufSize) { |
| entries0 = entries; |
| |
| renewBatch(remap); |
| } |
| } |
| |
| if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) { |
| renewBatch(remap); |
| |
| curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." + |
| "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]")); |
| } |
| else if (entries0 != null) { |
| submit(entries0, curBatchTopVer, curFut0, remap); |
| |
| if (cancelled) |
| curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + |
| DataStreamerImpl.this)); |
| else if (ctx.clientDisconnected()) |
| curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), |
| "Client node disconnected.")); |
| } |
| |
| return curFut0; |
| } |
| |
| /** |
| * @return Fresh collection with some space for outgrowth. |
| */ |
| private List<DataStreamerEntry> newEntries() { |
| return new ArrayList<>((int)(bufSize * 1.2)); |
| } |
| |
/**
 * Submits the accumulated entries, if any, and collects all in-flight local and remote
 * operation futures of this buffer into one compound future.
 *
 * @return Future if any submitted.
 * @throws IgniteInterruptedCheckedException If thread has been interrupted.
 */
@Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
    acquireRemapSemaphore();

    List<DataStreamerEntry> batch = null;
    GridFutureAdapter<Object> batchFut = null;

    synchronized (this) {
        if (!entries.isEmpty()) {
            batch = entries;
            batchFut = curFut;

            // Start a new batch in place of the one being flushed.
            entries = newEntries();
            curFut = new GridFutureAdapter<>();
            curFut.listen(signalC);
        }
    }

    if (batch != null)
        submit(batch, batchTopVer, batchFut, false);

    // Create compound future for this flush.
    GridCompoundFuture<Object, Object> compound = null;

    for (IgniteInternalFuture<Object> locFut : locFuts) {
        if (compound == null)
            compound = new GridCompoundFuture<>();

        compound.add(locFut);
    }

    for (IgniteInternalFuture<Object> reqFut : reqs.values()) {
        if (compound == null)
            compound = new GridCompoundFuture<>();

        compound.add(reqFut);
    }

    if (compound != null)
        compound.markInitialized();

    return compound;
}
| |
| /** |
| * Increments active tasks count. |
| * |
| * @throws IgniteInterruptedCheckedException If thread has been interrupted. |
| */ |
| private void incrementActiveTasks() throws IgniteInterruptedCheckedException { |
| if (timeout == DFLT_UNLIMIT_TIMEOUT) |
| U.acquire(sem); |
| else if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to add parallel operation."); |
| |
| throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout when starts parallel operation."); |
| } |
| } |
| |
| /** |
| * @param f Future that finished. |
| */ |
| private void signalTaskFinished(IgniteInternalFuture<Object> f) { |
| assert f != null; |
| |
| sem.release(); |
| } |
| |
/**
 * Applies a batch on the local node by running a {@code DataStreamerUpdateJob} through the
 * closure processor. Any throwable raised while preparing the update completes {@code curFut}
 * with an error.
 *
 * @param entries Entries.
 * @param reqTopVer Request topology version.
 * @param curFut Current future.
 */
private void localUpdate(final Collection<DataStreamerEntry> entries,
    final AffinityTopologyVersion reqTopVer,
    final GridFutureAdapter<Object> curFut) {
    try {
        GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();

        final boolean allowOverwrite = allowOverwrite();
        final boolean loc = cctx.isLocal();

        // Topology read lock is taken only for non-local caches when overwrite is not allowed.
        if (!loc && !allowOverwrite)
            cctx.topology().readLock();

        try {
            GridDhtTopologyFuture fut = loc ? null : cctx.topologyVersionFuture();

            AffinityTopologyVersion topVer = loc ? reqTopVer : fut.topologyVersion();

            // Topology differs from the one the batch was prepared for: fail the batch future.
            if (!allowOverwrite && !topVer.equals(reqTopVer)) {
                curFut.onDone(new IgniteCheckedException(
                    "DataStreamer will retry data transfer at stable topology. " +
                        "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]"));
            }
            else if (loc || allowOverwrite || fut.isDone()) {
                IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe(
                    new DataStreamerUpdateJob(
                        ctx,
                        log,
                        cacheName,
                        entries,
                        false,
                        skipStore,
                        keepBinary,
                        rcvr),
                    false);

                // Track the job so that flush() and cancelAll() can see it.
                locFuts.add(callFut);

                final GridFutureAdapter waitFut = (loc || allowOverwrite) ?
                    null :
                    cctx.mvcc().addDataStreamerFuture(topVer);

                callFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
                    @Override public void apply(IgniteInternalFuture<Object> t) {
                        try {
                            boolean rmv = locFuts.remove(t);

                            assert rmv;

                            // Propagate the job result or its failure to the batch future.
                            curFut.onDone(t.get());
                        }
                        catch (IgniteCheckedException e) {
                            curFut.onDone(e);
                        }
                        finally {
                            if (waitFut != null)
                                waitFut.onDone();
                        }
                    }
                });
            }
            else {
                // Topology future is not done yet: retry this update once it completes.
                fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
                        localUpdate(entries, reqTopVer, curFut);
                    }
                });
            }
        }
        finally {
            // Mirrors the conditional readLock() above.
            if (!loc && !allowOverwrite)
                cctx.topology().readUnlock();
        }
    }
    catch (Throwable ex) {
        curFut.onDone(new IgniteCheckedException("DataStreamer data handling failed.", ex));
    }
}
| |
| /** |
| * @param entries Entries to submit. |
| * @param topVer Topology version. |
| * @param curFut Current future. |
| * @param remap Remapping flag. |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| private void submit(final Collection<DataStreamerEntry> entries, |
| @Nullable AffinityTopologyVersion topVer, |
| final GridFutureAdapter<Object> curFut, |
| boolean remap) |
| throws IgniteInterruptedCheckedException { |
| assert entries != null; |
| assert !entries.isEmpty(); |
| assert curFut != null; |
| |
| if (!remap) { |
| try { |
| incrementActiveTasks(); |
| } |
| catch (IgniteDataStreamerTimeoutException e) { |
| curFut.onDone(e); |
| |
| throw e; |
| } |
| } |
| |
| IgniteInternalFuture<Object> fut; |
| |
| Byte plc = ioPlcRslvr.apply(node); |
| |
| if (plc == null) |
| plc = PUBLIC_POOL; |
| |
| if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) |
| localUpdate(entries, topVer, curFut); |
| else { |
| try { |
| for (DataStreamerEntry e : entries) { |
| e.getKey().prepareMarshal(cacheObjCtx); |
| |
| CacheObject val = e.getValue(); |
| |
| if (val != null) |
| val.prepareMarshal(cacheObjCtx); |
| } |
| |
| if (updaterBytes == null) { |
| assert rcvr != null; |
| |
| updaterBytes = U.marshal(ctx, rcvr); |
| } |
| |
| if (topicBytes == null) |
| topicBytes = U.marshal(ctx, topic); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to marshal (request will not be sent).", e); |
| |
| return; |
| } |
| |
| GridDeployment dep = null; |
| GridPeerDeployAware jobPda0 = null; |
| |
| jobPda0 = jobPda; |
| |
| if (ctx.deploy().enabled() && jobPda0 != null) { |
| try { |
| dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader()); |
| |
| GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); |
| |
| if (cache != null) |
| cache.context().deploy().onEnter(); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); |
| |
| return; |
| } |
| |
| if (dep == null) |
| U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass()); |
| } |
| |
| long reqId = idGen.incrementAndGet(); |
| |
| fut = curFut; |
| |
| reqs.put(reqId, (GridFutureAdapter<Object>)fut); |
| |
| if (topVer == null) |
| topVer = ctx.cache().context().exchange().readyAffinityVersion(); |
| |
| DataStreamerRequest req = new DataStreamerRequest( |
| reqId, |
| topicBytes, |
| cacheName, |
| updaterBytes, |
| entries, |
| true, |
| skipStore, |
| keepBinary, |
| dep != null ? dep.deployMode() : null, |
| dep != null ? jobPda0.deployClass().getName() : null, |
| dep != null ? dep.userVersion() : null, |
| dep != null ? dep.participants() : null, |
| dep != null ? dep.classLoaderId() : null, |
| dep == null, |
| topVer); |
| |
| try { |
| ctx.io().send(node, TOPIC_DATASTREAM, req, plc); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut); |
| |
| try { |
| if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) |
| fut0.onDone(e); |
| else |
| fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " |
| + node.id())); |
| } |
| catch (IgniteClientDisconnectedCheckedException e0) { |
| fut0.onDone(e0); |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| void onNodeLeft() { |
| assert !isLocNode; |
| assert bufMappings.get(node.id()) != this; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Forcibly completing futures (node has left): " + node.id()); |
| |
| Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " + |
| "(node has left): " + node.id()); |
| |
| for (GridFutureAdapter<Object> f : reqs.values()) |
| f.onDone(e); |
| |
| // Make sure to complete current future. |
| GridFutureAdapter<Object> curFut0; |
| |
| synchronized (this) { |
| curFut0 = curFut; |
| } |
| |
| curFut0.onDone(e); |
| } |
| |
| /** |
| * @param res Response. |
| * @param nodeId Node id. |
| */ |
| void onResponse(DataStreamerResponse res, UUID nodeId) { |
| if (log.isDebugEnabled()) |
| log.debug("Received data load response: " + res); |
| |
| GridFutureAdapter<?> f = reqs.remove(res.requestId()); |
| |
| if (f == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Future for request has not been found: " + res.requestId()); |
| |
| return; |
| } |
| |
| Throwable err = null; |
| |
| byte[] errBytes = res.errorBytes(); |
| |
| if (errBytes != null) { |
| try { |
| GridPeerDeployAware jobPda0 = jobPda; |
| |
| err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]", |
| (Throwable)U.unmarshal(ctx, |
| errBytes, |
| U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()))); |
| } |
| catch (IgniteCheckedException e) { |
| f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); |
| |
| return; |
| } |
| } |
| |
| f.onDone(null, err); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']'); |
| } |
| |
| /** |
| * @param err Error. |
| */ |
| void cancelAll(@Nullable IgniteCheckedException err) { |
| if (err == null) |
| err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this); |
| |
| for (IgniteInternalFuture<?> f : locFuts) { |
| try { |
| f.cancel(); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to cancel mini-future.", e); |
| } |
| } |
| |
| for (GridFutureAdapter<?> f : reqs.values()) |
| f.onDone(err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| int size; |
| |
| synchronized (this) { |
| size = entries.size(); |
| } |
| |
| return S.toString(Buffer.class, this, |
| "entriesCnt", size, |
| "locFutsSize", locFuts.size(), |
| "reqsSize", reqs.size()); |
| } |
| } |
| |
| /** |
| * Data streamer peer-deploy aware. |
| */ |
| private class DataStreamerPda implements GridPeerDeployAware { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Deploy class. */ |
| private Class<?> cls; |
| |
| /** Class loader. */ |
| private ClassLoader ldr; |
| |
| /** Collection of objects to detect deploy class and class loader. */ |
| private Collection<Object> objs; |
| |
| /** |
| * Constructs data streamer peer-deploy aware. |
| * |
| * @param objs Collection of objects to detect deploy class and class loader. |
| */ |
| private DataStreamerPda(Object... objs) { |
| this.objs = Arrays.asList(objs); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Class<?> deployClass() { |
| if (cls == null) { |
| Class<?> cls0 = null; |
| |
| if (depCls != null) |
| cls0 = depCls; |
| else { |
| for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext(); ) { |
| Object o = it.next(); |
| |
| if (o != null) |
| cls0 = U.detectClass(o); |
| } |
| |
| if (cls0 == null || U.isJdk(cls0)) |
| cls0 = DataStreamerImpl.class; |
| } |
| |
| assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']'; |
| |
| cls = cls0; |
| } |
| |
| return cls; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClassLoader classLoader() { |
| if (ldr == null) { |
| ClassLoader ldr0 = deployClass().getClassLoader(); |
| |
| // Safety. |
| if (ldr0 == null) |
| ldr0 = U.gridClassLoader(); |
| |
| assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']'; |
| |
| ldr = ldr0; |
| } |
| |
| return ldr; |
| } |
| } |
| |
/**
 * Isolated receiver which only loads entry initial value.
 */
protected static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
    DataStreamerCacheUpdaters.InternalUpdater {
    /** */
    private static final long serialVersionUID = 0L;

    /**
     * {@inheritDoc}
     * <p>
     * For every entry sets the initial value on the internal cache entry. Entries whose creation
     * TTL resolves to {@code CU.TTL_ZERO} are skipped. Invalid partition and removed entry
     * conditions are ignored; other checked failures are logged and do not stop the loop.
     */
    @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache,
        Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
        IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;

        GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();

        // For a near cache work with its DHT cache.
        if (internalCache.isNear())
            internalCache = internalCache.context().near().dht();

        GridCacheContext cctx = internalCache.context();

        // Local caches take the version from affinity, others use the ready exchange version.
        AffinityTopologyVersion topVer = cctx.isLocal() ?
            cctx.affinity().affinityTopologyVersion() :
            cctx.shared().exchange().readyAffinityVersion();

        GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();

        // Defaults used when the cache has no expiry policy.
        long ttl = CU.TTL_ETERNAL;
        long expiryTime = CU.EXPIRE_TIME_ETERNAL;

        ExpiryPolicy plc = cctx.expiry();

        for (Entry<KeyCacheObject, CacheObject> e : entries) {
            try {
                e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());

                GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);

                if (plc != null) {
                    ttl = CU.toTtl(plc.getExpiryForCreation());

                    // Zero TTL: the entry is not loaded at all.
                    if (ttl == CU.TTL_ZERO)
                        continue;
                    else if (ttl == CU.TTL_NOT_CHANGED)
                        ttl = 0;

                    expiryTime = CU.toExpireTime(ttl);
                }

                // Primary flag selects the DR type passed to initialValue() below.
                boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer);

                entry.initialValue(e.getValue(),
                    ver,
                    ttl,
                    expiryTime,
                    false,
                    topVer,
                    primary ? GridDrType.DR_LOAD : GridDrType.DR_PRELOAD,
                    false);

                cctx.evicts().touch(entry, topVer);

                CU.unwindEvicts(cctx);

                entry.onUnlock();
            }
            catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
                // No-op.
            }
            catch (IgniteCheckedException ex) {
                IgniteLogger log = cache.unwrap(Ignite.class).log();

                U.error(log, "Failed to set initial value for cache entry: " + e, ex);
            }
        }
    }
}
| |
| /** |
| * Default IO policy resolver. |
| */ |
| private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** {@inheritDoc} */ |
| @Override public Byte apply(ClusterNode gridNode) { |
| return PUBLIC_POOL; |
| } |
| } |
| |
| /** |
| * Key object wrapper. Using identity equals prevents slow down in case of hash code collision. |
| */ |
| private static class KeyCacheObjectWrapper { |
| /** key object */ |
| private final KeyCacheObject key; |
| |
| /** |
| * Constructor |
| * |
| * @param key key object |
| */ |
| KeyCacheObjectWrapper(KeyCacheObject key) { |
| assert key != null; |
| |
| this.key = key; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| return o instanceof KeyCacheObjectWrapper && this.key == ((KeyCacheObjectWrapper)o).key; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return key.hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(KeyCacheObjectWrapper.class, this); |
| } |
| } |
| } |