blob: 15045aaa7bbd91caf7bcc2a2f233111cdcd62789 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.queue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.ListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
 * A version of {@link DistributedQueue} that allows IDs to be associated with queue items. Items
 * can then be removed from the queue by ID if needed.
 * <p>
 * The ID is embedded in each item's node name: the name is the wrapped queue's item path followed by
 * the separator character, the sanitized ID (see {@link #fixId(String)}) and the separator again.
 * Because sanitizing is lossy, IDs that differ only by {@code '/'}, {@code '|'} or {@code '_'}
 * in the same position are treated as the same ID.
 */
public class DistributedIdQueue<T> implements QueueBase<T>
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final DistributedQueue<T> queue;

    /** Delimiter placed on both sides of the ID inside a node name. */
    private static final char SEPARATOR = '|';

    /**
     * Result of splitting a node name: the embedded ID and the name with the
     * {@code SEPARATOR id SEPARATOR} section removed.
     */
    private static class Parts
    {
        final String id;
        final String cleaned;

        private Parts(String id, String cleaned)
        {
            this.id = id;
            this.cleaned = cleaned;
        }
    }

    /**
     * Creates the ID queue around a {@link DistributedQueue} built from the given arguments, all of
     * which are passed through unchanged. The wrapped queue is customized so that children are
     * sorted with the ID section ignored and so that re-queued items keep their ID.
     *
     * @throws IllegalStateException if the wrapped queue's item path contains the separator
     *                               character, which would make node names ambiguous to parse
     */
    DistributedIdQueue
        (
            CuratorFramework client,
            QueueConsumer<T> consumer,
            QueueSerializer<T> serializer,
            String queuePath,
            ThreadFactory threadFactory,
            Executor executor,
            int minItemsBeforeRefresh,
            boolean refreshOnWatch,
            String lockPath,
            int maxItems,
            boolean putInBackground,
            int finalFlushMs
        )
    {
        queue = new DistributedQueue<T>(client, consumer, serializer, queuePath, threadFactory, executor, minItemsBeforeRefresh, refreshOnWatch, lockPath, maxItems, putInBackground, finalFlushMs)
        {
            @Override
            protected void sortChildren(List<String> children)
            {
                internalSortChildren(children);
            }

            @Override
            protected String makeRequeueItemPath(String itemPath)
            {
                // carry the original ID over to the re-queued node
                return makeIdPath(parseId(itemPath).id);
            }
        };

        if ( queue.makeItemPath().contains(Character.toString(SEPARATOR)) )
        {
            throw new IllegalStateException("DistributedQueue can't use " + SEPARATOR);
        }
    }

    /**
     * Starts the wrapped queue.
     *
     * @throws Exception errors from the wrapped queue
     */
    @Override
    public void start() throws Exception
    {
        queue.start();
    }

    /**
     * Closes the wrapped queue.
     *
     * @throws IOException errors from the wrapped queue
     */
    @Override
    public void close() throws IOException
    {
        queue.close();
    }

    /**
     * Returns the put-listener container of the wrapped queue.
     *
     * @return listener container
     */
    @Override
    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
    {
        return queue.getPutListenerContainer();
    }

    /**
     * Sets the error mode on the wrapped queue.
     *
     * @param newErrorMode the new error mode
     */
    @Override
    public void setErrorMode(ErrorMode newErrorMode)
    {
        queue.setErrorMode(newErrorMode);
    }

    /**
     * Delegates to {@link DistributedQueue#flushPuts(long, TimeUnit)} on the wrapped queue.
     *
     * @param waitTime max wait time
     * @param timeUnit wait time unit
     * @return the wrapped queue's result
     * @throws InterruptedException if the wait is interrupted
     */
    @Override
    public boolean flushPuts(long waitTime, TimeUnit timeUnit) throws InterruptedException
    {
        return queue.flushPuts(waitTime, timeUnit);
    }

    /**
     * Returns the last message count reported by the wrapped queue.
     *
     * @return last message count
     */
    @Override
    public int getLastMessageCount()
    {
        return queue.getLastMessageCount();
    }

    /**
     * Put an item into the queue with the given Id<br><br>
     * NOTE: if an upper bound was set via {@link QueueBuilder#maxItems}, this method will
     * block until there is available space in the queue.
     *
     * @param item item
     * @param itemId item Id - must be non-null and non-empty
     * @throws Exception errors
     */
    public void put(T item, String itemId) throws Exception
    {
        put(item, itemId, 0, null);
    }

    /**
     * Same as {@link #put(Object, String)} but allows a maximum wait time if an upper bound was set
     * via {@link QueueBuilder#maxItems}.
     *
     * @param item item
     * @param itemId item Id - must be non-null and non-empty
     * @param maxWait maximum wait
     * @param unit wait unit
     * @return true if items was added, false if timed out
     * @throws IllegalArgumentException if the Id is null or empty
     * @throws Exception errors from the wrapped queue
     */
    public boolean put(T item, String itemId, int maxWait, TimeUnit unit) throws Exception
    {
        Preconditions.checkArgument(isValidId(itemId), "Invalid id: " + itemId);

        queue.checkState();

        return queue.internalPut(item, null, makeIdPath(itemId), maxWait, unit);
    }

    /**
     * Remove any items with the given Id
     *
     * @param id item Id to remove
     * @return number of items removed
     * @throws NullPointerException if the Id is null
     * @throws Exception errors
     */
    public int remove(String id) throws Exception
    {
        id = Preconditions.checkNotNull(id, "id cannot be null");

        queue.checkState();

        // Node names hold the sanitized form of the ID (see makeIdPath), so the lookup key has to
        // be sanitized the same way or IDs containing '/' or the separator would never match.
        String storedId = fixId(id);

        int count = 0;
        for ( String name : queue.getChildren() )
        {
            if ( parseId(name).id.equals(storedId) && queue.tryRemove(name) )
            {
                ++count;
            }
        }

        return count;
    }

    /**
     * Test hook: reports whether any child node currently carries the given Id.
     *
     * @param id item Id to look for; {@code null} never matches
     * @return true if at least one child has the Id
     * @throws Exception errors from the wrapped queue
     */
    @VisibleForTesting
    boolean debugIsQueued(String id) throws Exception
    {
        // Compare against the sanitized form, which is what put() stores in the node name.
        String storedId = (id != null) ? fixId(id) : null;

        for ( String name : queue.getChildren() )
        {
            if ( parseId(name).id.equals(storedId) )
            {
                return true;
            }
        }

        return false;
    }

    /**
     * Builds the node path for an item: the wrapped queue's item path followed by the sanitized Id
     * wrapped in separators.
     */
    private String makeIdPath(String itemId)
    {
        return queue.makeItemPath() + SEPARATOR + fixId(itemId) + SEPARATOR;
    }

    /**
     * Sorts child node names in place by their "cleaned" form, i.e. with the Id section removed,
     * so that the Id text does not influence the ordering.
     */
    private void internalSortChildren(List<String> children)
    {
        Collections.sort
            (
                children,
                new Comparator<String>()
                {
                    @Override
                    public int compare(String o1, String o2)
                    {
                        return parseId(o1).cleaned.compareTo(parseId(o2).cleaned);
                    }
                }
            );
    }

    /**
     * An Id is usable only if it is non-null and non-empty.
     */
    private boolean isValidId(String id)
    {
        return (id != null) && (id.length() > 0);
    }

    /**
     * Sanitizes an Id for embedding in a node name by replacing {@code '/'} and the separator
     * character with {@code '_'}. The operation is idempotent.
     */
    private static String fixId(String id)
    {
        String fixed = id.replace('/', '_');
        return fixed.replace(SEPARATOR, '_');
    }

    /**
     * Splits a node name into its Id and its cleaned name (the text before the first separator
     * joined with the text after the second separator). If the name does not contain two
     * separators, an error is logged and the whole name is used for both parts.
     */
    private Parts parseId(String name)
    {
        int firstIndex = name.indexOf(SEPARATOR);
        int secondIndex = name.indexOf(SEPARATOR, firstIndex + 1);
        if ( (firstIndex < 0) || (secondIndex < 0) )
        {
            log.error("Bad node in queue: {}", name);
            return new Parts(name, name);
        }

        return new Parts
            (
                name.substring(firstIndex + 1, secondIndex),
                name.substring(0, firstIndex) + name.substring(secondIndex + 1)
            );
    }
}