blob: f5cf9a222ae327ef89b3647f6f864f9c5de5719f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.datastreamer;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
/**
 * Interop data streamer wrapper.
 * <p>
 * Translates integer-coded platform operations into calls on the wrapped {@link DataStreamerImpl} and,
 * on request, forwards topology changes for the streamer's cache to the platform gateway.
 */
@SuppressWarnings({"unchecked"})
public class PlatformDataStreamer extends PlatformAbstractTarget {
    /** Policy: continue. */
    private static final int PLC_CONTINUE = 0;

    /** Policy: close. */
    private static final int PLC_CLOSE = 1;

    /** Policy: cancel and close. */
    private static final int PLC_CANCEL_CLOSE = 2;

    /** Policy: do flush. */
    private static final int PLC_FLUSH = 3;

    /** Operation: add a batch of entries and apply one of the {@code PLC_*} policies. */
    private static final int OP_UPDATE = 1;

    /** Operation: set stream receiver. */
    private static final int OP_RECEIVER = 2;

    /** Operation: get allow-overwrite flag. */
    private static final int OP_ALLOW_OVERWRITE = 3;

    /** Operation: set allow-overwrite flag. */
    private static final int OP_SET_ALLOW_OVERWRITE = 4;

    /** Operation: get skip-store flag. */
    private static final int OP_SKIP_STORE = 5;

    /** Operation: set skip-store flag. */
    private static final int OP_SET_SKIP_STORE = 6;

    /** Operation: get per-node buffer size. */
    private static final int OP_PER_NODE_BUFFER_SIZE = 7;

    /** Operation: set per-node buffer size. */
    private static final int OP_SET_PER_NODE_BUFFER_SIZE = 8;

    /** Operation: get per-node parallel operations. */
    private static final int OP_PER_NODE_PARALLEL_OPS = 9;

    /** Operation: set per-node parallel operations. */
    private static final int OP_SET_PER_NODE_PARALLEL_OPS = 10;

    /** Operation: subscribe to topology updates. */
    private static final int OP_LISTEN_TOPOLOGY = 11;

    /** Operation: get timeout. */
    private static final int OP_GET_TIMEOUT = 12;

    /** Operation: set timeout. */
    private static final int OP_SET_TIMEOUT = 13;

    /** Operation: get per-thread buffer size. */
    private static final int OP_PER_THREAD_BUFFER_SIZE = 14;

    /** Operation: set per-thread buffer size. */
    private static final int OP_SET_PER_THREAD_BUFFER_SIZE = 15;

    /** Cache name. */
    private final String cacheName;

    /** Data streamer. */
    private final DataStreamerImpl ldr;

    /** Binary flag. */
    private final boolean keepBinary;

    /** Topology update event listener, {@code null} until {@link #OP_LISTEN_TOPOLOGY} is processed. */
    private volatile GridLocalEventListener lsnr;

    /**
     * Constructor.
     *
     * @param platformCtx Context.
     * @param cacheName Cache name, used to resolve the cache nodes reported in topology updates.
     * @param ldr Data streamer.
     * @param keepBinary Binary flag, passed through when a stream receiver is created.
     */
    public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataStreamerImpl ldr,
        boolean keepBinary) {
        super(platformCtx);

        this.cacheName = cacheName;
        this.ldr = ldr;
        this.keepBinary = keepBinary;
    }

    /**
     * Handles operations whose arguments arrive in a binary stream: {@link #OP_UPDATE} and
     * {@link #OP_RECEIVER}. Any other operation type is delegated to the superclass.
     *
     * @param type Operation type.
     * @param reader Reader positioned at the operation arguments.
     * @return {@code TRUE} for the operations handled here, otherwise the superclass result.
     * @throws IgniteCheckedException If failed.
     */
    @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
        switch (type) {
            case OP_UPDATE: {
                int plc = reader.readInt();

                if (plc == PLC_CANCEL_CLOSE) {
                    // Close with cancel: nothing else is read from the stream for this policy.
                    removeTopologyListener();

                    ldr.close(true);
                }
                else {
                    final long futPtr = reader.readLong();

                    int valsCnt = reader.readInt();

                    if (valsCnt > 0) {
                        Collection<GridMapEntry> vals = new ArrayList<>(valsCnt);

                        // Each entry is written as two consecutive objects: key first, then value.
                        for (int i = 0; i < valsCnt; i++)
                            vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));

                        PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
                            PlatformFutureUtils.TYP_OBJ, this);
                    }

                    if (plc == PLC_CLOSE) {
                        removeTopologyListener();

                        ldr.close(false);
                    }
                    else if (plc == PLC_FLUSH)
                        ldr.tryFlush();
                    else
                        assert plc == PLC_CONTINUE;
                }

                return TRUE;
            }

            case OP_RECEIVER: {
                long ptr = reader.readLong();

                Object rec = reader.readObjectDetached();

                ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepBinary));

                return TRUE;
            }

            default:
                return super.processInStreamOutLong(type, reader);
        }
    }

    /**
     * Handles operations that take a single {@code long} argument: property getters and setters and
     * {@link #OP_LISTEN_TOPOLOGY}. Any other operation type is delegated to the superclass.
     *
     * @param type Operation type.
     * @param val Operation argument; ignored by getters, narrowed to {@code int} by the size setters.
     * @return Requested property value for getters, {@code TRUE} for setters and topology subscription,
     *      otherwise the superclass result.
     * @throws IgniteCheckedException If failed.
     */
    @Override public long processInLongOutLong(int type, final long val) throws IgniteCheckedException {
        switch (type) {
            case OP_SET_ALLOW_OVERWRITE:
                ldr.allowOverwrite(val == TRUE);

                return TRUE;

            case OP_SET_PER_NODE_BUFFER_SIZE:
                ldr.perNodeBufferSize((int) val);

                return TRUE;

            case OP_SET_PER_THREAD_BUFFER_SIZE:
                ldr.perThreadBufferSize((int) val);

                return TRUE;

            case OP_SET_SKIP_STORE:
                ldr.skipStore(val == TRUE);

                return TRUE;

            case OP_SET_PER_NODE_PARALLEL_OPS:
                ldr.perNodeParallelOperations((int) val);

                return TRUE;

            case OP_LISTEN_TOPOLOGY:
                listenTopology(val);

                return TRUE;

            case OP_ALLOW_OVERWRITE:
                return ldr.allowOverwrite() ? TRUE : FALSE;

            case OP_PER_NODE_BUFFER_SIZE:
                return ldr.perNodeBufferSize();

            case OP_PER_THREAD_BUFFER_SIZE:
                return ldr.perThreadBufferSize();

            case OP_SKIP_STORE:
                return ldr.skipStore() ? TRUE : FALSE;

            case OP_PER_NODE_PARALLEL_OPS:
                return ldr.perNodeParallelOperations();

            case OP_GET_TIMEOUT:
                return ldr.timeout();

            case OP_SET_TIMEOUT:
                ldr.timeout(val);

                return TRUE;
        }

        return super.processInLongOutLong(type, val);
    }

    /**
     * Subscribes to node join/fail/leave events and reports the topology version and the number of cache
     * nodes to the platform gateway: once immediately, and then on every such event.
     *
     * @param ptr Value passed back to the gateway with every topology update.
     * @throws IgniteCheckedException If the last topology future fails.
     */
    private void listenTopology(final long ptr) throws IgniteCheckedException {
        // A repeated subscription would otherwise leave the previous listener registered forever,
        // because only the most recent one is remembered in the field.
        removeTopologyListener();

        GridLocalEventListener newLsnr = new GridLocalEventListener() {
            @Override public void onEvent(Event evt) {
                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

                long evtTopVer = discoEvt.topologyVersion();

                int nodesCnt = platformCtx.kernalContext().discovery().cacheNodes(
                    cacheName, new AffinityTopologyVersion(evtTopVer)).size();

                platformCtx.gateway().dataStreamerTopologyUpdate(ptr, evtTopVer, nodesCnt);
            }
        };

        lsnr = newLsnr;

        platformCtx.kernalContext().event().addLocalEventListener(newLsnr,
            EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);

        GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();

        AffinityTopologyVersion topVer =
            platformCtx.kernalContext().cache().context().exchange().lastTopologyFuture().get();

        int topSize = discoMgr.cacheNodes(cacheName, topVer).size();

        // Push the current state right away so the platform side does not wait for the next event.
        platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer.topologyVersion(), topSize);
    }

    /**
     * Unregisters the topology listener, if one was registered, and forgets it.
     * Does nothing when {@link #OP_LISTEN_TOPOLOGY} was never processed, so a {@code null} listener
     * is never handed to the event manager.
     */
    private void removeTopologyListener() {
        GridLocalEventListener curLsnr = lsnr;

        if (curLsnr != null) {
            lsnr = null;

            platformCtx.kernalContext().event().removeLocalEventListener(curLsnr);
        }
    }

    /**
     * @return Allow-overwrite flag.
     */
    public boolean allowOverwrite() {
        return ldr.allowOverwrite();
    }

    /**
     * @param val Allow-overwrite flag.
     */
    public void allowOverwrite(boolean val) {
        ldr.allowOverwrite(val);
    }

    /**
     * @return Skip store flag.
     */
    public boolean skipStore() {
        return ldr.skipStore();
    }

    /**
     * @param skipStore Skip store flag.
     */
    public void skipStore(boolean skipStore) {
        ldr.skipStore(skipStore);
    }

    /**
     * @return Per-node buffer size.
     */
    public int perNodeBufferSize() {
        return ldr.perNodeBufferSize();
    }

    /**
     * @param val Per-node buffer size.
     */
    public void perNodeBufferSize(int val) {
        ldr.perNodeBufferSize(val);
    }

    /**
     * @return Per-node parallel load operations.
     */
    public int perNodeParallelOperations() {
        return ldr.perNodeParallelOperations();
    }

    /**
     * @param val Per-node parallel load operations.
     */
    public void perNodeParallelOperations(int val) {
        ldr.perNodeParallelOperations(val);
    }
}