blob: 6469a8985e830c6715444c6c9b3e89eb5977db1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.opt.join;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Context for distributed joins.
*/
public class DistributedJoinContext {
/** */
private final AffinityTopologyVersion topVer;
/** */
private final Map<UUID, int[]> partsMap;
/** */
private final UUID originNodeId;
/** */
private final long qryId;
/** */
private final int segment;
/** */
private final int pageSize;
/** Range streams for indexes. */
private Map<Integer, Object> streams;
/** Range sources for indexes. */
private Map<SourceKey, Object> sources;
/** */
private int batchLookupIdGen;
/** */
private UUID[] partsNodes;
/** */
private volatile boolean cancelled;
/**
* Constructor.
*
* @param topVer Topology version.
* @param partsMap Partitions map.
* @param originNodeId ID of the node started the query.
* @param qryId Query ID.
* @param segment Segment.
* @param pageSize Pahe size.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public DistributedJoinContext(
AffinityTopologyVersion topVer,
Map<UUID, int[]> partsMap,
UUID originNodeId,
long qryId,
int segment,
int pageSize
) {
this.topVer = topVer;
this.partsMap = partsMap;
this.originNodeId = originNodeId;
this.qryId = qryId;
this.segment = segment;
this.pageSize = pageSize;
}
/**
* @return Affinity topology version.
*/
public AffinityTopologyVersion topologyVersion() {
return topVer;
}
/**
* @return Partitions map.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public Map<UUID,int[]> partitionsMap() {
return partsMap;
}
/**
* @return Origin node ID.
*/
public UUID originNodeId() {
return originNodeId;
}
/**
* @return Query request ID.
*/
public long queryId() {
return qryId;
}
/**
* @return index segment ID.
*/
public int segment() {
return segment;
}
/**
* @return Page size.
*/
public int pageSize() {
return pageSize;
}
/**
* @param p Partition.
* @param cctx Cache context.
* @return Owning node ID.
*/
public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
UUID[] nodeIds = partsNodes;
if (nodeIds == null) {
assert partsMap != null;
nodeIds = new UUID[cctx.affinity().partitions()];
for (Map.Entry<UUID, int[]> e : partsMap.entrySet()) {
UUID nodeId = e.getKey();
int[] nodeParts = e.getValue();
assert nodeId != null;
assert !F.isEmpty(nodeParts);
for (int part : nodeParts) {
assert nodeIds[part] == null;
nodeIds[part] = nodeId;
}
}
partsNodes = nodeIds;
}
return nodeIds[p];
}
/**
* @param batchLookupId Batch lookup ID.
* @param streams Range streams.
*/
public synchronized void putStreams(int batchLookupId, Object streams) {
if (this.streams == null) {
if (streams == null)
return;
this.streams = new HashMap<>();
}
if (streams == null)
this.streams.remove(batchLookupId);
else
this.streams.put(batchLookupId, streams);
}
/**
* @param batchLookupId Batch lookup ID.
* @return Range streams.
*/
@SuppressWarnings("unchecked")
public synchronized <T> T getStreams(int batchLookupId) {
if (streams == null)
return null;
return (T)streams.get(batchLookupId);
}
/**
* @param ownerId Owner node ID.
* @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
* @param src Range source.
*/
public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
if (src != null) {
if (sources == null)
sources = new HashMap<>();
sources.put(srcKey, src);
}
else if (sources != null)
sources.remove(srcKey);
}
/**
* @param ownerId Owner node ID.
* @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
* @return Range source.
*/
@SuppressWarnings("unchecked")
public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
if (sources == null)
return null;
return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
}
/**
* @return Next batch ID.
*/
public int nextBatchLookupId() {
return ++batchLookupIdGen;
}
/**
* @return Cleared flag.
*/
public boolean isCancelled() {
return cancelled;
}
/**
* Mark as cleared.
*/
public void cancel() {
cancelled = true;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DistributedJoinContext.class, this);
}
}