blob: c01996e3e0bdf5426082cb9770a1c5d71373b7d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __CSITE2SITE_CLIENT_PROTOCOL_H__
#define __CSITE2SITE_CLIENT_PROTOCOL_H__
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include "api/nanofi.h"
#include "CSiteToSite.h"
#include "CPeer.h"
#include "uthash.h"
#ifdef __cplusplus
extern "C" {
#endif
#define DESCRIPTION_BUFFER_SIZE 2048
struct CRawSiteToSiteClient;
int readResponse(struct CRawSiteToSiteClient* client, RespondCode *code);
int writeResponse(struct CRawSiteToSiteClient* client, RespondCode code, const char * message);
int readRequestType(struct CRawSiteToSiteClient* client, RequestType *type);
int writeRequestType(struct CRawSiteToSiteClient* client, RequestType type);
void tearDown(struct CRawSiteToSiteClient* client);
int initiateResourceNegotiation(struct CRawSiteToSiteClient* client);
int initiateCodecResourceNegotiation(struct CRawSiteToSiteClient* client);
int negotiateCodec(struct CRawSiteToSiteClient* client);
int establish(struct CRawSiteToSiteClient* client);
void addTransaction(struct CRawSiteToSiteClient * client, CTransaction * transaction);
CTransaction * findTransaction(const struct CRawSiteToSiteClient * client, const char * id);
void deleteTransaction(struct CRawSiteToSiteClient * client, const char * id);
void clearTransactions(struct CRawSiteToSiteClient * client);
int handShake(struct CRawSiteToSiteClient * client);
int bootstrap(struct CRawSiteToSiteClient * client);
int complete(struct CRawSiteToSiteClient * client, const char * transactionID);
int confirm(struct CRawSiteToSiteClient * client, const char * transactionID);
int transmitPayload(struct CRawSiteToSiteClient * client, const char * payload, const attribute_set * attributes);
int16_t sendPacket(struct CRawSiteToSiteClient * client, const char * transactionID, CDataPacket *packet, flow_file_record * ff);
CTransaction* createTransaction(struct CRawSiteToSiteClient * client, TransferDirection direction);
static const char * getResourceName(const struct CRawSiteToSiteClient * c) {
return "SocketFlowFileProtocol";
}
static const char * getCodecResourceName(const struct CRawSiteToSiteClient * c) {
return "StandardFlowFileCodec";
}
static RespondCodeContext *getRespondCodeContext(RespondCode code) {
unsigned int i;
for ( i = 0; i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) {
if (respondCodeContext[i].code == code) {
return &respondCodeContext[i];
}
}
return NULL;
}
// RawSiteToSiteClient Class
struct CRawSiteToSiteClient {
// Batch Count
uint64_t _batch_count;
// Batch Size
uint64_t _batch_size;
// Batch Duration in msec
uint64_t _batch_duration;
// Timeout in msec
uint64_t _timeout;
// commsIdentifier
char _commsIdentifier[37];
// Peer State
PeerState _peer_state;
// portIDStr
char _port_id_str[37];
char _description_buffer[DESCRIPTION_BUFFER_SIZE]; //should be big enough
// Peer Connection
struct SiteToSiteCPeer* _peer;
// Indicatates if _peer is owned by the client
enum Bool _owns_resource;
CTransaction * _known_transactions;
// BATCH_SEND_NANOS
uint64_t _batchSendNanos;
/***
* versioning
*/
uint32_t _supportedVersion[5];
uint32_t _currentVersion;
int _currentVersionIndex;
uint32_t _supportedCodecVersion[1];
uint32_t _currentCodecVersion;
int _currentCodecVersionIndex;
};
static const char * getPortId(const struct CRawSiteToSiteClient * client) {
return client->_port_id_str;
}
static void setPortId(struct CRawSiteToSiteClient * client, const char * id) {
strncpy(client->_port_id_str, id, 37);
client->_port_id_str[36] = '\0';
int i;
for(i = 0; i < 37; i++){
client->_port_id_str[i] = tolower(client->_port_id_str[i]);
}
}
static void setBatchSize(struct CRawSiteToSiteClient *client, uint64_t size) {
client->_batch_size = size;
}
static void setBatchCount(struct CRawSiteToSiteClient *client, uint64_t count) {
client->_batch_count = count;
}
static void setBatchDuration(struct CRawSiteToSiteClient *client, uint64_t duration) {
client->_batch_duration = duration;
}
static uint64_t getTimeOut(const struct CRawSiteToSiteClient *client) {
return client->_timeout;
}
static void initRawClient(struct CRawSiteToSiteClient *client, struct SiteToSiteCPeer * peer) {
client->_owns_resource = False;
client->_peer = peer;
client->_peer_state = IDLE;
client->_batch_size = 0;
client->_batch_count = 0;
client->_batch_duration = 0;
client->_batchSendNanos = 5000000000; // 5 seconds
client->_timeout = 30000; // 30 seconds
client->_supportedVersion[0] = 5;
client->_supportedVersion[1] = 4;
client->_supportedVersion[2] = 3;
client->_supportedVersion[3] = 2;
client->_supportedVersion[4] = 1;
client->_currentVersion = client->_supportedVersion[0];
client->_currentVersionIndex = 0;
client->_supportedCodecVersion[0] = 1;
client->_currentCodecVersion = client->_supportedCodecVersion[0];
client->_currentCodecVersionIndex = 0;
client->_known_transactions = NULL;
memset(client->_description_buffer, 0, DESCRIPTION_BUFFER_SIZE);
}
static struct CRawSiteToSiteClient* createClient(const char * host, uint16_t port, const char * nifi_port) {
struct SiteToSiteCPeer * peer = (struct SiteToSiteCPeer *)malloc(sizeof(struct SiteToSiteCPeer));
initPeer(peer, host, port, "");
struct CRawSiteToSiteClient* client = (struct CRawSiteToSiteClient*)malloc(sizeof(struct CRawSiteToSiteClient));
initRawClient(client, peer);
client->_owns_resource = True;
setPortId(client, nifi_port);
return client;
}
static void destroyClient(struct CRawSiteToSiteClient * client){
tearDown(client);
if(client->_owns_resource == True) {
freePeer(client->_peer);
}
}
#ifdef __cplusplus
}
#endif
#endif