blob: fb8810b0ef7191cf4b8d350f721d7dca24cbe1b7 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""Unit tests for the framework_helpers module."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import mock
12import unittest
13
Adrià Vilanova Martínez9f9ade52022-10-10 23:20:11 +020014try:
15 from mox3 import mox
16except ImportError:
17 import mox
Copybara854996b2021-09-07 19:36:02 +000018import time
19
20from businesslogic import work_env
21from framework import framework_helpers
22from framework import framework_views
23from proto import features_pb2
24from proto import project_pb2
25from proto import user_pb2
26from services import service_manager
27from testing import fake
28from testing import testing_helpers
29
30
31class HelperFunctionsTest(unittest.TestCase):
32
33 def setUp(self):
34 self.mox = mox.Mox()
35 self.time = self.mox.CreateMock(framework_helpers.time)
36 framework_helpers.time = self.time # Point to a mocked out time module.
37
38 def tearDown(self):
39 framework_helpers.time = time # Point back to the time module.
40 self.mox.UnsetStubs()
41 self.mox.ResetAll()
42
43 def testRetryDecorator_ExceedFailures(self):
44 class Tracker(object):
45 func_called = 0
46 tracker = Tracker()
47
48 # Use a function that always fails.
49 @framework_helpers.retry(2, delay=1, backoff=2)
50 def testFunc(tracker):
51 tracker.func_called += 1
52 raise Exception('Failed')
53
54 self.time.sleep(1).AndReturn(None)
55 self.time.sleep(2).AndReturn(None)
56 self.mox.ReplayAll()
57 with self.assertRaises(Exception):
58 testFunc(tracker)
59 self.mox.VerifyAll()
60 self.assertEqual(3, tracker.func_called)
61
62 def testRetryDecorator_EventuallySucceed(self):
63 class Tracker(object):
64 func_called = 0
65 tracker = Tracker()
66
67 # Use a function that succeeds on the 2nd attempt.
68 @framework_helpers.retry(2, delay=1, backoff=2)
69 def testFunc(tracker):
70 tracker.func_called += 1
71 if tracker.func_called < 2:
72 raise Exception('Failed')
73
74 self.time.sleep(1).AndReturn(None)
75 self.mox.ReplayAll()
76 testFunc(tracker)
77 self.mox.VerifyAll()
78 self.assertEqual(2, tracker.func_called)
79
80 def testGetRoleName(self):
81 proj = project_pb2.Project()
82 proj.owner_ids.append(111)
83 proj.committer_ids.append(222)
84 proj.contributor_ids.append(333)
85
86 self.assertEqual(None, framework_helpers.GetRoleName(set(), proj))
87
88 self.assertEqual('Owner', framework_helpers.GetRoleName({111}, proj))
89 self.assertEqual('Committer', framework_helpers.GetRoleName({222}, proj))
90 self.assertEqual('Contributor', framework_helpers.GetRoleName({333}, proj))
91
92 self.assertEqual(
93 'Owner', framework_helpers.GetRoleName({111, 222, 999}, proj))
94 self.assertEqual(
95 'Committer', framework_helpers.GetRoleName({222, 333, 999}, proj))
96 self.assertEqual(
97 'Contributor', framework_helpers.GetRoleName({333, 999}, proj))
98
99 def testGetHotlistRoleName(self):
100 hotlist = features_pb2.Hotlist()
101 hotlist.owner_ids.append(111)
102 hotlist.editor_ids.append(222)
103 hotlist.follower_ids.append(333)
104
105 self.assertEqual(None, framework_helpers.GetHotlistRoleName(set(), hotlist))
106
107 self.assertEqual(
108 'Owner', framework_helpers.GetHotlistRoleName({111}, hotlist))
109 self.assertEqual(
110 'Editor', framework_helpers.GetHotlistRoleName({222}, hotlist))
111 self.assertEqual(
112 'Follower', framework_helpers.GetHotlistRoleName({333}, hotlist))
113
114 self.assertEqual(
115 'Owner', framework_helpers.GetHotlistRoleName({111, 222, 999}, hotlist))
116 self.assertEqual(
117 'Editor', framework_helpers.GetHotlistRoleName(
118 {222, 333, 999}, hotlist))
119 self.assertEqual(
120 'Follower', framework_helpers.GetHotlistRoleName({333, 999}, hotlist))
121
122
123class UrlFormattingTest(unittest.TestCase):
124 """Tests for URL formatting."""
125
126 def setUp(self):
127 self.services = service_manager.Services(user=fake.UserService())
128
129 def testFormatMovedProjectURL(self):
130 """Project foo has been moved to bar. User is visiting /p/foo/..."""
131 mr = testing_helpers.MakeMonorailRequest()
132 mr.current_page_url = '/p/foo/'
133 self.assertEqual(
134 '/p/bar/',
135 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
136
137 mr.current_page_url = '/p/foo/issues/list'
138 self.assertEqual(
139 '/p/bar/issues/list',
140 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
141
142 mr.current_page_url = '/p/foo/issues/detail?id=123'
143 self.assertEqual(
144 '/p/bar/issues/detail?id=123',
145 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
146
147 mr.current_page_url = '/p/foo/issues/detail?id=123#c7'
148 self.assertEqual(
149 '/p/bar/issues/detail?id=123#c7',
150 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
151
152 def testFormatURL(self):
153 mr = testing_helpers.MakeMonorailRequest()
154 path = '/dude/wheres/my/car'
155 recognized_params = [(name, mr.GetParam(name)) for name in
156 framework_helpers.RECOGNIZED_PARAMS]
157 url = framework_helpers.FormatURL(recognized_params, path)
158 self.assertEqual(path, url)
159
160 def testFormatURLWithRecognizedParams(self):
161 params = {}
162 query = []
163 for name in framework_helpers.RECOGNIZED_PARAMS:
164 params[name] = name
165 query.append('%s=%s' % (name, 123))
166 path = '/dude/wheres/my/car'
167 expected = '%s?%s' % (path, '&'.join(query))
168 mr = testing_helpers.MakeMonorailRequest(path=expected)
169 recognized_params = [(name, mr.GetParam(name)) for name in
170 framework_helpers.RECOGNIZED_PARAMS]
171 # No added params.
172 url = framework_helpers.FormatURL(recognized_params, path)
173 self.assertEqual(expected, url)
174
175 def testFormatURLWithKeywordArgs(self):
176 params = {}
177 query_pairs = []
178 for name in framework_helpers.RECOGNIZED_PARAMS:
179 params[name] = name
180 if name != 'can' and name != 'start':
181 query_pairs.append('%s=%s' % (name, 123))
182 path = '/dude/wheres/my/car'
183 mr = testing_helpers.MakeMonorailRequest(
184 path='%s?%s' % (path, '&'.join(query_pairs)))
185 query_pairs.append('can=yep')
186 query_pairs.append('start=486')
187 query_string = '&'.join(query_pairs)
188 expected = '%s?%s' % (path, query_string)
189 recognized_params = [(name, mr.GetParam(name)) for name in
190 framework_helpers.RECOGNIZED_PARAMS]
191 url = framework_helpers.FormatURL(
192 recognized_params, path, can='yep', start=486)
193 self.assertEqual(expected, url)
194
195 def testFormatURLWithKeywordArgsAndID(self):
196 params = {}
197 query_pairs = []
198 query_pairs.append('id=200') # id should be the first parameter.
199 for name in framework_helpers.RECOGNIZED_PARAMS:
200 params[name] = name
201 if name != 'can' and name != 'start':
202 query_pairs.append('%s=%s' % (name, 123))
203 path = '/dude/wheres/my/car'
204 mr = testing_helpers.MakeMonorailRequest(
205 path='%s?%s' % (path, '&'.join(query_pairs)))
206 query_pairs.append('can=yep')
207 query_pairs.append('start=486')
208 query_string = '&'.join(query_pairs)
209 expected = '%s?%s' % (path, query_string)
210 recognized_params = [(name, mr.GetParam(name)) for name in
211 framework_helpers.RECOGNIZED_PARAMS]
212 url = framework_helpers.FormatURL(
213 recognized_params, path, can='yep', start=486, id=200)
214 self.assertEqual(expected, url)
215
216 def testFormatURLWithStrangeParams(self):
217 mr = testing_helpers.MakeMonorailRequest(path='/foo?start=0')
218 recognized_params = [(name, mr.GetParam(name)) for name in
219 framework_helpers.RECOGNIZED_PARAMS]
220 url = framework_helpers.FormatURL(
221 recognized_params, '/foo',
222 r=0, path='/foo/bar', sketchy='/foo/ bar baz ')
223 self.assertEqual(
224 '/foo?start=0&path=/foo/bar&r=0&sketchy=/foo/%20bar%20baz%20',
225 url)
226
227 def testFormatAbsoluteURL(self):
228 _request, mr = testing_helpers.GetRequestObjects(
229 path='/p/proj/some-path',
230 headers={'Host': 'www.test.com'})
231 self.assertEqual(
232 'http://www.test.com/p/proj/some/path',
233 framework_helpers.FormatAbsoluteURL(mr, '/some/path'))
234
235 def testFormatAbsoluteURL_CommonRequestParams(self):
236 _request, mr = testing_helpers.GetRequestObjects(
237 path='/p/proj/some-path?foo=bar&can=1',
238 headers={'Host': 'www.test.com'})
239 self.assertEqual(
240 'http://www.test.com/p/proj/some/path?can=1',
241 framework_helpers.FormatAbsoluteURL(mr, '/some/path'))
242 self.assertEqual(
243 'http://www.test.com/p/proj/some/path',
244 framework_helpers.FormatAbsoluteURL(
245 mr, '/some/path', copy_params=False))
246
247 def testFormatAbsoluteURL_NoProject(self):
248 path = '/some/path'
249 _request, mr = testing_helpers.GetRequestObjects(
250 headers={'Host': 'www.test.com'}, path=path)
251 url = framework_helpers.FormatAbsoluteURL(mr, path, include_project=False)
252 self.assertEqual(url, 'http://www.test.com/some/path')
253
254 def testGetHostPort_Local(self):
255 """We use testing-app.appspot.com when running locally."""
256 self.assertEqual('testing-app.appspot.com',
257 framework_helpers.GetHostPort())
258 self.assertEqual('testing-app.appspot.com',
259 framework_helpers.GetHostPort(project_name='proj'))
260
261 @mock.patch('settings.preferred_domains',
262 {'testing-app.appspot.com': 'example.com'})
263 def testGetHostPort_PreferredDomain(self):
264 """A prod server can have a preferred domain."""
265 self.assertEqual('example.com',
266 framework_helpers.GetHostPort())
267 self.assertEqual('example.com',
268 framework_helpers.GetHostPort(project_name='proj'))
269
270 @mock.patch('settings.branded_domains',
271 {'proj': 'branded.com', '*': 'unbranded.com'})
272 @mock.patch('settings.preferred_domains',
273 {'testing-app.appspot.com': 'example.com'})
274 def testGetHostPort_BrandedDomain(self):
275 """A prod server can have a preferred domain."""
276 self.assertEqual('example.com',
277 framework_helpers.GetHostPort())
278 self.assertEqual('branded.com',
279 framework_helpers.GetHostPort(project_name='proj'))
280 self.assertEqual('unbranded.com',
281 framework_helpers.GetHostPort(project_name='other-proj'))
282
283 def testIssueCommentURL(self):
284 hostport = 'port.someplex.com'
285 proj = project_pb2.Project()
286 proj.project_name = 'proj'
287
288 url = 'https://port.someplex.com/p/proj/issues/detail?id=2'
289 actual_url = framework_helpers.IssueCommentURL(
290 hostport, proj, 2)
291 self.assertEqual(actual_url, url)
292
293 url = 'https://port.someplex.com/p/proj/issues/detail?id=2#c2'
294 actual_url = framework_helpers.IssueCommentURL(
295 hostport, proj, 2, seq_num=2)
296 self.assertEqual(actual_url, url)
297
298
299class WordWrapSuperLongLinesTest(unittest.TestCase):
300
301 def testEmptyLogMessage(self):
302 msg = ''
303 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
304 self.assertEqual(wrapped_msg, '')
305
306 def testShortLines(self):
307 msg = 'one\ntwo\nthree\n'
308 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
309 expected = 'one\ntwo\nthree\n'
310 self.assertEqual(wrapped_msg, expected)
311
312 def testOneLongLine(self):
313 msg = ('This is a super long line that just goes on and on '
314 'and it seems like it will never stop because it is '
315 'super long and it was entered by a user who had no '
316 'familiarity with the return key.')
317 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
318 expected = ('This is a super long line that just goes on and on and it '
319 'seems like it will never stop because it\n'
320 'is super long and it was entered by a user who had no '
321 'familiarity with the return key.')
322 self.assertEqual(wrapped_msg, expected)
323
324 msg2 = ('This is a super long line that just goes on and on '
325 'and it seems like it will never stop because it is '
326 'super long and it was entered by a user who had no '
327 'familiarity with the return key. '
328 'This is a super long line that just goes on and on '
329 'and it seems like it will never stop because it is '
330 'super long and it was entered by a user who had no '
331 'familiarity with the return key.')
332 wrapped_msg2 = framework_helpers.WordWrapSuperLongLines(msg2)
333 expected2 = ('This is a super long line that just goes on and on and it '
334 'seems like it will never stop because it\n'
335 'is super long and it was entered by a user who had no '
336 'familiarity with the return key. This is a\n'
337 'super long line that just goes on and on and it seems like '
338 'it will never stop because it is super\n'
339 'long and it was entered by a user who had no familiarity '
340 'with the return key.')
341 self.assertEqual(wrapped_msg2, expected2)
342
343 def testMixOfShortAndLong(self):
344 msg = ('[Author: mpcomplete]\n'
345 '\n'
346 # Description on one long line
347 'Fix a memory leak in JsArray and JsObject for the IE and NPAPI '
348 'ports. Each time you call GetElement* or GetProperty* to '
349 'retrieve string or object token, the token would be leaked. '
350 'I added a JsScopedToken to ensure that the right thing is '
351 'done when the object leaves scope, depending on the platform.\n'
352 '\n'
353 'R=zork\n'
354 'CC=google-gears-eng@googlegroups.com\n'
355 'DELTA=108 (52 added, 36 deleted, 20 changed)\n'
356 'OCL=5932446\n'
357 'SCL=5933728\n')
358 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
359 expected = (
360 '[Author: mpcomplete]\n'
361 '\n'
362 'Fix a memory leak in JsArray and JsObject for the IE and NPAPI '
363 'ports. Each time you call\n'
364 'GetElement* or GetProperty* to retrieve string or object token, the '
365 'token would be leaked. I added\n'
366 'a JsScopedToken to ensure that the right thing is done when the '
367 'object leaves scope, depending on\n'
368 'the platform.\n'
369 '\n'
370 'R=zork\n'
371 'CC=google-gears-eng@googlegroups.com\n'
372 'DELTA=108 (52 added, 36 deleted, 20 changed)\n'
373 'OCL=5932446\n'
374 'SCL=5933728\n')
375 self.assertEqual(wrapped_msg, expected)
376
377
378class ComputeListDeltasTest(unittest.TestCase):
379
380 def DoOne(self, old=None, new=None, added=None, removed=None):
381 """Run one call to the target method and check expected results."""
382 actual_added, actual_removed = framework_helpers.ComputeListDeltas(
383 old, new)
384 self.assertItemsEqual(added, actual_added)
385 self.assertItemsEqual(removed, actual_removed)
386
387 def testEmptyLists(self):
388 self.DoOne(old=[], new=[], added=[], removed=[])
389 self.DoOne(old=[1, 2], new=[], added=[], removed=[1, 2])
390 self.DoOne(old=[], new=[1, 2], added=[1, 2], removed=[])
391
392 def testUnchanged(self):
393 self.DoOne(old=[1], new=[1], added=[], removed=[])
394 self.DoOne(old=[1, 2], new=[1, 2], added=[], removed=[])
395 self.DoOne(old=[1, 2], new=[2, 1], added=[], removed=[])
396
397 def testCompleteChange(self):
398 self.DoOne(old=[1, 2], new=[3, 4], added=[3, 4], removed=[1, 2])
399
400 def testGeneralChange(self):
401 self.DoOne(old=[1, 2], new=[2], added=[], removed=[1])
402 self.DoOne(old=[1], new=[1, 2], added=[2], removed=[])
403 self.DoOne(old=[1, 2], new=[2, 3], added=[3], removed=[1])
404
405
406class UserSettingsTest(unittest.TestCase):
407
408 def setUp(self):
409 self.mr = testing_helpers.MakeMonorailRequest()
410 self.cnxn = 'cnxn'
411 self.services = service_manager.Services(
412 user=fake.UserService(),
413 usergroup=fake.UserGroupService())
414
415 def testGatherUnifiedSettingsPageData(self):
416 mr = self.mr
417 mr.auth.user_view = framework_views.StuffUserView(100, 'user@invalid', True)
418 mr.auth.user_view.profile_url = '/u/profile/url'
419 userprefs = user_pb2.UserPrefs(
420 prefs=[user_pb2.UserPrefValue(name='public_issue_notice', value='true')])
421 page_data = framework_helpers.UserSettings.GatherUnifiedSettingsPageData(
422 mr.auth.user_id, mr.auth.user_view, mr.auth.user_pb, userprefs)
423
424 expected_keys = [
425 'settings_user',
426 'settings_user_pb',
427 'settings_user_is_banned',
428 'self',
429 'profile_url_fragment',
430 'preview_on_hover',
431 'settings_user_prefs',
432 ]
433 self.assertItemsEqual(expected_keys, list(page_data.keys()))
434
435 self.assertEqual('profile/url', page_data['profile_url_fragment'])
436 self.assertTrue(page_data['settings_user_prefs'].public_issue_notice)
437 self.assertFalse(page_data['settings_user_prefs'].restrict_new_issues)
438
439 def testGatherUnifiedSettingsPageData_NoUserPrefs(self):
440 """If UserPrefs were not loaded, consider them all false."""
441 mr = self.mr
442 mr.auth.user_view = framework_views.StuffUserView(100, 'user@invalid', True)
443 userprefs = None
444
445 page_data = framework_helpers.UserSettings.GatherUnifiedSettingsPageData(
446 mr.auth.user_id, mr.auth.user_view, mr.auth.user_pb, userprefs)
447
448 self.assertFalse(page_data['settings_user_prefs'].public_issue_notice)
449 self.assertFalse(page_data['settings_user_prefs'].restrict_new_issues)
450
451 def testProcessBanForm(self):
452 """We can ban and unban users."""
453 user = self.services.user.TestAddUser('one@example.com', 111)
454 post_data = {'banned': 1, 'banned_reason': 'rude'}
455 framework_helpers.UserSettings.ProcessBanForm(
456 self.cnxn, self.services.user, post_data, 111, user)
457 self.assertEqual('rude', user.banned)
458
459 post_data = {} # not banned
460 framework_helpers.UserSettings.ProcessBanForm(
461 self.cnxn, self.services.user, post_data, 111, user)
462 self.assertEqual('', user.banned)
463
464 def testProcessSettingsForm_OldStylePrefs(self):
465 """We can set prefs that are stored in the User PB."""
466 user = self.services.user.TestAddUser('one@example.com', 111)
467 post_data = {'obscure_email': 1, 'notify': 1}
468 with work_env.WorkEnv(self.mr, self.services) as we:
469 framework_helpers.UserSettings.ProcessSettingsForm(
470 we, post_data, user)
471
472 self.assertTrue(user.obscure_email)
473 self.assertTrue(user.notify_issue_change)
474 self.assertFalse(user.notify_starred_ping)
475
476 def testProcessSettingsForm_NewStylePrefs(self):
477 """We can set prefs that are stored in the UserPrefs PB."""
478 user = self.services.user.TestAddUser('one@example.com', 111)
479 post_data = {'restrict_new_issues': 1}
480 with work_env.WorkEnv(self.mr, self.services) as we:
481 framework_helpers.UserSettings.ProcessSettingsForm(
482 we, post_data, user)
483 userprefs = we.GetUserPrefs(111)
484
485 actual = {upv.name: upv.value
486 for upv in userprefs.prefs}
487 expected = {
488 'restrict_new_issues': 'true',
489 'public_issue_notice': 'false',
490 }
491 self.assertEqual(expected, actual)
492
493
494class MurmurHash3Test(unittest.TestCase):
495
496 def testMurmurHash(self):
497 test_data = [
498 ('', 0),
499 ('agable@chromium.org', 4092810879),
500 (u'jrobbins@chromium.org', 904770043),
501 ('seanmccullough%google.com@gtempaccount.com', 1301269279),
502 ('rmistry+monorail@chromium.org', 4186878788),
503 ('jparent+foo@', 2923900874),
504 ('@example.com', 3043483168),
505 ]
506 hashes = [framework_helpers.MurmurHash3_x86_32(x)
507 for (x, _) in test_data]
508 self.assertListEqual(hashes, [e for (_, e) in test_data])
509
510 def testMurmurHashWithSeed(self):
511 test_data = [
512 ('', 1113155926, 2270882445),
513 ('agable@chromium.org', 772936925, 3995066671),
514 (u'jrobbins@chromium.org', 1519359761, 1273489513),
515 ('seanmccullough%google.com@gtempaccount.com', 49913829, 1202521153),
516 ('rmistry+monorail@chromium.org', 314860298, 3636123309),
517 ('jparent+foo@', 195791379, 332453977),
518 ('@example.com', 521490555, 257496459),
519 ]
520 hashes = [framework_helpers.MurmurHash3_x86_32(x, s)
521 for (x, s, _) in test_data]
522 self.assertListEqual(hashes, [e for (_, _, e) in test_data])
523
524
525class MakeRandomKeyTest(unittest.TestCase):
526
527 def testMakeRandomKey_Normal(self):
528 key1 = framework_helpers.MakeRandomKey()
529 key2 = framework_helpers.MakeRandomKey()
530 self.assertEqual(128, len(key1))
531 self.assertEqual(128, len(key2))
532 self.assertNotEqual(key1, key2)
533
534 def testMakeRandomKey_Length(self):
535 key = framework_helpers.MakeRandomKey()
536 self.assertEqual(128, len(key))
537 key16 = framework_helpers.MakeRandomKey(length=16)
538 self.assertEqual(16, len(key16))
539
540 def testMakeRandomKey_Chars(self):
541 key = framework_helpers.MakeRandomKey(chars='a', length=4)
542 self.assertEqual('aaaa', key)
543
544
545class IsServiceAccountTest(unittest.TestCase):
546
547 def testIsServiceAccount(self):
548 appspot = 'abc@appspot.gserviceaccount.com'
549 developer = '@developer.gserviceaccount.com'
550 bugdroid = 'bugdroid1@chromium.org'
551 user = 'test@example.com'
552
553 self.assertTrue(framework_helpers.IsServiceAccount(appspot))
554 self.assertTrue(framework_helpers.IsServiceAccount(developer))
555 self.assertTrue(framework_helpers.IsServiceAccount(bugdroid))
556 self.assertFalse(framework_helpers.IsServiceAccount(user))
557
558 client_emails = set([appspot, developer, bugdroid])
559 self.assertTrue(framework_helpers.IsServiceAccount(
560 appspot, client_emails=client_emails))
561 self.assertTrue(framework_helpers.IsServiceAccount(
562 developer, client_emails=client_emails))
563 self.assertTrue(framework_helpers.IsServiceAccount(
564 bugdroid, client_emails=client_emails))
565 self.assertFalse(framework_helpers.IsServiceAccount(
566 user, client_emails=client_emails))