1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\DateTime; 6use dokuwiki\plugin\struct\types\Decimal; 7use dokuwiki\plugin\struct\types\Page; 8use dokuwiki\plugin\struct\types\Summary; 9use dokuwiki\plugin\struct\types\Text; 10use dokuwiki\plugin\struct\types\User; 11 12class Search { 13 /** 14 * This separator will be used to concat multi values to flatten them in the result set 15 */ 16 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 17 18 /** 19 * The list of known and allowed comparators 20 * (order matters) 21 */ 22 static public $COMPARATORS = array( 23 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 'IN' 24 ); 25 26 /** @var \helper_plugin_sqlite */ 27 protected $sqlite; 28 29 /** @var Schema[] list of schemas to query */ 30 protected $schemas = array(); 31 32 /** @var Column[] list of columns to select */ 33 protected $columns = array(); 34 35 /** @var array the sorting of the result */ 36 protected $sortby = array(); 37 38 /** @var array the filters */ 39 protected $filter = array(); 40 41 /** @var array list of aliases tables can be referenced by */ 42 protected $aliases = array(); 43 44 /** @var int begin results from here */ 45 protected $range_begin = 0; 46 47 /** @var int end results here */ 48 protected $range_end = 0; 49 50 /** @var int the number of results */ 51 protected $count = -1; 52 /** @var string[] the PIDs of the result rows */ 53 protected $result_pids = null; 54 55 /** 56 * Search constructor. 57 */ 58 public function __construct() { 59 /** @var \helper_plugin_struct_db $plugin */ 60 $plugin = plugin_load('helper', 'struct_db'); 61 $this->sqlite = $plugin->getDB(); 62 } 63 64 /** 65 * Add a schema to be searched 66 * 67 * Call multiple times for multiple schemas. 68 * 69 * @param string $table 70 * @param string $alias 71 */ 72 public function addSchema($table, $alias = '') { 73 $schema = new Schema($table); 74 if(!$schema->getId()) { 75 throw new StructException('schema missing', $table); 76 } 77 78 if($this->schemas && 79 ( 80 $schema->isLookup() || 81 reset($this->schemas)->isLookup() 82 ) 83 ) { 84 throw new StructException('nolookupmix'); 85 } 86 87 $this->schemas[$schema->getTable()] = $schema; 88 if($alias) $this->aliases[$alias] = $schema->getTable(); 89 } 90 91 /** 92 * Add a column to be returned by the search 93 * 94 * Call multiple times for multiple columns. Be sure the referenced tables have been 95 * added before 96 * 97 * @param string $colname may contain an alias 98 */ 99 public function addColumn($colname) { 100 if($this->processWildcard($colname)) return; // wildcard? 101 $col = $this->findColumn($colname); 102 if(!$col) return; //FIXME do we really want to ignore missing columns? 103 $this->columns[] = $col; 104 } 105 106 /** 107 * Add sorting options 108 * 109 * Call multiple times for multiple columns. Be sure the referenced tables have been 110 * added before 111 * 112 * @param string $colname may contain an alias 113 * @param bool $asc sort direction (ASC = true, DESC = false) 114 * @param bool $nc set true for caseinsensitivity 115 */ 116 public function addSort($colname, $asc = true, $nc = true) { 117 $col = $this->findColumn($colname); 118 if(!$col) return; //FIXME do we really want to ignore missing columns? 119 120 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc, $nc); 121 } 122 123 /** 124 * Returns all set sort columns 125 * 126 * @return array 127 */ 128 public function getSorts() { 129 return $this->sortby; 130 } 131 132 /** 133 * Adds a filter 134 * 135 * @param string $colname may contain an alias 136 * @param string|string[] $value 137 * @param string $comp @see self::COMPARATORS 138 * @param string $op either 'OR' or 'AND' 139 */ 140 public function addFilter($colname, $value, $comp, $op = 'OR') { 141 /* Convert certain filters into others 142 * this reduces the number of supported filters to implement in types */ 143 if($comp == '*~') { 144 $value = $this->filterWrapAsterisks($value); 145 $comp = '~'; 146 } elseif($comp == '<>') { 147 $comp = '!='; 148 } 149 150 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 151 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 152 153 $col = $this->findColumn($colname); 154 if(!$col) return; // ignore missing columns, filter might have been for different schema 155 156 // map filter operators to SQL syntax 157 switch($comp) { 158 case '~': 159 $comp = 'LIKE'; 160 break; 161 case '!~': 162 $comp = 'NOT LIKE'; 163 break; 164 case '=*': 165 $comp = 'REGEXP'; 166 break; 167 } 168 169 // we use asterisks, but SQL wants percents 170 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 171 $value = $this->filterChangeToLike($value); 172 } 173 174 if ($comp == 'IN' && !is_array($value)) { 175 $value = $this->parseRowValue($value); 176 //col IN ('a', 'b', 'c') is equal to col = 'a' OR 'col = 'b' OR col = 'c' 177 $comp = '='; 178 } 179 180 // add the filter 181 $this->filter[] = array($col, $value, $comp, $op); 182 } 183 184 /** 185 * Parse SQLite row value into array 186 * 187 * @param string $value 188 * @return string[] 189 */ 190 protected function parseRowValue($value) { 191 $Handler = new RowValueHandler(); 192 $Lexer = new \Doku_Lexer($Handler, 'base', true); 193 194 $Lexer->addEntryPattern('\(','base','row'); 195 $Lexer->addPattern('\s*,\s*','row'); 196 $Lexer->addExitPattern('\)','row'); 197 198 $Lexer->addEntryPattern('"', 'row', 'double_quote_string'); 199 $Lexer->addSpecialPattern('\\\\"','double_quote_string','escape_sequence'); 200 $Lexer->addExitPattern('"', 'double_quote_string'); 201 202 $Lexer->addEntryPattern("'", 'row', 'single_quote_string'); 203 $Lexer->addSpecialPattern("\\\\'",'single_quote_string','escape_sequence'); 204 $Lexer->addExitPattern("'", 'single_quote_string'); 205 206 $Lexer->mapHandler('double_quote_string','single_quote_string'); 207 208 $Lexer->addSpecialPattern('[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?','row','number'); 209 210 $res = $Lexer->parse($value); 211 212 if (!$res || $Lexer->_mode->getCurrent() != 'base') { 213 throw new StructException('invalid row value syntax'); 214 } 215 216 return $Handler->get_row(); 217 } 218 219 /** 220 * Wrap given value in asterisks 221 * 222 * @param string|string[] $value 223 * @return string|string[] 224 */ 225 protected function filterWrapAsterisks($value) { 226 $map = function ($input) { 227 return "*$input*"; 228 }; 229 230 if(is_array($value)) { 231 $value = array_map($map, $value); 232 } else { 233 $value = $map($value); 234 } 235 return $value; 236 } 237 238 /** 239 * Change given string to use % instead of * 240 * 241 * @param string|string[] $value 242 * @return string|string[] 243 */ 244 protected function filterChangeToLike($value) { 245 $map = function ($input) { 246 return str_replace('*', '%', $input); 247 }; 248 249 if(is_array($value)) { 250 $value = array_map($map, $value); 251 } else { 252 $value = $map($value); 253 } 254 return $value; 255 } 256 257 /** 258 * Set offset for the results 259 * 260 * @param int $offset 261 */ 262 public function setOffset($offset) { 263 $limit = 0; 264 if($this->range_end) { 265 // if there was a limit set previously, the range_end needs to be recalculated 266 $limit = $this->range_end - $this->range_begin; 267 } 268 $this->range_begin = $offset; 269 if($limit) $this->setLimit($limit); 270 } 271 272 /** 273 * Limit results to this number 274 * 275 * @param int $limit Set to 0 to disable limit again 276 */ 277 public function setLimit($limit) { 278 if($limit) { 279 $this->range_end = $this->range_begin + $limit; 280 } else { 281 $this->range_end = 0; 282 } 283 } 284 285 /** 286 * Return the number of results (regardless of limit and offset settings) 287 * 288 * Use this to implement paging. Important: this may only be called after running @see execute() 289 * 290 * @return int 291 */ 292 public function getCount() { 293 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 294 return $this->count; 295 } 296 297 /** 298 * Returns the PID associated with each result row 299 * 300 * Important: this may only be called after running @see execute() 301 * 302 * @return \string[] 303 */ 304 public function getPids() { 305 if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search'); 306 return $this->result_pids; 307 } 308 309 /** 310 * Execute this search and return the result 311 * 312 * The result is a two dimensional array of Value()s. 313 * 314 * This will always query for the full result (not using offset and limit) and then 315 * return the wanted range, setting the count (@see getCount) to the whole result number 316 * 317 * @return Value[][] 318 */ 319 public function execute() { 320 list($sql, $opts) = $this->getSQL(); 321 322 /** @var \PDOStatement $res */ 323 $res = $this->sqlite->query($sql, $opts); 324 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 325 326 $this->result_pids = array(); 327 $result = array(); 328 $cursor = -1; 329 $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) { 330 return $pageidAndRevOnly && ($col->getTid() == 0); 331 }, true); 332 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 333 $cursor++; 334 if($cursor < $this->range_begin) continue; 335 if($this->range_end && $cursor >= $this->range_end) continue; 336 337 $C = 0; 338 $resrow = array(); 339 $isempty = true; 340 foreach($this->columns as $col) { 341 $val = $row["C$C"]; 342 if($col->isMulti()) { 343 $val = explode(self::CONCAT_SEPARATOR, $val); 344 } 345 $value = new Value($col, $val); 346 $isempty &= $this->isEmptyValue($value); 347 $resrow[] = $value; 348 $C++; 349 } 350 351 // skip empty rows 352 if($isempty && !$pageidAndRevOnly) { 353 $cursor--; 354 continue; 355 } 356 357 $this->result_pids[] = $row['PID']; 358 $result[] = $resrow; 359 } 360 361 $this->sqlite->res_close($res); 362 $this->count = $cursor + 1; 363 return $result; 364 } 365 366 /** 367 * Transform the set search parameters into a statement 368 * 369 * @return array ($sql, $opts) The SQL and parameters to execute 370 */ 371 public function getSQL() { 372 if(!$this->columns) throw new StructException('nocolname'); 373 374 $QB = new QueryBuilder(); 375 376 // basic tables 377 $first_table = ''; 378 foreach($this->schemas as $schema) { 379 $datatable = 'data_' . $schema->getTable(); 380 if($first_table) { 381 // follow up tables 382 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 383 } else { 384 // first table 385 386 if(!$schema->isLookup()) { 387 $QB->addTable('schema_assignments'); 388 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 389 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 390 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 391 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 392 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 393 } 394 395 $QB->addTable($datatable); 396 $QB->addSelectColumn($datatable, 'pid', 'PID'); 397 $QB->addGroupByColumn($datatable, 'pid'); 398 399 $first_table = $datatable; 400 } 401 $QB->filters()->whereAnd("$datatable.latest = 1"); 402 } 403 404 // columns to select, handling multis 405 $sep = self::CONCAT_SEPARATOR; 406 $n = 0; 407 foreach($this->columns as $col) { 408 $CN = 'C' . $n++; 409 410 if($col->isMulti()) { 411 $datatable = "data_{$col->getTable()}"; 412 $multitable = "multi_{$col->getTable()}"; 413 $MN = $QB->generateTableAlias('M'); 414 415 $QB->addLeftJoin( 416 $datatable, 417 $multitable, 418 $MN, 419 "$datatable.pid = $MN.pid AND 420 $datatable.rev = $MN.rev AND 421 $MN.colref = {$col->getColref()}" 422 ); 423 424 $col->getType()->select($QB, $MN, 'value', $CN); 425 $sel = $QB->getSelectStatement($CN); 426 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 427 } else { 428 $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN); 429 $QB->addGroupByStatement($CN); 430 } 431 } 432 433 // where clauses 434 if (!empty($this->filter)) { 435 $userWHERE = $QB->filters()->where('AND'); 436 } 437 foreach($this->filter as $filter) { 438 /** @var Column $col */ 439 list($col, $value, $comp, $op) = $filter; 440 441 $datatable = "data_{$col->getTable()}"; 442 $multitable = "multi_{$col->getTable()}"; 443 444 /** @var $col Column */ 445 if($col->isMulti()) { 446 $MN = $QB->generateTableAlias('MN'); 447 448 $QB->addLeftJoin( 449 $datatable, 450 $multitable, 451 $MN, 452 "$datatable.pid = $MN.pid AND 453 $datatable.rev = $MN.rev AND 454 $MN.colref = {$col->getColref()}" 455 ); 456 $coltbl = $MN; 457 $colnam = 'value'; 458 } else { 459 $coltbl = $datatable; 460 $colnam = $col->getColName(); 461 } 462 463 $col->getType()->filter($userWHERE, $coltbl, $colnam, $comp, $value, $op); // type based filter 464 } 465 466 // sorting - we always sort by the single val column 467 foreach($this->sortby as $sort) { 468 list($col, $asc, $nc) = $sort; 469 /** @var $col Column */ 470 $colname = $col->getColName(false); 471 if($nc) $colname .= ' COLLATE NOCASE'; 472 $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC'); 473 } 474 475 return $QB->getSQL(); 476 } 477 478 /** 479 * Returns all the columns that where added to the search 480 * 481 * @return Column[] 482 */ 483 public function getColumns() { 484 return $this->columns; 485 } 486 487 /** 488 * All the schemas currently added 489 * 490 * @return Schema[] 491 */ 492 public function getSchemas() { 493 return array_values($this->schemas); 494 } 495 496 /** 497 * Checks if the given column is a * wildcard 498 * 499 * If it's a wildcard all matching columns are added to the column list, otherwise 500 * nothing happens 501 * 502 * @param string $colname 503 * @return bool was wildcard? 504 */ 505 protected function processWildcard($colname) { 506 list($colname, $table) = $this->resolveColumn($colname); 507 if($colname !== '*') return false; 508 509 // no table given? assume the first is meant 510 if($table === null) { 511 $schema_list = array_keys($this->schemas); 512 $table = $schema_list[0]; 513 } 514 515 $schema = $this->schemas[$table]; 516 if(!$schema) return false; 517 $this->columns = array_merge($this->columns, $schema->getColumns(false)); 518 return true; 519 } 520 521 /** 522 * Split a given column name into table and column 523 * 524 * Handles Aliases. Table might be null if none given. 525 * 526 * @param $colname 527 * @return array (colname, table) 528 */ 529 protected function resolveColumn($colname) { 530 if(!$this->schemas) throw new StructException('noschemas'); 531 532 // resolve the alias or table name 533 @list($table, $colname) = explode('.', $colname, 2); 534 if(!$colname) { 535 $colname = $table; 536 $table = null; 537 } 538 if($table && isset($this->aliases[$table])) { 539 $table = $this->aliases[$table]; 540 } 541 542 if(!$colname) throw new StructException('nocolname'); 543 544 return array($colname, $table); 545 } 546 547 /** 548 * Find a column to be used in the search 549 * 550 * @param string $colname may contain an alias 551 * @return bool|Column 552 */ 553 public function findColumn($colname) { 554 if(!$this->schemas) throw new StructException('noschemas'); 555 $schema_list = array_keys($this->schemas); 556 557 // add "fake" column for special col 558 if(!(reset($this->schemas)->isLookup())) { 559 if($colname == '%pageid%') { 560 return new PageColumn(0, new Page(), $schema_list[0]); 561 } 562 if($colname == '%title%') { 563 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 564 } 565 if($colname == '%lastupdate%') { 566 return new RevisionColumn(0, new DateTime(), $schema_list[0]); 567 } 568 if ($colname == '%lasteditor%') { 569 return new UserColumn(0, new User(), $schema_list[0]); 570 } 571 if ($colname == '%lastsummary%') { 572 return new SummaryColumn(0, new Summary(), $schema_list[0]); 573 } 574 } else { 575 if($colname == '%rowid%') { 576 return new RowColumn(0, new Decimal(), $schema_list[0]); 577 } 578 } 579 580 list($colname, $table) = $this->resolveColumn($colname); 581 582 // if table name given search only that, otherwise try all for matching column name 583 if($table !== null) { 584 $schemas = array($table => $this->schemas[$table]); 585 } else { 586 $schemas = $this->schemas; 587 } 588 589 // find it 590 $col = false; 591 foreach($schemas as $schema) { 592 if(empty($schema)) { 593 continue; 594 } 595 $col = $schema->findColumn($colname); 596 if($col) break; 597 } 598 599 return $col; 600 } 601 602 /** 603 * Check if the given row is empty or references our own row 604 * 605 * @param Value $value 606 * @return bool 607 */ 608 protected function isEmptyValue(Value $value) { 609 if ($value->isEmpty()) return true; 610 if ($value->getColumn()->getTid() == 0) return true; 611 return false; 612 } 613} 614 615 616