<?php

namespace dokuwiki\plugin\struct\meta;

use dokuwiki\plugin\struct\types\DateTime;
use dokuwiki\plugin\struct\types\Decimal;
use dokuwiki\plugin\struct\types\Page;

/**
 * Collects schemas, columns, filters, sorting and paging options and turns them
 * into a single SQL statement (via QueryBuilder) that is run against the struct
 * sqlite database.
 */
class Search {
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    static public $COMPARATORS = array(
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~',
    );

    /** @var \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query, keyed by table name */
    protected $schemas = array();

    /** @var Column[] list of columns to select */
    protected $columns = array();

    /** @var array the sorting of the result, each entry is array(Column, bool $asc, bool $nc) */
    protected $sortby = array();

    /** @var array the filters, each entry is array(Column, value, comparator, 'AND'|'OR') */
    protected $filter = array();

    /** @var array list of aliases tables can be referenced by (alias => table name) */
    protected $aliases = array();

    /** @var int begin results from here */
    protected $range_begin = 0;

    /** @var int end results here (0 means no limit) */
    protected $range_end = 0;

    /** @var int the number of results, -1 until execute() has been run */
    protected $count = -1;

    /** @var string[] the PIDs of the result rows, null until execute() has been run */
    protected $result_pids = null;

    /**
     * Search constructor.
     *
     * Loads the struct_db helper plugin and keeps the database handle it provides.
     */
    public function __construct() {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias
     * @throws StructException when the schema has no ID, or when a lookup schema
     *                         would be combined with any other schema
     */
    public function addSchema($table, $alias = '') {
        $schema = new Schema($table);
        if(!$schema->getId()) {
            throw new StructException('schema missing', $table);
        }

        // once a schema is registered, neither it nor the new one may be a lookup
        if($this->schemas &&
            (
                $schema->isLookup() ||
                reset($this->schemas)->isLookup()
            )
        ) {
            throw new StructException('nolookupmix');
        }

        $this->schemas[$schema->getTable()] = $schema;
        if($alias) $this->aliases[$alias] = $schema->getTable();
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @throws StructException when no schema has been added yet
     */
    public function addColumn($colname) {
        if($this->processWildcard($colname)) return; // wildcard?
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before. Sorting by the same column twice replaces the earlier setting
     * because the sort list is keyed by the column's fully qualified label.
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     * @param bool $nc set true for caseinsensitivity
     * @throws StructException when no schema has been added yet
     */
    public function addSort($colname, $asc = true, $nc = true) {
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?

        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc, $nc);
    }

    /**
     * Returns all set sort columns
     *
     * @return array
     */
    public function getSorts() {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * The comparators '*~' and '<>' are accepted as aliases and rewritten to
     * '~' (with the value wrapped in asterisks) and '!=' respectively.
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on an unknown comparator or operator
     */
    public function addFilter($colname, $value, $comp, $op = 'OR') {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if($comp == '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif($comp == '<>') {
            $comp = '!=';
        }

        if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');

        $col = $this->findColumn($colname);
        if(!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if($comp == 'LIKE' || $comp == 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        // add the filter
        $this->filter[] = array($col, $value, $comp, $op);
    }

    /**
     * Wrap given value in asterisks
     *
     * Arrays are handled element by element.
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterWrapAsterisks($value) {
        $map = function ($input) {
            return "*$input*";
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Change given string to use % instead of *
     *
     * Arrays are handled element by element.
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterChangeToLike($value) {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Set offset for the results
     *
     * @param int $offset
     */
    public function setOffset($offset) {
        $limit = 0;
        if($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit) {
        if($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running @see execute()
     *
     * @return int
     * @throws StructException when called before execute()
     */
    public function getCount() {
        if($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Returns the PID associated with each result row
     *
     * Important: this may only be called after running @see execute()
     *
     * @return string[]
     * @throws StructException when called before execute()
     */
    public function getPids() {
        if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
        return $this->result_pids;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * @return Value[][]
     * @throws StructException when the query fails or no column was added
     */
    public function execute() {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        $this->result_pids = array();
        $result = array();
        $cursor = -1;

        // true when every selected column has a type ID of 0; in that case
        // rows are never dropped by the empty-row check below
        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
            return $pageidAndRevOnly && ($col->getTid() == 0);
        }, true);

        while($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            $cursor++;
            if($cursor < $this->range_begin) continue;
            if($this->range_end && $cursor >= $this->range_end) continue;

            $C = 0;
            $resrow = array();
            $isempty = true;
            foreach($this->columns as $col) {
                $val = $row["C$C"];
                if($col->isMulti()) {
                    // the cast keeps the previous result for NULL (a single empty
                    // string) without passing null to explode()
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $value = new Value($col, $val);
                // evaluate the check for every value, keep the flag a real boolean
                $isempty = $this->isEmptyValue($value) && $isempty;
                $resrow[] = $value;
                $C++;
            }

            // skip empty rows
            if($isempty && !$pageidAndRevOnly) {
                // skipped rows must not count towards the total
                $cursor--;
                continue;
            }

            $this->result_pids[] = $row['PID'];
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no column has been added
     */
    public function getSQL() {
        if(!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table

                if(!$schema->isLookup()) {
                    $QB->addTable('schema_assignments');
                    $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid");
                    $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'");
                    $QB->filters()->whereAnd("schema_assignments.assigned = 1");
                    $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                    $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1");
                }

                $QB->addTable($datatable);
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addGroupByColumn($datatable, 'pid');

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach($this->columns as $col) {
            // result columns are aliased C0, C1, ... in selection order; execute() relies on this
            $CN = 'C' . $n++;

            if($col->isMulti()) {
                $datatable = "data_{$col->getTable()}";
                $multitable = "multi_{$col->getTable()}";
                $MN = 'M' . $col->getColref();

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );

                // flatten all values of the multi column into one separator joined string
                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        foreach($this->filter as $filter) {
            list($col, $value, $comp, $op) = $filter;

            $datatable = "data_{$col->getTable()}";
            $multitable = "multi_{$col->getTable()}";

            /** @var $col Column */
            if($col->isMulti()) {
                $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );
                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = $datatable;
                $colnam = $col->getColName();
            }

            $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach($this->sortby as $sort) {
            list($col, $asc, $nc) = $sort;
            /** @var $col Column */
            $colname = $col->getColName(false);
            if($nc) $colname .= ' COLLATE NOCASE';
            $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC');
        }

        return $QB->getSQL();
    }

    /**
     * Returns all the columns that where added to the search
     *
     * @return Column[]
     */
    public function getColumns() {
        return $this->columns;
    }

    /**
     * Checks if the given column is a * wildcard
     *
     * If it's a wildcard all matching columns are added to the column list, otherwise
     * nothing happens. A wildcard on a table that was not added is not treated as
     * a wildcard.
     *
     * @param string $colname
     * @return bool was wildcard?
     * @throws StructException when no schema has been added yet
     */
    protected function processWildcard($colname) {
        list($colname, $table) = $this->resolveColumn($colname);
        if($colname !== '*') return false;

        // no table given? assume the first is meant
        if($table === null) {
            $schema_list = array_keys($this->schemas);
            $table = $schema_list[0];
        }

        // unknown tables must not trigger an undefined index access
        if(!isset($this->schemas[$table])) return false;
        $schema = $this->schemas[$table];
        if(!$schema) return false;
        $this->columns = array_merge($this->columns, $schema->getColumns(false));
        return true;
    }

    /**
     * Split a given column name into table and column
     *
     * Handles Aliases. Table might be null if none given.
     *
     * @param string $colname
     * @return array (colname, table)
     * @throws StructException when no schema has been added yet or the column name is empty
     */
    protected function resolveColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // resolve the alias or table name
        // pad to two elements so names without a dot do not read a missing offset
        list($table, $colname) = array_pad(explode('.', $colname, 2), 2, null);
        if(!$colname) {
            // no dot (or nothing after it): the only part is the column name
            $colname = $table;
            $table = null;
        }
        if($table && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        if(!$colname) throw new StructException('nocolname');

        return array($colname, $table);
    }

    /**
     * Find a column to be used in the search
     *
     * The special names %pageid%, %title% and %lastupdate% (page schemas) and
     * %rowid% (lookup schemas) are answered with generated columns bound to the
     * first schema. Everything else is looked up in the given table or, if no
     * table is given, in all schemas in the order they were added.
     *
     * @param string $colname may contain an alias
     * @return bool|Column false when no matching column exists
     * @throws StructException when no schema has been added yet
     */
    public function findColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');
        $schema_list = array_keys($this->schemas);

        // add "fake" column for special col
        if(!(reset($this->schemas)->isLookup())) {
            if($colname == '%pageid%') {
                return new PageColumn(0, new Page(), $schema_list[0]);
            }
            if($colname == '%title%') {
                return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
            }
            if($colname == '%lastupdate%') {
                return new RevisionColumn(0, new DateTime(), $schema_list[0]);
            }
        } else {
            if($colname == '%rowid%') {
                return new RowColumn(0, new Decimal(), $schema_list[0]);
            }
        }

        list($colname, $table) = $this->resolveColumn($colname);

        // if table name given search only that, otherwise try all for matching column name
        if($table !== null) {
            // an unknown table yields an empty candidate list instead of an undefined index
            $schemas = isset($this->schemas[$table]) ? array($table => $this->schemas[$table]) : array();
        } else {
            $schemas = $this->schemas;
        }

        // find it
        $col = false;
        foreach($schemas as $schema) {
            if(empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if($col) break;
        }

        return $col;
    }

    /**
     * Check if the given row is empty or references our own row
     *
     * A value counts as empty when it reports itself empty or when its column
     * has a type ID of 0.
     *
     * @param Value $value
     * @return bool
     */
    protected function isEmptyValue(Value $value) {
        if($value->isEmpty()) return true;
        if($value->getColumn()->getTid() == 0) return true;
        return false;
    }
}