Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
:min-lein-version "2.5.0"

:dependencies
[[com.databend/databend-jdbc "0.4.2"]
[[com.databend/databend-jdbc "0.4.6"]
[clojure.java-time "0.3.2"]
[org.clojure/clojure "1.11.1"]
]

:repositories [["snapshots" {:sign-releases false
Expand All @@ -17,12 +18,14 @@
["project" "file:repo"]]

:aliases
{"test" ["with-profile" "test"]}
{"test" ["with-profile" "+provided" "test"
"metabase.driver.databend-describe-fields-test"]
"test-integration" ["with-profile" "+provided" "test"]}


:profiles {:provided
{:dependencies [[com.databend/metabase-core "1.40"]]}
:uberjar {:aot :all
{:dependencies [[com.databend/metabase-core "0.61.3"]]}
:uberjar {:aot [metabase.driver.databend]
:auto-clean true
:target-path "target/%s"
:uberjar-name "databend.metabase-driver.jar"
Expand Down
115 changes: 94 additions & 21 deletions src/metabase/driver/databend.clj
Original file line number Diff line number Diff line change
Expand Up @@ -182,24 +182,100 @@
(get-tables-in-db metadata (get-db-name db)))]
{:tables tables})))

(defmethod sql-jdbc.sync/database-type->base-type :databend [_ database-type]
(database-type->base-type database-type))

(defn- describe-table-fields-via-sql
; DatabaseMetaData.getColumns() returns no rows against Databend, so query
; information_schema.columns directly for reliable field metadata.
[database {:keys [name schema]}]
(let [db-name (or (not-empty schema) (get-in database [:details :dbname] "default"))
spec (sql-jdbc.conn/connection-details->spec :databend (:details database))]
(try
(sql-jdbc.execute/do-with-connection-with-options
:databend spec nil
(fn [^Connection conn]
(let [sql (str "SELECT column_name, data_type, ordinal_position, is_nullable"
" FROM information_schema.columns"
" WHERE table_schema = ? AND table_name = ?"
" ORDER BY ordinal_position")
stmt (doto (.prepareStatement conn sql)
(.setString 1 db-name)
(.setString 2 name))
rset (.executeQuery stmt)]
(loop [fields #{}]
(if (.next rset)
(let [col-name (.getString rset "column_name")
data-type (.getString rset "data_type")
ord-pos (.getInt rset "ordinal_position")
is-null (.getString rset "is_nullable")
safe-type (or data-type "")
upper-type (str/upper-case safe-type)]
(if (or (str/blank? safe-type)
(re-matches #"(?i)^AggregateFunction\(.+$" safe-type))
(recur fields)
(recur (conj fields
{:name col-name
:database-type safe-type
:base-type (or (sql-jdbc.sync/database-type->base-type :databend upper-type)
:type/*)
:database-position (dec ord-pos)
:nullable? (= "YES" is-null)}))))
fields)))))
(catch Exception e
(log/error e "describe-table-fields-via-sql failed for" name "in schema" db-name)
#{}))))

(defmethod driver/describe-table :databend
[_ database table]
(let [table-metadata (sql-jdbc.sync/describe-table :databend database table)
filtered-fields (for [field (:fields table-metadata)
:let [updated-field
(update-in field [:database-type]
;; Enum8(UInt8) -> Enum8
clojure.string/replace #"^(Enum.+)\(.+\)" "$1")]
;; Skip all AggregateFunction (but keeping SimpleAggregateFunction) columns
;; JDBC does not support that and it crashes the data browser
:when (not (re-matches #"^AggregateFunction\(.+$"
(get field :database-type)))]
updated-field)]
(merge table-metadata {:fields (set filtered-fields)})))
[_ database table]
{:name (:name table)
:schema (:schema table)
:fields (describe-table-fields-via-sql database table)})

(defmethod driver/describe-fields :databend
; Metabase v0.49+ uses describe-fields instead of describe-table for sync-fields.
; The sql-jdbc default calls getColumns() which returns nothing for Databend, so
; we override here to query information_schema.columns directly.
[_driver database & {:keys [schema-names table-names]}]
; Early exit if caller explicitly requested an empty set of schemas or tables.
(if (or (and schema-names (empty? schema-names))
(and table-names (empty? table-names)))
[]
(let [; Metabase stores schema as "" when none is set; treat blank the same as nil
; and fall back to the dbname from connection details (e.g. "gold").
db-name (or (first (filter #(not (str/blank? %)) schema-names))
(get-in database [:details :dbname] "default"))
spec (sql-jdbc.conn/connection-details->spec :databend (:details database))
tbl-clause (when (seq table-names)
(str " AND table_name IN ("
(str/join "," (repeat (count table-names) "?"))
")"))
sql (str "SELECT table_schema, table_name, column_name, data_type,"
" ordinal_position, is_nullable"
" FROM information_schema.columns"
" WHERE table_schema = ?"
(or tbl-clause "")
" ORDER BY table_name, ordinal_position")
params (into [sql db-name] (when (seq table-names) table-names))]
(try
(let [rows (jdbc/query spec params)]
; NOTE: return table-schema as nil. Databend's JDBC getTables() returns TABLE_SCHEM=""
; so Metabase stores schema=NULL in Postgres (JSON API serializes it as ""). Returning
; nil here causes Metabase to query WHERE schema IS NULL, which matches correctly.
(vec (for [{:keys [table_name column_name data_type ordinal_position is_nullable]} rows
:let [safe-type (or data_type "")
upper-type (str/upper-case safe-type)]
:when (not (or (str/blank? safe-type)
(re-matches #"(?i)^AggregateFunction\(.+$" safe-type)))]
{:table-schema nil
:table-name table_name
:name column_name
:database-type safe-type
:base-type (or (sql-jdbc.sync/database-type->base-type :databend upper-type)
:type/*)
:database-position (dec (int ordinal_position))
:database-is-nullable (= "YES" is_nullable)
:pk? false})))
(catch Exception e
(log/error e "describe-fields :databend failed for schema" db-name)
[])))))

(defn- to-start-of-year
[expr]
Expand Down Expand Up @@ -376,10 +452,6 @@
(.getLong rs i)
(.getBigDecimal rs i))))

; Map databend data types to base types
(defmethod sql-jdbc.sync/database-type->base-type :databend [_ database-type]
(database-type->base-type database-type))

; Concatenate the elements of an array based on array elemets type (coverting array data type to string type to apply filter on array data)
(defn is-string-array? [os]
(if (= (type (first (vec os))) java.lang.String) (str "['" (clojure.string/join "','" os) "']") (str "[" (clojure.string/join "," os) "]")))
Expand All @@ -402,7 +474,8 @@
:connection-impersonation false
:schemas true
:datetime-diff true
:upload-with-auto-pk false}]
:upload-with-auto-pk false
:describe-fields true}]

(defmethod driver/database-supports? [:databend feature] [_driver _feature _db] supported?))

Expand Down
127 changes: 127 additions & 0 deletions test/metabase/driver/databend_describe_fields_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
(ns metabase.driver.databend-describe-fields-test
"Unit tests for driver/describe-fields :databend.
These run without a live Databend server or the Metabase test framework."
(:require [clojure.java.jdbc :as jdbc]
[clojure.test :refer :all]
[metabase.driver :as driver]
metabase.driver.databend
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]))

(deftest describe-fields-early-exit
(testing "returns [] when schema-names is an empty collection"
(is (= [] (driver/describe-fields :databend {} :schema-names []))))
(testing "returns [] when table-names is an empty collection"
(is (= [] (driver/describe-fields :databend {} :table-names [])))))

(deftest describe-fields-db-name-fallback
(testing "uses :details :dbname when schema-names is not passed"
(let [captured-params (atom nil)]
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec params] (reset! captured-params params) [])]
(driver/describe-fields :databend {:details {:dbname "gold"}})
(is (= "gold" (second @captured-params))))))
(testing "uses :details :dbname when schema-names contains only blank/nil values"
(let [captured-params (atom nil)]
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec params] (reset! captured-params params) [])]
(driver/describe-fields :databend {:details {:dbname "gold"}} :schema-names ["" nil])
(is (= "gold" (second @captured-params))))))
(testing "uses first non-blank schema-name when provided"
(let [captured-params (atom nil)]
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec params] (reset! captured-params params) [])]
(driver/describe-fields :databend {:details {:dbname "other"}} :schema-names ["gold"])
(is (= "gold" (second @captured-params)))))))

(deftest describe-fields-field-shape
(testing "returns field maps with :table-schema nil and correct shape"
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec _params]
[{:table_name "sfdc_pipeline" :column_name "stage"
:data_type "varchar" :ordinal_position 1 :is_nullable "YES"}
{:table_name "sfdc_pipeline" :column_name "amount"
:data_type "double" :ordinal_position 2 :is_nullable "NO"}])]
(let [result (driver/describe-fields :databend {:details {:dbname "gold"}})]
(is (= 2 (count result)))
(is (every? #(nil? (:table-schema %)) result)
":table-schema must be nil — Metabase stores schema=NULL in Postgres and matches via WHERE schema IS NULL")
(is (every? #(false? (:pk? %)) result))
(is (= {:table-schema nil
:table-name "sfdc_pipeline"
:name "stage"
:database-type "varchar"
:base-type :type/Text
:database-position 0
:database-is-nullable true
:pk? false}
(first (filter #(= "stage" (:name %)) result))))
(is (= {:table-schema nil
:table-name "sfdc_pipeline"
:name "amount"
:database-type "double"
:base-type :type/Float
:database-position 1
:database-is-nullable false
:pk? false}
(first (filter #(= "amount" (:name %)) result))))))))

(deftest describe-fields-type-filtering
(testing "excludes AggregateFunction columns"
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec _params]
[{:table_name "t" :column_name "agg_col"
:data_type "AggregateFunction(sum, UInt64)"
:ordinal_position 1 :is_nullable "YES"}
{:table_name "t" :column_name "normal_col"
:data_type "varchar" :ordinal_position 2 :is_nullable "YES"}])]
(let [result (driver/describe-fields :databend {:details {:dbname "gold"}})]
(is (= 1 (count result)))
(is (= "normal_col" (:name (first result)))))))
(testing "excludes rows with nil data_type"
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec _params]
[{:table_name "t" :column_name "nil_col"
:data_type nil :ordinal_position 1 :is_nullable "YES"}
{:table_name "t" :column_name "good_col"
:data_type "bigint" :ordinal_position 2 :is_nullable "YES"}])]
(let [result (driver/describe-fields :databend {:details {:dbname "gold"}})]
(is (= 1 (count result)))
(is (= "good_col" (:name (first result))))))))

(deftest describe-fields-table-names-filter
(testing "passes table names as IN-clause params when table-names provided"
(let [captured-params (atom nil)]
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec params] (reset! captured-params params) [])]
(driver/describe-fields :databend {:details {:dbname "gold"}}
:schema-names ["gold"]
:table-names ["orders" "customers"])
(let [params @captured-params]
(is (= 4 (count params)) "sql + schema + 2 table-names = 4 params")
(is (= "gold" (nth params 1)))
(is (= "orders" (nth params 2)))
(is (= "customers" (nth params 3)))))))
(testing "omits table-name params when table-names is nil"
(let [captured-params (atom nil)]
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec params] (reset! captured-params params) [])]
(driver/describe-fields :databend {:details {:dbname "gold"}} :schema-names ["gold"])
(is (= 2 (count @captured-params)) "sql + schema only = 2 params")
(is (= "gold" (nth @captured-params 1)))))))

(deftest describe-fields-database-supports
(testing ":describe-fields feature returns true so Metabase routes sync through our implementation"
(is (true? (driver/database-supports? :databend :describe-fields nil)))))

(deftest describe-fields-exception-handling
(testing "returns [] and does not rethrow when jdbc/query throws"
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec _params] (throw (Exception. "connection refused")))]
(let [result (driver/describe-fields :databend {:details {:dbname "gold"}})]
(is (= [] result)))))
(testing "falls back to :details :dbname = \"default\" when :dbname key is absent"
(let [captured-params (atom nil)]
(with-redefs [sql-jdbc.conn/connection-details->spec (fn [_ _] {})
jdbc/query (fn [_spec params] (reset! captured-params params) [])]
(driver/describe-fields :databend {:details {}})
(is (= "default" (second @captured-params)))))))
6 changes: 1 addition & 5 deletions test/metabase/driver/databend_test.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
(ns metabase.driver.databend-test
"Tests for specific behavior of the Databend driver."
#_{:clj-kondo/ignore [:unsorted-required-namespaces]}
(:require [cljc.java-time.format.date-time-formatter :as date-time-formatter]
[cljc.java-time.local-date :as local-date]
[cljc.java-time.offset-date-time :as offset-date-time]
[cljc.java-time.temporal.chrono-unit :as chrono-unit]
[clojure.test :refer :all]
(:require [clojure.test :refer :all]
[metabase.driver :as driver]
[metabase.driver.common :as driver.common]
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
Expand Down