tdengine-mapper-go/insert_build.go

179 lines
4.8 KiB
Go
Raw Normal View History

2024-09-14 15:57:03 +08:00
package tdmap
import (
"database/sql"
"fmt"
"reflect"
"strings"
"time"
)
// buildInsertStatementForSuperTable 构建超级表插入语句
// @param rows 同一超级表下的数据行
// @return 插入语句
func buildInsertStatementForSuperTable(rows ...*TableRowMateria) (string, error) {
if len(rows) == 0 {
return "", fmt.Errorf("no rows provided for super table insert")
}
var sb strings.Builder
// 格式化标签列和值
formattedTagColumns := formatColumns(rows[0].TagColumns...)
formattedTagValues := formatRowValues(rows[0].TagValues...)
formattedColumns := formatColumns(rows[0].Columns...)
formattedSuperTableName := formatIdentifier(rows[0].SuperTableName)
formattedTableName := formatIdentifier(rows[0].TableName)
/*
insert into `dev_001`
using `super_dev` (`dev_id`,`dev_type`) tags ('A001','电表')
(`ts`,`aV`,`bV`,`aI`,`bI`) values
('2024-09-01T00:00:00+08:00','220','221','20.0','20.1'),
('2024-09-02T00:00:00+08:00','220','221','20.0','20.1')
*/
// 构建插入语句头部
sb.WriteString(fmt.Sprintf("%s USING %s %s TAGS %s %s VALUES ",
formattedTableName, formattedSuperTableName,
formattedTagColumns,
formattedTagValues,
formattedColumns,
))
// 添加每行的值
for i, row := range rows {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(formatRowValues(row.Values...))
}
return sb.String(), nil
}
// buildInsertStatementForNormalTable 普通表插入构建
// @param data 数据 当前应该是同一个表的
// @return 插入语句 表名1 (列名) VALUES (),()
func buildInsertStatementForNormalTable(rows ...*TableRowMateria) (string, error) {
if len(rows) == 0 {
return "", fmt.Errorf("no data provided for normal table insert")
}
var sb strings.Builder
formattedColumns := formatColumns(append(rows[0].TagColumns, rows[0].Columns...)...)
formattedTableName := formatIdentifier(rows[0].TableName)
/*
insert into `dev_001`
(`ts`,`aV`,`bV`,`aI`,`bI`) values
('2024-09-01T00:00:00+08:00','220','221','20.0','20.1'),
('2024-09-02T00:00:00+08:00','220','221','20.0','20.1')
*/
// 构建插入语句头部
sb.WriteString(fmt.Sprintf("%s %s VALUES ", formattedTableName, formattedColumns))
// 添加每行的值
for i, row := range rows {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(formatRowValues(append(row.TagValues, row.Values...)...))
}
return sb.String(), nil
}
// formatRowValues 格式化单行的值并返回格式化后的字符串
// @param values 单行的值
// @return 格式化后的字符串 ('a',12.3,true,)
func formatRowValues(values ...any) string {
formattedValues := make([]string, len(values))
for i, val := range values {
switch v := val.(type) {
case sql.NullBool:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Bool)
} else {
formattedValues[i] = "null"
}
case sql.NullByte:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Byte)
} else {
formattedValues[i] = "null"
}
case sql.NullFloat64:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Float64)
} else {
formattedValues[i] = "null"
}
case sql.NullInt64:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Int64)
} else {
formattedValues[i] = "null"
}
case sql.NullInt32:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Int32)
} else {
formattedValues[i] = "null"
}
case sql.NullInt16:
if v.Valid {
formattedValues[i] = fmt.Sprintf("%v", v.Int16)
} else {
formattedValues[i] = "null"
}
case sql.NullString:
if v.Valid {
formattedValues[i] = fmt.Sprintf("'%s'", v.String)
} else {
formattedValues[i] = "null"
}
case sql.NullTime:
if v.Valid {
formattedValues[i] = fmt.Sprintf("'%s'", v.Time.Format(time.RFC3339))
} else {
formattedValues[i] = "null"
}
case string:
formattedValues[i] = fmt.Sprintf("'%s'", v)
case rune: // rune 转换为 string
formattedValues[i] = fmt.Sprintf("'%s'", string(v))
case nil:
formattedValues[i] = "null"
case time.Time:
formattedValues[i] = fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05"))
default:
if reflect.TypeOf(val).Kind() == reflect.Ptr && reflect.ValueOf(val).IsNil() {
formattedValues[i] = "null"
continue
}
formattedValues[i] = fmt.Sprintf("%v", val)
}
}
return "(" + strings.Join(formattedValues, ",") + ")"
}
// formatIdentifier 为表名或超级表名添加反引号
// @param identifier 表名或超级表名
// @return 格式化后的字符串 `tableName`
func formatIdentifier(identifier string) string {
return fmt.Sprintf("`%s`", identifier)
}
// formatColumns 为列名添加反引号
// @param columns 列名
func formatColumns(columns ...string) string {
formatted := make([]string, len(columns))
for i, col := range columns {
formatted[i] = fmt.Sprintf("`%s`", col)
}
return "(" + strings.Join(formatted, ",") + ")"
}