blob: c71727ae4f47d4b9f294521cb5542d8d9c054c0e [file] [log] [blame]
Adrià Vilanova Martínezf19ea432024-01-23 20:20:52 +01001#!/usr/bin/env python
2#
3# Copyright 2007 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18
19
20
21"""An APIProxy stub that communicates with VMEngine service bridges."""
22
23from concurrent import futures
24import imp
25import logging
26import os
27import sys
28from google.appengine.api import apiproxy_rpc
29from google.appengine.api import apiproxy_stub_map
30from google.appengine.ext.remote_api import remote_api_bytes_pb2 as remote_api_pb2
31from google.appengine.runtime import apiproxy_errors
32from google.appengine.runtime import context
33import six.moves.urllib.parse
34import urllib3
35
36
37
38
39
40
41
42
43
# Only ERROR and above is emitted by the 'requests_nologs' logger.
logging.getLogger('requests_nologs').setLevel(logging.ERROR)

# Keys under which the API ticket, the dev-server request id and the Dapper
# trace info are looked up for the current request.
TICKET_HEADER = 'HTTP_X_APPENGINE_API_TICKET'
DEV_TICKET_HEADER = 'HTTP_X_APPENGINE_DEV_REQUEST_ID'
DAPPER_ENV_KEY = 'HTTP_X_GOOGLE_DAPPERTRACEINFO'

# Service bridge location; both values can be overridden via the environment.
SERVICE_BRIDGE_HOST = os.getenv('API_HOST', 'appengine.googleapis.internal')
API_PORT = os.getenv('API_PORT', '10001')

# Fixed pieces of the HTTP request sent to the service bridge.
SERVICE_ENDPOINT_NAME = 'app-engine-apis'
APIHOST_METHOD = '/VMRemoteAPI.CallRemoteAPI'
PROXY_PATH = '/rpc_http'
DAPPER_HEADER = 'X-Google-DapperTraceInfo'
SERVICE_DEADLINE_HEADER = 'X-Google-RPC-Service-Deadline'
SERVICE_ENDPOINT_HEADER = 'X-Google-RPC-Service-Endpoint'
SERVICE_METHOD_HEADER = 'X-Google-RPC-Service-Method'
RPC_CONTENT_TYPE = 'application/octet-stream'

# Deadline, in seconds, used when the RPC does not specify one.
DEFAULT_TIMEOUT = 60

# Seconds added on top of the RPC deadline to form the HTTP client timeout.
DEADLINE_DELTA_SECONDS = 1

# Worker count of the thread pool that DefaultApiStub creates for API calls.
MAX_CONCURRENT_API_CALLS = 100

# Passed to urllib3.PoolManager as `num_pools`.
URLLIB3_POOL_COUNT = 10

# Passed to urllib3.PoolManager as `maxsize`.
URLLIB3_POOL_SIZE = 10
73
74
75
# Maps an RpcError code from the service bridge response to a pair of
# (exception class, message template). Each template has exactly two `%s`
# slots, which DefaultApiRPC._ErrorException fills with the API package name
# and the call name, in that order.
_EXCEPTIONS_MAP = {
    remote_api_pb2.RpcError.UNKNOWN: (
        apiproxy_errors.RPCFailedError,
        'The remote RPC to the application server failed for call %s.%s().',
    ),
    remote_api_pb2.RpcError.CALL_NOT_FOUND: (
        apiproxy_errors.CallNotFoundError,
        'The API package \'%s\' or call \'%s()\' was not found.',
    ),
    remote_api_pb2.RpcError.PARSE_ERROR: (
        apiproxy_errors.ArgumentError,
        'There was an error parsing arguments for API call %s.%s().',
    ),
    remote_api_pb2.RpcError.OVER_QUOTA: (
        apiproxy_errors.OverQuotaError,
        'The API call %s.%s() required more quota than is available.',
    ),
    remote_api_pb2.RpcError.REQUEST_TOO_LARGE: (
        apiproxy_errors.RequestTooLargeError,
        'The request to API call %s.%s() was too large.',
    ),
    remote_api_pb2.RpcError.CAPABILITY_DISABLED: (
        apiproxy_errors.CapabilityDisabledError,
        'The API call %s.%s() is temporarily disabled.',
    ),
    remote_api_pb2.RpcError.FEATURE_DISABLED: (
        apiproxy_errors.FeatureNotEnabledError,
        'The API call %s.%s() is currently not enabled.',
    ),
    remote_api_pb2.RpcError.RESPONSE_TOO_LARGE: (
        apiproxy_errors.ResponseTooLargeError,
        'The response from API call %s.%s() was too large.',
    ),
    remote_api_pb2.RpcError.CANCELLED: (
        apiproxy_errors.CancelledError,
        'The API call %s.%s() was explicitly cancelled.',
    ),
    remote_api_pb2.RpcError.DEADLINE_EXCEEDED: (
        apiproxy_errors.DeadlineExceededError,
        'The API call %s.%s() took too long to respond and was cancelled.',
    ),
}

# Used for RpcError codes missing from the map and for transport failures.
_DEFAULT_EXCEPTION = _EXCEPTIONS_MAP[remote_api_pb2.RpcError.UNKNOWN]

# Used when the HTTP request to the service bridge times out.
_DEADLINE_EXCEEDED_EXCEPTION = _EXCEPTIONS_MAP[
    remote_api_pb2.RpcError.DEADLINE_EXCEEDED]
113
114
115
116
117
118
class DefaultApiRPC(apiproxy_rpc.RPC):
  """An RPC that reaches App Engine APIs through the HTTP service bridge.

  The API request is wrapped in a `remote_api_pb2.Request`, POSTed to the
  bridge through the owning stub's urllib3 pool, and the returned
  `remote_api_pb2.Response` is either parsed into `self.response` or
  translated into an `apiproxy_errors` exception.
  """

  def _ErrorException(self, exception_class, error_details):
    """Builds an exception from a two-slot message template.

    Args:
      exception_class: The exception type to instantiate.
      error_details: A %-style template with exactly two `%s` slots; they are
        filled with `self.package` and `self.call`. Literal percent signs
        must be written as `%%`.

    Returns:
      The constructed (not raised) exception instance.
    """
    return exception_class(error_details % (self.package, self.call))

  def _TranslateToError(self, response):
    """Raises the exception that corresponds to a failed API response.

    Args:
      response: A `remote_api_pb2.Response` that carries an `rpc_error` or an
        `application_error`.

    Raises:
      The exception class mapped to the RPC error code in `_EXCEPTIONS_MAP`
      (`_DEFAULT_EXCEPTION` for unmapped codes) when `rpc_error` is set;
      `apiproxy_errors.ApplicationError` otherwise.
    """
    if response.HasField('rpc_error'):
      code = response.rpc_error.code
      detail = response.rpc_error.detail
      exception_type, msg = _EXCEPTIONS_MAP.get(code, _DEFAULT_EXCEPTION)
      if detail:
        # `msg` is still a template that _ErrorException will %-format with
        # (package, call). The server detail is free-form text, so escape any
        # '%' in it; otherwise the later formatting raises TypeError/ValueError
        # and hides the real API error.
        msg = '%s -- Additional details from server: %s' % (
            msg, detail.replace('%', '%%'))
      raise self._ErrorException(exception_type, msg)

    raise apiproxy_errors.ApplicationError(response.application_error.code,
                                           response.application_error.detail)

  def _MakeCallImpl(self):
    """Starts the API call over the service bridge.

    For this to work the following must be set:
      self.package: the API package name;
      self.call: the name of the API call/method to invoke;
      self.request: the API request protocol buffer (it is serialized here).

    The HTTP request is normally submitted to the stub's executor
    (`self.stub.thread_pool`; DefaultApiStub creates a
    `concurrent.futures.ThreadPoolExecutor` with MAX_CONCURRENT_API_CALLS
    workers), and the resulting future is stored in `self.future`.

    If the main thread holds the import lock, waiting on thread work can cause
    a deadlock:
    https://docs.python.org/2/library/threading.html#importing-in-threaded-code

    Therefore, on Python 2, when the import lock is held the request is sent
    synchronously on the calling thread and `self.future` is an already
    resolved future.
    """
    assert self._state == apiproxy_rpc.RPC.IDLE, self._state

    # Prefer the API ticket and fall back to the dev request id. The value is
    # sent to the bridge as the request_id of the wrapped request.
    if context.READ_FROM_OS_ENVIRON:
      ticket = os.environ.get(TICKET_HEADER,
                              os.environ.get(DEV_TICKET_HEADER))
    else:
      ticket = context.gae_headers.API_TICKET.get(
          context.gae_headers.DEV_REQUEST_ID.get(None))

    request = remote_api_pb2.Request(
        service_name=self.package,
        method=self.call,
        request_id=ticket,
        request=self.request.SerializeToString())

    deadline = self.deadline or DEFAULT_TIMEOUT

    body_data = request.SerializeToString()
    headers = {
        SERVICE_DEADLINE_HEADER: str(deadline),
        SERVICE_ENDPOINT_HEADER: SERVICE_ENDPOINT_NAME,
        SERVICE_METHOD_HEADER: APIHOST_METHOD,
        'Content-type': RPC_CONTENT_TYPE,
    }

    # Forward the Dapper trace info only when the current request has one.
    dapper_header_value = context.get(DAPPER_ENV_KEY)
    if dapper_header_value:
      headers[DAPPER_HEADER] = dapper_header_value

    # The environment is consulted again at call time; the module-level
    # values (read at import) only serve as the fallback.
    api_host = os.environ.get('API_HOST', SERVICE_BRIDGE_HOST)
    api_port = os.environ.get('API_PORT', API_PORT)

    # A bare IPv6 literal must be bracketed inside a URL. Leave a host that
    # is already bracketed alone so it is not wrapped twice.
    if ':' in api_host and not api_host.startswith('['):
      api_host = '[{}]'.format(api_host)
    endpoint_url = six.moves.urllib.parse.urlunparse(
        ('http', '%s:%s' % (api_host, api_port), PROXY_PATH, '', '', ''))

    self._state = apiproxy_rpc.RPC.RUNNING

    request_kwargs = dict(
        url=endpoint_url,
        method='POST',
        # Give the bridge a little longer than the RPC deadline before the
        # HTTP client itself times out.
        timeout=DEADLINE_DELTA_SECONDS + deadline,
        headers=headers,
        body=body_data)

    if six.PY2 and imp.lock_held():
      # Import lock held: run inline and hand back an already-resolved future
      # so that _WaitImpl works the same way in both branches.
      self.future = futures.Future()
      self.future.set_result(self._SendRequestAndFinish(**request_kwargs))

    else:
      self.future = self.stub.thread_pool.submit(self._SendRequestAndFinish,
                                                 **request_kwargs)

  def _WaitImpl(self):
    """Blocks until the request started by _MakeCallImpl has finished.

    Returns:
      True, always.
    """
    assert self.future is not None
    futures.wait([self.future])
    return True

  def _SendRequest(self, **kwargs):
    """Sends the HTTP request and stores the API response in self.response.

    Args:
      **kwargs: Keyword arguments passed unchanged to
        `self.stub.http.request`.

    Raises:
      apiproxy_errors.RPCFailedError: The bridge answered with a non-200
        status, or the request failed with a urllib3 request/connection
        error.
      apiproxy_errors.DeadlineExceededError: The HTTP request timed out.
      Any exception raised by _TranslateToError when the response carries an
      application or RPC error.
    """
    try:
      response = self.stub.http.request(**kwargs)

      if response.status != 200:
        raise apiproxy_errors.RPCFailedError(
            'Proxy returned HTTP status %s %s' %
            (response.status, response.reason))
    except urllib3.exceptions.TimeoutError:
      raise self._ErrorException(*_DEADLINE_EXCEEDED_EXCEPTION)
    except (urllib3.exceptions.RequestError,
            urllib3.exceptions.ConnectionError):
      raise self._ErrorException(*_DEFAULT_EXCEPTION)

    parsed_response = remote_api_pb2.Response.FromString(response.data)

    if (parsed_response.HasField('application_error') or
        parsed_response.HasField('rpc_error')):
      # _TranslateToError raises on its own; the `raise` here only makes the
      # control flow explicit.
      raise self._TranslateToError(parsed_response)

    self.response.ParseFromString(parsed_response.response)

  def _CaptureTrace(self, f, **kwargs):
    """Calls `f(**kwargs)`, recording any exception instead of propagating it.

    The exception and its traceback are stored in `self._exception` and
    `self._traceback`.
    NOTE(review): presumably the base RPC class re-raises them later on the
    caller's thread (e.g. from CheckSuccess) -- confirm in apiproxy_rpc.RPC.

    Args:
      f: The callable to invoke.
      **kwargs: Keyword arguments for `f`.
    """
    try:
      f(**kwargs)
    except Exception:
      _, exc, tb = sys.exc_info()
      self._exception = exc
      self._traceback = tb

  def _SendRequestAndFinish(self, **kwargs):
    """Runs the request, then the callback, then marks the RPC as finishing.

    Args:
      **kwargs: Keyword arguments forwarded to _SendRequest.
    """
    try:
      self._CaptureTrace(self._SendRequest, **kwargs)
    finally:
      # The callback runs whether or not the request succeeded; an exception
      # it raises is captured too and replaces any earlier one.
      if self.callback:
        self._CaptureTrace(self.callback)
      self._state = apiproxy_rpc.RPC.FINISHING
285
286
class DefaultApiStub(object):
  """API stub that forwards calls to a VM service bridge.

  Any service supported by the remote server can be stubbed out with it.
  """

  def __init__(self):
    # Both objects are shared by every RPC created from this stub:
    # DefaultApiRPC submits its work to `thread_pool` and sends the HTTP
    # request through `http`.
    self.thread_pool = futures.ThreadPoolExecutor(
        max_workers=MAX_CONCURRENT_API_CALLS)
    self.http = urllib3.PoolManager(
        num_pools=URLLIB3_POOL_COUNT,
        maxsize=URLLIB3_POOL_SIZE)

  def MakeSyncCall(self, service, call, request, response):
    """Performs one API call and waits for it to complete.

    Args:
      service: Name of the service to call.
      call: Name of the method to invoke on that service.
      request: The request protocol buffer.
      response: The response protocol buffer, filled in on success.
    """
    pending = self.CreateRPC()
    pending.MakeCall(service, call, request, response)
    pending.Wait()
    pending.CheckSuccess()

  def CreateRPC(self):
    """Returns a fresh DefaultApiRPC bound to this stub."""
    return DefaultApiRPC(stub=self)
316
317
def Register(stub):
  """Installs `stub` as the default stub of the global API proxy.

  After this call, App Engine services are accessed via the service bridge.

  Args:
    stub: The stub to install, e.g. a DefaultApiStub instance.
  """
  api_proxy = apiproxy_stub_map.apiproxy
  api_proxy.SetDefaultStub(stub)