blob: a411f8ef812d0ea8f6057e661254b998656f967d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.grpc;
import java.io.InputStream;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;
import org.apache.camel.component.grpc.client.GrpcExchangeForwarderFactory;
import org.apache.camel.component.grpc.client.GrpcResponseAggregationStreamObserver;
import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ResourceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Represents asynchronous and synchronous gRPC producer implementations.
 * <p>
 * On start the producer builds a Netty-based {@link ManagedChannel} from the
 * endpoint configuration, optionally attaches call credentials (Google service
 * account or JWT), creates a blocking or asynchronous stub depending on
 * {@link GrpcEndpoint#isSynchronous()}, and delegates exchange handling to a
 * {@link GrpcExchangeForwarder}. On stop the forwarder and channel are shut down.
 */
public class GrpcProducer extends DefaultProducer implements AsyncProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(GrpcProducer.class);

    protected final GrpcConfiguration configuration;
    protected final GrpcEndpoint endpoint;

    private ManagedChannel channel;
    private Object grpcStub;
    private GrpcExchangeForwarder forwarder;
    private StreamObserver<Object> globalResponseObserver;

    /**
     * Creates the producer and validates option combinations that can never work.
     *
     * @param endpoint      the endpoint this producer belongs to
     * @param configuration the gRPC configuration to use
     * @throws IllegalStateException if the STREAMING strategy is combined with
     *                               synchronous processing or lacks a
     *                               {@code streamRepliesTo} target, or if Google
     *                               authentication is requested without TLS
     */
    public GrpcProducer(GrpcEndpoint endpoint, GrpcConfiguration configuration) {
        super(endpoint);
        this.endpoint = endpoint;
        this.configuration = configuration;

        if (configuration.getProducerStrategy() == GrpcProducerStrategy.STREAMING) {
            if (endpoint.isSynchronous()) {
                throw new IllegalStateException("Cannot use synchronous processing in streaming mode");
            } else if (configuration.getStreamRepliesTo() == null) {
                // The message names the option actually read above (streamRepliesTo).
                throw new IllegalStateException("The streamRepliesTo property is mandatory when using the STREAMING mode");
            }
        }

        if (configuration.getAuthenticationType() == GrpcAuthType.GOOGLE && configuration.getNegotiationType() != NegotiationType.TLS) {
            throw new IllegalStateException("Google token-based authentication requires SSL/TLS negotiation mode");
        }
    }

    /**
     * Forwards the exchange asynchronously.
     * <p>
     * Uses the shared response observer when one was created at start-up
     * (i.e. {@code streamRepliesTo} is configured); otherwise a per-exchange
     * aggregating observer is created.
     *
     * @param exchange the exchange to forward
     * @param callback the callback handed to the forwarder and the observer
     * @return the value returned by the forwarder
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        StreamObserver<Object> streamObserver = this.globalResponseObserver;
        if (streamObserver == null) {
            streamObserver = new GrpcResponseAggregationStreamObserver(exchange, callback);
        }
        return forwarder.forward(exchange, streamObserver, callback);
    }

    /**
     * Forwards the exchange synchronously through the forwarder.
     *
     * @param exchange the exchange to forward
     * @throws Exception if forwarding fails
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        forwarder.forward(exchange);
    }

    /**
     * Opens the channel and creates the stub, forwarder and (optionally) the
     * shared response observer. Does nothing beyond {@code super.doStart()} when
     * a channel already exists.
     *
     * @throws Exception if the channel, the credentials or the stub cannot be created
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (channel != null) {
            return;
        }

        initializeChannel();
        try {
            CallCredentials callCreds = createCallCredentials();

            if (endpoint.isSynchronous()) {
                LOG.debug("Getting synchronous method stub from channel");
                grpcStub = GrpcUtils.constructGrpcBlockingStub(endpoint.getServicePackage(), endpoint.getServiceName(), channel, callCreds, endpoint.getCamelContext());
            } else {
                LOG.debug("Getting asynchronous method stub from channel");
                grpcStub = GrpcUtils.constructGrpcAsyncStub(endpoint.getServicePackage(), endpoint.getServiceName(), channel, callCreds, endpoint.getCamelContext());
            }

            forwarder = GrpcExchangeForwarderFactory.createExchangeForwarder(configuration, grpcStub);

            if (configuration.getStreamRepliesTo() != null) {
                this.globalResponseObserver = new GrpcResponseRouterStreamObserver(configuration, getEndpoint());
            }
        } catch (Exception e) {
            // Do not leave an open, half-initialized channel behind: it would leak
            // and make a later start skip initialization because channel != null.
            releaseChannel();
            throw e;
        }
    }

    /**
     * Shuts down the forwarder and the channel (if present) and clears the
     * cached stub and response observer.
     *
     * @throws Exception if shutting down fails
     */
    @Override
    protected void doStop() throws Exception {
        releaseChannel();
        super.doStop();
    }

    /**
     * Builds the Netty channel from the configured host, port, negotiation type,
     * flow-control window, user agent and maximum inbound message size, and
     * stores it in {@code channel}. With TLS negotiation a client SSL context is
     * built from the configured key/certificate resources.
     *
     * @throws IllegalArgumentException if host or port is not configured
     * @throws Exception                if a resource cannot be resolved or the SSL
     *                                  context cannot be built
     */
    protected void initializeChannel() throws Exception {
        if (ObjectHelper.isEmpty(configuration.getHost()) || ObjectHelper.isEmpty(configuration.getPort())) {
            throw new IllegalArgumentException("No connection properties (host or port) specified");
        }

        LOG.info("Creating channel to the remote gRPC server {}:{}", configuration.getHost(), configuration.getPort());
        NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(configuration.getHost(), configuration.getPort());

        if (configuration.getNegotiationType() == NegotiationType.TLS) {
            ObjectHelper.notNull(configuration.getKeyCertChainResource(), "keyCertChainResource");
            ObjectHelper.notNull(configuration.getKeyResource(), "keyResource");

            ClassResolver classResolver = endpoint.getCamelContext().getClassResolver();
            SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient().sslProvider(SslProvider.OPENSSL);

            // The builder consumes the streams while the call executes, so they
            // can (and should) be closed as soon as it returns.
            try (InputStream keyCertChain = ResourceHelper.resolveResourceAsInputStream(classResolver, configuration.getKeyCertChainResource());
                 InputStream key = ResourceHelper.resolveResourceAsInputStream(classResolver, configuration.getKeyResource())) {
                sslContextBuilder = sslContextBuilder.keyManager(keyCertChain, key, configuration.getKeyPassword());
            }

            if (ObjectHelper.isNotEmpty(configuration.getTrustCertCollectionResource())) {
                try (InputStream trustCerts = ResourceHelper.resolveResourceAsInputStream(classResolver, configuration.getTrustCertCollectionResource())) {
                    sslContextBuilder = sslContextBuilder.trustManager(trustCerts);
                }
            }

            channelBuilder = channelBuilder.sslContext(sslContextBuilder.build());
        }

        channel = channelBuilder.negotiationType(configuration.getNegotiationType())
            .flowControlWindow(configuration.getFlowControlWindow())
            .userAgent(configuration.getUserAgent())
            .maxInboundMessageSize(configuration.getMaxMessageSize())
            .build();
    }

    /**
     * Creates the call credentials matching the configured authentication type.
     *
     * @return Google or JWT call credentials, or {@code null} for any other
     *         authentication type
     * @throws Exception if a required option is missing or the credentials
     *                   cannot be created
     */
    private CallCredentials createCallCredentials() throws Exception {
        if (configuration.getAuthenticationType() == GrpcAuthType.GOOGLE) {
            // Validate the resource that is actually read below.
            ObjectHelper.notNull(configuration.getServiceAccountResource(), "serviceAccountResource");
            ClassResolver classResolver = endpoint.getCamelContext().getClassResolver();
            try (InputStream serviceAccount = ResourceHelper.resolveResourceAsInputStream(classResolver, configuration.getServiceAccountResource())) {
                Credentials creds = GoogleCredentials.fromStream(serviceAccount);
                return MoreCallCredentials.from(creds);
            }
        }

        if (configuration.getAuthenticationType() == GrpcAuthType.JWT) {
            ObjectHelper.notNull(configuration.getJwtSecret(), "jwtSecret");
            String jwtToken = JwtHelper.createJwtToken(configuration.getJwtAlgorithm(), configuration.getJwtSecret(), configuration.getJwtIssuer(), configuration.getJwtSubject());
            return new JwtCallCredentials(jwtToken);
        }

        return null;
    }

    /**
     * Shuts down whatever part of the forwarder/channel pair exists and resets
     * the related fields. Safe to call when start-up failed part-way or when the
     * producer was never started.
     */
    private void releaseChannel() throws Exception {
        if (forwarder != null) {
            forwarder.shutdown();
            forwarder = null;
        }
        if (channel != null) {
            LOG.debug("Terminating channel to the remote gRPC server");
            channel.shutdown().shutdownNow();
            channel = null;
        }
        grpcStub = null;
        globalResponseObserver = null;
    }
}