/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hugegraph.backend.store;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.NumericUtil;
import org.apache.hugegraph.util.StringEncoding;
import com.google.common.collect.ImmutableList;

public abstract class BackendTable<Session extends BackendSession, Entry> {

    /** Lower-cased table name, also used as the monitor of the update-if methods */
    private final String table;

    /** Dispatcher that the meta handlers of this table are registered on */
    private final MetaDispatcher<Session> dispatcher;

    /**
     * Create a table with the given name.
     * The name is lower-cased with {@link Locale#ROOT} so that the stored
     * name does not depend on the default locale of the JVM (with a Turkish
     * default locale, for example, "I" would otherwise become a dotless "ı").
     *
     * @param table the table name, must not be null
     */
    public BackendTable(String table) {
        this.table = table.toLowerCase(Locale.ROOT);
        this.dispatcher = new MetaDispatcher<>();

        // Give subclasses the chance to register their meta handlers
        this.registerMetaHandlers();
    }

    /**
     * @return the name of this table, as lower-cased by the constructor
     */
    public String table() {
        return this.table;
    }

    /**
     * @return the dispatcher holding the meta handlers registered through
     *         {@link #registerMetaHandler(String, MetaHandler)}
     */
    public MetaDispatcher<Session> metaDispatcher() {
        return this.dispatcher;
    }

    /**
     * Register a meta handler on the dispatcher of this table.
     *
     * @param name    the name the handler is registered under
     * @param handler the handler to register
     */
    public void registerMetaHandler(String name, MetaHandler<Session> handler) {
        this.dispatcher.registerMetaHandler(name, handler);
    }

    /**
     * Hook for subclasses to register their meta handlers, does nothing by
     * default.
     * NOTE: this is called from the constructor of BackendTable, i.e. before
     * the constructor body of a subclass has run.
     */
    protected void registerMetaHandlers() {
        // pass
    }

    /**
     * Write the entry only if it already exists in this table.
     * The exist-check, the insert and the commit (if a session is given)
     * all run while holding the monitor of the table name.
     *
     * @param session the session to operate with, may be null
     * @param entry   the entry to write when it is present
     */
    public void updateIfPresent(Session session, Entry entry) {
        // TODO: use fine-grained row lock
        synchronized (this.table) {
            assert session == null || !session.hasChanges();
            boolean present = this.queryExist(session, entry);
            if (!present) {
                return;
            }
            this.insert(session, entry);
            if (session == null) {
                return;
            }
            session.commit();
        }
    }

    /**
     * Write the entry only if it does not exist in this table yet.
     * The exist-check, the insert and the commit (if a session is given)
     * all run while holding the monitor of the table name.
     *
     * @param session the session to operate with, may be null
     * @param entry   the entry to write when it is absent
     */
    public void updateIfAbsent(Session session, Entry entry) {
        // TODO: use fine-grained row lock
        synchronized (this.table) {
            assert session == null || !session.hasChanges();
            boolean present = this.queryExist(session, entry);
            if (present) {
                return;
            }
            this.insert(session, entry);
            if (session == null) {
                return;
            }
            session.commit();
        }
    }

    /**
     * Map the result type of a query to the type of the table to query.
     * Every result type except EDGE is returned unchanged. EDGE is mapped to
     * EDGE_IN if the condition query asks for the direction IN, and to
     * EDGE_OUT in any other case.
     *
     * @param query origin query
     * @return corresponding table type
     */
    public static HugeType tableType(Query query) {
        HugeType resultType = query.resultType();
        if (resultType != HugeType.EDGE) {
            return resultType;
        }

        /*
         * Some backends(like RocksDB) may trans ConditionQuery to
         * IdQuery or IdPrefixQuery, so walk back along the origin queries
         * until a ConditionQuery (or the very first query) is reached.
         */
        Query origin = query;
        while (!(origin instanceof ConditionQuery) &&
               origin.originQuery() != null) {
            origin = origin.originQuery();
        }

        if (origin.conditionsSize() > 0 && origin instanceof ConditionQuery) {
            ConditionQuery conditions = (ConditionQuery) origin;
            // Does query IN edges
            if (conditions.condition(HugeKeys.DIRECTION) == Directions.IN) {
                return HugeType.EDGE_IN;
            }
        }

        // We assume query OUT edges
        return HugeType.EDGE_OUT;
    }

    /**
     * Build a table name of the form {@code <prefix>_<table>}.
     * Only the table part is lower-cased, the prefix is kept as it is.
     * The conversion uses {@link Locale#ROOT}, so the result is the same
     * whatever the default locale of the JVM is (with a Turkish default
     * locale, for example, "I" would otherwise become a dotless "ı").
     *
     * @param prefix the prefix to put in front of the table name
     * @param table  the table name, must not be null
     * @return the joined table name
     */
    public static final String joinTableName(String prefix, String table) {
        return prefix + "_" + table.toLowerCase(Locale.ROOT);
    }

    /**
     * Initialize this table through the given session.
     * NOTE(review): what the initialization involves is up to the backend
     * implementation and can't be told from here.
     */
    public abstract void init(Session session);

    /**
     * Clear this table through the given session.
     * NOTE(review): presumably removes the data of the table, confirm
     * against the backend implementations.
     */
    public abstract void clear(Session session);

    /**
     * Execute a query against this table.
     *
     * @return an iterator over the resulting backend entries
     */
    public abstract Iterator<BackendEntry> query(Session session, Query query);

    /**
     * Execute a query whose result is a single number.
     * NOTE(review): presumably an aggregate like a count, confirm against
     * the backend implementations.
     */
    public abstract Number queryNumber(Session session, Query query);

    /**
     * Check whether the given entry exists in this table. The result decides
     * whether updateIfPresent()/updateIfAbsent() insert the entry.
     */
    public abstract boolean queryExist(Session session, Entry entry);

    /**
     * Write the given entry to this table. Called by
     * updateIfPresent()/updateIfAbsent() once their exist-check has passed.
     */
    public abstract void insert(Session session, Entry entry);

    /**
     * Delete the given entry from this table.
     */
    public abstract void delete(Session session, Entry entry);

    /**
     * NOTE(review): presumably adds the content of the given entry to an
     * existing one, confirm against the backend implementations.
     */
    public abstract void append(Session session, Entry entry);

    /**
     * NOTE(review): presumably the counterpart of append() that removes the
     * content of the given entry from an existing one, confirm against the
     * backend implementations.
     */
    public abstract void eliminate(Session session, Entry entry);

    /****************************** ShardSplitter ******************************/

    public abstract static class ShardSplitter<Session extends BackendSession> {

        // The min shard size should be >= 1M to prevent too many shards
        protected static final int MIN_SHARD_SIZE = (int) Bytes.MB;

        // We assume the size of each key-value is 100 bytes
        protected static final int ESTIMATE_BYTES_PER_KV = 100;

        /*
         * String markers for the open start/end of a shard.
         * NOTE: both are the empty string, so they are equal to each other.
         */
        public static final String START = "";
        public static final String END = "";

        // Zero-length key, Range replaces it with START_BYTES/END_BYTES
        private static final byte[] EMPTY = new byte[0];
        // Byte form of the open start: a single 0x00 byte
        public static final byte[] START_BYTES = new byte[]{0x0};
        // Byte form of the open end: 16 bytes of 0xFF
        public static final byte[] END_BYTES = new byte[]{-1, -1, -1, -1,
                                                          -1, -1, -1, -1,
                                                          -1, -1, -1, -1,
                                                          -1, -1, -1, -1};

        // Name of the table to split, stored exactly as passed in
        private final String table;

        /**
         * @param table the name of the table this splitter works on
         */
        public ShardSplitter(String table) {
            this.table = table;
        }

        /**
         * @return the table name passed to the constructor
         */
        public String table() {
            return this.table;
        }

        /**
         * Split the key space from 0 to {@link #maxKey()} into shards, with
         * one shard for about every {@code splitSize} bytes of estimated data.
         * <p>
         * The data size is taken from estimateDataSize(), or, if that is not
         * positive, from estimateNumKeys() * ESTIMATE_BYTES_PER_KV. The
         * boundaries of the shards are decimal positions, the first shard
         * starts at START, and a shard that would overshoot maxKey() ends at
         * END instead.
         *
         * @param session   the session passed on to the estimate methods
         * @param splitSize the expected size of a shard in bytes, must be
         *                  >= MIN_SHARD_SIZE or the argument check fails
         * @return the list of shards, empty if maxKey() is not positive
         */
        public List<Shard> getSplits(Session session, long splitSize) {
            E.checkArgument(splitSize >= MIN_SHARD_SIZE,
                            "The split-size must be >= %s bytes, but got %s",
                            MIN_SHARD_SIZE, splitSize);

            long size = this.estimateDataSize(session);
            if (size <= 0) {
                size = this.estimateNumKeys(session) * ESTIMATE_BYTES_PER_KV;
            }

            double count = Math.ceil(size / (double) splitSize);
            if (count <= 0) {
                count = 1;
            }
            long maxKey = this.maxKey();
            /*
             * Keep the step >= 1: if there are more shards wanted than keys
             * available, the step would be truncated to 0, the offset would
             * never advance and the loop below would add shards forever.
             */
            long each = Math.max(1L, (long) (maxKey / count));

            List<Shard> splits = new ArrayList<>((int) count);
            String last = START;
            long offset = 0L;
            while (offset < maxKey) {
                offset += each;
                if (offset > maxKey) {
                    // The last shard is open-ended
                    splits.add(new Shard(last, END, 0L));
                    break;
                }
                String current = this.position(offset);
                splits.add(new Shard(last, current, 0L));
                last = current;
            }
            return splits;
        }

        /**
         * Convert a numeric position to the string form used as the
         * boundary of a shard.
         *
         * @param position the numeric position
         * @return the position as a decimal string
         */
        public final String position(long position) {
            return String.valueOf(position);
        }

        /**
         * Convert the string form of a position to bytes.
         *
         * @param position a decimal position, or END (the empty string,
         *                 which START equals as well)
         * @return null for END, otherwise the bytes that
         *         NumericUtil.intToBytes() returns for the low 32 bits of
         *         the parsed position
         */
        public byte[] position(String position) {
            if (!END.equals(position)) {
                long parsed = Long.parseLong(position);
                // Narrowing keeps the low 32 bits only
                return NumericUtil.intToBytes((int) parsed);
            }
            return null;
        }

        /**
         * @return the upper bound of the key space that getSplits() divides,
         *         BytesBuffer.UINT32_MAX unless overridden by a subclass
         */
        protected long maxKey() {
            return BytesBuffer.UINT32_MAX;
        }

        /**
         * Estimate the data size of the table in bytes. If the result is
         * not positive, getSplits() falls back to estimateNumKeys().
         */
        protected abstract long estimateDataSize(Session session);

        /**
         * Estimate the number of keys of the table. getSplits() multiplies
         * it by ESTIMATE_BYTES_PER_KV when estimateDataSize() gives no
         * positive size.
         */
        protected abstract long estimateNumKeys(Session session);

        public static class Range {

            private byte[] startKey;
            private byte[] endKey;

            /**
             * Create a key range. A zero-length start key is replaced with
             * START_BYTES and a zero-length end key with END_BYTES, any
             * other key is stored as passed in (without copying).
             *
             * @param startKey the start key of the range
             * @param endKey   the end key of the range
             */
            public Range(byte[] startKey, byte[] endKey) {
                if (Arrays.equals(EMPTY, startKey)) {
                    this.startKey = START_BYTES;
                } else {
                    this.startKey = startKey;
                }

                if (Arrays.equals(EMPTY, endKey)) {
                    this.endKey = END_BYTES;
                } else {
                    this.endKey = endKey;
                }
            }

            /**
             * Split this range into shards of equal key span.
             * <p>
             * The keys are handled as unsigned big-endian numbers. If the
             * two keys differ in length, the shorter one is padded with
             * trailing zero bytes for the arithmetic, but the original key
             * is still the one reported as boundary of the first/last shard.
             * <p>
             * The result may hold one shard more than {@code count} if the
             * span can't be divided evenly, and fewer shards if the span is
             * smaller than {@code count}. No shard is produced for count > 1
             * when the loop condition finds the start not below the end.
             *
             * @param count the expected number of shards, any value <= 1
             *              gives a single shard covering the whole range
             * @return the shards, ordered from the start key to the end key
             */
            public List<Shard> splitEven(int count) {
                if (count <= 1) {
                    return ImmutableList.of(new Shard(startKey(this.startKey),
                                                      endKey(this.endKey), 0));
                }

                // Bring both keys to the same length for the arithmetic
                byte[] start;
                byte[] end;
                boolean startChanged = false;
                boolean endChanged = false;
                int length;
                if (this.startKey.length < this.endKey.length) {
                    length = this.endKey.length;
                    start = new byte[length];
                    System.arraycopy(this.startKey, 0, start, 0,
                                     this.startKey.length);
                    end = this.endKey;
                    startChanged = true;
                } else if (this.startKey.length > this.endKey.length) {
                    length = this.startKey.length;
                    end = new byte[length];
                    System.arraycopy(this.endKey, 0, end, 0,
                                     this.endKey.length);
                    start = this.startKey;
                    endChanged = true;
                } else {
                    assert this.startKey.length == this.endKey.length;
                    length = this.startKey.length;
                    start = this.startKey;
                    end = this.endKey;
                }

                assert count > 1;
                BigInteger step = new BigInteger(1, subtract(end, start))
                                  .divide(BigInteger.valueOf(count));
                if (step.signum() == 0) {
                    /*
                     * The span is smaller than count. With a zero step the
                     * offset would never advance and the loop below would
                     * never end, so use the smallest step instead.
                     */
                    step = BigInteger.ONE;
                }
                // count >= 2 keeps the sign bit clear, so it fits in length
                byte[] each = align(step.toByteArray(), length);

                byte[] offset = start;
                byte[] last = offset;
                boolean finished = false;
                List<Shard> shards = new ArrayList<>(count);
                while (Bytes.compare(offset, end) < 0 && !finished) {
                    offset = add(offset, each);
                    // Don't step over the end (a carry makes offset longer)
                    if (offset.length > end.length ||
                        Bytes.compare(offset, end) > 0) {
                        offset = end;
                    }
                    if (startChanged) {
                        // Report the original start key, not the padded one
                        last = this.startKey;
                        startChanged = false;
                    }
                    if (endChanged && Arrays.equals(offset, end)) {
                        // Report the original end key, not the padded one
                        offset = this.endKey;
                        finished = true;
                    }
                    shards.add(new Shard(startKey(last), endKey(offset), 0));
                    last = offset;
                }
                return shards;
            }

            /**
             * Convert a start key to the string boundary of a shard.
             *
             * @return START for START_BYTES, else the key encoded as Base64
             */
            private static String startKey(byte[] start) {
                if (Arrays.equals(start, START_BYTES)) {
                    return START;
                }
                return StringEncoding.encodeBase64(start);
            }

            /**
             * Convert an end key to the string boundary of a shard.
             *
             * @return END for END_BYTES, else the key encoded as Base64
             */
            private static String endKey(byte[] end) {
                if (Arrays.equals(end, END_BYTES)) {
                    return END;
                }
                return StringEncoding.encodeBase64(end);
            }

            /**
             * Add two unsigned big-endian numbers of the same length.
             *
             * @return the sum, one byte longer than the inputs (with a
             *         leading 0x01) if the addition carried out of the
             *         highest byte, else of the same length
             */
            private static byte[] add(byte[] array1, byte[] array2) {
                E.checkArgument(array1.length == array2.length,
                                "The length of array should be equal");
                final int size = array1.length;
                byte[] sum = new byte[size];
                int carry = 0;
                // Walk from the lowest byte (last index) to the highest
                int index = size;
                while (--index >= 0) {
                    int column = byte2int(array1[index]) +
                                 byte2int(array2[index]) + carry;
                    sum[index] = int2byte(column);
                    carry = column >> 8;
                }
                if (carry != 0) {
                    // Overflow of the highest byte: prepend the carry
                    byte[] widened = new byte[size + 1];
                    widened[0] = 0x1;
                    System.arraycopy(sum, 0, widened, 1, size);
                    return widened;
                }
                return sum;
            }

            /**
             * Subtract array2 from array1, both taken as unsigned big-endian
             * numbers of the same length. The argument check fails if
             * array1 is less than array2.
             *
             * @return the difference, of the same length as the inputs
             */
            private static byte[] subtract(byte[] array1, byte[] array2) {
                E.checkArgument(array1.length == array2.length,
                                "The length of array should be equal");
                final int size = array1.length;
                byte[] diff = new byte[size];
                int borrow = 0;
                // Walk from the lowest byte (last index) to the highest
                int index = size;
                while (--index >= 0) {
                    int column = byte2int(array1[index]) -
                                 byte2int(array2[index]) + borrow;
                    diff[index] = int2byte(column);
                    // Arithmetic shift: -1 if the column went negative
                    borrow = column >> 8;
                }
                E.checkArgument(borrow == 0, "The array1 must >= array2");
                return diff;
            }

            /**
             * Return a copy of the array that is one byte longer, i.e. with
             * a single zero byte appended. The input is left untouched.
             *
             * @param array the bytes to extend
             * @return a new array of length {@code array.length + 1}
             */
            public static byte[] increase(byte[] array) {
                // copyOf() pads the extra position with zero
                return Arrays.copyOf(array, array.length + 1);
            }

            /**
             * Left-pad the array with zero bytes up to the given length.
             * The argument check fails if the array is longer than that.
             *
             * @return a new array of exactly {@code length} bytes, with the
             *         input placed at its end
             */
            private static byte[] align(byte[] array, int length) {
                final int size = array.length;
                E.checkArgument(size <= length,
                                "The length of array '%s' exceed " +
                                "align length '%s'", size, length);
                byte[] padded = new byte[length];
                int shift = length - size;
                System.arraycopy(array, 0, padded, shift, size);
                return padded;
            }

            /**
             * @return the value of the byte read as unsigned, in 0..255
             */
            private static int byte2int(byte b) {
                return b & 0xFF;
            }

            /**
             * @return the lowest 8 bits of the int as a byte
             */
            private static byte int2byte(int i) {
                // The narrowing cast drops everything but the low 8 bits
                return (byte) i;
            }
        }
    }
}
