blob: 1d0146cfe5bc4a42c0b50fa932ba826408646e29 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""Unit tests for the framework_helpers module."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import mock
12import unittest
13
14import mox
15import time
16
17from businesslogic import work_env
18from framework import framework_helpers
19from framework import framework_views
20from proto import features_pb2
21from proto import project_pb2
22from proto import user_pb2
23from services import service_manager
24from testing import fake
25from testing import testing_helpers
26
27
28class HelperFunctionsTest(unittest.TestCase):
29
30 def setUp(self):
31 self.mox = mox.Mox()
32 self.time = self.mox.CreateMock(framework_helpers.time)
33 framework_helpers.time = self.time # Point to a mocked out time module.
34
35 def tearDown(self):
36 framework_helpers.time = time # Point back to the time module.
37 self.mox.UnsetStubs()
38 self.mox.ResetAll()
39
40 def testRetryDecorator_ExceedFailures(self):
41 class Tracker(object):
42 func_called = 0
43 tracker = Tracker()
44
45 # Use a function that always fails.
46 @framework_helpers.retry(2, delay=1, backoff=2)
47 def testFunc(tracker):
48 tracker.func_called += 1
49 raise Exception('Failed')
50
51 self.time.sleep(1).AndReturn(None)
52 self.time.sleep(2).AndReturn(None)
53 self.mox.ReplayAll()
54 with self.assertRaises(Exception):
55 testFunc(tracker)
56 self.mox.VerifyAll()
57 self.assertEqual(3, tracker.func_called)
58
59 def testRetryDecorator_EventuallySucceed(self):
60 class Tracker(object):
61 func_called = 0
62 tracker = Tracker()
63
64 # Use a function that succeeds on the 2nd attempt.
65 @framework_helpers.retry(2, delay=1, backoff=2)
66 def testFunc(tracker):
67 tracker.func_called += 1
68 if tracker.func_called < 2:
69 raise Exception('Failed')
70
71 self.time.sleep(1).AndReturn(None)
72 self.mox.ReplayAll()
73 testFunc(tracker)
74 self.mox.VerifyAll()
75 self.assertEqual(2, tracker.func_called)
76
77 def testGetRoleName(self):
78 proj = project_pb2.Project()
79 proj.owner_ids.append(111)
80 proj.committer_ids.append(222)
81 proj.contributor_ids.append(333)
82
83 self.assertEqual(None, framework_helpers.GetRoleName(set(), proj))
84
85 self.assertEqual('Owner', framework_helpers.GetRoleName({111}, proj))
86 self.assertEqual('Committer', framework_helpers.GetRoleName({222}, proj))
87 self.assertEqual('Contributor', framework_helpers.GetRoleName({333}, proj))
88
89 self.assertEqual(
90 'Owner', framework_helpers.GetRoleName({111, 222, 999}, proj))
91 self.assertEqual(
92 'Committer', framework_helpers.GetRoleName({222, 333, 999}, proj))
93 self.assertEqual(
94 'Contributor', framework_helpers.GetRoleName({333, 999}, proj))
95
96 def testGetHotlistRoleName(self):
97 hotlist = features_pb2.Hotlist()
98 hotlist.owner_ids.append(111)
99 hotlist.editor_ids.append(222)
100 hotlist.follower_ids.append(333)
101
102 self.assertEqual(None, framework_helpers.GetHotlistRoleName(set(), hotlist))
103
104 self.assertEqual(
105 'Owner', framework_helpers.GetHotlistRoleName({111}, hotlist))
106 self.assertEqual(
107 'Editor', framework_helpers.GetHotlistRoleName({222}, hotlist))
108 self.assertEqual(
109 'Follower', framework_helpers.GetHotlistRoleName({333}, hotlist))
110
111 self.assertEqual(
112 'Owner', framework_helpers.GetHotlistRoleName({111, 222, 999}, hotlist))
113 self.assertEqual(
114 'Editor', framework_helpers.GetHotlistRoleName(
115 {222, 333, 999}, hotlist))
116 self.assertEqual(
117 'Follower', framework_helpers.GetHotlistRoleName({333, 999}, hotlist))
118
119
120class UrlFormattingTest(unittest.TestCase):
121 """Tests for URL formatting."""
122
123 def setUp(self):
124 self.services = service_manager.Services(user=fake.UserService())
125
126 def testFormatMovedProjectURL(self):
127 """Project foo has been moved to bar. User is visiting /p/foo/..."""
128 mr = testing_helpers.MakeMonorailRequest()
129 mr.current_page_url = '/p/foo/'
130 self.assertEqual(
131 '/p/bar/',
132 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
133
134 mr.current_page_url = '/p/foo/issues/list'
135 self.assertEqual(
136 '/p/bar/issues/list',
137 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
138
139 mr.current_page_url = '/p/foo/issues/detail?id=123'
140 self.assertEqual(
141 '/p/bar/issues/detail?id=123',
142 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
143
144 mr.current_page_url = '/p/foo/issues/detail?id=123#c7'
145 self.assertEqual(
146 '/p/bar/issues/detail?id=123#c7',
147 framework_helpers.FormatMovedProjectURL(mr, 'bar'))
148
149 def testFormatURL(self):
150 mr = testing_helpers.MakeMonorailRequest()
151 path = '/dude/wheres/my/car'
152 recognized_params = [(name, mr.GetParam(name)) for name in
153 framework_helpers.RECOGNIZED_PARAMS]
154 url = framework_helpers.FormatURL(recognized_params, path)
155 self.assertEqual(path, url)
156
157 def testFormatURLWithRecognizedParams(self):
158 params = {}
159 query = []
160 for name in framework_helpers.RECOGNIZED_PARAMS:
161 params[name] = name
162 query.append('%s=%s' % (name, 123))
163 path = '/dude/wheres/my/car'
164 expected = '%s?%s' % (path, '&'.join(query))
165 mr = testing_helpers.MakeMonorailRequest(path=expected)
166 recognized_params = [(name, mr.GetParam(name)) for name in
167 framework_helpers.RECOGNIZED_PARAMS]
168 # No added params.
169 url = framework_helpers.FormatURL(recognized_params, path)
170 self.assertEqual(expected, url)
171
172 def testFormatURLWithKeywordArgs(self):
173 params = {}
174 query_pairs = []
175 for name in framework_helpers.RECOGNIZED_PARAMS:
176 params[name] = name
177 if name != 'can' and name != 'start':
178 query_pairs.append('%s=%s' % (name, 123))
179 path = '/dude/wheres/my/car'
180 mr = testing_helpers.MakeMonorailRequest(
181 path='%s?%s' % (path, '&'.join(query_pairs)))
182 query_pairs.append('can=yep')
183 query_pairs.append('start=486')
184 query_string = '&'.join(query_pairs)
185 expected = '%s?%s' % (path, query_string)
186 recognized_params = [(name, mr.GetParam(name)) for name in
187 framework_helpers.RECOGNIZED_PARAMS]
188 url = framework_helpers.FormatURL(
189 recognized_params, path, can='yep', start=486)
190 self.assertEqual(expected, url)
191
192 def testFormatURLWithKeywordArgsAndID(self):
193 params = {}
194 query_pairs = []
195 query_pairs.append('id=200') # id should be the first parameter.
196 for name in framework_helpers.RECOGNIZED_PARAMS:
197 params[name] = name
198 if name != 'can' and name != 'start':
199 query_pairs.append('%s=%s' % (name, 123))
200 path = '/dude/wheres/my/car'
201 mr = testing_helpers.MakeMonorailRequest(
202 path='%s?%s' % (path, '&'.join(query_pairs)))
203 query_pairs.append('can=yep')
204 query_pairs.append('start=486')
205 query_string = '&'.join(query_pairs)
206 expected = '%s?%s' % (path, query_string)
207 recognized_params = [(name, mr.GetParam(name)) for name in
208 framework_helpers.RECOGNIZED_PARAMS]
209 url = framework_helpers.FormatURL(
210 recognized_params, path, can='yep', start=486, id=200)
211 self.assertEqual(expected, url)
212
213 def testFormatURLWithStrangeParams(self):
214 mr = testing_helpers.MakeMonorailRequest(path='/foo?start=0')
215 recognized_params = [(name, mr.GetParam(name)) for name in
216 framework_helpers.RECOGNIZED_PARAMS]
217 url = framework_helpers.FormatURL(
218 recognized_params, '/foo',
219 r=0, path='/foo/bar', sketchy='/foo/ bar baz ')
220 self.assertEqual(
221 '/foo?start=0&path=/foo/bar&r=0&sketchy=/foo/%20bar%20baz%20',
222 url)
223
224 def testFormatAbsoluteURL(self):
225 _request, mr = testing_helpers.GetRequestObjects(
226 path='/p/proj/some-path',
227 headers={'Host': 'www.test.com'})
228 self.assertEqual(
229 'http://www.test.com/p/proj/some/path',
230 framework_helpers.FormatAbsoluteURL(mr, '/some/path'))
231
232 def testFormatAbsoluteURL_CommonRequestParams(self):
233 _request, mr = testing_helpers.GetRequestObjects(
234 path='/p/proj/some-path?foo=bar&can=1',
235 headers={'Host': 'www.test.com'})
236 self.assertEqual(
237 'http://www.test.com/p/proj/some/path?can=1',
238 framework_helpers.FormatAbsoluteURL(mr, '/some/path'))
239 self.assertEqual(
240 'http://www.test.com/p/proj/some/path',
241 framework_helpers.FormatAbsoluteURL(
242 mr, '/some/path', copy_params=False))
243
244 def testFormatAbsoluteURL_NoProject(self):
245 path = '/some/path'
246 _request, mr = testing_helpers.GetRequestObjects(
247 headers={'Host': 'www.test.com'}, path=path)
248 url = framework_helpers.FormatAbsoluteURL(mr, path, include_project=False)
249 self.assertEqual(url, 'http://www.test.com/some/path')
250
251 def testGetHostPort_Local(self):
252 """We use testing-app.appspot.com when running locally."""
253 self.assertEqual('testing-app.appspot.com',
254 framework_helpers.GetHostPort())
255 self.assertEqual('testing-app.appspot.com',
256 framework_helpers.GetHostPort(project_name='proj'))
257
258 @mock.patch('settings.preferred_domains',
259 {'testing-app.appspot.com': 'example.com'})
260 def testGetHostPort_PreferredDomain(self):
261 """A prod server can have a preferred domain."""
262 self.assertEqual('example.com',
263 framework_helpers.GetHostPort())
264 self.assertEqual('example.com',
265 framework_helpers.GetHostPort(project_name='proj'))
266
267 @mock.patch('settings.branded_domains',
268 {'proj': 'branded.com', '*': 'unbranded.com'})
269 @mock.patch('settings.preferred_domains',
270 {'testing-app.appspot.com': 'example.com'})
271 def testGetHostPort_BrandedDomain(self):
272 """A prod server can have a preferred domain."""
273 self.assertEqual('example.com',
274 framework_helpers.GetHostPort())
275 self.assertEqual('branded.com',
276 framework_helpers.GetHostPort(project_name='proj'))
277 self.assertEqual('unbranded.com',
278 framework_helpers.GetHostPort(project_name='other-proj'))
279
280 def testIssueCommentURL(self):
281 hostport = 'port.someplex.com'
282 proj = project_pb2.Project()
283 proj.project_name = 'proj'
284
285 url = 'https://port.someplex.com/p/proj/issues/detail?id=2'
286 actual_url = framework_helpers.IssueCommentURL(
287 hostport, proj, 2)
288 self.assertEqual(actual_url, url)
289
290 url = 'https://port.someplex.com/p/proj/issues/detail?id=2#c2'
291 actual_url = framework_helpers.IssueCommentURL(
292 hostport, proj, 2, seq_num=2)
293 self.assertEqual(actual_url, url)
294
295
296class WordWrapSuperLongLinesTest(unittest.TestCase):
297
298 def testEmptyLogMessage(self):
299 msg = ''
300 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
301 self.assertEqual(wrapped_msg, '')
302
303 def testShortLines(self):
304 msg = 'one\ntwo\nthree\n'
305 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
306 expected = 'one\ntwo\nthree\n'
307 self.assertEqual(wrapped_msg, expected)
308
309 def testOneLongLine(self):
310 msg = ('This is a super long line that just goes on and on '
311 'and it seems like it will never stop because it is '
312 'super long and it was entered by a user who had no '
313 'familiarity with the return key.')
314 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
315 expected = ('This is a super long line that just goes on and on and it '
316 'seems like it will never stop because it\n'
317 'is super long and it was entered by a user who had no '
318 'familiarity with the return key.')
319 self.assertEqual(wrapped_msg, expected)
320
321 msg2 = ('This is a super long line that just goes on and on '
322 'and it seems like it will never stop because it is '
323 'super long and it was entered by a user who had no '
324 'familiarity with the return key. '
325 'This is a super long line that just goes on and on '
326 'and it seems like it will never stop because it is '
327 'super long and it was entered by a user who had no '
328 'familiarity with the return key.')
329 wrapped_msg2 = framework_helpers.WordWrapSuperLongLines(msg2)
330 expected2 = ('This is a super long line that just goes on and on and it '
331 'seems like it will never stop because it\n'
332 'is super long and it was entered by a user who had no '
333 'familiarity with the return key. This is a\n'
334 'super long line that just goes on and on and it seems like '
335 'it will never stop because it is super\n'
336 'long and it was entered by a user who had no familiarity '
337 'with the return key.')
338 self.assertEqual(wrapped_msg2, expected2)
339
340 def testMixOfShortAndLong(self):
341 msg = ('[Author: mpcomplete]\n'
342 '\n'
343 # Description on one long line
344 'Fix a memory leak in JsArray and JsObject for the IE and NPAPI '
345 'ports. Each time you call GetElement* or GetProperty* to '
346 'retrieve string or object token, the token would be leaked. '
347 'I added a JsScopedToken to ensure that the right thing is '
348 'done when the object leaves scope, depending on the platform.\n'
349 '\n'
350 'R=zork\n'
351 'CC=google-gears-eng@googlegroups.com\n'
352 'DELTA=108 (52 added, 36 deleted, 20 changed)\n'
353 'OCL=5932446\n'
354 'SCL=5933728\n')
355 wrapped_msg = framework_helpers.WordWrapSuperLongLines(msg)
356 expected = (
357 '[Author: mpcomplete]\n'
358 '\n'
359 'Fix a memory leak in JsArray and JsObject for the IE and NPAPI '
360 'ports. Each time you call\n'
361 'GetElement* or GetProperty* to retrieve string or object token, the '
362 'token would be leaked. I added\n'
363 'a JsScopedToken to ensure that the right thing is done when the '
364 'object leaves scope, depending on\n'
365 'the platform.\n'
366 '\n'
367 'R=zork\n'
368 'CC=google-gears-eng@googlegroups.com\n'
369 'DELTA=108 (52 added, 36 deleted, 20 changed)\n'
370 'OCL=5932446\n'
371 'SCL=5933728\n')
372 self.assertEqual(wrapped_msg, expected)
373
374
375class ComputeListDeltasTest(unittest.TestCase):
376
377 def DoOne(self, old=None, new=None, added=None, removed=None):
378 """Run one call to the target method and check expected results."""
379 actual_added, actual_removed = framework_helpers.ComputeListDeltas(
380 old, new)
381 self.assertItemsEqual(added, actual_added)
382 self.assertItemsEqual(removed, actual_removed)
383
384 def testEmptyLists(self):
385 self.DoOne(old=[], new=[], added=[], removed=[])
386 self.DoOne(old=[1, 2], new=[], added=[], removed=[1, 2])
387 self.DoOne(old=[], new=[1, 2], added=[1, 2], removed=[])
388
389 def testUnchanged(self):
390 self.DoOne(old=[1], new=[1], added=[], removed=[])
391 self.DoOne(old=[1, 2], new=[1, 2], added=[], removed=[])
392 self.DoOne(old=[1, 2], new=[2, 1], added=[], removed=[])
393
394 def testCompleteChange(self):
395 self.DoOne(old=[1, 2], new=[3, 4], added=[3, 4], removed=[1, 2])
396
397 def testGeneralChange(self):
398 self.DoOne(old=[1, 2], new=[2], added=[], removed=[1])
399 self.DoOne(old=[1], new=[1, 2], added=[2], removed=[])
400 self.DoOne(old=[1, 2], new=[2, 3], added=[3], removed=[1])
401
402
403class UserSettingsTest(unittest.TestCase):
404
405 def setUp(self):
406 self.mr = testing_helpers.MakeMonorailRequest()
407 self.cnxn = 'cnxn'
408 self.services = service_manager.Services(
409 user=fake.UserService(),
410 usergroup=fake.UserGroupService())
411
412 def testGatherUnifiedSettingsPageData(self):
413 mr = self.mr
414 mr.auth.user_view = framework_views.StuffUserView(100, 'user@invalid', True)
415 mr.auth.user_view.profile_url = '/u/profile/url'
416 userprefs = user_pb2.UserPrefs(
417 prefs=[user_pb2.UserPrefValue(name='public_issue_notice', value='true')])
418 page_data = framework_helpers.UserSettings.GatherUnifiedSettingsPageData(
419 mr.auth.user_id, mr.auth.user_view, mr.auth.user_pb, userprefs)
420
421 expected_keys = [
422 'settings_user',
423 'settings_user_pb',
424 'settings_user_is_banned',
425 'self',
426 'profile_url_fragment',
427 'preview_on_hover',
428 'settings_user_prefs',
429 ]
430 self.assertItemsEqual(expected_keys, list(page_data.keys()))
431
432 self.assertEqual('profile/url', page_data['profile_url_fragment'])
433 self.assertTrue(page_data['settings_user_prefs'].public_issue_notice)
434 self.assertFalse(page_data['settings_user_prefs'].restrict_new_issues)
435
436 def testGatherUnifiedSettingsPageData_NoUserPrefs(self):
437 """If UserPrefs were not loaded, consider them all false."""
438 mr = self.mr
439 mr.auth.user_view = framework_views.StuffUserView(100, 'user@invalid', True)
440 userprefs = None
441
442 page_data = framework_helpers.UserSettings.GatherUnifiedSettingsPageData(
443 mr.auth.user_id, mr.auth.user_view, mr.auth.user_pb, userprefs)
444
445 self.assertFalse(page_data['settings_user_prefs'].public_issue_notice)
446 self.assertFalse(page_data['settings_user_prefs'].restrict_new_issues)
447
448 def testProcessBanForm(self):
449 """We can ban and unban users."""
450 user = self.services.user.TestAddUser('one@example.com', 111)
451 post_data = {'banned': 1, 'banned_reason': 'rude'}
452 framework_helpers.UserSettings.ProcessBanForm(
453 self.cnxn, self.services.user, post_data, 111, user)
454 self.assertEqual('rude', user.banned)
455
456 post_data = {} # not banned
457 framework_helpers.UserSettings.ProcessBanForm(
458 self.cnxn, self.services.user, post_data, 111, user)
459 self.assertEqual('', user.banned)
460
461 def testProcessSettingsForm_OldStylePrefs(self):
462 """We can set prefs that are stored in the User PB."""
463 user = self.services.user.TestAddUser('one@example.com', 111)
464 post_data = {'obscure_email': 1, 'notify': 1}
465 with work_env.WorkEnv(self.mr, self.services) as we:
466 framework_helpers.UserSettings.ProcessSettingsForm(
467 we, post_data, user)
468
469 self.assertTrue(user.obscure_email)
470 self.assertTrue(user.notify_issue_change)
471 self.assertFalse(user.notify_starred_ping)
472
473 def testProcessSettingsForm_NewStylePrefs(self):
474 """We can set prefs that are stored in the UserPrefs PB."""
475 user = self.services.user.TestAddUser('one@example.com', 111)
476 post_data = {'restrict_new_issues': 1}
477 with work_env.WorkEnv(self.mr, self.services) as we:
478 framework_helpers.UserSettings.ProcessSettingsForm(
479 we, post_data, user)
480 userprefs = we.GetUserPrefs(111)
481
482 actual = {upv.name: upv.value
483 for upv in userprefs.prefs}
484 expected = {
485 'restrict_new_issues': 'true',
486 'public_issue_notice': 'false',
487 }
488 self.assertEqual(expected, actual)
489
490
491class MurmurHash3Test(unittest.TestCase):
492
493 def testMurmurHash(self):
494 test_data = [
495 ('', 0),
496 ('agable@chromium.org', 4092810879),
497 (u'jrobbins@chromium.org', 904770043),
498 ('seanmccullough%google.com@gtempaccount.com', 1301269279),
499 ('rmistry+monorail@chromium.org', 4186878788),
500 ('jparent+foo@', 2923900874),
501 ('@example.com', 3043483168),
502 ]
503 hashes = [framework_helpers.MurmurHash3_x86_32(x)
504 for (x, _) in test_data]
505 self.assertListEqual(hashes, [e for (_, e) in test_data])
506
507 def testMurmurHashWithSeed(self):
508 test_data = [
509 ('', 1113155926, 2270882445),
510 ('agable@chromium.org', 772936925, 3995066671),
511 (u'jrobbins@chromium.org', 1519359761, 1273489513),
512 ('seanmccullough%google.com@gtempaccount.com', 49913829, 1202521153),
513 ('rmistry+monorail@chromium.org', 314860298, 3636123309),
514 ('jparent+foo@', 195791379, 332453977),
515 ('@example.com', 521490555, 257496459),
516 ]
517 hashes = [framework_helpers.MurmurHash3_x86_32(x, s)
518 for (x, s, _) in test_data]
519 self.assertListEqual(hashes, [e for (_, _, e) in test_data])
520
521
522class MakeRandomKeyTest(unittest.TestCase):
523
524 def testMakeRandomKey_Normal(self):
525 key1 = framework_helpers.MakeRandomKey()
526 key2 = framework_helpers.MakeRandomKey()
527 self.assertEqual(128, len(key1))
528 self.assertEqual(128, len(key2))
529 self.assertNotEqual(key1, key2)
530
531 def testMakeRandomKey_Length(self):
532 key = framework_helpers.MakeRandomKey()
533 self.assertEqual(128, len(key))
534 key16 = framework_helpers.MakeRandomKey(length=16)
535 self.assertEqual(16, len(key16))
536
537 def testMakeRandomKey_Chars(self):
538 key = framework_helpers.MakeRandomKey(chars='a', length=4)
539 self.assertEqual('aaaa', key)
540
541
542class IsServiceAccountTest(unittest.TestCase):
543
544 def testIsServiceAccount(self):
545 appspot = 'abc@appspot.gserviceaccount.com'
546 developer = '@developer.gserviceaccount.com'
547 bugdroid = 'bugdroid1@chromium.org'
548 user = 'test@example.com'
549
550 self.assertTrue(framework_helpers.IsServiceAccount(appspot))
551 self.assertTrue(framework_helpers.IsServiceAccount(developer))
552 self.assertTrue(framework_helpers.IsServiceAccount(bugdroid))
553 self.assertFalse(framework_helpers.IsServiceAccount(user))
554
555 client_emails = set([appspot, developer, bugdroid])
556 self.assertTrue(framework_helpers.IsServiceAccount(
557 appspot, client_emails=client_emails))
558 self.assertTrue(framework_helpers.IsServiceAccount(
559 developer, client_emails=client_emails))
560 self.assertTrue(framework_helpers.IsServiceAccount(
561 bugdroid, client_emails=client_emails))
562 self.assertFalse(framework_helpers.IsServiceAccount(
563 user, client_emails=client_emails))