blob: a08b7e34f1449f2554082e3b8b6a94ff7fa9fbe7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.snapshot;
import static org.apache.geode.distributed.internal.InternalDistributedSystem.getLogger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionMembershipListener;
import org.apache.geode.cache.util.RegionMembershipListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ProcessorKeeper21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
/**
* Provides flow control using permits based on the sliding window algorithm. The sender should
* invoke {@link #create(Region, DistributedMember, int)} while the recipient should respond with
* {@link #sendAck(DistributionManager, DistributedMember, int, String)} or
* {@link #sendAbort(DistributionManager, int, DistributedMember)}.
*
*/
public class FlowController {
// watch out for rollover problems with MAX_VALUE
private static final int MAX_PERMITS = Integer.MAX_VALUE / 2;
/**
* Provides a callback interface for sliding window flow control.
*/
public interface Window {
/**
* Returns the window id.
*
* @return the window id
*/
int getWindowId();
/**
* Returns true if the operation has been aborted.
*
* @return true if aborted
*/
boolean isAborted();
/**
* Returns true if the window is open and {{@link #waitForOpening()} will return immediately.
*
* @return true if open
*/
boolean isOpen();
/**
* Blocks until the window is open.
*
* @throws InterruptedException Interrupted while waiting
*/
void waitForOpening() throws InterruptedException;
/**
* Closes the window and releases resources.
*/
void close();
}
/** the singleton */
@MakeNotStatic
private static final FlowController instance = new FlowController();
public static FlowController getInstance() {
return instance;
}
/** keeps a weak ref to {@link WindowImpl} implementations */
private final ProcessorKeeper21 processors;
private FlowController() {
processors = new ProcessorKeeper21();
}
/**
* Creates and registers a {@link Window} that provides flow control.
*
* @param region the region
* @param sink the data recipient
* @param windowSize the size of the sliding window
*
* @see #sendAbort(DistributionManager, int, DistributedMember)
* @see #sendAck(DistributionManager, DistributedMember, int, String)
*/
public <K, V> Window create(Region<K, V> region, DistributedMember sink, int windowSize) {
WindowImpl<K, V> w = new WindowImpl<K, V>(region, sink, windowSize);
int id = processors.put(w);
w.setWindowId(id);
return w;
}
/**
* Sends an ACK to allow the source to continue sending messages.
*
* @param dmgr the distribution manager
* @param member the data source
* @param windowId the window
* @param packetId the packet being ACK'd
*/
public void sendAck(DistributionManager dmgr, DistributedMember member, int windowId,
String packetId) {
if (getLogger().fineEnabled())
getLogger().fine("SNP: Sending ACK for packet " + packetId + " on window " + windowId
+ " to member " + member);
if (dmgr.getDistributionManagerId().equals(member)) {
WindowImpl<?, ?> win = (WindowImpl<?, ?>) processors.retrieve(windowId);
if (win != null) {
win.ack(packetId);
}
} else {
FlowControlAckMessage ack = new FlowControlAckMessage(windowId, packetId);
ack.setRecipient((InternalDistributedMember) member);
dmgr.putOutgoing(ack);
}
}
/**
* Aborts further message processing.
*
* @param dmgr the distribution manager
* @param windowId the window
* @param member the data source
*/
public void sendAbort(DistributionManager dmgr, int windowId, DistributedMember member) {
if (getLogger().fineEnabled())
getLogger().fine("SNP: Sending ABORT to member " + member + " for window " + windowId);
if (dmgr.getDistributionManagerId().equals(member)) {
WindowImpl<?, ?> win = (WindowImpl<?, ?>) processors.retrieve(windowId);
if (win != null) {
win.abort();
}
} else {
FlowControlAbortMessage abort = new FlowControlAbortMessage(windowId);
abort.setRecipient((InternalDistributedMember) member);
dmgr.putOutgoing(abort);
}
}
private static class WindowImpl<K, V> implements Window {
/** controls access to the window */
private final Semaphore permits;
/** true if aborted */
private final AtomicBoolean abort;
/** the region (used to manage membership) */
private final Region<K, V> region;
/** the membership listener */
private final RegionMembershipListener<K, V> crash;
/** the window id */
private volatile int windowId;
public WindowImpl(Region<K, V> region, final DistributedMember sink, int size) {
permits = new Semaphore(size);
abort = new AtomicBoolean(false);
this.region = region;
crash = new RegionMembershipListenerAdapter<K, V>() {
@Override
public void afterRemoteRegionCrash(RegionEvent<K, V> event) {
if (event.getDistributedMember().equals(sink)) {
if (getLogger().fineEnabled())
getLogger().fine("SNP: " + sink + " has crashed, closing window");
abort();
}
}
};
region.getAttributesMutator().addCacheListener(crash);
}
@Override
public void close() {
instance.processors.remove(windowId);
region.getAttributesMutator().removeCacheListener(crash);
permits.release(MAX_PERMITS);
}
@Override
public int getWindowId() {
return windowId;
}
@Override
public boolean isAborted() {
return abort.get();
}
@Override
public boolean isOpen() {
return permits.availablePermits() > 0;
}
@Override
public void waitForOpening() throws InterruptedException {
permits.acquire();
}
private void ack(String packetId) {
permits.release();
}
private void abort() {
abort.set(true);
permits.release(MAX_PERMITS);
}
private void setWindowId(int id) {
windowId = id;
}
}
/**
* Sent to abort message processing.
*
* @see Window#isAborted()
* @see FlowController#sendAbort(DistributionManager, int, DistributedMember)
*/
public static class FlowControlAbortMessage extends DistributionMessage {
/** the window id */
private int windowId;
public FlowControlAbortMessage(int windowId) {
this.windowId = windowId;
}
/** for deserialization */
public FlowControlAbortMessage() {}
@Override
public int getDSFID() {
return FLOW_CONTROL_ACK;
}
@Override
public int getProcessorType() {
return OperationExecutors.STANDARD_EXECUTOR;
}
@Override
protected void process(ClusterDistributionManager dm) {
if (getLogger().fineEnabled())
getLogger()
.fine("SNP: Received ABORT on window " + windowId + " from member " + getSender());
WindowImpl<?, ?> win =
(WindowImpl<?, ?>) FlowController.getInstance().processors.retrieve(windowId);
if (win != null) {
win.abort();
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
windowId = in.readInt();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(windowId);
}
}
/**
* Sent to acknowledge receipt of a message packet.
*
* @see FlowController#sendAck(DistributionManager, DistributedMember, int, String)
*/
public static class FlowControlAckMessage extends DistributionMessage {
/** the window id */
private int windowId;
/** the packet id */
private String packetId;
public FlowControlAckMessage(int windowId, String packetId) {
this.windowId = windowId;
this.packetId = packetId;
}
/** for deserialization */
public FlowControlAckMessage() {}
@Override
public int getDSFID() {
return FLOW_CONTROL_ACK;
}
@Override
public int getProcessorType() {
return OperationExecutors.STANDARD_EXECUTOR;
}
@Override
protected void process(ClusterDistributionManager dm) {
if (getLogger().fineEnabled())
getLogger().fine("SNP: Received ACK for packet " + packetId + " on window " + windowId
+ " from member " + getSender());
WindowImpl<?, ?> win =
(WindowImpl<?, ?>) FlowController.getInstance().processors.retrieve(windowId);
if (win != null) {
win.ack(packetId);
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
windowId = in.readInt();
packetId = InternalDataSerializer.readString(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(windowId);
InternalDataSerializer.writeString(packetId, out);
}
}
}