94 changes: 62 additions & 32 deletions lens/metrics.py
@@ -4,13 +4,17 @@

import logging
import time
-from functools import wraps
+from functools import wraps, reduce
+from operator import add
+from collections import Counter

from tdigest import TDigest
import numpy as np
from scipy import stats
from scipy import signal
import pandas as pd
+import dask
+from dask.delayed import delayed

from .utils import hierarchical_ordering_indices

@@ -22,6 +26,13 @@
logger.addHandler(logging.StreamHandler())


+@delayed
+def _tdigest_partition(values, delta):
+    digest = TDigest(delta)
+    digest.batch_update(values)
+    return digest
+
+
def timeit(func):
    """Decorator to time callable execution and add it to the report.

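The per-partition digests above are only useful because they can be merged afterwards: the tdigest package overloads + to combine two digests, which is what makes reduce(add, ...) over partitions valid. A minimal standalone sketch of that merge property, on synthetic data and without dask:

# Sketch: TDigest.__add__ merges two digests, so per-chunk digests
# combine into one digest over the full column. Data is synthetic.
import numpy as np
from functools import reduce
from operator import add
from tdigest import TDigest


def make_digest(values, delta=0.01):
    digest = TDigest(delta)
    digest.batch_update(values)
    return digest


chunks = [np.random.normal(size=1000) for _ in range(4)]
merged = reduce(add, [make_digest(chunk) for chunk in chunks])
print(merged.percentile(50))  # approximate median of all 4000 values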
@@ -86,15 +97,15 @@ def column_properties(series):
    name = series.name
    colresult = {}
    colresult["dtype"] = str(series.dtype)
-    nulls = series.isnull().sum()
+    nulls = series.isnull().sum().compute()
    colresult["nulls"] = int(nulls) if not np.isnan(nulls) else 0
    notnulls = series.dropna()

-    colresult["notnulls"] = len(notnulls.index)
+    colresult["notnulls"] = int(notnulls.index.size.compute())
    colresult["numeric"] = (
        series.dtype in [np.float64, np.int64] and colresult["notnulls"] > 0
    )
-    unique = notnulls.unique().size
+    unique = int(notnulls.unique().size.compute())
    colresult["unique"] = unique
    colresult["is_categorical"] = False
    if (
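Each .compute() above triggers a separate graph execution, so the column is scanned three times. A hypothetical refactor, not part of this diff, would batch the three reductions into a single dask.compute call and let dask share the scan:

# Hypothetical batching sketch: one dask.compute call evaluates all
# three reductions in a single pass over the data.
import dask
import dask.dataframe as dd
import pandas as pd

series = dd.from_pandas(pd.Series([1.0, None, 2.0, 2.0]), npartitions=2)
notnulls = series.dropna()
nulls, n_notnull, n_unique = dask.compute(
    series.isnull().sum(),
    notnulls.index.size,
    notnulls.unique().size,
)
print(nulls, n_notnull, n_unique)  # 1 3 2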
Expand Down Expand Up @@ -254,7 +265,7 @@ def column_summary(series, column_props, delta=0.01):

Parameters
----------
series : pd.Series
series : dask.dataframe.Series
Numeric column.
column_props : TODO
TODO
@@ -272,33 +283,41 @@

    logger.debug("column_summary - " + col)

-    # select non-nulls from column
-    data = series.dropna()
+    # select non-nulls from column and convert to dask array
+    data = series.dropna().map_partitions(np.asarray)

-    colresult = {}
-    for m in ["mean", "min", "max", "std", "sum"]:
-        val = getattr(data, m)()
-        if type(val) is np.int64:
-            colresult[m] = int(val)
-        else:
-            colresult[m] = val
+    delayed_colresult = {}
+    for m in ["std", "sum"]:
+        delayed_colresult[m] = getattr(data, m)()
+
+    [colresult] = dask.compute(delayed_colresult)

    colresult["n"] = column_props[col]["notnulls"]

+    # Compute the t-digest.
+    logger.debug("column_summary - {} - creating TDigest...".format(col))
+
+    digest = delayed(reduce)(
+        add,
+        [
+            _tdigest_partition(partition, delta)
+            for partition in data.to_delayed()
+        ],
+    ).compute()
+
    percentiles = [0.1, 1, 10, 25, 50, 75, 90, 99, 99.9]
    colresult["percentiles"] = {
-        perc: np.nanpercentile(series, perc) for perc in percentiles
+        perc: digest.percentile(perc) for perc in percentiles
    }
+    colresult["min"] = digest.percentile(0)
+    colresult["max"] = digest.percentile(100)
+    colresult["mean"] = digest.trimmed_mean(0, 100)
    colresult["median"] = colresult["percentiles"][50]

    colresult["iqr"] = (
        colresult["percentiles"][75] - colresult["percentiles"][25]
    )

-    # Compute the t-digest.
-    logger.debug("column_summary - {} - creating TDigest...".format(col))
-    digest = TDigest(delta)
-    digest.batch_update(data)
-
    logger.debug("column_summary - {} - testing log trans...".format(col))
    try:
        colresult["logtrans"] = bool(_test_logtrans(digest))
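Note that min, max, mean, and the percentiles now all come from the t-digest rather than exact scans (trimmed_mean(0, 100) is the digest's estimate of the plain mean), so they are approximations computed in one streaming pass over the partitions. A quick standalone sanity check of how close the digest gets, on synthetic data:

# Synthetic check: compare exact percentiles against the t-digest's
# estimates. delta=0.01 matches the function's default.
import numpy as np
from tdigest import TDigest

values = np.random.lognormal(size=10000)
digest = TDigest(0.01)
digest.batch_update(values)
for q in (1, 50, 99):
    print(q, np.percentile(values, q), digest.percentile(q))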
@@ -386,7 +405,7 @@ def _compute_histogram_from_frequencies(series):
    counts, edges:
        Histogram bin edges and counts in each bin.
    """
-    freqs = _compute_frequencies(series)
+    [freqs] = dask.compute(_compute_frequencies(series))
    categories = sorted(freqs.keys())
    diffs = list(np.diff(categories)) + [1]
    edges = [categories[0] - 0.5]
@@ -402,27 +421,38 @@ def _compute_histogram_from_frequencies(series):
    return np.array(counts), np.array(edges)


-def _compute_frequencies(series):
+@delayed
+def _compute_chunk_counter(array, dtype):
+    keys, counts = np.unique(array, return_counts=True)
+
+    if dtype == np.int64:
+        keys = [int(key) for key in keys]
+    elif dtype == np.float64:
+        keys = [float(key) for key in keys]
+
+    return Counter(dict(zip(keys, counts)))
+
+
+def _compute_frequencies(array):
    """Helper to compute frequencies of a categorical column

    Parameters
    ----------
-    series : pd.Series
+    array : dask.Array
        Categorical column.

    Returns
    -------
    dict:
        Dictionary from category name to count.
    """
-    freqs = series.value_counts()
-    if freqs.index.dtype == np.int64:
-        categories = [int(index) for index in freqs.index]
-    elif freqs.index.dtype == np.float64:
-        categories = [float(index) for index in freqs.index]
-    else:
-        categories = freqs.index
-    return dict(zip(categories, freqs.values.tolist()))
+    return delayed(reduce)(
+        add,
+        [
+            _compute_chunk_counter(chunk, array.dtype)
+            for chunk in array.to_delayed()
+        ],
+    )


@timeit
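reduce(add, ...) works on the per-chunk counters for the same reason it works on the digests: collections.Counter supports +, which sums counts key-wise. A standalone sketch of that behaviour:

# Sketch: Counter addition sums counts per key, so per-chunk counters
# combine into totals for the whole column.
from collections import Counter
from functools import reduce
from operator import add

chunks = [Counter({"a": 2, "b": 1}), Counter({"a": 1, "c": 4})]
print(reduce(add, chunks))  # Counter({'c': 4, 'a': 3, 'b': 1})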
@@ -444,7 +474,7 @@ def frequencies(series, column_props):

    if column_props[name]["is_categorical"]:
        logger.debug("frequencies - " + series.name)
-        freqs = _compute_frequencies(series)
+        freqs = _compute_frequencies(series.dropna().values)
        return {name: freqs, "_columns": [name]}
    else:
        return None
14 changes: 5 additions & 9 deletions tests/fixtures.py
@@ -9,6 +9,7 @@

import numpy as np
import pandas as pd
+import dask.dataframe as dd
from scipy import stats
import pytest

@@ -60,7 +61,7 @@ def df(request):

    df.to_csv(dirname + "/test_results/test_data.csv", index=False)

-    return df
+    return dd.from_pandas(df, npartitions=4)


def gen_categoricalint_with_no_twos(nrows):
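With this change the fixture hands every test a dask DataFrame instead of a pandas one; dd.from_pandas splits the frame into roughly equal row blocks, and each block is what map_partitions and to_delayed later see as a chunk. A small usage sketch with hypothetical data:

# Sketch: dd.from_pandas partitions a pandas frame into npartitions
# blocks; computations then run per block and are combined at the end.
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"x": range(8)})
ddf = dd.from_pandas(pdf, npartitions=4)
print(ddf.npartitions)           # 4
print(ddf["x"].sum().compute())  # 28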
@@ -78,7 +79,7 @@ def gen_poisson_distributed_categorical_data(ncategories, size):
        np.random.poisson(ncategories / 2.0) for i in range(size)
    ]
    truncated_random_samples = [
-        max(min(0, sample), ncategories - 1) for sample in random_samples
+        min(max(0, sample), ncategories - 1) for sample in random_samples
    ]
    sampled_categories = [
        categories[sample] for sample in truncated_random_samples
@@ -91,13 +92,8 @@ def gen_uniformly_distributed_categorical_data(ncategories, size):
        str(i) + "".join(random.sample(string.ascii_letters, 4))
        for i in range(ncategories)
    ]
-    random_samples = np.random.randint(0, len(categories), size=size)
-    truncated_random_samples = [
-        max(min(0, sample), ncategories - 1) for sample in random_samples
-    ]
-    sampled_categories = [
-        categories[sample] for sample in truncated_random_samples
-    ]
+    random_samples = np.random.randint(0, ncategories, size=size)
+    sampled_categories = [categories[sample] for sample in random_samples]
    return sampled_categories
8 changes: 0 additions & 8 deletions tests/requirements.txt
@@ -1,15 +1,7 @@
boto3
cloudpickle
dask
flake8
ipywidgets
matplotlib
numpy
pandas
patsy
plotly
pytest
s3fs
scipy
statsmodels
toolz
18 changes: 12 additions & 6 deletions tests/test_summarise.py
@@ -6,6 +6,7 @@

import numpy as np
import pandas as pd
+import dask

import pytest

@@ -212,21 +213,26 @@ def test_dask_outliers(df, column_summary):

@pytest.fixture(scope="module")
def frequencies(df, column_properties):
-    return {
-        col: metrics.frequencies(df[col], column_properties[col])
-        for col in df.columns
-    }
+    [freqs] = dask.compute(
+        {
+            col: metrics.frequencies(df[col], column_properties[col])
+            for col in df.columns
+        }
+    )
+    return freqs


-def test_dask_frequencies(df, frequencies):
+def test_dask_frequencies(df, column_properties, frequencies):
    for col in frequencies.keys():
        freq_report = frequencies[col]
        if freq_report is None:
            continue
        else:
            freq_report = freq_report[col]

-        freqs = df[col].value_counts().to_dict()
+        freqs = df[col].compute().value_counts().to_dict()

+        assert len(freq_report) == column_properties[col][col]["unique"]
+
        for k in freqs.keys():
            assert freqs[k] == freq_report[k]
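The rewritten fixture relies on dask.compute traversing nested containers: any delayed values inside the dict (and the plain None returned for non-categorical columns) come back in the same structure. A minimal sketch of that behaviour:

# Sketch: dask.compute traverses dicts/lists by default, computing any
# delayed values it finds and preserving the container structure.
import dask
from dask import delayed

result_dict = {"a": delayed(sum)([1, 2, 3]), "b": None}
[computed] = dask.compute(result_dict)
print(computed)  # {'a': 6, 'b': None}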
2 changes: 1 addition & 1 deletion tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = py27, py36
+envlist = py27, py36, py37
toxworkdir = {env:TOX_WORK_DIR:.tox}
[testenv]
sitepackages = False