blob: 4a6942b33d9b87f81628c00b1463df7b99e81da9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.common
import scala.collection.concurrent.TrieMap
/**
* A Semaphore that coordinates the memory (ForcibleSemaphore) and concurrency (ResizableSemaphore) where
* - for invocations when maxConcurrent == 1, delegate to super
* - for invocations that cause acquire on memory slots, also acquire concurrency slots, and do it atomically
* @param memoryPermits
* @tparam T
*/
class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPermits) {
private val actionConcurrentSlotsMap = TrieMap.empty[T, ResizableSemaphore] //one key per action; resized per container
final def tryAcquireConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Boolean = {
if (maxConcurrent == 1) {
super.tryAcquire(memoryPermits)
} else {
tryOrForceAcquireConcurrent(actionid, maxConcurrent, memoryPermits, false)
}
}
/**
* Coordinated permit acquisition:
* - first try to acquire concurrency slot
* - then try to acquire lock for this action
* - within the lock:
* - try to acquire concurrency slot (double check)
* - try to acquire memory slot
* - if memory slot acquired, release concurrency slots
* - release the lock
* - if neither concurrency slot nor memory slot acquired, return false
* @param actionid
* @param maxConcurrent
* @param memoryPermits
* @param force
* @return
*/
private def tryOrForceAcquireConcurrent(actionid: T,
maxConcurrent: Int,
memoryPermits: Int,
force: Boolean): Boolean = {
val concurrentSlots = actionConcurrentSlotsMap
.getOrElseUpdate(actionid, new ResizableSemaphore(0, maxConcurrent))
if (concurrentSlots.tryAcquire(1)) {
true
} else {
// with synchronized:
concurrentSlots.synchronized {
if (concurrentSlots.tryAcquire(1)) {
true
} else if (force) {
super.forceAcquire(memoryPermits)
concurrentSlots.release(maxConcurrent - 1, false)
true
} else if (super.tryAcquire(memoryPermits)) {
concurrentSlots.release(maxConcurrent - 1, false)
true
} else {
false
}
}
}
}
def forceAcquireConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Unit = {
require(memoryPermits > 0, "cannot force acquire negative or no permits")
if (maxConcurrent == 1) {
super.forceAcquire(memoryPermits)
} else {
tryOrForceAcquireConcurrent(actionid, maxConcurrent, memoryPermits, true)
}
}
/**
* Releases the given amount of permits
*
* @param acquires the number of permits to release
*/
def releaseConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Unit = {
require(memoryPermits > 0, "cannot release negative or no permits")
if (maxConcurrent == 1) {
super.release(memoryPermits)
} else {
val concurrentSlots = actionConcurrentSlotsMap(actionid)
val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
//concurrent slots
if (memoryRelease) {
super.release(memoryPermits)
}
if (actionRelease) {
actionConcurrentSlotsMap.remove(actionid)
}
}
}
//for testing
def concurrentState = actionConcurrentSlotsMap.readOnlySnapshot()
}