blob: 32d65cbf8d7e698c75a3a9e004459686299bedc8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
/**
* Distributed write permission lock, using ZooKeeper. Read the version number
* and return the current inprogress node path available in CurrentInprogress
* path. If it exist, caller can treat that some other client already operating
* on it. Then caller can take action. If there is no inprogress node exist,
* then caller can treat that there is no client operating on it. Later same
* caller should update the his newly created inprogress node path. At this
* point, if some other activities done on this node, version number might
* change, so update will fail. So, this read, update api will ensure that there
* is only node can continue further after checking with CurrentInprogress.
*/
class CurrentInprogress {
static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
private final ZooKeeper zkc;
private final String currentInprogressNode;
private volatile int versionNumberForPermission = -1;
private final String hostName = InetAddress.getLocalHost().toString();
CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
this.currentInprogressNode = lockpath;
this.zkc = zkc;
}
void init() throws IOException {
try {
Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
false);
if (isCurrentInprogressNodeExists == null) {
try {
zkc.create(currentInprogressNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Node might created by other process at the same time. Ignore it.
if (LOG.isDebugEnabled()) {
LOG.debug(currentInprogressNode + " already created by other process.",
e);
}
}
}
} catch (KeeperException e) {
throw new IOException("Exception accessing Zookeeper", e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted accessing Zookeeper", ie);
}
}
/**
* Update the path with prepending version number and hostname
*
* @param path
* - to be updated in zookeeper
* @throws IOException
*/
void update(String path) throws IOException {
CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
builder.setPath(path).setHostname(hostName);
String content = TextFormat.printToString(builder.build());
try {
zkc.setData(this.currentInprogressNode, content.getBytes(UTF_8),
this.versionNumberForPermission);
} catch (KeeperException e) {
throw new IOException("Exception when setting the data "
+ "[" + content + "] to CurrentInprogress. ", e);
} catch (InterruptedException e) {
throw new IOException("Interrupted while setting the data "
+ "[" + content + "] to CurrentInprogress", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updated data[" + content + "] to CurrentInprogress");
}
}
/**
* Read the CurrentInprogress node data from Zookeeper and also get the znode
* version number. Return the 3rd field from the data. i.e saved path with
* #update api
*
* @return available inprogress node path. returns null if not available.
* @throws IOException
*/
String read() throws IOException {
Stat stat = new Stat();
byte[] data = null;
try {
data = zkc.getData(this.currentInprogressNode, false, stat);
} catch (KeeperException e) {
throw new IOException("Exception while reading the data from "
+ currentInprogressNode, e);
} catch (InterruptedException e) {
throw new IOException("Interrupted while reading data from "
+ currentInprogressNode, e);
}
this.versionNumberForPermission = stat.getVersion();
if (data != null) {
CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
TextFormat.merge(new String(data, UTF_8), builder);
if (!builder.isInitialized()) {
throw new IOException("Invalid/Incomplete data in znode");
}
return builder.build().getPath();
} else {
LOG.debug("No data available in CurrentInprogress");
}
return null;
}
/** Clear the CurrentInprogress node data */
void clear() throws IOException {
try {
zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
} catch (KeeperException e) {
throw new IOException(
"Exception when setting the data to CurrentInprogress node", e);
} catch (InterruptedException e) {
throw new IOException(
"Interrupted when setting the data to CurrentInprogress node", e);
}
LOG.debug("Cleared the data from CurrentInprogress");
}
}