blob: f753e8c10e5396d8ccd6d3968fb30cdc6760a321 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.util.FS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestLiResponseException;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.core.GitConfigMonitor;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
public class GobblinServiceManagerTest {
private static final Logger logger = LoggerFactory.getLogger(GobblinServiceManagerTest.class);
// Pretty-printing Gson instance; not referenced by any test visible in this file.
private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
// Scratch locations under /tmp. SERVICE_WORK_DIR and SPEC_STORE_PARENT_DIR are wiped in setup() and cleanUp();
// the remaining /tmp/serviceCore/* paths live beneath SPEC_STORE_PARENT_DIR and are removed along with it.
private static final String SERVICE_WORK_DIR = "/tmp/serviceWorkDir/";
private static final String SPEC_STORE_PARENT_DIR = "/tmp/serviceCore/";
private static final String SPEC_DESCRIPTION = "Test ServiceCore";
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/serviceCore/topologyTestSpecStore";
private static final String FLOW_SPEC_STORE_DIR = "/tmp/serviceCore/flowTestSpecStore";
// Working clone used by the tests to commit and push config files to the bare "remote" repository.
private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone";
// Bare repository created in setup() and configured as the git config monitor's repo URI.
private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
// Directory configured as the git config monitor's local repo dir.
private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
private static final String JOB_STATUS_STATE_STORE_DIR = "/tmp/serviceCore/fsJobStatusRetriever";
// Temp directory created when this class is initialized; within this file it is only deleted in cleanUp().
private static final String GROUP_OWNERSHIP_CONFIG_DIR = Files.createTempDir().getAbsolutePath();
private static final String TEST_GROUP_NAME = "testGroup";
private static final String TEST_FLOW_NAME = "testFlow";
private static final String TEST_FLOW_NAME2 = "testFlow2";
private static final FlowId TEST_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
private static final FlowId TEST_FLOW_ID2 = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME2);
// Flow id whose name is the MockedSpecCompiler.UNCOMPILABLE_FLOW constant; used by the uncompilable-flow tests.
private static final FlowId UNCOMPILABLE_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME)
.setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
// Cron expression attached to every scheduled flow created by these tests.
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
// Group/flow names that are never created; used by the testBad* tests.
private static final String TEST_DUMMY_GROUP_NAME = "dummyGroup";
private static final String TEST_DUMMY_FLOW_NAME = "dummyFlow";
private static final String TEST_GOBBLIN_EXECUTOR_NAME = "testGobblinExecutor";
private static final String TEST_SOURCE_NAME = "testSource";
private static final String TEST_SINK_NAME = "testSink";
// Spec URI derived from TEST_FLOW_ID; its string form is the key looked up in the scheduler's scheduled flow specs.
private final URI TEST_URI = FlowSpec.Utils.createFlowSpecUri(TEST_FLOW_ID);
// Service under test; replaced with a fresh instance by serviceReboot().
private MockGobblinServiceManager gobblinServiceManager;
// Rest.li client pointed at the embedded server of the service under test.
private FlowConfigV2Client flowConfigClient;
// Handle on the clone in GIT_CLONE_DIR, used to add/commit/push config files.
private Git gitForPush;
// Embedded ZooKeeper server whose connect string is passed to the service as "zookeeper.connect".
private TestingServer testingServer;
// Service configuration, populated in setup() and converted to a Config for each service manager instance.
Properties serviceCoreProperties = new Properties();
// Default flow properties shared by the flow configs created in the tests; populated in setup().
Map<String, String> flowProperties = Maps.newHashMap();
// Properties handed to FlowConfigV2Client; setup() sets the HTTP request timeout here.
Map<String, String> transportClientProperties = Maps.newHashMap();
// NOTE(review): presumably declared only so that instance field initializers (e.g. TEST_URI) may throw
// checked exceptions — confirm before removing.
public GobblinServiceManagerTest() throws Exception {
}
/**
 * Builds the shared fixture for all tests in this class: clean scratch directories, a test metastore database,
 * an embedded ZooKeeper server, a bare git repository with an initial commit, a started
 * {@link MockGobblinServiceManager} and a {@link FlowConfigV2Client} pointed at its Rest.li server.
 *
 * @throws Exception if any part of the fixture cannot be created or started
 */
@BeforeClass
public void setup() throws Exception {
  // Start from empty scratch directories.
  cleanUpDir(SERVICE_WORK_DIR);
  cleanUpDir(SPEC_STORE_PARENT_DIR);

  ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
  testingServer = new TestingServer(true);

  // Default properties attached to the flow configs created by the tests.
  flowProperties.put("param1", "value1");
  flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
  flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);

  // State store and ZooKeeper settings.
  serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
  serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
  serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
  serviceCoreProperties.put("zookeeper.connect", testingServer.getConnectString());
  serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY,
      MysqlJobStatusStateStoreFactory.class.getName());

  // Spec store locations.
  serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
  serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);

  // Single test topology; every key shares the "<factory prefix><executor name>" stem.
  final String topologyPrefix = ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME;
  serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
  serviceCoreProperties.put(topologyPrefix + ".description", "StandaloneTestExecutor");
  serviceCoreProperties.put(topologyPrefix + ".version", FlowSpec.Builder.DEFAULT_VERSION);
  serviceCoreProperties.put(topologyPrefix + ".uri", "gobblinExecutor");
  serviceCoreProperties.put(topologyPrefix + ".specExecutorInstance",
      "org.apache.gobblin.service.InMemorySpecExecutor");
  serviceCoreProperties.put(topologyPrefix + ".specExecInstance.capabilities",
      TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);

  // Git config monitor, polling interval value 5.
  final String gitMonitorPrefix = GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + ".";
  serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, true);
  serviceCoreProperties.put(gitMonitorPrefix + ConfigurationKeys.GIT_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR);
  serviceCoreProperties.put(gitMonitorPrefix + ConfigurationKeys.GIT_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
  serviceCoreProperties.put(gitMonitorPrefix + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5);

  // Job status retrieval and the mocked flow compiler.
  serviceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
      JOB_STATUS_STATE_STORE_DIR);
  serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
  serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
      MockedSpecCompiler.class.getCanonicalName());

  transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");

  // Create the bare "remote" repository, then clone it so the tests have a working tree to push from.
  RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED).open(false).create(true);
  this.gitForPush = Git.cloneRepository()
      .setURI(GIT_REMOTE_REPO_DIR)
      .setDirectory(new File(GIT_CLONE_DIR))
      .call();

  // Push an empty commit as a base for detecting changes.
  this.gitForPush.commit().setMessage("First commit").call();
  this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();

  // Start the service and point the client at the port its Rest.li server is listening on.
  Config serviceConfig = ConfigUtils.propertiesToConfig(serviceCoreProperties);
  Optional<Path> serviceWorkDir = Optional.of(new Path(SERVICE_WORK_DIR));
  this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1", serviceConfig, serviceWorkDir);
  this.gobblinServiceManager.start();

  String serviceUrl = String.format("http://127.0.0.1:%s/",
      this.gobblinServiceManager.getRestLiServer().getListeningURI().getPort());
  this.flowConfigClient = new FlowConfigV2Client(serviceUrl, transportClientProperties);
}
/**
 * Recursively deletes the given directory if it exists; does nothing otherwise.
 *
 * @param dir path of the directory to remove
 * @throws Exception if the directory exists but cannot be deleted
 */
private void cleanUpDir(String dir) throws Exception {
  File target = new File(dir);
  if (!target.exists()) {
    return;
  }
  FileUtils.deleteDirectory(target);
}
/**
 * Tears down the shared fixture. Every step is best-effort: a failure is logged (with its cause) and the
 * remaining steps still run, so one broken resource does not prevent the others from being released.
 */
@AfterClass
public void cleanUp() throws Exception {
  // Shutdown Service
  try {
    if (this.gobblinServiceManager != null) {
      this.gobblinServiceManager.stop();
    }
  } catch (Exception e) {
    logger.warn("Could not cleanly stop Gobblin Service Manager", e);
  }

  // Release the git handle before its working tree (under SPEC_STORE_PARENT_DIR) is deleted.
  try {
    if (this.gitForPush != null) {
      this.gitForPush.close();
    }
  } catch (Exception e) {
    logger.warn("Could not close git clone used for pushing", e);
  }

  try {
    cleanUpDir(SERVICE_WORK_DIR);
  } catch (Exception e) {
    logger.warn("Could not completely cleanup Work Dir", e);
  }

  try {
    cleanUpDir(SPEC_STORE_PARENT_DIR);
  } catch (Exception e) {
    logger.warn("Could not completely cleanup Spec Store Parent Dir", e);
  }

  try {
    cleanUpDir(GROUP_OWNERSHIP_CONFIG_DIR);
  } catch (Exception e) {
    logger.warn("Could not completely cleanup Group Ownership Parent Dir", e);
  }

  try {
    if (this.testingServer != null) {
      this.testingServer.close();
    }
  } catch (Exception e) {
    logger.warn("Failed to close ZK testing server.", e);
  }
}
/**
 * Verifies that a flow already present in the spec store is not deleted just because it is not compilable
 * when the service restarts, while a run-once flow stored next to it is removed after the restart.
 */
@Test
public void testRestart() throws Exception {
  FlowConfig uncompilableFlowConfig = new FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI)
      .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
      .setProperties(new StringMap(flowProperties));
  FlowSpec uncompilableSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(uncompilableFlowConfig);

  FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID)
      .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
  FlowSpec runOnceSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig);

  // add the non compilable flow directly to the spec store skipping flow catalog which would not allow this
  this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(uncompilableSpec);
  this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec);

  // getSpecs() is used as a Collection elsewhere in this class, so copy it into a list instead of
  // down-casting: the cast would throw ClassCastException if the catalog ever returned a non-List collection.
  List<Spec> specs = new ArrayList<>(this.gobblinServiceManager.getFlowCatalog().getSpecs());
  Assert.assertEquals(specs.size(), 2);
  // The two specs may come back in either order.
  if (specs.get(0).getUri().equals(uncompilableSpec.getUri())) {
    Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri());
  } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) {
    Assert.assertEquals(specs.get(1).getUri(), uncompilableSpec.getUri());
  } else {
    Assert.fail("Unexpected spec in flow catalog: " + specs.get(0).getUri());
  }

  // restart the service
  serviceReboot();

  // runOnce job should get deleted from the spec store after running but uncompilable flow should stay
  AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1)
      .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
          "Waiting for the runOnce job to finish");

  specs = new ArrayList<>(this.gobblinServiceManager.getFlowCatalog().getSpecs());
  // Guard the index access below with a readable failure instead of an IndexOutOfBoundsException.
  Assert.assertEquals(specs.size(), 1);
  Assert.assertEquals(specs.get(0).getUri(), uncompilableSpec.getUri());
  Assert.assertTrue(uncompilableSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY));

  // clean it
  this.gobblinServiceManager.getFlowCatalog().remove(uncompilableSpec.getUri());
  Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
}
/**
 * Verifies that creating a flow the compiler rejects is answered with HTTP 400 and that the flow is neither
 * persisted in the flow catalog nor scheduled.
 */
@Test (dependsOnMethods = "testRestart")
public void testUncompilableJob() throws Exception {
  FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
  URI uri = FlowSpec.Utils.createFlowSpecUri(flowId);
  FlowConfig flowConfig = new FlowConfig().setId(flowId)
      .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));

  RestLiResponseException exception = null;
  try {
    this.flowConfigClient.createFlowConfig(flowConfig);
  } catch (RestLiResponseException e) {
    exception = e;
  }

  // Fail with a clear message (not a NullPointerException) if the request was unexpectedly accepted.
  Assert.assertNotNull(exception, "Creating an uncompilable flow should have raised a 400 error");
  Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400);

  // uncompilable job should not be persisted
  Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
  Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
}
/**
 * Creates a flow without a schedule and waits until it has disappeared from both the flow catalog and the
 * scheduler's scheduled flow specs.
 */
@Test (dependsOnMethods = "testUncompilableJob")
public void testRunOnceJob() throws Exception {
  StringMap properties = new StringMap(flowProperties);
  FlowConfig runOnceConfig = new FlowConfig()
      .setId(TEST_FLOW_ID)
      .setTemplateUris(TEST_TEMPLATE_URI)
      .setProperties(properties);

  this.flowConfigClient.createFlowConfig(runOnceConfig);

  // runOnce job is deleted soon after it is orchestrated
  AssertWithBackoff.create()
      .maxSleepMs(200L)
      .timeoutMs(2000L)
      .backoffFactor(1)
      .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0,
          "Waiting for job to get orchestrated...");

  AssertWithBackoff.create()
      .maxSleepMs(100L)
      .timeoutMs(1000L)
      .backoffFactor(1)
      .assertTrue(
          input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
          "Waiting for job to get orchestrated...");
}
/**
 * Submits a flow config with the explain flag set and checks that nothing is persisted or scheduled for it.
 */
@Test (dependsOnMethods = "testRunOnceJob")
public void testExplainJob() throws Exception {
  FlowId explainedFlowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
  FlowConfig explainConfig = new FlowConfig()
      .setId(explainedFlowId)
      .setTemplateUris(TEST_TEMPLATE_URI)
      .setProperties(new StringMap(flowProperties))
      .setExplain(true);

  this.flowConfigClient.createFlowConfig(explainConfig);

  // explain job should not be persisted
  int persistedSpecs = this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
  Assert.assertEquals(persistedSpecs, 0);
  boolean scheduled =
      this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString());
  Assert.assertFalse(scheduled);
}
/**
 * Creates a scheduled, run-immediately flow and checks that it is stored in the flow catalog and registered
 * with the scheduler.
 */
@Test (dependsOnMethods = "testExplainJob")
public void testCreate() throws Exception {
  Schedule schedule = new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true);
  FlowConfig scheduledConfig = new FlowConfig()
      .setId(TEST_FLOW_ID)
      .setTemplateUris(TEST_TEMPLATE_URI)
      .setSchedule(schedule)
      .setProperties(new StringMap(flowProperties));

  this.flowConfigClient.createFlowConfig(scheduledConfig);

  int persistedSpecs = this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
  Assert.assertEquals(persistedSpecs, 1);
  boolean scheduled =
      this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString());
  Assert.assertTrue(scheduled);
}
/**
 * Re-submits a flow config for the id created in {@code testCreate} and expects the service to answer with
 * HTTP 409.
 */
@Test (dependsOnMethods = "testCreate")
public void testCreateAgain() throws Exception {
  Schedule schedule = new Schedule().setCronSchedule(TEST_SCHEDULE);
  FlowConfig duplicateConfig = new FlowConfig()
      .setId(TEST_FLOW_ID)
      .setTemplateUris(TEST_TEMPLATE_URI)
      .setSchedule(schedule)
      .setProperties(new StringMap(flowProperties));

  RestLiResponseException conflict = null;
  try {
    this.flowConfigClient.createFlowConfig(duplicateConfig);
  } catch (RestLiResponseException e) {
    conflict = e;
  }

  Assert.assertNotNull(conflict);
  Assert.assertEquals(conflict.getStatus(), HttpStatus.CONFLICT_409);
}
/**
 * Fetches the flow config stored for {@code TEST_FLOW_ID} and checks its id, schedule, template URI and the
 * {@code param1} property.
 */
@Test (dependsOnMethods = "testCreateAgain")
public void testGet() throws Exception {
  FlowConfig fetched = this.flowConfigClient.getFlowConfig(TEST_FLOW_ID);

  // Identity
  FlowId fetchedId = fetched.getId();
  Assert.assertEquals(fetchedId.getFlowGroup(), TEST_GROUP_NAME);
  Assert.assertEquals(fetchedId.getFlowName(), TEST_FLOW_NAME);

  // Schedule and template
  Assert.assertEquals(fetched.getSchedule().getCronSchedule(), TEST_SCHEDULE);
  Assert.assertEquals(fetched.getTemplateUris(), TEST_TEMPLATE_URI);
  Assert.assertTrue(fetched.getSchedule().isRunImmediately());

  // Add this assert back when getFlowSpec() is changed to return the raw flow spec
  //Assert.assertEquals(fetched.getProperties().size(), 1);
  Assert.assertEquals(fetched.getProperties().get("param1"), "value1");
}
/**
 * Creates a second flow next to the one left by {@code testCreate}, checks that listing all flow configs
 * returns both, and removes the second flow again.
 */
@Test (dependsOnMethods = "testCreateAgain")
public void testGetAll() throws Exception {
  FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
      .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
      .setProperties(new StringMap(flowProperties));
  this.flowConfigClient.createFlowConfig(flowConfig2);

  try {
    Collection<FlowConfig> flowConfigs = this.flowConfigClient.getAllFlowConfigs();
    Assert.assertEquals(flowConfigs.size(), 2);
  } finally {
    // Always remove the extra flow: later tests assert on the number of specs in the catalog and would
    // otherwise fail as a side effect of a failure here.
    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID2);
  }
}
/**
 * Disabled: filtered lookups are not implemented for FsSpecStore. Exercises filtering by flow group, by flow
 * group plus flow name, and by schedule.
 */
@Test (dependsOnMethods = "testCreateAgain", enabled = false)
public void testGetFilteredFlows() throws Exception {
  // Not implemented for FsSpecStore
  Collection<FlowConfig> byGroup = this.flowConfigClient.getFlowConfigs(
      TEST_GROUP_NAME, null, null, null, null, null, null, null, null, null);
  Assert.assertEquals(byGroup.size(), 2);

  Collection<FlowConfig> byGroupAndName = this.flowConfigClient.getFlowConfigs(
      TEST_GROUP_NAME, TEST_FLOW_NAME2, null, null, null, null, null, null, null, null);
  Assert.assertEquals(byGroupAndName.size(), 1);

  Collection<FlowConfig> bySchedule = this.flowConfigClient.getFlowConfigs(
      null, null, null, null, null, null, TEST_SCHEDULE, null, null, null);
  Assert.assertEquals(bySchedule.size(), 2);
}
/**
 * Updates the flow stored for {@code TEST_FLOW_ID} with new property values and checks that the flow is still
 * scheduled and that a subsequent get returns the updated properties.
 */
@Test (dependsOnMethods = "testGet")
public void testUpdate() throws Exception {
  FlowId lookupId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);

  // Replacement properties: param1 changes value and param2 is new.
  Map<String, String> updatedProperties = Maps.newHashMap();
  updatedProperties.put("param1", "value1b");
  updatedProperties.put("param2", "value2b");
  updatedProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
  updatedProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);

  Schedule schedule = new Schedule().setCronSchedule(TEST_SCHEDULE);
  FlowConfig updatedConfig = new FlowConfig()
      .setId(TEST_FLOW_ID)
      .setTemplateUris(TEST_TEMPLATE_URI)
      .setSchedule(schedule)
      .setProperties(new StringMap(updatedProperties));

  this.flowConfigClient.updateFlowConfig(updatedConfig);

  FlowConfig retrieved = this.flowConfigClient.getFlowConfig(lookupId);

  Assert.assertTrue(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
  Assert.assertEquals(retrieved.getId().getFlowGroup(), TEST_GROUP_NAME);
  Assert.assertEquals(retrieved.getId().getFlowName(), TEST_FLOW_NAME);
  Assert.assertEquals(retrieved.getSchedule().getCronSchedule(), TEST_SCHEDULE);
  Assert.assertEquals(retrieved.getTemplateUris(), TEST_TEMPLATE_URI);
  // Add this assert when getFlowSpec() is changed to return the raw flow spec
  //Assert.assertEquals(updatedConfig.getProperties().size(), 2);
  Assert.assertEquals(retrieved.getProperties().get("param1"), "value1b");
  Assert.assertEquals(retrieved.getProperties().get("param2"), "value2b");
}
/**
 * Deletes the flow stored for the test flow id and checks that a subsequent get is answered with HTTP 404 and
 * that the flow is no longer among the scheduler's scheduled flow specs.
 */
@Test (dependsOnMethods = "testUpdate")
public void testDelete() throws Exception {
  FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
  URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);

  // make sure flow config exists
  FlowConfig existing = this.flowConfigClient.getFlowConfig(flowId);
  Assert.assertEquals(existing.getId().getFlowGroup(), TEST_GROUP_NAME);
  Assert.assertEquals(existing.getId().getFlowName(), TEST_FLOW_NAME);

  this.flowConfigClient.deleteFlowConfig(flowId);

  RestLiResponseException notFound = null;
  try {
    this.flowConfigClient.getFlowConfig(flowId);
  } catch (RestLiResponseException e) {
    notFound = e;
  }

  if (notFound == null) {
    Assert.fail("Get should have gotten a 404 error");
  }
  Assert.assertEquals(notFound.getStatus(), HttpStatus.NOT_FOUND_404);
  Assert.assertFalse(
      this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(flowUri.toString()));
}
/**
 * Pushes a flow config file through git and, after waiting for the git config monitor to poll, checks that
 * the flow catalog is empty and that the flow is not among the scheduler's scheduled flow specs.
 */
@Test (dependsOnMethods = "testDelete")
public void testGitCreate() throws Exception {
  // push a new config file
  final String flowFilePattern = "gobblin-config/testGroup/testFlow.pull";
  File flowFile = new File(GIT_CLONE_DIR + "/" + flowFilePattern);
  flowFile.getParentFile().mkdirs();
  Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", flowFile, Charsets.UTF_8);

  // Nothing has been pushed yet, so the catalog must still be empty.
  Collection<Spec> specsBeforePush = this.gobblinServiceManager.getFlowCatalog().getSpecs();
  Assert.assertEquals(specsBeforePush.size(), 0);

  // add, commit, push
  this.gitForPush.add().addFilepattern(flowFilePattern).call();
  this.gitForPush.commit().setMessage("second commit").call();
  this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();

  // polling is every 5 seconds, so wait twice as long and check
  TimeUnit.SECONDS.sleep(10);

  // spec generated using git monitor do not have schedule, so their life cycle should be similar to runOnce jobs
  int specsAfterPoll = this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
  Assert.assertEquals(specsAfterPoll, 0);

  AssertWithBackoff.create()
      .maxSleepMs(200L)
      .timeoutMs(2000L)
      .backoffFactor(1)
      .assertTrue(
          input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
          "Waiting for job to get orchestrated...");
}
/**
 * Requests a flow config for an id that is never created and expects HTTP 404.
 */
@Test
public void testBadGet() throws Exception {
  FlowId missingFlowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);

  RestLiResponseException notFound = null;
  try {
    this.flowConfigClient.getFlowConfig(missingFlowId);
  } catch (RestLiResponseException e) {
    notFound = e;
  }

  if (notFound == null) {
    Assert.fail("Get should have raised a 404 error");
  }
  Assert.assertEquals(notFound.getStatus(), HttpStatus.NOT_FOUND_404);
}
/**
 * Attempts to delete a flow config for an id that is never created and expects HTTP 404.
 *
 * <p>This test previously called {@code getFlowConfig}, which made it a duplicate of {@code testBadGet} and
 * left the delete path for a missing flow untested.
 */
@Test
public void testBadDelete() throws Exception {
  FlowId flowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);

  try {
    this.flowConfigClient.deleteFlowConfig(flowId);
  } catch (RestLiResponseException e) {
    Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
    return;
  }

  Assert.fail("Delete should have raised a 404 error");
}
/**
 * Attempts to update a flow config for an id that is never created and expects HTTP 404.
 *
 * <p>The test fails when no exception is raised at all; without that, an update that was silently accepted
 * would let the test pass without checking anything.
 */
@Test
public void testBadUpdate() throws Exception {
  Map<String, String> flowProperties = Maps.newHashMap();
  flowProperties.put("param1", "value1b");
  flowProperties.put("param2", "value2b");

  FlowConfig flowConfig = new FlowConfig()
      .setId(new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME))
      .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
      .setProperties(new StringMap(flowProperties));

  try {
    this.flowConfigClient.updateFlowConfig(flowConfig);
  } catch (RestLiResponseException e) {
    Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
    return;
  }

  Assert.fail("Update should have raised a 404 error");
}
/**
 * Stops the running service manager and replaces it with a new instance built from the same properties and
 * work directory. The Rest.li client is re-created against the new instance's server port before that
 * instance is started.
 *
 * @throws Exception if stopping, constructing or starting the service fails
 */
private void serviceReboot() throws Exception {
  this.gobblinServiceManager.stop();

  Config serviceConfig = ConfigUtils.propertiesToConfig(serviceCoreProperties);
  Optional<Path> serviceWorkDir = Optional.of(new Path(SERVICE_WORK_DIR));
  this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1", serviceConfig, serviceWorkDir);

  String serviceUrl = String.format("http://127.0.0.1:%s/",
      this.gobblinServiceManager.getRestLiServer().getPort());
  this.flowConfigClient = new FlowConfigV2Client(serviceUrl, transportClientProperties);

  this.gobblinServiceManager.start();
}
class MockGobblinServiceManager extends GobblinServiceManager {
public MockGobblinServiceManager(String serviceName, String serviceId, Config config,
Optional<Path> serviceWorkDirOptional)
throws Exception {
super(serviceName, serviceId, config, serviceWorkDirOptional);
}
protected EmbeddedRestliServer getRestLiServer() {
return this.restliServer;
}
}
}