Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ on:

jobs:
tests:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# last supported for sqlitecloud, last security maintained, last release
python-version: ["3.6", "3.8", "3.12"]
python-version: ["3.9", "3.10", "3.12"]

steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
profile = "black"

[tool.pytest.ini_options]
log_level = "INFO"
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"serial",
Expand Down
9 changes: 7 additions & 2 deletions src/sqlitecloud/dbapi2.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,7 @@ def execute(
sql, parameters, self.connection.sqlitecloud_connection
)

self._resultset = None
self._result_operation = None
self._reset()

if isinstance(result, SQLiteCloudResult):
self._resultset = result
Expand Down Expand Up @@ -853,6 +852,12 @@ def _get_value(self, row: int, col: int) -> Optional[Any]:

return self._convert_value(value, colname, decltype)

def _reset(self) -> None:
self._resultset = None
self._result_operation = None

self._iter_row = 0

def __iter__(self) -> "Cursor":
return self

Expand Down
10 changes: 7 additions & 3 deletions src/sqlitecloud/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ def _internal_setup_pubsub(
def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:
blen = 2048
buffer: bytes = b""
tread = 0

try:
while True:
tread = 0

try:
if not connection.pubsub_socket:
Expand Down Expand Up @@ -240,7 +240,6 @@ def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:

nread = len(data)
tread += nread
blen -= nread
buffer += data

sqlitecloud_number = self._internal_parse_number(buffer)
Expand All @@ -262,11 +261,16 @@ def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:
connection.pubsub_callback(
connection, SQLiteCloudResultSet(result), connection.pubsub_data
)

# reset after having read the message
tread = 0
buffer: bytes = b""
except Exception as e:
logging.error(f"An error occurred while parsing data: {e}.")

finally:
connection.pubsub_callback(connection, None, connection.pubsub_data)
if connection and connection.pubsub_callback:
connection.pubsub_callback(connection, None, connection.pubsub_data)

def upload_database(
self,
Expand Down
15 changes: 15 additions & 0 deletions src/tests/integration/test_dbapi2.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,18 @@ def test_connection_is_connected(self, sqlitecloud_dbapi2_connection):
connection.close()

assert not connection.is_connected()

def test_fetchall_returns_right_nrows_number(self, sqlitecloud_dbapi2_connection):
connection = sqlitecloud_dbapi2_connection

cursor = connection.cursor()

cursor.execute("SELECT * FROM Genres LIMIT 3")

assert len(cursor.fetchall()) == 3
assert cursor.rowcount == 3

cursor.execute("SELECT * FROM Albums LIMIT 4")

assert len(cursor.fetchall()) == 4
assert cursor.rowcount == 4
32 changes: 32 additions & 0 deletions src/tests/integration/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ def assert_callback(conn, result, data):

assert callback_called

def test_notify_multiple_messages(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection

called_times = 3
flag = threading.Event()

def assert_callback(conn, result, data):
nonlocal called_times
nonlocal flag

if isinstance(result, SQLiteCloudResultSet):
assert data == ["somedataX"]
called_times -= 1
if called_times == 0:
flag.set()

pubsub = SQLiteCloudPubSub()
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL
channel = "channel" + str(uuid.uuid4())

pubsub.create_channel(connection, channel)
pubsub.listen(connection, subject_type, channel, assert_callback, ["somedataX"])

pubsub.notify_channel(connection, channel, "somedataX")
pubsub.notify_channel(connection, channel, "somedataX")
pubsub.notify_channel(connection, channel, "somedataX")

# wait for callback to be called
flag.wait(30)

assert called_times == 0

def test_unlisten_channel(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection

Expand Down