# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for notify_reasons.py."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import unittest
import os

from features import notify_reasons
from framework import emailfmt
from framework import framework_views
from framework import urls
from mrproto import user_pb2
from mrproto import usergroup_pb2
from services import service_manager
from testing import fake
from tracker import tracker_bizobj


REPLY_NOT_ALLOWED = notify_reasons.REPLY_NOT_ALLOWED
REPLY_MAY_COMMENT = notify_reasons.REPLY_MAY_COMMENT
REPLY_MAY_UPDATE = notify_reasons.REPLY_MAY_UPDATE


class ComputeIssueChangeAddressPermListTest(unittest.TestCase):
  """Tests for notify_reasons.ComputeIssueChangeAddressPermList()."""

  def setUp(self):
    """Set up fake services, three users, one project, and one issue.

    User 111 is added as a project owner and user 222 as a committer;
    user 999 is given no role in the project.
    """
    self.users_by_id = {
        user_id: framework_views.StuffUserView(user_id, email, True)
        for user_id, email in (
            (111, 'owner@example.com'),
            (222, 'member@example.com'),
            (999, 'visitor@example.com'))
    }
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService())

    add_user = self.services.user.TestAddUser
    self.owner = add_user('owner@example.com', 111)
    self.member = add_user('member@example.com', 222)
    self.visitor = add_user('visitor@example.com', 999)

    self.project = self.services.project.TestAddProject(
        'proj', owner_ids=[111], committer_ids=[222])
    self.project.process_inbound_email = True
    self.issue = fake.MakeTestIssue(
        self.project.project_id, 1, 'summary', 'New', 111)

  def testEmptyIDs(self):
    """With no user IDs to consider, the result is an empty list."""
    result = notify_reasons.ComputeIssueChangeAddressPermList(
        'fake cnxn', [], self.project, self.issue, self.services, [], {})
    self.assertEqual([], result)

  def testRecipientIsMember(self):
    """Owner and committer may update by reply; the visitor may only comment.

    The first AddrPerm field is expected to be True for the two project
    members and False for the visitor.
    """

    def AlwaysNotify(*_args):
      # Stand-in preference check that accepts every candidate recipient.
      return True

    result = notify_reasons.ComputeIssueChangeAddressPermList(
        'fake cnxn', [111, 222, 999], self.project, self.issue,
        self.services, set(), self.users_by_id,
        pref_check_function=AlwaysNotify)

    owner_perm = notify_reasons.AddrPerm(
        True, 'owner@example.com', self.owner, REPLY_MAY_UPDATE,
        user_pb2.UserPrefs(user_id=111))
    member_perm = notify_reasons.AddrPerm(
        True, 'member@example.com', self.member, REPLY_MAY_UPDATE,
        user_pb2.UserPrefs(user_id=222))
    visitor_perm = notify_reasons.AddrPerm(
        False, 'visitor@example.com', self.visitor, REPLY_MAY_COMMENT,
        user_pb2.UserPrefs(user_id=999))
    self.assertEqual([owner_perm, member_perm, visitor_perm], result)


class ComputeProjectAndIssueNotificationAddrListTest(unittest.TestCase):
  """Tests for the project-wide and per-issue notification address lists."""

  def setUp(self):
    """Set up a fake project service, a fake user service, and three users."""
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        user=fake.UserService())
    self.project = self.services.project.TestAddProject('project')
    for email, user_id in (
        ('alice@gmail.com', 111),
        ('bob@gmail.com', 222),
        ('fred@gmail.com', 555)):
      self.services.user.TestAddUser(email, user_id)

  def testNotifyAddress(self):
    """The project's issue_notify_address is used only when applicable."""
    list_addr = 'mailing-list@domain.com'

    def Compute(flag, omit_addrs):
      # `flag` is the boolean fourth argument; judging by the comments below
      # it says whether anonymous users could view -- TODO confirm.
      return notify_reasons.ComputeProjectNotificationAddrList(
          self.cnxn, self.services, self.project, flag, omit_addrs)

    # Neither a mailing list nor filter rules are configured yet.
    self.assertListEqual([], Compute(True, set()))

    # Once configured, the mailing list is the only address notified.
    self.project.issue_notify_address = list_addr
    expected_perm = notify_reasons.AddrPerm(
        False, list_addr, None, REPLY_NOT_ALLOWED, user_pb2.UserPrefs())
    self.assertListEqual([expected_perm], Compute(True, set()))

    # The mailing list was already notified, so it is omitted.
    self.assertListEqual([], Compute(False, {list_addr}))

    # Nobody is notified because anon users cannot view.
    self.assertListEqual([], Compute(False, set()))

  def testFilterRuleNotifyAddresses(self):
    """Derived notify addresses on the issue are notified unless omitted."""
    notify_addr = 'notify@domain.com'
    issue = fake.MakeTestIssue(
        self.project.project_id, 1, 'summary', 'New', 555)
    issue.derived_notify_addrs.extend([notify_addr])

    def Compute(omit_addrs):
      return notify_reasons.ComputeIssueNotificationAddrList(
          self.cnxn, self.services, issue, omit_addrs)

    expected_perm = notify_reasons.AddrPerm(
        False, notify_addr, None, REPLY_NOT_ALLOWED, user_pb2.UserPrefs())
    self.assertListEqual([expected_perm], Compute(set()))

    # An also-notify address can be left out, e.g., when it matches the
    # email address of the user who made the change.
    self.assertListEqual([], Compute({notify_addr}))


class ComputeGroupReasonListTest(unittest.TestCase):
  """Tests for notify_reasons.ComputeGroupReasonList()."""

  def setUp(self):
    """Set up fake services, project 789, three users, and one issue.

    The issue is created with user 555 (fred) as its final argument; the
    tests below treat fred as the issue owner.
    """
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        features=fake.FeaturesService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService())
    self.project = self.services.project.TestAddProject(
        'project', project_id=789)
    self.config = self.services.config.GetProjectConfig('cnxn', 789)
    self.alice = self.services.user.TestAddUser('alice@example.com', 111)
    self.bob = self.services.user.TestAddUser('bob@example.com', 222)
    self.fred = self.services.user.TestAddUser('fred@example.com', 555)
    self.users_by_id = framework_views.MakeAllUserViews(
        'cnxn', self.services.user, [111, 222, 555])
    self.issue = fake.MakeTestIssue(
        self.project.project_id, 1, 'summary', 'New', 555)

  def _MakeAddrPerm(self, user):
    """Return the AddrPerm these tests expect for a notified user.

    Args:
      user: a user object with `email` and `user_id` attributes, as
          returned by the fake user service's TestAddUser().

    Returns:
      An AddrPerm whose first field is False, addressed to the user's
      email, with REPLY_NOT_ALLOWED and a UserPrefs holding only the
      user's ID.
    """
    return notify_reasons.AddrPerm(
        False, user.email, user, REPLY_NOT_ALLOWED,
        user_pb2.UserPrefs(user_id=user.user_id))

  def CheckGroupReasonList(
      self,
      actual,
      reporter_apl=None,
      owner_apl=None,
      old_owner_apl=None,
      default_owner_apl=None,
      ccd_apl=None,
      group_ccd_apl=None,
      default_ccd_apl=None,
      starrer_apl=None,
      subscriber_apl=None,
      also_notified_apl=None,
      all_notifications_apl=None):
    """Assert that `actual` is the expected 11-entry group reason list.

    Args:
      actual: the sequence returned by ComputeGroupReasonList(). It must
          unpack into exactly 11 entries, so this helper is only usable
          when no extra (e.g., component) entries are present.
      reporter_apl ... all_notifications_apl: the expected AddrPerm list
          for each reason, in the order the entries appear in `actual`.
          An omitted (or falsy) value means an empty list is expected.

    Each entry of `actual` is compared against an
    (addr_perm_list, reason) tuple built from the matching keyword
    argument and the corresponding notify_reasons.REASON_* constant.
    """
    (
        you_report, you_own, you_old_owner, you_default_owner, you_ccd,
        you_group_ccd, you_default_ccd, you_star, you_subscribe,
        you_also_notify, all_notifications) = actual
    self.assertEqual(
        (reporter_apl or [], notify_reasons.REASON_REPORTER),
        you_report)
    self.assertEqual(
        (owner_apl or [], notify_reasons.REASON_OWNER),
        you_own)
    self.assertEqual(
        (old_owner_apl or [], notify_reasons.REASON_OLD_OWNER),
        you_old_owner)
    self.assertEqual(
        (default_owner_apl or [], notify_reasons.REASON_DEFAULT_OWNER),
        you_default_owner)
    self.assertEqual(
        (ccd_apl or [], notify_reasons.REASON_CCD),
        you_ccd)
    self.assertEqual(
        (group_ccd_apl or [], notify_reasons.REASON_GROUP_CCD),
        you_group_ccd)
    self.assertEqual(
        (default_ccd_apl or [], notify_reasons.REASON_DEFAULT_CCD),
        you_default_ccd)
    self.assertEqual(
        (starrer_apl or [], notify_reasons.REASON_STARRER),
        you_star)
    self.assertEqual(
        (subscriber_apl or [], notify_reasons.REASON_SUBSCRIBER),
        you_subscribe)
    self.assertEqual(
        (also_notified_apl or [], notify_reasons.REASON_ALSO_NOTIFY),
        you_also_notify)
    self.assertEqual(
        (all_notifications_apl or [], notify_reasons.REASON_ALL_NOTIFICATIONS),
        all_notifications)

  def testComputeGroupReasonList_OwnerAndCC(self):
    """Fred owns the issue, Alice is CC'd."""
    self.issue.cc_ids = [self.alice.user_id]
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], True)
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)],
        ccd_apl=[self._MakeAddrPerm(self.alice)])

  def testComputeGroupReasonList_DerivedCcs(self):
    """Check we correctly compute reasons for component and rule ccs."""
    member_1 = self.services.user.TestAddUser('member_1@example.com', 991)
    member_2 = self.services.user.TestAddUser('member_2@example.com', 992)
    member_3 = self.services.user.TestAddUser('member_3@example.com', 993)

    # A group configured to notify its members rather than the group address.
    expanded_group = self.services.user.TestAddUser('group@example.com', 999)
    self.services.usergroup.CreateGroup(
        'cnxn', self.services, expanded_group.email, 'owners')
    self.services.usergroup.TestAddGroupSettings(
        expanded_group.user_id,
        expanded_group.email,
        notify_members=True,
        notify_group=False)
    self.services.usergroup.TestAddMembers(
        expanded_group.user_id, [member_1.user_id, member_2.user_id])

    # A group configured to notify the group address rather than its members.
    group = self.services.user.TestAddUser('group_1@example.com', 888)
    self.services.usergroup.CreateGroup(
        'cnxn', self.services, group.email, 'owners')
    self.services.usergroup.TestAddGroupSettings(
        group.user_id, group.email, notify_members=False, notify_group=True)
    self.services.usergroup.TestAddMembers(
        group.user_id, [member_2.user_id, member_3.user_id])

    users_by_id = framework_views.MakeAllUserViews(
        'cnxn', self.services.user, [
            self.alice.user_id, self.fred.user_id, member_1.user_id,
            member_2.user_id, member_3.user_id, group.user_id,
            expanded_group.user_id
        ])

    comp_id = 123
    self.config.component_defs = [
        fake.MakeTestComponentDef(
            self.project.project_id,
            comp_id,
            path='Chicken',
            cc_ids=[group.user_id, expanded_group.user_id, self.alice.user_id])
    ]
    derived_cc_ids = [
        self.fred.user_id,  # cc'd directly due to a rule
        self.alice.user_id,  # cc'd due to the component
        # cc'd due to the component, members notified directly
        expanded_group.user_id,
        group.user_id,  # cc'd due to the component
        # cc'd directly due to a rule,
        # not removed from rule cc notifications due to transitive cc of
        # expanded_group.
        member_1.user_id,
        member_3.user_id,
    ]
    issue = fake.MakeTestIssue(
        self.project.project_id,
        2,
        'summary',
        'New',
        0,
        derived_cc_ids=derived_cc_ids,
        component_ids=[comp_id])
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, issue, self.config, users_by_id,
        [], True)

    # Asserts list/reason of derived ccs from rules (not components).
    # The derived ccs list/reason is the 7th tuple returned by
    # ComputeGroupReasonList()
    actual_rule_cc_apl, actual_rule_cc_reason = actual[6]
    self.assertEqual(
        [
            self._MakeAddrPerm(member_3),
            self._MakeAddrPerm(self.fred),
            self._MakeAddrPerm(member_1),
        ],
        actual_rule_cc_apl)
    self.assertEqual(
        notify_reasons.REASON_DEFAULT_CCD, actual_rule_cc_reason)

    # Asserts list/reason of derived ccs from components.
    # The component derived ccs list/reason is the 8th tuple returned by
    # ComputeGroupReasonList() when there are component derived ccs.
    actual_component_apl, actual_comp_reason = actual[7]
    self.assertEqual(
        [
            self._MakeAddrPerm(group),
            self._MakeAddrPerm(self.alice),
            self._MakeAddrPerm(member_2),
            self._MakeAddrPerm(member_1),
        ],
        actual_component_apl)
    self.assertEqual(
        "You are auto-CC'd on all issues in component Chicken",
        actual_comp_reason)

  def testComputeGroupReasonList_Starrers(self):
    """Bob and Alice starred it, but Alice opts out of notifications."""
    self.alice.notify_starred_issue_change = False
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], True,
        starrer_ids=[self.alice.user_id, self.bob.user_id])
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)],
        starrer_apl=[self._MakeAddrPerm(self.bob)])

  def testComputeGroupReasonList_Subscribers(self):
    """Bob subscribed."""
    sq = tracker_bizobj.MakeSavedQuery(
        1, 'freds issues', 1, 'owner:fred@example.com',
        subscription_mode='immediate', executes_in_project_ids=[789])
    self.services.features.UpdateUserSavedQueries(
        'cnxn', self.bob.user_id, [sq])
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], True)
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)],
        subscriber_apl=[self._MakeAddrPerm(self.bob)])

    # Now with subscriber notifications disabled.
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], True, include_subscribers=False)
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)])

  def testComputeGroupReasonList_NotifyAll(self):
    """Project is configured to always notify issues@example.com."""
    self.project.issue_notify_address = 'issues@example.com'
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], True)
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)],
        # The notify-all address has no user object and empty UserPrefs, so
        # it is spelled out rather than built with _MakeAddrPerm().
        all_notifications_apl=[notify_reasons.AddrPerm(
            False, 'issues@example.com', None, REPLY_NOT_ALLOWED,
            user_pb2.UserPrefs())])

    # We don't use the notify-all address when the issue is not public.
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], False)
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)])

    # Now with the notify-all address disabled.
    actual = notify_reasons.ComputeGroupReasonList(
        'cnxn', self.services, self.project, self.issue, self.config,
        self.users_by_id, [], True, include_notify_all=False)
    self.CheckGroupReasonList(
        actual,
        owner_apl=[self._MakeAddrPerm(self.fred)])
