feat: 完成批量插入的SQL生成功能

This commit is contained in:
周杰 2024-09-14 13:12:52 +08:00
parent adde90af93
commit 41c2b5d211
10 changed files with 790 additions and 1 deletions

1
.gitignore vendored
View File

@ -21,3 +21,4 @@
# Go workspace file
go.work
.idea

View File

@ -1,3 +1,76 @@
# tdengine-mapper-go
struct 与 TDengine 表进行映射相关操作
> Golang 的 struct 与 TDengine 表进行映射
### struct 标签
```
db : 表的列名, 值为 "" 或 "-" 表示忽略
taos : taos的标签目前只有 [tag], 表示当前是 超级表的 TAG 字段
```
### struct 接口
```go
type SuperTableNamer interface {
SuperTableName() string // 获取超级表的表名 可选
}
type TableNamer interface {
TableName() string // 当前表的表名
}
```
`TableNamer` 是必须要实现的接口
`SuperTableName` 是可选实现的,如果没有 实现,就会当普通表进行处理
### 示例
```go
type User struct {
Name string `db:"name" taos:"tag"`
Age int `db:"age"`
}
func (u *User) TableName() string {
return "user_" + u.Name
}
func (u *User) SuperTableName() string {
return "super_user"
}
func TestSimpleInsert(t *testing.T) {
tdMapping := NewTdMapping()
data := []any{
&User{Name: "张三", Age: 18},
&User{Name: "李四", Age: 20},
}
insertSql, err := tdMapping.ToInsertSQL(data...)
if err != nil {
t.Fatal(err)
}
fmt.Println(insertSql)
}
------------------------------------------------------
INSERT INTO
`user_张三` USING `super_user` (`name`) TAGS ('张三') (`age`) VALUES (18)
`user_李四` USING `super_user` (`name`) TAGS ('李四') (`age`) VALUES (20)
```

3
go.mod Normal file
View File

@ -0,0 +1,3 @@
module manabox.cn/tdengine-mapper
go 1.18

178
insert_build.go Normal file
View File

@ -0,0 +1,178 @@
package td_builder
import (
"database/sql"
"fmt"
"reflect"
"strings"
"time"
)
// buildInsertStatementForSuperTable 构建超级表插入语句
// @param rows 同一超级表下的数据行
// @return 插入语句
func buildInsertStatementForSuperTable(rows ...*TableRowMateria) (string, error) {
if len(rows) == 0 {
return "", fmt.Errorf("no rows provided for super table insert")
}
var sb strings.Builder
// 格式化标签列和值
formattedTagColumns := formatColumns(rows[0].TagColumns...)
formattedTagValues := formatRowValues(rows[0].TagValues...)
formattedColumns := formatColumns(rows[0].Columns...)
formattedSuperTableName := formatIdentifier(rows[0].SuperTableName)
formattedTableName := formatIdentifier(rows[0].TableName)
/*
insert into `dev_001`
using `super_dev` (`dev_id`,`dev_type`) tags ('A001','电表')
(`ts`,`aV`,`bV`,`aI`,`bI`) values
('2024-09-01T00:00:00+08:00','220','221','20.0','20.1'),
('2024-09-02T00:00:00+08:00','220','221','20.0','20.1')
*/
// 构建插入语句头部
sb.WriteString(fmt.Sprintf("%s USING %s %s TAGS %s %s VALUES ",
formattedTableName, formattedSuperTableName,
formattedTagColumns,
formattedTagValues,
formattedColumns,
))
// 添加每行的值
for i, row := range rows {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(formatRowValues(row.Values...))
}
return sb.String(), nil
}
// buildInsertStatementForNormalTable 普通表插入构建
// @param data 数据 当前应该是同一个表的
// @return 插入语句 表名1 (列名) VALUES (),()
func buildInsertStatementForNormalTable(rows ...*TableRowMateria) (string, error) {
if len(rows) == 0 {
return "", fmt.Errorf("no data provided for normal table insert")
}
var sb strings.Builder
formattedColumns := formatColumns(append(rows[0].TagColumns, rows[0].Columns...)...)
formattedTableName := formatIdentifier(rows[0].TableName)
/*
insert into `dev_001`
(`ts`,`aV`,`bV`,`aI`,`bI`) values
('2024-09-01T00:00:00+08:00','220','221','20.0','20.1'),
('2024-09-02T00:00:00+08:00','220','221','20.0','20.1')
*/
// 构建插入语句头部
sb.WriteString(fmt.Sprintf("%s %s VALUES ", formattedTableName, formattedColumns))
// 添加每行的值
for i, row := range rows {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(formatRowValues(append(row.TagValues, row.Values...)...))
}
return sb.String(), nil
}
// formatRowValues 格式化单行的值并返回格式化后的字符串
// @param values 单行的值
// @return 格式化后的字符串 ('a',12.3,true,)
func formatRowValues(values ...any) string {
formattedValues := make([]string, len(values))
for i, val := range values {
switch v := val.(type) {
case sql.NullBool:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Bool)
} else {
formattedValues[i] = "null"
}
case sql.NullByte:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Byte)
} else {
formattedValues[i] = "null"
}
case sql.NullFloat64:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Float64)
} else {
formattedValues[i] = "null"
}
case sql.NullInt64:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Int64)
} else {
formattedValues[i] = "null"
}
case sql.NullInt32:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Int32)
} else {
formattedValues[i] = "null"
}
case sql.NullInt16:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Int16)
} else {
formattedValues[i] = "null"
}
case sql.NullString:
if v.Valid {
formattedValues[i] = fmt.Sprintf("'%s'", v.String)
} else {
formattedValues[i] = "null"
}
case sql.NullTime:
if v.Valid {
formattedValues[i] = fmt.Sprintf("'%s'", v.Time.Format(time.RFC3339))
} else {
formattedValues[i] = "null"
}
case string:
formattedValues[i] = fmt.Sprintf("'%s'", v)
case rune: // rune 转换为 string
formattedValues[i] = fmt.Sprintf("'%s'", string(v))
case nil:
formattedValues[i] = "null"
case time.Time:
formattedValues[i] = fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05"))
default:
if reflect.TypeOf(val).Kind() == reflect.Ptr && reflect.ValueOf(val).IsNil() {
formattedValues[i] = "null"
continue
}
formattedValues[i] = fmt.Sprintf("%v", val)
}
}
return "(" + strings.Join(formattedValues, ",") + ")"
}
// formatIdentifier 为表名或超级表名添加反引号
// @param identifier 表名或超级表名
// @return 格式化后的字符串 `tableName`
func formatIdentifier(identifier string) string {
return fmt.Sprintf("`%s`", identifier)
}
// formatColumns 为列名添加反引号
// @param columns 列名
func formatColumns(columns ...string) string {
formatted := make([]string, len(columns))
for i, col := range columns {
formatted[i] = fmt.Sprintf("`%s`", col)
}
return "(" + strings.Join(formatted, ",") + ")"
}

174
mapping.go Normal file
View File

@ -0,0 +1,174 @@
package td_builder
import (
"fmt"
"manabox.cn/tdengine-mapper/syncmap"
"reflect"
"strings"
)
func NewTdMapping() *TdMapping {
return &TdMapping{}
}
// TableNamer 定义了一个获取表名的方法。
type TableNamer interface {
TableName() string
}
type TdMapping struct {
modelMates syncmap.Map[string, *StructMate]
}
func (b *TdMapping) scanStruct(data ...any) error {
for _, datum := range data {
if reflect.TypeOf(datum).Kind() != reflect.Ptr {
return fmt.Errorf("需要指针类型:%v", reflect.TypeOf(datum))
}
if mate, err := scan(datum); err != nil {
return err
} else {
b.modelMates.Store(mate.UniqueTypeName, mate)
}
}
return nil
}
// 提取 struct 内的信息
// @param data 包含 struct 类型的数据
// @return 返回一个 map[表名][]*TableRowMateria 如果超级表名为空,则表示普通表
func (b *TdMapping) extractStructData(data ...any) (map[string][]*TableRowMateria, error) {
result := make(map[string][]*TableRowMateria)
for _, item := range data {
tf, vf := getReflectTypeAndValue(item)
uniqueTypeName := getUniqueTypeName(tf)
// 获取表名
var tableName string
{
if vf.CanAddr() {
if a, ok := vf.Addr().Interface().(TableNamer); ok {
tableName = a.TableName()
}
}
if tableName == "" {
if a, ok := item.(TableNamer); ok {
tableName = a.TableName()
}
}
if tableName == "" {
return nil, fmt.Errorf("not import TableName() string func for struct type: %s", uniqueTypeName)
}
}
mate, ok := b.modelMates.Load(uniqueTypeName)
if !ok {
return nil, fmt.Errorf("not found struct type: %s", uniqueTypeName)
}
materia := &TableRowMateria{
SuperTableName: mate.SuperTableName,
TableName: tableName,
TagColumns: make([]string, 0, len(mate.TaggedFieldNames)),
TagValues: make([]any, 0, len(mate.TaggedFieldNames)),
Columns: make([]string, 0, len(mate.DBAnnotatedNames)),
Values: make([]any, 0, len(mate.DBAnnotatedNames)),
}
for _, name := range mate.DBAnnotatedNames {
// db tag 名称 -- 数据库列名
dbColumn := mate.FiledDBNameCache[name]
field := vf.FieldByIndex(mate.FieldIndexCache[name])
for field.Kind() == reflect.Ptr {
if field.IsNil() {
break
}
field = field.Elem()
}
// 字段值
dbValue := field.Interface()
materia.Columns = append(materia.Columns, dbColumn)
materia.Values = append(materia.Values, dbValue)
}
for _, name := range mate.TaggedFieldNames {
// db tag 名称 -- 数据库列名
tagColumn := mate.FiledDBNameCache[name]
field := vf.FieldByIndex(mate.FieldIndexCache[name])
for field.Kind() == reflect.Ptr {
if field.IsNil() {
break
}
field = field.Elem()
}
// 字段值
tagValue := field.Interface()
materia.TagColumns = append(materia.TagColumns, tagColumn)
materia.TagValues = append(materia.TagValues, tagValue)
}
// 添加到结果
key := materia.TableName
result[key] = append(result[key], materia)
}
return result, nil
}
func (b *TdMapping) ToInsertSQL(data ...any) (string, error) {
// 扫描 struct 类型数据
if err := b.scanStruct(data...); err != nil {
return "", fmt.Errorf("failed to scan struct: %w", err)
}
// 提取 struct 内的信息
tableMap, err := b.extractStructData(data...)
if err != nil {
return "", fmt.Errorf("failed to extract struct data: %w", err)
}
if len(tableMap) == 0 {
return "", fmt.Errorf("no data to insert")
}
// 构建 insert 语句
/*
INSERT INTO
表名1 USING 超级表名1 (tag列名) TAGS(tag值) (列名) VALUES (),()
表名2 USING 超级表名2 (tag列名)TAGS(tag值) (列名) VALUES (),()
*/
var buf strings.Builder
buf.WriteString("INSERT INTO \n")
for _, materials := range tableMap {
if len(materials) == 0 {
continue
}
var (
rowSql string
err error
)
if materials[0].SuperTableName == "" {
// 表名1 (列名) VALUES (),()
rowSql, err = buildInsertStatementForNormalTable(materials...)
} else {
// 表名1 USING 超级表名1 (tag列名) TAGS(tag值) (列名) VALUES (),()
rowSql, err = buildInsertStatementForSuperTable(materials...)
}
if err != nil {
return "", fmt.Errorf("failed to build insert statement: %w", err)
}
buf.WriteString(rowSql)
buf.WriteString(" \n")
}
return buf.String(), nil
}

126
mapping_test.go Normal file
View File

@ -0,0 +1,126 @@
package td_builder
import (
"database/sql"
"fmt"
"testing"
"time"
)
type TaosTAG struct {
DevId string `db:"dev_id" taos:"tag"`
DevType string `db:"dev_type" taos:"tag"`
DataType string `db:"data_type" taos:"tag"`
Alias string `db:"alias"`
}
type TaosDevice struct {
*TaosTAG
Ts time.Time `db:"ts"` // 时间戳
Ts2 *time.Time `db:"ts2"` // 时间戳
LoadUnitId string `db:"load_unit_id" taos:"tag"` // 负载单体 id
NullInt *int `db:"null_int"`
DefString string `db:"def_string"`
PInt *int `db:"p_int"`
NullInt64 sql.NullInt64 `db:"null_int64"`
NullInt32 sql.NullInt32 `db:"null_int32"`
}
func (s *TaosDevice) SuperTableName() string {
return "super_device"
}
func (s *TaosDevice) TableName() string {
return "device_" + s.DevId
}
type TaosUser struct {
*TaosTAG
Name string `db:"name" taos:"tag"`
Ts time.Time `db:"ts"` // 时间戳
// 体重
Weight int `db:"weight"`
}
func (s *TaosUser) SuperTableName() string {
return "super_user"
}
func (s *TaosUser) TableName() string {
return "user_" + s.DevId
}
func TestBuilderInsert(t *testing.T) {
tdMapping := NewTdMapping()
p := 1
data := []any{
&TaosDevice{
TaosTAG: &TaosTAG{
DevId: "设备ID",
DevType: "测试设备",
DataType: "测试数据",
},
Ts: time.Now(),
LoadUnitId: "负载单体ID",
PInt: &p,
NullInt32: sql.NullInt32{Int32: 32, Valid: true},
},
&TaosUser{
TaosTAG: &TaosTAG{
DevId: "User001",
DevType: "User类型",
DataType: "User数据类型001",
},
Ts: time.Now(),
Name: "张三",
}, &TaosUser{
TaosTAG: &TaosTAG{
DevId: "User002",
DevType: "User类型",
DataType: "User数据类型002",
},
Ts: time.Now(),
Name: "李四",
Weight: 110,
}, &TaosUser{
TaosTAG: &TaosTAG{
DevId: "User002",
DevType: "User类型",
DataType: "User数据类型002",
},
Name: "李四",
Ts: time.Now(),
Weight: 100,
},
}
insertSql, err := tdMapping.ToInsertSQL(data...)
if err != nil {
t.Fatal(err)
}
fmt.Println(insertSql)
}
type User struct {
Name string `db:"name" taos:"tag"`
Age int `db:"age"`
}
func (u *User) TableName() string {
return "user_" + u.Name
}
func (u *User) SuperTableName() string {
return "super_user"
}
func TestSimpleInsert(t *testing.T) {
tdMapping := NewTdMapping()
data := []any{
&User{Name: "张三", Age: 18},
&User{Name: "李四", Age: 20},
}
insertSql, err := tdMapping.ToInsertSQL(data...)
if err != nil {
t.Fatal(err)
}
fmt.Println(insertSql)
}

156
scan.go Normal file
View File

@ -0,0 +1,156 @@
package td_builder
import (
"fmt"
"reflect"
)
func scan(data interface{}) (*StructMate, error) {
// 获取结构体的类型
t, v := getReflectTypeAndValue(data)
// 确保是结构体类型
if t.Kind() != reflect.Struct {
return nil, fmt.Errorf("input data is not a struct")
}
// 获取包路径和类型名称
uniqueTypeName := getUniqueTypeName(t)
// 初始化结果结构体
sr := StructMate{
UniqueTypeName: uniqueTypeName,
FieldIndexCache: make(map[string][]int, t.NumField()),
FiledDBNameCache: make(map[string]string, t.NumField()),
DBAnnotatedNames: make([]string, 0, t.NumField()),
TaggedFieldNames: make([]string, 0, t.NumField()),
SuperTableName: "",
}
//timeType := reflect.TypeOf(time.Time{})
// 遍历结构体的字段
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
fieldValue := v.Field(i)
if field.Anonymous {
if field.Type.Kind() == reflect.Ptr {
if fieldValue.IsNil() {
// 如果指针是 nil创建一个该类型的零值实例
zeroValue := reflect.Zero(field.Type.Elem())
fieldValue = zeroValue
} else {
fieldValue = fieldValue.Elem()
}
}
if !fieldValue.CanInterface() {
continue
}
subResult, err := scan(fieldValue.Interface())
if err != nil {
return nil, err
}
for k, v := range subResult.FieldIndexCache {
sr.FieldIndexCache[k] = append(sr.FieldIndexCache[k], i)
sr.FieldIndexCache[k] = append(sr.FieldIndexCache[k], v...)
}
for k, v := range subResult.FiledDBNameCache {
sr.FiledDBNameCache[k] = v
}
sr.DBAnnotatedNames = append(sr.DBAnnotatedNames, subResult.DBAnnotatedNames...)
sr.TaggedFieldNames = append(sr.TaggedFieldNames, subResult.TaggedFieldNames...)
continue
}
//// 如果字段是结构体或结构体指针,递归处理
//if (field.Type.Kind() == reflect.Struct || field.Type.Kind() == reflect.Ptr) && field.Type != timeType {
//
// if field.Type.Kind() == reflect.Ptr {
// if fieldValue.IsNil() {
// // 如果指针是 nil创建一个该类型的零值实例
// zeroValue := reflect.Zero(field.Type.Elem())
// fieldValue = zeroValue
// } else {
// fieldValue = fieldValue.Elem()
// }
// }
//
// if !fieldValue.CanInterface() {
// continue
// }
//
// subResult, err := scan(fieldValue.Interface())
// if err != nil {
// return nil, err
// }
// sr.merge(subResult)
// continue
//}
// 检查字段是否有db注解
columnName := field.Tag.Get("db")
if columnName == "-" || columnName == "" {
continue
}
sr.FieldIndexCache[field.Name] = []int{i}
sr.FiledDBNameCache[field.Name] = columnName
// 检查字段是否有taos注解
if field.Tag.Get("taos") == "tag" {
sr.TaggedFieldNames = append(sr.TaggedFieldNames, field.Name)
} else {
sr.DBAnnotatedNames = append(sr.DBAnnotatedNames, field.Name)
}
}
sr.SuperTableName = callSuperTableName(data)
return &sr, nil
}
// 获取包路径和类型名称
func getUniqueTypeName(t reflect.Type) string {
pkgPath := t.PkgPath()
typeName := t.Name()
uniqueTypeName := fmt.Sprintf("%s.%s", pkgPath, typeName)
return uniqueTypeName
}
func getReflectTypeAndValue(data interface{}) (reflect.Type, reflect.Value) {
t := reflect.TypeOf(data)
v := reflect.ValueOf(data)
// 处理结构体和结构体指针
for t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
return t, v
}
func callSuperTableName(obj any) string {
v := reflect.ValueOf(obj)
if v.Kind() == reflect.Struct {
return ""
}
// 检查是否可以调用 SuperTableName 方法
superTableNameMethod := v.MethodByName("SuperTableName")
if !superTableNameMethod.IsValid() {
return ""
}
// 调用 SuperTableName 方法
results := superTableNameMethod.Call(nil)
if len(results) == 1 && results[0].Kind() == reflect.String {
return results[0].String()
}
return ""
}

40
syncmap/map.go Normal file
View File

@ -0,0 +1,40 @@
package syncmap
import (
"sync"
)
type Map[T any, V any] struct {
sMap sync.Map
}
func (m *Map[T, V]) Store(key T, value V) {
m.sMap.Store(key, value)
}
func (m *Map[T, V]) Load(key T) (value V, ok bool) {
v, ok := m.sMap.Load(key)
if ok {
return v.(V), ok
}
return
}
func (m *Map[T, V]) Delete(key T) {
m.sMap.Delete(key)
}
func (m *Map[T, V]) Range(f func(T, V) bool) {
m.sMap.Range(func(key, value any) bool {
return f(key.(T), value.(V))
})
}
func (m *Map[T, V]) LoadOrStore(key T, value V) (actual V, loaded bool) {
_actual, loaded := m.sMap.LoadOrStore(key, value)
if loaded {
actual = _actual.(V)
}
return
}

14
syncmap/map_test.go Normal file
View File

@ -0,0 +1,14 @@
package syncmap
import (
"log"
"testing"
)
func TestAA(t *testing.T) {
var m Map[string, int]
m.Store("a", 1234)
value, _ := m.Load("a")
log.Println("value:", value)
}

24
types.go Normal file
View File

@ -0,0 +1,24 @@
package td_builder
type TableRowMateria struct {
SuperTableName string
TableName string
TagColumns []string // tag列名
TagValues []any // tag值
Columns []string // 列名
Values []any // 值
}
// StructMate 静态化的结构体信息
type StructMate struct {
UniqueTypeName string // 结构体的唯一标识符
FieldIndexCache map[string][]int // 字段名到索引的映射缓存
FiledDBNameCache map[string]string // 字段名到 db 注解的名称的映射缓存
DBAnnotatedNames []string // 包含 db 注解的 属性的名称
TaggedFieldNames []string // 包含的 tag 注解的 属性的名称
SuperTableName string // 超级表名
}