blob: 6e4b860af7350bea7a2dc48a3fa9a41351d71300 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.protocol.socket;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.HandshakenProperties;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.util.NiFiProperties;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol {
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
// Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
@Override
protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
HandshakenProperties confirmed = new HandshakenProperties();
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
confirmed.setCommsIdentifier(dis.readUTF());
if (versionNegotiator.getVersion() >= 3) {
String transitUriPrefix = dis.readUTF();
if (!transitUriPrefix.endsWith("/")) {
transitUriPrefix = transitUriPrefix + "/";
}
confirmed.setTransitUriPrefix(transitUriPrefix);
}
final Map<String, String> properties = new HashMap<>();
final int numProperties = dis.readInt();
for (int i = 0; i < numProperties; i++) {
final String propertyName = dis.readUTF();
final String propertyValue = dis.readUTF();
properties.put(propertyName, propertyValue);
}
// evaluate the properties received
boolean responseWritten = false;
try {
validateHandshakeRequest(confirmed, peer, properties);
} catch (HandshakeException e) {
ResponseCode handshakeResult = e.getResponseCode();
if(handshakeResult.containsMessage()){
handshakeResult.writeResponse(dos, e.getMessage());
} else {
handshakeResult.writeResponse(dos);
}
switch (handshakeResult) {
case UNAUTHORIZED:
case PORT_NOT_IN_VALID_STATE:
case PORTS_DESTINATION_FULL:
responseWritten = true;
break;
default:
throw e;
}
}
// send "OK" response
if (!responseWritten) {
ResponseCode.PROPERTIES_OK.writeResponse(dos);
}
return confirmed;
}
@Override
public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
logger.debug("{} Negotiating Codec with {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
if (port == null) {
RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
}
// Negotiate the FlowFileCodec to use.
try {
negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos);
logger.debug("{} Negotiated Codec {} with {}", new Object[]{this, negotiatedFlowFileCodec, peer});
return negotiatedFlowFileCodec;
} catch (final HandshakeException e) {
throw new ProtocolException(e.toString());
}
}
@Override
public RequestType getRequestType(final Peer peer) throws IOException {
if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
logger.debug("{} Reading Request Type from {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
final RequestType requestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()));
logger.debug("{} Got Request Type {} from {}", new Object[]{this, requestType, peer});
return requestType;
}
@Override
public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException {
if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
logger.debug("{} Sending Peer List to {}", this, peer);
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
final NiFiProperties properties = NiFiProperties.getInstance();
String remoteInputHost = properties.getRemoteInputHost();
if (remoteInputHost == null) {
remoteInputHost = InetAddress.getLocalHost().getHostName();
}
logger.debug("{} Advertising Remote Input host name {}", this, peer);
List<NodeInformation> nodeInfos;
if (clusterNodeInfo.isPresent()) {
nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation());
} else {
final NodeInformation self = new NodeInformation(remoteInputHost, properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.getRemoteInputHttpPort(),
properties.isSiteToSiteSecure(), 0);
nodeInfos = Collections.singletonList(self);
}
// determine how many nodes have Site-to-site enabled
int numPeers = 0;
for (final NodeInformation nodeInfo : nodeInfos) {
if (nodeInfo.getSiteToSitePort() != null) {
numPeers++;
}
}
dos.writeInt(numPeers);
for (final NodeInformation nodeInfo : nodeInfos) {
if (nodeInfo.getSiteToSitePort() == null) {
continue;
}
dos.writeUTF(nodeInfo.getSiteToSiteHostname());
dos.writeInt(nodeInfo.getSiteToSitePort());
dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
dos.writeInt(nodeInfo.getTotalFlowFiles());
}
logger.info("Sending list of {} peers back to client {}", numPeers, peer);
dos.flush();
}
@Override
public String getResourceName() {
return RESOURCE_NAME;
}
@Override
public VersionNegotiator getVersionNegotiator() {
return versionNegotiator;
}
@Override
protected String createTransitUri(Peer peer, String sourceFlowFileIdentifier) {
String transitUriPrefix = handshakenProperties.getTransitUriPrefix();
return (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
}
}