# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

5"""Backend issue issue search and sorting.
6
7Each of several "besearch" backend jobs manages one shard of the overall set
8of issues in the system. The backend search pipeline retrieves the issues
9that match the user query, puts them into memcache, and returns them to
10the frontend search pipeline.
11"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import logging
import re
import time

from google.appengine.api import memcache

import settings
from features import savedqueries_helpers
from framework import authdata
from framework import framework_constants
from framework import framework_helpers
from framework import sorting
from framework import sql
from mrproto import ast_pb2
from mrproto import tracker_pb2
from search import ast2ast
from search import ast2select
from search import ast2sort
from search import query2ast
from search import searchpipeline
from services import tracker_fulltext
from services import fulltext_helpers
from tracker import tracker_bizobj


# Used in constructing the at-risk query.
AT_RISK_LABEL_RE = re.compile(r'^(restrict-view-.+)$', re.IGNORECASE)

# Limit on the number of list items to show in debug log statements.
MAX_LOG = 200


class BackendSearchPipeline(object):
  """Manage the process of issue search, including Promises and caching.

  Even though the code is divided into several methods, the public
  methods should be called in sequence, so the execution of the code
  is pretty much in the order of the source code lines here.
  """

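  # Illustrative usage, a sketch only: `mr` (request info) and `services`
  # (storage backends) come from the surrounding framework, and the
  # argument values below are made up.
  #
  #   pipeline = BackendSearchPipeline(
  #       mr, services, default_results_per_page=100,
  #       query_project_names=['proj-a'], logged_in_user_id=111,
  #       me_user_ids=[111])
  #   pipeline.SearchForIIDs()
  #   # pipeline.result_iids now holds sorted matching issue IDs, or
  #   # pipeline.error explains why the search failed.
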
  def __init__(
      self, mr, services, default_results_per_page,
      query_project_names, logged_in_user_id, me_user_ids):

    self.mr = mr
    self.services = services
    self.default_results_per_page = default_results_per_page

    self.query_project_list = list(services.project.GetProjectsByName(
        mr.cnxn, query_project_names).values())
    self.query_project_ids = [
        p.project_id for p in self.query_project_list]

    self.me_user_ids = me_user_ids
    self.mr.auth = authdata.AuthData.FromUserID(
        mr.cnxn, logged_in_user_id, services)

    # The following fields are filled in as the pipeline progresses.
    # The value None means that we still need to compute that value.
    self.result_iids = None  # Sorted issue IDs that match the query.
    self.search_limit_reached = False  # True if search results limit is hit.
    self.error = None

    self._MakePromises()

  def _MakePromises(self):
    config_dict = self.services.config.GetProjectConfigs(
        self.mr.cnxn, self.query_project_ids)
    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
        list(config_dict.values()))

    self.canned_query = savedqueries_helpers.SavedQueryIDToCond(
        self.mr.cnxn, self.services.features, self.mr.can)

    self.canned_query, warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
        self.me_user_ids, self.canned_query)
    self.mr.warnings.extend(warnings)
    self.user_query, warnings = searchpipeline.ReplaceKeywordsWithUserIDs(
        self.me_user_ids, self.mr.query)
    self.mr.warnings.extend(warnings)
    logging.debug('Searching query: %s %s', self.canned_query, self.user_query)

    slice_term = ('Issue.shard = %s', [self.mr.shard_id])

    sd = sorting.ComputeSortDirectives(
        self.harmonized_config, self.mr.group_by_spec, self.mr.sort_spec)

    self.result_iids_promise = framework_helpers.Promise(
        _GetQueryResultIIDs, self.mr.cnxn,
        self.services, self.canned_query, self.user_query,
        self.query_project_ids, self.harmonized_config, sd,
        slice_term, self.mr.shard_id, self.mr.invalidation_timestep)

  def SearchForIIDs(self):
    """Wait for the search Promises and store their results."""
    with self.mr.profiler.Phase('WaitOnPromises'):
      self.result_iids, self.search_limit_reached, self.error = (
          self.result_iids_promise.WaitAndGetValue())


def SearchProjectCan(
    cnxn, services, project_ids, query_ast, shard_id, harmonized_config,
    left_joins=None, where=None, sort_directives=None, query_desc=''):
  """Return a list of issue global IDs in the projects that satisfy the query.

  Args:
    cnxn: Regular database connection to the primary DB.
    services: interface to issue storage backends.
    project_ids: list of int IDs of the projects to search.
    query_ast: A QueryAST PB with conjunctions and conditions.
    shard_id: limit search to the specified shard ID int.
    harmonized_config: harmonized config for all projects being searched.
    left_joins: SQL LEFT JOIN clauses that are needed in addition to
        anything generated from the query_ast.
    where: SQL WHERE clauses that are needed in addition to
        anything generated from the query_ast.
    sort_directives: list of strings specifying the columns to sort on.
    query_desc: descriptive string for debugging.

  Returns:
    (issue_ids, capped, error) where issue_ids is a list of issue IDs
    that satisfy the query, capped is True if the number of results was
    capped due to an implementation limit, and error is any well-known error
    (probably a query parsing error) encountered during search.
  """
  logging.info('searching projects %r for AST %r', project_ids, query_ast)
  start_time = time.time()
  left_joins = left_joins or []
  where = where or []
  if project_ids:
    cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
    where.append((cond_str, project_ids))

  try:
    query_ast = ast2ast.PreprocessAST(
        cnxn, query_ast, project_ids, services, harmonized_config)
    logging.info('simplified AST is %r', query_ast)
    query_left_joins, query_where, _ = ast2select.BuildSQLQuery(query_ast)
    left_joins.extend(query_left_joins)
    where.extend(query_where)
  except ast2ast.MalformedQuery as e:
    # TODO(jrobbins): inform the user that their query had invalid tokens.
    logging.info('Invalid query tokens %s.\n %r\n\n', str(e), query_ast)
    return [], False, e
  except ast2select.NoPossibleResults as e:
    # TODO(jrobbins): inform the user that their query was impossible.
    logging.info('Impossible query %s.\n %r\n\n', str(e), query_ast)
    return [], False, e
  logging.info('translated to left_joins %r', left_joins)
  logging.info('translated to where %r', where)

  fts_capped = False
  if query_ast.conjunctions:
    # TODO(jrobbins): Handle "OR" in queries. For now, we just process the
    # first conjunction.
    assert len(query_ast.conjunctions) == 1
    conj = query_ast.conjunctions[0]
    full_text_iids, fts_capped = tracker_fulltext.SearchIssueFullText(
        project_ids, conj, shard_id)
    if full_text_iids is not None:
      if not full_text_iids:
        return [], False, None  # No match on fulltext, so don't bother DB.
      cond_str = 'Issue.id IN (%s)' % sql.PlaceHolders(full_text_iids)
      where.append((cond_str, full_text_iids))

  label_def_rows = []
  status_def_rows = []
  if sort_directives:
    if project_ids:
      for pid in project_ids:
        label_def_rows.extend(services.config.GetLabelDefRows(cnxn, pid))
        status_def_rows.extend(services.config.GetStatusDefRows(cnxn, pid))
    else:
      label_def_rows = services.config.GetLabelDefRowsAnyProject(cnxn)
      status_def_rows = services.config.GetStatusDefRowsAnyProject(cnxn)

  harmonized_labels = tracker_bizobj.HarmonizeLabelOrStatusRows(
      label_def_rows)
  harmonized_statuses = tracker_bizobj.HarmonizeLabelOrStatusRows(
      status_def_rows)
  harmonized_fields = harmonized_config.field_defs
  sort_left_joins, order_by = ast2sort.BuildSortClauses(
      sort_directives, harmonized_labels, harmonized_statuses,
      harmonized_fields)
  logging.info('translated to sort left_joins %r', sort_left_joins)
  logging.info('translated to order_by %r', order_by)

  issue_ids, db_capped = services.issue.RunIssueQuery(
      cnxn, left_joins + sort_left_joins, where, order_by, shard_id=shard_id)
  logging.warning(
      'executed "%s" query %r for %d issues in %dms', query_desc, query_ast,
      len(issue_ids), int((time.time() - start_time) * 1000))
  capped = fts_capped or db_capped
  return issue_ids, capped, None


def _FilterSpam(query_ast):
  """Append a condition to exclude spam, unless the query mentions spam."""
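  # Hypothetical example of the effect: parsing "owner:111" yields one
  # condition, and this function appends a second, NE condition on the
  # built-in bool field "spam" so that spam issues are filtered out:
  #
  #   ast = _FilterSpam(query2ast.ParseUserQuery(
  #       'owner:111', '', query2ast.BUILTIN_ISSUE_FIELDS, harmonized_config))
  #   # len(ast.conjunctions[0].conds) == 2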
  uses_spam = False
  # TODO(jrobbins): Handle "OR" in queries. For now, we just modify the
  # first conjunction.
  conjunction = query_ast.conjunctions[0]
  for condition in conjunction.conds:
    for field in condition.field_defs:
      if field.field_name == 'spam':
        uses_spam = True

  if not uses_spam:
    query_ast.conjunctions[0].conds.append(
        ast_pb2.MakeCond(
            ast_pb2.QueryOp.NE,
            [tracker_pb2.FieldDef(
                field_name='spam',
                field_type=tracker_pb2.FieldTypes.BOOL_TYPE)
            ],
            [], []))

  return query_ast


def _GetQueryResultIIDs(
    cnxn, services, canned_query, user_query,
    query_project_ids, harmonized_config, sd, slice_term,
    shard_id, invalidation_timestep):
  """Do a search and return a list of matching issue IDs.

  Args:
    cnxn: connection to the database.
    services: interface to issue storage backends.
    canned_query: string part of the query from the drop-down menu.
    user_query: string part of the query that the user typed in.
    query_project_ids: list of project IDs to search.
    harmonized_config: combined configs for all the queried projects.
    sd: list of sort directives.
    slice_term: additional query term to narrow results to a logical shard
        within a physical shard.
    shard_id: int number of the database shard to search.
    invalidation_timestep: int timestep used to keep memcached items fresh.

  Returns:
    Tuple consisting of:
      A list of issue IDs that match the user's query. An empty list, [],
      is returned if no issues match the query.
      A boolean that is set to True if the search results limit of this
      shard is hit.
      An error (subclass of Exception) encountered during query processing.
      None means that no error was encountered.
  """
  query_ast = _FilterSpam(query2ast.ParseUserQuery(
      user_query, canned_query, query2ast.BUILTIN_ISSUE_FIELDS,
      harmonized_config))

  logging.info('query_project_ids is %r', query_project_ids)

  is_fulltext_query = bool(
      query_ast.conjunctions and
      fulltext_helpers.BuildFTSQuery(
          query_ast.conjunctions[0], tracker_fulltext.ISSUE_FULLTEXT_FIELDS))
  expiration = framework_constants.CACHE_EXPIRATION
  if is_fulltext_query:
    expiration = framework_constants.FULLTEXT_MEMCACHE_EXPIRATION

  # Might raise ast2ast.MalformedQuery or ast2select.NoPossibleResults.
  result_iids, search_limit_reached, error = SearchProjectCan(
      cnxn, services, query_project_ids, query_ast, shard_id,
      harmonized_config, sort_directives=sd, where=[slice_term],
      query_desc='getting query issue IDs')
  logging.info('Found %d result_iids', len(result_iids))
  if error:
    logging.warning('Got error %r', error)

  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
  projects_str = projects_str or 'all'
  memcache_key = ';'.join([
      projects_str, canned_query, user_query, ' '.join(sd), str(shard_id)])
  memcache.set(memcache_key, (result_iids, invalidation_timestep),
               time=expiration, namespace=settings.memcache_namespace)
  logging.info('set memcache key %r', memcache_key)

  search_limit_memcache_key = ';'.join([
      projects_str, canned_query, user_query, ' '.join(sd),
      'search_limit_reached', str(shard_id)])
  memcache.set(search_limit_memcache_key,
               (search_limit_reached, invalidation_timestep),
               time=expiration, namespace=settings.memcache_namespace)
  logging.info('set search limit memcache key %r',
               search_limit_memcache_key)

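  # Invalidation timestamps are keyed per (project, shard), e.g. '365;2';
  # the key 'all;2' covers searches that span all projects.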
  timestamps_for_projects = memcache.get_multi(
      keys=(['%d;%d' % (pid, shard_id) for pid in query_project_ids] +
            ['all;%d' % shard_id]),
      namespace=settings.memcache_namespace)

  if query_project_ids:
    for pid in query_project_ids:
      key = '%d;%d' % (pid, shard_id)
      if key not in timestamps_for_projects:
        memcache.set(
            key,
            invalidation_timestep,
            time=framework_constants.CACHE_EXPIRATION,
            namespace=settings.memcache_namespace)
  else:
    key = 'all;%d' % shard_id
    if key not in timestamps_for_projects:
      memcache.set(
          key,
          invalidation_timestep,
          time=framework_constants.CACHE_EXPIRATION,
          namespace=settings.memcache_namespace)

  return result_iids, search_limit_reached, error