blob: 4a8eb169fccdcf7f28fcf923ef164db7488658c2 [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""Tests for the user service."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import unittest
12
13import mock
14import mox
15import time
16
17from google.appengine.ext import testbed
18
19from framework import exceptions
20from framework import framework_constants
21from framework import sql
22from proto import user_pb2
23from services import user_svc
24from testing import fake
25
26
def SetUpGetUsers(user_service, cnxn):
  """Record the SQL calls expected when user 333 is fetched.

  Args:
    user_service: UserService whose user_tbl and linkedaccount_tbl are mox
        mocks in record mode.
    cnxn: connection object that the Select calls are expected to receive.
  """
  user_row = (
      333, 'c@example.com', False, False, False, False, True,
      False, 'Spammer',
      'stay_same_issue', False, False, True, 0, 0, None)
  user_service.user_tbl.Select(
      cnxn, cols=user_svc.USER_COLS, user_id=[333]).AndReturn([user_row])

  # The linked-account lookup for the same user comes back empty.
  user_service.linkedaccount_tbl.Select(
      cnxn,
      cols=user_svc.LINKEDACCOUNT_COLS,
      parent_id=[333],
      child_id=[333],
      or_where_conds=True).AndReturn([])
37
38
def MakeUserService(cache_manager, my_mox):
  """Build a UserService whose core SQL table managers are mox mocks.

  Args:
    cache_manager: cache manager handed to the UserService constructor.
    my_mox: mox.Mox instance used to create the table mocks.

  Returns:
    The UserService with user_tbl, hotlistvisithistory_tbl and
    linkedaccount_tbl replaced by mocks of sql.SQLTableManager.
  """
  mocked_tables = (
      'user_tbl',
      'hotlistvisithistory_tbl',
      'linkedaccount_tbl',
  )
  service = user_svc.UserService(cache_manager)
  for table_name in mocked_tables:
    setattr(service, table_name, my_mox.CreateMock(sql.SQLTableManager))
  # Account linking invites are done with patch().
  return service
46
47
class UserTwoLevelCacheTest(unittest.TestCase):
  """Tests for the user two-level cache (user_service.user_2lc)."""

  def setUp(self):
    """Activate the App Engine testbed and build a UserService with mocks."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = fake.MonorailConnection()
    self.cache_manager = fake.CacheManager()
    self.user_service = MakeUserService(self.cache_manager, self.mox)

  def tearDown(self):
    """Deactivate the testbed and reset mox, as UserServiceTest does."""
    self.testbed.deactivate()
    # Without these, recorded expectations and stubs from a failing test
    # could linger on the Mox instance.
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def testDeserializeUsersByID(self):
    """Two user rows with no linked accounts become two User PBs."""
    user_rows = [
        (111, 'a@example.com', False, False, False, False, True, False, '',
         'stay_same_issue', False, False, True, 0, 0, None),
        (222, 'b@example.com', False, False, False, False, True, False, '',
         'next_in_list', False, False, True, 0, 0, None),
        ]
    linkedaccount_rows = []
    user_dict = self.user_service.user_2lc._DeserializeUsersByID(
        user_rows, linkedaccount_rows)
    self.assertEqual(2, len(user_dict))
    self.assertEqual('a@example.com', user_dict[111].email)
    self.assertFalse(user_dict[111].is_site_admin)
    self.assertEqual('', user_dict[111].banned)
    self.assertFalse(user_dict[111].notify_issue_change)
    self.assertEqual('b@example.com', user_dict[222].email)
    self.assertIsNone(user_dict[111].linked_parent_id)
    self.assertEqual([], user_dict[111].linked_child_ids)
    self.assertIsNone(user_dict[222].linked_parent_id)
    self.assertEqual([], user_dict[222].linked_child_ids)

  def testDeserializeUsersByID_LinkedAccounts(self):
    """Linked-account rows populate the parent ID and child IDs."""
    user_rows = [
        (111, 'a@example.com', False, False, False, False, True, False, '',
         'stay_same_issue', False, False, True, 0, 0, None),
        ]
    # User 111 appears as the first element for 222 and 333 and as the
    # second element for 444; the assertions below show how that maps.
    linkedaccount_rows = [(111, 222), (111, 333), (444, 111)]
    user_dict = self.user_service.user_2lc._DeserializeUsersByID(
        user_rows, linkedaccount_rows)
    self.assertEqual(1, len(user_dict))
    user_pb = user_dict[111]
    self.assertEqual('a@example.com', user_pb.email)
    self.assertEqual(444, user_pb.linked_parent_id)
    self.assertEqual([222, 333], user_pb.linked_child_ids)

  def testFetchItems(self):
    """FetchItems() reads user 333 through the mocked SQL tables."""
    SetUpGetUsers(self.user_service, self.cnxn)
    self.mox.ReplayAll()
    user_dict = self.user_service.user_2lc.FetchItems(self.cnxn, [333])
    self.mox.VerifyAll()
    self.assertEqual([333], list(user_dict.keys()))
    self.assertEqual('c@example.com', user_dict[333].email)
    self.assertFalse(user_dict[333].is_site_admin)
    self.assertEqual('Spammer', user_dict[333].banned)
107
108
109class UserServiceTest(unittest.TestCase):
110
111 def setUp(self):
112 self.testbed = testbed.Testbed()
113 self.testbed.activate()
114 self.testbed.init_memcache_stub()
115
116 self.mox = mox.Mox()
117 self.cnxn = fake.MonorailConnection()
118 self.cache_manager = fake.CacheManager()
119 self.user_service = MakeUserService(self.cache_manager, self.mox)
120
121 def tearDown(self):
122 self.testbed.deactivate()
123 self.mox.UnsetStubs()
124 self.mox.ResetAll()
125
126 def SetUpCreateUsers(self):
127 self.user_service.user_tbl.InsertRows(
128 self.cnxn,
129 ['user_id', 'email', 'obscure_email'],
130 [(3035911623, 'a@example.com', True),
131 (2996997680, 'b@example.com', True)]
132 ).AndReturn(None)
133
134 def testCreateUsers(self):
135 self.SetUpCreateUsers()
136 self.mox.ReplayAll()
137 self.user_service._CreateUsers(
138 self.cnxn, ['a@example.com', 'b@example.com'])
139 self.mox.VerifyAll()
140
141 def SetUpLookupUserEmails(self):
142 self.user_service.user_tbl.Select(
143 self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn(
144 [(222, 'b@example.com')])
145
146 def testLookupUserEmails(self):
147 self.SetUpLookupUserEmails()
148 self.user_service.email_cache.CacheItem(
149 111, 'a@example.com')
150 self.mox.ReplayAll()
151 emails_dict = self.user_service.LookupUserEmails(
152 self.cnxn, [111, 222])
153 self.mox.VerifyAll()
154 self.assertEqual(
155 {111: 'a@example.com', 222: 'b@example.com'},
156 emails_dict)
157
158 def SetUpLookupUserEmails_Missed(self):
159 self.user_service.user_tbl.Select(
160 self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn([])
161 self.user_service.email_cache.CacheItem(
162 111, 'a@example.com')
163
164 def testLookupUserEmails_Missed(self):
165 self.SetUpLookupUserEmails_Missed()
166 self.mox.ReplayAll()
167 with self.assertRaises(exceptions.NoSuchUserException):
168 self.user_service.LookupUserEmails(self.cnxn, [111, 222])
169 self.mox.VerifyAll()
170
171 def testLookUpUserEmails_IgnoreMissed(self):
172 self.SetUpLookupUserEmails_Missed()
173 self.mox.ReplayAll()
174 emails_dict = self.user_service.LookupUserEmails(
175 self.cnxn, [111, 222], ignore_missed=True)
176 self.mox.VerifyAll()
177 self.assertEqual({111: 'a@example.com'}, emails_dict)
178
179 def testLookupUserEmail(self):
180 self.SetUpLookupUserEmails() # Same as testLookupUserEmails()
181 self.mox.ReplayAll()
182 email_addr = self.user_service.LookupUserEmail(self.cnxn, 222)
183 self.mox.VerifyAll()
184 self.assertEqual('b@example.com', email_addr)
185
186 def SetUpLookupUserIDs(self):
187 self.user_service.user_tbl.Select(
188 self.cnxn, cols=['email', 'user_id'],
189 email=['b@example.com']).AndReturn([('b@example.com', 222)])
190
191 def testLookupUserIDs(self):
192 self.SetUpLookupUserIDs()
193 self.user_service.user_id_cache.CacheItem(
194 'a@example.com', 111)
195 self.mox.ReplayAll()
196 user_id_dict = self.user_service.LookupUserIDs(
197 self.cnxn, ['a@example.com', 'b@example.com'])
198 self.mox.VerifyAll()
199 self.assertEqual(
200 {'a@example.com': 111, 'b@example.com': 222},
201 user_id_dict)
202
203 def testLookupUserIDs_InvalidEmail(self):
204 self.user_service.user_tbl.Select(
205 self.cnxn, cols=['email', 'user_id'], email=['abc']).AndReturn([])
206 self.mox.ReplayAll()
207 user_id_dict = self.user_service.LookupUserIDs(
208 self.cnxn, ['abc'], autocreate=True)
209 self.mox.VerifyAll()
210 self.assertEqual({}, user_id_dict)
211
212 def testLookupUserIDs_NoUserValue(self):
213 self.user_service.user_tbl.Select = mock.Mock(
214 return_value=[('b@example.com', 222)])
215 user_id_dict = self.user_service.LookupUserIDs(
216 self.cnxn, [framework_constants.NO_VALUES, '', 'b@example.com'])
217 self.assertEqual({'b@example.com': 222}, user_id_dict)
218 self.user_service.user_tbl.Select.assert_called_once_with(
219 self.cnxn, cols=['email', 'user_id'], email=['b@example.com'])
220
221 def testLookupUserID(self):
222 self.SetUpLookupUserIDs() # Same as testLookupUserIDs()
223 self.user_service.user_id_cache.CacheItem('a@example.com', 111)
224 self.mox.ReplayAll()
225 user_id = self.user_service.LookupUserID(self.cnxn, 'b@example.com')
226 self.mox.VerifyAll()
227 self.assertEqual(222, user_id)
228
229 def SetUpGetUsersByIDs(self):
230 self.user_service.user_tbl.Select(
231 self.cnxn, cols=user_svc.USER_COLS, user_id=[333, 444]).AndReturn(
232 [
233 (
234 333, 'c@example.com', False, False, False, False, True,
235 False, 'Spammer', 'stay_same_issue', False, False, True, 0,
236 0, None)
237 ])
238 self.user_service.linkedaccount_tbl.Select(
239 self.cnxn,
240 cols=user_svc.LINKEDACCOUNT_COLS,
241 parent_id=[333, 444],
242 child_id=[333, 444],
243 or_where_conds=True).AndReturn([])
244
245
246 def testGetUsersByIDs(self):
247 self.SetUpGetUsersByIDs()
248 user_a = user_pb2.User(email='a@example.com')
249 self.user_service.user_2lc.CacheItem(111, user_a)
250 self.mox.ReplayAll()
251 # 444 user does not exist.
252 user_dict = self.user_service.GetUsersByIDs(self.cnxn, [111, 333, 444])
253 self.mox.VerifyAll()
254 self.assertEqual(3, len(user_dict))
255 self.assertEqual('a@example.com', user_dict[111].email)
256 self.assertFalse(user_dict[111].is_site_admin)
257 self.assertFalse(user_dict[111].banned)
258 self.assertTrue(user_dict[111].notify_issue_change)
259 self.assertEqual('c@example.com', user_dict[333].email)
260 self.assertEqual(user_dict[444], user_pb2.MakeUser(444))
261
262 def testGetUsersByIDs_SkipMissed(self):
263 self.SetUpGetUsersByIDs()
264 user_a = user_pb2.User(email='a@example.com')
265 self.user_service.user_2lc.CacheItem(111, user_a)
266 self.mox.ReplayAll()
267 # 444 user does not exist
268 user_dict = self.user_service.GetUsersByIDs(
269 self.cnxn, [111, 333, 444], skip_missed=True)
270 self.mox.VerifyAll()
271 self.assertEqual(2, len(user_dict))
272 self.assertEqual('a@example.com', user_dict[111].email)
273 self.assertFalse(user_dict[111].is_site_admin)
274 self.assertFalse(user_dict[111].banned)
275 self.assertTrue(user_dict[111].notify_issue_change)
276 self.assertEqual('c@example.com', user_dict[333].email)
277
278 def testGetUser(self):
279 SetUpGetUsers(self.user_service, self.cnxn)
280 user_a = user_pb2.User(email='a@example.com')
281 self.user_service.user_2lc.CacheItem(111, user_a)
282 self.mox.ReplayAll()
283 user = self.user_service.GetUser(self.cnxn, 333)
284 self.mox.VerifyAll()
285 self.assertEqual('c@example.com', user.email)
286
287 def SetUpUpdateUser(self):
288 delta = {
289 'keep_people_perms_open': False,
290 'preview_on_hover': True,
291 'notify_issue_change': True,
292 'after_issue_update': 'STAY_SAME_ISSUE',
293 'notify_starred_issue_change': True,
294 'notify_starred_ping': False,
295 'is_site_admin': False,
296 'banned': 'Turned spammer',
297 'obscure_email': True,
298 'email_compact_subject': False,
299 'email_view_widget': True,
300 'last_visit_timestamp': 0,
301 'email_bounce_timestamp': 0,
302 'vacation_message': None,
303 }
304 self.user_service.user_tbl.Update(
305 self.cnxn, delta, user_id=111, commit=False)
306
307 def testUpdateUser(self):
308 self.SetUpUpdateUser()
309 user_a = user_pb2.User(
310 email='a@example.com', banned='Turned spammer')
311 self.mox.ReplayAll()
312 self.user_service.UpdateUser(self.cnxn, 111, user_a)
313 self.mox.VerifyAll()
314 self.assertFalse(self.user_service.user_2lc.HasItem(111))
315
316 def SetUpGetRecentlyVisitedHotlists(self):
317 self.user_service.hotlistvisithistory_tbl.Select(
318 self.cnxn, cols=['hotlist_id'], user_id=[111],
319 order_by=[('viewed DESC', [])], limit=10).AndReturn(
320 ((123,), (234,)))
321
322 def testGetRecentlyVisitedHotlists(self):
323 self.SetUpGetRecentlyVisitedHotlists()
324 self.mox.ReplayAll()
325 recent_hotlist_rows = self.user_service.GetRecentlyVisitedHotlists(
326 self.cnxn, 111)
327 self.mox.VerifyAll()
328 self.assertEqual(recent_hotlist_rows, [123, 234])
329
330 def SetUpAddVisitedHotlist(self, ts):
331 self.user_service.hotlistvisithistory_tbl.Delete(
332 self.cnxn, hotlist_id=123, user_id=111, commit=False)
333 self.user_service.hotlistvisithistory_tbl.InsertRows(
334 self.cnxn, user_svc.HOTLISTVISITHISTORY_COLS,
335 [(123, 111, ts)],
336 commit=False)
337
338 @mock.patch('time.time')
339 def testAddVisitedHotlist(self, mockTime):
340 ts = 122333
341 mockTime.return_value = ts
342 self.SetUpAddVisitedHotlist(ts)
343 self.mox.ReplayAll()
344 self.user_service.AddVisitedHotlist(self.cnxn, 111, 123, commit=False)
345 self.mox.VerifyAll()
346
347 def testExpungeHotlistsFromHistory(self):
348 self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock()
349 hotlist_ids = [123, 223]
350 self.user_service.ExpungeHotlistsFromHistory(
351 self.cnxn, hotlist_ids, commit=False)
352 self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with(
353 self.cnxn, hotlist_id=hotlist_ids, commit=False)
354
355 def testExpungeUsersHotlistsHistory(self):
356 self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock()
357 user_ids = [111, 222]
358 self.user_service.ExpungeUsersHotlistsHistory(
359 self.cnxn, user_ids, commit=False)
360 self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with(
361 self.cnxn, user_id=user_ids, commit=False)
362
363 def SetUpTrimUserVisitedHotlists(self, user_ids, ts):
364 self.user_service.hotlistvisithistory_tbl.Select(
365 self.cnxn, cols=['user_id'], group_by=['user_id'],
366 having=[('COUNT(*) > %s', [10])], limit=1000).AndReturn((
367 (111,), (222,), (333,)))
368 for user_id in user_ids:
369 self.user_service.hotlistvisithistory_tbl.Select(
370 self.cnxn, cols=['viewed'], user_id=user_id,
371 order_by=[('viewed DESC', [])]).AndReturn([
372 (ts,), (ts,), (ts,), (ts,), (ts,), (ts,),
373 (ts,), (ts,), (ts,), (ts,), (ts+1,)])
374 self.user_service.hotlistvisithistory_tbl.Delete(
375 self.cnxn, user_id=user_id, where=[('viewed < %s', [ts])],
376 commit=False)
377
378 @mock.patch('time.time')
379 def testTrimUserVisitedHotlists(self, mockTime):
380 ts = 122333
381 mockTime.return_value = ts
382 self.SetUpTrimUserVisitedHotlists([111, 222, 333], ts)
383 self.mox.ReplayAll()
384 self.user_service.TrimUserVisitedHotlists(self.cnxn, commit=False)
385 self.mox.VerifyAll()
386
387 def testGetPendingLinkedInvites_Anon(self):
388 """An Anon user never has invites to link accounts."""
389 as_parent, as_child = self.user_service.GetPendingLinkedInvites(
390 self.cnxn, 0)
391 self.assertEqual([], as_parent)
392 self.assertEqual([], as_child)
393
394 def testGetPendingLinkedInvites_None(self):
395 """A user who has no link invites gets empty lists."""
396 self.user_service.linkedaccountinvite_tbl = mock.Mock()
397 self.user_service.linkedaccountinvite_tbl.Select.return_value = []
398 as_parent, as_child = self.user_service.GetPendingLinkedInvites(
399 self.cnxn, 111)
400 self.assertEqual([], as_parent)
401 self.assertEqual([], as_child)
402
403 def testGetPendingLinkedInvites_Some(self):
404 """A user who has link invites can get them."""
405 self.user_service.linkedaccountinvite_tbl = mock.Mock()
406 self.user_service.linkedaccountinvite_tbl.Select.return_value = [
407 (111, 222), (111, 333), (888, 999), (333, 111)]
408 as_parent, as_child = self.user_service.GetPendingLinkedInvites(
409 self.cnxn, 111)
410 self.assertEqual([222, 333], as_parent)
411 self.assertEqual([333], as_child)
412
413 def testAssertNotAlreadyLinked_NotLinked(self):
414 """No exception is raised when accounts are not already linked."""
415 self.user_service.linkedaccount_tbl = mock.Mock()
416 self.user_service.linkedaccount_tbl.Select.return_value = []
417 self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 222)
418
419 def testAssertNotAlreadyLinked_AlreadyLinked(self):
420 """Reject attempt to link any account that is already linked."""
421 self.user_service.linkedaccount_tbl = mock.Mock()
422 self.user_service.linkedaccount_tbl.Select.return_value = [
423 (111, 222)]
424 with self.assertRaises(exceptions.InputException):
425 self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 333)
426
427 def testInviteLinkedParent_Anon(self):
428 """Anon cannot invite anyone to link accounts."""
429 with self.assertRaises(exceptions.InputException):
430 self.user_service.InviteLinkedParent(self.cnxn, 0, 0)
431 with self.assertRaises(exceptions.InputException):
432 self.user_service.InviteLinkedParent(self.cnxn, 111, 0)
433 with self.assertRaises(exceptions.InputException):
434 self.user_service.InviteLinkedParent(self.cnxn, 0, 111)
435
436 def testInviteLinkedParent_Normal(self):
437 """One account can invite another to link."""
438 self.user_service.linkedaccount_tbl = mock.Mock()
439 self.user_service.linkedaccount_tbl.Select.return_value = []
440 self.user_service.linkedaccountinvite_tbl = mock.Mock()
441 self.user_service.InviteLinkedParent(
442 self.cnxn, 111, 222)
443 self.user_service.linkedaccountinvite_tbl.InsertRow.assert_called_once_with(
444 self.cnxn, parent_id=111, child_id=222)
445
446 def testAcceptLinkedChild_Anon(self):
447 """Reject attempts for anon to accept any invite."""
448 with self.assertRaises(exceptions.InputException):
449 self.user_service.AcceptLinkedChild(self.cnxn, 0, 333)
450 with self.assertRaises(exceptions.InputException):
451 self.user_service.AcceptLinkedChild(self.cnxn, 333, 0)
452
453 def testAcceptLinkedChild_Missing(self):
454 """Reject attempts to link without a matching invite."""
455 self.user_service.linkedaccountinvite_tbl = mock.Mock()
456 self.user_service.linkedaccountinvite_tbl.Select.return_value = []
457 self.user_service.linkedaccount_tbl = mock.Mock()
458 self.user_service.linkedaccount_tbl.Select.return_value = []
459 with self.assertRaises(exceptions.InputException) as cm:
460 self.user_service.AcceptLinkedChild(self.cnxn, 111, 333)
461 self.assertEqual('No such invite', cm.exception.message)
462
463 def testAcceptLinkedChild_Normal(self):
464 """Create linkage between accounts and remove invite."""
465 self.user_service.linkedaccountinvite_tbl = mock.Mock()
466 self.user_service.linkedaccountinvite_tbl.Select.return_value = [
467 (111, 222), (333, 444)]
468 self.user_service.linkedaccount_tbl = mock.Mock()
469 self.user_service.linkedaccount_tbl.Select.return_value = []
470
471 self.user_service.AcceptLinkedChild(self.cnxn, 111, 222)
472 self.user_service.linkedaccount_tbl.InsertRow.assert_called_once_with(
473 self.cnxn, parent_id=111, child_id=222)
474 self.user_service.linkedaccountinvite_tbl.Delete.assert_called_once_with(
475 self.cnxn, parent_id=111, child_id=222)
476
477 def testUnlinkAccounts_MissingIDs(self):
478 """Reject an attempt to unlink anon."""
479 with self.assertRaises(exceptions.InputException):
480 self.user_service.UnlinkAccounts(self.cnxn, 0, 0)
481 with self.assertRaises(exceptions.InputException):
482 self.user_service.UnlinkAccounts(self.cnxn, 0, 111)
483 with self.assertRaises(exceptions.InputException):
484 self.user_service.UnlinkAccounts(self.cnxn, 111, 0)
485
486 def testUnlinkAccounts_Normal(self):
487 """We can unlink accounts."""
488 self.user_service.linkedaccount_tbl = mock.Mock()
489 self.user_service.UnlinkAccounts(self.cnxn, 111, 222)
490 self.user_service.linkedaccount_tbl.Delete.assert_called_once_with(
491 self.cnxn, parent_id=111, child_id=222)
492
493 def testUpdateUserSettings(self):
494 self.SetUpUpdateUser()
495 user_a = user_pb2.User(email='a@example.com')
496 self.mox.ReplayAll()
497 self.user_service.UpdateUserSettings(
498 self.cnxn, 111, user_a, is_banned=True,
499 banned_reason='Turned spammer')
500 self.mox.VerifyAll()
501
502 def testGetUsersPrefs(self):
503 self.user_service.userprefs_tbl = mock.Mock()
504 self.user_service.userprefs_tbl.Select.return_value = [
505 (111, 'code_font', 'true'),
506 (111, 'keep_perms_open', 'true'),
507 # Note: user 222 has not set any prefs.
508 (333, 'code_font', 'false')]
509
510 prefs_dict = self.user_service.GetUsersPrefs(self.cnxn, [111, 222, 333])
511
512 expected = {
513 111: user_pb2.UserPrefs(
514 user_id=111,
515 prefs=[user_pb2.UserPrefValue(name='code_font', value='true'),
516 user_pb2.UserPrefValue(name='keep_perms_open', value='true')]),
517 222: user_pb2.UserPrefs(user_id=222),
518 333: user_pb2.UserPrefs(
519 user_id=333,
520 prefs=[user_pb2.UserPrefValue(name='code_font', value='false')]),
521 }
522 self.assertEqual(expected, prefs_dict)
523
524 def testGetUserPrefs(self):
525 self.user_service.userprefs_tbl = mock.Mock()
526 self.user_service.userprefs_tbl.Select.return_value = [
527 (111, 'code_font', 'true'),
528 (111, 'keep_perms_open', 'true'),
529 # Note: user 222 has not set any prefs.
530 (333, 'code_font', 'false')]
531
532 userprefs = self.user_service.GetUserPrefs(self.cnxn, 111)
533 expected = user_pb2.UserPrefs(
534 user_id=111,
535 prefs=[user_pb2.UserPrefValue(name='code_font', value='true'),
536 user_pb2.UserPrefValue(name='keep_perms_open', value='true')])
537 self.assertEqual(expected, userprefs)
538
539 userprefs = self.user_service.GetUserPrefs(self.cnxn, 222)
540 expected = user_pb2.UserPrefs(user_id=222)
541 self.assertEqual(expected, userprefs)
542
543 def testSetUserPrefs(self):
544 self.user_service.userprefs_tbl = mock.Mock()
545 pref_values = [user_pb2.UserPrefValue(name='code_font', value='true'),
546 user_pb2.UserPrefValue(name='keep_perms_open', value='true')]
547 self.user_service.SetUserPrefs(self.cnxn, 111, pref_values)
548 self.user_service.userprefs_tbl.InsertRows.assert_called_once_with(
549 self.cnxn, user_svc.USERPREFS_COLS,
550 [(111, 'code_font', 'true'),
551 (111, 'keep_perms_open', 'true')],
552 replace=True)
553
554 def testExpungeUsers(self):
555 self.user_service.linkedaccount_tbl.Delete = mock.Mock()
556 self.user_service.linkedaccountinvite_tbl.Delete = mock.Mock()
557 self.user_service.userprefs_tbl.Delete = mock.Mock()
558 self.user_service.user_tbl.Delete = mock.Mock()
559
560 user_ids = [222, 444]
561 self.user_service.ExpungeUsers(self.cnxn, user_ids)
562
563 linked_account_calls = [
564 mock.call(self.cnxn, parent_id=user_ids, commit=False),
565 mock.call(self.cnxn, child_id=user_ids, commit=False)]
566 self.user_service.linkedaccount_tbl.Delete.has_calls(linked_account_calls)
567 self.user_service.linkedaccountinvite_tbl.Delete.has_calls(
568 linked_account_calls)
569 user_calls = [mock.call(self.cnxn, user_id=user_ids, commit=False)]
570 self.user_service.userprefs_tbl.Delete.has_calls(user_calls)
571 self.user_service.user_tbl.Delete.has_calls(user_calls)
572
573 def testTotalUsersCount(self):
574 self.user_service.user_tbl.SelectValue = mock.Mock(return_value=10)
575 self.assertEqual(self.user_service.TotalUsersCount(self.cnxn), 9)
576 self.user_service.user_tbl.SelectValue.assert_called_once_with(
577 self.cnxn, col='COUNT(*)')
578
579 def testGetAllUserEmailsBatch(self):
580 rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)]
581 self.user_service.user_tbl.Select = mock.Mock(return_value=rows)
582 emails = self.user_service.GetAllUserEmailsBatch(self.cnxn)
583 self.user_service.user_tbl.Select.assert_called_once_with(
584 self.cnxn, cols=['email'], limit=1000, offset=0,
585 where=[('user_id != %s', [framework_constants.DELETED_USER_ID])],
586 order_by=[('user_id ASC', [])])
587 self.assertItemsEqual(
588 emails, ['cow@test.com', 'pig@test.com', 'fox@test.com'])
589
590 def testGetAllUserEmailsBatch_CustomLimit(self):
591 rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)]
592 self.user_service.user_tbl.Select = mock.Mock(return_value=rows)
593 emails = self.user_service.GetAllUserEmailsBatch(
594 self.cnxn, limit=30, offset=60)
595 self.user_service.user_tbl.Select.assert_called_once_with(
596 self.cnxn, cols=['email'], limit=30, offset=60,
597 where=[('user_id != %s', [framework_constants.DELETED_USER_ID])],
598 order_by=[('user_id ASC', [])])
599 self.assertItemsEqual(
600 emails, ['cow@test.com', 'pig@test.com', 'fox@test.com'])