blob: 35c067f2d629d474427ed63d8cbdfff83ec78ff4 [file] [log] [blame]
avm9996388e622d2021-01-22 18:57:58 +01001package mqtt_processor
2
3import (
4 "bufio"
avm9996372705562021-03-28 00:08:18 +01005 "bytes"
avm999638355e002021-01-26 23:05:01 +01006 "context"
avm9996388e622d2021-01-22 18:57:58 +01007 "encoding/json"
8 "fmt"
avm9996372705562021-03-28 00:08:18 +01009 "io"
10 "io/ioutil"
avm999638355e002021-01-26 23:05:01 +010011 "net/url"
12 "strings"
13 "time"
avm9996388e622d2021-01-22 18:57:58 +010014
15 "github.com/flashmob/go-guerrilla/backends"
16 "github.com/flashmob/go-guerrilla/mail"
17
18 // TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
19 "gomodules.avm99963.com/forks/parsemail"
20
21 "github.com/eclipse/paho.mqtt.golang"
avm999638355e002021-01-26 23:05:01 +010022
23 "github.com/minio/minio-go/v7"
24 "github.com/minio/minio-go/v7/pkg/credentials"
avm9996388e622d2021-01-22 18:57:58 +010025)
26
// subtopicStatus is the subtopic, appended to the configured base topic, on
// which the "online" birth message and the "offline" last-will message are
// published.
const subtopicStatus = "status"
28
// MQTTConfig holds the backend configuration of the processor. Every value is
// read as a JSON string, including the boolean-like switches, which are
// considered enabled only when they equal "true".
type MQTTConfig struct {
	// MQTT connection settings.
	MQTTBroker   string `json:"mqtt_broker"`
	MQTTClientID string `json:"mqtt_clientid"`
	MQTTUsername string `json:"mqtt_username"`
	MQTTPassword string `json:"mqtt_password"`
	// MQTTTopic is the base topic; status and motion messages are published
	// on subtopics of it.
	MQTTTopic string `json:"mqtt_topic"`

	// EmailToken is the secret a message must carry in its text body, as
	// "Y-Token: <token>", to be accepted. An empty token rejects everything.
	EmailToken string `json:"mqtt_email_token"`

	// S3 storage settings, used only when S3Enabled is "true".
	S3Enabled   string `json:"s3_enabled"`
	S3Endpoint  string `json:"s3_endpoint"`
	S3AccessKey string `json:"s3_access_key"`
	S3SecretKey string `json:"s3_secret_key"`
	// S3UseSSL selects a secure connection to the endpoint when "true".
	S3UseSSL string `json:"s3_use_ssl"`
	S3Bucket string `json:"s3_bucket"`
}
43
// motionEvent is the payload published over MQTT for a motion notification.
type motionEvent struct {
	// Timestamp is the time of the event in Unix seconds.
	Timestamp int64 `json:"timestamp"`

	// CameraID identifies the reporting camera. It is left out of the JSON
	// payload and used to build the topic instead.
	CameraID string `json:"-"`

	// ImageURL is the URL of the uploaded image, or empty when none was
	// uploaded.
	ImageURL string `json:"image_url"`
}
49
50func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
51 if config.EmailToken == "" {
52 backends.Log().Error("The mqtt_email_token field is empty")
53 return false
54 }
55
56 return email.TextBody == "Y-Token: "+config.EmailToken
57}
58
// getSize drains stream into memory so that its length becomes known.
//
// It returns a fresh reader positioned at the start of the buffered bytes
// together with the number of bytes read. If reading fails it returns a nil
// reader, a size of -1 and the read error.
func getSize(stream io.Reader) (io.Reader, int, error) {
	contents, readErr := ioutil.ReadAll(stream)
	if readErr != nil {
		return nil, -1, readErr
	}

	return bytes.NewReader(contents), len(contents), nil
}
68
avm999638355e002021-01-26 23:05:01 +010069func uploadImage(email *parsemail.Email, cameraID string, config *MQTTConfig, s3c *minio.Client) (string, error) {
70 for _, a := range email.Attachments {
71 if strings.HasPrefix(a.ContentType, "image") {
72 // Upload image
73 ctx := context.Background()
74 fileName := cameraID + "/" + a.Filename
avm9996372705562021-03-28 00:08:18 +010075 data, size, err := getSize(a.Data)
76 if err != nil {
77 return "", err
78 }
79 n, err := s3c.PutObject(ctx, config.S3Bucket, fileName, data, int64(size), minio.PutObjectOptions{ContentType: a.ContentType})
avm999638355e002021-01-26 23:05:01 +010080 if err != nil {
81 return "", err
82 }
83
84 // Get image URL
85 reqParams := make(url.Values)
86 u, err := s3c.PresignedGetObject(ctx, config.S3Bucket, n.Key, time.Duration(2*24)*time.Hour, reqParams)
87 if err != nil {
88 return "", err
89 }
90
91 return u.String(), nil
92 }
93 }
94
95 backends.Log().Info("The message doesn't contain an image attachment")
96
97 return "", nil
98}
99
100func processMotionEvent(e *mail.Envelope, config *MQTTConfig, s3c *minio.Client) (*motionEvent, error) {
avm9996388e622d2021-01-22 18:57:58 +0100101 event := &motionEvent{}
102
103 dataReader := bufio.NewReader(&e.Data)
104 email, err := parsemail.Parse(dataReader)
105 if err != nil {
106 return event, err
107 }
108
109 if !hasToken(&email, config) {
110 return event, fmt.Errorf("Message body doesn't include the correct token")
111 }
112
113 event.CameraID = e.MailFrom.User
114 event.Timestamp = email.Date.Unix()
115
avm999638355e002021-01-26 23:05:01 +0100116 if config.S3Enabled == "true" {
117 imageURL, err := uploadImage(&email, event.CameraID, config, s3c)
118 if err != nil {
119 return event, err
120 }
121
122 event.ImageURL = imageURL
123 }
124
avm9996388e622d2021-01-22 18:57:58 +0100125 return event, nil
126}
127
avm999638355e002021-01-26 23:05:01 +0100128func processEmail(e *mail.Envelope, c mqtt.Client, s3c *minio.Client, config *MQTTConfig) error {
129 event, err := processMotionEvent(e, config, s3c)
avm9996388e622d2021-01-22 18:57:58 +0100130 if err != nil {
131 return err
132 }
133
134 backends.Log().Info(fmt.Sprintf("Motion event received: %v", event))
135
136 topic := config.MQTTTopic + "/" + event.CameraID + "/motion"
137 msg, err := json.Marshal(event)
138 if err != nil {
139 return err
140 }
141
142 if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil {
143 return token.Error()
144 }
145
146 backends.Log().Debug("MQTT message published successfully")
147
148 return nil
149}
150
151var Processor = func() backends.Decorator {
152 var (
avm999638355e002021-01-26 23:05:01 +0100153 c mqtt.Client
154 s3c *minio.Client
avm9996388e622d2021-01-22 18:57:58 +0100155 )
156
157 config := &MQTTConfig{}
158
159 initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
160 // Get configuration
161 configType := backends.BaseConfig(&MQTTConfig{})
162 bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
163 if err != nil {
164 return err
165 }
166 config = bcfg.(*MQTTConfig)
167
168 // Set options for MQTT client
169 opts := mqtt.NewClientOptions()
170 opts = opts.AddBroker(config.MQTTBroker)
171 opts = opts.SetClientID(config.MQTTClientID)
172 opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword)
173 opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true)
174
175 // Create MQTT client and start the connection
176 c = mqtt.NewClient(opts)
177 if token := c.Connect(); token.Wait() && token.Error() != nil {
178 return token.Error()
179 }
180
181 // Send a birth message
182 if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil {
183 return token.Error()
184 }
185
avm999638355e002021-01-26 23:05:01 +0100186 // Set up the minio client if S3 storage is enabled
187 if config.S3Enabled == "true" {
188 var err error
189 s3c, err = minio.New(config.S3Endpoint, &minio.Options{
190 Creds: credentials.NewStaticV4(config.S3AccessKey, config.S3SecretKey, ""),
191 Secure: config.S3UseSSL == "true",
192 })
193 if err != nil {
194 return err
195 }
196 }
197
avm9996388e622d2021-01-22 18:57:58 +0100198 backends.Log().Debug("MQTT client successfully started and connected")
199
200 return nil
201 })
202
203 backends.Svc.AddInitializer(initFunc)
204 return func(p backends.Processor) backends.Processor {
205 return backends.ProcessWith(
206 func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
207 if task == backends.TaskValidateRcpt {
208 // We don't validate the recipient
209 return p.Process(e, task)
210 } else if task == backends.TaskSaveMail {
avm999638355e002021-01-26 23:05:01 +0100211 err := processEmail(e, c, s3c, config)
avm9996388e622d2021-01-22 18:57:58 +0100212 if err != nil {
213 backends.Log().WithError(err).Error("Could not handle email")
214 return backends.NewResult("554 Error: could not handle email"), err
215 }
216
217 // call the next processor in the chain
218 return p.Process(e, task)
219 }
220 return p.Process(e, task)
221 },
222 )
223 }
224}