blob: d1c45e3fceddbd7e507bc9dfa48860a72b2b26e6 [file] [log] [blame]
# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A set of classes for interacting with tables in SQL."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import logging
import random
import re
import sys
import time
import six
from six.moves import queue
import settings
if not settings.unit_test_mode:
import MySQLdb
from framework import exceptions
from framework import framework_helpers
from framework import logger
from infra_libs import ts_mon
class ConnectionPool(object):
"""Manage a set of database connections such that they may be re-used.
"""
def __init__(self, poolsize=1):
self.poolsize = poolsize
self.queues = {}
def get(self, instance, database):
"""Retun a database connection, or throw an exception if none can
be made.
"""
key = instance + '/' + database
if not key in self.queues:
q = queue.Queue(self.poolsize)
self.queues[key] = q
q = self.queues[key]
if q.empty():
cnxn = cnxn_ctor(instance, database)
else:
cnxn = q.get()
# Make sure the connection is still good.
cnxn.ping()
cnxn.commit()
return cnxn
def release(self, cnxn):
if not cnxn.pool_key in self.queues:
raise BaseException('unknown pool key: %s' % cnxn.pool_key)
q = self.queues[cnxn.pool_key]
if q.full():
cnxn.close()
else:
q.put(cnxn)
@framework_helpers.retry(1, delay=1, backoff=2)
def cnxn_ctor(instance, database):
logging.info('About to connect to SQL instance %r db %r', instance, database)
if settings.unit_test_mode:
raise ValueError('unit tests should not need real database connections')
try:
if settings.local_mode:
start_time = time.time()
cnxn = MySQLdb.connect(
host='127.0.0.1', port=3306, db=database, user='root', charset='utf8')
else:
start_time = time.time()
cnxn = MySQLdb.connect(
unix_socket='/cloudsql/' + instance,
db=database,
user='monorail-mysql',
charset='utf8')
duration = int((time.time() - start_time) * 1000)
DB_CNXN_LATENCY.add(duration)
CONNECTION_COUNT.increment({'success': True})
except MySQLdb.OperationalError:
CONNECTION_COUNT.increment({'success': False})
raise
cnxn.pool_key = instance + '/' + database
cnxn.is_bad = False
return cnxn
# One connection pool per database instance (primary, replicas are each an
# instance). We'll have four connections per instance because we fetch
# issue comments, stars, spam verdicts and spam verdict history in parallel
# with promises.
cnxn_pool = ConnectionPool(settings.db_cnxn_pool_size)
# MonorailConnection maintains a dictionary of connections to SQL databases.
# Each is identified by an int shard ID.
# And there is one connection to the primary DB identified by key PRIMARY_CNXN.
PRIMARY_CNXN = 'primary_cnxn'
# When one replica is temporarily unresponseive, we can use a different one.
BAD_SHARD_AVOIDANCE_SEC = 45
CONNECTION_COUNT = ts_mon.CounterMetric(
'monorail/sql/connection_count',
'Count of connections made to the SQL database.',
[ts_mon.BooleanField('success')])
DB_CNXN_LATENCY = ts_mon.CumulativeDistributionMetric(
'monorail/sql/db_cnxn_latency',
'Time needed to establish a DB connection.',
None)
DB_QUERY_LATENCY = ts_mon.CumulativeDistributionMetric(
'monorail/sql/db_query_latency',
'Time needed to make a DB query.',
[ts_mon.StringField('type')])
DB_COMMIT_LATENCY = ts_mon.CumulativeDistributionMetric(
'monorail/sql/db_commit_latency',
'Time needed to make a DB commit.',
None)
DB_ROLLBACK_LATENCY = ts_mon.CumulativeDistributionMetric(
'monorail/sql/db_rollback_latency',
'Time needed to make a DB rollback.',
None)
DB_RETRY_COUNT = ts_mon.CounterMetric(
'monorail/sql/db_retry_count',
'Count of queries retried.',
None)
DB_QUERY_COUNT = ts_mon.CounterMetric(
'monorail/sql/db_query_count',
'Count of queries sent to the DB.',
[ts_mon.StringField('type')])
DB_COMMIT_COUNT = ts_mon.CounterMetric(
'monorail/sql/db_commit_count',
'Count of commits sent to the DB.',
None)
DB_ROLLBACK_COUNT = ts_mon.CounterMetric(
'monorail/sql/db_rollback_count',
'Count of rollbacks sent to the DB.',
None)
DB_RESULT_ROWS = ts_mon.CumulativeDistributionMetric(
'monorail/sql/db_result_rows',
'Number of results returned by a DB query.',
None)
def RandomShardID():
"""Return a random shard ID to load balance across replicas."""
return random.randint(0, settings.num_logical_shards - 1)
class MonorailConnection(object):
"""Create and manage connections to the SQL servers.
We only store connections in the context of a single user request, not
across user requests. The main purpose of this class is to make using
sharded tables easier.
"""
unavailable_shards = {} # {shard_id: timestamp of failed attempt}
def __init__(self):
self.sql_cnxns = {} # {PRIMARY_CNXN: cnxn, shard_id: cnxn, ...}
@framework_helpers.retry(1, delay=0.1, backoff=2)
def GetPrimaryConnection(self):
"""Return a connection to the primary SQL DB."""
if PRIMARY_CNXN not in self.sql_cnxns:
self.sql_cnxns[PRIMARY_CNXN] = cnxn_pool.get(
settings.db_instance, settings.db_database_name)
logging.info(
'created a primary connection %r', self.sql_cnxns[PRIMARY_CNXN])
return self.sql_cnxns[PRIMARY_CNXN]
@framework_helpers.retry(1, delay=0.1, backoff=2)
def GetConnectionForShard(self, shard_id):
"""Return a connection to the DB replica that will be used for shard_id."""
if shard_id not in self.sql_cnxns:
physical_shard_id = shard_id % settings.num_logical_shards
replica_name = settings.db_replica_names[
physical_shard_id % len(settings.db_replica_names)]
shard_instance_name = (
settings.physical_db_name_format % replica_name)
self.unavailable_shards[shard_id] = int(time.time())
self.sql_cnxns[shard_id] = cnxn_pool.get(
shard_instance_name, settings.db_database_name)
del self.unavailable_shards[shard_id]
logging.info('created a replica connection for shard %d', shard_id)
return self.sql_cnxns[shard_id]
def Execute(self, stmt_str, stmt_args, shard_id=None, commit=True, retries=2):
"""Execute the given SQL statement on one of the relevant databases."""
if shard_id is None:
# No shard was specified, so hit the primary.
sql_cnxn = self.GetPrimaryConnection()
else:
if shard_id in self.unavailable_shards:
bad_age_sec = int(time.time()) - self.unavailable_shards[shard_id]
if bad_age_sec < BAD_SHARD_AVOIDANCE_SEC:
logging.info('Avoiding bad replica %r, age %r', shard_id, bad_age_sec)
shard_id = (shard_id + 1) % settings.num_logical_shards
sql_cnxn = self.GetConnectionForShard(shard_id)
try:
return self._ExecuteWithSQLConnection(
sql_cnxn, stmt_str, stmt_args, commit=commit)
except MySQLdb.OperationalError as e:
logging.exception(e)
logging.info('retries: %r', retries)
if retries > 0:
DB_RETRY_COUNT.increment()
self.sql_cnxns = {} # Drop all old mysql connections and make new.
return self.Execute(
stmt_str, stmt_args, shard_id=shard_id, commit=commit,
retries=retries - 1)
else:
raise e
def _ExecuteWithSQLConnection(
self, sql_cnxn, stmt_str, stmt_args, commit=True):
"""Execute a statement on the given database and return a cursor."""
start_time = time.time()
cursor = sql_cnxn.cursor()
cursor.execute('SET NAMES utf8mb4')
if stmt_str.startswith('INSERT') or stmt_str.startswith('REPLACE'):
cursor.executemany(stmt_str, stmt_args)
duration = (time.time() - start_time) * 1000
DB_QUERY_LATENCY.add(duration, {'type': 'write'})
DB_QUERY_COUNT.increment({'type': 'write'})
else:
cursor.execute(stmt_str, args=stmt_args)
duration = (time.time() - start_time) * 1000
DB_QUERY_LATENCY.add(duration, {'type': 'read'})
DB_QUERY_COUNT.increment({'type': 'read'})
DB_RESULT_ROWS.add(cursor.rowcount)
if stmt_str.startswith('INSERT') or stmt_str.startswith('REPLACE'):
formatted_statement = ('%s %s' % (stmt_str, stmt_args)).replace('\n', ' ')
else:
formatted_statement = (stmt_str % tuple(stmt_args)).replace('\n', ' ')
logging.info(
'%d rows in %d ms: %s', cursor.rowcount, int(duration),
formatted_statement)
if duration >= 2000:
logger.log(
{
'log_type': 'database/query',
'statement': formatted_statement[:100000],
'type': formatted_statement.split(' ')[0],
'duration': duration / 1000,
'row_count': cursor.rowcount,
})
if commit and not stmt_str.startswith('SELECT'):
try:
sql_cnxn.commit()
duration = (time.time() - start_time) * 1000
DB_COMMIT_LATENCY.add(duration)
DB_COMMIT_COUNT.increment()
except MySQLdb.DatabaseError:
sql_cnxn.rollback()
duration = (time.time() - start_time) * 1000
DB_ROLLBACK_LATENCY.add(duration)
DB_ROLLBACK_COUNT.increment()
return cursor
def Commit(self):
"""Explicitly commit any pending txns. Normally done automatically."""
sql_cnxn = self.GetPrimaryConnection()
try:
sql_cnxn.commit()
except MySQLdb.DatabaseError:
logging.exception('Commit failed for cnxn, rolling back')
sql_cnxn.rollback()
def Close(self):
"""Safely close any connections that are still open."""
for sql_cnxn in self.sql_cnxns.values():
try:
sql_cnxn.rollback() # Abandon any uncommitted changes.
cnxn_pool.release(sql_cnxn)
except MySQLdb.DatabaseError:
# This might happen if the cnxn is somehow already closed.
logging.exception('ProgrammingError when trying to close cnxn')
class SQLTableManager(object):
"""Helper class to make it easier to deal with an SQL table."""
def __init__(self, table_name):
self.table_name = table_name
def Select(
self, cnxn, distinct=False, cols=None, left_joins=None,
joins=None, where=None, or_where_conds=False, group_by=None,
order_by=None, limit=None, offset=None, shard_id=None, use_clause=None,
having=None, **kwargs):
"""Compose and execute an SQL SELECT statement on this table.
Args:
cnxn: MonorailConnection to the databases.
distinct: If True, add DISTINCT keyword.
cols: List of columns to retrieve, defaults to '*'.
left_joins: List of LEFT JOIN (str, args) pairs.
joins: List of regular JOIN (str, args) pairs.
where: List of (str, args) for WHERE clause.
or_where_conds: Set to True to use OR in the WHERE conds.
group_by: List of strings for GROUP BY clause.
order_by: List of (str, args) for ORDER BY clause.
limit: Optional LIMIT on the number of rows returned.
offset: Optional OFFSET when using LIMIT.
shard_id: Int ID of the shard to query.
use_clause: Optional string USE clause to tell the DB which index to use.
having: List of (str, args) for Optional HAVING clause
**kwargs: WHERE-clause equality and set-membership conditions.
Keyword args are used to build up more WHERE conditions that compare
column values to constants. Key word Argument foo='bar' translates to 'foo
= "bar"', and foo=[3, 4, 5] translates to 'foo IN (3, 4, 5)'.
Returns:
A list of rows, each row is a tuple of values for the requested cols.
"""
cols = cols or ['*'] # If columns not specified, retrieve all columns.
stmt = Statement.MakeSelect(
self.table_name, cols, distinct=distinct,
or_where_conds=or_where_conds)
if use_clause:
stmt.AddUseClause(use_clause)
if having:
stmt.AddHavingTerms(having)
stmt.AddJoinClauses(left_joins or [], left=True)
stmt.AddJoinClauses(joins or [])
stmt.AddWhereTerms(where or [], **kwargs)
stmt.AddGroupByTerms(group_by or [])
stmt.AddOrderByTerms(order_by or [])
stmt.SetLimitAndOffset(limit, offset)
stmt_str, stmt_args = stmt.Generate()
cursor = cnxn.Execute(stmt_str, stmt_args, shard_id=shard_id)
rows = cursor.fetchall()
cursor.close()
return rows
def SelectRow(
self, cnxn, cols=None, default=None, where=None, **kwargs):
"""Run a query that is expected to return just one row."""
rows = self.Select(cnxn, distinct=True, cols=cols, where=where, **kwargs)
if len(rows) == 1:
return rows[0]
elif not rows:
logging.info('SelectRow got 0 results, so using default %r', default)
return default
else:
raise ValueError('SelectRow got %d results, expected only 1', len(rows))
def SelectValue(self, cnxn, col, default=None, where=None, **kwargs):
"""Run a query that is expected to return just one row w/ one value."""
row = self.SelectRow(
cnxn, cols=[col], default=[default], where=where, **kwargs)
return row[0]
def InsertRows(
self, cnxn, cols, row_values, replace=False, ignore=False,
commit=True, return_generated_ids=False):
"""Insert all the given rows.
Args:
cnxn: MonorailConnection object.
cols: List of column names to set.
row_values: List of lists with values to store. The length of each
nested list should be equal to len(cols).
replace: Set to True if inserted values should replace existing DB rows
that have the same DB keys.
ignore: Set to True to ignore rows that would duplicate existing DB keys.
commit: Set to False if this operation is part of a series of operations
that should not be committed until the final one is done.
return_generated_ids: Set to True to return a list of generated
autoincrement IDs for inserted rows. This requires us to insert rows
one at a time.
Returns:
If return_generated_ids is set to True, this method returns a list of the
auto-increment IDs generated by the DB. Otherwise, [] is returned.
"""
if not row_values:
return None # Nothing to insert
generated_ids = []
if return_generated_ids:
# We must insert the rows one-at-a-time to know the generated IDs.
for row_value in row_values:
stmt = Statement.MakeInsert(
self.table_name, cols, [row_value], replace=replace, ignore=ignore)
stmt_str, stmt_args = stmt.Generate()
cursor = cnxn.Execute(stmt_str, stmt_args, commit=commit)
if cursor.lastrowid:
generated_ids.append(cursor.lastrowid)
cursor.close()
return generated_ids
stmt = Statement.MakeInsert(
self.table_name, cols, row_values, replace=replace, ignore=ignore)
stmt_str, stmt_args = stmt.Generate()
cnxn.Execute(stmt_str, stmt_args, commit=commit)
return []
def InsertRow(
self, cnxn, replace=False, ignore=False, commit=True, **kwargs):
"""Insert a single row into the table.
Args:
cnxn: MonorailConnection object.
replace: Set to True if inserted values should replace existing DB rows
that have the same DB keys.
ignore: Set to True to ignore rows that would duplicate existing DB keys.
commit: Set to False if this operation is part of a series of operations
that should not be committed until the final one is done.
**kwargs: column=value assignments to specify what to store in the DB.
Returns:
The generated autoincrement ID of the key column if one was generated.
Otherwise, return None.
"""
cols = sorted(kwargs.keys())
row = tuple(kwargs[col] for col in cols)
generated_ids = self.InsertRows(
cnxn, cols, [row], replace=replace, ignore=ignore,
commit=commit, return_generated_ids=True)
if generated_ids:
return generated_ids[0]
else:
return None
def Update(self, cnxn, delta, where=None, commit=True, limit=None, **kwargs):
"""Update one or more rows.
Args:
cnxn: MonorailConnection object.
delta: Dictionary of {column: new_value} assignments.
where: Optional list of WHERE conditions saying which rows to update.
commit: Set to False if this operation is part of a series of operations
that should not be committed until the final one is done.
limit: Optional LIMIT on the number of rows updated.
**kwargs: WHERE-clause equality and set-membership conditions.
Returns:
Int number of rows updated.
"""
if not delta:
return 0 # Nothing is being changed
stmt = Statement.MakeUpdate(self.table_name, delta)
stmt.AddWhereTerms(where, **kwargs)
stmt.SetLimitAndOffset(limit, None)
stmt_str, stmt_args = stmt.Generate()
cursor = cnxn.Execute(stmt_str, stmt_args, commit=commit)
result = cursor.rowcount
cursor.close()
return result
def IncrementCounterValue(self, cnxn, col_name, where=None, **kwargs):
"""Atomically increment a counter stored in MySQL, return new value.
Args:
cnxn: MonorailConnection object.
col_name: int column to increment.
where: Optional list of WHERE conditions saying which rows to update.
**kwargs: WHERE-clause equality and set-membership conditions. The
where and kwargs together should narrow the update down to exactly
one row.
Returns:
The new, post-increment value of the counter.
"""
stmt = Statement.MakeIncrement(self.table_name, col_name)
stmt.AddWhereTerms(where, **kwargs)
stmt_str, stmt_args = stmt.Generate()
cursor = cnxn.Execute(stmt_str, stmt_args)
assert cursor.rowcount == 1, (
'missing or ambiguous counter: %r' % cursor.rowcount)
result = cursor.lastrowid
cursor.close()
return result
def Delete(self, cnxn, where=None, or_where_conds=False, commit=True,
limit=None, **kwargs):
"""Delete the specified table rows.
Args:
cnxn: MonorailConnection object.
where: Optional list of WHERE conditions saying which rows to update.
or_where_conds: Set to True to use OR in the WHERE conds.
commit: Set to False if this operation is part of a series of operations
that should not be committed until the final one is done.
limit: Optional LIMIT on the number of rows deleted.
**kwargs: WHERE-clause equality and set-membership conditions.
Returns:
Int number of rows updated.
"""
# Deleting the whole table is never intended in Monorail.
assert where or kwargs
stmt = Statement.MakeDelete(self.table_name, or_where_conds=or_where_conds)
stmt.AddWhereTerms(where, **kwargs)
stmt.SetLimitAndOffset(limit, None)
stmt_str, stmt_args = stmt.Generate()
cursor = cnxn.Execute(stmt_str, stmt_args, commit=commit)
result = cursor.rowcount
cursor.close()
return result
class Statement(object):
"""A class to help build complex SQL statements w/ full escaping.
Start with a Make*() method, then fill in additional clauses as needed,
then call Generate() to return the SQL string and argument list. We pass
the string and args to MySQLdb separately so that it can do escaping on
the arg values as appropriate to prevent SQL-injection attacks.
The only values that are not escaped by MySQLdb are the table names
and column names, and bits of SQL syntax, all of which is hard-coded
in our application.
"""
@classmethod
def MakeSelect(cls, table_name, cols, distinct=False, or_where_conds=False):
"""Construct a SELECT statement."""
assert _IsValidTableName(table_name)
assert all(_IsValidColumnName(col) for col in cols)
main_clause = 'SELECT%s %s FROM %s' % (
(' DISTINCT' if distinct else ''), ', '.join(cols), table_name)
return cls(main_clause, or_where_conds=or_where_conds)
@classmethod
def MakeInsert(
cls, table_name, cols, new_values, replace=False, ignore=False):
"""Construct an INSERT statement."""
if replace == True:
return cls.MakeReplace(table_name, cols, new_values, ignore)
assert _IsValidTableName(table_name)
assert all(_IsValidColumnName(col) for col in cols)
ignore_word = ' IGNORE' if ignore else ''
main_clause = 'INSERT%s INTO %s (%s)' % (
ignore_word, table_name, ', '.join(cols))
return cls(main_clause, insert_args=new_values)
@classmethod
def MakeReplace(
cls, table_name, cols, new_values, ignore=False):
"""Construct an INSERT...ON DUPLICATE KEY UPDATE... statement.
Uses the INSERT/UPDATE syntax because REPLACE is literally a DELETE
followed by an INSERT, which doesn't play well with foreign keys.
INSERT/UPDATE is an atomic check of whether the primary key exists,
followed by an INSERT if it doesn't or an UPDATE if it does.
"""
assert _IsValidTableName(table_name)
assert all(_IsValidColumnName(col) for col in cols)
ignore_word = ' IGNORE' if ignore else ''
main_clause = 'INSERT%s INTO %s (%s)' % (
ignore_word, table_name, ', '.join(cols))
return cls(main_clause, insert_args=new_values, duplicate_update_cols=cols)
@classmethod
def MakeUpdate(cls, table_name, delta):
"""Construct an UPDATE statement."""
assert _IsValidTableName(table_name)
assert all(_IsValidColumnName(col) for col in delta.keys())
update_strs = []
update_args = []
for col, val in delta.items():
update_strs.append(col + '=%s')
update_args.append(val)
main_clause = 'UPDATE %s SET %s' % (
table_name, ', '.join(update_strs))
return cls(main_clause, update_args=update_args)
@classmethod
def MakeIncrement(cls, table_name, col_name, step=1):
"""Construct an UPDATE statement that increments and returns a counter."""
assert _IsValidTableName(table_name)
assert _IsValidColumnName(col_name)
main_clause = (
'UPDATE %s SET %s = LAST_INSERT_ID(%s + %%s)' % (
table_name, col_name, col_name))
update_args = [step]
return cls(main_clause, update_args=update_args)
@classmethod
def MakeDelete(cls, table_name, or_where_conds=False):
"""Construct a DELETE statement."""
assert _IsValidTableName(table_name)
main_clause = 'DELETE FROM %s' % table_name
return cls(main_clause, or_where_conds=or_where_conds)
def __init__(
self, main_clause, insert_args=None, update_args=None,
duplicate_update_cols=None, or_where_conds=False):
self.main_clause = main_clause # E.g., SELECT or DELETE
self.or_where_conds = or_where_conds
self.insert_args = insert_args or [] # For INSERT statements
for row_value in self.insert_args:
if not all(_IsValidDBValue(val) for val in row_value):
raise exceptions.InputException('Invalid DB value %r' % (row_value,))
self.update_args = update_args or [] # For UPDATEs
for val in self.update_args:
if not _IsValidDBValue(val):
raise exceptions.InputException('Invalid DB value %r' % val)
self.duplicate_update_cols = duplicate_update_cols or [] # For REPLACE-ish
self.use_clauses = []
self.join_clauses, self.join_args = [], []
self.where_conds, self.where_args = [], []
self.having_conds, self.having_args = [], []
self.group_by_terms, self.group_by_args = [], []
self.order_by_terms, self.order_by_args = [], []
self.limit, self.offset = None, None
def Generate(self):
"""Return an SQL string having %s placeholders and args to fill them in."""
clauses = [self.main_clause] + self.use_clauses + self.join_clauses
if self.where_conds:
if self.or_where_conds:
clauses.append('WHERE ' + '\n OR '.join(self.where_conds))
else:
clauses.append('WHERE ' + '\n AND '.join(self.where_conds))
if self.group_by_terms:
clauses.append('GROUP BY ' + ', '.join(self.group_by_terms))
if self.having_conds:
assert self.group_by_terms
clauses.append('HAVING %s' % ','.join(self.having_conds))
if self.order_by_terms:
clauses.append('ORDER BY ' + ', '.join(self.order_by_terms))
if self.limit and self.offset:
clauses.append('LIMIT %d OFFSET %d' % (self.limit, self.offset))
elif self.limit:
clauses.append('LIMIT %d' % self.limit)
elif self.offset:
clauses.append('LIMIT %d OFFSET %d' % (sys.maxsize, self.offset))
if self.insert_args:
clauses.append('VALUES (' + PlaceHolders(self.insert_args[0]) + ')')
args = self.insert_args
if self.duplicate_update_cols:
clauses.append('ON DUPLICATE KEY UPDATE %s' % (
', '.join(['%s=VALUES(%s)' % (col, col)
for col in self.duplicate_update_cols])))
assert not (self.join_args + self.update_args + self.where_args +
self.group_by_args + self.order_by_args + self.having_args)
else:
args = (self.join_args + self.update_args + self.where_args +
self.group_by_args + self.having_args + self.order_by_args)
assert not (self.insert_args + self.duplicate_update_cols)
args = _BoolsToInts(args)
stmt_str = '\n'.join(clause for clause in clauses if clause)
assert _IsValidStatement(stmt_str), stmt_str
return stmt_str, args
def AddUseClause(self, use_clause):
"""Add a USE clause (giving the DB a hint about which indexes to use)."""
assert _IsValidUseClause(use_clause), use_clause
self.use_clauses.append(use_clause)
def AddJoinClauses(self, join_pairs, left=False):
"""Save JOIN clauses based on the given list of join conditions."""
for join, args in join_pairs:
assert _IsValidJoin(join), join
assert join.count('%s') == len(args), join
self.join_clauses.append(
' %sJOIN %s' % (('LEFT ' if left else ''), join))
self.join_args.extend(args)
def AddGroupByTerms(self, group_by_term_list):
"""Save info needed to generate the GROUP BY clause."""
assert all(_IsValidGroupByTerm(term) for term in group_by_term_list)
self.group_by_terms.extend(group_by_term_list)
def AddOrderByTerms(self, order_by_pairs):
"""Save info needed to generate the ORDER BY clause."""
for term, args in order_by_pairs:
assert _IsValidOrderByTerm(term), term
assert term.count('%s') == len(args), term
self.order_by_terms.append(term)
self.order_by_args.extend(args)
def SetLimitAndOffset(self, limit, offset):
"""Save info needed to generate the LIMIT OFFSET clause."""
self.limit = limit
self.offset = offset
def AddWhereTerms(self, where_cond_pairs, **kwargs):
"""Generate a WHERE clause."""
where_cond_pairs = where_cond_pairs or []
for cond, args in where_cond_pairs:
assert _IsValidWhereCond(cond), cond
assert cond.count('%s') == len(args), cond
self.where_conds.append(cond)
self.where_args.extend(args)
for col, val in sorted(kwargs.items()):
assert _IsValidColumnName(col), col
eq = True
if col.endswith('_not'):
col = col[:-4]
eq = False
if isinstance(val, set):
val = list(val) # MySQL inteface cannot handle sets.
if val is None or val == []:
if val == [] and self.main_clause and self.main_clause.startswith(
'UPDATE'):
# https://crbug.com/monorail/6735: Avoid empty arrays for UPDATE.
raise exceptions.InputException('Invalid update DB value %r' % col)
op = 'IS' if eq else 'IS NOT'
self.where_conds.append(col + ' ' + op + ' NULL')
elif isinstance(val, list):
op = 'IN' if eq else 'NOT IN'
# Sadly, MySQLdb cannot escape lists, so we flatten to multiple "%s"s
self.where_conds.append(
col + ' ' + op + ' (' + PlaceHolders(val) + ')')
self.where_args.extend(val)
else:
op = '=' if eq else '!='
self.where_conds.append(col + ' ' + op + ' %s')
self.where_args.append(val)
def AddHavingTerms(self, having_cond_pairs):
"""Generate a HAVING clause."""
for cond, args in having_cond_pairs:
assert _IsValidHavingCond(cond), cond
assert cond.count('%s') == len(args), cond
self.having_conds.append(cond)
self.having_args.extend(args)
def PlaceHolders(sql_args):
"""Return a comma-separated list of %s placeholders for the given args."""
return ','.join('%s' for _ in sql_args)
TABLE_PAT = '[A-Z][_a-zA-Z0-9]+'
COLUMN_PAT = '[a-z][_a-z]+'
COMPARE_OP_PAT = '(<|>|=|!=|>=|<=|LIKE|NOT LIKE)'
SHORTHAND = {
'table': TABLE_PAT,
'column': COLUMN_PAT,
'tab_col': r'(%s\.)?%s' % (TABLE_PAT, COLUMN_PAT),
'placeholder': '%s', # That's a literal %s that gets passed to MySQLdb
'multi_placeholder': '%s(, ?%s)*',
'compare_op': COMPARE_OP_PAT,
'opt_asc_desc': '( ASC| DESC)?',
'opt_alias': '( AS %s)?' % TABLE_PAT,
'email_cond': (r'\(?'
r'('
r'(LOWER\(Spare\d+\.email\) IS NULL OR )?'
r'LOWER\(Spare\d+\.email\) '
r'(%s %%s|IN \(%%s(, ?%%s)*\))'
r'( (AND|OR) )?'
r')+'
r'\)?' % COMPARE_OP_PAT),
'hotlist_cond': (r'\(?'
r'('
r'(LOWER\(Cond\d+\.name\) IS NULL OR )?'
r'LOWER\(Cond\d+\.name\) '
r'(%s %%s|IN \(%%s(, ?%%s)*\))'
r'( (AND|OR) )?'
r')+'
r'\)?' % COMPARE_OP_PAT),
'phase_cond': (r'\(?'
r'('
r'(LOWER\(Phase\d+\.name\) IS NULL OR )?'
r'LOWER\(Phase\d+\.name\) '
r'(%s %%s|IN \(%%s(, ?%%s)*\))?'
r'( (AND|OR) )?'
r')+'
r'\)?' % COMPARE_OP_PAT),
'approval_cond': (r'\(?'
r'('
r'(LOWER\(Cond\d+\.status\) IS NULL OR )?'
r'LOWER\(Cond\d+\.status\) '
r'(%s %%s|IN \(%%s(, ?%%s)*\))'
r'( (AND|OR) )?'
r')+'
r'\)?' % COMPARE_OP_PAT),
}
def _MakeRE(regex_str):
"""Return a regular expression object, expanding our shorthand as needed."""
return re.compile(regex_str.format(**SHORTHAND))
TABLE_RE = _MakeRE('^{table}$')
TAB_COL_RE = _MakeRE('^{tab_col}$')
USE_CLAUSE_RE = _MakeRE(
r'^USE INDEX \({column}\) USE INDEX FOR ORDER BY \({column}\)$')
HAVING_RE_LIST = [
_MakeRE(r'^COUNT\(\*\) {compare_op} {placeholder}$')]
COLUMN_RE_LIST = [
TAB_COL_RE,
_MakeRE(r'\*'),
_MakeRE(r'COUNT\(\*\)'),
_MakeRE(r'COUNT\({tab_col}\)'),
_MakeRE(r'COUNT\(DISTINCT\({tab_col}\)\)'),
_MakeRE(r'MAX\({tab_col}\)'),
_MakeRE(r'MIN\({tab_col}\)'),
_MakeRE(r'GROUP_CONCAT\((DISTINCT )?{tab_col}( ORDER BY {tab_col})?' \
r'( SEPARATOR \'.*\')?\)'),
]
JOIN_RE_LIST = [
TABLE_RE,
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r'( AND {tab_col} = {tab_col})?'
r'( AND {tab_col} IN \({multi_placeholder}\))?$'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r'( AND {tab_col} = {tab_col})?'
r'( AND {tab_col} = {placeholder})?'
r'( AND {tab_col} IN \({multi_placeholder}\))?'
r'( AND {tab_col} = {tab_col})?$'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r'( AND {tab_col} = {tab_col})?'
r'( AND {tab_col} = {placeholder})?'
r'( AND {tab_col} IN \({multi_placeholder}\))?'
r'( AND {tab_col} IS NULL)?'
r'( AND \({tab_col} IS NULL'
r' OR {tab_col} NOT IN \({multi_placeholder}\)\))?$'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r'( AND {tab_col} = {tab_col})?'
r'( AND {tab_col} = {placeholder})?'
r' AND \(?{tab_col} {compare_op} {placeholder}\)?'
r'( AND {tab_col} = {tab_col})?$'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r'( AND {tab_col} = {tab_col})?'
r'( AND {tab_col} = {placeholder})?'
r' AND {tab_col} = {tab_col}$'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r'( AND {tab_col} = {tab_col})?'
r'( AND {tab_col} = {placeholder})?'
r' AND \({tab_col} IS NULL OR'
r' {tab_col} {compare_op} {placeholder}\)$'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r' AND \({tab_col} IS NOT NULL AND {tab_col} != {placeholder}\)'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col}'
r' AND LOWER\({tab_col}\) = LOWER\({placeholder}\)'),
_MakeRE(
r'^{table}{opt_alias} ON {tab_col} = {tab_col} AND {email_cond}$'),
_MakeRE(
r'^{table}{opt_alias} ON {email_cond}$'),
_MakeRE(
r'^{table}{opt_alias} ON '
r'\({tab_col} = {tab_col} OR {tab_col} = {tab_col}\)$'),
_MakeRE(
r'^\({table} AS {table} JOIN User AS {table} '
r'ON {tab_col} = {tab_col} AND {email_cond}\) '
r'ON Issue(Snapshot)?.id = {tab_col}'
r'( AND {tab_col} IS NULL)?'),
_MakeRE(
r'^\({table} JOIN Hotlist AS {table} '
r'ON {tab_col} = {tab_col} AND {hotlist_cond}\) '
r'ON Issue.id = {tab_col}?'),
_MakeRE(
r'^\({table} AS {table} JOIN IssuePhaseDef AS {table} '
r'ON {tab_col} = {tab_col} AND {phase_cond}\) '
r'ON Issue.id = {tab_col}?'),
_MakeRE(
r'^IssuePhaseDef AS {table} ON {phase_cond}'),
_MakeRE(
r'^Issue2ApprovalValue AS {table} ON {tab_col} = {tab_col} '
r'AND {tab_col} = {placeholder} AND {approval_cond}'),
_MakeRE(
r'^{table} AS {table} ON {tab_col} = {tab_col} '
r'LEFT JOIN {table} AS {table} ON {tab_col} = {tab_col}'),
]
ORDER_BY_RE_LIST = [
_MakeRE(r'^{tab_col}{opt_asc_desc}$'),
_MakeRE(r'^LOWER\({tab_col}\){opt_asc_desc}$'),
_MakeRE(r'^ISNULL\({tab_col}\){opt_asc_desc}$'),
_MakeRE(r'^\(ISNULL\({tab_col}\) AND ISNULL\({tab_col}\)\){opt_asc_desc}$'),
_MakeRE(r'^FIELD\({tab_col}, {multi_placeholder}\){opt_asc_desc}$'),
_MakeRE(r'^FIELD\(IF\(ISNULL\({tab_col}\), {tab_col}, {tab_col}\), '
r'{multi_placeholder}\){opt_asc_desc}$'),
_MakeRE(r'^CONCAT\({tab_col}, {tab_col}\){opt_asc_desc}$'),
]
GROUP_BY_RE_LIST = [
TAB_COL_RE,
]
WHERE_COND_RE_LIST = [
_MakeRE(r'^TRUE$'),
_MakeRE(r'^FALSE$'),
_MakeRE(r'^{tab_col} IS NULL$'),
_MakeRE(r'^{tab_col} IS NOT NULL$'),
_MakeRE(r'^{tab_col} {compare_op} {tab_col}$'),
_MakeRE(r'^{tab_col} {compare_op} {placeholder}$'),
_MakeRE(r'^{tab_col} %% {placeholder} = {placeholder}$'),
_MakeRE(r'^{tab_col} IN \({multi_placeholder}\)$'),
_MakeRE(r'^{tab_col} NOT IN \({multi_placeholder}\)$'),
_MakeRE(r'^LOWER\({tab_col}\) IS NULL$'),
_MakeRE(r'^LOWER\({tab_col}\) IS NOT NULL$'),
_MakeRE(r'^LOWER\({tab_col}\) {compare_op} {placeholder}$'),
_MakeRE(r'^LOWER\({tab_col}\) IN \({multi_placeholder}\)$'),
_MakeRE(r'^LOWER\({tab_col}\) NOT IN \({multi_placeholder}\)$'),
_MakeRE(r'^LOWER\({tab_col}\) LIKE {placeholder}$'),
_MakeRE(r'^LOWER\({tab_col}\) NOT LIKE {placeholder}$'),
_MakeRE(
r'^timestep < \(SELECT MAX\(j.timestep\) FROM Invalidate AS j '
r'WHERE j.kind = %s '
r'AND j.cache_key = Invalidate.cache_key\)$'),
_MakeRE(
r'^\({tab_col} IS NULL OR {tab_col} {compare_op} {placeholder}\) '
r'AND \({tab_col} IS NULL OR {tab_col} {compare_op} {placeholder}'
r'\)$'),
_MakeRE(
r'^\({tab_col} IS NOT NULL AND {tab_col} {compare_op} '
r'{placeholder}\) OR \({tab_col} IS NOT NULL AND {tab_col} '
r'{compare_op} {placeholder}\)$'),
]
# Note: We never use ';' for multiple statements, '@' for SQL variables, or
# any quoted strings in stmt_str (quotes are put in my MySQLdb for args).
STMT_STR_RE = re.compile(
r'\A(SELECT|UPDATE|DELETE|INSERT|REPLACE) [\'-+=!<>%*.,()\w\s]+\Z',
re.MULTILINE)
def _IsValidDBValue(val):
if isinstance(val, six.string_types):
return '\x00' not in val
return True
def _IsValidTableName(table_name):
return TABLE_RE.match(table_name)
def _IsValidColumnName(column_expr):
return any(regex.match(column_expr) for regex in COLUMN_RE_LIST)
def _IsValidUseClause(use_clause):
return USE_CLAUSE_RE.match(use_clause)
def _IsValidHavingCond(cond):
if cond.startswith('(') and cond.endswith(')'):
cond = cond[1:-1]
if ' OR ' in cond:
return all(_IsValidHavingCond(c) for c in cond.split(' OR '))
if ' AND ' in cond:
return all(_IsValidHavingCond(c) for c in cond.split(' AND '))
return any(regex.match(cond) for regex in HAVING_RE_LIST)
def _IsValidJoin(join):
return any(regex.match(join) for regex in JOIN_RE_LIST)
def _IsValidOrderByTerm(term):
return any(regex.match(term) for regex in ORDER_BY_RE_LIST)
def _IsValidGroupByTerm(term):
return any(regex.match(term) for regex in GROUP_BY_RE_LIST)
def _IsValidWhereCond(cond):
if cond.startswith('NOT '):
cond = cond[4:]
if cond.startswith('(') and cond.endswith(')'):
cond = cond[1:-1]
if any(regex.match(cond) for regex in WHERE_COND_RE_LIST):
return True
if ' OR ' in cond:
return all(_IsValidWhereCond(c) for c in cond.split(' OR '))
if ' AND ' in cond:
return all(_IsValidWhereCond(c) for c in cond.split(' AND '))
return False
def _IsValidStatement(stmt_str):
"""Final check to make sure there is no funny junk sneaking in somehow."""
return (STMT_STR_RE.match(stmt_str) and
'--' not in stmt_str)
def _BoolsToInts(arg_list):
"""Convert any True values to 1s and Falses to 0s.
Google's copy of MySQLdb has bool-to-int conversion disabled,
and yet it seems to be needed otherwise they are converted
to strings and always interpreted as 0 (which is FALSE).
Args:
arg_list: (nested) list of SQL statment argument values, which may
include some boolean values.
Returns:
The same list, but with True replaced by 1 and False replaced by 0.
"""
result = []
for arg in arg_list:
if isinstance(arg, (list, tuple)):
result.append(_BoolsToInts(arg))
elif arg is True:
result.append(1)
elif arg is False:
result.append(0)
else:
result.append(arg)
return result