blob: 09457a3929ac31bd62c3a0db09cb059bc4225dfc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
/**
* Vends Java SDK objects rehydrated from a Runner API {@link Components} collection.
*
* <p>This ensures maximum memoization of rehydrated components, which is semantically necessary for
* {@link PCollection} and nice-to-have for other objects.
*/
public class RehydratedComponents {
private final Components components;
/**
* This class may be used in the context of a pipeline or not. If not, then it cannot
* rehydrated {@link PCollection PCollections}.
*/
@Nullable
private final Pipeline pipeline;
/**
* A non-evicting cache, serving as a memo table for rehydrated {@link WindowingStrategy
* WindowingStrategies}.
*/
private final LoadingCache<String, WindowingStrategy<?, ?>> windowingStrategies =
CacheBuilder.newBuilder()
.build(
new CacheLoader<String, WindowingStrategy<?, ?>>() {
@Override
public WindowingStrategy<?, ?> load(String id) throws Exception {
return WindowingStrategyTranslation.fromProto(
components.getWindowingStrategiesOrThrow(id), RehydratedComponents.this);
}
});
/** A non-evicting cache, serving as a memo table for rehydrated {@link Coder Coders}. */
private final LoadingCache<String, Coder<?>> coders =
CacheBuilder.newBuilder()
.build(
new CacheLoader<String, Coder<?>>() {
@Override
public Coder<?> load(String id) throws Exception {
@Nullable RunnerApi.Coder coder = components.getCodersOrDefault(id, null);
checkState(coder != null, "No coder with id '%s' in serialized components", id);
return CoderTranslation.fromProto(coder, RehydratedComponents.this);
}
});
/**
* A non-evicting cache, serving as a memo table for rehydrated {@link PCollection PCollections}.
*/
private final LoadingCache<String, PCollection<?>> pCollections =
CacheBuilder.newBuilder()
.build(
new CacheLoader<String, PCollection<?>>() {
@Override
public PCollection<?> load(String id) throws Exception {
checkState(
pipeline != null,
"%s Cannot rehydrate %s without a %s:"
+ " provide one via .withPipeline(...)",
RehydratedComponents.class.getSimpleName(),
PCollection.class.getSimpleName(),
Pipeline.class.getSimpleName());
return PCollectionTranslation.fromProto(
components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this)
.setName(id);
}
});
/** Create a new {@link RehydratedComponents} from a Runner API {@link Components}. */
public static RehydratedComponents forComponents(RunnerApi.Components components) {
return new RehydratedComponents(components, null);
}
/** Create a new {@link RehydratedComponents} with a pipeline attached. */
public RehydratedComponents withPipeline(Pipeline pipeline) {
return new RehydratedComponents(components, pipeline);
}
private RehydratedComponents(RunnerApi.Components components, @Nullable Pipeline pipeline) {
this.components = components;
this.pipeline = pipeline;
}
/**
* Returns a {@link PCollection} rehydrated from the Runner API component with the given ID.
*
* <p>For a single instance of {@link RehydratedComponents}, this always returns the same instance
* for a particular id.
*/
public PCollection<?> getPCollection(String pCollectionId) throws IOException {
try {
return pCollections.get(pCollectionId);
} catch (ExecutionException exc) {
throw new RuntimeException(exc);
}
}
/**
* Returns a {@link WindowingStrategy} rehydrated from the Runner API component with the given ID.
*
* <p>For a single instance of {@link RehydratedComponents}, this always returns the same instance
* for a particular id.
*/
public WindowingStrategy<?, ?> getWindowingStrategy(String windowingStrategyId)
throws IOException {
try {
return windowingStrategies.get(windowingStrategyId);
} catch (ExecutionException exc) {
throw new RuntimeException(exc);
}
}
/**
* Returns a {@link Coder} rehydrated from the Runner API component with the given ID.
*
* <p>For a single instance of {@link RehydratedComponents}, this always returns the same instance
* for a particular id.
*/
public Coder<?> getCoder(String coderId) throws IOException {
try {
return coders.get(coderId);
} catch (ExecutionException exc) {
throw new RuntimeException(exc);
}
}
}