Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/search/frontendsearchpipeline.py b/search/frontendsearchpipeline.py
new file mode 100644
index 0000000..367c52f
--- /dev/null
+++ b/search/frontendsearchpipeline.py
@@ -0,0 +1,1237 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""The FrontendSearchPipeline class manages issue search and sorting.
+
+The frontend pipeline checks memcache for cached results in each shard.  It
+then calls backend jobs to do any shards that had a cache miss.  On cache hit,
+the cached results must be filtered by permissions, so the at-risk cache and
+backends are consulted.  Next, the sharded results are combined into an overall
+list of IIDs.  Then, that list is paginated and the issues on the current
+pagination page can be shown.  Alternatively, this class can determine just the
+position the currently shown issue would occupy in the overall sorted list.
+"""
+
+from __future__ import division
+from __future__ import print_function
+from __future__ import absolute_import
+
+import collections
+import json
+import logging
+import math
+import random
+import time
+
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.api import memcache
+from google.appengine.api import modules
+from google.appengine.api import urlfetch
+
+import settings
+from features import savedqueries_helpers
+from framework import framework_bizobj
+from framework import framework_constants
+from framework import framework_helpers
+from framework import paginate
+from framework import permissions
+from framework import sorting
+from framework import urls
+from search import ast2ast
+from search import query2ast
+from search import searchpipeline
+from services import fulltext_helpers
+from tracker import tracker_bizobj
+from tracker import tracker_constants
+from tracker import tracker_helpers
+
+
+# Fail-fast responses usually finish in less than 50ms.  If a call fails in
+# less time than FAIL_FAST_LIMIT_SEC, we assume it was a fail-fast response
+# and don't bother logging it.
+FAIL_FAST_LIMIT_SEC = 0.1
+
+DELAY_BETWEEN_RPC_COMPLETION_POLLS = 0.04  # 40 milliseconds
+
+# The choices help balance the cost of choosing samples vs. the cost of
+# selecting issues that are in a range bounded by neighboring samples.
+# Preferred chunk size parameters were determined by experimentation.
+MIN_SAMPLE_CHUNK_SIZE = int(
+    math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE))
+MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard))
+PREFERRED_NUM_CHUNKS = 50
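+# For example (assuming, purely for illustration, that
+# DEFAULT_RESULTS_PER_PAGE is 100 and settings.search_limit_per_shard is
+# 10000): the bounds above become 10 and 100, so a shard holding 2000
+# candidate IIDs gets chunk_size = max(10, min(100, 2000 // 50)) = 40,
+# yielding roughly PREFERRED_NUM_CHUNKS evenly spaced samples.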
+
+
+# TODO(jojwang): monorail:4127: combine some URL parameter info or
+# query info into dicts or tuples to make argument management easier.
+class FrontendSearchPipeline(object):
+  """Manage the process of issue search, including backends and caching.
+
+  Even though the code is divided into several methods, the public
+  methods should be called in sequence, so the execution of the code
+  is pretty much in the order of the source code lines here.
+  """
+
+  def __init__(
+      self,
+      cnxn,
+      services,
+      auth,
+      me_user_ids,
+      query,
+      query_project_names,
+      items_per_page,
+      paginate_start,
+      can,
+      group_by_spec,
+      sort_spec,
+      warnings,
+      errors,
+      use_cached_searches,
+      profiler,
+      project=None):
+    self.cnxn = cnxn
+    self.me_user_ids = me_user_ids
+    self.auth = auth
+    self.logged_in_user_id = auth.user_id or 0
+    self.can = can
+    self.items_per_page = items_per_page
+    self.paginate_start = paginate_start
+    self.group_by_spec = group_by_spec
+    self.sort_spec = sort_spec
+    self.warnings = warnings
+    self.use_cached_searches = use_cached_searches
+    self.profiler = profiler
+
+    self.services = services
+    self.pagination = None
+    self.num_skipped_at_start = 0
+    self.total_count = 0
+    self.errors = errors
+
+    self.project_name = ''
+    if project:
+      self.project_name = project.project_name
+    self.query_projects = []
+    if query_project_names:
+      consider_projects = list(services.project.GetProjectsByName(
+        self.cnxn, query_project_names).values())
+      self.query_projects = [
+          p for p in consider_projects
+          if permissions.UserCanViewProject(
+              self.auth.user_pb, self.auth.effective_ids, p)]
+    if project:
+      self.query_projects.append(project)
+    member_of_all_projects = self.auth.user_pb.is_site_admin or all(
+        framework_bizobj.UserIsInProject(p, self.auth.effective_ids)
+        for p in self.query_projects)
+    self.query_project_ids = sorted([
+        p.project_id for p in self.query_projects])
+    self.query_project_names = sorted([
+        p.project_name for p in self.query_projects])
+
+    config_dict = self.services.config.GetProjectConfigs(
+        self.cnxn, self.query_project_ids)
+    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
+        list(config_dict.values()))
+
+    # The following fields are filled in as the pipeline progresses.
+    # The value None means that we still need to compute that value.
+    # A shard_key is a tuple (shard_id, subquery).
+    self.users_by_id = {}
+    self.nonviewable_iids = {}  # {shard_id: set(iid)}
+    self.unfiltered_iids = {}  # {shard_key: [iid, ...]} needing perm checks.
+    self.filtered_iids = {}  # {shard_key: [iid, ...]} already perm checked.
+    self.search_limit_reached = {}  # {shard_key: bool}.
+    self.allowed_iids = []  # Matching iids that user is permitted to view.
+    self.allowed_results = None  # results that the user is permitted to view.
+    self.visible_results = None  # allowed_results on current pagination page.
+    self.error_responses = set()
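+    # For example (hypothetical values): with two logical shards and a query
+    # that splits into subqueries 'Type=Defect' and 'Pri=0', the shard keys
+    # would be (0, 'Type=Defect'), (0, 'Pri=0'), (1, 'Type=Defect'), and
+    # (1, 'Pri=0'); self.unfiltered_iids maps each such key to a list of
+    # candidate issue IDs returned for that shard and subquery.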
+
+    error_msg = _CheckQuery(
+        self.cnxn, self.services, query, self.harmonized_config,
+        self.query_project_ids, member_of_all_projects,
+        warnings=self.warnings)
+    if error_msg:
+      self.errors.query = error_msg
+
+    # Split up query into smaller subqueries that would get the same results
+    # to improve performance. Smaller queries are more likely to get cache
+    # hits and subqueries can be parallelized by querying for them across
+    # multiple shards.
+    self.subqueries = []
+    try:
+      self.subqueries = query2ast.QueryToSubqueries(query)
+    except query2ast.InvalidQueryError:
+      # Ignore errors because they've already been recorded in
+      # self.errors.query.
+      pass
+
+  def SearchForIIDs(self):
+    """Use backends to search each shard and store their results."""
+    with self.profiler.Phase('Checking cache and calling Backends'):
+      rpc_tuples = _StartBackendSearch(
+          self.cnxn, self.query_project_names, self.query_project_ids,
+          self.harmonized_config, self.unfiltered_iids,
+          self.search_limit_reached, self.nonviewable_iids,
+          self.error_responses, self.services, self.me_user_ids,
+          self.logged_in_user_id, self.items_per_page + self.paginate_start,
+          self.subqueries, self.can, self.group_by_spec, self.sort_spec,
+          self.warnings, self.use_cached_searches)
+
+    with self.profiler.Phase('Waiting for Backends'):
+      try:
+        _FinishBackendSearch(rpc_tuples)
+      except Exception as e:
+        logging.exception(e)
+        raise
+
+    if self.error_responses:
+      logging.error('%r error responses. Incomplete search results.',
+                    self.error_responses)
+
+    with self.profiler.Phase('Filtering cached results'):
+      for shard_key in self.unfiltered_iids:
+        shard_id, _subquery = shard_key
+        if shard_id not in self.nonviewable_iids:
+          logging.error(
+            'Not displaying shard %r because of no nonviewable_iids', shard_id)
+          self.error_responses.add(shard_id)
+          filtered_shard_iids = []
+        else:
+          unfiltered_shard_iids = self.unfiltered_iids[shard_key]
+          nonviewable_shard_iids = self.nonviewable_iids[shard_id]
+          # TODO(jrobbins): avoid creating large temporary lists.
+          filtered_shard_iids = [iid for iid in unfiltered_shard_iids
+                                 if iid not in nonviewable_shard_iids]
+        self.filtered_iids[shard_key] = filtered_shard_iids
+
+    seen_iids_by_shard_id = collections.defaultdict(set)
+    with self.profiler.Phase('Dedupping result IIDs across shards'):
+      for shard_key in self.filtered_iids:
+        shard_id, _subquery = shard_key
+        deduped = [iid for iid in self.filtered_iids[shard_key]
+                   if iid not in seen_iids_by_shard_id[shard_id]]
+        self.filtered_iids[shard_key] = deduped
+        seen_iids_by_shard_id[shard_id].update(deduped)
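+    # For example (hypothetical values): if shard 0 returned [11, 12] for one
+    # subquery and [12, 13] for another, the second list is deduped to [13],
+    # so issue 12 is counted and displayed only once.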
+
+    with self.profiler.Phase('Counting all filtered results'):
+      for shard_key in self.filtered_iids:
+        self.total_count += len(self.filtered_iids[shard_key])
+
+    with self.profiler.Phase('Trimming results beyond pagination page'):
+      for shard_key in self.filtered_iids:
+        self.filtered_iids[shard_key] = self.filtered_iids[
+            shard_key][:self.paginate_start + self.items_per_page]
+
+  def MergeAndSortIssues(self):
+    """Merge and sort results from all shards into one combined list."""
+    with self.profiler.Phase('selecting issues to merge and sort'):
+      self._NarrowFilteredIIDs()
+      self.allowed_iids = []
+      for filtered_shard_iids in self.filtered_iids.values():
+        self.allowed_iids.extend(filtered_shard_iids)
+
+    with self.profiler.Phase('getting allowed results'):
+      self.allowed_results = self.services.issue.GetIssues(
+          self.cnxn, self.allowed_iids)
+
+    # Note: At this point, we have results that are only sorted within
+    # each backend's shard.  We still need to sort the merged result.
+    self._LookupNeededUsers(self.allowed_results)
+    with self.profiler.Phase('merging and sorting issues'):
+      self.allowed_results = _SortIssues(
+          self.allowed_results, self.harmonized_config, self.users_by_id,
+          self.group_by_spec, self.sort_spec)
+
+  def _NarrowFilteredIIDs(self):
+    """Combine filtered shards into a range of IIDs for issues to sort.
+
+    The naive way is to concatenate shard_iids[:start + num] for all
+    shards then select [start:start + num].  We do better by sampling
+    issues and then determining which of those samples are known to
+    come before start or after start+num.  We then trim off all those IIDs
+    and sort a smaller range of IIDs that might actually be displayed.
+    See the design doc at go/monorail-sorting.
+
+    This method modifies self.filtered_iids and self.num_skipped_at_start.
+    """
+    # Sample issues and skip those that are known to come before start.
+    # See the "Sorting in Monorail" design doc.
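+    # Illustrative sketch (hypothetical numbers): with items_per_page = 100,
+    # paginate_start = 0, and three shards of 1000 IIDs each, the naive
+    # approach would fetch and sort 3000 issues; sampling lets us prove that
+    # most of those IIDs sort after position 100 and trim them before the
+    # real sort.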
+
+    # If the result set is small, don't bother optimizing it.
+    orig_length = _TotalLength(self.filtered_iids)
+    if orig_length < self.items_per_page * 4:
+      return
+
+    # 1. Get sample issues in each shard and sort them all together.
+    last = self.paginate_start + self.items_per_page
+
+    samples_by_shard, sample_iids_to_shard = self._FetchAllSamples(
+        self.filtered_iids)
+    sample_issues = []
+    for issue_dict in samples_by_shard.values():
+      sample_issues.extend(list(issue_dict.values()))
+
+    self._LookupNeededUsers(sample_issues)
+    sample_issues = _SortIssues(
+        sample_issues, self.harmonized_config, self.users_by_id,
+        self.group_by_spec, self.sort_spec)
+    sample_iid_tuples = [
+        (issue.issue_id, sample_iids_to_shard[issue.issue_id])
+        for issue in sample_issues]
+
+    # 2. Trim off some IIDs that are sure to be positioned after last.
+    num_trimmed_end = _TrimEndShardedIIDs(
+        self.filtered_iids, sample_iid_tuples, last)
+    logging.info('Trimmed %r issues from the end of shards', num_trimmed_end)
+
+    # 3. Trim off some IIDs that are sure to be positioned before start.
+    keep = _TotalLength(self.filtered_iids) - self.paginate_start
+    # Reverse the sharded lists.
+    _ReverseShards(self.filtered_iids)
+    sample_iid_tuples.reverse()
+    self.num_skipped_at_start = _TrimEndShardedIIDs(
+        self.filtered_iids, sample_iid_tuples, keep)
+    logging.info('Trimmed %r issues from the start of shards',
+                 self.num_skipped_at_start)
+    # Reverse sharded lists again to get back into forward order.
+    _ReverseShards(self.filtered_iids)
+
+  def DetermineIssuePosition(self, issue):
+    """Calculate info needed to show the issue flipper.
+
+    Args:
+      issue: The issue currently being viewed.
+
+    Returns:
+      A 3-tuple (prev_iid, index, next_iid) where prev_iid is the
+      IID of the previous issue in the total ordering (or None),
+      index is the index that the current issue has in the total
+      ordering, and next_iid is the next issue (or None).  If the current
+      issue is not in the list of results at all, returns None, None, None.
+    """
+    # 1. If the current issue is not in the results at all, then exit.
+    if not any(issue.issue_id in filtered_shard_iids
+               for filtered_shard_iids in self.filtered_iids.values()):
+      return None, None, None
+
+    # 2. Choose and retrieve sample issues in each shard.
+    samples_by_shard, _ = self._FetchAllSamples(self.filtered_iids)
+
+    # 3. Build up partial results for each shard.
+    preceding_counts = {}  # dict {shard_key: num_issues_preceding_current}
+    prev_candidates, next_candidates = [], []
+    for shard_key in self.filtered_iids:
+      prev_candidate, index_in_shard, next_candidate = (
+          self._DetermineIssuePositionInShard(
+              shard_key, issue, samples_by_shard[shard_key]))
+      preceding_counts[shard_key] = index_in_shard
+      if prev_candidate:
+        prev_candidates.append(prev_candidate)
+      if next_candidate:
+        next_candidates.append(next_candidate)
+
+    # 4. Combine the results.
+    index = sum(preceding_counts.values())
+    prev_candidates = _SortIssues(
+        prev_candidates, self.harmonized_config, self.users_by_id,
+        self.group_by_spec, self.sort_spec)
+    prev_iid = prev_candidates[-1].issue_id if prev_candidates else None
+    next_candidates = _SortIssues(
+        next_candidates, self.harmonized_config, self.users_by_id,
+        self.group_by_spec, self.sort_spec)
+    next_iid = next_candidates[0].issue_id if next_candidates else None
+
+    return prev_iid, index, next_iid
+
+  def _DetermineIssuePositionInShard(self, shard_key, issue, sample_dict):
+    """Determine where the given issue would fit into results from a shard."""
+    # See the design doc for details.  Basically, it first surveys the results
+    # to bound a range where the given issue would belong, then it fetches the
+    # issues in that range and sorts them.
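+    # Rough illustration (hypothetical): if the sorted on-hand samples look
+    # like [..., iid 40, <viewed issue>, iid 70, ...], then only the IIDs
+    # between iid 40 and iid 70 in this shard need to be fetched and sorted
+    # to find the exact neighbors and index of the viewed issue.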
+
+    filtered_shard_iids = self.filtered_iids[shard_key]
+
+    # 1. Select a sample of issues, leveraging ones we have in RAM already.
+    issues_on_hand = list(sample_dict.values())
+    if issue.issue_id not in sample_dict:
+      issues_on_hand.append(issue)
+
+    self._LookupNeededUsers(issues_on_hand)
+    sorted_on_hand = _SortIssues(
+        issues_on_hand, self.harmonized_config, self.users_by_id,
+        self.group_by_spec, self.sort_spec)
+    sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand]
+    index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id)
+
+    # 2. Bound the gap around where issue belongs.
+    if index_in_on_hand == 0:
+      fetch_start = 0
+    else:
+      prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1]
+      fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1
+
+    if index_in_on_hand == len(sorted_on_hand) - 1:
+      fetch_end = len(filtered_shard_iids)
+    else:
+      next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1]
+      fetch_end = filtered_shard_iids.index(next_on_hand_iid)
+
+    # 3. Retrieve all the issues in that gap to get an exact answer.
+    fetched_issues = self.services.issue.GetIssues(
+        self.cnxn, filtered_shard_iids[fetch_start:fetch_end])
+    if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]:
+      fetched_issues.append(issue)
+    self._LookupNeededUsers(fetched_issues)
+    sorted_fetched = _SortIssues(
+        fetched_issues, self.harmonized_config, self.users_by_id,
+        self.group_by_spec, self.sort_spec)
+    sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched]
+    index_in_fetched = sorted_fetched_iids.index(issue.issue_id)
+
+    # 4. Find the issues that come immediately before and after the place where
+    # the given issue would belong in this shard.
+    if index_in_fetched > 0:
+      prev_candidate = sorted_fetched[index_in_fetched - 1]
+    elif index_in_on_hand > 0:
+      prev_candidate = sorted_on_hand[index_in_on_hand - 1]
+    else:
+      prev_candidate = None
+
+    if index_in_fetched < len(sorted_fetched) - 1:
+      next_candidate = sorted_fetched[index_in_fetched + 1]
+    elif index_in_on_hand < len(sorted_on_hand) - 1:
+      next_candidate = sorted_on_hand[index_in_on_hand + 1]
+    else:
+      next_candidate = None
+
+    return prev_candidate, fetch_start + index_in_fetched, next_candidate
+
+  def _FetchAllSamples(self, filtered_iids):
+    """Return a dict {shard_key: {iid: sample_issue}}."""
+    samples_by_shard = {}  # {shard_key: {iid: sample_issue}}
+    sample_iids_to_shard = {}  # {iid: shard_key}
+    all_needed_iids = []  # List of iids to retrieve.
+
+    for shard_key in filtered_iids:
+      on_hand_issues, shard_needed_iids = self._ChooseSampleIssues(
+          filtered_iids[shard_key])
+      samples_by_shard[shard_key] = on_hand_issues
+      for iid in on_hand_issues:
+        sample_iids_to_shard[iid] = shard_key
+      for iid in shard_needed_iids:
+        sample_iids_to_shard[iid] = shard_key
+      all_needed_iids.extend(shard_needed_iids)
+
+    retrieved_samples, _misses = self.services.issue.GetIssuesDict(
+        self.cnxn, all_needed_iids)
+    for retrieved_iid, retrieved_issue in retrieved_samples.items():
+      retr_shard_key = sample_iids_to_shard[retrieved_iid]
+      samples_by_shard[retr_shard_key][retrieved_iid] = retrieved_issue
+
+    return samples_by_shard, sample_iids_to_shard
+
+  def _ChooseSampleIssues(self, issue_ids):
+    """Select a scattering of issues from the list, leveraging RAM cache.
+
+    Args:
+      issue_ids: A list of issue IDs that comprise the results in a shard.
+
+    Returns:
+      A pair (on_hand_issues, needed_iids) where on_hand_issues is
+      an issue dict {iid: issue} of issues already in RAM, and
+      needed_iids is a list of iids of issues that need to be retrieved.
+    """
+    on_hand_issues = {}  # {iid: issue} of sample issues already in RAM.
+    needed_iids = []  # [iid, ...] of sample issues not in RAM yet.
+    chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE,
+        int(len(issue_ids) // PREFERRED_NUM_CHUNKS)))
+    for i in range(chunk_size, len(issue_ids), chunk_size):
+      issue = self.services.issue.GetAnyOnHandIssue(
+          issue_ids, start=i, end=min(i + chunk_size, len(issue_ids)))
+      if issue:
+        on_hand_issues[issue.issue_id] = issue
+      else:
+        needed_iids.append(issue_ids[i])
+
+    return on_hand_issues, needed_iids
+
+  def _LookupNeededUsers(self, issues):
+    """Look up user info needed to sort issues, if any."""
+    with self.profiler.Phase('lookup of owner, reporter, and cc'):
+      additional_user_views_by_id = (
+          tracker_helpers.MakeViewsForUsersInIssues(
+              self.cnxn, issues, self.services.user,
+              omit_ids=list(self.users_by_id.keys())))
+      self.users_by_id.update(additional_user_views_by_id)
+
+  def Paginate(self):
+    """Fetch matching issues and paginate the search results.
+
+    These two actions are intertwined because we try to only
+    retrieve the Issues on the current pagination page.
+    """
+    # We already got the issues, just display a slice of the visible ones.
+    limit_reached = False
+    for shard_limit_reached in self.search_limit_reached.values():
+      limit_reached |= shard_limit_reached
+    self.pagination = paginate.ArtifactPagination(
+        self.allowed_results,
+        self.items_per_page,
+        self.paginate_start,
+        self.project_name,
+        urls.ISSUE_LIST,
+        total_count=self.total_count,
+        limit_reached=limit_reached,
+        skipped=self.num_skipped_at_start)
+    self.visible_results = self.pagination.visible_results
+
+    # If we were not forced to look up visible users already, do it now.
+    self._LookupNeededUsers(self.visible_results)
+
+  def __repr__(self):
+    """Return a string that shows the internal state of this pipeline."""
+    if self.allowed_iids:
+      shown_allowed_iids = self.allowed_iids[:200]
+    else:
+      shown_allowed_iids = self.allowed_iids
+
+    if self.allowed_results:
+      shown_allowed_results = self.allowed_results[:200]
+    else:
+      shown_allowed_results = self.allowed_results
+
+    parts = [
+        'allowed_iids: %r' % shown_allowed_iids,
+        'allowed_results: %r' % shown_allowed_results,
+        'len(visible_results): %r' % (
+            self.visible_results and len(self.visible_results))]
+    return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts))
+
+
+def _CheckQuery(
+    cnxn, services, query, harmonized_config, project_ids,
+    member_of_all_projects, warnings=None):
+  """Parse the given query and report the first error or None."""
+  try:
+    query_ast = query2ast.ParseUserQuery(
+        query, '', query2ast.BUILTIN_ISSUE_FIELDS, harmonized_config,
+        warnings=warnings)
+    query_ast = ast2ast.PreprocessAST(
+        cnxn, query_ast, project_ids, services, harmonized_config,
+        is_member=member_of_all_projects)
+  except query2ast.InvalidQueryError as e:
+    return e.message
+  except ast2ast.MalformedQuery as e:
+    return e.message
+
+  return None
+
+
+def _MakeBackendCallback(func, *args):
+  # type: (Callable[[*Any], Any], *Any) -> Callable[[*Any], Any]
+  """Helper to store a particular function and argument set into a callback.
+
+  Args:
+    func: Function to callback.
+    *args: The arguments to pass into the function.
+
+  Returns:
+    Callback function based on specified arguments.
+  """
+  return lambda: func(*args)
+
+
+def _StartBackendSearch(
+    cnxn, query_project_names, query_project_ids, harmonized_config,
+    unfiltered_iids_dict, search_limit_reached_dict, nonviewable_iids,
+    error_responses, services, me_user_ids, logged_in_user_id, new_url_num,
+    subqueries, can, group_by_spec, sort_spec, warnings, use_cached_searches):
+  # type: (MonorailConnection, Sequence[str], Sequence[int],
+  #     proto.tracker_pb2.ProjectIssueConfig,
+  #     Mapping[Tuple(int, str), Sequence[int]],
+  #     Mapping[Tuple(int, str), bool],
+  #     Mapping[int, Collection[int]], Sequence[Tuple(int, str)],
+  #     Services, Sequence[int], int, int, Sequence[str], int, str, str,
+  #     Sequence[Tuple(str, Sequence[str])], bool) ->
+  #     Sequence[Tuple(int, Tuple(int, str),
+  #         google.appengine.api.apiproxy_stub_map.UserRPC)]
+  """Request that our backends search and return a list of matching issue IDs.
+
+  Args:
+    cnxn: monorail connection to the database.
+    query_project_names: set of project names to search.
+    query_project_ids: list of project IDs to search.
+    harmonized_config: combined ProjectIssueConfig for all projects being
+        searched.
+    unfiltered_iids_dict: dict {shard_key: [iid, ...]} of unfiltered search
+        results to accumulate into.  They need to be later filtered by
+        permissions and merged into filtered_iids_dict.
+    search_limit_reached_dict: dict {shard_key: bool} to determine if
+        the search limit of any shard was reached.
+    nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the
+        projects being searched that the signed in user cannot view.
+    error_responses: set of shard_keys for shards that encountered errors.
+    services: connections to backends.
+    me_user_ids: Empty list when no user is logged in, or user ID of the logged
+        in user when doing an interactive search, or the viewed user ID when
+        viewing someone else's dashboard, or the subscribing user's ID when
+        evaluating subscriptions.  And, any linked accounts.
+    logged_in_user_id: user_id of the logged in user, 0 otherwise
+    new_url_num: the number of issues for BackendSearchPipeline to query.
+        Computed based on pagination offset + number of items per page.
+    subqueries: split up list of query string segments.
+    can: "canned query" number to scope the user's search.
+    group_by_spec: string that lists the grouping order.
+    sort_spec: string that lists the sort order.
+    warnings: list to accumulate warning messages.
+    use_cached_searches: Bool for whether to use cached searches.
+
+  Returns:
+    A list of rpc_tuples that can be passed to _FinishBackendSearch to wait
+    on any remaining backend calls.
+
+  SIDE-EFFECTS:
+    Any data found in memcache is immediately put into unfiltered_iids_dict.
+    As the backends finish their work, _HandleBackendSearchResponse will update
+    unfiltered_iids_dict for those shards.
+
+    Any warnings produced throughout this process will be added to the list
+    warnings.
+  """
+  rpc_tuples = []
+  needed_shard_keys = set()
+  for subquery in subqueries:
+    subquery, subquery_warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
+        me_user_ids, subquery)
+    warnings.extend(subquery_warnings)
+    for shard_id in range(settings.num_logical_shards):
+      needed_shard_keys.add((shard_id, subquery))
+
+  # 1. Get whatever we can from memcache.  Cache hits are only kept if they are
+  # not already expired.
+  project_shard_timestamps = _GetProjectTimestamps(
+      query_project_ids, needed_shard_keys)
+
+  if use_cached_searches:
+    cached_unfiltered_iids_dict, cached_search_limit_reached_dict = (
+        _GetCachedSearchResults(
+            cnxn, query_project_ids, needed_shard_keys,
+            harmonized_config, project_shard_timestamps, services, me_user_ids,
+            can, group_by_spec, sort_spec, warnings))
+    unfiltered_iids_dict.update(cached_unfiltered_iids_dict)
+    search_limit_reached_dict.update(cached_search_limit_reached_dict)
+  for cache_hit_shard_key in unfiltered_iids_dict:
+    needed_shard_keys.remove(cache_hit_shard_key)
+
+  # 2. Each kept cache hit has unfiltered IIDs, so fetch the sets of
+  # non-viewable IIDs needed to filter them by permissions later.
+  _GetNonviewableIIDs(
+    query_project_ids, logged_in_user_id,
+    set(range(settings.num_logical_shards)),
+    rpc_tuples, nonviewable_iids, project_shard_timestamps,
+    services.cache_manager.processed_invalidations_up_to,
+    use_cached_searches)
+
+  # 3. Hit backends for any shards that are still needed.  When these results
+  # come back, they are also put into unfiltered_iids_dict.
+  for shard_key in needed_shard_keys:
+    rpc = _StartBackendSearchCall(
+        query_project_names,
+        shard_key,
+        services.cache_manager.processed_invalidations_up_to,
+        me_user_ids,
+        logged_in_user_id,
+        new_url_num,
+        can=can,
+        sort_spec=sort_spec,
+        group_by_spec=group_by_spec)
+    rpc_tuple = (time.time(), shard_key, rpc)
+    rpc.callback = _MakeBackendCallback(
+        _HandleBackendSearchResponse, query_project_names, rpc_tuple,
+        rpc_tuples, settings.backend_retries, unfiltered_iids_dict,
+        search_limit_reached_dict,
+        services.cache_manager.processed_invalidations_up_to, error_responses,
+        me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
+        group_by_spec)
+    rpc_tuples.append(rpc_tuple)
+
+  return rpc_tuples
+
+
+def _FinishBackendSearch(rpc_tuples):
+  """Wait for all backend calls to complete, including any retries."""
+  while rpc_tuples:
+    active_rpcs = [rpc for (_time, _shard_key, rpc) in rpc_tuples]
+    # Wait for any active RPC to complete.  Its callback function will
+    # automatically be called.
+    finished_rpc = real_wait_any(active_rpcs)
+    # Figure out which rpc_tuple finished and remove it from our list.
+    for rpc_tuple in rpc_tuples:
+      _time, _shard_key, rpc = rpc_tuple
+      if rpc == finished_rpc:
+        rpc_tuples.remove(rpc_tuple)
+        break
+    else:
+      raise ValueError('We somehow finished an RPC that is not in rpc_tuples')
+
+
+def real_wait_any(active_rpcs):
+  """Work around the blocking nature of wait_any().
+
+  wait_any() checks for any finished RPCs, and returns one if found.
+  If no RPC is finished, it simply blocks on the last RPC in the list.
+  This is not the desired behavior because we are not able to detect
+  FAST-FAIL RPC results and retry them if wait_any() is blocked on a
+  request that is taking a long time to do actual work.
+
+  Instead, we do the same check, without blocking on any individual RPC.
+  """
+  if settings.local_mode:
+    # The development server has very different code for RPCs than the
+    # code used in the hosted environment.
+    return apiproxy_stub_map.UserRPC.wait_any(active_rpcs)
+  while True:
+    finished, _ = apiproxy_stub_map.UserRPC._UserRPC__check_one(active_rpcs)
+    if finished:
+      return finished
+    time.sleep(DELAY_BETWEEN_RPC_COMPLETION_POLLS)
+
+
+def _GetProjectTimestamps(query_project_ids, needed_shard_keys):
+  """Get a dict of modified_ts values for all specified project-shards."""
+  project_shard_timestamps = {}
+  if query_project_ids:
+    keys = []
+    for pid in query_project_ids:
+      for sid, _subquery in needed_shard_keys:
+        keys.append('%d;%d' % (pid, sid))
+  else:
+    keys = [('all;%d' % sid)
+            for sid, _subquery in needed_shard_keys]
+
+  timestamps_for_project = memcache.get_multi(
+      keys=keys, namespace=settings.memcache_namespace)
+  for key, timestamp in timestamps_for_project.items():
+    pid_str, sid_str = key.split(';')
+    if pid_str == 'all':
+      project_shard_timestamps['all', int(sid_str)] = timestamp
+    else:
+      project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp
+
+  return project_shard_timestamps
+
+
+def _GetNonviewableIIDs(
+    query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples,
+    nonviewable_iids, project_shard_timestamps, invalidation_timestep,
+    use_cached_searches):
+  """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones."""
+  if query_project_ids:
+    keys = []
+    for pid in query_project_ids:
+      for sid in needed_shard_ids:
+        keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid))
+  else:
+    keys = [
+        ('all;%d;%d' % (logged_in_user_id, sid)) for sid in needed_shard_ids
+    ]
+
+  if use_cached_searches:
+    cached_dict = memcache.get_multi(
+        keys, key_prefix='nonviewable:', namespace=settings.memcache_namespace)
+  else:
+    cached_dict = {}
+
+  for sid in needed_shard_ids:
+    if query_project_ids:
+      for pid in query_project_ids:
+        _AccumulateNonviewableIIDs(
+            pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
+            project_shard_timestamps, rpc_tuples, invalidation_timestep)
+    else:
+      _AccumulateNonviewableIIDs(
+          None, logged_in_user_id, sid, cached_dict, nonviewable_iids,
+          project_shard_timestamps, rpc_tuples, invalidation_timestep)
+
+
+def _AccumulateNonviewableIIDs(
+    pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
+    project_shard_timestamps, rpc_tuples, invalidation_timestep):
+  """Use one of the retrieved cache entries or call a backend if needed."""
+  if pid is None:
+    key = 'all;%d;%d' % (logged_in_user_id, sid)
+  else:
+    key = '%d;%d;%d' % (pid, logged_in_user_id, sid)
+
+  if key in cached_dict:
+    issue_ids, cached_ts = cached_dict.get(key)
+    modified_ts = project_shard_timestamps.get((pid, sid))
+    if modified_ts is None or modified_ts > cached_ts:
+      logging.info('nonviewable too stale on (project %r, shard %r)',
+                   pid, sid)
+    else:
+      logging.info('adding %d nonviewable issue_ids', len(issue_ids))
+      nonviewable_iids[sid] = set(issue_ids)
+
+  if sid not in nonviewable_iids:
+    logging.info('nonviewable for %r not found', key)
+    logging.info('starting backend call for nonviewable iids %r', key)
+    rpc = _StartBackendNonviewableCall(
+      pid, logged_in_user_id, sid, invalidation_timestep)
+    rpc_tuple = (time.time(), sid, rpc)
+    rpc.callback = _MakeBackendCallback(
+        _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid,
+        rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids,
+        invalidation_timestep)
+    rpc_tuples.append(rpc_tuple)
+
+
+def _GetCachedSearchResults(
+    cnxn, query_project_ids, needed_shard_keys, harmonized_config,
+    project_shard_timestamps, services, me_user_ids, can, group_by_spec,
+    sort_spec, warnings):
+  """Return a dict of cached search results that are not already stale.
+
+  If it were not for cross-project search, we would simply cache when we do a
+  search and then invalidate when an issue is modified.  But, with
+  cross-project search we don't know all the memcache entries that would
+  need to be invalidated.  So, instead, we write the search result cache
+  entries and then an initial modified_ts value for each project if it was
+  not already there. And, when we update an issue we write a new
+  modified_ts entry, which implicitly invalidates all search result
+  cache entries that were written earlier because they are now stale.  When
+  reading from the cache, we ignore any cached result whose timestamp is
+  older than the modified_ts of any query project, because it is stale.
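+
+  For example (illustrative values only): if memcache says project 16's
+  shard 3 was last invalidated at modified_ts 1005, then any cached search
+  result covering project 16 on shard 3 whose cache timestamp is earlier
+  than 1005 is treated as stale and ignored.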
+
+  Args:
+    cnxn: monorail connection to the database.
+    query_project_ids: list of project ID numbers for all projects being
+        searched.
+    needed_shard_keys: set of shard keys that need to be checked.
+    harmonized_config: ProjectIssueConfig with combined information for all
+        projects involved in this search.
+    project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...}
+        that tells when each shard was last invalidated.
+    services: connections to backends.
+    me_user_ids: Empty list when no user is logged in, or user ID of the logged
+        in user when doing an interactive search, or the viewed user ID when
+        viewing someone else's dashboard, or the subscribing user's ID when
+        evaluating subscriptions.  And, any linked accounts.
+    can: "canned query" number to scope the user's search.
+    group_by_spec: string that lists the grouping order.
+    sort_spec: string that lists the sort order.
+    warnings: list to accumulate warning messages.
+
+  Returns:
+    Tuple consisting of:
+      A dictionary {shard_key: [issue_id, ...], ...} of unfiltered search
+      result issue IDs. Only shard_keys found in memcache will be in that
+      dictionary.
+      The result issue IDs must be permission checked before they can be
+      considered to be part of the user's result set.
+      A dictionary {shard_key: bool, ...}. The boolean is set to True if
+      the search results limit of the shard is hit.
+  """
+  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
+  projects_str = projects_str or 'all'
+  canned_query = savedqueries_helpers.SavedQueryIDToCond(
+      cnxn, services.features, can)
+  canned_query, canned_query_warnings = (
+      searchpipeline.ReplaceKeywordsWithUserIDs(me_user_ids, canned_query))
+  warnings.extend(canned_query_warnings)
+
+  sd = sorting.ComputeSortDirectives(
+      harmonized_config, group_by_spec, sort_spec)
+  sd_str = ' '.join(sd)
+  memcache_key_prefix = '%s;%s' % (projects_str, canned_query)
+  limit_reached_key_prefix = '%s;%s' % (projects_str, canned_query)
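+  # For example (purely illustrative values): for projects [16, 18], a canned
+  # query of 'is:open', subquery 'Type=Defect', sort directives 'priority id',
+  # and shard 3, the cache key would look like
+  # '16,18;is:open;Type=Defect;priority id;3', and the corresponding limit
+  # key would insert ';search_limit_reached' before the shard number.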
+
+  cached_dict = memcache.get_multi(
+      ['%s;%s;%s;%d' % (memcache_key_prefix, subquery, sd_str, sid)
+       for sid, subquery in needed_shard_keys],
+      namespace=settings.memcache_namespace)
+  cached_search_limit_reached_dict = memcache.get_multi(
+      ['%s;%s;%s;search_limit_reached;%d' % (
+          limit_reached_key_prefix, subquery, sd_str, sid)
+       for sid, subquery in needed_shard_keys],
+      namespace=settings.memcache_namespace)
+
+  unfiltered_dict = {}
+  search_limit_reached_dict = {}
+  for shard_key in needed_shard_keys:
+    shard_id, subquery = shard_key
+    memcache_key = '%s;%s;%s;%d' % (
+        memcache_key_prefix, subquery, sd_str, shard_id)
+    limit_reached_key = '%s;%s;%s;search_limit_reached;%d' % (
+        limit_reached_key_prefix, subquery, sd_str, shard_id)
+    if memcache_key not in cached_dict:
+      logging.info('memcache miss on shard %r', shard_key)
+      continue
+
+    cached_iids, cached_ts = cached_dict[memcache_key]
+    if cached_search_limit_reached_dict.get(limit_reached_key):
+      search_limit_reached, _ = cached_search_limit_reached_dict[
+          limit_reached_key]
+    else:
+      search_limit_reached = False
+
+    stale = False
+    if query_project_ids:
+      for project_id in query_project_ids:
+        modified_ts = project_shard_timestamps.get((project_id, shard_id))
+        if modified_ts is None or modified_ts > cached_ts:
+          stale = True
+          logging.info('memcache too stale on shard %r because of %r',
+                       shard_id, project_id)
+          break
+    else:
+      modified_ts = project_shard_timestamps.get(('all', shard_id))
+      if modified_ts is None or modified_ts > cached_ts:
+        stale = True
+        logging.info('memcache too stale on shard %r because of all',
+                     shard_id)
+
+    if not stale:
+      unfiltered_dict[shard_key] = cached_iids
+      search_limit_reached_dict[shard_key] = search_limit_reached
+
+  return unfiltered_dict, search_limit_reached_dict
+
+
+def _MakeBackendRequestHeaders(failfast):
+  headers = {
+    # This is needed to allow frontends to talk to backends without going
+    # through a login screen on googleplex.com.
+    # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs
+    'X-URLFetch-Service-Id': 'GOOGLEPLEX',
+    }
+  if failfast:
+    headers['X-AppEngine-FailFast'] = 'Yes'
+  return headers
+
+
+def _StartBackendSearchCall(
+    query_project_names,
+    shard_key,
+    invalidation_timestep,
+    me_user_ids,
+    logged_in_user_id,
+    new_url_num,
+    can=None,
+    sort_spec=None,
+    group_by_spec=None,
+    deadline=None,
+    failfast=True):
+  # type: (Sequence[str], Tuple(int, str), int, Sequence[int], int,
+  #     int, int, str, str, int, bool) ->
+  #     google.appengine.api.apiproxy_stub_map.UserRPC
+  """Ask a backend to query one shard of the database.
+
+  Args:
+    query_project_names: List of project names queried.
+    shard_key: Tuple specifying which DB shard to query.
+    invalidation_timestep: int timestep used to keep cached items fresh.
+    me_user_ids: Empty list when no user is logged in, or user ID of the logged
+        in user when doing an interactive search, or the viewed user ID when
+        viewing someone else's dashboard, or the subscribing user's ID when
+        evaluating subscriptions.  And, any linked accounts.
+    logged_in_user_id: Id of the logged in user.
+    new_url_num: the number of issues for BackendSearchPipeline to query.
+        Computed based on pagination offset + number of items per page.
+    can: Id of the canned query to use.
+    sort_spec: Str specifying how issues should be sorted.
+    group_by_spec: Str specifying how issues should be grouped.
+    deadline: Max time for the RPC to take before failing.
+    failfast: Whether to set the X-AppEngine-FailFast request header.
+
+  Returns:
+    UserRPC for the created RPC call.
+  """
+  shard_id, subquery = shard_key
+  backend_host = modules.get_hostname(module='besearch')
+  url = 'http://%s%s' % (
+      backend_host,
+      framework_helpers.FormatURL(
+          [],
+          urls.BACKEND_SEARCH,
+          projects=','.join(query_project_names),
+          q=subquery,
+          start=0,
+          num=new_url_num,
+          can=can,
+          sort=sort_spec,
+          groupby=group_by_spec,
+          logged_in_user_id=logged_in_user_id,
+          me_user_ids=','.join(str(uid) for uid in me_user_ids),
+          shard_id=shard_id,
+          invalidation_timestep=invalidation_timestep))
+  logging.info('\n\nCalling backend: %s', url)
+  rpc = urlfetch.create_rpc(
+      deadline=deadline or settings.backend_deadline)
+  headers = _MakeBackendRequestHeaders(failfast)
+  # follow_redirects=False is needed to avoid a login screen on googleplex.
+  urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
+  return rpc
+
+
+def _StartBackendNonviewableCall(
+    project_id, logged_in_user_id, shard_id, invalidation_timestep,
+    deadline=None, failfast=True):
+  """Ask a backend to query one shard of the database."""
+  backend_host = modules.get_hostname(module='besearch')
+  url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL(
+      None, urls.BACKEND_NONVIEWABLE,
+      project_id=project_id or '',
+      logged_in_user_id=logged_in_user_id or '',
+      shard_id=shard_id,
+      invalidation_timestep=invalidation_timestep))
+  logging.info('Calling backend nonviewable: %s', url)
+  rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline)
+  headers = _MakeBackendRequestHeaders(failfast)
+  # follow_redirects=False is needed to avoid a login screen on googleplex.
+  urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
+  return rpc
+
+
+def _HandleBackendSearchResponse(
+    query_project_names, rpc_tuple, rpc_tuples, remaining_retries,
+    unfiltered_iids, search_limit_reached, invalidation_timestep,
+    error_responses, me_user_ids, logged_in_user_id, new_url_num, can,
+    sort_spec, group_by_spec):
+  # type: (Sequence[str], Tuple(int, Tuple(int, str),
+  #         google.appengine.api.apiproxy_stub_map.UserRPC),
+  #     Sequence[Tuple(int, Tuple(int, str),
+  #         google.appengine.api.apiproxy_stub_map.UserRPC)],
+  #     int, Mapping[Tuple(int, str), Sequence[int]],
+  #     Mapping[Tuple(int, str), bool], int, Collection[Tuple(int, str)],
+  #     Sequence[int], int, int, int, str, str) -> None
+  #
+  """Process one backend response and retry if there was an error.
+
+  SIDE EFFECTS: This function edits many of the passed in parameters in place.
+    For example, search_limit_reached and unfiltered_iids are updated with
+    response data from the RPC, keyed by shard_key.
+
+  Args:
+    query_project_names: List of projects to query.
+    rpc_tuple: Tuple containing an RPC response object, the time it happened,
+      and what shard the RPC was queried against.
+    rpc_tuples: List of RPC responses to mutate with any retry responses that
+      happened.
+    remaining_retries: Number of times left to retry.
+    unfiltered_iids: Dict of Issue ids, before they've been filtered by
+      permissions.
+    search_limit_reached: Dict of whether the search limit for a particular
+      shard has been hit.
+    invalidation_timestep: int timestep used to keep cached items fresh.
+    error_responses: Set of shard_keys for shards that encountered errors,
+      to accumulate into.
+    me_user_ids: List of relevant user IDs, i.e. the currently logged in user
+      and linked account IDs if applicable.
+    logged_in_user_id: Logged in user's ID.
+    new_url_num: the number of issues for BackendSearchPipeline to query.
+        Computed based on pagination offset + number of items per page.
+    can: Canned query ID to use.
+    sort_spec: str specifying how issues should be sorted.
+    group_by_spec: str specifying how issues should be grouped.
+  """
+  start_time, shard_key, rpc = rpc_tuple
+  duration_sec = time.time() - start_time
+
+  try:
+    response = rpc.get_result()
+    logging.info('call to backend took %d sec', duration_sec)
+    # Note that response.content has ")]}'\n" prepended to it.
+    json_content = response.content[5:]
+    logging.info('got json text: %r length %r',
+                 json_content[:framework_constants.LOGGING_MAX_LENGTH],
+                 len(json_content))
+    if json_content == '':
+      raise Exception('Fast fail')
+    json_data = json.loads(json_content)
+    unfiltered_iids[shard_key] = json_data['unfiltered_iids']
+    search_limit_reached[shard_key] = json_data['search_limit_reached']
+    if json_data.get('error'):
+      # Don't raise an exception, just log, because these errors are more like
+      # 400s than 500s, and shouldn't be retried.
+      logging.error('Backend shard %r returned error "%r"',
+                    shard_key, json_data.get('error'))
+      error_responses.add(shard_key)
+
+  except Exception as e:
+    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions.
+      logging.exception(e)
+    if not remaining_retries:
+      logging.error('backend search retries exceeded')
+      error_responses.add(shard_key)
+      return  # Used all retries, so give up.
+
+    if duration_sec >= settings.backend_deadline:
+      logging.error('backend search on %r took too long', shard_key)
+      error_responses.add(shard_key)
+      return  # That backend shard is overloaded, so give up.
+
+    logging.error('backend call for shard %r failed, retrying', shard_key)
+    retry_rpc = _StartBackendSearchCall(
+        query_project_names,
+        shard_key,
+        invalidation_timestep,
+        me_user_ids,
+        logged_in_user_id,
+        new_url_num,
+        can=can,
+        sort_spec=sort_spec,
+        group_by_spec=group_by_spec,
+        failfast=remaining_retries > 2)
+    retry_rpc_tuple = (time.time(), shard_key, retry_rpc)
+    retry_rpc.callback = _MakeBackendCallback(
+        _HandleBackendSearchResponse, query_project_names, retry_rpc_tuple,
+        rpc_tuples, remaining_retries - 1, unfiltered_iids,
+        search_limit_reached, invalidation_timestep, error_responses,
+        me_user_ids, logged_in_user_id, new_url_num, can, sort_spec,
+        group_by_spec)
+    rpc_tuples.append(retry_rpc_tuple)
+
+
+def _HandleBackendNonviewableResponse(
+    project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples,
+    remaining_retries, nonviewable_iids, invalidation_timestep):
+  """Process one backend response and retry if there was an error."""
+  start_time, shard_id, rpc = rpc_tuple
+  duration_sec = time.time() - start_time
+
+  try:
+    response = rpc.get_result()
+    logging.info('call to backend nonviewable took %d sec', duration_sec)
+    # Note that response.content has ")]}'\n" prepended to it.
+    json_content = response.content[5:]
+    logging.info('got json text: %r length %r',
+                 json_content[:framework_constants.LOGGING_MAX_LENGTH],
+                 len(json_content))
+    if json_content == '':
+      raise Exception('Fast fail')
+    json_data = json.loads(json_content)
+    nonviewable_iids[shard_id] = set(json_data['nonviewable'])
+
+  except Exception as e:
+    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions.
+      logging.exception(e)
+
+    if not remaining_retries:
+      logging.warn('Used all retries, so give up on shard %r', shard_id)
+      return
+
+    if duration_sec >= settings.backend_deadline:
+      logging.error('nonviewable call on %r took too long', shard_id)
+      return  # That backend shard is overloaded, so give up.
+
+    logging.error(
+      'backend nonviewable call for shard %r;%r;%r failed, retrying',
+      project_id, logged_in_user_id, shard_id)
+    retry_rpc = _StartBackendNonviewableCall(
+        project_id, logged_in_user_id, shard_id, invalidation_timestep,
+        failfast=remaining_retries > 2)
+    retry_rpc_tuple = (time.time(), shard_id, retry_rpc)
+    retry_rpc.callback = _MakeBackendCallback(
+        _HandleBackendNonviewableResponse, project_id, logged_in_user_id,
+        shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1,
+        nonviewable_iids, invalidation_timestep)
+    rpc_tuples.append(retry_rpc_tuple)
+
+
+def _TotalLength(sharded_iids):
+  """Return the total length of all issue_iids lists."""
+  return sum(len(issue_iids) for issue_iids in sharded_iids.values())
+
+
+def _ReverseShards(sharded_iids):
+  """Reverse each issue_iids list in place."""
+  for shard_key in sharded_iids:
+    sharded_iids[shard_key].reverse()
+
+
+def _TrimEndShardedIIDs(sharded_iids, sample_iid_tuples, num_needed):
+  """Trim the IIDs to keep at least num_needed items.
+
+  Args:
+    sharded_iids: dict {shard_key: issue_id_list} for search results.  This is
+        modified in place to remove some trailing issue IDs.
+    sample_iid_tuples: list of (iid, shard_key) from a sorted list of sample
+        issues.
+    num_needed: int minimum total number of items to keep.  Some IIDs that are
+        known to belong in positions > num_needed will be trimmed off.
+
+  Returns:
+    The total number of IIDs removed from the IID lists.
+  """
+  # 1. Get (sample_iid, position_in_shard) for each sample.
+  sample_positions = _CalcSamplePositions(sharded_iids, sample_iid_tuples)
+
+  # 2. Walk through the samples, computing a combined lower bound at each
+  # step until we know that we have passed at least num_needed IIDs.
+  lower_bound_per_shard = {}
+  excess_samples = []
+  for i in range(len(sample_positions)):
+    _sample_iid, sample_shard_key, pos = sample_positions[i]
+    lower_bound_per_shard[sample_shard_key] = pos
+    overall_lower_bound = sum(lower_bound_per_shard.values())
+    if overall_lower_bound >= num_needed:
+      excess_samples = sample_positions[i + 1:]
+      break
+  else:
+    return 0  # We went through all samples and never reached num_needed.
+
+  # 3. Truncate each shard at the first excess sample in that shard.
+  already_trimmed = set()
+  num_trimmed = 0
+  for _sample_iid, sample_shard_key, pos in excess_samples:
+    if sample_shard_key not in already_trimmed:
+      num_trimmed += len(sharded_iids[sample_shard_key]) - pos
+      sharded_iids[sample_shard_key] = sharded_iids[sample_shard_key][:pos]
+      already_trimmed.add(sample_shard_key)
+
+  return num_trimmed
+
+
+# TODO(jrobbins): Convert this to a python generator.
+def _CalcSamplePositions(sharded_iids, sample_iids):
+  """Return [(iid, shard_key, position_in_shard), ...] for each sample."""
+  # We keep track of how far index() has scanned in each shard to avoid
+  # starting over at position 0 when looking for the next sample in
+  # the same shard.
+  scan_positions = collections.defaultdict(lambda: 0)
+  sample_positions = []
+  for sample_iid, sample_shard_key in sample_iids:
+    try:
+      pos = sharded_iids.get(sample_shard_key, []).index(
+          sample_iid, scan_positions[sample_shard_key])
+      scan_positions[sample_shard_key] = pos
+      sample_positions.append((sample_iid, sample_shard_key, pos))
+    except ValueError:
+      pass
+
+  return sample_positions
+
+
+def _SortIssues(issues, config, users_by_id, group_by_spec, sort_spec):
+  """Sort the found issues based on the request and config values.
+
+  Args:
+    issues: A list of issues to be sorted.
+    config: A ProjectIssueConfig that could impact sort order.
+    users_by_id: dictionary {user_id: user_view,...} for all users who
+      participate in any issue in the entire list.
+    group_by_spec: string that lists the grouping order
+    sort_spec: string that lists the sort order
+
+  Returns:
+    A sorted list of issues, based on the given sort/group specs and config.
+  """
+  issues = sorting.SortArtifacts(
+      issues, config, tracker_helpers.SORTABLE_FIELDS,
+      tracker_helpers.SORTABLE_FIELDS_POSTPROCESSORS, group_by_spec,
+      sort_spec, users_by_id=users_by_id)
+  return issues