xref: /plugin/struct/meta/Search.php (revision eafc109f41a4f149995a4d5aadb0fb0af66a7b9d)
1<?php
2
3namespace dokuwiki\plugin\struct\meta;
4
5use dokuwiki\plugin\struct\types\DateTime;
6use dokuwiki\plugin\struct\types\Decimal;
7use dokuwiki\plugin\struct\types\Page;
8
/**
 * Builds and executes a search over one or more struct schemas
 *
 * Schemas, columns, filters, sorting and paging are collected through the add*()/set*()
 * methods, turned into one SQL statement by getSQL() and run by execute().
 */
class Search {
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    static public $COMPARATORS = array(
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~',
    );

    /** @var  \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query, indexed by table name */
    protected $schemas = array();

    /** @var Column[] list of columns to select */
    protected $columns = array();

    /** @var array the sorting of the result, each entry is array(Column, bool $asc) */
    protected $sortby = array();

    /** @var array the filters, each entry is array(Column, value, SQL comparator, 'AND'|'OR') */
    protected $filter = array();

    /** @var array list of aliases tables can be referenced by (alias => table) */
    protected $aliases = array();

    /** @var  int begin results from here */
    protected $range_begin = 0;

    /** @var  int end results here (exclusive), 0 means no limit */
    protected $range_end = 0;

    /** @var int the number of results, -1 until the search was executed */
    protected $count = -1;

    /** @var  string[] the PIDs of the result rows, null until the search was executed */
    protected $result_pids = null;

    /**
     * Search constructor.
     *
     * Fetches the database handle from the struct_db helper plugin
     */
    public function __construct() {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias optional name the table can be referenced by in column names
     * @throws StructException when the schema does not exist or lookup schemas are mixed with others
     */
    public function addSchema($table, $alias = '') {
        $schema = new Schema($table);
        if(!$schema->getId()) {
            throw new StructException('schema missing', $table);
        }

        // as soon as a second schema is added, neither it nor the first one may be a lookup
        if($this->schemas &&
            (
                $schema->isLookup() ||
                reset($this->schemas)->isLookup()
            )
        ) {
            throw new StructException('nolookupmix');
        }

        $this->schemas[$table] = $schema;
        if($alias) $this->aliases[$alias] = $table;
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @throws StructException when no schema was added yet or the column name is empty
     */
    public function addColumn($colname) {
        if($this->processWildcard($colname)) return; // wildcard?
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before. Sorting by the same column again replaces the earlier setting.
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     * @throws StructException when no schema was added yet or the column name is empty
     */
    public function addSort($colname, $asc = true) {
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?

        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc);
    }

    /**
     * Returns all set sort columns
     *
     * @return array indexed by full qualified column label, values are array(Column, bool $asc)
     */
    public function getSorts() {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on unknown comparators or operators
     */
    public function addFilter($colname, $value, $comp, $op = 'OR') {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if($comp === '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif($comp === '<>') {
            $comp = '!=';
        }

        // strict checks: loose comparison would let non-string values like 0 pass on older PHP versions
        if(!in_array($comp, self::$COMPARATORS, true)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        if($op !== 'OR' && $op !== 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');

        $col = $this->findColumn($colname);
        if(!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if($comp === 'LIKE' || $comp === 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        // add the filter
        $this->filter[] = array($col, $value, $comp, $op);
    }

    /**
     * Wrap given value in asterisks
     *
     * @param string|string[] $value
     * @return string|string[] same shape as the input
     */
    protected function filterWrapAsterisks($value) {
        $map = function ($input) {
            return "*$input*";
        };

        return is_array($value) ? array_map($map, $value) : $map($value);
    }

    /**
     * Change given string to use % instead of *
     *
     * @param string|string[] $value
     * @return string|string[] same shape as the input
     */
    protected function filterChangeToLike($value) {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        return is_array($value) ? array_map($map, $value) : $map($value);
    }

    /**
     * Set offset for the results
     *
     * @param int $offset
     */
    public function setOffset($offset) {
        $limit = 0;
        if($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit) {
        if($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running @see execute()
     *
     * @return int
     * @throws StructException when the search has not been executed yet
     */
    public function getCount() {
        if($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Returns the PID associated with each result row
     *
     * Important: this may only be called after running @see execute()
     *
     * @return string[]
     * @throws StructException when the search has not been executed yet
     */
    public function getPids() {
        if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
        return $this->result_pids;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * @return Value[][]
     * @throws StructException when the query could not be executed
     */
    public function execute() {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        $this->result_pids = array();
        $result = array();
        $cursor = -1;
        // true when every selected column has a type id of 0; such results are never dropped as empty
        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
            return $pageidAndRevOnly && ($col->getTid() == 0);
        }, true);
        while($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            $cursor++;
            if($cursor < $this->range_begin) continue;
            if($this->range_end && $cursor >= $this->range_end) continue;

            $C = 0;
            $resrow = array();
            $isempty = true;
            foreach($this->columns as $col) {
                $val = $row["C$C"];
                if($col->isMulti()) {
                    // the aggregated column may be NULL; cast so explode() never receives null
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $value = new Value($col, $val);
                // keep the flag a real boolean, the check is still run for every value
                $isempty = $this->isEmptyValue($value) && $isempty;
                $resrow[] = $value;
                $C++;
            }

            // skip empty rows
            // NOTE(review): only rows inside the requested range are checked for emptiness, empty rows
            // outside of it still advance the cursor and are part of the count - confirm this is wanted
            if($isempty && !$pageidAndRevOnly) {
                $cursor--;
                continue;
            }

            $this->result_pids[] = $row['PID'];
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no columns have been added
     */
    public function getSQL() {
        if(!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table

                if(!$schema->isLookup()) {
                    $QB->addTable('schema_assignments');
                    $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid");
                    $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'");
                    $QB->filters()->whereAnd("schema_assignments.assigned = 1");
                    $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                    $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1");
                }

                $QB->addTable($datatable);
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addGroupByColumn($datatable, 'pid');

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach($this->columns as $col) {
            $CN = 'C' . $n++;

            if($col->isMulti()) {
                $MN = 'M' . $col->getColref();
                $this->joinMultiTable($QB, $col, $MN);

                // let the type add its select, then wrap that statement to flatten all values into one field
                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        foreach($this->filter as $filter) {
            /** @var $col Column */
            list($col, $value, $comp, $op) = $filter;

            if($col->isMulti()) {
                $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before
                $this->joinMultiTable($QB, $col, $MN);

                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = "data_{$col->getTable()}";
                $colnam = $col->getColName();
            }

            $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach($this->sortby as $sort) {
            list($col, $asc) = $sort;
            /** @var $col Column */
            $col->getType()->sort($QB, 'data_' . $col->getTable(), $col->getColName(false), $asc ? 'ASC' : 'DESC');
        }

        return $QB->getSQL();
    }

    /**
     * Left join the multi value table of the given column's schema to its data table
     *
     * The join matches on pid and rev and is restricted to the column's colref.
     *
     * @param QueryBuilder $QB the query to add the join to
     * @param Column $col the multi value column
     * @param string $alias the alias the multi table is joined as
     */
    protected function joinMultiTable(QueryBuilder $QB, Column $col, $alias) {
        $datatable = "data_{$col->getTable()}";
        $multitable = "multi_{$col->getTable()}";

        $QB->addLeftJoin(
            $datatable,
            $multitable,
            $alias,
            "$datatable.pid = $alias.pid AND
                     $datatable.rev = $alias.rev AND
                     $alias.colref = {$col->getColref()}"
        );
    }

    /**
     * Returns all the columns that where added to the search
     *
     * @return Column[]
     */
    public function getColumns() {
        return $this->columns;
    }

    /**
     * Checks if the given column is a * wildcard
     *
     * If it's a wildcard all matching columns are added to the column list, otherwise
     * nothing happens
     *
     * @param string $colname
     * @return bool was wildcard?
     * @throws StructException when no schema was added yet or the column name is empty
     */
    protected function processWildcard($colname) {
        list($colname, $table) = $this->resolveColumn($colname);
        if($colname !== '*') return false;

        // no table given? assume the first is meant
        if($table === null) {
            $schema_list = array_keys($this->schemas);
            $table = $schema_list[0];
        }

        // an unknown table is not an error, the wildcard is simply not handled here
        if(!isset($this->schemas[$table])) return false;
        $schema = $this->schemas[$table];
        if(!$schema) return false;
        $this->columns = array_merge($this->columns, $schema->getColumns(false));
        return true;
    }

    /**
     * Split a given column name into table and column
     *
     * Handles Aliases. Table might be null if none given.
     *
     * @param string $colname
     * @return array (colname, table)
     * @throws StructException when no schema was added yet or the column name is empty
     */
    protected function resolveColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // resolve the alias or table name
        // pad to two elements: explode() returns a single one when there is no dot in the name
        list($table, $colname) = array_pad(explode('.', $colname, 2), 2, '');
        if(!$colname) {
            $colname = $table;
            $table = null;
        }
        if($table && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        if(!$colname) throw new StructException('nocolname');

        return array($colname, $table);
    }

    /**
     * Find a column to be used in the search
     *
     * @param string $colname may contain an alias
     * @return bool|Column false when no matching column exists
     * @throws StructException when no schema was added yet or the column name is empty
     */
    public function findColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');
        $schema_list = array_keys($this->schemas);

        // add "fake" column for special col
        if(!(reset($this->schemas)->isLookup())) {
            if($colname === '%pageid%') {
                return new PageColumn(0, new Page(), $schema_list[0]);
            }
            if($colname === '%title%') {
                return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
            }
            if($colname === '%lastupdate%') {
                return new RevisionColumn(0, new DateTime(), $schema_list[0]);
            }
        } else {
            if($colname === '%rowid%') {
                return new RowColumn(0, new Decimal(), $schema_list[0]);
            }
        }

        list($colname, $table) = $this->resolveColumn($colname);

        // if table name given search only that, otherwise try all for matching column name
        if($table !== null) {
            // a table that was never added can not provide the column
            if(!isset($this->schemas[$table])) return false;
            $schemas = array($table => $this->schemas[$table]);
        } else {
            $schemas = $this->schemas;
        }

        // find it
        $col = false;
        foreach($schemas as $schema) {
            if(empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if($col) break;
        }

        return $col;
    }

    /**
     * Check if the given row is empty or references our own row
     *
     * Values of columns with a type id of 0 are always treated as empty here.
     *
     * @param Value $value
     * @return bool
     */
    protected function isEmptyValue(Value $value) {
        if($value->isEmpty()) return true;
        if($value->getColumn()->getTid() == 0) return true;
        return false;
    }
}
548
549
550