Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame^] | 1 | # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style |
| 3 | # license that can be found in the LICENSE file or at |
| 4 | # https://developers.google.com/open-source/licenses/bsd |
| 5 | |
| 6 | """The FrontendSearchPipeline class manages issue search and sorting. |
| 7 | |
| 8 | The frontend pipeline checks memcache for cached results in each shard. It |
| 9 | then calls backend jobs to do any shards that had a cache miss. On cache hit, |
| 10 | the cached results must be filtered by permissions, so the at-risk cache and |
| 11 | backends are consulted. Next, the sharded results are combined into an overall |
| 12 | list of IIDs. Then, that list is paginated and the issues on the current |
| 13 | pagination page can be shown. Alternatively, this class can determine just the |
| 14 | position the currently shown issue would occupy in the overall sorted list. |
| 15 | """ |
| 16 | |
| 17 | from __future__ import division |
| 18 | from __future__ import print_function |
| 19 | from __future__ import absolute_import |
| 20 | |
| 21 | import json |
| 22 | |
| 23 | import collections |
| 24 | import logging |
| 25 | import math |
| 26 | import random |
| 27 | import time |
| 28 | |
| 29 | from google.appengine.api import apiproxy_stub_map |
| 30 | from google.appengine.api import memcache |
| 31 | from google.appengine.api import modules |
| 32 | from google.appengine.api import urlfetch |
| 33 | |
| 34 | import settings |
| 35 | from features import savedqueries_helpers |
| 36 | from framework import framework_bizobj |
| 37 | from framework import framework_constants |
| 38 | from framework import framework_helpers |
| 39 | from framework import paginate |
| 40 | from framework import permissions |
| 41 | from framework import sorting |
| 42 | from framework import urls |
| 43 | from search import ast2ast |
| 44 | from search import query2ast |
| 45 | from search import searchpipeline |
| 46 | from services import fulltext_helpers |
| 47 | from tracker import tracker_bizobj |
| 48 | from tracker import tracker_constants |
| 49 | from tracker import tracker_helpers |
| 50 | |
| 51 | |
# Fail-fast responses usually finish in less than 50ms.  A failure that
# comes back faster than this limit (in seconds) is not worth logging.
FAIL_FAST_LIMIT_SEC = 0.1

# Seconds to sleep between polls for finished backend RPCs (40 milliseconds).
DELAY_BETWEEN_RPC_COMPLETION_POLLS = 0.04

# The choices help balance the cost of choosing samples vs. the cost of
# selecting issues that are in a range bounded by neighboring samples.
# Preferred chunk size parameters were determined by experimentation.
MIN_SAMPLE_CHUNK_SIZE = int(math.sqrt(
    tracker_constants.DEFAULT_RESULTS_PER_PAGE))
MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(
    settings.search_limit_per_shard))
PREFERRED_NUM_CHUNKS = 50
| 65 | |
| 66 | |
| 67 | # TODO(jojwang): monorail:4127: combine some url parameters info or |
| 68 | # query info into dicts or tuples to make argument manager easier. |
class FrontendSearchPipeline(object):
  """Manage the process of issue search, including backends and caching.

  Even though the code is divided into several methods, the public
  methods should be called in sequence, so the execution of the code
  is pretty much in the order of the source code lines here.
  """

  def __init__(
      self,
      cnxn,
      services,
      auth,
      me_user_ids,
      query,
      query_project_names,
      items_per_page,
      paginate_start,
      can,
      group_by_spec,
      sort_spec,
      warnings,
      errors,
      use_cached_searches,
      profiler,
      project=None):
    """Record the search parameters, then validate and split the query.

    Args:
      cnxn: connection passed through to every service call.
      services: object giving access to the project, config, issue, and
          user services and to the cache manager.
      auth: info about the requesting user; user_id, user_pb, and
          effective_ids are read from it.
      me_user_ids: user IDs handed to the backend search for keyword
          replacement.
      query: the user's query string.
      query_project_names: names of the projects to search.  Only projects
          that the user can view are kept.
      items_per_page: number of results on one pagination page.
      paginate_start: index of the first result on the current page.
      can: "canned query" number to scope the user's search.
      group_by_spec: string that lists the grouping order.
      sort_spec: string that lists the sort order.
      warnings: list to accumulate warning messages.
      errors: object whose `query` attribute is set when the query is
          invalid.
      use_cached_searches: Bool for whether to use cached searches.
      profiler: object whose Phase() context manager wraps each step.
      project: optional current project; it is always searched, in addition
          to the viewable projects named in query_project_names.
    """
    self.cnxn = cnxn
    self.me_user_ids = me_user_ids
    self.auth = auth
    self.logged_in_user_id = auth.user_id or 0
    self.can = can
    self.items_per_page = items_per_page
    self.paginate_start = paginate_start
    self.group_by_spec = group_by_spec
    self.sort_spec = sort_spec
    self.warnings = warnings
    self.use_cached_searches = use_cached_searches
    self.profiler = profiler

    self.services = services
    self.pagination = None
    self.num_skipped_at_start = 0
    self.total_count = 0
    self.errors = errors

    self.project_name = ''
    if project:
      self.project_name = project.project_name
    self.query_projects = []
    if query_project_names:
      consider_projects = list(services.project.GetProjectsByName(
          self.cnxn, query_project_names).values())
      self.query_projects = [
          p for p in consider_projects
          if permissions.UserCanViewProject(
              self.auth.user_pb, self.auth.effective_ids, p)]
    # The current project may also have been named in query_project_names.
    # Only add it when it is not already present so that the project ID and
    # name lists below do not contain duplicates.
    if project and all(
        p.project_id != project.project_id for p in self.query_projects):
      self.query_projects.append(project)
    member_of_all_projects = self.auth.user_pb.is_site_admin or all(
        framework_bizobj.UserIsInProject(p, self.auth.effective_ids)
        for p in self.query_projects)
    self.query_project_ids = sorted([
        p.project_id for p in self.query_projects])
    self.query_project_names = sorted([
        p.project_name for p in self.query_projects])

    config_dict = self.services.config.GetProjectConfigs(
        self.cnxn, self.query_project_ids)
    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
        list(config_dict.values()))

    # The following fields are filled in as the pipeline progresses.
    # The value None means that we still need to compute that value.
    # A shard_key is a tuple (shard_id, subquery).
    self.users_by_id = {}
    self.nonviewable_iids = {}  # {shard_id: set(iid)}
    self.unfiltered_iids = {}  # {shard_key: [iid, ...]} needing perm checks.
    self.filtered_iids = {}  # {shard_key: [iid, ...]} already perm checked.
    # {shard_key: limit-reached flag}; the values are OR-ed in Paginate().
    self.search_limit_reached = {}
    self.allowed_iids = []  # Matching iids that user is permitted to view.
    self.allowed_results = None  # results that the user is permitted to view.
    self.visible_results = None  # allowed_results on current pagination page.
    self.error_responses = set()

    error_msg = _CheckQuery(
        self.cnxn, self.services, query, self.harmonized_config,
        self.query_project_ids, member_of_all_projects,
        warnings=self.warnings)
    if error_msg:
      self.errors.query = error_msg

    # Split up query into smaller subqueries that would get the same results
    # to improve performance. Smaller queries are more likely to get cache
    # hits and subqueries can be parallelized by querying for them across
    # multiple shards.
    self.subqueries = []
    try:
      self.subqueries = query2ast.QueryToSubqueries(query)
    except query2ast.InvalidQueryError:
      # Ignore errors because they've already been recorded in
      # self.errors.query.
      pass

  def SearchForIIDs(self):
    """Use backends to search each shard and store their results.

    Fills in self.unfiltered_iids, self.nonviewable_iids, and
    self.filtered_iids, adds failing shard IDs to self.error_responses,
    and accumulates self.total_count.
    """
    with self.profiler.Phase('Checking cache and calling Backends'):
      rpc_tuples = _StartBackendSearch(
          self.cnxn, self.query_project_names, self.query_project_ids,
          self.harmonized_config, self.unfiltered_iids,
          self.search_limit_reached, self.nonviewable_iids,
          self.error_responses, self.services, self.me_user_ids,
          self.logged_in_user_id, self.items_per_page + self.paginate_start,
          self.subqueries, self.can, self.group_by_spec, self.sort_spec,
          self.warnings, self.use_cached_searches)

    with self.profiler.Phase('Waiting for Backends'):
      try:
        _FinishBackendSearch(rpc_tuples)
      except Exception as e:
        logging.exception(e)
        raise

    if self.error_responses:
      logging.error('%r error responses. Incomplete search results.',
                    self.error_responses)

    with self.profiler.Phase('Filtering cached results'):
      for shard_key in self.unfiltered_iids:
        shard_id, _subquery = shard_key
        if shard_id not in self.nonviewable_iids:
          # Without the nonviewable IIDs we cannot do permission filtering,
          # so show nothing from this shard rather than risk a leak.
          logging.error(
              'Not displaying shard %r because of no nonviewable_iids',
              shard_id)
          self.error_responses.add(shard_id)
          filtered_shard_iids = []
        else:
          unfiltered_shard_iids = self.unfiltered_iids[shard_key]
          nonviewable_shard_iids = self.nonviewable_iids[shard_id]
          # TODO(jrobbins): avoid creating large temporary lists.
          filtered_shard_iids = [iid for iid in unfiltered_shard_iids
                                 if iid not in nonviewable_shard_iids]
        self.filtered_iids[shard_key] = filtered_shard_iids

    # Several subqueries can match the same issue within one shard, so keep
    # only the first occurrence of each IID per shard_id.
    seen_iids_by_shard_id = collections.defaultdict(set)
    with self.profiler.Phase('Dedupping result IIDs across shards'):
      for shard_key in self.filtered_iids:
        shard_id, _subquery = shard_key
        deduped = [iid for iid in self.filtered_iids[shard_key]
                   if iid not in seen_iids_by_shard_id[shard_id]]
        self.filtered_iids[shard_key] = deduped
        seen_iids_by_shard_id[shard_id].update(deduped)

    with self.profiler.Phase('Counting all filtered results'):
      for shard_key in self.filtered_iids:
        self.total_count += len(self.filtered_iids[shard_key])

    with self.profiler.Phase('Trimming results beyond pagination page'):
      for shard_key in self.filtered_iids:
        self.filtered_iids[shard_key] = self.filtered_iids[
            shard_key][:self.paginate_start + self.items_per_page]

  def MergeAndSortIssues(self):
    """Merge and sort results from all shards into one combined list."""
    with self.profiler.Phase('selecting issues to merge and sort'):
      self._NarrowFilteredIIDs()
      self.allowed_iids = []
      for filtered_shard_iids in self.filtered_iids.values():
        self.allowed_iids.extend(filtered_shard_iids)

    with self.profiler.Phase('getting allowed results'):
      self.allowed_results = self.services.issue.GetIssues(
          self.cnxn, self.allowed_iids)

    # Note: At this point, we have results that are only sorted within
    # each backend's shard.  We still need to sort the merged result.
    self._LookupNeededUsers(self.allowed_results)
    with self.profiler.Phase('merging and sorting issues'):
      self.allowed_results = _SortIssues(
          self.allowed_results, self.harmonized_config, self.users_by_id,
          self.group_by_spec, self.sort_spec)

  def _NarrowFilteredIIDs(self):
    """Combine filtered shards into a range of IIDs for issues to sort.

    The naive way is to concatenate shard_iids[:start + num] for all
    shards then select [start:start + num].  We do better by sampling
    issues and then determining which of those samples are known to
    come before start or after start+num.  We then trim off all those IIDs
    and sort a smaller range of IIDs that might actually be displayed.
    See the design doc at go/monorail-sorting.

    This method modifies self.filtered_iids and self.num_skipped_at_start.
    """
    # Sample issues and skip those that are known to come before start.
    # See the "Sorting in Monorail" design doc.

    # If the result set is small, don't bother optimizing it.
    orig_length = _TotalLength(self.filtered_iids)
    if orig_length < self.items_per_page * 4:
      return

    # 1. Get sample issues in each shard and sort them all together.
    last = self.paginate_start + self.items_per_page

    samples_by_shard, sample_iids_to_shard = self._FetchAllSamples(
        self.filtered_iids)
    sample_issues = []
    for issue_dict in samples_by_shard.values():
      sample_issues.extend(list(issue_dict.values()))

    self._LookupNeededUsers(sample_issues)
    sample_issues = _SortIssues(
        sample_issues, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    sample_iid_tuples = [
        (issue.issue_id, sample_iids_to_shard[issue.issue_id])
        for issue in sample_issues]

    # 2. Trim off some IIDs that are sure to be positioned after last.
    num_trimmed_end = _TrimEndShardedIIDs(
        self.filtered_iids, sample_iid_tuples, last)
    logging.info('Trimmed %r issues from the end of shards', num_trimmed_end)

    # 3. Trim off some IIDs that are sure to be positioned before start.
    keep = _TotalLength(self.filtered_iids) - self.paginate_start
    # Reverse the sharded lists so that the same end-trimming helper can be
    # used to trim from the start.
    _ReverseShards(self.filtered_iids)
    sample_iid_tuples.reverse()
    self.num_skipped_at_start = _TrimEndShardedIIDs(
        self.filtered_iids, sample_iid_tuples, keep)
    logging.info('Trimmed %r issues from the start of shards',
                 self.num_skipped_at_start)
    # Reverse sharded lists again to get back into forward order.
    _ReverseShards(self.filtered_iids)

  def DetermineIssuePosition(self, issue):
    """Calculate info needed to show the issue flipper.

    Args:
      issue: The issue currently being viewed.

    Returns:
      A 3-tuple (prev_iid, index, next_iid) where prev_iid is the
      IID of the previous issue in the total ordering (or None),
      index is the index that the current issue has in the total
      ordering, and next_iid is the next issue (or None).  If the current
      issue is not in the list of results at all, returns None, None, None.
    """
    # 1. If the current issue is not in the results at all, then exit.
    if not any(issue.issue_id in filtered_shard_iids
               for filtered_shard_iids in self.filtered_iids.values()):
      return None, None, None

    # 2. Choose and retrieve sample issues in each shard.
    samples_by_shard, _ = self._FetchAllSamples(self.filtered_iids)

    # 3. Build up partial results for each shard.
    preceding_counts = {}  # dict {shard_key: num_issues_preceding_current}
    prev_candidates, next_candidates = [], []
    for shard_key in self.filtered_iids:
      prev_candidate, index_in_shard, next_candidate = (
          self._DetermineIssuePositionInShard(
              shard_key, issue, samples_by_shard[shard_key]))
      preceding_counts[shard_key] = index_in_shard
      if prev_candidate:
        prev_candidates.append(prev_candidate)
      if next_candidate:
        next_candidates.append(next_candidate)

    # 4. Combine the results.  The overall previous issue is the last of the
    # per-shard previous candidates, and the overall next issue is the first
    # of the per-shard next candidates.
    index = sum(preceding_counts.values())
    prev_candidates = _SortIssues(
        prev_candidates, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    prev_iid = prev_candidates[-1].issue_id if prev_candidates else None
    next_candidates = _SortIssues(
        next_candidates, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    next_iid = next_candidates[0].issue_id if next_candidates else None

    return prev_iid, index, next_iid

  def _DetermineIssuePositionInShard(self, shard_key, issue, sample_dict):
    """Determine where the given issue would fit into results from a shard.

    Args:
      shard_key: tuple (shard_id, subquery) that selects the shard results.
      issue: the issue currently being viewed.
      sample_dict: dict {iid: issue} of sample issues from this shard.

    Returns:
      A 3-tuple (prev_candidate, index, next_candidate) where the
      candidates are the issues in this shard that would come immediately
      before and after the given issue (or None), and index is the number
      of issues in this shard that would precede the given issue.
    """
    # See the design doc for details.  Basically, it first surveys the results
    # to bound a range where the given issue would belong, then it fetches the
    # issues in that range and sorts them.

    filtered_shard_iids = self.filtered_iids[shard_key]

    # 1. Select a sample of issues, leveraging ones we have in RAM already.
    issues_on_hand = list(sample_dict.values())
    if issue.issue_id not in sample_dict:
      issues_on_hand.append(issue)

    self._LookupNeededUsers(issues_on_hand)
    sorted_on_hand = _SortIssues(
        issues_on_hand, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand]
    index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id)

    # 2. Bound the gap around where issue belongs.
    if index_in_on_hand == 0:
      fetch_start = 0
    else:
      prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1]
      fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1

    if index_in_on_hand == len(sorted_on_hand) - 1:
      fetch_end = len(filtered_shard_iids)
    else:
      next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1]
      fetch_end = filtered_shard_iids.index(next_on_hand_iid)

    # 3. Retrieve all the issues in that gap to get an exact answer.
    gap_iids = filtered_shard_iids[fetch_start:fetch_end]
    fetched_issues = self.services.issue.GetIssues(self.cnxn, gap_iids)
    if issue.issue_id not in gap_iids:
      fetched_issues.append(issue)
    self._LookupNeededUsers(fetched_issues)
    sorted_fetched = _SortIssues(
        fetched_issues, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched]
    index_in_fetched = sorted_fetched_iids.index(issue.issue_id)

    # 4. Find the issues that come immediately before and after the place where
    # the given issue would belong in this shard.
    if index_in_fetched > 0:
      prev_candidate = sorted_fetched[index_in_fetched - 1]
    elif index_in_on_hand > 0:
      prev_candidate = sorted_on_hand[index_in_on_hand - 1]
    else:
      prev_candidate = None

    if index_in_fetched < len(sorted_fetched) - 1:
      next_candidate = sorted_fetched[index_in_fetched + 1]
    elif index_in_on_hand < len(sorted_on_hand) - 1:
      next_candidate = sorted_on_hand[index_in_on_hand + 1]
    else:
      next_candidate = None

    return prev_candidate, fetch_start + index_in_fetched, next_candidate

  def _FetchAllSamples(self, filtered_iids):
    """Choose and retrieve sample issues for every shard.

    Args:
      filtered_iids: dict {shard_key: [iid, ...]} of results in each shard.

    Returns:
      A pair (samples_by_shard, sample_iids_to_shard) where samples_by_shard
      is a dict {shard_key: {iid: sample_issue}} and sample_iids_to_shard is
      a dict {iid: shard_key} for every chosen sample IID.
    """
    samples_by_shard = {}  # {shard_key: {iid: sample_issue}}
    sample_iids_to_shard = {}  # {iid: shard_key}
    all_needed_iids = []  # List of iids to retrieve.

    for shard_key in filtered_iids:
      on_hand_issues, shard_needed_iids = self._ChooseSampleIssues(
          filtered_iids[shard_key])
      samples_by_shard[shard_key] = on_hand_issues
      for iid in on_hand_issues:
        sample_iids_to_shard[iid] = shard_key
      for iid in shard_needed_iids:
        sample_iids_to_shard[iid] = shard_key
      all_needed_iids.extend(shard_needed_iids)

    # Retrieve the samples for all shards with a single lookup.
    retrieved_samples, _misses = self.services.issue.GetIssuesDict(
        self.cnxn, all_needed_iids)
    for retrieved_iid, retrieved_issue in retrieved_samples.items():
      retr_shard_key = sample_iids_to_shard[retrieved_iid]
      samples_by_shard[retr_shard_key][retrieved_iid] = retrieved_issue

    return samples_by_shard, sample_iids_to_shard

  def _ChooseSampleIssues(self, issue_ids):
    """Select a scattering of issues from the list, leveraging RAM cache.

    Args:
      issue_ids: A list of issue IDs that comprise the results in a shard.

    Returns:
      A pair (on_hand_issues, needed_iids) where on_hand_issues is
      an issue dict {iid: issue} of issues already in RAM, and
      needed_iids is a list of iids of issues that need to be retrieved.
    """
    on_hand_issues = {}  # {iid: issue} of sample issues already in RAM.
    needed_iids = []  # [iid, ...] of sample issues not in RAM yet.
    # Aim for PREFERRED_NUM_CHUNKS chunks, clamped to the allowed size range.
    chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(
        MAX_SAMPLE_CHUNK_SIZE, len(issue_ids) // PREFERRED_NUM_CHUNKS))
    for i in range(chunk_size, len(issue_ids), chunk_size):
      issue = self.services.issue.GetAnyOnHandIssue(
          issue_ids, start=i, end=min(i + chunk_size, len(issue_ids)))
      if issue:
        on_hand_issues[issue.issue_id] = issue
      else:
        needed_iids.append(issue_ids[i])

    return on_hand_issues, needed_iids

  def _LookupNeededUsers(self, issues):
    """Look up user info needed to sort issues, if any.

    Users already present in self.users_by_id are not looked up again.
    """
    with self.profiler.Phase('lookup of owner, reporter, and cc'):
      additional_user_views_by_id = (
          tracker_helpers.MakeViewsForUsersInIssues(
              self.cnxn, issues, self.services.user,
              omit_ids=list(self.users_by_id.keys())))
      self.users_by_id.update(additional_user_views_by_id)

  def Paginate(self):
    """Fetch matching issues and paginate the search results.

    These two actions are intertwined because we try to only
    retrieve the Issues on the current pagination page.
    """
    # We already got the issues, just display a slice of the visible ones.
    limit_reached = False
    for shard_limit_reached in self.search_limit_reached.values():
      limit_reached |= shard_limit_reached
    self.pagination = paginate.ArtifactPagination(
        self.allowed_results,
        self.items_per_page,
        self.paginate_start,
        self.project_name,
        urls.ISSUE_LIST,
        total_count=self.total_count,
        limit_reached=limit_reached,
        skipped=self.num_skipped_at_start)
    self.visible_results = self.pagination.visible_results

    # If we were not forced to look up visible users already, do it now.
    self._LookupNeededUsers(self.visible_results)

  def __repr__(self):
    """Return a string that shows the internal state of this pipeline."""
    # Show at most 200 entries so the string stays a manageable size.
    if self.allowed_iids:
      shown_allowed_iids = self.allowed_iids[:200]
    else:
      shown_allowed_iids = self.allowed_iids

    if self.allowed_results:
      shown_allowed_results = self.allowed_results[:200]
    else:
      shown_allowed_results = self.allowed_results

    parts = [
        'allowed_iids: %r' % shown_allowed_iids,
        'allowed_results: %r' % shown_allowed_results,
        'len(visible_results): %r' % (
            self.visible_results and len(self.visible_results))]
    return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts))
| 513 | |
| 514 | |
def _CheckQuery(
    cnxn, services, query, harmonized_config, project_ids,
    member_of_all_projects, warnings=None):
  """Parse the given query and report the first error or None.

  Args:
    cnxn: connection passed through to ast2ast.PreprocessAST.
    services: services object passed through to ast2ast.PreprocessAST.
    query: the user's query string.
    harmonized_config: combined config for all projects being searched.
    project_ids: list of project IDs being searched.
    member_of_all_projects: passed to PreprocessAST as is_member.
    warnings: optional list to accumulate warning messages from parsing.

  Returns:
    The message of the first parsing or preprocessing error, or None if
    the query is valid.
  """
  try:
    query_ast = query2ast.ParseUserQuery(
        query, '', query2ast.BUILTIN_ISSUE_FIELDS, harmonized_config,
        warnings=warnings)
    # The preprocessed AST is not needed; this is only run for validation.
    ast2ast.PreprocessAST(
        cnxn, query_ast, project_ids, services, harmonized_config,
        is_member=member_of_all_projects)
  except (query2ast.InvalidQueryError, ast2ast.MalformedQuery) as e:
    # Exceptions only have a built-in `message` attribute on Python 2.
    # Keep using it when present, and fall back to str(e) so that this
    # does not raise AttributeError on Python 3.
    if hasattr(e, 'message'):
      return e.message
    return str(e)

  return None
| 532 | |
| 533 | |
| 534 | def _MakeBackendCallback(func, *args): |
| 535 | # type: (Callable[[*Any], Any], *Any) -> Callable[[*Any], Any] |
| 536 | """Helper to store a particular function and argument set into a callback. |
| 537 | |
| 538 | Args: |
| 539 | func: Function to callback. |
| 540 | *args: The arguments to pass into the function. |
| 541 | |
| 542 | Returns: |
| 543 | Callback function based on specified arguments. |
| 544 | """ |
| 545 | return lambda: func(*args) |
| 546 | |
| 547 | |
def _StartBackendSearch(
    cnxn, query_project_names, query_project_ids, harmonized_config,
    unfiltered_iids_dict, search_limit_reached_dict, nonviewable_iids,
    error_responses, services, me_user_ids, logged_in_user_id, new_url_num,
    subqueries, can, group_by_spec, sort_spec, warnings, use_cached_searches):
  # type: (MonorailConnection, Sequence[str], Sequence[int],
  #     proto.tracker_pb2.ProjectIssueConfig,
  #     Mapping[Tuple(int, str), Sequence[int]],
  #     Mapping[Tuple(int, str), Sequence[bool]],
  #     Mapping[Tuple(int, str), Collection[int]], Sequence[Tuple(int, str)],
  #     Services, Sequence[int], int, int, Sequence[str], int, str, str,
  #     Sequence[Tuple(str, Sequence[str])], bool) ->
  #     Sequence[Tuple(int, Tuple(int, str),
  #         google.appengine.api.apiproxy_stub_map.UserRPC)]
  """Request that our backends search and return a list of matching issue IDs.

  Args:
    cnxn: monorail connection to the database.
    query_project_names: set of project names to search.
    query_project_ids: list of project IDs to search.
    harmonized_config: combined ProjectIssueConfig for all projects being
        searched.
    unfiltered_iids_dict: dict {shard_key: [iid, ...]} of unfiltered search
        results to accumulate into.  They need to be later filtered by
        permissions and merged into filtered_iids_dict.
    search_limit_reached_dict: dict {shard_key: [bool, ...]} to determine if
        the search limit of any shard was reached.
    nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the
        projects being searched that the signed in user cannot view.
    error_responses: shard_ids of shards that encountered errors.
    services: connections to backends.
    me_user_ids: Empty list when no user is logged in, or user ID of the logged
        in user when doing an interactive search, or the viewed user ID when
        viewing someone else's dashboard, or the subscribing user's ID when
        evaluating subscriptions.  And, any linked accounts.
    logged_in_user_id: user_id of the logged in user, 0 otherwise
    new_url_num: the number of issues for BackendSearchPipeline to query.
        Computed based on pagination offset + number of items per page.
    subqueries: split up list of query string segments.
    can: "canned query" number to scope the user's search.
    group_by_spec: string that lists the grouping order.
    sort_spec: string that lists the sort order.
    warnings: list to accumulate warning messages.
    use_cached_searches: Bool for whether to use cached searches.

  Returns:
    A list of rpc_tuples that can be passed to _FinishBackendSearch to wait
    on any remaining backend calls.

  SIDE-EFFECTS:
    Any data found in memcache is immediately put into unfiltered_iids_dict.
    As the backends finish their work, _HandleBackendSearchResponse will update
    unfiltered_iids_dict for those shards.

    Any warnings produced throughout this process will be added to the list
    warnings.
  """
  rpc_tuples = []
  needed_shard_keys = set()
  for subquery in subqueries:
    # Use a separate name for the per-subquery warnings.  Rebinding the
    # `warnings` parameter here would lose the caller's list and extend the
    # returned list with itself.
    subquery, subquery_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
        me_user_ids, subquery)
    if warnings is not None:
      warnings.extend(subquery_warnings)
    for shard_id in range(settings.num_logical_shards):
      needed_shard_keys.add((shard_id, subquery))

  # 1. Get whatever we can from memcache.  Cache hits are only kept if they are
  # not already expired.
  project_shard_timestamps = _GetProjectTimestamps(
      query_project_ids, needed_shard_keys)

  if use_cached_searches:
    cached_unfiltered_iids_dict, cached_search_limit_reached_dict = (
        _GetCachedSearchResults(
            cnxn, query_project_ids, needed_shard_keys,
            harmonized_config, project_shard_timestamps, services, me_user_ids,
            can, group_by_spec, sort_spec, warnings))
    unfiltered_iids_dict.update(cached_unfiltered_iids_dict)
    search_limit_reached_dict.update(cached_search_limit_reached_dict)
  for cache_hit_shard_key in unfiltered_iids_dict:
    needed_shard_keys.remove(cache_hit_shard_key)

  # 2. Each kept cache hit will have unfiltered IIDs, so we filter them by
  # removing non-viewable IDs.
  _GetNonviewableIIDs(
      query_project_ids, logged_in_user_id,
      set(range(settings.num_logical_shards)),
      rpc_tuples, nonviewable_iids, project_shard_timestamps,
      services.cache_manager.processed_invalidations_up_to,
      use_cached_searches)

  # 3. Hit backends for any shards that are still needed.  When these results
  # come back, they are also put into unfiltered_iids_dict.
  for shard_key in needed_shard_keys:
    rpc = _StartBackendSearchCall(
        query_project_names,
        shard_key,
        services.cache_manager.processed_invalidations_up_to,
        me_user_ids,
        logged_in_user_id,
        new_url_num,
        can=can,
        sort_spec=sort_spec,
        group_by_spec=group_by_spec)
    # The start time is recorded alongside the RPC in its rpc_tuple.
    rpc_tuple = (time.time(), shard_key, rpc)
    rpc.callback = _MakeBackendCallback(
        _HandleBackendSearchResponse, query_project_names, rpc_tuple,
        rpc_tuples, settings.backend_retries, unfiltered_iids_dict,
        search_limit_reached_dict,
        services.cache_manager.processed_invalidations_up_to, error_responses,
        me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
        group_by_spec)
    rpc_tuples.append(rpc_tuple)

  return rpc_tuples
| 663 | |
| 664 | |
| 665 | def _FinishBackendSearch(rpc_tuples): |
| 666 | """Wait for all backend calls to complete, including any retries.""" |
| 667 | while rpc_tuples: |
| 668 | active_rpcs = [rpc for (_time, _shard_key, rpc) in rpc_tuples] |
| 669 | # Wait for any active RPC to complete. It's callback function will |
| 670 | # automatically be called. |
| 671 | finished_rpc = real_wait_any(active_rpcs) |
| 672 | # Figure out which rpc_tuple finished and remove it from our list. |
| 673 | for rpc_tuple in rpc_tuples: |
| 674 | _time, _shard_key, rpc = rpc_tuple |
| 675 | if rpc == finished_rpc: |
| 676 | rpc_tuples.remove(rpc_tuple) |
| 677 | break |
| 678 | else: |
| 679 | raise ValueError('We somehow finished an RPC that is not in rpc_tuples') |
| 680 | |
| 681 | |
def real_wait_any(active_rpcs):
  """Work around the blocking nature of wait_any().

  wait_any() checks for any finished RPCs, and returns one if found.
  If no RPC is finished, it simply blocks on the last RPC in the list.
  This is not the desired behavior because we are not able to detect
  FAST-FAIL RPC results and retry them if wait_any() is blocked on a
  request that is taking a long time to do actual work.

  Instead, we poll with the same check, sleeping between polls, so that
  we never block on any individual RPC.
  """
  if settings.local_mode:
    # The development server has very different code for RPCs than the
    # code used in the hosted environment.
    return apiproxy_stub_map.UserRPC.wait_any(active_rpcs)

  finished, _ = apiproxy_stub_map.UserRPC._UserRPC__check_one(active_rpcs)
  while not finished:
    time.sleep(DELAY_BETWEEN_RPC_COMPLETION_POLLS)
    finished, _ = apiproxy_stub_map.UserRPC._UserRPC__check_one(active_rpcs)
  return finished
| 702 | |
def _GetProjectTimestamps(query_project_ids, needed_shard_keys):
  """Get a dict of modified_ts values for all specified project-shards.

  Args:
    query_project_ids: list of project IDs being searched; when empty, the
        'all' entries are looked up instead.
    needed_shard_keys: iterable of (shard_id, subquery) pairs.

  Returns:
    A dict {(project_id, shard_id): timestamp} holding only the entries found
    in memcache.  For all-project lookups the project_id slot is the string
    'all'.
  """
  if query_project_ids:
    memcache_keys = [
        '%d;%d' % (pid, sid)
        for pid in query_project_ids
        for sid, _subquery in needed_shard_keys]
  else:
    memcache_keys = [
        'all;%d' % sid for sid, _subquery in needed_shard_keys]

  found_timestamps = memcache.get_multi(
      keys=memcache_keys, namespace=settings.memcache_namespace)

  project_shard_timestamps = {}
  for memcache_key, timestamp in found_timestamps.items():
    pid_part, sid_part = memcache_key.split(';')
    # Keys look like '<pid>;<sid>' or 'all;<sid>'.
    project_part = 'all' if pid_part == 'all' else int(pid_part)
    project_shard_timestamps[project_part, int(sid_part)] = timestamp

  return project_shard_timestamps
| 725 | |
| 726 | |
| 727 | def _GetNonviewableIIDs( |
| 728 | query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples, |
| 729 | nonviewable_iids, project_shard_timestamps, invalidation_timestep, |
| 730 | use_cached_searches): |
| 731 | """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones.""" |
| 732 | if query_project_ids: |
| 733 | keys = [] |
| 734 | for pid in query_project_ids: |
| 735 | for sid in needed_shard_ids: |
| 736 | keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid)) |
| 737 | else: |
| 738 | keys = [ |
| 739 | ('all;%d;%d' % (logged_in_user_id, sid)) for sid in needed_shard_ids |
| 740 | ] |
| 741 | |
| 742 | if use_cached_searches: |
| 743 | cached_dict = memcache.get_multi( |
| 744 | keys, key_prefix='nonviewable:', namespace=settings.memcache_namespace) |
| 745 | else: |
| 746 | cached_dict = {} |
| 747 | |
| 748 | for sid in needed_shard_ids: |
| 749 | if query_project_ids: |
| 750 | for pid in query_project_ids: |
| 751 | _AccumulateNonviewableIIDs( |
| 752 | pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
| 753 | project_shard_timestamps, rpc_tuples, invalidation_timestep) |
| 754 | else: |
| 755 | _AccumulateNonviewableIIDs( |
| 756 | None, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
| 757 | project_shard_timestamps, rpc_tuples, invalidation_timestep) |
| 758 | |
| 759 | |
| 760 | def _AccumulateNonviewableIIDs( |
| 761 | pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
| 762 | project_shard_timestamps, rpc_tuples, invalidation_timestep): |
| 763 | """Use one of the retrieved cache entries or call a backend if needed.""" |
| 764 | if pid is None: |
| 765 | key = 'all;%d;%d' % (logged_in_user_id, sid) |
| 766 | else: |
| 767 | key = '%d;%d;%d' % (pid, logged_in_user_id, sid) |
| 768 | |
| 769 | if key in cached_dict: |
| 770 | issue_ids, cached_ts = cached_dict.get(key) |
| 771 | modified_ts = project_shard_timestamps.get((pid, sid)) |
| 772 | if modified_ts is None or modified_ts > cached_ts: |
| 773 | logging.info('nonviewable too stale on (project %r, shard %r)', |
| 774 | pid, sid) |
| 775 | else: |
| 776 | logging.info('adding %d nonviewable issue_ids', len(issue_ids)) |
| 777 | nonviewable_iids[sid] = set(issue_ids) |
| 778 | |
| 779 | if sid not in nonviewable_iids: |
| 780 | logging.info('nonviewable for %r not found', key) |
| 781 | logging.info('starting backend call for nonviewable iids %r', key) |
| 782 | rpc = _StartBackendNonviewableCall( |
| 783 | pid, logged_in_user_id, sid, invalidation_timestep) |
| 784 | rpc_tuple = (time.time(), sid, rpc) |
| 785 | rpc.callback = _MakeBackendCallback( |
| 786 | _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid, |
| 787 | rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids, |
| 788 | invalidation_timestep) |
| 789 | rpc_tuples.append(rpc_tuple) |
| 790 | |
| 791 | |
def _GetCachedSearchResults(
    cnxn, query_project_ids, needed_shard_keys, harmonized_config,
    project_shard_timestamps, services, me_user_ids, can, group_by_spec,
    sort_spec, warnings):
  """Return a dict of cached search results that are not already stale.

  If it were not for cross-project search, we would simply cache when we do a
  search and then invalidate when an issue is modified.  But, with
  cross-project search we don't know all the memcache entries that would
  need to be invalidated.  So, instead, we write the search result cache
  entries and then an initial modified_ts value for each project if it was
  not already there. And, when we update an issue we write a new
  modified_ts entry, which implicitly invalidates all search result
  cache entries that were written earlier because they are now stale.  When
  reading from the cache, we ignore any query project with modified_ts
  after its search result cache timestamp, because it is stale.

  Args:
    cnxn: monorail connection to the database.
    query_project_ids: list of project ID numbers for all projects being
        searched.
    needed_shard_keys: set of shard keys, each a (shard_id, subquery) pair,
        that need to be checked.
    harmonized_config: ProjectIssueConfig with combined information for all
        projects involved in this search.
    project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...}
        that tells when each shard was last invalidated.  All-project
        entries use 'all' as the project_id.
    services: connections to backends.
    me_user_ids: Empty list when no user is logged in, or user ID of the logged
        in user when doing an interactive search, or the viewed user ID when
        viewing someone else's dashboard, or the subscribing user's ID when
        evaluating subscriptions.  And, any linked accounts.
    can: "canned query" number to scope the user's search.
    group_by_spec: string that lists the grouping order.
    sort_spec: string that lists the sort order.
    warnings: list to accumulate warning messages; extended in place.

  Returns:
    Tuple consisting of:
      A dictionary {shard_key: [issue_id, ...], ...} of unfiltered search result
      issue IDs. Only shard keys found in memcache and not stale will be in
      that dictionary.  The result issue IDs must be permission checked before
      they can be considered to be part of the user's result set.
      A dictionary {shard_key: bool, ...}. The boolean is set to True if
      the search results limit of the shard is hit.
  """
  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
  projects_str = projects_str or 'all'
  canned_query = savedqueries_helpers.SavedQueryIDToCond(
      cnxn, services.features, can)
  # Bind the helper's warnings to their own name.  Reusing `warnings` here
  # would rebind the parameter, and the caller's list would never be updated.
  canned_query, keyword_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
      me_user_ids, canned_query)
  warnings.extend(keyword_warnings)

  sd = sorting.ComputeSortDirectives(
      harmonized_config, group_by_spec, sort_spec)
  sd_str = ' '.join(sd)
  memcache_key_prefix = '%s;%s' % (projects_str, canned_query)
  limit_reached_key_prefix = '%s;%s' % (projects_str, canned_query)

  cached_dict = memcache.get_multi(
      ['%s;%s;%s;%d' % (memcache_key_prefix, subquery, sd_str, sid)
       for sid, subquery in needed_shard_keys],
      namespace=settings.memcache_namespace)
  cached_search_limit_reached_dict = memcache.get_multi(
      ['%s;%s;%s;search_limit_reached;%d' % (
          limit_reached_key_prefix, subquery, sd_str, sid)
       for sid, subquery in needed_shard_keys],
      namespace=settings.memcache_namespace)

  unfiltered_dict = {}
  search_limit_reached_dict = {}
  for shard_key in needed_shard_keys:
    shard_id, subquery = shard_key
    memcache_key = '%s;%s;%s;%d' % (
        memcache_key_prefix, subquery, sd_str, shard_id)
    limit_reached_key = '%s;%s;%s;search_limit_reached;%d' % (
        limit_reached_key_prefix, subquery, sd_str, shard_id)
    if memcache_key not in cached_dict:
      logging.info('memcache miss on shard %r', shard_key)
      continue

    cached_iids, cached_ts = cached_dict[memcache_key]
    if cached_search_limit_reached_dict.get(limit_reached_key):
      search_limit_reached, _ = cached_search_limit_reached_dict[
          limit_reached_key]
    else:
      search_limit_reached = False

    # A cached entry is stale when any queried project (or 'all') has no
    # timestamp or was modified after the entry was cached.
    stale = False
    if query_project_ids:
      for project_id in query_project_ids:
        modified_ts = project_shard_timestamps.get((project_id, shard_id))
        if modified_ts is None or modified_ts > cached_ts:
          stale = True
          logging.info('memcache too stale on shard %r because of %r',
                       shard_id, project_id)
          break
    else:
      modified_ts = project_shard_timestamps.get(('all', shard_id))
      if modified_ts is None or modified_ts > cached_ts:
        stale = True
        logging.info('memcache too stale on shard %r because of all',
                     shard_id)

    if not stale:
      unfiltered_dict[shard_key] = cached_iids
      search_limit_reached_dict[shard_key] = search_limit_reached

  return unfiltered_dict, search_limit_reached_dict
| 902 | |
| 903 | |
| 904 | def _MakeBackendRequestHeaders(failfast): |
| 905 | headers = { |
| 906 | # This is needed to allow frontends to talk to backends without going |
| 907 | # through a login screen on googleplex.com. |
| 908 | # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs |
| 909 | 'X-URLFetch-Service-Id': 'GOOGLEPLEX', |
| 910 | } |
| 911 | if failfast: |
| 912 | headers['X-AppEngine-FailFast'] = 'Yes' |
| 913 | return headers |
| 914 | |
| 915 | |
def _StartBackendSearchCall(
    query_project_names,
    shard_key,
    invalidation_timestep,
    me_user_ids,
    logged_in_user_id,
    new_url_num,
    can=None,
    sort_spec=None,
    group_by_spec=None,
    deadline=None,
    failfast=True):
  # type: (Sequence[str], Tuple(int, str), int, Sequence[int], int,
  #     int, str, str, int, bool) ->
  #     google.appengine.api.apiproxy_stub_map.UserRPC
  """Start an asynchronous backend request that queries one DB shard.

  Args:
    query_project_names: List of project names queried.
    shard_key: (shard_id, subquery) tuple specifying which DB shard to query
        and with which subquery.
    invalidation_timestep: int timestep used to keep cached items fresh.
    me_user_ids: Empty list when no user is logged in, or user ID of the logged
        in user when doing an interactive search, or the viewed user ID when
        viewing someone else's dashboard, or the subscribing user's ID when
        evaluating subscriptions.  And, any linked accounts.
    logged_in_user_id: Id of the logged in user.
    new_url_num: the number of issues for BackendSearchPipeline to query.
        Computed based on pagination offset + number of items per page.
    can: Id of the canned query to use.
    sort_spec: Str specifying how issues should be sorted.
    group_by_spec: Str specifying how issues should be grouped.
    deadline: Max time for the RPC to take before failing; falls back to
        settings.backend_deadline when not given.
    failfast: Whether to set the X-AppEngine-FailFast request header.

  Returns:
    UserRPC for the created RPC call.
  """
  shard_id, subquery = shard_key
  backend_host = modules.get_hostname(module='besearch')
  search_path = framework_helpers.FormatURL(
      [],
      urls.BACKEND_SEARCH,
      projects=','.join(query_project_names),
      q=subquery,
      start=0,
      num=new_url_num,
      can=can,
      sort=sort_spec,
      groupby=group_by_spec,
      logged_in_user_id=logged_in_user_id,
      me_user_ids=','.join(str(uid) for uid in me_user_ids),
      shard_id=shard_id,
      invalidation_timestep=invalidation_timestep)
  url = 'http://%s%s' % (backend_host, search_path)
  logging.info('\n\nCalling backend: %s', url)

  rpc_deadline = deadline or settings.backend_deadline
  rpc = urlfetch.create_rpc(deadline=rpc_deadline)
  # follow_redirects=False is needed to avoid a login screen on googleplex.
  urlfetch.make_fetch_call(
      rpc, url, follow_redirects=False,
      headers=_MakeBackendRequestHeaders(failfast))
  return rpc
| 978 | |
| 979 | |
def _StartBackendNonviewableCall(
    project_id, logged_in_user_id, shard_id, invalidation_timestep,
    deadline=None, failfast=True):
  """Start an asynchronous backend request for one shard's nonviewable IIDs.

  Args:
    project_id: project ID to send; a falsy value is sent as ''.
    logged_in_user_id: user ID to send; a falsy value is sent as ''.
    shard_id: ID of the shard to query.
    invalidation_timestep: timestep value forwarded in the URL.
    deadline: Max time for the RPC; falls back to settings.backend_deadline
        when not given.
    failfast: Whether to set the X-AppEngine-FailFast request header.

  Returns:
    The RPC object for the started call.
  """
  backend_host = modules.get_hostname(module='besearch')
  nonviewable_path = framework_helpers.FormatURL(
      None, urls.BACKEND_NONVIEWABLE,
      project_id=project_id or '',
      logged_in_user_id=logged_in_user_id or '',
      shard_id=shard_id,
      invalidation_timestep=invalidation_timestep)
  url = 'http://%s%s' % (backend_host, nonviewable_path)
  logging.info('Calling backend nonviewable: %s', url)

  rpc_deadline = deadline or settings.backend_deadline
  rpc = urlfetch.create_rpc(deadline=rpc_deadline)
  # follow_redirects=False is needed to avoid a login screen on googleplex.
  urlfetch.make_fetch_call(
      rpc, url, follow_redirects=False,
      headers=_MakeBackendRequestHeaders(failfast))
  return rpc
| 997 | |
| 998 | |
def _HandleBackendSearchResponse(
    query_project_names, rpc_tuple, rpc_tuples, remaining_retries,
    unfiltered_iids, search_limit_reached, invalidation_timestep,
    error_responses, me_user_ids, logged_in_user_id, new_url_num, can,
    sort_spec, group_by_spec):
  # type: (Sequence[str], Tuple(int, Tuple(int, str),
  #         google.appengine.api.apiproxy_stub_map.UserRPC),
  #     Sequence[Tuple(int, Tuple(int, str),
  #         google.appengine.api.apiproxy_stub_map.UserRPC)],
  #     int, Mapping[Tuple(int, str), Sequence[int]],
  #     Mapping[Tuple(int, str), bool], int, Collection[Tuple(int, str)],
  #     Sequence[int], int, int, int, str, str) -> None
  #
  """Process one backend search response, retrying the call on failure.

  SIDE EFFECTS: This function edits many of the passed in parameters in place.
  For example, search_limit_reached and unfiltered_iids are updated with
  response data from the RPC, keyed by shard_key.

  Args:
    query_project_names: List of projects to query.
    rpc_tuple: (start_time, shard_key, rpc) tuple for the RPC being handled.
    rpc_tuples: List of RPC tuples; the tuple of any retry that is started
        gets appended to it.
    remaining_retries: Number of times left to retry.
    unfiltered_iids: Dict of Issue ids, before they've been filtered by
        permissions.
    search_limit_reached: Dict of whether the search limit for a particular
        shard has been hit.
    invalidation_timestep: int timestep used to keep cached items fresh.
    error_responses: Collection that gets shard_key added when the backend
        reports an error or the call is given up on.
    me_user_ids: List of relevant user IDs. ie: the currently logged in user
        and linked account IDs if applicable.
    logged_in_user_id: Logged in user's ID.
    new_url_num: the number of issues for BackendSearchPipeline to query.
        Computed based on pagination offset + number of items per page.
    can: Canned query ID to use.
    sort_spec: str specifying how issues should be sorted.
    group_by_spec: str specifying how issues should be grouped.
  """
  started_at, shard_key, rpc = rpc_tuple
  elapsed_sec = time.time() - started_at

  try:
    response = rpc.get_result()
    logging.info('call to backend took %d sec', elapsed_sec)
    # The first five characters of response.content are a prefix that
    # precedes the JSON text, so skip them.
    json_text = response.content[5:]
    logging.info('got json text: %r length %r',
                 json_text[:framework_constants.LOGGING_MAX_LENGTH],
                 len(json_text))
    if json_text == '':
      raise Exception('Fast fail')
    payload = json.loads(json_text)
    unfiltered_iids[shard_key] = payload['unfiltered_iids']
    search_limit_reached[shard_key] = payload['search_limit_reached']
    backend_error = payload.get('error')
    if backend_error:
      # Don't raise an exception, just log, because these errors are more like
      # 400s than 500s, and shouldn't be retried.
      logging.error('Backend shard %r returned error "%r"' % (
          shard_key, backend_error))
      error_responses.add(shard_key)

  except Exception as e:
    if elapsed_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions.
      logging.exception(e)

    if not remaining_retries:
      # Used all retries, so give up.
      logging.error('backend search retries exceeded')
    elif elapsed_sec >= settings.backend_deadline:
      # That backend shard is overloaded, so give up.
      logging.error('backend search on %r took too long', shard_key)
    else:
      logging.error('backend call for shard %r failed, retrying', shard_key)
      retry_rpc = _StartBackendSearchCall(
          query_project_names,
          shard_key,
          invalidation_timestep,
          me_user_ids,
          logged_in_user_id,
          new_url_num,
          can=can,
          sort_spec=sort_spec,
          group_by_spec=group_by_spec,
          failfast=remaining_retries > 2)
      retry_rpc_tuple = (time.time(), shard_key, retry_rpc)
      retry_rpc.callback = _MakeBackendCallback(
          _HandleBackendSearchResponse, query_project_names, retry_rpc_tuple,
          rpc_tuples, remaining_retries - 1, unfiltered_iids,
          search_limit_reached, invalidation_timestep, error_responses,
          me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
          group_by_spec)
      rpc_tuples.append(retry_rpc_tuple)
      return

    # Both give-up paths record the shard as failed.
    error_responses.add(shard_key)
| 1096 | |
| 1097 | |
def _HandleBackendNonviewableResponse(
    project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples,
    remaining_retries, nonviewable_iids, invalidation_timestep):
  """Process one backend response and retry if there was an error.

  On success, nonviewable_iids[shard_id] is set to the set of IDs under the
  'nonviewable' key of the JSON response.  On failure a retry RPC is started
  and appended to rpc_tuples, unless retries are used up or the call already
  took at least settings.backend_deadline seconds.

  Args:
    project_id: project ID passed on to any retry call.
    logged_in_user_id: user ID passed on to any retry call.
    shard_id: shard ID; NOTE that it is rebound below to the shard ID stored
        in rpc_tuple, which is the value actually used.
    rpc_tuple: (start_time, shard_id, rpc) tuple for the RPC being handled.
    rpc_tuples: list that the tuple of any retry RPC gets appended to.
    remaining_retries: Number of times left to retry.
    nonviewable_iids: dict {shard_id: set(issue_id)} updated in place.
    invalidation_timestep: value passed on to any retry call.
  """
  # This intentionally keeps the original behavior of taking shard_id from
  # the tuple rather than from the parameter of the same name.
  start_time, shard_id, rpc = rpc_tuple
  duration_sec = time.time() - start_time

  try:
    response = rpc.get_result()
    logging.info('call to backend nonviewable took %d sec', duration_sec)
    # The first five characters of response.content are a prefix that
    # precedes the JSON text, so skip them.
    json_content = response.content[5:]
    logging.info('got json text: %r length %r',
                 json_content[:framework_constants.LOGGING_MAX_LENGTH],
                 len(json_content))
    if json_content == '':
      raise Exception('Fast fail')
    json_data = json.loads(json_content)
    nonviewable_iids[shard_id] = set(json_data['nonviewable'])

  except Exception as e:
    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions.
      logging.exception(e)

    if not remaining_retries:
      # logging.warn is a deprecated alias; logging.warning is the supported
      # spelling and emits the same record.
      logging.warning('Used all retries, so give up on shard %r', shard_id)
      return

    if duration_sec >= settings.backend_deadline:
      logging.error('nonviewable call on %r took too long', shard_id)
      return  # That backend shard is overloaded, so give up.

    logging.error(
        'backend nonviewable call for shard %r;%r;%r failed, retrying',
        project_id, logged_in_user_id, shard_id)
    retry_rpc = _StartBackendNonviewableCall(
        project_id, logged_in_user_id, shard_id, invalidation_timestep,
        failfast=remaining_retries > 2)
    retry_rpc_tuple = (time.time(), shard_id, retry_rpc)
    retry_rpc.callback = _MakeBackendCallback(
        _HandleBackendNonviewableResponse, project_id, logged_in_user_id,
        shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1,
        nonviewable_iids, invalidation_timestep)
    rpc_tuples.append(retry_rpc_tuple)
| 1142 | |
| 1143 | |
| 1144 | def _TotalLength(sharded_iids): |
| 1145 | """Return the total length of all issue_iids lists.""" |
| 1146 | return sum(len(issue_iids) for issue_iids in sharded_iids.values()) |
| 1147 | |
| 1148 | |
| 1149 | def _ReverseShards(sharded_iids): |
| 1150 | """Reverse each issue_iids list in place.""" |
| 1151 | for shard_key in sharded_iids: |
| 1152 | sharded_iids[shard_key].reverse() |
| 1153 | |
| 1154 | |
def _TrimEndShardedIIDs(sharded_iids, sample_iid_tuples, num_needed):
  """Trim the IIDs to keep at least num_needed items.

  Args:
    sharded_iids: dict {shard_key: issue_id_list} for search results.  This is
        modified in place to remove some trailing issue IDs.
    sample_iid_tuples: list of (iid, shard_key) from a sorted list of sample
        issues.
    num_needed: int minimum total number of items to keep.  Some IIDs that are
        known to belong in positions > num_needed will be trimmed off.

  Returns:
    The total number of IIDs removed from the IID lists.
  """
  # 1. Find where each sample sits inside its own shard.
  sample_positions = _CalcSamplePositions(sharded_iids, sample_iid_tuples)

  # 2. Step through the samples, tracking the latest position seen in each
  # shard.  The sum of those positions is a lower bound on how many IIDs
  # precede the current sample; stop once it reaches num_needed.
  latest_pos_by_shard = {}
  for index, (_iid, shard_key, pos) in enumerate(sample_positions):
    latest_pos_by_shard[shard_key] = pos
    if sum(latest_pos_by_shard.values()) >= num_needed:
      excess_samples = sample_positions[index + 1:]
      break
  else:
    return 0  # We went through all samples and never reached num_needed.

  # 3. Cut each shard off at its first excess sample.
  trimmed_shards = set()
  num_trimmed = 0
  for _iid, shard_key, pos in excess_samples:
    if shard_key in trimmed_shards:
      continue
    shard_iids = sharded_iids[shard_key]
    num_trimmed += len(shard_iids) - pos
    sharded_iids[shard_key] = shard_iids[:pos]
    trimmed_shards.add(shard_key)

  return num_trimmed
| 1196 | |
| 1197 | |
| 1198 | # TODO(jrobbins): Convert this to a python generator. |
| 1199 | def _CalcSamplePositions(sharded_iids, sample_iids): |
| 1200 | """Return [(iid, shard_key, position_in_shard), ...] for each sample.""" |
| 1201 | # We keep track of how far index() has scanned in each shard to avoid |
| 1202 | # starting over at position 0 when looking for the next sample in |
| 1203 | # the same shard. |
| 1204 | scan_positions = collections.defaultdict(lambda: 0) |
| 1205 | sample_positions = [] |
| 1206 | for sample_iid, sample_shard_key in sample_iids: |
| 1207 | try: |
| 1208 | pos = sharded_iids.get(sample_shard_key, []).index( |
| 1209 | sample_iid, scan_positions[sample_shard_key]) |
| 1210 | scan_positions[sample_shard_key] = pos |
| 1211 | sample_positions.append((sample_iid, sample_shard_key, pos)) |
| 1212 | except ValueError: |
| 1213 | pass |
| 1214 | |
| 1215 | return sample_positions |
| 1216 | |
| 1217 | |
def _SortIssues(issues, config, users_by_id, group_by_spec, sort_spec):
  """Sort the found issues based on the group-by and sort specs.

  Args:
    issues: A list of issues to be sorted.
    config: A ProjectIssueConfig that could impact sort order.
    users_by_id: dictionary {user_id: user_view,...} for all users who
        participate in any issue in the entire list.
    group_by_spec: string that lists the grouping order.
    sort_spec: string that lists the sort order.

  Returns:
    The result of sorting.SortArtifacts() for the given issues, using the
    tracker's sortable fields and their postprocessors.
  """
  return sorting.SortArtifacts(
      issues,
      config,
      tracker_helpers.SORTABLE_FIELDS,
      tracker_helpers.SORTABLE_FIELDS_POSTPROCESSORS,
      group_by_spec,
      sort_spec,
      users_by_id=users_by_id)