blob: cba975d4d756a7aaa99986229674db8f6e2c8252 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_
#define LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_
#include <cstdint>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <mutex>
#include <atomic>
#include "io/BaseStream.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "io/validation.h"
#include "properties/Configure.h"
#include "io/NetworkPrioritizer.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace io {
/**
* Context class for sockets. This is currently only used as a parent class for TLSContext. It is necessary so that the Socket and TLSSocket
* constructors can take the same arguments. It also gives us a common place to set timeouts, etc., from the Configure object in the future.
*/
class SocketContext {
 public:
  /**
   * Creates a socket context.
   * The configuration is accepted but not read here; the constructor
   * body is intentionally empty.
   * @param configure shared configuration object (currently unused).
   */
  SocketContext(const std::shared_ptr<Configure> &configure) {}
};
/**
* Socket class.
* Purpose: Provides a general-purpose socket interface that hides
* connection details from users.
* Design: Extends BaseStream and allows us to perform most streaming
* operations against a BSD socket.
*/
class Socket : public BaseStream {
 public:
  /**
   * Constructor that creates a client socket.
   * @param context the SocketContext
   * @param hostname hostname we are connecting to.
   * @param port port we are connecting to.
   */
  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port);

  /**
   * Move constructor.
   * NOTE(review): the argument is a const rvalue reference, so the source
   * object cannot be modified through it. The signature is left untouched
   * because the definition is not part of this header.
   */
  explicit Socket(const Socket &&);

  /**
   * Static function to return the current machine's host name.
   * The value is computed by init_hostname() the first time this function
   * is called and kept in a function-local static afterwards.
   * @return copy of the cached host name.
   */
  static std::string getMyHostName() {
    static std::string HOSTNAME = init_hostname();
    return HOSTNAME;
  }

  /**
   * Destructor
   */
  virtual ~Socket();

  /**
   * Closes the stream.
   */
  virtual void closeStream();

  /**
   * Initializes the socket
   * @return result of the creation operation.
   */
  virtual int16_t initialize();

  /**
   * Stores the network interface to associate with this socket.
   * @param interface interface description; it is moved into
   * local_network_interface_.
   */
  virtual void setInterface(io::NetworkInterface &&interface) {
    local_network_interface_ = std::move(interface);
  }

  /**
   * Sets the non blocking flag on the file descriptor.
   */
  void setNonBlocking();

  /**
   * Return the host name for this socket
   * @returns host name
   */
  std::string getHostname() const;

  /**
   * Return the port for this socket
   * @returns port
   */
  uint16_t getPort() const {
    return port_;
  }

  // data stream extensions

  /**
   * Reads data and places it into buf.
   * Forwards to the three-argument overload with retrieve_all_bytes = true.
   * @param buf buffer in which we extract data
   * @param buflen length argument forwarded unchanged
   * @return result of the three-argument overload
   */
  virtual int readData(std::vector<uint8_t> &buf, int buflen) {
    return readData(buf, buflen, true);
  }

  /**
   * Reads data and places it into buf.
   * Forwards to the three-argument overload with retrieve_all_bytes = true.
   * @param buf buffer in which we extract data
   * @param buflen length argument forwarded unchanged
   * @return result of the three-argument overload
   */
  virtual int readData(uint8_t *buf, int buflen) {
    return readData(buf, buflen, true);
  }

  /**
   * Reads data and places it into buf
   * @param buf buffer in which we extract data
   * @param buflen
   * @param retrieve_all_bytes determines if we should read all bytes before returning
   */
  virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes);

  /**
   * Reads data and places it into buf
   * @param buf buffer in which we extract data
   * @param buflen
   * @param retrieve_all_bytes determines if we should read all bytes before returning
   */
  virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes);

  /**
   * Write value to the stream using std::vector
   * @param buf incoming buffer
   * @param buflen length argument for buf (presumably the number of bytes
   * to write -- confirm against the implementation)
   */
  virtual int writeData(std::vector<uint8_t> &buf, int buflen);

  /**
   * writes value to stream
   * @param value value to write
   * @param size size of value
   */
  virtual int writeData(uint8_t *value, int size);

  /**
   * Writes a uint64_t
   * @param value value to write
   * @param is_little_endian endianness flag; defaults to EndiannessCheck::IS_LITTLE
   */
  virtual int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);

  /**
   * Writes a uint32_t
   * @param value value to write
   * @param is_little_endian endianness flag; defaults to EndiannessCheck::IS_LITTLE
   */
  virtual int write(uint32_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);

  /**
   * Writes a uint16_t
   * @param value value to write
   * @param is_little_endian endianness flag; defaults to EndiannessCheck::IS_LITTLE
   */
  virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);

  /**
   * Reads a uint64_t
   * @param value reference that receives the value read
   * @param is_little_endian endianness flag; defaults to EndiannessCheck::IS_LITTLE
   */
  virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);

  /**
   * Reads a uint32_t
   * @param value reference that receives the value read
   * @param is_little_endian endianness flag; defaults to EndiannessCheck::IS_LITTLE
   */
  virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);

  /**
   * Reads a uint16_t
   * @param value reference that receives the value read
   * @param is_little_endian endianness flag; defaults to EndiannessCheck::IS_LITTLE
   */
  virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);

  /**
   * Returns the underlying buffer (delegates to DataStream::getBuffer)
   * @return vector's array
   **/
  const uint8_t *getBuffer() const {
    return DataStream::getBuffer();
  }

  /**
   * Retrieve size of data stream (delegates to DataStream::getSize)
   * @return size of data stream
   **/
  const uint64_t getSize() const {
    return DataStream::getSize();
  }

 protected:
  /**
   * Constructor that accepts host name, port and listeners. With this
   * constructor we will be creating a server socket
   * @param context the SocketContext
   * @param hostname our host name
   * @param port connecting port
   * @param listeners number of listeners in the queue
   */
  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);

  /**
   * Creates a vector and returns the vector using the provided
   * type name.
   * @param t incoming object
   * @returns vector.
   */
  template<typename T>
  std::vector<uint8_t> readBuffer(const T&);

  /**
   * Creates a connection using the address info object.
   * @param p addrinfo structure.
   * @param addr address passed by reference; its exact role is defined by
   * the implementation, which is not part of this header.
   * @returns NOTE(review): previously documented as "fd", but the return
   * type is int8_t -- confirm the meaning against the implementation.
   */
  virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr);

  /**
   * Sets socket options depending on the instance.
   * @param sock socket file descriptor.
   */
  virtual int16_t setSocketOptions(const int sock);

  /**
   * Attempt to select the socket file descriptor
   * @param msec timeout interval to wait
   * @returns file descriptor (NOTE(review): return type is int16_t --
   * confirm against the implementation)
   */
  virtual int16_t select_descriptor(const uint16_t msec);

  addrinfo *addr_info_;
  std::recursive_mutex selection_mutex_;
  std::string requested_hostname_;
  std::string canonical_hostname_;
  uint16_t port_;
  bool is_loopback_only_;
  io::NetworkInterface local_network_interface_;

  // connection information
  int32_t socket_file_descriptor_;
  fd_set total_list_;
  fd_set read_fds_;
  std::atomic<uint16_t> socket_max_;
  std::atomic<uint64_t> total_written_;
  std::atomic<uint64_t> total_read_;
  uint16_t listeners_;
  bool nonBlocking_;
  std::shared_ptr<logging::Logger> logger_;

  /**
   * Overwrites the stored port.
   * @param port new port value
   */
  void setPort(uint16_t port) {
    port_ = port;
  }

 private:
  /**
   * Determines the host name used by getMyHostName().
   * Asks the OS for the host name, then builds and initializes a temporary
   * Socket for that name and prefers the name that socket reports; when the
   * socket reports nothing, the OS-provided name is used.
   * @return the resolved host name, the OS-provided name as a fallback, or
   * an empty string when gethostname() fails.
   */
  static std::string init_hostname() {
    // Zero-filled so the buffer is always a valid C string.
    char hostname[1024] = {};
    // Keep the final byte in reserve: POSIX leaves null-termination
    // unspecified when the name has to be truncated.
    if (gethostname(hostname, sizeof(hostname) - 1) != 0) {
      // Nothing trustworthy in the buffer; do not feed it to a socket.
      return std::string();
    }
    hostname[sizeof(hostname) - 1] = '\0';
    Socket mySock(nullptr, hostname, 0);
    mySock.initialize();
    auto resolved_hostname = mySock.getHostname();
    return !IsNullOrEmpty(resolved_hostname) ? resolved_hostname : hostname;
  }
};
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_ */