blob: 5ec4de72ec5e9e3dfc95958e59e9a8122b973c0e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Client.Compute
{
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Compute;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
/// <summary>
/// Compute client.
/// </summary>
internal class ComputeClient : IComputeClient
{
/** */
private readonly IgniteClient _ignite;
/** */
private readonly ComputeClientFlags _flags;
/** */
private readonly TimeSpan _timeout;
/** */
private readonly IClientClusterGroup _clusterGroup;
/// <summary>
/// Initializes a new instance of <see cref="ComputeClient"/>.
/// </summary>
internal ComputeClient(
IgniteClient ignite,
ComputeClientFlags flags,
TimeSpan? timeout,
IClientClusterGroup clusterGroup)
{
_ignite = ignite;
_flags = flags;
_timeout = timeout ?? TimeSpan.Zero;
_clusterGroup = clusterGroup;
}
/** <inheritdoc /> */
public TRes ExecuteJavaTask<TRes>(string taskName, object taskArg)
{
IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
return ExecuteJavaTaskAsync<TRes>(taskName, taskArg).Result;
}
/** <inheritdoc /> */
public Task<TRes> ExecuteJavaTaskAsync<TRes>(string taskName, object taskArg)
{
return ExecuteJavaTaskAsync<TRes>(taskName, taskArg, CancellationToken.None);
}
/** <inheritdoc /> */
public Task<TRes> ExecuteJavaTaskAsync<TRes>(string taskName, object taskArg,
CancellationToken cancellationToken)
{
IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
var tcs = new TaskCompletionSource<TRes>();
cancellationToken.Register(() => tcs.TrySetCanceled());
var keepBinary = (_flags & ComputeClientFlags.KeepBinary) == ComputeClientFlags.KeepBinary;
var task = _ignite.Socket.DoOutInOpAsync(
ClientOp.ComputeTaskExecute,
ctx => WriteJavaTaskRequest(ctx, taskName, taskArg),
ctx => ReadJavaTaskResponse(ctx, tcs, keepBinary, cancellationToken));
// ReSharper disable once AssignNullToNotNullAttribute (t.Exception won't be null).
task.ContWith(t => tcs.TrySetException(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
task.ContWith(t => tcs.TrySetCanceled(), TaskContinuationOptions.OnlyOnCanceled);
return tcs.Task;
}
/** <inheritdoc /> */
public IComputeClient WithTimeout(TimeSpan timeout)
{
return _timeout != timeout
? new ComputeClient(_ignite, _flags, timeout, _clusterGroup)
: this;
}
/** <inheritdoc /> */
public IComputeClient WithNoFailover()
{
return SetFlag(ComputeClientFlags.NoFailover);
}
/** <inheritdoc /> */
public IComputeClient WithNoResultCache()
{
return SetFlag(ComputeClientFlags.NoResultCache);
}
/** <inheritdoc /> */
public IComputeClient WithKeepBinary()
{
return SetFlag(ComputeClientFlags.KeepBinary);
}
/** <inheritdoc /> */
public IClientClusterGroup ClusterGroup
{
get { return _clusterGroup ?? _ignite.GetCluster(); }
}
/// <summary>
/// Returns a new instance with the given flag enabled, or this instance if the flag is already present.
/// </summary>
private IComputeClient SetFlag(ComputeClientFlags newFlag)
{
var flags = _flags | newFlag;
return flags != _flags
? new ComputeClient(_ignite, flags, _timeout, _clusterGroup)
: this;
}
/// <summary>
/// Writes the java task.
/// </summary>
private void WriteJavaTaskRequest(ClientRequestContext ctx, string taskName, object taskArg)
{
var writer = ctx.Writer;
if (_clusterGroup != null)
{
var nodes = _clusterGroup.GetNodes();
writer.WriteInt(nodes.Count);
foreach (var node in nodes)
{
BinaryUtils.WriteGuid(node.Id, ctx.Stream);
}
}
else
{
writer.WriteInt(0);
}
writer.WriteByte((byte) _flags);
writer.WriteLong((long) _timeout.TotalMilliseconds);
writer.WriteString(taskName);
writer.WriteObject(taskArg);
ctx.Socket.ExpectNotifications();
}
/// <summary>
/// Reads java task execution response.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private static object ReadJavaTaskResponse<TRes>(
ClientResponseContext ctx,
TaskCompletionSource<TRes> tcs,
bool keepBinary,
CancellationToken cancellationToken)
{
var taskId = ctx.Stream.ReadLong();
cancellationToken.Register(() =>
{
ctx.Socket.DoOutInOpAsync<object>(ClientOp.ResourceClose,
c => c.Stream.WriteLong(taskId),
_ => null,
(status, msg) =>
{
if (status == ClientStatusCode.ResourceDoesNotExist)
{
// Task finished before we could cancel it - ignore.
return null;
}
throw new IgniteClientException(msg, null, status);
});
});
ctx.Socket.AddNotificationHandler(taskId, (stream, ex) =>
{
ctx.Socket.RemoveNotificationHandler(taskId);
if (ex != null)
{
tcs.TrySetException(ex);
return;
}
var reader = ctx.Marshaller.StartUnmarshal(stream,
keepBinary ? BinaryMode.ForceBinary : BinaryMode.Deserialize);
try
{
var flags = (ClientFlags) reader.ReadShort();
var opCode = (ClientOp) reader.ReadShort();
if (opCode != ClientOp.ComputeTaskFinished)
{
tcs.TrySetException(new IgniteClientException(
string.Format("Invalid server notification code. Expected {0}, but got {1}",
ClientOp.ComputeTaskFinished, opCode)));
}
else if ((flags & ClientFlags.Error) == ClientFlags.Error)
{
var status = (ClientStatusCode) reader.ReadInt();
var msg = reader.ReadString();
tcs.TrySetException(new IgniteClientException(msg, null, status));
}
else
{
tcs.TrySetResult(reader.ReadObject<TRes>());
}
}
catch (Exception e)
{
tcs.TrySetException(e);
}
});
return null;
}
}
}