#5060: Upgrade to Akka 2.6.12 (#5065)
- removed usage implicit custom ActorMaterializer
- upgraded to Akka Http 10.2.3
- changed the way how Https set up when host name verification is disabled
- changed RestartSink.withBackoff to accept RestartSettings
- changed akka.actor.FSM.setTimer => akka.actor.FSM.startTimerAtFixedRate
- changed akka.actor.Scheduler.schedule => akka.actor.Scheduler.scheduleAtFixedRate
- updated to the new signature of Source.actorRef
- replaced deprecated HttpResponse.copy
- replaced Gzip with Coders.Gzip
- fixed the scalafmt in tests
- removed the implicit Materializer shutdown considering its lifecycle is managed by akka
- increased the ansible wait time
- switched to classic networking
- disabled JFR
- merging with New Scheduler
- increased wait time for FSM due to intermittent failures in build
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 575aaf4..21f33a6 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -365,4 +365,4 @@
register: result
until: result.status == 200
retries: 12
- delay: 5
+ delay: 10
diff --git a/ansible/roles/controller/tasks/join_akka_cluster.yml b/ansible/roles/controller/tasks/join_akka_cluster.yml
index 4623bcc..dc0a1ee 100644
--- a/ansible/roles/controller/tasks/join_akka_cluster.yml
+++ b/ansible/roles/controller/tasks/join_akka_cluster.yml
@@ -32,7 +32,7 @@
env: >-
{{ env | combine({
'CONFIG_akka_cluster_seedNodes_' ~ seedNode.0:
- 'akka.tcp://controller-actor-system@'~seedNode.1~':'~(controller.akka.cluster.basePort+seedNode.0)
+ 'akka://controller-actor-system@'~seedNode.1~':'~(controller.akka.cluster.basePort+seedNode.0)
}) }}
with_indexed_items: "{{ controller.akka.cluster.seedNodes }}"
loop_control:
@@ -42,11 +42,11 @@
vars:
akka_env:
"CONFIG_akka_actor_provider": "{{ controller.akka.provider }}"
- "CONFIG_akka_remote_netty_tcp_hostname":
+ "CONFIG_akka_remote_artery_canonical_hostname":
"{{ controller.akka.cluster.host[(controller_index | int)] }}"
- "CONFIG_akka_remote_netty_tcp_port":
+ "CONFIG_akka_remote_artery_canonical_port":
"{{ controller.akka.cluster.basePort + (controller_index | int) }}"
- "CONFIG_akka_remote_netty_tcp_bindPort":
+ "CONFIG_akka_remote_artery_bind_port":
"{{ controller.akka.cluster.bindPort }}"
set_fact:
env: "{{ env | combine(akka_env) }}"
diff --git a/build.gradle b/build.gradle
index 9007e3e..5228625 100644
--- a/build.gradle
+++ b/build.gradle
@@ -52,7 +52,7 @@
'akka-discovery', 'akka-distributed-data', 'akka-protobuf', 'akka-remote', 'akka-slf4j',
'akka-stream', 'akka-stream-testkit', 'akka-testkit']
def akkaHttp = ['akka-http', 'akka-http-core', 'akka-http-spray-json', 'akka-http-testkit', 'akka-http-xml',
- 'akka-parsing']
+ 'akka-parsing', 'akka-http2-support']
akka.forEach {
cons.add('compile', "com.typesafe.akka:${it}_${gradle.scala.depVersion}:${gradle.akka.version}")
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 230e16d..287b5c3 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -18,6 +18,10 @@
# default application configuration file for akka
include "logging"
+akka {
+ java-flight-recorder.enabled = false
+}
+
akka.http {
client {
parsing.illegal-header-warnings = off
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Https.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Https.scala
index 13c4bf8..b60b86c 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Https.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Https.scala
@@ -20,10 +20,8 @@
import java.io.{FileInputStream, InputStream}
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
-
-import akka.http.scaladsl.ConnectionContext
+import akka.http.scaladsl.{ConnectionContext, HttpsConnectionContext}
import akka.stream.TLSClientAuth
-import com.typesafe.sslconfig.akka.AkkaSSLConfig
object Https {
case class HttpsConfig(keystorePassword: String, keystoreFlavor: String, keystorePath: String, clientAuth: String)
@@ -35,8 +33,16 @@
cs
}
- def connectionContext(httpsConfig: HttpsConfig, sslConfig: Option[AkkaSSLConfig] = None) = {
+ def httpsInsecureClient(context: SSLContext): HttpsConnectionContext =
+ ConnectionContext.httpsClient((host, port) => {
+ val engine = context.createSSLEngine(host, port)
+ engine.setUseClientMode(true)
+ // WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
+ // Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
+ engine
+ })
+ def applyHttpsConfig(httpsConfig: HttpsConfig, withDisableHostnameVerification: Boolean = false): SSLContext = {
val keyFactoryType = "SunX509"
val clientAuth = {
if (httpsConfig.clientAuth.toBoolean)
@@ -63,7 +69,27 @@
val sslContext: SSLContext = SSLContext.getInstance("TLS")
sslContext.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
+ sslContext
+ }
- ConnectionContext.https(sslContext, sslConfig, clientAuth = clientAuth)
+ def connectionContextClient(httpsConfig: HttpsConfig,
+ withDisableHostnameVerification: Boolean = false): HttpsConnectionContext = {
+ val sslContext = applyHttpsConfig(httpsConfig, withDisableHostnameVerification)
+ connectionContextClient(sslContext, withDisableHostnameVerification)
+ }
+
+ def connectionContextClient(sslContext: SSLContext,
+ withDisableHostnameVerification: Boolean): HttpsConnectionContext = {
+ if (withDisableHostnameVerification) {
+ httpsInsecureClient(sslContext)
+ } else {
+ ConnectionContext.httpsClient(sslContext)
+ }
+ }
+
+ def connectionContextServer(httpsConfig: HttpsConfig,
+ withDisableHostnameVerification: Boolean = false): HttpsConnectionContext = {
+ val sslContext: SSLContext = applyHttpsConfig(httpsConfig, withDisableHostnameVerification)
+ ConnectionContext.httpsServer(sslContext)
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 6518aee..83267ba 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -32,7 +32,7 @@
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
-import akka.stream.{Graph, SinkShape, UniformFanOutShape}
+import akka.stream.{Graph, RestartSettings, SinkShape, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, RestartSink, Sink, Source}
import akka.util.ByteString
@@ -87,32 +87,33 @@
protected val writeToFile: Sink[ByteString, _] = MergeHub
.source[ByteString]
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
- .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
- LogRotatorSink(() => {
- val maxSize = bufferSize.toBytes
- var bytesRead = maxSize
- element =>
- {
- val size = element.size
- if (bytesRead + size > maxSize) {
- bytesRead = size
- val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
- logging.info(this, s"Rotating log file to '$logFilePath'")
- try {
- Files.createFile(logFilePath)
- Files.setPosixFilePermissions(logFilePath, perms)
- } catch {
- case t: Throwable =>
- logging.error(this, s"Couldn't create userlogs file: $t")
- throw t
+ .to(RestartSink.withBackoff(RestartSettings(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2)) {
+ () =>
+ LogRotatorSink(() => {
+ val maxSize = bufferSize.toBytes
+ var bytesRead = maxSize
+ element =>
+ {
+ val size = element.size
+ if (bytesRead + size > maxSize) {
+ bytesRead = size
+ val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
+ logging.info(this, s"Rotating log file to '$logFilePath'")
+ try {
+ Files.createFile(logFilePath)
+ Files.setPosixFilePermissions(logFilePath, perms)
+ } catch {
+ case t: Throwable =>
+ logging.error(this, s"Couldn't create userlogs file: $t")
+ throw t
+ }
+ Some(logFilePath)
+ } else {
+ bytesRead += size
+ None
}
- Some(logFilePath)
- } else {
- bytesRead += size
- None
}
- }
- })
+ })
})
.run()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 0fa8892..0c8bf48 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -21,7 +21,6 @@
import akka.NotUsed
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source
@@ -62,7 +61,7 @@
*/
class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
implicit val ec: ExecutionContext = system.dispatcher
- implicit val mat: ActorMaterializer = ActorMaterializer()(system)
+ implicit val actorSystem: ActorSystem = system
/* "json-file" is the log-driver that writes out to file */
override val containerParameters = Map("--log-driver" -> Set("json-file"))
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
index fcc5bb3..65033d3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -75,8 +75,6 @@
elasticSearchConfig.logSchema.time)
}
- implicit val actorSystem = system
-
private val esClient = new ElasticSearchRestClient(
elasticSearchConfig.protocol,
elasticSearchConfig.host,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala
index c83309d..a65fe20 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala
@@ -21,6 +21,7 @@
import java.time.temporal.ChronoUnit
import akka.actor.ActorSystem
+import akka.http.scaladsl.ConnectionContext
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Post
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
@@ -32,14 +33,13 @@
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
-import com.typesafe.sslconfig.akka.AkkaSSLConfig
+import javax.net.ssl.{SSLContext, SSLEngine}
import pureconfig._
import pureconfig.generic.auto._
@@ -89,7 +89,6 @@
extends LogDriverLogStore(actorSystem) {
implicit val as = actorSystem
implicit val ec = as.dispatcher
- implicit val materializer = ActorMaterializer()
private val logging = new AkkaLogging(actorSystem.log)
private val splunkApi = Path / "services" / "search" / "jobs" //see http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs
@@ -98,12 +97,22 @@
val maxPendingRequests = 500
+ def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
+ val engine = SSLContext.getDefault.createSSLEngine(host, port)
+ engine.setUseClientMode(true)
+
+ // WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
+ // Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
+
+ engine
+ }
+
val defaultHttpFlow = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](
host = splunkConfig.host,
port = splunkConfig.port,
connectionContext =
if (splunkConfig.disableSNI)
- Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
+ ConnectionContext.httpsClient(createInsecureSslEngine _)
else Http().defaultClientHttpsContext)
override def fetchLogs(namespace: String,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
index 997ebaf..dd41abc 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
@@ -22,7 +22,8 @@
import java.time.Instant
import java.util.EnumSet
-import akka.stream.ActorMaterializer
+import akka.actor.ActorSystem
+import akka.stream.RestartSettings
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
import akka.util.ByteString
@@ -37,10 +38,9 @@
class ActivationFileStorage(logFilePrefix: String,
logPath: Path,
writeResultToFile: Boolean,
- actorMaterializer: ActorMaterializer,
+ actorSystem: ActorSystem,
logging: Logging) {
-
- implicit val materializer = actorMaterializer
+ implicit val system: ActorSystem = actorSystem
private var logFile = logPath
private val bufferSize = 100.MB
@@ -48,27 +48,28 @@
private val writeToFile: Sink[ByteString, _] = MergeHub
.source[ByteString]
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
- .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
- LogRotatorSink(() => {
- val maxSize = bufferSize.toBytes
- var bytesRead = maxSize
- element =>
- {
- val size = element.size
+ .to(RestartSink.withBackoff(RestartSettings(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2)) {
+ () =>
+ LogRotatorSink(() => {
+ val maxSize = bufferSize.toBytes
+ var bytesRead = maxSize
+ element =>
+ {
+ val size = element.size
- if (bytesRead + size > maxSize) {
- logFile = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")
+ if (bytesRead + size > maxSize) {
+ logFile = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")
- logging.info(this, s"Rotating log file to '$logFile'")
- createLogFile(logFile)
- bytesRead = size
- Some(logFile)
- } else {
- bytesRead += size
- None
+ logging.info(this, s"Rotating log file to '$logFile'")
+ createLogFile(logFile)
+ bytesRead = size
+ Some(logFile)
+ } else {
+ bytesRead += size
+ None
+ }
}
- }
- })
+ })
})
.run()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
index 99783fe..8365150 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
@@ -20,7 +20,6 @@
import java.time.Instant
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
import org.apache.openwhisk.common.{Logging, TransactionId}
@@ -199,5 +198,5 @@
}
trait ActivationStoreProvider extends Spi {
- def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging): ActivationStore
+ def instance(actorSystem: ActorSystem, logging: Logging): ActivationStore
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
index 828273b..1e2ab88 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
@@ -20,7 +20,6 @@
import java.time.Instant
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import spray.json.JsObject
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.entity._
@@ -28,13 +27,12 @@
import scala.concurrent.Future
import scala.util.{Failure, Success}
-class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging)
- extends ActivationStore {
+class ArtifactActivationStore(actorSystem: ActorSystem, logging: Logging) extends ActivationStore {
implicit val executionContext = actorSystem.dispatcher
private val artifactStore: ArtifactStore[WhiskActivation] =
- WhiskActivationStore.datastore()(actorSystem, logging, actorMaterializer)
+ WhiskActivationStore.datastore()(actorSystem, logging)
def store(activation: WhiskActivation, context: UserContext)(
implicit transid: TransactionId,
@@ -131,6 +129,6 @@
}
object ArtifactActivationStoreProvider extends ActivationStoreProvider {
- override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
- new ArtifactActivationStore(actorSystem, actorMaterializer, logging)
+ override def instance(actorSystem: ActorSystem, logging: Logging) =
+ new ArtifactActivationStore(actorSystem, logging)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala
index d9622a4..ace3f5e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import spray.json.RootJsonFormat
import org.apache.openwhisk.common.Logging
@@ -31,17 +30,14 @@
* An Spi for providing ArtifactStore implementations
*/
trait ArtifactStoreProvider extends Spi {
- def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean = false)(
- implicit jsonFormat: RootJsonFormat[D],
- docReader: DocumentReader,
- actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D]
+ def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean = false)(implicit jsonFormat: RootJsonFormat[D],
+ docReader: DocumentReader,
+ actorSystem: ActorSystem,
+ logging: Logging): ArtifactStore[D]
- protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]()(
- implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): Option[AttachmentStore] = {
+ protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]()(implicit
+ actorSystem: ActorSystem,
+ logging: Logging): Option[AttachmentStore] = {
if (ConfigFactory.load().hasPath("whisk.spi.AttachmentStoreProvider")) {
Some(SpiLoader.get[AttachmentStoreProvider].makeStore[D]())
} else {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
index 3deb5a6..cba59b1 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
@@ -20,7 +20,6 @@
import java.nio.file.Paths
import akka.actor.ActorSystem
-import akka.stream._
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.{DocInfo, _}
@@ -37,18 +36,17 @@
class ArtifactWithFileStorageActivationStore(
actorSystem: ActorSystem,
- actorMaterializer: ActorMaterializer,
logging: Logging,
config: ArtifactWithFileStorageActivationStoreConfig =
loadConfigOrThrow[ArtifactWithFileStorageActivationStoreConfig](ConfigKeys.activationStoreWithFileStorage))
- extends ArtifactActivationStore(actorSystem, actorMaterializer, logging) {
+ extends ArtifactActivationStore(actorSystem, logging) {
private val activationFileStorage =
new ActivationFileStorage(
config.logFilePrefix,
Paths.get(config.logPath),
config.writeResultToFile,
- actorMaterializer,
+ actorSystem,
logging)
def getLogFile = activationFileStorage.getLogFile
@@ -71,6 +69,6 @@
}
object ArtifactWithFileStorageActivationStoreProvider extends ActivationStoreProvider {
- override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
- new ArtifactWithFileStorageActivationStore(actorSystem, actorMaterializer, logging)
+ override def instance(actorSystem: ActorSystem, logging: Logging) =
+ new ArtifactWithFileStorageActivationStore(actorSystem, logging)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentStore.scala
index 48c711b..facb698 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentStore.scala
@@ -18,7 +18,6 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentType
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.apache.openwhisk.common.{Logging, TransactionId}
@@ -29,9 +28,9 @@
import scala.reflect.ClassTag
trait AttachmentStoreProvider extends Spi {
- def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore
+ def makeStore[D <: DocumentSerializer: ClassTag]()(implicit
+ actorSystem: ActorSystem,
+ logging: Logging): AttachmentStore
}
case class AttachResult(digest: String, length: Long)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentSupport.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentSupport.scala
index 173a3ee..afd1187 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentSupport.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/AttachmentSupport.scala
@@ -20,8 +20,8 @@
import java.util.Base64
import akka.NotUsed
+import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentType, Uri}
-import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import spray.json.DefaultJsonProtocol
@@ -48,9 +48,6 @@
*/
trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends DefaultJsonProtocol {
- /** Materializer required for stream processing */
- protected[core] implicit val materializer: Materializer
-
protected def executionContext: ExecutionContext
/**
@@ -75,9 +72,9 @@
* Either - Left(byteString) containing all the bytes from the source or Right(Source[ByteString, _])
* if the source is large
*/
- protected[database] def inlineOrAttach(
- docStream: Source[ByteString, _],
- previousPrefix: ByteString = ByteString.empty): Future[Either[ByteString, Source[ByteString, _]]] = {
+ protected[database] def inlineOrAttach(docStream: Source[ByteString, _],
+ previousPrefix: ByteString = ByteString.empty)(
+ implicit system: ActorSystem): Future[Either[ByteString, Source[ByteString, _]]] = {
implicit val ec = executionContext
docStream.prefixAndTail(1).runWith(Sink.head).flatMap {
case (Nil, _) =>
@@ -137,16 +134,16 @@
* @param docStream attachment source
* @param oldAttachment old attachment in case of update. Required for deleting the old attachment
* @param attachmentStore attachmentStore where attachment needs to be stored
- *
* @return a tuple of updated document info and attachment metadata
*/
- protected[database] def attachToExternalStore[A <: DocumentAbstraction](
- doc: A,
- update: (A, Attached) => A,
- contentType: ContentType,
- docStream: Source[ByteString, _],
- oldAttachment: Option[Attached],
- attachmentStore: AttachmentStore)(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+ protected[database] def attachToExternalStore[A <: DocumentAbstraction](doc: A,
+ update: (A, Attached) => A,
+ contentType: ContentType,
+ docStream: Source[ByteString, _],
+ oldAttachment: Option[Attached],
+ attachmentStore: AttachmentStore)(
+ implicit transid: TransactionId,
+ actorSystem: ActorSystem): Future[(DocInfo, Attached)] = {
val asJson = doc.toDocumentRecord
val id = asJson.fields("_id").convertTo[String].trim
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala
index 222d009..c3dfe0f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala
@@ -17,11 +17,13 @@
package org.apache.openwhisk.core.database
+import akka.Done
+import akka.actor.ActorSystem
+
import scala.collection.immutable.Queue
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
-
-import akka.stream.{ActorMaterializer, OverflowStrategy}
+import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
/**
@@ -44,16 +46,24 @@
* @param batchSize maximum size of a batch
* @param concurrency number of batches being handled in parallel
* @param operation operation taking the batch
-
* @tparam T the type to be batched
* @tparam R return type of a single element after operation
*/
-class Batcher[T, R](batchSize: Int, concurrency: Int)(operation: Seq[T] => Future[Seq[R]])(
- implicit materializer: ActorMaterializer,
- ec: ExecutionContext) {
+class Batcher[T, R](batchSize: Int, concurrency: Int)(operation: Seq[T] => Future[Seq[R]])(implicit
+ system: ActorSystem,
+ ec: ExecutionContext) {
+
+ val cm: PartialFunction[Any, CompletionStrategy] = {
+ case Done =>
+ CompletionStrategy.immediately
+ }
private val stream = Source
- .actorRef[(T, Promise[R])](Int.MaxValue, OverflowStrategy.dropNew)
+ .actorRef[(T, Promise[R])](
+ completionMatcher = cm,
+ failureMatcher = PartialFunction.empty[Any, Throwable],
+ bufferSize = Int.MaxValue,
+ overflowStrategy = OverflowStrategy.dropNew)
.batch(batchSize, Queue(_))((queue, element) => queue :+ element)
.mapAsyncUnordered(concurrency) { els =>
val elements = els.map(_._1)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestStore.scala
index 76fe33a..be10459 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestStore.scala
@@ -20,7 +20,6 @@
import akka.actor.ActorSystem
import akka.event.Logging.ErrorLevel
import akka.http.scaladsl.model._
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Await
@@ -58,7 +57,6 @@
implicit system: ActorSystem,
val logging: Logging,
jsonFormat: RootJsonFormat[DocumentAbstraction],
- val materializer: ActorMaterializer,
docReader: DocumentReader)
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbStoreProvider.scala
index 2ecc043..19c77f7 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbStoreProvider.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import spray.json.RootJsonFormat
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.ConfigKeys
@@ -49,20 +48,18 @@
object CouchDbStoreProvider extends ArtifactStoreProvider {
- def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
- implicit jsonFormat: RootJsonFormat[D],
- docReader: DocumentReader,
- actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = makeArtifactStore(useBatching, getAttachmentStore())
+ def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(implicit jsonFormat: RootJsonFormat[D],
+ docReader: DocumentReader,
+ actorSystem: ActorSystem,
+ logging: Logging): ArtifactStore[D] =
+ makeArtifactStore(useBatching, getAttachmentStore())
def makeArtifactStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean,
attachmentStore: Option[AttachmentStore])(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = {
+ logging: Logging): ArtifactStore[D] = {
val dbConfig = loadConfigOrThrow[CouchDbConfig](ConfigKeys.couchdb)
require(
dbConfig.provider == "Cloudant" || dbConfig.provider == "CouchDB",
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
index 3642429..b0ad05c 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
@@ -26,7 +26,6 @@
import akka.http.scaladsl.model.StatusCodes.NotFound
import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, ByteStringBuilder}
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
@@ -75,14 +74,12 @@
secondaryHost: Option[String])
object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
makeStore[D](actorSystem.settings.config)
}
def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D], azConfig)
}
@@ -114,9 +111,9 @@
}
class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String, config: AzBlobConfig)(
- implicit system: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
+ implicit
+ system: ActorSystem,
+ logging: Logging)
extends AttachmentStore {
override protected[core] def scheme: String = "az"
@@ -250,8 +247,8 @@
failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}
- override protected[core] def deleteAttachment(docId: DocId, name: String)(
- implicit transid: TransactionId): Future[Boolean] = {
+ override protected[core] def deleteAttachment(docId: DocId, name: String)(implicit
+ transid: TransactionId): Future[Boolean] = {
val start =
transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
@@ -277,7 +274,8 @@
client.getBlobAsyncClient(objectKey(docId, name)).getBlockBlobAsyncClient
private def getAttachmentSource(objectKey: String, config: AzBlobConfig)(
- implicit tid: TransactionId): Future[Option[Source[ByteString, Any]]] = {
+ implicit
+ tid: TransactionId): Future[Option[Source[ByteString, Any]]] = {
val blobClient = client.getBlobAsyncClient(objectKey).getBlockBlobAsyncClient
config.azureCdnConfig match {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 038c86a..1ca87af 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -21,7 +21,6 @@
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri}
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.microsoft.azure.cosmosdb._
@@ -53,7 +52,6 @@
implicit system: ActorSystem,
val logging: Logging,
jsonFormat: RootJsonFormat[DocumentAbstraction],
- val materializer: ActorMaterializer,
docReader: DocumentReader)
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index d71e458..d9825d1 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -20,7 +20,6 @@
import java.io.Closeable
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import com.typesafe.config.ConfigFactory
import org.apache.openwhisk.common.Logging
@@ -48,8 +47,7 @@
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = {
+ logging: Logging): ArtifactStore[D] = {
val tag = implicitly[ClassTag[D]]
val config = CosmosDBConfig(ConfigFactory.load(), tag.runtimeClass.getSimpleName)
makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())
@@ -60,8 +58,7 @@
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): CosmosDBArtifactStore[D] = {
+ logging: Logging): CosmosDBArtifactStore[D] = {
makeStoreForClient(config, createReference(config).reference(), attachmentStore)
}
@@ -72,8 +69,7 @@
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): CosmosDBArtifactStore[D] = {
+ logging: Logging): CosmosDBArtifactStore[D] = {
val classTag = implicitly[ClassTag[D]]
val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
@@ -90,8 +86,7 @@
private def handlerAndMapper[D](entityType: ClassTag[D])(
implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) = {
+ logging: Logging): (String, DocumentHandler, CosmosDBViewMapper) = {
val entityClass = entityType.runtimeClass
if (entityClass == classOf[WhiskEntity]) ("whisks", WhisksHandler, WhisksViewMapper)
else if (entityClass == classOf[WhiskActivation]) ("activations", ActivationHandler, ActivationViewMapper)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
index 8f25712..654cc69 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
@@ -25,7 +25,6 @@
import akka.event.Logging.ErrorLevel
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Flow
-import akka.stream._
import com.sksamuel.elastic4s.http.search.SearchHit
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.indexes.IndexRequest
@@ -58,9 +57,7 @@
class ElasticSearchActivationStore(
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
elasticSearchConfig: ElasticSearchActivationStoreConfig,
- useBatching: Boolean = false)(implicit actorSystem: ActorSystem,
- actorMaterializer: ActorMaterializer,
- logging: Logging)
+ useBatching: Boolean = false)(implicit actorSystem: ActorSystem, logging: Logging)
extends ActivationStore {
import com.sksamuel.elastic4s.http.ElasticDsl._
@@ -426,9 +423,8 @@
object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
import ElasticSearchActivationStore.elasticSearchConfig
- override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+ override def instance(actorSystem: ActorSystem, logging: Logging) =
new ElasticSearchActivationStore(elasticSearchConfig = elasticSearchConfig, useBatching = true)(
actorSystem,
- actorMaterializer,
logging)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStore.scala
index b62c45c..e2293dd 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStore.scala
@@ -21,7 +21,6 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentType, Uri}
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
@@ -47,8 +46,7 @@
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = {
+ logging: Logging): ArtifactStore[D] = {
makeArtifactStore(MemoryAttachmentStoreProvider.makeStore())
}
@@ -56,8 +54,7 @@
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = {
+ logging: Logging): ArtifactStore[D] = {
val classTag = implicitly[ClassTag[D]]
val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
@@ -70,8 +67,7 @@
private def handlerAndMapper[D](entityType: ClassTag[D])(
implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): (String, DocumentHandler, MemoryViewMapper) = {
+ logging: Logging): (String, DocumentHandler, MemoryViewMapper) = {
entityType.runtimeClass match {
case x if x == classOf[WhiskEntity] =>
("whisks", WhisksHandler, WhisksViewMapper)
@@ -96,7 +92,6 @@
implicit system: ActorSystem,
val logging: Logging,
jsonFormat: RootJsonFormat[DocumentAbstraction],
- val materializer: ActorMaterializer,
docReader: DocumentReader)
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryAttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryAttachmentStore.scala
index 2d7d934..271635c 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryAttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/MemoryAttachmentStore.scala
@@ -19,7 +19,6 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentType
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.{ByteString, ByteStringBuilder}
import org.apache.openwhisk.common.LoggingMarkers.{
@@ -39,18 +38,14 @@
object MemoryAttachmentStoreProvider extends AttachmentStoreProvider {
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore =
+ logging: Logging): AttachmentStore =
new MemoryAttachmentStore(implicitly[ClassTag[D]].runtimeClass.getSimpleName.toLowerCase)
}
/**
* Basic in-memory AttachmentStore implementation. Useful for testing.
*/
-class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
- extends AttachmentStore {
+class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem, logging: Logging) extends AttachmentStore {
override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
index 9b3d0b6..7b50685 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
@@ -20,7 +20,6 @@
import java.time.Instant
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.database.{
ActivationStore,
@@ -89,6 +88,6 @@
}
object NoopActivationStoreProvider extends ActivationStoreProvider {
- override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+ override def instance(actorSystem: ActorSystem, logging: Logging) =
NoopActivationStore
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
index c30406c..10c65ea 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
@@ -24,7 +24,6 @@
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.headers.CannedAcl
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{S3Attributes, S3Exception, S3Headers, S3Settings}
@@ -61,15 +60,13 @@
}
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
new S3AttachmentStore(s3Settings(actorSystem.settings.config), config.bucket, config.prefixFor[D], config.signer)
}
def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
new S3AttachmentStore(s3Settings(config), s3config.bucket, s3config.prefixFor[D], s3config.signer)
}
@@ -84,8 +81,7 @@
class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String, urlSigner: Option[UrlSigner])(
implicit system: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
+ logging: Logging)
extends AttachmentStore {
private val s3attributes = S3Attributes.settings(s3Settings)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskStore.scala
index 8b7fadf..2899794 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskStore.scala
@@ -23,7 +23,6 @@
import scala.language.postfixOps
import scala.util.Try
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import spray.json.JsNumber
import spray.json.JsObject
import spray.json.JsString
@@ -85,28 +84,22 @@
object WhiskAuthStore {
implicit val docReader = WhiskDocumentReader
- def datastore()(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
+ def datastore()(implicit system: ActorSystem, logging: Logging) =
SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskAuth]()
}
object WhiskEntityStore {
- def datastore()(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
+ def datastore()(implicit system: ActorSystem, logging: Logging) =
SpiLoader
.get[ArtifactStoreProvider]
- .makeStore[WhiskEntity]()(
- classTag[WhiskEntity],
- WhiskEntityJsonFormat,
- WhiskDocumentReader,
- system,
- logging,
- materializer)
+ .makeStore[WhiskEntity]()(classTag[WhiskEntity], WhiskEntityJsonFormat, WhiskDocumentReader, system, logging)
}
object WhiskActivationStore {
implicit val docReader = WhiskDocumentReader
- def datastore()(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) =
+ def datastore()(implicit system: ActorSystem, logging: Logging) =
SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](useBatching = true)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
index c29b878..092f718 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
@@ -114,7 +114,8 @@
private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
val worker =
- actorSystem.scheduler.schedule(initialDelay = 0.second, interval = 500.milliseconds)(keepAliveOnce(lease))
+ actorSystem.scheduler.scheduleAtFixedRate(initialDelay = 0.second, interval = 500.milliseconds)(() =>
+ keepAliveOnce(lease))
/**
* To verify that lease has been deleted since timeout,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala
index 78aeeb9..7409204 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala
@@ -19,7 +19,6 @@
import akka.actor.{Actor, ActorSystem}
import akka.http.scaladsl.model.{HttpMethods, StatusCodes}
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.yarn.YARNComponentActor.{CreateContainerAsync, RemoveContainer}
@@ -41,7 +40,6 @@
extends Actor {
implicit val as: ActorSystem = actorSystem
- implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = actorSystem.dispatcher
//Adding a container via the YARN REST API is actually done by flexing the component's container pool to a certain size.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
index 176812a..e8fc86b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
@@ -35,7 +35,6 @@
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.concurrent.duration._
import YARNJsonProtocol._
-import akka.stream.ActorMaterializer
case class YARNConfig(masterUrl: String,
yarnLinkLogMessage: Boolean,
@@ -84,7 +83,6 @@
val containerStartTimeoutMS = 60000
implicit val as: ActorSystem = actorSystem
- implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = actorSystem.dispatcher
override def init(): Unit = {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala
index 4837940..fab9b53 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.yarn
import akka.actor.{Actor, ActorRef, ActorSystem}
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
@@ -37,7 +36,6 @@
extends Actor {
implicit val as: ActorSystem = actorSystem
- implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = actorSystem.dispatcher
val containerStartTimeoutMS = 60000
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
index a168df7..28ebc7e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
@@ -19,12 +19,12 @@
import akka.actor.ActorSystem
import akka.event.Logging
-import akka.http.scaladsl.{Http, HttpConnectionContext}
+import akka.http.scaladsl.{Http, ServerBuilder}
import akka.http.scaladsl.model.{HttpRequest, _}
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives._
-import akka.stream.ActorMaterializer
+
import kamon.metric.MeasurementUnit
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
@@ -169,15 +169,18 @@
* Starts an HTTP(S) route handler on given port and registers a shutdown hook.
*/
def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")(
- implicit actorSystem: ActorSystem,
- materializer: ActorMaterializer): Unit = {
- val connectionContext = config.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext)
- val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext)
+ implicit
+ actorSystem: ActorSystem): Unit = {
+ val httpsContext = config.map(Https.connectionContextServer(_))
+ var httpBindingBuilder: ServerBuilder = Http().newServerAt(interface, port)
+ if (httpsContext.isDefined) {
+ httpBindingBuilder = httpBindingBuilder.enableHttps(httpsContext.get)
+ }
+ val httpBinding = httpBindingBuilder.bindFlow(route)
addShutdownHook(httpBinding)
}
- def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem,
- materializer: ActorMaterializer): Unit = {
+ def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem): Unit = {
implicit val executionContext = actorSystem.dispatcher
sys.addShutdownHook {
Await.result(binding.map(_.unbind()), 30.seconds)
@@ -190,7 +193,7 @@
RejectionHandler.default.mapRejectionResponse {
case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
val error = ErrorResponse(ent.data.utf8String, transid).toJson
- res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
+ res.withEntity(HttpEntity(ContentTypes.`application/json`, error.compactPrint))
case x => x
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
index 7297edf..8d86654 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
@@ -24,7 +24,7 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling._
-import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
import spray.json._
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -49,7 +49,6 @@
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
protected implicit val context: ExecutionContext = system.dispatcher
- protected implicit val materializer: ActorMaterializer = ActorMaterializer()
//if specified, override the ClientConnection idle-timeout and keepalive socket option value
private val timeoutSettings = {
@@ -128,7 +127,7 @@
}
}
- def shutdown(): Future[Unit] = Future.successful(materializer.shutdown())
+ def shutdown(): Future[Unit] = Future.unit
}
object PoolingRestClient {
diff --git a/core/controller/init.sh b/core/controller/init.sh
index 48b8d5c..9d399b5 100644
--- a/core/controller/init.sh
+++ b/core/controller/init.sh
@@ -19,6 +19,6 @@
./copyJMXFiles.sh
export CONTROLLER_OPTS
-CONTROLLER_OPTS="$CONTROLLER_OPTS -Dakka.remote.netty.tcp.bind-hostname=$(hostname -i) $(./transformEnvironment.sh)"
+CONTROLLER_OPTS="$CONTROLLER_OPTS -Dakka.remote.artery.bind.hostname=$(hostname -i) $(./transformEnvironment.sh)"
exec controller/bin/controller "$@"
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index 8358ced..e7bf1a9 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -86,7 +86,12 @@
# Check out all akka-remote-2.5.4 options here:
# http://doc.akka.io/docs/akka/2.5.4/scala/general/configuration.html#config-akka-remote
akka {
+ java-flight-recorder.enabled = false
remote {
+ artery {
+ enabled = on
+ transport = tcp
+ }
log-remote-lifecycle-events = DEBUG
log-received-messages = on
log-sent-messages = on
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index f3af376..0e662fa 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -24,7 +24,6 @@
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.Route
-import akka.stream.ActorMaterializer
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
@@ -76,7 +75,6 @@
runtimes: Runtimes,
implicit val whiskConfig: WhiskConfig,
implicit val actorSystem: ActorSystem,
- implicit val materializer: ActorMaterializer,
implicit val logging: Logging)
extends BasicRasService {
@@ -90,6 +88,7 @@
* A Route in Akka is technically a function taking a RequestContext as a parameter.
*
* The "~" Akka DSL operator composes two independent Routes, building a routing tree structure.
+ *
* @see http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/routes.html#composing-routes
*/
override def routes(implicit transid: TransactionId): Route = {
@@ -121,7 +120,7 @@
private implicit val activationIdFactory = new ActivationIdGenerator {}
private implicit val logStore = SpiLoader.get[LogStoreProvider].instance(actorSystem)
private implicit val activationStore =
- SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+ SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
// register collections
Collection.initialize(entityStore)
@@ -278,20 +277,12 @@
ExecManifest.initialize(config) match {
case Success(_) =>
- val controller = new Controller(
- instance,
- ExecManifest.runtimesManifest,
- config,
- actorSystem,
- ActorMaterializer.create(actorSystem),
- logger)
+ val controller = new Controller(instance, ExecManifest.runtimesManifest, config, actorSystem, logger)
val httpsConfig =
if (Controller.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
- BasicHttpService.startHttpService(controller.route, port, httpsConfig, interface)(
- actorSystem,
- controller.materializer)
+ BasicHttpService.startHttpService(controller.route, port, httpsConfig, interface)(actorSystem)
case Failure(t) =>
abort(s"Invalid runtimes manifest: $t")
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/RestAPIs.scala
index 2dd35f0..74ae5bd 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/RestAPIs.scala
@@ -23,7 +23,6 @@
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.directives.AuthenticationDirective
import akka.http.scaladsl.server.{Directives, Route}
-import akka.stream.ActorMaterializer
import pureconfig._
import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
@@ -160,7 +159,6 @@
class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
implicit val activeAckTopicIndex: ControllerInstanceId,
implicit val actorSystem: ActorSystem,
- implicit val materializer: ActorMaterializer,
implicit val logging: Logging,
implicit val entityStore: EntityStore,
implicit val entitlementProvider: EntitlementProvider,
@@ -306,8 +304,7 @@
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
- override val whiskConfig: WhiskConfig,
- override val materializer: ActorMaterializer)
+ override val whiskConfig: WhiskConfig)
extends WhiskTriggersApi
with WhiskServices
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
index 86b793e..c70caee 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
@@ -32,9 +32,7 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.{RequestContext, RouteResult}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
-import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
-import com.typesafe.sslconfig.akka.AkkaSSLConfig
import pureconfig._
import pureconfig.generic.auto._
import spray.json._
@@ -63,12 +61,8 @@
/** Connection context for HTTPS */
protected lazy val httpsConnectionContext = {
- val sslConfig = AkkaSSLConfig().mapSettings { s =>
- s.withLoose(s.loose.withDisableHostnameVerification(true))
- }
val httpsConfig = loadConfigOrThrow[HttpsConfig]("whisk.controller.https")
- Https.connectionContext(httpsConfig, Some(sslConfig))
-
+ Https.connectionContextClient(httpsConfig, true)
}
protected val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol")
@@ -96,8 +90,6 @@
protected val triggersPath = "triggers"
protected val url = Uri(s"${controllerProtocol}://localhost:${whiskConfig.servicePort}")
- protected implicit val materializer: ActorMaterializer
-
import RestApiCommons.emptyEntityToJsObject
/**
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 7820207..901293a 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -23,7 +23,6 @@
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
-import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
import pureconfig._
import pureconfig.generic.auto._
@@ -45,9 +44,9 @@
*/
abstract class CommonLoadBalancer(config: WhiskConfig,
feedFactory: FeedFactory,
- controllerInstance: ControllerInstanceId)(implicit val actorSystem: ActorSystem,
+ controllerInstance: ControllerInstanceId)(implicit
+ val actorSystem: ActorSystem,
logging: Logging,
- materializer: ActorMaterializer,
messagingProvider: MessagingProvider)
extends LoadBalancer {
@@ -79,7 +78,7 @@
totalManagedActivationMemory.longValue)
}
- actorSystem.scheduler.schedule(10.seconds, 10.seconds)(emitMetrics())
+ actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(() => emitMetrics())
override def activeActivationsFor(namespace: UUID): Future[Int] =
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue).getOrElse(0))
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 4f4c8ad..86ece17 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -291,7 +291,7 @@
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
invokeTestAction()
- setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
+ startTimerAtFixedRate(InvokerActor.timerName, Tick, 1.minute)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
index 80def27..dac40c2 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.loadBalancer
import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector._
@@ -46,8 +45,7 @@
controllerInstance: ControllerInstanceId,
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
+ logging: Logging)
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
/** Loadbalancer interface methods */
@@ -89,10 +87,8 @@
object LeanBalancer extends LoadBalancerProvider {
- override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
- implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): LoadBalancer = {
+ override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
+ logging: Logging): LoadBalancer = {
new LeanBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance)
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index 4c87615..11db4a3 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.loadBalancer
import akka.actor.{ActorRefFactory, ActorSystem, Props}
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{InvokerHealth, Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
@@ -73,8 +72,7 @@
def requiredProperties: Map[String, String]
def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): LoadBalancer
+ logging: Logging): LoadBalancer
/** Return default FeedFactory */
def createFeedFactory(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 1aaf68f..d443243 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -26,7 +26,6 @@
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
-import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
@@ -152,8 +151,7 @@
val invokerPoolFactory: InvokerPoolFactory,
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
+ logging: Logging)
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
/** Build a cluster of all loadbalancers */
@@ -334,10 +332,8 @@
object ShardingContainerPoolBalancer extends LoadBalancerProvider {
- override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
- implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): LoadBalancer = {
+ override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
+ logging: Logging): LoadBalancer = {
val invokerPoolFactory = new InvokerPoolFactory {
override def createInvokerPool(
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
index 04472f1..c698a12 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
@@ -19,7 +19,6 @@
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.kafka.ProducerSettings
-import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.openwhisk.common.Logging
@@ -28,9 +27,7 @@
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Success
-class CacheInvalidator(globalConfig: Config)(implicit system: ActorSystem,
- materializer: ActorMaterializer,
- log: Logging) {
+class CacheInvalidator(globalConfig: Config)(implicit system: ActorSystem, log: Logging) {
import CacheInvalidator._
val instanceId = "cache-invalidator"
val whisksCollection = "whisks"
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
index 24f353e..0771276 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
@@ -31,10 +31,9 @@
import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future, Promise}
-case class KafkaEventProducer(
- settings: ProducerSettings[String, String],
- topic: String,
- eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, materializer: ActorMaterializer, log: Logging)
+case class KafkaEventProducer(settings: ProducerSettings[String, String],
+ topic: String,
+ eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, log: Logging)
extends EventProducer {
private implicit val executionContext: ExecutionContext = system.dispatcher
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
index 62d58a1..81a4a3e 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database.cosmosdb.cache
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import kamon.Kamon
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging}
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
@@ -28,7 +27,6 @@
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("cache-invalidator-actor-system")
- implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val log: Logging = new AkkaLogging(akka.event.Logging.getLogger(system, this))
implicit val ec: ExecutionContext = system.dispatcher
ConfigMXBean.register()
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index c6358aa..3c2ebd9 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -76,7 +76,7 @@
var resent: Option[Run] = None
val logMessageInterval = 10.seconds
//periodically emit metrics (don't need to do this for each message!)
- context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
+ context.system.scheduler.scheduleAtFixedRate(30.seconds, 10.seconds, self, EmitMetrics)
// Key is ColdStartKey, value is the number of cold Start in minute
var coldStartCount = immutable.Map.empty[ColdStartKey, Int]
@@ -92,7 +92,7 @@
.getOrElse(0)
.seconds
if (prewarmConfig.exists(!_.reactive.isEmpty)) {
- context.system.scheduler.schedule(
+ context.system.scheduler.scheduleAtFixedRate(
poolConfig.prewarmExpirationCheckInitDelay,
interval,
self,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 9af4f84..f50f6ca 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -34,7 +34,6 @@
import akka.pattern.pipe
import pureconfig.loadConfigOrThrow
import pureconfig.generic.auto._
-import akka.stream.ActorMaterializer
import java.net.InetSocketAddress
import java.net.SocketException
@@ -267,7 +266,6 @@
implicit val ec = context.system.dispatcher
implicit val logging = new AkkaLogging(context.system.log)
implicit val ac = context.system
- implicit val materializer = ActorMaterializer()
var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself
var runBuffer = immutable.Queue.empty[Run] //does not retain order, but does manage jobs that would have pushed past action concurrency limit
//track buffer processing state to avoid extra transitions near end of buffer - this provides a pseudo-state between Running and Ready
@@ -1128,7 +1126,7 @@
private def restartPing() = {
cancelPing() //just in case restart is called twice
scheduledPing = Some(
- context.system.scheduler.schedule(config.checkPeriod, config.checkPeriod, self, HealthPingSend))
+ context.system.scheduler.scheduleAtFixedRate(config.checkPeriod, config.checkPeriod, self, HealthPingSend))
}
private def cancelPing() = {
scheduledPing.foreach(_.cancel())
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 643ed08..9dd588e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -31,7 +31,7 @@
import akka.pattern.after
import akka.stream.scaladsl.Source
import akka.stream.stage._
-import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
+import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import collection.JavaConverters._
@@ -123,7 +123,6 @@
extends KubernetesApi
with ProcessRunner {
implicit protected val ec = executionContext
- implicit protected val am = ActorMaterializer()
implicit protected val scheduler = as.scheduler
implicit protected val kubeRestClient = testClient.getOrElse {
val configBuilder = new ConfigBuilder()
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index 7260c83..fbdade7 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -102,18 +102,16 @@
private var prewarmConfigQueue = Queue.empty[(CodeExec[_], ByteSize, Option[FiniteDuration])]
private val prewarmCreateFailedCount = new AtomicInteger(0)
- val logScheduler = context.system.scheduler.schedule(0.seconds, 1.seconds) {
+ val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
MetricEmitter.emitHistogramMetric(
LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("inprogress"),
memoryConsumptionOf(inProgressPool))
- MetricEmitter.emitHistogramMetric(
- LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("busy"),
- memoryConsumptionOf(busyPool))
- MetricEmitter.emitHistogramMetric(
- LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("prewarmed"),
- memoryConsumptionOf(prewarmedPool))
+ MetricEmitter
+ .emitHistogramMetric(LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("busy"), memoryConsumptionOf(busyPool))
+ MetricEmitter
+ .emitHistogramMetric(LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("prewarmed"), memoryConsumptionOf(prewarmedPool))
MetricEmitter.emitHistogramMetric(LoggingMarkers.INVOKER_CONTAINERPOOL_MEMORY("max"), poolConfig.userMemory.toMB)
- }
+ })
// Key is ColdStartKey, value is the number of cold Start in minute
var coldStartCount = immutable.Map.empty[ColdStartKey, Int]
@@ -130,16 +128,16 @@
.seconds
if (prewarmConfig.exists(!_.reactive.isEmpty)) {
- context.system.scheduler.schedule(
+ context.system.scheduler.scheduleAtFixedRate(
poolConfig.prewarmExpirationCheckInitDelay,
interval,
self,
AdjustPrewarmedContainer)
}
- val resourceSubmitter = context.system.scheduler.schedule(0.seconds, poolConfig.memorySyncInterval) {
+ val resourceSubmitter = context.system.scheduler.scheduleAtFixedRate(0.seconds, poolConfig.memorySyncInterval)(() => {
syncMemoryInfo
- }
+ })
private def logContainerStart(c: ContainerCreationMessage, action: WhiskAction, containerState: String): Unit = {
val FQN = c.action
@@ -461,7 +459,7 @@
if (preWarmScheduler.isEmpty) {
preWarmScheduler = Some(
context.system.scheduler
- .schedule(0.seconds, config.creationDelay, self, PrewarmContainer(config.maxConcurrent)))
+ .scheduleAtFixedRate(0.seconds, config.creationDelay, self, PrewarmContainer(config.maxConcurrent)))
}
})
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 54bd174..c8b92a3 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -219,7 +219,7 @@
}
/**
- * This is to decide wether to change from the newState or not.
+ * This is to decide weather to change from the newState or not.
* If current state is already newState, it will stay, otherwise it will change its state.
*
* @param newState the desired state to change.
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 65083b1..437beb3 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -20,7 +20,6 @@
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.server.Route
-import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
@@ -202,9 +201,7 @@
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None
val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
- BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(
- actorSystem,
- ActorMaterializer.create(actorSystem))
+ BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(actorSystem)
}
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 7ddff8d..3177cb7 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -25,7 +25,6 @@
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.Route
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
@@ -68,7 +67,6 @@
logging: Logging)
extends InvokerCore {
- implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = actorSystem.dispatcher
implicit val cfg: WhiskConfig = config
@@ -105,7 +103,7 @@
/** Initialize needed databases */
private val entityStore = WhiskEntityStore.datastore()
private val activationStore =
- SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+ SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
private val authStore = WhiskAuthStore.datastore()
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index 2ec5ace..d34be02 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -24,7 +24,7 @@
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
-import akka.stream.ActorMaterializer
+import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Sink}
import javax.management.ObjectName
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -48,7 +48,7 @@
case class EventConsumer(settings: ConsumerSettings[String, String],
recorders: Seq[MetricRecorder],
- metricConfig: MetricConfig)(implicit system: ActorSystem, materializer: ActorMaterializer)
+ metricConfig: MetricConfig)(implicit system: ActorSystem)
extends KafkaMetricsProvider {
import EventConsumer._
@@ -91,10 +91,11 @@
private val result = RestartSource
.onFailuresWithBackoff(
- minBackoff = metricConfig.retry.minBackoff,
- maxBackoff = metricConfig.retry.maxBackoff,
- randomFactor = metricConfig.retry.randomFactor,
- maxRestarts = metricConfig.retry.maxRestarts) { () =>
+ RestartSettings(
+ minBackoff = metricConfig.retry.minBackoff,
+ maxBackoff = metricConfig.retry.maxBackoff,
+ randomFactor = metricConfig.retry.randomFactor)
+ .withMaxRestarts(metricConfig.retry.maxRestarts, metricConfig.retry.minBackoff)) { () =>
Consumer
.committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
// this is to access to the Consumer.Control
@@ -109,7 +110,7 @@
.runWith(Sink.ignore)
private val lagRecorder =
- system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.update(consumerLag))
+ system.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(() => lagGauge.update(consumerLag))
private def processEvent(value: String): Unit = {
EventMessage
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala
index 286906f..231e5ea 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala
@@ -19,7 +19,6 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
-import akka.stream.ActorMaterializer
import kamon.Kamon
import scala.concurrent.duration.DurationInt
@@ -29,13 +28,11 @@
def main(args: Array[String]): Unit = {
Kamon.init()
implicit val system: ActorSystem = ActorSystem("events-actor-system")
- implicit val materializer: ActorMaterializer = ActorMaterializer()
val binding = OpenWhiskEvents.start(system.settings.config)
addShutdownHook(binding)
}
- private def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem,
- materializer: ActorMaterializer): Unit = {
+ private def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem): Unit = {
implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
sys.addShutdownHook {
Await.result(binding.map(_.unbind()), 30.seconds)
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index 034e4e7..abdb7c4 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -21,7 +21,6 @@
import akka.event.slf4j.SLF4JLogging
import akka.http.scaladsl.Http
import akka.kafka.ConsumerSettings
-import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import kamon.Kamon
import kamon.prometheus.PrometheusReporter
@@ -42,8 +41,7 @@
case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)
- def start(config: Config)(implicit system: ActorSystem,
- materializer: ActorMaterializer): Future[Http.ServerBinding] = {
+ def start(config: Config)(implicit system: ActorSystem): Future[Http.ServerBinding] = {
implicit val ec: ExecutionContext = system.dispatcher
val prometheusReporter = new PrometheusReporter()
@@ -61,7 +59,7 @@
}
val port = metricConfig.port
val api = new PrometheusEventsApi(eventConsumer, prometheusRecorder)
- val httpBinding = Http().bindAndHandle(api.routes, "0.0.0.0", port)
+ val httpBinding = Http().newServerAt("0.0.0.0", port).bindFlow(api.routes)
httpBinding.foreach(_ => log.info(s"Started the http server on http://localhost:$port"))(system.dispatcher)
httpBinding
}
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
index 1b53e31..7c28799 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
@@ -20,7 +20,6 @@
import java.net.ServerSocket
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import pureconfig._
@@ -29,8 +28,7 @@
trait EventsTestHelper {
protected def createConsumer(kport: Int, globalConfig: Config, recorder: MetricRecorder)(
- implicit system: ActorSystem,
- materializer: ActorMaterializer) = {
+ implicit system: ActorSystem) = {
val settings = OpenWhiskEvents
.eventConsumerSettings(OpenWhiskEvents.defaultConsumerConfig(globalConfig))
.withBootstrapServers(s"localhost:$kport")
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
index 6c7be2e..5ba7491 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.monitoring.metrics
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
-import akka.stream.ActorMaterializer
import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest._
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
@@ -36,7 +35,6 @@
with Eventually
with EventsTestHelper { this: Suite =>
implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute)
- implicit val materializer: ActorMaterializer = ActorMaterializer()
override val sleepAfterProduce: FiniteDuration = 10.seconds
override protected val topicCreationTimeout = 60.seconds
}
diff --git a/core/scheduler/init.sh b/core/scheduler/init.sh
index 8d359d5..6801156 100644
--- a/core/scheduler/init.sh
+++ b/core/scheduler/init.sh
@@ -19,6 +19,6 @@
./copyJMXFiles.sh
export SCHEDULER_OPTS
-SCHEDULER_OPTS="$SCHEDULER_OPTS -Dakka.remote.netty.tcp.bind-hostname=$(hostname -i) $(./transformEnvironment.sh)"
+SCHEDULER_OPTS="$SCHEDULER_OPTS -Dakka.remote.artery.bind.hostname=$(hostname -i) $(./transformEnvironment.sh)"
exec scheduler/bin/scheduler "$@"
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 61f9927..d3d60d4 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -19,7 +19,6 @@
import akka.Done
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
-import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
@@ -45,11 +44,9 @@
import scala.util.{Failure, Success, Try}
import pureconfig.generic.auto._
-class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
- implicit config: WhiskConfig,
- actorSystem: ActorSystem,
- materializer: ActorMaterializer,
- logging: Logging)
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(implicit config: WhiskConfig,
+ actorSystem: ActorSystem,
+ logging: Logging)
extends SchedulerCore {
implicit val ec = actorSystem.dispatcher
private val authStore = WhiskAuthStore.datastore()
@@ -65,7 +62,7 @@
implicit val entityStore = WhiskEntityStore.datastore()
private val activationStore =
- SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+ SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
private val ack = {
val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
@@ -198,7 +195,6 @@
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
- implicit val materializer = ActorMaterializer.create(actorSystem)
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
@@ -257,9 +253,7 @@
val httpsConfig =
if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
- BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
- actorSystem,
- ActorMaterializer.create(actorSystem))
+ BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(actorSystem)
case Failure(t) =>
abort(s"Invalid runtimes manifest: $t")
@@ -274,7 +268,7 @@
def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = {
implicit val ec = context.dispatcher
- val path = s"akka.tcp://scheduler-actor-system@${asAkkaEndpoint}/user/${name}"
+ val path = s"akka://scheduler-actor-system@${asAkkaEndpoint}/user/${name}"
context.actorSelection(path)
}
@@ -292,7 +286,7 @@
def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = {
implicit val ec = context.dispatcher
- val path = s"akka.tcp://scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}"
+ val path = s"akka//scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}"
context.actorSelection(path)
}
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/CouchDBLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/CouchDBLauncher.scala
index 64345a3..5a54f95 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/CouchDBLauncher.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/CouchDBLauncher.scala
@@ -26,7 +26,6 @@
import akka.http.scaladsl.model.headers.{Accept, Authorization, BasicHttpCredentials}
import akka.http.scaladsl.model.{HttpHeader, HttpMethods, HttpRequest, MediaTypes, StatusCode, StatusCodes, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.IOUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
@@ -44,7 +43,6 @@
class CouchDBLauncher(docker: StandaloneDockerClient, port: Int, dataDir: File)(implicit logging: Logging,
ec: ExecutionContext,
actorSystem: ActorSystem,
- materializer: ActorMaterializer,
tid: TransactionId) {
case class CouchDBConfig(image: String,
user: String,
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
index 5d4d984..2c67908 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
@@ -20,7 +20,6 @@
import java.io.File
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import kafka.server.KafkaConfig
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.commons.io.FileUtils
@@ -35,16 +34,13 @@
import scala.reflect.io.Directory
import scala.util.Try
-class KafkaLauncher(docker: StandaloneDockerClient,
- kafkaPort: Int,
- kafkaDockerPort: Int,
- zkPort: Int,
- workDir: File,
- kafkaUi: Boolean)(implicit logging: Logging,
- ec: ExecutionContext,
- actorSystem: ActorSystem,
- materializer: ActorMaterializer,
- tid: TransactionId) {
+class KafkaLauncher(
+ docker: StandaloneDockerClient,
+ kafkaPort: Int,
+ kafkaDockerPort: Int,
+ zkPort: Int,
+ workDir: File,
+ kafkaUi: Boolean)(implicit logging: Logging, ec: ExecutionContext, actorSystem: ActorSystem, tid: TransactionId) {
def run(): Future[Seq[ServiceContainer]] = {
for {
@@ -115,10 +111,9 @@
object KafkaAwareLeanBalancer extends LoadBalancerProvider {
override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts
- override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
- implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): LoadBalancer = LeanBalancer.instance(whiskConfig, instance)
+ override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
+ logging: Logging): LoadBalancer =
+ LeanBalancer.instance(whiskConfig, instance)
}
object KafkaLauncher {
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala
index bac8cc9..ff52c40 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala
@@ -23,7 +23,6 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.FileAndResourceDirectives.ResourceFile
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import org.apache.commons.io.IOUtils
@@ -38,17 +37,14 @@
import scala.sys.process._
import scala.util.{Failure, Success, Try}
-class PlaygroundLauncher(host: String,
- extHost: String,
- controllerPort: Int,
- pgPort: Int,
- authKey: String,
- devMode: Boolean,
- noBrowser: Boolean)(implicit logging: Logging,
- ec: ExecutionContext,
- actorSystem: ActorSystem,
- materializer: ActorMaterializer,
- tid: TransactionId) {
+class PlaygroundLauncher(
+ host: String,
+ extHost: String,
+ controllerPort: Int,
+ pgPort: Int,
+ authKey: String,
+ devMode: Boolean,
+ noBrowser: Boolean)(implicit logging: Logging, ec: ExecutionContext, actorSystem: ActorSystem, tid: TransactionId) {
private val interface = loadConfigOrThrow[String]("whisk.controller.interface")
private val jsFileName = "playgroundFunctions.js"
private val jsContentType = ContentType(MediaTypes.`application/javascript`, HttpCharsets.`UTF-8`)
@@ -75,7 +71,7 @@
private val wsk = new Wsk(host, controllerPort, authKey)
def run(): ServiceContainer = {
- BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)(actorSystem, materializer)
+ BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)(actorSystem)
ServiceContainer(pgPort, pgUrl, "Playground")
}
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
index f4ee2b7..4ea758d 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
@@ -24,7 +24,6 @@
import akka.actor.ActorSystem
import akka.event.slf4j.SLF4JLogging
import akka.http.scaladsl.model.Uri
-import akka.stream.ActorMaterializer
import org.apache.commons.io.{FileUtils, FilenameUtils, IOUtils}
import org.apache.openwhisk.common.TransactionId.systemPrefix
import org.apache.openwhisk.common.{AkkaLogging, Config, Logging, TransactionId}
@@ -222,7 +221,6 @@
initialize(conf)
//Create actor system only after initializing the config
implicit val actorSystem: ActorSystem = ActorSystem("standalone-actor-system")
- implicit val materializer: ActorMaterializer = ActorMaterializer.create(actorSystem)
implicit val logger: Logging = createLogging(actorSystem, conf)
implicit val ec: ExecutionContext = actorSystem.dispatcher
@@ -277,8 +275,7 @@
}
}
- def startServer(
- conf: Conf)(implicit actorSystem: ActorSystem, materializer: ActorMaterializer, logging: Logging): Unit = {
+ def startServer(conf: Conf)(implicit actorSystem: ActorSystem, logging: Logging): Unit = {
if (canInstallUserAndActions(conf)) {
bootstrapUsers()
}
@@ -340,9 +337,7 @@
Controller.start(Array("standalone"))
}
- private def bootstrapUsers()(implicit actorSystem: ActorSystem,
- materializer: ActorMaterializer,
- logging: Logging): Unit = {
+ private def bootstrapUsers()(implicit actorSystem: ActorSystem, logging: Logging): Unit = {
implicit val userTid: TransactionId = TransactionId(systemPrefix + "userBootstrap")
getUsers().foreach {
case (subject, key) =>
@@ -482,8 +477,7 @@
private def startCouchDb(dataDir: File, dockerClient: StandaloneDockerClient)(
implicit logging: Logging,
as: ActorSystem,
- ec: ExecutionContext,
- materializer: ActorMaterializer): ServiceContainer = {
+ ec: ExecutionContext): ServiceContainer = {
implicit val tid: TransactionId = TransactionId(systemPrefix + "couchDB")
val port = checkOrAllocatePort(5984)
val dbDataDir = new File(dataDir, "couchdb")
@@ -501,8 +495,7 @@
private def startKafka(workDir: File, dockerClient: StandaloneDockerClient, conf: Conf, kafkaUi: Boolean)(
implicit logging: Logging,
as: ActorSystem,
- ec: ExecutionContext,
- materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
+ ec: ExecutionContext): (Int, Seq[ServiceContainer]) = {
implicit val tid: TransactionId = TransactionId(systemPrefix + "kafka")
val kafkaPort = getPort(conf.kafkaPort.toOption, preferredKafkaPort)
val kafkaDockerPort = getPort(conf.kafkaDockerPort.toOption, preferredKafkaDockerPort)
@@ -536,11 +529,9 @@
existingUserEventSvcPort: Option[Int],
workDir: File,
dataDir: File,
- dockerClient: StandaloneDockerClient)(
- implicit logging: Logging,
- as: ActorSystem,
- ec: ExecutionContext,
- materializer: ActorMaterializer): Seq[ServiceContainer] = {
+ dockerClient: StandaloneDockerClient)(implicit logging: Logging,
+ as: ActorSystem,
+ ec: ExecutionContext): Seq[ServiceContainer] = {
implicit val tid: TransactionId = TransactionId(systemPrefix + "userevents")
val k = new UserEventLauncher(dockerClient, owPort, kafkaDockerPort, existingUserEventSvcPort, workDir, dataDir)
@@ -562,9 +553,8 @@
setSysProp("whisk.docker.standalone.container-factory.pull-standard-images", "false")
}
- private def createPgLauncher(
- owPort: Int,
- conf: Conf)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext, materializer: ActorMaterializer) = {
+ private def createPgLauncher(owPort: Int,
+ conf: Conf)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext) = {
implicit val tid: TransactionId = TransactionId(systemPrefix + "playground")
val pgPort = getPort(conf.uiPort.toOption, preferredPgPort)
new PlaygroundLauncher(
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
index 71ee271..273ac3f 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
@@ -21,7 +21,6 @@
import java.nio.charset.StandardCharsets.UTF_8
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}
@@ -30,16 +29,13 @@
import scala.concurrent.{ExecutionContext, Future}
-class UserEventLauncher(docker: StandaloneDockerClient,
- owPort: Int,
- kafkaDockerPort: Int,
- existingUserEventSvcPort: Option[Int],
- workDir: File,
- dataDir: File)(implicit logging: Logging,
- ec: ExecutionContext,
- actorSystem: ActorSystem,
- materializer: ActorMaterializer,
- tid: TransactionId) {
+class UserEventLauncher(
+ docker: StandaloneDockerClient,
+ owPort: Int,
+ kafkaDockerPort: Int,
+ existingUserEventSvcPort: Option[Int],
+ workDir: File,
+ dataDir: File)(implicit logging: Logging, ec: ExecutionContext, actorSystem: ActorSystem, tid: TransactionId) {
//owPort+1 is used by Api Gateway
private val userEventPort = existingUserEventSvcPort.getOrElse(checkOrAllocatePort(owPort + 2))
diff --git a/settings.gradle b/settings.gradle
index 4792aa9..861f00a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -61,9 +61,9 @@
config: new File(rootProject.projectDir, '.scalafmt.conf')
]
-gradle.ext.akka = [version : '2.5.26']
+gradle.ext.akka = [version : '2.6.12']
gradle.ext.akka_kafka = [version : '2.0.2']
-gradle.ext.akka_http = [version : '10.1.11']
+gradle.ext.akka_http = [version : '10.2.4']
gradle.ext.akka_management = [version : '1.0.5']
gradle.ext.curator = [version : '4.0.0']
diff --git a/tests/src/test/scala/common/rest/WskRestOperations.scala b/tests/src/test/scala/common/rest/WskRestOperations.scala
index 2c652d9..6769dda 100644
--- a/tests/src/test/scala/common/rest/WskRestOperations.scala
+++ b/tests/src/test/scala/common/rest/WskRestOperations.scala
@@ -31,10 +31,9 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.Uri.{Path, Query}
import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials, OAuth2BearerToken}
-import akka.stream.ActorMaterializer
import akka.util.ByteString
-import com.typesafe.sslconfig.akka.AkkaSSLConfig
import common.TestUtils.{ANY_ERROR_EXIT, DONTCARE_EXIT, RunResult, SUCCESS_EXIT}
+import common.rest.SSL.httpsConfig
import common.{
DeleteFromCollectionOperations,
HasActivation,
@@ -47,7 +46,7 @@
import javax.net.ssl._
import org.apache.commons.io.{FileUtils, FilenameUtils}
import org.apache.openwhisk.common.Https.HttpsConfig
-import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
+import org.apache.openwhisk.common.{AkkaLogging, Https, TransactionId}
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.utils.retry
import org.scalatest.Matchers
@@ -103,10 +102,7 @@
}
def httpsConnectionContext(implicit system: ActorSystem): HttpsConnectionContext = {
- val sslConfig = AkkaSSLConfig().mapSettings { s =>
- s.withHostnameVerifierClass(classOf[AcceptAllHostNameVerifier].asInstanceOf[Class[HostnameVerifier]])
- }
- new HttpsConnectionContext(SSL.nonValidatingContext(httpsConfig.clientAuth.toBoolean), Some(sslConfig))
+ Https.connectionContextClient(httpsConfig, true)
}
}
@@ -115,6 +111,7 @@
/**
* Returns either the https context that is tailored for self-signed certificates on the controller, or
* a default connection context used in Http.SingleRequest
+ *
* @param protocol protocol used to communicate with controller API
* @param system actor system
* @return https connection context
@@ -816,8 +813,8 @@
* @param expectedExitCode (optional) the expected exit code for the command
* if the code is anything but DONTCARE_EXIT, assert the code is as expected
*/
- override def list(expectedExitCode: Int = OK.intValue, nameSort: Option[Boolean] = None)(
- implicit wp: WskProps): RestResult = {
+ override def list(expectedExitCode: Int = OK.intValue, nameSort: Option[Boolean] = None)(implicit
+ wp: WskProps): RestResult = {
val entPath = Path(s"$basePath/namespaces")
val resp = requestEntity(GET, entPath)
val result = new RestResult(resp.status, getTransactionId(resp), getRespData(resp))
@@ -1145,13 +1142,9 @@
implicit val config: PatienceConfig = PatienceConfig(100 seconds, 15 milliseconds)
implicit val actorSystem: ActorSystem
lazy implicit val executionContext: ExecutionContext = actorSystem.dispatcher
- lazy implicit val materializer: ActorMaterializer = ActorMaterializer()
- lazy val sslConfig: AkkaSSLConfig = AkkaSSLConfig().mapSettings {
- _.withHostnameVerifierClass(classOf[AcceptAllHostNameVerifier].asInstanceOf[Class[HostnameVerifier]])
- }
-
- lazy val connectionContext = new HttpsConnectionContext(SSL.nonValidatingContext(), Some(sslConfig))
+ lazy val connectionContext =
+ Https.connectionContextClient(SSL.nonValidatingContext(httpsConfig.clientAuth.toBoolean), true)
def isStatusCodeExpected(expectedExitCode: Int, statusCode: Int): Boolean = {
if ((expectedExitCode != DONTCARE_EXIT) && (expectedExitCode != ANY_ERROR_EXIT))
diff --git a/tests/src/test/scala/ha/CacheInvalidationTests.scala b/tests/src/test/scala/ha/CacheInvalidationTests.scala
index 6e7640a..33e2d0d 100644
--- a/tests/src/test/scala/ha/CacheInvalidationTests.scala
+++ b/tests/src/test/scala/ha/CacheInvalidationTests.scala
@@ -30,7 +30,6 @@
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import common.WhiskProperties
import common.WskActorSystem
import common.WskTestHelpers
@@ -42,8 +41,6 @@
@RunWith(classOf[JUnitRunner])
class CacheInvalidationTests extends FlatSpec with Matchers with WskTestHelpers with WskActorSystem {
- implicit val materializer = ActorMaterializer()
-
val hosts = WhiskProperties.getProperty("controller.hosts").split(",")
val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol")
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala
index b057bc9..386b46d 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -31,7 +31,6 @@
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import common._
import common.rest.{HttpConnection, WskRestOperations}
import pureconfig._
@@ -58,7 +57,6 @@
val wsk = new WskRestOperations
val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
- implicit val materializer = ActorMaterializer()
implicit val testConfig = PatienceConfig(1.minute)
val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol")
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
index 8d0c80c..f8e456c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
@@ -16,7 +16,7 @@
*/
package org.apache.openwhisk.common
-import akka.http.scaladsl.coding.Gzip
+import akka.http.scaladsl.coding.{Coders}
import akka.http.scaladsl.model.{HttpCharsets, HttpResponse}
import akka.http.scaladsl.model.headers.HttpEncodings.gzip
import akka.http.scaladsl.model.headers.{`Accept-Encoding`, `Content-Encoding`, HttpEncoding, HttpEncodings}
@@ -65,7 +65,7 @@
contentType.mediaType.params("version") shouldBe "0.0.4"
response should haveContentEncoding(gzip)
- val responseText = Unmarshal(Gzip.decodeMessage(response)).to[String].futureValue
+ val responseText = Unmarshal(Coders.Gzip.decodeMessage(response)).to[String].futureValue
withClue(responseText) {
responseText should include("foo_bar")
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index 2c25eac..e70467a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -20,7 +20,6 @@
import java.io.IOException
import java.time.Instant
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import common.TimingHelpers
@@ -89,8 +88,6 @@
stream.reset()
}
- implicit val materializer: ActorMaterializer = ActorMaterializer()
-
/** Reads logs into memory and awaits them */
def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] =
Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 0fecb7a..53e634c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -20,7 +20,6 @@
import java.time.Instant
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Sink, Source}
import scala.concurrent.Await
@@ -57,8 +56,6 @@
import KubernetesClientTests._
- implicit val materializer: ActorMaterializer = ActorMaterializer()
-
val commandTimeout = 500.milliseconds
def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 0959193..0c0b5a0 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -21,7 +21,6 @@
import java.time.{Instant, ZoneId}
import akka.NotUsed
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import common.TimingHelpers
@@ -76,8 +75,6 @@
stream.reset()
}
- implicit val materializer: ActorMaterializer = ActorMaterializer()
-
def instantDT(instant: Instant): Instant = Instant.from(instant.atZone(ZoneId.of("GMT+0")))
val Epoch = Instant.EPOCH
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
index 6d14ad9..dcdcb9b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
@@ -24,7 +24,6 @@
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, RawHeader}
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
import common.StreamLogging
@@ -52,7 +51,6 @@
with StreamLogging {
implicit val ec: ExecutionContext = system.dispatcher
- implicit val materializer: ActorMaterializer = ActorMaterializer()
private val uuid = UUID()
private val tenantId = s"testSpace_${uuid}"
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
index 49ae2a4..c3c34cb 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
@@ -27,7 +27,6 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.stream.scaladsl.Flow
-import akka.stream.ActorMaterializer
import akka.testkit.TestKit
import akka.http.scaladsl.model.HttpMethods.POST
import common.StreamLogging
@@ -47,7 +46,6 @@
with StreamLogging {
implicit val ec: ExecutionContext = system.dispatcher
- implicit val materializer: ActorMaterializer = ActorMaterializer()
private val defaultResponseSource =
"""{"stream":"stdout","activationId":"197d60b33137424ebd60b33137d24ea3","action":"guest/someAction","@version":"1","@timestamp":"2018-03-27T15:48:09.112Z","type":"user_logs","tenant":"19bc46b1-71f6-4ed5-8c54-816aa4f8c502","message":"namespace : user@email.com\n","time_date":"2018-03-27T15:48:08.716152793Z"}"""
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala
index d8816e0..0109fa4 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -26,7 +26,6 @@
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
import common.StreamLogging
@@ -106,7 +105,6 @@
val context = UserContext(user, request)
implicit val ec: ExecutionContext = system.dispatcher
- implicit val materializer: ActorMaterializer = ActorMaterializer()
val testFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
Flow[(HttpRequest, Promise[HttpResponse])]
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
index 38d5f83..2fc7e29 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
@@ -71,7 +71,8 @@
val testActor = TestProbe()
val container = new TestContainer(testSource)
- val store = new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, ())))
+ val store =
+ new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, (), (_: Throwable) => _)))
val collected = store.collectLogs(TransactionId.testing, user, successfulActivation, container, action)
@@ -93,7 +94,8 @@
val testActor = TestProbe()
val container = new TestContainer(Source(toByteString(logs)))
- val store = new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, ())))
+ val store =
+ new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, (), (_: Throwable) => _)))
val ex = the[LogCollectingException] thrownBy await(
store.collectLogs(TransactionId.testing, user, developerErrorActivation, container, action))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 4496742..c27a298 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.containerpool.mesos.test
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.testkit.TestKit
import akka.testkit.TestProbe
@@ -327,7 +326,6 @@
val container = await(c)
implicit val tid = TransactionId.testing
- implicit val m = ActorMaterializer()
val logs = container
.logs(actionMemory, false)
.via(DockerToActivationLogStore.toFormattedString)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index e6ed5ed..1904d8c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -21,7 +21,6 @@
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
-import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
import common.StreamLogging
import org.apache.kafka.clients.producer.RecordMetadata
@@ -93,7 +92,6 @@
val timeout = 5.seconds
- private implicit val mt = ActorMaterializer()
private implicit val transId = TransactionId.testing
private implicit val creationId = CreationId.generate()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
index d574069..7d12bec 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
@@ -19,7 +19,6 @@
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
-import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestActor, TestFSMRef, TestKit, TestProbe}
import common.StreamLogging
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
@@ -55,7 +54,6 @@
override def afterAll = TestKit.shutdownActorSystem(system)
- implicit val mt = ActorMaterializer()
implicit val ec = system.dispatcher
val config = new WhiskConfig(ExecManifest.requiredProperties)
@@ -351,7 +349,7 @@
TestActor.KeepRunning
})
- probe.expectMsg(5.seconds, Transition(fsm, Unhealthy, Healthy))
+ probe.expectMsg(10.seconds, Transition(fsm, Unhealthy, Healthy))
dataManagementService.expectMsg(
UpdateDataOnChange(
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerRoutesTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerRoutesTests.scala
index 24f9736..88ad6d9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerRoutesTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerRoutesTests.scala
@@ -48,7 +48,7 @@
configureBuildInfo()
val controller =
- new Controller(instance, Runtimes(Set.empty, Set.empty, None), whiskConfig, system, materializer, logger)
+ new Controller(instance, Runtimes(Set.empty, Set.empty, None), whiskConfig, system, logger)
Get("/invokers/ready") ~> Route.seal(controller.internalInvokerHealth) ~> check {
status shouldBe InternalServerError
responseAs[JsObject].fields("unhealthy") shouldBe JsString("0/0")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 7431f12..36e5403 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -109,7 +109,7 @@
val entityStore = WhiskEntityStore.datastore()
val authStore = WhiskAuthStore.datastore()
val logStore = SpiLoader.get[LogStoreProvider].instance(actorSystem)
- val activationStore = SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+ val activationStore = SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
def deleteAction(doc: DocId)(implicit transid: TransactionId) = {
Await.result(WhiskAction.get(entityStore, doc) flatMap { doc =>
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactActivationStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactActivationStoreBehaviorBase.scala
index 070d35e..c11f1f1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactActivationStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactActivationStoreBehaviorBase.scala
@@ -17,7 +17,6 @@
package org.apache.openwhisk.core.database
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.core.controller.test.WhiskAuthHelpers
import org.apache.openwhisk.core.database.test.behavior.ActivationStoreBehaviorBase
import org.scalatest.FlatSpec
@@ -28,6 +27,6 @@
override val context = UserContext(WhiskAuthHelpers.newIdentity())
override lazy val activationStore = {
- ArtifactActivationStoreProvider.instance(actorSystem, ActorMaterializer.create(actorSystem), logging)
+ ArtifactActivationStoreProvider.instance(actorSystem, logging)
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
index 0517ebb..90074ca 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
@@ -23,7 +23,6 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
-import akka.stream.ActorMaterializer
import akka.testkit.TestKit
import common.StreamLogging
import org.junit.runner.RunWith
@@ -56,7 +55,6 @@
implicit val transid: TransactionId = TransactionId.testing
implicit val notifier: Option[CacheChangeNotification] = None
- private val materializer = ActorMaterializer()
private val uuid = UUID()
private val subject = Subject()
private val user =
@@ -136,7 +134,7 @@
it should "store activations in artifact store and to file without result" in {
val config = ArtifactWithFileStorageActivationStoreConfig("userlogs", "logs", "namespaceId", false)
- val activationStore = new ArtifactWithFileStorageActivationStore(system, materializer, logging, config)
+ val activationStore = new ArtifactWithFileStorageActivationStore(system, logging, config)
val logDir = new File(new File(".").getCanonicalPath, config.logPath)
try {
@@ -187,7 +185,7 @@
it should "store activations in artifact store and to file with result" in {
val config = ArtifactWithFileStorageActivationStoreConfig("userlogs", "logs", "namespaceId", true)
- val activationStore = new ArtifactWithFileStorageActivationStore(system, materializer, logging, config)
+ val activationStore = new ArtifactWithFileStorageActivationStore(system, logging, config)
val logDir = new File(new File(".").getCanonicalPath, config.logPath)
try {
@@ -254,19 +252,18 @@
class ArtifactWithFileStorageActivationStoreExtendedTest(
actorSystem: ActorSystem,
- actorMaterializer: ActorMaterializer,
logging: Logging,
config: ArtifactWithFileStorageActivationStoreConfigExtendedTest =
loadConfigOrThrow[ArtifactWithFileStorageActivationStoreConfigExtendedTest](
ConfigKeys.activationStoreWithFileStorage))
- extends ArtifactActivationStore(actorSystem, actorMaterializer, logging) {
+ extends ArtifactActivationStore(actorSystem, logging) {
private val activationFileStorage =
new ActivationFileStorage(
config.logFilePrefix,
Paths.get(config.logPath),
config.writeResultToFile,
- actorMaterializer,
+ actorSystem,
logging)
def getLogFile = activationFileStorage.getLogFile
@@ -275,7 +272,8 @@
// other simple example for the flag: (includeResult && !activation.response.isSuccess)
override def store(activation: WhiskActivation, context: UserContext)(
- implicit transid: TransactionId,
+ implicit
+ transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
val additionalFields = Map(config.userIdField -> context.user.namespace.uuid.toJson)
@@ -299,7 +297,7 @@
ArtifactWithFileStorageActivationStoreConfigExtendedTest("userlogs", "logs", "namespaceId", !includeResult)
val activationStore =
- new ArtifactWithFileStorageActivationStoreExtendedTest(system, materializer, logging, config)
+ new ArtifactWithFileStorageActivationStoreExtendedTest(system, logging, config)
val logDir = new File(new File(".").getCanonicalPath, config.logPath)
try {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/CouchDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/CouchDBStoreBehaviorBase.scala
index 765232f..2730779 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/CouchDBStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/CouchDBStoreBehaviorBase.scala
@@ -44,8 +44,7 @@
WhiskEntityJsonFormat,
WhiskDocumentReader,
actorSystem,
- logging,
- materializer)
+ logging)
override val activationStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/WhiskAdminCliTestBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/WhiskAdminCliTestBase.scala
index 2a18d4c..f7ee5d0 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/WhiskAdminCliTestBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/WhiskAdminCliTestBase.scala
@@ -17,7 +17,6 @@
package org.apache.openwhisk.core.database
-import akka.stream.ActorMaterializer
import common.{StreamLogging, WskActorSystem}
import org.rogach.scallop.throwError
import org.scalatest.concurrent.ScalaFutures
@@ -38,7 +37,6 @@
with ScalaFutures
with Matchers {
- implicit val materializer = ActorMaterializer()
//Bring in sync the timeout used by ScalaFutures and DBUtils
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
protected val authStore = WhiskAuthStore.datastore()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
index 0dadcb7..97ff656 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database.azblob
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
@@ -30,8 +29,7 @@
def azureCdnConfig: String = ""
def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
val config = ConfigFactory.parseString(s"""
|whisk {
| azure-blob {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala
index 825a445..d8a4434 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database.azblob
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
import org.apache.openwhisk.core.database.memory.{MemoryArtifactStoreBehaviorBase, MemoryArtifactStoreProvider}
@@ -37,8 +36,6 @@
with AttachmentStoreBehaviors {
override lazy val store = makeAzureStore[WhiskEntity]
- override implicit val materializer: ActorMaterializer = ActorMaterializer()
-
override val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
override protected def beforeAll(): Unit = {
@@ -50,6 +47,5 @@
makeAzureStore[D]()
def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore
+ logging: Logging): AttachmentStore
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
index b78bdbc..8ca9b14 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
@@ -50,8 +50,7 @@
WhiskEntityJsonFormat,
WhiskDocumentReader,
actorSystem,
- logging,
- materializer)
+ logging)
override lazy val activationStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
index 6723e16..78de424 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
@@ -17,7 +17,6 @@
package org.apache.openwhisk.core.database.cosmosdb
-import akka.stream.ActorMaterializer
import com.microsoft.azure.cosmosdb.IndexKind.Range
import com.microsoft.azure.cosmosdb.DataType.String
import com.microsoft.azure.cosmosdb.DocumentCollection
@@ -50,8 +49,6 @@
behavior of "CosmosDB init"
- protected implicit val materializer: ActorMaterializer = ActorMaterializer()
-
it should "create and update index" in {
val testDb = createTestDB()
val config: CosmosDBConfig = storeConfig.copy(db = testDb.getId)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index 97c6c45..e4d2528 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -21,7 +21,6 @@
import akka.Done
import akka.actor.CoordinatedShutdown
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
-import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.KafkaException
@@ -56,7 +55,6 @@
with ScalaFutures
with TryValues {
- private implicit val materializer: ActorMaterializer = ActorMaterializer()
private implicit val logging = new AkkaLogging(system.log)
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
index 8ca2d7c..87f5ae4 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
@@ -50,8 +50,7 @@
WhiskEntityJsonFormat,
WhiskDocumentReader,
actorSystem,
- logging,
- materializer)
+ logging)
override lazy val activationStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
index 9226a4f..aadab44 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database.s3
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import org.scalatest.FlatSpec
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
@@ -37,8 +36,6 @@
with AttachmentStoreBehaviors {
override lazy val store = makeS3Store[WhiskEntity]
- override implicit val materializer: ActorMaterializer = ActorMaterializer()
-
override val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
override protected def beforeAll(): Unit = {
@@ -50,6 +47,5 @@
makeS3Store[D]()
def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore
+ logging: Logging): AttachmentStore
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala
index 2a8f558..722997a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database.s3
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.scalatest.FlatSpec
import org.apache.openwhisk.common.Logging
@@ -31,8 +30,7 @@
def cloudFrontConfig: String = ""
def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
val config = ConfigFactory.parseString(s"""
|whisk {
| s3 {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Minio.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Minio.scala
index 0431f70..1a3bd83 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Minio.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Minio.scala
@@ -21,7 +21,6 @@
import actionContainers.ActionContainer
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder
@@ -36,8 +35,7 @@
trait S3Minio extends FlatSpec with BeforeAndAfterAll with StreamLogging {
def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): AttachmentStore = {
+ logging: Logging): AttachmentStore = {
val config = ConfigFactory.parseString(s"""
|whisk {
| s3 {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentCompatibilityTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentCompatibilityTests.scala
index 39fad56..6965113 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentCompatibilityTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentCompatibilityTests.scala
@@ -21,7 +21,6 @@
import java.util.Base64
import akka.http.scaladsl.model.{ContentType, StatusCodes}
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import common.{StreamLogging, WskActorSystem}
@@ -59,7 +58,6 @@
//Bring in sync the timeout used by ScalaFutures and DBUtils
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
- implicit val materializer = ActorMaterializer()
val creds = WhiskAuthHelpers.newIdentity()
val namespace = EntityPath(creds.subject.asString)
@@ -248,7 +246,6 @@
WhiskEntityJsonFormat,
WhiskDocumentReader,
actorSystem,
- logging,
- materializer)
+ logging)
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
index e63e99c..b0c537d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -20,7 +20,6 @@
import java.io.ByteArrayInputStream
import akka.http.scaladsl.model.ContentTypes
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import akka.util.{ByteString, ByteStringBuilder}
import common.{StreamLogging, WskActorSystem}
@@ -47,8 +46,6 @@
//Bring in sync the timeout used by ScalaFutures and DBUtils
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
- protected implicit val materializer: ActorMaterializer = ActorMaterializer()
-
protected val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
private val attachmentsToDelete = ListBuffer[String]()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentSupportTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentSupportTests.scala
index a3b9a27..28937d3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentSupportTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentSupportTests.scala
@@ -19,7 +19,6 @@
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.Source
-import akka.stream.{ActorMaterializer, Materializer}
import akka.util.CompactByteString
import common.WskActorSystem
import org.junit.runner.RunWith
@@ -36,8 +35,6 @@
behavior of "Attachment inlining"
- implicit val materializer: Materializer = ActorMaterializer()
-
it should "not inline if maxInlineSize set to zero" in {
val inliner = new AttachmentSupportTestMock(InliningConfig(maxInlineSize = 0.KB))
val bs = CompactByteString("hello world")
@@ -49,7 +46,6 @@
}
class AttachmentSupportTestMock(val inliningConfig: InliningConfig) extends AttachmentSupport[WhiskEntity] {
- override protected[core] implicit val materializer: Materializer = ActorMaterializer()
override protected def attachmentScheme: String = "test"
override protected def executionContext = actorSystem.dispatcher
override protected[database] def put(d: WhiskEntity)(implicit transid: TransactionId) = ???
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/BatcherTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/BatcherTests.scala
index dc2a02a..9cbfa15 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/BatcherTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/BatcherTests.scala
@@ -20,7 +20,6 @@
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
-import akka.stream.ActorMaterializer
import common.{LoggedFunction, WskActorSystem}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -34,7 +33,6 @@
@RunWith(classOf[JUnitRunner])
class BatcherTests extends FlatSpec with Matchers with WskActorSystem {
- implicit val materializer: ActorMaterializer = ActorMaterializer()
def await[V](f: Future[V]) = Await.result(f, 10.seconds)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreBehaviorBase.scala
index d1cf892..3b7a66a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreBehaviorBase.scala
@@ -19,7 +19,6 @@
import java.time.Instant
-import akka.stream.ActorMaterializer
import common.{StreamLogging, WskActorSystem}
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.{ActivationStore, CacheChangeNotification, UserContext}
@@ -44,7 +43,6 @@
with IntegrationPatience
with BeforeAndAfterEach {
- protected implicit val materializer: ActorMaterializer = ActorMaterializer()
protected implicit val notifier: Option[CacheChangeNotification] = None
def context: UserContext
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
index 6e3e8f5..4a9510d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
@@ -19,7 +19,6 @@
import java.time.Instant
-import akka.stream.ActorMaterializer
import common.{StreamLogging, WskActorSystem}
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
@@ -50,8 +49,6 @@
//Bring in sync the timeout used by ScalaFutures and DBUtils
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
- protected implicit val materializer: ActorMaterializer = ActorMaterializer()
-
protected val prefix = s"artifactTCK_${Random.alphanumeric.take(4).mkString}"
def authStore: ArtifactStore[WhiskAuth]
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/DatastoreTests.scala
index eca4b2c..c1c8ae3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/DatastoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/DatastoreTests.scala
@@ -23,7 +23,6 @@
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
-import akka.stream.ActorMaterializer
import common.{StreamLogging, WskActorSystem}
import org.apache.openwhisk.common.WhiskInstants
import org.mockito.Mockito._
@@ -44,7 +43,6 @@
with StreamLogging
with WhiskInstants {
- implicit val materializer = ActorMaterializer()
val namespace = EntityPath("test namespace")
val datastore = WhiskEntityStore.datastore()
val authstore = WhiskAuthStore.datastore()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ViewTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ViewTests.scala
index be4bb3e..df3b4e3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ViewTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ViewTests.scala
@@ -25,7 +25,6 @@
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
-import akka.stream.ActorMaterializer
import common.StreamLogging
import common.WskActorSystem
import org.apache.openwhisk.core.controller.test.WhiskAuthHelpers
@@ -62,8 +61,6 @@
val creds2 = WhiskAuthHelpers.newAuth(Subject("t12345"))
val namespace2 = EntityPath(creds2.subject.asString)
- implicit val materializer = ActorMaterializer()
-
val entityStore = WhiskEntityStore.datastore()
val activationStore = WhiskActivationStore.datastore()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
index 5ceddfb..89abe7b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
@@ -20,7 +20,6 @@
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.testkit.{TestKit, TestProbe}
import common.StreamLogging
import org.apache.kafka.clients.producer.RecordMetadata
@@ -61,7 +60,6 @@
implicit val actualActorSystem = system // Use system for duplicate system and actorSystem.
implicit val ec = actualActorSystem.dispatcher
- implicit val materializer = ActorMaterializer()
implicit val transId = TransactionId.testing
implicit val creationId = CreationId.generate()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/NamespaceBlacklistTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/NamespaceBlacklistTests.scala
index 51df0e9..686c172 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/NamespaceBlacklistTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/NamespaceBlacklistTests.scala
@@ -17,7 +17,6 @@
package org.apache.openwhisk.core.invoker.test
-import akka.stream.ActorMaterializer
import common.{StreamLogging, WskActorSystem}
import org.junit.runner.RunWith
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
@@ -44,7 +43,6 @@
behavior of "NamespaceBlacklist"
- implicit val materializer = ActorMaterializer()
implicit val tid = TransactionId.testing
val authStore = WhiskAuthStore.datastore()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index dbadbce..907264c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -20,7 +20,6 @@
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.testkit.TestProbe
import common.{StreamLogging, WhiskProperties}
import java.nio.charset.StandardCharsets
@@ -409,7 +408,6 @@
}
- implicit val am = ActorMaterializer()
val config = new WhiskConfig(ExecManifest.requiredProperties)
val invokerMem = 2000.MB
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
index 75c913e..16c6f1c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.scheduler.grpc.test
import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestKit}
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
@@ -61,7 +60,6 @@
behavior of "ActivationService"
- implicit val mat = ActorMaterializer()
implicit val ec = system.dispatcher
val messageTransId = TransactionId(TransactionId.testing.meta.id)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
index 8b30d60..4f6d33e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala
@@ -17,7 +17,6 @@
package org.apache.openwhisk.core.scheduler.queue.test
-import akka.stream.ActorMaterializer
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
import common._
@@ -66,7 +65,6 @@
val wskadmin: RunCliCmd = new RunCliCmd {
override def baseCommand: mutable.Buffer[String] = WskAdmin.baseCommand
}
- implicit val mt: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
implicit val timeoutConfig: PatienceConfig = PatienceConfig(5 seconds, 15 milliseconds)
diff --git a/tests/src/test/scala/org/apache/openwhisk/http/PoolingRestClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/http/PoolingRestClientTests.scala
index be71858..0afdd62 100644
--- a/tests/src/test/scala/org/apache/openwhisk/http/PoolingRestClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/http/PoolingRestClientTests.scala
@@ -24,7 +24,6 @@
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
-import akka.stream.ActorMaterializer
import akka.testkit.TestKit
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
@@ -49,7 +48,6 @@
with ScalaFutures
with StreamLogging {
implicit val ec: ExecutionContext = system.dispatcher
- implicit val materializer: ActorMaterializer = ActorMaterializer()
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
diff --git a/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneUserEventTests.scala b/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneUserEventTests.scala
index 10b2fca..b89cf53 100644
--- a/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneUserEventTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneUserEventTests.scala
@@ -17,7 +17,6 @@
package org.apache.openwhisk.standalone
-import akka.stream.ActorMaterializer
import common.{FreePortFinder, WskProps}
import org.apache.openwhisk.common.UserEventTests
import org.junit.runner.RunWith
@@ -25,7 +24,6 @@
@RunWith(classOf[JUnitRunner])
class StandaloneUserEventTests extends UserEventTests with StandaloneServerFixture {
- private implicit val materializer: ActorMaterializer = ActorMaterializer()
private val kafkaPort = sys.props.get("whisk.kafka.port").map(_.toInt).getOrElse(FreePortFinder.freePort())
protected override val customConfig = Some("""
diff --git a/tests/src/test/scala/org/apache/openwhisk/test/http/RESTProxy.scala b/tests/src/test/scala/org/apache/openwhisk/test/http/RESTProxy.scala
index c4a868e..aa56643 100644
--- a/tests/src/test/scala/org/apache/openwhisk/test/http/RESTProxy.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/test/http/RESTProxy.scala
@@ -26,8 +26,6 @@
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
-import akka.stream.ActorMaterializer
-import akka.stream.Materializer
import akka.stream.scaladsl._
import akka.pattern.ask
import akka.pattern.pipe
@@ -52,7 +50,6 @@
private val destPort = serviceAuthority.port
// These change as connections come and go
- private var materializer: Option[ActorMaterializer] = None
private var binding: Option[Http.ServerBinding] = None
// Public messages
@@ -64,8 +61,7 @@
private case class Request(request: HttpRequest)
// Route requests through messages to this actor, to serialize w.r.t events such as unbinding
- private def mkRequestFlow(materializer: Materializer): Flow[HttpRequest, HttpResponse, _] = {
- implicit val m = materializer
+ private def mkRequestFlow: Flow[HttpRequest, HttpResponse, _] = {
Flow.apply[HttpRequest].mapAsync(4) { request =>
ask(self, Request(request))(timeout = Timeout(1.minute)).mapTo[HttpResponse]
@@ -76,11 +72,8 @@
assert(!checkState || binding.isEmpty, "Proxy is already bound")
if (binding.isEmpty) {
- assert(materializer.isEmpty)
- implicit val m = ActorMaterializer()
- materializer = Some(m)
log.debug(s"[RESTProxy] Binding to '$host:$port'.")
- val b = Await.result(Http().bindAndHandle(mkRequestFlow(m), host, port), 5.seconds)
+ val b = Await.result(Http().newServerAt(host, port).bindFlow(mkRequestFlow), 5.seconds)
binding = Some(b)
}
}
@@ -92,11 +85,6 @@
log.debug(s"[RESTProxy] Unbinding from '${b.localAddress}'")
Await.result(b.unbind(), 5.seconds)
binding = None
- assert(materializer.isDefined)
- materializer.foreach { m =>
- materializer = None
- m.shutdown()
- }
}
}
@@ -118,26 +106,26 @@
case Request(request) =>
// If the actor isn't bound to the port / has no materializer,
// the request is simply dropped.
- materializer.map { implicit m =>
- log.debug(s"[RESTProxy] Proxying '${request.uri}' to '${serviceAuthority}'")
- val flow = if (useHTTPS) {
- Http().outgoingConnectionHttps(destHost, destPort)
- } else {
- Http().outgoingConnection(destHost, destPort)
- }
+ log.debug(s"[RESTProxy] Proxying '${request.uri}' to '${serviceAuthority}'")
- // akka-http doesn't like us to set those headers ourselves.
- val upstreamRequest = request.copy(headers = request.headers.filter(_ match {
- case `Timeout-Access`(_) => false
- case _ => true
- }))
-
- Source
- .single(upstreamRequest)
- .via(flow)
- .runWith(Sink.head)
- .pipeTo(sender)
+ val flow = if (useHTTPS) {
+ Http().outgoingConnectionHttps(destHost, destPort)
+ } else {
+ Http().outgoingConnection(destHost, destPort)
}
+
+ // akka-http doesn't like us to set those headers ourselves.
+ val upstreamRequest = request.withHeaders(headers = request.headers.filter(_ match {
+ case `Timeout-Access`(_) => false
+ case _ => true
+ }))
+
+ Source
+ .single(upstreamRequest)
+ .via(flow)
+ .runWith(Sink.head)
+ .pipeTo(sender)
+
}
}
diff --git a/tests/src/test/scala/services/HeadersTests.scala b/tests/src/test/scala/services/HeadersTests.scala
index b45bbaa..6ed49b9 100644
--- a/tests/src/test/scala/services/HeadersTests.scala
+++ b/tests/src/test/scala/services/HeadersTests.scala
@@ -48,7 +48,6 @@
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.HttpMethod
import akka.http.scaladsl.model.HttpHeader
-import akka.stream.ActorMaterializer
import common.WskActorSystem
import pureconfig._
@@ -57,8 +56,6 @@
behavior of "Headers at general API"
- implicit val materializer = ActorMaterializer()
-
val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol")
val whiskAuth = WhiskProperties.getBasicAuth
val creds = BasicHttpCredentials(whiskAuth.fst, whiskAuth.snd)
diff --git a/tools/admin/src/main/scala/org/apache/openwhisk/core/cli/Main.scala b/tools/admin/src/main/scala/org/apache/openwhisk/core/cli/Main.scala
index 8e5a5f1..134b712 100644
--- a/tools/admin/src/main/scala/org/apache/openwhisk/core/cli/Main.scala
+++ b/tools/admin/src/main/scala/org/apache/openwhisk/core/cli/Main.scala
@@ -21,7 +21,6 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
-import akka.stream.ActorMaterializer
import ch.qos.logback.classic.{Level, LoggerContext}
import org.rogach.scallop._
import org.slf4j.LoggerFactory
@@ -100,7 +99,6 @@
private def executeWithSystem(conf: Conf)(implicit actorSystem: ActorSystem): Int = {
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
- implicit val materializer = ActorMaterializer.create(actorSystem)
val admin = new WhiskAdmin(conf)
val result = Try {
@@ -181,9 +179,7 @@
}
}
-case class WhiskAdmin(conf: Conf)(implicit val actorSystem: ActorSystem,
- implicit val materializer: ActorMaterializer,
- implicit val logging: Logging) {
+case class WhiskAdmin(conf: Conf)(implicit val actorSystem: ActorSystem, implicit val logging: Logging) {
implicit val tid = TransactionId(TransactionId.systemPrefix + "cli")
def executeCommand(): Future[Either[CommandError, String]] = {
conf.subcommands match {
diff --git a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
index 488c3bd..0b85ca7 100644
--- a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
+++ b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.cli.{CommandError, CommandMessages, IllegalState, WhiskCommand}
import org.apache.openwhisk.core.database.LimitsCommand.LimitEntity
@@ -107,7 +106,6 @@
def exec(cmd: ScallopConfBase)(implicit system: ActorSystem,
logging: Logging,
- materializer: ActorMaterializer,
transid: TransactionId): Future[Either[CommandError, String]] = {
implicit val executionContext = system.dispatcher
val authStore = LimitsCommand.createDataStore()
@@ -175,12 +173,10 @@
object LimitsCommand {
def limitIdOf(name: EntityName) = DocId(s"${name.name}/limits")
- def createDataStore()(implicit system: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[WhiskAuth] =
+ def createDataStore()(implicit system: ActorSystem, logging: Logging): ArtifactStore[WhiskAuth] =
SpiLoader
.get[ArtifactStoreProvider]
- .makeStore[WhiskAuth]()(classTag[WhiskAuth], LimitsFormat, WhiskDocumentReader, system, logging, materializer)
+ .makeStore[WhiskAuth]()(classTag[WhiskAuth], LimitsFormat, WhiskDocumentReader, system, logging)
class LimitEntity(val name: EntityName, val limits: UserLimits) extends WhiskAuth(Subject(), Set.empty) {
override def docid: DocId = limitIdOf(name)
diff --git a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/UserCommand.scala b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/UserCommand.scala
index 6f41f63..bfeee59 100644
--- a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/UserCommand.scala
+++ b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/UserCommand.scala
@@ -20,7 +20,6 @@
import java.util.UUID
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.cli.{CommandError, CommandMessages, IllegalState, WhiskCommand}
@@ -144,7 +143,6 @@
def exec(cmd: ScallopConfBase)(implicit system: ActorSystem,
logging: Logging,
- materializer: ActorMaterializer,
transid: TransactionId): Future[Either[CommandError, String]] = {
implicit val executionContext = system.dispatcher
val authStore = UserCommand.createDataStore()
@@ -283,7 +281,7 @@
def changeUserState(authStore: AuthStore, subjects: List[String], blocked: Boolean)(
implicit transid: TransactionId,
- materializer: ActorMaterializer,
+ system: ActorSystem,
ec: ExecutionContext): Future[Either[CommandError, String]] = {
Source(subjects)
.mapAsync(1)(changeUserState(authStore, _, blocked))
@@ -320,18 +318,10 @@
}
object UserCommand {
- def createDataStore()(implicit system: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[WhiskAuth] =
+ def createDataStore()(implicit system: ActorSystem, logging: Logging): ArtifactStore[WhiskAuth] =
SpiLoader
.get[ArtifactStoreProvider]
- .makeStore[WhiskAuth]()(
- classTag[WhiskAuth],
- ExtendedAuthFormat,
- WhiskDocumentReader,
- system,
- logging,
- materializer)
+ .makeStore[WhiskAuth]()(classTag[WhiskAuth], ExtendedAuthFormat, WhiskDocumentReader, system, logging)
class ExtendedAuth(subject: Subject, namespaces: Set[WhiskNamespace], blocked: Option[Boolean])
extends WhiskAuth(subject, namespaces) {