1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\DateTime; 6use dokuwiki\plugin\struct\types\Decimal; 7use dokuwiki\plugin\struct\types\Page; 8 9class Search { 10 /** 11 * This separator will be used to concat multi values to flatten them in the result set 12 */ 13 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 14 15 /** 16 * The list of known and allowed comparators 17 * (order matters) 18 */ 19 static public $COMPARATORS = array( 20 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 21 ); 22 23 /** @var \helper_plugin_sqlite */ 24 protected $sqlite; 25 26 /** @var Schema[] list of schemas to query */ 27 protected $schemas = array(); 28 29 /** @var Column[] list of columns to select */ 30 protected $columns = array(); 31 32 /** @var array the sorting of the result */ 33 protected $sortby = array(); 34 35 /** @var array the filters */ 36 protected $filter = array(); 37 38 /** @var array list of aliases tables can be referenced by */ 39 protected $aliases = array(); 40 41 /** @var int begin results from here */ 42 protected $range_begin = 0; 43 44 /** @var int end results here */ 45 protected $range_end = 0; 46 47 /** @var int the number of results */ 48 protected $count = -1; 49 /** @var string[] the PIDs of the result rows */ 50 protected $result_pids = null; 51 52 /** 53 * Search constructor. 54 */ 55 public function __construct() { 56 /** @var \helper_plugin_struct_db $plugin */ 57 $plugin = plugin_load('helper', 'struct_db'); 58 $this->sqlite = $plugin->getDB(); 59 } 60 61 /** 62 * Add a schema to be searched 63 * 64 * Call multiple times for multiple schemas. 65 * 66 * @param string $table 67 * @param string $alias 68 */ 69 public function addSchema($table, $alias = '') { 70 $schema = new Schema($table); 71 if(!$schema->getId()) { 72 throw new StructException('schema missing', $table); 73 } 74 75 if($this->schemas && 76 ( 77 $schema->isLookup() || 78 reset($this->schemas)->isLookup() 79 ) 80 ) { 81 throw new StructException('nolookupmix'); 82 } 83 84 $this->schemas[$table] = $schema; 85 if($alias) $this->aliases[$alias] = $table; 86 } 87 88 /** 89 * Add a column to be returned by the search 90 * 91 * Call multiple times for multiple columns. Be sure the referenced tables have been 92 * added before 93 * 94 * @param string $colname may contain an alias 95 */ 96 public function addColumn($colname) { 97 if($this->processWildcard($colname)) return; // wildcard? 98 $col = $this->findColumn($colname); 99 if(!$col) return; //FIXME do we really want to ignore missing columns? 100 $this->columns[] = $col; 101 } 102 103 /** 104 * Add sorting options 105 * 106 * Call multiple times for multiple columns. Be sure the referenced tables have been 107 * added before 108 * 109 * @param string $colname may contain an alias 110 * @param bool $asc sort direction (ASC = true, DESC = false) 111 */ 112 public function addSort($colname, $asc = true) { 113 $col = $this->findColumn($colname); 114 if(!$col) return; //FIXME do we really want to ignore missing columns? 115 116 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc); 117 } 118 119 /** 120 * Returns all set sort columns 121 * 122 * @return array 123 */ 124 public function getSorts() { 125 return $this->sortby; 126 } 127 128 /** 129 * Adds a filter 130 * 131 * @param string $colname may contain an alias 132 * @param string|string[] $value 133 * @param string $comp @see self::COMPARATORS 134 * @param string $op either 'OR' or 'AND' 135 */ 136 public function addFilter($colname, $value, $comp, $op = 'OR') { 137 /* Convert certain filters into others 138 * this reduces the number of supported filters to implement in types */ 139 if($comp == '*~') { 140 $value = $this->filterWrapAsterisks($value); 141 $comp = '~'; 142 } elseif($comp == '<>') { 143 $comp = '!='; 144 } 145 146 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 147 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 148 149 $col = $this->findColumn($colname); 150 if(!$col) return; // ignore missing columns, filter might have been for different schema 151 152 // map filter operators to SQL syntax 153 switch($comp) { 154 case '~': 155 $comp = 'LIKE'; 156 break; 157 case '!~': 158 $comp = 'NOT LIKE'; 159 break; 160 case '=*': 161 $comp = 'REGEXP'; 162 break; 163 } 164 165 // we use asterisks, but SQL wants percents 166 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 167 $value = $this->filterChangeToLike($value); 168 } 169 170 // add the filter 171 $this->filter[] = array($col, $value, $comp, $op); 172 } 173 174 /** 175 * Wrap given value in asterisks 176 * 177 * @param string|string[] $value 178 * @return string|string[] 179 */ 180 protected function filterWrapAsterisks($value) { 181 $map = function ($input) { 182 return "*$input*"; 183 }; 184 185 if(is_array($value)) { 186 $value = array_map($map, $value); 187 } else { 188 $value = $map($value); 189 } 190 return $value; 191 } 192 193 /** 194 * Change given string to use % instead of * 195 * 196 * @param string|string[] $value 197 * @return string|string[] 198 */ 199 protected function filterChangeToLike($value) { 200 $map = function ($input) { 201 return str_replace('*', '%', $input); 202 }; 203 204 if(is_array($value)) { 205 $value = array_map($map, $value); 206 } else { 207 $value = $map($value); 208 } 209 return $value; 210 } 211 212 /** 213 * Set offset for the results 214 * 215 * @param int $offset 216 */ 217 public function setOffset($offset) { 218 $limit = 0; 219 if($this->range_end) { 220 // if there was a limit set previously, the range_end needs to be recalculated 221 $limit = $this->range_end - $this->range_begin; 222 } 223 $this->range_begin = $offset; 224 if($limit) $this->setLimit($limit); 225 } 226 227 /** 228 * Limit results to this number 229 * 230 * @param int $limit Set to 0 to disable limit again 231 */ 232 public function setLimit($limit) { 233 if($limit) { 234 $this->range_end = $this->range_begin + $limit; 235 } else { 236 $this->range_end = 0; 237 } 238 } 239 240 /** 241 * Return the number of results (regardless of limit and offset settings) 242 * 243 * Use this to implement paging. Important: this may only be called after running @see execute() 244 * 245 * @return int 246 */ 247 public function getCount() { 248 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 249 return $this->count; 250 } 251 252 /** 253 * Returns the PID associated with each result row 254 * 255 * Important: this may only be called after running @see execute() 256 * 257 * @return \string[] 258 */ 259 public function getPids() { 260 if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search'); 261 return $this->result_pids; 262 } 263 264 /** 265 * Execute this search and return the result 266 * 267 * The result is a two dimensional array of Value()s. 268 * 269 * This will always query for the full result (not using offset and limit) and then 270 * return the wanted range, setting the count (@see getCount) to the whole result number 271 * 272 * @return Value[][] 273 */ 274 public function execute() { 275 list($sql, $opts) = $this->getSQL(); 276 277 /** @var \PDOStatement $res */ 278 $res = $this->sqlite->query($sql, $opts); 279 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 280 281 $this->result_pids = array(); 282 $result = array(); 283 $cursor = -1; 284 $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) { 285 return $pageidAndRevOnly && ($col->getTid() == 0); 286 }, true); 287 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 288 $cursor++; 289 if($cursor < $this->range_begin) continue; 290 if($this->range_end && $cursor >= $this->range_end) continue; 291 292 $C = 0; 293 $resrow = array(); 294 $isempty = true; 295 foreach($this->columns as $col) { 296 $val = $row["C$C"]; 297 if($col->isMulti()) { 298 $val = explode(self::CONCAT_SEPARATOR, $val); 299 } 300 $value = new Value($col, $val); 301 $isempty &= $this->isEmptyValue($value); 302 $resrow[] = $value; 303 $C++; 304 } 305 306 // skip empty rows 307 if($isempty && !$pageidAndRevOnly) { 308 $cursor--; 309 continue; 310 } 311 312 $this->result_pids[] = $row['PID']; 313 $result[] = $resrow; 314 } 315 316 $this->sqlite->res_close($res); 317 $this->count = $cursor + 1; 318 return $result; 319 } 320 321 /** 322 * Transform the set search parameters into a statement 323 * 324 * @return array ($sql, $opts) The SQL and parameters to execute 325 */ 326 public function getSQL() { 327 if(!$this->columns) throw new StructException('nocolname'); 328 329 $QB = new QueryBuilder(); 330 331 // basic tables 332 $first_table = ''; 333 foreach($this->schemas as $schema) { 334 $datatable = 'data_' . $schema->getTable(); 335 if($first_table) { 336 // follow up tables 337 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 338 } else { 339 // first table 340 341 if(!$schema->isLookup()) { 342 $QB->addTable('schema_assignments'); 343 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 344 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 345 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 346 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 347 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 348 } 349 350 $QB->addTable($datatable); 351 $QB->addSelectColumn($datatable, 'pid', 'PID'); 352 $QB->addGroupByColumn($datatable, 'pid'); 353 354 $first_table = $datatable; 355 } 356 $QB->filters()->whereAnd("$datatable.latest = 1"); 357 } 358 359 // columns to select, handling multis 360 $sep = self::CONCAT_SEPARATOR; 361 $n = 0; 362 foreach($this->columns as $col) { 363 $CN = 'C' . $n++; 364 365 if($col->isMulti()) { 366 $datatable = "data_{$col->getTable()}"; 367 $multitable = "multi_{$col->getTable()}"; 368 $MN = 'M' . $col->getColref(); 369 370 $QB->addLeftJoin( 371 $datatable, 372 $multitable, 373 $MN, 374 "$datatable.pid = $MN.pid AND 375 $datatable.rev = $MN.rev AND 376 $MN.colref = {$col->getColref()}" 377 ); 378 379 $col->getType()->select($QB, $MN, 'value', $CN); 380 $sel = $QB->getSelectStatement($CN); 381 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 382 } else { 383 $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN); 384 $QB->addGroupByStatement($CN); 385 } 386 } 387 388 // where clauses 389 foreach($this->filter as $filter) { 390 list($col, $value, $comp, $op) = $filter; 391 392 $datatable = "data_{$col->getTable()}"; 393 $multitable = "multi_{$col->getTable()}"; 394 395 /** @var $col Column */ 396 if($col->isMulti()) { 397 $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before 398 399 $QB->addLeftJoin( 400 $datatable, 401 $multitable, 402 $MN, 403 "$datatable.pid = $MN.pid AND 404 $datatable.rev = $MN.rev AND 405 $MN.colref = {$col->getColref()}" 406 ); 407 $coltbl = $MN; 408 $colnam = 'value'; 409 } else { 410 $coltbl = $datatable; 411 $colnam = $col->getColName(); 412 } 413 414 $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter 415 } 416 417 // sorting - we always sort by the single val column 418 foreach($this->sortby as $sort) { 419 list($col, $asc) = $sort; 420 /** @var $col Column */ 421 $col->getType()->sort($QB, 'data_' . $col->getTable(), $col->getColName(false), $asc ? 'ASC' : 'DESC'); 422 } 423 424 return $QB->getSQL(); 425 } 426 427 /** 428 * Returns all the columns that where added to the search 429 * 430 * @return Column[] 431 */ 432 public function getColumns() { 433 return $this->columns; 434 } 435 436 /** 437 * Checks if the given column is a * wildcard 438 * 439 * If it's a wildcard all matching columns are added to the column list, otherwise 440 * nothing happens 441 * 442 * @param string $colname 443 * @return bool was wildcard? 444 */ 445 protected function processWildcard($colname) { 446 list($colname, $table) = $this->resolveColumn($colname); 447 if($colname !== '*') return false; 448 449 // no table given? assume the first is meant 450 if($table === null) { 451 $schema_list = array_keys($this->schemas); 452 $table = $schema_list[0]; 453 } 454 455 $schema = $this->schemas[$table]; 456 if(!$schema) return false; 457 $this->columns = array_merge($this->columns, $schema->getColumns(false)); 458 return true; 459 } 460 461 /** 462 * Split a given column name into table and column 463 * 464 * Handles Aliases. Table might be null if none given. 465 * 466 * @param $colname 467 * @return array (colname, table) 468 */ 469 protected function resolveColumn($colname) { 470 if(!$this->schemas) throw new StructException('noschemas'); 471 472 // resolve the alias or table name 473 list($table, $colname) = explode('.', $colname, 2); 474 if(!$colname) { 475 $colname = $table; 476 $table = null; 477 } 478 if($table && isset($this->aliases[$table])) { 479 $table = $this->aliases[$table]; 480 } 481 482 if(!$colname) throw new StructException('nocolname'); 483 484 return array($colname, $table); 485 } 486 487 /** 488 * Find a column to be used in the search 489 * 490 * @param string $colname may contain an alias 491 * @return bool|Column 492 */ 493 public function findColumn($colname) { 494 if(!$this->schemas) throw new StructException('noschemas'); 495 $schema_list = array_keys($this->schemas); 496 497 // add "fake" column for special col 498 if(!(reset($this->schemas)->isLookup())) { 499 if($colname == '%pageid%') { 500 return new PageColumn(0, new Page(), $schema_list[0]); 501 } 502 if($colname == '%title%') { 503 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 504 } 505 if($colname == '%lastupdate%') { 506 return new RevisionColumn(0, new DateTime(), $schema_list[0]); 507 } 508 } else { 509 if($colname == '%rowid%') { 510 return new RowColumn(0, new Decimal(), $schema_list[0]); 511 } 512 } 513 514 list($colname, $table) = $this->resolveColumn($colname); 515 516 // if table name given search only that, otherwise try all for matching column name 517 if($table !== null) { 518 $schemas = array($table => $this->schemas[$table]); 519 } else { 520 $schemas = $this->schemas; 521 } 522 523 // find it 524 $col = false; 525 foreach($schemas as $schema) { 526 if(empty($schema)) { 527 continue; 528 } 529 $col = $schema->findColumn($colname); 530 if($col) break; 531 } 532 533 return $col; 534 } 535 536 /** 537 * Check if the given row is empty or references our own row 538 * 539 * @param Value $value 540 * @return bool 541 */ 542 protected function isEmptyValue(Value $value) { 543 if ($value->isEmpty()) return true; 544 if ($value->getColumn()->getTid() == 0) return true; 545 return false; 546 } 547} 548 549 550