1<?php 2 3namespace dokuwiki\plugin\struct\meta; 4 5use dokuwiki\plugin\struct\types\Page; 6 7class Search { 8 /** 9 * This separator will be used to concat multi values to flatten them in the result set 10 */ 11 const CONCAT_SEPARATOR = "\n!_-_-_-_-_!\n"; 12 13 /** 14 * The list of known and allowed comparators 15 * (order matters) 16 */ 17 static public $COMPARATORS = array( 18 '<=', '>=', '=*', '=', '<', '>', '!=', '!~', '~', 19 ); 20 21 /** @var \helper_plugin_sqlite */ 22 protected $sqlite; 23 24 /** @var Schema[] list of schemas to query */ 25 protected $schemas = array(); 26 27 /** @var Column[] list of columns to select */ 28 protected $columns = array(); 29 30 /** @var array the sorting of the result */ 31 protected $sortby = array(); 32 33 /** @var array the filters */ 34 protected $filter = array(); 35 36 /** @var array list of aliases tables can be referenced by */ 37 protected $aliases = array(); 38 39 /** @var int begin results from here */ 40 protected $range_begin = 0; 41 42 /** @var int end results here */ 43 protected $range_end = 0; 44 45 /** @var int the number of results */ 46 protected $count = -1; 47 48 /** 49 * Search constructor. 50 */ 51 public function __construct() { 52 /** @var \helper_plugin_struct_db $plugin */ 53 $plugin = plugin_load('helper', 'struct_db'); 54 $this->sqlite = $plugin->getDB(); 55 } 56 57 /** 58 * Add a schema to be searched 59 * 60 * Call multiple times for multiple schemas. 61 * 62 * @param string $table 63 * @param string $alias 64 */ 65 public function addSchema($table, $alias = '') { 66 $this->schemas[$table] = new Schema($table); 67 if($alias) $this->aliases[$alias] = $table; 68 } 69 70 /** 71 * Add a column to be returned by the search 72 * 73 * Call multiple times for multiple columns. Be sure the referenced tables have been 74 * added before 75 * 76 * @param string $colname may contain an alias 77 */ 78 public function addColumn($colname) { 79 $col = $this->findColumn($colname); 80 if(!$col) return; //FIXME do we really want to ignore missing columns? 81 $this->columns[] = $col; 82 } 83 84 /** 85 * Add sorting options 86 * 87 * Call multiple times for multiple columns. Be sure the referenced tables have been 88 * added before 89 * 90 * @param string $colname may contain an alias 91 * @param bool $asc sort direction (ASC = true, DESC = false) 92 */ 93 public function addSort($colname, $asc = true) { 94 $col = $this->findColumn($colname); 95 if(!$col) return; //FIXME do we really want to ignore missing columns? 96 97 $this->sortby[$col->getFullQualifiedLabel()] = array($col, $asc); 98 } 99 100 /** 101 * Returns all set sort columns 102 * 103 * @return array 104 */ 105 public function getSorts() { 106 return $this->sortby; 107 } 108 109 /** 110 * Adds a filter 111 * 112 * @param string $colname may contain an alias 113 * @param string $value 114 * @param string $comp @see self::COMPARATORS 115 * @param string $op either 'OR' or 'AND' 116 */ 117 public function addFilter($colname, $value, $comp, $op = 'OR') { 118 /* Convert certain filters into others 119 * this reduces the number of supported filters to implement in types */ 120 if ($comp == '*~') { 121 $value = '*' . $value . '*'; 122 $comp = '~'; 123 } elseif ($comp == '<>') { 124 $comp = '!='; 125 } 126 127 if(!in_array($comp, self::$COMPARATORS)) throw new StructException("Bad comperator. Use " . join(',', self::$COMPARATORS)); 128 if($op != 'OR' && $op != 'AND') throw new StructException('Bad filter type . Only AND or OR allowed'); 129 130 $col = $this->findColumn($colname); 131 if(!$col) return; // ignore missing columns, filter might have been for different schema 132 133 // map filter operators to SQL syntax 134 switch($comp) { 135 case '~': 136 $comp = 'LIKE'; 137 break; 138 case '!~': 139 $comp = 'NOT LIKE'; 140 break; 141 case '=*': 142 $comp = 'REGEXP'; 143 break; 144 } 145 146 // we use asterisks, but SQL wants percents 147 if($comp == 'LIKE' || $comp == 'NOT LIKE') { 148 $value = str_replace('*','%',$value); 149 } 150 151 // add the filter 152 $this->filter[] = array($col, $value, $comp, $op); 153 } 154 155 /** 156 * Set offset for the results 157 * 158 * @param int $offset 159 */ 160 public function setOffset($offset) { 161 $limit = 0; 162 if($this->range_end) { 163 // if there was a limit set previously, the range_end needs to be recalculated 164 $limit = $this->range_end - $this->range_begin; 165 } 166 $this->range_begin = $offset; 167 if($limit) $this->setLimit($limit); 168 } 169 170 /** 171 * Limit results to this number 172 * 173 * @param int $limit Set to 0 to disable limit again 174 */ 175 public function setLimit($limit) { 176 if($limit) { 177 $this->range_end = $this->range_begin + $limit; 178 } else { 179 $this->range_end = 0; 180 } 181 } 182 183 /** 184 * Return the number of results (regardless of limit and offset settings) 185 * 186 * Use this to implement paging. Important: this may only be called after running @see execute() 187 * 188 * @return int 189 */ 190 public function getCount() { 191 if($this->count < 0) throw new StructException('Count is only accessible after executing the search'); 192 return $this->count; 193 } 194 195 /** 196 * Execute this search and return the result 197 * 198 * The result is a two dimensional array of Value()s. 199 * 200 * This will always query for the full result (not using offset and limit) and then 201 * return the wanted range, setting the count (@see getCount) to the whole result number 202 * 203 * @return Value[][] 204 */ 205 public function execute() { 206 list($sql, $opts) = $this->getSQL(); 207 208 /** @var \PDOStatement $res */ 209 $res = $this->sqlite->query($sql, $opts); 210 if($res === false) throw new StructException("SQL execution failed for\n\n$sql"); 211 212 $result = array(); 213 $cursor = -1; 214 while($row = $res->fetch(\PDO::FETCH_ASSOC)) { 215 if ($this->isRowEmpty($row)) { 216 continue; 217 } 218 $cursor++; 219 if($cursor < $this->range_begin) continue; 220 if($this->range_end && $cursor >= $this->range_end) continue; 221 222 $C = 0; 223 $resrow = array(); 224 foreach($this->columns as $col) { 225 $val = $row["C$C"]; 226 if($col->isMulti()) { 227 $val = explode(self::CONCAT_SEPARATOR, $val); 228 } 229 $resrow[] = new Value($col, $val); 230 $C++; 231 } 232 $result[] = $resrow; 233 } 234 235 $this->sqlite->res_close($res); 236 $this->count = $cursor + 1; 237 return $result; 238 } 239 240 /** 241 * Transform the set search parameters into a statement 242 * 243 * @return array ($sql, $opts) The SQL and parameters to execute 244 */ 245 public function getSQL() { 246 if(!$this->columns) throw new StructException('nocolname'); 247 248 $QB = new QueryBuilder(); 249 250 // basic tables 251 $first_table = ''; 252 foreach($this->schemas as $schema) { 253 $datatable = 'data_'.$schema->getTable(); 254 if($first_table) { 255 // follow up tables 256 $QB->addLeftJoin($first_table, $datatable, $datatable, "$first_table.pid = $datatable.pid"); 257 } else { 258 // first table 259 $QB->addTable('schema_assignments'); 260 $QB->addTable($datatable); 261 $QB->addSelectColumn($datatable, 'pid', 'PID'); 262 $QB->addGroupByColumn($datatable, 'pid'); 263 264 $QB->filters()->whereAnd("$datatable.pid = schema_assignments.pid"); 265 $QB->filters()->whereAnd("schema_assignments.tbl = '{$schema->getTable()}'"); 266 $QB->filters()->whereAnd("schema_assignments.assigned = 1"); 267 $QB->filters()->whereAnd("GETACCESSLEVEL($datatable.pid) > 0"); 268 $QB->filters()->whereAnd("PAGEEXISTS($datatable.pid) = 1"); 269 270 $first_table = $datatable; 271 } 272 $QB->filters()->whereAnd("$datatable.latest = 1"); 273 } 274 275 // columns to select, handling multis 276 $sep = self::CONCAT_SEPARATOR; 277 $n = 0; 278 foreach($this->columns as $col) { 279 $CN = 'C' . $n++; 280 281 if($col->isMulti()) { 282 $datatable = "data_{$col->getTable()}"; 283 $multitable = "multi_{$col->getTable()}"; 284 $MN = 'M' . $col->getColref(); 285 286 $QB->addLeftJoin( 287 $datatable, 288 $multitable, 289 $MN, 290 "$datatable.pid = $MN.pid AND 291 $datatable.rev = $MN.rev AND 292 $MN.colref = {$col->getColref()}" 293 ); 294 295 $col->getType()->select($QB, $MN, 'value' , $CN); 296 $sel = $QB->getSelectStatement($CN); 297 $QB->addSelectStatement("GROUP_CONCAT($sel, '$sep')", $CN); 298 } else { 299 $col->getType()->select($QB, 'data_'.$col->getTable(), $col->getColName() , $CN); 300 $QB->addGroupByStatement($CN); 301 } 302 } 303 304 // where clauses 305 foreach($this->filter as $filter) { 306 list($col, $value, $comp, $op) = $filter; 307 308 $datatable = "data_{$col->getTable()}"; 309 $multitable = "multi_{$col->getTable()}"; 310 311 /** @var $col Column */ 312 if($col->isMulti()) { 313 $MN = 'MN' . $col->getColref(); // FIXME this joins a second time if the column was selected before 314 315 $QB->addLeftJoin( 316 $datatable, 317 $multitable, 318 $MN, 319 "$datatable.pid = $MN.pid AND 320 $datatable.rev = $MN.rev AND 321 $MN.colref = {$col->getColref()}" 322 ); 323 $coltbl = $MN; 324 $colnam = 'value'; 325 } else { 326 $coltbl = $datatable; 327 $colnam = $col->getColName(); 328 } 329 $col->getType()->filter($QB, $coltbl, $colnam, $comp, $value, $op); // type based filter 330 } 331 332 // sorting - we always sort by the single val column 333 foreach($this->sortby as $sort) { 334 list($col, $asc) = $sort; 335 /** @var $col Column */ 336 $QB->addOrderBy($col->getFullColName(false) . ' '.(($asc) ? 'ASC' : 'DESC')); 337 } 338 339 return $QB->getSQL(); 340 } 341 342 /** 343 * Returns all the columns that where added to the search 344 * 345 * @return Column[] 346 */ 347 public function getColumns() { 348 return $this->columns; 349 } 350 351 352 /** 353 * Find a column to be used in the search 354 * 355 * @param string $colname may contain an alias 356 * @return bool|Column 357 */ 358 public function findColumn($colname) { 359 if(!$this->schemas) throw new StructException('noschemas'); 360 361 // handling of page and title column is special - we add a "fake" column 362 $schema_list = array_keys($this->schemas); 363 if($colname == '%pageid%') { 364 return new PageColumn(0, new Page(), $schema_list[0]); 365 } 366 if($colname == '%title%') { 367 return new PageColumn(0, new Page(array('usetitles' => true)), $schema_list[0]); 368 } 369 370 // resolve the alias or table name 371 list($table, $colname) = explode('.', $colname, 2); 372 if(!$colname) { 373 $colname = $table; 374 $table = ''; 375 } 376 if($table && isset($this->aliases[$table])) { 377 $table = $this->aliases[$table]; 378 } 379 380 if(!$colname) throw new StructException('nocolname'); 381 382 // if table name given search only that, otherwise try all for matching column name 383 if($table) { 384 $schemas = array($table => $this->schemas[$table]); 385 } else { 386 $schemas = $this->schemas; 387 } 388 389 // find it 390 $col = false; 391 foreach($schemas as $schema) { 392 if(empty($schema)) { 393 continue; 394 } 395 $col = $schema->findColumn($colname); 396 if($col) break; 397 } 398 399 return $col; 400 } 401 402 /** 403 * Check if a row is empty / only contains a reference to itself 404 * 405 * @param array $rowColumns an array as returned from the database 406 * @return bool 407 */ 408 private function isRowEmpty($rowColumns) { 409 $C = 0; 410 foreach($this->columns as $col) { 411 $val = $rowColumns["C$C"]; 412 $C += 1; 413 if (blank($val) || is_a($col->getType(),'dokuwiki\plugin\struct\types\Page') && $val == $rowColumns["PID"]) { 414 continue; 415 } 416 return false; 417 } 418 return true; 419 } 420 421} 422 423 424