/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.tinkerpop.gremlin.structure.io.gryo;

import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper;
import org.apache.tinkerpop.shaded.kryo.Kryo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Gryo objects are somewhat expensive to construct (given the dependency on Kryo), therefore this pool helps re-use
 * those objects.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 * @author Stephen Mallette (http://stephen.genoprime.com)
 */
public final class GryoPool {

    public static final String CONFIG_IO_GRYO_POOL_SIZE = "gremlin.io.gryo.poolSize";
    public static final String CONFIG_IO_GRYO_VERSION = "gremlin.io.gryo.version";
    public static final int CONFIG_IO_GRYO_POOL_SIZE_DEFAULT = 256;
    public static final GryoVersion CONFIG_IO_GRYO_POOL_VERSION_DEFAULT = GryoVersion.V3_0;

    public enum Type {READER, WRITER, READER_WRITER}

    private Queue<GryoReader> gryoReaders;
    private Queue<GryoWriter> gryoWriters;
    private Queue<Kryo> kryos;
    private GryoMapper mapper;

    public static GryoPool.Builder build() {
        return new GryoPool.Builder();
    }

    /**
     * Used by {@code GryoPool.Builder}.
     */
    private GryoPool() {
    }

    public GryoMapper getMapper() {
        return mapper;
    }

    public Kryo takeKryo() {
        final Kryo kryo = kryos.poll();
        return null == kryo ? mapper.createMapper() : kryo;
    }

    public GryoReader takeReader() {
        final GryoReader reader = this.gryoReaders.poll();
        return null == reader ? GryoReader.build().mapper(mapper).create() : reader;
    }

    public GryoWriter takeWriter() {
        final GryoWriter writer = this.gryoWriters.poll();
        return null == writer ? GryoWriter.build().mapper(mapper).create() : writer;
    }

    public void offerKryo(final Kryo kryo) {
        kryos.offer(kryo);
    }

    public void offerReader(final GryoReader gryoReader) {
        gryoReaders.offer(gryoReader);
    }

    public void offerWriter(final GryoWriter gryoWriter) {
        gryoWriters.offer(gryoWriter);
    }

    public <A> A readWithKryo(final Function<Kryo, A> kryoFunction) {
        final Kryo kryo = takeKryo();
        final A a = kryoFunction.apply(kryo);
        offerKryo(kryo);
        return a;
    }

    public void writeWithKryo(final Consumer<Kryo> kryoConsumer) {
        final Kryo kryo = takeKryo();
        kryoConsumer.accept(kryo);
        offerKryo(kryo);
    }

    public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
        final GryoReader gryoReader = takeReader();
        final A a = readerFunction.apply(gryoReader);
        offerReader(gryoReader);
        return a;
    }

    public void doWithWriter(final Consumer<GryoWriter> writerFunction) {
        final GryoWriter gryoWriter = takeWriter();
        writerFunction.accept(gryoWriter);
        offerWriter(gryoWriter);
    }

    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
        this.mapper = gryoMapper;
        if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
            gryoReaders = new LinkedBlockingQueue<>(poolSize);
            for (int i = 0; i < poolSize; i++) {
                gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
            }
        }
        if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
            gryoWriters = new LinkedBlockingQueue<>(poolSize);
            for (int i = 0; i < poolSize; i++) {
                gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
            }
        }

        kryos = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            kryos.add(gryoMapper.createMapper());
        }
    }

    ////

    public static class Builder {

        private int poolSize = 256;
        private List<IoRegistry> ioRegistries = new ArrayList<>();
        private Type type = Type.READER_WRITER;
        private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
        private GryoVersion version = GryoVersion.V1_0;

        /**
         * Set the version of Gryo to use for this pool.
         */
        public Builder version(final GryoVersion version) {
            this.version = version;
            return this;
        }

        /**
         * The {@code IoRegistry} class names to use for the {@code GryoPool}
         *
         * @param ioRegistryClassNames a list of class names
         * @return the update builder
         */
        public Builder ioRegistries(final List<Object> ioRegistryClassNames) {
            this.ioRegistries.addAll(IoRegistryHelper.createRegistries(ioRegistryClassNames));
            return this;
        }

        /**
         * The {@code IoRegistry} class name to use for the {@code GryoPool}
         *
         * @param ioRegistryClassName a class name
         * @return the update builder
         */
        public Builder ioRegistry(final Object ioRegistryClassName) {
            this.ioRegistries.addAll(IoRegistryHelper.createRegistries(Collections.singletonList(ioRegistryClassName)));
            return this;
        }

        /**
         * The size of the {@code GryoPool}. The size can not be changed once created.
         *
         * @param poolSize the pool size
         * @return the updated builder
         */
        public Builder poolSize(int poolSize) {
            this.poolSize = poolSize;
            return this;
        }

        /**
         * The type of {@code GryoPool} to support -- see {@code Type}
         *
         * @param type the pool type
         * @return the updated builder
         */
        public Builder type(final Type type) {
            this.type = type;
            return this;
        }

        /**
         * A consumer to update the {@code GryoMapper.Builder} once constructed.
         *
         * @param gryoMapperConsumer the {@code GryoMapper.Builder} consumer
         * @return the updated builder
         */
        public Builder initializeMapper(final Consumer<GryoMapper.Builder> gryoMapperConsumer) {
            this.gryoMapperConsumer = gryoMapperConsumer;
            return this;
        }

        /**
         * Create the {@code GryoPool} from this builder.
         *
         * @return the new pool
         */
        public GryoPool create() {
            final GryoMapper.Builder mapper = GryoMapper.build().version(version);
            final GryoPool gryoPool = new GryoPool();
            if (null != this.ioRegistries)
                this.ioRegistries.forEach(mapper::addRegistry);
            if (null != this.gryoMapperConsumer)
                this.gryoMapperConsumer.accept(mapper);
            gryoPool.createPool(this.poolSize, this.type, mapper.create());
            return gryoPool;
        }
    }
}