blob: 23100edc6c3656a1720444bfdc1724acdb971ada [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.namespace;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.broker.PulsarService;
public class OwnedServiceUnit {
private static final Logger LOG = LoggerFactory.getLogger(OwnedServiceUnit.class);
private final ServiceUnitId suName;
/**
* {@link #nsLock} is used to protect read/write access to {@link #active} flag and the corresponding code section
* based on {@link #active} flag
*/
private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();
private final AtomicBoolean isActive = new AtomicBoolean(true);
/**
* constructor
*
* @param nsname
*/
public OwnedServiceUnit(ServiceUnitId suName) {
this.suName = suName;
};
/**
* Constructor to allow set initial active flag
*
* @param nsname
* @param nssvc
* @param active
*/
public OwnedServiceUnit(ServiceUnitId suName, boolean active) {
this.suName = suName;
this.isActive.set(active);
}
/**
* Access to the namespace name
*
* @return NamespaceName
*/
public ServiceUnitId getServiceUnitId() {
return this.suName;
}
/**
* This method initiates the unload namespace process. It is invoked by Admin API
* <code>Namespaces.unloadNamespace()</code>.
*
* @param pulsar
* @param adminView
* @param ownershipCache
*
* @throws Exception
*/
public void handleUnloadRequest(PulsarService pulsar) throws Exception {
long unloadBundleStartTime = System.nanoTime();
// Need a per namespace RenetrantReadWriteLock
// Here to do a writeLock to set the flag and proceed to check and close connections
while (!this.nsLock.writeLock().tryLock(1, TimeUnit.SECONDS)) {
// Using tryLock to avoid deadlocks caused by 2 threads trying to acquire 2 readlocks (eg: JMS replicators)
// while a handleUnloadRequest happens in the middle
LOG.warn("Contention on OwnedServiceUnit rw lock. Retrying to acquire lock write lock");
}
try {
// set the flag locally s.t. no more producer/consumer to this namespace is allowed
if (!this.isActive.compareAndSet(true, false)) {
// An exception is thrown when the namespace is not in active state (i.e. another thread is
// removing/have removed it)
throw new IllegalStateException(
"Namespace is not active. ns:" + this.suName + "; state:" + this.isActive.get());
}
} finally {
// no matter success or not, unlock
this.nsLock.writeLock().unlock();
}
int unloadedTopics = 0;
try {
LOG.info("Disabling ownership: {}", this.suName);
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.suName, false);
// close topics forcefully
try {
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(suName).get();
} catch (Exception e) {
// ignore topic-close failure to unload bundle
LOG.error("Failed to close topics under namespace {}", suName.toString(), e);
}
// delete ownership node on zk
try {
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(suName);
} catch (Exception e) {
// Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.suName, true);
throw new RuntimeException(String.format("Failed to delete ownership node %s", suName.toString()),
e.getCause());
}
} catch (Exception e) {
LOG.error(String.format("failed to unload a namespace. ns=%s", suName.toString()), e);
throw new RuntimeException(e);
}
double unloadBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - unloadBundleStartTime));
LOG.info("Unloading {} namespace-bundle with {} topics completed in {} ms", this.suName, unloadedTopics, unloadBundleTime);
}
/**
* Access method to the namespace state to check whether the namespace is active or not
*
* @return boolean value indicate that the namespace is active or not.
*/
public boolean isActive() {
return this.isActive.get();
}
public void setActive(boolean active) {
isActive.set(active);
}
}