blob: 8e5a45f59304c4c7f79a00cdf41476d066b78572 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""A set of functions that provide persistence for Monorail issue tracking.
7
8This module provides functions to get, update, create, and (in some
9cases) delete each type of business object. It provides a logical
10persistence layer on top of an SQL database.
11
12Business objects are described in tracker_pb2.py and tracker_bizobj.py.
13"""
14from __future__ import print_function
15from __future__ import division
16from __future__ import absolute_import
17
18import collections
19import json
20import logging
21import os
22import time
23import uuid
24
25from google.appengine.api import app_identity
26from google.appengine.api import images
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +020027from google.cloud import storage
Copybara854996b2021-09-07 19:36:02 +000028
29import settings
30from features import filterrules_helpers
31from framework import authdata
32from framework import exceptions
33from framework import framework_bizobj
34from framework import framework_constants
35from framework import framework_helpers
36from framework import gcs_helpers
37from framework import permissions
38from framework import sql
39from infra_libs import ts_mon
40from proto import project_pb2
41from proto import tracker_pb2
42from services import caches
43from services import tracker_fulltext
44from tracker import tracker_bizobj
45from tracker import tracker_helpers
46
# TODO(jojwang): monorail:4693, remove this after all 'stable-full'
# gates have been renamed to 'stable'.
# Maps each legacy gate name to its replacement, in both directions.
FLT_EQUIVALENT_GATES = {'stable-full': 'stable',
                        'stable': 'stable-full'}

# Names of the SQL tables that hold issue, comment, and snapshot data.
ISSUE_TABLE_NAME = 'Issue'
ISSUESUMMARY_TABLE_NAME = 'IssueSummary'
ISSUE2LABEL_TABLE_NAME = 'Issue2Label'
ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component'
ISSUE2CC_TABLE_NAME = 'Issue2Cc'
ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify'
ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue'
COMMENT_TABLE_NAME = 'Comment'
COMMENTCONTENT_TABLE_NAME = 'CommentContent'
COMMENTIMPORTER_TABLE_NAME = 'CommentImporter'
ATTACHMENT_TABLE_NAME = 'Attachment'
ISSUERELATION_TABLE_NAME = 'IssueRelation'
DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation'
ISSUEUPDATE_TABLE_NAME = 'IssueUpdate'
ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations'
REINDEXQUEUE_TABLE_NAME = 'ReindexQueue'
LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter'
ISSUESNAPSHOT_TABLE_NAME = 'IssueSnapshot'
ISSUESNAPSHOT2CC_TABLE_NAME = 'IssueSnapshot2Cc'
ISSUESNAPSHOT2COMPONENT_TABLE_NAME = 'IssueSnapshot2Component'
ISSUESNAPSHOT2LABEL_TABLE_NAME = 'IssueSnapshot2Label'
ISSUEPHASEDEF_TABLE_NAME = 'IssuePhaseDef'
ISSUE2APPROVALVALUE_TABLE_NAME = 'Issue2ApprovalValue'
ISSUEAPPROVAL2APPROVER_TABLE_NAME = 'IssueApproval2Approver'
ISSUEAPPROVAL2COMMENT_TABLE_NAME = 'IssueApproval2Comment'


# Column lists for each table.  The unpacking code below (e.g. _UnpackIssue)
# relies on values arriving in exactly this order, so keep these in sync
# with the corresponding _Unpack/_Deserialize helpers.
ISSUE_COLS = [
    'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id',
    'opened', 'closed', 'modified',
    'owner_modified', 'status_modified', 'component_modified',
    'derived_owner_id', 'derived_status_id',
    'deleted', 'star_count', 'attachment_count', 'is_spam']
ISSUESUMMARY_COLS = ['issue_id', 'summary']
ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived']
ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived']
ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived']
ISSUE2NOTIFY_COLS = ['issue_id', 'email']
ISSUE2FIELDVALUE_COLS = [
    'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'date_value',
    'url_value', 'derived', 'phase_id']
# Explicitly specify column 'Comment.id' to allow joins on other tables that
# have an 'id' column.
COMMENT_COLS = [
    'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id',
    'deleted_by', 'Comment.is_spam', 'is_description',
    'commentcontent_id']  # Note: commentcontent_id must be last.
COMMENTCONTENT_COLS = [
    'CommentContent.id', 'content', 'inbound_message']
COMMENTIMPORTER_COLS = ['comment_id', 'importer_id']
ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by',
    'is_description']
ATTACHMENT_COLS = [
    'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype',
    'deleted', 'gcs_object_id']
ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind', 'rank']
ABBR_ISSUERELATION_COLS = ['dst_issue_id', 'rank']
DANGLINGRELATION_COLS = [
    'issue_id', 'dst_issue_project', 'dst_issue_local_id',
    'ext_issue_identifier', 'kind']
ISSUEUPDATE_COLS = [
    'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value',
    'added_user_id', 'removed_user_id', 'custom_field_name']
ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id']
REINDEXQUEUE_COLS = ['issue_id', 'created']
ISSUESNAPSHOT_COLS = ['id', 'issue_id', 'shard', 'project_id', 'local_id',
    'reporter_id', 'owner_id', 'status_id', 'period_start', 'period_end',
    'is_open']
ISSUESNAPSHOT2CC_COLS = ['issuesnapshot_id', 'cc_id']
ISSUESNAPSHOT2COMPONENT_COLS = ['issuesnapshot_id', 'component_id']
ISSUESNAPSHOT2LABEL_COLS = ['issuesnapshot_id', 'label_id']
ISSUEPHASEDEF_COLS = ['id', 'name', 'rank']
ISSUE2APPROVALVALUE_COLS = ['approval_id', 'issue_id', 'phase_id',
    'status', 'setter_id', 'set_on']
ISSUEAPPROVAL2APPROVER_COLS = ['approval_id', 'approver_id', 'issue_id']
ISSUEAPPROVAL2COMMENT_COLS = ['approval_id', 'comment_id']

# Number of items processed per batch in chunked operations.
CHUNK_SIZE = 1000
130
131
class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache):
  """RAM + memcache layer mapping (project_id, local_id) -> issue_id."""

  def __init__(self, cache_manager, issue_service):
    super(IssueIDTwoLevelCache, self).__init__(
        cache_manager, 'issue_id', 'issue_id:', int,
        max_size=settings.issue_cache_max_size)
    self.issue_service = issue_service

  def _MakeCache(self, cache_manager, kind, max_size=None):
    """Use a ValueCentricRamCache instead of the default RamCache."""
    return caches.ValueCentricRamCache(cache_manager, kind, max_size=max_size)

  def _DeserializeIssueIDs(self, project_local_issue_ids):
    """Convert database rows into a dict {(project_id, local_id): issue_id}."""
    result = {}
    for project_id, local_id, issue_id in project_local_issue_ids:
      result[(project_id, local_id)] = issue_id
    return result

  def FetchItems(self, cnxn, keys):
    """On RAM and memcache miss, hit the database."""
    # Group the requested local IDs by project so we can build one
    # IN-clause per project.
    local_ids_by_project = collections.defaultdict(list)
    for project_id, local_id in keys:
      local_ids_by_project[project_id].append(local_id)

    # OR together one (project_id, local_id IN ...) condition per project.
    where = []
    for project_id, local_ids in local_ids_by_project.items():
      cond = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' %
              sql.PlaceHolders(local_ids))
      where.append((cond, [project_id] + local_ids))

    rows = self.issue_service.issue_tbl.Select(
        cnxn, cols=['project_id', 'local_id', 'id'],
        where=where, or_where_conds=True)
    return self._DeserializeIssueIDs(rows)

  def _KeyToStr(self, key):
    """Keys are (int, int) pairs; render one as a string for memcache."""
    return '%d,%d' % key

  def _StrToKey(self, key_str):
    """Parse a string produced by _KeyToStr back into an (int, int) pair."""
    project_id_str, local_id_str = key_str.split(',')
    return int(project_id_str), int(local_id_str)
175
176
class IssueTwoLevelCache(caches.AbstractTwoLevelCache):
  """Class to manage RAM and memcache for Issue PBs."""

  def __init__(
      self, cache_manager, issue_service, project_service, config_service):
    super(IssueTwoLevelCache, self).__init__(
        cache_manager, 'issue', 'issue:', tracker_pb2.Issue,
        max_size=settings.issue_cache_max_size)
    self.issue_service = issue_service
    self.project_service = project_service
    self.config_service = config_service

  def _UnpackIssue(self, cnxn, issue_row):
    """Partially construct an issue object using info from a DB row.

    Args:
      cnxn: connection to SQL database.
      issue_row: tuple of values in ISSUE_COLS order.

    Returns:
      A partially populated Issue PB; labels, CCs, field values, etc. are
      attached later by _DeserializeIssues.
    """
    (issue_id, project_id, local_id, status_id, owner_id, reporter_id,
     opened, closed, modified, owner_modified, status_modified,
     component_modified, derived_owner_id, derived_status_id,
     deleted, star_count, attachment_count, is_spam) = issue_row

    issue = tracker_pb2.Issue()
    project = self.project_service.GetProject(cnxn, project_id)
    issue.project_name = project.project_name
    issue.issue_id = issue_id
    issue.project_id = project_id
    issue.local_id = local_id
    if status_id is not None:
      status = self.config_service.LookupStatus(cnxn, project_id, status_id)
      issue.status = status
    issue.owner_id = owner_id or 0
    issue.reporter_id = reporter_id or 0
    issue.derived_owner_id = derived_owner_id or 0
    if derived_status_id is not None:
      derived_status = self.config_service.LookupStatus(
          cnxn, project_id, derived_status_id)
      issue.derived_status = derived_status
    issue.deleted = bool(deleted)
    # Timestamp fields are left unset (rather than 0) when the DB had no
    # meaningful value.
    if opened:
      issue.opened_timestamp = opened
    if closed:
      issue.closed_timestamp = closed
    if modified:
      issue.modified_timestamp = modified
    if owner_modified:
      issue.owner_modified_timestamp = owner_modified
    if status_modified:
      issue.status_modified_timestamp = status_modified
    if component_modified:
      issue.component_modified_timestamp = component_modified
    issue.star_count = star_count
    issue.attachment_count = attachment_count
    issue.is_spam = bool(is_spam)
    return issue

  def _UnpackFieldValue(self, fv_row):
    """Construct a field value object from a DB row.

    Returns:
      A (FieldValue PB, issue_id) pair so the caller can attach the value
      to the right issue.
    """
    (issue_id, field_id, int_value, str_value, user_id, date_value, url_value,
     derived, phase_id) = fv_row
    fv = tracker_bizobj.MakeFieldValue(
        field_id, int_value, str_value, user_id, date_value, url_value,
        bool(derived), phase_id=phase_id)
    return fv, issue_id

  def _UnpackApprovalValue(self, av_row):
    """Construct an ApprovalValue PB from a DB row.

    Returns:
      An (ApprovalValue PB, issue_id) pair; approver IDs are filled in later.
    """
    (approval_id, issue_id, phase_id, status, setter_id, set_on) = av_row
    if status:
      # DB stores the enum name in lowercase; the PB enum names are uppercase.
      status_enum = tracker_pb2.ApprovalStatus(status.upper())
    else:
      status_enum = tracker_pb2.ApprovalStatus.NOT_SET
    av = tracker_pb2.ApprovalValue(
        approval_id=approval_id, setter_id=setter_id, set_on=set_on,
        status=status_enum, phase_id=phase_id)
    return av, issue_id

  def _UnpackPhase(self, phase_row):
    """Construct a Phase PB from a DB row."""
    (phase_id, name, rank) = phase_row
    phase = tracker_pb2.Phase(
        phase_id=phase_id, name=name, rank=rank)
    return phase

  def _DeserializeIssues(
      self, cnxn, issue_rows, summary_rows, label_rows, component_rows,
      cc_rows, notify_rows, fieldvalue_rows, relation_rows,
      dangling_relation_rows, phase_rows, approvalvalue_rows,
      av_approver_rows):
    """Convert the given DB rows into a dict of Issue PBs.

    Each *_rows argument holds rows from the correspondingly named table;
    related rows are attached to the issues they reference.

    Returns:
      A dict {issue_id: Issue PB} with all related data attached.
    """
    results_dict = {}
    for issue_row in issue_rows:
      issue = self._UnpackIssue(cnxn, issue_row)
      results_dict[issue.issue_id] = issue

    for issue_id, summary in summary_rows:
      results_dict[issue_id].summary = summary

    # TODO(jrobbins): it would be nice to order labels by rank and name.
    for issue_id, label_id, derived in label_rows:
      issue = results_dict.get(issue_id)
      if not issue:
        logging.info('Got label for an unknown issue: %r %r',
                     label_rows, issue_rows)
        continue
      label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id)
      assert label, ('Label ID %r on IID %r not found in project %r' %
                     (label_id, issue_id, issue.project_id))
      if derived:
        results_dict[issue_id].derived_labels.append(label)
      else:
        results_dict[issue_id].labels.append(label)

    for issue_id, component_id, derived in component_rows:
      if derived:
        results_dict[issue_id].derived_component_ids.append(component_id)
      else:
        results_dict[issue_id].component_ids.append(component_id)

    for issue_id, user_id, derived in cc_rows:
      if derived:
        results_dict[issue_id].derived_cc_ids.append(user_id)
      else:
        results_dict[issue_id].cc_ids.append(user_id)

    for issue_id, email in notify_rows:
      results_dict[issue_id].derived_notify_addrs.append(email)

    for fv_row in fieldvalue_rows:
      fv, issue_id = self._UnpackFieldValue(fv_row)
      results_dict[issue_id].field_values.append(fv)

    phases_by_id = {}
    for phase_row in phase_rows:
      phase = self._UnpackPhase(phase_row)
      phases_by_id[phase.phase_id] = phase

    # Approvers are keyed by (approval_id, issue_id) because the same
    # approval def can appear on many issues.
    approvers_dict = collections.defaultdict(list)
    for approver_row in av_approver_rows:
      approval_id, approver_id, issue_id = approver_row
      approvers_dict[approval_id, issue_id].append(approver_id)

    for av_row in approvalvalue_rows:
      av, issue_id = self._UnpackApprovalValue(av_row)
      av.approver_ids = approvers_dict[av.approval_id, issue_id]
      results_dict[issue_id].approval_values.append(av)
      if av.phase_id:
        phase = phases_by_id[av.phase_id]
        issue_phases = results_dict[issue_id].phases
        if phase not in issue_phases:
          issue_phases.append(phase)
    # Order issue phases
    for issue in results_dict.values():
      if issue.phases:
        issue.phases.sort(key=lambda phase: phase.rank)

    for issue_id, dst_issue_id, kind, rank in relation_rows:
      src_issue = results_dict.get(issue_id)
      dst_issue = results_dict.get(dst_issue_id)
      # Relations were selected by issue_id OR dst_issue_id, so at least
      # one endpoint must be in this batch.
      assert src_issue or dst_issue, (
          'Neither source issue %r nor dest issue %r was found' %
          (issue_id, dst_issue_id))
      if src_issue:
        if kind == 'blockedon':
          src_issue.blocked_on_iids.append(dst_issue_id)
          src_issue.blocked_on_ranks.append(rank)
        elif kind == 'mergedinto':
          src_issue.merged_into = dst_issue_id
        else:
          logging.info('unknown relation kind %r', kind)
          continue

      if dst_issue:
        if kind == 'blockedon':
          dst_issue.blocking_iids.append(issue_id)

    for row in dangling_relation_rows:
      issue_id, dst_issue_proj, dst_issue_id, ext_id, kind = row
      src_issue = results_dict.get(issue_id)
      if not src_issue:
        # Fix: guard against a dangling relation whose issue row was not
        # fetched; previously this would raise AttributeError on None.
        logging.info('Got dangling relation for an unknown issue: %r', row)
        continue
      if kind == 'blockedon':
        src_issue.dangling_blocked_on_refs.append(
            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj,
                                                dst_issue_id, ext_id))
      elif kind == 'blocking':
        src_issue.dangling_blocking_refs.append(
            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id,
                                                ext_id))
      elif kind == 'mergedinto':
        src_issue.merged_into_external = ext_id
      else:
        # Fix: logging.warn is a deprecated alias for logging.warning, and
        # the message had a typo ('danging').
        logging.warning('unhandled dangling relation kind %r', kind)
        continue

    return results_dict

  # Note: sharding is used to here to allow us to load issues from the replicas
  # without placing load on the primary DB. Writes are not sharded.
  # pylint: disable=arguments-differ
  def FetchItems(self, cnxn, issue_ids, shard_id=None):
    """Retrieve and deserialize issues.

    Args:
      cnxn: connection to SQL database.
      issue_ids: global issue IDs to load.
      shard_id: optional replica shard to read from; None uses the primary.

    Returns:
      A dict {issue_id: Issue PB} for the found issues.
    """
    issue_rows = self.issue_service.issue_tbl.Select(
        cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id)

    summary_rows = self.issue_service.issuesummary_tbl.Select(
        cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids)
    label_rows = self.issue_service.issue2label_tbl.Select(
        cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids)
    component_rows = self.issue_service.issue2component_tbl.Select(
        cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids)
    cc_rows = self.issue_service.issue2cc_tbl.Select(
        cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids)
    notify_rows = self.issue_service.issue2notify_tbl.Select(
        cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids)
    fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select(
        cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id,
        issue_id=issue_ids)
    approvalvalue_rows = self.issue_service.issue2approvalvalue_tbl.Select(
        cnxn, cols=ISSUE2APPROVALVALUE_COLS, issue_id=issue_ids)
    # Phase IDs come from the approval values (phase_id is the third column).
    phase_ids = [av_row[2] for av_row in approvalvalue_rows]
    phase_rows = []
    if phase_ids:
      phase_rows = self.issue_service.issuephasedef_tbl.Select(
          cnxn, cols=ISSUEPHASEDEF_COLS, id=list(set(phase_ids)))
    av_approver_rows = self.issue_service.issueapproval2approver_tbl.Select(
        cnxn, cols=ISSUEAPPROVAL2APPROVER_COLS, issue_id=issue_ids)
    if issue_ids:
      ph = sql.PlaceHolders(issue_ids)
      blocked_on_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS, issue_id=issue_ids, kind='blockedon',
          order_by=[('issue_id', []), ('rank DESC', []), ('dst_issue_id', [])])
      blocking_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS, dst_issue_id=issue_ids,
          kind='blockedon', order_by=[('issue_id', []), ('dst_issue_id', [])])
      # Drop rows already present in blocked_on_rows so relations between
      # two fetched issues are not processed twice.
      unique_blocking = tuple(
          row for row in blocking_rows if row not in blocked_on_rows)
      merge_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS,
          where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph),
                  issue_ids + issue_ids),
                 ('kind != %s', ['blockedon'])])
      relation_rows = blocked_on_rows + unique_blocking + merge_rows
      dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select(
          cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids)
    else:
      relation_rows = []
      dangling_relation_rows = []

    issue_dict = self._DeserializeIssues(
        cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows,
        notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows,
        phase_rows, approvalvalue_rows, av_approver_rows)
    logging.info('IssueTwoLevelCache.FetchItems returning: %r', issue_dict)
    return issue_dict
427
428
class CommentTwoLevelCache(caches.AbstractTwoLevelCache):
  """RAM and memcache layer for IssueComment PBs, keyed by comment ID."""

  def __init__(self, cache_manager, issue_svc):
    super(CommentTwoLevelCache, self).__init__(
        cache_manager, 'comment', 'comment:', tracker_pb2.IssueComment,
        max_size=settings.comment_cache_max_size)
    self.issue_svc = issue_svc

  # pylint: disable=arguments-differ
  def FetchItems(self, cnxn, keys, shard_id=None):
    """On cache miss, load comments and all their related rows from SQL."""
    comment_rows = self.issue_svc.comment_tbl.Select(cnxn,
        cols=COMMENT_COLS, id=keys, shard_id=shard_id)

    # A lagging replica may not have every requested comment yet; if any
    # are missing, record the retry and re-read from the primary DB.
    if len(comment_rows) < len(keys):
      self.issue_svc.replication_lag_retries.increment()
      logging.info('issue3755: expected %d, but got %d rows from shard %d',
                   len(keys), len(comment_rows), shard_id)
      shard_id = None  # Will use Primary DB.
      comment_rows = self.issue_svc.comment_tbl.Select(
          cnxn, cols=COMMENT_COLS, id=keys, shard_id=None)
      logging.info(
          'Retry got %d comment rows from the primary DB', len(comment_rows))

    comment_ids = [row[0] for row in comment_rows]
    # The commentcontent_id is deliberately the last column in COMMENT_COLS.
    content_ids = [row[-1] for row in comment_rows]

    content_rows = self.issue_svc.commentcontent_tbl.Select(
        cnxn, cols=COMMENTCONTENT_COLS, id=content_ids,
        shard_id=shard_id)
    amendment_rows = self.issue_svc.issueupdate_tbl.Select(
        cnxn, cols=ISSUEUPDATE_COLS, comment_id=comment_ids, shard_id=shard_id)
    attachment_rows = self.issue_svc.attachment_tbl.Select(
        cnxn, cols=ATTACHMENT_COLS, comment_id=comment_ids, shard_id=shard_id)
    approval_rows = self.issue_svc.issueapproval2comment_tbl.Select(
        cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=comment_ids)
    importer_rows = self.issue_svc.commentimporter_tbl.Select(
        cnxn, cols=COMMENTIMPORTER_COLS, comment_id=comment_ids,
        shard_id=shard_id)

    comments = self.issue_svc._DeserializeComments(
        comment_rows, content_rows, amendment_rows, attachment_rows,
        approval_rows, importer_rows)

    return {comment.id: comment for comment in comments}
476
477
class IssueService(object):
  """The persistence layer for Monorail's issues, comments, and attachments."""
  # These ts_mon metrics are class attributes, so they are shared by all
  # IssueService instances in the process.
  spam_labels = ts_mon.CounterMetric(
      'monorail/issue_svc/spam_label',
      'Issues created, broken down by spam label.',
      [ts_mon.StringField('type')])
  replication_lag_retries = ts_mon.CounterMetric(
      'monorail/issue_svc/replication_lag_retries',
      'Counts times that loading comments from a replica failed',
      [])
  issue_creations = ts_mon.CounterMetric(
      'monorail/issue_svc/issue_creations',
      'Counts times that issues were created',
      [])
  comment_creations = ts_mon.CounterMetric(
      'monorail/issue_svc/comment_creations',
      'Counts times that comments were created',
      [])
496
  def __init__(self, project_service, config_service, cache_manager,
               chart_service):
    """Initialize this object so that it is ready to use.

    Args:
      project_service: services object for project info.
      config_service: services object for tracker configuration info.
      cache_manager: local cache with distributed invalidation.
      chart_service (ChartService): An instance of ChartService.
    """
    # Tables that represent issue data.
    self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME)
    self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME)
    self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME)
    self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME)
    self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME)
    self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME)
    self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME)
    self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME)
    self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME)
    self.issueformerlocations_tbl = sql.SQLTableManager(
        ISSUEFORMERLOCATIONS_TABLE_NAME)
    self.issuesnapshot_tbl = sql.SQLTableManager(ISSUESNAPSHOT_TABLE_NAME)
    self.issuesnapshot2cc_tbl = sql.SQLTableManager(
        ISSUESNAPSHOT2CC_TABLE_NAME)
    self.issuesnapshot2component_tbl = sql.SQLTableManager(
        ISSUESNAPSHOT2COMPONENT_TABLE_NAME)
    self.issuesnapshot2label_tbl = sql.SQLTableManager(
        ISSUESNAPSHOT2LABEL_TABLE_NAME)
    self.issuephasedef_tbl = sql.SQLTableManager(ISSUEPHASEDEF_TABLE_NAME)
    self.issue2approvalvalue_tbl = sql.SQLTableManager(
        ISSUE2APPROVALVALUE_TABLE_NAME)
    self.issueapproval2approver_tbl = sql.SQLTableManager(
        ISSUEAPPROVAL2APPROVER_TABLE_NAME)
    self.issueapproval2comment_tbl = sql.SQLTableManager(
        ISSUEAPPROVAL2COMMENT_TABLE_NAME)

    # Tables that represent comments.
    self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME)
    self.commentcontent_tbl = sql.SQLTableManager(COMMENTCONTENT_TABLE_NAME)
    self.commentimporter_tbl = sql.SQLTableManager(COMMENTIMPORTER_TABLE_NAME)
    self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME)
    self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME)

    # Tables for cron tasks.
    self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME)

    # Tables for generating sequences of local IDs.
    self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME)

    # Like a dictionary {(project_id, local_id): issue_id}
    # Use value centric cache here because we cannot store a tuple in the
    # Invalidate table.
    self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self)
    # Like a dictionary {issue_id: issue}
    self.issue_2lc = IssueTwoLevelCache(
        cache_manager, self, project_service, config_service)

    # Like a dictionary {comment_id: comment)
    self.comment_2lc = CommentTwoLevelCache(
        cache_manager, self)

    self._config_service = config_service
    self.chart_service = chart_service
561
562 ### Issue ID lookups
563
564 def LookupIssueIDsFollowMoves(self, cnxn, project_local_id_pairs):
565 # type: (MonorailConnection, Sequence[Tuple(int, int)]) ->
566 # (Sequence[int], Sequence[Tuple(int, int)])
567 """Find the global issue IDs given the project ID and local ID of each.
568
569 If any (project_id, local_id) pairs refer to an issue that has been moved,
570 the issue ID will still be returned.
571
572 Args:
573 cnxn: Monorail connection.
574 project_local_id_pairs: (project_id, local_id) pairs to look up.
575
576 Returns:
577 A tuple of two items.
578 1. A sequence of global issue IDs in the `project_local_id_pairs` order.
579 2. A sequence of (project_id, local_id) containing each pair provided
580 for which no matching issue is found.
581 """
582
583 issue_id_dict, misses = self.issue_id_2lc.GetAll(
584 cnxn, project_local_id_pairs)
585 for miss in misses:
586 project_id, local_id = miss
587 issue_id = int(
588 self.issueformerlocations_tbl.SelectValue(
589 cnxn,
590 'issue_id',
591 default=0,
592 project_id=project_id,
593 local_id=local_id))
594 if issue_id:
595 misses.remove(miss)
596 issue_id_dict[miss] = issue_id
597 # Put the Issue IDs in the order specified by project_local_id_pairs
598 issue_ids = [
599 issue_id_dict[pair]
600 for pair in project_local_id_pairs
601 if pair in issue_id_dict
602 ]
603
604 return issue_ids, misses
605
606 def LookupIssueIDs(self, cnxn, project_local_id_pairs):
607 """Find the global issue IDs given the project ID and local ID of each."""
608 issue_id_dict, misses = self.issue_id_2lc.GetAll(
609 cnxn, project_local_id_pairs)
610
611 # Put the Issue IDs in the order specified by project_local_id_pairs
612 issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs
613 if pair in issue_id_dict]
614
615 return issue_ids, misses
616
617 def LookupIssueID(self, cnxn, project_id, local_id):
618 """Find the global issue ID given the project ID and local ID."""
619 issue_ids, _misses = self.LookupIssueIDs(cnxn, [(project_id, local_id)])
620 try:
621 return issue_ids[0]
622 except IndexError:
623 raise exceptions.NoSuchIssueException()
624
625 def ResolveIssueRefs(
626 self, cnxn, ref_projects, default_project_name, refs):
627 """Look up all the referenced issues and return their issue_ids.
628
629 Args:
630 cnxn: connection to SQL database.
631 ref_projects: pre-fetched dict {project_name: project} of all projects
632 mentioned in the refs as well as the default project.
633 default_project_name: string name of the current project, this is used
634 when the project_name in a ref is None.
635 refs: list of (project_name, local_id) pairs. These are parsed from
636 textual references in issue descriptions, comments, and the input
637 in the blocked-on field.
638
639 Returns:
640 A list of issue_ids for all the referenced issues. References to issues
641 in deleted projects and any issues not found are simply ignored.
642 """
643 if not refs:
644 return [], []
645
646 project_local_id_pairs = []
647 for project_name, local_id in refs:
648 project = ref_projects.get(project_name or default_project_name)
649 if not project or project.state == project_pb2.ProjectState.DELETABLE:
650 continue # ignore any refs to issues in deleted projects
651 project_local_id_pairs.append((project.project_id, local_id))
652
653 return self.LookupIssueIDs(cnxn, project_local_id_pairs) # tuple
654
655 def LookupIssueRefs(self, cnxn, issue_ids):
656 """Return {issue_id: (project_name, local_id)} for each issue_id."""
657 issue_dict, _misses = self.GetIssuesDict(cnxn, issue_ids)
658 return {
659 issue_id: (issue.project_name, issue.local_id)
660 for issue_id, issue in issue_dict.items()}
661
662 ### Issue objects
663
664 def CreateIssue(
665 self,
666 cnxn,
667 services,
668 issue,
669 marked_description,
670 attachments=None,
671 index_now=False,
672 importer_id=None):
673 """Create and store a new issue with all the given information.
674
675 Args:
676 cnxn: connection to SQL database.
677 services: persistence layer for users, issues, and projects.
678 issue: Issue PB to create.
679 marked_description: issue description with initial HTML markup.
680 attachments: [(filename, contents, mimetype),...] attachments uploaded at
681 the time the comment was made.
682 index_now: True if the issue should be updated in the full text index.
683 importer_id: optional user ID of API client importing issues for users.
684
685 Returns:
686 A tuple (the newly created Issue PB and Comment PB for the
687 issue description).
688 """
689 project_id = issue.project_id
690 reporter_id = issue.reporter_id
691 timestamp = issue.opened_timestamp
692 config = self._config_service.GetProjectConfig(cnxn, project_id)
693
694 iids_to_invalidate = set()
695 if len(issue.blocked_on_iids) != 0:
696 iids_to_invalidate.update(issue.blocked_on_iids)
697 if len(issue.blocking_iids) != 0:
698 iids_to_invalidate.update(issue.blocking_iids)
699
700 comment = self._MakeIssueComment(
701 project_id, reporter_id, marked_description,
702 attachments=attachments, timestamp=timestamp,
703 is_description=True, importer_id=importer_id)
704
705 reporter = services.user.GetUser(cnxn, reporter_id)
706 project = services.project.GetProject(cnxn, project_id)
707 reporter_auth = authdata.AuthData.FromUserID(cnxn, reporter_id, services)
708 is_project_member = framework_bizobj.UserIsInProject(
709 project, reporter_auth.effective_ids)
710 classification = services.spam.ClassifyIssue(
711 issue, comment, reporter, is_project_member)
712
713 if classification['confidence_is_spam'] > settings.classifier_spam_thresh:
714 issue.is_spam = True
715 predicted_label = 'spam'
716 else:
717 predicted_label = 'ham'
718
719 logging.info('classified new issue as %s' % predicted_label)
720 self.spam_labels.increment({'type': predicted_label})
721
722 # Create approval surveys
723 approval_comments = []
724 if len(issue.approval_values) != 0:
725 approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
726 for av in issue.approval_values:
727 ad = approval_defs_by_id.get(av.approval_id)
728 if ad:
729 survey = ''
730 if ad.survey:
731 questions = ad.survey.split('\n')
732 survey = '\n'.join(['<b>' + q + '</b>' for q in questions])
733 approval_comments.append(self._MakeIssueComment(
734 project_id, reporter_id, survey, timestamp=timestamp,
735 is_description=True, approval_id=ad.approval_id))
736 else:
737 logging.info('Could not find ApprovalDef with approval_id %r',
738 av.approval_id)
739
740 issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
741 self.issue_creations.increment()
742 issue_id = self.InsertIssue(cnxn, issue)
743 comment.issue_id = issue_id
744 self.InsertComment(cnxn, comment)
745 for approval_comment in approval_comments:
746 approval_comment.issue_id = issue_id
747 self.InsertComment(cnxn, approval_comment)
748
749 issue.issue_id = issue_id
750
751 # ClassifyIssue only returns confidence_is_spam, but
752 # RecordClassifierIssueVerdict records confidence of
753 # ham or spam. Therefore if ham, invert score.
754 confidence = classification['confidence_is_spam']
755 if not issue.is_spam:
756 confidence = 1.0 - confidence
757
758 services.spam.RecordClassifierIssueVerdict(
759 cnxn, issue, predicted_label=='spam',
760 confidence, classification['failed_open'])
761
762 if permissions.HasRestrictions(issue, 'view'):
763 self._config_service.InvalidateMemcache(
764 [issue], key_prefix='nonviewable:')
765
766 # Add a comment to existing issues saying they are now blocking or
767 # blocked on this issue.
768 blocked_add_issues = self.GetIssues(cnxn, issue.blocked_on_iids)
769 for add_issue in blocked_add_issues:
770 self.CreateIssueComment(
771 cnxn, add_issue, reporter_id, content='',
772 amendments=[tracker_bizobj.MakeBlockingAmendment(
773 [(issue.project_name, issue.local_id)], [],
774 default_project_name=add_issue.project_name)])
775 blocking_add_issues = self.GetIssues(cnxn, issue.blocking_iids)
776 for add_issue in blocking_add_issues:
777 self.CreateIssueComment(
778 cnxn, add_issue, reporter_id, content='',
779 amendments=[tracker_bizobj.MakeBlockedOnAmendment(
780 [(issue.project_name, issue.local_id)], [],
781 default_project_name=add_issue.project_name)])
782
783 self._UpdateIssuesModified(
784 cnxn, iids_to_invalidate, modified_timestamp=timestamp)
785
786 if index_now:
787 tracker_fulltext.IndexIssues(
788 cnxn, [issue], services.user, self, self._config_service)
789 else:
790 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
791
792 return issue, comment
793
794 def AllocateNewLocalIDs(self, cnxn, issues):
795 # Filter to just the issues that need new local IDs.
796 issues = [issue for issue in issues if issue.local_id < 0]
797
798 for issue in issues:
799 if issue.local_id < 0:
800 issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id)
801
802 self.UpdateIssues(cnxn, issues)
803
804 logging.info("AllocateNewLocalIDs")
805
806 def GetAllIssuesInProject(
807 self, cnxn, project_id, min_local_id=None, use_cache=True):
808 """Special query to efficiently get ALL issues in a project.
809
810 This is not done while the user is waiting, only by backround tasks.
811
812 Args:
813 cnxn: connection to SQL database.
814 project_id: the ID of the project.
815 min_local_id: optional int to start at.
816 use_cache: optional boolean to turn off using the cache.
817
818 Returns:
819 A list of Issue protocol buffers for all issues.
820 """
821 all_local_ids = self.GetAllLocalIDsInProject(
822 cnxn, project_id, min_local_id=min_local_id)
823 return self.GetIssuesByLocalIDs(
824 cnxn, project_id, all_local_ids, use_cache=use_cache)
825
826 def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
827 """Get any one issue from RAM or memcache, otherwise return None."""
828 return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end)
829
830 def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None):
831 # type: (MonorailConnection, Collection[int], Optional[Boolean],
832 # Optional[int]) -> (Dict[int, Issue], Sequence[int])
833 """Get a dict {iid: issue} from the DB or cache.
834
835 Returns:
836 A dict {iid: issue} from the DB or cache.
837 A sequence of iid that could not be found.
838 """
839 issue_dict, missed_iids = self.issue_2lc.GetAll(
840 cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
841 if not use_cache:
842 for issue in issue_dict.values():
843 issue.assume_stale = False
844 return issue_dict, missed_iids
845
846 def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None):
847 # type: (MonorailConnection, Sequence[int], Optional[Boolean],
848 # Optional[int]) -> (Sequence[int])
849 """Get a list of Issue PBs from the DB or cache.
850
851 Args:
852 cnxn: connection to SQL database.
853 issue_ids: integer global issue IDs of the issues.
854 use_cache: optional boolean to turn off using the cache.
855 shard_id: optional int shard_id to limit retrieval.
856
857 Returns:
858 A list of Issue PBs in the same order as the given issue_ids.
859 """
860 issue_dict, _misses = self.GetIssuesDict(
861 cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
862
863 # Return a list that is ordered the same as the given issue_ids.
864 issue_list = [issue_dict[issue_id] for issue_id in issue_ids
865 if issue_id in issue_dict]
866
867 return issue_list
868
869 def GetIssue(self, cnxn, issue_id, use_cache=True):
870 """Get one Issue PB from the DB.
871
872 Args:
873 cnxn: connection to SQL database.
874 issue_id: integer global issue ID of the issue.
875 use_cache: optional boolean to turn off using the cache.
876
877 Returns:
878 The requested Issue protocol buffer.
879
880 Raises:
881 NoSuchIssueException: the issue was not found.
882 """
883 issues = self.GetIssues(cnxn, [issue_id], use_cache=use_cache)
884 try:
885 return issues[0]
886 except IndexError:
887 raise exceptions.NoSuchIssueException()
888
889 def GetIssuesByLocalIDs(
890 self, cnxn, project_id, local_id_list, use_cache=True, shard_id=None):
891 """Get all the requested issues.
892
893 Args:
894 cnxn: connection to SQL database.
895 project_id: int ID of the project to which the issues belong.
896 local_id_list: list of integer local IDs for the requested issues.
897 use_cache: optional boolean to turn off using the cache.
898 shard_id: optional int shard_id to choose a replica.
899
900 Returns:
901 List of Issue PBs for the requested issues. The result Issues
902 will be ordered in the same order as local_id_list.
903 """
904 issue_ids_to_fetch, _misses = self.LookupIssueIDs(
905 cnxn, [(project_id, local_id) for local_id in local_id_list])
906 issues = self.GetIssues(
907 cnxn, issue_ids_to_fetch, use_cache=use_cache, shard_id=shard_id)
908 return issues
909
910 def GetIssueByLocalID(self, cnxn, project_id, local_id, use_cache=True):
911 """Get one Issue PB from the DB.
912
913 Args:
914 cnxn: connection to SQL database.
915 project_id: the ID of the project to which the issue belongs.
916 local_id: integer local ID of the issue.
917 use_cache: optional boolean to turn off using the cache.
918
919 Returns:
920 The requested Issue protocol buffer.
921 """
922 issues = self.GetIssuesByLocalIDs(
923 cnxn, project_id, [local_id], use_cache=use_cache)
924 try:
925 return issues[0]
926 except IndexError:
927 raise exceptions.NoSuchIssueException(
928 'The issue %s:%d does not exist.' % (project_id, local_id))
929
930 def GetOpenAndClosedIssues(self, cnxn, issue_ids):
931 """Return the requested issues in separate open and closed lists.
932
933 Args:
934 cnxn: connection to SQL database.
935 issue_ids: list of int issue issue_ids.
936
937 Returns:
938 A pair of lists, the first with open issues, second with closed issues.
939 """
940 if not issue_ids:
941 return [], [] # make one common case efficient
942
943 issues = self.GetIssues(cnxn, issue_ids)
944 project_ids = {issue.project_id for issue in issues}
945 configs = self._config_service.GetProjectConfigs(cnxn, project_ids)
946 open_issues = []
947 closed_issues = []
948 for issue in issues:
949 config = configs[issue.project_id]
950 if tracker_helpers.MeansOpenInProject(
951 tracker_bizobj.GetStatus(issue), config):
952 open_issues.append(issue)
953 else:
954 closed_issues.append(issue)
955
956 return open_issues, closed_issues
957
958 # TODO(crbug.com/monorail/7822): Delete this method when V0 API retired.
959 def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id):
960 """Return the current location of a moved issue based on old location."""
961 issue_id = int(self.issueformerlocations_tbl.SelectValue(
962 cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id))
963 if not issue_id:
964 return None, None
965 project_id, local_id = self.issue_tbl.SelectRow(
966 cnxn, cols=['project_id', 'local_id'], id=issue_id)
967 return project_id, local_id
968
969 def GetPreviousLocations(self, cnxn, issue):
970 """Get all the previous locations of an issue."""
971 location_rows = self.issueformerlocations_tbl.Select(
972 cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id)
973 locations = [(pid, local_id) for (pid, local_id) in location_rows
974 if pid != issue.project_id or local_id != issue.local_id]
975 return locations
976
977 def GetCommentsByUser(self, cnxn, user_id):
978 """Get all comments created by a user"""
979 comments = self.GetComments(cnxn, commenter_id=user_id,
980 is_description=False, limit=10000)
981 return comments
982
983 def GetIssueActivity(self, cnxn, num=50, before=None, after=None,
984 project_ids=None, user_ids=None, ascending=False):
985
986 if project_ids:
987 use_clause = (
988 'USE INDEX (project_id) USE INDEX FOR ORDER BY (project_id)')
989 elif user_ids:
990 use_clause = (
991 'USE INDEX (commenter_id) USE INDEX FOR ORDER BY (commenter_id)')
992 else:
993 use_clause = ''
994
995 # TODO(jrobbins): make this into a persist method.
996 # TODO(jrobbins): this really needs permission checking in SQL, which
997 # will be slow.
998 where_conds = [('Issue.id = Comment.issue_id', [])]
999 if project_ids is not None:
1000 cond_str = 'Comment.project_id IN (%s)' % sql.PlaceHolders(project_ids)
1001 where_conds.append((cond_str, project_ids))
1002 if user_ids is not None:
1003 cond_str = 'Comment.commenter_id IN (%s)' % sql.PlaceHolders(user_ids)
1004 where_conds.append((cond_str, user_ids))
1005
1006 if before:
1007 where_conds.append(('created < %s', [before]))
1008 if after:
1009 where_conds.append(('created > %s', [after]))
1010 if ascending:
1011 order_by = [('created', [])]
1012 else:
1013 order_by = [('created DESC', [])]
1014
1015 comments = self.GetComments(
1016 cnxn, joins=[('Issue', [])], deleted_by=None, where=where_conds,
1017 use_clause=use_clause, order_by=order_by, limit=num + 1)
1018 return comments
1019
1020 def GetIssueIDsReportedByUser(self, cnxn, user_id):
1021 """Get all issue IDs created by a user"""
1022 rows = self.issue_tbl.Select(cnxn, cols=['id'], reporter_id=user_id,
1023 limit=10000)
1024 return [row[0] for row in rows]
1025
  def InsertIssue(self, cnxn, issue):
    """Store the given issue in SQL.

    Args:
      cnxn: connection to SQL database.
      issue: Issue PB to insert into the database.

    Returns:
      The int issue_id of the newly created issue.
    """
    # Statuses are stored as IDs into the project's status configuration.
    status_id = self._config_service.LookupStatusID(
        cnxn, issue.project_id, issue.status)
    row = (issue.project_id, issue.local_id, status_id,
           issue.owner_id or None,
           issue.reporter_id,
           issue.opened_timestamp,
           issue.closed_timestamp,
           issue.modified_timestamp,
           issue.owner_modified_timestamp,
           issue.status_modified_timestamp,
           issue.component_modified_timestamp,
           issue.derived_owner_id or None,
           self._config_service.LookupStatusID(
               cnxn, issue.project_id, issue.derived_status),
           bool(issue.deleted),
           issue.star_count, issue.attachment_count,
           issue.is_spam)
    # ISSUE_COLs[1:] to skip setting the ID
    # Insert into the Primary DB.
    generated_ids = self.issue_tbl.InsertRows(
        cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True)
    issue_id = generated_ids[0]
    issue.issue_id = issue_id
    # The shard column depends on the auto-generated issue_id, so it can
    # only be set in a separate UPDATE after the INSERT above.
    self.issue_tbl.Update(
        cnxn, {'shard': issue_id % settings.num_logical_shards},
        id=issue.issue_id, commit=False)

    # Write all the child-table rows without committing, then commit the
    # whole set of inserts and updates as one transaction below.
    self._UpdateIssuesSummary(cnxn, [issue], commit=False)
    self._UpdateIssuesLabels(cnxn, [issue], commit=False)
    self._UpdateIssuesFields(cnxn, [issue], commit=False)
    self._UpdateIssuesComponents(cnxn, [issue], commit=False)
    self._UpdateIssuesCc(cnxn, [issue], commit=False)
    self._UpdateIssuesNotify(cnxn, [issue], commit=False)
    self._UpdateIssuesRelation(cnxn, [issue], commit=False)
    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
    self.chart_service.StoreIssueSnapshots(cnxn, [issue], commit=False)
    cnxn.Commit()
    self._config_service.InvalidateMemcache([issue])

    return issue_id
1076
  def UpdateIssues(
      self, cnxn, issues, update_cols=None, just_derived=False, commit=True,
      invalidate=True):
    """Update the given issues in SQL.

    Args:
      cnxn: connection to SQL database.
      issues: list of issues to update, these must have been loaded with
          use_cache=False so that issue.assume_stale is False.
      update_cols: optional list of just the field names to update.
      just_derived: set to True when only updating derived fields.
      commit: set to False to skip the DB commit and do it in the caller.
      invalidate: set to False to leave cache invalidatation to the caller.
    """
    if not issues:
      return

    for issue in issues:  # slow, but mysql will not allow REPLACE rows.
      # Refuse to write back an issue that may be a stale cached copy;
      # doing so could silently clobber more recent edits (issue 2514).
      assert not issue.assume_stale, (
          'issue2514: Storing issue that might be stale: %r' % issue)
      # Map the Issue PB onto the Issue table columns.  Status values are
      # stored as IDs into the project's status configuration.
      delta = {
          'project_id': issue.project_id,
          'local_id': issue.local_id,
          'owner_id': issue.owner_id or None,
          'status_id': self._config_service.LookupStatusID(
              cnxn, issue.project_id, issue.status) or None,
          'opened': issue.opened_timestamp,
          'closed': issue.closed_timestamp,
          'modified': issue.modified_timestamp,
          'owner_modified': issue.owner_modified_timestamp,
          'status_modified': issue.status_modified_timestamp,
          'component_modified': issue.component_modified_timestamp,
          'derived_owner_id': issue.derived_owner_id or None,
          'derived_status_id': self._config_service.LookupStatusID(
              cnxn, issue.project_id, issue.derived_status) or None,
          'deleted': bool(issue.deleted),
          'star_count': issue.star_count,
          'attachment_count': issue.attachment_count,
          'is_spam': issue.is_spam,
          }
      if update_cols is not None:
        # Narrow the write to just the explicitly requested columns.
        delta = {key: val for key, val in delta.items()
                 if key in update_cols}
      self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False)

    if not update_cols:
      # Full update: rewrite the child-table rows for these issues.
      self._UpdateIssuesLabels(cnxn, issues, commit=False)
      self._UpdateIssuesCc(cnxn, issues, commit=False)
      self._UpdateIssuesFields(cnxn, issues, commit=False)
      self._UpdateIssuesComponents(cnxn, issues, commit=False)
      self._UpdateIssuesNotify(cnxn, issues, commit=False)
      if not just_derived:
        # Summary and blocking relations are skipped when only derived
        # fields changed.
        self._UpdateIssuesSummary(cnxn, issues, commit=False)
        self._UpdateIssuesRelation(cnxn, issues, commit=False)

    self.chart_service.StoreIssueSnapshots(cnxn, issues, commit=False)

    iids_to_invalidate = [issue.issue_id for issue in issues]
    # NOTE: derived-only updates use InvalidateAllKeys rather than
    # InvalidateKeys; see the caches module for the distinction.
    if just_derived and invalidate:
      self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate)
    elif invalidate:
      self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
    if commit:
      cnxn.Commit()
    if invalidate:
      self._config_service.InvalidateMemcache(issues)
1143
1144 def UpdateIssue(
1145 self, cnxn, issue, update_cols=None, just_derived=False, commit=True,
1146 invalidate=True):
1147 """Update the given issue in SQL.
1148
1149 Args:
1150 cnxn: connection to SQL database.
1151 issue: the issue to update.
1152 update_cols: optional list of just the field names to update.
1153 just_derived: set to True when only updating derived fields.
1154 commit: set to False to skip the DB commit and do it in the caller.
1155 invalidate: set to False to leave cache invalidatation to the caller.
1156 """
1157 self.UpdateIssues(
1158 cnxn, [issue], update_cols=update_cols, just_derived=just_derived,
1159 commit=commit, invalidate=invalidate)
1160
1161 def _UpdateIssuesSummary(self, cnxn, issues, commit=True):
1162 """Update the IssueSummary table rows for the given issues."""
1163 self.issuesummary_tbl.InsertRows(
1164 cnxn, ISSUESUMMARY_COLS,
1165 [(issue.issue_id, issue.summary) for issue in issues],
1166 replace=True, commit=commit)
1167
1168 def _UpdateIssuesLabels(self, cnxn, issues, commit=True):
1169 """Update the Issue2Label table rows for the given issues."""
1170 label_rows = []
1171 for issue in issues:
1172 issue_shard = issue.issue_id % settings.num_logical_shards
1173 # TODO(jrobbins): If the user adds many novel labels in one issue update,
1174 # that could be slow. Solution is to add all new labels in a batch first.
1175 label_rows.extend(
1176 (issue.issue_id,
1177 self._config_service.LookupLabelID(cnxn, issue.project_id, label),
1178 False,
1179 issue_shard)
1180 for label in issue.labels)
1181 label_rows.extend(
1182 (issue.issue_id,
1183 self._config_service.LookupLabelID(cnxn, issue.project_id, label),
1184 True,
1185 issue_shard)
1186 for label in issue.derived_labels)
1187
1188 self.issue2label_tbl.Delete(
1189 cnxn, issue_id=[issue.issue_id for issue in issues],
1190 commit=False)
1191 self.issue2label_tbl.InsertRows(
1192 cnxn, ISSUE2LABEL_COLS + ['issue_shard'],
1193 label_rows, ignore=True, commit=commit)
1194
1195 def _UpdateIssuesFields(self, cnxn, issues, commit=True):
1196 """Update the Issue2FieldValue table rows for the given issues."""
1197 fieldvalue_rows = []
1198 for issue in issues:
1199 issue_shard = issue.issue_id % settings.num_logical_shards
1200 for fv in issue.field_values:
1201 fieldvalue_rows.append(
1202 (issue.issue_id, fv.field_id, fv.int_value, fv.str_value,
1203 fv.user_id or None, fv.date_value, fv.url_value, fv.derived,
1204 fv.phase_id or None, issue_shard))
1205
1206 self.issue2fieldvalue_tbl.Delete(
1207 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1208 self.issue2fieldvalue_tbl.InsertRows(
1209 cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'],
1210 fieldvalue_rows, commit=commit)
1211
1212 def _UpdateIssuesComponents(self, cnxn, issues, commit=True):
1213 """Update the Issue2Component table rows for the given issues."""
1214 issue2component_rows = []
1215 for issue in issues:
1216 issue_shard = issue.issue_id % settings.num_logical_shards
1217 issue2component_rows.extend(
1218 (issue.issue_id, component_id, False, issue_shard)
1219 for component_id in issue.component_ids)
1220 issue2component_rows.extend(
1221 (issue.issue_id, component_id, True, issue_shard)
1222 for component_id in issue.derived_component_ids)
1223
1224 self.issue2component_tbl.Delete(
1225 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1226 self.issue2component_tbl.InsertRows(
1227 cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'],
1228 issue2component_rows, ignore=True, commit=commit)
1229
1230 def _UpdateIssuesCc(self, cnxn, issues, commit=True):
1231 """Update the Issue2Cc table rows for the given issues."""
1232 cc_rows = []
1233 for issue in issues:
1234 issue_shard = issue.issue_id % settings.num_logical_shards
1235 cc_rows.extend(
1236 (issue.issue_id, cc_id, False, issue_shard)
1237 for cc_id in issue.cc_ids)
1238 cc_rows.extend(
1239 (issue.issue_id, cc_id, True, issue_shard)
1240 for cc_id in issue.derived_cc_ids)
1241
1242 self.issue2cc_tbl.Delete(
1243 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1244 self.issue2cc_tbl.InsertRows(
1245 cnxn, ISSUE2CC_COLS + ['issue_shard'],
1246 cc_rows, ignore=True, commit=commit)
1247
1248 def _UpdateIssuesNotify(self, cnxn, issues, commit=True):
1249 """Update the Issue2Notify table rows for the given issues."""
1250 notify_rows = []
1251 for issue in issues:
1252 derived_rows = [[issue.issue_id, email]
1253 for email in issue.derived_notify_addrs]
1254 notify_rows.extend(derived_rows)
1255
1256 self.issue2notify_tbl.Delete(
1257 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1258 self.issue2notify_tbl.InsertRows(
1259 cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit)
1260
  def _UpdateIssuesRelation(self, cnxn, issues, commit=True):
    """Update the IssueRelation table rows for the given issues."""
    relation_rows = []  # Rows originating from this issue's side.
    blocking_rows = []  # Inverse rows stored on the blocked issue's side.
    dangling_relation_rows = []  # Relations to issues outside this tracker.
    for issue in issues:
      # blocked_on_ranks is parallel to blocked_on_iids.
      for i, dst_issue_id in enumerate(issue.blocked_on_iids):
        rank = issue.blocked_on_ranks[i]
        relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon', rank))
      for dst_issue_id in issue.blocking_iids:
        # "A blocking B" is stored as a "B blockedon A" row.
        blocking_rows.append((dst_issue_id, issue.issue_id, 'blockedon'))
      for dst_ref in issue.dangling_blocked_on_refs:
        if dst_ref.ext_issue_identifier:
          # External reference identified by a string, not an issue_id.
          dangling_relation_rows.append((
              issue.issue_id, None, None,
              dst_ref.ext_issue_identifier, 'blockedon'))
        else:
          dangling_relation_rows.append((
              issue.issue_id, dst_ref.project, dst_ref.issue_id,
              None, 'blockedon'))
      for dst_ref in issue.dangling_blocking_refs:
        if dst_ref.ext_issue_identifier:
          dangling_relation_rows.append((
              issue.issue_id, None, None,
              dst_ref.ext_issue_identifier, 'blocking'))
        else:
          # NOTE(review): the blocked_on branch above passes None in this
          # position, while this passes dst_ref.ext_issue_identifier (falsy
          # here) -- likely equivalent, but inconsistent; confirm.
          dangling_relation_rows.append((
              issue.issue_id, dst_ref.project, dst_ref.issue_id,
              dst_ref.ext_issue_identifier, 'blocking'))
      if issue.merged_into:
        relation_rows.append((
            issue.issue_id, issue.merged_into, 'mergedinto', None))
      if issue.merged_into_external:
        dangling_relation_rows.append((
            issue.issue_id, None, None,
            issue.merged_into_external, 'mergedinto'))

    # Diff the new inverse rows against the existing ones so that rows for
    # unchanged relations are left in place and only removed ones deleted.
    old_blocking = self.issuerelation_tbl.Select(
        cnxn, cols=ISSUERELATION_COLS[:-1],
        dst_issue_id=[issue.issue_id for issue in issues], kind='blockedon')
    # Newly added inverse rows get a default rank of 0.
    relation_rows.extend([
        (row + (0,)) for row in blocking_rows if row not in old_blocking])
    delete_rows = [row for row in old_blocking if row not in blocking_rows]

    for issue_id, dst_issue_id, kind in delete_rows:
      self.issuerelation_tbl.Delete(cnxn, issue_id=issue_id,
          dst_issue_id=dst_issue_id, kind=kind, commit=False)
    # Rewrite all rows that originate from these issues.
    self.issuerelation_tbl.Delete(
        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
    self.issuerelation_tbl.InsertRows(
        cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
    self.danglingrelation_tbl.Delete(
        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
    self.danglingrelation_tbl.InsertRows(
        cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True,
        commit=commit)
1317
1318 def _UpdateIssuesModified(
1319 self, cnxn, iids, modified_timestamp=None, invalidate=True):
1320 """Store a modified timestamp for each of the specified issues."""
1321 if not iids:
1322 return
1323 delta = {'modified': modified_timestamp or int(time.time())}
1324 self.issue_tbl.Update(cnxn, delta, id=iids, commit=False)
1325 if invalidate:
1326 self.InvalidateIIDs(cnxn, iids)
1327
1328 def _UpdateIssuesApprovals(self, cnxn, issue, commit=True):
1329 """Update the Issue2ApprovalValue table rows for the given issue."""
1330 self.issue2approvalvalue_tbl.Delete(
1331 cnxn, issue_id=issue.issue_id, commit=commit)
1332 av_rows = [(av.approval_id, issue.issue_id, av.phase_id,
1333 av.status.name.lower(), av.setter_id, av.set_on) for
1334 av in issue.approval_values]
1335 self.issue2approvalvalue_tbl.InsertRows(
1336 cnxn, ISSUE2APPROVALVALUE_COLS, av_rows, commit=commit)
1337
1338 approver_rows = []
1339 for av in issue.approval_values:
1340 approver_rows.extend([(av.approval_id, approver_id, issue.issue_id)
1341 for approver_id in av.approver_ids])
1342 self.issueapproval2approver_tbl.Delete(
1343 cnxn, issue_id=issue.issue_id, commit=commit)
1344 self.issueapproval2approver_tbl.InsertRows(
1345 cnxn, ISSUEAPPROVAL2APPROVER_COLS, approver_rows, commit=commit)
1346
  def UpdateIssueStructure(self, cnxn, config, issue, template, reporter_id,
                           comment_content, commit=True, invalidate=True):
    """Converts the phases and approvals structure of the issue into the
    structure of the given template.

    Returns the IssueComment PB created for the structure-change amendment.
    """
    # TODO(jojwang): Remove Field defs that belong to any removed approvals.
    approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
    issue_avs_by_id = {av.approval_id: av for av in issue.approval_values}

    new_approval_surveys = []
    new_issue_approvals = []

    # The template's approval values define the issue's new structure.
    for template_av in template.approval_values:
      existing_issue_av = issue_avs_by_id.get(template_av.approval_id)

      # Update all approval surveys so latest ApprovalDef survey changes
      # appear in the converted issue's approval values.
      ad = approval_defs_by_id.get(template_av.approval_id)
      new_av_approver_ids = []
      if ad:
        new_av_approver_ids = ad.approver_ids
        # Surveys are stored as description-style comments scoped to an
        # approval_id.
        new_approval_surveys.append(
            self._MakeIssueComment(
                issue.project_id, reporter_id, ad.survey,
                is_description=True, approval_id=ad.approval_id))
      else:
        logging.info('ApprovalDef not found for approval %r', template_av)

      # Keep approval values as-is if it exists in issue and template,
      # adopting only the template's phase_id so the value lands on the
      # template's gate structure.
      if existing_issue_av:
        new_av = tracker_bizobj.MakeApprovalValue(
            existing_issue_av.approval_id,
            approver_ids=existing_issue_av.approver_ids,
            status=existing_issue_av.status,
            setter_id=existing_issue_av.setter_id,
            set_on=existing_issue_av.set_on,
            phase_id=template_av.phase_id)
        new_issue_approvals.append(new_av)
      else:
        new_av = tracker_bizobj.MakeApprovalValue(
            template_av.approval_id, approver_ids=new_av_approver_ids,
            status=template_av.status, phase_id=template_av.phase_id)
        new_issue_approvals.append(new_av)

    template_phase_by_name = {
        phase.name.lower(): phase for phase in template.phases}
    issue_phase_by_id = {phase.phase_id: phase for phase in issue.phases}
    updated_fvs = []
    # Trim issue FieldValues or update FieldValue phase_ids
    for fv in issue.field_values:
      # If a fv's phase has the same name as a template's phase, update
      # the fv's phase_id to that of the template phase's. Otherwise,
      # remove the fv.
      if fv.phase_id:
        issue_phase = issue_phase_by_id.get(fv.phase_id)
        if issue_phase and issue_phase.name:
          template_phase = template_phase_by_name.get(issue_phase.name.lower())
          # TODO(jojwang): monorail:4693, remove this after all 'stable-full'
          # gates have been renamed to 'stable'.
          if not template_phase:
            template_phase = template_phase_by_name.get(
                FLT_EQUIVALENT_GATES.get(issue_phase.name.lower()))
          if template_phase:
            fv.phase_id = template_phase.phase_id
            updated_fvs.append(fv)
      # keep all fvs that do not belong to phases.
      else:
        updated_fvs.append(fv)

    # Record an amendment listing the new and old approval structures,
    # using field names for display.
    fd_names_by_id = {fd.field_id: fd.field_name for fd in config.field_defs}
    amendment = tracker_bizobj.MakeApprovalStructureAmendment(
        [fd_names_by_id.get(av.approval_id) for av in new_issue_approvals],
        [fd_names_by_id.get(av.approval_id) for av in issue.approval_values])

    # Update issue structure in RAM.
    issue.approval_values = new_issue_approvals
    issue.phases = template.phases
    issue.field_values = updated_fvs

    # Update issue structure in DB.
    for survey in new_approval_surveys:
      survey.issue_id = issue.issue_id
      self.InsertComment(cnxn, survey, commit=False)
    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
    self._UpdateIssuesFields(cnxn, [issue], commit=False)
    comment_pb = self.CreateIssueComment(
        cnxn, issue, reporter_id, comment_content,
        amendments=[amendment], commit=False)

    if commit:
      cnxn.Commit()

    if invalidate:
      self.InvalidateIIDs(cnxn, [issue.issue_id])

    return comment_pb
1442
  def DeltaUpdateIssue(
      self, cnxn, services, reporter_id, project_id,
      config, issue, delta, index_now=False, comment=None, attachments=None,
      iids_to_invalidate=None, rules=None, predicate_asts=None,
      is_description=False, timestamp=None, kept_attachments=None,
      importer_id=None, inbound_message=None):
    """Update the issue in the database and return a set of update tuples.

    Args:
      cnxn: connection to SQL database.
      services: connections to persistence layer.
      reporter_id: user ID of the user making this change.
      project_id: int ID for the current project.
      config: ProjectIssueConfig PB for this project.
      issue: Issue PB of issue to update.
      delta: IssueDelta object of fields to update.
      index_now: True if the issue should be updated in the full text index.
      comment: This should be the content of the comment
          corresponding to this change.
      attachments: List [(filename, contents, mimetype),...] of attachments.
      iids_to_invalidate: optional set of issue IDs that need to be invalidated.
          If provided, affected issues will be accumulated here and, the caller
          must call InvalidateIIDs() afterwards.
      rules: optional list of preloaded FilterRule PBs for this project.
      predicate_asts: optional list of QueryASTs for the rules. If rules are
          provided, then predicate_asts should also be provided.
      is_description: True if the comment is a new description for the issue.
      timestamp: int timestamp set during testing, otherwise defaults to
          int(time.time()).
      kept_attachments: This should be a list of int attachment ids for
          attachments kept from previous descriptions, if the comment is
          a change to the issue description
      importer_id: optional ID of user ID for an API client that is importing
          issues and attributing them to other users.
      inbound_message: optional string full text of an email that caused
          this comment to be added.

    Returns:
      A tuple (amendments, comment_pb) with a list of Amendment PBs that
      describe the set of metadata updates that the user made, and the
      resulting IssueComment (or None if no comment was created).
    """
    timestamp = timestamp or int(time.time())
    # Snapshot the effective owner/status/components so the *_modified
    # timestamps below are only bumped when a value actually changed.
    old_effective_owner = tracker_bizobj.GetOwnerId(issue)
    old_effective_status = tracker_bizobj.GetStatus(issue)
    old_components = set(issue.component_ids)

    logging.info(
        'Bulk edit to project_id %s issue.local_id %s, comment %r',
        project_id, issue.local_id, comment)
    if iids_to_invalidate is None:
      iids_to_invalidate = set([issue.issue_id])
      invalidate = True
    else:
      iids_to_invalidate.add(issue.issue_id)
      invalidate = False  # Caller will do it.

    # Store each updated value in the issue PB, and compute Update PBs
    amendments, impacted_iids = tracker_bizobj.ApplyIssueDelta(
        cnxn, self, issue, delta, config)
    iids_to_invalidate.update(impacted_iids)

    # If this was a no-op with no comment, bail out and don't save,
    # invalidate, or re-index anything.
    if (not amendments and (not comment or not comment.strip()) and
        not attachments):
      logging.info('No amendments, comment, attachments: this is a no-op.')
      return [], None

    # Note: no need to check for collisions when the user is doing a delta.

    # update the modified_timestamp for any comment added, even if it was
    # just a text comment with no issue fields changed.
    issue.modified_timestamp = timestamp

    # Update the closed timestamp before filter rules so that rules
    # can test for closed_timestamp, and also after filter rules
    # so that closed_timestamp will be set if the issue is closed by the rule.
    tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
    if rules is None:
      logging.info('Rules were not given')
      rules = services.features.GetFilterRules(cnxn, config.project_id)
      predicate_asts = filterrules_helpers.ParsePredicateASTs(
          rules, config, [])

    filterrules_helpers.ApplyGivenRules(
        cnxn, services, issue, config, rules, predicate_asts)
    tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
    if old_effective_owner != tracker_bizobj.GetOwnerId(issue):
      issue.owner_modified_timestamp = timestamp
    if old_effective_status != tracker_bizobj.GetStatus(issue):
      issue.status_modified_timestamp = timestamp
    if old_components != set(issue.component_ids):
      issue.component_modified_timestamp = timestamp

    # Store the issue in SQL.
    self.UpdateIssue(cnxn, issue, commit=False, invalidate=False)

    comment_pb = self.CreateIssueComment(
        cnxn, issue, reporter_id, comment, amendments=amendments,
        is_description=is_description, attachments=attachments, commit=False,
        kept_attachments=kept_attachments, timestamp=timestamp,
        importer_id=importer_id, inbound_message=inbound_message)
    self._UpdateIssuesModified(
        cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp,
        invalidate=invalidate)

    # Add a comment to the newly added issues saying they are now blocking
    # this issue.
    for add_issue in self.GetIssues(cnxn, delta.blocked_on_add):
      self.CreateIssueComment(
          cnxn, add_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockingAmendment(
              [(issue.project_name, issue.local_id)], [],
              default_project_name=add_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)
    # Add a comment to the newly removed issues saying they are no longer
    # blocking this issue.
    for remove_issue in self.GetIssues(cnxn, delta.blocked_on_remove):
      self.CreateIssueComment(
          cnxn, remove_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockingAmendment(
              [], [(issue.project_name, issue.local_id)],
              default_project_name=remove_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)

    # Add a comment to the newly added issues saying they are now blocked on
    # this issue.
    for add_issue in self.GetIssues(cnxn, delta.blocking_add):
      self.CreateIssueComment(
          cnxn, add_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
              [(issue.project_name, issue.local_id)], [],
              default_project_name=add_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)
    # Add a comment to the newly removed issues saying they are no longer
    # blocked on this issue.
    for remove_issue in self.GetIssues(cnxn, delta.blocking_remove):
      self.CreateIssueComment(
          cnxn, remove_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
              [], [(issue.project_name, issue.local_id)],
              default_project_name=remove_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)

    # NOTE(review): an explicit Commit() happens here only when the caller
    # supplied iids_to_invalidate (invalidate is False); in the other branch
    # this method issues no Commit() itself -- confirm where the commit
    # happens in that path.
    if not invalidate:
      cnxn.Commit()

    if index_now:
      # NOTE(review): other code in this file calls IndexIssues with
      # services.user rather than services.user_service -- confirm which
      # attribute is correct here.
      tracker_fulltext.IndexIssues(
          cnxn, [issue], services.user_service, self, self._config_service)
    else:
      self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])

    return amendments, comment_pb
1598
1599 def InvalidateIIDs(self, cnxn, iids_to_invalidate):
1600 """Invalidate the specified issues in the Invalidate table and memcache."""
1601 issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate)
1602 self.InvalidateIssues(cnxn, issues_to_invalidate)
1603
1604 def InvalidateIssues(self, cnxn, issues):
1605 """Invalidate the specified issues in the Invalidate table and memcache."""
1606 iids = [issue.issue_id for issue in issues]
1607 self.issue_2lc.InvalidateKeys(cnxn, iids)
1608 self._config_service.InvalidateMemcache(issues)
1609
1610 def RelateIssues(self, cnxn, issue_relation_dict, commit=True):
1611 """Update the IssueRelation table rows for the given relationships.
1612
1613 issue_relation_dict is a mapping of 'source' issues to 'destination' issues,
1614 paired with the kind of relationship connecting the two.
1615 """
1616 relation_rows = []
1617 for src_iid, dests in issue_relation_dict.items():
1618 for dst_iid, kind in dests:
1619 if kind == 'blocking':
1620 relation_rows.append((dst_iid, src_iid, 'blockedon', 0))
1621 elif kind == 'blockedon':
1622 relation_rows.append((src_iid, dst_iid, 'blockedon', 0))
1623 elif kind == 'mergedinto':
1624 relation_rows.append((src_iid, dst_iid, 'mergedinto', None))
1625
1626 self.issuerelation_tbl.InsertRows(
1627 cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
1628
1629 def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id):
1630 """Copy the given issues into the destination project."""
1631 created_issues = []
1632 iids_to_invalidate = set()
1633
1634 for target_issue in issues:
1635 assert not target_issue.assume_stale, (
1636 'issue2514: Copying issue that might be stale: %r' % target_issue)
1637 new_issue = tracker_pb2.Issue()
1638 new_issue.project_id = dest_project.project_id
1639 new_issue.project_name = dest_project.project_name
1640 new_issue.summary = target_issue.summary
1641 new_issue.labels.extend(target_issue.labels)
1642 new_issue.field_values.extend(target_issue.field_values)
1643 new_issue.reporter_id = copier_id
1644
1645 timestamp = int(time.time())
1646 new_issue.opened_timestamp = timestamp
1647 new_issue.modified_timestamp = timestamp
1648
1649 target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id)
1650 initial_summary_comment = target_comments[0]
1651
1652 # Note that blocking and merge_into are not copied.
1653 if target_issue.blocked_on_iids:
1654 blocked_on = target_issue.blocked_on_iids
1655 iids_to_invalidate.update(blocked_on)
1656 new_issue.blocked_on_iids = blocked_on
1657
1658 # Gather list of attachments from the target issue's summary comment.
1659 # MakeIssueComments expects a list of [(filename, contents, mimetype),...]
1660 attachments = []
1661 for attachment in initial_summary_comment.attachments:
Adrià Vilanova Martínezde942802022-07-15 14:06:55 +02001662 client = storage.Client()
1663 bucket = client.get_bucket(app_identity.get_default_gcs_bucket_name)
1664 blob = bucket.get_blob(attachment.gcs_object_id)
1665 content = blob.download_as_bytes()
1666 attachments.append([attachment.filename, content, attachment.mimetype])
Copybara854996b2021-09-07 19:36:02 +00001667
1668 if attachments:
1669 new_issue.attachment_count = len(attachments)
1670
1671 # Create the same summary comment as the target issue.
1672 comment = self._MakeIssueComment(
1673 dest_project.project_id, copier_id, initial_summary_comment.content,
1674 attachments=attachments, timestamp=timestamp, is_description=True)
1675
1676 new_issue.local_id = self.AllocateNextLocalID(
1677 cnxn, dest_project.project_id)
1678 issue_id = self.InsertIssue(cnxn, new_issue)
1679 comment.issue_id = issue_id
1680 self.InsertComment(cnxn, comment)
1681
1682 if permissions.HasRestrictions(new_issue, 'view'):
1683 self._config_service.InvalidateMemcache(
1684 [new_issue], key_prefix='nonviewable:')
1685
1686 tracker_fulltext.IndexIssues(
1687 cnxn, [new_issue], user_service, self, self._config_service)
1688 created_issues.append(new_issue)
1689
1690 # The referenced issues are all modified when the relationship is added.
1691 self._UpdateIssuesModified(
1692 cnxn, iids_to_invalidate, modified_timestamp=timestamp)
1693
1694 return created_issues
1695
  def MoveIssues(self, cnxn, dest_project, issues, user_service):
    """Move the given issues into the destination project.

    Issues that formerly lived in the destination project reclaim their old
    local IDs there; all other issues are assigned fresh local IDs.

    Args:
      cnxn: connection to SQL database.
      dest_project: Project PB of the project receiving the issues.
      issues: list of Issue PBs to move.
      user_service: persistence layer for users, used when re-indexing.

    Returns:
      A set of issue_ids of issues that moved back to a project they had
      previously been in (and so reclaimed an old local ID).
    """
    # Remember where each issue came from before any fields are rewritten.
    old_location_rows = [
        (issue.issue_id, issue.project_id, issue.local_id)
        for issue in issues]
    moved_back_iids = set()

    # Look up any prior local IDs these issues had in the destination project.
    former_locations_in_project = self.issueformerlocations_tbl.Select(
        cnxn, cols=ISSUEFORMERLOCATIONS_COLS,
        project_id=dest_project.project_id,
        issue_id=[issue.issue_id for issue in issues])
    former_locations = {
        issue_id: local_id
        for issue_id, project_id, local_id in former_locations_in_project}

    # Remove the issue id from issue_id_2lc so that it does not stay
    # around in cache and memcache.
    # The Key of IssueIDTwoLevelCache is (project_id, local_id).
    self.issue_id_2lc.InvalidateKeys(
        cnxn, [(issue.project_id, issue.local_id) for issue in issues])
    self.InvalidateIssues(cnxn, issues)

    for issue in issues:
      if issue.issue_id in former_locations:
        # Returning issue: reuse the local ID it had here before.
        dest_id = former_locations[issue.issue_id]
        moved_back_iids.add(issue.issue_id)
      else:
        dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id)

      issue.local_id = dest_id
      issue.project_id = dest_project.project_id
      issue.project_name = dest_project.project_name

    # Rewrite each whole issue so that status and label IDs are looked up
    # in the context of the destination project.
    self.UpdateIssues(cnxn, issues)

    # Comments also have the project_id because it is needed for an index.
    self.comment_tbl.Update(
        cnxn, {'project_id': dest_project.project_id},
        issue_id=[issue.issue_id for issue in issues], commit=False)

    # Record old locations so that we can offer links if the user looks there.
    self.issueformerlocations_tbl.InsertRows(
        cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True,
        commit=False)
    cnxn.Commit()

    tracker_fulltext.IndexIssues(
        cnxn, issues, user_service, self, self._config_service)

    return moved_back_iids
1748
  def ExpungeFormerLocations(self, cnxn, project_id):
    """Delete history of issues that were in this project but moved out."""
    # After this, issues that once lived here can no longer reclaim their
    # old local IDs if they are moved back (see MoveIssues).
    self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id)
1752
1753 def ExpungeIssues(self, cnxn, issue_ids):
1754 """Completely delete the specified issues from the database."""
1755 logging.info('expunging the issues %r', issue_ids)
1756 tracker_fulltext.UnindexIssues(issue_ids)
1757
1758 remaining_iids = issue_ids[:]
1759
1760 # Note: these are purposely not done in a transaction to allow
1761 # incremental progress in what might be a very large change.
1762 # We are not concerned about non-atomic deletes because all
1763 # this data will be gone eventually anyway.
1764 while remaining_iids:
1765 iids_in_chunk = remaining_iids[:CHUNK_SIZE]
1766 remaining_iids = remaining_iids[CHUNK_SIZE:]
1767 self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1768 self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1769 self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1770 self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1771 self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1772 self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1773 self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1774 self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1775 self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1776 self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk)
1777 self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1778 self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1779 self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1780 self.issue_tbl.Delete(cnxn, id=iids_in_chunk)
1781
  def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service):
    """Set the deleted boolean on the indicated issue and store it.

    Args:
      cnxn: connection to SQL database.
      project_id: int project ID for the current project.
      local_id: int local ID of the issue to freeze/unfreeze.
      deleted: boolean, True to soft-delete, False to undelete.
      user_service: persistence layer for users, used to lookup user IDs.
    """
    # Load fresh from SQL (use_cache=False) so a stale copy is never written.
    issue = self.GetIssueByLocalID(cnxn, project_id, local_id, use_cache=False)
    issue.deleted = deleted
    # Only the 'deleted' column changed; limit the UPDATE accordingly.
    self.UpdateIssue(cnxn, issue, update_cols=['deleted'])
    # Reindex immediately so search results reflect the new deleted state.
    tracker_fulltext.IndexIssues(
        cnxn, [issue], user_service, self, self._config_service)
1797
  def DeleteComponentReferences(self, cnxn, component_id):
    """Delete any references to the specified component.

    Only the Issue2Component rows are removed here; affected issues are not
    rewritten or re-indexed by this call.
    """
    # TODO(jrobbins): add tasks to re-index any affected issues.
    # Note: if this call fails, some data could be left
    # behind, but it would not be displayed, and it could always be
    # GC'd from the DB later.
    self.issue2component_tbl.Delete(cnxn, component_id=component_id)
1805
1806 ### Local ID generation
1807
  def InitializeLocalID(self, cnxn, project_id):
    """Initialize the local ID counter for the specified project to zero.

    Args:
      cnxn: connection to SQL database.
      project_id: int ID of the project.
    """
    # Both counters start at zero; they grow as issue/spam IDs are allocated.
    self.localidcounter_tbl.InsertRow(
        cnxn, project_id=project_id, used_local_id=0, used_spam_id=0)
1817
1818 def SetUsedLocalID(self, cnxn, project_id):
1819 """Set the local ID counter based on existing issues.
1820
1821 Args:
1822 cnxn: connection to SQL database.
1823 project_id: int ID of the project.
1824 """
1825 highest_id = self.GetHighestLocalID(cnxn, project_id)
1826 self.localidcounter_tbl.InsertRow(
1827 cnxn, replace=True, used_local_id=highest_id, project_id=project_id)
1828 return highest_id
1829
1830 def AllocateNextLocalID(self, cnxn, project_id):
1831 """Return the next available issue ID in the specified project.
1832
1833 Args:
1834 cnxn: connection to SQL database.
1835 project_id: int ID of the project.
1836
1837 Returns:
1838 The next local ID.
1839 """
1840 try:
1841 next_local_id = self.localidcounter_tbl.IncrementCounterValue(
1842 cnxn, 'used_local_id', project_id=project_id)
1843 except AssertionError as e:
1844 logging.info('exception incrementing local_id counter: %s', e)
1845 next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1
1846 return next_local_id
1847
1848 def GetHighestLocalID(self, cnxn, project_id):
1849 """Return the highest used issue ID in the specified project.
1850
1851 Args:
1852 cnxn: connection to SQL database.
1853 project_id: int ID of the project.
1854
1855 Returns:
1856 The highest local ID for an active or moved issues.
1857 """
1858 highest = self.issue_tbl.SelectValue(
1859 cnxn, 'MAX(local_id)', project_id=project_id)
1860 highest = highest or 0 # It will be None if the project has no issues.
1861 highest_former = self.issueformerlocations_tbl.SelectValue(
1862 cnxn, 'MAX(local_id)', project_id=project_id)
1863 highest_former = highest_former or 0
1864 return max(highest, highest_former)
1865
1866 def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None):
1867 """Return the list of local IDs only, not the actual issues.
1868
1869 Args:
1870 cnxn: connection to SQL database.
1871 project_id: the ID of the project to which the issue belongs.
1872 min_local_id: point to start at.
1873
1874 Returns:
1875 A range object of local IDs from 1 to N, or from min_local_id to N. It
1876 may be the case that some of those local IDs are no longer used, e.g.,
1877 if some issues were moved out of this project.
1878 """
1879 if not min_local_id:
1880 min_local_id = 1
1881 highest_local_id = self.GetHighestLocalID(cnxn, project_id)
1882 return list(range(min_local_id, highest_local_id + 1))
1883
  def ExpungeLocalIDCounters(self, cnxn, project_id):
    """Delete history of local ids that were in this project."""
    # Removes the LocalIDCounter row; AllocateNextLocalID can rebuild it
    # later from the remaining issues via SetUsedLocalID.
    self.localidcounter_tbl.Delete(cnxn, project_id=project_id)
1887
1888 ### Comments
1889
1890 def _UnpackComment(
1891 self, comment_row, content_dict, inbound_message_dict, approval_dict,
1892 importer_dict):
1893 """Partially construct a Comment PB from a DB row."""
1894 (comment_id, issue_id, created, project_id, commenter_id,
1895 deleted_by, is_spam, is_description, commentcontent_id) = comment_row
1896 comment = tracker_pb2.IssueComment()
1897 comment.id = comment_id
1898 comment.issue_id = issue_id
1899 comment.timestamp = created
1900 comment.project_id = project_id
1901 comment.user_id = commenter_id
1902 comment.content = content_dict.get(commentcontent_id, '')
1903 comment.inbound_message = inbound_message_dict.get(commentcontent_id, '')
1904 comment.deleted_by = deleted_by or 0
1905 comment.is_spam = bool(is_spam)
1906 comment.is_description = bool(is_description)
1907 comment.approval_id = approval_dict.get(comment_id)
1908 comment.importer_id = importer_dict.get(comment_id)
1909 return comment
1910
1911 def _UnpackAmendment(self, amendment_row):
1912 """Construct an Amendment PB from a DB row."""
1913 (_id, _issue_id, comment_id, field_name,
1914 old_value, new_value, added_user_id, removed_user_id,
1915 custom_field_name) = amendment_row
1916 amendment = tracker_pb2.Amendment()
1917 field_enum = tracker_pb2.FieldID(field_name.upper())
1918 amendment.field = field_enum
1919
1920 # TODO(jrobbins): display old values in more cases.
1921 if new_value is not None:
1922 amendment.newvalue = new_value
1923 if old_value is not None:
1924 amendment.oldvalue = old_value
1925 if added_user_id:
1926 amendment.added_user_ids.append(added_user_id)
1927 if removed_user_id:
1928 amendment.removed_user_ids.append(removed_user_id)
1929 if custom_field_name:
1930 amendment.custom_field_name = custom_field_name
1931 return amendment, comment_id
1932
1933 def _ConsolidateAmendments(self, amendments):
1934 """Consoliodate amendments of the same field in one comment into one
1935 amendment PB."""
1936
1937 fields_dict = {}
1938 result = []
1939
1940 for amendment in amendments:
1941 key = amendment.field, amendment.custom_field_name
1942 fields_dict.setdefault(key, []).append(amendment)
1943 for (field, _custom_name), sorted_amendments in sorted(fields_dict.items()):
1944 new_amendment = tracker_pb2.Amendment()
1945 new_amendment.field = field
1946 for amendment in sorted_amendments:
1947 if amendment.newvalue is not None:
1948 if new_amendment.newvalue is not None:
1949 # NOTE: see crbug/monorail/8272. BLOCKEDON and BLOCKING changes
1950 # are all stored in newvalue e.g. (newvalue = -b/123 b/124) and
1951 # external bugs and monorail bugs are stored in separate amendments.
1952 # Without this, the values of external bug amendments and monorail
1953 # blocker bug amendments may overwrite each other.
1954 new_amendment.newvalue += (' ' + amendment.newvalue)
1955 else:
1956 new_amendment.newvalue = amendment.newvalue
1957 if amendment.oldvalue is not None:
1958 new_amendment.oldvalue = amendment.oldvalue
1959 if amendment.added_user_ids:
1960 new_amendment.added_user_ids.extend(amendment.added_user_ids)
1961 if amendment.removed_user_ids:
1962 new_amendment.removed_user_ids.extend(amendment.removed_user_ids)
1963 if amendment.custom_field_name:
1964 new_amendment.custom_field_name = amendment.custom_field_name
1965 result.append(new_amendment)
1966 return result
1967
1968 def _UnpackAttachment(self, attachment_row):
1969 """Construct an Attachment PB from a DB row."""
1970 (attachment_id, _issue_id, comment_id, filename, filesize, mimetype,
1971 deleted, gcs_object_id) = attachment_row
1972 attach = tracker_pb2.Attachment()
1973 attach.attachment_id = attachment_id
1974 attach.filename = filename
1975 attach.filesize = filesize
1976 attach.mimetype = mimetype
1977 attach.deleted = bool(deleted)
1978 attach.gcs_object_id = gcs_object_id
1979 return attach, comment_id
1980
1981 def _DeserializeComments(
1982 self, comment_rows, commentcontent_rows, amendment_rows, attachment_rows,
1983 approval_rows, importer_rows):
1984 """Turn rows into IssueComment PBs."""
1985 results = [] # keep objects in the same order as the rows
1986 results_dict = {} # for fast access when joining.
1987
1988 content_dict = dict(
1989 (commentcontent_id, content) for
1990 commentcontent_id, content, _ in commentcontent_rows)
1991 inbound_message_dict = dict(
1992 (commentcontent_id, inbound_message) for
1993 commentcontent_id, _, inbound_message in commentcontent_rows)
1994 approval_dict = dict(
1995 (comment_id, approval_id) for approval_id, comment_id in
1996 approval_rows)
1997 importer_dict = dict(importer_rows)
1998
1999 for comment_row in comment_rows:
2000 comment = self._UnpackComment(
2001 comment_row, content_dict, inbound_message_dict, approval_dict,
2002 importer_dict)
2003 results.append(comment)
2004 results_dict[comment.id] = comment
2005
2006 for amendment_row in amendment_rows:
2007 amendment, comment_id = self._UnpackAmendment(amendment_row)
2008 try:
2009 results_dict[comment_id].amendments.extend([amendment])
2010 except KeyError:
2011 logging.error('Found amendment for missing comment: %r', comment_id)
2012
2013 for attachment_row in attachment_rows:
2014 attach, comment_id = self._UnpackAttachment(attachment_row)
2015 try:
2016 results_dict[comment_id].attachments.append(attach)
2017 except KeyError:
2018 logging.error('Found attachment for missing comment: %r', comment_id)
2019
2020 for c in results:
2021 c.amendments = self._ConsolidateAmendments(c.amendments)
2022
2023 return results
2024
2025 # TODO(jrobbins): make this a private method and expose just the interface
2026 # needed by activities.py.
2027 def GetComments(
2028 self, cnxn, where=None, order_by=None, content_only=False, **kwargs):
2029 """Retrieve comments from SQL."""
2030 shard_id = sql.RandomShardID()
2031 order_by = order_by or [('created', [])]
2032 comment_rows = self.comment_tbl.Select(
2033 cnxn, cols=COMMENT_COLS, where=where,
2034 order_by=order_by, shard_id=shard_id, **kwargs)
2035 cids = [row[0] for row in comment_rows]
2036 commentcontent_ids = [row[-1] for row in comment_rows]
2037 content_rows = self.commentcontent_tbl.Select(
2038 cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids,
2039 shard_id=shard_id)
2040 approval_rows = self.issueapproval2comment_tbl.Select(
2041 cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids)
2042 amendment_rows = []
2043 attachment_rows = []
2044 importer_rows = []
2045 if not content_only:
2046 amendment_rows = self.issueupdate_tbl.Select(
2047 cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id)
2048 attachment_rows = self.attachment_tbl.Select(
2049 cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id)
2050 importer_rows = self.commentimporter_tbl.Select(
2051 cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id)
2052
2053 comments = self._DeserializeComments(
2054 comment_rows, content_rows, amendment_rows, attachment_rows,
2055 approval_rows, importer_rows)
2056 return comments
2057
2058 def GetComment(self, cnxn, comment_id):
2059 """Get the requested comment, or raise an exception."""
2060 comments = self.GetComments(cnxn, id=comment_id)
2061 try:
2062 return comments[0]
2063 except IndexError:
2064 raise exceptions.NoSuchCommentException()
2065
2066 def GetCommentsForIssue(self, cnxn, issue_id):
2067 """Return all IssueComment PBs for the specified issue.
2068
2069 Args:
2070 cnxn: connection to SQL database.
2071 issue_id: int global ID of the issue.
2072
2073 Returns:
2074 A list of the IssueComment protocol buffers for the description
2075 and comments on this issue.
2076 """
2077 comments = self.GetComments(cnxn, issue_id=[issue_id])
2078 for i, comment in enumerate(comments):
2079 comment.sequence = i
2080
2081 return comments
2082
2083
2084 def GetCommentsByID(self, cnxn, comment_ids, sequences, use_cache=True,
2085 shard_id=None):
2086 """Return all IssueComment PBs by comment ids.
2087
2088 Args:
2089 cnxn: connection to SQL database.
2090 comment_ids: a list of comment ids.
2091 sequences: sequence of the comments.
2092 use_cache: optional boolean to enable the cache.
2093 shard_id: optional int shard_id to limit retrieval.
2094
2095 Returns:
2096 A list of the IssueComment protocol buffers for comment_ids.
2097 """
2098 # Try loading issue comments from a random shard to reduce load on
2099 # primary DB.
2100 if shard_id is None:
2101 shard_id = sql.RandomShardID()
2102
2103 comment_dict, _missed_comments = self.comment_2lc.GetAll(cnxn, comment_ids,
2104 use_cache=use_cache, shard_id=shard_id)
2105
2106 comments = sorted(list(comment_dict.values()), key=lambda x: x.timestamp)
2107
2108 for i in range(len(comment_ids)):
2109 comments[i].sequence = sequences[i]
2110
2111 return comments
2112
2113 # TODO(jrobbins): remove this method because it is too slow when an issue
2114 # has a huge number of comments.
2115 def GetCommentsForIssues(self, cnxn, issue_ids, content_only=False):
2116 """Return all IssueComment PBs for each issue ID in the given list.
2117
2118 Args:
2119 cnxn: connection to SQL database.
2120 issue_ids: list of integer global issue IDs.
2121 content_only: optional boolean, set true for faster loading of
2122 comment content without attachments and amendments.
2123
2124 Returns:
2125 Dict {issue_id: [IssueComment, ...]} with IssueComment protocol
2126 buffers for the description and comments on each issue.
2127 """
2128 comments = self.GetComments(
2129 cnxn, issue_id=issue_ids, content_only=content_only)
2130
2131 comments_dict = collections.defaultdict(list)
2132 for comment in comments:
2133 comment.sequence = len(comments_dict[comment.issue_id])
2134 comments_dict[comment.issue_id].append(comment)
2135
2136 return comments_dict
2137
  def InsertComment(self, cnxn, comment, commit=True):
    """Store the given issue comment in SQL.

    Args:
      cnxn: connection to SQL database.
      comment: IssueComment PB to insert into the database.
      commit: set to False to avoid doing the commit for now.
    """
    # Comment text lives in a separate CommentContent row; the Comment row
    # keeps only a foreign key to it.
    commentcontent_id = self.commentcontent_tbl.InsertRow(
        cnxn, content=comment.content,
        inbound_message=comment.inbound_message, commit=False)
    comment_id = self.comment_tbl.InsertRow(
        cnxn, issue_id=comment.issue_id, created=comment.timestamp,
        project_id=comment.project_id,
        commenter_id=comment.user_id,
        deleted_by=comment.deleted_by or None,
        is_spam=comment.is_spam, is_description=comment.is_description,
        commentcontent_id=commentcontent_id,
        commit=False)
    comment.id = comment_id
    if comment.importer_id:
      # NOTE(review): no commit=False here, unlike the other inserts — this
      # row presumably commits immediately; confirm that is intended.
      self.commentimporter_tbl.InsertRow(
          cnxn, comment_id=comment_id, importer_id=comment.importer_id)

    # Flatten each Amendment PB into IssueUpdate rows: one row for a plain
    # value change, plus one row per added and per removed user.
    amendment_rows = []
    for amendment in comment.amendments:
      field_enum = str(amendment.field).lower()
      if (amendment.get_assigned_value('newvalue') is not None and
          not amendment.added_user_ids and not amendment.removed_user_ids):
        amendment_rows.append((
            comment.issue_id, comment_id, field_enum,
            amendment.oldvalue, amendment.newvalue,
            None, None, amendment.custom_field_name))
      for added_user_id in amendment.added_user_ids:
        amendment_rows.append((
            comment.issue_id, comment_id, field_enum, None, None,
            added_user_id, None, amendment.custom_field_name))
      for removed_user_id in amendment.removed_user_ids:
        amendment_rows.append((
            comment.issue_id, comment_id, field_enum, None, None,
            None, removed_user_id, amendment.custom_field_name))
    # ISSUEUPDATE_COLS[1:] to skip id column.
    self.issueupdate_tbl.InsertRows(
        cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=False)

    # Attachment metadata rows; the attachment bodies themselves were
    # already stored in GCS by _MakeIssueComment.
    attachment_rows = []
    for attach in comment.attachments:
      attachment_rows.append([
          comment.issue_id, comment.id, attach.filename, attach.filesize,
          attach.mimetype, attach.deleted, attach.gcs_object_id])
    self.attachment_tbl.InsertRows(
        cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=False)

    if comment.approval_id:
      self.issueapproval2comment_tbl.InsertRows(
          cnxn, ISSUEAPPROVAL2COMMENT_COLS,
          [(comment.approval_id, comment_id)], commit=False)

    if commit:
      cnxn.Commit()
2198
2199 def _UpdateComment(self, cnxn, comment, update_cols=None):
2200 """Update the given issue comment in SQL.
2201
2202 Args:
2203 cnxn: connection to SQL database.
2204 comment: IssueComment PB to update in the database.
2205 update_cols: optional list of just the field names to update.
2206 """
2207 delta = {
2208 'commenter_id': comment.user_id,
2209 'deleted_by': comment.deleted_by or None,
2210 'is_spam': comment.is_spam,
2211 }
2212 if update_cols is not None:
2213 delta = {key: val for key, val in delta.items()
2214 if key in update_cols}
2215
2216 self.comment_tbl.Update(cnxn, delta, id=comment.id)
2217 self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
2218
  def _MakeIssueComment(
      self, project_id, user_id, content, inbound_message=None,
      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
      is_spam=False, is_description=False, approval_id=None, importer_id=None):
    """Create an IssueComment protocol buffer in RAM.

    Args:
      project_id: Project with the issue.
      user_id: the user ID of the user who entered the comment.
      content: string body of the comment.
      inbound_message: optional string full text of an email that
          caused this comment to be added.
      amendments: list of Amendment PBs describing the
          metadata changes that the user made along w/ comment.
      attachments: [(filename, contents, mimetype),...] attachments uploaded at
          the time the comment was made.
      kept_attachments: list of Attachment PBs for attachments kept from
          previous descriptions, if the comment is a description
      timestamp: time at which the comment was made, defaults to now.
      is_spam: True if the comment was classified as spam.
      is_description: True if the comment is a description for the issue.
      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
          belongs to.
      importer_id: optional User ID of script that imported the comment on
          behalf of a user.

    Returns:
      The new IssueComment protocol buffer.

    The content may have some markup done during input processing.

    Any attachments are immediately stored.
    """
    comment = tracker_pb2.IssueComment()
    comment.project_id = project_id
    comment.user_id = user_id
    comment.content = content or ''
    comment.is_spam = is_spam
    comment.is_description = is_description
    if not timestamp:
      timestamp = int(time.time())
    comment.timestamp = int(timestamp)
    if inbound_message:
      comment.inbound_message = inbound_message
    if amendments:
      logging.info('amendments is %r', amendments)
      comment.amendments.extend(amendments)
    if approval_id:
      comment.approval_id = approval_id

    if attachments:
      for filename, body, mimetype in attachments:
        # Upload each attachment body to GCS now; only the resulting object
        # id is kept on the Attachment PB.
        gcs_object_id = gcs_helpers.StoreObjectInGCS(
            body, mimetype, project_id, filename=filename)
        attach = tracker_pb2.Attachment()
        # attachment id is determined later by the SQL DB.
        attach.filename = filename
        attach.filesize = len(body)
        attach.mimetype = mimetype
        attach.gcs_object_id = gcs_object_id
        comment.attachments.extend([attach])
        logging.info("Save attachment with object_id: %s" % gcs_object_id)

    if kept_attachments:
      for kept_attach in kept_attachments:
        # kept_attach is an ATTACHMENT_COLS row; [3:] drops the leading
        # (id, issue_id, comment_id) columns (see _UnpackAttachment).
        (filename, filesize, mimetype, deleted,
         gcs_object_id) = kept_attach[3:]
        new_attach = tracker_pb2.Attachment(
            filename=filename, filesize=filesize, mimetype=mimetype,
            deleted=bool(deleted), gcs_object_id=gcs_object_id)
        comment.attachments.append(new_attach)
        logging.info("Copy attachment with object_id: %s" % gcs_object_id)

    if importer_id:
      comment.importer_id = importer_id

    return comment
2296
  def CreateIssueComment(
      self, cnxn, issue, user_id, content, inbound_message=None,
      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
      is_spam=False, is_description=False, approval_id=None, commit=True,
      importer_id=None):
    """Create and store a new comment on the specified issue.

    Args:
      cnxn: connection to SQL database.
      issue: the issue on which to add the comment, must be loaded from
          database with use_cache=False so that assume_stale == False.
      user_id: the user ID of the user who entered the comment.
      content: string body of the comment.
      inbound_message: optional string full text of an email that caused
          this comment to be added.
      amendments: list of Amendment PBs describing the
          metadata changes that the user made along w/ comment.
      attachments: [(filename, contents, mimetype),...] attachments uploaded at
          the time the comment was made.
      kept_attachments: list of attachment ids for attachments kept from
          previous descriptions, if the comment is an update to the description
      timestamp: time at which the comment was made, defaults to now.
      is_spam: True if the comment is classified as spam.
      is_description: True if the comment is a description for the issue.
      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
          belongs to.
      commit: set to False to not commit to DB yet.
      importer_id: user ID of an API client that is importing issues.

    Returns:
      The new IssueComment protocol buffer.

    Note that we assume that the content is safe to echo out
    again. The content may have some markup done during input
    processing.
    """
    if is_description:
      # Only a description can keep attachments from earlier descriptions;
      # resolve the kept attachment ids into attachment rows.
      kept_attachments = self.GetAttachmentsByID(cnxn, kept_attachments)
    else:
      kept_attachments = []

    comment = self._MakeIssueComment(
        issue.project_id, user_id, content, amendments=amendments,
        inbound_message=inbound_message, attachments=attachments,
        timestamp=timestamp, is_spam=is_spam, is_description=is_description,
        kept_attachments=kept_attachments, approval_id=approval_id,
        importer_id=importer_id)
    comment.issue_id = issue.issue_id

    if attachments or kept_attachments:
      # Keep the issue's denormalized attachment total in sync.
      issue.attachment_count = (
          issue.attachment_count + len(attachments) + len(kept_attachments))
      self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])

    # Monitoring counter for comment creations.
    self.comment_creations.increment()
    self.InsertComment(cnxn, comment, commit=commit)

    return comment
2355
2356 def SoftDeleteComment(
2357 self, cnxn, issue, issue_comment, deleted_by_user_id,
2358 user_service, delete=True, reindex=False, is_spam=False):
2359 """Mark comment as un/deleted, which shows/hides it from average users."""
2360 # Update number of attachments
2361 attachments = 0
2362 if issue_comment.attachments:
2363 for attachment in issue_comment.attachments:
2364 if not attachment.deleted:
2365 attachments += 1
2366
2367 # Delete only if it's not in deleted state
2368 if delete:
2369 if not issue_comment.deleted_by:
2370 issue_comment.deleted_by = deleted_by_user_id
2371 issue.attachment_count = issue.attachment_count - attachments
2372
2373 # Undelete only if it's in deleted state
2374 elif issue_comment.deleted_by:
2375 issue_comment.deleted_by = 0
2376 issue.attachment_count = issue.attachment_count + attachments
2377
2378 issue_comment.is_spam = is_spam
2379 self._UpdateComment(
2380 cnxn, issue_comment, update_cols=['deleted_by', 'is_spam'])
2381 self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
2382
2383 # Reindex the issue to take the comment deletion/undeletion into account.
2384 if reindex:
2385 tracker_fulltext.IndexIssues(
2386 cnxn, [issue], user_service, self, self._config_service)
2387 else:
2388 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
2389
2390 ### Approvals
2391
2392 def GetIssueApproval(self, cnxn, issue_id, approval_id, use_cache=True):
2393 """Retrieve the specified approval for the specified issue."""
2394 issue = self.GetIssue(cnxn, issue_id, use_cache=use_cache)
2395 approval = tracker_bizobj.FindApprovalValueByID(
2396 approval_id, issue.approval_values)
2397 if approval:
2398 return issue, approval
2399 raise exceptions.NoSuchIssueApprovalException()
2400
  def DeltaUpdateIssueApproval(
      self, cnxn, modifier_id, config, issue, approval, approval_delta,
      comment_content=None, is_description=False, attachments=None,
      commit=True, kept_attachments=None):
    """Update the issue's approval in the database.

    Applies each part of approval_delta (status, approvers, subfield values,
    labels) to the approval/issue in RAM and writes the change to the DB with
    deferred commits, then records everything in a single new comment.

    Args:
      cnxn: connection to SQL database.
      modifier_id: user ID of the user making this change.
      config: ProjectIssueConfig PB for the issue's project.
      issue: Issue PB that owns the approval; mutated in RAM.
      approval: ApprovalValue PB to update; mutated in RAM.
      approval_delta: ApprovalDelta PB describing the changes to apply.
      comment_content: optional string body for the change comment.
      is_description: True if the comment is a new approval description.
      attachments: optional [(filename, contents, mimetype),...] uploads
        attached to the change comment.
      commit: set to False to skip the DB commit and leave it to the caller.
      kept_attachments: optional list of attachment IDs kept from a previous
        description, when the comment is a description update.

    Returns:
      The new IssueComment protocol buffer recording the change.
    """
    amendments = []

    # Update status in RAM and DB and create status amendment.
    if approval_delta.status:
      approval.status = approval_delta.status
      approval.set_on = approval_delta.set_on or int(time.time())
      approval.setter_id = modifier_id
      status_amendment = tracker_bizobj.MakeApprovalStatusAmendment(
          approval_delta.status)
      amendments.append(status_amendment)

      self._UpdateIssueApprovalStatus(
          cnxn, issue.issue_id, approval.approval_id, approval.status,
          approval.setter_id, approval.set_on)

    # Update approver_ids in RAM and DB and create approver amendment.
    # Filter the delta down to additions/removals that actually change
    # the current approver set, so no-op deltas produce no amendment.
    approvers_add = [approver for approver in approval_delta.approver_ids_add
                     if approver not in approval.approver_ids]
    approvers_remove = [approver for approver in
                        approval_delta.approver_ids_remove
                        if approver in approval.approver_ids]
    if approvers_add or approvers_remove:
      approver_ids = [approver for approver in
                      list(approval.approver_ids) + approvers_add
                      if approver not in approvers_remove]
      approval.approver_ids = approver_ids
      approvers_amendment = tracker_bizobj.MakeApprovalApproversAmendment(
          approvers_add, approvers_remove)
      amendments.append(approvers_amendment)

      self._UpdateIssueApprovalApprovers(
          cnxn, issue.issue_id, approval.approval_id, approver_ids)

    # Apply changes to the approval's subfield values and the issue's labels,
    # persisting each only when something actually changed.
    fv_amendments = tracker_bizobj.ApplyFieldValueChanges(
        issue, config, approval_delta.subfield_vals_add,
        approval_delta.subfield_vals_remove, approval_delta.subfields_clear)
    amendments.extend(fv_amendments)
    if fv_amendments:
      self._UpdateIssuesFields(cnxn, [issue], commit=False)

    label_amendment = tracker_bizobj.ApplyLabelChanges(
        issue, config, approval_delta.labels_add, approval_delta.labels_remove)
    if label_amendment:
      amendments.append(label_amendment)
      self._UpdateIssuesLabels(cnxn, [issue], commit=False)

    # Record all amendments in one comment; its insert is also deferred so
    # the whole delta commits atomically below (or in the caller).
    comment_pb = self.CreateIssueComment(
        cnxn, issue, modifier_id, comment_content, amendments=amendments,
        approval_id=approval.approval_id, is_description=is_description,
        attachments=attachments, commit=False,
        kept_attachments=kept_attachments)

    if commit:
      cnxn.Commit()
    self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id])

    return comment_pb
2463
2464 def _UpdateIssueApprovalStatus(
2465 self, cnxn, issue_id, approval_id, status, setter_id, set_on):
2466 """Update the approvalvalue for the given issue_id's issue."""
2467 set_on = set_on or int(time.time())
2468 delta = {
2469 'status': status.name.lower(),
2470 'setter_id': setter_id,
2471 'set_on': set_on,
2472 }
2473 self.issue2approvalvalue_tbl.Update(
2474 cnxn, delta, approval_id=approval_id, issue_id=issue_id,
2475 commit=False)
2476
2477 def _UpdateIssueApprovalApprovers(
2478 self, cnxn, issue_id, approval_id, approver_ids):
2479 """Update the list of approvers allowed to approve an issue's approval."""
2480 self.issueapproval2approver_tbl.Delete(
2481 cnxn, issue_id=issue_id, approval_id=approval_id, commit=False)
2482 self.issueapproval2approver_tbl.InsertRows(
2483 cnxn, ISSUEAPPROVAL2APPROVER_COLS, [(approval_id, approver_id, issue_id)
2484 for approver_id in approver_ids],
2485 commit=False)
2486
2487 ### Attachments
2488
  def GetAttachmentAndContext(self, cnxn, attachment_id):
    """Load a IssueAttachment from database, and its comment ID and IID.

    Args:
      cnxn: connection to SQL database.
      attachment_id: long integer unique ID of desired issue attachment.

    Returns:
      An Attachment protocol buffer that contains metadata about the attached
      file, along with the comment ID and issue IID of the comment and issue
      that contain this attachment.

    Raises:
      NoSuchAttachmentException: attachment_id was None, or no matching
        attachment row was found, or the attachment is marked deleted.
    """
    if attachment_id is None:
      raise exceptions.NoSuchAttachmentException()

    attachment_row = self.attachment_tbl.SelectRow(
        cnxn, cols=ATTACHMENT_COLS, id=attachment_id)
    if attachment_row:
      (attach_id, issue_id, comment_id, filename, filesize, mimetype,
       deleted, gcs_object_id) = attachment_row
      # A soft-deleted attachment is treated the same as a missing one.
      if not deleted:
        attachment = tracker_pb2.Attachment(
            attachment_id=attach_id, filename=filename, filesize=filesize,
            mimetype=mimetype, deleted=bool(deleted),
            gcs_object_id=gcs_object_id)
        return attachment, comment_id, issue_id

    raise exceptions.NoSuchAttachmentException()
2520
  def GetAttachmentsByID(self, cnxn, attachment_ids):
    """Return attachment DB rows for the given attachment ids.

    Args:
      cnxn: connection to SQL database.
      attachment_ids: a list of attachment ids.

    Returns:
      A list of DB rows (tuples with values ordered per ATTACHMENT_COLS),
      one for each attachment found with these ids.  Note: these are raw
      rows, not Attachment protocol buffers.
    """
    attachment_rows = self.attachment_tbl.Select(
        cnxn, cols=ATTACHMENT_COLS, id=attachment_ids)

    return attachment_rows
2536
2537 def _UpdateAttachment(self, cnxn, comment, attach, update_cols=None):
2538 """Update attachment metadata in the DB.
2539
2540 Args:
2541 cnxn: connection to SQL database.
2542 comment: IssueComment PB to invalidate in the cache.
2543 attach: IssueAttachment PB to update in the DB.
2544 update_cols: optional list of just the field names to update.
2545 """
2546 delta = {
2547 'filename': attach.filename,
2548 'filesize': attach.filesize,
2549 'mimetype': attach.mimetype,
2550 'deleted': bool(attach.deleted),
2551 }
2552 if update_cols is not None:
2553 delta = {key: val for key, val in delta.items()
2554 if key in update_cols}
2555
2556 self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id)
2557 self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
2558
2559 def SoftDeleteAttachment(
2560 self, cnxn, issue, issue_comment, attach_id, user_service, delete=True,
2561 index_now=False):
2562 """Mark attachment as un/deleted, which shows/hides it from avg users."""
2563 attachment = None
2564 for attach in issue_comment.attachments:
2565 if attach.attachment_id == attach_id:
2566 attachment = attach
2567
2568 if not attachment:
2569 logging.warning(
2570 'Tried to (un)delete non-existent attachment #%s in project '
2571 '%s issue %s', attach_id, issue.project_id, issue.local_id)
2572 return
2573
2574 if not issue_comment.deleted_by:
2575 # Decrement attachment count only if it's not in deleted state
2576 if delete:
2577 if not attachment.deleted:
2578 issue.attachment_count = issue.attachment_count - 1
2579
2580 # Increment attachment count only if it's in deleted state
2581 elif attachment.deleted:
2582 issue.attachment_count = issue.attachment_count + 1
2583
2584 logging.info('attachment.deleted was %s', attachment.deleted)
2585
2586 attachment.deleted = delete
2587
2588 logging.info('attachment.deleted is %s', attachment.deleted)
2589
2590 self._UpdateAttachment(
2591 cnxn, issue_comment, attachment, update_cols=['deleted'])
2592 self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
2593
2594 if index_now:
2595 tracker_fulltext.IndexIssues(
2596 cnxn, [issue], user_service, self, self._config_service)
2597 else:
2598 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
2599
2600 ### Reindex queue
2601
2602 def EnqueueIssuesForIndexing(self, cnxn, issue_ids, commit=True):
2603 # type: (MonorailConnection, Collection[int], Optional[bool]) -> None
2604 """Add the given issue IDs to the ReindexQueue table."""
2605 reindex_rows = [(issue_id,) for issue_id in issue_ids]
2606 self.reindexqueue_tbl.InsertRows(
2607 cnxn, ['issue_id'], reindex_rows, ignore=True, commit=commit)
2608
2609 def ReindexIssues(self, cnxn, num_to_reindex, user_service):
2610 """Reindex some issues specified in the IndexQueue table."""
2611 rows = self.reindexqueue_tbl.Select(
2612 cnxn, order_by=[('created', [])], limit=num_to_reindex)
2613 issue_ids = [row[0] for row in rows]
2614
2615 if issue_ids:
2616 issues = self.GetIssues(cnxn, issue_ids)
2617 tracker_fulltext.IndexIssues(
2618 cnxn, issues, user_service, self, self._config_service)
2619 self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids)
2620
2621 return len(issue_ids)
2622
2623 ### Search functions
2624
2625 def RunIssueQuery(
2626 self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
2627 """Run a SQL query to find matching issue IDs.
2628
2629 Args:
2630 cnxn: connection to SQL database.
2631 left_joins: list of SQL LEFT JOIN clauses.
2632 where: list of SQL WHERE clauses.
2633 order_by: list of SQL ORDER BY clauses.
2634 shard_id: int shard ID to focus the search.
2635 limit: int maximum number of results, defaults to
2636 settings.search_limit_per_shard.
2637
2638 Returns:
2639 (issue_ids, capped) where issue_ids is a list of the result issue IDs,
2640 and capped is True if the number of results reached the limit.
2641 """
2642 limit = limit or settings.search_limit_per_shard
2643 where = where + [('Issue.deleted = %s', [False])]
2644 rows = self.issue_tbl.Select(
2645 cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'],
2646 left_joins=left_joins, where=where, order_by=order_by,
2647 limit=limit)
2648 issue_ids = [row[0] for row in rows]
2649 capped = len(issue_ids) >= limit
2650 return issue_ids, capped
2651
2652 def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
2653 """Return a list of IIDs for issues with any of the given label IDs."""
2654 if not label_ids:
2655 return []
2656 where = []
2657 if shard_id is not None:
2658 slice_term = ('shard = %s', [shard_id])
2659 where.append(slice_term)
2660
2661 rows = self.issue_tbl.Select(
2662 cnxn, shard_id=shard_id, cols=['id'],
2663 left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])],
2664 label_id=label_ids, project_id=project_id, where=where)
2665 return [row[0] for row in rows]
2666
2667 def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
2668 """Return IIDs for issues where any of the given users participate."""
2669 iids = []
2670 where = []
2671 if shard_id is not None:
2672 where.append(('shard = %s', [shard_id]))
2673 if project_ids:
2674 cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
2675 where.append((cond_str, project_ids))
2676
2677 # TODO(jrobbins): Combine these 3 queries into one with ORs. It currently
2678 # is not the bottleneck.
2679 rows = self.issue_tbl.Select(
2680 cnxn, cols=['id'], reporter_id=user_ids,
2681 where=where, shard_id=shard_id)
2682 for row in rows:
2683 iids.append(row[0])
2684
2685 rows = self.issue_tbl.Select(
2686 cnxn, cols=['id'], owner_id=user_ids,
2687 where=where, shard_id=shard_id)
2688 for row in rows:
2689 iids.append(row[0])
2690
2691 rows = self.issue_tbl.Select(
2692 cnxn, cols=['id'], derived_owner_id=user_ids,
2693 where=where, shard_id=shard_id)
2694 for row in rows:
2695 iids.append(row[0])
2696
2697 rows = self.issue_tbl.Select(
2698 cnxn, cols=['id'],
2699 left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])],
2700 cc_id=user_ids,
2701 where=where + [('cc_id IS NOT NULL', [])],
2702 shard_id=shard_id)
2703 for row in rows:
2704 iids.append(row[0])
2705
2706 rows = self.issue_tbl.Select(
2707 cnxn, cols=['Issue.id'],
2708 left_joins=[
2709 ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []),
2710 ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])],
2711 user_id=user_ids, grants_perm='View',
2712 where=where + [('user_id IS NOT NULL', [])],
2713 shard_id=shard_id)
2714 for row in rows:
2715 iids.append(row[0])
2716
2717 return iids
2718
2719 ### Issue Dependency Rankings
2720
2721 def SortBlockedOn(self, cnxn, issue, blocked_on_iids):
2722 """Sort blocked_on dependencies by rank and dst_issue_id.
2723
2724 Args:
2725 cnxn: connection to SQL database.
2726 issue: the issue being blocked.
2727 blocked_on_iids: the iids of all the issue's blockers
2728
2729 Returns:
2730 a tuple (ids, ranks), where ids is the sorted list of
2731 blocked_on_iids and ranks is the list of corresponding ranks
2732 """
2733 rows = self.issuerelation_tbl.Select(
2734 cnxn, cols=ISSUERELATION_COLS, issue_id=issue.issue_id,
2735 dst_issue_id=blocked_on_iids, kind='blockedon',
2736 order_by=[('rank DESC', []), ('dst_issue_id', [])])
2737 ids = [row[1] for row in rows]
2738 ids.extend([iid for iid in blocked_on_iids if iid not in ids])
2739 ranks = [row[3] for row in rows]
2740 ranks.extend([0] * (len(blocked_on_iids) - len(ranks)))
2741 return ids, ranks
2742
2743 def ApplyIssueRerank(
2744 self, cnxn, parent_id, relations_to_change, commit=True, invalidate=True):
2745 """Updates rankings of blocked on issue relations to new values
2746
2747 Args:
2748 cnxn: connection to SQL database.
2749 parent_id: the global ID of the blocked issue to update
2750 relations_to_change: This should be a list of
2751 [(blocker_id, new_rank),...] of relations that need to be changed
2752 commit: set to False to skip the DB commit and do it in the caller.
2753 invalidate: set to False to leave cache invalidatation to the caller.
2754 """
2755 blocker_ids = [blocker for (blocker, rank) in relations_to_change]
2756 self.issuerelation_tbl.Delete(
2757 cnxn, issue_id=parent_id, dst_issue_id=blocker_ids, commit=False)
2758 insert_rows = [(parent_id, blocker, 'blockedon', rank)
2759 for (blocker, rank) in relations_to_change]
2760 self.issuerelation_tbl.InsertRows(
2761 cnxn, cols=ISSUERELATION_COLS, row_values=insert_rows, commit=commit)
2762 if invalidate:
2763 self.InvalidateIIDs(cnxn, [parent_id])
2764
2765 # Expunge Users from Issues system.
  def ExpungeUsersInIssues(self, cnxn, user_ids_by_email, limit=None):
    """Removes all references to given users from issue DB tables.

    This method will not commit the operations. This method will
    not make changes to in-memory data.

    Args:
      cnxn: connection to SQL database.
      user_ids_by_email: dict of {email: user_id} of all users we want
        to expunge.
      limit: Optional, the limit for each operation.

    Returns:
      A list of issue_ids that need to be reindexed.
    """
    # Every DB operation below passes commit=False; the caller commits.
    commit = False
    user_ids = list(user_ids_by_email.values())
    user_emails = list(user_ids_by_email.keys())
    # Track issue_ids for issues that will have different search documents
    # as a result of removing users.
    affected_issue_ids = []

    # Reassign commenter_id and delete inbound_messages.
    shard_id = sql.RandomShardID()
    comment_content_id_rows = self.comment_tbl.Select(
        cnxn, cols=['Comment.id', 'Comment.issue_id', 'commentcontent_id'],
        commenter_id=user_ids, shard_id=shard_id, limit=limit)
    comment_ids = [row[0] for row in comment_content_id_rows]
    commentcontent_ids = [row[2] for row in comment_content_id_rows]
    if commentcontent_ids:
      self.commentcontent_tbl.Update(
          cnxn, {'inbound_message': None}, id=commentcontent_ids, commit=commit)
    if comment_ids:
      self.comment_tbl.Update(
          cnxn, {'commenter_id': framework_constants.DELETED_USER_ID},
          id=comment_ids,
          commit=commit)
    affected_issue_ids.extend([row[1] for row in comment_content_id_rows])

    # Reassign deleted_by comments deleted_by.
    self.comment_tbl.Update(
        cnxn,
        {'deleted_by': framework_constants.DELETED_USER_ID},
        deleted_by=user_ids,
        commit=commit, limit=limit)

    # Remove users in field values.
    fv_issue_id_rows = self.issue2fieldvalue_tbl.Select(
        cnxn, cols=['issue_id'], user_id=user_ids, limit=limit)
    fv_issue_ids = [row[0] for row in fv_issue_id_rows]
    self.issue2fieldvalue_tbl.Delete(
        cnxn, user_id=user_ids, limit=limit, commit=commit)
    affected_issue_ids.extend(fv_issue_ids)

    # Remove users in approval values.
    self.issueapproval2approver_tbl.Delete(
        cnxn, approver_id=user_ids, commit=commit, limit=limit)
    self.issue2approvalvalue_tbl.Update(
        cnxn,
        {'setter_id': framework_constants.DELETED_USER_ID},
        setter_id=user_ids,
        commit=commit, limit=limit)

    # Remove users in issue Ccs.
    cc_issue_id_rows = self.issue2cc_tbl.Select(
        cnxn, cols=['issue_id'], cc_id=user_ids, limit=limit)
    cc_issue_ids = [row[0] for row in cc_issue_id_rows]
    self.issue2cc_tbl.Delete(
        cnxn, cc_id=user_ids, limit=limit, commit=commit)
    affected_issue_ids.extend(cc_issue_ids)

    # Remove users in issue owners.
    owner_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], owner_id=user_ids, limit=limit)
    owner_issue_ids = [row[0] for row in owner_issue_id_rows]
    if owner_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'owner_id': None}, id=owner_issue_ids, commit=commit)
      affected_issue_ids.extend(owner_issue_ids)
    derived_owner_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], derived_owner_id=user_ids, limit=limit)
    derived_owner_issue_ids = [row[0] for row in derived_owner_issue_id_rows]
    if derived_owner_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'derived_owner_id': None},
          id=derived_owner_issue_ids,
          commit=commit)
      affected_issue_ids.extend(derived_owner_issue_ids)

    # Remove users in issue reporters.
    reporter_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], reporter_id=user_ids, limit=limit)
    reporter_issue_ids = [row[0] for row in reporter_issue_id_rows]
    if reporter_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'reporter_id': framework_constants.DELETED_USER_ID},
          id=reporter_issue_ids,
          commit=commit)
      affected_issue_ids.extend(reporter_issue_ids)

    # Note: issueupdate_tbl's and issue2notify's user_id columns do not
    # reference the User table. So all values need to updated here before
    # User rows can be deleted safely. No limit will be applied.

    # Remove users in issue updates.
    self.issueupdate_tbl.Update(
        cnxn,
        {'added_user_id': framework_constants.DELETED_USER_ID},
        added_user_id=user_ids,
        commit=commit)
    self.issueupdate_tbl.Update(
        cnxn,
        {'removed_user_id': framework_constants.DELETED_USER_ID},
        removed_user_id=user_ids,
        commit=commit)

    # Remove users in issue notify.
    self.issue2notify_tbl.Delete(
        cnxn, email=user_emails, commit=commit)

    # Remove users in issue snapshots.
    self.issuesnapshot_tbl.Update(
        cnxn,
        {'owner_id': framework_constants.DELETED_USER_ID},
        owner_id=user_ids,
        commit=commit, limit=limit)
    self.issuesnapshot_tbl.Update(
        cnxn,
        {'reporter_id': framework_constants.DELETED_USER_ID},
        reporter_id=user_ids,
        commit=commit, limit=limit)
    self.issuesnapshot2cc_tbl.Delete(
        cnxn, cc_id=user_ids, commit=commit, limit=limit)

    # Dedupe: the same issue may have been touched by several steps above.
    return list(set(affected_issue_ids))