<?php

namespace dokuwiki\plugin\struct\meta;

use dokuwiki\plugin\struct\types\AutoSummary;
use dokuwiki\plugin\struct\types\DateTime;
use dokuwiki\plugin\struct\types\Decimal;
use dokuwiki\plugin\struct\types\Page;
use dokuwiki\plugin\struct\types\User;

/**
 * Builds and executes a search over one or more struct schemas
 *
 * Typical usage: add schemas, then columns, filters and sorts, optionally set
 * offset/limit, and finally call execute(). The per-row meta data of the last
 * execution is available through getCount(), getPids(), getRids() and getRevs().
 */
class Search
{
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    public static $COMPARATORS = array(
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', ' IN '
    );

    /** @var \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query, keyed by table name */
    protected $schemas = array();

    /** @var Column[] list of columns to select */
    protected $columns = array();

    /** @var array the sorting of the result */
    protected $sortby = array();

    /** @var array the filters */
    protected $filter = array();

    /** @var array list of aliases tables can be referenced by (alias => table name) */
    protected $aliases = array();

    /** @var int begin results from here */
    protected $range_begin = 0;

    /** @var int end results here (0 means no limit) */
    protected $range_end = 0;

    /** @var int the number of results, -1 as long as the search was not executed */
    protected $count = -1;
    /** @var string[] the PIDs of the result rows, null as long as the search was not executed */
    protected $result_pids = null;
    /** @var array the row ids of the result rows */
    protected $result_rids = [];
    /** @var array the revisions of the result rows */
    protected $result_revs = [];

    /**
     * Search constructor.
     *
     * Fetches the database connection from the struct_db helper plugin.
     */
    public function __construct()
    {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias optional alias the table can be referenced by in column names
     * @throws StructException when the schema has no id
     */
    public function addSchema($table, $alias = '')
    {
        $schema = new Schema($table);
        if (!$schema->getId()) {
            throw new StructException('schema missing', $table);
        }

        $this->schemas[$schema->getTable()] = $schema;
        if ($alias) $this->aliases[$alias] = $schema->getTable();
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before.
     *
     * A '*' column adds all columns of the referenced schema. A name prefixed with '-'
     * removes all already added columns with that label again (useful after a wildcard).
     *
     * @param string $colname may contain an alias
     */
    public function addColumn($colname)
    {
        if ($this->processWildcard($colname)) return; // wildcard?
        if ($colname[0] == '-') { // remove column from previous wildcard lookup
            $colname = substr($colname, 1);
            foreach ($this->columns as $key => $col) {
                if ($col->getLabel() == $colname) unset($this->columns[$key]);
            }
            return;
        }

        $col = $this->findColumn($colname);
        if (!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before. Adding the same column again replaces its previous sort setting.
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     * @param bool $nc set true for case insensitivity
     */
    public function addSort($colname, $asc = true, $nc = true)
    {
        $col = $this->findColumn($colname);
        if (!$col) return; //FIXME do we really want to ignore missing columns?

        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc, $nc);
    }

    /**
     * Returns all set sort columns
     *
     * @return array full qualified label => array(Column, bool $asc, bool $nc)
     */
    public function getSorts()
    {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * The comparators '*~' and '<>' are accepted as aliases and are converted to
     * '~' (with the value wrapped in asterisks) and '!=' respectively.
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on unknown comparator or operator
     */
    public function addFilter($colname, $value, $comp, $op = 'OR')
    {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if ($comp == '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif ($comp == '<>') {
            $comp = '!=';
        }

        if (!in_array($comp, self::$COMPARATORS, true)) {
            throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        }
        if ($op != 'OR' && $op != 'AND') {
            throw new StructException('Bad filter type . Only AND or OR allowed');
        }

        $col = $this->findColumn($colname);
        if (!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch ($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if ($comp == 'LIKE' || $comp == 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        if ($comp == ' IN ' && !is_array($value)) {
            $value = $this->parseFilterValueList($value);
            // col IN ('a', 'b', 'c') is equal to col = 'a' OR col = 'b' OR col = 'c'
            $comp = '=';
        }

        // add the filter
        $this->filter[] = array($col, $value, $comp, $op);
    }

    /**
     * Parse SQLite row value into array
     *
     * Accepts a parenthesised, comma separated list of single or double quoted strings
     * and numbers, e.g. ("a", 'b', 3).
     *
     * @param string $value
     * @return string[]
     * @throws StructException when the value can not be parsed
     */
    protected function parseFilterValueList($value)
    {
        $Handler = new FilterValueListHandler();
        // support both the old global lexer class and the namespaced one
        $LexerClass = class_exists('\Doku_Lexer') ? '\Doku_Lexer' : '\dokuwiki\Parsing\Lexer\Lexer';
        $isLegacy = $LexerClass === '\Doku_Lexer';
        /** @var \Doku_Lexer|\dokuwiki\Parsing\Lexer\Lexer $Lexer */
        $Lexer = new $LexerClass($Handler, 'base', true);

        $Lexer->addEntryPattern('\(', 'base', 'row');
        $Lexer->addPattern('\s*,\s*', 'row');
        $Lexer->addExitPattern('\)', 'row');

        $Lexer->addEntryPattern('"', 'row', 'double_quote_string');
        $Lexer->addSpecialPattern('\\\\"', 'double_quote_string', 'escapeSequence');
        $Lexer->addExitPattern('"', 'double_quote_string');

        $Lexer->addEntryPattern("'", 'row', 'singleQuoteString');
        $Lexer->addSpecialPattern("\\\\'", 'singleQuoteString', 'escapeSequence');
        $Lexer->addExitPattern("'", 'singleQuoteString');

        // both string flavours are handled by the same handler method
        $Lexer->mapHandler('double_quote_string', 'singleQuoteString');

        $Lexer->addSpecialPattern('[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?', 'row', 'number');

        $res = $Lexer->parse($value);

        // the lexer has to be back in base mode, otherwise a bracket or quote was left open
        $currentMode = $isLegacy ? $Lexer->_mode->getCurrent() : $Lexer->getModeStack()->getCurrent();
        if (!$res || $currentMode != 'base') {
            throw new StructException('invalid row value syntax');
        }

        return $Handler->getRow();
    }

    /**
     * Wrap given value in asterisks
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterWrapAsterisks($value)
    {
        $map = function ($input) {
            return "*$input*";
        };

        if (is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Change given string to use % instead of *
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterChangeToLike($value)
    {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        if (is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Set offset for the results
     *
     * A previously set limit is kept, the end of the range is moved accordingly.
     *
     * @param int $offset
     */
    public function setOffset($offset)
    {
        $limit = 0;
        if ($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if ($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit)
    {
        if ($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running
     * execute()
     *
     * @return int
     * @throws StructException when the search has not been executed yet
     * @see execute()
     */
    public function getCount()
    {
        if ($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Returns the PID associated with each result row
     *
     * Important: this may only be called after running execute()
     *
     * @return string[]
     * @throws StructException when the search has not been executed yet
     * @see execute()
     */
    public function getPids()
    {
        if ($this->result_pids === null) {
            throw new StructException('PIDs are only accessible after executing the search');
        }
        return $this->result_pids;
    }

    /**
     * Returns the rid associated with each result row
     *
     * The list is only meaningful after running execute(). Before that an empty array
     * is returned, because the property is initialized with an empty array.
     *
     * @return array
     * @see execute()
     */
    public function getRids()
    {
        if ($this->result_rids === null) {
            throw new StructException('rids are only accessible after executing the search');
        }
        return $this->result_rids;
    }

    /**
     * Returns the revision associated with each result row
     *
     * The list is only meaningful after running execute(). Before that an empty array
     * is returned, because the property is initialized with an empty array.
     *
     * @return array
     * @see execute()
     */
    public function getRevs()
    {
        if ($this->result_revs === null) {
            throw new StructException('revs are only accessible after executing the search');
        }
        return $this->result_revs;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * @return Value[][]
     * @throws StructException when the query fails
     */
    public function execute()
    {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if ($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        // reset all per-row meta data, so running the search again does not append
        // to the lists of a previous run
        $this->result_pids = array();
        $this->result_rids = array();
        $this->result_revs = array();
        $result = array();
        $cursor = -1;
        // true when only "fake" columns (table id 0) are selected
        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
            return $pageidAndRevOnly && ($col->getTid() == 0);
        }, true);
        while ($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            $cursor++;
            if ($cursor < $this->range_begin) continue;
            if ($this->range_end && $cursor >= $this->range_end) continue;

            $C = 0;
            $resrow = array();
            $isempty = true;
            foreach ($this->columns as $col) {
                $val = $row["C$C"];
                if ($col->isMulti()) {
                    // the value may be NULL when there are no multi values; cast to
                    // avoid passing null to explode(), the result stays array('')
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $value = new Value($col, $val);
                $isempty &= $this->isEmptyValue($value);
                $resrow[] = $value;
                $C++;
            }

            // skip empty rows
            if ($isempty && !$pageidAndRevOnly) {
                $cursor--;
                continue;
            }

            $this->result_pids[] = $row['PID'];
            $this->result_rids[] = $row['rid'];
            $this->result_revs[] = $row['rev'];
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no columns have been added
     */
    public function getSQL()
    {
        if (!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach ($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if ($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table
                $QB->addTable($datatable);

                // add conditional page clauses if pid has a value
                $subAnd = $QB->filters()->whereSubAnd();
                $subAnd->whereAnd("$datatable.pid = ''");
                $subOr = $subAnd->whereSubOr();
                $subOr->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                $subOr->whereAnd("PAGEEXISTS($datatable.pid) = 1");
                $subOr->whereAnd('(ASSIGNED = 1 OR ASSIGNED IS NULL)');

                // add conditional schema assignment check
                $QB->addLeftJoin(
                    $datatable,
                    'schema_assignments',
                    '',
                    "$datatable.pid != ''
                    AND $datatable.pid = schema_assignments.pid
                    AND schema_assignments.tbl = '{$schema->getTable()}'"
                );

                $QB->addSelectColumn($datatable, 'rid');
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addSelectColumn($datatable, 'rev');
                $QB->addSelectColumn('schema_assignments', 'assigned', 'ASSIGNED');
                $QB->addGroupByColumn($datatable, 'pid');
                $QB->addGroupByColumn($datatable, 'rid');

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        // the aliases C0, C1, ... are what execute() reads the values from
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach ($this->columns as $col) {
            $CN = 'C' . $n++;

            if ($col->isMulti()) {
                $datatable = "data_{$col->getTable()}";
                $multitable = "multi_{$col->getTable()}";
                $MN = $QB->generateTableAlias('M');

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND $datatable.rid = $MN.rid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );

                // let the type select the value, then wrap that select in a GROUP_CONCAT
                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        $userWHERE = null; // only needed (and created) when there are filters
        if (!empty($this->filter)) {
            $userWHERE = $QB->filters()->where('AND');
        }
        foreach ($this->filter as $filter) {
            /** @var Column $col */
            list($col, $value, $comp, $op) = $filter;

            $datatable = "data_{$col->getTable()}";
            $multitable = "multi_{$col->getTable()}";

            if ($col->isMulti()) {
                $MN = $QB->generateTableAlias('MN');

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND $datatable.rid = $MN.rid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );
                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = $datatable;
                $colnam = $col->getColName();
            }

            $col->getType()->filter($userWHERE, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach ($this->sortby as $sort) {
            /** @var Column $col */
            list($col, $asc, $nc) = $sort;
            $colname = $col->getColName(false);
            if ($nc) $colname .= ' COLLATE NOCASE';
            $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC');
        }

        return $QB->getSQL();
    }

    /**
     * Returns all the columns that were added to the search
     *
     * @return Column[]
     */
    public function getColumns()
    {
        return $this->columns;
    }

    /**
     * All the schemas currently added
     *
     * @return Schema[]
     */
    public function getSchemas()
    {
        return array_values($this->schemas);
    }

    /**
     * Checks if the given column is a * wildcard
     *
     * If it's a wildcard all matching columns are added to the column list, otherwise
     * nothing happens
     *
     * @param string $colname
     * @return bool was wildcard?
     */
    protected function processWildcard($colname)
    {
        list($colname, $table) = $this->resolveColumn($colname);
        if ($colname !== '*') return false;

        // no table given? assume the first is meant
        if ($table === null) {
            $schema_list = array_keys($this->schemas);
            $table = $schema_list[0];
        }

        // unknown table or alias: nothing to add
        if (!isset($this->schemas[$table])) return false;

        $schema = $this->schemas[$table];
        $this->columns = array_merge($this->columns, $schema->getColumns(false));
        return true;
    }

    /**
     * Split a given column name into table and column
     *
     * Handles Aliases. Table might be null if none given.
     *
     * @param string $colname
     * @return array (colname, table)
     * @throws StructException when no schemas were added or no column name is given
     */
    protected function resolveColumn($colname)
    {
        if (!$this->schemas) throw new StructException('noschemas');

        // resolve the alias or table name
        $parts = explode('.', $colname, 2);
        $table = $parts[0];
        $colname = isset($parts[1]) ? $parts[1] : null;
        if (!$colname) {
            // no (or an empty) column part: the whole input is the column name
            $colname = $table;
            $table = null;
        }
        if ($table && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        if (!$colname) throw new StructException('nocolname');

        return array($colname, $table);
    }

    /**
     * Find a column to be used in the search
     *
     * The special names %pageid%, %title%, %lastupdate%, %lasteditor%, %lastsummary%
     * and %rowid% return "fake" columns that are bound to the first added schema.
     *
     * @param string $colname may contain an alias
     * @param bool $strict when true, a column with an unknown table/alias prefix is not
     *                     looked up in the other schemas
     * @return bool|Column the column or false if it could not be found
     * @throws StructException when no schemas were added
     */
    public function findColumn($colname, $strict = false)
    {
        if (!$this->schemas) throw new StructException('noschemas');
        $schema_list = array_keys($this->schemas);

        // add "fake" column for special col
        if ($colname == '%pageid%') {
            return new PageColumn(0, new Page(), $schema_list[0]);
        }
        if ($colname == '%title%') {
            return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
        }
        if ($colname == '%lastupdate%') {
            return new RevisionColumn(0, new DateTime(), $schema_list[0]);
        }
        if ($colname == '%lasteditor%') {
            return new UserColumn(0, new User(), $schema_list[0]);
        }
        if ($colname == '%lastsummary%') {
            return new SummaryColumn(0, new AutoSummary(), $schema_list[0]);
        }
        if ($colname == '%rowid%') {
            return new RowColumn(0, new Decimal(), $schema_list[0]);
        }

        list($colname, $table) = $this->resolveColumn($colname);

        /*
         * If table name is given search only that, otherwise if no strict behavior
         * is requested by the caller, try all assigned schemas for matching the
         * column name.
         */
        if ($table !== null && isset($this->schemas[$table])) {
            $schemas = array($table => $this->schemas[$table]);
        } elseif ($table === null || !$strict) {
            $schemas = $this->schemas;
        } else {
            return false;
        }

        // find it
        $col = false;
        foreach ($schemas as $schema) {
            if (empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if ($col) break;
        }

        return $col;
    }

    /**
     * Check if the given value is empty or belongs to a "fake" column
     *
     * Values of columns with table id 0 are treated as empty, so they do not keep an
     * otherwise empty row in the result.
     *
     * @param Value $value
     * @return bool
     */
    protected function isEmptyValue(Value $value)
    {
        if ($value->isEmpty()) return true;
        if ($value->getColumn()->getTid() == 0) return true;
        return false;
    }
}