blob: 4e007b0fdcacc863270c124731ebd894697574df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.s4.edsl;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* Implementation of the S4 embedded domain-specific language (EDSL).
*
* <p>
* To write an app extend this class and define the application graph using a chain of methods as follows:
*
* <pre>
* final public class MyApp extends BuilderS4DSL {
*
* protected void onInit() {
*
* pe("Consumer").type(ConsumerPE.class).asSingleton().
* pe("Producer").type(ProducerPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS).asSingleton().
* emit(SomeEvent.class).withKey("someKey").to("Consumer").
* build()
* }
* </pre>
*
* <p>
* A few things to notice:
* <ul>
* <li>Applications must extend class {@link BuilderS4DSL}
* <li>The graph definition is implemented in the {@link App#onInit} method which is called by the container when the
* application is loaded.
* <li>PEs are defined using strings because they need to be referenced by other parts of the graph. By doing this, we
* can create the whole application in a single chain of methods.
* <li>To assign target streams to PE fields additional information may need to be provided using the {@code onField}
* grammar token when there is an ambiguity. This will happen when a PE has more than one targetStream field with the
* same {@link Event} type. Use the construct {@code emit(SomeEvent.class).onField("streamFieldName")}. If the PE
* doesn't have a field named {@code "streamField"} whose stream parameter type is {@code someEvent)} then the parser
* will fail to build the app.
* <li>To configure a PE, set property values by chaining any number of {@code prop(name, value)} methods. The name
* should match a PE field and the value will be parsed using the type of that field.
* </ul>
* <p>
* Grammar:
*
* <pre>
* (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
* (cache, size , expires? )? , asSingleton? , (emit, onField?,
* (withKey|withKeyFinder)?, to )* )+ , build
* </pre>
*
* <p>
* See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
*
*/
public class AppBuilder extends App {
protected App app = this;
static final Logger logger = LoggerFactory.getLogger(AppBuilder.class);
private Multimap<ProcessingElement, StreamBuilder<? extends Event>> pe2stream = LinkedListMultimap.create();
Set<StreamBuilder<? extends Event>> streamBuilders = Sets.newHashSet();
/* Variables used to hold values from state to state. */
ProcessingElement processingElement;
String peName;
Class<? extends Event> triggerEventType;
long triggerInterval = 0;
TimeUnit triggerTimeUnit;
int cacheSize;
StreamBuilder<? extends Event> streamBuilder;
String propertyName, propertyValue;
public static AppBuilder getAppBuilder() {
return new BuilderS4DSL();
}
void addProperty(String name, String value) {
propertyName = name;
propertyValue = value;
setField();
}
void addPe2Stream(ProcessingElement pe, StreamBuilder<? extends Event> st) {
pe2stream.put(pe, st);
}
App buildApp() {
/* Stream to PE writing. */
for (StreamBuilder<? extends Event> sb : streamBuilders) {
for (String peName : sb.pes) {
ProcessingElement pe = getPE(peName);
sb.stream.setPEs(pe);
}
}
/* PE to Stream wiring. */
Map<ProcessingElement, Collection<StreamBuilder<? extends Event>>> pe2streamMap = pe2stream.asMap();
for (Map.Entry<ProcessingElement, Collection<StreamBuilder<? extends Event>>> entry : pe2streamMap.entrySet()) {
ProcessingElement pe = entry.getKey();
Collection<StreamBuilder<? extends Event>> streams = entry.getValue();
if (pe != null && streams != null) {
try {
setStreamField(pe, streams);
} catch (Exception e) {
logger.error("Unable to build app.", e);
return null;
}
}
}
return this;
}
/**
* @param peName
* the peName to set
*/
protected void setPeName(String peName) {
this.peName = peName;
}
/*
* Cannot create an abstract class in Diezel so for now, I just implement the abstract methods here. They need to be
* overloaded by the app developer.
*/
@Override
protected void onStart() {
}
@Override
protected void onInit() {
}
@Override
protected void onClose() {
}
private <T extends ProcessingElement> void setField() {
logger.debug("Adding property [{}] to PE of type [{}].", propertyName, processingElement.getClass().getName());
Class<? extends ProcessingElement> type = processingElement.getClass();
try {
Field f = type.getDeclaredField(propertyName);
f.setAccessible(true);
logger.trace("Type: {}.", f.getType());
logger.trace("GenericType: {}.", f.getGenericType());
/* Set the field. */
if (f.getType().getCanonicalName() == "long") {
f.setLong(processingElement, Long.parseLong(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "int") {
f.setInt(processingElement, Integer.parseInt(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "float") {
f.setFloat(processingElement, Float.parseFloat(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "double") {
f.setDouble(processingElement, Double.parseDouble(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "short") {
f.setShort(processingElement, Short.parseShort(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "byte") {
f.setByte(processingElement, Byte.parseByte(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "boolean") {
f.setBoolean(processingElement, Boolean.parseBoolean(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "char") {
f.setChar(processingElement, (char) Byte.parseByte(propertyValue));
return;
} else if (f.getType().getCanonicalName() == "java.lang.String") {
f.set(processingElement, propertyValue);
return;
}
logger.error("Unable to set field named [{}] in PE of type [{}].", propertyName, type);
throw new IllegalArgumentException();
// production code should handle these exceptions more gracefully
} catch (NoSuchFieldException e) {
logger.error("There is no field named [{}] in PE of type [{}].", propertyName, type);
} catch (Exception e) {
logger.error("Couldn't set value for field [{}] in PE of type [{}].", propertyName, type);
}
}
/* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? extends Event>> streams)
throws Exception {
/*
* Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
* streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
* more than one field has the same type, then then we need to do more work.
*/
Field[] fields = pe.getClass().getDeclaredFields();
Multimap<String, Field> typeMap = LinkedListMultimap.create();
logger.debug("Analyzing PE [{}].", pe.getClass().getName());
for (Field field : fields) {
logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
if (field.getType() == Stream[].class) {
logger.debug("Found stream field: {}", field.getGenericType());
/* Track what fields have streams with the same event type. */
String key = field.getGenericType().toString();
typeMap.put(key, field);
}
}
/* Assign streams to stream fields. */
Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
for (StreamBuilder<? extends Event> sm : streams) {
Stream<? extends Event> stream = sm.stream;
Class<? extends Event> eventType = sm.type;
String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
if (typeMap.containsKey(key)) {
String fieldName;
Field field;
Collection<Field> streamFields = typeMap.get(key);
int numStreamFields = streamFields.size();
logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
if (numStreamFields > 1) {
/*
* There is more than one field that can be used for this stream type. To resolve the ambiguity we
* need additional information. The app graph should include the name of the field that should be
* used to assign this stream. If the name is missing we bail out.
*/
fieldName = sm.fieldName;
/* Bail out. */
if (fieldName == null) {
String msg = String
.format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
numStreamFields, pe.getClass().getName(), stream.getName());
logger.error(msg);
throw new Exception(msg);
}
/* Use the provided field name to choose the PE field. */
field = pe.getClass().getDeclaredField(fieldName);
} else {
/*
* The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
* field that matches the stream type.
*/
Iterator<Field> iter = streamFields.iterator();
field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
logger.debug("Using field [{}].", field.getName());
}
/*
* By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
* There may be more than one stream that needs to be assigned to this field. The stream fields must be
* arrays by convention and there may be more than one stream assigned to this fields. For now we create
* a multimap from field to streams so we can construct the array in the next step.
*/
assignment.put(field, stream);
} else {
/* We couldn't find a match. Tell user to fix the EDSL code. */
String msg = String.format(
"There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pe
.getClass().getName(), stream.getName());
logger.error(msg);
throw new Exception(msg);
}
}
/* Now we construct the array and do the final assignment. */
Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
Field f = entry.getKey();
int arraySize = entry.getValue().size();
@SuppressWarnings("unchecked")
Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
arraySize);
int i = 0;
for (Stream<? extends Event> s : entry.getValue()) {
streamArray[i++] = s;
f.setAccessible(true);
f.set(pe, streamArray);
logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
}
}
}
void clearPEState() {
propertyName = null;
propertyValue = null;
processingElement = null;
peName = null;
triggerEventType = null;
triggerTimeUnit = null;
cacheSize = -1;
streamBuilder = null;
}
}