blob: bacaec5596318c073523abb6c7537caf9c2ce1bf [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""Business objects for Monorail's framework.
7
8These are classes and functions that operate on the objects that
9users care about in Monorail but that are not part of just one specific
10component: e.g., projects, users, and labels.
11"""
12from __future__ import print_function
13from __future__ import division
14from __future__ import absolute_import
15
16import functools
17import itertools
18import re
19
20import six
21
22import settings
23from framework import exceptions
24from framework import framework_constants
25from proto import tracker_pb2
26from services import client_config_svc
27
28
29# Pattern to match a valid column header name.
30RE_COLUMN_NAME = r'\w+[\w+-.]*\w+'
31
32# Compiled regexp to match a valid column specification.
33RE_COLUMN_SPEC = re.compile('(%s(\s%s)*)*$' % (RE_COLUMN_NAME, RE_COLUMN_NAME))
34
35
36def WhichUsersShareAProject(cnxn, services, user_effective_ids, other_users):
37 # type: (MonorailConnection, Services, Sequence[int],
38 # Collection[user_pb2.User]) -> Collection[user_pb2.User]
39 """Returns a list of users that share a project with given user_effective_ids.
40
41 Args:
42 cnxn: MonorailConnection to the database.
43 services: Services object for connections to backend services.
44 user_effective_ids: The user's set of effective_ids.
45 other_users: The list of users to be filtered for email visibility.
46
47 Returns:
48 Collection of users that share a project with at least one effective_id.
49 """
50
51 projects_by_user_effective_id = services.project.GetProjectMemberships(
52 cnxn, user_effective_ids)
53 authed_user_projects = set(
54 itertools.chain.from_iterable(projects_by_user_effective_id.values()))
55
56 other_user_ids = [other_user.user_id for other_user in other_users]
57 all_other_user_effective_ids = GetEffectiveIds(cnxn, services, other_user_ids)
58 users_that_share_project = []
59 for other_user in other_users:
60 other_user_effective_ids = all_other_user_effective_ids[other_user.user_id]
61
62 # Do not filter yourself.
63 if any(uid in user_effective_ids for uid in other_user_effective_ids):
64 users_that_share_project.append(other_user)
65 continue
66
67 other_user_proj_by_effective_ids = services.project.GetProjectMemberships(
68 cnxn, other_user_effective_ids)
69 other_user_projects = itertools.chain.from_iterable(
70 other_user_proj_by_effective_ids.values())
71 if any(project in authed_user_projects for project in other_user_projects):
72 users_that_share_project.append(other_user)
73 return users_that_share_project
74
75
76def FilterViewableEmails(cnxn, services, user_auth, other_users):
77 # type: (MonorailConnection, Services, AuthData,
78 # Collection[user_pb2.User]) -> Collection[user_pb2.User]
79 """Returns a list of users with emails visible to `user_auth`.
80
81 Args:
82 cnxn: MonorailConnection to the database.
83 services: Services object for connections to backend services.
84 user_auth: The AuthData of the user viewing the email addresses.
85 other_users: The list of users to be filtered for email visibility.
86
87 Returns:
88 Collection of user that should reveal their emails.
89 """
90 # Case 1: Anon users don't see anything revealed.
91 if user_auth.user_pb is None:
92 return []
93
94 # Case 2: site admins always see unobscured email addresses.
95 if user_auth.user_pb.is_site_admin:
96 return other_users
97
98 # Case 3: Members of any groups in settings.full_emails_perm_groups
99 # can view unobscured email addresses.
100 for group_email in settings.full_emails_perm_groups:
101 if services.usergroup.LookupUserGroupID(
102 cnxn, group_email) in user_auth.effective_ids:
103 return other_users
104
105 # Case 4: Users see unobscured emails as long as they share a common Project.
106 return WhichUsersShareAProject(
107 cnxn, services, user_auth.effective_ids, other_users)
108
109
110def DoUsersShareAProject(cnxn, services, user_effective_ids, other_user_id):
111 # type: (MonorailConnection, Services, Sequence[int], int) -> bool
112 """Determine whether two users share at least one Project.
113
114 The user_effective_ids may include group ids or the other_user_id may be a
115 member of a group that results in transitive Project ownership.
116
117 Args:
118 cnxn: MonorailConnection to the database.
119 services: Services object for connections to backend services.
120 user_effective_ids: The effective ids of the authorized User.
121 other_user_id: The other user's user_id to compare against.
122
123 Returns:
124 True if one or more Projects are shared between the Users.
125 """
126 projects_by_user_effective_id = services.project.GetProjectMemberships(
127 cnxn, user_effective_ids)
128 authed_user_projects = itertools.chain.from_iterable(
129 projects_by_user_effective_id.values())
130
131 # Get effective ids for other user to handle transitive Project membership.
132 other_user_effective_ids = GetEffectiveIds(cnxn, services, other_user_id)
133 projects_by_other_user_effective_ids = services.project.GetProjectMemberships(
134 cnxn, other_user_effective_ids)
135 other_user_projects = itertools.chain.from_iterable(
136 projects_by_other_user_effective_ids.values())
137
138 return any(project in authed_user_projects for project in other_user_projects)
139
140
141# TODO(https://crbug.com/monorail/8192): Remove this method.
142def DeprecatedShouldRevealEmail(user_auth, project, viewed_email):
143 # type: (AuthData, Project, str) -> bool
144 """
145 Deprecated V1 API logic to decide whether to publish a user's email
146 address. Avoid updating this method.
147
148 Args:
149 user_auth: The AuthData of the user viewing the email addresses.
150 project: The Project PB to which the viewed user belongs.
151 viewed_email: The email of the viewed user.
152
153 Returns:
154 True if email addresses should be published to the logged-in user.
155 """
156 # Case 1: Anon users don't see anything revealed.
157 if user_auth.user_pb is None:
158 return False
159
160 # Case 2: site admins always see unobscured email addresses.
161 if user_auth.user_pb.is_site_admin:
162 return True
163
164 # Case 3: Project members see the unobscured email of everyone in a project.
165 if project and UserIsInProject(project, user_auth.effective_ids):
166 return True
167
168 # Case 4: Do not obscure your own email.
169 if viewed_email and user_auth.user_pb.email == viewed_email:
170 return True
171
172 return False
173
174
175def ParseAndObscureAddress(email):
176 # type: str -> str
177 """Break the given email into username and domain, and obscure.
178
179 Args:
180 email: string email address to process
181
182 Returns:
183 A 4-tuple (username, domain, obscured_username, obscured_email).
184 The obscured_username is truncated more aggressively than how Google Groups
185 does it: it truncates at 5 characters or truncates OFF 3 characters,
186 whichever results in a shorter obscured_username.
187 """
188 if '@' in email:
189 username, user_domain = email.split('@', 1)
190 else: # don't fail if User table has unexpected email address format.
191 username, user_domain = email, ''
192
193 base_username = username.split('+')[0]
194 cutoff_point = min(5, max(1, len(base_username) - 3))
195 obscured_username = base_username[:cutoff_point]
196 obscured_email = '%s...@%s' %(obscured_username, user_domain)
197
198 return username, user_domain, obscured_username, obscured_email
199
200
201def CreateUserDisplayNamesAndEmails(cnxn, services, user_auth, users):
202 # type: (MonorailConnection, Services, AuthData,
203 # Collection[user_pb2.User]) ->
204 # Tuple[Mapping[int, str], Mapping[int, str]]
205 """Create the display names and emails of the given users based on the
206 current user.
207
208 Args:
209 cnxn: MonorailConnection to the database.
210 services: Services object for connections to backend services.
211 user_auth: AuthData object that identifies the logged in user.
212 users: Collection of User PB objects.
213
214 Returns:
215 A Tuple containing two Dicts of user_ids to display names and user_ids to
216 emails. If a given User does not have an email, there will be an empty
217 string in both.
218 """
219 # NOTE: Currently only service accounts can have display_names set. For all
220 # other users and service accounts with no display_names specified, we use the
221 # obscured or unobscured emails for both `display_names` and `emails`.
222 # See crbug.com/monorail/8510.
223 display_names = {}
224 emails = {}
225
226 # Do a pass on simple display cases.
227 maybe_revealed_users = []
228 for user in users:
229 if user.user_id == framework_constants.DELETED_USER_ID:
230 display_names[user.user_id] = framework_constants.DELETED_USER_NAME
231 emails[user.user_id] = ''
232 elif not user.email:
233 display_names[user.user_id] = ''
234 emails[user.user_id] = ''
235 elif not user.obscure_email:
236 display_names[user.user_id] = user.email
237 emails[user.user_id] = user.email
238 else:
239 # Default to hiding user email.
240 (_username, _domain, _obs_username,
241 obs_email) = ParseAndObscureAddress(user.email)
242 display_names[user.user_id] = obs_email
243 emails[user.user_id] = obs_email
244 maybe_revealed_users.append(user)
245
246 # Reveal viewable emails.
247 viewable_users = FilterViewableEmails(
248 cnxn, services, user_auth, maybe_revealed_users)
249 for user in viewable_users:
250 display_names[user.user_id] = user.email
251 emails[user.user_id] = user.email
252
253 # Use Client.display_names for service accounts that have one specified.
254 for user in users:
255 if user.email in client_config_svc.GetServiceAccountMap():
256 display_names[user.user_id] = client_config_svc.GetServiceAccountMap()[
257 user.email]
258
259 return display_names, emails
260
261
262def UserOwnsProject(project, effective_ids):
263 """Return True if any of the effective_ids is a project owner."""
264 return not effective_ids.isdisjoint(project.owner_ids or set())
265
266
267def UserIsInProject(project, effective_ids):
268 """Return True if any of the effective_ids is a project member.
269
270 Args:
271 project: Project PB for the current project.
272 effective_ids: set of int user IDs for the current user (including all
273 user groups). This will be an empty set for anonymous users.
274
275 Returns:
276 True if the user has any direct or indirect role in the project. The value
277 will actually be a set(), but it will have an ID in it if the user is in
278 the project, or it will be an empty set which is considered False.
279 """
280 return (UserOwnsProject(project, effective_ids) or
281 not effective_ids.isdisjoint(project.committer_ids or set()) or
282 not effective_ids.isdisjoint(project.contributor_ids or set()))
283
284
285def IsPriviledgedDomainUser(email):
286 """Return True if the user's account is from a priviledged domain."""
287 if email and '@' in email:
288 _, user_domain = email.split('@', 1)
289 return user_domain in settings.priviledged_user_domains
290
291 return False
292
293
294def IsValidColumnSpec(col_spec):
295 # type: str -> bool
296 """Return true if the given column specification is valid."""
297 return re.match(RE_COLUMN_SPEC, col_spec)
298
299
300# String translation table to catch a common typos in label names.
301_CANONICALIZATION_TRANSLATION_TABLE = {
302 ord(delete_u_char): None
303 for delete_u_char in u'!"#$%&\'()*+,/:;<>?@[\\]^`{|}~\t\n\x0b\x0c\r '
304 }
305_CANONICALIZATION_TRANSLATION_TABLE.update({ord(u'='): ord(u'-')})
306
307
308def CanonicalizeLabel(user_input):
309 """Canonicalize a given label or status value.
310
311 When the user enters a string that represents a label or an enum,
312 convert it a canonical form that makes it more likely to match
313 existing values.
314
315 Args:
316 user_input: string that the user typed for a label.
317
318 Returns:
319 Canonical form of that label as a unicode string.
320 """
321 if user_input is None:
322 return user_input
323
324 if not isinstance(user_input, six.text_type):
325 user_input = user_input.decode('utf-8')
326
327 canon_str = user_input.translate(_CANONICALIZATION_TRANSLATION_TABLE)
328 return canon_str
329
330
331def MergeLabels(labels_list, labels_add, labels_remove, config):
332 """Update a list of labels with the given add and remove label lists.
333
334 Args:
335 labels_list: list of current labels.
336 labels_add: labels that the user wants to add.
337 labels_remove: labels that the user wants to remove.
338 config: ProjectIssueConfig with info about exclusive prefixes and
339 enum fields.
340
341 Returns:
342 (merged_labels, update_labels_add, update_labels_remove):
343 A new list of labels with the given labels added and removed, and
344 any exclusive label prefixes taken into account. Then two
345 lists of update strings to explain the changes that were actually
346 made.
347 """
348 old_lower_labels = [lab.lower() for lab in labels_list]
349 labels_add = [lab for lab in labels_add
350 if lab.lower() not in old_lower_labels]
351 labels_remove = [lab for lab in labels_remove
352 if lab.lower() in old_lower_labels]
353 labels_remove_lower = [lab.lower() for lab in labels_remove]
354 exclusive_prefixes = [
355 lab.lower() + '-' for lab in config.exclusive_label_prefixes]
356 for fd in config.field_defs:
357 if (fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE and
358 not fd.is_multivalued):
359 exclusive_prefixes.append(fd.field_name.lower() + '-')
360
361 # We match prefix strings rather than splitting on dash because
362 # an exclusive-prefix or field name may contain dashes.
363 def MatchPrefix(lab, prefixes):
364 for prefix_dash in prefixes:
365 if lab.lower().startswith(prefix_dash):
366 return prefix_dash
367 return False
368
369 # Dedup any added labels. E.g., ignore attempts to add Priority twice.
370 excl_add = []
371 dedupped_labels_add = []
372 for lab in labels_add:
373 matched_prefix_dash = MatchPrefix(lab, exclusive_prefixes)
374 if matched_prefix_dash:
375 if matched_prefix_dash not in excl_add:
376 excl_add.append(matched_prefix_dash)
377 dedupped_labels_add.append(lab)
378 else:
379 dedupped_labels_add.append(lab)
380
381 # "Old minus exclusive" is the set of old label values minus any
382 # that should be overwritten by newly set exclusive labels.
383 old_minus_excl = []
384 for lab in labels_list:
385 matched_prefix_dash = MatchPrefix(lab, excl_add)
386 if not matched_prefix_dash:
387 old_minus_excl.append(lab)
388
389 merged_labels = [lab for lab in old_minus_excl + dedupped_labels_add
390 if lab.lower() not in labels_remove_lower]
391
392 return merged_labels, dedupped_labels_add, labels_remove
393
394
395# Pattern to match a valid hotlist name.
396RE_HOTLIST_NAME_PATTERN = r"[a-zA-Z][-0-9a-zA-Z\.]*"
397
398# Compiled regexp to match the hotlist name and nothing more before or after.
399RE_HOTLIST_NAME = re.compile(
400 '^%s$' % RE_HOTLIST_NAME_PATTERN, re.VERBOSE)
401
402
403def IsValidHotlistName(s):
404 """Return true if the given string is a valid hotlist name."""
405 return (RE_HOTLIST_NAME.match(s) and
406 len(s) <= framework_constants.MAX_HOTLIST_NAME_LENGTH)
407
408
409USER_PREF_DEFS = {
410 'code_font': re.compile('(true|false)'),
411 'render_markdown': re.compile('(true|false)'),
412
413 # The are for dismissible cues. True means the user has dismissed them.
414 'privacy_click_through': re.compile('(true|false)'),
415 'corp_mode_click_through': re.compile('(true|false)'),
416 'code_of_conduct': re.compile('(true|false)'),
417 'dit_keystrokes': re.compile('(true|false)'),
418 'italics_mean_derived': re.compile('(true|false)'),
419 'availability_msgs': re.compile('(true|false)'),
420 'your_email_bounced': re.compile('(true|false)'),
421 'search_for_numbers': re.compile('(true|false)'),
422 'restrict_new_issues': re.compile('(true|false)'),
423 'public_issue_notice': re.compile('(true|false)'),
424 'you_are_on_vacation': re.compile('(true|false)'),
425 'how_to_join_project': re.compile('(true|false)'),
426 'document_team_duties': re.compile('(true|false)'),
427 'showing_ids_instead_of_tiles': re.compile('(true|false)'),
428 'issue_timestamps': re.compile('(true|false)'),
429 'stale_fulltext': re.compile('(true|false)'),
430 }
431MAX_PREF_VALUE_LENGTH = 80
432
433
434def ValidatePref(name, value):
435 """Return an error message if the server does not support a pref value."""
436 if name not in USER_PREF_DEFS:
437 return 'Unknown pref name: %r' % name
438 if len(value) > MAX_PREF_VALUE_LENGTH:
439 return 'Value for pref name %r is too long' % name
440 if not USER_PREF_DEFS[name].match(value):
441 return 'Invalid pref value %r for %r' % (value, name)
442 return None
443
444
445def IsRestrictNewIssuesUser(cnxn, services, user_id):
446 # type: (MonorailConnection, Services, int) -> bool)
447 """Returns true iff user's new issues should be restricted by default."""
448 user_group_ids = services.usergroup.LookupMemberships(cnxn, user_id)
449 restrict_new_issues_groups_dict = services.user.LookupUserIDs(
450 cnxn, settings.restrict_new_issues_user_groups, autocreate=True)
451 restrict_new_issues_group_ids = set(restrict_new_issues_groups_dict.values())
452 return any(gid in restrict_new_issues_group_ids for gid in user_group_ids)
453
454
455def IsPublicIssueNoticeUser(cnxn, services, user_id):
456 # type: (MonorailConnection, Services, int) -> bool)
457 """Returns true iff user should see a public issue notice by default."""
458 user_group_ids = services.usergroup.LookupMemberships(cnxn, user_id)
459 public_issue_notice_groups_dict = services.user.LookupUserIDs(
460 cnxn, settings.public_issue_notice_user_groups, autocreate=True)
461 public_issue_notice_group_ids = set(public_issue_notice_groups_dict.values())
462 return any(gid in public_issue_notice_group_ids for gid in user_group_ids)
463
464
465def GetEffectiveIds(cnxn, services, user_ids):
466 # type: (MonorailConnection, Services, Collection[int]) ->
467 # Mapping[int, Collection[int]]
468 """
469 Given a set of user IDs, it returns a mapping of user_id to a set of effective
470 IDs that include the user's ID and all of their user groups. This mapping
471 will be contain only the user_id anonymous users.
472 """
473 # Get direct memberships for user_ids.
474 effective_ids_by_user_id = services.usergroup.LookupAllMemberships(
475 cnxn, user_ids)
476 # Add user_id to list of effective_ids.
477 for user_id, effective_ids in effective_ids_by_user_id.items():
478 effective_ids.add(user_id)
479 # Get User objects for user_ids.
480 users_by_id = services.user.GetUsersByIDs(cnxn, user_ids)
481 for user_id, user in users_by_id.items():
482 if user and user.email:
483 effective_ids_by_user_id[user_id].update(
484 _ComputeMembershipsByEmail(cnxn, services, user.email))
485
486 # Add related parent and child ids.
487 related_ids = []
488 if user.linked_parent_id:
489 related_ids.append(user.linked_parent_id)
490 if user.linked_child_ids:
491 related_ids.extend(user.linked_child_ids)
492
493 # Add any related efective_ids.
494 if related_ids:
495 effective_ids_by_user_id[user_id].update(related_ids)
496 effective_ids_by_related_id = services.usergroup.LookupAllMemberships(
497 cnxn, related_ids)
498 related_effective_ids = functools.reduce(
499 set.union, effective_ids_by_related_id.values(), set())
500 effective_ids_by_user_id[user_id].update(related_effective_ids)
501 return effective_ids_by_user_id
502
503
504def _ComputeMembershipsByEmail(cnxn, services, email):
505 # type: (MonorailConnection, Services, str) -> Collection[int]
506 """
507 Given an user email, it returns a list [group_id] of computed user groups.
508 """
509 # Get the user email domain to compute memberships of the user.
510 (_username, user_email_domain, _obs_username,
511 _obs_email) = ParseAndObscureAddress(email)
512 return services.usergroup.LookupComputedMemberships(cnxn, user_email_domain)