[ISSUES #483] Fix worker source task commit offset FileNotFoundException

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
index 69ac1b2..d83bf53 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
@@ -37,21 +37,22 @@
      * @throws IOException
      */
     public static void string2File(final String str, final String fileName) throws IOException {
+        synchronized (fileName) {
+            String tmpFile = fileName + ".tmp";
+            string2FileNotSafe(str, tmpFile);
 
-        String tmpFile = fileName + ".tmp";
-        string2FileNotSafe(str, tmpFile);
+            String bakFile = fileName + ".bak";
+            String prevContent = file2String(fileName);
+            if (prevContent != null) {
+                string2FileNotSafe(prevContent, bakFile);
+            }
 
-        String bakFile = fileName + ".bak";
-        String prevContent = file2String(fileName);
-        if (prevContent != null) {
-            string2FileNotSafe(prevContent, bakFile);
+            File file = new File(fileName);
+            file.delete();
+
+            file = new File(tmpFile);
+            file.renameTo(new File(fileName));
         }
-
-        File file = new File(fileName);
-        file.delete();
-
-        file = new File(tmpFile);
-        file.renameTo(new File(fileName));
     }
 
     public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
index 50cec64..58e352d 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
@@ -21,7 +21,13 @@
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 
@@ -45,6 +51,37 @@
     }
 
     @Test
+    public void testMultiThreadString2File2String() {
+        CountDownLatch countDownLatch = new CountDownLatch(100);
+        List<Thread> threadList = new ArrayList<>();
+        AtomicInteger atomicInteger = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+            final int n = i;
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        String str1 = String.valueOf(n);
+                        FileAndPropertyUtil.string2File(str1, filePath);
+                    } catch (IOException e) {
+                        atomicInteger.getAndIncrement();
+                        throw new RuntimeException(e);
+                    }
+                    countDownLatch.countDown();
+                }
+            });
+            threadList.add(thread);
+        }
+        threadList.forEach(t -> t.start());
+        try {
+            countDownLatch.await(3, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        assertEquals(atomicInteger.get(), 0);
+    }
+
+    @Test
     public void testString2FileNotSafe() throws Exception {
         FileAndPropertyUtil.string2FileNotSafe(str, filePath);
         String s = FileAndPropertyUtil.file2String(filePath);