Branch data Line data Source code
1 : : #include "catalog/SerialScheduler.hpp" 2 : : #include "parse/Sema.hpp" 3 : : #include <mutable/mutable.hpp> 4 : : 5 : : 6 : : using namespace m; 7 : : 8 : : 9 : 0 : std::optional<m::Scheduler::queued_command> SerialScheduler::CommandQueue::pop() 10 : : { 11 : 0 : std::unique_lock<std::mutex> lock(mutex_); 12 [ # # ]: 0 : has_element_.wait(lock, []{ 13 : : // always wake up if the queue is closed 14 [ # # ]: 0 : if (query_queue_.closed_) [[unlikely]] 15 : 0 : return true; 16 : : // keep waiting if the queue is empty but not closed 17 [ # # ]: 0 : else if (query_queue_.command_list_.empty()) 18 : 0 : return false; 19 : : // wake up if there is no running_transaction_ or if the next command is from the running_transaction_ 20 [ # # # # ]: 0 : else if (not query_queue_.running_transaction_ or std::get<0>(query_queue_.command_list_.front()) == *query_queue_.running_transaction_) 21 : 0 : return true; 22 : : // otherwise keep waiting 23 : : else 24 : 0 : return false; 25 : 0 : }); 26 : : // if the queue is still empty here the queue should be closed. 27 [ # # ]: 0 : if (command_list_.empty()) [[unlikely]] return std::nullopt; 28 : : 29 : 0 : queued_command res = std::move(command_list_.front()); 30 : 0 : command_list_.pop_front(); 31 : : 32 : : // if there is currently no running transaction, set the next transaction in the queue as the running transaction 33 [ # # ]: 0 : if (not running_transaction_) 34 : 0 : running_transaction_ = &std::get<0>(res); 35 : : 36 : 0 : return {std::move(res)}; 37 : 0 : } 38 : : 39 : 0 : void SerialScheduler::CommandQueue::push(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag, std::promise<bool> promise) 40 : : { 41 : 0 : std::unique_lock<std::mutex> lock(mutex_); 42 [ # # ]: 0 : if (closed_) { 43 : : /* Since the command queue is closed, no more command will be executed 44 : : * => set the promise of this newly pushed command to false right away */ 45 [ # # ]: 0 : promise.set_value(false); 46 : 0 : return; 47 : : } 48 : : 49 : : /* Check if transaction `t` already has elements in the queue, if not then check if the transaction is the currently 50 : : * running transaction. If yes, then emplace the command in the front, otherwise emplace at the end. */ 51 [ # # # # ]: 0 : auto it = std::find_if(command_list_.rend(), command_list_.rbegin(), 52 : 0 : [&t](queued_command &x){ return (t == std::get<0>(x)); } 53 : : ); 54 [ # # # # : 0 : if (running_transaction_ and *running_transaction_ == t and it == command_list_.rend()) # # # # # # ] 55 [ # # ]: 0 : command_list_.emplace_front(t, std::move(command), diag, std::move(promise)); 56 [ # # # # ]: 0 : else if (it == command_list_.rbegin()) 57 [ # # ]: 0 : command_list_.emplace_back(t, std::move(command), diag, std::move(promise)); 58 : : else 59 [ # # ]: 0 : command_list_.emplace(++it.base(), t, std::move(command), diag, std::move(promise)); 60 : : 61 [ # # ]: 0 : lock.unlock(); 62 : 0 : has_element_.notify_one(); 63 [ # # ]: 0 : } 64 : : 65 : 0 : void SerialScheduler::CommandQueue::close() 66 : : { 67 : 0 : std::unique_lock<std::mutex> lock(mutex_); 68 : 0 : closed_ = true; 69 [ # # ]: 0 : while (not command_list_.empty()) { 70 [ # # ]: 0 : std::get<3>(command_list_.front()).set_value(false); 71 : 0 : command_list_.pop_front(); 72 : : } 73 [ # # ]: 0 : lock.unlock(); 74 : 1 : has_element_.notify_all(); 75 : 0 : } 76 : : 77 : 0 : bool SerialScheduler::CommandQueue::is_closed() 78 : : { 79 : 0 : std::lock_guard<std::mutex> lock(mutex_); 80 : 0 : return query_queue_.closed_; 81 : 0 : } 82 : : 83 : 0 : void SerialScheduler::CommandQueue::stop_transaction(Transaction &t) { 84 : 0 : std::unique_lock<std::mutex> lock(mutex_); 85 [ # # ]: 0 : M_insist(&t == running_transaction_); 86 : 0 : running_transaction_ = nullptr; 87 [ # # ]: 0 : lock.unlock(); 88 : 0 : has_element_.notify_one(); 89 : 0 : } 90 : : 91 : 1 : SerialScheduler::CommandQueue SerialScheduler::query_queue_; 92 : : std::atomic<int64_t> SerialScheduler::next_start_time = 0; 93 : : 94 : 2 : SerialScheduler::~SerialScheduler() 95 : 2 : { 96 [ + - ]: 1 : if (schedule_thread_.joinable()) { 97 [ # # ]: 0 : query_queue_.close(); 98 [ # # ]: 0 : schedule_thread_.join(); 99 : 0 : } 100 : 2 : } 101 : : 102 : 0 : std::future<bool> SerialScheduler::schedule_command(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag) 103 : : { 104 : 0 : std::promise<bool> execution_completed; 105 [ # # ]: 0 : auto execution_completed_future = execution_completed.get_future(); 106 [ # # ]: 0 : query_queue_.push(t, std::move(command), diag, std::move(execution_completed)); 107 : : 108 [ # # ]: 0 : if (not schedule_thread_.joinable()) [[unlikely]] 109 : : // Creating the worker thread not here but in the constructor of `SerialScheduler` causes deadlocks. 110 [ # # ]: 0 : schedule_thread_ = std::thread(schedule_thread); 111 : 0 : return execution_completed_future; 112 [ # # ]: 0 : } 113 : : 114 : 0 : std::unique_ptr<SerialScheduler::Transaction> SerialScheduler::begin_transaction() { 115 : 0 : return std::make_unique<SerialScheduler::Transaction>(); 116 : : } 117 : : 118 : 0 : bool SerialScheduler::commit(std::unique_ptr<SerialScheduler::Transaction> t) { 119 : : /* TODO: When autocommit is not used as the default anymore, the transaction must check for conflicts with 120 : : * other transactions that were introduced in the time between when this transaction executed statements and now. */ 121 : 0 : query_queue_.stop_transaction(*t); 122 : 0 : return true; 123 : : } 124 : : 125 : 0 : bool SerialScheduler::abort(std::unique_ptr<SerialScheduler::Transaction> t) { 126 : : /* TODO: Undo changes of transaction */ 127 : 0 : query_queue_.stop_transaction(*t); 128 : 0 : return true; 129 : : } 130 : : 131 : 0 : void SerialScheduler::schedule_thread() 132 : : { 133 : 0 : Catalog &C = Catalog::Get(); 134 [ # # ]: 0 : while (not query_queue_.is_closed()) { 135 : 0 : auto ret = query_queue_.pop(); 136 : : // pop() should only return no value if the queue is closed 137 [ # # ]: 0 : if (not ret.has_value()) continue; 138 : : 139 [ # # ]: 0 : auto [t, ast, diag, promise] = std::move(ret.value()); 140 : : 141 : : // check if transaction has a start_time, set one if not. -1 represents an undefined value. 142 [ # # # # : 0 : if (t.start_time() == -1) t.start_time(next_start_time++); # # ] 143 : : /* TODO: Implement overflow handling: if the transaction timestamps overflow, then outdated versions of tuples 144 : : * can become visible and other weird behaviour can occur. For this to happen, (2^63)-1 Transactions need to 145 : : * run without every restarting mutable. */ 146 [ # # # # ]: 0 : if (next_start_time < 0) [[unlikely]] M_unreachable("Transaction timestamp overflow"); 147 : : 148 [ # # ]: 0 : ast::Sema sema(diag); 149 [ # # ]: 0 : bool err = diag.num_errors() > 0; // parser errors 150 : : 151 [ # # ]: 0 : diag.clear(); 152 [ # # ]: 0 : auto cmd = sema.analyze(std::move(ast)); 153 [ # # ]: 0 : err |= diag.num_errors() > 0; // sema errors 154 : : 155 [ # # ]: 0 : M_insist(not err == bool(cmd), "when there are no errors, Sema must have returned a command"); 156 [ # # # # ]: 0 : if (not err and cmd) { 157 [ # # # # ]: 0 : cmd->transaction(&t); 158 [ # # # # ]: 0 : cmd->execute(diag); 159 [ # # ]: 0 : promise.set_value(true); 160 : 0 : continue; 161 : : } 162 [ # # ]: 0 : promise.set_value(false); 163 [ # # # ]: 0 : } 164 : 0 : } 165 : : 166 : : __attribute__((constructor(202))) 167 : 1 : static void register_scheduler() 168 : : { 169 : 1 : Catalog &C = Catalog::Get(); 170 [ - + ]: 1 : C.register_scheduler( 171 : 1 : C.pool("SerialScheduler"), 172 [ + - ]: 1 : std::make_unique<SerialScheduler>(), 173 : : "executes all incoming queries serially" 174 : : ); 175 : 1 : }