blob: b472275f5e28ea7616686227df165b8a18ca376d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget;
import org.apache.ignite.lang.IgniteFuture;
/**
 * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
 * <p>
 * Each entry point resolves the memory pointers it receives into {@link PlatformMemory} chunks through the
 * platform context, builds binary readers/writers over them, delegates to the matching {@code process*}
 * method of the underlying target and closes the chunks afterwards. Exceptions thrown while resolving
 * memory or delegating to the target are passed through {@code PlatformTarget.convertException} before
 * being rethrown.
 */
public class PlatformTargetProxyImpl implements PlatformTargetProxy {
    /** Context. */
    protected final PlatformContext platformCtx;

    /** Underlying target. */
    private final PlatformTarget target;

    /**
     * Creates a proxy around the given target. Both arguments must be non-null; this is checked with
     * assertions only.
     *
     * @param target Platform target.
     * @param platformCtx Platform context.
     */
    public PlatformTargetProxyImpl(PlatformTarget target, PlatformContext platformCtx) {
        assert platformCtx != null;
        assert target != null;

        this.platformCtx = platformCtx;
        this.target = target;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Delegates to {@code target.processInLongOutLong(type, val)}.
     */
    @Override public long inLongOutLong(int type, long val) throws Exception {
        try {
            return target.processInLongOutLong(type, val);
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The memory chunk at {@code memPtr} is wrapped in a reader and handed, together with the chunk itself,
     * to {@code target.processInStreamOutLong}.
     */
    @Override public long inStreamOutLong(int type, long memPtr) throws Exception {
        try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
            BinaryRawReaderEx reader = platformCtx.reader(mem);

            return target.processInStreamOutLong(type, reader, mem);
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * A zero {@code memPtr} means "no input": no memory is resolved and the target receives a {@code null}
     * reader. The target's result is wrapped in a new proxy, or returned as {@code null} if the target
     * produced {@code null}.
     */
    @Override public Object inStreamOutObject(int type, long memPtr) throws Exception {
        // try-with-resources skips close() for a resource that was initialized to null.
        try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) {
            BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null;

            return wrapProxy(target.processInStreamOutObject(type, reader));
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Lets the target write into the memory chunk at {@code memPtr} and synchronizes the output stream
     * afterwards.
     */
    @Override public void outStream(int type, long memPtr) throws Exception {
        try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
            PlatformOutputStream out = mem.output();
            BinaryRawWriterEx writer = platformCtx.writer(out);

            target.processOutStream(type, writer);

            // NOTE(review): presumably publishes the written data to the memory chunk - confirm against
            // PlatformOutputStream. Must stay after the target has finished writing.
            out.synchronize();
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The target's result is wrapped in a new proxy, or returned as {@code null} if the target produced
     * {@code null}.
     */
    @Override public Object outObject(int type) throws Exception {
        try {
            return wrapProxy(target.processOutObject(type));
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Starts the asynchronous operation and discards the resulting listenable.
     */
    @Override public void inStreamAsync(int type, long memPtr) throws Exception {
        inStreamOutListenableAsync(type, memPtr);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Starts the asynchronous operation and returns a proxy over a {@link PlatformListenableTarget} built
     * from the resulting listenable.
     */
    @Override public Object inStreamOutObjectAsync(int type, long memPtr) throws Exception {
        PlatformListenable listenable = inStreamOutListenableAsync(type, memPtr);

        // Named distinctly so that it does not shadow the 'target' field.
        PlatformListenableTarget listenableTarget = new PlatformListenableTarget(listenable, platformCtx);

        return wrapProxy(listenableTarget);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the request from the chunk at {@code inMemPtr}, lets the target write the response into the
     * chunk at {@code outMemPtr} and synchronizes the output stream afterwards.
     */
    @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
        try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
            BinaryRawReaderEx reader = platformCtx.reader(inMem);

            try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) {
                PlatformOutputStream out = outMem.output();
                BinaryRawWriterEx writer = platformCtx.writer(out);

                target.processInStreamOutStream(type, reader, writer);

                out.synchronize();
            }
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Both memory pointers are optional: a zero pointer means the corresponding reader or writer passed to
     * the target is {@code null}. {@code arg} is unwrapped from its proxy before being handed to the target,
     * and the target's result is wrapped in a new proxy ({@code null} stays {@code null}).
     * <p>
     * Memory chunks are managed with try-with-resources, structured like
     * {@link #inStreamOutStream(int, long, long)}: the output chunk is closed before the input chunk, and a
     * failure while closing goes through exception conversion like any other failure.
     */
    @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
        throws Exception {
        // try-with-resources skips close() for a resource that was initialized to null.
        try (PlatformMemory inMem = inMemPtr != 0 ? platformCtx.memory().get(inMemPtr) : null) {
            BinaryRawReaderEx reader = inMem != null ? platformCtx.reader(inMem) : null;

            try (PlatformMemory outMem = outMemPtr != 0 ? platformCtx.memory().get(outMemPtr) : null) {
                PlatformOutputStream out = outMem != null ? outMem.output() : null;
                BinaryRawWriterEx writer = out != null ? platformCtx.writer(out) : null;

                PlatformTarget res =
                    target.processInObjectStreamOutObjectStream(type, unwrapProxy(arg), reader, writer);

                if (out != null)
                    out.synchronize();

                return wrapProxy(res);
            }
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public PlatformTarget unwrap() {
        return target;
    }

    /**
     * Wraps a target in a new proxy that shares this proxy's platform context.
     *
     * @param obj Target to wrap, possibly {@code null}.
     * @return New proxy around {@code obj}, or {@code null} if {@code obj} is {@code null}.
     */
    private Object wrapProxy(PlatformTarget obj) {
        return obj == null ? null : new PlatformTargetProxyImpl(obj, platformCtx);
    }

    /**
     * Extracts the underlying target from a proxy.
     * <p>
     * Goes through {@link PlatformTargetProxy#unwrap()} instead of casting to this concrete class, so any
     * {@link PlatformTargetProxy} implementation is accepted.
     *
     * @param obj Proxy to unwrap, possibly {@code null}. Must be a {@link PlatformTargetProxy} otherwise.
     * @return Underlying target, or {@code null} if {@code obj} is {@code null}.
     */
    private PlatformTarget unwrapProxy(Object obj) {
        return obj == null ? null : ((PlatformTargetProxy)obj).unwrap();
    }

    /**
     * Performs asynchronous operation.
     * <p>
     * The stream at {@code memPtr} starts with a future ID ({@code long}) followed by a future type
     * ({@code int}); the rest of the stream is consumed by {@code target.processInStreamAsync}. The future
     * produced by the target is then registered with {@link PlatformFutureUtils}.
     *
     * @param type Type.
     * @param memPtr Stream pointer.
     * @return Listenable.
     * @throws Exception On error, including when the target returns a {@code null} result or a result
     *      without a future; the exception is converted by the target before being rethrown.
     */
    private PlatformListenable inStreamOutListenableAsync(int type, long memPtr) throws Exception {
        try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
            BinaryRawReaderEx reader = platformCtx.reader(mem);

            // The header must be read before the reader is handed over to the target.
            long futId = reader.readLong();
            int futTyp = reader.readInt();

            final PlatformAsyncResult res = target.processInStreamAsync(type, reader);

            if (res == null)
                throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");

            IgniteFuture fut = res.future();

            if (fut == null)
                throw new IgniteException("PlatformAsyncResult.future() should not return null.");

            return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
                /** {@inheritDoc} */
                @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
                    // Result serialization is delegated to the async result supplied by the target.
                    res.write(writer, obj);
                }

                /** {@inheritDoc} */
                @Override public boolean canWrite(Object obj, Throwable err) {
                    // This writer only handles successful completion.
                    return err == null;
                }
            }, target);
        }
        catch (Exception e) {
            throw target.convertException(e);
        }
    }
}