| package mqtt_processor |
| |
| import ( |
| "bufio" |
| "bytes" |
| "context" |
| "encoding/json" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "net/url" |
| "strings" |
| "time" |
| |
| "github.com/flashmob/go-guerrilla/backends" |
| "github.com/flashmob/go-guerrilla/mail" |
| |
| // TODO(avm99963): Change to github.com/DusanKasan/parsemail once the bug fixes land there |
| "gomodules.avm99963.com/forks/parsemail" |
| |
| "github.com/eclipse/paho.mqtt.golang" |
| |
| "github.com/minio/minio-go/v7" |
| "github.com/minio/minio-go/v7/pkg/credentials" |
| ) |
| |
| const subtopicStatus = "status" |
| |
| type MQTTConfig struct { |
| MQTTBroker string `json:"mqtt_broker"` |
| MQTTClientID string `json:"mqtt_clientid"` |
| MQTTUsername string `json:"mqtt_username"` |
| MQTTPassword string `json:"mqtt_password"` |
| MQTTTopic string `json:"mqtt_topic"` |
| EmailToken string `json:"mqtt_email_token"` |
| S3Enabled string `json:"s3_enabled"` |
| S3Endpoint string `json:"s3_endpoint"` |
| S3AccessKey string `json:"s3_access_key"` |
| S3SecretKey string `json:"s3_secret_key"` |
| S3UseSSL string `json:"s3_use_ssl"` |
| S3Bucket string `json:"s3_bucket"` |
| } |
| |
| type motionEvent struct { |
| Timestamp int64 `json:"timestamp"` |
| CameraID string `json:"-"` |
| ImageURL string `json:"image_url"` |
| } |
| |
| func hasToken(email *parsemail.Email, config *MQTTConfig) bool { |
| if config.EmailToken == "" { |
| backends.Log().Error("The mqtt_email_token field is empty") |
| return false |
| } |
| |
| return email.TextBody == "Y-Token: "+config.EmailToken |
| } |
| |
| func getSize(stream io.Reader) (io.Reader, int, error) { |
| b, err := ioutil.ReadAll(stream) |
| if err != nil { |
| return nil, -1, err |
| } |
| |
| r := bytes.NewReader(b) |
| return r, r.Len(), nil |
| } |
| |
| func uploadImage(email *parsemail.Email, cameraID string, config *MQTTConfig, s3c *minio.Client) (string, error) { |
| for _, a := range email.Attachments { |
| if strings.HasPrefix(a.ContentType, "image") { |
| // Upload image |
| ctx := context.Background() |
| fileName := cameraID + "/" + a.Filename |
| data, size, err := getSize(a.Data) |
| if err != nil { |
| return "", err |
| } |
| n, err := s3c.PutObject(ctx, config.S3Bucket, fileName, data, int64(size), minio.PutObjectOptions{ContentType: a.ContentType}) |
| if err != nil { |
| return "", err |
| } |
| |
| // Get image URL |
| reqParams := make(url.Values) |
| u, err := s3c.PresignedGetObject(ctx, config.S3Bucket, n.Key, time.Duration(2*24)*time.Hour, reqParams) |
| if err != nil { |
| return "", err |
| } |
| |
| return u.String(), nil |
| } |
| } |
| |
| backends.Log().Info("The message doesn't contain an image attachment") |
| |
| return "", nil |
| } |
| |
| func processMotionEvent(e *mail.Envelope, config *MQTTConfig, s3c *minio.Client) (*motionEvent, error) { |
| event := &motionEvent{} |
| |
| dataReader := bufio.NewReader(&e.Data) |
| email, err := parsemail.Parse(dataReader) |
| if err != nil { |
| return event, err |
| } |
| |
| if !hasToken(&email, config) { |
| return event, fmt.Errorf("Message body doesn't include the correct token") |
| } |
| |
| event.CameraID = e.MailFrom.User |
| event.Timestamp = email.Date.Unix() |
| |
| if config.S3Enabled == "true" { |
| imageURL, err := uploadImage(&email, event.CameraID, config, s3c) |
| if err != nil { |
| return event, err |
| } |
| |
| event.ImageURL = imageURL |
| } |
| |
| return event, nil |
| } |
| |
| func processEmail(e *mail.Envelope, c mqtt.Client, s3c *minio.Client, config *MQTTConfig) error { |
| event, err := processMotionEvent(e, config, s3c) |
| if err != nil { |
| return err |
| } |
| |
| backends.Log().Info(fmt.Sprintf("Motion event received: %v", event)) |
| |
| topic := config.MQTTTopic + "/" + event.CameraID + "/motion" |
| msg, err := json.Marshal(event) |
| if err != nil { |
| return err |
| } |
| |
| if token := c.Publish(topic, 2, true, msg); token.Wait() && token.Error() != nil { |
| return token.Error() |
| } |
| |
| backends.Log().Debug("MQTT message published successfully") |
| |
| return nil |
| } |
| |
| var Processor = func() backends.Decorator { |
| var ( |
| c mqtt.Client |
| s3c *minio.Client |
| ) |
| |
| config := &MQTTConfig{} |
| |
| initFunc := backends.InitializeWith(func(backendConfig backends.BackendConfig) error { |
| // Get configuration |
| configType := backends.BaseConfig(&MQTTConfig{}) |
| bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType) |
| if err != nil { |
| return err |
| } |
| config = bcfg.(*MQTTConfig) |
| |
| // Set options for MQTT client |
| opts := mqtt.NewClientOptions() |
| opts = opts.AddBroker(config.MQTTBroker) |
| opts = opts.SetClientID(config.MQTTClientID) |
| opts = opts.SetUsername(config.MQTTUsername).SetPassword(config.MQTTPassword) |
| opts = opts.SetWill(config.MQTTTopic+"/"+subtopicStatus, "offline", 1, true) |
| |
| // Create MQTT client and start the connection |
| c = mqtt.NewClient(opts) |
| if token := c.Connect(); token.Wait() && token.Error() != nil { |
| return token.Error() |
| } |
| |
| // Send a birth message |
| if token := c.Publish(config.MQTTTopic+"/"+subtopicStatus, 1, true, "online"); token.Wait() && token.Error() != nil { |
| return token.Error() |
| } |
| |
| // Set up the minio client if S3 storage is enabled |
| if config.S3Enabled == "true" { |
| var err error |
| s3c, err = minio.New(config.S3Endpoint, &minio.Options{ |
| Creds: credentials.NewStaticV4(config.S3AccessKey, config.S3SecretKey, ""), |
| Secure: config.S3UseSSL == "true", |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| |
| backends.Log().Debug("MQTT client successfully started and connected") |
| |
| return nil |
| }) |
| |
| backends.Svc.AddInitializer(initFunc) |
| return func(p backends.Processor) backends.Processor { |
| return backends.ProcessWith( |
| func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) { |
| if task == backends.TaskValidateRcpt { |
| // We don't validate the recipient |
| return p.Process(e, task) |
| } else if task == backends.TaskSaveMail { |
| err := processEmail(e, c, s3c, config) |
| if err != nil { |
| backends.Log().WithError(err).Error("Could not handle email") |
| return backends.NewResult("554 Error: could not handle email"), err |
| } |
| |
| // call the next processor in the chain |
| return p.Process(e, task) |
| } |
| return p.Process(e, task) |
| }, |
| ) |
| } |
| } |