blob: 6c138276caaba70b8808fb3bf719fb56b7a0e91e [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
5
6"""Unittests for monorail.feature.inboundemail."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import unittest
12import webapp2
13from mock import patch
14
15import mox
16import time
17
18from google.appengine.ext.webapp.mail_handlers import BounceNotificationHandler
19
20import settings
21from businesslogic import work_env
22from features import alert2issue
23from features import commitlogcommands
24from features import inboundemail
25from framework import authdata
26from framework import emailfmt
27from framework import monorailcontext
28from framework import permissions
29from proto import project_pb2
30from proto import tracker_pb2
31from proto import user_pb2
32from services import service_manager
33from testing import fake
34from testing import testing_helpers
35from tracker import tracker_helpers
36
37
class InboundEmailTest(unittest.TestCase):
  """Tests for InboundEmail.ProcessMail and InboundEmail.ProcessIssueReply."""

  def setUp(self):
    """Create fake services, one project, one issue and a parsed message."""
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService(),
        project=fake.ProjectService())
    self.project = self.services.project.TestAddProject(
        'proj', project_id=987, process_inbound_email=True,
        contrib_ids=[111])
    self.project_addr = 'proj@monorail.example.com'

    self.issue = tracker_pb2.Issue()
    self.issue.project_id = 987
    self.issue.local_id = 100
    self.services.issue.TestAddIssue(self.issue)

    self.msg = testing_helpers.MakeMessage(
        testing_helpers.HEADER_LINES, 'awesome!')

    request, _ = testing_helpers.GetRequestObjects()
    self.inbound = inboundemail.InboundEmail(request, None, self.services)
    self.mox = mox.Mox()

  def tearDown(self):
    """Remove every stub installed through self.mox."""
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  # Private helpers shared by the tests below.

  def _MakeMonorailContext(self):
    """Return a MonorailContext whose requester is user@example.com."""
    return monorailcontext.MonorailContext(
        self.services, cnxn=self.cnxn, requester='user@example.com')

  def _AssertSingleErrorReply(self, email_tasks, expected_subject):
    """Assert that email_tasks holds exactly one task with the given subject.

    Args:
      email_tasks: the value returned by ProcessMail or ProcessIssueReply.
      expected_subject: the subject line the single task must carry.
    """
    self.assertEqual(1, len(email_tasks))
    email_task = email_tasks[0]
    self.assertEqual('user@example.com', email_task['to'])
    self.assertEqual(expected_subject, email_task['subject'])

  def _ExpectValidateReferencesHeader(self, is_valid, times=1):
    """Stub emailfmt.ValidateReferencesHeader and record expected calls.

    Args:
      is_valid: the boolean each recorded call returns.
      times: how many calls to record, in order.
    """
    self.mox.StubOutWithMock(emailfmt, 'ValidateReferencesHeader')
    for _ in range(times):
      emailfmt.ValidateReferencesHeader(
          mox.IgnoreArg(), self.project, mox.IgnoreArg(),
          mox.IgnoreArg()).AndReturn(is_valid)

  def _VerifyReplyRunsUpdateIssueAction(self, perms):
    """Check that ProcessIssueReply parses and runs an UpdateIssueAction.

    Args:
      perms: the permission set assigned to the requester's context.
    """
    self.services.user.TestAddUser('user@example.com', 111)
    mc = self._MakeMonorailContext()
    mc.perms = perms
    # Build the action object before its constructor is replaced by a stub,
    # so that the stubbed constructor can hand this instance back.
    uia = commitlogcommands.UpdateIssueAction(self.issue.local_id)

    self.mox.StubOutWithMock(commitlogcommands, 'UpdateIssueAction')
    commitlogcommands.UpdateIssueAction(self.issue.local_id).AndReturn(uia)

    self.mox.StubOutWithMock(uia, 'Parse')
    uia.Parse(
        self.cnxn, self.project.project_name, 111, ['awesome!'], self.services,
        strip_quoted_lines=True)
    self.mox.StubOutWithMock(uia, 'Run')
    uia.Run(mc, self.services)

    self.mox.ReplayAll()
    ret = self.inbound.ProcessIssueReply(
        mc, self.project, self.issue.local_id, self.project_addr,
        'awesome!')
    self.mox.VerifyAll()
    self.assertIsNone(ret)

  # Tests.

  def testTemplates(self):
    """Each loaded template is listed in MSG_TEMPLATES with a matching path."""
    for name, template in self.inbound._templates.items():
      # unittest assertions are used instead of bare assert statements so the
      # checks still run under `python -O` and report useful failure messages.
      self.assertIn(name, inboundemail.MSG_TEMPLATES)
      self.assertTrue(
          template.GetTemplatePath().endswith(
              inboundemail.MSG_TEMPLATES[name]))

  def testProcessMail_MsgTooBig(self):
    """A body reported as too big to parse yields an error reply."""
    self.mox.StubOutWithMock(emailfmt, 'IsBodyTooBigToParse')
    emailfmt.IsBodyTooBigToParse(mox.IgnoreArg()).AndReturn(True)
    self.mox.ReplayAll()

    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self._AssertSingleErrorReply(email_tasks, 'Email body too long')

  def testProcessMail_NoProjectOnToLine(self):
    """ProcessMail returns None if the project address is not on the To line."""
    self.mox.StubOutWithMock(emailfmt, 'IsProjectAddressOnToLine')
    emailfmt.IsProjectAddressOnToLine(
        self.project_addr, [self.project_addr]).AndReturn(False)
    self.mox.ReplayAll()

    ret = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self.assertIsNone(ret)

  def testProcessMail_IssueUnidentified(self):
    """ProcessMail returns None when IdentifyIssue finds no issue."""
    self.mox.StubOutWithMock(emailfmt, 'IdentifyProjectVerbAndLabel')
    emailfmt.IdentifyProjectVerbAndLabel(self.project_addr).AndReturn(
        ('proj', None, None))

    self.mox.StubOutWithMock(emailfmt, 'IdentifyIssue')
    emailfmt.IdentifyIssue('proj', mox.IgnoreArg()).AndReturn(None)

    self.mox.ReplayAll()

    ret = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self.assertIsNone(ret)

  def testProcessMail_ProjectNotLive(self):
    """A project in state DELETABLE yields a 'Project not found' reply."""
    self.services.user.TestAddUser('user@example.com', 111)
    self.project.state = project_pb2.ProjectState.DELETABLE
    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
    self._AssertSingleErrorReply(email_tasks, 'Project not found')

  def testProcessMail_ProjectInboundEmailDisabled(self):
    """A project with process_inbound_email=False yields an error reply."""
    self.services.user.TestAddUser('user@example.com', 111)
    self.project.process_inbound_email = False
    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
    self._AssertSingleErrorReply(
        email_tasks, 'Email replies are not enabled in project proj')

  def testProcessMail_NoRefHeader(self):
    """An invalid References header yields a 'not a reply' error reply."""
    self.services.user.TestAddUser('user@example.com', 111)
    # Two calls are recorded; both report the header as invalid.
    self._ExpectValidateReferencesHeader(False, times=2)
    self.mox.ReplayAll()

    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self._AssertSingleErrorReply(
        email_tasks, 'Your message is not a reply to a notification email')

  def testProcessMail_NoAccount(self):
    """Without a user record, the reply says the account is unknown."""
    # Note: not calling TestAddUser().
    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self._AssertSingleErrorReply(
        email_tasks, 'Could not determine account of sender')

  def testProcessMail_BannedAccount(self):
    """A user whose `banned` field is set receives a 'banned' reply."""
    user_pb = self.services.user.TestAddUser('user@example.com', 111)
    user_pb.banned = 'banned'

    self._ExpectValidateReferencesHeader(True)
    self.mox.ReplayAll()

    email_tasks = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self._AssertSingleErrorReply(
        email_tasks, 'You are banned from using this issue tracker')

  def testProcessMail_Success(self):
    """A valid reply is handed to ProcessIssueReply; ProcessMail returns None."""
    self.services.user.TestAddUser('user@example.com', 111)

    self._ExpectValidateReferencesHeader(True)

    self.mox.StubOutWithMock(self.inbound, 'ProcessIssueReply')
    self.inbound.ProcessIssueReply(
        mox.IgnoreArg(), self.project, 123, self.project_addr,
        'awesome!')

    self.mox.ReplayAll()

    ret = self.inbound.ProcessMail(self.msg, self.project_addr)
    self.mox.VerifyAll()
    self.assertIsNone(ret)

  def testProcessMail_Success_with_AlertNotification(self):
    """Test ProcessMail with an alert notification message.

    This is a sanity check for alert2issue.ProcessEmailNotification to ensure
    that it can be successfully invoked in ProcessMail. Each function of
    alert2issue module should be tested in alert2issue_test.
    """
    project_name = self.project.project_name
    verb = 'alert'
    trooper_queue = 'my-trooper'
    project_addr = '%s+%s+%s@example.com' % (project_name, verb, trooper_queue)

    self.mox.StubOutWithMock(emailfmt, 'IsProjectAddressOnToLine')
    emailfmt.IsProjectAddressOnToLine(
        project_addr, mox.IgnoreArg()).AndReturn(True)

    class MockAuthData(object):
      """Minimal stand-in for the object returned by AuthData.FromEmail."""

      def __init__(self):
        self.user_pb = user_pb2.MakeUser(111)
        self.effective_ids = {1, 2, 3}
        self.user_id = 111
        self.email = 'user@example.com'

    mock_auth_data = MockAuthData()
    self.mox.StubOutWithMock(authdata.AuthData, 'FromEmail')
    authdata.AuthData.FromEmail(
        mox.IgnoreArg(), settings.alert_service_account, self.services,
        autocreate=True).AndReturn(mock_auth_data)

    self.mox.StubOutWithMock(alert2issue, 'ProcessEmailNotification')
    alert2issue.ProcessEmailNotification(
        self.services, mox.IgnoreArg(), self.project, project_addr,
        mox.IgnoreArg(), mock_auth_data, mox.IgnoreArg(), 'awesome!', '',
        self.msg, trooper_queue)

    self.mox.ReplayAll()
    ret = self.inbound.ProcessMail(self.msg, project_addr)
    self.mox.VerifyAll()
    self.assertIsNone(ret)

  def testProcessIssueReply_NoIssue(self):
    """A reply to a local ID with no issue yields a 'Could not find' reply."""
    nonexistent_local_id = 200
    mc = self._MakeMonorailContext()
    mc.LookupLoggedInUserPerms(self.project)

    email_tasks = self.inbound.ProcessIssueReply(
        mc, self.project, nonexistent_local_id, self.project_addr,
        'awesome!')
    self._AssertSingleErrorReply(
        email_tasks,
        'Could not find issue %d in project %s' %
        (nonexistent_local_id, self.project.project_name))

  def testProcessIssueReply_DeletedIssue(self):
    """A reply to an issue marked deleted yields a 'Could not find' reply."""
    self.issue.deleted = True
    mc = self._MakeMonorailContext()
    mc.LookupLoggedInUserPerms(self.project)

    email_tasks = self.inbound.ProcessIssueReply(
        mc, self.project, self.issue.local_id, self.project_addr,
        'awesome!')
    self._AssertSingleErrorReply(
        email_tasks,
        'Could not find issue %d in project %s' %
        (self.issue.local_id, self.project.project_name))

  def VerifyUserHasNoPerm(self, perms):
    """Check that a reply made with `perms` is rejected with an error reply.

    Args:
      perms: the permission set assigned to the requester's context.
    """
    mc = self._MakeMonorailContext()
    mc.perms = perms

    email_tasks = self.inbound.ProcessIssueReply(
        mc, self.project, self.issue.local_id, self.project_addr,
        'awesome!')
    self._AssertSingleErrorReply(
        email_tasks, 'User does not have permission to add a comment')

  def testProcessIssueReply_NoViewPerm(self):
    """EMPTY_PERMISSIONSET is not enough to add a comment."""
    self.VerifyUserHasNoPerm(permissions.EMPTY_PERMISSIONSET)

  def testProcessIssueReply_CantViewRestrictedIssue(self):
    """USER_PERMISSIONSET is rejected on a Restrict-View-CoreTeam issue."""
    self.issue.labels.append('Restrict-View-CoreTeam')
    self.VerifyUserHasNoPerm(permissions.USER_PERMISSIONSET)

  def testProcessIssueReply_NoAddIssuePerm(self):
    """READ_ONLY_PERMISSIONSET is not enough to add a comment."""
    self.VerifyUserHasNoPerm(permissions.READ_ONLY_PERMISSIONSET)

  def testProcessIssueReply_NoEditIssuePerm(self):
    """With USER_PERMISSIONSET the update action is still parsed and run."""
    # mc.perms does not contain permission EDIT_ISSUE.
    self._VerifyReplyRunsUpdateIssueAction(permissions.USER_PERMISSIONSET)

  def testProcessIssueReply_Success(self):
    """With COMMITTER_ACTIVE_PERMISSIONSET the update action is parsed and run."""
    self._VerifyReplyRunsUpdateIssueAction(
        permissions.COMMITTER_ACTIVE_PERMISSIONSET)
341
342
class BouncedEmailTest(unittest.TestCase):
  """Tests for the inboundemail.BouncedEmail servlet."""

  def setUp(self):
    """Register one user and build the servlet inside a webapp2 application."""
    self.mox = mox.Mox()
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(user=fake.UserService())
    self.user = self.services.user.TestAddUser('user@example.com', 111)

    # The application globals are set before the servlet is constructed.
    wsgi_app = webapp2.WSGIApplication(config={'services': self.services})
    wsgi_app.set_globals(app=wsgi_app)

    self.servlet = inboundemail.BouncedEmail()

  def tearDown(self):
    """Undo all stubs created through self.mox."""
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def _AttachRawMessageRequest(self):
    """Give the servlet a blank POST request carrying a raw-message field."""
    post_fields = {'raw-message': 'this is an email message'}
    self.servlet.request = webapp2.Request.blank('/', POST=post_fields)

  def testPost_Normal(self):
    """post() delegates once to BounceNotificationHandler.post()."""
    self.mox.StubOutWithMock(BounceNotificationHandler, 'post')
    BounceNotificationHandler.post()
    self.mox.ReplayAll()

    self.servlet.post()
    self.mox.VerifyAll()

  def testPost_Exception(self):
    """After an AttributeError, the parent post() is invoked a second time."""
    self._AttachRawMessageRequest()

    self.mox.StubOutWithMock(BounceNotificationHandler, 'post')
    # First call raises; a second call is expected to follow it.
    BounceNotificationHandler.post().AndRaise(AttributeError())
    BounceNotificationHandler.post()
    self.mox.ReplayAll()

    self.servlet.post()
    self.mox.VerifyAll()

  def testReceive_Normal(self):
    """receive() sets email_bounce_timestamp on the user that bounced."""
    self.assertEqual(0, self.user.email_bounce_timestamp)

    bounce = testing_helpers.Blank(original={'to': 'user@example.com'})
    self.servlet.receive(bounce)

    self.assertNotEqual(0, self.user.email_bounce_timestamp)

  def testReceive_NoSuchUser(self):
    """A bounce for an unknown address does not create a user record."""
    self._AttachRawMessageRequest()
    bounce = testing_helpers.Blank(
        original={'to': 'nope@example.com'},
        notification='notification')
    self.servlet.receive(bounce)

    # Only the user registered in setUp is present.
    known_users = self.services.user.users_by_id
    self.assertEqual(1, len(known_users))