Branch data Line data Source code
1 : : #pragma once 2 : : 3 : : #include <mutable/catalog/Scheduler.hpp> 4 : : #include <condition_variable> 5 : : #include <list> 6 : : #include <future> 7 : : #include <thread> 8 : : 9 : : 10 : : namespace m { 11 : : 12 : : /** This class implements a Scheduler that executes all incoming `DatabaseCommand`s serially. 13 : : * This means that there is no concurrent execution. 14 : : * Therefore, a consistent, serial execution of multiple queries is guaranteed. */ 15 : : struct SerialScheduler : Scheduler 16 : : { 17 : : private: 18 : : /** A thread-safe query plan queue. */ 19 : : struct CommandQueue 20 : : { 21 : : private: 22 : : std::list<queued_command> command_list_; 23 : : Transaction *running_transaction_; ///< the currently running transaction. Only commands by this transaction are returned. 24 : : std::mutex mutex_; 25 : : std::condition_variable has_element_; 26 : 1 : bool closed_ = false; 27 : : 28 : : public: 29 : 2 : CommandQueue() = default; 30 : 1 : ~CommandQueue() = default; 31 : : 32 : : ///> returns the next queued `ast::Command`. Returns `std::nullopt` if the queue is closed. 33 : : std::optional<queued_command> pop(); 34 : : /** Inserts the command into the queue. Internally the position in the `command_list_` is determined by the 35 : : * start time of `Transaction t`. A smaller start time is inserted before a larger start time and elements 36 : : * with the same start time are ordered in FIFO order by the time of arrival. */ 37 : : void push(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag, std::promise<bool> promise); 38 : : void close(); ///< empties and closes the queue without executing the remaining `ast::Command`s. 39 : : bool is_closed(); ///< signals waiting threads that no more elements will be pushed 40 : : void stop_transaction(Transaction &t); ///< Marks `t` as no longer running. 41 : : }; 42 : : 43 : : static CommandQueue query_queue_; ///< instance of our thread-safe query queue that stores all incoming plans. 44 : : std::thread schedule_thread_; ///< the worker thread that executes all incoming queries. 45 : : 46 : : static std::atomic<int64_t> next_start_time; ///< stores the next transaction start time 47 : : 48 : : public: 49 : 1 : SerialScheduler() = default; 50 : : ~SerialScheduler(); 51 : : 52 : : std::future<bool> schedule_command(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag) override; 53 : : 54 : : std::unique_ptr<Transaction> begin_transaction() override; 55 : : 56 : : bool commit(std::unique_ptr<Transaction> t) override; 57 : : 58 : : bool abort(std::unique_ptr<Transaction> t) override; 59 : : 60 : : private: 61 : : /** The method run by the worker thread `schedule_thread_`. 62 : : * While stopping, the query that is already being executed will complete its execution 63 : : * but queued queries will not be executed. */ 64 : : static void schedule_thread(); 65 : : }; 66 : : 67 : : }