blob: 10b2c8ae0758bc57d1275575634e366eed9e5e7b [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""Tests for the usergroup service."""
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import collections
12import mock
13import unittest
14
Adrià Vilanova Martínez9f9ade52022-10-10 23:20:11 +020015try:
16 from mox3 import mox
17except ImportError:
18 import mox
Copybara854996b2021-09-07 19:36:02 +000019
20from google.appengine.ext import testbed
21
22from framework import exceptions
23from framework import permissions
24from framework import sql
25from proto import usergroup_pb2
26from services import service_manager
27from services import usergroup_svc
28from testing import fake
29
30
def MakeUserGroupService(cache_manager, my_mox):
  """Build a UserGroupService whose SQL table managers are mox mocks.

  Args:
    cache_manager: cache manager handed to the UserGroupService constructor.
    my_mox: mox.Mox instance used to create the table mocks.

  Returns:
    The UserGroupService, with usergroup_tbl, usergroupsettings_tbl and
    usergroupprojects_tbl each replaced by a mock of sql.SQLTableManager.
  """
  service = usergroup_svc.UserGroupService(cache_manager)
  # Mocks are created in the same order as the attributes are listed.
  table_attrs = (
      'usergroup_tbl', 'usergroupsettings_tbl', 'usergroupprojects_tbl')
  for table_attr in table_attrs:
    setattr(service, table_attr, my_mox.CreateMock(sql.SQLTableManager))
  return service
39
40
class MembershipTwoLevelCacheTest(unittest.TestCase):
  """Tests for the memberships two-level cache of the usergroup service."""

  # Python 3 renamed assertItemsEqual to assertCountEqual.  Alias the old
  # name so the assertions below work on either interpreter.
  if not hasattr(unittest.TestCase, 'assertItemsEqual'):
    assertItemsEqual = unittest.TestCase.assertCountEqual

  def setUp(self):
    """Create a usergroup service backed by mocked SQL tables."""
    self.mox = mox.Mox()
    self.cache_manager = fake.CacheManager()
    self.usergroup_service = MakeUserGroupService(self.cache_manager, self.mox)

  def testDeserializeMemberships(self):
    """(user_id, group_id) rows are grouped into per-user group IDs."""
    memberships_rows = [(111, 777), (111, 888), (222, 888)]
    actual = self.usergroup_service.memberships_2lc._DeserializeMemberships(
        memberships_rows)
    self.assertItemsEqual([111, 222], list(actual.keys()))
    self.assertItemsEqual([777, 888], actual[111])
    self.assertItemsEqual([888], actual[222])
55
56
class UserGroupServiceTest(unittest.TestCase):
  """Tests for UserGroupService, run against mocked SQL table managers.

  Methods named SetUp* record the table calls that a service method is
  expected to make; the test* methods replay those expectations, call the
  service, and verify the recorded calls.
  """

  # Python 3 renamed assertItemsEqual to assertCountEqual.  Alias the old
  # name so the assertions below work on either interpreter.
  if not hasattr(unittest.TestCase, 'assertItemsEqual'):
    assertItemsEqual = unittest.TestCase.assertCountEqual

  def setUp(self):
    """Activate the memcache stub and build the services under test."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_memcache_stub()

    self.mox = mox.Mox()
    self.cnxn = 'fake connection'
    self.cache_manager = fake.CacheManager()
    self.usergroup_service = MakeUserGroupService(self.cache_manager, self.mox)
    self.services = service_manager.Services(
        user=fake.UserService(),
        usergroup=self.usergroup_service,
        project=fake.ProjectService())

  def tearDown(self):
    """Deactivate the testbed and reset all mox state."""
    self.testbed.deactivate()
    self.mox.UnsetStubs()
    self.mox.ResetAll()

  def SetUpCreateGroup(
      self, group_id, visiblity, external_group_type=None):
    """Record the table calls expected while creating a group."""
    self.SetUpUpdateSettings(group_id, visiblity, external_group_type)

  def testCreateGroup_Normal(self):
    """CreateGroup returns the ID of the user with the group's email."""
    self.services.user.TestAddUser('group@example.com', 888)
    self.SetUpCreateGroup(888, 'anyone')
    self.mox.ReplayAll()
    actual_group_id = self.usergroup_service.CreateGroup(
        self.cnxn, self.services, 'group@example.com', 'anyone')
    self.mox.VerifyAll()
    self.assertEqual(888, actual_group_id)

  def testCreateGroup_Import(self):
    """CreateGroup passes the external group type through to the settings."""
    self.services.user.TestAddUser('troopers', 888)
    self.SetUpCreateGroup(888, 'owners', 'mdb')
    self.mox.ReplayAll()
    actual_group_id = self.usergroup_service.CreateGroup(
        self.cnxn, self.services, 'troopers', 'owners', 'mdb')
    self.mox.VerifyAll()
    self.assertEqual(888, actual_group_id)

  def SetUpDetermineWhichUserIDsAreGroups(self, ids_to_query, mock_group_ids):
    """Record the settings-table query that tells groups from users.

    Args:
      ids_to_query: IDs expected in the group_id condition of the Select.
      mock_group_ids: group IDs the mocked Select returns, one per row.
    """
    # Return a list rather than a generator: a generator can be consumed
    # only once, while a list stays valid however the rows are read.
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['group_id'], group_id=ids_to_query).AndReturn(
            [(gid,) for gid in mock_group_ids])

  def testDetermineWhichUserIDsAreGroups_NoGroups(self):
    """An empty list of IDs yields an empty list of group IDs."""
    self.SetUpDetermineWhichUserIDsAreGroups([], [])
    self.mox.ReplayAll()
    actual_group_ids = self.usergroup_service.DetermineWhichUserIDsAreGroups(
        self.cnxn, [])
    self.mox.VerifyAll()
    self.assertEqual([], actual_group_ids)

  def testDetermineWhichUserIDsAreGroups_SomeGroups(self):
    """Only the IDs found in the settings table are reported as groups."""
    user_ids = [111, 222, 333]
    group_ids = [888, 999]
    self.SetUpDetermineWhichUserIDsAreGroups(user_ids + group_ids, group_ids)
    self.mox.ReplayAll()
    actual_group_ids = self.usergroup_service.DetermineWhichUserIDsAreGroups(
        self.cnxn, user_ids + group_ids)
    self.mox.VerifyAll()
    self.assertEqual(group_ids, actual_group_ids)

  def testLookupUserGroupID_Found(self):
    """A matching (email, group_id) row yields that group ID."""
    mock_select = mock.MagicMock()
    self.services.usergroup.usergroupsettings_tbl.Select = mock_select
    mock_select.return_value = [('group@example.com', 888)]

    actual = self.services.usergroup.LookupUserGroupID(
        self.cnxn, 'group@example.com')

    self.assertEqual(888, actual)
    mock_select.assert_called_once_with(
        self.cnxn, cols=['email', 'group_id'],
        left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])],
        email='group@example.com',
        where=[('group_id IS NOT NULL', [])])

  def testLookupUserGroupID_NotFound(self):
    """With no matching row, the lookup returns None."""
    mock_select = mock.MagicMock()
    self.services.usergroup.usergroupsettings_tbl.Select = mock_select
    mock_select.return_value = []

    actual = self.services.usergroup.LookupUserGroupID(
        self.cnxn, 'user@example.com')

    self.assertIsNone(actual)
    mock_select.assert_called_once_with(
        self.cnxn, cols=['email', 'group_id'],
        left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])],
        email='user@example.com',
        where=[('group_id IS NOT NULL', [])])

  def SetUpLookupAllMemberships(self, user_ids, mock_membership_rows):
    """Record the membership Select expected for the given user IDs."""
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id'], distinct=True,
        user_id=user_ids).AndReturn(mock_membership_rows)

  def testLookupAllMemberships(self):
    """Cached and freshly queried memberships are both returned."""
    self.usergroup_service.group_dag.initialized = True
    # User 111 is pre-cached, so only user 222 is expected in the Select.
    self.usergroup_service.memberships_2lc.CacheItem(111, {888, 999})
    self.SetUpLookupAllMemberships([222], [(222, 777), (222, 999)])
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['group_id']).AndReturn([])
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id'], distinct=True,
        user_id=[]).AndReturn([])
    self.mox.ReplayAll()
    actual_membership_dict = self.usergroup_service.LookupAllMemberships(
        self.cnxn, [111, 222])
    self.mox.VerifyAll()
    self.assertEqual(
        {111: {888, 999}, 222: {777, 999}},
        actual_membership_dict)

  def SetUpRemoveMembers(self, group_id, member_ids):
    """Record the Delete expected when members are removed from a group."""
    self.usergroup_service.usergroup_tbl.Delete(
        self.cnxn, group_id=group_id, user_id=member_ids)

  def testRemoveMembers(self):
    """RemoveMembers deletes the membership rows of the given users."""
    self.usergroup_service.group_dag.initialized = True
    self.SetUpRemoveMembers(888, [111, 222])
    self.SetUpLookupAllMembers([111, 222], [], {}, {})
    self.mox.ReplayAll()
    self.usergroup_service.RemoveMembers(self.cnxn, 888, [111, 222])
    self.mox.VerifyAll()

  def testUpdateMembers(self):
    """UpdateMembers deletes the old rows, then inserts rows with the role."""
    self.usergroup_service.group_dag.initialized = True
    self.usergroup_service.usergroup_tbl.Delete(
        self.cnxn, group_id=888, user_id=[111, 222])
    self.usergroup_service.usergroup_tbl.InsertRows(
        self.cnxn, ['user_id', 'group_id', 'role'],
        [(111, 888, 'member'), (222, 888, 'member')])
    self.SetUpLookupAllMembers([111, 222], [], {}, {})
    self.mox.ReplayAll()
    self.usergroup_service.UpdateMembers(
        self.cnxn, 888, [111, 222], 'member')
    self.mox.VerifyAll()

  def testUpdateMembers_CircleDetection(self):
    """Adding a parent group as a member of its child raises an error."""
    # Two groups: 888 and 999 while 999 is a member of 888.
    self.SetUpDAG([(888,), (999,)], [(999, 888)])
    self.mox.ReplayAll()
    self.assertRaises(
        exceptions.CircularGroupException,
        self.usergroup_service.UpdateMembers, self.cnxn, 999, [888], 'member')
    self.mox.VerifyAll()

  def SetUpLookupAllMembers(
      self, group_ids, direct_member_rows,
      descedants_dict, indirect_member_rows_dict):
    """Record the Selects expected when looking up all members of groups.

    Args:
      group_ids: group IDs expected in the direct-membership Select.
      direct_member_rows: (user_id, group_id, role) rows that Select returns.
      descedants_dict: maps a group ID to its descendant group IDs; a
        further Select is recorded for each group with a non-empty entry.
      indirect_member_rows_dict: maps a group ID to the (user_id,) rows
        returned by that group's descendant Select.
    """
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id', 'role'], distinct=True,
        group_id=group_ids).AndReturn(direct_member_rows)
    for gid in group_ids:
      descendant_ids = descedants_dict.get(gid, [])
      if descendant_ids:
        self.usergroup_service.usergroup_tbl.Select(
            self.cnxn, cols=['user_id'], distinct=True,
            group_id=descendant_ids).AndReturn(
                indirect_member_rows_dict.get(gid, []))

  def testLookupAllMembers(self):
    """Members of nested groups are reported for the top-level group."""
    self.usergroup_service.group_dag.initialized = True
    self.usergroup_service.group_dag.user_group_children = (
        collections.defaultdict(list))
    self.usergroup_service.group_dag.user_group_children[777] = [888]
    self.usergroup_service.group_dag.user_group_children[888] = [999]
    self.SetUpLookupAllMembers(
        [777],
        [(888, 777, 'member'), (111, 888, 'member'), (999, 888, 'member'),
         (222, 999, 'member')],
        {777: [888, 999]},
        {777: [(111,), (222,), (999,)]})

    self.mox.ReplayAll()
    members_dict, owners_dict = self.usergroup_service.LookupAllMembers(
        self.cnxn, [777])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222, 888, 999], members_dict[777])
    self.assertItemsEqual([], owners_dict[777])

  def testExpandAnyGroupEmailRecipients(self):
    """Recipients are split into direct and indirect lists."""
    self.usergroup_service.group_dag.initialized = True
    self.SetUpDetermineWhichUserIDsAreGroups(
        [111, 777, 888, 999], [777, 888, 999])
    # The last two columns of each settings row differ per group
    # (presumably notify_members and notify_group -- confirm against
    # usergroup_svc.USERGROUPSETTINGS_COLS).
    self.SetUpGetGroupSettings(
        [777, 888, 999],
        [(777, 'anyone', None, 0, 1, 0),
         (888, 'anyone', None, 0, 0, 1),
         (999, 'anyone', None, 0, 1, 1)],
    )
    self.SetUpLookupAllMembers(
        [777, 888, 999],
        [(222, 777, 'member'), (333, 888, 'member'), (444, 999, 'member')],
        {}, {})
    self.mox.ReplayAll()
    direct, indirect = self.usergroup_service.ExpandAnyGroupEmailRecipients(
        self.cnxn, [111, 777, 888, 999])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 888, 999], direct)
    self.assertItemsEqual([222, 444], indirect)

  def SetUpLookupMembers(self, group_member_dict):
    """Record the Select expected when looking up direct group members.

    Args:
      group_member_dict: maps each group ID to a list of its member user
        IDs; every member is returned with the role 'member'.
    """
    # The expected group_id condition is the sorted list of group IDs.
    group_ids = sorted(group_member_dict)
    mock_membership_rows = [
        (uid, gid, 'member')
        for gid, members in group_member_dict.items()
        for uid in members]
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id','group_id', 'role'], distinct=True,
        group_id=group_ids).AndReturn(mock_membership_rows)

  def testLookupMembers_NoneRequested(self):
    """Requesting no groups performs no query and returns nothing."""
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [])
    self.mox.VerifyAll()
    self.assertItemsEqual({}, member_ids)

  def testLookupMembers_Nonexistent(self):
    """If some requested groups don't exist, they are ignored."""
    self.SetUpLookupMembers({777: []})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [777])
    self.mox.VerifyAll()
    self.assertItemsEqual([], member_ids[777])

  def testLookupMembers_AllEmpty(self):
    """Requesting all empty groups results in no members."""
    self.SetUpLookupMembers({888: [], 999: []})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888, 999])
    self.mox.VerifyAll()
    self.assertItemsEqual([], member_ids[888])

  def testLookupMembers_OneGroup(self):
    """The members of a single group are returned under its ID."""
    self.SetUpLookupMembers({888: [111, 222]})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222], member_ids[888])

  def testLookupMembers_GroupsAndNonGroups(self):
    """We ignore any non-groups passed in."""
    self.SetUpLookupMembers({111: [], 333: [], 888: [111, 222]})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(
        self.cnxn, [111, 333, 888])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222], member_ids[888])

  def testLookupMembers_OverlappingGroups(self):
    """We get the union of IDs.  Imagine 888 = {111} and 999 = {111, 222}."""
    self.SetUpLookupMembers({888: [111], 999: [111, 222]})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888, 999])
    self.mox.VerifyAll()
    self.assertItemsEqual([111, 222], member_ids[999])
    self.assertItemsEqual([111], member_ids[888])

  def testLookupVisibleMembers_LimitedVisiblity(self):
    """We get only the member IDs in groups that the user is allowed to see."""
    self.usergroup_service.group_dag.initialized = True
    self.SetUpGetGroupSettings(
        [888, 999],
        [(888, 'anyone', None, 0, 1, 0), (999, 'members', None, 0, 1, 0)])
    self.SetUpLookupMembers({888: [111], 999: [111]})
    self.SetUpLookupAllMembers(
        [888, 999], [(111, 888, 'member'), (111, 999, 'member')], {}, {})
    self.mox.ReplayAll()
    member_ids, _ = self.usergroup_service.LookupVisibleMembers(
        self.cnxn, [888, 999], permissions.USER_PERMISSIONSET, set(),
        self.services)
    self.mox.VerifyAll()
    self.assertItemsEqual([111], member_ids[888])
    self.assertNotIn(999, member_ids)

  def SetUpGetAllUserGroupsInfo(self, mock_settings_rows, mock_count_rows,
                                mock_friends=None):
    """Record the Selects expected when listing every user group.

    Args:
      mock_settings_rows: rows returned by the joined settings Select; the
        second column of each row is used as the group ID.
      mock_count_rows: (group_id, count) rows returned by the count Select.
      mock_friends: rows returned by the projects Select; defaults to [].
    """
    mock_friends = mock_friends or []
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['email', 'group_id', 'who_can_view_members',
                         'external_group_type', 'last_sync_time',
                         'notify_members', 'notify_group'],
        left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])]
        ).AndReturn(mock_settings_rows)
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['group_id', 'COUNT(*)'],
        group_by=['group_id']).AndReturn(mock_count_rows)

    group_ids = [g[1] for g in mock_settings_rows]
    self.usergroup_service.usergroupprojects_tbl.Select(
        self.cnxn, cols=usergroup_svc.USERGROUPPROJECTS_COLS,
        group_id=group_ids).AndReturn(mock_friends)

  def testGetAllUserGroupsInfo(self):
    """Each info tuple holds the email, member count, settings and ID."""
    self.SetUpGetAllUserGroupsInfo(
        [('group@example.com', 888, 'anyone', None, 0, 1, 0)],
        [(888, 12)])
    self.mox.ReplayAll()
    actual_infos = self.usergroup_service.GetAllUserGroupsInfo(self.cnxn)
    self.mox.VerifyAll()
    self.assertEqual(1, len(actual_infos))
    addr, count, group_settings, group_id = actual_infos[0]
    self.assertEqual('group@example.com', addr)
    self.assertEqual(12, count)
    self.assertEqual(usergroup_pb2.MemberVisibility.ANYONE,
                     group_settings.who_can_view_members)
    self.assertEqual(888, group_id)

  def SetUpGetGroupSettings(self, group_ids, mock_result_rows,
                            mock_friends=None):
    """Record the Selects expected when reading group settings.

    Args:
      group_ids: group IDs expected in both Select conditions.
      mock_result_rows: rows returned by the settings Select.
      mock_friends: rows returned by the projects Select; defaults to [].
    """
    mock_friends = mock_friends or []
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=usergroup_svc.USERGROUPSETTINGS_COLS,
        group_id=group_ids).AndReturn(mock_result_rows)
    self.usergroup_service.usergroupprojects_tbl.Select(
        self.cnxn, cols=usergroup_svc.USERGROUPPROJECTS_COLS,
        group_id=group_ids).AndReturn(mock_friends)

  def testGetGroupSettings_NoGroupsRequested(self):
    """Requesting no groups yields an empty settings dict."""
    self.SetUpGetGroupSettings([], [])
    self.mox.ReplayAll()
    actual_settings_dict = self.usergroup_service.GetAllGroupSettings(
        self.cnxn, [])
    self.mox.VerifyAll()
    self.assertEqual({}, actual_settings_dict)

  def testGetGroupSettings_NoGroupsFound(self):
    """A group with no settings row is absent from the result."""
    self.SetUpGetGroupSettings([777], [])
    self.mox.ReplayAll()
    actual_settings_dict = self.usergroup_service.GetAllGroupSettings(
        self.cnxn, [777])
    self.mox.VerifyAll()
    self.assertEqual({}, actual_settings_dict)

  def testGetGroupSettings_SomeGroups(self):
    """Only the groups that have settings rows appear in the result."""
    self.SetUpGetGroupSettings(
        [777, 888, 999],
        [(888, 'anyone', None, 0, 1, 0), (999, 'members', None, 0, 1, 0)])
    self.mox.ReplayAll()
    actual_settings_dict = self.usergroup_service.GetAllGroupSettings(
        self.cnxn, [777, 888, 999])
    self.mox.VerifyAll()
    self.assertEqual(
        {888: usergroup_pb2.MakeSettings('anyone'),
         999: usergroup_pb2.MakeSettings('members')},
        actual_settings_dict)

  def testGetGroupSettings_NoSuchGroup(self):
    """GetGroupSettings returns None for a group without settings."""
    self.SetUpGetGroupSettings([777], [])
    self.mox.ReplayAll()
    actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 777)
    self.mox.VerifyAll()
    self.assertIsNone(actual_settings)

  def testGetGroupSettings_Found(self):
    """The visibility column is parsed into the MemberVisibility enum."""
    self.SetUpGetGroupSettings([888], [(888, 'anyone', None, 0, 1, 0)])
    self.mox.ReplayAll()
    actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 888)
    self.mox.VerifyAll()
    self.assertEqual(
        usergroup_pb2.MemberVisibility.ANYONE,
        actual_settings.who_can_view_members)

  def testGetGroupSettings_Import(self):
    """The external group type column is parsed into the GroupType enum."""
    self.SetUpGetGroupSettings(
        [888], [(888, 'owners', 'mdb', 0, 1, 0)])
    self.mox.ReplayAll()
    actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 888)
    self.mox.VerifyAll()
    self.assertEqual(
        usergroup_pb2.MemberVisibility.OWNERS,
        actual_settings.who_can_view_members)
    self.assertEqual(
        usergroup_pb2.GroupType.MDB,
        actual_settings.ext_group_type)

  def SetUpUpdateSettings(self, group_id, visiblity, external_group_type=None,
                          last_sync_time=0, friend_projects=None,
                          notify_members=True, notify_group=False):
    """Record the table writes expected when group settings are saved.

    Args:
      group_id: ID of the group whose settings are written.
      visiblity: expected who_can_view_members value.
      external_group_type: expected external_group_type value.
      last_sync_time: expected last_sync_time value.
      friend_projects: project IDs expected to be inserted as
        (group_id, project_id) rows; no insert is recorded when empty.
      notify_members: expected notify_members value.
      notify_group: expected notify_group value.
    """
    friend_projects = friend_projects or []
    self.usergroup_service.usergroupsettings_tbl.InsertRow(
        self.cnxn, group_id=group_id, who_can_view_members=visiblity,
        external_group_type=external_group_type,
        last_sync_time=last_sync_time, notify_members=notify_members,
        notify_group=notify_group, replace=True)
    self.usergroup_service.usergroupprojects_tbl.Delete(
        self.cnxn, group_id=group_id)
    if friend_projects:
      rows = [(group_id, p_id) for p_id in friend_projects]
      self.usergroup_service.usergroupprojects_tbl.InsertRows(
          self.cnxn, ['group_id', 'project_id'], rows)

  def testUpdateSettings_Normal(self):
    """Default settings are written with a single replace-insert."""
    self.SetUpUpdateSettings(888, 'anyone')
    self.mox.ReplayAll()
    self.usergroup_service.UpdateSettings(
        self.cnxn, 888, usergroup_pb2.MakeSettings('anyone'))
    self.mox.VerifyAll()

  def testUpdateSettings_Import(self):
    """The external group type is written along with the visibility."""
    self.SetUpUpdateSettings(888, 'owners', 'mdb')
    self.mox.ReplayAll()
    self.usergroup_service.UpdateSettings(
        self.cnxn, 888,
        usergroup_pb2.MakeSettings('owners', 'mdb'))
    self.mox.VerifyAll()

  def testUpdateSettings_WithFriends(self):
    """Friend projects are inserted as (group_id, project_id) rows."""
    self.SetUpUpdateSettings(888, 'anyone', friend_projects=[789])
    self.mox.ReplayAll()
    self.usergroup_service.UpdateSettings(
        self.cnxn, 888,
        usergroup_pb2.MakeSettings('anyone', friend_projects=[789]))
    self.mox.VerifyAll()

  def testExpungeUsersInGroups(self):
    """Expunging deletes the IDs from every usergroup table, uncommitted."""
    self.usergroup_service.usergroupprojects_tbl.Delete = mock.Mock()
    self.usergroup_service.usergroupsettings_tbl.Delete = mock.Mock()
    self.usergroup_service.usergroup_tbl.Delete = mock.Mock()

    ids = [222, 333, 444]
    self.usergroup_service.ExpungeUsersInGroups(self.cnxn, ids)

    self.usergroup_service.usergroupprojects_tbl.Delete.assert_called_once_with(
        self.cnxn, group_id=ids, commit=False)
    self.usergroup_service.usergroupsettings_tbl.Delete.assert_called_once_with(
        self.cnxn, group_id=ids, commit=False)
    # The membership table is cleared twice: once treating the IDs as
    # groups and once treating them as members.
    self.usergroup_service.usergroup_tbl.Delete.assert_has_calls(
        [mock.call(self.cnxn, group_id=ids, commit=False),
         mock.call(self.cnxn, user_id=ids, commit=False)])

  def SetUpDAG(self, group_id_rows, usergroup_rows):
    """Record the two Selects expected when the group DAG is built.

    Args:
      group_id_rows: (group_id,) rows returned by the settings Select.
      usergroup_rows: (user_id, group_id) rows returned by the membership
        Select, which is expected to be limited to the group IDs above.
    """
    self.usergroup_service.usergroupsettings_tbl.Select(
        self.cnxn, cols=['group_id']).AndReturn(group_id_rows)
    self.usergroup_service.usergroup_tbl.Select(
        self.cnxn, cols=['user_id', 'group_id'], distinct=True,
        user_id=[r[0] for r in group_id_rows]).AndReturn(usergroup_rows)

  def testDAG_Build(self):
    """Build replaces stale entries with the relations read from the DB."""
    # Old entries should go away after rebuilding
    self.usergroup_service.group_dag.user_group_parents = (
        collections.defaultdict(list))
    self.usergroup_service.group_dag.user_group_parents[111] = [222]
    # Two groups: 888 and 999 while 999 is a member of 888.
    self.SetUpDAG([(888,), (999,)], [(999, 888)])
    self.mox.ReplayAll()
    self.usergroup_service.group_dag.Build(self.cnxn)
    self.mox.VerifyAll()
    self.assertIn(888, self.usergroup_service.group_dag.user_group_children)
    self.assertIn(999, self.usergroup_service.group_dag.user_group_parents)
    self.assertNotIn(111, self.usergroup_service.group_dag.user_group_parents)

  def testDAG_GetAllAncestors(self):
    """Ancestors include both direct and transitive parent groups."""
    # Three groups: 777, 888 and 999.
    # 999 is a direct member of 888, and 888 is a direct member of 777.
    self.SetUpDAG([(777,), (888,), (999,)], [(999, 888), (888, 777)])
    self.mox.ReplayAll()
    ancestors = self.usergroup_service.group_dag.GetAllAncestors(
        self.cnxn, 999)
    self.mox.VerifyAll()
    ancestors.sort()
    self.assertEqual([777, 888], ancestors)

  def testDAG_GetAllAncestorsDiamond(self):
    """A shared ancestor reached by two paths is reported only once."""
    # Four groups: 666, 777, 888 and 999.
    # 999 is a direct member of both 888 and 777,
    # 888 is a direct member of 666, and 777 is also a direct member of 666.
    self.SetUpDAG([(666, ), (777,), (888,), (999,)],
                  [(999, 888), (999, 777), (888, 666), (777, 666)])
    self.mox.ReplayAll()
    ancestors = self.usergroup_service.group_dag.GetAllAncestors(
        self.cnxn, 999)
    self.mox.VerifyAll()
    ancestors.sort()
    self.assertEqual([666, 777, 888], ancestors)

  def testDAG_GetAllDescendants(self):
    """A shared descendant reached by two paths is reported only once."""
    # Four groups: 666, 777, 888 and 999.
    # 999 is a direct member of both 888 and 777,
    # 888 is a direct member of 666, and 777 is also a direct member of 666.
    self.SetUpDAG([(666, ), (777,), (888,), (999,)],
                  [(999, 888), (999, 777), (888, 666), (777, 666)])
    self.mox.ReplayAll()
    descendants = self.usergroup_service.group_dag.GetAllDescendants(
        self.cnxn, 666)
    self.mox.VerifyAll()
    descendants.sort()
    self.assertEqual([777, 888, 999], descendants)

  def testDAG_IsChild(self):
    """IsChild is true for a direct member and false for a sibling group."""
    # Four groups: 666, 777, 888 and 999.
    # 999 is a direct member of both 888 and 777,
    # 888 is a direct member of 666, and 777 is also a direct member of 666.
    self.SetUpDAG([(666, ), (777,), (888,), (999,)],
                  [(999, 888), (999, 777), (888, 666), (777, 666)])
    self.mox.ReplayAll()
    result1 = self.usergroup_service.group_dag.IsChild(
        self.cnxn, 777, 666)
    result2 = self.usergroup_service.group_dag.IsChild(
        self.cnxn, 777, 888)
    self.mox.VerifyAll()
    self.assertTrue(result1)
    self.assertFalse(result2)