blob: 264f5c8bcfcf0d0bc568c84db0a9028bf3819ee0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ProcessorKeeper21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* This class keeps track of how many InitialImageMessages are in flight between the initial image
* provider and the image target.
*
* The provider is responsible for calling acquirePermit before sending an initial image chunk. The
* acquire will block if two many messages are in flight.
*
* The initial image target sends FlowControlPermitMessage to the image provider after each
* processed chunk. Upon receiving the FlowControlPermit message, the provider will increase the
* number of permits available.
*
*/
public class InitialImageFlowControl implements MembershipListener {
private static final Logger logger = LogService.getLogger();
@MakeNotStatic
private static final ProcessorKeeper21 keeper = new ProcessorKeeper21(false);
private int id;
private int maxPermits = InitialImageOperation.CHUNK_PERMITS;
private final Semaphore permits = new Semaphore(maxPermits);
private final DistributionManager dm;
private final InternalDistributedMember target;
private final AtomicBoolean aborted = new AtomicBoolean();
public static InitialImageFlowControl register(DistributionManager dm,
InternalDistributedMember target) {
InitialImageFlowControl control = new InitialImageFlowControl(dm, target);
int id = keeper.put(control);
control.id = id;
Set availableIds = dm.addMembershipListenerAndGetDistributionManagerIds(control);
if (!availableIds.contains(target)) {
control.abort();
}
return control;
}
private InitialImageFlowControl(DistributionManager dm, InternalDistributedMember target) {
this.dm = dm;
this.target = target;
}
private void releasePermit() {
permits.release();
incMessagesInFlight(-1);
}
private void incMessagesInFlight(int val) {
dm.getStats().incInitialImageMessagesInFlight(val);
}
/**
* Acquire a permit to send another message
*/
public void acquirePermit() {
long startWaitTime = System.currentTimeMillis();
while (!aborted.get()) {
checkCancellation();
boolean interrupted = false;
try {
basicWait(startWaitTime);
break;
} catch (InterruptedException e) {
interrupted = true; // keep looping
checkCancellation();
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
if (!aborted.get()) {
incMessagesInFlight(1);
}
}
private void basicWait(long startWaitTime) throws InterruptedException {
long timeout = getAckWaitThreshold() * 1000L;
long timeSoFar = System.currentTimeMillis() - startWaitTime;
if (timeout <= 0) {
timeout = Long.MAX_VALUE;
}
if (!aborted.get() && !permits.tryAcquire(timeout - timeSoFar - 1, TimeUnit.MILLISECONDS)) {
checkCancellation();
Set activeMembers = dm.getDistributionManagerIds();
logger.warn(
"{} seconds have elapsed while waiting for replies: {} on {} whose current membership list is: [{}]",
getAckWaitThreshold(), this, dm.getId(), activeMembers);
permits.acquire();
// Give an info message since timeout gave a warning.
logger.info("{} wait for replies completed",
"InitialImageFlowControl");
}
}
/**
* Return the time in sec to wait before sending an alert while waiting for ack replies. Note that
* the ack wait threshold may change at runtime, so we have to consult the system every time.
*/
private int getAckWaitThreshold() {
return dm.getConfig().getAckWaitThreshold();
}
private void checkCancellation() {
dm.getCancelCriterion().checkCancelInProgress(null);
}
public void unregister() {
dm.removeMembershipListener(this);
keeper.remove(id);
abort();
}
public int getId() {
return id;
}
@Override
public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember id,
boolean crashed) {
if (id.equals(target)) {
abort();
}
}
private void abort() {
if (!aborted.getAndSet(true)) {
incMessagesInFlight(-(maxPermits - permits.availablePermits()));
// Just in case java has issues with semaphores rolling over, set this
// to half Integer.MAX_VALUE rather to release all of the waiters
permits.release(Integer.MAX_VALUE / 2);
}
}
@Override
public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {
// Do nothing
}
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
@Override
public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
InternalDistributedMember whoSuspected, String reason) {
// Do nothing
}
@Override
public String toString() {
return "<InitialImageFlowControl for GII to " + target + " with " + permits.availablePermits()
+ " available permits>";
}
public static class FlowControlPermitMessage extends DistributionMessage
implements DataSerializableFixedID {
private int keeperId;
private FlowControlPermitMessage(int keeperId2) {
this.keeperId = keeperId2;
}
public FlowControlPermitMessage() {}
public static void send(DistributionManager dm, InternalDistributedMember recipient,
int keeperId) {
FlowControlPermitMessage message = new FlowControlPermitMessage(keeperId);
message.setRecipient(recipient);
dm.putOutgoing(message);
}
@Override
public int getProcessorType() {
return OperationExecutors.STANDARD_EXECUTOR;
}
@Override
public boolean getInlineProcess() {
return true;
}
@Override
protected void process(ClusterDistributionManager dm) {
InitialImageFlowControl control = (InitialImageFlowControl) keeper.retrieve(keeperId);
if (control != null) {
control.releasePermit();
}
}
@Override
public int getDSFID() {
return FLOW_CONTROL_PERMIT_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
keeperId = in.readInt();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(keeperId);
}
}
}