CreateConsumer映射对应的累,.Process()累下的方法来处理

mac2024-03-30  30

//消息处理 func messageProc(messageBody string) error { messagePush := &commonModel.MessagePush{} if err := json.Unmarshal([]byte(messageBody), messagePush); err != nil { logrus.Errorf("message unmarshal fail, msg %s, err %v", messageBody, err) return err } err := factory.CreateConsumer(messagePush.InfoType, messagePush.ActionType).Process(messagePush) //CreateConsumer映射对应的累,.Process()累下的方法来处理 if err != nil { logrus.Errorf("message consumer fail, msg %s, err %v", messageBody, err) return err } if err := store.BehaviorLogMysql.SaveBehaviorLog(messagePush); err == nil { logrus.WithField(consts.Operate, consts.OperateProcMsgSuccess).Infof("message proc success, %s", messageBody) } return err } ----------------------------------- func (f *Factory) CreateConsumer(infoType model.InfoType, actionType model.ActionType) Consumer { switch infoType { case model.InfoTypeArticle: return createArticleConsumer(actionType) case model.InfoTypeComment: return createCommentConsumer(actionType) case model.InfoTypeDraftArticle: return createDraftArticleConsumer(actionType) default: logrus.Errorf("Unsupported consumer info type %d", infoType) return &Empty{} } } func createArticleConsumer(actionType model.ActionType) Consumer { switch actionType { case model.ActionTypeCreate: return &article.Create{} case model.ActionTypeDelete: return &article.Delete{} case model.ActionTypePublish: return &article.Publish{} case model.ActionTypeLike: return &article.Like{} case model.ActionTypeUndoLike: return &article.LikeUndo{} case model.ActionTypeUnlike: return &article.Hate{} case model.ActionTypeFavorite: return &article.Favorite{} case model.ActionTypeUndoFavorite: return &article.FavoriteUndo{} case model.ActionTypeReward: return &article.Reward{} default: logrus.Errorf("Unsupported consumer action type %d", actionType) return &Empty{} } } func createCommentConsumer(actionType model.ActionType) Consumer { switch actionType { case model.ActionTypeCreate: return &comment.Create{} case model.ActionTypeDelete: return &comment.Delete{} case model.ActionTypeVoteDown: return &comment.VoteDown{} case model.ActionTypeVoteUp: return &comment.VoteUp{} default: logrus.Errorf("Unsupported consumer action type %d", actionType) return &Empty{} } } func createDraftArticleConsumer(actionType model.ActionType) Consumer { switch actionType { case model.ActionTypeSave: return &draftArticle.Save{} default: logrus.Errorf("Unsupported consumer action type %d", actionType) return &Empty{} } } ----------------------------------- //比如删除得Process type Delete struct { } // 文章删除 // 1. 删除 oss 上的文件 // 2. 删除 es 中的文章 func (a *Delete) Process(message *commonModel.MessagePush) error { id := utils.SNToID(message.InfoID) objectKey := fmt.Sprintf(consts.ArticlePath, message.InfoID, id) if err := store.OSSClient.DeleteObject(config.GlobalConfig.Bucket, objectKey); err != nil { logrus.Errorf("delete bucket %s object %s err, %v", config.GlobalConfig.Bucket, objectKey, err) return err } // 删除 es 索引文件 searchApi := client.NewSearchAPI() _, err := searchApi.ApiV1SearchArticlesSnDelete(context.TODO(), strconv.Itoa(int(message.InfoID))) if err != nil { logrus.Errorf("delete article error, article %d, err %v", message.InfoID, err) return err } return nil }

 

最新回复(0)