xref: /plugin/struct/meta/Search.php (revision 1057556627155ce214796eceb5b80361493f882a)
1<?php
2
3namespace dokuwiki\plugin\struct\meta;
4
5use dokuwiki\plugin\struct\types\DateTime;
6use dokuwiki\plugin\struct\types\Decimal;
7use dokuwiki\plugin\struct\types\Page;
8use dokuwiki\plugin\struct\types\AutoSummary;
9use dokuwiki\plugin\struct\types\Text;
10use dokuwiki\plugin\struct\types\User;
11
12class Search {
13    /**
14     * This separator will be used to concat multi values to flatten them in the result set
15     */
16    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";
17
18    /**
19     * The list of known and allowed comparators
20     * (order matters)
21     */
22    static public $COMPARATORS = array(
23        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~',
24    );
25
26    /** @var  \helper_plugin_sqlite */
27    protected $sqlite;
28
29    /** @var Schema[] list of schemas to query */
30    protected $schemas = array();
31
32    /** @var Column[] list of columns to select */
33    protected $columns = array();
34
35    /** @var array the sorting of the result */
36    protected $sortby = array();
37
38    /** @var array the filters */
39    protected $filter = array();
40
41    /** @var array list of aliases tables can be referenced by */
42    protected $aliases = array();
43
44    /** @var  int begin results from here */
45    protected $range_begin = 0;
46
47    /** @var  int end results here */
48    protected $range_end = 0;
49
50    /** @var int the number of results */
51    protected $count = -1;
52    /** @var  string[] the PIDs of the result rows */
53    protected $result_pids = null;
54    /** @var  array the row ids of the result rows */
55    protected $result_rids = [];
56
57    /**
58     * Search constructor.
59     */
60    public function __construct() {
61        /** @var \helper_plugin_struct_db $plugin */
62        $plugin = plugin_load('helper', 'struct_db');
63        $this->sqlite = $plugin->getDB();
64    }
65
66    /**
67     * Add a schema to be searched
68     *
69     * Call multiple times for multiple schemas.
70     *
71     * @param string $table
72     * @param string $alias
73     */
74    public function addSchema($table, $alias = '') {
75        $schema = new Schema($table);
76        if(!$schema->getId()) {
77            throw new StructException('schema missing', $table);
78        }
79
80        // FIXME is the mixing still relevant?
81//        if($this->schemas &&
82//            (
83//                $schema->isLookup() ||
84//                reset($this->schemas)->isLookup()
85//            )
86//        ) {
87//            throw new StructException('nolookupmix');
88//        }
89
90        $this->schemas[$schema->getTable()] = $schema;
91        if($alias) $this->aliases[$alias] = $schema->getTable();
92    }
93
94    /**
95     * Add a column to be returned by the search
96     *
97     * Call multiple times for multiple columns. Be sure the referenced tables have been
98     * added before
99     *
100     * @param string $colname may contain an alias
101     */
102    public function addColumn($colname) {
103        if($this->processWildcard($colname)) return; // wildcard?
104        $col = $this->findColumn($colname);
105        if(!$col) return; //FIXME do we really want to ignore missing columns?
106        $this->columns[] = $col;
107    }
108
109    /**
110     * Add sorting options
111     *
112     * Call multiple times for multiple columns. Be sure the referenced tables have been
113     * added before
114     *
115     * @param string $colname may contain an alias
116     * @param bool $asc sort direction (ASC = true, DESC = false)
117     * @param bool $nc set true for caseinsensitivity
118     */
119    public function addSort($colname, $asc = true, $nc = true) {
120        $col = $this->findColumn($colname);
121        if(!$col) return; //FIXME do we really want to ignore missing columns?
122
123        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc, $nc);
124    }
125
126    /**
127     * Returns all set sort columns
128     *
129     * @return array
130     */
131    public function getSorts() {
132        return $this->sortby;
133    }
134
135    /**
136     * Adds a filter
137     *
138     * @param string $colname may contain an alias
139     * @param string|string[] $value
140     * @param string $comp @see self::COMPARATORS
141     * @param string $op either 'OR' or 'AND'
142     */
143    public function addFilter($colname, $value, $comp, $op = 'OR') {
144        /* Convert certain filters into others
145         * this reduces the number of supported filters to implement in types */
146        if($comp == '*~') {
147            $value = $this->filterWrapAsterisks($value);
148            $comp = '~';
149        } elseif($comp == '<>') {
150            $comp = '!=';
151        }
152
153        if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
154        if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');
155
156        $col = $this->findColumn($colname);
157        if(!$col) return; // ignore missing columns, filter might have been for different schema
158
159        // map filter operators to SQL syntax
160        switch($comp) {
161            case '~':
162                $comp = 'LIKE';
163                break;
164            case '!~':
165                $comp = 'NOT LIKE';
166                break;
167            case '=*':
168                $comp = 'REGEXP';
169                break;
170        }
171
172        // we use asterisks, but SQL wants percents
173        if($comp == 'LIKE' || $comp == 'NOT LIKE') {
174            $value = $this->filterChangeToLike($value);
175        }
176
177        // add the filter
178        $this->filter[] = array($col, $value, $comp, $op);
179    }
180
181    /**
182     * Wrap given value in asterisks
183     *
184     * @param string|string[] $value
185     * @return string|string[]
186     */
187    protected function filterWrapAsterisks($value) {
188        $map = function ($input) {
189            return "*$input*";
190        };
191
192        if(is_array($value)) {
193            $value = array_map($map, $value);
194        } else {
195            $value = $map($value);
196        }
197        return $value;
198    }
199
200    /**
201     * Change given string to use % instead of *
202     *
203     * @param string|string[] $value
204     * @return string|string[]
205     */
206    protected function filterChangeToLike($value) {
207        $map = function ($input) {
208            return str_replace('*', '%', $input);
209        };
210
211        if(is_array($value)) {
212            $value = array_map($map, $value);
213        } else {
214            $value = $map($value);
215        }
216        return $value;
217    }
218
219    /**
220     * Set offset for the results
221     *
222     * @param int $offset
223     */
224    public function setOffset($offset) {
225        $limit = 0;
226        if($this->range_end) {
227            // if there was a limit set previously, the range_end needs to be recalculated
228            $limit = $this->range_end - $this->range_begin;
229        }
230        $this->range_begin = $offset;
231        if($limit) $this->setLimit($limit);
232    }
233
234    /**
235     * Limit results to this number
236     *
237     * @param int $limit Set to 0 to disable limit again
238     */
239    public function setLimit($limit) {
240        if($limit) {
241            $this->range_end = $this->range_begin + $limit;
242        } else {
243            $this->range_end = 0;
244        }
245    }
246
247    /**
248     * Return the number of results (regardless of limit and offset settings)
249     *
250     * Use this to implement paging. Important: this may only be called after running @see execute()
251     *
252     * @return int
253     */
254    public function getCount() {
255        if($this->count < 0) throw new StructException('Count is only accessible after executing the search');
256        return $this->count;
257    }
258
259    /**
260     * Returns the PID associated with each result row
261     *
262     * Important: this may only be called after running @see execute()
263     *
264     * @return \string[]
265     */
266    public function getPids() {
267        if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
268        return $this->result_pids;
269    }
270
271    /**
272     * Returns the rid associated with each result row
273     *
274     * Important: this may only be called after running @see execute()
275     *
276     * @return array
277     */
278    public function getRids() {
279        if($this->result_rids === null) throw new StructException('rids are only accessible after executing the search');
280        return $this->result_rids;
281    }
282
283    /**
284     * Execute this search and return the result
285     *
286     * The result is a two dimensional array of Value()s.
287     *
288     * This will always query for the full result (not using offset and limit) and then
289     * return the wanted range, setting the count (@see getCount) to the whole result number
290     *
291     * @return Value[][]
292     */
293    public function execute() {
294        list($sql, $opts) = $this->getSQL();
295
296        /** @var \PDOStatement $res */
297        $res = $this->sqlite->query($sql, $opts);
298        if($res === false) throw new StructException("SQL execution failed for\n\n$sql");
299
300        $this->result_pids = array();
301        $result = array();
302        $cursor = -1;
303        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
304            return $pageidAndRevOnly && ($col->getTid() == 0);
305        }, true);
306        while($row = $res->fetch(\PDO::FETCH_ASSOC)) {
307            $cursor++;
308            if($cursor < $this->range_begin) continue;
309            if($this->range_end && $cursor >= $this->range_end) continue;
310
311            $C = 0;
312            $resrow = array();
313            $isempty = true;
314            foreach($this->columns as $col) {
315                $val = $row["C$C"];
316                if($col->isMulti()) {
317                    $val = explode(self::CONCAT_SEPARATOR, $val);
318                }
319                $value = new Value($col, $val);
320                $isempty &= $this->isEmptyValue($value);
321                $resrow[] = $value;
322                $C++;
323            }
324
325            // skip empty rows
326            if($isempty && !$pageidAndRevOnly) {
327                $cursor--;
328                continue;
329            }
330
331            $this->result_pids[] = $row['PID'];
332            $this->result_rids[] = $row['rid'];
333            $result[] = $resrow;
334        }
335
336        $this->sqlite->res_close($res);
337        $this->count = $cursor + 1;
338        return $result;
339    }
340
341    /**
342     * Transform the set search parameters into a statement
343     *
344     * @return array ($sql, $opts) The SQL and parameters to execute
345     */
346    public function getSQL() {
347        if(!$this->columns) throw new StructException('nocolname');
348
349        $QB = new QueryBuilder();
350
351        // basic tables
352        $first_table = '';
353        foreach($this->schemas as $schema) {
354            $datatable = 'data_' . $schema->getTable();
355            if($first_table) {
356                // follow up tables
357                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
358            } else {
359                // first table
360
361                // FIXME this breaks page search, a different check is needed
362                if(false) {
363                    $QB->addTable('schema_assignments');
364                    $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid");
365                    $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'");
366                    $QB->filters()->whereAnd("schema_assignments.assigned = 1");
367                    $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
368                    $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1");
369                }
370
371                $QB->addTable($datatable);
372                $QB->addSelectColumn($datatable, 'rid');
373                $QB->addSelectColumn($datatable, 'pid', 'PID');
374                $QB->addGroupByColumn($datatable, 'pid');
375
376                $first_table = $datatable;
377            }
378            $QB->filters()->whereAnd("$datatable.latest = 1");
379        }
380
381        // columns to select, handling multis
382        $sep = self::CONCAT_SEPARATOR;
383        $n = 0;
384        foreach($this->columns as $col) {
385            $CN = 'C' . $n++;
386
387            if($col->isMulti()) {
388                $datatable = "data_{$col->getTable()}";
389                $multitable = "multi_{$col->getTable()}";
390                $MN = $QB->generateTableAlias('M');
391
392                $QB->addLeftJoin(
393                    $datatable,
394                    $multitable,
395                    $MN,
396                    "$datatable.pid = $MN.pid AND
397                     $datatable.rev = $MN.rev AND
398                     $MN.colref = {$col->getColref()}"
399                );
400
401                $col->getType()->select($QB, $MN, 'value', $CN);
402                $sel = $QB->getSelectStatement($CN);
403                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
404            } else {
405                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
406                $QB->addGroupByStatement($CN);
407            }
408        }
409
410        // where clauses
411        if (!empty($this->filter)) {
412            $userWHERE = $QB->filters()->where('AND');
413        }
414        foreach($this->filter as $filter) {
415            /** @var Column $col */
416            list($col, $value, $comp, $op) = $filter;
417
418            $datatable = "data_{$col->getTable()}";
419            $multitable = "multi_{$col->getTable()}";
420
421            /** @var $col Column */
422            if($col->isMulti()) {
423                $MN = $QB->generateTableAlias('MN');
424
425                $QB->addLeftJoin(
426                    $datatable,
427                    $multitable,
428                    $MN,
429                    "$datatable.pid = $MN.pid AND
430                     $datatable.rev = $MN.rev AND
431                     $MN.colref = {$col->getColref()}"
432                );
433                $coltbl = $MN;
434                $colnam = 'value';
435            } else {
436                $coltbl = $datatable;
437                $colnam = $col->getColName();
438            }
439
440            $col->getType()->filter($userWHERE, $coltbl, $colnam, $comp, $value, $op); // type based filter
441        }
442
443        // sorting - we always sort by the single val column
444        foreach($this->sortby as $sort) {
445            list($col, $asc, $nc) = $sort;
446            /** @var $col Column */
447            $colname = $col->getColName(false);
448            if($nc) $colname .= ' COLLATE NOCASE';
449            $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC');
450        }
451
452        return $QB->getSQL();
453    }
454
455    /**
456     * Returns all the columns that where added to the search
457     *
458     * @return Column[]
459     */
460    public function getColumns() {
461        return $this->columns;
462    }
463
464    /**
465     * All the schemas currently added
466     *
467     * @return Schema[]
468     */
469    public function getSchemas() {
470        return array_values($this->schemas);
471    }
472
473    /**
474     * Checks if the given column is a * wildcard
475     *
476     * If it's a wildcard all matching columns are added to the column list, otherwise
477     * nothing happens
478     *
479     * @param string $colname
480     * @return bool was wildcard?
481     */
482    protected function processWildcard($colname) {
483        list($colname, $table) = $this->resolveColumn($colname);
484        if($colname !== '*') return false;
485
486        // no table given? assume the first is meant
487        if($table === null) {
488            $schema_list = array_keys($this->schemas);
489            $table = $schema_list[0];
490        }
491
492        $schema = $this->schemas[$table];
493        if(!$schema) return false;
494        $this->columns = array_merge($this->columns, $schema->getColumns(false));
495        return true;
496    }
497
498    /**
499     * Split a given column name into table and column
500     *
501     * Handles Aliases. Table might be null if none given.
502     *
503     * @param $colname
504     * @return array (colname, table)
505     */
506    protected function resolveColumn($colname) {
507        if(!$this->schemas) throw new StructException('noschemas');
508
509        // resolve the alias or table name
510        @list($table, $colname) = explode('.', $colname, 2);
511        if(!$colname) {
512            $colname = $table;
513            $table = null;
514        }
515        if($table && isset($this->aliases[$table])) {
516            $table = $this->aliases[$table];
517        }
518
519        if(!$colname) throw new StructException('nocolname');
520
521        return array($colname, $table);
522    }
523
524    /**
525     * Find a column to be used in the search
526     *
527     * @param string $colname may contain an alias
528     * @return bool|Column
529     */
530    public function findColumn($colname) {
531        if(!$this->schemas) throw new StructException('noschemas');
532        $schema_list = array_keys($this->schemas);
533
534        // add "fake" column for special col
535        if($colname == '%pageid%') {
536            return new PageColumn(0, new Page(), $schema_list[0]);
537        }
538        if($colname == '%title%') {
539            return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
540        }
541        if($colname == '%lastupdate%') {
542            return new RevisionColumn(0, new DateTime(), $schema_list[0]);
543        }
544        if ($colname == '%lasteditor%') {
545            return new UserColumn(0, new User(), $schema_list[0]);
546        }
547        if ($colname == '%lastsummary%') {
548            return new SummaryColumn(0, new AutoSummary(), $schema_list[0]);
549        }
550        if($colname == '%rowid%') {
551            return new RowColumn(0, new Decimal(), $schema_list[0]);
552        }
553
554        list($colname, $table) = $this->resolveColumn($colname);
555
556        // if table name given search only that, otherwise try all for matching column name
557        if($table !== null) {
558            $schemas = array($table => $this->schemas[$table]);
559        } else {
560            $schemas = $this->schemas;
561        }
562
563        // find it
564        $col = false;
565        foreach($schemas as $schema) {
566            if(empty($schema)) {
567                continue;
568            }
569            $col = $schema->findColumn($colname);
570            if($col) break;
571        }
572
573        return $col;
574    }
575
576    /**
577     * Check if the given row is empty or references our own row
578     *
579     * @param Value $value
580     * @return bool
581     */
582    protected function isEmptyValue(Value $value) {
583        if ($value->isEmpty()) return true;
584        if ($value->getColumn()->getTid() == 0) return true;
585        return false;
586    }
587}
588
589
590