blob: 253c3905fcc59c178c7f4a5c9609c4536099c7e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.joda.time.Instant;
/**
* A {@link BundleFactory} where a created {@link UncommittedBundle} clones all elements added to it
* using the coder of the {@link PCollection}.
*/
class CloningBundleFactory implements BundleFactory {
private static final CloningBundleFactory INSTANCE = new CloningBundleFactory();
public static CloningBundleFactory create() {
return INSTANCE;
}
private final ImmutableListBundleFactory underlying;
private CloningBundleFactory() {
this.underlying = ImmutableListBundleFactory.create();
}
@Override
public <T> UncommittedBundle<T> createRootBundle() {
// The DirectRunner is responsible for these elements, but they need not be encodable.
return underlying.createRootBundle();
}
@Override
public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
return new CloningBundle<>(underlying.createBundle(output));
}
@Override
public <K, T> UncommittedBundle<T> createKeyedBundle(
StructuralKey<K> key, PCollection<T> output) {
return new CloningBundle<>(underlying.createKeyedBundle(key, output));
}
private static class CloningBundle<T> implements UncommittedBundle<T> {
private final UncommittedBundle<T> underlying;
private final Coder<T> coder;
private CloningBundle(UncommittedBundle<T> underlying) {
this.underlying = underlying;
this.coder = underlying.getPCollection().getCoder();
}
@Override
public PCollection<T> getPCollection() {
return underlying.getPCollection();
}
@Override
public UncommittedBundle<T> add(WindowedValue<T> element) {
try {
// Use the cloned value to ensure that if the coder behaves poorly (e.g. a NoOpCoder that
// does not expect to be used) that is reflected in the values given to downstream
// transforms
WindowedValue<T> clone = element.withValue(CoderUtils.clone(coder, element.getValue()));
underlying.add(clone);
} catch (CoderException e) {
throw UserCodeException.wrap(e);
}
return this;
}
@Override
public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
return underlying.commit(synchronizedProcessingTime);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("Data", underlying.toString())
.add("Coder", coder.toString())
.toString();
}
}
}