blob: f2203dbfa938ad8c33e57de2619fac761619846c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "communication/rmcomm_AsyncComm.h"
#include "communication/rmcomm_Connect.h"
#include "utils/network_utils.h"
#include "rmcommon.h"
#define ASYNCCOMM_MEMORY_CONTEXT_NAME "asynccomm"
#define ASYNCCOMM_CONNECTION_MAX_CAPABILITY 0X10000
#define ASYNCCOMM_READ_WRITE_ONCE_SIZE 8192
#define ASYNCCOMM_CONN_FILEINDEX 3
MCTYPE AsyncCommContext;
struct pollfd RegClients[ASYNCCOMM_CONNECTION_MAX_CAPABILITY];
AsyncCommBuffer CommBuffers[ASYNCCOMM_CONNECTION_MAX_CAPABILITY];
int CommBufferCounter;
char RWBuffer[ASYNCCOMM_READ_WRITE_ONCE_SIZE];
void freeCommBuffer(AsyncCommBuffer *pcommbuffer);
AsyncCommBuffer createCommBuffer(int fd,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void *userdata);
void closeRegisteredFileDesc(AsyncCommBuffer commbuff);
static void closeAllRegisteredFileDescs(int code, Datum arg);
void initializeAsyncComm(void)
{
AsyncCommContext = NULL;
AsyncCommContext = AllocSetContextCreate(TopMemoryContext,
ASYNCCOMM_MEMORY_CONTEXT_NAME,
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
CommBufferCounter = 0;
on_proc_exit(closeAllRegisteredFileDescs, 0);
}
int registerFileDesc(int fd,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void *userdata,
AsyncCommBuffer *newcommbuffer)
{
if ( CommBufferCounter >= ASYNCCOMM_CONNECTION_MAX_CAPABILITY )
{
elog(WARNING, "There are too many communication buffers in use. "
"Current in used buffer number %d",
CommBufferCounter);
return ASYNCCOMM_BUFFER_ARRAY_FULL;
}
if ( setConnectionNonBlocked(fd) != FUNC_RETURN_OK )
{
return UTIL_NETWORK_FAIL_SETFCNTL;
}
CommBuffers[CommBufferCounter] = createCommBuffer(fd,
actionmask,
methods,
userdata);
RegClients[CommBufferCounter].fd = fd;
*newcommbuffer = CommBuffers[CommBufferCounter];
CommBufferCounter++;
elog(DEBUG3, "Resource manager registered FD %d in poll slot %d, %s%s%s%s",
fd,
CommBufferCounter - 1,
(actionmask & ASYNCCOMM_READ) ? "(read)" : "",
(actionmask & ASYNCCOMM_WRITE) ? "(write)" : "",
(actionmask & ASYNCCOMM_READBYTES) ? "(read bytes)" : "",
(actionmask & ASYNCCOMM_WRITEBYTES) ? "(write bytes)" : "");
return FUNC_RETURN_OK;
}
void assignFileDescClientAddressInfo(AsyncCommBuffer commbuffer,
const char *clienthostname,
uint16_t serverport,
struct sockaddr_in *clientaddr,
socklen_t clientaddrlen)
{
/* Assign connection information into comm buffer. */
memcpy(&(commbuffer->ClientAddr), clientaddr, clientaddrlen);
commbuffer->ClientAddrLen = clientaddrlen;
strncpy(commbuffer->ClientAddrDotStr,
SOCKADDR(clientaddr),
sizeof(commbuffer->ClientAddrDotStr)-1);
commbuffer->ClientAddrPort = SOCKPORT(clientaddr);
commbuffer->ServerPort = serverport;
if ( clienthostname != NULL )
{
setSimpleStringNoLen(&(commbuffer->ClientHostname), clienthostname);
elog(DEBUG3, "Resource manager assigned hostname %s, port %d",
clienthostname,
commbuffer->ClientAddrPort);
}
}
int processAllCommFileDescs(void)
{
static int FreeIndexes[ASYNCCOMM_CONNECTION_MAX_CAPABILITY];
static int freeidx = -1;
/*
* This loop is to check if there are some FDs no need to check POLLOUT
* event. Because, some FDs maybe usually POLLOUT ready but no data to
* write, which causes CPU resource wasted.
*/
for ( int i = 0 ; i < CommBufferCounter ; ++i )
{
Assert(CommBuffers[i] != NULL);
RegClients[i].events = POLLERR | POLLHUP | POLLNVAL;
if ( CommBuffers[i]->ActionMask & (ASYNCCOMM_READ|ASYNCCOMM_READBYTES) )
{
RegClients[i].events |= POLLIN;
}
/*
* If the connection has no intention to write something or to handle
* write ready event, we don't care POLLOUT, otherwise, CPU resource may
* be wasted.
*/
if ( (CommBuffers[i]->ActionMask & ASYNCCOMM_WRITE) ||
((CommBuffers[i]->ActionMask & ASYNCCOMM_WRITEBYTES) &&
list_length(CommBuffers[i]->WriteBuffer) > 0) )
{
RegClients[i].events |= POLLOUT;
}
}
/* Check and process ready FDs. */
int readycount = 0;
readycount = poll(RegClients, CommBufferCounter, RESOURCE_NETWORK_POLL_TIMEOUT);
if( readycount > 0 )
{
for ( int i = 0 ; i < CommBufferCounter && readycount > 0 ; ++i )
{
/* The corresponding comm buffer instance must have correct fd. */
Assert(CommBuffers[i] != NULL);
Assert(CommBuffers[i]->FD == RegClients[i].fd);
/* Case 1. Process connection having error. */
if ( RegClients[i].revents & (POLLERR | POLLHUP | POLLNVAL) )
{
bool erroccured = false;
int error = 0;
socklen_t errlen = sizeof(error);
int res = getsockopt(RegClients[i].fd,
SOL_SOCKET,
SO_ERROR,
(void *)&error,
&errlen);
if (res < 0)
{
elog(WARNING, "getsocketopt() on FD %d have errors raised. "
"errno %d",
CommBuffers[i]->FD,
errno);
/* In fact, this should not occur. */
erroccured = true;
}
else if ( error > 0 )
{
elog(WARNING, "FD %d having errors raised. errno %d",
CommBuffers[i]->FD,
error);
erroccured = true;
}
if ( erroccured )
{
Assert( CommBuffers[i]->Methods->ErrorReadyHandle != NULL );
CommBuffers[i]->Methods->ErrorReadyHandle(CommBuffers[i]);
/* Tell the close this connection and free the buffer. */
forceCloseFileDesc(CommBuffers[i]);
readycount--;
continue;
}
/* Otherwise, skip this error. */
elog(DEBUG3, "poll() detected error is skipped.");
}
/* Case 2. Process connection ready to send data. */
if ( RegClients[i].revents & POLLOUT )
{
int wbuffsize = list_length(CommBuffers[i]->WriteBuffer);
elog(DEBUG3, "FD %d (client) is write ready.", CommBuffers[i]->FD);
/* Call write ready call back if necessary. */
if ( CommBuffers[i]->Methods->WriteReadyHandle != NULL )
{
SelfMaintainBuffer firstbuff = getFirstWriteBuffer(CommBuffers[i]);
elog(DEBUG3, "Write ready callback is set.");
/*
* When commbuffer wants to process received bytes or it
* cares the write event ready only, we call write ready
* handle here.
*/
if ( ((CommBuffers[i]->ActionMask & ASYNCCOMM_WRITEBYTES) &&
firstbuff != NULL &&
CommBuffers[i]->WriteContentSize ==
CommBuffers[i]->WriteContentOriginalSize) ||
((CommBuffers[i]->ActionMask & ASYNCCOMM_WRITE)) )
{
CommBuffers[i]->Methods->WriteReadyHandle(CommBuffers[i]);
}
}
/*
* Send the content firstly. Write ready handler might change
* the content to send or force the connection to close without
* writing out content, therefore we fetch the content size
* again and double check the close mark.
*/
wbuffsize = list_length(CommBuffers[i]->WriteBuffer);
if ( (CommBuffers[i]->ActionMask & ASYNCCOMM_WRITEBYTES) &&
!CommBuffers[i]->forcedClose &&
wbuffsize > 0 )
{
SelfMaintainBuffer tosendbuff = getFirstWriteBuffer(CommBuffers[i]);
/*
* Get content start point, WriteContentSize save the left
* content size should be sent.
*/
char *pstart = tosendbuff->Buffer +
getSMBContentSize(tosendbuff) -
CommBuffers[i]->WriteContentSize;
int wrsize = send(CommBuffers[i]->FD,
pstart,
CommBuffers[i]->WriteContentSize,
0);
if ( wrsize > 0 )
{
Assert( CommBuffers[i]->WriteContentSize >= wrsize );
/* Adjust the content size not sent yet. */
CommBuffers[i]->WriteContentSize -= wrsize;
if ( CommBuffers[i]->WriteContentSize == 0 )
{
/*
* Before destroy sent content, Write post handler
* is called to make handler able to recognize the
* sent content by reading the first send buffer.
*/
if ( CommBuffers[i]->Methods->WritePostHandle != NULL )
{
CommBuffers[i]->Methods->WritePostHandle(CommBuffers[i]);
}
/* Truly drop the sent content. */
shiftOutFirstWriteBuffer(CommBuffers[i]);
}
elog(DEBUG3, "FD %d (client) wrote %d bytes out. "
"Current buffer has %d bytes left, total "
"%d buffers.",
CommBuffers[i]->FD,
wrsize,
CommBuffers[i]->WriteContentSize,
list_length(CommBuffers[i]->WriteBuffer));
}
else if ( wrsize == -1 &&
errno != EWOULDBLOCK &&
errno != EAGAIN &&
errno != EINTR)
{
elog(WARNING, "FD %d failed to send message. errno %d",
CommBuffers[i]->FD,
errno);
Assert( CommBuffers[i]->Methods->ErrorReadyHandle != NULL );
CommBuffers[i]->Methods->ErrorReadyHandle(CommBuffers[i]);
/* Not acceptable error, should actively force close. */
forceCloseFileDesc(CommBuffers[i]);
}
}
readycount--;
}
/* Case 3. Process connection ready to receive data. */
else if ( RegClients[i].revents & POLLIN )
{
elog(DEBUG3, "Find FD %d is read ready.", CommBuffers[i]->FD);
/* Call Ready ready handler call back to do possible actions. */
if ( CommBuffers[i]->Methods->ReadReadyHandle != NULL)
{
CommBuffers[i]->Methods->ReadReadyHandle(CommBuffers[i]);
}
elog(DEBUG3, "commbuffer action mask %d, toclose %d, forced %d",
CommBuffers[i]->ActionMask,
CommBuffers[i]->toClose ? 1 : 0,
CommBuffers[i]->forcedClose ? 1 : 0);
/* Read ready handler might force the connection to close. */
if ( (CommBuffers[i]->ActionMask & ASYNCCOMM_READBYTES) &&
!CommBuffers[i]->toClose &&
!CommBuffers[i]->forcedClose )
{
/* Read data and append to the read buffer. */
int rdsize = recv(CommBuffers[i]->FD,
RWBuffer,
sizeof(RWBuffer),
0);
if ( rdsize > 0 )
{
appendSelfMaintainBuffer(&(CommBuffers[i]->ReadBuffer),
RWBuffer,
rdsize);
/*
* For client connection, the read post handler is
* mandatory. This is for recognizing the content
* format and do necessary action.
*/
Assert(CommBuffers[i]->Methods->ReadPostHandle != NULL);
CommBuffers[i]->Methods->ReadPostHandle(CommBuffers[i]);
elog(DEBUG3, "FD %d read %d bytes. %d to handle",
CommBuffers[i]->FD,
rdsize,
getSMBContentSize(&(CommBuffers[i]->ReadBuffer)));
}
else if ( rdsize == 0 )
{
forceCloseFileDesc(CommBuffers[i]);
elog(DEBUG3, "FD %d (client) is normally closed.",
CommBuffers[i]->FD);
}
else if ( rdsize == -1 &&
errno != EWOULDBLOCK &&
errno != EAGAIN &&
errno != EINTR)
{
elog(WARNING, "FD %d is forced closed due to recv() error. "
"errno %d",
CommBuffers[i]->FD,
errno);
Assert( CommBuffers[i]->Methods->ErrorReadyHandle != NULL );
CommBuffers[i]->Methods->ErrorReadyHandle(CommBuffers[i]);
/* Not acceptable error, should actively close. */
forceCloseFileDesc(CommBuffers[i]);
}
else
{
elog(WARNING, "FD %d errno %d", CommBuffers[i]->FD, errno);
}
}
readycount--;
} /* End of case 3. */
} /* End of looping each FD. */
/* Validate that all ready FDs should be processed. */
Assert(readycount == 0);
}
/* In case, poll() has error raised. */
else if ( readycount == -1 )
{
/* Ignore the errors due to signal, should be fine to retry next time. */
if ( errno == EAGAIN || errno == EINTR )
{
return FUNC_RETURN_OK;
}
return SYSTEM_CALL_ERROR; /* Fail to call poll() */
}
/* Actively close the connection should be actively closed. */
freeidx = -1;
for ( int i = 0 ; i < CommBufferCounter ; ++i )
{
bool shouldfree = false;
if ( CommBuffers[i]->forcedClose )
{
/* Call cleanup handler if necessary to do user-defined cleanup. */
elog(DEBUG3, "Close FD %d Index %d.", CommBuffers[i]->FD, i);
/* Close connection and free buffer */
closeRegisteredFileDesc(CommBuffers[i]);
shouldfree = true;
}
else if ( CommBuffers[i]->toClose && CommBuffers[i]->WriteBuffer == NULL )
{
if ( CommBuffers[i]->ClientHostname.Str != NULL &&
CommBuffers[i]->ServerPort != 0 )
{
elog(DEBUG3, "Return FD %d Index %d.", CommBuffers[i]->FD, i);
returnAliveConnectionRemoteByHostname(
&(CommBuffers[i]->FD),
CommBuffers[i]->ClientHostname.Str,
CommBuffers[i]->ServerPort);
}
else
{
elog(DEBUG3, "Close FD %d Index %d normally.", CommBuffers[i]->FD, i);
closeRegisteredFileDesc(CommBuffers[i]);
}
shouldfree = true;
}
if ( shouldfree )
{
Assert(CommBuffers[i]->Methods->CleanUpHandle != NULL);
CommBuffers[i]->Methods->CleanUpHandle(CommBuffers[i]);
freeCommBuffer(&CommBuffers[i]); /* Now CommBuffers[i] is set NULL.*/
freeidx++;
FreeIndexes[freeidx] = i;
/* Clear poll() array. */
RegClients[i].fd = -1;
}
}
/*
* Shift to remove freed slots among in-use slots to shorten the length of
* poll status array.
*/
if ( freeidx >= 0 )
{
for ( int i = 0 ; i <= freeidx ; ++i )
{
while( CommBufferCounter-1 >= 0 &&
CommBuffers[CommBufferCounter-1] == NULL )
{
elog(DEBUG5, "Skip poll slot %d, Curr counter is %d",
CommBufferCounter - 1,
CommBufferCounter);
CommBufferCounter--;
}
if ( CommBufferCounter-1 < FreeIndexes[i] )
{
/* No need to move any more. */
break;
}
elog(DEBUG5, "Move poll slot from %d to %d. Curr counter is %d",
CommBufferCounter - 1,
FreeIndexes[i],
CommBufferCounter);
RegClients[FreeIndexes[i]].fd = RegClients[CommBufferCounter-1].fd;
RegClients[FreeIndexes[i]].events = RegClients[CommBufferCounter-1].events;
RegClients[FreeIndexes[i]].revents = RegClients[CommBufferCounter-1].revents;
CommBuffers[FreeIndexes[i]] = CommBuffers[CommBufferCounter-1];
/*
* NOTE: CommBuffers[CommBufferCounter-1] will be overwritten by new
* comm buffer instance. not setting NULL here.
*/
CommBufferCounter--;
}
}
return FUNC_RETURN_OK;
}
AsyncCommBuffer createCommBuffer(int fd,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void *userdata)
{
Assert(methods != NULL);
AsyncCommBuffer result = rm_palloc0(AsyncCommContext,
sizeof(AsyncCommBufferData));
result->FD = fd;
result->ActionMask = actionmask;
result->Methods = methods;
result->toClose = false;
result->forcedClose = false;
result->UserData = userdata;
result->ClientAddrLen = 0;
result->ClientAddrPort = 0;
result->ServerPort = 0;
initSimpleString(&(result->ClientHostname), AsyncCommContext);
initializeSelfMaintainBuffer(&(result->ReadBuffer), AsyncCommContext);
result->WriteBuffer = NULL;
result->WriteContentSize = -1;
result->WriteContentOriginalSize = -1;
if ( result->Methods->InitHandle != NULL )
{
result->Methods->InitHandle(result);
}
return result;
}
void freeCommBuffer(AsyncCommBuffer *pcommbuffer)
{
Assert( pcommbuffer != NULL );
elog(DEBUG3, "Free CommBuffer for FD %d.", (*pcommbuffer)->FD);
freeSimpleStringContent(&((*pcommbuffer)->ClientHostname));
destroySelfMaintainBuffer(&((*pcommbuffer)->ReadBuffer));
MEMORY_CONTEXT_SWITCH_TO(AsyncCommContext)
while( (*pcommbuffer)->WriteBuffer != NULL )
{
SelfMaintainBuffer wbuffer = getFirstWriteBuffer(*pcommbuffer);
destroySelfMaintainBuffer(wbuffer);
(*pcommbuffer)->WriteBuffer = list_delete_first((*pcommbuffer)->WriteBuffer);
}
MEMORY_CONTEXT_SWITCH_BACK
rm_pfree(AsyncCommContext, *pcommbuffer);
*pcommbuffer = NULL;
}
void closeRegisteredFileDesc(AsyncCommBuffer commbuff)
{
closeConnectionRemote(&(commbuff->FD));
}
void closeAndRemoveAllRegisteredFileDesc(void)
{
for ( int i = 0 ; i < CommBufferCounter ; ++i )
{
Assert(CommBuffers[i] != NULL);
/* Call cleanup handler if necessary to do user-defined cleanup. */
CommBuffers[i]->Methods->CleanUpHandle(CommBuffers[i]);
elog(DEBUG5, "Close FD %d Index %d.", CommBuffers[i]->FD, i);
closeRegisteredFileDesc(CommBuffers[i]);
freeCommBuffer(&CommBuffers[i]);
/* Clear poll() array. */
RegClients[i].fd = -1;
}
CommBufferCounter = 0;
}
static void closeAllRegisteredFileDescs(int code, Datum arg)
{
closeAndRemoveAllRegisteredFileDesc();
}
void unresigsterFileDesc(int fd)
{
int pos = -1;
for ( int i = 0 ; i < CommBufferCounter ; ++i )
{
Assert(CommBuffers[i] != NULL);
if ( CommBuffers[i]->FD == fd )
{
/* Call cleanup handler if necessary to do user-defined cleanup. */
CommBuffers[i]->Methods->CleanUpHandle(CommBuffers[i]);
elog(DEBUG3, "Unregister FD %d Index %d.", CommBuffers[i]->FD, i);
CommBuffers[i]->FD = -1;
freeCommBuffer(&CommBuffers[i]);
pos = i;
break;
}
}
/*
* Shift to remove freed slot among in-use slots to shorten the length of
* poll status array.
*/
if ( pos >= 0 )
{
if ( CommBufferCounter > 1 )
{
RegClients[pos].fd = RegClients[CommBufferCounter-1].fd;
RegClients[pos].events = RegClients[CommBufferCounter-1].events;
RegClients[pos].revents = RegClients[CommBufferCounter-1].revents;
CommBuffers[pos] = CommBuffers[CommBufferCounter-1];
}
CommBufferCounter--;
}
}
void addMessageContentToCommBuffer(AsyncCommBuffer buffer,
SelfMaintainBuffer content)
{
MEMORY_CONTEXT_SWITCH_TO(AsyncCommContext)
buffer->WriteBuffer = lappend(buffer->WriteBuffer, content);
MEMORY_CONTEXT_SWITCH_BACK
if ( list_length(buffer->WriteBuffer) == 1 )
{
buffer->WriteContentSize = getSMBContentSize(content);
buffer->WriteContentOriginalSize = buffer->WriteContentSize;
}
}
SelfMaintainBuffer getFirstWriteBuffer(AsyncCommBuffer commbuffer)
{
if ( list_length(commbuffer->WriteBuffer) == 0 )
{
return NULL;
}
return (SelfMaintainBuffer)(lfirst(list_head(commbuffer->WriteBuffer)));
}
void shiftOutFirstWriteBuffer(AsyncCommBuffer commbuffer)
{
SelfMaintainBuffer rmbuff = getFirstWriteBuffer(commbuffer);
/* Free current buffer */
deleteSelfMaintainBuffer(rmbuff);
/* Shift to next buffer to get ready to send. */
MEMORY_CONTEXT_SWITCH_TO(AsyncCommContext)
commbuffer->WriteBuffer = list_delete_first(commbuffer->WriteBuffer);
MEMORY_CONTEXT_SWITCH_BACK
SelfMaintainBuffer firstbuff = getFirstWriteBuffer(commbuffer);
commbuffer->WriteContentSize = firstbuff == NULL ?
0 :
getSMBContentSize(firstbuff);
commbuffer->WriteContentOriginalSize = commbuffer->WriteContentSize;
}
void closeFileDesc(AsyncCommBuffer commbuff)
{
commbuff->toClose = true;
}
void forceCloseFileDesc(AsyncCommBuffer commbuff)
{
commbuff->toClose = true;
commbuff->forcedClose = true;
}
int registerAsyncConnectionFileDesc(const char *address,
uint16_t port,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void *userdata,
AsyncCommBuffer *newcommbuffer)
{
int res = FUNC_RETURN_OK;
int fd = -1;
int sockres = 0;
struct sockaddr_in server_addr;
/* Prepare for connecting. */
AddressString resolvedaddr = getAddressStringByHostName(address);
if ( resolvedaddr == NULL )
{
write_log("Failed to get host by name %s for async connecting a remote "
"socket server %s:%d",
address,
address,
port);
return UTIL_NETWORK_FAIL_GETHOST;
}
if ( rm_enable_connpool )
{
/* Try to get an alive connection from connection pool. */
fd = fetchAliveSocketConnection(address, resolvedaddr, port);
}
if ( fd != -1 )
{
res = registerFileDesc(fd,
actionmask,
methods,
userdata,
newcommbuffer);
return res;
}
/* Create socket FD */
fd = socket(AF_INET, SOCK_STREAM, 0);
if ( fd < 0 )
{
write_log("Failed to open socket for async connecting a remote socket "
"(errno %d)",
errno);
return UTIL_NETWORK_FAIL_CREATESOCKET;
}
/* Set FD unblocked. */
if ( setConnectionNonBlocked(fd) != FUNC_RETURN_OK )
{
close(fd);
return UTIL_NETWORK_FAIL_SETFCNTL;
}
bzero((char *)&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
memcpy((char *)&server_addr.sin_addr.s_addr,
resolvedaddr->Address,
resolvedaddr->Length);
server_addr.sin_port = htons(port);
/* Asynchronous connect. Should return value at once. */
sockres = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if ( sockres == 0 )
{
if ( setConnectionLongTermNoDelay(fd) != FUNC_RETURN_OK )
{
close(fd);
return UTIL_NETWORK_FAIL_CONNECT;
}
/*
* New connection is created. Suppose domain socket and local socket
* connection can be done now. Register a normal client FD in poll() to
* perform content sending and receiving.
*/
res = registerFileDesc(fd, actionmask, methods, userdata, newcommbuffer);
if ( res == FUNC_RETURN_OK )
{
/* Assign connection address. */
assignFileDescClientAddressInfo(*newcommbuffer,
address,
port,
&server_addr,
sizeof(server_addr));
}
else
{
write_log("registerAsyncCommectionFileDesc failed registering fd %d.",
fd);
close(fd);
res = UTIL_NETWORK_FAIL_CONNECT;
}
}
else if ( sockres < 0 && errno == EINPROGRESS )
{
/*
* System is building connection now. Register it into poll() to perform
* asynchronous check. Build asynchronous connection commbuffer.
*/
res = registerFileDescForAsyncConn(fd,
actionmask,
methods,
userdata,
newcommbuffer);
if ( res == FUNC_RETURN_OK )
{
/* Assign connection address. */
assignFileDescClientAddressInfo(*newcommbuffer,
address,
port,
&server_addr,
sizeof(server_addr));
}
else
{
write_log("registerAsyncCommectionFileDesc failed registering fd %d.",
fd);
close(fd);
res = UTIL_NETWORK_FAIL_CONNECT;
}
}
else
{
/* Fail to build connection. */
write_log("registerAsyncConnectionFileDesc connect socket failed, "
"fd %d (errno %d)",
fd,
errno);
close(fd);
res = UTIL_NETWORK_FAIL_CONNECT;
}
return res;
}