blob: 168136979d591fafe696414c862283025e3bbe97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.windowing;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.joda.time.Duration;
/**
 * A {@link WindowFn} that windows values into sessions separated by periods with no input for at
 * least the duration specified by {@link #getGapDuration()}.
 *
 * <p>For example, in order to window data into sessions with at least 10 minute gaps in between
 * them:
 *
 * <pre>{@code
 * PCollection<Integer> pc = ...;
 * PCollection<Integer> windowedPc = pc.apply(
 *     Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
 * }</pre>
 */
public class Sessions extends WindowFn<Object, IntervalWindow> {

  /** Duration of the gaps between sessions. Never {@code null}. */
  private final Duration gapDuration;

  /**
   * Creates a {@code Sessions} {@link WindowFn} with the specified gap duration.
   *
   * @param gapDuration the minimum period without input that separates two sessions; must not be
   *     {@code null}
   * @return a new {@code Sessions} window function using {@code gapDuration}
   * @throws NullPointerException if {@code gapDuration} is {@code null}
   */
  public static Sessions withGapDuration(Duration gapDuration) {
    return new Sessions(gapDuration);
  }

  /**
   * Creates a {@code Sessions} {@link WindowFn} with the specified gap duration.
   *
   * @throws NullPointerException if {@code gapDuration} is {@code null}
   */
  private Sessions(Duration gapDuration) {
    // Fail fast: a null gap would otherwise only surface later, as an NPE from equals(), while
    // hashCode() silently accepted it.
    this.gapDuration = Objects.requireNonNull(gapDuration, "gapDuration");
  }

  /**
   * Assigns the element to a single {@link IntervalWindow} that starts at the element's timestamp
   * and is {@code gapDuration} long.
   */
  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Assign each element into a window from its timestamp until gapDuration in the
    // future. Overlapping windows (representing elements within gapDuration of
    // each other) will be merged.
    return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
  }

  /** Merges windows by delegating to {@code MergeOverlappingIntervalWindows.mergeWindows}. */
  @Override
  public void mergeWindows(MergeContext c) throws Exception {
    MergeOverlappingIntervalWindows.mergeWindows(c);
  }

  /** Returns the coder used for this function's {@link IntervalWindow}s. */
  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  /**
   * Returns {@code true} if {@code other} is any {@code Sessions} instance.
   *
   * <p>Only the type is checked; the gap durations of the two functions are not compared.
   */
  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof Sessions;
  }

  /**
   * Throws if {@code other} is not compatible with this function, as decided by
   * {@link #isCompatible(WindowFn)}.
   *
   * @throws IncompatibleWindowException if {@code other} is not a {@code Sessions}
   */
  @Override
  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
    if (!this.isCompatible(other)) {
      throw new IncompatibleWindowException(
          other,
          String.format(
              "%s is only compatible with %s.",
              Sessions.class.getSimpleName(), Sessions.class.getSimpleName()));
    }
  }

  /**
   * Always throws: {@code Sessions} does not provide a default side-input window mapping.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
  }

  /** Returns the gap duration that separates sessions; never {@code null}. */
  public Duration getGapDuration() {
    return gapDuration;
  }

  /** Adds the gap duration to the display data, on top of whatever the superclass registers. */
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("gapDuration", gapDuration).withLabel("Session Gap Duration"));
  }

  /** Two {@code Sessions} are equal when their gap durations are equal. */
  @Override
  public boolean equals(Object object) {
    if (!(object instanceof Sessions)) {
      return false;
    }
    Sessions other = (Sessions) object;
    return Objects.equals(gapDuration, other.gapDuration);
  }

  /** Hash code derived from the gap duration, consistent with {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return Objects.hash(gapDuration);
  }
}