<?php

namespace dokuwiki\plugin\struct\meta;

use dokuwiki\plugin\struct\types\Date;
use dokuwiki\plugin\struct\types\DateTime;
use dokuwiki\plugin\struct\types\Page;

/**
 * Collects schemas, columns, filters, sorting and paging options and turns
 * them into a single SQL statement that is run against the struct database.
 */
class Search {
    /**
     * This separator will be used to concat multi values to flatten them in the result set
     */
    const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n";

    /**
     * The list of known and allowed comparators
     * (order matters)
     */
    static public $COMPARATORS = array(
        '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~',
    );

    /** @var \helper_plugin_sqlite */
    protected $sqlite;

    /** @var Schema[] list of schemas to query */
    protected $schemas = array();

    /** @var Column[] list of columns to select */
    protected $columns = array();

    /** @var array the sorting of the result */
    protected $sortby = array();

    /** @var array the filters */
    protected $filter = array();

    /** @var array list of aliases tables can be referenced by */
    protected $aliases = array();

    /** @var int begin results from here */
    protected $range_begin = 0;

    /** @var int end results here */
    protected $range_end = 0;

    /** @var int the number of results */
    protected $count = -1;

    /** @var string[] the PIDs of the result rows */
    protected $result_pids = null;

    /**
     * Search constructor.
     */
    public function __construct() {
        /** @var \helper_plugin_struct_db $plugin */
        $plugin = plugin_load('helper', 'struct_db');
        $this->sqlite = $plugin->getDB();
    }

    /**
     * Add a schema to be searched
     *
     * Call multiple times for multiple schemas.
     *
     * @param string $table
     * @param string $alias
     */
    public function addSchema($table, $alias = '') {
        $this->schemas[$table] = new Schema($table);
        if($alias) $this->aliases[$alias] = $table;
    }

    /**
     * Add a column to be returned by the search
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @throws StructException when no schema has been added yet
     */
    public function addColumn($colname) {
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?
        $this->columns[] = $col;
    }

    /**
     * Add sorting options
     *
     * Call multiple times for multiple columns. Be sure the referenced tables have been
     * added before
     *
     * @param string $colname may contain an alias
     * @param bool $asc sort direction (ASC = true, DESC = false)
     * @throws StructException when no schema has been added yet
     */
    public function addSort($colname, $asc = true) {
        $col = $this->findColumn($colname);
        if(!$col) return; //FIXME do we really want to ignore missing columns?

        // keyed by the qualified label, so sorting twice by the same column overwrites the earlier entry
        $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc);
    }

    /**
     * Returns all set sort columns
     *
     * @return array
     */
    public function getSorts() {
        return $this->sortby;
    }

    /**
     * Adds a filter
     *
     * @param string $colname may contain an alias
     * @param string|string[] $value
     * @param string $comp @see self::COMPARATORS
     * @param string $op either 'OR' or 'AND'
     * @throws StructException on an unknown comparator or operator, or when no schema has been added yet
     */
    public function addFilter($colname, $value, $comp, $op = 'OR') {
        /* Convert certain filters into others
         * this reduces the number of supported filters to implement in types */
        if($comp === '*~') {
            $value = $this->filterWrapAsterisks($value);
            $comp = '~';
        } elseif($comp === '<>') {
            $comp = '!=';
        }

        // strict check: a loose in_array() would accept non-string comparators such as int 0 on PHP 7
        if(!in_array($comp, self::$COMPARATORS, true)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS));
        if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed');

        $col = $this->findColumn($colname);
        if(!$col) return; // ignore missing columns, filter might have been for different schema

        // map filter operators to SQL syntax
        switch($comp) {
            case '~':
                $comp = 'LIKE';
                break;
            case '!~':
                $comp = 'NOT LIKE';
                break;
            case '=*':
                $comp = 'REGEXP';
                break;
        }

        // we use asterisks, but SQL wants percents
        if($comp == 'LIKE' || $comp == 'NOT LIKE') {
            $value = $this->filterChangeToLike($value);
        }

        // add the filter
        $this->filter[] = array($col, $value, $comp, $op);
    }

    /**
     * Wrap given value in asterisks
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterWrapAsterisks($value) {
        $map = function ($input) {
            return "*$input*";
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Change given string to use % instead of *
     *
     * @param string|string[] $value
     * @return string|string[]
     */
    protected function filterChangeToLike($value) {
        $map = function ($input) {
            return str_replace('*', '%', $input);
        };

        if(is_array($value)) {
            $value = array_map($map, $value);
        } else {
            $value = $map($value);
        }
        return $value;
    }

    /**
     * Set offset for the results
     *
     * @param int $offset
     */
    public function setOffset($offset) {
        $limit = 0;
        if($this->range_end) {
            // if there was a limit set previously, the range_end needs to be recalculated
            $limit = $this->range_end - $this->range_begin;
        }
        $this->range_begin = $offset;
        if($limit) $this->setLimit($limit);
    }

    /**
     * Limit results to this number
     *
     * @param int $limit Set to 0 to disable limit again
     */
    public function setLimit($limit) {
        if($limit) {
            $this->range_end = $this->range_begin + $limit;
        } else {
            $this->range_end = 0;
        }
    }

    /**
     * Return the number of results (regardless of limit and offset settings)
     *
     * Use this to implement paging. Important: this may only be called after running @see execute()
     *
     * @return int
     * @throws StructException when the search has not been executed yet
     */
    public function getCount() {
        if($this->count < 0) throw new StructException('Count is only accessible after executing the search');
        return $this->count;
    }

    /**
     * Return the PIDs of the rows returned by the last execute() call
     *
     * Only rows inside the requested offset/limit range are listed, in result order.
     *
     * @return string[]
     * @throws StructException when the search has not been executed yet
     */
    public function getPids() {
        if($this->result_pids === null) throw new StructException('PIDs are only accessible after executing the search');
        return $this->result_pids;
    }

    /**
     * Execute this search and return the result
     *
     * The result is a two dimensional array of Value()s.
     *
     * This will always query for the full result (not using offset and limit) and then
     * return the wanted range, setting the count (@see getCount) to the whole result number
     *
     * @return Value[][]
     * @throws StructException when the query fails or no column was added
     */
    public function execute() {
        list($sql, $opts) = $this->getSQL();

        /** @var \PDOStatement $res */
        $res = $this->sqlite->query($sql, $opts);
        if($res === false) throw new StructException("SQL execution failed for\n\n$sql");

        $this->result_pids = array();
        $result = array();
        $cursor = -1;
        while($row = $res->fetch(\PDO::FETCH_ASSOC)) {
            // empty rows neither show up in the result nor in the count
            if($this->isRowEmpty($row)) {
                continue;
            }
            $cursor++;
            // rows outside the wanted range are counted but not returned
            if($cursor < $this->range_begin) continue;
            if($this->range_end && $cursor >= $this->range_end) continue;

            $this->result_pids[] = $row['PID'];

            $C = 0;
            $resrow = array();
            foreach($this->columns as $col) {
                $val = $row["C$C"];
                if($col->isMulti()) {
                    // the cast keeps the previous result for NULL (a single empty string)
                    // while avoiding the PHP 8.1 deprecation for passing null to explode()
                    $val = explode(self::CONCAT_SEPARATOR, (string) $val);
                }
                $resrow[] = new Value($col, $val);
                $C++;
            }
            $result[] = $resrow;
        }

        $this->sqlite->res_close($res);
        $this->count = $cursor + 1;
        return $result;
    }

    /**
     * Transform the set search parameters into a statement
     *
     * @return array ($sql, $opts) The SQL and parameters to execute
     * @throws StructException when no column has been added
     */
    public function getSQL() {
        if(!$this->columns) throw new StructException('nocolname');

        $QB = new QueryBuilder();

        // basic tables
        $first_table = '';
        foreach($this->schemas as $schema) {
            $datatable = 'data_' . $schema->getTable();
            if($first_table) {
                // follow up tables
                $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid");
            } else {
                // first table
                $QB->addTable('schema_assignments');
                $QB->addTable($datatable);
                $QB->addSelectColumn($datatable, 'pid', 'PID');
                $QB->addGroupByColumn($datatable, 'pid');

                $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid");
                $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'");
                $QB->filters()->whereAnd("schema_assignments.assigned = 1");
                $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0");
                $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1");

                $first_table = $datatable;
            }
            $QB->filters()->whereAnd("$datatable.latest = 1");
        }

        // columns to select, handling multis
        $sep = self::CONCAT_SEPARATOR;
        $n = 0;
        foreach($this->columns as $col) {
            // selected columns are aliased C0, C1, ... in the order they were added
            $CN = 'C' . $n++;

            if($col->isMulti()) {
                $datatable = "data_{$col->getTable()}";
                $multitable = "multi_{$col->getTable()}";
                $MN = 'M' . $col->getColref();

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );

                $col->getType()->select($QB, $MN, 'value', $CN);
                $sel = $QB->getSelectStatement($CN);
                $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN);
            } else {
                $col->getType()->select($QB, 'data_' . $col->getTable(), $col->getColName(), $CN);

                // the %lastupdate% column needs datetime mangling
                if(is_a($col, 'dokuwiki\\plugin\\struct\\meta\\RevisionColumn')) {
                    $sel = $QB->getSelectStatement($CN);
                    $QB->addSelectStatement("DATETIME($sel, 'unixepoch')", $CN);
                }

                $QB->addGroupByStatement($CN);
            }
        }

        // where clauses
        foreach($this->filter as $filter) {
            /** @var $col Column */
            list($col, $value, $comp, $op) = $filter;

            $datatable = "data_{$col->getTable()}";
            $multitable = "multi_{$col->getTable()}";

            if($col->isMulti()) {
                $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before

                $QB->addLeftJoin(
                    $datatable,
                    $multitable,
                    $MN,
                    "$datatable.pid = $MN.pid AND
                     $datatable.rev = $MN.rev AND
                     $MN.colref = {$col->getColref()}"
                );
                $coltbl = $MN;
                $colnam = 'value';
            } else {
                $coltbl = $datatable;
                $colnam = $col->getColName();
            }

            $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter
        }

        // sorting - we always sort by the single val column
        foreach($this->sortby as $sort) {
            /** @var $col Column */
            list($col, $asc) = $sort;
            $QB->addOrderBy($col->getFullColName(false) . ' ' . (($asc) ? 'ASC' : 'DESC'));
        }

        return $QB->getSQL();
    }

    /**
     * Returns all the columns that were added to the search
     *
     * @return Column[]
     */
    public function getColumns() {
        return $this->columns;
    }

    /**
     * Find a column to be used in the search
     *
     * The special names %pageid%, %title% and %lastupdate% resolve to "fake" columns
     * bound to the first added schema. All other names are looked up in the schema given
     * as "table.column" or "alias.column", or in all added schemas when no table is given.
     *
     * @param string $colname may contain an alias
     * @return bool|Column false when the column can not be found
     * @throws StructException when no schema has been added yet or the column name is empty
     */
    public function findColumn($colname) {
        if(!$this->schemas) throw new StructException('noschemas');

        // handling of page and title column is special - we add a "fake" column
        $schema_list = array_keys($this->schemas);
        if($colname == '%pageid%') {
            return new PageColumn(0, new Page(), $schema_list[0]);
        }
        if($colname == '%title%') {
            return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]);
        }
        if($colname == '%lastupdate%') {
            return new RevisionColumn(0, new DateTime(), $schema_list[0]);
        }

        // resolve the alias or table name
        // a name without a dot yields a single element, pad it to avoid an undefined offset warning
        list($table, $colname) = array_pad(explode('.', $colname, 2), 2, '');
        if(!$colname) {
            $colname = $table;
            $table = '';
        }
        if($table && isset($this->aliases[$table])) {
            $table = $this->aliases[$table];
        }

        if(!$colname) throw new StructException('nocolname');

        // if table name given search only that, otherwise try all for matching column name
        if($table) {
            // a table that was never added can not provide the column
            if(!isset($this->schemas[$table])) return false;
            $schemas = array($table => $this->schemas[$table]);
        } else {
            $schemas = $this->schemas;
        }

        // find it
        $col = false;
        foreach($schemas as $schema) {
            if(empty($schema)) {
                continue;
            }
            $col = $schema->findColumn($colname);
            if($col) break;
        }

        return $col;
    }

    /**
     * Check if a row is empty / only contains a reference to itself
     *
     * A row counts as empty when every selected column is either blank or is a
     * Page typed column whose value equals the row's own PID.
     *
     * @param array $rowColumns an array as returned from the database
     * @return bool
     */
    private function isRowEmpty($rowColumns) {
        $C = 0;
        foreach($this->columns as $col) {
            $val = $rowColumns["C$C"];
            $C += 1;
            if(blank($val) || (is_a($col->getType(), 'dokuwiki\plugin\struct\types\Page') && $val == $rowColumns["PID"])) {
                continue;
            }
            return false;
        }
        return true;
    }

}