blob: 3e48be19cdb91762112ace007e2e5d1354623872 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.control;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link ExecutableStageContext.Factory} that shares a single {@link ExecutableStageContext} per
 * job id and counts references to it, so the underlying context is only closed once every user
 * has released it.
 */
public class ReferenceCountingExecutableStageContextFactory
    implements ExecutableStageContext.Factory {
  private static final Logger LOG =
      LoggerFactory.getLogger(ReferenceCountingExecutableStageContextFactory.class);
  private static final int MAX_RETRY = 3;

  private final Creator creator;
  // Transient state is lazily (re)created after deserialization; see getExecutor()/getCache().
  private transient volatile ScheduledExecutorService executor;
  private transient volatile ConcurrentHashMap<String, WrappedContext> keyRegistry;
  private final SerializableFunction<Object, Boolean> isReleaseSynchronous;

  /**
   * Creates a reference-counting factory.
   *
   * @param creator creates the actual context the first time a job id is requested (or after the
   *     previous context for that job id has been fully released)
   * @param isReleaseSynchronous called with this factory; when it returns true, releases are
   *     performed immediately even if the environment cache TTL is positive
   * @return a new factory
   */
  public static ReferenceCountingExecutableStageContextFactory create(
      Creator creator, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
    return new ReferenceCountingExecutableStageContextFactory(creator, isReleaseSynchronous);
  }

  private ReferenceCountingExecutableStageContextFactory(
      Creator creator, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
    this.creator = creator;
    this.isReleaseSynchronous = isReleaseSynchronous;
  }

  /**
   * Returns the shared context for {@code jobInfo}'s job id, creating it if necessary, and
   * increments its reference count. Callers release their reference via {@code close()}.
   *
   * @throws RuntimeException if the context cannot be created, or if {@value #MAX_RETRY}
   *     attempts all hit a wrapper that was concurrently tombstoned
   */
  @Override
  public ExecutableStageContext get(JobInfo jobInfo) {
    // Retry is needed in case an existing wrapper is picked from the cache but, by the time we
    // lock it to access wrapper.referenceCount, the wrapper has been tombstoned by a pending
    // release task.
    // This race is highly unlikely because of the TTL, and no systematic coding practice is known
    // to trigger it. Even in that unlikely case, the retry obtains a valid context.
    // Note: this logic cannot leak, as cleanup is only done in release.
    // In case of a usage error where release is called before the corresponding get finishes,
    // release might throw an error. If release did not throw, then the state of the system
    // remains valid and the appropriate cleanup will be done at TTL.
    for (int retry = 0; retry < MAX_RETRY; retry++) {
      // computeIfAbsent is atomic, so concurrent callers for the same job id observe the same
      // wrapper.
      WrappedContext wrapper =
          getCache()
              .computeIfAbsent(
                  jobInfo.jobId(),
                  jobId -> {
                    try {
                      return new WrappedContext(jobInfo, creator.apply(jobInfo));
                    } catch (Exception e) {
                      throw new RuntimeException(
                          "Unable to create context for job " + jobInfo.jobId(), e);
                    }
                  });
      // Lock the wrapper before modifying its reference count. A null referenceCount is the
      // tombstone marking a wrapper that release() has already removed from the cache.
      synchronized (wrapper) {
        if (wrapper.referenceCount != null) {
          // The wrapper is still valid: release has not yet taken the lock and removed it.
          wrapper.referenceCount.incrementAndGet();
          return wrapper;
        }
      }
    }
    throw new RuntimeException(
        String.format(
            "Max retry %s exhausted while creating Context for job %s",
            MAX_RETRY, jobInfo.jobId()));
  }

  /**
   * Releases one reference to the context registered for {@code jobInfo}'s job id.
   *
   * <p>If the job's environment cache TTL is positive and {@code isReleaseSynchronous} returns
   * false, the release is scheduled to run after the TTL so the context may be reused in the
   * meantime. Otherwise it is performed immediately on the calling thread.
   *
   * @throws IllegalStateException if no context is registered for the job id
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private void scheduleRelease(JobInfo jobInfo) {
    WrappedContext wrapper = getCache().get(jobInfo.jobId());
    Preconditions.checkState(
        wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
    PipelineOptions pipelineOptions =
        PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
    int environmentCacheTTLMillis =
        pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis();
    if (environmentCacheTTLMillis > 0) {
      if (isReleaseSynchronous.apply(this)) {
        // Do immediate cleanup.
        release(wrapper);
      } else {
        // Schedule a task to clean up the context later.
        getExecutor()
            .schedule(() -> release(wrapper), environmentCacheTTLMillis, TimeUnit.MILLISECONDS);
      }
    } else {
      // Do not release this asynchronously, as the release could fail due to the classloader not
      // being available anymore after the tasks have been removed from the execution engine.
      release(wrapper);
    }
  }

  /**
   * Returns the job-id-to-wrapper registry, creating it on first use (double-checked locking on
   * the volatile field), since it is transient and null after deserialization.
   */
  private ConcurrentHashMap<String, WrappedContext> getCache() {
    if (keyRegistry != null) {
      return keyRegistry;
    }
    synchronized (this) {
      if (keyRegistry == null) {
        keyRegistry = new ConcurrentHashMap<>();
      }
      return keyRegistry;
    }
  }

  /**
   * Returns the single-threaded, daemon scheduled executor used for delayed releases, creating it
   * on first use (double-checked locking on the volatile field), since it is transient and null
   * after deserialization.
   */
  private ScheduledExecutorService getExecutor() {
    if (executor != null) {
      return executor;
    }
    synchronized (this) {
      if (executor == null) {
        executor =
            Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
      }
      return executor;
    }
  }

  /**
   * Decrements the reference count of {@code context}. When it reaches zero, the wrapper is
   * tombstoned and removed from the cache, and the underlying context is closed. Errors thrown
   * while closing are logged, not propagated.
   *
   * @param context a wrapper previously returned by {@link #get(JobInfo)}
   * @throws IllegalStateException if the wrapper has already been fully released
   */
  @VisibleForTesting
  void release(ExecutableStageContext context) {
    // Callers are expected to pass a context obtained from get(), which is always a
    // WrappedContext; this method is not meant to be called from outside this class/tests.
    WrappedContext wrapper = (WrappedContext) context;
    synchronized (wrapper) {
      Preconditions.checkState(
          wrapper.referenceCount != null,
          "Context for job %s has already been released.",
          wrapper.jobInfo.jobId());
      if (wrapper.referenceCount.decrementAndGet() == 0) {
        // Tombstone the wrapper so a concurrent get() that already fetched it retries.
        wrapper.referenceCount = null;
        // Only close if this exact wrapper was still registered for the job id.
        if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
          try {
            wrapper.closeActual();
          } catch (Throwable t) {
            LOG.error("Unable to close ExecutableStageContext.", t);
          }
        }
      }
    }
  }

  /**
   * Wraps a job's actual {@link ExecutableStageContext} together with its reference count. {@link
   * #close()} only releases one reference; the wrapped context is closed when the count drops to
   * zero. Equality and hashing are based solely on {@link JobInfo#jobId()} and do not delegate to
   * the wrapped context.
   */
  @VisibleForTesting
  class WrappedContext implements ExecutableStageContext {
    private final JobInfo jobInfo;
    // Number of outstanding users; null once tombstoned. Only modified while holding this
    // wrapper's monitor.
    private AtomicInteger referenceCount;
    @VisibleForTesting ExecutableStageContext context;

    WrappedContext(JobInfo jobInfo, ExecutableStageContext context) {
      this.jobInfo = jobInfo;
      this.context = context;
      this.referenceCount = new AtomicInteger(0);
    }

    @Override
    public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage) {
      return context.getStageBundleFactory(executableStage);
    }

    @Override
    public void close() {
      // Only release one reference (possibly after the TTL) so the context can be reused.
      scheduleRelease(jobInfo);
    }

    private void closeActual() throws Exception {
      context.close();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      WrappedContext that = (WrappedContext) o;
      return Objects.equals(jobInfo.jobId(), that.jobInfo.jobId());
    }

    @Override
    public int hashCode() {
      // Must hash the same value equals() compares, i.e. the job id only.
      return Objects.hashCode(jobInfo.jobId());
    }

    @Override
    public String toString() {
      return "ContextWrapper{"
          + "jobId='"
          + jobInfo.jobId()
          + '\''
          + ", referenceCount="
          + referenceCount
          + '}';
    }
  }

  /** Interface for creator which extends Serializable. */
  public interface Creator
      extends ThrowingFunction<JobInfo, ExecutableStageContext>, Serializable {}
}