Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 1 | # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style |
| 3 | # license that can be found in the LICENSE file or at |
| 4 | # https://developers.google.com/open-source/licenses/bsd |
| 5 | |
| 6 | """Tests for the user service.""" |
| 7 | from __future__ import print_function |
| 8 | from __future__ import division |
| 9 | from __future__ import absolute_import |
| 10 | |
| 11 | import unittest |
| 12 | |
| 13 | import mock |
Adrià Vilanova MartÃnez | 9f9ade5 | 2022-10-10 23:20:11 +0200 | [diff] [blame^] | 14 | try: |
| 15 | from mox3 import mox |
| 16 | except ImportError: |
| 17 | import mox |
Copybara | 854996b | 2021-09-07 19:36:02 +0000 | [diff] [blame] | 18 | import time |
| 19 | |
| 20 | from google.appengine.ext import testbed |
| 21 | |
| 22 | from framework import exceptions |
| 23 | from framework import framework_constants |
| 24 | from framework import sql |
| 25 | from proto import user_pb2 |
| 26 | from services import user_svc |
| 27 | from testing import fake |
| 28 | |
| 29 | |
| 30 | def SetUpGetUsers(user_service, cnxn): |
| 31 | """Set up expected calls to SQL tables.""" |
| 32 | user_service.user_tbl.Select( |
| 33 | cnxn, cols=user_svc.USER_COLS, user_id=[333]).AndReturn( |
| 34 | [(333, 'c@example.com', False, False, False, False, True, |
| 35 | False, 'Spammer', |
| 36 | 'stay_same_issue', False, False, True, 0, 0, None)]) |
| 37 | user_service.linkedaccount_tbl.Select( |
| 38 | cnxn, cols=user_svc.LINKEDACCOUNT_COLS, parent_id=[333], child_id=[333], |
| 39 | or_where_conds=True).AndReturn([]) |
| 40 | |
| 41 | |
| 42 | def MakeUserService(cache_manager, my_mox): |
| 43 | user_service = user_svc.UserService(cache_manager) |
| 44 | user_service.user_tbl = my_mox.CreateMock(sql.SQLTableManager) |
| 45 | user_service.hotlistvisithistory_tbl = my_mox.CreateMock(sql.SQLTableManager) |
| 46 | user_service.linkedaccount_tbl = my_mox.CreateMock(sql.SQLTableManager) |
| 47 | # Account linking invites are done with patch(). |
| 48 | return user_service |
| 49 | |
| 50 | |
| 51 | class UserTwoLevelCacheTest(unittest.TestCase): |
| 52 | |
| 53 | def setUp(self): |
| 54 | self.testbed = testbed.Testbed() |
| 55 | self.testbed.activate() |
| 56 | self.testbed.init_memcache_stub() |
| 57 | |
| 58 | self.mox = mox.Mox() |
| 59 | self.cnxn = fake.MonorailConnection() |
| 60 | self.cache_manager = fake.CacheManager() |
| 61 | self.user_service = MakeUserService(self.cache_manager, self.mox) |
| 62 | |
| 63 | def tearDown(self): |
| 64 | self.testbed.deactivate() |
| 65 | |
| 66 | def testDeserializeUsersByID(self): |
| 67 | user_rows = [ |
| 68 | (111, 'a@example.com', False, False, False, False, True, False, '', |
| 69 | 'stay_same_issue', False, False, True, 0, 0, None), |
| 70 | (222, 'b@example.com', False, False, False, False, True, False, '', |
| 71 | 'next_in_list', False, False, True, 0, 0, None), |
| 72 | ] |
| 73 | linkedaccount_rows = [] |
| 74 | user_dict = self.user_service.user_2lc._DeserializeUsersByID( |
| 75 | user_rows, linkedaccount_rows) |
| 76 | self.assertEqual(2, len(user_dict)) |
| 77 | self.assertEqual('a@example.com', user_dict[111].email) |
| 78 | self.assertFalse(user_dict[111].is_site_admin) |
| 79 | self.assertEqual('', user_dict[111].banned) |
| 80 | self.assertFalse(user_dict[111].notify_issue_change) |
| 81 | self.assertEqual('b@example.com', user_dict[222].email) |
| 82 | self.assertIsNone(user_dict[111].linked_parent_id) |
| 83 | self.assertEqual([], user_dict[111].linked_child_ids) |
| 84 | self.assertIsNone(user_dict[222].linked_parent_id) |
| 85 | self.assertEqual([], user_dict[222].linked_child_ids) |
| 86 | |
| 87 | def testDeserializeUsersByID_LinkedAccounts(self): |
| 88 | user_rows = [ |
| 89 | (111, 'a@example.com', False, False, False, False, True, False, '', |
| 90 | 'stay_same_issue', False, False, True, 0, 0, None), |
| 91 | ] |
| 92 | linkedaccount_rows = [(111, 222), (111, 333), (444, 111)] |
| 93 | user_dict = self.user_service.user_2lc._DeserializeUsersByID( |
| 94 | user_rows, linkedaccount_rows) |
| 95 | self.assertEqual(1, len(user_dict)) |
| 96 | user_pb = user_dict[111] |
| 97 | self.assertEqual('a@example.com', user_pb.email) |
| 98 | self.assertEqual(444, user_pb.linked_parent_id) |
| 99 | self.assertEqual([222, 333], user_pb.linked_child_ids) |
| 100 | |
| 101 | def testFetchItems(self): |
| 102 | SetUpGetUsers(self.user_service, self.cnxn) |
| 103 | self.mox.ReplayAll() |
| 104 | user_dict = self.user_service.user_2lc.FetchItems(self.cnxn, [333]) |
| 105 | self.mox.VerifyAll() |
| 106 | self.assertEqual([333], list(user_dict.keys())) |
| 107 | self.assertEqual('c@example.com', user_dict[333].email) |
| 108 | self.assertFalse(user_dict[333].is_site_admin) |
| 109 | self.assertEqual('Spammer', user_dict[333].banned) |
| 110 | |
| 111 | |
| 112 | class UserServiceTest(unittest.TestCase): |
| 113 | |
| 114 | def setUp(self): |
| 115 | self.testbed = testbed.Testbed() |
| 116 | self.testbed.activate() |
| 117 | self.testbed.init_memcache_stub() |
| 118 | |
| 119 | self.mox = mox.Mox() |
| 120 | self.cnxn = fake.MonorailConnection() |
| 121 | self.cache_manager = fake.CacheManager() |
| 122 | self.user_service = MakeUserService(self.cache_manager, self.mox) |
| 123 | |
| 124 | def tearDown(self): |
| 125 | self.testbed.deactivate() |
| 126 | self.mox.UnsetStubs() |
| 127 | self.mox.ResetAll() |
| 128 | |
| 129 | def SetUpCreateUsers(self): |
| 130 | self.user_service.user_tbl.InsertRows( |
| 131 | self.cnxn, |
| 132 | ['user_id', 'email', 'obscure_email'], |
| 133 | [(3035911623, 'a@example.com', True), |
| 134 | (2996997680, 'b@example.com', True)] |
| 135 | ).AndReturn(None) |
| 136 | |
| 137 | def testCreateUsers(self): |
| 138 | self.SetUpCreateUsers() |
| 139 | self.mox.ReplayAll() |
| 140 | self.user_service._CreateUsers( |
| 141 | self.cnxn, ['a@example.com', 'b@example.com']) |
| 142 | self.mox.VerifyAll() |
| 143 | |
| 144 | def SetUpLookupUserEmails(self): |
| 145 | self.user_service.user_tbl.Select( |
| 146 | self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn( |
| 147 | [(222, 'b@example.com')]) |
| 148 | |
| 149 | def testLookupUserEmails(self): |
| 150 | self.SetUpLookupUserEmails() |
| 151 | self.user_service.email_cache.CacheItem( |
| 152 | 111, 'a@example.com') |
| 153 | self.mox.ReplayAll() |
| 154 | emails_dict = self.user_service.LookupUserEmails( |
| 155 | self.cnxn, [111, 222]) |
| 156 | self.mox.VerifyAll() |
| 157 | self.assertEqual( |
| 158 | {111: 'a@example.com', 222: 'b@example.com'}, |
| 159 | emails_dict) |
| 160 | |
| 161 | def SetUpLookupUserEmails_Missed(self): |
| 162 | self.user_service.user_tbl.Select( |
| 163 | self.cnxn, cols=['user_id', 'email'], user_id=[222]).AndReturn([]) |
| 164 | self.user_service.email_cache.CacheItem( |
| 165 | 111, 'a@example.com') |
| 166 | |
| 167 | def testLookupUserEmails_Missed(self): |
| 168 | self.SetUpLookupUserEmails_Missed() |
| 169 | self.mox.ReplayAll() |
| 170 | with self.assertRaises(exceptions.NoSuchUserException): |
| 171 | self.user_service.LookupUserEmails(self.cnxn, [111, 222]) |
| 172 | self.mox.VerifyAll() |
| 173 | |
| 174 | def testLookUpUserEmails_IgnoreMissed(self): |
| 175 | self.SetUpLookupUserEmails_Missed() |
| 176 | self.mox.ReplayAll() |
| 177 | emails_dict = self.user_service.LookupUserEmails( |
| 178 | self.cnxn, [111, 222], ignore_missed=True) |
| 179 | self.mox.VerifyAll() |
| 180 | self.assertEqual({111: 'a@example.com'}, emails_dict) |
| 181 | |
| 182 | def testLookupUserEmail(self): |
| 183 | self.SetUpLookupUserEmails() # Same as testLookupUserEmails() |
| 184 | self.mox.ReplayAll() |
| 185 | email_addr = self.user_service.LookupUserEmail(self.cnxn, 222) |
| 186 | self.mox.VerifyAll() |
| 187 | self.assertEqual('b@example.com', email_addr) |
| 188 | |
| 189 | def SetUpLookupUserIDs(self): |
| 190 | self.user_service.user_tbl.Select( |
| 191 | self.cnxn, cols=['email', 'user_id'], |
| 192 | email=['b@example.com']).AndReturn([('b@example.com', 222)]) |
| 193 | |
| 194 | def testLookupUserIDs(self): |
| 195 | self.SetUpLookupUserIDs() |
| 196 | self.user_service.user_id_cache.CacheItem( |
| 197 | 'a@example.com', 111) |
| 198 | self.mox.ReplayAll() |
| 199 | user_id_dict = self.user_service.LookupUserIDs( |
| 200 | self.cnxn, ['a@example.com', 'b@example.com']) |
| 201 | self.mox.VerifyAll() |
| 202 | self.assertEqual( |
| 203 | {'a@example.com': 111, 'b@example.com': 222}, |
| 204 | user_id_dict) |
| 205 | |
| 206 | def testLookupUserIDs_InvalidEmail(self): |
| 207 | self.user_service.user_tbl.Select( |
| 208 | self.cnxn, cols=['email', 'user_id'], email=['abc']).AndReturn([]) |
| 209 | self.mox.ReplayAll() |
| 210 | user_id_dict = self.user_service.LookupUserIDs( |
| 211 | self.cnxn, ['abc'], autocreate=True) |
| 212 | self.mox.VerifyAll() |
| 213 | self.assertEqual({}, user_id_dict) |
| 214 | |
| 215 | def testLookupUserIDs_NoUserValue(self): |
| 216 | self.user_service.user_tbl.Select = mock.Mock( |
| 217 | return_value=[('b@example.com', 222)]) |
| 218 | user_id_dict = self.user_service.LookupUserIDs( |
| 219 | self.cnxn, [framework_constants.NO_VALUES, '', 'b@example.com']) |
| 220 | self.assertEqual({'b@example.com': 222}, user_id_dict) |
| 221 | self.user_service.user_tbl.Select.assert_called_once_with( |
| 222 | self.cnxn, cols=['email', 'user_id'], email=['b@example.com']) |
| 223 | |
| 224 | def testLookupUserID(self): |
| 225 | self.SetUpLookupUserIDs() # Same as testLookupUserIDs() |
| 226 | self.user_service.user_id_cache.CacheItem('a@example.com', 111) |
| 227 | self.mox.ReplayAll() |
| 228 | user_id = self.user_service.LookupUserID(self.cnxn, 'b@example.com') |
| 229 | self.mox.VerifyAll() |
| 230 | self.assertEqual(222, user_id) |
| 231 | |
| 232 | def SetUpGetUsersByIDs(self): |
| 233 | self.user_service.user_tbl.Select( |
| 234 | self.cnxn, cols=user_svc.USER_COLS, user_id=[333, 444]).AndReturn( |
| 235 | [ |
| 236 | ( |
| 237 | 333, 'c@example.com', False, False, False, False, True, |
| 238 | False, 'Spammer', 'stay_same_issue', False, False, True, 0, |
| 239 | 0, None) |
| 240 | ]) |
| 241 | self.user_service.linkedaccount_tbl.Select( |
| 242 | self.cnxn, |
| 243 | cols=user_svc.LINKEDACCOUNT_COLS, |
| 244 | parent_id=[333, 444], |
| 245 | child_id=[333, 444], |
| 246 | or_where_conds=True).AndReturn([]) |
| 247 | |
| 248 | |
| 249 | def testGetUsersByIDs(self): |
| 250 | self.SetUpGetUsersByIDs() |
| 251 | user_a = user_pb2.User(email='a@example.com') |
| 252 | self.user_service.user_2lc.CacheItem(111, user_a) |
| 253 | self.mox.ReplayAll() |
| 254 | # 444 user does not exist. |
| 255 | user_dict = self.user_service.GetUsersByIDs(self.cnxn, [111, 333, 444]) |
| 256 | self.mox.VerifyAll() |
| 257 | self.assertEqual(3, len(user_dict)) |
| 258 | self.assertEqual('a@example.com', user_dict[111].email) |
| 259 | self.assertFalse(user_dict[111].is_site_admin) |
| 260 | self.assertFalse(user_dict[111].banned) |
| 261 | self.assertTrue(user_dict[111].notify_issue_change) |
| 262 | self.assertEqual('c@example.com', user_dict[333].email) |
| 263 | self.assertEqual(user_dict[444], user_pb2.MakeUser(444)) |
| 264 | |
| 265 | def testGetUsersByIDs_SkipMissed(self): |
| 266 | self.SetUpGetUsersByIDs() |
| 267 | user_a = user_pb2.User(email='a@example.com') |
| 268 | self.user_service.user_2lc.CacheItem(111, user_a) |
| 269 | self.mox.ReplayAll() |
| 270 | # 444 user does not exist |
| 271 | user_dict = self.user_service.GetUsersByIDs( |
| 272 | self.cnxn, [111, 333, 444], skip_missed=True) |
| 273 | self.mox.VerifyAll() |
| 274 | self.assertEqual(2, len(user_dict)) |
| 275 | self.assertEqual('a@example.com', user_dict[111].email) |
| 276 | self.assertFalse(user_dict[111].is_site_admin) |
| 277 | self.assertFalse(user_dict[111].banned) |
| 278 | self.assertTrue(user_dict[111].notify_issue_change) |
| 279 | self.assertEqual('c@example.com', user_dict[333].email) |
| 280 | |
| 281 | def testGetUser(self): |
| 282 | SetUpGetUsers(self.user_service, self.cnxn) |
| 283 | user_a = user_pb2.User(email='a@example.com') |
| 284 | self.user_service.user_2lc.CacheItem(111, user_a) |
| 285 | self.mox.ReplayAll() |
| 286 | user = self.user_service.GetUser(self.cnxn, 333) |
| 287 | self.mox.VerifyAll() |
| 288 | self.assertEqual('c@example.com', user.email) |
| 289 | |
| 290 | def SetUpUpdateUser(self): |
| 291 | delta = { |
| 292 | 'keep_people_perms_open': False, |
| 293 | 'preview_on_hover': True, |
| 294 | 'notify_issue_change': True, |
| 295 | 'after_issue_update': 'STAY_SAME_ISSUE', |
| 296 | 'notify_starred_issue_change': True, |
| 297 | 'notify_starred_ping': False, |
| 298 | 'is_site_admin': False, |
| 299 | 'banned': 'Turned spammer', |
| 300 | 'obscure_email': True, |
| 301 | 'email_compact_subject': False, |
| 302 | 'email_view_widget': True, |
| 303 | 'last_visit_timestamp': 0, |
| 304 | 'email_bounce_timestamp': 0, |
| 305 | 'vacation_message': None, |
| 306 | } |
| 307 | self.user_service.user_tbl.Update( |
| 308 | self.cnxn, delta, user_id=111, commit=False) |
| 309 | |
| 310 | def testUpdateUser(self): |
| 311 | self.SetUpUpdateUser() |
| 312 | user_a = user_pb2.User( |
| 313 | email='a@example.com', banned='Turned spammer') |
| 314 | self.mox.ReplayAll() |
| 315 | self.user_service.UpdateUser(self.cnxn, 111, user_a) |
| 316 | self.mox.VerifyAll() |
| 317 | self.assertFalse(self.user_service.user_2lc.HasItem(111)) |
| 318 | |
| 319 | def SetUpGetRecentlyVisitedHotlists(self): |
| 320 | self.user_service.hotlistvisithistory_tbl.Select( |
| 321 | self.cnxn, cols=['hotlist_id'], user_id=[111], |
| 322 | order_by=[('viewed DESC', [])], limit=10).AndReturn( |
| 323 | ((123,), (234,))) |
| 324 | |
| 325 | def testGetRecentlyVisitedHotlists(self): |
| 326 | self.SetUpGetRecentlyVisitedHotlists() |
| 327 | self.mox.ReplayAll() |
| 328 | recent_hotlist_rows = self.user_service.GetRecentlyVisitedHotlists( |
| 329 | self.cnxn, 111) |
| 330 | self.mox.VerifyAll() |
| 331 | self.assertEqual(recent_hotlist_rows, [123, 234]) |
| 332 | |
| 333 | def SetUpAddVisitedHotlist(self, ts): |
| 334 | self.user_service.hotlistvisithistory_tbl.Delete( |
| 335 | self.cnxn, hotlist_id=123, user_id=111, commit=False) |
| 336 | self.user_service.hotlistvisithistory_tbl.InsertRows( |
| 337 | self.cnxn, user_svc.HOTLISTVISITHISTORY_COLS, |
| 338 | [(123, 111, ts)], |
| 339 | commit=False) |
| 340 | |
| 341 | @mock.patch('time.time') |
| 342 | def testAddVisitedHotlist(self, mockTime): |
| 343 | ts = 122333 |
| 344 | mockTime.return_value = ts |
| 345 | self.SetUpAddVisitedHotlist(ts) |
| 346 | self.mox.ReplayAll() |
| 347 | self.user_service.AddVisitedHotlist(self.cnxn, 111, 123, commit=False) |
| 348 | self.mox.VerifyAll() |
| 349 | |
| 350 | def testExpungeHotlistsFromHistory(self): |
| 351 | self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock() |
| 352 | hotlist_ids = [123, 223] |
| 353 | self.user_service.ExpungeHotlistsFromHistory( |
| 354 | self.cnxn, hotlist_ids, commit=False) |
| 355 | self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with( |
| 356 | self.cnxn, hotlist_id=hotlist_ids, commit=False) |
| 357 | |
| 358 | def testExpungeUsersHotlistsHistory(self): |
| 359 | self.user_service.hotlistvisithistory_tbl.Delete = mock.Mock() |
| 360 | user_ids = [111, 222] |
| 361 | self.user_service.ExpungeUsersHotlistsHistory( |
| 362 | self.cnxn, user_ids, commit=False) |
| 363 | self.user_service.hotlistvisithistory_tbl.Delete.assert_called_once_with( |
| 364 | self.cnxn, user_id=user_ids, commit=False) |
| 365 | |
| 366 | def SetUpTrimUserVisitedHotlists(self, user_ids, ts): |
| 367 | self.user_service.hotlistvisithistory_tbl.Select( |
| 368 | self.cnxn, cols=['user_id'], group_by=['user_id'], |
| 369 | having=[('COUNT(*) > %s', [10])], limit=1000).AndReturn(( |
| 370 | (111,), (222,), (333,))) |
| 371 | for user_id in user_ids: |
| 372 | self.user_service.hotlistvisithistory_tbl.Select( |
| 373 | self.cnxn, cols=['viewed'], user_id=user_id, |
| 374 | order_by=[('viewed DESC', [])]).AndReturn([ |
| 375 | (ts,), (ts,), (ts,), (ts,), (ts,), (ts,), |
| 376 | (ts,), (ts,), (ts,), (ts,), (ts+1,)]) |
| 377 | self.user_service.hotlistvisithistory_tbl.Delete( |
| 378 | self.cnxn, user_id=user_id, where=[('viewed < %s', [ts])], |
| 379 | commit=False) |
| 380 | |
| 381 | @mock.patch('time.time') |
| 382 | def testTrimUserVisitedHotlists(self, mockTime): |
| 383 | ts = 122333 |
| 384 | mockTime.return_value = ts |
| 385 | self.SetUpTrimUserVisitedHotlists([111, 222, 333], ts) |
| 386 | self.mox.ReplayAll() |
| 387 | self.user_service.TrimUserVisitedHotlists(self.cnxn, commit=False) |
| 388 | self.mox.VerifyAll() |
| 389 | |
| 390 | def testGetPendingLinkedInvites_Anon(self): |
| 391 | """An Anon user never has invites to link accounts.""" |
| 392 | as_parent, as_child = self.user_service.GetPendingLinkedInvites( |
| 393 | self.cnxn, 0) |
| 394 | self.assertEqual([], as_parent) |
| 395 | self.assertEqual([], as_child) |
| 396 | |
| 397 | def testGetPendingLinkedInvites_None(self): |
| 398 | """A user who has no link invites gets empty lists.""" |
| 399 | self.user_service.linkedaccountinvite_tbl = mock.Mock() |
| 400 | self.user_service.linkedaccountinvite_tbl.Select.return_value = [] |
| 401 | as_parent, as_child = self.user_service.GetPendingLinkedInvites( |
| 402 | self.cnxn, 111) |
| 403 | self.assertEqual([], as_parent) |
| 404 | self.assertEqual([], as_child) |
| 405 | |
| 406 | def testGetPendingLinkedInvites_Some(self): |
| 407 | """A user who has link invites can get them.""" |
| 408 | self.user_service.linkedaccountinvite_tbl = mock.Mock() |
| 409 | self.user_service.linkedaccountinvite_tbl.Select.return_value = [ |
| 410 | (111, 222), (111, 333), (888, 999), (333, 111)] |
| 411 | as_parent, as_child = self.user_service.GetPendingLinkedInvites( |
| 412 | self.cnxn, 111) |
| 413 | self.assertEqual([222, 333], as_parent) |
| 414 | self.assertEqual([333], as_child) |
| 415 | |
| 416 | def testAssertNotAlreadyLinked_NotLinked(self): |
| 417 | """No exception is raised when accounts are not already linked.""" |
| 418 | self.user_service.linkedaccount_tbl = mock.Mock() |
| 419 | self.user_service.linkedaccount_tbl.Select.return_value = [] |
| 420 | self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 222) |
| 421 | |
| 422 | def testAssertNotAlreadyLinked_AlreadyLinked(self): |
| 423 | """Reject attempt to link any account that is already linked.""" |
| 424 | self.user_service.linkedaccount_tbl = mock.Mock() |
| 425 | self.user_service.linkedaccount_tbl.Select.return_value = [ |
| 426 | (111, 222)] |
| 427 | with self.assertRaises(exceptions.InputException): |
| 428 | self.user_service._AssertNotAlreadyLinked(self.cnxn, 111, 333) |
| 429 | |
| 430 | def testInviteLinkedParent_Anon(self): |
| 431 | """Anon cannot invite anyone to link accounts.""" |
| 432 | with self.assertRaises(exceptions.InputException): |
| 433 | self.user_service.InviteLinkedParent(self.cnxn, 0, 0) |
| 434 | with self.assertRaises(exceptions.InputException): |
| 435 | self.user_service.InviteLinkedParent(self.cnxn, 111, 0) |
| 436 | with self.assertRaises(exceptions.InputException): |
| 437 | self.user_service.InviteLinkedParent(self.cnxn, 0, 111) |
| 438 | |
| 439 | def testInviteLinkedParent_Normal(self): |
| 440 | """One account can invite another to link.""" |
| 441 | self.user_service.linkedaccount_tbl = mock.Mock() |
| 442 | self.user_service.linkedaccount_tbl.Select.return_value = [] |
| 443 | self.user_service.linkedaccountinvite_tbl = mock.Mock() |
| 444 | self.user_service.InviteLinkedParent( |
| 445 | self.cnxn, 111, 222) |
| 446 | self.user_service.linkedaccountinvite_tbl.InsertRow.assert_called_once_with( |
| 447 | self.cnxn, parent_id=111, child_id=222) |
| 448 | |
| 449 | def testAcceptLinkedChild_Anon(self): |
| 450 | """Reject attempts for anon to accept any invite.""" |
| 451 | with self.assertRaises(exceptions.InputException): |
| 452 | self.user_service.AcceptLinkedChild(self.cnxn, 0, 333) |
| 453 | with self.assertRaises(exceptions.InputException): |
| 454 | self.user_service.AcceptLinkedChild(self.cnxn, 333, 0) |
| 455 | |
| 456 | def testAcceptLinkedChild_Missing(self): |
| 457 | """Reject attempts to link without a matching invite.""" |
| 458 | self.user_service.linkedaccountinvite_tbl = mock.Mock() |
| 459 | self.user_service.linkedaccountinvite_tbl.Select.return_value = [] |
| 460 | self.user_service.linkedaccount_tbl = mock.Mock() |
| 461 | self.user_service.linkedaccount_tbl.Select.return_value = [] |
| 462 | with self.assertRaises(exceptions.InputException) as cm: |
| 463 | self.user_service.AcceptLinkedChild(self.cnxn, 111, 333) |
| 464 | self.assertEqual('No such invite', cm.exception.message) |
| 465 | |
| 466 | def testAcceptLinkedChild_Normal(self): |
| 467 | """Create linkage between accounts and remove invite.""" |
| 468 | self.user_service.linkedaccountinvite_tbl = mock.Mock() |
| 469 | self.user_service.linkedaccountinvite_tbl.Select.return_value = [ |
| 470 | (111, 222), (333, 444)] |
| 471 | self.user_service.linkedaccount_tbl = mock.Mock() |
| 472 | self.user_service.linkedaccount_tbl.Select.return_value = [] |
| 473 | |
| 474 | self.user_service.AcceptLinkedChild(self.cnxn, 111, 222) |
| 475 | self.user_service.linkedaccount_tbl.InsertRow.assert_called_once_with( |
| 476 | self.cnxn, parent_id=111, child_id=222) |
| 477 | self.user_service.linkedaccountinvite_tbl.Delete.assert_called_once_with( |
| 478 | self.cnxn, parent_id=111, child_id=222) |
| 479 | |
| 480 | def testUnlinkAccounts_MissingIDs(self): |
| 481 | """Reject an attempt to unlink anon.""" |
| 482 | with self.assertRaises(exceptions.InputException): |
| 483 | self.user_service.UnlinkAccounts(self.cnxn, 0, 0) |
| 484 | with self.assertRaises(exceptions.InputException): |
| 485 | self.user_service.UnlinkAccounts(self.cnxn, 0, 111) |
| 486 | with self.assertRaises(exceptions.InputException): |
| 487 | self.user_service.UnlinkAccounts(self.cnxn, 111, 0) |
| 488 | |
| 489 | def testUnlinkAccounts_Normal(self): |
| 490 | """We can unlink accounts.""" |
| 491 | self.user_service.linkedaccount_tbl = mock.Mock() |
| 492 | self.user_service.UnlinkAccounts(self.cnxn, 111, 222) |
| 493 | self.user_service.linkedaccount_tbl.Delete.assert_called_once_with( |
| 494 | self.cnxn, parent_id=111, child_id=222) |
| 495 | |
| 496 | def testUpdateUserSettings(self): |
| 497 | self.SetUpUpdateUser() |
| 498 | user_a = user_pb2.User(email='a@example.com') |
| 499 | self.mox.ReplayAll() |
| 500 | self.user_service.UpdateUserSettings( |
| 501 | self.cnxn, 111, user_a, is_banned=True, |
| 502 | banned_reason='Turned spammer') |
| 503 | self.mox.VerifyAll() |
| 504 | |
| 505 | def testGetUsersPrefs(self): |
| 506 | self.user_service.userprefs_tbl = mock.Mock() |
| 507 | self.user_service.userprefs_tbl.Select.return_value = [ |
| 508 | (111, 'code_font', 'true'), |
| 509 | (111, 'keep_perms_open', 'true'), |
| 510 | # Note: user 222 has not set any prefs. |
| 511 | (333, 'code_font', 'false')] |
| 512 | |
| 513 | prefs_dict = self.user_service.GetUsersPrefs(self.cnxn, [111, 222, 333]) |
| 514 | |
| 515 | expected = { |
| 516 | 111: user_pb2.UserPrefs( |
| 517 | user_id=111, |
| 518 | prefs=[user_pb2.UserPrefValue(name='code_font', value='true'), |
| 519 | user_pb2.UserPrefValue(name='keep_perms_open', value='true')]), |
| 520 | 222: user_pb2.UserPrefs(user_id=222), |
| 521 | 333: user_pb2.UserPrefs( |
| 522 | user_id=333, |
| 523 | prefs=[user_pb2.UserPrefValue(name='code_font', value='false')]), |
| 524 | } |
| 525 | self.assertEqual(expected, prefs_dict) |
| 526 | |
| 527 | def testGetUserPrefs(self): |
| 528 | self.user_service.userprefs_tbl = mock.Mock() |
| 529 | self.user_service.userprefs_tbl.Select.return_value = [ |
| 530 | (111, 'code_font', 'true'), |
| 531 | (111, 'keep_perms_open', 'true'), |
| 532 | # Note: user 222 has not set any prefs. |
| 533 | (333, 'code_font', 'false')] |
| 534 | |
| 535 | userprefs = self.user_service.GetUserPrefs(self.cnxn, 111) |
| 536 | expected = user_pb2.UserPrefs( |
| 537 | user_id=111, |
| 538 | prefs=[user_pb2.UserPrefValue(name='code_font', value='true'), |
| 539 | user_pb2.UserPrefValue(name='keep_perms_open', value='true')]) |
| 540 | self.assertEqual(expected, userprefs) |
| 541 | |
| 542 | userprefs = self.user_service.GetUserPrefs(self.cnxn, 222) |
| 543 | expected = user_pb2.UserPrefs(user_id=222) |
| 544 | self.assertEqual(expected, userprefs) |
| 545 | |
| 546 | def testSetUserPrefs(self): |
| 547 | self.user_service.userprefs_tbl = mock.Mock() |
| 548 | pref_values = [user_pb2.UserPrefValue(name='code_font', value='true'), |
| 549 | user_pb2.UserPrefValue(name='keep_perms_open', value='true')] |
| 550 | self.user_service.SetUserPrefs(self.cnxn, 111, pref_values) |
| 551 | self.user_service.userprefs_tbl.InsertRows.assert_called_once_with( |
| 552 | self.cnxn, user_svc.USERPREFS_COLS, |
| 553 | [(111, 'code_font', 'true'), |
| 554 | (111, 'keep_perms_open', 'true')], |
| 555 | replace=True) |
| 556 | |
| 557 | def testExpungeUsers(self): |
| 558 | self.user_service.linkedaccount_tbl.Delete = mock.Mock() |
| 559 | self.user_service.linkedaccountinvite_tbl.Delete = mock.Mock() |
| 560 | self.user_service.userprefs_tbl.Delete = mock.Mock() |
| 561 | self.user_service.user_tbl.Delete = mock.Mock() |
| 562 | |
| 563 | user_ids = [222, 444] |
| 564 | self.user_service.ExpungeUsers(self.cnxn, user_ids) |
| 565 | |
| 566 | linked_account_calls = [ |
| 567 | mock.call(self.cnxn, parent_id=user_ids, commit=False), |
| 568 | mock.call(self.cnxn, child_id=user_ids, commit=False)] |
| 569 | self.user_service.linkedaccount_tbl.Delete.has_calls(linked_account_calls) |
| 570 | self.user_service.linkedaccountinvite_tbl.Delete.has_calls( |
| 571 | linked_account_calls) |
| 572 | user_calls = [mock.call(self.cnxn, user_id=user_ids, commit=False)] |
| 573 | self.user_service.userprefs_tbl.Delete.has_calls(user_calls) |
| 574 | self.user_service.user_tbl.Delete.has_calls(user_calls) |
| 575 | |
| 576 | def testTotalUsersCount(self): |
| 577 | self.user_service.user_tbl.SelectValue = mock.Mock(return_value=10) |
| 578 | self.assertEqual(self.user_service.TotalUsersCount(self.cnxn), 9) |
| 579 | self.user_service.user_tbl.SelectValue.assert_called_once_with( |
| 580 | self.cnxn, col='COUNT(*)') |
| 581 | |
| 582 | def testGetAllUserEmailsBatch(self): |
| 583 | rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)] |
| 584 | self.user_service.user_tbl.Select = mock.Mock(return_value=rows) |
| 585 | emails = self.user_service.GetAllUserEmailsBatch(self.cnxn) |
| 586 | self.user_service.user_tbl.Select.assert_called_once_with( |
| 587 | self.cnxn, cols=['email'], limit=1000, offset=0, |
| 588 | where=[('user_id != %s', [framework_constants.DELETED_USER_ID])], |
| 589 | order_by=[('user_id ASC', [])]) |
| 590 | self.assertItemsEqual( |
| 591 | emails, ['cow@test.com', 'pig@test.com', 'fox@test.com']) |
| 592 | |
| 593 | def testGetAllUserEmailsBatch_CustomLimit(self): |
| 594 | rows = [('cow@test.com',), ('pig@test.com',), ('fox@test.com',)] |
| 595 | self.user_service.user_tbl.Select = mock.Mock(return_value=rows) |
| 596 | emails = self.user_service.GetAllUserEmailsBatch( |
| 597 | self.cnxn, limit=30, offset=60) |
| 598 | self.user_service.user_tbl.Select.assert_called_once_with( |
| 599 | self.cnxn, cols=['email'], limit=30, offset=60, |
| 600 | where=[('user_id != %s', [framework_constants.DELETED_USER_ID])], |
| 601 | order_by=[('user_id ASC', [])]) |
| 602 | self.assertItemsEqual( |
| 603 | emails, ['cow@test.com', 'pig@test.com', 'fox@test.com']) |