blob: 402ce0b5efec5c07fe60c055661457c6c9fdc037 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.cache.affinity;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.IgniteInstanceResource;
/**
 * Platform AffinityFunction.
 * <p>
 * Every {@link AffinityFunction} call is either delegated to the optional base function or forwarded
 * to the platform through the platform gateway, depending on the override flags.
 */
public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware {
    /** */
    private static final long serialVersionUID = 0L;

    /** Override flag: when set, {@link #partition(Object)} is forwarded to the platform. */
    private static final byte FLAG_PARTITION = 1;

    /** Override flag: when set, {@link #removeNode(UUID)} is forwarded to the platform. */
    private static final byte FLAG_REMOVE_NODE = 1 << 1;

    /** Override flag: when set, {@link #assignPartitions(AffinityFunctionContext)} is forwarded to the platform. */
    private static final byte FLAG_ASSIGN_PARTITIONS = 1 << 2;

    /** User function object; {@code null} when there is nothing overridden. */
    private Object userFunc;

    /**
     * Partition count.
     *
     * 1) Java calls partitions() method very early (before LifecycleAware.start) during CacheConfiguration validation.
     * 2) Partition count never changes.
     * Therefore, we get the value on .NET side once, and pass it along with PlatformAffinity.
     */
    private int partitions;

    /** Base function that handles the calls which are not overridden; may be {@code null}. */
    private AffinityFunction baseFunc;

    /** Bit mask of {@code FLAG_*} values selecting the calls that are forwarded to the platform. */
    private byte overrideFlags;

    /** Injected Ignite instance. */
    private transient Ignite ignite;

    /** Platform context, resolved in {@link #start()}. */
    private transient PlatformContext ctx;

    /** Handle returned by the platform on init; {@code 0} while the function is not started. */
    private transient long ptr;

    /** Target wrapping the base function; {@code null} when there is no base function. */
    private transient PlatformAffinityFunctionTarget baseTarget;

    /**
     * Ctor for serialization.
     *
     */
    public PlatformAffinityFunction() {
        partitions = -1;
    }

    /**
     * Ctor.
     *
     * @param func User func object.
     * @param partitions Number of partitions.
     * @param overrideFlags Bit mask of override flags selecting the calls that are forwarded to the platform.
     * @param baseFunc Base function for the calls that are not overridden.
     */
    public PlatformAffinityFunction(Object func, int partitions, byte overrideFlags, AffinityFunction baseFunc) {
        userFunc = func;
        this.partitions = partitions;
        this.overrideFlags = overrideFlags;
        this.baseFunc = baseFunc;
    }

    /**
     * Gets the user func object.
     *
     * @return User func object.
     */
    public Object getUserFunc() {
        return userFunc;
    }

    /**
     * Gets the base func.
     *
     * @return Base func.
     */
    public AffinityFunction getBaseFunc() {
        return baseFunc;
    }

    /**
     * Gets the override flags.
     *
     * @return The override flags.
     */
    public byte getOverrideFlags() {
        return overrideFlags;
    }

    /** {@inheritDoc} */
    @Override public void reset() {
        // userFunc is always in initial state (it is serialized only once on start).
        if (baseFunc != null)
            baseFunc.reset();
    }

    /** {@inheritDoc} */
    @Override public int partitions() {
        // Affinity function can not return different number of partitions,
        // so we pass this value once from the platform.
        assert partitions > 0;

        return partitions;
    }

    /** {@inheritDoc} */
    @Override public int partition(Object key) {
        if ((overrideFlags & FLAG_PARTITION) == 0) {
            assert baseFunc != null;

            return baseFunc.partition(key);
        }

        assert ctx != null;
        assert ptr != 0;

        try (PlatformMemory mem = ctx.memory().allocate()) {
            PlatformOutputStream out = mem.output();
            BinaryRawWriterEx writer = ctx.writer(out);

            // The platform handle goes first, followed by the key.
            writer.writeLong(ptr);
            writer.writeObject(key);

            out.synchronize();

            return ctx.gateway().affinityFunctionPartition(mem.pointer());
        }
    }

    /** {@inheritDoc} */
    @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
        if ((overrideFlags & FLAG_ASSIGN_PARTITIONS) == 0) {
            assert baseFunc != null;

            return baseFunc.assignPartitions(affCtx);
        }

        assert ctx != null;
        assert ptr != 0;
        assert affCtx != null;

        try (PlatformMemory mem = ctx.memory().allocate()) {
            PlatformOutputStream out = mem.output();
            BinaryRawWriterEx writer = ctx.writer(out);

            writer.writeLong(ptr);

            // Write previous assignment
            PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx);

            out.synchronize();

            // Call platform
            // We can not restore original AffinityFunctionContext after the call to platform,
            // due to DiscoveryEvent (when node leaves, we can't get it by id anymore).
            // Secondly, AffinityFunctionContext can't be changed by the user.
            if (baseTarget != null)
                baseTarget.setCurrentAffinityFunctionContext(affCtx);

            try {
                ctx.gateway().affinityFunctionAssignPartitions(mem.pointer());
            }
            finally {
                // Always clear the context on the base target, even if the platform call fails.
                if (baseTarget != null)
                    baseTarget.setCurrentAffinityFunctionContext(null);
            }

            // Read result
            return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(mem), ctx);
        }
    }

    /** {@inheritDoc} */
    @Override public void removeNode(UUID nodeId) {
        if ((overrideFlags & FLAG_REMOVE_NODE) == 0) {
            assert baseFunc != null;

            baseFunc.removeNode(nodeId);

            return;
        }

        assert ctx != null;
        assert ptr != 0;

        try (PlatformMemory mem = ctx.memory().allocate()) {
            PlatformOutputStream out = mem.output();
            BinaryRawWriterEx writer = ctx.writer(out);

            // The platform handle goes first, followed by the node ID.
            writer.writeLong(ptr);
            writer.writeUuid(nodeId);

            out.synchronize();

            ctx.gateway().affinityFunctionRemoveNode(mem.pointer());
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // Field order must match readExternal.
        out.writeObject(userFunc);
        out.writeInt(partitions);
        out.writeByte(overrideFlags);
        out.writeObject(baseFunc);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // Field order must match writeExternal.
        userFunc = in.readObject();
        partitions = in.readInt();
        overrideFlags = in.readByte();
        baseFunc = (AffinityFunction)in.readObject();
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteException {
        // userFunc is null when there is nothing overridden
        if (userFunc == null)
            return;

        assert ignite != null;

        ctx = PlatformUtils.platformContext(ignite);

        assert ctx != null;

        try (PlatformMemory mem = ctx.memory().allocate()) {
            PlatformOutputStream out = mem.output();
            BinaryRawWriterEx writer = ctx.writer(out);

            writer.writeObject(userFunc);

            out.synchronize();

            // The base function, when present, is handed to the platform wrapped in a target proxy.
            baseTarget = baseFunc != null
                ? new PlatformAffinityFunctionTarget(ctx, baseFunc)
                : null;

            PlatformTargetProxyImpl baseTargetProxy = baseTarget != null
                ? new PlatformTargetProxyImpl(baseTarget, ctx)
                : null;

            ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTargetProxy);
        }
    }

    /** {@inheritDoc} */
    @Override public void stop() throws IgniteException {
        if (ptr == 0)
            return;

        assert ctx != null;

        long ptr0 = ptr;

        // Forget the handle before destroying it, so that a repeated stop() does not destroy it twice
        // and later calls can not pass a destroyed handle to the platform.
        ptr = 0;

        ctx.gateway().affinityFunctionDestroy(ptr0);
    }

    /**
     * Injects the Ignite.
     * <p>
     * When a base function is present and the instance is not {@code null}, resources are also injected
     * into the base function.
     *
     * @param ignite Ignite.
     * @throws IgniteCheckedException If injection into the base function fails.
     */
    @SuppressWarnings("unused")
    @IgniteInstanceResource
    public void setIgnite(Ignite ignite) throws IgniteCheckedException {
        this.ignite = ignite;

        if (baseFunc != null && ignite != null)
            ((IgniteEx)ignite).context().resource().injectGeneric(baseFunc);
    }
}