Project import generated by Copybara.
GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/tracker/test/issuebulkedit_test.py b/tracker/test/issuebulkedit_test.py
new file mode 100644
index 0000000..89d9bc3
--- /dev/null
+++ b/tracker/test/issuebulkedit_test.py
@@ -0,0 +1,892 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Unittests for monorail.tracker.issuebulkedit."""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import mock
+import os
+import unittest
+import webapp2
+
+from google.appengine.api import memcache
+from google.appengine.ext import testbed
+
+from framework import exceptions
+from framework import permissions
+from proto import tracker_pb2
+from services import service_manager
+from services import tracker_fulltext
+from testing import fake
+from testing import testing_helpers
+from tracker import issuebulkedit
+from tracker import tracker_bizobj
+from tracker import tracker_constants
+
+
+class Response(object):
+ """Stand-in assigned to servlet.response; tests only read `status`."""
+ def __init__(self):
+ self.status = None  # tests below expect 400 here after a rejected POST
+
+
+class IssueBulkEditTest(unittest.TestCase):
+ """Exercises the IssueBulkEdit servlet using services from testing.fake."""
+ def setUp(self):
+ """Create fakes, servlet, project 'proj' (789), owner 111, and GAE stubs."""
+ self.services = service_manager.Services(
+ features=fake.FeaturesService(),
+ project=fake.ProjectService(),
+ config=fake.ConfigService(),
+ issue=fake.IssueService(),
+ issue_star=fake.IssueStarService(),
+ user=fake.UserService(),
+ usergroup=fake.UserGroupService())
+ self.servlet = issuebulkedit.IssueBulkEdit(
+ 'req', 'res', services=self.services)
+ self.mr = testing_helpers.MakeMonorailRequest(
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET)
+ self.project = self.services.project.TestAddProject(
+ name='proj', project_id=789, owner_ids=[111])
+ self.cnxn = 'fake connection'
+ self.config = self.services.config.GetProjectConfig(
+ self.cnxn, self.project.project_id)
+ self.services.config.StoreConfig(self.cnxn, self.config)
+ self.owner = self.services.user.TestAddUser('owner@example.com', 111)
+
+ self.testbed = testbed.Testbed()
+ self.testbed.activate()
+ self.testbed.init_memcache_stub()
+ self.testbed.init_datastore_v3_stub()
+ self.mocked_methods = {}  # obj -> {attr name: original}; see tearDown
+
+ def tearDown(self):
+ """Deactivate the testbed and undo the patches saved by _MockMethods."""
+ self.testbed.deactivate()
+ for obj, items in self.mocked_methods.items():
+ for member, previous_value in items.items():
+ setattr(obj, member, previous_value)  # put the saved original back
+
+ def testAssertBasePermission(self):
+ """Contributor perms raise PermissionException; owner perms are accepted."""
+ mr = testing_helpers.MakeMonorailRequest(
+ perms=permissions.CONTRIBUTOR_ACTIVE_PERMISSIONSET)
+ self.assertRaises(permissions.PermissionException,
+ self.servlet.AssertBasePermission, mr)
+
+ self.servlet.AssertBasePermission(self.mr)  # self.mr has owner perms
+
+ def testGatherPageData(self):
+ """GPD reports num_issues == 1 when mr lists a single stored issue."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 0, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project)
+ mr.local_id_list = [local_id_1]
+
+ page_data = self.servlet.GatherPageData(mr)
+ self.assertEqual(1, page_data['num_issues'])
+
+ def testGatherPageData_CustomFieldEdition(self):
+ """GPD marks an unrestricted field editable and a restricted one not."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 0, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project, perms=permissions.PermissionSet([]))
+ mr.local_id_list = [local_id_1]
+ mr.auth.effective_ids = {222}
+
+ fd_not_restricted = tracker_bizobj.MakeFieldDef(
+ 123,
+ 789,
+ 'CPU',
+ tracker_pb2.FieldTypes.INT_TYPE,
+ None,
+ '',
+ False,
+ False,
+ False,
+ None,
+ None,
+ '',
+ False,
+ '',
+ '',
+ tracker_pb2.NotifyTriggers.NEVER,
+ 'no_action',
+ 'doc',
+ False,
+ is_restricted_field=False)
+ self.config.field_defs.append(fd_not_restricted)
+
+ fd_restricted = tracker_bizobj.MakeFieldDef(
+ 124,
+ 789,
+ 'CPU',
+ tracker_pb2.FieldTypes.INT_TYPE,
+ None,
+ '',
+ False,
+ False,
+ False,
+ None,
+ None,
+ '',
+ False,
+ '',
+ '',
+ tracker_pb2.NotifyTriggers.NEVER,
+ 'no_action',
+ 'doc',
+ False,
+ is_restricted_field=True)
+ self.config.field_defs.append(fd_restricted)
+
+ page_data = self.servlet.GatherPageData(mr)
+ self.assertTrue(page_data['fields'][0].is_editable)  # presumably fd 123
+ self.assertFalse(page_data['fields'][1].is_editable)  # presumably fd 124
+
+ def testGatherPageData_NoIssues(self):
+ """GPD raises InputException when the test sets no issue IDs on the mr."""
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project)
+ self.assertRaises(exceptions.InputException,
+ self.servlet.GatherPageData, mr)
+
+ def testGatherPageData_FilteredIssues(self):
+ """GPD raises HTTPException when every specified issue is filtered out."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789,
+ 1,
+ 'issue summary',
+ 'New',
+ 0,
+ reporter_id=111,
+ labels=['restrict-view-Googler'])
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project)
+ mr.local_id_list = [local_id_1]
+
+ self.assertRaises(webapp2.HTTPException,
+ self.servlet.GatherPageData, mr)
+
+ def testGatherPageData_TypeLabels(self):
+ """GPD returns exactly one field for an issue labeled type-customlabels."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789,
+ 1,
+ 'issue summary',
+ 'New',
+ 0,
+ reporter_id=111,
+ labels=['type-customlabels'])
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project)
+ mr.local_id_list = [local_id_1]
+
+ fd = tracker_bizobj.MakeFieldDef(
+ 123, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None,
+ '', False, False, False, None, None, '', False, '', '',
+ tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
+ self.config.field_defs.append(fd)
+
+ page_data = self.servlet.GatherPageData(mr)
+ self.assertEqual(1, len(page_data['fields']))
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData(self, _create_task_mock):
+ """PFD on a valid single-issue edit returns a list URL with saved=1."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ post_data = fake.PostData(
+ owner=['owner@example.com'], can=[1],
+ q=[''], colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self._MockMethods()
+ url = self.servlet.ProcessFormData(mr, post_data)
+ self.assertTrue('list?can=1&q=&saved=1' in url)
+
+ def testProcessFormData_NoIssues(self):
+ """PFD sets response status 400 when no issues are specified."""
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ post_data = fake.PostData()
+ self.servlet.response = Response()
+ self.servlet.ProcessFormData(mr, post_data)
+ # 400 == bad request
+ self.assertEqual(400, self.servlet.response.status)
+
+ def testProcessFormData_NoUser(self):
+ """PFD sets response status 400 when the mr carries no user_info."""
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project)
+ mr.local_id_list = [99999]
+ post_data = fake.PostData()
+ self.servlet.response = Response()
+ self.servlet.ProcessFormData(mr, post_data)
+ # 400 == bad request
+ self.assertEqual(400, self.servlet.response.status)
+
+ def testProcessFormData_CantComment(self):
+ """PFD sets response status 400 for a user with EMPTY_PERMISSIONSET."""
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.EMPTY_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [99999]
+ post_data = fake.PostData()
+ self.servlet.response = Response()
+ self.servlet.ProcessFormData(mr, post_data)
+ # 400 == bad request
+ self.assertEqual(400, self.servlet.response.status)
+
+ def testProcessFormData_CantEdit(self):
+ """PFD sets response status 400 for a user with contributor perms."""
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.CONTRIBUTOR_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [99999]
+ post_data = fake.PostData()
+ self.servlet.response = Response()
+ self.servlet.ProcessFormData(mr, post_data)
+ # 400 == bad request
+ self.assertEqual(400, self.servlet.response.status)
+
+ def testProcessFormData_CantMove(self):
+ """PFD rejects moves: committer perms, same project, unknown project."""
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.COMMITTER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [99999]
+ post_data = fake.PostData(move_to=['proj'])
+ self.servlet.response = Response()
+ self.servlet.ProcessFormData(mr, post_data)
+ # 400 == bad request
+ self.assertEqual(400, self.servlet.response.status)
+
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr.perms = permissions.OWNER_ACTIVE_PERMISSIONSET  # same mr, owner perms
+ mr.local_id_list = [local_id_1]
+ mr.project_name = 'proj'
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)  # move_to is still 'proj'
+ self.assertEqual(
+ 'The issues are already in project proj', mr.errors.move_to)
+
+ post_data = fake.PostData(move_to=['notexist'])
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual('No such project: notexist', mr.errors.move_to)
+
+ def _MockMethods(self):
+ # Replace tracker_fulltext indexing and servlet.PleaseCorrect with no-ops.
+ self.mocked_methods[tracker_fulltext] = {  # originals, for tearDown
+ 'IndexIssues': tracker_fulltext.IndexIssues,
+ 'UnindexIssues': tracker_fulltext.UnindexIssues}
+ def DoNothing(*_args, **_kwargs):
+ pass
+ self.servlet.PleaseCorrect = DoNothing  # not recorded in mocked_methods
+ tracker_fulltext.IndexIssues = DoNothing
+ tracker_fulltext.UnindexIssues = DoNothing
+
+ def GetFirstAmendment(self, project_id, local_id):  # -> (field, newvalue)
+ issue = self.services.issue.GetIssueByLocalID(
+ self.cnxn, project_id, local_id)
+ issue_id = issue.issue_id
+ comments = self.services.issue.GetCommentsForIssue(self.cnxn, issue_id)
+ last_comment = comments[-1]  # last comment returned for the issue
+ first_amendment = last_comment.amendments[0]
+ return first_amendment.field, first_amendment.newvalue
+
+ def testProcessFormData_BadUserField(self):
+ """PFD reports 'User not found.' for an unknown USER_TYPE field value."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ fd = tracker_bizobj.MakeFieldDef(
+ 12345, 789, 'PM', tracker_pb2.FieldTypes.USER_TYPE, None,
+ '', False, False, False, None, None, '', False, '', '',
+ tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
+ self.config.field_defs.append(fd)
+
+ post_data = fake.PostData(
+ custom_12345=['ghost@gmail.com'], owner=['owner@example.com'], can=[1],
+ q=[''], colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual('User not found.', mr.errors.custom_fields[0].message)
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData_CustomFields(self, _create_task_mock):
+ """PFD records an INT_TYPE field edit as a CUSTOM amendment of '10'."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ fd = tracker_bizobj.MakeFieldDef(
+ 12345, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None,
+ '', False, False, False, None, None, '', False, '', '',
+ tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
+ self.config.field_defs.append(fd)
+
+ post_data = fake.PostData(
+ custom_12345=['10'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual(
+ (tracker_pb2.FieldID.CUSTOM, '10'),
+ self.GetFirstAmendment(789, local_id_1))
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData_RestrictedCustomFieldsAccept(self, _create_task_mock):
+ """PFD accepts a restricted-field edit by a user listed in editor_ids."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.PermissionSet(
+ [
+ permissions.EDIT_ISSUE, permissions.ADD_ISSUE_COMMENT,
+ permissions.VIEW
+ ]),
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ fd = tracker_bizobj.MakeFieldDef(
+ 12345,
+ 789,
+ 'CPU',
+ tracker_pb2.FieldTypes.INT_TYPE,
+ None,
+ '',
+ False,
+ False,
+ False,
+ None,
+ None,
+ '',
+ False,
+ '',
+ '',
+ tracker_pb2.NotifyTriggers.NEVER,
+ 'no_action',
+ 'doc',
+ False,
+ is_restricted_field=True)
+ fd.editor_ids = [111]  # the requesting user (user_id 111) is an editor
+ self.config.field_defs.append(fd)
+
+ post_data = fake.PostData(
+ custom_12345=['10'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual(
+ (tracker_pb2.FieldID.CUSTOM, '10'),
+ self.GetFirstAmendment(789, local_id_1))
+
+ def testProcessFormData_RestrictedCustomFieldsReject(self):
+ """PFD raises AssertionError on restricted-field edits by a non-editor."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.PermissionSet(
+ [
+ permissions.EDIT_ISSUE, permissions.ADD_ISSUE_COMMENT,
+ permissions.VIEW
+ ]),
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ fd_int = tracker_bizobj.MakeFieldDef(
+ 11111,
+ 789,
+ 'fd_int',
+ tracker_pb2.FieldTypes.INT_TYPE,
+ None,
+ '',
+ False,
+ False,
+ False,
+ None,
+ None,
+ '',
+ False,
+ '',
+ '',
+ tracker_pb2.NotifyTriggers.NEVER,
+ 'no_action',
+ 'doc',
+ False,
+ is_restricted_field=True)
+ fd_enum = tracker_bizobj.MakeFieldDef(
+ 44444,
+ 789,
+ 'fdEnum',
+ tracker_pb2.FieldTypes.ENUM_TYPE,
+ None,
+ '',
+ False,
+ False,
+ False,
+ None,
+ None,
+ '',
+ False,
+ '',
+ '',
+ tracker_pb2.NotifyTriggers.NEVER,
+ 'no_action',
+ 'doc',
+ False,
+ is_restricted_field=True)
+ fd_int.admin_ids = [222]  # requesting user 111 is not listed
+ fd_enum.editor_ids = [333]  # requesting user 111 is not listed
+ self.config.field_defs = [fd_int, fd_enum]
+
+ post_data_add_fv = fake.PostData(
+ custom_11111=['10'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+ post_data_rm_fv = fake.PostData(
+ op_custom_11111=['remove'],
+ custom_11111=['10'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+ post_data_clear_fd = fake.PostData(
+ op_custom_11111=['clear'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+ post_data_label_edits_enum = fake.PostData(
+ label=['fdEnum-a'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+ post_data_label_rm_enum = fake.PostData(
+ label=['-fdEnum-b'],
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100])
+
+ self._MockMethods()
+ self.assertRaises(
+ AssertionError, self.servlet.ProcessFormData, mr, post_data_add_fv)
+ self.assertRaises(
+ AssertionError, self.servlet.ProcessFormData, mr, post_data_rm_fv)
+ self.assertRaises(
+ AssertionError, self.servlet.ProcessFormData, mr, post_data_clear_fd)
+ self.assertRaises(
+ AssertionError, self.servlet.ProcessFormData, mr,
+ post_data_label_edits_enum)
+ self.assertRaises(
+ AssertionError, self.servlet.ProcessFormData, mr,
+ post_data_label_rm_enum)
+
+ def testProcessFormData_DuplicateStatus_MergeSameIssue(self):
+ """PFD rejects a Duplicate merge whose target is among the edited issues."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+
+ created_issue_2 = fake.MakeTestIssue(  # same local_id (1) as issue 1
+ 789, 1, 'issue summary', 'New', 112, reporter_id=112)
+ self.services.issue.TestAddIssue(created_issue_2)
+ merge_into_local_id_2 = created_issue_2.local_id
+
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1, merge_into_local_id_2]
+ mr.project_name = 'proj'
+
+ # Add required project_name to merge_into_issue.
+ merge_into_issue = self.services.issue.GetIssueByLocalID(
+ mr.cnxn, self.project.project_id, merge_into_local_id_2)
+ merge_into_issue.project_name = 'proj'
+
+ post_data = fake.PostData(status=['Duplicate'],
+ merge_into=[str(merge_into_local_id_2)], owner=['owner@example.com'],
+ can=[1], q=[''], colspec=[''], sort=[''], groupby=[''], start=[0],
+ num=[100])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual('Cannot merge issue into itself', mr.errors.merge_into_id)
+
+ def testProcessFormData_DuplicateStatus_MergeMissingIssue(self):
+ """PFD asks for an issue ID when merge_into is not a usable issue ID."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ created_issue_2 = fake.MakeTestIssue(  # same local_id (1) as issue 1
+ 789, 1, 'issue summary2', 'New', 112, reporter_id=112)
+ self.services.issue.TestAddIssue(created_issue_2)
+ local_id_2 = created_issue_2.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1, local_id_2]
+ mr.project_name = 'proj'
+
+ post_data = fake.PostData(status=['Duplicate'],
+ merge_into=['non existant id'], owner=['owner@example.com'],
+ can=[1], q=[''], colspec=[''], sort=[''], groupby=[''], start=[0],
+ num=[100])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual('Please enter an issue ID', mr.errors.merge_into_id)
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData_DuplicateStatus_Success(self, _create_task_mock):
+ """PFD marks issues Duplicate and merges comment, CCs, stars into target."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ created_issue_2 = fake.MakeTestIssue(
+ 789, 2, 'issue summary2', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_2)
+ local_id_2 = created_issue_2.local_id
+ created_issue_3 = fake.MakeTestIssue(
+ 789, 3, 'issue summary3', 'New', 112, reporter_id=112)
+ self.services.issue.TestAddIssue(created_issue_3)
+ merge_into_local_id_3 = created_issue_3.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1, local_id_2]
+ mr.project_name = 'proj'
+
+ post_data = fake.PostData(status=['Duplicate'],
+ merge_into=[str(merge_into_local_id_3)], owner=['owner@example.com'],
+ can=[1], q=[''], colspec=[''], sort=[''], groupby=[''], start=[0],
+ num=[100])
+ self._MockMethods()
+
+ # Add project_name, CCs and starrers to the merge_into_issue.
+ merge_into_issue = self.services.issue.GetIssueByLocalID(
+ mr.cnxn, self.project.project_id, merge_into_local_id_3)
+ merge_into_issue.project_name = 'proj'
+ merge_into_issue.cc_ids = [113, 120]
+ self.services.issue_star.SetStar(
+ mr.cnxn, self.services, None, merge_into_issue.issue_id, 120, True)
+
+ # Add project_name, CCs and starrers to the source issues.
+ # Issue 1
+ issue_1 = self.services.issue.GetIssueByLocalID(
+ mr.cnxn, self.project.project_id, local_id_1)
+ issue_1.project_name = 'proj'
+ issue_1.cc_ids = [113, 114]
+ self.services.issue_star.SetStar(
+ mr.cnxn, self.services, None, issue_1.issue_id, 113, True)
+ # Issue 2
+ issue_2 = self.services.issue.GetIssueByLocalID(
+ mr.cnxn, self.project.project_id, local_id_2)
+ issue_2.project_name = 'proj'
+ issue_2.cc_ids = [113, 115, 118]
+ self.services.issue_star.SetStar(
+ mr.cnxn, self.services, None, issue_2.issue_id, 114, True)
+ self.services.issue_star.SetStar(
+ mr.cnxn, self.services, None, issue_2.issue_id, 115, True)
+
+ self.servlet.ProcessFormData(mr, post_data)
+
+ # Verify both source issues were updated.
+ self.assertEqual(
+ (tracker_pb2.FieldID.STATUS, 'Duplicate'),
+ self.GetFirstAmendment(self.project.project_id, local_id_1))
+ self.assertEqual(
+ (tracker_pb2.FieldID.STATUS, 'Duplicate'),
+ self.GetFirstAmendment(self.project.project_id, local_id_2))
+
+ # Verify that the merge into issue was updated with a comment.
+ comments = self.services.issue.GetCommentsForIssue(
+ self.cnxn, merge_into_issue.issue_id)
+ self.assertEqual(
+ 'Issue 1 has been merged into this issue.\n'
+ 'Issue 2 has been merged into this issue.', comments[-1].content)
+
+ # Verify CC lists and owner were merged to the merge_into issue.
+ self.assertEqual(
+ [113, 120, 114, 115, 118, 111], merge_into_issue.cc_ids)
+ # Verify new starrers were added to the merge_into issue.
+ self.assertEqual(4,
+ self.services.issue_star.CountItemStars(
+ self.cnxn, merge_into_issue.issue_id))
+ self.assertEqual([120, 113, 114, 115],
+ self.services.issue_star.LookupItemStarrers(
+ self.cnxn, merge_into_issue.issue_id))
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData_ClearStatus(self, _create_task_mock):
+ """PFD with op_statusenter=clear records a STATUS amendment of ''."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ post_data = fake.PostData(
+ op_statusenter=['clear'], owner=['owner@example.com'], can=[1],
+ q=[''], colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertEqual(
+ (tracker_pb2.FieldID.STATUS, ''), self.GetFirstAmendment(
+ 789, local_id_1))
+
+ def testProcessFormData_InvalidOwner(self):
+ """PFD records an error on mr.errors for the owner value 'invalid'."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 0, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+ post_data = fake.PostData(
+ owner=['invalid'])
+ self.servlet.response = Response()
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertTrue(mr.errors.AnyErrors())
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData_MoveTo(self, _create_task_mock):
+ """PFD with move_to=proj2 makes the issue retrievable from project 790."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ move_to_project = self.services.project.TestAddProject(
+ name='proj2', project_id=790, owner_ids=[111])
+
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.project_name = 'proj'
+ mr.local_id_list = [local_id_1]
+
+ self._MockMethods()
+ post_data = fake.PostData(
+ move_to=['proj2'], can=[1], q=[''],
+ colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self.servlet.response = Response()
+ self.servlet.ProcessFormData(mr, post_data)
+
+ issue = self.services.issue.GetIssueByLocalID(
+ self.cnxn, move_to_project.project_id, local_id_1)
+ self.assertIsNotNone(issue)
+
+ def testProcessFormData_InvalidBlockIssues(self):
+ """PFD reports 'Invalid issue ID' for unknown blocked_on/blocking IDs."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.project_name = 'proj'
+ mr.local_id_list = [local_id_1]
+
+ self._MockMethods()
+ post_data = fake.PostData(
+ op_blockedonenter=['append'], blocked_on=['12345'],
+ op_blockingenter=['append'], blocking=['54321'],
+ can=[1], q=[''],
+ colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self.servlet.ProcessFormData(mr, post_data)
+
+ self.assertEqual('Invalid issue ID 12345', mr.errors.blocked_on)
+ self.assertEqual('Invalid issue ID 54321', mr.errors.blocking)
+
+ def testProcessFormData_BlockIssuesOnItself(self):
+ """PFD refuses blocked_on/blocking values that name an edited issue."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+ created_issue_2 = fake.MakeTestIssue(  # same local_id (1) as issue 1
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_2)
+ local_id_2 = created_issue_2.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.project_name = 'proj'
+ mr.local_id_list = [local_id_1, local_id_2]
+
+ self._MockMethods()
+ post_data = fake.PostData(
+ op_blockedonenter=['append'], blocked_on=[str(local_id_1)],
+ op_blockingenter=['append'], blocking=[str(local_id_2)],
+ can=[1], q=[''],
+ colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self.servlet.ProcessFormData(mr, post_data)
+
+ self.assertEqual('Cannot block an issue on itself.', mr.errors.blocked_on)
+ self.assertEqual('Cannot block an issue on itself.', mr.errors.blocking)
+
+ @mock.patch('framework.cloud_tasks_helpers.create_task')
+ def testProcessFormData_NormalBlockIssues(self, _create_task_mock):
+ """PFD sets no errors for stored blocked_on/blocking issues 3 and 2."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+
+ created_issueid = fake.MakeTestIssue(
+ 789, 2, 'blocking', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issueid)
+ blocking_id = created_issueid.local_id
+
+ created_issueid = fake.MakeTestIssue(
+ 789, 3, 'blocked on', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issueid)
+ blocked_on_id = created_issueid.local_id
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.project_name = 'proj'
+ mr.local_id_list = [local_id_1]
+
+ self._MockMethods()
+ post_data = fake.PostData(
+ op_blockedonenter=['append'], blocked_on=[str(blocked_on_id)],
+ op_blockingenter=['append'], blocking=[str(blocking_id)],
+ can=[1], q=[''],
+ colspec=[''], sort=[''], groupby=[''], start=[0], num=[100])
+ self.servlet.ProcessFormData(mr, post_data)
+
+ self.assertIsNone(mr.errors.blocked_on)
+ self.assertIsNone(mr.errors.blocking)
+
+ def testProcessFormData_TooLongComment(self):
+ """PFD rejects MAX_COMMENT_CHARS 'c' characters padded by two spaces."""
+ created_issue_1 = fake.MakeTestIssue(
+ 789, 1, 'issue summary', 'New', 111, reporter_id=111)
+ self.services.issue.TestAddIssue(created_issue_1)
+ local_id_1 = created_issue_1.local_id
+
+ mr = testing_helpers.MakeMonorailRequest(
+ project=self.project,
+ perms=permissions.OWNER_ACTIVE_PERMISSIONSET,
+ user_info={'user_id': 111})
+ mr.local_id_list = [local_id_1]
+
+ post_data = fake.PostData(
+ owner=['owner@example.com'],
+ can=[1],
+ q=[''],
+ colspec=[''],
+ sort=[''],
+ groupby=[''],
+ start=[0],
+ num=[100],
+ comment=[' ' + 'c' * tracker_constants.MAX_COMMENT_CHARS + ' '])
+ self._MockMethods()
+ self.servlet.ProcessFormData(mr, post_data)
+ self.assertTrue(mr.errors.AnyErrors())
+ self.assertEqual('Comment is too long.', mr.errors.comment)