blob: d56656ec0aac214001913a5d6572a2f0200745e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
/// <summary>
/// Base class for interop targets, provides additional functionality over <see cref="IPlatformTargetInternal"/>.
/// </summary>
[SuppressMessage("ReSharper", "LocalVariableHidesMember")]
internal class PlatformTargetAdapter
{
/** */
internal const int False = 0;
/** */
internal const int True = 1;
/** */
internal const int Error = -1;
/** */
private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap
= new Dictionary<Type, FutureType>
{
{typeof(bool), FutureType.Bool},
{typeof(byte), FutureType.Byte},
{typeof(char), FutureType.Char},
{typeof(double), FutureType.Double},
{typeof(float), FutureType.Float},
{typeof(int), FutureType.Int},
{typeof(long), FutureType.Long},
{typeof(short), FutureType.Short}
};
/** Unmanaged target. */
private readonly IPlatformTargetInternal _target;
/** Marshaller. */
private readonly Marshaller _marsh;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
protected PlatformTargetAdapter(IPlatformTargetInternal target)
{
Debug.Assert(target != null);
_target = target;
_marsh = target.Marshaller;
}
/// <summary>
/// Unmanaged target.
/// </summary>
public IPlatformTargetInternal Target
{
get { return _target; }
}
/// <summary>
/// Marshaller.
/// </summary>
public Marshaller Marshaller
{
get { return _marsh; }
}
#region OUT operations
/// <summary>
/// Perform out operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="action">Action to be performed on the stream.</param>
/// <returns></returns>
protected long DoOutOp(int type, Action<IBinaryStream> action)
{
return _target.InStreamOutLong(type, action);
}
/// <summary>
/// Perform out operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="action">Action to be performed on the stream.</param>
/// <returns></returns>
protected long DoOutOp(int type, Action<BinaryWriter> action)
{
return DoOutOp(type, stream => WriteToStream(action, stream, _marsh));
}
/// <summary>
/// Perform out operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="action">Action to be performed on the stream.</param>
/// <returns>Resulting object.</returns>
protected IPlatformTargetInternal DoOutOpObject(int type, Action<BinaryWriter> action)
{
return _target.InStreamOutObject(type, stream => WriteToStream(action, stream, _marsh));
}
/// <summary>
/// Perform out operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="action">Action to be performed on the stream.</param>
/// <returns>Resulting object.</returns>
protected IPlatformTargetInternal DoOutOpObject(int type, Action<IBinaryStream> action)
{
return _target.InStreamOutObject(type, action);
}
/// <summary>
/// Perform out operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <returns>Resulting object.</returns>
protected IPlatformTargetInternal DoOutOpObject(int type)
{
return _target.OutObjectInternal(type);
}
/// <summary>
/// Perform simple output operation accepting single argument.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="val1">Value.</param>
/// <returns>Result.</returns>
protected long DoOutOp<T1>(int type, T1 val1)
{
return DoOutOp(type, writer =>
{
writer.Write(val1);
});
}
/// <summary>
/// Perform simple output operation accepting two arguments.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="val1">Value 1.</param>
/// <param name="val2">Value 2.</param>
/// <returns>Result.</returns>
protected long DoOutOp<T1, T2>(int type, T1 val1, T2 val2)
{
return DoOutOp(type, writer =>
{
writer.Write(val1);
writer.Write(val2);
});
}
#endregion
#region IN operations
/// <summary>
/// Perform in operation.
/// </summary>
/// <param name="type">Type.</param>
/// <param name="action">Action.</param>
/// <returns>Result.</returns>
protected T DoInOp<T>(int type, Func<IBinaryStream, T> action)
{
return _target.OutStream(type, action);
}
/// <summary>
/// Perform simple in operation returning immediate result.
/// </summary>
/// <param name="type">Type.</param>
/// <returns>Result.</returns>
protected T DoInOp<T>(int type)
{
return _target.OutStream(type, s => Unmarshal<T>(s));
}
#endregion
#region OUT-IN operations
/// <summary>
/// Perform out-in operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="outAction">Out action.</param>
/// <param name="inAction">In action.</param>
/// <param name="errorAction">Error action.</param>
/// <returns>Result.</returns>
protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, TR> inAction, Func<Exception, TR> errorAction = null)
{
return _target.InStreamOutStream(type, stream => WriteToStream(outAction, stream, _marsh), inAction, errorAction);
}
/// <summary>
/// Perform out-in operation with a single stream.
/// </summary>
/// <typeparam name="TR">The type of the r.</typeparam>
/// <param name="type">Operation type.</param>
/// <param name="outAction">Out action.</param>
/// <param name="inAction">In action.</param>
/// <param name="inErrorAction">The action to read an error.</param>
/// <returns>
/// Result.
/// </returns>
protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction,
Func<IBinaryStream, Exception> inErrorAction)
{
return _target.InStreamOutLong(type, stream => WriteToStream(outAction, stream, _marsh),
inAction, inErrorAction);
}
/// <summary>
/// Perform out-in operation with a single stream.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="outAction">Out action.</param>
/// <param name="inErrorAction">The action to read an error.</param>
/// <returns>
/// Result.
/// </returns>
protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
Func<IBinaryStream, Exception> inErrorAction)
{
return _target.InStreamOutLong(type, stream => WriteToStream(outAction, stream, _marsh),
(stream, res) => res == True, inErrorAction);
}
/// <summary>
/// Perform out-in operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="outAction">Out action.</param>
/// <param name="inAction">In action.</param>
/// <param name="arg">Argument.</param>
/// <returns>Result.</returns>
protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction,
Func<IBinaryStream, IPlatformTargetInternal, TR> inAction, IPlatformTargetInternal arg)
{
return _target.InObjectStreamOutObjectStream(type, stream => WriteToStream(outAction, stream, _marsh),
inAction, arg);
}
/// <summary>
/// Perform out-in operation.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="outAction">Out action.</param>
/// <returns>Result.</returns>
protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction)
{
return _target.InStreamOutStream(type, stream => WriteToStream(outAction, stream, _marsh),
stream => Unmarshal<TR>(stream));
}
/// <summary>
/// Perform simple out-in operation accepting single argument.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="val">Value.</param>
/// <returns>Result.</returns>
protected TR DoOutInOp<T1, TR>(int type, T1 val)
{
return _target.InStreamOutStream(type, stream => WriteToStream(val, stream, _marsh),
stream => Unmarshal<TR>(stream));
}
/// <summary>
/// Perform simple out-in operation accepting two arguments.
/// </summary>
/// <param name="type">Operation type.</param>
/// <param name="val">Value.</param>
/// <returns>Result.</returns>
protected long DoOutInOp(int type, long val = 0)
{
return _target.InLongOutLong(type, val);
}
#endregion
#region Async operations
/// <summary>
/// Performs async operation.
/// </summary>
/// <param name="type">The type code.</param>
/// <param name="writeAction">The write action.</param>
/// <returns>Task for async operation</returns>
protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null)
{
return DoOutOpAsync<object>(type, writeAction);
}
/// <summary>
/// Performs async operation.
/// </summary>
/// <typeparam name="T">Type of the result.</typeparam>
/// <param name="type">The type code.</param>
/// <param name="writeAction">The write action.</param>
/// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
/// <param name="convertFunc">The function to read future result from stream.</param>
/// <returns>Task for async operation</returns>
protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false,
Func<BinaryReader, T> convertFunc = null)
{
return GetFuture((futId, futType) => DoOutOp(type, w =>
{
if (writeAction != null)
{
writeAction(w);
}
w.WriteLong(futId);
w.WriteInt(futType);
}), keepBinary, convertFunc).Task;
}
/// <summary>
/// Performs async operation.
/// </summary>
/// <typeparam name="T">Type of the result.</typeparam>
/// <param name="type">The type code.</param>
/// <param name="writeAction">The write action.</param>
/// <returns>Future for async operation</returns>
protected Future<T> DoOutOpObjectAsync<T>(int type, Action<BinaryWriter> writeAction)
{
return GetFuture<T>((futId, futType) => DoOutOpObject(type, w =>
{
writeAction(w);
w.WriteLong(futId);
w.WriteInt(futType);
}));
}
/// <summary>
/// Performs async operation.
/// </summary>
/// <typeparam name="TR">Type of the result.</typeparam>
/// <typeparam name="T1">The type of the first arg.</typeparam>
/// <param name="type">The type code.</param>
/// <param name="val1">First arg.</param>
/// <returns>
/// Task for async operation
/// </returns>
protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1)
{
return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
{
w.WriteObject(val1);
w.WriteLong(futId);
w.WriteInt(futType);
})).Task;
}
/// <summary>
/// Performs async operation.
/// </summary>
/// <typeparam name="TR">Type of the result.</typeparam>
/// <typeparam name="T1">The type of the first arg.</typeparam>
/// <typeparam name="T2">The type of the second arg.</typeparam>
/// <param name="type">The type code.</param>
/// <param name="val1">First arg.</param>
/// <param name="val2">Second arg.</param>
/// <returns>
/// Task for async operation
/// </returns>
protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2)
{
return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
{
w.WriteObjectDetached(val1);
w.WriteObjectDetached(val2);
w.WriteLong(futId);
w.WriteInt(futType);
})).Task;
}
#endregion
#region Miscelanneous
/// <summary>
/// Unmarshal object using the given stream.
/// </summary>
/// <param name="stream">Stream.</param>
/// <returns>Unmarshalled object.</returns>
protected virtual T Unmarshal<T>(IBinaryStream stream)
{
return _marsh.Unmarshal<T>(stream);
}
/// <summary>
/// Creates a future and starts listening.
/// </summary>
/// <typeparam name="T">Future result type</typeparam>
/// <param name="listenAction">The listen action.</param>
/// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
/// <param name="convertFunc">The function to read future result from stream.</param>
/// <returns>Created future.</returns>
private Future<T> GetFuture<T>(Func<long, int, IPlatformTargetInternal> listenAction, bool keepBinary = false,
Func<BinaryReader, T> convertFunc = null)
{
var futType = FutureType.Object;
var type = typeof(T);
if (type.IsPrimitive)
IgniteFutureTypeMap.TryGetValue(type, out futType);
var fut = convertFunc == null && futType != FutureType.Object
? new Future<T>()
: new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
IPlatformTargetInternal futTarget;
try
{
futTarget = listenAction(futHnd, (int)futType);
}
catch (Exception)
{
_marsh.Ignite.HandleRegistry.Release(futHnd);
throw;
}
fut.SetTarget(new Listenable(futTarget));
return fut;
}
/// <summary>
/// Creates a future and starts listening.
/// </summary>
/// <typeparam name="T">Future result type</typeparam>
/// <param name="listenAction">The listen action.</param>
/// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
/// <param name="convertFunc">The function to read future result from stream.</param>
/// <returns>Created future.</returns>
private Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
Func<BinaryReader, T> convertFunc = null)
{
var futType = FutureType.Object;
var type = typeof(T);
if (type.IsPrimitive)
IgniteFutureTypeMap.TryGetValue(type, out futType);
var fut = convertFunc == null && futType != FutureType.Object
? new Future<T>()
: new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
try
{
listenAction(futHnd, (int)futType);
}
catch (Exception)
{
_marsh.Ignite.HandleRegistry.Release(futHnd);
throw;
}
return fut;
}
/// <summary>
/// Writes to stream.
/// </summary>
private static bool WriteToStream(Action<BinaryWriter> action, IBinaryStream stream, Marshaller marsh)
{
var writer = marsh.StartMarshal(stream);
action(writer);
marsh.FinishMarshal(writer);
return true;
}
/// <summary>
/// Writes to stream.
/// </summary>
private static void WriteToStream<T>(T obj, IBinaryStream stream, Marshaller marsh)
{
var writer = marsh.StartMarshal(stream);
writer.WriteObject(obj);
marsh.FinishMarshal(writer);
}
#endregion
}
}