blob: 52a25d707cf58db064df5782e9d441752c2dd092 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Some utility classes for interacting with templates."""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import cgi
import cStringIO
import httplib
import logging
import time
import types
import ezt
import six
from protorpc import messages
import settings
from framework import framework_constants
_DISPLAY_VALUE_TRAILING_CHARS = 8
_DISPLAY_VALUE_TIP_CHARS = 120
class PBProxy(object):
"""Wraps a Protocol Buffer so it is easy to acceess from a template."""
def __init__(self, pb):
self.__pb = pb
def __getattr__(self, name):
"""Make the getters template friendly.
Psudo-hack alert: When attributes end with _bool, they are converted in
to EZT style bools. I.e., if false return None, if true return True.
Args:
name: the name of the attribute to get.
Returns:
The value of that attribute (as an EZT bool if the name ends with _bool).
"""
if name.endswith('_bool'):
bool_name = name
name = name[0:-5]
else:
bool_name = None
# Make it possible for a PBProxy-local attribute to override the protocol
# buffer field, or even to allow attributes to be added to the PBProxy that
# the protocol buffer does not even have.
if name in self.__dict__:
if callable(self.__dict__[name]):
val = self.__dict__[name]()
else:
val = self.__dict__[name]
if bool_name:
return ezt.boolean(val)
return val
if bool_name:
# return an ezt.boolean for the named field.
return ezt.boolean(getattr(self.__pb, name))
val = getattr(self.__pb, name)
if isinstance(val, messages.Enum):
return int(val) # TODO(jrobbins): use str() instead
if isinstance(val, messages.Message):
return PBProxy(val)
# Return a list of values whose Message entries
# have been wrapped in PBProxies.
if isinstance(val, (list, messages.FieldList)):
list_to_return = []
for v in val:
if isinstance(v, messages.Message):
list_to_return.append(PBProxy(v))
else:
list_to_return.append(v)
return list_to_return
return val
def DebugString(self):
"""Return a string representation that is useful in debugging."""
return 'PBProxy(%s)' % self.__pb
def __eq__(self, other):
# Disable warning about accessing other.__pb.
# pylint: disable=protected-access
return isinstance(other, PBProxy) and self.__pb == other.__pb
_templates = {}
def GetTemplate(
template_path, compress_whitespace=True, eliminate_blank_lines=False,
base_format=ezt.FORMAT_HTML):
"""Make a MonorailTemplate if needed, or reuse one if possible."""
key = template_path, compress_whitespace, base_format
if key in _templates:
return _templates[key]
template = MonorailTemplate(
template_path, compress_whitespace=compress_whitespace,
eliminate_blank_lines=eliminate_blank_lines, base_format=base_format)
_templates[key] = template
return template
class cStringIOUnicodeWrapper(object):
"""Wrapper on cStringIO.StringIO that encodes unicode as UTF-8 as it goes."""
def __init__(self):
self.buffer = cStringIO.StringIO()
def write(self, s):
if isinstance(s, six.text_type):
utf8_s = s.encode('utf-8')
else:
utf8_s = s
self.buffer.write(utf8_s)
def getvalue(self):
return self.buffer.getvalue()
SNIFFABLE_PATTERNS = {
'%PDF-': '%NoNoNo-',
}
class MonorailTemplate(object):
"""A template with additional functionality."""
def __init__(self, template_path, compress_whitespace=True,
eliminate_blank_lines=False, base_format=ezt.FORMAT_HTML):
self.template_path = template_path
self.template = None
self.compress_whitespace = compress_whitespace
self.base_format = base_format
self.eliminate_blank_lines = eliminate_blank_lines
def WriteResponse(self, response, data, content_type=None):
"""Write the parsed and filled in template to http server."""
if content_type:
response.content_type = content_type
response.status = data.get('http_response_code', httplib.OK)
whole_page = self.GetResponse(data)
if data.get('prevent_sniffing'):
for sniff_pattern, sniff_replacement in SNIFFABLE_PATTERNS.items():
whole_page = whole_page.replace(sniff_pattern, sniff_replacement)
start = time.time()
response.write(whole_page)
logging.info('wrote response in %dms', int((time.time() - start) * 1000))
def WriteFlaskResponse(self, response, data, content_type=None):
"""Write the parsed and filled in template to http server."""
if content_type:
response.content_type = content_type
response.status_code = data.get('http_response_code', httplib.OK)
whole_page = self.GetResponse(data)
if data.get('prevent_sniffing'):
for sniff_pattern, sniff_replacement in SNIFFABLE_PATTERNS.items():
whole_page = whole_page.replace(sniff_pattern, sniff_replacement)
start = time.time()
response.response = whole_page
logging.info('wrote response in %dms', int((time.time() - start) * 1000))
def GetResponse(self, data):
"""Generate the text from the template and return it as a string."""
template = self.GetTemplate()
start = time.time()
buf = cStringIOUnicodeWrapper()
template.generate(buf, data)
whole_page = buf.getvalue()
logging.info('rendering took %dms', int((time.time() - start) * 1000))
logging.info('whole_page len is %r', len(whole_page))
if self.eliminate_blank_lines:
lines = whole_page.split('\n')
whole_page = '\n'.join(line for line in lines if line.strip())
logging.info('smaller whole_page len is %r', len(whole_page))
logging.info('smaller rendering took %dms',
int((time.time() - start) * 1000))
return whole_page
def GetTemplate(self):
"""Parse the EZT template, or return an already parsed one."""
# We don't operate directly on self.template to avoid races.
template = self.template
if template is None or settings.local_mode:
start = time.time()
template = ezt.Template(
fname=self.template_path,
compress_whitespace=self.compress_whitespace,
base_format=self.base_format)
logging.info('parsed in %dms', int((time.time() - start) * 1000))
self.template = template
return template
def GetTemplatePath(self):
"""Accessor for the template path specified in the constructor.
Returns:
The string path for the template file provided to the constructor.
"""
return self.template_path
class EZTError(object):
"""This class is a helper class to pass errors to EZT.
This class is used to hold information that will be passed to EZT but might
be unset. All unset values return None (ie EZT False)
Example: page errors
"""
def __getattr__(self, _name):
"""This is the EZT retrieval function."""
return None
def AnyErrors(self):
return len(self.__dict__) != 0
def DebugString(self):
return 'EZTError(%s)' % self.__dict__
def SetError(self, name, value):
self.__setattr__(name, value)
def SetCustomFieldError(self, field_id, value):
# This access works because of the custom __getattr__.
# pylint: disable=access-member-before-definition
# pylint: disable=attribute-defined-outside-init
if self.custom_fields is None:
self.custom_fields = []
self.custom_fields.append(EZTItem(field_id=field_id, message=value))
any_errors = property(AnyErrors, None)
def FitUnsafeText(text, length):
"""Trim some unsafe (unescaped) text to a specific length.
Three periods are appended if trimming occurs. Note that we cannot use
the ellipsis character (&hellip) because this is unescaped text.
Args:
text: the string to fit (ASCII or unicode).
length: the length to trim to.
Returns:
An ASCII or unicode string fitted to the given length.
"""
if not text:
return ""
if len(text) <= length:
return text
return text[:length] + '...'
def BytesKbOrMb(num_bytes):
"""Return a human-readable string representation of a number of bytes."""
if num_bytes < 1024:
return '%d bytes' % num_bytes # e.g., 128 bytes
if num_bytes < 99 * 1024:
return '%.1f KB' % (num_bytes / 1024.0) # e.g. 23.4 KB
if num_bytes < 1024 * 1024:
return '%d KB' % (num_bytes / 1024) # e.g., 219 KB
if num_bytes < 99 * 1024 * 1024:
return '%.1f MB' % (num_bytes / 1024.0 / 1024.0) # e.g., 21.9 MB
return '%d MB' % (num_bytes / 1024 / 1024) # e.g., 100 MB
class EZTItem(object):
"""A class that makes a collection of fields easily accessible in EZT."""
def __init__(self, **kwargs):
"""Store all the given key-value pairs as fields of this object."""
vars(self).update(kwargs)
def __repr__(self):
fields = ', '.join('%r: %r' % (k, v) for k, v in
sorted(vars(self).items()))
return '%s({%s})' % (self.__class__.__name__, fields)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def ExpandLabels(page_data):
"""If page_data has a 'labels' list, expand it into 'label1', etc.
Args:
page_data: Template data which may include a 'labels' field.
"""
label_list = page_data.get('labels', [])
if isinstance(label_list, types.StringTypes):
label_list = [label.strip() for label in page_data['labels'].split(',')]
for i in range(len(label_list)):
page_data['label%d' % i] = label_list[i]
for i in range(len(label_list), framework_constants.MAX_LABELS):
page_data['label%d' % i] = ''
class TextRun(object):
"""A fragment of user-entered text that needs to be safely displyed."""
def __init__(self, content, tag=None, href=None):
self.content = content
self.tag = tag
self.href = href
self.title = None
self.css_class = None
def FormatForHTMLEmail(self):
"""Return a string that can be used in an HTML email body."""
if self.tag == 'a' and self.href:
return '<a href="%s">%s</a>' % (
cgi.escape(self.href, quote=True),
cgi.escape(self.content, quote=True))
return cgi.escape(self.content, quote=True)