Define griffin plain-vanilla hook.
the purpose of hook is for integration with components outside. Griffin would offer information about internal task status.
Task: GRIFFIN-200
Author: Eugene <liujin@apache.org>
Author: William Guo <guoyp@apache.org>
Closes #444 from toyboxman/hook.
diff --git a/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java b/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java
new file mode 100644
index 0000000..166372d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+public enum EventPointcutType {
+ BEFORE,
+ PENDING,
+ AFTER
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java b/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java
new file mode 100644
index 0000000..6bca570
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+public enum EventSourceType {
+ JOB,
+ MEASURE
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/EventType.java b/service/src/main/java/org/apache/griffin/core/event/EventType.java
new file mode 100644
index 0000000..b00a9b0
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/EventType.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+public enum EventType {
+ CREATION_EVENT,
+ CHANGE_EVENT,
+ REMOVAL_EVENT
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java b/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java
new file mode 100644
index 0000000..db29e44
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+public abstract class GriffinAbstractEvent<T> implements GriffinEvent<T> {
+ private T source;
+ private EventType type;
+ private EventSourceType sourceType;
+ private EventPointcutType pointcutType;
+
+ public GriffinAbstractEvent(T source,
+ EventType type,
+ EventSourceType sourceType,
+ EventPointcutType pointcutType) {
+ this.source = source;
+ this.type = type;
+ this.sourceType = sourceType;
+ this.pointcutType = pointcutType;
+ }
+
+ @Override
+ public EventType getType() {
+ return this.type;
+ }
+
+ @Override
+ public EventPointcutType getPointcut() {
+ return pointcutType;
+ }
+
+ @Override
+ public EventSourceType getSourceType() {
+ return sourceType;
+ }
+
+ @Override
+ public T getSource() {
+ return source;
+ }
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java b/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java
new file mode 100644
index 0000000..cff6163
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+/**
+ * A semantic event which indicates that a griffin-defined action occurred.
+ * This high-level event is generated by an action (such as an
+ * <code>addJob</code>) when the task-specific action occurs.
+ * The event is passed to every <code>GriffinHook</code> object
+ * that registered to receive such events using configuration.
+ *
+ * @author Eugene Liu
+ * @since 0.3
+ */
+public interface GriffinEvent<T> {
+ /**
+ * @return concrete event type
+ */
+ EventType getType();
+
+ /**
+ * @return concrete event pointcut type
+ */
+ EventPointcutType getPointcut();
+
+ /**
+ * @return concrete event source type
+ */
+ EventSourceType getSourceType();
+
+ /**
+ * The object on which the Event initially occurred.
+ *
+ * @return The object on which the Event initially occurred.
+ */
+ T getSource();
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
new file mode 100644
index 0000000..996d7a7
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+@Component
+public class GriffinEventManager {
+ @Autowired
+ private ApplicationContext applicationContext;
+
+ @Value("#{'${internal.event.listeners}'.split(',')}")
+ private Set<String> enabledListeners;
+
+ private List<GriffinHook> eventListeners;
+
+ @PostConstruct
+ void initializeListeners() {
+ List<GriffinHook> eventListeners = new ArrayList<>();
+ applicationContext.getBeansOfType(GriffinHook.class)
+ .forEach((beanName, listener) -> {
+ if (enabledListeners.contains(beanName)) {
+ eventListeners.add(listener);
+ }
+ });
+ this.eventListeners = eventListeners;
+ }
+
+ public void notifyListeners(GriffinEvent event) {
+ eventListeners.forEach(listener -> {
+ listener.onEvent(event);
+ });
+ }
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java b/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java
new file mode 100644
index 0000000..5090648
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+import org.apache.griffin.core.exception.GriffinException;
+
+/**
+ * The Hook interface for receiving internal events.
+ * The class that is interested in processing an event
+ * implements this interface, and the object created with that
+ * class is registered to griffin, using the configuration.
+ * When the event occurs, that object's <code>onEvent</code> method is
+ * invoked.
+ *
+ * @author Eugene Liu
+ * @since 0.3
+ */
+public interface GriffinHook {
+ /**
+ * Invoked when an action occurs.
+ *
+ * @see GriffinEvent
+ */
+ void onEvent(GriffinEvent event) throws GriffinException;
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/JobEvent.java b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
new file mode 100644
index 0000000..5fdea0f
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+import org.apache.griffin.core.job.entity.AbstractJob;
+
+public class JobEvent extends GriffinAbstractEvent<AbstractJob> {
+
+ private JobEvent(AbstractJob source,
+ EventType type,
+ EventSourceType sourceType,
+ EventPointcutType pointcutType) {
+ super(source, type, sourceType, pointcutType);
+ }
+
+ public static JobEvent yieldJobEventBeforeCreation(AbstractJob source) {
+ return new JobEvent(source,
+ EventType.CREATION_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.BEFORE);
+ }
+
+ public static JobEvent yieldJobEventAfterCreation(AbstractJob source) {
+ return new JobEvent(source,
+ EventType.CREATION_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.AFTER);
+ }
+
+ public static JobEvent yieldJobEventBeforeRemoval(AbstractJob source) {
+ return new JobEvent(source,
+ EventType.REMOVAL_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.BEFORE);
+ }
+
+ public static JobEvent yieldJobEventAfterRemoval(AbstractJob source) {
+ return new JobEvent(source,
+ EventType.REMOVAL_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.AFTER);
+ }
+}
diff --git a/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java b/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java
new file mode 100644
index 0000000..5c95f3e
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.event;
+
+import org.apache.griffin.core.exception.GriffinException;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration(value = "GriffinJobEventHook")
+public class JobEventHook implements GriffinHook {
+ @Override
+ public void onEvent(GriffinEvent event) throws GriffinException {
+ // This method needs to be reimplemented by event-consuming purpose
+ }
+}
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java
index db532ab..f1cdaba 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java
@@ -60,4 +60,9 @@
}
}
+ public static class UnImplementedException extends GriffinException {
+ public UnImplementedException(String message) {
+ super(message);
+ }
+ }
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index ac14461..4768efc 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -19,43 +19,12 @@
package org.apache.griffin.core.job;
-import static java.util.TimeZone.getTimeZone;
-import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
-import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_ID;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
-import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
-import static org.quartz.CronScheduleBuilder.cronSchedule;
-import static org.quartz.JobBuilder.newJob;
-import static org.quartz.JobKey.jobKey;
-import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
-import static org.quartz.TriggerBuilder.newTrigger;
-import static org.quartz.TriggerKey.triggerKey;
-
import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-
import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.event.GriffinEventManager;
import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.event.JobEvent;
import org.apache.griffin.core.job.entity.AbstractJob;
import org.apache.griffin.core.job.entity.BatchJob;
import org.apache.griffin.core.job.entity.JobHealth;
@@ -99,10 +68,43 @@
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import static java.util.TimeZone.getTimeZone;
+import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
+import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_ID;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
+import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.JobKey.jobKey;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+import static org.quartz.TriggerBuilder.newTrigger;
+import static org.quartz.TriggerKey.triggerKey;
+
@Service
public class JobServiceImpl implements JobService {
private static final Logger LOGGER = LoggerFactory
- .getLogger(JobServiceImpl.class);
+ .getLogger(JobServiceImpl.class);
public static final String GRIFFIN_JOB_ID = "griffinJobId";
private static final int MAX_PAGE_SIZE = 1024;
private static final int DEFAULT_PAGE_SIZE = 10;
@@ -127,6 +129,8 @@
private BatchJobOperatorImpl batchJobOp;
@Autowired
private StreamingJobOperatorImpl streamingJobOp;
+ @Autowired
+ private GriffinEventManager eventManager;
private RestTemplate restTemplate;
@@ -158,17 +162,22 @@
} catch (SchedulerException e) {
LOGGER.error("Failed to get RUNNING jobs.", e);
throw new GriffinException
- .ServiceException("Failed to get RUNNING jobs.", e);
+ .ServiceException("Failed to get RUNNING jobs.", e);
}
return dataList;
}
@Override
public AbstractJob addJob(AbstractJob job) throws Exception {
+ JobEvent jobEvent = JobEvent.yieldJobEventBeforeCreation(job);
+ eventManager.notifyListeners(jobEvent);
Long measureId = job.getMeasureId();
GriffinMeasure measure = getMeasureIfValid(measureId);
JobOperator op = getJobOperator(measure.getProcessType());
- return op.add(job, measure);
+ AbstractJob jobSaved = op.add(job, measure);
+ jobEvent = JobEvent.yieldJobEventAfterCreation(jobSaved);
+ eventManager.notifyListeners(jobEvent);
+ return jobSaved;
}
@Override
@@ -177,7 +186,7 @@
if (job == null) {
LOGGER.warn("Job id {} does not exist.", jobId);
throw new GriffinException
- .NotFoundException(JOB_ID_DOES_NOT_EXIST);
+ .NotFoundException(JOB_ID_DOES_NOT_EXIST);
}
return job;
}
@@ -198,7 +207,7 @@
}
private void doAction(String action, AbstractJob job, JobOperator op)
- throws Exception {
+ throws Exception {
switch (action) {
case START:
op.start(job);
@@ -208,7 +217,7 @@
break;
default:
throw new GriffinException
- .NotFoundException(NO_SUCH_JOB_ACTION);
+ .NotFoundException(NO_SUCH_JOB_ACTION);
}
}
@@ -224,8 +233,12 @@
public void deleteJob(Long jobId) throws SchedulerException {
AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
validateJobExist(job);
+ JobEvent event = JobEvent.yieldJobEventBeforeRemoval(job);
+ eventManager.notifyListeners(event);
JobOperator op = getJobOperator(job);
op.delete(job);
+ event = JobEvent.yieldJobEventAfterRemoval(job);
+ eventManager.notifyListeners(event);
}
/**
@@ -239,31 +252,35 @@
if (CollectionUtils.isEmpty(jobs)) {
LOGGER.warn("There is no job with '{}' name.", name);
throw new GriffinException
- .NotFoundException(JOB_NAME_DOES_NOT_EXIST);
+ .NotFoundException(JOB_NAME_DOES_NOT_EXIST);
}
for (AbstractJob job : jobs) {
+ JobEvent event = JobEvent.yieldJobEventBeforeRemoval(job);
+ eventManager.notifyListeners(event);
JobOperator op = getJobOperator(job);
op.delete(job);
+ event = JobEvent.yieldJobEventAfterRemoval(job);
+ eventManager.notifyListeners(event);
}
}
@Override
public List<JobInstanceBean> findInstancesOfJob(
- Long jobId,
- int page,
- int size) {
+ Long jobId,
+ int page,
+ int size) {
AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
if (job == null) {
LOGGER.warn("Job id {} does not exist.", jobId);
throw new GriffinException
- .NotFoundException(JOB_ID_DOES_NOT_EXIST);
+ .NotFoundException(JOB_ID_DOES_NOT_EXIST);
}
size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
Pageable pageable = new PageRequest(page, size,
- Sort.Direction.DESC, "tms");
+ Sort.Direction.DESC, "tms");
List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId,
- pageable);
+ pageable);
return updateState(instances);
}
@@ -294,7 +311,7 @@
} catch (SchedulerException e) {
LOGGER.error("Job schedule exception. {}", e);
throw new GriffinException
- .ServiceException("Fail to Get HealthInfo", e);
+ .ServiceException("Fail to Get HealthInfo", e);
}
}
@@ -305,8 +322,8 @@
public void deleteExpiredJobInstance() {
Long timeMills = System.currentTimeMillis();
List<JobInstanceBean> instances = instanceRepo
- .findByExpireTmsLessThanEqual
- (timeMills);
+ .findByExpireTmsLessThanEqual
+ (timeMills);
if (!batchJobOp.pauseJobInstances(instances)) {
LOGGER.error("Pause job failure.");
return;
@@ -329,7 +346,7 @@
return streamingJobOp;
}
throw new GriffinException.BadRequestException
- (JOB_TYPE_DOES_NOT_SUPPORT);
+ (JOB_TYPE_DOES_NOT_SUPPORT);
}
private JobOperator getJobOperator(ProcessType type) {
@@ -339,21 +356,21 @@
return streamingJobOp;
}
throw new GriffinException.BadRequestException
- (MEASURE_TYPE_DOES_NOT_SUPPORT);
+ (MEASURE_TYPE_DOES_NOT_SUPPORT);
}
TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws
- SchedulerException {
+ SchedulerException {
TriggerKey triggerKey = triggerKey(qName, qGroup);
if (factory.getScheduler().checkExists(triggerKey)) {
throw new GriffinException.ConflictException
- (QUARTZ_JOB_ALREADY_EXIST);
+ (QUARTZ_JOB_ALREADY_EXIST);
}
return triggerKey;
}
List<? extends Trigger> getTriggers(String name, String group) throws
- SchedulerException {
+ SchedulerException {
if (name == null || group == null) {
return null;
}
@@ -363,7 +380,7 @@
}
private JobState genJobState(AbstractJob job, String action) throws
- SchedulerException {
+ SchedulerException {
JobOperator op = getJobOperator(job);
JobState state = op.getState(job, action);
job.setJobState(state);
@@ -375,7 +392,7 @@
}
void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws
- Exception {
+ Exception {
JobDetail jobDetail = addJobDetail(tk, job);
Trigger trigger = genTriggerInstance(tk, jobDetail, job, type);
factory.getScheduler().scheduleJob(trigger);
@@ -405,34 +422,34 @@
private GriffinMeasure getMeasureIfValid(Long measureId) {
GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId,
- false);
+ false);
if (measure == null) {
LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't " +
- "exist or is external measure type.",
- measureId);
+ "exist or is external measure type.",
+ measureId);
throw new GriffinException.BadRequestException(INVALID_MEASURE_ID);
}
return measure;
}
private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob
- job, ProcessType type) {
+ job, ProcessType type) {
TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd);
if (type == BATCH) {
TimeZone timeZone = getTimeZone(job.getTimeZone());
return builder.withSchedule(cronSchedule(job.getCronExpression())
- .inTimeZone(timeZone)).build();
+ .inTimeZone(timeZone)).build();
} else if (type == STREAMING) {
return builder.startNow().withSchedule(simpleSchedule()
- .withRepeatCount(0)).build();
+ .withRepeatCount(0)).build();
}
throw new GriffinException.BadRequestException
- (JOB_TYPE_DOES_NOT_SUPPORT);
+ (JOB_TYPE_DOES_NOT_SUPPORT);
}
private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job)
- throws SchedulerException {
+ throws SchedulerException {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
JobDetail jobDetail;
@@ -441,7 +458,7 @@
jobDetail = scheduler.getJobDetail(jobKey);
} else {
jobDetail = newJob(JobInstance.class).storeDurably().withIdentity
- (jobKey).build();
+ (jobKey).build();
}
setJobDataMap(jobDetail, job);
scheduler.addJob(jobDetail, isJobKeyExist);
@@ -462,9 +479,9 @@
* @param measureId measure id
*/
public void deleteJobsRelateToMeasure(Long measureId) throws
- SchedulerException {
+ SchedulerException {
List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId,
- false);
+ false);
if (CollectionUtils.isEmpty(jobs)) {
LOGGER.info("Measure id {} has no related jobs.", measureId);
return;
@@ -478,7 +495,7 @@
@Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
public void syncInstancesOfAllJobs() {
LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING,
- IDLE, RUNNING, BUSY};
+ IDLE, RUNNING, BUSY};
List<JobInstanceBean> beans = instanceRepo.findByActiveState(states);
for (JobInstanceBean jobInstance : beans) {
syncInstancesOfJob(jobInstance);
@@ -496,21 +513,21 @@
return;
}
String uri = env.getProperty("livy.uri") + "/"
- + instance.getSessionId();
+ + instance.getSessionId();
TypeReference<HashMap<String, Object>> type =
- new TypeReference<HashMap<String, Object>>() {
- };
+ new TypeReference<HashMap<String, Object>>() {
+ };
try {
String resultStr = restTemplate.getForObject(uri, String.class);
HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr,
- type);
+ type);
setJobInstanceIdAndUri(instance, resultMap);
} catch (ResourceAccessException e) {
LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e
- .getMessage());
+ .getMessage());
} catch (HttpClientErrorException e) {
LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(),
- instance.getAppId(), e.getMessage());
+ instance.getAppId(), e.getMessage());
setStateByYarn(instance, e);
} catch (Exception e) {
LOGGER.error(e.getMessage());
@@ -523,7 +540,7 @@
if (!checkStatus(instance, e)) {
int code = e.getStatusCode().value();
boolean match = (code == 400 || code == 404)
- && instance.getAppId() != null;
+ && instance.getAppId() != null;
//this means your url is correct,but your param is wrong or livy
//session may be overdue.
if (match) {
@@ -553,7 +570,7 @@
// {id} not found',this means instance may not be scheduled for
// a long time by spark for too many tasks. It may be dead.
if (code == 404 && appId == null && (responseBody != null &&
- responseBody.contains(sessionId.toString()))) {
+ responseBody.contains(sessionId.toString()))) {
instance.setState(DEAD);
instance.setDeleted(true);
instanceRepo.save(instance);
@@ -564,7 +581,7 @@
private void setStateByYarn(JobInstanceBean instance) {
LOGGER.warn("Spark session {} may be overdue! " +
- "Now we use yarn to update state.", instance.getSessionId());
+ "Now we use yarn to update state.", instance.getSessionId());
String yarnUrl = env.getProperty("yarn.uri");
boolean success = YarnNetUtil.update(yarnUrl, instance);
if (!success) {
@@ -578,16 +595,16 @@
private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String
- , Object> resultMap) {
+ , Object> resultMap) {
if (resultMap != null) {
Object state = resultMap.get("state");
Object appId = resultMap.get("appId");
instance.setState(state == null ? null : LivySessionStates.State
- .valueOf(state.toString().toUpperCase
- ()));
+ .valueOf(state.toString().toUpperCase
+ ()));
instance.setAppId(appId == null ? null : appId.toString());
instance.setAppUri(appId == null ? null : env
- .getProperty("yarn.uri") + " /cluster/app/ " + appId);
+ .getProperty("yarn.uri") + " /cluster/app/ " + appId);
instanceRepo.save(instance);
}
}
@@ -595,9 +612,9 @@
public Boolean isJobHealthy(Long jobId) {
Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId,
- pageable);
+ pageable);
return !CollectionUtils.isEmpty(instances) && LivySessionStates
- .isHealthy(instances.get(0).getState());
+ .isHealthy(instances.get(0).getState());
}
@Override
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 2e9c8f7..1c26319 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -62,3 +62,5 @@
livy.uri=http://localhost:8998/batches
# yarn url
yarn.uri=http://localhost:8088
+# griffin event listener
+internal.event.listeners=GriffinJobEventHook
diff --git a/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java b/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java
new file mode 100644
index 0000000..2cbbd84
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.griffin.core.event.EventSourceType;
+import org.apache.griffin.core.event.EventType;
+import org.apache.griffin.core.event.GriffinEvent;
+import org.apache.griffin.core.event.GriffinHook;
+import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.job.entity.BatchJob;
+import org.apache.griffin.core.job.entity.JobDataSegment;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.util.EntityHelper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
+import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@DataJpaTest
+@ComponentScan("org.apache.griffin.core")
+public class EventServiceTest {
+ @Autowired
+ private JobService jobService;
+
+ @Autowired
+ private TestEntityManager entityManager;
+
+ @Autowired
+ private List<GriffinEvent> eventList;
+
+ @Before
+ public void setup() throws Exception {
+ entityManager.clear();
+ entityManager.flush();
+ setEntityManager();
+ }
+
+ @Test
+ public void testAddJobEvent() throws Exception {
+ BatchJob batch_Job = EntityHelper.createGriffinJob();
+ batch_Job.setCronExpression("0 0 12 * * ?");
+ batch_Job.setTimeZone("Asia/Shanghai");
+ JobDataSegment jds = new JobDataSegment();
+ jds.setAsTsBaseline(true);
+ jds.setDataConnectorName("target_name");
+ List jds_list = new ArrayList();
+ jds_list.add(jds);
+ batch_Job.setSegments(jds_list);
+ jobService.addJob(batch_Job);
+ Assert.assertEquals(2, eventList.size());
+ Assert.assertEquals(EventType.CREATION_EVENT, eventList.get(0).getType());
+ Assert.assertEquals(EventSourceType.JOB, eventList.get(1).getSourceType());
+ }
+
+ public void setEntityManager() throws Exception {
+ Measure measure1 = createGriffinMeasure("m1");
+ measure1.setOrganization("org1");
+ ((GriffinMeasure) measure1).setProcessType(GriffinMeasure.ProcessType.BATCH);
+ entityManager.persistAndFlush(measure1);
+ }
+
+ @Configuration(value = "GriffinTestJobEventHook")
+ public static class TestJobEventHook implements GriffinHook {
+ private List<GriffinEvent> eventList = new ArrayList<>();
+
+ @Override
+ public void onEvent(GriffinEvent event) throws GriffinException {
+ eventList.add(event);
+ }
+
+ @Bean
+ public List<GriffinEvent> getReceivedEvents() {
+ return eventList;
+ }
+ }
+}
diff --git a/service/src/test/resources/application.properties b/service/src/test/resources/application.properties
index 96d28dc..86d5316 100644
--- a/service/src/test/resources/application.properties
+++ b/service/src/test/resources/application.properties
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,4 +56,6 @@
elasticsearch.port=9200
elasticsearch.scheme=http
# elasticsearch.user = user
-# elasticsearch.password = password
\ No newline at end of file
+# elasticsearch.password = password
+# griffin event listener
+internal.event.listeners=GriffinJobEventHook,GriffinTestJobEventHook