blob: 6121b47263b683ca1c1ff10de4584cf2421f2600 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.pubsub
import java.io.{ByteArrayInputStream, File, FileOutputStream}
import java.nio.file.{Files, Paths}
import java.util
import com.google.api.client.auth.oauth2.Credential
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.http.HttpTransport
import com.google.api.client.json.jackson.JacksonFactory
import com.google.api.services.pubsub.PubsubScopes
import com.google.cloud.hadoop.util.{CredentialFactory, HttpTransportFactory}
/**
* Serializable interface providing a method executors can call to obtain a
* Credential instance for authenticating to GCP services.
*/
private[pubsub] sealed trait SparkGCPCredentials extends Serializable {

  /**
   * Creates the credential described by this configuration.
   * @return Credential produced by the concrete implementation
   */
  def provider: Credential

  /**
   * JSON factory handed to the Google client classes.
   * @return a newly constructed JacksonFactory on every call
   */
  def jacksonFactory(): JacksonFactory = {
    new JacksonFactory()
  }

  /**
   * HTTP transport handed to the Google client classes.
   * @return transport created by HttpTransportFactory for the JAVA_NET transport type
   */
  def httpTransport(): HttpTransport = {
    val transportType = HttpTransportFactory.HttpTransportType.JAVA_NET
    // NOTE(review): the second argument is passed as null; presumably this means
    // "no proxy" - confirm against HttpTransportFactory.
    HttpTransportFactory.createHttpTransport(transportType, null)
  }

  /**
   * OAuth scopes requested for the credential.
   * @return the collection returned by PubsubScopes.all()
   */
  def scopes(): util.Collection[String] = {
    PubsubScopes.all()
  }
}
/**
* Application default credentials.
*/
private[pubsub] final case object ApplicationDefaultCredentials extends SparkGCPCredentials {

  /**
   * Looks up the application default credential and scopes it for Pub/Sub.
   * @return application default GoogleCredential scoped with `scopes()`
   */
  override def provider: Credential = {
    val unscoped = GoogleCredential.getApplicationDefault()
    // scopes() is the trait default, i.e. PubsubScopes.all().
    unscoped.createScoped(scopes())
  }
}
/**
* Credentials based on JSON key file.
*/
/*
 * Holds the raw bytes of a JSON service-account key.
 *
 * Note: because the single field is an Array[Byte], the generated case-class
 * equals/hashCode compare the array by reference, not by content.
 */
private[pubsub] final case class JsonConfigCredentials(jsonContent: Array[Byte])
  extends SparkGCPCredentials {

  /**
   * Reads the whole key file into memory at construction time.
   * @param jsonFilePath path of the JSON key file
   */
  def this(jsonFilePath: String) = this(Files.readAllBytes(Paths.get(jsonFilePath)))

  /**
   * Parses the JSON key, applies `scopes()` and wraps the result in a
   * GoogleCredentialWithRetry.
   * @return credential built from the JSON key content
   */
  override def provider: Credential = {
    val stream = new ByteArrayInputStream(jsonContent)
    try {
      CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
        GoogleCredential
          .fromStream(stream, httpTransport(), jacksonFactory())
          .createScoped(scopes())
      )
    } finally {
      // Close on every path, including when parsing the key throws.
      stream.close()
    }
  }
}
/**
* Credentials based on e-mail account and P12 key.
*/
/*
 * Holds a service-account e-mail together with the raw bytes of its P12 key.
 *
 * Note: because one field is an Array[Byte], the generated case-class
 * equals/hashCode compare that array by reference, not by content.
 */
private[pubsub] final case class EMailPrivateKeyCredentials(
    emailAccount: String, p12Content: Array[Byte]
) extends SparkGCPCredentials {

  /**
   * Reads the whole P12 key file into memory at construction time.
   * @param emailAccount email of service account
   * @param p12FilePath path of the P12 key file
   */
  def this(emailAccount: String, p12FilePath: String) = {
    this(emailAccount, Files.readAllBytes(Paths.get(p12FilePath)))
  }

  /**
   * Builds a service-account credential from the e-mail account and P12 key.
   *
   * The key bytes are written to a temporary file because the builder is given
   * a File. The temporary file is removed before this method returns so that
   * private key material does not linger on disk.
   * @return GoogleCredentialWithRetry for the service account
   */
  override def provider: Credential = {
    val tempFile = File.createTempFile(emailAccount, ".p12")
    try {
      val p12Out = Files.newOutputStream(tempFile.toPath)
      try {
        p12Out.write(p12Content, 0, p12Content.length)
        p12Out.flush()
      } finally {
        // Release the file handle even when the write fails.
        p12Out.close()
      }
      new CredentialFactory.GoogleCredentialWithRetry(
        new GoogleCredential.Builder()
          .setTransport(httpTransport())
          .setJsonFactory(jacksonFactory())
          .setServiceAccountId(emailAccount)
          .setServiceAccountScopes(scopes())
          // The builder loads the private key from the file during this call,
          // so the file is no longer needed once the chain has completed.
          .setServiceAccountPrivateKeyFromP12File(tempFile)
          .setRequestInitializer(new CredentialFactory.CredentialHttpRetryInitializer())
      )
    } finally {
      // Delete eagerly; fall back to delete-on-exit only if that fails.
      if (!tempFile.delete()) {
        tempFile.deleteOnExit()
      }
    }
  }
}
/**
* Credentials based on metadata service.
*/
private[pubsub] final case class MetadataServiceCredentials() extends SparkGCPCredentials {

  /**
   * Asks a fresh CredentialFactory for the metadata-service account credential.
   * @return credential returned by getCredentialFromMetadataServiceAccount
   */
  override def provider: Credential = {
    val factory = new CredentialFactory
    factory.getCredentialFromMetadataServiceAccount()
  }
}
object SparkGCPCredentials {

  /**
   * Fluent builder for SparkGCPCredentials instances.
   *
   * Each configuration method replaces any previously selected credential, so
   * the last call before `build()` wins.
   */
  class Builder {

    // Credential chosen so far; None means "fall back to application default".
    private var selected: Option[SparkGCPCredentials] = None

    /** Records the chosen credential and returns this builder for chaining. */
    private def select(choice: SparkGCPCredentials): Builder = {
      selected = Some(choice)
      this
    }

    /**
     * Use a JSON key file as the service account credential.
     * @param jsonFilePath path of the JSON key file
     * @return Reference to this SparkGCPCredentials.Builder
     */
    def jsonServiceAccount(jsonFilePath: String): Builder =
      select(new JsonConfigCredentials(jsonFilePath))

    /**
     * Use the content of a JSON key file as the service account credential.
     * @param jsonFileBuffer binary content of the JSON key file
     * @return Reference to this SparkGCPCredentials.Builder
     */
    def jsonServiceAccount(jsonFileBuffer: Array[Byte]): Builder =
      select(JsonConfigCredentials(jsonFileBuffer))

    /**
     * Use a P12 key file as the service account credential.
     * @param p12FilePath path of the P12 key file
     * @param emailAccount email of service account
     * @return Reference to this SparkGCPCredentials.Builder
     */
    def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder =
      select(new EMailPrivateKeyCredentials(emailAccount, p12FilePath))

    /**
     * Use the content of a P12 key file as the service account credential.
     * @param p12FileBuffer binary content of the P12 key file
     * @param emailAccount email of service account
     * @return Reference to this SparkGCPCredentials.Builder
     */
    def p12ServiceAccount(p12FileBuffer: Array[Byte], emailAccount: String): Builder =
      select(EMailPrivateKeyCredentials(emailAccount, p12FileBuffer))

    /**
     * Use the metadata service to obtain the service account credential.
     * @return Reference to this SparkGCPCredentials.Builder
     */
    def metadataServiceAccount(): Builder =
      select(MetadataServiceCredentials())

    /**
     * Returns the SparkGCPCredentials matching the configured parameters.
     * - The service account credentials are returned if they were provided.
     * - The application default credentials are returned otherwise.
     * @return SparkGCPCredentials object
     */
    def build(): SparkGCPCredentials = selected match {
      case Some(choice) => choice
      case None => ApplicationDefaultCredentials
    }
  }

  /**
   * Entry point for constructing a SparkGCPCredentials instance.
   * @return a new SparkGCPCredentials.Builder
   */
  def builder: Builder = new Builder()
}