blob: c709d75bff0d720b619cc098298fca2596645c5c [file] [log] [blame]
# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
Copybara854996b2021-09-07 19:36:02 +00004
5"""Tests for the user service."""
6from __future__ import print_function
7from __future__ import division
8from __future__ import absolute_import
9
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +010010import six
Copybara854996b2021-09-07 19:36:02 +000011import unittest
12
13import mock
Adrià Vilanova Martínez9f9ade52022-10-10 23:20:11 +020014try:
15 from mox3 import mox
16except ImportError:
17 import mox
Copybara854996b2021-09-07 19:36:02 +000018import time
19
20from google.appengine.ext import testbed
21
22from framework import exceptions
23from framework import framework_constants
24from framework import sql
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +010025from mrproto import user_pb2
Copybara854996b2021-09-07 19:36:02 +000026from services import user_svc
27from testing import fake
28
29
def SetUpGetUsers(user_service, cnxn):
  """Record the mox expectations for fetching user 333 and its linked accounts.

  Args:
    user_service: service whose user_tbl and linkedaccount_tbl are mox mocks
        still in record mode.
    cnxn: connection object expected as the first argument of each Select.
  """
  # The single User row the mocked table hands back for user 333.
  user_333_row = (
      333, 'c@example.com', False, False, False, False, True,
      False, 'Spammer',
      'stay_same_issue', False, False, True, 0, 0, None)
  expected_user_select = user_service.user_tbl.Select(
      cnxn, cols=user_svc.USER_COLS, user_id=[333])
  expected_user_select.AndReturn([user_333_row])

  # No linked accounts exist for user 333, as parent or as child.
  expected_link_select = user_service.linkedaccount_tbl.Select(
      cnxn, cols=user_svc.LINKEDACCOUNT_COLS, parent_id=[333], child_id=[333],
      or_where_conds=True)
  expected_link_select.AndReturn([])
40
41
def MakeUserService(cache_manager, my_mox):
  """Build a UserService whose SQL table managers are mox mocks.

  Args:
    cache_manager: cache manager passed straight to the UserService
        constructor.
    my_mox: mox.Mox instance used to create the table mocks.

  Returns:
    A UserService with user_tbl, hotlistvisithistory_tbl and
    linkedaccount_tbl replaced by mocks of sql.SQLTableManager.
  """
  service = user_svc.UserService(cache_manager)
  mocked_table_attrs = (
      'user_tbl',
      'hotlistvisithistory_tbl',
      'linkedaccount_tbl',
  )
  for attr_name in mocked_table_attrs:
    setattr(service, attr_name, my_mox.CreateMock(sql.SQLTableManager))
  # The account-linking invite table is not mocked here; tests that need it
  # patch it themselves.
  return service
49
50
class UserTwoLevelCacheTest(unittest.TestCase):
  """Tests for the user_2lc cache object owned by UserService."""

  def setUp(self):
    """Activate a testbed with a memcache stub and build a mocked service."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = fake.MonorailConnection()
    self.cache_manager = fake.CacheManager()
    self.user_service = MakeUserService(self.cache_manager, self.mox)

  def tearDown(self):
    """Deactivate the testbed and reset mox state, as UserServiceTest does."""
    self.testbed.deactivate()
    # Clear recorded expectations and stubs so they never outlive the test
    # that created them.
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def testDeserializeUsersByID(self):
    """Two user rows with no linked-account rows become two user PBs."""
    user_rows = [
        (111, 'a@example.com', False, False, False, False, True, False, '',
         'stay_same_issue', False, False, True, 0, 0, None),
        (222, 'b@example.com', False, False, False, False, True, False, '',
         'next_in_list', False, False, True, 0, 0, None),
        ]
    linkedaccount_rows = []
    user_dict = self.user_service.user_2lc._DeserializeUsersByID(
        user_rows, linkedaccount_rows)
    self.assertEqual(2, len(user_dict))
    self.assertEqual('a@example.com', user_dict[111].email)
    self.assertFalse(user_dict[111].is_site_admin)
    self.assertEqual('', user_dict[111].banned)
    self.assertFalse(user_dict[111].notify_issue_change)
    self.assertEqual('b@example.com', user_dict[222].email)
    # With no linked-account rows, neither user has a parent or children.
    self.assertIsNone(user_dict[111].linked_parent_id)
    self.assertEqual([], user_dict[111].linked_child_ids)
    self.assertIsNone(user_dict[222].linked_parent_id)
    self.assertEqual([], user_dict[222].linked_child_ids)

  def testDeserializeUsersByID_LinkedAccounts(self):
    """Linked-account rows populate the parent and child IDs of the user PB."""
    user_rows = [
        (111, 'a@example.com', False, False, False, False, True, False, '',
         'stay_same_issue', False, False, True, 0, 0, None),
        ]
    # Rows are (parent_id, child_id) pairs, judging by the assertions below:
    # 111 is the parent of 222 and 333, and the child of 444.
    linkedaccount_rows = [(111, 222), (111, 333), (444, 111)]
    user_dict = self.user_service.user_2lc._DeserializeUsersByID(
        user_rows, linkedaccount_rows)
    self.assertEqual(1, len(user_dict))
    user_pb = user_dict[111]
    self.assertEqual('a@example.com', user_pb.email)
    self.assertEqual(444, user_pb.linked_parent_id)
    self.assertEqual([222, 333], user_pb.linked_child_ids)

  def testFetchItems(self):
    """FetchItems issues the expected Selects and returns the fetched user."""
    SetUpGetUsers(self.user_service, self.cnxn)
    self.mox.ReplayAll()
    user_dict = self.user_service.user_2lc.FetchItems(self.cnxn, [333])
    self.mox.VerifyAll()
    self.assertEqual([333], list(user_dict.keys()))
    self.assertEqual('c@example.com', user_dict[333].email)
    self.assertFalse(user_dict[333].is_site_admin)
    self.assertEqual('Spammer', user_dict[333].banned)
110
111
class UserServiceTest(unittest.TestCase):
  """Tests for UserService, run against mocked SQL table managers."""

  def setUp(self):
    """Activate a testbed with a memcache stub and build a mocked service."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = fake.MonorailConnection()
    self.cache_manager = fake.CacheManager()
    self.user_service = MakeUserService(self.cache_manager, self.mox)

  def tearDown(self):
    """Deactivate the testbed and clear all mox stubs and expectations."""
    self.testbed.deactivate()
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def testCreateUsers(self):
    """Only the user ID missing from the existence Select gets inserted."""
    self.user_service.user_tbl.Select(
        self.cnxn,
        cols=('user_id',),
        user_id=[3035911623, 2996997680],
    ).AndReturn([(2996997680,)])
    self.user_service.user_tbl.InsertRows(
        self.cnxn,
        ['user_id', 'email', 'obscure_email'],
        [(3035911623, 'a@example.com', True)],
    ).AndReturn(None)
    self.mox.ReplayAll()
    self.user_service._CreateUsers(
        self.cnxn, ['a@example.com', 'b@example.com'])
    self.mox.VerifyAll()

  def SetUpLookupUserEmails(self):
    """Record a Select for user 222 that returns b@example.com."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn(
            [(222, 'b@example.com')])

  def testLookupUserEmails(self):
    """A cached user (111) and a user read via Select (222) are both found."""
    self.SetUpLookupUserEmails()
    self.user_service.email_cache.CacheItem(
        111, 'a@example.com')
    self.mox.ReplayAll()
    emails_dict = self.user_service.LookupUserEmails(
        self.cnxn, [111, 222])
    self.mox.VerifyAll()
    self.assertEqual(
        {111: 'a@example.com', 222: 'b@example.com'},
        emails_dict)

  def SetUpLookupUserEmails_Missed(self):
    """Record a Select for user 222 that finds nothing; cache user 111."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn([])
    self.user_service.email_cache.CacheItem(
        111, 'a@example.com')

  def testLookupUserEmails_Missed(self):
    """By default, a user ID with no row raises NoSuchUserException."""
    self.SetUpLookupUserEmails_Missed()
    self.mox.ReplayAll()
    with self.assertRaises(exceptions.NoSuchUserException):
      self.user_service.LookupUserEmails(self.cnxn, [111, 222])
    self.mox.VerifyAll()

  def testLookUpUserEmails_IgnoreMissed(self):
    """With ignore_missed=True, missing users are left out of the result."""
    self.SetUpLookupUserEmails_Missed()
    self.mox.ReplayAll()
    emails_dict = self.user_service.LookupUserEmails(
        self.cnxn, [111, 222], ignore_missed=True)
    self.mox.VerifyAll()
    self.assertEqual({111: 'a@example.com'}, emails_dict)

  def testLookupUserEmail(self):
    """The single-user lookup returns the email read via Select."""
    self.SetUpLookupUserEmails()  # Same as testLookupUserEmails()
    self.mox.ReplayAll()
    email_addr = self.user_service.LookupUserEmail(self.cnxn, 222)
    self.mox.VerifyAll()
    self.assertEqual('b@example.com', email_addr)

  def SetUpLookupUserIDs(self):
    """Record a Select by email for b@example.com that returns ID 222."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['email', 'user_id'],
        email=['b@example.com']).AndReturn([('b@example.com', 222)])

  def testLookupUserIDs(self):
    """A cached email and an email read via Select both resolve to IDs."""
    self.SetUpLookupUserIDs()
    self.user_service.user_id_cache.CacheItem(
        'a@example.com', 111)
    self.mox.ReplayAll()
    user_id_dict = self.user_service.LookupUserIDs(
        self.cnxn, ['a@example.com', 'b@example.com'])
    self.mox.VerifyAll()
    self.assertEqual(
        {'a@example.com': 111, 'b@example.com': 222},
        user_id_dict)

  def testLookupUserIDs_InvalidEmail(self):
    """The address 'abc' yields no entry, even with autocreate=True."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['email', 'user_id'], email=['abc']).AndReturn([])
    self.mox.ReplayAll()
    user_id_dict = self.user_service.LookupUserIDs(
        self.cnxn, ['abc'], autocreate=True)
    self.mox.VerifyAll()
    self.assertEqual({}, user_id_dict)

  def testLookupUserIDs_NoUserValue(self):
    """NO_VALUES and the empty string are dropped before the Select."""
    self.user_service.user_tbl.Select = mock.Mock(
        return_value=[('b@example.com', 222)])
    user_id_dict = self.user_service.LookupUserIDs(
        self.cnxn, [framework_constants.NO_VALUES, '', 'b@example.com'])
    self.assertEqual({'b@example.com': 222}, user_id_dict)
    self.user_service.user_tbl.Select.assert_called_once_with(
        self.cnxn, cols=['email', 'user_id'], email=['b@example.com'])

  def testLookupUserID(self):
    """The single-email lookup returns the ID read via Select."""
    self.SetUpLookupUserIDs()  # Same as testLookupUserIDs()
    self.user_service.user_id_cache.CacheItem('a@example.com', 111)
    self.mox.ReplayAll()
    user_id = self.user_service.LookupUserID(self.cnxn, 'b@example.com')
    self.mox.VerifyAll()
    self.assertEqual(222, user_id)

  def SetUpGetUsersByIDs(self):
    """Record Selects for users 333 and 444 in which only 333 has a row."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=user_svc.USER_COLS, user_id=[333, 444]).AndReturn(
            [
                (
                    333, 'c@example.com', False, False, False, False, True,
                    False, 'Spammer', 'stay_same_issue', False, False, True, 0,
                    0, None)
            ])
    self.user_service.linkedaccount_tbl.Select(
        self.cnxn,
        cols=user_svc.LINKEDACCOUNT_COLS,
        parent_id=[333, 444],
        child_id=[333, 444],
        or_where_conds=True).AndReturn([])

  def testGetUsersByIDs(self):
    """A user with no row comes back equal to user_pb2.MakeUser(id)."""
    self.SetUpGetUsersByIDs()
    user_a = user_pb2.User(email='a@example.com')
    self.user_service.user_2lc.CacheItem(111, user_a)
    self.mox.ReplayAll()
    # 444 user does not exist.
    user_dict = self.user_service.GetUsersByIDs(self.cnxn, [111, 333, 444])
    self.mox.VerifyAll()
    self.assertEqual(3, len(user_dict))
    self.assertEqual('a@example.com', user_dict[111].email)
    self.assertFalse(user_dict[111].is_site_admin)
    self.assertFalse(user_dict[111].banned)
    self.assertTrue(user_dict[111].notify_issue_change)
    self.assertEqual('c@example.com', user_dict[333].email)
    self.assertEqual(user_dict[444], user_pb2.MakeUser(444))

  def testGetUsersByIDs_SkipMissed(self):
    """With skip_missed=True, a user with no row is left out of the result."""
    self.SetUpGetUsersByIDs()
    user_a = user_pb2.User(email='a@example.com')
    self.user_service.user_2lc.CacheItem(111, user_a)
    self.mox.ReplayAll()
    # 444 user does not exist
    user_dict = self.user_service.GetUsersByIDs(
        self.cnxn, [111, 333, 444], skip_missed=True)
    self.mox.VerifyAll()
    self.assertEqual(2, len(user_dict))
    self.assertEqual('a@example.com', user_dict[111].email)
    self.assertFalse(user_dict[111].is_site_admin)
    self.assertFalse(user_dict[111].banned)
    self.assertTrue(user_dict[111].notify_issue_change)
    self.assertEqual('c@example.com', user_dict[333].email)

  def testGetUser(self):
    """GetUser returns the user read from the mocked tables."""
    SetUpGetUsers(self.user_service, self.cnxn)
    user_a = user_pb2.User(email='a@example.com')
    self.user_service.user_2lc.CacheItem(111, user_a)
    self.mox.ReplayAll()
    user = self.user_service.GetUser(self.cnxn, 333)
    self.mox.VerifyAll()
    self.assertEqual('c@example.com', user.email)

  def SetUpUpdateUser(self):
    """Record the user_tbl.Update expected for user 111, with commit=False."""
    delta = {
        'keep_people_perms_open': False,
        'preview_on_hover': True,
        'notify_issue_change': True,
        'after_issue_update': 'STAY_SAME_ISSUE',
        'notify_starred_issue_change': True,
        'notify_starred_ping': False,
        'is_site_admin': False,
        'banned': 'Turned spammer',
        'obscure_email': True,
        'email_compact_subject': False,
        'email_view_widget': True,
        'last_visit_timestamp': 0,
        'email_bounce_timestamp': 0,
        'vacation_message': None,
    }
    self.user_service.user_tbl.Update(
        self.cnxn, delta, user_id=111, commit=False)

  def testUpdateUser(self):
    """UpdateUser writes the expected delta; 111 is not cached afterwards."""
    self.SetUpUpdateUser()
    user_a = user_pb2.User(
        email='a@example.com', banned='Turned spammer')
    self.mox.ReplayAll()
    self.user_service.UpdateUser(self.cnxn, 111, user_a)
    self.mox.VerifyAll()
    self.assertFalse(self.user_service.user_2lc.HasItem(111))

  def SetUpGetRecentlyVisitedHotlists(self):
    """Record the Select of user 111's ten most recently viewed hotlists."""
    self.user_service.hotlistvisithistory_tbl.Select(
        self.cnxn, cols=['hotlist_id'], user_id=[111],
        order_by=[('viewed DESC', [])], limit=10).AndReturn(
            ((123,), (234,)))

  def testGetRecentlyVisitedHotlists(self):
    """The single-column rows are flattened into a list of hotlist IDs."""
    self.SetUpGetRecentlyVisitedHotlists()
    self.mox.ReplayAll()
    recent_hotlist_rows = self.user_service.GetRecentlyVisitedHotlists(
        self.cnxn, 111)
    self.mox.VerifyAll()
    self.assertEqual(recent_hotlist_rows, [123, 234])

  def SetUpAddVisitedHotlist(self, ts):
    """Record the Delete and InsertRows expected when 111 visits hotlist 123.

    Args:
      ts: timestamp expected in the inserted visit row.
    """
    self.user_service.hotlistvisithistory_tbl.Delete(
        self.cnxn, hotlist_id=123, user_id=111, commit=False)
    self.user_service.hotlistvisithistory_tbl.InsertRows(
        self.cnxn, user_svc.HOTLISTVISITHISTORY_COLS,
        [(123, 111, ts)],
        commit=False)

  @mock.patch('time.time')
  def testAddVisitedHotlist(self, mock_time):
    """The visit row is written with the value returned by time.time()."""
    ts = 122333
    mock_time.return_value = ts
    self.SetUpAddVisitedHotlist(ts)
    self.mox.ReplayAll()
    self.user_service.AddVisitedHotlist(self.cnxn, 111, 123, commit=False)
    self.mox.VerifyAll()

  def testExpungeHotlistsFromHistory(self):
    """Visit history is deleted by hotlist ID in a single Delete call."""
    self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock()
    hotlist_ids = [123, 223]
    self.user_service.ExpungeHotlistsFromHistory(
        self.cnxn, hotlist_ids, commit=False)
    self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with(
        self.cnxn, hotlist_id=hotlist_ids, commit=False)

  def testExpungeUsersHotlistsHistory(self):
    """Visit history is deleted by user ID in a single Delete call."""
    self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock()
    user_ids = [111, 222]
    self.user_service.ExpungeUsersHotlistsHistory(
        self.cnxn, user_ids, commit=False)
    self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with(
        self.cnxn, user_id=user_ids, commit=False)

  def SetUpTrimUserVisitedHotlists(self, user_ids, ts):
    """Record the Selects and Deletes expected when trimming visit history.

    Args:
      user_ids: IDs of the users that the first Select reports as having
          more than 10 history rows.
      ts: timestamp used for the mocked 'viewed' values and expected as the
          cut-off in each Delete.
    """
    # Derive the returned rows from user_ids so they cannot drift away from
    # the per-user expectations recorded in the loop below.
    self.user_service.hotlistvisithistory_tbl.Select(
        self.cnxn, cols=['user_id'], group_by=['user_id'],
        having=[('COUNT(*) > %s', [10])], limit=1000).AndReturn(
            tuple((user_id,) for user_id in user_ids))
    for user_id in user_ids:
      # Eleven rows per user: ten at ts and one at ts+1.
      self.user_service.hotlistvisithistory_tbl.Select(
          self.cnxn, cols=['viewed'], user_id=user_id,
          order_by=[('viewed DESC', [])]).AndReturn([
              (ts,), (ts,), (ts,), (ts,), (ts,), (ts,),
              (ts,), (ts,), (ts,), (ts,), (ts+1,)])
      self.user_service.hotlistvisithistory_tbl.Delete(
          self.cnxn, user_id=user_id, where=[('viewed < %s', [ts])],
          commit=False)

  @mock.patch('time.time')
  def testTrimUserVisitedHotlists(self, mock_time):
    """Each over-limit user gets a Delete of rows older than the cut-off."""
    ts = 122333
    mock_time.return_value = ts
    self.SetUpTrimUserVisitedHotlists([111, 222, 333], ts)
    self.mox.ReplayAll()
    self.user_service.TrimUserVisitedHotlists(self.cnxn, commit=False)
    self.mox.VerifyAll()

  def testGetPendingLinkedInvites_Anon(self):
    """An Anon user never has invites to link accounts."""
    as_parent, as_child = self.user_service.GetPendingLinkedInvites(
        self.cnxn, 0)
    self.assertEqual([], as_parent)
    self.assertEqual([], as_child)

  def testGetPendingLinkedInvites_None(self):
    """A user who has no link invites gets empty lists."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = []
    as_parent, as_child = self.user_service.GetPendingLinkedInvites(
        self.cnxn, 111)
    self.assertEqual([], as_parent)
    self.assertEqual([], as_child)

  def testGetPendingLinkedInvites_Some(self):
    """A user who has link invites can get them."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = [
        (111, 222), (111, 333), (888, 999), (333, 111)]
    as_parent, as_child = self.user_service.GetPendingLinkedInvites(
        self.cnxn, 111)
    self.assertEqual([222, 333], as_parent)
    self.assertEqual([333], as_child)

  def testAssertNotAlreadyLinked_NotLinked(self):
    """No exception is raised when accounts are not already linked."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []
    self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 222)

  def testAssertNotAlreadyLinked_AlreadyLinked(self):
    """Reject attempt to link any account that is already linked."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = [
        (111, 222)]
    with self.assertRaises(exceptions.InputException):
      self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 333)

  def testInviteLinkedParent_Anon(self):
    """Anon cannot invite anyone to link accounts."""
    with self.assertRaises(exceptions.InputException):
      self.user_service.InviteLinkedParent(self.cnxn, 0, 0)
    with self.assertRaises(exceptions.InputException):
      self.user_service.InviteLinkedParent(self.cnxn, 111, 0)
    with self.assertRaises(exceptions.InputException):
      self.user_service.InviteLinkedParent(self.cnxn, 0, 111)

  def testInviteLinkedParent_Normal(self):
    """One account can invite another to link."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.InviteLinkedParent(
        self.cnxn, 111, 222)
    self.user_service.linkedaccountinvite_tbl.InsertRow.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)

  def testAcceptLinkedChild_Anon(self):
    """Reject attempts for anon to accept any invite."""
    with self.assertRaises(exceptions.InputException):
      self.user_service.AcceptLinkedChild(self.cnxn, 0, 333)
    with self.assertRaises(exceptions.InputException):
      self.user_service.AcceptLinkedChild(self.cnxn, 333, 0)

  def testAcceptLinkedChild_Missing(self):
    """Reject attempts to link without a matching invite."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = []
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []
    with self.assertRaises(exceptions.InputException) as cm:
      self.user_service.AcceptLinkedChild(self.cnxn, 111, 333)
    # Checked after the with-block: cm.exception is only set once it exits.
    self.assertEqual('No such invite', str(cm.exception))

  def testAcceptLinkedChild_Normal(self):
    """Create linkage between accounts and remove invite."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = [
        (111, 222), (333, 444)]
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []

    self.user_service.AcceptLinkedChild(self.cnxn, 111, 222)
    self.user_service.linkedaccount_tbl.InsertRow.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)
    self.user_service.linkedaccountinvite_tbl.Delete.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)

  def testUnlinkAccounts_MissingIDs(self):
    """Reject an attempt to unlink anon."""
    with self.assertRaises(exceptions.InputException):
      self.user_service.UnlinkAccounts(self.cnxn, 0, 0)
    with self.assertRaises(exceptions.InputException):
      self.user_service.UnlinkAccounts(self.cnxn, 0, 111)
    with self.assertRaises(exceptions.InputException):
      self.user_service.UnlinkAccounts(self.cnxn, 111, 0)

  def testUnlinkAccounts_Normal(self):
    """We can unlink accounts."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.UnlinkAccounts(self.cnxn, 111, 222)
    self.user_service.linkedaccount_tbl.Delete.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)

  def testUpdateUserSettings(self):
    """Banning via UpdateUserSettings writes the same delta as UpdateUser."""
    self.SetUpUpdateUser()
    user_a = user_pb2.User(email='a@example.com')
    self.mox.ReplayAll()
    self.user_service.UpdateUserSettings(
        self.cnxn, 111, user_a, is_banned=True,
        banned_reason='Turned spammer')
    self.mox.VerifyAll()

  def testGetUsersPrefs(self):
    """Pref rows are grouped per user; users without rows get empty prefs."""
    self.user_service.userprefs_tbl = mock.Mock()
    self.user_service.userprefs_tbl.Select.return_value = [
        (111, 'code_font', 'true'),
        (111, 'keep_perms_open', 'true'),
        # Note: user 222 has not set any prefs.
        (333, 'code_font', 'false')]

    prefs_dict = self.user_service.GetUsersPrefs(self.cnxn, [111, 222, 333])

    expected = {
        111: user_pb2.UserPrefs(
            user_id=111,
            prefs=[user_pb2.UserPrefValue(name='code_font', value='true'),
                   user_pb2.UserPrefValue(name='keep_perms_open', value='true')]),
        222: user_pb2.UserPrefs(user_id=222),
        333: user_pb2.UserPrefs(
            user_id=333,
            prefs=[user_pb2.UserPrefValue(name='code_font', value='false')]),
        }
    self.assertEqual(expected, prefs_dict)

  def testGetUserPrefs(self):
    """The single-user lookup returns only the prefs of the requested user."""
    self.user_service.userprefs_tbl = mock.Mock()
    self.user_service.userprefs_tbl.Select.return_value = [
        (111, 'code_font', 'true'),
        (111, 'keep_perms_open', 'true'),
        # Note: user 222 has not set any prefs.
        (333, 'code_font', 'false')]

    userprefs = self.user_service.GetUserPrefs(self.cnxn, 111)
    expected = user_pb2.UserPrefs(
        user_id=111,
        prefs=[user_pb2.UserPrefValue(name='code_font', value='true'),
               user_pb2.UserPrefValue(name='keep_perms_open', value='true')])
    self.assertEqual(expected, userprefs)

    userprefs = self.user_service.GetUserPrefs(self.cnxn, 222)
    expected = user_pb2.UserPrefs(user_id=222)
    self.assertEqual(expected, userprefs)

  def testSetUserPrefs(self):
    """Pref values are written as (user_id, name, value) rows, replace=True."""
    self.user_service.userprefs_tbl = mock.Mock()
    pref_values = [user_pb2.UserPrefValue(name='code_font', value='true'),
                   user_pb2.UserPrefValue(name='keep_perms_open', value='true')]
    self.user_service.SetUserPrefs(self.cnxn, 111, pref_values)
    self.user_service.userprefs_tbl.InsertRows.assert_called_once_with(
        self.cnxn, user_svc.USERPREFS_COLS,
        [(111, 'code_font', 'true'),
         (111, 'keep_perms_open', 'true')],
        replace=True)

  def testExpungeUsers(self):
    """ExpungeUsers deletes the users' rows from every user-related table."""
    self.user_service.linkedaccount_tbl.Delete = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Delete = mock.Mock()
    self.user_service.userprefs_tbl.Delete = mock.Mock()
    self.user_service.user_tbl.Delete = mock.Mock()

    user_ids = [222, 444]
    self.user_service.ExpungeUsers(self.cnxn, user_ids)

    # Use assert_has_calls: a bare `.has_calls(...)` on a Mock just creates a
    # child mock and verifies nothing. Only the presence of each call is
    # checked here, not the order in which the calls were made.
    linked_account_calls = [
        mock.call(self.cnxn, parent_id=user_ids, commit=False),
        mock.call(self.cnxn, child_id=user_ids, commit=False)]
    self.user_service.linkedaccount_tbl.Delete.assert_has_calls(
        linked_account_calls, any_order=True)
    self.user_service.linkedaccountinvite_tbl.Delete.assert_has_calls(
        linked_account_calls, any_order=True)
    user_calls = [mock.call(self.cnxn, user_id=user_ids, commit=False)]
    self.user_service.userprefs_tbl.Delete.assert_has_calls(
        user_calls, any_order=True)
    self.user_service.user_tbl.Delete.assert_has_calls(
        user_calls, any_order=True)

  def testTotalUsersCount(self):
    """TotalUsersCount reports one less than the COUNT(*) value."""
    # NOTE(review): presumably one row is a placeholder that is not counted
    # as a real user; confirm against user_svc.TotalUsersCount.
    self.user_service.user_tbl.SelectValue = mock.Mock(return_value=10)
    self.assertEqual(self.user_service.TotalUsersCount(self.cnxn), 9)
    self.user_service.user_tbl.SelectValue.assert_called_once_with(
        self.cnxn, col='COUNT(*)')

  def testGetAllUserEmailsBatch(self):
    """Without arguments, the batch Select uses limit=1000 and offset=0."""
    rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)]
    self.user_service.user_tbl.Select = mock.Mock(return_value=rows)
    emails = self.user_service.GetAllUserEmailsBatch(self.cnxn)
    self.user_service.user_tbl.Select.assert_called_once_with(
        self.cnxn, cols=['email'], limit=1000, offset=0,
        where=[('user_id != %s', [framework_constants.DELETED_USER_ID])],
        order_by=[('user_id ASC', [])])
    six.assertCountEqual(
        self, emails, ['cow@test.com', 'pig@test.com', 'fox@test.com'])

  def testGetAllUserEmailsBatch_CustomLimit(self):
    """Caller-supplied limit and offset are passed through to the Select."""
    rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)]
    self.user_service.user_tbl.Select = mock.Mock(return_value=rows)
    emails = self.user_service.GetAllUserEmailsBatch(
        self.cnxn, limit=30, offset=60)
    self.user_service.user_tbl.Select.assert_called_once_with(
        self.cnxn, cols=['email'], limit=30, offset=60,
        where=[('user_id != %s', [framework_constants.DELETED_USER_ID])],
        order_by=[('user_id ASC', [])])
    six.assertCountEqual(
        self, emails, ['cow@test.com', 'pig@test.com', 'fox@test.com'])