# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A set of functions that provide fulltext search for issues."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import collections
import logging
import time

from six import string_types

from google.appengine.api import search

import settings
from framework import framework_constants
from framework import framework_helpers
from framework import framework_views
from services import fulltext_helpers
from tracker import tracker_bizobj


# When updating and re-indexing all issues in a project, work in batches
# of this size to manage memory usage and avoid rpc timeouts.
_INDEX_BATCH_SIZE = 40


# The user can search for text that occurs specifically in these
# parts of an issue.
ISSUE_FULLTEXT_FIELDS = ['summary', 'description', 'comment']
# Note: issue documents also contain a "metadata" field, but we do not
# expose that to users. Issue metadata can be searched in a structured way
# by giving a specific field name such as "owner:" or "status:". The metadata
# search field exists only for fulltext queries that do not specify any field.
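# For example (illustrative): a bare query like "crash" is matched against
# the fields above plus the metadata field, while "summary:crash" is limited
# to the summary field, and "owner:alice@example.com" (a hypothetical user)
# is handled as structured search rather than fulltext.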


def IndexIssues(cnxn, issues, user_service, issue_service, config_service):
  """(Re)index all the given issues.

  Args:
    cnxn: connection to SQL database.
    issues: list of Issue PBs to index.
    user_service: interface to user data storage.
    issue_service: interface to issue data storage.
    config_service: interface to configuration data storage.
  """
  issues = list(issues)
  config_dict = config_service.GetProjectConfigs(
      cnxn, {issue.project_id for issue in issues})
  for start in range(0, len(issues), _INDEX_BATCH_SIZE):
    logging.info('indexing issues: %d remaining', len(issues) - start)
    _IndexIssueBatch(
        cnxn, issues[start:start + _INDEX_BATCH_SIZE], user_service,
        issue_service, config_dict)


def _IndexIssueBatch(cnxn, issues, user_service, issue_service, config_dict):
  """Internal method to (re)index the given batch of issues.

  Args:
    cnxn: connection to SQL database.
    issues: list of Issue PBs to index.
    user_service: interface to user data storage.
    issue_service: interface to issue data storage.
    config_dict: dict {project_id: config} for all the projects that
        the given issues are in.
  """
  user_ids = tracker_bizobj.UsersInvolvedInIssues(issues)
  comments_dict = issue_service.GetCommentsForIssues(
      cnxn, [issue.issue_id for issue in issues])
  for comments in comments_dict.values():
    user_ids.update([ic.user_id for ic in comments])

  users_by_id = framework_views.MakeAllUserViews(
      cnxn, user_service, user_ids)
  _CreateIssueSearchDocuments(issues, comments_dict, users_by_id, config_dict)


def _CreateIssueSearchDocuments(
    issues, comments_dict, users_by_id, config_dict):
  """Make the GAE search index documents for the given issue batch.

  Args:
    issues: list of issues to index.
    comments_dict: prefetched dictionary of comments on those issues.
    users_by_id: dictionary {user_id: UserView} so that the email
        addresses of users who left comments can be found via search.
    config_dict: dict {project_id: config} for all the projects that
        the given issues are in.
  """
  documents_by_shard = collections.defaultdict(list)
  for issue in issues:
    summary = issue.summary
    # TODO(jrobbins): allow search specifically on explicit vs derived
    # fields.
    owner_id = tracker_bizobj.GetOwnerId(issue)
    owner_email = users_by_id[owner_id].email
    config = config_dict[issue.project_id]
    component_paths = []
    for component_id in issue.component_ids:
      cd = tracker_bizobj.FindComponentDefByID(component_id, config)
      if cd:
        component_paths.append(cd.path)

    field_values = [tracker_bizobj.GetFieldValue(fv, users_by_id)
                    for fv in issue.field_values]
    # Convert to string only the values that are not strings already.
    # This is done because the default encoding in appengine seems to be
    # 'ascii' and string values might contain unicode characters, so str
    # will fail to encode them.
    field_values = [value if isinstance(value, string_types) else str(value)
                    for value in field_values]
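    # For example (illustrative): under Python 2's default 'ascii' codec,
    # str(u'caf\xe9') raises UnicodeEncodeError, while values that are
    # already strings pass through unchanged.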

    metadata = '%s %s %s %s %s %s' % (
        tracker_bizobj.GetStatus(issue),
        owner_email,
        ' '.join(users_by_id[cc_id].email
                 for cc_id in tracker_bizobj.GetCcIds(issue)),
        ' '.join(component_paths),
        ' '.join(field_values),
        ' '.join(tracker_bizobj.GetLabels(issue)))
    custom_fields = _BuildCustomFTSFields(issue)

    comments = comments_dict.get(issue.issue_id, [])
    room_for_comments = (framework_constants.MAX_FTS_FIELD_SIZE -
                         len(summary) -
                         len(metadata) -
                         sum(len(cf.value) for cf in custom_fields))
    comments = _IndexableComments(
        comments, users_by_id, remaining_chars=room_for_comments)
    logging.info('len(comments) is %r', len(comments))
    if comments:
      description = _ExtractCommentText(comments[0], users_by_id)
      description = description[:framework_constants.MAX_FTS_FIELD_SIZE]
      all_comments = ' '.join(
          _ExtractCommentText(c, users_by_id) for c in comments[1:])
      all_comments = all_comments[:framework_constants.MAX_FTS_FIELD_SIZE]
    else:
      description = ''
      all_comments = ''
      logging.info(
          'Issue %s:%r has zero indexable comments',
          issue.project_name, issue.local_id)

    logging.info('Building document for %s:%d',
                 issue.project_name, issue.local_id)
    logging.info('len(summary) = %d', len(summary))
    logging.info('len(metadata) = %d', len(metadata))
    logging.info('len(description) = %d', len(description))
    logging.info('len(comment) = %d', len(all_comments))
    for cf in custom_fields:
      logging.info('len(%s) = %d', cf.name, len(cf.value))

    doc = search.Document(
        doc_id=str(issue.issue_id),
        fields=[
            search.NumberField(name='project_id', value=issue.project_id),
            search.TextField(name='summary', value=summary),
            search.TextField(name='metadata', value=metadata),
            search.TextField(name='description', value=description),
            search.TextField(name='comment', value=all_comments),
        ] + custom_fields)

    shard_id = issue.issue_id % settings.num_logical_shards
    documents_by_shard[shard_id].append(doc)
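    # For example (illustrative): if settings.num_logical_shards were 10,
    # an issue_id of 100000123 would be indexed in shard 100000123 % 10 == 3.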

  start_time = time.time()
  promises = []
  for shard_id, documents in documents_by_shard.items():
    if documents:
      promises.append(framework_helpers.Promise(
          _IndexDocsInShard, shard_id, documents))

  for promise in promises:
    promise.WaitAndGetValue()

  logging.info('Finished indexing %d shards in %d ms',
               len(documents_by_shard), int((time.time() - start_time) * 1000))


def _IndexableComments(comments, users_by_id, remaining_chars=None):
  """Select the comments to index: those not deleted or from banned users.

  Args:
    comments: list of Comment PBs for one issue.
    users_by_id: Dict of (user_id -> UserView) for all users.
    remaining_chars: number of characters available for comment text
        without hitting the GAE search index max document size.

  Returns:
    A list of comments filtered to not have any deleted comments or
    comments from banned users. If the issue has a huge number of
    comments, only a certain number of the first and last comments
    are actually indexed.
  """
  if remaining_chars is None:
    remaining_chars = framework_constants.MAX_FTS_FIELD_SIZE
  allowed_comments = []
  for comment in comments:
    user_view = users_by_id.get(comment.user_id)
    if not (comment.deleted_by or (user_view and user_view.banned)):
      if comment.is_description and allowed_comments:
        # Index the latest description, but not older descriptions.
        allowed_comments[0] = comment
      else:
        allowed_comments.append(comment)

  reasonable_size = (framework_constants.INITIAL_COMMENTS_TO_INDEX +
                     framework_constants.FINAL_COMMENTS_TO_INDEX)
  if len(allowed_comments) <= reasonable_size:
    candidates = allowed_comments
  else:
    candidates = (  # Prioritize the description and recent comments.
        allowed_comments[0:1] +
        allowed_comments[-framework_constants.FINAL_COMMENTS_TO_INDEX:] +
        allowed_comments[1:framework_constants.INITIAL_COMMENTS_TO_INDEX])
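  # For example (illustrative): if INITIAL_COMMENTS_TO_INDEX and
  # FINAL_COMMENTS_TO_INDEX were both 100, an issue with 250 allowed
  # comments would be indexed as the description, the last 100 comments,
  # and then comments 2-100, subject to the remaining_chars budget below.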

  total_length = 0
  result = []
  for comment in candidates:
    total_length += len(comment.content)
    if total_length > remaining_chars:
      break
    result.append(comment)

  return result


def _IndexDocsInShard(shard_id, documents):
  search_index = search.Index(
      name=settings.search_index_name_format % shard_id)
  search_index.put(documents)
  logging.info('FTS indexed %d docs in shard %d', len(documents), shard_id)
  # TODO(jrobbins): catch OverQuotaError and add the issues to the
  # ReindexQueue table instead.


def _ExtractCommentText(comment, users_by_id):
  """Return a string with all the searchable text of the given Comment PB."""
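  # For example (illustrative): a comment by hypothetical user
  # alice@example.com with content 'see attached log' and an attachment
  # named trace.txt yields 'alice@example.com see attached log trace.txt'.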
  commenter_email = users_by_id[comment.user_id].email
  return '%s %s %s' % (
      commenter_email,
      comment.content,
      ' '.join(attach.filename
               for attach in comment.attachments
               if not attach.deleted))


def _BuildCustomFTSFields(issue):
  """Return a list of FTS Fields to index string-valued custom fields."""
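  # For example (illustrative): a field value with field_id 31 and
  # str_value 'Beta' would yield search.TextField(name='custom_31',
  # value='Beta').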
  fts_fields = []
  for fv in issue.field_values:
    if fv.str_value:
      # TODO(jrobbins): also indicate which were derived vs. explicit.
      # TODO(jrobbins): also toss in the email addresses of any users in
      # user-valued custom fields, ints for int-valued fields, etc.
      fts_field = search.TextField(
          name='custom_%d' % fv.field_id, value=fv.str_value)
      fts_fields.append(fts_field)

  return fts_fields


def UnindexIssues(issue_ids):
  """Remove many issues from the sharded search indexes."""
  iids_by_shard = {}
  for issue_id in issue_ids:
    shard_id = issue_id % settings.num_logical_shards
    iids_by_shard.setdefault(shard_id, []).append(issue_id)

  for shard_id, iids_in_shard in iids_by_shard.items():
    try:
      logging.info(
          'unindexing %r issue_ids in %r', len(iids_in_shard), shard_id)
      search_index = search.Index(
          name=settings.search_index_name_format % shard_id)
      search_index.delete([str(iid) for iid in iids_in_shard])
    except search.Error:
      logging.exception('FTS deletion failed')


def SearchIssueFullText(project_ids, query_ast_conj, shard_id):
  """Do full-text search in GAE FTS.

  Args:
    project_ids: list of project ID numbers to consider.
    query_ast_conj: One conjunctive clause from the AST parsed
        from the user's query.
    shard_id: int shard ID for the shard to consider.

  Returns:
    (issue_ids, capped) where issue_ids is a list of issue IDs that match
    the full-text query, and capped is True if the results were capped due
    to an implementation limitation.  Returns (None, False) if the given
    AST conjunction contains no full-text conditions.
  """
  fulltext_query = fulltext_helpers.BuildFTSQuery(
      query_ast_conj, ISSUE_FULLTEXT_FIELDS)
  if fulltext_query is None:
    return None, False

  if project_ids:
    project_clause = ' OR '.join(
        'project_id:%d' % pid for pid in project_ids)
    fulltext_query = '(%s) %s' % (project_clause, fulltext_query)
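    # For example (illustrative): with project_ids [16, 17], the query
    # becomes '(project_id:16 OR project_id:17) <fulltext terms>', where
    # the trailing part came from fulltext_helpers.BuildFTSQuery().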

  # TODO(jrobbins): it would be good to also include some other
  # structured search terms to narrow down the set of index
  # documents considered. E.g., most queries are only over the
  # open issues.
  logging.info('FTS query is %r', fulltext_query)
  issue_ids = fulltext_helpers.ComprehensiveSearch(
      fulltext_query, settings.search_index_name_format % shard_id)
  capped = len(issue_ids) >= settings.fulltext_limit_per_shard
  return issue_ids, capped