blob: deaef1025df4c8c23992ede9e4333b1864904d33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.coord.zk;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A namespace aware Zookeeper client.
 *
 * Every operation is resolved relative to the root path given at construction time, so callers pass
 * paths relative to that root.
 *
 * Reads come in two flavours: eventually consistent reads that are served from a local
 * {@link PathChildrenCache}, and consistent reads that are issued through the {@link CuratorFramework}.
 *
 * Note that an instance of this class holds onto resources that must be released via {@link #close()}.
 */
public class ZookeeperClient implements AutoCloseable {
  private static final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);

  /** Largest payload accepted by {@code put} operations; taken from {@link BinaryInputArchive#maxBuffer}. */
  private static final int MAX_DATA_LENGTH = BinaryInputArchive.maxBuffer;

  private final CuratorFramework curator;
  private final String root;
  private final PathChildrenCache cache;
  private final CreateMode mode;

  /**
   * Creates a client bound to the given root path. The client is not usable until {@link #start()} is called.
   *
   * @param curator curator instance used for all direct Zookeeper calls, must not be null
   * @param root absolute root path (must begin with '/') under which this client operates
   * @param mode create mode applied to every node created by this client, must not be null
   * @throws NullPointerException if curator or mode is null
   * @throws IllegalArgumentException if root is null, empty or not absolute
   */
  public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
    this.curator = Preconditions.checkNotNull(curator, "curator is required");
    Preconditions.checkArgument(!Strings.isNullOrEmpty(root), "root path is required");
    Preconditions.checkArgument(root.charAt(0) == '/', "root path must be absolute");
    this.root = root;
    this.mode = Preconditions.checkNotNull(mode, "mode is required");
    this.cache = new PathChildrenCache(curator, root, true);
  }

  /**
   * Starts the client. This call ensures the creation of the root path.
   *
   * @throws Exception if cache fails to start or root path creation fails.
   * @see #close()
   */
  public void start() throws Exception {
    // ensure root is created
    curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient());
    // build cache at start up, to ensure we get correct results right away
    getCache().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
  }

  /**
   * @return the cache backing eventually consistent reads
   */
  public PathChildrenCache getCache() {
    return cache;
  }

  /**
   * @return the absolute root path this client operates under
   */
  public String getRoot() {
    return root;
  }

  /**
   * @return the create mode applied to nodes created by this client
   */
  public CreateMode getMode() {
    return mode;
  }

  /**
   * Returns true if path exists in the cache, false otherwise.
   * Note that calls to this method are eventually consistent.
   *
   * @param path path to check
   * @return true if path exists, false otherwise
   */
  public boolean hasPath(final String path) {
    return hasPath(path, false, null);
  }

  /**
   * Returns true if path exists, false otherwise.
   * If consistent flag is set to true, the check is made against Zookeeper directly,
   * else the check is done against the local cache.
   *
   * @param path path to check
   * @param consistent whether the check should be consistent
   * @return true if path exists, false otherwise
   */
  public boolean hasPath(final String path, final boolean consistent) {
    return hasPath(path, consistent, null);
  }

  /**
   * Checks if the given path exists.
   * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly.
   * Otherwise, the check is eventually consistent.
   *
   * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
   * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
   * The version holder is left untouched when the path does not exist or when the check is not consistent.
   * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
   *
   * @param path path to check
   * @param consistent whether the check should be consistent
   * @param version version holder, may be null
   * @return true if path exists, false otherwise
   * @throws DrillRuntimeException if the check itself fails
   */
  public boolean hasPath(final String path, final boolean consistent, final DataChangeVersion version) {
    Preconditions.checkNotNull(path, "path is required");
    final String target = PathUtils.join(root, path);
    try {
      if (consistent) {
        final Stat stat = curator.checkExists().forPath(target);
        if (version != null && stat != null) {
          version.setVersion(stat.getVersion());
        }
        return stat != null;
      } else {
        return getCache().getCurrentData(target) != null;
      }
    } catch (final Exception e) {
      throw new DrillRuntimeException("error while checking path on zookeeper", e);
    }
  }

  /**
   * Returns a value corresponding to the given path if path exists in the cache, null otherwise.
   *
   * Note that calls to this method are eventually consistent.
   *
   * @param path target path
   * @return cached data for the path, or null if the path is not in the cache
   */
  public byte[] get(final String path) {
    return get(path, false);
  }

  /**
   * Returns the value corresponding to the given path.
   *
   * If the flag consistent is set, the read is consistent as it is made against Zookeeper directly. Otherwise,
   * the read is eventually consistent.
   *
   * @param path target path
   * @param consistent consistency flag
   * @return data stored at the path; see {@link #get(String, boolean, DataChangeVersion)} for details
   */
  public byte[] get(final String path, final boolean consistent) {
    return get(path, consistent, null);
  }

  /**
   * Returns the value corresponding to the given path.
   *
   * The read is consistent as it is made against Zookeeper directly.
   *
   * Passes version holder to get data change version.
   *
   * @param path target path
   * @param version version holder
   * @return data stored at the path; see {@link #get(String, boolean, DataChangeVersion)} for details
   */
  public byte[] get(final String path, final DataChangeVersion version) {
    return get(path, true, version);
  }

  /**
   * Returns the value corresponding to the given path.
   *
   * If the flag consistent is set, the read is consistent as it is made against Zookeeper directly. Otherwise,
   * the read is eventually consistent and served from the local cache, returning null when the cache has no
   * entry for the path.
   *
   * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
   * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
   * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
   *
   * @param path target path
   * @param consistent consistency check
   * @param version version holder, may be null
   * @return data stored at the path, or null if the read is not consistent and the path is not cached
   * @throws DrillRuntimeException if a consistent read fails for any reason
   *         (NOTE(review): this presumably includes a missing node -- confirm against Curator's getData contract)
   */
  public byte[] get(final String path, final boolean consistent, final DataChangeVersion version) {
    Preconditions.checkNotNull(path, "path is required");
    final String target = PathUtils.join(root, path);
    if (consistent) {
      try {
        if (version != null) {
          // read data and stat in one call so that the reported version matches the returned bytes
          final Stat stat = new Stat();
          final byte[] bytes = curator.getData().storingStatIn(stat).forPath(target);
          version.setVersion(stat.getVersion());
          return bytes;
        }
        return curator.getData().forPath(target);
      } catch (final Exception ex) {
        throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
      }
    }
    final ChildData data = getCache().getCurrentData(target);
    return data == null ? null : data.getData();
  }

  /**
   * Creates the given path without placing any data in.
   *
   * @param path target path
   * @throws DrillRuntimeException if the node cannot be created or the cache cannot be refreshed
   */
  public void create(final String path) {
    Preconditions.checkNotNull(path, "path is required");
    final String target = PathUtils.join(root, path);
    try {
      curator.create().withMode(mode).forPath(target);
      // refresh the cache entry right away so that cache reads see the new node
      getCache().rebuildNode(target);
    } catch (final Exception e) {
      throw new DrillRuntimeException(String.format("unable to create node at %s", target), e);
    }
  }

  /**
   * Puts the given byte sequence into the given path.
   *
   * If path does not exist, this call creates it.
   *
   * @param path target path
   * @param data data to store
   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
   */
  public void put(final String path, final byte[] data) {
    put(path, data, null);
  }

  /**
   * Puts the given byte sequence into the given path.
   * <p>
   * If path does not exist, this call creates it.
   * <p>
   * If version holder is not null and path already exists, passes given version for comparison.
   * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed.
   * If we pass version that doesn't match the actual version of the data,
   * the update will fail {@link org.apache.zookeeper.KeeperException.BadVersionException}.
   * We catch such exception and re-throw it as {@link VersionMismatchException}.
   * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
   *
   * @param path target path
   * @param data data to store
   * @param version version holder, may be null
   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
   * @throws VersionMismatchException if the given version does not match the version of the stored data
   * @throws DrillRuntimeException if the data cannot be stored for any other reason
   */
  public void put(final String path, final byte[] data, DataChangeVersion version) {
    Preconditions.checkNotNull(path, "path is required");
    Preconditions.checkNotNull(data, "data is required");
    checkDataSize(data);
    final String target = PathUtils.join(root, path);
    try {
      // we make a consistent read to ensure this call won't fail upon consecutive calls on the same path
      // before cache is updated
      boolean hasNode = hasPath(path, true);
      if (!hasNode) {
        try {
          curator.create().withMode(mode).forPath(target, data);
        } catch (NodeExistsException e) {
          // Handle race conditions since Drill is distributed and other
          // drillbits may have just created the node. This assumes that we do want to
          // override the new node. Makes sense here, because if the node had existed,
          // we'd have updated it.
          hasNode = true;
        }
      }
      if (hasNode) {
        if (version != null) {
          try {
            curator.setData().withVersion(version.getVersion()).forPath(target, data);
          } catch (final KeeperException.BadVersionException e) {
            throw new VersionMismatchException("Unable to put data. Version mismatch is detected.", version.getVersion(), e);
          }
        } else {
          curator.setData().forPath(target, data);
        }
      }
      getCache().rebuildNode(target);
    } catch (final VersionMismatchException e) {
      // let version conflicts reach the caller untouched instead of wrapping them below
      throw e;
    } catch (final Exception e) {
      logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
        "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH);
      throw new DrillRuntimeException(String.format("unable to put %s", target), e);
    }
  }

  /**
   * Puts the given byte sequence into the given path if path does not exist.
   *
   * @param path target path
   * @param data data to store
   * @return null if path was created, else data stored for the given path
   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
   * @throws DrillRuntimeException if the node can neither be created nor read
   */
  public byte[] putIfAbsent(final String path, final byte[] data) {
    Preconditions.checkNotNull(path, "path is required");
    Preconditions.checkNotNull(data, "data is required");
    checkDataSize(data);
    final String target = PathUtils.join(root, path);
    try {
      try {
        curator.create().withMode(mode).forPath(target, data);
        getCache().rebuildNode(target);
        return null;
      } catch (NodeExistsException ignored) {
        // the node is already there: fall through and return what is currently stored
      }
      return curator.getData().forPath(target);
    } catch (final Exception e) {
      logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
        "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH);
      throw new DrillRuntimeException(String.format("unable to put %s", target), e);
    }
  }

  /**
   * Deletes the given node residing at the given path
   *
   * @param path target path to delete
   * @throws DrillRuntimeException if the node cannot be deleted or the cache cannot be refreshed
   */
  public void delete(final String path) {
    Preconditions.checkNotNull(path, "path is required");
    final String target = PathUtils.join(root, path);
    try {
      curator.delete().forPath(target);
      getCache().rebuildNode(target);
    } catch (final Exception e) {
      throw new DrillRuntimeException(String.format("unable to delete node at %s", target), e);
    }
  }

  /**
   * Returns an iterator of (key, value) pairs residing under {@link #getRoot() root} path.
   * Entries are read from the local cache; keys are paths relative to the root.
   */
  public Iterator<Map.Entry<String, byte[]>> entries() {
    final String prefix = PathUtils.join(root, "/");
    return Iterables.transform(getCache().getCurrentData(), new Function<ChildData, Map.Entry<String, byte[]>>() {
      @Nullable
      @Override
      public Map.Entry<String, byte[]> apply(final ChildData data) {
        // normalize key name removing the root prefix. resultant key must be a relative path, not beginning with a '/'.
        // only a leading prefix is stripped so that a key which happens to contain the prefix text is left intact.
        final String fullPath = data.getPath();
        final String key = fullPath.startsWith(prefix) ? fullPath.substring(prefix.length()) : fullPath;
        return new ImmutableEntry<>(key, data.getData());
      }
    }).iterator();
  }

  /**
   * Closes the local cache. The curator instance passed to the constructor is not closed here.
   *
   * @throws Exception if the cache fails to close
   */
  @Override
  public void close() throws Exception {
    getCache().close();
  }

  /**
   * Rejects payloads larger than {@link #MAX_DATA_LENGTH}.
   *
   * @param data payload about to be written, must not be null
   * @throws IllegalArgumentException if data size is bigger than jute.maxbuffer value
   */
  private static void checkDataSize(final byte[] data) {
    if (data.length > MAX_DATA_LENGTH) {
      throw new IllegalArgumentException(
        String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH)
      );
    }
  }
}