Branch data Line data Source code
1 : : #include <mutable/io/Reader.hpp>
2 : :
3 : : #include "backend/Interpreter.hpp"
4 : : #include "backend/StackMachine.hpp"
5 : : #include <cctype>
6 : : #include <cerrno>
7 : : #include <exception>
8 : : #include <iterator>
9 : : #include <limits>
10 : : #include <map>
11 : : #include <memory>
12 : : #include <mutable/catalog/Catalog.hpp>
13 : : #include <mutable/storage/DataLayout.hpp>
14 : : #include <mutable/storage/Store.hpp>
15 : : #include <mutable/util/macro.hpp>
16 : : #include <string>
17 : :
18 : :
19 : : using namespace m;
20 : : using namespace m::storage;
21 : :
22 : :
23 [ + - + - : 60 : DSVReader::DSVReader(const Table &table, Config cfg, Diagnostic &diag, Scheduler::Transaction *transaction)
+ - ]
24 : 20 : : Reader(table, diag, transaction)
25 : 20 : , cfg_(cfg)
26 [ + - ]: 20 : , pos(nullptr)
27 : 20 : {
28 [ + - + - : 20 : if (config().delimiter == config().quote)
+ + ]
29 [ + - + - : 1 : throw invalid_argument("delimiter and quote must not be the same character");
- + + - ]
30 : 21 : }
31 : :
32 : 19 : void DSVReader::operator()(std::istream &in, const char *name)
33 : : {
34 : 19 : auto &C = Catalog::Get();
35 : 19 : auto &store = table.store();
36 : :
37 : : /* Compute table schema. */
38 : 19 : Schema S;
39 [ + - + - : 71 : for (auto it = table.begin_all(); it != table.end_all(); ++it) S.add({table.name(), it->name}, it->type);
+ - + + +
- + - + -
+ - - + +
- + - +
- ]
40 : :
41 : : /* Declare reference to the `StackMachine` for the current `Linearization`. */
42 : 19 : std::unique_ptr<StackMachine> W;
43 : 19 : const DataLayout *layout = nullptr;
44 : :
45 : : /* Allocate intermediate tuple. */
46 [ + - - + ]: 19 : tup = Tuple(S);
47 : :
48 : 19 : std::vector<const Attribute*> columns; ///< maps column offset to attribute
49 : 19 : this->in = ∈
50 : 19 : c = '\n';
51 [ + - ]: 19 : pos = Position(name);
52 [ + - ]: 19 : step(); // initialize the variable `c` by reading the first character from the input stream
53 : :
54 : 35 : auto read_cell = [&]() -> ThreadSafePooledString {
55 : 16 : buf.clear();
56 [ + - + + : 68 : while (c != EOF and c != '\n' and c != config().delimiter) {
+ + ]
57 : 52 : buf.push_back(c);
58 : 52 : step();
59 : : }
60 : 16 : buf.push_back(0);
61 : 16 : return C.pool(&buf[0]);
62 : : };
63 : :
64 : : /*----- Handle header information. -------------------------------------------------------------------------------*/
65 [ + - + + : 19 : if (config().has_header and not config().skip_header) {
+ - + + ]
66 [ - + + + ]: 20 : while (c != EOF and c != '\n') {
67 [ + - ]: 16 : auto name = read_cell();
68 : 16 : const Attribute *attr = nullptr;
69 : : try {
70 [ + + ]: 16 : attr = &table.at(name);
71 [ + - + - ]: 16 : } catch (std::out_of_range) { /* nothing to do */ }
72 [ + - ]: 16 : columns.push_back(attr);
73 [ + - + + ]: 16 : if (c == config().delimiter)
74 [ + - ]: 13 : step(); // discard delimiter
75 : 36 : }
76 [ - + + - ]: 4 : M_insist(c == EOF or c == '\n');
77 [ + - ]: 4 : step();
78 : 4 : } else {
79 [ + - + - : 51 : for (auto &attr : table)
+ - + + +
- + - ]
80 [ + - ]: 36 : columns.push_back(&attr);
81 [ + - + + ]: 15 : if (config().skip_header) {
82 [ + - ]: 1 : in.ignore(std::numeric_limits<std::streamsize>::max(), '\n'); // skip entire line
83 : 1 : c = '\n';
84 [ + - ]: 1 : step(); // skip newline
85 : 1 : }
86 : : }
87 : :
88 : : /* Find timestamp attributes */
89 [ + - + - : 19 : auto ts_begin = std::find_if(table.cbegin_hidden(), table.end_hidden(),
+ - ]
90 : 19 : [&](const Attribute & attr) {
91 [ # # ]: 0 : return attr.name == C.pool("$ts_begin");
92 : 0 : });
93 [ + - + - : 19 : auto ts_end = std::find_if(table.cbegin_hidden(), table.end_hidden(),
+ - ]
94 : 19 : [&](const Attribute & attr) {
95 : 0 : return attr.name == C.pool("$ts_end");
96 : : });
97 : :
98 : : /*----- Read data. -----------------------------------------------------------------------------------------------*/
99 : 19 : std::size_t idx = 0;
100 [ + + + + : 86 : while (in.good() and idx < config().num_rows) {
+ - + + ]
101 : 67 : ++idx;
102 [ + - ]: 67 : store.append();
103 [ + + ]: 276 : for (std::size_t i = 0; i != columns.size(); ++i) {
104 : 212 : auto col = columns[i];
105 [ + + + - : 212 : if (i != 0 and not accept(config().delimiter)) {
+ - + + ]
106 [ + - + - : 3 : diag.e(pos) << "Expected a delimiter (" << config().delimiter << ").\n";
+ - + - +
- ]
107 [ + - ]: 3 : discard_row();
108 : 3 : --idx;
109 [ + - ]: 3 : store.drop(); // drop the unfinished row
110 : 3 : goto end_of_row;
111 : : }
112 : :
113 [ + + ]: 209 : if (col) { // current cell should be read
114 [ + + + - : 204 : if ((i == columns.size() - 1 and c == '\n') or (i < columns.size() - 1 and c == config().delimiter)) { // NULL
+ - ]
115 [ + + ]: 204 : tup.null(col->id);
116 : 2 : continue; // keep delimiter (expected at beginning of each loop)
117 : : }
118 : 0 : col_idx = col->id;
119 [ + + ]: 0 : (*this)(*col->type); // dynamic dispatch based on column type
120 [ + - ]: 202 : discard_cell(); // discard remainder of the cell
121 : 202 : } else {
122 [ + - ]: 5 : discard_cell();
123 : : }
124 : 207 : }
125 [ + - - + ]: 128 : if (c != EOF and c != '\n') {
126 [ # # # # ]: 0 : diag.e(pos) << "Expected end of row.\n";
127 [ # # ]: 0 : discard_row();
128 : 0 : } else {
129 [ + - + + ]: 64 : if (layout != &table.layout()) {
130 : : /* The data layout was updated, recompile stack machine. */
131 [ + - ]: 19 : layout = &table.layout();
132 [ + - + - : 19 : W = std::make_unique<StackMachine>(Interpreter::compile_store(S, store.memory().addr(), *layout,
+ - + - ]
133 [ + - ]: 19 : S, store.num_rows() - 1));
134 : 19 : }
135 : : /*----- set timestamps if available. -----*/
136 [ - + # # : 64 : if (this->transaction and ts_begin != table.end_hidden()) {
# # - + ]
137 [ # # # # : 0 : tup.set(ts_begin->id, Value(transaction->start_time()));
# # # # ]
138 : : /* Set $ts_end to -1. It is a special value representing infinity. */
139 [ # # # # : 0 : M_insist(ts_end != table.end_hidden());
# # ]
140 [ # # # # : 0 : tup.set(ts_end->id, Value(-1));
# # ]
141 : 0 : }
142 : :
143 : 64 : Tuple *args[] = { &tup };
144 [ + - ]: 64 : (*W)(args); // write tuple to store
145 : : }
146 : : end_of_row:
147 [ - + + - ]: 67 : M_insist(c == EOF or c == '\n');
148 [ + - ]: 67 : step();
149 : : }
150 : :
151 : 251 : this->in = nullptr;
152 : 1060 : }
153 : :
154 : :
155 : 5 : void DSVReader::operator()(Const<Boolean>&)
156 : : {
157 : 5 : buf.clear();
158 [ + - + + : 22 : while (c != EOF and c != '\n' and c != config().delimiter) { push(); }
+ + ]
159 : 5 : buf.push_back(0);
160 [ + + ]: 5 : if (streq("TRUE", &buf[0]))
161 : 1 : tup.set(col_idx, true);
162 [ + + ]: 4 : else if (streq("FALSE", &buf[0]))
163 : 1 : tup.set(col_idx, false);
164 : : else
165 : 3 : diag.e(pos) << "Expected TRUE or FALSE.\n";
166 : 5 : }
167 : :
168 : 44 : void DSVReader::operator()(Const<CharacterSequence>&)
169 : : {
170 : : /* This implementation is compliant with RFC 4180. In quoted strings, quotes have to be escaped with an additional
171 : : * quote. In unquoted strings, delimiter and quotes are prohibited. In both cases, escape sequences are not
172 : : * supported. Note that EOF implicitly closes quoted strings.
173 : : * Source: https://tools.ietf.org/html/rfc4180#section-2 */
174 : 44 : buf.clear();
175 [ + + ]: 44 : if (accept(config().quote)) {
176 [ + + ]: 26 : if (config().escape == config().quote) { // RFC 4180
177 [ - + ]: 48 : while (c != EOF) {
178 [ + + ]: 48 : if (c != config().quote) {
179 : 44 : push();
180 : 44 : } else {
181 : 4 : step();
182 [ + + ]: 4 : if (c == config().quote)
183 : 1 : push();
184 : : else
185 : 3 : break;
186 : : }
187 : : }
188 : 3 : } else {
189 [ - + ]: 368 : while (c != EOF) {
190 [ + + ]: 368 : if (c == config().quote) {
191 : 23 : step();
192 : 23 : break;
193 [ + + ]: 345 : } else if (c == config().escape) {
194 : 7 : step();
195 : 7 : push();
196 : 7 : } else {
197 : 338 : push();
198 : : }
199 : : }
200 : : }
201 : 26 : } else {
202 [ + - + + : 274 : while (c != EOF and c != '\n' and c != config().delimiter) {
+ + ]
203 [ + + ]: 257 : if (c == config().quote) {
204 : 1 : diag.e(pos) << "WARNING: Illegal character " << config().quote << " found in unquoted string.\n";
205 : : /* Entire cell is discarded. */
206 : 1 : tup.null(col_idx);
207 [ + - + + : 4 : while (c != EOF and c != '\n' and c != config().delimiter) step();
+ + ]
208 : 1 : return;
209 : : } else
210 : 256 : push();
211 : : }
212 : : }
213 : 43 : buf.push_back(0);
214 : :
215 : 43 : Catalog &C = Catalog::Get();
216 [ + - + - ]: 43 : tup.set(col_idx, C.pool(&buf[0]));
217 : 44 : }
218 : :
219 : 5 : void DSVReader::operator()(Const<Date>&)
220 : : {
221 : 5 : Catalog &C = Catalog::Get();
222 : 5 : buf.clear();
223 : 5 : buf.push_back('d');
224 : 5 : buf.push_back('\'');
225 : :
226 : 5 : const bool has_quote = accept(config().quote);
227 : : #define DIGITS(num) for (auto i = 0; i < num; ++i) if (is_dec(c)) push(); else goto invalid;
228 [ + - ]: 5 : if ('-' == c) push();
229 [ + + + - ]: 25 : DIGITS(4);
230 [ + - ]: 5 : if ('-' == c) push(); else goto invalid;
231 [ + + + - ]: 15 : DIGITS(2);
232 [ + - ]: 5 : if ('-' == c) push(); else goto invalid;
233 [ + + + - ]: 15 : DIGITS(2);
234 : : #undef DIGITS
235 [ + + + + ]: 5 : if (has_quote and not accept(config().quote))
236 : 1 : goto invalid;
237 : :
238 : 4 : buf.push_back('\'');
239 : 4 : buf.push_back(0);
240 : :
241 [ + - - + : 4 : tup.set(col_idx, Interpreter::eval(ast::Constant(ast::Token(pos, C.pool(buf.data()), TK_DATE))));
+ - + - ]
242 : 4 : return;
243 : :
244 : : invalid:
245 : 1 : diag.e(pos) << "WARNING: Invalid date.\n";
246 : 1 : tup.null(col_idx);
247 : 5 : }
248 : :
249 : 5 : void DSVReader::operator()(Const<DateTime>&)
250 : : {
251 : 5 : Catalog &C = Catalog::Get();
252 : 5 : buf.clear();
253 : 5 : buf.push_back('d');
254 : 5 : buf.push_back('\'');
255 : :
256 : 5 : const bool has_quote = accept(config().quote);
257 : : #define DIGITS(num) for (auto i = 0; i < num; ++i) if (is_dec(c)) push(); else goto invalid;
258 [ + - ]: 5 : if ('-' == c) push();
259 [ + + + - ]: 25 : DIGITS(4);
260 [ + - ]: 5 : if ('-' == c) push(); else goto invalid;
261 [ + + + - ]: 15 : DIGITS(2);
262 [ + - ]: 5 : if ('-' == c) push(); else goto invalid;
263 [ + + + - ]: 15 : DIGITS(2);
264 [ + + ]: 5 : if (' ' == c) push(); else goto invalid;
265 [ + + + - ]: 12 : DIGITS(2);
266 [ + - ]: 4 : if (':' == c) push(); else goto invalid;
267 [ + + + - ]: 12 : DIGITS(2);
268 [ + - ]: 4 : if (':' == c) push(); else goto invalid;
269 [ + + + - ]: 12 : DIGITS(2);
270 : : #undef DIGITS
271 [ + + + + ]: 4 : if (has_quote and not accept(config().quote))
272 : 1 : goto invalid;
273 : :
274 : 3 : buf.push_back('\'');
275 : 3 : buf.push_back(0);
276 [ + - - + : 3 : tup.set(col_idx, Interpreter::eval(ast::Constant(ast::Token(pos, C.pool(buf.data()), TK_DATE_TIME))));
+ - + - ]
277 : 3 : return;
278 : :
279 : : invalid:
280 : 2 : diag.e(pos) << "WARNING: Invalid datetime.\n";
281 : 2 : tup.null(col_idx);
282 : 5 : }
283 : :
284 : 143 : void DSVReader::operator()(Const<Numeric> &ty)
285 : : {
286 [ - + + + ]: 143 : switch (ty.kind) {
287 : : case Numeric::N_Int: {
288 : 90 : bool is_neg = false;
289 [ + + ]: 90 : if (accept('-'))
290 : 1 : is_neg = true;
291 : : else
292 : 89 : accept('+');
293 : 90 : int64_t i = read_unsigned_int();
294 [ + - + + : 90 : if (c != EOF and c != '\n' and c != config().delimiter) {
+ + ]
295 : 5 : diag.e(pos) << "WARNING: Unexpected characters encountered in an integer.\n";
296 : 5 : tup.null(col_idx);
297 [ + - - + : 27 : while (c != EOF and c != '\n' and c != config().delimiter) step();
+ + ]
298 : 5 : return;
299 : : }
300 [ + + ]: 85 : if (is_neg) i = -i;
301 : 85 : tup.set(col_idx, i);
302 : 85 : break;
303 : : }
304 : :
305 : : case Numeric::N_Decimal: {
306 : 4 : auto scale = ty.scale;
307 : : /* Read pre dot digits. */
308 : 4 : bool is_neg = false;
309 [ + + ]: 4 : if (accept('-'))
310 : 1 : is_neg = true;
311 : : else
312 : 3 : accept('+');
313 : 4 : int64_t d = read_unsigned_int();
314 : : // std::cerr << "Read: " << d;
315 : 4 : d = d * powi(10, scale);
316 [ + + ]: 4 : if (accept('.')) {
317 : : /* Read post dot digits. */
318 : 3 : int64_t post_dot = 0;
319 : 3 : auto n = scale;
320 [ + + + + ]: 11 : while (n > 0 and is_dec(c)) {
321 : 8 : post_dot = 10 * post_dot + c - '0';
322 : 8 : step();
323 : 8 : n--;
324 : : }
325 : 3 : post_dot *= powi(10, n);
326 : : /* Discard further digits */
327 [ - + ]: 3 : while (is_dec(c)) { step(); }
328 [ + - ]: 3 : d += d >= 0 ? post_dot : -post_dot;
329 : 3 : }
330 [ + - + - : 4 : if (c != EOF and c != '\n' and c != config().delimiter) {
+ + ]
331 : 2 : diag.e(pos) << "WARNING: Unexpected characters encountered in a decimal.\n";
332 : 2 : tup.null(col_idx);
333 [ + - - + : 10 : while (c != EOF and c != '\n' and c != config().delimiter) step();
+ + ]
334 : 2 : return;
335 : : }
336 [ + + ]: 2 : if (is_neg) d = -d;
337 : 2 : tup.set(col_idx, d);
338 : 2 : break;
339 : : }
340 : :
341 : : case Numeric::N_Float: {
342 : 49 : std::string float_str;
343 [ + - + + : 392 : while(c != EOF and c != '\n' and c != config().delimiter) {
+ - + + ]
344 [ + - ]: 343 : float_str += c;
345 [ + - ]: 343 : step();
346 : : }
347 : : char* end;
348 : 49 : errno = 0;
349 : 49 : double d = std::strtod(float_str.c_str(), &end);
350 [ + + ]: 49 : if (*end != '\0') {
351 [ + - + - ]: 5 : diag.e(pos) << "WARNING: Unexpected characters encountered in a floating-point number.\n";
352 [ + - ]: 5 : tup.null(col_idx);
353 : 5 : return;
354 : : }
355 [ - + # # : 44 : if ( errno == ERANGE and ( d == HUGE_VAL or d == HUGE_VALF or d == HUGE_VALL ) ) {
# # # # ]
356 [ # # # # ]: 0 : diag.w(pos) << "WARNING: A floating-point number is larger than the maximum value.\n";
357 : 0 : d = std::numeric_limits<double>::max();
358 [ - + # # : 44 : } else if ( errno == ERANGE and (d == -HUGE_VAL or d == -HUGE_VALF or d == -HUGE_VALL ) ) {
# # ]
359 [ # # # # ]: 0 : diag.w(pos) << "WARNING: A floating-point number is smaller than the minimum value.\n";
360 : 0 : d = std::numeric_limits<double>::min();
361 : 0 : }
362 [ + - + - ]: 44 : if (ty.is_float())
363 [ + - + - ]: 44 : tup.set(col_idx, float(d));
364 : : else
365 [ # # # # ]: 0 : tup.set(col_idx, d);
366 : 44 : break;
367 [ - + ]: 49 : }
368 : : }
369 : 143 : }
370 : :
371 : 0 : void DSVReader::operator()(Const<ErrorType>&) { M_unreachable("invalid type"); }
372 : 0 : void DSVReader::operator()(Const<NoneType>&) { M_unreachable("invalid type"); }
373 : 0 : void DSVReader::operator()(Const<Bitmap>&) { M_unreachable("invalid type"); }
374 : 0 : void DSVReader::operator()(Const<FnType>&) { M_unreachable("invalid type"); }
375 : :
376 : 94 : int64_t DSVReader::read_unsigned_int()
377 : : {
378 : 94 : int64_t i = 0;
379 [ + + ]: 228 : while (is_dec(c)) {
380 : 134 : i = 10 * i + c - '0';
381 : 134 : step();
382 : : }
383 : 94 : return i;
384 : : }
|