[CARBONDATA-3870] Optimize global lock to object lock for CarbonEnv.init

Why is this PR needed?
global lock of CarbonEnv.init impacts the performance of the concurrent query with multiple SparkSession.

What changes were proposed in this PR?
Optimize global lock to object lock for CarbonEnv.init
The global lock is only used to create CarbonEnv(not include init).
Does this PR introduce any user interface change?
No
Is any new testcase added?
No, derby only support one connection at a time, we can't create multiple sessions to connect a same derby metastore.

This closes #3805
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5062a43..67de334 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -62,7 +62,11 @@
 
   var initialized = false
 
-  def init(sparkSession: SparkSession): Unit = {
+  def init(sparkSession: SparkSession): Unit = this.synchronized {
+    // after locking, check initialized at first
+    if (initialized) {
+      return
+    }
     val properties = CarbonProperties.getInstance()
     var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
     if (storePath == null) {
@@ -96,43 +100,38 @@
     // added for handling timeseries function like hour, minute, day, month, year
     sparkSession.udf.register(MVFunctions.TIME_SERIES_FUNCTION, new TimeSeriesFunction)
 
-    // acquiring global level lock so global configuration will be updated by only one thread
-    CarbonEnv.carbonEnvMap.synchronized {
-      if (!initialized) {
-        // update carbon session parameters , preserve thread parameters
-        val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-        carbonSessionInfo = new CarbonSessionInfo()
-        // We should not corrupt the information in carbonSessionInfo object which is at the
-        // session level. Instead create a new object and in that set the user specified values in
-        // thread/session params
-        val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
-        if (currentThreadSesssionInfo != null) {
-          threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
-        }
-        ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
-        ThreadLocalSessionInfo.setConfigurationToCurrentThread(sparkSession
-          .sessionState.newHadoopConf())
-        val config = new CarbonSQLConf(sparkSession)
-        if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
-          config.addDefaultCarbonParams()
-        }
-        // add session params after adding DefaultCarbonParams
-        config.addDefaultCarbonSessionParams()
-        carbonMetaStore = {
-          // trigger event for CarbonEnv create
-          val operationContext = new OperationContext
-          val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
-            CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath)
-          OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
-
-          CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
-        }
-        CarbonProperties.getInstance
-          .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
-        initialized = true
-      }
+    // update carbon session parameters , preserve thread parameters
+    val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    carbonSessionInfo = new CarbonSessionInfo()
+    // We should not corrupt the information in carbonSessionInfo object which is at the
+    // session level. Instead create a new object and in that set the user specified values in
+    // thread/session params
+    val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
+    if (currentThreadSesssionInfo != null) {
+      threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
     }
+    ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(
+      sparkSession.sessionState.newHadoopConf())
+    val config = new CarbonSQLConf(sparkSession)
+    if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
+      config.addDefaultCarbonParams()
+    }
+    // add session params after adding DefaultCarbonParams
+    config.addDefaultCarbonSessionParams()
+    carbonMetaStore = {
+      // trigger event for CarbonEnv create
+      val operationContext = new OperationContext
+      val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
+        CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath)
+      OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
+      CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
+    }
+    CarbonProperties.getInstance
+      .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
     Profiler.initialize(sparkSession.sparkContext)
+    CarbonToSparkAdapter.addSparkSessionListener(sparkSession)
+    initialized = true
     LOGGER.info("Initialize CarbonEnv completed...")
   }
 }
@@ -147,14 +146,23 @@
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
-      var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
-      if (carbonEnv == null) {
-        carbonEnv = new CarbonEnv
-        carbonEnv.init(sparkSession)
-        CarbonToSparkAdapter.addSparkSessionListener(sparkSession)
-        carbonEnvMap.put(sparkSession, carbonEnv)
+    var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
+    if (carbonEnv == null) {
+      // if carbonEnv is null, need to create create CarbonEnv
+      // use global level lock, it will be fine for only creating CarbonEnv object
+      CarbonEnv.carbonEnvMap.synchronized {
+        // need to double check whether CarbonEnv exists or not
+        carbonEnv = carbonEnvMap.get(sparkSession)
+        if (carbonEnv == null) {
+          carbonEnv = new CarbonEnv
+          carbonEnvMap.put(sparkSession, carbonEnv)
+        }
       }
-      carbonEnv
+    }
+    if (!carbonEnv.initialized) {
+      carbonEnv.init(sparkSession)
+    }
+    carbonEnv
   }
 
   /**