blob: e9e462d84133aca93e3ca3b49095ce59d4e471a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp.internal;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
/**
* Carries discovery data in marshalled form
* and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects.
*/
public class DiscoveryDataPacket implements Serializable {
/** Local file header signature(read as a little-endian number). */
private static int ZIP_HEADER_SIGNATURE = 0x04034b50;
/** */
private static final long serialVersionUID = 0L;
/** */
private final UUID joiningNodeId;
/** */
private Map<Integer, byte[]> joiningNodeData = new HashMap<>();
/** */
private transient Map<Integer, Serializable> unmarshalledJoiningNodeData;
/** */
private Map<Integer, byte[]> commonData = new HashMap<>();
/** */
private Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new LinkedHashMap<>();
/** */
private transient boolean joiningNodeClient;
/**
* @param joiningNodeId Joining node id.
*/
public DiscoveryDataPacket(UUID joiningNodeId) {
this.joiningNodeId = joiningNodeId;
}
/**
*
*/
public UUID joiningNodeId() {
return joiningNodeId;
}
/**
* @param bag Bag.
* @param nodeId Node id.
* @param marsh Marsh.
* @param log Logger.
*/
public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, boolean isCompressionEnabled,
int compressionLevel, IgniteLogger log) {
marshalData(bag.commonData(), commonData, marsh, isCompressionEnabled, compressionLevel, log);
Map<Integer, Serializable> locNodeSpecificData = bag.localNodeSpecificData();
if (locNodeSpecificData != null) {
Map<Integer, byte[]> marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size());
marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, isCompressionEnabled, compressionLevel, log);
filterDuplicatedData(marshLocNodeSpecificData);
if (!marshLocNodeSpecificData.isEmpty())
nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
}
}
/**
* @param bag Bag.
* @param marsh Marsh.
* @param log Logger.
*/
public void marshalJoiningNodeData(DiscoveryDataBag bag, Marshaller marsh, boolean isCompressionEnabled,
int compressionLevel, IgniteLogger log) {
marshalData(bag.joiningNodeData(), joiningNodeData, marsh, isCompressionEnabled, compressionLevel, log);
}
/**
* @return {@code true} if joining node data was transferred via network in zipped format.
*/
public boolean isJoiningDataZipped() {
for (Map.Entry<Integer, byte[]> entry : joiningNodeData.entrySet()) {
if (isZipped(entry.getValue()))
return true;
}
return false;
}
/**
* @param marsh Marsh.
* @param clsLdr Class loader.
* @param clientNode Client node.
* @param log Logger.
*/
public DiscoveryDataBag unmarshalGridData(
Marshaller marsh,
ClassLoader clsLdr,
boolean clientNode,
IgniteLogger log
) throws IgniteCheckedException {
DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient);
if (commonData != null && !commonData.isEmpty())
dataBag.commonData(unmarshalData(commonData, marsh, clsLdr, clientNode, log, true));
if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size());
for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry : nodeSpecificData.entrySet()) {
Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
if (nodeBinData == null || nodeBinData.isEmpty())
continue;
unmarshNodeSpecData.put(
nodeBinEntry.getKey(),
unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log, true)
);
}
dataBag.nodeSpecificData(unmarshNodeSpecData);
}
return dataBag;
}
/**
* @param marsh Marsh.
* @param clsLdr Class loader.
* @param clientNode Client node.
* @param log Logger.
* @throws IgniteCheckedException If unmarshalling failed.
*/
public DiscoveryDataBag unmarshalJoiningNodeData(
Marshaller marsh,
ClassLoader clsLdr,
boolean clientNode,
IgniteLogger log
) throws IgniteCheckedException {
return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, true);
}
/**
* @param marsh Marsh.
* @param clsLdr Class loader.
* @param clientNode Client node.
* @param log Logger.
*/
public DiscoveryDataBag unmarshalJoiningNodeDataSilently(
Marshaller marsh,
ClassLoader clsLdr,
boolean clientNode,
IgniteLogger log
) {
try {
return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, false);
}
catch (IgniteCheckedException impossible) {
assert false : impossible;
log.error("Failed to unmarshal joining node data", impossible);
throw new IgniteException(impossible);
}
}
/**
* @param marsh Marsh.
* @param clsLdr Class loader.
* @param clientNode Client node.
* @param log Logger.
* @param panic Throw unmarshalling if {@code true}.
* @throws IgniteCheckedException If {@code panic} is {@code true} and unmarshalling failed.
*/
private DiscoveryDataBag unmarshalJoiningNodeData(
Marshaller marsh,
ClassLoader clsLdr,
boolean clientNode,
IgniteLogger log,
boolean panic
) throws IgniteCheckedException {
DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient);
if (joiningNodeData != null && !joiningNodeData.isEmpty()) {
unmarshalledJoiningNodeData = unmarshalData(joiningNodeData, marsh, clsLdr, clientNode, log, panic);
dataBag.joiningNodeData(unmarshalledJoiningNodeData);
}
return dataBag;
}
/**
*
*/
public boolean hasJoiningNodeData() {
return joiningNodeData != null && !joiningNodeData.isEmpty();
}
/**
* @param nodeId Node id.
*/
public boolean hasDataFromNode(UUID nodeId) {
return nodeSpecificData.containsKey(nodeId);
}
/**
* @param existingDataPacket Existing data packet.
* @param mrgdCmnDataKeys Mrgd cmn data keys.
* @param mrgdSpecifDataKeys Mrgd specif data keys.
*/
public boolean mergeDataFrom(
DiscoveryDataPacket existingDataPacket,
Collection<Integer> mrgdCmnDataKeys,
Collection<UUID> mrgdSpecifDataKeys
) {
if (commonData.size() != mrgdCmnDataKeys.size()) {
for (Map.Entry<Integer, byte[]> e : commonData.entrySet()) {
if (!mrgdCmnDataKeys.contains(e.getKey())) {
byte[] data = existingDataPacket.commonData.get(e.getKey());
if (data != null && Arrays.equals(e.getValue(), data)) {
e.setValue(data);
boolean add = mrgdCmnDataKeys.add(e.getKey());
assert add;
if (mrgdCmnDataKeys.size() == commonData.size())
break;
}
}
}
}
if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
for (Map.Entry<UUID, Map<Integer, byte[]>> e : nodeSpecificData.entrySet()) {
if (!mrgdSpecifDataKeys.contains(e.getKey())) {
Map<Integer, byte[]> data = existingDataPacket.nodeSpecificData.get(e.getKey());
if (data != null && mapsEqual(e.getValue(), data)) {
e.setValue(data);
boolean add = mrgdSpecifDataKeys.add(e.getKey());
assert add;
if (mrgdSpecifDataKeys.size() == nodeSpecificData.size())
break;
}
}
}
}
return (mrgdCmnDataKeys.size() == commonData.size()) && (mrgdSpecifDataKeys.size() == nodeSpecificData.size());
}
/**
* @param m1 first map to compare.
* @param m2 second map to compare.
*/
private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
if (m1 == m2)
return true;
if (m1.size() == m2.size()) {
for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
byte[] data = m2.get(e.getKey());
if (!Arrays.equals(e.getValue(), data))
return false;
}
return true;
}
return false;
}
/**
* @param src Source.
* @param marsh Marsh.
* @param clsLdr Class loader.
* @param clientNode Client node.
* @param log Logger.
* @param panic Throw unmarshalling if {@code true}.
* @throws IgniteCheckedException If {@code panic} is {@true} and unmarshalling failed.
*/
private Map<Integer, Serializable> unmarshalData(
Map<Integer, byte[]> src,
Marshaller marsh,
ClassLoader clsLdr,
boolean clientNode,
IgniteLogger log,
boolean panic
) throws IgniteCheckedException {
Map<Integer, Serializable> res = U.newHashMap(src.size());
for (Map.Entry<Integer, byte[]> binEntry : src.entrySet()) {
try {
Serializable compData = isZipped(binEntry.getValue()) ?
U.unmarshalZip(marsh, binEntry.getValue(), clsLdr) :
U.unmarshal(marsh, binEntry.getValue(), clsLdr);
res.put(binEntry.getKey(), compData);
}
catch (IgniteCheckedException e) {
if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() &&
X.hasCause(e, ClassNotFoundException.class) && clientNode
) {
U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
continue;
}
else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.length) {
U.error(log,
"Failed to unmarshal discovery data for component: " +
GridComponent.DiscoveryDataExchangeType.VALUES[binEntry.getKey()],
e
);
}
else {
U.warn(log, "Failed to unmarshal discovery data." +
" Component " + binEntry.getKey() + " is not found.");
}
if (panic)
throw e;
}
}
return res;
}
/**
* @param value Value to check.
* @return {@code true} if value is zipped.
*/
private boolean isZipped(byte[] value) {
return value != null && value.length > 3 && makeInt(value) == ZIP_HEADER_SIGNATURE;
}
/**
* Make int from first 4 bytes in little-endian byte order.
*
* @param b Source of bytes.
* @return Made int.
*/
private static int makeInt(byte[] b) {
return (((b[3]) << 24) |
((b[2] & 0xff) << 16) |
((b[1] & 0xff) << 8) |
((b[0] & 0xff)));
}
/**
* @param src Source.
* @param target Target.
* @param marsh Marsh.
* @param log Logger.
*/
private void marshalData(
Map<Integer, Serializable> src,
Map<Integer, byte[]> target,
Marshaller marsh,
boolean isCompressionEnabled,
int compressionLevel,
IgniteLogger log
) {
//may happen if nothing was collected from components,
// corresponding map (for common data or for node specific data) left null
if (src == null)
return;
for (Map.Entry<Integer, Serializable> entry : src.entrySet()) {
try {
target.put(entry.getKey(), isCompressionEnabled ?
U.zip(U.marshal(marsh, entry.getValue()), compressionLevel) :
U.marshal(marsh, entry.getValue()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery data " +
"[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
}
}
}
/**
* @param log Logger.
*/
public void unzipData(IgniteLogger log) {
for (Map.Entry<Integer, byte[]> entry : joiningNodeData.entrySet()) {
try {
entry.setValue(U.unzip(entry.getValue()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unzip discovery data " +
"[comp=" + entry.getKey() + ']', e);
}
}
}
/**
* TODO https://issues.apache.org/jira/browse/IGNITE-4435
*/
private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, byte[]> discoDataEntry = it.next();
byte[] curData = existingData.get(discoDataEntry.getKey());
if (Arrays.equals(curData, discoDataEntry.getValue()))
it.remove();
}
if (discoData.isEmpty())
break;
}
}
/**
* Returns {@link DiscoveryDataBag} aware of components with already initialized common data
* (e.g. on nodes prior in cluster to the one where this method is called).
*/
public DiscoveryDataBag bagForDataCollection() {
DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, commonData.keySet(), joiningNodeClient);
if (unmarshalledJoiningNodeData != null)
dataBag.joiningNodeData(unmarshalledJoiningNodeData);
return dataBag;
}
/**
* @param joiningNodeClient Joining node is client flag.
*/
public void joiningNodeClient(boolean joiningNodeClient) {
this.joiningNodeClient = joiningNodeClient;
}
/**
* Clears {@link #unmarshalledJoiningNodeData}
*/
public void clearUnmarshalledJoiningNodeData() {
unmarshalledJoiningNodeData = null;
}
}