| /** @file |
| |
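| Host (receiving) side of log collation: a per-connection state machine that authenticates a collation client and queues the LogBuffers it sends for flushing. |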
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| //------------------------------------------------------------------------- |
| // include files |
| //------------------------------------------------------------------------- |
| |
| #include "ink_config.h" |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <limits.h> |
| #include <string.h> |
| #include <sys/types.h> |
| |
| #include "P_EventSystem.h" |
| #include "P_Net.h" |
| |
| #include "LogUtils.h" |
| #include "LogSock.h" |
| #include "LogField.h" |
| #include "LogFile.h" |
| #include "LogFormat.h" |
| #include "LogBuffer.h" |
| #include "LogHost.h" |
| #include "LogObject.h" |
| #include "LogConfig.h" |
| #include "Log.h" |
| |
| #include "LogCollationHostSM.h" |
| |
| //------------------------------------------------------------------------- |
| // statics |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::ID = 0; |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::LogCollationHostSM |
| //------------------------------------------------------------------------- |
| |
| LogCollationHostSM::LogCollationHostSM(NetVConnection * client_vc): |
| Continuation(new_ProxyMutex()), |
| m_client_vc(client_vc), |
| m_client_vio(NULL), |
| m_client_buffer(NULL), |
| m_client_reader(NULL), |
| m_pending_event(NULL), |
| m_read_buffer(NULL), m_read_bytes_wanted(0), m_read_bytes_received(0), m_client_ip(0), m_client_port(0), m_id(ID++) |
| { |
| |
| Debug("log-coll", "[%d]host::constructor", m_id); |
| |
| ink_assert(m_client_vc != NULL); |
| |
| // get client info |
| m_client_ip = m_client_vc->get_remote_ip(); |
| m_client_port = m_client_vc->get_remote_port(); |
| Note("[log-coll] client connected [%d.%d.%d.%d:%d]", |
| ((unsigned char *) (&m_client_ip))[0], |
| ((unsigned char *) (&m_client_ip))[1], |
| ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port); |
| |
| SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler); |
| host_init(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| //------------------------------------------------------------------------- |
| // |
| // handlers |
| // |
| //------------------------------------------------------------------------- |
| //------------------------------------------------------------------------- |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::host_handler |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::host_handler(int event, void *data) |
| { |
| |
| switch (m_host_state) { |
| case LOG_COLL_HOST_AUTH: |
| return host_auth(event, data); |
| case LOG_COLL_HOST_DONE: |
| return host_done(event, data); |
| case LOG_COLL_HOST_INIT: |
| return host_init(event, data); |
| case LOG_COLL_HOST_RECV: |
| return host_recv(event, data); |
| default: |
| ink_assert(!"unexpected state"); |
| return EVENT_CONT; |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::read_handler |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::read_handler(int event, void *data) |
| { |
| |
| switch (m_read_state) { |
| case LOG_COLL_READ_BODY: |
| return read_body(event, (VIO *) data); |
| case LOG_COLL_READ_HDR: |
| return read_hdr(event, (VIO *) data); |
| default: |
| ink_assert(!"unexpected state"); |
| return EVENT_CONT; |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| //------------------------------------------------------------------------- |
| // |
| // host states |
| // |
| //------------------------------------------------------------------------- |
| //------------------------------------------------------------------------- |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::host_auth |
| // next: host_done || host_recv |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::host_auth(int event, void *data) |
| { |
| NOWARN_UNUSED(data); |
| Debug("log-coll", "[%d]host::host_auth", m_id); |
| |
| switch (event) { |
| |
| case LOG_COLL_EVENT_SWITCH: |
| Debug("log-coll", "[%d]host::host_auth - SWITCH", m_id); |
| m_host_state = LOG_COLL_HOST_AUTH; |
| return read_start(); |
| |
| case LOG_COLL_EVENT_READ_COMPLETE: |
| Debug("log-coll", "[%d]host::host_auth - READ_COMPLETE", m_id); |
| { |
| // compare authorization secrets |
| ink_assert(m_read_buffer != NULL); |
| int diff = strncmp(m_read_buffer, Log::config->collation_secret, |
| m_read_bytes_received); |
| delete[]m_read_buffer; |
| m_read_buffer = 0; |
| if (!diff) { |
| Debug("log-coll", "[%d]host::host_auth - authenticated!", m_id); |
| return host_recv(LOG_COLL_EVENT_SWITCH, NULL); |
| } else { |
| Debug("log-coll", "[%d]host::host_auth - authenticated failed!", m_id); |
| Note("[log-coll] authentication failed [%d.%d.%d.%d:%d]", |
| ((unsigned char *) (&m_client_ip))[0], |
| ((unsigned char *) (&m_client_ip))[1], |
| ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port); |
| return host_done(LOG_COLL_EVENT_SWITCH, NULL); |
| } |
| |
| } |
| |
| case LOG_COLL_EVENT_ERROR: |
| Debug("log-coll", "[%d]host::host_auth - ERROR", m_id); |
| return host_done(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| default: |
| ink_assert(!"unexpected state"); |
| return EVENT_CONT; |
| |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::host_done |
| // next: none |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::host_done(int event, void *data) |
| { |
| NOWARN_UNUSED(event); |
| NOWARN_UNUSED(data); |
| Debug("log-coll", "[%d]host::host_done", m_id); |
| |
| // close connections |
| if (m_client_vc) { |
| Debug("log-coll", "[%d]host::host_done - disconnecting!", m_id); |
| m_client_vc->do_io_close(); |
| m_client_vc = 0; |
| Note("[log-coll] client disconnected [%d.%d.%d.%d:%d]", |
| ((unsigned char *) (&m_client_ip))[0], |
| ((unsigned char *) (&m_client_ip))[1], |
| ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port); |
| } |
| // free memory |
| if (m_client_buffer) { |
| if (m_client_reader) { |
| m_client_buffer->dealloc_reader(m_client_reader); |
| } |
| free_MIOBuffer(m_client_buffer); |
| } |
| // delete this state machine and return |
| delete this; |
| return EVENT_DONE; |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::host_init |
| // next: host_auth || host_done |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::host_init(int event, void *data) |
| { |
| NOWARN_UNUSED(data); |
| Debug("log-coll", "[%d]host::host_init", m_id); |
| |
| switch (event) { |
| |
| case LOG_COLL_EVENT_SWITCH: |
| m_host_state = LOG_COLL_HOST_INIT; |
| m_pending_event = eventProcessor.schedule_imm(this); |
| return EVENT_CONT; |
| |
| case EVENT_IMMEDIATE: |
| // allocate memory |
| m_client_buffer = new_MIOBuffer(); |
| ink_assert(m_client_buffer != NULL); |
| m_client_reader = m_client_buffer->alloc_reader(); |
| ink_assert(m_client_reader != NULL); |
| return host_auth(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| default: |
| ink_assert(!"unexpected state"); |
| return EVENT_DONE; |
| |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::host_recv |
| // next: host_done || host_recv |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::host_recv(int event, void *data) |
| { |
| NOWARN_UNUSED(data); |
| Debug("log-coll", "[%d]host::host_recv", m_id); |
| |
| switch (event) { |
| |
| case LOG_COLL_EVENT_SWITCH: |
| Debug("log-coll", "[%d]host::host_recv - SWITCH", m_id); |
| m_host_state = LOG_COLL_HOST_RECV; |
| return read_start(); |
| |
| case LOG_COLL_EVENT_READ_COMPLETE: |
| Debug("log-coll", "[%d]host::host_recv - READ_COMPLETE", m_id); |
| { |
| // grab the log_buffer |
| LogBufferHeader *log_buffer_header; |
| LogBuffer *log_buffer; |
| LogFormat *log_format; |
| LogObject *log_object; |
| unsigned version; |
| |
| ink_assert(m_read_buffer != NULL); |
| ink_assert(m_read_bytes_received >= (int64_t)sizeof(LogBufferHeader)); |
| log_buffer_header = (LogBufferHeader *) m_read_buffer; |
| |
| // convert the buffer we just received to host order |
| LogBuffer::convert_to_host_order(log_buffer_header); |
| |
| version = log_buffer_header->version; |
| if (version != LOG_SEGMENT_VERSION) { |
| Note("[log-coll] invalid LogBuffer received; invalid version - " |
| "buffer = %u, current = %u", version, LOG_SEGMENT_VERSION); |
| delete[]m_read_buffer; |
| |
| } else { |
| log_object = Log::match_logobject(log_buffer_header); |
| if (!log_object) { |
| Note("[log-coll] LogObject not found with fieldlist id; " "writing LogBuffer to scrap file"); |
| log_object = Log::global_scrap_object; |
| } |
| log_format = log_object->m_format; |
| Debug("log-coll", "[%d]host::host_recv - using format '%s'", m_id, log_format->name()); |
| |
| // make a new LogBuffer (log_buffer_header plus subsequent |
| // buffer already converted to host order) and add it to the |
| // object's flush queue |
| // |
| log_buffer = NEW(new LogBuffer(log_object, log_buffer_header)); |
| log_object->add_to_flush_queue(log_buffer); |
| ink_mutex_acquire(&Log::flush_mutex); |
| Log::flush_counter++; |
| ink_cond_signal(&Log::flush_cond); |
| ink_mutex_release(&Log::flush_mutex); |
| } |
| |
| #if defined(LOG_BUFFER_TRACKING) |
| Debug("log-buftrak", "[%d]host::host_recv - network read complete", log_buffer_header->id); |
| #endif // defined(LOG_BUFFER_TRACKING) |
| |
| // get ready for next read (memory may not be freed!!!) |
| m_read_buffer = 0; |
| |
| return host_recv(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| } |
| |
| case LOG_COLL_EVENT_ERROR: |
| Debug("log-coll", "[%d]host::host_recv - ERROR", m_id); |
| return host_done(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| default: |
| ink_assert(!"unexpected state"); |
| return EVENT_DONE; |
| |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| //------------------------------------------------------------------------- |
| // |
| // read states |
| // |
| //------------------------------------------------------------------------- |
| //------------------------------------------------------------------------- |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::read_start |
| // next: read_hdr |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::read_start() |
| { |
| |
| Debug("log-coll", "[%d]host::read_start", m_id); |
| |
| SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::read_handler); |
| if (m_read_buffer) { |
| ink_assert(!"m_read_buffer still points to something, doh!"); |
| } |
| return read_hdr(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::read_hdr |
| // next: read_body || read_done |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::read_hdr(int event, VIO * vio) |
| { |
| |
| Debug("log-coll", "[%d]host::read_hdr", m_id); |
| |
| switch (event) { |
| |
| case LOG_COLL_EVENT_SWITCH: |
| Debug("log-coll", "[%d]host:read_hdr - SWITCH", m_id); |
| m_read_state = LOG_COLL_READ_HDR; |
| |
| m_read_bytes_wanted = sizeof(NetMsgHeader); |
| m_read_bytes_received = 0; |
| m_read_buffer = (char *) &m_net_msg_header; |
| ink_assert(m_client_vc != NULL); |
| Debug("log-coll", "[%d]host:read_hdr - do_io_read(%d)", m_id, m_read_bytes_wanted); |
| m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer); |
| ink_assert(m_client_vio != NULL); |
| return EVENT_CONT; |
| |
| case VC_EVENT_IMMEDIATE: |
| Debug("log-coll", "[%d]host::read_hdr - IMMEDIATE", m_id); |
| return EVENT_CONT; |
| |
| case VC_EVENT_READ_READY: |
| Debug("log-coll", "[%d]host::read_hdr - READ_READY", m_id); |
| read_partial(vio); |
| return EVENT_CONT; |
| |
| case VC_EVENT_READ_COMPLETE: |
| Debug("log-coll", "[%d]host::read_hdr - READ_COMPLETE", m_id); |
| read_partial(vio); |
| ink_assert(m_read_bytes_wanted == m_read_bytes_received); |
| return read_body(LOG_COLL_EVENT_SWITCH, NULL); |
| |
| case VC_EVENT_EOS: |
| case VC_EVENT_ERROR: |
| Debug("log-coll", "[%d]host::read_hdr - EOS|ERROR", m_id); |
| return read_done(LOG_COLL_EVENT_ERROR, NULL); |
| |
| default: |
| Debug("log-coll", "[%d]host::read_hdr - default %d", m_id, event); |
| return read_done(LOG_COLL_EVENT_ERROR, NULL); |
| |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::read_body |
| // next: read_body || read_done |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::read_body(int event, VIO * vio) |
| { |
| |
| Debug("log-coll", "[%d]host::read_body", m_id); |
| |
| switch (event) { |
| |
| case LOG_COLL_EVENT_SWITCH: |
| Debug("log-coll", "[%d]host:read_body - SWITCH", m_id); |
| m_read_state = LOG_COLL_READ_BODY; |
| |
| m_read_bytes_wanted = ntohl(m_net_msg_header.htonl_size); |
| ink_assert(m_read_bytes_wanted > 0); |
| m_read_bytes_received = 0; |
| m_read_buffer = new char[m_read_bytes_wanted]; |
| ink_assert(m_read_buffer != NULL); |
| ink_assert(m_client_vc != NULL); |
| Debug("log-coll", "[%d]host:read_body - do_io_read(%d)", m_id, m_read_bytes_wanted); |
| m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer); |
| ink_assert(m_client_vio != NULL); |
| return EVENT_CONT; |
| |
| case VC_EVENT_IMMEDIATE: |
| Debug("log-coll", "[%d]host::read_body - IMMEDIATE", m_id); |
| return EVENT_CONT; |
| |
| case VC_EVENT_READ_READY: |
| Debug("log-coll", "[%d]host::read_body - READ_READY", m_id); |
| read_partial(vio); |
| return EVENT_CONT; |
| |
| case VC_EVENT_READ_COMPLETE: |
| Debug("log-coll", "[%d]host::read_body - READ_COMPLETE", m_id); |
| read_partial(vio); |
| ink_assert(m_read_bytes_wanted == m_read_bytes_received); |
| return read_done(LOG_COLL_EVENT_READ_COMPLETE, NULL); |
| |
| case VC_EVENT_EOS: |
| case VC_EVENT_ERROR: |
| Debug("log-coll", "[%d]host::read_body - EOS|ERROR", m_id); |
| return read_done(LOG_COLL_EVENT_ERROR, NULL); |
| |
| default: |
| Debug("log-coll", "[%d]host::read_body - default %d", m_id, event); |
| return read_done(LOG_COLL_EVENT_ERROR, NULL); |
| |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::read_done |
| // next: give control back to host state-machine |
| //------------------------------------------------------------------------- |
| |
| int |
| LogCollationHostSM::read_done(int event, void *data) |
| { |
| NOWARN_UNUSED(data); |
| SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler); |
| return host_handler(event, NULL); |
| |
| } |
| |
| //------------------------------------------------------------------------- |
| // LogCollationHostSM::read_partial |
| //------------------------------------------------------------------------- |
| |
| void |
| LogCollationHostSM::read_partial(VIO * vio) |
| { |
| |
| // checks |
| ink_assert(vio != NULL); |
| ink_assert(vio->vc_server == m_client_vc); |
| ink_assert(m_client_buffer != NULL); |
| ink_assert(m_client_reader != NULL); |
| |
| // careful not to read more than we have memory for |
| char *p = &(m_read_buffer[m_read_bytes_received]); |
| int64_t bytes_wanted_now = m_read_bytes_wanted - m_read_bytes_received; |
| int64_t bytes_received_now = m_client_reader->read(p, bytes_wanted_now); |
| |
| m_read_bytes_received += bytes_received_now; |
| |
| // stats |
| LOG_SUM_DYN_STAT(log_stat_bytes_received_from_network_stat, bytes_received_now); |
| |
| } |