| /** @file |
| |
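| Implementation of the transform processor: the TransformVConnection chain and its TransformTerminus, plus the built-in null and range transforms. |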
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| @section thoughts Transform thoughts |
| |
| - Must be able to handle a chain of transformations. |
| - Any transformation in the chain may fail. |
| Failure options: |
| - abort the client (if transformed data already sent) |
| - serve the client the untransformed document |
| - remove the failing transformation from the chain and attempt the transformation again (difficult to do) |
| - never send untransformed document to client if client would not understand it (e.g. a set top box) |
| - Must be able to change response header fields up until the point that TRANSFORM_READ_READY is sent to the user. |
| |
| @section usage Transform usage |
| |
| -# transformProcessor.open (cont, hooks); - returns "tvc", a TransformVConnection if 'hooks != NULL' |
| -# tvc->do_io_write (cont, nbytes, buffer1); |
| -# cont->handleEvent (TRANSFORM_READ_READY, NULL); |
| -# tvc->do_io_read (cont, nbytes, buffer2); |
| -# tvc->do_io_close (); |
| |
| @section visualization Transform visualization |
| |
| @verbatim |
| +----+ +----+ +----+ +----+ |
| -IB->| T1 |-B1->| T2 |-B2->| T3 |-B3->| T4 |-OB-> |
| +----+ +----+ +----+ +----+ |
| @endverbatim |
| |
| Data flows into the first transform in the form of the buffer |
| passed to TransformVConnection::do_io_write (IB). Data flows |
| out of the last transform in the form of the buffer passed to |
| TransformVConnection::do_io_read (OB). Between each transformation is |
| another buffer (B1, B2 and B3). |
| |
| A transformation is a Continuation. The continuation is called with the |
| event TRANSFORM_IO_WRITE to initialize the write and TRANSFORM_IO_READ |
| to initialize the read. |
| |
| */ |
| |
| #include "ProxyConfig.h" |
| #include "P_Net.h" |
| #include "TransformInternal.h" |
| #include "HdrUtils.h" |
| #include "Log.h" |
| |
| TransformProcessor transformProcessor; |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformProcessor::start() |
| { |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| VConnection * |
| TransformProcessor::open(Continuation *cont, APIHook *hooks) |
| { |
| if (hooks) { |
| return new TransformVConnection(cont, hooks); |
| } else { |
| return nullptr; |
| } |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| INKVConnInternal * |
| TransformProcessor::null_transform(ProxyMutex *mutex) |
| { |
| return new NullTransform(mutex); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| INKVConnInternal * |
| TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp, |
| const char *content_type, int content_type_len, int64_t content_length) |
| { |
| RangeTransform *range_transform = |
| new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length); |
| return range_transform; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| TransformTerminus::TransformTerminus(TransformVConnection *tvc) |
| : VConnection(tvc->mutex), |
| m_tvc(tvc), |
| m_read_vio(), |
| m_write_vio(), |
| m_event_count(0), |
| m_deletable(0), |
| m_closed(0), |
| m_called_user(0) |
| { |
| SET_HANDLER(&TransformTerminus::handle_event); |
| } |
| |
// Used by a handler after a failed try-lock: bump m_event_count to account for
// the event about to be scheduled, schedule this continuation again on the
// current thread in 10 milliseconds, and return 0 from the calling function.
//
// The body is wrapped in do { } while (0) so that `RETRY();` expands to a
// single statement and stays correct even under an unbraced if/else.
#define RETRY()                                               \
  do {                                                        \
    if (ink_atomic_increment((int *)&m_event_count, 1) < 0) { \
      ink_assert(!"not reached");                             \
    }                                                         \
    this_ethread()->schedule_in(this, HRTIME_MSECONDS(10));   \
    return 0;                                                 \
  } while (0)
| |
| int |
| TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */) |
| { |
| int val; |
| |
| m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0)); |
| |
| val = ink_atomic_increment(&m_event_count, -1); |
| |
| Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count); |
| |
| if (val <= 0) { |
| ink_assert(!"not reached"); |
| } |
| |
| m_deletable = m_deletable && (val == 1); |
| |
| if (m_closed != 0 && m_tvc->m_closed != 0) { |
| if (m_deletable) { |
| Debug("transform", "TransformVConnection destroy [0x%lx]", (long)m_tvc); |
| delete m_tvc; |
| return 0; |
| } |
| } else if (m_write_vio.op == VIO::WRITE) { |
| if (m_read_vio.op == VIO::NONE) { |
| if (!m_called_user) { |
| Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]", m_event_count, event, (long)m_tvc, |
| (long)m_tvc->m_cont); |
| |
| m_called_user = 1; |
| // It is our belief this is safe to pass a reference, i.e. its scope |
| // and locking ought to be safe across the lifetime of the continuation. |
| m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes); |
| } |
| } else { |
| int64_t towrite; |
| |
| MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread()); |
| if (!trylock1.is_locked()) { |
| RETRY(); |
| } |
| |
| MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread()); |
| if (!trylock2.is_locked()) { |
| RETRY(); |
| } |
| |
| if (m_closed != 0) { |
| return 0; |
| } |
| |
| if (m_write_vio.op == VIO::NONE) { |
| return 0; |
| } |
| |
| towrite = m_write_vio.ntodo(); |
| if (towrite > 0) { |
| if (towrite > m_write_vio.get_reader()->read_avail()) { |
| towrite = m_write_vio.get_reader()->read_avail(); |
| } |
| if (towrite > m_read_vio.ntodo()) { |
| towrite = m_read_vio.ntodo(); |
| } |
| |
| if (towrite > 0) { |
| m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite); |
| m_read_vio.ndone += towrite; |
| |
| m_write_vio.get_reader()->consume(towrite); |
| m_write_vio.ndone += towrite; |
| } |
| } |
| |
| if (m_write_vio.ntodo() > 0) { |
| if (towrite > 0) { |
| m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio); |
| } |
| } else { |
| m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio); |
| } |
| |
| // We could have closed on the write callback |
| if (m_closed != 0 && m_tvc->m_closed != 0) { |
| return 0; |
| } |
| |
| if (m_read_vio.ntodo() > 0) { |
| if (m_write_vio.ntodo() <= 0) { |
| m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio); |
| } else if (towrite > 0) { |
| m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio); |
| } |
| } else { |
| m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio); |
| } |
| } |
| } else { |
| MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread()); |
| if (!trylock2.is_locked()) { |
| RETRY(); |
| } |
| |
| if (m_closed != 0) { |
| // The terminus was closed, but the enclosing transform |
| // vconnection wasn't. If the terminus was aborted then we |
| // call the read_vio cont back with VC_EVENT_ERROR. If it |
| // was closed normally then we call it back with |
| // VC_EVENT_EOS. If a read operation hasn't been initiated |
| // yet and we haven't called the user back then we call |
| // the user back instead of the read_vio cont (which won't |
| // exist). |
| if (m_tvc->m_closed == 0) { |
| int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS; |
| |
| if (!m_called_user) { |
| m_called_user = 1; |
| m_tvc->m_cont->handleEvent(ev, &m_read_vio); |
| } else { |
| ink_assert(m_read_vio.cont != nullptr); |
| m_read_vio.cont->handleEvent(ev, &m_read_vio); |
| } |
| } |
| |
| return 0; |
| } |
| } |
| |
| return 0; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| VIO * |
| TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) |
| { |
| m_read_vio.buffer.writer_for(buf); |
| m_read_vio.op = VIO::READ; |
| m_read_vio.set_continuation(c); |
| m_read_vio.nbytes = nbytes; |
| m_read_vio.ndone = 0; |
| m_read_vio.vc_server = this; |
| |
| if (ink_atomic_increment(&m_event_count, 1) < 0) { |
| ink_assert(!"not reached"); |
| } |
| Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count); |
| |
| this_ethread()->schedule_imm_local(this); |
| |
| return &m_read_vio; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| VIO * |
| TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner) |
| { |
| // In the process of eliminating 'owner' mode so asserting against it |
| ink_assert(!owner); |
| m_write_vio.buffer.reader_for(buf); |
| m_write_vio.op = VIO::WRITE; |
| m_write_vio.set_continuation(c); |
| m_write_vio.nbytes = nbytes; |
| m_write_vio.ndone = 0; |
| m_write_vio.vc_server = this; |
| |
| if (ink_atomic_increment(&m_event_count, 1) < 0) { |
| ink_assert(!"not reached"); |
| } |
| Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count); |
| |
| this_ethread()->schedule_imm_local(this); |
| |
| return &m_write_vio; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformTerminus::do_io_close(int error) |
| { |
| if (ink_atomic_increment(&m_event_count, 1) < 0) { |
| ink_assert(!"not reached"); |
| } |
| |
| INK_WRITE_MEMORY_BARRIER; |
| |
| if (error != -1) { |
| lerrno = error; |
| m_closed = TS_VC_CLOSE_ABORT; |
| } else { |
| m_closed = TS_VC_CLOSE_NORMAL; |
| } |
| |
| m_read_vio.op = VIO::NONE; |
| m_read_vio.buffer.clear(); |
| |
| m_write_vio.op = VIO::NONE; |
| m_write_vio.buffer.clear(); |
| |
| this_ethread()->schedule_imm_local(this); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto) |
| { |
| if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) { |
| m_read_vio.op = VIO::NONE; |
| m_read_vio.buffer.clear(); |
| } |
| |
| if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) { |
| m_write_vio.op = VIO::NONE; |
| m_write_vio.buffer.clear(); |
| } |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformTerminus::reenable(VIO *vio) |
| { |
| ink_assert((vio == &m_read_vio) || (vio == &m_write_vio)); |
| |
| if (m_event_count == 0) { |
| if (ink_atomic_increment(&m_event_count, 1) < 0) { |
| ink_assert(!"not reached"); |
| } |
| Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count); |
| this_ethread()->schedule_imm_local(this); |
| } else { |
| Debug("transform", "[TransformTerminus::reenable] skipping due to " |
| "pending events"); |
| } |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks) |
| : TransformVCChain(cont->mutex.get()), m_cont(cont), m_terminus(this), m_closed(0) |
| { |
| INKVConnInternal *xform; |
| |
| SET_HANDLER(&TransformVConnection::handle_event); |
| |
| ink_assert(hooks != nullptr); |
| |
| m_transform = hooks->m_cont; |
| while (hooks->m_link.next) { |
| xform = (INKVConnInternal *)hooks->m_cont; |
| hooks = hooks->m_link.next; |
| xform->do_io_transform(hooks->m_cont); |
| } |
| xform = (INKVConnInternal *)hooks->m_cont; |
| xform->do_io_transform(&m_terminus); |
| |
| Debug("transform", "TransformVConnection create [0x%lx]", (long)this); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| TransformVConnection::~TransformVConnection() |
| { |
| // Clear the continuations in terminus VConnections so that |
| // mutex's get released (INKqa05596) |
| m_terminus.m_read_vio.set_continuation(nullptr); |
| m_terminus.m_write_vio.set_continuation(nullptr); |
| m_terminus.mutex = nullptr; |
| this->mutex = nullptr; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| int |
| TransformVConnection::handle_event(int /* event ATS_UNUSED */, void * /* edata ATS_UNUSED */) |
| { |
| ink_assert(!"not reached"); |
| return 0; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| VIO * |
| TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) |
| { |
| Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long)c, (long)this); |
| |
| return m_terminus.do_io_read(c, nbytes, buf); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| VIO * |
| TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool /* owner ATS_UNUSED */) |
| { |
| Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long)c, (long)this); |
| |
| return m_transform->do_io_write(c, nbytes, buf); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformVConnection::do_io_close(int error) |
| { |
| Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long)this); |
| |
| if (m_closed != 0) { |
| return; |
| } |
| |
| if (error != -1) { |
| m_closed = TS_VC_CLOSE_ABORT; |
| } else { |
| m_closed = TS_VC_CLOSE_NORMAL; |
| } |
| |
| m_transform->do_io_close(error); |
| m_transform = nullptr; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto) |
| { |
| ink_assert(howto == IO_SHUTDOWN_WRITE); |
| |
| Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long)this); |
| |
| m_transform->do_io_shutdown(howto); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| TransformVConnection::reenable(VIO * /* vio ATS_UNUSED */) |
| { |
| ink_assert(!"not reached"); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| uint64_t |
| TransformVConnection::backlog(uint64_t limit) |
| { |
| uint64_t b = 0; // backlog |
| VConnection *raw_vc = m_transform; |
| MIOBuffer *w; |
| while (raw_vc && raw_vc != &m_terminus) { |
| INKVConnInternal *vc = static_cast<INKVConnInternal *>(raw_vc); |
| if (nullptr != (w = vc->m_read_vio.buffer.writer())) { |
| b += w->max_read_avail(); |
| } |
| if (b >= limit) { |
| return b; |
| } |
| raw_vc = vc->m_output_vc; |
| } |
| if (nullptr != (w = m_terminus.m_read_vio.buffer.writer())) { |
| b += w->max_read_avail(); |
| } |
| if (b >= limit) { |
| return b; |
| } |
| |
| IOBufferReader *r = m_terminus.m_write_vio.get_reader(); |
| if (r) { |
| b += r->read_avail(); |
| } |
| return b; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| TransformControl::TransformControl() : Continuation(new_ProxyMutex()), m_hooks() |
| { |
| SET_HANDLER(&TransformControl::handle_event); |
| |
| m_hooks.append(transformProcessor.null_transform(new_ProxyMutex())); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| int |
| TransformControl::handle_event(int event, void * /* edata ATS_UNUSED */) |
| { |
| switch (event) { |
| case EVENT_IMMEDIATE: { |
| char *s, *e; |
| |
| ink_assert(m_tvc == nullptr); |
| if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) { |
| m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)); |
| } else { |
| m_tvc = transformProcessor.open(this, m_hooks.head()); |
| } |
| ink_assert(m_tvc != nullptr); |
| |
| m_write_buf = new_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| s = m_write_buf->end(); |
| e = m_write_buf->buf_end(); |
| |
| memset(s, 'a', e - s); |
| m_write_buf->fill(e - s); |
| |
| m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader()); |
| break; |
| } |
| |
| case TRANSFORM_READ_READY: { |
| MIOBuffer *buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| |
| m_read_buf = buf->alloc_reader(); |
| m_tvc->do_io_read(this, INT64_MAX, buf); |
| break; |
| } |
| |
| case VC_EVENT_READ_COMPLETE: |
| case VC_EVENT_EOS: |
| m_tvc->do_io_close(); |
| |
| free_MIOBuffer(m_read_buf->mbuf); |
| m_read_buf = nullptr; |
| |
| free_MIOBuffer(m_write_buf); |
| m_write_buf = nullptr; |
| break; |
| |
| case VC_EVENT_WRITE_COMPLETE: |
| break; |
| |
| default: |
| ink_assert(!"not reached"); |
| break; |
| } |
| |
| return 0; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| NullTransform::NullTransform(ProxyMutex *_mutex) |
| : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(_mutex)), |
| m_output_buf(nullptr), |
| m_output_reader(nullptr), |
| m_output_vio(nullptr) |
| { |
| SET_HANDLER(&NullTransform::handle_event); |
| |
| Debug("transform", "NullTransform create [0x%lx]", (long)this); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| NullTransform::~NullTransform() |
| { |
| if (m_output_buf) { |
| free_MIOBuffer(m_output_buf); |
| } |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| int |
| NullTransform::handle_event(int event, void *edata) |
| { |
| handle_event_count(event); |
| |
| Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count); |
| |
| if (m_closed) { |
| if (m_deletable) { |
| Debug("transform", "NullTransform destroy: %" PRId64 " [%p]", m_output_vio ? m_output_vio->ndone : 0, this); |
| delete this; |
| } |
| } else { |
| switch (event) { |
| case VC_EVENT_ERROR: |
| m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio); |
| break; |
| case VC_EVENT_WRITE_COMPLETE: |
| ink_assert(m_output_vio == (VIO *)edata); |
| |
| // The write to the output vconnection completed. This |
| // could only be the case if the data being fed into us |
| // has also completed. |
| ink_assert(m_write_vio.ntodo() == 0); |
| |
| m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE); |
| break; |
| case VC_EVENT_WRITE_READY: |
| default: { |
| int64_t towrite; |
| int64_t avail; |
| |
| ink_assert(m_output_vc != nullptr); |
| |
| if (!m_output_vio) { |
| m_output_buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| m_output_reader = m_output_buf->alloc_reader(); |
| m_output_vio = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader); |
| } |
| |
| MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread()); |
| if (!trylock.is_locked()) { |
| retry(10); |
| return 0; |
| } |
| |
| if (m_closed) { |
| return 0; |
| } |
| |
| if (m_write_vio.op == VIO::NONE) { |
| m_output_vio->nbytes = m_write_vio.ndone; |
| m_output_vio->reenable(); |
| return 0; |
| } |
| |
| towrite = m_write_vio.ntodo(); |
| if (towrite > 0) { |
| avail = m_write_vio.get_reader()->read_avail(); |
| if (towrite > avail) { |
| towrite = avail; |
| } |
| |
| if (towrite > 0) { |
| Debug("transform", |
| "[NullTransform::handle_event] " |
| "writing %" PRId64 " bytes to output", |
| towrite); |
| m_output_buf->write(m_write_vio.get_reader(), towrite); |
| |
| m_write_vio.get_reader()->consume(towrite); |
| m_write_vio.ndone += towrite; |
| } |
| } |
| |
| if (m_write_vio.ntodo() > 0) { |
| if (towrite > 0) { |
| m_output_vio->reenable(); |
| m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio); |
| } |
| } else { |
| m_output_vio->nbytes = m_write_vio.ndone; |
| m_output_vio->reenable(); |
| m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| return 0; |
| } |
| |
| /*------------------------------------------------------------------------- |
| Reasons the JG transform cannot currently be a plugin: |
| a) Uses the config system |
| - Easily avoided by using the plugin.config file to pass the config |
| values as parameters to the plugin initialization routine. |
| b) Uses the stat system |
| - FIXME: should probably solve this. |
| -------------------------------------------------------------------------*/ |
| |
| /* the JG transform is now a plugin. All the JG code, |
| config variables and stats are removed from Transform.cc */ |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
#if TS_HAS_TESTS
/**
  Start the transform self test.

  A TransformControl is scheduled on the current thread only when the
  "transform_test" action tag is set.
 */
void
TransformTest::run()
{
  if (!is_action_tag_set("transform_test")) {
    return;
  }

  this_ethread()->schedule_imm_local(new TransformControl());
}
#endif
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp, |
| const char *content_type, int content_type_len, int64_t content_length) |
| : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(mut)), |
| m_output_buf(nullptr), |
| m_output_reader(nullptr), |
| m_transform_resp(transform_resp), |
| m_output_vio(nullptr), |
| m_range_content_length(0), |
| m_num_range_fields(num_fields), |
| m_current_range(0), |
| m_content_type(content_type), |
| m_content_type_len(content_type_len), |
| m_ranges(ranges), |
| m_output_cl(content_length), |
| m_done(0) |
| { |
| SET_HANDLER(&RangeTransform::handle_event); |
| |
| m_num_chars_for_cl = num_chars_for_int(m_range_content_length); |
| Debug("http_trans", "RangeTransform creation finishes"); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| RangeTransform::~RangeTransform() |
| { |
| if (m_output_buf) { |
| free_MIOBuffer(m_output_buf); |
| } |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| int |
| RangeTransform::handle_event(int event, void *edata) |
| { |
| handle_event_count(event); |
| |
| if (m_closed) { |
| if (m_deletable) { |
| if (m_output_vc != nullptr) { |
| Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0); |
| } else { |
| Debug("http_trans", "RangeTransform destroy"); |
| } |
| delete this; |
| } |
| } else { |
| switch (event) { |
| case VC_EVENT_ERROR: |
| m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio); |
| break; |
| case VC_EVENT_WRITE_COMPLETE: |
| ink_assert(m_output_vio == (VIO *)edata); |
| m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE); |
| break; |
| case VC_EVENT_WRITE_READY: |
| default: |
| ink_assert(m_output_vc != nullptr); |
| |
| if (!m_output_vio) { |
| m_output_buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| m_output_reader = m_output_buf->alloc_reader(); |
| m_output_vio = m_output_vc->do_io_write(this, m_output_cl, m_output_reader); |
| |
| change_response_header(); |
| |
| if (m_num_range_fields > 1) { |
| add_boundary(false); |
| add_sub_header(m_current_range); |
| } |
| } |
| |
| MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread()); |
| if (!trylock.is_locked()) { |
| retry(10); |
| return 0; |
| } |
| |
| if (m_closed) { |
| return 0; |
| } |
| |
| if (m_write_vio.op == VIO::NONE) { |
| m_output_vio->nbytes = m_done; |
| m_output_vio->reenable(); |
| return 0; |
| } |
| |
| transform_to_range(); |
| break; |
| } |
| } |
| |
| return 0; |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| void |
| RangeTransform::transform_to_range() |
| { |
| IOBufferReader *reader = m_write_vio.get_reader(); |
| int64_t toskip, tosend, avail; |
| const int64_t *end, *start; |
| int64_t prev_end = 0; |
| int64_t *done_byte; |
| |
| end = &m_ranges[m_current_range]._end; |
| done_byte = &m_ranges[m_current_range]._done_byte; |
| start = &m_ranges[m_current_range]._start; |
| avail = reader->read_avail(); |
| |
| while (true) { |
| if (*done_byte < (*start - 1)) { |
| toskip = *start - *done_byte - 1; |
| |
| if (toskip > avail) { |
| toskip = avail; |
| } |
| |
| if (toskip > 0) { |
| reader->consume(toskip); |
| *done_byte += toskip; |
| avail = reader->read_avail(); |
| } |
| } |
| |
| if (avail > 0) { |
| tosend = *end - *done_byte; |
| |
| if (tosend > avail) { |
| tosend = avail; |
| } |
| |
| m_output_buf->write(reader, tosend); |
| reader->consume(tosend); |
| |
| m_done += tosend; |
| *done_byte += tosend; |
| } |
| |
| if (*done_byte == *end) { |
| prev_end = *end; |
| } |
| |
| // move to next Range if done one |
| // ignore bad Range: _done_byte -1, _end -1 |
| while (*done_byte == *end) { |
| m_current_range++; |
| |
| if (m_current_range == m_num_range_fields) { |
| if (m_num_range_fields > 1) { |
| m_done += m_output_buf->write("\r\n", 2); |
| add_boundary(true); |
| } |
| |
| Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done); |
| m_output_vio->nbytes = m_done; |
| m_output_vio->reenable(); |
| |
| // if we are detaching before processing all the |
| // input data, send VC_EVENT_EOS to let the upstream know |
| // that it can rely on us consuming any more data |
| int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE; |
| m_write_vio.cont->handleEvent(cb_event, &m_write_vio); |
| return; |
| } |
| |
| end = &m_ranges[m_current_range]._end; |
| done_byte = &m_ranges[m_current_range]._done_byte; |
| start = &m_ranges[m_current_range]._start; |
| |
| // if this is a good Range |
| if (*end != -1) { |
| m_done += m_output_buf->write("\r\n", 2); |
| add_boundary(false); |
| add_sub_header(m_current_range); |
| |
| // keep this part for future support of out-of-order Range |
| // if this is NOT a sequential Range compared to the previous one - |
| // start of current Range is larger than the end of last Range, do |
| // not need to go back to the start of the IOBuffereReader. |
| // Otherwise, reset the IOBufferReader. |
| // if ( *start > prev_end ) |
| *done_byte = prev_end; |
| // else |
| // reader->reset(); |
| |
| break; |
| } |
| } |
| |
| // When we need to read and there is nothing available |
| avail = reader->read_avail(); |
| if (avail == 0) { |
| break; |
| } |
| } |
| |
| m_output_vio->reenable(); |
| m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
/*
 * Fixed strings used when building a Range response.
 *
 * `bound` and `range_type` must be changed together: the boundary token
 * embedded in `range_type` has to be identical to `bound`.
 *
 * All four are read-only constants, hence const.
 */

static const char bound[]      = "RANGE_SEPARATOR";
static const char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
static const char cont_type[]  = "Content-type: ";
static const char cont_range[] = "Content-range: bytes ";
| |
| void |
| RangeTransform::add_boundary(bool end) |
| { |
| m_done += m_output_buf->write("--", 2); |
| m_done += m_output_buf->write(bound, sizeof(bound) - 1); |
| |
| if (end) { |
| m_done += m_output_buf->write("--", 2); |
| } |
| |
| m_done += m_output_buf->write("\r\n", 2); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| #define RANGE_NUMBERS_LENGTH 60 |
| |
| void |
| RangeTransform::add_sub_header(int index) |
| { |
| // this should be large enough to hold three integers! |
| char numbers[RANGE_NUMBERS_LENGTH]; |
| int len; |
| |
| m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1); |
| if (m_content_type) { |
| m_done += m_output_buf->write(m_content_type, m_content_type_len); |
| } |
| m_done += m_output_buf->write("\r\n", 2); |
| m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1); |
| |
| snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end, |
| m_output_cl); |
| len = strlen(numbers); |
| if (len < RANGE_NUMBERS_LENGTH) { |
| m_done += m_output_buf->write(numbers, len); |
| } |
| m_done += m_output_buf->write("\r\n\r\n", 4); |
| } |
| |
| /*------------------------------------------------------------------------- |
| -------------------------------------------------------------------------*/ |
| |
| /* |
| * this function changes the response header to reflect this is |
| * a Range response. |
| */ |
| |
| void |
| RangeTransform::change_response_header() |
| { |
| MIMEField *field; |
| char *reason_phrase; |
| HTTPStatus status_code; |
| |
| ink_release_assert(m_transform_resp); |
| |
| status_code = HTTP_STATUS_PARTIAL_CONTENT; |
| m_transform_resp->status_set(status_code); |
| reason_phrase = const_cast<char *>(http_hdr_reason_lookup(status_code)); |
| m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase)); |
| |
| if (m_num_range_fields > 1) { |
| // set the right Content-Type for multiple entry Range |
| field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE); |
| |
| if (field != nullptr) { |
| m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE); |
| } |
| |
| field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE); |
| field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1); |
| |
| m_transform_resp->field_attach(field); |
| } else { |
| char numbers[RANGE_NUMBERS_LENGTH]; |
| m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE); |
| field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE); |
| snprintf(numbers, sizeof(numbers), "bytes %" PRId64 "-%" PRId64 "/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl); |
| field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers)); |
| m_transform_resp->field_attach(field); |
| } |
| } |
| |
| #undef RANGE_NUMBERS_LENGTH |