| # Copyright 2016 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Business objects for Monorail's framework. |
| |
| These are classes and functions that operate on the objects that |
| users care about in Monorail but that are not part of just one specific |
| component: e.g., projects, users, and labels. |
| """ |
| from __future__ import print_function |
| from __future__ import division |
| from __future__ import absolute_import |
| |
| import functools |
| import itertools |
| import re |
| |
| import six |
| |
| import settings |
| from framework import exceptions |
| from framework import framework_constants |
| from mrproto import tracker_pb2 |
| from services import client_config_svc |
| |
| |
# Pattern to match a valid column header name: at least two characters,
# starting and ending with a word character, with word characters or any of
# '+', ',', '-', '.' in between.  (The comma is accepted because the historic
# character class contained the range "+-.", which spans '+' through '.'.)
# A single leading/trailing \w is used instead of \w+ so that there is only
# one way to match a given name.
RE_COLUMN_NAME = r'\w[\w+,.\-]*\w'

# Compiled regexp to match a valid column specification: either empty, or
# column names separated by single whitespace characters.  The outer group is
# optional rather than repeated: repeating it accepted exactly the same
# strings (two adjacent names always form one longer name) but let the regex
# engine try exponentially many splits on long invalid input.
RE_COLUMN_SPEC = re.compile(r'(%s(\s%s)*)?$' % (RE_COLUMN_NAME, RE_COLUMN_NAME))
| |
| |
def WhichUsersShareAProject(cnxn, services, user_effective_ids, other_users):
  # type: (MonorailConnection, Services, Sequence[int],
  #     Collection[user_pb2.User]) -> Collection[user_pb2.User]
  """Filter `other_users` down to those sharing a project with the viewer.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_effective_ids: The viewing user's set of effective_ids.
    other_users: The list of users to be filtered for email visibility.

  Returns:
    List of the users from `other_users`, in their original order, whose
    effective IDs either overlap `user_effective_ids` or belong to at least
    one project that one of `user_effective_ids` also belongs to.
  """
  viewer_memberships = services.project.GetProjectMemberships(
      cnxn, user_effective_ids)
  viewer_projects = set(
      itertools.chain.from_iterable(viewer_memberships.values()))

  effective_ids_by_user_id = GetEffectiveIds(
      cnxn, services, [candidate.user_id for candidate in other_users])

  sharing_users = []
  for candidate in other_users:
    candidate_ids = effective_ids_by_user_id[candidate.user_id]

    # Never filter out the viewer: overlapping effective IDs are enough, and
    # in that case no project lookup is needed.
    overlaps_viewer = any(uid in user_effective_ids for uid in candidate_ids)
    if not overlaps_viewer:
      candidate_memberships = services.project.GetProjectMemberships(
          cnxn, candidate_ids)
      candidate_projects = itertools.chain.from_iterable(
          candidate_memberships.values())
      if not any(proj in viewer_projects for proj in candidate_projects):
        continue

    sharing_users.append(candidate)
  return sharing_users
| |
| |
def FilterViewableEmails(cnxn, services, user_auth, other_users):
  # type: (MonorailConnection, Services, AuthData,
  #     Collection[user_pb2.User]) -> Collection[user_pb2.User]
  """Return the subset of `other_users` whose emails `user_auth` may see.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_auth: The AuthData of the user viewing the email addresses.
    other_users: The list of users to be filtered for email visibility.

  Returns:
    Collection of users that should reveal their emails: an empty list for
    anonymous viewers, `other_users` itself for site admins and members of
    any group in settings.full_emails_perm_groups, and otherwise the users
    that share a project with the viewer.
  """
  viewer = user_auth.user_pb

  # Case 1: Anon users don't see anything revealed.
  if viewer is None:
    return []

  # Case 2: site admins always see unobscured email addresses.
  if viewer.is_site_admin:
    return other_users

  # Case 3: Members of any groups in settings.full_emails_perm_groups
  # can view unobscured email addresses.
  viewer_ids = user_auth.effective_ids
  in_full_emails_group = any(
      services.usergroup.LookupUserGroupID(cnxn, group_email) in viewer_ids
      for group_email in settings.full_emails_perm_groups)
  if in_full_emails_group:
    return other_users

  # Case 4: Users see unobscured emails as long as they share a common Project.
  return WhichUsersShareAProject(cnxn, services, viewer_ids, other_users)
| |
| |
def DoUsersShareAProject(cnxn, services, user_effective_ids, other_user_id):
  # type: (MonorailConnection, Services, Sequence[int], int) -> bool
  """Determine whether two users share at least one Project.

  The user_effective_ids may include group ids or the other_user_id may be a
  member of a group that results in transitive Project ownership.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_effective_ids: The effective ids of the authorized User.
    other_user_id: The other user's user_id to compare against.

  Returns:
    True if one or more Projects are shared between the Users.
  """
  projects_by_user_effective_id = services.project.GetProjectMemberships(
      cnxn, user_effective_ids)
  # Materialize into a set (as WhichUsersShareAProject does).  A bare chain
  # iterator is consumed by the first unsuccessful `in` test, which would make
  # every later membership check below silently return False.
  authed_user_projects = set(
      itertools.chain.from_iterable(projects_by_user_effective_id.values()))

  # Get effective ids for other user to handle transitive Project membership.
  # GetEffectiveIds takes a collection of user IDs and returns a mapping keyed
  # by user ID, so wrap the single ID and pick its entry back out.
  other_user_effective_ids = GetEffectiveIds(
      cnxn, services, [other_user_id])[other_user_id]
  projects_by_other_user_effective_ids = services.project.GetProjectMemberships(
      cnxn, other_user_effective_ids)
  other_user_projects = itertools.chain.from_iterable(
      projects_by_other_user_effective_ids.values())

  return any(project in authed_user_projects for project in other_user_projects)
| |
| |
| # TODO(https://crbug.com/monorail/8192): Remove this method. |
def DeprecatedShouldRevealEmail(user_auth, project, viewed_email):
  # type: (AuthData, Project, str) -> bool
  """Decide whether to publish a user's email address (deprecated V1 logic).

  This is legacy V1 API behavior; avoid changing what it returns.

  Args:
    user_auth: The AuthData of the user viewing the email addresses.
    project: The Project PB to which the viewed user belongs.
    viewed_email: The email of the viewed user.

  Returns:
    True if email addresses should be published to the logged-in user.
  """
  viewer = user_auth.user_pb

  # Case 1: Anon users don't see anything revealed.
  if viewer is None:
    return False

  # Case 2: site admins always see unobscured email addresses.
  if viewer.is_site_admin:
    return True

  # Case 3: Project members see the unobscured email of everyone in a project.
  if project and UserIsInProject(project, user_auth.effective_ids):
    return True

  # Case 4: Do not obscure your own email.  Everything else stays hidden.
  return bool(viewed_email and viewer.email == viewed_email)
| |
| |
def ParseAndObscureAddress(email):
  # type: (str) -> Tuple[str, str, str, str]
  """Split an email into username and domain, and build obscured variants.

  Args:
    email: string email address to process

  Returns:
    A 4-tuple (username, domain, obscured_username, obscured_email).
    The obscured_username is truncated more aggressively than how Google Groups
    does it: it keeps at most 5 characters, or drops the last 3 characters,
    whichever results in a shorter obscured_username (but it keeps at least
    one character when the username has any).  Anything from the first '+'
    onward in the username is ignored when obscuring.
  """
  # Split on the first '@' only.  An address without '@' does not fail: the
  # whole string becomes the username and the domain is empty.
  username, _at, user_domain = email.partition('@')

  base_username, _plus, _suffix = username.partition('+')
  visible_len = min(5, max(1, len(base_username) - 3))
  obscured_username = base_username[:visible_len]
  obscured_email = '%s...@%s' % (obscured_username, user_domain)

  return username, user_domain, obscured_username, obscured_email
| |
| |
def CreateUserDisplayNamesAndEmails(cnxn, services, user_auth, users):
  # type: (MonorailConnection, Services, AuthData,
  #     Collection[user_pb2.User]) ->
  #     Tuple[Mapping[int, str], Mapping[int, str]]
  """Create the display names and emails of the given users based on the
  current user.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_auth: AuthData object that identifies the logged in user.
    users: Collection of User PB objects.

  Returns:
    A Tuple containing two Dicts of user_ids to display names and user_ids to
    emails. If a given User does not have an email, there will be an empty
    string in both.
  """
  # NOTE: Currently only service accounts can have display_names set. For all
  # other users and service accounts with no display_names specified, we use the
  # obscured or unobscured emails for both `display_names` and `emails`.
  # See crbug.com/monorail/8510.
  display_names = {}
  emails = {}

  # Do a pass on simple display cases.
  maybe_revealed_users = []
  for user in users:
    if user.user_id == framework_constants.DELETED_USER_ID:
      display_names[user.user_id] = framework_constants.DELETED_USER_NAME
      emails[user.user_id] = ''
    elif not user.email:
      display_names[user.user_id] = ''
      emails[user.user_id] = ''
    elif not user.obscure_email:
      display_names[user.user_id] = user.email
      emails[user.user_id] = user.email
    else:
      # Default to hiding user email.
      (_username, _domain, _obs_username,
       obs_email) = ParseAndObscureAddress(user.email)
      display_names[user.user_id] = obs_email
      emails[user.user_id] = obs_email
      maybe_revealed_users.append(user)

  # Reveal viewable emails.
  viewable_users = FilterViewableEmails(
      cnxn, services, user_auth, maybe_revealed_users)
  for user in viewable_users:
    display_names[user.user_id] = user.email
    emails[user.user_id] = user.email

  # Use Client.display_names for service accounts that have one specified.
  # The map does not depend on the user, so fetch it once instead of up to
  # twice per user.
  service_account_names = client_config_svc.GetServiceAccountMap()
  for user in users:
    if user.email in service_account_names:
      display_names[user.user_id] = service_account_names[user.email]

  return display_names, emails
| |
| |
def UserOwnsProject(project, effective_ids):
  """Return True if any of the effective_ids is a project owner.

  Args:
    project: Project PB whose owner_ids are checked; a missing or empty
      owner_ids is treated as "no owners".
    effective_ids: set of int user IDs for the current user.

  Returns:
    True if effective_ids and the project's owner_ids have an ID in common.
  """
  owner_ids = project.owner_ids or set()
  return not effective_ids.isdisjoint(owner_ids)
| |
| |
def UserIsInProject(project, effective_ids):
  """Return True if any of the effective_ids is a project member.

  Args:
    project: Project PB for the current project.
    effective_ids: set of int user IDs for the current user (including all
        user groups).  This will be an empty set for anonymous users.

  Returns:
    True if any of the effective_ids is an owner, committer, or contributor
    of the project, and False otherwise.
  """
  if UserOwnsProject(project, effective_ids):
    return True
  if not effective_ids.isdisjoint(project.committer_ids or set()):
    return True
  return not effective_ids.isdisjoint(project.contributor_ids or set())
| |
| |
def IsPriviledgedDomainUser(email):
  """Return True if the user's account is from a priviledged domain.

  Args:
    email: string email address of the user; may be None or empty.

  Returns:
    True if the part of the email after the first '@' is listed in
    settings.priviledged_user_domains.  False when there is no email or it
    contains no '@'.
  """
  if not email or '@' not in email:
    return False

  user_domain = email.split('@', 1)[1]
  return user_domain in settings.priviledged_user_domains
| |
| |
def IsValidColumnSpec(col_spec):
  # type: (str) -> bool
  """Return True if the given column specification is valid.

  Args:
    col_spec: string column specification to check against RE_COLUMN_SPEC.

  Returns:
    True if RE_COLUMN_SPEC matches col_spec, and False otherwise.
  """
  # Coerce to a real bool so callers get the documented type rather than a
  # match object or None.
  return bool(RE_COLUMN_SPEC.match(col_spec))
| |
| |
# String translation table to catch common typos in label names: every
# character listed below is deleted, and '=' is replaced with '-'.
_CANONICALIZATION_TRANSLATION_TABLE = dict.fromkeys(
    (ord(delete_u_char) for delete_u_char in
     u'!"#$%&\'()*+,/:;<>?@[\\]^`{|}~\t\n\x0b\x0c\r '))
_CANONICALIZATION_TRANSLATION_TABLE[ord(u'=')] = ord(u'-')
| |
| |
def CanonicalizeLabel(user_input):
  """Canonicalize a given label or status value.

  When the user enters a string that represents a label or an enum,
  convert it to a canonical form that makes it more likely to match
  existing values.

  Args:
    user_input: string that the user typed for a label.  Non-unicode input
      is decoded as UTF-8 first.  None is passed through unchanged.

  Returns:
    Canonical form of that label as a unicode string, or None if user_input
    was None.
  """
  if user_input is None:
    return None

  if isinstance(user_input, six.text_type):
    unicode_input = user_input
  else:
    unicode_input = user_input.decode('utf-8')

  return unicode_input.translate(_CANONICALIZATION_TRANSLATION_TABLE)
| |
| |
def MergeLabels(labels_list, labels_add, labels_remove, config):
  """Apply label additions and removals to a list of current labels.

  Label comparisons are case-insensitive.  Additions that are already present
  and removals that are not present are ignored.  A newly added label with an
  exclusive prefix replaces any existing label with that same prefix.

  Args:
    labels_list: list of current labels.
    labels_add: labels that the user wants to add.
    labels_remove: labels that the user wants to remove.
    config: ProjectIssueConfig with info about exclusive prefixes and
        enum fields.

  Returns:
    (merged_labels, update_labels_add, update_labels_remove):
    A new list of labels with the given labels added and removed, and
    any exclusive label prefixes taken into account.  Then two
    lists of update strings to explain the changes that were actually
    made.
  """
  current_lower = {lab.lower() for lab in labels_list}
  labels_add = [lab for lab in labels_add if lab.lower() not in current_lower]
  labels_remove = [lab for lab in labels_remove if lab.lower() in current_lower]
  removed_lower = {lab.lower() for lab in labels_remove}

  # Exclusive prefixes come from the config directly and from every
  # single-valued enum field.
  exclusive_prefixes = [
      prefix.lower() + '-' for prefix in config.exclusive_label_prefixes]
  exclusive_prefixes.extend(
      fd.field_name.lower() + '-' for fd in config.field_defs
      if (fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE and
          not fd.is_multivalued))

  # We match prefix strings rather than splitting on dash because
  # an exclusive-prefix or field name may contain dashes.
  def FirstMatchingPrefix(label, prefixes):
    lowered = label.lower()
    return next(
        (prefix_dash for prefix_dash in prefixes
         if lowered.startswith(prefix_dash)),
        None)

  # Dedup any added labels.  E.g., ignore attempts to add Priority twice:
  # only the first added label for each exclusive prefix is kept.
  claimed_prefixes = []
  dedupped_labels_add = []
  for lab in labels_add:
    prefix_dash = FirstMatchingPrefix(lab, exclusive_prefixes)
    if prefix_dash is None:
      dedupped_labels_add.append(lab)
    elif prefix_dash not in claimed_prefixes:
      claimed_prefixes.append(prefix_dash)
      dedupped_labels_add.append(lab)

  # Old labels survive unless a newly set exclusive label overwrites them.
  old_minus_excl = [
      lab for lab in labels_list
      if FirstMatchingPrefix(lab, claimed_prefixes) is None]

  merged_labels = [
      lab for lab in old_minus_excl + dedupped_labels_add
      if lab.lower() not in removed_lower]

  return merged_labels, dedupped_labels_add, labels_remove
| |
| |
# Pattern to match a valid hotlist name: an ASCII letter followed by any
# number of ASCII letters, digits, dashes, or periods.
RE_HOTLIST_NAME_PATTERN = r"[a-zA-Z][-0-9a-zA-Z\.]*"

# Compiled regexp to match the hotlist name and nothing more before or after.
# \Z is used rather than $ because $ also matches just before a trailing
# newline, which would let "name\n" pass as a valid hotlist name.
RE_HOTLIST_NAME = re.compile(
    r'^%s\Z' % RE_HOTLIST_NAME_PATTERN, re.VERBOSE)
| |
| |
def IsValidHotlistName(s):
  """Return a true value if the given string is a valid hotlist name.

  A valid name matches RE_HOTLIST_NAME and is no longer than
  framework_constants.MAX_HOTLIST_NAME_LENGTH characters.

  Args:
    s: string proposed hotlist name.

  Returns:
    None if s does not match RE_HOTLIST_NAME; otherwise True or False
    depending on whether s is within the length limit.
  """
  name_match = RE_HOTLIST_NAME.match(s)
  if not name_match:
    # Hand back the falsy match result itself, as `match and ...` would.
    return name_match
  return len(s) <= framework_constants.MAX_HOTLIST_NAME_LENGTH
| |
| |
# Matches exactly 'true' or 'false'.  The pattern is applied with .match(),
# which only anchors at the start, so \Z is needed to reject values that
# merely begin with 'true' or 'false' (e.g. 'trueXYZ' or 'true\n').
_BOOLEAN_PREF_RE = re.compile(r'(true|false)\Z')

# Maps each supported user pref name to the compiled regexp that its value
# must match.
USER_PREF_DEFS = {
    'code_font': _BOOLEAN_PREF_RE,
    'render_markdown': _BOOLEAN_PREF_RE,

    # These are for dismissible cues.  True means the user has dismissed them.
    'privacy_click_through': _BOOLEAN_PREF_RE,
    'corp_mode_click_through': _BOOLEAN_PREF_RE,
    'code_of_conduct': _BOOLEAN_PREF_RE,
    'dit_keystrokes': _BOOLEAN_PREF_RE,
    'italics_mean_derived': _BOOLEAN_PREF_RE,
    'availability_msgs': _BOOLEAN_PREF_RE,
    'your_email_bounced': _BOOLEAN_PREF_RE,
    'search_for_numbers': _BOOLEAN_PREF_RE,
    'restrict_new_issues': _BOOLEAN_PREF_RE,
    'public_issue_notice': _BOOLEAN_PREF_RE,
    'you_are_on_vacation': _BOOLEAN_PREF_RE,
    'how_to_join_project': _BOOLEAN_PREF_RE,
    'document_team_duties': _BOOLEAN_PREF_RE,
    'showing_ids_instead_of_tiles': _BOOLEAN_PREF_RE,
    'issue_timestamps': _BOOLEAN_PREF_RE,
    'stale_fulltext': _BOOLEAN_PREF_RE,
}
# Longest pref value, in characters, that the server accepts.
MAX_PREF_VALUE_LENGTH = 80
| |
| |
def ValidatePref(name, value):
  """Return an error message if the server does not support a pref value.

  Args:
    name: string name of the user pref; must be a key of USER_PREF_DEFS.
    value: string value proposed for that pref.

  Returns:
    A string describing the first problem found (unknown name, value longer
    than MAX_PREF_VALUE_LENGTH, or value not matching the pref's pattern),
    or None if the pref is acceptable.
  """
  value_pattern = USER_PREF_DEFS.get(name)
  if value_pattern is None:
    return 'Unknown pref name: %r' % name

  if len(value) > MAX_PREF_VALUE_LENGTH:
    return 'Value for pref name %r is too long' % name

  if value_pattern.match(value):
    return None
  return 'Invalid pref value %r for %r' % (value, name)
| |
| |
def IsRestrictNewIssuesUser(cnxn, services, user_id):
  # type: (MonorailConnection, Services, int) -> bool
  """Returns true iff user's new issues should be restricted by default.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_id: int ID of the user to check.

  Returns:
    True if the user is a member of at least one of the groups listed in
    settings.restrict_new_issues_user_groups.
  """
  memberships = services.usergroup.LookupMemberships(cnxn, user_id)
  # The configured group emails are resolved to IDs with autocreate=True.
  group_ids_by_email = services.user.LookupUserIDs(
      cnxn, settings.restrict_new_issues_user_groups, autocreate=True)
  restricting_group_ids = set(group_ids_by_email.values())
  return not restricting_group_ids.isdisjoint(memberships)
| |
| |
def IsPublicIssueNoticeUser(cnxn, services, user_id):
  # type: (MonorailConnection, Services, int) -> bool
  """Returns true iff user should see a public issue notice by default.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_id: int ID of the user to check.

  Returns:
    True if the user is a member of at least one of the groups listed in
    settings.public_issue_notice_user_groups.
  """
  memberships = services.usergroup.LookupMemberships(cnxn, user_id)
  # The configured group emails are resolved to IDs with autocreate=True.
  group_ids_by_email = services.user.LookupUserIDs(
      cnxn, settings.public_issue_notice_user_groups, autocreate=True)
  notice_group_ids = set(group_ids_by_email.values())
  return not notice_group_ids.isdisjoint(memberships)
| |
| |
def GetEffectiveIds(cnxn, services, user_ids):
  # type: (MonorailConnection, Services, Collection[int]) ->
  #   Mapping[int, Collection[int]]
  """Map each of the given user IDs to that user's set of effective IDs.

  The effective IDs of a user are: the user's own ID, the user groups that
  the user is a member of, and -- for users that have an email address --
  the groups computed from their email domain, the IDs of their linked
  parent and child accounts, and the group memberships of those linked
  accounts.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    user_ids: Collection of int user IDs to look up.

  Returns:
    The mapping returned by LookupAllMemberships for user_ids, with each
    value extended in place as described above.
  """
  # Get direct memberships for user_ids.
  # NOTE(review): the sets returned here are mutated in place below; confirm
  # that LookupAllMemberships does not hand out shared/cached set objects.
  effective_ids_by_user_id = services.usergroup.LookupAllMemberships(
      cnxn, user_ids)

  # Every user is always one of their own effective IDs.
  for user_id in effective_ids_by_user_id:
    effective_ids_by_user_id[user_id].add(user_id)

  # Get User objects for user_ids.
  users_by_id = services.user.GetUsersByIDs(cnxn, user_ids)
  for user_id, user in users_by_id.items():
    if not user or not user.email:
      continue

    effective_ids = effective_ids_by_user_id[user_id]
    effective_ids.update(
        _ComputeMembershipsByEmail(cnxn, services, user.email))

    # Collect related parent and child ids.
    related_ids = []
    if user.linked_parent_id:
      related_ids.append(user.linked_parent_id)
    if user.linked_child_ids:
      related_ids.extend(user.linked_child_ids)
    if not related_ids:
      continue

    # Add the related accounts themselves plus all of their memberships.
    effective_ids.update(related_ids)
    memberships_by_related_id = services.usergroup.LookupAllMemberships(
        cnxn, related_ids)
    for related_memberships in memberships_by_related_id.values():
      effective_ids.update(related_memberships)

  return effective_ids_by_user_id
| |
| |
def _ComputeMembershipsByEmail(cnxn, services, email):
  # type: (MonorailConnection, Services, str) -> Collection[int]
  """Return the computed user groups for the domain of the given email.

  Args:
    cnxn: MonorailConnection to the database.
    services: Services object for connections to backend services.
    email: string email address of the user.

  Returns:
    The result of LookupComputedMemberships for the email's domain.
  """
  # Only the domain (second item of the parsed 4-tuple) is needed to compute
  # the memberships of the user.
  user_email_domain = ParseAndObscureAddress(email)[1]
  return services.usergroup.LookupComputedMemberships(cnxn, user_email_domain)