blob: 844f0f83993d13ad135da44d9ee109ebcdf23fc9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.locks;
import java.util.Collections;
import java.util.List;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
* For Handling the zookeeper locking implementation
*/
public class ZooKeeperLocking extends AbstractCarbonLock {
private static final Logger LOGGER =
LogServiceFactory.getLogService(ZooKeeperLocking.class.getName());
/**
* zk is the zookeeper client instance
*/
private static ZooKeeper zk;
/**
* zooKeeperLocation is the location in the zoo keeper file system where the locks will be
* maintained.
*/
private static final String zooKeeperLocation = CarbonCommonConstants.ZOOKEEPER_LOCATION;
/**
* Unique folder for each table with DatabaseName_TableName
*/
private final String tableIdFolder;
/**
* lockName is the name of the lock to use. This name should be same for every process that want
* to share the same lock
*/
private String lockName;
/**
* lockPath is the unique path created for the each instance of the carbon lock.
*/
private String lockPath;
private String lockTypeFolder;
public ZooKeeperLocking(AbsoluteTableIdentifier absoluteTableIdentifier, String lockFile) {
this(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()
+ CarbonCommonConstants.FILE_SEPARATOR + absoluteTableIdentifier.getCarbonTableIdentifier()
.getTableName(), lockFile);
}
public static void initialize() {
String zooKeeperUrl =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
if (null == zk) {
zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
}
}
/**
* @param lockLocation
* @param lockFile
*/
public ZooKeeperLocking(String lockLocation, String lockFile) {
this.lockName = lockFile;
this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonTablePath.getLockFilesDirPath(lockLocation);
initialize();
this.lockTypeFolder = tableIdFolder + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
try {
createBaseNode();
// if exists returns null then path doesnt exist. so creating.
if (null == zk.exists(this.tableIdFolder, true)) {
createRecursively(this.tableIdFolder);
}
// if exists returns null then path doesnt exist. so creating.
if (null == zk.exists(this.lockTypeFolder, true)) {
zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
initRetry();
}
/**
* Creating a znode in which all the znodes (lock files )are maintained.
*/
private void createBaseNode() throws KeeperException, InterruptedException {
if (null == zk.exists(zooKeeperLocation, true)) {
// creating a znode in which all the znodes (lock files )are maintained.
zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* Create zookeeper node if not exist
* @param path
* @throws KeeperException
* @throws InterruptedException
*/
private void createRecursively(String path) throws KeeperException, InterruptedException {
if (zk.exists(path, true) == null && path.length() > 0) {
String temp = path.substring(0, path.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
createRecursively(temp);
zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* Handling of the locking mechanism using zoo keeper.
*/
@Override
public boolean lock() {
try {
// create the lock file with lockName.
lockPath =
zk.create(this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// get the children present in zooKeeperLocation.
List<String> nodes = zk.getChildren(this.lockTypeFolder, null);
// sort the children
Collections.sort(nodes);
// here the logic is , for each lock request zookeeper will create a file ending with
// incremental digits.
// so first request will be 00001 next is 00002 and so on.
// if the current request is 00002 and already one previous request(00001) is present then get
// children will give both nodes.
// after the sort we are checking if the lock path is first or not .if it is first then lock
// has been acquired.
if (lockPath.endsWith(nodes.get(0))) {
return true;
} else {
// if locking failed then deleting the created lock as next time again new lock file will be
// created.
zk.delete(lockPath, -1);
return false;
}
} catch (KeeperException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
return false;
}
}
/**
* @return status where lock file is unlocked or not.
*/
@Override
public boolean unlock() {
try {
// exists will return null if the path doesn't exists.
if (lockPath != null && null != zk.exists(lockPath, true)) {
zk.delete(lockPath, -1);
lockPath = null;
}
} catch (KeeperException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
return false;
}
return true;
}
}