blob: 5bfd899eb694e07f7ffa9296a66baf7b7b9e3940 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
5
6"""Tests for the usergroup service."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import collections
12import mock
13import unittest
14
15import mox
16
17from google.appengine.ext import testbed
18
19from framework import exceptions
20from framework import permissions
21from framework import sql
22from proto import usergroup_pb2
23from services import service_manager
24from services import usergroup_svc
25from testing import fake
26
27
def MakeUserGroupService(cache_manager, my_mox):
  """Build a UserGroupService whose three SQL table managers are mox mocks.

  Args:
    cache_manager: cache manager handed to the UserGroupService constructor.
    my_mox: object whose CreateMock() produces the table-manager mocks.

  Returns:
    The UserGroupService with usergroup_tbl, usergroupsettings_tbl and
    usergroupprojects_tbl replaced by mocks of sql.SQLTableManager.
  """
  svc = usergroup_svc.UserGroupService(cache_manager)
  # Mocks are created in the same order as the attributes are listed.
  mocked_table_attrs = (
      'usergroup_tbl',
      'usergroupsettings_tbl',
      'usergroupprojects_tbl',
  )
  for attr_name in mocked_table_attrs:
    setattr(svc, attr_name, my_mox.CreateMock(sql.SQLTableManager))
  return svc
36
37
class MembershipTwoLevelCacheTest(unittest.TestCase):
  """Tests for the memberships two-level cache of the usergroup service."""

  def setUp(self):
    self.mox = mox.Mox()
    self.cache_manager = fake.CacheManager()
    self.usergroup_service = MakeUserGroupService(self.cache_manager, self.mox)

  def testDeserializeMemberships(self):
    """(user_id, group_id) rows are grouped into per-user group IDs."""
    rows = [(111, 777), (111, 888), (222, 888)]
    cache = self.usergroup_service.memberships_2lc

    memberships_by_user = cache._DeserializeMemberships(rows)

    user_ids = list(memberships_by_user.keys())
    self.assertItemsEqual([111, 222], user_ids)
    self.assertItemsEqual([777, 888], memberships_by_user[111])
    self.assertItemsEqual([888], memberships_by_user[222])
52
53
class UserGroupServiceTest(unittest.TestCase):
  """Tests for UserGroupService with its SQL table managers mocked out.

  The SetUp* helpers record, on the mox table mocks, the calls that a service
  method is expected to make.  Most tests then call ReplayAll(), invoke the
  service, and call VerifyAll(); a few tests use mock.MagicMock/mock.Mock
  directly instead.
  """

  def setUp(self):
    """Activate the memcache stub and build the service under test."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = 'fake connection'
    self.cache_manager = fake.CacheManager()
    self.usergroup_service = MakeUserGroupService(self.cache_manager, self.mox)
    self.services = service_manager.Services(
        user=fake.UserService(),
        usergroup=self.usergroup_service,
        project=fake.ProjectService())

  def tearDown(self):
    """Deactivate the testbed and reset all mox state."""
    self.testbed.deactivate()
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def SetUpCreateGroup(
      self, group_id, visiblity, external_group_type=None):
    """Record the table calls expected from CreateGroup.

    This simply delegates to SetUpUpdateSettings with the same arguments.
    """
    self.SetUpUpdateSettings(group_id, visiblity, external_group_type)

  def testCreateGroup_Normal(self):
    """CreateGroup returns the ID of the user registered for the email."""
    self.services.user.TestAddUser('group@example.com', 888)
    self.SetUpCreateGroup(888, 'anyone')
    self.mox.ReplayAll()
    actual_group_id = self.usergroup_service.CreateGroup(
        self.cnxn, self.services, 'group@example.com', 'anyone')
    self.mox.VerifyAll()
    self.assertEqual(888, actual_group_id)

  def testCreateGroup_Import(self):
    """CreateGroup passes an external group type through to the settings."""
    self.services.user.TestAddUser('troopers', 888)
    self.SetUpCreateGroup(888, 'owners', 'mdb')
    self.mox.ReplayAll()
    actual_group_id = self.usergroup_service.CreateGroup(
        self.cnxn, self.services, 'troopers', 'owners', 'mdb')
    self.mox.VerifyAll()
    self.assertEqual(888, actual_group_id)

  def SetUpDetermineWhichUserIDsAreGroups(self, ids_to_query, mock_group_ids):
    """Record the settings Select that filters ids_to_query down to groups.

    Args:
      ids_to_query: IDs expected as the group_id argument of the Select.
      mock_group_ids: group IDs the mocked Select returns, one per row.
    """
    # Return a list rather than a generator so the mocked rows can be
    # iterated more than once.
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['group_id'], group_id=ids_to_query).AndReturn(
            [(gid,) for gid in mock_group_ids])

  def testDetermineWhichUserIDsAreGroups_NoGroups(self):
    """An empty ID list yields an empty list of group IDs."""
    self.SetUpDetermineWhichUserIDsAreGroups([], [])
    self.mox.ReplayAll()
    actual_group_ids = self.usergroup_service.DetermineWhichUserIDsAreGroups(
        self.cnxn, [])
    self.mox.VerifyAll()
    self.assertEqual([], actual_group_ids)

  def testDetermineWhichUserIDsAreGroups_SomeGroups(self):
    """Only the IDs returned by the settings table are reported as groups."""
    user_ids = [111, 222, 333]
    group_ids = [888, 999]
    self.SetUpDetermineWhichUserIDsAreGroups(user_ids + group_ids, group_ids)
    self.mox.ReplayAll()
    actual_group_ids = self.usergroup_service.DetermineWhichUserIDsAreGroups(
        self.cnxn, user_ids + group_ids)
    self.mox.VerifyAll()
    self.assertEqual(group_ids, actual_group_ids)

  def testLookupUserGroupID_Found(self):
    """LookupUserGroupID returns the group_id of the matching row."""
    mock_select = mock.MagicMock()
    self.services.usergroup.usergroupsettings_tbl.Select = mock_select
    mock_select.return_value = [('group@example.com', 888)]

    actual = self.services.usergroup.LookupUserGroupID(
        self.cnxn, 'group@example.com')

    self.assertEqual(888, actual)
    mock_select.assert_called_once_with(
        self.cnxn, cols=['email', 'group_id'],
        left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])],
        email='group@example.com',
        where=[('group_id IS NOT NULL', [])])

  def testLookupUserGroupID_NotFound(self):
    """LookupUserGroupID returns None when no row matches the email."""
    mock_select = mock.MagicMock()
    self.services.usergroup.usergroupsettings_tbl.Select = mock_select
    mock_select.return_value = []

    actual = self.services.usergroup.LookupUserGroupID(
        self.cnxn, 'user@example.com')

    self.assertIsNone(actual)
    mock_select.assert_called_once_with(
        self.cnxn, cols=['email', 'group_id'],
        left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])],
        email='user@example.com',
        where=[('group_id IS NOT NULL', [])])

  def SetUpLookupAllMemberships(self, user_ids, mock_membership_rows):
    """Record the membership Select for user_ids, returning the given rows."""
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id'], distinct=True,
        user_id=user_ids).AndReturn(mock_membership_rows)

  def testLookupAllMemberships(self):
    """Cached memberships and freshly selected ones end up in one dict."""
    self.usergroup_service.group_dag.initialized = True
    self.usergroup_service.memberships_2lc.CacheItem(111, {888, 999})
    # Only user 222 is selected here: 111 was placed in the cache above.
    self.SetUpLookupAllMemberships([222], [(222, 777), (222, 999)])
    # NOTE(review): these two Selects have the same shape as the ones that
    # SetUpDAG records; presumably the group DAG is rebuilt during the
    # lookup -- confirm against usergroup_svc.
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['group_id']).AndReturn([])
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id'], distinct=True,
        user_id=[]).AndReturn([])
    self.mox.ReplayAll()
    actual_membership_dict = self.usergroup_service.LookupAllMemberships(
        self.cnxn, [111, 222])
    self.mox.VerifyAll()
    self.assertEqual(
        {111: {888, 999}, 222: {777, 999}},
        actual_membership_dict)

  def SetUpRemoveMembers(self, group_id, member_ids):
    """Record the Delete of member_ids from group_id."""
    self.usergroup_service.usergroup_tbl.Delete(
        self.cnxn, group_id=group_id, user_id=member_ids)

  def testRemoveMembers(self):
    """RemoveMembers issues the expected Delete and member lookup."""
    self.usergroup_service.group_dag.initialized = True
    self.SetUpRemoveMembers(888, [111, 222])
    self.SetUpLookupAllMembers([111, 222], [], {}, {})
    self.mox.ReplayAll()
    self.usergroup_service.RemoveMembers(self.cnxn, 888, [111, 222])
    self.mox.VerifyAll()

  def testUpdateMembers(self):
    """UpdateMembers deletes the old rows and inserts the new role rows."""
    self.usergroup_service.group_dag.initialized = True
    self.usergroup_service.usergroup_tbl.Delete(
        self.cnxn, group_id=888, user_id=[111, 222])
    self.usergroup_service.usergroup_tbl.InsertRows(
        self.cnxn, ['user_id', 'group_id', 'role'],
        [(111, 888, 'member'), (222, 888, 'member')])
    self.SetUpLookupAllMembers([111, 222], [], {}, {})
    self.mox.ReplayAll()
    self.usergroup_service.UpdateMembers(
        self.cnxn, 888, [111, 222], 'member')
    self.mox.VerifyAll()

  def testUpdateMembers_CircleDetection(self):
    """Adding a parent group as a member of its child raises an exception."""
    # Two groups: 888 and 999 while 999 is a member of 888.
    self.SetUpDAG([(888,), (999,)], [(999, 888)])
    self.mox.ReplayAll()
    self.assertRaises(
        exceptions.CircularGroupException,
        self.usergroup_service.UpdateMembers, self.cnxn, 999, [888], 'member')
    self.mox.VerifyAll()

  def SetUpLookupAllMembers(
      self, group_ids, direct_member_rows,
      descedants_dict, indirect_member_rows_dict):
    """Record the Selects expected when looking up all members of groups.

    Args:
      group_ids: group IDs expected in the direct-member Select.
      direct_member_rows: rows the direct-member Select returns.
      descedants_dict: maps a group ID to the descendant group IDs expected
          in a follow-up Select; groups with no entry get no follow-up.
      indirect_member_rows_dict: maps a group ID to the rows returned by its
          follow-up Select.
    """
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id', 'role'], distinct=True,
        group_id=group_ids).AndReturn(direct_member_rows)
    for gid in group_ids:
      descendant_ids = descedants_dict.get(gid, [])
      if descendant_ids:
        self.usergroup_service.usergroup_tbl.Select(
            self.cnxn, cols=['user_id'], distinct=True,
            group_id=descendant_ids).AndReturn(
                indirect_member_rows_dict.get(gid, []))

  def testLookupAllMembers(self):
    """Members of nested groups are reported for the top-level group."""
    self.usergroup_service.group_dag.initialized = True
    self.usergroup_service.group_dag.user_group_children = (
        collections.defaultdict(list))
    # Group nesting used by this test: 777 contains 888, 888 contains 999.
    self.usergroup_service.group_dag.user_group_children[777] = [888]
    self.usergroup_service.group_dag.user_group_children[888] = [999]
    self.SetUpLookupAllMembers(
        [777],
        [(888, 777, 'member'), (111, 888, 'member'), (999, 888, 'member'),
         (222, 999, 'member')],
        {777: [888, 999]},
        {777: [(111,), (222,), (999,)]})

    self.mox.ReplayAll()
    members_dict, owners_dict = self.usergroup_service.LookupAllMembers(
        self.cnxn, [777])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222, 888, 999], members_dict[777])
    self.assertItemsEqual([], owners_dict[777])

  def testExpandAnyGroupEmailRecipients(self):
    """Recipients are split into direct and indirect lists."""
    self.usergroup_service.group_dag.initialized = True
    self.SetUpDetermineWhichUserIDsAreGroups(
        [111, 777, 888, 999], [777, 888, 999])
    # NOTE(review): rows presumably follow usergroup_svc.USERGROUPSETTINGS_COLS
    # and the last two fields look like notify_members and notify_group --
    # confirm against the column list.
    self.SetUpGetGroupSettings(
        [777, 888, 999],
        [(777, 'anyone', None, 0, 1, 0),
         (888, 'anyone', None, 0, 0, 1),
         (999, 'anyone', None, 0, 1, 1)],
    )
    self.SetUpLookupAllMembers(
        [777, 888, 999],
        [(222, 777, 'member'), (333, 888, 'member'), (444, 999, 'member')],
        {}, {})
    self.mox.ReplayAll()
    direct, indirect = self.usergroup_service.ExpandAnyGroupEmailRecipients(
        self.cnxn, [111, 777, 888, 999])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 888, 999], direct)
    self.assertItemsEqual([222, 444], indirect)

  def SetUpLookupMembers(self, group_member_dict):
    """Record the member Select for the groups in group_member_dict.

    Args:
      group_member_dict: maps each group ID to a list of user IDs; every
          user is returned by the mocked Select as a 'member' row.
    """
    mock_membership_rows = []
    group_ids = []
    for gid, members in group_member_dict.items():
      group_ids.append(gid)
      mock_membership_rows.extend((uid, gid, 'member') for uid in members)
    # Sorted so the expected group_id argument does not depend on dict order.
    group_ids.sort()
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id', 'role'], distinct=True,
        group_id=group_ids).AndReturn(mock_membership_rows)

  def testLookupMembers_NoneRequested(self):
    """No table calls are made when no groups are requested."""
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [])
    self.mox.VerifyAll()
    self.assertItemsEqual({}, member_ids)

  def testLookupMembers_Nonexistent(self):
    """If some requested groups don't exist, they are ignored."""
    self.SetUpLookupMembers({777: []})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [777])
    self.mox.VerifyAll()
    self.assertItemsEqual([], member_ids[777])

  def testLookupMembers_AllEmpty(self):
    """Requesting all empty groups results in no members."""
    self.SetUpLookupMembers({888: [], 999: []})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888, 999])
    self.mox.VerifyAll()
    # Check every requested group, not just the first one.
    self.assertItemsEqual([], member_ids[888])
    self.assertItemsEqual([], member_ids[999])

  def testLookupMembers_OneGroup(self):
    """The members of a single group are returned under its ID."""
    self.SetUpLookupMembers({888: [111, 222]})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222], member_ids[888])

  def testLookupMembers_GroupsAndNonGroups(self):
    """We ignore any non-groups passed in."""
    self.SetUpLookupMembers({111: [], 333: [], 888: [111, 222]})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(
        self.cnxn, [111, 333, 888])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222], member_ids[888])

  def testLookupMembers_OverlappingGroups(self):
    """We get the union of IDs.  Imagine 888 = {111} and 999 = {111, 222}."""
    self.SetUpLookupMembers({888: [111], 999: [111, 222]})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888, 999])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222], member_ids[999])
    self.assertItemsEqual([111], member_ids[888])

  def testLookupVisibleMembers_LimitedVisiblity(self):
    """We get only the member IDs in groups that the user is allowed to see."""
    self.usergroup_service.group_dag.initialized = True
    self.SetUpGetGroupSettings(
        [888, 999],
        [(888, 'anyone', None, 0, 1, 0), (999, 'members', None, 0, 1, 0)])
    self.SetUpLookupMembers({888: [111], 999: [111]})
    self.SetUpLookupAllMembers(
        [888, 999], [(111, 888, 'member'), (111, 999, 'member')], {}, {})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupVisibleMembers(
        self.cnxn, [888, 999], permissions.USER_PERMISSIONSET, set(),
        self.services)
    self.mox.VerifyAll()
    self.assertItemsEqual([111], member_ids[888])
    self.assertNotIn(999, member_ids)

  def SetUpGetAllUserGroupsInfo(self, mock_settings_rows, mock_count_rows,
                                mock_friends=None):
    """Record the three Selects expected from GetAllUserGroupsInfo.

    Args:
      mock_settings_rows: rows returned by the settings Select; the group ID
          is read from index 1 of each row.
      mock_count_rows: rows returned by the per-group COUNT(*) Select.
      mock_friends: rows returned by the group-projects Select; defaults to
          an empty list.
    """
    mock_friends = mock_friends or []
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['email', 'group_id', 'who_can_view_members',
                         'external_group_type', 'last_sync_time',
                         'notify_members', 'notify_group'],
        left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])]
        ).AndReturn(mock_settings_rows)
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['group_id', 'COUNT(*)'],
        group_by=['group_id']).AndReturn(mock_count_rows)

    group_ids = [g[1] for g in mock_settings_rows]
    self.usergroup_service.usergroupprojects_tbl.Select(
        self.cnxn, cols=usergroup_svc.USERGROUPPROJECTS_COLS,
        group_id=group_ids).AndReturn(mock_friends)

  def testGetAllUserGroupsInfo(self):
    """Each info tuple carries the email, count, settings and group ID."""
    self.SetUpGetAllUserGroupsInfo(
        [('group@example.com', 888, 'anyone', None, 0, 1, 0)],
        [(888, 12)])
    self.mox.ReplayAll()
    actual_infos = self.usergroup_service.GetAllUserGroupsInfo(self.cnxn)
    self.mox.VerifyAll()
    self.assertEqual(1, len(actual_infos))
    addr, count, group_settings, group_id = actual_infos[0]
    self.assertEqual('group@example.com', addr)
    self.assertEqual(12, count)
    self.assertEqual(usergroup_pb2.MemberVisibility.ANYONE,
                     group_settings.who_can_view_members)
    self.assertEqual(888, group_id)

  def SetUpGetGroupSettings(self, group_ids, mock_result_rows,
                            mock_friends=None):
    """Record the settings and group-projects Selects for group_ids.

    Args:
      group_ids: group IDs expected as the group_id argument of both Selects.
      mock_result_rows: rows returned by the settings Select.
      mock_friends: rows returned by the group-projects Select; defaults to
          an empty list.
    """
    mock_friends = mock_friends or []
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=usergroup_svc.USERGROUPSETTINGS_COLS,
        group_id=group_ids).AndReturn(mock_result_rows)
    self.usergroup_service.usergroupprojects_tbl.Select(
        self.cnxn, cols=usergroup_svc.USERGROUPPROJECTS_COLS,
        group_id=group_ids).AndReturn(mock_friends)

  def testGetGroupSettings_NoGroupsRequested(self):
    """Requesting no groups yields an empty settings dict."""
    self.SetUpGetGroupSettings([], [])
    self.mox.ReplayAll()
    actual_settings_dict = self.usergroup_service.GetAllGroupSettings(
        self.cnxn, [])
    self.mox.VerifyAll()
    self.assertEqual({}, actual_settings_dict)

  def testGetGroupSettings_NoGroupsFound(self):
    """A requested group with no settings row is absent from the result."""
    self.SetUpGetGroupSettings([777], [])
    self.mox.ReplayAll()
    actual_settings_dict = self.usergroup_service.GetAllGroupSettings(
        self.cnxn, [777])
    self.mox.VerifyAll()
    self.assertEqual({}, actual_settings_dict)

  def testGetGroupSettings_SomeGroups(self):
    """Only groups that have a settings row appear in the result."""
    self.SetUpGetGroupSettings(
        [777, 888, 999],
        [(888, 'anyone', None, 0, 1, 0), (999, 'members', None, 0, 1, 0)])
    self.mox.ReplayAll()
    actual_settings_dict = self.usergroup_service.GetAllGroupSettings(
        self.cnxn, [777, 888, 999])
    self.mox.VerifyAll()
    self.assertEqual(
        {888: usergroup_pb2.MakeSettings('anyone'),
         999: usergroup_pb2.MakeSettings('members')},
        actual_settings_dict)

  def testGetGroupSettings_NoSuchGroup(self):
    """GetGroupSettings returns None for a group with no settings row."""
    self.SetUpGetGroupSettings([777], [])
    self.mox.ReplayAll()
    actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 777)
    self.mox.VerifyAll()
    self.assertIsNone(actual_settings)

  def testGetGroupSettings_Found(self):
    """GetGroupSettings exposes the visibility stored in the settings row."""
    self.SetUpGetGroupSettings([888], [(888, 'anyone', None, 0, 1, 0)])
    self.mox.ReplayAll()
    actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 888)
    self.mox.VerifyAll()
    self.assertEqual(
        usergroup_pb2.MemberVisibility.ANYONE,
        actual_settings.who_can_view_members)

  def testGetGroupSettings_Import(self):
    """GetGroupSettings exposes the external group type of the row."""
    self.SetUpGetGroupSettings(
        [888], [(888, 'owners', 'mdb', 0, 1, 0)])
    self.mox.ReplayAll()
    actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 888)
    self.mox.VerifyAll()
    self.assertEqual(
        usergroup_pb2.MemberVisibility.OWNERS,
        actual_settings.who_can_view_members)
    self.assertEqual(
        usergroup_pb2.GroupType.MDB,
        actual_settings.ext_group_type)

  def SetUpUpdateSettings(self, group_id, visiblity, external_group_type=None,
                          last_sync_time=0, friend_projects=None,
                          notify_members=True, notify_group=False):
    """Record the table calls expected from UpdateSettings.

    Args:
      group_id: group whose settings row is replaced.
      visiblity: expected who_can_view_members value.
      external_group_type: expected external_group_type value.
      last_sync_time: expected last_sync_time value.
      friend_projects: project IDs expected to be inserted for the group
          after its existing project rows are deleted; none by default.
      notify_members: expected notify_members value.
      notify_group: expected notify_group value.
    """
    friend_projects = friend_projects or []
    self.usergroup_service.usergroupsettings_tbl.InsertRow(
        self.cnxn, group_id=group_id, who_can_view_members=visiblity,
        external_group_type=external_group_type,
        last_sync_time=last_sync_time, notify_members=notify_members,
        notify_group=notify_group, replace=True)
    self.usergroup_service.usergroupprojects_tbl.Delete(
        self.cnxn, group_id=group_id)
    if friend_projects:
      rows = [(group_id, p_id) for p_id in friend_projects]
      self.usergroup_service.usergroupprojects_tbl.InsertRows(
          self.cnxn, ['group_id', 'project_id'], rows)

  def testUpdateSettings_Normal(self):
    """UpdateSettings writes the settings row and clears project rows."""
    self.SetUpUpdateSettings(888, 'anyone')
    self.mox.ReplayAll()
    self.usergroup_service.UpdateSettings(
        self.cnxn, 888, usergroup_pb2.MakeSettings('anyone'))
    self.mox.VerifyAll()

  def testUpdateSettings_Import(self):
    """UpdateSettings writes the external group type of the settings."""
    self.SetUpUpdateSettings(888, 'owners', 'mdb')
    self.mox.ReplayAll()
    self.usergroup_service.UpdateSettings(
        self.cnxn, 888,
        usergroup_pb2.MakeSettings('owners', 'mdb'))
    self.mox.VerifyAll()

  def testUpdateSettings_WithFriends(self):
    """UpdateSettings inserts a project row for each friend project."""
    self.SetUpUpdateSettings(888, 'anyone', friend_projects=[789])
    self.mox.ReplayAll()
    self.usergroup_service.UpdateSettings(
        self.cnxn, 888,
        usergroup_pb2.MakeSettings('anyone', friend_projects=[789]))
    self.mox.VerifyAll()

  def testExpungeUsersInGroups(self):
    """ExpungeUsersInGroups deletes from all three tables without commit."""
    self.usergroup_service.usergroupprojects_tbl.Delete = mock.Mock()
    self.usergroup_service.usergroupsettings_tbl.Delete = mock.Mock()
    self.usergroup_service.usergroup_tbl.Delete = mock.Mock()

    ids = [222, 333, 444]
    self.usergroup_service.ExpungeUsersInGroups(self.cnxn, ids)

    self.usergroup_service.usergroupprojects_tbl.Delete.assert_called_once_with(
        self.cnxn, group_id=ids, commit=False)
    self.usergroup_service.usergroupsettings_tbl.Delete.assert_called_once_with(
        self.cnxn, group_id=ids, commit=False)
    # usergroup_tbl is cleared twice: once by group_id and once by user_id.
    self.usergroup_service.usergroup_tbl.Delete.assert_has_calls(
        [mock.call(self.cnxn, group_id=ids, commit=False),
         mock.call(self.cnxn, user_id=ids, commit=False)])

  def SetUpDAG(self, group_id_rows, usergroup_rows):
    """Record the two Selects expected when the group DAG is built.

    Args:
      group_id_rows: one-element rows returned by the settings Select; their
          first elements form the user_id argument of the second Select.
      usergroup_rows: (user_id, group_id) rows returned by the second Select.
    """
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['group_id']).AndReturn(group_id_rows)
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id'], distinct=True,
        user_id=[r[0] for r in group_id_rows]).AndReturn(usergroup_rows)

  def testDAG_Build(self):
    """Build() replaces stale entries with the freshly selected edges."""
    # Old entries should go away after rebuilding
    self.usergroup_service.group_dag.user_group_parents = (
        collections.defaultdict(list))
    self.usergroup_service.group_dag.user_group_parents[111] = [222]
    # Two groups: 888 and 999 while 999 is a member of 888.
    self.SetUpDAG([(888,), (999,)], [(999, 888)])
    self.mox.ReplayAll()
    self.usergroup_service.group_dag.Build(self.cnxn)
    self.mox.VerifyAll()
    self.assertIn(888, self.usergroup_service.group_dag.user_group_children)
    self.assertIn(999, self.usergroup_service.group_dag.user_group_parents)
    self.assertNotIn(111, self.usergroup_service.group_dag.user_group_parents)

  def testDAG_GetAllAncestors(self):
    """GetAllAncestors follows a chain of parent groups."""
    # Three groups: 777, 888 and 999.
    # 999 is a direct member of 888, and 888 is a direct member of 777.
    self.SetUpDAG([(777,), (888,), (999,)], [(999, 888), (888, 777)])
    self.mox.ReplayAll()
    ancestors = self.usergroup_service.group_dag.GetAllAncestors(
        self.cnxn, 999)
    self.mox.VerifyAll()
    ancestors.sort()
    self.assertEqual([777, 888], ancestors)

  def testDAG_GetAllAncestorsDiamond(self):
    """A shared ancestor in a diamond is reported only once."""
    # Four groups: 666, 777, 888 and 999.
    # 999 is a direct member of both 888 and 777,
    # 888 is a direct member of 666, and 777 is also a direct member of 666.
    self.SetUpDAG([(666, ), (777,), (888,), (999,)],
                  [(999, 888), (999, 777), (888, 666), (777, 666)])
    self.mox.ReplayAll()
    ancestors = self.usergroup_service.group_dag.GetAllAncestors(
        self.cnxn, 999)
    self.mox.VerifyAll()
    ancestors.sort()
    self.assertEqual([666, 777, 888], ancestors)

  def testDAG_GetAllDescendants(self):
    """A shared descendant in a diamond is reported only once."""
    # Four groups: 666, 777, 888 and 999.
    # 999 is a direct member of both 888 and 777,
    # 888 is a direct member of 666, and 777 is also a direct member of 666.
    self.SetUpDAG([(666, ), (777,), (888,), (999,)],
                  [(999, 888), (999, 777), (888, 666), (777, 666)])
    self.mox.ReplayAll()
    descendants = self.usergroup_service.group_dag.GetAllDescendants(
        self.cnxn, 666)
    self.mox.VerifyAll()
    descendants.sort()
    self.assertEqual([777, 888, 999], descendants)

  def testDAG_IsChild(self):
    """IsChild is true for a direct member and false for a sibling group."""
    # Four groups: 666, 777, 888 and 999.
    # 999 is a direct member of both 888 and 777,
    # 888 is a direct member of 666, and 777 is also a direct member of 666.
    self.SetUpDAG([(666, ), (777,), (888,), (999,)],
                  [(999, 888), (999, 777), (888, 666), (777, 666)])
    self.mox.ReplayAll()
    result1 = self.usergroup_service.group_dag.IsChild(
        self.cnxn, 777, 666)
    result2 = self.usergroup_service.group_dag.IsChild(
        self.cnxn, 777, 888)
    self.mox.VerifyAll()
    self.assertTrue(result1)
    self.assertFalse(result2)