blob: fefb8306aff602d64cbb4be7ac3e87f0a2579ed2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.util.VersionInfo;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(Enclosed.class)
public class SparkShimsTest {
@RunWith(Parameterized.class)
public static class ParamTests {
@Parameters(name = "Hadoop {0} supports jobUrl: {1}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{"2.6.0", false},
{"2.6.1", false},
{"2.6.2", false},
{"2.6.3", false},
{"2.6.4", false},
{"2.6.5", false},
{"2.6.6", true}, // The latest fixed version
{"2.6.7", true}, // Future version
{"2.7.0", false},
{"2.7.1", false},
{"2.7.2", false},
{"2.7.3", false},
{"2.7.4", true}, // The latest fixed version
{"2.7.5", true}, // Future versions
{"2.8.0", false},
{"2.8.1", false},
{"2.8.2", true}, // The latest fixed version
{"2.8.3", true}, // Future versions
{"2.9.0", true}, // The latest fixed version
{"2.9.1", true}, // Future versions
{"3.0.0", true}, // The latest fixed version
{"3.0.0-alpha4", true}, // The latest fixed version
{"3.0.1", true}, // Future versions
});
}
@Parameter public String version;
@Parameter(1)
public boolean expected;
@Test
public void checkYarnVersionTest() {
SparkShims sparkShims =
new SparkShims(new Properties()) {
@Override
public void setupSparkListener(String master, String sparkWebUrl) {}
};
assertEquals(expected, sparkShims.supportYarn6615(version));
}
}
@RunWith(PowerMockRunner.class)
@PrepareForTest({BaseZeppelinContext.class, VersionInfo.class})
@PowerMockIgnore({"javax.net.*", "javax.security.*"})
public static class SingleTests {
@Mock Properties mockProperties;
@Captor ArgumentCaptor<Map<String, String>> argumentCaptor;
SparkShims sparkShims;
@Before
public void setUp() {
PowerMockito.mockStatic(BaseZeppelinContext.class);
RemoteEventClientWrapper mockRemoteEventClientWrapper = mock(RemoteEventClientWrapper.class);
when(BaseZeppelinContext.getEventClient()).thenReturn(mockRemoteEventClientWrapper);
doNothing()
.when(mockRemoteEventClientWrapper)
.onParaInfosReceived(anyString(), anyString(), argumentCaptor.capture());
when(mockProperties.getProperty("spark.jobGroup.id")).thenReturn("zeppelin-user1-note-paragraph");
try {
sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties());
} catch (Throwable ignore) {
sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties());
}
}
@Test
public void runUnerLocalTest() {
sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties);
Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id="));
}
@Test
public void runUnerYarnTest() {
sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties);
Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
if (sparkShims.supportYarn6615(VersionInfo.getVersion())) {
assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id="));
} else {
assertFalse(mapValue.get("jobUrl").contains("/jobs/job?id="));
}
}
}
}