blob: d25cbd2bd491f7078203d644d921be30beaaeb41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.conf.store.impl;
import static java.util.Objects.requireNonNullElseGet;
import java.io.IOException;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.PropCache;
import org.apache.accumulo.server.conf.store.PropChangeListener;
import org.apache.accumulo.server.conf.store.PropStore;
import org.apache.accumulo.server.conf.store.PropStoreKey;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.apache.accumulo.server.conf.util.ConfigTransformer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Ticker;
public class ZooPropStore implements PropStore, PropChangeListener {
private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class);
private final static VersionedPropCodec codec = VersionedPropCodec.getDefault();
private final ZooReaderWriter zrw;
private final PropStoreWatcher propStoreWatcher;
private final PropCacheCaffeineImpl cache;
private final ReadyMonitor zkReadyMon;
/**
* Create instance using ZooPropStore.Builder
*
* @param instanceId the instance id
* @param zrw a wrapper set of utilities for accessing ZooKeeper.
*/
private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw) {
this(instanceId, zrw, null, null, null);
}
/**
* For testing create an instance with the optionally pass synthetic clock (Ticker), a
* ReadyMonitor and a PropStore watcher allowing them to be mocked. If the optional components are
* passed as null an internal instance is created.
*
* @param instanceId the instance id
* @param zrw a wrapper set of utilities for accessing ZooKeeper.
* @param monitor a ready monitor. Optional, if null, one is created.
* @param watcher a watcher. Optional, if null, one is created.
* @param ticker a synthetic clock used for testing. Optional, if null, one is created.
*/
ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, final ReadyMonitor monitor,
final PropStoreWatcher watcher, final Ticker ticker) {
this.zrw = zrw;
this.zkReadyMon = requireNonNullElseGet(monitor,
() -> new ReadyMonitor("prop-store", Math.round(zrw.getSessionTimeout() * 1.75)));
this.propStoreWatcher = requireNonNullElseGet(watcher, () -> new PropStoreWatcher(zkReadyMon));
ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, this.propStoreWatcher);
if (ticker == null) {
this.cache = new PropCacheCaffeineImpl.Builder(propLoader).build();
} else {
this.cache = new PropCacheCaffeineImpl.Builder(propLoader).forTests(ticker).build();
}
try {
var path = ZooUtil.getRoot(instanceId);
if (zrw.exists(path, propStoreWatcher)) {
log.debug("Have a ZooKeeper connection and found instance node: {}", instanceId);
zkReadyMon.setReady();
} else {
throw new IllegalStateException("Instance may not have been initialized, root node: " + path
+ " does not exist in ZooKeeper");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(
"Interrupted trying to read root node " + instanceId + " from ZooKeeper", ex);
} catch (KeeperException ex) {
throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper",
ex);
}
}
public static ZooPropStore initialize(@NonNull final InstanceId instanceId,
@NonNull final ZooReaderWriter zrw) {
return new ZooPropStore(instanceId, zrw);
}
@Override
public boolean exists(final PropStoreKey<?> propStoreKey) {
try {
if (zrw.exists(propStoreKey.getPath())) {
return true;
}
} catch (KeeperException ex) {
// ignore Keeper exception on check.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted testing if node exists", ex);
}
return false;
}
@Override
public void create(PropStoreKey<?> propStoreKey, Map<String,String> props) {
try {
VersionedProperties vProps = new VersionedProperties(props);
String path = propStoreKey.getPath();
zrw.putPrivatePersistentData(path, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL);
} catch (IOException | KeeperException | InterruptedException ex) {
throw new IllegalStateException("Failed to serialize properties for " + propStoreKey, ex);
}
}
/**
* get or create properties from the store. If the property node does not exist in ZooKeeper,
* legacy properties exist, they will be converted to the new storage form and naming convention.
* The legacy properties are deleted once the new node format is written.
*
* @param propStoreKey the prop cache key
* @return The versioned properties.
* @throws IllegalStateException if the updates fails because of an underlying store exception
*/
@Override
public @NonNull VersionedProperties get(final PropStoreKey<?> propStoreKey) {
checkZkConnection(); // if ZK not connected, block, do not just return a cached value.
propStoreWatcher.registerListener(propStoreKey, this);
var props = cache.get(propStoreKey);
if (props != null) {
return props;
}
if (propStoreKey instanceof SystemPropKey) {
return new ConfigTransformer(zrw, codec, propStoreWatcher).transform(propStoreKey,
propStoreKey.getPath(), false);
}
throw new IllegalStateException(
"Invalid request for " + propStoreKey + ", the property node does not exist");
}
/**
* Convenience method for utilities that may not have a PropStore read the encoded properties
* directly from ZooKeeper. This allows utilities access when there is a ZooKeeper, may there may
* not be a full instance running. All exception handling is left to the caller.
*
* @param propStoreKey the prop cache key
* @param watcher a prop store watcher that will receive / handle ZooKeeper events.
* @param zooReader a ZooReader with an authenticated session.
* @return the versioned properties or null if the node does not exist.
* @throws IOException if the underlying data from the ZooKeeper node cannot be decoded.
* @throws KeeperException if a ZooKeeper exception occurs
* @throws InterruptedException if the ZooKeeper read was interrupted.
*/
public static @Nullable VersionedProperties readFromZk(final PropStoreKey<?> propStoreKey,
final PropStoreWatcher watcher, final ZooReader zooReader)
throws IOException, KeeperException, InterruptedException {
try {
Stat stat = new Stat();
byte[] bytes = zooReader.getData(propStoreKey.getPath(), watcher, stat);
if (stat.getDataLength() == 0) {
// node exists - but is empty - no props have been stored on node.
return null;
}
return codec.fromBytes(stat.getVersion(), bytes);
} catch (KeeperException.NoNodeException ex) {
// ignore no node - allow other exceptions to propagate
return null;
}
}
/**
* Copies all mappings from the specified map and into the existing property values and stores
* them into the backend store. New keys are added and keys the may have existed in the current
* properties are overwritten.
* <p>
* If multiple threads attempt to update values concurrently, this method will automatically
* retry. If the threads are setting different keys / values the result will be the sum of the
* changes. If the concurrent threads are attempting to set a value(s) for the same key(s), the
* value(s) will be the set to the values provided by the last thread to complete. The order is
* indeterminate.
*
* @param propStoreKey the prop cache id
* @param props a map of property k,v pairs
* @throws IllegalStateException if the values cannot be written or if an underlying store
* exception occurs.
*/
@Override
public void putAll(@NonNull PropStoreKey<?> propStoreKey, @NonNull Map<String,String> props) {
if (props.isEmpty()) {
return; // no props - noop
}
mutateVersionedProps(propStoreKey, VersionedProperties::addOrUpdate, props);
}
@Override
public void replaceAll(@NonNull PropStoreKey<?> propStoreKey, long version,
@NonNull Map<String,String> props) {
mutateVersionedProps(propStoreKey, VersionedProperties::replaceAll, version, props);
}
@Override
public void removeProperties(@NonNull PropStoreKey<?> propStoreKey,
@NonNull Collection<String> keys) {
if (keys.isEmpty()) {
return; // no keys - noop.
}
mutateVersionedProps(propStoreKey, VersionedProperties::remove, keys);
}
@Override
public void delete(@NonNull PropStoreKey<?> propStoreKey) {
Objects.requireNonNull(propStoreKey, "prop store delete() - Must provide propCacheId");
try {
log.trace("called delete() for: {}", propStoreKey);
final String path = propStoreKey.getPath();
zrw.delete(path);
cache.remove(propStoreKey);
} catch (KeeperException | InterruptedException ex) {
throw new IllegalStateException("Failed to delete properties for propCacheId " + propStoreKey,
ex);
}
}
private <T> void mutateVersionedProps(PropStoreKey<?> propStoreKey,
BiFunction<VersionedProperties,T,VersionedProperties> action, T changes) {
log.trace("mutateVersionedProps called for: {}", propStoreKey);
try {
VersionedProperties vProps = cache.getIfCached(propStoreKey);
if (vProps == null) {
vProps = readPropsFromZk(propStoreKey);
}
for (int attempts = 3; attempts > 0; --attempts) {
VersionedProperties updates = action.apply(vProps, changes);
if (zrw.overwritePersistentData(propStoreKey.getPath(), codec.toBytes(updates),
(int) updates.getDataVersion())) {
return;
}
Thread.sleep(20); // small pause to get thread to yield.
// re-read from zookeeper to ensure the latest version.
vProps = readPropsFromZk(propStoreKey);
}
throw new IllegalStateException(
"failed to remove properties to zooKeeper for " + propStoreKey, null);
} catch (IllegalArgumentException | IOException ex) {
throw new IllegalStateException(
"Codec failed to decode / encode properties for " + propStoreKey, ex);
} catch (InterruptedException | KeeperException ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new IllegalStateException(
"failed to remove properties to zooKeeper for " + propStoreKey, ex);
}
}
private <T> void mutateVersionedProps(PropStoreKey<?> propStoreKey,
BiFunction<VersionedProperties,T,VersionedProperties> action, long existingVersion,
T changes) {
log.trace("mutateVersionedProps called for: {}", propStoreKey);
try {
// Grab the current properties
VersionedProperties vProps = cache.getIfCached(propStoreKey);
if (vProps == null) {
vProps = readPropsFromZk(propStoreKey);
}
// Compare the version of the current properties in the cache/ZK and the passed in version to
// see if the versions match
// If the versions do not match then we want to throw an error. This check here allows us to
// avoid
// an extra call to zookeeper if the versions don't match
if (vProps.getDataVersion() != existingVersion) {
throw new ConcurrentModificationException("Failed to modify properties to zooKeeper for "
+ propStoreKey + ", properties changed since reading.", null);
}
final VersionedProperties updates = action.apply(vProps, changes);
// We checked the version earlier but this could still fail if another update was made
// and hadn't propagated yet to update the cache during the time that updates were applied
// after reading
// Zookeeper will return false if the versions do not match, so we should throw an error to
// let the caller
// know the version supplied is no longer the latest
if (!zrw.overwritePersistentData(propStoreKey.getPath(), codec.toBytes(updates),
(int) updates.getDataVersion())) {
throw new ConcurrentModificationException("Failed to modify properties to zooKeeper for "
+ propStoreKey + ", properties changed since reading.", null);
}
} catch (IllegalArgumentException | IOException ex) {
throw new IllegalStateException(
"Codec failed to decode / encode properties for " + propStoreKey, ex);
} catch (InterruptedException | KeeperException ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new IllegalStateException(
"failed to modify properties to zooKeeper for " + propStoreKey, ex);
}
}
@Override
public void registerAsListener(PropStoreKey<?> propStoreKey, PropChangeListener listener) {
propStoreWatcher.registerListener(propStoreKey, listener);
}
private void checkZkConnection() {
if (zkReadyMon.test()) {
return;
}
cache.removeAll();
// not ready block or propagate error if it times out.
zkReadyMon.isReady();
}
@Override
public void zkChangeEvent(PropStoreKey<?> propStoreKey) {
log.trace("Received change event from ZooKeeper for: {} removed from cache", propStoreKey);
cache.remove(propStoreKey);
}
/**
* NOOP for the prop store - the cache value will reflect the updated value on next read. The
* change is also sent to external listeners of the need to take action, but for the prop store,
* no additional action is required.
*
* @param propStoreKey the prop cache id.
*/
@Override
public void cacheChangeEvent(PropStoreKey<?> propStoreKey) {
log.trace("zkChangeEvent: {}", propStoreKey);
}
@Override
public void deleteEvent(PropStoreKey<?> propStoreKey) {
log.trace("deleteEvent: {}", propStoreKey);
cache.remove(propStoreKey);
}
@Override
public void connectionEvent() {
log.trace("connectionEvent");
cache.removeAll();
}
/**
* Read and decode property node from ZooKeeper.
*
* @param propStoreKey the propStoreKey
* @return the decoded properties.
* @throws KeeperException if a ZooKeeper exception occurs to allow caller to decide on action.
* @throws IOException if the node data cannot be decoded into properties to allow the caller to
* decide on action.
* @throws IllegalStateException if an interrupt occurs. The interrupt status is reasserted and
* usually best to not otherwise try to handle the exception.
*/
private VersionedProperties readPropsFromZk(PropStoreKey<?> propStoreKey)
throws KeeperException, IOException {
try {
Stat stat = new Stat();
byte[] bytes = zrw.getData(propStoreKey.getPath(), stat);
if (stat.getDataLength() == 0) {
return new VersionedProperties();
}
return codec.fromBytes(stat.getVersion(), bytes);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupt received during ZooKeeper read", ex);
}
}
@Override
public PropCache getCache() {
return cache;
}
@Override
public @Nullable VersionedProperties getIfCached(PropStoreKey<?> propStoreKey) {
return cache.getIfCached(propStoreKey);
}
@Override
public boolean validateDataVersion(PropStoreKey<?> storeKey, long expectedVersion) {
try {
Stat stat = zrw.getStatus(storeKey.getPath());
log.trace("data version sync: stat returned: {} for {}", stat, storeKey);
if (stat == null || expectedVersion != stat.getVersion()) {
propStoreWatcher.signalZkChangeEvent(storeKey);
return false;
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
} catch (KeeperException.NoNodeException ex) {
propStoreWatcher.signalZkChangeEvent(storeKey);
return false;
} catch (KeeperException ex) {
log.debug("exception occurred verifying data version for {}", storeKey);
return false;
}
return true;
}
}