1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\DateTime; 6use dokuwiki\plugin\struct\types\Decimal; 7use dokuwiki\plugin\struct\types\Page; 8 9class Search { 10 /** 11 * This separator will be used to concat multi values to flatten them in the result set 12 */ 13 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 14 15 /** 16 * The list of known and allowed comparators 17 * (order matters) 18 */ 19 static public $COMPARATORS = array( 20 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 21 ); 22 23 /** @var \helper_plugin_sqlite */ 24 protected $sqlite; 25 26 /** @var Schema[] list of schemas to query */ 27 protected $schemas = array(); 28 29 /** @var Column[] list of columns to select */ 30 protected $columns = array(); 31 32 /** @var array the sorting of the result */ 33 protected $sortby = array(); 34 35 /** @var array the filters */ 36 protected $filter = array(); 37 38 /** @var array list of aliases tables can be referenced by */ 39 protected $aliases = array(); 40 41 /** @var int begin results from here */ 42 protected $range_begin = 0; 43 44 /** @var int end results here */ 45 protected $range_end = 0; 46 47 /** @var int the number of results */ 48 protected $count = -1; 49 /** @var string[] the PIDs of the result rows */ 50 protected $result_pids = null; 51 52 /** 53 * Search constructor. 54 */ 55 public function __construct() { 56 /** @var \helper_plugin_struct_db $plugin */ 57 $plugin = plugin_load('helper', 'struct_db'); 58 $this->sqlite = $plugin->getDB(); 59 } 60 61 /** 62 * Add a schema to be searched 63 * 64 * Call multiple times for multiple schemas. 65 * 66 * @param string $table 67 * @param string $alias 68 */ 69 public function addSchema($table, $alias = '') { 70 $schema = new Schema($table); 71 if(!$schema->getId()) { 72 throw new StructException('schema missing', $table); 73 } 74 75 if($this->schemas && 76 ( 77 $schema->isLookup() || 78 reset($this->schemas)->isLookup() 79 ) 80 ) { 81 throw new StructException('nolookupmix'); 82 } 83 84 $this->schemas[$table] = $schema; 85 if($alias) $this->aliases[$alias] = $table; 86 } 87 88 /** 89 * Add a column to be returned by the search 90 * 91 * Call multiple times for multiple columns. Be sure the referenced tables have been 92 * added before 93 * 94 * @param string $colname may contain an alias 95 */ 96 public function addColumn($colname) { 97 if($this->processWildcard($colname)) return; // wildcard? 98 $col = $this->findColumn($colname); 99 if(!$col) return; //FIXME do we really want to ignore missing columns? 100 $this->columns[] = $col; 101 } 102 103 /** 104 * Add sorting options 105 * 106 * Call multiple times for multiple columns. Be sure the referenced tables have been 107 * added before 108 * 109 * @param string $colname may contain an alias 110 * @param bool $asc sort direction (ASC = true, DESC = false) 111 */ 112 public function addSort($colname, $asc = true) { 113 $col = $this->findColumn($colname); 114 if(!$col) return; //FIXME do we really want to ignore missing columns? 115 116 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc); 117 } 118 119 /** 120 * Returns all set sort columns 121 * 122 * @return array 123 */ 124 public function getSorts() { 125 return $this->sortby; 126 } 127 128 /** 129 * Adds a filter 130 * 131 * @param string $colname may contain an alias 132 * @param string|string[] $value 133 * @param string $comp @see self::COMPARATORS 134 * @param string $op either 'OR' or 'AND' 135 */ 136 public function addFilter($colname, $value, $comp, $op = 'OR') { 137 /* Convert certain filters into others 138 * this reduces the number of supported filters to implement in types */ 139 if($comp == '*~') { 140 $value = $this->filterWrapAsterisks($value); 141 $comp = '~'; 142 } elseif($comp == '<>') { 143 $comp = '!='; 144 } 145 146 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 147 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 148 149 $col = $this->findColumn($colname); 150 if(!$col) return; // ignore missing columns, filter might have been for different schema 151 152 // map filter operators to SQL syntax 153 switch($comp) { 154 case '~': 155 $comp = 'LIKE'; 156 break; 157 case '!~': 158 $comp = 'NOT LIKE'; 159 break; 160 case '=*': 161 $comp = 'REGEXP'; 162 break; 163 } 164 165 // we use asterisks, but SQL wants percents 166 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 167 $value = $this->filterChangeToLike($value); 168 } 169 170 // add the filter 171 $this->filter[] = array($col, $value, $comp, $op); 172 } 173 174 /** 175 * Wrap given value in asterisks 176 * 177 * @param string|string[] $value 178 * @return string|string[] 179 */ 180 protected function filterWrapAsterisks($value) { 181 $map = function ($input) { 182 return "*$input*"; 183 }; 184 185 if(is_array($value)) { 186 $value = array_map($map, $value); 187 } else { 188 $value = $map($value); 189 } 190 return $value; 191 } 192 193 /** 194 * Change given string to use % instead of * 195 * 196 * @param string|string[] $value 197 * @return string|string[] 198 */ 199 protected function filterChangeToLike($value) { 200 $map = function ($input) { 201 return str_replace('*', '%', $input); 202 }; 203 204 if(is_array($value)) { 205 $value = array_map($map, $value); 206 } else { 207 $value = $map($value); 208 } 209 return $value; 210 } 211 212 /** 213 * Set offset for the results 214 * 215 * @param int $offset 216 */ 217 public function setOffset($offset) { 218 $limit = 0; 219 if($this->range_end) { 220 // if there was a limit set previously, the range_end needs to be recalculated 221 $limit = $this->range_end - $this->range_begin; 222 } 223 $this->range_begin = $offset; 224 if($limit) $this->setLimit($limit); 225 } 226 227 /** 228 * Limit results to this number 229 * 230 * @param int $limit Set to 0 to disable limit again 231 */ 232 public function setLimit($limit) { 233 if($limit) { 234 $this->range_end = $this->range_begin + $limit; 235 } else { 236 $this->range_end = 0; 237 } 238 } 239 240 /** 241 * Return the number of results (regardless of limit and offset settings) 242 * 243 * Use this to implement paging. Important: this may only be called after running @see execute() 244 * 245 * @return int 246 */ 247 public function getCount() { 248 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 249 return $this->count; 250 } 251 252 /** 253 * Returns the PID associated with each result row 254 * 255 * Important: this may only be called after running @see execute() 256 * 257 * @return \string[] 258 */ 259 public function getPids() { 260 if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search'); 261 return $this->result_pids; 262 } 263 264 /** 265 * Execute this search and return the result 266 * 267 * The result is a two dimensional array of Value()s. 268 * 269 * This will always query for the full result (not using offset and limit) and then 270 * return the wanted range, setting the count (@see getCount) to the whole result number 271 * 272 * @return Value[][] 273 */ 274 public function execute() { 275 list($sql, $opts) = $this->getSQL(); 276 277 /** @var \PDOStatement $res */ 278 $res = $this->sqlite->query($sql, $opts); 279 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 280 281 $this->result_pids = array(); 282 $result = array(); 283 $cursor = -1; 284 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 285 $cursor++; 286 if($cursor < $this->range_begin) continue; 287 if($this->range_end && $cursor >= $this->range_end) continue; 288 289 $C = 0; 290 $resrow = array(); 291 $isempty = true; 292 foreach($this->columns as $col) { 293 $val = $row["C$C"]; 294 if($col->isMulti()) { 295 $val = explode(self::CONCAT_SEPARATOR, $val); 296 } 297 $value = new Value($col, $val); 298 $isempty &= $this->isEmptyValue($value, $row['PID']); 299 $resrow[] = $value; 300 $C++; 301 } 302 303 // skip empty rows 304 if($isempty) { 305 $cursor--; 306 continue; 307 } 308 309 $this->result_pids[] = $row['PID']; 310 $result[] = $resrow; 311 } 312 313 $this->sqlite->res_close($res); 314 $this->count = $cursor + 1; 315 return $result; 316 } 317 318 /** 319 * Transform the set search parameters into a statement 320 * 321 * @return array ($sql, $opts) The SQL and parameters to execute 322 */ 323 public function getSQL() { 324 if(!$this->columns) throw new StructException('nocolname'); 325 326 $QB = new QueryBuilder(); 327 328 // basic tables 329 $first_table = ''; 330 foreach($this->schemas as $schema) { 331 $datatable = 'data_' . $schema->getTable(); 332 if($first_table) { 333 // follow up tables 334 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 335 } else { 336 // first table 337 338 if(!$schema->isLookup()) { 339 $QB->addTable('schema_assignments'); 340 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 341 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 342 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 343 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 344 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 345 } 346 347 $QB->addTable($datatable); 348 $QB->addSelectColumn($datatable, 'pid', 'PID'); 349 $QB->addGroupByColumn($datatable, 'pid'); 350 351 $first_table = $datatable; 352 } 353 $QB->filters()->whereAnd("$datatable.latest = 1"); 354 } 355 356 // columns to select, handling multis 357 $sep = self::CONCAT_SEPARATOR; 358 $n = 0; 359 foreach($this->columns as $col) { 360 $CN = 'C' . $n++; 361 362 if($col->isMulti()) { 363 $datatable = "data_{$col->getTable()}"; 364 $multitable = "multi_{$col->getTable()}"; 365 $MN = 'M' . $col->getColref(); 366 367 $QB->addLeftJoin( 368 $datatable, 369 $multitable, 370 $MN, 371 "$datatable.pid = $MN.pid AND 372 $datatable.rev = $MN.rev AND 373 $MN.colref = {$col->getColref()}" 374 ); 375 376 $col->getType()->select($QB, $MN, 'value', $CN); 377 $sel = $QB->getSelectStatement($CN); 378 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 379 } else { 380 $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN); 381 $QB->addGroupByStatement($CN); 382 } 383 } 384 385 // where clauses 386 foreach($this->filter as $filter) { 387 list($col, $value, $comp, $op) = $filter; 388 389 $datatable = "data_{$col->getTable()}"; 390 $multitable = "multi_{$col->getTable()}"; 391 392 /** @var $col Column */ 393 if($col->isMulti()) { 394 $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before 395 396 $QB->addLeftJoin( 397 $datatable, 398 $multitable, 399 $MN, 400 "$datatable.pid = $MN.pid AND 401 $datatable.rev = $MN.rev AND 402 $MN.colref = {$col->getColref()}" 403 ); 404 $coltbl = $MN; 405 $colnam = 'value'; 406 } else { 407 $coltbl = $datatable; 408 $colnam = $col->getColName(); 409 } 410 411 $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter 412 } 413 414 // sorting - we always sort by the single val column 415 foreach($this->sortby as $sort) { 416 list($col, $asc) = $sort; 417 /** @var $col Column */ 418 $col->getType()->sort($QB, 'data_' . $col->getTable(), $col->getColName(false), $asc ? 'ASC' : 'DESC'); 419 } 420 421 return $QB->getSQL(); 422 } 423 424 /** 425 * Returns all the columns that where added to the search 426 * 427 * @return Column[] 428 */ 429 public function getColumns() { 430 return $this->columns; 431 } 432 433 /** 434 * Checks if the given column is a * wildcard 435 * 436 * If it's a wildcard all matching columns are added to the column list, otherwise 437 * nothing happens 438 * 439 * @param string $colname 440 * @return bool was wildcard? 441 */ 442 protected function processWildcard($colname) { 443 list($colname, $table) = $this->resolveColumn($colname); 444 if($colname !== '*') return false; 445 446 // no table given? assume the first is meant 447 if($table === null) { 448 $schema_list = array_keys($this->schemas); 449 $table = $schema_list[0]; 450 } 451 452 $schema = $this->schemas[$table]; 453 if(!$schema) return false; 454 $this->columns = array_merge($this->columns, $schema->getColumns(false)); 455 return true; 456 } 457 458 /** 459 * Split a given column name into table and column 460 * 461 * Handles Aliases. Table might be null if none given. 462 * 463 * @param $colname 464 * @return array (colname, table) 465 */ 466 protected function resolveColumn($colname) { 467 if(!$this->schemas) throw new StructException('noschemas'); 468 469 // resolve the alias or table name 470 list($table, $colname) = explode('.', $colname, 2); 471 if(!$colname) { 472 $colname = $table; 473 $table = null; 474 } 475 if($table && isset($this->aliases[$table])) { 476 $table = $this->aliases[$table]; 477 } 478 479 if(!$colname) throw new StructException('nocolname'); 480 481 return array($colname, $table); 482 } 483 484 /** 485 * Find a column to be used in the search 486 * 487 * @param string $colname may contain an alias 488 * @return bool|Column 489 */ 490 public function findColumn($colname) { 491 if(!$this->schemas) throw new StructException('noschemas'); 492 $schema_list = array_keys($this->schemas); 493 494 // add "fake" column for special col 495 if(!(reset($this->schemas)->isLookup())) { 496 if($colname == '%pageid%') { 497 return new PageColumn(0, new Page(), $schema_list[0]); 498 } 499 if($colname == '%title%') { 500 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 501 } 502 if($colname == '%lastupdate%') { 503 return new RevisionColumn(0, new DateTime(), $schema_list[0]); 504 } 505 } else { 506 if($colname == '%rowid%') { 507 return new RowColumn(0, new Decimal(), $schema_list[0]); 508 } 509 } 510 511 list($colname, $table) = $this->resolveColumn($colname); 512 513 // if table name given search only that, otherwise try all for matching column name 514 if($table !== null) { 515 $schemas = array($table => $this->schemas[$table]); 516 } else { 517 $schemas = $this->schemas; 518 } 519 520 // find it 521 $col = false; 522 foreach($schemas as $schema) { 523 if(empty($schema)) { 524 continue; 525 } 526 $col = $schema->findColumn($colname); 527 if($col) break; 528 } 529 530 return $col; 531 } 532 533 /** 534 * Check if the given row is empty or references our own row 535 * 536 * @param Value $value 537 * @param string $pid 538 * @return bool 539 */ 540 protected function isEmptyValue(Value $value, $pid) { 541 if($value->isEmpty()) return true; 542 if(is_a($value->getColumn()->getType(), Page::class) && $value->getRawValue() == $pid) return true; 543 return false; 544 } 545} 546 547 548