Initial prototype

Change-Id: I60a94e90aab48dfcf7c1f03fe5613d1db7d0df95
diff --git a/mqtt_processor/mqtt_processor.go b/mqtt_processor/mqtt_processor.go
new file mode 100644
index 0000000..256c469
--- /dev/null
+++ b/mqtt_processor/mqtt_processor.go
@@ -0,0 +1,144 @@
+package mqtt_processor
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+
+	"github.com/flashmob/go-guerrilla/backends"
+	"github.com/flashmob/go-guerrilla/mail"
+
+	// TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
+	"gomodules.avm99963.com/forks/parsemail"
+
+	"github.com/eclipse/paho.mqtt.golang"
+)
+
+const subtopicStatus = "status" // subtopic (below the configured base topic) used for the retained "online" birth message and "offline" will
+
+type MQTTConfig struct { // processor settings, extracted from the backend configuration by their JSON keys
+	MQTTBroker   string `json:"mqtt_broker"`      // broker address added to the MQTT client options
+	MQTTClientID string `json:"mqtt_clientid"`    // client ID set on the MQTT client options
+	MQTTUsername string `json:"mqtt_username"`    // username set on the MQTT client options
+	MQTTPassword string `json:"mqtt_password"`    // password set on the MQTT client options
+	MQTTTopic    string `json:"mqtt_topic"`       // base topic; status and motion messages go to subtopics below it
+	EmailToken   string `json:"mqtt_email_token"` // token the email body must carry as "Y-Token: <token>"; when empty no email is accepted
+}
+
+type motionEvent struct { // motion event extracted from a received email; marshalled to JSON as the MQTT payload
+	Timestamp int64  `json:"timestamp"` // email.Date.Unix() of the parsed message (Unix seconds, assuming Date is a time.Time)
+	CameraID  string `json:"-"`         // user part of the envelope sender; excluded from the JSON, used to build the topic instead
+}
+
+// hasToken reports whether the text body is exactly "Y-Token: " plus the configured token; an empty token matches nothing.
+func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
+	if expected := config.EmailToken; expected != "" {
+		return email.TextBody == "Y-Token: "+expected
+	}
+	backends.Log().Error("The mqtt_email_token field is empty")
+	return false
+}
+
+// processMotionEvent parses the envelope data as an email, checks its token and
+// builds the motion event: camera ID from the envelope sender's user part and
+// timestamp from the parsed date. The event is nil whenever an error is returned.
+func processMotionEvent(e *mail.Envelope, config *MQTTConfig) (*motionEvent, error) {
+	email, err := parsemail.Parse(bufio.NewReader(&e.Data))
+	if err != nil {
+		return nil, fmt.Errorf("parsing message: %w", err)
+	}
+	if !hasToken(&email, config) {
+		return nil, fmt.Errorf("message body doesn't include the correct token")
+	}
+	// A zero Date (presumably a message without a Date header) would be published
+	// as a meaningless negative timestamp, so such messages are refused instead.
+	if email.Date.IsZero() {
+		return nil, fmt.Errorf("message has no usable date")
+	}
+	return &motionEvent{Timestamp: email.Date.Unix(), CameraID: e.MailFrom.User}, nil
+}
+
+// processEmail turns the envelope into a motion event and publishes it, JSON
+// encoded, as a retained QoS 2 message on "<base topic>/<camera ID>/motion".
+// Any failure (rejected email, encoding or publishing error) is returned.
+func processEmail(e *mail.Envelope, c mqtt.Client, config *MQTTConfig) error {
+	motion, err := processMotionEvent(e, config)
+	if err != nil {
+		return err
+	}
+	backends.Log().Info(fmt.Sprintf("Motion event received: %v", motion))
+	payload, err := json.Marshal(motion)
+	if err != nil {
+		return err
+	}
+
+	// Wait on the publish token before looking at its error.
+	destination := config.MQTTTopic + "/" + motion.CameraID + "/motion"
+	if pub := c.Publish(destination, 2, true, payload); pub.Wait() && pub.Error() != nil {
+		return pub.Error()
+	}
+	backends.Log().Debug("MQTT message published successfully")
+	return nil
+}
+
+// Processor is the go-guerrilla decorator provided by this package. On backend
+// initialization it connects to the configured MQTT broker and announces
+// "online"; on shutdown it announces "offline" and disconnects; for every
+// save-mail task it publishes the motion event described by the envelope.
+var Processor = func() backends.Decorator {
+	var c mqtt.Client
+	config := &MQTTConfig{}
+
+	backends.Svc.AddInitializer(backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
+		bcfg, err := backends.Svc.ExtractConfig(backendConfig, backends.BaseConfig(&MQTTConfig{}))
+		if err != nil {
+			return err
+		}
+		config = bcfg.(*MQTTConfig)
+		statusTopic := config.MQTTTopic + "/" + subtopicStatus
+		// The will lets the broker report "offline" if the connection is lost.
+		opts := mqtt.NewClientOptions().AddBroker(config.MQTTBroker).SetClientID(config.MQTTClientID)
+		opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword)
+		opts = opts.SetWill(statusTopic, "offline", 1, true)
+
+		c = mqtt.NewClient(opts)
+		if token := c.Connect(); token.Wait() && token.Error() != nil {
+			return token.Error()
+		}
+		// Birth message: the counterpart of the will registered above.
+		if token := c.Publish(statusTopic, 1, true, "online"); token.Wait() && token.Error() != nil {
+			c.Disconnect(250) // don't leave a half-initialized client connected
+			return token.Error()
+		}
+		backends.Log().Debug("MQTT client successfully started and connected")
+		return nil
+	}))
+
+	// Disconnect on shutdown so a backend restart doesn't leave the old connection (same client ID) behind.
+	backends.Svc.AddShutdowner(backends.ShutdownWith(func() error {
+		if c == nil {
+			return nil
+		}
+		defer c.Disconnect(250)
+		if !c.IsConnected() {
+			return nil
+		}
+		// A clean disconnect does not trigger the will, so say "offline" explicitly.
+		token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "offline")
+		token.Wait()
+		return token.Error()
+	}))
+
+	return func(p backends.Processor) backends.Processor {
+		return backends.ProcessWith(func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
+			// Only save-mail tasks are handled here; recipients are not validated.
+			if task == backends.TaskSaveMail {
+				if err := processEmail(e, c, config); err != nil {
+					backends.Log().WithError(err).Error("Could not handle email")
+					return backends.NewResult("554 Error: could not handle email"), err
+				}
+			}
+			return p.Process(e, task)
+		})
+	}
+}