Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions grafana/rmf-app/pkg/plugin/cache/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Channel struct {
TimeRange backend.TimeRange
Absolute bool
Step time.Duration
Interval time.Duration
Span time.Duration
Fields frame.SeriesFields
}

Expand Down
45 changes: 31 additions & 14 deletions grafana/rmf-app/pkg/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@ import (
const DefaultHttpTimeout = 60
const DefaultCacheSizeMB = 1024
const MinimalCacheSizeMB = 128
const MinBatchRequestMinutes = 10
const MaxBatchRequestMinutes = 120
const DefaultBatchRequestMinutes = MaxBatchRequestMinutes

type Config struct {
URL string
Timeout int
CacheSize int
Username string
Password string // #nosec G117
JSON struct {
URL string
Timeout int
CacheSize int
BatchRequestMinutes int
Username string
Password string // #nosec G117
JSON struct {
// Conventional Grafana HTTP config (see the `DataSourceHttpSettings` UI element)
TimeoutRaw string `json:"timeout"`
TlsSkipVerify bool `json:"tlsSkipVerify"`
DisableCompression bool `json:"disableCompression"`
// Custom RMF settings.
CacheSizeRaw string `json:"cacheSize"`
// Legacy custom RMF settings. We should ge rid of these at some point.
Server *string `json:"path"`
Port string `json:"port"`
SSL bool `json:"ssl"`
Username string `json:"userName"`
Password string `json:"password"` // #nosec G117
SSLVerify bool `json:"skipVerify"` // NB: the meaning of JSON field is inverted.
OmegamonDs string `json:"omegamonDs"`
// Legacy custom RMF settings. We should get rid of these at some point.
Server *string `json:"path"`
Port string `json:"port"`
SSL bool `json:"ssl"`
Username string `json:"userName"`
Password string `json:"password"` // #nosec G117
SSLVerify bool `json:"skipVerify"` // NB: the meaning of JSON field is inverted.
OmegamonDs string `json:"omegamonDs"`
BatchRequestInterval string `json:"batchRequestInterval"`
}
}

Expand Down Expand Up @@ -98,5 +103,17 @@ func (ds *RMFDatasource) getConfig(settings backend.DataSourceInstanceSettings)
logger.Warn("cache size is not small, using minimal value", "cacheSize", config.CacheSize)
config.CacheSize = MinimalCacheSizeMB
}
if config.BatchRequestMinutes, err = strconv.Atoi(config.JSON.BatchRequestInterval); err != nil {
logger.Warn("batch request interval is not valid, applying default", "batchRequestInterval", config.JSON.BatchRequestInterval)
config.BatchRequestMinutes = DefaultBatchRequestMinutes
}
if config.BatchRequestMinutes < MinBatchRequestMinutes {
logger.Warn("batch request interval is too small, using minimal value", "batchRequestInterval", config.BatchRequestMinutes)
config.BatchRequestMinutes = MinBatchRequestMinutes
}
if config.BatchRequestMinutes > MaxBatchRequestMinutes {
logger.Warn("batch request interval is too large, using maximal value", "batchRequestInterval", config.BatchRequestMinutes)
config.BatchRequestMinutes = MaxBatchRequestMinutes
}
return &config, nil
}
72 changes: 59 additions & 13 deletions grafana/rmf-app/pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,18 @@ const ChannelCacheSizeMB = 64
const SdsDelay = 5 * time.Second
const TimeSeriesType = "TimeSeries"
const QueryPattern = `^([A-Za-z_][A-Za-z0-9_]*)\(([^)]*)\)$` // e.g., banner(resource), table(resource), caption(resource)
const DDS_BATCH_REQUESTS_FUNCTIONALITY_MASK = 0x1000

type RMFDatasource struct {
uid string
name string
channelCache *cache.ChannelCache
frameCache *cache.FrameCache
ddsClient *dds.Client
single singleflight.Group
omegamonDs string
queryMatcher *regexp.Regexp
uid string
name string
channelCache *cache.ChannelCache
frameCache *cache.FrameCache
ddsClient *dds.Client
single singleflight.Group
omegamonDs string
queryMatcher *regexp.Regexp
batchRequestInterval int
}

// NewRMFDatasource creates a new instance of the RMF datasource.
Expand All @@ -85,10 +87,12 @@ func NewRMFDatasource(ctx context.Context, settings backend.DataSourceInstanceSe
ds.channelCache = cache.NewChannelCache(ChannelCacheSizeMB)
ds.frameCache = cache.NewFrameCache(config.CacheSize)
ds.omegamonDs = config.JSON.OmegamonDs
ds.batchRequestInterval = config.BatchRequestMinutes
logger.Info("initialized a datasource",
"uid", settings.UID, "name", settings.Name,
"url", config.URL, "timeout", config.Timeout, "cacheSize", config.CacheSize,
"username", config.Username, "tlsSkipVerify", config.JSON.TlsSkipVerify)
"username", config.Username, "tlsSkipVerify", config.JSON.TlsSkipVerify,
"batchRequestInterval", ds.batchRequestInterval)
ds.queryMatcher = regexp.MustCompile(QueryPattern)
return ds, nil
}
Expand Down Expand Up @@ -315,6 +319,19 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe
start := q.TimeRange.From.UTC()
r := dds.NewRequest(params.Resource.Value, start, start, step)
f, jump, err := ds.getCachedTSFrames(r, q.TimeRange.To.UTC(), step, fields)
if ds.supportsBatchRequests() {
span := step
step = time.Duration(ds.batchRequestInterval) * time.Minute
logger.Debug("### using batch requests", "batchSpan", span.Seconds(), "step", step.Seconds())
r = dds.NewBatchRequest(params.Resource.Value, start, step, span)
f2, _, err2 := ds.getCachedTSFrames(r, q.TimeRange.To.UTC(), step, fields)
if f == nil || err != nil {
f = f2
} else if f2 != nil && err2 == nil {
step = frame.GetDuration(f2)
f, err = frame.MergeInto(f, f2)
}
}
if f == nil || err != nil {
f = frame.TaggedFrame(start, "No data yet...")
}
Expand All @@ -328,6 +345,8 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe
TimeRange: backend.TimeRange{From: start.Add(jump), To: q.TimeRange.To.UTC()},
Absolute: params.AbsoluteTime,
Step: step,
Interval: q.Interval,
Span: r.Span,
Fields: fields,
}
err = ds.channelCache.Set(channel.Path, &cachedChannel)
Expand Down Expand Up @@ -403,25 +422,32 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe
}
res := c.Resource
step := c.Step
span := c.Span
interval := c.Interval
absolute := c.Absolute
from := c.TimeRange.From
to := c.TimeRange.To
fields := c.Fields

logger.Debug("starting streaming", "step", step.String(), "path", req.Path)
r := dds.NewRequest(res, from, from, step)
var r *dds.Request
if ds.supportsBatchRequests() {
r = dds.NewBatchRequest(res, from, step, span)
} else {
r = dds.NewRequest(res, from, from, step)
}

// Stream historical part of time series
stop := to
for {
if !absolute {
stop = time.Now().Add(-SdsDelay)
}
if r.TimeRange.To.After(stop) {
if r.TimeRange.From.After(stop) {
logger.Debug("finished with historical data", "request", r.String(), "path", req.Path)
break
}
f, jump, err := ds.getCachedTSFrames(r, stop, step, fields)
f, _, err := ds.getCachedTSFrames(r, stop, step, fields)
if err != nil {
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
Expand All @@ -431,17 +457,32 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
}
r.Add(jump)
r.Add(frame.GetDuration(f))
continue
}
if err := ds.serveTSFrame(ctx, sender, fields, r, true); err != nil {
if gpme, ok := errors.AsType[*dds.GpmError](err); ok && gpme.Id == dds.MESSAGE_ID_NOT_ENOUGTH_MEMORY {
logger.Debug("GPM0555I: reduce step", "step", step.Minutes())
step = step / 2
if step < MinBatchRequestMinutes*time.Minute {
logger.Info("streaming stopped", "reason", "step is too small", "path", req.Path, "step", step.Minutes(), "error", err)
return nil
}
r = dds.NewBatchRequest(res, r.TimeRange.From, step, span)
continue
}
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
}
r.Add(step)
}
if !absolute {
// Stream live data as it's being collected
if ds.supportsBatchRequests() {
mintime := ds.ddsClient.GetCachedMintime()
step = getStep(interval, mintime)
r = dds.NewRequest(res, stop, stop, step)
}
for {
if err := ds.serveTSFrame(ctx, sender, fields, r, false); err != nil {
logger.Info("streaming stopped", "reason", err, "path", req.Path)
Expand Down Expand Up @@ -479,3 +520,8 @@ func (d *RMFDatasource) parseQuery(resource string) (string, string) {
}
return "", resource
}

func (ds *RMFDatasource) supportsBatchRequests() bool {
fl := ds.ddsClient.GetFunctionality()
return fl&DDS_BATCH_REQUESTS_FUNCTIONALITY_MASK == DDS_BATCH_REQUESTS_FUNCTIONALITY_MASK
}
32 changes: 23 additions & 9 deletions grafana/rmf-app/pkg/plugin/dds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"mime"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -46,6 +47,7 @@ const ContainedPath = "/gpm/contained"
const PerformPath = "/gpm/perform"
const XslHeadersPath = "/gpm/include/reptrans.xsl"
const FullReportPath = "/gpm/rmfm3"
const TimeSeriesPath = "/gpm/performTimeSeries"

var MayHaveExt = map[string]bool{
IndexPath: true,
Expand All @@ -58,15 +60,16 @@ var ErrParse = errors.New("unable to parse DDS response")
var ErrUnauthorized = errors.New("not authorized to access DDS")

type Client struct {
baseUrl string
username string
password string
httpClient *http.Client
headerMap *HeaderMap
timeData *TimeData
resource *Resource
systems []string
useXmlExt atomic.Bool
baseUrl string
username string
password string
httpClient *http.Client
headerMap *HeaderMap
timeData *TimeData
resource *Resource
systems []string
useXmlExt atomic.Bool
functionality atomic.Int32

stopChan chan struct{}
closeOnce sync.Once
Expand Down Expand Up @@ -268,6 +271,12 @@ func (c *Client) updateMetadata() *TimeData {
c.timeData = timeData
c.resource = resource
c.systems = systems
fl, parseErr := strconv.ParseInt(response.Server.Functionality, 10, 32)
if parseErr != nil {
logger.Warn("unable to parse DDS functionality", "value", response.Server.Functionality, "error", parseErr)
} else {
c.functionality.Store(int32(fl))
}
c.rwMutex.Unlock()
logger.Debug("DDS time data updated")
return timeData, nil
Expand Down Expand Up @@ -299,3 +308,8 @@ func (c *Client) GetSystems() []string {
c.ensureTimeData()
return c.systems
}

func (c *Client) GetFunctionality() int32 {
c.ensureTimeData()
return c.functionality.Load()
}
21 changes: 19 additions & 2 deletions grafana/rmf-app/pkg/plugin/dds/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package dds
import (
"fmt"
"net/url"
"strconv"
"strings"
"time"

Expand All @@ -29,14 +30,23 @@ import (
type Request struct {
Resource string
TimeRange data.TimeRange
Batched bool
Span time.Duration
}

func NewRequest(res string, from time.Time, to time.Time, step time.Duration) *Request {
q := Request{Resource: res, TimeRange: data.TimeRange{From: from, To: to}}
q := Request{Resource: res, TimeRange: data.TimeRange{From: from, To: to}, Batched: false}
q.Align(step)
return &q
}

func NewBatchRequest(res string, t time.Time, step time.Duration, span time.Duration) *Request {
startHour := t.Truncate(step)
endHour := startHour.Add(step)
q := Request{Resource: res, TimeRange: data.TimeRange{From: startHour, To: endHour}, Batched: true, Span: span}
return &q
}

func (r *Request) Align(d time.Duration) {
r.TimeRange.From = r.TimeRange.From.Truncate(d)
t := r.TimeRange.To.Truncate(d)
Expand Down Expand Up @@ -76,6 +86,13 @@ func (r *Request) pathWithParams(timeOfs time.Duration) (string, []string, error
if path == "" {
path = PerformPath
}
params = append(params, "range", r.formatRange(timeOfs))
var rangeParam = "range"
if r.Batched {
params = append(params, "batchSpan", strconv.Itoa(int(r.Span.Seconds())))
path = TimeSeriesPath
timeOfs = 0
rangeParam = "rangeUtc"
}
params = append(params, rangeParam, r.formatRange(timeOfs))
return path, params, nil
}
Loading
Loading