blob: 3453f10555ac9925512b010901d8319cf5564d78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.entityframework;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.cache.Cache;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheExtension;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
/**
* EntityFramework cache extension.
*/
@SuppressWarnings("unchecked")
public class PlatformDotNetEntityFrameworkCacheExtension implements PlatformCacheExtension {
/** Extension ID. */
private static final int EXT_ID = 1;
/** Operation: increment entity set versions. */
private static final int OP_INVALIDATE_SETS = 1;
/** Operation: put item async. */
private static final int OP_PUT_ITEM = 2;
/** Operation: get item. */
private static final int OP_GET_ITEM = 3;
/** Cache key for cleanup node ID. */
private static final CleanupNodeId CLEANUP_NODE_ID = new CleanupNodeId();
/** Indicates whether local cleanup is in progress, per cache name. */
private final Map<String, Boolean> cleanupFlags = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public int id() {
return EXT_ID;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public long processInOutStreamLong(PlatformCache target, int type, BinaryRawReaderEx reader,
PlatformMemory mem) throws IgniteCheckedException {
switch (type) {
case OP_INVALIDATE_SETS: {
final IgniteCache<String, Long> metaCache = target.rawCache();
final String dataCacheName = reader.readString();
int cnt = reader.readInt();
assert cnt > 0;
final Set<String> entitySetNames = new HashSet(cnt);
for (int i = 0; i < cnt; i++)
entitySetNames.add(reader.readString());
final Map<String, EntryProcessorResult<Long>> curVers =
metaCache.invokeAll(entitySetNames, new PlatformDotNetEntityFrameworkIncreaseVersionProcessor());
if (curVers.size() != cnt)
throw new IgniteCheckedException("Failed to update entity set versions [expected=" + cnt +
", actual=" + curVers.size() + ']');
Ignite grid = target.platformContext().kernalContext().grid();
startBackgroundCleanup(grid, (IgniteCache<CleanupNodeId, UUID>)(IgniteCache)metaCache,
dataCacheName, curVers);
return target.writeResult(mem, null);
}
case OP_PUT_ITEM: {
String query = reader.readString();
long[] versions = null;
String[] entitySets = null;
int cnt = reader.readInt();
if (cnt >= 0) {
versions = new long[cnt];
entitySets = new String[cnt];
for (int i = 0; i < cnt; i++) {
versions[i] = reader.readLong();
entitySets[i] = reader.readString();
}
}
byte[] data = reader.readByteArray();
PlatformDotNetEntityFrameworkCacheEntry efEntry =
new PlatformDotNetEntityFrameworkCacheEntry(entitySets, data);
IgniteCache<PlatformDotNetEntityFrameworkCacheKey, PlatformDotNetEntityFrameworkCacheEntry> dataCache
= target.rawCache();
PlatformDotNetEntityFrameworkCacheKey key = new PlatformDotNetEntityFrameworkCacheKey(query, versions);
dataCache.put(key, efEntry);
return target.writeResult(mem, null);
}
case OP_GET_ITEM: {
String query = reader.readString();
long[] versions = null;
int cnt = reader.readInt();
if (cnt >= 0) {
versions = new long[cnt];
for (int i = 0; i < cnt; i++)
versions[i] = reader.readLong();
}
IgniteCache<PlatformDotNetEntityFrameworkCacheKey, PlatformDotNetEntityFrameworkCacheEntry> dataCache
= target.rawCache();
PlatformDotNetEntityFrameworkCacheKey key = new PlatformDotNetEntityFrameworkCacheKey(query, versions);
PlatformDotNetEntityFrameworkCacheEntry entry = dataCache.get(key);
byte[] data = entry == null ? null : entry.data();
return target.writeResult(mem, data);
}
}
throw new IgniteCheckedException("Unsupported operation type: " + type);
}
/**
* Starts the background cleanup of old cache entries.
*
* @param grid Grid.
* @param metaCache Meta cache.
* @param dataCacheName Data cache name.
* @param currentVersions Current versions.
*/
private void startBackgroundCleanup(Ignite grid, final Cache<CleanupNodeId, UUID> metaCache,
final String dataCacheName, final Map<String, EntryProcessorResult<Long>> currentVersions) {
if (cleanupFlags.containsKey(dataCacheName))
return; // Current node already performs cleanup.
if (!trySetGlobalCleanupFlag(grid, metaCache))
return;
cleanupFlags.put(dataCacheName, true);
final ClusterGroup dataNodes = grid.cluster().forDataNodes(dataCacheName);
IgniteFuture f = grid.compute(dataNodes).broadcastAsync(
new RemoveOldEntriesRunnable(dataCacheName, currentVersions));
f.listen(new CleanupCompletionListener(metaCache, dataCacheName));
}
/**
* Tries to set the global cleanup node id to current node.
*
* @param grid Grid.
* @param metaCache Meta cache.
*
* @return True if successfully set the flag indicating that current node performs the cleanup; otherwise false.
*/
private boolean trySetGlobalCleanupFlag(Ignite grid, final Cache<CleanupNodeId, UUID> metaCache) {
final UUID localNodeId = grid.cluster().localNode().id();
while (true) {
// Get the node performing cleanup.
UUID nodeId = metaCache.get(CLEANUP_NODE_ID);
if (nodeId == null) {
if (metaCache.putIfAbsent(CLEANUP_NODE_ID, localNodeId))
return true; // Successfully reserved cleanup to local node.
// Failed putIfAbsent: someone else may have started cleanup. Retry the check.
continue;
}
if (nodeId.equals(localNodeId))
return false; // Current node already performs cleanup.
if (grid.cluster().node(nodeId) != null)
return false; // Another node already performs cleanup and is alive.
// Node that performs cleanup has disconnected.
if (metaCache.replace(CLEANUP_NODE_ID, nodeId, localNodeId))
return true; // Successfully replaced disconnected node id with our id.
// Replace failed: someone else started cleanup.
return false;
}
}
/**
* Removes old cache entries locally.
*
* @param ignite Ignite.
* @param dataCacheName Cache name.
* @param currentVersions Current versions.
*/
private static void removeOldEntries(final Ignite ignite, final String dataCacheName,
final Map<String, EntryProcessorResult<Long>> currentVersions) {
IgniteCache<PlatformDotNetEntityFrameworkCacheKey, PlatformDotNetEntityFrameworkCacheEntry> cache =
ignite.cache(dataCacheName);
Set<PlatformDotNetEntityFrameworkCacheKey> keysToRemove = new TreeSet<>();
ClusterNode localNode = ignite.cluster().localNode();
for (Cache.Entry<PlatformDotNetEntityFrameworkCacheKey, PlatformDotNetEntityFrameworkCacheEntry> cacheEntry :
cache.localEntries(CachePeekMode.ALL)) {
// Check if we are on a primary node for the key, since we use CachePeekMode.ALL
// and we don't want to process backup entries.
if (!ignite.affinity(dataCacheName).isPrimary(localNode, cacheEntry.getKey()))
continue;
long[] versions = cacheEntry.getKey().versions();
String[] entitySets = cacheEntry.getValue().entitySets();
for (int i = 0; i < entitySets.length; i++) {
EntryProcessorResult<Long> curVer = currentVersions.get(entitySets[i]);
if (curVer != null && versions[i] < curVer.get())
keysToRemove.add(cacheEntry.getKey());
}
}
cache.removeAll(keysToRemove);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PlatformDotNetEntityFrameworkCacheExtension.class, this);
}
/**
* Cache key for cleanup node id.
*/
private static class CleanupNodeId {
// No-op.
}
/**
* Old entries remover.
*/
private static class RemoveOldEntriesRunnable implements IgniteRunnable {
/** */
private static final long serialVersionUID = 0L;
/** */
private final String dataCacheName;
/** */
private final Map<String, EntryProcessorResult<Long>> currentVersions;
/** Inject Ignite. */
@IgniteInstanceResource
private Ignite ignite;
/**
* Ctor.
*
* @param dataCacheName Name of the cache to clean up.
* @param currentVersions Map of current entity set versions.
*/
private RemoveOldEntriesRunnable(String dataCacheName,
Map<String, EntryProcessorResult<Long>> currentVersions) {
this.dataCacheName = dataCacheName;
this.currentVersions = currentVersions;
}
/** {@inheritDoc} */
@Override public void run() {
removeOldEntries(ignite, dataCacheName, currentVersions);
}
}
/**
* Cleanup completion listener.
*/
private class CleanupCompletionListener implements IgniteInClosure<IgniteFuture<Object>> {
/** */
private static final long serialVersionUID = 0L;
/** */
private final Cache<CleanupNodeId, UUID> metaCache;
/** */
private final String dataCacheName;
/**
* Ctor.
*
* @param metaCache Metadata cache.
* @param dataCacheName Data cache name.
*/
private CleanupCompletionListener(Cache<CleanupNodeId, UUID> metaCache, String dataCacheName) {
this.metaCache = metaCache;
this.dataCacheName = dataCacheName;
}
/** {@inheritDoc} */
@Override public void apply(IgniteFuture<Object> future) {
// Reset distributed cleanup flag.
metaCache.remove(CLEANUP_NODE_ID);
// Reset local cleanup flag.
cleanupFlags.remove(dataCacheName);
}
}
}