blob: e9168305b2b1428d89b33289fafd4f648fb3565e [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
"""Set of functions for dealing with spam reports.
"""
8from __future__ import print_function
9from __future__ import division
10from __future__ import absolute_import
11
12import collections
13import logging
14import settings
Copybara854996b2021-09-07 19:36:02 +000015
16from collections import defaultdict
Copybara854996b2021-09-07 19:36:02 +000017from framework import sql
Copybara854996b2021-09-07 19:36:02 +000018from infra_libs import ts_mon
19from services import ml_helpers
20
21
# Names of the SQL tables used by SpamService.
SPAMREPORT_TABLE_NAME = 'SpamReport'
SPAMVERDICT_TABLE_NAME = 'SpamVerdict'
ISSUE_TABLE = 'Issue'

# Values written to the SpamVerdict.reason column.
REASON_MANUAL = 'manual'
REASON_THRESHOLD = 'threshold'
REASON_CLASSIFIER = 'classifier'
REASON_FAIL_OPEN = 'fail_open'

# Class label that denotes "spam" in ML Engine prediction responses.
SPAM_CLASS_LABEL = '1'

# Column lists for issue-level report and verdict rows.
SPAMREPORT_ISSUE_COLS = ['issue_id', 'reported_user_id', 'user_id']
SPAMVERDICT_ISSUE_COL = [
    'created', 'content_created', 'user_id', 'reported_user_id',
    'comment_id', 'issue_id']
MANUALVERDICT_ISSUE_COLS = [
    'user_id', 'issue_id', 'is_spam', 'reason', 'project_id']
THRESHVERDICT_ISSUE_COLS = ['issue_id', 'is_spam', 'reason', 'project_id']

# Column lists for comment-level report and verdict rows.
SPAMREPORT_COMMENT_COLS = ['comment_id', 'reported_user_id', 'user_id']
MANUALVERDICT_COMMENT_COLS = [
    'user_id', 'comment_id', 'is_spam', 'reason', 'project_id']
THRESHVERDICT_COMMENT_COLS = ['comment_id', 'is_spam', 'reason', 'project_id']


class SpamService(object):
  """The persistence layer for spam reports and spam verdicts.

  Spam reports are user flags stored in the SpamReport table. Spam verdicts
  are decisions (manual, flag threshold, classifier or fail-open) stored in
  the SpamVerdict table.
  """
  issue_actions = ts_mon.CounterMetric(
      'monorail/spam_svc/issue', 'Count of things that happen to issues.', [
          ts_mon.StringField('type'),
          ts_mon.StringField('reporter_id'),
          ts_mon.StringField('issue')
      ])
  comment_actions = ts_mon.CounterMetric(
      'monorail/spam_svc/comment', 'Count of things that happen to comments.', [
          ts_mon.StringField('type'),
          ts_mon.StringField('reporter_id'),
          ts_mon.StringField('issue'),
          ts_mon.StringField('comment_id')
      ])
  ml_engine_failures = ts_mon.CounterMetric(
      'monorail/spam_svc/ml_engine_failure',
      'Failures calling the ML Engine API',
      None)

  def __init__(self):
    self.report_tbl = sql.SQLTableManager(SPAMREPORT_TABLE_NAME)
    self.verdict_tbl = sql.SQLTableManager(SPAMVERDICT_TABLE_NAME)
    self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE)

    # The ML Engine client is created lazily, on first use, by _classify()
    # or _predict().
    self.ml_engine = None

  def LookupIssuesFlaggers(self, cnxn, issue_ids):
    """Returns users who've reported the issues or their comments as spam.

    Args:
      cnxn: connection to SQL database.
      issue_ids: list of issue IDs to look up.

    Returns:
      A defaultdict {issue_id: (issue_reporters, comment_reporters)}.
      issue_reporters is a list of the users who flagged the issue itself.
      comment_reporters is a defaultdict {comment_id: [user_ids]} of the
      users who flagged each comment on that issue. Looking up an issue with
      no reports yields an empty list and an empty dict.
    """
    rows = self.report_tbl.Select(
        cnxn, cols=['issue_id', 'user_id', 'comment_id'],
        issue_id=issue_ids)

    reporters = collections.defaultdict(
        lambda: ([], collections.defaultdict(list)))

    for issue_id, user_id, comment_id in rows:
      issue_reporters, comment_reporters = reporters[int(issue_id)]
      # A report row with a comment_id flags that comment; otherwise it flags
      # the issue itself.
      if comment_id:
        comment_reporters[comment_id].append(user_id)
      else:
        issue_reporters.append(user_id)

    return reporters

  def LookupIssueFlaggers(self, cnxn, issue_id):
    """Returns users who've reported the issue or its comments as spam.

    Returns:
      A tuple. The first element is a list of the users who flagged the
      issue. The second element is a dict mapping each comment ID to a list
      of the users who flagged that comment.
    """
    return self.LookupIssuesFlaggers(cnxn, [issue_id])[issue_id]

  def _LookupIssueFlagCounts(self, cnxn, issue_ids):
    """Returns a dict {issue_id: number of issue-level spam reports}.

    Only reports on the issue itself are counted. Reports on the issue's
    comments are excluded, so that flags on comments cannot push the issue
    over the spam threshold. Issues without reports are absent from the
    result.
    """
    rows = self.report_tbl.Select(
        cnxn, cols=['issue_id', 'COUNT(*)'], issue_id=issue_ids,
        comment_id=None, group_by=['issue_id'])
    return {int(issue_id): count for issue_id, count in rows}

  def LookupIssueVerdicts(self, cnxn, issue_ids):
    """Returns a dict {issue_id: reason of the issue's most recent verdict}.

    Only issue-level verdicts (comment_id IS NULL) are considered. Issues
    without any verdict are absent from the result.
    """
    # Selecting a non-aggregated 'reason' next to MAX(created) under
    # GROUP BY issue_id yields the reason of an arbitrary row, not
    # necessarily the newest one. Instead, fetch the verdicts oldest-first
    # and let each newer row overwrite the previous entry.
    rows = self.verdict_tbl.Select(
        cnxn, cols=['issue_id', 'reason'],
        issue_id=issue_ids, comment_id=None,
        order_by=[('created', [])])
    latest_reasons = {}
    for issue_id, reason in rows:
      latest_reasons[int(issue_id)] = reason
    return latest_reasons

  def _LookupVerdictHistory(self, cnxn, id_col, ids):
    """Returns every verdict row matching the given IDs as a list of dicts.

    Args:
      cnxn: connection to SQL database.
      id_col: 'issue_id' or 'comment_id'. It is the column to filter and sort
        on, and also the first key of each returned dict.
      ids: list of IDs to match against id_col.

    Returns:
      A list of dicts ordered by id_col and then by creation time. Each dict
      has the keys id_col, 'reason', 'created', 'is_spam',
      'classifier_confidence', 'user_id' and 'overruled'.
    """
    fields = [id_col, 'reason', 'created', 'is_spam', 'classifier_confidence',
              'user_id', 'overruled']
    rows = self.verdict_tbl.Select(
        cnxn, cols=fields, order_by=[(id_col, []), ('created', [])],
        **{id_col: ids})
    return [dict(zip(fields, row)) for row in rows]

  def LookupIssueVerdictHistory(self, cnxn, issue_ids):
    """Returns the full spam verdict history of the given issues.

    Returns:
      A list of verdict dicts ordered by issue_id and then by creation time,
      with the keys 'issue_id', 'reason', 'created', 'is_spam',
      'classifier_confidence', 'user_id' and 'overruled'.
    """
    # TODO: group by issue_id, make class instead of dict for verdict.
    return self._LookupVerdictHistory(cnxn, 'issue_id', issue_ids)

  def LookupCommentVerdictHistory(self, cnxn, comment_ids):
    """Returns the full spam verdict history of the given comments.

    Returns:
      A list of verdict dicts ordered by comment_id and then by creation time,
      with the keys 'comment_id', 'reason', 'created', 'is_spam',
      'classifier_confidence', 'user_id' and 'overruled'.
    """
    # TODO: group by comment_id, make class instead of dict for verdict.
    return self._LookupVerdictHistory(cnxn, 'comment_id', comment_ids)

  def FlagIssues(self, cnxn, issue_service, issues, reporting_user_id,
                 flagged_spam):
    """Creates or deletes one user's spam reports on issues.

    When flagging, each issue whose issue-level report count reaches
    settings.spam_flag_thresh gets a threshold verdict and is marked as spam.
    Issues whose most recent verdict is manual are skipped. Each verdict is
    recorded with that issue's own project_id.

    Args:
      cnxn: connection to SQL database.
      issue_service: issue service used to save the updated is_spam bits.
      issues: list of Issue PBs being flagged or unflagged.
      reporting_user_id: ID of the user doing the flagging.
      flagged_spam: True to add a spam report, False to remove it.
    """
    if not issues:
      return

    issue_ids = [issue.issue_id for issue in issues]
    if flagged_spam:
      rows = [(issue.issue_id, issue.reporter_id, reporting_user_id)
              for issue in issues]
      self.report_tbl.InsertRows(cnxn, SPAMREPORT_ISSUE_COLS, rows,
                                 ignore=True)
    else:
      self.report_tbl.Delete(
          cnxn, issue_id=issue_ids, user_id=reporting_user_id,
          comment_id=None)
      # Removing a flag never marks anything as spam, so there are no
      # verdicts to record.
      return

    # Now record new verdicts and update issue.is_spam, if they've changed.
    counts = self._LookupIssueFlagCounts(cnxn, issue_ids)
    previous_verdicts = self.LookupIssueVerdicts(cnxn, issue_ids)

    # No number of user spam flags can overturn an admin's verdict.
    spam_issue_ids = set(
        issue_id for issue_id, count in counts.items()
        if count >= settings.spam_flag_thresh and
        previous_verdicts.get(issue_id) != REASON_MANUAL)
    if not spam_issue_ids:
      return

    # Some of the issues exceeded the flag threshold, so record threshold
    # verdicts for them and mark them as spam.
    project_ids = {issue.issue_id: issue.project_id for issue in issues}
    rows = [(issue_id, True, REASON_THRESHOLD, project_ids[issue_id])
            for issue_id in sorted(spam_issue_ids)]
    self.verdict_tbl.InsertRows(cnxn, THRESHVERDICT_ISSUE_COLS, rows,
                                ignore=True)

    update_issues = []
    for issue in issues:
      if issue.issue_id not in spam_issue_ids:
        continue
      issue.is_spam = True
      update_issues.append(issue)
      self.issue_actions.increment(
          {
              'type': 'flag',
              'reporter_id': str(reporting_user_id),
              'issue': '%s:%s' % (issue.project_name, issue.local_id)
          })

    issue_service.UpdateIssues(cnxn, update_issues, update_cols=['is_spam'])

  def FlagComment(
      self, cnxn, issue, comment_id, reported_user_id, reporting_user_id,
      flagged_spam):
    """Creates or deletes one user's spam report on a comment.

    Args:
      cnxn: connection to SQL database.
      issue: Issue PB that the comment belongs to.
      comment_id: ID of the comment being flagged or unflagged.
      reported_user_id: ID of the user being reported (presumably the
        comment's author).
      reporting_user_id: ID of the user doing the flagging.
      flagged_spam: True to add a spam report, False to remove it.
    """
    # TODO(seanmccullough): Bulk comment flagging? There's no UI for that.
    if not flagged_spam:
      self.report_tbl.Delete(
          cnxn,
          issue_id=issue.issue_id,
          comment_id=comment_id,
          user_id=reporting_user_id)
      return

    self.report_tbl.InsertRow(
        cnxn,
        ignore=True,
        issue_id=issue.issue_id,
        comment_id=comment_id,
        reported_user_id=reported_user_id,
        user_id=reporting_user_id)
    self.comment_actions.increment(
        {
            'type': 'flag',
            'reporter_id': str(reporting_user_id),
            'issue': '%s:%s' % (issue.project_name, issue.local_id),
            'comment_id': str(comment_id)
        })

  def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence,
                                   fail_open):
    """Records the spam classifier's verdict on a newly created issue.

    Args:
      cnxn: connection to SQL database.
      issue: the Issue PB that was classified.
      is_spam: the classifier's decision.
      confidence: the classifier's confidence value to store.
      fail_open: True if the classifier failed and the issue was let through.
    """
    reason = REASON_FAIL_OPEN if fail_open else REASON_CLASSIFIER
    self.verdict_tbl.InsertRow(cnxn, issue_id=issue.issue_id, is_spam=is_spam,
                               reason=reason, classifier_confidence=confidence,
                               project_id=issue.project_id)
    if is_spam:
      self.issue_actions.increment(
          {
              'type': 'classifier',
              'reporter_id': 'classifier',
              'issue': '%s:%s' % (issue.project_name, issue.local_id)
          })
    # This is called at issue creation time, so there's nothing else to do here.

  def RecordManualIssueVerdicts(self, cnxn, issue_service, issues, user_id,
                                is_spam):
    """Records an admin's spam/ham verdict on issues and saves the issues.

    All previous verdicts on the issues are marked as overruled. When
    is_spam is False, issue_service.AllocateNewLocalIDs is called on the
    issues before they are saved.

    Args:
      cnxn: connection to SQL database.
      issue_service: issue service used to save the issues.
      issues: list of Issue PBs receiving the verdict.
      user_id: ID of the admin making the decision.
      is_spam: True to mark the issues as spam, False to mark them as ham.
    """
    # An empty list would build an invalid "issue_id IN ()" condition.
    if not issues:
      return

    rows = [(user_id, issue.issue_id, is_spam, REASON_MANUAL, issue.project_id)
            for issue in issues]
    issue_ids = [issue.issue_id for issue in issues]

    # Overrule all previous verdicts.
    self.verdict_tbl.Update(cnxn, {'overruled': True}, [
        ('issue_id IN (%s)' % sql.PlaceHolders(issue_ids), issue_ids)
    ], commit=False)

    self.verdict_tbl.InsertRows(cnxn, MANUALVERDICT_ISSUE_COLS, rows,
                                ignore=True)

    for issue in issues:
      issue.is_spam = is_spam

    if is_spam:
      for issue in issues:
        self.issue_actions.increment(
            {
                'type': 'manual',
                'reporter_id': str(user_id),
                'issue': '%s:%s' % (issue.project_name, issue.local_id)
            })
    else:
      issue_service.AllocateNewLocalIDs(cnxn, issues)

    # This will commit the transaction.
    issue_service.UpdateIssues(cnxn, issues, update_cols=['is_spam'])

  def RecordManualCommentVerdict(self, cnxn, issue_service, user_service,
                                 comment_id, user_id, is_spam):
    """Records an admin's spam/ham verdict on a single comment.

    Like RecordManualIssueVerdicts, this marks all earlier verdicts on the
    comment as overruled, which drops them from the classifier moderation
    queue and from the training data (both filter on overruled = False).
    The comment is then soft-deleted or restored to match is_spam.

    Args:
      cnxn: connection to SQL database.
      issue_service: issue service used to load and update the comment.
      user_service: user service passed through to SoftDeleteComment.
      comment_id: ID of the comment receiving the verdict.
      user_id: ID of the admin making the decision.
      is_spam: True to mark the comment as spam, False to mark it as ham.
    """
    # TODO(seanmccullough): Bulk comment verdicts? There's no UI for that.
    comment = issue_service.GetComment(cnxn, comment_id)
    issue = issue_service.GetIssue(cnxn, comment.issue_id, use_cache=False)

    # Overrule all previous verdicts on this comment.
    self.verdict_tbl.Update(cnxn, {'overruled': True}, [
        ('comment_id = %s', [comment_id])
    ], commit=False)
    self.verdict_tbl.InsertRow(
        cnxn, ignore=True, user_id=user_id, comment_id=comment_id,
        is_spam=is_spam, reason=REASON_MANUAL, project_id=comment.project_id)

    comment.is_spam = is_spam
    issue_service.SoftDeleteComment(
        cnxn, issue, comment, user_id, user_service, is_spam, True, is_spam)
    if is_spam:
      self.comment_actions.increment(
          {
              'type': 'manual',
              'reporter_id': str(user_id),
              'issue': '%s:%s' % (issue.project_name, issue.local_id),
              'comment_id': str(comment_id)
          })

  def RecordClassifierCommentVerdict(
      self, cnxn, issue_service, comment, is_spam, confidence, fail_open):
    """Records the spam classifier's verdict on a comment.

    Args:
      cnxn: connection to SQL database.
      issue_service: issue service used to look up the comment's issue for
        metrics.
      comment: the Comment PB that was classified.
      is_spam: the classifier's decision.
      confidence: the classifier's confidence value to store.
      fail_open: True if the classifier failed and the comment was let through.
    """
    reason = REASON_FAIL_OPEN if fail_open else REASON_CLASSIFIER
    self.verdict_tbl.InsertRow(cnxn, comment_id=comment.id, is_spam=is_spam,
                               reason=reason, classifier_confidence=confidence,
                               project_id=comment.project_id)
    if is_spam:
      issue = issue_service.GetIssue(cnxn, comment.issue_id, use_cache=False)
      self.comment_actions.increment(
          {
              'type': 'classifier',
              'reporter_id': 'classifier',
              'issue': '%s:%s' % (issue.project_name, issue.local_id),
              'comment_id': str(comment.id)
          })

  def _predict(self, instance):
    """Requests a prediction from the ML Engine API.

    Sample API response:
      {'predictions': [{
        'classes': ['0', '1'],
        'scores': [0.4986788034439087, 0.5013211965560913]
      }]}

    This hits the default model.

    Args:
      instance: feature dict whose "word_hashes" entry is sent as the model
        input.

    Returns:
      A floating point number representing the confidence
      the instance is spam.

    Raises:
      Exception: if the prediction has no score for SPAM_CLASS_LABEL.
    """
    model_name = 'projects/%s/models/%s' % (
        settings.classifier_project_id, settings.spam_model_name)
    body = {'instances': [{"inputs": instance["word_hashes"]}]}

    if not self.ml_engine:
      self.ml_engine = ml_helpers.setup_ml_engine()

    request = self.ml_engine.projects().predict(name=model_name, body=body)
    response = request.execute()
    logging.info('ML Engine API response: %r', response)
    prediction = response['predictions'][0]

    # Return the confidence for the spam label, not the ham label. The spam
    # label, '1', is usually at index 1, but label order is not guaranteed,
    # so look it up by value instead of by position.
    for label, score in zip(prediction['classes'], prediction['scores']):
      if label == SPAM_CLASS_LABEL:
        return score
    raise Exception('No predicted classes found.')

  def _IsExempt(self, author, is_project_member):
    """Return True if the user is exempt from spam checking.

    Args:
      author: User PB of the content's author.
      is_project_member: True if the author is a member of the project.
    """
    if author.email is not None and author.email.endswith(
        settings.spam_allowlisted_suffixes):
      logging.info('%s allowlisted from spam filtering', author.email)
      return True

    if is_project_member:
      logging.info('%s is a project member, assuming ham', author.email)
      return True

    return False

  def ClassifyIssue(self, issue, firstComment, reporter, is_project_member):
    """Classify an issue as either spam or ham.

    Args:
      issue: the Issue.
      firstComment: the first Comment on issue.
      reporter: User PB for the Issue reporter.
      is_project_member: True if reporter is a member of issue's project.

    Returns:
      A dict with 'confidence_is_spam' and 'failed_open'; see _classify().
    """
    instance = ml_helpers.GenerateFeaturesRaw(
        [issue.summary, firstComment.content],
        settings.spam_feature_hashes)
    return self._classify(instance, reporter, is_project_member)

  def ClassifyComment(self, comment_content, commenter, is_project_member=True):
    """Classify a comment as either spam or ham.

    Args:
      comment_content: the comment text.
      commenter: User PB for the user who authored the comment.
      is_project_member: True if commenter is a member of the project. This
        defaults to True, which exempts the comment from classification.

    Returns:
      A dict with 'confidence_is_spam' and 'failed_open'; see _classify().
    """
    instance = ml_helpers.GenerateFeaturesRaw(
        ['', comment_content],
        settings.spam_feature_hashes)
    return self._classify(instance, commenter, is_project_member)

  def _classify(self, instance, author, is_project_member):
    """Runs the spam classifier on a feature instance, failing open.

    Returns:
      A dict with 'confidence_is_spam' (float) and 'failed_open' (bool).
      - Exempt authors get the ham result.
      - If the ML Engine cannot be set up, or all three prediction attempts
        fail, the ham confidence is returned with 'failed_open' set to True.
    """
    # Fail-safe: not spam.
    result = self.ham_classification()

    if self._IsExempt(author, is_project_member):
      return result

    if not self.ml_engine:
      self.ml_engine = ml_helpers.setup_ml_engine()

    # If setup_ml_engine returns None, it failed to init.
    if not self.ml_engine:
      logging.error("ML Engine not initialized.")
      self.ml_engine_failures.increment()
      result['failed_open'] = True
      return result

    for _ in range(3):
      try:
        result['confidence_is_spam'] = self._predict(instance)
        result['failed_open'] = False
        return result
      # Deliberately broad: any API failure is retried, then we fail open.
      except Exception as ex:  # pylint: disable=broad-except
        self.ml_engine_failures.increment()
        logging.error('Error calling ML Engine API: %s', ex)

    result['failed_open'] = True
    return result

  def ham_classification(self):
    """Returns a fresh classification result meaning "not spam"."""
    return {'confidence_is_spam': 0.0,
            'failed_open': False}

  def GetIssueFlagQueue(
      self, cnxn, _issue_service, project_id, offset=0, limit=10):
    """Returns recent issues in a project that have been flagged by users.

    Args:
      cnxn: connection to SQL database.
      _issue_service: unused.
      project_id: ID of the project whose flagged issues are listed.
      offset: number of flagged issues to skip, for pagination.
      limit: maximum number of flagged issues to return.

    Returns:
      A pair (items, total).
      - items is a list of ModerationItems, most-flagged first, each with
        project_id, issue_id, count (number of reports), latest_report and
        num_users (number of distinct reporters).
      - total is the number of distinct flagged issues in the project.
    """
    # The SpamReport table has no alias, so its columns are qualified with
    # the real table name. Joins are (clause, args) pairs, and the project
    # filter uses SQL's '=' operator with a '%s' placeholder.
    issue_join = ('Issue ON Issue.id = SpamReport.issue_id', [])
    where = [
        ('SpamReport.issue_id IS NOT NULL', []),
        ('Issue.project_id = %s', [project_id]),
    ]
    issue_flags = self.report_tbl.Select(
        cnxn,
        cols=[
            "Issue.project_id", "SpamReport.issue_id", "count(*) as count",
            "max(SpamReport.created) as latest",
            "count(distinct SpamReport.user_id) as users"
        ],
        left_joins=[issue_join],
        where=where,
        order_by=[('count DESC', [])],
        group_by=['SpamReport.issue_id'],
        offset=offset,
        limit=limit)
    items = [
        ModerationItem(
            project_id=row[0],
            issue_id=row[1],
            count=row[2],
            latest_report=row[3],
            num_users=row[4],
        ) for row in issue_flags
    ]

    count = self.report_tbl.SelectValue(
        cnxn,
        col='COUNT(DISTINCT SpamReport.issue_id)',
        where=where,
        left_joins=[issue_join])
    return items, count

  def GetCommentClassifierQueue(
      self, cnxn, _issue_service, project_id, offset=0, limit=10):
    """Returns recent comments with classifier verdicts that need moderation.

    A comment is included when it has a non-overruled verdict whose
    classifier_confidence is at most settings.classifier_moderation_thresh.
    Comments are ranked in ascending order of confidence, so uncertain items
    come first.

    Returns:
      A pair (items, total).
      - items is a list of ModerationItems with comment_id, is_spam, reason,
        classifier_confidence and verdict_time.
      - total is the number of distinct comments matching the same filters.
    """
    # TODO(seanmccullough): Optimize pagination. This query probably gets
    # slower as the number of SpamVerdicts grows, regardless of offset
    # and limit values used here. Using offset,limit in general may not
    # be the best way to do this.
    where = [
        ('project_id = %s', [project_id]),
        (
            'classifier_confidence <= %s',
            [settings.classifier_moderation_thresh]),
        ('overruled = %s', [False]),
        ('comment_id IS NOT NULL', []),
    ]
    # Select comment_id, not issue_id. Classifier comment verdicts are
    # recorded without an issue_id, and the results are keyed by comment.
    comment_results = self.verdict_tbl.Select(
        cnxn,
        cols=[
            'comment_id', 'is_spam', 'reason', 'classifier_confidence',
            'created'
        ],
        where=where,
        order_by=[
            ('classifier_confidence ASC', []),
            ('created ASC', []),
        ],
        group_by=['comment_id'],
        offset=offset,
        limit=limit,
    )

    items = [
        ModerationItem(
            comment_id=int(row[0]),
            is_spam=row[1] == 1,
            reason=row[2],
            classifier_confidence=row[3],
            verdict_time='%s' % row[4],
        ) for row in comment_results
    ]

    # The listing is grouped by comment_id, so count distinct comments to
    # keep the total consistent with pagination.
    count = self.verdict_tbl.SelectValue(
        cnxn,
        col='COUNT(DISTINCT comment_id)',
        where=where)

    return items, count

  def GetTrainingIssues(self, cnxn, issue_service, since, offset=0, limit=100):
    """Returns recent issues with human-labeled spam/ham verdicts.

    Args:
      cnxn: connection to SQL database.
      issue_service: issue service used to load the issues and comments.
      since: datetime. Only verdicts created after it are used.
      offset: number of verdicts to skip, for pagination.
      limit: maximum number of verdicts to use.

    Returns:
      A tuple (issues, first_comments, count).
      - issues is a list of Issue PBs.
      - first_comments maps issue_id to the content of the issue's first
        comment, or "[Empty]" if it has none.
      - count is the total number of matching manual verdicts.
    """
    # Non-overruled manual verdicts on issues created after `since`.
    where = [
        ('overruled = %s', [False]),
        ('reason = %s', [REASON_MANUAL]),
        ('issue_id IS NOT NULL', []),
        ('created > %s', [since.isoformat()]),
    ]
    results = self.verdict_tbl.Select(cnxn,
        cols=['issue_id'],
        where=where,
        offset=offset,
        limit=limit,
        )

    issue_ids = [int(row[0]) for row in results if row[0]]
    issues = issue_service.GetIssues(cnxn, issue_ids)
    comments = issue_service.GetCommentsForIssues(cnxn, issue_ids)
    first_comments = {}
    for issue in issues:
      issue_comments = comments.get(issue.issue_id)
      first_comments[issue.issue_id] = (
          issue_comments[0].content if issue_comments else "[Empty]")

    count = self.verdict_tbl.SelectValue(cnxn,
        col='COUNT(*)',
        where=where)

    return issues, first_comments, count

  def GetTrainingComments(self, cnxn, issue_service, since, offset=0,
                          limit=100):
    """Returns recent comments with human-labeled spam/ham verdicts.

    Args:
      cnxn: connection to SQL database.
      issue_service: issue service used to load the comments.
      since: datetime. Only verdicts created after it are used.
      offset: number of distinct comments to skip, for pagination.
      limit: maximum number of distinct comments to return.

    Returns:
      Whatever issue_service.GetCommentsByID returns for the matching
      comment IDs.
    """
    # Distinct comments with a non-overruled manual verdict created after
    # `since`.
    results = self.verdict_tbl.Select(
        cnxn,
        distinct=True,
        cols=['comment_id'],
        where=[
            ('overruled = %s', [False]),
            ('reason = %s', [REASON_MANUAL]),
            ('comment_id IS NOT NULL', []),
            ('created > %s', [since.isoformat()]),
        ],
        offset=offset,
        limit=limit,
        )

    comment_ids = [int(row[0]) for row in results if row[0]]
    # Don't care about sequence numbers in this context yet.
    comments = issue_service.GetCommentsByID(cnxn, comment_ids,
        defaultdict(int))
    return comments

  def ExpungeUsersInSpam(self, cnxn, user_ids):
    """Removes all references to given users from Spam DB tables.

    This method will not commit the operations. This method will
    not make changes to in-memory data.
    """
    commit = False
    self.report_tbl.Delete(cnxn, reported_user_id=user_ids, commit=commit)
    self.report_tbl.Delete(cnxn, user_id=user_ids, commit=commit)
    self.verdict_tbl.Delete(cnxn, user_id=user_ids, commit=commit)


class ModerationItem(object):
  """A plain record describing one entry of a spam moderation queue.

  Every keyword argument passed to the constructor becomes an attribute,
  e.g. ModerationItem(issue_id=1, count=3).issue_id == 1.
  """

  def __init__(self, **kwargs):
    self.__dict__.update(kwargs)

  def __repr__(self):
    # Sort by field name so the output is deterministic.
    fields = ', '.join(
        '%s=%r' % (name, value)
        for name, value in sorted(self.__dict__.items()))
    return '%s(%s)' % (type(self).__name__, fields)