//消息处理
func messageProc(messageBody string) error {
messagePush := &commonModel.MessagePush{}
if err := json.Unmarshal([]byte(messageBody), messagePush); err != nil {
logrus.Errorf("message unmarshal fail, msg %s, err %v", messageBody, err)
return err
}
err := factory.CreateConsumer(messagePush.InfoType, messagePush.ActionType).Process(messagePush) //CreateConsumer映射对应的累,.Process()累下的方法来处理
if err != nil {
logrus.Errorf("message consumer fail, msg %s, err %v", messageBody, err)
return err
}
if err := store.BehaviorLogMysql.SaveBehaviorLog(messagePush); err == nil {
logrus.WithField(consts.Operate, consts.OperateProcMsgSuccess).Infof("message proc success, %s", messageBody)
}
return err
}
-----------------------------------
func (f *Factory) CreateConsumer(infoType model.InfoType, actionType model.ActionType) Consumer {
switch infoType {
case model.InfoTypeArticle:
return createArticleConsumer(actionType)
case model.InfoTypeComment:
return createCommentConsumer(actionType)
case model.InfoTypeDraftArticle:
return createDraftArticleConsumer(actionType)
default:
logrus.Errorf("Unsupported consumer info type %d", infoType)
return &Empty{}
}
}
func createArticleConsumer(actionType model.ActionType) Consumer {
switch actionType {
case model.ActionTypeCreate:
return &article.Create{}
case model.ActionTypeDelete:
return &article.Delete{}
case model.ActionTypePublish:
return &article.Publish{}
case model.ActionTypeLike:
return &article.Like{}
case model.ActionTypeUndoLike:
return &article.LikeUndo{}
case model.ActionTypeUnlike:
return &article.Hate{}
case model.ActionTypeFavorite:
return &article.Favorite{}
case model.ActionTypeUndoFavorite:
return &article.FavoriteUndo{}
case model.ActionTypeReward:
return &article.Reward{}
default:
logrus.Errorf("Unsupported consumer action type %d", actionType)
return &Empty{}
}
}
func createCommentConsumer(actionType model.ActionType) Consumer {
switch actionType {
case model.ActionTypeCreate:
return &comment.Create{}
case model.ActionTypeDelete:
return &comment.Delete{}
case model.ActionTypeVoteDown:
return &comment.VoteDown{}
case model.ActionTypeVoteUp:
return &comment.VoteUp{}
default:
logrus.Errorf("Unsupported consumer action type %d", actionType)
return &Empty{}
}
}
func createDraftArticleConsumer(actionType model.ActionType) Consumer {
switch actionType {
case model.ActionTypeSave:
return &draftArticle.Save{}
default:
logrus.Errorf("Unsupported consumer action type %d", actionType)
return &Empty{}
}
}
-----------------------------------
//比如删除得Process
type Delete struct {
}
// 文章删除
// 1. 删除 oss 上的文件
// 2. 删除 es 中的文章
func (a *Delete) Process(message *commonModel.MessagePush) error {
id := utils.SNToID(message.InfoID)
objectKey := fmt.Sprintf(consts.ArticlePath, message.InfoID, id)
if err := store.OSSClient.DeleteObject(config.GlobalConfig.Bucket, objectKey); err != nil {
logrus.Errorf("delete bucket %s object %s err, %v", config.GlobalConfig.Bucket, objectKey, err)
return err
}
// 删除 es 索引文件
searchApi := client.NewSearchAPI()
_, err := searchApi.ApiV1SearchArticlesSnDelete(context.TODO(), strconv.Itoa(int(message.InfoID)))
if err != nil {
logrus.Errorf("delete article error, article %d, err %v", message.InfoID, err)
return err
}
return nil
}