blob: d35d827ff618b393069214fd8fb9252d55c40533 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.admission;
import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.fs.FileSystemWatchService;
import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
import org.apache.flink.kubernetes.operator.ssl.ReloadableSslContext;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.MutatorUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.Set;
/**
 * Entry point of the Flink Kubernetes operator admission webhook.
 *
 * <p>Builds an {@link AdmissionHandler} from the discovered validators and mutators and serves it
 * from a Netty HTTP server. When a keystore is configured through the environment, the server
 * uses an SSL context that is reloaded whenever the keystore directory is modified.
 */
public class FlinkOperatorWebhook {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkOperatorWebhook.class);

    /** Limit handed to {@link HttpObjectAggregator}, in bytes (100 MiB). */
    private static final int MAX_CONTEXT_LENGTH = 104_857_600;

    /** Watcher of the keystore directory; {@code null} while no watcher is running. */
    private static FileSystemWatchService fileSystemWatchService;

    /**
     * Starts the webhook server and blocks until the server channel is closed.
     *
     * @param args command line arguments, only used for the environment info log
     * @throws Exception if the SSL context cannot be created, the port cannot be bound, or the
     *     calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
        var informerManager = new InformerManager(new KubernetesClientBuilder().build());
        var configManager = new FlinkConfigManager(informerManager::setNamespaces);
        var operatorConfig = configManager.getOperatorConfiguration();
        // With dynamic namespaces the config manager pushes namespaces through the callback
        // registered above; otherwise they have to be set once here.
        if (!operatorConfig.isDynamicNamespacesEnabled()) {
            informerManager.setNamespaces(operatorConfig.getWatchedNamespaces());
        }
        Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager);
        Set<FlinkResourceMutator> mutators = MutatorUtils.discoverMutators(configManager);
        AdmissionHandler endpoint =
                new AdmissionHandler(
                        new FlinkValidator(validators, informerManager),
                        new FlinkMutator(mutators, informerManager));
        ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(initializer);
            Channel serverChannel = bootstrap.bind(getPort()).sync().channel();
            InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            InetAddress inetAddress = bindAddress.getAddress();
            LOG.info(
                    "Webhook listening at {}:{}",
                    inetAddress.getHostAddress(),
                    bindAddress.getPort());
            // Block the main thread for the lifetime of the server.
            serverChannel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // Stop the keystore watcher (if any) last, so that the event loop shutdown has
            // already been requested should this wait be interrupted.
            stopFileSystemWatchService();
        }
    }

    /**
     * Reads the webhook server port from the environment.
     *
     * @return the configured port
     * @throws IllegalArgumentException if the configured value is not a valid integer
     */
    private static int getPort() {
        String portString = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_SERVER_PORT);
        try {
            return Integer.parseInt(portString);
        } catch (NumberFormatException e) {
            // Name the offending variable; the bare NumberFormatException only shows the value.
            throw new IllegalArgumentException(
                    "Invalid webhook server port in "
                            + EnvUtils.ENV_WEBHOOK_SERVER_PORT
                            + ": '"
                            + portString
                            + "'",
                    e);
        }
    }

    /**
     * Creates the initializer that assembles the pipeline of every accepted connection: optional
     * SSL handler, HTTP codec, message aggregator, chunked writer and the admission handler.
     *
     * @param admissionHandler handler serving the admission requests
     * @return the channel initializer to register as child handler
     * @throws Exception if the SSL context cannot be created
     */
    private static ChannelInitializer<SocketChannel> createChannelInitializer(
            AdmissionHandler admissionHandler) throws Exception {
        // Created once and captured by the initializer; null means plain HTTP.
        SslContext sslContext = createSslContext();
        return new ChannelInitializer<>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                if (sslContext != null) {
                    ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
                }
                ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(MAX_CONTEXT_LENGTH));
                // NOTE(review): the same handler instance is added to every channel, which
                // presumably requires AdmissionHandler to be sharable - confirm.
                ch.pipeline()
                        .addLast(new ChunkedWriteHandler())
                        .addLast(admissionHandler.getClass().getName(), admissionHandler);
            }
        };
    }

    /**
     * Creates the SSL context from the keystore described by the environment and starts a watcher
     * on the keystore's directory that reloads the context on modification.
     *
     * @return the reloadable SSL context, or {@code null} if no keystore file is configured
     * @throws Exception if the keystore settings are incomplete, the keystore path cannot be
     *     resolved, or the SSL context cannot be created
     */
    private static SslContext createSslContext() throws Exception {
        var keystorePathOpt = EnvUtils.get(EnvUtils.ENV_WEBHOOK_KEYSTORE_FILE);
        if (keystorePathOpt.isEmpty()) {
            LOG.info(
                    "No keystore path is defined in {}, running without ssl",
                    EnvUtils.ENV_WEBHOOK_KEYSTORE_FILE);
            return null;
        }
        String keystoreFile = keystorePathOpt.get();
        String keystoreType = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE);
        String keystorePassword = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
        ReloadableSslContext reloadableSslContext =
                new ReloadableSslContext(keystoreFile, keystoreType, keystorePassword);
        // Make sure a previously started watcher is gone before a new one is created.
        stopFileSystemWatchService();
        Path keystorePath = Path.of(keystoreFile);
        final String realKeystoreFileName = keystorePath.toRealPath().getFileName().toString();
        LOG.info("Keystore path is resolved to real filename: {}", realKeystoreFileName);
        // Resolve against the working directory first: getParent() of a bare relative file name
        // is null and would otherwise fail with a NullPointerException.
        String watchedDirectory = keystorePath.toAbsolutePath().getParent().toString();
        fileSystemWatchService =
                new FileSystemWatchService(watchedDirectory) {
                    @Override
                    protected void onFileOrDirectoryModified(Path relativePath) {
                        try {
                            LOG.info("Reloading SSL context because of certificate change");
                            reloadableSslContext.reload();
                            LOG.info("SSL context reloaded successfully");
                        } catch (Exception e) {
                            // Keep serving with the previous context; the trailing throwable
                            // argument makes the logger emit the stack trace as well.
                            LOG.error(
                                    "SSL context reload received exception: {}", e.toString(), e);
                        }
                    }
                };
        fileSystemWatchService.start();
        return reloadableSslContext;
    }

    /**
     * Interrupts the keystore watcher, if one is running, and waits for it to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void stopFileSystemWatchService() throws InterruptedException {
        if (fileSystemWatchService != null) {
            fileSystemWatchService.interrupt();
            fileSystemWatchService.join();
            // Drop the terminated watcher so that repeated calls are no-ops.
            fileSystemWatchService = null;
        }
    }
}