blob: f36ac150660bc6d442ca8d73c612b91f1e023ec9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.protocol;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.VersionNegotiator;
public class ProtocolHandshake {
public static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
public static final int RESOURCE_OK = 20;
public static final int DIFFERENT_RESOURCE_VERSION = 21;
public static final int ABORT = 255;
public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out);
try {
dos.write(MAGIC_HEADER);
initiateVersionNegotiation(versionNegotiator, dis, dos);
} finally {
dos.flush();
}
}
public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out);
try {
final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
dis.readFully(magicHeaderBuffer);
receiveVersionNegotiation(versionNegotiator, dis, dos);
} finally {
dos.flush();
}
}
private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
// Write the classname of the RemoteStreamCodec, followed by its version
dos.writeInt(negotiator.getVersion());
dos.flush();
// wait for response from server.
final int statusCode = dis.read();
switch (statusCode) {
case RESOURCE_OK: // server accepted our proposal of codec name/version
return;
case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version
// Get server's preferred version
final int newVersion = dis.readInt();
// Determine our new preferred version that is no greater than the server's preferred version.
final Integer newPreference = negotiator.getPreferredVersion(newVersion);
// If we could not agree with server on a version, fail now.
if (newPreference == null) {
throw new HandshakeException("Could not agree on protocol version");
}
negotiator.setVersion(newPreference);
// Attempt negotiation of resource based on our new preferred version.
initiateVersionNegotiation(negotiator, dis, dos);
case ABORT:
throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default:
throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
}
}
private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
final int version = dis.readInt();
if (negotiator.isVersionSupported(version)) {
dos.write(RESOURCE_OK);
dos.flush();
negotiator.setVersion(version);
} else {
final Integer preferred = negotiator.getPreferredVersion(version);
if (preferred == null) {
dos.write(ABORT);
dos.flush();
throw new HandshakeException("Unable to negotiate an acceptable version of the Distributed Cache Protocol");
}
dos.write(DIFFERENT_RESOURCE_VERSION);
dos.writeInt(preferred);
dos.flush();
receiveVersionNegotiation(negotiator, dis, dos);
}
}
}