1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\DateTime; 6use dokuwiki\plugin\struct\types\Page; 7 8class Search { 9 /** 10 * This separator will be used to concat multi values to flatten them in the result set 11 */ 12 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 13 14 /** 15 * The list of known and allowed comparators 16 * (order matters) 17 */ 18 static public $COMPARATORS = array( 19 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 20 ); 21 22 /** @var \helper_plugin_sqlite */ 23 protected $sqlite; 24 25 /** @var Schema[] list of schemas to query */ 26 protected $schemas = array(); 27 28 /** @var Column[] list of columns to select */ 29 protected $columns = array(); 30 31 /** @var array the sorting of the result */ 32 protected $sortby = array(); 33 34 /** @var array the filters */ 35 protected $filter = array(); 36 37 /** @var array list of aliases tables can be referenced by */ 38 protected $aliases = array(); 39 40 /** @var int begin results from here */ 41 protected $range_begin = 0; 42 43 /** @var int end results here */ 44 protected $range_end = 0; 45 46 /** @var int the number of results */ 47 protected $count = -1; 48 /** @var string[] the PIDs of the result rows */ 49 protected $result_pids = null; 50 51 /** 52 * Search constructor. 53 */ 54 public function __construct() { 55 /** @var \helper_plugin_struct_db $plugin */ 56 $plugin = plugin_load('helper', 'struct_db'); 57 $this->sqlite = $plugin->getDB(); 58 } 59 60 /** 61 * Add a schema to be searched 62 * 63 * Call multiple times for multiple schemas. 64 * 65 * @param string $table 66 * @param string $alias 67 */ 68 public function addSchema($table, $alias = '') { 69 $schema = new Schema($table); 70 if(!$schema->getId()) { 71 throw new StructException('schema missing', $table); 72 } 73 74 if($this->schemas && 75 ( 76 $schema->isLookup() || 77 reset($this->schemas)->isLookup() 78 ) 79 ) { 80 throw new StructException('nolookupmix'); 81 } 82 83 $this->schemas[$table] = $schema; 84 if($alias) $this->aliases[$alias] = $table; 85 } 86 87 /** 88 * Add a column to be returned by the search 89 * 90 * Call multiple times for multiple columns. Be sure the referenced tables have been 91 * added before 92 * 93 * @param string $colname may contain an alias 94 */ 95 public function addColumn($colname) { 96 if($this->processWildcard($colname)) return; // wildcard? 97 $col = $this->findColumn($colname); 98 if(!$col) return; //FIXME do we really want to ignore missing columns? 99 $this->columns[] = $col; 100 } 101 102 /** 103 * Add sorting options 104 * 105 * Call multiple times for multiple columns. Be sure the referenced tables have been 106 * added before 107 * 108 * @param string $colname may contain an alias 109 * @param bool $asc sort direction (ASC = true, DESC = false) 110 */ 111 public function addSort($colname, $asc = true) { 112 $col = $this->findColumn($colname); 113 if(!$col) return; //FIXME do we really want to ignore missing columns? 114 115 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc); 116 } 117 118 /** 119 * Returns all set sort columns 120 * 121 * @return array 122 */ 123 public function getSorts() { 124 return $this->sortby; 125 } 126 127 /** 128 * Adds a filter 129 * 130 * @param string $colname may contain an alias 131 * @param string|string[] $value 132 * @param string $comp @see self::COMPARATORS 133 * @param string $op either 'OR' or 'AND' 134 */ 135 public function addFilter($colname, $value, $comp, $op = 'OR') { 136 /* Convert certain filters into others 137 * this reduces the number of supported filters to implement in types */ 138 if($comp == '*~') { 139 $value = $this->filterWrapAsterisks($value); 140 $comp = '~'; 141 } elseif($comp == '<>') { 142 $comp = '!='; 143 } 144 145 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 146 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 147 148 $col = $this->findColumn($colname); 149 if(!$col) return; // ignore missing columns, filter might have been for different schema 150 151 // map filter operators to SQL syntax 152 switch($comp) { 153 case '~': 154 $comp = 'LIKE'; 155 break; 156 case '!~': 157 $comp = 'NOT LIKE'; 158 break; 159 case '=*': 160 $comp = 'REGEXP'; 161 break; 162 } 163 164 // we use asterisks, but SQL wants percents 165 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 166 $value = $this->filterChangeToLike($value); 167 } 168 169 // add the filter 170 $this->filter[] = array($col, $value, $comp, $op); 171 } 172 173 /** 174 * Wrap given value in asterisks 175 * 176 * @param string|string[] $value 177 * @return string|string[] 178 */ 179 protected function filterWrapAsterisks($value) { 180 $map = function ($input) { 181 return "*$input*"; 182 }; 183 184 if(is_array($value)) { 185 $value = array_map($map, $value); 186 } else { 187 $value = $map($value); 188 } 189 return $value; 190 } 191 192 /** 193 * Change given string to use % instead of * 194 * 195 * @param string|string[] $value 196 * @return string|string[] 197 */ 198 protected function filterChangeToLike($value) { 199 $map = function ($input) { 200 return str_replace('*', '%', $input); 201 }; 202 203 if(is_array($value)) { 204 $value = array_map($map, $value); 205 } else { 206 $value = $map($value); 207 } 208 return $value; 209 } 210 211 /** 212 * Set offset for the results 213 * 214 * @param int $offset 215 */ 216 public function setOffset($offset) { 217 $limit = 0; 218 if($this->range_end) { 219 // if there was a limit set previously, the range_end needs to be recalculated 220 $limit = $this->range_end - $this->range_begin; 221 } 222 $this->range_begin = $offset; 223 if($limit) $this->setLimit($limit); 224 } 225 226 /** 227 * Limit results to this number 228 * 229 * @param int $limit Set to 0 to disable limit again 230 */ 231 public function setLimit($limit) { 232 if($limit) { 233 $this->range_end = $this->range_begin + $limit; 234 } else { 235 $this->range_end = 0; 236 } 237 } 238 239 /** 240 * Return the number of results (regardless of limit and offset settings) 241 * 242 * Use this to implement paging. Important: this may only be called after running @see execute() 243 * 244 * @return int 245 */ 246 public function getCount() { 247 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 248 return $this->count; 249 } 250 251 /** 252 * Returns the PID associated with each result row 253 * 254 * Important: this may only be called after running @see execute() 255 * 256 * @return \string[] 257 */ 258 public function getPids() { 259 if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search'); 260 return $this->result_pids; 261 } 262 263 /** 264 * Execute this search and return the result 265 * 266 * The result is a two dimensional array of Value()s. 267 * 268 * This will always query for the full result (not using offset and limit) and then 269 * return the wanted range, setting the count (@see getCount) to the whole result number 270 * 271 * @return Value[][] 272 */ 273 public function execute() { 274 list($sql, $opts) = $this->getSQL(); 275 276 /** @var \PDOStatement $res */ 277 $res = $this->sqlite->query($sql, $opts); 278 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 279 280 $this->result_pids = array(); 281 $result = array(); 282 $cursor = -1; 283 $pageidAndRevOnly = !(count($this->columns) > 2 || $this->columns[0]->getTid() || 284 !empty($this->columns[1]) && $this->columns[1]->getTid()); 285 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 286 $cursor++; 287 if($cursor < $this->range_begin) continue; 288 if($this->range_end && $cursor >= $this->range_end) continue; 289 290 $C = 0; 291 $resrow = array(); 292 $isempty = true; 293 foreach($this->columns as $col) { 294 $val = $row["C$C"]; 295 if($col->isMulti()) { 296 $val = explode(self::CONCAT_SEPARATOR, $val); 297 } 298 $value = new Value($col, $val); 299 $isempty &= $this->isEmptyValue($value); 300 $resrow[] = $value; 301 $C++; 302 } 303 304 // skip empty rows 305 if($isempty && !$pageidAndRevOnly) { 306 $cursor--; 307 continue; 308 } 309 310 $this->result_pids[] = $row['PID']; 311 $result[] = $resrow; 312 } 313 314 $this->sqlite->res_close($res); 315 $this->count = $cursor + 1; 316 return $result; 317 } 318 319 /** 320 * Transform the set search parameters into a statement 321 * 322 * @return array ($sql, $opts) The SQL and parameters to execute 323 */ 324 public function getSQL() { 325 if(!$this->columns) throw new StructException('nocolname'); 326 327 $QB = new QueryBuilder(); 328 329 // basic tables 330 $first_table = ''; 331 foreach($this->schemas as $schema) { 332 $datatable = 'data_' . $schema->getTable(); 333 if($first_table) { 334 // follow up tables 335 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 336 } else { 337 // first table 338 339 if(!$schema->isLookup()) { 340 $QB->addTable('schema_assignments'); 341 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 342 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 343 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 344 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 345 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 346 } 347 348 $QB->addTable($datatable); 349 $QB->addSelectColumn($datatable, 'pid', 'PID'); 350 $QB->addGroupByColumn($datatable, 'pid'); 351 352 $first_table = $datatable; 353 } 354 $QB->filters()->whereAnd("$datatable.latest = 1"); 355 } 356 357 // columns to select, handling multis 358 $sep = self::CONCAT_SEPARATOR; 359 $n = 0; 360 foreach($this->columns as $col) { 361 $CN = 'C' . $n++; 362 363 if($col->isMulti()) { 364 $datatable = "data_{$col->getTable()}"; 365 $multitable = "multi_{$col->getTable()}"; 366 $MN = 'M' . $col->getColref(); 367 368 $QB->addLeftJoin( 369 $datatable, 370 $multitable, 371 $MN, 372 "$datatable.pid = $MN.pid AND 373 $datatable.rev = $MN.rev AND 374 $MN.colref = {$col->getColref()}" 375 ); 376 377 $col->getType()->select($QB, $MN, 'value', $CN); 378 $sel = $QB->getSelectStatement($CN); 379 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 380 } else { 381 $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN); 382 $QB->addGroupByStatement($CN); 383 } 384 } 385 386 // where clauses 387 foreach($this->filter as $filter) { 388 list($col, $value, $comp, $op) = $filter; 389 390 $datatable = "data_{$col->getTable()}"; 391 $multitable = "multi_{$col->getTable()}"; 392 393 /** @var $col Column */ 394 if($col->isMulti()) { 395 $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before 396 397 $QB->addLeftJoin( 398 $datatable, 399 $multitable, 400 $MN, 401 "$datatable.pid = $MN.pid AND 402 $datatable.rev = $MN.rev AND 403 $MN.colref = {$col->getColref()}" 404 ); 405 $coltbl = $MN; 406 $colnam = 'value'; 407 } else { 408 $coltbl = $datatable; 409 $colnam = $col->getColName(); 410 } 411 412 $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter 413 } 414 415 // sorting - we always sort by the single val column 416 foreach($this->sortby as $sort) { 417 list($col, $asc) = $sort; 418 /** @var $col Column */ 419 $col->getType()->sort($QB, 'data_' . $col->getTable(), $col->getColName(false), $asc ? 'ASC' : 'DESC'); 420 } 421 422 return $QB->getSQL(); 423 } 424 425 /** 426 * Returns all the columns that where added to the search 427 * 428 * @return Column[] 429 */ 430 public function getColumns() { 431 return $this->columns; 432 } 433 434 /** 435 * Checks if the given column is a * wildcard 436 * 437 * If it's a wildcard all matching columns are added to the column list, otherwise 438 * nothing happens 439 * 440 * @param string $colname 441 * @return bool was wildcard? 442 */ 443 protected function processWildcard($colname) { 444 list($colname, $table) = $this->resolveColumn($colname); 445 if($colname !== '*') return false; 446 447 // no table given? assume the first is meant 448 if($table === null) { 449 $schema_list = array_keys($this->schemas); 450 $table = $schema_list[0]; 451 } 452 453 $schema = $this->schemas[$table]; 454 if(!$schema) return false; 455 $this->columns = array_merge($this->columns, $schema->getColumns(false)); 456 return true; 457 } 458 459 /** 460 * Split a given column name into table and column 461 * 462 * Handles Aliases. Table might be null if none given. 463 * 464 * @param $colname 465 * @return array (colname, table) 466 */ 467 protected function resolveColumn($colname) { 468 if(!$this->schemas) throw new StructException('noschemas'); 469 470 // resolve the alias or table name 471 list($table, $colname) = explode('.', $colname, 2); 472 if(!$colname) { 473 $colname = $table; 474 $table = null; 475 } 476 if($table && isset($this->aliases[$table])) { 477 $table = $this->aliases[$table]; 478 } 479 480 if(!$colname) throw new StructException('nocolname'); 481 482 return array($colname, $table); 483 } 484 485 /** 486 * Find a column to be used in the search 487 * 488 * @param string $colname may contain an alias 489 * @return bool|Column 490 */ 491 public function findColumn($colname) { 492 if(!$this->schemas) throw new StructException('noschemas'); 493 494 // add "fake" column for special col (unless it's a lookup) 495 if(!(reset($this->schemas)->isLookup())) { 496 $schema_list = array_keys($this->schemas); 497 if($colname == '%pageid%') { 498 return new PageColumn(0, new Page(), $schema_list[0]); 499 } 500 if($colname == '%title%') { 501 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 502 } 503 if($colname == '%lastupdate%') { 504 return new RevisionColumn(0, new DateTime(), $schema_list[0]); 505 } 506 } 507 508 list($colname, $table) = $this->resolveColumn($colname); 509 510 // if table name given search only that, otherwise try all for matching column name 511 if($table !== null) { 512 $schemas = array($table => $this->schemas[$table]); 513 } else { 514 $schemas = $this->schemas; 515 } 516 517 // find it 518 $col = false; 519 foreach($schemas as $schema) { 520 if(empty($schema)) { 521 continue; 522 } 523 $col = $schema->findColumn($colname); 524 if($col) break; 525 } 526 527 return $col; 528 } 529 530 /** 531 * Check if the given row is empty or references our own row 532 * 533 * @param Value $value 534 * @return bool 535 */ 536 protected function isEmptyValue(Value $value) { 537 if ($value->isEmpty()) return true; 538 if ($value->getColumn()->getTid() == 0) return true; 539 return false; 540 } 541} 542 543 544