blob: 9a62cb9fef0aa55b11dbb33125abc7cb5f73a03a [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
""" Set of functions for detaling with spam reports.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import collections
import logging
import settings
import sys
from collections import defaultdict
from features import filterrules_helpers
from framework import sql
from framework import framework_constants
from infra_libs import ts_mon
from services import ml_helpers
SPAMREPORT_TABLE_NAME = 'SpamReport'
SPAMVERDICT_TABLE_NAME = 'SpamVerdict'
ISSUE_TABLE = 'Issue'
REASON_MANUAL = 'manual'
REASON_THRESHOLD = 'threshold'
REASON_CLASSIFIER = 'classifier'
REASON_FAIL_OPEN = 'fail_open'
SPAM_CLASS_LABEL = '1'
SPAMREPORT_ISSUE_COLS = ['issue_id', 'reported_user_id', 'user_id']
SPAMVERDICT_ISSUE_COL = ['created', 'content_created', 'user_id',
'reported_user_id', 'comment_id', 'issue_id']
MANUALVERDICT_ISSUE_COLS = ['user_id', 'issue_id', 'is_spam', 'reason',
'project_id']
THRESHVERDICT_ISSUE_COLS = ['issue_id', 'is_spam', 'reason', 'project_id']
SPAMREPORT_COMMENT_COLS = ['comment_id', 'reported_user_id', 'user_id']
MANUALVERDICT_COMMENT_COLS = ['user_id', 'comment_id', 'is_spam', 'reason',
'project_id']
THRESHVERDICT_COMMENT_COLS = ['comment_id', 'is_spam', 'reason', 'project_id']
class SpamService(object):
"""The persistence layer for spam reports."""
issue_actions = ts_mon.CounterMetric(
'monorail/spam_svc/issue', 'Count of things that happen to issues.', [
ts_mon.StringField('type'),
ts_mon.StringField('reporter_id'),
ts_mon.StringField('issue')
])
comment_actions = ts_mon.CounterMetric(
'monorail/spam_svc/comment', 'Count of things that happen to comments.', [
ts_mon.StringField('type'),
ts_mon.StringField('reporter_id'),
ts_mon.StringField('issue'),
ts_mon.StringField('comment_id')
])
ml_engine_failures = ts_mon.CounterMetric(
'monorail/spam_svc/ml_engine_failure',
'Failures calling the ML Engine API',
None)
def __init__(self):
self.report_tbl = sql.SQLTableManager(SPAMREPORT_TABLE_NAME)
self.verdict_tbl = sql.SQLTableManager(SPAMVERDICT_TABLE_NAME)
self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE)
# ML Engine library is lazy loaded below.
self.ml_engine = None
def LookupIssuesFlaggers(self, cnxn, issue_ids):
"""Returns users who've reported the issues or their comments as spam.
Returns a dictionary {issue_id: (issue_reporters, comment_reporters)}
issue_reportes is a list of users who flagged the issue;
comment_reporters element is a dictionary {comment_id: [user_ids]} where
user_ids are the users who flagged that comment.
"""
rows = self.report_tbl.Select(
cnxn, cols=['issue_id', 'user_id', 'comment_id'],
issue_id=issue_ids)
reporters = collections.defaultdict(
# Return a tuple of (issue_reporters, comment_reporters) as described
# above.
lambda: ([], collections.defaultdict(list)))
for row in rows:
issue_id = int(row[0])
user_id = row[1]
if row[2]:
comment_id = row[2]
reporters[issue_id][1][comment_id].append(user_id)
else:
reporters[issue_id][0].append(user_id)
return reporters
def LookupIssueFlaggers(self, cnxn, issue_id):
"""Returns users who've reported the issue or its comments as spam.
Returns a tuple. First element is a list of users who flagged the issue;
second element is a dictionary of comment id to a list of users who flagged
that comment.
"""
return self.LookupIssuesFlaggers(cnxn, [issue_id])[issue_id]
def LookupIssueFlagCounts(self, cnxn, issue_ids):
"""Returns a map of issue_id to flag counts"""
rows = self.report_tbl.Select(cnxn, cols=['issue_id', 'COUNT(*)'],
issue_id=issue_ids, group_by=['issue_id'])
counts = {}
for row in rows:
counts[int(row[0])] = row[1]
return counts
def LookupIssueVerdicts(self, cnxn, issue_ids):
"""Returns a map of issue_id to most recent spam verdicts"""
rows = self.verdict_tbl.Select(cnxn,
cols=['issue_id', 'reason', 'MAX(created)'],
issue_id=issue_ids, comment_id=None,
group_by=['issue_id'])
counts = {}
for row in rows:
counts[int(row[0])] = row[1]
return counts
def LookupIssueVerdictHistory(self, cnxn, issue_ids):
"""Returns a map of issue_id to most recent spam verdicts"""
rows = self.verdict_tbl.Select(cnxn, cols=[
'issue_id', 'reason', 'created', 'is_spam', 'classifier_confidence',
'user_id', 'overruled'],
issue_id=issue_ids, order_by=[('issue_id', []), ('created', [])])
# TODO: group by issue_id, make class instead of dict for verdict.
verdicts = []
for row in rows:
verdicts.append({
'issue_id': row[0],
'reason': row[1],
'created': row[2],
'is_spam': row[3],
'classifier_confidence': row[4],
'user_id': row[5],
'overruled': row[6],
})
return verdicts
def LookupCommentVerdictHistory(self, cnxn, comment_ids):
"""Returns a map of issue_id to most recent spam verdicts"""
rows = self.verdict_tbl.Select(cnxn, cols=[
'comment_id', 'reason', 'created', 'is_spam', 'classifier_confidence',
'user_id', 'overruled'],
comment_id=comment_ids, order_by=[('comment_id', []), ('created', [])])
# TODO: group by comment_id, make class instead of dict for verdict.
verdicts = []
for row in rows:
verdicts.append({
'comment_id': row[0],
'reason': row[1],
'created': row[2],
'is_spam': row[3],
'classifier_confidence': row[4],
'user_id': row[5],
'overruled': row[6],
})
return verdicts
def FlagIssues(self, cnxn, issue_service, issues, reporting_user_id,
flagged_spam):
"""Creates or deletes a spam report on an issue."""
verdict_updates = []
if flagged_spam:
rows = [(issue.issue_id, issue.reporter_id, reporting_user_id)
for issue in issues]
self.report_tbl.InsertRows(cnxn, SPAMREPORT_ISSUE_COLS, rows,
ignore=True)
else:
issue_ids = [issue.issue_id for issue in issues]
self.report_tbl.Delete(
cnxn, issue_id=issue_ids, user_id=reporting_user_id,
comment_id=None)
project_id = issues[0].project_id
# Now record new verdicts and update issue.is_spam, if they've changed.
ids = [issue.issue_id for issue in issues]
counts = self.LookupIssueFlagCounts(cnxn, ids)
previous_verdicts = self.LookupIssueVerdicts(cnxn, ids)
for issue_id in counts:
# If the flag counts changed enough to toggle the is_spam bit, need to
# record a new verdict and update the Issue.
# No number of user spam flags can overturn an admin's verdict.
if previous_verdicts.get(issue_id) == REASON_MANUAL:
continue
# If enough spam flags come in, mark the issue as spam.
if (flagged_spam and counts[issue_id] >= settings.spam_flag_thresh):
verdict_updates.append(issue_id)
if len(verdict_updates) == 0:
return
# Some of the issues may have exceed the flag threshold, so issue verdicts
# and mark as spam in those cases.
rows = [(issue_id, flagged_spam, REASON_THRESHOLD, project_id)
for issue_id in verdict_updates]
self.verdict_tbl.InsertRows(cnxn, THRESHVERDICT_ISSUE_COLS, rows,
ignore=True)
update_issues = []
for issue in issues:
if issue.issue_id in verdict_updates:
issue.is_spam = flagged_spam
update_issues.append(issue)
if flagged_spam:
for issue in update_issues:
issue_ref = '%s:%s' % (issue.project_name, issue.local_id)
self.issue_actions.increment(
{
'type': 'flag',
'reporter_id': str(reporting_user_id),
'issue': issue_ref
})
issue_service.UpdateIssues(cnxn, update_issues, update_cols=['is_spam'])
def FlagComment(
self, cnxn, issue, comment_id, reported_user_id, reporting_user_id,
flagged_spam):
"""Creates or deletes a spam report on a comment."""
# TODO(seanmccullough): Bulk comment flagging? There's no UI for that.
if flagged_spam:
self.report_tbl.InsertRow(
cnxn,
ignore=True,
issue_id=issue.issue_id,
comment_id=comment_id,
reported_user_id=reported_user_id,
user_id=reporting_user_id)
issue_ref = '%s:%s' % (issue.project_name, issue.local_id)
self.comment_actions.increment(
{
'type': 'flag',
'reporter_id': str(reporting_user_id),
'issue': issue_ref,
'comment_id': str(comment_id)
})
else:
self.report_tbl.Delete(
cnxn,
issue_id=issue.issue_id,
comment_id=comment_id,
user_id=reporting_user_id)
def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence,
fail_open):
reason = REASON_FAIL_OPEN if fail_open else REASON_CLASSIFIER
self.verdict_tbl.InsertRow(cnxn, issue_id=issue.issue_id, is_spam=is_spam,
reason=reason, classifier_confidence=confidence,
project_id=issue.project_id)
if is_spam:
issue_ref = '%s:%s' % (issue.project_name, issue.local_id)
self.issue_actions.increment(
{
'type': 'classifier',
'reporter_id': 'classifier',
'issue': issue_ref
})
# This is called at issue creation time, so there's nothing else to do here.
def RecordManualIssueVerdicts(self, cnxn, issue_service, issues, user_id,
is_spam):
rows = [(user_id, issue.issue_id, is_spam, REASON_MANUAL, issue.project_id)
for issue in issues]
issue_ids = [issue.issue_id for issue in issues]
# Overrule all previous verdicts.
self.verdict_tbl.Update(cnxn, {'overruled': True}, [
('issue_id IN (%s)' % sql.PlaceHolders(issue_ids), issue_ids)
], commit=False)
self.verdict_tbl.InsertRows(cnxn, MANUALVERDICT_ISSUE_COLS, rows,
ignore=True)
for issue in issues:
issue.is_spam = is_spam
if is_spam:
for issue in issues:
issue_ref = '%s:%s' % (issue.project_name, issue.local_id)
self.issue_actions.increment(
{
'type': 'manual',
'reporter_id': str(user_id),
'issue': issue_ref
})
else:
issue_service.AllocateNewLocalIDs(cnxn, issues)
# This will commit the transaction.
issue_service.UpdateIssues(cnxn, issues, update_cols=['is_spam'])
def RecordManualCommentVerdict(self, cnxn, issue_service, user_service,
comment_id, user_id, is_spam):
# TODO(seanmccullough): Bulk comment verdicts? There's no UI for that.
self.verdict_tbl.InsertRow(cnxn, ignore=True,
user_id=user_id, comment_id=comment_id, is_spam=is_spam,
reason=REASON_MANUAL)
comment = issue_service.GetComment(cnxn, comment_id)
comment.is_spam = is_spam
issue = issue_service.GetIssue(cnxn, comment.issue_id, use_cache=False)
issue_service.SoftDeleteComment(
cnxn, issue, comment, user_id, user_service, is_spam, True, is_spam)
if is_spam:
issue_ref = '%s:%s' % (issue.project_name, issue.local_id)
self.comment_actions.increment(
{
'type': 'manual',
'reporter_id': str(user_id),
'issue': issue_ref,
'comment_id': str(comment_id)
})
def RecordClassifierCommentVerdict(
self, cnxn, issue_service, comment, is_spam, confidence, fail_open):
reason = REASON_FAIL_OPEN if fail_open else REASON_CLASSIFIER
self.verdict_tbl.InsertRow(cnxn, comment_id=comment.id, is_spam=is_spam,
reason=reason, classifier_confidence=confidence,
project_id=comment.project_id)
if is_spam:
issue = issue_service.GetIssue(cnxn, comment.issue_id, use_cache=False)
issue_ref = '%s:%s' % (issue.project_name, issue.local_id)
self.comment_actions.increment(
{
'type': 'classifier',
'reporter_id': 'classifier',
'issue': issue_ref,
'comment_id': str(comment.id)
})
def _predict(self, instance):
"""Requests a prediction from the ML Engine API.
Sample API response:
{'predictions': [{
'classes': ['0', '1'],
'scores': [0.4986788034439087, 0.5013211965560913]
}]}
This hits the default model.
Returns:
A floating point number representing the confidence
the instance is spam.
"""
model_name = 'projects/%s/models/%s' % (
settings.classifier_project_id, settings.spam_model_name)
body = {'instances': [{"inputs": instance["word_hashes"]}]}
if not self.ml_engine:
self.ml_engine = ml_helpers.setup_ml_engine()
request = self.ml_engine.projects().predict(name=model_name, body=body)
response = request.execute()
logging.info('ML Engine API response: %r' % response)
prediction = response['predictions'][0]
# Ensure the class confidence we return is for the spam, not the ham label.
# The spam label, '1', is usually at index 1 but I'm not sure of any
# guarantees around label order.
if prediction['classes'][1] == SPAM_CLASS_LABEL:
return prediction['scores'][1]
elif prediction['classes'][0] == SPAM_CLASS_LABEL:
return prediction['scores'][0]
else:
raise Exception('No predicted classes found.')
def _IsExempt(self, author, is_project_member):
"""Return True if the user is exempt from spam checking."""
if author.email is not None and author.email.endswith(
settings.spam_allowlisted_suffixes):
logging.info('%s allowlisted from spam filtering', author.email)
return True
if is_project_member:
logging.info('%s is a project member, assuming ham', author.email)
return True
return False
def ClassifyIssue(self, issue, firstComment, reporter, is_project_member):
"""Classify an issue as either spam or ham.
Args:
issue: the Issue.
firstComment: the first Comment on issue.
reporter: User PB for the Issue reporter.
is_project_member: True if reporter is a member of issue's project.
Returns a JSON dict of classifier prediction results from
the ML Engine API.
"""
instance = ml_helpers.GenerateFeaturesRaw(
[issue.summary, firstComment.content],
settings.spam_feature_hashes)
return self._classify(instance, reporter, is_project_member)
def ClassifyComment(self, comment_content, commenter, is_project_member=True):
"""Classify a comment as either spam or ham.
Args:
comment: the comment text.
commenter: User PB for the user who authored the comment.
Returns a JSON dict of classifier prediction results from
the ML Engine API.
"""
instance = ml_helpers.GenerateFeaturesRaw(
['', comment_content],
settings.spam_feature_hashes)
return self._classify(instance, commenter, is_project_member)
def _classify(self, instance, author, is_project_member):
# Fail-safe: not spam.
result = self.ham_classification()
if self._IsExempt(author, is_project_member):
return result
if not self.ml_engine:
self.ml_engine = ml_helpers.setup_ml_engine()
# If setup_ml_engine returns None, it failed to init.
if not self.ml_engine:
logging.error("ML Engine not initialized.")
self.ml_engine_failures.increment()
result['failed_open'] = True
return result
remaining_retries = 3
while remaining_retries > 0:
try:
result['confidence_is_spam'] = self._predict(instance)
result['failed_open'] = False
return result
except Exception as ex:
remaining_retries = remaining_retries - 1
self.ml_engine_failures.increment()
logging.error('Error calling ML Engine API: %s' % ex)
result['failed_open'] = True
return result
def ham_classification(self):
return {'confidence_is_spam': 0.0,
'failed_open': False}
def GetIssueClassifierQueue(
self, cnxn, _issue_service, project_id, offset=0, limit=10):
"""Returns list of recent issues with spam verdicts,
ranked in ascending order of confidence (so uncertain items are first).
"""
# TODO(seanmccullough): Optimize pagination. This query probably gets
# slower as the number of SpamVerdicts grows, regardless of offset
# and limit values used here. Using offset,limit in general may not
# be the best way to do this.
issue_results = self.verdict_tbl.Select(
cnxn,
cols=[
'issue_id', 'is_spam', 'reason', 'classifier_confidence', 'created'
],
where=[
('project_id = %s', [project_id]),
(
'classifier_confidence <= %s',
[settings.classifier_moderation_thresh]),
('overruled = %s', [False]),
('issue_id IS NOT NULL', []),
],
order_by=[
('classifier_confidence ASC', []),
('created ASC', []),
],
group_by=['issue_id'],
offset=offset,
limit=limit,
)
ret = []
for row in issue_results:
ret.append(
ModerationItem(
issue_id=int(row[0]),
is_spam=row[1] == 1,
reason=row[2],
classifier_confidence=row[3],
verdict_time='%s' % row[4],
))
count = self.verdict_tbl.SelectValue(
cnxn,
col='COUNT(*)',
where=[
('project_id = %s', [project_id]),
(
'classifier_confidence <= %s',
[settings.classifier_moderation_thresh]),
('overruled = %s', [False]),
('issue_id IS NOT NULL', []),
])
return ret, count
def GetIssueFlagQueue(
self, cnxn, _issue_service, project_id, offset=0, limit=10):
"""Returns list of recent issues that have been flagged by users"""
issue_flags = self.report_tbl.Select(
cnxn,
cols=[
"Issue.project_id", "Report.issue_id", "count(*) as count",
"max(Report.created) as latest",
"count(distinct Report.user_id) as users"
],
left_joins=["Issue ON Issue.id = Report.issue_id"],
where=[
('Report.issue_id IS NOT NULL', []),
("Issue.project_id == %v", [project_id])
],
order_by=[('count DESC', [])],
group_by=['Report.issue_id'],
offset=offset,
limit=limit)
ret = []
for row in issue_flags:
ret.append(
ModerationItem(
project_id=row[0],
issue_id=row[1],
count=row[2],
latest_report=row[3],
num_users=row[4],
))
count = self.verdict_tbl.SelectValue(
cnxn,
col='COUNT(DISTINCT Report.issue_id)',
where=[('Issue.project_id = %s', [project_id])],
left_joins=["Issue ON Issue.id = SpamReport.issue_id"])
return ret, count
def GetCommentClassifierQueue(
self, cnxn, _issue_service, project_id, offset=0, limit=10):
"""Returns list of recent comments with spam verdicts,
ranked in ascending order of confidence (so uncertain items are first).
"""
# TODO(seanmccullough): Optimize pagination. This query probably gets
# slower as the number of SpamVerdicts grows, regardless of offset
# and limit values used here. Using offset,limit in general may not
# be the best way to do this.
comment_results = self.verdict_tbl.Select(
cnxn,
cols=[
'issue_id', 'is_spam', 'reason', 'classifier_confidence', 'created'
],
where=[
('project_id = %s', [project_id]),
(
'classifier_confidence <= %s',
[settings.classifier_moderation_thresh]),
('overruled = %s', [False]),
('comment_id IS NOT NULL', []),
],
order_by=[
('classifier_confidence ASC', []),
('created ASC', []),
],
group_by=['comment_id'],
offset=offset,
limit=limit,
)
ret = []
for row in comment_results:
ret.append(
ModerationItem(
comment_id=int(row[0]),
is_spam=row[1] == 1,
reason=row[2],
classifier_confidence=row[3],
verdict_time='%s' % row[4],
))
count = self.verdict_tbl.SelectValue(
cnxn,
col='COUNT(*)',
where=[
('project_id = %s', [project_id]),
(
'classifier_confidence <= %s',
[settings.classifier_moderation_thresh]),
('overruled = %s', [False]),
('comment_id IS NOT NULL', []),
])
return ret, count
def GetTrainingIssues(self, cnxn, issue_service, since, offset=0, limit=100):
"""Returns list of recent issues with human-labeled spam/ham verdicts.
"""
# get all of the manual verdicts in the past day.
results = self.verdict_tbl.Select(cnxn,
cols=['issue_id'],
where=[
('overruled = %s', [False]),
('reason = %s', ['manual']),
('issue_id IS NOT NULL', []),
('created > %s', [since.isoformat()]),
],
offset=offset,
limit=limit,
)
issue_ids = [int(row[0]) for row in results if row[0]]
issues = issue_service.GetIssues(cnxn, issue_ids)
comments = issue_service.GetCommentsForIssues(cnxn, issue_ids)
first_comments = {}
for issue in issues:
first_comments[issue.issue_id] = (comments[issue.issue_id][0].content
if issue.issue_id in comments else "[Empty]")
count = self.verdict_tbl.SelectValue(cnxn,
col='COUNT(*)',
where=[
('overruled = %s', [False]),
('reason = %s', ['manual']),
('issue_id IS NOT NULL', []),
('created > %s', [since.isoformat()]),
])
return issues, first_comments, count
def GetTrainingComments(self, cnxn, issue_service, since, offset=0,
limit=100):
"""Returns list of recent comments with human-labeled spam/ham verdicts.
"""
# get all of the manual verdicts in the past day.
results = self.verdict_tbl.Select(
cnxn,
distinct=True,
cols=['comment_id'],
where=[
('overruled = %s', [False]),
('reason = %s', ['manual']),
('comment_id IS NOT NULL', []),
('created > %s', [since.isoformat()]),
],
offset=offset,
limit=limit,
)
comment_ids = [int(row[0]) for row in results if row[0]]
# Don't care about sequence numbers in this context yet.
comments = issue_service.GetCommentsByID(cnxn, comment_ids,
defaultdict(int))
return comments
def ExpungeUsersInSpam(self, cnxn, user_ids):
"""Removes all references to given users from Spam DB tables.
This method will not commit the operations. This method will
not make changes to in-memory data.
"""
commit = False
self.report_tbl.Delete(cnxn, reported_user_id=user_ids, commit=commit)
self.report_tbl.Delete(cnxn, user_id=user_ids, commit=commit)
self.verdict_tbl.Delete(cnxn, user_id=user_ids, commit=commit)
class ModerationItem:
def __init__(self, **kwargs):
self.__dict__ = kwargs