// blob: b419f52a7153a9ea8ff1df6b244f502a8e1059d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
/**
 * Base class for operations that send one function-execution request message to each target
 * member and forward the results that stream back to a {@link ResultSender}.
 *
 * <p>
 * Subclasses supply the concrete request message through
 * {@link #createRequestMessage(Set, FunctionStreamingResultCollector, boolean, boolean)}.
 *
 * <p>
 * This class performs no synchronization of its own.
 */
public abstract class StreamingFunctionOperation {

  /** Distributed system whose distribution manager is used to send the request messages. */
  protected final InternalDistributedSystem sys;

  /**
   * Members whose final replies are counted by {@link #processData}. Remains {@code null} unless
   * it is passed to the six-argument constructor or assigned afterwards.
   */
  protected Set recipients;

  /**
   * Collector returned as-is by {@link #getFunctionResultFrom} when there are no recipients, and
   * otherwise handed to the {@link FunctionStreamingResultCollector} created there.
   */
  protected ResultCollector rc;

  /** Function supplied at construction time; this class stores it but does not read it. */
  protected Function functionObject;

  /**
   * Map keyed by target member. This class only iterates its key set to choose the member each
   * request message is addressed to. Remains {@code null} unless it is passed to the six-argument
   * constructor or assigned afterwards.
   */
  protected HashMap<InternalDistributedMember, Object> memberArgs;

  /** Sender that receives every result passed to {@link #processData}. */
  protected ResultSender resultSender;

  /** Streaming collector created by the most recent {@link #getFunctionResultFrom} call. */
  protected ResultCollector reply;

  /** Number of {@link #processData} calls so far that were flagged as a last message. */
  protected int totalLastMsgReceived = 0;

  /**
   * Creates a streaming function operation with an explicit set of targets.
   *
   * @param sys distributed system used to send messages
   * @param rc collector for the function results
   * @param function function associated with this operation
   * @param memberArgs map whose keys are the members that receive a request message
   * @param recipients members whose last messages are counted to detect completion
   * @param resultSender sender that results are forwarded to
   */
  public StreamingFunctionOperation(InternalDistributedSystem sys, ResultCollector rc,
      Function function, final HashMap<InternalDistributedMember, Object> memberArgs,
      Set recipients, ResultSender resultSender) {
    this.sys = sys;
    this.rc = rc;
    this.functionObject = function;
    this.memberArgs = memberArgs;
    this.recipients = recipients;
    this.resultSender = resultSender;
  }

  /**
   * Creates a streaming function operation without targets: {@link #recipients} and
   * {@link #memberArgs} are left {@code null}.
   *
   * <p>
   * {@link #processData} and {@link #getFunctionResultFrom} dereference those fields, so they
   * must be assigned (or the methods overridden) before either is used. Presumably subclasses do
   * this; verify against the subclass in question.
   *
   * @param sys distributed system used to send messages
   * @param rc collector for the function results
   * @param function function associated with this operation
   * @param resultSender sender that results are forwarded to
   */
  public StreamingFunctionOperation(InternalDistributedSystem sys, ResultCollector rc,
      Function function, ResultSender resultSender) {
    this(sys, rc, function, null, null, resultSender);
  }

  /**
   * Forwards one streamed result to {@link #resultSender}.
   *
   * <p>
   * The operation counts as complete once the number of last messages seen equals the size of
   * {@link #recipients}. That comparison is made on every call, not only when {@code lastMsg} is
   * {@code true}.
   *
   * @param result the result to forward
   * @param lastMsg whether this is the final result from {@code memberID}
   * @param memberID the member the result came from
   */
  public void processData(Object result, boolean lastMsg, DistributedMember memberID) {
    if (lastMsg) {
      totalLastMsgReceived++;
    }
    final boolean allRepliesIn = totalLastMsgReceived == this.recipients.size();

    if (resultSender instanceof MemberFunctionResultSender) {
      // The member sender takes the completion flag and the reply collector directly.
      ((MemberFunctionResultSender) resultSender).lastResult(result, allRepliesIn, reply,
          memberID);
      return;
    }

    // Every other sender is treated as a region sender; the cast fails for any other type.
    final DistributedRegionFunctionResultSender regionSender =
        (DistributedRegionFunctionResultSender) resultSender;
    if (allRepliesIn) {
      regionSender.lastResult(result, memberID);
    } else {
      regionSender.sendResult(result, memberID);
    }
  }

  /**
   * Sends one request message to each key of {@link #memberArgs} and returns the collector that
   * gathers the streamed replies.
   *
   * @param recipients members passed to the streaming collector; when empty, nothing is sent
   * @param function function passed to the streaming collector
   * @param execution execution that supplies the re-execute and function-serialization flags
   * @return {@link #rc} when {@code recipients} is empty, otherwise the newly created
   *         {@link FunctionStreamingResultCollector}
   */
  public ResultCollector getFunctionResultFrom(Set recipients, Function function,
      AbstractExecution execution) {
    if (recipients.isEmpty()) {
      return rc;
    }

    final FunctionStreamingResultCollector collector =
        new FunctionStreamingResultCollector(this, sys, recipients, rc, function, execution);
    reply = collector;

    // Only these two executor kinds carry their re-execute flag into the request message.
    final boolean honoursReExecute = execution instanceof DistributedRegionFunctionExecutor
        || execution instanceof MultiRegionFunctionExecutor;

    for (InternalDistributedMember target : memberArgs.keySet()) {
      final boolean reExecute = honoursReExecute && execution.isReExecute();
      final DistributionMessage request = createRequestMessage(Collections.singleton(target),
          collector, reExecute, execution.isFnSerializationReqd());
      sys.getDistributionManager().putOutgoing(request);
    }
    return collector;
  }

  /**
   * Builds the request message sent to a single member.
   *
   * @param singleton one-element set holding the member the message is addressed to
   * @param processor collector that gathers the replies to the message
   * @param isReExecute re-execute flag to carry in the message
   * @param isFnSerializationReqd function-serialization flag to carry in the message
   * @return the message to hand to the distribution manager
   */
  protected abstract DistributionMessage createRequestMessage(
      Set<InternalDistributedMember> singleton, FunctionStreamingResultCollector processor,
      boolean isReExecute, boolean isFnSerializationReqd);
}