| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.discovery.tcp.ipfinder.s3; |
| |
| import java.io.ByteArrayInputStream; |
| import java.net.InetSocketAddress; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.StringTokenizer; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.regex.Pattern; |
| import com.amazonaws.AmazonClientException; |
| import com.amazonaws.ClientConfiguration; |
| import com.amazonaws.auth.AWSCredentials; |
| import com.amazonaws.auth.AWSCredentialsProvider; |
| import com.amazonaws.services.s3.AmazonS3; |
| import com.amazonaws.services.s3.AmazonS3Client; |
| import com.amazonaws.services.s3.model.ObjectListing; |
| import com.amazonaws.services.s3.model.ObjectMetadata; |
| import com.amazonaws.services.s3.model.S3ObjectSummary; |
| import org.apache.commons.codec.binary.Base32; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.spi.IgniteSpiConfiguration; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.s3.encrypt.EncryptionService; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * AWS S3-based IP finder. |
| * <p> |
| * For information about Amazon S3 visit <a href="http://aws.amazon.com">aws.amazon.com</a>. |
| * <h1 class="header">Configuration</h1> |
| * <h2 class="header">Mandatory</h2> |
| * <ul> |
| * <li>AWS credentials (see {@link #setAwsCredentials(AWSCredentials)} and |
| * {@link #setAwsCredentialsProvider(AWSCredentialsProvider)}</li> |
| * <li>Bucket name (see {@link #setBucketName(String)})</li> |
| * </ul> |
| * <h2 class="header">Optional</h2> |
| * <ul> |
| * <li>Client configuration (see {@link #setClientConfiguration(ClientConfiguration)})</li> |
| * <li>Shared flag (see {@link #setShared(boolean)})</li> |
| * <li>Bucket endpoint (see {@link #setBucketEndpoint(String)})</li> |
| * <li>Server side encryption algorithm (see {@link #setSSEAlgorithm(String)})</li> |
| * <li>Key prefix for the node addresses (see {@link #setKeyPrefix(String)})</li> |
| * <li>Client side encryption service (see {@link #setEncryptionService(EncryptionService)})</li> |
| * </ul> |
| * <p> |
| * The finder will create S3 bucket with configured name. The bucket will contain entries named like the following: |
| * {@code 192.168.1.136#1001}. |
| * <p> |
| * Note that storing data in AWS S3 service will result in charges to your AWS account. Choose another implementation of |
| * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} for local or home network tests. |
| * <p> |
| * Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}. |
| */ |
| public class TcpDiscoveryS3IpFinder extends TcpDiscoveryIpFinderAdapter { |
| /** Delimiter to use in S3 entries name. */ |
| private static final String DELIM = "#"; |
| |
| /** Entry content. */ |
| private static final byte[] ENTRY_CONTENT = new byte[] {1}; |
| |
| /** Entry metadata. */ |
| @GridToStringExclude |
| private final ObjectMetadata objMetadata = new ObjectMetadata(); |
| |
| /** Grid logger. */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** Client to interact with S3 storage. */ |
| @GridToStringExclude |
| private AmazonS3 s3; |
| |
| /** Bucket name. */ |
| private String bucketName; |
| |
| /** Bucket endpoint. */ |
| @Nullable private String bucketEndpoint; |
| |
| /** Server side encryption algorithm. */ |
| @Nullable private String sseAlg; |
| |
| /** Sub-folder name to write node addresses. */ |
| @Nullable private String keyPrefix; |
| |
| /** Encryption service. **/ |
| @Nullable private EncryptionService encryptionSvc; |
| |
| /** Init guard. */ |
| @GridToStringExclude |
| private final AtomicBoolean initGuard = new AtomicBoolean(); |
| |
| /** Init latch. */ |
| @GridToStringExclude |
| private final CountDownLatch initLatch = new CountDownLatch(1); |
| |
| /** Amazon client configuration. */ |
| private ClientConfiguration cfg; |
| |
| /** AWS Credentials. */ |
| @GridToStringExclude |
| private AWSCredentials cred; |
| |
| /** AWS Credentials. */ |
| @GridToStringExclude |
| private AWSCredentialsProvider credProvider; |
| |
| /** |
| * Constructor. |
| */ |
| public TcpDiscoveryS3IpFinder() { |
| setShared(true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { |
| initClient(); |
| |
| Collection<InetSocketAddress> addrs = new LinkedList<>(); |
| |
| try { |
| ObjectListing list = keyPrefix == null ? s3.listObjects(bucketName) : s3.listObjects(bucketName, keyPrefix); |
| |
| while (true) { |
| for (S3ObjectSummary sum : list.getObjectSummaries()) { |
| InetSocketAddress addr = addr(sum); |
| |
| if (addr != null) |
| addrs.add(addr); |
| } |
| |
| if (list.isTruncated()) |
| list = s3.listNextBatchOfObjects(list); |
| else |
| break; |
| } |
| } |
| catch (AmazonClientException e) { |
| throw new IgniteSpiException("Failed to list objects in the bucket: " + bucketName, e); |
| } |
| |
| return addrs; |
| } |
| |
| /** |
| * Parses the S3 key to return the ip and addresses. |
| * |
| * @param sum S3 Object summary. |
| */ |
| private InetSocketAddress addr(S3ObjectSummary sum) { |
| String key = sum.getKey(); |
| String addr = key; |
| |
| if (keyPrefix != null) |
| addr = key.replaceFirst(Pattern.quote(keyPrefix), ""); |
| |
| if (encryptionSvc != null) { |
| byte[] encBytes = new Base32().decode(addr.getBytes(StandardCharsets.UTF_8)); |
| byte[] decBytes = encryptionSvc.decrypt(encBytes); |
| addr = new String(decBytes, StandardCharsets.UTF_8).replaceAll("=", ""); |
| } |
| |
| StringTokenizer st = new StringTokenizer(addr, DELIM); |
| |
| if (st.countTokens() != 2) |
| U.error(log, "Failed to parse S3 entry due to invalid format: " + addr); |
| else { |
| String addrStr = st.nextToken(); |
| String portStr = st.nextToken(); |
| |
| int port = -1; |
| |
| try { |
| port = Integer.parseInt(portStr); |
| } |
| catch (NumberFormatException e) { |
| U.error(log, "Failed to parse port for S3 entry: " + addr, e); |
| } |
| |
| if (port != -1) |
| try { |
| return new InetSocketAddress(addrStr, port); |
| } |
| catch (IllegalArgumentException e) { |
| U.error(log, "Failed to parse port for S3 entry: " + addr, e); |
| } |
| } |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { |
| assert !F.isEmpty(addrs); |
| |
| initClient(); |
| |
| for (InetSocketAddress addr : addrs) { |
| String key = key(addr); |
| |
| try { |
| s3.putObject(bucketName, key, new ByteArrayInputStream(ENTRY_CONTENT), objMetadata); |
| } |
| catch (AmazonClientException e) { |
| throw new IgniteSpiException("Failed to put entry [bucketName=" + bucketName + |
| ", entry=" + key + ']', e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { |
| assert !F.isEmpty(addrs); |
| |
| initClient(); |
| |
| for (InetSocketAddress addr : addrs) { |
| String key = key(addr); |
| |
| try { |
| s3.deleteObject(bucketName, key); |
| } |
| catch (AmazonClientException e) { |
| throw new IgniteSpiException("Failed to delete entry [bucketName=" + bucketName + |
| ", entry=" + key + ']', e); |
| } |
| } |
| } |
| |
| /** |
| * Gets S3 key for provided address. |
| * |
| * @param addr Node address. |
| * @return Key. |
| */ |
| private String key(InetSocketAddress addr) { |
| assert addr != null; |
| |
| SB sb = new SB(); |
| |
| if (keyPrefix != null) |
| sb.a(keyPrefix); |
| |
| String addrStr = addr.getAddress().getHostAddress(); |
| |
| if (encryptionSvc != null) { |
| String addrPort = new SB() |
| .a(addrStr) |
| .a(DELIM) |
| .a(addr.getPort()).toString(); |
| |
| byte[] encBytes = encryptionSvc.encrypt(addrPort.getBytes(StandardCharsets.UTF_8)); |
| byte[] base32Bytes = new Base32().encode(encBytes); |
| String encStr = new String(base32Bytes, StandardCharsets.UTF_8).replaceAll("=", ""); |
| |
| sb.a(encStr); |
| } |
| else |
| sb.a(addrStr) |
| .a(DELIM) |
| .a(addr.getPort()); |
| |
| return sb.toString(); |
| } |
| |
| /** |
| * Amazon s3 client initialization. |
| * |
| * @throws org.apache.ignite.spi.IgniteSpiException In case of error. |
| */ |
| private void initClient() throws IgniteSpiException { |
| if (initGuard.compareAndSet(false, true)) |
| try { |
| if (cred == null && credProvider == null) |
| throw new IgniteSpiException("AWS credentials are not set."); |
| |
| if (cfg == null) |
| U.warn(log, "Amazon client configuration is not set (will use default)."); |
| |
| if (F.isEmpty(bucketName)) |
| throw new IgniteSpiException("Bucket name is null or empty (provide bucket name and restart)."); |
| |
| objMetadata.setContentLength(ENTRY_CONTENT.length); |
| |
| if (!F.isEmpty(sseAlg)) |
| objMetadata.setSSEAlgorithm(sseAlg); |
| |
| s3 = createAmazonS3Client(); |
| |
| if (!s3.doesBucketExist(bucketName)) { |
| try { |
| s3.createBucket(bucketName); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Created S3 bucket: " + bucketName); |
| |
| while (!s3.doesBucketExist(bucketName)) |
| try { |
| U.sleep(200); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new IgniteSpiException("Thread has been interrupted.", e); |
| } |
| } |
| catch (AmazonClientException e) { |
| if (!s3.doesBucketExist(bucketName)) { |
| s3 = null; |
| |
| throw new IgniteSpiException("Failed to create bucket: " + bucketName, e); |
| } |
| } |
| } |
| } |
| finally { |
| initLatch.countDown(); |
| } |
| else { |
| try { |
| U.await(initLatch); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new IgniteSpiException("Thread has been interrupted.", e); |
| } |
| |
| if (s3 == null) |
| throw new IgniteSpiException("Ip finder has not been initialized properly."); |
| } |
| } |
| |
| /** |
| * Instantiates {@code AmazonS3Client} instance. |
| * |
| * @return Client instance to use to connect to AWS. |
| */ |
| AmazonS3Client createAmazonS3Client() { |
| AmazonS3Client cln = cfg != null |
| ? (cred != null ? new AmazonS3Client(cred, cfg) : new AmazonS3Client(credProvider, cfg)) |
| : (cred != null ? new AmazonS3Client(cred) : new AmazonS3Client(credProvider)); |
| |
| if (!F.isEmpty(bucketEndpoint)) |
| cln.setEndpoint(bucketEndpoint); |
| |
| return cln; |
| } |
| |
| /** |
| * Sets bucket name for IP finder. |
| * |
| * @param bucketName Bucket name. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = false) |
| public TcpDiscoveryS3IpFinder setBucketName(String bucketName) { |
| this.bucketName = bucketName; |
| |
| return this; |
| } |
| |
| /** |
| * Sets bucket endpoint for IP finder. If the endpoint is not set then IP finder will go to each region to find a |
| * corresponding bucket. For information about possible endpoint names visit |
| * <a href="http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region">docs.aws.amazon.com</a>. |
| * |
| * @param bucketEndpoint Bucket endpoint, for example, s3.us-east-2.amazonaws.com. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public TcpDiscoveryS3IpFinder setBucketEndpoint(String bucketEndpoint) { |
| this.bucketEndpoint = bucketEndpoint; |
| |
| return this; |
| } |
| |
| /** |
| * Sets server-side encryption algorithm for Amazon S3-managed encryption keys. For information about possible |
| * S3-managed encryption keys visit |
| * <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html">docs.aws.amazon.com</a>. |
| * |
| * @param sseAlg Server-side encryption algorithm, for example, AES256 or SSES3. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public TcpDiscoveryS3IpFinder setSSEAlgorithm(String sseAlg) { |
| this.sseAlg = sseAlg; |
| |
| return this; |
| } |
| |
| /** |
| * Sets Amazon client configuration. |
| * <p> |
| * For details refer to Amazon S3 API reference. |
| * |
| * @param cfg Amazon client configuration. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public TcpDiscoveryS3IpFinder setClientConfiguration(ClientConfiguration cfg) { |
| this.cfg = cfg; |
| |
| return this; |
| } |
| |
| /** |
| * Sets encryption service for client side node address encryption. |
| * |
| * @param encryptionSvc Encryption service . |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public TcpDiscoveryS3IpFinder setEncryptionService(EncryptionService encryptionSvc) { |
| this.encryptionSvc = encryptionSvc; |
| |
| return this; |
| } |
| |
| /** |
| * Sets AWS credentials. Either use {@link #setAwsCredentialsProvider(AWSCredentialsProvider)} or this one. |
| * <p> |
| * For details refer to Amazon S3 API reference. |
| * |
| * @param cred AWS credentials. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = false) |
| public TcpDiscoveryS3IpFinder setAwsCredentials(AWSCredentials cred) { |
| this.cred = cred; |
| |
| return this; |
| } |
| |
| /** |
| * Sets AWS credentials provider. Either use {@link #setAwsCredentials(AWSCredentials)} or this one. |
| * <p> |
| * For details refer to Amazon S3 API reference. |
| * |
| * @param credProvider AWS credentials provider. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = false) |
| public TcpDiscoveryS3IpFinder setAwsCredentialsProvider(AWSCredentialsProvider credProvider) { |
| this.credProvider = credProvider; |
| |
| return this; |
| } |
| |
| /** |
| * This can be thought of as the sub-folder within the bucket that will hold the node addresses. |
| * <p> |
| * For details visit |
| * <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/ListingKeysHierarchy.html"/> |
| * |
| * @param keyPrefix AWS credentials provider. |
| * @return {@code this} for chaining. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public TcpDiscoveryS3IpFinder setKeyPrefix(String keyPrefix) { |
| this.keyPrefix = keyPrefix; |
| |
| return this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public TcpDiscoveryS3IpFinder setShared(boolean shared) { |
| super.setShared(shared); |
| |
| return this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(TcpDiscoveryS3IpFinder.class, this, "super", super.toString()); |
| } |
| } |