LCOV - code coverage report
Current view: top level - src/io - DSVReader.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 241 263 91.6 %
Date: 2025-03-25 01:19:55 Functions: 10 17 58.8 %
Branches: 275 468 58.8 %

           Branch data     Line data    Source code
       1                 :            : #include <mutable/io/Reader.hpp>
       2                 :            : 
       3                 :            : #include "backend/Interpreter.hpp"
       4                 :            : #include "backend/StackMachine.hpp"
       5                 :            : #include <cctype>
       6                 :            : #include <cerrno>
       7                 :            : #include <exception>
       8                 :            : #include <iterator>
       9                 :            : #include <limits>
      10                 :            : #include <map>
      11                 :            : #include <memory>
      12                 :            : #include <mutable/catalog/Catalog.hpp>
      13                 :            : #include <mutable/storage/DataLayout.hpp>
      14                 :            : #include <mutable/storage/Store.hpp>
      15                 :            : #include <mutable/util/macro.hpp>
      16                 :            : #include <string>
      17                 :            : 
      18                 :            : 
      19                 :            : using namespace m;
      20                 :            : using namespace m::storage;
      21                 :            : 
      22                 :            : 
      23   [ +  -  +  -  :         60 : DSVReader::DSVReader(const Table &table, Config cfg, Diagnostic &diag, Scheduler::Transaction *transaction)
                   +  - ]
      24                 :         20 :     : Reader(table, diag, transaction)
      25                 :         20 :     , cfg_(cfg)
      26         [ +  - ]:         20 :     , pos(nullptr)
      27                 :         20 : {
      28   [ +  -  +  -  :         20 :     if (config().delimiter == config().quote)
                   +  + ]
      29   [ +  -  +  -  :          1 :         throw invalid_argument("delimiter and quote must not be the same character");
             -  +  +  - ]
      30                 :         21 : }
      31                 :            : 
      32                 :         19 : void DSVReader::operator()(std::istream &in, const char *name)
      33                 :            : {
      34                 :         19 :     auto &C = Catalog::Get();
      35                 :         19 :     auto &store = table.store();
      36                 :            : 
      37                 :            :     /* Compute table schema. */
      38                 :         19 :     Schema S;
      39   [ +  -  +  -  :         71 :     for (auto it = table.begin_all(); it != table.end_all(); ++it) S.add({table.name(), it->name}, it->type);
          +  -  +  +  +  
          -  +  -  +  -  
          +  -  -  +  +  
             -  +  -  +  
                      - ]
      40                 :            : 
      41                 :            :     /* Declare reference to the `StackMachine` for the current `Linearization`. */
      42                 :         19 :     std::unique_ptr<StackMachine> W;
      43                 :         19 :     const DataLayout *layout = nullptr;
      44                 :            : 
      45                 :            :     /* Allocate intermediate tuple. */
      46   [ +  -  -  + ]:         19 :     tup = Tuple(S);
      47                 :            : 
      48                 :         19 :     std::vector<const Attribute*> columns; ///< maps column offset to attribute
      49                 :         19 :     this->in = &in;
      50                 :         19 :     c = '\n';
      51         [ +  - ]:         19 :     pos = Position(name);
      52         [ +  - ]:         19 :     step(); // initialize the variable `c` by reading the first character from the input stream
      53                 :            : 
      54                 :         35 :     auto read_cell = [&]() -> ThreadSafePooledString {
      55                 :         16 :         buf.clear();
      56   [ +  -  +  +  :         68 :         while (c != EOF and c != '\n' and c != config().delimiter) {
                   +  + ]
      57                 :         52 :             buf.push_back(c);
      58                 :         52 :             step();
      59                 :            :         }
      60                 :         16 :         buf.push_back(0);
      61                 :         16 :         return C.pool(&buf[0]);
      62                 :            :     };
      63                 :            : 
      64                 :            :     /*----- Handle header information. -------------------------------------------------------------------------------*/
      65   [ +  -  +  +  :         19 :     if (config().has_header and not config().skip_header) {
             +  -  +  + ]
      66   [ -  +  +  + ]:         20 :         while (c != EOF and c != '\n') {
      67         [ +  - ]:         16 :             auto name = read_cell();
      68                 :         16 :             const Attribute *attr = nullptr;
      69                 :            :             try {
      70         [ +  + ]:         16 :                 attr = &table.at(name);
      71   [ +  -  +  - ]:         16 :             } catch (std::out_of_range) { /* nothing to do */ }
      72         [ +  - ]:         16 :             columns.push_back(attr);
      73   [ +  -  +  + ]:         16 :             if (c == config().delimiter)
      74         [ +  - ]:         13 :                 step(); // discard delimiter
      75                 :         36 :         }
      76   [ -  +  +  - ]:          4 :         M_insist(c == EOF or c == '\n');
      77         [ +  - ]:          4 :         step();
      78                 :          4 :     } else {
      79   [ +  -  +  -  :         51 :         for (auto &attr : table)
          +  -  +  +  +  
                -  +  - ]
      80         [ +  - ]:         36 :             columns.push_back(&attr);
      81   [ +  -  +  + ]:         15 :         if (config().skip_header) {
      82         [ +  - ]:          1 :             in.ignore(std::numeric_limits<std::streamsize>::max(), '\n'); // skip entire line
      83                 :          1 :             c = '\n';
      84         [ +  - ]:          1 :             step(); // skip newline
      85                 :          1 :         }
      86                 :            :     }
      87                 :            : 
      88                 :            :     /* Find timestamp attributes */
      89   [ +  -  +  -  :         19 :     auto ts_begin = std::find_if(table.cbegin_hidden(), table.end_hidden(),
                   +  - ]
      90                 :         19 :                                  [&](const Attribute & attr) {
      91         [ #  # ]:          0 :                                     return attr.name == C.pool("$ts_begin");
      92                 :          0 :     });
      93   [ +  -  +  -  :         19 :     auto ts_end = std::find_if(table.cbegin_hidden(), table.end_hidden(),
                   +  - ]
      94                 :         19 :                                [&](const Attribute & attr) {
      95                 :          0 :                                     return attr.name == C.pool("$ts_end");
      96                 :            :     });
      97                 :            : 
      98                 :            :     /*----- Read data. -----------------------------------------------------------------------------------------------*/
      99                 :         19 :     std::size_t idx = 0;
     100   [ +  +  +  +  :         86 :     while (in.good() and idx < config().num_rows) {
             +  -  +  + ]
     101                 :         67 :         ++idx;
     102         [ +  - ]:         67 :         store.append();
     103         [ +  + ]:        276 :         for (std::size_t i = 0; i != columns.size(); ++i) {
     104                 :        212 :             auto col = columns[i];
     105   [ +  +  +  -  :        212 :             if (i != 0 and not accept(config().delimiter)) {
             +  -  +  + ]
     106   [ +  -  +  -  :          3 :                 diag.e(pos) << "Expected a delimiter (" << config().delimiter << ").\n";
          +  -  +  -  +  
                      - ]
     107         [ +  - ]:          3 :                 discard_row();
     108                 :          3 :                 --idx;
     109         [ +  - ]:          3 :                 store.drop(); // drop the unfinished row
     110                 :          3 :                 goto end_of_row;
     111                 :            :             }
     112                 :            : 
     113         [ +  + ]:        209 :             if (col) { // current cell should be read
     114   [ +  +  +  -  :        204 :                 if ((i == columns.size() - 1 and c == '\n') or (i < columns.size() - 1 and c == config().delimiter)) { // NULL
                   +  - ]
     115         [ +  + ]:        204 :                     tup.null(col->id);
     116                 :          2 :                     continue; // keep delimiter (expected at beginning of each loop)
     117                 :            :                 }
     118                 :          0 :                 col_idx = col->id;
     119         [ +  + ]:          0 :                 (*this)(*col->type); // dynamic dispatch based on column type
     120         [ +  - ]:        202 :                 discard_cell(); // discard remainder of the cell
     121                 :        202 :             } else {
     122         [ +  - ]:          5 :                 discard_cell();
     123                 :            :             }
     124                 :        207 :         }
     125   [ +  -  -  + ]:        128 :         if (c != EOF and c != '\n') {
     126   [ #  #  #  # ]:          0 :             diag.e(pos) << "Expected end of row.\n";
     127         [ #  # ]:          0 :             discard_row();
     128                 :          0 :         } else {
     129   [ +  -  +  + ]:         64 :             if (layout != &table.layout()) {
     130                 :            :                 /* The data layout was updated, recompile stack machine. */
     131         [ +  - ]:         19 :                 layout = &table.layout();
     132   [ +  -  +  -  :         19 :                 W = std::make_unique<StackMachine>(Interpreter::compile_store(S, store.memory().addr(), *layout,
             +  -  +  - ]
     133         [ +  - ]:         19 :                                                                               S, store.num_rows() - 1));
     134                 :         19 :             }
     135                 :            :             /*----- set timestamps if available. -----*/
     136   [ -  +  #  #  :         64 :             if (this->transaction and ts_begin != table.end_hidden()) {
             #  #  -  + ]
     137   [ #  #  #  #  :          0 :                 tup.set(ts_begin->id, Value(transaction->start_time()));
             #  #  #  # ]
     138                 :            :                 /* Set $ts_end to -1. It is a special value representing infinity. */
     139   [ #  #  #  #  :          0 :                 M_insist(ts_end != table.end_hidden());
                   #  # ]
     140   [ #  #  #  #  :          0 :                 tup.set(ts_end->id, Value(-1));
                   #  # ]
     141                 :          0 :             }
     142                 :            : 
     143                 :         64 :             Tuple *args[] = { &tup };
     144         [ +  - ]:         64 :             (*W)(args); // write tuple to store
     145                 :            :         }
     146                 :            : end_of_row:
     147   [ -  +  +  - ]:         67 :         M_insist(c == EOF or c == '\n');
     148         [ +  - ]:         67 :         step();
     149                 :            :     }
     150                 :            : 
     151                 :        251 :     this->in = nullptr;
     152                 :       1060 : }
     153                 :            : 
     154                 :            : 
     155                 :          5 : void DSVReader::operator()(Const<Boolean>&)
     156                 :            : {
     157                 :          5 :     buf.clear();
     158   [ +  -  +  +  :         22 :     while (c != EOF and c != '\n' and c != config().delimiter) { push(); }
                   +  + ]
     159                 :          5 :     buf.push_back(0);
     160         [ +  + ]:          5 :     if (streq("TRUE", &buf[0]))
     161                 :          1 :         tup.set(col_idx, true);
     162         [ +  + ]:          4 :     else if (streq("FALSE", &buf[0]))
     163                 :          1 :         tup.set(col_idx, false);
     164                 :            :     else
     165                 :          3 :         diag.e(pos) << "Expected TRUE or FALSE.\n";
     166                 :          5 : }
     167                 :            : 
     168                 :         44 : void DSVReader::operator()(Const<CharacterSequence>&)
     169                 :            : {
     170                 :            :     /* This implementation is compliant with RFC 4180.  In quoted strings, quotes have to be escaped with an additional
     171                 :            :      * quote.  In unquoted strings, delimiter and quotes are prohibited.  In both cases, escape sequences are not
     172                 :            :      * supported.  Note that EOF implicitly closes quoted strings.
     173                 :            :      * Source: https://tools.ietf.org/html/rfc4180#section-2 */
     174                 :         44 :     buf.clear();
     175         [ +  + ]:         44 :     if (accept(config().quote)) {
     176         [ +  + ]:         26 :         if (config().escape == config().quote) { // RFC 4180
     177         [ -  + ]:         48 :             while (c != EOF) {
     178         [ +  + ]:         48 :                 if (c != config().quote) {
     179                 :         44 :                     push();
     180                 :         44 :                 } else {
     181                 :          4 :                     step();
     182         [ +  + ]:          4 :                     if (c == config().quote)
     183                 :          1 :                         push();
     184                 :            :                     else
     185                 :          3 :                         break;
     186                 :            :                 }
     187                 :            :             }
     188                 :          3 :         } else {
     189         [ -  + ]:        368 :             while (c != EOF) {
     190         [ +  + ]:        368 :                 if (c == config().quote) {
     191                 :         23 :                     step();
     192                 :         23 :                     break;
     193         [ +  + ]:        345 :                 } else if (c == config().escape) {
     194                 :          7 :                     step();
     195                 :          7 :                     push();
     196                 :          7 :                 } else {
     197                 :        338 :                     push();
     198                 :            :                 }
     199                 :            :             }
     200                 :            :         }
     201                 :         26 :     } else {
     202   [ +  -  +  +  :        274 :         while (c != EOF and c != '\n' and c != config().delimiter) {
                   +  + ]
     203         [ +  + ]:        257 :             if (c == config().quote) {
     204                 :          1 :                 diag.e(pos) << "WARNING: Illegal character " << config().quote << " found in unquoted string.\n";
     205                 :            :                 /* Entire cell is discarded. */
     206                 :          1 :                 tup.null(col_idx);
     207   [ +  -  +  +  :          4 :                 while (c != EOF and c != '\n' and c != config().delimiter) step();
                   +  + ]
     208                 :          1 :                 return;
     209                 :            :             } else
     210                 :        256 :                 push();
     211                 :            :         }
     212                 :            :     }
     213                 :         43 :     buf.push_back(0);
     214                 :            : 
     215                 :         43 :     Catalog &C = Catalog::Get();
     216   [ +  -  +  - ]:         43 :     tup.set(col_idx, C.pool(&buf[0]));
     217                 :         44 : }
     218                 :            : 
     219                 :          5 : void DSVReader::operator()(Const<Date>&)
     220                 :            : {
     221                 :          5 :     Catalog &C = Catalog::Get();
     222                 :          5 :     buf.clear();
     223                 :          5 :     buf.push_back('d');
     224                 :          5 :     buf.push_back('\'');
     225                 :            : 
     226                 :          5 :     const bool has_quote = accept(config().quote);
     227                 :            : #define DIGITS(num) for (auto i = 0; i < num; ++i) if (is_dec(c)) push(); else goto invalid;
     228         [ +  - ]:          5 :     if ('-' == c) push();
     229   [ +  +  +  - ]:         25 :     DIGITS(4);
     230         [ +  - ]:          5 :     if ('-' == c) push(); else goto invalid;
     231   [ +  +  +  - ]:         15 :     DIGITS(2);
     232         [ +  - ]:          5 :     if ('-' == c) push(); else goto invalid;
     233   [ +  +  +  - ]:         15 :     DIGITS(2);
     234                 :            : #undef DIGITS
     235   [ +  +  +  + ]:          5 :     if (has_quote and not accept(config().quote))
     236                 :          1 :         goto invalid;
     237                 :            : 
     238                 :          4 :     buf.push_back('\'');
     239                 :          4 :     buf.push_back(0);
     240                 :            : 
     241   [ +  -  -  +  :          4 :     tup.set(col_idx, Interpreter::eval(ast::Constant(ast::Token(pos, C.pool(buf.data()), TK_DATE))));
             +  -  +  - ]
     242                 :          4 :     return;
     243                 :            : 
     244                 :            : invalid:
     245                 :          1 :     diag.e(pos) << "WARNING: Invalid date.\n";
     246                 :          1 :     tup.null(col_idx);
     247                 :          5 : }
     248                 :            : 
     249                 :          5 : void DSVReader::operator()(Const<DateTime>&)
     250                 :            : {
     251                 :          5 :     Catalog &C = Catalog::Get();
     252                 :          5 :     buf.clear();
     253                 :          5 :     buf.push_back('d');
     254                 :          5 :     buf.push_back('\'');
     255                 :            : 
     256                 :          5 :     const bool has_quote = accept(config().quote);
     257                 :            : #define DIGITS(num) for (auto i = 0; i < num; ++i) if (is_dec(c)) push(); else goto invalid;
     258         [ +  - ]:          5 :     if ('-' == c) push();
     259   [ +  +  +  - ]:         25 :     DIGITS(4);
     260         [ +  - ]:          5 :     if ('-' == c) push(); else goto invalid;
     261   [ +  +  +  - ]:         15 :     DIGITS(2);
     262         [ +  - ]:          5 :     if ('-' == c) push(); else goto invalid;
     263   [ +  +  +  - ]:         15 :     DIGITS(2);
     264         [ +  + ]:          5 :     if (' ' == c) push(); else goto invalid;
     265   [ +  +  +  - ]:         12 :     DIGITS(2);
     266         [ +  - ]:          4 :     if (':' == c) push(); else goto invalid;
     267   [ +  +  +  - ]:         12 :     DIGITS(2);
     268         [ +  - ]:          4 :     if (':' == c) push(); else goto invalid;
     269   [ +  +  +  - ]:         12 :     DIGITS(2);
     270                 :            : #undef DIGITS
     271   [ +  +  +  + ]:          4 :     if (has_quote and not accept(config().quote))
     272                 :          1 :         goto invalid;
     273                 :            : 
     274                 :          3 :     buf.push_back('\'');
     275                 :          3 :     buf.push_back(0);
     276   [ +  -  -  +  :          3 :     tup.set(col_idx, Interpreter::eval(ast::Constant(ast::Token(pos, C.pool(buf.data()), TK_DATE_TIME))));
             +  -  +  - ]
     277                 :          3 :     return;
     278                 :            : 
     279                 :            : invalid:
     280                 :          2 :     diag.e(pos) << "WARNING: Invalid datetime.\n";
     281                 :          2 :     tup.null(col_idx);
     282                 :          5 : }
     283                 :            : 
     284                 :        143 : void DSVReader::operator()(Const<Numeric> &ty)
     285                 :            : {
     286   [ -  +  +  + ]:        143 :     switch (ty.kind) {
     287                 :            :         case Numeric::N_Int: {
     288                 :         90 :             bool is_neg = false;
     289         [ +  + ]:         90 :             if (accept('-'))
     290                 :          1 :                 is_neg = true;
     291                 :            :             else
     292                 :         89 :                 accept('+');
     293                 :         90 :             int64_t i = read_unsigned_int();
     294   [ +  -  +  +  :         90 :             if (c != EOF and c != '\n' and c != config().delimiter) {
                   +  + ]
     295                 :          5 :                 diag.e(pos) << "WARNING: Unexpected characters encountered in an integer.\n";
     296                 :          5 :                 tup.null(col_idx);
     297   [ +  -  -  +  :         27 :                 while (c != EOF and c != '\n' and c != config().delimiter) step();
                   +  + ]
     298                 :          5 :                 return;
     299                 :            :             }
     300         [ +  + ]:         85 :             if (is_neg) i = -i;
     301                 :         85 :             tup.set(col_idx, i);
     302                 :         85 :             break;
     303                 :            :         }
     304                 :            : 
     305                 :            :         case Numeric::N_Decimal: {
     306                 :          4 :             auto scale = ty.scale;
     307                 :            :             /* Read pre dot digits. */
     308                 :          4 :             bool is_neg = false;
     309         [ +  + ]:          4 :             if (accept('-'))
     310                 :          1 :                 is_neg = true;
     311                 :            :             else
     312                 :          3 :                 accept('+');
     313                 :          4 :             int64_t d = read_unsigned_int();
     314                 :            :             // std::cerr << "Read: " << d;
     315                 :          4 :             d = d * powi(10, scale);
     316         [ +  + ]:          4 :             if (accept('.')) {
     317                 :            :                 /* Read post dot digits. */
     318                 :          3 :                 int64_t post_dot = 0;
     319                 :          3 :                 auto n = scale;
     320   [ +  +  +  + ]:         11 :                 while (n > 0 and is_dec(c)) {
     321                 :          8 :                     post_dot = 10 * post_dot + c - '0';
     322                 :          8 :                     step();
     323                 :          8 :                     n--;
     324                 :            :                 }
     325                 :          3 :                 post_dot *= powi(10, n);
     326                 :            :                 /* Discard further digits */
     327         [ -  + ]:          3 :                 while (is_dec(c)) { step(); }
     328         [ +  - ]:          3 :                 d += d >= 0 ? post_dot : -post_dot;
     329                 :          3 :             }
     330   [ +  -  +  -  :          4 :             if (c != EOF and c != '\n' and c != config().delimiter) {
                   +  + ]
     331                 :          2 :                 diag.e(pos) << "WARNING: Unexpected characters encountered in a decimal.\n";
     332                 :          2 :                 tup.null(col_idx);
     333   [ +  -  -  +  :         10 :                 while (c != EOF and c != '\n' and c != config().delimiter) step();
                   +  + ]
     334                 :          2 :                 return;
     335                 :            :             }
     336         [ +  + ]:          2 :             if (is_neg) d = -d;
     337                 :          2 :             tup.set(col_idx, d);
     338                 :          2 :             break;
     339                 :            :         }
     340                 :            : 
     341                 :            :         case Numeric::N_Float: {
     342                 :         49 :             std::string float_str;
     343   [ +  -  +  +  :        392 :             while(c != EOF and c != '\n' and c != config().delimiter) {
             +  -  +  + ]
     344         [ +  - ]:        343 :                 float_str += c;
     345         [ +  - ]:        343 :                 step();
     346                 :            :             }
     347                 :            :             char* end;
     348                 :         49 :             errno = 0;
     349                 :         49 :             double d = std::strtod(float_str.c_str(), &end);
     350         [ +  + ]:         49 :             if (*end != '\0') {
     351   [ +  -  +  - ]:          5 :                 diag.e(pos) << "WARNING: Unexpected characters encountered in a floating-point number.\n";
     352         [ +  - ]:          5 :                 tup.null(col_idx);
     353                 :          5 :                 return;
     354                 :            :             }
     355   [ -  +  #  #  :         44 :             if ( errno == ERANGE and ( d == HUGE_VAL or d == HUGE_VALF or d == HUGE_VALL ) ) {
             #  #  #  # ]
     356   [ #  #  #  # ]:          0 :                 diag.w(pos) << "WARNING: A floating-point number is larger than the maximum value.\n";
     357                 :          0 :                 d = std::numeric_limits<double>::max();
     358   [ -  +  #  #  :         44 :             } else if ( errno == ERANGE and (d == -HUGE_VAL or d == -HUGE_VALF or d == -HUGE_VALL ) ) {
                   #  # ]
     359   [ #  #  #  # ]:          0 :                 diag.w(pos) << "WARNING: A floating-point number is smaller than the minimum value.\n";
     360                 :          0 :                 d = std::numeric_limits<double>::min();
     361                 :          0 :             }
     362   [ +  -  +  - ]:         44 :             if (ty.is_float())
     363   [ +  -  +  - ]:         44 :                 tup.set(col_idx, float(d));
     364                 :            :             else
     365   [ #  #  #  # ]:          0 :                 tup.set(col_idx, d);
     366                 :         44 :             break;
     367         [ -  + ]:         49 :         }
     368                 :            :     }
     369                 :        143 : }
     370                 :            : 
     371                 :          0 : void DSVReader::operator()(Const<ErrorType>&) { M_unreachable("invalid type"); }
     372                 :          0 : void DSVReader::operator()(Const<NoneType>&) { M_unreachable("invalid type"); }
     373                 :          0 : void DSVReader::operator()(Const<Bitmap>&) { M_unreachable("invalid type"); }
     374                 :          0 : void DSVReader::operator()(Const<FnType>&) { M_unreachable("invalid type"); }
     375                 :            : 
     376                 :         94 : int64_t DSVReader::read_unsigned_int()
     377                 :            : {
     378                 :         94 :     int64_t i = 0;
     379         [ +  + ]:        228 :     while (is_dec(c)) {
     380                 :        134 :         i = 10 * i + c - '0';
     381                 :        134 :         step();
     382                 :            :     }
     383                 :         94 :     return i;
     384                 :            : }

Generated by: LCOV version 1.16