blob: fbd9208b13843020a14bc27d08dbcc9c3e0535e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.graphql.commands;
import graphql.language.InputObjectTypeDefinition;
import graphql.schema.GraphQLInputObjectField;
import graphql.schema.GraphQLInputObjectType;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.services.EventService;
import org.apache.unomi.api.services.ProfileService;
import org.apache.unomi.graphql.CDPGraphQLConstants;
import org.apache.unomi.graphql.types.input.CDPConsentUpdateEventInput;
import org.apache.unomi.graphql.types.input.CDPEventInput;
import org.apache.unomi.graphql.types.input.CDPEventProcessor;
import org.apache.unomi.graphql.types.input.CDPListsUpdateEventInput;
import org.apache.unomi.graphql.types.input.CDPSessionEventInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Command that converts the {@code events} argument of a GraphQL request into Unomi
 * {@link Event}s and sends each of them through the {@link EventService}.
 *
 * <p>Every event input is handled in two passes:
 * <ul>
 *   <li><b>static fields</b> - the consent-update, lists-update and session events, which are
 *       exposed through typed getters on {@link CDPEventInput};</li>
 *   <li><b>dynamic fields</b> - every other field of the {@code CDPEventInput} schema type whose
 *       input-object type definition names a {@link CDPEventProcessor} implementation under
 *       {@link CDPGraphQLConstants#EVENT_PROCESSOR_CLASS}.</li>
 * </ul>
 *
 * <p>A failure while processing one field is logged and does not stop the remaining fields or
 * events from being processed. The command result is the number of events that were sent.
 */
public class ProcessEventsCommand extends BaseCommand<Integer> {

    private static final Logger LOG = LoggerFactory.getLogger(ProcessEventsCommand.class);

    /** Names of the event fields handled by {@link #processStaticFields}; skipped by the dynamic pass. */
    private static final List<String> STATIC_FIELDS = new ArrayList<>();

    /** Typed event inputs supplied by the caller of {@link #create(List)}. */
    private final List<CDPEventInput> eventInputs;

    /** Raw {@code events} argument as read from the data fetching environment. */
    private final List<LinkedHashMap<String, Object>> eventsAsMap;

    /** Field definitions of the {@code CDPEventInput} schema type, used to discover dynamic fields. */
    private final List<GraphQLInputObjectField> fieldDefinitions;

    /** Number of events sent so far by this command instance. */
    private final AtomicInteger processedEventsQty = new AtomicInteger();

    static {
        STATIC_FIELDS.add(CDPConsentUpdateEventInput.EVENT_NAME);
        STATIC_FIELDS.add(CDPListsUpdateEventInput.EVENT_NAME);
        STATIC_FIELDS.add(CDPSessionEventInput.EVENT_NAME);
    }

    private ProcessEventsCommand(final Builder builder) {
        super(builder);

        this.eventInputs = builder.eventInputs;
        this.eventsAsMap = environment.getArgument("events");

        final GraphQLInputObjectType objectType =
                (GraphQLInputObjectType) environment.getGraphQLSchema().getType(CDPEventInput.TYPE_NAME);
        this.fieldDefinitions = objectType.getFieldDefinitions();
    }

    /**
     * Processes every event input, static fields first and dynamic fields second.
     *
     * @return the number of events that were sent to the {@link EventService}
     */
    @Override
    public Integer execute() {
        // NOTE(review): assumes eventInputs and the raw "events" argument describe the same events
        // in the same order, so that index i addresses the same event in both lists - confirm.
        for (int i = 0; i < eventInputs.size(); i++) {
            final CDPEventInput eventInput = eventInputs.get(i);
            final LinkedHashMap<String, Object> eventInputAsMap = eventsAsMap.get(i);

            processStaticFields(eventInput, eventInputAsMap);
            processDynamicFields(eventInputAsMap);
        }
        return processedEventsQty.get();
    }

    /**
     * Builds and sends an event for each of the statically known processors that is present on
     * the given input. Failures are logged per processor and do not abort the others.
     *
     * @param eventInput      typed view of the event input
     * @param eventInputAsMap raw map view of the same event input, handed to the processors
     */
    private void processStaticFields(
            final CDPEventInput eventInput, final LinkedHashMap<String, Object> eventInputAsMap) {
        final List<CDPEventProcessor> eventProcessors = new ArrayList<>();
        eventProcessors.add(eventInput.getCdp_consentUpdateEvent());
        eventProcessors.add(eventInput.getCdp_listUpdateEvent());
        eventProcessors.add(eventInput.getCdp_sessionEvent());

        for (final CDPEventProcessor eventProcessor : eventProcessors) {
            if (eventProcessor == null) {
                // This kind of event was not supplied in the input.
                continue;
            }
            try {
                buildAndSend(eventProcessor, eventInputAsMap);
            } catch (Exception e) {
                logFieldFailure(eventProcessor.getFieldName(), e);
            }
        }
    }

    /**
     * Processes every schema field of the event input type that is not one of the
     * {@link #STATIC_FIELDS}. Failures are logged per field and do not abort the others.
     *
     * @param eventInputAsMap raw map view of the event input
     */
    private void processDynamicFields(final LinkedHashMap<String, Object> eventInputAsMap) {
        for (final GraphQLInputObjectField fieldDefinition : fieldDefinitions) {
            if (STATIC_FIELDS.contains(fieldDefinition.getName())) {
                continue;
            }
            try {
                processField(fieldDefinition, eventInputAsMap);
            } catch (Exception e) {
                // Log the field name (not the whole definition object), as the static pass does.
                logFieldFailure(fieldDefinition.getName(), e);
            }
        }
    }

    /**
     * Processes a single dynamic field if it is present in the input and its input-object type
     * definition declares an event processor class.
     *
     * @param fieldDefinition schema definition of the field
     * @param eventInputAsMap raw map view of the event input
     * @return {@code true} if a processor class was found and invoked, {@code false} otherwise
     * @throws Exception if the processor cannot be instantiated or fails to build/send the event
     */
    private boolean processField(
            final GraphQLInputObjectField fieldDefinition,
            final LinkedHashMap<String, Object> eventInputAsMap) throws Exception {
        if (!eventInputAsMap.containsKey(fieldDefinition.getName())) {
            return false;
        }

        if (fieldDefinition.getType() instanceof GraphQLInputObjectType) {
            final GraphQLInputObjectType inputObjectType = (GraphQLInputObjectType) fieldDefinition.getType();
            final InputObjectTypeDefinition typeDefinition = inputObjectType.getDefinition();

            if (typeDefinition != null
                    && typeDefinition.getAdditionalData().containsKey(CDPGraphQLConstants.EVENT_PROCESSOR_CLASS)) {
                final String className =
                        typeDefinition.getAdditionalData().get(CDPGraphQLConstants.EVENT_PROCESSOR_CLASS);

                buildAndProcessEvent(className, eventInputAsMap);
                return true;
            }
        }
        return false;
    }

    /**
     * Instantiates the named class through its public no-argument constructor and, if the
     * instance is a {@link CDPEventProcessor}, uses it to build and send an event. Instances of
     * any other type are ignored.
     *
     * @param className       fully qualified name of the processor class
     * @param eventInputAsMap raw map view of the event input
     * @throws Exception if the class cannot be loaded or instantiated, or the processor fails
     */
    private void buildAndProcessEvent(
            final String className, final LinkedHashMap<String, Object> eventInputAsMap) throws Exception {
        final Constructor<?> constructor = Class.forName(className).getConstructor();
        final Object instance = constructor.newInstance();

        if (instance instanceof CDPEventProcessor) {
            buildAndSend((CDPEventProcessor) instance, eventInputAsMap);
        }
    }

    /**
     * Asks the processor to build an event from the raw input and sends it when one is produced.
     *
     * @param eventProcessor  processor responsible for building the event
     * @param eventInputAsMap raw map view of the event input
     * @throws Exception whatever the processor or the event pipeline throws
     */
    private void buildAndSend(
            final CDPEventProcessor eventProcessor,
            final LinkedHashMap<String, Object> eventInputAsMap) throws Exception {
        final Event event = eventProcessor.buildEvent(eventInputAsMap, environment);
        if (event != null) {
            processEvent(event);
        }
    }

    /**
     * Sends the event, saves the event's profile when the event service reports that it was
     * updated, and counts the event as processed.
     *
     * @param event the event to send
     */
    private void processEvent(final Event event) {
        final int eventCode = serviceManager.getService(EventService.class).send(event);

        // Per the EventService contract, send() reports its outcome as a combination of flags,
        // so test the PROFILE_UPDATED bit instead of comparing the whole value: an equality
        // check would skip the save whenever another flag is set alongside it.
        if ((eventCode & EventService.PROFILE_UPDATED) == EventService.PROFILE_UPDATED) {
            serviceManager.getService(ProfileService.class).save(event.getProfile());
        }

        processedEventsQty.incrementAndGet();
    }

    /**
     * Logs a field-processing failure: a short warning always, the stack trace only at debug level.
     *
     * @param fieldName name of the field whose processing failed
     * @param cause     the failure
     */
    private static void logFieldFailure(final Object fieldName, final Exception cause) {
        LOG.warn("Process field {} is failed. enable debug log to see the full stack trace", fieldName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Process field failed", cause);
        }
    }

    /**
     * Starts building a command for the given event inputs.
     *
     * @param eventInputs typed event inputs to process
     * @return a new builder
     */
    public static Builder create(final List<CDPEventInput> eventInputs) {
        return new Builder(eventInputs);
    }

    /** Builder for {@link ProcessEventsCommand}; validates the {@code events} argument on build. */
    public static final class Builder extends BaseCommand.Builder<Builder> {

        private final List<CDPEventInput> eventInputs;

        public Builder(final List<CDPEventInput> eventInputs) {
            this.eventInputs = eventInputs;
        }

        /**
         * Runs the base validation, then checks the raw {@code events} argument.
         *
         * @throws IllegalArgumentException if the {@code events} argument is null or empty
         * @throws NullPointerException     if an event has no {@code cdp_objectID} or
         *                                  {@code cdp_profileID} value
         */
        @Override
        public void validate() {
            super.validate();

            final List<LinkedHashMap<String, Object>> events = environment.getArgument("events");

            if (events == null || events.isEmpty()) {
                throw new IllegalArgumentException("The \"events\" variable can not be null or empty");
            }

            for (final LinkedHashMap<String, Object> eventInput : events) {
                Objects.requireNonNull(eventInput.get("cdp_objectID"), "The \"cdp_objectID\" field can not be null");
                Objects.requireNonNull(eventInput.get("cdp_profileID"), "The \"cdp_profileID\" field can not be null");
            }
        }

        /**
         * Validates the builder state and creates the command.
         *
         * @return the configured command
         */
        public ProcessEventsCommand build() {
            validate();

            return new ProcessEventsCommand(this);
        }
    }
}