blob: 162118374499062564daccf586d398b168d92ba9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Common
{
using System;
using System.Globalization;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Compute;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Events;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
using Apache.Ignite.Core.Impl.Datastream;
using Apache.Ignite.Core.Messaging;
/// <summary>
/// Type descriptor with precompiled delegates for known methods.
/// <para />
/// A descriptor is built once per user type by inspecting the generic interfaces that the type implements.
/// For every recognized interface a delegate is compiled through <see cref="DelegateConverter"/> and stored
/// in the corresponding field. Fields for interfaces that the type does not implement stay <c>null</c>.
/// </summary>
internal class DelegateTypeDescriptor
{
    /** Cached descriptors, keyed by the user type they describe. */
    private static readonly CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor> Descriptors
        = new CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor>();

    /** IComputeFunc{T} invocator: (target) => result. */
    private readonly Func<object, object> _computeOutFunc;

    /** IComputeFunc{T, R} invocator: (target, arg) => result. */
    private readonly Func<object, object, object> _computeFunc;

    /** IEventFilter{T} invocator: (target, event) => bool. */
    private readonly Func<object, object, bool> _eventFilter;

    /** ICacheEntryFilter{TK, TV} invocator: (target, key, value) => bool. */
    private readonly Func<object, object, object, bool> _cacheEntryFilter;

    /** ICacheEntryProcessor{K, V, A, R} invocator (Item1) together with its key and value types (Item2). */
    private readonly Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
        _cacheEntryProcessor;

    /** IMessageListener{T} invocator: (target, nodeId, message) => bool. */
    private readonly Func<object, Guid, object, bool> _messageLsnr;

    /** IComputeJob{T}.Execute invocator. */
    private readonly Func<object, object> _computeJobExecute;

    /** IComputeJob{T}.Cancel invocator. */
    private readonly Action<object> _computeJobCancel;

    /** IStreamReceiver{TK, TV} invocator (calls StreamReceiverHolder.InvokeReceiver). */
    private readonly Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> _streamReceiver;

    /** StreamTransformer{K, V, A, R} constructor invocator; set together with the cache entry processor. */
    private readonly Func<object, object> _streamTransformerCtor;

    /** ContinuousQueryFilter{TK, TV} constructor invocator. */
    private readonly Func<object, object, object> _continuousQueryFilterCtor;

    /// <summary>
    /// Gets the <see cref="IComputeFunc{T}" /> invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Func<object, object> GetComputeOutFunc(Type type)
    {
        return Get(type)._computeOutFunc;
    }

    /// <summary>
    /// Gets the <see cref="IComputeFunc{T, R}" /> invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Func<object, object, object> GetComputeFunc(Type type)
    {
        return Get(type)._computeFunc;
    }

    /// <summary>
    /// Gets the <see cref="IEventFilter{T}" /> invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Func<object, object, bool> GetEventFilter(Type type)
    {
        return Get(type)._eventFilter;
    }

    /// <summary>
    /// Gets the <see cref="ICacheEntryFilter{TK,TV}" /> invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Func<object, object, object, bool> GetCacheEntryFilter(Type type)
    {
        return Get(type)._cacheEntryFilter;
    }

    /// <summary>
    /// Gets the <see cref="ICacheEntryProcessor{K, V, A, R}" /> invocator.
    /// </summary>
    /// <param name="type">Type. Must implement the interface: the processor tuple is dereferenced here.</param>
    /// <returns>Precompiled invocator delegate.</returns>
    public static Func<object, IMutableCacheEntryInternal, object, object> GetCacheEntryProcessor(Type type)
    {
        return Get(type)._cacheEntryProcessor.Item1;
    }

    /// <summary>
    /// Gets key and value types for the <see cref="ICacheEntryProcessor{K, V, A, R}" />.
    /// </summary>
    /// <param name="type">Type. Must implement the interface: the processor tuple is dereferenced here.</param>
    /// <returns>Key and value types.</returns>
    public static Tuple<Type, Type> GetCacheEntryProcessorTypes(Type type)
    {
        return Get(type)._cacheEntryProcessor.Item2;
    }

    /// <summary>
    /// Gets the <see cref="IMessageListener{T}" /> invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Func<object, Guid, object, bool> GetMessageListener(Type type)
    {
        return Get(type)._messageLsnr;
    }

    /// <summary>
    /// Gets the <see cref="IComputeJob{T}.Execute" /> and <see cref="IComputeJob{T}.Cancel" /> invocators.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <param name="execute">Execute invocator.</param>
    /// <param name="cancel">Cancel invocator.</param>
    public static void GetComputeJob(Type type, out Func<object, object> execute, out Action<object> cancel)
    {
        var desc = Get(type);

        execute = desc._computeJobExecute;
        cancel = desc._computeJobCancel;
    }

    /// <summary>
    /// Gets the <see cref="IStreamReceiver{TK,TV}"/> invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> GetStreamReceiver(Type type)
    {
        return Get(type)._streamReceiver;
    }

    /// <summary>
    /// Gets the <see cref="StreamTransformer{K, V, A, R}"/> ctor invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type is not a cache entry processor.</returns>
    public static Func<object, object> GetStreamTransformerCtor(Type type)
    {
        return Get(type)._streamTransformerCtor;
    }

    /// <summary>
    /// Gets the <see cref="ContinuousQueryFilter{TK,TV}"/> ctor invocator.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Precompiled invocator delegate, or null if the type does not implement the interface.</returns>
    public static Func<object, object, object> GetContinuousQueryFilterCtor(Type type)
    {
        return Get(type)._continuousQueryFilterCtor;
    }

    /// <summary>
    /// Gets the <see cref="DelegateTypeDescriptor" /> by type.
    /// Returns the cached descriptor when present; otherwise builds one and adds it to the cache.
    /// </summary>
    /// <param name="type">Type.</param>
    /// <returns>Descriptor for the given type.</returns>
    private static DelegateTypeDescriptor Get(Type type)
    {
        DelegateTypeDescriptor result;

        return Descriptors.TryGetValue(type, out result)
            ? result
            : Descriptors.GetOrAdd(type, t => new DelegateTypeDescriptor(t));
    }

    /// <summary>
    /// Throws an exception if first argument is not null.
    /// </summary>
    /// <param name="check">Field that holds the delegate for the interface; non-null means it was already set.</param>
    /// <param name="userType">User type being described (used in the error message).</param>
    /// <param name="interfaceType">Open generic interface type (used in the error message).</param>
    /// <exception cref="InvalidOperationException">When <paramref name="check"/> is not null.</exception>
    // ReSharper disable once UnusedParameter.Local
    // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
    private static void ThrowIfMultipleInterfaces(object check, Type userType, Type interfaceType)
    {
        if (check != null)
            throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture,
                "Not Supported: Type {0} implements interface {1} multiple times.", userType, interfaceType));
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="DelegateTypeDescriptor"/> class.
    /// Walks all generic interfaces of the type and compiles a delegate for each recognized one.
    /// </summary>
    /// <param name="type">The type.</param>
    /// <exception cref="InvalidOperationException">
    /// When the type implements the same recognized generic interface more than once.
    /// </exception>
    private DelegateTypeDescriptor(Type type)
    {
        foreach (var iface in type.GetInterfaces())
        {
            // Only generic interfaces are of interest here.
            if (!iface.IsGenericType)
                continue;

            var genericTypeDefinition = iface.GetGenericTypeDefinition();

            // Each branch first checks ITS OWN field: a non-null value means the same open generic
            // interface has already been seen with different type arguments.
            if (genericTypeDefinition == typeof (IComputeFunc<>))
            {
                ThrowIfMultipleInterfaces(_computeOutFunc, type, typeof(IComputeFunc<>));

                _computeOutFunc = DelegateConverter.CompileFunc(iface);
            }
            else if (genericTypeDefinition == typeof (IComputeFunc<,>))
            {
                ThrowIfMultipleInterfaces(_computeFunc, type, typeof(IComputeFunc<,>));

                var args = iface.GetGenericArguments();

                _computeFunc = DelegateConverter.CompileFunc<Func<object, object, object>>(iface, new[] {args[0]});
            }
            else if (genericTypeDefinition == typeof (IEventFilter<>))
            {
                ThrowIfMultipleInterfaces(_eventFilter, type, typeof(IEventFilter<>));

                var args = iface.GetGenericArguments();

                _eventFilter = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface,
                    new[] {args[0]}, new[] {true, false});
            }
            else if (genericTypeDefinition == typeof (ICacheEntryFilter<,>))
            {
                ThrowIfMultipleInterfaces(_cacheEntryFilter, type, typeof(ICacheEntryFilter<,>));

                var args = iface.GetGenericArguments();

                var entryType = typeof (ICacheEntry<,>).MakeGenericType(args);

                var invokeFunc = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface,
                    new[] { entryType }, new[] { true, false });

                var ctor = DelegateConverter.CompileCtor<Func<object, object, object>>(
                    typeof (CacheEntry<,>).MakeGenericType(args), args);

                // Resulting func constructs CacheEntry and passes it to user implementation
                _cacheEntryFilter = (obj, k, v) => invokeFunc(obj, ctor(k, v));
            }
            else if (genericTypeDefinition == typeof (ICacheEntryProcessor<,,,>))
            {
                ThrowIfMultipleInterfaces(_cacheEntryProcessor, type, typeof(ICacheEntryProcessor<,,,>));

                var args = iface.GetGenericArguments();

                var entryType = typeof (IMutableCacheEntry<,>).MakeGenericType(args[0], args[1]);

                var func = DelegateConverter.CompileFunc<Func<object, object, object, object>>(iface,
                    new[] { entryType, args[2] }, null, "Process");

                // Key and value types are kept next to the invocator.
                var types = new Tuple<Type, Type>(args[0], args[1]);

                _cacheEntryProcessor = new Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
                    (func, types);

                // A cache entry processor can also be wrapped into a StreamTransformer with the same type args.
                var transformerType = typeof (StreamTransformer<,,,>).MakeGenericType(args);

                _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
                    new[] {iface});
            }
            else if (genericTypeDefinition == typeof (IMessageListener<>))
            {
                ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IMessageListener<>));

                var arg = iface.GetGenericArguments()[0];

                _messageLsnr = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
                    new[] { typeof(Guid), arg }, new[] { false, true, false });
            }
            else if (genericTypeDefinition == typeof (IComputeJob<>))
            {
                // Guard on the compute job field (not the message listener one), so that a type which is
                // both a message listener and a compute job is accepted, and a duplicate IComputeJob<> is caught.
                ThrowIfMultipleInterfaces(_computeJobExecute, type, typeof(IComputeJob<>));

                _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0],
                    methodName: "Execute");

                _computeJobCancel = DelegateConverter.CompileFunc<Action<object>>(iface, new Type[0],
                    new[] {false}, "Cancel");
            }
            else if (genericTypeDefinition == typeof (IStreamReceiver<,>))
            {
                ThrowIfMultipleInterfaces(_streamReceiver, type, typeof (IStreamReceiver<,>));

                // The receiver is not invoked directly: a closed generic StreamReceiverHolder.InvokeReceiver
                // method is compiled instead.
                var method =
                    typeof (StreamReceiverHolder).GetMethod("InvokeReceiver")
                        .MakeGenericMethod(iface.GetGenericArguments());

                _streamReceiver = DelegateConverter
                    .CompileFunc<Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool>>(
                        typeof (StreamReceiverHolder),
                        method,
                        new[]
                        {
                            iface, typeof (Ignite), typeof (IPlatformTargetInternal), typeof (IBinaryStream),
                            typeof (bool)
                        },
                        new[] {true, false, false, false, false, false});
            }
            else if (genericTypeDefinition == typeof (ICacheEntryEventFilter<,>))
            {
                // Guard on the continuous query filter field (not the stream receiver one), so that a type which
                // is both a stream receiver and an event filter is accepted, and a duplicate filter is caught.
                ThrowIfMultipleInterfaces(_continuousQueryFilterCtor, type, typeof(ICacheEntryEventFilter<,>));

                var args = iface.GetGenericArguments();

                _continuousQueryFilterCtor =
                    DelegateConverter.CompileCtor<Func<object, object, object>>(
                        typeof(ContinuousQueryFilter<,>).MakeGenericType(args), new[] { iface, typeof(bool) });
            }
        }
    }
}
}