xref: /plugin/struct/meta/Search.php (revision b3a9db2269e6f282a06527b6ba5f5122bdbc5b9d)
1<?php
2
3namespace dokuwiki\plugin\struct\meta;
4
5use dokuwiki\plugin\struct\types\AutoSummary;
6use dokuwiki\plugin\struct\types\DateTime;
7use dokuwiki\plugin\struct\types\Decimal;
8use dokuwiki\plugin\struct\types\Page;
9use dokuwiki\plugin\struct\types\User;
10
11class Search
12{
13    /**
14     * This separator will be used to concat multi values to flatten them in the result set
15     */
16    public const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";
17
18    /**
19     * The list of known and allowed comparators
20     * (order matters)
21     */
22    public static $COMPARATORS = array(
23        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', ' IN '
24    );
25
26    /** @var \helper_plugin_struct_db */
27    protected $dbHelper;
28
29    /** @var  \helper_plugin_sqlite */
30    protected $sqlite;
31
32    /** @var Schema[] list of schemas to query */
33    protected $schemas = array();
34
35    /** @var Column[] list of columns to select */
36    protected $columns = array();
37
38    /** @var array the sorting of the result */
39    protected $sortby = array();
40
41    /** @var array the filters */
42    protected $filter = array();
43
44    /** @var array list of aliases tables can be referenced by */
45    protected $aliases = array();
46
47    /** @var  int begin results from here */
48    protected $range_begin = 0;
49
50    /** @var  int end results here */
51    protected $range_end = 0;
52
53    /** @var int the number of results */
54    protected $count = -1;
55    /** @var  string[] the PIDs of the result rows */
56    protected $result_pids = null;
57    /** @var  array the row ids of the result rows */
58    protected $result_rids = [];
59    /** @var  array the revisions of the result rows */
60    protected $result_revs = [];
61
62    /**
63     * Search constructor.
64     */
65    public function __construct()
66    {
67        /** @var  $dbHelper */
68        $this->dbHelper = plugin_load('helper', 'struct_db');
69        $this->sqlite = $this->dbHelper->getDB();
70    }
71
72    public function getDb()
73    {
74        return $this->sqlite;
75    }
76
77    /**
78     * Add a schema to be searched
79     *
80     * Call multiple times for multiple schemas.
81     *
82     * @param string $table
83     * @param string $alias
84     */
85    public function addSchema($table, $alias = '')
86    {
87        $schema = new Schema($table);
88        if (!$schema->getId()) {
89            throw new StructException('schema missing', $table);
90        }
91
92        $this->schemas[$schema->getTable()] = $schema;
93        if ($alias) $this->aliases[$alias] = $schema->getTable();
94    }
95
96    /**
97     * Add a column to be returned by the search
98     *
99     * Call multiple times for multiple columns. Be sure the referenced tables have been
100     * added before
101     *
102     * @param string $colname may contain an alias
103     */
104    public function addColumn($colname)
105    {
106        if ($this->processWildcard($colname)) return; // wildcard?
107        if ($colname[0] == '-') { // remove column from previous wildcard lookup
108            $colname = substr($colname, 1);
109            foreach ($this->columns as $key => $col) {
110                if ($col->getLabel() == $colname) unset($this->columns[$key]);
111            }
112            return;
113        }
114
115        $col = $this->findColumn($colname);
116        if (!$col) return; //FIXME do we really want to ignore missing columns?
117        $this->columns[] = $col;
118    }
119
120    /**
121     * Add sorting options
122     *
123     * Call multiple times for multiple columns. Be sure the referenced tables have been
124     * added before
125     *
126     * @param string $colname may contain an alias
127     * @param bool $asc sort direction (ASC = true, DESC = false)
128     * @param bool $nc set true for caseinsensitivity
129     */
130    public function addSort($colname, $asc = true, $nc = true)
131    {
132        $col = $this->findColumn($colname);
133        if (!$col) return; //FIXME do we really want to ignore missing columns?
134
135        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc, $nc);
136    }
137
138    /**
139     * Clear all sorting options
140     *
141     * @return void
142     */
143    public function clearSort()
144    {
145        $this->sortby = [];
146    }
147
148    /**
149     * Returns all set sort columns
150     *
151     * @return array
152     */
153    public function getSorts()
154    {
155        return $this->sortby;
156    }
157
158    /**
159     * Adds a filter
160     *
161     * @param string $colname may contain an alias
162     * @param string|string[] $value
163     * @param string $comp @see self::COMPARATORS
164     * @param string $op either 'OR' or 'AND'
165     */
166    public function addFilter($colname, $value, $comp, $op = 'OR')
167    {
168        /* Convert certain filters into others
169         * this reduces the number of supported filters to implement in types */
170        if ($comp == '*~') {
171            $value = $this->filterWrapAsterisks($value);
172            $comp = '~';
173        } elseif ($comp == '<>') {
174            $comp = '!=';
175        }
176
177        if (!in_array($comp, self::$COMPARATORS))
178            throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
179        if ($op != 'OR' && $op != 'AND')
180            throw new StructException('Bad filter type . Only AND or OR allowed');
181
182        $col = $this->findColumn($colname);
183        if (!$col) return; // ignore missing columns, filter might have been for different schema
184
185        // map filter operators to SQL syntax
186        switch ($comp) {
187            case '~':
188                $comp = 'LIKE';
189                break;
190            case '!~':
191                $comp = 'NOT LIKE';
192                break;
193            case '=*':
194                $comp = 'REGEXP';
195                break;
196        }
197
198        // we use asterisks, but SQL wants percents
199        if ($comp == 'LIKE' || $comp == 'NOT LIKE') {
200            $value = $this->filterChangeToLike($value);
201        }
202
203        if ($comp == ' IN ' && !is_array($value)) {
204            $value = $this->parseFilterValueList($value);
205            //col IN ('a', 'b', 'c') is equal to col = 'a' OR 'col = 'b' OR col = 'c'
206            $comp = '=';
207        }
208
209        // add the filter
210        $this->filter[] = array($col, $value, $comp, $op);
211    }
212
213    /**
214     * Parse SQLite row value into array
215     *
216     * @param string $value
217     * @return string[]
218     */
219    protected function parseFilterValueList($value)
220    {
221        $Handler = new FilterValueListHandler();
222        $LexerClass = class_exists('\Doku_Lexer') ? '\Doku_Lexer' : '\dokuwiki\Parsing\Lexer\Lexer';
223        $isLegacy = $LexerClass === '\Doku_Lexer';
224        /** @var \Doku_Lexer|\dokuwiki\Parsing\Lexer\Lexer $Lexer */
225        $Lexer = new $LexerClass($Handler, 'base', true);
226
227
228        $Lexer->addEntryPattern('\(', 'base', 'row');
229        $Lexer->addPattern('\s*,\s*', 'row');
230        $Lexer->addExitPattern('\)', 'row');
231
232        $Lexer->addEntryPattern('"', 'row', 'double_quote_string');
233        $Lexer->addSpecialPattern('\\\\"', 'double_quote_string', 'escapeSequence');
234        $Lexer->addExitPattern('"', 'double_quote_string');
235
236        $Lexer->addEntryPattern("'", 'row', 'singleQuoteString');
237        $Lexer->addSpecialPattern("\\\\'", 'singleQuoteString', 'escapeSequence');
238        $Lexer->addExitPattern("'", 'singleQuoteString');
239
240        $Lexer->mapHandler('double_quote_string', 'singleQuoteString');
241
242        $Lexer->addSpecialPattern('[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?', 'row', 'number');
243
244        $res = $Lexer->parse($value);
245
246        $currentMode = $isLegacy ? $Lexer->_mode->getCurrent() : $Lexer->getModeStack()->getCurrent();
247        if (!$res || $currentMode != 'base') {
248            throw new StructException('invalid row value syntax');
249        }
250
251        return $Handler->getRow();
252    }
253
254    /**
255     * Wrap given value in asterisks
256     *
257     * @param string|string[] $value
258     * @return string|string[]
259     */
260    protected function filterWrapAsterisks($value)
261    {
262        $map = function ($input) {
263            return "*$input*";
264        };
265
266        if (is_array($value)) {
267            $value = array_map($map, $value);
268        } else {
269            $value = $map($value);
270        }
271        return $value;
272    }
273
274    /**
275     * Change given string to use % instead of *
276     *
277     * @param string|string[] $value
278     * @return string|string[]
279     */
280    protected function filterChangeToLike($value)
281    {
282        $map = function ($input) {
283            return str_replace('*', '%', $input);
284        };
285
286        if (is_array($value)) {
287            $value = array_map($map, $value);
288        } else {
289            $value = $map($value);
290        }
291        return $value;
292    }
293
294    /**
295     * Set offset for the results
296     *
297     * @param int $offset
298     */
299    public function setOffset($offset)
300    {
301        $limit = 0;
302        if ($this->range_end) {
303            // if there was a limit set previously, the range_end needs to be recalculated
304            $limit = $this->range_end - $this->range_begin;
305        }
306        $this->range_begin = $offset;
307        if ($limit) $this->setLimit($limit);
308    }
309
310    /**
311     * Limit results to this number
312     *
313     * @param int $limit Set to 0 to disable limit again
314     */
315    public function setLimit($limit)
316    {
317        if ($limit) {
318            $this->range_end = $this->range_begin + $limit;
319        } else {
320            $this->range_end = 0;
321        }
322    }
323
324    /**
325     * Return the number of results (regardless of limit and offset settings)
326     *
327     * Use this to implement paging. Important: this may only be called after running @return int
328     * @see execute()
329     *
330     */
331    public function getCount()
332    {
333        if ($this->count < 0) throw new StructException('Count is only accessible after executing the search');
334        return $this->count;
335    }
336
337    /**
338     * Returns the PID associated with each result row
339     *
340     * Important: this may only be called after running @return \string[]
341     * @see execute()
342     *
343     */
344    public function getPids()
345    {
346        if ($this->result_pids === null)
347            throw new StructException('PIDs are only accessible after executing the search');
348        return $this->result_pids;
349    }
350
351    /**
352     * Returns the rid associated with each result row
353     *
354     * Important: this may only be called after running @return array
355     * @see execute()
356     *
357     */
358    public function getRids()
359    {
360        if ($this->result_rids === null)
361            throw new StructException('rids are only accessible after executing the search');
362        return $this->result_rids;
363    }
364
365    /**
366     * Returns the rid associated with each result row
367     *
368     * Important: this may only be called after running @return array
369     * @see execute()
370     *
371     */
372    public function getRevs()
373    {
374        if ($this->result_revs === null)
375            throw new StructException('revs are only accessible after executing the search');
376        return $this->result_revs;
377    }
378
379    /**
380     * Execute this search and return the result
381     *
382     * The result is a two dimensional array of Value()s.
383     *
384     * This will always query for the full result (not using offset and limit) and then
385     * return the wanted range, setting the count (@return Value[][]
386     * @see getCount) to the whole result number
387     *
388     */
389    public function execute()
390    {
391        list($sql, $opts) = $this->getSQL();
392
393        /** @var \PDOStatement $res */
394        $res = $this->sqlite->query($sql, $opts);
395        if ($res === false) throw new StructException("SQL execution failed for\n\n$sql");
396
397        $this->result_pids = array();
398        $result = array();
399        $cursor = -1;
400        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
401            return $pageidAndRevOnly && ($col->getTid() == 0);
402        }, true);
403        while ($row = $res->fetch(\PDO::FETCH_ASSOC)) {
404            $cursor++;
405            if ($cursor < $this->range_begin) continue;
406            if ($this->range_end && $cursor >= $this->range_end) continue;
407
408            $C = 0;
409            $resrow = array();
410            $isempty = true;
411            foreach ($this->columns as $col) {
412                $val = $row["C$C"];
413                if ($col->isMulti()) {
414                    $val = explode(self::CONCAT_SEPARATOR, $val);
415                }
416                $value = new Value($col, $val);
417                $isempty &= $this->isEmptyValue($value);
418                $resrow[] = $value;
419                $C++;
420            }
421
422            // skip empty rows
423            if ($isempty && !$pageidAndRevOnly) {
424                $cursor--;
425                continue;
426            }
427
428            $this->result_pids[] = $row['PID'];
429            $this->result_rids[] = $row['rid'];
430            $this->result_revs[] = $row['rev'];
431            $result[] = $resrow;
432        }
433
434        $res->closeCursor();
435        $this->count = $cursor + 1;
436        return $result;
437    }
438
439    /**
440     * Transform the set search parameters into a statement
441     *
442     * @return array ($sql, $opts) The SQL and parameters to execute
443     */
444    public function getSQL()
445    {
446        if (!$this->columns) throw new StructException('nocolname');
447
448        $QB = new QueryBuilder();
449
450        // basic tables
451        $first_table = '';
452        foreach ($this->schemas as $schema) {
453            $datatable = 'data_' . $schema->getTable();
454            if ($first_table) {
455                // follow up tables
456                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
457            } else {
458                // first table
459                $QB->addTable($datatable);
460
461                // add conditional page clauses if pid has a value
462                $subAnd = $QB->filters()->whereSubAnd();
463                $subAnd->whereAnd("$datatable.pid = ''");
464                $subOr = $subAnd->whereSubOr();
465                $subOr->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
466                $subOr->whereAnd("PAGEEXISTS($datatable.pid) = 1");
467                $subOr->whereAnd('(ASSIGNED = 1 OR ASSIGNED IS NULL)');
468
469                // add conditional schema assignment check
470                $QB->addLeftJoin(
471                    $datatable,
472                    'schema_assignments',
473                    '',
474                    "$datatable.pid != ''
475                    AND $datatable.pid = schema_assignments.pid
476                    AND schema_assignments.tbl = '{$schema->getTable()}'"
477                );
478
479                $QB->addSelectColumn($datatable, 'rid');
480                $QB->addSelectColumn($datatable, 'pid', 'PID');
481                $QB->addSelectColumn($datatable, 'rev');
482                $QB->addSelectColumn('schema_assignments', 'assigned', 'ASSIGNED');
483                $QB->addGroupByColumn($datatable, 'pid');
484                $QB->addGroupByColumn($datatable, 'rid');
485
486                $first_table = $datatable;
487            }
488            // phpcs:ignore
489            $QB->filters()->whereAnd("( (IS_PUBLISHER($datatable.pid) AND $datatable.latest = 1) OR (IS_PUBLISHER($datatable.pid) !=1 AND $datatable.published = 1) )");
490        }
491
492        // columns to select, handling multis
493        $sep = self::CONCAT_SEPARATOR;
494        $n = 0;
495        foreach ($this->columns as $col) {
496            $CN = 'C' . $n++;
497
498            if ($col->isMulti()) {
499                $datatable = "data_{$col->getTable()}";
500                $multitable = "multi_{$col->getTable()}";
501                $MN = $QB->generateTableAlias('M');
502
503                $QB->addLeftJoin(
504                    $datatable,
505                    $multitable,
506                    $MN,
507                    "$datatable.pid = $MN.pid AND $datatable.rid = $MN.rid AND
508                     $datatable.rev = $MN.rev AND
509                     $MN.colref = {$col->getColref()}"
510                );
511
512                $col->getType()->select($QB, $MN, 'value', $CN);
513                $sel = $QB->getSelectStatement($CN);
514                $QB->addSelectStatement("GROUP_CONCAT_DISTINCT($sel, '$sep')", $CN);
515            } else {
516                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
517                $QB->addGroupByStatement($CN);
518            }
519        }
520
521        // where clauses
522        if (!empty($this->filter)) {
523            $userWHERE = $QB->filters()->where('AND');
524        }
525        foreach ($this->filter as $filter) {
526            /** @var Column $col */
527            list($col, $value, $comp, $op) = $filter;
528
529            $datatable = "data_{$col->getTable()}";
530            $multitable = "multi_{$col->getTable()}";
531
532            /** @var $col Column */
533            if ($col->isMulti()) {
534                $MN = $QB->generateTableAlias('MN');
535
536                $QB->addLeftJoin(
537                    $datatable,
538                    $multitable,
539                    $MN,
540                    "$datatable.pid = $MN.pid AND $datatable.rid = $MN.rid AND
541                     $datatable.rev = $MN.rev AND
542                     $MN.colref = {$col->getColref()}"
543                );
544                $coltbl = $MN;
545                $colnam = 'value';
546            } else {
547                $coltbl = $datatable;
548                $colnam = $col->getColName();
549            }
550
551            $col->getType()->filter($userWHERE, $coltbl, $colnam, $comp, $value, $op); // type based filter
552        }
553
554        // sorting - we always sort by the single val column
555        foreach ($this->sortby as $sort) {
556            list($col, $asc, $nc) = $sort;
557            /** @var $col Column */
558            $colname = $col->getColName(false);
559            if ($nc) $colname .= ' COLLATE NOCASE';
560            $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC');
561        }
562
563        return $QB->getSQL();
564    }
565
566    /**
567     * Returns all the columns that where added to the search
568     *
569     * @return Column[]
570     */
571    public function getColumns()
572    {
573        return $this->columns;
574    }
575
576    /**
577     * All the schemas currently added
578     *
579     * @return Schema[]
580     */
581    public function getSchemas()
582    {
583        return array_values($this->schemas);
584    }
585
586    /**
587     * Checks if the given column is a * wildcard
588     *
589     * If it's a wildcard all matching columns are added to the column list, otherwise
590     * nothing happens
591     *
592     * @param string $colname
593     * @return bool was wildcard?
594     */
595    protected function processWildcard($colname)
596    {
597        list($colname, $table) = $this->resolveColumn($colname);
598        if ($colname !== '*') return false;
599
600        // no table given? assume the first is meant
601        if ($table === null) {
602            $schema_list = array_keys($this->schemas);
603            $table = $schema_list[0];
604        }
605
606        $schema = $this->schemas[$table] ?? null;
607        if (!$schema) return false;
608        $this->columns = array_merge($this->columns, $schema->getColumns(false));
609        return true;
610    }
611
612    /**
613     * Split a given column name into table and column
614     *
615     * Handles Aliases. Table might be null if none given.
616     *
617     * @param $colname
618     * @return array (colname, table)
619     */
620    protected function resolveColumn($colname)
621    {
622        if (!$this->schemas) throw new StructException('noschemas');
623
624        // resolve the alias or table name
625        @list($table, $colname) = array_pad(explode('.', $colname, 2), 2, '');
626        if (!$colname) {
627            $colname = $table;
628            $table = null;
629        }
630        if ($table && isset($this->aliases[$table])) {
631            $table = $this->aliases[$table];
632        }
633
634        if (!$colname) throw new StructException('nocolname');
635
636        return array($colname, $table);
637    }
638
639    /**
640     * Find a column to be used in the search
641     *
642     * @param string $colname may contain an alias
643     * @return bool|Column
644     */
645    public function findColumn($colname, $strict = false)
646    {
647        if (!$this->schemas) throw new StructException('noschemas');
648        $schema_list = array_keys($this->schemas);
649
650        // add "fake" column for special col
651        if ($colname == '%pageid%') {
652            return new PageColumn(0, new Page(), $schema_list[0]);
653        }
654        if ($colname == '%title%') {
655            return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
656        }
657        if ($colname == '%lastupdate%') {
658            return new RevisionColumn(0, new DateTime(), $schema_list[0]);
659        }
660        if ($colname == '%lasteditor%') {
661            return new UserColumn(0, new User(), $schema_list[0]);
662        }
663        if ($colname == '%lastsummary%') {
664            return new SummaryColumn(0, new AutoSummary(), $schema_list[0]);
665        }
666        if ($colname == '%rowid%') {
667            return new RowColumn(0, new Decimal(), $schema_list[0]);
668        }
669        if ($colname == '%published%') {
670            return new PublishedColumn(0, new Decimal(), $schema_list[0]);
671        }
672
673        list($colname, $table) = $this->resolveColumn($colname);
674
675        /*
676         * If table name is given search only that, otherwise if no strict behavior
677         * is requested by the caller, try all assigned schemas for matching the
678         * column name.
679         */
680        if ($table !== null && isset($this->schemas[$table])) {
681            $schemas = array($table => $this->schemas[$table]);
682        } elseif ($table === null || !$strict) {
683            $schemas = $this->schemas;
684        } else {
685            return false;
686        }
687
688        // find it
689        $col = false;
690        foreach ($schemas as $schema) {
691            if (empty($schema)) {
692                continue;
693            }
694            $col = $schema->findColumn($colname);
695            if ($col) break;
696        }
697
698        return $col;
699    }
700
701    /**
702     * Check if the given row is empty or references our own row
703     *
704     * @param Value $value
705     * @return bool
706     */
707    protected function isEmptyValue(Value $value)
708    {
709        if ($value->isEmpty()) return true;
710        if ($value->getColumn()->getTid() == 0) return true;
711        return false;
712    }
713}
714