Branch data Line data Source code
1 : : #include <mutable/catalog/DatabaseCommand.hpp>
2 : :
3 : : #include "backend/StackMachine.hpp"
4 : : #include <mutable/catalog/Catalog.hpp>
5 : : #include <mutable/catalog/Schema.hpp>
6 : : #include <mutable/IR/Optimizer.hpp>
7 : : #include <mutable/mutable.hpp>
8 : : #include <mutable/Options.hpp>
9 : : #include <mutable/storage/Index.hpp>
10 : : #include <mutable/util/DotTool.hpp>
11 : :
12 : :
13 : : using namespace m;
14 : :
15 : :
16 : 0 : void EmptyCommand::execute(Diagnostic &diag) { /* Nothing to be done. */ }
17 : :
18 : :
19 : : /*======================================================================================================================
20 : : * Instructions
21 : : *====================================================================================================================*/
22 : :
23 : 0 : void learn_spns::execute(Diagnostic &diag)
24 : : {
25 : 0 : auto &C = Catalog::Get();
26 [ # # ]: 0 : if (not C.has_database_in_use()) { diag.err() << "No database selected.\n"; return; }
27 : :
28 : 0 : auto &DB = C.get_database_in_use();
29 [ # # ]: 0 : if (DB.size() == 0) { diag.err() << "There are no tables in the database.\n"; return; }
30 : :
31 [ # # # # ]: 0 : auto CE = C.create_cardinality_estimator(C.pool("Spn"), DB.name);
32 [ # # ]: 0 : auto spn_estimator = cast<SpnEstimator>(CE.get());
33 [ # # ]: 0 : spn_estimator->learn_spns();
34 [ # # ]: 0 : DB.cardinality_estimator(std::move(CE));
35 : :
36 [ # # # # : 0 : if (not Options::Get().quiet) { diag.out() << "Learned SPN on every table in " << DB.name << ".\n"; }
# # # # #
# # # ]
37 : 0 : }
38 : :
39 : : __attribute__((constructor(201)))
40 : 1 : static void register_instructions()
41 : : {
42 : 1 : Catalog &C = Catalog::Get();
43 : : #define REGISTER(NAME, DESCRIPTION) \
44 : : C.register_instruction<NAME>(C.pool(#NAME), DESCRIPTION)
45 [ + - ]: 1 : REGISTER(learn_spns, "create an SPN for every table in the database");
46 : : #undef REGISTER
47 : 1 : }
48 : :
49 : : /*======================================================================================================================
50 : : * Data Manipulation Language (DML)
51 : : *====================================================================================================================*/
52 : :
53 : 0 : void QueryDatabase::execute(Diagnostic &diag)
54 : : {
55 : 0 : Catalog &C = Catalog::Get();
56 : :
57 [ # # ]: 0 : if (auto stmt = cast<ast::Stmt>(&ast())) {
58 [ # # ]: 0 : if (Options::Get().ast)
59 : 0 : stmt->dump(diag.out());
60 [ # # ]: 0 : if (Options::Get().astdot) {
61 : 0 : DotTool dot(diag);
62 [ # # # # ]: 0 : stmt->dot(dot.stream());
63 [ # # ]: 0 : dot.show("ast", false, "dot");
64 : 0 : }
65 : 0 : }
66 : :
67 [ # # # # ]: 0 : auto graph_construction = C.timer().create_timing("Construct the query graph");
68 [ # # # # ]: 0 : graph_ = QueryGraph::Build(ast<ast::SelectStmt>());
69 [ # # # # ]: 0 : graph_->transaction(this->transaction());
70 [ # # # # : 0 : for (auto &pre_opt : C.pre_optimizations())
# # # # ]
71 [ # # # # ]: 0 : (*pre_opt.second).operator()(*graph_);
72 [ # # ]: 0 : graph_construction.stop();
73 : :
74 [ # # # # ]: 1 : if (Options::Get().graph)
75 [ # # ]: 0 : graph_->dump(std::cout);
76 [ # # # # ]: 0 : if (Options::Get().graphdot) {
77 [ # # ]: 0 : DotTool dot(diag);
78 [ # # # # ]: 0 : graph_->dot(dot.stream());
79 [ # # ]: 0 : dot.show("graph", false, "fdp");
80 : 0 : }
81 [ # # # # ]: 0 : if (Options::Get().graph2sql) {
82 [ # # ]: 0 : graph_->sql(std::cout);
83 [ # # ]: 0 : std::cout.flush();
84 : 0 : }
85 : :
86 [ # # # # : 0 : auto logical_plan_computation = C.timer().create_timing("Compute the logical query plan");
# # ]
87 [ # # # # : 0 : Optimizer Opt(C.plan_enumerator(), C.cost_function());
# # ]
88 [ # # ]: 0 : std::unique_ptr<Producer> producer = Opt(*graph_);
89 [ # # # # : 0 : for (auto &post_opt : C.logical_post_optimizations())
# # # # ]
90 [ # # # # ]: 0 : producer = (*post_opt.second).operator()(std::move(producer));
91 [ # # ]: 0 : logical_plan_computation.stop();
92 [ # # ]: 0 : M_insist(bool(producer), "logical plan must have been computed");
93 : :
94 [ # # # # ]: 0 : if (Options::Get().plan)
95 [ # # # # ]: 0 : producer->dump(diag.out());
96 [ # # # # ]: 0 : if (Options::Get().plandot) {
97 [ # # ]: 0 : DotTool dot(diag);
98 [ # # # # ]: 0 : producer->dot(dot.stream());
99 [ # # ]: 0 : dot.show("logical_plan", false, "dot");
100 : 0 : }
101 : :
102 [ # # # # ]: 0 : if (Options::Get().benchmark)
103 [ # # ]: 0 : logical_plan_ = std::make_unique<NoOpOperator>(std::cout);
104 : : else
105 [ # # ]: 0 : logical_plan_ = std::make_unique<PrintOperator>(std::cout);
106 [ # # ]: 0 : logical_plan_->add_child(producer.release());
107 : :
108 [ # # ]: 0 : static thread_local std::unique_ptr<Backend> backend;
109 [ # # ]: 0 : if (not backend)
110 [ # # # # : 0 : backend = M_TIME_EXPR(C.create_backend(), "Create backend", C.timer());
# # # # ]
111 : :
112 [ # # # # : 0 : auto physical_plan_computation = C.timer().create_timing("Compute the physical query plan");
# # ]
113 [ # # ]: 0 : PhysicalOptimizerImpl<ConcretePhysicalPlanTable> PhysOpt;
114 [ # # ]: 0 : backend->register_operators(PhysOpt);
115 [ # # ]: 0 : PhysOpt.cover(*logical_plan_);
116 [ # # ]: 0 : physical_plan_ = PhysOpt.extract_plan();
117 [ # # # # : 0 : for (auto &post_opt : C.physical_post_optimizations())
# # # # ]
118 [ # # # # ]: 0 : physical_plan_ = (*post_opt.second).operator()(std::move(physical_plan_));
119 [ # # ]: 0 : physical_plan_computation.stop();
120 : :
121 [ # # # # ]: 0 : if (Options::Get().physplan)
122 [ # # ]: 0 : physical_plan_->dump(std::cout);
123 : :
124 [ # # # # ]: 0 : if (not Options::Get().dryrun)
125 [ # # # # : 0 : M_TIME_EXPR(backend->execute(*physical_plan_), "Execute query", C.timer());
# # # # ]
126 : 0 : }
127 : :
128 : 0 : void InsertRecords::execute(Diagnostic&)
129 : : {
130 : 0 : Catalog &C = Catalog::Get();
131 : 0 : auto &DB = C.get_database_in_use();
132 : :
133 : 0 : auto &I = ast<ast::InsertStmt>();
134 [ # # ]: 0 : auto &T = DB.get_table(I.table_name.text.assert_not_none());
135 : 0 : auto &store = T.store();
136 : 0 : StoreWriter W(store);
137 [ # # ]: 0 : auto &S = W.schema();
138 [ # # ]: 0 : Tuple tup(S);
139 : :
140 : : /* Find timestamp attributes */
141 [ # # # # : 0 : auto ts_begin = std::find_if(T.cbegin_hidden(), T.end_hidden(),
# # ]
142 : 0 : [&](const Attribute & attr) {
143 [ # # ]: 0 : return attr.name == C.pool("$ts_begin");
144 : 0 : });
145 [ # # # # : 0 : auto ts_end = std::find_if(T.cbegin_hidden(), T.end_hidden(),
# # ]
146 : 0 : [&](const Attribute & attr) {
147 [ # # ]: 0 : return attr.name == C.pool("$ts_end");
148 : 0 : });
149 : :
150 : : /* Write all tuples to the store. */
151 [ # # ]: 0 : for (auto &t : I.tuples) {
152 [ # # ]: 0 : StackMachine get_tuple(Schema{});
153 [ # # ]: 0 : for (std::size_t i = 0; i != t.size(); ++i) {
154 [ # # ]: 0 : auto attr_id = T.convert_id(i); // hidden attributes change the actual id of the attribute
155 : 0 : auto &v = t[i];
156 [ # # # # ]: 0 : switch (v.first) {
157 : : case ast::InsertStmt::I_Null:
158 [ # # ]: 0 : get_tuple.emit_St_Tup_Null(0, i);
159 : 0 : break;
160 : :
161 : : case ast::InsertStmt::I_Default:
162 : : /* nothing to be done, Tuples are initialized to default values */
163 : 0 : break;
164 : :
165 : : case ast::InsertStmt::I_Expr:
166 [ # # ]: 0 : get_tuple.emit(*v.second);
167 [ # # # # : 0 : get_tuple.emit_Cast(S[attr_id].type, v.second->type());
# # ]
168 [ # # # # ]: 0 : get_tuple.emit_St_Tup(0, attr_id, S[attr_id].type);
169 : 0 : break;
170 : : }
171 : 0 : }
172 : 0 : Tuple *args[] = { &tup };
173 [ # # ]: 0 : get_tuple(args);
174 : :
175 : : /*----- set timestamps if available. -----*/
176 [ # # # # : 0 : if (ts_begin != T.end_hidden()) {
# # ]
177 [ # # # # : 0 : tup.set(ts_begin->id, Value(transaction()->start_time()));
# # # # #
# ]
178 : : /* Set $ts_end to -1. It is a special value representing infinity. */
179 [ # # # # : 0 : M_insist(ts_end != T.end_hidden());
# # ]
180 [ # # # # : 0 : tup.set(ts_end->id, Value(-1));
# # ]
181 : 0 : }
182 : :
183 [ # # ]: 0 : W.append(tup);
184 : 0 : }
185 : : /* Invalidate all indexes on the table. */
186 [ # # # # ]: 0 : DB.invalidate_indexes(T.name());
187 : 0 : }
188 : :
189 : 0 : void UpdateRecords::execute(Diagnostic&)
190 : : {
191 : 0 : M_unreachable("not yet implemented");
192 : : }
193 : :
194 : 0 : void DeleteRecords::execute(Diagnostic&)
195 : : {
196 : 0 : M_unreachable("not yet implemented");
197 : : }
198 : :
199 : 0 : void ImportDSV::execute(Diagnostic &diag)
200 : : {
201 : 0 : Catalog &C = Catalog::Get();
202 : : try {
203 [ # # # # ]: 0 : DSVReader R(table_, cfg_, diag, transaction());
204 : :
205 : 0 : errno = 0;
206 [ # # ]: 0 : std::ifstream file(path_);
207 [ # # # # ]: 0 : if (not file) {
208 : 0 : const auto errsv = errno;
209 [ # # # # : 0 : diag.err() << "Could not open file " << path_;
# # ]
210 [ # # ]: 0 : if (errsv)
211 [ # # # # : 0 : diag.err() << ": " << strerror(errsv);
# # ]
212 [ # # # # ]: 0 : diag.err() << std::endl;
213 : 0 : } else {
214 [ # # # # : 0 : M_TIME_EXPR(R(file, path_.c_str()), "Read DSV file", C.timer());
# # # # ]
215 : : }
216 [ # # # # ]: 0 : } catch (m::invalid_argument e) {
217 [ # # # # : 0 : diag.err() << "Error reading DSV file: " << e.what() << "\n";
# # # # ]
218 [ # # ]: 0 : }
219 : 0 : }
220 : :
221 : :
222 : : /*======================================================================================================================
223 : : * Data Definition Language
224 : : *====================================================================================================================*/
225 : :
226 : 0 : void CreateDatabase::execute(Diagnostic &diag)
227 : : {
228 : : try {
229 [ # # # # : 0 : Catalog::Get().add_database(db_name_);
# # ]
230 [ # # # # ]: 0 : if (not Options::Get().quiet)
231 [ # # # # : 0 : diag.out() << "Created database " << db_name_ << ".\n";
# # # # ]
232 [ # # ]: 0 : } catch (std::invalid_argument) {
233 [ # # # # : 0 : diag.err() << "Database " << db_name_ << " already exists.\n";
# # # # ]
234 [ # # ]: 0 : }
235 : 0 : }
236 : :
237 : 1 : void DropDatabase::execute(Diagnostic &diag)
238 : : {
239 : : try {
240 [ + - + - ]: 1 : Catalog::Get().drop_database(db_name_);
241 [ + - - + ]: 1 : if (not Options::Get().quiet)
242 [ # # # # : 0 : diag.out() << "Dropped database " << db_name_ << ".\n";
# # # # ]
243 [ # # ]: 1 : } catch (std::invalid_argument) {
244 [ # # # # : 0 : diag.err() << "Database " << db_name_ << " does not exist.\n";
# # # # ]
245 [ # # ]: 0 : }
246 : 1 : }
247 : :
248 : 0 : void UseDatabase::execute(Diagnostic &diag)
249 : : {
250 : 0 : auto &C = Catalog::Get();
251 : : try {
252 [ # # ]: 0 : auto &DB = C.get_database(db_name_);
253 [ # # ]: 0 : C.set_database_in_use(DB);
254 [ # # # # ]: 0 : if (not Options::Get().quiet)
255 [ # # # # : 0 : diag.out() << "Using database " << db_name_ << ".\n";
# # # # ]
256 [ # # ]: 0 : } catch (std::out_of_range) {
257 [ # # # # : 0 : diag.err() << "Database " << db_name_ << " does not exist.\n";
# # # # ]
258 [ # # ]: 0 : }
259 : 0 : }
260 : :
261 : 0 : void CreateTable::execute(Diagnostic &diag)
262 : : {
263 : 0 : auto &C = Catalog::Get();
264 : 0 : auto &DB = C.get_database_in_use();
265 : 0 : ThreadSafePooledString table_name = table_->name();
266 : 0 : Table *table = nullptr;
267 : : try {
268 [ # # ]: 0 : table = &DB.add(std::move(table_));
269 [ # # ]: 0 : } catch (std::invalid_argument) {
270 [ # # # # : 0 : diag.err() << "Table " << table_name << " already exists in database " << DB.name << ".\n";
# # # # #
# # # ]
271 [ # # # # ]: 0 : }
272 : :
273 [ # # # # ]: 0 : table->layout(C.data_layout());
274 [ # # # # ]: 0 : table->store(C.create_store(*table));
275 : :
276 [ # # # # ]: 0 : if (not Options::Get().quiet)
277 [ # # # # : 0 : diag.out() << "Created table " << table->name() << ".\n";
# # # # #
# ]
278 : 0 : }
279 : :
280 : 2 : void DropTable::execute(Diagnostic &diag)
281 : : {
282 : 2 : auto &C = Catalog::Get();
283 : 2 : auto &DB = C.get_database_in_use();
284 : :
285 [ + + ]: 5 : for (auto &table_name : table_names_) {
286 : : try {
287 [ + - ]: 3 : DB.drop_table(table_name);
288 [ + - - + ]: 3 : if (not Options::Get().quiet)
289 [ # # # # : 0 : diag.out() << "Dropped table " << table_name << ".\n";
# # # # ]
290 [ # # ]: 3 : } catch (std::invalid_argument) {
291 [ # # # # : 0 : diag.err() << "Table " << table_name << " does not exist in Database " << DB.name << ".\n";
# # # # #
# # # ]
292 [ # # ]: 0 : }
293 : : }
294 : 2 : }
295 : :
296 : 0 : void CreateIndex::execute(Diagnostic &diag)
297 : : {
298 : 0 : auto &C = Catalog::Get();
299 : 0 : auto &DB = C.get_database_in_use();
300 : 0 : const auto &table = DB.get_table(table_name_);
301 : :
302 : : /* Compute bulkloading schema from attribute name. */
303 : 0 : Schema schema;
304 [ # # # # : 0 : for (auto &entry : table.schema()) {
# # # # ]
305 [ # # # # : 0 : if (entry.id.name == attribute_name_) {
# # ]
306 [ # # # # ]: 0 : schema.add(entry);
307 : 0 : break; // only one-dimensional indexes are supported
308 : : }
309 : : }
310 : :
311 : : /* Bulkload index. */
312 : : try {
313 [ # # # # : 0 : M_TIME_EXPR(index_->bulkload(table, schema), "Bulkload index", C.timer());
# # # # ]
314 [ # # # # ]: 0 : } catch (invalid_argument) {
315 [ # # # # : 0 : diag.err() << "Could not bulkload index." << '\n';
# # ]
316 [ # # # # ]: 0 : }
317 : :
318 : : /* Add index to database. */
319 : : try {
320 [ # # # # ]: 0 : DB.add_index(std::move(index_), table_name_, attribute_name_, index_name_);
321 [ # # # # ]: 0 : if (not Options::Get().quiet)
322 [ # # # # : 0 : diag.out() << "Created index " << index_name_ << ".\n";
# # # # ]
323 [ # # # # ]: 0 : } catch (std::out_of_range) {
324 [ # # # # : 0 : diag.err() << "Table " << table_name_ << " or Attribute " << attribute_name_ << " does not exist in Database "
# # # # #
# # # ]
325 [ # # # # ]: 0 : << DB.name << ".\n";
326 [ # # # # : 0 : } catch (invalid_argument) {
# # ]
327 [ # # # # : 0 : diag.err() << "Index " << index_name_ << " already exists in Database " << DB.name << ".\n";
# # # # #
# # # ]
328 [ # # # # ]: 0 : }
329 : 0 : }
330 : :
331 : 0 : void DropIndex::execute(Diagnostic &diag)
332 : : {
333 : 0 : auto &C = Catalog::Get();
334 : 0 : auto &DB = C.get_database_in_use();
335 : :
336 [ # # ]: 0 : for (auto &index_name : index_names_) {
337 : : try {
338 [ # # ]: 0 : DB.drop_index(index_name);
339 [ # # # # ]: 0 : if (not Options::Get().quiet)
340 [ # # # # : 0 : diag.out() << "Dropped index " << index_name << ".\n";
# # # # ]
341 [ # # # # ]: 0 : } catch (invalid_argument) {
342 [ # # # # : 0 : diag.err() << "Index " << index_name << " does not exist in Database " << DB.name << ".\n";
# # # # #
# # # ]
343 [ # # ]: 0 : }
344 : : }
345 : 0 : }
346 : :
347 : : #define ACCEPT(CLASS) \
348 : : void CLASS::accept(DatabaseCommandVisitor &v) { v(*this); } \
349 : : void CLASS::accept(ConstDatabaseCommandVisitor &v) const { v(*this); }
350 : 0 : M_DATABASE_COMMAND_LIST(ACCEPT)
351 : : #undef ACCEPT
|