// blob: b7d8c54d9667f6b737230a5a2e4441e944d224ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.event.EventSequenceNumberHolder;
import org.apache.geode.internal.cache.ha.HARegionQueue.DispatchedAndCurrentEvents;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
/**
 * A helper class for serializing and deserializing the event state map.
 *
 * <p>
 * Entries are grouped by the membership id of their {@link ThreadIdentifier} key so that each
 * distinct membership id is written to the stream once and then referenced by index.
 *
 * TODO - Store the event state map in DataSerializable object that keeps the map in this compressed
 * format in memory.
 *
 */
public class EventStateHelper {
  /**
   * Writes {@code eventState} to {@code dop}.
   *
   * <p>
   * Stream layout, in write order:
   * <ol>
   * <li>the essential data of {@code myId}</li>
   * <li>an int count of distinct membership ids, followed by each id as a byte array</li>
   * <li>an int count of membership id groups; each group is the int index of its membership id in
   * the list above, an int count of thread entries, and for every entry the long thread id
   * followed by the serialized value</li>
   * </ol>
   *
   * <p>
   * Post 7.1, if changes are made to this method make sure that it is backwards compatible by
   * creating toDataPreXX methods. Also make sure that the callers to this method are backwards
   * compatible by creating toDataPreXX methods for them even if they are not changed. <br>
   * Callers for this method are implementations of: <br>
   * {@link DataSerializableFixedID#toData(DataOutput, org.apache.geode.internal.serialization.SerializationContext)}
   * <br>
   *
   * @param dop the output the event state is written to
   * @param eventState map keyed by {@link ThreadIdentifier}; every value is cast to
   *        {@link DispatchedAndCurrentEvents} when {@code isHARegion} is true and to
   *        {@link EventSequenceNumberHolder} otherwise
   * @param isHARegion selects which value type the entries of {@code eventState} are cast to
   * @param myId the memberId that is serializing
   * @throws IOException if writing to {@code dop} fails
   */
  @SuppressWarnings("synthetic-access")
  public static void dataSerialize(DataOutput dop, Map eventState, boolean isHARegion,
      InternalDistributedMember myId) throws IOException {
    // For HARegionQueues, the event state map uses different value types
    // than a regular region :(
    Map<EventStateMemberIdentifier, Map<ThreadIdentifier, Object>> groupedThreadIds =
        groupThreadIds(eventState);

    // Give every distinct membership id a dense index in iteration order. The keys of a map are
    // already unique, so no duplicate check is required here.
    List<EventStateMemberIdentifier> orderedIds = new LinkedList<EventStateMemberIdentifier>();
    Map<EventStateMemberIdentifier, Integer> idToIndex =
        new HashMap<EventStateMemberIdentifier, Integer>();
    for (EventStateMemberIdentifier memberId : groupedThreadIds.keySet()) {
      idToIndex.put(memberId, idToIndex.size());
      orderedIds.add(memberId);
    }

    myId.writeEssentialData(dop); // added in 7.0 for version tag processing in fromData

    // Membership id table: the position in this list is the index used by the groups below.
    dop.writeInt(orderedIds.size());
    for (EventStateMemberIdentifier memberId : orderedIds) {
      DataSerializer.writeByteArray(memberId.bytes, dop);
    }

    dop.writeInt(groupedThreadIds.size());
    for (Map.Entry<EventStateMemberIdentifier, Map<ThreadIdentifier, Object>> group : groupedThreadIds
        .entrySet()) {
      dop.writeInt(idToIndex.get(group.getKey()));
      Map<ThreadIdentifier, Object> threadIdMap = group.getValue();
      dop.writeInt(threadIdMap.size());
      for (Map.Entry<ThreadIdentifier, Object> threadEntry : threadIdMap.entrySet()) {
        dop.writeLong(threadEntry.getKey().getThreadID());
        // The casts are kept so a value of the wrong type still fails with ClassCastException.
        if (isHARegion) {
          DispatchedAndCurrentEvents value = (DispatchedAndCurrentEvents) threadEntry.getValue();
          InternalDataSerializer.invokeToData(value, dop);
        } else {
          EventSequenceNumberHolder value = (EventSequenceNumberHolder) threadEntry.getValue();
          InternalDataSerializer.invokeToData(value, dop);
        }
      }
    }
  }

  /**
   * Reads an event state map in the layout written by
   * {@link #dataSerialize(DataOutput, Map, boolean, InternalDistributedMember)}.
   *
   * <p>
   * For regular (non HA) regions, every value that carries a non-null version tag has
   * {@code replaceNullIDs} invoked on that tag with the sender id read from the head of the stream.
   *
   * <p>
   * Post 7.1, if changes are made to this method make sure that it is backwards compatible by
   * creating fromDataPreXX methods. Also make sure that the callers to this method are backwards
   * compatible by creating fromDataPreXX methods for them even if they are not changed. <br>
   * Callers for this method are implementations of: <br>
   * {@link DataSerializableFixedID#fromData(DataInput, org.apache.geode.internal.serialization.DeserializationContext)}
   * <br>
   *
   * @param dip the input to read from
   * @param isHARegion when true the values are read as {@link DispatchedAndCurrentEvents},
   *        otherwise as {@link EventSequenceNumberHolder}
   * @return a new {@link HashMap} from {@link ThreadIdentifier} to the deserialized value
   * @throws IOException if reading fails, or if a group refers to a membership id index that was
   *         not present in the membership id table of the stream
   * @throws ClassNotFoundException if thrown while reading the sender id or a value
   */
  public static Map deDataSerialize(DataInput dip, boolean isHARegion)
      throws IOException, ClassNotFoundException {
    InternalDistributedMember senderId = InternalDistributedMember.readEssentialData(dip);

    // Membership id table: index -> membership id bytes (a stored byte array may itself be null).
    int numIds = dip.readInt();
    Map<Integer, byte[]> numberToMember = new HashMap<Integer, byte[]>();
    for (int i = 0; i < numIds; i++) {
      numberToMember.put(i, DataSerializer.readByteArray(dip));
    }

    int size = dip.readInt();
    Map<ThreadIdentifier, Object> eventState = new HashMap<ThreadIdentifier, Object>(size);
    for (int i = 0; i < size; i++) {
      int idNumber = dip.readInt();
      // An index with no table entry means the stream is malformed; fail here instead of silently
      // creating ThreadIdentifiers with a null membership id.
      if (!numberToMember.containsKey(idNumber)) {
        throw new IOException("Event state refers to membership id index " + idNumber
            + " but only " + numberToMember.size() + " membership ids were read");
      }
      byte[] membershipId = numberToMember.get(idNumber);
      int subMapSize = dip.readInt();
      for (int j = 0; j < subMapSize; j++) {
        long threadId = dip.readLong();
        ThreadIdentifier key = new ThreadIdentifier(membershipId, threadId);
        if (isHARegion) {
          DispatchedAndCurrentEvents value = new DispatchedAndCurrentEvents();
          InternalDataSerializer.invokeFromData(value, dip);
          eventState.put(key, value);
        } else {
          EventSequenceNumberHolder value = new EventSequenceNumberHolder();
          InternalDataSerializer.invokeFromData(value, dip);
          eventState.put(key, value);
          if (value.getVersionTag() != null) {
            value.getVersionTag().replaceNullIDs(senderId);
          }
        }
      }
    }
    return eventState;
  }

  /**
   * Groups the entries of {@code eventState} by the membership id of their key.
   *
   * @param eventState map whose keys are {@link ThreadIdentifier}s
   * @return a new map from membership id to the thread entries that share that membership id
   */
  private static Map<EventStateMemberIdentifier, Map<ThreadIdentifier, Object>> groupThreadIds(
      Map eventState) {
    Map<EventStateMemberIdentifier, Map<ThreadIdentifier, Object>> results =
        new HashMap<EventStateMemberIdentifier, Map<ThreadIdentifier, Object>>();
    // View the raw map through wildcards so the iteration below needs no unchecked operations.
    Map<?, ?> source = eventState;
    for (Map.Entry<?, ?> entry : source.entrySet()) {
      ThreadIdentifier key = (ThreadIdentifier) entry.getKey();
      EventStateMemberIdentifier memberId = new EventStateMemberIdentifier(key.getMembershipID());
      Map<ThreadIdentifier, Object> subMap = results.get(memberId);
      if (subMap == null) {
        subMap = new HashMap<ThreadIdentifier, Object>();
        results.put(memberId, subMap);
      }
      subMap.put(key, entry.getValue());
    }
    return results;
  }

  /**
   * Map key wrapping a membership id byte array so that equality and hashing are based on the
   * array contents rather than on array identity.
   */
  private static final class EventStateMemberIdentifier {
    private final byte[] bytes;

    public EventStateMemberIdentifier(byte[] bytes) {
      this.bytes = bytes;
    }

    @Override
    public int hashCode() {
      // Same value as the original "prime * 1 + Arrays.hashCode(bytes)" computation.
      return 31 + Arrays.hashCode(bytes);
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof EventStateMemberIdentifier)) {
        return false;
      }
      return Arrays.equals(bytes, ((EventStateMemberIdentifier) obj).bytes);
    }
  }
}