package org.apache.ignite.spi.discovery.tcp.ipfinder.azure;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;

/**
 * Azure Blob Storage based IP Finder
 * <p>
 * For information about Blob Storage visit <a href="https://azure.microsoft.com/en-in/services/storage/blobs/">azure.microsoft.com</a>.
 * <h1 class="header">Configuration</h1>
 * <h2 class="header">Mandatory</h2>
 * <ul>
 *      <li>AccountName (see {@link #setAccountName(String)})</li>
 *      <li>AccountKey (see {@link #setAccountKey(String)})</li>
 *      <li>Account Endpoint (see {@link #setAccountEndpoint(String)})</li>
 *      <li>Container Name (see {@link #setContainerName(String)})</li>
 * </ul>
 * <h2 class="header">Optional</h2>
 * <ul>
 *      <li>Shared flag (see {@link #setShared(boolean)})</li>
 * </ul>
 * <p>
 * The finder will create a container with the provided name. The container will contain entries named
 * like the following: {@code 192.168.1.136#1001}.
 * <p>
 * Note that storing data in Azure Blob Storage service will result in charges to your Azure account.
 * Choose another implementation of {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} for local
 * or home network tests.
 * <p>
 * Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}.
 */
public class TcpDiscoveryAzureBlobStoreIpFinder extends TcpDiscoveryIpFinderAdapter {
    /** Default object's content. */
    private static final byte[] OBJECT_CONTENT = new byte[0];

    /** Grid logger. */
    @LoggerResource
    private IgniteLogger log;

    /** Azure Blob Storage's account name*/
    private String accountName;

    /** Azure Blob Storage's account key */
    private String accountKey;

    /** End point URL */
    private String endPoint;

    /** Container name */
    private String containerName;

    /** Storage credential */
    StorageSharedKeyCredential credential;

    /** Blob service client */
    private BlobServiceClient blobServiceClient;

    /** Blob container client */
    private BlobContainerClient blobContainerClient;

    /** Init routine guard. */
    private final AtomicBoolean initGuard = new AtomicBoolean();

    /** Init routine latch. */
    private final CountDownLatch initLatch = new CountDownLatch(1);

    /**
     * Default constructor
     */
    public TcpDiscoveryAzureBlobStoreIpFinder() {
        setShared(true);
    }

    /** {@inheritDoc} */
    @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
        init();

        Collection<InetSocketAddress> addrs = new ArrayList<>();
        Set<String> seenBlobNames = new HashSet<>();

        for (BlobItem blobItem : blobContainerClient.listBlobs()) {
            try {
                if (!blobItem.isDeleted()) {
                    addrs.add(addrFromString(blobItem.getName()));
                    seenBlobNames.add(blobItem.getName());
                }
            }
            catch (Exception e) {
                throw new IgniteSpiException("Failed to get content from the container: " + containerName, e);
            }
        }

        return addrs;
    }

    /** {@inheritDoc} */
    @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
        assert !F.isEmpty(addrs);

        init();

        for (InetSocketAddress addr : addrs) {
            try {
                String key = URLEncoder.encode(keyFromAddr(addr), StandardCharsets.UTF_8.name());
                BlockBlobClient blobClient = blobContainerClient.getBlobClient(key).getBlockBlobClient();

                blobClient.upload(new ByteArrayInputStream(OBJECT_CONTENT), OBJECT_CONTENT.length);
            }
            catch (UnsupportedEncodingException e) {
                throw new IgniteSpiException("Unable to encode URL due to error " + e.getMessage(), e);
            }
            catch (BlobStorageException e) {
                // If the blob already exists, ignore
                if (e.getStatusCode() != 409)
                    throw new IgniteSpiException("Failed to upload blob with exception " + e.getMessage(), e);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
        assert !F.isEmpty(addrs);

        init();

        for (InetSocketAddress addr : addrs) {
            String key = keyFromAddr(addr);

            try {
                blobContainerClient.getBlobClient(key).delete();
            } catch (Exception e) {
                // https://github.com/Azure/azure-sdk-for-java/issues/20551
                if ((!(e.getMessage().contains("InterruptedException"))) || (e instanceof BlobStorageException
                    && (((BlobStorageException)e).getErrorCode() != BlobErrorCode.BLOB_NOT_FOUND))) {
                    throw new IgniteSpiException("Failed to delete entry [containerName=" + containerName +
                        ", entry=" + key + ']', e);
                }
            }
        }
    }

    /**
     * Sets Azure Blob Storage Account Name.
     * <p>
     * For details refer to Azure Blob Storage API reference.
     *
     * @param accountName Account Name
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = false)
    public TcpDiscoveryAzureBlobStoreIpFinder setAccountName(String accountName) {
        this.accountName = accountName;

        return this;
    }

    /**
     * Sets Azure Blob Storage Account Key
     * <p>
     * For details refer to Azure Blob Storage API reference.
     *
     * @param accountKey Account Key
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = false)
    public TcpDiscoveryAzureBlobStoreIpFinder setAccountKey(String accountKey) {
        this.accountKey = accountKey;

        return this;
    }

    /**
     * Sets Azure Blob Storage endpoint
     * <p>
     * For details refer to Azure Blob Storage API reference.
     *
     * @param endPoint Endpoint for storage
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = false)
    public TcpDiscoveryAzureBlobStoreIpFinder setAccountEndpoint(String endPoint) {
        this.endPoint = endPoint;

        return this;
    }

    /**
     * Sets container name for using in the context
     * If the container name doesn't exist Ignite will automatically create itß.
     *
     * @param containerName Container Name.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = false)
    public TcpDiscoveryAzureBlobStoreIpFinder setContainerName(String containerName) {
        this.containerName = containerName;

        return this;
    }

    /**
     * Initialize the IP finder
     * @throws IgniteSpiException
     */
    private void init() throws IgniteSpiException {
        if (initGuard.compareAndSet(false, true)) {
            if (accountKey == null || accountName == null || containerName == null || endPoint == null) {
                throw new IgniteSpiException(
                        "One or more of the required parameters is not set [accountName=" +
                                accountName + ", accountKey=" + accountKey + ", containerName=" +
                                containerName + ", endPoint=" + endPoint + "]");
            }

            try {
                credential = new StorageSharedKeyCredential(accountName, accountKey);
                blobServiceClient = new BlobServiceClientBuilder().endpoint(endPoint).credential(credential).buildClient();
                blobContainerClient = blobServiceClient.getBlobContainerClient(containerName);

                if (!blobContainerClient.exists()) {
                    U.warn(log, "Container doesn't exist, will create it [containerName=" + containerName + "]");

                    blobContainerClient.create();
                }
            }
            finally {
                initLatch.countDown();
            }
        }
        else {
            try {
                U.await(initLatch);
            }
            catch (IgniteInterruptedCheckedException e) {
                throw new IgniteSpiException("Thread has been interrupted.", e);
            }

            try {
                if (!blobContainerClient.exists())
                    throw new IgniteSpiException("IpFinder has not been initialized properly");
            } catch (Exception e) {
                // Check if this is a nested exception wrapping an InterruptedException
                // https://github.com/Azure/azure-sdk-for-java/issues/20551
                if (!(e.getCause() instanceof InterruptedException)) {
                    throw e;
                }
            }
        }
    }

    /**
     * Constructs a node address from bucket's key.
     *
     * @param key Bucket key.
     * @return Node address.
     * @throws IgniteSpiException In case of error.
     */
    private InetSocketAddress addrFromString(String key) throws IgniteSpiException {
        //TODO: This needs to move out to a generic helper class
        String[] res = key.split("#");

        if (res.length != 2)
            throw new IgniteSpiException("Invalid address string: " + key);

        int port;

        try {
            port = Integer.parseInt(res[1]);
        }
        catch (NumberFormatException ignored) {
            throw new IgniteSpiException("Invalid port number: " + res[1]);
        }

        return new InetSocketAddress(res[0], port);
    }

    /**
     * Constructs bucket's key from an address.
     *
     * @param addr Node address.
     * @return Bucket key.
     */
    private String keyFromAddr(InetSocketAddress addr) {
        // TODO: This needs to move out to a generic helper class
        return addr.getAddress().getHostAddress() + "#" + addr.getPort();
    }

    /** {@inheritDoc} */
    @Override public TcpDiscoveryAzureBlobStoreIpFinder setShared(boolean shared) {
        super.setShared(shared);

        return this;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(TcpDiscoveryAzureBlobStoreIpFinder.class, this);
    }
}
