blob: aee7536467c0b79b9668710fd8970b8940a33b5b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.logger;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard.Lease;
public final class Loggers {
private Loggers() {}
public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
IOManager ioManager,
int maxParallelism,
long inMemoryMaxBufferSize,
TypeSerializer<?> serializer,
Function<?, ?> keySelector) {
ObjectContainer container =
unboundedSpillableLoggerContainer(
ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
return container.get(UnboundedFeedbackLoggerFactory.class);
}
/** Wires the required dependencies to construct an {@link UnboundedFeedbackLogger}. */
@VisibleForTesting
static ObjectContainer unboundedSpillableLoggerContainer(
IOManager ioManager,
int maxParallelism,
long inMemoryMaxBufferSize,
TypeSerializer<?> serializer,
Function<?, ?> keySelector) {
ObjectContainer container = new ObjectContainer();
container.add("max-parallelism", int.class, maxParallelism);
container.add("in-memory-max-buffer-size", long.class, inMemoryMaxBufferSize);
container.add("io-manager", IOManager.class, ioManager);
container.add("key-group-supplier", Supplier.class, KeyGroupStreamFactory.class);
container.add(
"key-group-assigner", ToIntFunction.class, new KeyAssigner<>(keySelector, maxParallelism));
container.add("envelope-serializer", TypeSerializer.class, serializer);
container.add(
"checkpoint-stream-ops",
CheckpointedStreamOperations.class,
KeyedStateCheckpointOutputStreamOps.INSTANCE);
container.add(UnboundedFeedbackLoggerFactory.class);
return container;
}
private enum KeyedStateCheckpointOutputStreamOps implements CheckpointedStreamOperations {
INSTANCE;
@Override
public void requireKeyedStateCheckpointed(OutputStream stream) {
if (stream instanceof KeyedStateCheckpointOutputStream) {
return;
}
throw new IllegalStateException("Not a KeyedStateCheckpointOutputStream");
}
@Override
public void startNewKeyGroup(OutputStream stream, int keyGroup) throws IOException {
cast(stream).startNewKeyGroup(keyGroup);
}
@Override
@SuppressWarnings("resource")
public Closeable acquireLease(OutputStream stream) {
Preconditions.checkState(stream instanceof KeyedStateCheckpointOutputStream);
try {
Lease lease = cast(stream).acquireLease();
return lease::close;
} catch (IOException e) {
throw new IllegalStateException("Unable to obtain a lease for the input stream.", e);
}
}
private static KeyedStateCheckpointOutputStream cast(OutputStream stream) {
Preconditions.checkState(stream instanceof KeyedStateCheckpointOutputStream);
return (KeyedStateCheckpointOutputStream) stream;
}
}
private static final class KeyAssigner<T> implements ToIntFunction<T> {
private final Function<T, ?> keySelector;
private final int maxParallelism;
private KeyAssigner(Function<T, ?> keySelector, int maxParallelism) {
this.keySelector = Objects.requireNonNull(keySelector);
this.maxParallelism = maxParallelism;
}
@Override
public int applyAsInt(T value) {
Object key = keySelector.apply(value);
return KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
}
}
}