[ISSUES #483] Fix worker source task commit offset FileNotFoundException
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
index 69ac1b2..d83bf53 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
@@ -37,21 +37,22 @@
* @throws IOException
*/
public static void string2File(final String str, final String fileName) throws IOException {
+ synchronized (fileName) {
+ String tmpFile = fileName + ".tmp";
+ string2FileNotSafe(str, tmpFile);
- String tmpFile = fileName + ".tmp";
- string2FileNotSafe(str, tmpFile);
+ String bakFile = fileName + ".bak";
+ String prevContent = file2String(fileName);
+ if (prevContent != null) {
+ string2FileNotSafe(prevContent, bakFile);
+ }
- String bakFile = fileName + ".bak";
- String prevContent = file2String(fileName);
- if (prevContent != null) {
- string2FileNotSafe(prevContent, bakFile);
+ File file = new File(fileName);
+ file.delete();
+
+ file = new File(tmpFile);
+ file.renameTo(new File(fileName));
}
-
- File file = new File(fileName);
- file.delete();
-
- file = new File(tmpFile);
- file.renameTo(new File(fileName));
}
public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
index 50cec64..58e352d 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
@@ -21,7 +21,13 @@
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@@ -45,6 +51,37 @@
}
@Test
+ public void testMultiThreadString2File2String() {
+ CountDownLatch countDownLatch = new CountDownLatch(100);
+ List<Thread> threadList = new ArrayList<>();
+ AtomicInteger atomicInteger = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ final int n = i;
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String str1 = String.valueOf(n);
+ FileAndPropertyUtil.string2File(str1, filePath);
+ } catch (IOException e) {
+ atomicInteger.getAndIncrement();
+ throw new RuntimeException(e);
+ }
+ countDownLatch.countDown();
+ }
+ });
+ threadList.add(thread);
+ }
+ threadList.forEach(t -> t.start());
+ try {
+ countDownLatch.await(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertEquals(atomicInteger.get(), 0);
+ }
+
+ @Test
public void testString2FileNotSafe() throws Exception {
FileAndPropertyUtil.string2FileNotSafe(str, filePath);
String s = FileAndPropertyUtil.file2String(filePath);