blob: ad50f81d8510c5c384e0b03059226bf06c349563 [file] [log] [blame]
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001# Copyright 2016 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
Copybara854996b2021-09-07 19:36:02 +00004
5"""A set of functions that provide persistence for Monorail issue tracking.
6
7This module provides functions to get, update, create, and (in some
8cases) delete each type of business object. It provides a logical
9persistence layer on top of an SQL database.
10
11Business objects are described in tracker_pb2.py and tracker_bizobj.py.
12"""
13from __future__ import print_function
14from __future__ import division
15from __future__ import absolute_import
16
17import collections
18import json
19import logging
20import os
21import time
22import uuid
23
24from google.appengine.api import app_identity
25from google.appengine.api import images
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +020026from google.cloud import storage
Copybara854996b2021-09-07 19:36:02 +000027
28import settings
29from features import filterrules_helpers
30from framework import authdata
31from framework import exceptions
32from framework import framework_bizobj
33from framework import framework_constants
34from framework import framework_helpers
35from framework import gcs_helpers
36from framework import permissions
37from framework import sql
38from infra_libs import ts_mon
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +010039from mrproto import project_pb2
40from mrproto import tracker_pb2
Copybara854996b2021-09-07 19:36:02 +000041from services import caches
42from services import tracker_fulltext
43from tracker import tracker_bizobj
44from tracker import tracker_helpers
45
# TODO(jojwang): monorail:4693, remove this after all 'stable-full'
# gates have been renamed to 'stable'.
FLT_EQUIVALENT_GATES = {'stable-full': 'stable',
                        'stable': 'stable-full'}

# Names of the SQL tables that hold issue, comment, and snapshot data.
ISSUE_TABLE_NAME = 'Issue'
ISSUESUMMARY_TABLE_NAME = 'IssueSummary'
ISSUE2LABEL_TABLE_NAME = 'Issue2Label'
ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component'
ISSUE2CC_TABLE_NAME = 'Issue2Cc'
ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify'
ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue'
COMMENT_TABLE_NAME = 'Comment'
COMMENTCONTENT_TABLE_NAME = 'CommentContent'
COMMENTIMPORTER_TABLE_NAME = 'CommentImporter'
ATTACHMENT_TABLE_NAME = 'Attachment'
ISSUERELATION_TABLE_NAME = 'IssueRelation'
DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation'
ISSUEUPDATE_TABLE_NAME = 'IssueUpdate'
ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations'
REINDEXQUEUE_TABLE_NAME = 'ReindexQueue'
LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter'
ISSUESNAPSHOT_TABLE_NAME = 'IssueSnapshot'
ISSUESNAPSHOT2CC_TABLE_NAME = 'IssueSnapshot2Cc'
ISSUESNAPSHOT2COMPONENT_TABLE_NAME = 'IssueSnapshot2Component'
ISSUESNAPSHOT2LABEL_TABLE_NAME = 'IssueSnapshot2Label'
ISSUEPHASEDEF_TABLE_NAME = 'IssuePhaseDef'
ISSUE2APPROVALVALUE_TABLE_NAME = 'Issue2ApprovalValue'
ISSUEAPPROVAL2APPROVER_TABLE_NAME = 'IssueApproval2Approver'
ISSUEAPPROVAL2COMMENT_TABLE_NAME = 'IssueApproval2Comment'


# Column lists for each table.  The unpacking code in the caches below
# destructures rows positionally, so the order here is significant.
ISSUE_COLS = [
    'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id',
    'opened', 'closed', 'modified', 'owner_modified', 'status_modified',
    'component_modified', 'migration_modified', 'derived_owner_id',
    'derived_status_id', 'deleted', 'star_count', 'attachment_count', 'is_spam'
]
ISSUESUMMARY_COLS = ['issue_id', 'summary']
ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived']
ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived']
ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived']
ISSUE2NOTIFY_COLS = ['issue_id', 'email']
ISSUE2FIELDVALUE_COLS = [
    'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'date_value',
    'url_value', 'derived', 'phase_id']
# Explicitly specify column 'Comment.id' to allow joins on other tables that
# have an 'id' column.
COMMENT_COLS = [
    'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id',
    'deleted_by', 'Comment.is_spam', 'is_description',
    'commentcontent_id']  # Note: commentcontent_id must be last.
COMMENTCONTENT_COLS = [
    'CommentContent.id', 'content', 'inbound_message']
COMMENTIMPORTER_COLS = ['comment_id', 'importer_id']
ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by',
    'is_description']
ATTACHMENT_COLS = [
    'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype',
    'deleted', 'gcs_object_id']
ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind', 'rank']
ABBR_ISSUERELATION_COLS = ['dst_issue_id', 'rank']
DANGLINGRELATION_COLS = [
    'issue_id', 'dst_issue_project', 'dst_issue_local_id',
    'ext_issue_identifier', 'kind']
ISSUEUPDATE_COLS = [
    'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value',
    'added_user_id', 'removed_user_id', 'custom_field_name',
    'added_component_id', 'removed_component_id'
]
ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id']
REINDEXQUEUE_COLS = ['issue_id', 'created']
ISSUESNAPSHOT_COLS = ['id', 'issue_id', 'shard', 'project_id', 'local_id',
    'reporter_id', 'owner_id', 'status_id', 'period_start', 'period_end',
    'is_open']
ISSUESNAPSHOT2CC_COLS = ['issuesnapshot_id', 'cc_id']
ISSUESNAPSHOT2COMPONENT_COLS = ['issuesnapshot_id', 'component_id']
ISSUESNAPSHOT2LABEL_COLS = ['issuesnapshot_id', 'label_id']
ISSUEPHASEDEF_COLS = ['id', 'name', 'rank']
ISSUE2APPROVALVALUE_COLS = ['approval_id', 'issue_id', 'phase_id',
    'status', 'setter_id', 'set_on']
ISSUEAPPROVAL2APPROVER_COLS = ['approval_id', 'approver_id', 'issue_id']
ISSUEAPPROVAL2COMMENT_COLS = ['approval_id', 'comment_id']

CHUNK_SIZE = 1000
131
132
class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache):
  """RAM + memcache cache mapping (project_id, local_id) -> issue_id."""

  def __init__(self, cache_manager, issue_service):
    super(IssueIDTwoLevelCache, self).__init__(
        cache_manager, 'issue_id', 'issue_id:', int,
        max_size=settings.issue_cache_max_size)
    self.issue_service = issue_service

  def _MakeCache(self, cache_manager, kind, max_size=None):
    """Use a ValueCentricRamCache instead of the normal RamCache."""
    return caches.ValueCentricRamCache(cache_manager, kind, max_size=max_size)

  def _DeserializeIssueIDs(self, project_local_issue_ids):
    """Turn (project_id, local_id, issue_id) rows into a lookup dict."""
    result = {}
    for project_id, local_id, issue_id in project_local_issue_ids:
      result[(project_id, local_id)] = issue_id
    return result

  def FetchItems(self, cnxn, keys):
    """On RAM and memcache miss, look the requested keys up in the DB."""
    # Group the requested local IDs by project so that we can emit one
    # IN-clause per project.
    grouped = collections.defaultdict(list)
    for project_id, local_id in keys:
      grouped[project_id].append(local_id)

    # Build one (condition, args) pair per project; they are ORed together.
    where = []
    for project_id, local_ids in grouped.items():
      cond = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' %
              sql.PlaceHolders(local_ids))
      where.append((cond, [project_id] + local_ids))

    rows = self.issue_service.issue_tbl.Select(
        cnxn, cols=['project_id', 'local_id', 'id'],
        where=where, or_where_conds=True)
    return self._DeserializeIssueIDs(rows)

  def _KeyToStr(self, key):
    """Serialize an int-pair key as 'project_id,local_id'."""
    return '%d,%d' % (key[0], key[1])

  def _StrToKey(self, key_str):
    """Parse a 'project_id,local_id' string back into an int pair."""
    parts = key_str.split(',')
    return int(parts[0]), int(parts[1])
176
177
class IssueTwoLevelCache(caches.AbstractTwoLevelCache):
  """Class to manage RAM and memcache for Issue PBs."""

  def __init__(
      self, cache_manager, issue_service, project_service, config_service):
    super(IssueTwoLevelCache, self).__init__(
        cache_manager, 'issue', 'issue:', tracker_pb2.Issue,
        max_size=settings.issue_cache_max_size)
    self.issue_service = issue_service
    self.project_service = project_service
    self.config_service = config_service

  def _UnpackIssue(self, cnxn, issue_row):
    """Partially construct an issue object using info from a DB row.

    Args:
      cnxn: connection to SQL database.
      issue_row: one row from the Issue table, in ISSUE_COLS order.

    Returns:
      A tracker_pb2.Issue with its scalar fields filled in.  Related rows
      (labels, components, CCs, field values, etc.) are attached later by
      _DeserializeIssues.
    """
    # This destructuring must match the column order in ISSUE_COLS.
    (
        issue_id, project_id, local_id, status_id, owner_id, reporter_id,
        opened, closed, modified, owner_modified, status_modified,
        component_modified, migration_modified, derived_owner_id,
        derived_status_id, deleted, star_count, attachment_count,
        is_spam) = issue_row

    issue = tracker_pb2.Issue()
    project = self.project_service.GetProject(cnxn, project_id)
    issue.project_name = project.project_name
    issue.issue_id = issue_id
    issue.project_id = project_id
    issue.local_id = local_id
    if status_id is not None:
      # Statuses are stored as IDs; translate back to the string form.
      status = self.config_service.LookupStatus(cnxn, project_id, status_id)
      issue.status = status
    issue.owner_id = owner_id or 0
    issue.reporter_id = reporter_id or 0
    issue.derived_owner_id = derived_owner_id or 0
    if derived_status_id is not None:
      derived_status = self.config_service.LookupStatus(
          cnxn, project_id, derived_status_id)
      issue.derived_status = derived_status
    issue.deleted = bool(deleted)
    # Timestamp columns may be NULL; only set the PB fields that have values.
    if opened:
      issue.opened_timestamp = opened
    if closed:
      issue.closed_timestamp = closed
    if modified:
      issue.modified_timestamp = modified
    if owner_modified:
      issue.owner_modified_timestamp = owner_modified
    if status_modified:
      issue.status_modified_timestamp = status_modified
    if component_modified:
      issue.component_modified_timestamp = component_modified
    if migration_modified:
      issue.migration_modified_timestamp = migration_modified
    issue.star_count = star_count
    issue.attachment_count = attachment_count
    issue.is_spam = bool(is_spam)
    return issue

  def _UnpackFieldValue(self, fv_row):
    """Construct a field value object from a DB row.

    Returns:
      A (FieldValue, issue_id) pair, so the caller can attach the value
      to the right issue.
    """
    (issue_id, field_id, int_value, str_value, user_id, date_value, url_value,
     derived, phase_id) = fv_row
    fv = tracker_bizobj.MakeFieldValue(
        field_id, int_value, str_value, user_id, date_value, url_value,
        bool(derived), phase_id=phase_id)
    return fv, issue_id

  def _UnpackApprovalValue(self, av_row):
    """Construct an ApprovalValue PB from a DB row.

    Returns:
      An (ApprovalValue, issue_id) pair; approver IDs are attached later
      by _DeserializeIssues.
    """
    (approval_id, issue_id, phase_id, status, setter_id, set_on) = av_row
    if status:
      # The DB stores the status string; the enum names are uppercase.
      status_enum = tracker_pb2.ApprovalStatus(status.upper())
    else:
      status_enum = tracker_pb2.ApprovalStatus.NOT_SET
    av = tracker_pb2.ApprovalValue(
        approval_id=approval_id, setter_id=setter_id, set_on=set_on,
        status=status_enum, phase_id=phase_id)
    return av, issue_id

  def _UnpackPhase(self, phase_row):
    """Construct a Phase PB from a DB row."""
    (phase_id, name, rank) = phase_row
    phase = tracker_pb2.Phase(
        phase_id=phase_id, name=name, rank=rank)
    return phase

  def _DeserializeIssues(
      self, cnxn, issue_rows, summary_rows, label_rows, component_rows,
      cc_rows, notify_rows, fieldvalue_rows, relation_rows,
      dangling_relation_rows, phase_rows, approvalvalue_rows,
      av_approver_rows):
    """Convert the given DB rows into a dict of Issue PBs.

    Each issue is first built from its Issue-table row, then the related
    rows (summary, labels, components, CCs, notify addresses, field values,
    approvals, phases, and relations) are merged into it.
    """
    results_dict = {}
    for issue_row in issue_rows:
      issue = self._UnpackIssue(cnxn, issue_row)
      results_dict[issue.issue_id] = issue

    for issue_id, summary in summary_rows:
      results_dict[issue_id].summary = summary

    # TODO(jrobbins): it would be nice to order labels by rank and name.
    for issue_id, label_id, derived in label_rows:
      issue = results_dict.get(issue_id)
      if not issue:
        logging.info('Got label for an unknown issue: %r %r',
                     label_rows, issue_rows)
        continue
      label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id)
      assert label, ('Label ID %r on IID %r not found in project %r' %
                     (label_id, issue_id, issue.project_id))
      if derived:
        results_dict[issue_id].derived_labels.append(label)
      else:
        results_dict[issue_id].labels.append(label)

    for issue_id, component_id, derived in component_rows:
      if derived:
        results_dict[issue_id].derived_component_ids.append(component_id)
      else:
        results_dict[issue_id].component_ids.append(component_id)

    for issue_id, user_id, derived in cc_rows:
      if derived:
        results_dict[issue_id].derived_cc_ids.append(user_id)
      else:
        results_dict[issue_id].cc_ids.append(user_id)

    for issue_id, email in notify_rows:
      results_dict[issue_id].derived_notify_addrs.append(email)

    for fv_row in fieldvalue_rows:
      fv, issue_id = self._UnpackFieldValue(fv_row)
      results_dict[issue_id].field_values.append(fv)

    phases_by_id = {}
    for phase_row in phase_rows:
      phase = self._UnpackPhase(phase_row)
      phases_by_id[phase.phase_id] = phase

    # Collect approvers keyed by (approval_id, issue_id) so that each
    # approval value below can pick up its own approver list.
    approvers_dict = collections.defaultdict(list)
    for approver_row in av_approver_rows:
      approval_id, approver_id, issue_id = approver_row
      approvers_dict[approval_id, issue_id].append(approver_id)

    for av_row in approvalvalue_rows:
      av, issue_id = self._UnpackApprovalValue(av_row)
      av.approver_ids = approvers_dict[av.approval_id, issue_id]
      results_dict[issue_id].approval_values.append(av)
      if av.phase_id:
        phase = phases_by_id[av.phase_id]
        issue_phases = results_dict[issue_id].phases
        if phase not in issue_phases:
          issue_phases.append(phase)
    # Order issue phases
    for issue in results_dict.values():
      if issue.phases:
        issue.phases.sort(key=lambda phase: phase.rank)

    # Attach blocked-on / blocking / merged-into relations.  A relation row
    # may reference an issue outside the requested set, so either side can
    # be absent from results_dict (but not both).
    for issue_id, dst_issue_id, kind, rank in relation_rows:
      src_issue = results_dict.get(issue_id)
      dst_issue = results_dict.get(dst_issue_id)
      assert src_issue or dst_issue, (
          'Neither source issue %r nor dest issue %r was found' %
          (issue_id, dst_issue_id))
      if src_issue:
        if kind == 'blockedon':
          src_issue.blocked_on_iids.append(dst_issue_id)
          src_issue.blocked_on_ranks.append(rank)
        elif kind == 'mergedinto':
          src_issue.merged_into = dst_issue_id
        else:
          logging.info('unknown relation kind %r', kind)
          continue

      if dst_issue:
        if kind == 'blockedon':
          dst_issue.blocking_iids.append(issue_id)

    # Dangling relations point at issues in external trackers or at
    # (project, local_id) locations, rather than at global issue IDs.
    for row in dangling_relation_rows:
      issue_id, dst_issue_proj, dst_issue_id, ext_id, kind = row
      src_issue = results_dict.get(issue_id)
      if kind == 'blockedon':
        src_issue.dangling_blocked_on_refs.append(
            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj,
                                                dst_issue_id, ext_id))
      elif kind == 'blocking':
        src_issue.dangling_blocking_refs.append(
            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id,
                                                ext_id))
      elif kind == 'mergedinto':
        src_issue.merged_into_external = ext_id
      else:
        logging.warning('unhandled danging relation kind %r', kind)
        continue

    return results_dict

  # Note: sharding is used here to allow us to load issues from the replicas
  # without placing load on the primary DB. Writes are not sharded.
  # pylint: disable=arguments-differ
  def FetchItems(self, cnxn, issue_ids, shard_id=None):
    """Retrieve and deserialize issues.

    Args:
      cnxn: connection to SQL database.
      issue_ids: global issue IDs to fetch.
      shard_id: optional replica shard to read from; None reads the primary.

    Returns:
      A dict {issue_id: Issue PB} for the issues that were found.
    """
    issue_rows = self.issue_service.issue_tbl.Select(
        cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id)

    summary_rows = self.issue_service.issuesummary_tbl.Select(
        cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids)
    label_rows = self.issue_service.issue2label_tbl.Select(
        cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids)
    component_rows = self.issue_service.issue2component_tbl.Select(
        cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids)
    cc_rows = self.issue_service.issue2cc_tbl.Select(
        cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids)
    notify_rows = self.issue_service.issue2notify_tbl.Select(
        cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids)
    fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select(
        cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id,
        issue_id=issue_ids)
    approvalvalue_rows = self.issue_service.issue2approvalvalue_tbl.Select(
        cnxn, cols=ISSUE2APPROVALVALUE_COLS, issue_id=issue_ids)
    # Only fetch phase definitions actually referenced by approval values.
    phase_ids = [av_row[2] for av_row in approvalvalue_rows]
    phase_rows = []
    if phase_ids:
      phase_rows = self.issue_service.issuephasedef_tbl.Select(
          cnxn, cols=ISSUEPHASEDEF_COLS, id=list(set(phase_ids)))
    av_approver_rows = self.issue_service.issueapproval2approver_tbl.Select(
        cnxn, cols=ISSUEAPPROVAL2APPROVER_COLS, issue_id=issue_ids)
    if issue_ids:
      ph = sql.PlaceHolders(issue_ids)
      blocked_on_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS, issue_id=issue_ids, kind='blockedon',
          order_by=[('issue_id', []), ('rank DESC', []), ('dst_issue_id', [])])
      blocking_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS, dst_issue_id=issue_ids,
          kind='blockedon', order_by=[('issue_id', []), ('dst_issue_id', [])])
      # Drop blocking rows already present among the blocked-on rows so
      # each relation appears only once in relation_rows.
      unique_blocking = tuple(
          row for row in blocking_rows if row not in blocked_on_rows)
      merge_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS,
          where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph),
                  issue_ids + issue_ids),
                 ('kind != %s', ['blockedon'])])
      relation_rows = blocked_on_rows + unique_blocking + merge_rows
      dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select(
          cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids)
    else:
      relation_rows = []
      dangling_relation_rows = []

    issue_dict = self._DeserializeIssues(
        cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows,
        notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows,
        phase_rows, approvalvalue_rows, av_approver_rows)
    logging.info('IssueTwoLevelCache.FetchItems returning: %r', issue_dict)
    return issue_dict
432
433
class CommentTwoLevelCache(caches.AbstractTwoLevelCache):
  """Class to manage RAM and memcache for IssueComment PBs."""

  def __init__(self, cache_manager, issue_svc):
    super(CommentTwoLevelCache, self).__init__(
        cache_manager, 'comment', 'comment:', tracker_pb2.IssueComment,
        max_size=settings.comment_cache_max_size)
    self.issue_svc = issue_svc

  # pylint: disable=arguments-differ
  def FetchItems(self, cnxn, keys, shard_id=None):
    """On RAM and memcache miss, retrieve comments from the database.

    Args:
      cnxn: connection to SQL database.
      keys: comment IDs to fetch.
      shard_id: optional replica shard to read from; None reads the primary.

    Returns:
      A dict {comment_id: IssueComment PB}.
    """
    comment_rows = self.issue_svc.comment_tbl.Select(cnxn,
        cols=COMMENT_COLS, id=keys, shard_id=shard_id)

    # Fewer rows than requested IDs suggests the replica is lagging behind
    # the primary; count the event and retry against the primary DB.
    if len(comment_rows) < len(keys):
      self.issue_svc.replication_lag_retries.increment()
      logging.info('issue3755: expected %d, but got %d rows from shard %d',
                   len(keys), len(comment_rows), shard_id)
      shard_id = None  # Will use Primary DB.
      comment_rows = self.issue_svc.comment_tbl.Select(
          cnxn, cols=COMMENT_COLS, id=keys, shard_id=None)
      logging.info(
          'Retry got %d comment rows from the primary DB', len(comment_rows))

    cids = [row[0] for row in comment_rows]
    # COMMENT_COLS guarantees that commentcontent_id is the last column.
    commentcontent_ids = [row[-1] for row in comment_rows]
    content_rows = self.issue_svc.commentcontent_tbl.Select(
        cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids,
        shard_id=shard_id)
    approval_rows = self.issue_svc.issueapproval2comment_tbl.Select(
        cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids)
    amendment_rows = self.issue_svc.issueupdate_tbl.Select(
        cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id)
    attachment_rows = self.issue_svc.attachment_tbl.Select(
        cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id)
    importer_rows = self.issue_svc.commentimporter_tbl.Select(
        cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id)

    comments = self.issue_svc._DeserializeComments(
        comment_rows, content_rows, amendment_rows, attachment_rows,
        approval_rows, importer_rows)

    # Key the comments by ID, as the two-level cache contract requires.
    comments_dict = {}
    for comment in comments:
      comments_dict[comment.id] = comment

    return comments_dict
481
482
class IssueService(object):
  """The persistence layer for Monorail's issues, comments, and attachments."""
  # ts_mon counters; defined at class level so they are shared by all
  # instances of this service.
  spam_labels = ts_mon.CounterMetric(
      'monorail/issue_svc/spam_label',
      'Issues created, broken down by spam label.',
      [ts_mon.StringField('type')])
  replication_lag_retries = ts_mon.CounterMetric(
      'monorail/issue_svc/replication_lag_retries',
      'Counts times that loading comments from a replica failed',
      [])
  issue_creations = ts_mon.CounterMetric(
      'monorail/issue_svc/issue_creations',
      'Counts times that issues were created',
      [])
  comment_creations = ts_mon.CounterMetric(
      'monorail/issue_svc/comment_creations',
      'Counts times that comments were created',
      [])
501
  def __init__(self, project_service, config_service, cache_manager,
               chart_service):
    """Initialize this object so that it is ready to use.

    Args:
      project_service: services object for project info.
      config_service: services object for tracker configuration info.
      cache_manager: local cache with distributed invalidation.
      chart_service (ChartService): An instance of ChartService.
    """
    # Tables that represent issue data.
    self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME)
    self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME)
    self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME)
    self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME)
    self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME)
    self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME)
    self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME)
    self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME)
    self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME)
    self.issueformerlocations_tbl = sql.SQLTableManager(
        ISSUEFORMERLOCATIONS_TABLE_NAME)
    self.issuesnapshot_tbl = sql.SQLTableManager(ISSUESNAPSHOT_TABLE_NAME)
    self.issuesnapshot2cc_tbl = sql.SQLTableManager(
        ISSUESNAPSHOT2CC_TABLE_NAME)
    self.issuesnapshot2component_tbl = sql.SQLTableManager(
        ISSUESNAPSHOT2COMPONENT_TABLE_NAME)
    self.issuesnapshot2label_tbl = sql.SQLTableManager(
        ISSUESNAPSHOT2LABEL_TABLE_NAME)
    self.issuephasedef_tbl = sql.SQLTableManager(ISSUEPHASEDEF_TABLE_NAME)
    self.issue2approvalvalue_tbl = sql.SQLTableManager(
        ISSUE2APPROVALVALUE_TABLE_NAME)
    self.issueapproval2approver_tbl = sql.SQLTableManager(
        ISSUEAPPROVAL2APPROVER_TABLE_NAME)
    self.issueapproval2comment_tbl = sql.SQLTableManager(
        ISSUEAPPROVAL2COMMENT_TABLE_NAME)

    # Tables that represent comments.
    self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME)
    self.commentcontent_tbl = sql.SQLTableManager(COMMENTCONTENT_TABLE_NAME)
    self.commentimporter_tbl = sql.SQLTableManager(COMMENTIMPORTER_TABLE_NAME)
    self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME)
    self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME)

    # Tables for cron tasks.
    self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME)

    # Tables for generating sequences of local IDs.
    self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME)

    # Two-level (RAM + memcache) caches over the tables above.
    # Like a dictionary {(project_id, local_id): issue_id}
    # Use value centric cache here because we cannot store a tuple in the
    # Invalidate table.
    self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self)
    # Like a dictionary {issue_id: issue}
    self.issue_2lc = IssueTwoLevelCache(
        cache_manager, self, project_service, config_service)

    # Like a dictionary {comment_id: comment)
    self.comment_2lc = CommentTwoLevelCache(
        cache_manager, self)

    self._config_service = config_service
    self.chart_service = chart_service
566
567 ### Issue ID lookups
568
569 def LookupIssueIDsFollowMoves(self, cnxn, project_local_id_pairs):
570 # type: (MonorailConnection, Sequence[Tuple(int, int)]) ->
571 # (Sequence[int], Sequence[Tuple(int, int)])
572 """Find the global issue IDs given the project ID and local ID of each.
573
574 If any (project_id, local_id) pairs refer to an issue that has been moved,
575 the issue ID will still be returned.
576
577 Args:
578 cnxn: Monorail connection.
579 project_local_id_pairs: (project_id, local_id) pairs to look up.
580
581 Returns:
582 A tuple of two items.
583 1. A sequence of global issue IDs in the `project_local_id_pairs` order.
584 2. A sequence of (project_id, local_id) containing each pair provided
585 for which no matching issue is found.
586 """
587
588 issue_id_dict, misses = self.issue_id_2lc.GetAll(
589 cnxn, project_local_id_pairs)
590 for miss in misses:
591 project_id, local_id = miss
592 issue_id = int(
593 self.issueformerlocations_tbl.SelectValue(
594 cnxn,
595 'issue_id',
596 default=0,
597 project_id=project_id,
598 local_id=local_id))
599 if issue_id:
600 misses.remove(miss)
601 issue_id_dict[miss] = issue_id
602 # Put the Issue IDs in the order specified by project_local_id_pairs
603 issue_ids = [
604 issue_id_dict[pair]
605 for pair in project_local_id_pairs
606 if pair in issue_id_dict
607 ]
608
609 return issue_ids, misses
610
611 def LookupIssueIDs(self, cnxn, project_local_id_pairs):
612 """Find the global issue IDs given the project ID and local ID of each."""
613 issue_id_dict, misses = self.issue_id_2lc.GetAll(
614 cnxn, project_local_id_pairs)
615
616 # Put the Issue IDs in the order specified by project_local_id_pairs
617 issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs
618 if pair in issue_id_dict]
619
620 return issue_ids, misses
621
622 def LookupIssueID(self, cnxn, project_id, local_id):
623 """Find the global issue ID given the project ID and local ID."""
624 issue_ids, _misses = self.LookupIssueIDs(cnxn, [(project_id, local_id)])
625 try:
626 return issue_ids[0]
627 except IndexError:
628 raise exceptions.NoSuchIssueException()
629
  def ResolveIssueRefs(
      self, cnxn, ref_projects, default_project_name, refs):
    """Look up all the referenced issues and return their issue_ids.

    Args:
      cnxn: connection to SQL database.
      ref_projects: pre-fetched dict {project_name: project} of all projects
          mentioned in the refs as well as the default project.
      default_project_name: string name of the current project, this is used
          when the project_name in a ref is None.
      refs: list of (project_name, local_id) pairs. These are parsed from
          textual references in issue descriptions, comments, and the input
          in the blocked-on field.

    Returns:
      A tuple (issue_ids, misses) where issue_ids is a list of issue_ids of
      the referenced issues that were found, and misses is a list of
      (project_id, local_id) pairs that matched no issue.  References to
      issues in deleted projects are simply ignored.
    """
    if not refs:
      return [], []

    project_local_id_pairs = []
    for project_name, local_id in refs:
      # Fall back to the current project when the ref had no project part.
      project = ref_projects.get(project_name or default_project_name)
      if not project or project.state == project_pb2.ProjectState.DELETABLE:
        continue  # ignore any refs to issues in deleted projects
      project_local_id_pairs.append((project.project_id, local_id))

    return self.LookupIssueIDs(cnxn, project_local_id_pairs)  # tuple
659
660 def LookupIssueRefs(self, cnxn, issue_ids):
661 """Return {issue_id: (project_name, local_id)} for each issue_id."""
662 issue_dict, _misses = self.GetIssuesDict(cnxn, issue_ids)
663 return {
664 issue_id: (issue.project_name, issue.local_id)
665 for issue_id, issue in issue_dict.items()}
666
667 ### Issue objects
668
669 def CreateIssue(
670 self,
671 cnxn,
672 services,
673 issue,
674 marked_description,
675 attachments=None,
676 index_now=False,
677 importer_id=None):
678 """Create and store a new issue with all the given information.
679
680 Args:
681 cnxn: connection to SQL database.
682 services: persistence layer for users, issues, and projects.
683 issue: Issue PB to create.
684 marked_description: issue description with initial HTML markup.
685 attachments: [(filename, contents, mimetype),...] attachments uploaded at
686 the time the comment was made.
687 index_now: True if the issue should be updated in the full text index.
688 importer_id: optional user ID of API client importing issues for users.
689
690 Returns:
691 A tuple (the newly created Issue PB and Comment PB for the
692 issue description).
693 """
694 project_id = issue.project_id
695 reporter_id = issue.reporter_id
696 timestamp = issue.opened_timestamp
697 config = self._config_service.GetProjectConfig(cnxn, project_id)
698
699 iids_to_invalidate = set()
700 if len(issue.blocked_on_iids) != 0:
701 iids_to_invalidate.update(issue.blocked_on_iids)
702 if len(issue.blocking_iids) != 0:
703 iids_to_invalidate.update(issue.blocking_iids)
704
705 comment = self._MakeIssueComment(
706 project_id, reporter_id, marked_description,
707 attachments=attachments, timestamp=timestamp,
708 is_description=True, importer_id=importer_id)
709
710 reporter = services.user.GetUser(cnxn, reporter_id)
711 project = services.project.GetProject(cnxn, project_id)
712 reporter_auth = authdata.AuthData.FromUserID(cnxn, reporter_id, services)
713 is_project_member = framework_bizobj.UserIsInProject(
714 project, reporter_auth.effective_ids)
715 classification = services.spam.ClassifyIssue(
716 issue, comment, reporter, is_project_member)
717
718 if classification['confidence_is_spam'] > settings.classifier_spam_thresh:
719 issue.is_spam = True
720 predicted_label = 'spam'
721 else:
722 predicted_label = 'ham'
723
724 logging.info('classified new issue as %s' % predicted_label)
725 self.spam_labels.increment({'type': predicted_label})
726
727 # Create approval surveys
728 approval_comments = []
729 if len(issue.approval_values) != 0:
730 approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
731 for av in issue.approval_values:
732 ad = approval_defs_by_id.get(av.approval_id)
733 if ad:
734 survey = ''
735 if ad.survey:
736 questions = ad.survey.split('\n')
737 survey = '\n'.join(['<b>' + q + '</b>' for q in questions])
738 approval_comments.append(self._MakeIssueComment(
739 project_id, reporter_id, survey, timestamp=timestamp,
740 is_description=True, approval_id=ad.approval_id))
741 else:
742 logging.info('Could not find ApprovalDef with approval_id %r',
743 av.approval_id)
744
745 issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
746 self.issue_creations.increment()
747 issue_id = self.InsertIssue(cnxn, issue)
748 comment.issue_id = issue_id
749 self.InsertComment(cnxn, comment)
750 for approval_comment in approval_comments:
751 approval_comment.issue_id = issue_id
752 self.InsertComment(cnxn, approval_comment)
753
754 issue.issue_id = issue_id
755
756 # ClassifyIssue only returns confidence_is_spam, but
757 # RecordClassifierIssueVerdict records confidence of
758 # ham or spam. Therefore if ham, invert score.
759 confidence = classification['confidence_is_spam']
760 if not issue.is_spam:
761 confidence = 1.0 - confidence
762
763 services.spam.RecordClassifierIssueVerdict(
764 cnxn, issue, predicted_label=='spam',
765 confidence, classification['failed_open'])
766
767 if permissions.HasRestrictions(issue, 'view'):
768 self._config_service.InvalidateMemcache(
769 [issue], key_prefix='nonviewable:')
770
771 # Add a comment to existing issues saying they are now blocking or
772 # blocked on this issue.
773 blocked_add_issues = self.GetIssues(cnxn, issue.blocked_on_iids)
774 for add_issue in blocked_add_issues:
775 self.CreateIssueComment(
776 cnxn, add_issue, reporter_id, content='',
777 amendments=[tracker_bizobj.MakeBlockingAmendment(
778 [(issue.project_name, issue.local_id)], [],
779 default_project_name=add_issue.project_name)])
780 blocking_add_issues = self.GetIssues(cnxn, issue.blocking_iids)
781 for add_issue in blocking_add_issues:
782 self.CreateIssueComment(
783 cnxn, add_issue, reporter_id, content='',
784 amendments=[tracker_bizobj.MakeBlockedOnAmendment(
785 [(issue.project_name, issue.local_id)], [],
786 default_project_name=add_issue.project_name)])
787
788 self._UpdateIssuesModified(
789 cnxn, iids_to_invalidate, modified_timestamp=timestamp)
790
791 if index_now:
792 tracker_fulltext.IndexIssues(
793 cnxn, [issue], services.user, self, self._config_service)
794 else:
795 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
796
797 return issue, comment
798
799 def AllocateNewLocalIDs(self, cnxn, issues):
800 # Filter to just the issues that need new local IDs.
801 issues = [issue for issue in issues if issue.local_id < 0]
802
803 for issue in issues:
804 if issue.local_id < 0:
805 issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id)
806
807 self.UpdateIssues(cnxn, issues)
808
809 logging.info("AllocateNewLocalIDs")
810
811 def GetAllIssuesInProject(
812 self, cnxn, project_id, min_local_id=None, use_cache=True):
813 """Special query to efficiently get ALL issues in a project.
814
815 This is not done while the user is waiting, only by backround tasks.
816
817 Args:
818 cnxn: connection to SQL database.
819 project_id: the ID of the project.
820 min_local_id: optional int to start at.
821 use_cache: optional boolean to turn off using the cache.
822
823 Returns:
824 A list of Issue protocol buffers for all issues.
825 """
826 all_local_ids = self.GetAllLocalIDsInProject(
827 cnxn, project_id, min_local_id=min_local_id)
828 return self.GetIssuesByLocalIDs(
829 cnxn, project_id, all_local_ids, use_cache=use_cache)
830
831 def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
832 """Get any one issue from RAM or memcache, otherwise return None."""
833 return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end)
834
835 def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None):
836 # type: (MonorailConnection, Collection[int], Optional[Boolean],
837 # Optional[int]) -> (Dict[int, Issue], Sequence[int])
838 """Get a dict {iid: issue} from the DB or cache.
839
840 Returns:
841 A dict {iid: issue} from the DB or cache.
842 A sequence of iid that could not be found.
843 """
844 issue_dict, missed_iids = self.issue_2lc.GetAll(
845 cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
846 if not use_cache:
847 for issue in issue_dict.values():
848 issue.assume_stale = False
849 return issue_dict, missed_iids
850
851 def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None):
852 # type: (MonorailConnection, Sequence[int], Optional[Boolean],
853 # Optional[int]) -> (Sequence[int])
854 """Get a list of Issue PBs from the DB or cache.
855
856 Args:
857 cnxn: connection to SQL database.
858 issue_ids: integer global issue IDs of the issues.
859 use_cache: optional boolean to turn off using the cache.
860 shard_id: optional int shard_id to limit retrieval.
861
862 Returns:
863 A list of Issue PBs in the same order as the given issue_ids.
864 """
865 issue_dict, _misses = self.GetIssuesDict(
866 cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
867
868 # Return a list that is ordered the same as the given issue_ids.
869 issue_list = [issue_dict[issue_id] for issue_id in issue_ids
870 if issue_id in issue_dict]
871
872 return issue_list
873
874 def GetIssue(self, cnxn, issue_id, use_cache=True):
875 """Get one Issue PB from the DB.
876
877 Args:
878 cnxn: connection to SQL database.
879 issue_id: integer global issue ID of the issue.
880 use_cache: optional boolean to turn off using the cache.
881
882 Returns:
883 The requested Issue protocol buffer.
884
885 Raises:
886 NoSuchIssueException: the issue was not found.
887 """
888 issues = self.GetIssues(cnxn, [issue_id], use_cache=use_cache)
889 try:
890 return issues[0]
891 except IndexError:
892 raise exceptions.NoSuchIssueException()
893
894 def GetIssuesByLocalIDs(
895 self, cnxn, project_id, local_id_list, use_cache=True, shard_id=None):
896 """Get all the requested issues.
897
898 Args:
899 cnxn: connection to SQL database.
900 project_id: int ID of the project to which the issues belong.
901 local_id_list: list of integer local IDs for the requested issues.
902 use_cache: optional boolean to turn off using the cache.
903 shard_id: optional int shard_id to choose a replica.
904
905 Returns:
906 List of Issue PBs for the requested issues. The result Issues
907 will be ordered in the same order as local_id_list.
908 """
909 issue_ids_to_fetch, _misses = self.LookupIssueIDs(
910 cnxn, [(project_id, local_id) for local_id in local_id_list])
911 issues = self.GetIssues(
912 cnxn, issue_ids_to_fetch, use_cache=use_cache, shard_id=shard_id)
913 return issues
914
915 def GetIssueByLocalID(self, cnxn, project_id, local_id, use_cache=True):
916 """Get one Issue PB from the DB.
917
918 Args:
919 cnxn: connection to SQL database.
920 project_id: the ID of the project to which the issue belongs.
921 local_id: integer local ID of the issue.
922 use_cache: optional boolean to turn off using the cache.
923
924 Returns:
925 The requested Issue protocol buffer.
926 """
927 issues = self.GetIssuesByLocalIDs(
928 cnxn, project_id, [local_id], use_cache=use_cache)
929 try:
930 return issues[0]
931 except IndexError:
932 raise exceptions.NoSuchIssueException(
933 'The issue %s:%d does not exist.' % (project_id, local_id))
934
935 def GetOpenAndClosedIssues(self, cnxn, issue_ids):
936 """Return the requested issues in separate open and closed lists.
937
938 Args:
939 cnxn: connection to SQL database.
940 issue_ids: list of int issue issue_ids.
941
942 Returns:
943 A pair of lists, the first with open issues, second with closed issues.
944 """
945 if not issue_ids:
946 return [], [] # make one common case efficient
947
948 issues = self.GetIssues(cnxn, issue_ids)
949 project_ids = {issue.project_id for issue in issues}
950 configs = self._config_service.GetProjectConfigs(cnxn, project_ids)
951 open_issues = []
952 closed_issues = []
953 for issue in issues:
954 config = configs[issue.project_id]
955 if tracker_helpers.MeansOpenInProject(
956 tracker_bizobj.GetStatus(issue), config):
957 open_issues.append(issue)
958 else:
959 closed_issues.append(issue)
960
961 return open_issues, closed_issues
962
963 # TODO(crbug.com/monorail/7822): Delete this method when V0 API retired.
964 def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id):
965 """Return the current location of a moved issue based on old location."""
966 issue_id = int(self.issueformerlocations_tbl.SelectValue(
967 cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id))
968 if not issue_id:
969 return None, None
970 project_id, local_id = self.issue_tbl.SelectRow(
971 cnxn, cols=['project_id', 'local_id'], id=issue_id)
972 return project_id, local_id
973
974 def GetPreviousLocations(self, cnxn, issue):
975 """Get all the previous locations of an issue."""
976 location_rows = self.issueformerlocations_tbl.Select(
977 cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id)
978 locations = [(pid, local_id) for (pid, local_id) in location_rows
979 if pid != issue.project_id or local_id != issue.local_id]
980 return locations
981
982 def GetCommentsByUser(self, cnxn, user_id):
983 """Get all comments created by a user"""
984 comments = self.GetComments(cnxn, commenter_id=user_id,
985 is_description=False, limit=10000)
986 return comments
987
988 def GetIssueActivity(self, cnxn, num=50, before=None, after=None,
989 project_ids=None, user_ids=None, ascending=False):
990
991 if project_ids:
992 use_clause = (
993 'USE INDEX (project_id) USE INDEX FOR ORDER BY (project_id)')
994 elif user_ids:
995 use_clause = (
996 'USE INDEX (commenter_id) USE INDEX FOR ORDER BY (commenter_id)')
997 else:
998 use_clause = ''
999
1000 # TODO(jrobbins): make this into a persist method.
1001 # TODO(jrobbins): this really needs permission checking in SQL, which
1002 # will be slow.
1003 where_conds = [('Issue.id = Comment.issue_id', [])]
1004 if project_ids is not None:
1005 cond_str = 'Comment.project_id IN (%s)' % sql.PlaceHolders(project_ids)
1006 where_conds.append((cond_str, project_ids))
1007 if user_ids is not None:
1008 cond_str = 'Comment.commenter_id IN (%s)' % sql.PlaceHolders(user_ids)
1009 where_conds.append((cond_str, user_ids))
1010
1011 if before:
1012 where_conds.append(('created < %s', [before]))
1013 if after:
1014 where_conds.append(('created > %s', [after]))
1015 if ascending:
1016 order_by = [('created', [])]
1017 else:
1018 order_by = [('created DESC', [])]
1019
1020 comments = self.GetComments(
1021 cnxn, joins=[('Issue', [])], deleted_by=None, where=where_conds,
1022 use_clause=use_clause, order_by=order_by, limit=num + 1)
1023 return comments
1024
1025 def GetIssueIDsReportedByUser(self, cnxn, user_id):
1026 """Get all issue IDs created by a user"""
1027 rows = self.issue_tbl.Select(cnxn, cols=['id'], reporter_id=user_id,
1028 limit=10000)
1029 return [row[0] for row in rows]
1030
  def InsertIssue(self, cnxn, issue):
    """Store the given issue in SQL.

    All writes (main row, shard assignment, and denormalized child tables)
    are batched with commit=False and committed together at the end, so the
    insert is atomic.

    Args:
      cnxn: connection to SQL database.
      issue: Issue PB to insert into the database.  issue.issue_id is set
          as a side effect.

    Returns:
      The int issue_id of the newly created issue.
    """
    # Statuses are stored as IDs that are looked up (or created) via the
    # config service.
    status_id = self._config_service.LookupStatusID(
        cnxn, issue.project_id, issue.status)
    row = (
        issue.project_id, issue.local_id, status_id, issue.owner_id or
        None, issue.reporter_id, issue.opened_timestamp, issue.closed_timestamp,
        issue.modified_timestamp, issue.owner_modified_timestamp,
        issue.status_modified_timestamp, issue.component_modified_timestamp,
        issue.migration_modified_timestamp, issue.derived_owner_id or None,
        self._config_service.LookupStatusID(
            cnxn, issue.project_id, issue.derived_status), bool(issue.deleted),
        issue.star_count, issue.attachment_count, issue.is_spam)
    # ISSUE_COLs[1:] to skip setting the ID
    # Insert into the Primary DB.
    generated_ids = self.issue_tbl.InsertRows(
        cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True)
    issue_id = generated_ids[0]
    issue.issue_id = issue_id
    # The shard is derived from the auto-generated ID, so it can only be
    # filled in with a second UPDATE after the insert.
    self.issue_tbl.Update(
        cnxn, {'shard': issue_id % settings.num_logical_shards},
        id=issue.issue_id, commit=False)

    # Write all denormalized child tables in the same transaction.
    self._UpdateIssuesSummary(cnxn, [issue], commit=False)
    self._UpdateIssuesLabels(cnxn, [issue], commit=False)
    self._UpdateIssuesFields(cnxn, [issue], commit=False)
    self._UpdateIssuesComponents(cnxn, [issue], commit=False)
    self._UpdateIssuesCc(cnxn, [issue], commit=False)
    self._UpdateIssuesNotify(cnxn, [issue], commit=False)
    self._UpdateIssuesRelation(cnxn, [issue], commit=False)
    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
    self.chart_service.StoreIssueSnapshots(cnxn, [issue], commit=False)
    cnxn.Commit()
    self._config_service.InvalidateMemcache([issue])

    return issue_id
1075
  def UpdateIssues(
      self, cnxn, issues, update_cols=None, just_derived=False, commit=True,
      invalidate=True):
    """Update the given issues in SQL.

    Args:
      cnxn: connection to SQL database.
      issues: list of issues to update, these must have been loaded with
          use_cache=False so that issue.assume_stale is False.
      update_cols: optional list of just the field names to update.
      just_derived: set to True when only updating derived fields.
      commit: set to False to skip the DB commit and do it in the caller.
      invalidate: set to False to leave cache invalidatation to the caller.
    """
    if not issues:
      return

    for issue in issues:  # slow, but mysql will not allow REPLACE rows.
      # Guard against overwriting newer DB data with a stale cached PB.
      assert not issue.assume_stale, (
          'issue2514: Storing issue that might be stale: %r' % issue)
      # Map Issue PB fields onto Issue table columns.  Status values are
      # stored as IDs looked up via the config service.
      delta = {
          'project_id':
              issue.project_id,
          'local_id':
              issue.local_id,
          'owner_id':
              issue.owner_id or None,
          'status_id':
              self._config_service.LookupStatusID(
                  cnxn, issue.project_id, issue.status) or None,
          'opened':
              issue.opened_timestamp,
          'closed':
              issue.closed_timestamp,
          'modified':
              issue.modified_timestamp,
          'owner_modified':
              issue.owner_modified_timestamp,
          'status_modified':
              issue.status_modified_timestamp,
          'component_modified':
              issue.component_modified_timestamp,
          'migration_modified':
              issue.migration_modified_timestamp,
          'derived_owner_id':
              issue.derived_owner_id or None,
          'derived_status_id':
              self._config_service.LookupStatusID(
                  cnxn, issue.project_id, issue.derived_status) or None,
          'deleted':
              bool(issue.deleted),
          'star_count':
              issue.star_count,
          'attachment_count':
              issue.attachment_count,
          'is_spam':
              issue.is_spam,
      }
      if update_cols is not None:
        # The caller asked to update only a subset of columns.
        delta = {key: val for key, val in delta.items()
                 if key in update_cols}
      self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False)

    if not update_cols:
      # Rewrite all denormalized child tables for these issues.
      self._UpdateIssuesLabels(cnxn, issues, commit=False)
      self._UpdateIssuesCc(cnxn, issues, commit=False)
      self._UpdateIssuesFields(cnxn, issues, commit=False)
      self._UpdateIssuesComponents(cnxn, issues, commit=False)
      self._UpdateIssuesNotify(cnxn, issues, commit=False)
      if not just_derived:
        # Summary and relations hold explicit (non-derived) data only.
        self._UpdateIssuesSummary(cnxn, issues, commit=False)
        self._UpdateIssuesRelation(cnxn, issues, commit=False)

    self.chart_service.StoreIssueSnapshots(cnxn, issues, commit=False)

    iids_to_invalidate = [issue.issue_id for issue in issues]
    if just_derived and invalidate:
      self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate)
    elif invalidate:
      self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
    if commit:
      cnxn.Commit()
    if invalidate:
      self._config_service.InvalidateMemcache(issues)
1160
1161 def UpdateIssue(
1162 self, cnxn, issue, update_cols=None, just_derived=False, commit=True,
1163 invalidate=True):
1164 """Update the given issue in SQL.
1165
1166 Args:
1167 cnxn: connection to SQL database.
1168 issue: the issue to update.
1169 update_cols: optional list of just the field names to update.
1170 just_derived: set to True when only updating derived fields.
1171 commit: set to False to skip the DB commit and do it in the caller.
1172 invalidate: set to False to leave cache invalidatation to the caller.
1173 """
1174 self.UpdateIssues(
1175 cnxn, [issue], update_cols=update_cols, just_derived=just_derived,
1176 commit=commit, invalidate=invalidate)
1177
1178 def _UpdateIssuesSummary(self, cnxn, issues, commit=True):
1179 """Update the IssueSummary table rows for the given issues."""
1180 self.issuesummary_tbl.InsertRows(
1181 cnxn, ISSUESUMMARY_COLS,
1182 [(issue.issue_id, issue.summary) for issue in issues],
1183 replace=True, commit=commit)
1184
1185 def _UpdateIssuesLabels(self, cnxn, issues, commit=True):
1186 """Update the Issue2Label table rows for the given issues."""
1187 label_rows = []
1188 for issue in issues:
1189 issue_shard = issue.issue_id % settings.num_logical_shards
1190 # TODO(jrobbins): If the user adds many novel labels in one issue update,
1191 # that could be slow. Solution is to add all new labels in a batch first.
1192 label_rows.extend(
1193 (issue.issue_id,
1194 self._config_service.LookupLabelID(cnxn, issue.project_id, label),
1195 False,
1196 issue_shard)
1197 for label in issue.labels)
1198 label_rows.extend(
1199 (issue.issue_id,
1200 self._config_service.LookupLabelID(cnxn, issue.project_id, label),
1201 True,
1202 issue_shard)
1203 for label in issue.derived_labels)
1204
1205 self.issue2label_tbl.Delete(
1206 cnxn, issue_id=[issue.issue_id for issue in issues],
1207 commit=False)
1208 self.issue2label_tbl.InsertRows(
1209 cnxn, ISSUE2LABEL_COLS + ['issue_shard'],
1210 label_rows, ignore=True, commit=commit)
1211
1212 def _UpdateIssuesFields(self, cnxn, issues, commit=True):
1213 """Update the Issue2FieldValue table rows for the given issues."""
1214 fieldvalue_rows = []
1215 for issue in issues:
1216 issue_shard = issue.issue_id % settings.num_logical_shards
1217 for fv in issue.field_values:
1218 fieldvalue_rows.append(
1219 (issue.issue_id, fv.field_id, fv.int_value, fv.str_value,
1220 fv.user_id or None, fv.date_value, fv.url_value, fv.derived,
1221 fv.phase_id or None, issue_shard))
1222
1223 self.issue2fieldvalue_tbl.Delete(
1224 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1225 self.issue2fieldvalue_tbl.InsertRows(
1226 cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'],
1227 fieldvalue_rows, commit=commit)
1228
1229 def _UpdateIssuesComponents(self, cnxn, issues, commit=True):
1230 """Update the Issue2Component table rows for the given issues."""
1231 issue2component_rows = []
1232 for issue in issues:
1233 issue_shard = issue.issue_id % settings.num_logical_shards
1234 issue2component_rows.extend(
1235 (issue.issue_id, component_id, False, issue_shard)
1236 for component_id in issue.component_ids)
1237 issue2component_rows.extend(
1238 (issue.issue_id, component_id, True, issue_shard)
1239 for component_id in issue.derived_component_ids)
1240
1241 self.issue2component_tbl.Delete(
1242 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1243 self.issue2component_tbl.InsertRows(
1244 cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'],
1245 issue2component_rows, ignore=True, commit=commit)
1246
1247 def _UpdateIssuesCc(self, cnxn, issues, commit=True):
1248 """Update the Issue2Cc table rows for the given issues."""
1249 cc_rows = []
1250 for issue in issues:
1251 issue_shard = issue.issue_id % settings.num_logical_shards
1252 cc_rows.extend(
1253 (issue.issue_id, cc_id, False, issue_shard)
1254 for cc_id in issue.cc_ids)
1255 cc_rows.extend(
1256 (issue.issue_id, cc_id, True, issue_shard)
1257 for cc_id in issue.derived_cc_ids)
1258
1259 self.issue2cc_tbl.Delete(
1260 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1261 self.issue2cc_tbl.InsertRows(
1262 cnxn, ISSUE2CC_COLS + ['issue_shard'],
1263 cc_rows, ignore=True, commit=commit)
1264
1265 def _UpdateIssuesNotify(self, cnxn, issues, commit=True):
1266 """Update the Issue2Notify table rows for the given issues."""
1267 notify_rows = []
1268 for issue in issues:
1269 derived_rows = [[issue.issue_id, email]
1270 for email in issue.derived_notify_addrs]
1271 notify_rows.extend(derived_rows)
1272
1273 self.issue2notify_tbl.Delete(
1274 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1275 self.issue2notify_tbl.InsertRows(
1276 cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit)
1277
  def _UpdateIssuesRelation(self, cnxn, issues, commit=True):
    """Update the IssueRelation table rows for the given issues.

    Rewrites the IssueRelation and DanglingRelation tables to match the
    blockedon/blocking/mergedinto relations recorded on the given Issue PBs.

    Args:
      cnxn: connection to SQL database.
      issues: list of Issue PBs whose relation rows should be rewritten.
      commit: set to False to skip the DB commit and do it in the caller.
    """
    relation_rows = []
    blocking_rows = []
    dangling_relation_rows = []
    for issue in issues:
      # "blockedon" rows carry an explicit rank to preserve ordering.
      for i, dst_issue_id in enumerate(issue.blocked_on_iids):
        rank = issue.blocked_on_ranks[i]
        relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon', rank))
      # A "blocking" relation is stored as the inverse "blockedon" row.
      for dst_issue_id in issue.blocking_iids:
        blocking_rows.append((dst_issue_id, issue.issue_id, 'blockedon'))
      # Dangling refs point at issues in an external tracker (identified by
      # ext_issue_identifier) or at a project/local-id pair.
      for dst_ref in issue.dangling_blocked_on_refs:
        if dst_ref.ext_issue_identifier:
          dangling_relation_rows.append((
              issue.issue_id, None, None,
              dst_ref.ext_issue_identifier, 'blockedon'))
        else:
          dangling_relation_rows.append((
              issue.issue_id, dst_ref.project, dst_ref.issue_id,
              None, 'blockedon'))
      for dst_ref in issue.dangling_blocking_refs:
        if dst_ref.ext_issue_identifier:
          dangling_relation_rows.append((
              issue.issue_id, None, None,
              dst_ref.ext_issue_identifier, 'blocking'))
        else:
          dangling_relation_rows.append((
              issue.issue_id, dst_ref.project, dst_ref.issue_id,
              dst_ref.ext_issue_identifier, 'blocking'))
      if issue.merged_into:
        relation_rows.append((
            issue.issue_id, issue.merged_into, 'mergedinto', None))
      if issue.merged_into_external:
        dangling_relation_rows.append((
            issue.issue_id, None, None,
            issue.merged_into_external, 'mergedinto'))

    # Diff the new inverse-blocking rows against what is already stored:
    # rows already present keep their existing rank; brand-new ones are
    # inserted with a default rank of 0; vanished ones are deleted.
    old_blocking = self.issuerelation_tbl.Select(
        cnxn, cols=ISSUERELATION_COLS[:-1],
        dst_issue_id=[issue.issue_id for issue in issues], kind='blockedon')
    relation_rows.extend([
        (row + (0,)) for row in blocking_rows if row not in old_blocking])
    delete_rows = [row for row in old_blocking if row not in blocking_rows]

    for issue_id, dst_issue_id, kind in delete_rows:
      self.issuerelation_tbl.Delete(cnxn, issue_id=issue_id,
          dst_issue_id=dst_issue_id, kind=kind, commit=False)
    self.issuerelation_tbl.Delete(
        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
    self.issuerelation_tbl.InsertRows(
        cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
    self.danglingrelation_tbl.Delete(
        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
    self.danglingrelation_tbl.InsertRows(
        cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True,
        commit=commit)
1334
1335 def _UpdateIssuesModified(
1336 self, cnxn, iids, modified_timestamp=None, invalidate=True):
1337 """Store a modified timestamp for each of the specified issues."""
1338 if not iids:
1339 return
1340 delta = {'modified': modified_timestamp or int(time.time())}
1341 self.issue_tbl.Update(cnxn, delta, id=iids, commit=False)
1342 if invalidate:
1343 self.InvalidateIIDs(cnxn, iids)
1344
1345 def _UpdateIssuesApprovals(self, cnxn, issue, commit=True):
1346 """Update the Issue2ApprovalValue table rows for the given issue."""
1347 self.issue2approvalvalue_tbl.Delete(
1348 cnxn, issue_id=issue.issue_id, commit=commit)
1349 av_rows = [(av.approval_id, issue.issue_id, av.phase_id,
1350 av.status.name.lower(), av.setter_id, av.set_on) for
1351 av in issue.approval_values]
1352 self.issue2approvalvalue_tbl.InsertRows(
1353 cnxn, ISSUE2APPROVALVALUE_COLS, av_rows, commit=commit)
1354
1355 approver_rows = []
1356 for av in issue.approval_values:
1357 approver_rows.extend([(av.approval_id, approver_id, issue.issue_id)
1358 for approver_id in av.approver_ids])
1359 self.issueapproval2approver_tbl.Delete(
1360 cnxn, issue_id=issue.issue_id, commit=commit)
1361 self.issueapproval2approver_tbl.InsertRows(
1362 cnxn, ISSUEAPPROVAL2APPROVER_COLS, approver_rows, commit=commit)
1363
  def UpdateIssueStructure(self, cnxn, config, issue, template, reporter_id,
                           comment_content, commit=True, invalidate=True):
    """Converts the phases and approvals structure of the issue into the
    structure of the given template.

    Args:
      cnxn: connection to SQL database.
      config: ProjectIssueConfig PB for the issue's project.
      issue: Issue PB to restructure; updated in RAM and in SQL.
      template: TemplateDef whose phases/approvals the issue should adopt.
      reporter_id: int user ID credited with the change.
      comment_content: string content for the amendment comment.
      commit: set to False to skip the DB commit and do it in the caller.
      invalidate: set to False to leave cache invalidation to the caller.

    Returns:
      The IssueComment PB recording the structure change.
    """
    # TODO(jojwang): Remove Field defs that belong to any removed approvals.
    approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
    issue_avs_by_id = {av.approval_id: av for av in issue.approval_values}

    new_approval_surveys = []
    new_issue_approvals = []

    for template_av in template.approval_values:
      existing_issue_av = issue_avs_by_id.get(template_av.approval_id)

      # Update all approval surveys so latest ApprovalDef survey changes
      # appear in the converted issue's approval values.
      ad = approval_defs_by_id.get(template_av.approval_id)
      new_av_approver_ids = []
      if ad:
        new_av_approver_ids = ad.approver_ids
        new_approval_surveys.append(
            self._MakeIssueComment(
                issue.project_id, reporter_id, ad.survey,
                is_description=True, approval_id=ad.approval_id))
      else:
        logging.info('ApprovalDef not found for approval %r', template_av)

      # Keep approval values as-is if it exists in issue and template
      if existing_issue_av:
        new_av = tracker_bizobj.MakeApprovalValue(
            existing_issue_av.approval_id,
            approver_ids=existing_issue_av.approver_ids,
            status=existing_issue_av.status,
            setter_id=existing_issue_av.setter_id,
            set_on=existing_issue_av.set_on,
            phase_id=template_av.phase_id)
        new_issue_approvals.append(new_av)
      else:
        # Approval is new to this issue: take the template's value and the
        # ApprovalDef's current approvers.
        new_av = tracker_bizobj.MakeApprovalValue(
            template_av.approval_id, approver_ids=new_av_approver_ids,
            status=template_av.status, phase_id=template_av.phase_id)
        new_issue_approvals.append(new_av)

    template_phase_by_name = {
        phase.name.lower(): phase for phase in template.phases}
    issue_phase_by_id = {phase.phase_id: phase for phase in issue.phases}
    updated_fvs = []
    # Trim issue FieldValues or update FieldValue phase_ids
    for fv in issue.field_values:
      # If a fv's phase has the same name as a template's phase, update
      # the fv's phase_id to that of the template phase's. Otherwise,
      # remove the fv.
      if fv.phase_id:
        issue_phase = issue_phase_by_id.get(fv.phase_id)
        if issue_phase and issue_phase.name:
          template_phase = template_phase_by_name.get(issue_phase.name.lower())
          # TODO(jojwang): monorail:4693, remove this after all 'stable-full'
          # gates have been renamed to 'stable'.
          if not template_phase:
            template_phase = template_phase_by_name.get(
                FLT_EQUIVALENT_GATES.get(issue_phase.name.lower()))
          if template_phase:
            fv.phase_id = template_phase.phase_id
            updated_fvs.append(fv)
      # keep all fvs that do not belong to phases.
      else:
        updated_fvs.append(fv)

    # Record an amendment listing the new and old approval structures.
    fd_names_by_id = {fd.field_id: fd.field_name for fd in config.field_defs}
    amendment = tracker_bizobj.MakeApprovalStructureAmendment(
        [fd_names_by_id.get(av.approval_id) for av in new_issue_approvals],
        [fd_names_by_id.get(av.approval_id) for av in issue.approval_values])

    # Update issue structure in RAM.
    issue.approval_values = new_issue_approvals
    issue.phases = template.phases
    issue.field_values = updated_fvs

    # Update issue structure in DB.
    for survey in new_approval_surveys:
      survey.issue_id = issue.issue_id
      self.InsertComment(cnxn, survey, commit=False)
    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
    self._UpdateIssuesFields(cnxn, [issue], commit=False)
    comment_pb = self.CreateIssueComment(
        cnxn, issue, reporter_id, comment_content,
        amendments=[amendment], commit=False)

    if commit:
      cnxn.Commit()

    if invalidate:
      self.InvalidateIIDs(cnxn, [issue.issue_id])

    return comment_pb
1459
1460 def DeltaUpdateIssue(
1461 self, cnxn, services, reporter_id, project_id,
1462 config, issue, delta, index_now=False, comment=None, attachments=None,
1463 iids_to_invalidate=None, rules=None, predicate_asts=None,
1464 is_description=False, timestamp=None, kept_attachments=None,
1465 importer_id=None, inbound_message=None):
1466 """Update the issue in the database and return a set of update tuples.
1467
1468 Args:
1469 cnxn: connection to SQL database.
1470 services: connections to persistence layer.
1471 reporter_id: user ID of the user making this change.
1472 project_id: int ID for the current project.
1473 config: ProjectIssueConfig PB for this project.
1474 issue: Issue PB of issue to update.
1475 delta: IssueDelta object of fields to update.
1476 index_now: True if the issue should be updated in the full text index.
1477 comment: This should be the content of the comment
1478 corresponding to this change.
1479 attachments: List [(filename, contents, mimetype),...] of attachments.
1480 iids_to_invalidate: optional set of issue IDs that need to be invalidated.
1481 If provided, affected issues will be accumulated here and, the caller
1482 must call InvalidateIIDs() afterwards.
1483 rules: optional list of preloaded FilterRule PBs for this project.
1484 predicate_asts: optional list of QueryASTs for the rules. If rules are
1485 provided, then predicate_asts should also be provided.
1486 is_description: True if the comment is a new description for the issue.
1487 timestamp: int timestamp set during testing, otherwise defaults to
1488 int(time.time()).
1489 kept_attachments: This should be a list of int attachment ids for
1490 attachments kept from previous descriptions, if the comment is
1491 a change to the issue description
1492 importer_id: optional ID of user ID for an API client that is importing
1493 issues and attributing them to other users.
1494 inbound_message: optional string full text of an email that caused
1495 this comment to be added.
1496
1497 Returns:
1498 A tuple (amendments, comment_pb) with a list of Amendment PBs that
1499 describe the set of metadata updates that the user made, and the
1500 resulting IssueComment (or None if no comment was created).
1501 """
1502 timestamp = timestamp or int(time.time())
1503 old_effective_owner = tracker_bizobj.GetOwnerId(issue)
1504 old_effective_status = tracker_bizobj.GetStatus(issue)
1505 old_components = set(issue.component_ids)
1506
1507 logging.info(
1508 'Bulk edit to project_id %s issue.local_id %s, comment %r',
1509 project_id, issue.local_id, comment)
1510 if iids_to_invalidate is None:
1511 iids_to_invalidate = set([issue.issue_id])
1512 invalidate = True
1513 else:
1514 iids_to_invalidate.add(issue.issue_id)
1515 invalidate = False # Caller will do it.
1516
1517 # Store each updated value in the issue PB, and compute Update PBs
1518 amendments, impacted_iids = tracker_bizobj.ApplyIssueDelta(
1519 cnxn, self, issue, delta, config)
1520 iids_to_invalidate.update(impacted_iids)
1521
1522 # If this was a no-op with no comment, bail out and don't save,
1523 # invalidate, or re-index anything.
1524 if (not amendments and (not comment or not comment.strip()) and
1525 not attachments):
1526 logging.info('No amendments, comment, attachments: this is a no-op.')
1527 return [], None
1528
1529 # Note: no need to check for collisions when the user is doing a delta.
1530
1531 # update the modified_timestamp for any comment added, even if it was
1532 # just a text comment with no issue fields changed.
1533 issue.modified_timestamp = timestamp
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001534 issue.migration_modified_timestamp = timestamp
Copybara854996b2021-09-07 19:36:02 +00001535
1536 # Update the closed timestamp before filter rules so that rules
1537 # can test for closed_timestamp, and also after filter rules
1538 # so that closed_timestamp will be set if the issue is closed by the rule.
1539 tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
1540 if rules is None:
1541 logging.info('Rules were not given')
1542 rules = services.features.GetFilterRules(cnxn, config.project_id)
1543 predicate_asts = filterrules_helpers.ParsePredicateASTs(
1544 rules, config, [])
1545
1546 filterrules_helpers.ApplyGivenRules(
1547 cnxn, services, issue, config, rules, predicate_asts)
1548 tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
1549 if old_effective_owner != tracker_bizobj.GetOwnerId(issue):
1550 issue.owner_modified_timestamp = timestamp
1551 if old_effective_status != tracker_bizobj.GetStatus(issue):
1552 issue.status_modified_timestamp = timestamp
1553 if old_components != set(issue.component_ids):
1554 issue.component_modified_timestamp = timestamp
1555
1556 # Store the issue in SQL.
1557 self.UpdateIssue(cnxn, issue, commit=False, invalidate=False)
1558
1559 comment_pb = self.CreateIssueComment(
1560 cnxn, issue, reporter_id, comment, amendments=amendments,
1561 is_description=is_description, attachments=attachments, commit=False,
1562 kept_attachments=kept_attachments, timestamp=timestamp,
1563 importer_id=importer_id, inbound_message=inbound_message)
1564 self._UpdateIssuesModified(
1565 cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp,
1566 invalidate=invalidate)
1567
1568 # Add a comment to the newly added issues saying they are now blocking
1569 # this issue.
1570 for add_issue in self.GetIssues(cnxn, delta.blocked_on_add):
1571 self.CreateIssueComment(
1572 cnxn, add_issue, reporter_id, content='',
1573 amendments=[tracker_bizobj.MakeBlockingAmendment(
1574 [(issue.project_name, issue.local_id)], [],
1575 default_project_name=add_issue.project_name)],
1576 timestamp=timestamp, importer_id=importer_id)
1577 # Add a comment to the newly removed issues saying they are no longer
1578 # blocking this issue.
1579 for remove_issue in self.GetIssues(cnxn, delta.blocked_on_remove):
1580 self.CreateIssueComment(
1581 cnxn, remove_issue, reporter_id, content='',
1582 amendments=[tracker_bizobj.MakeBlockingAmendment(
1583 [], [(issue.project_name, issue.local_id)],
1584 default_project_name=remove_issue.project_name)],
1585 timestamp=timestamp, importer_id=importer_id)
1586
1587 # Add a comment to the newly added issues saying they are now blocked on
1588 # this issue.
1589 for add_issue in self.GetIssues(cnxn, delta.blocking_add):
1590 self.CreateIssueComment(
1591 cnxn, add_issue, reporter_id, content='',
1592 amendments=[tracker_bizobj.MakeBlockedOnAmendment(
1593 [(issue.project_name, issue.local_id)], [],
1594 default_project_name=add_issue.project_name)],
1595 timestamp=timestamp, importer_id=importer_id)
1596 # Add a comment to the newly removed issues saying they are no longer
1597 # blocked on this issue.
1598 for remove_issue in self.GetIssues(cnxn, delta.blocking_remove):
1599 self.CreateIssueComment(
1600 cnxn, remove_issue, reporter_id, content='',
1601 amendments=[tracker_bizobj.MakeBlockedOnAmendment(
1602 [], [(issue.project_name, issue.local_id)],
1603 default_project_name=remove_issue.project_name)],
1604 timestamp=timestamp, importer_id=importer_id)
1605
1606 if not invalidate:
1607 cnxn.Commit()
1608
1609 if index_now:
1610 tracker_fulltext.IndexIssues(
1611 cnxn, [issue], services.user_service, self, self._config_service)
1612 else:
1613 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
1614
1615 return amendments, comment_pb
1616
1617 def InvalidateIIDs(self, cnxn, iids_to_invalidate):
1618 """Invalidate the specified issues in the Invalidate table and memcache."""
1619 issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate)
1620 self.InvalidateIssues(cnxn, issues_to_invalidate)
1621
1622 def InvalidateIssues(self, cnxn, issues):
1623 """Invalidate the specified issues in the Invalidate table and memcache."""
1624 iids = [issue.issue_id for issue in issues]
1625 self.issue_2lc.InvalidateKeys(cnxn, iids)
1626 self._config_service.InvalidateMemcache(issues)
1627
1628 def RelateIssues(self, cnxn, issue_relation_dict, commit=True):
1629 """Update the IssueRelation table rows for the given relationships.
1630
1631 issue_relation_dict is a mapping of 'source' issues to 'destination' issues,
1632 paired with the kind of relationship connecting the two.
1633 """
1634 relation_rows = []
1635 for src_iid, dests in issue_relation_dict.items():
1636 for dst_iid, kind in dests:
1637 if kind == 'blocking':
1638 relation_rows.append((dst_iid, src_iid, 'blockedon', 0))
1639 elif kind == 'blockedon':
1640 relation_rows.append((src_iid, dst_iid, 'blockedon', 0))
1641 elif kind == 'mergedinto':
1642 relation_rows.append((src_iid, dst_iid, 'mergedinto', None))
1643
1644 self.issuerelation_tbl.InsertRows(
1645 cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
1646
1647 def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id):
1648 """Copy the given issues into the destination project."""
1649 created_issues = []
1650 iids_to_invalidate = set()
1651
1652 for target_issue in issues:
1653 assert not target_issue.assume_stale, (
1654 'issue2514: Copying issue that might be stale: %r' % target_issue)
1655 new_issue = tracker_pb2.Issue()
1656 new_issue.project_id = dest_project.project_id
1657 new_issue.project_name = dest_project.project_name
1658 new_issue.summary = target_issue.summary
1659 new_issue.labels.extend(target_issue.labels)
1660 new_issue.field_values.extend(target_issue.field_values)
1661 new_issue.reporter_id = copier_id
1662
1663 timestamp = int(time.time())
1664 new_issue.opened_timestamp = timestamp
1665 new_issue.modified_timestamp = timestamp
1666
1667 target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id)
1668 initial_summary_comment = target_comments[0]
1669
1670 # Note that blocking and merge_into are not copied.
1671 if target_issue.blocked_on_iids:
1672 blocked_on = target_issue.blocked_on_iids
1673 iids_to_invalidate.update(blocked_on)
1674 new_issue.blocked_on_iids = blocked_on
1675
1676 # Gather list of attachments from the target issue's summary comment.
1677 # MakeIssueComments expects a list of [(filename, contents, mimetype),...]
1678 attachments = []
1679 for attachment in initial_summary_comment.attachments:
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +02001680 client = storage.Client()
1681 bucket = client.get_bucket(app_identity.get_default_gcs_bucket_name)
1682 blob = bucket.get_blob(attachment.gcs_object_id)
1683 content = blob.download_as_bytes()
1684 attachments.append([attachment.filename, content, attachment.mimetype])
Copybara854996b2021-09-07 19:36:02 +00001685
1686 if attachments:
1687 new_issue.attachment_count = len(attachments)
1688
1689 # Create the same summary comment as the target issue.
1690 comment = self._MakeIssueComment(
1691 dest_project.project_id, copier_id, initial_summary_comment.content,
1692 attachments=attachments, timestamp=timestamp, is_description=True)
1693
1694 new_issue.local_id = self.AllocateNextLocalID(
1695 cnxn, dest_project.project_id)
1696 issue_id = self.InsertIssue(cnxn, new_issue)
1697 comment.issue_id = issue_id
1698 self.InsertComment(cnxn, comment)
1699
1700 if permissions.HasRestrictions(new_issue, 'view'):
1701 self._config_service.InvalidateMemcache(
1702 [new_issue], key_prefix='nonviewable:')
1703
1704 tracker_fulltext.IndexIssues(
1705 cnxn, [new_issue], user_service, self, self._config_service)
1706 created_issues.append(new_issue)
1707
1708 # The referenced issues are all modified when the relationship is added.
1709 self._UpdateIssuesModified(
1710 cnxn, iids_to_invalidate, modified_timestamp=timestamp)
1711
1712 return created_issues
1713
  def MoveIssues(self, cnxn, dest_project, issues, user_service):
    """Move the given issues into the destination project.

    Issues that once lived in dest_project get their old local_id back;
    all others are assigned fresh local IDs.  The old locations are
    recorded so that stale links can be redirected.

    Args:
      cnxn: connection to SQL database.
      dest_project: Project PB of the destination project.
      issues: list of Issue PBs to move; mutated in place with the new
          project_id, project_name, and local_id.
      user_service: persistence layer for users, used for indexing.

    Returns:
      A set of issue_ids for issues that moved *back* to a project where
      they previously lived.
    """
    # Snapshot the current locations before mutating the issue PBs.
    old_location_rows = [
        (issue.issue_id, issue.project_id, issue.local_id)
        for issue in issues]
    moved_back_iids = set()

    # Look up which of these issues used to live in dest_project, so their
    # former local IDs can be reused.
    former_locations_in_project = self.issueformerlocations_tbl.Select(
        cnxn, cols=ISSUEFORMERLOCATIONS_COLS,
        project_id=dest_project.project_id,
        issue_id=[issue.issue_id for issue in issues])
    former_locations = {
        issue_id: local_id
        for issue_id, project_id, local_id in former_locations_in_project}

    # Remove the issue id from issue_id_2lc so that it does not stay
    # around in cache and memcache.
    # The Key of IssueIDTwoLevelCache is (project_id, local_id).
    self.issue_id_2lc.InvalidateKeys(
        cnxn, [(issue.project_id, issue.local_id) for issue in issues])
    self.InvalidateIssues(cnxn, issues)

    # Assign each issue its destination local ID: the reclaimed former ID
    # if there is one, else a newly allocated one.
    for issue in issues:
      if issue.issue_id in former_locations:
        dest_id = former_locations[issue.issue_id]
        moved_back_iids.add(issue.issue_id)
      else:
        dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id)

      issue.local_id = dest_id
      issue.project_id = dest_project.project_id
      issue.project_name = dest_project.project_name

    # Rewrite each whole issue so that status and label IDs are looked up
    # in the context of the destination project.
    self.UpdateIssues(cnxn, issues)

    # Comments also have the project_id because it is needed for an index.
    self.comment_tbl.Update(
        cnxn, {'project_id': dest_project.project_id},
        issue_id=[issue.issue_id for issue in issues], commit=False)

    # Record old locations so that we can offer links if the user looks there.
    self.issueformerlocations_tbl.InsertRows(
        cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True,
        commit=False)
    cnxn.Commit()

    tracker_fulltext.IndexIssues(
        cnxn, issues, user_service, self, self._config_service)

    return moved_back_iids
1766
  def ExpungeFormerLocations(self, cnxn, project_id):
    """Delete history of issues that were in this project but moved out.

    Args:
      cnxn: connection to SQL database.
      project_id: int ID of the project whose former-location rows are
          being deleted.
    """
    self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id)
1770
1771 def ExpungeIssues(self, cnxn, issue_ids):
1772 """Completely delete the specified issues from the database."""
1773 logging.info('expunging the issues %r', issue_ids)
1774 tracker_fulltext.UnindexIssues(issue_ids)
1775
1776 remaining_iids = issue_ids[:]
1777
1778 # Note: these are purposely not done in a transaction to allow
1779 # incremental progress in what might be a very large change.
1780 # We are not concerned about non-atomic deletes because all
1781 # this data will be gone eventually anyway.
1782 while remaining_iids:
1783 iids_in_chunk = remaining_iids[:CHUNK_SIZE]
1784 remaining_iids = remaining_iids[CHUNK_SIZE:]
1785 self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1786 self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1787 self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1788 self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1789 self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1790 self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1791 self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1792 self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1793 self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1794 self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk)
1795 self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1796 self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1797 self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1798 self.issue_tbl.Delete(cnxn, id=iids_in_chunk)
1799
1800 def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service):
1801 """Set the deleted boolean on the indicated issue and store it.
1802
1803 Args:
1804 cnxn: connection to SQL database.
1805 project_id: int project ID for the current project.
1806 local_id: int local ID of the issue to freeze/unfreeze.
1807 deleted: boolean, True to soft-delete, False to undelete.
1808 user_service: persistence layer for users, used to lookup user IDs.
1809 """
1810 issue = self.GetIssueByLocalID(cnxn, project_id, local_id, use_cache=False)
1811 issue.deleted = deleted
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001812 issue.migration_modified_timestamp = int(time.time())
1813 self.UpdateIssue(cnxn, issue, update_cols=['deleted', 'migration_modified'])
Copybara854996b2021-09-07 19:36:02 +00001814 tracker_fulltext.IndexIssues(
1815 cnxn, [issue], user_service, self, self._config_service)
1816
  def DeleteComponentReferences(self, cnxn, component_id):
    """Delete any references to the specified component.

    Args:
      cnxn: connection to SQL database.
      component_id: int ID of the component whose Issue2Component rows
          are being removed.
    """
    # TODO(jrobbins): add tasks to re-index any affected issues.
    # Note: if this call fails, some data could be left
    # behind, but it would not be displayed, and it could always be
    # GC'd from the DB later.
    self.issue2component_tbl.Delete(cnxn, component_id=component_id)
1824
1825 ### Local ID generation
1826
  def InitializeLocalID(self, cnxn, project_id):
    """Initialize the local ID counter for the specified project to zero.

    Args:
      cnxn: connection to SQL database.
      project_id: int ID of the project.
    """
    # Both the issue counter and the spam counter start at zero.
    self.localidcounter_tbl.InsertRow(
        cnxn, project_id=project_id, used_local_id=0, used_spam_id=0)
1836
1837 def SetUsedLocalID(self, cnxn, project_id):
1838 """Set the local ID counter based on existing issues.
1839
1840 Args:
1841 cnxn: connection to SQL database.
1842 project_id: int ID of the project.
1843 """
1844 highest_id = self.GetHighestLocalID(cnxn, project_id)
1845 self.localidcounter_tbl.InsertRow(
1846 cnxn, replace=True, used_local_id=highest_id, project_id=project_id)
1847 return highest_id
1848
1849 def AllocateNextLocalID(self, cnxn, project_id):
1850 """Return the next available issue ID in the specified project.
1851
1852 Args:
1853 cnxn: connection to SQL database.
1854 project_id: int ID of the project.
1855
1856 Returns:
1857 The next local ID.
1858 """
1859 try:
1860 next_local_id = self.localidcounter_tbl.IncrementCounterValue(
1861 cnxn, 'used_local_id', project_id=project_id)
1862 except AssertionError as e:
1863 logging.info('exception incrementing local_id counter: %s', e)
1864 next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1
1865 return next_local_id
1866
1867 def GetHighestLocalID(self, cnxn, project_id):
1868 """Return the highest used issue ID in the specified project.
1869
1870 Args:
1871 cnxn: connection to SQL database.
1872 project_id: int ID of the project.
1873
1874 Returns:
1875 The highest local ID for an active or moved issues.
1876 """
1877 highest = self.issue_tbl.SelectValue(
1878 cnxn, 'MAX(local_id)', project_id=project_id)
1879 highest = highest or 0 # It will be None if the project has no issues.
1880 highest_former = self.issueformerlocations_tbl.SelectValue(
1881 cnxn, 'MAX(local_id)', project_id=project_id)
1882 highest_former = highest_former or 0
1883 return max(highest, highest_former)
1884
1885 def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None):
1886 """Return the list of local IDs only, not the actual issues.
1887
1888 Args:
1889 cnxn: connection to SQL database.
1890 project_id: the ID of the project to which the issue belongs.
1891 min_local_id: point to start at.
1892
1893 Returns:
1894 A range object of local IDs from 1 to N, or from min_local_id to N. It
1895 may be the case that some of those local IDs are no longer used, e.g.,
1896 if some issues were moved out of this project.
1897 """
1898 if not min_local_id:
1899 min_local_id = 1
1900 highest_local_id = self.GetHighestLocalID(cnxn, project_id)
1901 return list(range(min_local_id, highest_local_id + 1))
1902
  def ExpungeLocalIDCounters(self, cnxn, project_id):
    """Delete history of local ids that were in this project.

    Args:
      cnxn: connection to SQL database.
      project_id: int ID of the project whose counter row is deleted.
    """
    self.localidcounter_tbl.Delete(cnxn, project_id=project_id)
1906
1907 ### Comments
1908
1909 def _UnpackComment(
1910 self, comment_row, content_dict, inbound_message_dict, approval_dict,
1911 importer_dict):
1912 """Partially construct a Comment PB from a DB row."""
1913 (comment_id, issue_id, created, project_id, commenter_id,
1914 deleted_by, is_spam, is_description, commentcontent_id) = comment_row
1915 comment = tracker_pb2.IssueComment()
1916 comment.id = comment_id
1917 comment.issue_id = issue_id
1918 comment.timestamp = created
1919 comment.project_id = project_id
1920 comment.user_id = commenter_id
1921 comment.content = content_dict.get(commentcontent_id, '')
1922 comment.inbound_message = inbound_message_dict.get(commentcontent_id, '')
1923 comment.deleted_by = deleted_by or 0
1924 comment.is_spam = bool(is_spam)
1925 comment.is_description = bool(is_description)
1926 comment.approval_id = approval_dict.get(comment_id)
1927 comment.importer_id = importer_dict.get(comment_id)
1928 return comment
1929
1930 def _UnpackAmendment(self, amendment_row):
1931 """Construct an Amendment PB from a DB row."""
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001932 (
1933 _id, _issue_id, comment_id, field_name, old_value, new_value,
1934 added_user_id, removed_user_id, custom_field_name, added_component_id,
1935 removed_component_id) = amendment_row
Copybara854996b2021-09-07 19:36:02 +00001936 amendment = tracker_pb2.Amendment()
1937 field_enum = tracker_pb2.FieldID(field_name.upper())
1938 amendment.field = field_enum
1939
1940 # TODO(jrobbins): display old values in more cases.
1941 if new_value is not None:
1942 amendment.newvalue = new_value
1943 if old_value is not None:
1944 amendment.oldvalue = old_value
1945 if added_user_id:
1946 amendment.added_user_ids.append(added_user_id)
1947 if removed_user_id:
1948 amendment.removed_user_ids.append(removed_user_id)
1949 if custom_field_name:
1950 amendment.custom_field_name = custom_field_name
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001951 if added_component_id:
1952 added_component_id = int(added_component_id)
1953 amendment.added_component_ids.append(added_component_id)
1954 if removed_component_id:
1955 removed_component_id = int(removed_component_id)
1956 amendment.removed_component_ids.append(removed_component_id)
Copybara854996b2021-09-07 19:36:02 +00001957 return amendment, comment_id
1958
1959 def _ConsolidateAmendments(self, amendments):
1960 """Consoliodate amendments of the same field in one comment into one
1961 amendment PB."""
1962
1963 fields_dict = {}
1964 result = []
1965
1966 for amendment in amendments:
1967 key = amendment.field, amendment.custom_field_name
1968 fields_dict.setdefault(key, []).append(amendment)
1969 for (field, _custom_name), sorted_amendments in sorted(fields_dict.items()):
1970 new_amendment = tracker_pb2.Amendment()
1971 new_amendment.field = field
1972 for amendment in sorted_amendments:
1973 if amendment.newvalue is not None:
1974 if new_amendment.newvalue is not None:
1975 # NOTE: see crbug/monorail/8272. BLOCKEDON and BLOCKING changes
1976 # are all stored in newvalue e.g. (newvalue = -b/123 b/124) and
1977 # external bugs and monorail bugs are stored in separate amendments.
1978 # Without this, the values of external bug amendments and monorail
1979 # blocker bug amendments may overwrite each other.
1980 new_amendment.newvalue += (' ' + amendment.newvalue)
1981 else:
1982 new_amendment.newvalue = amendment.newvalue
1983 if amendment.oldvalue is not None:
1984 new_amendment.oldvalue = amendment.oldvalue
1985 if amendment.added_user_ids:
1986 new_amendment.added_user_ids.extend(amendment.added_user_ids)
1987 if amendment.removed_user_ids:
1988 new_amendment.removed_user_ids.extend(amendment.removed_user_ids)
1989 if amendment.custom_field_name:
1990 new_amendment.custom_field_name = amendment.custom_field_name
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001991 if amendment.added_component_ids:
1992 new_amendment.added_component_ids.extend(
1993 amendment.added_component_ids)
1994 if amendment.removed_component_ids:
1995 new_amendment.removed_component_ids.extend(
1996 amendment.removed_component_ids)
Copybara854996b2021-09-07 19:36:02 +00001997 result.append(new_amendment)
1998 return result
1999
2000 def _UnpackAttachment(self, attachment_row):
2001 """Construct an Attachment PB from a DB row."""
2002 (attachment_id, _issue_id, comment_id, filename, filesize, mimetype,
2003 deleted, gcs_object_id) = attachment_row
2004 attach = tracker_pb2.Attachment()
2005 attach.attachment_id = attachment_id
2006 attach.filename = filename
2007 attach.filesize = filesize
2008 attach.mimetype = mimetype
2009 attach.deleted = bool(deleted)
2010 attach.gcs_object_id = gcs_object_id
2011 return attach, comment_id
2012
2013 def _DeserializeComments(
2014 self, comment_rows, commentcontent_rows, amendment_rows, attachment_rows,
2015 approval_rows, importer_rows):
2016 """Turn rows into IssueComment PBs."""
2017 results = [] # keep objects in the same order as the rows
2018 results_dict = {} # for fast access when joining.
2019
2020 content_dict = dict(
2021 (commentcontent_id, content) for
2022 commentcontent_id, content, _ in commentcontent_rows)
2023 inbound_message_dict = dict(
2024 (commentcontent_id, inbound_message) for
2025 commentcontent_id, _, inbound_message in commentcontent_rows)
2026 approval_dict = dict(
2027 (comment_id, approval_id) for approval_id, comment_id in
2028 approval_rows)
2029 importer_dict = dict(importer_rows)
2030
2031 for comment_row in comment_rows:
2032 comment = self._UnpackComment(
2033 comment_row, content_dict, inbound_message_dict, approval_dict,
2034 importer_dict)
2035 results.append(comment)
2036 results_dict[comment.id] = comment
2037
2038 for amendment_row in amendment_rows:
2039 amendment, comment_id = self._UnpackAmendment(amendment_row)
2040 try:
2041 results_dict[comment_id].amendments.extend([amendment])
2042 except KeyError:
2043 logging.error('Found amendment for missing comment: %r', comment_id)
2044
2045 for attachment_row in attachment_rows:
2046 attach, comment_id = self._UnpackAttachment(attachment_row)
2047 try:
2048 results_dict[comment_id].attachments.append(attach)
2049 except KeyError:
2050 logging.error('Found attachment for missing comment: %r', comment_id)
2051
2052 for c in results:
2053 c.amendments = self._ConsolidateAmendments(c.amendments)
2054
2055 return results
2056
2057 # TODO(jrobbins): make this a private method and expose just the interface
2058 # needed by activities.py.
2059 def GetComments(
2060 self, cnxn, where=None, order_by=None, content_only=False, **kwargs):
2061 """Retrieve comments from SQL."""
2062 shard_id = sql.RandomShardID()
2063 order_by = order_by or [('created', [])]
2064 comment_rows = self.comment_tbl.Select(
2065 cnxn, cols=COMMENT_COLS, where=where,
2066 order_by=order_by, shard_id=shard_id, **kwargs)
2067 cids = [row[0] for row in comment_rows]
2068 commentcontent_ids = [row[-1] for row in comment_rows]
2069 content_rows = self.commentcontent_tbl.Select(
2070 cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids,
2071 shard_id=shard_id)
2072 approval_rows = self.issueapproval2comment_tbl.Select(
2073 cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids)
2074 amendment_rows = []
2075 attachment_rows = []
2076 importer_rows = []
2077 if not content_only:
2078 amendment_rows = self.issueupdate_tbl.Select(
2079 cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id)
2080 attachment_rows = self.attachment_tbl.Select(
2081 cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id)
2082 importer_rows = self.commentimporter_tbl.Select(
2083 cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id)
2084
2085 comments = self._DeserializeComments(
2086 comment_rows, content_rows, amendment_rows, attachment_rows,
2087 approval_rows, importer_rows)
2088 return comments
2089
2090 def GetComment(self, cnxn, comment_id):
2091 """Get the requested comment, or raise an exception."""
2092 comments = self.GetComments(cnxn, id=comment_id)
2093 try:
2094 return comments[0]
2095 except IndexError:
2096 raise exceptions.NoSuchCommentException()
2097
2098 def GetCommentsForIssue(self, cnxn, issue_id):
2099 """Return all IssueComment PBs for the specified issue.
2100
2101 Args:
2102 cnxn: connection to SQL database.
2103 issue_id: int global ID of the issue.
2104
2105 Returns:
2106 A list of the IssueComment protocol buffers for the description
2107 and comments on this issue.
2108 """
2109 comments = self.GetComments(cnxn, issue_id=[issue_id])
2110 for i, comment in enumerate(comments):
2111 comment.sequence = i
2112
2113 return comments
2114
2115
2116 def GetCommentsByID(self, cnxn, comment_ids, sequences, use_cache=True,
2117 shard_id=None):
2118 """Return all IssueComment PBs by comment ids.
2119
2120 Args:
2121 cnxn: connection to SQL database.
2122 comment_ids: a list of comment ids.
2123 sequences: sequence of the comments.
2124 use_cache: optional boolean to enable the cache.
2125 shard_id: optional int shard_id to limit retrieval.
2126
2127 Returns:
2128 A list of the IssueComment protocol buffers for comment_ids.
2129 """
2130 # Try loading issue comments from a random shard to reduce load on
2131 # primary DB.
2132 if shard_id is None:
2133 shard_id = sql.RandomShardID()
2134
2135 comment_dict, _missed_comments = self.comment_2lc.GetAll(cnxn, comment_ids,
2136 use_cache=use_cache, shard_id=shard_id)
2137
2138 comments = sorted(list(comment_dict.values()), key=lambda x: x.timestamp)
2139
2140 for i in range(len(comment_ids)):
2141 comments[i].sequence = sequences[i]
2142
2143 return comments
2144
2145 # TODO(jrobbins): remove this method because it is too slow when an issue
2146 # has a huge number of comments.
2147 def GetCommentsForIssues(self, cnxn, issue_ids, content_only=False):
2148 """Return all IssueComment PBs for each issue ID in the given list.
2149
2150 Args:
2151 cnxn: connection to SQL database.
2152 issue_ids: list of integer global issue IDs.
2153 content_only: optional boolean, set true for faster loading of
2154 comment content without attachments and amendments.
2155
2156 Returns:
2157 Dict {issue_id: [IssueComment, ...]} with IssueComment protocol
2158 buffers for the description and comments on each issue.
2159 """
2160 comments = self.GetComments(
2161 cnxn, issue_id=issue_ids, content_only=content_only)
2162
2163 comments_dict = collections.defaultdict(list)
2164 for comment in comments:
2165 comment.sequence = len(comments_dict[comment.issue_id])
2166 comments_dict[comment.issue_id].append(comment)
2167
2168 return comments_dict
2169
  def InsertComment(self, cnxn, comment, commit=True):
    """Store the given issue comment in SQL.

    Inserts the comment body, the Comment row, and any associated
    importer, amendment, attachment, and approval rows.  Sets comment.id
    as a side effect.

    Args:
      cnxn: connection to SQL database.
      comment: IssueComment PB to insert into the database.
      commit: set to False to avoid doing the commit for now.
    """
    # The comment text lives in a separate CommentContent table; insert it
    # first so the Comment row can reference its ID.
    commentcontent_id = self.commentcontent_tbl.InsertRow(
        cnxn, content=comment.content,
        inbound_message=comment.inbound_message, commit=False)
    comment_id = self.comment_tbl.InsertRow(
        cnxn, issue_id=comment.issue_id, created=comment.timestamp,
        project_id=comment.project_id,
        commenter_id=comment.user_id,
        deleted_by=comment.deleted_by or None,
        is_spam=comment.is_spam, is_description=comment.is_description,
        commentcontent_id=commentcontent_id,
        commit=False)
    comment.id = comment_id
    if comment.importer_id:
      self.commentimporter_tbl.InsertRow(
          cnxn, comment_id=comment_id, importer_id=comment.importer_id)

    # Each amendment row matches ISSUEUPDATE_COLS[1:]:
    # (issue_id, comment_id, field, old_value, new_value, added_user_id,
    #  removed_user_id, custom_field_name, added_component_id,
    #  removed_component_id).
    amendment_rows = []
    for amendment in comment.amendments:
      field_enum = str(amendment.field).lower()
      # A plain value change (no user additions/removals) is one row.
      if (amendment.get_assigned_value('newvalue') is not None and
          not amendment.added_user_ids and not amendment.removed_user_ids):
        amendment_rows.append(
            (
                comment.issue_id, comment_id, field_enum, amendment.oldvalue,
                amendment.newvalue, None, None, amendment.custom_field_name,
                None, None))
      # User- and component-valued changes get one row per added/removed
      # user or component.
      for added_user_id in amendment.added_user_ids:
        amendment_rows.append(
            (
                comment.issue_id, comment_id, field_enum, None, None,
                added_user_id, None, amendment.custom_field_name, None, None))
      for removed_user_id in amendment.removed_user_ids:
        amendment_rows.append(
            (
                comment.issue_id, comment_id, field_enum, None, None, None,
                removed_user_id, amendment.custom_field_name, None, None))
      for added_component_id in amendment.added_component_ids:
        amendment_rows.append(
            (
                comment.issue_id, comment_id, field_enum, None, None, None,
                None, amendment.custom_field_name, added_component_id, None))
      for removed_component_id in amendment.removed_component_ids:
        amendment_rows.append(
            (
                comment.issue_id, comment_id, field_enum, None, None, None,
                None, amendment.custom_field_name, None, removed_component_id))
    # ISSUEUPDATE_COLS[1:] to skip id column.
    self.issueupdate_tbl.InsertRows(
        cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=False)

    attachment_rows = []
    for attach in comment.attachments:
      attachment_rows.append([
          comment.issue_id, comment.id, attach.filename, attach.filesize,
          attach.mimetype, attach.deleted, attach.gcs_object_id])
    self.attachment_tbl.InsertRows(
        cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=False)

    if comment.approval_id:
      self.issueapproval2comment_tbl.InsertRows(
          cnxn, ISSUEAPPROVAL2COMMENT_COLS,
          [(comment.approval_id, comment_id)], commit=False)

    if commit:
      cnxn.Commit()
2243
2244 def _UpdateComment(self, cnxn, comment, update_cols=None):
2245 """Update the given issue comment in SQL.
2246
2247 Args:
2248 cnxn: connection to SQL database.
2249 comment: IssueComment PB to update in the database.
2250 update_cols: optional list of just the field names to update.
2251 """
2252 delta = {
2253 'commenter_id': comment.user_id,
2254 'deleted_by': comment.deleted_by or None,
2255 'is_spam': comment.is_spam,
2256 }
2257 if update_cols is not None:
2258 delta = {key: val for key, val in delta.items()
2259 if key in update_cols}
2260
2261 self.comment_tbl.Update(cnxn, delta, id=comment.id)
2262 self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
2263
  def _MakeIssueComment(
      self, project_id, user_id, content, inbound_message=None,
      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
      is_spam=False, is_description=False, approval_id=None, importer_id=None):
    """Create an IssueComment protocol buffer in RAM.

    Args:
      project_id: Project with the issue.
      user_id: the user ID of the user who entered the comment.
      content: string body of the comment.
      inbound_message: optional string full text of an email that
          caused this comment to be added.
      amendments: list of Amendment PBs describing the
          metadata changes that the user made along w/ comment.
      attachments: [(filename, contents, mimetype),...] attachments uploaded at
          the time the comment was made.
      kept_attachments: list of Attachment PBs for attachments kept from
          previous descriptions, if the comment is a description
      timestamp: time at which the comment was made, defaults to now.
      is_spam: True if the comment was classified as spam.
      is_description: True if the comment is a description for the issue.
      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
          belongs to.
      importer_id: optional User ID of script that imported the comment on
          behalf of a user.

    Returns:
      The new IssueComment protocol buffer.

    The content may have some markup done during input processing.

    Any attachments are immediately stored in GCS; the comment itself is
    not persisted here (see InsertComment / CreateIssueComment).
    """
    comment = tracker_pb2.IssueComment()
    comment.project_id = project_id
    comment.user_id = user_id
    comment.content = content or ''
    comment.is_spam = is_spam
    comment.is_description = is_description
    if not timestamp:
      timestamp = int(time.time())
    comment.timestamp = int(timestamp)
    if inbound_message:
      comment.inbound_message = inbound_message
    if amendments:
      logging.info('amendments is %r', amendments)
      comment.amendments.extend(amendments)
    if approval_id:
      comment.approval_id = approval_id

    if attachments:
      # Newly uploaded attachments: store the bytes in GCS now and record
      # the object ID; the attachment row ID is assigned later by SQL.
      for filename, body, mimetype in attachments:
        gcs_object_id = gcs_helpers.StoreObjectInGCS(
            body, mimetype, project_id, filename=filename)
        attach = tracker_pb2.Attachment()
        # attachment id is determined later by the SQL DB.
        attach.filename = filename
        attach.filesize = len(body)
        attach.mimetype = mimetype
        attach.gcs_object_id = gcs_object_id
        comment.attachments.extend([attach])
        logging.info("Save attachment with object_id: %s" % gcs_object_id)

    if kept_attachments:
      # Carried-over attachments reference existing GCS objects; no new
      # upload is needed.
      # NOTE(review): kept_attach[3:] assumes each row is an ATTACHMENT_COLS
      # tuple with (filename, filesize, mimetype, deleted, gcs_object_id)
      # in the trailing positions — confirm against callers.
      for kept_attach in kept_attachments:
        (filename, filesize, mimetype, deleted,
         gcs_object_id) = kept_attach[3:]
        new_attach = tracker_pb2.Attachment(
            filename=filename, filesize=filesize, mimetype=mimetype,
            deleted=bool(deleted), gcs_object_id=gcs_object_id)
        comment.attachments.append(new_attach)
        logging.info("Copy attachment with object_id: %s" % gcs_object_id)

    if importer_id:
      comment.importer_id = importer_id

    return comment
2341
2342 def CreateIssueComment(
2343 self, cnxn, issue, user_id, content, inbound_message=None,
2344 amendments=None, attachments=None, kept_attachments=None, timestamp=None,
2345 is_spam=False, is_description=False, approval_id=None, commit=True,
2346 importer_id=None):
2347 """Create and store a new comment on the specified issue.
2348
2349 Args:
2350 cnxn: connection to SQL database.
2351 issue: the issue on which to add the comment, must be loaded from
2352 database with use_cache=False so that assume_stale == False.
2353 user_id: the user ID of the user who entered the comment.
2354 content: string body of the comment.
2355 inbound_message: optional string full text of an email that caused
2356 this comment to be added.
2357 amendments: list of Amendment PBs describing the
2358 metadata changes that the user made along w/ comment.
2359 attachments: [(filename, contents, mimetype),...] attachments uploaded at
2360 the time the comment was made.
2361 kept_attachments: list of attachment ids for attachments kept from
2362 previous descriptions, if the comment is an update to the description
2363 timestamp: time at which the comment was made, defaults to now.
2364 is_spam: True if the comment is classified as spam.
2365 is_description: True if the comment is a description for the issue.
2366 approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
2367 belongs to.
2368 commit: set to False to not commit to DB yet.
2369 importer_id: user ID of an API client that is importing issues.
2370
2371 Returns:
2372 The new IssueComment protocol buffer.
2373
2374 Note that we assume that the content is safe to echo out
2375 again. The content may have some markup done during input
2376 processing.
2377 """
2378 if is_description:
2379 kept_attachments = self.GetAttachmentsByID(cnxn, kept_attachments)
2380 else:
2381 kept_attachments = []
2382
2383 comment = self._MakeIssueComment(
2384 issue.project_id, user_id, content, amendments=amendments,
2385 inbound_message=inbound_message, attachments=attachments,
2386 timestamp=timestamp, is_spam=is_spam, is_description=is_description,
2387 kept_attachments=kept_attachments, approval_id=approval_id,
2388 importer_id=importer_id)
2389 comment.issue_id = issue.issue_id
2390
2391 if attachments or kept_attachments:
2392 issue.attachment_count = (
2393 issue.attachment_count + len(attachments) + len(kept_attachments))
2394 self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
2395
2396 self.comment_creations.increment()
2397 self.InsertComment(cnxn, comment, commit=commit)
2398
2399 return comment
2400
2401 def SoftDeleteComment(
2402 self, cnxn, issue, issue_comment, deleted_by_user_id,
2403 user_service, delete=True, reindex=False, is_spam=False):
2404 """Mark comment as un/deleted, which shows/hides it from average users."""
2405 # Update number of attachments
2406 attachments = 0
2407 if issue_comment.attachments:
2408 for attachment in issue_comment.attachments:
2409 if not attachment.deleted:
2410 attachments += 1
2411
2412 # Delete only if it's not in deleted state
2413 if delete:
2414 if not issue_comment.deleted_by:
2415 issue_comment.deleted_by = deleted_by_user_id
2416 issue.attachment_count = issue.attachment_count - attachments
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01002417 issue.migration_modified_timestamp = int(time.time())
Copybara854996b2021-09-07 19:36:02 +00002418
2419 # Undelete only if it's in deleted state
2420 elif issue_comment.deleted_by:
2421 issue_comment.deleted_by = 0
2422 issue.attachment_count = issue.attachment_count + attachments
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01002423 issue.migration_modified_timestamp = int(time.time())
Copybara854996b2021-09-07 19:36:02 +00002424
2425 issue_comment.is_spam = is_spam
2426 self._UpdateComment(
2427 cnxn, issue_comment, update_cols=['deleted_by', 'is_spam'])
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01002428 self.UpdateIssue(
2429 cnxn, issue, update_cols=['attachment_count', 'migration_modified'])
Copybara854996b2021-09-07 19:36:02 +00002430
2431 # Reindex the issue to take the comment deletion/undeletion into account.
2432 if reindex:
2433 tracker_fulltext.IndexIssues(
2434 cnxn, [issue], user_service, self, self._config_service)
2435 else:
2436 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
2437
2438 ### Approvals
2439
2440 def GetIssueApproval(self, cnxn, issue_id, approval_id, use_cache=True):
2441 """Retrieve the specified approval for the specified issue."""
2442 issue = self.GetIssue(cnxn, issue_id, use_cache=use_cache)
2443 approval = tracker_bizobj.FindApprovalValueByID(
2444 approval_id, issue.approval_values)
2445 if approval:
2446 return issue, approval
2447 raise exceptions.NoSuchIssueApprovalException()
2448
  def DeltaUpdateIssueApproval(
      self, cnxn, modifier_id, config, issue, approval, approval_delta,
      comment_content=None, is_description=False, attachments=None,
      commit=True, kept_attachments=None):
    """Update the issue's approval in the database.

    Applies the given ApprovalDelta to the approval's status, approvers,
    subfield values, and labels, then records the change as an issue comment.
    All intermediate DB writes use commit=False; everything is committed
    atomically at the end when `commit` is True.

    Args:
      cnxn: connection to SQL database.
      modifier_id: user ID of the user making this change.
      config: project config used to interpret field and label changes.
      issue: Issue PB that owns the approval being changed.
      approval: ApprovalValue PB to update in RAM and in the DB.
      approval_delta: ApprovalDelta PB describing the changes to apply.
      comment_content: optional string body of the comment to add.
      is_description: True if the comment is a new approval description.
      attachments: optional [(filename, contents, mimetype),...] uploads.
      commit: set to False to skip the commit and leave it to the caller.
      kept_attachments: optional list of attachment ids kept from previous
        descriptions.

    Returns:
      The new IssueComment protocol buffer recording the amendments.
    """
    amendments = []

    # Update status in RAM and DB and create status amendment.
    if approval_delta.status:
      approval.status = approval_delta.status
      approval.set_on = approval_delta.set_on or int(time.time())
      approval.setter_id = modifier_id
      status_amendment = tracker_bizobj.MakeApprovalStatusAmendment(
          approval_delta.status)
      amendments.append(status_amendment)

      self._UpdateIssueApprovalStatus(
          cnxn, issue.issue_id, approval.approval_id, approval.status,
          approval.setter_id, approval.set_on)

    # Update approver_ids in RAM and DB and create approver amendment.
    # Filter to only the approvers whose membership actually changes.
    approvers_add = [approver for approver in approval_delta.approver_ids_add
                     if approver not in approval.approver_ids]
    approvers_remove = [approver for approver in
                        approval_delta.approver_ids_remove
                        if approver in approval.approver_ids]
    if approvers_add or approvers_remove:
      approver_ids = [approver for approver in
                      list(approval.approver_ids) + approvers_add
                      if approver not in approvers_remove]
      approval.approver_ids = approver_ids
      approvers_amendment = tracker_bizobj.MakeApprovalApproversAmendment(
          approvers_add, approvers_remove)
      amendments.append(approvers_amendment)

      self._UpdateIssueApprovalApprovers(
          cnxn, issue.issue_id, approval.approval_id, approver_ids)

    # Apply subfield value and label changes; each returns amendments
    # describing what actually changed (may be empty).
    fv_amendments = tracker_bizobj.ApplyFieldValueChanges(
        issue, config, approval_delta.subfield_vals_add,
        approval_delta.subfield_vals_remove, approval_delta.subfields_clear)
    amendments.extend(fv_amendments)
    if fv_amendments:
      self._UpdateIssuesFields(cnxn, [issue], commit=False)

    label_amendment = tracker_bizobj.ApplyLabelChanges(
        issue, config, approval_delta.labels_add, approval_delta.labels_remove)
    if label_amendment:
      amendments.append(label_amendment)
      self._UpdateIssuesLabels(cnxn, [issue], commit=False)

    # Record all of the above as a single comment on the issue.
    comment_pb = self.CreateIssueComment(
        cnxn, issue, modifier_id, comment_content, amendments=amendments,
        approval_id=approval.approval_id, is_description=is_description,
        attachments=attachments, commit=False,
        kept_attachments=kept_attachments)

    if commit:
      cnxn.Commit()
      # Invalidate the cached issue so readers see the new approval state.
      self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id])

    return comment_pb
2511
2512 def _UpdateIssueApprovalStatus(
2513 self, cnxn, issue_id, approval_id, status, setter_id, set_on):
2514 """Update the approvalvalue for the given issue_id's issue."""
2515 set_on = set_on or int(time.time())
2516 delta = {
2517 'status': status.name.lower(),
2518 'setter_id': setter_id,
2519 'set_on': set_on,
2520 }
2521 self.issue2approvalvalue_tbl.Update(
2522 cnxn, delta, approval_id=approval_id, issue_id=issue_id,
2523 commit=False)
2524
2525 def _UpdateIssueApprovalApprovers(
2526 self, cnxn, issue_id, approval_id, approver_ids):
2527 """Update the list of approvers allowed to approve an issue's approval."""
2528 self.issueapproval2approver_tbl.Delete(
2529 cnxn, issue_id=issue_id, approval_id=approval_id, commit=False)
2530 self.issueapproval2approver_tbl.InsertRows(
2531 cnxn, ISSUEAPPROVAL2APPROVER_COLS, [(approval_id, approver_id, issue_id)
2532 for approver_id in approver_ids],
2533 commit=False)
2534
2535 ### Attachments
2536
2537 def GetAttachmentAndContext(self, cnxn, attachment_id):
2538 """Load a IssueAttachment from database, and its comment ID and IID.
2539
2540 Args:
2541 cnxn: connection to SQL database.
2542 attachment_id: long integer unique ID of desired issue attachment.
2543
2544 Returns:
2545 An Attachment protocol buffer that contains metadata about the attached
2546 file, or None if it doesn't exist. Also, the comment ID and issue IID
2547 of the comment and issue that contain this attachment.
2548
2549 Raises:
2550 NoSuchAttachmentException: the attachment was not found.
2551 """
2552 if attachment_id is None:
2553 raise exceptions.NoSuchAttachmentException()
2554
2555 attachment_row = self.attachment_tbl.SelectRow(
2556 cnxn, cols=ATTACHMENT_COLS, id=attachment_id)
2557 if attachment_row:
2558 (attach_id, issue_id, comment_id, filename, filesize, mimetype,
2559 deleted, gcs_object_id) = attachment_row
2560 if not deleted:
2561 attachment = tracker_pb2.Attachment(
2562 attachment_id=attach_id, filename=filename, filesize=filesize,
2563 mimetype=mimetype, deleted=bool(deleted),
2564 gcs_object_id=gcs_object_id)
2565 return attachment, comment_id, issue_id
2566
2567 raise exceptions.NoSuchAttachmentException()
2568
  def GetAttachmentsByID(self, cnxn, attachment_ids):
    """Return attachment DB rows for the given attachment ids.

    Note: despite the name, this returns raw row tuples in ATTACHMENT_COLS
    order, not Attachment PBs; callers unpack the tuples themselves.

    Args:
      cnxn: connection to SQL database.
      attachment_ids: a list of attachment ids.

    Returns:
      A list of DB row tuples (one per ATTACHMENT_COLS) for the attachments
      with these ids.
    """
    attachment_rows = self.attachment_tbl.Select(
        cnxn, cols=ATTACHMENT_COLS, id=attachment_ids)

    return attachment_rows
2584
2585 def _UpdateAttachment(self, cnxn, comment, attach, update_cols=None):
2586 """Update attachment metadata in the DB.
2587
2588 Args:
2589 cnxn: connection to SQL database.
2590 comment: IssueComment PB to invalidate in the cache.
2591 attach: IssueAttachment PB to update in the DB.
2592 update_cols: optional list of just the field names to update.
2593 """
2594 delta = {
2595 'filename': attach.filename,
2596 'filesize': attach.filesize,
2597 'mimetype': attach.mimetype,
2598 'deleted': bool(attach.deleted),
2599 }
2600 if update_cols is not None:
2601 delta = {key: val for key, val in delta.items()
2602 if key in update_cols}
2603
2604 self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id)
2605 self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
2606
2607 def SoftDeleteAttachment(
2608 self, cnxn, issue, issue_comment, attach_id, user_service, delete=True,
2609 index_now=False):
2610 """Mark attachment as un/deleted, which shows/hides it from avg users."""
2611 attachment = None
2612 for attach in issue_comment.attachments:
2613 if attach.attachment_id == attach_id:
2614 attachment = attach
2615
2616 if not attachment:
2617 logging.warning(
2618 'Tried to (un)delete non-existent attachment #%s in project '
2619 '%s issue %s', attach_id, issue.project_id, issue.local_id)
2620 return
2621
2622 if not issue_comment.deleted_by:
2623 # Decrement attachment count only if it's not in deleted state
2624 if delete:
2625 if not attachment.deleted:
2626 issue.attachment_count = issue.attachment_count - 1
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01002627 issue.migration_modified_timestamp = int(time.time())
Copybara854996b2021-09-07 19:36:02 +00002628
2629 # Increment attachment count only if it's in deleted state
2630 elif attachment.deleted:
2631 issue.attachment_count = issue.attachment_count + 1
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01002632 issue.migration_modified_timestamp = int(time.time())
Copybara854996b2021-09-07 19:36:02 +00002633
2634 logging.info('attachment.deleted was %s', attachment.deleted)
2635
2636 attachment.deleted = delete
2637
2638 logging.info('attachment.deleted is %s', attachment.deleted)
2639
2640 self._UpdateAttachment(
2641 cnxn, issue_comment, attachment, update_cols=['deleted'])
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01002642 self.UpdateIssue(
2643 cnxn, issue, update_cols=['attachment_count', 'migration_modified'])
Copybara854996b2021-09-07 19:36:02 +00002644
2645 if index_now:
2646 tracker_fulltext.IndexIssues(
2647 cnxn, [issue], user_service, self, self._config_service)
2648 else:
2649 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
2650
2651 ### Reindex queue
2652
2653 def EnqueueIssuesForIndexing(self, cnxn, issue_ids, commit=True):
2654 # type: (MonorailConnection, Collection[int], Optional[bool]) -> None
2655 """Add the given issue IDs to the ReindexQueue table."""
2656 reindex_rows = [(issue_id,) for issue_id in issue_ids]
2657 self.reindexqueue_tbl.InsertRows(
2658 cnxn, ['issue_id'], reindex_rows, ignore=True, commit=commit)
2659
2660 def ReindexIssues(self, cnxn, num_to_reindex, user_service):
2661 """Reindex some issues specified in the IndexQueue table."""
2662 rows = self.reindexqueue_tbl.Select(
2663 cnxn, order_by=[('created', [])], limit=num_to_reindex)
2664 issue_ids = [row[0] for row in rows]
2665
2666 if issue_ids:
2667 issues = self.GetIssues(cnxn, issue_ids)
2668 tracker_fulltext.IndexIssues(
2669 cnxn, issues, user_service, self, self._config_service)
2670 self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids)
2671
2672 return len(issue_ids)
2673
2674 ### Search functions
2675
2676 def RunIssueQuery(
2677 self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
2678 """Run a SQL query to find matching issue IDs.
2679
2680 Args:
2681 cnxn: connection to SQL database.
2682 left_joins: list of SQL LEFT JOIN clauses.
2683 where: list of SQL WHERE clauses.
2684 order_by: list of SQL ORDER BY clauses.
2685 shard_id: int shard ID to focus the search.
2686 limit: int maximum number of results, defaults to
2687 settings.search_limit_per_shard.
2688
2689 Returns:
2690 (issue_ids, capped) where issue_ids is a list of the result issue IDs,
2691 and capped is True if the number of results reached the limit.
2692 """
2693 limit = limit or settings.search_limit_per_shard
2694 where = where + [('Issue.deleted = %s', [False])]
2695 rows = self.issue_tbl.Select(
2696 cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'],
2697 left_joins=left_joins, where=where, order_by=order_by,
2698 limit=limit)
2699 issue_ids = [row[0] for row in rows]
2700 capped = len(issue_ids) >= limit
2701 return issue_ids, capped
2702
2703 def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
2704 """Return a list of IIDs for issues with any of the given label IDs."""
2705 if not label_ids:
2706 return []
2707 where = []
2708 if shard_id is not None:
2709 slice_term = ('shard = %s', [shard_id])
2710 where.append(slice_term)
2711
2712 rows = self.issue_tbl.Select(
2713 cnxn, shard_id=shard_id, cols=['id'],
2714 left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])],
2715 label_id=label_ids, project_id=project_id, where=where)
2716 return [row[0] for row in rows]
2717
2718 def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
2719 """Return IIDs for issues where any of the given users participate."""
2720 iids = []
2721 where = []
2722 if shard_id is not None:
2723 where.append(('shard = %s', [shard_id]))
2724 if project_ids:
2725 cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
2726 where.append((cond_str, project_ids))
2727
2728 # TODO(jrobbins): Combine these 3 queries into one with ORs. It currently
2729 # is not the bottleneck.
2730 rows = self.issue_tbl.Select(
2731 cnxn, cols=['id'], reporter_id=user_ids,
2732 where=where, shard_id=shard_id)
2733 for row in rows:
2734 iids.append(row[0])
2735
2736 rows = self.issue_tbl.Select(
2737 cnxn, cols=['id'], owner_id=user_ids,
2738 where=where, shard_id=shard_id)
2739 for row in rows:
2740 iids.append(row[0])
2741
2742 rows = self.issue_tbl.Select(
2743 cnxn, cols=['id'], derived_owner_id=user_ids,
2744 where=where, shard_id=shard_id)
2745 for row in rows:
2746 iids.append(row[0])
2747
2748 rows = self.issue_tbl.Select(
2749 cnxn, cols=['id'],
2750 left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])],
2751 cc_id=user_ids,
2752 where=where + [('cc_id IS NOT NULL', [])],
2753 shard_id=shard_id)
2754 for row in rows:
2755 iids.append(row[0])
2756
2757 rows = self.issue_tbl.Select(
2758 cnxn, cols=['Issue.id'],
2759 left_joins=[
2760 ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []),
2761 ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])],
2762 user_id=user_ids, grants_perm='View',
2763 where=where + [('user_id IS NOT NULL', [])],
2764 shard_id=shard_id)
2765 for row in rows:
2766 iids.append(row[0])
2767
2768 return iids
2769
2770 ### Issue Dependency Rankings
2771
2772 def SortBlockedOn(self, cnxn, issue, blocked_on_iids):
2773 """Sort blocked_on dependencies by rank and dst_issue_id.
2774
2775 Args:
2776 cnxn: connection to SQL database.
2777 issue: the issue being blocked.
2778 blocked_on_iids: the iids of all the issue's blockers
2779
2780 Returns:
2781 a tuple (ids, ranks), where ids is the sorted list of
2782 blocked_on_iids and ranks is the list of corresponding ranks
2783 """
2784 rows = self.issuerelation_tbl.Select(
2785 cnxn, cols=ISSUERELATION_COLS, issue_id=issue.issue_id,
2786 dst_issue_id=blocked_on_iids, kind='blockedon',
2787 order_by=[('rank DESC', []), ('dst_issue_id', [])])
2788 ids = [row[1] for row in rows]
2789 ids.extend([iid for iid in blocked_on_iids if iid not in ids])
2790 ranks = [row[3] for row in rows]
2791 ranks.extend([0] * (len(blocked_on_iids) - len(ranks)))
2792 return ids, ranks
2793
2794 def ApplyIssueRerank(
2795 self, cnxn, parent_id, relations_to_change, commit=True, invalidate=True):
2796 """Updates rankings of blocked on issue relations to new values
2797
2798 Args:
2799 cnxn: connection to SQL database.
2800 parent_id: the global ID of the blocked issue to update
2801 relations_to_change: This should be a list of
2802 [(blocker_id, new_rank),...] of relations that need to be changed
2803 commit: set to False to skip the DB commit and do it in the caller.
2804 invalidate: set to False to leave cache invalidatation to the caller.
2805 """
2806 blocker_ids = [blocker for (blocker, rank) in relations_to_change]
2807 self.issuerelation_tbl.Delete(
2808 cnxn, issue_id=parent_id, dst_issue_id=blocker_ids, commit=False)
2809 insert_rows = [(parent_id, blocker, 'blockedon', rank)
2810 for (blocker, rank) in relations_to_change]
2811 self.issuerelation_tbl.InsertRows(
2812 cnxn, cols=ISSUERELATION_COLS, row_values=insert_rows, commit=commit)
2813 if invalidate:
2814 self.InvalidateIIDs(cnxn, [parent_id])
2815
2816 # Expunge Users from Issues system.
  def ExpungeUsersInIssues(self, cnxn, user_ids_by_email, limit=None):
    """Removes all references to given users from issue DB tables.

    This method will not commit the operations. This method will
    not make changes to in-memory data.

    Args:
      cnxn: connection to SQL database.
      user_ids_by_email: dict of {email: user_id} of all users we want
        to expunge.
      limit: Optional, the limit for each operation.

    Returns:
      A list of issue_ids that need to be reindexed.
    """
    # Every statement below passes commit=False; the caller owns the commit.
    commit = False
    user_ids = list(user_ids_by_email.values())
    user_emails = list(user_ids_by_email.keys())
    # Track issue_ids for issues that will have different search documents
    # and need updates to modification time as a result of removing users.
    affected_issue_ids = []

    # One timestamp for the whole expunge, applied to all affected issues.
    timestamp = int(time.time())

    # Reassign commenter_id and delete inbound_messages.
    shard_id = sql.RandomShardID()
    comment_content_id_rows = self.comment_tbl.Select(
        cnxn, cols=['Comment.id', 'Comment.issue_id', 'commentcontent_id'],
        commenter_id=user_ids, shard_id=shard_id, limit=limit)
    comment_ids = [row[0] for row in comment_content_id_rows]
    commentcontent_ids = [row[2] for row in comment_content_id_rows]
    if commentcontent_ids:
      self.commentcontent_tbl.Update(
          cnxn, {'inbound_message': None}, id=commentcontent_ids, commit=commit)
    if comment_ids:
      self.comment_tbl.Update(
          cnxn, {'commenter_id': framework_constants.DELETED_USER_ID},
          id=comment_ids,
          commit=commit)
      affected_issue_ids.extend([row[1] for row in comment_content_id_rows])

    # Reassign deleted_by comments deleted_by.
    self.comment_tbl.Update(
        cnxn,
        {'deleted_by': framework_constants.DELETED_USER_ID},
        deleted_by=user_ids,
        commit=commit, limit=limit)

    # Remove users in field values.
    fv_issue_id_rows = self.issue2fieldvalue_tbl.Select(
        cnxn, cols=['issue_id'], user_id=user_ids, limit=limit)
    fv_issue_ids = [row[0] for row in fv_issue_id_rows]
    self.issue2fieldvalue_tbl.Delete(
        cnxn, user_id=user_ids, limit=limit, commit=commit)
    affected_issue_ids.extend(fv_issue_ids)

    # Remove users in approval values.
    self.issueapproval2approver_tbl.Delete(
        cnxn, approver_id=user_ids, commit=commit, limit=limit)
    self.issue2approvalvalue_tbl.Update(
        cnxn,
        {'setter_id': framework_constants.DELETED_USER_ID},
        setter_id=user_ids,
        commit=commit, limit=limit)

    # Remove users in issue Ccs.
    cc_issue_id_rows = self.issue2cc_tbl.Select(
        cnxn, cols=['issue_id'], cc_id=user_ids, limit=limit)
    cc_issue_ids = [row[0] for row in cc_issue_id_rows]
    self.issue2cc_tbl.Delete(
        cnxn, cc_id=user_ids, limit=limit, commit=commit)
    affected_issue_ids.extend(cc_issue_ids)

    # Remove users in issue owners.
    owner_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], owner_id=user_ids, limit=limit)
    owner_issue_ids = [row[0] for row in owner_issue_id_rows]
    if owner_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'owner_id': None}, id=owner_issue_ids, commit=commit)
      affected_issue_ids.extend(owner_issue_ids)
    derived_owner_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], derived_owner_id=user_ids, limit=limit)
    derived_owner_issue_ids = [row[0] for row in derived_owner_issue_id_rows]
    if derived_owner_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'derived_owner_id': None},
          id=derived_owner_issue_ids,
          commit=commit)
      affected_issue_ids.extend(derived_owner_issue_ids)

    # Remove users in issue reporters.
    reporter_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], reporter_id=user_ids, limit=limit)
    reporter_issue_ids = [row[0] for row in reporter_issue_id_rows]
    if reporter_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'reporter_id': framework_constants.DELETED_USER_ID},
          id=reporter_issue_ids,
          commit=commit)
      affected_issue_ids.extend(reporter_issue_ids)

    # Note: issueupdate_tbl's and issue2notify's user_id columns do not
    # reference the User table. So all values need to updated here before
    # User rows can be deleted safely. No limit will be applied.

    # Remove users in issue updates.
    user_added_id_rows = self.issueupdate_tbl.Select(
        cnxn,
        cols=['IssueUpdate.issue_id'],
        added_user_id=user_ids,
        shard_id=shard_id,
        limit=limit)
    user_removed_id_rows = self.issueupdate_tbl.Select(
        cnxn,
        cols=['IssueUpdate.issue_id'],
        removed_user_id=user_ids,
        shard_id=shard_id,
        limit=limit)
    self.issueupdate_tbl.Update(
        cnxn,
        {'added_user_id': framework_constants.DELETED_USER_ID},
        added_user_id=user_ids,
        commit=commit)
    self.issueupdate_tbl.Update(
        cnxn,
        {'removed_user_id': framework_constants.DELETED_USER_ID},
        removed_user_id=user_ids,
        commit=commit)
    affected_issue_ids.extend([row[0] for row in user_added_id_rows])
    affected_issue_ids.extend([row[0] for row in user_removed_id_rows])

    # Remove users in issue notify.
    self.issue2notify_tbl.Delete(
        cnxn, email=user_emails, commit=commit)

    # Remove users in issue snapshots.
    self.issuesnapshot_tbl.Update(
        cnxn,
        {'owner_id': framework_constants.DELETED_USER_ID},
        owner_id=user_ids,
        commit=commit, limit=limit)
    self.issuesnapshot_tbl.Update(
        cnxn,
        {'reporter_id': framework_constants.DELETED_USER_ID},
        reporter_id=user_ids,
        commit=commit, limit=limit)
    self.issuesnapshot2cc_tbl.Delete(
        cnxn, cc_id=user_ids, commit=commit, limit=limit)

    # Update migration_modified timestamp for affected issues.
    deduped_issue_ids = list(set(affected_issue_ids))
    if deduped_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'migration_modified': timestamp},
          id=deduped_issue_ids,
          commit=commit)

    return deduped_issue_ids