blob: d5049293c9e8e48dabeed993acf3c3f551d25ece [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.TimeDomain;
import org.joda.time.Instant;
/** Provides access to the keyed StateInternals and TimerInternals. */
@ThreadSafe
class KeyedInternals<K> {
private static class KeyedStates<K> {
private final K key;
private final List<State> states;
private KeyedStates(K key) {
this.key = key;
this.states = new ArrayList<>();
}
}
private static final ThreadLocal<KeyedStates> threadLocalKeyedStates = new ThreadLocal<>();
private final StateInternalsFactory<K> stateFactory;
private final TimerInternalsFactory<K> timerFactory;
KeyedInternals(StateInternalsFactory<K> stateFactory, TimerInternalsFactory<K> timerFactory) {
this.stateFactory = stateFactory;
this.timerFactory = timerFactory;
}
StateInternals stateInternals() {
return new KeyedStateInternals();
}
TimerInternals timerInternals() {
return new KeyedTimerInternals();
}
void setKey(K key) {
checkState(
threadLocalKeyedStates.get() == null,
"States for key %s is not cleared before processing",
key);
threadLocalKeyedStates.set(new KeyedStates<K>(key));
}
K getKey() {
KeyedStates<K> keyedStates = threadLocalKeyedStates.get();
return keyedStates == null ? null : keyedStates.key;
}
void clearKey() {
final List<State> states = threadLocalKeyedStates.get().states;
states.forEach(
state -> {
if (state instanceof SamzaStoreStateInternals.KeyValueIteratorState) {
((SamzaStoreStateInternals.KeyValueIteratorState) state).closeIterators();
}
});
states.clear();
threadLocalKeyedStates.remove();
}
private class KeyedStateInternals implements StateInternals {
@Override
public K getKey() {
return KeyedInternals.this.getKey();
}
@Override
public <T extends State> T state(
StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
checkState(getKey() != null, "Key is not set before state access in Stateful ParDo.");
final T state = stateFactory.stateInternalsForKey(getKey()).state(namespace, address, c);
threadLocalKeyedStates.get().states.add(state);
return state;
}
}
private class KeyedTimerInternals implements TimerInternals {
private TimerInternals getInternals() {
return timerFactory.timerInternalsForKey(getKey());
}
@Override
public void setTimer(
StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
getInternals().setTimer(namespace, timerId, target, timeDomain);
}
@Override
public void setTimer(TimerData timerData) {
getInternals().setTimer(timerData);
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
getInternals().deleteTimer(namespace, timerId, timeDomain);
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId) {
getInternals().deleteTimer(namespace, timerId);
}
@Override
public void deleteTimer(TimerData timerKey) {
getInternals().deleteTimer(timerKey);
}
@Override
public Instant currentProcessingTime() {
return getInternals().currentProcessingTime();
}
@Nullable
@Override
public Instant currentSynchronizedProcessingTime() {
return getInternals().currentSynchronizedProcessingTime();
}
@Override
public Instant currentInputWatermarkTime() {
return getInternals().currentInputWatermarkTime();
}
@Nullable
@Override
public Instant currentOutputWatermarkTime() {
return getInternals().currentOutputWatermarkTime();
}
}
}