blob: 206eced5d69bc252f2c27024cd0c2e1d3160a21a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.service;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.jdbc.BacklogMetricStore;
import org.apache.falcon.metrics.MetricNotificationService;
import org.apache.falcon.tools.FalconStateStoreDBCLI;
import org.apache.falcon.util.StateStoreProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Test cases for Backlog Metric Store.
*/
public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
private static final String DB_BASE_DIR = "target/test-data/backlogmetricdb";
private static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
protected static String url = "jdbc:derby:" + dbLocation + ";create=true";
private static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
protected LocalFileSystem fs = new LocalFileSystem();
private static BacklogMetricStore backlogMetricStore;
private static FalconJPAService falconJPAService = FalconJPAService.get();
private static BacklogMetricEmitterService backlogMetricEmitterService;
private MetricNotificationService mockMetricNotificationService;
private int execDBCLICommands(String[] args) {
return new FalconStateStoreDBCLI().run(args);
}
private void createDB(String file) {
File sqlFile = new File(file);
String[] argsCreate = {"create", "-sqlfile", sqlFile.getAbsolutePath(), "-run"};
int result = execDBCLICommands(argsCreate);
Assert.assertEquals(0, result);
Assert.assertTrue(sqlFile.exists());
}
@AfterClass
public void cleanup() throws IOException {
cleanupDB();
}
private void cleanupDB() throws IOException {
fs.delete(new Path(DB_BASE_DIR), true);
}
@BeforeClass
public void setup() throws Exception {
StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
Configuration localConf = new Configuration();
fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
fs.mkdirs(new Path(DB_BASE_DIR));
createDB(DB_SQL_FILE);
falconJPAService.init();
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
this.conf = dfsCluster.getConf();
backlogMetricStore = new BacklogMetricStore();
mockMetricNotificationService = Mockito.mock(MetricNotificationService.class);
Mockito.when(mockMetricNotificationService.getName()).thenReturn("MetricNotificationService");
Services.get().register(mockMetricNotificationService);
Services.get().register(BacklogMetricEmitterService.get());
backlogMetricEmitterService = BacklogMetricEmitterService.get();
}
@Test
public void testBacklogEmitter() throws Exception {
backlogMetricEmitterService.init();
storeEntity(EntityType.PROCESS, "entity1");
backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
Thread.sleep(10);
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> valueCaptor = ArgumentCaptor.forClass(Long.class);
Mockito.verify(mockMetricNotificationService, Mockito.atLeastOnce()).publish(captor.capture(),
valueCaptor.capture());
Assert.assertEquals(captor.getValue(), "falcon.cluster1.testPipeline.EXECUTION.entity1.backlogInMins");
WorkflowExecutionContext workflowExecutionContext = getWorkflowExecutionContext();
backlogMetricEmitterService.onSuccess(workflowExecutionContext);
Thread.sleep(100);
Mockito.reset(mockMetricNotificationService);
Mockito.verify(mockMetricNotificationService, Mockito.times(0)).publish(Mockito.any(String.class),
Mockito.any(Long.class));
backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
Thread.sleep(1000);
backlogMetricEmitterService.onRemove(EntityUtil.getEntity(EntityType.PROCESS, "entity1"));
Assert.assertNull(backlogMetricStore.getAllInstances());
}
private WorkflowExecutionContext getWorkflowExecutionContext() {
Map<WorkflowExecutionArgs, String> args = new HashMap<>();
args.put(WorkflowExecutionArgs.ENTITY_TYPE, "process");
args.put(WorkflowExecutionArgs.CLUSTER_NAME, "cluster1");
args.put(WorkflowExecutionArgs.ENTITY_NAME, "entity1");
args.put(WorkflowExecutionArgs.NOMINAL_TIME, "2016-06-30-00-00");
args.put(WorkflowExecutionArgs.OPERATION, "GENERATE");
return new WorkflowExecutionContext(args);
}
}