| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| #ifndef EX_QUEUE_H |
| #define EX_QUEUE_H |
| |
| /* -*-C++-*- |
| ****************************************************************************** |
| * |
| * File: ex_queue.h |
| * Description: Classes and structures for queues and rows |
| * |
| * |
| * Created: 5/3/94 |
| * Language: C++ |
| * |
| * |
| * |
| * |
| ****************************************************************************** |
| */ |
| |
| #include "ExpAtp.h" |
| #include "ExScheduler.h" |
| #include "ex_god.h" |
| #include "ex_ex.h" |
| |
| #ifdef _DEBUG |
| #define ExQueueAssert(a,b) ex_assert(a,b) |
| #else |
| #define ExQueueAssert(a,b) |
| #endif |
| |
| // |
| // This file defines the classes and structures that implement queues |
| // and queue entries. |
| // |
| // The classes defined in this file: |
| // |
| class ex_queue; // queue of composite rows and row state |
| class ex_queue_entry; // an entry in the queue |
| class down_state; // describes state of down-queue entry |
| class up_state; // describes state of up-queue entry |
| |
| // other classes referenced |
| class ComDiagsArea; // diagnostics area |
| class ex_tcb_private_state; |
| |
| // queue_index is a handle into a queue entry |
| // Changing queue_index to int reduces the number of instructions required |
| // to access the queue vector. Shorts actually take more instructions. |
| #include "QueueIndex.h" |
| |
| // maximum queue length |
| const queue_index maxQueueSize = 0xffffffff; |
| |
| // ------------------------------------------------------------------------- |
| // Queue |
| // |
| class ex_queue : public ExGod |
| { |
| public: |
| enum queue_type {UP_QUEUE, DOWN_QUEUE}; |
| |
| enum down_request |
| { |
| GET_N, // return N matching rows |
| GET_ALL, // return all matching rows |
| GET_NOMORE, // get no more matching rows - Cancel request |
| GET_EMPTY, // empty entry - no request |
| GET_EOD, // this request type indicates that it is the end |
| // of input requests. Used when multiple input rows |
| // that are part of the same request, are sent to an |
| // operator from 'top'.For example, insert...select request. |
| // The select rows are input to insert operator until |
| // all selected rows have been retrieved. See |
| // ExTupleFlowTcb for details. |
| |
| // When multiple user rowset inserts are to be part of the same sidetree |
| // insert request, then this indicates to eid that all rows |
| // have been sent for this particular request but sidetree insert is |
| // still in progress and should not be committed. |
| GET_EOD_NO_ST_COMMIT, |
| |
| // BertBert VV |
| // The GET_NEXT_N protocol is currently used for "cursors for delete access" because |
| // for such cursors, the application must have fetched all rows that have been deleted. |
| // Of course, this only works well if cursors stay open across transactions. |
| // The GET_NEXT_N protocol allows one to control accurately how many rows are returned |
| // (and deleted in the 'for delete access' case). |
| // The GET_ALL protocol is used for regular cursors that stay open across transactions |
| // because it is assumed that such cursors don't lock. If they do, they should use the |
| // GET_NEXT_N protocol. |
| // Each GET_NEXT_N must be matched with a Q_GET_DONE. it is the responsibility |
| // of each operator to guarantee that. |
| // One must not wait for a Q_GET_DONE before issueing a subsequent GET_NEXT_N |
| // because GET_NEXT_N is cumulative (adds to requestValue). However, none of the |
| // operators that support the GET_NEXT protocol have been upgraded to handle "asynchronous" |
| // GET_NEXTs. note that there is no synchronization in this queue class. |
| GET_NEXT_N, // Attempt to return N rows. If there aren't N to return, return less. |
| // After returning N (or less or zero) rows, return the up_status Q_GET_DONE. |
| GET_NEXT_N_MAYBE_WAIT, |
| // Attempt to return N rows. If there are none to return, block waiting for |
| // rows to appear. If there are some rows to return, return them. |
| // After returning N (or less but not zero) rows, return the up-status Q_GET_DONE. |
| // Note that only a leaf EID operator can block waiting for rows to appear, and that |
| // this only can happen if the access is for 'stream' access. For the rest, |
| // GET_NEXT_N_MAYBE_WAIT is identical to GET_NEXT_N. |
| GET_NEXT_0_MAYBE_WAIT, |
| // This command never returns any rows. If rows could be returned, return the |
| // up-status Q_GET_DONE. If no rows could be retured, block waiting for rows |
| // to appear. The command is used for streaming destructive selects on |
| // partitioned tables to check which partition we can destructively select |
| // from. |
| |
| GET_NEXT_N_SKIP, |
| GET_NEXT_N_MAYBE_WAIT_SKIP, |
| GET_N_RETURN_SKIPPED, |
| // This setting is used specifically in the case of index joins. This indicates that the |
| // right child has skipped the record due to skip conflict access and this has to be added |
| // to the delta key list in the right child.. |
| // BertBert ^^ |
| }; |
| |
| enum up_status |
| { |
| Q_NO_DATA, // no more matching data found |
| Q_OK_MMORE, // matching row returned, maybe more coming |
| Q_SQLERROR, // sql error and (optionally) matching data |
| Q_INVALID // invalid state |
| // BertBert VV |
| , |
| Q_GET_DONE, // Used for destructive scans (which always use the GET_NEXT_N protocol). |
| // Indicates that a complete set of matching rows has |
| // been returned and the client should re-issue GET_NEXT_N/GET_NEXT_N_MAYBE_WAIT |
| // to retrieve more rows. The field matchNo in the up-state contains the total |
| // number of rows requested that have been satisfied with this control tuple. |
| // Used for non-destructive scans (which always use the GET_ALL protocol). |
| // It causes the DP2-root operator to reply with a buffer of accumulated rows, |
| // eventhough the buffer might not be full yet. In this case, the field matchNo |
| // in tht up-state will be -1. |
| Q_REC_SKIPPED, // Used specifically in the case of an index join. The right child of the join |
| // returns this to indicate that this record is skipped because it was locked and skip conflict |
| // was used. |
| // BertBert ^^ |
| Q_STATS // added for runtime stats by Srinivas for handling empty SQL buffers |
| // but with stats |
| }; |
| |
| enum queueAllocType // to allocate or not to allocate, that is the question |
| { |
| ALLOC_DEFAULT, // allocate for up queue, don't allocate for down queue |
| ALLOC_ATP, // allocate the ATPs for this queue |
| ALLOC_NONE // don't allocate the ATPs for this queue |
| }; |
| |
| // Constructor for a queue |
| NA_EIDPROC |
| ex_queue(const queue_type type, |
| queue_index initialSize, |
| ex_cri_desc * criDesc, |
| CollHeap * space, |
| enum queueAllocType allocType = ALLOC_DEFAULT); |
| |
| |
| // allocate ATPs for entries that don't have one already, |
| // use pre-allocated ATPs if they are passed in |
| NA_EIDPROC |
| NABoolean allocateAtps(CollHeap * space, |
| atp_struct * atps = NULL, |
| queue_index numNewAtps = 0, |
| Int32 atpSize = 0, |
| NABoolean failureIsFatal = TRUE); |
| |
| // allocate PSTATE objects for all queue entries, pass in a single |
| // PSTATE as a sample (deprecated method, use the other method that |
| // doesn't need the pstate to be passed in and that supports |
| // preallocated pstates and allocation as an array) |
| NA_EIDPROC |
| void allocatePstate(ex_tcb_private_state *p, |
| ex_tcb * tcb); |
| // allocate PSTATE objects for queue entries that don't have a |
| // PSTATE yet, using an array of pre-allocated PSTATEs if passed in |
| NA_EIDPROC |
| NABoolean allocatePstate(ex_tcb * tcb, |
| ex_tcb_private_state *pstates = NULL, |
| queue_index numNewPstates = 0, |
| Lng32 pstateLength = 0, |
| NABoolean failureIsFatal = TRUE); |
| |
| NA_EIDPROC |
| void deallocateAtps(); |
| NA_EIDPROC |
| void deallocatePstate(); |
| |
| // does this queue need its own ATPs? |
| NA_EIDPROC |
| Int32 needsAtps() const { return needsAtps_; } |
| |
| // resize a queue (including the ATPs and the pstates), return new size |
| NA_EIDPROC |
| NABoolean needsResize() const { return needsResize_; } |
| NA_EIDPROC |
| Lng32 getResizeLimit() const { return (Lng32) resizeLimit_; } |
| NA_EIDPROC |
| void setResizeLimit(Lng32 l) { resizeLimit_ = (ULng32) l; } |
| NA_EIDPROC |
| queue_index resize(ex_tcb * tcb, |
| queue_index newSize); |
| |
| NA_EIDPROC |
| ex_cri_desc * getCriDesc(); |
| |
| // destructor |
| NA_EIDPROC |
| ~ex_queue(); |
| |
| // |
| // Insert into the queue methods |
| // |
| // The following two methods are called only by the node |
| // that inserts into the queue. The parent inserts in the down |
| // queue and the child inserts in the up queue |
| // |
| // gets address/index of empty queue_entry to modify it |
| // (the entry is not yet inserted into the queue) |
| // |
| NA_EIDPROC ex_queue_entry * getTailEntry() const; |
| NA_EIDPROC |
| queue_index getTailIndex() const; |
| |
| // next queue_entry now becomes tail_ |
| NA_EIDPROC |
| void insert(); |
| |
| // |
| // Cancel a down request already inserted into the queue; cancel |
| // a request with a given value for parentIndex. |
| // |
| NA_EIDPROC void cancelRequest(); |
| NA_EIDPROC |
| void cancelRequest(const queue_index i); |
| NA_EIDPROC void cancelRequestWithParentIndex(queue_index pindex); |
| NA_EIDPROC void cancelRequestWithParentIndexRange(queue_index startPI, queue_index endPI); |
| |
| //?johannes? |
| NA_EIDPROC void getNextNRequestInit(const Lng32 tuples); |
| NA_EIDPROC void getNextNRequest(const Lng32 tuples); |
| |
| NA_EIDPROC void getNextNMaybeWaitRequestInit(const Lng32 tuples); |
| NA_EIDPROC void getNextNMaybeWaitRequest(const Lng32 tuples); |
| |
| NA_EIDPROC void getNextNSkipRequestInit(const Lng32 tuples); |
| NA_EIDPROC void getNextNSkipRequest(const Lng32 tuples); |
| |
| NA_EIDPROC void getNextNMaybeWaitSkipRequestInit(const Lng32 tuples); |
| NA_EIDPROC void getNextNMaybeWaitSkipRequest(const Lng32 tuples); |
| |
| NA_EIDPROC void getNext0MaybeWaitRequest(void); |
| NA_EIDPROC void getNext0MaybeWaitRequestInit(void); |
| //?johannes?? |
| |
| // |
| // Remove from the queue methods |
| // |
| // The following two methods are called only by the node |
| // that reads and removes from the queue |
| // |
| // gets address of i-th queue-entry |
| // |
| NA_EIDPROC |
| ex_queue_entry * getQueueEntry(const queue_index i) const; |
| NA_EIDPROC |
| ex_queue_entry * getHeadEntry() const; |
| NA_EIDPROC |
| queue_index getHeadIndex() const; |
| |
| // check whether a certain entry exists (used to loop through entries) |
| NA_EIDPROC |
| inline Int32 entryExists(queue_index ix) const; |
| |
| void handleFullQueue(); |
| void handleEmptyQueue(); |
| |
| // removes the head and it becomes empty; |
| NA_EIDPROC |
| #ifndef _DEBUG |
| inline |
| #endif |
| //#pragma nowarn(734) |
| void removeHead(); |
| //#pragma warn(734) |
| |
| // soln 10-040111-2308 start |
| // removes the head entry. Same as removeHead except that it doesn't |
| // any queue resizing logic. |
| // To be used to remove head entries ONLY after a cancel request has been |
| // sent by the operator. This method removes head entries without initiating |
| // the q resizing logic. This is used to handle special cases like cancelling |
| // a request after a GET_N is satisfied.After this removal of entries from |
| // the queue is just looking for a Q_NO_DATA. Hence these shouldnt be |
| // considered for qresizing as the query has been satisfied. |
| |
| NA_EIDPROC |
| void deleteHeadEntry(); |
| // soln 10-040111-2308 end |
| |
| // queue the size of the queue and the number of entries in it. |
| NA_EIDPROC |
| queue_index getSize() const; //allocated size |
| NA_EIDPROC |
| queue_index inline getLength() const; //occupied entries between 0 and size-1 |
| NA_EIDPROC |
| Int32 inline isFull() const; // returns true if queue is full |
| NA_EIDPROC |
| Int32 inline isEmpty() const; // returns true if queue is full |
| |
| NA_EIDPROC inline void setInsertSubtask(ExSubtask *insertSubtask) |
| { insertSubtask_ = insertSubtask; } |
| NA_EIDPROC inline void setUnblockSubtask(ExSubtask *unblockSubtask) |
| { unblockSubtask_ = unblockSubtask; } |
| // warning elimination (removed "inline") |
| NA_EIDPROC void setCancelSubtask(ExSubtask *cancelSubtask) |
| { ExQueueAssert(upDown_ == DOWN_QUEUE,""); cancelSubtask_ = cancelSubtask; } |
| NA_EIDPROC inline void setResizeSubtask(ExSubtask *resizeSubtask) |
| { resizeSubtask_ = resizeSubtask; } |
| |
| //?johannes? |
| // warning elimination (removed "inline") |
| NA_EIDPROC void setNextSubtask(ExSubtask *nextSubtask) |
| { ExQueueAssert(upDown_ == DOWN_QUEUE,""); nextSubtask_ = nextSubtask; } |
| //?johannes?? |
| |
| |
| private: |
| |
| const queue_type upDown_; |
| queue_index size_; |
| queue_index head_; |
| queue_index tail_; |
| |
| // Using a bit mask is faster than masking with (size_ - 1). |
| // |
| queue_index mask_; |
| |
| const unsigned short numTuples_; // number of tuple pointers in each atp |
| |
| // queue cri descriptors |
| ex_cri_desc * criDesc_; |
| |
| // are the ATPs allocated and owned by the queue? |
| NABoolean atpsAllocated_; |
| |
| ex_queue_entry * queue_; // pointer to (array of) queue entries |
| |
| // Subtasks associated with the queue |
| ExSubtask *insertSubtask_; |
| ExSubtask *unblockSubtask_; |
| ExSubtask *cancelSubtask_; |
| ExSubtask *resizeSubtask_; |
| |
| // Data members for dynamic resizing of queues: resizePoints_ |
| // keeps a weighted measure based on empty/full transitions |
| // of how much the queue needs resizing. queueWasFull_ indicates |
| // the previous state of the queue (empty or full). Once |
| // resizePoints_ reaches resizeLimit_ it's time to trigger the |
| // resize subtask of the queue. |
| ULng32 resizePoints_; |
| NABoolean queueWasFull_; |
| NABoolean needsResize_; |
| ULng32 resizeLimit_; |
| |
| // indiciates whether this queue needs ATPs and PSTATEs to be allocated |
| NABoolean needsPstates_; |
| NABoolean needsAtps_; |
| |
| //?johannes? |
| ExSubtask *nextSubtask_; |
| //?johannes?? |
| |
| // is queue entry i vacant? |
| NA_EIDPROC inline Int32 isVacant(const queue_index i) const; |
| |
| NA_EIDPROC void injectErrorOrCancel(); |
| |
| // for debuggin' |
| |
| NA_EIDPROC |
| |
| void logHeader(); |
| |
| NA_EIDPROC |
| void logCancel(queue_index i) |
| { |
| #if 0 |
| #if (!defined (__EID) && !defined(NA_C89)) |
| logHeader(); |
| cerr << " cancel " << i << " (" << (i & (size_-1)) << ")" << endl; |
| #endif |
| #endif |
| } |
| |
| |
| NA_EIDPROC |
| void logInsert() |
| { |
| #if 0 |
| #if (!defined (__EID) && !defined(NA_C89)) |
| |
| ex_queue_entry *qe = &queue_[tail_ & (size_ - 1)]; |
| |
| logHeader(); |
| cerr << " insert "; |
| |
| if (upDown_ == UP_QUEUE) |
| { |
| switch (qe->upState.status) |
| { |
| case Q_OK_MMORE: |
| cerr << "Q_OK_MMORE"; |
| break; |
| case Q_SQLERROR: |
| cerr << "Q_SQLERROR"; |
| break; |
| case Q_NO_DATA: |
| cerr << "Q_NO_DATA"; |
| break; |
| default: |
| cerr << "unknown"; |
| break; |
| } |
| cerr << ", pi " << qe->upState.parentIndex; |
| } |
| else |
| { |
| switch (qe->downState.request) |
| { |
| case GET_ALL: |
| cerr << "GET_ALL"; |
| break; |
| case GET_N: |
| cerr << "GET_N"; |
| break; |
| case GET_EOD: |
| cerr << "GET_EOD"; |
| break; |
| case GET_NOMORE: |
| cerr << "GET_NOMORE"; |
| break; |
| default: |
| cerr << "unknown"; |
| break; |
| } |
| cerr << ", pi " << qe->downState.parentIndex; |
| } |
| cerr << endl; |
| #endif |
| #endif |
| } |
| |
| NA_EIDPROC |
| void logRemoveHead(); |
| |
| }; // ex_queue |
| |
| |
| // ------------------------------------------------------------------------- |
| // |
| // Queue entry |
| // |
| // A queue entry encapsulates three things. The first describes |
| // the state of an entry, represented as an instance of class down_state |
| // for down queues, and up_state for up queues. The second is a pointer |
| // to a composite record instance. The third is a structure where the |
| // reader of the queue can keep state associated with this input record. |
| // The type/class of the record pointed to by the the second |
| // pointer depends on the type of node that is reading this queue. |
| // |
| |
| class down_state // public down status |
| { |
| public: |
| ex_queue::down_request request; |
| |
| // BertBert VV |
| // a value that is associated with 'request'. |
| // For GET_N, this is the number of rows that is requested. |
| // For GET_NEXT_N, this is the total number of rows |
| // requested across all GET_NEXT_Ns received so far (i.e. accumulative) |
| Lng32 requestValue; |
| |
| // A count of the number of GET_NEXT_Ns issued by the parent. |
| Lng32 numGetNextsIssued; |
| // BertBert ^^ |
| |
| queue_index parentIndex; |
| |
| NA_EIDPROC inline NABoolean operator==(const down_state &other) const |
| { |
| return (request == other.request) && |
| (requestValue == other.requestValue) && |
| (parentIndex == other.parentIndex) && |
| (numGetNextsIssued == other.numGetNextsIssued); |
| }; |
| }; |
| |
| // Some special values for a matchNo. |
| const Lng32 OverflowedMatchNo = -1; |
| const Lng32 GetDoneNoRowsMatchNo = -2; |
| |
| class up_state // public up state |
| { |
| friend class SqlBuffer; |
| friend class SqlBufferOlt; |
| friend class SqlBufferOltSmall; |
| |
| public: |
| NA_EIDPROC inline NABoolean operator==(const up_state &other) const |
| { |
| return (parentIndex == other.parentIndex) && |
| (status == other.status); |
| }; |
| |
| queue_index downIndex; // replying to this particular down request |
| queue_index parentIndex; // a copy of parent_index in the down state |
| ex_queue::up_status status; // type of entry (end-of-data, error, etc.) |
| |
| // accessors |
| NA_EIDPROC inline Lng32 getMatchNo(void) const; |
| NA_EIDPROC inline NABoolean getDoneNoRowsMatchNo(void) const; |
| |
| // mutators |
| NA_EIDPROC inline Lng32 setMatchNo(Int64 n); |
| NA_EIDPROC inline void setDoneNoRowsMatchNo(void); |
| |
| private: |
| // matchNo is private to handle overflow situations. There is only one |
| // consumer of matchNo where there is any possibility that matchNo has |
| // overflowed - that is ex_partn_access's workUp (and the olt variant), |
| // where matchNo is being used to return # of rows affected by an |
| // ins/upd/del. See special logic there. |
| // The only operators that create a value for matchNo that can overflow |
| // are the leaf operators in dp2. We don't care about overflow except when |
| // matchNo is used to count rows affected in an ins/upd/del. |
| // |
| // BertBert VV |
| // For Q_GET_DONE, this is the 'requestValue' that is matched by this |
| // control tuple. |
| Lng32 matchNo; // number of matches |
| // BertBert ^^ |
| }; |
| |
| class ex_queue_entry |
| { |
| friend class ex_queue; // class-key must be used when declaring a friend |
| |
| public: |
| |
| union // anonymous |
| { |
| up_state upState; |
| down_state downState; |
| }; |
| |
| ex_tcb_private_state * pstate; // private (reader's) state |
| |
| NA_EIDPROC |
| inline ex_cri_desc * criDesc() const; |
| NA_EIDPROC |
| inline unsigned short numTuples() const; |
| NA_EIDPROC |
| inline tupp & getTupp(Lng32 i) const; |
| NA_EIDPROC |
| void passAtp(const ex_queue_entry *from); |
| NA_EIDPROC |
| void passAtp(atp_struct *from); |
| NA_EIDPROC |
| inline void copyAtp(const ex_queue_entry *from); |
| NA_EIDPROC |
| inline void copyAtp(atp_struct *from); |
| NA_EIDPROC |
| inline atp_struct * getAtp() const; |
| |
| // |
| // Routines to get and set diagnostics area. |
| // |
| NA_EIDPROC |
| inline ComDiagsArea *getDiagsArea() const; |
| NA_EIDPROC |
| inline void setDiagsArea(ComDiagsArea* diagsArea); |
| |
| private: |
| |
| NA_EIDPROC |
| void initializeState (const ex_queue::queue_type type); |
| |
| // Used to check that an entry being inserted is intialized properly |
| NA_EIDPROC |
| inline Int32 checkState(const ex_queue::queue_type type) |
| { |
| return (type == ex_queue::UP_QUEUE |
| ? (upState.status != ex_queue::Q_INVALID) |
| : (downState.request != ex_queue::GET_EMPTY)); |
| } |
| |
| |
| atp_struct * atp_; // a pointer to the composite row instance |
| }; |
| |
| // |
| // queue pair. A pair of queue pointers |
| // |
| struct ex_queue_pair |
| { |
| ex_queue *down; // cris flowing down |
| ex_queue *up; // cris flowing up |
| }; |
| |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // |
| // Inline methods |
| // |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| // |
| // for ex_queue_entry |
| // |
| |
| inline ex_cri_desc * ex_queue_entry::criDesc() const |
| { |
| return atp_->getCriDesc(); |
| } |
| |
| inline unsigned short ex_queue_entry::numTuples() const |
| { |
| return criDesc()->noTuples(); |
| } |
| |
| inline atp_struct * ex_queue_entry::getAtp() const |
| { |
| return atp_; |
| } |
| |
| inline tupp & ex_queue_entry::getTupp(Lng32 i) const |
| { |
| return atp_->getTupp(i); |
| } |
| |
| inline void ex_queue_entry::copyAtp(const ex_queue_entry *from) |
| { |
| atp_->copyAtp(from->getAtp()); |
| } |
| |
| inline void ex_queue_entry::copyAtp(atp_struct *from) |
| { |
| atp_->copyAtp(from); |
| } |
| |
| inline ComDiagsArea *ex_queue_entry::getDiagsArea() const |
| { |
| return atp_->getDiagsArea(); |
| } |
| |
| inline void ex_queue_entry::setDiagsArea(ComDiagsArea* diagsArea) |
| { |
| atp_->setDiagsArea(diagsArea); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| // Inline procedures -- class ex_queue |
| |
| // queue is implemented in a circular array of size "size_". |
| // |
| // head_, tail_ and all variables of type queue_index are unsigned number that |
| // wrap around from max-value to 0. To find the entry in the array that holds |
| // them compute index%size_. Since size_ is always a power of 2, a faster |
| // way to compute modulo is index & (size_-1) |
| // |
| // head_ points to the first occupied position - delete from head_ |
| // tail_ points to the next empty position - insert at tail_ |
| // head_ < tail_ (mod max unsigned number) |
| // |
| // head_ = tail_ means queue is empty |
| // head_ + size_ = tail_+1 means queue is full |
| // |
| // tail_ - head_ (mod size_) is the number of occupied entries |
| // [ mod max unsigned number ] |
| |
| inline ex_queue_entry * ex_queue::getTailEntry() const |
| { |
| return &queue_[tail_ & mask_]; //(size_ - 1)]; |
| } |
| |
| inline queue_index ex_queue::getTailIndex() const |
| { |
| return tail_; |
| } |
| |
| inline queue_index ex_queue::getLength() const |
| { |
| return tail_ - head_; |
| // note that this should even work for the very |
| // unlikely case that tail_ < head_ |
| } |
| |
| inline Int32 ex_queue::isFull() const |
| { |
| return (getLength() == mask_); |
| |
| // return (head_ + size_) == (tail_ + (queue_index)1); |
| } |
| |
| inline Int32 ex_queue::isEmpty() const |
| { |
| return head_ == tail_; |
| } |
| |
| inline Int32 ex_queue::isVacant(const queue_index i) const |
| { |
| // index has to be between head_ (inclusive) and tail_ (exclusive) |
| // to be not vacant if tail_ is less than head_ it means the queue |
| // indices are wrapping around the values an unsigned int can hold |
| |
| if (head_<=tail_) |
| return !((head_ <= i) && (i < tail_)); |
| else |
| return !((head_ <= i) || (i < tail_)); |
| } |
| |
| //#pragma nowarn(734) |
| #ifndef _DEBUG |
| inline void ex_queue::removeHead() |
| { |
| // make sure the queue is not empty |
| ExQueueAssert(!isEmpty(), "ex_queue::removeHead() on an empty queue"); |
| |
| ex_queue_entry *headEntry = &queue_[head_ & mask_]; //(size_ - 1)]; |
| |
| #ifdef _DEBUG |
| logRemoveHead(); |
| #endif |
| |
| if (atpsAllocated_ && headEntry->getAtp()) |
| headEntry->getAtp()->release(); |
| |
| if (isFull()) |
| handleFullQueue(); |
| else |
| { |
| head_++; // ok to wrap around |
| |
| // logic for dynamic resizing of queues |
| if (isEmpty()) |
| { |
| handleEmptyQueue(); |
| } // end of logic for dynamic resizing of queues |
| } |
| } // ex_queue::removeHead() |
| #endif |
| //#pragma warn(734) |
| |
| inline void ex_queue::cancelRequest(const queue_index i) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't cancel up queue entries"); |
| |
| // check that the request has not been deleted |
| if (!isVacant(i)) |
| { |
| #ifdef _DEBUG |
| logCancel(i); |
| #endif |
| queue_[i & mask_].downState.request = GET_NOMORE; |
| |
| // schedule the task that handles the cancel request |
| cancelSubtask_->schedule(); |
| } |
| } |
| |
| inline void ex_queue::cancelRequest() |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't cancel up queue entries"); |
| |
| #ifdef _DEBUG |
| logCancel(head_); |
| #endif |
| |
| queue_[head_ & mask_].downState.request = GET_NOMORE; |
| |
| // schedule the task that handles the cancel request |
| cancelSubtask_->schedule(); |
| } |
| |
| |
| //?johannes? |
| |
| inline void ex_queue::getNextNRequest( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N; |
| |
| // increment request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue += tuples; |
| |
| // record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued++; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNextNRequestInit( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N; |
| |
| // set request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue = tuples; |
| |
| // record that a new get next request has been issued |
| // but reset the fields. |
| queue_[head_ & mask_].downState.numGetNextsIssued = 1; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| |
| inline void ex_queue::getNextNMaybeWaitRequest( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N_MAYBE_WAIT into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N_MAYBE_WAIT; |
| |
| //increment request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue += tuples; |
| |
| //record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued++; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNextNMaybeWaitRequestInit( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N_MAYBE_WAIT into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N_MAYBE_WAIT; |
| |
| // set request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue = tuples; |
| |
| // record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued = 1; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNextNSkipRequest( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N_SKIP into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N_SKIP; |
| |
| //increment request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue += tuples; |
| |
| //record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued++; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNextNSkipRequestInit( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N_SKIP into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N_SKIP; |
| |
| // set request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue = tuples; |
| |
| // record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued = 1; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNextNMaybeWaitSkipRequest( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N_MAYBE_WAIT_SKIP into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N_MAYBE_WAIT_SKIP; |
| |
| //increment request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue += tuples; |
| |
| //record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued++; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNextNMaybeWaitSkipRequestInit( const Lng32 tuples) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_N_MAYBE_WAIT_SKIP into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_N_MAYBE_WAIT_SKIP; |
| |
| // set request value by number of tuples asked for |
| queue_[head_ & mask_].downState.requestValue = tuples; |
| |
| // record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued = 1; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNext0MaybeWaitRequest(void) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_0_MAYBE_WAIT into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_0_MAYBE_WAIT; |
| |
| //record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued++; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| |
| inline void ex_queue::getNext0MaybeWaitRequestInit(void) |
| { |
| // It better be a down request |
| ExQueueAssert(upDown_ == DOWN_QUEUE, "Can't put GET_NEXT_0_MAYBE_WAIT into up queue entries"); |
| queue_[head_ & mask_].downState.request = GET_NEXT_0_MAYBE_WAIT; |
| |
| //record that a new get next request has been issued. |
| queue_[head_ & mask_].downState.numGetNextsIssued = 1; |
| queue_[head_ & mask_].downState.requestValue = 0; |
| |
| // schedule the task that handles the GET_NEXT request |
| nextSubtask_->schedule(); |
| } |
| //?johannes?? |
| |
| inline ex_queue_entry * ex_queue::getQueueEntry(const queue_index i) const |
| { |
| ExQueueAssert( !isVacant(i), |
| "ex_queue::getQueueEntry getting an empty entry"); |
| return &queue_[i & mask_]; //(size_-1)]; |
| } |
| |
| inline ex_queue_entry * ex_queue::getHeadEntry() const |
| { |
| // make sure the head_ is not an empty entry |
| ExQueueAssert(head_ != tail_, |
| "ex_queue::getHeadEntry() get head on an empty queue"); |
| return &queue_[head_ & mask_]; //(size_ - 1)]; |
| } |
| |
| inline queue_index ex_queue::getHeadIndex() const |
| { |
| return head_; |
| } |
| |
| Int32 ex_queue::entryExists(queue_index ix) const |
| { |
| return !isVacant(ix); |
| } |
| |
| inline void ex_queue::insert() |
| { |
| // Check if the queue is full and no insert is possible |
| ExQueueAssert(!isFull(), "ex_queue::insert() Inserting into a full queue_"); |
| ExQueueAssert(queue_[tail_ & mask_].checkState(upDown_), |
| "ex_queue::insert() Inserting an empty entry into a queue"); |
| #ifdef _DEBUG |
| logInsert(); |
| #endif |
| |
| tail_++; |
| |
| #ifdef _DEBUG |
| injectErrorOrCancel(); |
| #endif |
| |
| // schedule the task that reads from the queue |
| insertSubtask_->schedule(); |
| } |
| |
| inline queue_index ex_queue::getSize() const |
| { |
| return size_; |
| } |
| |
| inline ex_cri_desc * ex_queue::getCriDesc() |
| { |
| return criDesc_; |
| }; |
| |
| inline void atp_struct::copyAtp(const ex_queue_entry *from) |
| { |
| copyAtp(from->getAtp()); |
| } |
| |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| // Inline procedures -- class up_state |
| |
| inline Lng32 up_state::getMatchNo(void) const |
| { |
| return matchNo; |
| }; |
| |
| inline NABoolean up_state::getDoneNoRowsMatchNo(void) const |
| { |
| return (matchNo == GetDoneNoRowsMatchNo); |
| } |
| |
| inline Lng32 up_state::setMatchNo(Int64 n) |
| { |
| if (n <= INT_MAX) |
| matchNo = (Lng32) n; |
| else |
| matchNo = OverflowedMatchNo; |
| return matchNo; |
| } |
| |
| inline void up_state::setDoneNoRowsMatchNo(void) |
| { |
| matchNo = GetDoneNoRowsMatchNo; |
| } |
| |
| #endif |
| |
| |