blob: a08b7e34f1449f2554082e3b8b6a94ff7fa9fbe7 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.snapshot;
import static org.apache.geode.distributed.internal.InternalDistributedSystem.getLogger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionMembershipListener;
import org.apache.geode.cache.util.RegionMembershipListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ProcessorKeeper21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
/**
 * Provides flow control using permits based on the sliding window algorithm. The sender should
 * invoke {@link #create(Region, DistributedMember, int)} while the recipient should respond with
 * {@link #sendAck(DistributionManager, DistributedMember, int, String)} or
 * {@link #sendAbort(DistributionManager, int, DistributedMember)}.
 */
public class FlowController {
// Set to half of Integer.MAX_VALUE rather than MAX_VALUE itself:
// watch out for rollover problems with MAX_VALUE
// NOTE(review): no read of this constant is visible in this part of the file — confirm where it
// is consumed.
private static final int MAX_PERMITS = Integer.MAX_VALUE / 2;
/**
 * Provides a callback interface for sliding window flow control.
 */
public interface Window {
/**
 * Returns the window id.
 *
 * @return the window id
 */
int getWindowId();
/**
 * Returns true if the operation has been aborted.
 *
 * @return true if aborted
 */
boolean isAborted();
/**
 * Returns true if the window is open and {@link #waitForOpening()} will return immediately.
 *
 * @return true if open
 */
boolean isOpen();
/**
 * Blocks until the window is open.
 *
 * @throws InterruptedException Interrupted while waiting
 */
void waitForOpening() throws InterruptedException;
/**
 * Closes the window and releases resources.
 */
void close();
/** the singleton, constructed once when this class is initialized */
private static final FlowController instance = new FlowController();
/**
 * Returns the shared controller held in the static {@code instance} field.
 *
 * @return the singleton flow controller
 */
public static FlowController getInstance() {
return instance;
/**
 * keeps a weak ref to {@link WindowImpl} implementations; windows are added with {@code put} and
 * looked up by window id with {@code retrieve}
 */
private final ProcessorKeeper21 processors;
/** Private constructor: the only visible caller is the static singleton field initializer. */
private FlowController() {
processors = new ProcessorKeeper21();
/**
 * Creates and registers a {@link Window} that provides flow control.
 *
 * @param region the region
 * @param sink the data recipient
 * @param windowSize the size of the sliding window
 * @return the newly registered window
 *
 * @see #sendAbort(DistributionManager, int, DistributedMember)
 * @see #sendAck(DistributionManager, DistributedMember, int, String)
 */
public <K, V> Window create(Region<K, V> region, DistributedMember sink, int windowSize) {
// The window starts with windowSize permits and watches the region for a crash of the sink.
WindowImpl<K, V> w = new WindowImpl<K, V>(region, sink, windowSize);
// Registration yields the id that ACK/ABORT handling later passes to processors.retrieve(...).
int id = processors.put(w);
// Store that id on the window; without this getWindowId() would keep reporting the field's
// default value and the id handed out by the keeper would be lost.
w.setWindowId(id);
return w;
/**
 * Sends an ACK to allow the source to continue sending messages.
 *
 * @param dmgr the distribution manager
 * @param member the data source
 * @param windowId the window
 * @param packetId the packet being ACK'd
 */
public void sendAck(DistributionManager dmgr, DistributedMember member, int windowId,
String packetId) {
// Trace the outgoing ACK only when fine-level logging is enabled.
if (getLogger().fineEnabled())
getLogger().fine("SNP: Sending ACK for packet " + packetId + " on window " + windowId
+ " to member " + member);
// The data source is this very member: resolve the window from the local registry.
if (dmgr.getDistributionManagerId().equals(member)) {
WindowImpl<?, ?> win = (WindowImpl<?, ?>) processors.retrieve(windowId);
if (win != null) {
// NOTE(review): no statement is visible in this branch — presumably the window should be
// acknowledged here (WindowImpl declares ack(String)); confirm against the full file.
} else {
// Appears to be the remote-member path: build an ACK message addressed to the source.
FlowControlAckMessage ack = new FlowControlAckMessage(windowId, packetId);
ack.setRecipient((InternalDistributedMember) member);
// NOTE(review): no call that hands the message to dmgr is visible after this line — confirm
// how the ACK is actually dispatched.
/**
 * Aborts further message processing.
 *
 * @param dmgr the distribution manager
 * @param windowId the window
 * @param member the data source
 */
public void sendAbort(DistributionManager dmgr, int windowId, DistributedMember member) {
// Trace the outgoing ABORT only when fine-level logging is enabled.
if (getLogger().fineEnabled())
getLogger().fine("SNP: Sending ABORT to member " + member + " for window " + windowId);
// The data source is this very member: resolve the window from the local registry.
if (dmgr.getDistributionManagerId().equals(member)) {
WindowImpl<?, ?> win = (WindowImpl<?, ?>) processors.retrieve(windowId);
if (win != null) {
// NOTE(review): no statement is visible in this branch — presumably the window should be
// aborted here (WindowImpl declares abort()); confirm against the full file.
} else {
// Appears to be the remote-member path: build an ABORT message addressed to the source.
FlowControlAbortMessage abort = new FlowControlAbortMessage(windowId);
abort.setRecipient((InternalDistributedMember) member);
// NOTE(review): no call that hands the message to dmgr is visible after this line — confirm
// how the ABORT is actually dispatched.
// Window implementation backed by a Semaphore (permits) and an AtomicBoolean (abort flag).
// NOTE(review): several method bodies below show no statements in this excerpt; the notes on
// each one say what is and is not visible.
private static class WindowImpl<K, V> implements Window {
/** controls access to the window */
private final Semaphore permits;
/** true if aborted */
private final AtomicBoolean abort;
/** the region (used to manage membership) */
private final Region<K, V> region;
/** the membership listener */
private final RegionMembershipListener<K, V> crash;
/** the window id; written by setWindowId and read by getWindowId */
private volatile int windowId;
// Builds a window with `size` initial permits, not aborted, bound to `region`, plus a
// listener that reacts to a crash of `sink`.
public WindowImpl(Region<K, V> region, final DistributedMember sink, int size) {
permits = new Semaphore(size);
abort = new AtomicBoolean(false);
this.region = region;
crash = new RegionMembershipListenerAdapter<K, V>() {
public void afterRemoteRegionCrash(RegionEvent<K, V> event) {
// Only a crash of the data recipient is relevant to this window.
if (event.getDistributedMember().equals(sink)) {
if (getLogger().fineEnabled())
getLogger().fine("SNP: " + sink + " has crashed, closing window");
// NOTE(review): the log text says the window is closed, but no statement doing so is visible
// here, nor is any registration of `crash` with the region — confirm against the full file.
// NOTE(review): no body is visible for close() — confirm what resources it releases.
public void close() {
// Returns the id most recently stored by setWindowId.
public int getWindowId() {
return windowId;
// Reports the current value of the abort flag.
public boolean isAborted() {
return abort.get();
// Open means at least one permit is currently available on the semaphore.
public boolean isOpen() {
return permits.availablePermits() > 0;
// NOTE(review): no body is visible for waitForOpening() — presumably it blocks on `permits`;
// confirm.
public void waitForOpening() throws InterruptedException {
// NOTE(review): no body is visible for ack(String) — confirm how an ACK changes `permits`.
private void ack(String packetId) {
// NOTE(review): no body is visible for abort() — confirm how it sets the abort flag.
private void abort() {
// Stores the id under which this window is known.
private void setWindowId(int id) {
windowId = id;
/**
 * Sent to abort message processing.
 *
 * @see Window#isAborted()
 * @see FlowController#sendAbort(DistributionManager, int, DistributedMember)
 */
// Message carrying a window id; on receipt it looks the window up in the FlowController
// registry. Serialized form after the superclass data: the window id as an int.
public static class FlowControlAbortMessage extends DistributionMessage {
/** the window id */
private int windowId;
// Creates an ABORT for the given window.
public FlowControlAbortMessage(int windowId) {
this.windowId = windowId;
/** for deserialization */
public FlowControlAbortMessage() {}
// NOTE(review): no return statement is visible for the fixed id — confirm against the full file.
public int getDSFID() {
// Processed on the standard executor.
public int getProcessorType() {
return OperationExecutors.STANDARD_EXECUTOR;
protected void process(ClusterDistributionManager dm) {
if (getLogger().fineEnabled())
getLogger()
.fine("SNP: Received ABORT on window " + windowId + " from member " + getSender());
// Resolve the window this ABORT refers to from the singleton's registry.
WindowImpl<?, ?> win =
(WindowImpl<?, ?>) FlowController.getInstance().processors.retrieve(windowId);
if (win != null) {
// NOTE(review): no statement is visible in this branch — presumably the window should be
// aborted here (WindowImpl declares abort()); confirm against the full file.
// Reads the superclass data, then the window id as an int.
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
windowId = in.readInt();
// Writes the superclass data, then the window id, mirroring the read order in fromData.
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
// Without this write the receiver's in.readInt() would consume bytes that were never sent.
out.writeInt(windowId);
/**
 * Sent to acknowledge receipt of a message packet.
 *
 * @see FlowController#sendAck(DistributionManager, DistributedMember, int, String)
 */
// Message carrying a window id and a packet id; on receipt it looks the window up in the
// FlowController registry. Serialized form after the superclass data: the window id as an int,
// then the packet id as a string.
public static class FlowControlAckMessage extends DistributionMessage {
/** the window id */
private int windowId;
/** the packet id */
private String packetId;
// Creates an ACK for the given packet on the given window.
public FlowControlAckMessage(int windowId, String packetId) {
this.windowId = windowId;
this.packetId = packetId;
/** for deserialization */
public FlowControlAckMessage() {}
// NOTE(review): no return statement is visible for the fixed id — confirm against the full file.
public int getDSFID() {
// Processed on the standard executor.
public int getProcessorType() {
return OperationExecutors.STANDARD_EXECUTOR;
protected void process(ClusterDistributionManager dm) {
if (getLogger().fineEnabled())
getLogger().fine("SNP: Received ACK for packet " + packetId + " on window " + windowId
+ " from member " + getSender());
// Resolve the window this ACK refers to from the singleton's registry.
WindowImpl<?, ?> win =
(WindowImpl<?, ?>) FlowController.getInstance().processors.retrieve(windowId);
if (win != null) {
// NOTE(review): no statement is visible in this branch — presumably the window should be
// acknowledged here (WindowImpl declares ack(String)); confirm against the full file.
// Reads the superclass data, then the window id (int), then the packet id (string).
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
windowId = in.readInt();
packetId = InternalDataSerializer.readString(in);
// Writes the fields in exactly the order fromData reads them.
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
// The window id must precede the packet id; otherwise the receiver's in.readInt() would
// consume the start of the string encoding.
out.writeInt(windowId);
InternalDataSerializer.writeString(packetId, out);