# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

6"""The FrontendSearchPipeline class manages issue search and sorting.
7
8The frontend pipeline checks memcache for cached results in each shard. It
9then calls backend jobs to do any shards that had a cache miss. On cache hit,
10the cached results must be filtered by permissions, so the at-risk cache and
11backends are consulted. Next, the sharded results are combined into an overall
12list of IIDs. Then, that list is paginated and the issues on the current
13pagination page can be shown. Alternatively, this class can determine just the
14position the currently shown issue would occupy in the overall sorted list.
15"""
16
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import collections
import json
import logging
import math
import random
import time

from google.appengine.api import apiproxy_stub_map
from google.appengine.api import memcache
from google.appengine.api import modules
from google.appengine.api import urlfetch

import settings
from features import savedqueries_helpers
from framework import framework_bizobj
from framework import framework_constants
from framework import framework_helpers
from framework import paginate
from framework import permissions
from framework import sorting
from framework import urls
from search import ast2ast
from search import query2ast
from search import searchpipeline
from services import fulltext_helpers
from tracker import tracker_bizobj
from tracker import tracker_constants
from tracker import tracker_helpers


# Fail-fast responses usually finish in less than 50ms. If we see a failure
# in under FAIL_FAST_LIMIT_SEC, we don't bother logging it.
FAIL_FAST_LIMIT_SEC = 0.1

DELAY_BETWEEN_RPC_COMPLETION_POLLS = 0.04  # 40 milliseconds

# The chunk size choices balance the cost of choosing samples vs. the cost of
# selecting issues that are in a range bounded by neighboring samples.
# Preferred chunk size parameters were determined by experimentation.
MIN_SAMPLE_CHUNK_SIZE = int(
    math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE))
MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard))
PREFERRED_NUM_CHUNKS = 50
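
# A hedged worked example (assumed values, since the real numbers live in
# settings and tracker_constants): if DEFAULT_RESULTS_PER_PAGE were 100 and
# search_limit_per_shard were 10000, then MIN_SAMPLE_CHUNK_SIZE would be 10,
# MAX_SAMPLE_CHUNK_SIZE would be 100, and a shard with 1000 result IIDs would
# get chunk_size = max(10, min(100, 1000 // 50)) = 20, i.e. roughly one
# sample per run of 20 consecutive IIDs (see _ChooseSampleIssues below).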


# TODO(jojwang): monorail:4127: combine some url parameters info or
# query info into dicts or tuples to make argument management easier.
class FrontendSearchPipeline(object):
  """Manage the process of issue search, including backends and caching.

  Even though the code is divided into several methods, the public
  methods should be called in sequence, so the execution of the code
  is pretty much in the order of the source code lines here.
  """

  def __init__(
      self,
      cnxn,
      services,
      auth,
      me_user_ids,
      query,
      query_project_names,
      items_per_page,
      paginate_start,
      can,
      group_by_spec,
      sort_spec,
      warnings,
      errors,
      use_cached_searches,
      profiler,
      project=None):
    self.cnxn = cnxn
    self.me_user_ids = me_user_ids
    self.auth = auth
    self.logged_in_user_id = auth.user_id or 0
    self.can = can
    self.items_per_page = items_per_page
    self.paginate_start = paginate_start
    self.group_by_spec = group_by_spec
    self.sort_spec = sort_spec
    self.warnings = warnings
    self.use_cached_searches = use_cached_searches
    self.profiler = profiler

    self.services = services
    self.pagination = None
    self.num_skipped_at_start = 0
    self.total_count = 0
    self.errors = errors

    self.project_name = ''
    if project:
      self.project_name = project.project_name
    self.query_projects = []
    if query_project_names:
      consider_projects = list(services.project.GetProjectsByName(
          self.cnxn, query_project_names).values())
      self.query_projects = [
          p for p in consider_projects
          if permissions.UserCanViewProject(
              self.auth.user_pb, self.auth.effective_ids, p)]
    if project:
      self.query_projects.append(project)
    member_of_all_projects = self.auth.user_pb.is_site_admin or all(
        framework_bizobj.UserIsInProject(p, self.auth.effective_ids)
        for p in self.query_projects)
    self.query_project_ids = sorted([
        p.project_id for p in self.query_projects])
    self.query_project_names = sorted([
        p.project_name for p in self.query_projects])

    config_dict = self.services.config.GetProjectConfigs(
        self.cnxn, self.query_project_ids)
    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
        list(config_dict.values()))

    # The following fields are filled in as the pipeline progresses.
    # The value None means that we still need to compute that value.
    # A shard_key is a tuple (shard_id, subquery).
    self.users_by_id = {}
    self.nonviewable_iids = {}  # {shard_id: set(iid)}
    self.unfiltered_iids = {}  # {shard_key: [iid, ...]} needing perm checks.
    self.filtered_iids = {}  # {shard_key: [iid, ...]} already perm checked.
    self.search_limit_reached = {}  # {shard_key: [bool, ...]}.
    self.allowed_iids = []  # Matching iids that the user is permitted to view.
    self.allowed_results = None  # Results that the user is permitted to view.
    self.visible_results = None  # allowed_results on current pagination page.
    self.error_responses = set()

    error_msg = _CheckQuery(
        self.cnxn, self.services, query, self.harmonized_config,
        self.query_project_ids, member_of_all_projects,
        warnings=self.warnings)
    if error_msg:
      self.errors.query = error_msg

    # Split the query up into smaller subqueries that together produce the
    # same results, to improve performance. Smaller queries are more likely
    # to get cache hits, and subqueries can be parallelized by querying for
    # them across multiple shards.
    self.subqueries = []
    try:
      self.subqueries = query2ast.QueryToSubqueries(query)
    except query2ast.InvalidQueryError:
      # Ignore errors because they've already been recorded in
      # self.errors.query.
      pass

  def SearchForIIDs(self):
    """Use backends to search each shard and store their results."""
    with self.profiler.Phase('Checking cache and calling Backends'):
      rpc_tuples = _StartBackendSearch(
          self.cnxn, self.query_project_names, self.query_project_ids,
          self.harmonized_config, self.unfiltered_iids,
          self.search_limit_reached, self.nonviewable_iids,
          self.error_responses, self.services, self.me_user_ids,
          self.logged_in_user_id, self.items_per_page + self.paginate_start,
          self.subqueries, self.can, self.group_by_spec, self.sort_spec,
          self.warnings, self.use_cached_searches)

    with self.profiler.Phase('Waiting for Backends'):
      try:
        _FinishBackendSearch(rpc_tuples)
      except Exception as e:
        logging.exception(e)
        raise

    if self.error_responses:
      logging.error('%r error responses. Incomplete search results.',
                    self.error_responses)

    with self.profiler.Phase('Filtering cached results'):
      for shard_key in self.unfiltered_iids:
        shard_id, _subquery = shard_key
        if shard_id not in self.nonviewable_iids:
          logging.error(
              'Not displaying shard %r because of no nonviewable_iids',
              shard_id)
          self.error_responses.add(shard_id)
          filtered_shard_iids = []
        else:
          unfiltered_shard_iids = self.unfiltered_iids[shard_key]
          nonviewable_shard_iids = self.nonviewable_iids[shard_id]
          # TODO(jrobbins): avoid creating large temporary lists.
          filtered_shard_iids = [iid for iid in unfiltered_shard_iids
                                 if iid not in nonviewable_shard_iids]
        self.filtered_iids[shard_key] = filtered_shard_iids

    seen_iids_by_shard_id = collections.defaultdict(set)
    with self.profiler.Phase('Deduping result IIDs across shards'):
      for shard_key in self.filtered_iids:
        shard_id, _subquery = shard_key
        deduped = [iid for iid in self.filtered_iids[shard_key]
                   if iid not in seen_iids_by_shard_id[shard_id]]
        self.filtered_iids[shard_key] = deduped
        seen_iids_by_shard_id[shard_id].update(deduped)

    with self.profiler.Phase('Counting all filtered results'):
      for shard_key in self.filtered_iids:
        self.total_count += len(self.filtered_iids[shard_key])

    with self.profiler.Phase('Trimming results beyond pagination page'):
      for shard_key in self.filtered_iids:
        self.filtered_iids[shard_key] = self.filtered_iids[
            shard_key][:self.paginate_start + self.items_per_page]

  def MergeAndSortIssues(self):
    """Merge and sort results from all shards into one combined list."""
    with self.profiler.Phase('selecting issues to merge and sort'):
      self._NarrowFilteredIIDs()
      self.allowed_iids = []
      for filtered_shard_iids in self.filtered_iids.values():
        self.allowed_iids.extend(filtered_shard_iids)

    with self.profiler.Phase('getting allowed results'):
      self.allowed_results = self.services.issue.GetIssues(
          self.cnxn, self.allowed_iids)

    # Note: At this point, we have results that are only sorted within
    # each backend's shard. We still need to sort the merged result.
    self._LookupNeededUsers(self.allowed_results)
    with self.profiler.Phase('merging and sorting issues'):
      self.allowed_results = _SortIssues(
          self.allowed_results, self.harmonized_config, self.users_by_id,
          self.group_by_spec, self.sort_spec)

  def _NarrowFilteredIIDs(self):
    """Combine filtered shards into a range of IIDs for issues to sort.

    The naive way is to concatenate shard_iids[:start + num] for all
    shards then select [start:start + num]. We do better by sampling
    issues and then determining which of those samples are known to
    come before start or after start+num. We then trim off all those IIDs
    and sort a smaller range of IIDs that might actually be displayed.
    See the design doc at go/monorail-sorting.

    This method modifies self.filtered_iids and self.num_skipped_at_start.
    """
    # Sample issues and skip those that are known to come before start.
    # See the "Sorting in Monorail" design doc.

    # If the result set is small, don't bother optimizing it.
    orig_length = _TotalLength(self.filtered_iids)
    if orig_length < self.items_per_page * 4:
      return

    # 1. Get sample issues in each shard and sort them all together.
    last = self.paginate_start + self.items_per_page

    samples_by_shard, sample_iids_to_shard = self._FetchAllSamples(
        self.filtered_iids)
    sample_issues = []
    for issue_dict in samples_by_shard.values():
      sample_issues.extend(list(issue_dict.values()))

    self._LookupNeededUsers(sample_issues)
    sample_issues = _SortIssues(
        sample_issues, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    sample_iid_tuples = [
        (issue.issue_id, sample_iids_to_shard[issue.issue_id])
        for issue in sample_issues]

    # 2. Trim off some IIDs that are sure to be positioned after last.
    num_trimmed_end = _TrimEndShardedIIDs(
        self.filtered_iids, sample_iid_tuples, last)
    logging.info('Trimmed %r issues from the end of shards', num_trimmed_end)

    # 3. Trim off some IIDs that are sure to be positioned before start.
    keep = _TotalLength(self.filtered_iids) - self.paginate_start
    # Reverse the sharded lists.
    _ReverseShards(self.filtered_iids)
    sample_iid_tuples.reverse()
    self.num_skipped_at_start = _TrimEndShardedIIDs(
        self.filtered_iids, sample_iid_tuples, keep)
    logging.info('Trimmed %r issues from the start of shards',
                 self.num_skipped_at_start)
    # Reverse sharded lists again to get back into forward order.
    _ReverseShards(self.filtered_iids)

  def DetermineIssuePosition(self, issue):
    """Calculate info needed to show the issue flipper.

    Args:
      issue: The issue currently being viewed.

    Returns:
      A 3-tuple (prev_iid, index, next_iid) where prev_iid is the
      IID of the previous issue in the total ordering (or None),
      index is the index that the current issue has in the total
      ordering, and next_iid is the next issue (or None). If the current
      issue is not in the list of results at all, returns None, None, None.
    """
    # 1. If the current issue is not in the results at all, then exit.
    if not any(issue.issue_id in filtered_shard_iids
               for filtered_shard_iids in self.filtered_iids.values()):
      return None, None, None

    # 2. Choose and retrieve sample issues in each shard.
    samples_by_shard, _ = self._FetchAllSamples(self.filtered_iids)

    # 3. Build up partial results for each shard.
    preceding_counts = {}  # dict {shard_key: num_issues_preceding_current}
    prev_candidates, next_candidates = [], []
    for shard_key in self.filtered_iids:
      prev_candidate, index_in_shard, next_candidate = (
          self._DetermineIssuePositionInShard(
              shard_key, issue, samples_by_shard[shard_key]))
      preceding_counts[shard_key] = index_in_shard
      if prev_candidate:
        prev_candidates.append(prev_candidate)
      if next_candidate:
        next_candidates.append(next_candidate)

    # 4. Combine the results.
    index = sum(preceding_counts.values())
    prev_candidates = _SortIssues(
        prev_candidates, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    prev_iid = prev_candidates[-1].issue_id if prev_candidates else None
    next_candidates = _SortIssues(
        next_candidates, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    next_iid = next_candidates[0].issue_id if next_candidates else None

    return prev_iid, index, next_iid
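
  # For example (toy numbers): if the viewed issue would rank at index 2
  # within shard A's results and at index 6 within shard B's, then 2 + 6 = 8
  # issues precede it overall, so its flipper index is 8; prev and next come
  # from the best per-shard candidates sorted together above.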

  def _DetermineIssuePositionInShard(self, shard_key, issue, sample_dict):
    """Determine where the given issue would fit into results from a shard."""
    # See the design doc for details. Basically, it first surveys the results
    # to bound a range where the given issue would belong, then it fetches the
    # issues in that range and sorts them.

    filtered_shard_iids = self.filtered_iids[shard_key]

    # 1. Select a sample of issues, leveraging ones we have in RAM already.
    issues_on_hand = list(sample_dict.values())
    if issue.issue_id not in sample_dict:
      issues_on_hand.append(issue)

    self._LookupNeededUsers(issues_on_hand)
    sorted_on_hand = _SortIssues(
        issues_on_hand, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand]
    index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id)

    # 2. Bound the gap around where the issue belongs.
    if index_in_on_hand == 0:
      fetch_start = 0
    else:
      prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1]
      fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1

    if index_in_on_hand == len(sorted_on_hand) - 1:
      fetch_end = len(filtered_shard_iids)
    else:
      next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1]
      fetch_end = filtered_shard_iids.index(next_on_hand_iid)

    # 3. Retrieve all the issues in that gap to get an exact answer.
    fetched_issues = self.services.issue.GetIssues(
        self.cnxn, filtered_shard_iids[fetch_start:fetch_end])
    if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]:
      fetched_issues.append(issue)
    self._LookupNeededUsers(fetched_issues)
    sorted_fetched = _SortIssues(
        fetched_issues, self.harmonized_config, self.users_by_id,
        self.group_by_spec, self.sort_spec)
    sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched]
    index_in_fetched = sorted_fetched_iids.index(issue.issue_id)

    # 4. Find the issues that come immediately before and after the place
    # where the given issue would belong in this shard.
    if index_in_fetched > 0:
      prev_candidate = sorted_fetched[index_in_fetched - 1]
    elif index_in_on_hand > 0:
      prev_candidate = sorted_on_hand[index_in_on_hand - 1]
    else:
      prev_candidate = None

    if index_in_fetched < len(sorted_fetched) - 1:
      next_candidate = sorted_fetched[index_in_fetched + 1]
    elif index_in_on_hand < len(sorted_on_hand) - 1:
      next_candidate = sorted_on_hand[index_in_on_hand + 1]
    else:
      next_candidate = None

    return prev_candidate, fetch_start + index_in_fetched, next_candidate

  def _FetchAllSamples(self, filtered_iids):
    """Return a dict {shard_key: {iid: sample_issue}}."""
    samples_by_shard = {}  # {shard_key: {iid: sample_issue}}
    sample_iids_to_shard = {}  # {iid: shard_key}
    all_needed_iids = []  # List of iids to retrieve.

    for shard_key in filtered_iids:
      on_hand_issues, shard_needed_iids = self._ChooseSampleIssues(
          filtered_iids[shard_key])
      samples_by_shard[shard_key] = on_hand_issues
      for iid in on_hand_issues:
        sample_iids_to_shard[iid] = shard_key
      for iid in shard_needed_iids:
        sample_iids_to_shard[iid] = shard_key
      all_needed_iids.extend(shard_needed_iids)

    retrieved_samples, _misses = self.services.issue.GetIssuesDict(
        self.cnxn, all_needed_iids)
    for retrieved_iid, retrieved_issue in retrieved_samples.items():
      retr_shard_key = sample_iids_to_shard[retrieved_iid]
      samples_by_shard[retr_shard_key][retrieved_iid] = retrieved_issue

    return samples_by_shard, sample_iids_to_shard

  def _ChooseSampleIssues(self, issue_ids):
    """Select a scattering of issues from the list, leveraging RAM cache.

    Args:
      issue_ids: A list of issue IDs that comprise the results in a shard.

    Returns:
      A pair (on_hand_issues, needed_iids) where on_hand_issues is
      an issue dict {iid: issue} of issues already in RAM, and
      needed_iids is a list of iids of issues that need to be retrieved.
    """
    on_hand_issues = {}  # {iid: issue} of sample issues already in RAM.
    needed_iids = []  # [iid, ...] of sample issues not in RAM yet.
    chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE,
        int(len(issue_ids) // PREFERRED_NUM_CHUNKS)))
    for i in range(chunk_size, len(issue_ids), chunk_size):
      issue = self.services.issue.GetAnyOnHandIssue(
          issue_ids, start=i, end=min(i + chunk_size, len(issue_ids)))
      if issue:
        on_hand_issues[issue.issue_id] = issue
      else:
        needed_iids.append(issue_ids[i])

    return on_hand_issues, needed_iids
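
  # For example (toy numbers): with 1000 result IIDs and chunk_size 20, the
  # loop above visits positions 20, 40, ..., 980; for each chunk it prefers
  # any issue already in the RAM cache (GetAnyOnHandIssue), and otherwise
  # marks the IID at the chunk boundary as needing retrieval.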

  def _LookupNeededUsers(self, issues):
    """Look up user info needed to sort issues, if any."""
    with self.profiler.Phase('lookup of owner, reporter, and cc'):
      additional_user_views_by_id = (
          tracker_helpers.MakeViewsForUsersInIssues(
              self.cnxn, issues, self.services.user,
              omit_ids=list(self.users_by_id.keys())))
      self.users_by_id.update(additional_user_views_by_id)

  def Paginate(self):
    """Fetch matching issues and paginate the search results.

    These two actions are intertwined because we try to only
    retrieve the Issues on the current pagination page.
    """
    # We already got the issues, just display a slice of the visible ones.
    limit_reached = False
    for shard_limit_reached in self.search_limit_reached.values():
      limit_reached |= shard_limit_reached
    self.pagination = paginate.ArtifactPagination(
        self.allowed_results,
        self.items_per_page,
        self.paginate_start,
        self.project_name,
        urls.ISSUE_LIST,
        total_count=self.total_count,
        limit_reached=limit_reached,
        skipped=self.num_skipped_at_start)
    self.visible_results = self.pagination.visible_results

    # If we were not forced to look up visible users already, do it now.
    self._LookupNeededUsers(self.visible_results)

  def __repr__(self):
    """Return a string that shows the internal state of this pipeline."""
    if self.allowed_iids:
      shown_allowed_iids = self.allowed_iids[:200]
    else:
      shown_allowed_iids = self.allowed_iids

    if self.allowed_results:
      shown_allowed_results = self.allowed_results[:200]
    else:
      shown_allowed_results = self.allowed_results

    parts = [
        'allowed_iids: %r' % shown_allowed_iids,
        'allowed_results: %r' % shown_allowed_results,
        'len(visible_results): %r' % (
            self.visible_results and len(self.visible_results))]
    return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts))


def _CheckQuery(
    cnxn, services, query, harmonized_config, project_ids,
    member_of_all_projects, warnings=None):
  """Parse the given query and report the first error or None."""
  try:
    query_ast = query2ast.ParseUserQuery(
        query, '', query2ast.BUILTIN_ISSUE_FIELDS, harmonized_config,
        warnings=warnings)
    query_ast = ast2ast.PreprocessAST(
        cnxn, query_ast, project_ids, services, harmonized_config,
        is_member=member_of_all_projects)
  except query2ast.InvalidQueryError as e:
    return e.message
  except ast2ast.MalformedQuery as e:
    return e.message

  return None


def _MakeBackendCallback(func, *args):
  # type: (Callable[[*Any], Any], *Any) -> Callable[[*Any], Any]
  """Helper to store a particular function and argument set into a callback.

  Args:
    func: Function to call back.
    *args: The arguments to pass into the function.

  Returns:
    Callback function based on the specified arguments.
  """
  return lambda: func(*args)
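
# Note on the closure above: the lambda freezes func and args at the time
# _MakeBackendCallback is called. That matters because callbacks are created
# inside loops below; an inline "lambda: handler(shard_key)" would close over
# the loop variable and see only its final value. A small illustration:
#
#   callbacks = [_MakeBackendCallback(logging.info, 'shard %r', sk)
#                for sk in range(3)]
#   callbacks[0]()  # Logs 'shard 0', not 'shard 2'.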


def _StartBackendSearch(
    cnxn, query_project_names, query_project_ids, harmonized_config,
    unfiltered_iids_dict, search_limit_reached_dict, nonviewable_iids,
    error_responses, services, me_user_ids, logged_in_user_id, new_url_num,
    subqueries, can, group_by_spec, sort_spec, warnings, use_cached_searches):
  # type: (MonorailConnection, Sequence[str], Sequence[int],
  #     proto.tracker_pb2.ProjectIssueConfig,
  #     Mapping[Tuple(int, str), Sequence[int]],
  #     Mapping[Tuple(int, str), Sequence[bool]],
  #     Mapping[Tuple(int, str), Collection[int]], Sequence[Tuple(int, str)],
  #     Services, Sequence[int], int, int, Sequence[str], int, str, str,
  #     Sequence[Tuple(str, Sequence[str])], bool) ->
  #     Sequence[Tuple(int, Tuple(int, str),
  #     google.appengine.api.apiproxy_stub_map.UserRPC)]
  """Request that our backends search and return a list of matching issue IDs.

  Args:
    cnxn: monorail connection to the database.
    query_project_names: set of project names to search.
    query_project_ids: list of project IDs to search.
    harmonized_config: combined ProjectIssueConfig for all projects being
        searched.
    unfiltered_iids_dict: dict {shard_key: [iid, ...]} of unfiltered search
        results to accumulate into. They need to be later filtered by
        permissions and merged into filtered_iids_dict.
    search_limit_reached_dict: dict {shard_key: [bool, ...]} to determine if
        the search limit of any shard was reached.
    nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the
        projects being searched that the signed in user cannot view.
    error_responses: shard keys of shards that encountered errors.
    services: connections to backends.
    me_user_ids: Empty list when no user is logged in, or user ID of the logged
        in user when doing an interactive search, or the viewed user ID when
        viewing someone else's dashboard, or the subscribing user's ID when
        evaluating subscriptions. Also includes any linked accounts.
    logged_in_user_id: user_id of the logged in user, 0 otherwise.
    new_url_num: the number of issues for BackendSearchPipeline to query.
        Computed based on pagination offset + number of items per page.
    subqueries: split up list of query string segments.
    can: "canned query" number to scope the user's search.
    group_by_spec: string that lists the grouping order.
    sort_spec: string that lists the sort order.
    warnings: list to accumulate warning messages.
    use_cached_searches: Bool for whether to use cached searches.

  Returns:
    A list of rpc_tuples that can be passed to _FinishBackendSearch to wait
    on any remaining backend calls.

  SIDE-EFFECTS:
    Any data found in memcache is immediately put into unfiltered_iids_dict.
    As the backends finish their work, _HandleBackendSearchResponse will update
    unfiltered_iids_dict for those shards.

    Any warnings produced throughout this process will be added to the list
    warnings.
  """
  rpc_tuples = []
  needed_shard_keys = set()
  for subquery in subqueries:
    subquery, subquery_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
        me_user_ids, subquery)
    warnings.extend(subquery_warnings)
    for shard_id in range(settings.num_logical_shards):
      needed_shard_keys.add((shard_id, subquery))

  # 1. Get whatever we can from memcache. Cache hits are only kept if they are
  # not already expired.
  project_shard_timestamps = _GetProjectTimestamps(
      query_project_ids, needed_shard_keys)

  if use_cached_searches:
    cached_unfiltered_iids_dict, cached_search_limit_reached_dict = (
        _GetCachedSearchResults(
            cnxn, query_project_ids, needed_shard_keys,
            harmonized_config, project_shard_timestamps, services, me_user_ids,
            can, group_by_spec, sort_spec, warnings))
    unfiltered_iids_dict.update(cached_unfiltered_iids_dict)
    search_limit_reached_dict.update(cached_search_limit_reached_dict)
    for cache_hit_shard_key in unfiltered_iids_dict:
      needed_shard_keys.remove(cache_hit_shard_key)

  # 2. Each kept cache hit will have unfiltered IIDs, so we filter them by
  # removing non-viewable IDs.
  _GetNonviewableIIDs(
      query_project_ids, logged_in_user_id,
      set(range(settings.num_logical_shards)),
      rpc_tuples, nonviewable_iids, project_shard_timestamps,
      services.cache_manager.processed_invalidations_up_to,
      use_cached_searches)

  # 3. Hit backends for any shards that are still needed. When these results
  # come back, they are also put into unfiltered_iids_dict.
  for shard_key in needed_shard_keys:
    rpc = _StartBackendSearchCall(
        query_project_names,
        shard_key,
        services.cache_manager.processed_invalidations_up_to,
        me_user_ids,
        logged_in_user_id,
        new_url_num,
        can=can,
        sort_spec=sort_spec,
        group_by_spec=group_by_spec)
    rpc_tuple = (time.time(), shard_key, rpc)
    rpc.callback = _MakeBackendCallback(
        _HandleBackendSearchResponse, query_project_names, rpc_tuple,
        rpc_tuples, settings.backend_retries, unfiltered_iids_dict,
        search_limit_reached_dict,
        services.cache_manager.processed_invalidations_up_to, error_responses,
        me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
        group_by_spec)
    rpc_tuples.append(rpc_tuple)

  return rpc_tuples


def _FinishBackendSearch(rpc_tuples):
  """Wait for all backend calls to complete, including any retries."""
  while rpc_tuples:
    active_rpcs = [rpc for (_time, _shard_key, rpc) in rpc_tuples]
    # Wait for any active RPC to complete. Its callback function will
    # automatically be called.
    finished_rpc = real_wait_any(active_rpcs)
    # Figure out which rpc_tuple finished and remove it from our list.
    for rpc_tuple in rpc_tuples:
      _time, _shard_key, rpc = rpc_tuple
      if rpc == finished_rpc:
        rpc_tuples.remove(rpc_tuple)
        break
    else:
      raise ValueError('We somehow finished an RPC that is not in rpc_tuples')


def real_wait_any(active_rpcs):
  """Work around the blocking nature of wait_any().

  wait_any() checks for any finished RPCs, and returns one if found.
  If no RPC is finished, it simply blocks on the last RPC in the list.
  This is not the desired behavior because we are not able to detect
  FAST-FAIL RPC results and retry them if wait_any() is blocked on a
  request that is taking a long time to do actual work.

  Instead, we do the same check, without blocking on any individual RPC.
  """
  if settings.local_mode:
    # The development server has very different code for RPCs than the
    # code used in the hosted environment.
    return apiproxy_stub_map.UserRPC.wait_any(active_rpcs)
  while True:
    finished, _ = apiproxy_stub_map.UserRPC._UserRPC__check_one(active_rpcs)
    if finished:
      return finished
    time.sleep(DELAY_BETWEEN_RPC_COMPLETION_POLLS)


def _GetProjectTimestamps(query_project_ids, needed_shard_keys):
  """Get a dict of modified_ts values for all specified project-shards."""
  project_shard_timestamps = {}
  if query_project_ids:
    keys = []
    for pid in query_project_ids:
      for sid, _subquery in needed_shard_keys:
        keys.append('%d;%d' % (pid, sid))
  else:
    keys = [('all;%d' % sid)
            for sid, _subquery in needed_shard_keys]

  timestamps_for_project = memcache.get_multi(
      keys=keys, namespace=settings.memcache_namespace)
  for key, timestamp in timestamps_for_project.items():
    pid_str, sid_str = key.split(';')
    if pid_str == 'all':
      project_shard_timestamps['all', int(sid_str)] = timestamp
    else:
      project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp

  return project_shard_timestamps
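
# For example (hypothetical IDs): searching projects 16 and 789 across shards
# 0 and 1 does memcache.get_multi(['16;0', '16;1', '789;0', '789;1'], ...),
# while a site-wide search uses ['all;0', 'all;1']. A returned entry such as
# {'789;1': 1630000000} becomes project_shard_timestamps[(789, 1)].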


def _GetNonviewableIIDs(
    query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples,
    nonviewable_iids, project_shard_timestamps, invalidation_timestep,
    use_cached_searches):
  """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones."""
  if query_project_ids:
    keys = []
    for pid in query_project_ids:
      for sid in needed_shard_ids:
        keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid))
  else:
    keys = [
        ('all;%d;%d' % (logged_in_user_id, sid)) for sid in needed_shard_ids
    ]

  if use_cached_searches:
    cached_dict = memcache.get_multi(
        keys, key_prefix='nonviewable:', namespace=settings.memcache_namespace)
  else:
    cached_dict = {}

  for sid in needed_shard_ids:
    if query_project_ids:
      for pid in query_project_ids:
        _AccumulateNonviewableIIDs(
            pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
            project_shard_timestamps, rpc_tuples, invalidation_timestep)
    else:
      _AccumulateNonviewableIIDs(
          None, logged_in_user_id, sid, cached_dict, nonviewable_iids,
          project_shard_timestamps, rpc_tuples, invalidation_timestep)
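
# For example (hypothetical IDs): user 111 searching projects 16 and 789 over
# shards 0 and 1 checks the memcache keys 'nonviewable:16;111;0',
# 'nonviewable:16;111;1', 'nonviewable:789;111;0', and
# 'nonviewable:789;111;1'; a signed-out site-wide search (user ID 0) checks
# 'nonviewable:all;0;0', 'nonviewable:all;0;1', and so on.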


def _AccumulateNonviewableIIDs(
    pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
    project_shard_timestamps, rpc_tuples, invalidation_timestep):
  """Use one of the retrieved cache entries or call a backend if needed."""
  if pid is None:
    key = 'all;%d;%d' % (logged_in_user_id, sid)
  else:
    key = '%d;%d;%d' % (pid, logged_in_user_id, sid)

  if key in cached_dict:
    issue_ids, cached_ts = cached_dict.get(key)
    # Site-wide searches store their timestamps under ('all', shard), while
    # per-project searches use (project_id, shard).
    modified_ts = project_shard_timestamps.get(
        ('all' if pid is None else pid, sid))
    if modified_ts is None or modified_ts > cached_ts:
      logging.info('nonviewable too stale on (project %r, shard %r)',
                   pid, sid)
    else:
      logging.info('adding %d nonviewable issue_ids', len(issue_ids))
      nonviewable_iids[sid] = set(issue_ids)

  if sid not in nonviewable_iids:
    logging.info('nonviewable for %r not found', key)
    logging.info('starting backend call for nonviewable iids %r', key)
    rpc = _StartBackendNonviewableCall(
        pid, logged_in_user_id, sid, invalidation_timestep)
    rpc_tuple = (time.time(), sid, rpc)
    rpc.callback = _MakeBackendCallback(
        _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid,
        rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids,
        invalidation_timestep)
    rpc_tuples.append(rpc_tuple)


def _GetCachedSearchResults(
    cnxn, query_project_ids, needed_shard_keys, harmonized_config,
    project_shard_timestamps, services, me_user_ids, can, group_by_spec,
    sort_spec, warnings):
  """Return a dict of cached search results that are not already stale.

  If it were not for cross-project search, we would simply cache when we do a
  search and then invalidate when an issue is modified. But, with
  cross-project search, we don't know all the memcache entries that would
  need to be invalidated. So, instead, we write the search result cache
  entries and then an initial modified_ts value for each project if it was
  not already there. And, when we update an issue we write a new
  modified_ts entry, which implicitly invalidates all search result
  cache entries that were written earlier, because they are now stale. When
  reading from the cache, we ignore any query project with modified_ts
  after its search result cache timestamp, because it is stale.

  Args:
    cnxn: monorail connection to the database.
    query_project_ids: list of project ID numbers for all projects being
        searched.
    needed_shard_keys: set of shard keys that need to be checked.
    harmonized_config: ProjectIssueConfig with combined information for all
        projects involved in this search.
    project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...}
        that tells when each shard was last invalidated.
    services: connections to backends.
    me_user_ids: Empty list when no user is logged in, or user ID of the logged
        in user when doing an interactive search, or the viewed user ID when
        viewing someone else's dashboard, or the subscribing user's ID when
        evaluating subscriptions. Also includes any linked accounts.
    can: "canned query" number to scope the user's search.
    group_by_spec: string that lists the grouping order.
    sort_spec: string that lists the sort order.
    warnings: list to accumulate warning messages.

  Returns:
    Tuple consisting of:
      A dictionary {shard_key: [issue_id, ...], ...} of unfiltered search
      result issue IDs. Only shard keys found in memcache will be in that
      dictionary. The result issue IDs must be permission checked before they
      can be considered to be part of the user's result set.
      A dictionary {shard_key: bool, ...}. The boolean is set to True if
      the search results limit of the shard is hit.
  """
  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
  projects_str = projects_str or 'all'
  canned_query = savedqueries_helpers.SavedQueryIDToCond(
      cnxn, services.features, can)
  canned_query, cq_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
      me_user_ids, canned_query)
  warnings.extend(cq_warnings)

  sd = sorting.ComputeSortDirectives(
      harmonized_config, group_by_spec, sort_spec)
  sd_str = ' '.join(sd)
  memcache_key_prefix = '%s;%s' % (projects_str, canned_query)
  limit_reached_key_prefix = '%s;%s' % (projects_str, canned_query)

  cached_dict = memcache.get_multi(
      ['%s;%s;%s;%d' % (memcache_key_prefix, subquery, sd_str, sid)
       for sid, subquery in needed_shard_keys],
      namespace=settings.memcache_namespace)
  cached_search_limit_reached_dict = memcache.get_multi(
      ['%s;%s;%s;search_limit_reached;%d' % (
          limit_reached_key_prefix, subquery, sd_str, sid)
       for sid, subquery in needed_shard_keys],
      namespace=settings.memcache_namespace)

  unfiltered_dict = {}
  search_limit_reached_dict = {}
  for shard_key in needed_shard_keys:
    shard_id, subquery = shard_key
    memcache_key = '%s;%s;%s;%d' % (
        memcache_key_prefix, subquery, sd_str, shard_id)
    limit_reached_key = '%s;%s;%s;search_limit_reached;%d' % (
        limit_reached_key_prefix, subquery, sd_str, shard_id)
    if memcache_key not in cached_dict:
      logging.info('memcache miss on shard %r', shard_key)
      continue

    cached_iids, cached_ts = cached_dict[memcache_key]
    if cached_search_limit_reached_dict.get(limit_reached_key):
      search_limit_reached, _ = cached_search_limit_reached_dict[
          limit_reached_key]
    else:
      search_limit_reached = False

    stale = False
    if query_project_ids:
      for project_id in query_project_ids:
        modified_ts = project_shard_timestamps.get((project_id, shard_id))
        if modified_ts is None or modified_ts > cached_ts:
          stale = True
          logging.info('memcache too stale on shard %r because of %r',
                       shard_id, project_id)
          break
    else:
      modified_ts = project_shard_timestamps.get(('all', shard_id))
      if modified_ts is None or modified_ts > cached_ts:
        stale = True
        logging.info('memcache too stale on shard %r because of all',
                     shard_id)

    if not stale:
      unfiltered_dict[shard_key] = cached_iids
      search_limit_reached_dict[shard_key] = search_limit_reached

  return unfiltered_dict, search_limit_reached_dict
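
# For example (hypothetical values): projects 16 and 789, a canned-query
# condition 'status:open', subquery 'label:Pri-1', sort directives
# 'status id', and shard 3 produce the cache key
# '16,789;status:open;label:Pri-1;status id;3', plus a parallel
# '16,789;status:open;label:Pri-1;status id;search_limit_reached;3' key
# whose cached value carries the limit-reached flag for that shard.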


def _MakeBackendRequestHeaders(failfast):
  headers = {
      # This is needed to allow frontends to talk to backends without going
      # through a login screen on googleplex.com.
      # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs
      'X-URLFetch-Service-Id': 'GOOGLEPLEX',
  }
  if failfast:
    headers['X-AppEngine-FailFast'] = 'Yes'
  return headers


def _StartBackendSearchCall(
    query_project_names,
    shard_key,
    invalidation_timestep,
    me_user_ids,
    logged_in_user_id,
    new_url_num,
    can=None,
    sort_spec=None,
    group_by_spec=None,
    deadline=None,
    failfast=True):
  # type: (Sequence[str], Tuple(int, str), int, Sequence[int], int,
  #     int, str, str, int, bool) ->
  #     google.appengine.api.apiproxy_stub_map.UserRPC
  """Ask a backend to query one shard of the database.

  Args:
    query_project_names: List of project names queried.
    shard_key: Tuple specifying which DB shard to query.
    invalidation_timestep: int timestep used to keep cached items fresh.
    me_user_ids: Empty list when no user is logged in, or user ID of the logged
        in user when doing an interactive search, or the viewed user ID when
        viewing someone else's dashboard, or the subscribing user's ID when
        evaluating subscriptions. Also includes any linked accounts.
    logged_in_user_id: Id of the logged in user.
    new_url_num: the number of issues for BackendSearchPipeline to query.
        Computed based on pagination offset + number of items per page.
    can: Id of the canned query to use.
    sort_spec: Str specifying how issues should be sorted.
    group_by_spec: Str specifying how issues should be grouped.
    deadline: Max time for the RPC to take before failing.
    failfast: Whether to set the X-AppEngine-FailFast request header.

  Returns:
    UserRPC for the created RPC call.
  """
  shard_id, subquery = shard_key
  backend_host = modules.get_hostname(module='default')
  url = 'https://%s%s' % (
      backend_host,
      framework_helpers.FormatURL(
          [],
          urls.BACKEND_SEARCH,
          projects=','.join(query_project_names),
          q=subquery,
          start=0,
          num=new_url_num,
          can=can,
          sort=sort_spec,
          groupby=group_by_spec,
          logged_in_user_id=logged_in_user_id,
          me_user_ids=','.join(str(uid) for uid in me_user_ids),
          shard_id=shard_id,
          invalidation_timestep=invalidation_timestep))
  logging.info('\n\nCalling backend: %s', url)
  rpc = urlfetch.create_rpc(
      deadline=deadline or settings.backend_deadline)
  headers = _MakeBackendRequestHeaders(failfast)
  # follow_redirects=False is needed to avoid a login screen on googleplex.
  urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
  return rpc


def _StartBackendNonviewableCall(
    project_id, logged_in_user_id, shard_id, invalidation_timestep,
    deadline=None, failfast=True):
  """Ask a backend to query one shard of the database."""
  backend_host = modules.get_hostname(module='default')
  url = 'https://%s%s' % (backend_host, framework_helpers.FormatURL(
      None, urls.BACKEND_NONVIEWABLE,
      project_id=project_id or '',
      logged_in_user_id=logged_in_user_id or '',
      shard_id=shard_id,
      invalidation_timestep=invalidation_timestep))
  logging.info('Calling backend nonviewable: %s', url)
  rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline)
  headers = _MakeBackendRequestHeaders(failfast)
  # follow_redirects=False is needed to avoid a login screen on googleplex.
  urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
  return rpc


def _HandleBackendSearchResponse(
    query_project_names, rpc_tuple, rpc_tuples, remaining_retries,
    unfiltered_iids, search_limit_reached, invalidation_timestep,
    error_responses, me_user_ids, logged_in_user_id, new_url_num, can,
    sort_spec, group_by_spec):
  # type: (Sequence[str], Tuple(int, Tuple(int, str),
  #     google.appengine.api.apiproxy_stub_map.UserRPC),
  #     Sequence[Tuple(int, Tuple(int, str),
  #     google.appengine.api.apiproxy_stub_map.UserRPC)],
  #     int, Mapping[Tuple(int, str), Sequence[int]],
  #     Mapping[Tuple(int, str), bool], int, Collection[Tuple(int, str)],
  #     Sequence[int], int, int, int, str, str) -> None
  """Process one backend response and retry if there was an error.

  SIDE EFFECTS: This function edits many of the passed in parameters in place.
  For example, search_limit_reached and unfiltered_iids are updated with
  response data from the RPC, keyed by shard_key.

  Args:
    query_project_names: List of projects to query.
    rpc_tuple: (start_time, shard_key, rpc) tuple for the call that completed.
    rpc_tuples: List of RPC tuples to mutate with any retry calls that
        happened.
    remaining_retries: Number of times left to retry.
    unfiltered_iids: Dict of Issue ids, before they've been filtered by
        permissions.
    search_limit_reached: Dict of whether the search limit for a particular
        shard has been hit.
    invalidation_timestep: int timestep used to keep cached items fresh.
    error_responses: Set of shard keys for shards that returned errors.
    me_user_ids: List of relevant user IDs. ie: the currently logged in user
        and linked account IDs if applicable.
    logged_in_user_id: Logged in user's ID.
    new_url_num: the number of issues for BackendSearchPipeline to query.
        Computed based on pagination offset + number of items per page.
    can: Canned query ID to use.
    sort_spec: str specifying how issues should be sorted.
    group_by_spec: str specifying how issues should be grouped.
  """
  start_time, shard_key, rpc = rpc_tuple
  duration_sec = time.time() - start_time

  try:
    response = rpc.get_result()
    logging.info('call to backend took %d sec', duration_sec)
    # Note that response.content has "})]'\n" prepended to it.
    json_content = response.content[5:]
    logging.info('got json text: %r length %r',
                 json_content[:framework_constants.LOGGING_MAX_LENGTH],
                 len(json_content))
    if json_content == '':
      raise Exception('Fast fail')
    json_data = json.loads(json_content)
    unfiltered_iids[shard_key] = json_data['unfiltered_iids']
    search_limit_reached[shard_key] = json_data['search_limit_reached']
    if json_data.get('error'):
      # Don't raise an exception, just log, because these errors are more like
      # 400s than 500s, and shouldn't be retried.
      logging.error('Backend shard %r returned error "%r"' % (
          shard_key, json_data.get('error')))
      error_responses.add(shard_key)

  except Exception as e:
    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions.
      logging.exception(e)
    if not remaining_retries:
      logging.error('backend search retries exceeded')
      error_responses.add(shard_key)
      return  # Used all retries, so give up.

    if duration_sec >= settings.backend_deadline:
      logging.error('backend search on %r took too long', shard_key)
      error_responses.add(shard_key)
      return  # That backend shard is overloaded, so give up.

    logging.error('backend call for shard %r failed, retrying', shard_key)
    retry_rpc = _StartBackendSearchCall(
        query_project_names,
        shard_key,
        invalidation_timestep,
        me_user_ids,
        logged_in_user_id,
        new_url_num,
        can=can,
        sort_spec=sort_spec,
        group_by_spec=group_by_spec,
        failfast=remaining_retries > 2)
    retry_rpc_tuple = (time.time(), shard_key, retry_rpc)
    retry_rpc.callback = _MakeBackendCallback(
        _HandleBackendSearchResponse, query_project_names, retry_rpc_tuple,
        rpc_tuples, remaining_retries - 1, unfiltered_iids,
        search_limit_reached, invalidation_timestep, error_responses,
        me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
        group_by_spec)
    rpc_tuples.append(retry_rpc_tuple)


def _HandleBackendNonviewableResponse(
    project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples,
    remaining_retries, nonviewable_iids, invalidation_timestep):
  """Process one backend response and retry if there was an error."""
  start_time, shard_id, rpc = rpc_tuple
  duration_sec = time.time() - start_time

  try:
    response = rpc.get_result()
    logging.info('call to backend nonviewable took %d sec', duration_sec)
    # Note that response.content has "})]'\n" prepended to it.
    json_content = response.content[5:]
    logging.info('got json text: %r length %r',
                 json_content[:framework_constants.LOGGING_MAX_LENGTH],
                 len(json_content))
    if json_content == '':
      raise Exception('Fast fail')
    json_data = json.loads(json_content)
    nonviewable_iids[shard_id] = set(json_data['nonviewable'])

  except Exception as e:
    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions.
      logging.exception(e)

    if not remaining_retries:
      logging.warn('Used all retries, so give up on shard %r', shard_id)
      return

    if duration_sec >= settings.backend_deadline:
      logging.error('nonviewable call on %r took too long', shard_id)
      return  # That backend shard is overloaded, so give up.

    logging.error(
        'backend nonviewable call for shard %r;%r;%r failed, retrying',
        project_id, logged_in_user_id, shard_id)
    retry_rpc = _StartBackendNonviewableCall(
        project_id, logged_in_user_id, shard_id, invalidation_timestep,
        failfast=remaining_retries > 2)
    retry_rpc_tuple = (time.time(), shard_id, retry_rpc)
    retry_rpc.callback = _MakeBackendCallback(
        _HandleBackendNonviewableResponse, project_id, logged_in_user_id,
        shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1,
        nonviewable_iids, invalidation_timestep)
    rpc_tuples.append(retry_rpc_tuple)


def _TotalLength(sharded_iids):
  """Return the total length of all issue_iids lists."""
  return sum(len(issue_iids) for issue_iids in sharded_iids.values())


def _ReverseShards(sharded_iids):
  """Reverse each issue_iids list in place."""
  for shard_key in sharded_iids:
    sharded_iids[shard_key].reverse()


def _TrimEndShardedIIDs(sharded_iids, sample_iid_tuples, num_needed):
  """Trim the IIDs to keep at least num_needed items.

  Args:
    sharded_iids: dict {shard_key: issue_id_list} for search results. This is
        modified in place to remove some trailing issue IDs.
    sample_iid_tuples: list of (iid, shard_key) from a sorted list of sample
        issues.
    num_needed: int minimum total number of items to keep. Some IIDs that are
        known to belong in positions > num_needed will be trimmed off.

  Returns:
    The total number of IIDs removed from the IID lists.
  """
  # 1. Get (sample_iid, position_in_shard) for each sample.
  sample_positions = _CalcSamplePositions(sharded_iids, sample_iid_tuples)

  # 2. Walk through the samples, computing a combined lower bound at each
  # step until we know that we have passed at least num_needed IIDs.
  lower_bound_per_shard = {}
  excess_samples = []
  for i in range(len(sample_positions)):
    _sample_iid, sample_shard_key, pos = sample_positions[i]
    lower_bound_per_shard[sample_shard_key] = pos
    overall_lower_bound = sum(lower_bound_per_shard.values())
    if overall_lower_bound >= num_needed:
      excess_samples = sample_positions[i + 1:]
      break
  else:
    return 0  # We went through all samples and never reached num_needed.

  # 3. Truncate each shard at the first excess sample in that shard.
  already_trimmed = set()
  num_trimmed = 0
  for _sample_iid, sample_shard_key, pos in excess_samples:
    if sample_shard_key not in already_trimmed:
      num_trimmed += len(sharded_iids[sample_shard_key]) - pos
      sharded_iids[sample_shard_key] = sharded_iids[sample_shard_key][:pos]
      already_trimmed.add(sample_shard_key)

  return num_trimmed
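
# A hedged worked example (toy values): with
#   sharded_iids = {'A': [1, 3, 5, 7, 9], 'B': [2, 4, 6, 8, 10]}
# (each list already in sorted order, smaller numbers sorting first),
# globally sorted samples [(5, 'A'), (6, 'B'), (8, 'B'), (9, 'A')], and
# num_needed=4: after sample 5 the combined lower bound is 2 (5's position
# in 'A'); after sample 6 it is 2 + 2 = 4 >= 4, so samples 8 and 9 are
# excess. 'B' is cut at 8's position (dropping 8 and 10) and 'A' at 9's
# position (dropping 9), so the function returns 3. The four globally-first
# IIDs 1, 2, 3, and 4 all survive the trim.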


# TODO(jrobbins): Convert this to a python generator.
def _CalcSamplePositions(sharded_iids, sample_iids):
  """Return [(iid, shard_key, position_in_shard), ...] for each sample."""
  # We keep track of how far index() has scanned in each shard to avoid
  # starting over at position 0 when looking for the next sample in
  # the same shard.
  scan_positions = collections.defaultdict(lambda: 0)
  sample_positions = []
  for sample_iid, sample_shard_key in sample_iids:
    try:
      pos = sharded_iids.get(sample_shard_key, []).index(
          sample_iid, scan_positions[sample_shard_key])
      scan_positions[sample_shard_key] = pos
      sample_positions.append((sample_iid, sample_shard_key, pos))
    except ValueError:
      # The sample is no longer present in this shard (e.g., it was already
      # trimmed off), so it contributes no position.
      pass

  return sample_positions
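
# For example, with sharded_iids = {'A': [10, 20, 30]} and
# sample_iids = [(20, 'A'), (30, 'A'), (99, 'A')], this returns
# [(20, 'A', 1), (30, 'A', 2)]; 99 is absent (e.g., already trimmed), so it
# is skipped.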


def _SortIssues(issues, config, users_by_id, group_by_spec, sort_spec):
  """Sort the found issues based on the request and config values.

  Args:
    issues: A list of issues to be sorted.
    config: A ProjectIssueConfig that could impact sort order.
    users_by_id: dictionary {user_id: user_view, ...} for all users who
        participate in any issue in the entire list.
    group_by_spec: string that lists the grouping order.
    sort_spec: string that lists the sort order.

  Returns:
    A sorted list of issues, based on the given specs and config.
  """
  issues = sorting.SortArtifacts(
      issues, config, tracker_helpers.SORTABLE_FIELDS,
      tracker_helpers.SORTABLE_FIELDS_POSTPROCESSORS, group_by_spec,
      sort_spec, users_by_id=users_by_id)
  return issues