﻿// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.IMRU.API
{
    /// <summary>
    /// Use this class to create an IMRU Job Definition.
    /// </summary>
    /// <seealso cref="IMRUJobDefinition" />
    public sealed class IMRUJobDefinitionBuilder
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUJobDefinitionBuilder));

        private string _jobName;
        private int _numberOfMappers;
        private int _memoryPerMapper;
        private int _updateTaskMemory;
        private int _coresPerMapper;
        private int _updateTaskCores;
        private int _maxRetryNumberInRecovery;
        private IConfiguration _updateTaskStateConfiguration;
        private IConfiguration _mapTaskStateConfiguration;
        private IConfiguration _mapFunctionConfiguration;
        private IConfiguration _mapInputCodecConfiguration;
        private IConfiguration _updateFunctionCodecsConfiguration;
        private IConfiguration _reduceFunctionConfiguration;
        private IConfiguration _updateFunctionConfiguration;
        private IConfiguration _mapOutputPipelineDataConverterConfiguration;
        private IConfiguration _mapInputPipelineDataConverterConfiguration;
        private IConfiguration _partitionedDatasetConfiguration;
        private IConfiguration _resultHandlerConfiguration;
        private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig;
        private bool _invokeGC;

        private static readonly IConfiguration EmptyConfiguration =
            TangFactory.GetTang().NewConfigurationBuilder().Build();

        /// <summary>
        /// Constructor
        /// </summary>
        public IMRUJobDefinitionBuilder()
        {
            _updateTaskStateConfiguration = EmptyConfiguration;
            _mapTaskStateConfiguration = EmptyConfiguration;
            _mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
            _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
            _partitionedDatasetConfiguration = EmptyConfiguration;
            _resultHandlerConfiguration = EmptyConfiguration;
            _memoryPerMapper = 512;
            _updateTaskMemory = 512;
            _coresPerMapper = 1;
            _updateTaskCores = 1;
            _maxRetryNumberInRecovery = 0;
            _invokeGC = true;
            _perMapConfigGeneratorConfig = new HashSet<IConfiguration>();
        }

        /// <summary>
        /// Set the name of the job.
        /// </summary>
        /// <param name="name">the name of the job</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetJobName(string name)
        {
            _jobName = name;
            return this;
        }

        /// <summary>
        /// Sets configuration of map task state
        /// </summary>
        /// <param name="mapTaskStateConfiguration">Configuration for map task state</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetMapTaskStateConfiguration(IConfiguration mapTaskStateConfiguration)
        {
            _mapTaskStateConfiguration = mapTaskStateConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of update task state
        /// </summary>
        /// <param name="updateTaskStateConfiguration">Configuration for update task state</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetUpdateTaskStateConfiguration(IConfiguration updateTaskStateConfiguration)
        {
            _updateTaskStateConfiguration = updateTaskStateConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of map function
        /// </summary>
        /// <param name="mapFunctionConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetMapFunctionConfiguration(IConfiguration mapFunctionConfiguration)
        {
            _mapFunctionConfiguration = mapFunctionConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of codec for TMapInput
        /// </summary>
        /// <param name="mapInputCodecConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetMapInputCodecConfiguration(IConfiguration mapInputCodecConfiguration)
        {
            _mapInputCodecConfiguration = mapInputCodecConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of codecs needed by Update function
        /// </summary>
        /// <param name="updateFunctionCodecsConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetUpdateFunctionCodecsConfiguration(
            IConfiguration updateFunctionCodecsConfiguration)
        {
            _updateFunctionCodecsConfiguration = updateFunctionCodecsConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of reduce function
        /// </summary>
        /// <param name="reduceFunctionConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetReduceFunctionConfiguration(IConfiguration reduceFunctionConfiguration)
        {
            _reduceFunctionConfiguration = reduceFunctionConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of update function
        /// </summary>
        /// <param name="updateFunctionConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetUpdateFunctionConfiguration(IConfiguration updateFunctionConfiguration)
        {
            _updateFunctionConfiguration = updateFunctionConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of PipelineDataConverter for Map output
        /// </summary>
        /// <param name="mapOutputPipelineDataConverterConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetMapOutputPipelineDataConverterConfiguration(
            IConfiguration mapOutputPipelineDataConverterConfiguration)
        {
            _mapOutputPipelineDataConverterConfiguration = mapOutputPipelineDataConverterConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of PipelineDataConverter for Map Input
        /// </summary>
        /// <param name="mapInputPipelineDataConverterConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetMapInputPipelineDataConverterConfiguration(
            IConfiguration mapInputPipelineDataConverterConfiguration)
        {
            _mapInputPipelineDataConverterConfiguration = mapInputPipelineDataConverterConfiguration;
            return this;
        }

        /// <summary>
        /// Sets configuration of partitioned dataset
        /// </summary>
        /// <param name="partitionedDatasetConfiguration">Configuration</param>
        /// <returns>this</returns>
        public IMRUJobDefinitionBuilder SetPartitionedDatasetConfiguration(
            IConfiguration partitionedDatasetConfiguration)
        {
            _partitionedDatasetConfiguration = partitionedDatasetConfiguration;
            return this;
        }

        /// <summary>
        /// Sets Number of mappers
        /// </summary>
        /// <param name="numberOfMappers">Number of mappers</param>
        /// <returns>this</returns>
        /// TODO: This is duplicate in a sense that it can be determined 
        /// TODO: automatically from IPartitionedDataset. However, right now 
        /// TODO: GroupComm. instantiated in IMRUDriver needs this parameter 
        /// TODO: in constructor. This will be removed once we remove it from GroupComm.
        public IMRUJobDefinitionBuilder SetNumberOfMappers(int numberOfMappers)
        {
            _numberOfMappers = numberOfMappers;
            return this;
        }

        /// <summary>
        /// Sets mapper memory
        /// </summary>
        /// <param name="memory">memory in MB</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetMapperMemory(int memory)
        {
            _memoryPerMapper = memory;
            return this;
        }

        /// <summary>
        /// Set update task memory
        /// </summary>
        /// <param name="memory">memory in MB</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetUpdateTaskMemory(int memory)
        {
            _updateTaskMemory = memory;
            return this;
        }

        /// <summary>
        /// Sets cores for map tasks
        /// </summary>
        /// <param name="cores">number of cores</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetMapTaskCores(int cores)
        {
            _coresPerMapper = cores;
            return this;
        }

        /// <summary>
        /// Set update task cores
        /// </summary>
        /// <param name="cores">number of cores</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetUpdateTaskCores(int cores)
        {
            _updateTaskCores = cores;
            return this;
        }

        /// <summary>
        /// Set max number of retries done if first run of IMRU job failed.
        /// </summary>
        /// <param name="maxRetryNumberInRecovery">Max number of retries</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetMaxRetryNumberInRecovery(int maxRetryNumberInRecovery)
        {
            _maxRetryNumberInRecovery = maxRetryNumberInRecovery;
            return this;
        }

        /// <summary>
        /// Sets Per Map Configuration
        /// </summary>
        /// <param name="perMapperConfig">Mapper configs</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetPerMapConfigurations(IConfiguration perMapperConfig)
        {
            _perMapConfigGeneratorConfig.Add(perMapperConfig);
            return this;
        }

        /// <summary>
        /// Sets Result handler Configuration
        /// </summary>
        /// <param name="resultHandlerConfig">Result handler config</param>
        /// <returns></returns>
        public IMRUJobDefinitionBuilder SetResultHandlerConfiguration(IConfiguration resultHandlerConfig)
        {
            _resultHandlerConfiguration = resultHandlerConfig;
            return this;
        }

        /// <summary>
        /// Whether to invoke Garbage Collector after each IMRU iteration
        /// </summary>
        /// <param name="invokeGC">variable telling whether to invoke or not</param>
        /// <returns>The modified definition builder</returns>
        public IMRUJobDefinitionBuilder InvokeGarbageCollectorAfterIteration(bool invokeGC)
        {
            _invokeGC = invokeGC;
            return this;
        }

        /// <summary>
        /// Instantiate the IMRUJobDefinition.
        /// </summary>
        /// <returns>The IMRUJobDefintion configured.</returns>
        /// <exception cref="NullReferenceException">If any of the required parameters is not set.</exception>
        public IMRUJobDefinition Build()
        {
            if (null == _jobName)
            {
                Exceptions.Throw(new NullReferenceException("Job name cannot be null"),
                    Logger);
            }

            if (null == _mapFunctionConfiguration)
            {
                Exceptions.Throw(new NullReferenceException("Map function configuration cannot be null"), Logger);
            }

            if (null == _mapInputCodecConfiguration)
            {
                Exceptions.Throw(new NullReferenceException("Map input codec configuration cannot be null"), Logger);
            }

            if (null == _updateFunctionCodecsConfiguration)
            {
                Exceptions.Throw(new NullReferenceException("Update function codecs configuration cannot be null"),
                    Logger);
            }

            if (null == _reduceFunctionConfiguration)
            {
                Exceptions.Throw(new NullReferenceException("Reduce function configuration cannot be null"), Logger);
            }

            if (null == _updateFunctionConfiguration)
            {
                Exceptions.Throw(new NullReferenceException("Update function configuration cannot be null"), Logger);
            }

            return new IMRUJobDefinition(
                _updateTaskStateConfiguration,
                _mapTaskStateConfiguration,
                _mapFunctionConfiguration,
                _mapInputCodecConfiguration,
                _updateFunctionCodecsConfiguration,
                _reduceFunctionConfiguration,
                _updateFunctionConfiguration,
                _mapOutputPipelineDataConverterConfiguration,
                _mapInputPipelineDataConverterConfiguration,
                _partitionedDatasetConfiguration,
                _resultHandlerConfiguration,
                _perMapConfigGeneratorConfig,
                _numberOfMappers,
                _memoryPerMapper,
                _updateTaskMemory,
                _coresPerMapper,
                _updateTaskCores,
                _maxRetryNumberInRecovery,
                _jobName,
                _invokeGC);
        }
    }
}