An Overview of Quickstep's Execution Engine

Types of threads

There are two kinds of threads in Quickstep: Foreman and Worker. The Foreman thread controls the progress of query execution, finds schedulable work (called a WorkOrder) and assigns (or schedules) it to the Worker threads for execution. The Worker threads receive the WorkOrders and execute them. After execution, they send a completion message (or response message) back to the Foreman.

High level functionality of Foreman

Foreman requests all the RelationalOperators in the physical query plan represented as a DAG to give any schedulable work (in the form of WorkOrders). While doing so, Foreman has to respect dependencies between operators. There are two kinds of dependencies between operators - pipeline breaking (or blocking) and pipeline non-breaking (or non-blocking). In the first case, the output of the producer operator can't be pipelined to the consumer operator. In the second case, the Foreman will facilitate the pipelining of the intermediate output produced by the producer operator to the consumer operator.
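
The two kinds of dependencies can be pictured as a flag on each edge of the DAG. The following is a minimal sketch under that assumption; `Edge`, `QueryPlanDag`, `canStart` and `pipelineConsumers` are hypothetical names chosen for illustration and are not taken from Quickstep's code.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of one dependency between two operators in the plan DAG.
struct Edge {
  std::size_t producer;
  std::size_t consumer;
  bool pipeline_breaking;  // true: the consumer must wait for the producer to finish.
};

// Hypothetical query plan DAG that only tracks edges and finished operators.
class QueryPlanDag {
 public:
  explicit QueryPlanDag(std::size_t num_operators)
      : finished_(num_operators, false) {}

  void addEdge(std::size_t producer, std::size_t consumer,
               bool pipeline_breaking) {
    assert(producer < finished_.size() && consumer < finished_.size());
    edges_.push_back(Edge{producer, consumer, pipeline_breaking});
  }

  void markFinished(std::size_t op) { finished_.at(op) = true; }

  // A consumer may start once every producer that feeds it through a
  // pipeline-breaking edge has finished.
  bool canStart(std::size_t consumer) const {
    for (const Edge &e : edges_) {
      if (e.consumer == consumer && e.pipeline_breaking &&
          !finished_[e.producer]) {
        return false;
      }
    }
    return true;
  }

  // Consumers that may receive the producer's output blocks as they fill up.
  std::vector<std::size_t> pipelineConsumers(std::size_t producer) const {
    std::vector<std::size_t> result;
    for (const Edge &e : edges_) {
      if (e.producer == producer && !e.pipeline_breaking) {
        result.push_back(e.consumer);
      }
    }
    return result;
  }

 private:
  std::vector<Edge> edges_;
  std::vector<bool> finished_;
};
```

With a non-breaking edge from operator 0 to operator 1 and a breaking edge from operator 1 to operator 2, operator 1 can start immediately, while operator 2 has to wait until operator 1 is marked as finished.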

Messages in execution engine

WorkerMessage

There are multiple types of WorkerMessage, each of which indicates the purpose of the message.

Foreman -> Worker : WorkerMessage which consists of the following things

  • A pointer to the WorkOrder to be executed. The WorkOrder could be a normal WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of the WorkOrder::execute() method, which is overridden by all of the RelationalOperator classes. A rebuild WorkOrder has one StorageBlock as input and calls a rebuild() method on the block. More details about rebuild() can be found in the storage module.
  • The index of the relational operator in the query plan DAG that produced the WorkOrder.
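
The two fields listed above could be laid out roughly as follows. This is only a sketch: `WorkerMessageSketch`, `CountingWorkOrder` and `runMessage` are hypothetical, the `WorkOrder` class here is a stand-in for the real interface, and `kRebuildWorkOrderMessage` is an assumed name modeled on `kWorkOrderMessage`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

// Stand-in for the WorkOrder interface; only execute() is modeled.
class WorkOrder {
 public:
  virtual ~WorkOrder() = default;
  virtual void execute() = 0;
};

// Hypothetical message layout mirroring the two fields described above.
struct WorkerMessageSketch {
  enum class Type { kWorkOrderMessage, kRebuildWorkOrderMessage };

  Type type;
  std::unique_ptr<WorkOrder> work_order;  // The WorkOrder to be executed.
  std::size_t operator_index;             // DAG index of the producing operator.
};

// Toy WorkOrder that counts how many times it has been executed.
class CountingWorkOrder : public WorkOrder {
 public:
  explicit CountingWorkOrder(int *counter) : counter_(counter) {}
  void execute() override { ++(*counter_); }

 private:
  int *counter_;
};

// What a Worker might do on receipt: execute the WorkOrder, then return the
// operator index so that a completion message can name the right operator.
std::size_t runMessage(WorkerMessageSketch message) {
  assert(message.work_order != nullptr);
  message.work_order->execute();
  return message.operator_index;
}
```

Carrying the operator index alongside the WorkOrder lets the receiver report completion for the right operator without inspecting the WorkOrder itself.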

ForemanMessage

Multiple senders are possible for this message. There are multiple types of ForemanMessages, each of which indicates the purpose of the message.

Worker -> Foreman : ForemanMessages of types WorkOrderCompletion and RebuildCompletion are sent after a Worker finishes executing the respective type of WorkOrder. These messages help the Foreman track the progress of individual operators as well as of the whole query.

Some relational operators and InsertDestination -> Foreman : ForemanMessage of types DataPipeline and WorkOrdersAvailable. InsertDestination first determines when an output block of a relational operator gets full. Once a block is full, it streams the unique block ID of the filled block along with the index of the relational operator that produced the block to Foreman with the message type DataPipeline. Some operators which modify the block in place also send similar messages to Foreman.

FeedbackMessage

This message is sent from Workers to the Foreman during a WorkOrder execution.

In certain operators, e.g. TextScan (used for bulk loading data from text files) and Sort, the relational operator and its WorkOrders communicate with each other. In such cases, while a WorkOrder is executing on a Worker thread, a FeedbackMessage is sent from the WorkOrder via the Worker to the Foreman. The Foreman relays this message to the relational operator that produced the sending WorkOrder. The relational operator uses this message to update its internal state, which may allow it to generate new WorkOrders.

PoisonMessage

This message is used to terminate a thread (i.e., Foreman and Worker), typically when shutting down the Quickstep process.

How does the Foreman react after receiving various messages?

WorkOrder completion message

  • Update the book-keeping of pending WorkOrders per Worker and per operator.
  • Fetch new WorkOrders, if available, for the operator whose WorkOrder was just executed.
  • Update the state of an operator - the possible options are:
    • Normal WorkOrders are still under execution
    • All normal WorkOrders have finished execution and rebuild WorkOrders are yet to be generated.
    • All normal WorkOrders have finished execution, rebuild WorkOrders have been generated and issued to Workers.
    • All normal and rebuild WorkOrders have been executed AND all the dependency operators for the given operator have finished execution, therefore the given operator has finished its execution.
  • Fetch the WorkOrders from the dependents of the given operator.
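
The four operator states listed above can be read as a small classification over per-operator counters. The sketch below assumes that an operator may keep producing normal WorkOrders until its dependencies have finished; `OperatorState`, `OperatorProgress` and `classify` are hypothetical names, not Quickstep's.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical names for the four states described in the list above.
enum class OperatorState {
  kNormalWorkOrdersRunning,
  kAwaitingRebuildGeneration,
  kRebuildWorkOrdersIssued,
  kFinished
};

// Hypothetical per-operator book-keeping kept by the Foreman.
struct OperatorProgress {
  std::size_t pending_normal = 0;       // Normal WorkOrders issued, not yet completed.
  std::size_t pending_rebuild = 0;      // Rebuild WorkOrders issued, not yet completed.
  bool done_generating_normal = false;  // Operator has no more normal WorkOrders.
  bool rebuild_generated = false;       // Rebuild WorkOrders have been created.
  bool dependencies_finished = false;   // All dependency operators have finished.
};

OperatorState classify(const OperatorProgress &p) {
  // Rebuild WorkOrders cannot be pending before they have been generated.
  assert(p.rebuild_generated || p.pending_rebuild == 0);

  if (!p.dependencies_finished || !p.done_generating_normal ||
      p.pending_normal > 0) {
    return OperatorState::kNormalWorkOrdersRunning;
  }
  if (!p.rebuild_generated) {
    return OperatorState::kAwaitingRebuildGeneration;
  }
  if (p.pending_rebuild > 0) {
    return OperatorState::kRebuildWorkOrdersIssued;
  }
  return OperatorState::kFinished;
}
```

Under this reading, the Foreman would re-evaluate the state each time a completion message updates the counters, and fetch WorkOrders from the dependents once the result is `kFinished`.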

Rebuild WorkOrder completion message

  • Update the book-keeping of pending WorkOrders per Worker and per operator.
  • If all the rebuild WorkOrders have finished their execution, try to fetch the WorkOrders of the dependent operators of the operator whose rebuild WorkOrder was just executed.

Data pipeline message

  • Find the consumer operators (i.e. operators which have a non pipeline-breaking link) of the producer operator.
  • Stream the block ID to the eligible consumer operators.
  • Fetch new WorkOrders from these consumer operators which may have become available because of the streaming of data.
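
The three steps above can be sketched as a single handler. All names here (`BlockId`, `PipelineEdge`, `ScheduledWork`, `ConsumerOperator`, `handleDataPipeline`) are hypothetical, and the consumer is assumed, for simplicity, to produce exactly one unit of work per streamed block.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using BlockId = std::uint64_t;  // Hypothetical alias for a block identifier.

// Hypothetical DAG edge; pipeline_breaking == false means blocks may be streamed.
struct PipelineEdge {
  std::size_t producer;
  std::size_t consumer;
  bool pipeline_breaking;
};

// Hypothetical unit of schedulable work: "run this operator on this block".
struct ScheduledWork {
  std::size_t operator_index;
  BlockId block;
};

// Hypothetical consumer that emits one unit of work per streamed block.
class ConsumerOperator {
 public:
  void feedInputBlock(BlockId block) { unscheduled_.push_back(block); }

  // Hands out the blocks fed since the last call and forgets them.
  std::vector<BlockId> takeNewWork() {
    std::vector<BlockId> work;
    work.swap(unscheduled_);
    return work;
  }

 private:
  std::vector<BlockId> unscheduled_;
};

std::vector<ScheduledWork> handleDataPipeline(
    std::size_t producer, BlockId block,
    const std::vector<PipelineEdge> &edges,
    std::vector<ConsumerOperator> *operators) {
  assert(operators != nullptr);
  std::vector<ScheduledWork> scheduled;
  for (const PipelineEdge &edge : edges) {
    // Step 1: only consumers linked by a non pipeline-breaking edge qualify.
    if (edge.producer != producer || edge.pipeline_breaking) continue;
    ConsumerOperator &consumer = operators->at(edge.consumer);
    // Step 2: stream the block ID to the consumer.
    consumer.feedInputBlock(block);
    // Step 3: fetch whatever work became available as a result.
    for (BlockId ready : consumer.takeNewWork()) {
      scheduled.push_back(ScheduledWork{edge.consumer, ready});
    }
  }
  return scheduled;
}
```

Consumers behind a pipeline-breaking edge are skipped entirely, so they never see the block ID through this path.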

WorkOrder available message

  • Fetch new WorkOrders that may have become available.

Feedback message

  • Relay the feedback message to a specified relational operator. The recipient operator is specified in the header of the message.
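
The relay step amounts to a lookup by the operator index carried in the message header. A minimal sketch, assuming the header holds only that index and the payload is an opaque string; `FeedbackMessageSketch`, `OperatorSketch` and `relayFeedback` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical feedback message: a header naming the recipient plus a payload.
struct FeedbackMessageSketch {
  std::size_t operator_index;  // Header: index of the recipient operator.
  std::string payload;         // Opaque to the Foreman.
};

// Hypothetical operator that simply records the feedback it receives.
class OperatorSketch {
 public:
  void receiveFeedbackMessage(const FeedbackMessageSketch &message) {
    received_.push_back(message.payload);
  }

  const std::vector<std::string> &received() const { return received_; }

 private:
  std::vector<std::string> received_;
};

// The Foreman does not interpret the payload; it only routes the message.
void relayFeedback(const FeedbackMessageSketch &message,
                   std::vector<OperatorSketch> *operators) {
  assert(operators != nullptr);
  assert(message.operator_index < operators->size());
  (*operators)[message.operator_index].receiveFeedbackMessage(message);
}
```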

Example

We look at a sample query to better describe the flow of messages -

SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20;

This is an equi-join query which can be implemented using a hash join. We assume that S is the larger relation and that the build relation is the output of the selection on R.

The query execution plan involves the following operators:

  • SelectOperator to filter R based on the predicate R.c < 20 (we call the output R')
  • BuildHashOperator to construct a hash table on R'
  • HashJoinOperator to probe the hash table, where the probe relation is S
  • DestroyHashTableOperator to destroy the hash table after the join is done
  • Multiple DropTableOperators to destroy the temporary relations produced as output.

R has two blocks with IDs 1 and 2. S has two blocks with IDs 3 and 4. We assume that the SelectOperator produces one filled block and one partially filled block as output. Note that in the query plan DAG, the link between SelectOperator and BuildHashOperator allows streaming of data. The HashJoinOperator's WorkOrders can't be generated until all of the BuildHashOperator's WorkOrders have finished their execution. The execution is assumed to be performed by a single Worker thread.

The following table describes the message exchange that happens during the query execution. We primarily focus on three operators: Select, BuildHash and HashJoin (probe).

| Sender | Receiver | Message | Message Description |
|---|---|---|---|
| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 1. |
| InsertDestination | Foreman | ForemanMessage of type kDataPipeline | SelectWorkOrder on block 1 produced one fully filled block as output. The output block ID is pipelined from the InsertDestination to Foreman. Foreman relays this block ID to BuildHashOperator, which generates a WorkOrder which is ready to be scheduled. |
| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 1 completed. |
| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 2. |
| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 2 completed. As a result of this execution, a partially filled block of output was produced. |
| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the fully filled block of R'. |
| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. |
| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the partially filled block of R'. |
| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. |
| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 3 from S. |
| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. |
| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 4 from S. |
| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. |