Project import generated by Copybara.

GitOrigin-RevId: d9e9e3fb4e31372ec1fb43b178994ca78fa8fe70
diff --git a/tools/ml/trainer/__init__.py b/tools/ml/trainer/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/ml/trainer/__init__.py
diff --git a/tools/ml/trainer/dataset.py b/tools/ml/trainer/dataset.py
new file mode 100644
index 0000000..0def4b6
--- /dev/null
+++ b/tools/ml/trainer/dataset.py
@@ -0,0 +1,95 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+
+import StringIO
+import tensorflow as tf
+
+import csv
+import sys
+from googleapiclient import discovery
+from googleapiclient import errors
+from oauth2client.client import GoogleCredentials
+
+import trainer.ml_helpers
+
+
+def fetch_training_data(bucket, prefix, trainer_type):
+  """Fetches and parses all training CSVs under the given GCS bucket prefix."""
+  credentials = GoogleCredentials.get_application_default()
+  storage = discovery.build('storage', 'v1', credentials=credentials)
+  objects = storage.objects()
+
+  request = objects.list(bucket=bucket, prefix=prefix)
+  response = make_api_request(request)
+  items = response.get('items')
+  csv_filepaths = [blob.get('name') for blob in items]
+
+  if trainer_type == 'spam':
+    return fetch_spam(csv_filepaths, bucket, objects)
+  else:
+    return fetch_component(csv_filepaths, bucket, objects)
+
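+# Illustrative call (a hedged sketch, not exercised anywhere in this patch;
+# the bucket and prefix below are placeholders, not real deployment values):
+#   rows = fetch_training_data('monorail-prod-mlengine',
+#                              'spam-training-data/', 'spam')
+# returns a list of parsed CSV rows ready for feature extraction.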
+
+def fetch_spam(csv_filepaths, bucket, objects):
+
+  training_data = []
+  # Always include these full-corpus spam CSVs in addition to the files
+  # found under the GCS prefix.
+  csv_filepaths = [
+    'spam-training-data/full-android.csv',
+    'spam-training-data/full-support.csv',
+  ] + csv_filepaths
+
+  for filepath in csv_filepaths:
+    media = fetch_training_csv(filepath, objects, bucket)
+    rows, skipped_rows = trainer.ml_helpers.spam_from_file(
+        StringIO.StringIO(media))
+
+    if len(rows):
+      training_data.extend(rows)
+
+    tf.logging.info('{:<40}{:<20}{:<20}'.format(
+        filepath,
+        'added %d rows' % len(rows),
+        'skipped %d rows' % skipped_rows))
+
+  return training_data
+
+
+def fetch_component(csv_filepaths, bucket, objects):
+
+  training_data = []
+  for filepath in csv_filepaths:
+    media = fetch_training_csv(filepath, objects, bucket)
+    rows = trainer.ml_helpers.component_from_file(
+        StringIO.StringIO(media))
+
+    if len(rows):
+      training_data.extend(rows)
+
+    tf.logging.info('{:<40}{:<20}'.format(
+        filepath,
+        'added %d rows' % len(rows)))
+
+  return training_data
+
+
+def fetch_training_csv(filepath, objects, bucket):
+  request = objects.get_media(bucket=bucket, object=filepath)
+  return make_api_request(request)
+
+
+def make_api_request(request):
+  """Executes a googleapiclient request, logging and re-raising HTTP errors."""
+  try:
+    return request.execute()
+  except errors.HttpError as err:
+    tf.logging.error('There was an error with the API. Details:')
+    tf.logging.error(err._get_reason())
+    raise
+
+
diff --git a/tools/ml/trainer/ml_helpers.py b/tools/ml/trainer/ml_helpers.py
new file mode 120000
index 0000000..c790a2c
--- /dev/null
+++ b/tools/ml/trainer/ml_helpers.py
@@ -0,0 +1 @@
+../../../services/ml_helpers.py
\ No newline at end of file
diff --git a/tools/ml/trainer/model.py b/tools/ml/trainer/model.py
new file mode 100644
index 0000000..3b627a9
--- /dev/null
+++ b/tools/ml/trainer/model.py
@@ -0,0 +1,109 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import tensorflow as tf
+
+from trainer.ml_helpers import COMPONENT_FEATURES
+from trainer.ml_helpers import SPAM_FEATURE_HASHES
+
+# Important: we assume this list mirrors the output of GenerateFeaturesRaw.
+INPUT_COLUMNS = {'component': [
+                     tf.feature_column.numeric_column(
+                         key='word_features',
+                         shape=(COMPONENT_FEATURES,)),
+                 ],
+                 'spam': [
+                     tf.feature_column.numeric_column(
+                         key='word_hashes',
+                         shape=(SPAM_FEATURE_HASHES,)),
+                 ]}
+
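+# For reference, a hedged sketch of the raw feature dicts these columns expect
+# (based only on the shapes above, not on GenerateFeaturesRaw itself):
+#   'component': {'word_features': [0.0] * COMPONENT_FEATURES}
+#   'spam':      {'word_hashes':   [0.0] * SPAM_FEATURE_HASHES}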
+
+def build_estimator(config, trainer_type, class_count):
+  """Returns a tf.Estimator.
+
+  Args:
+    config: tf.contrib.learn.RunConfig defining the runtime environment for the
+      estimator (including model_dir).
+  Returns:
+    A LinearClassifier
+  """
+  return tf.contrib.learn.DNNClassifier(
+    config=config,
+    feature_columns=(INPUT_COLUMNS[trainer_type]),
+    hidden_units=[1024, 512, 256],
+    optimizer=tf.train.AdamOptimizer(learning_rate=0.001,
+      beta1=0.9,
+      beta2=0.999,
+      epsilon=1e-08,
+      use_locking=False,
+      name='Adam'),
+    n_classes=class_count
+  )
+
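+# Illustrative usage (a sketch; `config` is assumed to be a RunConfig with
+# model_dir set, and the argument values are hypothetical):
+#   estimator = build_estimator(config, trainer_type='spam', class_count=2)
+# yields a three-hidden-layer DNNClassifier over the spam feature columns.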
+
+def feature_list_to_dict(X, trainer_type):
+  """Converts an array of feature dicts into to one dict of
+    {feature_name: [feature_values]}.
+
+  Important: this assumes the ordering of X and INPUT_COLUMNS is the same.
+
+  Args:
+    X: an array of feature dicts
+  Returns:
+    A dictionary where each key is a feature name and its value is a numpy
+    array whose first dimension is len(X).
+  """
+  feature_dict = {}
+
+  for feature_column in INPUT_COLUMNS[trainer_type]:
+    feature_dict[feature_column.name] = []
+
+  for instance in X:
+    for key in instance.keys():
+      feature_dict[key].append(instance[key])
+
+  for key in [f.name for f in INPUT_COLUMNS[trainer_type]]:
+    feature_dict[key] = np.array(feature_dict[key])
+
+  return feature_dict
+
+
+def generate_json_serving_input_fn(trainer_type):
+  def json_serving_input_fn():
+    """Build the serving inputs.
+
+    Returns:
+      An InputFnOps containing features with placeholders.
+    """
+    features_placeholders = {}
+    for column in INPUT_COLUMNS[trainer_type]:
+      name = '%s_placeholder' % column.name
+
+      # Special case non-scalar features.
+      if column.shape[0] > 1:
+        shape = [None, column.shape[0]]
+      else:
+        shape = [None]
+
+      placeholder = tf.placeholder(tf.float32, shape, name=name)
+      features_placeholders[column.name] = placeholder
+
+    labels = None # Unknown at serving time
+    return tf.contrib.learn.InputFnOps(features_placeholders, labels,
+      features_placeholders)
+
+  return json_serving_input_fn
+
+
+SERVING_FUNCTIONS = {
+    'JSON-component': generate_json_serving_input_fn('component'),
+    'JSON-spam': generate_json_serving_input_fn('spam')
+}
diff --git a/tools/ml/trainer/task.py b/tools/ml/trainer/task.py
new file mode 100644
index 0000000..7416c68
--- /dev/null
+++ b/tools/ml/trainer/task.py
@@ -0,0 +1,284 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import argparse
+import json
+import os
+import re
+
+import numpy as np
+import tensorflow as tf
+from googleapiclient import discovery
+from googleapiclient import errors
+from oauth2client.client import GoogleCredentials
+from sklearn.model_selection import train_test_split
+from tensorflow.contrib.learn.python.learn import learn_runner
+from tensorflow.contrib.learn.python.learn.estimators import run_config
+from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils
+from tensorflow.contrib.training.python.training import hparam
+
+from google.cloud.storage import blob, bucket, client
+
+import trainer.dataset
+import trainer.model
+import trainer.ml_helpers
+import trainer.top_words
+
+def generate_experiment_fn(**experiment_args):
+  """Create an experiment function.
+
+  Args:
+    experiment_args: keyword arguments to be passed through to the Experiment
+      constructor. See `tf.contrib.learn.Experiment` for the full set of args.
+  Returns:
+    A function:
+      (tf.contrib.learn.RunConfig, tf.contrib.training.HParams) -> Experiment
+
+    This function is used by learn_runner to create an Experiment which
+    executes model code provided in the form of an Estimator and
+    input functions.
+  """
+  def _experiment_fn(config, hparams):
+    index_to_component = {}
+
+    if hparams.train_file:
+      with open(hparams.train_file) as f:
+        if hparams.trainer_type == 'spam':
+          training_data = trainer.ml_helpers.spam_from_file(f)
+        else:
+          training_data = trainer.ml_helpers.component_from_file(f)
+    else:
+      training_data = trainer.dataset.fetch_training_data(hparams.gcs_bucket,
+        hparams.gcs_prefix, hparams.trainer_type)
+
+    tf.logging.info('Training data received. Len: %d' % len(training_data))
+
+    if hparams.trainer_type == 'spam':
+      X, y = trainer.ml_helpers.transform_spam_csv_to_features(
+          training_data)
+    else:
+      top_list = trainer.top_words.make_top_words_list(hparams.job_dir)
+      X, y, index_to_component = trainer.ml_helpers \
+          .transform_component_csv_to_features(training_data, top_list)
+
+    tf.logging.info('Features generated')
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
+      random_state=42)
+
+    train_input_fn = tf.estimator.inputs.numpy_input_fn(
+      x=trainer.model.feature_list_to_dict(X_train, hparams.trainer_type),
+      y=np.array(y_train),
+      num_epochs=hparams.num_epochs,
+      batch_size=hparams.train_batch_size,
+      shuffle=True
+    )
+    eval_input_fn = tf.estimator.inputs.numpy_input_fn(
+      x=trainer.model.feature_list_to_dict(X_test, hparams.trainer_type),
+      y=np.array(y_test),
+      num_epochs=None,
+      batch_size=hparams.eval_batch_size,
+      shuffle=False # Don't shuffle evaluation data
+    )
+
+    tf.logging.info('Numpy fns created')
+    if hparams.trainer_type == 'component':
+      store_component_conversion(hparams.job_dir, index_to_component)
+
+    return tf.contrib.learn.Experiment(
+      trainer.model.build_estimator(config=config,
+                                    trainer_type=hparams.trainer_type,
+                                    class_count=len(set(y))),
+      train_input_fn=train_input_fn,
+      eval_input_fn=eval_input_fn,
+      **experiment_args
+    )
+  return _experiment_fn
+
+
+def store_component_conversion(job_dir, data):
+  """Stores the index-to-component mapping next to the training job output."""
+  tf.logging.info('job_dir: %s' % job_dir)
+  job_info = re.search(r'gs://(monorail-.+)-mlengine/(component_trainer_\d+)',
+                       job_dir)
+
+  # Check if training is being done on ML Engine or locally.
+  if job_info:
+    project = job_info.group(1)
+    job_name = job_info.group(2)
+
+    client_obj = client.Client(project=project)
+    bucket_name = '%s-mlengine' % project
+    bucket_obj = bucket.Bucket(client_obj, bucket_name)
+
+    bucket_obj.blob = blob.Blob(job_name + '/component_index.json', bucket_obj)
+
+    bucket_obj.blob.upload_from_string(json.dumps(data),
+                                       content_type='application/json')
+
+  else:
+    # Local run: create the job_dir directory tree if it doesn't already exist.
+    if not os.path.exists(job_dir):
+      os.makedirs(job_dir)
+    with open(job_dir + '/component_index.json', 'w') as f:
+      f.write(json.dumps(data))
+
+
+def store_eval(job_dir, results):
+  """Uploads evaluation results for this training job to GCS as eval_data.json."""
+  tf.logging.info('job_dir: %s' % job_dir)
+  job_info = re.search(r'gs://(monorail-.+)-mlengine/(spam_trainer_\d+)',
+                       job_dir)
+
+  # Only upload eval data if this is not being run locally.
+  if job_info:
+    project = job_info.group(1)
+    job_name = job_info.group(2)
+
+    tf.logging.info('project: %s' % project)
+    tf.logging.info('job_name: %s' % job_name)
+
+    client_obj = client.Client(project=project)
+    bucket_name = '%s-mlengine' % project
+    bucket_obj = bucket.Bucket(client_obj, bucket_name)
+
+    bucket_obj.blob = blob.Blob(job_name + '/eval_data.json', bucket_obj)
+    for key, value in results[0].items():
+      if isinstance(value, np.float32):
+        results[0][key] = value.item()
+
+    bucket_obj.blob.upload_from_string(json.dumps(results[0]),
+                                       content_type='application/json')
+
+  else:
+    tf.logging.error('Could not find bucket "%s" to output evaluation to.'
+                     % job_dir)
+
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser()
+
+  # Input Arguments
+  parser.add_argument(
+    '--train-file',
+    help='GCS or local path to training data',
+  )
+  parser.add_argument(
+    '--gcs-bucket',
+    help='GCS bucket for training data.',
+  )
+  parser.add_argument(
+    '--gcs-prefix',
+    help='Training data path prefix inside GCS bucket.',
+  )
+  parser.add_argument(
+    '--num-epochs',
+    help="""\
+    Maximum number of training data epochs on which to train.
+    If both --train-steps and --num-epochs are specified,
+    the training job will run for --train-steps or --num-epochs,
+    whichever occurs first. If unspecified, the job runs for --train-steps.\
+    """,
+    type=int,
+  )
+  parser.add_argument(
+    '--train-batch-size',
+    help='Batch size for training steps',
+    type=int,
+    default=128
+  )
+  parser.add_argument(
+    '--eval-batch-size',
+    help='Batch size for evaluation steps',
+    type=int,
+    default=128
+  )
+
+  # Training arguments
+  parser.add_argument(
+    '--job-dir',
+    help='GCS location to write checkpoints and export models',
+    required=True
+  )
+
+  # Logging arguments
+  parser.add_argument(
+    '--verbosity',
+    choices=[
+        'DEBUG',
+        'ERROR',
+        'FATAL',
+        'INFO',
+        'WARN'
+    ],
+    default='INFO',
+  )
+
+  # Experiment arguments
+  parser.add_argument(
+    '--eval-delay-secs',
+    help='How long to wait before running first evaluation',
+    default=10,
+    type=int
+  )
+  parser.add_argument(
+    '--min-eval-frequency',
+    help='Minimum number of training steps between evaluations',
+    default=None,  # Use TensorFlow's default (currently, 1000)
+    type=int
+  )
+  parser.add_argument(
+    '--train-steps',
+    help="""\
+    Number of steps to run the training job for. Required if --num-epochs is
+    not specified; without either flag, the training job runs indefinitely.\
+    """,
+    type=int
+  )
+  parser.add_argument(
+    '--eval-steps',
+    help='Number of steps to run evaluation for at each checkpoint',
+    default=100,
+    type=int
+  )
+  parser.add_argument(
+    '--trainer-type',
+    help='Which trainer to use (spam or component)',
+    choices=['spam', 'component'],
+    required=True
+  )
+
+  args = parser.parse_args()
+
+  tf.logging.set_verbosity(args.verbosity)
+
+  # Run the training job
+  # learn_runner pulls configuration information from environment variables
+  # using tf.contrib.learn.RunConfig and uses it to conditionally execute the
+  # Experiment or parameter server code.
+  eval_results = learn_runner.run(
+    generate_experiment_fn(
+      min_eval_frequency=args.min_eval_frequency,
+      eval_delay_secs=args.eval_delay_secs,
+      train_steps=args.train_steps,
+      eval_steps=args.eval_steps,
+      export_strategies=[saved_model_export_utils.make_export_strategy(
+        trainer.model.SERVING_FUNCTIONS['JSON-' + args.trainer_type],
+        exports_to_keep=1,
+        default_output_alternative_key=None,
+      )],
+    ),
+    run_config=run_config.RunConfig(model_dir=args.job_dir),
+    hparams=hparam.HParams(**args.__dict__)
+  )
+
+  # Store a JSON blob in GCS with the results of the training job (AUC,
+  # precision/recall, etc.).
+  if args.trainer_type == 'spam':
+    store_eval(args.job_dir, eval_results)
diff --git a/tools/ml/trainer/top_words.py b/tools/ml/trainer/top_words.py
new file mode 100644
index 0000000..26da211
--- /dev/null
+++ b/tools/ml/trainer/top_words.py
@@ -0,0 +1,127 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import csv
+import os
+import re
+import StringIO
+import sys
+import tensorflow as tf
+import time
+
+from googleapiclient import discovery
+from googleapiclient import errors
+from oauth2client.client import GoogleCredentials
+import google
+from google.cloud.storage import blob, bucket, client
+
+import trainer.ml_helpers
+import trainer.dataset
+
+
+TOP_WORDS = 'topwords.txt'
+STOP_WORDS = 'stopwords.txt'
+
+
+def fetch_stop_words(project_id, objects):
+  request = objects.get_media(bucket=project_id + '-mlengine',
+                              object=STOP_WORDS)
+  response = trainer.dataset.make_api_request(request)
+  return response.split()
+
+
+def fetch_training_csv(filepath, objects, b):
+  request = objects.get_media(bucket=b, object=filepath)
+  return trainer.dataset.make_api_request(request)
+
+
+def GenerateTopWords(objects, word_dict, project_id):
+  stop_words = fetch_stop_words(project_id, objects)
+  sorted_words = sorted(word_dict, key=word_dict.get, reverse=True)
+
+  top_words = []
+  index = 0
+
+  while len(top_words) < trainer.ml_helpers.COMPONENT_FEATURES:
+    if sorted_words[index] not in stop_words:
+      top_words.append(sorted_words[index])
+    index += 1
+
+  return top_words
+
+
+def make_top_words_list(job_dir):
+  """Returns the top (most common) words in the entire dataset for component
+  prediction. If a file is already stored in GCS containing these words, the
+  words from the file are simply returned. Otherwise, the most common words are
+  determined and written to GCS, before being returned.
+
+  Returns:
+    A list of the most common words in the dataset (the number of them
+    determined by ml_helpers.COMPONENT_FEATURES).
+  """
+
+  credentials = GoogleCredentials.get_application_default()
+  storage = discovery.build('storage', 'v1', credentials=credentials)
+  objects = storage.objects()
+
+  subpaths = re.match(r'gs://(monorail-.*)-mlengine/(component_trainer_\d+)',
+                      job_dir)
+
+  if subpaths:
+    project_id = subpaths.group(1)
+    trainer_folder = subpaths.group(2)
+  else:
+    project_id = 'monorail-prod'
+
+  storage_bucket = project_id + '.appspot.com'
+  request = objects.list(bucket=storage_bucket,
+                         prefix='component_training_data')
+
+  response = trainer.dataset.make_api_request(request)
+
+  items = response.get('items')
+  csv_filepaths = [b.get('name') for b in items]
+
+  final_string = ''
+
+  for word in parse_words(csv_filepaths, objects, storage_bucket, project_id):
+    final_string += word + '\n'
+
+  if subpaths:
+    client_obj = client.Client(project=project_id)
+    bucket_obj = bucket.Bucket(client_obj, project_id + '-mlengine')
+
+    bucket_obj.blob = google.cloud.storage.blob.Blob(
+        trainer_folder + '/' + TOP_WORDS, bucket_obj)
+    bucket_obj.blob.upload_from_string(final_string,
+                                       content_type='text/plain')
+  return final_string.split()
+
+
+def parse_words(files, objects, b, project_id):
+  word_dict = {}
+
+  csv.field_size_limit(sys.maxsize)
+  for filepath in files:
+    media = fetch_training_csv(filepath, objects, b)
+
+    for row in csv.reader(StringIO.StringIO(media)):
+      _, content = row
+      words = content.split()
+
+      for word in words:
+        if word in word_dict:
+          word_dict[word] += 1
+        else:
+          word_dict[word] = 1
+
+  return GenerateTopWords(objects, word_dict, project_id)