blob: 35c067f2d629d474427ed63d8cbdfff83ec78ff4 [file] [log] [blame]
package mqtt_processor
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"strings"
"time"
"github.com/flashmob/go-guerrilla/backends"
"github.com/flashmob/go-guerrilla/mail"
// TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there
"gomodules.avm99963.com/forks/parsemail"
"github.com/eclipse/paho.mqtt.golang"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// subtopicStatus is the subtopic appended to the configured base topic
// (as "<mqtt_topic>/status") for the processor's status messages: the
// "online" message published after connecting and the "offline" will
// message registered with the MQTT client options.
const subtopicStatus = "status"
// MQTTConfig holds the backend configuration values read by this processor.
// All values are strings; the boolean-like options (S3Enabled, S3UseSSL) are
// considered enabled only when they are exactly "true".
type MQTTConfig struct {
// MQTTBroker is the broker address handed to the MQTT client options.
MQTTBroker string `json:"mqtt_broker"`
// MQTTClientID is the client ID used when connecting to the broker.
MQTTClientID string `json:"mqtt_clientid"`
// MQTTUsername is the username used to authenticate with the broker.
MQTTUsername string `json:"mqtt_username"`
// MQTTPassword is the password used to authenticate with the broker.
MQTTPassword string `json:"mqtt_password"`
// MQTTTopic is the base topic. Status messages go to "<topic>/status" and
// motion events to "<topic>/<camera id>/motion".
MQTTTopic string `json:"mqtt_topic"`
// EmailToken is the shared token an email must carry: the text body has to
// equal "Y-Token: " followed by this value. When it is empty, every email
// is rejected.
EmailToken string `json:"mqtt_email_token"`
// S3Enabled turns on image uploading when set to "true".
S3Enabled string `json:"s3_enabled"`
// S3Endpoint is the endpoint passed to the minio client constructor.
S3Endpoint string `json:"s3_endpoint"`
// S3AccessKey is the access key of the static V4 credentials.
S3AccessKey string `json:"s3_access_key"`
// S3SecretKey is the secret key of the static V4 credentials.
S3SecretKey string `json:"s3_secret_key"`
// S3UseSSL makes the minio client use a secure connection when set to "true".
S3UseSSL string `json:"s3_use_ssl"`
// S3Bucket is the bucket images are uploaded to and presigned URLs are
// generated for.
S3Bucket string `json:"s3_bucket"`
}
// motionEvent is the payload published over MQTT (JSON-encoded) when a motion
// email is received.
type motionEvent struct {
// Timestamp is the Unix time, in seconds, taken from the parsed email's date.
Timestamp int64 `json:"timestamp"`
// CameraID is the user part of the envelope sender. It is left out of the
// JSON payload (tag "-") and is used to build the MQTT topic and the
// uploaded object's name instead.
CameraID string `json:"-"`
// ImageURL is the presigned URL of the uploaded image. It stays empty when
// S3 is disabled or the email carries no image attachment.
ImageURL string `json:"image_url"`
}
// hasToken reports whether the email's text body is exactly the string
// "Y-Token: " followed by the configured email token.
//
// An empty configured token never matches: an error is logged and false is
// returned, so a missing configuration value cannot be satisfied by an email
// whose body is just the bare prefix.
func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
	token := config.EmailToken
	if token == "" {
		backends.Log().Error("The mqtt_email_token field is empty")
		return false
	}

	expectedBody := "Y-Token: " + token
	return email.TextBody == expectedBody
}
// getSize reads stream to the end, keeping its contents in memory, and
// returns a fresh reader positioned at the start of those contents together
// with their length in bytes.
//
// If reading fails, it returns a nil reader, a size of -1 and the read error.
func getSize(stream io.Reader) (io.Reader, int, error) {
	contents, readErr := ioutil.ReadAll(stream)
	if readErr != nil {
		return nil, -1, readErr
	}
	return bytes.NewReader(contents), len(contents), nil
}
// uploadImage stores the first attachment whose content type starts with
// "image" in the configured S3 bucket and returns a presigned GET URL for it.
//
// The object is named "<cameraID>/<attachment filename>" and the returned URL
// is valid for 48 hours. Only the first matching attachment is handled; later
// ones are ignored. When the email has no image attachment, an info message
// is logged and an empty URL is returned with a nil error. Any failure while
// reading, uploading or presigning is returned as is.
func uploadImage(email *parsemail.Email, cameraID string, config *MQTTConfig, s3c *minio.Client) (string, error) {
	// Lifetime of the presigned URL: two days.
	const linkLifetime = 48 * time.Hour

	for _, attachment := range email.Attachments {
		if !strings.HasPrefix(attachment.ContentType, "image") {
			continue
		}

		ctx := context.Background()
		objectName := cameraID + "/" + attachment.Filename

		// The attachment is buffered in memory so that its size can be
		// passed along with the upload.
		body, length, err := getSize(attachment.Data)
		if err != nil {
			return "", err
		}

		putOpts := minio.PutObjectOptions{ContentType: attachment.ContentType}
		info, err := s3c.PutObject(ctx, config.S3Bucket, objectName, body, int64(length), putOpts)
		if err != nil {
			return "", err
		}

		link, err := s3c.PresignedGetObject(ctx, config.S3Bucket, info.Key, linkLifetime, url.Values{})
		if err != nil {
			return "", err
		}
		return link.String(), nil
	}

	backends.Log().Info("The message doesn't contain an image attachment")
	return "", nil
}
// processMotionEvent parses the raw message stored in the envelope and builds
// the motionEvent that describes it.
//
// The message is rejected with an error when it cannot be parsed or when its
// body does not carry the configured token (see hasToken). On success the
// event's CameraID is the user part of the envelope sender and its Timestamp
// is the email's date as Unix seconds. If the parsed email has no date (the
// field is the zero time), the current time is used instead, so that
// consumers never receive the zero time's Unix value (a large negative
// number) as the moment of the event.
//
// When S3 is enabled (config.S3Enabled == "true"), the first image attachment
// is uploaded through s3c and its presigned URL is stored in ImageURL.
//
// A non-nil event is returned even on error; it may be only partially filled
// and should be ignored by callers whenever the error is non-nil.
func processMotionEvent(e *mail.Envelope, config *MQTTConfig, s3c *minio.Client) (*motionEvent, error) {
	event := &motionEvent{}

	dataReader := bufio.NewReader(&e.Data)
	email, err := parsemail.Parse(dataReader)
	if err != nil {
		return event, err
	}

	if !hasToken(&email, config) {
		return event, fmt.Errorf("Message body doesn't include the correct token")
	}

	event.CameraID = e.MailFrom.User

	// The zero time.Time converts to a meaningless negative Unix timestamp,
	// so fall back to the time the message is being processed.
	if email.Date.IsZero() {
		event.Timestamp = time.Now().Unix()
	} else {
		event.Timestamp = email.Date.Unix()
	}

	if config.S3Enabled == "true" {
		imageURL, err := uploadImage(&email, event.CameraID, config, s3c)
		if err != nil {
			return event, err
		}
		event.ImageURL = imageURL
	}

	return event, nil
}
// processEmail turns the received envelope into a motion event and publishes
// it, JSON-encoded, to "<mqtt_topic>/<camera id>/motion" using the given MQTT
// client. The message is published with QoS 2 and the retained flag set, and
// the call blocks until the publish token completes.
//
// It returns the first error met while building the event, encoding it or
// publishing it; nil means the message was published.
func processEmail(e *mail.Envelope, c mqtt.Client, s3c *minio.Client, config *MQTTConfig) error {
	event, err := processMotionEvent(e, config, s3c)
	if err != nil {
		return err
	}
	backends.Log().Info(fmt.Sprintf("Motion event received: %v", event))

	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	motionTopic := config.MQTTTopic + "/" + event.CameraID + "/motion"
	token := c.Publish(motionTopic, 2, true, payload)
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}

	backends.Log().Debug("MQTT message published successfully")
	return nil
}
// Processor builds the backend decorator that forwards motion emails to MQTT.
//
// Calling it registers an initializer with the backend service. When run,
// that initializer:
//   - extracts the MQTTConfig from the backend configuration,
//   - connects to the MQTT broker, registering a retained "offline" will on
//     the status topic,
//   - publishes a retained "online" birth message on the same topic, and
//   - creates the minio client when S3 storage is enabled.
//
// The returned decorator handles TaskSaveMail by passing the envelope to
// processEmail. On failure the error is logged and a "554" result is returned
// together with the error, without calling the next processor. Every other
// task (recipient validation included) and every successfully handled email
// is passed on to the next processor in the chain.
var Processor = func() backends.Decorator {
	// State shared between the initializer and the per-message processor.
	var (
		c   mqtt.Client
		s3c *minio.Client
	)
	config := &MQTTConfig{}

	initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
		// Load this processor's settings from the backend configuration.
		extracted, err := backends.Svc.ExtractConfig(backendConfig, backends.BaseConfig(&MQTTConfig{}))
		if err != nil {
			return err
		}
		config = extracted.(*MQTTConfig)

		statusTopic := config.MQTTTopic + "/" + subtopicStatus

		// Prepare the MQTT client options, including the will message.
		opts := mqtt.NewClientOptions().
			AddBroker(config.MQTTBroker).
			SetClientID(config.MQTTClientID).
			SetUsername(config.MQTTUsername).
			SetPassword(config.MQTTPassword).
			SetWill(statusTopic, "offline", 1, true)

		// Connect to the broker and wait for the outcome.
		c = mqtt.NewClient(opts)
		if token := c.Connect(); token.Wait() && token.Error() != nil {
			return token.Error()
		}

		// Announce that the processor is up.
		if token := c.Publish(statusTopic, 1, true, "online"); token.Wait() && token.Error() != nil {
			return token.Error()
		}

		// The minio client is only needed when S3 storage is enabled.
		if config.S3Enabled == "true" {
			s3c, err = minio.New(config.S3Endpoint, &minio.Options{
				Creds:  credentials.NewStaticV4(config.S3AccessKey, config.S3SecretKey, ""),
				Secure: config.S3UseSSL == "true",
			})
			if err != nil {
				return err
			}
		}

		backends.Log().Debug("MQTT client successfully started and connected")
		return nil
	})
	backends.Svc.AddInitializer(initFunc)

	return func(p backends.Processor) backends.Processor {
		return backends.ProcessWith(
			func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
				// Only saving mail needs work here; recipients are not
				// validated and any other task goes straight down the chain.
				if task == backends.TaskSaveMail {
					if err := processEmail(e, c, s3c, config); err != nil {
						backends.Log().WithError(err).Error("Could not handle email")
						return backends.NewResult("554 Error: could not handle email"), err
					}
				}
				// Call the next processor in the chain.
				return p.Process(e, task)
			},
		)
	}
}