Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/features/test/inboundemail_test.py b/features/test/inboundemail_test.py
new file mode 100644
index 0000000..6c13827
--- /dev/null
+++ b/features/test/inboundemail_test.py
@@ -0,0 +1,400 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Unittests for monorail.feature.inboundemail."""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import unittest
+import webapp2
+from mock import patch
+
+import mox
+import time
+
+from google.appengine.ext.webapp.mail_handlers import BounceNotificationHandler
+
+import settings
+from businesslogic import work_env
+from features import alert2issue
+from features import commitlogcommands
+from features import inboundemail
+from framework import authdata
+from framework import emailfmt
+from framework import monorailcontext
+from framework import permissions
+from proto import project_pb2
+from proto import tracker_pb2
+from proto import user_pb2
+from services import service_manager
+from testing import fake
+from testing import testing_helpers
+from tracker import tracker_helpers
+
+
+class InboundEmailTest(unittest.TestCase):
+
+  def setUp(self):
+    self.cnxn = 'fake cnxn'
+    self.services = service_manager.Services(
+        config=fake.ConfigService(),
+        issue=fake.IssueService(),
+        user=fake.UserService(),
+        usergroup=fake.UserGroupService(),
+        project=fake.ProjectService())
+    self.project = self.services.project.TestAddProject(
+        'proj', project_id=987, process_inbound_email=True,
+        contrib_ids=[111])
+    self.project_addr = 'proj@monorail.example.com'
+
+    self.issue = tracker_pb2.Issue()
+    self.issue.project_id = 987
+    self.issue.local_id = 100
+    self.services.issue.TestAddIssue(self.issue)
+
+    self.msg = testing_helpers.MakeMessage(
+        testing_helpers.HEADER_LINES, 'awesome!')
+
+    request, _ = testing_helpers.GetRequestObjects()
+    self.inbound = inboundemail.InboundEmail(request, None, self.services)
+    self.mox = mox.Mox()
+
+  def tearDown(self):
+    self.mox.UnsetStubs()
+    self.mox.ResetAll()
+
+  def testTemplates(self):
+    for name, template_path in self.inbound._templates.items():
+      assert(name in inboundemail.MSG_TEMPLATES)
+      assert(
+          template_path.GetTemplatePath().endswith(
+              inboundemail.MSG_TEMPLATES[name]))
+
+  def testProcessMail_MsgTooBig(self):
+    self.mox.StubOutWithMock(emailfmt, 'IsBodyTooBigToParse')
+    emailfmt.IsBodyTooBigToParse(mox.IgnoreArg()).AndReturn(True)
+    self.mox.ReplayAll()
+
+    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual('Email body too long', email_task['subject'])
+
+  def testProcessMail_NoProjectOnToLine(self):
+    self.mox.StubOutWithMock(emailfmt, 'IsProjectAddressOnToLine')
+    emailfmt.IsProjectAddressOnToLine(
+        self.project_addr, [self.project_addr]).AndReturn(False)
+    self.mox.ReplayAll()
+
+    ret = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertIsNone(ret)
+
+  def testProcessMail_IssueUnidentified(self):
+    self.mox.StubOutWithMock(emailfmt, 'IdentifyProjectVerbAndLabel')
+    emailfmt.IdentifyProjectVerbAndLabel(self.project_addr).AndReturn(('proj',
+        None, None))
+
+    self.mox.StubOutWithMock(emailfmt, 'IdentifyIssue')
+    emailfmt.IdentifyIssue('proj', mox.IgnoreArg()).AndReturn((None))
+
+    self.mox.ReplayAll()
+
+    ret = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertIsNone(ret)
+
+  def testProcessMail_ProjectNotLive(self):
+    self.services.user.TestAddUser('user@example.com', 111)
+    self.project.state = project_pb2.ProjectState.DELETABLE
+    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual('Project not found', email_task['subject'])
+
+  def testProcessMail_ProjectInboundEmailDisabled(self):
+    self.services.user.TestAddUser('user@example.com', 111)
+    self.project.process_inbound_email = False
+    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'Email replies are not enabled in project proj', email_task['subject'])
+
+  def testProcessMail_NoRefHeader(self):
+    self.services.user.TestAddUser('user@example.com', 111)
+    self.mox.StubOutWithMock(emailfmt, 'ValidateReferencesHeader')
+    emailfmt.ValidateReferencesHeader(
+        mox.IgnoreArg(), self.project, mox.IgnoreArg(),
+        mox.IgnoreArg()).AndReturn(False)
+    emailfmt.ValidateReferencesHeader(
+        mox.IgnoreArg(), self.project, mox.IgnoreArg(),
+        mox.IgnoreArg()).AndReturn(False)
+    self.mox.ReplayAll()
+
+    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'Your message is not a reply to a notification email',
+        email_task['subject'])
+
+  def testProcessMail_NoAccount(self):
+    # Note: not calling TestAddUser().
+    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'Could not determine account of sender', email_task['subject'])
+
+  def testProcessMail_BannedAccount(self):
+    user_pb = self.services.user.TestAddUser('user@example.com', 111)
+    user_pb.banned = 'banned'
+
+    self.mox.StubOutWithMock(emailfmt, 'ValidateReferencesHeader')
+    emailfmt.ValidateReferencesHeader(
+        mox.IgnoreArg(), self.project, mox.IgnoreArg(),
+        mox.IgnoreArg()).AndReturn(True)
+    self.mox.ReplayAll()
+
+    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'You are banned from using this issue tracker', email_task['subject'])
+
+  def testProcessMail_Success(self):
+    self.services.user.TestAddUser('user@example.com', 111)
+
+    self.mox.StubOutWithMock(emailfmt, 'ValidateReferencesHeader')
+    emailfmt.ValidateReferencesHeader(
+        mox.IgnoreArg(), self.project, mox.IgnoreArg(),
+        mox.IgnoreArg()).AndReturn(True)
+
+    self.mox.StubOutWithMock(self.inbound, 'ProcessIssueReply')
+    self.inbound.ProcessIssueReply(
+        mox.IgnoreArg(), self.project, 123, self.project_addr,
+        'awesome!')
+
+    self.mox.ReplayAll()
+
+    ret = self.inbound.ProcessMail(self.msg, self.project_addr)
+    self.mox.VerifyAll()
+    self.assertIsNone(ret)
+
+  def testProcessMail_Success_with_AlertNotification(self):
+    """Test ProcessMail with an alert notification message.
+
+    This is a sanity check for alert2issue.ProcessEmailNotification to ensure
+    that it can be successfully invoked in ProcessMail. Each function of
+    alert2issue module should be tested in aler2issue_test.
+    """
+    project_name = self.project.project_name
+    verb = 'alert'
+    trooper_queue = 'my-trooper'
+    project_addr = '%s+%s+%s@example.com' % (project_name, verb, trooper_queue)
+
+    self.mox.StubOutWithMock(emailfmt, 'IsProjectAddressOnToLine')
+    emailfmt.IsProjectAddressOnToLine(
+        project_addr, mox.IgnoreArg()).AndReturn(True)
+
+    class MockAuthData(object):
+      def __init__(self):
+        self.user_pb = user_pb2.MakeUser(111)
+        self.effective_ids = set([1, 2, 3])
+        self.user_id = 111
+        self.email = 'user@example.com'
+
+    mock_auth_data = MockAuthData()
+    self.mox.StubOutWithMock(authdata.AuthData, 'FromEmail')
+    authdata.AuthData.FromEmail(
+        mox.IgnoreArg(), settings.alert_service_account, self.services,
+        autocreate=True).AndReturn(mock_auth_data)
+
+    self.mox.StubOutWithMock(alert2issue, 'ProcessEmailNotification')
+    alert2issue.ProcessEmailNotification(
+        self.services, mox.IgnoreArg(), self.project, project_addr,
+        mox.IgnoreArg(), mock_auth_data, mox.IgnoreArg(), 'awesome!', '',
+        self.msg, trooper_queue)
+
+    self.mox.ReplayAll()
+    ret = self.inbound.ProcessMail(self.msg, project_addr)
+    self.mox.VerifyAll()
+    self.assertIsNone(ret)
+
+  def testProcessIssueReply_NoIssue(self):
+    nonexistant_local_id = 200
+    mc = monorailcontext.MonorailContext(
+        self.services, cnxn=self.cnxn, requester='user@example.com')
+    mc.LookupLoggedInUserPerms(self.project)
+
+    email_tasks = self.inbound.ProcessIssueReply(
+        mc, self.project, nonexistant_local_id, self.project_addr,
+        'awesome!')
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'Could not find issue %d in project %s' %
+        (nonexistant_local_id, self.project.project_name),
+        email_task['subject'])
+
+  def testProcessIssueReply_DeletedIssue(self):
+    self.issue.deleted = True
+    mc = monorailcontext.MonorailContext(
+        self.services, cnxn=self.cnxn, requester='user@example.com')
+    mc.LookupLoggedInUserPerms(self.project)
+
+    email_tasks = self.inbound.ProcessIssueReply(
+        mc, self.project, self.issue.local_id, self.project_addr,
+        'awesome!')
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'Could not find issue %d in project %s' %
+        (self.issue.local_id, self.project.project_name), email_task['subject'])
+
+  def VerifyUserHasNoPerm(self, perms):
+    mc = monorailcontext.MonorailContext(
+        self.services, cnxn=self.cnxn, requester='user@example.com')
+    mc.perms = perms
+
+    email_tasks = self.inbound.ProcessIssueReply(
+        mc, self.project, self.issue.local_id, self.project_addr,
+        'awesome!')
+    self.assertEqual(1, len(email_tasks))
+    email_task = email_tasks[0]
+    self.assertEqual('user@example.com', email_task['to'])
+    self.assertEqual(
+        'User does not have permission to add a comment', email_task['subject'])
+
+  def testProcessIssueReply_NoViewPerm(self):
+    self.VerifyUserHasNoPerm(permissions.EMPTY_PERMISSIONSET)
+
+  def testProcessIssueReply_CantViewRestrictedIssue(self):
+    self.issue.labels.append('Restrict-View-CoreTeam')
+    self.VerifyUserHasNoPerm(permissions.USER_PERMISSIONSET)
+
+  def testProcessIssueReply_NoAddIssuePerm(self):
+    self.VerifyUserHasNoPerm(permissions.READ_ONLY_PERMISSIONSET)
+
+  def testProcessIssueReply_NoEditIssuePerm(self):
+    self.services.user.TestAddUser('user@example.com', 111)
+    mc = monorailcontext.MonorailContext(
+        self.services, cnxn=self.cnxn, requester='user@example.com')
+    mc.perms = permissions.USER_PERMISSIONSET
+    mock_uia = commitlogcommands.UpdateIssueAction(self.issue.local_id)
+
+    self.mox.StubOutWithMock(commitlogcommands, 'UpdateIssueAction')
+    commitlogcommands.UpdateIssueAction(self.issue.local_id).AndReturn(mock_uia)
+
+    self.mox.StubOutWithMock(mock_uia, 'Parse')
+    mock_uia.Parse(
+        self.cnxn, self.project.project_name, 111, ['awesome!'], self.services,
+        strip_quoted_lines=True)
+    self.mox.StubOutWithMock(mock_uia, 'Run')
+    # mc.perms does not contain permission EDIT_ISSUE.
+    mock_uia.Run(mc, self.services)
+
+    self.mox.ReplayAll()
+    ret = self.inbound.ProcessIssueReply(
+        mc, self.project, self.issue.local_id, self.project_addr,
+        'awesome!')
+    self.mox.VerifyAll()
+    self.assertIsNone(ret)
+
+  def testProcessIssueReply_Success(self):
+    self.services.user.TestAddUser('user@example.com', 111)
+    mc = monorailcontext.MonorailContext(
+        self.services, cnxn=self.cnxn, requester='user@example.com')
+    mc.perms = permissions.COMMITTER_ACTIVE_PERMISSIONSET
+    mock_uia = commitlogcommands.UpdateIssueAction(self.issue.local_id)
+
+    self.mox.StubOutWithMock(commitlogcommands, 'UpdateIssueAction')
+    commitlogcommands.UpdateIssueAction(self.issue.local_id).AndReturn(mock_uia)
+
+    self.mox.StubOutWithMock(mock_uia, 'Parse')
+    mock_uia.Parse(
+        self.cnxn, self.project.project_name, 111, ['awesome!'], self.services,
+        strip_quoted_lines=True)
+    self.mox.StubOutWithMock(mock_uia, 'Run')
+    mock_uia.Run(mc, self.services)
+
+    self.mox.ReplayAll()
+    ret = self.inbound.ProcessIssueReply(
+        mc, self.project, self.issue.local_id, self.project_addr,
+        'awesome!')
+    self.mox.VerifyAll()
+    self.assertIsNone(ret)
+
+
+class BouncedEmailTest(unittest.TestCase):
+
+  def setUp(self):
+    self.cnxn = 'fake cnxn'
+    self.services = service_manager.Services(
+        user=fake.UserService())
+    self.user = self.services.user.TestAddUser('user@example.com', 111)
+
+    app = webapp2.WSGIApplication(config={'services': self.services})
+    app.set_globals(app=app)
+
+    self.servlet = inboundemail.BouncedEmail()
+    self.mox = mox.Mox()
+
+  def tearDown(self):
+    self.mox.UnsetStubs()
+    self.mox.ResetAll()
+
+  def testPost_Normal(self):
+    """Normally, our post() just calls BounceNotificationHandler post()."""
+    self.mox.StubOutWithMock(BounceNotificationHandler, 'post')
+    BounceNotificationHandler.post()
+    self.mox.ReplayAll()
+
+    self.servlet.post()
+    self.mox.VerifyAll()
+
+  def testPost_Exception(self):
+    """Our post() method works around an escaping bug."""
+    self.servlet.request = webapp2.Request.blank(
+        '/', POST={'raw-message': 'this is an email message'})
+
+    self.mox.StubOutWithMock(BounceNotificationHandler, 'post')
+    BounceNotificationHandler.post().AndRaise(AttributeError())
+    BounceNotificationHandler.post()
+    self.mox.ReplayAll()
+
+    self.servlet.post()
+    self.mox.VerifyAll()
+
+  def testReceive_Normal(self):
+    """Find the user that bounced and set email_bounce_timestamp."""
+    self.assertEqual(0, self.user.email_bounce_timestamp)
+
+    bounce_message = testing_helpers.Blank(original={'to': 'user@example.com'})
+    self.servlet.receive(bounce_message)
+
+    self.assertNotEqual(0, self.user.email_bounce_timestamp)
+
+  def testReceive_NoSuchUser(self):
+    """When not found, log it and ignore without creating a user record."""
+    self.servlet.request = webapp2.Request.blank(
+        '/', POST={'raw-message': 'this is an email message'})
+    bounce_message = testing_helpers.Blank(
+        original={'to': 'nope@example.com'},
+        notification='notification')
+    self.servlet.receive(bounce_message)
+    self.assertEqual(1, len(self.services.user.users_by_id))