Skip to content
This repository was archived by the owner on Mar 12, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5dd6db3
Add `LowLevelConnection`
Sh4pe Feb 9, 2019
4585a32
Make tests involving temp directories compatible with Python 2
Sh4pe Feb 9, 2019
37b37a3
Add class InsertTable
Sh4pe Feb 10, 2019
d81e590
Merge branch 'master' into storage_class
Sh4pe Feb 13, 2019
78c9018
Merge branch 'pytest' into storage_class
Sh4pe Feb 13, 2019
95c21eb
Merge branch 'pytest' into storage_class
Sh4pe Feb 15, 2019
d0b6214
Remove `dummy_test` from `unittest.py`
Sh4pe Feb 15, 2019
ae54a94
Merge branch 'master' into storage_class
Sh4pe Feb 17, 2019
0cd15c0
Adapt imports in `database_test.py`
Sh4pe Feb 17, 2019
656ac1f
Use DEFERRED isolation level in `LowLevelConnection`
Sh4pe Feb 17, 2019
4709f40
Ignore *.pyc
Sh4pe Feb 17, 2019
8db7fbf
Setup database layout with connection as context manager
Sh4pe Feb 17, 2019
9030725
Let `unittest` generate the database_test testsuite
Sh4pe Feb 17, 2019
6619cc9
Remove reference to `dummy_test`
Sh4pe Feb 17, 2019
35cfacb
Ensure that `LowLevelConnection.__init__` is idempotent
Sh4pe Feb 17, 2019
f6f80a8
Factor out COLUMNS to `definitions.py`
Sh4pe Feb 17, 2019
1b819ef
Merge branch 'factor_out_definitions' into storage_class
Sh4pe Feb 17, 2019
044fea6
Get definition of columns for InsertTable from definitions.COLUMNS
Sh4pe Feb 25, 2019
ede1b3a
Remove database_test.test_suite
Sh4pe Feb 25, 2019
fb71345
Add test strategies for generating transactions dataframes
Sh4pe Mar 4, 2019
df2ee62
Fix bug and improve performance of transactions strategy
Sh4pe Mar 5, 2019
f73abc8
Add stub for StorageClass and its tests
Sh4pe Mar 12, 2019
cc63547
basic tests for storage facade
fa-me Mar 15, 2019
2bc286d
requirements dev
fa-me May 22, 2019
d08aa56
hash functions for dataframe
fa-me May 22, 2019
533cb9b
extended dataframe hash util tests
fa-me May 22, 2019
663c329
started implementing storage with additional hash column for duplicat…
fa-me May 22, 2019
c2ac19f
updated dash response tests
fa-me May 26, 2019
1389466
removed py27 backwards compatibility for temp dir
fa-me May 26, 2019
ae801bc
rename id to row_key as primary key
fa-me May 26, 2019
74cb304
added id column in strategy dataframes
fa-me May 26, 2019
f8cac63
fixed tests for database, id as key column
fa-me May 26, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ __pycache__
/.vscode
docs/graphs/png/*.png
.doit.db.*
pynance/*.pyc
*.pyc
.pytest_cache
/.hypothesis
.coverage
4 changes: 2 additions & 2 deletions pynance/dash_viz/plot_flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_onselect_csvtype(self):

for expected, selected in zip(onselect_response, dropdown_values):
response = onselect_csvtype(selected)
response_dict = json.loads(response.data.decode())
response_dict = json.loads(response) # .data.decode())
is_enabled = not response_dict["response"]["props"]["disabled"]

self.assertEqual(expected, is_enabled)
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_update_output(self):
bytestr = self._read_sample_file_like_uploaded()

response = update_output(bytestr, "DKBCash")
response_dict = json.loads(response.data.decode())
response_dict = json.loads(response) # .data.decode())

res_charts = response_dict["response"]["props"]["figure"]["data"]

Expand Down
204 changes: 204 additions & 0 deletions pynance/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
Explain the classes briefly. Elaborate on Storage
"""

import sqlite3
import numpy as np
from .definitions import COLUMNS


def exists_table(conn, table_name):
    """
    Returns True if and only if 'table_name' is an existing table.

    Parameters:
    * `conn`: an open sqlite3 connection.
    * `table_name`: name of the table to look for in sqlite_master.
    """

    # Use a bound parameter instead of string interpolation: this avoids SQL
    # injection through the table name and drops the non-standard use of
    # double quotes as a string literal (in SQL, "..." denotes identifiers).
    result = conn.execute(
        "select count(*) from sqlite_master where type='table' and name=?",
        (table_name,)
    ).fetchall()
    return result[0][0] == 1


def generate_sqlite_columns_definitions():
    """
    Build the column-definition fragment of a sqlite CREATE TABLE statement
    from definitions.COLUMNS, i.e. the part between the parentheses in

        CREATE TABLE my_table_name (<column definitions here>)

    Returns the column definitions as a single string.
    """

    # Mapping from the Python/numpy types used in definitions.COLUMNS to
    # their sqlite storage classes.
    sqlite_type_for = {
        str: 'TEXT',
        np.datetime64: 'TEXT',
        np.float64: 'REAL',
    }

    definitions = []
    for column_name, column_type in COLUMNS.items():
        if column_type not in sqlite_type_for:
            raise ValueError(
                "Don't know which sqlite type '{}' is".format(column_type))
        definitions.append('{} {}'.format(
            column_name, sqlite_type_for[column_type]))

    return ', '.join(definitions)


class LowLevelConnection(object):
    """
    Class that handles low-level database connection. Makes sure the expected table structure
    exists. Should be used in with-statements.
    """

    # Schema evolution should be handled later once it is needed
    SUPPORTED_SCHEMA_VERSIONS = [1]

    TABLE_SCHEMA_VERSION = 'schema'
    TABLE_TRANSACTIONS = 'transactions'
    ID_COLUMN = 'id'

    def _get_db_conn(self):
        """
        Get the connection to the sqlite database. We use the 'DEFERRED' isolation level. This
        is the default in Python 3 anyways, in Python 2 the default is autocommit mode. The DEFERRED
        isolation level seems appropriate in this case. See also
        * https://www.sqlite.org/lang_transaction.html
        """
        return sqlite3.connect(
            self.db_file_name,
            isolation_level='DEFERRED'
        )

    def __init__(self, schema_version, db_file_name):
        """
        Parameters:
        * `schema_version`: Integer denoting the schema version.
        * `db_file_name`: This DB file will be created if it does not yet exist.

        Idempotent: initializing on an already-set-up database leaves the
        layout unchanged.
        """
        assert schema_version in LowLevelConnection.SUPPORTED_SCHEMA_VERSIONS
        self.db_file_name = db_file_name

        connection = self._get_db_conn()
        try:
            # `with connection` commits/rolls back the setup transaction but
            # does NOT close the connection -- the explicit close() in the
            # `finally` below fixes the connection leak of the original code.
            with connection:
                if not exists_table(connection, LowLevelConnection.TABLE_SCHEMA_VERSION):
                    connection.execute('CREATE TABLE IF NOT EXISTS {} (version INTEGER)'.format(
                        LowLevelConnection.TABLE_SCHEMA_VERSION))
                    # Record the requested schema version instead of a
                    # hard-coded 1. Identical today (only version 1 is
                    # supported), but future-proof.
                    connection.execute(
                        'INSERT INTO {} VALUES (?)'.format(
                            LowLevelConnection.TABLE_SCHEMA_VERSION),
                        (schema_version,))

                if not exists_table(connection, LowLevelConnection.TABLE_TRANSACTIONS):
                    connection.execute('CREATE TABLE IF NOT EXISTS {} ({})'.format(
                        LowLevelConnection.TABLE_TRANSACTIONS,
                        generate_sqlite_columns_definitions()
                    ))
                    connection.execute('CREATE INDEX date_index ON {} ({})'.format(
                        LowLevelConnection.TABLE_TRANSACTIONS, 'date'))
                    connection.execute('CREATE INDEX id ON {} ({})'.format(
                        LowLevelConnection.TABLE_TRANSACTIONS, LowLevelConnection.ID_COLUMN))
        finally:
            connection.close()

    def __enter__(self):
        # A fresh connection per with-block; stored so __exit__ can close it.
        self.conn = self._get_db_conn()
        return self.conn

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.conn.close()


class InsertTable(object):
    """
    This class makes sure that a DataFrame is inserted into a temporary table of a sqlite
    database. It also makes sure that the temporary table is created in a safe way and disposed
    of afterwards. For this purpose, instances of this class should be used in with-statements.
    """

    @staticmethod
    def create_temp_table(conn):
        """Creates temporary table suitable for inserting the DataFrame and returns its name."""

        cursor = conn.cursor()
        suffix = 0

        # Probe insert_df_0, insert_df_1, ... until a CREATE succeeds; a name
        # collision raises OperationalError and we move on to the next suffix.
        while True:
            candidate = 'insert_df_{}'.format(suffix)
            try:
                cursor.execute('CREATE TEMPORARY TABLE {} ({})'.format(
                    candidate,
                    generate_sqlite_columns_definitions()
                ))
                return 'temp', candidate
            except sqlite3.OperationalError:
                suffix += 1

    def __init__(self, conn, data_frame):
        "uses conn, fetches everything from 'data_frame' into a temporary table"

        self.conn = conn
        schema, table = InsertTable.create_temp_table(conn)
        self.temp_table_schema = schema
        self.temp_table_name = table
        data_frame.to_sql(
            name=table,
            schema=schema,
            index=False,
            con=conn,
            chunksize=5000
        )

    def __enter__(self):
        return (self.temp_table_schema, self.temp_table_name)

    def __exit__(self, _exc_type, _exc_value, _traceback):
        "Make sure the table is gone."
        drop_statement = 'DROP TABLE {}.{}'.format(
            self.temp_table_schema, self.temp_table_name)
        self.conn.cursor().execute(drop_statement)


class Storage(object):
    """
    High-level facade for persisting transaction DataFrames in the sqlite
    database. Deduplication and the final table swap are still work in
    progress (see the TODOs in append_dataframe).
    """

    def __init__(self, db_file):
        # Path of the sqlite database file; connections are opened per call.
        self.db_file = db_file

    @classmethod
    def validate_dataframe_shape(cls, data_frame):
        """
        asserts that the correct columns are present.
        Tolerates that additional columns are present
        """
        # TODO(review): still a stub -- accepts everything. Should verify that
        # every key of definitions.COLUMNS appears in data_frame.columns.
        return True

    def append_dataframe(self, data_frame):
        """
        asserts that the shape of the dataframe is correct
        returns the part of the dataframe that is new. This part has also an ID column
        """
        if not self.validate_dataframe_shape(data_frame):
            raise Exception('Invalid dataframe')

        with LowLevelConnection(1, self.db_file) as conn:
            with InsertTable(conn, data_frame) as insert_table:
                # `insert_table` is a (schema, table_name) pair: join it into a
                # qualified identifier. Formatting the tuple itself (as the
                # original did) yields invalid SQL like "('temp', 'insert_df_0')".
                insert_table_name = '{}.{}'.format(*insert_table)
                # add existing data to insert_table
                with conn:
                    column_keys = COLUMNS.keys()
                    columns_str = ','.join(column_keys)
                    conn.cursor().execute(
                        '''
                        INSERT INTO %s
                        SELECT %s
                        FROM %s
                        ON CONFLICT (%s) DO NOTHING
                        ''' % (insert_table_name,
                               columns_str,
                               LowLevelConnection.TABLE_TRANSACTIONS,
                               LowLevelConnection.ID_COLUMN))
                # NOTE(review): do not close `conn` here -- the original
                # conn.close() made InsertTable.__exit__ execute its DROP TABLE
                # on a closed connection; LowLevelConnection.__exit__ closes it.
                # TODO: but only non-duplicates (ON CONFLICT requires a unique
                # constraint on the id column of the temporary table -- confirm)
                # TODO: replace existing table by insert_table

    def load_dataframe(self):
        """
        loads from db. contains ID column
        """
        # TODO(review): still a stub.
        pass
163 changes: 163 additions & 0 deletions pynance/database_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import unittest
import os.path
import shutil
from tempfile import TemporaryDirectory, TemporaryFile
import sqlite3

from pynance.database import generate_sqlite_columns_definitions, \
LowLevelConnection, InsertTable
from pynance.textimporter import read_csv
from pynance.dkb import SupportedCsvTypes


class ColumnsDefinitionsTestCase(unittest.TestCase):
    """Tests for generate_sqlite_columns_definitions."""

    def test_it_produces_valid_string(self):
        # The column definitions must come back as a non-empty plain string.
        result = generate_sqlite_columns_definitions()
        self.assertEqual(type(result), str)
        self.assertTrue(len(result) > 0)

    def test_it_produces_valid_sql_types(self):
        # Let sqlite itself judge validity: CREATE TABLE must succeed.
        with TemporaryDirectory() as tmp_dir:
            tmp_file = os.path.join(tmp_dir, 'test.db')
            conn = sqlite3.connect(tmp_file)
            try:
                column_definitions = generate_sqlite_columns_definitions()
                query = 'CREATE TABLE test ({})'.format(column_definitions)
                conn.execute(query)
            finally:
                # Close even when CREATE TABLE fails, so the temporary
                # directory can always be removed (the original leaked the
                # connection on failure).
                conn.close()


class LowLevelConnectionTestCase(unittest.TestCase):
    """Tests for LowLevelConnection."""

    def test_creates_database_file_if_not_exists(self):
        with TemporaryDirectory() as tmp_dir:
            db_file = os.path.join(tmp_dir, 'test.db')
            self.assertFalse(os.path.exists(db_file))
            with LowLevelConnection(1, db_file):
                pass
            self.assertTrue(os.path.exists(db_file))

    def test_opens_connection(self):
        with TemporaryDirectory() as tmp_dir:
            db_file = os.path.join(tmp_dir, 'test.db')
            with LowLevelConnection(1, db_file) as conn:
                self.assertIsNotNone(conn)

    def test_creates_expected_tables(self):
        with TemporaryDirectory() as tmp_dir:
            with LowLevelConnection(1, os.path.join(tmp_dir, 'test.db')) as conn:
                cursor = conn.cursor()
                rows = cursor.execute(
                    'select name from sqlite_master where type="table"').fetchall()
                table_names = {row[0] for row in rows}
                expected = {LowLevelConnection.TABLE_SCHEMA_VERSION,
                            LowLevelConnection.TABLE_TRANSACTIONS}
                self.assertEqual(table_names, expected)
                count_query = 'select count(*) from {}'.format(
                    LowLevelConnection.TABLE_SCHEMA_VERSION)
                self.assertEqual([(1,)], cursor.execute(count_query).fetchall())

    def test_works_on_same_database_twice(self):
        with TemporaryDirectory() as tmp_dir:
            db_name = os.path.join(tmp_dir, 'test.db')
            # First use sets the database up ...
            with LowLevelConnection(1, db_name):
                pass
            # ... and the second use must find exactly one schema row.
            with LowLevelConnection(1, db_name) as conn:
                count_query = 'select count(*) from {}'.format(
                    LowLevelConnection.TABLE_SCHEMA_VERSION)
                rows = conn.execute(count_query).fetchall()
                self.assertEqual(1, rows[0][0])


class InsertTableTestCase(unittest.TestCase):
    """Tests for InsertTable."""

    def test_create_temp_table_table_exists(self):
        with TemporaryDirectory() as tmp_dir:
            with LowLevelConnection(1, os.path.join(tmp_dir, 'test.db')) as conn:
                table_schema, table_name = InsertTable.create_temp_table(conn)
                # Raises if and only if the table does not exist.
                query = 'select count(*) from {}.{}'.format(
                    table_schema, table_name)
                conn.cursor().execute(query)

    def test_create_temp_table_choses_other_table_if_exists(self):
        with TemporaryDirectory() as tmp_dir:
            with LowLevelConnection(1, os.path.join(tmp_dir, 'test.db')) as conn:
                # Occupy the first candidate name, forcing a retry.
                conn.cursor().execute('CREATE TEMPORARY TABLE insert_df_0 (id INT)')
                table_schema, table_name = InsertTable.create_temp_table(conn)
                self.assertEqual('temp', table_schema)
                self.assertEqual(
                    'insert_df_1', table_name,
                    'expected table creation to fail exactly the first time')

    def test_it_removes_the_temporary_table(self):
        csv_path = os.path.join('pynance', 'test_data', 'dkb_cash_sample.csv')
        test_data_frame = read_csv(csv_path, SupportedCsvTypes.DKBCash)
        # TODO: get rid of the 'drop' here
        test_data_frame = test_data_frame.drop(['origin'], axis=1)
        with TemporaryDirectory() as tmp_dir:
            with LowLevelConnection(1, os.path.join(tmp_dir, 'test.db')) as conn:
                insert_table_with_schema = ''

                def check_if_table_exists():
                    # Reads the enclosing variable at call time, so it sees the
                    # qualified name assigned inside the with-block below.
                    conn.cursor().execute(
                        'select count(*) from {}'.format(insert_table_with_schema))

                with InsertTable(conn, test_data_frame) as insert_table:
                    schema, name = insert_table
                    insert_table_with_schema = '{}.{}'.format(schema, name)
                    check_if_table_exists()

                # After __exit__ the temporary table must be gone.
                self.assertRaises(sqlite3.OperationalError,
                                  check_if_table_exists)

    def test_it_works_with_dataframes_from_text_importer(self):
        def run_test(csv_file, df_format):
            # Get the DataFrame
            self.assertTrue(os.path.isfile(csv_file))
            # TODO: Investigate what origin is good for and if we want to include it as column
            # in the database as well.
            data_frame = read_csv(csv_file, df_format).drop(['origin'], axis=1)
            self.assertTrue(len(data_frame.index) > 0)

            # Load it into the InsertTable and check the row count matches.
            with TemporaryDirectory() as tmp_dir:
                with LowLevelConnection(1, os.path.join(tmp_dir, 'test.db')) as conn:
                    with InsertTable(conn, data_frame) as insert_table:
                        expected_rows = len(data_frame.index)
                        count_query = 'SELECT count(*) FROM {}.{}'.format(
                            insert_table[0], insert_table[1])
                        actual_rows = conn.cursor().execute(
                            count_query).fetchall()[0][0]
                        self.assertEqual(
                            expected_rows, actual_rows,
                            'not all (or more?) rows written to database')

        run_test(os.path.join('pynance', 'test_data', 'dkb_cash_sample.csv'),
                 SupportedCsvTypes.DKBCash)
        run_test(os.path.join('pynance', 'test_data', 'dkb_visa_sample.csv'),
                 SupportedCsvTypes.DKBVisa)


class StorageTestCase(unittest.TestCase):
    """Placeholder tests for the Storage facade (implementations pending)."""

    def test_validate_dataframe_shape_complains_when_columns_are_missing(self):
        "Assertion when columns are missing"
        pass

    def test_validate_dataframe_shape_accepts_additional_columns(self):
        "Does not complain when additional columns are present"
        pass

    def test_append_dataframe_rejects_invalid_dataframes(self):
        pass

    def test_append_dataframe_returns_new_parts_with_id(self):
        pass

    def test_append_dataframe_returned_ids_are_the_same_as_in_load_dataframe(self):
        pass

    def test_append_dataframe_duplicates_are_left_out(self):
        pass

    def test_load_dataframe_works_with_new_storage_instance(self):
        "implies new conn etc..."
        pass
Loading