| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.curator.framework.recipes.queue; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.curator.utils.CloseableUtils; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.api.BackgroundCallback; |
| import org.apache.curator.framework.api.CuratorEvent; |
| import org.apache.curator.framework.api.CuratorEventType; |
| import org.apache.curator.framework.imps.CuratorFrameworkState; |
| import org.apache.curator.framework.listen.ListenerContainer; |
| import org.apache.curator.framework.recipes.leader.LeaderSelector; |
| import org.apache.curator.utils.ZKPaths; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.curator.utils.PathUtils; |
| |
| /** |
| * <p>An implementation of the Distributed Queue ZK recipe. Items put into the queue |
| * are guaranteed to be ordered (by means of ZK's PERSISTENT_SEQUENTIAL node).</p> |
| * |
| * <p> |
| * Guarantees:</p> |
| * <ul> |
| * <li>If a single consumer takes items out of the queue, they will be ordered FIFO. i.e. if ordering is important, |
| * use a {@link LeaderSelector} to nominate a single consumer.</li> |
| * <li>Unless a {@link QueueBuilder#lockPath(String)} is used, there is only guaranteed processing of each message to the point of receipt by a given instance. |
| * <li>If an instance receives an item from the queue but dies while processing it, the item will be lost. If you need message recoverability, use |
| * a {@link QueueBuilder#lockPath(String)}</li> |
| * </ul> |
| */ |
| public class DistributedQueue<T> implements QueueBase<T> |
| { |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| private final CuratorFramework client; |
| private final QueueSerializer<T> serializer; |
| private final String queuePath; |
| private final Executor executor; |
| private final ExecutorService service; |
| private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); |
| private final QueueConsumer<T> consumer; |
| private final int minItemsBeforeRefresh; |
| private final boolean refreshOnWatch; |
| private final boolean isProducerOnly; |
| private final String lockPath; |
| private final AtomicReference<ErrorMode> errorMode = new AtomicReference<ErrorMode>(ErrorMode.REQUEUE); |
| private final ListenerContainer<QueuePutListener<T>> putListenerContainer = new ListenerContainer<QueuePutListener<T>>(); |
| private final AtomicInteger lastChildCount = new AtomicInteger(0); |
| private final int maxItems; |
| private final int finalFlushMs; |
| private final boolean putInBackground; |
| private final ChildrenCache childrenCache; |
| |
| private final AtomicInteger putCount = new AtomicInteger(0); |
| |
| private enum State |
| { |
| LATENT, |
| STARTED, |
| STOPPED |
| } |
| |
| @VisibleForTesting |
| protected enum ProcessType |
| { |
| NORMAL, |
| REMOVE |
| } |
| |
| private static final String QUEUE_ITEM_NAME = "queue-"; |
| |
| DistributedQueue |
| ( |
| CuratorFramework client, |
| QueueConsumer<T> consumer, |
| QueueSerializer<T> serializer, |
| String queuePath, |
| ThreadFactory threadFactory, |
| Executor executor, |
| int minItemsBeforeRefresh, |
| boolean refreshOnWatch, |
| String lockPath, |
| int maxItems, |
| boolean putInBackground, |
| int finalFlushMs |
| ) |
| { |
| Preconditions.checkNotNull(client, "client cannot be null"); |
| Preconditions.checkNotNull(serializer, "serializer cannot be null"); |
| Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); |
| Preconditions.checkNotNull(executor, "executor cannot be null"); |
| Preconditions.checkArgument(maxItems > 0, "maxItems must be a positive number"); |
| |
| isProducerOnly = (consumer == null); |
| this.lockPath = (lockPath == null) ? null : PathUtils.validatePath(lockPath); |
| this.putInBackground = putInBackground; |
| this.consumer = consumer; |
| this.minItemsBeforeRefresh = minItemsBeforeRefresh; |
| this.refreshOnWatch = refreshOnWatch; |
| this.client = client; |
| this.serializer = serializer; |
| this.queuePath = PathUtils.validatePath(queuePath); |
| this.executor = executor; |
| this.maxItems = maxItems; |
| this.finalFlushMs = finalFlushMs; |
| service = Executors.newFixedThreadPool(2, threadFactory); |
| childrenCache = new ChildrenCache(client, queuePath); |
| |
| if ( (maxItems != QueueBuilder.NOT_SET) && putInBackground ) |
| { |
| log.warn("Bounded queues should set putInBackground(false) in the builder. Putting in the background will result in spotty maxItem consistency."); |
| } |
| } |
| |
| /** |
| * Start the queue. No other methods work until this is called |
| * |
| * @throws Exception startup errors |
| */ |
| @Override |
| public void start() throws Exception |
| { |
| if ( !state.compareAndSet(State.LATENT, State.STARTED) ) |
| { |
| throw new IllegalStateException(); |
| } |
| |
| try |
| { |
| client.create().creatingParentsIfNeeded().forPath(queuePath); |
| } |
| catch ( KeeperException.NodeExistsException ignore ) |
| { |
| // this is OK |
| } |
| if ( lockPath != null ) |
| { |
| try |
| { |
| client.create().creatingParentsIfNeeded().forPath(lockPath); |
| } |
| catch ( KeeperException.NodeExistsException ignore ) |
| { |
| // this is OK |
| } |
| } |
| |
| if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) ) |
| { |
| childrenCache.start(); |
| } |
| |
| if ( !isProducerOnly ) |
| { |
| service.submit |
| ( |
| new Callable<Object>() |
| { |
| @Override |
| public Object call() |
| { |
| runLoop(); |
| return null; |
| } |
| } |
| ); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException |
| { |
| if ( state.compareAndSet(State.STARTED, State.STOPPED) ) |
| { |
| if ( finalFlushMs > 0 ) |
| { |
| try |
| { |
| flushPuts(finalFlushMs, TimeUnit.MILLISECONDS); |
| } |
| catch ( InterruptedException e ) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| CloseableUtils.closeQuietly(childrenCache); |
| putListenerContainer.clear(); |
| service.shutdownNow(); |
| } |
| } |
| |
| /** |
| * Return the manager for put listeners |
| * |
| * @return put listener container |
| */ |
| @Override |
| public ListenerContainer<QueuePutListener<T>> getPutListenerContainer() |
| { |
| return putListenerContainer; |
| } |
| |
| /** |
| * Used when the queue is created with a {@link QueueBuilder#lockPath(String)}. Determines |
| * the behavior when the queue consumer throws an exception |
| * |
| * @param newErrorMode the new error mode (the default is {@link ErrorMode#REQUEUE} |
| */ |
| @Override |
| public void setErrorMode(ErrorMode newErrorMode) |
| { |
| Preconditions.checkNotNull(lockPath, "lockPath cannot be null"); |
| |
| if ( newErrorMode == ErrorMode.REQUEUE ) |
| { |
| log.warn("ErrorMode.REQUEUE requires ZooKeeper version 3.4.x+ - make sure you are not using a prior version"); |
| } |
| |
| errorMode.set(newErrorMode); |
| } |
| |
| /** |
| * Wait until any pending puts are committed |
| * |
| * @param waitTime max wait time |
| * @param timeUnit time unit |
| * @return true if the flush was successful, false if it timed out first |
| * @throws InterruptedException if thread was interrupted |
| */ |
| @Override |
| public boolean flushPuts(long waitTime, TimeUnit timeUnit) throws InterruptedException |
| { |
| long msWaitRemaining = TimeUnit.MILLISECONDS.convert(waitTime, timeUnit); |
| synchronized(putCount) |
| { |
| while ( putCount.get() > 0 ) |
| { |
| if ( msWaitRemaining <= 0 ) |
| { |
| return false; |
| } |
| |
| long startMs = System.currentTimeMillis(); |
| |
| putCount.wait(msWaitRemaining); |
| |
| long elapsedMs = System.currentTimeMillis() - startMs; |
| msWaitRemaining -= elapsedMs; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Add an item into the queue. Adding is done in the background - thus, this method will |
| * return quickly.<br><br> |
| * NOTE: if an upper bound was set via {@link QueueBuilder#maxItems}, this method will |
| * block until there is available space in the queue. |
| * |
| * @param item item to add |
| * @throws Exception connection issues |
| */ |
| public void put(T item) throws Exception |
| { |
| put(item, 0, null); |
| } |
| |
| /** |
| * Same as {@link #put(Object)} but allows a maximum wait time if an upper bound was set |
| * via {@link QueueBuilder#maxItems}. |
| * |
| * @param item item to add |
| * @param maxWait maximum wait |
| * @param unit wait unit |
| * @return true if items was added, false if timed out |
| * @throws Exception |
| */ |
| public boolean put(T item, int maxWait, TimeUnit unit) throws Exception |
| { |
| checkState(); |
| |
| String path = makeItemPath(); |
| return internalPut(item, null, path, maxWait, unit); |
| } |
| |
| /** |
| * Add a set of items into the queue. Adding is done in the background - thus, this method will |
| * return quickly.<br><br> |
| * NOTE: if an upper bound was set via {@link QueueBuilder#maxItems}, this method will |
| * block until there is available space in the queue. |
| * |
| * @param items items to add |
| * @throws Exception connection issues |
| */ |
| public void putMulti(MultiItem<T> items) throws Exception |
| { |
| putMulti(items, 0, null); |
| } |
| |
| /** |
| * Same as {@link #putMulti(MultiItem)} but allows a maximum wait time if an upper bound was set |
| * via {@link QueueBuilder#maxItems}. |
| * |
| * @param items items to add |
| * @param maxWait maximum wait |
| * @param unit wait unit |
| * @return true if items was added, false if timed out |
| * @throws Exception |
| */ |
| public boolean putMulti(MultiItem<T> items, int maxWait, TimeUnit unit) throws Exception |
| { |
| checkState(); |
| |
| String path = makeItemPath(); |
| return internalPut(null, items, path, maxWait, unit); |
| } |
| |
| /** |
| * Return the most recent message count from the queue. This is useful for debugging/information |
| * purposes only. |
| * |
| * @return count (can be 0) |
| */ |
| @Override |
| public int getLastMessageCount() |
| { |
| return lastChildCount.get(); |
| } |
| |
| boolean internalPut(final T item, MultiItem<T> multiItem, String path, int maxWait, TimeUnit unit) throws Exception |
| { |
| if ( !blockIfMaxed(maxWait, unit) ) |
| { |
| return false; |
| } |
| |
| final MultiItem<T> givenMultiItem = multiItem; |
| if ( item != null ) |
| { |
| final AtomicReference<T> ref = new AtomicReference<T>(item); |
| multiItem = new MultiItem<T>() |
| { |
| @Override |
| public T nextItem() throws Exception |
| { |
| return ref.getAndSet(null); |
| } |
| }; |
| } |
| |
| putCount.incrementAndGet(); |
| byte[] bytes = ItemSerializer.serialize(multiItem, serializer); |
| if ( putInBackground ) |
| { |
| doPutInBackground(item, path, givenMultiItem, bytes); |
| } |
| else |
| { |
| doPutInForeground(item, path, givenMultiItem, bytes); |
| } |
| return true; |
| } |
| |
| private void doPutInForeground(final T item, String path, final MultiItem<T> givenMultiItem, byte[] bytes) throws Exception |
| { |
| client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes); |
| synchronized(putCount) |
| { |
| putCount.decrementAndGet(); |
| putCount.notifyAll(); |
| } |
| putListenerContainer.forEach |
| ( |
| new Function<QueuePutListener<T>, Void>() |
| { |
| @Override |
| public Void apply(QueuePutListener<T> listener) |
| { |
| if ( item != null ) |
| { |
| listener.putCompleted(item); |
| } |
| else |
| { |
| listener.putMultiCompleted(givenMultiItem); |
| } |
| return null; |
| } |
| } |
| ); |
| } |
| |
| private void doPutInBackground(final T item, String path, final MultiItem<T> givenMultiItem, byte[] bytes) throws Exception |
| { |
| BackgroundCallback callback = new BackgroundCallback() |
| { |
| @Override |
| public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| { |
| if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) |
| { |
| return; |
| } |
| |
| if ( event.getType() == CuratorEventType.CREATE ) |
| { |
| synchronized(putCount) |
| { |
| putCount.decrementAndGet(); |
| putCount.notifyAll(); |
| } |
| } |
| |
| putListenerContainer.forEach |
| ( |
| new Function<QueuePutListener<T>, Void>() |
| { |
| @Override |
| public Void apply(QueuePutListener<T> listener) |
| { |
| if ( item != null ) |
| { |
| listener.putCompleted(item); |
| } |
| else |
| { |
| listener.putMultiCompleted(givenMultiItem); |
| } |
| return null; |
| } |
| } |
| ); |
| } |
| }; |
| internalCreateNode(path, bytes, callback); |
| } |
| |
| @VisibleForTesting |
| void internalCreateNode(String path, byte[] bytes, BackgroundCallback callback) throws Exception |
| { |
| client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).inBackground(callback).forPath(path, bytes); |
| } |
| |
| void checkState() throws Exception |
| { |
| if ( state.get() != State.STARTED ) |
| { |
| throw new IllegalStateException(); |
| } |
| } |
| |
| String makeItemPath() |
| { |
| return ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME); |
| } |
| |
| @VisibleForTesting |
| ChildrenCache getCache() |
| { |
| return childrenCache; |
| } |
| |
| protected void sortChildren(List<String> children) |
| { |
| Collections.sort(children); |
| } |
| |
| protected List<String> getChildren() throws Exception |
| { |
| return client.getChildren().forPath(queuePath); |
| } |
| |
| protected long getDelay(String itemNode) |
| { |
| return 0; |
| } |
| |
| protected boolean tryRemove(String itemNode) throws Exception |
| { |
| boolean isUsingLockSafety = (lockPath != null); |
| if ( isUsingLockSafety ) |
| { |
| return processWithLockSafety(itemNode, ProcessType.REMOVE); |
| } |
| |
| return processNormally(itemNode, ProcessType.REMOVE); |
| } |
| |
| private boolean blockIfMaxed(int maxWait, TimeUnit unit) throws Exception |
| { |
| ChildrenCache.Data data = childrenCache.getData(); |
| while ( data.children.size() >= maxItems ) |
| { |
| long previousVersion = data.version; |
| data = childrenCache.blockingNextGetData(data.version, maxWait, unit); |
| if ( data.version == previousVersion ) |
| { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private void runLoop() |
| { |
| long currentVersion = -1; |
| long maxWaitMs = -1; |
| try |
| { |
| while ( !Thread.currentThread().isInterrupted() ) |
| { |
| ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion); |
| currentVersion = data.version; |
| |
| List<String> children = Lists.newArrayList(data.children); |
| sortChildren(children); // makes sure items are processed in the correct order |
| |
| if ( children.size() > 0 ) |
| { |
| maxWaitMs = getDelay(children.get(0)); |
| if ( maxWaitMs > 0 ) |
| { |
| continue; |
| } |
| } |
| else |
| { |
| continue; |
| } |
| |
| processChildren(children, currentVersion); |
| } |
| } |
| catch ( InterruptedException ignore ) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| catch ( Exception e ) |
| { |
| log.error("Exception caught in background handler", e); |
| } |
| } |
| |
| private void processChildren(List<String> children, long currentVersion) throws Exception |
| { |
| final Semaphore processedLatch = new Semaphore(0); |
| final boolean isUsingLockSafety = (lockPath != null); |
| int min = minItemsBeforeRefresh; |
| for ( final String itemNode : children ) |
| { |
| if ( Thread.currentThread().isInterrupted() ) |
| { |
| processedLatch.release(children.size()); |
| break; |
| } |
| |
| if ( !itemNode.startsWith(QUEUE_ITEM_NAME) ) |
| { |
| log.warn("Foreign node in queue path: " + itemNode); |
| processedLatch.release(); |
| continue; |
| } |
| |
| if ( min-- <= 0 ) |
| { |
| if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) ) |
| { |
| processedLatch.release(children.size()); |
| break; |
| } |
| } |
| |
| if ( getDelay(itemNode) > 0 ) |
| { |
| processedLatch.release(); |
| continue; |
| } |
| |
| executor.execute |
| ( |
| new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| if ( isUsingLockSafety ) |
| { |
| processWithLockSafety(itemNode, ProcessType.NORMAL); |
| } |
| else |
| { |
| processNormally(itemNode, ProcessType.NORMAL); |
| } |
| } |
| catch ( Exception e ) |
| { |
| log.error("Error processing message at " + itemNode, e); |
| } |
| finally |
| { |
| processedLatch.release(); |
| } |
| } |
| } |
| ); |
| } |
| |
| processedLatch.acquire(children.size()); |
| } |
| |
| private enum ProcessMessageBytesCode |
| { |
| NORMAL, |
| REQUEUE |
| } |
| |
| private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception |
| { |
| ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL; |
| MultiItem<T> items; |
| try |
| { |
| items = ItemSerializer.deserialize(bytes, serializer); |
| } |
| catch ( Throwable e ) |
| { |
| log.error("Corrupted queue item: " + itemNode, e); |
| return resultCode; |
| } |
| |
| for(;;) |
| { |
| T item = items.nextItem(); |
| if ( item == null ) |
| { |
| break; |
| } |
| |
| try |
| { |
| consumer.consumeMessage(item); |
| } |
| catch ( Throwable e ) |
| { |
| log.error("Exception processing queue item: " + itemNode, e); |
| if ( errorMode.get() == ErrorMode.REQUEUE ) |
| { |
| resultCode = ProcessMessageBytesCode.REQUEUE; |
| break; |
| } |
| } |
| } |
| return resultCode; |
| } |
| |
| private boolean processNormally(String itemNode, ProcessType type) throws Exception |
| { |
| try |
| { |
| String itemPath = ZKPaths.makePath(queuePath, itemNode); |
| Stat stat = new Stat(); |
| |
| byte[] bytes = null; |
| if ( type == ProcessType.NORMAL ) |
| { |
| bytes = client.getData().storingStatIn(stat).forPath(itemPath); |
| } |
| if ( client.getState() == CuratorFrameworkState.STARTED ) |
| { |
| client.delete().withVersion(stat.getVersion()).forPath(itemPath); |
| } |
| |
| if ( type == ProcessType.NORMAL ) |
| { |
| processMessageBytes(itemNode, bytes); |
| } |
| |
| return true; |
| } |
| catch ( KeeperException.NodeExistsException ignore ) |
| { |
| // another process got it |
| } |
| catch ( KeeperException.NoNodeException ignore ) |
| { |
| // another process got it |
| } |
| catch ( KeeperException.BadVersionException ignore ) |
| { |
| // another process got it |
| } |
| |
| return false; |
| } |
| |
| @VisibleForTesting |
| protected boolean processWithLockSafety(String itemNode, ProcessType type) throws Exception |
| { |
| String lockNodePath = ZKPaths.makePath(lockPath, itemNode); |
| boolean lockCreated = false; |
| try |
| { |
| client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath); |
| lockCreated = true; |
| |
| String itemPath = ZKPaths.makePath(queuePath, itemNode); |
| boolean requeue = false; |
| byte[] bytes = null; |
| if ( type == ProcessType.NORMAL ) |
| { |
| bytes = client.getData().forPath(itemPath); |
| requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE); |
| } |
| |
| if ( requeue ) |
| { |
| client.inTransaction() |
| .delete().forPath(itemPath) |
| .and() |
| .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeItemPath(), bytes) |
| .and() |
| .commit(); |
| } |
| else |
| { |
| client.delete().forPath(itemPath); |
| } |
| |
| return true; |
| } |
| catch ( KeeperException.NodeExistsException ignore ) |
| { |
| // another process got it |
| } |
| catch ( KeeperException.NoNodeException ignore ) |
| { |
| // another process got it |
| } |
| catch ( KeeperException.BadVersionException ignore ) |
| { |
| // another process got it |
| } |
| finally |
| { |
| if ( lockCreated ) |
| { |
| client.delete().guaranteed().forPath(lockNodePath); |
| } |
| } |
| |
| return false; |
| } |
| } |