ATLAS-1889: fix to handle concurrent calls to update tags for an entity Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index 7d3bdf7..c6a4bbe 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -17,6 +17,7 @@ package org.apache.atlas; +import com.google.common.annotations.VisibleForTesting; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.atlas.exception.AtlasBaseException; @@ -29,12 +30,18 @@ import javax.inject.Inject; import javax.ws.rs.core.Response; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @Component public class GraphTransactionInterceptor implements MethodInterceptor { private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class); + @VisibleForTesting + private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer(); private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>(); private final AtlasGraph graph; @@ -82,9 +89,19 @@ } } } + + OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects(); } } + public static void lockObjectAndReleasePostCommit(final String guid) { + OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid); + } + + public static void lockObjectAndReleasePostCommit(final List<String> guids) { + OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids); + } + boolean logException(Throwable t) { if (t instanceof AtlasBaseException) { Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); @@ -110,4 +127,107 @@ public abstract void onComplete(boolean isSuccess); } + + private static class RefCountedReentrantLock extends ReentrantLock { + private int refCount; + + public RefCountedReentrantLock() { + this.refCount = 0; + } + + public int increment() { + return ++refCount; + } + + public int decrement() { + return --refCount; + } + + public int getRefCount() { return refCount; } + } + + + public static class ObjectUpdateSynchronizer { + private final Map<String, RefCountedReentrantLock> guidLockMap = new ConcurrentHashMap<>(); + private final ThreadLocal<List<String>> lockedGuids = new ThreadLocal<List<String>>() { + @Override + protected List<String> initialValue() { + return new ArrayList<>(); + } + }; + + public void lockObject(final List<String> guids) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> lockObject(): guids: {}", guids); + } + + Collections.sort(guids); + for (String g : guids) { + lockObject(g); + } + } + + private void lockObject(final String guid) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size()); + } + + ReentrantLock lock = getOrCreateObjectLock(guid); + lock.lock(); + + lockedGuids.get().add(guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size()); + } + } + + public void releaseLockedObjects() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size()); + } + + for (String guid : lockedGuids.get()) { + releaseObjectLock(guid); + } + + lockedGuids.get().clear(); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size()); + } + } + + private RefCountedReentrantLock getOrCreateObjectLock(String guid) { + synchronized (guidLockMap) { + RefCountedReentrantLock ret = guidLockMap.get(guid); + if (ret == null) { + ret = new RefCountedReentrantLock(); + guidLockMap.put(guid, ret); + } + + ret.increment(); + return ret; + } + } + + private RefCountedReentrantLock releaseObjectLock(String guid) { + synchronized (guidLockMap) { + RefCountedReentrantLock lock = guidLockMap.get(guid); + if (lock != null && lock.isHeldByCurrentThread()) { + int refCount = lock.decrement(); + + if (refCount == 0) { + guidLockMap.remove(guid); + } + + lock.unlock(); + } else { + LOG.warn("releaseLockedObjects: {} Attempting to release a lock not held by current thread.", guid); + } + + return lock; + } + } + } }
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index 5bec8fa..0f3b06b 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.atlas.AtlasException; import org.apache.atlas.CreateUpdateEntitiesResult; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.model.instance.GuidMapping; @@ -49,7 +50,16 @@ import javax.inject.Inject; import javax.inject.Singleton; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * An implementation backed by a Graph database provided @@ -303,6 +313,7 @@ LOG.debug("Adding a new trait={} for entities={}", traitInstance.getTypeName(), entityGuids); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuids); for (String entityGuid : entityGuids) { addTraitImpl(entityGuid, traitInstance); } @@ -321,12 +332,12 @@ Preconditions.checkNotNull(guid, "guid cannot be null"); Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null"); + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); addTraitImpl(guid, traitInstance); } private void addTraitImpl(String guid, ITypedStruct traitInstance) throws RepositoryException { final String traitName = traitInstance.getTypeName(); - if (LOG.isDebugEnabled()) { LOG.debug("Adding a new trait={} for entity={}", traitName, guid); } @@ -365,9 +376,8 @@ @Override @GraphTransaction public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); - } + LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid); @@ -383,11 +393,11 @@ AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); if(edge != null) { deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true); - - // update the traits in entity once trait removal is successful - traitNames.remove(traitNameToBeDeleted); - updateTraits(instanceVertex, traitNames); } + + // update the traits in entity once trait removal is successful + traitNames.remove(traitNameToBeDeleted); + updateTraits(instanceVertex, traitNames); } catch (Exception e) { throw new RepositoryException(e); }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 75e9132..5ea4ff2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -19,6 +19,7 @@ import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; @@ -456,6 +457,7 @@ LOG.debug("Adding classifications={} to entity={}", classifications, guid); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); for (AtlasClassification classification : classifications) { validateAndNormalize(classification); } @@ -484,6 +486,7 @@ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); List<AtlasClassification> updatedClassifications = new ArrayList<>(); for (AtlasClassification newClassification : newClassifications) { @@ -527,6 +530,8 @@ LOG.debug("Adding classification={} to entities={}", classification, guids); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids); + validateAndNormalize(classification); List<AtlasClassification> classifications = Collections.singletonList(classification); @@ -557,6 +562,8 @@ LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); + entityGraphMapper.deleteClassifications(guid, classificationNames); // notify listeners on classification deletion
diff --git a/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java new file mode 100644 index 0000000..03ebae4 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
@@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.utils; + +import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.springframework.util.CollectionUtils; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +public class ObjectUpdateSynchronizerTest { + private static final GraphTransactionInterceptor.ObjectUpdateSynchronizer objectUpdateSynchronizer = new GraphTransactionInterceptor.ObjectUpdateSynchronizer(); + + private final List<Integer> outputList = new ArrayList<>(); + private final int MAX_COUNT = 10; + + class CounterThread extends Thread { + String ids[]; + public CounterThread(String id) { + this.ids = new String[1]; + this.ids[0] = id; + } + + public void setIds(String... ids) { + this.ids = ids; + } + + public void run() { + objectUpdateSynchronizer.lockObject(CollectionUtils.arrayToList(ids)); + for (int i = 0; i < MAX_COUNT; i++) { + outputList.add(i); + RandomStringUtils.randomAlphabetic(20); + } + + objectUpdateSynchronizer.releaseLockedObjects(); + } + } + + @BeforeMethod + public void clearOutputList() { + outputList.clear(); + } + + @Test + public void singleThreadRun() throws InterruptedException { + verifyMultipleThreadRun(1); + } + + @Test + public void twoThreadsAccessingDifferntGuids_DoNotSerialize() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 2); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayNotEquals(populateExpectedArrayOutput(2)); + } + + @Test + public void twoThreadsAccessingSameGuid_Serialize() throws InterruptedException { + verifyMultipleThreadRun(2); + } + + @Test + public void severalThreadsAccessingSameGuid_Serialize() throws InterruptedException { + verifyMultipleThreadRun(10); + } + + @Test + public void severalThreadsSequentialAccessingListOfGuids() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 10); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("1", "2", "3", "4"); + th[i++].setIds("1", "2", "3"); + th[i++].setIds("1", "2"); + th[i++].setIds("1"); + th[i++].setIds("1", "2"); + th[i++].setIds("1", "2", "3"); + th[i++].setIds("1", "2", "3", "4"); + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("1"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(th.length)); + } + + @Test + public void severalThreadsNonSequentialAccessingListOfGuids() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 5); + int i = 0; + th[i++].setIds("2", "1", "3", "4", "5"); + th[i++].setIds("3", "2", "4", "1"); + th[i++].setIds("2", "3", "1"); + th[i++].setIds("1", "2"); + th[i++].setIds("1"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(th.length)); + } + + @Test + public void severalThreadsAccessingOverlappingListOfGuids() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 5); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("3", "4", "5", "6"); + th[i++].setIds("5", "6", "7"); + th[i++].setIds("7", "8"); + th[i++].setIds("8"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayNotEquals(populateExpectedArrayOutput(th.length)); + } + + + @Test + public void severalThreadsAccessingOverlappingListOfGuids2() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 3); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("6", "7", "8", "9"); + th[i++].setIds("4", "5", "6"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayNotEquals(populateExpectedArrayOutput(th.length)); + } + + @Test + public void severalThreadsAccessingOverlappingListOfGuidsEnsuringSerialOutput() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 5); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "7"); + th[i++].setIds("3", "4", "5", "7"); + th[i++].setIds("5", "6", "7"); + th[i++].setIds("7", "8"); + th[i++].setIds("7"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(th.length)); + } + + private void verifyMultipleThreadRun(int limit) throws InterruptedException { + CounterThread[] th = getCounterThreads(limit); + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(limit)); + } + + private void startCounterThreads(CounterThread[] th) { + for (int i = 0; i < th.length; i++) { + th[i].start(); + } + } + private CounterThread[] getCounterThreads(int limit) { + return getCounterThreads(true, limit); + } + + private CounterThread[] getCounterThreads(boolean sameId, int limit) { + CounterThread th[] = new CounterThread[limit]; + for (Integer i = 0; i < limit; i++) { + th[i] = new CounterThread(sameId ? "1" : i.toString()); + } + return th; + } + + + private void assertArrayEquals(List<Integer> expected) { + assertEquals(outputList.toArray(), expected.toArray()); + } + + private void assertArrayNotEquals(List<Integer> expected) { + assertFalse(ArrayUtils.isEquals(outputList.toArray(), expected)); + } + + private void waitForThreadsToEnd(CounterThread... threads) throws InterruptedException { + for (Thread t : threads) { + t.join(); + } + } + + private List<Integer> populateExpectedArrayOutput(int limit) { + List<Integer> list = new ArrayList<>(); + for (int i = 0; i < limit*MAX_COUNT; i+=MAX_COUNT) { + for (int j = 0; j < MAX_COUNT; j++) { + list.add(j); + } + } + + return list; + } +}