diff --git a/code/src/sixsq/nuvla/db/binding.clj b/code/src/sixsq/nuvla/db/binding.clj
index 7be208d53..e9f60f136 100644
--- a/code/src/sixsq/nuvla/db/binding.clj
+++ b/code/src/sixsq/nuvla/db/binding.clj
@@ -94,33 +94,18 @@
      error codes can also be thrown.")

   (query-native
-    [this collection-id options]
-    "This function executes a native query, where the collection-id
-     corresponds to the name of a Collection.
+    [this index options]
+    "This function executes a native query against the given index.

      On success, the function must return the response body.

      On failure, the function must throw an ex-info containing the error
-     ring response. If the resource-id does not correspond to a Collection,
-     then a 400 (bad-request) response must be returned. Other appropriate
-     error codes can also be thrown.")
-
-  (add-metric
-    [this collection-id data options]
-    "This function adds the given metric to the database. The metric
-     must not already exist in the database.
-
-     On success, the function must return a 201 ring response with the
-     relative URL of the new metric as the Location.
-
-     On failure, the function must throw an ex-info containing the error
-     ring response. The error must be 409 (conflict) if the metric
-     exists already. Other appropriate error codes can also be thrown.")
+     ring response.")

-  (bulk-insert-metrics
-    [this collection-id data options]
-    "This function insert the given metrics in the database where the
-     collection-id corresponds to the name of a metrics Collection.
+  (bulk-delete
+    [this collection-id options]
+    "This function removes the given resources in the database where the
+     collection-id corresponds to the name of a Collection.

      On success, the function must return the summary map of what was done
      on the db.
@@ -130,9 +115,9 @@
      then a 400 (bad-request) response must be returned. Other appropriate
      error codes can also be thrown.")

-  (bulk-delete
+  (bulk-edit
     [this collection-id options]
-    "This function removes the given resources in the database where the
+    "This function edits the given resources in the database where the
      collection-id corresponds to the name of a Collection.

      On success, the function must return the summary map of what was done
@@ -143,10 +128,40 @@
      then a 400 (bad-request) response must be returned. Other appropriate
      error codes can also be thrown.")

-  (bulk-edit
-    [this collection-id options]
-    "This function edits the given resources in the database where the
-     collection-id corresponds to the name of a Collection.
+  (create-timeseries
+    [this index options]
+    "This function creates a timeseries with the given index name in the database.")
+
+  (retrieve-timeseries
+    [this index]
+    "This function retrieves the identified timeseries from the database.
+
+     On success, this returns the Clojure map representation of the
+     timeseries. The response must not be embedded in a ring response.
+
+     On failure, this function must throw an ex-info containing the error
+     ring response. If the resource does not exist, the error must be 404 (not-found).")
+
+  (edit-timeseries
+    [this index options]
+    "This function updates (edits) the given timeseries in the database.
+     The timeseries must already exist in the database.")
+
+  (add-timeseries-datapoint
+    [this index data options]
+    "This function adds the given timeseries datapoint to the database.
+     The datapoint with the given timestamp and dimensions must not already exist in the database.
+
+     On success, the function must return a 201 ring response with the
+     relative URL of the new datapoint as the Location.
+
+     On failure, the function must throw an ex-info containing the error
+     ring response. The error must be 409 (conflict) if the datapoint
+     exists already. Other appropriate error codes can also be thrown.")
+
+  (bulk-insert-timeseries-datapoints
+    [this index data options]
+    "This function inserts the given timeseries datapoints in the database.

      On success, the function must return the summary map of what was done
      on the db.
@@ -154,4 +169,8 @@
      On failure, the function must throw an ex-info containing the error
      ring response. If the resource-id does not correspond to a Collection,
      then a 400 (bad-request) response must be returned. Other appropriate
-     error codes can also be thrown."))
+     error codes can also be thrown.")
+
+  (delete-timeseries
+    [this index options]
+    "This function deletes a timeseries with the given index name from the database."))
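+
+;; Illustrative sketch (not part of the protocol contract): how a concrete
+;; Binding implementation is expected to be driven. `b` is assumed to be a
+;; Binding instance and "ts-example" an existing timeseries index; the field
+;; names in the datapoint maps are hypothetical.
+(comment
+  (create-timeseries b "ts-example" {})
+  (add-timeseries-datapoint b "ts-example"
+                            {:timestamp   "2024-01-01T00:00:00.000Z"
+                             :device-id   "dev-1"
+                             :temperature 21.5}
+                            {})
+  (bulk-insert-timeseries-datapoints b "ts-example"
+                                     [{:timestamp   "2024-01-01T00:01:00.000Z"
+                                       :device-id   "dev-1"
+                                       :temperature 21.7}
+                                      {:timestamp   "2024-01-01T00:02:00.000Z"
+                                       :device-id   "dev-1"
+                                       :temperature 21.9}]
+                                     {})
+  (retrieve-timeseries b "ts-example")
+  (delete-timeseries b "ts-example" {}))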
diff --git a/code/src/sixsq/nuvla/db/es/binding.clj b/code/src/sixsq/nuvla/db/es/binding.clj
index 78d8258c4..5e0198499 100644
--- a/code/src/sixsq/nuvla/db/es/binding.clj
+++ b/code/src/sixsq/nuvla/db/es/binding.clj
@@ -10,6 +10,7 @@
    [sixsq.nuvla.db.es.common.es-mapping :as mapping]
    [sixsq.nuvla.db.es.common.utils :as escu]
    [sixsq.nuvla.db.es.filter :as filter]
+   [sixsq.nuvla.db.es.log :as es-logu]
    [sixsq.nuvla.db.es.order :as order]
    [sixsq.nuvla.db.es.pagination :as paging]
    [sixsq.nuvla.db.es.script-utils :refer [get-update-script]]
@@ -19,6 +20,25 @@
    [sixsq.nuvla.server.util.response :as r])
   (:import (java.io Closeable)))

+(defn spandex-request-plain
+  "Runs a spandex request and checks that the response code is among the expected ones:
+   if it is not, the response is traced in the logs and a generic exception is thrown to the caller.
+   Does not catch exceptions."
+  [client request expected-status-set]
+  (let [{:keys [status body] :as response} (spandex/request client request)]
+    (if (contains? expected-status-set status)
+      response
+      (es-logu/log-and-throw-unexpected-es-status (pr-str body) status expected-status-set))))
+
+(defn spandex-request
+  "Runs a spandex request and checks that the response code is among the expected ones:
+   if it is not, the response is traced in the logs and a generic exception is thrown to the caller.
+   When a Spandex exception occurs, it is also traced in the logs and a generic exception is thrown to the caller."
+  [client request expected-status-set]
+  (try (spandex-request-plain client request expected-status-set)
+       (catch Exception e
+         (es-logu/log-and-throw-unexpected-es-ex e))))
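+
+;; Illustrative call (not part of the original change): issue a request against
+;; a hypothetical data stream and treat any status other than 200 as an error
+;; that is logged and rethrown as a generic 500 response.
+(comment
+  (spandex-request client
+                   {:url    [:_data_stream "ts-example"]
+                    :method :get}
+                   #{200}))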
+
 (defn create-index
   [client index]
   (try
@@ -180,9 +200,11 @@
      {:timeout (str timeout "ms")}))

 (defn query-data
-  [client collection-id {:keys [cimi-params params] :as options}]
+  [client collection-id {:keys [cimi-params params no-prefix] :as options}]
   (try
-    (let [index       (escu/collection-id->index collection-id)
+    (let [index       (if no-prefix
+                        collection-id
+                        (escu/collection-id->index collection-id))
           paging      (paging/paging cimi-params)
           orderby     (order/sorters cimi-params)
           aggregation (merge-with merge
@@ -212,81 +234,75 @@
         (let [msg (str "error when querying: " (:body response))]
           (throw (r/ex-response msg 500)))))
     (catch Exception e
-      (let [{:keys [body] :as _response} (ex-data e)
-            error (:error body)
-            msg   (str "unexpected exception querying: " (or error e))]
-        (throw (r/ex-response msg 500))))))
+      (es-logu/log-and-throw-unexpected-es-ex e))))

 (defn query-data-native
-  [client collection-id query]
-  (try
-    (let [index    (escu/collection-id->index collection-id)
-          response (spandex/request client {:url    [index :_search]
-                                            :method :post
-                                            :body   query})]
-      (if (shards-successful? response)
-        (:body response)
-        (let [msg (str "error when querying: " (:body response))]
-          (throw (r/ex-response msg 500)))))
-    (catch Exception e
-      (let [{:keys [body] :as _response} (ex-data e)
-            error (:error body)
-            msg   (str "unexpected exception querying: " (or error e))]
+  [client index query]
+  (let [response (spandex-request client {:url    [index :_search]
+                                          :method :post
+                                          :body   query} #{200})]
+    (if (shards-successful? response)
+      (:body response)
+      (let [msg (str "error when querying: " (:body response))]
        (throw (r/ex-response msg 500))))))
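+
+;; Illustrative call (not part of the original change): run a native
+;; ElasticSearch query against a hypothetical index; on success the raw
+;; response body is returned.
+(comment
+  (query-data-native client "ts-example"
+                     {:query {:range {"@timestamp" {:gte "now-1h"}}}
+                      :size  100}))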
-(defn add-metric-data
-  [client collection-id data {:keys [refresh]
-                              :or   {refresh true}
-                              :as   _options}]
+(defn add-timeseries-datapoint
+  [client index data {:keys [refresh]
+                      :or   {refresh true}
+                      :as   _options}]
   (try
-    (let [index        (escu/collection-id->index collection-id)
-          updated-data (-> data
+    (let [updated-data (-> data
                            (dissoc :timestamp)
                            (assoc "@timestamp" (:timestamp data)))
-          response     (spandex/request client {:url          [index :_doc]
-                                                :query-string {:refresh refresh}
-                                                :method       :post
-                                                :body         updated-data})
+          response     (spandex-request-plain client {:url          [index :_doc]
+                                                      :query-string {:refresh refresh}
+                                                      :method       :post
+                                                      :body         updated-data}
+                                              #{201})
           success?     (shards-successful? response)]
      (if success?
        {:status 201
         :body   {:status  201
-                 :message (str collection-id " metric added")}}
-        (r/response-conflict collection-id)))
+                 :message (str index " datapoint added")}}
+        (r/response-conflict index)))
     (catch Exception e
-      (let [{:keys [status body] :as _response} (ex-data e)
-            error (:error body)]
+      (let [{:keys [status] :as _response} (ex-data e)]
        (if (= 409 status)
-          (r/response-conflict collection-id)
-          (r/response-error (str "unexpected exception: " (or error e))))))))
+          (r/response-conflict index)
+          (es-logu/log-and-throw-unexpected-es-ex e))))))

-(defn bulk-insert-metrics
-  [client collection-id data _options]
-  (try
-    (let [index          (escu/collection-id->index collection-id)
-          data-transform (fn [{:keys [timestamp] :as doc}]
-                           (-> doc
-                               (dissoc :timestamp)
-                               (assoc "@timestamp" timestamp)))
-          body           (spandex/chunks->body (interleave (repeat {:create {}})
-                                                           (map data-transform data)))
-          response       (spandex/request client {:url     [index :_bulk]
-                                                  :method  :put
-                                                  :headers {"Content-Type" "application/x-ndjson"}
-                                                  :body    body})
-          body-response  (:body response)
-          success?       (not (errors? response))]
-      (if success?
-        body-response
-        (let [items (:items body-response)
-              msg   (str (if (seq items)
-                           {:errors-count (count items)
-                            :first-error  (first items)}
-                           body-response))]
-          (throw (r/ex-response msg 400)))))
-    (catch Exception e
-      (let [{:keys [body status]} (ex-data e)]
-        (throw (r/ex-response (str body) (or status 500))))))
+(defn process-es-response
+  [response]
+  (let [body-response (:body response)
+        success?      (not (errors? response))]
+    (if success?
+      body-response
+      ;; summarise the per-item failures; report a conflict if any item
+      ;; failed with a 409, otherwise a generic unexpected error
+      (let [items        (:items body-response)
+            status-codes (map (comp :status second first) items)
+            msg          (str (if (seq items)
+                                {:errors-count (count items)
+                                 :first-error  (first items)}
+                                body-response))]
+        (cond
+          (some #{409} status-codes)
+          (es-logu/throw-conflict-ex "")
+          :else
+          (es-logu/log-and-throw-unexpected-es-ex msg (ex-info msg {})))))))
+
+(defn bulk-insert-timeseries-datapoints
+  [client index data _options]
+  (let [data-transform (fn [{:keys [timestamp] :as doc}]
+                         (-> doc
+                             (dissoc :timestamp)
+                             (assoc "@timestamp" timestamp)))
+        body           (spandex/chunks->body (interleave (repeat {:create {}})
+                                                         (map data-transform data)))
+        response       (spandex-request client {:url     [index :_bulk]
+                                                :method  :put
+                                                :headers {"Content-Type" "application/x-ndjson"}
+                                                :body    body}
+                                        #{200})]
+    (process-es-response response)))

 (defn bulk-edit-data
   [client collection-id
@@ -359,101 +375,195 @@
 (defn create-or-update-lifecycle-policy
   [client index ilm-policy]
   (let [policy-name (str index "-ilm-policy")]
-    (try
-      (let [{:keys [status]}
-            (spandex/request
-              client
-              {:url    [:_ilm :policy policy-name]
-               :method :put
-               :body   {:policy
-                        {:_meta  {:description (str "ILM policy for " index)}
-                         :phases ilm-policy}}})]
-        (if (= 200 status)
-          (do (log/debug policy-name "ILM policy created/updated")
-              policy-name)
-          (log/error "unexpected status code when creating/updating" policy-name "ILM policy (" status ")")))
-      (catch Exception e
-        (let [{:keys [status body] :as _response} (ex-data e)
-              error (:error body)]
-          (log/error "unexpected status code when creating/updating" policy-name "ILM policy (" status "). " (or error e)))))))
-
-(defn create-timeseries-template
-  [client index mapping {:keys [routing-path look-back-time look-ahead-time start-time lifecycle-name]}]
+    (spandex-request
+      client
+      {:url    [:_ilm :policy policy-name]
+       :method :put
+       :body   {:policy
+                {:_meta  {:description (str "ILM policy for " index)}
+                 :phases ilm-policy}}}
+      #{200})
+    (log/debug policy-name "ILM policy created/updated")
+    policy-name))
+
+(defn delete-lifecycle-policy
+  [client index]
+  (let [policy-name (str index "-ilm-policy")]
+    (spandex-request
+      client
+      {:url    [:_ilm :policy policy-name]
+       :method :delete}
+      #{200})
+    (log/debug policy-name "ILM policy deleted")
+    policy-name))
+
+(defn create-or-update-timeseries-template
+  [client index mappings {:keys [routing-path look-back-time look-ahead-time start-time lifecycle-name]}]
   (let [template-name (str index "-template")]
-    (try
-      (let [{:keys [status]} (spandex/request client
-                                              {:url    [:_index_template template-name],
-                                               :method :put
-                                               :body   {:index_patterns [(str index "*")],
-                                                        :data_stream    {},
-                                                        :template
-                                                        {:settings
-                                                         (cond->
-                                                           {:index.mode       "time_series",
-                                                            :number_of_shards 3
-                                                            ;:index.look_back_time "7d",
-                                                            ;:index.look_ahead_time "2h",
-                                                            ;:index.time_series.start_time "2023-01-01T00:00:00.000Z"
-                                                            ;:index.lifecycle.name "nuvlabox-status-ts-1d-hf-ilm-policy"
-                                                            }
-                                                           routing-path (assoc :index.routing_path routing-path)
-                                                           look-ahead-time (assoc :index.look_ahead_time look-ahead-time)
-                                                           look-back-time (assoc :index.look_back_time look-back-time)
-                                                           start-time (assoc :index.time_series.start_time start-time)
-                                                           lifecycle-name (assoc :index.lifecycle.name lifecycle-name))
-                                                         :mappings mapping}}})]
-        (if (= 200 status)
-          (do (log/debug template-name "index template created/updated")
-              template-name)
-          (log/error "unexpected status code when creating/updating" template-name "index template (" status ")")))
-      (catch Exception e
-        (let [{:keys [status body] :as _response} (ex-data e)
-              error (:error body)]
-          (log/error "unexpected status code when creating/updating" template-name "index template (" status "). " (or error e)))))))
-
-(defn create-datastream
+    (spandex-request client
+                     {:url    [:_index_template template-name],
+                      :method :put
+                      :body   {:index_patterns [(str index "*")],
+                               :data_stream    {},
+                               :template
+                               {:settings
+                                (cond->
+                                  {:index.mode       "time_series",
+                                   :number_of_shards 3
+                                   ;:index.look_back_time "7d",
+                                   ;:index.look_ahead_time "2h",
+                                   ;:index.time_series.start_time "2023-01-01T00:00:00.000Z"
+                                   ;:index.lifecycle.name "nuvlabox-status-ts-1d-hf-ilm-policy"
+                                   }
+                                  routing-path (assoc :index.routing_path routing-path)
+                                  look-ahead-time (assoc :index.look_ahead_time look-ahead-time)
+                                  look-back-time (assoc :index.look_back_time look-back-time)
+                                  start-time (assoc :index.time_series.start_time start-time)
+                                  lifecycle-name (assoc :index.lifecycle.name lifecycle-name))
+                                :mappings mappings}}}
+                     #{200})
+    (log/debug template-name "index template created/updated")
+    template-name))
+
+(defn delete-timeseries-template
+  [client index]
+  (let [template-name (str index "-template")]
+    (spandex-request client
+                     {:url    [:_index_template template-name],
+                      :method :delete}
+                     #{200})
+    (log/debug template-name "index template deleted")
+    template-name))
+
+(defn retrieve-datastream
   [client datastream-index-name]
   (try
-    (let [{:keys [status]} (spandex/request client {:url [:_data_stream datastream-index-name], :method :get})]
-      (if (= 200 status)
-        (log/debug datastream-index-name "datastream already exists")
-        (log/error "unexpected status code when checking" datastream-index-name "datastream (" status ")")))
+    (let [{:keys [body]} (spandex-request-plain client {:url [:_data_stream datastream-index-name] :method :get}
+                                                #{200})]
+      (->> body :data_streams first))
     (catch Exception e
-      (let [{:keys [status body]} (ex-data e)]
-        (try
-          (if (= 404 status)
-            (let [{{:keys [acknowledged]} :body}
-                  (spandex/request client {:url [:_data_stream datastream-index-name], :method :put})]
-              (if acknowledged
-                (log/info datastream-index-name "datastream created")
-                (log/warn datastream-index-name "datastream may or may not have been created")))
-            (log/error "unexpected status code when checking" datastream-index-name "datastream (" status "). " body))
-          (catch Exception e
-            (let [{:keys [status body] :as _response} (ex-data e)
-                  error (:error body)]
-              (log/error "unexpected status code when creating" datastream-index-name "datastream (" status "). " (or error e)))))))))
-
-(defn initialize-timeserie-datastream
-  [client collection-id {:keys [spec ilm-policy look-back-time look-ahead-time start-time]
-                         :or   {ilm-policy     hot-warm-cold-delete-policy
-                                look-back-time "7d"}
-                         :as   _options}]
-  (let [index           (escu/collection-id->index collection-id)
-        mapping         (mapping/mapping spec {:dynamic-templates false, :fulltext false})
-        routing-path    (mapping/time-series-routing-path spec)
-        ilm-policy-name (create-or-update-lifecycle-policy client index ilm-policy)]
-    (create-timeseries-template client index mapping {:routing-path    routing-path
-                                                      :lifecycle-name  ilm-policy-name
-                                                      :look-ahead-time look-ahead-time
-                                                      :look-back-time  look-back-time
-                                                      :start-time      start-time})
-    (create-datastream client index)))
+      (let [{:keys [status] :as _response} (ex-data e)]
+        (when (not= 404 status)
+          (es-logu/log-and-throw-unexpected-es-ex e))))))
+
+(defn create-datastream
+  [client datastream-index-name]
+  (if (some? (retrieve-datastream client datastream-index-name))
+    (es-logu/throw-conflict-ex datastream-index-name)
+    (let [{{:keys [acknowledged]} :body}
+          (spandex-request client {:url [:_data_stream datastream-index-name], :method :put} #{200})]
+      (if acknowledged
+        (log/info datastream-index-name "datastream created")
+        (log/warn datastream-index-name "datastream may or may not have been created")))))
+
+(defn datastream-mappings
+  [client datastream-index-name]
+  ;; the _mapping endpoint returns a map of backing index -> mappings;
+  ;; keep the mappings of the most recent backing index
+  (->> (spandex-request client {:url [datastream-index-name :_mapping], :method :get} #{200})
+       :body seq (sort-by first) last second :mappings :properties))
+
+(defn datastream-rollover
+  [client datastream-index-name]
+  (let [{{:keys [acknowledged]} :body}
+        (spandex-request client {:url    [datastream-index-name :_rollover]
+                                 :method :post}
+                         #{200})]
+    (if acknowledged
+      (log/info datastream-index-name "rollover executed successfully")
+      (log/warn datastream-index-name "rollover may or may not have executed"))))
+
+(defn edit-datastream
+  [client datastream-index-name new-mappings]
+  (let [current-mappings (datastream-mappings client datastream-index-name)
+        {{:keys [acknowledged]} :body}
+        (spandex-request client {:url          [datastream-index-name :_mapping]
+                                 :query-string {:write_index_only true}
+                                 :method       :put
+                                 :body         new-mappings}
+                         #{200})]
+    (when-not (= current-mappings (datastream-mappings client datastream-index-name))
+      ;; if there was a change in the mappings do a rollover
+      (datastream-rollover client datastream-index-name))
+    (if acknowledged
+      (log/info datastream-index-name "datastream updated")
+      (log/warn datastream-index-name "datastream may or may not have been updated"))))
+
+(defn delete-datastream
+  [client datastream-index-name]
+  (spandex-request client {:url [:_data_stream datastream-index-name], :method :delete} #{200})
+  (log/debug datastream-index-name "datastream deleted"))
+
+(defn create-timeseries-impl
+  [client timeseries-id
+   {:keys [create-datastream?
+           mappings
+           routing-path
+           ilm-policy
+           look-back-time
+           look-ahead-time
+           start-time]
+    :or   {ilm-policy     hot-warm-cold-delete-policy
+           look-back-time "7d"}
+    :as   _options}]
+  (let [ilm-policy-name (create-or-update-lifecycle-policy client timeseries-id ilm-policy)]
+    (create-or-update-timeseries-template client timeseries-id mappings
+                                          {:routing-path    routing-path
+                                           :lifecycle-name  ilm-policy-name
+                                           :look-ahead-time look-ahead-time
+                                           :look-back-time  look-back-time
+                                           :start-time      start-time}))
+  (when create-datastream?
+    (create-datastream client timeseries-id)))
+
+(defn retrieve-timeseries-impl
+  [client timeseries-id]
+  (or (retrieve-datastream client timeseries-id)
+      (throw (r/ex-not-found timeseries-id))))
+
+(defn edit-timeseries-impl
+  [client timeseries-id
+   {:keys [mappings
+           routing-path
+           ilm-policy
+           look-back-time
+           look-ahead-time
+           start-time]
+    :or   {ilm-policy     hot-warm-cold-delete-policy
+           look-back-time "7d"}
+    :as   _options}]
+  (let [ilm-policy-name (create-or-update-lifecycle-policy client timeseries-id ilm-policy)]
+    (create-or-update-timeseries-template
+      client timeseries-id mappings
+      {:routing-path    routing-path
+       :lifecycle-name  ilm-policy-name
+       :look-ahead-time look-ahead-time
+       :look-back-time  look-back-time
+       :start-time      start-time}))
+  (when (some? (retrieve-datastream client timeseries-id))
+    (edit-datastream client timeseries-id mappings)))
+
+(defn delete-timeseries-impl
+  [client timeseries-id _options]
+  (when (some? (retrieve-datastream client timeseries-id))
+    (delete-datastream client timeseries-id))
+  (delete-timeseries-template client timeseries-id)
+  (delete-lifecycle-policy client timeseries-id))
+
+(defn initialize-collection-timeseries
+  [client collection-id {:keys [spec] :as options}]
+  (let [timeseries-id (escu/collection-id->index collection-id)
+        mappings      (mapping/mapping spec {:dynamic-templates false, :fulltext false})
+        routing-path  (mapping/time-series-routing-path spec)]
+    (create-timeseries-impl client timeseries-id
+                            (assoc options
+                              :create-datastream? true
+                              :mappings mappings
+                              :routing-path routing-path))))

 (defn initialize-db
   [client collection-id {:keys [spec timeseries] :as options}]
   (let [index (escu/collection-id->index collection-id)]
     (if timeseries
-      (initialize-timeserie-datastream client collection-id options)
+      (initialize-collection-timeseries client collection-id options)
       (let [mapping (mapping/mapping spec)]
         (create-index client index)
         (set-index-mapping client index mapping)))))
@@ -486,14 +596,8 @@
     (query [_ collection-id options]
       (query-data client collection-id options))

-    (query-native [_ collection-id query]
-      (query-data-native client collection-id query))
-
-    (add-metric [_ collection-id data options]
-      (add-metric-data client collection-id data options))
-
-    (bulk-insert-metrics [_ collection-id data options]
-      (bulk-insert-metrics client collection-id data options))
+    (query-native [_ index query]
+      (query-data-native client index query))

     (bulk-delete [_ collection-id options]
       (bulk-delete-data client collection-id options))
@@ -501,6 +605,25 @@
     (bulk-edit [_ collection-id options]
       (bulk-edit-data client collection-id options))

+    (create-timeseries [_ timeseries-id options]
+      (create-timeseries-impl client timeseries-id options))
+
+    (retrieve-timeseries [_ timeseries-id]
+      (retrieve-timeseries-impl client timeseries-id))
+
+    (edit-timeseries [_ timeseries-id options]
+      (edit-timeseries-impl client timeseries-id options))
+
+    (add-timeseries-datapoint [_ index data options]
+      (add-timeseries-datapoint client index data options))
+
+    (bulk-insert-timeseries-datapoints [_ index data options]
+      (bulk-insert-timeseries-datapoints client index data options))
+
+    (delete-timeseries [_ timeseries-id options]
+      (delete-timeseries-impl client timeseries-id options))
+
+
     Closeable
     (close [_]
       (when sniffer
diff --git a/code/src/sixsq/nuvla/db/es/log.clj b/code/src/sixsq/nuvla/db/es/log.clj
new file mode 100644
index 000000000..797389271
--- /dev/null
+++ b/code/src/sixsq/nuvla/db/es/log.clj
@@ -0,0 +1,39 @@
+(ns sixsq.nuvla.db.es.log
+  (:require [clojure.tools.logging :as log]
+            [sixsq.nuvla.server.util.response :as r]))
+
+(defn throw-bad-request-ex
+  [msg]
+  (throw (r/ex-bad-request msg)))
+
+(defn throw-conflict-ex
+  [id]
+  (throw (r/ex-conflict id)))
+
+(defn log-and-throw-bad-request-ex
+  ([ex]
+   (log-and-throw-bad-request-ex "bad request" ex))
+  ([msg ex]
+   (let [{:keys [status body]} (ex-data ex)
+         error                 (:error body)]
+     (log/error msg {:status status} (or error ex))
+     (throw (r/ex-bad-request msg)))))
+
+(defn log-and-throw-unexpected-es-status
+  ([status expected-status-set]
+   (log-and-throw-unexpected-es-status "unexpected status code" status expected-status-set))
+  ([msg status expected-status-set]
+   (log/error (str "unexpected status " status ". One of " expected-status-set " was expected.") msg)
+   (throw (r/ex-response msg 500))))
+
+(defn log-and-throw-unexpected-es-ex
+  ([ex]
+   (log-and-throw-unexpected-es-ex "unexpected error" ex))
+  ([msg ex]
+   (log-and-throw-unexpected-es-ex msg "unexpected error" ex))
+  ([internal-msg external-msg ex]
+   (let [{:keys [status body]} (ex-data ex)
+         error                 (:error body)]
+     (log/error internal-msg {:status status} (or error ex))
+     (throw (r/ex-response external-msg 500)))))
+
diff --git a/code/src/sixsq/nuvla/db/impl.clj b/code/src/sixsq/nuvla/db/impl.clj
index 37b3cc786..78e5e607b 100644
--- a/code/src/sixsq/nuvla/db/impl.clj
+++ b/code/src/sixsq/nuvla/db/impl.clj
@@ -42,14 +42,8 @@
 (defn query [collection-id & [options]]
   (p/query *impl* collection-id options))

-(defn query-native [collection-id query]
-  (p/query-native *impl* collection-id query))
-
-(defn add-metric [collection-id data & [options]]
-  (p/add-metric *impl* collection-id data options))
-
-(defn bulk-insert-metrics [collection-id data & [options]]
-  (p/bulk-insert-metrics *impl* collection-id data options))
+(defn query-native [index query]
+  (p/query-native *impl* index query))

 (defn bulk-delete [collection-id & [options]]
   (p/bulk-delete *impl* collection-id options))
@@ -57,6 +51,23 @@
 (defn bulk-edit [collection-id & [options]]
   (p/bulk-edit *impl* collection-id options))

+(defn create-timeseries [index & [options]]
+  (p/create-timeseries *impl* index options))
+
+(defn edit-timeseries [index & [options]]
+  (p/edit-timeseries *impl* index options))
+
+(defn retrieve-timeseries [timeseries-id]
+  (p/retrieve-timeseries *impl* timeseries-id))
+
+(defn add-timeseries-datapoint [index data & [options]]
+  (p/add-timeseries-datapoint *impl* index data options))
+
+(defn bulk-insert-timeseries-datapoints [index data & [options]]
+  (p/bulk-insert-timeseries-datapoints *impl* index data options))
+
+(defn delete-timeseries [index & [options]]
+  (p/delete-timeseries *impl* index options))

 (defn close []
   (when-let [^Closeable impl *impl*]
diff --git a/code/src/sixsq/nuvla/server/resources/common/crud.clj b/code/src/sixsq/nuvla/server/resources/common/crud.clj
index 19b412466..80b517d3c 100644
--- a/code/src/sixsq/nuvla/server/resources/common/crud.clj
+++ b/code/src/sixsq/nuvla/server/resources/common/crud.clj
@@ -2,6 +2,7 @@
   (:require
     [sixsq.nuvla.auth.acl-resource :as a]
     [sixsq.nuvla.auth.utils :as auth]
+    [sixsq.nuvla.db.es.common.utils :as escu]
     [sixsq.nuvla.db.filter.parser :as parser]
     [sixsq.nuvla.db.impl :as db]
     [sixsq.nuvla.server.resources.common.utils :as u]
@@ -55,7 +56,7 @@
 (defn query-native
   "Executes the database query as a native query."
   [collection-id query]
-  (db/query-native collection-id query))
+  (db/query-native (escu/collection-id->index collection-id) query))

 (defmulti bulk-delete resource-name-dispatch)

diff --git a/code/src/sixsq/nuvla/server/resources/common/std_crud.clj b/code/src/sixsq/nuvla/server/resources/common/std_crud.clj
index 77062c875..8af7aa627 100644
--- a/code/src/sixsq/nuvla/server/resources/common/std_crud.clj
+++ b/code/src/sixsq/nuvla/server/resources/common/std_crud.clj
@@ -8,6 +8,7 @@
     [clojure.walk :as w]
     [sixsq.nuvla.auth.acl-resource :as a]
     [sixsq.nuvla.auth.utils :as auth]
+    [sixsq.nuvla.db.es.common.utils :as escu]
     [sixsq.nuvla.db.impl :as db]
     [sixsq.nuvla.server.middleware.cimi-params.impl :as impl]
     [sixsq.nuvla.server.resources.common.crud :as crud]
@@ -213,23 +214,24 @@
               (str "_" resource-name))]
     (create-bulk-job action-name resource-name authn-info acl body))))

-(defn add-metric-fn
+(defn add-timeseries-datapoint-fn
   [resource-name collection-acl _resource-uri & {:keys [validate-fn options]}]
   (validate-collection-acl collection-acl)
   (fn [{:keys [body] :as request}]
     (a/throw-cannot-add collection-acl request)
     (validate-fn body)
-    (db/add-metric resource-name body options)))
+    (db/add-timeseries-datapoint (escu/collection-id->index resource-name)
+                                 body options)))

-(defn bulk-insert-metrics-fn
-  [resource-name collection-acl _collection-uri]
+(defn bulk-insert-timeseries-datapoints-fn
+  [index collection-acl _collection-uri]
   (validate-collection-acl collection-acl)
   (fn [{:keys [body] :as request}]
     (throw-bulk-header-missing request)
     (a/throw-cannot-add collection-acl request)
     (a/throw-cannot-bulk-action collection-acl request)
     (let [options  (select-keys request [:nuvla/authn :body])
-          response (db/bulk-insert-metrics resource-name body options)]
+          response (db/bulk-insert-timeseries-datapoints index body options)]
       (r/json-response response))))

 (defn generic-bulk-operation-fn
diff --git a/code/src/sixsq/nuvla/server/resources/nuvlabox/data_utils.clj b/code/src/sixsq/nuvla/server/resources/nuvlabox/data_utils.clj
index 728118400..94d9ae843 100644
--- a/code/src/sixsq/nuvla/server/resources/nuvlabox/data_utils.clj
+++ b/code/src/sixsq/nuvla/server/resources/nuvlabox/data_utils.clj
@@ -1,6 +1,5 @@
 (ns sixsq.nuvla.server.resources.nuvlabox.data-utils
   (:require
-    [clojure.data.csv :as csv]
     [clojure.data.json :as json]
     [clojure.set :as set]
     [clojure.string :as str]
@@ -8,25 +7,19 @@
     [environ.core :as env]
     [promesa.core :as p]
     [promesa.exec :as px]
-    [ring.middleware.accept :refer [wrap-accept]]
     [sixsq.nuvla.auth.utils :as auth]
     [sixsq.nuvla.db.filter.parser :as parser]
     [sixsq.nuvla.server.middleware.cimi-params.impl :as cimi-params-impl]
     [sixsq.nuvla.server.resources.common.crud :as crud]
     [sixsq.nuvla.server.resources.common.utils :as u]
     [sixsq.nuvla.server.resources.nuvlabox.utils :as utils]
+    [sixsq.nuvla.server.resources.timeseries.data-utils :as ts-data-utils]
     [sixsq.nuvla.server.resources.ts-nuvlaedge-availability :as ts-nuvlaedge-availability]
     [sixsq.nuvla.server.resources.ts-nuvlaedge-telemetry :as ts-nuvlaedge-telemetry]
     [sixsq.nuvla.server.util.log :as logu]
-    [sixsq.nuvla.server.util.response :as r]
     [sixsq.nuvla.server.util.time :as time])
-  (:import
-    (java.io StringWriter)
-    (java.text DecimalFormat DecimalFormatSymbols)
-    (java.util Locale)
-    (java.util.concurrent ExecutionException TimeoutException)))
+  (:import (java.util.concurrent ExecutionException TimeoutException)))

-(def max-data-points 200)
 (def running-query-data (atom 0))
 (def requesting-query-data (atom 0))
 (def query-data-max-attempts (env/env :query-data-max-attempts 50))
@@ -164,123 +157,17 @@
       (catch Exception ex
         (log/error "An error occurred inserting metrics: " (ex-message ex)))))

-(defn ->predefined-aggregations-resp
-  [{:keys [mode nuvlaedge-ids aggregations] group-by-field :group-by} resp]
-  (let [ts-data    (fn [tsds-stats]
-                     (map
-                       (fn [{:keys [key_as_string doc_count] :as bucket}]
-                         {:timestamp    key_as_string
-                          :doc-count    doc_count
-                          :aggregations (->> (keys aggregations)
-                                             (select-keys bucket))})
-                       (:buckets tsds-stats)))
-        dimensions (case mode
-                     :single-edge-query
-                     {:nuvlaedge-id (first nuvlaedge-ids)}
-                     :multi-edge-query
-                     {:nuvlaedge-count (count nuvlaedge-ids)})
-        hits       (second resp)]
-    (if group-by-field
-      (for [{:keys [key tsds-stats]} (get-in resp [0 :aggregations :by-field :buckets])]
-        (cond->
-          {:dimensions (assoc dimensions group-by-field key)
-           :ts-data    (ts-data tsds-stats)}
-          (seq hits) (assoc :hits hits)))
-      [(cond->
-         {:dimensions dimensions
-          :ts-data    (ts-data (get-in resp [0 :aggregations :tsds-stats]))}
-         (seq hits) (assoc :hits hits))])))
-
-(defn ->custom-es-aggregations-resp
-  [{:keys [mode nuvlaedge-ids]} resp]
-  (let [ts-data    (fn [tsds-stats]
-                     (map
-                       (fn [{:keys [key_as_string doc_count] :as bucket}]
-                         {:timestamp    key_as_string
-                          :doc-count    doc_count
-                          :aggregations (dissoc bucket :key_as_string :key :doc_count)})
-                       (:buckets tsds-stats)))
-        dimensions (case mode
-                     :single-edge-query
-                     {:nuvlaedge-id (first nuvlaedge-ids)}
-                     :multi-edge-query
-                     {:nuvlaedge-count (count nuvlaedge-ids)})]
-    [(merge {:dimensions dimensions}
-            (into {} (for [agg-key (keys (get-in resp [0 :aggregations]))]
-                       [agg-key (ts-data (get-in resp [0 :aggregations agg-key]))])))]))
-
-(defn ->raw-resp
-  [{:keys [mode nuvlaedge-ids]} resp]
-  (let [dimensions (case mode
-                     :single-edge-query
-                     {:nuvlaedge-id (first nuvlaedge-ids)}
-                     :multi-edge-query
-                     {:nuvlaedge-count (count nuvlaedge-ids)})
-        hits       (second resp)]
-    [{:dimensions dimensions
-      :ts-data    (sort-by :timestamp hits)}]))
-
-(defn ->metrics-resp
-  [{:keys [predefined-aggregations custom-es-aggregations raw] :as options} resp]
-  (cond
-    predefined-aggregations
-    (->predefined-aggregations-resp options resp)
-
-    raw
-    (->raw-resp options resp)
-
-    custom-es-aggregations
-    (->custom-es-aggregations-resp options resp)))
-
-(defn build-aggregations-clause
-  [{:keys [predefined-aggregations raw custom-es-aggregations from to ts-interval aggregations] group-by-field :group-by}]
-  (cond
-    raw
-    {} ;; send an empty :tsds-aggregation to avoid acl checks. TODO: find a cleaner way
-
-    predefined-aggregations
-    (let [tsds-aggregations {:tsds-stats
-                             {:date_histogram
-                              {:field           "@timestamp"
-                               :fixed_interval  ts-interval
-                               :min_doc_count   0
-                               :extended_bounds {:min (time/to-str from)
-                                                 :max (time/to-str to)}}
-                              :aggregations (or aggregations {})}}]
-
-      (if group-by-field
-        {:aggregations
-         {:by-field
-          {:terms        {:field group-by-field}
-           :aggregations tsds-aggregations}}}
-        {:aggregations tsds-aggregations}))
-
-    custom-es-aggregations
-    {:aggregations custom-es-aggregations}))
-
-(defn build-ts-query [{:keys [last nuvlaedge-ids from to additional-filters orderby] :as options}]
-  (let [nuvlabox-id-filter (str "nuvlaedge-id=[" (str/join " " (map #(str "'" % "'")
-                                                                    nuvlaedge-ids))
-                                "]")
-        time-range-filter  (str "@timestamp>'" (time/to-str from) "'"
-                                " and "
-                                "@timestamp<'" (time/to-str to) "'")
-        aggregation-clause (build-aggregations-clause options)]
-    (cond->
-      {:cimi-params (cond->
-                      {:last (or last 0)
-                       :filter
-                       (parser/parse-cimi-filter
-                         (str "("
-                              (apply str
-                                     (interpose " and "
-                                                (into [nuvlabox-id-filter
-                                                       time-range-filter]
-                                                      additional-filters)))
-                              ")"))}
-                      orderby (assoc :orderby orderby))}
-      aggregation-clause
-      (assoc :params {:tsds-aggregation (json/write-str aggregation-clause)}))))
+(defn ->resp-dimensions
+  [{:keys [mode nuvlaedge-ids]}]
+  (case mode
+    :single-edge-query
+    {:nuvlaedge-id (first nuvlaedge-ids)}
+    :multi-edge-query
+    {:nuvlaedge-count (count nuvlaedge-ids)}))
+
+(defn build-ts-query [{:keys [nuvlaedge-ids] :as options}]
+  (ts-data-utils/build-ts-query
+    (assoc options :dimensions-filters {"nuvlaedge-id" nuvlaedge-ids})))

 (defn build-availability-query [options]
   ;; return up to 10000 availability state updates
@@ -290,7 +177,7 @@
   [options]
   (->> (build-availability-query options)
        (crud/query-as-admin ts-nuvlaedge-availability/resource-type)
-       (->metrics-resp options)))
+       (ts-data-utils/->ts-query-resp (assoc options :->resp-dimensions-fn ->resp-dimensions))))

 (defn query-availability-raw
   ([options]
@@ -329,13 +216,13 @@
 (defn build-telemetry-query [{:keys [raw metric] :as options}]
   (build-ts-query (-> options
                       (assoc :additional-filters [(str "metric='" metric "'")])
-                      (cond-> raw (assoc :last max-data-points)))))
+                      (cond-> raw (assoc :last ts-data-utils/max-data-points)))))

 (defn query-metrics
   [options]
   (->> (build-telemetry-query options)
        (crud/query-as-admin ts-nuvlaedge-telemetry/resource-type)
-       (->metrics-resp options)))
+       (ts-data-utils/->ts-query-resp (assoc options :->resp-dimensions-fn ->resp-dimensions))))

 (defn latest-availability-status
   ([nuvlaedge-id]
@@ -557,13 +444,15 @@
   (let [[_ n unit] (re-matches #"(.*)-(.*)" (name granularity))]
     (try
       (time/duration (Integer/parseInt n) (keyword unit))
-      (catch Exception _
+      (catch Exception e
+        (log/error e)
        (logu/log-and-throw-400
          (str "unrecognized value for granularity " granularity))))))
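+
+;; Illustrative examples (not part of the original change) of the accepted
+;; granularity format, "<n>-<unit>":
+(comment
+  (granularity->duration "5-minutes")   ;; => a 5-minute java.time.Duration
+  (granularity->duration "1-days")      ;; => a 24-hour java.time.Duration
+  (granularity->duration "soon"))       ;; throws a 400 (bad-request) response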

 (defn precompute-query-params
-  [{:keys [predefined-aggregations granularity] :as query-opts}]
+  [{:keys [raw query-type granularity] :as query-opts}]
   (cond-> query-opts
-    predefined-aggregations (assoc :granularity-duration (granularity->duration granularity))))
+    (and (not raw) (= ts-data-utils/query-type-standard query-type))
+    (assoc :granularity-duration (granularity->duration granularity))))

 (defn available-before?
   [{:keys [first-availability] :as _nuvlabox} timestamp]
@@ -680,8 +569,8 @@
     resp))

 (defn compute-nuvlabox-availability
-  [[{:keys [predefined-aggregations granularity-duration nuvlaboxes] :as query-opts} resp]]
-  (if predefined-aggregations
+  [[{:keys [raw query-type granularity-duration nuvlaboxes] :as query-opts} resp]]
+  (if (and (not raw) (= ts-data-utils/query-type-standard query-type))
     (let [nuvlabox (first nuvlaboxes)
           now      (time/now)
           hits     (->> (get-in resp [0 :hits])
@@ -699,129 +588,76 @@
   [[query-opts resp]]
   [query-opts (update-in resp [0] dissoc :hits)])

-(defn throw-custom-aggregations-not-exportable
-  [{:keys [custom-es-aggregations]}]
-  (when custom-es-aggregations
-    (logu/log-and-throw-400 "Custom aggregations cannot be exported to csv format")))
-
-(defn metrics-data->csv [options dimension-keys meta-keys metric-keys data-fn response]
-  (with-open [writer (StringWriter.)]
-    ;; write csv header
-    (csv/write-csv writer [(concat (map name dimension-keys)
-                                   (map name meta-keys)
-                                   (map name metric-keys))])
-    ;; write csv data
-    (let [df (DecimalFormat. "0.####" (DecimalFormatSymbols. Locale/US))]
-      (csv/write-csv writer
-                     (for [{:keys [dimensions ts-data]} response
-                           data-point ts-data]
-                       (concat (map dimensions dimension-keys)
-                               (map data-point meta-keys)
-                               (map (fn [metric-key]
-                                      (let [v (data-fn options data-point metric-key)]
-                                        (if (float? v)
-                                          ;; format floats with 4 decimal and dot separator
-                                          (.format df v)
-                                          v)))
-                                    metric-keys)))))
-    (.toString writer)))
-
-(defn csv-export-fn
-  [dimension-keys-fn meta-keys-fn metric-keys-fn data-fn]
-  (fn [{:keys [resps] :as options}]
-    (throw-custom-aggregations-not-exportable options)
-    (metrics-data->csv
-      options
-      (dimension-keys-fn options)
-      (meta-keys-fn options)
-      (metric-keys-fn options)
-      data-fn
-      (first resps))))
-
 (defn csv-dimension-keys-fn []
-  (fn [{:keys [raw predefined-aggregations datasets datasets-opts mode]}]
-    (cond
-      raw
+  (fn [{:keys [raw queries query-specs mode]}]
+    (if raw
       []
-
-      predefined-aggregations
-      (let [{group-by-field :group-by} (get datasets-opts (first datasets))
+      (let [{group-by-field :group-by} (get query-specs (first queries))
             dimension-keys (case mode
                              :single-edge-query []
                              :multi-edge-query [:nuvlaedge-count])]
        (cond-> dimension-keys
-          (and predefined-aggregations group-by-field) (conj group-by-field))))))
+          group-by-field (conj group-by-field))))))

 (defn csv-meta-keys-fn []
-  (fn [{:keys [mode predefined-aggregations raw]}]
-    (cond
-      raw (case mode
-            :single-edge-query
-            [:timestamp]
-            :multi-edge-query
-            [:timestamp :nuvlaedge-id])
-      predefined-aggregations [:timestamp :doc-count])))
+  (fn [{:keys [mode raw]}]
+    (if raw
+      (case mode
+        :single-edge-query
+        [:timestamp]
+        :multi-edge-query
+        [:timestamp :nuvlaedge-id])
+      [:timestamp :doc-count])))

 (defn availability-csv-metric-keys-fn []
-  (fn [{:keys [predefined-aggregations raw datasets datasets-opts]}]
-    (let [{:keys [response-aggs]} (get datasets-opts (first datasets))]
-      (cond
-        raw [:online]
-        predefined-aggregations response-aggs))))
+  (fn [{:keys [raw queries query-specs]}]
+    (let [{:keys [response-aggs]} (get query-specs (first queries))]
+      (if raw [:online] response-aggs))))

 (defn availability-csv-data-fn []
-  (fn [{:keys [predefined-aggregations raw]} {:keys [aggregations] :as data-point} metric-key]
-    (cond
-      raw
+  (fn [{:keys [raw]} {:keys [aggregations] :as data-point} metric-key]
+    (if raw
       (get data-point metric-key)
-
-      predefined-aggregations
       (get-in aggregations [metric-key :value]))))

 (defn availability-csv-export-fn []
-  (csv-export-fn (csv-dimension-keys-fn)
-                 (csv-meta-keys-fn)
-                 (availability-csv-metric-keys-fn)
-                 (availability-csv-data-fn)))
+  (ts-data-utils/csv-export-fn (csv-dimension-keys-fn)
+                               (csv-meta-keys-fn)
+                               (availability-csv-metric-keys-fn)
+                               (availability-csv-data-fn)))

 (defn telemetry-csv-metric-keys-fn [metric]
-  (fn [{:keys [predefined-aggregations raw datasets datasets-opts resps]}]
+  (fn [{:keys [raw queries query-specs resps]}]
     (let [{:keys [aggregations response-aggs]}
-          (get datasets-opts (first datasets))]
-      (cond
-        raw
+          (get query-specs (first queries))]
+      (if raw
         (sort (keys (-> resps ffirst :ts-data first (get metric))))
-
-        predefined-aggregations
         (or response-aggs (keys aggregations))))))

 (defn telemetry-csv-data-fn [metric]
-  (fn [{:keys [predefined-aggregations raw]}
+  (fn [{:keys [raw]}
        {:keys [aggregations] :as data-point} metric-key]
-    (cond
-      raw
+    (if raw
       (get-in data-point [metric metric-key])
-
-      predefined-aggregations
       (get-in aggregations [metric-key :value]))))

 (defn telemetry-csv-export-fn [metric]
-  (csv-export-fn (csv-dimension-keys-fn)
-                 (csv-meta-keys-fn)
-                 (telemetry-csv-metric-keys-fn metric)
-                 (telemetry-csv-data-fn metric)))
+  (ts-data-utils/csv-export-fn (csv-dimension-keys-fn)
+                               (csv-meta-keys-fn)
+                               (telemetry-csv-metric-keys-fn metric)
+                               (telemetry-csv-data-fn metric)))

-(defn single-edge-datasets
+(defn single-edge-queries
   []
   {"availability-stats"    {:metric         "availability"
                             :pre-process-fn (comp filter-available-before-period-end
@@ -914,8 +750,8 @@
       nb-resps)))

 (defn compute-nuvlaboxes-availabilities
-  [[{:keys [predefined-aggregations] :as query-opts} resp]]
-  (if predefined-aggregations
+  [[{:keys [raw query-type] :as query-opts} resp]]
+  (if (and (not raw) (= ts-data-utils/query-type-standard query-type))
     (let [now  (time/now)
           hits (->> (get-in resp [0 :hits])
                     (map #(update % :timestamp time/parse-date))
@@ -928,11 +764,11 @@
     [query-opts resp]))

 (defn compute-global-availability
-  [[{:keys [predefined-aggregations] :as query-opts} resp]]
+  [[{:keys [raw query-type] :as query-opts} resp]]
   [query-opts
    (cond-> resp
-     predefined-aggregations
+     (and (not raw) (= ts-data-utils/query-type-standard query-type))
      (update-resp-ts-data-point-aggs
        (fn [_ts-data-point {:keys [by-edge] :as aggs}]
          (let [avgs-count (count (:buckets by-edge))
@@ -948,11 +784,11 @@
                            nil)})))))])

 (defn add-edges-count
-  [[{:keys [predefined-aggregations] :as query-opts} resp]]
+  [[{:keys [raw query-type] :as query-opts} resp]]
   [query-opts
    (cond-> resp
-     predefined-aggregations
+     (and (not raw) (= ts-data-utils/query-type-standard query-type))
      (update-resp-ts-data-point-aggs
        (fn [_ts-data-point {:keys [by-edge] :as aggs}]
          (assoc aggs :edges-count {:value (count (:buckets by-edge))}))))])
@@ -966,8 +802,8 @@
          (partial map (partial f ts-data-point)))))))

 (defn add-edge-names-fn
-  [[{:keys [predefined-aggregations nuvlaboxes] :as query-opts} resp]]
-  (if predefined-aggregations
+  [[{:keys [raw query-type nuvlaboxes] :as query-opts} resp]]
+  (if (and (not raw) (= ts-data-utils/query-type-standard query-type))
     (let [edge-names-by-id (->> nuvlaboxes
                                 (map (fn [{:keys [id name]}] [id name]))
@@ -980,8 +816,8 @@
     [query-opts resp]))

 (defn add-missing-edges-fn
-  [[{:keys [predefined-aggregations granularity-duration nuvlaboxes] :as query-opts} resp]]
-  (if predefined-aggregations
+  [[{:keys [raw query-type granularity-duration nuvlaboxes] :as query-opts} resp]]
+  (if (and (not raw) (= ts-data-utils/query-type-standard query-type))
     (letfn [(update-buckets [ts-data-point buckets]
               (let [bucket-edge-ids (set (map :key buckets))
@@ -1129,8 +965,8 @@
                (used-memory)))))))

 (defn query-and-process-availabilities
-  [{:keys [predefined-aggregations nuvlaboxes] :as options}]
-  (if predefined-aggregations
+  [{:keys [raw query-type nuvlaboxes] :as options}]
+  (if (and (not raw) (= ts-data-utils/query-type-standard query-type))
     (let [ret (query-and-process-availabilities* options)]
       [{:dimensions {:nuvlaedge-count (count nuvlaboxes)}
         :ts-data    (mapv
@@ -1145,18 +981,7 @@
           ret)}])
     (query-availability options)))

-(defn keep-response-aggs-only
-  [{:keys [predefined-aggregations response-aggs] :as _query-opts} resp]
-  (cond->
-    resp
-    predefined-aggregations
-    (update-resp-ts-data-point-aggs
-      (fn [_ts-data-point aggs]
-        (if response-aggs
-          (select-keys aggs response-aggs)
-          aggs)))))
-
-(defn multi-edge-datasets
+(defn multi-edge-queries
   []
   (let [group-by-field (fn [field aggs]
                          {:terms        {:field field}
@@ -1266,183 +1091,59 @@
                            :response-aggs [:sum-energy-consumption]
                            :csv-export-fn (telemetry-csv-export-fn :power-consumption)}}))

-(defn parse-params
-  [{:keys [uuid dataset from to granularity custom-es-aggregations] :as params}
-   {:keys [accept] :as _request}]
-  (let [datasets                (if (coll? dataset) dataset [dataset])
-        raw                     (= "raw" granularity)
-        predefined-aggregations (not (or raw custom-es-aggregations))
-        custom-es-aggregations  (cond-> custom-es-aggregations
-                                        (string? custom-es-aggregations)
-                                        json/read-str)]
-    (-> params
-        (assoc :mime-type (:mime accept))
-        (assoc :datasets datasets)
-        (assoc :from (time/parse-date from))
-        (assoc :to (time/parse-date to))
-        (cond->
-          uuid (assoc :id (u/resource-id "nuvlabox" uuid))
-          raw (assoc :raw true)
-          predefined-aggregations (assoc :predefined-aggregations true)
-          custom-es-aggregations (assoc :custom-es-aggregations custom-es-aggregations)))))
-
-(defn throw-response-format-not-supported
-  [{:keys [mime-type] :as params}]
-  (when-not mime-type
-    (logu/log-and-throw-400 406 "Not Acceptable"))
-  params)
-
-(defn throw-mandatory-dataset-parameter
-  [{:keys [datasets] :as params}]
-  (when-not (seq datasets) (logu/log-and-throw-400 "dataset parameter is mandatory"))
-  params)
-
-(defn throw-mandatory-from-to-parameters
-  [{:keys [from to] :as params}]
-  (when-not from
-    (logu/log-and-throw-400 (str "from parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)")))
-  (when-not to
-    (logu/log-and-throw-400 (str "to parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)")))
-  params)
-
-(defn throw-from-not-before-to
-  [{:keys [from to] :as params}]
-  (when-not (time/before? from to)
-    (logu/log-and-throw-400 "from must be before to"))
-  params)
-
-(defn throw-mandatory-granularity-parameter
-  [{:keys [raw granularity custom-es-aggregations] :as params}]
-  (when (and (not raw) (not custom-es-aggregations) (empty? granularity))
-    (logu/log-and-throw-400 "granularity parameter is mandatory"))
-  params)
+(defn assoc-nuvlabox-id
+  [{:keys [uuid] :as params}]
+  (cond-> params
+          uuid (assoc :id (u/resource-id "nuvlabox" uuid))))
+
+(defn assoc-query-specs
+  [{:keys [mode custom-es-aggregations] :as params} _request]
+  (let [custom-es-aggregations (cond-> custom-es-aggregations
+                                       (string? custom-es-aggregations)
+                                       json/read-str)
+        query-specs            (cond-> (case mode
+                                         :single-edge-query (single-edge-queries)
+                                         :multi-edge-query (multi-edge-queries))
+
+                                       (not custom-es-aggregations)
+                                       (update-vals #(assoc % :query-type ts-data-utils/query-type-standard))
+
+                                       custom-es-aggregations
+                                       (update-vals #(-> %
+                                                         (assoc :query-type ts-data-utils/query-type-custom-es-query)
+                                                         (assoc :aggregations custom-es-aggregations))))]
+    (assoc params :query-specs query-specs)))

 (defn throw-custom-es-aggregations-checks
-  [{:keys [custom-es-aggregations granularity] :as params}]
+  [{:keys [granularity custom-es-aggregations] :as params}]
   (when custom-es-aggregations
     (when granularity
       (logu/log-and-throw-400 "when custom-es-aggregations is specified, granularity parameter must be omitted")))
   params)

-(defn throw-too-many-data-points
-  [{:keys [from to granularity predefined-aggregations] :as params}]
-  (when predefined-aggregations
-    (let [max-n-buckets max-data-points
-          n-buckets     (.dividedBy (time/duration from to)
-                                    (granularity->duration granularity))]
-      (when (> n-buckets max-n-buckets)
-        (logu/log-and-throw-400 "too many data points requested. Please restrict the time interval or increase the time granularity."))))
-  params)
-
-(defn granularity->ts-interval
-  "Converts from a string of the form <n>-<unit> to an ElasticSearch interval string"
-  [granularity]
-  (let [[_ n unit] (re-matches #"(.*)-(.*)" (name granularity))]
-    (str n (case unit
-             "seconds" "s"
-             "minutes" "m"
-             "hours" "h"
-             "days" "d"
-             "weeks" "d"
-             "months" "M"
-             (logu/log-and-throw-400 (str "unrecognized value for granularity " granularity))))))
-
-(defn assoc-base-query-opts
-  [{:keys [predefined-aggregations granularity filter] :as params} request]
-  (assoc params
-    :base-query-opts
-    (-> (select-keys params [:id :from :to :granularity
-                             :raw :custom-es-aggregations :predefined-aggregations
-                             :mode :int-atom])
-        (assoc :request request)
-        (cond->
-          filter
-          (assoc :cimi-filter filter)
-          predefined-aggregations
-          (assoc :ts-interval (granularity->ts-interval granularity))))))
-
-(defn assoc-datasets-opts
-  [{:keys [mode] :as params}]
-  (assoc params
-    :datasets-opts
-    (case mode
-      :single-edge-query (single-edge-datasets)
-      :multi-edge-query (multi-edge-datasets))))
-
-(defn throw-unknown-datasets
-  [{:keys [datasets datasets-opts] :as params}]
-  (when-not (every? (set (keys datasets-opts)) datasets)
-    (logu/log-and-throw-400 (str "unknown datasets: "
-                                 (str/join "," (sort (set/difference (set datasets)
-                                                                     (set (keys datasets-opts))))))))
-  params)
-
-(defn throw-csv-multi-dataset
-  [{:keys [datasets mime-type] :as params}]
-  (when (and (= "text/csv" mime-type) (not= 1 (count datasets)))
-    (logu/log-and-throw-400 (str "exactly one dataset must be specified with accept header 'text/csv'")))
-  params)
-
-(defn run-query
-  [base-query-opts datasets-opts dataset-key]
-  (let [{:keys [pre-process-fn query-fn post-process-fn] :as dataset-opts} (get datasets-opts dataset-key)
-        {:keys [predefined-aggregations] :as query-opts} (merge base-query-opts dataset-opts)
-        query-opts (if pre-process-fn (doall (pre-process-fn query-opts)) query-opts)]
-    (cond->> (doall (query-fn query-opts))
-             post-process-fn ((fn [resp] (doall (second (post-process-fn [query-opts resp])))))
-             predefined-aggregations (keep-response-aggs-only query-opts))))
-
-(defn run-queries
-  [{:keys [datasets base-query-opts datasets-opts] :as params}]
-  (assoc params
-    :resps
-    (map (partial run-query base-query-opts datasets-opts) datasets)))
-
-(defn json-data-response
-  [{:keys [datasets resps]}]
-  (r/json-response (zipmap datasets resps)))
-
-(defn csv-response
-  [{:keys [datasets datasets-opts] :as options}]
-  (let [{:keys [csv-export-fn]} (get datasets-opts (first datasets))]
-    (when-not csv-export-fn
-      (logu/log-and-throw-400 (str "csv export not supported for dataset " (first datasets))))
-    (r/csv-response "export.csv" (csv-export-fn options))))
-
-(defn send-data-response
-  [{:keys [mime-type] :as options}]
-  (case mime-type
-    "application/json"
-    (json-data-response options)
-    "text/csv"
-    (csv-response options)))
+(defn assoc-query
+  [{:keys [dataset] :as params}]
+  (cond-> params
+          dataset (assoc :query dataset)))

 (defn query-data
   [params request]
   (-> params
-      (parse-params request)
-      (throw-response-format-not-supported)
-      (throw-mandatory-dataset-parameter)
-      (throw-mandatory-from-to-parameters)
-      (throw-from-not-before-to)
-      (throw-mandatory-granularity-parameter)
-      (throw-too-many-data-points)
       (throw-custom-es-aggregations-checks)
-      (assoc-base-query-opts request)
-      (assoc-datasets-opts)
-      (throw-unknown-datasets)
-      (throw-csv-multi-dataset)
-      (run-queries)
-      (send-data-response)))
+      (assoc-nuvlabox-id)
+      (assoc-query)
+      (assoc-query-specs request)
+      (ts-data-utils/query-data request)))

 (defn gated-query-data
   "Only allow one call to query-data on availability of multiple edges at a time.
    Allow max 4 additional requests to wait at most 5 seconds to get access to computation."
-  [{:keys [mode dataset] :as params} request]
-  (let [datasets (if (coll? dataset) dataset [dataset])]
+  [{:keys [mode query dataset] :as params} request]
+  (let [query   (or query dataset)
+        queries (if (coll? query) query [query])]
     (if (and (= :multi-edge-query mode)
-             (some #{"availability-stats" "availability-by-edge"} datasets))
+             (some #{"availability-stats" "availability-by-edge"} queries))
       (if (> @requesting-query-data 4)
         (logu/log-and-throw 503 "Server too busy")
         ;; retry for up to 5 seconds (or QUERY_DATA_MAX_ATTEMPTS * 100ms)
@@ -1478,9 +1179,5 @@
 (defn wrapped-query-data
   [params request]
-  (let [query-data (wrap-accept (partial gated-query-data params)
-                                {:mime ["application/json" :qs 1
-                                        "text/csv" :qs 0.5]})]
+  (let [query-data (ts-data-utils/wrap-query-data-accept (partial gated-query-data params))]
     (query-data request)))
-
-
diff --git a/code/src/sixsq/nuvla/server/resources/spec/nuvlabox.cljc b/code/src/sixsq/nuvla/server/resources/spec/nuvlabox.cljc
index 1888e7dfa..808ca523c 100644
--- a/code/src/sixsq/nuvla/server/resources/spec/nuvlabox.cljc
+++ b/code/src/sixsq/nuvla/server/resources/spec/nuvlabox.cljc
@@ -98,15 +98,17 @@
 ;; actions

 (s/def ::dataset (s/coll-of ::core/nonblank-string))
+(s/def ::query (s/coll-of ::core/nonblank-string))
 (s/def ::filter (st/spec ::core/nonblank-string))
 (s/def ::from (st/spec ::core/timestamp))
 (s/def ::to (st/spec ::core/timestamp))
 (s/def ::granularity (st/spec (s/or :raw #{"raw"}
                                     :granularity-duration data-utils/granularity->duration)))
 (s/def ::custom-es-aggregations any?)

-(s/def ::bulk-data-body (su/only-keys-maps {:req-un [::dataset
-                                                     ::from
+(s/def ::bulk-data-body (su/only-keys-maps {:req-un [::from
                                                      ::to]
-                                            :opt-un [::filter
+                                            :opt-un [::dataset
+                                                     ::query
+                                                     ::filter
                                                      ::granularity
                                                      ::custom-es-aggregations]}))

diff --git a/code/src/sixsq/nuvla/server/resources/spec/timeseries.cljc b/code/src/sixsq/nuvla/server/resources/spec/timeseries.cljc
new file mode 100644
index 000000000..9ede7dcfc
--- /dev/null
+++ b/code/src/sixsq/nuvla/server/resources/spec/timeseries.cljc
@@ -0,0 +1,124 @@
+(ns sixsq.nuvla.server.resources.spec.timeseries
+  (:require
+    [clojure.spec.alpha :as s]
+    [sixsq.nuvla.server.resources.spec.common :as common]
+    [sixsq.nuvla.server.resources.spec.core :as core]
+    [sixsq.nuvla.server.util.spec :as su]
+    [spec-tools.core :as st]))
+
+(def field-types #{"keyword" "long" "double"})
+
+(s/def ::field-name
+  (assoc (st/spec ::core/nonblank-string)
+    :json-schema/description "Timeseries field name"))
+
+(s/def ::field-type
+  (assoc (st/spec field-types)
+    :json-schema/type "string"
+    :json-schema/description "Timeseries field type"))
+
+(s/def ::description
+  (-> (st/spec string?)
+      (assoc :json-schema/description "human-readable description")))
+
+(s/def ::dimension
+  (assoc (st/spec (su/only-keys
+                    :req-un [::field-name
+                             ::field-type]
+                    :opt-un [::description]))
+    :json-schema/type "map"
+    :json-schema/description "Timeseries dimension"))
+
+(s/def ::dimensions
+  (-> (st/spec (s/coll-of ::dimension :kind vector? :distinct true :min-count 1))
+      (assoc :json-schema/description "Timeseries dimensions")))
+
+(def metric-types #{"gauge" "counter"})
+
+(s/def ::metric-type
+  (assoc (st/spec metric-types)
+    :json-schema/type "string"
+    :json-schema/description "Timeseries metric type"))
+
+(s/def ::optional
+  (-> (st/spec boolean?)
+      (assoc :json-schema/type "boolean"
+             :json-schema/description "whether the value is optional (default false)")))
+
+(s/def ::metric
+  (assoc (st/spec (su/only-keys
+                    :req-un [::field-name
+                             ::field-type
+                             ::metric-type]
+                    :opt-un [::description
+                             ::optional]))
+    :json-schema/type "map"
+    :json-schema/description "Timeseries metric"))
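+
+;; Illustrative maps conforming to the specs above (not part of the original
+;; change); the field names are hypothetical:
+(comment
+  ;; a dimension entry
+  {:field-name "device-id" :field-type "keyword"}
+  ;; a metric entry
+  {:field-name  "temperature"
+   :field-type  "double"
+   :metric-type "gauge"
+   :optional    true})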
+(s/def ::metrics
+  (-> (st/spec (s/coll-of ::metric :kind vector? :distinct true :min-count 1))
+      (assoc :json-schema/description "Timeseries metrics")))
+
+(s/def ::query-name
+  (assoc (st/spec ::core/nonblank-string)
+    :json-schema/description "Timeseries query name"))
+
+(def query-types #{"standard" "custom-es-query"})
+
+(s/def ::query-type
+  (assoc (st/spec query-types)
+    :json-schema/type "string"
+    :json-schema/description "Timeseries query type"))
+
+(s/def ::aggregation-name
+  (assoc (st/spec ::core/nonblank-string)
+    :json-schema/description "Timeseries query aggregation name"))
+
+(def aggregation-types #{"avg" "min" "max" "sum" "value_count"})
+
+(s/def ::aggregation-type
+  (assoc (st/spec aggregation-types)
+    :json-schema/type "string"
+    :json-schema/description "Timeseries query aggregation type"))
+
+(s/def ::aggregation
+  (assoc (st/spec (su/only-keys
+                    :req-un [::aggregation-name
+                             ::aggregation-type
+                             ::field-name]))
+    :json-schema/type "map"
+    :json-schema/description "Timeseries query aggregation specification"))
+
+(s/def ::aggregations
+  (-> (st/spec (s/coll-of ::aggregation :kind vector? :distinct true))
+      (assoc :json-schema/description "Query aggregations")))
+
+(s/def ::query
+  (assoc (st/spec (su/only-keys
+                    :req-un [::aggregations]))
+    :json-schema/type "map"
+    :json-schema/description "Timeseries query"))
+
+(s/def ::custom-es-query
+  (-> (st/spec (su/constrained-map keyword? any?))
+      (assoc :json-schema/type "map"
+             :json-schema/description "custom ElasticSearch query")))
+
+(s/def ::query-definition
+  (assoc (st/spec (su/only-keys
+                    :req-un [::query-name
+                             ::query-type]
+                    :opt-un [::description
+                             ::query
+                             ::custom-es-query]))
+    :json-schema/type "map"
+    :json-schema/description "Timeseries query definition"))
+
+(s/def ::queries
+  (-> (st/spec (s/coll-of ::query-definition :kind vector? :distinct true))
+      (assoc :json-schema/description "Queries supported by the timeseries")))
+
+(s/def ::schema
+  (su/only-keys-maps common/common-attrs
+                     {:req-un [::dimensions ::metrics]
+                      :opt-un [::queries]}))
diff --git a/code/src/sixsq/nuvla/server/resources/timeseries.clj b/code/src/sixsq/nuvla/server/resources/timeseries.clj
new file mode 100644
index 000000000..2a7a2f192
--- /dev/null
+++ b/code/src/sixsq/nuvla/server/resources/timeseries.clj
@@ -0,0 +1,169 @@
+(ns sixsq.nuvla.server.resources.timeseries
+  "
+A `timeseries` resource describes a timeseries: the dimensions and metrics of
+its datapoints and, optionally, the predefined queries that can be run against it.
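+
+Creating a `timeseries` resource provisions the backing timeseries index;
+datapoints are then added with the `insert` and `bulk-insert` actions and
+retrieved with the `data` query action.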
+" + (:require + [sixsq.nuvla.auth.acl-resource :as a] + [sixsq.nuvla.db.impl :as db] + [sixsq.nuvla.server.resources.common.crud :as crud] + [sixsq.nuvla.server.resources.common.std-crud :as std-crud] + [sixsq.nuvla.server.resources.common.utils :as u] + [sixsq.nuvla.server.resources.spec.timeseries :as timeseries] + [sixsq.nuvla.server.resources.timeseries.data-utils :as data-utils] + [sixsq.nuvla.server.resources.timeseries.utils :as utils] + [sixsq.nuvla.server.util.response :as r])) + + +(def ^:const resource-type (u/ns->type *ns*)) + + +(def ^:const collection-type (u/ns->collection-type *ns*)) + + +(def collection-acl {:query ["group/nuvla-user"] + :add ["group/nuvla-user"] + :bulk-action ["group/nuvla-user"]}) + +;; +;; "Implementations" of multimethod declared in crud namespace +;; + + +(def validate-fn (u/create-spec-validation-fn ::timeseries/schema)) + +(defn validate + [resource] + (validate-fn resource)) + +(defmethod crud/validate resource-type + [resource] + (validate resource)) + +;; +;; use default ACL method +;; + +(defmethod crud/add-acl resource-type + [resource request] + (a/add-acl (dissoc resource :acl) request)) + +(def add-impl (std-crud/add-fn resource-type collection-acl resource-type + :validate-fn validate)) + +(defmethod crud/add resource-type + [request] + (let [{status :status {:keys [resource-id]} :body :as response} (add-impl request)] + (when (= 201 status) + (-> (crud/retrieve-by-id-as-admin resource-id) + (utils/create-timeseries))) + response)) + + +(def retrieve-impl (std-crud/retrieve-fn resource-type)) + + +(defmethod crud/retrieve resource-type + [request] + (retrieve-impl request)) + +(def edit-impl (std-crud/edit-fn resource-type)) + +(defmethod crud/edit resource-type + [{{uuid :uuid} :params :as request}] + (let [current (-> (str resource-type "/" uuid) + crud/retrieve-by-id-as-admin + (a/throw-cannot-edit request)) + resp (-> request + (utils/throw-dimensions-can-only-be-appended current) + (utils/throw-metrics-can-only-be-added current) + edit-impl)] + (utils/edit-timeseries (:body resp)) + resp)) + +(def delete-impl (std-crud/delete-fn resource-type)) + + +(defmethod crud/delete resource-type + [request] + (let [{:keys [status] :as response} (delete-impl request)] + (when (= 200 status) + (utils/delete-timeseries (u/request->resource-id request))) + response)) + +;; +;; insert/bulk insert datapoints actions +;; + +(defmethod crud/do-action [resource-type utils/action-insert] + [{{uuid :uuid} :params body :body :as request}] + (try + (let [id (str resource-type "/" uuid) + timeseries-index (utils/resource-id->timeseries-index id) + timeseries (-> (crud/retrieve-by-id-as-admin id) + (a/throw-cannot-manage request))] + (->> body + (utils/add-timestamp) + (utils/validate-datapoint timeseries) + (db/add-timeseries-datapoint timeseries-index))) + (catch Exception e + (or (ex-data e) (throw e))))) + +(defmethod crud/do-action [resource-type utils/action-bulk-insert] + [{{uuid :uuid} :params body :body :as request}] + (std-crud/throw-bulk-header-missing request) + (try + (let [id (str resource-type "/" uuid) + timeseries-index (utils/resource-id->timeseries-index id) + timeseries (-> (crud/retrieve-by-id-as-admin id) + (a/throw-cannot-manage request))] + (->> body + (map utils/add-timestamp) + (utils/validate-datapoints timeseries) + (db/bulk-insert-timeseries-datapoints timeseries-index)) + (r/map-response "bulk insert of timeseries datapoints executed successfully" 200)) + (catch Exception e + (or (ex-data e) (throw e))))) + +;; +;; data query 
action
+;;
+
+(defmethod crud/do-action [resource-type utils/action-data]
+  [{:keys [params] :as request}]
+  (try
+    (data-utils/wrapped-query-data params request)
+    (catch Exception e
+      (or (ex-data e) (throw e)))))
+
+;;
+;; available operations
+;;
+
+(defmethod crud/set-operations resource-type
+  [{:keys [id] :as resource} request]
+  (let [delete-op      (u/operation-map id :delete)
+        insert-op      (u/action-map id utils/action-insert)
+        bulk-insert-op (u/action-map id utils/action-bulk-insert)
+        data-op        (u/action-map id utils/action-data)
+        can-manage?    (a/can-manage? resource request)]
+    (assoc resource
+      :operations
+      (cond-> []
+              (a/can-delete? resource request) (conj delete-op)
+              can-manage? (conj insert-op bulk-insert-op data-op)))))
+
+
+;;
+;; collection
+;;
+
+(def query-impl (std-crud/query-fn resource-type collection-acl collection-type))
+
+(defmethod crud/query resource-type
+  [request]
+  (query-impl request))
+
+(defn initialize
+  []
+  (std-crud/initialize resource-type ::timeseries/schema))
+
diff --git a/code/src/sixsq/nuvla/server/resources/timeseries/data_utils.clj b/code/src/sixsq/nuvla/server/resources/timeseries/data_utils.clj
new file mode 100644
index 000000000..fb1851ed1
--- /dev/null
+++ b/code/src/sixsq/nuvla/server/resources/timeseries/data_utils.clj
@@ -0,0 +1,545 @@
+(ns sixsq.nuvla.server.resources.timeseries.data-utils
+  (:require [clojure.data.csv :as csv]
+            [clojure.data.json :as json]
+            [clojure.set :as set]
+            [clojure.string :as str]
+            [ring.middleware.accept :refer [wrap-accept]]
+            [sixsq.nuvla.auth.acl-resource :as a]
+            [sixsq.nuvla.db.filter.parser :as parser]
+            [sixsq.nuvla.db.impl :as db]
+            [sixsq.nuvla.server.resources.common.crud :as crud]
+            [sixsq.nuvla.server.resources.timeseries.utils :as utils]
+            [sixsq.nuvla.server.util.log :as logu]
+            [sixsq.nuvla.server.util.response :as r]
+            [sixsq.nuvla.server.util.time :as time])
+  (:import
+    (java.io StringWriter)
+    (java.text DecimalFormat DecimalFormatSymbols)
+    (java.util Locale)))
+
+(def max-data-points 200)
+
+(def query-type-standard "standard")
+(def query-type-custom-es-query "custom-es-query")
+
+(defn update-resp-ts-data
+  [resp f]
+  (-> resp
+      vec
+      (update-in [0 :ts-data] (comp vec f))))
+
+(defn update-resp-ts-data-points
+  [resp f]
+  (update-resp-ts-data
+    resp
+    (fn [ts-data]
+      (mapv f ts-data))))
+
+(defn update-resp-ts-data-point-aggs
+  [resp f]
+  (update-resp-ts-data-points
+    resp
+    (fn [ts-data-point]
+      (update ts-data-point :aggregations (partial f ts-data-point)))))
+
+(defn granularity->duration
+  "Converts a string of the form `<n>-<unit>` (e.g. `1-days`) to a java.time duration"
+  [granularity]
+  (let [[_ n unit] (re-matches #"(.*)-(.*)" (name granularity))]
+    (try
+      (time/duration (Integer/parseInt n) (keyword unit))
+      (catch Exception _
+        (logu/log-and-throw-400 (str "unrecognized value for granularity " granularity))))))
+
+(defn keep-response-aggs-only
+  [{:keys [query-type response-aggs] :as _query-opts} resp]
+  (cond->
+    resp
+    (= query-type-standard query-type)
+    (update-resp-ts-data-point-aggs
+      (fn [_ts-data-point aggs]
+        (if response-aggs
+          (select-keys aggs response-aggs)
+          aggs)))))
+
+(defn parse-params
+  [{:keys [query from to granularity custom-es-aggregations] :as params}
+   {:keys [accept] :as _request}]
+  (let [queries (if (coll? query)
+                  query
+                  (if (some?
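+                    ;; `query` arrives either as a collection or, when the
+                    ;; form parameter is given only once, as a single value;
+                    ;; normalize it to a vector of query names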
query) [query] []))
+        raw     (= "raw" granularity)]
+    (-> params
+        (assoc :mime-type (:mime accept))
+        (assoc :queries queries)
+        (assoc :from (time/parse-date from))
+        (assoc :to (time/parse-date to))
+        (cond-> raw (assoc :raw true)))))
+
+(defn throw-response-format-not-supported
+  [{:keys [mime-type] :as params}]
+  (when-not mime-type
+    (logu/log-and-throw 406 "Not Acceptable"))
+  params)
+
+(defn throw-mandatory-query-parameter
+  [{:keys [raw queries] :as params}]
+  (when (and (not raw) (not (seq queries)))
+    (logu/log-and-throw-400 "query parameter is mandatory"))
+  params)
+
+(defn throw-mandatory-from-to-parameters
+  [{:keys [from to] :as params}]
+  (when-not from
+    (logu/log-and-throw-400 (str "from parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)")))
+  (when-not to
+    (logu/log-and-throw-400 (str "to parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)")))
+  params)
+
+(defn throw-from-not-before-to
+  [{:keys [from to] :as params}]
+  (when-not (time/before? from to)
+    (logu/log-and-throw-400 "from must be before to"))
+  params)
+
+(defn throw-mandatory-granularity-parameter
+  [{:keys [raw granularity queries query-specs] :as params}]
+  (when (and (not raw)
+             (->> (select-keys query-specs queries)
+                  vals
+                  (some #(= query-type-standard (:query-type %))))
+             (empty? granularity))
+    (logu/log-and-throw-400 "granularity parameter is mandatory with standard queries"))
+  params)
+
+(defn throw-too-many-data-points
+  [{:keys [from to granularity raw] :as params}]
+  (when (and granularity (not raw))
+    (let [max-n-buckets max-data-points
+          n-buckets     (.dividedBy (time/duration from to)
+                                    (granularity->duration granularity))]
+      (when (> n-buckets max-n-buckets)
+        (logu/log-and-throw-400 "too many data points requested. Please restrict the time interval or increase the time granularity."))))
+  params)
+
+(defn granularity->ts-interval
+  "Converts a string of the form `<n>-<unit>` (e.g. `1-days`) to an ElasticSearch interval string"
+  [granularity]
+  (let [[_ n unit] (re-matches #"(.*)-(.*)" (name granularity))]
+    (if (= "weeks" unit)
+      ;; ElasticSearch fixed intervals have no week unit: express weeks in days
+      (str (* 7 (Integer/parseInt n)) "d")
+      (str n (case unit
+               "seconds" "s"
+               "minutes" "m"
+               "hours" "h"
+               "days" "d"
+               "months" "M"
+               (logu/log-and-throw-400 (str "unrecognized value for granularity " granularity)))))))
+
+(defn assoc-request
+  [params request]
+  (assoc params :request request))
+
+(defn assoc-cimi-filter
+  [{:keys [filter] :as params}]
+  (cond-> params filter (assoc :cimi-filter filter)))
+
+(defn assoc-ts-interval
+  [{:keys [raw granularity] :as params}]
+  (cond-> params
+          (and granularity (not raw))
+          (assoc :ts-interval (granularity->ts-interval granularity))))
+
+(defn throw-unknown-queries
+  [{:keys [queries query-specs] :as params}]
+  (when-not (every?
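+              ;; the set of known query names serves as a membership
+              ;; predicate: every requested query must be declared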
(set (keys query-specs)) queries) + (logu/log-and-throw-400 (str "unknown queries: " + (str/join "," (sort (set/difference (set queries) + (set (keys query-specs)))))))) + params) + +(defn throw-csv-multi-query + [{:keys [queries mime-type] :as params}] + (when (and (= "text/csv" mime-type) (not= 1 (count queries))) + (logu/log-and-throw-400 (str "exactly one query must be specified with accept header 'text/csv'"))) + params) + +(defn run-query + [{:keys [raw] :as params} query-specs query-key] + (let [{:keys [pre-process-fn query-fn post-process-fn] :as query-spec} (get query-specs query-key) + {:keys [query-type] :as query-opts} (merge params query-spec) + query-opts (if pre-process-fn (doall (pre-process-fn query-opts)) query-opts)] + (cond->> (doall (query-fn query-opts)) + post-process-fn ((fn [resp] (doall (second (post-process-fn [query-opts resp]))))) + (and (not raw) (= query-type-standard query-type)) (keep-response-aggs-only query-opts)))) + +(defn run-queries + [{:keys [queries query-specs] :as params}] + (assoc params + :resps + (map (partial run-query params query-specs) queries))) + +(defn json-data-response + [{:keys [queries resps]}] + (r/json-response (zipmap queries resps))) + +(defn csv-response + [{:keys [queries query-specs] :as options}] + (let [{:keys [csv-export-fn]} (get query-specs (first queries))] + (when-not csv-export-fn + (logu/log-and-throw-400 (str "csv export not supported for query " (first queries)))) + (r/csv-response "export.csv" (csv-export-fn options)))) + +(defn send-data-response + [{:keys [mime-type] :as options}] + (case mime-type + "application/json" + (json-data-response options) + "text/csv" + (csv-response options))) + +(defn query-data + [params request] + (-> params + (parse-params request) + (throw-response-format-not-supported) + (throw-mandatory-query-parameter) + (throw-mandatory-from-to-parameters) + (throw-from-not-before-to) + (throw-mandatory-granularity-parameter) + (throw-too-many-data-points) + (assoc-request request) + (assoc-cimi-filter) + (assoc-ts-interval) + (throw-unknown-queries) + (throw-csv-multi-query) + (run-queries) + (send-data-response))) + +(defn wrap-query-data-accept + [handler] + (wrap-accept handler + {:mime ["application/json" :qs 1 + "text/csv" :qs 0.5]})) + +(defn assoc-timeseries + [{uuid :uuid :as params} request] + (let [id (str "timeseries/" uuid) + timeseries-index (utils/resource-id->timeseries-index id) + timeseries (-> (crud/retrieve-by-id-as-admin id) + (a/throw-cannot-manage request))] + (assoc params + :timeseries-index timeseries-index + :timeseries timeseries))) + +(defn ->resp-dimensions + [{:keys [timeseries dimensions-filters]}] + (->> (for [{:keys [field-name]} (:dimensions timeseries)] + (let [v (get dimensions-filters field-name)] + (cond + (nil? v) + {field-name "all"} + + (= (count v) 1) + {field-name (first v)} + + (pos? 
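+               ;; several values selected for this dimension: report
+               ;; only their count instead of a single value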
(count v)) + {field-name {:count (count v)}}))) + (into {}))) + +(defn ->standard-query-resp + [{:keys [aggregations ->resp-dimensions-fn] group-by-field :group-by :as params} resp] + (let [ts-data (fn [tsds-stats] + (map + (fn [{:keys [key_as_string doc_count] :as bucket}] + {:timestamp key_as_string + :doc-count doc_count + :aggregations (->> (keys aggregations) + (map keyword) + (select-keys bucket))}) + (:buckets tsds-stats))) + hits (second resp)] + (if group-by-field + (for [{:keys [key tsds-stats]} (get-in resp [0 :aggregations :by-field :buckets])] + (cond-> + {:dimensions (assoc (->resp-dimensions-fn params) group-by-field key) + :ts-data (ts-data tsds-stats)} + (seq hits) (assoc :hits hits))) + [(cond-> + {:dimensions (->resp-dimensions-fn params) + :ts-data (ts-data (get-in resp [0 :aggregations :tsds-stats]))} + (seq hits) (assoc :hits hits))]))) + +(defn ->custom-es-query-resp + [{:keys [->resp-dimensions-fn] :as params} resp] + (let [ts-data (fn [tsds-stats] + (map + (fn [{:keys [key_as_string doc_count] :as bucket}] + {:timestamp key_as_string + :doc-count doc_count + :aggregations (dissoc bucket :key_as_string :key :doc_count)}) + (:buckets tsds-stats)))] + [(merge {:dimensions (->resp-dimensions-fn params)} + (into {} (for [agg-key (keys (get-in resp [0 :aggregations]))] + [agg-key (ts-data (get-in resp [0 :aggregations agg-key]))])))])) + +(defn ->raw-resp + [{:keys [->resp-dimensions-fn] :as params} resp] + (let [hits (second resp)] + [{:dimensions (->resp-dimensions-fn params) + :ts-data (sort-by :timestamp hits)}])) + +(defn ->ts-query-resp + [{:keys [query-type raw] :as params} resp] + (cond + raw + (->raw-resp params resp) + + (= query-type-standard query-type) + (->standard-query-resp params resp) + + (= query-type-custom-es-query query-type) + (->custom-es-query-resp params resp))) + +(defn build-aggregations-clause + [{:keys [from to ts-interval raw query-type aggregations] + group-by-field :group-by}] + (cond + raw + {} ;; send an empty :tsds-aggregation to avoid acl checks. 
TODO: find a cleaner way + + (= query-type-standard query-type) + (let [tsds-aggregations {:tsds-stats + {:date_histogram + {:field "@timestamp" + :fixed_interval ts-interval + :min_doc_count 0 + :extended_bounds {:min (time/to-str from) + :max (time/to-str to)}} + :aggregations (or aggregations {})}}] + (if group-by-field + {:aggregations + {:by-field + {:terms {:field group-by-field} + :aggregations tsds-aggregations}}} + {:aggregations tsds-aggregations})) + + (= query-type-custom-es-query query-type) + {:aggregations aggregations})) + +(defn dimension-filter->cimi-filter + [[dimension values]] + (str dimension "=[" (str/join " " (map #(str "'" % "'") values)) "]")) + +(defn build-ts-query [{:keys [last dimensions-filters from to additional-filters orderby] :as options}] + (let [time-range-filter (str "@timestamp>'" (time/to-str from) "'" + " and " + "@timestamp<'" (time/to-str to) "'") + aggregation-clause (build-aggregations-clause options)] + (cond-> + {:cimi-params (cond-> + {:last (or last 0) + :filter + (parser/parse-cimi-filter + (str "(" + (apply str + (interpose " and " + (concat [time-range-filter] + (map dimension-filter->cimi-filter dimensions-filters) + additional-filters))) + ")"))} + orderby (assoc :orderby orderby))} + aggregation-clause + (assoc :params {:tsds-aggregation (json/write-str aggregation-clause)})))) + +(defn build-query [{:keys [raw] :as options}] + (-> (build-ts-query (cond-> options + raw (assoc :last max-data-points))) + (assoc :no-prefix true))) + +(defn generic-query-fn + [{:keys [timeseries-index] :as params}] + (->> (build-query params) + (crud/query-as-admin timeseries-index) + (->ts-query-resp (assoc params :->resp-dimensions-fn ->resp-dimensions)))) + +(defmulti ts-query->query-spec (fn [_params {:keys [query-type]}] query-type)) + +(defmethod ts-query->query-spec :default + [_params {:keys [query-type]}] + (logu/log-and-throw-400 (str "unrecognized query type " query-type))) + +(defn parse-aggregations + [aggregations] + (->> aggregations + (map (fn [{:keys [aggregation-name aggregation-type field-name]}] + [aggregation-name {(keyword aggregation-type) {:field field-name}}])) + (into {}))) + +(defn throw-custom-aggregations-not-exportable + [{:keys [custom-es-aggregations]}] + (when custom-es-aggregations + (logu/log-and-throw-400 "Custom aggregations cannot be exported to csv format"))) + +(defn metrics-data->csv [options dimension-keys meta-keys metric-keys data-fn response] + (with-open [writer (StringWriter.)] + ;; write csv header + (csv/write-csv writer [(concat (map name dimension-keys) + (map name meta-keys) + (map name metric-keys))]) + ;; write csv data + (let [df (DecimalFormat. "0.####" (DecimalFormatSymbols. Locale/US))] + (csv/write-csv writer + (for [{:keys [dimensions ts-data]} response + data-point ts-data] + (concat (map dimensions dimension-keys) + (map data-point meta-keys) + (map (fn [metric-key] + (let [v (data-fn options data-point metric-key)] + (if (float? 
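+                                      ;; e.g. 3.14159 is rendered as "3.1416"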
v) + ;; format floats with 4 decimal and dot separator + (.format df v) + v))) + metric-keys))))) + (.toString writer))) + +(defn csv-export-fn + [dimension-keys-fn meta-keys-fn metric-keys-fn data-fn] + (fn [{:keys [resps] :as options}] + (throw-custom-aggregations-not-exportable options) + (metrics-data->csv + options + (dimension-keys-fn options) + (meta-keys-fn options) + (metric-keys-fn options) + data-fn + (first resps)))) + +(defn csv-dimension-keys-fn + [{:keys [dimensions]} {:keys [query-type] :as _query-spec}] + (fn [{:keys [raw queries query-specs]}] + (cond + raw + [] + + (= query-type-standard query-type) + (let [{group-by-field :group-by} (get query-specs (first queries)) + dimension-keys (map :field-name dimensions)] + (cond-> dimension-keys + group-by-field (conj group-by-field)))))) + +(defn csv-meta-keys-fn + [{:keys [dimensions]} {:keys [query-type] :as _query-spec}] + (fn [{:keys [raw]}] + (cond + raw (concat [:timestamp] (map (comp keyword :field-name) dimensions)) + (= query-type-standard query-type) [:timestamp :doc-count]))) + +(defn csv-query-keys-fn + [{:keys [metrics]} {:keys [query-type] :as _query-spec}] + (fn [{:keys [raw queries query-specs]}] + (cond + raw + (sort (map :field-name metrics)) + + (= query-type-standard query-type) + (let [{:keys [aggregations response-aggs]} + (get query-specs (first queries))] + (or response-aggs (keys aggregations)))))) + +(defn csv-data-fn + [{:keys [query-type] :as _query-spec}] + (fn [{:keys [raw]} + {:keys [aggregations] :as data-point} metric-key] + (cond + raw + (get data-point (keyword metric-key)) + + (= query-type-standard query-type) + (get-in aggregations [(keyword metric-key) :value])))) + +(defn generic-csv-export-fn + [timeseries ts-query] + (csv-export-fn (csv-dimension-keys-fn timeseries ts-query) + (csv-meta-keys-fn timeseries ts-query) + (csv-query-keys-fn timeseries ts-query) + (csv-data-fn ts-query))) + +(defmethod ts-query->query-spec query-type-standard + [{:keys [timeseries]} {:keys [query] :as ts-query}] + {:query-type query-type-standard + :query-fn generic-query-fn + :aggregations (some-> query :aggregations parse-aggregations) + :csv-export-fn (generic-csv-export-fn timeseries ts-query)}) + +(defmethod ts-query->query-spec query-type-custom-es-query + [_params {:keys [custom-es-query] :as _ts-query}] + {:query-type query-type-custom-es-query + :query-fn generic-query-fn + :aggregations (some-> custom-es-query :aggregations)}) + +(defn assoc-query-specs + [{:keys [timeseries] :as params}] + (let [query-specs (-> (get timeseries :queries) + (->> (group-by :query-name)) + (update-vals (comp (partial ts-query->query-spec params) first)))] + (cond-> params + query-specs (assoc :query-specs query-specs)))) + +(defn parse-dimension-filter + [s] + (let [[_ dimension value] (re-matches #"(.*)=(.*)" s)] + [dimension value])) + +(defn assoc-dimensions-filters + [{:keys [dimension-filter] :as params}] + (let [dimension-filter (when dimension-filter + (if (coll? 
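+                             ;; like `query`, `dimension-filter` may be a
+                             ;; single value or a collection; wrap singletons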
dimension-filter) dimension-filter [dimension-filter])) + dimensions-filters (-> (->> dimension-filter + (map parse-dimension-filter) + (group-by first)) + (update-vals #(map second %)))] + (cond-> params + dimensions-filters (assoc :dimensions-filters dimensions-filters)))) + +(defn throw-invalid-dimensions + [{:keys [dimensions-filters] {:keys [dimensions]} :timeseries :as params}] + (let [dimensions-filters-keys (set (keys dimensions-filters)) + dimensions-field-names (set (map :field-name dimensions))] + (when (seq dimensions-filters-keys) + (when-not (set/subset? dimensions-filters-keys dimensions-field-names) + (throw (r/ex-response (str "invalid dimensions: " + (str/join "," (set/difference dimensions-filters-keys dimensions-field-names))) + 400))))) + params) + +(defn throw-timeseries-not-created-yet + [{:keys [timeseries-index] :as params}] + (when-not (db/retrieve-timeseries timeseries-index) + (throw (r/ex-response "timeseries not created yet. Insert some data prior to querying the timeseries" + 404))) + params) + +(defn assoc-raw-query + "If granularity is raw and no query is specified, generate a new query returning all the metrics." + [{:keys [granularity query timeseries] :as params}] + (cond-> params + (and (= "raw" granularity) (nil? query)) + (assoc :query ["raw"] + :query-specs {"raw" {:query-fn generic-query-fn + :csv-export-fn (generic-csv-export-fn timeseries {:query-name "raw"})}}))) + +(defn generic-ts-query-data + [params request] + (-> params + (assoc-timeseries request) + (assoc-query-specs) + (assoc-dimensions-filters) + (throw-invalid-dimensions) + (throw-timeseries-not-created-yet) + (assoc-raw-query) + (query-data request))) + +(defn wrapped-query-data + [params request] + (let [query-data (wrap-query-data-accept (partial generic-ts-query-data params))] + (query-data request))) diff --git a/code/src/sixsq/nuvla/server/resources/timeseries/utils.clj b/code/src/sixsq/nuvla/server/resources/timeseries/utils.clj new file mode 100644 index 000000000..d34729d7e --- /dev/null +++ b/code/src/sixsq/nuvla/server/resources/timeseries/utils.clj @@ -0,0 +1,172 @@ +(ns sixsq.nuvla.server.resources.timeseries.utils + (:require [clojure.set :as set] + [clojure.string :as str] + [sixsq.nuvla.db.impl :as db] + [sixsq.nuvla.server.resources.common.utils :as u] + [sixsq.nuvla.server.util.response :as r] + [sixsq.nuvla.server.util.time :as time])) + +(def action-insert "insert") +(def action-bulk-insert "bulk-insert") +(def action-data "data") + +(defn resource-id->timeseries-index + [resource-id] + (str "ts-" (u/id->uuid resource-id))) + +(defn dimension->es-property + [{:keys [field-name field-type]}] + [field-name {:type field-type + :time_series_dimension true}]) + +(defn metric->es-property + [{:keys [field-name field-type metric-type]}] + [field-name {:type field-type + :time_series_metric metric-type}]) + +(defn ts-resource->mappings + [{:keys [dimensions metrics]}] + {:properties + (into {"@timestamp" {:type "date" + :format "strict_date_optional_time||epoch_millis"}} + (concat + (map dimension->es-property dimensions) + (map metric->es-property metrics)))}) + +(defn ts-resource->routing-path + [{:keys [dimensions]}] + (mapv :field-name dimensions)) + +(defn create-timeseries + [{:keys [id] :as resource}] + (let [mappings (ts-resource->mappings resource) + routing-path (ts-resource->routing-path resource)] + (db/create-timeseries + (resource-id->timeseries-index id) + {:mappings mappings + :routing-path routing-path}))) + +(defn throw-dimensions-can-only-be-appended + 
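+  "Ensures dimensions are append-only: the existing dimensions must be kept
+   unchanged (descriptions aside) and in the same order; new dimensions may
+   only be added at the end."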
[{{new-dimensions :dimensions} :body :as request} + {current-dimensions :dimensions :as _current}] + (when current-dimensions + (when-not (and (>= (count new-dimensions) (count current-dimensions)) + (= (map #(dissoc % :description) + current-dimensions) + (map #(dissoc % :description) + (subvec new-dimensions 0 (count current-dimensions))))) + (throw (r/ex-response "dimensions can only be appended" 400)))) + request) + +(defn throw-metrics-can-only-be-added + [{{new-metrics :metrics} :body :as request} + {current-metrics :metrics :as _current}] + (when-not (every? (fn [{:keys [field-name] :as current-metric}] + (= (dissoc current-metric :description) + (dissoc (->> new-metrics + (filter #(= field-name (:field-name %))) + first) + :description))) + current-metrics) + (throw (r/ex-response "metrics can only be added" 400))) + request) + +(defn edit-timeseries + [{:keys [id] :as resource}] + (let [mappings (ts-resource->mappings resource) + routing-path (ts-resource->routing-path resource)] + (db/edit-timeseries + (resource-id->timeseries-index id) + {:mappings mappings + :routing-path routing-path}))) + +(defn delete-timeseries + [resource-id] + (db/delete-timeseries + (resource-id->timeseries-index resource-id))) + +(defn throw-missing-dimensions + [{:keys [dimensions] :as _timeseries} datapoint] + (let [missing (->> dimensions + (filter #(nil? (get datapoint (keyword (:field-name %))))))] + (if (empty? missing) + datapoint + (throw (r/ex-response + (str "missing value for dimensions: " (str/join "," (map :field-name missing))) + 400))))) + +(defn throw-missing-mandatory-metrics + [{:keys [metrics] :as _timeseries} datapoint] + (let [missing (->> metrics + (filter #(not (:optional %))) + (filter #(nil? (get datapoint (keyword (:field-name %))))))] + (if (empty? missing) + datapoint + (throw (r/ex-response + (str "missing value for mandatory metrics: " (str/join "," (map :field-name missing))) + 400))))) + +(defn throw-wrong-type + [{:keys [field-name field-type] :as _field} field-value] + (let [check-type-fn (case field-type + "long" int? + "double" number? + "keyword" string?)] + (if (check-type-fn field-value) + field-value + (throw (r/ex-response + (str "a value with the wrong type was provided for field " field-name ": " field-value) + 400))))) + +(defn throw-wrong-types + [{:keys [dimensions metrics] :as _timeseries} datapoint] + (doseq [{:keys [field-name] :as field} (concat dimensions metrics)] + (some->> (get datapoint (keyword field-name)) + (throw-wrong-type field))) + datapoint) + +(defn throw-extra-keys + [{:keys [dimensions metrics] :as _timeseries} datapoint] + (let [extra-keys (set/difference (set (keys (dissoc datapoint :timestamp))) + (->> (concat dimensions metrics) + (map (comp keyword :field-name)) + set))] + (if (empty? extra-keys) + datapoint + (throw (r/ex-response + (str "unexpected keys: " (str/join "," extra-keys)) + 400))))) + +(defn throw-outside-acceptable-time-range + [_timeseries {:keys [timestamp] :as datapoint}] + (let [ts (time/parse-date timestamp) + now (time/now) + look-ahead-time (time/duration-unit 2 :hours) + look-back-time (time/duration-unit 7 :days) + start-time (time/minus now look-back-time) + end-time (time/plus now look-ahead-time)] + (if (and (time/before? start-time ts) (time/before? 
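+             ;; datapoints are accepted from at most 7 days in the past
+             ;; up to 2 hours in the future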
ts end-time)) + datapoint + (throw (r/ex-response + (str "timestamp is outside acceptable range: " ts " not in [" start-time " - " end-time "]") + 400))))) + +(defn validate-datapoint + [timeseries datapoint] + (->> datapoint + (throw-missing-dimensions timeseries) + (throw-missing-mandatory-metrics timeseries) + (throw-wrong-types timeseries) + (throw-extra-keys timeseries) + (throw-outside-acceptable-time-range timeseries))) + +(defn validate-datapoints + [timeseries datapoints] + (doseq [datapoint datapoints] + (validate-datapoint timeseries datapoint)) + datapoints) + +(defn add-timestamp + [{:keys [timestamp] :as datapoint}] + (cond-> datapoint + (not timestamp) (assoc :timestamp (time/now-str)))) diff --git a/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_availability.clj b/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_availability.clj index 534e9a3ea..22c8cec3f 100644 --- a/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_availability.clj +++ b/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_availability.clj @@ -39,9 +39,9 @@ The `ts-nuvlaedge` resources create a timeseries related to nuvlaedge availabili [resource _request] resource) -(def add-impl (std-crud/add-metric-fn resource-type collection-acl resource-type - :validate-fn validate - :options {:refresh false})) +(def add-impl (std-crud/add-timeseries-datapoint-fn resource-type collection-acl resource-type + :validate-fn validate + :options {:refresh false})) (defmethod crud/add resource-type diff --git a/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry.clj b/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry.clj index 4c435c27d..9cd5cf53a 100644 --- a/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry.clj +++ b/code/src/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry.clj @@ -3,6 +3,7 @@ The `ts-nuvlaedge` resources create a timeseries related to nuvlaedge. " (:require + [sixsq.nuvla.db.es.common.utils :as escu] [sixsq.nuvla.server.resources.common.crud :as crud] [sixsq.nuvla.server.resources.common.std-crud :as std-crud] [sixsq.nuvla.server.resources.common.utils :as u] @@ -47,9 +48,9 @@ The `ts-nuvlaedge` resources create a timeseries related to nuvlaedge. [resource _request] resource) -(def add-impl (std-crud/add-metric-fn resource-type collection-acl resource-type - :validate-fn validate - :options {:refresh false})) +(def add-impl (std-crud/add-timeseries-datapoint-fn resource-type collection-acl resource-type + :validate-fn validate + :options {:refresh false})) (defmethod crud/add resource-type @@ -94,7 +95,9 @@ The `ts-nuvlaedge` resources create a timeseries related to nuvlaedge. (query-impl request)) -(def bulk-insert-impl (std-crud/bulk-insert-metrics-fn resource-type collection-acl collection-type)) +(def bulk-insert-impl (std-crud/bulk-insert-timeseries-datapoints-fn + (escu/collection-id->index resource-type) + collection-acl collection-type)) (defmethod crud/bulk-action [resource-type "bulk-insert"] [request] @@ -103,4 +106,4 @@ The `ts-nuvlaedge` resources create a timeseries related to nuvlaedge. 
(defn initialize [] - (std-crud/initialize-as-timeseries resource-type ::ts-nuvlaedge-telemetry/schema)) \ No newline at end of file + (std-crud/initialize-as-timeseries resource-type ::ts-nuvlaedge-telemetry/schema)) diff --git a/code/test/sixsq/nuvla/db/es/binding_test.clj b/code/test/sixsq/nuvla/db/es/binding_test.clj index 2bbf1bb69..6061bb6e6 100644 --- a/code/test/sixsq/nuvla/db/es/binding_test.clj +++ b/code/test/sixsq/nuvla/db/es/binding_test.clj @@ -50,7 +50,7 @@ datastream-index-name "test-ts-index-1"] (testing "Create timeseries template" - (t/create-timeseries-template client index-name mapping {:routing-path routing-path}) + (t/create-or-update-timeseries-template client index-name mapping {:routing-path routing-path}) (let [response (-> (spandex/request client {:url (str "_index_template/" template-name)}) (get-in [:body :index_templates 0]))] (is (= template-name (:name response))) @@ -94,8 +94,8 @@ (is (= #{:hot :warm :delete} (set (keys phases)))) (testing "Create timeseries template with ilm policy" - (let [template-name (t/create-timeseries-template client index-name mapping - {:routing-path routing-path + (let [template-name (t/create-or-update-timeseries-template client index-name mapping + {:routing-path routing-path :start-time (time/to-str (time/minus (time/now) (time/duration-unit 20 :hours))) :lifecycle-name ilm-policy-name}) response (-> (spandex/request client {:url (str "_index_template/" template-name)}) @@ -114,7 +114,7 @@ :metric "ram" :ram {:used 0}}) (range 100))] - (t/bulk-insert-metrics client collection-id test-data-last-sec {})) + (t/bulk-insert-timeseries-datapoints client (escu/collection-id->index collection-id) test-data-last-sec {})) (spandex/request client {:url [:_refresh], :method :post}) (let [_response (-> (spandex/request client {:url (str "_data_stream/" index-name)}) (get-in [:body :data_streams])) diff --git a/code/test/sixsq/nuvla/server/resources/nuvlabox_status_2_lifecycle_test.clj b/code/test/sixsq/nuvla/server/resources/nuvlabox_status_2_lifecycle_test.clj index b2e95c65d..a76968055 100644 --- a/code/test/sixsq/nuvla/server/resources/nuvlabox_status_2_lifecycle_test.clj +++ b/code/test/sixsq/nuvla/server/resources/nuvlabox_status_2_lifecycle_test.clj @@ -477,7 +477,7 @@ (let [invalid-format (fn [accept-header] (-> (metrics-request {:accept-header accept-header - :datasets ["cpu-stats"] + :queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-days"}) @@ -489,7 +489,7 @@ (let [metrics-request (fn [accept-header response-content-type] (-> (metrics-request (cond-> - {:datasets ["cpu-stats"] + {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-days"} @@ -585,22 +585,22 @@ now (time/now) midnight-today (time/truncated-to-days now) midnight-yesterday (time/truncated-to-days (time/minus now (time/duration-unit 1 :days))) - metrics-request (fn [{:keys [datasets from from-str to to-str granularity custom-es-aggregations accept-header] #_:or #_{accept-header "application/json"}}] + metrics-request (fn [{:keys [queries from from-str to to-str granularity custom-es-aggregations accept-header] #_:or #_{accept-header "application/json"}}] (-> session-nb (content-type "application/x-www-form-urlencoded") (cond-> accept-header (header "accept" accept-header)) (request nuvlabox-data-url :body (rc/form-encode (cond-> - {:dataset datasets - :from (if from (time/to-str from) from-str) - :to (if to (time/to-str to) to-str)} + {:query queries + :from (if from 
(time/to-str from) from-str) + :to (if to (time/to-str to) to-str)} granularity (assoc :granularity granularity) custom-es-aggregations (assoc :custom-es-aggregations custom-es-aggregations))))))] (testing "new metrics data is added to ts-nuvlaedge time-serie" (let [from (time/minus (time/now) (time/duration-unit 1 :days)) to now - metric-data (-> (metrics-request {:datasets ["cpu-stats" + metric-data (-> (metrics-request {:queries ["cpu-stats" "ram-stats" "disk-stats" "network-stats" @@ -700,7 +700,7 @@ (testing "raw metric data query" (let [from (time/minus (time/now) (time/duration-unit 1 :days)) to now - raw-metric-data (-> (metrics-request {:datasets ["cpu-stats" + raw-metric-data (-> (metrics-request {:queries ["cpu-stats" "ram-stats" "disk-stats" "network-stats" @@ -767,7 +767,7 @@ to now] (testing "custom aggregation on cpu-stats" (let [custom-cpu-agg (-> (metrics-request - {:datasets ["cpu-stats"] + {:queries ["cpu-stats"] :from from :to to :custom-es-aggregations (json/write-str @@ -790,7 +790,7 @@ (:cpu-stats custom-cpu-agg))))) (testing "custom aggregation on disk-stats" (let [custom-cpu-agg (-> (metrics-request - {:datasets ["disk-stats"] + {:queries ["disk-stats"] :from from :to to :custom-es-aggregations (json/write-str @@ -819,45 +819,45 @@ (ltu/body->edn) (ltu/body) :message))] - (is (= "exactly one dataset must be specified with accept header 'text/csv'" + (is (= "exactly one query must be specified with accept header 'text/csv'" (invalid-request {:accept-header "text/csv" - :datasets ["cpu-stats" "network-stats"] + :queries ["cpu-stats" "network-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-days"}))) (is (= "from parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :granularity "1-days"}))) (is (= "from parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from-str "wrong-datetime" :granularity "1-days"}))) (is (= "to parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :granularity "1-days"}))) (is (= "to parameter is mandatory, with format iso8601 (uuuu-MM-dd'T'HH:mm:ss[.SSS]Z)" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to-str "wrong-datetime" :granularity "1-days"}))) (is (= "from must be before to" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from now :to now :granularity "1-days"}))) - (is (= "unknown datasets: invalid-1,invalid-2" - (invalid-request {:datasets ["invalid-1" "cpu-stats" "invalid-2"] + (is (= "unknown queries: invalid-1,invalid-2" + (invalid-request {:queries ["invalid-1" "cpu-stats" "invalid-2"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-days"}))) (is (= "unrecognized value for granularity 1-invalid" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-invalid"}))) (is (= "too many data points requested. Please restrict the time interval or increase the time granularity." 
- (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-minutes"}))))) @@ -865,9 +865,9 @@ (testing "csv export of metrics data" (let [from (time/minus now (time/duration-unit 1 :days)) to now - csv-request (fn [dataset granularity] + csv-request (fn [query granularity] (-> (metrics-request {:accept-header "text/csv" - :datasets [dataset] + :queries [query] :from from :to to :granularity granularity}) @@ -959,7 +959,7 @@ (testing "Export with custom es aggregations not allowed" (let [csv-custom-cpu-agg (-> (metrics-request {:accept-header "text/csv" - :datasets ["cpu-stats"] + :queries ["cpu-stats"] :from from :to to :custom-es-aggregations (json/write-str @@ -1017,7 +1017,7 @@ now (time/now) midnight-today (time/truncated-to-days now) midnight-yesterday (time/truncated-to-days (time/minus now (time/duration-unit 1 :days))) - metrics-request (fn [{:keys [datasets from from-str to to-str granularity custom-es-aggregations accept-header]}] + metrics-request (fn [{:keys [queries from from-str to to-str granularity custom-es-aggregations accept-header]}] (-> session-nb (cond-> accept-header (header "accept" accept-header)) (request nuvlabox-data-url @@ -1025,20 +1025,20 @@ :headers {:bulk true} :body (json/write-str (cond-> - {:filter (str "(id='" nuvlabox-id "'" - " or id='" nuvlabox-id-2 "'" - " or id='" nuvlabox-id-3 "'" - " or id='" nuvlabox-id-4 "')") - :dataset datasets - :from (if from (time/to-str from) from-str) - :to (if to (time/to-str to) to-str)} + {:filter (str "(id='" nuvlabox-id "'" + " or id='" nuvlabox-id-2 "'" + " or id='" nuvlabox-id-3 "'" + " or id='" nuvlabox-id-4 "')") + :query queries + :from (if from (time/to-str from) from-str) + :to (if to (time/to-str to) to-str)} granularity (assoc :granularity granularity) custom-es-aggregations (assoc :custom-es-aggregations custom-es-aggregations))))))] (testing "new metrics data is added to ts-nuvlaedge time-serie" (ltu/refresh-es-indices) (let [from (time/minus (time/now) (time/duration-unit 1 :days)) to now - metric-data (-> (metrics-request {:datasets ["cpu-stats" + metric-data (-> (metrics-request {:queries ["cpu-stats" "ram-stats" "disk-stats" "network-stats" @@ -1116,7 +1116,7 @@ (ltu/refresh-es-indices) (let [from (time/minus (time/now) (time/duration-unit 1 :days)) to now - raw-metric-data (-> (metrics-request {:datasets ["cpu-stats" + raw-metric-data (-> (metrics-request {:queries ["cpu-stats" "ram-stats" "disk-stats" "network-stats" @@ -1223,7 +1223,7 @@ to now] (testing "custom aggregation on cpu-stats" (let [custom-cpu-agg (-> (metrics-request - {:datasets ["cpu-stats"] + {:queries ["cpu-stats"] :from from :to to :custom-es-aggregations {:agg1 {:date_histogram @@ -1245,7 +1245,7 @@ (:cpu-stats custom-cpu-agg))))) (testing "custom aggregation on disk-stats" (let [custom-disk-agg (-> (metrics-request - {:datasets ["disk-stats"] + {:queries ["disk-stats"] :from from :to to :custom-es-aggregations {:agg1 {:date_histogram @@ -1273,29 +1273,29 @@ (ltu/body->edn) (ltu/body) :message))] - (is (= "exactly one dataset must be specified with accept header 'text/csv'" + (is (= "exactly one query must be specified with accept header 'text/csv'" (invalid-request {:accept-header "text/csv" - :datasets ["cpu-stats" "network-stats"] + :queries ["cpu-stats" "network-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-days"}))) (is (= "from must be before to" - (invalid-request {:datasets 
["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from now :to now :granularity "1-days"}))) - (is (= "unknown datasets: invalid-1,invalid-2" - (invalid-request {:datasets ["invalid-1" "cpu-stats" "invalid-2"] + (is (= "unknown queries: invalid-1,invalid-2" + (invalid-request {:queries ["invalid-1" "cpu-stats" "invalid-2"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-days"}))) (is (= "unrecognized value for granularity 1-invalid" - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-invalid"}))) (is (= "too many data points requested. Please restrict the time interval or increase the time granularity." - (invalid-request {:datasets ["cpu-stats"] + (invalid-request {:queries ["cpu-stats"] :from (time/minus now (time/duration-unit 1 :days)) :to now :granularity "1-minutes"}))))) @@ -1303,9 +1303,9 @@ (testing "csv export of metrics data" (let [from (time/minus now (time/duration-unit 1 :days)) to now - csv-request (fn [dataset granularity] + csv-request (fn [query granularity] (-> (metrics-request {:accept-header "text/csv" - :datasets [dataset] + :queries [query] :from from :to to :granularity granularity}) @@ -1572,20 +1572,20 @@ (testing "availability data on a single nuvlabox" (let [nuvlabox-data-url (str nuvlabox-url "/data") - metrics-request (fn [{:keys [datasets from from-str to to-str granularity accept-header] #_:or #_{accept-header "application/json"}}] + metrics-request (fn [{:keys [queries from from-str to to-str granularity accept-header] #_:or #_{accept-header "application/json"}}] (-> session-nb (content-type "application/x-www-form-urlencoded") (cond-> accept-header (header "accept" accept-header)) (request nuvlabox-data-url :body (rc/form-encode - {:dataset datasets + {:query queries :from (if from (time/to-str from) from-str) :to (if to (time/to-str to) to-str) :granularity granularity}))))] (testing "from midnight yesterday until now" (let [from midnight-yesterday to now - metric-data (-> (metrics-request {:datasets ["availability-stats"] + metric-data (-> (metrics-request {:queries ["availability-stats"] :from from :to to :granularity "1-days"}) @@ -1618,7 +1618,7 @@ (testing "raw availability data query" (let [from (time/minus now (time/duration-unit 1 :days)) to now - raw-availability-data (-> (metrics-request {:datasets ["availability-stats"] + raw-availability-data (-> (metrics-request {:queries ["availability-stats"] :from from :to to :granularity "raw"}) @@ -1637,9 +1637,9 @@ (testing "csv export of availability data" (let [from midnight-yesterday to now - csv-request (fn [dataset granularity] + csv-request (fn [query granularity] (-> (metrics-request {:accept-header "text/csv" - :datasets [dataset] + :queries [query] :from from :to to :granularity granularity}) @@ -1691,7 +1691,7 @@ nuvlabox-data-url (str p/service-context nb/resource-type "/data") midnight-today (time/truncated-to-days now) midnight-yesterday (time/truncated-to-days (time/minus now (time/duration-unit 1 :days))) - metrics-request (fn [{:keys [datasets from from-str to to-str granularity accept-header]}] + metrics-request (fn [{:keys [queries from from-str to to-str granularity accept-header]}] (-> session-nb (cond-> accept-header (header "accept" accept-header)) (request nuvlabox-data-url @@ -1702,7 +1702,7 @@ " or id='" nuvlabox-id-3 "'" " or id='" nuvlabox-id-4 "'" " or id='" nuvlabox-id-5 "')") - :dataset datasets + :query queries :from (if from 
(time/to-str from) from-str) :to (if to (time/to-str to) to-str) :granularity granularity}))))] @@ -1710,7 +1710,7 @@ (ltu/refresh-es-indices) (let [from midnight-yesterday to now - metric-data (-> (metrics-request {:datasets ["availability-stats" + metric-data (-> (metrics-request {:queries ["availability-stats" "availability-by-edge"] :from from :to to @@ -1775,7 +1775,7 @@ (testing "raw availability data query" (let [from midnight-yesterday to now - raw-availability-data (-> (metrics-request {:datasets ["availability-stats"] + raw-availability-data (-> (metrics-request {:queries ["availability-stats"] :from from :to to :granularity "raw"}) @@ -1797,9 +1797,9 @@ (:availability-stats raw-availability-data))))) (testing "csv export of availability data" - (let [csv-request (fn [dataset granularity] + (let [csv-request (fn [query granularity] (-> (metrics-request {:accept-header "text/csv" - :datasets [dataset] + :queries [query] :from midnight-yesterday :to now :granularity granularity}) @@ -1855,14 +1855,14 @@ (let [nuvlabox-data-url (str p/service-context nb/resource-type "/data") midnight-today (time/truncated-to-days now) midnight-yesterday (time/truncated-to-days (time/minus now (time/duration-unit 1 :days))) - metrics-request (fn [{:keys [datasets from from-str to to-str granularity accept-header]}] + metrics-request (fn [{:keys [queries from from-str to to-str granularity accept-header]}] (-> session-nb (cond-> accept-header (header "accept" accept-header)) (request nuvlabox-data-url :request-method :patch :headers {:bulk true} :body (json/write-str - {:dataset datasets + {:query queries :from (if from (time/to-str from) from-str) :to (if to (time/to-str to) to-str) :granularity granularity})))) @@ -1873,7 +1873,7 @@ (let [from now-1d to now] (with-redefs [data-utils/query-data-max-time 100] - (-> (metrics-request {:datasets ["availability-stats"] + (-> (metrics-request {:queries ["availability-stats"] :from from :to to :granularity "1-days"}) @@ -1884,7 +1884,7 @@ (testing "availability query performance" (let [[elapsed-time metric-data] (logt/logtime1 - (-> (metrics-request {:datasets ["availability-stats"] + (-> (metrics-request {:queries ["availability-stats"] :from from :to to :granularity "1-days"}) diff --git a/code/test/sixsq/nuvla/server/resources/spec/timeseries_test.cljc b/code/test/sixsq/nuvla/server/resources/spec/timeseries_test.cljc new file mode 100644 index 000000000..358401866 --- /dev/null +++ b/code/test/sixsq/nuvla/server/resources/spec/timeseries_test.cljc @@ -0,0 +1,62 @@ +(ns sixsq.nuvla.server.resources.spec.timeseries-test + (:require + [clojure.test :refer [deftest]] + [sixsq.nuvla.server.resources.timeseries :as t] + [sixsq.nuvla.server.resources.spec.spec-test-utils :as stu] + [sixsq.nuvla.server.resources.spec.timeseries :as timeseries] + [sixsq.nuvla.server.util.time :as time])) + +(def valid-acl {:owners ["group/nuvla-admin"] + :edit-acl ["group/nuvla-admin"]}) + +(deftest check-schema + (let [timestamp (time/now-str) + valid-entry {:id (str t/resource-type "/internal") + :resource-type t/resource-type + :created timestamp + :updated timestamp + :acl valid-acl + :dimensions [{:field-name "test-dimension" + :field-type "keyword"}] + :metrics [{:field-name "test-metric" + :field-type "long" + :metric-type "gauge"} + {:field-name "test-optional-metric" + :field-type "long" + :metric-type "counter" + :optional true}] + :queries [{:query-name "test-metric-avg-query" + :query-type "standard" + :query {:aggregations [{:aggregation-name "test-metric-avg" + 
:aggregation-type "avg" + :field-name "test-metric"}]}} + {:query-name "test-metric-min-query" + :query-type "standard" + :query {:aggregations [{:aggregation-name "test-metric-min" + :aggregation-type "min" + :field-name "test-metric"}]}} + {:query-name "test-metric-max-query" + :query-type "standard" + :query {:aggregations [{:aggregation-name "test-metric-max" + :aggregation-type "max" + :field-name "test-metric"}]}} + {:query-name "test-metric-multi-query" + :query-type "standard" + :query {:aggregations [{:aggregation-name "test-metric-avg" + :aggregation-type "avg" + :field-name "test-metric"} + {:aggregation-name "test-metric-min" + :aggregation-type "min" + :field-name "test-metric"} + {:aggregation-name "test-metric-max" + :aggregation-type "max" + :field-name "test-metric"}]}}]}] + + (stu/is-valid ::timeseries/schema valid-entry) + (stu/is-invalid ::timeseries/schema (assoc valid-entry :unknown "value")) + + (doseq [attr #{:metrics :dimensions}] + (stu/is-invalid ::timeseries/schema (dissoc valid-entry attr))) + + (doseq [attr #{:queries}] + (stu/is-valid ::timeseries/schema (dissoc valid-entry attr))))) diff --git a/code/test/sixsq/nuvla/server/resources/timeseries_lifecycle_test.clj b/code/test/sixsq/nuvla/server/resources/timeseries_lifecycle_test.clj new file mode 100644 index 000000000..bd50f7424 --- /dev/null +++ b/code/test/sixsq/nuvla/server/resources/timeseries_lifecycle_test.clj @@ -0,0 +1,638 @@ +(ns sixsq.nuvla.server.resources.timeseries-lifecycle-test + (:require + [clojure.data.json :as json] + [clojure.string :as str] + [clojure.test :refer [deftest is testing use-fixtures]] + [peridot.core :refer [content-type header request session]] + [ring.util.codec :as rc] + [sixsq.nuvla.db.es.binding :as es-binding] + [sixsq.nuvla.db.impl :as db] + [sixsq.nuvla.server.app.params :as p] + [sixsq.nuvla.server.middleware.authn-info :refer [authn-info-header]] + [sixsq.nuvla.server.resources.common.utils :as u] + [sixsq.nuvla.server.resources.lifecycle-test-utils :as ltu] + [sixsq.nuvla.server.resources.timeseries :as t] + [sixsq.nuvla.server.resources.timeseries.utils :as tu] + [sixsq.nuvla.server.util.time :as time])) + +(use-fixtures :each ltu/with-test-server-fixture) + +(def base-uri (str p/service-context t/resource-type)) + +(def dimension1 "test-dimension1") + +(def metric1 "test-metric1") +(def metric2 "test-metric2") + +(def query1 "test-query1") +(def query2 "test-query2") +(def query3 "test-query3-invalid") +(def aggregation1 "test-metric1-avg") +(def aggregation2 "test-metric1-value-count") +(def aggregation3 "test-metric1-sum") + +(def valid-entry {:dimensions [{:field-name dimension1 + :field-type "keyword" + :description "description of dimension 1"}] + :metrics [{:field-name metric1 + :field-type "double" + :metric-type "gauge" + :description "description of metric 1"} + {:field-name metric2 + :field-type "long" + :description "description of metric 2" + :metric-type "counter" + :optional true}] + :queries [{:query-name query1 + :query-type "standard" + :description "description of query 1" + :query {:aggregations [{:aggregation-name aggregation1 + :aggregation-type "avg" + :field-name metric1} + {:aggregation-name aggregation2 + :aggregation-type "value_count" + :field-name metric1} + {:aggregation-name aggregation3 + :aggregation-type "sum" + :field-name metric1}]}} + {:query-name query2 + :query-type "custom-es-query" + :description "description of query 2" + :custom-es-query {:aggregations + {:agg1 {:date_histogram + {:field "@timestamp" + :fixed_interval 
"1d" + :min_doc_count 0} + :aggregations {:custom-agg {:stats {:field metric1}}}}}}} + {:query-name query3 + :query-type "custom-es-query" + :description "invalid query" + :custom-es-query {:aggregations + {:agg1 {:invalid "invalid"}}}}]}) + +(defn create-timeseries + [session entry] + (-> session + (request base-uri + :request-method :post + :body (json/write-str entry)) + (ltu/body->edn) + (ltu/is-status 201) + (ltu/location))) + +(deftest lifecycle + (let [session-anon (-> (ltu/ring-app) + session + (content-type "application/json")) + session-user (header session-anon authn-info-header + "user/jane user/jane group/nuvla-user group/nuvla-anon") + ;; create timeseries + ts-id (create-timeseries session-user valid-entry) + ts-url (str p/service-context ts-id) + ;; retrieve timeseries + ts-response (-> session-user + (request ts-url) + (ltu/body->edn) + (ltu/is-status 200) + (ltu/is-operation-present tu/action-insert)) + ts-resource (ltu/body ts-response) + ts-index (tu/resource-id->timeseries-index ts-id) + insert-op-url (ltu/get-op-url ts-response tu/action-insert) + now (time/now)] + (is (= (assoc valid-entry + :id ts-id + :resource-type "timeseries") + (select-keys ts-resource [:resource-type :id :dimensions :metrics :queries]))) + + (testing "No timeseries is created yet" + (is (thrown? Exception (db/retrieve-timeseries ts-index)))) + + (testing "invalid timeseries creation attempts" + (-> session-user + (request base-uri + :request-method :post + :body (json/write-str (assoc valid-entry :dimensions []))) + (ltu/body->edn) + (ltu/is-status 400)) + + (-> session-user + (request base-uri + :request-method :post + :body (json/write-str (assoc valid-entry :metrics []))) + (ltu/body->edn) + (ltu/is-status 400))) + + (testing "query timeseries" + (let [query-response (-> session-user + (request base-uri) + (ltu/body->edn) + (ltu/is-status 200) + (ltu/is-count 1) + (ltu/body))] + (is (= valid-entry (-> query-response + :resources + first + (select-keys [:dimensions :metrics :queries])))))) + + (testing "insert timeseries datapoint" + (let [datapoint {:timestamp (time/to-str now) + dimension1 "d1-val1" + metric1 3.14 + metric2 1000}] + (testing "datapoint validation error: missing dimensions" + (-> session-user + (request insert-op-url + :request-method :post + :body (json/write-str (dissoc datapoint dimension1))) + (ltu/body->edn) + (ltu/is-status 400) + (ltu/is-key-value :message "missing value for dimensions: test-dimension1"))) + + (testing "datapoint validation error: missing value for mandatory metrics" + (-> session-user + (request insert-op-url + :request-method :post + :body (json/write-str (dissoc datapoint metric1))) + (ltu/body->edn) + (ltu/is-status 400) + (ltu/is-key-value :message "missing value for mandatory metrics: test-metric1"))) + + (testing "datapoint validation error: wrong field type provided" + (-> session-user + (request insert-op-url + :request-method :post + :body (json/write-str (assoc datapoint dimension1 1000))) + (ltu/body->edn) + (ltu/is-status 400) + (ltu/is-key-value :message "a value with the wrong type was provided for field test-dimension1: 1000")) + + (-> session-user + (request insert-op-url + :request-method :post + :body (json/write-str (assoc datapoint metric1 "wrong-type"))) + (ltu/body->edn) + (ltu/is-status 400) + (ltu/is-key-value :message "a value with the wrong type was provided for field test-metric1: wrong-type"))) + + (testing "successful insert" + (-> session-user + (request insert-op-url + :request-method :post + :body (json/write-str 
+                              datapoint))
+            (ltu/body->edn)
+            (ltu/is-status 201)))
+
+      (testing "timeseries is now created"
+        (is (some? (db/retrieve-timeseries ts-index))))
+
+      (testing "insert same datapoint again -> conflict"
+        (-> session-user
+            (request insert-op-url
+                     :request-method :post
+                     :body (json/write-str datapoint))
+            (ltu/body->edn)
+            (ltu/is-status 409)))
+
+      (testing "timestamp is not mandatory"
+        (-> session-user
+            (request insert-op-url
+                     :request-method :post
+                     :body (json/write-str (dissoc datapoint :timestamp)))
+            (ltu/body->edn)
+            (ltu/is-status 201)))
+
+      (testing "optional metrics can be omitted"
+        (-> session-user
+            (request insert-op-url
+                     :request-method :post
+                     :body (json/write-str (dissoc datapoint :timestamp metric2)))
+            (ltu/body->edn)
+            (ltu/is-status 201)))))
+
+    (testing "bulk insert timeseries datapoints"
+      (let [datapoints         [{:timestamp (time/to-str now)
+                                 dimension1 "d1-val2"
+                                 metric1    10
+                                 metric2    1}
+                                {:timestamp (time/to-str now)
+                                 dimension1 "d1-val3"
+                                 metric1    20
+                                 metric2    2}]
+            bulk-insert-op-url (ltu/get-op-url ts-response tu/action-bulk-insert)]
+        (testing "missing bulk header"
+          (-> session-user
+              (request bulk-insert-op-url
+                       :request-method :post
+                       :body (json/write-str datapoints))
+              (ltu/body->edn)
+              (ltu/is-status 400)
+              (ltu/is-key-value :message "Bulk request should contain bulk http header.")))
+
+        (testing "Sending data outside the insert window should return a 400 error"
+          (-> session-user
+              (request bulk-insert-op-url
+                       :headers {"bulk" true}
+                       :request-method :post
+                       :body (json/write-str
+                               (map (fn [entry]
+                                      (assoc entry :timestamp
+                                                   (time/to-str (time/minus now (time/duration-unit 2 :weeks)))))
+                                    datapoints)))
+              (ltu/body->edn)
+              (ltu/is-status 400))
+
+          (-> session-user
+              (request bulk-insert-op-url
+                       :headers {"bulk" true}
+                       :request-method :post
+                       :body (json/write-str
+                               (map (fn [entry]
+                                      (assoc entry :timestamp
+                                                   (time/to-str (time/plus now (time/duration-unit 4 :hours)))))
+                                    datapoints)))
+              (ltu/body->edn)
+              (ltu/is-status 400)))
+
+        (testing "successful bulk insert"
+          (-> session-user
+              (request bulk-insert-op-url
+                       :headers {"bulk" true}
+                       :request-method :post
+                       :body (json/write-str datapoints))
+              (ltu/body->edn)
+              (ltu/is-status 200)))
+
+        (testing "timestamp is not mandatory"
+          (-> session-user
+              (request bulk-insert-op-url
+                       :headers {"bulk" true}
+                       :request-method :post
+                       :body (json/write-str (map #(dissoc % :timestamp) datapoints)))
+              (ltu/body->edn)
+              (ltu/is-status 200)))))
+
+    (testing "update timeseries"
+      (let [dimension2 "test-dimension2"
+            metric3    "test-metric3"]
+        (testing "removing existing dimensions is not allowed"
+          (let [nok-entry {:dimensions [{:field-name dimension2
+                                         :field-type "keyword"}]
+                           :metrics    [{:field-name  metric1
+                                         :field-type  "double"
+                                         :metric-type "gauge"}
+                                        {:field-name  metric2
+                                         :field-type  "long"
+                                         :metric-type "counter"
+                                         :optional    true}]}]
+            (-> session-user
+                (request ts-url
+                         :request-method :put
+                         :body (json/write-str nok-entry))
+                (ltu/body->edn)
+                (ltu/is-status 400)
+                (ltu/is-key-value :message "dimensions can only be appended"))))
+
+        (testing "removing existing metrics is not allowed"
+          (let [nok-entry {:dimensions [{:field-name dimension1
+                                         :field-type "keyword"}]
+                           :metrics    [{:field-name  metric1
+                                         :field-type  "double"
+                                         :metric-type "gauge"}
+                                        {:field-name  metric3
+                                         :field-type  "double"
+                                         :metric-type "gauge"}]}]
+            (-> session-user
+                (request ts-url
+                         :request-method :put
+                         :body (json/write-str nok-entry))
+                (ltu/body->edn)
+                (ltu/is-status 400)
+                (ltu/is-key-value :message "metrics can only be added"))))
+
+        (testing "successful update - additional dimension and additional metric"
+          (let [updated-entry {:dimensions [{:field-name dimension1
+                                             :field-type "keyword"}
+                                            {:field-name dimension2
+                                             :field-type "keyword"}]
+                               :metrics    [{:field-name  metric1
+                                             :field-type  "double"
+                                             :metric-type "gauge"}
+                                            {:field-name  metric2
+                                             :field-type  "long"
+                                             :metric-type "counter"
+                                             :optional    true}
+                                            {:field-name  metric3
+                                             :field-type  "double"
+                                             :metric-type "gauge"}]}]
+            (-> session-user
+                (request ts-url
+                         :request-method :put
+                         :body (json/write-str updated-entry))
+                (ltu/body->edn)
+                (ltu/is-status 200))
+
+            (testing "check that the datastream mapping has been updated with the new metric"
+              (let [es-client (ltu/es-client)]
+                (is (= {:time_series_metric "gauge"
+                        :type               "double"}
+                       (-> (es-binding/datastream-mappings es-client ts-index)
+                           (get (keyword metric3)))))))
+
+            (testing "insert datapoint with updated schema"
+              (let [datapoint {:timestamp (time/now-str)
+                               dimension1 "d1-val1"
+                               dimension2 "d2-val1"
+                               metric1    3.14
+                               metric2    1000
+                               metric3    12.34}]
+                (-> session-user
+                    (request insert-op-url
+                             :request-method :post
+                             :body (json/write-str datapoint))
+                    (ltu/body->edn)
+                    (ltu/is-status 201))))
+
+            (testing "changing the order of existing dimensions is not allowed"
+              (let [nok-entry (assoc updated-entry :dimensions
+                                                   [{:field-name dimension2
+                                                     :field-type "keyword"}
+                                                    {:field-name dimension1
+                                                     :field-type "keyword"}])]
+                (-> session-user
+                    (request ts-url
+                             :request-method :put
+                             :body (json/write-str nok-entry))
+                    (ltu/body->edn)
+                    (ltu/is-status 400)
+                    (ltu/is-key-value :message "dimensions can only be appended"))))))))
+
+    (testing "delete timeseries"
+      (-> session-user
+          (request ts-url :request-method :delete)
+          (ltu/body->edn)
+          (ltu/is-status 200))
+
+      ;; timeseries meta doc is deleted
+      (-> session-user
+          (request ts-url)
+          (ltu/body->edn)
+          (ltu/is-status 404))
+
+      ;; timeseries is also deleted
+      (is (thrown? Exception (db/retrieve-timeseries ts-index))))))
+
+(deftest query
+  (let [session-anon       (-> (ltu/ring-app)
+                               session
+                               (content-type "application/json"))
+        session-user       (header session-anon authn-info-header
+                                   "user/jane user/jane group/nuvla-user group/nuvla-anon")
+        ts-id              (create-timeseries session-user valid-entry)
+        ts-url             (str p/service-context ts-id)
+        ;; retrieve timeseries
+        ts-response        (-> session-user
+                               (request ts-url)
+                               (ltu/body->edn)
+                               (ltu/is-status 200)
+                               (ltu/is-operation-present tu/action-insert))
+        bulk-insert-op-url (ltu/get-op-url ts-response tu/action-bulk-insert)
+        data-op-url        (ltu/get-op-url ts-response tu/action-data)
+
+        now                (time/now)
+        now-1h             (time/minus now (time/duration-unit 1 :hours))
+        d1-val1            "d1q-val1"
+        d1-val2            "d1q-val2"
+        datapoints         [{:timestamp (time/to-str now-1h)
+                             dimension1 d1-val1
+                             metric1    10.0
+                             metric2    1}
+                            {:timestamp (time/to-str now-1h)
+                             dimension1 d1-val2
+                             metric1    20.0
+                             metric2    2}]]
+
+    (testing "Query metrics"
+      (let [midnight-today     (time/truncated-to-days now)
+            midnight-yesterday (time/truncated-to-days (time/minus now (time/duration-unit 1 :days)))
+            metrics-request    (fn [{:keys [dimensions-filters queries from from-str to to-str granularity accept-header]}]
+                                 (-> session-user
+                                     (content-type "application/x-www-form-urlencoded")
+                                     (cond-> accept-header (header "accept" accept-header))
+                                     (request data-op-url
+                                              :body (rc/form-encode
+                                                      (cond->
+                                                        {:from (if from (time/to-str from) from-str)
+                                                         :to   (if to (time/to-str to) to-str)}
+                                                        queries (assoc :query queries)
+                                                        dimensions-filters (assoc :dimension-filter dimensions-filters)
+                                                        granularity (assoc :granularity granularity))))))]
+
+        (testing "query before first insert returns 404"
+          (let [from (time/minus now (time/duration-unit 1 :days))
+                to   now]
+            (-> (metrics-request {:queries     [query1]
+                                  :from        from
+                                  :to          to
+                                  :granularity "1-days"})
+                (ltu/is-status 404))))
+
+        (testing "successful bulk insert"
+          (-> session-user
+              (request bulk-insert-op-url
+                       :headers {"bulk" true}
+                       :request-method :post
+                       :body (json/write-str datapoints))
+              (ltu/body->edn)
+              (ltu/is-status 200)))
+
+        (ltu/refresh-es-indices)
+
+        (testing "basic query"
+          (let [from        (time/minus now (time/duration-unit 1 :days))
+                to          now
+                metric-data (-> (metrics-request {:queries     [query1]
+                                                  :from        from
+                                                  :to          to
+                                                  :granularity "1-days"})
+                                (ltu/is-status 200)
+                                (ltu/body->edn)
+                                (ltu/body))]
+            (is (= [{:dimensions {(keyword dimension1) "all"}
+                     :ts-data    [{:timestamp    (time/to-str midnight-yesterday)
+                                   :doc-count    0
+                                   :aggregations {(keyword aggregation1) {:value nil}
+                                                  (keyword aggregation2) {:value 0}
+                                                  (keyword aggregation3) {:value 0.0}}}
+                                  {:timestamp    (time/to-str midnight-today)
+                                   :doc-count    2
+                                   :aggregations {(keyword aggregation1) {:value 15.0}
+                                                  (keyword aggregation2) {:value 2}
+                                                  (keyword aggregation3) {:value 30.0}}}]}]
+                   (get metric-data (keyword query1))))))
+        (testing "basic query with dimension filter"
+          (let [from        (time/minus now (time/duration-unit 1 :days))
+                to          now
+                metric-data (-> (metrics-request {:dimensions-filters [(str dimension1 "=" d1-val1)]
+                                                  :queries            [query1]
+                                                  :from               from
+                                                  :to                 to
+                                                  :granularity        "1-days"})
+                                (ltu/is-status 200)
+                                (ltu/body->edn)
+                                (ltu/body))]
+            (is (= [{:dimensions {(keyword dimension1) d1-val1}
+                     :ts-data    [{:timestamp    (time/to-str midnight-yesterday)
+                                   :doc-count    0
+                                   :aggregations {(keyword aggregation1) {:value nil}
+                                                  (keyword aggregation2) {:value 0}
+                                                  (keyword aggregation3) {:value 0.0}}}
+                                  {:timestamp    (time/to-str midnight-today)
+                                   :doc-count    1
+                                   :aggregations {(keyword aggregation1)
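+                                                  ;; only the d1q-val1 datapoint
+                                                  ;; (metric1 10.0, metric2 1)
+                                                  ;; matches the filter, so each
+                                                  ;; aggregation now covers a
+                                                  ;; single document: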
{:value 10.0} + (keyword aggregation2) {:value 1} + (keyword aggregation3) {:value 10.0}}}]}] + (get metric-data (keyword query1)))))) + (testing "basic query with wrong dimension filter" + (let [from (time/minus now (time/duration-unit 1 :days)) + to now] + (-> (metrics-request {:dimensions-filters ["wrong-dimension=w1" + "wrong-dimension=w2"] + :queries [query1] + :from from + :to to + :granularity "1-days"}) + (ltu/body->edn) + (ltu/is-status 400) + (ltu/is-key-value :message "invalid dimensions: wrong-dimension")))) + (testing "raw data with query" + (let [from (time/minus now (time/duration-unit 1 :days)) + to now + metric-data (-> (metrics-request {:queries [query1] + :from from + :to to + :granularity "raw"}) + (ltu/is-status 200) + (ltu/body->edn) + (ltu/body))] + (is (= [{:dimensions {(keyword dimension1) "all"} + :ts-data (set (map #(update-keys % keyword) datapoints))}] + (-> (get metric-data (keyword query1)) + (update-in [0 :ts-data] set)))))) + (testing "raw data without query" + (let [from (time/minus now (time/duration-unit 1 :days)) + to now + metric-data (-> (metrics-request {:from from + :to to + :granularity "raw"}) + (ltu/is-status 200) + (ltu/body->edn) + (ltu/body))] + (is (= {:raw [{:dimensions {(keyword dimension1) "all"} + :ts-data (set (map #(update-keys % keyword) datapoints))}]} + (update-in metric-data [:raw 0 :ts-data] set))))) + + (testing "raw data with dimension filter" + (let [from (time/minus now (time/duration-unit 1 :days)) + to now + metric-data (-> (metrics-request {:dimensions-filters [(str dimension1 "=" d1-val1)] + :from from + :to to + :granularity "raw"}) + (ltu/is-status 200) + (ltu/body->edn) + (ltu/body))] + (is (= {:raw [{:dimensions {(keyword dimension1) d1-val1} + :ts-data (->> datapoints + (filter #(= d1-val1 (get % dimension1))) + (map #(update-keys % keyword)) + set)}]} + (update-in metric-data [:raw 0 :ts-data] set))))) + + (testing "custom es query" + (let [from (time/minus now (time/duration-unit 1 :days)) + to now] + (testing "query with no dimensions filter" + (let [metric-data (-> (metrics-request {:queries [query2] + :from from + :to to}) + (ltu/is-status 200) + (ltu/body->edn) + (ltu/body))] + (is (= [{:dimensions {(keyword dimension1) "all"} + :agg1 [{:timestamp (time/to-str midnight-today) + :doc-count 2 + :aggregations {:custom-agg {:avg 15.0 + :count 2 + :max 20.0 + :min 10.0 + :sum 30.0}}}]}] + (get metric-data (keyword query2)))))) + (testing "query with dimensions filter" + (let [metric-data (-> (metrics-request {:dimensions-filters [(str dimension1 "=" d1-val1)] + :queries [query2] + :from from + :to to}) + (ltu/is-status 200) + (ltu/body->edn) + (ltu/body))] + (is (= [{:dimensions {(keyword dimension1) d1-val1} + :agg1 [{:timestamp (time/to-str midnight-today) + :doc-count 1 + :aggregations {:custom-agg {:avg 10.0 + :count 1 + :max 10.0 + :min 10.0 + :sum 10.0}}}]}] + (get metric-data (keyword query2)))))) + (testing "invalid ES query" + (-> (metrics-request {:queries [query3] + :from from + :to to}) + (ltu/body->edn) + (ltu/is-status 500) + (ltu/is-key-value :message "unexpected error"))))) + + (testing "csv export" + (let [from (time/minus now (time/duration-unit 1 :days)) + to now + csv-request (fn [query granularity] + (-> (metrics-request (cond-> {:accept-header "text/csv" + :from from + :to to + :granularity granularity} + query (assoc :queries [query]))) + (ltu/is-status 200) + (ltu/is-header "Content-Type" "text/csv") + (ltu/is-header "Content-disposition" "attachment;filename=export.csv") + (ltu/body)))] + 
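+            ;; csv-request drives the same data action as the JSON queries
+            ;; above, but negotiates text/csv via the Accept header and
+            ;; verifies the download headers before returning the raw body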
(testing "Basic query" + (is (= (str "test-dimension1,timestamp,doc-count,test-metric1-avg,test-metric1-value-count,test-metric1-sum\n" + (str/join "," ["all" (time/to-str midnight-yesterday) + 0 nil 0 0]) "\n" + (str/join "," ["all" (time/to-str midnight-today) + 2 15 2 30]) "\n") + (csv-request query1 "1-days")))) + (testing "Export raw data to csv" + (is (= (into #{["timestamp" "test-dimension1" "test-metric1" "test-metric2"]} + (map (fn [{:keys [timestamp] :as datapoint}] + [timestamp + (get datapoint dimension1) + (str (int (get datapoint metric1))) + (str (int (get datapoint metric2)))]) + datapoints)) + (-> (csv-request nil "raw") + (str/split #"\n") + (->> (mapv #(str/split % #","))) + set)))) + (testing "Export with custom queries not allowed" + (-> (metrics-request {:accept-header "text/csv" + :from from + :to to + :queries [query3]}) + (ltu/is-status 400) + (ltu/body->edn) + (ltu/is-key-value :message (str "csv export not supported for query " query3)))))))))) + +(deftest bad-methods + (let [resource-uri (str p/service-context (u/new-resource-id t/resource-type))] + (ltu/verify-405-status [[resource-uri :post]]))) diff --git a/code/test/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry_lifecycle_test.clj b/code/test/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry_lifecycle_test.clj index ce3c1c703..a16f37936 100644 --- a/code/test/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry_lifecycle_test.clj +++ b/code/test/sixsq/nuvla/server/resources/ts_nuvlaedge_telemetry_lifecycle_test.clj @@ -106,7 +106,7 @@ :request-method :patch :body (json/write-str conflicting-entries)) (ltu/body->edn) - (ltu/is-status 400)))))) + (ltu/is-status 409)))))) (deftest query-ram (let [session-anon (-> (ltu/ring-app)