1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\DateTime; 6use dokuwiki\plugin\struct\types\Decimal; 7use dokuwiki\plugin\struct\types\Page; 8use dokuwiki\plugin\struct\types\User; 9 10class Search { 11 /** 12 * This separator will be used to concat multi values to flatten them in the result set 13 */ 14 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 15 16 /** 17 * The list of known and allowed comparators 18 * (order matters) 19 */ 20 static public $COMPARATORS = array( 21 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 22 ); 23 24 /** @var \helper_plugin_sqlite */ 25 protected $sqlite; 26 27 /** @var Schema[] list of schemas to query */ 28 protected $schemas = array(); 29 30 /** @var Column[] list of columns to select */ 31 protected $columns = array(); 32 33 /** @var array the sorting of the result */ 34 protected $sortby = array(); 35 36 /** @var array the filters */ 37 protected $filter = array(); 38 39 /** @var array list of aliases tables can be referenced by */ 40 protected $aliases = array(); 41 42 /** @var int begin results from here */ 43 protected $range_begin = 0; 44 45 /** @var int end results here */ 46 protected $range_end = 0; 47 48 /** @var int the number of results */ 49 protected $count = -1; 50 /** @var string[] the PIDs of the result rows */ 51 protected $result_pids = null; 52 53 /** 54 * Search constructor. 55 */ 56 public function __construct() { 57 /** @var \helper_plugin_struct_db $plugin */ 58 $plugin = plugin_load('helper', 'struct_db'); 59 $this->sqlite = $plugin->getDB(); 60 } 61 62 /** 63 * Add a schema to be searched 64 * 65 * Call multiple times for multiple schemas. 66 * 67 * @param string $table 68 * @param string $alias 69 */ 70 public function addSchema($table, $alias = '') { 71 $schema = new Schema($table); 72 if(!$schema->getId()) { 73 throw new StructException('schema missing', $table); 74 } 75 76 if($this->schemas && 77 ( 78 $schema->isLookup() || 79 reset($this->schemas)->isLookup() 80 ) 81 ) { 82 throw new StructException('nolookupmix'); 83 } 84 85 $this->schemas[$schema->getTable()] = $schema; 86 if($alias) $this->aliases[$alias] = $schema->getTable(); 87 } 88 89 /** 90 * Add a column to be returned by the search 91 * 92 * Call multiple times for multiple columns. Be sure the referenced tables have been 93 * added before 94 * 95 * @param string $colname may contain an alias 96 */ 97 public function addColumn($colname) { 98 if($this->processWildcard($colname)) return; // wildcard? 99 $col = $this->findColumn($colname); 100 if(!$col) return; //FIXME do we really want to ignore missing columns? 101 $this->columns[] = $col; 102 } 103 104 /** 105 * Add sorting options 106 * 107 * Call multiple times for multiple columns. Be sure the referenced tables have been 108 * added before 109 * 110 * @param string $colname may contain an alias 111 * @param bool $asc sort direction (ASC = true, DESC = false) 112 * @param bool $nc set true for caseinsensitivity 113 */ 114 public function addSort($colname, $asc = true, $nc = true) { 115 $col = $this->findColumn($colname); 116 if(!$col) return; //FIXME do we really want to ignore missing columns? 117 118 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc, $nc); 119 } 120 121 /** 122 * Returns all set sort columns 123 * 124 * @return array 125 */ 126 public function getSorts() { 127 return $this->sortby; 128 } 129 130 /** 131 * Adds a filter 132 * 133 * @param string $colname may contain an alias 134 * @param string|string[] $value 135 * @param string $comp @see self::COMPARATORS 136 * @param string $op either 'OR' or 'AND' 137 */ 138 public function addFilter($colname, $value, $comp, $op = 'OR') { 139 /* Convert certain filters into others 140 * this reduces the number of supported filters to implement in types */ 141 if($comp == '*~') { 142 $value = $this->filterWrapAsterisks($value); 143 $comp = '~'; 144 } elseif($comp == '<>') { 145 $comp = '!='; 146 } 147 148 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 149 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 150 151 $col = $this->findColumn($colname); 152 if(!$col) return; // ignore missing columns, filter might have been for different schema 153 154 // map filter operators to SQL syntax 155 switch($comp) { 156 case '~': 157 $comp = 'LIKE'; 158 break; 159 case '!~': 160 $comp = 'NOT LIKE'; 161 break; 162 case '=*': 163 $comp = 'REGEXP'; 164 break; 165 } 166 167 // we use asterisks, but SQL wants percents 168 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 169 $value = $this->filterChangeToLike($value); 170 } 171 172 // add the filter 173 $this->filter[] = array($col, $value, $comp, $op); 174 } 175 176 /** 177 * Wrap given value in asterisks 178 * 179 * @param string|string[] $value 180 * @return string|string[] 181 */ 182 protected function filterWrapAsterisks($value) { 183 $map = function ($input) { 184 return "*$input*"; 185 }; 186 187 if(is_array($value)) { 188 $value = array_map($map, $value); 189 } else { 190 $value = $map($value); 191 } 192 return $value; 193 } 194 195 /** 196 * Change given string to use % instead of * 197 * 198 * @param string|string[] $value 199 * @return string|string[] 200 */ 201 protected function filterChangeToLike($value) { 202 $map = function ($input) { 203 return str_replace('*', '%', $input); 204 }; 205 206 if(is_array($value)) { 207 $value = array_map($map, $value); 208 } else { 209 $value = $map($value); 210 } 211 return $value; 212 } 213 214 /** 215 * Set offset for the results 216 * 217 * @param int $offset 218 */ 219 public function setOffset($offset) { 220 $limit = 0; 221 if($this->range_end) { 222 // if there was a limit set previously, the range_end needs to be recalculated 223 $limit = $this->range_end - $this->range_begin; 224 } 225 $this->range_begin = $offset; 226 if($limit) $this->setLimit($limit); 227 } 228 229 /** 230 * Limit results to this number 231 * 232 * @param int $limit Set to 0 to disable limit again 233 */ 234 public function setLimit($limit) { 235 if($limit) { 236 $this->range_end = $this->range_begin + $limit; 237 } else { 238 $this->range_end = 0; 239 } 240 } 241 242 /** 243 * Return the number of results (regardless of limit and offset settings) 244 * 245 * Use this to implement paging. Important: this may only be called after running @see execute() 246 * 247 * @return int 248 */ 249 public function getCount() { 250 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 251 return $this->count; 252 } 253 254 /** 255 * Returns the PID associated with each result row 256 * 257 * Important: this may only be called after running @see execute() 258 * 259 * @return \string[] 260 */ 261 public function getPids() { 262 if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search'); 263 return $this->result_pids; 264 } 265 266 /** 267 * Execute this search and return the result 268 * 269 * The result is a two dimensional array of Value()s. 270 * 271 * This will always query for the full result (not using offset and limit) and then 272 * return the wanted range, setting the count (@see getCount) to the whole result number 273 * 274 * @return Value[][] 275 */ 276 public function execute() { 277 list($sql, $opts) = $this->getSQL(); 278 279 /** @var \PDOStatement $res */ 280 $res = $this->sqlite->query($sql, $opts); 281 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 282 283 $this->result_pids = array(); 284 $result = array(); 285 $cursor = -1; 286 $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) { 287 return $pageidAndRevOnly && ($col->getTid() == 0); 288 }, true); 289 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 290 $cursor++; 291 if($cursor < $this->range_begin) continue; 292 if($this->range_end && $cursor >= $this->range_end) continue; 293 294 $C = 0; 295 $resrow = array(); 296 $isempty = true; 297 foreach($this->columns as $col) { 298 $val = $row["C$C"]; 299 if($col->isMulti()) { 300 $val = explode(self::CONCAT_SEPARATOR, $val); 301 } 302 $value = new Value($col, $val); 303 $isempty &= $this->isEmptyValue($value); 304 $resrow[] = $value; 305 $C++; 306 } 307 308 // skip empty rows 309 if($isempty && !$pageidAndRevOnly) { 310 $cursor--; 311 continue; 312 } 313 314 $this->result_pids[] = $row['PID']; 315 $result[] = $resrow; 316 } 317 318 $this->sqlite->res_close($res); 319 $this->count = $cursor + 1; 320 return $result; 321 } 322 323 /** 324 * Transform the set search parameters into a statement 325 * 326 * @return array ($sql, $opts) The SQL and parameters to execute 327 */ 328 public function getSQL() { 329 if(!$this->columns) throw new StructException('nocolname'); 330 331 $QB = new QueryBuilder(); 332 333 // basic tables 334 $first_table = ''; 335 foreach($this->schemas as $schema) { 336 $datatable = 'data_' . $schema->getTable(); 337 if($first_table) { 338 // follow up tables 339 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 340 } else { 341 // first table 342 343 if(!$schema->isLookup()) { 344 $QB->addTable('schema_assignments'); 345 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 346 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 347 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 348 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 349 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 350 } 351 352 $QB->addTable($datatable); 353 $QB->addSelectColumn($datatable, 'pid', 'PID'); 354 $QB->addGroupByColumn($datatable, 'pid'); 355 356 $first_table = $datatable; 357 } 358 $QB->filters()->whereAnd("$datatable.latest = 1"); 359 } 360 361 // columns to select, handling multis 362 $sep = self::CONCAT_SEPARATOR; 363 $n = 0; 364 foreach($this->columns as $col) { 365 $CN = 'C' . $n++; 366 367 if($col->isMulti()) { 368 $datatable = "data_{$col->getTable()}"; 369 $multitable = "multi_{$col->getTable()}"; 370 $MN = $QB->generateTableAlias('M'); 371 372 $QB->addLeftJoin( 373 $datatable, 374 $multitable, 375 $MN, 376 "$datatable.pid = $MN.pid AND 377 $datatable.rev = $MN.rev AND 378 $MN.colref = {$col->getColref()}" 379 ); 380 381 $col->getType()->select($QB, $MN, 'value', $CN); 382 $sel = $QB->getSelectStatement($CN); 383 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 384 } else { 385 $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN); 386 $QB->addGroupByStatement($CN); 387 } 388 } 389 390 // where clauses 391 if (!empty($this->filter)) { 392 $userWHERE = $QB->filters()->where('AND'); 393 } 394 foreach($this->filter as $filter) { 395 list($col, $value, $comp, $op) = $filter; 396 397 $datatable = "data_{$col->getTable()}"; 398 $multitable = "multi_{$col->getTable()}"; 399 400 /** @var $col Column */ 401 if($col->isMulti()) { 402 $MN = $QB->generateTableAlias('MN'); 403 404 $QB->addLeftJoin( 405 $datatable, 406 $multitable, 407 $MN, 408 "$datatable.pid = $MN.pid AND 409 $datatable.rev = $MN.rev AND 410 $MN.colref = {$col->getColref()}" 411 ); 412 $coltbl = $MN; 413 $colnam = 'value'; 414 } else { 415 $coltbl = $datatable; 416 $colnam = $col->getColName(); 417 } 418 419 $col->getType()->filter($userWHERE, $coltbl, $colnam, $comp, $value, $op); // type based filter 420 } 421 422 // sorting - we always sort by the single val column 423 foreach($this->sortby as $sort) { 424 list($col, $asc, $nc) = $sort; 425 /** @var $col Column */ 426 $colname = $col->getColName(false); 427 if($nc) $colname .= ' COLLATE NOCASE'; 428 $col->getType()->sort($QB, 'data_' . $col->getTable(), $colname, $asc ? 'ASC' : 'DESC'); 429 } 430 431 return $QB->getSQL(); 432 } 433 434 /** 435 * Returns all the columns that where added to the search 436 * 437 * @return Column[] 438 */ 439 public function getColumns() { 440 return $this->columns; 441 } 442 443 /** 444 * All the schemas currently added 445 * 446 * @return Schema[] 447 */ 448 public function getSchemas() { 449 return array_values($this->schemas); 450 } 451 452 /** 453 * Checks if the given column is a * wildcard 454 * 455 * If it's a wildcard all matching columns are added to the column list, otherwise 456 * nothing happens 457 * 458 * @param string $colname 459 * @return bool was wildcard? 460 */ 461 protected function processWildcard($colname) { 462 list($colname, $table) = $this->resolveColumn($colname); 463 if($colname !== '*') return false; 464 465 // no table given? assume the first is meant 466 if($table === null) { 467 $schema_list = array_keys($this->schemas); 468 $table = $schema_list[0]; 469 } 470 471 $schema = $this->schemas[$table]; 472 if(!$schema) return false; 473 $this->columns = array_merge($this->columns, $schema->getColumns(false)); 474 return true; 475 } 476 477 /** 478 * Split a given column name into table and column 479 * 480 * Handles Aliases. Table might be null if none given. 481 * 482 * @param $colname 483 * @return array (colname, table) 484 */ 485 protected function resolveColumn($colname) { 486 if(!$this->schemas) throw new StructException('noschemas'); 487 488 // resolve the alias or table name 489 list($table, $colname) = explode('.', $colname, 2); 490 if(!$colname) { 491 $colname = $table; 492 $table = null; 493 } 494 if($table && isset($this->aliases[$table])) { 495 $table = $this->aliases[$table]; 496 } 497 498 if(!$colname) throw new StructException('nocolname'); 499 500 return array($colname, $table); 501 } 502 503 /** 504 * Find a column to be used in the search 505 * 506 * @param string $colname may contain an alias 507 * @return bool|Column 508 */ 509 public function findColumn($colname) { 510 if(!$this->schemas) throw new StructException('noschemas'); 511 $schema_list = array_keys($this->schemas); 512 513 // add "fake" column for special col 514 if(!(reset($this->schemas)->isLookup())) { 515 if($colname == '%pageid%') { 516 return new PageColumn(0, new Page(), $schema_list[0]); 517 } 518 if($colname == '%title%') { 519 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 520 } 521 if($colname == '%lastupdate%') { 522 return new RevisionColumn(0, new DateTime(), $schema_list[0]); 523 } 524 if ($colname == '%lasteditor%') { 525 return new UserColumn(0, new User(), $schema_list[0]); 526 } 527 } else { 528 if($colname == '%rowid%') { 529 return new RowColumn(0, new Decimal(), $schema_list[0]); 530 } 531 } 532 533 list($colname, $table) = $this->resolveColumn($colname); 534 535 // if table name given search only that, otherwise try all for matching column name 536 if($table !== null) { 537 $schemas = array($table => $this->schemas[$table]); 538 } else { 539 $schemas = $this->schemas; 540 } 541 542 // find it 543 $col = false; 544 foreach($schemas as $schema) { 545 if(empty($schema)) { 546 continue; 547 } 548 $col = $schema->findColumn($colname); 549 if($col) break; 550 } 551 552 return $col; 553 } 554 555 /** 556 * Check if the given row is empty or references our own row 557 * 558 * @param Value $value 559 * @return bool 560 */ 561 protected function isEmptyValue(Value $value) { 562 if ($value->isEmpty()) return true; 563 if ($value->getColumn()->getTid() == 0) return true; 564 return false; 565 } 566} 567 568 569