blob: 323d3eb42803ad48bb9a2ac2f5c756a698bcb79f [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
5
6"""Tests for the user service."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import unittest
12
13import mock
Adrià Vilanova Martínez9f9ade52022-10-10 23:20:11 +020014try:
15 from mox3 import mox
16except ImportError:
17 import mox
Copybara854996b2021-09-07 19:36:02 +000018import time
19
20from google.appengine.ext import testbed
21
22from framework import exceptions
23from framework import framework_constants
24from framework import sql
25from proto import user_pb2
26from services import user_svc
27from testing import fake
28
29
def SetUpGetUsers(user_service, cnxn):
  """Record the table reads expected when user 333 is fetched.

  Two expectations are recorded on the mocked tables, in this order:
    1. user_tbl.Select for user_id=[333], answering with one row for
       c@example.com whose row contains the string 'Spammer'.
    2. linkedaccount_tbl.Select for rows where 333 is parent or child,
       answering with no rows.

  Args:
    user_service: service whose user_tbl and linkedaccount_tbl are mocks that
        are still in record mode.
    cnxn: connection object the Select calls are expected to receive.
  """
  user_row = (
      333, 'c@example.com', False, False, False, False, True, False,
      'Spammer', 'stay_same_issue', False, False, True, 0, 0, None)
  user_select = user_service.user_tbl.Select(
      cnxn, cols=user_svc.USER_COLS, user_id=[333])
  user_select.AndReturn([user_row])

  link_select = user_service.linkedaccount_tbl.Select(
      cnxn, cols=user_svc.LINKEDACCOUNT_COLS, parent_id=[333],
      child_id=[333], or_where_conds=True)
  link_select.AndReturn([])
40
41
def MakeUserService(cache_manager, my_mox):
  """Build a UserService whose main SQL tables are mox mocks.

  Args:
    cache_manager: cache manager handed to the UserService constructor.
    my_mox: object whose CreateMock() produces the table mocks.

  Returns:
    The UserService, with user_tbl, hotlistvisithistory_tbl and
    linkedaccount_tbl each replaced by a separate SQLTableManager mock.
  """
  service = user_svc.UserService(cache_manager)
  mocked_tables = ('user_tbl', 'hotlistvisithistory_tbl', 'linkedaccount_tbl')
  for table_attr in mocked_tables:
    setattr(service, table_attr, my_mox.CreateMock(sql.SQLTableManager))
  # Account linking invites are done with patch().
  return service
49
50
class UserTwoLevelCacheTest(unittest.TestCase):
  """Covers the user_2lc cache object owned by a UserService.

  Every test gets its service from MakeUserService(), so the SQL table
  managers it reads from are mox mocks.
  """

  def setUp(self):
    """Start the memcache stub, then build the mocks and the service."""
    gae_testbed = testbed.Testbed()
    self.testbed = gae_testbed
    gae_testbed.activate()
    gae_testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = fake.MonorailConnection()
    self.cache_manager = fake.CacheManager()
    self.user_service = MakeUserService(self.cache_manager, self.mox)

  def tearDown(self):
    """Shut down the testbed that setUp() activated."""
    self.testbed.deactivate()

  def testDeserializeUsersByID(self):
    """Two user rows and no link rows give two unlinked users keyed by ID."""
    row_a = (
        111, 'a@example.com', False, False, False, False, True, False, '',
        'stay_same_issue', False, False, True, 0, 0, None)
    row_b = (
        222, 'b@example.com', False, False, False, False, True, False, '',
        'next_in_list', False, False, True, 0, 0, None)
    no_link_rows = []

    users_by_id = self.user_service.user_2lc._DeserializeUsersByID(
        [row_a, row_b], no_link_rows)

    self.assertEqual(2, len(users_by_id))
    self.assertEqual('a@example.com', users_by_id[111].email)
    self.assertFalse(users_by_id[111].is_site_admin)
    self.assertEqual('', users_by_id[111].banned)
    self.assertFalse(users_by_id[111].notify_issue_change)
    self.assertEqual('b@example.com', users_by_id[222].email)
    # With no link rows, neither user has a parent or any children.
    self.assertIsNone(users_by_id[111].linked_parent_id)
    self.assertEqual([], users_by_id[111].linked_child_ids)
    self.assertIsNone(users_by_id[222].linked_parent_id)
    self.assertEqual([], users_by_id[222].linked_child_ids)

  def testDeserializeUsersByID_LinkedAccounts(self):
    """Link rows mentioning user 111 fill in its parent and child IDs."""
    only_user_row = (
        111, 'a@example.com', False, False, False, False, True, False, '',
        'stay_same_issue', False, False, True, 0, 0, None)
    # The expectations below read each pair as (parent_id, child_id).
    link_rows = [(111, 222), (111, 333), (444, 111)]

    users_by_id = self.user_service.user_2lc._DeserializeUsersByID(
        [only_user_row], link_rows)

    self.assertEqual(1, len(users_by_id))
    linked_user = users_by_id[111]
    self.assertEqual('a@example.com', linked_user.email)
    self.assertEqual(444, linked_user.linked_parent_id)
    self.assertEqual([222, 333], linked_user.linked_child_ids)

  def testFetchItems(self):
    """FetchItems() for [333] returns the user built from the mocked rows."""
    SetUpGetUsers(self.user_service, self.cnxn)
    self.mox.ReplayAll()
    fetched = self.user_service.user_2lc.FetchItems(self.cnxn, [333])
    self.mox.VerifyAll()

    self.assertEqual([333], list(fetched.keys()))
    self.assertEqual('c@example.com', fetched[333].email)
    self.assertFalse(fetched[333].is_site_admin)
    self.assertEqual('Spammer', fetched[333].banned)
110
111
class UserServiceTest(unittest.TestCase):
  """Tests for UserService methods, run against mocked SQL table managers.

  setUp() builds the service through MakeUserService(), so user_tbl,
  hotlistvisithistory_tbl and linkedaccount_tbl start out as mox mocks.
  Tests that use mox record their expectations (usually in a SetUp* helper),
  call ReplayAll(), run the method under test and finish with VerifyAll().
  Other tests replace a table, or one of its methods, with mock.Mock() and
  assert on the recorded calls instead.
  """

  def setUp(self):
    """Activate the memcache stub and build a UserService with mock tables."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = fake.MonorailConnection()
    self.cache_manager = fake.CacheManager()
    self.user_service = MakeUserService(self.cache_manager, self.mox)

  def tearDown(self):
    """Deactivate the testbed and clear mox state left by the test."""
    self.testbed.deactivate()
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def SetUpCreateUsers(self):
    """Expect one InsertRows call on user_tbl with two obscured users."""
    self.user_service.user_tbl.InsertRows(
        self.cnxn,
        ['user_id', 'email', 'obscure_email'],
        [(3035911623, 'a@example.com', True),
         (2996997680, 'b@example.com', True)]
    ).AndReturn(None)

  def testCreateUsers(self):
    """_CreateUsers() inserts one row per email address."""
    self.SetUpCreateUsers()
    self.mox.ReplayAll()
    self.user_service._CreateUsers(
        self.cnxn, ['a@example.com', 'b@example.com'])
    self.mox.VerifyAll()

  def SetUpLookupUserEmails(self):
    """Expect a user_tbl read for user 222 that finds b@example.com."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn(
            [(222, 'b@example.com')])

  def testLookupUserEmails(self):
    """A cached email and a table-backed email are returned together."""
    self.SetUpLookupUserEmails()
    # 111 is pre-cached; the only expected Select is for 222.
    self.user_service.email_cache.CacheItem(
        111, 'a@example.com')
    self.mox.ReplayAll()
    emails_dict = self.user_service.LookupUserEmails(
        self.cnxn, [111, 222])
    self.mox.VerifyAll()
    self.assertEqual(
        {111: 'a@example.com', 222: 'b@example.com'},
        emails_dict)

  def SetUpLookupUserEmails_Missed(self):
    """Expect a user_tbl read for 222 that finds nothing; pre-cache 111."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn([])
    self.user_service.email_cache.CacheItem(
        111, 'a@example.com')

  def testLookupUserEmails_Missed(self):
    """A user ID with no row raises NoSuchUserException by default."""
    self.SetUpLookupUserEmails_Missed()
    self.mox.ReplayAll()
    with self.assertRaises(exceptions.NoSuchUserException):
      self.user_service.LookupUserEmails(self.cnxn, [111, 222])
    self.mox.VerifyAll()

  def testLookUpUserEmails_IgnoreMissed(self):
    """With ignore_missed=True, a missing user is left out of the result."""
    self.SetUpLookupUserEmails_Missed()
    self.mox.ReplayAll()
    emails_dict = self.user_service.LookupUserEmails(
        self.cnxn, [111, 222], ignore_missed=True)
    self.mox.VerifyAll()
    self.assertEqual({111: 'a@example.com'}, emails_dict)

  def testLookupUserEmail(self):
    """LookupUserEmail() returns the single address for one user ID."""
    self.SetUpLookupUserEmails()  # Same as testLookupUserEmails()
    self.mox.ReplayAll()
    email_addr = self.user_service.LookupUserEmail(self.cnxn, 222)
    self.mox.VerifyAll()
    self.assertEqual('b@example.com', email_addr)

  def SetUpLookupUserIDs(self):
    """Expect a user_tbl read by email for b@example.com that finds 222."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['email', 'user_id'],
        email=['b@example.com']).AndReturn([('b@example.com', 222)])

  def testLookupUserIDs(self):
    """A cached ID and a table-backed ID are returned together."""
    self.SetUpLookupUserIDs()
    # a@example.com is pre-cached; the only expected Select is for b@.
    self.user_service.user_id_cache.CacheItem(
        'a@example.com', 111)
    self.mox.ReplayAll()
    user_id_dict = self.user_service.LookupUserIDs(
        self.cnxn, ['a@example.com', 'b@example.com'])
    self.mox.VerifyAll()
    self.assertEqual(
        {'a@example.com': 111, 'b@example.com': 222},
        user_id_dict)

  def testLookupUserIDs_InvalidEmail(self):
    """'abc' yields no ID even with autocreate=True.

    No InsertRows expectation is recorded, so the strict mox table mock would
    fail the test if the service tried to create a row for 'abc'.
    """
    self.user_service.user_tbl.Select(
        self.cnxn, cols=['email', 'user_id'], email=['abc']).AndReturn([])
    self.mox.ReplayAll()
    user_id_dict = self.user_service.LookupUserIDs(
        self.cnxn, ['abc'], autocreate=True)
    self.mox.VerifyAll()
    self.assertEqual({}, user_id_dict)

  def testLookupUserIDs_NoUserValue(self):
    """NO_VALUES and the empty string are not sent to the table query."""
    self.user_service.user_tbl.Select = mock.Mock(
        return_value=[('b@example.com', 222)])
    user_id_dict = self.user_service.LookupUserIDs(
        self.cnxn, [framework_constants.NO_VALUES, '', 'b@example.com'])
    self.assertEqual({'b@example.com': 222}, user_id_dict)
    self.user_service.user_tbl.Select.assert_called_once_with(
        self.cnxn, cols=['email', 'user_id'], email=['b@example.com'])

  def testLookupUserID(self):
    """LookupUserID() returns the single ID for one email address."""
    self.SetUpLookupUserIDs()  # Same as testLookupUserIDs()
    self.user_service.user_id_cache.CacheItem('a@example.com', 111)
    self.mox.ReplayAll()
    user_id = self.user_service.LookupUserID(self.cnxn, 'b@example.com')
    self.mox.VerifyAll()
    self.assertEqual(222, user_id)

  def SetUpGetUsersByIDs(self):
    """Expect reads for users 333 and 444 where only 333 has a user row."""
    self.user_service.user_tbl.Select(
        self.cnxn, cols=user_svc.USER_COLS, user_id=[333, 444]).AndReturn(
            [
                (
                    333, 'c@example.com', False, False, False, False, True,
                    False, 'Spammer', 'stay_same_issue', False, False, True, 0,
                    0, None)
            ])
    self.user_service.linkedaccount_tbl.Select(
        self.cnxn,
        cols=user_svc.LINKEDACCOUNT_COLS,
        parent_id=[333, 444],
        child_id=[333, 444],
        or_where_conds=True).AndReturn([])

  def testGetUsersByIDs(self):
    """Cached, stored and missing users all appear in the result."""
    self.SetUpGetUsersByIDs()
    user_a = user_pb2.User(email='a@example.com')
    self.user_service.user_2lc.CacheItem(111, user_a)
    self.mox.ReplayAll()
    # 444 user does not exist.
    user_dict = self.user_service.GetUsersByIDs(self.cnxn, [111, 333, 444])
    self.mox.VerifyAll()
    self.assertEqual(3, len(user_dict))
    self.assertEqual('a@example.com', user_dict[111].email)
    self.assertFalse(user_dict[111].is_site_admin)
    self.assertFalse(user_dict[111].banned)
    self.assertTrue(user_dict[111].notify_issue_change)
    self.assertEqual('c@example.com', user_dict[333].email)
    # The missing user comes back as what user_pb2.MakeUser(444) builds.
    self.assertEqual(user_dict[444], user_pb2.MakeUser(444))

  def testGetUsersByIDs_SkipMissed(self):
    """With skip_missed=True, a user with no row is left out."""
    self.SetUpGetUsersByIDs()
    user_a = user_pb2.User(email='a@example.com')
    self.user_service.user_2lc.CacheItem(111, user_a)
    self.mox.ReplayAll()
    # 444 user does not exist
    user_dict = self.user_service.GetUsersByIDs(
        self.cnxn, [111, 333, 444], skip_missed=True)
    self.mox.VerifyAll()
    self.assertEqual(2, len(user_dict))
    self.assertEqual('a@example.com', user_dict[111].email)
    self.assertFalse(user_dict[111].is_site_admin)
    self.assertFalse(user_dict[111].banned)
    self.assertTrue(user_dict[111].notify_issue_change)
    self.assertEqual('c@example.com', user_dict[333].email)

  def testGetUser(self):
    """GetUser() returns the user built from the mocked table rows."""
    SetUpGetUsers(self.user_service, self.cnxn)
    # 111 is cached but not requested; 333 has to come from the table mocks.
    user_a = user_pb2.User(email='a@example.com')
    self.user_service.user_2lc.CacheItem(111, user_a)
    self.mox.ReplayAll()
    user = self.user_service.GetUser(self.cnxn, 333)
    self.mox.VerifyAll()
    self.assertEqual('c@example.com', user.email)

  def SetUpUpdateUser(self):
    """Expect one uncommitted user_tbl.Update for user 111.

    The expected delta marks the user as banned with reason
    'Turned spammer'; the remaining values presumably mirror the defaults of
    a freshly built User PB (confirm against user_pb2).
    """
    delta = {
        'keep_people_perms_open': False,
        'preview_on_hover': True,
        'notify_issue_change': True,
        'after_issue_update': 'STAY_SAME_ISSUE',
        'notify_starred_issue_change': True,
        'notify_starred_ping': False,
        'is_site_admin': False,
        'banned': 'Turned spammer',
        'obscure_email': True,
        'email_compact_subject': False,
        'email_view_widget': True,
        'last_visit_timestamp': 0,
        'email_bounce_timestamp': 0,
        'vacation_message': None,
    }
    self.user_service.user_tbl.Update(
        self.cnxn, delta, user_id=111, commit=False)

  def testUpdateUser(self):
    """UpdateUser() writes the delta and leaves user 111 out of user_2lc."""
    self.SetUpUpdateUser()
    user_a = user_pb2.User(
        email='a@example.com', banned='Turned spammer')
    self.mox.ReplayAll()
    self.user_service.UpdateUser(self.cnxn, 111, user_a)
    self.mox.VerifyAll()
    self.assertFalse(self.user_service.user_2lc.HasItem(111))

  def SetUpGetRecentlyVisitedHotlists(self):
    """Expect a read of user 111's ten latest hotlist visits."""
    self.user_service.hotlistvisithistory_tbl.Select(
        self.cnxn, cols=['hotlist_id'], user_id=[111],
        order_by=[('viewed DESC', [])], limit=10).AndReturn(
            ((123,), (234,)))

  def testGetRecentlyVisitedHotlists(self):
    """The one-column rows are flattened into a list of hotlist IDs."""
    self.SetUpGetRecentlyVisitedHotlists()
    self.mox.ReplayAll()
    recent_hotlist_rows = self.user_service.GetRecentlyVisitedHotlists(
        self.cnxn, 111)
    self.mox.VerifyAll()
    self.assertEqual(recent_hotlist_rows, [123, 234])

  def SetUpAddVisitedHotlist(self, ts):
    """Expect the old visit row to be deleted and a new one inserted.

    Args:
      ts: timestamp expected in the inserted (123, 111, ts) row.
    """
    self.user_service.hotlistvisithistory_tbl.Delete(
        self.cnxn, hotlist_id=123, user_id=111, commit=False)
    self.user_service.hotlistvisithistory_tbl.InsertRows(
        self.cnxn, user_svc.HOTLISTVISITHISTORY_COLS,
        [(123, 111, ts)],
        commit=False)

  @mock.patch('time.time')
  def testAddVisitedHotlist(self, mockTime):
    """AddVisitedHotlist() replaces the visit row using the current time."""
    ts = 122333
    mockTime.return_value = ts
    self.SetUpAddVisitedHotlist(ts)
    self.mox.ReplayAll()
    self.user_service.AddVisitedHotlist(self.cnxn, 111, 123, commit=False)
    self.mox.VerifyAll()

  def testExpungeHotlistsFromHistory(self):
    """Visit history is deleted by hotlist ID in a single call."""
    self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock()
    hotlist_ids = [123, 223]
    self.user_service.ExpungeHotlistsFromHistory(
        self.cnxn, hotlist_ids, commit=False)
    self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with(
        self.cnxn, hotlist_id=hotlist_ids, commit=False)

  def testExpungeUsersHotlistsHistory(self):
    """Visit history is deleted by user ID in a single call."""
    self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock()
    user_ids = [111, 222]
    self.user_service.ExpungeUsersHotlistsHistory(
        self.cnxn, user_ids, commit=False)
    self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with(
        self.cnxn, user_id=user_ids, commit=False)

  def SetUpTrimUserVisitedHotlists(self, user_ids, ts):
    """Expect the reads and deletes made while trimming visit history.

    The first expected Select always answers with users 111, 222 and 333.
    For each ID in user_ids, a Select of that user's 'viewed' values answers
    with eleven rows, followed by an uncommitted Delete of the rows whose
    'viewed' value is below ts.

    Args:
      user_ids: user IDs to record the per-user Select/Delete pair for.
      ts: timestamp used in the returned rows and in the Delete condition.
    """
    self.user_service.hotlistvisithistory_tbl.Select(
        self.cnxn, cols=['user_id'], group_by=['user_id'],
        having=[('COUNT(*) > %s', [10])], limit=1000).AndReturn((
            (111,), (222,), (333,)))
    for user_id in user_ids:
      self.user_service.hotlistvisithistory_tbl.Select(
          self.cnxn, cols=['viewed'], user_id=user_id,
          order_by=[('viewed DESC', [])]).AndReturn([
              (ts,), (ts,), (ts,), (ts,), (ts,), (ts,),
              (ts,), (ts,), (ts,), (ts,), (ts+1,)])
      self.user_service.hotlistvisithistory_tbl.Delete(
          self.cnxn, user_id=user_id, where=[('viewed < %s', [ts])],
          commit=False)

  @mock.patch('time.time')
  def testTrimUserVisitedHotlists(self, mockTime):
    """TrimUserVisitedHotlists() trims each user returned by the first read."""
    ts = 122333
    mockTime.return_value = ts
    self.SetUpTrimUserVisitedHotlists([111, 222, 333], ts)
    self.mox.ReplayAll()
    self.user_service.TrimUserVisitedHotlists(self.cnxn, commit=False)
    self.mox.VerifyAll()

  def testGetPendingLinkedInvites_Anon(self):
    """An Anon user never has invites to link accounts."""
    as_parent, as_child = self.user_service.GetPendingLinkedInvites(
        self.cnxn, 0)
    self.assertEqual([], as_parent)
    self.assertEqual([], as_child)

  def testGetPendingLinkedInvites_None(self):
    """A user who has no link invites gets empty lists."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = []
    as_parent, as_child = self.user_service.GetPendingLinkedInvites(
        self.cnxn, 111)
    self.assertEqual([], as_parent)
    self.assertEqual([], as_child)

  def testGetPendingLinkedInvites_Some(self):
    """A user who has link invites can get them."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    # (888, 999) does not involve user 111 and must not show up below.
    self.user_service.linkedaccountinvite_tbl.Select.return_value = [
        (111, 222), (111, 333), (888, 999), (333, 111)]
    as_parent, as_child = self.user_service.GetPendingLinkedInvites(
        self.cnxn, 111)
    self.assertEqual([222, 333], as_parent)
    self.assertEqual([333], as_child)

  def testAssertNotAlreadyLinked_NotLinked(self):
    """No exception is raised when accounts are not already linked."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []
    self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 222)

  def testAssertNotAlreadyLinked_AlreadyLinked(self):
    """Reject attempt to link any account that is already linked."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = [
        (111, 222)]
    with self.assertRaises(exceptions.InputException):
      self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 333)

  def testInviteLinkedParent_Anon(self):
    """Anon cannot invite anyone to link accounts."""
    with self.assertRaises(exceptions.InputException):
      self.user_service.InviteLinkedParent(self.cnxn, 0, 0)
    with self.assertRaises(exceptions.InputException):
      self.user_service.InviteLinkedParent(self.cnxn, 111, 0)
    with self.assertRaises(exceptions.InputException):
      self.user_service.InviteLinkedParent(self.cnxn, 0, 111)

  def testInviteLinkedParent_Normal(self):
    """One account can invite another to link."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.InviteLinkedParent(
        self.cnxn, 111, 222)
    self.user_service.linkedaccountinvite_tbl.InsertRow.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)

  def testAcceptLinkedChild_Anon(self):
    """Reject attempts for anon to accept any invite."""
    with self.assertRaises(exceptions.InputException):
      self.user_service.AcceptLinkedChild(self.cnxn, 0, 333)
    with self.assertRaises(exceptions.InputException):
      self.user_service.AcceptLinkedChild(self.cnxn, 333, 0)

  def testAcceptLinkedChild_Missing(self):
    """Reject attempts to link without a matching invite."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = []
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []
    with self.assertRaises(exceptions.InputException) as cm:
      self.user_service.AcceptLinkedChild(self.cnxn, 111, 333)
    # str() instead of .message: Python 3 exceptions have no .message
    # attribute, and str() gives the same text on Python 2.
    self.assertEqual('No such invite', str(cm.exception))

  def testAcceptLinkedChild_Normal(self):
    """Create linkage between accounts and remove invite."""
    self.user_service.linkedaccountinvite_tbl = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Select.return_value = [
        (111, 222), (333, 444)]
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.linkedaccount_tbl.Select.return_value = []

    self.user_service.AcceptLinkedChild(self.cnxn, 111, 222)
    self.user_service.linkedaccount_tbl.InsertRow.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)
    self.user_service.linkedaccountinvite_tbl.Delete.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)

  def testUnlinkAccounts_MissingIDs(self):
    """Reject an attempt to unlink anon."""
    with self.assertRaises(exceptions.InputException):
      self.user_service.UnlinkAccounts(self.cnxn, 0, 0)
    with self.assertRaises(exceptions.InputException):
      self.user_service.UnlinkAccounts(self.cnxn, 0, 111)
    with self.assertRaises(exceptions.InputException):
      self.user_service.UnlinkAccounts(self.cnxn, 111, 0)

  def testUnlinkAccounts_Normal(self):
    """We can unlink accounts."""
    self.user_service.linkedaccount_tbl = mock.Mock()
    self.user_service.UnlinkAccounts(self.cnxn, 111, 222)
    self.user_service.linkedaccount_tbl.Delete.assert_called_once_with(
        self.cnxn, parent_id=111, child_id=222)

  def testUpdateUserSettings(self):
    """Banning through UpdateUserSettings() writes the same delta."""
    self.SetUpUpdateUser()
    user_a = user_pb2.User(email='a@example.com')
    self.mox.ReplayAll()
    self.user_service.UpdateUserSettings(
        self.cnxn, 111, user_a, is_banned=True,
        banned_reason='Turned spammer')
    self.mox.VerifyAll()

  def testGetUsersPrefs(self):
    """Pref rows are grouped per user; users without rows get empty prefs."""
    self.user_service.userprefs_tbl = mock.Mock()
    self.user_service.userprefs_tbl.Select.return_value = [
        (111, 'code_font', 'true'),
        (111, 'keep_perms_open', 'true'),
        # Note: user 222 has not set any prefs.
        (333, 'code_font', 'false')]

    prefs_dict = self.user_service.GetUsersPrefs(self.cnxn, [111, 222, 333])

    expected = {
        111: user_pb2.UserPrefs(
            user_id=111,
            prefs=[
                user_pb2.UserPrefValue(name='code_font', value='true'),
                user_pb2.UserPrefValue(name='keep_perms_open', value='true')]),
        222: user_pb2.UserPrefs(user_id=222),
        333: user_pb2.UserPrefs(
            user_id=333,
            prefs=[user_pb2.UserPrefValue(name='code_font', value='false')]),
    }
    self.assertEqual(expected, prefs_dict)

  def testGetUserPrefs(self):
    """GetUserPrefs() returns one user's prefs, or empty prefs if none set."""
    self.user_service.userprefs_tbl = mock.Mock()
    self.user_service.userprefs_tbl.Select.return_value = [
        (111, 'code_font', 'true'),
        (111, 'keep_perms_open', 'true'),
        # Note: user 222 has not set any prefs.
        (333, 'code_font', 'false')]

    userprefs = self.user_service.GetUserPrefs(self.cnxn, 111)
    expected = user_pb2.UserPrefs(
        user_id=111,
        prefs=[user_pb2.UserPrefValue(name='code_font', value='true'),
               user_pb2.UserPrefValue(name='keep_perms_open', value='true')])
    self.assertEqual(expected, userprefs)

    userprefs = self.user_service.GetUserPrefs(self.cnxn, 222)
    expected = user_pb2.UserPrefs(user_id=222)
    self.assertEqual(expected, userprefs)

  def testSetUserPrefs(self):
    """SetUserPrefs() writes one replacing row per pref value."""
    self.user_service.userprefs_tbl = mock.Mock()
    pref_values = [
        user_pb2.UserPrefValue(name='code_font', value='true'),
        user_pb2.UserPrefValue(name='keep_perms_open', value='true')]
    self.user_service.SetUserPrefs(self.cnxn, 111, pref_values)
    self.user_service.userprefs_tbl.InsertRows.assert_called_once_with(
        self.cnxn, user_svc.USERPREFS_COLS,
        [(111, 'code_font', 'true'),
         (111, 'keep_perms_open', 'true')],
        replace=True)

  def testExpungeUsers(self):
    """ExpungeUsers() deletes the users' rows from all four tables."""
    self.user_service.linkedaccount_tbl.Delete = mock.Mock()
    self.user_service.linkedaccountinvite_tbl.Delete = mock.Mock()
    self.user_service.userprefs_tbl.Delete = mock.Mock()
    self.user_service.user_tbl.Delete = mock.Mock()

    user_ids = [222, 444]
    self.user_service.ExpungeUsers(self.cnxn, user_ids)

    # assert_has_calls, not has_calls: reading an unknown attribute on a Mock
    # just creates a child mock, so has_calls(...) never checked anything.
    linked_account_calls = [
        mock.call(self.cnxn, parent_id=user_ids, commit=False),
        mock.call(self.cnxn, child_id=user_ids, commit=False)]
    self.user_service.linkedaccount_tbl.Delete.assert_has_calls(
        linked_account_calls)
    self.user_service.linkedaccountinvite_tbl.Delete.assert_has_calls(
        linked_account_calls)
    user_calls = [mock.call(self.cnxn, user_id=user_ids, commit=False)]
    self.user_service.userprefs_tbl.Delete.assert_has_calls(user_calls)
    self.user_service.user_tbl.Delete.assert_has_calls(user_calls)

  def testTotalUsersCount(self):
    """TotalUsersCount() reports one less than the COUNT(*) of user_tbl.

    NOTE(review): presumably one reserved row is excluded from the total;
    confirm against user_svc.TotalUsersCount.
    """
    self.user_service.user_tbl.SelectValue = mock.Mock(return_value=10)
    self.assertEqual(self.user_service.TotalUsersCount(self.cnxn), 9)
    self.user_service.user_tbl.SelectValue.assert_called_once_with(
        self.cnxn, col='COUNT(*)')

  def testGetAllUserEmailsBatch(self):
    """The default batch reads 1000 emails from offset 0, ordered by ID."""
    rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)]
    self.user_service.user_tbl.Select = mock.Mock(return_value=rows)
    emails = self.user_service.GetAllUserEmailsBatch(self.cnxn)
    self.user_service.user_tbl.Select.assert_called_once_with(
        self.cnxn, cols=['email'], limit=1000, offset=0,
        where=[('user_id != %s', [framework_constants.DELETED_USER_ID])],
        order_by=[('user_id ASC', [])])
    # Order-insensitive check that runs on Python 2 and 3; assertItemsEqual
    # exists only on Python 2.
    self.assertEqual(
        sorted(emails),
        sorted(['cow@test.com', 'pig@test.com', 'fox@test.com']))

  def testGetAllUserEmailsBatch_CustomLimit(self):
    """Caller-supplied limit and offset are passed through to the query."""
    rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)]
    self.user_service.user_tbl.Select = mock.Mock(return_value=rows)
    emails = self.user_service.GetAllUserEmailsBatch(
        self.cnxn, limit=30, offset=60)
    self.user_service.user_tbl.Select.assert_called_once_with(
        self.cnxn, cols=['email'], limit=30, offset=60,
        where=[('user_id != %s', [framework_constants.DELETED_USER_ID])],
        order_by=[('user_id ASC', [])])
    # Order-insensitive check that runs on Python 2 and 3; assertItemsEqual
    # exists only on Python 2.
    self.assertEqual(
        sorted(emails),
        sorted(['cow@test.com', 'pig@test.com', 'fox@test.com']))