LCOV - code coverage report
Current view: top level - src/catalog - SerialScheduler.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 12 105 11.4 %
Date: 2025-03-25 01:19:55 Functions: 5 18 27.8 %
Branches: 3 123 2.4 %

           Branch data     Line data    Source code
       1                 :            : #include "catalog/SerialScheduler.hpp"
       2                 :            : #include "parse/Sema.hpp"
       3                 :            : #include <mutable/mutable.hpp>
       4                 :            : 
       5                 :            : 
       6                 :            : using namespace m;
       7                 :            : 
       8                 :            : 
       9                 :          0 : std::optional<m::Scheduler::queued_command> SerialScheduler::CommandQueue::pop()
      10                 :            : {
      11                 :          0 :     std::unique_lock<std::mutex> lock(mutex_);
      12         [ #  # ]:          0 :     has_element_.wait(lock, []{
      13                 :            :         // always wake up if the queue is closed
      14         [ #  # ]:          0 :         if (query_queue_.closed_) [[unlikely]]
      15                 :          0 :             return true;
      16                 :            :         // keep waiting if the queue is empty but not closed
      17         [ #  # ]:          0 :         else if (query_queue_.command_list_.empty())
      18                 :          0 :             return false;
      19                 :            :         // wake up if there is no running_transaction_ or if the next command is from the running_transaction_
      20   [ #  #  #  # ]:          0 :         else if (not query_queue_.running_transaction_ or std::get<0>(query_queue_.command_list_.front()) == *query_queue_.running_transaction_)
      21                 :          0 :             return true;
      22                 :            :         // otherwise keep waiting
      23                 :            :         else
      24                 :          0 :             return false;
      25                 :          0 :     });
      26                 :            :     // if the queue is still empty here the queue should be closed.
      27         [ #  # ]:          0 :     if (command_list_.empty()) [[unlikely]] return std::nullopt;
      28                 :            : 
      29                 :          0 :     queued_command res = std::move(command_list_.front());
      30                 :          0 :     command_list_.pop_front();
      31                 :            : 
      32                 :            :     // if there is currently no running transaction, set the next transaction in the queue as the running transaction
      33         [ #  # ]:          0 :     if (not running_transaction_)
      34                 :          0 :         running_transaction_ = &std::get<0>(res);
      35                 :            : 
      36                 :          0 :     return {std::move(res)};
      37                 :          0 : }
      38                 :            : 
      39                 :          0 : void SerialScheduler::CommandQueue::push(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag, std::promise<bool> promise)
      40                 :            : {
      41                 :          0 :     std::unique_lock<std::mutex> lock(mutex_);
      42         [ #  # ]:          0 :     if (closed_) {
      43                 :            :         /* Since the command queue is closed, no more command will be executed
      44                 :            :          * => set the promise of this newly pushed command to false right away */
      45         [ #  # ]:          0 :         promise.set_value(false);
      46                 :          0 :         return;
      47                 :            :     }
      48                 :            : 
      49                 :            :     /* Check if transaction `t` already has elements in the queue, if not then check if the transaction is the currently
      50                 :            :      * running transaction. If yes, then emplace the command in the front, otherwise emplace at the end. */
      51   [ #  #  #  # ]:          0 :     auto it = std::find_if(command_list_.rend(), command_list_.rbegin(),
      52                 :          0 :             [&t](queued_command &x){ return (t == std::get<0>(x)); }
      53                 :            :     );
      54   [ #  #  #  #  :          0 :     if (running_transaction_ and *running_transaction_ == t and it == command_list_.rend())
          #  #  #  #  #  
                      # ]
      55         [ #  # ]:          0 :         command_list_.emplace_front(t, std::move(command), diag, std::move(promise));
      56   [ #  #  #  # ]:          0 :     else if (it == command_list_.rbegin())
      57         [ #  # ]:          0 :         command_list_.emplace_back(t, std::move(command), diag, std::move(promise));
      58                 :            :     else
      59         [ #  # ]:          0 :         command_list_.emplace(++it.base(), t, std::move(command), diag, std::move(promise));
      60                 :            : 
      61         [ #  # ]:          0 :     lock.unlock();
      62                 :          0 :     has_element_.notify_one();
      63         [ #  # ]:          0 : }
      64                 :            : 
      65                 :          0 : void SerialScheduler::CommandQueue::close()
      66                 :            : {
      67                 :          0 :     std::unique_lock<std::mutex> lock(mutex_);
      68                 :          0 :     closed_ = true;
      69         [ #  # ]:          0 :     while (not command_list_.empty()) {
      70         [ #  # ]:          0 :         std::get<3>(command_list_.front()).set_value(false);
      71                 :          0 :         command_list_.pop_front();
      72                 :            :     }
      73         [ #  # ]:          0 :     lock.unlock();
      74                 :          1 :     has_element_.notify_all();
      75                 :          0 : }
      76                 :            : 
      77                 :          0 : bool SerialScheduler::CommandQueue::is_closed()
      78                 :            : {
      79                 :          0 :     std::lock_guard<std::mutex> lock(mutex_);
      80                 :          0 :     return query_queue_.closed_;
      81                 :          0 : }
      82                 :            : 
      83                 :          0 : void SerialScheduler::CommandQueue::stop_transaction(Transaction &t) {
      84                 :          0 :     std::unique_lock<std::mutex> lock(mutex_);
      85         [ #  # ]:          0 :     M_insist(&t == running_transaction_);
      86                 :          0 :     running_transaction_ = nullptr;
      87         [ #  # ]:          0 :     lock.unlock();
      88                 :          0 :     has_element_.notify_one();
      89                 :          0 : }
      90                 :            : 
      91                 :          1 : SerialScheduler::CommandQueue SerialScheduler::query_queue_;
      92                 :            : std::atomic<int64_t> SerialScheduler::next_start_time = 0;
      93                 :            : 
      94                 :          2 : SerialScheduler::~SerialScheduler()
      95                 :          2 : {
      96         [ +  - ]:          1 :     if (schedule_thread_.joinable()) {
      97         [ #  # ]:          0 :         query_queue_.close();
      98         [ #  # ]:          0 :         schedule_thread_.join();
      99                 :          0 :     }
     100                 :          2 : }
     101                 :            : 
     102                 :          0 : std::future<bool> SerialScheduler::schedule_command(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag)
     103                 :            : {
     104                 :          0 :     std::promise<bool> execution_completed;
     105         [ #  # ]:          0 :     auto execution_completed_future = execution_completed.get_future();
     106         [ #  # ]:          0 :     query_queue_.push(t, std::move(command), diag, std::move(execution_completed));
     107                 :            : 
     108         [ #  # ]:          0 :     if (not schedule_thread_.joinable()) [[unlikely]]
     109                 :            :         // Creating the worker thread not here but in the constructor of `SerialScheduler` causes deadlocks.
     110         [ #  # ]:          0 :         schedule_thread_ = std::thread(schedule_thread);
     111                 :          0 :     return execution_completed_future;
     112         [ #  # ]:          0 : }
     113                 :            : 
     114                 :          0 : std::unique_ptr<SerialScheduler::Transaction> SerialScheduler::begin_transaction() {
     115                 :          0 :     return std::make_unique<SerialScheduler::Transaction>();
     116                 :            : }
     117                 :            : 
     118                 :          0 : bool SerialScheduler::commit(std::unique_ptr<SerialScheduler::Transaction> t) {
     119                 :            :     /* TODO: When autocommit is not used as the default anymore, the transaction must check for conflicts with
     120                 :            :      * other transactions that were introduced in the time between when this transaction executed statements and now. */
     121                 :          0 :     query_queue_.stop_transaction(*t);
     122                 :          0 :     return true;
     123                 :            : }
     124                 :            : 
     125                 :          0 : bool SerialScheduler::abort(std::unique_ptr<SerialScheduler::Transaction> t) {
     126                 :            :     /* TODO: Undo changes of transaction */
     127                 :          0 :     query_queue_.stop_transaction(*t);
     128                 :          0 :     return true;
     129                 :            : }
     130                 :            : 
     131                 :          0 : void SerialScheduler::schedule_thread()
     132                 :            : {
     133                 :          0 :     Catalog &C = Catalog::Get();
     134         [ #  # ]:          0 :     while (not query_queue_.is_closed()) {
     135                 :          0 :         auto ret = query_queue_.pop();
     136                 :            :         // pop() should only return no value if the queue is closed
     137         [ #  # ]:          0 :         if (not ret.has_value()) continue;
     138                 :            : 
     139         [ #  # ]:          0 :         auto [t, ast, diag, promise] = std::move(ret.value());
     140                 :            : 
     141                 :            :         // check if transaction has a start_time, set one if not. -1 represents an undefined value.
     142   [ #  #  #  #  :          0 :         if (t.start_time() == -1) t.start_time(next_start_time++);
                   #  # ]
     143                 :            :         /* TODO: Implement overflow handling: if the transaction timestamps overflow, then outdated versions of tuples
     144                 :            :          * can become visible and other weird behaviour can occur. For this to happen, (2^63)-1 Transactions need to
     145                 :            :          * run without every restarting mutable. */
     146   [ #  #  #  # ]:          0 :         if (next_start_time < 0) [[unlikely]] M_unreachable("Transaction timestamp overflow");
     147                 :            : 
     148         [ #  # ]:          0 :         ast::Sema sema(diag);
     149         [ #  # ]:          0 :         bool err = diag.num_errors() > 0; // parser errors
     150                 :            : 
     151         [ #  # ]:          0 :         diag.clear();
     152         [ #  # ]:          0 :         auto cmd = sema.analyze(std::move(ast));
     153         [ #  # ]:          0 :         err |= diag.num_errors() > 0; // sema errors
     154                 :            : 
     155         [ #  # ]:          0 :         M_insist(not err == bool(cmd), "when there are no errors, Sema must have returned a command");
     156   [ #  #  #  # ]:          0 :         if (not err and cmd) {
     157   [ #  #  #  # ]:          0 :             cmd->transaction(&t);
     158   [ #  #  #  # ]:          0 :             cmd->execute(diag);
     159         [ #  # ]:          0 :             promise.set_value(true);
     160                 :          0 :             continue;
     161                 :            :         }
     162         [ #  # ]:          0 :         promise.set_value(false);
     163      [ #  #  # ]:          0 :     }
     164                 :          0 : }
     165                 :            : 
     166                 :            : __attribute__((constructor(202)))
     167                 :          1 : static void register_scheduler()
     168                 :            : {
     169                 :          1 :     Catalog &C = Catalog::Get();
     170         [ -  + ]:          1 :     C.register_scheduler(
     171                 :          1 :         C.pool("SerialScheduler"),
     172         [ +  - ]:          1 :         std::make_unique<SerialScheduler>(),
     173                 :            :         "executes all incoming queries serially"
     174                 :            :     );
     175                 :          1 : }

Generated by: LCOV version 1.16