Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/api/test/sitewide_servicer_test.py b/api/test/sitewide_servicer_test.py
new file mode 100644
index 0000000..3259fbb
--- /dev/null
+++ b/api/test/sitewide_servicer_test.py
@@ -0,0 +1,143 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Tests for the sitewide servicer."""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import time
+import unittest
+
+import mock
+from components.prpc import codes
+from components.prpc import context
+from components.prpc import server
+
+import settings
+from api import sitewide_servicer
+from api.api_proto import common_pb2
+from api.api_proto import sitewide_pb2
+from framework import monorailcontext
+from framework import xsrf
+from services import service_manager
+from testing import fake
+
+
+class SitewideServicerTest(unittest.TestCase):
+  """Tests for the RefreshToken and GetServerStatus handlers."""
+
+  def setUp(self):
+    """Create fake services, one user, and the servicer under test."""
+    self.cnxn = fake.MonorailConnection()
+    self.services = service_manager.Services(
+        usergroup=fake.UserGroupService(),
+        user=fake.UserService())
+    self.user_1 = self.services.user.TestAddUser('owner@example.com', 111)
+    self.sitewide_svcr = sitewide_servicer.SitewideServicer(
+        self.services, make_rate_limiter=False)
+
+  def CallWrapped(self, wrapped_handler, *args, **kwargs):
+    """Call wrapped_handler.wrapped with the servicer as its first argument."""
+    return wrapped_handler.wrapped(self.sitewide_svcr, *args, **kwargs)
+
+  def _MakeContext(self):
+    """Return a MonorailContext with owner@example.com as the requester."""
+    return monorailcontext.MonorailContext(
+        self.services, cnxn=self.cnxn, requester='owner@example.com')
+
+  def _GetServerStatus(self):
+    """Send an empty GetServerStatusRequest and return the response."""
+    request = sitewide_pb2.GetServerStatusRequest()
+    return self.CallWrapped(
+        self.sitewide_svcr.GetServerStatus, self._MakeContext(), request)
+
+  @mock.patch('services.secrets_svc.GetXSRFKey')
+  @mock.patch('time.time')
+  def testRefreshToken(self, mockTime, mockGetXSRFKey):
+    """We can refresh an expired token."""
+    mockGetXSRFKey.return_value = 'fakeXSRFKey'
+    # The token below is generated with time 1 (presumably its issue time),
+    # so "now" puts it at the brink of being too old to refresh.
+    mockTime.return_value = 1 + xsrf.REFRESH_TOKEN_TIMEOUT_SEC
+
+    token_path = 'token_path'
+    token = xsrf.GenerateToken(111, token_path, 1)
+
+    request = sitewide_pb2.RefreshTokenRequest(
+        token=token, token_path=token_path)
+    mc = self._MakeContext()
+    response = self.CallWrapped(self.sitewide_svcr.RefreshToken, mc, request)
+
+    self.assertEqual(
+        sitewide_pb2.RefreshTokenResponse(
+            token='QSaKMyXhY752g7n8a34HyTo4NjQwMDE=',
+            token_expires_sec=870901),
+        response)
+
+  @mock.patch('services.secrets_svc.GetXSRFKey')
+  @mock.patch('time.time')
+  def testRefreshToken_InvalidToken(self, mockTime, mockGetXSRFKey):
+    """We reject attempts to refresh an invalid token."""
+    # Use return_value rather than a one-item side_effect list: a list is
+    # exhausted after one call, and a second call would raise StopIteration
+    # instead of the xsrf.TokenIncorrect this test is checking for.
+    mockGetXSRFKey.return_value = 'fakeXSRFKey'
+    mockTime.return_value = 123
+
+    request = sitewide_pb2.RefreshTokenRequest(
+        token='invalidToken', token_path='token_path')
+    mc = self._MakeContext()
+
+    with self.assertRaises(xsrf.TokenIncorrect):
+      self.CallWrapped(self.sitewide_svcr.RefreshToken, mc, request)
+
+  @mock.patch('services.secrets_svc.GetXSRFKey')
+  @mock.patch('time.time')
+  def testRefreshToken_TokenTooOld(self, mockTime, mockGetXSRFKey):
+    """We reject attempts to refresh a token that's too old."""
+    mockGetXSRFKey.return_value = 'fakeXSRFKey'
+    # One second later than in testRefreshToken, which is past the brink.
+    mockTime.return_value = 2 + xsrf.REFRESH_TOKEN_TIMEOUT_SEC
+
+    token_path = 'token_path'
+    token = xsrf.GenerateToken(111, token_path, 1)
+
+    request = sitewide_pb2.RefreshTokenRequest(
+        token=token, token_path=token_path)
+    mc = self._MakeContext()
+
+    with self.assertRaises(xsrf.TokenIncorrect):
+      self.CallWrapped(self.sitewide_svcr.RefreshToken, mc, request)
+
+  def testGetServerStatus_Normal(self):
+    """With no settings patched, the response equals an empty message."""
+    response = self._GetServerStatus()
+    self.assertEqual(sitewide_pb2.GetServerStatusResponse(), response)
+
+  @mock.patch('settings.banner_message', 'Message')
+  def testGetServerStatus_BannerMessage(self):
+    """A patched settings.banner_message shows up in the response."""
+    response = self._GetServerStatus()
+    self.assertEqual(
+        sitewide_pb2.GetServerStatusResponse(banner_message='Message'),
+        response)
+
+  @mock.patch('settings.banner_time', (2019, 6, 13, 18, 30))
+  def testGetServerStatus_BannerTime(self):
+    """A patched settings.banner_time tuple is returned as a timestamp."""
+    # 1560450600 is 2019-06-13 18:30:00 UTC expressed in epoch seconds.
+    response = self._GetServerStatus()
+    self.assertEqual(
+        sitewide_pb2.GetServerStatusResponse(banner_time=1560450600),
+        response)
+
+  @mock.patch('settings.read_only', True)
+  def testGetServerStatus_ReadOnly(self):
+    """A patched settings.read_only shows up in the response."""
+    response = self._GetServerStatus()
+    self.assertEqual(
+        sitewide_pb2.GetServerStatusResponse(read_only=True),
+        response)