 1# Copyright 2016 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
 4
5"""The FrontendSearchPipeline class manages issue search and sorting.
6
7The frontend pipeline checks memcache for cached results in each shard. It
 8then calls backend jobs to search any shards that had a cache miss. On cache hit,
9the cached results must be filtered by permissions, so the at-risk cache and
10backends are consulted. Next, the sharded results are combined into an overall
11list of IIDs. Then, that list is paginated and the issues on the current
12pagination page can be shown. Alternatively, this class can determine just the
13position the currently shown issue would occupy in the overall sorted list.
14"""
15
16from __future__ import division
17from __future__ import print_function
18from __future__ import absolute_import
19
20import json
21
22import collections
23import logging
24import math
25import random
 26import six
 27import time
28
29from google.appengine.api import apiproxy_stub_map
30from google.appengine.api import memcache
31from google.appengine.api import modules
32from google.appengine.api import urlfetch
33
34import settings
35from features import savedqueries_helpers
36from framework import framework_bizobj
37from framework import framework_constants
38from framework import framework_helpers
39from framework import paginate
40from framework import permissions
41from framework import sorting
42from framework import urls
43from search import ast2ast
44from search import query2ast
45from search import searchpipeline
46from services import fulltext_helpers
47from tracker import tracker_bizobj
48from tracker import tracker_constants
49from tracker import tracker_helpers
50
51
52# Fail-fast responses usually finish in less than 50ms. If we see a failure
53# in under that amount of time, we don't bother logging it.
54FAIL_FAST_LIMIT_SEC = 0.1
55
56DELAY_BETWEEN_RPC_COMPLETION_POLLS = 0.04 # 40 milliseconds
57
58# The choices help balance the cost of choosing samples vs. the cost of
59# selecting issues that are in a range bounded by neighboring samples.
60# Preferred chunk size parameters were determined by experimentation.
61MIN_SAMPLE_CHUNK_SIZE = int(
62 math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE))
63MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard))
64PREFERRED_NUM_CHUNKS = 50
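# Worked example (illustrative): assuming DEFAULT_RESULTS_PER_PAGE is 100,
# MIN_SAMPLE_CHUNK_SIZE is int(sqrt(100)) == 10. For a shard holding 2,500
# filtered IIDs, _ChooseSampleIssues below computes
#   chunk_size = max(10, min(MAX_SAMPLE_CHUNK_SIZE, 2500 // 50))
# which is 50 when MAX_SAMPLE_CHUNK_SIZE allows it, i.e. roughly one sample
# issue per 50 results, or about PREFERRED_NUM_CHUNKS samples per shard.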
65
66
67# TODO(jojwang): monorail:4127: combine some url parameters info or
 68# query info into dicts or tuples to make argument management easier.
69class FrontendSearchPipeline(object):
70 """Manage the process of issue search, including backends and caching.
71
72 Even though the code is divided into several methods, the public
73 methods should be called in sequence, so the execution of the code
74 is pretty much in the order of the source code lines here.
75 """
76
77 def __init__(
78 self,
79 cnxn,
80 services,
81 auth,
82 me_user_ids,
83 query,
84 query_project_names,
85 items_per_page,
86 paginate_start,
87 can,
88 group_by_spec,
89 sort_spec,
90 warnings,
91 errors,
92 use_cached_searches,
93 profiler,
94 project=None):
95 self.cnxn = cnxn
96 self.me_user_ids = me_user_ids
97 self.auth = auth
98 self.logged_in_user_id = auth.user_id or 0
99 self.can = can
100 self.items_per_page = items_per_page
101 self.paginate_start = paginate_start
102 self.group_by_spec = group_by_spec
103 self.sort_spec = sort_spec
104 self.warnings = warnings
105 self.use_cached_searches = use_cached_searches
106 self.profiler = profiler
107
108 self.services = services
109 self.pagination = None
110 self.num_skipped_at_start = 0
111 self.total_count = 0
112 self.errors = errors
113
114 self.project_name = ''
115 if project:
116 self.project_name = project.project_name
117 self.query_projects = []
118 if query_project_names:
119 consider_projects = list(services.project.GetProjectsByName(
120 self.cnxn, query_project_names).values())
121 self.query_projects = [
122 p for p in consider_projects
123 if permissions.UserCanViewProject(
124 self.auth.user_pb, self.auth.effective_ids, p)]
125 if project:
126 self.query_projects.append(project)
127 member_of_all_projects = self.auth.user_pb.is_site_admin or all(
128 framework_bizobj.UserIsInProject(p, self.auth.effective_ids)
129 for p in self.query_projects)
130 self.query_project_ids = sorted([
131 p.project_id for p in self.query_projects])
132 self.query_project_names = sorted([
133 p.project_name for p in self.query_projects])
134
135 config_dict = self.services.config.GetProjectConfigs(
136 self.cnxn, self.query_project_ids)
137 self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
138 list(config_dict.values()))
139
140 # The following fields are filled in as the pipeline progresses.
141 # The value None means that we still need to compute that value.
142 # A shard_key is a tuple (shard_id, subquery).
143 self.users_by_id = {}
144 self.nonviewable_iids = {} # {shard_id: set(iid)}
145 self.unfiltered_iids = {} # {shard_key: [iid, ...]} needing perm checks.
146 self.filtered_iids = {} # {shard_key: [iid, ...]} already perm checked.
147 self.search_limit_reached = {} # {shard_key: [bool, ...]}.
148 self.allowed_iids = [] # Matching iids that user is permitted to view.
149 self.allowed_results = None # results that the user is permitted to view.
150 self.visible_results = None # allowed_results on current pagination page.
151 self.error_responses = set()
152
153 error_msg = _CheckQuery(
154 self.cnxn, self.services, query, self.harmonized_config,
155 self.query_project_ids, member_of_all_projects,
156 warnings=self.warnings)
157 if error_msg:
158 self.errors.query = error_msg
159
160 # Split up query into smaller subqueries that would get the same results
161 # to improve performance. Smaller queries are more likely to get cache
162 # hits and subqueries can be parallelized by querying for them across
163 # multiple shards.
164 self.subqueries = []
165 try:
166 self.subqueries = query2ast.QueryToSubqueries(query)
167 except query2ast.InvalidQueryError:
168 # Ignore errors because they've already been recorded in
169 # self.errors.query.
170 pass
171
172 def SearchForIIDs(self):
173 """Use backends to search each shard and store their results."""
174 with self.profiler.Phase('Checking cache and calling Backends'):
175 rpc_tuples = _StartBackendSearch(
176 self.cnxn, self.query_project_names, self.query_project_ids,
177 self.harmonized_config, self.unfiltered_iids,
178 self.search_limit_reached, self.nonviewable_iids,
179 self.error_responses, self.services, self.me_user_ids,
180 self.logged_in_user_id, self.items_per_page + self.paginate_start,
181 self.subqueries, self.can, self.group_by_spec, self.sort_spec,
182 self.warnings, self.use_cached_searches)
183
184 with self.profiler.Phase('Waiting for Backends'):
185 try:
186 _FinishBackendSearch(rpc_tuples)
187 except Exception as e:
188 logging.exception(e)
189 raise
190
191 if self.error_responses:
192 logging.error('%r error responses. Incomplete search results.',
193 self.error_responses)
194
195 with self.profiler.Phase('Filtering cached results'):
196 for shard_key in self.unfiltered_iids:
197 shard_id, _subquery = shard_key
198 if shard_id not in self.nonviewable_iids:
199 logging.error(
200 'Not displaying shard %r because of no nonviewable_iids', shard_id)
201 self.error_responses.add(shard_id)
202 filtered_shard_iids = []
203 else:
204 unfiltered_shard_iids = self.unfiltered_iids[shard_key]
205 nonviewable_shard_iids = self.nonviewable_iids[shard_id]
206 # TODO(jrobbins): avoid creating large temporary lists.
207 filtered_shard_iids = [iid for iid in unfiltered_shard_iids
208 if iid not in nonviewable_shard_iids]
209 self.filtered_iids[shard_key] = filtered_shard_iids
210
211 seen_iids_by_shard_id = collections.defaultdict(set)
 212 with self.profiler.Phase('Deduping result IIDs across shards'):
213 for shard_key in self.filtered_iids:
214 shard_id, _subquery = shard_key
215 deduped = [iid for iid in self.filtered_iids[shard_key]
216 if iid not in seen_iids_by_shard_id[shard_id]]
217 self.filtered_iids[shard_key] = deduped
218 seen_iids_by_shard_id[shard_id].update(deduped)
219
220 with self.profiler.Phase('Counting all filtered results'):
221 for shard_key in self.filtered_iids:
222 self.total_count += len(self.filtered_iids[shard_key])
223
224 with self.profiler.Phase('Trimming results beyond pagination page'):
225 for shard_key in self.filtered_iids:
226 self.filtered_iids[shard_key] = self.filtered_iids[
227 shard_key][:self.paginate_start + self.items_per_page]
228
229 def MergeAndSortIssues(self):
230 """Merge and sort results from all shards into one combined list."""
231 with self.profiler.Phase('selecting issues to merge and sort'):
232 self._NarrowFilteredIIDs()
233 self.allowed_iids = []
234 for filtered_shard_iids in self.filtered_iids.values():
235 self.allowed_iids.extend(filtered_shard_iids)
236
237 with self.profiler.Phase('getting allowed results'):
238 self.allowed_results = self.services.issue.GetIssues(
239 self.cnxn, self.allowed_iids)
240
241 # Note: At this point, we have results that are only sorted within
242 # each backend's shard. We still need to sort the merged result.
243 self._LookupNeededUsers(self.allowed_results)
244 with self.profiler.Phase('merging and sorting issues'):
245 self.allowed_results = _SortIssues(
246 self.allowed_results, self.harmonized_config, self.users_by_id,
247 self.group_by_spec, self.sort_spec)
248
249 def _NarrowFilteredIIDs(self):
250 """Combine filtered shards into a range of IIDs for issues to sort.
251
 252 The naive way is to concatenate shard_iids[:start + num] for all
253 shards then select [start:start + num]. We do better by sampling
254 issues and then determining which of those samples are known to
255 come before start or after start+num. We then trim off all those IIDs
 256 and sort a smaller range of IIDs that might actually be displayed.
257 See the design doc at go/monorail-sorting.
258
 259 This method modifies self.filtered_iids and self.num_skipped_at_start.
260 """
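    # Illustrative example (made-up sizes): with paginate_start = 0 and
    # items_per_page = 100, two shards of 1,000 filtered IIDs each would
    # otherwise require sorting 2,000 issues to show 100. The sampling below
    # proves that most IIDs fall outside positions [start, start + num) and
    # trims them first, so only the candidate issues near the page boundary
    # need a full sort.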
261 # Sample issues and skip those that are known to come before start.
262 # See the "Sorting in Monorail" design doc.
263
264 # If the result set is small, don't bother optimizing it.
265 orig_length = _TotalLength(self.filtered_iids)
266 if orig_length < self.items_per_page * 4:
267 return
268
269 # 1. Get sample issues in each shard and sort them all together.
270 last = self.paginate_start + self.items_per_page
271
272 samples_by_shard, sample_iids_to_shard = self._FetchAllSamples(
273 self.filtered_iids)
274 sample_issues = []
275 for issue_dict in samples_by_shard.values():
276 sample_issues.extend(list(issue_dict.values()))
277
278 self._LookupNeededUsers(sample_issues)
279 sample_issues = _SortIssues(
280 sample_issues, self.harmonized_config, self.users_by_id,
281 self.group_by_spec, self.sort_spec)
282 sample_iid_tuples = [
283 (issue.issue_id, sample_iids_to_shard[issue.issue_id])
284 for issue in sample_issues]
285
286 # 2. Trim off some IIDs that are sure to be positioned after last.
287 num_trimmed_end = _TrimEndShardedIIDs(
288 self.filtered_iids, sample_iid_tuples, last)
289 logging.info('Trimmed %r issues from the end of shards', num_trimmed_end)
290
 291 # 3. Trim off some IIDs that are sure to be positioned before start.
292 keep = _TotalLength(self.filtered_iids) - self.paginate_start
293 # Reverse the sharded lists.
294 _ReverseShards(self.filtered_iids)
295 sample_iid_tuples.reverse()
296 self.num_skipped_at_start = _TrimEndShardedIIDs(
297 self.filtered_iids, sample_iid_tuples, keep)
298 logging.info('Trimmed %r issues from the start of shards',
299 self.num_skipped_at_start)
300 # Reverse sharded lists again to get back into forward order.
301 _ReverseShards(self.filtered_iids)
302
303 def DetermineIssuePosition(self, issue):
304 """Calculate info needed to show the issue flipper.
305
306 Args:
307 issue: The issue currently being viewed.
308
309 Returns:
 310 A 3-tuple (prev_iid, index, next_iid) where prev_iid is the
311 IID of the previous issue in the total ordering (or None),
312 index is the index that the current issue has in the total
313 ordering, and next_iid is the next issue (or None). If the current
314 issue is not in the list of results at all, returns None, None, None.
315 """
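    # Illustrative example: if the overall sorted result is [78001, 78002,
    # 78003, ...] and the user is viewing issue 78002, this returns
    # (78001, 1, 78003); viewing the first issue yields (None, 0, 78002).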
316 # 1. If the current issue is not in the results at all, then exit.
317 if not any(issue.issue_id in filtered_shard_iids
318 for filtered_shard_iids in self.filtered_iids.values()):
319 return None, None, None
320
321 # 2. Choose and retrieve sample issues in each shard.
322 samples_by_shard, _ = self._FetchAllSamples(self.filtered_iids)
323
324 # 3. Build up partial results for each shard.
 325 preceding_counts = {} # dict {shard_key: num_issues_preceding_current}
326 prev_candidates, next_candidates = [], []
327 for shard_key in self.filtered_iids:
328 prev_candidate, index_in_shard, next_candidate = (
329 self._DetermineIssuePositionInShard(
330 shard_key, issue, samples_by_shard[shard_key]))
 331 preceding_counts[shard_key] = index_in_shard
332 if prev_candidate:
333 prev_candidates.append(prev_candidate)
334 if next_candidate:
335 next_candidates.append(next_candidate)
336
337 # 4. Combine the results.
 338 index = sum(preceding_counts.values())
339 prev_candidates = _SortIssues(
340 prev_candidates, self.harmonized_config, self.users_by_id,
341 self.group_by_spec, self.sort_spec)
342 prev_iid = prev_candidates[-1].issue_id if prev_candidates else None
343 next_candidates = _SortIssues(
344 next_candidates, self.harmonized_config, self.users_by_id,
345 self.group_by_spec, self.sort_spec)
346 next_iid = next_candidates[0].issue_id if next_candidates else None
347
348 return prev_iid, index, next_iid
349
350 def _DetermineIssuePositionInShard(self, shard_key, issue, sample_dict):
351 """Determine where the given issue would fit into results from a shard."""
352 # See the design doc for details. Basically, it first surveys the results
353 # to bound a range where the given issue would belong, then it fetches the
354 # issues in that range and sorts them.
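    # Illustrative example: if the on-hand samples sort as [s1, issue, s2],
    # only the IIDs lying between s1 and s2 in this shard's filtered list are
    # fetched and sorted to pin down the issue's exact index within the shard.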
355
356 filtered_shard_iids = self.filtered_iids[shard_key]
357
358 # 1. Select a sample of issues, leveraging ones we have in RAM already.
359 issues_on_hand = list(sample_dict.values())
360 if issue.issue_id not in sample_dict:
361 issues_on_hand.append(issue)
362
363 self._LookupNeededUsers(issues_on_hand)
364 sorted_on_hand = _SortIssues(
365 issues_on_hand, self.harmonized_config, self.users_by_id,
366 self.group_by_spec, self.sort_spec)
367 sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand]
368 index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id)
369
370 # 2. Bound the gap around where issue belongs.
371 if index_in_on_hand == 0:
372 fetch_start = 0
373 else:
374 prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1]
375 fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1
376
377 if index_in_on_hand == len(sorted_on_hand) - 1:
378 fetch_end = len(filtered_shard_iids)
379 else:
380 next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1]
381 fetch_end = filtered_shard_iids.index(next_on_hand_iid)
382
383 # 3. Retrieve all the issues in that gap to get an exact answer.
384 fetched_issues = self.services.issue.GetIssues(
385 self.cnxn, filtered_shard_iids[fetch_start:fetch_end])
386 if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]:
387 fetched_issues.append(issue)
388 self._LookupNeededUsers(fetched_issues)
389 sorted_fetched = _SortIssues(
390 fetched_issues, self.harmonized_config, self.users_by_id,
391 self.group_by_spec, self.sort_spec)
392 sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched]
393 index_in_fetched = sorted_fetched_iids.index(issue.issue_id)
394
395 # 4. Find the issues that come immediately before and after the place where
396 # the given issue would belong in this shard.
397 if index_in_fetched > 0:
398 prev_candidate = sorted_fetched[index_in_fetched - 1]
399 elif index_in_on_hand > 0:
400 prev_candidate = sorted_on_hand[index_in_on_hand - 1]
401 else:
402 prev_candidate = None
403
404 if index_in_fetched < len(sorted_fetched) - 1:
405 next_candidate = sorted_fetched[index_in_fetched + 1]
406 elif index_in_on_hand < len(sorted_on_hand) - 1:
407 next_candidate = sorted_on_hand[index_in_on_hand + 1]
408 else:
409 next_candidate = None
410
411 return prev_candidate, fetch_start + index_in_fetched, next_candidate
412
413 def _FetchAllSamples(self, filtered_iids):
 414 """Return a pair ({shard_key: {iid: sample_issue}}, {iid: shard_key})."""
415 samples_by_shard = {} # {shard_key: {iid: sample_issue}}
416 sample_iids_to_shard = {} # {iid: shard_key}
417 all_needed_iids = [] # List of iids to retrieve.
418
419 for shard_key in filtered_iids:
420 on_hand_issues, shard_needed_iids = self._ChooseSampleIssues(
421 filtered_iids[shard_key])
422 samples_by_shard[shard_key] = on_hand_issues
423 for iid in on_hand_issues:
424 sample_iids_to_shard[iid] = shard_key
425 for iid in shard_needed_iids:
426 sample_iids_to_shard[iid] = shard_key
427 all_needed_iids.extend(shard_needed_iids)
428
429 retrieved_samples, _misses = self.services.issue.GetIssuesDict(
430 self.cnxn, all_needed_iids)
431 for retrieved_iid, retrieved_issue in retrieved_samples.items():
432 retr_shard_key = sample_iids_to_shard[retrieved_iid]
433 samples_by_shard[retr_shard_key][retrieved_iid] = retrieved_issue
434
435 return samples_by_shard, sample_iids_to_shard
436
437 def _ChooseSampleIssues(self, issue_ids):
438 """Select a scattering of issues from the list, leveraging RAM cache.
439
440 Args:
441 issue_ids: A list of issue IDs that comprise the results in a shard.
442
443 Returns:
444 A pair (on_hand_issues, needed_iids) where on_hand_issues is
445 an issue dict {iid: issue} of issues already in RAM, and
 446 needed_iids is a list of iids of issues that need to be retrieved.
447 """
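    # Illustrative example: with 1,000 issue_ids and a chunk_size of 20, one
    # sample is drawn from each chunk starting at indices 20, 40, ..., 980; a
    # sample already in the RAM cache is used directly, otherwise the chunk's
    # first iid is added to needed_iids so the caller can batch-fetch it.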
448 on_hand_issues = {} # {iid: issue} of sample issues already in RAM.
449 needed_iids = [] # [iid, ...] of sample issues not in RAM yet.
450 chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE,
451 int(len(issue_ids) // PREFERRED_NUM_CHUNKS)))
452 for i in range(chunk_size, len(issue_ids), chunk_size):
453 issue = self.services.issue.GetAnyOnHandIssue(
454 issue_ids, start=i, end=min(i + chunk_size, len(issue_ids)))
455 if issue:
456 on_hand_issues[issue.issue_id] = issue
457 else:
458 needed_iids.append(issue_ids[i])
459
460 return on_hand_issues, needed_iids
461
462 def _LookupNeededUsers(self, issues):
463 """Look up user info needed to sort issues, if any."""
464 with self.profiler.Phase('lookup of owner, reporter, and cc'):
465 additional_user_views_by_id = (
466 tracker_helpers.MakeViewsForUsersInIssues(
467 self.cnxn, issues, self.services.user,
468 omit_ids=list(self.users_by_id.keys())))
469 self.users_by_id.update(additional_user_views_by_id)
470
471 def Paginate(self):
472 """Fetch matching issues and paginate the search results.
473
474 These two actions are intertwined because we try to only
475 retrieve the Issues on the current pagination page.
476 """
477 # We already got the issues, just display a slice of the visible ones.
478 limit_reached = False
479 for shard_limit_reached in self.search_limit_reached.values():
480 limit_reached |= shard_limit_reached
481 self.pagination = paginate.ArtifactPagination(
482 self.allowed_results,
483 self.items_per_page,
484 self.paginate_start,
485 self.project_name,
486 urls.ISSUE_LIST,
487 total_count=self.total_count,
488 limit_reached=limit_reached,
489 skipped=self.num_skipped_at_start)
490 self.visible_results = self.pagination.visible_results
491
492 # If we were not forced to look up visible users already, do it now.
493 self._LookupNeededUsers(self.visible_results)
494
495 def __repr__(self):
496 """Return a string that shows the internal state of this pipeline."""
497 if self.allowed_iids:
498 shown_allowed_iids = self.allowed_iids[:200]
499 else:
500 shown_allowed_iids = self.allowed_iids
501
502 if self.allowed_results:
503 shown_allowed_results = self.allowed_results[:200]
504 else:
505 shown_allowed_results = self.allowed_results
506
507 parts = [
508 'allowed_iids: %r' % shown_allowed_iids,
509 'allowed_results: %r' % shown_allowed_results,
510 'len(visible_results): %r' % (
511 self.visible_results and len(self.visible_results))]
512 return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts))
513
514
515def _CheckQuery(
516 cnxn, services, query, harmonized_config, project_ids,
517 member_of_all_projects, warnings=None):
518 """Parse the given query and report the first error or None."""
519 try:
520 query_ast = query2ast.ParseUserQuery(
521 query, '', query2ast.BUILTIN_ISSUE_FIELDS, harmonized_config,
522 warnings=warnings)
523 query_ast = ast2ast.PreprocessAST(
524 cnxn, query_ast, project_ids, services, harmonized_config,
525 is_member=member_of_all_projects)
526 except query2ast.InvalidQueryError as e:
 527 return str(e)
 528 except ast2ast.MalformedQuery as e:
 529 return str(e)
 530
531 return None
532
533
534def _MakeBackendCallback(func, *args):
535 # type: (Callable[[*Any], Any], *Any) -> Callable[[*Any], Any]
536 """Helper to store a particular function and argument set into a callback.
537
538 Args:
539 func: Function to callback.
540 *args: The arguments to pass into the function.
541
542 Returns:
543 Callback function based on specified arguments.
544 """
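  # Note (illustrative): binding arguments here, rather than writing
  # `lambda: func(loop_var)` directly inside a loop, freezes each callback's
  # arguments at creation time instead of sharing the loop variable.
  # For example, as a sketch:
  #   callbacks = [_MakeBackendCallback(print, i) for i in range(3)]
  #   [cb() for cb in callbacks]  # prints 0, 1, 2 rather than 2, 2, 2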
545 return lambda: func(*args)
546
547
548def _StartBackendSearch(
549 cnxn, query_project_names, query_project_ids, harmonized_config,
550 unfiltered_iids_dict, search_limit_reached_dict, nonviewable_iids,
551 error_responses, services, me_user_ids, logged_in_user_id, new_url_num,
552 subqueries, can, group_by_spec, sort_spec, warnings, use_cached_searches):
553 # type: (MonorailConnection, Sequence[str], Sequence[int],
 554 # mrproto.tracker_pb2.ProjectIssueConfig,
 555 # Mapping[Tuple(int, str), Sequence[int]],
556 # Mapping[Tuple(int, str), Sequence[bool]],
557 # Mapping[Tuple(int, str), Collection[int]], Sequence[Tuple(int, str)],
558 # Services, Sequence[int], int, int, Sequence[str], int, str, str,
559 # Sequence[Tuple(str, Sequence[str])], bool) ->
560 # Sequence[Tuple(int, Tuple(int, str),
561 # google.appengine.api.apiproxy_stub_map.UserRPC)]
562 """Request that our backends search and return a list of matching issue IDs.
563
564 Args:
565 cnxn: monorail connection to the database.
566 query_project_names: set of project names to search.
567 query_project_ids: list of project IDs to search.
568 harmonized_config: combined ProjectIssueConfig for all projects being
569 searched.
570 unfiltered_iids_dict: dict {shard_key: [iid, ...]} of unfiltered search
571 results to accumulate into. They need to be later filtered by
572 permissions and merged into filtered_iids_dict.
573 search_limit_reached_dict: dict {shard_key: [bool, ...]} to determine if
574 the search limit of any shard was reached.
575 nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the
576 projects being searched that the signed in user cannot view.
577 error_responses: shard_iids of shards that encountered errors.
578 services: connections to backends.
579 me_user_ids: Empty list when no user is logged in, or user ID of the logged
580 in user when doing an interactive search, or the viewed user ID when
581 viewing someone else's dashboard, or the subscribing user's ID when
582 evaluating subscriptions. And, any linked accounts.
583 logged_in_user_id: user_id of the logged in user, 0 otherwise
584 new_url_num: the number of issues for BackendSearchPipeline to query.
585 Computed based on pagination offset + number of items per page.
586 subqueries: split up list of query string segments.
587 can: "canned query" number to scope the user's search.
588 group_by_spec: string that lists the grouping order.
589 sort_spec: string that lists the sort order.
590 warnings: list to accumulate warning messages.
591 use_cached_searches: Bool for whether to use cached searches.
592
593 Returns:
594 A list of rpc_tuples that can be passed to _FinishBackendSearch to wait
595 on any remaining backend calls.
596
597 SIDE-EFFECTS:
598 Any data found in memcache is immediately put into unfiltered_iids_dict.
599 As the backends finish their work, _HandleBackendSearchResponse will update
600 unfiltered_iids_dict for those shards.
601
602 Any warnings produced throughout this process will be added to the list
603 warnings.
604 """
605 rpc_tuples = []
606 needed_shard_keys = set()
607 for subquery in subqueries:
 608 subquery, subquery_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
 609 me_user_ids, subquery)
 610 warnings.extend(subquery_warnings)
611 for shard_id in range(settings.num_logical_shards):
612 needed_shard_keys.add((shard_id, subquery))
613
614 # 1. Get whatever we can from memcache. Cache hits are only kept if they are
615 # not already expired.
616 project_shard_timestamps = _GetProjectTimestamps(
617 query_project_ids, needed_shard_keys)
618
619 if use_cached_searches:
620 cached_unfiltered_iids_dict, cached_search_limit_reached_dict = (
621 _GetCachedSearchResults(
622 cnxn, query_project_ids, needed_shard_keys,
623 harmonized_config, project_shard_timestamps, services, me_user_ids,
624 can, group_by_spec, sort_spec, warnings))
625 unfiltered_iids_dict.update(cached_unfiltered_iids_dict)
626 search_limit_reached_dict.update(cached_search_limit_reached_dict)
627 for cache_hit_shard_key in unfiltered_iids_dict:
628 needed_shard_keys.remove(cache_hit_shard_key)
629
630 # 2. Each kept cache hit will have unfiltered IIDs, so we filter them by
631 # removing non-viewable IDs.
632 _GetNonviewableIIDs(
633 query_project_ids, logged_in_user_id,
634 set(range(settings.num_logical_shards)),
635 rpc_tuples, nonviewable_iids, project_shard_timestamps,
636 services.cache_manager.processed_invalidations_up_to,
637 use_cached_searches)
638
639 # 3. Hit backends for any shards that are still needed. When these results
640 # come back, they are also put into unfiltered_iids_dict.
641 for shard_key in needed_shard_keys:
642 rpc = _StartBackendSearchCall(
643 query_project_names,
644 shard_key,
645 services.cache_manager.processed_invalidations_up_to,
646 me_user_ids,
647 logged_in_user_id,
648 new_url_num,
649 can=can,
650 sort_spec=sort_spec,
651 group_by_spec=group_by_spec)
652 rpc_tuple = (time.time(), shard_key, rpc)
653 rpc.callback = _MakeBackendCallback(
654 _HandleBackendSearchResponse, query_project_names, rpc_tuple,
655 rpc_tuples, settings.backend_retries, unfiltered_iids_dict,
656 search_limit_reached_dict,
657 services.cache_manager.processed_invalidations_up_to, error_responses,
658 me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
659 group_by_spec)
660 rpc_tuples.append(rpc_tuple)
661
662 return rpc_tuples
663
664
665def _FinishBackendSearch(rpc_tuples):
666 """Wait for all backend calls to complete, including any retries."""
667 while rpc_tuples:
668 active_rpcs = [rpc for (_time, _shard_key, rpc) in rpc_tuples]
 669 # Wait for any active RPC to complete. Its callback function will
670 # automatically be called.
671 finished_rpc = real_wait_any(active_rpcs)
672 # Figure out which rpc_tuple finished and remove it from our list.
673 for rpc_tuple in rpc_tuples:
674 _time, _shard_key, rpc = rpc_tuple
675 if rpc == finished_rpc:
676 rpc_tuples.remove(rpc_tuple)
677 break
678 else:
679 raise ValueError('We somehow finished an RPC that is not in rpc_tuples')
680
681
682def real_wait_any(active_rpcs):
683 """Work around the blocking nature of wait_any().
684
685 wait_any() checks for any finished RPCs, and returns one if found.
686 If no RPC is finished, it simply blocks on the last RPC in the list.
687 This is not the desired behavior because we are not able to detect
688 FAST-FAIL RPC results and retry them if wait_any() is blocked on a
689 request that is taking a long time to do actual work.
690
691 Instead, we do the same check, without blocking on any individual RPC.
692 """
 693 if six.PY3 or settings.local_mode:
 694 # The development server has very different code for RPCs than the
695 # code used in the hosted environment.
696 return apiproxy_stub_map.UserRPC.wait_any(active_rpcs)
697 while True:
698 finished, _ = apiproxy_stub_map.UserRPC._UserRPC__check_one(active_rpcs)
699 if finished:
700 return finished
701 time.sleep(DELAY_BETWEEN_RPC_COMPLETION_POLLS)
702
703def _GetProjectTimestamps(query_project_ids, needed_shard_keys):
704 """Get a dict of modified_ts values for all specified project-shards."""
705 project_shard_timestamps = {}
706 if query_project_ids:
707 keys = []
708 for pid in query_project_ids:
709 for sid, _subquery in needed_shard_keys:
710 keys.append('%d;%d' % (pid, sid))
711 else:
712 keys = [('all;%d' % sid)
713 for sid, _subquery in needed_shard_keys]
714
715 timestamps_for_project = memcache.get_multi(
716 keys=keys, namespace=settings.memcache_namespace)
717 for key, timestamp in timestamps_for_project.items():
718 pid_str, sid_str = key.split(';')
719 if pid_str == 'all':
720 project_shard_timestamps['all', int(sid_str)] = timestamp
721 else:
722 project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp
723
724 return project_shard_timestamps
725
726
727def _GetNonviewableIIDs(
728 query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples,
729 nonviewable_iids, project_shard_timestamps, invalidation_timestep,
730 use_cached_searches):
731 """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones."""
732 if query_project_ids:
733 keys = []
734 for pid in query_project_ids:
735 for sid in needed_shard_ids:
736 keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid))
737 else:
738 keys = [
739 ('all;%d;%d' % (logged_in_user_id, sid)) for sid in needed_shard_ids
740 ]
741
742 if use_cached_searches:
743 cached_dict = memcache.get_multi(
744 keys, key_prefix='nonviewable:', namespace=settings.memcache_namespace)
745 else:
746 cached_dict = {}
747
748 for sid in needed_shard_ids:
749 if query_project_ids:
750 for pid in query_project_ids:
751 _AccumulateNonviewableIIDs(
752 pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
753 project_shard_timestamps, rpc_tuples, invalidation_timestep)
754 else:
755 _AccumulateNonviewableIIDs(
756 None, logged_in_user_id, sid, cached_dict, nonviewable_iids,
757 project_shard_timestamps, rpc_tuples, invalidation_timestep)
758
759
760def _AccumulateNonviewableIIDs(
761 pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
762 project_shard_timestamps, rpc_tuples, invalidation_timestep):
763 """Use one of the retrieved cache entries or call a backend if needed."""
764 if pid is None:
765 key = 'all;%d;%d' % (logged_in_user_id, sid)
766 else:
767 key = '%d;%d;%d' % (pid, logged_in_user_id, sid)
768
769 if key in cached_dict:
770 issue_ids, cached_ts = cached_dict.get(key)
771 modified_ts = project_shard_timestamps.get((pid, sid))
772 if modified_ts is None or modified_ts > cached_ts:
773 logging.info('nonviewable too stale on (project %r, shard %r)',
774 pid, sid)
775 else:
776 logging.info('adding %d nonviewable issue_ids', len(issue_ids))
777 nonviewable_iids[sid] = set(issue_ids)
778
779 if sid not in nonviewable_iids:
 780 logging.info('starting backend call for nonviewable iids %r', key)
781 rpc = _StartBackendNonviewableCall(
782 pid, logged_in_user_id, sid, invalidation_timestep)
783 rpc_tuple = (time.time(), sid, rpc)
784 rpc.callback = _MakeBackendCallback(
785 _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid,
786 rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids,
787 invalidation_timestep)
788 rpc_tuples.append(rpc_tuple)
789
790
791def _GetCachedSearchResults(
792 cnxn, query_project_ids, needed_shard_keys, harmonized_config,
793 project_shard_timestamps, services, me_user_ids, can, group_by_spec,
794 sort_spec, warnings):
795 """Return a dict of cached search results that are not already stale.
796
797 If it were not for cross-project search, we would simply cache when we do a
798 search and then invalidate when an issue is modified. But, with
799 cross-project search we don't know all the memcache entries that would
800 need to be invalidated. So, instead, we write the search result cache
801 entries and then an initial modified_ts value for each project if it was
802 not already there. And, when we update an issue we write a new
 803 modified_ts entry, which implicitly invalidates all search result
804 cache entries that were written earlier because they are now stale. When
805 reading from the cache, we ignore any query project with modified_ts
806 after its search result cache timestamp, because it is stale.
807
808 Args:
809 cnxn: monorail connection to the database.
810 query_project_ids: list of project ID numbers for all projects being
811 searched.
812 needed_shard_keys: set of shard keys that need to be checked.
 813 harmonized_config: ProjectIssueConfig with combined information for all
814 projects involved in this search.
815 project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...}
816 that tells when each shard was last invalidated.
817 services: connections to backends.
818 me_user_ids: Empty list when no user is logged in, or user ID of the logged
819 in user when doing an interactive search, or the viewed user ID when
820 viewing someone else's dashboard, or the subscribing user's ID when
821 evaluating subscriptions. And, any linked accounts.
822 can: "canned query" number to scope the user's search.
823 group_by_spec: string that lists the grouping order.
824 sort_spec: string that lists the sort order.
825 warnings: list to accumulate warning messages.
826
827
828 Returns:
829 Tuple consisting of:
830 A dictionary {shard_id: [issue_id, ...], ...} of unfiltered search result
831 issue IDs. Only shard_ids found in memcache will be in that dictionary.
832 The result issue IDs must be permission checked before they can be
833 considered to be part of the user's result set.
834 A dictionary {shard_id: bool, ...}. The boolean is set to True if
835 the search results limit of the shard is hit.
836 """
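  # Illustrative key layout (derived from the format strings below): each
  # shard's cached result lives under a key of the form
  #   '<projects_str>;<canned_query>;<subquery>;<sort directives>;<shard_id>'
  # e.g. (hypothetical values) '16,42;status!=Fixed;component=UI;priority id;3',
  # and it is used only when its cached_ts is at least as new as every relevant
  # project's modified_ts recorded in project_shard_timestamps.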
837 projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
838 projects_str = projects_str or 'all'
839 canned_query = savedqueries_helpers.SavedQueryIDToCond(
840 cnxn, services.features, can)
 841 canned_query, canned_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
 842 me_user_ids, canned_query)
 843 warnings.extend(canned_warnings)
844
845 sd = sorting.ComputeSortDirectives(
846 harmonized_config, group_by_spec, sort_spec)
847 sd_str = ' '.join(sd)
848 memcache_key_prefix = '%s;%s' % (projects_str, canned_query)
849 limit_reached_key_prefix = '%s;%s' % (projects_str, canned_query)
850
851 cached_dict = memcache.get_multi(
852 ['%s;%s;%s;%d' % (memcache_key_prefix, subquery, sd_str, sid)
853 for sid, subquery in needed_shard_keys],
854 namespace=settings.memcache_namespace)
855 cached_search_limit_reached_dict = memcache.get_multi(
856 ['%s;%s;%s;search_limit_reached;%d' % (
857 limit_reached_key_prefix, subquery, sd_str, sid)
858 for sid, subquery in needed_shard_keys],
859 namespace=settings.memcache_namespace)
860
861 unfiltered_dict = {}
862 search_limit_reached_dict = {}
863 for shard_key in needed_shard_keys:
864 shard_id, subquery = shard_key
865 memcache_key = '%s;%s;%s;%d' % (
866 memcache_key_prefix, subquery, sd_str, shard_id)
867 limit_reached_key = '%s;%s;%s;search_limit_reached;%d' % (
868 limit_reached_key_prefix, subquery, sd_str, shard_id)
869 if memcache_key not in cached_dict:
870 logging.info('memcache miss on shard %r', shard_key)
871 continue
872
873 cached_iids, cached_ts = cached_dict[memcache_key]
874 if cached_search_limit_reached_dict.get(limit_reached_key):
875 search_limit_reached, _ = cached_search_limit_reached_dict[
876 limit_reached_key]
877 else:
878 search_limit_reached = False
879
880 stale = False
881 if query_project_ids:
882 for project_id in query_project_ids:
883 modified_ts = project_shard_timestamps.get((project_id, shard_id))
884 if modified_ts is None or modified_ts > cached_ts:
885 stale = True
886 logging.info('memcache too stale on shard %r because of %r',
887 shard_id, project_id)
888 break
889 else:
890 modified_ts = project_shard_timestamps.get(('all', shard_id))
891 if modified_ts is None or modified_ts > cached_ts:
892 stale = True
893 logging.info('memcache too stale on shard %r because of all',
894 shard_id)
895
896 if not stale:
897 unfiltered_dict[shard_key] = cached_iids
898 search_limit_reached_dict[shard_key] = search_limit_reached
899
900 return unfiltered_dict, search_limit_reached_dict
901
902
903def _MakeBackendRequestHeaders(failfast):
904 headers = {
905 # This is needed to allow frontends to talk to backends without going
906 # through a login screen on googleplex.com.
907 # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs
908 'X-URLFetch-Service-Id': 'GOOGLEPLEX',
909 }
910 if failfast:
911 headers['X-AppEngine-FailFast'] = 'Yes'
912 return headers
913
914
915def _StartBackendSearchCall(
916 query_project_names,
917 shard_key,
918 invalidation_timestep,
919 me_user_ids,
920 logged_in_user_id,
921 new_url_num,
922 can=None,
923 sort_spec=None,
924 group_by_spec=None,
925 deadline=None,
926 failfast=True):
927 # type: (Sequence[str], Tuple(int, str), int, Sequence[int], int,
928 # int, str, str, int, bool) ->
929 # google.appengine.api.apiproxy_stub_map.UserRPC
930 """Ask a backend to query one shard of the database.
931
932 Args:
933 query_project_names: List of project names queried.
934 shard_key: Tuple specifying which DB shard to query.
 935 invalidation_timestep: int timestep used to keep cached items fresh.
936 me_user_ids: Empty list when no user is logged in, or user ID of the logged
937 in user when doing an interactive search, or the viewed user ID when
938 viewing someone else's dashboard, or the subscribing user's ID when
939 evaluating subscriptions. And, any linked accounts.
940 logged_in_user_id: Id of the logged in user.
941 new_url_num: the number of issues for BackendSearchPipeline to query.
942 Computed based on pagination offset + number of items per page.
 943 can: Id of the canned query to use.
944 sort_spec: Str specifying how issues should be sorted.
945 group_by_spec: Str specifying how issues should be grouped.
946 deadline: Max time for the RPC to take before failing.
947 failfast: Whether to set the X-AppEngine-FailFast request header.
948
949 Returns:
950 UserRPC for the created RPC call.
951 """
952 shard_id, subquery = shard_key
 953 backend_host = modules.get_hostname(module='default')
954 url = 'https://%s%s' % (
 955 backend_host,
956 framework_helpers.FormatURL(
957 [],
958 urls.BACKEND_SEARCH,
959 projects=','.join(query_project_names),
960 q=subquery,
961 start=0,
962 num=new_url_num,
963 can=can,
964 sort=sort_spec,
965 groupby=group_by_spec,
966 logged_in_user_id=logged_in_user_id,
967 me_user_ids=','.join(str(uid) for uid in me_user_ids),
968 shard_id=shard_id,
969 invalidation_timestep=invalidation_timestep))
970 logging.info('\n\nCalling backend: %s', url)
971 rpc = urlfetch.create_rpc(
972 deadline=deadline or settings.backend_deadline)
973 headers = _MakeBackendRequestHeaders(failfast)
974 # follow_redirects=False is needed to avoid a login screen on googleplex.
975 urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
976 return rpc
977
978
979def _StartBackendNonviewableCall(
980 project_id, logged_in_user_id, shard_id, invalidation_timestep,
981 deadline=None, failfast=True):
982 """Ask a backend to query one shard of the database."""
 983 backend_host = modules.get_hostname(module='default')
984 url = 'https://%s%s' % (backend_host, framework_helpers.FormatURL(
 985 None, urls.BACKEND_NONVIEWABLE,
986 project_id=project_id or '',
987 logged_in_user_id=logged_in_user_id or '',
988 shard_id=shard_id,
989 invalidation_timestep=invalidation_timestep))
990 logging.info('Calling backend nonviewable: %s', url)
991 rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline)
992 headers = _MakeBackendRequestHeaders(failfast)
993 # follow_redirects=False is needed to avoid a login screen on googleplex.
994 urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
995 return rpc
996
997
998def _HandleBackendSearchResponse(
999 query_project_names, rpc_tuple, rpc_tuples, remaining_retries,
1000 unfiltered_iids, search_limit_reached, invalidation_timestep,
1001 error_responses, me_user_ids, logged_in_user_id, new_url_num, can,
1002 sort_spec, group_by_spec):
1003 # type: (Sequence[str], Tuple(int, Tuple(int, str),
1004 # google.appengine.api.apiproxy_stub_map.UserRPC),
1005 # Sequence[Tuple(int, Tuple(int, str),
1006 # google.appengine.api.apiproxy_stub_map.UserRPC)],
1007 # int, Mapping[Tuple(int, str), Sequence[int]],
1008 # Mapping[Tuple(int, str), bool], int, Collection[Tuple(int, str)],
1009 # Sequence[int], int, int, int, str, str) -> None
1010 #
1011 """Process one backend response and retry if there was an error.
1012
1013 SIDE EFFECTS: This function edits many of the passed in parameters in place.
1014 For example, search_limit_reached and unfiltered_iids are updated with
1015 response data from the RPC, keyed by shard_key.
1016
1017 Args:
1018 query_project_names: List of projects to query.
1019 rpc_tuple: Tuple containing an RPC response object, the time it happened,
1020 and what shard the RPC was queried against.
1021 rpc_tuples: List of RPC responses to mutate with any retry responses that
 1022 happened.
1023 remaining_retries: Number of times left to retry.
1024 unfiltered_iids: Dict of Issue ids, before they've been filtered by
1025 permissions.
1026 search_limit_reached: Dict of whether the search limit for a particular
1027 shard has been hit.
 1028 invalidation_timestep: int timestep used to keep cached items fresh.
 1029 error_responses: set of shard_keys for shards that encountered errors.
1030 me_user_ids: List of relevant user IDs. ie: the currently logged in user
1031 and linked account IDs if applicable.
1032 logged_in_user_id: Logged in user's ID.
1033 new_url_num: the number of issues for BackendSearchPipeline to query.
1034 Computed based on pagination offset + number of items per page.
1035 can: Canned query ID to use.
1036 sort_spec: str specifying how issues should be sorted.
1037 group_by_spec: str specifying how issues should be grouped.
1038 """
1039 start_time, shard_key, rpc = rpc_tuple
1040 duration_sec = time.time() - start_time
1041
1042 try:
1043 response = rpc.get_result()
1044 logging.info('call to backend took %d sec', duration_sec)
1045 # Note that response.content has "})]'\n" prepended to it.
1046 json_content = response.content[5:]
1047 logging.info('got json text: %r length %r',
1048 json_content[:framework_constants.LOGGING_MAX_LENGTH],
1049 len(json_content))
 1050 if json_content == b'':
 1051 raise Exception('Fast fail')
1052 json_data = json.loads(json_content)
1053 unfiltered_iids[shard_key] = json_data['unfiltered_iids']
1054 search_limit_reached[shard_key] = json_data['search_limit_reached']
1055 if json_data.get('error'):
1056 # Don't raise an exception, just log, because these errors are more like
1057 # 400s than 500s, and shouldn't be retried.
1058 logging.error('Backend shard %r returned error "%r"' % (
1059 shard_key, json_data.get('error')))
1060 error_responses.add(shard_key)
1061
1062 except Exception as e:
1063 if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions.
1064 logging.exception(e)
1065 if not remaining_retries:
1066 logging.error('backend search retries exceeded')
1067 error_responses.add(shard_key)
1068 return # Used all retries, so give up.
1069
1070 if duration_sec >= settings.backend_deadline:
1071 logging.error('backend search on %r took too long', shard_key)
1072 error_responses.add(shard_key)
1073 return # That backend shard is overloaded, so give up.
1074
1075 logging.error('backend call for shard %r failed, retrying', shard_key)
1076 retry_rpc = _StartBackendSearchCall(
1077 query_project_names,
1078 shard_key,
1079 invalidation_timestep,
1080 me_user_ids,
1081 logged_in_user_id,
1082 new_url_num,
1083 can=can,
1084 sort_spec=sort_spec,
1085 group_by_spec=group_by_spec,
1086 failfast=remaining_retries > 2)
1087 retry_rpc_tuple = (time.time(), shard_key, retry_rpc)
1088 retry_rpc.callback = _MakeBackendCallback(
1089 _HandleBackendSearchResponse, query_project_names, retry_rpc_tuple,
1090 rpc_tuples, remaining_retries - 1, unfiltered_iids,
1091 search_limit_reached, invalidation_timestep, error_responses,
1092 me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
1093 group_by_spec)
1094 rpc_tuples.append(retry_rpc_tuple)
1095
1096
1097def _HandleBackendNonviewableResponse(
1098 project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples,
1099 remaining_retries, nonviewable_iids, invalidation_timestep):
1100 """Process one backend response and retry if there was an error."""
1101 start_time, shard_id, rpc = rpc_tuple
1102 duration_sec = time.time() - start_time
1103
1104 try:
1105 response = rpc.get_result()
1106 logging.info('call to backend nonviewable took %d sec', duration_sec)
1107 # Note that response.content has "})]'\n" prepended to it.
1108 json_content = response.content[5:]
1109 logging.info('got json text: %r length %r',
1110 json_content[:framework_constants.LOGGING_MAX_LENGTH],
1111 len(json_content))
 1112 if json_content == b'':
 1113 raise Exception('Fast fail')
1114 json_data = json.loads(json_content)
1115 nonviewable_iids[shard_id] = set(json_data['nonviewable'])
1116
1117 except Exception as e:
1118 if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions.
1119 logging.exception(e)
1120
1121 if not remaining_retries:
 1122 logging.warning('Used all retries, so give up on shard %r', shard_id)
 1123 return
1124
1125 if duration_sec >= settings.backend_deadline:
1126 logging.error('nonviewable call on %r took too long', shard_id)
1127 return # That backend shard is overloaded, so give up.
1128
1129 logging.error(
1130 'backend nonviewable call for shard %r;%r;%r failed, retrying',
1131 project_id, logged_in_user_id, shard_id)
1132 retry_rpc = _StartBackendNonviewableCall(
1133 project_id, logged_in_user_id, shard_id, invalidation_timestep,
1134 failfast=remaining_retries > 2)
1135 retry_rpc_tuple = (time.time(), shard_id, retry_rpc)
1136 retry_rpc.callback = _MakeBackendCallback(
1137 _HandleBackendNonviewableResponse, project_id, logged_in_user_id,
1138 shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1,
1139 nonviewable_iids, invalidation_timestep)
1140 rpc_tuples.append(retry_rpc_tuple)
1141
1142
1143def _TotalLength(sharded_iids):
1144 """Return the total length of all issue_iids lists."""
1145 return sum(len(issue_iids) for issue_iids in sharded_iids.values())
1146
1147
1148def _ReverseShards(sharded_iids):
1149 """Reverse each issue_iids list in place."""
1150 for shard_key in sharded_iids:
1151 sharded_iids[shard_key].reverse()
1152
1153
1154def _TrimEndShardedIIDs(sharded_iids, sample_iid_tuples, num_needed):
1155 """Trim the IIDs to keep at least num_needed items.
1156
1157 Args:
1158 sharded_iids: dict {shard_key: issue_id_list} for search results. This is
1159 modified in place to remove some trailing issue IDs.
1160 sample_iid_tuples: list of (iid, shard_key) from a sorted list of sample
1161 issues.
1162 num_needed: int minimum total number of items to keep. Some IIDs that are
1163 known to belong in positions > num_needed will be trimmed off.
1164
1165 Returns:
1166 The total number of IIDs removed from the IID lists.
1167 """
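  # Worked example (illustrative): say shard A and shard B each hold 100 IIDs,
  # num_needed is 60, and the globally sorted samples sit at positions 20 (A),
  # 25 (B), then 40 (A). After the first two samples the combined lower bound
  # is 20 + 25 = 45 < 60; after the third it is 40 + 25 = 65 >= 60, so the
  # remaining samples are "excess" and each shard is truncated at its first
  # excess sample's position.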
1168 # 1. Get (sample_iid, position_in_shard) for each sample.
1169 sample_positions = _CalcSamplePositions(sharded_iids, sample_iid_tuples)
1170
1171 # 2. Walk through the samples, computing a combined lower bound at each
1172 # step until we know that we have passed at least num_needed IIDs.
1173 lower_bound_per_shard = {}
1174 excess_samples = []
1175 for i in range(len(sample_positions)):
1176 _sample_iid, sample_shard_key, pos = sample_positions[i]
1177 lower_bound_per_shard[sample_shard_key] = pos
1178 overall_lower_bound = sum(lower_bound_per_shard.values())
1179 if overall_lower_bound >= num_needed:
1180 excess_samples = sample_positions[i + 1:]
1181 break
1182 else:
1183 return 0 # We went through all samples and never reached num_needed.
1184
1185 # 3. Truncate each shard at the first excess sample in that shard.
1186 already_trimmed = set()
1187 num_trimmed = 0
1188 for _sample_iid, sample_shard_key, pos in excess_samples:
1189 if sample_shard_key not in already_trimmed:
1190 num_trimmed += len(sharded_iids[sample_shard_key]) - pos
1191 sharded_iids[sample_shard_key] = sharded_iids[sample_shard_key][:pos]
1192 already_trimmed.add(sample_shard_key)
1193
1194 return num_trimmed
1195
1196
1197# TODO(jrobbins): Convert this to a python generator.
1198def _CalcSamplePositions(sharded_iids, sample_iids):
1199 """Return [(iid, shard_key, position_in_shard), ...] for each sample."""
1200 # We keep track of how far index() has scanned in each shard to avoid
1201 # starting over at position 0 when looking for the next sample in
1202 # the same shard.
1203 scan_positions = collections.defaultdict(lambda: 0)
1204 sample_positions = []
1205 for sample_iid, sample_shard_key in sample_iids:
1206 try:
1207 pos = sharded_iids.get(sample_shard_key, []).index(
1208 sample_iid, scan_positions[sample_shard_key])
1209 scan_positions[sample_shard_key] = pos
1210 sample_positions.append((sample_iid, sample_shard_key, pos))
1211 except ValueError:
1212 pass
1213
1214 return sample_positions
1215
1216
1217def _SortIssues(issues, config, users_by_id, group_by_spec, sort_spec):
1218 """Sort the found issues based on the request and config values.
1219
1220 Args:
1221 issues: A list of issues to be sorted.
1222 config: A ProjectIssueConfig that could impact sort order.
1223 users_by_id: dictionary {user_id: user_view,...} for all users who
1224 participate in any issue in the entire list.
1225 group_by_spec: string that lists the grouping order
1226 sort_spec: string that lists the sort order
1227
1228
1229 Returns:
1230 A sorted list of issues, based on parameters from mr and config.
1231 """
1232 issues = sorting.SortArtifacts(
1233 issues, config, tracker_helpers.SORTABLE_FIELDS,
1234 tracker_helpers.SORTABLE_FIELDS_POSTPROCESSORS, group_by_spec,
1235 sort_spec, users_by_id=users_by_id)
1236 return issues