diff --git a/grafana/rmf-app/pkg/plugin/cache/channel.go b/grafana/rmf-app/pkg/plugin/cache/channel.go index 776b3d7c..28887844 100644 --- a/grafana/rmf-app/pkg/plugin/cache/channel.go +++ b/grafana/rmf-app/pkg/plugin/cache/channel.go @@ -35,6 +35,8 @@ type Channel struct { TimeRange backend.TimeRange Absolute bool Step time.Duration + Interval time.Duration + Span time.Duration Fields frame.SeriesFields } diff --git a/grafana/rmf-app/pkg/plugin/config.go b/grafana/rmf-app/pkg/plugin/config.go index 04e1690a..7c577ff5 100644 --- a/grafana/rmf-app/pkg/plugin/config.go +++ b/grafana/rmf-app/pkg/plugin/config.go @@ -29,28 +29,33 @@ import ( const DefaultHttpTimeout = 60 const DefaultCacheSizeMB = 1024 const MinimalCacheSizeMB = 128 +const MinBatchRequestMinutes = 10 +const MaxBatchRequestMinutes = 120 +const DefaultBatchRequestMinutes = MaxBatchRequestMinutes type Config struct { - URL string - Timeout int - CacheSize int - Username string - Password string // #nosec G117 - JSON struct { + URL string + Timeout int + CacheSize int + BatchRequestMinutes int + Username string + Password string // #nosec G117 + JSON struct { // Conventional Grafana HTTP config (see the `DataSourceHttpSettings` UI element) TimeoutRaw string `json:"timeout"` TlsSkipVerify bool `json:"tlsSkipVerify"` DisableCompression bool `json:"disableCompression"` // Custom RMF settings. CacheSizeRaw string `json:"cacheSize"` - // Legacy custom RMF settings. We should ge rid of these at some point. - Server *string `json:"path"` - Port string `json:"port"` - SSL bool `json:"ssl"` - Username string `json:"userName"` - Password string `json:"password"` // #nosec G117 - SSLVerify bool `json:"skipVerify"` // NB: the meaning of JSON field is inverted. - OmegamonDs string `json:"omegamonDs"` + // Legacy custom RMF settings. We should get rid of these at some point. + Server *string `json:"path"` + Port string `json:"port"` + SSL bool `json:"ssl"` + Username string `json:"userName"` + Password string `json:"password"` // #nosec G117 + SSLVerify bool `json:"skipVerify"` // NB: the meaning of JSON field is inverted. + OmegamonDs string `json:"omegamonDs"` + BatchRequestInterval string `json:"batchRequestInterval"` } } @@ -98,5 +103,17 @@ func (ds *RMFDatasource) getConfig(settings backend.DataSourceInstanceSettings) logger.Warn("cache size is not small, using minimal value", "cacheSize", config.CacheSize) config.CacheSize = MinimalCacheSizeMB } + if config.BatchRequestMinutes, err = strconv.Atoi(config.JSON.BatchRequestInterval); err != nil { + logger.Warn("batch request interval is not valid, applying default", "batchRequestInterval", config.JSON.BatchRequestInterval) + config.BatchRequestMinutes = DefaultBatchRequestMinutes + } + if config.BatchRequestMinutes < MinBatchRequestMinutes { + logger.Warn("batch request interval is too small, using minimal value", "batchRequestInterval", config.BatchRequestMinutes) + config.BatchRequestMinutes = MinBatchRequestMinutes + } + if config.BatchRequestMinutes > MaxBatchRequestMinutes { + logger.Warn("batch request interval is too large, using maximal value", "batchRequestInterval", config.BatchRequestMinutes) + config.BatchRequestMinutes = MaxBatchRequestMinutes + } return &config, nil } diff --git a/grafana/rmf-app/pkg/plugin/datasource.go b/grafana/rmf-app/pkg/plugin/datasource.go index 2d72d2f0..efed50f6 100644 --- a/grafana/rmf-app/pkg/plugin/datasource.go +++ b/grafana/rmf-app/pkg/plugin/datasource.go @@ -58,16 +58,18 @@ const ChannelCacheSizeMB = 64 const SdsDelay = 5 * time.Second const TimeSeriesType = "TimeSeries" const QueryPattern = `^([A-Za-z_][A-Za-z0-9_]*)\(([^)]*)\)$` // e.g., banner(resource), table(resource), caption(resource) +const DDS_BATCH_REQUESTS_FUNCTIONALITY_MASK = 0x1000 type RMFDatasource struct { - uid string - name string - channelCache *cache.ChannelCache - frameCache *cache.FrameCache - ddsClient *dds.Client - single singleflight.Group - omegamonDs string - queryMatcher *regexp.Regexp + uid string + name string + channelCache *cache.ChannelCache + frameCache *cache.FrameCache + ddsClient *dds.Client + single singleflight.Group + omegamonDs string + queryMatcher *regexp.Regexp + batchRequestInterval int } // NewRMFDatasource creates a new instance of the RMF datasource. @@ -85,10 +87,12 @@ func NewRMFDatasource(ctx context.Context, settings backend.DataSourceInstanceSe ds.channelCache = cache.NewChannelCache(ChannelCacheSizeMB) ds.frameCache = cache.NewFrameCache(config.CacheSize) ds.omegamonDs = config.JSON.OmegamonDs + ds.batchRequestInterval = config.BatchRequestMinutes logger.Info("initialized a datasource", "uid", settings.UID, "name", settings.Name, "url", config.URL, "timeout", config.Timeout, "cacheSize", config.CacheSize, - "username", config.Username, "tlsSkipVerify", config.JSON.TlsSkipVerify) + "username", config.Username, "tlsSkipVerify", config.JSON.TlsSkipVerify, + "batchRequestInterval", ds.batchRequestInterval) ds.queryMatcher = regexp.MustCompile(QueryPattern) return ds, nil } @@ -315,6 +319,19 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe start := q.TimeRange.From.UTC() r := dds.NewRequest(params.Resource.Value, start, start, step) f, jump, err := ds.getCachedTSFrames(r, q.TimeRange.To.UTC(), step, fields) + if ds.supportsBatchRequests() { + span := step + step = time.Duration(ds.batchRequestInterval) * time.Minute + logger.Debug("### using batch requests", "batchSpan", span.Seconds(), "step", step.Seconds()) + r = dds.NewBatchRequest(params.Resource.Value, start, step, span) + f2, _, err2 := ds.getCachedTSFrames(r, q.TimeRange.To.UTC(), step, fields) + if f == nil || err != nil { + f = f2 + } else if f2 != nil && err2 == nil { + step = frame.GetDuration(f2) + f, err = frame.MergeInto(f, f2) + } + } if f == nil || err != nil { f = frame.TaggedFrame(start, "No data yet...") } @@ -328,6 +345,8 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe TimeRange: backend.TimeRange{From: start.Add(jump), To: q.TimeRange.To.UTC()}, Absolute: params.AbsoluteTime, Step: step, + Interval: q.Interval, + Span: r.Span, Fields: fields, } err = ds.channelCache.Set(channel.Path, &cachedChannel) @@ -403,13 +422,20 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe } res := c.Resource step := c.Step + span := c.Span + interval := c.Interval absolute := c.Absolute from := c.TimeRange.From to := c.TimeRange.To fields := c.Fields logger.Debug("starting streaming", "step", step.String(), "path", req.Path) - r := dds.NewRequest(res, from, from, step) + var r *dds.Request + if ds.supportsBatchRequests() { + r = dds.NewBatchRequest(res, from, step, span) + } else { + r = dds.NewRequest(res, from, from, step) + } // Stream historical part of time series stop := to @@ -417,11 +443,11 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe if !absolute { stop = time.Now().Add(-SdsDelay) } - if r.TimeRange.To.After(stop) { + if r.TimeRange.From.After(stop) { logger.Debug("finished with historical data", "request", r.String(), "path", req.Path) break } - f, jump, err := ds.getCachedTSFrames(r, stop, step, fields) + f, _, err := ds.getCachedTSFrames(r, stop, step, fields) if err != nil { logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil @@ -431,10 +457,20 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil } - r.Add(jump) + r.Add(frame.GetDuration(f)) continue } if err := ds.serveTSFrame(ctx, sender, fields, r, true); err != nil { + if gpme, ok := errors.AsType[*dds.GpmError](err); ok && gpme.Id == dds.MESSAGE_ID_NOT_ENOUGTH_MEMORY { + logger.Debug("GPM0555I: reduce step", "step", step.Minutes()) + step = step / 2 + if step < MinBatchRequestMinutes*time.Minute { + logger.Info("streaming stopped", "reason", "step is too small", "path", req.Path, "step", step.Minutes(), "error", err) + return nil + } + r = dds.NewBatchRequest(res, r.TimeRange.From, step, span) + continue + } logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil } @@ -442,6 +478,11 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe } if !absolute { // Stream live data as it's being collected + if ds.supportsBatchRequests() { + mintime := ds.ddsClient.GetCachedMintime() + step = getStep(interval, mintime) + r = dds.NewRequest(res, stop, stop, step) + } for { if err := ds.serveTSFrame(ctx, sender, fields, r, false); err != nil { logger.Info("streaming stopped", "reason", err, "path", req.Path) @@ -479,3 +520,8 @@ func (d *RMFDatasource) parseQuery(resource string) (string, string) { } return "", resource } + +func (ds *RMFDatasource) supportsBatchRequests() bool { + fl := ds.ddsClient.GetFunctionality() + return fl&DDS_BATCH_REQUESTS_FUNCTIONALITY_MASK == DDS_BATCH_REQUESTS_FUNCTIONALITY_MASK +} diff --git a/grafana/rmf-app/pkg/plugin/dds/client.go b/grafana/rmf-app/pkg/plugin/dds/client.go index e9da7a5c..ac9e6e02 100644 --- a/grafana/rmf-app/pkg/plugin/dds/client.go +++ b/grafana/rmf-app/pkg/plugin/dds/client.go @@ -27,6 +27,7 @@ import ( "mime" "net/http" "net/url" + "strconv" "strings" "sync" "sync/atomic" @@ -46,6 +47,7 @@ const ContainedPath = "/gpm/contained" const PerformPath = "/gpm/perform" const XslHeadersPath = "/gpm/include/reptrans.xsl" const FullReportPath = "/gpm/rmfm3" +const TimeSeriesPath = "/gpm/performTimeSeries" var MayHaveExt = map[string]bool{ IndexPath: true, @@ -58,15 +60,16 @@ var ErrParse = errors.New("unable to parse DDS response") var ErrUnauthorized = errors.New("not authorized to access DDS") type Client struct { - baseUrl string - username string - password string - httpClient *http.Client - headerMap *HeaderMap - timeData *TimeData - resource *Resource - systems []string - useXmlExt atomic.Bool + baseUrl string + username string + password string + httpClient *http.Client + headerMap *HeaderMap + timeData *TimeData + resource *Resource + systems []string + useXmlExt atomic.Bool + functionality atomic.Int32 stopChan chan struct{} closeOnce sync.Once @@ -268,6 +271,12 @@ func (c *Client) updateMetadata() *TimeData { c.timeData = timeData c.resource = resource c.systems = systems + fl, parseErr := strconv.ParseInt(response.Server.Functionality, 10, 32) + if parseErr != nil { + logger.Warn("unable to parse DDS functionality", "value", response.Server.Functionality, "error", parseErr) + } else { + c.functionality.Store(int32(fl)) + } c.rwMutex.Unlock() logger.Debug("DDS time data updated") return timeData, nil @@ -299,3 +308,8 @@ func (c *Client) GetSystems() []string { c.ensureTimeData() return c.systems } + +func (c *Client) GetFunctionality() int32 { + c.ensureTimeData() + return c.functionality.Load() +} diff --git a/grafana/rmf-app/pkg/plugin/dds/request.go b/grafana/rmf-app/pkg/plugin/dds/request.go index c5106831..7e7eaacb 100644 --- a/grafana/rmf-app/pkg/plugin/dds/request.go +++ b/grafana/rmf-app/pkg/plugin/dds/request.go @@ -20,6 +20,7 @@ package dds import ( "fmt" "net/url" + "strconv" "strings" "time" @@ -29,14 +30,23 @@ import ( type Request struct { Resource string TimeRange data.TimeRange + Batched bool + Span time.Duration } func NewRequest(res string, from time.Time, to time.Time, step time.Duration) *Request { - q := Request{Resource: res, TimeRange: data.TimeRange{From: from, To: to}} + q := Request{Resource: res, TimeRange: data.TimeRange{From: from, To: to}, Batched: false} q.Align(step) return &q } +func NewBatchRequest(res string, t time.Time, step time.Duration, span time.Duration) *Request { + startHour := t.Truncate(step) + endHour := startHour.Add(step) + q := Request{Resource: res, TimeRange: data.TimeRange{From: startHour, To: endHour}, Batched: true, Span: span} + return &q +} + func (r *Request) Align(d time.Duration) { r.TimeRange.From = r.TimeRange.From.Truncate(d) t := r.TimeRange.To.Truncate(d) @@ -76,6 +86,13 @@ func (r *Request) pathWithParams(timeOfs time.Duration) (string, []string, error if path == "" { path = PerformPath } - params = append(params, "range", r.formatRange(timeOfs)) + var rangeParam = "range" + if r.Batched { + params = append(params, "batchSpan", strconv.Itoa(int(r.Span.Seconds()))) + path = TimeSeriesPath + timeOfs = 0 + rangeParam = "rangeUtc" + } + params = append(params, rangeParam, r.formatRange(timeOfs)) return path, params, nil } diff --git a/grafana/rmf-app/pkg/plugin/dds/response.go b/grafana/rmf-app/pkg/plugin/dds/response.go index d1cc8002..d5565d68 100644 --- a/grafana/rmf-app/pkg/plugin/dds/response.go +++ b/grafana/rmf-app/pkg/plugin/dds/response.go @@ -47,8 +47,41 @@ var AcceptableMessages = map[string]bool{ "GPM0709I": true, // Filter has caused no data to be returned } +const MESSAGE_ID_NOT_ENOUGTH_MEMORY = "GPM0555I" +const MESSAGE_SEVERITY_WARNING = 2 + type Response struct { - Reports []Report `json:"report"` + Reports []Report `json:"report"` + Server Server `json:"server"` + TimeSeries *TimeSeries `json:"timeSeries"` +} + +type Server struct { + Functionality string `json:"functionality"` + Version string `json:"version"` +} + +type TimeSeries struct { + Metric *Metric + Message *Message + Resource *Resource `json:"resource"` + Series []Series `json:"series"` +} + +type Series struct { + TimeData *TimeDataShort `json:"timeData"` + Message *Message + Rows []Row `json:"row"` +} + +type GpmError struct { + Id string + Severity int + Description string +} + +func (e *GpmError) Error() string { + return fmt.Sprintf("DDS error: %s (severity %d). %s", e.Id, e.Severity, e.Description) } type Report struct { @@ -73,14 +106,18 @@ func (r Report) GetRowNames() []string { return names } -type TimeData struct { - // FIXME: don't use these in report headers: they are in DDS timezone. Remove from the mapping. +type TimeDataShort struct { LocalStart DateTime `json:"localStart"` LocalEnd DateTime `json:"localEnd"` - LocalPrev DateTime `json:"localPrev"` - LocalNext DateTime `json:"localNext"` UTCStart DateTime `json:"utcStart"` UTCEnd DateTime `json:"utcEnd"` +} + +type TimeData struct { + // FIXME: don't use these in report headers: they are in DDS timezone. Remove from the mapping. + TimeDataShort + LocalPrev DateTime `json:"localPrev"` + LocalNext DateTime `json:"localNext"` NumSamples int `json:"numSamples"` NumSystems *int `json:"numSystems,omitempty"` MinTime struct { diff --git a/grafana/rmf-app/pkg/plugin/frame/frame.go b/grafana/rmf-app/pkg/plugin/frame/frame.go index 5fffd335..7a45e609 100644 --- a/grafana/rmf-app/pkg/plugin/frame/frame.go +++ b/grafana/rmf-app/pkg/plugin/frame/frame.go @@ -91,21 +91,60 @@ func Build(ddsResponse *dds.Response, headers *dds.HeaderMap, wide bool) (*data. if format == dds.ReportFormat { return buildForReport(&report, headers, frameName), nil } else if wide { - return buildWideForMetric(&report, frameName), nil + return buildWideForMetric(report.Metric, &report.TimeData.TimeDataShort, &report.Rows, frameName), nil } else { return buildLongForMetric(&report, frameName), nil } } +func validateTimeSeriesResponse(ddsResponse *dds.Response) error { + if ddsResponse.TimeSeries == nil { + return errors.New("no time series data in DDS response") + } + if ddsResponse.TimeSeries.Message != nil && ddsResponse.TimeSeries.Message.Severity > 2 { + return &dds.GpmError{ + Id: ddsResponse.TimeSeries.Message.Id, + Severity: ddsResponse.TimeSeries.Message.Severity, + Description: ddsResponse.TimeSeries.Message.Description, + } + } + seriesNum := len(ddsResponse.TimeSeries.Series) + if seriesNum == 0 { + return errors.New("no series in DDS response") + } + timeSeries := ddsResponse.TimeSeries + if timeSeries.Metric == nil { + return errors.New("no metric data in DDS response") + } + if _, ok := dds.SupportedFormats[timeSeries.Metric.Format]; !ok { + return fmt.Errorf("unsupported data format (%s) in DDS response", timeSeries.Metric.Format) + } + return nil +} + +func BuildBatch(ddsResponse *dds.Response) (*data.Frame, error) { + err := validateTimeSeriesResponse(ddsResponse) + if err != nil { + return nil, err + } + var result *data.Frame + for _, series := range ddsResponse.TimeSeries.Series { + frameName := strings.Trim(ddsResponse.TimeSeries.Metric.Description, " ") + frame := buildWideForMetric(ddsResponse.TimeSeries.Metric, series.TimeData, &series.Rows, frameName) + result, _ = MergeInto(result, frame) + } + return result, nil +} + // buildWideForMetric creates a time series data frame for a metric from pre-parsed DDS response. // Grafana frame format: wide. -func buildWideForMetric(report *dds.Report, frameName string) *data.Frame { - timestamp := report.TimeData.UTCEnd.Time - metricFormat := report.Metric.Format +func buildWideForMetric(metric *dds.Metric, timeData *dds.TimeDataShort, rows *[]dds.Row, frameName string) *data.Frame { + timestamp := timeData.UTCEnd.Time + metricFormat := metric.Format labels := getFrameLabels(metricFormat, frameName) resultFrame := data.NewFrame(frameName, data.NewField("time", nil, []time.Time{timestamp})) - iterateMetricRows(report, frameName, + iterateMetricRows(rows, frameName, func(name string, value *float64) { newField := data.NewField(name, labels, []*float64{value}) resultFrame.Fields = append(resultFrame.Fields, newField) @@ -144,7 +183,7 @@ func buildLongForMetric(report *dds.Report, frameName string) *data.Frame { data.NewField(valField, nil, []*float64{}), ) - iterateMetricRows(report, frameName, + iterateMetricRows(&report.Rows, frameName, func(name string, value *float64) { resultFrame.Fields[0].Append(timestamp) resultFrame.Fields[1].Append(name) @@ -155,10 +194,10 @@ func buildLongForMetric(report *dds.Report, frameName string) *data.Frame { } // iterateMetricRows parses metric key-value pairs and passes them to `process` while iterating over rows. -func iterateMetricRows(report *dds.Report, defaultName string, process func(name string, value *float64)) { +func iterateMetricRows(rows *[]dds.Row, defaultName string, process func(name string, value *float64)) { colMap := map[string]bool{} var sb strings.Builder - for _, jsonRow := range report.Rows { + for _, jsonRow := range *rows { cols := jsonRow.Cols name, rawValue := cols[0], cols[1] if name == "*NoData*" { diff --git a/grafana/rmf-app/pkg/plugin/frame/utils.go b/grafana/rmf-app/pkg/plugin/frame/utils.go index a24ab9b5..8ea5d7ca 100644 --- a/grafana/rmf-app/pkg/plugin/frame/utils.go +++ b/grafana/rmf-app/pkg/plugin/frame/utils.go @@ -57,13 +57,17 @@ type FieldInfo struct { // values for the field in those frames will be discarded by frontend. func SyncFieldNames(seriesFields SeriesFields, frame *data.Frame, frameTime time.Time) { fieldNames := map[string]bool{} + frameRows := 1 + if len(frame.Fields) > 0 { + frameRows = frame.Fields[0].Len() + } for _, field := range frame.Fields { seriesFields[field.Name] = FieldInfo{Time: frameTime, Labels: field.Labels} fieldNames[field.Name] = true } for key := range seriesFields { if _, ok := fieldNames[key]; !ok { - newField := data.NewField(key, seriesFields[key].Labels, []*float64{nil}) + newField := data.NewField(key, seriesFields[key].Labels, make([]*float64, frameRows)) frame.Fields = append(frame.Fields, newField) } } @@ -200,3 +204,30 @@ func getStringAt(field *data.Field, index int) string { } return value } + +func GetDuration(f *data.Frame) time.Duration { + if f == nil { + return 0 + } + if len(f.Fields) == 0 { + return 0 + } + timeField := f.Fields[0] + if timeField.Type() != data.FieldTypeTime { + return 0 + } + var minTime time.Time + var maxTime time.Time + for i := 0; i < timeField.Len(); i++ { + t, ok := timeField.At(i).(time.Time) + if ok { + if minTime.IsZero() || t.Before(minTime) { + minTime = t + } + if maxTime.IsZero() || t.After(maxTime) { + maxTime = t + } + } + } + return maxTime.Sub(minTime) +} diff --git a/grafana/rmf-app/pkg/plugin/query.go b/grafana/rmf-app/pkg/plugin/query.go index 5a27bb2a..35ad734e 100644 --- a/grafana/rmf-app/pkg/plugin/query.go +++ b/grafana/rmf-app/pkg/plugin/query.go @@ -19,6 +19,7 @@ package plugin import ( "context" + "errors" "time" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/cache" @@ -37,7 +38,12 @@ func (ds *RMFDatasource) getFrame(r *dds.Request, wide bool) (*data.Frame, error return nil, err } headers := ds.ddsClient.GetCachedHeaders() - f, err := frame.Build(ddsResponse, headers, wide) + var f *data.Frame + if r.Batched { + f, err = frame.BuildBatch(ddsResponse) + } else { + f, err = frame.Build(ddsResponse, headers, wide) + } if err != nil { return nil, err } @@ -75,7 +81,7 @@ func (ds *RMFDatasource) getCachedTSFrames(r *dds.Request, stop time.Time, step ) // Create a copy of the original request - don't alter it cr := dds.NewRequest(r.Resource, r.TimeRange.From, r.TimeRange.To, step) - for r.TimeRange.To.Before(stop) { + for cr.TimeRange.From.Before(stop) { next := ds.frameCache.Get(cr, true) if next == nil { break @@ -119,6 +125,9 @@ func (ds *RMFDatasource) serveTSFrame(ctx context.Context, sender *backend.Strea logger.Debug("executing query", "request", r.String()) f, err = ds.getFrame(r, true) if err != nil { + if gpme, ok := errors.AsType[*dds.GpmError](err); ok && gpme.Severity > dds.MESSAGE_SEVERITY_WARNING { + return err + } logger.Error("failed to get data", "request", r.String(), "reason", err) f = frame.NoDataFrame(r.TimeRange.To) } else {