blob: 322148f8ad22e13d77b7c7a8a0cf18144ab5285e [file] [log] [blame]
package org.apache.ignite.spi.discovery.tcp.ipfinder.azure;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
/**
* Azure Blob Storage based IP Finder
* <p>
* For information about Blob Storage visit <a href="https://azure.microsoft.com/en-in/services/storage/blobs/">azure.microsoft.com</a>.
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* <ul>
* <li>AccountName (see {@link #setAccountName(String)})</li>
* <li>AccountKey (see {@link #setAccountKey(String)})</li>
* <li>Account Endpoint (see {@link #setAccountEndpoint(String)})</li>
* <li>Container Name (see {@link #setContainerName(String)})</li>
* </ul>
* <h2 class="header">Optional</h2>
* <ul>
* <li>Shared flag (see {@link #setShared(boolean)})</li>
* </ul>
* <p>
* The finder will create a container with the provided name. The container will contain entries named
* like the following: {@code 192.168.1.136#1001}.
* <p>
* Note that storing data in Azure Blob Storage service will result in charges to your Azure account.
* Choose another implementation of {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} for local
* or home network tests.
* <p>
* Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}).
*/
public class TcpDiscoveryAzureBlobStoreIpFinder extends TcpDiscoveryIpFinderAdapter {
/** Default object's content. */
private static final byte[] OBJECT_CONTENT = new byte[0];
/** Grid logger. */
@LoggerResource
private IgniteLogger log;
/** Azure Blob Storage's account name*/
private String accountName;
/** Azure Blob Storage's account key */
private String accountKey;
/** End point URL */
private String endPoint;
/** Container name */
private String containerName;
/** Storage credential */
StorageSharedKeyCredential credential;
/** Blob service client */
private BlobServiceClient blobServiceClient;
/** Blob container client */
private BlobContainerClient blobContainerClient;
/** Init routine guard. */
private final AtomicBoolean initGuard = new AtomicBoolean();
/** Init routine latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
/**
* Default constructor
*/
public TcpDiscoveryAzureBlobStoreIpFinder() {
setShared(true);
}
/** {@inheritDoc} */
@Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
init();
Collection<InetSocketAddress> addrs = new ArrayList<>();
Set<String> seenBlobNames = new HashSet<>();
for (BlobItem blobItem : blobContainerClient.listBlobs()) {
try {
if (!blobItem.isDeleted()) {
addrs.add(addrFromString(blobItem.getName()));
seenBlobNames.add(blobItem.getName());
}
}
catch (Exception e) {
throw new IgniteSpiException("Failed to get content from the container: " + containerName, e);
}
}
return addrs;
}
/** {@inheritDoc} */
@Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
assert !F.isEmpty(addrs);
init();
for (InetSocketAddress addr : addrs) {
try {
String key = URLEncoder.encode(keyFromAddr(addr), StandardCharsets.UTF_8.name());
BlockBlobClient blobClient = blobContainerClient.getBlobClient(key).getBlockBlobClient();
blobClient.upload(new ByteArrayInputStream(OBJECT_CONTENT), OBJECT_CONTENT.length);
}
catch (UnsupportedEncodingException e) {
throw new IgniteSpiException("Unable to encode URL due to error " + e.getMessage(), e);
}
catch (BlobStorageException e) {
// If the blob already exists, ignore
if (e.getStatusCode() != 409)
throw new IgniteSpiException("Failed to upload blob with exception " + e.getMessage(), e);
}
}
}
/** {@inheritDoc} */
@Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
assert !F.isEmpty(addrs);
init();
for (InetSocketAddress addr : addrs) {
String key = keyFromAddr(addr);
try {
blobContainerClient.getBlobClient(key).delete();
} catch (Exception e) {
// https://github.com/Azure/azure-sdk-for-java/issues/20551
if ((!(e.getMessage().contains("InterruptedException"))) || (e instanceof BlobStorageException
&& (((BlobStorageException)e).getErrorCode() != BlobErrorCode.BLOB_NOT_FOUND))) {
throw new IgniteSpiException("Failed to delete entry [containerName=" + containerName +
", entry=" + key + ']', e);
}
}
}
}
/**
* Sets Azure Blob Storage Account Name.
* <p>
* For details refer to Azure Blob Storage API reference.
*
* @param accountName Account Name
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryAzureBlobStoreIpFinder setAccountName(String accountName) {
this.accountName = accountName;
return this;
}
/**
* Sets Azure Blob Storage Account Key
* <p>
* For details refer to Azure Blob Storage API reference.
*
* @param accountKey Account Key
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryAzureBlobStoreIpFinder setAccountKey(String accountKey) {
this.accountKey = accountKey;
return this;
}
/**
* Sets Azure Blob Storage endpoint
* <p>
* For details refer to Azure Blob Storage API reference.
*
* @param endPoint Endpoint for storage
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryAzureBlobStoreIpFinder setAccountEndpoint(String endPoint) {
this.endPoint = endPoint;
return this;
}
/**
* Sets container name for using in the context
* If the container name doesn't exist Ignite will automatically create itß.
*
* @param containerName Container Name.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryAzureBlobStoreIpFinder setContainerName(String containerName) {
this.containerName = containerName;
return this;
}
/**
* Initialize the IP finder
* @throws IgniteSpiException
*/
private void init() throws IgniteSpiException {
if (initGuard.compareAndSet(false, true)) {
if (accountKey == null || accountName == null || containerName == null || endPoint == null) {
throw new IgniteSpiException(
"One or more of the required parameters is not set [accountName=" +
accountName + ", accountKey=" + accountKey + ", containerName=" +
containerName + ", endPoint=" + endPoint + "]");
}
try {
credential = new StorageSharedKeyCredential(accountName, accountKey);
blobServiceClient = new BlobServiceClientBuilder().endpoint(endPoint).credential(credential).buildClient();
blobContainerClient = blobServiceClient.getBlobContainerClient(containerName);
if (!blobContainerClient.exists()) {
U.warn(log, "Container doesn't exist, will create it [containerName=" + containerName + "]");
blobContainerClient.create();
}
}
finally {
initLatch.countDown();
}
}
else {
try {
U.await(initLatch);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
try {
if (!blobContainerClient.exists())
throw new IgniteSpiException("IpFinder has not been initialized properly");
} catch (Exception e) {
// Check if this is a nested exception wrapping an InterruptedException
// https://github.com/Azure/azure-sdk-for-java/issues/20551
if (!(e.getCause() instanceof InterruptedException)) {
throw e;
}
}
}
}
/**
* Constructs a node address from bucket's key.
*
* @param key Bucket key.
* @return Node address.
* @throws IgniteSpiException In case of error.
*/
private InetSocketAddress addrFromString(String key) throws IgniteSpiException {
//TODO: This needs to move out to a generic helper class
String[] res = key.split("#");
if (res.length != 2)
throw new IgniteSpiException("Invalid address string: " + key);
int port;
try {
port = Integer.parseInt(res[1]);
}
catch (NumberFormatException ignored) {
throw new IgniteSpiException("Invalid port number: " + res[1]);
}
return new InetSocketAddress(res[0], port);
}
/**
* Constructs bucket's key from an address.
*
* @param addr Node address.
* @return Bucket key.
*/
private String keyFromAddr(InetSocketAddress addr) {
// TODO: This needs to move out to a generic helper class
return addr.getAddress().getHostAddress() + "#" + addr.getPort();
}
/** {@inheritDoc} */
@Override public TcpDiscoveryAzureBlobStoreIpFinder setShared(boolean shared) {
super.setShared(shared);
return this;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryAzureBlobStoreIpFinder.class, this);
}
}