blob: 08a91ba83e03716024dde72b46ccb2d6373b8c69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.rabbitmq.common;
import org.apache.flink.util.Preconditions;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
/**
 * Connection Configuration for RMQ.
 *
 * <p>If {@link Builder#setUri(String)} has been called with a non-empty URI, then
 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
 * is used to initialize the RMQ connection. Otherwise
 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
 * Boolean, Integer, Integer, Integer, Integer)}
 * is used to initialize the RMQ connection.
 */
public class RMQConnectionConfig implements Serializable {

	private static final long serialVersionUID = 1L;

	private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);

	// Host-based connection settings; these stay null when the configuration is built from a URI.
	private String host;
	private Integer port;
	private String virtualHost;
	private String username;
	private String password;

	// URI-based connection setting; stays null when the configuration is built from host settings.
	private String uri;

	// Optional tuning settings; a null value means "do not touch the ConnectionFactory default".
	private Integer networkRecoveryInterval;
	private Boolean automaticRecovery;
	private Boolean topologyRecovery;
	private Integer connectionTimeout;
	private Integer requestedChannelMax;
	private Integer requestedFrameMax;
	private Integer requestedHeartbeat;

	// NOTE(review): never read or written anywhere in this class; kept so the declared fields stay as they were.
	private Boolean autoCreateQueue;

	/**
	 * Creates a configuration that connects using explicit host settings.
	 *
	 * @param host host name
	 * @param port port
	 * @param virtualHost virtual host
	 * @param username username
	 * @param password password
	 * @param networkRecoveryInterval connection recovery interval in milliseconds
	 * @param automaticRecovery if automatic connection recovery
	 * @param topologyRecovery if topology recovery
	 * @param connectionTimeout connection timeout
	 * @param requestedChannelMax requested maximum channel number
	 * @param requestedFrameMax requested maximum frame size
	 * @param requestedHeartbeat requested heartbeat interval
	 * @throws NullPointerException if host or port or virtual host or username or password is null
	 */
	private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password,
								Integer networkRecoveryInterval, Boolean automaticRecovery,
								Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
								Integer requestedFrameMax, Integer requestedHeartbeat){
		Preconditions.checkNotNull(host, "host can not be null");
		Preconditions.checkNotNull(port, "port can not be null");
		Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
		Preconditions.checkNotNull(username, "username can not be null");
		Preconditions.checkNotNull(password, "password can not be null");
		this.host = host;
		this.port = port;
		this.virtualHost = virtualHost;
		this.username = username;
		this.password = password;

		this.networkRecoveryInterval = networkRecoveryInterval;
		this.automaticRecovery = automaticRecovery;
		this.topologyRecovery = topologyRecovery;
		this.connectionTimeout = connectionTimeout;
		this.requestedChannelMax = requestedChannelMax;
		this.requestedFrameMax = requestedFrameMax;
		this.requestedHeartbeat = requestedHeartbeat;
	}

	/**
	 * Creates a configuration that connects using a connection URI. Host, port, virtual host,
	 * username and password fields are left unset (null) in this case.
	 *
	 * @param uri the connection URI
	 * @param networkRecoveryInterval connection recovery interval in milliseconds
	 * @param automaticRecovery if automatic connection recovery
	 * @param topologyRecovery if topology recovery
	 * @param connectionTimeout connection timeout
	 * @param requestedChannelMax requested maximum channel number
	 * @param requestedFrameMax requested maximum frame size
	 * @param requestedHeartbeat requested heartbeat interval
	 * @throws NullPointerException if URI is null
	 */
	private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
								Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
								Integer requestedFrameMax, Integer requestedHeartbeat){
		Preconditions.checkNotNull(uri, "Uri can not be null");
		this.uri = uri;

		this.networkRecoveryInterval = networkRecoveryInterval;
		this.automaticRecovery = automaticRecovery;
		this.topologyRecovery = topologyRecovery;
		this.connectionTimeout = connectionTimeout;
		this.requestedChannelMax = requestedChannelMax;
		this.requestedFrameMax = requestedFrameMax;
		this.requestedHeartbeat = requestedHeartbeat;
	}

	/** @return the host to use for connections; null if this configuration was built from a URI */
	public String getHost() {
		return host;
	}

	/**
	 * Retrieve the port.
	 * @return the port to use for connections
	 * @throws NullPointerException if this configuration was built from a URI: no port is stored
	 *         in that case and the null value cannot be unboxed to {@code int}
	 */
	public int getPort() {
		return port;
	}

	/**
	 * Retrieve the virtual host.
	 * @return the virtual host to use when connecting to the broker; null if this configuration was built from a URI
	 */
	public String getVirtualHost() {
		return virtualHost;
	}

	/**
	 * Retrieve the user name.
	 * @return the AMQP user name to use when connecting to the broker; null if this configuration was built from a URI
	 */
	public String getUsername() {
		return username;
	}

	/**
	 * Retrieve the password.
	 * @return the password to use when connecting to the broker; null if this configuration was built from a URI
	 */
	public String getPassword() {
		return password;
	}

	/**
	 * Retrieve the URI.
	 * @return the connection URI when connecting to the broker; null if this configuration was built from host settings
	 */
	public String getUri() {
		return uri;
	}

	/**
	 * Returns automatic connection recovery interval in milliseconds.
	 * @return how long will automatic recovery wait before attempting to reconnect, in ms;
	 *         null if it was not set on the builder
	 */
	public Integer getNetworkRecoveryInterval() {
		return networkRecoveryInterval;
	}

	/**
	 * Returns true if automatic connection recovery is enabled, false otherwise.
	 * @return true if automatic connection recovery is enabled, false otherwise; null if it was not set on the builder
	 */
	public Boolean isAutomaticRecovery() {
		return automaticRecovery;
	}

	/**
	 * Returns true if topology recovery is enabled, false otherwise.
	 * @return true if topology recovery is enabled, false otherwise; null if it was not set on the builder
	 */
	public Boolean isTopologyRecovery() {
		return topologyRecovery;
	}

	/**
	 * Retrieve the connection timeout.
	 * @return the connection timeout, in milliseconds; zero for infinite; null if it was not set on the builder
	 */
	public Integer getConnectionTimeout() {
		return connectionTimeout;
	}

	/**
	 * Retrieve the requested maximum channel number.
	 * @return the initially requested maximum channel number; zero for unlimited; null if it was not set on the builder
	 */
	public Integer getRequestedChannelMax() {
		return requestedChannelMax;
	}

	/**
	 * Retrieve the requested maximum frame size.
	 * @return the initially requested maximum frame size, in octets; zero for unlimited;
	 *         null if it was not set on the builder
	 */
	public Integer getRequestedFrameMax() {
		return requestedFrameMax;
	}

	/**
	 * Retrieve the requested heartbeat interval.
	 * @return the initially requested heartbeat interval, in seconds; zero for none;
	 *         null if it was not set on the builder
	 */
	public Integer getRequestedHeartbeat() {
		return requestedHeartbeat;
	}

	/**
	 * Creates a new {@link ConnectionFactory} and applies this configuration to it.
	 *
	 * <p>A non-empty URI takes precedence; otherwise host, port, virtual host, username and
	 * password are applied. Each optional setting is only applied when it has been set, so the
	 * factory keeps its own value for anything left unset.
	 *
	 * @return Connection Factory for RMQ
	 * @throws URISyntaxException if Malformed URI has been passed
	 * @throws NoSuchAlgorithmException if the ssl factory could not be created
	 * @throws KeyManagementException if the ssl context could not be initialized
	 */
	public ConnectionFactory getConnectionFactory() throws URISyntaxException,
		NoSuchAlgorithmException, KeyManagementException {
		ConnectionFactory factory = new ConnectionFactory();
		if (this.uri != null && !this.uri.isEmpty()){
			// Separate catch clauses are kept on purpose: each failure is logged with its own message
			// before being rethrown unchanged.
			try {
				factory.setUri(this.uri);
			} catch (URISyntaxException e) {
				LOG.error("Failed to parse uri", e);
				throw e;
			} catch (KeyManagementException e) {
				// this should never happen
				LOG.error("Failed to initialize ssl context.", e);
				throw e;
			} catch (NoSuchAlgorithmException e) {
				// this should never happen
				LOG.error("Failed to setup ssl factory.", e);
				throw e;
			}
		} else {
			factory.setHost(this.host);
			factory.setPort(this.port);
			factory.setVirtualHost(this.virtualHost);
			factory.setUsername(this.username);
			factory.setPassword(this.password);
		}

		if (this.automaticRecovery != null) {
			factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
		}
		if (this.connectionTimeout != null) {
			factory.setConnectionTimeout(this.connectionTimeout);
		}
		if (this.networkRecoveryInterval != null) {
			factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
		}
		if (this.requestedHeartbeat != null) {
			factory.setRequestedHeartbeat(this.requestedHeartbeat);
		}
		if (this.topologyRecovery != null) {
			factory.setTopologyRecoveryEnabled(this.topologyRecovery);
		}
		if (this.requestedChannelMax != null) {
			factory.setRequestedChannelMax(this.requestedChannelMax);
		}
		if (this.requestedFrameMax != null) {
			factory.setRequestedFrameMax(this.requestedFrameMax);
		}

		return factory;
	}

	/**
	 * The Builder Class for {@link RMQConnectionConfig}.
	 */
	public static class Builder {

		private String host;
		private Integer port;
		private String virtualHost;
		private String username;
		private String password;

		private Integer networkRecoveryInterval;
		private Boolean automaticRecovery;
		private Boolean topologyRecovery;

		private Integer connectionTimeout;
		private Integer requestedChannelMax;
		private Integer requestedFrameMax;
		private Integer requestedHeartbeat;

		private String uri;

		/**
		 * Set the target port.
		 * @param port the default port to use for connections
		 * @return the Builder
		 */
		public Builder setPort(int port) {
			this.port = port;
			return this;
		}

		/**
		 * Set the target host.
		 * @param host the default host to use for connections
		 * @return the Builder
		 */
		public Builder setHost(String host) {
			this.host = host;
			return this;
		}

		/**
		 * Set the virtual host.
		 * @param virtualHost the virtual host to use when connecting to the broker
		 * @return the Builder
		 */
		public Builder setVirtualHost(String virtualHost) {
			this.virtualHost = virtualHost;
			return this;
		}

		/**
		 * Set the user name.
		 * @param username the AMQP user name to use when connecting to the broker
		 * @return the Builder
		 */
		public Builder setUserName(String username) {
			this.username = username;
			return this;
		}

		/**
		 * Set the password.
		 * @param password the password to use when connecting to the broker
		 * @return the Builder
		 */
		public Builder setPassword(String password) {
			this.password = password;
			return this;
		}

		/**
		 * Convenience method for setting the fields in an AMQP URI: host,
		 * port, username, password and virtual host. If any part of the
		 * URI is omitted, the ConnectionFactory's corresponding variable
		 * is left unchanged.
		 *
		 * <p>A null or empty URI is treated as "no URI": {@link #build()} then relies on the
		 * host, port, virtual host, username and password settings instead.
		 * @param uri is the AMQP URI containing the data
		 * @return the Builder
		 */
		public Builder setUri(String uri) {
			this.uri = uri;
			return this;
		}

		/**
		 * Enables or disables topology recovery.
		 * @param topologyRecovery if true, enables topology recovery
		 * @return the Builder
		 */
		public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) {
			this.topologyRecovery = topologyRecovery;
			return this;
		}

		/**
		 * Set the requested heartbeat.
		 * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
		 * @return the Builder
		 */
		public Builder setRequestedHeartbeat(int requestedHeartbeat) {
			this.requestedHeartbeat = requestedHeartbeat;
			return this;
		}

		/**
		 * Set the requested maximum frame size.
		 * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
		 * @return the Builder
		 */
		public Builder setRequestedFrameMax(int requestedFrameMax) {
			this.requestedFrameMax = requestedFrameMax;
			return this;
		}

		/**
		 * Set the requested maximum channel number.
		 * @param requestedChannelMax initially requested maximum channel number; zero for unlimited
		 * @return the Builder
		 */
		public Builder setRequestedChannelMax(int requestedChannelMax) {
			this.requestedChannelMax = requestedChannelMax;
			return this;
		}

		/**
		 * Sets connection recovery interval. Default is 5000.
		 * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
		 * @return the Builder
		 */
		public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) {
			this.networkRecoveryInterval = networkRecoveryInterval;
			return this;
		}

		/**
		 * Set the connection timeout.
		 * @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite
		 * @return the Builder
		 */
		public Builder setConnectionTimeout(int connectionTimeout) {
			this.connectionTimeout = connectionTimeout;
			return this;
		}

		/**
		 * Enables or disables automatic connection recovery.
		 * @param automaticRecovery if true, enables connection recovery
		 * @return the Builder
		 */
		public Builder setAutomaticRecovery(boolean automaticRecovery) {
			this.automaticRecovery = automaticRecovery;
			return this;
		}

		/**
		 * The Builder method.
		 *
		 * <p>If the URI is null or empty we use the host, port, vHost, username, password combination
		 * to initialize the connection, using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
		 * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
		 *
		 * <p>Otherwise the URI will be used to initialize the client connection, using
		 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
		 *
		 * @return RMQConnectionConfig
		 * @throws NullPointerException if no non-empty URI has been set and any of host, port,
		 *         virtual host, username or password is missing
		 */
		public RMQConnectionConfig build(){
			// Use the same "non-null and non-empty" test as getConnectionFactory(). An empty URI is
			// ignored there, so choosing the URI constructor for it would yield a configuration whose
			// host settings are all null and which fails only later, when the factory is created.
			if (this.uri != null && !this.uri.isEmpty()) {
				return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
					this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
					this.requestedFrameMax, this.requestedHeartbeat);
			} else {
				return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
					this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
					this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
			}
		}
	}
}