blob: 0a9a57d0313f0448d721b64424f4913336f88d4c [file] [log] [blame]
package org.apache.blur.manager.clusterstatus;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.lucene.search.FairSimilarity;
import org.apache.blur.thirdparty.thrift_0_9_0.TDeserializer;
import org.apache.blur.thirdparty.thrift_0_9_0.TException;
import org.apache.blur.thirdparty.thrift_0_9_0.TSerializer;
import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.BlurUtil;
import org.apache.blur.zookeeper.WatchChildren;
import org.apache.blur.zookeeper.WatchChildren.OnChange;
import org.apache.blur.zookeeper.WatchNodeData;
import org.apache.blur.zookeeper.ZkUtils;
import org.apache.blur.zookeeper.ZooKeeperLockManager;
import org.apache.blur.zookeeper.ZookeeperPathConstants;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZookeeperClusterStatus extends ClusterStatus {
private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatus.class);
private final ZooKeeper _zk;
private final BlurConfiguration _configuration;
private final AtomicBoolean _running = new AtomicBoolean();
private final ConcurrentMap<String, List<String>> _onlineShardsNodes = new ConcurrentHashMap<String, List<String>>();
private final ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String, Set<String>>();
private final AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new HashSet<String>());
private final Map<String, TableDescriptor> _tableDescriptorCache = new ConcurrentHashMap<String, TableDescriptor>();
private final WatchChildren _clusterWatcher;
private final ConcurrentMap<String, WatchChildren> _onlineShardsNodesWatchers = new ConcurrentHashMap<String, WatchChildren>();
private final ConcurrentMap<String, WatchChildren> _tableWatchers = new ConcurrentHashMap<String, WatchChildren>();
private final Map<String, SafeModeCacheEntry> _clusterToSafeMode = new ConcurrentHashMap<String, ZookeeperClusterStatus.SafeModeCacheEntry>();
private final ConcurrentMap<String, WatchNodeData> _enabledWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeData>();
private final Set<Action> _tableStateChange = Collections.newSetFromMap(new ConcurrentHashMap<Action, Boolean>());
public ZookeeperClusterStatus(ZooKeeper zooKeeper, BlurConfiguration configuration) {
_zk = zooKeeper;
_running.set(true);
_clusterWatcher = new WatchChildren(_zk, ZookeeperPathConstants.getClustersPath());
_clusterWatcher.watch(new Clusters());
_configuration = configuration;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public ZookeeperClusterStatus(String connectionStr, BlurConfiguration configuration) throws IOException {
this(new ZooKeeper(connectionStr, 30000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
}), configuration);
}
public ZookeeperClusterStatus(ZooKeeper zooKeeper) throws IOException {
this(zooKeeper, new BlurConfiguration());
}
public ZookeeperClusterStatus(String connectionStr) throws IOException {
this(connectionStr, new BlurConfiguration());
}
class Clusters extends OnChange {
@Override
public void action(List<String> clusters) {
_clusters.set(new HashSet<String>(clusters));
for (String cluster : clusters) {
if (!_tableWatchers.containsKey(cluster)) {
String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
ZkUtils.waitUntilExists(_zk, tablesPath);
WatchChildren clusterWatcher = new WatchChildren(_zk, tablesPath);
clusterWatcher.watch(new Tables(cluster));
_tableWatchers.put(cluster, clusterWatcher);
}
}
List<String> clustersToCloseAndRemove = new ArrayList<String>(clusters);
clustersToCloseAndRemove.removeAll(_tableWatchers.keySet());
for (String cluster : clustersToCloseAndRemove) {
WatchChildren watcher = _tableWatchers.remove(cluster);
if (watcher == null) {
LOG.error("Error watcher is null [" + cluster + "] ");
} else {
watcher.close();
}
}
}
}
class Tables extends OnChange {
private final String _cluster;
private final String _tablesPath;
public Tables(String cluster) {
_cluster = cluster;
_tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
}
@Override
public void action(List<String> tables) {
runActions();
Set<String> newSet = new HashSet<String>(filterTables(_tablesPath, tables));
Set<String> oldSet = _tablesPerCluster.put(_cluster, newSet);
Set<String> newTables = getNewTables(newSet, oldSet);
Set<String> oldTables = getOldTables(newSet, oldSet);
for (String table : oldTables) {
final String clusterTableKey = getClusterTableKey(_cluster, table);
WatchNodeData watch = _enabledWatchNodeExistance.remove(clusterTableKey);
if (watch != null) {
watch.close();
}
_tableDescriptorCache.remove(table);
}
for (String table : newTables) {
final String clusterTableKey = getClusterTableKey(_cluster, table);
WatchNodeData enabledWatcher = new WatchNodeData(_zk, ZookeeperPathConstants.getTablePath(_cluster, table));
enabledWatcher.watch(new WatchNodeData.OnChange() {
@Override
public void action(byte[] data) {
runActions();
_tableDescriptorCache.clear();
}
});
if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null) {
enabledWatcher.close();
}
}
}
private Set<String> getOldTables(Set<String> newSet, Set<String> oldSet) {
Set<String> oldTables = new HashSet<String>();
if (oldSet != null) {
oldTables.addAll(oldSet);
}
if (newSet != null) {
oldTables.removeAll(newSet);
}
return oldTables;
}
private Set<String> getNewTables(Set<String> newSet, Set<String> oldSet) {
Set<String> newTables = new HashSet<String>(newSet);
if (oldSet != null) {
newTables.removeAll(oldSet);
}
return newTables;
}
}
private String getClusterTableKey(String cluster, String table) {
return cluster + "." + table;
}
private void runActions() {
for (Action action : _tableStateChange) {
action.action();
}
}
@Override
public List<String> getClusterList(boolean useCache) {
if (useCache) {
return new ArrayList<String>(_clusters.get());
}
long s = System.nanoTime();
try {
checkIfOpen();
return _zk.getChildren(ZookeeperPathConstants.getClustersPath(), false);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getClusterList [" + (e - s) / 1000000.0 + " ms]");
}
}
private void checkIfOpen() {
if (_running.get()) {
return;
}
throw new RuntimeException("not open");
}
@Override
public List<String> getOnlineControllerList() {
long s = System.nanoTime();
try {
checkIfOpen();
return _zk.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getOnlineControllerList [" + (e - s) / 1000000.0 + " ms]");
}
}
@Override
public List<String> getControllerServerList() {
return getOnlineControllerList();
}
@Override
public List<String> getOnlineShardServers(boolean useCache, String cluster) {
if (useCache) {
List<String> shards = _onlineShardsNodes.get(cluster);
if (shards != null) {
return shards;
} else {
watchForOnlineShardNodes(cluster);
}
}
long s = System.nanoTime();
try {
checkIfOpen();
String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster);
return _zk.getChildren(onlineShardsPath, false);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getOnlineShardServers took [" + (e - s) / 1000000.0 + " ms]");
}
}
private void watchForOnlineShardNodes(final String cluster) {
WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineShardsPath(cluster));
watch.watch(new OnChange() {
@Override
public void action(List<String> children) {
_onlineShardsNodes.put(cluster, children);
}
});
if (_onlineShardsNodesWatchers.putIfAbsent(cluster, watch) != null) {
// There was already a watch created. Close the extra watcher.
watch.close();
}
}
@Override
public List<String> getShardServerList(String cluster) {
long s = System.nanoTime();
try {
checkIfOpen();
String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(cluster);
return _zk.getChildren(registeredShardsPath, false);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getShardServerList took [" + (e - s) / 1000000.0 + " ms]");
}
}
@Override
public boolean exists(boolean useCache, String cluster, String table) {
if (useCache) {
Set<String> tables = _tablesPerCluster.get(cluster);
if (tables != null) {
if (tables.contains(table)) {
return true;
}
}
}
try {
checkIfOpen();
if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
return false;
}
return true;
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isEnabled(boolean useCache, String cluster, String table) {
TableDescriptor tableDescriptor = getTableDescriptor(useCache, cluster, table);
return tableDescriptor.isEnabled();
}
@Override
public TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table) {
if (useCache) {
TableDescriptor tableDescriptor = _tableDescriptorCache.get(table);
if (tableDescriptor != null) {
return tableDescriptor;
}
}
long s = System.nanoTime();
TableDescriptor tableDescriptor = new TableDescriptor();
try {
checkIfOpen();
String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
byte[] bytes = getData(blurTablePath);
if (bytes == null) {
throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] not found.");
}
TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
deserializer.deserialize(tableDescriptor, bytes);
} catch (TException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getTableDescriptor took [" + (e - s) / 1000000.0 + " ms]");
}
tableDescriptor.cluster = cluster;
_tableDescriptorCache.put(table, tableDescriptor);
return tableDescriptor;
}
private byte[] getData(String path) throws KeeperException, InterruptedException {
Stat stat = _zk.exists(path, false);
if (stat == null) {
LOG.debug("Tried to fetch path [{0}] and path is missing", path);
return null;
}
byte[] data = _zk.getData(path, false, stat);
if (data == null) {
LOG.debug("Fetched path [{0}] and data is null", path);
return null;
}
return data;
}
@Override
public List<String> getTableList(boolean useCache, String cluster) {
if (useCache) {
Set<String> tables = _tablesPerCluster.get(cluster);
if (tables != null) {
return new ArrayList<String>(tables);
}
}
long s = System.nanoTime();
try {
checkIfOpen();
String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
return filterTables(tablesPath, _zk.getChildren(tablesPath, false));
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getTableList took [" + (e - s) / 1000000.0 + " ms]");
}
}
private List<String> filterTables(String tablesPath, List<String> tables) {
try {
List<String> result = new ArrayList<String>();
for (String table : tables) {
String path = tablesPath + "/" + table;
Stat stat = _zk.exists(path, false);
if (stat != null) {
byte[] data = _zk.getData(path, false, stat);
if (data == null) {
LOG.info("Empty table node found at [" + path + "]");
_zk.delete(path, -1);
} else {
result.add(table);
}
}
}
return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void close() {
if (_running.get()) {
_running.set(false);
close(_clusterWatcher);
close(_onlineShardsNodesWatchers);
close(_tableWatchers);
close(_enabledWatchNodeExistance);
}
}
private void close(ConcurrentMap<String, ? extends Closeable> closableMap) {
Collection<? extends Closeable> values = closableMap.values();
for (Closeable closeable : values) {
close(closeable);
}
}
private void close(Closeable closeable) {
try {
closeable.close();
} catch (IOException e) {
LOG.error("Unknown error while trying to close [{0}]", closeable);
}
}
@Override
public String getCluster(boolean useCache, String table) {
if (useCache) {
for (Entry<String, Set<String>> entry : _tablesPerCluster.entrySet()) {
if (entry.getValue().contains(table)) {
return entry.getKey();
}
}
}
List<String> clusterList = getClusterList(useCache);
for (String cluster : clusterList) {
long s = System.nanoTime();
try {
checkIfOpen();
Stat stat = _zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false);
if (stat != null) {
// _tableToClusterCache.put(table, cluster);
return cluster;
}
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace getCluster took [" + (e - s) / 1000000.0 + " ms]");
}
}
return null;
}
static class SafeModeCacheEntry {
static final long _10_SECONDS = TimeUnit.SECONDS.toMillis(10);
boolean _safeMode;
long _lastCheck;
public SafeModeCacheEntry(boolean safeMode) {
_lastCheck = System.currentTimeMillis();
_safeMode = safeMode;
}
boolean isValid() {
long now = System.currentTimeMillis();
if (_lastCheck + _10_SECONDS < now) {
return false;
}
return true;
}
}
@Override
public boolean isInSafeMode(boolean useCache, String cluster) {
if (useCache) {
SafeModeCacheEntry safeModeCacheEntry = _clusterToSafeMode.get(cluster);
if (safeModeCacheEntry != null && safeModeCacheEntry.isValid()) {
return safeModeCacheEntry._safeMode;
}
}
try {
checkIfOpen();
String safemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
ZooKeeperLockManager zooKeeperLockManager = new ZooKeeperLockManager(_zk, safemodePath);
if (zooKeeperLockManager.getNumberOfLockNodesPresent(cluster) == 0) {
_clusterToSafeMode.put(cluster, new SafeModeCacheEntry(false));
return false;
}
_clusterToSafeMode.put(cluster, new SafeModeCacheEntry(true));
return true;
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isReadOnly(boolean useCache, String cluster, String table) {
TableDescriptor tableDescriptor = getTableDescriptor(useCache, cluster, table);
return tableDescriptor.isReadOnly();
}
@Override
public void createTable(TableDescriptor tableDescriptor) {
long s = System.nanoTime();
try {
checkIfOpen();
if (tableDescriptor.getSimilarityClass() == null) {
tableDescriptor.setSimilarityClass(FairSimilarity.class.getName());
}
String table = BlurUtil.nullCheck(tableDescriptor.name, "tableDescriptor.name cannot be null.");
String cluster = BlurUtil.nullCheck(tableDescriptor.cluster, "tableDescriptor.cluster cannot be null.");
assignTableUri(tableDescriptor);
String uri = BlurUtil.nullCheck(tableDescriptor.tableUri, "tableDescriptor.tableUri cannot be null.");
int shardCount = BlurUtil.zeroCheck(tableDescriptor.shardCount,
"tableDescriptor.shardCount cannot be less than 1");
String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
if (_zk.exists(blurTablePath, false) != null) {
throw new IOException("Table [" + table + "] already exists.");
}
BlurUtil.setupFileSystem(uri, shardCount);
byte[] bytes = serializeTableDescriptor(tableDescriptor);
BlurUtil.createPath(_zk, blurTablePath, bytes);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace createTable took [" + (e - s) / 1000000.0 + " ms]");
}
}
private byte[] serializeTableDescriptor(TableDescriptor td) {
try {
TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
return serializer.serialize(td);
} catch (TException e) {
throw new RuntimeException(e);
}
}
private void assignTableUri(TableDescriptor tableDescriptor) {
if (tableDescriptor.getTableUri() != null) {
return;
}
String cluster = tableDescriptor.getCluster();
String defaultTableUriPropertyName = BlurConstants.getDefaultTableUriPropertyName(cluster);
String parentPath = _configuration.get(defaultTableUriPropertyName);
if (parentPath == null) {
return;
}
String tableUri = parentPath + "/" + tableDescriptor.getName();
LOG.info("Setting default table uri for table [{0}] of [{1}]", tableDescriptor.getName(), tableUri);
tableDescriptor.setTableUri(tableUri);
}
@Override
public void disableTable(String cluster, String table) {
try {
checkIfOpen();
String tablePath = ZookeeperPathConstants.getTablePath(cluster, table);
Stat stat = _zk.exists(tablePath, false);
if (stat == null) {
throw new IOException("Table [" + table + "] does not exist.");
}
TableDescriptor tableDescriptor = getTableDescriptor(false, cluster, table);
if (!tableDescriptor.isEnabled()) {
return;
}
tableDescriptor.setEnabled(false);
byte[] bytes = serializeTableDescriptor(tableDescriptor);
_zk.setData(tablePath, bytes, stat.getVersion());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
@Override
public void enableTable(String cluster, String table) {
long s = System.nanoTime();
try {
checkIfOpen();
String tablePath = ZookeeperPathConstants.getTablePath(cluster, table);
Stat stat = _zk.exists(tablePath, false);
if (stat == null) {
throw new IOException("Table [" + table + "] does not exist.");
}
TableDescriptor tableDescriptor = getTableDescriptor(false, cluster, table);
if (tableDescriptor.isEnabled()) {
return;
}
tableDescriptor.setEnabled(true);
byte[] bytes = serializeTableDescriptor(tableDescriptor);
_zk.setData(tablePath, bytes, stat.getVersion());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace enableTable took [" + (e - s) / 1000000.0 + " ms]");
}
}
@Override
public void removeTable(String cluster, String table, boolean deleteIndexFiles) {
long s = System.nanoTime();
try {
checkIfOpen();
TableDescriptor tableDescriptor = getTableDescriptor(true, cluster, table);
String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
if (_zk.exists(blurTablePath, false) == null) {
throw new IOException("Table [" + table + "] does not exist.");
}
if (tableDescriptor.isEnabled()) {
throw new IOException("Table [" + table + "] must be disabled before it can be removed.");
}
String uri = tableDescriptor.getTableUri();
BlurUtil.removeAll(_zk, blurTablePath);
if (deleteIndexFiles) {
BlurUtil.removeIndexFiles(uri);
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
} finally {
long e = System.nanoTime();
LOG.debug("trace removeTable took [" + (e - s) / 1000000.0 + " ms]");
}
}
@Override
public boolean isOpen() {
return _running.get();
}
@Override
public void registerActionOnTableStateChange(Action action) {
_tableStateChange.add(action);
}
}