blob: 77a70acbdf6e743118371fb34a864afd44c15f76 [file] [log] [blame]
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001# Copyright 2016 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
Copybara854996b2021-09-07 19:36:02 +00004
5"""Business objects for Monorail's framework.
6
7These are classes and functions that operate on the objects that
8users care about in Monorail but that are not part of just one specific
9component: e.g., projects, users, and labels.
10"""
11from __future__ import print_function
12from __future__ import division
13from __future__ import absolute_import
14
15import functools
16import itertools
17import re
18
19import six
20
21import settings
22from framework import exceptions
23from framework import framework_constants
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +010024from mrproto import tracker_pb2
Copybara854996b2021-09-07 19:36:02 +000025from services import client_config_svc
26
27
28# Pattern to match a valid column header name.
29RE_COLUMN_NAME = r'\w+[\w+-.]*\w+'
30
31# Compiled regexp to match a valid column specification.
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +010032RE_COLUMN_SPEC = re.compile(r'(%s(\s%s)*)*$' % (RE_COLUMN_NAME, RE_COLUMN_NAME))
Copybara854996b2021-09-07 19:36:02 +000033
34
35def WhichUsersShareAProject(cnxn, services, user_effective_ids, other_users):
36 # type: (MonorailConnection, Services, Sequence[int],
37 # Collection[user_pb2.User]) -> Collection[user_pb2.User]
38 """Returns a list of users that share a project with given user_effective_ids.
39
40 Args:
41 cnxn: MonorailConnection to the database.
42 services: Services object for connections to backend services.
43 user_effective_ids: The user's set of effective_ids.
44 other_users: The list of users to be filtered for email visibility.
45
46 Returns:
47 Collection of users that share a project with at least one effective_id.
48 """
49
50 projects_by_user_effective_id = services.project.GetProjectMemberships(
51 cnxn, user_effective_ids)
52 authed_user_projects = set(
53 itertools.chain.from_iterable(projects_by_user_effective_id.values()))
54
55 other_user_ids = [other_user.user_id for other_user in other_users]
56 all_other_user_effective_ids = GetEffectiveIds(cnxn, services, other_user_ids)
57 users_that_share_project = []
58 for other_user in other_users:
59 other_user_effective_ids = all_other_user_effective_ids[other_user.user_id]
60
61 # Do not filter yourself.
62 if any(uid in user_effective_ids for uid in other_user_effective_ids):
63 users_that_share_project.append(other_user)
64 continue
65
66 other_user_proj_by_effective_ids = services.project.GetProjectMemberships(
67 cnxn, other_user_effective_ids)
68 other_user_projects = itertools.chain.from_iterable(
69 other_user_proj_by_effective_ids.values())
70 if any(project in authed_user_projects for project in other_user_projects):
71 users_that_share_project.append(other_user)
72 return users_that_share_project
73
74
75def FilterViewableEmails(cnxn, services, user_auth, other_users):
76 # type: (MonorailConnection, Services, AuthData,
77 # Collection[user_pb2.User]) -> Collection[user_pb2.User]
78 """Returns a list of users with emails visible to `user_auth`.
79
80 Args:
81 cnxn: MonorailConnection to the database.
82 services: Services object for connections to backend services.
83 user_auth: The AuthData of the user viewing the email addresses.
84 other_users: The list of users to be filtered for email visibility.
85
86 Returns:
87 Collection of user that should reveal their emails.
88 """
89 # Case 1: Anon users don't see anything revealed.
90 if user_auth.user_pb is None:
91 return []
92
93 # Case 2: site admins always see unobscured email addresses.
94 if user_auth.user_pb.is_site_admin:
95 return other_users
96
97 # Case 3: Members of any groups in settings.full_emails_perm_groups
98 # can view unobscured email addresses.
99 for group_email in settings.full_emails_perm_groups:
100 if services.usergroup.LookupUserGroupID(
101 cnxn, group_email) in user_auth.effective_ids:
102 return other_users
103
104 # Case 4: Users see unobscured emails as long as they share a common Project.
105 return WhichUsersShareAProject(
106 cnxn, services, user_auth.effective_ids, other_users)
107
108
109def DoUsersShareAProject(cnxn, services, user_effective_ids, other_user_id):
110 # type: (MonorailConnection, Services, Sequence[int], int) -> bool
111 """Determine whether two users share at least one Project.
112
113 The user_effective_ids may include group ids or the other_user_id may be a
114 member of a group that results in transitive Project ownership.
115
116 Args:
117 cnxn: MonorailConnection to the database.
118 services: Services object for connections to backend services.
119 user_effective_ids: The effective ids of the authorized User.
120 other_user_id: The other user's user_id to compare against.
121
122 Returns:
123 True if one or more Projects are shared between the Users.
124 """
125 projects_by_user_effective_id = services.project.GetProjectMemberships(
126 cnxn, user_effective_ids)
127 authed_user_projects = itertools.chain.from_iterable(
128 projects_by_user_effective_id.values())
129
130 # Get effective ids for other user to handle transitive Project membership.
131 other_user_effective_ids = GetEffectiveIds(cnxn, services, other_user_id)
132 projects_by_other_user_effective_ids = services.project.GetProjectMemberships(
133 cnxn, other_user_effective_ids)
134 other_user_projects = itertools.chain.from_iterable(
135 projects_by_other_user_effective_ids.values())
136
137 return any(project in authed_user_projects for project in other_user_projects)
138
139
140# TODO(https://crbug.com/monorail/8192): Remove this method.
141def DeprecatedShouldRevealEmail(user_auth, project, viewed_email):
142 # type: (AuthData, Project, str) -> bool
143 """
144 Deprecated V1 API logic to decide whether to publish a user's email
145 address. Avoid updating this method.
146
147 Args:
148 user_auth: The AuthData of the user viewing the email addresses.
149 project: The Project PB to which the viewed user belongs.
150 viewed_email: The email of the viewed user.
151
152 Returns:
153 True if email addresses should be published to the logged-in user.
154 """
155 # Case 1: Anon users don't see anything revealed.
156 if user_auth.user_pb is None:
157 return False
158
159 # Case 2: site admins always see unobscured email addresses.
160 if user_auth.user_pb.is_site_admin:
161 return True
162
163 # Case 3: Project members see the unobscured email of everyone in a project.
164 if project and UserIsInProject(project, user_auth.effective_ids):
165 return True
166
167 # Case 4: Do not obscure your own email.
168 if viewed_email and user_auth.user_pb.email == viewed_email:
169 return True
170
171 return False
172
173
174def ParseAndObscureAddress(email):
175 # type: str -> str
176 """Break the given email into username and domain, and obscure.
177
178 Args:
179 email: string email address to process
180
181 Returns:
182 A 4-tuple (username, domain, obscured_username, obscured_email).
183 The obscured_username is truncated more aggressively than how Google Groups
184 does it: it truncates at 5 characters or truncates OFF 3 characters,
185 whichever results in a shorter obscured_username.
186 """
187 if '@' in email:
188 username, user_domain = email.split('@', 1)
189 else: # don't fail if User table has unexpected email address format.
190 username, user_domain = email, ''
191
192 base_username = username.split('+')[0]
193 cutoff_point = min(5, max(1, len(base_username) - 3))
194 obscured_username = base_username[:cutoff_point]
195 obscured_email = '%s...@%s' %(obscured_username, user_domain)
196
197 return username, user_domain, obscured_username, obscured_email
198
199
200def CreateUserDisplayNamesAndEmails(cnxn, services, user_auth, users):
201 # type: (MonorailConnection, Services, AuthData,
202 # Collection[user_pb2.User]) ->
203 # Tuple[Mapping[int, str], Mapping[int, str]]
204 """Create the display names and emails of the given users based on the
205 current user.
206
207 Args:
208 cnxn: MonorailConnection to the database.
209 services: Services object for connections to backend services.
210 user_auth: AuthData object that identifies the logged in user.
211 users: Collection of User PB objects.
212
213 Returns:
214 A Tuple containing two Dicts of user_ids to display names and user_ids to
215 emails. If a given User does not have an email, there will be an empty
216 string in both.
217 """
218 # NOTE: Currently only service accounts can have display_names set. For all
219 # other users and service accounts with no display_names specified, we use the
220 # obscured or unobscured emails for both `display_names` and `emails`.
221 # See crbug.com/monorail/8510.
222 display_names = {}
223 emails = {}
224
225 # Do a pass on simple display cases.
226 maybe_revealed_users = []
227 for user in users:
228 if user.user_id == framework_constants.DELETED_USER_ID:
229 display_names[user.user_id] = framework_constants.DELETED_USER_NAME
230 emails[user.user_id] = ''
231 elif not user.email:
232 display_names[user.user_id] = ''
233 emails[user.user_id] = ''
234 elif not user.obscure_email:
235 display_names[user.user_id] = user.email
236 emails[user.user_id] = user.email
237 else:
238 # Default to hiding user email.
239 (_username, _domain, _obs_username,
240 obs_email) = ParseAndObscureAddress(user.email)
241 display_names[user.user_id] = obs_email
242 emails[user.user_id] = obs_email
243 maybe_revealed_users.append(user)
244
245 # Reveal viewable emails.
246 viewable_users = FilterViewableEmails(
247 cnxn, services, user_auth, maybe_revealed_users)
248 for user in viewable_users:
249 display_names[user.user_id] = user.email
250 emails[user.user_id] = user.email
251
252 # Use Client.display_names for service accounts that have one specified.
253 for user in users:
254 if user.email in client_config_svc.GetServiceAccountMap():
255 display_names[user.user_id] = client_config_svc.GetServiceAccountMap()[
256 user.email]
257
258 return display_names, emails
259
260
261def UserOwnsProject(project, effective_ids):
262 """Return True if any of the effective_ids is a project owner."""
263 return not effective_ids.isdisjoint(project.owner_ids or set())
264
265
266def UserIsInProject(project, effective_ids):
267 """Return True if any of the effective_ids is a project member.
268
269 Args:
270 project: Project PB for the current project.
271 effective_ids: set of int user IDs for the current user (including all
272 user groups). This will be an empty set for anonymous users.
273
274 Returns:
275 True if the user has any direct or indirect role in the project. The value
276 will actually be a set(), but it will have an ID in it if the user is in
277 the project, or it will be an empty set which is considered False.
278 """
279 return (UserOwnsProject(project, effective_ids) or
280 not effective_ids.isdisjoint(project.committer_ids or set()) or
281 not effective_ids.isdisjoint(project.contributor_ids or set()))
282
283
284def IsPriviledgedDomainUser(email):
285 """Return True if the user's account is from a priviledged domain."""
286 if email and '@' in email:
287 _, user_domain = email.split('@', 1)
288 return user_domain in settings.priviledged_user_domains
289
290 return False
291
292
293def IsValidColumnSpec(col_spec):
294 # type: str -> bool
295 """Return true if the given column specification is valid."""
296 return re.match(RE_COLUMN_SPEC, col_spec)
297
298
299# String translation table to catch a common typos in label names.
300_CANONICALIZATION_TRANSLATION_TABLE = {
301 ord(delete_u_char): None
302 for delete_u_char in u'!"#$%&\'()*+,/:;<>?@[\\]^`{|}~\t\n\x0b\x0c\r '
303 }
304_CANONICALIZATION_TRANSLATION_TABLE.update({ord(u'='): ord(u'-')})
305
306
307def CanonicalizeLabel(user_input):
308 """Canonicalize a given label or status value.
309
310 When the user enters a string that represents a label or an enum,
311 convert it a canonical form that makes it more likely to match
312 existing values.
313
314 Args:
315 user_input: string that the user typed for a label.
316
317 Returns:
318 Canonical form of that label as a unicode string.
319 """
320 if user_input is None:
321 return user_input
322
323 if not isinstance(user_input, six.text_type):
324 user_input = user_input.decode('utf-8')
325
326 canon_str = user_input.translate(_CANONICALIZATION_TRANSLATION_TABLE)
327 return canon_str
328
329
330def MergeLabels(labels_list, labels_add, labels_remove, config):
331 """Update a list of labels with the given add and remove label lists.
332
333 Args:
334 labels_list: list of current labels.
335 labels_add: labels that the user wants to add.
336 labels_remove: labels that the user wants to remove.
337 config: ProjectIssueConfig with info about exclusive prefixes and
338 enum fields.
339
340 Returns:
341 (merged_labels, update_labels_add, update_labels_remove):
342 A new list of labels with the given labels added and removed, and
343 any exclusive label prefixes taken into account. Then two
344 lists of update strings to explain the changes that were actually
345 made.
346 """
347 old_lower_labels = [lab.lower() for lab in labels_list]
348 labels_add = [lab for lab in labels_add
349 if lab.lower() not in old_lower_labels]
350 labels_remove = [lab for lab in labels_remove
351 if lab.lower() in old_lower_labels]
352 labels_remove_lower = [lab.lower() for lab in labels_remove]
353 exclusive_prefixes = [
354 lab.lower() + '-' for lab in config.exclusive_label_prefixes]
355 for fd in config.field_defs:
356 if (fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE and
357 not fd.is_multivalued):
358 exclusive_prefixes.append(fd.field_name.lower() + '-')
359
360 # We match prefix strings rather than splitting on dash because
361 # an exclusive-prefix or field name may contain dashes.
362 def MatchPrefix(lab, prefixes):
363 for prefix_dash in prefixes:
364 if lab.lower().startswith(prefix_dash):
365 return prefix_dash
366 return False
367
368 # Dedup any added labels. E.g., ignore attempts to add Priority twice.
369 excl_add = []
370 dedupped_labels_add = []
371 for lab in labels_add:
372 matched_prefix_dash = MatchPrefix(lab, exclusive_prefixes)
373 if matched_prefix_dash:
374 if matched_prefix_dash not in excl_add:
375 excl_add.append(matched_prefix_dash)
376 dedupped_labels_add.append(lab)
377 else:
378 dedupped_labels_add.append(lab)
379
380 # "Old minus exclusive" is the set of old label values minus any
381 # that should be overwritten by newly set exclusive labels.
382 old_minus_excl = []
383 for lab in labels_list:
384 matched_prefix_dash = MatchPrefix(lab, excl_add)
385 if not matched_prefix_dash:
386 old_minus_excl.append(lab)
387
388 merged_labels = [lab for lab in old_minus_excl + dedupped_labels_add
389 if lab.lower() not in labels_remove_lower]
390
391 return merged_labels, dedupped_labels_add, labels_remove
392
393
394# Pattern to match a valid hotlist name.
395RE_HOTLIST_NAME_PATTERN = r"[a-zA-Z][-0-9a-zA-Z\.]*"
396
397# Compiled regexp to match the hotlist name and nothing more before or after.
398RE_HOTLIST_NAME = re.compile(
399 '^%s$' % RE_HOTLIST_NAME_PATTERN, re.VERBOSE)
400
401
402def IsValidHotlistName(s):
403 """Return true if the given string is a valid hotlist name."""
404 return (RE_HOTLIST_NAME.match(s) and
405 len(s) <= framework_constants.MAX_HOTLIST_NAME_LENGTH)
406
407
408USER_PREF_DEFS = {
409 'code_font': re.compile('(true|false)'),
410 'render_markdown': re.compile('(true|false)'),
411
412 # The are for dismissible cues. True means the user has dismissed them.
413 'privacy_click_through': re.compile('(true|false)'),
414 'corp_mode_click_through': re.compile('(true|false)'),
415 'code_of_conduct': re.compile('(true|false)'),
416 'dit_keystrokes': re.compile('(true|false)'),
417 'italics_mean_derived': re.compile('(true|false)'),
418 'availability_msgs': re.compile('(true|false)'),
419 'your_email_bounced': re.compile('(true|false)'),
420 'search_for_numbers': re.compile('(true|false)'),
421 'restrict_new_issues': re.compile('(true|false)'),
422 'public_issue_notice': re.compile('(true|false)'),
423 'you_are_on_vacation': re.compile('(true|false)'),
424 'how_to_join_project': re.compile('(true|false)'),
425 'document_team_duties': re.compile('(true|false)'),
426 'showing_ids_instead_of_tiles': re.compile('(true|false)'),
427 'issue_timestamps': re.compile('(true|false)'),
428 'stale_fulltext': re.compile('(true|false)'),
429 }
430MAX_PREF_VALUE_LENGTH = 80
431
432
433def ValidatePref(name, value):
434 """Return an error message if the server does not support a pref value."""
435 if name not in USER_PREF_DEFS:
436 return 'Unknown pref name: %r' % name
437 if len(value) > MAX_PREF_VALUE_LENGTH:
438 return 'Value for pref name %r is too long' % name
439 if not USER_PREF_DEFS[name].match(value):
440 return 'Invalid pref value %r for %r' % (value, name)
441 return None
442
443
444def IsRestrictNewIssuesUser(cnxn, services, user_id):
445 # type: (MonorailConnection, Services, int) -> bool)
446 """Returns true iff user's new issues should be restricted by default."""
447 user_group_ids = services.usergroup.LookupMemberships(cnxn, user_id)
448 restrict_new_issues_groups_dict = services.user.LookupUserIDs(
449 cnxn, settings.restrict_new_issues_user_groups, autocreate=True)
450 restrict_new_issues_group_ids = set(restrict_new_issues_groups_dict.values())
451 return any(gid in restrict_new_issues_group_ids for gid in user_group_ids)
452
453
454def IsPublicIssueNoticeUser(cnxn, services, user_id):
455 # type: (MonorailConnection, Services, int) -> bool)
456 """Returns true iff user should see a public issue notice by default."""
457 user_group_ids = services.usergroup.LookupMemberships(cnxn, user_id)
458 public_issue_notice_groups_dict = services.user.LookupUserIDs(
459 cnxn, settings.public_issue_notice_user_groups, autocreate=True)
460 public_issue_notice_group_ids = set(public_issue_notice_groups_dict.values())
461 return any(gid in public_issue_notice_group_ids for gid in user_group_ids)
462
463
464def GetEffectiveIds(cnxn, services, user_ids):
465 # type: (MonorailConnection, Services, Collection[int]) ->
466 # Mapping[int, Collection[int]]
467 """
468 Given a set of user IDs, it returns a mapping of user_id to a set of effective
469 IDs that include the user's ID and all of their user groups. This mapping
470 will be contain only the user_id anonymous users.
471 """
472 # Get direct memberships for user_ids.
473 effective_ids_by_user_id = services.usergroup.LookupAllMemberships(
474 cnxn, user_ids)
475 # Add user_id to list of effective_ids.
476 for user_id, effective_ids in effective_ids_by_user_id.items():
477 effective_ids.add(user_id)
478 # Get User objects for user_ids.
479 users_by_id = services.user.GetUsersByIDs(cnxn, user_ids)
480 for user_id, user in users_by_id.items():
481 if user and user.email:
482 effective_ids_by_user_id[user_id].update(
483 _ComputeMembershipsByEmail(cnxn, services, user.email))
484
485 # Add related parent and child ids.
486 related_ids = []
487 if user.linked_parent_id:
488 related_ids.append(user.linked_parent_id)
489 if user.linked_child_ids:
490 related_ids.extend(user.linked_child_ids)
491
492 # Add any related efective_ids.
493 if related_ids:
494 effective_ids_by_user_id[user_id].update(related_ids)
495 effective_ids_by_related_id = services.usergroup.LookupAllMemberships(
496 cnxn, related_ids)
497 related_effective_ids = functools.reduce(
498 set.union, effective_ids_by_related_id.values(), set())
499 effective_ids_by_user_id[user_id].update(related_effective_ids)
500 return effective_ids_by_user_id
501
502
503def _ComputeMembershipsByEmail(cnxn, services, email):
504 # type: (MonorailConnection, Services, str) -> Collection[int]
505 """
506 Given an user email, it returns a list [group_id] of computed user groups.
507 """
508 # Get the user email domain to compute memberships of the user.
509 (_username, user_email_domain, _obs_username,
510 _obs_email) = ParseAndObscureAddress(email)
511 return services.usergroup.LookupComputedMemberships(cnxn, user_email_domain)