/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

package org.apache.storm.blobstore;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.storm.Config;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.BeginDownloadResult;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ListBlobsResult;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * NimbusBlobStore is a USER facing client API to perform basic operations such as create, update, delete and read for local and hdfs blob
 * store.
 *
 * <p>For local blob store it is also the client facing API for supervisor in order to download blobs from nimbus.
 *
 * <p>Every remote call made by this class (and by the streams and iterators it hands out) goes through one shared {@link NimbusClient}
 * and is issued while holding that client's monitor, so calls from different threads are serialized.
 * NOTE(review): presumably this is required because the underlying Thrift client is not safe for concurrent use - confirm.
 *
 * <p>{@link #shutdown()} clears the client reference; any remote operation attempted afterwards fails with a
 * {@link NullPointerException}.
 */
public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);
    /** Chunk size, in bytes, used when the configuration does not supply a usable value. */
    private static final int DEFAULT_BUFFER_SIZE = 4096;

    private NimbusClient client;
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    /**
     * Creates the Nimbus client from the configuration and reads the upload chunk size from
     * {@link Config#STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES}, keeping the current value when the key is absent.
     *
     * @param conf the storm configuration
     */
    @Override
    public void prepare(Map<String, Object> conf) {
        this.client = NimbusClient.getConfiguredClient(conf);
        if (conf != null) {
            this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
        }
    }

    /**
     * Opens a blob creation session on Nimbus and returns a stream that uploads data into it.
     *
     * @param key  the blob key
     * @param meta the metadata to create the blob with
     * @return a stream whose {@code close()} commits the blob and whose {@code cancel()} aborts the upload
     * @throws AuthorizationException    propagated unchanged from Nimbus
     * @throws KeyAlreadyExistsException propagated unchanged from Nimbus
     * @throws RuntimeException          wrapping any other {@link TException}
     */
    @Override
    protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
        throws AuthorizationException, KeyAlreadyExistsException {
        try {
            synchronized (client) {
                return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key);
            }
        } catch (AuthorizationException | KeyAlreadyExistsException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens a blob update session on Nimbus and returns a stream that uploads the new content.
     *
     * @param key the blob key
     * @return a stream whose {@code close()} commits the update and whose {@code cancel()} aborts it
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public AtomicOutputStream updateBlob(String key)
        throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized (client) {
                return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the metadata of a blob from Nimbus.
     *
     * @param key the blob key
     * @return the metadata returned by Nimbus
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized (client) {
                return client.getClient().getBlobMeta(key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asks Nimbus whether the given blob exists remotely.
     *
     * @param blobKey the blob key
     * @return the answer returned by Nimbus
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException {
        try {
            // Hold the client lock like every other remote call in this class.
            synchronized (client) {
                return client.getClient().isRemoteBlobExists(blobKey);
            }
        } catch (AuthorizationException aze) {
            throw aze;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces the metadata of an existing blob on Nimbus.
     *
     * @param key  the blob key
     * @param meta the new metadata
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
        throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized (client) {
                client.getClient().setBlobMeta(key, meta);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes a blob through Nimbus.
     *
     * @param key the blob key
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized (client) {
                client.getClient().deleteBlob(key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Forwards a {@code createStateInZookeeper} request for the key to Nimbus.
     *
     * @param key the blob key
     * @throws RuntimeException wrapping any {@link TException}
     */
    @Override
    public void createStateInZookeeper(String key) {
        try {
            synchronized (client) {
                client.getClient().createStateInZookeeper(key);
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens a download session on Nimbus and returns a stream that lazily pulls the blob chunk by chunk.
     *
     * @param key the blob key
     * @return a stream over the blob content that also exposes its version and length
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized (client) {
                return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key));
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts a key listing on Nimbus and returns an iterator that fetches further batches on demand.
     *
     * @return an iterator over the blob keys
     * @throws RuntimeException wrapping any {@link TException}
     */
    @Override
    public Iterator<String> listKeys() {
        try {
            synchronized (client) {
                // NOTE(review): the empty session string presumably asks Nimbus to start a new listing - confirm.
                return new NimbusKeyIterator(client.getClient().listBlobs(""));
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the replication value Nimbus reports for a blob.
     *
     * @param key the blob key
     * @return the value returned by Nimbus
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            // Hold the client lock like every other remote call in this class.
            synchronized (client) {
                return client.getClient().getBlobReplication(key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asks Nimbus to change the replication of a blob.
     *
     * @param key         the blob key
     * @param replication the requested replication
     * @return the value returned by Nimbus
     * @throws AuthorizationException propagated unchanged from Nimbus
     * @throws KeyNotFoundException   propagated unchanged from Nimbus
     * @throws RuntimeException       wrapping any other {@link TException}
     */
    @Override
    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
        try {
            // Hold the client lock like every other remote call in this class.
            synchronized (client) {
                return client.getClient().updateBlobReplication(key, replication);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces the Nimbus client used by this store, closing the previous one, and re-reads the chunk size from the configuration.
     *
     * @param conf   the storm configuration; may be {@code null}, in which case the chunk size is left unchanged
     * @param client the client to use from now on
     * @return always {@code true}
     */
    @Override
    public boolean setClient(Map<String, Object> conf, NimbusClient client) {
        // Do not close the client we are about to keep using when the caller hands back the same instance.
        if (this.client != null && this.client != client) {
            this.client.close();
        }
        this.client = client;
        if (conf != null) {
            this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
        }
        return true;
    }

    /**
     * Safety net that releases the client if the store is garbage collected without having been shut down.
     */
    @Override
    @SuppressWarnings("checkstyle:NoFinalizer")
    protected void finalize() {
        shutdown();
    }

    /**
     * Closes the Nimbus client, if any, and clears the reference. Calling it again is a no-op.
     */
    @Override
    public void shutdown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /**
     * Same as {@link #shutdown()}; lets the store be used in try-with-resources.
     */
    @Override
    public void close() {
        shutdown();
    }

    /**
     * Iterator over blob keys that keeps one batch of keys in memory and asks Nimbus for the next batch of the same listing session
     * once the current batch is exhausted. An empty batch marks the end of the listing.
     */
    public class NimbusKeyIterator implements Iterator<String> {
        private ListBlobsResult listBlobs = null;
        // Index of the next key to hand out from the current batch.
        private int offset = 0;
        private boolean eof = false;

        /**
         * Wraps the first batch of a listing.
         *
         * @param listBlobs the first batch returned by Nimbus; an empty batch means there are no keys at all
         */
        public NimbusKeyIterator(ListBlobsResult listBlobs) {
            this.listBlobs = listBlobs;
            this.eof = (listBlobs.get_keys_size() == 0);
        }

        /** Tells whether every key of the current batch has been handed out. */
        private boolean isCacheEmpty() {
            return listBlobs.get_keys_size() <= offset;
        }

        /** Fetches the next batch for the current session unless the end of the listing was already reached. */
        private void readMore() throws TException {
            if (!eof) {
                offset = 0;
                synchronized (client) {
                    listBlobs = client.getClient().listBlobs(listBlobs.get_session());
                }
                if (listBlobs.get_keys_size() == 0) {
                    eof = true;
                }
            }
        }

        /**
         * Tells whether another key is available, fetching the next batch from Nimbus when the current one is used up.
         *
         * @throws RuntimeException wrapping any {@link TException} raised while fetching
         */
        @Override
        public synchronized boolean hasNext() {
            try {
                if (isCacheEmpty()) {
                    readMore();
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
            return !eof;
        }

        /**
         * Returns the next key.
         *
         * @throws NoSuchElementException when the listing is exhausted
         */
        @Override
        public synchronized String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String ret = listBlobs.get_keys().get(offset);
            offset++;
            return ret;
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Delete Not Supported");
        }
    }

    /**
     * Input stream over a blob being downloaded from Nimbus. Data is pulled one chunk at a time through {@code downloadBlobChunk};
     * an empty chunk marks the end of the blob.
     */
    public class NimbusDownloadInputStream extends InputStreamWithMeta {
        private BeginDownloadResult beginBlobDownload;
        // Backing array of the current chunk; [offset, end) is the part not yet consumed.
        private byte[] buffer = null;
        private int offset = 0;
        private int end = 0;
        private boolean eof = false;

        /**
         * Wraps an open download session.
         *
         * @param beginBlobDownload the session descriptor returned by Nimbus
         */
        public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) {
            this.beginBlobDownload = beginBlobDownload;
        }

        /** Returns the blob version reported when the download session was opened. */
        @Override
        public long getVersion() throws IOException {
            return beginBlobDownload.get_version();
        }

        /**
         * Reads a single byte.
         *
         * @return the byte as an unsigned value in the range 0..255, or -1 at the end of the blob
         * @throws IOException wrapping any {@link TException} raised while fetching a chunk
         */
        @Override
        public synchronized int read() throws IOException {
            try {
                if (isEmpty()) {
                    readMore();
                    if (eof) {
                        return -1;
                    }
                }
                if (available() <= 0) {
                    return -1;
                }
                // Mask to an unsigned value: a raw (signed) byte would turn 0xFF into -1 and be mistaken for end-of-stream.
                return buffer[offset++] & 0xFF;
            } catch (TException exp) {
                throw new IOException(exp);
            }
        }

        /**
         * Copies up to {@code len} bytes of the current chunk into {@code b}. At most one chunk is fetched per call, so fewer than
         * {@code len} bytes may be returned even when more data remains.
         *
         * @param b   destination array
         * @param off index in {@code b} at which to start writing
         * @param len maximum number of bytes to copy
         * @return the number of bytes copied, or -1 at the end of the blob
         * @throws IOException wrapping any {@link TException} raised while fetching a chunk
         */
        @Override
        public synchronized int read(byte[] b, int off, int len) throws IOException {
            try {
                if (isEmpty()) {
                    readMore();
                    if (eof) {
                        return -1;
                    }
                }
                int length = Math.min(len, available());
                System.arraycopy(buffer, offset, b, off, length);
                offset += length;
                return length;
            } catch (TException exp) {
                throw new IOException(exp);
            }
        }

        /** Equivalent to {@code read(b, 0, b.length)}. */
        @Override
        public synchronized int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /** Tells whether there is no unread data left in the current chunk. */
        private boolean isEmpty() {
            return buffer == null || offset >= end;
        }

        /** Fetches the next chunk unless the end of the blob was already reached. */
        private void readMore() throws TException {
            if (!eof) {
                ByteBuffer buff;
                synchronized (client) {
                    buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session());
                }
                // Read straight out of the backing array; the readable window starts at arrayOffset + position.
                buffer = buff.array();
                offset = buff.arrayOffset() + buff.position();
                int length = buff.remaining();
                end = offset + length;
                if (length == 0) {
                    eof = true;
                }
            }
        }

        /** Returns the number of unread bytes in the current chunk; this never triggers a fetch. */
        @Override
        public synchronized int available() {
            return buffer == null ? 0 : (end - offset);
        }

        /** Returns the blob size reported when the download session was opened. */
        @Override
        public long getFileLength() {
            return beginBlobDownload.get_data_size();
        }
    }

    /**
     * Output stream that uploads data into a Nimbus upload session. Nothing is buffered locally: every write results in one or more
     * {@code uploadBlobChunk} calls. {@link #close()} commits the upload and {@link #cancel()} aborts it.
     *
     * <p>Although the methods declare {@link IOException}, transport failures are reported as {@link RuntimeException} wrapping the
     * {@link TException}.
     */
    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
        private final String session;
        private final int maxChunkSize;
        private final String key;

        /**
         * Wraps an open upload session.
         *
         * @param session    the upload session id returned by Nimbus
         * @param bufferSize the maximum number of bytes sent per chunk; non-positive values fall back to the default chunk size
         * @param key        the blob key the session belongs to
         */
        public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
            this.session = session;
            this.key = key;
            if (bufferSize > 0) {
                this.maxChunkSize = bufferSize;
            } else {
                // A zero chunk size would make the chunking loop in write() spin forever; a negative one would fail on the first chunk.
                LOG.warn("Invalid blob upload chunk size {} for key {}, using default of {} bytes", bufferSize, key, DEFAULT_BUFFER_SIZE);
                this.maxChunkSize = DEFAULT_BUFFER_SIZE;
            }
        }

        /**
         * Aborts the upload session on Nimbus.
         *
         * @throws RuntimeException wrapping any {@link TException}
         */
        @Override
        public void cancel() throws IOException {
            try {
                synchronized (client) {
                    client.getClient().cancelBlobUpload(session);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Uploads a single byte as its own chunk. Prefer the array variants for bulk data, since each call is a separate remote call.
         *
         * @param b the byte to upload (only the low eight bits are used)
         * @throws RuntimeException wrapping any {@link TException}
         */
        @Override
        public void write(int b) throws IOException {
            try {
                synchronized (client) {
                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[]{ (byte) b }));
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        /** Equivalent to {@code write(b, 0, b.length)}. */
        @Override
        public void write(byte[] b) throws IOException {
            write(b, 0, b.length);
        }

        /**
         * Uploads {@code len} bytes of {@code b} starting at {@code offset}, split into chunks of at most the configured chunk size.
         *
         * @param b      source array
         * @param offset index of the first byte to upload
         * @param len    number of bytes to upload
         * @throws RuntimeException wrapping any {@link TException}
         */
        @Override
        public void write(byte[] b, int offset, int len) throws IOException {
            try {
                int end = offset + len;
                for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                    int realLen = Math.min(end - realOffset, maxChunkSize);
                    LOG.debug("Writing {} bytes of {} remaining", realLen, (end - realOffset));
                    // The lock is taken per chunk, so other users of the client can interleave between chunks.
                    synchronized (client) {
                        client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
                    }
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Commits the upload on Nimbus and then issues {@code createStateInZookeeper} for the key, both under a single hold of the
         * client lock.
         *
         * @throws RuntimeException wrapping any {@link TException}
         */
        @Override
        public void close() throws IOException {
            try {
                synchronized (client) {
                    client.getClient().finishBlobUpload(session);
                    client.getClient().createStateInZookeeper(key);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
