blob: 69fdc6bde42e23a4c63c6d072e7448c747fba64f [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""Backend issue issue search and sorting.
7
8Each of several "besearch" backend jobs manages one shard of the overall set
9of issues in the system. The backend search pipeline retrieves the issues
10that match the user query, puts them into memcache, and returns them to
11the frontend search pipeline.
12"""
13from __future__ import print_function
14from __future__ import division
15from __future__ import absolute_import
16
17import logging
18import re
19import time
20
21from google.appengine.api import memcache
22
23import settings
24from features import savedqueries_helpers
25from framework import authdata
26from framework import framework_constants
27from framework import framework_helpers
28from framework import sorting
29from framework import sql
30from proto import ast_pb2
31from proto import tracker_pb2
32from search import ast2ast
33from search import ast2select
34from search import ast2sort
35from search import query2ast
36from search import searchpipeline
37from services import tracker_fulltext
38from services import fulltext_helpers
39from tracker import tracker_bizobj
40
41
# Used in constructing the at-risk query.
# Matches labels of the form "Restrict-View-<anything>", case-insensitively.
AT_RISK_LABEL_RE = re.compile(r'^(restrict-view-.+)$', re.IGNORECASE)

# Limit on the number of list items to show in debug log statements
MAX_LOG = 200
47
48
class BackendSearchPipeline(object):
  """Manage the process of issue search, including Promises and caching.

  Even though the code is divided into several methods, the public
  methods should be called in sequence, so the execution of the code
  is pretty much in the order of the source code lines here.
  """

  def __init__(
      self, mr, services, default_results_per_page,
      query_project_names, logged_in_user_id, me_user_ids):
    """Gather request info and set up the search Promise.

    Args:
      mr: commonly used info parsed from the request.
      services: connections to backend storage services.
      default_results_per_page: int default number of results per page.
      query_project_names: list of project names mentioned in the query.
      logged_in_user_id: int user ID of the signed-in user, used to
          reconstruct auth info on this backend.
      me_user_ids: list of int user IDs to substitute for "me" keywords.
    """
    self.mr = mr
    self.services = services
    self.default_results_per_page = default_results_per_page

    # Resolve project names to project PBs and their int project IDs.
    self.query_project_list = list(services.project.GetProjectsByName(
        mr.cnxn, query_project_names).values())
    self.query_project_ids = [
        p.project_id for p in self.query_project_list]

    self.me_user_ids = me_user_ids
    # Backends rebuild the requester's auth info from the user ID passed
    # along by the frontend, rather than from request credentials.
    self.mr.auth = authdata.AuthData.FromUserID(
        mr.cnxn, logged_in_user_id, services)

    # The following fields are filled in as the pipeline progresses.
    # The value None means that we still need to compute that value.
    self.result_iids = None  # Sorted issue IDs that match the query
    self.search_limit_reached = False  # True if search results limit is hit.
    self.error = None

    self._MakePromises()

  def _MakePromises(self):
    """Prepare the query terms and create the result-IIDs Promise."""
    # Harmonize the configs of all queried projects into one combined config.
    config_dict = self.services.config.GetProjectConfigs(
        self.mr.cnxn, self.query_project_ids)
    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
        list(config_dict.values()))

    # Expand the canned query ID (drop-down menu choice) into query text.
    self.canned_query = savedqueries_helpers.SavedQueryIDToCond(
        self.mr.cnxn, self.services.features, self.mr.can)

    # Replace keywords such as "me" with concrete user IDs in both the
    # canned query and the user-typed query, accumulating any warnings.
    self.canned_query, warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
        self.me_user_ids, self.canned_query)
    self.mr.warnings.extend(warnings)
    self.user_query, warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
        self.me_user_ids, self.mr.query)
    self.mr.warnings.extend(warnings)
    logging.debug('Searching query: %s %s', self.canned_query, self.user_query)

    # Extra WHERE term that narrows the SQL query to this logical shard.
    slice_term = ('Issue.shard = %s', [self.mr.shard_id])

    sd = sorting.ComputeSortDirectives(
        self.harmonized_config, self.mr.group_by_spec, self.mr.sort_spec)

    # The Promise wraps the actual search; SearchForIIDs() waits on it.
    self.result_iids_promise = framework_helpers.Promise(
        _GetQueryResultIIDs, self.mr.cnxn,
        self.services, self.canned_query, self.user_query,
        self.query_project_ids, self.harmonized_config, sd,
        slice_term, self.mr.shard_id, self.mr.invalidation_timestep)

  def SearchForIIDs(self):
    """Wait for the search Promises and store their results."""
    with self.mr.profiler.Phase('WaitOnPromises'):
      self.result_iids, self.search_limit_reached, self.error = (
          self.result_iids_promise.WaitAndGetValue())
115
116
def SearchProjectCan(
    cnxn, services, project_ids, query_ast, shard_id, harmonized_config,
    left_joins=None, where=None, sort_directives=None, query_desc=''):
  """Return a list of issue global IDs in the projects that satisfy the query.

  Args:
    cnxn: Regular database connection to the primary DB.
    services: interface to issue storage backends.
    project_ids: list of int IDs of the project to search
    query_ast: A QueryAST PB with conjunctions and conditions.
    shard_id: limit search to the specified shard ID int.
    harmonized_config: harmonized config for all projects being searched.
    left_joins: SQL LEFT JOIN clauses that are needed in addition to
        anything generated from the query_ast.
    where: SQL WHERE clauses that are needed in addition to
        anything generated from the query_ast.
    sort_directives: list of strings specifying the columns to sort on.
    query_desc: descriptive string for debugging.

  Returns:
    (issue_ids, capped, error) where issue_ids is a list of issue issue_ids
    that satisfy the query, capped is True if the number of results were
    capped due to an implementation limit, and error is any well-known error
    (probably a query parsing error) encountered during search.
  """
  logging.info('searching projects %r for AST %r', project_ids, query_ast)
  start_time = time.time()
  left_joins = left_joins or []
  where = where or []
  if project_ids:
    cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
    where.append((cond_str, project_ids))

  try:
    # Simplify the AST (e.g., resolve label/user names to IDs) and turn it
    # into SQL JOIN and WHERE fragments.
    query_ast = ast2ast.PreprocessAST(
        cnxn, query_ast, project_ids, services, harmonized_config)
    logging.info('simplified AST is %r', query_ast)
    query_left_joins, query_where, _ = ast2select.BuildSQLQuery(query_ast)
    left_joins.extend(query_left_joins)
    where.extend(query_where)
  except ast2ast.MalformedQuery as e:
    # TODO(jrobbins): inform the user that their query had invalid tokens.
    # Use str(e) rather than e.message: BaseException.message does not
    # exist in Python 3.
    logging.info('Invalid query tokens %s.\n %r\n\n', str(e), query_ast)
    return [], False, e
  except ast2select.NoPossibleResults as e:
    # TODO(jrobbins): inform the user that their query was impossible.
    logging.info('Impossible query %s.\n %r\n\n', str(e), query_ast)
    return [], False, e
  logging.info('translated to left_joins %r', left_joins)
  logging.info('translated to where %r', where)

  fts_capped = False
  if query_ast.conjunctions:
    # TODO(jrobbins): Handle "OR" in queries. For now, we just process the
    # first conjunction.
    assert len(query_ast.conjunctions) == 1
    conj = query_ast.conjunctions[0]
    full_text_iids, fts_capped = tracker_fulltext.SearchIssueFullText(
        project_ids, conj, shard_id)
    if full_text_iids is not None:
      if not full_text_iids:
        return [], False, None  # No match on fulltext, so don't bother DB.
      cond_str = 'Issue.id IN (%s)' % sql.PlaceHolders(full_text_iids)
      where.append((cond_str, full_text_iids))

  # Gather label and status definitions only when sorting needs them.
  label_def_rows = []
  status_def_rows = []
  if sort_directives:
    if project_ids:
      for pid in project_ids:
        label_def_rows.extend(services.config.GetLabelDefRows(cnxn, pid))
        status_def_rows.extend(services.config.GetStatusDefRows(cnxn, pid))
    else:
      label_def_rows = services.config.GetLabelDefRowsAnyProject(cnxn)
      status_def_rows = services.config.GetStatusDefRowsAnyProject(cnxn)

  harmonized_labels = tracker_bizobj.HarmonizeLabelOrStatusRows(
      label_def_rows)
  harmonized_statuses = tracker_bizobj.HarmonizeLabelOrStatusRows(
      status_def_rows)
  harmonized_fields = harmonized_config.field_defs
  sort_left_joins, order_by = ast2sort.BuildSortClauses(
      sort_directives, harmonized_labels, harmonized_statuses,
      harmonized_fields)
  logging.info('translated to sort left_joins %r', sort_left_joins)
  logging.info('translated to order_by %r', order_by)

  issue_ids, db_capped = services.issue.RunIssueQuery(
      cnxn, left_joins + sort_left_joins, where, order_by, shard_id=shard_id)
  # logging.warn is a deprecated alias; logging.warning is the real name.
  logging.warning(
      'executed "%s" query %r for %d issues in %dms',
      query_desc, query_ast, len(issue_ids),
      int((time.time() - start_time) * 1000))
  capped = fts_capped or db_capped
  return issue_ids, capped, None
211
def _FilterSpam(query_ast):
  """Ensure the query filters out spam unless it explicitly mentions it.

  Args:
    query_ast: QueryAST PB with the parsed user query.

  Returns:
    The same QueryAST, with a `spam != ...` condition appended to the first
    conjunction if the query did not already reference the spam field.
  """
  # TODO(jrobbins): Handle "OR" in queries. For now, we just modify the
  # first conjunction.
  if not query_ast.conjunctions:
    # Guard against an AST with no conjunctions; previously this would
    # raise IndexError on conjunctions[0].
    return query_ast
  conjunction = query_ast.conjunctions[0]
  uses_spam = any(
      field.field_name == 'spam'
      for condition in conjunction.conds
      for field in condition.field_defs)

  if not uses_spam:
    conjunction.conds.append(
        ast_pb2.MakeCond(
            ast_pb2.QueryOp.NE,
            [tracker_pb2.FieldDef(
                field_name='spam',
                field_type=tracker_pb2.FieldTypes.BOOL_TYPE)
            ],
            [], []))

  return query_ast
233
def _GetQueryResultIIDs(
    cnxn, services, canned_query, user_query,
    query_project_ids, harmonized_config, sd, slice_term,
    shard_id, invalidation_timestep):
  """Do a search and return a list of matching issue IDs.

  Args:
    cnxn: connection to the database.
    services: interface to issue storage backends.
    canned_query: string part of the query from the drop-down menu.
    user_query: string part of the query that the user typed in.
    query_project_ids: list of project IDs to search.
    harmonized_config: combined configs for all the queried projects.
    sd: list of sort directives.
    slice_term: additional query term to narrow results to a logical shard
        within a physical shard.
    shard_id: int number of the database shard to search.
    invalidation_timestep: int timestep used to keep memcached items fresh.

  Returns:
    Tuple consisting of:
      A list of issue issue_ids that match the user's query. An empty list, [],
      is returned if no issues match the query.
      Boolean that is set to True if the search results limit of this shard is
      hit.
      An error (subclass of Exception) encountered during query processing. None
      means that no error was encountered.
  """
  query_ast = _FilterSpam(query2ast.ParseUserQuery(
      user_query, canned_query, query2ast.BUILTIN_ISSUE_FIELDS,
      harmonized_config))

  logging.info('query_project_ids is %r', query_project_ids)

  # Full-text results can change without an invalidation event, so cache
  # them for a shorter time than purely structured query results.
  is_fulltext_query = bool(
      query_ast.conjunctions and
      fulltext_helpers.BuildFTSQuery(
          query_ast.conjunctions[0], tracker_fulltext.ISSUE_FULLTEXT_FIELDS))
  expiration = framework_constants.CACHE_EXPIRATION
  if is_fulltext_query:
    expiration = framework_constants.FULLTEXT_MEMCACHE_EXPIRATION

  # Might raise ast2ast.MalformedQuery or ast2select.NoPossibleResults.
  result_iids, search_limit_reached, error = SearchProjectCan(
      cnxn, services, query_project_ids, query_ast, shard_id,
      harmonized_config, sort_directives=sd, where=[slice_term],
      query_desc='getting query issue IDs')
  logging.info('Found %d result_iids', len(result_iids))
  if error:
    # logging.warn is a deprecated alias; logging.warning is the real name.
    logging.warning('Got error %r', error)

  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
  projects_str = projects_str or 'all'
  memcache_key = ';'.join([
      projects_str, canned_query, user_query, ' '.join(sd), str(shard_id)])
  memcache.set(memcache_key, (result_iids, invalidation_timestep),
               time=expiration, namespace=settings.memcache_namespace)
  logging.info('set memcache key %r', memcache_key)

  search_limit_memcache_key = ';'.join([
      projects_str, canned_query, user_query, ' '.join(sd),
      'search_limit_reached', str(shard_id)])
  memcache.set(search_limit_memcache_key,
               (search_limit_reached, invalidation_timestep),
               time=expiration, namespace=settings.memcache_namespace)
  logging.info('set search limit memcache key %r',
               search_limit_memcache_key)

  # Initialize any missing invalidation timestamps for the project shards
  # (or the "all projects" shard) touched by this query.  All keys use ';'
  # as the separator: the old lookup key 'all:%d' used ':' and so never
  # matched the 'all;%d' key written below, causing that timestamp to be
  # reset on every query.
  if query_project_ids:
    project_shard_keys = [
        '%d;%d' % (pid, shard_id) for pid in query_project_ids]
  else:
    project_shard_keys = ['all;%d' % shard_id]
  timestamps_for_projects = memcache.get_multi(
      keys=project_shard_keys, namespace=settings.memcache_namespace)
  for key in project_shard_keys:
    if key not in timestamps_for_projects:
      memcache.set(
          key,
          invalidation_timestep,
          time=framework_constants.CACHE_EXPIRATION,
          namespace=settings.memcache_namespace)

  return result_iids, search_limit_reached, error