blob: 256c46978f9e9b4897131ddd7d5dad4a49f81934 [file] [log] [blame]
package mqtt_processor
import (
"bufio"
"encoding/json"
"fmt"
"github.com/flashmob/go-guerrilla/backends"
"github.com/flashmob/go-guerrilla/mail"
// TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
"gomodules.avm99963.com/forks/parsemail"
"github.com/eclipse/paho.mqtt.golang"
)
const subtopicStatus = "status"
type MQTTConfig struct {
MQTTBroker string `json:"mqtt_broker"`
MQTTClientID string `json:"mqtt_clientid"`
MQTTUsername string `json:"mqtt_username"`
MQTTPassword string `json:"mqtt_password"`
MQTTTopic string `json:"mqtt_topic"`
EmailToken string `json:"mqtt_email_token"`
}
type motionEvent struct {
Timestamp int64 `json:"timestamp"`
CameraID string `json:"-"`
}
func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
if config.EmailToken == "" {
backends.Log().Error("The mqtt_email_token field is empty")
return false
}
return email.TextBody == "Y-Token: "+config.EmailToken
}
func processMotionEvent(e *mail.Envelope, config *MQTTConfig) (*motionEvent, error) {
event := &motionEvent{}
dataReader := bufio.NewReader(&e.Data)
email, err := parsemail.Parse(dataReader)
if err != nil {
return event, err
}
if !hasToken(&email, config) {
return event, fmt.Errorf("Message body doesn't include the correct token")
}
event.CameraID = e.MailFrom.User
event.Timestamp = email.Date.Unix()
return event, nil
}
func processEmail(e *mail.Envelope, c mqtt.Client, config *MQTTConfig) error {
event, err := processMotionEvent(e, config)
if err != nil {
return err
}
backends.Log().Info(fmt.Sprintf("Motion event received: %v", event))
topic := config.MQTTTopic + "/" + event.CameraID + "/motion"
msg, err := json.Marshal(event)
if err != nil {
return err
}
if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil {
return token.Error()
}
backends.Log().Debug("MQTT message published successfully")
return nil
}
var Processor = func() backends.Decorator {
var (
c mqtt.Client
)
config := &MQTTConfig{}
initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
// Get configuration
configType := backends.BaseConfig(&MQTTConfig{})
bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
if err != nil {
return err
}
config = bcfg.(*MQTTConfig)
// Set options for MQTT client
opts := mqtt.NewClientOptions()
opts = opts.AddBroker(config.MQTTBroker)
opts = opts.SetClientID(config.MQTTClientID)
opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword)
opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true)
// Create MQTT client and start the connection
c = mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
// Send a birth message
if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil {
return token.Error()
}
backends.Log().Debug("MQTT client successfully started and connected")
return nil
})
backends.Svc.AddInitializer(initFunc)
return func(p backends.Processor) backends.Processor {
return backends.ProcessWith(
func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
if task == backends.TaskValidateRcpt {
// We don't validate the recipient
return p.Process(e, task)
} else if task == backends.TaskSaveMail {
err := processEmail(e, c, config)
if err != nil {
backends.Log().WithError(err).Error("Could not handle email")
return backends.NewResult("554 Error: could not handle email"), err
}
// call the next processor in the chain
return p.Process(e, task)
}
return p.Process(e, task)
},
)
}
}