KYLIN-4291 Parallel segment building may causes WriteConflictException
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index ffee105..6b23a9e 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
@@ -193,12 +194,24 @@
ResourceStore store = getStore();
if (StringUtils.isBlank(dictPath))
return NONE_INDICATOR;
- long now = System.currentTimeMillis();
- store.updateTimestamp(dictPath, now);
- logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now);
- DictionaryInfo dictInfo = load(dictPath, true);
- updateDictCache(dictInfo);
- return dictInfo;
+
+ int retry = 7;
+ while (retry-- > 0) {
+ try {
+ long now = System.currentTimeMillis();
+ store.updateTimestamp(dictPath, now);
+ logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now);
+ return loadAndUpdateLocalCache(dictPath);
+ } catch (WriteConflictException e) {
+ if (retry <= 0) {
+ logger.error("Retry is out, till got error, abandoning...", e);
+ throw e;
+ }
+ logger.warn("Write conflict to update dictionary " + dictPath + " retry remaining " + retry
+ + ", will retry...");
+ }
+ }
+ return loadAndUpdateLocalCache(dictPath);
}
private void initDictInfo(Dictionary<String> newDict, DictionaryInfo newDictInfo) {
@@ -411,6 +424,12 @@
store.putBigResource(path, dict, System.currentTimeMillis(), DictionaryInfoSerializer.FULL_SERIALIZER);
}
+ private DictionaryInfo loadAndUpdateLocalCache(String dictPath) throws IOException {
+ DictionaryInfo dictInfo = load(dictPath, true);
+ updateDictCache(dictInfo);
+ return dictInfo;
+ }
+
DictionaryInfo load(String resourcePath, boolean loadDictObj) throws IOException {
ResourceStore store = getStore();
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 76a3df9..9d591b5 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -27,6 +27,7 @@
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
@@ -236,10 +237,23 @@
private SnapshotTable updateDictLastModifiedTime(String snapshotPath) throws IOException {
ResourceStore store = getStore();
- long now = System.currentTimeMillis();
- store.updateTimestamp(snapshotPath, now);
- logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now);
+ int retry = 7;
+ while (retry-- > 0) {
+ try {
+ long now = System.currentTimeMillis();
+ store.updateTimestamp(snapshotPath, now);
+ logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now);
+ return loadAndUpdateLocalCache(snapshotPath);
+ } catch (WriteConflictException e) {
+ if (retry <= 0) {
+ logger.error("Retry is out, till got error, abandoning...", e);
+ throw e;
+ }
+ logger.warn("Write conflict to update snapshotTable " + snapshotPath + " retry remaining " + retry
+ + ", will retry...");
+ }
+ }
// update cache
return loadAndUpdateLocalCache(snapshotPath);
}