<?php

namespace dokuwiki\plugin\struct\meta;

use dokuwiki\plugin\struct\types\DateTime;
use dokuwiki\plugin\struct\types\Page;

/**
 * Builds and runs a search over one or more struct schemas.
 *
 * Typical usage: add schemas, then columns, optional filters and sorting, optional
 * offset/limit, and finally call execute(). getCount() and getPids() are only
 * available after execute() has run.
 */
class Search {
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    static public $COMPARATORS = array(
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~',
    );

    /** @var \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query */
    protected $schemas = array();

    /** @var Column[] list of columns to select */
    protected $columns = array();

    /** @var array the sorting of the result */
    protected $sortby = array();

    /** @var array the filters */
    protected $filter = array();

    /** @var array list of aliases tables can be referenced by */
    protected $aliases = array();

    /** @var int begin results from here */
    protected $range_begin = 0;

    /** @var int end results here */
    protected $range_end = 0;

    /** @var int the number of results */
    protected $count = -1;

    /** @var string[] the PIDs of the result rows */
    protected $result_pids = null;

    /**
     * Search constructor.
     */
    public function __construct() {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias
     * @throws StructException when the schema does not exist or when a lookup schema
     *                         would be combined with any other schema
     */
    public function addSchema($table, $alias = '') {
        $schema = new Schema($table);
        if(!$schema->getId()) {
            throw new StructException('schema missing', $table);
        }

        // a lookup schema may only be searched on its own
        if($this->schemas &&
            (
                $schema->isLookup() ||
                reset($this->schemas)->isLookup()
            )
        ) {
            throw new StructException('nolookupmix');
        }

        $this->schemas[$table] = $schema;
        if($alias) $this->aliases[$alias] = $table;
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     */
    public function addColumn($colname) {
        if($this->processWildcard($colname)) return; // wildcard?
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     */
    public function addSort($colname, $asc = true) {
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?

        // keyed by label, so sorting by the same column again replaces the earlier entry
        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc);
    }

    /**
     * Returns all set sort columns
     *
     * @return array
     */
    public function getSorts() {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * Besides the entries of self::$COMPARATORS, the comparators '*~' (converted to '~'
     * with the value wrapped in asterisks) and '<>' (converted to '!=') are accepted.
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::$COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on an unknown comparator or operator
     */
    public function addFilter($colname, $value, $comp, $op = 'OR') {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if($comp == '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif($comp == '<>') {
            $comp = '!=';
        }

        // strict check: a loose comparison would let non-string values such as 0 pass on older PHP versions
        if(!in_array($comp, self::$COMPARATORS, true)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');

        $col = $this->findColumn($colname);
        if(!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if($comp == 'LIKE' || $comp == 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        // add the filter
        $this->filter[] = array($col, $value, $comp, $op);
    }

    /**
     * Wrap given value in asterisks
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterWrapAsterisks($value) {
        $map = function ($input) {
            return "*$input*";
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Change given string to use % instead of *
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterChangeToLike($value) {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Set offset for the results
     *
     * @param int $offset
     */
    public function setOffset($offset) {
        $limit = 0;
        if($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit) {
        if($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running @see execute()
     *
     * @return int
     * @throws StructException when the search has not been executed yet
     */
    public function getCount() {
        if($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Returns the PID associated with each result row
     *
     * Important: this may only be called after running @see execute()
     *
     * @return \string[]
     * @throws StructException when the search has not been executed yet
     */
    public function getPids() {
        if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
        return $this->result_pids;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * @return Value[][]
     * @throws StructException when the query could not be executed
     */
    public function execute() {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        $this->result_pids = array();
        $result = array();
        $cursor = -1;

        // true when every selected column has the type ID 0; such rows are never skipped as empty
        $pageidAndRevOnly = array_reduce($this->columns, function ($pageidAndRevOnly, Column $col) {
            return $pageidAndRevOnly && ($col->getTid() == 0);
        }, true);

        while($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            $cursor++;
            // NOTE(review): rows outside the requested range are counted without the
            // emptiness check below, so the count may include empty rows - confirm intended
            if($cursor < $this->range_begin) continue;
            if($this->range_end && $cursor >= $this->range_end) continue;

            $C = 0;
            $resrow = array();
            $isempty = true;
            foreach($this->columns as $col) {
                $val = $row["C$C"];
                if($col->isMulti()) {
                    // cast first: passing NULL to explode() is deprecated, '' yields the same result
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $value = new Value($col, $val);
                $isempty = $this->isEmptyValue($value) && $isempty;
                $resrow[] = $value;
                $C++;
            }

            // skip empty rows (and do not count them)
            if($isempty && !$pageidAndRevOnly) {
                $cursor--;
                continue;
            }

            $this->result_pids[] = $row['PID'];
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no columns have been added
     */
    public function getSQL() {
        if(!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table

                if(!$schema->isLookup()) {
                    $QB->addTable('schema_assignments');
                    $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid");
                    $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'");
                    $QB->filters()->whereAnd("schema_assignments.assigned = 1");
                    $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                    $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1");
                }

                $QB->addTable($datatable);
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addGroupByColumn($datatable, 'pid');

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach($this->columns as $col) {
            // result columns are aliased C0, C1, ... which is what execute() reads back
            $CN = 'C' . $n++;

            if($col->isMulti()) {
                $datatable = "data_{$col->getTable()}";
                $multitable = "multi_{$col->getTable()}";
                $MN = 'M' . $col->getColref();

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );

                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        foreach($this->filter as $filter) {
            /** @var $col Column */
            list($col, $value, $comp, $op) = $filter;

            $datatable = "data_{$col->getTable()}";

            if($col->isMulti()) {
                $multitable = "multi_{$col->getTable()}";
                $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );
                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = $datatable;
                $colnam = $col->getColName();
            }

            $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach($this->sortby as $sort) {
            /** @var $col Column */
            list($col, $asc) = $sort;
            $col->getType()->sort($QB, 'data_' . $col->getTable(), $col->getColName(false), $asc ? 'ASC' : 'DESC');
        }

        return $QB->getSQL();
    }

    /**
     * Returns all the columns that where added to the search
     *
     * @return Column[]
     */
    public function getColumns() {
        return $this->columns;
    }

    /**
     * Checks if the given column is a * wildcard
     *
     * If it's a wildcard all matching columns are added to the column list, otherwise
     * nothing happens
     *
     * @param string $colname
     * @return bool was wildcard?
     * @throws StructException when no schema has been added yet
     */
    protected function processWildcard($colname) {
        list($colname, $table) = $this->resolveColumn($colname);
        if($colname !== '*') return false;

        // no table given? assume the first is meant
        if($table === null) {
            $schema_list = array_keys($this->schemas);
            $table = $schema_list[0];
        }

        // unknown table or alias: nothing to expand
        if(!isset($this->schemas[$table])) return false;

        $schema = $this->schemas[$table];
        $this->columns = array_merge($this->columns, $schema->getColumns(false));
        return true;
    }

    /**
     * Split a given column name into table and column
     *
     * Handles Aliases. Table might be null if none given.
     *
     * @param $colname
     * @return array (colname, table)
     * @throws StructException when no schema has been added yet or the column name is empty
     */
    protected function resolveColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // resolve the alias or table name
        // pad to two elements so a name without a dot does not access a missing index
        list($table, $colname) = array_pad(explode('.', $colname, 2), 2, '');
        if($colname === '') {
            // no dot (or nothing after it): the only part given is the column name
            $colname = $table;
            $table = null;
        }
        if($table && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        // compare against '' explicitly, a column labeled "0" is a valid name
        if($colname === '') throw new StructException('nocolname');

        return array($colname, $table);
    }

    /**
     * Find a column to be used in the search
     *
     * For non-lookup schemas the special names %pageid%, %title% and %lastupdate% are
     * answered with columns created on the fly for the first schema.
     *
     * @param string $colname may contain an alias
     * @return bool|Column false when no matching column was found
     * @throws StructException when no schema has been added yet or the column name is empty
     */
    public function findColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // add "fake" column for special col (unless it's a lookup)
        if(!(reset($this->schemas)->isLookup())) {
            $schema_list = array_keys($this->schemas);
            if($colname == '%pageid%') {
                return new PageColumn(0, new Page(), $schema_list[0]);
            }
            if($colname == '%title%') {
                return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
            }
            if($colname == '%lastupdate%') {
                return new RevisionColumn(0, new DateTime(), $schema_list[0]);
            }
        }

        list($colname, $table) = $this->resolveColumn($colname);

        // if table name given search only that, otherwise try all for matching column name
        if($table !== null) {
            // an unknown table simply yields no candidates
            $schemas = isset($this->schemas[$table]) ? array($table => $this->schemas[$table]) : array();
        } else {
            $schemas = $this->schemas;
        }

        // find it
        $col = false;
        foreach($schemas as $schema) {
            if(empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if($col) break;
        }

        return $col;
    }

    /**
     * Check if the given row is empty or references our own row
     *
     * Values of columns with the type ID 0 are always treated as empty here.
     *
     * @param Value $value
     * @return bool
     */
    protected function isEmptyValue(Value $value) {
        if($value->isEmpty()) return true;
        if($value->getColumn()->getTid() == 0) return true;
        return false;
    }
}