<?php

namespace dokuwiki\plugin\struct\meta;

use dokuwiki\plugin\struct\types\DateTime;
use dokuwiki\plugin\struct\types\Decimal;
use dokuwiki\plugin\struct\types\Page;
use dokuwiki\plugin\struct\types\AutoSummary;
use dokuwiki\plugin\struct\types\Text;
use dokuwiki\plugin\struct\types\User;

/**
 * Builds and runs a query over one or more struct schemas
 *
 * Typical usage: add schemas (addSchema), then columns (addColumn), optionally
 * filters, sorting, offset and limit, and finally call execute(). After execute()
 * the total count and the pid/rid/rev of every returned row can be queried.
 */
class Search
{
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    public static $COMPARATORS = [
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 'IN'
    ];

    /** @var \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query, keyed by table name */
    protected $schemas = [];

    /** @var Column[] list of columns to select */
    protected $columns = [];

    /** @var array the sorting of the result, keyed by the full qualified column label */
    protected $sortby = [];

    /** @var array the filters, each entry is array(Column, value, comparator, 'AND'|'OR') */
    protected $filter = [];

    /** @var array list of aliases tables can be referenced by (alias => table name) */
    protected $aliases = [];

    /** @var int begin results from here */
    protected $range_begin = 0;

    /** @var int end results here (0 means no limit) */
    protected $range_end = 0;

    /** @var int the number of results, -1 until execute() has run */
    protected $count = -1;
    /** @var string[] the PIDs of the result rows, null until execute() has run */
    protected $result_pids = null;
    /** @var array the row ids of the result rows */
    protected $result_rids = [];
    /** @var array the revisions of the result rows */
    protected $result_revs = [];

    /**
     * Search constructor.
     *
     * Fetches the database handle from the struct_db helper plugin
     */
    public function __construct()
    {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias
     * @throws StructException when the schema has no id
     */
    public function addSchema($table, $alias = '')
    {
        $schema = new Schema($table);
        if (!$schema->getId()) {
            throw new StructException('schema missing', $table);
        }

        $this->schemas[$schema->getTable()] = $schema;
        if ($alias) $this->aliases[$alias] = $schema->getTable();
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     */
    public function addColumn($colname)
    {
        if ($this->processWildcard($colname)) return; // wildcard?
        $col = $this->findColumn($colname);
        if (!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before. Adding the same column again replaces its previous sort setting.
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     * @param bool $nc set true for caseinsensitivity
     */
    public function addSort($colname, $asc = true, $nc = true)
    {
        $col = $this->findColumn($colname);
        if (!$col) return; //FIXME do we really want to ignore missing columns?

        $this->sortby[$col->getFullQualifiedLabel()] = [$col, $asc, $nc];
    }

    /**
     * Returns all set sort columns
     *
     * @return array
     */
    public function getSorts()
    {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * Filters on columns that can not be found are silently ignored.
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on an unknown comparator or operator
     */
    public function addFilter($colname, $value, $comp, $op = 'OR')
    {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if ($comp === '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif ($comp === '<>') {
            $comp = '!=';
        }

        // strict check: a loose comparison would let non-string values like 0 pass on PHP < 8
        if (!in_array($comp, self::$COMPARATORS, true)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        if ($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');

        $col = $this->findColumn($colname);
        if (!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch ($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if ($comp === 'LIKE' || $comp === 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        if ($comp === 'IN') {
            // a list that was already passed as array needs no parsing
            if (!is_array($value)) {
                $value = $this->parseFilterValueList($value);
            }
            // col IN ('a', 'b', 'c') is equal to col = 'a' OR col = 'b' OR col = 'c'
            // the raw IN comparator must never be handed on, regardless of how the list was given
            $comp = '=';
        }

        // add the filter
        $this->filter[] = [$col, $value, $comp, $op];
    }

    /**
     * Parse SQLite row value into array
     *
     * Uses whichever DokuWiki lexer class is available (legacy \Doku_Lexer or the
     * namespaced one) to tokenize a string like ("a", 'b', 3).
     *
     * @param string $value
     * @return string[]
     * @throws StructException when the value can not be parsed completely
     */
    protected function parseFilterValueList($value)
    {
        $Handler = new FilterValueListHandler();
        $LexerClass = class_exists('\Doku_Lexer') ? '\Doku_Lexer' : '\dokuwiki\Parsing\Lexer\Lexer';
        $isLegacy = $LexerClass === '\Doku_Lexer';
        /** @var \Doku_Lexer|\dokuwiki\Parsing\Lexer\Lexer $Lexer */
        $Lexer = new $LexerClass($Handler, 'base', true);


        $Lexer->addEntryPattern('\(', 'base', 'row');
        $Lexer->addPattern('\s*,\s*', 'row');
        $Lexer->addExitPattern('\)', 'row');

        $Lexer->addEntryPattern('"', 'row', 'double_quote_string');
        $Lexer->addSpecialPattern('\\\\"', 'double_quote_string', 'escapeSequence');
        $Lexer->addExitPattern('"', 'double_quote_string');

        $Lexer->addEntryPattern("'", 'row', 'singleQuoteString');
        $Lexer->addSpecialPattern("\\\\'", 'singleQuoteString', 'escapeSequence');
        $Lexer->addExitPattern("'", 'singleQuoteString');

        $Lexer->mapHandler('double_quote_string', 'singleQuoteString');

        $Lexer->addSpecialPattern('[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?', 'row', 'number');

        $res = $Lexer->parse($value);

        // the two lexer generations expose their mode stack differently
        $currentMode = $isLegacy ? $Lexer->_mode->getCurrent() : $Lexer->getModeStack()->getCurrent();
        // anything but 'base' means a row or string was left unclosed
        if (!$res || $currentMode != 'base') {
            throw new StructException('invalid row value syntax');
        }

        return $Handler->getRow();
    }

    /**
     * Wrap given value in asterisks
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterWrapAsterisks($value)
    {
        $map = function ($input) {
            return "*$input*";
        };

        if (is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Change given string to use % instead of *
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterChangeToLike($value)
    {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        if (is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Set offset for the results
     *
     * @param int $offset
     */
    public function setOffset($offset)
    {
        $limit = 0;
        if ($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if ($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit)
    {
        if ($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running @see execute()
     *
     * @return int
     * @throws StructException when called before execute()
     */
    public function getCount()
    {
        if ($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Returns the PID associated with each result row
     *
     * Important: this may only be called after running @see execute()
     *
     * @return \string[]
     * @throws StructException when called before execute()
     */
    public function getPids()
    {
        if ($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
        return $this->result_pids;
    }

    /**
     * Returns the rid associated with each result row
     *
     * Important: this is only meaningful after running @see execute()
     *
     * Note: the property defaults to an empty array, so before execute() this returns
     * an empty list. The exception is only thrown if the property was reset to null
     * (e.g. by a subclass).
     *
     * @return array
     */
    public function getRids()
    {
        if ($this->result_rids === null) throw new StructException('rids are only accessible after executing the search');
        return $this->result_rids;
    }

    /**
     * Returns the revision associated with each result row
     *
     * Important: this is only meaningful after running @see execute()
     *
     * Note: the property defaults to an empty array, so before execute() this returns
     * an empty list. The exception is only thrown if the property was reset to null
     * (e.g. by a subclass).
     *
     * @return array
     */
    public function getRevs()
    {
        if ($this->result_revs === null) throw new StructException('revs are only accessible after executing the search');
        return $this->result_revs;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * Rows inside the wanted range whose values are all empty are dropped and not counted,
     * unless only columns with type id 0 were selected.
     *
     * @return Value[][]
     * @throws StructException when the query fails
     */
    public function execute()
    {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if ($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        // reset all per-row meta data, otherwise a second execute() would append to stale lists
        $this->result_pids = [];
        $this->result_rids = [];
        $this->result_revs = [];
        $result = [];
        $cursor = -1;
        // true when every selected column has type id 0 (no column with a real type id was selected)
        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
            return $pageidAndRevOnly && ($col->getTid() == 0);
        }, true);
        while ($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            $cursor++;
            // rows outside the range are still iterated so the total count is correct
            if ($cursor < $this->range_begin) continue;
            if ($this->range_end && $cursor >= $this->range_end) continue;

            $C = 0;
            $resrow = [];
            $isempty = true;
            foreach ($this->columns as $col) {
                $val = $row["C$C"];
                if ($col->isMulti()) {
                    // GROUP_CONCAT yields NULL when there are no values; explode() needs a string
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $value = new Value($col, $val);
                $isempty = $this->isEmptyValue($value) && $isempty;
                $resrow[] = $value;
                $C++;
            }

            // skip empty rows
            if ($isempty && !$pageidAndRevOnly) {
                $cursor--;
                continue;
            }

            $this->result_pids[] = $row['PID'];
            $this->result_rids[] = $row['rid'];
            $this->result_revs[] = $row['rev'];
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no columns have been added
     */
    public function getSQL()
    {
        if (!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach ($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if ($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table
                $QB->addTable($datatable);

                // add conditional page clauses if pid has a value
                $subAnd = $QB->filters()->whereSubAnd();
                $subAnd->whereAnd("$datatable.pid = ''");
                $subOr = $subAnd->whereSubOr();
                $subOr->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                $subOr->whereAnd("PAGEEXISTS($datatable.pid) = 1");
                $subOr->whereAnd('(ASSIGNED = 1 OR ASSIGNED IS NULL)');

                // add conditional schema assignment check
                $QB->addLeftJoin(
                    $datatable,
                    'schema_assignments',
                    '',
                    "$datatable.pid != ''
                    AND $datatable.pid = schema_assignments.pid
                    AND schema_assignments.tbl = '{$schema->getTable()}'"
                );

                $QB->addSelectColumn($datatable, 'rid');
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addSelectColumn($datatable, 'rev');
                $QB->addSelectColumn('schema_assignments', 'assigned', 'ASSIGNED');
                $QB->addGroupByColumn($datatable, 'pid');

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach ($this->columns as $col) {
            // result columns are named C0, C1, ... - execute() reads them back by that name
            $CN = 'C' . $n++;

            if ($col->isMulti()) {
                $datatable = "data_{$col->getTable()}";
                $multitable = "multi_{$col->getTable()}";
                $MN = $QB->generateTableAlias('M');

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND $datatable.rid = $MN.rid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );

                // let the type add its select, then wrap it to flatten all values into one field
                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        $userWHERE = null; // only needed (and created) when there are filters
        if (!empty($this->filter)) {
            $userWHERE = $QB->filters()->where('AND');
        }
        foreach ($this->filter as $filter) {
            /** @var Column $col */
            list($col, $value, $comp, $op) = $filter;

            $datatable = "data_{$col->getTable()}";
            $multitable = "multi_{$col->getTable()}";

            if ($col->isMulti()) {
                $MN = $QB->generateTableAlias('MN');

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND $datatable.rid = $MN.rid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );
                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = $datatable;
                $colnam = $col->getColName();
            }

            $col->getType()->filter($userWHERE, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach ($this->sortby as $sort) {
            /** @var Column $col */
            list($col, $asc, $nc) = $sort;
            $colname = $col->getColName(false);
            if ($nc) $colname .= ' COLLATE NOCASE';
            $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC');
        }

        return $QB->getSQL();
    }

    /**
     * Returns all the columns that where added to the search
     *
     * @return Column[]
     */
    public function getColumns()
    {
        return $this->columns;
    }

    /**
     * All the schemas currently added
     *
     * @return Schema[]
     */
    public function getSchemas()
    {
        return array_values($this->schemas);
    }

    /**
     * Checks if the given column is a * wildcard
     *
     * If it's a wildcard all matching columns are added to the column list, otherwise
     * nothing happens
     *
     * @param string $colname
     * @return bool was wildcard?
     */
    protected function processWildcard($colname)
    {
        list($colname, $table) = $this->resolveColumn($colname);
        if ($colname !== '*') return false;

        // no table given? assume the first is meant
        if ($table === null) {
            $schema_list = array_keys($this->schemas);
            $table = $schema_list[0];
        }

        // unknown table or alias: not a wildcard we can expand
        if (empty($this->schemas[$table])) return false;

        $this->columns = array_merge($this->columns, $this->schemas[$table]->getColumns(false));
        return true;
    }

    /**
     * Split a given column name into table and column
     *
     * Handles Aliases. Table might be null if none given. When nothing follows the
     * dot (as in "foo.") the part before the dot is used as the column name.
     *
     * @param string $colname
     * @return array (colname, table)
     * @throws StructException when no schemas were added or no column name is given
     */
    protected function resolveColumn($colname)
    {
        if (!$this->schemas) throw new StructException('noschemas');

        // resolve the alias or table name
        $parts = explode('.', (string) $colname, 2);
        if (count($parts) === 2 && $parts[1] !== '') {
            list($table, $colname) = $parts;
        } else {
            // no dot, or nothing after the dot: everything is the column name
            $colname = $parts[0];
            $table = null;
        }
        if ($table !== null && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        if ($colname === '') throw new StructException('nocolname');

        return [$colname, $table];
    }

    /**
     * Find a column to be used in the search
     *
     * The %pageid%, %title%, %lastupdate%, %lasteditor%, %lastsummary% and %rowid%
     * placeholders create special columns bound to the first added schema.
     *
     * @param string $colname may contain an alias
     * @return bool|Column false when the column can not be found
     * @throws StructException when no schemas were added
     */
    public function findColumn($colname)
    {
        if (!$this->schemas) throw new StructException('noschemas');
        $schema_list = array_keys($this->schemas);

        // add "fake" column for special col
        if ($colname === '%pageid%') {
            return new PageColumn(0, new Page(), $schema_list[0]);
        }
        if ($colname === '%title%') {
            return new PageColumn(0, new Page(['usetitles' => true]), $schema_list[0]);
        }
        if ($colname === '%lastupdate%') {
            return new RevisionColumn(0, new DateTime(), $schema_list[0]);
        }
        if ($colname === '%lasteditor%') {
            return new UserColumn(0, new User(), $schema_list[0]);
        }
        if ($colname === '%lastsummary%') {
            return new SummaryColumn(0, new AutoSummary(), $schema_list[0]);
        }
        if ($colname === '%rowid%') {
            return new RowColumn(0, new Decimal(), $schema_list[0]);
        }

        list($colname, $table) = $this->resolveColumn($colname);

        // if table name given search only that, otherwise try all for matching column name
        if ($table !== null) {
            // an unknown table simply yields no candidates
            $schemas = isset($this->schemas[$table]) ? [$table => $this->schemas[$table]] : [];
        } else {
            $schemas = $this->schemas;
        }

        // find it
        $col = false;
        foreach ($schemas as $schema) {
            if (empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if ($col) break;
        }

        return $col;
    }

    /**
     * Check if the given value is empty or belongs to a column with type id 0
     *
     * @param Value $value
     * @return bool
     */
    protected function isEmptyValue(Value $value)
    {
        if ($value->isEmpty()) return true;
        if ($value->getColumn()->getTid() == 0) return true;
        return false;
    }
}