blob: a52e7e3870b28a5753cff936f051bf1991206bbe [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
package org.apache.storm.trident.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.thrift.TBase;
import org.apache.storm.topology.IComponent;
import org.apache.storm.topology.OutputFieldsGetter;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
public class TridentUtils {
public static Fields fieldsUnion(Fields... fields) {
Set<String> ret = new HashSet<String>();
for (Fields f : fields) {
if (f != null) {
return new Fields(new ArrayList<String>(ret));
public static Fields fieldsConcat(Fields... fields) {
List<String> ret = new ArrayList<String>();
for (Fields f : fields) {
if (f != null) {
return new Fields(ret);
public static Fields fieldsSubtract(Fields all, Fields minus) {
Set<String> removeSet = new HashSet<String>(minus.toList());
List<String> toKeep = new ArrayList<String>();
for (String s : all.toList()) {
if (!removeSet.contains(s)) {
return new Fields(toKeep);
public static Fields getSingleOutputStreamFields(IComponent component) {
OutputFieldsGetter getter = new OutputFieldsGetter();
Map<String, StreamInfo> declaration = getter.getFieldsDeclaration();
if (declaration.size() != 1) {
throw new RuntimeException("Trident only supports components that emit a single stream");
StreamInfo si = declaration.values().iterator().next();
if (si.is_direct()) {
throw new RuntimeException("Trident does not support direct streams");
return new Fields(si.get_output_fields());
* Assumes edge contains an index.
public static <T> List<T> getParents(DirectedGraph g, T n) {
List<IndexedEdge> incoming = new ArrayList(g.incomingEdgesOf(n));
List<T> ret = new ArrayList();
for (IndexedEdge e : incoming) {
ret.add((T) e.source);
return ret;
public static <T> List<T> getChildren(DirectedGraph g, T n) {
List<IndexedEdge> outgoing = new ArrayList(g.outgoingEdgesOf(n));
List<T> ret = new ArrayList();
for (IndexedEdge e : outgoing) {
return ret;
public static <T> T getParent(DirectedGraph g, T n) {
List<T> parents = getParents(g, n);
if (parents.size() != 1) {
throw new RuntimeException("Expected a single parent");
return parents.get(0);
public static byte[] thriftSerialize(TBase t) {
return Utils.thriftSerialize(t);
public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
return Utils.thriftDeserialize(c, b);