blob: 780dd4e6dd4b38272b59d508e06cf1d045a9aa95 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Linq.Impl
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Common;
using Remotion.Linq;
/// <summary>
/// Fields query executor.
/// </summary>
internal class CacheFieldsQueryExecutor : IQueryExecutor
{
/** */
private readonly ICacheInternal _cache;
/** */
private readonly QueryOptions _options;
/** */
private static readonly CopyOnWriteConcurrentDictionary<ConstructorInfo, object> CtorCache =
new CopyOnWriteConcurrentDictionary<ConstructorInfo, object>();
/// <summary>
/// Initializes a new instance of the <see cref="CacheFieldsQueryExecutor" /> class.
/// </summary>
/// <param name="cache">The executor function.</param>
/// <param name="options">Query options.</param>
public CacheFieldsQueryExecutor(ICacheInternal cache, QueryOptions options)
{
Debug.Assert(cache != null);
Debug.Assert(options != null);
_cache = cache;
_options = options;
}
/** <inheritdoc /> */
public T ExecuteScalar<T>(QueryModel queryModel)
{
return ExecuteSingle<T>(queryModel, false);
}
/** <inheritdoc /> */
public T ExecuteSingle<T>(QueryModel queryModel, bool returnDefaultWhenEmpty)
{
var col = ExecuteCollection<T>(queryModel);
return returnDefaultWhenEmpty ? col.SingleOrDefault() : col.Single();
}
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
public IEnumerable<T> ExecuteCollection<T>(QueryModel queryModel)
{
Debug.Assert(queryModel != null);
var qryData = GetQueryData(queryModel);
Debug.WriteLine("\nFields Query: {0} | {1}", qryData.QueryText,
string.Join(", ", qryData.Parameters.Select(x => x == null ? "null" : x.ToString())));
var qry = GetFieldsQuery(qryData.QueryText, qryData.Parameters.ToArray());
var selector = GetResultSelector<T>(queryModel.SelectClause.Selector);
return _cache.Query(qry, selector);
}
/// <summary>
/// Compiles the query without regard to number or order of arguments.
/// </summary>
public Func<object[], IQueryCursor<T>> CompileQuery<T>(QueryModel queryModel)
{
Debug.Assert(queryModel != null);
var qryText = GetQueryData(queryModel).QueryText;
var selector = GetResultSelector<T>(queryModel.SelectClause.Selector);
return args => _cache.Query(GetFieldsQuery(qryText, args), selector);
}
/// <summary>
/// Compiles the query.
/// </summary>
/// <typeparam name="T">Result type.</typeparam>
/// <param name="queryModel">The query model.</param>
/// <param name="queryLambdaModel">The query model generated from lambda body.</param>
/// <param name="queryLambda">The query lambda.</param>
/// <returns>Compiled query func.</returns>
public Func<object[], IQueryCursor<T>> CompileQuery<T>(QueryModel queryModel, QueryModel queryLambdaModel,
LambdaExpression queryLambda)
{
Debug.Assert(queryModel != null);
// Get model from lambda to map arguments properly.
var qryData = GetQueryData(queryLambdaModel);
var qryText = GetQueryData(queryModel).QueryText;
var qryTextLambda = qryData.QueryText;
if (qryText != qryTextLambda)
{
Debug.WriteLine(qryText);
Debug.WriteLine(qryTextLambda);
throw new InvalidOperationException("Error compiling query: entire LINQ expression should be " +
"specified within lambda passed to Compile method. " +
"Part of the query can't be outside the Compile method call.");
}
var selector = GetResultSelector<T>(queryModel.SelectClause.Selector);
var qryParams = qryData.Parameters.ToArray();
// Compiled query is a delegate with query parameters
// Delegate parameters order and query parameters order may differ
// Simple case: lambda with no parameters. Only embedded parameters are used.
if (!queryLambda.Parameters.Any())
{
return argsUnused => _cache.Query(GetFieldsQuery(qryText, qryParams), selector);
}
// These are in order of usage in query
var qryOrderArgs = qryParams.OfType<ParameterExpression>().Select(x => x.Name).ToArray();
// These are in order they come from user
var userOrderArgs = queryLambda.Parameters.Select(x => x.Name).ToList();
// Simple case: all query args directly map to the lambda args in the same order
if (qryOrderArgs.Length == qryParams.Length
&& qryOrderArgs.SequenceEqual(userOrderArgs))
{
return args => _cache.Query(GetFieldsQuery(qryText, args), selector);
}
// General case: embedded args and lambda args are mixed; same args can be used multiple times.
// Produce a mapping that defines where query arguments come from.
var mapping = qryParams.Select(x =>
{
var pe = x as ParameterExpression;
if (pe != null)
return userOrderArgs.IndexOf(pe.Name);
return -1;
}).ToArray();
return args => _cache.Query(
GetFieldsQuery(qryText, MapQueryArgs(args, qryParams, mapping)), selector);
}
/// <summary>
/// Maps the query arguments.
/// </summary>
private static object[] MapQueryArgs(object[] userArgs, object[] embeddedArgs, int[] mapping)
{
var mappedArgs = new object[embeddedArgs.Length];
for (var i = 0; i < mappedArgs.Length; i++)
{
var map = mapping[i];
mappedArgs[i] = map < 0 ? embeddedArgs[i] : userArgs[map];
}
return mappedArgs;
}
/// <summary>
/// Gets the fields query.
/// </summary>
internal SqlFieldsQuery GetFieldsQuery(string text, object[] args)
{
return new SqlFieldsQuery(text)
{
EnableDistributedJoins = _options.EnableDistributedJoins,
PageSize = _options.PageSize,
EnforceJoinOrder = _options.EnforceJoinOrder,
Timeout = _options.Timeout,
#pragma warning disable 618
ReplicatedOnly = _options.ReplicatedOnly,
#pragma warning restore 618
Colocated = _options.Colocated,
Local = _options.Local,
Arguments = args,
Lazy = _options.Lazy,
UpdateBatchSize = _options.UpdateBatchSize,
Partitions = _options.Partitions
};
}
/// <summary>
/// Generates <see cref="QueryData"/> from specified <see cref="QueryModel"/>.
/// </summary>
public static QueryData GetQueryData(QueryModel queryModel)
{
Debug.Assert(queryModel != null);
return new CacheQueryModelVisitor().GenerateQuery(queryModel);
}
/// <summary>
/// Gets the result selector.
/// </summary>
private static Func<IBinaryRawReader, int, T> GetResultSelector<T>(Expression selectorExpression)
{
var newExpr = selectorExpression as NewExpression;
if (newExpr != null)
return GetCompiledCtor<T>(newExpr.Constructor);
var entryCtor = GetCacheEntryCtorInfo(typeof(T));
if (entryCtor != null)
return GetCompiledCtor<T>(entryCtor);
if (typeof(T) == typeof(bool))
return ReadBool<T>;
return (reader, count) => reader.ReadObject<T>();
}
/// <summary>
/// Reads the bool. Actual data may be bool or int/long.
/// </summary>
private static T ReadBool<T>(IBinaryRawReader reader, int count)
{
var obj = reader.ReadObject<object>();
if (obj is bool)
return (T) obj;
if (obj is long)
return TypeCaster<T>.Cast((long) obj != 0);
if (obj is int)
return TypeCaster<T>.Cast((int) obj != 0);
throw new InvalidOperationException("Expected bool, got: " + obj);
}
/// <summary>
/// Gets the cache entry constructor.
/// </summary>
private static ConstructorInfo GetCacheEntryCtorInfo(Type entryType)
{
if (!entryType.IsGenericType || entryType.GetGenericTypeDefinition() != typeof(ICacheEntry<,>))
return null;
var args = entryType.GetGenericArguments();
var targetType = typeof (CacheEntry<,>).MakeGenericType(args);
return targetType.GetConstructors().Single();
}
/// <summary>
/// Gets the compiled constructor.
/// </summary>
private static Func<IBinaryRawReader, int, T> GetCompiledCtor<T>(ConstructorInfo ctorInfo)
{
object result;
if (CtorCache.TryGetValue(ctorInfo, out result))
return (Func<IBinaryRawReader, int, T>) result;
return (Func<IBinaryRawReader, int, T>) CtorCache.GetOrAdd(ctorInfo, x =>
{
var innerCtor1 = DelegateConverter.CompileCtor<T>(x, GetCacheEntryCtorInfo);
return (Func<IBinaryRawReader, int, T>) ((r, c) => innerCtor1(r));
});
}
}
}