Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/framework/framework_bizobj.py b/framework/framework_bizobj.py
new file mode 100644
index 0000000..bacaec5
--- /dev/null
+++ b/framework/framework_bizobj.py
@@ -0,0 +1,512 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Business objects for Monorail's framework.
+
+These are classes and functions that operate on the objects that
+users care about in Monorail but that are not part of just one specific
+component: e.g., projects, users, and labels.
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import functools
+import itertools
+import re
+
+import six
+
+import settings
+from framework import exceptions
+from framework import framework_constants
+from proto import tracker_pb2
+from services import client_config_svc
+
+
+# Pattern to match a valid column header name (two or more characters).
+RE_COLUMN_NAME = r'\w+[\w+-.]*\w+'
+# Compiled regexp to match a valid column specification: zero or more column
+# names separated by single whitespace characters.  Raw string keeps \s valid.
+RE_COLUMN_SPEC = re.compile(r'(%s(\s%s)*)*$' % (RE_COLUMN_NAME, RE_COLUMN_NAME))
+
+
+def WhichUsersShareAProject(cnxn, services, user_effective_ids, other_users):
+  # type: (MonorailConnection, Services, Sequence[int],
+  #     Collection[user_pb2.User]) -> Collection[user_pb2.User]
+  """Filter other_users down to those sharing a project with the viewer.
+
+  Args:
+    cnxn: MonorailConnection to the database.
+    services: Services object for connections to backend services.
+    user_effective_ids: The viewing user's set of effective_ids.
+    other_users: The list of users to be filtered.
+
+  Returns:
+    A list, in the order of other_users, of the users that either overlap
+    with user_effective_ids or share a project with at least one of them.
+  """
+  viewer_memberships = services.project.GetProjectMemberships(
+      cnxn, user_effective_ids)
+  viewer_projects = set()
+  for projects in viewer_memberships.values():
+    viewer_projects.update(projects)
+
+  candidate_ids = [candidate.user_id for candidate in other_users]
+  effective_ids_by_id = GetEffectiveIds(cnxn, services, candidate_ids)
+
+  shared = []
+  for candidate in other_users:
+    candidate_effective_ids = effective_ids_by_id[candidate.user_id]
+    # An overlap in effective ids means this is the viewer: never filtered.
+    if not any(uid in user_effective_ids for uid in candidate_effective_ids):
+      candidate_memberships = services.project.GetProjectMemberships(
+          cnxn, candidate_effective_ids)
+      if not any(
+          project in viewer_projects
+          for projects in candidate_memberships.values()
+          for project in projects):
+        continue
+    shared.append(candidate)
+  return shared
+
+
+def FilterViewableEmails(cnxn, services, user_auth, other_users):
+  # type: (MonorailConnection, Services, AuthData,
+  #     Collection[user_pb2.User]) -> Collection[user_pb2.User]
+  """Select the users whose email addresses `user_auth` is allowed to see.
+
+  Args:
+    cnxn: MonorailConnection to the database.
+    services: Services object for connections to backend services.
+    user_auth: The AuthData of the user viewing the email addresses.
+    other_users: The list of users to be filtered for email visibility.
+
+  Returns:
+    Collection of users whose emails should be revealed to the viewer.
+  """
+  viewer = user_auth.user_pb
+  if viewer is None:
+    # Anonymous viewers never see a revealed address.
+    return []
+
+  if viewer.is_site_admin:
+    # Site admins see every address unobscured.
+    return other_users
+
+  # So do members of any group in settings.full_emails_perm_groups.
+  viewer_ids = user_auth.effective_ids
+  if any(services.usergroup.LookupUserGroupID(cnxn, group_email) in viewer_ids
+         for group_email in settings.full_emails_perm_groups):
+    return other_users
+
+  # Everyone else sees only the addresses of users they share a project with.
+  return WhichUsersShareAProject(
+      cnxn, services, viewer_ids, other_users)
+
+
+def DoUsersShareAProject(cnxn, services, user_effective_ids, other_user_id):
+  # type: (MonorailConnection, Services, Sequence[int], int) -> bool
+  """Determine whether two users share at least one Project.
+
+  The user_effective_ids may include group ids or the other_user_id may be a
+  member of a group that results in transitive Project ownership.
+
+  Args:
+    cnxn: MonorailConnection to the database.
+    services: Services object for connections to backend services.
+    user_effective_ids: The effective ids of the authorized User.
+    other_user_id: The other user's user_id to compare against.
+
+  Returns:
+    True if one or more Projects are shared between the Users.
+  """
+  projects_by_user_effective_id = services.project.GetProjectMemberships(
+      cnxn, user_effective_ids)
+  # A set, not a one-shot iterator: `in` would exhaust a bare chain object.
+  authed_user_projects = set(
+      itertools.chain.from_iterable(projects_by_user_effective_id.values()))
+
+  # GetEffectiveIds takes a collection and returns {user_id: effective_ids}.
+  other_user_effective_ids = GetEffectiveIds(
+      cnxn, services, [other_user_id])[other_user_id]
+  other_memberships = services.project.GetProjectMemberships(
+      cnxn, other_user_effective_ids)
+  other_projects = itertools.chain.from_iterable(other_memberships.values())
+  return any(project in authed_user_projects for project in other_projects)
+
+
+# TODO(https://crbug.com/monorail/8192): Remove this method.
+def DeprecatedShouldRevealEmail(user_auth, project, viewed_email):
+  # type: (AuthData, Project, str) -> bool
+  """Decide whether to publish a user's email address (deprecated V1 API).
+
+  This is deprecated V1 API logic that is slated for removal. Avoid
+  updating this method.
+
+  Args:
+    user_auth: The AuthData of the user viewing the email addresses.
+    project: The Project PB to which the viewed user belongs.
+    viewed_email: The email of the viewed user.
+
+  Returns:
+    True if email addresses should be published to the logged-in user.
+  """
+  viewer = user_auth.user_pb
+  # Anonymous users never see anything revealed.
+  if viewer is None:
+    return False
+
+  # Site admins always see unobscured email addresses.
+  if viewer.is_site_admin:
+    return True
+
+  # Project members see the unobscured email of everyone in the project.
+  if project and UserIsInProject(project, user_auth.effective_ids):
+    return True
+
+  # Otherwise the only address revealed is the viewer's own: a user's own
+  # email is never obscured from them.
+  return bool(viewed_email and viewer.email == viewed_email)
+
+
+def ParseAndObscureAddress(email):
+  # type: (str) -> Tuple[str, str, str, str]
+  """Split an email address into its parts and build an obscured form of it.
+
+  Args:
+    email: string email address to process
+
+  Returns:
+    A 4-tuple (username, domain, obscured_username, obscured_email).
+    The obscured_username is cut from the part of the username before any
+    '+': it is truncated at 5 characters or has 3 characters truncated OFF,
+    whichever is shorter, using a cutoff of at least 1 character.
+  """
+  # partition() leaves the domain empty when there is no '@', so an
+  # unexpected address format in the User table does not cause a failure.
+  username, _at, user_domain = email.partition('@')
+
+  base_username = username.partition('+')[0]
+  visible_len = max(1, len(base_username) - 3)
+  if visible_len > 5:
+    visible_len = 5
+  obscured_username = base_username[:visible_len]
+  obscured_email = '%s...@%s' % (obscured_username, user_domain)
+  return username, user_domain, obscured_username, obscured_email
+
+
+def CreateUserDisplayNamesAndEmails(cnxn, services, user_auth, users):
+  # type: (MonorailConnection, Services, AuthData,
+  #     Collection[user_pb2.User]) ->
+  #     Tuple[Mapping[int, str], Mapping[int, str]]
+  """Create the display names and emails of the given users based on the
+     current user.
+
+  Args:
+    cnxn: MonorailConnection to the database.
+    services: Services object for connections to backend services.
+    user_auth: AuthData object that identifies the logged in user.
+    users: Collection of User PB objects.
+
+  Returns:
+    A Tuple containing two Dicts of user_ids to display names and user_ids to
+        emails. If a given User does not have an email, there will be an empty
+        string in both.
+  """
+  # NOTE: Currently only service accounts can have display_names set. For all
+  # other users and service accounts with no display_names specified, we use the
+  # obscured or unobscured emails for both `display_names` and `emails`.
+  # See crbug.com/monorail/8510.
+  display_names = {}
+  emails = {}
+
+  # Do a pass on simple display cases.
+  maybe_revealed_users = []
+  for user in users:
+    if user.user_id == framework_constants.DELETED_USER_ID:
+      display_names[user.user_id] = framework_constants.DELETED_USER_NAME
+      emails[user.user_id] = ''
+    elif not user.email:
+      display_names[user.user_id] = ''
+      emails[user.user_id] = ''
+    elif not user.obscure_email:
+      display_names[user.user_id] = user.email
+      emails[user.user_id] = user.email
+    else:
+      # Default to hiding user email.
+      (_username, _domain, _obs_username,
+       obs_email) = ParseAndObscureAddress(user.email)
+      display_names[user.user_id] = obs_email
+      emails[user.user_id] = obs_email
+      maybe_revealed_users.append(user)
+
+  # Reveal viewable emails.
+  viewable_users = FilterViewableEmails(
+      cnxn, services, user_auth, maybe_revealed_users)
+  for user in viewable_users:
+    display_names[user.user_id] = user.email
+    emails[user.user_id] = user.email
+
+  # Use Client.display_names for service accounts; fetch the map only once.
+  service_account_map = client_config_svc.GetServiceAccountMap()
+  for user in users:
+    if user.email in service_account_map:
+      display_names[user.user_id] = service_account_map[user.email]
+
+  return display_names, emails
+
+
+def UserOwnsProject(project, effective_ids):
+  """Return True iff project.owner_ids includes any of the effective_ids."""
+  return not effective_ids.isdisjoint(project.owner_ids or ())
+
+
+def UserIsInProject(project, effective_ids):
+  """Return True if any of the effective_ids has a role in the project.
+
+  Args:
+    project: Project PB for the current project.
+    effective_ids: set of int user IDs for the current user (including all
+        user groups).  This will be an empty set for anonymous users.
+
+  Returns:
+    True if any effective ID is an owner, committer, or contributor.
+  """
+  if UserOwnsProject(project, effective_ids):
+    return True
+  if not effective_ids.isdisjoint(project.committer_ids or set()):
+    return True
+  return not effective_ids.isdisjoint(project.contributor_ids or set())
+
+
+def IsPriviledgedDomainUser(email):
+  """Return True if email belongs to one of the priviledged user domains."""
+  if not email or '@' not in email:
+    return False
+
+  user_domain = email.partition('@')[2]
+  return user_domain in settings.priviledged_user_domains
+
+
+def IsValidColumnSpec(col_spec):
+  # type: (str) -> bool
+  """Return True if col_spec is a valid column specification, else False."""
+  return bool(RE_COLUMN_SPEC.match(col_spec))
+
+
+# String translation table to catch common typos in label names: every
+# punctuation or whitespace character listed here is deleted, and '=' is
+# rewritten as '-'.
+_CANONICALIZATION_TRANSLATION_TABLE = dict.fromkeys(
+    map(ord, u'!"#$%&\'()*+,/:;<>?@[\\]^`{|}~\t\n\x0b\x0c\r '))
+_CANONICALIZATION_TRANSLATION_TABLE[ord(u'=')] = ord(u'-')
+
+
+def CanonicalizeLabel(user_input):
+  """Put a label or status value typed by the user into canonical form.
+
+  Characters that are common typos in label names are deleted and '='
+  becomes '-', which makes the value more likely to match existing
+  values.
+
+  Args:
+    user_input: string that the user typed for a label, or None.
+
+  Returns:
+    Canonical form of that label as a unicode string, or None if
+    user_input was None.
+  """
+  if user_input is None:
+    return None
+
+  text = user_input
+  if not isinstance(text, six.text_type):
+    text = text.decode('utf-8')  # Byte strings are decoded as UTF-8.
+  return text.translate(_CANONICALIZATION_TRANSLATION_TABLE)
+
+
+def MergeLabels(labels_list, labels_add, labels_remove, config):
+  """Apply label additions and removals to a list of current labels.
+
+  Labels are compared case-insensitively.  A label being added that starts
+  with an exclusive prefix replaces any current label with that prefix, and
+  only the first added label for each exclusive prefix is kept.
+
+  Args:
+    labels_list: list of current labels.
+    labels_add: labels that the user wants to add.
+    labels_remove: labels that the user wants to remove.
+    config: ProjectIssueConfig with info about exclusive prefixes and
+        enum fields.
+
+  Returns:
+    (merged_labels, update_labels_add, update_labels_remove):
+    The new list of labels, followed by the labels that were actually
+    added (not already present, deduplicated by exclusive prefix) and the
+    labels that were actually removed (present in labels_list).
+  """
+  current_lower = set(lab.lower() for lab in labels_list)
+  labels_add = [
+      lab for lab in labels_add if lab.lower() not in current_lower]
+  labels_remove = [
+      lab for lab in labels_remove if lab.lower() in current_lower]
+  removed_lower = set(lab.lower() for lab in labels_remove)
+
+  # Exclusive prefixes come from the config and from single-valued enum fields.
+  exclusive_prefixes = []
+  for prefix in config.exclusive_label_prefixes:
+    exclusive_prefixes.append(prefix.lower() + '-')
+  exclusive_prefixes.extend(
+      fd.field_name.lower() + '-' for fd in config.field_defs
+      if fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE
+      and not fd.is_multivalued)
+
+  # Prefixes are matched with startswith() rather than by splitting on dash
+  # because an exclusive prefix or field name may itself contain dashes.
+  def _FindPrefix(label, prefixes):
+    label_lower = label.lower()
+    return next((p for p in prefixes if label_lower.startswith(p)), None)
+
+  # Keep only the first added label per exclusive prefix.  E.g., ignore
+  # attempts to add Priority twice.
+  seen_exclusive = []
+  labels_to_add = []
+  for lab in labels_add:
+    prefix = _FindPrefix(lab, exclusive_prefixes)
+    if prefix and prefix in seen_exclusive:
+      continue
+    if prefix:
+      seen_exclusive.append(prefix)
+    labels_to_add.append(lab)
+
+  # Drop current labels that are overwritten by a newly added exclusive label.
+  surviving_old = [
+      lab for lab in labels_list if not _FindPrefix(lab, seen_exclusive)]
+  merged_labels = [
+      lab for lab in surviving_old + labels_to_add
+      if lab.lower() not in removed_lower]
+
+  return merged_labels, labels_to_add, labels_remove
+
+
+# Pattern to match a valid hotlist name.
+RE_HOTLIST_NAME_PATTERN = r"[a-zA-Z][-0-9a-zA-Z\.]*"
+
+# Compiled regexp to match the hotlist name and nothing more before or after.
+# \Z is used rather than $ because $ also matches before a trailing newline.
+RE_HOTLIST_NAME = re.compile(r'^%s\Z' % RE_HOTLIST_NAME_PATTERN, re.VERBOSE)
+
+
+def IsValidHotlistName(s):
+  """Return a true value if s is a valid hotlist name of permitted length."""
+  name_match = RE_HOTLIST_NAME.match(s)
+  return name_match and len(s) <= framework_constants.MAX_HOTLIST_NAME_LENGTH
+
+
+# Allowed values per pref; \Z makes match() reject e.g. 'true' + extra text.
+USER_PREF_DEFS = {
+  'code_font': re.compile(r'(true|false)\Z'),
+  'render_markdown': re.compile(r'(true|false)\Z'),
+  # These are for dismissible cues.  True means the user has dismissed them.
+  'privacy_click_through': re.compile(r'(true|false)\Z'),
+  'corp_mode_click_through': re.compile(r'(true|false)\Z'),
+  'code_of_conduct': re.compile(r'(true|false)\Z'),
+  'dit_keystrokes': re.compile(r'(true|false)\Z'),
+  'italics_mean_derived': re.compile(r'(true|false)\Z'),
+  'availability_msgs': re.compile(r'(true|false)\Z'),
+  'your_email_bounced': re.compile(r'(true|false)\Z'),
+  'search_for_numbers': re.compile(r'(true|false)\Z'),
+  'restrict_new_issues': re.compile(r'(true|false)\Z'),
+  'public_issue_notice': re.compile(r'(true|false)\Z'),
+  'you_are_on_vacation': re.compile(r'(true|false)\Z'),
+  'how_to_join_project': re.compile(r'(true|false)\Z'),
+  'document_team_duties': re.compile(r'(true|false)\Z'),
+  'showing_ids_instead_of_tiles': re.compile(r'(true|false)\Z'),
+  'issue_timestamps': re.compile(r'(true|false)\Z'),
+  'stale_fulltext': re.compile(r'(true|false)\Z'),
+  }
+MAX_PREF_VALUE_LENGTH = 80
+
+
+def ValidatePref(name, value):
+  """Return an error string for an unsupported pref name or value, else None."""
+  pref_re = USER_PREF_DEFS.get(name)
+  if pref_re is None:
+    return 'Unknown pref name: %r' % name
+  if len(value) > MAX_PREF_VALUE_LENGTH:
+    return 'Value for pref name %r is too long' % name
+  is_valid = pref_re.match(value)
+  return None if is_valid else 'Invalid pref value %r for %r' % (value, name)
+
+
+def IsRestrictNewIssuesUser(cnxn, services, user_id):
+  # type: (MonorailConnection, Services, int) -> bool
+  """Returns true iff the user is in any restrict_new_issues user group."""
+  memberships = services.usergroup.LookupMemberships(cnxn, user_id)
+  restricted_groups = services.user.LookupUserIDs(
+      cnxn, settings.restrict_new_issues_user_groups, autocreate=True)
+  restricted_group_ids = set(restricted_groups.values())
+  return not restricted_group_ids.isdisjoint(memberships)
+
+
+def IsPublicIssueNoticeUser(cnxn, services, user_id):
+  # type: (MonorailConnection, Services, int) -> bool
+  """Returns true iff the user is in any public_issue_notice user group."""
+  memberships = services.usergroup.LookupMemberships(cnxn, user_id)
+  notice_groups = services.user.LookupUserIDs(
+      cnxn, settings.public_issue_notice_user_groups, autocreate=True)
+  notice_group_ids = set(notice_groups.values())
+  return not notice_group_ids.isdisjoint(memberships)
+
+
+def GetEffectiveIds(cnxn, services, user_ids):
+  # type: (MonorailConnection, Services, Collection[int]) ->
+  #   Mapping[int, Collection[int]]
+  """Map each of the given user IDs to that user's set of effective IDs.
+
+  The effective IDs of a user are the user's own ID, the IDs found by
+  LookupAllMemberships, the computed memberships for the domain of the
+  user's email, and the IDs of any linked parent or child accounts together
+  with the memberships of those linked accounts.
+  """
+  # Start from the direct memberships and add each user's own ID.
+  effective_ids_by_user_id = services.usergroup.LookupAllMemberships(
+      cnxn, user_ids)
+  for user_id in effective_ids_by_user_id:
+    effective_ids_by_user_id[user_id].add(user_id)
+
+  users_by_id = services.user.GetUsersByIDs(cnxn, user_ids)
+  for user_id, user in users_by_id.items():
+    if not user or not user.email:
+      continue
+    user_effective_ids = effective_ids_by_user_id[user_id]
+    user_effective_ids.update(
+        _ComputeMembershipsByEmail(cnxn, services, user.email))
+
+    # Collect the linked parent and child accounts, if any.
+    related_ids = []
+    if user.linked_parent_id:
+      related_ids.append(user.linked_parent_id)
+    related_ids.extend(user.linked_child_ids or [])
+    if not related_ids:
+      continue
+    # Add the linked accounts themselves and everything they are members of.
+    user_effective_ids.update(related_ids)
+    effective_ids_by_related_id = services.usergroup.LookupAllMemberships(
+        cnxn, related_ids)
+    user_effective_ids.update(*effective_ids_by_related_id.values())
+  return effective_ids_by_user_id
+
+
+def _ComputeMembershipsByEmail(cnxn, services, email):
+  # type: (MonorailConnection, Services, str) -> Collection[int]
+  """Return the computed user group memberships for a user's email address.
+
+  The lookup is keyed on the domain part of the email, as split out by
+  ParseAndObscureAddress.
+  """
+  user_email_domain = ParseAndObscureAddress(email)[1]
+  return services.usergroup.LookupComputedMemberships(cnxn, user_email_domain)