blob: 67743b2228cb556aa3993bc29045c1226bf17129 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Collections;
import java.util.Comparator;
import java.net.InetAddress;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Distributed lock, using ZooKeeper.
*
* The lock is vulnerable to timing issues. For example, the process could
* encounter a really long GC cycle between acquiring the lock, and writing to
* a ledger. This could have timed out the lock, and another process could have
* acquired the lock and started writing to bookkeeper. Therefore other
* mechanisms are required to ensure correctness (i.e. Fencing).
*/
class WriteLock implements Watcher {
static final Log LOG = LogFactory.getLog(WriteLock.class);
private final ZooKeeper zkc;
private final String lockpath;
private AtomicInteger lockCount = new AtomicInteger(0);
private String myznode = null;
WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
this.lockpath = lockpath;
this.zkc = zkc;
try {
if (zkc.exists(lockpath, false) == null) {
String localString = InetAddress.getLocalHost().toString();
zkc.create(lockpath, localString.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
void acquire() throws IOException {
while (true) {
if (lockCount.get() == 0) {
try {
synchronized(this) {
if (lockCount.get() > 0) {
lockCount.incrementAndGet();
return;
}
myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
if (LOG.isTraceEnabled()) {
LOG.trace("Acquiring lock, trying " + myznode);
}
List<String> nodes = zkc.getChildren(lockpath, false);
Collections.sort(nodes, new Comparator<String>() {
public int compare(String o1,
String o2) {
Integer l1 = Integer.valueOf(o1.replace("lock-", ""));
Integer l2 = Integer.valueOf(o2.replace("lock-", ""));
return l1 - l2;
}
});
if ((lockpath + "/" + nodes.get(0)).equals(myznode)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Lock acquired - " + myznode);
}
lockCount.set(1);
zkc.exists(myznode, this);
return;
} else {
LOG.error("Failed to acquire lock with " + myznode
+ ", " + nodes.get(0) + " already has it");
throw new IOException("Could not acquire lock");
}
}
} catch (KeeperException e) {
throw new IOException("Exception accessing Zookeeper", e);
} catch (InterruptedException ie) {
throw new IOException("Exception accessing Zookeeper", ie);
}
} else {
int ret = lockCount.getAndIncrement();
if (ret == 0) {
lockCount.decrementAndGet();
continue; // try again;
} else {
return;
}
}
}
}
void release() throws IOException {
try {
if (lockCount.decrementAndGet() <= 0) {
if (lockCount.get() < 0) {
LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
+ lockCount.get());
}
synchronized(this) {
if (lockCount.get() <= 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("releasing lock " + myznode);
}
if (myznode != null) {
zkc.delete(myznode, -1);
myznode = null;
}
}
}
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
public void checkWriteLock() throws IOException {
if (!haveLock()) {
throw new IOException("Lost writer lock");
}
}
boolean haveLock() throws IOException {
return lockCount.get() > 0;
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Disconnected
|| event.getState() == KeeperState.Expired) {
LOG.warn("Lost zookeeper session, lost lock ");
lockCount.set(0);
} else {
// reapply the watch
synchronized (this) {
LOG.info("Zookeeper event " + event
+ " received, reapplying watch to " + myznode);
if (myznode != null) {
try {
zkc.exists(myznode, this);
} catch (Exception e) {
LOG.warn("Could not set watch on lock, releasing", e);
try {
release();
} catch (IOException ioe) {
LOG.error("Could not release Zk lock", ioe);
}
}
}
}
}
}
}