blob: 757dd9740550ce4b87eed525af5463c5cbd86f10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalDataSerializer.SerializerAttributesHolder;
import org.apache.geode.internal.InternalInstantiator;
import org.apache.geode.internal.InternalInstantiator.InstantiatorAttributesHolder;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
/**
* A message that is sent to all other distribution manager when a distribution manager starts up.
*/
/*
 * The reply that a distribution manager sends to a member which announced itself during startup.
 *
 * Besides an optional rejection reason, the response carries the responder's network addresses,
 * distributed system id and redundancy zone, plus the class names and ids of the DataSerializers
 * and Instantiators known to the responder. The receiver registers those in
 * process(ClusterDistributionManager).
 */
public class StartupResponseMessage extends DistributionMessage
    implements AdminMessageType {
  private static final Logger logger = LogService.getLogger();

  /** Reason the responder rejected the new member; {@code null} when there is no rejection. */
  protected String rejectionMessage;

  /** Id used on the receiving side to look up the reply processor waiting for this response. */
  protected int processorId;

  /**
   * Flag supplied by the responder. When {@code true}, a non-rejecting response is not recorded as
   * an acceptance on the reply processor.
   */
  protected boolean responderIsAdmin;

  /** Addresses of the responder as returned by {@code StartupMessage.getMyAddresses}. */
  protected Set interfaces;

  /** Distributed system id reported by the responder's distribution manager. */
  protected int distributedSystemId;

  /** Redundancy zone reported by the responder's distribution manager for itself. */
  protected String redundancyZone;

  /**
   * To fix B39705, added the instance variables for storing instantiator information.
   **/
  protected int[] serializerIds = null;
  protected String[] serializerClasseNames = null;
  protected String[] instantiatorClasseNames = null;
  protected String[] instantiatedClasseNames = null;
  protected int[] instantiatorIds = null;

  /** Problems collected while deserializing; logged at debug level by {@code process}. */
  protected transient StringBuffer fromDataProblems;

  /**
   * Creates an empty message whose state is expected to be filled in by
   * {@link #fromData(DataInput)}.
   */
  public StartupResponseMessage() {}

  /**
   * Builds a response addressed to {@code recipient} and snapshots the local serializer and
   * instantiator registrations into it.
   *
   * @param dm the local distribution manager supplying addresses, system id and redundancy zone
   * @param processorId id of the reply processor on the recipient that awaits this response
   * @param recipient the member this response is sent to
   * @param rejectionMessage reason for rejecting the recipient, or {@code null} if not rejected
   * @param responderIsAdmin value to transmit in {@link #responderIsAdmin}
   */
  StartupResponseMessage(ClusterDistributionManager dm, int processorId,
      InternalDistributedMember recipient, String rejectionMessage, boolean responderIsAdmin) {
    setRecipient(recipient);
    setProcessorId(processorId);
    this.rejectionMessage = rejectionMessage;
    this.responderIsAdmin = responderIsAdmin;

    // Note that if we can't read our network addresses, the peer will reject us.
    this.interfaces = StartupMessage.getMyAddresses(dm);
    this.distributedSystemId = dm.getDistributedSystemId();
    this.redundancyZone = dm.getRedundancyZone(dm.getId());

    // Fix for B39705 / #43677: ship the instantiator registrations with the response. An entry is
    // either a live Instantiator or an InstantiatorAttributesHolder describing one by name.
    Object[] instantiators = InternalInstantiator.getInstantiatorsForSerialization();
    this.instantiatorClasseNames = new String[instantiators.length];
    this.instantiatedClasseNames = new String[instantiators.length];
    this.instantiatorIds = new int[instantiators.length];
    for (int i = 0; i < instantiators.length; i++) {
      if (instantiators[i] instanceof Instantiator) {
        Instantiator inst = (Instantiator) instantiators[i];
        this.instantiatorClasseNames[i] = inst.getClass().getName();
        this.instantiatedClasseNames[i] = inst.getInstantiatedClass().getName();
        this.instantiatorIds[i] = inst.getId();
      } else {
        InstantiatorAttributesHolder holder = (InstantiatorAttributesHolder) instantiators[i];
        this.instantiatorClasseNames[i] = holder.getInstantiatorClassName();
        this.instantiatedClasseNames[i] = holder.getInstantiatedClassName();
        this.instantiatorIds[i] = holder.getId();
      }
    }

    SerializerAttributesHolder[] sahs = InternalDataSerializer.getSerializersForDistribution();
    this.serializerIds = new int[sahs.length];
    this.serializerClasseNames = new String[sahs.length];
    for (int i = 0; i < sahs.length; i++) {
      this.serializerIds[i] = sahs[i].getId();
      this.serializerClasseNames[i] = sahs[i].getClassName();
    }
  }

  /**
   * Sets the processor id for this message.
   *
   * @param processorId id of the reply processor that should receive this response
   */
  public void setProcessorId(int processorId) {
    this.processorId = processorId;
  }

  /**
   * Reply messages are always processed in-line.
   *
   * @return always {@code true}
   */
  @Override
  public boolean getInlineProcess() {
    return true;
  }

  /**
   * @return {@link ClusterDistributionManager#WAITING_POOL_EXECUTOR}
   */
  @Override
  public int getProcessorType() {
    return ClusterDistributionManager.WAITING_POOL_EXECUTOR;
  }

  /**
   * @return always {@code true}
   */
  @Override
  public boolean sendViaUDP() {
    return true;
  }

  /**
   * Applies the responder's information to the local distribution manager and notifies the
   * waiting reply processor, if it still exists.
   *
   * <p>
   * This method is invoked on the receiver side. In order it: records the responder's addresses
   * (when any were sent), distributed system id and redundancy zone; registers the serializers
   * and instantiators named in the message; hands the (possibly {@code null}) rejection message
   * to the distribution manager; and finally updates and invokes the reply processor.
   *
   * @param dm the distribution manager of the member that received this response
   */
  @Override
  protected void process(ClusterDistributionManager dm) {
    // A response without addresses is tolerated; there is simply nothing to record.
    if (this.interfaces != null && !this.interfaces.isEmpty()) {
      dm.setEquivalentHosts(this.interfaces);
    }
    dm.setDistributedSystemId(this.distributedSystemId);
    dm.setRedundancyZone(getSender(), this.redundancyZone);

    // Log deserialization problems, if any were recorded.
    if (this.fromDataProblems != null && logger.isDebugEnabled()) {
      logger.debug(this.fromDataProblems);
    }

    if (this.serializerIds != null) {
      for (int i = 0; i < serializerIds.length; i++) {
        String cName = this.serializerClasseNames[i];
        if (cName != null) {
          InternalDataSerializer.register(cName, false, null, null, serializerIds[i]);
        }
      }
    }

    if (this.instantiatorIds != null) {
      // Process the Instantiator registrations; entries missing either class name are skipped.
      for (int i = 0; i < instantiatorIds.length; i++) {
        String instantiatorClassName = instantiatorClasseNames[i];
        String instantiatedClassName = instantiatedClasseNames[i];
        int id = instantiatorIds[i];
        if ((instantiatorClassName != null) && (instantiatedClassName != null)) {
          InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id, false);
        }
      }
    }

    dm.processStartupResponse(this.sender, this.rejectionMessage);

    StartupMessageReplyProcessor proc =
        (StartupMessageReplyProcessor) ReplyProcessor21.getProcessor(processorId);
    if (proc == null) {
      // Nobody is waiting for this response any more.
      return;
    }
    if (this.rejectionMessage != null) {
      // there's no reason to wait for other responses
      proc.setReceivedRejectionMessage(true);
    } else if (!this.responderIsAdmin) {
      proc.setReceivedAcceptance(true);
    }
    proc.process(this);
    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "{} Processed {}", proc, this);
    }
  }

  @Override
  public int getDSFID() {
    return STARTUP_RESPONSE_MESSAGE;
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }

  /**
   * Writes this message to {@code out}.
   *
   * <p>
   * Layout after the superclass data: processor id, rejection message, admin flag, serializer
   * count followed by (class name, id) pairs, instantiator count followed by (instantiator class
   * name, instantiated class name, id) triples, the address set, the distributed system id and
   * the redundancy zone. A message whose registration arrays were never populated is written with
   * zero counts.
   *
   * @param out the stream to write to
   * @throws IOException if writing to {@code out} fails
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    super.toData(out);
    out.writeInt(processorId);
    DataSerializer.writeString(this.rejectionMessage, out);
    out.writeBoolean(this.responderIsAdmin);

    // Send a description of all of the DataSerializers and
    // Instantiators that have been registered
    int serializerCount = (this.serializerIds == null) ? 0 : this.serializerIds.length;
    out.writeInt(serializerCount);
    for (int i = 0; i < serializerCount; i++) {
      DataSerializer.writeNonPrimitiveClassName(serializerClasseNames[i], out);
      out.writeInt(serializerIds[i]);
    }

    int instantiatorCount = (this.instantiatorIds == null) ? 0 : this.instantiatorIds.length;
    out.writeInt(instantiatorCount);
    for (int i = 0; i < instantiatorCount; i++) {
      DataSerializer.writeNonPrimitiveClassName(this.instantiatorClasseNames[i], out);
      DataSerializer.writeNonPrimitiveClassName(this.instantiatedClasseNames[i], out);
      out.writeInt(this.instantiatorIds[i]);
    }

    DataSerializer.writeObject(interfaces, out);
    out.writeInt(distributedSystemId);
    DataSerializer.writeString(redundancyZone, out);
  }

  /**
   * Reads this message from {@code in}, in the order written by {@link #toData(DataOutput)}.
   *
   * @param in the stream to read from
   * @throws IOException if reading from {@code in} fails
   * @throws ClassNotFoundException if a class needed during deserialization cannot be found
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    super.fromData(in);
    this.processorId = in.readInt();
    this.rejectionMessage = DataSerializer.readString(in);
    this.responderIsAdmin = in.readBoolean();

    int serializerCount = in.readInt();
    this.serializerClasseNames = new String[serializerCount];
    this.serializerIds = new int[serializerCount];
    for (int i = 0; i < serializerCount; i++) {
      // Read sequentially: if the name cannot be read, let that failure propagate unchanged
      // rather than attempting a further read that could replace it with a second exception.
      serializerClasseNames[i] = DataSerializer.readNonPrimitiveClassName(in);
      serializerIds[i] = in.readInt();
    }

    // Fix for B39705 : Deserialize the instantiators in the field variables.
    int instantiatorCount = in.readInt();
    instantiatorClasseNames = new String[instantiatorCount];
    instantiatedClasseNames = new String[instantiatorCount];
    instantiatorIds = new int[instantiatorCount];
    for (int i = 0; i < instantiatorCount; i++) {
      instantiatorClasseNames[i] = DataSerializer.readNonPrimitiveClassName(in);
      instantiatedClasseNames[i] = DataSerializer.readNonPrimitiveClassName(in);
      instantiatorIds[i] = in.readInt();
    }

    interfaces = (Set) DataSerializer.readObject(in);
    distributedSystemId = in.readInt();
    redundancyZone = DataSerializer.readString(in);
  }

  @Override
  public String toString() {
    return "StartupResponse: rejectionMessage=" + this.rejectionMessage + " processor="
        + processorId + " responderIsAdmin=" + this.responderIsAdmin + " distributed system id = "
        + this.distributedSystemId;
  }
}