Branch data Line data Source code
1 : : #include "backend/Interpreter.hpp"
2 : :
3 : : #include "util/container/RefCountingHashMap.hpp"
4 : : #include <algorithm>
5 : : #include <cerrno>
6 : : #include <cstdlib>
7 : : #include <iterator>
8 : : #include <mutable/catalog/Catalog.hpp>
9 : : #include <mutable/Options.hpp>
10 : : #include <mutable/parse/AST.hpp>
11 : : #include <mutable/util/fn.hpp>
12 : : #include <numeric>
13 : : #include <type_traits>
14 : :
15 : :
16 : : using namespace m;
17 : : using namespace m::storage;
18 : :
19 : :
20 : : /*======================================================================================================================
21 : : * Helper function
22 : : *====================================================================================================================*/
23 : :
24 : : /** Compile a `StackMachine` to load or store a tuple of `Schema` `tuple_schema` using a given memory address and a
25 : : * given `DataLayout`.
26 : : *
27 : : * @param tuple_schema the `Schema` of the tuple to load/store, specifying the `Schema::Identifier`s to load/store
28 : : * @param address the memory address of the `Store` we are loading from / storing to
29 : : * @param layout the `DataLayout` of the `Table` we are loading from / storing to
30 : : * @param layout_schema the `Schema` of `layout`, specifying the `Schema::Identifier`s present in `layout`
31 : : * @param row_id the ID of the *first* row to load/store
32 : : * @param tuple_id the ID of the tuple used for loading/storing
33 : : */
34 : : template<bool IsStore>
35 : 882 : static StackMachine compile_data_layout(const Schema &tuple_schema, void *address, const DataLayout &layout,
36 : : const Schema &layout_schema, std::size_t row_id, std::size_t tuple_id)
37 : : {
38 : 882 : StackMachine SM; // the `StackMachine` to compile
39 : :
40 : : struct stride_info_t
41 : : {
42 : : std::size_t counter_id;
43 : : uint64_t num_tuples;
44 : : uint64_t stride_in_bits;
45 : : };
46 : 882 : std::vector<stride_info_t> stride_info_stack; // used to track all strides from root to leaf
47 : :
48 : 882 : struct {
49 : 882 : std::size_t id = -1UL; ///< context id
50 : 882 : std::size_t offset_id = -1UL; ///< id to keep track of current adjustable bit offset in case of a bit stride
51 : : uintptr_t bit_offset; ///< fixed offset, in bits
52 : : uint64_t bit_stride; ///< stride in bits
53 : : uint64_t num_tuples; ///< number of tuples of the linearization in which the null bitmap is stored
54 : : std::size_t row_id; ///< the row id within the linearization in which the null bitmap is stored
55 : :
56 : 10622 : operator bool() { return id != -1UL; }
57 : 4133 : bool adjustable_offset() { return offset_id != -1UL; }
58 : 882 : } null_bitmap_info;
59 : :
60 : 882 : std::unordered_map<std::size_t, std::size_t> leaf2id;
61 : 882 : std::unordered_map<std::size_t, std::size_t> leaf2mask;
62 : :
63 : : /*----- Check whether any of the entries in `tuple_schema` can be NULL, so that we need the NULL bitmap. -----*/
64 [ + - + - ]: 1764 : const bool needs_null_bitmap = [&]() {
65 [ + - + - ]: 882 : for (auto &tuple_entry : tuple_schema) {
66 [ + - + - ]: 882 : if (layout_schema[tuple_entry.id].second.nullable())
67 : 883 : return true; // found an entry in `tuple_schema` that can be NULL according to `layout_schema`
68 : : }
69 : 0 : return false; // no attribute in `tuple_schema` can be NULL according to `layout_schema`
70 : 882 : }();
71 : :
72 : : /* Compute location of NULL bitmap. */
73 : 882 : const auto null_bitmap_idx = layout_schema.num_entries();
74 : 1765 : auto find_null_bitmap = [&](const DataLayout &layout, std::size_t row_id) -> void {
75 : 2646 : auto find_null_bitmap_impl = [&](const DataLayout::INode &node, uintptr_t offset, std::size_t row_id,
76 : : auto &find_null_bitmap_ref) -> void
77 : : {
78 [ + + + + ]: 7075 : for (auto &child : node) {
79 [ + + + + ]: 5311 : if (auto child_leaf = cast<const DataLayout::Leaf>(child.ptr.get())) {
80 [ + + + + ]: 4429 : if (child_leaf->index() == null_bitmap_idx) {
81 : 882 : M_insist(not null_bitmap_info, "there must be at most one null bitmap in the linearization");
82 : 882 : const uint64_t additional_offset_in_bits = child.offset_in_bits + row_id * child.stride_in_bits;
83 : : /* add NULL bitmap address to context */
84 : 882 : null_bitmap_info.id = SM.add(reinterpret_cast<void*>(offset + additional_offset_in_bits / 8));
85 : 882 : null_bitmap_info.bit_offset = (additional_offset_in_bits) % 8;
86 : 882 : null_bitmap_info.bit_stride = child.stride_in_bits;
87 : 882 : null_bitmap_info.num_tuples = node.num_tuples();
88 : 882 : null_bitmap_info.row_id = row_id;
89 : 882 : }
90 : 4429 : } else {
91 : 882 : auto child_inode = as<const DataLayout::INode>(child.ptr.get());
92 : 882 : const std::size_t lin_id = row_id / child_inode->num_tuples();
93 : 882 : const std::size_t inner_row_id = row_id % child_inode->num_tuples();
94 : 882 : const uint64_t additional_offset = child.offset_in_bits / 8 + lin_id * child.stride_in_bits / 8;
95 : 882 : find_null_bitmap_ref(*child_inode, offset + additional_offset, inner_row_id, find_null_bitmap_ref);
96 : : }
97 : : }
98 : 1764 : };
99 : 882 : find_null_bitmap_impl(static_cast<const DataLayout::INode&>(layout), uintptr_t(address), row_id,
100 : : find_null_bitmap_impl);
101 : 882 : };
102 [ + - + - ]: 882 : if (needs_null_bitmap)
103 [ + - + - ]: 882 : find_null_bitmap(layout, row_id);
104 [ + - + - : 882 : if (null_bitmap_info and null_bitmap_info.bit_stride) {
+ + + - +
- + + ]
105 [ + - + - : 840 : null_bitmap_info.offset_id = SM.add(null_bitmap_info.bit_offset); // add initial NULL bitmap offset to context
+ - + - ]
106 : 840 : }
107 : :
108 : : /* Emit code for attribute access and pointer increment. */
109 : 1764 : auto compile_accesses = [&](const DataLayout &layout, std::size_t row_id) -> void {
110 : 2646 : auto compile_accesses_impl = [&](const DataLayout::INode &node, uintptr_t offset, std::size_t row_id,
111 : : auto &compile_accesses_ref) -> void
112 : : {
113 [ + + + + ]: 7075 : for (auto &child : node) {
114 [ + + + + ]: 5311 : if (auto child_leaf = cast<const DataLayout::Leaf>(child.ptr.get())) {
115 [ + + + + ]: 4429 : if (child_leaf->index() != null_bitmap_idx) {
116 [ - + - + ]: 3547 : const bool attr_can_be_null = null_bitmap_info and layout_schema[child_leaf->index()].nullable();
117 : 3547 : auto &id = layout_schema[child_leaf->index()].id;
118 : :
119 : : /* Locate the attribute in the operator schema. */
120 [ - + - + ]: 3547 : if (auto it = tuple_schema.find(id); it != tuple_schema.end()) {
121 : 3547 : uint64_t idx = std::distance(tuple_schema.begin(), it); // get attribute index in schema
122 : 3547 : const uint64_t additional_offset_in_bits = child.offset_in_bits + row_id * child.stride_in_bits;
123 : 3547 : const std::size_t byte_offset = additional_offset_in_bits / 8;
124 : 3547 : const std::size_t bit_offset = additional_offset_in_bits % 8;
125 [ + + + - : 3547 : M_insist(not bit_offset or child_leaf->type()->is_boolean() or child_leaf->type()->is_bitmap(),
+ + + - ]
126 : : "only booleans and bitmaps may not be byte aligned");
127 : :
128 : 3547 : const std::size_t byte_stride = child.stride_in_bits / 8;
129 : 3547 : const std::size_t bit_stride = child.stride_in_bits % 8;
130 [ + + + - : 3547 : M_insist(not bit_stride or child_leaf->type()->is_boolean() or child_leaf->type()->is_bitmap(),
+ + + - ]
131 : : "only booleans and bitmaps may not be byte aligned");
132 [ + + + + ]: 3547 : M_insist(bit_stride == 0 or byte_stride == 0,
133 : : "the stride must be a whole multiple of a byte or less than a byte");
134 : :
135 : : /* Access NULL bit. */
136 [ - + - + ]: 3547 : if (attr_can_be_null) {
137 [ + + + + ]: 3547 : if (not null_bitmap_info.bit_stride) {
138 : : /* No bit stride means the NULL bitmap only advances with parent sequence. */
139 : 254 : const std::size_t bit_offset = null_bitmap_info.bit_offset + child_leaf->index();
140 [ + + + + ]: 254 : if (bit_offset < 8) {
141 : 182 : SM.emit_Ld_Ctx(null_bitmap_info.id);
142 : : if constexpr (IsStore) {
143 : 82 : SM.emit_Ld_Tup(tuple_id, idx);
144 : 82 : SM.emit_Is_Null();
145 : 82 : SM.emit_St_b(bit_offset);
146 : : } else {
147 : 100 : SM.emit_Ld_b(0x1UL << bit_offset);
148 : : }
149 : 182 : } else {
150 : : /* Advance to respective byte. */
151 : 72 : SM.add_and_emit_load(uint64_t(bit_offset / 8));
152 : 72 : SM.emit_Ld_Ctx(null_bitmap_info.id);
153 : 72 : SM.emit_Add_p();
154 : : if constexpr (IsStore) {
155 : 44 : SM.emit_Ld_Tup(tuple_id, idx);
156 : 44 : SM.emit_Is_Null();
157 : 44 : SM.emit_St_b(bit_offset % 8);
158 : : } else {
159 : 28 : SM.emit_Ld_b(0x1UL << (bit_offset % 8));
160 : : }
161 : : }
162 : 254 : } else {
163 : : /* With bit stride. Use adjustable offset instead of fixed offset. */
164 : 3293 : M_insist(null_bitmap_info.adjustable_offset());
165 : :
166 : : /* Create variables for address and mask in context. Only used for storing.*/
167 : : std::size_t address_id, mask_id;
168 : : if constexpr (IsStore) {
169 : 2991 : address_id = SM.add(reinterpret_cast<void*>(0));
170 : 2991 : mask_id = SM.add(uint64_t(0));
171 : : }
172 : :
173 : : /* Compute address of entire byte containing the NULL bit. */
174 : 3293 : SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
175 : 3293 : SM.add_and_emit_load(child_leaf->index());
176 : 3293 : SM.emit_Add_i();
177 : 3293 : SM.emit_SARi_i(3); // (adj_offset + attr.id) / 8
178 : 3293 : SM.emit_Ld_Ctx(null_bitmap_info.id);
179 : 3293 : SM.emit_Add_p();
180 : : if constexpr (IsStore)
181 : 2991 : SM.emit_Upd_Ctx(address_id); // store address in context
182 : : else
183 : 302 : SM.emit_Ld_i8(); // load byte from address
184 : :
185 : : if constexpr (IsStore) {
186 : : /* Test whether value equals NULL. */
187 : 2991 : SM.emit_Ld_Tup(tuple_id, idx);
188 : 2991 : SM.emit_Is_Null();
189 : : }
190 : :
191 : : /* Initialize mask. */
192 : 3293 : SM.add_and_emit_load(uint64_t(0x1UL));
193 : :
194 : : /* Compute offset of NULL bit. */
195 : 3293 : SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
196 : 3293 : SM.add_and_emit_load(child_leaf->index());
197 : 3293 : SM.emit_Add_i();
198 : 3293 : SM.add_and_emit_load(uint64_t(0b111));
199 : 3293 : SM.emit_And_i(); // (adj_offset + attr.id) % 8
200 : :
201 : : /* Shift mask by offset. */
202 : 3293 : SM.emit_ShL_i();
203 : : if constexpr (IsStore) {
204 : 2991 : SM.emit_Upd_Ctx(mask_id); // store mask in context
205 : :
206 : : /* Load byte and set NULL bit to 1. */
207 : 2991 : SM.emit_Ld_Ctx(address_id);
208 : 2991 : SM.emit_Ld_i8();
209 : 2991 : SM.emit_Or_i(); // in case of NULL
210 : :
211 : : /* Load byte and set NULL bit to 0. */
212 : 2991 : SM.emit_Ld_Ctx(mask_id);
213 : 2991 : SM.emit_Neg_i();
214 : 2991 : SM.emit_Ld_Ctx(address_id);
215 : 2991 : SM.emit_Ld_i8();
216 : 2991 : SM.emit_And_i(); // in case of not NULL
217 : :
218 : 2991 : SM.emit_Sel(); // select the respective modified byte
219 : :
220 : : /* Write entire byte back to the store. */
221 : 2991 : SM.emit_St_i8();
222 : : } else {
223 : : /* Apply mask and cast to boolean. */
224 : 302 : SM.emit_And_i();
225 : 302 : SM.emit_NEZ_i();
226 : : }
227 : : }
228 : : if constexpr (not IsStore)
229 : 430 : SM.emit_Push_Null(); // to select it later if NULL
230 : 3547 : }
231 : :
232 : : /* Introduce leaf pointer. */
233 : 3547 : const std::size_t offset_id = SM.add_and_emit_load(reinterpret_cast<void*>(offset + byte_offset));
234 : 3547 : leaf2id[child_leaf->index()] = offset_id;
235 : :
236 [ + + + + ]: 3547 : if (bit_stride) {
237 : 79 : M_insist(child_leaf->type()->is_boolean(), "only booleans are supported yet");
238 : :
239 : : if constexpr (IsStore) {
240 : : /* Load value to stack. */
241 : 51 : SM.emit_Ld_Tup(tuple_id, idx); // boolean
242 : : } else {
243 : : /* Load byte with the respective value. */
244 : 28 : SM.emit_Ld_i8();
245 : : }
246 : :
247 : : /* Introduce mask. */
248 : 79 : const std::size_t mask_id = SM.add(uint64_t(0x1UL << bit_offset));
249 : 79 : leaf2mask[child_leaf->index()] = mask_id;
250 : :
251 : : if constexpr (IsStore) {
252 : : /* Load byte and set bit to 1. */
253 : 51 : SM.emit_Ld_Ctx(offset_id);
254 : 51 : SM.emit_Ld_i8();
255 : 51 : SM.emit_Ld_Ctx(mask_id);
256 : 51 : SM.emit_Or_i(); // in case of TRUE
257 : :
258 : : /* Load byte and set bit to 0. */
259 : 51 : SM.emit_Ld_Ctx(offset_id);
260 : 51 : SM.emit_Ld_i8();
261 : 51 : SM.emit_Ld_Ctx(mask_id);
262 : 51 : SM.emit_Neg_i(); // negate mask
263 : 51 : SM.emit_And_i(); // in case of FALSE
264 : :
265 : 51 : SM.emit_Sel(); // select the respective modified byte
266 : :
267 : : /* Write entire byte back to the store. */
268 : 51 : SM.emit_St_i8();
269 : : } else {
270 : : /* Load and apply mask and convert to bool. */
271 : 28 : SM.emit_Ld_Ctx(mask_id);
272 : 28 : SM.emit_And_i();
273 : 28 : SM.emit_NEZ_i();
274 : :
275 [ - + ]: 28 : if (attr_can_be_null)
276 : 28 : SM.emit_Sel();
277 : :
278 : : /* Store value in output tuple. */
279 : 28 : SM.emit_St_Tup(tuple_id, idx, child_leaf->type());
280 : 28 : SM.emit_Pop();
281 : : }
282 : :
283 : : /* Update the mask. */
284 : 79 : SM.emit_Ld_Ctx(mask_id);
285 : 79 : SM.emit_ShLi_i(1);
286 : 79 : SM.emit_Upd_Ctx(mask_id);
287 : :
288 : : /* Check whether we are in the 8th iteration and reset mask. */
289 : 79 : SM.add_and_emit_load(uint64_t(0x1UL) << 8);
290 : 79 : SM.emit_Eq_i();
291 : 79 : SM.emit_Dup(); // duplicate outcome for later use
292 : 79 : SM.add_and_emit_load(uint64_t(0x1UL));
293 : 79 : SM.emit_Ld_Ctx(mask_id);
294 : 79 : SM.emit_Sel();
295 : 79 : SM.emit_Upd_Ctx(mask_id); // mask <- mask == 256 ? 1 : mask
296 : 79 : SM.emit_Pop();
297 : :
298 : : /* If the mask was reset, advance to the next byte. */
299 : 79 : SM.emit_Cast_i_b(); // convert outcome of previous check to int
300 : 79 : SM.emit_Ld_Ctx(offset_id);
301 : 79 : SM.emit_Add_p();
302 : 79 : SM.emit_Upd_Ctx(offset_id);
303 : 79 : SM.emit_Pop();
304 : 79 : } else {
305 : : if constexpr (IsStore) {
306 : : /* Load value to stack. */
307 : 3066 : SM.emit_Ld_Tup(tuple_id, idx);
308 : :
309 : : /* Store value. */
310 [ + + ]: 3066 : if (child_leaf->type()->is_boolean())
311 : 29 : SM.emit_St_b(bit_offset);
312 : : else
313 : 3037 : SM.emit_St(child_leaf->type());
314 : : } else {
315 : : /* Load value. */
316 [ + + ]: 402 : if (child_leaf->type()->is_boolean())
317 : 22 : SM.emit_Ld_b(0x1UL << bit_offset); // convert the fixed bit offset to a fixed mask
318 : : else
319 : 380 : SM.emit_Ld(child_leaf->type());
320 : :
321 [ - + ]: 402 : if (attr_can_be_null)
322 : 402 : SM.emit_Sel();
323 : :
324 : : /* Store value in output tuple. */
325 : 402 : SM.emit_St_Tup(tuple_id, idx, child_leaf->type());
326 : 402 : SM.emit_Pop();
327 : : }
328 : :
329 : : /* If the attribute has a stride, advance the pointer accordingly. */
330 : 3468 : M_insist(not bit_stride);
331 [ + + + + ]: 3468 : if (byte_stride) {
332 : : /* Advance the attribute pointer by the attribute's stride. */
333 : 3214 : SM.add_and_emit_load(int64_t(byte_stride));
334 : 3214 : SM.emit_Ld_Ctx(offset_id);
335 : 3214 : SM.emit_Add_p();
336 : 3214 : SM.emit_Upd_Ctx(offset_id);
337 : 3214 : SM.emit_Pop();
338 : 3214 : }
339 : : }
340 : :
341 : 3547 : }
342 : 3547 : }
343 : 4429 : } else {
344 : 882 : auto child_inode = as<const DataLayout::INode>(child.ptr.get());
345 : 882 : const std::size_t lin_id = row_id / child_inode->num_tuples();
346 : 882 : const std::size_t inner_row_id = row_id % child_inode->num_tuples();
347 : 882 : const uint64_t additional_offset = child.offset_in_bits / 8 + lin_id * child.stride_in_bits / 8;
348 : 882 : compile_accesses_ref(*child_inode, offset + additional_offset, inner_row_id, compile_accesses_ref);
349 : : }
350 : : }
351 : 1764 : };
352 : 882 : compile_accesses_impl(static_cast<const DataLayout::INode&>(layout), uintptr_t(address), row_id,
353 : : compile_accesses_impl);
354 : 882 : };
355 [ + - + - ]: 882 : compile_accesses(layout, row_id);
356 : :
357 : : /* If the NULL bitmap has a stride, advance the adjustable offset accordingly. */
358 [ + - + - : 882 : if (null_bitmap_info and null_bitmap_info.bit_stride) {
+ + + - +
- + + ]
359 [ + - + - : 840 : M_insist(null_bitmap_info.adjustable_offset());
+ - + - ]
360 [ + - + - ]: 840 : M_insist(null_bitmap_info.num_tuples > 1);
361 : :
362 : : /* Update adjustable offset. */
363 [ + - + - ]: 840 : SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
364 [ + - + - : 840 : SM.add_and_emit_load(null_bitmap_info.bit_stride);
+ - + - ]
365 [ + - + - ]: 840 : SM.emit_Add_i();
366 [ + - + - ]: 840 : SM.emit_Upd_Ctx(null_bitmap_info.offset_id);
367 [ + - + - ]: 840 : SM.emit_Pop();
368 : :
369 : : /* Check whether we are in the last iteration and advance to correct byte. */
370 [ + - + - : 840 : const auto counter_id = SM.add_and_emit_load(uint64_t(null_bitmap_info.row_id));
+ - + - ]
371 [ + - + - ]: 840 : SM.emit_Inc();
372 [ + - + - ]: 840 : SM.emit_Upd_Ctx(counter_id);
373 [ + - + - : 840 : SM.add_and_emit_load(null_bitmap_info.num_tuples);
+ - + - ]
374 [ + - + - ]: 840 : SM.emit_NE_i();
375 [ + - + - : 840 : SM.emit_Dup(); SM.emit_Dup(); // triple outcome for later use
+ - + - ]
376 [ + - + - ]: 840 : SM.emit_Not_b(); // negate outcome of check
377 [ + - + - ]: 840 : SM.emit_Cast_i_b(); // convert to int
378 [ + - + - ]: 840 : SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
379 [ + - + - ]: 840 : SM.emit_SARi_i(3); // corresponds div 8
380 [ + - + - ]: 840 : SM.emit_Mul_i();
381 [ + - + - ]: 840 : SM.emit_Ld_Ctx(null_bitmap_info.id);
382 [ + - + - ]: 840 : SM.emit_Add_p();
383 [ + - + - ]: 840 : SM.emit_Upd_Ctx(null_bitmap_info.id); // id <- counter != num_tuples ? id : id + adj_offset / 8
384 [ + - + - ]: 840 : SM.emit_Pop();
385 : :
386 : : /* If we were in the last iteration, reset adjustable offset. */
387 [ + - + - ]: 840 : SM.emit_Cast_i_b(); // convert outcome of previous check to int
388 [ + - + - ]: 840 : SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
389 [ + - + - ]: 840 : SM.emit_Mul_i();
390 [ + - + - ]: 840 : SM.emit_Upd_Ctx(null_bitmap_info.offset_id);
391 [ + - + - ]: 840 : SM.emit_Pop();
392 : :
393 : : /* If we were in the last iteration, reset the counter. */
394 [ + - + - ]: 840 : SM.emit_Cast_i_b(); // convert outcome of previous check to int
395 [ + - + - ]: 840 : SM.emit_Ld_Ctx(counter_id);
396 [ + - + - ]: 840 : SM.emit_Mul_i();
397 [ + - + - ]: 840 : SM.emit_Upd_Ctx(counter_id);
398 [ + - + - ]: 840 : SM.emit_Pop();
399 : 840 : }
400 : :
401 : : /* Emit code to gap strides. */
402 : 1764 : auto compile_strides = [&](const DataLayout &layout, std::size_t row_id) -> void {
403 : 2646 : auto compile_strides_impl = [&](const DataLayout::INode &node, std::size_t row_id,
404 : : auto &compile_strides_ref) -> void {
405 [ + + + + ]: 7075 : for (auto &child : node) {
406 [ + + + + ]: 5311 : if (auto child_leaf = cast<const DataLayout::Leaf>(child.ptr.get())) {
407 : : std::size_t offset_id;
408 : 4429 : std::size_t mask_id = -1UL;
409 [ + - + + : 4429 : if (null_bitmap_info and child_leaf->index() == null_bitmap_idx) {
+ - + + ]
410 : 882 : offset_id = null_bitmap_info.id;
411 : 882 : mask_id = null_bitmap_info.offset_id;
412 [ + - + - ]: 4429 : } else if (auto it = leaf2id.find(child_leaf->index()); it != leaf2id.end()) {
413 : 3547 : offset_id = it->second;
414 [ + + + + ]: 3547 : if (auto it = leaf2mask.find(child_leaf->index()); it != leaf2mask.end())
415 : 79 : mask_id = it->second;
416 : 3547 : } else {
417 : 0 : continue; // nothing to be done
418 : : }
419 : :
420 : : /* Emit code for stride jumps. */
421 : 4429 : std::size_t prev_num_tuples = 1;
422 : 4429 : std::size_t prev_stride_in_bits = child.stride_in_bits;
423 [ + + + + ]: 8858 : for (auto it = stride_info_stack.rbegin(), end = stride_info_stack.rend(); it != end; ++it) {
424 : 4429 : auto &info = *it;
425 : :
426 : : /* Compute the remaining stride in bits. */
427 : 4429 : const std::size_t stride_remaining_in_bits =
428 : 4429 : info.stride_in_bits - (info.num_tuples / prev_num_tuples) * prev_stride_in_bits;
429 : :
430 : : /* Perform stride jump, if necessary. */
431 [ - + - + ]: 4429 : if (stride_remaining_in_bits) {
432 : 4429 : std::size_t byte_stride = stride_remaining_in_bits / 8;
433 : 4429 : const std::size_t bit_stride = stride_remaining_in_bits % 8;
434 : :
435 [ + + + + ]: 4429 : if (bit_stride) {
436 [ - + - + ]: 24 : M_insist(child_leaf->index() == null_bitmap_idx or child_leaf->type()->is_boolean(),
437 : : "only the null bitmap or booleans may cause not byte aligned stride jumps, "
438 : : "bitmaps are not supported yet");
439 [ + - + - ]: 24 : M_insist(child_leaf->index() != null_bitmap_idx or null_bitmap_info.adjustable_offset(),
440 : : "only null bitmaps with adjustable offset may cause not byte aligned stride jumps");
441 : 24 : M_insist(mask_id != -1UL);
442 : :
443 : : /* Reset mask. */
444 [ - + - + ]: 24 : if (child_leaf->index() == null_bitmap_idx) {
445 : : /* Reset adjustable bit offset to 0. */
446 [ # # # # ]: 0 : if (info.num_tuples != 1) {
447 : : /* Check whether counter equals num_tuples. */
448 : 0 : SM.emit_Ld_Ctx(info.counter_id);
449 : 0 : SM.add_and_emit_load(int64_t(info.num_tuples));
450 : 0 : SM.emit_NE_i();
451 : 0 : SM.emit_Cast_i_b();
452 : 0 : } else {
453 : 0 : SM.add_and_emit_load(uint64_t(0));
454 : : }
455 : 0 : SM.emit_Ld_Ctx(mask_id);
456 : 0 : SM.emit_Mul_i();
457 : 0 : SM.emit_Upd_Ctx(mask_id);
458 : 0 : SM.emit_Pop();
459 : 0 : } else {
460 : : /* Reset mask to 0x1UL to access first bit again. */
461 [ + - + - ]: 24 : if (info.num_tuples != 1) {
462 : : /* Check whether counter equals num_tuples. */
463 : 24 : SM.emit_Ld_Ctx(info.counter_id);
464 : 24 : SM.add_and_emit_load(int64_t(info.num_tuples));
465 : 24 : SM.emit_Eq_i();
466 : 24 : SM.add_and_emit_load(uint64_t(0x1UL));
467 : 24 : SM.emit_Ld_Ctx(mask_id);
468 : 24 : SM.emit_Sel();
469 : 24 : } else {
470 : 0 : SM.add_and_emit_load(uint64_t(0x1UL));
471 : : }
472 : 24 : SM.emit_Upd_Ctx(mask_id);
473 : 24 : SM.emit_Pop();
474 : : }
475 : :
476 : : /* Ceil to next entire byte. */
477 : 24 : ++byte_stride;
478 : 24 : }
479 : :
480 : : /* Advance pointer. */
481 [ + + + + ]: 4429 : if (info.num_tuples != 1) {
482 : : /* Check whether counter equals num_tuples. */
483 : 4133 : SM.emit_Ld_Ctx(info.counter_id);
484 : 4133 : SM.add_and_emit_load(int64_t(info.num_tuples));
485 : 4133 : SM.emit_Eq_i();
486 : 4133 : SM.emit_Cast_i_b();
487 : :
488 : 4133 : SM.add_and_emit_load(byte_stride);
489 : 4133 : SM.emit_Mul_i();
490 : 4133 : } else {
491 : 296 : SM.add_and_emit_load(byte_stride);
492 : : }
493 : 4429 : SM.emit_Ld_Ctx(offset_id);
494 : 4429 : SM.emit_Add_p();
495 : 4429 : SM.emit_Upd_Ctx(offset_id);
496 : 4429 : SM.emit_Pop();
497 : 4429 : }
498 : :
499 : : /* Update variables for next iteration. */
500 : 4429 : prev_num_tuples = info.num_tuples;
501 : 4429 : prev_stride_in_bits = info.stride_in_bits;
502 : 4429 : }
503 : 4429 : } else {
504 : 882 : auto child_inode = as<const DataLayout::INode>(child.ptr.get());
505 : :
506 : : /* Initialize counter and emit increment. */
507 : 882 : const std::size_t inner_row_id = row_id % child_inode->num_tuples();
508 : 882 : const auto counter_id = SM.add_and_emit_load(inner_row_id); // introduce counter to track iteration count
509 : 882 : SM.emit_Inc();
510 : 882 : SM.emit_Upd_Ctx(counter_id);
511 : 882 : SM.emit_Pop(); // XXX: not needed if recursion cleans up stack properly
512 : :
513 : : /* Put context on stack and perform recursive descend. */
514 : 3528 : stride_info_stack.push_back(stride_info_t{
515 : 882 : .counter_id = counter_id,
516 : 882 : .num_tuples = child_inode->num_tuples(),
517 : 882 : .stride_in_bits = child.stride_in_bits
518 : : });
519 : 882 : compile_strides_ref(*child_inode, inner_row_id, compile_strides_ref);
520 : 882 : stride_info_stack.pop_back();
521 : :
522 : : /* Reset counter if iteration is whole multiple of num_tuples. */
523 [ + + + + ]: 882 : if (child_inode->num_tuples() != 1) {
524 : 840 : SM.emit_Ld_Ctx(counter_id); // XXX: not needed if recursion cleans up stack properly
525 : 840 : SM.add_and_emit_load(child_inode->num_tuples());
526 : 840 : SM.emit_NE_i();
527 : 840 : SM.emit_Cast_i_b();
528 : 840 : SM.emit_Ld_Ctx(counter_id);
529 : 840 : SM.emit_Mul_i();
530 : 840 : } else {
531 : 42 : SM.add_and_emit_load(int64_t(0));
532 : : }
533 : 882 : SM.emit_Upd_Ctx(counter_id);
534 : : }
535 : : }
536 : 1764 : };
537 : 882 : compile_strides_impl(static_cast<const DataLayout::INode&>(layout), row_id, compile_strides_impl);
538 : 882 : };
539 [ + - + - ]: 882 : compile_strides(layout, row_id);
540 : :
541 : 882 : return SM;
542 [ + - + - ]: 882 : }
543 : :
544 : 104 : StackMachine Interpreter::compile_load(const Schema &tuple_schema, void *address, const storage::DataLayout &layout,
545 : : const Schema &layout_schema, std::size_t row_id, std::size_t tuple_id)
546 : : {
547 : 104 : return compile_data_layout<false>(tuple_schema, address, layout, layout_schema, row_id, tuple_id);
548 : : }
549 : :
550 : 778 : StackMachine Interpreter::compile_store(const Schema &tuple_schema, void *address, const storage::DataLayout &layout,
551 : : const Schema &layout_schema, std::size_t row_id, std::size_t tuple_id)
552 : : {
553 : 778 : return compile_data_layout<true>(tuple_schema, address, layout, layout_schema, row_id, tuple_id);
554 : : }
555 : :
556 : : /*======================================================================================================================
557 : : * Declaration of operator data.
558 : : *====================================================================================================================*/
559 : :
560 : : namespace {
561 : :
562 : : struct PrintData : OperatorData
563 : : {
564 : 0 : uint32_t num_rows = 0;
565 : : StackMachine printer;
566 : 0 : PrintData(const PrintOperator &op)
567 [ # # # # ]: 0 : : printer(op.schema())
568 : 0 : {
569 : 0 : auto &S = op.schema();
570 [ # # # # ]: 0 : auto ostream_index = printer.add(&op.out);
571 [ # # ]: 0 : for (std::size_t i = 0; i != S.num_entries(); ++i) {
572 [ # # ]: 0 : if (i != 0)
573 [ # # ]: 0 : printer.emit_Putc(ostream_index, ',');
574 [ # # ]: 0 : printer.emit_Ld_Tup(0, i);
575 [ # # # # ]: 0 : printer.emit_Print(ostream_index, S[i].type);
576 : 0 : }
577 : 0 : }
578 : : };
579 : :
580 : 0 : struct NoOpData : OperatorData
581 : : {
582 : 0 : uint32_t num_rows = 0;
583 : : };
584 : :
585 : : struct ProjectionData : OperatorData
586 : : {
587 : : Pipeline pipeline;
588 : : std::optional<StackMachine> projections;
589 : : Tuple res;
590 : :
591 : 81 : ProjectionData(const ProjectionOperator &op)
592 [ + - ]: 81 : : pipeline(op.schema())
593 [ - + ]: 81 : , res(op.schema())
594 : 162 : { }
595 : :
596 : 68 : void emit_projections(const Schema &pipeline_schema, const ProjectionOperator &op) {
597 : 68 : projections.emplace(pipeline_schema);
598 : 68 : std::size_t out_idx = 0;
599 [ + + ]: 348 : for (auto &p : op.projections()) {
600 : 280 : projections->emit(p.first.get(), 1);
601 : 280 : projections->emit_St_Tup(0, out_idx++, p.first.get().type());
602 : : }
603 : 68 : }
604 : : };
605 : :
606 : : struct JoinData : OperatorData
607 : : {
608 : : Pipeline pipeline;
609 : : std::vector<StackMachine> load_attrs;
610 : :
611 [ # # ]: 0 : JoinData(const JoinOperator &op) : pipeline(op.schema()) { }
612 : :
613 : 0 : void emit_load_attrs(const Schema &in_schema) {
614 : 0 : auto &SM = load_attrs.emplace_back();
615 [ # # ]: 0 : for (std::size_t schema_idx = 0; schema_idx != in_schema.num_entries(); ++schema_idx) {
616 : 0 : auto &e = in_schema[schema_idx];
617 : 0 : auto it = pipeline.schema().find(e.id);
618 [ # # ]: 0 : if (it != pipeline.schema().end()) { // attribute is needed
619 : 0 : SM.emit_Ld_Tup(1, schema_idx);
620 : 0 : SM.emit_St_Tup(0, std::distance(pipeline.schema().begin(), it), e.type);
621 : 0 : }
622 : 0 : }
623 : 0 : }
624 : : };
625 : :
626 : : struct NestedLoopsJoinData : JoinData
627 : : {
628 : : using buffer_type = std::vector<Tuple>;
629 : :
630 : : StackMachine predicate; ///< evaluated the predicate to a bool
631 : : std::vector<Schema> buffer_schemas; ///< schema of each buffer
632 : : buffer_type *buffers; ///< tuple buffer per child
633 : : std::size_t active_child;
634 : : Tuple res;
635 : :
636 [ # # # # ]: 0 : NestedLoopsJoinData(const JoinOperator &op)
637 : 0 : : JoinData(op)
638 [ # # # # ]: 0 : , buffers(new buffer_type[op.children().size() - 1])
639 [ # # # # : 0 : , res({ Type::Get_Boolean(Type::TY_Vector) })
# # # # ]
640 : 0 : { }
641 : :
642 [ # # ]: 0 : ~NestedLoopsJoinData() { delete[] buffers; }
643 : : };
644 : :
645 : : struct SimpleHashJoinData : JoinData
646 : : {
647 [ # # ]: 0 : Catalog &C = Catalog::Get();
648 : 0 : bool is_probe_phase = false; ///< determines whether tuples are used to *build* or *probe* the hash table
649 : : std::vector<std::pair<const ast::Expr*, const ast::Expr*>> exprs;
650 : : StackMachine build_key; ///< extracts the key of the build input
651 : : StackMachine probe_key; ///< extracts the key of the probe input
652 : : RefCountingHashMap<Tuple, Tuple> ht; ///< hash table on build input
653 : :
654 : : Schema key_schema; ///< the `Schema` of the `key`
655 : : Tuple key; ///< `Tuple` to hold the key
656 : :
657 [ # # # # : 0 : SimpleHashJoinData(const JoinOperator &op)
# # ]
658 : 0 : : JoinData(op)
659 [ # # ]: 0 : , ht(1024)
660 : 0 : {
661 [ # # ]: 0 : auto &schema_lhs = op.child(0)->schema();
662 : : #ifndef NDEBUG
663 [ # # ]: 0 : auto &schema_rhs = op.child(1)->schema();
664 : : #endif
665 : :
666 : : /* Decompose each join predicate of the form `A.x = B.y` into parts `A.x` and `B.y` and build the schema of the
667 : : * join key. */
668 : 0 : auto &pred = op.predicate();
669 [ # # ]: 0 : for (auto &clause : pred) {
670 [ # # ]: 0 : M_insist(clause.size() == 1, "invalid predicate for simple hash join");
671 : 0 : auto &literal = clause[0];
672 [ # # ]: 0 : M_insist(not literal.negative(), "invalid predicate for simple hash join");
673 : 0 : auto &expr = literal.expr();
674 [ # # ]: 0 : auto binary = as<const ast::BinaryExpr>(&expr);
675 [ # # ]: 0 : M_insist(binary->tok == TK_EQUAL);
676 : 0 : auto first = binary->lhs.get();
677 : 0 : auto second = binary->rhs.get();
678 [ # # # # : 0 : M_insist(is_comparable(first->type(), second->type()), "the two sides of a comparison should be comparable");
# # # # ]
679 [ # # # # : 0 : M_insist(first->type() == second->type(), "operand types must be equal");
# # ]
680 : :
681 : : /* Add type to general key schema. */
682 [ # # # # : 0 : key_schema.add(C.pool("key"), first->type());
# # # # ]
683 : :
684 : : /*----- Decide which side of the join the predicate belongs to. -----*/
685 [ # # ]: 0 : auto required_by_first = first->get_required();
686 : : #ifndef NDEBUG
687 [ # # ]: 0 : auto required_by_second = second->get_required();
688 : : #endif
689 [ # # # # ]: 0 : if ((required_by_first & schema_lhs).num_entries() != 0) {
690 : : #ifndef NDEBUG
691 [ # # # # ]: 0 : M_insist((required_by_second & schema_rhs).num_entries() != 0, "second must belong to RHS");
692 : : #endif
693 [ # # ]: 0 : exprs.emplace_back(first, second);
694 : 0 : } else {
695 : : #ifndef NDEBUG
696 [ # # # # ]: 0 : M_insist((required_by_first & schema_rhs).num_entries() != 0, "first must belong to RHS");
697 [ # # # # ]: 0 : M_insist((required_by_second & schema_lhs).num_entries() != 0, "second must belong to LHS");
698 : : #endif
699 [ # # ]: 0 : exprs.emplace_back(second, first);
700 : : }
701 : 0 : }
702 : :
703 : : /* Create the tuple holding a key. */
704 [ # # # # ]: 0 : key = Tuple(key_schema);
705 : 0 : }
706 : :
707 : 0 : void load_build_key(const Schema &pipeline_schema) {
708 [ # # ]: 0 : for (std::size_t i = 0; i != exprs.size(); ++i) {
709 : 0 : const ast::Expr *expr = exprs[i].first;
710 : 0 : build_key.emit(*expr, pipeline_schema, 1); // compile expr
711 : 0 : build_key.emit_St_Tup(0, i, expr->type()); // write result to index i
712 : 0 : }
713 : 0 : }
714 : :
715 : 0 : void load_probe_key(const Schema &pipeline_schema) {
716 [ # # ]: 0 : for (std::size_t i = 0; i != exprs.size(); ++i) {
717 : 0 : const ast::Expr *expr = exprs[i].second;
718 : 0 : probe_key.emit(*expr, pipeline_schema, 1); // compile expr
719 : 0 : probe_key.emit_St_Tup(0, i, expr->type()); // write result to index i
720 : 0 : }
721 : 0 : }
722 : : };
723 : :
724 : 0 : struct LimitData : OperatorData
725 : : {
726 : 0 : std::size_t num_tuples = 0;
727 : : };
728 : :
729 : : struct GroupingData : OperatorData
730 : : {
731 : : Pipeline pipeline;
732 : : StackMachine compute_key; ///< computes the key for a tuple
733 : : std::vector<StackMachine> compute_aggregate_arguments; ///< StackMachines to compute the argumetns of aggregations
734 : : std::vector<Tuple> args; ///< tuple used to hold the computed arguments
735 : :
736 : 0 : GroupingData(const GroupingOperator &op)
737 [ # # ]: 0 : : pipeline(op.schema())
738 [ # # # # : 0 : , compute_key(op.child(0)->schema())
# # ]
739 : 0 : {
740 [ # # ]: 0 : std::ostringstream oss;
741 : :
742 : : /* Compile the stack machine to compute the key and compute the key schema. */
743 : : {
744 : 0 : std::size_t key_idx = 0;
745 [ # # # # ]: 0 : for (auto [grp, alias] : op.group_by()) {
746 [ # # # # ]: 0 : compute_key.emit(grp.get(), 1);
747 [ # # # # : 0 : compute_key.emit_St_Tup(0, key_idx++, grp.get().type());
# # ]
748 : 0 : }
749 : : }
750 : :
751 : : /* Compile a StackMachine to compute the arguments of each aggregation function. For example, for the
752 : : * aggregation `AVG(price * tax)`, the compiled StackMachine computes `price * tax`. */
753 [ # # ]: 0 : for (auto agg : op.aggregates()) {
754 [ # # ]: 0 : auto &fe = as<const ast::FnApplicationExpr>(agg.get());
755 : 0 : std::size_t arg_idx = 0;
756 [ # # # # : 0 : StackMachine sm(op.child(0)->schema());
# # ]
757 : 0 : std::vector<const Type*> arg_types;
758 [ # # ]: 0 : for (auto &arg : fe.args) {
759 [ # # ]: 0 : sm.emit(*arg, 1);
760 [ # # # # : 0 : sm.emit_Cast(agg.get().type(), arg->type()); // cast argument type to aggregate type, e.g. f32 to f64 for SUM
# # ]
761 [ # # # # ]: 0 : sm.emit_St_Tup(0, arg_idx++, arg->type());
762 [ # # # # ]: 0 : arg_types.push_back(arg->type());
763 : : }
764 [ # # # # : 0 : args.emplace_back(Tuple(arg_types));
# # ]
765 [ # # ]: 0 : compute_aggregate_arguments.emplace_back(std::move(sm));
766 : 0 : }
767 : 0 : }
768 : : };
769 : :
770 : : struct AggregationData : OperatorData
771 : : {
772 : : Pipeline pipeline;
773 : : Tuple aggregates;
774 : : std::vector<StackMachine> compute_aggregate_arguments; ///< StackMachines to compute the argumetns of aggregations
775 : : std::vector<Tuple> args; ///< tuple used to hold the computed arguments
776 : :
777 [ # # ]: 0 : AggregationData(const AggregationOperator &op)
778 [ # # ]: 0 : : pipeline(op.schema())
779 : 0 : {
780 : 0 : std::vector<const Type*> types;
781 [ # # ]: 0 : for (auto &e : op.schema())
782 [ # # ]: 0 : types.push_back(e.type);
783 [ # # # # : 0 : types.push_back(Type::Get_Integer(Type::TY_Scalar, 8)); // add nth_tuple counter
# # ]
784 [ # # # # ]: 0 : aggregates = Tuple(std::move(types));
785 [ # # # # ]: 0 : aggregates.set(op.schema().num_entries(), 0L); // initialize running count
786 : :
787 [ # # ]: 0 : for (auto agg : op.aggregates()) {
788 [ # # ]: 0 : auto &fe = as<const ast::FnApplicationExpr>(agg.get());
789 : 0 : std::size_t arg_idx = 0;
790 [ # # # # : 0 : StackMachine sm(op.child(0)->schema());
# # ]
791 : 0 : std::vector<const Type*> arg_types;
792 [ # # ]: 0 : for (auto &arg : fe.args) {
793 [ # # ]: 0 : sm.emit(*arg, 1);
794 [ # # # # : 0 : sm.emit_Cast(agg.get().type(), arg->type()); // cast argument type to aggregate type, e.g. f32 to f64 for SUM
# # ]
795 [ # # # # ]: 0 : sm.emit_St_Tup(0, arg_idx++, agg.get().type()); // store casted argument of aggregate type to tuple
796 [ # # # # ]: 0 : arg_types.push_back(agg.get().type());
797 : : }
798 [ # # # # : 0 : args.emplace_back(Tuple(arg_types));
# # ]
799 [ # # ]: 0 : compute_aggregate_arguments.emplace_back(std::move(sm));
800 : 0 : }
801 : 0 : }
802 : : };
803 : :
804 : : struct HashBasedGroupingData : GroupingData
805 : : {
806 : : /** Callable to compute the hash of the keys of a tuple. */
807 : : struct hasher
808 : : {
809 : : std::size_t key_size;
810 : :
811 : 0 : hasher(std::size_t key_size) : key_size(key_size) { }
812 : :
813 : 0 : uint64_t operator()(const Tuple &tup) const {
814 : : std::hash<Value> h;
815 : 0 : uint64_t hash = 0xcbf29ce484222325;
816 [ # # ]: 0 : for (std::size_t i = 0; i != key_size; ++i) {
817 [ # # ]: 0 : hash ^= tup.is_null(i) ? 0 : h(tup[i]);
818 : 0 : hash *= 1099511628211;
819 : 0 : }
820 : 0 : return hash;
821 : : }
822 : : };
823 : :
824 : : /** Callable to compare two tuples by their keys. */
825 : : struct equals
826 : : {
827 : : std::size_t key_size;
828 : :
829 : 0 : equals(std::size_t key_size) : key_size(key_size) { }
830 : :
831 : 0 : uint64_t operator()(const Tuple &first, const Tuple &second) const {
832 [ # # ]: 0 : for (std::size_t i = 0; i != key_size; ++i) {
833 [ # # ]: 0 : if (first.is_null(i) != second.is_null(i)) return false;
834 [ # # ]: 0 : if (not first.is_null(i))
835 [ # # ]: 0 : if (first.get(i) != second.get(i)) return false;
836 : 0 : }
837 : 0 : return true;
838 : 0 : }
839 : : };
840 : :
841 : : /** A map of `Tuple`s, where the key part is used for hashing and comparison. The mapped to value holds the count
842 : : * of tuples that belong to this group. */
843 : : std::unordered_map<Tuple, unsigned, hasher, equals> groups;
844 : :
845 : 0 : HashBasedGroupingData(const GroupingOperator &op)
846 : 0 : : GroupingData(op)
847 [ # # # # : 0 : , groups(1024, hasher(op.group_by().size()), equals(op.group_by().size()))
# # ]
848 : 0 : { }
849 : : };
850 : :
851 : : struct SortingData : OperatorData
852 : : {
853 : : Pipeline pipeline;
854 : : std::vector<Tuple> buffer;
855 : :
856 [ # # ]: 0 : SortingData(Schema buffer_schema) : pipeline(std::move(buffer_schema)) { }
857 : : };
858 : :
859 : : struct FilterData : OperatorData
860 : : {
861 : : StackMachine filter;
862 : : Tuple res;
863 : :
864 : 0 : FilterData(const FilterOperator &op, const Schema &pipeline_schema)
865 [ # # # # ]: 0 : : filter(pipeline_schema)
866 [ # # # # : 0 : , res({ Type::Get_Boolean(Type::TY_Vector) })
# # # # ]
867 : 0 : {
868 [ # # # # ]: 0 : filter.emit(op.filter(), 1);
869 [ # # ]: 0 : filter.emit_St_Tup_b(0, 0);
870 : 0 : }
871 : : };
872 : :
873 : : struct DisjunctiveFilterData : OperatorData
874 : : {
875 : : std::vector<StackMachine> predicates;
876 : : Tuple res;
877 : :
878 : 0 : DisjunctiveFilterData(const DisjunctiveFilterOperator &op, const Schema &pipeline_schema)
879 [ # # # # : 0 : : res({ Type::Get_Boolean(Type::TY_Vector) })
# # # # ]
880 : 0 : {
881 [ # # ]: 0 : auto clause = op.filter()[0];
882 [ # # ]: 0 : for (cnf::Predicate &pred : clause) {
883 [ # # ]: 0 : cnf::Clause clause({ pred });
884 [ # # # # ]: 0 : cnf::CNF cnf({ clause });
885 [ # # ]: 0 : StackMachine &SM = predicates.emplace_back(pipeline_schema);
886 [ # # ]: 0 : SM.emit(cnf, 1); // compile single predicate
887 [ # # ]: 0 : SM.emit_St_Tup_b(0, 0);
888 : 0 : }
889 : 0 : }
890 : : };
891 : :
892 : : }
893 : :
894 : :
895 : : /*======================================================================================================================
896 : : * Pipeline
897 : : *====================================================================================================================*/
898 : :
899 : 81 : void Pipeline::operator()(const ScanOperator &op)
900 : : {
901 : 81 : auto &store = op.store();
902 : 81 : auto &table = store.table();
903 : 81 : const auto num_rows = store.num_rows();
904 : :
905 : : /* Compile StackMachine to load tuples from store. */
906 [ + - - + ]: 81 : auto loader = Interpreter::compile_load(op.schema(), store.memory().addr(), table.layout(), table.schema());
907 : :
908 [ + - ]: 81 : const auto remainder = num_rows % block_.capacity();
909 : 81 : std::size_t i = 0;
910 : : /* Fill entire vector. */
911 [ + + + - ]: 123 : for (auto end = num_rows - remainder; i != end; i += block_.capacity()) {
912 [ + - ]: 42 : block_.clear();
913 [ + - ]: 42 : block_.fill();
914 [ + - + + ]: 2730 : for (std::size_t j = 0; j != block_.capacity(); ++j) {
915 [ + - ]: 2688 : Tuple *args[] = { &block_[j] };
916 [ + - ]: 2688 : loader(args);
917 : 2688 : }
918 [ + - + - ]: 42 : op.parent()->accept(*this);
919 : 42 : }
920 [ + + ]: 81 : if (i != num_rows) {
921 : : /* Fill last vector with remaining tuples. */
922 [ + - ]: 68 : block_.clear();
923 [ + - ]: 68 : block_.mask((1UL << remainder) - 1);
924 [ + - + - : 1730 : for (std::size_t j = 0; i != op.store().num_rows(); ++i, ++j) {
+ + ]
925 [ + - + - ]: 1662 : M_insist(j < block_.capacity());
926 [ + - ]: 1662 : Tuple *args[] = { &block_[j] };
927 [ + - ]: 1662 : loader(args);
928 : 1662 : }
929 [ + - + - ]: 68 : op.parent()->accept(*this);
930 : 68 : }
931 : 81 : }
932 : :
933 : 110 : void Pipeline::operator()(const CallbackOperator &op)
934 : : {
935 [ + + ]: 4460 : for (auto &t : block_)
936 : 4350 : op.callback()(op.schema(), t);
937 : 110 : }
938 : :
939 : 0 : void Pipeline::operator()(const PrintOperator &op)
940 : : {
941 : 0 : auto data = as<PrintData>(op.data());
942 : 0 : data->num_rows += block_.size();
943 [ # # ]: 0 : for (auto &t : block_) {
944 : 0 : Tuple *args[] = { &t };
945 : 0 : data->printer(args);
946 : 0 : op.out << '\n';
947 : : }
948 : 0 : }
949 : :
950 : 0 : void Pipeline::operator()(const NoOpOperator &op)
951 : : {
952 : 0 : as<NoOpData>(op.data())->num_rows += block_.size();
953 : 0 : }
954 : :
955 : 0 : void Pipeline::operator()(const FilterOperator &op)
956 : : {
957 [ # # ]: 0 : if (not op.data())
958 [ # # # # ]: 0 : op.data(new FilterData(op, this->schema()));
959 : :
960 : 0 : auto data = as<FilterData>(op.data());
961 [ # # ]: 0 : for (auto it = block_.begin(); it != block_.end(); ++it) {
962 : 0 : Tuple *args[] = { &data->res, &*it };
963 : 0 : data->filter(args);
964 [ # # # # ]: 0 : if (data->res.is_null(0) or not data->res[0].as_b()) block_.erase(it);
965 : 0 : }
966 [ # # ]: 0 : if (not block_.empty())
967 : 0 : op.parent()->accept(*this);
968 : 0 : }
969 : :
970 : 0 : void Pipeline::operator()(const DisjunctiveFilterOperator &op)
971 : : {
972 [ # # ]: 0 : if (not op.data())
973 [ # # # # ]: 0 : op.data(new DisjunctiveFilterData(op, this->schema()));
974 : :
975 : 0 : auto data = as<DisjunctiveFilterData>(op.data());
976 [ # # ]: 0 : for (auto it = block_.begin(); it != block_.end(); ++it) {
977 : 0 : data->res.set(0, false); // reset
978 : 0 : Tuple *args[] = { &data->res, &*it };
979 : :
980 [ # # ]: 0 : for (auto &pred : data->predicates) {
981 : 0 : pred(args);
982 [ # # # # ]: 0 : if (not data->res.is_null(0) and data->res[0].as_b())
983 : 0 : goto satisfied; // one predicate is satisfied ⇒ entire clause is satisfied
984 : : }
985 : 0 : block_.erase(it); // no predicate was satisfied ⇒ drop tuple
986 : : satisfied:;
987 : 0 : }
988 [ # # ]: 0 : if (not block_.empty())
989 : 0 : op.parent()->accept(*this);
990 : 0 : }
991 : :
992 : 0 : void Pipeline::operator()(const JoinOperator &op)
993 : : {
994 [ # # ]: 0 : if (is<SimpleHashJoinData>(op.data())) {
995 : : /* Perform simple hash join. */
996 : 0 : auto data = as<SimpleHashJoinData>(op.data());
997 : 0 : Tuple *args[2] = { &data->key, nullptr };
998 [ # # ]: 0 : if (data->is_probe_phase) {
999 [ # # ]: 0 : if (data->load_attrs.size() != 2) {
1000 : 0 : data->load_probe_key(this->schema());
1001 : 0 : data->emit_load_attrs(this->schema());
1002 : 0 : }
1003 : 0 : auto &pipeline = data->pipeline;
1004 : 0 : std::size_t i = 0;
1005 [ # # ]: 0 : for (auto &t : block_) {
1006 : 0 : args[1] = &t;
1007 : 0 : data->probe_key(args);
1008 : 0 : pipeline.block_.fill();
1009 [ # # ]: 0 : data->ht.for_all(*args[0], [&](std::pair<const Tuple, Tuple> &v) {
1010 [ # # ]: 0 : if (i == pipeline.block_.capacity()) {
1011 : 0 : pipeline.push(*op.parent());
1012 : 0 : i = 0;
1013 : 0 : }
1014 : :
1015 : : {
1016 : 0 : Tuple *load_args[2] = { &pipeline.block_[i], &v.second };
1017 : 0 : data->load_attrs[0](load_args); // load build attrs
1018 : : }
1019 : : {
1020 : 0 : Tuple *load_args[2] = { &pipeline.block_[i], &t };
1021 : 0 : data->load_attrs[1](load_args); // load probe attrs
1022 : : }
1023 : 0 : ++i;
1024 : 0 : });
1025 : : }
1026 : :
1027 [ # # ]: 0 : if (i != 0) {
1028 : 0 : M_insist(i <= pipeline.block_.capacity());
1029 [ # # ]: 0 : pipeline.block_.mask(i == pipeline.block_.capacity() ? -1UL : (1UL << i) - 1);
1030 : 0 : pipeline.push(*op.parent());
1031 : 0 : }
1032 : 0 : } else {
1033 [ # # ]: 0 : if (data->load_attrs.size() != 1) {
1034 : 0 : data->load_build_key(this->schema());
1035 : 0 : data->emit_load_attrs(this->schema());
1036 : 0 : }
1037 : 0 : const auto &tuple_schema = op.child(0)->schema();
1038 [ # # ]: 0 : for (auto &t : block_) {
1039 : 0 : args[1] = &t;
1040 : 0 : data->build_key(args);
1041 [ # # # # ]: 0 : data->ht.insert_with_duplicates(args[0]->clone(data->key_schema), t.clone(tuple_schema));
1042 : : }
1043 : : }
1044 : 0 : } else {
1045 : : /* Perform nested-loops join. */
1046 : 0 : auto data = as<NestedLoopsJoinData>(op.data());
1047 : 0 : auto size = op.children().size();
1048 [ # # ]: 0 : std::vector<Tuple*> predicate_args(size + 1, nullptr);
1049 : 0 : predicate_args[0] = &data->res;
1050 : :
1051 [ # # ]: 0 : if (data->active_child == size - 1) {
1052 : : /* This is the right-most child. Combine its produced tuple with all combinations of the buffered
1053 : : * tuples. */
1054 [ # # ]: 0 : std::vector<std::size_t> positions(size - 1, std::size_t(-1L)); // positions within each buffer
1055 : 0 : std::size_t child_id = 0; // cursor to the child that provides the next part of the joined tuple
1056 : 0 : auto &pipeline = data->pipeline;
1057 : :
1058 : : /* Compile loading data from current child. */
1059 [ # # ]: 0 : if (data->buffer_schemas.size() != size) {
1060 [ # # ]: 0 : M_insist(data->buffer_schemas.size() == size - 1);
1061 [ # # ]: 0 : M_insist(data->load_attrs.size() == size - 1);
1062 [ # # # # ]: 0 : data->emit_load_attrs(this->schema());
1063 [ # # # # ]: 0 : data->buffer_schemas.emplace_back(this->schema()); // save the schema of the current pipeline
1064 [ # # # # ]: 0 : if (op.predicate().size()) {
1065 [ # # ]: 0 : std::vector<std::size_t> tuple_ids(size);
1066 [ # # ]: 0 : std::iota(tuple_ids.begin(), tuple_ids.end(), 1); // start at index 1
1067 [ # # # # ]: 0 : data->predicate.emit(op.predicate(), data->buffer_schemas, tuple_ids);
1068 [ # # ]: 0 : data->predicate.emit_St_Tup_b(0, 0);
1069 : 0 : }
1070 : 0 : }
1071 : :
1072 [ # # ]: 0 : M_insist(data->buffer_schemas.size() == size);
1073 [ # # ]: 0 : M_insist(data->load_attrs.size() == size);
1074 : :
1075 : 0 : for (;;) {
1076 [ # # ]: 0 : if (child_id == size - 1) { // right-most child, which produced the RHS `block_`
1077 : : /* Combine the tuples. One tuple from each buffer. */
1078 [ # # ]: 0 : pipeline.clear();
1079 [ # # # # ]: 0 : pipeline.block_.mask(block_.mask());
1080 : :
1081 [ # # # # ]: 0 : if (op.predicate().size()) {
1082 [ # # ]: 0 : for (std::size_t cid = 0; cid != child_id; ++cid)
1083 : 0 : predicate_args[cid + 1] = &data->buffers[cid][positions[cid]];
1084 : 0 : }
1085 : :
1086 : : /* Concatenate tuples from the first n-1 children. */
1087 [ # # # # : 0 : for (auto output_it = pipeline.block_.begin(); output_it != pipeline.block_.end(); ++output_it) {
# # # # #
# ]
1088 [ # # # # ]: 0 : auto &rhs = block_[output_it.index()];
1089 [ # # # # ]: 0 : if (op.predicate().size()) { // do we have a predicate?
1090 : 0 : predicate_args[size] = &rhs;
1091 [ # # ]: 0 : data->predicate(predicate_args.data()); // evaluate predicate
1092 [ # # # # : 0 : if (data->res.is_null(0) or not data->res[0].as_b()) {
# # # # #
# ]
1093 [ # # ]: 0 : pipeline.block_.erase(output_it);
1094 : 0 : continue;
1095 : : }
1096 : 0 : }
1097 : :
1098 [ # # ]: 0 : for (std::size_t i = 0; i != child_id; ++i) {
1099 : 0 : auto &buffer = data->buffers[i]; // get buffer of i-th child
1100 [ # # ]: 0 : Tuple *load_args[2] = { &*output_it, &buffer[positions[i]] }; // load child's current tuple
1101 [ # # ]: 0 : data->load_attrs[i](load_args);
1102 : 0 : }
1103 : :
1104 : : {
1105 [ # # ]: 0 : Tuple *load_args[2] = { &*output_it, &rhs }; // load last child's attributes
1106 [ # # ]: 0 : data->load_attrs[child_id](load_args);
1107 : : }
1108 : 0 : }
1109 : :
1110 [ # # # # ]: 0 : if (not pipeline.block_.empty())
1111 [ # # # # ]: 0 : pipeline.push(*op.parent());
1112 : 0 : --child_id;
1113 : 0 : } else { // child whose tuples have been materialized in a buffer
1114 : 0 : ++positions[child_id];
1115 : 0 : auto &buffer = data->buffers[child_id];
1116 [ # # ]: 0 : if (positions[child_id] == buffer.size()) { // reached the end of this buffer; backtrack
1117 [ # # ]: 0 : if (child_id == 0)
1118 : 0 : break;
1119 : 0 : positions[child_id] = std::size_t(-1L);
1120 : 0 : --child_id;
1121 : 0 : } else {
1122 [ # # ]: 0 : M_insist(positions[child_id] < buffer.size(), "position out of bounds");
1123 : 0 : ++child_id;
1124 : : }
1125 : : }
1126 : : }
1127 : 0 : } else {
1128 : : /* This is not the right-most child. Collect its produced tuples in a buffer. */
1129 [ # # # # ]: 0 : const auto &tuple_schema = op.child(data->active_child)->schema();
1130 [ # # ]: 0 : if (data->buffer_schemas.size() <= data->active_child) {
1131 [ # # # # ]: 0 : data->buffer_schemas.emplace_back(this->schema()); // save the schema of the current pipeline
1132 [ # # # # ]: 0 : data->emit_load_attrs(this->schema());
1133 [ # # ]: 0 : M_insist(data->buffer_schemas.size() == data->load_attrs.size());
1134 : 0 : }
1135 [ # # # # : 0 : for (auto &t : block_)
# # # # #
# # # ]
1136 [ # # # # ]: 0 : data->buffers[data->active_child].emplace_back(t.clone(tuple_schema));
1137 : : }
1138 : 0 : }
1139 : 0 : }
1140 : :
1141 : 110 : void Pipeline::operator()(const ProjectionOperator &op)
1142 : : {
1143 : 110 : auto data = as<ProjectionData>(op.data());
1144 : 110 : auto &pipeline = data->pipeline;
1145 [ + + ]: 110 : if (not data->projections)
1146 : 68 : data->emit_projections(this->schema(), op);
1147 : :
1148 : 110 : pipeline.clear();
1149 : 110 : pipeline.block_.mask(block_.mask());
1150 : :
1151 [ + + ]: 4460 : for (auto it = block_.begin(); it != block_.end(); ++it) {
1152 : 4350 : auto &out = pipeline.block_[it.index()];
1153 : 4350 : Tuple *args[] = { &out, &*it };
1154 : 4350 : (*data->projections)(args);
1155 : 4350 : }
1156 : :
1157 : 110 : pipeline.push(*op.parent());
1158 : 110 : }
1159 : :
1160 : 0 : void Pipeline::operator()(const LimitOperator &op)
1161 : : {
1162 : 0 : auto data = as<LimitData>(op.data());
1163 : :
1164 [ # # ]: 0 : for (auto it = block_.begin(); it != block_.end(); ++it) {
1165 [ # # # # ]: 0 : if (data->num_tuples < op.offset() or data->num_tuples >= op.offset() + op.limit())
1166 : 0 : block_.erase(it); /* discard this tuple */
1167 : 0 : ++data->num_tuples;
1168 : 0 : }
1169 : :
1170 [ # # ]: 0 : if (not block_.empty())
1171 : 0 : op.parent()->accept(*this);
1172 : :
1173 [ # # ]: 0 : if (data->num_tuples >= op.offset() + op.limit())
1174 : 0 : throw LimitOperator::stack_unwind(); // all tuples produced, now unwind the stack
1175 : 0 : }
1176 : :
1177 : 0 : void Pipeline::operator()(const GroupingOperator &op)
1178 : : {
1179 : 0 : auto perform_aggregation = [&](decltype(HashBasedGroupingData::groups)::value_type &entry, Tuple &tuple,
1180 : : GroupingData &data)
1181 : : {
1182 : 0 : const std::size_t key_size = op.group_by().size();
1183 : :
1184 : 0 : Tuple &group = const_cast<Tuple&>(entry.first);
1185 : 0 : const unsigned nth_tuple = ++entry.second;
1186 : :
1187 : : /* Add this tuple to its group by computing the aggregates. */
1188 [ # # ]: 0 : for (std::size_t i = 0, end = op.aggregates().size(); i != end; ++i) {
1189 : 0 : auto &aggregate_arguments = data.args[i];
1190 : 0 : Tuple *args[] = { &aggregate_arguments, &tuple };
1191 : 0 : data.compute_aggregate_arguments[i](args);
1192 : :
1193 : 0 : bool is_null = group.is_null(key_size + i);
1194 : 0 : auto &val = group[key_size + i];
1195 : :
1196 : 0 : auto &fe = as<const ast::FnApplicationExpr>(op.aggregates()[i].get());
1197 : 0 : auto ty = fe.type();
1198 : 0 : auto &fn = fe.get_function();
1199 : :
1200 [ # # # # : 0 : switch (fn.fnid) {
# # # ]
1201 : : default:
1202 : 0 : M_unreachable("function kind not implemented");
1203 : :
1204 : : case Function::FN_UDF:
1205 : 0 : M_unreachable("UDFs not yet supported");
1206 : :
1207 : : case Function::FN_COUNT:
1208 [ # # ]: 0 : if (is_null)
1209 : 0 : group.set(key_size + i, 0); // initialize
1210 [ # # ]: 0 : if (fe.args.size() == 0) { // COUNT(*)
1211 : 0 : val.as_i() += 1;
1212 : 0 : } else { // COUNT(x) aka. count not NULL
1213 : 0 : val.as_i() += not aggregate_arguments.is_null(0);
1214 : : }
1215 : 0 : break;
1216 : :
1217 : : case Function::FN_SUM: {
1218 : 0 : auto n = as<const Numeric>(ty);
1219 [ # # ]: 0 : if (is_null) {
1220 [ # # ]: 0 : if (n->is_floating_point())
1221 : 0 : group.set(key_size + i, 0.); // double precision
1222 : : else
1223 : 0 : group.set(key_size + i, 0); // int
1224 : 0 : }
1225 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1226 [ # # ]: 0 : if (n->is_floating_point())
1227 : 0 : val.as_d() += aggregate_arguments[0].as_d();
1228 : : else
1229 : 0 : val.as_i() += aggregate_arguments[0].as_i();
1230 : 0 : break;
1231 : : }
1232 : :
1233 : : case Function::FN_AVG: {
1234 [ # # ]: 0 : if (is_null) {
1235 [ # # ]: 0 : if (ty->is_floating_point())
1236 : 0 : group.set(key_size + i, 0.); // double precision
1237 : : else
1238 : 0 : group.set(key_size + i, 0); // int
1239 : 0 : }
1240 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1241 : : /* Compute AVG as iterative mean as described in Knuth, The Art of Computer Programming Vol 2,
1242 : : * section 4.2.2. */
1243 : 0 : val.as_d() += (aggregate_arguments[0].as_d() - val.as_d()) / nth_tuple;
1244 : 0 : break;
1245 : : }
1246 : :
1247 : : case Function::FN_MIN: {
1248 : : using std::min;
1249 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1250 [ # # ]: 0 : if (is_null) {
1251 : 0 : group.set(key_size + i, aggregate_arguments[0]);
1252 : 0 : continue;
1253 : : }
1254 : :
1255 : 0 : auto n = as<const Numeric>(ty);
1256 [ # # ]: 0 : if (n->is_float())
1257 : 0 : val.as_f() = min(val.as_f(), aggregate_arguments[0].as_f());
1258 [ # # ]: 0 : else if (n->is_double())
1259 : 0 : val.as_d() = min(val.as_d(), aggregate_arguments[0].as_d());
1260 : : else
1261 : 0 : val.as_i() = min(val.as_i(), aggregate_arguments[0].as_i());
1262 : 0 : break;
1263 : : }
1264 : :
1265 : : case Function::FN_MAX: {
1266 : : using std::max;
1267 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1268 [ # # ]: 0 : if (is_null) {
1269 : 0 : group.set(key_size + i, aggregate_arguments[0]);
1270 : 0 : continue;
1271 : : }
1272 : :
1273 : 0 : auto n = as<const Numeric>(ty);
1274 [ # # ]: 0 : if (n->is_float())
1275 : 0 : val.as_f() = max(val.as_f(), aggregate_arguments[0].as_f());
1276 [ # # ]: 0 : else if (n->is_double())
1277 : 0 : val.as_d() = max(val.as_d(), aggregate_arguments[0].as_d());
1278 : : else
1279 : 0 : val.as_i() = max(val.as_i(), aggregate_arguments[0].as_i());
1280 : 0 : break;
1281 : : }
1282 : : }
1283 : 0 : }
1284 : 0 : };
1285 : :
1286 : : /* Find the group. */
1287 : 0 : auto data = as<HashBasedGroupingData>(op.data());
1288 : 0 : auto &groups = data->groups;
1289 : :
1290 : 0 : Tuple key(op.schema());
1291 [ # # # # : 0 : for (auto &tuple : block_) {
# # # # #
# # # ]
1292 : 0 : Tuple *args[] = { &key, &tuple };
1293 [ # # ]: 0 : data->compute_key(args);
1294 [ # # ]: 0 : auto it = groups.find(key);
1295 [ # # ]: 0 : if (it == groups.end()) {
1296 : : /* Initialize the group's aggregate to NULL. This will be overwritten by the neutral element w.r.t.
1297 : : * the aggregation function. */
1298 [ # # ]: 0 : it = groups.emplace_hint(it, std::move(key), 0);
1299 [ # # # # : 0 : key = Tuple(op.schema());
# # ]
1300 : 0 : }
1301 [ # # ]: 0 : perform_aggregation(*it, tuple, *data);
1302 : : }
1303 : 0 : }
1304 : :
1305 : 0 : void Pipeline::operator()(const AggregationOperator &op)
1306 : : {
1307 : 0 : auto data = as<AggregationData>(op.data());
1308 : 0 : auto &nth_tuple = data->aggregates[op.schema().num_entries()].as_i();
1309 : :
1310 [ # # ]: 0 : for (auto &tuple : block_) {
1311 : 0 : nth_tuple += 1UL;
1312 [ # # ]: 0 : for (std::size_t i = 0, end = op.aggregates().size(); i != end; ++i) {
1313 : 0 : auto &aggregate_arguments = data->args[i];
1314 : 0 : Tuple *args[] = { &aggregate_arguments, &tuple };
1315 : 0 : data->compute_aggregate_arguments[i](args);
1316 : :
1317 : 0 : auto &fe = as<const ast::FnApplicationExpr>(op.aggregates()[i].get());
1318 : 0 : auto ty = fe.type();
1319 : 0 : auto &fn = fe.get_function();
1320 : :
1321 : 0 : bool agg_is_null = data->aggregates.is_null(i);
1322 : 0 : auto &val = data->aggregates[i];
1323 : :
1324 [ # # # # : 0 : switch (fn.fnid) {
# # # ]
1325 : : default:
1326 : 0 : M_unreachable("function kind not implemented");
1327 : :
1328 : : case Function::FN_UDF:
1329 : 0 : M_unreachable("UDFs not yet supported");
1330 : :
1331 : : case Function::FN_COUNT:
1332 [ # # ]: 0 : if (fe.args.size() == 0) { // COUNT(*)
1333 : 0 : val.as_i() += 1;
1334 : 0 : } else { // COUNT(x) aka. count not NULL
1335 : 0 : val.as_i() += not aggregate_arguments.is_null(0);
1336 : : }
1337 : 0 : break;
1338 : :
1339 : : case Function::FN_SUM: {
1340 : 0 : auto n = as<const Numeric>(ty);
1341 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1342 [ # # ]: 0 : if (n->is_floating_point())
1343 : 0 : val.as_d() += aggregate_arguments[0].as_d();
1344 : : else
1345 : 0 : val.as_i() += aggregate_arguments[0].as_i();
1346 : 0 : break;
1347 : : }
1348 : :
1349 : : case Function::FN_AVG: {
1350 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1351 : : /* Compute AVG as iterative mean as described in Knuth, The Art of Computer Programming Vol 2,
1352 : : * section 4.2.2. */
1353 : 0 : val.as_d() += (aggregate_arguments[0].as_d() - val.as_d()) / nth_tuple;
1354 : 0 : break;
1355 : : }
1356 : :
1357 : : case Function::FN_MIN: {
1358 : : using std::min;
1359 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1360 [ # # ]: 0 : if (agg_is_null) {
1361 : 0 : data->aggregates.set(i, aggregate_arguments[0]);
1362 : 0 : continue;
1363 : : }
1364 : :
1365 : 0 : auto n = as<const Numeric>(ty);
1366 [ # # ]: 0 : if (n->is_float())
1367 : 0 : val.as_f() = min(val.as_f(), aggregate_arguments[0].as_f());
1368 [ # # ]: 0 : else if (n->is_double())
1369 : 0 : val.as_d() = min(val.as_d(), aggregate_arguments[0].as_d());
1370 : : else
1371 : 0 : val.as_i() = min(val.as_i(), aggregate_arguments[0].as_i());
1372 : 0 : break;
1373 : : }
1374 : :
1375 : : case Function::FN_MAX: {
1376 : : using std::max;
1377 [ # # ]: 0 : if (aggregate_arguments.is_null(0)) continue; // skip NULL
1378 [ # # ]: 0 : if (agg_is_null) {
1379 : 0 : data->aggregates.set(i, aggregate_arguments[0]);
1380 : 0 : continue;
1381 : : }
1382 : :
1383 : 0 : auto n = as<const Numeric>(ty);
1384 [ # # ]: 0 : if (n->is_float())
1385 : 0 : val.as_f() = max(val.as_f(), aggregate_arguments[0].as_f());
1386 [ # # ]: 0 : else if (n->is_double())
1387 : 0 : val.as_d() = max(val.as_d(), aggregate_arguments[0].as_d());
1388 : : else
1389 : 0 : val.as_i() = max(val.as_i(), aggregate_arguments[0].as_i());
1390 : 0 : break;
1391 : : }
1392 : : }
1393 : 0 : }
1394 : : }
1395 : 0 : }
1396 : :
1397 : 0 : void Pipeline::operator()(const SortingOperator &op)
1398 : : {
1399 [ # # ]: 0 : if (not op.data())
1400 [ # # # # : 0 : op.data(new SortingData(this->schema()));
# # # # #
# ]
1401 : :
1402 : : /* cache all tuples for sorting */
1403 : 0 : auto data = as<SortingData>(op.data());
1404 [ # # ]: 0 : for (auto &t : block_)
1405 [ # # ]: 0 : data->buffer.emplace_back(t.clone(this->schema()));
1406 : 0 : }
1407 : :
1408 : : /*======================================================================================================================
1409 : : * Interpreter - Recursive descent
1410 : : *====================================================================================================================*/
1411 : :
1412 : 81 : void Interpreter::operator()(const CallbackOperator &op)
1413 : : {
1414 : 81 : op.child(0)->accept(*this);
1415 : 81 : }
1416 : :
1417 : 0 : void Interpreter::operator()(const PrintOperator &op)
1418 : : {
1419 [ # # ]: 0 : op.data(new PrintData(op));
1420 : 0 : op.child(0)->accept(*this);
1421 [ # # ]: 0 : if (not Options::Get().quiet)
1422 : 0 : op.out << as<PrintData>(op.data())->num_rows << " rows\n";
1423 : 0 : }
1424 : :
1425 : 0 : void Interpreter::operator()(const NoOpOperator &op)
1426 : : {
1427 : 0 : op.data(new NoOpData());
1428 : 0 : op.child(0)->accept(*this);
1429 : 0 : op.out << as<NoOpData>(op.data())->num_rows << " rows\n";
1430 : 0 : }
1431 : :
1432 : 81 : void Interpreter::operator()(const ScanOperator &op)
1433 : : {
1434 : 81 : Pipeline pipeline(op.schema());
1435 [ + - ]: 81 : pipeline.push(op);
1436 : 81 : }
1437 : :
1438 : 0 : void Interpreter::operator()(const FilterOperator &op)
1439 : : {
1440 : 0 : op.child(0)->accept(*this);
1441 : 0 : }
1442 : :
1443 : 0 : void Interpreter::operator()(const DisjunctiveFilterOperator &op)
1444 : : {
1445 : 0 : op.child(0)->accept(*this);
1446 : 0 : }
1447 : :
1448 : 0 : void Interpreter::operator()(const JoinOperator &op)
1449 : : {
1450 [ # # ]: 0 : if (op.predicate().is_equi()) {
1451 : : /* Perform simple hash join. */
1452 [ # # ]: 0 : auto data = new SimpleHashJoinData(op);
1453 : 0 : op.data(data);
1454 [ # # ]: 0 : if (op.has_info())
1455 : 0 : data->ht.resize(op.info().estimated_cardinality);
1456 : 0 : op.child(0)->accept(*this); // build HT on LHS
1457 [ # # ]: 0 : if (data->ht.size() == 0) // no tuples produced
1458 : 0 : return;
1459 : 0 : data->is_probe_phase = true;
1460 : 0 : op.child(1)->accept(*this); // probe HT with RHS
1461 : 0 : } else {
1462 : : /* Perform nested-loops join. */
1463 [ # # ]: 0 : auto data = new NestedLoopsJoinData(op);
1464 : 0 : op.data(data);
1465 [ # # ]: 0 : for (std::size_t i = 0, end = op.children().size(); i != end; ++i) {
1466 : 0 : data->active_child = i;
1467 : 0 : auto c = op.child(i);
1468 : 0 : c->accept(*this);
1469 [ # # # # ]: 0 : if (i != op.children().size() - 1 and data->buffers[i].empty()) // no tuples produced
1470 : 0 : return;
1471 : 0 : }
1472 : : }
1473 : 0 : }
1474 : :
1475 : 81 : void Interpreter::operator()(const ProjectionOperator &op)
1476 : : {
1477 : 81 : bool has_child = op.children().size();
1478 [ + - ]: 81 : auto data = new ProjectionData(op);
1479 : 81 : op.data(data);
1480 : :
1481 : : /* Evaluate the projection. */
1482 [ + - ]: 81 : if (has_child)
1483 : 81 : op.child(0)->accept(*this);
1484 : : else {
1485 : 0 : Pipeline pipeline;
1486 [ # # ]: 0 : pipeline.block_.mask(1); // evaluate the projection EXACTLY ONCE on an empty tuple
1487 [ # # ]: 0 : pipeline.push(op);
1488 : 0 : }
1489 : 81 : }
1490 : :
1491 : 0 : void Interpreter::operator()(const LimitOperator &op)
1492 : : {
1493 : : try {
1494 [ # # # # ]: 0 : op.data(new LimitData());
1495 [ # # # # ]: 0 : op.child(0)->accept(*this);
1496 [ # # ]: 0 : } catch (LimitOperator::stack_unwind) {
1497 : : /* OK, we produced all tuples and unwinded the stack */
1498 : 0 : }
1499 : 0 : }
1500 : :
1501 : 0 : void Interpreter::operator()(const GroupingOperator &op)
1502 : : {
1503 : 0 : auto &parent = *op.parent();
1504 [ # # ]: 0 : auto data = new HashBasedGroupingData(op);
1505 : 0 : op.data(data);
1506 : :
1507 : 0 : op.child(0)->accept(*this);
1508 : :
1509 : 0 : const auto num_groups = data->groups.size();
1510 : 0 : const auto remainder = num_groups % data->pipeline.block_.capacity();
1511 : 0 : auto it = data->groups.begin();
1512 [ # # ]: 0 : for (std::size_t i = 0; i != num_groups - remainder; i += data->pipeline.block_.capacity()) {
1513 : 0 : data->pipeline.block_.clear();
1514 : 0 : data->pipeline.block_.fill();
1515 [ # # ]: 0 : for (std::size_t j = 0; j != data->pipeline.block_.capacity(); ++j) {
1516 : 0 : auto node = data->groups.extract(it++);
1517 [ # # # # ]: 0 : swap(data->pipeline.block_[j], node.key());
1518 : 0 : }
1519 : 0 : data->pipeline.push(parent);
1520 : 0 : }
1521 : 0 : data->pipeline.block_.clear();
1522 : 0 : data->pipeline.block_.mask((1UL << remainder) - 1UL);
1523 [ # # ]: 0 : for (std::size_t i = 0; i != remainder; ++i) {
1524 : 0 : auto node = data->groups.extract(it++);
1525 [ # # # # ]: 0 : swap(data->pipeline.block_[i], node.key());
1526 : 0 : }
1527 : 0 : data->pipeline.push(parent);
1528 : 0 : }
1529 : :
1530 : 0 : void Interpreter::operator()(const AggregationOperator &op)
1531 : : {
1532 [ # # ]: 0 : op.data(new AggregationData(op));
1533 : 0 : auto data = as<AggregationData>(op.data());
1534 : :
1535 : : /* Initialize aggregates. */
1536 [ # # ]: 0 : for (std::size_t i = 0, end = op.aggregates().size(); i != end; ++i) {
1537 : 0 : auto &fe = as<const ast::FnApplicationExpr>(op.aggregates()[i].get());
1538 : 0 : auto ty = fe.type();
1539 : 0 : auto &fn = fe.get_function();
1540 : :
1541 [ # # # # : 0 : switch (fn.fnid) {
# # ]
1542 : : default:
1543 : 0 : M_unreachable("function kind not implemented");
1544 : :
1545 : : case Function::FN_UDF:
1546 : 0 : M_unreachable("UDFs not yet supported");
1547 : :
1548 : : case Function::FN_COUNT:
1549 : 0 : data->aggregates.set(i, 0); // initialize
1550 : 0 : break;
1551 : :
1552 : : case Function::FN_SUM: {
1553 : 0 : auto n = as<const Numeric>(ty);
1554 [ # # ]: 0 : if (n->is_floating_point())
1555 : 0 : data->aggregates.set(i, 0.); // double precision
1556 : : else
1557 : 0 : data->aggregates.set(i, 0L); // int64
1558 : 0 : break;
1559 : : }
1560 : :
1561 : : case Function::FN_AVG: {
1562 [ # # ]: 0 : if (ty->is_floating_point())
1563 : 0 : data->aggregates.set(i, 0.); // double precision
1564 : : else
1565 : 0 : data->aggregates.set(i, 0L); // int64
1566 : 0 : break;
1567 : : }
1568 : :
1569 : : case Function::FN_MIN:
1570 : : case Function::FN_MAX: {
1571 : 0 : data->aggregates.null(i); // initialize to NULL
1572 : 0 : break;
1573 : : }
1574 : : }
1575 : 0 : }
1576 : 0 : op.child(0)->accept(*this);
1577 : :
1578 : : using std::swap;
1579 : 0 : data->pipeline.block_.clear();
1580 : 0 : data->pipeline.block_.mask(1UL);
1581 : 0 : swap(data->pipeline.block_[0], data->aggregates);
1582 : 0 : data->pipeline.push(*op.parent());
1583 : 0 : }
1584 : :
1585 : 0 : void Interpreter::operator()(const SortingOperator &op)
1586 : : {
1587 : 0 : op.child(0)->accept(*this);
1588 : :
1589 : 0 : auto data = as<SortingData>(op.data());
1590 [ # # ]: 0 : if (not data) // no tuples produced
1591 : 0 : return;
1592 : :
1593 : 0 : const auto &orderings = op.order_by();
1594 : :
1595 [ # # ]: 0 : StackMachine comparator(data->pipeline.schema());
1596 [ # # ]: 0 : for (auto o : orderings) {
1597 [ # # ]: 0 : comparator.emit(o.first.get(), 1); // LHS
1598 [ # # ]: 0 : comparator.emit(o.first.get(), 2); // RHS
1599 : :
1600 : : /* Emit comparison. */
1601 [ # # ]: 0 : auto ty = o.first.get().type();
1602 [ # # # # : 0 : visit(overloaded {
# # # # #
# ]
1603 : 0 : [&comparator](const Boolean&) { comparator.emit_Cmp_b(); },
1604 : 0 : [&comparator](const CharacterSequence&) { comparator.emit_Cmp_s(); },
1605 : 0 : [&comparator](const Numeric &n) {
1606 [ # # # ]: 0 : switch (n.kind) {
1607 : : case Numeric::N_Int:
1608 : : case Numeric::N_Decimal:
1609 : 0 : comparator.emit_Cmp_i();
1610 : 0 : break;
1611 : :
1612 : : case Numeric::N_Float:
1613 [ # # ]: 0 : if (n.size() <= 32)
1614 : 0 : comparator.emit_Cmp_f();
1615 : : else
1616 : 0 : comparator.emit_Cmp_d();
1617 : 0 : break;
1618 : : }
1619 : 0 : },
1620 : 0 : [&comparator](const Date&) { comparator.emit_Cmp_i(); },
1621 : 0 : [&comparator](const DateTime&) { comparator.emit_Cmp_i(); },
1622 : 0 : [](auto&&) { M_insist("invalid type"); }
1623 : 0 : }, *ty);
1624 : :
1625 [ # # ]: 0 : if (not o.second)
1626 [ # # ]: 0 : comparator.emit_Minus_i(); // sort descending
1627 [ # # ]: 0 : comparator.emit_St_Tup_i(0, 0);
1628 [ # # ]: 0 : comparator.emit_Stop_NZ();
1629 : : }
1630 : :
1631 [ # # # # : 0 : Tuple res({ Type::Get_Integer(Type::TY_Vector, 4) });
# # # # ]
1632 [ # # ]: 0 : std::sort(data->buffer.begin(), data->buffer.end(), [&](Tuple &first, Tuple &second) {
1633 : 0 : Tuple *args[] = { &res, &first, &second };
1634 : 0 : comparator(args);
1635 : 0 : M_insist(not res.is_null(0));
1636 : 0 : return res[0].as_i() < 0;
1637 : : });
1638 : :
1639 [ # # ]: 0 : auto &parent = *op.parent();
1640 : 0 : const auto num_tuples = data->buffer.size();
1641 [ # # ]: 0 : const auto remainder = num_tuples % data->pipeline.block_.capacity();
1642 : 0 : auto it = data->buffer.begin();
1643 [ # # # # ]: 0 : for (std::size_t i = 0; i != num_tuples - remainder; i += data->pipeline.block_.capacity()) {
1644 [ # # ]: 0 : data->pipeline.block_.clear();
1645 [ # # ]: 0 : data->pipeline.block_.fill();
1646 [ # # # # ]: 0 : for (std::size_t j = 0; j != data->pipeline.block_.capacity(); ++j)
1647 [ # # # # ]: 0 : data->pipeline.block_[j] = std::move(*it++);
1648 [ # # ]: 0 : data->pipeline.push(parent);
1649 : 0 : }
1650 [ # # ]: 0 : data->pipeline.block_.clear();
1651 [ # # ]: 0 : data->pipeline.block_.mask((1UL << remainder) - 1UL);
1652 [ # # ]: 0 : for (std::size_t i = 0; i != remainder; ++i)
1653 [ # # # # ]: 0 : data->pipeline.block_[i] = std::move(*it++);
1654 [ # # ]: 0 : data->pipeline.push(parent);
1655 : 0 : }
1656 : :
1657 : : __attribute__((constructor(202)))
1658 : 1 : static void register_interpreter()
1659 : : {
1660 : 1 : Catalog &C = Catalog::Get();
1661 [ + - ]: 1 : C.register_backend<Interpreter>(C.pool("Interpreter"), "tuple-at-a-time Interpreter built with virtual stack machines");
1662 : 1 : }
|