// blob: 33526dd37bdca5f119c7fdc6c474a1435df890c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
{
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Cache.Query.Continuous;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Resource;
using CQU = ContinuousQueryUtils;
/// <summary>
/// Continuous query handle interface.
/// Non-generic view of a continuous query handle: exposes the callback entry point and disposal.
/// </summary>
internal interface IContinuousQueryHandleImpl : IDisposable
{
/// <summary>
/// Process callback.
/// </summary>
/// <param name="stream">Stream to read the callback data from.</param>
void Apply(IBinaryStream stream);
}
/// <summary>
/// Continuous query handle.
/// <para />
/// Registers itself in the Ignite handle registry, passes the resulting handle id together with the
/// query settings to the native side, and owns the native query target plus the native initial query
/// cursor until that cursor is handed out through <c>GetInitialQueryCursor</c>.
/// </summary>
/// <typeparam name="TK">Cache key type.</typeparam>
/// <typeparam name="TV">Cache value type.</typeparam>
internal class ContinuousQueryHandleImpl<TK, TV> : IContinuousQueryHandleImpl, IContinuousQueryFilter,
    IContinuousQueryHandle<ICacheEntry<TK, TV>>, IContinuousQueryHandleFields
{
    /** Marshaller. */
    private readonly Marshaller _marsh;

    /** Keep binary flag. */
    private readonly bool _keepBinary;

    /** Real listener. */
    private readonly ICacheEntryEventListener<TK, TV> _lsnr;

    /** Real filter. */
    private readonly ICacheEntryEventFilter<TK, TV> _filter;

    /** GC handle. */
    private readonly long _hnd;

    /** Native query. */
    private readonly IPlatformTargetInternal _nativeQry;

    /** Indicates whether initial query is a Fields query. */
    private readonly bool _initialQueryIsFields;

    /** Private gate guarding cursor hand-out and disposal (this instance is exposed to user code, so it must not be the lock target). */
    private readonly object _syncRoot = new object();

    /** Initial query cursor. */
    private volatile IPlatformTargetInternal _nativeInitialQueryCursor;

    /** Disposed flag. Accessed only under the gate. */
    private bool _disposed;

    /// <summary>
    /// Constructor.
    /// Injects resources into the listener and filter, allocates a registry handle for this instance,
    /// creates the native query target and obtains the initial query cursor. If any of the steps after
    /// resource injection fails, everything acquired so far is released and the exception is rethrown.
    /// </summary>
    /// <param name="qry">Query.</param>
    /// <param name="marsh">Marshaller.</param>
    /// <param name="keepBinary">Keep binary flag.</param>
    /// <param name="createTargetCb">The initialization callback.</param>
    /// <param name="initialQry">The initial query.</param>
    public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, Marshaller marsh, bool keepBinary,
        Func<Action<BinaryWriter>, IPlatformTargetInternal> createTargetCb, IQueryBaseInternal initialQry)
    {
        _marsh = marsh;
        _keepBinary = keepBinary;

        _lsnr = qry.Listener;
        _filter = qry.Filter;

        // 1. Inject resources.
        ResourceProcessor.Inject(_lsnr, _marsh.Ignite);
        ResourceProcessor.Inject(_filter, _marsh.Ignite);

        try
        {
            // 2. Allocate handle.
            _hnd = _marsh.Ignite.HandleRegistry.Allocate(this);

            // 3. Call Java.
            _nativeQry = createTargetCb(writer =>
            {
                writer.WriteLong(_hnd);
                writer.WriteBoolean(qry.Local);
                writer.WriteBoolean(qry.IncludeExpired);
                writer.WriteBoolean(_filter != null);

                var javaFilter = _filter as PlatformJavaObjectFactoryProxy;

                if (javaFilter != null)
                {
                    writer.WriteObject(javaFilter.GetRawProxy());
                }
                else
                {
                    // No holder is written when there is no filter or the query is local.
                    var filterHolder = _filter == null || qry.Local
                        ? null
                        : new ContinuousQueryFilterHolder(_filter, _keepBinary);

                    writer.WriteObject(filterHolder);
                }

                writer.WriteInt(qry.BufferSize);
                writer.WriteLong((long)qry.TimeInterval.TotalMilliseconds);
                writer.WriteBoolean(qry.AutoUnsubscribe);

                if (initialQry != null)
                {
                    writer.WriteInt((int) initialQry.OpId);
                    initialQry.Write(writer, _keepBinary);
                }
                else
                {
                    writer.WriteInt(-1); // no initial query
                }
            });

            // 4. Initial query.
            _nativeInitialQueryCursor = _nativeQry.OutObjectInternal(0);
            _initialQueryIsFields = initialQry is SqlFieldsQuery;
        }
        catch (Exception)
        {
            // Roll back whatever was acquired before the failure, then propagate the original error.
            if (_hnd > 0)
                _marsh.Ignite.HandleRegistry.Release(_hnd);

            if (_nativeQry != null)
                _nativeQry.Dispose();

            if (_nativeInitialQueryCursor != null)
                _nativeInitialQueryCursor.Dispose();

            throw;
        }
    }

    /** <inheritdoc /> */
    public void Apply(IBinaryStream stream)
    {
        // Read the batch of events from the stream and pass it to the user listener.
        ICacheEntryEvent<TK, TV>[] evts = CQU.ReadEvents<TK, TV>(stream, _marsh, _keepBinary);

        _lsnr.OnEvent(evts);
    }

    /** <inheritdoc /> */
    public bool Evaluate(IBinaryStream stream)
    {
        Debug.Assert(_filter != null, "Evaluate should not be called if filter is not set.");

        // Read a single event from the stream and ask the user filter about it.
        ICacheEntryEvent<TK, TV> evt = CQU.ReadEvent<TK, TV>(stream, _marsh, _keepBinary);

        return _filter.Evaluate(evt);
    }

    /** <inheritdoc /> */
    public void Inject(Ignite grid)
    {
        throw new NotSupportedException("Should not be called.");
    }

    /** <inheritdoc /> */
    public long Allocate()
    {
        throw new NotSupportedException("Should not be called.");
    }

    /** <inheritdoc /> */
    public void Release()
    {
        _marsh.Ignite.HandleRegistry.Release(_hnd);
    }

    /** <inheritdoc /> */
    public IQueryCursor<ICacheEntry<TK, TV>> GetInitialQueryCursor()
    {
        if (_initialQueryIsFields)
        {
            throw new IgniteException("Initial query is SqlFieldsQuery");
        }

        return new QueryCursor<TK, TV>(GetInitialQueryCursorInternal(), _keepBinary);
    }

    /** <inheritdoc /> */
    IFieldsQueryCursor IContinuousQueryHandleFields.GetInitialQueryCursor()
    {
        if (!_initialQueryIsFields)
        {
            throw new IgniteException("Initial query is not SqlFieldsQuery");
        }

        return new FieldsQueryCursor(GetInitialQueryCursorInternal(), _keepBinary);
    }

    /// <summary>
    /// Hands out the native initial query cursor. The cursor can be taken only once; ownership passes
    /// to the caller and this handle no longer disposes it.
    /// </summary>
    /// <returns>Native initial query cursor.</returns>
    /// <exception cref="ObjectDisposedException">The handle has already been disposed.</exception>
    /// <exception cref="InvalidOperationException">The cursor has already been taken.</exception>
    private IPlatformTargetInternal GetInitialQueryCursorInternal()
    {
        lock (_syncRoot)
        {
            if (_disposed)
            {
                // Use the (objectName, message) overload: the single-string constructor treats its
                // argument as the object name, not as the message.
                throw new ObjectDisposedException(GetType().Name,
                    "Continuous query handle has been disposed.");
            }

            var cur = _nativeInitialQueryCursor;

            if (cur == null)
                throw new InvalidOperationException("GetInitialQueryCursor() can be called only once.");

            _nativeInitialQueryCursor = null;

            return cur;
        }
    }

    /** <inheritdoc /> */
    [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
        Justification = "There is no finalizer.")]
    public void Dispose()
    {
        lock (_syncRoot)
        {
            if (_disposed || _nativeQry == null)
                return;

            try
            {
                // NOTE(review): native operation 0 presumably closes the continuous query on the
                // Java side - confirm against the platform op codes.
                _nativeQry.InLongOutLong(0, 0);
            }
            finally
            {
                try
                {
                    // Release the initial query cursor if nobody took it. This runs in a finally
                    // block so that a failure of the native call above cannot leak the cursor.
                    var cur = _nativeInitialQueryCursor;

                    if (cur != null)
                    {
                        _nativeInitialQueryCursor = null;

                        cur.Dispose();
                    }
                }
                finally
                {
                    // Mark as disposed first: even if releasing the native target throws, a repeated
                    // Dispose() must not touch the native objects again.
                    _disposed = true;

                    _nativeQry.Dispose();
                }
            }
        }
    }
}
}