package user_core_client import ( "context" "encoding/json" "errors" "fmt" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" "strings" "time" ) // Kafka 相关常量 const ( // UserSyncTopic 用户同步的Kafka主题 UserSyncTopic = "user-sync" ) type EventType string // 用户事件类型常量 const ( UserCreateEvent EventType = "user-create-event" // 用户创建事件 UserUpdateEvent EventType = "user-update-event" // 用户更新事件 UserDeleteEvent EventType = "user-delete-event" // 用户删除事件 ) type OptionKafkaSync = func(s *kafkaSync) func OptKafkaLogger(l Logger) func(s *kafkaSync) { return func(s *kafkaSync) { s.logger = l } } type KafkaConfig struct { Brokers []string `json:"brokers"` // kafka broker 地址 GroupID string `json:"group_id"` // kafka group id SaslUser string `json:"sasluser"` // kafka 用户名 SaslPassword string `json:"saslpassword"` // kafka 密码 SaslMechanism string `json:"saslmechanism"` // 可选值: plain / scram-sha256 / scram-sha512 } func NewUserCentKafkaSync(platformCode string, kafkaConfig *KafkaConfig, opts ...OptionKafkaSync) UserCentSync { k := &kafkaSync{ platformCode: platformCode, kafkaConfig: kafkaConfig, } for _, opt := range opts { opt(k) } if k.logger == nil { mLog := NewLogger() k.logger = mLog } return k } type kafkaSync struct { platformCode string kafkaConfig *KafkaConfig kafkaReader *kafka.Reader errHandler func(ctx context.Context, err error) error eventHandler func(ctx context.Context, event *UserEvent) error cancelFunc context.CancelFunc logger Logger } func (k *kafkaSync) Stop(ctx context.Context) error { // 取消上下文 if k.cancelFunc != nil { k.cancelFunc() } // 关闭 kafka 消费者 if k.kafkaReader != nil { if err := k.kafkaReader.Close(); err != nil { return fmt.Errorf("close kafka reader failed, err: %v", err) } } return nil } func (k *kafkaSync) Start(ctx context.Context) error { // 初始化 SASL 机制 var mechanism sasl.Mechanism switch strings.ToLower(k.kafkaConfig.SaslMechanism) { case "scram-sha256": mechanism, _ = scram.Mechanism(scram.SHA256, k.kafkaConfig.SaslUser, k.kafkaConfig.SaslPassword) case "scram-sha512": mechanism, _ = scram.Mechanism(scram.SHA512, k.kafkaConfig.SaslUser, k.kafkaConfig.SaslPassword) case "plain", "": // 默认 plain mechanism = plain.Mechanism{ Username: k.kafkaConfig.SaslUser, Password: k.kafkaConfig.SaslPassword, } default: return fmt.Errorf("unsupported sasl mechanism: %s", k.kafkaConfig.SaslMechanism) } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, } // 初始化 kafka 消费者 k.kafkaReader = kafka.NewReader(kafka.ReaderConfig{ Brokers: k.kafkaConfig.Brokers, GroupID: k.kafkaConfig.GroupID, Topic: UserSyncTopic, // 订阅主题 MinBytes: 10, // 最小字节数 MaxBytes: 10e6, // 最大字节数 MaxWait: 10 * time.Second, // 最大等待时间 CommitInterval: 100 * time.Millisecond, // 提交偏移量的时间间隔 StartOffset: kafka.LastOffset, // 从最后一个偏移量开始消费 WatchPartitionChanges: true, // 监听分区变化 Dialer: dialer, // 自定义拨号器 }) sCtx, cancelFunc := context.WithCancel(ctx) k.cancelFunc = cancelFunc // 启动消费循环 go k.startLoop(sCtx) return nil } func (k *kafkaSync) SetCallUserListener(f func(context.Context, *UserEvent) error) { k.eventHandler = f } func (k *kafkaSync) callEventHandler(ctx context.Context, message *kafka.Message) error { if k.eventHandler == nil { return nil } var userEvent UserEvent err := json.Unmarshal(message.Value, &userEvent) if err != nil { return fmt.Errorf("unmarshal user event failed topic: %s message: %s err: %v", message.Topic, message.Value, err) } // 过滤非当前平台的事件 if userEvent.PlatformCode != k.platformCode { return nil } k.logger.Printf("received event topic: %s message: %s", message.Topic, message.Value) return k.eventHandler(ctx, &userEvent) } func (k *kafkaSync) SetErrorListener(f func(context.Context, error) error) { k.errHandler = f } func (k *kafkaSync) callErrorListener(ctx context.Context, err error) error { if k.errHandler == nil { return nil } return k.errHandler(ctx, err) } func (k *kafkaSync) startLoop(ctx context.Context) { k.logger.Printf("kafka reader start loop") defer k.logger.Printf("kafka reader stop") for { // 从 kafka 读取消息, 该消息会阻塞,直到有消息可读 // 并且里面也判断了 ctx 的取消,所以不再判断 message, err := k.kafkaReader.FetchMessage(ctx) if err != nil { if errors.Is(err, context.Canceled) { return } // 其他错误, 直接返回 k.logger.Printf("kafka reader fetch message failed, err: %v", err) continue } // 解析消息 var event BaseEvent if err := json.Unmarshal(message.Value, &event); err != nil { if err := k.callErrorListener(ctx, err); err != nil { k.logger.Printf("call error listener failed, err: %v", err) continue } } // 是用户事件 就处理 if strings.HasPrefix(event.EventType, "user-") { if err := k.callEventHandler(ctx, &message); err != nil { k.logger.Printf("call event handler failed, err: %v", err) continue } } // 提交偏移量 if err := k.kafkaReader.CommitMessages(ctx, message); err != nil { k.logger.Printf("commit message failed, err: %v", err) continue } } }