| # Copyright 2016 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Helper functions for custom field sevlets.""" |
| from __future__ import print_function |
| from __future__ import division |
| from __future__ import absolute_import |
| |
| import collections |
| import itertools |
| import logging |
| import re |
| import settings |
| |
| from google.appengine.api import app_identity |
| |
| from features import autolink_constants |
| from framework import authdata |
| from framework import exceptions |
| from framework import framework_bizobj |
| from framework import framework_constants |
| from framework import permissions |
| from framework import timestr |
| from framework import validate |
| from mrproto import tracker_pb2 |
| from services import config_svc |
| from tracker import tracker_bizobj |
| |
| |
| INVALID_USER_ID = -1 |
| |
| ParsedFieldDef = collections.namedtuple( |
| 'ParsedFieldDef', |
| 'field_name, field_type_str, min_value, max_value, regex, ' |
| 'needs_member, needs_perm, grants_perm, notify_on, is_required, ' |
| 'is_niche, importance, is_multivalued, field_docstring, choices_text, ' |
| 'applicable_type, applicable_predicate, revised_labels, date_action_str, ' |
| 'approvers_str, survey, parent_approval_name, is_phase_field, ' |
| 'is_restricted_field') |
| |
| |
| def ListApplicableFieldDefs(issues, config): |
| # type: (Sequence[mrproto.tracker_pb2.Issue], |
| # mrproto.tracker_pb2.ProjectIssueConfig) -> |
| # Sequence[mrproto.tracker_pb2.FieldDef] |
| """Return the applicable FieldDefs for the given issues. """ |
| issue_labels = [] |
| issue_approval_ids = [] |
| for issue in issues: |
| issue_labels.extend(issue.labels) |
| issue_approval_ids.extend( |
| [approval.approval_id for approval in issue.approval_values]) |
| labels_by_prefix = tracker_bizobj.LabelsByPrefix(list(set(issue_labels)), []) |
| types = set(labels_by_prefix.get('type', [])) |
| types_lower = [t.lower() for t in types] |
| applicable_fds = [] |
| for fd in config.field_defs: |
| if fd.is_deleted: |
| continue |
| if fd.field_id in issue_approval_ids: |
| applicable_fds.append(fd) |
| elif fd.field_type != tracker_pb2.FieldTypes.APPROVAL_TYPE and ( |
| not fd.applicable_type or fd.applicable_type.lower() in types_lower): |
| applicable_fds.append(fd) |
| return applicable_fds |
| |
| |
| def ParseFieldDefRequest(post_data, config): |
| """Parse the user's HTML form data to update a field definition.""" |
| field_name = post_data.get('name', '') |
| field_type_str = post_data.get('field_type') |
| # TODO(jrobbins): once a min or max is set, it cannot be completely removed. |
| min_value_str = post_data.get('min_value') |
| try: |
| min_value = int(min_value_str) |
| except (ValueError, TypeError): |
| min_value = None |
| max_value_str = post_data.get('max_value') |
| try: |
| max_value = int(max_value_str) |
| except (ValueError, TypeError): |
| max_value = None |
| regex = post_data.get('regex') |
| needs_member = 'needs_member' in post_data |
| needs_perm = post_data.get('needs_perm', '').strip() |
| grants_perm = post_data.get('grants_perm', '').strip() |
| notify_on_str = post_data.get('notify_on') |
| if notify_on_str in config_svc.NOTIFY_ON_ENUM: |
| notify_on = config_svc.NOTIFY_ON_ENUM.index(notify_on_str) |
| else: |
| notify_on = 0 |
| importance = post_data.get('importance') |
| is_required = (importance == 'required') |
| is_niche = (importance == 'niche') |
| is_multivalued = 'is_multivalued' in post_data |
| field_docstring = post_data.get('docstring', '') |
| choices_text = post_data.get('choices', '') |
| applicable_type = post_data.get('applicable_type', '') |
| applicable_predicate = '' # TODO(jrobbins): placeholder for future feature |
| revised_labels = _ParseChoicesIntoWellKnownLabels( |
| choices_text, field_name, config, field_type_str) |
| date_action_str = post_data.get('date_action') |
| approvers_str = post_data.get('approver_names', '').strip().rstrip(',') |
| survey = post_data.get('survey', '') |
| parent_approval_name = post_data.get('parent_approval_name', '') |
| # TODO(jojwang): monorail:3774, remove enum_type condition when |
| # phases can have labels. |
| is_phase_field = ('is_phase_field' in post_data) and ( |
| field_type_str not in ['approval_type', 'enum_type']) |
| is_restricted_field = 'is_restricted_field' in post_data |
| |
| return ParsedFieldDef( |
| field_name, field_type_str, min_value, max_value, regex, needs_member, |
| needs_perm, grants_perm, notify_on, is_required, is_niche, importance, |
| is_multivalued, field_docstring, choices_text, applicable_type, |
| applicable_predicate, revised_labels, date_action_str, approvers_str, |
| survey, parent_approval_name, is_phase_field, is_restricted_field) |
| |
| |
| def _ParseChoicesIntoWellKnownLabels( |
| choices_text, field_name, config, field_type_str): |
| """Parse a field's possible choices and integrate them into the config. |
| |
| Args: |
| choices_text: string with one label and optional docstring per line. |
| field_name: string name of the field definition being edited. |
| config: ProjectIssueConfig PB of the current project. |
| field_type_str: string name of the new field's type. None if an existing |
| field is being updated |
| |
| Returns: |
| A revised list of labels that can be used to update the config. |
| """ |
| fd = tracker_bizobj.FindFieldDef(field_name, config) |
| matches = framework_constants.IDENTIFIER_DOCSTRING_RE.findall(choices_text) |
| maskingFieldNames = [] |
| # wkls should only be masked by the field if it is an enum_type. |
| if (field_type_str == 'enum_type') or ( |
| fd and fd.field_type is tracker_pb2.FieldTypes.ENUM_TYPE): |
| maskingFieldNames.append(field_name.lower()) |
| |
| new_labels = [ |
| ('%s-%s' % (field_name, label), choice_docstring.strip(), False) |
| for label, choice_docstring in matches] |
| kept_labels = [ |
| (wkl.label, wkl.label_docstring, wkl.deprecated) |
| for wkl in config.well_known_labels |
| if not tracker_bizobj.LabelIsMaskedByField( |
| wkl.label, maskingFieldNames)] |
| revised_labels = kept_labels + new_labels |
| return revised_labels |
| |
| |
| def ShiftEnumFieldsIntoLabels( |
| labels, labels_remove, field_val_strs, field_val_strs_remove, config): |
| """Look at the custom field values and treat enum fields as labels. |
| |
| Args: |
| labels: list of labels to add/set on the issue. |
| labels_remove: list of labels to remove from the issue. |
| field_val_strs: {field_id: [val_str, ...]} of custom fields to add/set. |
| field_val_strs_remove: {field_id: [val_str, ...]} of custom fields to |
| remove. |
| config: ProjectIssueConfig PB including custom field definitions. |
| |
| SIDE-EFFECT: the labels and labels_remove lists will be extended with |
| key-value labels corresponding to the enum field values. Those field |
| entries will be removed from field_val_strs and field_val_strs_remove. |
| """ |
| for fd in config.field_defs: |
| if fd.field_type != tracker_pb2.FieldTypes.ENUM_TYPE: |
| continue |
| |
| if fd.field_id in field_val_strs: |
| labels.extend( |
| '%s-%s' % (fd.field_name, val) |
| for val in field_val_strs[fd.field_id] |
| if val and val != '--') |
| del field_val_strs[fd.field_id] |
| |
| if fd.field_id in field_val_strs_remove: |
| labels_remove.extend( |
| '%s-%s' % (fd.field_name, val) |
| for val in field_val_strs_remove[fd.field_id] |
| if val and val != '--') |
| del field_val_strs_remove[fd.field_id] |
| |
| |
| def ReviseApprovals(approval_id, approver_ids, survey, config): |
| revised_approvals = [( |
| approval.approval_id, approval.approver_ids, approval.survey) for |
| approval in config.approval_defs if |
| approval.approval_id != approval_id] |
| revised_approvals.append((approval_id, approver_ids, survey)) |
| return revised_approvals |
| |
| |
| def ParseOneFieldValue(cnxn, user_service, fd, val_str): |
| """Make one FieldValue PB from the given user-supplied string.""" |
| if fd.field_type == tracker_pb2.FieldTypes.INT_TYPE: |
| try: |
| return tracker_bizobj.MakeFieldValue( |
| fd.field_id, int(val_str), None, None, None, None, False) |
| except ValueError: |
| return None # TODO(jrobbins): should bounce |
| |
| elif fd.field_type == tracker_pb2.FieldTypes.STR_TYPE: |
| return tracker_bizobj.MakeFieldValue( |
| fd.field_id, None, val_str, None, None, None, False) |
| |
| elif fd.field_type == tracker_pb2.FieldTypes.USER_TYPE: |
| if val_str: |
| try: |
| user_id = user_service.LookupUserID(cnxn, val_str, autocreate=False) |
| except exceptions.NoSuchUserException: |
| # Set to invalid user ID to display error during the validation step. |
| user_id = INVALID_USER_ID |
| return tracker_bizobj.MakeFieldValue( |
| fd.field_id, None, None, user_id, None, None, False) |
| else: |
| return None |
| |
| elif fd.field_type == tracker_pb2.FieldTypes.DATE_TYPE: |
| try: |
| timestamp = timestr.DateWidgetStrToTimestamp(val_str) |
| return tracker_bizobj.MakeFieldValue( |
| fd.field_id, None, None, None, timestamp, None, False) |
| except ValueError: |
| return None # TODO(jrobbins): should bounce |
| |
| elif fd.field_type == tracker_pb2.FieldTypes.URL_TYPE: |
| val_str = FormatUrlFieldValue(val_str) |
| try: |
| return tracker_bizobj.MakeFieldValue( |
| fd.field_id, None, None, None, None, val_str, False) |
| except ValueError: |
| return None # TODO(jojwang): should bounce |
| |
| else: |
| logging.error('Cant parse field with unexpected type %r', fd.field_type) |
| return None |
| |
| |
| def ParseOnePhaseFieldValue(cnxn, user_service, fd, val_str, phase_ids): |
| """Return a list containing a FieldValue PB for each phase.""" |
| phase_fvs = [] |
| for phase_id in phase_ids: |
| # TODO(jojwang): monorail:3970, create the FieldValue once and find some |
| # proto2 CopyFrom() method to create a new one for each phase. |
| fv = ParseOneFieldValue(cnxn, user_service, fd, val_str) |
| if fv: |
| fv.phase_id = phase_id |
| phase_fvs.append(fv) |
| |
| return phase_fvs |
| |
| |
| def ParseFieldValues(cnxn, user_service, field_val_strs, phase_field_val_strs, |
| config, phase_ids_by_name=None): |
| """Return a list of FieldValue PBs based on the given dict of strings.""" |
| field_values = [] |
| for fd in config.field_defs: |
| if fd.is_phase_field and ( |
| fd.field_id in phase_field_val_strs) and phase_ids_by_name: |
| fvs_by_phase_name = phase_field_val_strs.get(fd.field_id, {}) |
| for phase_name, val_strs in fvs_by_phase_name.items(): |
| phase_ids = phase_ids_by_name.get(phase_name) |
| if not phase_ids: |
| continue |
| for val_str in val_strs: |
| field_values.extend( |
| ParseOnePhaseFieldValue( |
| cnxn, user_service, fd, val_str, phase_ids=phase_ids)) |
| # We do not save phase fields when there are no phases. |
| elif not fd.is_phase_field and (fd.field_id in field_val_strs): |
| for val_str in field_val_strs[fd.field_id]: |
| fv = ParseOneFieldValue(cnxn, user_service, fd, val_str) |
| if fv: |
| field_values.append(fv) |
| |
| return field_values |
| |
| |
| def ValidateLabels(cnxn, services, project_id, labels, ezt_errors=None): |
| """Validate labels to block creation of new labels for the Chromium project in |
| Monorail and return an error string or None. |
| |
| Args: |
| cnxn: MonorailConnection object. |
| services: Services object referencing services that can be queried. |
| project_id: Project ID. |
| labels: List of labels to be validated. |
| |
| Returns: |
| A string containing an error message if there was one. |
| """ |
| if settings.unit_test_mode or project_id in settings.label_freeze_project_ids: |
| new_labels = [ |
| l for l in labels if services.config.LookupLabelID( |
| cnxn, project_id, l, autocreate=False, case_sensitive=False) is None |
| and not settings.is_label_allowed(project_id, l) |
| ] |
| if len(new_labels) > 0: |
| err_msg = ( |
| "The creation of new labels is blocked for the Chromium project" |
| " in Monorail. To continue with editing your issue, please" |
| " remove: {} label(s).").format(", ".join(new_labels)) |
| if ezt_errors is not None: |
| ezt_errors.labels = err_msg |
| return err_msg |
| return None |
| |
| |
| def ValidateCustomFieldValue(cnxn, project, services, field_def, field_val): |
| # type: (MonorailConnection, mrproto.tracker_pb2.Project, Services, |
| # mrproto.tracker_pb2.FieldDef, mrproto.tracker_pb2.FieldValue) -> str |
| """Validate one custom field value and return an error string or None. |
| |
| Args: |
| cnxn: MonorailConnection object. |
| project: Project PB with info on the project the custom field belongs to. |
| services: Services object referencing services that can be queried. |
| field_def: FieldDef for the custom field we're validating against. |
| field_val: The value of the custom field. |
| |
| Returns: |
| A string containing an error message if there was one. |
| """ |
| if field_def.field_type == tracker_pb2.FieldTypes.INT_TYPE: |
| if (field_def.min_value is not None and |
| field_val.int_value < field_def.min_value): |
| return 'Value must be >= %d.' % field_def.min_value |
| if (field_def.max_value is not None and |
| field_val.int_value > field_def.max_value): |
| return 'Value must be <= %d.' % field_def.max_value |
| |
| elif field_def.field_type == tracker_pb2.FieldTypes.STR_TYPE: |
| if field_def.regex and field_val.str_value: |
| try: |
| regex = re.compile(field_def.regex) |
| if not regex.match(field_val.str_value): |
| return 'Value must match regular expression: %s.' % field_def.regex |
| except re.error: |
| logging.info('Failed to process regex %r with value %r. Allowing.', |
| field_def.regex, field_val.str_value) |
| return None |
| |
| elif field_def.field_type == tracker_pb2.FieldTypes.USER_TYPE: |
| field_val_user = services.user.GetUser(cnxn, field_val.user_id) |
| auth = authdata.AuthData.FromUser(cnxn, field_val_user, services) |
| if auth.user_pb.user_id == INVALID_USER_ID: |
| return 'User not found.' |
| if field_def.needs_member: |
| user_value_in_project = framework_bizobj.UserIsInProject( |
| project, auth.effective_ids) |
| if not user_value_in_project: |
| return 'User must be a member of the project.' |
| if field_def.needs_perm: |
| user_perms = permissions.GetPermissions( |
| auth.user_pb, auth.effective_ids, project) |
| has_perm = user_perms.CanUsePerm( |
| field_def.needs_perm, auth.effective_ids, project, []) |
| if not has_perm: |
| return 'User must have permission "%s".' % field_def.needs_perm |
| return None |
| |
| elif field_def.field_type == tracker_pb2.FieldTypes.DATE_TYPE: |
| # TODO(jrobbins): date validation |
| pass |
| |
| elif field_def.field_type == tracker_pb2.FieldTypes.URL_TYPE: |
| if field_val.url_value: |
| if not (validate.IsValidURL(field_val.url_value) |
| or autolink_constants.IS_A_SHORT_LINK_RE.match( |
| field_val.url_value) |
| or autolink_constants.IS_A_NUMERIC_SHORT_LINK_RE.match( |
| field_val.url_value) |
| or autolink_constants.IS_IMPLIED_LINK_RE.match( |
| field_val.url_value)): |
| return 'Value must be a valid url.' |
| |
| return None |
| |
| def ValidateCustomFields( |
| cnxn, services, field_values, config, project, ezt_errors=None, issue=None): |
| # type: (MonorailConnection, Services, |
| # Collection[mrproto.tracker_pb2.FieldValue], |
| # mrproto.tracker_pb2.ProjectConfig, mrproto.tracker_pb2.Project, |
| # Optional[EZTError], Optional[mrproto.tracker_pb2.Issue]) -> |
| # Sequence[str] |
| """Validate given fields and report problems in error messages.""" |
| fds_by_id = {fd.field_id: fd for fd in config.field_defs} |
| err_msgs = [] |
| |
| # Create a set of field_ids that have required values. If this set still |
| # contains items by the end of the function, there is an error. |
| required_fds = set() |
| if issue: |
| applicable_fds = ListApplicableFieldDefs([issue], config) |
| |
| lower_field_names = [fd.field_name.lower() for fd in applicable_fds] |
| label_prefixes = tracker_bizobj.LabelsByPrefix( |
| list(set(issue.labels)), lower_field_names) |
| |
| # Add applicable required fields to required_fds. |
| for fd in applicable_fds: |
| if not fd.is_required: |
| continue |
| |
| if (fd.field_type is tracker_pb2.FieldTypes.ENUM_TYPE and |
| fd.field_name.lower() in label_prefixes): |
| # Handle custom enum fields - they're a special case because their |
| # values are stored in labels instead of FieldValues. |
| continue |
| |
| required_fds.add(fd.field_id) |
| # Ensure that every field value entered is valid. ie: That users exist. |
| for fv in field_values: |
| # Remove field_ids from the required set when found. |
| if fv.field_id in required_fds: |
| required_fds.remove(fv.field_id) |
| |
| fd = fds_by_id.get(fv.field_id) |
| if fd: |
| err_msg = ValidateCustomFieldValue(cnxn, project, services, fd, fv) |
| |
| if err_msg: |
| err_msgs.append('Error for %r: %s' % (fv, err_msg)) |
| if ezt_errors: |
| ezt_errors.SetCustomFieldError(fv.field_id, err_msg) |
| |
| # Add errors for any fields still left in the required set. |
| for field_id in required_fds: |
| fd = fds_by_id.get(field_id) |
| err_msg = '%s field is required.' % (fd.field_name) |
| err_msgs.append(err_msg) |
| if ezt_errors: |
| ezt_errors.SetCustomFieldError(field_id, err_msg) |
| |
| return err_msgs |
| |
| |
| def AssertCustomFieldsEditPerms( |
| mr, config, field_vals, field_vals_remove, fields_clear, labels, |
| labels_remove): |
| """Check permissions for any kind of custom field edition attempt.""" |
| # TODO: When clearing phase_fields is possible, include it in this method. |
| field_ids = set() |
| |
| for fv in field_vals: |
| field_ids.add(fv.field_id) |
| for fvr in field_vals_remove: |
| field_ids.add(fvr.field_id) |
| for fd_id in fields_clear: |
| field_ids.add(fd_id) |
| |
| enum_fds_by_name = { |
| fd.field_name.lower(): fd.field_id |
| for fd in config.field_defs |
| if fd.field_type is tracker_pb2.FieldTypes.ENUM_TYPE and not fd.is_deleted |
| } |
| for label in itertools.chain(labels, labels_remove): |
| enum_field_name = tracker_bizobj.LabelIsMaskedByField( |
| label, enum_fds_by_name.keys()) |
| if enum_field_name: |
| field_ids.add(enum_fds_by_name.get(enum_field_name)) |
| |
| fds_by_id = {fd.field_id: fd for fd in config.field_defs} |
| for field_id in field_ids: |
| fd = fds_by_id.get(field_id) |
| if fd: |
| assert permissions.CanEditValueForFieldDef( |
| mr.auth.effective_ids, mr.perms, mr.project, |
| fd), 'No permission to edit certain fields.' |
| |
| |
| def ApplyRestrictedDefaultValues( |
| mr, config, field_vals, labels, template_field_vals, template_labels): |
| """Add default values of template fields that the user cannot edit. |
| |
| This method can be called by servlets where restricted field values that |
| a user cannot edit are displayed but do not get returned when the user |
| submits the form (and also assumes that previous assertions ensure these |
| conditions). These missing default values still need to be passed to the |
| services layer when a 'write' is done so that these default values do |
| not get removed. |
| |
| Args: |
| mr: MonorailRequest Object to hold info about the request and the user. |
| config: ProjectIssueConfig Object for the project. |
| field_vals: list of FieldValues that the user wants to save. |
| labels: list of labels that the user wants to save. |
| template_field_vals: list of FieldValues belonging to the template. |
| template_labels: list of labels belonging to the template. |
| |
| Side Effect: |
| The default values of a template that the user cannot edit are added |
| to 'field_vals' and 'labels'. |
| """ |
| |
| fds_by_id = {fd.field_id: fd for fd in config.field_defs} |
| for fv in template_field_vals: |
| fd = fds_by_id.get(fv.field_id) |
| if fd and not permissions.CanEditValueForFieldDef(mr.auth.effective_ids, |
| mr.perms, mr.project, fd): |
| field_vals.append(fv) |
| |
| fds_by_name = { |
| fd.field_name.lower(): fd |
| for fd in config.field_defs |
| if fd.field_type is tracker_pb2.FieldTypes.ENUM_TYPE and not fd.is_deleted |
| } |
| for label in template_labels: |
| enum_field_name = tracker_bizobj.LabelIsMaskedByField( |
| label, fds_by_name.keys()) |
| if enum_field_name: |
| fd = fds_by_name.get(enum_field_name) |
| if fd and not permissions.CanEditValueForFieldDef( |
| mr.auth.effective_ids, mr.perms, mr.project, fd): |
| labels.append(label) |
| |
| |
| def FormatUrlFieldValue(url_str): |
| """Check for and add 'https://' to a url string""" |
| if not url_str.startswith('http'): |
| return 'http://' + url_str |
| return url_str |
| |
| |
| def ReviseFieldDefFromParsed(parsed, old_fd): |
| """Creates new FieldDef based on an original FieldDef and parsed FieldDef""" |
| if parsed.date_action_str in config_svc.DATE_ACTION_ENUM: |
| date_action = config_svc.DATE_ACTION_ENUM.index(parsed.date_action_str) |
| else: |
| date_action = 0 |
| return tracker_bizobj.MakeFieldDef( |
| old_fd.field_id, old_fd.project_id, old_fd.field_name, old_fd.field_type, |
| parsed.applicable_type, parsed.applicable_predicate, parsed.is_required, |
| parsed.is_niche, parsed.is_multivalued, parsed.min_value, |
| parsed.max_value, parsed.regex, parsed.needs_member, parsed.needs_perm, |
| parsed.grants_perm, parsed.notify_on, date_action, parsed.field_docstring, |
| False, approval_id=old_fd.approval_id or None, |
| is_phase_field=old_fd.is_phase_field) |
| |
| |
| def ParsedFieldDefAssertions(mr, parsed): |
| """Checks if new/updated FieldDef is not violating basic assertions. |
| If the assertions are violated, the errors |
| will be included in the mr.errors. |
| |
| Args: |
| mr: MonorailRequest object used to hold |
| commonly info parsed from the request. |
| parsed: ParsedFieldDef object used to contain parsed info, |
| in this case regarding a custom field definition. |
| """ |
| # TODO(crbug/monorail/7275): This method is meant to eventually |
| # do all assertion checkings (shared by create/update fieldDef) |
| # and assign all mr.errors values. |
| if (parsed.is_required and parsed.is_niche): |
| mr.errors.is_niche = 'A field cannot be both required and niche.' |
| if parsed.date_action_str is not None and ( |
| parsed.date_action_str not in config_svc.DATE_ACTION_ENUM): |
| mr.errors.date_action = 'The date action should be either: ' + ', '.join( |
| config_svc.DATE_ACTION_ENUM) + '.' |
| if (parsed.min_value is not None and parsed.max_value is not None and |
| parsed.min_value > parsed.max_value): |
| mr.errors.min_value = 'Minimum value must be less than maximum.' |
| if parsed.regex: |
| try: |
| re.compile(parsed.regex) |
| except re.error: |
| mr.errors.regex = 'Invalid regular expression.' |