Initial prototype
Change-Id: I60a94e90aab48dfcf7c1f03fe5613d1db7d0df95
diff --git a/mqtt_processor/mqtt_processor.go b/mqtt_processor/mqtt_processor.go
new file mode 100644
index 0000000..256c469
--- /dev/null
+++ b/mqtt_processor/mqtt_processor.go
@@ -0,0 +1,144 @@
+package mqtt_processor
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+
+ "github.com/flashmob/go-guerrilla/backends"
+ "github.com/flashmob/go-guerrilla/mail"
+
+ // TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
+ "gomodules.avm99963.com/forks/parsemail"
+
+ "github.com/eclipse/paho.mqtt.golang"
+)
+
+const subtopicStatus = "status"
+
+type MQTTConfig struct {
+ MQTTBroker string `json:"mqtt_broker"`
+ MQTTClientID string `json:"mqtt_clientid"`
+ MQTTUsername string `json:"mqtt_username"`
+ MQTTPassword string `json:"mqtt_password"`
+ MQTTTopic string `json:"mqtt_topic"`
+ EmailToken string `json:"mqtt_email_token"`
+}
+
+type motionEvent struct {
+ Timestamp int64 `json:"timestamp"`
+ CameraID string `json:"-"`
+}
+
+func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
+ if config.EmailToken == "" {
+ backends.Log().Error("The mqtt_email_token field is empty")
+ return false
+ }
+
+ return email.TextBody == "Y-Token: "+config.EmailToken
+}
+
+func processMotionEvent(e *mail.Envelope, config *MQTTConfig) (*motionEvent, error) {
+ event := &motionEvent{}
+
+ dataReader := bufio.NewReader(&e.Data)
+ email, err := parsemail.Parse(dataReader)
+ if err != nil {
+ return event, err
+ }
+
+ if !hasToken(&email, config) {
+ return event, fmt.Errorf("Message body doesn't include the correct token")
+ }
+
+ event.CameraID = e.MailFrom.User
+ event.Timestamp = email.Date.Unix()
+
+ return event, nil
+}
+
+func processEmail(e *mail.Envelope, c mqtt.Client, config *MQTTConfig) error {
+ event, err := processMotionEvent(e, config)
+ if err != nil {
+ return err
+ }
+
+ backends.Log().Info(fmt.Sprintf("Motion event received: %v", event))
+
+ topic := config.MQTTTopic + "/" + event.CameraID + "/motion"
+ msg, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil {
+ return token.Error()
+ }
+
+ backends.Log().Debug("MQTT message published successfully")
+
+ return nil
+}
+
+var Processor = func() backends.Decorator {
+ var (
+ c mqtt.Client
+ )
+
+ config := &MQTTConfig{}
+
+ initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
+ // Get configuration
+ configType := backends.BaseConfig(&MQTTConfig{})
+ bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
+ if err != nil {
+ return err
+ }
+ config = bcfg.(*MQTTConfig)
+
+ // Set options for MQTT client
+ opts := mqtt.NewClientOptions()
+ opts = opts.AddBroker(config.MQTTBroker)
+ opts = opts.SetClientID(config.MQTTClientID)
+ opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword)
+ opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true)
+
+ // Create MQTT client and start the connection
+ c = mqtt.NewClient(opts)
+ if token := c.Connect(); token.Wait() && token.Error() != nil {
+ return token.Error()
+ }
+
+ // Send a birth message
+ if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil {
+ return token.Error()
+ }
+
+ backends.Log().Debug("MQTT client successfully started and connected")
+
+ return nil
+ })
+
+ backends.Svc.AddInitializer(initFunc)
+ return func(p backends.Processor) backends.Processor {
+ return backends.ProcessWith(
+ func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
+ if task == backends.TaskValidateRcpt {
+ // We don't validate the recipient
+ return p.Process(e, task)
+ } else if task == backends.TaskSaveMail {
+ err := processEmail(e, c, config)
+ if err != nil {
+ backends.Log().WithError(err).Error("Could not handle email")
+ return backends.NewResult("554 Error: could not handle email"), err
+ }
+
+ // call the next processor in the chain
+ return p.Process(e, task)
+ }
+ return p.Process(e, task)
+ },
+ )
+ }
+}