SAMZA-2514: Refactor codes to make runWithClassLoader method more generic (#1349)
API/Usage changes: None
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0bd66dc..ed99ad4 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -71,6 +70,7 @@
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.SplitDeploymentUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -464,78 +464,21 @@
* The entry point for the {@link ClusterBasedJobCoordinator}.
*/
public static void main(String[] args) {
- boolean dependencyIsolationEnabled = Boolean.parseBoolean(
- System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
System.exit(1);
});
- if (!dependencyIsolationEnabled) {
+ if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
// no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
runClusterBasedJobCoordinator(args);
} else {
- runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(), args);
+ SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
+ ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
}
System.exit(0);
}
/**
- * Execute the coordinator using a separate isolated classloader.
- * @param classLoader {@link ClassLoader} to use to load the {@link ClusterBasedJobCoordinator} which will run
- * @param args arguments to pass when running the {@link ClusterBasedJobCoordinator}
- */
- @VisibleForTesting
- static void runWithClassLoader(ClassLoader classLoader, String[] args) {
- // need to use the isolated classloader to load ClusterBasedJobCoordinator and then run using that new class
- Class<?> clusterBasedJobCoordinatorClass;
- try {
- clusterBasedJobCoordinatorClass = classLoader.loadClass(ClusterBasedJobCoordinator.class.getName());
- } catch (ClassNotFoundException e) {
- throw new SamzaException(
- "Isolation was enabled, but unable to find ClusterBasedJobCoordinator in isolated classloader", e);
- }
-
- // save the current context classloader so it can be reset after finishing the call to runClusterBasedJobCoordinator
- ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
- // this is needed because certain libraries (e.g. log4j) use the context classloader
- Thread.currentThread().setContextClassLoader(classLoader);
-
- try {
- executeRunClusterBasedJobCoordinatorForClass(clusterBasedJobCoordinatorClass, args);
- } finally {
- // reset the context class loader; it's good practice, and could be important when running a test suite
- Thread.currentThread().setContextClassLoader(previousContextClassLoader);
- }
- }
-
- /**
- * Runs the {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])} method of the given
- * {@code clusterBasedJobCoordinatorClass} using reflection.
- * @param clusterBasedJobCoordinatorClass {@link ClusterBasedJobCoordinator} {@link Class} for which to execute
- * {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
- * @param args arguments to pass to {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
- */
- private static void executeRunClusterBasedJobCoordinatorForClass(Class<?> clusterBasedJobCoordinatorClass,
- String[] args) {
- Method runClusterBasedJobCoordinatorMethod;
- try {
- runClusterBasedJobCoordinatorMethod =
- clusterBasedJobCoordinatorClass.getDeclaredMethod("runClusterBasedJobCoordinator", String[].class);
- } catch (NoSuchMethodException e) {
- throw new SamzaException("Isolation was enabled, but unable to find runClusterBasedJobCoordinator method", e);
- }
- // only sets accessible flag for this Method instance, not other Method instances for runClusterBasedJobCoordinator
- runClusterBasedJobCoordinatorMethod.setAccessible(true);
-
- try {
- // wrapping args in object array so that args is passed as a single argument to the method
- runClusterBasedJobCoordinatorMethod.invoke(null, new Object[]{args});
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new SamzaException("Exception while executing runClusterBasedJobCoordinator method", e);
- }
- }
-
- /**
* This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
* {@link #main(String[])} so that it can be executed directly or from a separate classloader.
*/
diff --git a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
new file mode 100644
index 0000000..200cd3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ShellCommandConfig;
+
+
+public final class SplitDeploymentUtil {
+
+ /**
+ * The split deployment feature uses system env {@code ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
+ * if the user chooses to enable it.
+ * This function helps to detect if the split deployment feature is enabled.
+ *
+ * @return true if split deployment is enabled; vice versa
+ */
+ public static boolean isSplitDeploymentEnabled() {
+ return Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
+ }
+
+ /**
+ * Execute the runner class using a separate isolated classloader.
+ * @param classLoader {@link ClassLoader} to use to load the runner class which will run
+ * @param originalRunnerClass {@link Class} for which will be executed with the new class loader.
+ * @param runMethodName run method name of runner class
+ * @param runMethodArgs arguments to pass to run method
+ */
+ public static void runWithClassLoader(ClassLoader classLoader, Class<?> originalRunnerClass, String runMethodName,
+ String[] runMethodArgs) {
+ // need to use the isolated classloader to load run method and then execute using that new class
+ Class<?> runnerClass;
+ try {
+ runnerClass = classLoader.loadClass(originalRunnerClass.getName());
+ } catch (ClassNotFoundException e) {
+ throw new SamzaException(String.format(
+ "Isolation was enabled, but unable to find %s in isolated classloader", originalRunnerClass.getName()), e);
+ }
+
+ // save the current context classloader so it can be reset after finishing the call to run method
+ ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+ // this is needed because certain libraries (e.g. log4j) use the context classloader
+ Thread.currentThread().setContextClassLoader(classLoader);
+
+ try {
+ executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
+ } finally {
+ // reset the context class loader; it's good practice, and could be important when running a test suite
+ Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+ }
+ }
+
+ private static void executeRunForRunnerClass(Class<?> runnerClass, String runMethodName, String[] runMethodArgs) {
+ Method runMethod;
+ try {
+ runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
+ } catch (NoSuchMethodException e) {
+ throw new SamzaException(String.format("Isolation was enabled, but unable to find %s method", runMethodName), e);
+ }
+ // only sets accessible flag for this method instance
+ runMethod.setAccessible(true);
+
+ try {
+ // wrapping args in object array so that args is passed as a single argument to the method
+ runMethod.invoke(null, new Object[]{runMethodArgs});
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new SamzaException(String.format("Exception while executing %s method", runMethodName), e);
+ }
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 967bc81..6444451 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -54,7 +54,6 @@
import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
@@ -62,7 +61,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
/**
@@ -176,38 +174,6 @@
}
@Test
- public void testRunWithClassLoader() throws Exception {
- // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
- PowerMockito.spy(ClusterBasedJobCoordinator.class);
- // save the context classloader to make sure that it gets set properly once the test is finished
- ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader classLoader = mock(ClassLoader.class);
- String[] args = new String[]{"arg0", "arg1"};
- doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
-
- // stub the private static method which is called by reflection
- PowerMockito.doAnswer(invocation -> {
- // make sure the only calls to this method has the expected arguments
- assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
- // checks that the context classloader is set correctly
- assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
- return null;
- }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
-
- try {
- ClusterBasedJobCoordinator.runWithClassLoader(classLoader, args);
- assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
- } finally {
- // reset it explicitly just in case runWithClassLoader throws an exception
- Thread.currentThread().setContextClassLoader(previousContextClassLoader);
- }
- // make sure that the classloader got used
- verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
- // make sure runClusterBasedJobCoordinator only got called once
- verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
- }
-
- @Test
public void testToArgs() {
ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
JobConfig.JOB_NAME, "test1",
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
new file mode 100644
index 0000000..1336190
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.AdditionalMatchers.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ClusterBasedJobCoordinator.class})
+public class TestSplitDeploymentUtil {
+
+ @Test
+ public void testRunWithIsolatingClassLoader() throws Exception {
+ // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
+ PowerMockito.spy(ClusterBasedJobCoordinator.class);
+ // save the context classloader to make sure that it gets set properly once the test is finished
+ ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader classLoader = mock(ClassLoader.class);
+ String[] args = new String[]{"arg0", "arg1"};
+ doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+
+ // stub the private static method which is called by reflection
+ PowerMockito.doAnswer(invocation -> {
+ // make sure the only calls to this method has the expected arguments
+ assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
+ // checks that the context classloader is set correctly
+ assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+ return null;
+ }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
+
+ try {
+ SplitDeploymentUtil.runWithClassLoader(classLoader,
+ ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
+ assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
+ } finally {
+ // reset it explicitly just in case runWithClassLoader throws an exception
+ Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+ }
+ // make sure that the classloader got used
+ verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+ // make sure runClusterBasedJobCoordinator only got called once
+ verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
+ }
+}