Add functionality to upload images to S3
This change adds the following functionality:
- Upload the motion detection image to an S3 bucket.
- Pass a presigned URL linking to the image in the JSON payload of the
MQTT event message.
- Add a configuration option that controls whether images are uploaded to
an S3 bucket.
Change-Id: Id82401839e92dbc62f3bbd1e7e7f0adeeb5718a6
diff --git a/mqtt_processor/mqtt_processor.go b/mqtt_processor/mqtt_processor.go
index 256c469..e965702 100644
--- a/mqtt_processor/mqtt_processor.go
+++ b/mqtt_processor/mqtt_processor.go
@@ -2,8 +2,12 @@
import (
"bufio"
+ "context"
"encoding/json"
"fmt"
+ "net/url"
+ "strings"
+ "time"
"github.com/flashmob/go-guerrilla/backends"
"github.com/flashmob/go-guerrilla/mail"
@@ -12,6 +16,9 @@
"gomodules.avm99963.com/forks/parsemail"
"github.com/eclipse/paho.mqtt.golang"
+
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/credentials"
)
const subtopicStatus = "status"
@@ -23,11 +30,18 @@
MQTTPassword string `json:"mqtt_password"`
MQTTTopic string `json:"mqtt_topic"`
EmailToken string `json:"mqtt_email_token"`
+ S3Enabled string `json:"s3_enabled"`
+ S3Endpoint string `json:"s3_endpoint"`
+ S3AccessKey string `json:"s3_access_key"`
+ S3SecretKey string `json:"s3_secret_key"`
+ S3UseSSL string `json:"s3_use_ssl"`
+ S3Bucket string `json:"s3_bucket"`
}
type motionEvent struct {
Timestamp int64 `json:"timestamp"`
CameraID string `json:"-"`
+ ImageURL string `json:"image_url"`
}
func hasToken(email *parsemail.Email, config *MQTTConfig) bool {
@@ -39,7 +53,34 @@
return email.TextBody == "Y-Token: "+config.EmailToken
}
-func processMotionEvent(e *mail.Envelope, config *MQTTConfig) (*motionEvent, error) {
+func uploadImage(email *parsemail.Email, cameraID string, config *MQTTConfig, s3c *minio.Client) (string, error) {
+ for _, a := range email.Attachments {
+ if strings.HasPrefix(a.ContentType, "image") {
+ // Upload image
+ ctx := context.Background()
+ fileName := cameraID + "/" + a.Filename
+ n, err := s3c.PutObject(ctx, config.S3Bucket, fileName, a.Data, -1, minio.PutObjectOptions{ContentType: a.ContentType})
+ if err != nil {
+ return "", err
+ }
+
+ // Get image URL
+ reqParams := make(url.Values)
+ u, err := s3c.PresignedGetObject(ctx, config.S3Bucket, n.Key, time.Duration(2*24)*time.Hour, reqParams)
+ if err != nil {
+ return "", err
+ }
+
+ return u.String(), nil
+ }
+ }
+
+ backends.Log().Info("The message doesn't contain an image attachment")
+
+ return "", nil
+}
+
+func processMotionEvent(e *mail.Envelope, config *MQTTConfig, s3c *minio.Client) (*motionEvent, error) {
event := &motionEvent{}
dataReader := bufio.NewReader(&e.Data)
@@ -55,11 +96,20 @@
event.CameraID = e.MailFrom.User
event.Timestamp = email.Date.Unix()
+ if config.S3Enabled == "true" {
+ imageURL, err := uploadImage(&email, event.CameraID, config, s3c)
+ if err != nil {
+ return event, err
+ }
+
+ event.ImageURL = imageURL
+ }
+
return event, nil
}
-func processEmail(e *mail.Envelope, c mqtt.Client, config *MQTTConfig) error {
- event, err := processMotionEvent(e, config)
+func processEmail(e *mail.Envelope, c mqtt.Client, s3c *minio.Client, config *MQTTConfig) error {
+ event, err := processMotionEvent(e, config, s3c)
if err != nil {
return err
}
@@ -83,7 +133,8 @@
var Processor = func() backends.Decorator {
var (
- c mqtt.Client
+ c mqtt.Client
+ s3c *minio.Client
)
config := &MQTTConfig{}
@@ -115,6 +166,18 @@
return token.Error()
}
+ // Set up the minio client if S3 storage is enabled
+ if config.S3Enabled == "true" {
+ var err error
+ s3c, err = minio.New(config.S3Endpoint, &minio.Options{
+ Creds: credentials.NewStaticV4(config.S3AccessKey, config.S3SecretKey, ""),
+ Secure: config.S3UseSSL == "true",
+ })
+ if err != nil {
+ return err
+ }
+ }
+
backends.Log().Debug("MQTT client successfully started and connected")
return nil
@@ -128,7 +191,7 @@
// We don't validate the recipient
return p.Process(e, task)
} else if task == backends.TaskSaveMail {
- err := processEmail(e, c, config)
+ err := processEmail(e, c, s3c, config)
if err != nil {
backends.Log().WithError(err).Error("Could not handle email")
return backends.NewResult("554 Error: could not handle email"), err