blob: e96570216e30b3402a0c464f5667e1c1a79396cb [file] [log] [blame]
avm9996388e622d2021-01-22 18:57:58 +01001package mqtt_processor
2
3import (
4 "bufio"
avm999638355e002021-01-26 23:05:01 +01005 "context"
avm9996388e622d2021-01-22 18:57:58 +01006 "encoding/json"
7 "fmt"
avm999638355e002021-01-26 23:05:01 +01008 "net/url"
9 "strings"
10 "time"
avm9996388e622d2021-01-22 18:57:58 +010011
12 "github.com/flashmob/go-guerrilla/backends"
13 "github.com/flashmob/go-guerrilla/mail"
14
15 // TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
16 "gomodules.avm99963.com/forks/parsemail"
17
18 "github.com/eclipse/paho.mqtt.golang"
avm999638355e002021-01-26 23:05:01 +010019
20 "github.com/minio/minio-go/v7"
21 "github.com/minio/minio-go/v7/pkg/credentials"
avm9996388e622d2021-01-22 18:57:58 +010022)
23
// subtopicStatus is the subtopic, appended to the configured base topic, on
// which the processor publishes its retained "online" birth message and
// registers its retained "offline" will.
const (
	subtopicStatus = "status"
)
25
// MQTTConfig holds the processor's settings as extracted from the backend
// configuration. Every field is a string; the boolean-like options (S3Enabled
// and S3UseSSL) count as set only when they equal "true".
type MQTTConfig struct {
	// MQTT connection settings: broker address, client ID and credentials.
	MQTTBroker   string `json:"mqtt_broker"`
	MQTTClientID string `json:"mqtt_clientid"`
	MQTTUsername string `json:"mqtt_username"`
	MQTTPassword string `json:"mqtt_password"`

	// MQTTTopic is the base topic. Status messages go to "<topic>/status" and
	// motion events to "<topic>/<camera ID>/motion".
	MQTTTopic string `json:"mqtt_topic"`

	// EmailToken is the shared secret an email must carry, as the body
	// "Y-Token: <token>", to be accepted. An empty token rejects every email.
	EmailToken string `json:"mqtt_email_token"`

	// S3Enabled turns image upload on when it equals "true".
	S3Enabled string `json:"s3_enabled"`

	// S3 endpoint and static credentials used to build the minio client.
	S3Endpoint  string `json:"s3_endpoint"`
	S3AccessKey string `json:"s3_access_key"`
	S3SecretKey string `json:"s3_secret_key"`

	// S3UseSSL selects a secure connection to the endpoint when it equals "true".
	S3UseSSL string `json:"s3_use_ssl"`

	// S3Bucket is the bucket that receives the uploaded images.
	S3Bucket string `json:"s3_bucket"`
}
40
// motionEvent is the payload published over MQTT for each accepted email.
type motionEvent struct {
	// Timestamp is the email's date expressed as Unix seconds.
	Timestamp int64 `json:"timestamp"`

	// CameraID is the user part of the envelope sender. It is left out of the
	// JSON payload because it is used to build the topic instead.
	CameraID string `json:"-"`

	// ImageURL is the presigned URL of the uploaded image. It stays empty when
	// S3 storage is disabled or the email carried no image attachment.
	ImageURL string `json:"image_url"`
}
46
47func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
48 if config.EmailToken == "" {
49 backends.Log().Error("The mqtt_email_token field is empty")
50 return false
51 }
52
53 return email.TextBody == "Y-Token: "+config.EmailToken
54}
55
avm999638355e002021-01-26 23:05:01 +010056func uploadImage(email *parsemail.Email, cameraID string, config *MQTTConfig, s3c *minio.Client) (string, error) {
57 for _, a := range email.Attachments {
58 if strings.HasPrefix(a.ContentType, "image") {
59 // Upload image
60 ctx := context.Background()
61 fileName := cameraID + "/" + a.Filename
62 n, err := s3c.PutObject(ctx, config.S3Bucket, fileName, a.Data, -1, minio.PutObjectOptions{ContentType: a.ContentType})
63 if err != nil {
64 return "", err
65 }
66
67 // Get image URL
68 reqParams := make(url.Values)
69 u, err := s3c.PresignedGetObject(ctx, config.S3Bucket, n.Key, time.Duration(2*24)*time.Hour, reqParams)
70 if err != nil {
71 return "", err
72 }
73
74 return u.String(), nil
75 }
76 }
77
78 backends.Log().Info("The message doesn't contain an image attachment")
79
80 return "", nil
81}
82
83func processMotionEvent(e *mail.Envelope, config *MQTTConfig, s3c *minio.Client) (*motionEvent, error) {
avm9996388e622d2021-01-22 18:57:58 +010084 event := &motionEvent{}
85
86 dataReader := bufio.NewReader(&e.Data)
87 email, err := parsemail.Parse(dataReader)
88 if err != nil {
89 return event, err
90 }
91
92 if !hasToken(&email, config) {
93 return event, fmt.Errorf("Message body doesn't include the correct token")
94 }
95
96 event.CameraID = e.MailFrom.User
97 event.Timestamp = email.Date.Unix()
98
avm999638355e002021-01-26 23:05:01 +010099 if config.S3Enabled == "true" {
100 imageURL, err := uploadImage(&email, event.CameraID, config, s3c)
101 if err != nil {
102 return event, err
103 }
104
105 event.ImageURL = imageURL
106 }
107
avm9996388e622d2021-01-22 18:57:58 +0100108 return event, nil
109}
110
avm999638355e002021-01-26 23:05:01 +0100111func processEmail(e *mail.Envelope, c mqtt.Client, s3c *minio.Client, config *MQTTConfig) error {
112 event, err := processMotionEvent(e, config, s3c)
avm9996388e622d2021-01-22 18:57:58 +0100113 if err != nil {
114 return err
115 }
116
117 backends.Log().Info(fmt.Sprintf("Motion event received: %v", event))
118
119 topic := config.MQTTTopic + "/" + event.CameraID + "/motion"
120 msg, err := json.Marshal(event)
121 if err != nil {
122 return err
123 }
124
125 if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil {
126 return token.Error()
127 }
128
129 backends.Log().Debug("MQTT message published successfully")
130
131 return nil
132}
133
134var Processor = func() backends.Decorator {
135 var (
avm999638355e002021-01-26 23:05:01 +0100136 c mqtt.Client
137 s3c *minio.Client
avm9996388e622d2021-01-22 18:57:58 +0100138 )
139
140 config := &MQTTConfig{}
141
142 initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
143 // Get configuration
144 configType := backends.BaseConfig(&MQTTConfig{})
145 bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
146 if err != nil {
147 return err
148 }
149 config = bcfg.(*MQTTConfig)
150
151 // Set options for MQTT client
152 opts := mqtt.NewClientOptions()
153 opts = opts.AddBroker(config.MQTTBroker)
154 opts = opts.SetClientID(config.MQTTClientID)
155 opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword)
156 opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true)
157
158 // Create MQTT client and start the connection
159 c = mqtt.NewClient(opts)
160 if token := c.Connect(); token.Wait() && token.Error() != nil {
161 return token.Error()
162 }
163
164 // Send a birth message
165 if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil {
166 return token.Error()
167 }
168
avm999638355e002021-01-26 23:05:01 +0100169 // Set up the minio client if S3 storage is enabled
170 if config.S3Enabled == "true" {
171 var err error
172 s3c, err = minio.New(config.S3Endpoint, &minio.Options{
173 Creds: credentials.NewStaticV4(config.S3AccessKey, config.S3SecretKey, ""),
174 Secure: config.S3UseSSL == "true",
175 })
176 if err != nil {
177 return err
178 }
179 }
180
avm9996388e622d2021-01-22 18:57:58 +0100181 backends.Log().Debug("MQTT client successfully started and connected")
182
183 return nil
184 })
185
186 backends.Svc.AddInitializer(initFunc)
187 return func(p backends.Processor) backends.Processor {
188 return backends.ProcessWith(
189 func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
190 if task == backends.TaskValidateRcpt {
191 // We don't validate the recipient
192 return p.Process(e, task)
193 } else if task == backends.TaskSaveMail {
avm999638355e002021-01-26 23:05:01 +0100194 err := processEmail(e, c, s3c, config)
avm9996388e622d2021-01-22 18:57:58 +0100195 if err != nil {
196 backends.Log().WithError(err).Error("Could not handle email")
197 return backends.NewResult("554 Error: could not handle email"), err
198 }
199
200 // call the next processor in the chain
201 return p.Process(e, task)
202 }
203 return p.Process(e, task)
204 },
205 )
206 }
207}