SAMZA-2514: Refactor codes to make runWithClassLoader method more generic (#1349)

API/Usage changes: None
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0bd66dc..ed99ad4 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -20,7 +20,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -71,6 +70,7 @@
 import org.apache.samza.util.ConfigUtil;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.SplitDeploymentUtil;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -464,78 +464,21 @@
    * The entry point for the {@link ClusterBasedJobCoordinator}.
    */
   public static void main(String[] args) {
-    boolean dependencyIsolationEnabled = Boolean.parseBoolean(
-        System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
     Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
         LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
         System.exit(1);
       });
-    if (!dependencyIsolationEnabled) {
+    if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
       // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
       runClusterBasedJobCoordinator(args);
     } else {
-      runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(), args);
+      SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
+          ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
     }
     System.exit(0);
   }
 
   /**
-   * Execute the coordinator using a separate isolated classloader.
-   * @param classLoader {@link ClassLoader} to use to load the {@link ClusterBasedJobCoordinator} which will run
-   * @param args arguments to pass when running the {@link ClusterBasedJobCoordinator}
-   */
-  @VisibleForTesting
-  static void runWithClassLoader(ClassLoader classLoader, String[] args) {
-    // need to use the isolated classloader to load ClusterBasedJobCoordinator and then run using that new class
-    Class<?> clusterBasedJobCoordinatorClass;
-    try {
-      clusterBasedJobCoordinatorClass = classLoader.loadClass(ClusterBasedJobCoordinator.class.getName());
-    } catch (ClassNotFoundException e) {
-      throw new SamzaException(
-          "Isolation was enabled, but unable to find ClusterBasedJobCoordinator in isolated classloader", e);
-    }
-
-    // save the current context classloader so it can be reset after finishing the call to runClusterBasedJobCoordinator
-    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
-    // this is needed because certain libraries (e.g. log4j) use the context classloader
-    Thread.currentThread().setContextClassLoader(classLoader);
-
-    try {
-      executeRunClusterBasedJobCoordinatorForClass(clusterBasedJobCoordinatorClass, args);
-    } finally {
-      // reset the context class loader; it's good practice, and could be important when running a test suite
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-  }
-
-  /**
-   * Runs the {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])} method of the given
-   * {@code clusterBasedJobCoordinatorClass} using reflection.
-   * @param clusterBasedJobCoordinatorClass {@link ClusterBasedJobCoordinator} {@link Class} for which to execute
-   * {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
-   * @param args arguments to pass to {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
-   */
-  private static void executeRunClusterBasedJobCoordinatorForClass(Class<?> clusterBasedJobCoordinatorClass,
-      String[] args) {
-    Method runClusterBasedJobCoordinatorMethod;
-    try {
-      runClusterBasedJobCoordinatorMethod =
-          clusterBasedJobCoordinatorClass.getDeclaredMethod("runClusterBasedJobCoordinator", String[].class);
-    } catch (NoSuchMethodException e) {
-      throw new SamzaException("Isolation was enabled, but unable to find runClusterBasedJobCoordinator method", e);
-    }
-    // only sets accessible flag for this Method instance, not other Method instances for runClusterBasedJobCoordinator
-    runClusterBasedJobCoordinatorMethod.setAccessible(true);
-
-    try {
-      // wrapping args in object array so that args is passed as a single argument to the method
-      runClusterBasedJobCoordinatorMethod.invoke(null, new Object[]{args});
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new SamzaException("Exception while executing runClusterBasedJobCoordinator method", e);
-    }
-  }
-
-  /**
    * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
diff --git a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
new file mode 100644
index 0000000..200cd3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ShellCommandConfig;
+
+
+public final class SplitDeploymentUtil {
+
+  /**
+   * The split deployment feature uses system env {@code ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
+   * if the user chooses to enable it.
+   * This function helps to detect if the split deployment feature is enabled.
+   *
+   * @return true if split deployment is enabled; vice versa
+   */
+  public static boolean isSplitDeploymentEnabled() {
+    return Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
+  }
+
+  /**
+   * Execute the runner class using a separate isolated classloader.
+   * @param classLoader {@link ClassLoader} to use to load the runner class which will run
+   * @param originalRunnerClass {@link Class} for which will be executed with the new class loader.
+   * @param runMethodName run method name of runner class
+   * @param runMethodArgs arguments to pass to run method
+   */
+  public static void runWithClassLoader(ClassLoader classLoader, Class<?> originalRunnerClass, String runMethodName,
+      String[] runMethodArgs) {
+    // need to use the isolated classloader to load run method and then execute using that new class
+    Class<?> runnerClass;
+    try {
+      runnerClass = classLoader.loadClass(originalRunnerClass.getName());
+    } catch (ClassNotFoundException e) {
+      throw new SamzaException(String.format(
+          "Isolation was enabled, but unable to find %s in isolated classloader", originalRunnerClass.getName()), e);
+    }
+
+    // save the current context classloader so it can be reset after finishing the call to run method
+    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+    // this is needed because certain libraries (e.g. log4j) use the context classloader
+    Thread.currentThread().setContextClassLoader(classLoader);
+
+    try {
+      executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
+    } finally {
+      // reset the context class loader; it's good practice, and could be important when running a test suite
+      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+    }
+  }
+
+  private static void executeRunForRunnerClass(Class<?> runnerClass, String runMethodName, String[] runMethodArgs) {
+    Method runMethod;
+    try {
+      runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
+    } catch (NoSuchMethodException e) {
+      throw new SamzaException(String.format("Isolation was enabled, but unable to find %s method", runMethodName), e);
+    }
+    // only sets accessible flag for this method instance
+    runMethod.setAccessible(true);
+
+    try {
+      // wrapping args in object array so that args is passed as a single argument to the method
+      runMethod.invoke(null, new Object[]{runMethodArgs});
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      throw new SamzaException(String.format("Exception while executing %s method", runMethodName), e);
+    }
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 967bc81..6444451 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -54,7 +54,6 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
@@ -62,7 +61,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
 
 
 /**
@@ -176,38 +174,6 @@
   }
 
   @Test
-  public void testRunWithClassLoader() throws Exception {
-    // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
-    PowerMockito.spy(ClusterBasedJobCoordinator.class);
-    // save the context classloader to make sure that it gets set properly once the test is finished
-    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
-    ClassLoader classLoader = mock(ClassLoader.class);
-    String[] args = new String[]{"arg0", "arg1"};
-    doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
-
-    // stub the private static method which is called by reflection
-    PowerMockito.doAnswer(invocation -> {
-        // make sure the only calls to this method has the expected arguments
-        assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
-        // checks that the context classloader is set correctly
-        assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
-        return null;
-      }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
-
-    try {
-      ClusterBasedJobCoordinator.runWithClassLoader(classLoader, args);
-      assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
-    } finally {
-      // reset it explicitly just in case runWithClassLoader throws an exception
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-    // make sure that the classloader got used
-    verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
-    // make sure runClusterBasedJobCoordinator only got called once
-    verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
-  }
-
-  @Test
   public void testToArgs() {
     ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
         JobConfig.JOB_NAME, "test1",
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
new file mode 100644
index 0000000..1336190
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.AdditionalMatchers.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ClusterBasedJobCoordinator.class})
+public class TestSplitDeploymentUtil {
+
+  @Test
+  public void testRunWithIsolatingClassLoader() throws Exception {
+    // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
+    PowerMockito.spy(ClusterBasedJobCoordinator.class);
+    // save the context classloader to make sure that it gets set properly once the test is finished
+    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+    ClassLoader classLoader = mock(ClassLoader.class);
+    String[] args = new String[]{"arg0", "arg1"};
+    doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+
+    // stub the private static method which is called by reflection
+    PowerMockito.doAnswer(invocation -> {
+        // make sure the only calls to this method has the expected arguments
+        assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
+        // checks that the context classloader is set correctly
+        assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+        return null;
+      }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
+
+    try {
+      SplitDeploymentUtil.runWithClassLoader(classLoader,
+          ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
+      assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
+    } finally {
+      // reset it explicitly just in case runWithClassLoader throws an exception
+      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+    }
+    // make sure that the classloader got used
+    verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+    // make sure runClusterBasedJobCoordinator only got called once
+    verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
+  }
+}