package user_core_client import ( "context" "encoding/json" "log" "os" "os/signal" "syscall" "testing" ) func init() { log.SetFlags(log.LstdFlags | log.Lshortfile) } func TestUserSyncClient(t *testing.T) { client := NewUserCentKafkaSync("user-core", &KafkaConfig{ Brokers: []string{"101.133.147.225:9092"}, GroupID: "test-group", SaslUser: "admin", SaslPassword: "admin123", SaslMechanism: "plain", }, OptKafkaLogger(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile))) ctx := context.Background() if err := client.Start(ctx); err != nil { t.Fatalf("start user sync client failed,err:%v", err) } defer func() { log.Printf("stop user sync client ...") if err := client.Stop(ctx); err != nil { log.Printf("stop user sync client failed,err:%v", err) } else { log.Printf("stop user sync client success") } }() client.SetCallUserListener(func(ctx context.Context, event *UserEvent) error { marshal, _ := json.Marshal(event) log.Printf("user event: %s", marshal) return nil }) // 优雅的关闭 监听 ctrl + c ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP, syscall.SIGUSR1) <-ch }