# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittest for the tracker helpers module."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import copy
import mock
import unittest
import io
import six

import settings

from businesslogic import work_env
from framework import exceptions
from framework import framework_constants
from framework import framework_helpers
from framework import permissions
from framework import template_helpers
from framework import urls
from mrproto import project_pb2
from mrproto import tracker_pb2
from mrproto import user_pb2
from services import service_manager
from testing import fake
from testing import testing_helpers
from tracker import tracker_bizobj
from tracker import tracker_constants
from tracker import tracker_helpers
from werkzeug.datastructures import FileStorage

TEST_ID_MAP = {
    'a@example.com': 1,
    'b@example.com': 2,
    'c@example.com': 3,
    'd@example.com': 4,
    }


def _Issue(project_name, local_id, summary='', status='', project_id=789):
  issue = tracker_pb2.Issue()
  issue.project_name = project_name
  issue.project_id = project_id
  issue.local_id = local_id
  issue.issue_id = 100000 + local_id
  issue.summary = summary
  issue.status = status
  return issue


def _MakeConfig():
  config = tracker_pb2.ProjectIssueConfig()
  config.well_known_statuses.append(tracker_pb2.StatusDef(
      means_open=True, status='New', deprecated=False))
  config.well_known_statuses.append(tracker_pb2.StatusDef(
      status='Old', means_open=False, deprecated=False))
  config.well_known_statuses.append(tracker_pb2.StatusDef(
      status='StatusThatWeDontUseAnymore', means_open=False, deprecated=True))

  return config


class HelpersTest(unittest.TestCase):

  def setUp(self):
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService())

    for email, user_id in TEST_ID_MAP.items():
      self.services.user.TestAddUser(email, user_id)

    self.services.project.TestAddProject('testproj', project_id=789)
    self.issue1 = fake.MakeTestIssue(789, 1, 'one', 'New', 111)
    self.issue1.project_name = 'testproj'
    self.services.issue.TestAddIssue(self.issue1)
    self.issue2 = fake.MakeTestIssue(789, 2, 'two', 'New', 111)
    self.issue2.project_name = 'testproj'
    self.services.issue.TestAddIssue(self.issue2)
    self.issue3 = fake.MakeTestIssue(789, 3, 'three', 'New', 111)
    self.issue3.project_name = 'testproj'
    self.services.issue.TestAddIssue(self.issue3)
    self.cnxn = 'fake connextion'
    self.errors = template_helpers.EZTError()
    self.default_colspec_param = 'colspec=%s' % (
        tracker_constants.DEFAULT_COL_SPEC.replace(' ', '%20'))
    self.services.usergroup.TestAddGroupSettings(999, 'group@example.com')

  def testParseIssueRequest_Empty(self):
    post_data = fake.PostData()
    errors = template_helpers.EZTError()
    parsed = tracker_helpers.ParseIssueRequest(
        'fake cnxn', post_data, self.services, errors, 'proj')
    self.assertEqual('', parsed.summary)
    self.assertEqual('', parsed.comment)
    self.assertEqual('', parsed.status)
    self.assertEqual('', parsed.users.owner_username)
    self.assertEqual(0, parsed.users.owner_id)
    self.assertEqual([], parsed.users.cc_usernames)
    self.assertEqual([], parsed.users.cc_usernames_remove)
    self.assertEqual([], parsed.users.cc_ids)
    self.assertEqual([], parsed.users.cc_ids_remove)
    self.assertEqual('', parsed.template_name)
    self.assertEqual([], parsed.labels)
    self.assertEqual([], parsed.labels_remove)
    self.assertEqual({}, parsed.fields.vals)
    self.assertEqual({}, parsed.fields.vals_remove)
    self.assertEqual([], parsed.fields.fields_clear)
    self.assertEqual('', parsed.blocked_on.entered_str)
    self.assertEqual([], parsed.blocked_on.iids)

  def testParseIssueRequest_Normal(self):
    post_data = fake.PostData({
        'summary': ['some summary'],
        'comment': ['some comment'],
        'status': ['SomeStatus'],
        'template_name': ['some template'],
        'label': ['lab1', '-lab2'],
        'custom_123': ['field1123a', 'field1123b'],
        })
    errors = template_helpers.EZTError()
    parsed = tracker_helpers.ParseIssueRequest(
        'fake cnxn', post_data, self.services, errors, 'proj')
    self.assertEqual('some summary', parsed.summary)
    self.assertEqual('some comment', parsed.comment)
    self.assertEqual('SomeStatus', parsed.status)
    self.assertEqual('', parsed.users.owner_username)
    self.assertEqual(0, parsed.users.owner_id)
    self.assertEqual([], parsed.users.cc_usernames)
    self.assertEqual([], parsed.users.cc_usernames_remove)
    self.assertEqual([], parsed.users.cc_ids)
    self.assertEqual([], parsed.users.cc_ids_remove)
    self.assertEqual('some template', parsed.template_name)
    self.assertEqual(['lab1'], parsed.labels)
    self.assertEqual(['lab2'], parsed.labels_remove)
    self.assertEqual({123: ['field1123a', 'field1123b']}, parsed.fields.vals)
    self.assertEqual({}, parsed.fields.vals_remove)
    self.assertEqual([], parsed.fields.fields_clear)

  def testMarkupDescriptionOnInput(self):
    content = 'What?\nthat\nWhy?\nidk\nWhere?\n'
    tmpl_txt = 'What?\nWhy?\nWhere?\nWhen?'
    desc = '<b>What?</b>\nthat\n<b>Why?</b>\nidk\n<b>Where?</b>\n'
    self.assertEqual(tracker_helpers.MarkupDescriptionOnInput(
        content, tmpl_txt), desc)

  def testMarkupDescriptionLineOnInput(self):
    line = 'What happened??'
    tmpl_lines = ['What happened??','Why?']
    self.assertEqual(tracker_helpers._MarkupDescriptionLineOnInput(
        line, tmpl_lines), '<b>What happened??</b>')

    line = 'Something terrible!!!'
    self.assertEqual(tracker_helpers._MarkupDescriptionLineOnInput(
        line, tmpl_lines), 'Something terrible!!!')

  def testClassifyPlusMinusItems(self):
    add, remove = tracker_helpers._ClassifyPlusMinusItems([])
    self.assertEqual([], add)
    self.assertEqual([], remove)

    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['', ' ', '  \t', '-'])
    six.assertCountEqual(self, [], add)
    six.assertCountEqual(self, [], remove)

    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['a', 'b', 'c'])
    six.assertCountEqual(self, ['a', 'b', 'c'], add)
    six.assertCountEqual(self, [], remove)

    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['a-a-a', 'b-b', 'c-'])
    six.assertCountEqual(self, ['a-a-a', 'b-b', 'c-'], add)
    six.assertCountEqual(self, [], remove)

    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['-a'])
    six.assertCountEqual(self, [], add)
    six.assertCountEqual(self, ['a'], remove)

    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['-a', 'b', 'c-c'])
    six.assertCountEqual(self, ['b', 'c-c'], add)
    six.assertCountEqual(self, ['a'], remove)

    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['-a', '-b-b', '-c-'])
    six.assertCountEqual(self, [], add)
    six.assertCountEqual(self, ['a', 'b-b', 'c-'], remove)

    # We dedup, but we don't cancel out items that are both added and removed.
    add, remove = tracker_helpers._ClassifyPlusMinusItems(
        ['a', 'a', '-a'])
    six.assertCountEqual(self, ['a'], add)
    six.assertCountEqual(self, ['a'], remove)

  def testParseIssueRequestFields(self):
    parsed_fields = tracker_helpers._ParseIssueRequestFields(fake.PostData({
        'custom_1': ['https://hello.com'],
        'custom_12': ['https://blah.com'],
        'custom_14': ['https://remove.com'],
        'custom_15_goats': ['2', '3'],
        'custom_15_sheep': ['3', '5'],
        'custom_16_sheep': ['yarn'],
        'op_custom_14': ['remove'],
        'op_custom_12': ['clear'],
        'op_custom_16_sheep': ['remove'],
        'ignore': 'no matter',}))
    self.assertEqual(
        parsed_fields,
        tracker_helpers.ParsedFields(
            {
                1: ['https://hello.com'],
                12: ['https://blah.com']
            }, {14: ['https://remove.com']}, [12],
            {15: {
                'goats': ['2', '3'],
                'sheep': ['3', '5']
            }}, {16: {
                'sheep': ['yarn']
            }}))

  def testParseIssueRequestAttachments(self):
    file1 = FileStorage(
        stream=io.BytesIO(b'hello world'),
        filename='hello.c',
    )
    file2 = FileStorage(
        stream=io.BytesIO(b'Welcome to our project'),
        filename='README',
    )

    file3 = FileStorage(
        stream=io.BytesIO(b'Abort, Retry, or Fail?'),
        filename='c:\\dir\\subdir\\FILENAME.EXT',
    )

    # Browsers send this if FILE field was not filled in.
    file4 = FileStorage(
        stream=io.BytesIO(b''),
        filename='',
    )

    attachments = tracker_helpers._ParseIssueRequestAttachments({})
    self.assertEqual([], attachments)

    attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({
        'file1': [file1],
        }))
    self.assertEqual([('hello.c', b'hello world', 'text/plain')], attachments)
    file1.seek(0)

    attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({
        'file1': [file1],
        'file2': [file2],
        }))
    self.assertEqual(
        [
            ('hello.c', b'hello world', 'text/plain'),
            ('README', b'Welcome to our project', 'text/plain')
        ], attachments)
    file1.seek(0)
    file2.seek(0)

    attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({
        'file3': [file3],
        }))
    self.assertEqual(
        [
            (
                'FILENAME.EXT', b'Abort, Retry, or Fail?',
                'application/octet-stream')
        ], attachments)
    file3.seek(0)

    attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({
        'file1': [file4],  # Does not appear in result
        'file3': [file3],
        'file4': [file4],  # Does not appear in result
        }))
    self.assertEqual(
        [
            (
                'FILENAME.EXT', b'Abort, Retry, or Fail?',
                'application/octet-stream')
        ], attachments)
    file3.seek(0)

  def testParseIssueRequestKeptAttachments(self):
    pass  # TODO(jrobbins): Write this test.

  def testParseIssueRequestUsers(self):
    post_data = {}
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('', parsed_users.owner_username)
    self.assertEqual(
        framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id)
    self.assertEqual([], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'owner': [''],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('', parsed_users.owner_username)
    self.assertEqual(
        framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id)
    self.assertEqual([], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'owner': [' \t'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('', parsed_users.owner_username)
    self.assertEqual(
        framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id)
    self.assertEqual([], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'owner': ['b@example.com'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('b@example.com', parsed_users.owner_username)
    self.assertEqual(TEST_ID_MAP['b@example.com'], parsed_users.owner_id)
    self.assertEqual([], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'owner': ['b@example.com'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('b@example.com', parsed_users.owner_username)
    self.assertEqual(TEST_ID_MAP['b@example.com'], parsed_users.owner_id)
    self.assertEqual([], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'cc': ['b@example.com'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('', parsed_users.owner_username)
    self.assertEqual(
        framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id)
    self.assertEqual(['b@example.com'], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([TEST_ID_MAP['b@example.com']], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'cc': ['-b@example.com, c@example.com,,'
               'a@example.com,'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('', parsed_users.owner_username)
    self.assertEqual(
        framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id)
    six.assertCountEqual(
        self, ['c@example.com', 'a@example.com'], parsed_users.cc_usernames)
    self.assertEqual(['b@example.com'], parsed_users.cc_usernames_remove)
    six.assertCountEqual(
        self, [TEST_ID_MAP['c@example.com'], TEST_ID_MAP['a@example.com']],
        parsed_users.cc_ids)
    self.assertEqual([TEST_ID_MAP['b@example.com']],
                      parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'owner': ['fuhqwhgads@example.com'],
        'cc': ['c@example.com, fuhqwhgads@example.com'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    self.assertEqual('fuhqwhgads@example.com', parsed_users.owner_username)
    gen_uid = framework_helpers.MurmurHash3_x86_32(parsed_users.owner_username)
    self.assertEqual(gen_uid, parsed_users.owner_id)  # autocreated user
    six.assertCountEqual(
        self, ['c@example.com', 'fuhqwhgads@example.com'],
        parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    six.assertCountEqual(
        self, [TEST_ID_MAP['c@example.com'], gen_uid], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

    post_data = fake.PostData({
        'cc': ['C@example.com, b@exAmple.cOm'],
        })
    parsed_users = tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)
    six.assertCountEqual(
        self, ['c@example.com', 'b@example.com'], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    six.assertCountEqual(
        self, [TEST_ID_MAP['c@example.com'], TEST_ID_MAP['b@example.com']],
        parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

  def testParseBlockers_BlockedOnNothing(self):
    """Was blocked on nothing, still nothing."""
    post_data = {tracker_helpers.BLOCKED_ON: ''}
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKED_ON)

    self.assertEqual('', parsed_blockers.entered_str)
    self.assertEqual([], parsed_blockers.iids)
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING))

  def testParseBlockers_BlockedOnAdded(self):
    """Was blocked on nothing; now 1, 2, 3."""
    post_data = {tracker_helpers.BLOCKED_ON: '1, 2, 3'}
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKED_ON)

    self.assertEqual('1, 2, 3', parsed_blockers.entered_str)
    self.assertEqual([100001, 100002, 100003], parsed_blockers.iids)
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING))

  def testParseBlockers_BlockedOnDuplicateRef(self):
    """Was blocked on nothing; now just 2, but repeated in input."""
    post_data = {tracker_helpers.BLOCKED_ON: '2, 2, 2'}
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKED_ON)

    self.assertEqual('2, 2, 2', parsed_blockers.entered_str)
    self.assertEqual([100002], parsed_blockers.iids)
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING))

  def testParseBlockers_Missing(self):
    """Parsing an input field that was not in the POST."""
    post_data = {}
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKED_ON)

    self.assertEqual('', parsed_blockers.entered_str)
    self.assertEqual([], parsed_blockers.iids)
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING))

  def testParseBlockers_SameIssueNoProject(self):
    """Adding same issue as blocker should modify the errors object."""
    post_data = {'id': '2', tracker_helpers.BLOCKING: '2, 3'}

    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKING)
    self.assertEqual('2, 3', parsed_blockers.entered_str)
    self.assertEqual([], parsed_blockers.iids)
    self.assertEqual(
        getattr(self.errors, tracker_helpers.BLOCKING),
        'Cannot be blocking the same issue')
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))

  def testParseBlockers_SameIssueSameProject(self):
    """Adding same issue as blocker should modify the errors object."""
    post_data = {'id': '2', tracker_helpers.BLOCKING: 'testproj:2, 3'}

    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKING)
    self.assertEqual('testproj:2, 3', parsed_blockers.entered_str)
    self.assertEqual([], parsed_blockers.iids)
    self.assertEqual(
        getattr(self.errors, tracker_helpers.BLOCKING),
        'Cannot be blocking the same issue')
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))

  def testParseBlockers_SameIssueDifferentProject(self):
    """Adding different blocker issue should not modify the errors object."""
    post_data = {'id': '2', tracker_helpers.BLOCKING: 'testproj:2'}

    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testprojB',
        tracker_helpers.BLOCKING)
    self.assertEqual('testproj:2', parsed_blockers.entered_str)
    self.assertEqual([100002], parsed_blockers.iids)
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING))
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))

  def testParseBlockers_Invalid(self):
    """Input fields with invalid values should modify the errors object."""
    post_data = {tracker_helpers.BLOCKING: '2, foo',
                 tracker_helpers.BLOCKED_ON: '3, bar'}

    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKING)
    self.assertEqual('2, foo', parsed_blockers.entered_str)
    self.assertEqual([100002], parsed_blockers.iids)
    self.assertEqual(
        getattr(self.errors, tracker_helpers.BLOCKING), 'Invalid issue ID foo')
    self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))

    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKED_ON)
    self.assertEqual('3, bar', parsed_blockers.entered_str)
    self.assertEqual([100003], parsed_blockers.iids)
    self.assertEqual(
        getattr(self.errors, tracker_helpers.BLOCKED_ON),
        'Invalid issue ID bar')

  def testParseBlockers_Dangling(self):
    """A ref to a sanctioned projected should be allowed."""
    post_data = {'id': '2', tracker_helpers.BLOCKING: 'otherproj:2'}
    real_codesite_projects = settings.recognized_codesite_projects
    settings.recognized_codesite_projects = ['otherproj']
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKING)
    self.assertEqual('otherproj:2', parsed_blockers.entered_str)
    self.assertEqual([('otherproj', 2)], parsed_blockers.dangling_refs)
    settings.recognized_codesite_projects = real_codesite_projects

  def testParseBlockers_FederatedReferences(self):
    """Should parse and return FedRefs."""
    post_data = {'id': '9', tracker_helpers.BLOCKING: '2, b/123, 3, b/789'}
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKING)
    self.assertEqual('2, b/123, 3, b/789', parsed_blockers.entered_str)
    self.assertEqual([100002, 100003], parsed_blockers.iids)
    self.assertEqual(['b/123', 'b/789'], parsed_blockers.federated_ref_strings)

  def testIsValidIssueOwner(self):
    project = project_pb2.Project()
    project.owner_ids.extend([1, 2])
    project.committer_ids.extend([3])
    project.contributor_ids.extend([4, 999])

    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, framework_constants.NO_USER_SPECIFIED,
        self.services)
    self.assertTrue(valid)

    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, 1,
        self.services)
    self.assertTrue(valid)
    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, 2,
        self.services)
    self.assertTrue(valid)
    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, 3,
        self.services)
    self.assertTrue(valid)
    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, 4,
        self.services)
    self.assertTrue(valid)

    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, 7,
        self.services)
    self.assertFalse(valid)

    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, 999,
        self.services)
    self.assertFalse(valid)

  # MakeViewsForUsersInIssuesTest is tested in MakeViewsForUsersInIssuesTest.

  def testGetAllowedOpenedAndClosedIssues(self):
    pass  # TOOD(jrobbins): Write this test.

  def testFormatIssueListURL_JumpedToIssue(self):
    """If we jumped to issue 123, the list is can=1&q=id-123."""
    config = tracker_pb2.ProjectIssueConfig()
    path = '/p/proj/issues/detail?id=123&q=123'
    mr = testing_helpers.MakeMonorailRequest(
        path=path, headers={'Host': 'code.google.com'})
    mr.ComputeColSpec(config)

    absolute_base_url = 'http://code.google.com'

    url_1 = tracker_helpers.FormatIssueListURL(mr, config)
    self.assertEqual(
        '%s/p/proj/issues/list?can=1&%s&q=id%%3D123' % (
            absolute_base_url, self.default_colspec_param),
        url_1)

  def testFormatIssueListURL_NoCurrentState(self):
    config = tracker_pb2.ProjectIssueConfig()
    path = '/p/proj/issues/detail?id=123'
    mr = testing_helpers.MakeMonorailRequest(
        path=path, headers={'Host': 'code.google.com'})
    mr.ComputeColSpec(config)

    absolute_base_url = 'http://code.google.com'

    url_1 = tracker_helpers.FormatIssueListURL(mr, config)
    self.assertEqual(
        '%s/p/proj/issues/list?%s&q=' % (
            absolute_base_url, self.default_colspec_param),
        url_1)

    url_2 = tracker_helpers.FormatIssueListURL(
        mr, config, foo=123)
    self.assertEqual(
        '%s/p/proj/issues/list?%s&foo=123&q=' % (
            absolute_base_url, self.default_colspec_param),
        url_2)

    url_3 = tracker_helpers.FormatIssueListURL(
        mr, config, foo=123, bar='abc')
    self.assertEqual(
        '%s/p/proj/issues/list?bar=abc&%s&foo=123&q=' % (
            absolute_base_url, self.default_colspec_param),
        url_3)

    url_4 = tracker_helpers.FormatIssueListURL(
        mr, config, baz='escaped+encoded&and100% "safe"')
    self.assertEqual(
        '%s/p/proj/issues/list?'
        'baz=escaped%%2Bencoded%%26and100%%25%%20%%22safe%%22&%s&q=' % (
            absolute_base_url, self.default_colspec_param),
        url_4)

  def testFormatIssueListURL_KeepCurrentState(self):
    config = tracker_pb2.ProjectIssueConfig()
    path = '/p/proj/issues/detail?id=123&sort=aa&colspec=a b c&groupby=d'
    mr = testing_helpers.MakeMonorailRequest(
        path=path, headers={'Host': 'localhost:8080'})
    mr.ComputeColSpec(config)

    absolute_base_url = 'http://localhost:8080'

    url_1 = tracker_helpers.FormatIssueListURL(mr, config)
    self.assertEqual(
        '%s/p/proj/issues/list?colspec=a%%20b%%20c'
        '&groupby=d&q=&sort=aa' % absolute_base_url,
        url_1)

    url_2 = tracker_helpers.FormatIssueListURL(
        mr, config, foo=123)
    self.assertEqual(
        '%s/p/proj/issues/list?'
        'colspec=a%%20b%%20c&foo=123&groupby=d&q=&sort=aa' % absolute_base_url,
        url_2)

    url_3 = tracker_helpers.FormatIssueListURL(
        mr, config, colspec='X Y Z')
    self.assertEqual(
        '%s/p/proj/issues/list?colspec=a%%20b%%20c'
        '&groupby=d&q=&sort=aa' % absolute_base_url,
        url_3)

  def testFormatRelativeIssueURL(self):
    self.assertEqual(
        '/p/proj/issues/attachment',
        tracker_helpers.FormatRelativeIssueURL(
            'proj', urls.ISSUE_ATTACHMENT))

    self.assertEqual(
        '/p/proj/issues/detail?id=123',
        tracker_helpers.FormatRelativeIssueURL(
            'proj', urls.ISSUE_DETAIL, id=123))

  @mock.patch('google.appengine.api.app_identity.get_application_id')
  def testFormatCrBugURL_Prod(self, mock_get_app_id):
    mock_get_app_id.return_value = 'monorail-prod'
    self.assertEqual(
        'https://crbug.com/proj/123',
        tracker_helpers.FormatCrBugURL('proj', 123))
    self.assertEqual(
        'https://crbug.com/123456',
        tracker_helpers.FormatCrBugURL('chromium', 123456))

  @mock.patch('google.appengine.api.app_identity.get_application_id')
  def testFormatCrBugURL_NonProd(self, mock_get_app_id):
    mock_get_app_id.return_value = 'monorail-staging'
    self.assertEqual(
        '/p/proj/issues/detail?id=123',
        tracker_helpers.FormatCrBugURL('proj', 123))
    self.assertEqual(
        '/p/chromium/issues/detail?id=123456',
        tracker_helpers.FormatCrBugURL('chromium', 123456))

  @mock.patch('tracker.tracker_constants.ISSUE_ATTACHMENTS_QUOTA_HARD', 1)
  def testComputeNewQuotaBytesUsed_ProjectQuota(self):
    upload_1 = framework_helpers.AttachmentUpload(
        'matter not', b'three men make a tiger', 'matter not')
    upload_2 = framework_helpers.AttachmentUpload(
        'matter not', b'chicken', 'matter not')
    attachments = [upload_1, upload_2]

    project = fake.Project()
    project.attachment_bytes_used = 10
    project.attachment_quota = project.attachment_bytes_used + len(
        upload_1.contents + upload_2.contents) + 1

    actual_new = tracker_helpers.ComputeNewQuotaBytesUsed(project, attachments)
    expected_new = project.attachment_quota - 1
    self.assertEqual(actual_new, expected_new)

    upload_3 = framework_helpers.AttachmentUpload(
        'matter not', b'donut', 'matter not')
    attachments.append(upload_3)
    with self.assertRaises(exceptions.OverAttachmentQuota):
      tracker_helpers.ComputeNewQuotaBytesUsed(project, attachments)

  @mock.patch(
      'tracker.tracker_constants.ISSUE_ATTACHMENTS_QUOTA_HARD', len('tiger'))
  def testComputeNewQuotaBytesUsed_GeneralQuota(self):
    upload_1 = framework_helpers.AttachmentUpload(
        'matter not', b'tiger', 'matter not')
    attachments = [upload_1]

    project = fake.Project()

    actual_new = tracker_helpers.ComputeNewQuotaBytesUsed(project, attachments)
    expected_new = len(upload_1.contents)
    self.assertEqual(actual_new, expected_new)

    upload_2 = framework_helpers.AttachmentUpload(
        'matter not', b'donut', 'matter not')
    attachments.append(upload_2)
    with self.assertRaises(exceptions.OverAttachmentQuota):
      tracker_helpers.ComputeNewQuotaBytesUsed(project, attachments)

    upload_3 = framework_helpers.AttachmentUpload(
        'matter not', b'donut', 'matter not')
    attachments.append(upload_3)
    with self.assertRaises(exceptions.OverAttachmentQuota):
      tracker_helpers.ComputeNewQuotaBytesUsed(project, attachments)

  def testIsUnderSoftAttachmentQuota(self):
    pass  # TODO(jrobbins): Write this test.

  # GetAllIssueProjects is tested in GetAllIssueProjectsTest.

  def testGetPermissionsInAllProjects(self):
    pass  # TODO(jrobbins): Write this test.

  # FilterOutNonViewableIssues is tested in FilterOutNonViewableIssuesTest.

  def testMeansOpenInProject(self):
    config = _MakeConfig()

    # ensure open means open
    self.assertTrue(tracker_helpers.MeansOpenInProject('New', config))
    self.assertTrue(tracker_helpers.MeansOpenInProject('new', config))

    # ensure an unrecognized status means open
    self.assertTrue(tracker_helpers.MeansOpenInProject(
        '_undefined_status_', config))

    # ensure closed means closed
    self.assertFalse(tracker_helpers.MeansOpenInProject('Old', config))
    self.assertFalse(tracker_helpers.MeansOpenInProject('old', config))
    self.assertFalse(tracker_helpers.MeansOpenInProject(
        'StatusThatWeDontUseAnymore', config))

  def testIsNoisy(self):
    self.assertTrue(tracker_helpers.IsNoisy(778, 320))
    self.assertFalse(tracker_helpers.IsNoisy(20, 500))
    self.assertFalse(tracker_helpers.IsNoisy(500, 20))
    self.assertFalse(tracker_helpers.IsNoisy(1, 1))

  def testMergeCCsAndAddComment(self):
    target_issue = fake.MakeTestIssue(
        789, 10, 'Target issue', 'New', 111)
    source_issue = fake.MakeTestIssue(
        789, 100, 'Source issue', 'New', 222)
    source_issue.cc_ids.append(111)
    # Issue without owner
    source_issue_2 = fake.MakeTestIssue(
        789, 101, 'Source issue 2', 'New', 0)

    self.services.issue.TestAddIssue(target_issue)
    self.services.issue.TestAddIssue(source_issue)
    self.services.issue.TestAddIssue(source_issue_2)

    # We copy this list so that it isn't updated by the test framework
    initial_issue_comments = (
        self.services.issue.GetCommentsForIssue(
            'fake cnxn', target_issue.issue_id)[:])
    mr = testing_helpers.MakeMonorailRequest(user_info={'user_id': 111})

    # Merging source into target should create a comment.
    self.assertIsNotNone(
        tracker_helpers.MergeCCsAndAddComment(
            self.services, mr, source_issue, target_issue))
    updated_issue_comments = self.services.issue.GetCommentsForIssue(
        'fake cnxn', target_issue.issue_id)
    for comment in initial_issue_comments:
      self.assertIn(comment, updated_issue_comments)
      self.assertEqual(
          len(initial_issue_comments) + 1, len(updated_issue_comments))

    # Merging source into target should add source's owner to target's CCs.
    updated_target_issue = self.services.issue.GetIssueByLocalID(
        'fake cnxn', 789, 10)
    self.assertIn(111, updated_target_issue.cc_ids)
    self.assertIn(222, updated_target_issue.cc_ids)

    # Merging source 2 into target should make a comment, but not update CCs.
    self.assertIsNotNone(
        tracker_helpers.MergeCCsAndAddComment(
            self.services, mr, source_issue_2, updated_target_issue))
    updated_target_issue = self.services.issue.GetIssueByLocalID(
        'fake cnxn', 789, 10)
    self.assertNotIn(0, updated_target_issue.cc_ids)

  def testMergeCCsAndAddComment_RestrictedSourceIssue(self):
    target_issue = fake.MakeTestIssue(
        789, 10, 'Target issue', 'New', 222)
    target_issue_2 = fake.MakeTestIssue(
        789, 11, 'Target issue 2', 'New', 222)
    source_issue = fake.MakeTestIssue(
        789, 100, 'Source issue', 'New', 111)
    source_issue.cc_ids.append(111)
    source_issue.labels.append('Restrict-View-Commit')
    target_issue_2.labels.append('Restrict-View-Commit')

    self.services.issue.TestAddIssue(source_issue)
    self.services.issue.TestAddIssue(target_issue)
    self.services.issue.TestAddIssue(target_issue_2)

    # We copy this list so that it isn't updated by the test framework
    initial_issue_comments = self.services.issue.GetCommentsForIssue(
        'fake cnxn', target_issue.issue_id)[:]
    mr = testing_helpers.MakeMonorailRequest(user_info={'user_id': 111})
    self.assertIsNotNone(
        tracker_helpers.MergeCCsAndAddComment(
            self.services, mr, source_issue, target_issue))

    # When the source is restricted, we update the target comments...
    updated_issue_comments = self.services.issue.GetCommentsForIssue(
        'fake cnxn', target_issue.issue_id)
    for comment in initial_issue_comments:
      self.assertIn(comment, updated_issue_comments)
      self.assertEqual(
          len(initial_issue_comments) + 1, len(updated_issue_comments))
    # ...but not the target CCs...
    updated_target_issue = self.services.issue.GetIssueByLocalID(
        'fake cnxn', 789, 10)
    self.assertNotIn(111, updated_target_issue.cc_ids)
    # ...unless both issues have the same restrictions.
    self.assertIsNotNone(
        tracker_helpers.MergeCCsAndAddComment(
            self.services, mr, source_issue, target_issue_2))
    updated_target_issue_2 = self.services.issue.GetIssueByLocalID(
        'fake cnxn', 789, 11)
    self.assertIn(111, updated_target_issue_2.cc_ids)

  def testMergeCCsAndAddCommentMultipleIssues(self):
    pass  # TODO(jrobbins): Write this test.

  def testGetAttachmentIfAllowed(self):
    pass  # TODO(jrobbins): Write this test.

  def testLabelsMaskedByFields(self):
    pass  # TODO(jrobbins): Write this test.

  def testLabelsNotMaskedByFields(self):
    pass  # TODO(jrobbins): Write this test.

  def testLookupComponentIDs(self):
    pass  # TODO(jrobbins): Write this test.

  def testParsePostDataUsers(self):
    pd_users = 'a@example.com, b@example.com'

    pd_users_ids, pd_users_str = tracker_helpers.ParsePostDataUsers(
        self.cnxn, pd_users, self.services.user)

    self.assertEqual([1, 2], sorted(pd_users_ids))
    self.assertEqual('a@example.com, b@example.com', pd_users_str)

  def testParsePostDataUsers_Empty(self):
    pd_users = ''

    pd_users_ids, pd_users_str = tracker_helpers.ParsePostDataUsers(
        self.cnxn, pd_users, self.services.user)

    self.assertEqual([], sorted(pd_users_ids))
    self.assertEqual('', pd_users_str)

  def testFilterIssueTypes(self):
    pass  # TODO(jrobbins): Write this test.

  # ParseMergeFields is tested in IssueMergeTest.
  # AddIssueStarrers is tested in IssueMergeTest.testMergeIssueStars().
  # CanEditProjectIssue is tested in IssueMergeTest.

  def testPairDerivedValuesWithRuleExplanations_Nothing(self):
    """Test we return nothing for an issue with no derived values."""
    proposed_issue = tracker_pb2.Issue()  # No derived values.
    traces = {}
    derived_users_by_id = {}
    actual = tracker_helpers.PairDerivedValuesWithRuleExplanations(
        proposed_issue, traces, derived_users_by_id)
    (derived_labels_and_why, derived_owner_and_why,
     derived_cc_and_why, warnings_and_why, errors_and_why) = actual
    self.assertEqual([], derived_labels_and_why)
    self.assertEqual([], derived_owner_and_why)
    self.assertEqual([], derived_cc_and_why)
    self.assertEqual([], warnings_and_why)
    self.assertEqual([], errors_and_why)

  def testPairDerivedValuesWithRuleExplanations_SomeValues(self):
    """Test we return derived values and explanations for an issue."""
    proposed_issue = tracker_pb2.Issue(
        derived_owner_id=111, derived_cc_ids=[222, 333],
        derived_labels=['aaa', 'zzz'],
        derived_warnings=['Watch out'],
        derived_errors=['Status Assigned requires an owner'])
    traces = {
        (tracker_pb2.FieldID.OWNER, 111): 'explain 1',
        (tracker_pb2.FieldID.CC, 222): 'explain 2',
        (tracker_pb2.FieldID.CC, 333): 'explain 3',
        (tracker_pb2.FieldID.LABELS, 'aaa'): 'explain 4',
        (tracker_pb2.FieldID.WARNING, 'Watch out'): 'explain 6',
        (tracker_pb2.FieldID.ERROR,
         'Status Assigned requires an owner'): 'explain 7',
        # There can be extra traces that are not used.
        (tracker_pb2.FieldID.LABELS, 'bbb'): 'explain 5',
        # If there is no trace for some derived value, why is None.
        }
    derived_users_by_id = {
      111: testing_helpers.Blank(display_name='one@example.com'),
      222: testing_helpers.Blank(display_name='two@example.com'),
      333: testing_helpers.Blank(display_name='three@example.com'),
      }
    actual = tracker_helpers.PairDerivedValuesWithRuleExplanations(
        proposed_issue, traces, derived_users_by_id)
    (derived_labels_and_why, derived_owner_and_why,
     derived_cc_and_why, warnings_and_why, errors_and_why) = actual
    self.assertEqual([
        {'value': 'aaa', 'why': 'explain 4'},
        {'value': 'zzz', 'why': None},
        ], derived_labels_and_why)
    self.assertEqual([
        {'value': 'one@example.com', 'why': 'explain 1'},
        ], derived_owner_and_why)
    self.assertEqual([
        {'value': 'two@example.com', 'why': 'explain 2'},
        {'value': 'three@example.com', 'why': 'explain 3'},
        ], derived_cc_and_why)
    self.assertEqual([
        {'value': 'Watch out', 'why': 'explain 6'},
        ], warnings_and_why)
    self.assertEqual([
        {'value': 'Status Assigned requires an owner', 'why': 'explain 7'},
        ], errors_and_why)


class MakeViewsForUsersInIssuesTest(unittest.TestCase):

  def setUp(self):
    self.issue1 = _Issue('proj', 1)
    self.issue1.owner_id = 1001
    self.issue1.reporter_id = 1002

    self.issue2 = _Issue('proj', 2)
    self.issue2.owner_id = 2001
    self.issue2.reporter_id = 2002
    self.issue2.cc_ids.extend([1, 1001, 1002, 1003])

    self.issue3 = _Issue('proj', 3)
    self.issue3.owner_id = 1001
    self.issue3.reporter_id = 3002

    self.user = fake.UserService()
    for user_id in [1, 1001, 1002, 1003, 2001, 2002, 3002]:
      self.user.TestAddUser(
          'test%d' % user_id, user_id, add_user=True)

  def testMakeViewsForUsersInIssues(self):
    issue_list = [self.issue1, self.issue2, self.issue3]
    users_by_id = tracker_helpers.MakeViewsForUsersInIssues(
        'fake cnxn', issue_list, self.user)
    six.assertCountEqual(
        self, [0, 1, 1001, 1002, 1003, 2001, 2002, 3002],
        list(users_by_id.keys()))
    for user_id in [1001, 1002, 1003, 2001]:
      self.assertEqual(users_by_id[user_id].user_id, user_id)

  def testMakeViewsForUsersInIssuesOmittingSome(self):
    issue_list = [self.issue1, self.issue2, self.issue3]
    users_by_id = tracker_helpers.MakeViewsForUsersInIssues(
        'fake cnxn', issue_list, self.user, omit_ids=[1001, 1003])
    six.assertCountEqual(
        self, [0, 1, 1002, 2001, 2002, 3002], list(users_by_id.keys()))
    for user_id in [1002, 2001, 2002, 3002]:
      self.assertEqual(users_by_id[user_id].user_id, user_id)

  def testMakeViewsForUsersInIssuesEmpty(self):
    issue_list = []
    users_by_id = tracker_helpers.MakeViewsForUsersInIssues(
        'fake cnxn', issue_list, self.user)
    six.assertCountEqual(self, [], list(users_by_id.keys()))


class GetAllIssueProjectsTest(unittest.TestCase):
  issue_x_1 = tracker_pb2.Issue()
  issue_x_1.project_id = 789
  issue_x_1.local_id = 1
  issue_x_1.reporter_id = 1002

  issue_x_2 = tracker_pb2.Issue()
  issue_x_2.project_id = 789
  issue_x_2.local_id = 2
  issue_x_2.reporter_id = 2002

  issue_y_1 = tracker_pb2.Issue()
  issue_y_1.project_id = 678
  issue_y_1.local_id = 1
  issue_y_1.reporter_id = 2002

  def setUp(self):
    self.project_service = fake.ProjectService()
    self.project_service.TestAddProject('proj-x', project_id=789)
    self.project_service.TestAddProject('proj-y', project_id=678)
    self.cnxn = 'fake connection'

  def testGetAllIssueProjects_Empty(self):
    self.assertEqual(
        {}, tracker_helpers.GetAllIssueProjects(
            self.cnxn, [], self.project_service))

  def testGetAllIssueProjects_Normal(self):
    self.assertEqual(
        {789: self.project_service.GetProjectByName(self.cnxn, 'proj-x')},
        tracker_helpers.GetAllIssueProjects(
            self.cnxn, [self.issue_x_1, self.issue_x_2], self.project_service))
    self.assertEqual(
        {789: self.project_service.GetProjectByName(self.cnxn, 'proj-x'),
         678: self.project_service.GetProjectByName(self.cnxn, 'proj-y')},
        tracker_helpers.GetAllIssueProjects(
            self.cnxn, [self.issue_x_1, self.issue_x_2, self.issue_y_1],
            self.project_service))


class FilterOutNonViewableIssuesTest(unittest.TestCase):
  owner_id = 111
  committer_id = 222
  nonmember_1_id = 1002
  nonmember_2_id = 2002
  nonmember_3_id = 3002

  issue1 = tracker_pb2.Issue()
  issue1.project_name = 'proj'
  issue1.project_id = 789
  issue1.local_id = 1
  issue1.reporter_id = nonmember_1_id

  issue2 = tracker_pb2.Issue()
  issue2.project_name = 'proj'
  issue2.project_id = 789
  issue2.local_id = 2
  issue2.reporter_id = nonmember_2_id
  issue2.labels.extend(['foo', 'bar'])

  issue3 = tracker_pb2.Issue()
  issue3.project_name = 'proj'
  issue3.project_id = 789
  issue3.local_id = 3
  issue3.reporter_id = nonmember_3_id
  issue3.labels.extend(['restrict-view-commit'])

  issue4 = tracker_pb2.Issue()
  issue4.project_name = 'proj'
  issue4.project_id = 789
  issue4.local_id = 4
  issue4.reporter_id = nonmember_3_id
  issue4.labels.extend(['Foo', 'Restrict-View-Commit'])

  def setUp(self):
    self.user = user_pb2.User()
    self.project = self.MakeProject(project_pb2.ProjectState.LIVE)
    self.config = tracker_bizobj.MakeDefaultProjectIssueConfig(
        self.project.project_id)
    self.project_dict = {self.project.project_id: self.project}
    self.config_dict = {self.config.project_id: self.config}

  def MakeProject(self, state):
    p = project_pb2.Project(
        project_id=789, project_name='proj', state=state,
        owner_ids=[self.owner_id], committer_ids=[self.committer_id])
    return p

  def testFilterOutNonViewableIssues_Member(self):
    # perms will be permissions.COMMITTER_ACTIVE_PERMISSIONSET
    filtered_issues = tracker_helpers.FilterOutNonViewableIssues(
        {self.committer_id}, self.user, self.project_dict,
        self.config_dict,
        [self.issue1, self.issue2, self.issue3, self.issue4])
    self.assertListEqual([1, 2, 3, 4],
                         [issue.local_id for issue in filtered_issues])

  def testFilterOutNonViewableIssues_Owner(self):
    # perms will be permissions.OWNER_ACTIVE_PERMISSIONSET
    filtered_issues = tracker_helpers.FilterOutNonViewableIssues(
        {self.owner_id}, self.user, self.project_dict, self.config_dict,
        [self.issue1, self.issue2, self.issue3, self.issue4])
    self.assertListEqual([1, 2, 3, 4],
                         [issue.local_id for issue in filtered_issues])

  def testFilterOutNonViewableIssues_Empty(self):
    # perms will be permissions.COMMITTER_ACTIVE_PERMISSIONSET
    filtered_issues = tracker_helpers.FilterOutNonViewableIssues(
        {self.committer_id}, self.user, self.project_dict,
        self.config_dict, [])
    self.assertListEqual([], filtered_issues)

  def testFilterOutNonViewableIssues_NonMember(self):
    # perms will be permissions.READ_ONLY_PERMISSIONSET
    filtered_issues = tracker_helpers.FilterOutNonViewableIssues(
        {self.nonmember_1_id}, self.user, self.project_dict,
        self.config_dict, [self.issue1, self.issue2, self.issue3, self.issue4])
    self.assertListEqual([1, 2],
                         [issue.local_id for issue in filtered_issues])

  def testFilterOutNonViewableIssues_Reporter(self):
    # perms will be permissions.READ_ONLY_PERMISSIONSET
    filtered_issues = tracker_helpers.FilterOutNonViewableIssues(
        {self.nonmember_3_id}, self.user, self.project_dict,
        self.config_dict, [self.issue1, self.issue2, self.issue3, self.issue4])
    self.assertListEqual([1, 2, 3, 4],
                         [issue.local_id for issue in filtered_issues])


class IssueMergeTest(unittest.TestCase):

  def setUp(self):
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        project=fake.ProjectService(),
        issue_star=fake.IssueStarService(),
        spam=fake.SpamService()
    )
    self.project = self.services.project.TestAddProject('proj', project_id=987)
    self.config = tracker_bizobj.MakeDefaultProjectIssueConfig(
        self.project.project_id)
    self.project_dict = {self.project.project_id: self.project}
    self.config_dict = {self.config.project_id: self.config}

  def testParseMergeFields_NotSpecified(self):
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    errors = template_helpers.EZTError()
    post_data = {}

    text, merge_into_issue = tracker_helpers.ParseMergeFields(
        self.cnxn, None, 'proj', post_data, 'New', self.config, issue, errors)
    self.assertEqual('', text)
    self.assertEqual(None, merge_into_issue)

    text, merge_into_issue = tracker_helpers.ParseMergeFields(
        self.cnxn, None, 'proj', post_data, 'Duplicate', self.config, issue,
        errors)
    self.assertEqual('', text)
    self.assertTrue(errors.merge_into_id)
    self.assertEqual(None, merge_into_issue)

  def testParseMergeFields_WrongStatus(self):
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    errors = template_helpers.EZTError()
    post_data = {'merge_into': '12'}

    text, merge_into_issue = tracker_helpers.ParseMergeFields(
        self.cnxn, None, 'proj', post_data, 'New', self.config, issue, errors)
    self.assertEqual('', text)
    self.assertEqual(None, merge_into_issue)

  def testParseMergeFields_NoSuchIssue(self):
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    issue.merged_into = 12
    errors = template_helpers.EZTError()
    post_data = {'merge_into': '12'}

    text, merge_into_issue = tracker_helpers.ParseMergeFields(
        self.cnxn, self.services, 'proj', post_data, 'Duplicate',
        self.config, issue, errors)
    self.assertEqual('12', text)
    self.assertEqual(None, merge_into_issue)

  def testParseMergeFields_DontSelfMerge(self):
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    errors = template_helpers.EZTError()
    post_data = {'merge_into': '1'}

    text, merge_into_issue = tracker_helpers.ParseMergeFields(
        self.cnxn, self.services, 'proj', post_data, 'Duplicate', self.config,
        issue, errors)
    self.assertEqual('1', text)
    self.assertEqual(None, merge_into_issue)
    self.assertEqual('Cannot merge issue into itself', errors.merge_into_id)

  def testParseMergeFields_NewIssueToMerge(self):
    merged_issue = fake.MakeTestIssue(
        self.project.project_id,
        1,
        'unused_summary',
        'unused_status',
        111,
        reporter_id=111)
    self.services.issue.TestAddIssue(merged_issue)
    mergee_issue = fake.MakeTestIssue(
        self.project.project_id,
        2,
        'unused_summary',
        'unused_status',
        111,
        reporter_id=111)
    self.services.issue.TestAddIssue(mergee_issue)

    errors = template_helpers.EZTError()
    post_data = {'merge_into': str(mergee_issue.local_id)}

    text, merge_into_issue = tracker_helpers.ParseMergeFields(
        self.cnxn, self.services, 'proj', post_data, 'Duplicate', self.config,
        merged_issue, errors)
    self.assertEqual(str(mergee_issue.local_id), text)
    self.assertEqual(mergee_issue, merge_into_issue)

  def testCanEditProjectIssue(self):
    mr = testing_helpers.MakeMonorailRequest()
    issue = fake.MakeTestIssue(
        self.project.project_id, 1, 'summary', 'New', 111)
    issue.project_name = self.project.project_name

    non_member_not_allowed = tracker_helpers.CanEditProjectIssue(
        mr, self.project, issue, None)
    self.assertEqual(False, non_member_not_allowed)

    committer_id = 3
    self.project.committer_ids.extend([committer_id])
    mr.auth.effective_ids.add(committer_id)
    committer_allowed = tracker_helpers.CanEditProjectIssue(
        mr, self.project, issue, None)
    self.assertEqual(True, committer_allowed)

    self.project.state = project_pb2.ProjectState.ARCHIVED
    committer_read_only_not_allowed = tracker_helpers.CanEditProjectIssue(
        mr, self.project, issue, None)
    self.assertEqual(False, committer_read_only_not_allowed)

    owner_id = 1
    self.project.owner_ids.extend([owner_id])
    mr.auth.effective_ids.add(owner_id)
    owner_read_only_not_allowed = tracker_helpers.CanEditProjectIssue(
        mr, self.project, issue, None)
    self.assertEqual(False, owner_read_only_not_allowed)

  def testMergeIssueStars(self):
    mr = testing_helpers.MakeMonorailRequest()
    mr.project_name = self.project.project_name
    mr.project = self.project

    config = self.services.config.GetProjectConfig(
        self.cnxn, self.project.project_id)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 1, 1, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 1, 2, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 1, 3, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 3, 3, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 3, 6, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 2, 3, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 2, 4, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, config, 2, 5, True)

    new_starrers = tracker_helpers.GetNewIssueStarrers(
        self.cnxn, self.services, [1, 3], 2)
    six.assertCountEqual(self, new_starrers, [1, 2, 6])
    tracker_helpers.AddIssueStarrers(
        self.cnxn, self.services, mr, 2, self.project, new_starrers)
    issue_2_starrers = self.services.issue_star.LookupItemStarrers(
        self.cnxn, 2)
    # XXX(jrobbins): these tests incorrectly mix local IDs with IIDs.
    six.assertCountEqual(self, [1, 2, 3, 4, 5, 6], issue_2_starrers)


class MergeLinkedMembersTest(unittest.TestCase):

  def setUp(self):
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(
        user=fake.UserService())
    self.user1 = self.services.user.TestAddUser('one@example.com', 111)
    self.user2 = self.services.user.TestAddUser('two@example.com', 222)

  def testNoLinkedAccounts(self):
    """When no candidate accounts are linked, they are all returned."""
    actual = tracker_helpers._MergeLinkedMembers(
        self.cnxn, self.services.user, [111, 222])
    self.assertEqual([111, 222], actual)

  def testSomeLinkedButNoMasking(self):
    """If an account has linked accounts, but they are not here, keep it."""
    self.user1.linked_child_ids = [999]
    self.user2.linked_parent_id = 999
    actual = tracker_helpers._MergeLinkedMembers(
        self.cnxn, self.services.user, [111, 222])
    self.assertEqual([111, 222], actual)

  def testParentMasksChild(self):
    """When two accounts linked, only the parent is returned."""
    self.user2.linked_parent_id = 111
    actual = tracker_helpers._MergeLinkedMembers(
        self.cnxn, self.services.user, [111, 222])
    self.assertEqual([111], actual)


class FilterMemberDataTest(unittest.TestCase):

  def setUp(self):
    services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService())
    self.owner_email = 'owner@dom.com'
    self.committer_email = 'commit@dom.com'
    self.contributor_email = 'contrib@dom.com'
    self.indirect_member_email = 'ind@dom.com'
    self.all_emails = [self.owner_email, self.committer_email,
                       self.contributor_email, self.indirect_member_email]
    self.project = services.project.TestAddProject('proj')

  def DoFiltering(self, perms, unsigned_user=False):
    mr = testing_helpers.MakeMonorailRequest(
        project=self.project, perms=perms)
    if not unsigned_user:
      mr.auth.user_id = 111
      mr.auth.user_view = testing_helpers.Blank(domain='jrobbins.org')
    return tracker_helpers._FilterMemberData(
        mr, [self.owner_email], [self.committer_email],
        [self.contributor_email], [self.indirect_member_email], mr.project)

  def testUnsignedUser_NormalProject(self):
    visible_members = self.DoFiltering(
        permissions.READ_ONLY_PERMISSIONSET, unsigned_user=True)
    six.assertCountEqual(
        self, [
            self.owner_email, self.committer_email, self.contributor_email,
            self.indirect_member_email
        ], visible_members)

  def testUnsignedUser_RestrictedProject(self):
    self.project.only_owners_see_contributors = True
    visible_members = self.DoFiltering(
        permissions.READ_ONLY_PERMISSIONSET, unsigned_user=True)
    six.assertCountEqual(
        self,
        [self.owner_email, self.committer_email, self.indirect_member_email],
        visible_members)

  def testOwnersAndAdminsCanSeeAll_NormalProject(self):
    visible_members = self.DoFiltering(
        permissions.OWNER_ACTIVE_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

    visible_members = self.DoFiltering(
        permissions.ADMIN_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

  def testOwnersAndAdminsCanSeeAll_HubAndSpoke(self):
    self.project.only_owners_see_contributors = True

    visible_members = self.DoFiltering(
        permissions.OWNER_ACTIVE_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

    visible_members = self.DoFiltering(
        permissions.ADMIN_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

    visible_members = self.DoFiltering(
        permissions.COMMITTER_ACTIVE_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

  def testNonOwnersCanSeeAll_NormalProject(self):
    visible_members = self.DoFiltering(
        permissions.COMMITTER_ACTIVE_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

    visible_members = self.DoFiltering(
        permissions.CONTRIBUTOR_ACTIVE_PERMISSIONSET)
    six.assertCountEqual(self, self.all_emails, visible_members)

  def testCommittersSeeOnlySameDomain_HubAndSpoke(self):
    self.project.only_owners_see_contributors = True

    visible_members = self.DoFiltering(
        permissions.CONTRIBUTOR_ACTIVE_PERMISSIONSET)
    six.assertCountEqual(
        self,
        [self.owner_email, self.committer_email, self.indirect_member_email],
        visible_members)


class GetLabelOptionsTest(unittest.TestCase):

  @mock.patch('tracker.tracker_helpers.LabelsNotMaskedByFields')
  def testGetLabelOptions(self, mockLabelsNotMaskedByFields):
    mockLabelsNotMaskedByFields.return_value = []
    config = tracker_pb2.ProjectIssueConfig()
    custom_perms = []
    actual = tracker_helpers.GetLabelOptions(config, custom_perms)
    expected = [
      {'doc': 'Only users who can edit the issue may access it',
       'name': 'Restrict-View-EditIssue'},
      {'doc': 'Only users who can edit the issue may add comments',
       'name': 'Restrict-AddIssueComment-EditIssue'},
      {'doc': 'Custom permission CoreTeam is needed to access',
       'name': 'Restrict-View-CoreTeam'}
    ]
    self.assertEqual(expected, actual)

  def testBuildRestrictionChoices(self):
    choices = tracker_helpers._BuildRestrictionChoices([], [], [])
    self.assertEqual([], choices)

    choices = tracker_helpers._BuildRestrictionChoices(
        [], ['Hop', 'Jump'], [])
    self.assertEqual([], choices)

    freq = [('View', 'B', 'You need permission B to do anything'),
            ('A', 'B', 'You need B to use A')]
    choices = tracker_helpers._BuildRestrictionChoices(freq, [], [])
    expected = [dict(name='Restrict-View-B',
                     doc='You need permission B to do anything'),
                dict(name='Restrict-A-B',
                     doc='You need B to use A')]
    self.assertListEqual(expected, choices)

    extra_perms = ['Over18', 'Over21']
    choices = tracker_helpers._BuildRestrictionChoices(
        [], ['Drink', 'Smoke'], extra_perms)
    expected = [dict(name='Restrict-Drink-Over18',
                     doc='Permission Over18 needed to use Drink'),
                dict(name='Restrict-Drink-Over21',
                     doc='Permission Over21 needed to use Drink'),
                dict(name='Restrict-Smoke-Over18',
                     doc='Permission Over18 needed to use Smoke'),
                dict(name='Restrict-Smoke-Over21',
                     doc='Permission Over21 needed to use Smoke')]
    self.assertListEqual(expected, choices)


class FilterKeptAttachmentsTest(unittest.TestCase):
  def testFilterKeptAttachments(self):
    comments = [
        tracker_pb2.IssueComment(
            is_description=True,
            attachments=[tracker_pb2.Attachment(attachment_id=1)]),
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment(
            is_description=True,
            attachments=[
                tracker_pb2.Attachment(attachment_id=2),
                tracker_pb2.Attachment(attachment_id=3)]),
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment(
            approval_id=24,
            is_description=True,
            attachments=[tracker_pb2.Attachment(attachment_id=4)])]

    filtered = tracker_helpers.FilterKeptAttachments(
        True, [1, 2, 3, 4], comments, None)
    self.assertEqual([2, 3], filtered)

  def testApprovalDescription(self):
    comments = [
        tracker_pb2.IssueComment(
            is_description=True,
            attachments=[tracker_pb2.Attachment(attachment_id=1)]),
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment(
            is_description=True,
            attachments=[
                tracker_pb2.Attachment(attachment_id=2),
                tracker_pb2.Attachment(attachment_id=3)]),
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment(
            approval_id=24,
            is_description=True,
            attachments=[tracker_pb2.Attachment(attachment_id=4)])]

    filtered = tracker_helpers.FilterKeptAttachments(
        True, [1, 2, 3, 4], comments, 24)
    self.assertEqual([4], filtered)

  def testNotAnIssueDescription(self):
    comments = [
        tracker_pb2.IssueComment(
            is_description=True,
            attachments=[tracker_pb2.Attachment(attachment_id=1)]),
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment(
            is_description=True,
            attachments=[
                tracker_pb2.Attachment(attachment_id=2),
                tracker_pb2.Attachment(attachment_id=3)]),
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment(
            approval_id=24,
            is_description=True,
            attachments=[tracker_pb2.Attachment(attachment_id=4)])]

    filtered = tracker_helpers.FilterKeptAttachments(
        False, [1, 2, 3, 4], comments, None)
    self.assertIsNone(filtered)

  def testNoDescriptionsInComments(self):
    comments = [
        tracker_pb2.IssueComment(),
        tracker_pb2.IssueComment()]

    filtered = tracker_helpers.FilterKeptAttachments(
        True, [1, 2, 3, 4], comments, None)
    self.assertEqual([], filtered)

  def testNoComments(self):
    filtered = tracker_helpers.FilterKeptAttachments(
        True, [1, 2, 3, 4], [], None)
    self.assertEqual([], filtered)


class EnumFieldHelpersTest(unittest.TestCase):

  def test_GetEnumFieldValuesAndDocstrings(self):
    """We can get all choices for an enum field"""
    fd = tracker_pb2.FieldDef(
        field_id=123,
        project_id=1,
        field_name='yellow',
        field_type=tracker_pb2.FieldTypes.ENUM_TYPE)
    ld_1 = tracker_pb2.LabelDef(
        label='yellow-submarine', label_docstring='ld_1_docstring')
    ld_2 = tracker_pb2.LabelDef(
        label='yellow-tisket', label_docstring='ld_2_docstring')
    ld_3 = tracker_pb2.LabelDef(
        label='yellow-basket', label_docstring='ld_3_docstring')
    ld_4 = tracker_pb2.LabelDef(
        label='yellow', label_docstring='ld_4_docstring')
    ld_5 = tracker_pb2.LabelDef(
        label='not-yellow', label_docstring='ld_5_docstring')
    ld_6 = tracker_pb2.LabelDef(
        label='yellow-tasket',
        label_docstring='ld_6_docstring',
        deprecated=True)
    config = tracker_pb2.ProjectIssueConfig(
        default_template_for_developers=1,
        default_template_for_users=2,
        well_known_labels=[ld_1, ld_2, ld_3, ld_4, ld_5, ld_6])
    actual = tracker_helpers._GetEnumFieldValuesAndDocstrings(fd, config)
    # Expect to omit labels `yellow` and `not-yellow` due to prefix mismatch
    # Also expect to omit label `yellow-tasket` because it's deprecated
    expected = [
        ('submarine', 'ld_1_docstring'), ('tisket', 'ld_2_docstring'),
        ('basket', 'ld_3_docstring')
    ]
    self.assertEqual(expected, actual)


class CreateIssueHelpersTest(unittest.TestCase):

  def setUp(self):
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService())
    self.cnxn = 'fake cnxn'

    self.project_member = self.services.user.TestAddUser(
        'user_1@example.com', 111)
    self.project_group_member = self.services.user.TestAddUser(
        'group@example.com', 999)
    self.project = self.services.project.TestAddProject(
        'proj',
        project_id=789,
        committer_ids=[
            self.project_member.user_id, self.project_group_member.user_id
        ])
    self.no_project_user = self.services.user.TestAddUser(
        'user_2@example.com', 222)
    self.config = fake.MakeTestConfig(self.project.project_id, [], [])
    self.int_fd = tracker_bizobj.MakeFieldDef(
        123, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None, '', False,
        False, False, None, None, '', False, '', '',
        tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
    self.int_fd.max_value = 999
    self.config.field_defs = [self.int_fd]
    self.status_1 = tracker_pb2.StatusDef(
        status='New', means_open=True, status_docstring='status_1 docstring')
    self.config.well_known_statuses = [self.status_1]
    self.component_def_1 = tracker_pb2.ComponentDef(
        component_id=1, path='compFOO')
    self.component_def_2 = tracker_pb2.ComponentDef(
        component_id=2, path='deprecated', deprecated=True)
    self.config.component_defs = [self.component_def_1, self.component_def_2]
    self.services.config.StoreConfig('cnxn', self.config)
    self.services.usergroup.TestAddGroupSettings(999, 'group@example.com')

  def testAssertValidIssueForCreate_Valid(self):
    input_issue = tracker_pb2.Issue(
        summary='sum',
        status='New',
        owner_id=111,
        project_id=789,
        component_ids=[1],
        cc_ids=[999])
    tracker_helpers.AssertValidIssueForCreate(
        self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesLabels(self):
    input_issue = tracker_pb2.Issue(
        summary='sum',
        labels=['freeze_new_label'],
        status='New',
        owner_id=111,
        project_id=789)
    with self.assertRaisesRegex(
        exceptions.InputException,
        ("The creation of new labels is blocked for the Chromium project"
         " in Monorail. To continue with editing your issue, please"
         " remove: freeze_new_label label\\(s\\)")):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesOwner(self):
    input_issue = tracker_pb2.Issue(
        summary='sum', status='New', owner_id=222, project_id=789)
    with self.assertRaisesRegex(exceptions.InputException,
                                'Issue owner must be a project member'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')
    input_issue.owner_id = 333
    with self.assertRaisesRegex(exceptions.InputException,
                                'Issue owner user ID not found'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')
    input_issue.owner_id = 999
    with self.assertRaisesRegex(exceptions.InputException,
                                'Issue owner cannot be a user group'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesSummary(self):
    input_issue = tracker_pb2.Issue(
        summary='', status='New', owner_id=111, project_id=789)
    with self.assertRaisesRegex(exceptions.InputException,
                                'Summary is required'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')
      input_issue.summary = '   '
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesDescription(self):
    input_issue = tracker_pb2.Issue(
        summary='sum', status='New', owner_id=111, project_id=789)
    with self.assertRaisesRegex(exceptions.InputException,
                                'Description is required'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, '')
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, '    ')

  def testAssertValidIssueForCreate_ValidatesFieldDef(self):
    fv = tracker_bizobj.MakeFieldValue(
        self.int_fd.field_id, 1000, None, None, None, None, False)
    input_issue = tracker_pb2.Issue(
        summary='sum',
        status='New',
        owner_id=111,
        project_id=789,
        field_values=[fv])
    with self.assertRaises(exceptions.InputException):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesStatus(self):
    input_issue = tracker_pb2.Issue(
        summary='sum', status='DNE_status', owner_id=111, project_id=789)

    def mock_status_lookup(*_args, **_kwargs):
      return None

    self.services.config.LookupStatusID = mock_status_lookup
    with self.assertRaisesRegex(exceptions.InputException,
                                'Undefined status: DNE_status'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesComponents(self):
    # Tests an undefined component.
    input_issue = tracker_pb2.Issue(
        summary='',
        status='New',
        owner_id=111,
        project_id=789,
        component_ids=[3])
    with self.assertRaisesRegex(exceptions.InputException,
                                'Undefined or deprecated component with id: 3'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

    # Tests a deprecated component.
    input_issue = tracker_pb2.Issue(
        summary='',
        status='New',
        owner_id=111,
        project_id=789,
        component_ids=[self.component_def_2.component_id])
    with self.assertRaisesRegex(exceptions.InputException,
                                'Undefined or deprecated component with id: 2'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')

  def testAssertValidIssueForCreate_ValidatesUsers(self):
    user_fd = tracker_bizobj.MakeFieldDef(
        123, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None, '', False,
        False, False, None, None, '', False, '', '',
        tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
    self.services.config.TestAddFieldDef(user_fd)

    input_issue = tracker_pb2.Issue(
        summary='sum',
        status='New',
        owner_id=111,
        project_id=789,
        cc_ids=[123],
        field_values=[
            tracker_bizobj.MakeFieldValue(
                user_fd.field_id, None, None, 124, None, None, False)
        ])
    copied_issue = copy.deepcopy(input_issue)
    with self.assertRaisesRegex(exceptions.InputException,
                                r'users/123: .+\nusers/124: .+'):
      tracker_helpers.AssertValidIssueForCreate(
          self.cnxn, self.services, input_issue, 'nonempty description')
    self.assertEqual(input_issue, copied_issue)

    self.services.user.TestAddUser('a@test.com', 123)
    self.services.user.TestAddUser('a@test.com', 124)
    tracker_helpers.AssertValidIssueForCreate(
        self.cnxn, self.services, input_issue, 'nonempty description')
    self.assertEqual(input_issue, copied_issue)


class ModifyIssuesHelpersTest(unittest.TestCase):

  def setUp(self):
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        issue_star=fake.IssueStarService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService())
    self.cnxn = 'fake cnxn'

    self.project_member = self.services.user.TestAddUser(
        'user_1@example.com', 111)
    self.project = self.services.project.TestAddProject(
        'proj', project_id=789, committer_ids=[self.project_member.user_id])
    self.no_project_user = self.services.user.TestAddUser(
        'user_2@example.com', 222)

    self.config = fake.MakeTestConfig(self.project.project_id, [], [])
    self.int_fd = tracker_bizobj.MakeFieldDef(
        123, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None, '', False,
        False, False, None, None, '', False, '', '',
        tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
    self.int_fd.max_value = 999
    self.config.field_defs = [self.int_fd]
    self.services.config.StoreConfig('cnxn', self.config)

  def testApplyAllIssueChanges(self):
    issue_delta_pairs = []
    no_change_iid = 78942

    expected_issues_to_update = {}
    expected_amendments = {}
    expected_imp_amendments = {}
    expected_old_owners = {}
    expected_old_statuses = {}
    expected_old_components = {}
    expected_merged_from_add = {}
    expected_new_starrers = {}

    issue_main = _Issue('proj', 100)
    issue_main_ref = ('proj', issue_main.local_id)
    issue_main.owner_id = 999
    issue_main.cc_ids = [111, 222]
    issue_main.labels = ['dont_touch', 'remove_me']

    expected_main = copy.deepcopy(issue_main)
    expected_main.owner_id = 888
    expected_main.cc_ids = [111, 333]
    expected_main.labels = ['dont_touch', 'add_me']
    expected_amendments[issue_main.issue_id] = [
        tracker_bizobj.MakeOwnerAmendment(888, 999),
        tracker_bizobj.MakeCcAmendment([333], [222]),
        tracker_bizobj.MakeLabelsAmendment(['add_me'], ['remove_me'])
    ]
    expected_old_owners[issue_main.issue_id] = 999

    # blocked_on issues changes setup.
    bo_add = _Issue('proj', 1)
    self.services.issue.TestAddIssue(bo_add)
    expected_bo_add = copy.deepcopy(bo_add)
    # All impacted issues should be fetched within ApplyAllIssueChanges
    # directly from the DB, skipping cache with `use_cache=False` in GetIssue().
    # So we expect these issues to have assume_stale=False.
    expected_bo_add.assume_stale = False
    expected_bo_add.blocking_iids = [issue_main.issue_id]
    expected_issues_to_update[expected_bo_add.issue_id] = expected_bo_add
    expected_imp_amendments[bo_add.issue_id] = [
        tracker_bizobj.MakeBlockingAmendment(
            [issue_main_ref], [], default_project_name='proj')
    ]

    bo_remove = _Issue('proj', 2)
    bo_remove.blocking_iids = [issue_main.issue_id]
    self.services.issue.TestAddIssue(bo_remove)
    expected_bo_remove = copy.deepcopy(bo_remove)
    expected_bo_remove.assume_stale = False
    expected_bo_remove.blocking_iids = []
    expected_issues_to_update[expected_bo_remove.issue_id] = expected_bo_remove
    expected_imp_amendments[bo_remove.issue_id] = [
        tracker_bizobj.MakeBlockingAmendment(
            [], [issue_main_ref], default_project_name='proj')
    ]

    issue_main.blocked_on_iids = [no_change_iid, bo_remove.issue_id]
    # By default new blocked_on issues that appear in blocked_on_iids
    # with no prior rank associated with it are un-ranked and assigned rank 0.
    # See SortBlockedOn in issue_svc.py.
    issue_main.blocked_on_ranks = [0, 0]
    expected_main.blocked_on_iids = [no_change_iid, bo_add.issue_id]
    expected_main.blocked_on_ranks = [0, 0]
    expected_amendments[issue_main.issue_id].append(
        tracker_bizobj.MakeBlockedOnAmendment(
            [('proj', bo_add.local_id)], [('proj', bo_remove.local_id)],
            default_project_name='proj'))

    # blocking_issues changes setup.
    b_add = _Issue('proj', 3)
    self.services.issue.TestAddIssue(b_add)
    expected_b_add = copy.deepcopy(b_add)
    expected_b_add.assume_stale = False
    expected_b_add.blocked_on_iids = [issue_main.issue_id]
    expected_b_add.blocked_on_ranks = [0]
    expected_issues_to_update[expected_b_add.issue_id] = expected_b_add
    expected_imp_amendments[b_add.issue_id] = [
        tracker_bizobj.MakeBlockedOnAmendment(
            [issue_main_ref], [], default_project_name='proj')
    ]

    b_remove = _Issue('proj', 4)
    b_remove.blocked_on_iids = [issue_main.issue_id]
    self.services.issue.TestAddIssue(b_remove)
    expected_b_remove = copy.deepcopy(b_remove)
    expected_b_remove.assume_stale = False
    expected_b_remove.blocked_on_iids = []
    # Test we can process delta changes and impact changes.
    delta_b_remove = tracker_pb2.IssueDelta(labels_add=['more_chickens'])
    expected_b_remove.labels = ['more_chickens']
    issue_delta_pairs.append((b_remove, delta_b_remove))
    expected_issues_to_update[expected_b_remove.issue_id] = expected_b_remove
    expected_imp_amendments[b_remove.issue_id] = [
        tracker_bizobj.MakeBlockedOnAmendment(
            [], [issue_main_ref], default_project_name='proj')
    ]
    expected_amendments[b_remove.issue_id] = [
        tracker_bizobj.MakeLabelsAmendment(['more_chickens'], [])
    ]

    issue_main.blocking_iids = [no_change_iid, b_remove.issue_id]
    expected_main.blocking_iids = [no_change_iid, b_add.issue_id]
    expected_amendments[issue_main.issue_id].append(
        tracker_bizobj.MakeBlockingAmendment(
            [('proj', b_add.local_id)], [('proj', b_remove.local_id)],
            default_project_name='proj'))

    # Merged issues changes setup.
    merge_remove = _Issue('proj', 5)
    self.services.issue.TestAddIssue(merge_remove)
    expected_merge_remove = copy.deepcopy(merge_remove)
    expected_merge_remove.assume_stale = False
    expected_issues_to_update[
        expected_merge_remove.issue_id] = expected_merge_remove
    expected_imp_amendments[merge_remove.issue_id] = [
        tracker_bizobj.MakeMergedIntoAmendment(
            [], [issue_main_ref], default_project_name='proj')
    ]

    merge_add = _Issue('proj', 6)
    self.services.issue.TestAddIssue(merge_add)
    expected_merge_add = copy.deepcopy(merge_add)
    expected_merge_add.assume_stale = False
    # We are adding 333 and removing 222 in issue_main with delta_main.
    expected_merge_add.cc_ids = sorted([expected_main.owner_id, 111, 333])
    expected_merged_from_add[expected_merge_add.issue_id] = [
        issue_main.issue_id
    ]

    expected_imp_amendments[merge_add.issue_id] = [
        tracker_bizobj.MakeCcAmendment(expected_merge_add.cc_ids, []),
        tracker_bizobj.MakeMergedIntoAmendment(
            [issue_main_ref], [], default_project_name='proj')
    ]
    # We are merging issue_main into merge_add, so issue_main's starrers
    # should be merged into merge_add's starrers.
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, issue_main.issue_id, 111, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, issue_main.issue_id, 222, True)
    expected_merge_add.star_count = 2
    expected_new_starrers[merge_add.issue_id] = [222, 111]

    expected_issues_to_update[expected_merge_add.issue_id] = expected_merge_add


    issue_main.merged_into = merge_remove.issue_id
    expected_main.merged_into = merge_add.issue_id
    expected_amendments[issue_main.issue_id].append(
        tracker_bizobj.MakeMergedIntoAmendment(
            [('proj', merge_add.local_id)], [('proj', merge_remove.local_id)],
            default_project_name='proj'))

    self.services.issue.TestAddIssue(issue_main)
    expected_issues_to_update[expected_main.issue_id] = expected_main


    # Issues we'll put in delta_main.*_remove fields that aren't in issue_main.
    # These issues should not show up in issues_to_update.
    missing_1 = _Issue('proj', 404)
    expected_missing_1 = copy.deepcopy(missing_1)
    expected_missing_1.assume_stale = False
    self.services.issue.TestAddIssue(missing_1)
    missing_2 = _Issue('proj', 405)
    self.services.issue.TestAddIssue(missing_2)
    expected_missing_2 = copy.deepcopy(missing_2)
    expected_missing_2.assume_stale = False

    delta_main = tracker_pb2.IssueDelta(
        owner_id=888,
        cc_ids_remove=[222, 404], cc_ids_add=[333],
        labels_remove=['remove_me', 'remove_404'], labels_add=['add_me'],
        merged_into=merge_add.issue_id,
        blocked_on_add=[bo_add.issue_id],
        blocked_on_remove=[bo_remove.issue_id, missing_1.issue_id],
        blocking_add=[b_add.issue_id],
        blocking_remove=[b_remove.issue_id, missing_2.issue_id])
    issue_delta_pairs.append((issue_main, delta_main))

    actual_tuple = tracker_helpers.ApplyAllIssueChanges(
        self.cnxn, issue_delta_pairs, self.services)

    expected_tuple = tracker_helpers._IssueChangesTuple(
        expected_issues_to_update, expected_merged_from_add,
        expected_amendments, expected_imp_amendments, expected_old_owners,
        expected_old_statuses, expected_old_components, expected_new_starrers)
    self.assertEqual(actual_tuple, expected_tuple)

    self.assertEqual(missing_1, expected_missing_1)
    self.assertEqual(missing_2, expected_missing_2)

  def testApplyAllIssueChanges_NOOP(self):
    """Check we can ignore issue-delta pairs that are NOOP."""
    noop_issue = _Issue('proj', 1)
    bo_add_noop = _Issue('proj', 2)
    bo_remove_noop = _Issue('proj', 3)

    noop_issue.owner_id = 111
    noop_issue.cc_ids = [222]
    noop_issue.blocked_on_iids = [bo_add_noop.issue_id]
    bo_add_noop.blocking_iids = [noop_issue.issue_id]

    self.services.issue.TestAddIssue(noop_issue)
    self.services.issue.TestAddIssue(bo_add_noop)
    self.services.issue.TestAddIssue(bo_remove_noop)
    expected_noop_issue = copy.deepcopy(noop_issue)
    noop_delta = tracker_pb2.IssueDelta(
        owner_id=noop_issue.owner_id,
        cc_ids_add=noop_issue.cc_ids, cc_ids_remove=[333],
        blocked_on_add=noop_issue.blocked_on_iids,
        blocked_on_remove=[bo_remove_noop.issue_id])
    issue_delta_pairs = [(noop_issue, noop_delta)]

    actual_tuple = tracker_helpers.ApplyAllIssueChanges(
        self.cnxn, issue_delta_pairs, self.services)
    expected_tuple = tracker_helpers._IssueChangesTuple(
        {}, {}, {}, {}, {}, {}, {}, {})
    self.assertEqual(actual_tuple, expected_tuple)

    self.assertEqual(noop_issue, expected_noop_issue)

  def testApplyAllIssueChanges_Empty(self):
    issue_delta_pairs = []
    actual_tuple = tracker_helpers.ApplyAllIssueChanges(
        self.cnxn, issue_delta_pairs, self.services)
    expected_tuple = tracker_helpers._IssueChangesTuple(
        {}, {}, {}, {}, {}, {}, {}, {})
    self.assertEqual(actual_tuple, expected_tuple)

  def testUpdateClosedTimestamp(self):
    config = tracker_pb2.ProjectIssueConfig()
    config.well_known_statuses.append(
        tracker_pb2.StatusDef(status='New', means_open=True))
    config.well_known_statuses.append(
        tracker_pb2.StatusDef(status='Accepted', means_open=True))
    config.well_known_statuses.append(
        tracker_pb2.StatusDef(status='Old', means_open=False))
    config.well_known_statuses.append(
        tracker_pb2.StatusDef(status='Closed', means_open=False))

    issue = tracker_pb2.Issue()
    issue.local_id = 1234
    issue.status = 'New'

    # ensure the default value is undef
    self.assertTrue(not issue.closed_timestamp)

    # ensure transitioning to the same and other open states
    # doesn't set the timestamp
    issue.status = 'New'
    tracker_helpers.UpdateClosedTimestamp(config, issue, 'New')
    self.assertTrue(not issue.closed_timestamp)

    issue.status = 'Accepted'
    tracker_helpers.UpdateClosedTimestamp(config, issue, 'New')
    self.assertTrue(not issue.closed_timestamp)

    # ensure transitioning from open to closed sets the timestamp
    issue.status = 'Closed'
    tracker_helpers.UpdateClosedTimestamp(config, issue, 'Accepted')
    self.assertTrue(issue.closed_timestamp)

    # ensure that the timestamp is cleared when transitioning from
    # closed to open
    issue.status = 'New'
    tracker_helpers.UpdateClosedTimestamp(config, issue, 'Closed')
    self.assertTrue(not issue.closed_timestamp)

  def testGroupUniqueDeltaIssues(self):
    """We can identify unique IssueDeltas and group Issues by their deltas."""
    issue_1 = _Issue('proj', 1)
    delta_1 = tracker_pb2.IssueDelta(cc_ids_add=[111])

    issue_2 = _Issue('proj', 2)
    delta_2 = tracker_pb2.IssueDelta(cc_ids_add=[111], cc_ids_remove=[222])

    issue_3 = _Issue('proj', 3)
    delta_3 = tracker_pb2.IssueDelta(cc_ids_add=[111])

    issue_4 = _Issue('proj', 4)
    delta_4 = tracker_pb2.IssueDelta()

    issue_5 = _Issue('proj', 5)
    delta_5 = tracker_pb2.IssueDelta()

    issue_delta_pairs = [
        (issue_1, delta_1), (issue_2, delta_2), (issue_3, delta_3),
        (issue_4, delta_4), (issue_5, delta_5)
    ]
    unique_deltas, issues_for_deltas = tracker_helpers.GroupUniqueDeltaIssues(
        issue_delta_pairs)

    expected_unique_deltas = [delta_1, delta_2, delta_4]
    self.assertEqual(unique_deltas, expected_unique_deltas)
    expected_issues_for_deltas = [
        [issue_1, issue_3], [issue_2], [issue_4, issue_5]
    ]
    self.assertEqual(issues_for_deltas, expected_issues_for_deltas)

  def testEnforceAttachmentQuotaLimits(self):
    self.services.project.TestAddProject('Circe', project_id=798)
    issue_a1 = _Issue('Circe', 1, project_id=798)
    delta_a1 = tracker_pb2.IssueDelta()

    issue_a2 = _Issue('Circe', 2, project_id=798)
    delta_a2 = tracker_pb2.IssueDelta()

    self.services.project.TestAddProject('Patroclus', project_id=788)
    issue_b1 = _Issue('Patroclus', 1, project_id=788)
    delta_b1 = tracker_pb2.IssueDelta()

    issue_delta_pairs = [
        (issue_a1, delta_a1), (issue_a2, delta_a2), (issue_b1, delta_b1)
    ]

    upload_1 = framework_helpers.AttachmentUpload(
        'dragon', b'OOOOOO\n', 'text/plain')
    upload_2 = framework_helpers.AttachmentUpload(
        'snake', b'ooooo\n', 'text/plain')
    attachment_uploads = [upload_1, upload_2]

    actual = tracker_helpers._EnforceAttachmentQuotaLimits(
        self.cnxn, issue_delta_pairs, self.services, attachment_uploads)

    expected = {
        798: len(upload_1.contents + upload_2.contents) * 2,
        788: len(upload_1.contents + upload_2.contents)
    }
    self.assertEqual(actual, expected)

  @mock.patch('tracker.tracker_constants.ISSUE_ATTACHMENTS_QUOTA_HARD', 1)
  def testEnforceAttachmentQuotaLimits_Exceeded(self):
    self.services.project.TestAddProject('Circe', project_id=798)
    issue_a1 = _Issue('Circe', 1, project_id=798)
    delta_a1 = tracker_pb2.IssueDelta()

    issue_a2 = _Issue('Circe', 2, project_id=798)
    delta_a2 = tracker_pb2.IssueDelta()

    self.services.project.TestAddProject('Patroclus', project_id=788)
    issue_b1 = _Issue('Patroclus', 1, project_id=788)
    delta_b1 = tracker_pb2.IssueDelta()

    issue_delta_pairs = [
        (issue_a1, delta_a1), (issue_a2, delta_a2), (issue_b1, delta_b1)
    ]

    upload_1 = framework_helpers.AttachmentUpload(
        'dragon', b'OOOOOO\n', 'text/plain')
    upload_2 = framework_helpers.AttachmentUpload(
        'snake', b'ooooo\n', 'text/plain')
    attachment_uploads = [upload_1, upload_2]

    with self.assertRaisesRegex(exceptions.OverAttachmentQuota,
                                r'.+ project Patroclus\n.+ project Circe'):
      tracker_helpers._EnforceAttachmentQuotaLimits(
          self.cnxn, issue_delta_pairs, self.services, attachment_uploads)

  def testAssertIssueChangesValid_Valid(self):
    """We can assert when deltas are valid for issues."""
    impacted_issue = _Issue('chicken', 101)
    self.services.issue.TestAddIssue(impacted_issue)

    issue_1 = _Issue('chicken', 1)
    self.services.issue.TestAddIssue(issue_1)
    delta_1 = tracker_pb2.IssueDelta(
        merged_into=impacted_issue.issue_id, status='Duplicate')
    exp_d1 = copy.deepcopy(delta_1)

    issue_2 = _Issue('chicken', 2)
    self.services.issue.TestAddIssue(issue_2)
    delta_2 = tracker_pb2.IssueDelta(blocked_on_add=[impacted_issue.issue_id])
    exp_d2 = copy.deepcopy(delta_2)

    issue_3 = _Issue('chicken', 3)
    self.services.issue.TestAddIssue(issue_3)
    delta_3 = tracker_pb2.IssueDelta()
    exp_d3 = copy.deepcopy(delta_3)

    issue_4 = _Issue('chicken', 4)
    self.services.issue.TestAddIssue(issue_4)
    delta_4 = tracker_pb2.IssueDelta(owner_id=self.project_member.user_id)
    exp_d4 = copy.deepcopy(delta_4)

    issue_5 = _Issue('chicken', 5)
    self.services.issue.TestAddIssue(issue_5)
    fv = tracker_bizobj.MakeFieldValue(
        self.int_fd.field_id, 998, None, None, None, None, False)
    delta_5 = tracker_pb2.IssueDelta(field_vals_add=[fv])
    exp_d5 = copy.deepcopy(delta_5)

    issue_6 = _Issue('chicken', 6)
    self.services.issue.TestAddIssue(issue_6)
    delta_6 = tracker_pb2.IssueDelta(
        summary='  ' + 's' * tracker_constants.MAX_SUMMARY_CHARS + '  ')
    exp_d6 = copy.deepcopy(delta_6)

    issue_7 = _Issue('chicken', 7)
    self.services.issue.TestAddIssue(issue_7)
    issue_8 = _Issue('chicken', 8)
    self.services.issue.TestAddIssue(issue_8)

    # We are fine with duplicate/consistent deltas.
    delta_7 = tracker_pb2.IssueDelta(blocked_on_add=[issue_8.issue_id])
    exp_d7 = copy.deepcopy(delta_7)
    delta_8 = tracker_pb2.IssueDelta(blocking_add=[issue_7.issue_id])
    exp_d8 = copy.deepcopy(delta_8)

    issue_9 = _Issue('chicken', 9)
    self.services.issue.TestAddIssue(issue_9)
    issue_10 = _Issue('chicken', 10)
    self.services.issue.TestAddIssue(issue_10)

    delta_9 = tracker_pb2.IssueDelta(blocked_on_remove=[issue_10.issue_id])
    exp_d9 = copy.deepcopy(delta_9)
    delta_10 = tracker_pb2.IssueDelta(blocking_remove=[issue_9.issue_id])
    exp_d10 = copy.deepcopy(delta_10)

    issue_11 = _Issue('chicken', 11)
    user_fd = tracker_bizobj.MakeFieldDef(
        123, 789, 'CPU', tracker_pb2.FieldTypes.USER_TYPE, None, '', False,
        False, False, None, None, '', False, '', '',
        tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
    self.services.config.TestAddFieldDef(user_fd)
    a_user = self.services.user.TestAddUser('a_user@test.com', 123)
    delta_11 = tracker_pb2.IssueDelta(
        cc_ids_add=[222],
        field_vals_add=[
            tracker_bizobj.MakeFieldValue(
                user_fd.field_id, None, None, a_user.user_id, None, None, False)
        ])
    exp_d11 = copy.deepcopy(delta_11)

    issue_delta_pairs = [
        (issue_1, delta_1), (issue_2, delta_2), (issue_3, delta_3),
        (issue_4, delta_4), (issue_5, delta_5), (issue_6, delta_6),
        (issue_7, delta_7), (issue_8, delta_8), (issue_9, delta_9),
        (issue_10, delta_10), (issue_11, delta_11)
    ]
    comment = '   ' + 'c' * tracker_constants.MAX_COMMENT_CHARS + '  '
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, issue_delta_pairs, self.services, comment_content=comment)

    # Check we can handle None `comment_content`.
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, issue_delta_pairs, self.services)
    self.assertEqual(
        [
            exp_d1, exp_d2, exp_d3, exp_d4, exp_d5, exp_d6, exp_d7, exp_d8,
            exp_d9, exp_d10, exp_d11
        ], [
            delta_1, delta_2, delta_3, delta_4, delta_5, delta_6, delta_7,
            delta_8, delta_9, delta_10, delta_11
        ])

  def testAssertIssueChangesValid_ValidatesLabels(self):
    """Asserts labels."""
    issue_1 = _Issue('chicken', 1)
    self.services.issue.TestAddIssue(issue_1)
    delta_1 = tracker_pb2.IssueDelta(labels_add=['freeze_new_label'])
    issue_delta_pairs = [(issue_1, delta_1)]
    comment = 'just a plain comment'
    with self.assertRaisesRegex(
        exceptions.InputException,
        ("The creation of new labels is blocked for the Chromium project"
         " in Monorail. To continue with editing your issue, please"
         " remove: freeze_new_label label\\(s\\).")):
      tracker_helpers._AssertIssueChangesValid(
          self.cnxn, issue_delta_pairs, self.services, comment_content=comment)

  def testAssertIssueChangesValid_RequiredField(self):
    """Asserts fields and requried fields.."""
    issue_1 = _Issue('chicken', 1)
    self.services.issue.TestAddIssue(issue_1)
    delta_1 = tracker_pb2.IssueDelta()
    exp_d1 = copy.deepcopy(delta_1)

    required_fd = tracker_bizobj.MakeFieldDef(
        124, 789, 'StrField', tracker_pb2.FieldTypes.STR_TYPE, None, '', True,
        False, False, None, None, '', False, '', '',
        tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
    self.services.config.TestAddFieldDef(required_fd)

    issue_delta_pairs = [(issue_1, delta_1)]
    comment = 'just a plain comment'
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, issue_delta_pairs, self.services, comment_content=comment)

    # Check we can handle adding a field value when issue is in invalid state.
    fv = tracker_bizobj.MakeFieldValue(
        self.int_fd.field_id, 998, None, None, None, None, False)
    delta_2 = tracker_pb2.IssueDelta(field_vals_add=[fv])
    exp_d2 = copy.deepcopy(delta_2)
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, issue_delta_pairs, self.services)
    self.assertEqual([exp_d1, exp_d2], [delta_1, delta_2])

  def testAssertIssueChangesValid_Invalid(self):
    """We can raise exceptions when deltas are not valid for issues. """

    def getRef(issue):
      return '%s:%d' % (issue.project_name, issue.local_id)

    issue_delta_pairs = []
    expected_err_msgs = []

    comment = 'c' * (tracker_constants.MAX_COMMENT_CHARS + 1)
    expected_err_msgs.append('Comment is too long.')

    issue_1 = _Issue('chicken', 1)
    self.services.issue.TestAddIssue(issue_1)
    issue_1_ref = getRef(issue_1)

    delta_1 = tracker_pb2.IssueDelta(
        merged_into=issue_1.issue_id,
        blocked_on_add=[issue_1.issue_id],
        summary='',
        status='',
        cc_ids_add=[9876])

    issue_delta_pairs.append((issue_1, delta_1))
    expected_err_msgs.extend(
        [
            ('%s: MERGED type statuses must accompany mergedInto values.') %
            issue_1_ref,
            '%s: Cannot merge an issue into itself.' % issue_1_ref,
            '%s: Cannot block an issue on itself.' % issue_1_ref,
            'users/9876: User does not exist.',
            '%s: Summary required.' % issue_1_ref,
            '%s: Status is required.' % issue_1_ref
        ])

    issue_2 = _Issue('chicken', 2)
    self.services.issue.TestAddIssue(issue_2)
    issue_2_ref = getRef(issue_2)

    fv = tracker_bizobj.MakeFieldValue(
        self.int_fd.field_id, 1000, None, None, None, None, False)
    delta_2 = tracker_pb2.IssueDelta(
        status='Duplicate',
        blocking_add=[issue_2.issue_id],
        summary='s' * (tracker_constants.MAX_SUMMARY_CHARS + 1),
        owner_id=self.no_project_user.user_id,
        field_vals_add=[fv])
    issue_delta_pairs.append((issue_2, delta_2))

    expected_err_msgs.extend(
        [
            ('%s: MERGED type statuses must accompany mergedInto values.') %
            issue_2_ref,
            '%s: Cannot block an issue on itself.' % issue_2_ref,
            '%s: Issue owner must be a project member.' % issue_2_ref,
            '%s: Summary is too long.' % issue_2_ref,
            '%s: Error for %r: Value must be <= 999.' % (issue_2_ref, fv)
        ])

    issue_3 = _Issue('chicken', 3)
    issue_3.status = 'Duplicate'
    issue_3.merged_into = 78911
    self.services.issue.TestAddIssue(issue_3)
    issue_3_ref = getRef(issue_3)
    delta_3 = tracker_pb2.IssueDelta(
        status='Available', merged_into_external='b/123')
    issue_delta_pairs.append((issue_3, delta_3))
    expected_err_msgs.append(
        '%s: MERGED type statuses must accompany mergedInto values.' %
        issue_3_ref)

    with self.assertRaisesRegex(exceptions.InputException,
                                '\n'.join(expected_err_msgs)):
      tracker_helpers._AssertIssueChangesValid(
          self.cnxn, issue_delta_pairs, self.services, comment_content=comment)

  def testAssertIssueChangesValid_ConflictingDeltas(self):

    def getRef(issue):
      return '%s:%d' % (issue.project_name, issue.local_id)

    expected_err_msgs = []
    issue_3 = _Issue('chicken', 3)
    self.services.issue.TestAddIssue(issue_3)
    issue_3_ref = getRef(issue_3)
    issue_4 = _Issue('chicken', 4)
    self.services.issue.TestAddIssue(issue_4)
    issue_4_ref = getRef(issue_4)
    issue_5 = _Issue('chicken', 5)
    self.services.issue.TestAddIssue(issue_5)
    issue_5_ref = getRef(issue_5)
    issue_6 = _Issue('chicken', 6)
    self.services.issue.TestAddIssue(issue_6)
    issue_6_ref = getRef(issue_6)
    issue_7 = _Issue('chicken', 7)
    self.services.issue.TestAddIssue(issue_7)
    issue_7_ref = getRef(issue_7)

    delta_3 = tracker_pb2.IssueDelta(
        blocking_add=[issue_4.issue_id],
        blocked_on_add=[issue_5.issue_id, issue_6.issue_id])

    delta_4 = tracker_pb2.IssueDelta(
        blocked_on_remove=[issue_3.issue_id], blocking_add=[issue_5.issue_id])
    expected_err_msgs.append(
        'Changes for %s conflict for %s' % (issue_4_ref, issue_3_ref))

    delta_5 = tracker_pb2.IssueDelta(
        blocking_remove=[issue_3.issue_id],
        blocked_on_remove=[issue_4.issue_id])
    expected_err_msgs.append(
        'Changes for %s conflict for %s, %s' %
        (issue_5_ref, issue_3_ref, issue_4_ref))

    delta_6 = tracker_pb2.IssueDelta(blocking_remove=[issue_3.issue_id])
    expected_err_msgs.append(
        'Changes for %s conflict for %s' % (issue_6_ref, issue_3_ref))

    impacted_issue = _Issue('chicken', 11)
    self.services.issue.TestAddIssue(impacted_issue)
    impacted_issue_ref = getRef(impacted_issue)
    delta_7 = tracker_pb2.IssueDelta(
        blocking_remove=[issue_3.issue_id],
        blocking_add=[issue_3.issue_id],
        blocked_on_remove=[impacted_issue.issue_id],
        blocked_on_add=[impacted_issue.issue_id])
    expected_err_msgs.append(
        'Changes for %s conflict for %s, %s' %
        (issue_7_ref, issue_3_ref, impacted_issue_ref))

    issue_delta_pairs = [
        (issue_3, delta_3),
        (issue_4, delta_4),
        (issue_5, delta_5),
        (issue_6, delta_6),
        (issue_7, delta_7),
    ]

    with self.assertRaisesRegex(exceptions.InputException,
                                '\n'.join(expected_err_msgs)):
      tracker_helpers._AssertIssueChangesValid(
          self.cnxn, issue_delta_pairs, self.services)

  def testComputeNewCcsFromIssueMerge(self):
    """We can compute the new ccs to add to a merge-into issue."""
    target_issue = fake.MakeTestIssue(789, 10, 'Target issue', 'New', 111)
    source_issue_1 = fake.MakeTestIssue(
        789, 11, 'Source issue', 'New', 111)  # different restrictions
    source_issue_2 = fake.MakeTestIssue(
        789, 12, 'Source issue', 'New', 222)  # same restrictions
    source_issue_3 = fake.MakeTestIssue(
        789, 13, 'Source issue', 'New', 222)  # no restrictions
    source_issue_4 = fake.MakeTestIssue(
        789, 14, 'Source issue', 'New', 666)  # empty ccs
    source_issue_5 = fake.MakeTestIssue(
        788, 15, 'Source issue', 'New', 666)  # different project
    source_issue_1.cc_ids.append(333)
    source_issue_2.cc_ids.append(444)
    source_issue_3.cc_ids.append(555)
    source_issue_5.cc_ids.append(999)

    target_issue.labels.append('Restrict-View-Chicken')
    source_issue_1.labels.append('Restrict-View-Cow')
    source_issue_2.labels.append('Restrict-View-Chicken')

    self.services.issue.TestAddIssue(target_issue)
    self.services.issue.TestAddIssue(source_issue_1)
    self.services.issue.TestAddIssue(source_issue_2)
    self.services.issue.TestAddIssue(source_issue_3)
    self.services.issue.TestAddIssue(source_issue_4)
    self.services.issue.TestAddIssue(source_issue_5)

    new_cc_ids = tracker_helpers._ComputeNewCcsFromIssueMerge(
        target_issue, [source_issue_1, source_issue_2, source_issue_3])
    six.assertCountEqual(self, new_cc_ids, [444, 555, 222])

  def testComputeNewCcsFromIssueMerge_Empty(self):
    target_issue = fake.MakeTestIssue(789, 10, 'Target issue', 'New', 111)
    self.services.issue.TestAddIssue(target_issue)
    new_cc_ids = tracker_helpers._ComputeNewCcsFromIssueMerge(target_issue, [])
    six.assertCountEqual(self, new_cc_ids, [])

  def testEnforceNonMergeStatusDeltas(self):
    # No updates: user is setting to a non-MERGED status with no
    # existing merged_into values.
    issue_1 = _Issue('chicken', 1)
    self.services.issue.TestAddIssue(issue_1)
    delta_1 = tracker_pb2.IssueDelta(status='Available')
    exp_delta_1 = copy.deepcopy(delta_1)

    # No updates: user is setting to a MERGED status. Whether this request
    # goes through will be handled by _AssertIssueChangesValid().
    issue_2 = _Issue('chicken', 2)
    self.services.issue.TestAddIssue(issue_2)
    delta_2 = tracker_pb2.IssueDelta(status='Duplicate')
    exp_delta_2 = copy.deepcopy(delta_2)

    # No updates: user is setting to a MERGED status. (This test issue starts
    # out with a merged_into value but a non-MERGED status. We don't expect
    # real data to ever be in this state)
    issue_3 = _Issue('chicken', 3)
    issue_3.merged_into = 7011
    self.services.issue.TestAddIssue(issue_3)
    delta_3 = tracker_pb2.IssueDelta(status='Duplicate')
    exp_delta_3 = copy.deepcopy(delta_3)

    # No updates: same situation as above.
    issue_4 = _Issue('chicken', 4)
    issue_4.merged_into_external = 'b/123'
    self.services.issue.TestAddIssue(issue_4)
    delta_4 = tracker_pb2.IssueDelta(status='Duplicate')
    exp_delta_4 = copy.deepcopy(delta_4)

    # Update delta: user is setting status AWAY from a MERGED status, so we
    # auto-remove any existing merged_into values.
    issue_5 = _Issue('chicken', 5)
    issue_5.merged_into = 7011
    self.services.issue.TestAddIssue(issue_5)
    delta_5 = tracker_pb2.IssueDelta(status='Available')
    exp_delta_5 = copy.deepcopy(delta_5)
    exp_delta_5.merged_into = 0

    # Update delta: user is setting status AWAY from a MERGED status, so we
    # auto-remove any existing merged_into values.
    issue_6 = _Issue('chicken', 6)
    issue_6.merged_into_external = 'b/123'
    self.services.issue.TestAddIssue(issue_6)
    delta_6 = tracker_pb2.IssueDelta(status='Available')
    exp_delta_6 = copy.deepcopy(delta_6)
    exp_delta_6.merged_into_external = ''

    # No updates: user is setting to a non-MERGED status while also setting
    # a merged_into value. This will be rejected down the line by
    # _AssertIssueChangesValid()
    issue_7 = _Issue('chicken', 7)
    issue_7.merged_into = 7011
    self.services.issue.TestAddIssue(issue_7)
    delta_7 = tracker_pb2.IssueDelta(
        merged_into_external='b/123', status='Available')
    exp_delta_7 = copy.deepcopy(delta_7)

    # No updates: user is setting to a non-MERGED status while also setting
    # a merged_into value. This will be rejected down the line by
    # _AssertIssueChangesValid()
    issue_8 = _Issue('chicken', 8)
    issue_8.merged_into_external = 'b/123'
    self.services.issue.TestAddIssue(issue_8)
    delta_8 = tracker_pb2.IssueDelta(merged_into=8011, status='Available')
    exp_delta_8 = copy.deepcopy(delta_8)

    pairs = [
        (issue_1, delta_1), (issue_2, delta_2), (issue_3, delta_3),
        (issue_4, delta_4), (issue_5, delta_5), (issue_6, delta_6),
        (issue_7, delta_7), (issue_8, delta_8)
    ]

    tracker_helpers._EnforceNonMergeStatusDeltas(
        self.cnxn, pairs, self.services)
    self.assertEqual(
        [
            delta_1, delta_2, delta_3, delta_4, delta_5, delta_6, delta_7,
            delta_8
        ], [
            exp_delta_1, exp_delta_2, exp_delta_3, exp_delta_4, exp_delta_5,
            exp_delta_6, exp_delta_7, exp_delta_8
        ])


class IssueChangeImpactedIssuesTest(unittest.TestCase):
  """Tests for the _IssueChangeImpactedIssues class."""

  def setUp(self):
    self.services = service_manager.Services(
        issue=fake.IssueService(), issue_star=fake.IssueStarService())
    self.cnxn = 'fake connection'

  def testComputeAllImpactedIDs(self):
    tracker = tracker_helpers._IssueChangeImpactedIssues()
    tracker.blocking_add[78901].append(1)
    tracker.blocking_remove[78902].append(2)
    tracker.blocked_on_add[78903].append(1)
    tracker.blocked_on_remove[78904].append(1)
    tracker.merged_from_add[78905].append(3)
    tracker.merged_from_remove[78906].append(3)

    # Repeat a few iids.
    tracker.blocked_on_remove[78901].append(1)
    tracker.merged_from_add[78903].append(1)

    actual = tracker.ComputeAllImpactedIIDs()
    expected = {78901, 78902, 78903, 78904, 78905, 78906}
    self.assertEqual(actual, expected)

  def testComputeAllImpactedIDs_Empty(self):
    tracker = tracker_helpers._IssueChangeImpactedIssues()
    actual = tracker.ComputeAllImpactedIIDs()
    self.assertEqual(actual, set())

  def testTrackImpactedIssues(self):
    issue_delta_pairs = []

    issue_1 = _Issue('project', 1)
    issue_1.merged_into = 78906
    delta_1 = tracker_pb2.IssueDelta(
        merged_into=78905,
        blocked_on_add=[78901, 78902],
        blocked_on_remove=[78903, 78904],
    )
    issue_delta_pairs.append((issue_1, delta_1))

    issue_2 = _Issue('project', 2)
    issue_2.merged_into = 78905
    delta_2 = tracker_pb2.IssueDelta(
        merged_into=78905,  # This should be ignored.
        blocking_add=[78901, 78902],
        blocking_remove=[78903, 78904],
    )
    issue_delta_pairs.append((issue_2, delta_2))

    issue_3 = _Issue('project', 3)
    issue_3.merged_into = 78902
    delta_3 = tracker_pb2.IssueDelta(merged_into=78901)
    issue_delta_pairs.append((issue_3, delta_3))

    issue_4 = _Issue('project', 4)
    issue_4.merged_into = 78901
    delta_4 = tracker_pb2.IssueDelta(
        merged_into=framework_constants.NO_ISSUE_SPECIFIED)
    issue_delta_pairs.append((issue_4, delta_4))

    impacted_issues = tracker_helpers._IssueChangeImpactedIssues()
    for issue, delta in issue_delta_pairs:
      impacted_issues.TrackImpactedIssues(issue, delta)

    self.assertEqual(
        impacted_issues.blocking_add, {
            78901: [issue_1.issue_id],
            78902: [issue_1.issue_id]
        })
    self.assertEqual(
        impacted_issues.blocking_remove, {
            78903: [issue_1.issue_id],
            78904: [issue_1.issue_id]
        })
    self.assertEqual(
        impacted_issues.blocked_on_add, {
            78901: [issue_2.issue_id],
            78902: [issue_2.issue_id]
        })
    self.assertEqual(
        impacted_issues.blocked_on_remove, {
            78903: [issue_2.issue_id],
            78904: [issue_2.issue_id]
        })
    self.assertEqual(
        impacted_issues.merged_from_add, {
            78901: [issue_3.issue_id],
            78905: [issue_1.issue_id],
        })
    self.assertEqual(
        impacted_issues.merged_from_remove, {
            78901: [issue_4.issue_id],
            78902: [issue_3.issue_id],
            78906: [issue_1.issue_id],
        })

  def testApplyImpactedIssueChanges(self):
    impacted_tracker = tracker_helpers._IssueChangeImpactedIssues()
    impacted_issue = _Issue('proj', 1)
    self.services.issue.TestAddIssue(impacted_issue)
    impacted_iid = impacted_issue.issue_id

    # Setup.
    bo_add = _Issue('proj', 2)
    self.services.issue.TestAddIssue(bo_add)
    impacted_tracker.blocked_on_add[impacted_iid].append(bo_add.issue_id)

    bo_remove = _Issue('proj', 3)
    self.services.issue.TestAddIssue(bo_remove)
    impacted_tracker.blocked_on_remove[impacted_iid].append(
        bo_remove.issue_id)

    b_add = _Issue('proj', 4)
    self.services.issue.TestAddIssue(b_add)
    impacted_tracker.blocking_add[impacted_iid].append(
        b_add.issue_id)

    b_remove = _Issue('proj', 5)
    self.services.issue.TestAddIssue(b_remove)
    impacted_tracker.blocking_remove[impacted_iid].append(
        b_remove.issue_id)

    m_add = _Issue('proj', 6)
    m_add.cc_ids = [666, 777]
    self.services.issue.TestAddIssue(m_add)
    m_add_no_ccs = _Issue('proj', 7, '', '')
    self.services.issue.TestAddIssue(m_add_no_ccs)
    impacted_tracker.merged_from_add[impacted_iid].extend(
        [m_add.issue_id, m_add_no_ccs.issue_id])
    # Set up starrers.
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, impacted_iid, 111, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, impacted_iid, 222, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, m_add.issue_id, 222, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, m_add.issue_id, 333, True)
    self.services.issue_star.SetStar(
        self.cnxn, self.services, None, m_add.issue_id, 444, True)

    m_remove = _Issue('proj', 8)
    m_remove.cc_ids = [888]
    self.services.issue.TestAddIssue(m_remove)
    impacted_tracker.merged_from_remove[impacted_iid].append(
        m_remove.issue_id)


    impacted_issue.cc_ids = [666]
    impacted_issue.blocked_on_iids = [78404, bo_remove.issue_id]
    impacted_issue.blocking_iids = [78405, b_remove.issue_id]
    expected_issue = copy.deepcopy(impacted_issue)

    # Verify.
    (actual_amendments,
     actual_new_starrers) = impacted_tracker.ApplyImpactedIssueChanges(
         self.cnxn, impacted_issue, self.services)
    expected_amendments = [
        tracker_bizobj.MakeBlockedOnAmendment(
            [('proj', bo_add.local_id)],
            [('proj', bo_remove.local_id)], default_project_name='proj'),
        tracker_bizobj.MakeBlockingAmendment(
            [('proj', b_add.local_id)],
            [('proj', b_remove.local_id)], default_project_name='proj'),
        tracker_bizobj.MakeCcAmendment([777], []),
        tracker_bizobj.MakeMergedIntoAmendment(
            [('proj', m_add.local_id), ('proj', m_add_no_ccs.local_id)],
            [('proj', m_remove.local_id)], default_project_name='proj')
        ]
    self.assertEqual(actual_amendments, expected_amendments)
    six.assertCountEqual(self, actual_new_starrers, [333, 444])

    expected_issue.cc_ids.append(777)
    expected_issue.blocked_on_iids = [78404, bo_add.issue_id]
    # By default new blocked_on issues that appear in blocked_on_iids
    # with no prior rank associated with it are un-ranked and assigned rank 0.
    # See SortBlockedOn in issue_svc.py.
    expected_issue.blocked_on_ranks = [0, 0]
    expected_issue.blocking_iids = [78405, b_add.issue_id]
    expected_issue.star_count = 4
    self.assertEqual(impacted_issue, expected_issue)

  def testApplyImpactedIssueChanges_Empty(self):
    impacted_tracker = tracker_helpers._IssueChangeImpactedIssues()
    impacted_issue = _Issue('proj', 1)
    expected_issue = copy.deepcopy(impacted_issue)

    (actual_amendments,
     actual_new_starrers) = impacted_tracker.ApplyImpactedIssueChanges(
         self.cnxn, impacted_issue, self.services)

    expected_amendments = []
    self.assertEqual(actual_amendments, expected_amendments)
    expected_new_starrers = []
    self.assertEqual(actual_new_starrers, expected_new_starrers)
    self.assertEqual(impacted_issue, expected_issue)

  def testApplyImpactedIssueChanges_PartiallyEmptyMergedFrom(self):
    """We can process merged_from changes when one of the lists is empty."""
    impacted_tracker = tracker_helpers._IssueChangeImpactedIssues()
    impacted_issue = _Issue('proj', 1)
    impacted_iid = impacted_issue.issue_id
    expected_issue = copy.deepcopy(impacted_issue)

    m_add = _Issue('proj', 2)
    self.services.issue.TestAddIssue(m_add)
    impacted_tracker.merged_from_add[impacted_iid].append(
        m_add.issue_id)
    # We're leaving impacted_tracker.merged_from_remove empty.

    (actual_amendments,
     actual_new_starrers) = impacted_tracker.ApplyImpactedIssueChanges(
         self.cnxn, impacted_issue, self.services)

    expected_amendments = [tracker_bizobj.MakeMergedIntoAmendment(
            [('proj', m_add.local_id)], [], default_project_name='proj')]
    self.assertEqual(actual_amendments, expected_amendments)
    expected_new_starrers = []
    self.assertEqual(actual_new_starrers, expected_new_starrers)
    self.assertEqual(impacted_issue, expected_issue)


class AssertUsersExistTest(unittest.TestCase):

  def setUp(self):
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(user=fake.UserService())
    for user_id in [1, 1001, 1002, 1003, 2001, 2002, 3002]:
      self.services.user.TestAddUser('test%d' % user_id, user_id, add_user=True)

  def test_AssertUsersExist_Passes(self):
    existing = [1, 1001, 1002, 1003, 2001, 2002, 3002]
    with exceptions.ErrorAggregator(exceptions.InputException) as err_agg:
      tracker_helpers.AssertUsersExist(
          self.cnxn, self.services, existing, err_agg)

  def test_AssertUsersExist_Empty(self):
    with exceptions.ErrorAggregator(exceptions.InputException) as err_agg:
      tracker_helpers.AssertUsersExist(
          self.cnxn, self.services, [], err_agg)

  def test_AssertUsersExist(self):
    dne_users = [2, 3]
    existing = [1, 1001, 1002, 1003, 2001, 2002, 3002]
    all_users = existing + dne_users
    with self.assertRaisesRegex(
        exceptions.InputException,
        'users/2: User does not exist.\nusers/3: User does not exist.'):
      with exceptions.ErrorAggregator(exceptions.InputException) as err_agg:
        tracker_helpers.AssertUsersExist(
            self.cnxn, self.services, all_users, err_agg)
