Minor
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 50c54d6..0470fad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -899,8 +899,11 @@
* @param internal Internal flag.
* @return Whether listener was actually registered.
*/
- GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, CacheContinuousQueryListener lsnr,
- boolean internal) {
+ GridContinuousHandler.RegisterStatus registerListener(
+ UUID lsnrId,
+ CacheContinuousQueryListener lsnr,
+ boolean internal
+ ) {
boolean added;
if (internal) {
@@ -908,8 +911,10 @@
if (added)
intLsnrCnt.incrementAndGet();
- } else {
+ }
+ else {
listenerLock.writeLock().lock();
+
try {
if (lsnrCnt.get() == 0) {
if (cctx.group().sharedGroup() && !cctx.isLocal())
@@ -920,7 +925,8 @@
if (added)
lsnrCnt.incrementAndGet();
- } finally {
+ }
+ finally {
listenerLock.writeLock().unlock();
}
@@ -929,7 +935,7 @@
}
return added ? GridContinuousHandler.RegisterStatus.REGISTERED
- : GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
+ : GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
}
/**
@@ -945,8 +951,10 @@
lsnr.onUnregister();
}
- } else {
+ }
+ else {
listenerLock.writeLock().lock();
+
try {
if ((lsnr = lsnrs.remove(id)) != null) {
int cnt = lsnrCnt.decrementAndGet();
@@ -954,7 +962,8 @@
if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal())
cctx.group().removeCacheWithContinuousQuery(cctx);
}
- } finally {
+ }
+ finally {
listenerLock.writeLock().unlock();
}