user-core-client/client_sync_kafka_impl.go

216 lines
5.7 KiB
Go
Raw Normal View History

2025-08-19 13:52:01 +08:00
package user_core_client
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"strings"
"time"
)
// Kafka-related constants.

// UserSyncTopic is the Kafka topic used for user synchronization.
const UserSyncTopic = "user-sync"
// EventType is the string type of the user event constants declared below.
type EventType string

// User event type constants.
const (
	// UserCreateEvent is the event type for user creation.
	UserCreateEvent = EventType("user-create-event")
	// UserUpdateEvent is the event type for user updates.
	UserUpdateEvent = EventType("user-update-event")
	// UserDeleteEvent is the event type for user deletion.
	UserDeleteEvent = EventType("user-delete-event")
)
// OptionKafkaSync is a functional option applied to a kafkaSync by
// NewUserCentKafkaSync before the instance is returned.
type OptionKafkaSync = func(s *kafkaSync)
// OptKafkaLogger returns an option that installs l as the logger of the
// kafkaSync it is applied to.
func OptKafkaLogger(l Logger) func(s *kafkaSync) {
	setLogger := func(target *kafkaSync) {
		target.logger = l
	}
	return setLogger
}
// KafkaConfig holds the Kafka connection settings read by the Kafka-backed
// sync client when it starts.
type KafkaConfig struct {
Brokers []string `json:"brokers"` // Kafka broker addresses
GroupID string `json:"group_id"` // Kafka consumer group ID
SaslUser string `json:"sasluser"` // Kafka SASL username
SaslPassword string `json:"saslpassword"` // Kafka SASL password
SaslMechanism string `json:"saslmechanism"` // Allowed values: plain / scram-sha256 / scram-sha512 (case-insensitive; empty means plain)
}
// NewUserCentKafkaSync builds a Kafka-backed UserCentSync for the given
// platform code and Kafka configuration.
//
// Each option in opts is applied in order. If no option sets a logger, the
// logger returned by NewLogger is used.
func NewUserCentKafkaSync(platformCode string, kafkaConfig *KafkaConfig, opts ...OptionKafkaSync) UserCentSync {
	syncer := new(kafkaSync)
	syncer.platformCode = platformCode
	syncer.kafkaConfig = kafkaConfig

	for i := range opts {
		opts[i](syncer)
	}

	// A logger supplied through the options wins over the default.
	if syncer.logger != nil {
		return syncer
	}
	syncer.logger = NewLogger()
	return syncer
}
// kafkaSync is the Kafka-backed implementation returned by
// NewUserCentKafkaSync. It consumes UserSyncTopic and dispatches user events
// to the registered listeners.
type kafkaSync struct {
platformCode string // user events whose PlatformCode differs from this value are ignored
kafkaConfig *KafkaConfig // connection settings read by Start
kafkaReader *kafka.Reader // consumer created by Start and closed by Stop
errHandler func(ctx context.Context, err error) error // optional; set via SetErrorListener
eventHandler func(ctx context.Context, event *UserEvent) error // optional; set via SetCallUserListener
cancelFunc context.CancelFunc // cancels the consume loop's context; set by Start
logger Logger // set via OptKafkaLogger, otherwise NewLogger()
}
// Stop cancels the consume loop's context and closes the Kafka reader.
//
// Both steps are skipped when Start has not been called (cancelFunc or
// kafkaReader is nil). ctx is currently unused. The returned error wraps the
// reader's Close error so callers can inspect it with errors.Is / errors.As.
func (k *kafkaSync) Stop(ctx context.Context) error {
	// Cancel first so the consume loop sees the cancellation.
	if k.cancelFunc != nil {
		k.cancelFunc()
	}
	// Close the Kafka consumer.
	if k.kafkaReader != nil {
		if err := k.kafkaReader.Close(); err != nil {
			return fmt.Errorf("close kafka reader failed, err: %w", err)
		}
	}
	return nil
}
// Start creates the Kafka consumer for UserSyncTopic and launches the consume
// loop in a background goroutine.
//
// The SASL mechanism is chosen from kafkaConfig.SaslMechanism
// (case-insensitive): "scram-sha256", "scram-sha512" or "plain"; an empty
// value defaults to plain. The loop runs on a context derived from ctx, so it
// ends when ctx is done or when Stop is called.
//
// Start returns an error when the configuration is nil, the SASL mechanism is
// unsupported or cannot be constructed, or the reader configuration is
// invalid.
func (k *kafkaSync) Start(ctx context.Context) error {
	cfg := k.kafkaConfig
	if cfg == nil {
		return errors.New("kafka config is nil")
	}

	// Initialise the SASL mechanism.
	var (
		mechanism sasl.Mechanism
		mechErr   error
	)
	switch strings.ToLower(cfg.SaslMechanism) {
	case "scram-sha256":
		mechanism, mechErr = scram.Mechanism(scram.SHA256, cfg.SaslUser, cfg.SaslPassword)
	case "scram-sha512":
		mechanism, mechErr = scram.Mechanism(scram.SHA512, cfg.SaslUser, cfg.SaslPassword)
	case "plain", "": // plain is the default
		mechanism = plain.Mechanism{
			Username: cfg.SaslUser,
			Password: cfg.SaslPassword,
		}
	default:
		return fmt.Errorf("unsupported sasl mechanism: %s", cfg.SaslMechanism)
	}
	// A failed SCRAM setup would otherwise leave mechanism nil and the client
	// would silently connect without authentication.
	if mechErr != nil {
		return fmt.Errorf("init sasl mechanism %q: %w", cfg.SaslMechanism, mechErr)
	}

	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
	}

	readerConfig := kafka.ReaderConfig{
		Brokers:               cfg.Brokers,
		GroupID:               cfg.GroupID,
		Topic:                 UserSyncTopic,          // subscribed topic
		MinBytes:              10,                     // minimum fetch size in bytes
		MaxBytes:              10e6,                   // maximum fetch size in bytes
		MaxWait:               10 * time.Second,       // maximum wait per fetch
		CommitInterval:        100 * time.Millisecond, // offset commit interval
		StartOffset:           kafka.LastOffset,       // start from the latest offset
		WatchPartitionChanges: true,                   // watch for partition changes
		Dialer:                dialer,                 // custom dialer carrying the SASL mechanism
	}
	// kafka.NewReader panics on an invalid configuration; validate first so
	// the caller receives an error instead.
	if err := readerConfig.Validate(); err != nil {
		return fmt.Errorf("invalid kafka reader config: %w", err)
	}

	// Create the Kafka consumer.
	k.kafkaReader = kafka.NewReader(readerConfig)

	sCtx, cancelFunc := context.WithCancel(ctx)
	k.cancelFunc = cancelFunc

	// Start the consume loop.
	go k.startLoop(sCtx)
	return nil
}
// SetCallUserListener registers f as the user event handler. f is invoked by
// callEventHandler for user events whose PlatformCode matches this client's
// platform code; when no handler is registered such events are skipped.
func (k *kafkaSync) SetCallUserListener(f func(context.Context, *UserEvent) error) {
k.eventHandler = f
}
// callEventHandler decodes message as a UserEvent and passes it to the
// registered event handler.
//
// It returns nil without doing anything when no handler is registered, and
// nil after decoding when the event's PlatformCode differs from this client's
// platform code. A decode failure is returned wrapped (errors.Is / errors.As
// reach the underlying JSON error); otherwise the handler's result is
// returned unchanged.
func (k *kafkaSync) callEventHandler(ctx context.Context, message *kafka.Message) error {
	if k.eventHandler == nil {
		return nil
	}

	var userEvent UserEvent
	if err := json.Unmarshal(message.Value, &userEvent); err != nil {
		return fmt.Errorf("unmarshal user event failed topic: %s message: %s err: %w", message.Topic, message.Value, err)
	}

	// Ignore events that belong to another platform.
	if userEvent.PlatformCode != k.platformCode {
		return nil
	}

	k.logger.Printf("received event topic: %s message: %s", message.Topic, message.Value)
	return k.eventHandler(ctx, &userEvent)
}
// SetErrorListener registers f as the error handler invoked through
// callErrorListener; when no handler is registered, reported errors are
// dropped.
func (k *kafkaSync) SetErrorListener(f func(context.Context, error) error) {
k.errHandler = f
}
// callErrorListener forwards err to the registered error handler and returns
// the handler's result. It returns nil when no handler is registered.
func (k *kafkaSync) callErrorListener(ctx context.Context, err error) error {
	if handler := k.errHandler; handler != nil {
		return handler(ctx, err)
	}
	return nil
}
// startLoop consumes messages from the Kafka reader until ctx is done.
//
// For every fetched message it decodes a BaseEvent, dispatches messages whose
// event type starts with "user-" to callEventHandler, and then commits the
// offset. A message is left uncommitted when the error listener or the event
// handler returns an error. A message that fails to decode and whose error
// the listener accepts (or that has no listener) continues through the normal
// path with whatever fields were decoded.
func (k *kafkaSync) startLoop(ctx context.Context) {
	// Pause between failed fetches so a persistent error cannot busy-spin.
	const fetchRetryDelay = time.Second

	k.logger.Printf("kafka reader start loop")
	defer k.logger.Printf("kafka reader stop")

	for {
		// FetchMessage blocks until a message is available and also observes
		// ctx, so no separate cancellation check is needed before the call.
		message, err := k.kafkaReader.FetchMessage(ctx)
		if err != nil {
			// Stop once the context is finished for any reason: checking
			// only context.Canceled would loop forever after a deadline.
			if ctx.Err() != nil || errors.Is(err, context.Canceled) {
				return
			}
			// Any other error is logged and the fetch is retried after a pause.
			k.logger.Printf("kafka reader fetch message failed, err: %v", err)
			select {
			case <-ctx.Done():
				return
			case <-time.After(fetchRetryDelay):
			}
			continue
		}

		// Decode the event envelope.
		var event BaseEvent
		if parseErr := json.Unmarshal(message.Value, &event); parseErr != nil {
			if listenerErr := k.callErrorListener(ctx, parseErr); listenerErr != nil {
				k.logger.Printf("call error listener failed, err: %v", listenerErr)
				continue
			}
		}

		// Only user events are dispatched to the event handler.
		if strings.HasPrefix(event.EventType, "user-") {
			if handleErr := k.callEventHandler(ctx, &message); handleErr != nil {
				k.logger.Printf("call event handler failed, err: %v", handleErr)
				continue
			}
		}

		// Commit the offset.
		if commitErr := k.kafkaReader.CommitMessages(ctx, message); commitErr != nil {
			k.logger.Printf("commit message failed, err: %v", commitErr)
		}
	}
}