blob: 1cb43018387f098c17c2cc198fb367e7b7e956a5 [file] [log] [blame]
package org.apache.qpid.server.exchange.topic;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQShortStringTokenizer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
public class TopicParser
{
private static final byte TOPIC_DELIMITER = (byte)'.';
private final TopicWordDictionary _dictionary = new TopicWordDictionary();
private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
private static class Position
{
private final TopicWord _word;
private final boolean _selfTransition;
private final int _position;
private final boolean _endState;
private boolean _followedByAnyLoop;
public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
{
_position = position;
_word = word;
_selfTransition = selfTransition;
_endState = endState;
}
public TopicWord getWord()
{
return _word;
}
public boolean isSelfTransition()
{
return _selfTransition;
}
public int getPosition()
{
return _position;
}
public boolean isEndState()
{
return _endState;
}
public boolean isFollowedByAnyLoop()
{
return _followedByAnyLoop;
}
public void setFollowedByAnyLoop(boolean followedByAnyLoop)
{
_followedByAnyLoop = followedByAnyLoop;
}
}
private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false);
private static class SimpleState
{
private Set<Position> _positions;
private Map<TopicWord, SimpleState> _nextState;
}
public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
{
TopicMatcherDFAState startingStateMachine;
TopicMatcherDFAState newStateMachine;
do
{
startingStateMachine = _stateMachine.get();
if(startingStateMachine == null)
{
newStateMachine = createStateMachine(bindingKey, result);
}
else
{
newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
}
}
while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine));
}
public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
{
TopicMatcherDFAState stateMachine = _stateMachine.get();
if(stateMachine == null)
{
return Collections.EMPTY_SET;
}
else
{
return stateMachine.parse(_dictionary,routingKey);
}
}
TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
{
List<TopicWord> wordList = createTopicWordList(bindingKey);
int wildCards = 0;
for(TopicWord word : wordList)
{
if(word == TopicWord.WILDCARD_WORD)
{
wildCards++;
}
}
if(wildCards == 0)
{
TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1];
states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result));
for(int i = states.length-2; i >= 0; i--)
{
states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET);
}
return states[0];
}
else if(wildCards == wordList.size())
{
Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>();
TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
stateMap.put(TopicWord.ANY_WORD, state);
return state;
}
int positionCount = wordList.size() - wildCards;
Position[] positions = new Position[positionCount+1];
int lastWord;
if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD)
{
lastWord = wordList.size()-1;
positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true);
}
else
{
lastWord = wordList.size();
positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true);
}
int pos = 0;
int wordPos = 0;
while(wordPos < lastWord)
{
TopicWord word = wordList.get(wordPos++);
if(word == TopicWord.WILDCARD_WORD)
{
int nextWordPos = wordPos++;
word = wordList.get(nextWordPos);
positions[pos] = new Position(pos++,word,true,false);
}
else
{
positions[pos] = new Position(pos++,word,false,false);
}
}
for(int p = 0; p<positionCount; p++)
{
boolean followedByWildcards = true;
int n = p;
while(followedByWildcards && n<(positionCount+1))
{
if(positions[n].isSelfTransition())
{
break;
}
else if(positions[n].getWord() !=TopicWord.ANY_WORD)
{
followedByWildcards = false;
}
n++;
}
positions[p].setFollowedByAnyLoop(followedByWildcards && (n!= positionCount+1));
}
// from each position you transition to a set of other positions.
// we approach this by examining steps of increasing length - so we
// look how far we can go from the start position in 1 word, 2 words, etc...
Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>();
SimpleState state = new SimpleState();
state._positions = Collections.singleton( positions[0] );
stateMap.put(state._positions, state);
calculateNextStates(state, stateMap, positions);
SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>();
for(int i = 0; i < simpleStates.length; i++)
{
Collection<TopicMatcherResult> results;
boolean endState = false;
for(Position p : simpleStates[i]._positions)
{
if(p.isEndState())
{
endState = true;
break;
}
}
if(endState)
{
results = Collections.singleton(result);
}
else
{
results = Collections.EMPTY_SET;
}
dfaStateMaps[i] = new HashMap<TopicWord, TopicMatcherDFAState>();
simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results));
}
for(int i = 0; i < simpleStates.length; i++)
{
SimpleState simpleState = simpleStates[i];
Map<TopicWord, SimpleState> nextSimpleStateMap = simpleState._nextState;
for(Map.Entry<TopicWord, SimpleState> stateMapEntry : nextSimpleStateMap.entrySet())
{
dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue()));
}
}
return simple2DFAMap.get(state);
}
private void calculateNextStates(final SimpleState state,
final Map<Set<Position>, SimpleState> stateMap,
final Position[] positions)
{
Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>();
for(Position pos : state._positions)
{
if(pos.isSelfTransition())
{
Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
if(dest == null)
{
dest = new HashSet<Position>();
transitions.put(TopicWord.ANY_WORD,dest);
}
dest.add(pos);
}
final int nextPos = pos.getPosition() + 1;
Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos];
Set<Position> dest = transitions.get(pos.getWord());
if(dest == null)
{
dest = new HashSet<Position>();
transitions.put(pos.getWord(),dest);
}
dest.add(nextPosition);
}
Set<Position> anyWordTransitions = transitions.get(TopicWord.ANY_WORD);
if(anyWordTransitions != null)
{
for(Set<Position> dest : transitions.values())
{
dest.addAll(anyWordTransitions);
}
}
state._nextState = new HashMap<TopicWord, SimpleState>();
for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet())
{
if(dest.getValue().size()>1)
{
dest.getValue().remove(ERROR_POSITION);
}
Position loopingTerminal = null;
for(Position destPos : dest.getValue())
{
if(destPos.isSelfTransition() && destPos.isEndState())
{
loopingTerminal = destPos;
break;
}
}
if(loopingTerminal!=null)
{
dest.setValue(Collections.singleton(loopingTerminal));
}
else
{
Position anyLoop = null;
for(Position destPos : dest.getValue())
{
if(destPos.isFollowedByAnyLoop())
{
if(anyLoop == null || anyLoop.getPosition() < destPos.getPosition())
{
anyLoop = destPos;
}
}
}
if(anyLoop != null)
{
Collection<Position> removals = new ArrayList<Position>();
for(Position destPos : dest.getValue())
{
if(destPos.getPosition() < anyLoop.getPosition())
{
removals.add(destPos);
}
}
dest.getValue().removeAll(removals);
}
}
SimpleState stateForEntry = stateMap.get(dest.getValue());
if(stateForEntry == null)
{
stateForEntry = new SimpleState();
stateForEntry._positions = dest.getValue();
stateMap.put(dest.getValue(),stateForEntry);
calculateNextStates(stateForEntry,
stateMap,
positions);
}
state._nextState.put(dest.getKey(),stateForEntry);
}
// remove redundant transitions
SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
if(anyWordState != null)
{
List<TopicWord> removeList = new ArrayList<TopicWord>();
for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet())
{
if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
{
removeList.add(entry.getKey());
}
}
for(TopicWord removeKey : removeList)
{
state._nextState.remove(removeKey);
}
}
}
private List<TopicWord> createTopicWordList(final AMQShortString bindingKey)
{
AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER);
TopicWord previousWord = null;
List<TopicWord> wordList = new ArrayList<TopicWord>();
while(tokens.hasMoreTokens())
{
TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken());
if(previousWord == TopicWord.WILDCARD_WORD)
{
if(nextWord == TopicWord.WILDCARD_WORD)
{
// consecutive wildcards can be merged
// i.e. subsequent wildcards can be discarded
continue;
}
else if(nextWord == TopicWord.ANY_WORD)
{
// wildcard and anyword can be reordered to always put anyword first
wordList.set(wordList.size()-1,TopicWord.ANY_WORD);
nextWord = TopicWord.WILDCARD_WORD;
}
}
wordList.add(nextWord);
previousWord = nextWord;
}
return wordList;
}
}