blob: a7ca310ba0d6eb9dda6c5246167b05202280ad91 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.lambda;
import org.apache.crunch.*;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.lambda.fn.SFunction;
import org.apache.crunch.lambda.fn.SPredicate;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Java 8 friendly version of the {@link PCollection} interface, allowing distributed operations to be expressed in
* terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
* @param <S> The type of the elements in this collection
*/
public interface LCollection<S> {
/**
* Get the underlying {@link PCollection} for this LCollection
*/
PCollection<S> underlying();
/**
* Get the {@link LCollectionFactory} which can be used to create new Ltype instances
*/
LCollectionFactory factory();
/**
* Transform this LCollection using a standard Crunch {@link DoFn}
*/
default <T> LCollection<T> parallelDo(DoFn<S, T> fn, PType<T> pType) {
return factory().wrap(underlying().parallelDo(fn, pType));
}
/**
* Transform this LCollection to an {@link LTable} using a standard Crunch {@link DoFn}
*/
default <K, V> LTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
return factory().wrap(underlying().parallelDo(fn, pType));
}
/**
* Transform this LCollection using a Lambda-friendly {@link LDoFn}.
*/
default <T> LCollection<T> parallelDo(LDoFn<S, T> fn, PType<T> pType) {
return parallelDo(new LDoFnWrapper<>(fn), pType);
}
/**
* Transform this LCollection using a Lambda-friendly {@link LDoFn}.
*/
default <K, V> LTable<K, V> parallelDo(LDoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
return parallelDo(new LDoFnWrapper<>(fn), pType);
}
/**
* Map the elements of this collection 1-1 through the supplied function.
*/
default <T> LCollection<T> map(SFunction<S, T> fn, PType<T> pType) {
return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
}
/**
* Map the elements of this collection 1-1 through the supplied function to yield an {@link LTable}
*/
default <K, V> LTable<K, V> map(SFunction<S, Pair<K, V>> fn, PTableType<K, V> pType) {
return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
}
/**
* Map each element to zero or more output elements using the provided stream-returning function.
*/
default <T> LCollection<T> flatMap(SFunction<S, Stream<T>> fn, PType<T> pType) {
return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
}
/**
* Map each element to zero or more output elements using the provided stream-returning function to yield an
* {@link LTable}
*/
default <K, V> LTable<K, V> flatMap(SFunction<S, Stream<Pair<K, V>>> fn, PTableType<K, V> pType) {
return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
}
/**
* Combination of a filter and map operation by using a function with {@link Optional} return type.
*/
default <T> LCollection<T> filterMap(SFunction<S, Optional<T>> fn, PType<T> pType) {
return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
}
/**
* Combination of a filter and map operation by using a function with {@link Optional} return type.
*/
default <K, V> LTable<K, V> filterMap(SFunction<S, Optional<Pair<K, V>>> fn, PTableType<K, V> pType) {
return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
}
/**
* Filter the collection using the supplied predicate.
*/
default LCollection<S> filter(SPredicate<S> predicate) {
return parallelDo(ctx -> { if (predicate.test(ctx.element())) ctx.emit(ctx.element());}, pType());
}
/**
* Union this LCollection with another LCollection of the same type
*/
default LCollection<S> union(LCollection<S> other) {
return factory().wrap(underlying().union(other.underlying()));
}
/**
* Union this LCollection with a {@link PCollection} of the same type
*/
default LCollection<S> union(PCollection<S> other) {
return factory().wrap(underlying().union(other));
}
/**
* Increment a counter for every element in the collection
*/
default LCollection<S> increment(Enum<?> counter) {
return parallelDo(ctx -> {
ctx.increment(counter);
ctx.emit(ctx.element());
}, pType());
}
/**
* Increment a counter for every element in the collection
*/
default LCollection<S> increment(String counterGroup, String counterName) {
return parallelDo(ctx -> {
ctx.increment(counterGroup, counterName);
ctx.emit(ctx.element());
}, pType());
}
/**
* Increment a counter for every element satisfying the conditional predicate supplied.
*/
default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) {
return parallelDo(ctx -> {
if (condition.test(ctx.element())) ctx.increment(counter);
ctx.emit(ctx.element());
}, pType());
}
/**
* Increment a counter for every element satisfying the conditional predicate supplied.
*/
default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) {
return parallelDo(ctx -> {
if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
ctx.emit(ctx.element());
}, pType());
}
/**
* Cache the underlying {@link PCollection}
*/
default LCollection<S> cache() {
underlying().cache();
return this;
}
/**
* Cache the underlying {@link PCollection}
*/
default LCollection<S> cache(CachingOptions options) {
underlying().cache(options);
return this;
}
/**
* Key this LCollection by a key extracted from the element to yield a {@link LTable} mapping the key to the whole
* element.
*/
default <K> LTable<K, S> by(SFunction<S, K> extractFn, PType<K> pType) {
return parallelDo(
ctx -> ctx.emit(Pair.of(extractFn.apply(ctx.element()), ctx.element())),
ptf().tableOf(pType, pType()));
}
/**
* Count distict values in this LCollection, yielding an {@link LTable} mapping each value to the number
* of occurrences in the collection.
*/
default LTable<S, Long> count() {
return map(a -> Pair.of(a, 1L), ptf().tableOf(pType(), ptf().longs()))
.groupByKey()
.combineValues(Aggregators.SUM_LONGS());
}
/**
* Obtain the contents of this LCollection as a {@link Stream} that can be processed locally. Note, this may trigger
* your job to execute in a distributed environment if the pipeline has not yet been run.
*/
default Stream<S> materialize() {
return StreamSupport.stream(underlying().materialize().spliterator(), false);
}
/**
* Get the {@link PTypeFamily} representing how elements of this collection may be serialized.
*/
default PTypeFamily ptf() {
return underlying().getPType().getFamily();
}
/**
* Get the {@link PType} representing how elements of this collection may be serialized.
*/
default PType<S> pType() { return underlying().getPType(); }
/**
* Write this collection to the specified {@link Target}
*/
default LCollection<S> write(Target target) {
underlying().write(target);
return this;
}
/**
* Write this collection to the specified {@link Target} with the given {@link org.apache.crunch.Target.WriteMode}
*/
default LCollection<S> write(Target target, Target.WriteMode writeMode) {
underlying().write(target, writeMode);
return this;
}
}