| package org.apache.solr.cloud; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZkCmdExecutor; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.List; |
| |
| /** |
| * A distributed map. |
| * This supports basic map functions e.g. get, put, contains for interaction with zk which |
| * don't have to be ordered i.e. DistributedQueue. |
| */ |
| public class DistributedMap { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(DistributedMap.class); |
| |
| private static long DEFAULT_TIMEOUT = 5*60*1000; |
| |
| private final String dir; |
| |
| private SolrZkClient zookeeper; |
| |
| private final String prefix = "mn-"; |
| |
| private final String response_prefix = "mnr-" ; |
| |
| public DistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl) { |
| this.dir = dir; |
| |
| ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout()); |
| try { |
| cmdExecutor.ensureExists(dir, zookeeper); |
| } catch (KeeperException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new SolrException(ErrorCode.SERVER_ERROR, e); |
| } |
| |
| this.zookeeper = zookeeper; |
| } |
| |
| private class LatchChildWatcher implements Watcher { |
| |
| Object lock = new Object(); |
| private WatchedEvent event = null; |
| |
| public LatchChildWatcher() {} |
| |
| public LatchChildWatcher(Object lock) { |
| this.lock = lock; |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: " |
| + event.getState() + " type " + event.getType()); |
| synchronized (lock) { |
| this.event = event; |
| lock.notifyAll(); |
| } |
| } |
| |
| public void await(long timeout) throws InterruptedException { |
| synchronized (lock) { |
| lock.wait(timeout); |
| } |
| } |
| |
| public WatchedEvent getWatchedEvent() { |
| return event; |
| } |
| } |
| |
| /** |
| * Inserts data into zookeeper. |
| * |
| * @return true if data was successfully added |
| */ |
| private String createData(String path, byte[] data, CreateMode mode) |
| throws KeeperException, InterruptedException { |
| for (;;) { |
| try { |
| return zookeeper.create(path, data, mode, true); |
| } catch (KeeperException.NoNodeException e) { |
| try { |
| zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true); |
| } catch (KeeperException.NodeExistsException ne) { |
| // someone created it |
| } |
| } |
| } |
| } |
| |
| |
| public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException { |
| return createData(dir + "/" + prefix + trackingId, data, |
| CreateMode.PERSISTENT) != null; |
| } |
| |
| /** |
| * Offer the data and wait for the response |
| * |
| */ |
| public MapEvent put(String trackingId, byte[] data, long timeout) throws KeeperException, |
| InterruptedException { |
| String path = createData(dir + "/" + prefix + trackingId, data, |
| CreateMode.PERSISTENT); |
| String watchID = createData( |
| dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1), |
| null, CreateMode.EPHEMERAL); |
| Object lock = new Object(); |
| LatchChildWatcher watcher = new LatchChildWatcher(lock); |
| synchronized (lock) { |
| if (zookeeper.exists(watchID, watcher, true) != null) { |
| watcher.await(timeout); |
| } |
| } |
| byte[] bytes = zookeeper.getData(watchID, null, null, true); |
| zookeeper.delete(watchID, -1, true); |
| return new MapEvent(watchID, bytes, watcher.getWatchedEvent()); |
| } |
| |
| public MapEvent get(String trackingId) throws KeeperException, InterruptedException { |
| return new MapEvent(trackingId, zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true), null); |
| } |
| |
| public boolean contains(String trackingId) throws KeeperException, InterruptedException { |
| return zookeeper.exists(dir + "/" + prefix + trackingId, true); |
| } |
| |
| public int size() throws KeeperException, InterruptedException { |
| Stat stat = new Stat(); |
| zookeeper.getData(dir, null, stat, true); |
| return stat.getNumChildren(); |
| } |
| |
| public void remove(String trackingId) throws KeeperException, InterruptedException { |
| zookeeper.delete(dir + "/" + prefix + trackingId, -1, true); |
| } |
| |
| /** |
| * Helper method to clear all child nodes for a parent node. |
| */ |
| public void clear() throws KeeperException, InterruptedException { |
| List<String> childNames = zookeeper.getChildren(dir, null, true); |
| for(String childName: childNames) { |
| zookeeper.delete(dir + "/" + childName, -1, true); |
| } |
| |
| } |
| |
| public static class MapEvent { |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + ((id == null) ? 0 : id.hashCode()); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (obj == null) return false; |
| if (getClass() != obj.getClass()) return false; |
| MapEvent other = (MapEvent) obj; |
| if (id == null) { |
| if (other.id != null) return false; |
| } else if (!id.equals(other.id)) return false; |
| return true; |
| } |
| |
| private WatchedEvent event = null; |
| private String id; |
| private byte[] bytes; |
| |
| MapEvent(String id, byte[] bytes, WatchedEvent event) { |
| this.id = id; |
| this.bytes = bytes; |
| this.event = event; |
| } |
| |
| public void setId(String id) { |
| this.id = id; |
| } |
| |
| public String getId() { |
| return id; |
| } |
| |
| public void setBytes(byte[] bytes) { |
| this.bytes = bytes; |
| } |
| |
| public byte[] getBytes() { |
| return bytes; |
| } |
| |
| public WatchedEvent getWatchedEvent() { |
| return event; |
| } |
| |
| } |
| |
| |
| } |