<?php

namespace dokuwiki\plugin\struct\meta;

use dokuwiki\plugin\struct\types\DateTime;
use dokuwiki\plugin\struct\types\Page;

/**
 * Builds and executes a search over one or more struct schemas
 *
 * Typical usage: add schemas, then columns, then optional filters/sorting/range,
 * and finally call execute() to retrieve the rows as Value objects.
 */
class Search {
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    public static $COMPARATORS = array(
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~',
    );

    /** @var \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query, indexed by table name */
    protected $schemas = array();

    /** @var Column[] list of columns to select */
    protected $columns = array();

    /** @var array the sorting of the result, each entry is array(Column, bool $asc) */
    protected $sortby = array();

    /** @var array the filters, each entry is array(Column, value, comparator, op) */
    protected $filter = array();

    /** @var array list of aliases tables can be referenced by (alias => table) */
    protected $aliases = array();

    /** @var int begin results from here */
    protected $range_begin = 0;

    /** @var int end results here (exclusive), 0 means no limit */
    protected $range_end = 0;

    /** @var int the number of results, negative until execute() has run */
    protected $count = -1;

    /** @var string[] the PIDs of the result rows, null until execute() has run */
    protected $result_pids = null;

    /**
     * Search constructor.
     *
     * Loads the struct_db helper plugin and keeps its database handle.
     */
    public function __construct() {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias optional alias the table can be referenced by in column names
     */
    public function addSchema($table, $alias = '') {
        $this->schemas[$table] = new Schema($table);
        if($alias) $this->aliases[$alias] = $table;
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @throws StructException when no schema was added yet or the column name is empty
     */
    public function addColumn($colname) {
        if($this->processWildcard($colname)) return; // wildcard?
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     * @throws StructException when no schema was added yet or the column name is empty
     */
    public function addSort($colname, $asc = true) {
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?

        // keyed by the fully qualified label, so sorting twice by the same column overwrites
        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc);
    }

    /**
     * Returns all set sort columns
     *
     * @return array
     */
    public function getSorts() {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on unknown comparator or operator
     */
    public function addFilter($colname, $value, $comp, $op = 'OR') {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if($comp == '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif($comp == '<>') {
            $comp = '!=';
        }

        if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');

        $col = $this->findColumn($colname);
        if(!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if($comp == 'LIKE' || $comp == 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        // add the filter
        $this->filter[] = array($col, $value, $comp, $op);
    }

    /**
     * Wrap given value in asterisks
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterWrapAsterisks($value) {
        $map = function ($input) {
            return "*$input*";
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Change given string to use % instead of *
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterChangeToLike($value) {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Set offset for the results
     *
     * @param int $offset
     */
    public function setOffset($offset) {
        $limit = 0;
        if($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit) {
        if($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running @see execute()
     *
     * @return int
     * @throws StructException when called before execute()
     */
    public function getCount() {
        if($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Returns the PID associated with each result row
     *
     * Important: this may only be called after running @see execute()
     *
     * @return string[]
     * @throws StructException when called before execute()
     */
    public function getPids() {
        if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
        return $this->result_pids;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * @return Value[][]
     * @throws StructException when the query fails or no columns were added
     */
    public function execute() {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        $this->result_pids = array();
        $result = array();
        $cursor = -1;
        while($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            // empty rows are neither returned nor counted
            if($this->isRowEmpty($row)) {
                continue;
            }
            $cursor++;
            // rows outside the wanted range are skipped but still counted
            if($cursor < $this->range_begin) continue;
            if($this->range_end && $cursor >= $this->range_end) continue;

            $this->result_pids[] = $row['PID'];

            $C = 0;
            $resrow = array();
            foreach($this->columns as $col) {
                $val = $row["C$C"];
                if($col->isMulti()) {
                    // cast first: a multi column without any values arrives as null and
                    // passing null to explode() is deprecated since PHP 8.1
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $resrow[] = new Value($col, $val);
                $C++;
            }
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no columns were added
     */
    public function getSQL() {
        if(!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table

                if(!$schema->isLookup()) {
                    $QB->addTable('schema_assignments');
                    $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid");
                    // NOTE(review): the table name is interpolated into the SQL here - presumably
                    // it is sanitized by Schema, confirm or switch to a bound parameter
                    $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'");
                    $QB->filters()->whereAnd("schema_assignments.assigned = 1");
                    $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                    $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1");
                }

                $QB->addTable($datatable);
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addGroupByColumn($datatable, 'pid');

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach($this->columns as $col) {
            // result columns are aliased C0, C1, ... in the order they were added
            $CN = 'C' . $n++;

            if($col->isMulti()) {
                $datatable = "data_{$col->getTable()}";
                $multitable = "multi_{$col->getTable()}";
                $MN = 'M' . $col->getColref();

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );

                // let the type add its select, then wrap that in a GROUP_CONCAT
                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);
                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        foreach($this->filter as $filter) {
            list($col, $value, $comp, $op) = $filter;

            $datatable = "data_{$col->getTable()}";
            $multitable = "multi_{$col->getTable()}";

            /** @var $col Column */
            if($col->isMulti()) {
                $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );
                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = $datatable;
                $colnam = $col->getColName();
            }

            $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach($this->sortby as $sort) {
            list($col, $asc) = $sort;
            /** @var $col Column */
            $QB->addOrderBy($col->getFullColName(false) . ' ' . (($asc) ? 'ASC' : 'DESC'));
        }

        return $QB->getSQL();
    }

    /**
     * Returns all the columns that were added to the search
     *
     * @return Column[]
     */
    public function getColumns() {
        return $this->columns;
    }

    /**
     * Checks if the given column is a * wildcard
     *
     * If it's a wildcard all matching columns are added to the column list, otherwise
     * nothing happens
     *
     * @param string $colname
     * @return bool was wildcard?
     * @throws StructException when no schema was added yet or the column name is empty
     */
    protected function processWildcard($colname) {
        list($colname, $table) = $this->resolveColumn($colname);
        if($colname !== '*') return false;

        // no table given? assume the first is meant
        if($table === null) {
            $schema_list = array_keys($this->schemas);
            $table = $schema_list[0];
        }

        // an unknown table or alias is not treated as a wildcard match
        if(!isset($this->schemas[$table])) return false;
        $schema = $this->schemas[$table];
        if(!$schema) return false;
        $this->columns = array_merge($this->columns, $schema->getColumns(false));
        return true;
    }

    /**
     * Split a given column name into table and column
     *
     * Handles Aliases. Table might be null if none given.
     *
     * @param string $colname
     * @return array (colname, table)
     * @throws StructException when no schema was added yet or the column name is empty
     */
    protected function resolveColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // resolve the alias or table name
        // explode() returns a single element when there is no dot, so the second
        // part has to be read defensively
        $parts = explode('.', $colname, 2);
        $table = $parts[0];
        $colname = isset($parts[1]) ? $parts[1] : null;
        if(!$colname) {
            // no table part was given, the only part is the column name
            $colname = $table;
            $table = null;
        }
        if($table && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        if(!$colname) throw new StructException('nocolname');

        return array($colname, $table);
    }

    /**
     * Find a column to be used in the search
     *
     * @param string $colname may contain an alias
     * @return bool|Column the column or false if it could not be found
     * @throws StructException when no schema was added yet or the column name is empty
     */
    public function findColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // handling of page and title column is special - we add a "fake" column
        $schema_list = array_keys($this->schemas);
        if($colname == '%pageid%') {
            return new PageColumn(0, new Page(), $schema_list[0]);
        }
        if($colname == '%title%') {
            return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
        }
        if($colname == '%lastupdate%') {
            return new RevisionColumn(0, new DateTime(), $schema_list[0]);
        }

        list($colname, $table) = $this->resolveColumn($colname);

        // if table name given search only that, otherwise try all for matching column name
        if($table !== null) {
            // an unknown table simply yields no schemas to search
            $schemas = isset($this->schemas[$table]) ? array($table => $this->schemas[$table]) : array();
        } else {
            $schemas = $this->schemas;
        }

        // find it
        $col = false;
        foreach($schemas as $schema) {
            if(empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if($col) break;
        }

        return $col;
    }

    /**
     * Check if a row is empty / only contains a reference to itself
     *
     * A row counts as empty when every selected column is either blank or is a
     * Page type column whose value equals the row's own PID.
     *
     * @param array $rowColumns an array as returned from the database
     * @return bool
     */
    private function isRowEmpty($rowColumns) {
        $C = 0;
        foreach($this->columns as $col) {
            $val = $rowColumns["C$C"];
            $C += 1;
            if(blank($val) || (is_a($col->getType(), 'dokuwiki\plugin\struct\types\Page') && $val == $rowColumns["PID"])) {
                continue;
            }
            return false;
        }
        return true;
    }

}