blob: 7e281e59a7e1ea3d0fe013ebc34380bc3d287236 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
* Partition supply message.
public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
/** An unique (per demander) rebalance id. */
private long rebalanceId;
/** Topology version. */
private AffinityTopologyVersion topVer;
/** Partitions that have been fully sent. */
@GridDirectMap(keyType = int.class, valueType = long.class)
private Map<Integer, Long> last;
/** Partitions which were not found. */
private Collection<Integer> missed;
/** Partitions for which we were able to get historical iterator. */
private Collection<Integer> clean;
/** Entries. */
@GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
private Map<Integer, CacheEntryInfoCollection> infos;
/** Message size. */
private int msgSize;
/** Estimated keys count. */
private long estimatedKeysCnt = -1;
/** Estimated keys count per cache in case the message is for shared group. */
@GridDirectMap(keyType = int.class, valueType = long.class)
private Map<Integer, Long> keysPerCache;
* @param rebalanceId Rebalance id.
* @param grpId Cache group ID.
* @param topVer Topology version.
* @param addDepInfo Deployment info flag.
long rebalanceId,
int grpId,
AffinityTopologyVersion topVer,
boolean addDepInfo
) {
this.grpId = grpId;
this.rebalanceId = rebalanceId;
this.topVer = topVer;
this.addDepInfo = addDepInfo;
* Empty constructor required for {@link Externalizable}.
public GridDhtPartitionSupplyMessage() {
// No-op.
/** {@inheritDoc} */
@Override public boolean ignoreClassErrors() {
return true;
* @return Rebalance id.
long rebalanceId() {
return rebalanceId;
* @return Topology version for which demand message is sent.
@Override public AffinityTopologyVersion topologyVersion() {
return topVer;
* @return Flag to indicate last message for partition.
Map<Integer, Long> last() {
return last == null ? Collections.<Integer, Long>emptyMap() : last;
* @param p Partition which was fully sent.
void last(int p, long cntr) {
if (last == null)
last = new HashMap<>();
if (last.put(p, cntr) == null) {
msgSize += 12;
// If partition is empty, we need to add it.
if (!infos().containsKey(p)) {
CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
infos().put(p, infoCol);
* @param p Partition to clean.
void clean(int p) {
if (clean == null)
clean = new HashSet<>();
if (clean.add(p))
msgSize += 4;
* @param p Partition to check.
* @return Check result.
boolean isClean(int p) {
return clean != null && clean.contains(p);
* @param p Missed partition.
void missed(int p) {
if (missed == null)
missed = new HashSet<>();
if (missed.add(p))
msgSize += 4;
* @return Missed partitions.
Collection<Integer> missed() {
return missed == null ? Collections.<Integer>emptySet() : missed;
* @return Entries.
Map<Integer, CacheEntryInfoCollection> infos() {
if (infos == null)
infos = new HashMap<>();
return infos;
* @return Message size.
int messageSize() {
return msgSize;
* @param p Partition.
* @param historical {@code True} if partition rebalancing using WAL history.
* @param info Entry to add.
* @param ctx Cache shared context.
* @param cacheObjCtx Cache object context.
* @throws IgniteCheckedException If failed.
void addEntry0(int p, boolean historical, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
assert info != null;
assert info.key() != null : info;
assert info.value() != null || historical : info;
// Need to call this method to initialize info properly.
marshalInfo(info, ctx, cacheObjCtx);
msgSize += info.marshalledSize(cacheObjCtx);
CacheEntryInfoCollection infoCol = infos().get(p);
if (infoCol == null) {
msgSize += 4;
infos().put(p, infoCol = new CacheEntryInfoCollection());
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
if (grp == null)
for (CacheEntryInfoCollection col : infos().values()) {
List<GridCacheEntryInfo> entries = col.infos();
for (int i = 0; i < entries.size(); i++)
entries.get(i).unmarshal(grp.cacheObjectContext(), ldr);
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
* @return Number of entries in message.
public int size() {
return infos().size();
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
if (!super.writeTo(buf, writer))
return false;
if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;
switch (writer.state()) {
case 4:
if (!writer.writeCollection("clean", clean, MessageCollectionItemType.INT))
return false;
case 5:
if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt))
return false;
case 6:
if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
case 7:
if (!writer.writeMap("keysPerCache", keysPerCache, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
return false;
case 8:
if (!writer.writeMap("last", last, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
return false;
case 9:
if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
return false;
case 10:
if (!writer.writeInt("msgSize", msgSize))
return false;
case 11:
if (!writer.writeAffinityTopologyVersion("topVer", topVer))
return false;
case 12:
// Keep 'updateSeq' name for compatibility.
if (!writer.writeLong("updateSeq", rebalanceId))
return false;
return true;
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
if (!reader.beforeMessageRead())
return false;
if (!super.readFrom(buf, reader))
return false;
switch (reader.state()) {
case 4:
clean = reader.readCollection("clean", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
case 5:
estimatedKeysCnt = reader.readLong("estimatedKeysCnt");
if (!reader.isLastRead())
return false;
case 6:
infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
case 7:
keysPerCache = reader.readMap("keysPerCache", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
if (!reader.isLastRead())
return false;
case 8:
last = reader.readMap("last", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
if (!reader.isLastRead())
return false;
case 9:
missed = reader.readCollection("missed", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
case 10:
msgSize = reader.readInt("msgSize");
if (!reader.isLastRead())
return false;
case 11:
topVer = reader.readAffinityTopologyVersion("topVer");
if (!reader.isLastRead())
return false;
case 12:
// Keep 'updateSeq' name for compatibility.
rebalanceId = reader.readLong("updateSeq");
if (!reader.isLastRead())
return false;
return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class);
/** {@inheritDoc} */
@Override public short directType() {
return 114;
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 13;
* @return Estimated keys count.
public long estimatedKeysCount() {
return -1;
* @param cnt Keys count to add.
public void addEstimatedKeysCount(long cnt) {
this.estimatedKeysCnt += cnt;
* @return Estimated keys count for a given cache ID.
public long keysForCache(int cacheId) {
return -1;
* @param cacheId Cache ID.
* @param cnt Keys count.
public void addKeysForCache(int cacheId, long cnt) {
assert cacheId != 0 && cnt >= 0;
if (keysPerCache == null)
keysPerCache = new HashMap<>();
Long cnt0 = keysPerCache.get(cacheId);
if (cnt0 == null) {
keysPerCache.put(cacheId, cnt);
msgSize += 12;
keysPerCache.put(cacheId, cnt0 + cnt);
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionSupplyMessage.class, this,
"size", size(),
"parts", infos().keySet(),
"super", super.toString());