blob: 084783aa45b62e2b22a34c66a74ea6329b38fc7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp.ipfinder.gce;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.InputStreamContent;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageScopes;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.StorageObject;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
/**
* Google Cloud Storage based IP finder.
* <p>
* For information about Cloud Storage visit <a href="https://cloud.google.com/storage/">cloud.google.com</a>.
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* <ul>
* <li>Service Account Id (see {@link #setServiceAccountId(String)})</li>
* <li>Service Account P12 key file path (see {@link #setServiceAccountP12FilePath(String)})</li>
* <li>Google Platform project name (see {@link #setProjectName(String)})</li>
* <li>Google Storage bucket name (see {@link #setBucketName(String)})</li>
* </ul>
* <h2 class="header">Optional</h2>
* <ul>
* <li>Shared flag (see {@link #setShared(boolean)})</li>
* </ul>
* <p>
* The finder will create a bucket with the provided name. The bucket will contain entries named
* like the following: {@code 192.168.1.136#1001}.
* <p>
* Note that storing data in Google Cloud Storage service will result in charges to your Google Cloud Platform account.
* Choose another implementation of {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} for local
* or home network tests.
* <p>
* Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}.
*/
public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapter {
/** Default object's content. */
private static final ByteArrayInputStream OBJECT_CONTENT = new ByteArrayInputStream(new byte[0]);
/** Grid logger. */
@LoggerResource
private IgniteLogger log;
/** Google Cloud Platform's project name.*/
private String projectName;
/** Google Storage bucket name. */
private String bucketName;
/** Service account p12 private key file name. */
private String srvcAccountP12FilePath;
/** Service account id. */
private String srvcAccountId;
/** Google storage. */
private Storage storage;
/** Init routine guard. */
private final AtomicBoolean initGuard = new AtomicBoolean();
/** Init routine latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
/**
*
*/
public TcpDiscoveryGoogleStorageIpFinder() {
setShared(true);
}
/** {@inheritDoc} */
@Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
init();
Collection<InetSocketAddress> addrs = new ArrayList<>();
try {
Storage.Objects.List listObjects = storage.objects().list(bucketName);
com.google.api.services.storage.model.Objects objects;
do {
objects = listObjects.execute();
if (objects == null || objects.getItems() == null)
break;
for (StorageObject object : objects.getItems())
addrs.add(addrFromString(object.getName()));
listObjects.setPageToken(objects.getNextPageToken());
}
while (null != objects.getNextPageToken());
}
catch (Exception e) {
throw new IgniteSpiException("Failed to get content from the bucket: " + bucketName, e);
}
return addrs;
}
/** {@inheritDoc} */
@Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
assert !F.isEmpty(addrs);
init();
for (InetSocketAddress addr : addrs) {
String key = keyFromAddr(addr);
StorageObject object = new StorageObject();
object.setBucket(bucketName);
object.setName(key);
InputStreamContent content = new InputStreamContent("application/octet-stream", OBJECT_CONTENT);
content.setLength(OBJECT_CONTENT.available());
try {
Storage.Objects.Insert insertObject = storage.objects().insert(bucketName, object, content);
insertObject.execute();
}
catch (Exception e) {
throw new IgniteSpiException("Failed to put entry [bucketName=" + bucketName +
", entry=" + key + ']', e);
}
}
}
/** {@inheritDoc} */
@Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
assert !F.isEmpty(addrs);
init();
for (InetSocketAddress addr : addrs) {
String key = keyFromAddr(addr);
try {
Storage.Objects.Delete deleteObject = storage.objects().delete(bucketName, key);
deleteObject.execute();
}
catch (Exception e) {
throw new IgniteSpiException("Failed to delete entry [bucketName=" + bucketName +
", entry=" + key + ']', e);
}
}
}
/**
* Sets Google Cloud Platforms project name.
* Usually this is an auto generated project number (ex. 208709979073) that can be found in "Overview" section
* of Google Developer Console.
* <p>
* For details refer to Google Cloud Platform API reference.
*
* @param projectName Project name.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryGoogleStorageIpFinder setProjectName(String projectName) {
this.projectName = projectName;
return this;
}
/**
* Sets Google Cloud Storage bucket name.
* If the bucket doesn't exist Ignite will automatically create it. However the name must be unique across whole
* Google Cloud Storage and Service Account Id (see {@link #setServiceAccountId(String)}) must be authorized to
* perform this operation.
*
* @param bucketName Bucket name.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryGoogleStorageIpFinder setBucketName(String bucketName) {
this.bucketName = bucketName;
return this;
}
/**
* Sets a full path to the private key in PKCS12 format of the Service Account.
* <p>
* For more information please refer to
* <a href="https://cloud.google.com/storage/docs/authentication#service_accounts">
* Service Account Authentication</a>.
*
* @param p12FileName Private key file full path.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryGoogleStorageIpFinder setServiceAccountP12FilePath(String p12FileName) {
this.srvcAccountP12FilePath = p12FileName;
return this;
}
/**
* Sets the service account ID (typically an e-mail address).
* <p>
* For more information please refer to
* <a href="https://cloud.google.com/storage/docs/authentication#service_accounts">
* Service Account Authentication</a>.
*
* @param id Service account ID.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryGoogleStorageIpFinder setServiceAccountId(String id) {
this.srvcAccountId = id;
return this;
}
/**
* Google Cloud Storage initialization.
*
* @throws IgniteSpiException In case of error.
*/
private void init() throws IgniteSpiException {
if (initGuard.compareAndSet(false, true)) {
if (srvcAccountId == null ||
srvcAccountP12FilePath == null ||
projectName == null ||
bucketName == null) {
throw new IgniteSpiException(
"One or more of the required parameters is not set [serviceAccountId=" +
srvcAccountId + ", serviceAccountP12FilePath=" + srvcAccountP12FilePath + ", projectName=" +
projectName + ", bucketName=" + bucketName + "]");
}
try {
NetHttpTransport httpTransport;
try {
httpTransport = GoogleNetHttpTransport.newTrustedTransport();
}
catch (GeneralSecurityException | IOException e) {
throw new IgniteSpiException(e);
}
GoogleCredential cred;
try {
cred = new GoogleCredential.Builder().setTransport(httpTransport)
.setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(srvcAccountId)
.setServiceAccountPrivateKeyFromP12File(new File(srvcAccountP12FilePath))
.setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL)).build();
}
catch (Exception e) {
throw new IgniteSpiException("Failed to authenticate on Google Cloud Platform", e);
}
try {
storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), cred)
.setApplicationName(projectName).build();
}
catch (Exception e) {
throw new IgniteSpiException("Failed to open a storage for given project name: " + projectName, e);
}
boolean createBucket = false;
try {
Storage.Buckets.Get getBucket = storage.buckets().get(bucketName);
getBucket.setProjection("full");
getBucket.execute();
}
catch (GoogleJsonResponseException e) {
if (e.getStatusCode() == 404) {
U.warn(log, "Bucket doesn't exist, will create it [bucketName=" + bucketName + "]");
createBucket = true;
}
else
throw new IgniteSpiException("Failed to open the bucket: " + bucketName, e);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to open the bucket: " + bucketName, e);
}
if (createBucket) {
Bucket newBucket = new Bucket();
newBucket.setName(bucketName);
try {
Storage.Buckets.Insert insertBucket = storage.buckets().insert(projectName, newBucket);
insertBucket.setProjection("full");
insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
insertBucket.execute();
}
catch (Exception e) {
throw new IgniteSpiException("Failed to create the bucket: " + bucketName, e);
}
}
}
finally {
initLatch.countDown();
}
}
else {
try {
U.await(initLatch);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
if (storage == null)
throw new IgniteSpiException("IpFinder has not been initialized properly");
}
}
/**
* Constructs bucket's key from an address.
*
* @param addr Node address.
* @return Bucket key.
*/
private String keyFromAddr(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + "#" + addr.getPort();
}
/**
* Constructs a node address from bucket's key.
*
* @param key Bucket key.
* @return Node address.
* @throws IgniteSpiException In case of error.
*/
private InetSocketAddress addrFromString(String key) throws IgniteSpiException {
String[] res = key.split("#");
if (res.length != 2)
throw new IgniteSpiException("Invalid address string: " + key);
int port;
try {
port = Integer.parseInt(res[1]);
}
catch (NumberFormatException ignored) {
throw new IgniteSpiException("Invalid port number: " + res[1]);
}
return new InetSocketAddress(res[0], port);
}
/**
* Used by TEST SUITES only. Called through reflection.
*
* @param bucketName Bucket to delete.
*/
private void removeBucket(String bucketName) {
init();
try {
Storage.Buckets.Delete deleteBucket = storage.buckets().delete(bucketName);
deleteBucket.execute();
}
catch (Exception e) {
throw new IgniteSpiException("Failed to remove the bucket: " + bucketName, e);
}
}
/** {@inheritDoc} */
@Override public TcpDiscoveryGoogleStorageIpFinder setShared(boolean shared) {
super.setShared(shared);
return this;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryGoogleStorageIpFinder.class, this);
}
}