blob: c71727ae4f47d4b9f294521cb5542d8d9c054c0e [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""An APIProxy stub that communicates with VMEngine service bridges."""
from concurrent import futures
import imp
import logging
import os
import sys
from google.appengine.api import apiproxy_rpc
from google.appengine.api import apiproxy_stub_map
from google.appengine.ext.remote_api import remote_api_bytes_pb2 as remote_api_pb2
from google.appengine.runtime import apiproxy_errors
from google.appengine.runtime import context
import six.moves.urllib.parse
import urllib3
logging.getLogger('requests_nologs').setLevel(logging.ERROR)
TICKET_HEADER = 'HTTP_X_APPENGINE_API_TICKET'
DEV_TICKET_HEADER = 'HTTP_X_APPENGINE_DEV_REQUEST_ID'
DAPPER_ENV_KEY = 'HTTP_X_GOOGLE_DAPPERTRACEINFO'
SERVICE_BRIDGE_HOST = os.environ.get('API_HOST',
'appengine.googleapis.internal')
API_PORT = os.environ.get('API_PORT', '10001')
SERVICE_ENDPOINT_NAME = 'app-engine-apis'
APIHOST_METHOD = '/VMRemoteAPI.CallRemoteAPI'
PROXY_PATH = '/rpc_http'
DAPPER_HEADER = 'X-Google-DapperTraceInfo'
SERVICE_DEADLINE_HEADER = 'X-Google-RPC-Service-Deadline'
SERVICE_ENDPOINT_HEADER = 'X-Google-RPC-Service-Endpoint'
SERVICE_METHOD_HEADER = 'X-Google-RPC-Service-Method'
RPC_CONTENT_TYPE = 'application/octet-stream'
DEFAULT_TIMEOUT = 60
DEADLINE_DELTA_SECONDS = 1
MAX_CONCURRENT_API_CALLS = 100
URLLIB3_POOL_COUNT = 10
URLLIB3_POOL_SIZE = 10
_EXCEPTIONS_MAP = {
remote_api_pb2.RpcError.UNKNOWN:
(apiproxy_errors.RPCFailedError,
'The remote RPC to the application server failed for call %s.%s().'),
remote_api_pb2.RpcError.CALL_NOT_FOUND:
(apiproxy_errors.CallNotFoundError,
'The API package \'%s\' or call \'%s()\' was not found.'),
remote_api_pb2.RpcError.PARSE_ERROR:
(apiproxy_errors.ArgumentError,
'There was an error parsing arguments for API call %s.%s().'),
remote_api_pb2.RpcError.OVER_QUOTA:
(apiproxy_errors.OverQuotaError,
'The API call %s.%s() required more quota than is available.'),
remote_api_pb2.RpcError.REQUEST_TOO_LARGE:
(apiproxy_errors.RequestTooLargeError,
'The request to API call %s.%s() was too large.'),
remote_api_pb2.RpcError.CAPABILITY_DISABLED:
(apiproxy_errors.CapabilityDisabledError,
'The API call %s.%s() is temporarily disabled.'),
remote_api_pb2.RpcError.FEATURE_DISABLED:
(apiproxy_errors.FeatureNotEnabledError,
'The API call %s.%s() is currently not enabled.'),
remote_api_pb2.RpcError.RESPONSE_TOO_LARGE:
(apiproxy_errors.ResponseTooLargeError,
'The response from API call %s.%s() was too large.'),
remote_api_pb2.RpcError.CANCELLED:
(apiproxy_errors.CancelledError,
'The API call %s.%s() was explicitly cancelled.'),
remote_api_pb2.RpcError.DEADLINE_EXCEEDED:
(apiproxy_errors.DeadlineExceededError,
'The API call %s.%s() took too long to respond and was cancelled.')
}
_DEFAULT_EXCEPTION = _EXCEPTIONS_MAP[remote_api_pb2.RpcError.UNKNOWN]
_DEADLINE_EXCEEDED_EXCEPTION = _EXCEPTIONS_MAP[
remote_api_pb2.RpcError.DEADLINE_EXCEEDED]
class DefaultApiRPC(apiproxy_rpc.RPC):
"""A class representing an RPC to a remote server."""
def _ErrorException(self, exception_class, error_details):
return exception_class(error_details % (self.package, self.call))
def _TranslateToError(self, response):
"""Translates a failed APIResponse into an exception."""
if response.HasField('rpc_error'):
code = response.rpc_error.code
detail = response.rpc_error.detail
exception_type, msg = _EXCEPTIONS_MAP.get(code, _DEFAULT_EXCEPTION)
if detail:
msg = '%s -- Additional details from server: %s' % (msg, detail)
raise self._ErrorException(exception_type, msg)
raise apiproxy_errors.ApplicationError(response.application_error.code,
response.application_error.detail)
def _MakeCallImpl(self):
"""Makes an asynchronous API call over the service bridge.
For this to work the following must be set:
self.package: the API package name;
self.call: the name of the API call/method to invoke;
self.request: the API request body as a serialized protocol buffer.
The actual API call is made by urllib3.request via a thread pool
(multiprocessing.dummy.Pool). The thread pool restricts the number of
concurrent requests to MAX_CONCURRENT_API_CALLS, so this method will
block if that limit is exceeded, until other asynchronous calls resolve.
If the main thread holds the import lock, waiting on thread work can cause
a deadlock:
https://docs.python.org/2/library/threading.html#importing-in-threaded-code
Therefore, we try to detect this error case and fall back to sync calls.
"""
assert self._state == apiproxy_rpc.RPC.IDLE, self._state
if context.READ_FROM_OS_ENVIRON:
ticket = os.environ.get(TICKET_HEADER,
os.environ.get(DEV_TICKET_HEADER))
else:
ticket = context.gae_headers.API_TICKET.get(
context.gae_headers.DEV_REQUEST_ID.get(None))
request = remote_api_pb2.Request(
service_name=self.package,
method=self.call,
request_id=ticket,
request=self.request.SerializeToString())
deadline = self.deadline or DEFAULT_TIMEOUT
body_data = request.SerializeToString()
headers = {
SERVICE_DEADLINE_HEADER: str(deadline),
SERVICE_ENDPOINT_HEADER: SERVICE_ENDPOINT_NAME,
SERVICE_METHOD_HEADER: APIHOST_METHOD,
'Content-type': RPC_CONTENT_TYPE,
}
dapper_header_value = context.get(DAPPER_ENV_KEY)
if dapper_header_value:
headers[DAPPER_HEADER] = dapper_header_value
api_host = os.environ.get('API_HOST', SERVICE_BRIDGE_HOST)
api_port = os.environ.get('API_PORT', API_PORT)
if ':' in api_host:
api_host = '[{}]'.format(api_host)
endpoint_url = six.moves.urllib.parse.urlunparse(
('http', '%s:%s' % (api_host, api_port), PROXY_PATH, '', '', ''))
self._state = apiproxy_rpc.RPC.RUNNING
request_kwargs = dict(
url=endpoint_url,
method='POST',
timeout=DEADLINE_DELTA_SECONDS + deadline,
headers=headers,
body=body_data)
if six.PY2 and imp.lock_held():
self.future = futures.Future()
self.future.set_result(self._SendRequestAndFinish(**request_kwargs))
else:
self.future = self.stub.thread_pool.submit(self._SendRequestAndFinish,
**request_kwargs)
def _WaitImpl(self):
assert self.future is not None
futures.wait([self.future])
return True
def _SendRequest(self, **kwargs):
try:
response = self.stub.http.request(**kwargs)
if response.status != 200:
raise apiproxy_errors.RPCFailedError(
'Proxy returned HTTP status %s %s' %
(response.status, response.reason))
except urllib3.exceptions.TimeoutError:
raise self._ErrorException(*_DEADLINE_EXCEEDED_EXCEPTION)
except (urllib3.exceptions.RequestError,
urllib3.exceptions.ConnectionError):
raise self._ErrorException(*_DEFAULT_EXCEPTION)
parsed_response = remote_api_pb2.Response.FromString(response.data)
if (parsed_response.HasField('application_error') or
parsed_response.HasField('rpc_error')):
raise self._TranslateToError(parsed_response)
self.response.ParseFromString(parsed_response.response)
def _CaptureTrace(self, f, **kwargs):
try:
f(**kwargs)
except Exception:
_, exc, tb = sys.exc_info()
self._exception = exc
self._traceback = tb
def _SendRequestAndFinish(self, **kwargs):
try:
self._CaptureTrace(self._SendRequest, **kwargs)
finally:
if self.callback:
self._CaptureTrace(self.callback)
self._state = apiproxy_rpc.RPC.FINISHING
class DefaultApiStub(object):
"""A stub for calling services through a VM service bridge.
You can use this to stub out any service that the remote server supports.
"""
def __init__(self):
self.thread_pool = futures.ThreadPoolExecutor(MAX_CONCURRENT_API_CALLS)
self.http = urllib3.PoolManager(
num_pools=URLLIB3_POOL_COUNT, maxsize=URLLIB3_POOL_SIZE)
def MakeSyncCall(self, service, call, request, response):
"""Make a synchronous API call.
Args:
service: The name of the service you are trying to use.
call: The name of the method.
request: The request protocol buffer
response: The response protocol buffer to be filled.
"""
rpc = self.CreateRPC()
rpc.MakeCall(service, call, request, response)
rpc.Wait()
rpc.CheckSuccess()
def CreateRPC(self):
"""Create a new RPC object."""
return DefaultApiRPC(stub=self)
def Register(stub):
"""Insert stubs so App Engine services are accessed via the service bridge."""
apiproxy_stub_map.apiproxy.SetDefaultStub(stub)