blob: 6593212008c89b97ec804183b53fce40a58a2e22 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ZKUtils;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.curator.framework.recipes.locks.ChildReaper;
import org.apache.curator.framework.recipes.locks.Reaper;
import org.apache.curator.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MapMaker;
import org.apache.zookeeper.KeeperException;
/**
* Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be
* located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the
* resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
*/
public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
private ZKUtils zk;
public static final String LOCKS_NODE = "/locks";
private static final XLog LOG = XLog.getLog(ZKLocksService.class);
private final ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap();
private ChildReaper reaper = null;
private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
private static final String RELEASE_RETRY_TIME_LIMIT_MINUTES = CONF_PREFIX + "ZKLocksService.lock.release.retry.time.limit"
+ ".minutes";
/**
* Initialize the zookeeper locks service
*
* @param services services instance.
*/
@Override
public void init(Services services) throws ServiceException {
super.init(services);
try {
zk = ZKUtils.register(this);
reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(),
ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH);
reaper.start();
}
catch (Exception ex) {
throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
}
}
/**
* Destroy the zookeeper locks service.
*/
@Override
public void destroy() {
if (reaper != null) {
try {
reaper.close();
}
catch (IOException e) {
LOG.error("Error closing childReaper", e);
}
}
if (zk != null) {
zk.unregister(this);
}
zk = null;
super.destroy();
}
/**
* Instruments the zookeeper locks service.
*
* @param instr instance to instrument the memory locks service to.
*/
@Override
public void instrument(Instrumentation instr) {
// Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has
instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() {
@Override
public Integer getValue() {
return zkLocks.size();
}
});
}
/**
* Obtain a READ lock for a source.
*
* @param resource resource name.
* @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
* @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
* @throws InterruptedException thrown if the thread was interrupted while waiting.
*/
@Override
public LockToken getReadLock(String resource, long wait) throws InterruptedException {
return acquireLock(resource, Type.READ, wait);
}
/**
* Obtain a WRITE lock for a source.
*
* @param resource resource name.
* @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
* @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
* @throws InterruptedException thrown if the thread was interrupted while waiting.
*/
@Override
public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
return acquireLock(resource, Type.WRITE, wait);
}
private LockToken acquireLock(final String resource, final Type type, final long wait) throws InterruptedException {
LOG.debug("Acquiring ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait);
InterProcessReadWriteLock lockEntry;
final String zkPath = LOCKS_NODE + "/" + resource;
LOG.debug("Checking existing Curator lock or creating new one. [zkPath={}]", zkPath);
// Creating a Curator InterProcessReadWriteLock is lightweight - only calling acquire() costs real ZooKeeper calls
final InterProcessReadWriteLock newLockEntry = new InterProcessReadWriteLock(zk.getClient(), zkPath);
final InterProcessReadWriteLock existingLockEntry = zkLocks.putIfAbsent(resource, newLockEntry);
if (existingLockEntry == null) {
lockEntry = newLockEntry;
LOG.debug("No existing Curator lock present, new one created successfully. [zkPath={}]", zkPath);
}
else {
// We can't destoy newLockEntry and we don't have to - it's taken care of by Curator and JVM GC
lockEntry = existingLockEntry;
LOG.debug("Reusing existing Curator lock. [zkPath={}]", zkPath);
}
ZKLockToken token = null;
try {
LOG.debug("Calling Curator to acquire ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait);
final InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
if (wait == -1) {
lock.acquire();
token = new ZKLockToken(lockEntry, type);
LOG.debug("ZooKeeper lock acquired successfully. [resource={};type={}]", resource, type);
}
else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
token = new ZKLockToken(lockEntry, type);
LOG.debug("ZooKeeper lock acquired successfully waiting. [resource={};type={};wait={}]", resource, type, wait);
}
else {
LOG.warn("Could not acquire ZooKeeper lock, timed out. [resource={};type={};wait={}]", resource, type, wait);
}
}
catch (final Exception ex) {
//Not throwing exception. Should return null, so that command can be requeued
LOG.warn("Could not acquire lock due to a ZooKeeper error. " +
"[ex={};resource={};type={};wait={}]", ex, resource, type, wait);
LOG.error("Error while acquiring lock", ex);
}
return token;
}
/**
* Implementation of {@link LockToken} for zookeeper locks.
*/
class ZKLockToken implements LockToken {
private final InterProcessReadWriteLock lockEntry;
private final Type type;
private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) {
this.lockEntry = lockEntry;
this.type = type;
}
/**
* Release the lock.
*/
@Override
public void release() {
try {
retriableRelease();
}
catch (Exception ex) {
LOG.warn("Could not release lock: " + ex.getMessage(), ex);
}
}
/**
* Retires on failure to release lock
*
* @throws InterruptedException
*/
private void retriableRelease() throws Exception {
long retryTimeLimit = TimeUnit.MINUTES.toSeconds(ConfigurationService.getLong(RELEASE_RETRY_TIME_LIMIT_MINUTES, 30));
int sleepSeconds = 10;
for(int retryCount = 1; retryTimeLimit>=0; retryTimeLimit -= sleepSeconds, retryCount++) {
try {
switch (type) {
case WRITE:
lockEntry.writeLock().release();
break;
case READ:
lockEntry.readLock().release();
break;
}
break;
}
catch (KeeperException.ConnectionLossException ex) {
LOG.warn("Could not release lock: " + ex.getMessage() + ". Retry will be after " + sleepSeconds + " seconds",
ex);
Thread.sleep(TimeUnit.SECONDS.toMillis(sleepSeconds));
LOG.info("Retrying to release lock. Retry number=" + retryCount);
}
}
}
}
@VisibleForTesting
public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){
return zkLocks;
}
private static ScheduledExecutorService getExecutorService() {
return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS),
"ZKLocksChildReaper");
}
}