| package mqtt_processor |
| |
| import ( |
| "bufio" |
| "encoding/json" |
| "fmt" |
| |
| "github.com/flashmob/go-guerrilla/backends" |
| "github.com/flashmob/go-guerrilla/mail" |
| |
| // TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there |
| "gomodules.avm99963.com/forks/parsemail" |
| |
| "github.com/eclipse/paho.mqtt.golang" |
| ) |
| |
| const subtopicStatus = "status" |
| |
| type MQTTConfig struct { |
| MQTTBroker string `json:"mqtt_broker"` |
| MQTTClientID string `json:"mqtt_clientid"` |
| MQTTUsername string `json:"mqtt_username"` |
| MQTTPassword string `json:"mqtt_password"` |
| MQTTTopic string `json:"mqtt_topic"` |
| EmailToken string `json:"mqtt_email_token"` |
| } |
| |
| type motionEvent struct { |
| Timestamp int64 `json:"timestamp"` |
| CameraID string `json:"-"` |
| } |
| |
| func hasToken(email *parsemail.Email, config *MQTTConfig) bool { |
| if config.EmailToken == "" { |
| backends.Log().Error("The mqtt_email_token field is empty") |
| return false |
| } |
| |
| return email.TextBody == "Y-Token: "+config.EmailToken |
| } |
| |
| func processMotionEvent(e *mail.Envelope, config *MQTTConfig) (*motionEvent, error) { |
| event := &motionEvent{} |
| |
| dataReader := bufio.NewReader(&e.Data) |
| email, err := parsemail.Parse(dataReader) |
| if err != nil { |
| return event, err |
| } |
| |
| if !hasToken(&email, config) { |
| return event, fmt.Errorf("Message body doesn't include the correct token") |
| } |
| |
| event.CameraID = e.MailFrom.User |
| event.Timestamp = email.Date.Unix() |
| |
| return event, nil |
| } |
| |
| func processEmail(e *mail.Envelope, c mqtt.Client, config *MQTTConfig) error { |
| event, err := processMotionEvent(e, config) |
| if err != nil { |
| return err |
| } |
| |
| backends.Log().Info(fmt.Sprintf("Motion event received: %v", event)) |
| |
| topic := config.MQTTTopic + "/" + event.CameraID + "/motion" |
| msg, err := json.Marshal(event) |
| if err != nil { |
| return err |
| } |
| |
| if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil { |
| return token.Error() |
| } |
| |
| backends.Log().Debug("MQTT message published successfully") |
| |
| return nil |
| } |
| |
| var Processor = func() backends.Decorator { |
| var ( |
| c mqtt.Client |
| ) |
| |
| config := &MQTTConfig{} |
| |
| initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error { |
| // Get configuration |
| configType := backends.BaseConfig(&MQTTConfig{}) |
| bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType) |
| if err != nil { |
| return err |
| } |
| config = bcfg.(*MQTTConfig) |
| |
| // Set options for MQTT client |
| opts := mqtt.NewClientOptions() |
| opts = opts.AddBroker(config.MQTTBroker) |
| opts = opts.SetClientID(config.MQTTClientID) |
| opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword) |
| opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true) |
| |
| // Create MQTT client and start the connection |
| c = mqtt.NewClient(opts) |
| if token := c.Connect(); token.Wait() && token.Error() != nil { |
| return token.Error() |
| } |
| |
| // Send a birth message |
| if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil { |
| return token.Error() |
| } |
| |
| backends.Log().Debug("MQTT client successfully started and connected") |
| |
| return nil |
| }) |
| |
| backends.Svc.AddInitializer(initFunc) |
| return func(p backends.Processor) backends.Processor { |
| return backends.ProcessWith( |
| func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) { |
| if task == backends.TaskValidateRcpt { |
| // We don't validate the recipient |
| return p.Process(e, task) |
| } else if task == backends.TaskSaveMail { |
| err := processEmail(e, c, config) |
| if err != nil { |
| backends.Log().WithError(err).Error("Could not handle email") |
| return backends.NewResult("554 Error: could not handle email"), err |
| } |
| |
| // call the next processor in the chain |
| return p.Process(e, task) |
| } |
| return p.Process(e, task) |
| }, |
| ) |
| } |
| } |