| /* $Id$ | |
| * | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to you under the Apache License, Version | |
| * 2.0 (the "License"); you may not use this file except in compliance | |
| * with the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| /* | |
| * etch_connection.c | |
| * connection client and server classes - tcp, udp | |
| */ | |
| #include "apr_time.h" | |
| #include "apr_network_io.h" | |
| #include "etchthread.h" | |
| #include "etch_tcpserver.h" | |
| #include "etch_encoding.h" | |
| #include "etchflexbuf.h" | |
| #include "etch_global.h" | |
| #include "etch_sessionmsg.h" | |
| #include "etchlog.h" | |
| char* ETCHTCPS = "TCPS"; | |
| #define ETCH_CONNECTION_DEFLINGERTIME 30 | |
| #define ETCH_CONNECTION_DEFNODELAY TRUE | |
| #define ETCH_CONNECTION_DEFRETRYDELAYMS 1000 | |
| unsigned next_etch_tcpserver_id(); | |
| unsigned next_etch_connection_id(); | |
| etch_tcp_connection* new_accepted_tcp_connection | |
| (char* host, const int port, etch_rawsocket*); | |
| int etch_tcpsvr_acceptproc(etch_threadparams*); | |
| int etch_tcpsvr_dummy_connection(etch_tcp_connection*); | |
| int etch_tcpsvr_on_data (etch_connection*, const int, int, etch_flexbuffer*); | |
| int etch_deftcplistener_on_event(etch_tcp_server*, etch_tcp_connection*, | |
| const int, int, void*); | |
| /* transport interface */ | |
| int etch_tcpsvr_transport_control(etch_tcp_server*, etch_event*, objmask*); | |
| int etch_tcpsvr_transport_notify (etch_tcp_server*, etch_event*); | |
| objmask* etch_tcpsvr_transport_query (etch_tcp_server*, objmask*); | |
| i_session* etch_tcpsvr_get_session(etch_tcp_server*); | |
| void etch_tcpsvr_set_session(etch_tcp_server*, i_sessionlistener*); | |
| /* session listener interface stubs */ | |
| int etch_tcpsvr_stub_on_session_accepted(etch_tcp_server*, void*); | |
| int etch_tcpsvr_stub_session_control (void*, void*, void*); | |
| int etch_tcpsvr_stub_session_notify (void*, void*); | |
| void* etch_tcpsvr_stub_session_query (void*, void*); | |
| #if(0) | |
| TCPSERVER | |
| | Socket, hostIP, port, delay, isKeepalive, isNoDelay | |
| | buffersize, isAutoflush, trafficclass | |
| | InputStream, OutputStream | |
| | stop0(); openSocket(); setupSocket(); readSocket(); | |
| | close(); send(); flush(); shutdownInput(); shutdownOutput(); | |
| | remoteAddress(); fireData(); transportData(); | |
| | | |
| - CONNECTION<SESSIONLISTENER> | |
| | | sessionListener session; | |
| | | sessionAccepted(socket); | |
| | | SESSION | |
| | | sessionQuery(); sessionControl(); sessionNotify(); | |
| | | | |
| | | Monitor status; | |
| | | Connection(); started(); stopped(); exception(); | |
| | | run0(); localAddress(); translateHost(); | |
| | | openSocket(); setupSocket(); readSocket(); close(); | |
| | | transportQuery(); transportControl(); transportNotify(); | |
| | | fireUp(); fireDown(); | |
| | | waitUp(); waitDown(); | |
| | | | |
| | - RUNNER | |
| | | | Thread thread; | |
| | | | RunnerHandler handler; | |
| | | | start0() | |
| | | | stop0() | |
| | | | run() | |
| | | | run0() | |
| | | | fireStarted() | |
| | | | fireStopped() | |
| | | | fireException() | |
| | | - ABSTRACTSTARTABLE | |
| | | | |
| | - TRANSPORT<SESSIONDATA> | |
| | | transportQuery(); transportControl(); transportNotify(); | |
| | | | |
| | - RUNNERHANDLER interface | |
| | started(); stopped(); exception(); | |
| | | |
| - TRANSPORT<SESSIONLISTENER> | |
| | SessionListener getSession(); setSession(SessionListener); | |
| | transportQuery(); transportControl(); transportNotify(); | |
| #endif | |
| /* - - - - - - - - - - - - - - - - - | |
| * constructors / destructors | |
| * - - - - - - - - - - - - - - - - - | |
| */ | |
| /* | |
| * new_accepted_tcp_connection() | |
| * etch_tcp_connection constructor for use with an accepted socket | |
| * receive thread exit must delete this object | |
| */ | |
| etch_tcp_connection* new_accepted_tcp_connection | |
| (char* host, const int port, etch_rawsocket* accepted_socket) | |
| { | |
| int result = 0; | |
| etch_tcp_connection* newcon = NULL; | |
| if (!host || !accepted_socket) return NULL; | |
| newcon = (etch_tcp_connection*) new_object (sizeof(etch_tcp_connection), | |
| ETCHTYPEB_CONNECTION, CLASSID_TCP_CONNECTION); | |
| newcon->destroy = destroy_etch_tcp_connection; | |
| /* 1/1/09 need to verify this is correct - unit test failed since it expected | |
| * tcpconx.session to be non-null - while new_tcp_connection() initialized | |
| * these interfaces, the new_accepted_connection() did not - watch this spot | |
| * in case it turns out that the transport and session interfaces now plugged | |
| * in here are incorrect. fyi unit test now passes but must re-vet runtime. | |
| */ | |
| if (0 != init_etch_tcpconx_interfaces (newcon)) return NULL; /* 1/1/09 */ | |
| etch_init_connection (&newcon->cx, accepted_socket, newcon); | |
| newcon->cx.socket = accepted_socket; | |
| newcon->cx.hostname = etch_malloc(strlen(host)+1, ETCHTYPEB_BYTES); | |
| strcpy(newcon->cx.hostname, host); | |
| newcon->cx.port = port; | |
| newcon->is_nodelay = ETCH_CONNECTION_DEFNODELAY; | |
| newcon->linger = ETCH_CONNECTION_DEFLINGERTIME; | |
| newcon->cx.on_event = etch_tcpconx_on_event; /* connection state handler */ | |
| #ifndef IS_ETCH_NO_SESSIONDATA /* dead-end data for testing */ | |
| newcon->cx.on_data = etch_tcpsvr_on_data; /* received data handler */ | |
| #endif | |
| if (0 == result) | |
| newcon->cx.on_event (newcon, ETCH_CONXEVT_CREATED, (int) (size_t) accepted_socket, 0); | |
| else | |
| { newcon->cx.on_event (newcon, ETCH_CONXEVT_CREATERR, 0, 0); | |
| destroy_etch_tcp_connection(newcon); | |
| newcon = NULL; | |
| } | |
| return newcon; | |
| } | |
| /** | |
| * new_tcp_server() | |
| * etch_tcp_server constructor | |
| */ | |
| etch_tcp_server* new_tcp_server (etch_url* url, etch_threadpool* mp, etch_threadpool* sp, | |
| etch_resources* resxmap, i_sessionlistener* insession) | |
| { | |
| etch_tcp_server* svr; | |
| if (NULL == url) return NULL; | |
| svr = (etch_tcp_server*) new_object | |
| (sizeof(etch_tcp_server), ETCHTYPEB_TCPSERVER, CLASSID_TCP_LISTENER); | |
| svr->listener_id = next_etch_tcpserver_id(); | |
| etchlog (ETCHTCPS, ETCHLOG_DEBUG, "creating tcp server %d ...\n", svr->listener_id); | |
| svr->destroy = destroy_etch_tcp_server; | |
| svr->clone = clone_null; | |
| svr->on_event = etch_deftcplistener_on_event; | |
| svr->on_data = etch_defconx_on_data; | |
| svr->resxmap = resxmap; /* not owned, can be null */ | |
| svr->threadpool = mp; /* currently always passed in */ | |
| svr->subpool = sp; /* currently always passed in */ | |
| svr->url = url; | |
| /* - - - - - - - - - - - - - - - | |
| * transport (i_transport) | |
| * - - - - - - - - - - - - - - - | |
| */ | |
| svr->itransport = new_transport_interface(svr, | |
| (etch_transport_control) etch_tcpsvr_transport_control, | |
| (etch_transport_notify) etch_tcpsvr_transport_notify, | |
| (etch_transport_query) etch_tcpsvr_transport_query); | |
| svr->transport_control = etch_tcpsvr_transport_control; | |
| svr->transport_notify = etch_tcpsvr_transport_notify; | |
| svr->transport_query = etch_tcpsvr_transport_query; | |
| svr->get_session = etch_tcpsvr_get_session; | |
| svr->set_session = etch_tcpsvr_set_session; | |
| /* - - - - - - - - - - - - - - - | |
| * session (i_sessionlistener) | |
| * - - - - - - - - - - - - - - - | |
| */ | |
| if (insession) | |
| { | |
| svr->set_session(svr, insession); | |
| svr->is_session_owned = FALSE; | |
| } | |
| else | |
| { i_session* isession = new_session_interface(svr, | |
| etch_tcpsvr_stub_session_control, | |
| etch_tcpsvr_stub_session_notify, | |
| etch_tcpsvr_stub_session_query); | |
| i_sessionlistener* newsession = new_sessionlistener_interface(svr, | |
| etch_tcpsvr_stub_on_session_accepted, isession); | |
| svr->set_session(svr, newsession); | |
| svr->is_session_owned = TRUE; | |
| } | |
| /* - - - - - - - - - - - - - - - | |
| * etch_tcp_server continued | |
| * - - - - - - - - - - - - - - - | |
| */ | |
| /* create the listener's tcp connection */ | |
| svr->cxlisten = new_tcp_connection (url, NULL, NULL); | |
| if (svr->cxlisten) | |
| { | |
| svr->cxlisten->cx.listener = (etch_object*) svr; | |
| svr->on_event(svr, 0, ETCH_CONXEVT_CREATED, 0, 0); | |
| } | |
| else | |
| { svr->on_event(svr, 0, ETCH_CONXEVT_CREATERR, 0, 0); | |
| destroy_etch_tcp_server(svr); | |
| svr = NULL; | |
| } | |
| if (svr) | |
| etchlog (ETCHTCPS, ETCHLOG_DEBUG, "tcp server %d created\n", svr->listener_id); | |
| else | |
| etchlog (ETCHTCPS, ETCHLOG_ERROR, "could not create tcp server\n"); | |
| return svr; | |
| } | |
| /** | |
| * etch_tcpserver_listenerproc | |
| * tcp receive thread procedure for accepted session connections. | |
| * this is the client session lifetime pump. it runs in its own thread, which | |
| * exists until the peer socket is closed, the session requests closure of the | |
| * session or server, or a socket error is detected. | |
| */ | |
| void etch_tcpserver_listenerproc (etch_threadparams* params) | |
| { | |
| char* buf = NULL; | |
| int result = 0, arc = 0; | |
| etch_flexbuffer* fbuf = NULL; | |
| const int thread_id = params->etch_thread_id; | |
| int is_shutdown_request = FALSE; | |
| int is_other_end_closed = FALSE, is_this_end_closed = FALSE; | |
| etch_tcp_connection* accx = (etch_tcp_connection*) params->data; | |
| etch_connection* cx = &accx->cx; | |
| etch_tcp_server* svr =(etch_tcp_server*) cx->listener; | |
| const int session_id = cx->conxid; | |
| const int blen = cx->bufsize? cx->bufsize: ETCH_CONX_DEFAULT_BUFSIZE; | |
| ETCH_ASSERT(is_etch_tcpconnection(accx)); | |
| cx->on_event = etch_tcpconx_on_event; | |
| svr->on_event(svr, accx, ETCH_CONXEVT_RCVPUMP_START, 0, 0); | |
| fbuf = new_flexbuffer(blen); | |
| buf = etch_flexbuf_get_buffer(fbuf); | |
| /* we are using blocking sockets. fyi if we switch to non-blocking mode | |
| * in the future, the APR_STATUS_IS_EAGAIN(rc) macro is the test we want | |
| * to make on the read or write result to determine if the socket was not | |
| * ready to read or write - that macro tests multiple apr result codes. | |
| */ | |
| /* receive pump -- blocking read */ | |
| while (svr->state == ETCH_TCPSERVER_STATE_STARTED && cx->is_started) | |
| { | |
| memset(buf, 0, blen); /* nice for debugging but otherwise unnecessary */ | |
| svr->on_event(svr, accx, ETCH_CONXEVT_RCVPUMP_RECEIVING, thread_id, 0); | |
| /* receive data from tcp socket into buffer owned by flexbuffer. | |
| * note that if this receive were to stop blocking, for example if the | |
| * peer went down without it being detected here, we would see serious | |
| * slowdown begin here due to unfettered looping of the listener proc. | |
| */ | |
| result = etch_tcpclient_receive (accx, buf, blen, &arc); /* block */ | |
| /* if result is >= 0, it is the number of bytes received */ | |
| if (svr->state != ETCH_TCPSERVER_STATE_STARTED || !cx->is_started) | |
| break; /* if server stopped, exit */ | |
| if (APR_OTHER_END_CLOSED == arc || ETCH_OTHER_END_CLOSED == result) | |
| { is_other_end_closed = TRUE; /* if peer down, exit */ | |
| break; | |
| } | |
| if (APR_THIS_END_CLOSED == arc) | |
| { is_this_end_closed = TRUE; /* if this end down, exit */ | |
| break; | |
| } | |
| if (ETCH_SHUTDOWN_NOTIFIED == result) | |
| { is_shutdown_request = TRUE; /* client sent server shutdown request */ | |
| svr->is_started = FALSE; /* (as opposed to client stop request) */ | |
| break; | |
| } | |
| if (result < 0) /* if socket error, exit */ | |
| { svr->on_event(svr, svr->cxlisten, ETCH_CONXEVT_RCVPUMP_ERR, arc, 0); | |
| break; | |
| } | |
| etch_flexbuf_set_length(fbuf, result); | |
| etch_flexbuf_set_index (fbuf, 0); | |
| if (result > 0) /* forward (result) bytes to data handler for packetization */ | |
| result = cx->on_data (cx, 0, result, fbuf); | |
| } | |
| /* at this point the client receive loop has exited, most likely due to | |
| * socket closed on either end, but could also be due to a socket error. | |
| */ | |
| svr->connections--; /* todo lose this count in favor of session arraylist */ | |
| if (is_this_end_closed || is_other_end_closed || is_shutdown_request) result = 0; | |
| svr->on_event(svr, accx, ETCH_CONXEVT_RCVPUMP_STOP, result, (void*)(size_t)thread_id); | |
| if (is_shutdown_request) | |
| { /* if this receive caught a server shutdown request from a client, do not destroy | |
| * this session, but rather message back to the main listener to shut itself down. | |
| * the main thread is blocking on the main listener thread, and will iterate and | |
| * destroy all the server's sessions once the listener thread unblocks. | |
| */ | |
| i_sessionlistener* mainlistener = svr->session; | |
| const int resultx = mainlistener->transport_control (mainlistener->thisx, | |
| new_etch_event(CLASSID_CONTROL_STOP_WAITDOWN, ETCH_INFWAIT), NULL); | |
| /* we will not return here until after main listener has been destroyed */ | |
| result = resultx; /* this extra local is simply a breakpoint target */ | |
| } | |
| else | |
| { /* this code is executed when a session is terminates normally, either due to | |
| * peer connection closing, or client stop request. this code is not executed | |
| * if the client is forced down from the main thread. | |
| */ | |
| etchlog(ETCHTCPS, ETCHLOG_DEBUG, "session %d begin teardown ...\n", session_id); | |
| ETCHOBJ_DESTROY(cx->session); /* tear down client session not including accx */ | |
| ETCHOBJ_DESTROY(accx); /* destroy client connection object */ | |
| etchlog(ETCHTCPS, ETCHLOG_DEBUG, "session %d destroyed\n", session_id); | |
| // etchlog(ETCHTCPS, ETCHLOG_DEBUG, "server %d resumes listening ...\n", svr->listener_id); | |
| } | |
| fbuf->destroy(fbuf); /* we now exit the client session receive thread */ | |
| etchlog(ETCHTCPS, ETCHLOG_INFO, "client session %d ends\n", session_id); | |
| etchlog(ETCHTCPS, ETCHLOG_DEBUG, "session %d thread exit ...\n", session_id); | |
| } | |
| /* | |
| * destroy_etch_tcp_server() | |
| * etch_tcp_server destructor | |
| */ | |
| int destroy_etch_tcp_server (etch_tcp_server* svr) | |
| { | |
| int result = 0, serverid = 0; | |
| if (NULL == svr) return -1; | |
| svr->on_event(svr, 0, ETCH_CONXEVT_DESTRUCTOR, svr->refcount, 0); | |
| if (svr->refcount > 0 && --svr->refcount > 0) return -1; | |
| serverid = svr->listener_id; | |
| svr->on_event(svr, 0, ETCH_CONXEVT_DESTROYING, 0, 0); | |
| etch_tcpsvr_close(svr); /* close if open */ | |
| if (!is_etchobj_static_content(svr)) | |
| destroy_etch_tcp_connection(svr->cxlisten); | |
| if (svr->session && svr->is_session_owned) | |
| ((objmask*)svr->session)->destroy (svr->session); | |
| if (svr->itransport) | |
| etch_free(svr->itransport); | |
| if (svr->threadpool && svr->is_threadpool_owned) | |
| svr->threadpool->destroy(svr->threadpool); | |
| svr->on_event(svr, 0, ETCH_CONXEVT_DESTROYED, 0, 0); | |
| result = destroy_objectex((objmask*)svr); | |
| return result; | |
| } | |
| /* - - - - - - - - - - - - - - - - - | |
| * tcp server methods | |
| * - - - - - - - - - - - - - - - - - | |
| */ | |
| /** | |
| * etch_tcpsvr_open() | |
| * open the server accept listener socket. | |
| */ | |
| int etch_tcpsvr_open (etch_tcp_server *svr, const int is_reconnect) | |
| { | |
| int result = -1, is_new_socket = 0, arc = 0, attempt = 0; | |
| etch_tcp_connection* tcpx = NULL; | |
| etch_connection* cx = NULL; | |
| ETCH_ASSERT(is_etch_tcpserver(svr)); | |
| tcpx = svr->cxlisten; | |
| cx = &tcpx->cx; | |
| if (svr->state != ETCH_TCPSERVER_STATE_CLOSED) return 0; | |
| svr->on_event(svr, tcpx, ETCH_CONXEVT_OPENING, 0, 0); | |
| is_new_socket = cx->socket == NULL; | |
| if (is_reconnect) | |
| if (!cx->socket || !cx->hostname || !*cx->hostname || !cx->delay) | |
| return svr->on_event(svr, tcpx, ETCH_CONXEVT_OPENERR, 0, 0); | |
| do | |
| { if (0 != (arc = apr_sockaddr_info_get(&cx->sockdata, cx->hostname, | |
| ETCH_DEFAULT_SOCKET_FAMILY, cx->port, 0, cx->aprpool))) | |
| { svr->on_event(svr, tcpx, ETCH_CONXEVT_OPENERR, 4, (void*)(size_t)arc); | |
| break; | |
| } | |
| if (is_new_socket) /* not reconnecting, create a socket */ | |
| { | |
| if (0 != (arc = new_tcpsocket (&cx->socket,cx->aprpool))) | |
| { svr->on_event(svr, tcpx, ETCH_CONXEVT_OPENERR, 3, (void*)(size_t)arc); | |
| break; | |
| } | |
| /* set socket options here: NONBLOCK, TIMEOUT */ | |
| } | |
| while(attempt++ < ETCH_CONNECTION_DEFRETRYATTEMPTS+1) | |
| { | |
| if (is_reconnect || attempt > 0) | |
| etch_sleep(cx->delay); | |
| if (0 != (arc = apr_socket_bind(cx->socket, cx->sockdata))) | |
| { svr->on_event(svr, tcpx, ETCH_CONXEVT_OPENERR, 5, (void*)(size_t)arc); | |
| continue; | |
| } | |
| if (0 == (arc = apr_socket_listen(cx->socket, svr->backlog))) | |
| { cx->is_started = TRUE; | |
| svr->on_event(svr, tcpx, ETCH_CONXEVT_LISTENED, cx->conxid, 0); | |
| break; | |
| } | |
| svr->on_event(svr, tcpx, ETCH_CONXEVT_OPENERR, 6, (void*)(size_t)arc); | |
| } | |
| } while(0); | |
| if (cx->is_started) | |
| { result = 0; /* stopped means no longer closed but not yet started */ | |
| svr->state = ETCH_TCPSERVER_STATE_STOPPED; | |
| } | |
| else | |
| if (is_new_socket) | |
| cx->socket = NULL; | |
| svr->on_event(svr, tcpx, | |
| result? ETCH_CONXEVT_OPENERR: ETCH_CONXEVT_OPENED, 0, 0); | |
| return result; | |
| } | |
| /* | |
| * etch_tcpsvr_close() | |
| * close server socket | |
| */ | |
| int etch_tcpsvr_close (etch_tcp_server* lxr) | |
| { | |
| int result = 0; | |
| etch_connection* cx = NULL; | |
| ETCH_ASSERT(is_etch_tcpserver(lxr)); | |
| if (lxr->state < ETCH_TCPSERVER_STATE_STOPPED) | |
| return -1; | |
| if (lxr->state > ETCH_TCPSERVER_STATE_STOPPED) | |
| result = etch_tcpsvr_stop(lxr); /* SVR BREAK 007 */ | |
| if (lxr->state != ETCH_TCPSERVER_STATE_STOPPED) | |
| return -1; | |
| cx = &lxr->cxlisten->cx; | |
| lxr->state = ETCH_TCPSERVER_STATE_CLOSING; | |
| lxr->on_event(lxr, 0, ETCH_CONXEVT_CLOSING, 0, 0); | |
| result = etch_tcpconx_close(lxr->cxlisten, 0); /* close listen socket */ | |
| lxr->state = ETCH_TCPSERVER_STATE_CLOSED; | |
| lxr->on_event(lxr, 0, result? ETCH_CONXEVT_CLOSERR: ETCH_CONXEVT_CLOSED, 0, 0); | |
| if (cx->waiter) | |
| { /* if another thread is blocking on this condition variable, we set | |
| * the condition to DOWN, which is presumably what the other thread | |
| * will unblock on. | |
| */ | |
| ETCH_ASSERT(is_etch_wait(cx->waiter)); | |
| cx->waiter->set(cx->waiter, ETCH_CONXEVT_DOWN); | |
| } | |
| return result; | |
| } | |
| /** | |
| * etch_tcpsvr_acceptproc() | |
| * accept pump | |
| */ | |
| int etch_tcpsvr_acceptproc (etch_threadparams* params) | |
| { | |
| int result = 0, arc = 0; | |
| etch_tcp_server* svr = (etch_tcp_server*) params->data; | |
| const int thread_id = params->etch_thread_id; | |
| etch_tcp_connection* newx = 0; | |
| etch_tcp_connection* tcpx = svr->cxlisten; | |
| etch_connection* cx = &tcpx->cx; | |
| etch_rawsocket* listensock = cx->socket; | |
| etch_rawsocket* newsock = NULL; | |
| apr_pool_t* newsubpool = NULL; | |
| etch_thread* newthread = NULL; | |
| ETCH_ASSERT(is_etch_tcpserver(svr)); | |
| /* each accepted connection gets its own apr subpool, which is freed when | |
| * the connection is destroyed. this accounts for apr memory specific to | |
| * the connection which is not explicitly freed, such as apr_socket_t */ | |
| apr_pool_create(&newsubpool, etch_apr_mempool); | |
| while (svr->is_started) | |
| { | |
| svr->on_event(svr, tcpx, ETCH_CONXEVT_ACCEPTING, 0, 0); | |
| /* BLOCK here for a client connection request */ | |
| if (0 != (arc = apr_socket_accept (&newsock, listensock, newsubpool))) | |
| { | |
| svr->on_event(svr, tcpx, ETCH_CONXEVT_ACCEPTERR, 0,(void*)(size_t)arc); | |
| if (svr->is_started) result = -1; /* if server shutdown, no error */ | |
| /* todo catch other conditions in which nonzero return not error */ | |
| break; | |
| } | |
| newsubpool = NULL; | |
| if (!svr->is_started) break; /* server shutdown */ | |
| etchlog (ETCHTCPS, ETCHLOG_DEBUG, "connect request for socket %x\n", | |
| (size_t) newsock); | |
| /* create the new accepted connection object. note that this object | |
| * will be freed when its listener thread exits. SVR BREAK 004 | |
| */ | |
| newx = new_accepted_tcp_connection (cx->hostname, cx->port, newsock); | |
| newx->cx.listener = (etch_object*) svr; | |
| newx->cx.is_started = TRUE; | |
| svr->on_event (svr, newx, ETCH_CONXEVT_ACCEPTED, 0, 0); | |
| /* 1/4/09 permit socket data handler to be overridden, e.g. by a unit test */ | |
| if (svr->on_data && svr->on_data != etch_defconx_on_data) | |
| newx->cx.on_data = svr->on_data; | |
| /* TODO use the sessionlistener interface to call back to the accepted handler. | |
| * we temporarily plugged it in directly to the tcp server object in new_etch_listener(). | |
| */ | |
| if (svr->on_session_accepted) /* e.g. transport.tcpxfact_session_accepted */ | |
| svr->on_session_accepted (svr->session, newx); /* SVR BREAK 005 */ | |
| /* on return from on_session_accepted, the connection cx.session is a reference | |
| * to the etch_session* client session data object. the thread we start below | |
| * will call the destructor on this object when it exits, in order to destroy | |
| * the client session objects such as delivery service, stub, etc. | |
| */ | |
| /* run a read listener for the client connection, on a thread from the client pool, | |
| * via etch_threadpool_run_freethread, etch_thread_start, etch_tcpserver_listenerproc. | |
| * SVR BREAK 006 | |
| */ | |
| if (NULL == (newthread = svr->threadpool->run (svr->subpool, | |
| etch_tcpserver_listenerproc, newx))) | |
| { | |
| svr->on_event (svr, newx, ETCH_CONXEVT_STARTERR, 1, 0); | |
| newx->destroy(newx); newx = NULL; /* todo newx should cx.session.destroy() */ | |
| result = -1; | |
| break; | |
| } | |
| /* the new client session is now executing on its own thread. | |
| * loop back to continue listening for client connection requests. | |
| */ | |
| svr->connections++; | |
| newx->cx.thread = newthread; | |
| /* transport can now access thread via serverfactory.connection */ | |
| apr_pool_create (&newsubpool, etch_apr_mempool); | |
| } | |
| if (newsubpool) | |
| apr_pool_destroy(newsubpool); | |
| if (0 == result) | |
| svr->on_event(svr, tcpx, ETCH_CONXEVT_ACCEPTPUMPEXIT, thread_id, 0); | |
| else svr->on_event(svr, tcpx, ETCH_CONXEVT_ACCEPTPUMPEXITERR, 0, 0); | |
| return result; | |
| } | |
| /** | |
| * etch_tcpsvr_on_data() | |
| * tcp socket received data handler. | |
| * @param cx the connection object. | |
| * @param uu parameter currently unused. | |
| * @param length number of bytes in the supplied data buffer. | |
| * @param data the data as received via the socket wrapped in a flexbuffer. | |
| * caller retains this memory. | |
| */ | |
| int etch_tcpsvr_on_data (etch_connection* cx, const int uu, int length, etch_flexbuffer* data) | |
| { | |
| int result = 0; | |
| i_sessiondata* session = NULL; | |
| etch_tcp_connection* tcpx = cx? (etch_tcp_connection*) cx->owner: NULL; | |
| ETCH_ASSERT(is_etch_tcpconnection(tcpx)); | |
| ETCH_ASSERT(is_etch_flexbuffer(data)); | |
| ETCH_ASSERT(is_etch_sessiondata(tcpx->session)); | |
| session = tcpx->session; | |
| /* send the data up the chain to be packetized. note that tcpx->session->thisx | |
| * is the owner of the i_sessiondata* session, which is the next higher layer | |
| * of the transport stack, which is ordinarily the packetizer. | |
| */ | |
| if (-1 == (result = session->session_data (session->thisx, NULL, data))) | |
| etchlog (ETCHCONX, ETCHLOG_ERROR, "%d bytes via connxn %d discarded\n", | |
| length, cx->conxid); | |
| return result; | |
| } | |
| /** | |
| * etch_tcpsvr_start() | |
| * start accepting connections | |
| */ | |
| int etch_tcpsvr_start (etch_tcp_server* tcpsvr) | |
| { | |
| int result = 0; | |
| etch_tcp_connection* tcpx = tcpsvr->cxlisten; | |
| etch_connection* cx = &tcpx->cx; | |
| if (tcpsvr->state != ETCH_TCPSERVER_STATE_STOPPED) | |
| return tcpsvr->on_event(tcpsvr, tcpx, ETCH_CONXEVT_STARTERR, 0, 0); | |
| /* the threadpool acts as the server's thread manager. it creates threads | |
| * on request and destroys them at thread exit. the main pool is always | |
| * present here in the runtime, but could be null for unit tests. */ | |
| if (tcpsvr->threadpool == NULL) /* currently only null for unit tests */ | |
| { tcpsvr->threadpool = new_threadpool (ETCH_THREADPOOLTYPE_FREE, 0); | |
| tcpsvr->is_threadpool_owned = TRUE; | |
| } | |
| /* data passed to threads will be either this object, or accepted connection | |
| * objects. configure thread manager to not free this data at thread exit. */ | |
| tcpsvr->threadpool->is_free_data = FALSE; | |
| tcpsvr->threadpool->is_data_etchobject = TRUE; | |
| tcpsvr->state = ETCH_TCPSERVER_STATE_STARTED; | |
| tcpsvr->is_started = TRUE; | |
| tcpsvr->on_event(tcpsvr, tcpx, ETCH_CONXEVT_STARTING, 0, 0); | |
| /* start the accept thread on the main thread manager. SVR BREAK 003 */ | |
| if (NULL == tcpsvr->threadpool->run (tcpsvr->threadpool, etch_tcpsvr_acceptproc, tcpsvr)) | |
| { | |
| tcpsvr->on_event (tcpsvr, tcpx, ETCH_CONXEVT_STARTERR, 1, 0); | |
| result = -1; | |
| } | |
| tcpsvr->on_event(tcpsvr, tcpx, result? ETCH_CONXEVT_STARTERR: ETCH_CONXEVT_STARTED, 0, 0); | |
| if (cx->waiter) | |
| { /* if another thread is blocking on this condition variable, we set the wait | |
| * variable to UP, which is presumably what the waiting thread will unblock on. */ | |
| ETCH_ASSERT(is_etch_wait(cx->waiter)); | |
| cx->waiter->set(cx->waiter, ETCH_CONXEVT_UP); | |
| } | |
| return result; | |
| } | |
| /** | |
| * etch_tcpsvr_stop() | |
| * stop accepting connections and shut down the accept listener. | |
| */ | |
| int etch_tcpsvr_stop (etch_tcp_server* server) | |
| { | |
| int result = 0; | |
| apr_socket_t* bogus = 0; | |
| const int ACCEPTWAITMS = 100; | |
| etch_tcp_connection* tcpx = NULL; | |
| ETCH_ASSERT(is_etch_tcpserver(server)); | |
| tcpx = server? server->cxlisten: NULL; | |
| if (!tcpx) return -1; | |
| if (server->state < ETCH_TCPSERVER_STATE_STARTED) | |
| return server->on_event(server, tcpx, ETCH_CONXEVT_STOPERR, 0, 0); | |
| server->is_started = FALSE; /* pump threads conditional */ | |
| server->state = ETCH_TCPSERVER_STATE_STOPPING; | |
| server->on_event(server, tcpx, ETCH_CONXEVT_STOPPING, 0, 0); | |
| /* unblock the accept thread so it will recognize that it should exit */ | |
| result = etch_tcpsvr_dummy_connection (tcpx); | |
| etch_sleep(ACCEPTWAITMS); /* pause here avoids accept thread hang */ | |
| /* BLOCK here until all threads exited */ | |
| result = threadpool_waitfor_all (server->threadpool, TRUE); | |
| server->state = ETCH_TCPSERVER_STATE_STOPPED; | |
| server->on_event(server, tcpx, ETCH_CONXEVT_STOPPED, 0, 0); | |
| return result; | |
| } | |
| /** | |
| * next_etch_tcpserver_id() | |
| * return a unique ID used to identify a server instance | |
| */ | |
| unsigned next_etch_tcpserver_id() | |
| { | |
| do { apr_atomic_inc32 ((volatile apr_uint32_t*) &tcpserver_id_farm); | |
| } while(tcpserver_id_farm == 0); | |
| return tcpserver_id_farm; | |
| } | |
| /** | |
| * etch_tcpsvr_dummy_connection() | |
| * create a dummy client connection so as to unblock the accept thread, | |
| * in order that it can then recognize that it should exit, and do so. | |
| * note that an invalid server address such as 0.0.0.0 will cause the connection | |
| * attempt to fail, whereas it will not have caused the accept to fail. in such | |
| * a case, the accept thread is left hanging, and a subsequent crash on service | |
| * exit is likely. TODO both ends should validate IP addresses. etch_url does not. | |
| */ | |
| int etch_tcpsvr_dummy_connection (etch_tcp_connection* tcpx) | |
| { | |
| apr_socket_t* dummy = 0; | |
| int arc = 0, attempts = 0; | |
| etch_connection* cx = &tcpx->cx; | |
| const int MAXATTEMPTS = 8, DELAY_BETWEEN_ATTEMPTS_MS = 100; | |
| if (0 != (arc = new_tcpsocket(&dummy, cx->aprpool))) | |
| cx->on_event (tcpx, ETCH_CONXEVT_OPENERR, 3, (void*)(size_t)arc); | |
| else while (1) | |
| { | |
| if (0 == (arc = apr_socket_connect (dummy, cx->sockdata))) | |
| break; | |
| if (++attempts > MAXATTEMPTS) | |
| { cx->on_event (tcpx, ETCH_CONXEVT_OPENERR, 2, (void*)(size_t)arc); | |
| break; | |
| } | |
| else etch_sleep(DELAY_BETWEEN_ATTEMPTS_MS); | |
| } | |
| return arc? -1: 0; | |
| } | |
| /* - - - - - - - - - - - - - - - - - | |
| * tcp server :: i_sessionlistener | |
| * - - - - - - - - - - - - - - - - - | |
| */ | |
| /* | |
| * pointers to these functions are copied from the i_sessionlistener implementation | |
| * at set_session() time. these stub implementations are provided as placeholders. | |
| */ | |
| /* | |
| * etch_tcpsvr_stub_on_session_accepted() | |
| * i_sessionlistener::session_accepted default implementation | |
| * @param thisx | |
| * @param socket presumably an etch_socket wrapper. caller relinquishes. | |
| */ | |
| int etch_tcpsvr_stub_on_session_accepted(etch_tcp_server* thisx, void* socket) | |
| { | |
| if (is_etch_socket(socket)) | |
| ((objmask*)socket)->destroy(socket); | |
| return -1; | |
| } | |
/*
 * etch_tcpsvr_stub_session_control()
 * placeholder session_control: ignores all arguments.
 * @return always -1.
 */
int etch_tcpsvr_stub_session_control (void* obj, void* evt, void* v)
{
    (void) obj;
    (void) evt;
    (void) v;
    return -1;
}
/*
 * etch_tcpsvr_stub_session_notify()
 * placeholder session_notify: ignores all arguments.
 * @return always -1.
 */
int etch_tcpsvr_stub_session_notify (void* obj, void* evt)
{
    (void) obj;
    (void) evt;
    return -1;
}
/*
 * etch_tcpsvr_stub_session_query()
 * placeholder session_query: ignores all arguments.
 * @return always NULL.
 */
void* etch_tcpsvr_stub_session_query (void* obj, void* query)
{
    (void) obj;
    (void) query;
    return NULL;
}
| /* - - - - - - - - - - - - - - - | |
| * tcpserver :: i_transport | |
| * - - - - - - - - - - - - - - - | |
| */ | |
| /** | |
| * etch_tcpsvr_get_session | |
| * i_transport::get_session implementation. | |
| * returns the i_session, whose thisx is a etch_session_listener* | |
| */ | |
| i_session* etch_tcpsvr_get_session(etch_tcp_server* thisx) | |
| { | |
| ETCH_ASSERT(is_etch_tcpserver(thisx)); | |
| return thisx->isession; | |
| } | |
| /** | |
| * etch_tcpsvr_set_session() | |
| * i_transport::set_session() override | |
| * @param session an i_sessionlistener*. caller owns this object. | |
| */ | |
| void etch_tcpsvr_set_session(etch_tcp_server* thisx, i_sessionlistener* session) | |
| { | |
| ETCH_ASSERT(is_etch_tcpserver(thisx)); | |
| ETCH_ASSERT(is_etch_sessionlxr(session)); | |
| thisx->is_session_owned = FALSE; /* internal caller will re-set */ | |
| if (thisx->session) /* replacing? */ | |
| { ETCH_ASSERT(is_etch_sessionlxr(thisx->session)); | |
| thisx->session->destroy(thisx->session); | |
| } | |
| thisx->session = session; | |
| thisx->isession = session->isession; | |
| thisx->session_control = session->session_control; | |
| thisx->session_notify = session->session_notify; | |
| thisx->session_query = session->session_query; | |
| thisx->on_session_accepted = session->session_accepted; | |
| } | |
| /** | |
| * etch_tcpsvr_transport_control() | |
| * i_transport::transport_control override. | |
| * @param control event, sender relinquishes. | |
| * @param value control value, sender relinquishes. | |
| */ | |
| int etch_tcpsvr_transport_control (etch_tcp_server* thisx, etch_event* control, objmask* value) | |
| { | |
| int result = 0; | |
| etch_connection* cx = NULL; | |
| const int timeoutms = value? ((etch_int32*) value)->value: 0; | |
| const int objclass = control? control->class_id: 0; | |
| ETCH_ASSERT(is_etch_tcpserver(thisx)); | |
| cx = &thisx->cxlisten->cx; | |
| switch(objclass) | |
| { | |
| case CLASSID_CONTROL_START: | |
| if (0 == (result = etch_tcpsvr_open(thisx, ETCH_CONX_NOT_RECONNECTING))) | |
| result = etch_tcpsvr_start(thisx); | |
| break; | |
| case CLASSID_CONTROL_START_WAITUP: | |
| /* point to the condition variable on the waiter. this is a semikludge; | |
| * however we need to have a target for the up state before we do the | |
| * waitup, since the connect will complete before we get around to waitup, | |
| * and it needs to be able to mark state as up. previously the state | |
| * variable cond_var was not set until the wait_up was invoked. in the | |
| * current design the cond_var is nulled out after a wait, in order to | |
| * reset wait state to not waiting, so we need to ensure it is populated | |
| * in advance of any need to set a wait condition to some state prior to | |
| * actually waiting. | |
| */ | |
| etchconx_init_waitstate(cx); | |
| /* open the connection, and wait for completion. caller blocks here. | |
| * timeout is indicated via result code 1 = ETCH_TIMEOUT. SVR BREAK 002 | |
| */ | |
| if (0 == (result = etch_tcpsvr_open(thisx, ETCH_CONX_NOT_RECONNECTING))) | |
| if (0 == (result = etch_tcpsvr_start(thisx))) | |
| result = etchconx_wait_up(cx, timeoutms); | |
| break; | |
| case CLASSID_CONTROL_STOP: | |
| result = etch_tcpsvr_close(thisx); | |
| break; | |
| case CLASSID_CONTROL_STOP_WAITDOWN: | |
| /* note all comments above at case CLASSID_CONTROL_START_WAITUP */ | |
| etchconx_init_waitstate(cx); | |
| if (0 == (result = etch_tcpsvr_close(thisx))) | |
| result = etchconx_wait_down(cx, timeoutms); | |
| break; | |
| } | |
| if (control) control->destroy(control); | |
| if (value) value->destroy(value); | |
| return result; | |
| } | |
| /** | |
| * etch_tcpsvr_transport_notify() | |
| * i_transport::transport_notify override. | |
| * @param evt, caller relinquishes. | |
| */ | |
| int etch_tcpsvr_transport_notify (etch_tcp_server* thisx, etch_event* evt) | |
| { | |
| ETCH_ASSERT(is_etch_tcpserver(thisx)); | |
| if (evt) evt->destroy(evt); | |
| return 0; /* nothing to do */ | |
| } | |
| /** | |
| * etch_tcpsvr_transport_query() | |
| * i_transport::transport_query override. | |
| * @param query, caller relinquishes. | |
| */ | |
| objmask* etch_tcpsvr_transport_query (etch_tcp_server* thisx, objmask* query) | |
| { | |
| ETCH_ASSERT(is_etch_tcpserver(thisx)); | |
| /* todo is this right? */ | |
| return thisx->itransport->transport_query(thisx->itransport, query); | |
| } | |