diff --git a/center/cconf/plugin.go b/center/cconf/plugin.go index e3bdc4ad..4f29120e 100644 --- a/center/cconf/plugin.go +++ b/center/cconf/plugin.go @@ -43,4 +43,16 @@ var Plugins = []Plugin{ Type: "pgsql", TypeName: "PostgreSQL", }, + { + Id: 8, + Category: "logging", + Type: "doris", + TypeName: "Doris", + }, + { + Id: 9, + Category: "logging", + Type: "opensearch", + TypeName: "OpenSearch", + }, } diff --git a/center/router/router.go b/center/router/router.go index 02cfdba9..11409e24 100644 --- a/center/router/router.go +++ b/center/router/router.go @@ -232,6 +232,11 @@ func (rt *Router) Config(r *gin.Engine) { pages.POST("/log-query", rt.QueryLog) } + // OpenSearch 专用接口 + pages.POST("/os-indices", rt.QueryOSIndices) + pages.POST("/os-variable", rt.QueryOSVariable) + pages.POST("/os-fields", rt.QueryOSFields) + pages.GET("/sql-template", rt.QuerySqlTemplate) pages.POST("/auth/login", rt.jwtMock(), rt.loginPost) pages.POST("/auth/logout", rt.jwtMock(), rt.auth(), rt.user(), rt.logoutPost) diff --git a/center/router/router_datasource.go b/center/router/router_datasource.go index fcd9e628..0eec1ac8 100644 --- a/center/router/router_datasource.go +++ b/center/router/router_datasource.go @@ -2,12 +2,14 @@ package router import ( "crypto/tls" + "encoding/json" "fmt" "io" "net/http" "net/url" "strings" + "github.com/ccfos/nightingale/v6/datasource/opensearch" "github.com/ccfos/nightingale/v6/models" "github.com/gin-gonic/gin" @@ -108,6 +110,48 @@ func (rt *Router) datasourceUpsert(c *gin.Context) { } } + for k, v := range req.SettingsJson { + if strings.Contains(k, "cluster_name") { + req.ClusterName = v.(string) + break + } + } + + if req.PluginType == models.OPENSEARCH { + b, err := json.Marshal(req.SettingsJson) + if err != nil { + logger.Warningf("marshal settings fail: %v", err) + return + } + + var os opensearch.OpenSearch + err = json.Unmarshal(b, &os) + if err != nil { + logger.Warningf("unmarshal settings fail: %v", err) + return + } + + if len(os.Nodes) == 0 { + logger.Warningf("nodes empty, %+v", req) + return + } + + req.HTTPJson = models.HTTP{ + Timeout: os.Timeout, + Url: os.Nodes[0], + Headers: os.Headers, + TLS: models.TLS{ + SkipTlsVerify: os.TLS.SkipTlsVerify, + }, + } + + req.AuthJson = models.Auth{ + BasicAuth: os.Basic.Enable, + BasicAuthUser: os.Basic.Username, + BasicAuthPassword: os.Basic.Password, + } + } + if req.Id == 0 { req.CreatedBy = username req.Status = "enabled" diff --git a/center/router/router_opensearch.go b/center/router/router_opensearch.go new file mode 100644 index 00000000..c2e3e724 --- /dev/null +++ b/center/router/router_opensearch.go @@ -0,0 +1,58 @@ +package router + +import ( + "github.com/ccfos/nightingale/v6/datasource/opensearch" + "github.com/ccfos/nightingale/v6/dscache" + + "github.com/gin-gonic/gin" + "github.com/toolkits/pkg/ginx" + "github.com/toolkits/pkg/logger" +) + +func (rt *Router) QueryOSIndices(c *gin.Context) { + var f IndexReq + ginx.BindJSON(c, &f) + + plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId) + if !exists { + logger.Warningf("cluster:%d not exists", f.DatasourceId) + ginx.Bomb(200, "cluster not exists") + } + + indices, err := plug.(*opensearch.OpenSearch).QueryIndices() + ginx.Dangerous(err) + + ginx.NewRender(c).Data(indices, nil) +} + +func (rt *Router) QueryOSFields(c *gin.Context) { + var f IndexReq + ginx.BindJSON(c, &f) + + plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId) + if !exists { + logger.Warningf("cluster:%d not exists", f.DatasourceId) + ginx.Bomb(200, "cluster not exists") + } + + fields, err := plug.(*opensearch.OpenSearch).QueryFields([]string{f.Index}) + ginx.Dangerous(err) + + ginx.NewRender(c).Data(fields, nil) +} + +func (rt *Router) QueryOSVariable(c *gin.Context) { + var f FieldValueReq + ginx.BindJSON(c, &f) + + plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId) + if !exists { + logger.Warningf("cluster:%d not exists", f.DatasourceId) + ginx.Bomb(200, "cluster not exists") + } + + fields, err := plug.(*opensearch.OpenSearch).QueryFieldValue([]string{f.Index}, f.Query.Field, f.Query.Query) + ginx.Dangerous(err) + + ginx.NewRender(c).Data(fields, nil) +} diff --git a/datasource/doris/doris.go b/datasource/doris/doris.go new file mode 100644 index 00000000..c4428600 --- /dev/null +++ b/datasource/doris/doris.go @@ -0,0 +1,199 @@ +package doris + +import ( + "context" + "fmt" + "strings" + + "github.com/ccfos/nightingale/v6/datasource" + "github.com/ccfos/nightingale/v6/dskit/doris" + "github.com/ccfos/nightingale/v6/dskit/types" + "github.com/ccfos/nightingale/v6/models" + + "github.com/mitchellh/mapstructure" + "github.com/toolkits/pkg/logger" +) + +const ( + DorisType = "doris" +) + +func init() { + datasource.RegisterDatasource(DorisType, new(Doris)) +} + +type Doris struct { + doris.Doris `json:",inline" mapstructure:",squash"` +} + +type QueryParam struct { + Ref string `json:"ref" mapstructure:"ref"` + Database string `json:"database" mapstructure:"database"` + Table string `json:"table" mapstructure:"table"` + SQL string `json:"sql" mapstructure:"sql"` + Keys datasource.Keys `json:"keys" mapstructure:"keys"` +} + +func (d *Doris) InitClient() error { + if len(d.Addr) == 0 { + return fmt.Errorf("not found doris addr, please check datasource config") + } + if _, err := d.NewConn(context.TODO(), ""); err != nil { + return err + } + return nil +} + +func (d *Doris) Init(settings map[string]interface{}) (datasource.Datasource, error) { + newest := new(Doris) + err := mapstructure.Decode(settings, newest) + return newest, err +} + +func (d *Doris) Validate(ctx context.Context) error { + if len(d.Addr) == 0 || len(strings.TrimSpace(d.Addr)) == 0 { + return fmt.Errorf("doris addr is invalid, please check datasource setting") + } + + if len(strings.TrimSpace(d.User)) == 0 { + return fmt.Errorf("doris user is invalid, please check datasource setting") + } + + return nil +} + +// Equal compares whether two objects are the same, used for caching +func (d *Doris) Equal(p datasource.Datasource) bool { + newest, ok := p.(*Doris) + if !ok { + logger.Errorf("unexpected plugin type, expected is ck") + return false + } + + // only compare first shard + if d.Addr != newest.Addr { + return false + } + + if d.User != newest.User { + return false + } + + if d.Password != newest.Password { + return false + } + + if d.EnableWrite != newest.EnableWrite { + return false + } + + if d.FeAddr != newest.FeAddr { + return false + } + + if d.MaxQueryRows != newest.MaxQueryRows { + return false + } + + if d.Timeout != newest.Timeout { + return false + } + + if d.MaxIdleConns != newest.MaxIdleConns { + return false + } + + if d.MaxOpenConns != newest.MaxOpenConns { + return false + } + + if d.ConnMaxLifetime != newest.ConnMaxLifetime { + return false + } + + if d.ClusterName != newest.ClusterName { + return false + } + + return true +} + +func (d *Doris) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) { + return nil, nil +} + +func (d *Doris) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) { + return nil, nil +} + +func (d *Doris) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) { + return nil, nil +} + +func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) { + dorisQueryParam := new(QueryParam) + if err := mapstructure.Decode(query, dorisQueryParam); err != nil { + return nil, err + } + + if dorisQueryParam.Keys.ValueKey == "" { + return nil, fmt.Errorf("valueKey is required") + } + + items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{ + Database: dorisQueryParam.Database, + Sql: dorisQueryParam.SQL, + Keys: types.Keys{ + ValueKey: dorisQueryParam.Keys.ValueKey, + LabelKey: dorisQueryParam.Keys.LabelKey, + TimeKey: dorisQueryParam.Keys.TimeKey, + }, + }) + if err != nil { + logger.Warningf("query:%+v get data err:%v", dorisQueryParam, err) + return []models.DataResp{}, err + } + data := make([]models.DataResp, 0) + for i := range items { + data = append(data, models.DataResp{ + Ref: dorisQueryParam.Ref, + Metric: items[i].Metric, + Values: items[i].Values, + }) + } + + // parse resp to time series data + logger.Infof("req:%+v keys:%+v \n data:%v", dorisQueryParam, dorisQueryParam.Keys, data) + + return data, nil +} + +func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) { + dorisQueryParam := new(QueryParam) + if err := mapstructure.Decode(query, dorisQueryParam); err != nil { + return nil, 0, err + } + + items, err := d.QueryLogs(ctx, &doris.QueryParam{ + Database: dorisQueryParam.Database, + Sql: dorisQueryParam.SQL, + }) + if err != nil { + logger.Warningf("query:%+v get data err:%v", dorisQueryParam, err) + return []interface{}{}, 0, err + } + logs := make([]interface{}, 0) + for i := range items { + logs = append(logs, items[i]) + } + + return logs, 0, nil +} + +func (d *Doris) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) { + dorisQueryParam := new(QueryParam) + if err := mapstructure.Decode(query, dorisQueryParam); err != nil { + return nil, err + } + return d.DescTable(ctx, dorisQueryParam.Database, dorisQueryParam.Table) +} diff --git a/datasource/opensearch/opensearch.go b/datasource/opensearch/opensearch.go new file mode 100644 index 00000000..680f66b2 --- /dev/null +++ b/datasource/opensearch/opensearch.go @@ -0,0 +1,399 @@ +package opensearch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "reflect" + "regexp" + "sort" + "strings" + "time" + + "github.com/ccfos/nightingale/v6/datasource" + "github.com/ccfos/nightingale/v6/datasource/commons/eslike" + "github.com/ccfos/nightingale/v6/models" + "github.com/ccfos/nightingale/v6/pkg/tlsx" + + "github.com/mitchellh/mapstructure" + "github.com/olivere/elastic/v7" + oscliv2 "github.com/opensearch-project/opensearch-go/v2" + osapiv2 "github.com/opensearch-project/opensearch-go/v2/opensearchapi" +) + +const ( + OpenSearchType = "opensearch" +) + +type OpenSearch struct { + Addr string `json:"os.addr" mapstructure:"os.addr"` + Nodes []string `json:"os.nodes" mapstructure:"os.nodes"` + Timeout int64 `json:"os.timeout" mapstructure:"os.timeout"` // millis + Basic BasicAuth `json:"os.basic" mapstructure:"os.basic"` + TLS TLS `json:"os.tls" mapstructure:"os.tls"` + Version string `json:"os.version" mapstructure:"os.version"` + Headers map[string]string `json:"os.headers" mapstructure:"os.headers"` + MinInterval int `json:"os.min_interval" mapstructure:"os.min_interval"` // seconds + MaxShard int `json:"os.max_shard" mapstructure:"os.max_shard"` + ClusterName string `json:"os.cluster_name" mapstructure:"os.cluster_name"` + Client *oscliv2.Client `json:"os.client" mapstructure:"os.client"` +} + +type TLS struct { + SkipTlsVerify bool `json:"os.tls.skip_tls_verify" mapstructure:"os.tls.skip_tls_verify"` +} + +type BasicAuth struct { + Enable bool `json:"os.auth.enable" mapstructure:"os.auth.enable"` + Username string `json:"os.user" mapstructure:"os.user"` + Password string `json:"os.password" mapstructure:"os.password"` +} + +func init() { + datasource.RegisterDatasource(OpenSearchType, new(OpenSearch)) +} + +func (os *OpenSearch) Init(settings map[string]interface{}) (datasource.Datasource, error) { + newest := new(OpenSearch) + err := mapstructure.Decode(settings, newest) + return newest, err +} + +func (os *OpenSearch) InitClient() error { + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(os.Timeout) * time.Millisecond, + }).DialContext, + ResponseHeaderTimeout: time.Duration(os.Timeout) * time.Millisecond, + } + + if len(os.Nodes) > 0 { + os.Addr = os.Nodes[0] + } + + if strings.Contains(os.Addr, "https") { + tlsConfig := tlsx.ClientConfig{ + InsecureSkipVerify: os.TLS.SkipTlsVerify, + UseTLS: true, + } + cfg, err := tlsConfig.TLSConfig() + if err != nil { + return err + } + transport.TLSClientConfig = cfg + } + + headers := http.Header{} + for k, v := range os.Headers { + headers[k] = []string{v} + } + + options := oscliv2.Config{ + Addresses: os.Nodes, + Transport: transport, + Header: headers, + } + + if os.Basic.Enable && os.Basic.Username != "" { + options.Username = os.Basic.Username + options.Password = os.Basic.Password + } + + var err = error(nil) + os.Client, err = oscliv2.NewClient(options) + + return err +} + +func (os *OpenSearch) Equal(other datasource.Datasource) bool { + sort.Strings(os.Nodes) + sort.Strings(other.(*OpenSearch).Nodes) + + if strings.Join(os.Nodes, ",") != strings.Join(other.(*OpenSearch).Nodes, ",") { + return false + } + + if os.Basic.Username != other.(*OpenSearch).Basic.Username { + return false + } + + if os.Basic.Password != other.(*OpenSearch).Basic.Password { + return false + } + + if os.TLS.SkipTlsVerify != other.(*OpenSearch).TLS.SkipTlsVerify { + return false + } + + if os.Timeout != other.(*OpenSearch).Timeout { + return false + } + + if !reflect.DeepEqual(os.Headers, other.(*OpenSearch).Headers) { + return false + } + + return true +} + +func (os *OpenSearch) Validate(ctx context.Context) (err error) { + if len(os.Nodes) == 0 { + return fmt.Errorf("need a valid addr") + } + + for _, addr := range os.Nodes { + _, err = url.Parse(addr) + if err != nil { + return fmt.Errorf("parse addr error: %v", err) + } + } + + if os.Basic.Enable && (len(os.Basic.Username) == 0 || len(os.Basic.Password) == 0) { + return fmt.Errorf("need a valid user, password") + } + + if os.MaxShard == 0 { + os.MaxShard = 5 + } + + if os.MinInterval < 10 { + os.MinInterval = 10 + } + + if os.Timeout == 0 { + os.Timeout = 6000 + } + + if !strings.HasPrefix(os.Version, "2") { + return fmt.Errorf("version must be 2.0+") + } + + return nil +} + +func (os *OpenSearch) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) { + return eslike.MakeLogQuery(ctx, query, eventTags, start, end) +} + +func (os *OpenSearch) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) { + return eslike.MakeTSQuery(ctx, query, eventTags, start, end) +} + +func search(ctx context.Context, indices []string, source interface{}, timeout int, cli *oscliv2.Client) (*elastic.SearchResult, error) { + var body *bytes.Buffer = nil + if source != nil { + body = new(bytes.Buffer) + err := json.NewEncoder(body).Encode(source) + if err != nil { + return nil, err + } + } + + req := osapiv2.SearchRequest{ + Index: indices, + Body: body, + } + + if timeout > 0 { + req.Timeout = time.Second * time.Duration(timeout) + } + + resp, err := req.Do(ctx, cli) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("opensearch response not 2xx, resp is %v", resp) + } + + bs, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + result := new(elastic.SearchResult) + err = json.Unmarshal(bs, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (os *OpenSearch) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) { + + search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) { + return search(ctx, indices, source, timeout, os.Client) + } + + return eslike.QueryData(ctx, queryParam, os.Timeout, os.Version, search) +} + +func (os *OpenSearch) QueryIndices() ([]string, error) { + + cir := osapiv2.CatIndicesRequest{ + Format: "json", + } + + rsp, err := cir.Do(context.Background(), os.Client) + if err != nil { + return nil, err + } + defer rsp.Body.Close() + + bs, err := io.ReadAll(rsp.Body) + if err != nil { + return nil, err + } + + resp := make([]struct { + Index string `json:"index"` + }, 0) + + err = json.Unmarshal(bs, &resp) + if err != nil { + return nil, err + } + + var ret []string + for _, k := range resp { + ret = append(ret, k.Index) + } + + return ret, nil +} + +func (os *OpenSearch) QueryFields(indices []string) ([]string, error) { + + var fields []string + mappingRequest := osapiv2.IndicesGetMappingRequest{ + Index: indices, + } + + resp, err := mappingRequest.Do(context.Background(), os.Client) + if err != nil { + return fields, err + } + defer resp.Body.Close() + bs, err := io.ReadAll(resp.Body) + if err != nil { + return fields, err + } + + result := map[string]interface{}{} + + err = json.Unmarshal(bs, &result) + if err != nil { + return fields, err + } + + idx := "" + if len(indices) > 0 { + idx = indices[0] + } + + mappingIndex := "" + indexReg, _ := regexp.Compile(idx) + for key, value := range result { + mappings, ok := value.(map[string]interface{}) + if !ok { + continue + } + if len(mappings) == 0 { + continue + } + if key == idx || strings.Contains(key, idx) || + (indexReg != nil && indexReg.MatchString(key)) { + mappingIndex = key + break + } + } + + if len(mappingIndex) == 0 { + return fields, nil + } + + fields = propertyMappingRange(result[mappingIndex], 1) + + sort.Strings(fields) + return fields, nil +} + +func propertyMappingRange(v interface{}, depth int) (fields []string) { + mapping, ok := v.(map[string]interface{}) + if !ok { + return + } + if len(mapping) == 0 { + return + } + for key, value := range mapping { + if reflect.TypeOf(value).Kind() == reflect.Map { + valueMap := value.(map[string]interface{}) + if prop, found := valueMap["properties"]; found { + subFields := propertyMappingRange(prop, depth+1) + for i := range subFields { + if depth == 1 { + fields = append(fields, subFields[i]) + } else { + fields = append(fields, key+"."+subFields[i]) + } + } + } else if typ, found := valueMap["type"]; found { + if eslike.HitFilter(typ.(string)) { + continue + } + fields = append(fields, key) + } + } + } + return +} + +func (os *OpenSearch) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) { + + search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) { + return search(ctx, indices, source, timeout, os.Client) + } + + return eslike.QueryLog(ctx, queryParam, os.Timeout, os.Version, 0, search) +} + +func (os *OpenSearch) QueryFieldValue(indexs []string, field string, query string) ([]string, error) { + var values []string + source := elastic.NewSearchSource(). + Size(0) + + if query != "" { + source = source.Query(elastic.NewBoolQuery().Must(elastic.NewQueryStringQuery(query))) + } + source = source.Aggregation("distinct", elastic.NewTermsAggregation().Field(field).Size(10000)) + + result, err := search(context.Background(), indexs, source, 0, os.Client) + if err != nil { + return values, err + } + + agg, found := result.Aggregations.Terms("distinct") + if !found { + return values, nil + } + + for _, bucket := range agg.Buckets { + values = append(values, bucket.Key.(string)) + } + + return values, nil +} + +func (os *OpenSearch) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) { + return nil, nil +} diff --git a/dscache/sync.go b/dscache/sync.go index fa55f110..e76e1eac 100644 --- a/dscache/sync.go +++ b/dscache/sync.go @@ -8,8 +8,10 @@ import ( "github.com/ccfos/nightingale/v6/datasource" _ "github.com/ccfos/nightingale/v6/datasource/ck" + _ "github.com/ccfos/nightingale/v6/datasource/doris" "github.com/ccfos/nightingale/v6/datasource/es" _ "github.com/ccfos/nightingale/v6/datasource/mysql" + _ "github.com/ccfos/nightingale/v6/datasource/opensearch" _ "github.com/ccfos/nightingale/v6/datasource/postgresql" "github.com/ccfos/nightingale/v6/dskit/tdengine" "github.com/ccfos/nightingale/v6/models" @@ -82,8 +84,6 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) { if item.PluginType == "elasticsearch" { esN9eToDatasourceInfo(&ds, item) - } else if item.PluginType == "opensearch" { - osN9eToDatasourceInfo(&ds, item) } else if item.PluginType == "tdengine" { tdN9eToDatasourceInfo(&ds, item) } else { @@ -144,24 +144,6 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource ds.Settings["es.enable_write"] = item.SettingsJson["enable_write"] } -// for opensearch -func osN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource) { - ds.Settings = make(map[string]interface{}) - ds.Settings["os.nodes"] = []string{item.HTTPJson.Url} - ds.Settings["os.timeout"] = item.HTTPJson.Timeout - ds.Settings["os.basic"] = es.BasicAuth{ - Username: item.AuthJson.BasicAuthUser, - Password: item.AuthJson.BasicAuthPassword, - } - ds.Settings["os.tls"] = es.TLS{ - SkipTlsVerify: item.HTTPJson.TLS.SkipTlsVerify, - } - ds.Settings["os.version"] = item.SettingsJson["version"] - ds.Settings["os.headers"] = item.HTTPJson.Headers - ds.Settings["os.min_interval"] = item.SettingsJson["min_interval"] - ds.Settings["os.max_shard"] = item.SettingsJson["max_shard"] -} - func PutDatasources(items []datasource.DatasourceInfo) { ids := make([]int64, 0) for _, item := range items { diff --git a/dskit/doris/doris.go b/dskit/doris/doris.go new file mode 100644 index 00000000..0ad1bb06 --- /dev/null +++ b/dskit/doris/doris.go @@ -0,0 +1,543 @@ +package doris + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + "time" + "unicode" + + "github.com/ccfos/nightingale/v6/dskit/pool" + "github.com/ccfos/nightingale/v6/dskit/types" + + _ "github.com/go-sql-driver/mysql" // MySQL driver + "github.com/mitchellh/mapstructure" +) + +// Doris struct to hold connection details and the connection object +type Doris struct { + Addr string `json:"doris.addr" mapstructure:"doris.addr"` // be node + FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe node + User string `json:"doris.user" mapstructure:"doris.user"` // + Password string `json:"doris.password" mapstructure:"doris.password"` // + Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` + MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"` + MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"` + ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"` + MaxQueryRows int `json:"doris.max_query_rows" mapstructure:"doris.max_query_rows"` + ClusterName string `json:"doris.cluster_name" mapstructure:"doris.cluster_name"` + EnableWrite bool `json:"doris.enable_write" mapstructure:"doris.enable_write"` +} + +// NewDorisWithSettings initializes a new Doris instance with the given settings +func NewDorisWithSettings(ctx context.Context, settings interface{}) (*Doris, error) { + newest := new(Doris) + settingsMap := map[string]interface{}{} + if reflect.TypeOf(settings).Kind() == reflect.String { + if err := json.Unmarshal([]byte(settings.(string)), &settingsMap); err != nil { + return nil, err + } + } else { + var assert bool + settingsMap, assert = settings.(map[string]interface{}) + if !assert { + return nil, errors.New("settings type invalid") + } + } + if err := mapstructure.Decode(settingsMap, newest); err != nil { + return nil, err + } + + return newest, nil +} + +// NewConn establishes a new connection to Doris +func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) { + if len(d.Addr) == 0 { + return nil, errors.New("empty fe-node addr") + } + + // Set default values similar to postgres implementation + if d.Timeout == 0 { + d.Timeout = 60 + } + if d.MaxIdleConns == 0 { + d.MaxIdleConns = 10 + } + if d.MaxOpenConns == 0 { + d.MaxOpenConns = 100 + } + if d.ConnMaxLifetime == 0 { + d.ConnMaxLifetime = 14400 + } + if d.MaxQueryRows == 0 { + d.MaxQueryRows = 500 + } + + var keys []string + keys = append(keys, d.Addr) + keys = append(keys, d.Password, d.User) + if len(database) > 0 { + keys = append(keys, database) + } + cachedkey := strings.Join(keys, ":") + // cache conn with database + conn, ok := pool.PoolClient.Load(cachedkey) + if ok { + return conn.(*sql.DB), nil + } + var db *sql.DB + var err error + defer func() { + if db != nil && err == nil { + pool.PoolClient.Store(cachedkey, db) + } + }() + + // Simplified connection logic for Doris using MySQL driver + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", d.User, d.Password, d.Addr, database) + db, err = sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + + // Set connection pool configuration + db.SetMaxIdleConns(d.MaxIdleConns) + db.SetMaxOpenConns(d.MaxOpenConns) + db.SetConnMaxLifetime(time.Duration(d.ConnMaxLifetime) * time.Second) + + return db, nil +} + +// createTimeoutContext creates a context with timeout based on Doris configuration +func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) { + timeout := d.Timeout + if timeout == 0 { + timeout = 60 + } + return context.WithTimeout(ctx, time.Duration(timeout)*time.Second) +} + +// ShowDatabases lists all databases in Doris +func (d *Doris) ShowDatabases(ctx context.Context) ([]string, error) { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + db, err := d.NewConn(timeoutCtx, "") + if err != nil { + return []string{}, err + } + + rows, err := db.QueryContext(timeoutCtx, "SHOW DATABASES") + if err != nil { + return nil, err + } + defer rows.Close() + + var databases []string + for rows.Next() { + var dbName string + if err := rows.Scan(&dbName); err != nil { + continue + } + databases = append(databases, dbName) + } + return databases, nil +} + +// ShowResources lists all resources with type resourceType in Doris +func (d *Doris) ShowResources(ctx context.Context, resourceType string) ([]string, error) { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + db, err := d.NewConn(timeoutCtx, "") + if err != nil { + return []string{}, err + } + + // 使用 SHOW RESOURCES 命令 + query := fmt.Sprintf("SHOW RESOURCES WHERE RESOURCETYPE = '%s'", resourceType) + rows, err := db.QueryContext(timeoutCtx, query) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + distinctName := make(map[string]struct{}) + + // 获取列信息 + columns, err := rows.Columns() + if err != nil { + return nil, fmt.Errorf("failed to get columns: %w", err) + } + + // 准备接收数据的变量 + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range values { + valuePtrs[i] = &values[i] + } + + // 遍历结果集 + for rows.Next() { + err := rows.Scan(valuePtrs...) + if err != nil { + return nil, fmt.Errorf("error scanning row: %w", err) + } + // 提取资源名称并添加到 map 中(自动去重) + if name, ok := values[0].([]byte); ok { + distinctName[string(name)] = struct{}{} + } else if nameStr, ok := values[0].(string); ok { + distinctName[nameStr] = struct{}{} + } + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating rows: %w", err) + } + + // 将 map 转换为切片 + var resources []string + for name := range distinctName { + resources = append(resources, name) + } + + return resources, nil +} + +// ShowTables lists all tables in a given database +func (d *Doris) ShowTables(ctx context.Context, database string) ([]string, error) { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + db, err := d.NewConn(timeoutCtx, database) + if err != nil { + return nil, err + } + + query := fmt.Sprintf("SHOW TABLES IN %s", database) + rows, err := db.QueryContext(timeoutCtx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + var tables []string + for rows.Next() { + var tableName string + if err := rows.Scan(&tableName); err != nil { + continue + } + tables = append(tables, tableName) + } + return tables, nil +} + +// DescTable describes the schema of a specified table in Doris +func (d *Doris) DescTable(ctx context.Context, database, table string) ([]*types.ColumnProperty, error) { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + db, err := d.NewConn(timeoutCtx, database) + if err != nil { + return nil, err + } + + query := fmt.Sprintf("DESCRIBE %s.%s", database, table) + rows, err := db.QueryContext(timeoutCtx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + // 日志报表中需要把 .type 转化成内部类型 + // TODO: 是否有复合类型, Array/JSON/Tuple/Nested, 是否有更多的类型 + convertDorisType := func(origin string) (string, bool) { + lower := strings.ToLower(origin) + switch lower { + case "double": + return types.LogExtractValueTypeFloat, true + + case "datetime", "date": + return types.LogExtractValueTypeDate, false + + case "text": + return types.LogExtractValueTypeText, true + + default: + if strings.Contains(lower, "int") { + return types.LogExtractValueTypeLong, true + } + // 日期类型统一按照.date处理 + if strings.HasPrefix(lower, "date") { + return types.LogExtractValueTypeDate, false + } + if strings.HasPrefix(lower, "varchar") || strings.HasPrefix(lower, "char") { + return types.LogExtractValueTypeText, true + } + if strings.HasPrefix(lower, "decimal") { + return types.LogExtractValueTypeFloat, true + } + } + + return origin, false + } + + var columns []*types.ColumnProperty + for rows.Next() { + var ( + field string + typ string + null string + key string + defaultValue sql.NullString + extra string + ) + if err := rows.Scan(&field, &typ, &null, &key, &defaultValue, &extra); err != nil { + continue + } + type2, indexable := convertDorisType(typ) + columns = append(columns, &types.ColumnProperty{ + Field: field, + Type: typ, // You might want to convert MySQL types to your custom types + + Type2: type2, + Indexable: indexable, + }) + } + return columns, nil +} + +// SelectRows selects rows from a specified table in Doris based on a given query with MaxQueryRows check +func (d *Doris) SelectRows(ctx context.Context, database, table, query string) ([]map[string]interface{}, error) { + sql := fmt.Sprintf("SELECT * FROM %s.%s", database, table) + if query != "" { + sql += " " + query + } + + // 检查查询结果行数 + err := d.CheckMaxQueryRows(ctx, database, sql) + if err != nil { + return nil, err + } + + return d.ExecQuery(ctx, database, sql) +} + +// ExecQuery executes a given SQL query in Doris and returns the results +func (d *Doris) ExecQuery(ctx context.Context, database string, sql string) ([]map[string]interface{}, error) { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + db, err := d.NewConn(timeoutCtx, database) + if err != nil { + return nil, err + } + + rows, err := db.QueryContext(timeoutCtx, sql) + if err != nil { + return nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + var results []map[string]interface{} + + for rows.Next() { + columnValues := make([]interface{}, len(columns)) + columnPointers := make([]interface{}, len(columns)) + for i := range columnValues { + columnPointers[i] = &columnValues[i] + } + + if err := rows.Scan(columnPointers...); err != nil { + continue + } + + rowMap := make(map[string]interface{}) + for i, colName := range columns { + val := columnValues[i] + bytes, ok := val.([]byte) + if ok { + rowMap[colName] = string(bytes) + } else { + rowMap[colName] = val + } + } + results = append(results, rowMap) + } + return results, nil +} + +// ExecContext executes a given SQL query in Doris and returns the results +func (d *Doris) ExecContext(ctx context.Context, database string, sql string) error { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + db, err := d.NewConn(timeoutCtx, database) + if err != nil { + return err + } + + _, err = db.ExecContext(timeoutCtx, sql) + return err +} + +// ExecBatchSQL 执行多条 SQL 语句 +func (d *Doris) ExecBatchSQL(ctx context.Context, database string, sqlBatch string) error { + // 分割 SQL 语句 + sqlStatements := SplitSQLStatements(sqlBatch) + + // 逐条执行 SQL 语句 + for _, ql := range sqlStatements { + // 跳过空语句 + ql = strings.TrimSpace(ql) + if ql == "" { + continue + } + + // 检查是否是 CREATE DATABASE 语句 + isCreateDB := strings.HasPrefix(strings.ToUpper(ql), "CREATE DATABASE") + // strings.HasPrefix(strings.ToUpper(sql), "CREATE SCHEMA") // 暂时不支持CREATE SCHEMA + + // 对于 CREATE DATABASE 语句,使用空数据库名连接 + currentDB := database + if isCreateDB { + currentDB = "" + } + + // 执行单条 SQL,ExecContext 内部已经包含超时处理 + err := d.ExecContext(ctx, currentDB, ql) + if err != nil { + return fmt.Errorf("exec sql failed, sql:%s, err:%w", sqlBatch, err) + } + } + + return nil +} + +// SplitSQLStatements 将多条 SQL 语句分割成单独的语句 +func SplitSQLStatements(sqlBatch string) []string { + var statements []string + var currentStatement strings.Builder + + // 状态标记 + var ( + inString bool // 是否在字符串内 + inComment bool // 是否在单行注释内 + inMultilineComment bool // 是否在多行注释内 + escaped bool // 前一个字符是否为转义字符 + ) + + for i := 0; i < len(sqlBatch); i++ { + char := sqlBatch[i] + currentStatement.WriteByte(char) + + // 处理转义字符 + if inString && char == '\\' { + escaped = !escaped + continue + } + + // 处理字符串 + if char == '\'' && !inComment && !inMultilineComment { + if !escaped { + inString = !inString + } + escaped = false + continue + } + + // 处理单行注释 + if !inString && !inMultilineComment && !inComment && char == '-' && i+1 < len(sqlBatch) && sqlBatch[i+1] == '-' { + inComment = true + currentStatement.WriteByte(sqlBatch[i+1]) // 写入第二个'-' + i++ + continue + } + + // 处理多行注释开始 + if !inString && !inComment && char == '/' && i+1 < len(sqlBatch) && sqlBatch[i+1] == '*' { + inMultilineComment = true + currentStatement.WriteByte(sqlBatch[i+1]) // 写入'*' + i++ + continue + } + + // 处理多行注释结束 + if inMultilineComment && char == '*' && i+1 < len(sqlBatch) && sqlBatch[i+1] == '/' { + inMultilineComment = false + currentStatement.WriteByte(sqlBatch[i+1]) // 写入'/' + i++ + continue + } + + // 处理换行符,结束单行注释 + if inComment && (char == '\n' || char == '\r') { + inComment = false + } + + // 分割SQL语句 + if char == ';' && !inString && !inMultilineComment && !inComment { + // 收集到分号后面的单行注释(如果有) + for j := i + 1; j < len(sqlBatch); j++ { + nextChar := sqlBatch[j] + + // 检查是否是注释开始 + if nextChar == '-' && j+1 < len(sqlBatch) && sqlBatch[j+1] == '-' { + // 找到了注释,添加到当前语句 + currentStatement.WriteByte(nextChar) // 添加'-' + currentStatement.WriteByte(sqlBatch[j+1]) // 添加第二个'-' + j++ + + // 读取直到行尾 + for k := j + 1; k < len(sqlBatch); k++ { + commentChar := sqlBatch[k] + currentStatement.WriteByte(commentChar) + j = k + + if commentChar == '\n' || commentChar == '\r' { + break + } + } + i = j + break + } else if !isWhitespace(nextChar) { + // 非注释且非空白字符,停止收集 + break + } else { + // 是空白字符,添加到当前语句 + currentStatement.WriteByte(nextChar) + i = j + } + } + + statements = append(statements, strings.TrimSpace(currentStatement.String())) + currentStatement.Reset() + continue + } + + escaped = false + } + + // 处理最后一条可能没有分号的语句 + lastStatement := strings.TrimSpace(currentStatement.String()) + if lastStatement != "" { + statements = append(statements, lastStatement) + } + + return statements +} + +// 判断字符是否为空白字符 +func isWhitespace(c byte) bool { + return unicode.IsSpace(rune(c)) +} diff --git a/dskit/doris/logs.go b/dskit/doris/logs.go new file mode 100644 index 00000000..18a49229 --- /dev/null +++ b/dskit/doris/logs.go @@ -0,0 +1,36 @@ +package doris + +import ( + "context" + "sort" +) + +// 日志相关的操作 +const ( + TimeseriesAggregationTimestamp = "__ts__" +) + +// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理 +func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) { + // 等同于 Query() + return d.Query(ctx, query) +} + +// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断 +func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) { + values, err := d.QueryTimeseries(ctx, query) + if err != nil { + return [][]float64{}, nil + } + if len(values) > 0 && len(values[0].Values) > 0 { + items := values[0].Values + sort.Slice(items, func(i, j int) bool { + if len(items[i]) > 0 && len(items[j]) > 0 { + return items[i][0] < items[j][0] + } + return false + }) + return items, nil + } + return [][]float64{}, nil +} diff --git a/dskit/doris/template.md b/dskit/doris/template.md new file mode 100644 index 00000000..8018dd0a --- /dev/null +++ b/dskit/doris/template.md @@ -0,0 +1,126 @@ +## SQL变量 + +| 字段名 | 含义 | 使用场景 | +| ---- | ---- | ---- | +|database|数据库|无| +|table|表名|| +|time_field|时间戳的字段|| +|query|查询条件|日志原文| +|from|开始时间|| +|to|结束时间|| +|aggregation|聚合算法|时序图| +|field|聚合的字段|时序图| +|limit|分页参数|日志原文| +|offset|分页参数|日志原文| +|interval|直方图的时间粒度|直方图| + +## 日志原文 +### 直方图 + +``` +# 如何计算interval的值 +max := 60 // 最多60个柱子 +interval := ($to-$from) / max +interval = interval - interval%10 +if interval <= 0 { + interval = 60 +} +``` + +``` +SELECT count() as cnt, + FLOOR(UNIX_TIMESTAMP($time_field) / $interval) * $interval AS __ts__ + FROM $table + WHERE $time_field BETWEEN FROM_UNIXTIME($from) AND FROM_UNIXTIME($to) + GROUP BY __ts__; +``` + +``` +{ + "database":"$database", + "sql":"$sql", + "keys:": { + "valueKey":"cnt", + "timeKey":"__ts__" + } +} +``` + +### 日志原文 + +``` +SELECT * from $table + WHERE $time_field BETWEEN FROM_UNIXTIME($from) AND FROM_UNIXTIME($to) + ORDER by $time_filed + LIMIT $limit OFFSET $offset; +``` + +``` +{ + "database":"$database", + "sql":"$sql" +} +``` + +## 时序图 + +### 日志行数 + +``` +SELECT COUNT() AS cnt, DATE_FORMAT(date, '%Y-%m-%d %H:%i:00') AS __ts__ + FROM nginx_access_log + WHERE $time_field BETWEEN FROM_UNIXTIME($from) AND FROM_UNIXTIME($to) + GROUP BY __ts__ +``` + +``` +{ + "database":"$database", + "sql":"$sql", + "keys:": { + "valueKey":"cnt", + "timeKey":"__ts__" + } +} +``` + +### max/min/avg/sum + +``` +SELECT $aggregation($field) AS series, DATE_FORMAT(date, '%Y-%m-%d %H:%i:00') AS __ts__ + FROM nginx_access_log + WHERE $time_field BETWEEN FROM_UNIXTIME($from) AND FROM_UNIXTIME($to) + GROUP BY __ts__ +``` + +``` +{ + "database":"$database", + "sql":"$sql", + "keys:": { + "valueKey":"series", + "timeKey":"__ts__" + } +} +``` + + +### 分位值 + +``` +SELECT percentile($field, 0.95) AS series, DATE_FORMAT(date, '%Y-%m-%d %H:%i:00') AS __ts__ + FROM nginx_access_log + WHERE $time_field BETWEEN FROM_UNIXTIME($from) AND FROM_UNIXTIME($to) + GROUP BY __ts__ +``` + +``` +{ + "database":"$database", + "sql":"$sql", + "keys:": { + "valueKey":"series", + "timeKey":"__ts__" + } +} +``` \ No newline at end of file diff --git a/dskit/doris/timeseries.go b/dskit/doris/timeseries.go new file mode 100644 index 00000000..d0fe9139 --- /dev/null +++ b/dskit/doris/timeseries.go @@ -0,0 +1,108 @@ +package doris + +import ( + "context" + "fmt" + "strings" + + "github.com/ccfos/nightingale/v6/dskit/sqlbase" + "github.com/ccfos/nightingale/v6/dskit/types" +) + +const ( + TimeFieldFormatEpochMilli = "epoch_millis" + TimeFieldFormatEpochSecond = "epoch_second" + TimeFieldFormatDateTime = "datetime" +) + +// 不再拼接SQL, 完全信赖用户的输入 +type QueryParam struct { + Database string `json:"database"` + Sql string `json:"sql"` + Keys types.Keys `json:"keys" mapstructure:"keys"` +} + +var ( + DorisBannedOp = map[string]struct{}{ + "CREATE": {}, + "INSERT": {}, + "ALTER": {}, + "REVOKE": {}, + "DROP": {}, + "RENAME": {}, + "ATTACH": {}, + "DETACH": {}, + "OPTIMIZE": {}, + "TRUNCATE": {}, + "SET": {}, + } +) + +// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check +func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) { + // 校验SQL的合法性, 过滤掉 write请求 + sqlItem := strings.Split(strings.ToUpper(query.Sql), " ") + for _, item := range sqlItem { + if _, ok := DorisBannedOp[item]; ok { + return nil, fmt.Errorf("operation %s is forbid, only read db, please check your sql", item) + } + } + + // 检查查询结果行数 + err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql) + if err != nil { + return nil, err + } + + rows, err := d.ExecQuery(ctx, query.Database, query.Sql) + if err != nil { + return nil, err + } + return rows, nil +} + +// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check +func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) { + // 使用 Query 方法执行查询,Query方法内部已包含MaxQueryRows检查 + rows, err := d.Query(ctx, query) + if err != nil { + return nil, err + } + + return sqlbase.FormatMetricValues(query.Keys, rows), nil +} + +// CheckMaxQueryRows checks if the query result exceeds the maximum allowed rows +func (d *Doris) CheckMaxQueryRows(ctx context.Context, database, sql string) error { + timeoutCtx, cancel := d.createTimeoutContext(ctx) + defer cancel() + + cleanedSQL := strings.ReplaceAll(sql, ";", "") + checkQuery := fmt.Sprintf("SELECT COUNT(*) as count FROM (%s) AS subquery;", cleanedSQL) + + // 执行计数查询 + results, err := d.ExecQuery(timeoutCtx, database, checkQuery) + if err != nil { + return err + } + + if len(results) > 0 { + if count, exists := results[0]["count"]; exists { + v, err := sqlbase.ParseFloat64Value(count) + if err != nil { + return err + } + + maxQueryRows := d.MaxQueryRows + if maxQueryRows == 0 { + maxQueryRows = 500 + } + + if v > float64(maxQueryRows) { + return fmt.Errorf("query result rows count %d exceeds the maximum limit %d", int(v), maxQueryRows) + } + } + } + + return nil +} diff --git a/dskit/sqlbase/timeseries.go b/dskit/sqlbase/timeseries.go index ff8b50f8..55f2bd7e 100644 --- a/dskit/sqlbase/timeseries.go +++ b/dskit/sqlbase/timeseries.go @@ -7,6 +7,7 @@ import ( "crypto/md5" "encoding/json" "fmt" + "math" "reflect" "sort" "strconv" @@ -121,6 +122,11 @@ func FormatMetricValues(keys types.Keys, rows []map[string]interface{}, ignoreDe // Compile and store the metric values for metricName, value := range metricValue { + // NaN 无法执行json.Marshal(), 接口会报错 + if math.IsNaN(value) { + continue + } + metrics := make(model.Metric) var labelsStr []string diff --git a/go.mod b/go.mod index e2b6aea0..70feb545 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mojocn/base64Captcha v1.3.6 github.com/olivere/elastic/v7 v7.0.32 + github.com/opensearch-project/opensearch-go/v2 v2.3.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pelletier/go-toml/v2 v2.0.8 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index d393c695..85fb4a8a 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,21 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1 github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA= github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= +github.com/aws/aws-sdk-go v1.44.263/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go v1.44.302 h1:ST3ko6GrJKn3Xi+nAvxjG3uk/V1pW8KC52WLeIxqqNk= github.com/aws/aws-sdk-go v1.44.302/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4= +github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= @@ -139,6 +152,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -189,6 +203,7 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= @@ -248,6 +263,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k= +github.com/opensearch-project/opensearch-go/v2 v2.3.0 h1:nQIEMr+A92CkhHrZgUhcfsrZjibvB3APXf2a1VwCmMQ= +github.com/opensearch-project/opensearch-go/v2 v2.3.0/go.mod h1:8LDr9FCgUTVoT+5ESjc2+iaZuldqE+23Iq0r1XeNue8= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= @@ -392,6 +409,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= @@ -419,6 +437,7 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -428,6 +447,7 @@ golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= @@ -436,6 +456,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -470,6 +491,7 @@ gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkp gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/models/alert_rule.go b/models/alert_rule.go index d5620b3c..a226191b 100644 --- a/models/alert_rule.go +++ b/models/alert_rule.go @@ -30,6 +30,8 @@ const ( ELASTICSEARCH = "elasticsearch" MYSQL = "mysql" POSTGRESQL = "pgsql" + DORIS = "doris" + OPENSEARCH = "opensearch" CLICKHOUSE = "ck" ) @@ -1199,7 +1201,9 @@ func (ar *AlertRule) IsInnerRule() bool { ar.Cate == ELASTICSEARCH || ar.Prod == LOKI || ar.Cate == LOKI || ar.Cate == MYSQL || - ar.Cate == POSTGRESQL + ar.Cate == POSTGRESQL || + ar.Cate == DORIS || + ar.Cate == OPENSEARCH } func (ar *AlertRule) GetRuleType() string {