blob: 76b404813cf94550e501ecb1964de485a8a70c8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.geode.redis.internal.pubsub;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.redis.internal.executor.GlobPattern;
import org.apache.geode.redis.internal.netty.Client;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
/**
* Class that manages both channel and pattern subscriptions.
*/
public class Subscriptions {
private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();
/**
* Check whether a given client has already subscribed to a channel or pattern
*
* @param channelOrPattern channel or pattern
* @param client a client connection
*/
@VisibleForTesting
boolean exists(Object channelOrPattern, Client client) {
return subscriptions.stream()
.anyMatch(subscription -> subscription.isEqualTo(channelOrPattern, client));
}
/**
* Return all subscriptions for a given client
*
* @param client the subscribed client
* @return a list of subscriptions
*/
public List<Subscription> findSubscriptions(Client client) {
return subscriptions.stream()
.filter(subscription -> subscription.matchesClient(client))
.collect(Collectors.toList());
}
/**
* Return all subscriptions for a given channel or pattern
*
* @param channelOrPattern the channel or pattern
* @return a list of subscriptions
*/
public List<Subscription> findSubscriptions(byte[] channelOrPattern) {
return subscriptions.stream()
.filter(subscription -> subscription.matches(channelOrPattern))
.collect(Collectors.toList());
}
/**
* Add a new subscription
*/
@VisibleForTesting
void add(Subscription subscription) {
subscriptions.add(subscription);
}
/**
* Remove all subscriptions for a given client
*/
public void remove(Client client) {
subscriptions.removeIf(subscription -> subscription.matchesClient(client));
}
/**
* Remove a single subscription
*/
@VisibleForTesting
boolean remove(Object channel, Client client) {
return subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
}
/**
* @return the total number of all local subscriptions
*/
@VisibleForTesting
int size() {
return subscriptions.size();
}
public synchronized SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context,
Client client) {
Subscription createdSubscription = null;
if (!exists(channel, client)) {
createdSubscription = new ChannelSubscription(client, channel, context, this);
add(createdSubscription);
}
long channelCount = findSubscriptions(client).size();
return new SubscribeResult(createdSubscription, channelCount, channel);
}
public SubscribeResult psubscribe(byte[] patternBytes, ExecutionHandlerContext context,
Client client) {
GlobPattern pattern = new GlobPattern(new String(patternBytes));
Subscription createdSubscription = null;
synchronized (this) {
if (!exists(pattern, client)) {
createdSubscription = new PatternSubscription(client, pattern, context, this);
add(createdSubscription);
}
long channelCount = findSubscriptions(client).size();
return new SubscribeResult(createdSubscription, channelCount, patternBytes);
}
}
public synchronized long unsubscribe(Object channelOrPattern, Client client) {
remove(channelOrPattern, client);
return findSubscriptions(client).size();
}
}