1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\DateTime; 6use dokuwiki\plugin\struct\types\Page; 7 8class Search { 9 /** 10 * This separator will be used to concat multi values to flatten them in the result set 11 */ 12 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 13 14 /** 15 * The list of known and allowed comparators 16 * (order matters) 17 */ 18 static public $COMPARATORS = array( 19 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 20 ); 21 22 /** @var \helper_plugin_sqlite */ 23 protected $sqlite; 24 25 /** @var Schema[] list of schemas to query */ 26 protected $schemas = array(); 27 28 /** @var Column[] list of columns to select */ 29 protected $columns = array(); 30 31 /** @var array the sorting of the result */ 32 protected $sortby = array(); 33 34 /** @var array the filters */ 35 protected $filter = array(); 36 37 /** @var array list of aliases tables can be referenced by */ 38 protected $aliases = array(); 39 40 /** @var int begin results from here */ 41 protected $range_begin = 0; 42 43 /** @var int end results here */ 44 protected $range_end = 0; 45 46 /** @var int the number of results */ 47 protected $count = -1; 48 /** @var string[] the PIDs of the result rows */ 49 protected $result_pids = null; 50 51 /** 52 * Search constructor. 53 */ 54 public function __construct() { 55 /** @var \helper_plugin_struct_db $plugin */ 56 $plugin = plugin_load('helper', 'struct_db'); 57 $this->sqlite = $plugin->getDB(); 58 } 59 60 /** 61 * Add a schema to be searched 62 * 63 * Call multiple times for multiple schemas. 64 * 65 * @param string $table 66 * @param string $alias 67 */ 68 public function addSchema($table, $alias = '') { 69 $this->schemas[$table] = new Schema($table); 70 if($alias) $this->aliases[$alias] = $table; 71 } 72 73 /** 74 * Add a column to be returned by the search 75 * 76 * Call multiple times for multiple columns. Be sure the referenced tables have been 77 * added before 78 * 79 * @param string $colname may contain an alias 80 */ 81 public function addColumn($colname) { 82 if($this->processWildcard($colname)) return; // wildcard? 83 $col = $this->findColumn($colname); 84 if(!$col) return; //FIXME do we really want to ignore missing columns? 85 $this->columns[] = $col; 86 } 87 88 /** 89 * Add sorting options 90 * 91 * Call multiple times for multiple columns. Be sure the referenced tables have been 92 * added before 93 * 94 * @param string $colname may contain an alias 95 * @param bool $asc sort direction (ASC = true, DESC = false) 96 */ 97 public function addSort($colname, $asc = true) { 98 $col = $this->findColumn($colname); 99 if(!$col) return; //FIXME do we really want to ignore missing columns? 100 101 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc); 102 } 103 104 /** 105 * Returns all set sort columns 106 * 107 * @return array 108 */ 109 public function getSorts() { 110 return $this->sortby; 111 } 112 113 /** 114 * Adds a filter 115 * 116 * @param string $colname may contain an alias 117 * @param string|string[] $value 118 * @param string $comp @see self::COMPARATORS 119 * @param string $op either 'OR' or 'AND' 120 */ 121 public function addFilter($colname, $value, $comp, $op = 'OR') { 122 /* Convert certain filters into others 123 * this reduces the number of supported filters to implement in types */ 124 if($comp == '*~') { 125 $value = $this->filterWrapAsterisks($value); 126 $comp = '~'; 127 } elseif($comp == '<>') { 128 $comp = '!='; 129 } 130 131 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 132 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 133 134 $col = $this->findColumn($colname); 135 if(!$col) return; // ignore missing columns, filter might have been for different schema 136 137 // map filter operators to SQL syntax 138 switch($comp) { 139 case '~': 140 $comp = 'LIKE'; 141 break; 142 case '!~': 143 $comp = 'NOT LIKE'; 144 break; 145 case '=*': 146 $comp = 'REGEXP'; 147 break; 148 } 149 150 // we use asterisks, but SQL wants percents 151 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 152 $value = $this->filterChangeToLike($value); 153 } 154 155 // add the filter 156 $this->filter[] = array($col, $value, $comp, $op); 157 } 158 159 /** 160 * Wrap given value in asterisks 161 * 162 * @param string|string[] $value 163 * @return string|string[] 164 */ 165 protected function filterWrapAsterisks($value) { 166 $map = function ($input) { 167 return "*$input*"; 168 }; 169 170 if(is_array($value)) { 171 $value = array_map($map, $value); 172 } else { 173 $value = $map($value); 174 } 175 return $value; 176 } 177 178 /** 179 * Change given string to use % instead of * 180 * 181 * @param string|string[] $value 182 * @return string|string[] 183 */ 184 protected function filterChangeToLike($value) { 185 $map = function ($input) { 186 return str_replace('*', '%', $input); 187 }; 188 189 if(is_array($value)) { 190 $value = array_map($map, $value); 191 } else { 192 $value = $map($value); 193 } 194 return $value; 195 } 196 197 /** 198 * Set offset for the results 199 * 200 * @param int $offset 201 */ 202 public function setOffset($offset) { 203 $limit = 0; 204 if($this->range_end) { 205 // if there was a limit set previously, the range_end needs to be recalculated 206 $limit = $this->range_end - $this->range_begin; 207 } 208 $this->range_begin = $offset; 209 if($limit) $this->setLimit($limit); 210 } 211 212 /** 213 * Limit results to this number 214 * 215 * @param int $limit Set to 0 to disable limit again 216 */ 217 public function setLimit($limit) { 218 if($limit) { 219 $this->range_end = $this->range_begin + $limit; 220 } else { 221 $this->range_end = 0; 222 } 223 } 224 225 /** 226 * Return the number of results (regardless of limit and offset settings) 227 * 228 * Use this to implement paging. Important: this may only be called after running @see execute() 229 * 230 * @return int 231 */ 232 public function getCount() { 233 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 234 return $this->count; 235 } 236 237 /** 238 * Returns the PID associated with each result row 239 * 240 * Important: this may only be called after running @see execute() 241 * 242 * @return \string[] 243 */ 244 public function getPids() { 245 if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search'); 246 return $this->result_pids; 247 } 248 249 /** 250 * Execute this search and return the result 251 * 252 * The result is a two dimensional array of Value()s. 253 * 254 * This will always query for the full result (not using offset and limit) and then 255 * return the wanted range, setting the count (@see getCount) to the whole result number 256 * 257 * @return Value[][] 258 */ 259 public function execute() { 260 list($sql, $opts) = $this->getSQL(); 261 262 /** @var \PDOStatement $res */ 263 $res = $this->sqlite->query($sql, $opts); 264 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 265 266 $this->result_pids = array(); 267 $result = array(); 268 $cursor = -1; 269 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 270 if($this->isRowEmpty($row)) { 271 continue; 272 } 273 $cursor++; 274 if($cursor < $this->range_begin) continue; 275 if($this->range_end && $cursor >= $this->range_end) continue; 276 277 $this->result_pids[] = $row['PID']; 278 279 $C = 0; 280 $resrow = array(); 281 foreach($this->columns as $col) { 282 $val = $row["C$C"]; 283 if($col->isMulti()) { 284 $val = explode(self::CONCAT_SEPARATOR, $val); 285 } 286 $resrow[] = new Value($col, $val); 287 $C++; 288 } 289 $result[] = $resrow; 290 } 291 292 $this->sqlite->res_close($res); 293 $this->count = $cursor + 1; 294 return $result; 295 } 296 297 /** 298 * Transform the set search parameters into a statement 299 * 300 * @return array ($sql, $opts) The SQL and parameters to execute 301 */ 302 public function getSQL() { 303 if(!$this->columns) throw new StructException('nocolname'); 304 305 $QB = new QueryBuilder(); 306 307 // basic tables 308 $first_table = ''; 309 foreach($this->schemas as $schema) { 310 $datatable = 'data_' . $schema->getTable(); 311 if($first_table) { 312 // follow up tables 313 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 314 } else { 315 // first table 316 $QB->addTable('schema_assignments'); 317 $QB->addTable($datatable); 318 $QB->addSelectColumn($datatable, 'pid', 'PID'); 319 $QB->addGroupByColumn($datatable, 'pid'); 320 321 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 322 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 323 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 324 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 325 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 326 327 $first_table = $datatable; 328 } 329 $QB->filters()->whereAnd("$datatable.latest = 1"); 330 } 331 332 // columns to select, handling multis 333 $sep = self::CONCAT_SEPARATOR; 334 $n = 0; 335 foreach($this->columns as $col) { 336 $CN = 'C' . $n++; 337 338 if($col->isMulti()) { 339 $datatable = "data_{$col->getTable()}"; 340 $multitable = "multi_{$col->getTable()}"; 341 $MN = 'M' . $col->getColref(); 342 343 $QB->addLeftJoin( 344 $datatable, 345 $multitable, 346 $MN, 347 "$datatable.pid = $MN.pid AND 348 $datatable.rev = $MN.rev AND 349 $MN.colref = {$col->getColref()}" 350 ); 351 352 $col->getType()->select($QB, $MN, 'value', $CN); 353 $sel = $QB->getSelectStatement($CN); 354 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 355 } else { 356 $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN); 357 $QB->addGroupByStatement($CN); 358 } 359 } 360 361 // where clauses 362 foreach($this->filter as $filter) { 363 list($col, $value, $comp, $op) = $filter; 364 365 $datatable = "data_{$col->getTable()}"; 366 $multitable = "multi_{$col->getTable()}"; 367 368 /** @var $col Column */ 369 if($col->isMulti()) { 370 $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before 371 372 $QB->addLeftJoin( 373 $datatable, 374 $multitable, 375 $MN, 376 "$datatable.pid = $MN.pid AND 377 $datatable.rev = $MN.rev AND 378 $MN.colref = {$col->getColref()}" 379 ); 380 $coltbl = $MN; 381 $colnam = 'value'; 382 } else { 383 $coltbl = $datatable; 384 $colnam = $col->getColName(); 385 } 386 387 $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter 388 } 389 390 // sorting - we always sort by the single val column 391 foreach($this->sortby as $sort) { 392 list($col, $asc) = $sort; 393 /** @var $col Column */ 394 $col->getType()->sort($QB, 'data_'.$col->getTable(), $col->getColName(false), $asc ? 'ASC' : 'DESC'); 395 } 396 397 return $QB->getSQL(); 398 } 399 400 /** 401 * Returns all the columns that where added to the search 402 * 403 * @return Column[] 404 */ 405 public function getColumns() { 406 return $this->columns; 407 } 408 409 /** 410 * Checks if the given column is a * wildcard 411 * 412 * If it's a wildcard all matching columns are added to the column list, otherwise 413 * nothing happens 414 * 415 * @param string $colname 416 * @return bool was wildcard? 417 */ 418 protected function processWildcard($colname) { 419 list($colname, $table) = $this->resolveColumn($colname); 420 if($colname !== '*') return false; 421 422 // no table given? assume the first is meant 423 if($table === null) { 424 $schema_list = array_keys($this->schemas); 425 $table = $schema_list[0]; 426 } 427 428 $schema = $this->schemas[$table]; 429 if(!$schema) return false; 430 $this->columns = array_merge($this->columns, $schema->getColumns(false)); 431 return true; 432 } 433 434 /** 435 * Split a given column name into table and column 436 * 437 * Handles Aliases. Table might be null if none given. 438 * 439 * @param $colname 440 * @return array (colname, table) 441 */ 442 protected function resolveColumn($colname) { 443 if(!$this->schemas) throw new StructException('noschemas'); 444 445 // resolve the alias or table name 446 list($table, $colname) = explode('.', $colname, 2); 447 if(!$colname) { 448 $colname = $table; 449 $table = null; 450 } 451 if($table && isset($this->aliases[$table])) { 452 $table = $this->aliases[$table]; 453 } 454 455 if(!$colname) throw new StructException('nocolname'); 456 457 return array($colname, $table); 458 } 459 460 /** 461 * Find a column to be used in the search 462 * 463 * @param string $colname may contain an alias 464 * @return bool|Column 465 */ 466 public function findColumn($colname) { 467 if(!$this->schemas) throw new StructException('noschemas'); 468 469 // handling of page and title column is special - we add a "fake" column 470 $schema_list = array_keys($this->schemas); 471 if($colname == '%pageid%') { 472 return new PageColumn(0, new Page(), $schema_list[0]); 473 } 474 if($colname == '%title%') { 475 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 476 } 477 if($colname == '%lastupdate%') { 478 return new RevisionColumn(0, new DateTime(), $schema_list[0]); 479 } 480 481 list($colname, $table) = $this->resolveColumn($colname); 482 483 // if table name given search only that, otherwise try all for matching column name 484 if($table !== null) { 485 $schemas = array($table => $this->schemas[$table]); 486 } else { 487 $schemas = $this->schemas; 488 } 489 490 // find it 491 $col = false; 492 foreach($schemas as $schema) { 493 if(empty($schema)) { 494 continue; 495 } 496 $col = $schema->findColumn($colname); 497 if($col) break; 498 } 499 500 return $col; 501 } 502 503 /** 504 * Check if a row is empty / only contains a reference to itself 505 * 506 * @param array $rowColumns an array as returned from the database 507 * @return bool 508 */ 509 private function isRowEmpty($rowColumns) { 510 $C = 0; 511 foreach($this->columns as $col) { 512 $val = $rowColumns["C$C"]; 513 $C += 1; 514 if(blank($val) || is_a($col->getType(), 'dokuwiki\plugin\struct\types\Page') && $val == $rowColumns["PID"]) { 515 continue; 516 } 517 return false; 518 } 519 return true; 520 } 521 522} 523 524 525