Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/framework/test/servlet_test.py b/framework/test/servlet_test.py
new file mode 100644
index 0000000..40d5ed2
--- /dev/null
+++ b/framework/test/servlet_test.py
@@ -0,0 +1,474 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Unit tests for servlet base class module."""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import time
+import mock
+import unittest
+
+from google.appengine.api import app_identity
+from google.appengine.ext import testbed
+
+import webapp2
+
+from framework import framework_constants
+from framework import servlet
+from framework import xsrf
+from proto import project_pb2
+from proto import tracker_pb2
+from proto import user_pb2
+from services import service_manager
+from testing import fake
+from testing import testing_helpers
+
+
+class TestableServlet(servlet.Servlet):
+  """A tiny concrete subclass of abstract class Servlet."""
+
+  def __init__(self, request, response, services=None, do_post_redirect=True):
+    super(TestableServlet, self).__init__(request, response, services=services)
+    self.do_post_redirect = do_post_redirect
+    self.seen_post_data = None
+
+  def ProcessFormData(self, _mr, post_data):
+    self.seen_post_data = post_data
+    if self.do_post_redirect:
+      return '/This/Is?The=Next#Page'
+    else:
+      self.response.write('sending raw data to browser')
+
+
+class ServletTest(unittest.TestCase):
+
+  def setUp(self):
+    services = service_manager.Services(
+        project=fake.ProjectService(),
+        project_star=fake.ProjectStarService(),
+        user=fake.UserService(),
+        usergroup=fake.UserGroupService())
+    services.user.TestAddUser('user@example.com', 111)
+    self.page_class = TestableServlet(
+        webapp2.Request.blank('/'), webapp2.Response(), services=services)
+    self.testbed = testbed.Testbed()
+    self.testbed.activate()
+    self.testbed.init_user_stub()
+    self.testbed.init_memcache_stub()
+    self.testbed.init_datastore_v3_stub()
+
+  def tearDown(self):
+    self.testbed.deactivate()
+
+  def testDefaultValues(self):
+    self.assertEqual(None, self.page_class._MAIN_TAB_MODE)
+    self.assertTrue(self.page_class._TEMPLATE_PATH.endswith('/templates/'))
+    self.assertEqual(None, self.page_class._PAGE_TEMPLATE)
+
+  def testGatherBaseData(self):
+    project = self.page_class.services.project.TestAddProject(
+        'testproj', state=project_pb2.ProjectState.LIVE)
+    project.cached_content_timestamp = 12345
+
+    (_request, mr) = testing_helpers.GetRequestObjects(
+        path='/p/testproj/feeds', project=project)
+    nonce = '1a2b3c4d5e6f7g'
+
+    base_data = self.page_class.GatherBaseData(mr, nonce)
+
+    self.assertEqual(base_data['nonce'], nonce)
+    self.assertEqual(base_data['projectname'], 'testproj')
+    self.assertEqual(base_data['project'].cached_content_timestamp, 12345)
+    self.assertEqual(base_data['project_alert'], None)
+
+    self.assertTrue(base_data['currentPageURL'].endswith('/p/testproj/feeds'))
+    self.assertTrue(
+        base_data['currentPageURLEncoded'].endswith('%2Fp%2Ftestproj%2Ffeeds'))
+
+  def testFormHandlerURL(self):
+    self.assertEqual('/edit.do', self.page_class._FormHandlerURL('/'))
+    self.assertEqual(
+      '/something/edit.do',
+      self.page_class._FormHandlerURL('/something/'))
+    self.assertEqual(
+      '/something/edit.do',
+      self.page_class._FormHandlerURL('/something/edit.do'))
+    self.assertEqual(
+      '/something/detail_ezt.do',
+      self.page_class._FormHandlerURL('/something/detail_ezt'))
+
+  def testProcessForm_BadToken(self):
+    user_id = 111
+    token = 'no soup for you'
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/we/we/we?so=excited',
+        params={
+            'yesterday': 'thursday',
+            'today': 'friday',
+            'token': token
+        },
+        user_info={'user_id': user_id},
+        method='POST',
+    )
+    self.assertRaises(
+        xsrf.TokenIncorrect, self.page_class._DoFormProcessing, request, mr)
+    self.assertEqual(None, self.page_class.seen_post_data)
+
+  def testProcessForm_XhrAllowed_BadToken(self):
+    user_id = 111
+    token = 'no soup for you'
+
+    self.page_class.ALLOW_XHR = True
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/we/we/we?so=excited',
+        params={
+            'yesterday': 'thursday',
+            'today': 'friday',
+            'token': token
+        },
+        user_info={'user_id': user_id},
+        method='POST',
+    )
+    self.assertRaises(
+        xsrf.TokenIncorrect, self.page_class._DoFormProcessing, request, mr)
+    self.assertEqual(None, self.page_class.seen_post_data)
+
+  def testProcessForm_XhrAllowed_AcceptsPathToken(self):
+    user_id = 111
+    token = xsrf.GenerateToken(user_id, '/we/we/we')
+
+    self.page_class.ALLOW_XHR = True
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/we/we/we?so=excited',
+        params={
+            'yesterday': 'thursday',
+            'today': 'friday',
+            'token': token
+        },
+        user_info={'user_id': user_id},
+        method='POST',
+    )
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._DoFormProcessing(request, mr)
+    self.assertEqual(302, cm.exception.code)  # forms redirect on success
+
+    self.assertDictEqual(
+        {
+            'yesterday': 'thursday',
+            'today': 'friday',
+            'token': token
+        }, dict(self.page_class.seen_post_data))
+
+  def testProcessForm_XhrAllowed_AcceptsXhrToken(self):
+    user_id = 111
+    token = xsrf.GenerateToken(user_id, 'xhr')
+
+    self.page_class.ALLOW_XHR = True
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/we/we/we?so=excited',
+        params={'yesterday': 'thursday', 'today': 'friday', 'token': token},
+        user_info={'user_id': user_id},
+        method='POST',
+    )
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._DoFormProcessing(request, mr)
+    self.assertEqual(302, cm.exception.code)  # forms redirect on success
+
+    self.assertDictEqual(
+        {
+            'yesterday': 'thursday',
+            'today': 'friday',
+            'token': token
+        }, dict(self.page_class.seen_post_data))
+
+  def testProcessForm_RawResponse(self):
+    user_id = 111
+    token = xsrf.GenerateToken(user_id, '/we/we/we')
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/we/we/we?so=excited',
+        params={'yesterday': 'thursday', 'today': 'friday', 'token': token},
+        user_info={'user_id': user_id},
+        method='POST',
+    )
+    self.page_class.do_post_redirect = False
+    self.page_class._DoFormProcessing(request, mr)
+    self.assertEqual(
+        'sending raw data to browser',
+        self.page_class.response.body)
+
+  def testProcessForm_Normal(self):
+    user_id = 111
+    token = xsrf.GenerateToken(user_id, '/we/we/we')
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/we/we/we?so=excited',
+        params={'yesterday': 'thursday', 'today': 'friday', 'token': token},
+        user_info={'user_id': user_id},
+        method='POST',
+    )
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._DoFormProcessing(request, mr)
+    self.assertEqual(302, cm.exception.code)  # forms redirect on success
+
+    self.assertDictEqual(
+        {'yesterday': 'thursday', 'today': 'friday', 'token': token},
+        dict(self.page_class.seen_post_data))
+
+  def testCalcProjectAlert(self):
+    project = fake.Project(
+        project_name='alerttest', state=project_pb2.ProjectState.LIVE)
+
+    project_alert = servlet._CalcProjectAlert(project)
+    self.assertEqual(project_alert, None)
+
+    project.state = project_pb2.ProjectState.ARCHIVED
+    project_alert = servlet._CalcProjectAlert(project)
+    self.assertEqual(
+        project_alert,
+        'Project is archived: read-only by members only.')
+
+    delete_time = int(time.time() + framework_constants.SECS_PER_DAY * 1.5)
+    project.delete_time = delete_time
+    project_alert = servlet._CalcProjectAlert(project)
+    self.assertEqual(project_alert, 'Scheduled for deletion in 1 day.')
+
+    delete_time = int(time.time() + framework_constants.SECS_PER_DAY * 2.5)
+    project.delete_time = delete_time
+    project_alert = servlet._CalcProjectAlert(project)
+    self.assertEqual(project_alert, 'Scheduled for deletion in 2 days.')
+
+  def testCheckForMovedProject_NoRedirect(self):
+    project = fake.Project(
+        project_name='proj', state=project_pb2.ProjectState.LIVE)
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj', project=project)
+    self.page_class._CheckForMovedProject(mr, request)
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/source/browse/p/adminAdvanced', project=project)
+    self.page_class._CheckForMovedProject(mr, request)
+
+  def testCheckForMovedProject_Redirect(self):
+    project = fake.Project(project_name='proj', moved_to='http://example.com')
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj', project=project)
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._CheckForMovedProject(mr, request)
+    self.assertEqual(302, cm.exception.code)  # redirect because project moved
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/source/browse/p/adminAdvanced', project=project)
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._CheckForMovedProject(mr, request)
+    self.assertEqual(302, cm.exception.code)  # redirect because project moved
+
+  def testCheckForMovedProject_AdminAdvanced(self):
+    """We do not redirect away from the page that edits project state."""
+    project = fake.Project(project_name='proj', moved_to='http://example.com')
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/adminAdvanced', project=project)
+    self.page_class._CheckForMovedProject(mr, request)
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/adminAdvanced?ts=123234', project=project)
+    self.page_class._CheckForMovedProject(mr, request)
+
+    request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/adminAdvanced.do', project=project)
+    self.page_class._CheckForMovedProject(mr, request)
+
+  @mock.patch('settings.branded_domains',
+              {'proj': 'branded.example.com', '*': 'bugs.chromium.org'})
+  def testMaybeRedirectToBrandedDomain_RedirBrandedProject(self):
+    """We redirect for a branded project if the user typed a different host."""
+    project = fake.Project(project_name='proj')
+    request, _mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/path', project=project)
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._MaybeRedirectToBrandedDomain(request, 'proj')
+    self.assertEqual(302, cm.exception.code)  # forms redirect on success
+    self.assertEqual('https://branded.example.com/p/proj/path?redir=1',
+                     cm.exception.location)
+
+    request, _mr = testing_helpers.GetRequestObjects(
+      path='/p/proj/path?query', project=project)
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._MaybeRedirectToBrandedDomain(request, 'proj')
+    self.assertEqual(302, cm.exception.code)  # forms redirect on success
+    self.assertEqual('https://branded.example.com/p/proj/path?query&redir=1',
+                     cm.exception.location)
+
+  @mock.patch('settings.branded_domains',
+              {'proj': 'branded.example.com', '*': 'bugs.chromium.org'})
+  def testMaybeRedirectToBrandedDomain_AvoidRedirLoops(self):
+    """Don't redirect for a branded project if already redirected."""
+    project = fake.Project(project_name='proj')
+    request, _mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/path?redir=1', project=project)
+    # No redirect happens.
+    self.page_class._MaybeRedirectToBrandedDomain(request, 'proj')
+
+  @mock.patch('settings.branded_domains',
+              {'proj': 'branded.example.com', '*': 'bugs.chromium.org'})
+  def testMaybeRedirectToBrandedDomain_NonProjectPage(self):
+    """Don't redirect for a branded project if not in any project."""
+    request, _mr = testing_helpers.GetRequestObjects(
+        path='/u/user@example.com')
+    # No redirect happens.
+    self.page_class._MaybeRedirectToBrandedDomain(request, None)
+
+  @mock.patch('settings.branded_domains',
+              {'proj': 'branded.example.com', '*': 'bugs.chromium.org'})
+  def testMaybeRedirectToBrandedDomain_AlreadyOnBrandedHost(self):
+    """Don't redirect for a branded project if already on branded domain."""
+    project = fake.Project(project_name='proj')
+    request, _mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/path', project=project)
+    request.host = 'branded.example.com'
+    # No redirect happens.
+    self.page_class._MaybeRedirectToBrandedDomain(request, 'proj')
+
+  @mock.patch('settings.branded_domains',
+              {'proj': 'branded.example.com', '*': 'bugs.chromium.org'})
+  def testMaybeRedirectToBrandedDomain_Localhost(self):
+    """Don't redirect for a branded project on localhost."""
+    project = fake.Project(project_name='proj')
+    request, _mr = testing_helpers.GetRequestObjects(
+        path='/p/proj/path', project=project)
+    request.host = 'localhost:8080'
+    # No redirect happens.
+    self.page_class._MaybeRedirectToBrandedDomain(request, 'proj')
+
+    request.host = '0.0.0.0:8080'
+    # No redirect happens.
+    self.page_class._MaybeRedirectToBrandedDomain(request, 'proj')
+
+  @mock.patch('settings.branded_domains',
+              {'proj': 'branded.example.com', '*': 'bugs.chromium.org'})
+  def testMaybeRedirectToBrandedDomain_NotBranded(self):
+    """Don't redirect for a non-branded project."""
+    project = fake.Project(project_name='other')
+    request, _mr = testing_helpers.GetRequestObjects(
+        path='/p/other/path?query', project=project)
+    request.host = 'branded.example.com'  # But other project is unbranded.
+
+    with self.assertRaises(webapp2.HTTPException) as cm:
+      self.page_class._MaybeRedirectToBrandedDomain(request, 'other')
+    self.assertEqual(302, cm.exception.code)  # forms redirect on success
+    self.assertEqual('https://bugs.chromium.org/p/other/path?query&redir=1',
+                     cm.exception.location)
+
+  def testGatherHelpData_Normal(self):
+    project = fake.Project(project_name='proj')
+    _request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj', project=project)
+    help_data = self.page_class.GatherHelpData(mr, {})
+    self.assertEqual(None, help_data['cue'])
+    self.assertEqual(None, help_data['account_cue'])
+
+  def testGatherHelpData_VacationReminder(self):
+    project = fake.Project(project_name='proj')
+    _request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj', project=project)
+    mr.auth.user_id = 111
+    mr.auth.user_pb.vacation_message = 'Gone skiing'
+    help_data = self.page_class.GatherHelpData(mr, {})
+    self.assertEqual('you_are_on_vacation', help_data['cue'])
+
+    self.page_class.services.user.SetUserPrefs(
+        'cnxn', 111,
+        [user_pb2.UserPrefValue(name='you_are_on_vacation', value='true')])
+    help_data = self.page_class.GatherHelpData(mr, {})
+    self.assertEqual(None, help_data['cue'])
+    self.assertEqual(None, help_data['account_cue'])
+
+  def testGatherHelpData_YouAreBouncing(self):
+    project = fake.Project(project_name='proj')
+    _request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj', project=project)
+    mr.auth.user_id = 111
+    mr.auth.user_pb.email_bounce_timestamp = 1497647529
+    help_data = self.page_class.GatherHelpData(mr, {})
+    self.assertEqual('your_email_bounced', help_data['cue'])
+
+    self.page_class.services.user.SetUserPrefs(
+        'cnxn', 111,
+        [user_pb2.UserPrefValue(name='your_email_bounced', value='true')])
+    help_data = self.page_class.GatherHelpData(mr, {})
+    self.assertEqual(None, help_data['cue'])
+    self.assertEqual(None, help_data['account_cue'])
+
+  def testGatherHelpData_ChildAccount(self):
+    """Display a warning when user is signed in to a child account."""
+    project = fake.Project(project_name='proj')
+    _request, mr = testing_helpers.GetRequestObjects(
+        path='/p/proj', project=project)
+    mr.auth.user_pb.linked_parent_id = 111
+    help_data = self.page_class.GatherHelpData(mr, {})
+    self.assertEqual(None, help_data['cue'])
+    self.assertEqual('switch_to_parent_account', help_data['account_cue'])
+    self.assertEqual('user@example.com', help_data['parent_email'])
+
+  def testGatherDebugData_Visibility(self):
+    project = fake.Project(
+        project_name='testtest', state=project_pb2.ProjectState.LIVE)
+    _request, mr = testing_helpers.GetRequestObjects(
+        path='/p/foo/servlet_path', project=project)
+    debug_data = self.page_class.GatherDebugData(mr, {})
+    self.assertEqual('off', debug_data['dbg'])
+
+    _request, mr = testing_helpers.GetRequestObjects(
+        path='/p/foo/servlet_path?debug=1', project=project)
+    debug_data = self.page_class.GatherDebugData(mr, {})
+    self.assertEqual('on', debug_data['dbg'])
+
+
+class ProjectIsRestrictedTest(unittest.TestCase):
+
+  def testNonRestrictedProject(self):
+    proj = project_pb2.Project()
+    mr = testing_helpers.MakeMonorailRequest()
+    mr.project = proj
+
+    proj.access = project_pb2.ProjectAccess.ANYONE
+    proj.state = project_pb2.ProjectState.LIVE
+    self.assertFalse(servlet._ProjectIsRestricted(mr))
+
+    proj.state = project_pb2.ProjectState.ARCHIVED
+    self.assertFalse(servlet._ProjectIsRestricted(mr))
+
+  def testRestrictedProject(self):
+    proj = project_pb2.Project()
+    mr = testing_helpers.MakeMonorailRequest()
+    mr.project = proj
+
+    proj.state = project_pb2.ProjectState.LIVE
+    proj.access = project_pb2.ProjectAccess.MEMBERS_ONLY
+    self.assertTrue(servlet._ProjectIsRestricted(mr))
+
+class VersionBaseTest(unittest.TestCase):
+
+  @mock.patch('settings.local_mode', True)
+  def testLocalhost(self):
+    request = webapp2.Request.blank('/', base_url='http://localhost:8080')
+    actual = servlet._VersionBaseURL(request)
+    expected = 'http://localhost:8080'
+    self.assertEqual(expected, actual)
+
+  @mock.patch('settings.local_mode', False)
+  @mock.patch('google.appengine.api.app_identity.get_default_version_hostname')
+  def testProd(self, mock_gdvh):
+    mock_gdvh.return_value = 'monorail-prod.appspot.com'
+    request = webapp2.Request.blank('/', base_url='https://bugs.chromium.org')
+    actual = servlet._VersionBaseURL(request)
+    expected = 'https://test-dot-monorail-prod.appspot.com'
+    self.assertEqual(expected, actual)