blob: 57d5a43b36aadaeec7bca620479dee364cbfc9f1 [file] [log] [blame]
package org.apache.helix.cloud.event.helix;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.cloud.event.CloudEventListener;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation;
import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType;
/**
* A helix manager-based cloud event listener implementation
*/
public class HelixCloudEventListener implements CloudEventListener {
private static Logger LOG = LoggerFactory.getLogger(HelixCloudEventListener.class);
private final CloudEventCallbackProperty _property;
private final DefaultCloudEventCallbackImpl _callbackImplClass;
private final HelixManager _helixManager;
public HelixCloudEventListener(CloudEventCallbackProperty property, HelixManager helixManager)
throws InstantiationException, IllegalAccessException {
this._property = property;
this._helixManager = helixManager;
this._callbackImplClass = loadCloudEventCallbackImplClass(property.getUserArgs()
.getOrDefault(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
DefaultCloudEventCallbackImpl.class.getCanonicalName()));
}
/**
* The type of incoming event
*/
public enum EventType {
ON_PAUSE, ON_RESUME
}
/**
* Below are lists defining the type and sequence of callbacks for each type of events
*/
private final List<Object> onPauseOperations = Arrays
.asList(UserDefinedCallbackType.PRE_ON_PAUSE, HelixOperation.MAINTENANCE_MODE,
HelixOperation.ENABLE_DISABLE_INSTANCE, UserDefinedCallbackType.POST_ON_PAUSE);
private final List<Object> onResumeOperations = Arrays
.asList(UserDefinedCallbackType.PRE_ON_RESUME, HelixOperation.ENABLE_DISABLE_INSTANCE,
HelixOperation.MAINTENANCE_MODE, UserDefinedCallbackType.POST_ON_RESUME);
@Override
public void performAction(Object eventType, Object eventInfo) {
LOG.info("Received {} event, event info {}, timestamp {}. Acting on the event... "
+ "Actor {}, based on callback implementation class {}.", ((EventType) eventType).name(),
eventInfo == null ? "N/A" : eventInfo.toString(), System.currentTimeMillis(), _helixManager,
_callbackImplClass.getClass().getCanonicalName());
if (eventType == EventType.ON_PAUSE) {
onPauseOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation));
} else if (eventType == EventType.ON_RESUME) {
onResumeOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation));
}
}
private void executeOperation(Object eventType, Object eventInfo, Object operation) {
Set<CloudEventCallbackProperty.HelixOperation> enabledHelixOperationSet =
_property.getEnabledHelixOperation();
if (HelixOperation.ENABLE_DISABLE_INSTANCE.equals(operation)) {
if (enabledHelixOperationSet.contains(HelixOperation.ENABLE_DISABLE_INSTANCE)) {
if (eventType == EventType.ON_PAUSE) {
_callbackImplClass.disableInstance(_helixManager, eventInfo);
} else {
_callbackImplClass.enableInstance(_helixManager, eventInfo);
}
}
} else if (HelixOperation.MAINTENANCE_MODE.equals(operation)) {
if (enabledHelixOperationSet.contains(HelixOperation.MAINTENANCE_MODE)) {
if (eventType == EventType.ON_PAUSE) {
_callbackImplClass.enterMaintenanceMode(_helixManager, eventInfo);
} else {
_callbackImplClass.exitMaintenanceMode(_helixManager, eventInfo);
}
}
} else if (operation instanceof UserDefinedCallbackType) {
BiConsumer<HelixManager, Object> callback =
_property.getUserDefinedCallbackMap().get(operation);
if (callback != null) {
callback.accept(_helixManager, eventInfo);
}
} else {
// Should not reach here
throw new HelixException("Unknown category of cloud event operation " + operation.toString());
}
}
@Override
public CloudEventListener.ListenerType getListenerType() {
return CloudEventListener.ListenerType.UNORDERED;
}
private DefaultCloudEventCallbackImpl loadCloudEventCallbackImplClass(String implClassName)
throws IllegalAccessException, InstantiationException {
DefaultCloudEventCallbackImpl implClass;
try {
LOG.info("Loading class: " + implClassName);
implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName)
.newInstance();
} catch (Exception e) {
implClass = DefaultCloudEventCallbackImpl.class.newInstance();
LOG.error(
"No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.",
implClassName, e.getMessage());
}
LOG.info("Using {} as cloud event callback impl class.",
implClass.getClass().getCanonicalName());
return implClass;
}
}