blob: 7471030ef2ef0e8db396b3638f4d0bb2e21649a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
* Declares ignite::impl::compute::MultipleJobComputeTaskHolder class template.
*/
#ifndef _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
#define _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
#include <stdint.h>
#include <vector>
#include <ignite/common/promise.h>
#include <ignite/impl/compute/compute_job_result.h>
#include <ignite/impl/compute/compute_task_holder.h>
namespace ignite
{
namespace impl
{
namespace compute
{
/**
* Multiple Job Compute task holder type-specific implementation.
* Used for broadcast.
*
* @tparam F Function type.
* @tparam R Function result type.
*/
template<typename F, typename R>
class MultipleJobComputeTaskHolder : public ComputeTaskHolder
{
public:
typedef F JobType;
typedef R ResultType;
/**
* Constructor.
*
* @param handle Job handle.
*/
MultipleJobComputeTaskHolder(int64_t handle) :
ComputeTaskHolder(handle),
result(new std::vector<ResultType>()),
error(),
promise()
{
// No-op.
}
/**
* Destructor.
*/
virtual ~MultipleJobComputeTaskHolder()
{
// No-op.
}
/**
* Process local job result.
*
* @param job Job.
* @return Policy.
*/
virtual int32_t JobResultLocal(ComputeJobHolder& job)
{
typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder;
ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
ProcessResult(job0.GetResult());
return ComputeJobResultPolicy::WAIT;
}
/**
* Process remote job result.
*
* @param reader Reader for stream with result.
* @return Policy.
*/
virtual int32_t JobResultRemote(binary::BinaryReaderImpl& reader)
{
ComputeJobResult<ResultType> res;
res.Read(reader);
ProcessResult(res);
return ComputeJobResultPolicy::WAIT;
}
/**
* Process remote job result.
*
* @param reader Reader for stream with result.
*/
virtual void JobResultError(const IgniteError& err)
{
ComputeJobResult<ResultType> res;
res.SetError(err);
ProcessResult(res);
}
/**
* Process successful result.
*
* @param value Value.
*/
virtual void JobResultSuccess(int64_t value)
{
ComputeJobResult<ResultType> res;
res.SetResult(PrimitiveFutureResult<ResultType>(value));
ProcessResult(res);
}
/**
* Process successful result.
*
* @param reader Reader for stream with result.
*/
virtual void JobResultSuccess(binary::BinaryReaderImpl& reader)
{
ComputeJobResult<ResultType> res;
res.SetResult(reader.ReadObject<ResultType>());
ProcessResult(res);
}
/**
* Process successful null result.
*/
virtual void JobNullResultSuccess()
{
ComputeJobResult<ResultType> res;
res.SetResult(impl::binary::BinaryUtils::GetDefaultValue<ResultType>());
ProcessResult(res);
}
/**
* Reduce results of related jobs.
*/
virtual void Reduce()
{
if (error.GetCode() == IgniteError::IGNITE_SUCCESS)
promise.SetValue(result);
else
promise.SetError(error);
}
/**
* Get result promise.
*
* @return Reference to result promise.
*/
common::Promise< std::vector<ResultType> >& GetPromise()
{
return promise;
}
private:
/**
* Process result.
*
* @param res Result.
*/
void ProcessResult(const ComputeJobResult<ResultType>& res)
{
const IgniteError& err = res.GetError();
if (err.GetCode() == IgniteError::IGNITE_SUCCESS)
result->push_back(res.GetResult());
else
error = err;
}
/** Result. */
std::auto_ptr< std::vector<ResultType> > result;
/** Error. */
IgniteError error;
/** Task result promise. */
common::Promise< std::vector<ResultType> > promise;
};
/**
* Compute task holder type-specific implementation.
*/
template<typename F>
class MultipleJobComputeTaskHolder<F, void> : public ComputeTaskHolder
{
public:
typedef F JobType;
/**
* Constructor.
*
* @param handle Job handle.
*/
MultipleJobComputeTaskHolder(int64_t handle) :
ComputeTaskHolder(handle)
{
// No-op.
}
/**
* Destructor.
*/
virtual ~MultipleJobComputeTaskHolder()
{
// No-op.
}
/**
* Process local job result.
*
* @param job Job.
* @return Policy.
*/
virtual int32_t JobResultLocal(ComputeJobHolder& job)
{
typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder;
ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
ProcessResult(job0.GetResult());
return ComputeJobResultPolicy::WAIT;
}
/**
* Process remote job result.
*
* @param reader Reader for stream with result.
* @return Policy.
*/
virtual int32_t JobResultRemote(binary::BinaryReaderImpl& reader)
{
ComputeJobResult<void> res;
res.Read(reader);
ProcessResult(res);
return ComputeJobResultPolicy::WAIT;
}
/**
* Process remote job result.
*
* @param reader Reader for stream with result.
*/
virtual void JobResultError(const IgniteError& err)
{
ComputeJobResult<void> res;
res.SetError(err);
ProcessResult(res);
}
/**
* Process successful result.
*
* @param value Value.
*/
virtual void JobResultSuccess(int64_t)
{
ComputeJobResult<void> res;
res.SetResult();
ProcessResult(res);
}
/**
* Process successful result.
*
* @param reader Reader for stream with result.
*/
virtual void JobResultSuccess(binary::BinaryReaderImpl&)
{
ComputeJobResult<void> res;
res.SetResult();
ProcessResult(res);
}
/**
* Process successful null result.
*/
virtual void JobNullResultSuccess()
{
ComputeJobResult<void> res;
res.SetResult();
ProcessResult(res);
}
/**
* Reduce results of related jobs.
*/
virtual void Reduce()
{
if (error.GetCode() == IgniteError::IGNITE_SUCCESS)
promise.SetValue();
else
promise.SetError(error);
}
/**
* Get result promise.
*
* @return Reference to result promise.
*/
common::Promise<void>& GetPromise()
{
return promise;
}
private:
/**
* Process result.
*
* @param res Result.
*/
void ProcessResult(const ComputeJobResult<void>& res)
{
const IgniteError& err = res.GetError();
if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
error = err;
}
/** Error. */
IgniteError error;
/** Task result promise. */
common::Promise<void> promise;
};
}
}
}
#endif //_IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK