Merge branch 'main' into avm99963-monorail

Merged commit 34d8229ae2b51fb1a15bd208e6fe6185c94f6266

GitOrigin-RevId: 7ee0917f93a577e475f8e09526dd144d245593f4
diff --git a/third_party/appengine-python-standard/default_api_stub.py b/third_party/appengine-python-standard/default_api_stub.py
new file mode 100644
index 0000000..c71727a
--- /dev/null
+++ b/third_party/appengine-python-standard/default_api_stub.py
@@ -0,0 +1,320 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+
+
+"""An APIProxy stub that communicates with VMEngine service bridges."""
+
+from concurrent import futures
+import imp
+import logging
+import os
+import sys
+from google.appengine.api import apiproxy_rpc
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.ext.remote_api import remote_api_bytes_pb2 as remote_api_pb2
+from google.appengine.runtime import apiproxy_errors
+from google.appengine.runtime import context
+import six.moves.urllib.parse
+import urllib3
+
+
+
+
+
+
+
+
+
+logging.getLogger('requests_nologs').setLevel(logging.ERROR)
+
+TICKET_HEADER = 'HTTP_X_APPENGINE_API_TICKET'
+DEV_TICKET_HEADER = 'HTTP_X_APPENGINE_DEV_REQUEST_ID'
+DAPPER_ENV_KEY = 'HTTP_X_GOOGLE_DAPPERTRACEINFO'
+SERVICE_BRIDGE_HOST = os.environ.get('API_HOST',
+                                     'appengine.googleapis.internal')
+API_PORT = os.environ.get('API_PORT', '10001')
+SERVICE_ENDPOINT_NAME = 'app-engine-apis'
+APIHOST_METHOD = '/VMRemoteAPI.CallRemoteAPI'
+PROXY_PATH = '/rpc_http'
+DAPPER_HEADER = 'X-Google-DapperTraceInfo'
+SERVICE_DEADLINE_HEADER = 'X-Google-RPC-Service-Deadline'
+SERVICE_ENDPOINT_HEADER = 'X-Google-RPC-Service-Endpoint'
+SERVICE_METHOD_HEADER = 'X-Google-RPC-Service-Method'
+RPC_CONTENT_TYPE = 'application/octet-stream'
+DEFAULT_TIMEOUT = 60
+
+DEADLINE_DELTA_SECONDS = 1
+
+
+
+
+
+MAX_CONCURRENT_API_CALLS = 100
+
+URLLIB3_POOL_COUNT = 10
+
+URLLIB3_POOL_SIZE = 10
+
+
+
+_EXCEPTIONS_MAP = {
+    remote_api_pb2.RpcError.UNKNOWN:
+        (apiproxy_errors.RPCFailedError,
+         'The remote RPC to the application server failed for call %s.%s().'),
+    remote_api_pb2.RpcError.CALL_NOT_FOUND:
+        (apiproxy_errors.CallNotFoundError,
+         'The API package \'%s\' or call \'%s()\' was not found.'),
+    remote_api_pb2.RpcError.PARSE_ERROR:
+        (apiproxy_errors.ArgumentError,
+         'There was an error parsing arguments for API call %s.%s().'),
+    remote_api_pb2.RpcError.OVER_QUOTA:
+        (apiproxy_errors.OverQuotaError,
+         'The API call %s.%s() required more quota than is available.'),
+    remote_api_pb2.RpcError.REQUEST_TOO_LARGE:
+        (apiproxy_errors.RequestTooLargeError,
+         'The request to API call %s.%s() was too large.'),
+    remote_api_pb2.RpcError.CAPABILITY_DISABLED:
+        (apiproxy_errors.CapabilityDisabledError,
+         'The API call %s.%s() is temporarily disabled.'),
+    remote_api_pb2.RpcError.FEATURE_DISABLED:
+        (apiproxy_errors.FeatureNotEnabledError,
+         'The API call %s.%s() is currently not enabled.'),
+    remote_api_pb2.RpcError.RESPONSE_TOO_LARGE:
+        (apiproxy_errors.ResponseTooLargeError,
+         'The response from API call %s.%s() was too large.'),
+    remote_api_pb2.RpcError.CANCELLED:
+        (apiproxy_errors.CancelledError,
+         'The API call %s.%s() was explicitly cancelled.'),
+    remote_api_pb2.RpcError.DEADLINE_EXCEEDED:
+        (apiproxy_errors.DeadlineExceededError,
+         'The API call %s.%s() took too long to respond and was cancelled.')
+}
+
+_DEFAULT_EXCEPTION = _EXCEPTIONS_MAP[remote_api_pb2.RpcError.UNKNOWN]
+
+_DEADLINE_EXCEEDED_EXCEPTION = _EXCEPTIONS_MAP[
+    remote_api_pb2.RpcError.DEADLINE_EXCEEDED]
+
+
+
+
+
+
+class DefaultApiRPC(apiproxy_rpc.RPC):
+  """A class representing an RPC to a remote server."""
+
+  def _ErrorException(self, exception_class, error_details):
+    return exception_class(error_details % (self.package, self.call))
+
+  def _TranslateToError(self, response):
+    """Translates a failed APIResponse into an exception."""
+
+
+    if response.HasField('rpc_error'):
+      code = response.rpc_error.code
+      detail = response.rpc_error.detail
+      exception_type, msg = _EXCEPTIONS_MAP.get(code, _DEFAULT_EXCEPTION)
+      if detail:
+        msg = '%s -- Additional details from server: %s' % (msg, detail)
+      raise self._ErrorException(exception_type, msg)
+
+
+    raise apiproxy_errors.ApplicationError(response.application_error.code,
+                                           response.application_error.detail)
+
+  def _MakeCallImpl(self):
+    """Makes an asynchronous API call over the service bridge.
+
+    For this to work the following must be set:
+      self.package: the API package name;
+      self.call: the name of the API call/method to invoke;
+      self.request: the API request body as a serialized protocol buffer.
+
+    The actual API call is made by urllib3.request via a thread pool
+    (multiprocessing.dummy.Pool). The thread pool restricts the number of
+    concurrent requests to MAX_CONCURRENT_API_CALLS, so this method will
+    block if that limit is exceeded, until other asynchronous calls resolve.
+
+    If the main thread holds the import lock, waiting on thread work can cause
+    a deadlock:
+    https://docs.python.org/2/library/threading.html#importing-in-threaded-code
+
+    Therefore, we try to detect this error case and fall back to sync calls.
+    """
+    assert self._state == apiproxy_rpc.RPC.IDLE, self._state
+
+
+
+
+
+
+
+
+    if context.READ_FROM_OS_ENVIRON:
+      ticket = os.environ.get(TICKET_HEADER,
+                              os.environ.get(DEV_TICKET_HEADER))
+    else:
+
+
+
+      ticket = context.gae_headers.API_TICKET.get(
+          context.gae_headers.DEV_REQUEST_ID.get(None))
+
+    request = remote_api_pb2.Request(
+        service_name=self.package,
+        method=self.call,
+        request_id=ticket,
+        request=self.request.SerializeToString())
+
+    deadline = self.deadline or DEFAULT_TIMEOUT
+
+    body_data = request.SerializeToString()
+    headers = {
+        SERVICE_DEADLINE_HEADER: str(deadline),
+        SERVICE_ENDPOINT_HEADER: SERVICE_ENDPOINT_NAME,
+        SERVICE_METHOD_HEADER: APIHOST_METHOD,
+        'Content-type': RPC_CONTENT_TYPE,
+    }
+
+
+    dapper_header_value = context.get(DAPPER_ENV_KEY)
+    if dapper_header_value:
+      headers[DAPPER_HEADER] = dapper_header_value
+
+
+
+
+
+    api_host = os.environ.get('API_HOST', SERVICE_BRIDGE_HOST)
+    api_port = os.environ.get('API_PORT', API_PORT)
+
+    if ':' in api_host:
+      api_host = '[{}]'.format(api_host)
+    endpoint_url = six.moves.urllib.parse.urlunparse(
+        ('http', '%s:%s' % (api_host, api_port), PROXY_PATH, '', '', ''))
+
+    self._state = apiproxy_rpc.RPC.RUNNING
+
+    request_kwargs = dict(
+        url=endpoint_url,
+        method='POST',
+        timeout=DEADLINE_DELTA_SECONDS + deadline,
+        headers=headers,
+        body=body_data)
+
+
+
+
+
+
+    if six.PY2 and imp.lock_held():
+      self.future = futures.Future()
+      self.future.set_result(self._SendRequestAndFinish(**request_kwargs))
+
+    else:
+
+
+      self.future = self.stub.thread_pool.submit(self._SendRequestAndFinish,
+                                                 **request_kwargs)
+
+  def _WaitImpl(self):
+
+    assert self.future is not None
+    futures.wait([self.future])
+    return True
+
+  def _SendRequest(self, **kwargs):
+    try:
+      response = self.stub.http.request(**kwargs)
+
+      if response.status != 200:
+        raise apiproxy_errors.RPCFailedError(
+            'Proxy returned HTTP status %s %s' %
+            (response.status, response.reason))
+    except urllib3.exceptions.TimeoutError:
+      raise self._ErrorException(*_DEADLINE_EXCEEDED_EXCEPTION)
+    except (urllib3.exceptions.RequestError,
+            urllib3.exceptions.ConnectionError):
+
+      raise self._ErrorException(*_DEFAULT_EXCEPTION)
+
+
+    parsed_response = remote_api_pb2.Response.FromString(response.data)
+
+
+    if (parsed_response.HasField('application_error') or
+        parsed_response.HasField('rpc_error')):
+      raise self._TranslateToError(parsed_response)
+
+
+    self.response.ParseFromString(parsed_response.response)
+
+  def _CaptureTrace(self, f, **kwargs):
+    try:
+      f(**kwargs)
+    except Exception:
+
+
+      _, exc, tb = sys.exc_info()
+      self._exception = exc
+      self._traceback = tb
+
+  def _SendRequestAndFinish(self, **kwargs):
+    try:
+      self._CaptureTrace(self._SendRequest, **kwargs)
+    finally:
+      if self.callback:
+        self._CaptureTrace(self.callback)
+      self._state = apiproxy_rpc.RPC.FINISHING
+
+
+class DefaultApiStub(object):
+  """A stub for calling services through a VM service bridge.
+
+  You can use this to stub out any service that the remote server supports.
+  """
+
+
+  def __init__(self):
+    self.thread_pool = futures.ThreadPoolExecutor(MAX_CONCURRENT_API_CALLS)
+    self.http = urllib3.PoolManager(
+        num_pools=URLLIB3_POOL_COUNT, maxsize=URLLIB3_POOL_SIZE)
+
+  def MakeSyncCall(self, service, call, request, response):
+    """Make a synchronous API call.
+
+    Args:
+      service: The name of the service you are trying to use.
+      call: The name of the method.
+      request: The request protocol buffer
+      response: The response protocol buffer to be filled.
+    """
+    rpc = self.CreateRPC()
+    rpc.MakeCall(service, call, request, response)
+    rpc.Wait()
+    rpc.CheckSuccess()
+
+  def CreateRPC(self):
+    """Create a new RPC object."""
+    return DefaultApiRPC(stub=self)
+
+
+def Register(stub):
+  """Insert stubs so App Engine services are accessed via the service bridge."""
+  apiproxy_stub_map.apiproxy.SetDefaultStub(stub)