| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud; |
| |
| import com.codahale.metrics.Timer; |
| import org.apache.solr.common.AlreadyClosedException; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZkNodeProps; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.TimeOut; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.zookeeper.AddWatchMode; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.Closeable; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues. |
| * Methods specific to this subclass ignore superclass internal state and hit ZK directly. |
| * This is inefficient! But the API on this class is kind of muddy.. |
| */ |
| public class OverseerTaskQueue extends ZkDistributedQueue { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| static final String RESPONSE_PREFIX = "qnr-" ; |
| public static final byte[] BYTES = new byte[0]; |
| |
| private final AtomicBoolean shuttingDown = new AtomicBoolean(false); |
| private final AtomicInteger pendingResponses = new AtomicInteger(0); |
| |
| public OverseerTaskQueue(SolrZkClient zookeeper, String dir) { |
| this(zookeeper, dir, new Stats()); |
| } |
| |
| public OverseerTaskQueue(SolrZkClient zookeeper, String dir, Stats stats) { |
| super(zookeeper, dir, stats); |
| } |
| |
| /** |
| * Returns true if the queue contains a task with the specified async id. |
| */ |
| public boolean containsTaskWithRequestId(String requestIdKey, String requestId) |
| throws KeeperException, InterruptedException { |
| |
| List<String> childNames = zookeeper.getChildren(dir, null, true); |
| stats.setQueueLength(childNames.size()); |
| for (String childName : childNames) { |
| if (childName != null && childName.startsWith(PREFIX)) { |
| try { |
| byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true); |
| if (data != null) { |
| ZkNodeProps message = ZkNodeProps.load(data); |
| if (message.containsKey(requestIdKey)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId); |
| } |
| if(message.get(requestIdKey).equals(requestId)) return true; |
| } |
| } |
| } catch (KeeperException.NoNodeException e) { |
| // Another client removed the node first, try next |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Remove the event and save the response into the other path. |
| */ |
| public void remove(QueueEvent event) throws KeeperException, |
| InterruptedException { |
| Timer.Context time = stats.time(dir + "_remove_event"); |
| try { |
| String path = event.getId(); |
| String responsePath = dir + "/" + RESPONSE_PREFIX |
| + path.substring(path.lastIndexOf("-") + 1); |
| |
| try { |
| zookeeper.setData(responsePath, event.getBytes(), true); |
| } catch (KeeperException.NoNodeException ignored) { |
| // we must handle the race case where the node no longer exists |
| log.info("Response ZK path: {} doesn't exist. Requestor may have disconnected from ZooKeeper", responsePath); |
| } |
| try { |
| zookeeper.delete(path, -1, true); |
| } catch (KeeperException.NoNodeException ignored) { |
| } |
| } finally { |
| time.stop(); |
| } |
| } |
| |
| /** |
| * Watcher that blocks until a WatchedEvent occurs for a znode. |
| */ |
| static final class LatchWatcher implements Watcher, Closeable { |
| |
| private final Lock lock; |
| private final Condition eventReceived; |
| private final String path; |
| private final SolrZkClient zkClient; |
| private volatile WatchedEvent event; |
| |
| LatchWatcher(String path, SolrZkClient zkClient) { |
| this.lock = new ReentrantLock(true); |
| this.eventReceived = lock.newCondition(); |
| this.path = path; |
| this.zkClient = zkClient; |
| } |
| |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, and do not remove the watcher |
| if (Event.EventType.None.equals(event.getType())) { |
| return; |
| } |
| |
| if (log.isDebugEnabled()) log.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState()); |
| |
| Stat stat = null; |
| try { |
| stat = zkClient.exists(path, null, true); |
| } catch (KeeperException e) { |
| log.error("exists failed", e); |
| } catch (InterruptedException e) { |
| log.error("interrupted", e); |
| } |
| if (stat != null && stat.getDataLength() > 0) { |
| lock.lock(); |
| try { |
| this.event = event; |
| eventReceived.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| public void await(long timeoutMs) { |
| |
| if (event != null) { |
| return; |
| } |
| |
| createWatch(); |
| |
| TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); |
| |
| lock.lock(); |
| try { |
| while (!timeout.hasTimedOut() && event == null) { |
| try { |
| eventReceived.await(500, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| |
| } |
| } |
| |
| if (timeout.hasTimedOut()) { |
| log.warn("Timeout waiting for response after {}ms", timeout.timeElapsed(TimeUnit.MILLISECONDS)); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void createWatch() { |
| try { |
| zkClient.addWatch(path, this, AddWatchMode.PERSISTENT); |
| } catch (Exception e) { |
| log.error("could not add watch", e); |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); |
| } |
| } |
| |
| public WatchedEvent getWatchedEvent() { |
| return event; |
| } |
| |
| @Override |
| public void close() { |
| try { |
| zkClient.removeWatches(path, this, WatcherType.Any, true); |
| } catch (KeeperException.NoWatcherException | AlreadyClosedException e) { |
| |
| } catch (Exception e) { |
| log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage()); |
| } |
| } |
| } |
| |
| /** |
| * Inserts data into zookeeper. |
| * |
| * @return true if data was successfully added |
| */ |
| private String createData(String path, byte[] data, CreateMode mode) |
| throws KeeperException, InterruptedException { |
| for (;;) { |
| try { |
| return zookeeper.create(path, data, mode, true); |
| } catch (KeeperException.NodeExistsException e) { |
| log.warn("Found request node already, waiting to see if it frees up ..."); |
| // TODO: use a watch? |
| Thread.sleep(50); |
| try { |
| return zookeeper.create(path, data, mode, true); |
| } catch (KeeperException.NodeExistsException ne) { |
| // someone created it |
| } |
| } |
| } |
| } |
| |
| /** |
| * Offer the data and wait for the response |
| * |
| */ |
| public QueueEvent offer(byte[] data, long timeout) throws KeeperException, |
| InterruptedException { |
| if (log.isDebugEnabled()) log.debug("offer operation to the Overseer queue {}", Utils.fromJSON(data)); |
| |
| // if (shuttingDown.get()) { |
| // throw new SolrException(SolrException.ErrorCode.CONFLICT,"Solr is shutting down, no more overseer tasks may be offered"); |
| // } |
| // Timer.Context time = stats.time(dir + "_offer"); |
| LatchWatcher watcher = null; |
| try { |
| // Create and watch the response node before creating the request node; |
| // otherwise we may miss the response. |
| String watchID = createResponseNode(); |
| |
| if (log.isDebugEnabled()) log.debug("watchId for response node {}, setting a watch ... ", watchID); |
| |
| watcher = new LatchWatcher(watchID, zookeeper); |
| |
| // create the request node |
| String path = createRequestNode(data, watchID); |
| if (log.isDebugEnabled()) log.debug("created request node at {}", path); |
| |
| pendingResponses.incrementAndGet(); |
| if (log.isDebugEnabled()) log.debug("wait on latch {}", timeout); |
| watcher.await(timeout); |
| |
| byte[] bytes = zookeeper.getData(watchID, null, null, true); |
| if (log.isDebugEnabled()) log.debug("get data from response node {} {} {}", watchID, bytes == null ? null : bytes.length, watcher.getWatchedEvent()); |
| |
| if (bytes == null || bytes.length == 0) { |
| log.error("Found no data at response node, Overseer likely changed {}", watchID); |
| } |
| |
| QueueEvent event = new QueueEvent(watchID, bytes, watcher.getWatchedEvent()); |
| |
| zookeeper.delete(watchID,-1, true, false); |
| |
| return event; |
| } finally { |
| // time.stop(); |
| pendingResponses.decrementAndGet(); |
| IOUtils.closeQuietly(watcher); |
| } |
| } |
| |
| |
| String createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException { |
| return createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1), |
| data, CreateMode.EPHEMERAL); |
| } |
| |
| String createResponseNode() throws KeeperException, InterruptedException { |
| return createData( |
| Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + RESPONSE_PREFIX, |
| null, CreateMode.PERSISTENT_SEQUENTIAL); |
| } |
| |
| private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) { |
| if (log.isDebugEnabled() && !topN.isEmpty()) { |
| StringBuilder sb = new StringBuilder("["); |
| for (QueueEvent queueEvent : topN) { |
| sb.append(queueEvent.getId()).append(", "); |
| } |
| sb.append("]"); |
| log.debug("Returning topN elements: {}", sb); |
| } |
| } |
| |
| public static class QueueEvent { |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + ((id == null) ? 0 : id.hashCode()); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (obj == null) return false; |
| if (getClass() != obj.getClass()) return false; |
| QueueEvent other = (QueueEvent) obj; |
| if (id == null) { |
| if (other.id != null) return false; |
| } else if (!id.equals(other.id)) return false; |
| return true; |
| } |
| |
| private final WatchedEvent event; |
| private final String id; |
| private volatile byte[] bytes; |
| |
| QueueEvent(String id, byte[] bytes, WatchedEvent event) { |
| this.id = id; |
| this.bytes = bytes; |
| this.event = event; |
| } |
| |
| public String getId() { |
| return id; |
| } |
| |
| public void setBytes(byte[] bytes) { |
| this.bytes = bytes; |
| } |
| |
| public byte[] getBytes() { |
| return bytes; |
| } |
| |
| public WatchedEvent getWatchedEvent() { |
| return event; |
| } |
| |
| } |
| } |