blob: 1a2e916bcff293420c400d1a24a8cfd34a12f640 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.blur.manager.indexserver;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.utils.ThreadValue;
import org.apache.blur.zookeeper.ZkUtils;
import org.apache.blur.zookeeper.ZooKeeperLockManager;
import org.apache.blur.zookeeper.ZookeeperPathConstants;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * {@link DistributedLayoutFactory} that keeps each table's shard-to-server
 * layout in ZooKeeper and recalculates it when the shard list or the list of
 * online shard servers no longer matches the stored layout.
 *
 * <p>
 * Layouts are written as sequential nodes named {@code <table>_<suffix>}
 * underneath the table path; the node whose name sorts highest is treated as
 * the current layout and the remaining ones are deleted after every write.
 * Recalculation runs while holding the {@link ZooKeeperLockManager} lock named
 * after the table.
 */
public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFactory {

  /** Separates the table name from the suffix in a layout node name. */
  private static final String SEP = "_";
  private static final Log LOG = LogFactory.getLog(MasterBasedDistributedLayoutFactory.class);

  /** Last layout handed out by {@link #createDistributedLayout}, keyed by table. */
  private final ConcurrentMap<String, MasterBasedDistributedLayout> _cachedLayoutMap = new ConcurrentHashMap<String, MasterBasedDistributedLayout>();
  private final ZooKeeper _zooKeeper;
  private final String _storagePath;
  private final ZooKeeperLockManager _zooKeeperLockManager;
  private final String _locksStoragePath;
  // Source of randomness passed to the leveler.
  // NOTE(review): ThreadValue presumably scopes the Random per thread - confirm.
  private final ThreadValue<Random> _random = new ThreadValue<Random>() {
    @Override
    protected Random initialValue() {
      return new Random();
    }
  };
  private final String _cluster;

  /**
   * Creates the factory and passes the cluster's layout path and layout lock
   * path to {@link ZkUtils#mkNodesStr} before building the lock manager on the
   * lock path.
   *
   * @param zooKeeper
   *          the client used for all reads, writes and locking.
   * @param cluster
   *          the cluster name used to derive every ZooKeeper path.
   */
  public MasterBasedDistributedLayoutFactory(ZooKeeper zooKeeper, String cluster) {
    _zooKeeper = zooKeeper;
    _cluster = cluster;
    _storagePath = ZookeeperPathConstants.getShardLayoutPath(cluster);
    _locksStoragePath = ZookeeperPathConstants.getShardLayoutPathLocks(cluster);
    ZkUtils.mkNodesStr(_zooKeeper, _storagePath);
    ZkUtils.mkNodesStr(_zooKeeper, _locksStoragePath);
    _zooKeeperLockManager = new ZooKeeperLockManager(_zooKeeper, _locksStoragePath);
  }

  /**
   * Reads the layout currently stored in ZooKeeper for the table without
   * taking the table lock and without touching the local cache.
   *
   * @param table
   *          the table name.
   * @return the stored layout, or {@code null} when no layout node is found,
   *         the node no longer exists, or the node holds no data.
   * @throws RuntimeException
   *           wrapping any exception raised while reading.
   */
  @Override
  public DistributedLayout readCurrentLayout(String table) {
    LOG.info("Checking for existing layout for table [{0}]", table);
    try {
      String existingStoragePath = findExistingStoragePath(table);
      if (existingStoragePath == null) {
        // The message has a single placeholder; the path is always null here,
        // so it is not passed along.
        LOG.info("Existing storage path NOT FOUND for table [{0}]", table);
        return null;
      }
      Stat stat = _zooKeeper.exists(existingStoragePath, false);
      if (stat == null) {
        LOG.info("Existing storage path NOT FOUND for table [{0}] path [{1}]", table, existingStoragePath);
        return null;
      }
      LOG.info("Existing storage path for table [{0}] is [{1}]", table, existingStoragePath);
      LOG.info("Existing layout found for table [{0}]", table);
      byte[] data = _zooKeeper.getData(existingStoragePath, false, stat);
      if (data == null) {
        return null;
      }
      return fromBytes(data);
    } catch (Exception e) {
      LOG.error("Unknown error during layout read.", e);
      if (e instanceof InterruptedException) {
        // Keep the interrupt visible to the caller; wrapping alone would hide it.
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns the layout for the table, reusing the locally cached layout while
   * it still matches the given shards and servers, and otherwise obtaining a
   * layout through {@link #newLayout} and caching it.
   *
   * @param table
   *          the table name.
   * @param shardList
   *          the shards of the table.
   * @param onlineShardServerList
   *          the shard servers that are currently online.
   * @return the layout mapping shards to servers.
   */
  @Override
  public DistributedLayout createDistributedLayout(String table, List<String> shardList, List<String> onlineShardServerList) {
    LOG.info("Creating layout for table [{0}]", table);
    MasterBasedDistributedLayout layout = _cachedLayoutMap.get(table);
    if (layout == null || layout.isOutOfDate(shardList, onlineShardServerList)) {
      LOG.info("Layout out of date, recalculating for table [{0}].", table);
      MasterBasedDistributedLayout newLayout = newLayout(table, shardList, onlineShardServerList);
      _cachedLayoutMap.put(table, newLayout);
      return newLayout;
    } else {
      LOG.info("Layout for table [{0}] is up to date.", table);
      return layout;
    }
  }

  /**
   * Under the table lock, returns the layout stored in ZooKeeper when it is
   * still current; otherwise calculates a new layout (seeded with the stored
   * one, if any), writes it as a new sequential node and removes the older
   * layout nodes.
   *
   * @throws LayoutMissingException
   *           rethrown unchanged.
   * @throws RuntimeException
   *           wrapping any other exception raised during the update.
   */
  private MasterBasedDistributedLayout newLayout(String table, List<String> shardList,
      List<String> onlineShardServerList) {
    // The interrupt status is restored only after the unlock attempt, so the
    // unlock call itself does not start with the flag already set.
    boolean interrupted = false;
    try {
      // NOTE(review): unlock(table) in the finally block also runs when
      // lock(table) itself fails - confirm ZooKeeperLockManager tolerates that.
      _zooKeeperLockManager.lock(table);
      LOG.info("Checking for existing layout for table [{0}]", table);
      String existingStoragePath = findExistingStoragePath(table);
      Stat stat;
      if (existingStoragePath == null) {
        stat = null;
      } else {
        stat = _zooKeeper.exists(existingStoragePath, false);
      }
      MasterBasedDistributedLayout existingLayout = null;
      if (stat != null) {
        LOG.info("Existing layout found for table [{0}]", table);
        byte[] data = _zooKeeper.getData(existingStoragePath, false, stat);
        if (data != null) {
          MasterBasedDistributedLayout storedLayout = fromBytes(data);
          LOG.info("Checking if layout is out of date for table [{0}]", table);
          if (!storedLayout.isOutOfDate(shardList, onlineShardServerList)) {
            LOG.info("Layout is up-to-date for table [{0}]", table);
            return storedLayout;
          }
          // The stored layout is stale; use it as the starting point so shards
          // on servers that are still online can stay where they are.
          existingLayout = storedLayout;
        }
      }
      LOG.info("Calculating new layout for table [{0}]", table);
      Map<String, String> newCalculatedLayout = calculateNewLayout(table, existingLayout, shardList,
          onlineShardServerList);
      MasterBasedDistributedLayout layout = new MasterBasedDistributedLayout(newCalculatedLayout, shardList,
          onlineShardServerList);
      LOG.info("New layout created for table [{0}]", table);
      String newPath = _zooKeeper.create(getStoragePath(table) + SEP, toBytes(layout), Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT_SEQUENTIAL);
      cleanupOldTableLayouts(table, newPath);
      return layout;
    } catch (LayoutMissingException e) {
      throw e;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        interrupted = true;
      }
      LOG.error("Unknown error during layout update.", e);
      throw new RuntimeException(e);
    } finally {
      try {
        _zooKeeperLockManager.unlock(table);
      } catch (InterruptedException e) {
        interrupted = true;
        LOG.error("Unknown error during unlock.", e);
      } catch (KeeperException e) {
        LOG.error("Unknown error during unlock.", e);
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  /**
   * Deletes every layout node of the table except {@code newPath}.
   *
   * @param table
   *          the table name.
   * @param newPath
   *          the full path of the layout node to keep.
   */
  private void cleanupOldTableLayouts(String table, String newPath) throws KeeperException, InterruptedException {
    String tableStoragePath = ZookeeperPathConstants.getTablePath(_cluster, table);
    List<String> children = new ArrayList<String>(_zooKeeper.getChildren(tableStoragePath, false));
    for (String child : children) {
      int index = child.lastIndexOf(SEP);
      if (index >= 0) {
        // Only children named "<table>_<suffix>" are layout nodes.
        if (child.substring(0, index).equals(table)) {
          String oldPath = tableStoragePath + "/" + child;
          if (!oldPath.equals(newPath)) {
            LOG.info("Cleaning up old layouts for table [{0}]", table);
            // Version -1 deletes regardless of the node's current version.
            _zooKeeper.delete(oldPath, -1);
          }
        }
      }
    }
  }

  /**
   * Finds the layout node of the table whose name sorts highest. The table
   * path is passed to {@link ZkUtils#mkNodesStr} first, so this method may
   * write to ZooKeeper even though it is used on read paths.
   *
   * @param table
   *          the table name.
   * @return the full path of that layout node, or {@code null} when the table
   *         path has no child named {@code <table>_<suffix>}.
   */
  private String findExistingStoragePath(String table) throws KeeperException, InterruptedException {
    String tableStoragePath = ZookeeperPathConstants.getTablePath(_cluster, table);
    ZkUtils.mkNodesStr(_zooKeeper, tableStoragePath);
    List<String> children = new ArrayList<String>(_zooKeeper.getChildren(tableStoragePath, false));
    String path = null;
    for (String child : children) {
      int index = child.lastIndexOf(SEP);
      if (index >= 0) {
        if (child.substring(0, index).equals(table)) {
          // Plain string comparison; it picks the newest node as long as the
          // sequential suffixes have equal width.
          if (path == null || child.compareTo(path) > 0) {
            path = child;
          }
        }
      }
    }
    if (path == null) {
      return null;
    }
    return tableStoragePath + "/" + path;
  }

  /**
   * Calculates the shard-to-server map for the table.
   *
   * <p>
   * Without an existing layout the shards are dealt out round robin over the
   * online servers in sorted order. With an existing layout, shards on servers
   * that are still online stay put, shards whose server is gone and shards the
   * existing layout does not mention are given to the server with the fewest
   * shards, and the result is then passed to {@link MasterBasedLeveler}.
   *
   * @param table
   *          the table name, used for logging and by the leveler.
   * @param existingLayout
   *          the previous layout, or {@code null} when there is none.
   * @param shardList
   *          the shards of the table.
   * @param onlineShardServerList
   *          the shard servers that are currently online.
   * @return the new shard-to-server map.
   * @throws RuntimeException
   *           when no server is online.
   */
  private Map<String, String> calculateNewLayout(String table, MasterBasedDistributedLayout existingLayout,
      List<String> shardList, List<String> onlineShardServerList) {
    Set<String> shardServerSet = new TreeSet<String>(onlineShardServerList);
    if (shardServerSet.isEmpty()) {
      throw new RuntimeException("No online servers.");
    }
    if (existingLayout == null) {
      // blind setup, basic round robin
      LOG.info("Blind shard layout.");
      Map<String, String> newLayoutMap = new TreeMap<String, String>();
      Iterator<String> iterator = shardServerSet.iterator();
      for (String shard : shardList) {
        if (!iterator.hasNext()) {
          iterator = shardServerSet.iterator();
        }
        String server = iterator.next();
        newLayoutMap.put(shard, server);
      }
      return newLayoutMap;
    } else {
      LOG.info("Gather counts for table [{0}]", table);
      final Collection<String> shardsThatAreOffline = new TreeSet<String>();
      final Map<String, Integer> onlineServerShardCount = new TreeMap<String, Integer>();
      final Map<String, String> existingLayoutMap = existingLayout.getLayout();
      for (Entry<String, String> e : existingLayoutMap.entrySet()) {
        String shard = e.getKey();
        String server = e.getValue();
        if (!shardServerSet.contains(server)) {
          shardsThatAreOffline.add(shard);
        } else {
          increment(onlineServerShardCount, server);
        }
      }
      // Shards the existing layout has never placed need a server as well;
      // otherwise they would be absent from the new layout altogether.
      for (String shard : shardList) {
        if (!existingLayoutMap.containsKey(shard)) {
          shardsThatAreOffline.add(shard);
        }
      }
      LOG.info("Existing layout counts for table [{0}] are [{1}] and offline shards are [{2}]", table,
          onlineServerShardCount, shardsThatAreOffline);
      LOG.info("Adding in new shard servers for table [{0}] current shard servers are [{1}]", table, shardServerSet);
      // Add counts for new shard servers
      for (String server : shardServerSet) {
        if (!onlineServerShardCount.containsKey(server)) {
          LOG.info("New shard server found [{0}]", server);
          onlineServerShardCount.put(server, 0);
        }
      }
      LOG.info("Assigning any missing shards [{1}] for table [{0}]", table, shardsThatAreOffline);
      // Assign missing shards
      final Map<String, String> newLayoutMap = new TreeMap<String, String>(existingLayoutMap);
      for (String offlineShard : shardsThatAreOffline) {
        // Find lowest shard count.
        String server = getServerWithTheLowest(onlineServerShardCount);
        LOG.info("Moving shard [{0}] to new server [{1}]", offlineShard, server);
        newLayoutMap.put(offlineShard, server);
        increment(onlineServerShardCount, server);
      }
      LOG.info("Leveling any shard hotspots for table [{0}] for layout [{1}]", table, newLayoutMap);
      // Level shards
      MasterBasedLeveler.level(shardList.size(), shardServerSet.size(), onlineServerShardCount, newLayoutMap, table,
          _random.get());
      return newLayoutMap;
    }
  }

  /** Adds one to the count stored under {@code k}, starting at 1 when absent. */
  private static <K> void increment(Map<K, Integer> map, K k) {
    Integer count = map.get(k);
    if (count == null) {
      map.put(k, 1);
    } else {
      map.put(k, count + 1);
    }
  }

  /**
   * Returns the server with the smallest count; on a tie the first one in the
   * map's iteration order wins. Returns {@code null} for an empty map.
   */
  private String getServerWithTheLowest(Map<String, Integer> onlineServerShardCount) {
    String server = null;
    int count = Integer.MAX_VALUE;
    for (Entry<String, Integer> e : onlineServerShardCount.entrySet()) {
      if (server == null || count > e.getValue()) {
        server = e.getKey();
        count = e.getValue();
      }
    }
    return server;
  }

  /** Serializes the layout with Java object serialization. */
  private byte[] toBytes(MasterBasedDistributedLayout layout) throws IOException {
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
    objectOutputStream.writeObject(layout);
    objectOutputStream.close();
    return byteArrayOutputStream.toByteArray();
  }

  /**
   * Deserializes a layout written by {@link #toBytes}.
   *
   * @throws IOException
   *           when the bytes cannot be read, including when a class in the
   *           stream cannot be found.
   */
  private MasterBasedDistributedLayout fromBytes(byte[] data) throws IOException {
    // NOTE(review): native Java deserialization of whatever the ZooKeeper node
    // holds; anyone able to write the node controls the stream (the node is
    // created with OPEN_ACL_UNSAFE).
    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
    ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
    try {
      return (MasterBasedDistributedLayout) objectInputStream.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      objectInputStream.close();
    }
  }

  /** Returns the layout node path prefix, {@code <table path>/<table>}. */
  private String getStoragePath(String table) {
    String tableStoragePath = ZookeeperPathConstants.getTablePath(_cluster, table);
    return tableStoragePath + "/" + table;
  }

  /**
   * A shard-to-server map together with the shard and server sets it was
   * calculated for.
   *
   * <p>
   * NOTE: instances are stored in ZooKeeper via Java serialization and no
   * {@code serialVersionUID} is declared, so the id is derived from this
   * class's fields, constructor and non-private methods. Changing any of them
   * can make layouts that are already stored unreadable.
   */
  @SuppressWarnings("serial")
  static class MasterBasedDistributedLayout implements DistributedLayout, Serializable {
    private final SortedSet<String> _shardList;
    private final SortedSet<String> _onlineShardServerList;
    private final Map<String, String> _layout;

    /**
     * @param layout
     *          the shard-to-server map; stored as given, not copied.
     * @param shardList
     *          the shards the layout was calculated for; copied into a sorted
     *          set.
     * @param onlineShardServerList
     *          the servers the layout was calculated for; copied into a sorted
     *          set.
     */
    public MasterBasedDistributedLayout(Map<String, String> layout, Collection<String> shardList,
        Collection<String> onlineShardServerList) {
      _shardList = new TreeSet<String>(shardList);
      _onlineShardServerList = new TreeSet<String>(onlineShardServerList);
      _layout = layout;
    }

    /** Returns the shard-to-server map held by this layout (not a copy). */
    @Override
    public Map<String, String> getLayout() {
      return _layout;
    }

    /**
     * Tells whether this layout was calculated for a different set of shards
     * or servers; order and duplicates in the given lists are ignored.
     */
    public boolean isOutOfDate(List<String> shardList, List<String> onlineShardServerList) {
      if (!_onlineShardServerList.equals(new TreeSet<String>(onlineShardServerList))) {
        return true;
      } else if (!_shardList.equals(new TreeSet<String>(shardList))) {
        return true;
      }
      return false;
    }
  }

  /** Returns the live map of cached layouts keyed by table name (not a copy). */
  @Override
  public Map<String, ?> getLayoutCache() {
    return _cachedLayoutMap;
  }
}