blob: 256c46978f9e9b4897131ddd7d5dad4a49f81934 [file] [log] [blame]
avm9996388e622d2021-01-22 18:57:58 +01001package mqtt_processor
2
3import (
4 "bufio"
5 "encoding/json"
6 "fmt"
7
8 "github.com/flashmob/go-guerrilla/backends"
9 "github.com/flashmob/go-guerrilla/mail"
10
11 // TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
12 "gomodules.avm99963.com/forks/parsemail"
13
14 "github.com/eclipse/paho.mqtt.golang"
15)
16
17const subtopicStatus = "status"
18
19type MQTTConfig struct {
20 MQTTBroker string `json:"mqtt_broker"`
21 MQTTClientID string `json:"mqtt_clientid"`
22 MQTTUsername string `json:"mqtt_username"`
23 MQTTPassword string `json:"mqtt_password"`
24 MQTTTopic string `json:"mqtt_topic"`
25 EmailToken string `json:"mqtt_email_token"`
26}
27
28type motionEvent struct {
29 Timestamp int64 `json:"timestamp"`
30 CameraID string `json:"-"`
31}
32
33func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
34 if config.EmailToken == "" {
35 backends.Log().Error("The mqtt_email_token field is empty")
36 return false
37 }
38
39 return email.TextBody == "Y-Token: "+config.EmailToken
40}
41
42func processMotionEvent(e *mail.Envelope, config *MQTTConfig) (*motionEvent, error) {
43 event := &motionEvent{}
44
45 dataReader := bufio.NewReader(&e.Data)
46 email, err := parsemail.Parse(dataReader)
47 if err != nil {
48 return event, err
49 }
50
51 if !hasToken(&email, config) {
52 return event, fmt.Errorf("Message body doesn't include the correct token")
53 }
54
55 event.CameraID = e.MailFrom.User
56 event.Timestamp = email.Date.Unix()
57
58 return event, nil
59}
60
61func processEmail(e *mail.Envelope, c mqtt.Client, config *MQTTConfig) error {
62 event, err := processMotionEvent(e, config)
63 if err != nil {
64 return err
65 }
66
67 backends.Log().Info(fmt.Sprintf("Motion event received: %v", event))
68
69 topic := config.MQTTTopic + "/" + event.CameraID + "/motion"
70 msg, err := json.Marshal(event)
71 if err != nil {
72 return err
73 }
74
75 if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil {
76 return token.Error()
77 }
78
79 backends.Log().Debug("MQTT message published successfully")
80
81 return nil
82}
83
84var Processor = func() backends.Decorator {
85 var (
86 c mqtt.Client
87 )
88
89 config := &MQTTConfig{}
90
91 initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
92 // Get configuration
93 configType := backends.BaseConfig(&MQTTConfig{})
94 bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
95 if err != nil {
96 return err
97 }
98 config = bcfg.(*MQTTConfig)
99
100 // Set options for MQTT client
101 opts := mqtt.NewClientOptions()
102 opts = opts.AddBroker(config.MQTTBroker)
103 opts = opts.SetClientID(config.MQTTClientID)
104 opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword)
105 opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true)
106
107 // Create MQTT client and start the connection
108 c = mqtt.NewClient(opts)
109 if token := c.Connect(); token.Wait() && token.Error() != nil {
110 return token.Error()
111 }
112
113 // Send a birth message
114 if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil {
115 return token.Error()
116 }
117
118 backends.Log().Debug("MQTT client successfully started and connected")
119
120 return nil
121 })
122
123 backends.Svc.AddInitializer(initFunc)
124 return func(p backends.Processor) backends.Processor {
125 return backends.ProcessWith(
126 func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
127 if task == backends.TaskValidateRcpt {
128 // We don't validate the recipient
129 return p.Process(e, task)
130 } else if task == backends.TaskSaveMail {
131 err := processEmail(e, c, config)
132 if err != nil {
133 backends.Log().WithError(err).Error("Could not handle email")
134 return backends.NewResult("554 Error: could not handle email"), err
135 }
136
137 // call the next processor in the chain
138 return p.Process(e, task)
139 }
140 return p.Process(e, task)
141 },
142 )
143 }
144}