Skip to content
131 changes: 98 additions & 33 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import google_crc32c

import pytest
import gc

# current library imports
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
Expand All @@ -18,6 +19,35 @@
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)
import psutil

pid = os.getpid()
print(f"Test running in process ID,sleeping: {pid}")
import time

# time.sleep(10)


def log_num_of_open_fd(position):
"""Helper function to log the number of open file descriptors."""
import inspect

func_name = inspect.stack()[1].function
print(f"\n----------- In function: {func_name} at {position} -----------")
process = psutil.Process(pid)
print(
f"Number of TCP connections for process: {len(process.net_connections(kind='tcp'))}"
)
try:
system_tcp_connections = len(psutil.net_connections(kind="tcp"))
print(
f"Total number of TCP connections on the system: {system_tcp_connections}"
)
except psutil.AccessDenied:
print(
"Could not determine total number of TCP connections on the system (permission denied)."
)


pytestmark = pytest.mark.skipif(
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
Expand All @@ -36,34 +66,34 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
return a + step, a + 2 * step


async def write_one_appendable_object(
bucket_name: str,
object_name: str,
data: bytes,
) -> None:
"""Helper to write an appendable object."""
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
await writer.open()
await writer.append(data)
await writer.close()


@pytest.fixture(scope="function")
def appendable_object(storage_client, blobs_to_delete):
"""Fixture to create and cleanup an appendable object."""
object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
asyncio.run(
write_one_appendable_object(
_ZONAL_BUCKET,
object_name,
_BYTES_TO_UPLOAD,
)
)
yield object_name

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
# async def write_one_appendable_object(
# bucket_name: str,
# object_name: str,
# data: bytes,
# ) -> None:
# """Helper to write an appendable object."""
# grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
# writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
# await writer.open()
# await writer.append(data)
# await writer.close()


# @pytest.fixture(scope="function")
# def appendable_object(storage_client, blobs_to_delete):
# """Fixture to create and cleanup an appendable object."""
# object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
# asyncio.run(
# write_one_appendable_object(
# _ZONAL_BUCKET,
# object_name,
# _BYTES_TO_UPLOAD,
# )
# )
# yield object_name

# # Clean up; use json client (i.e. `storage_client` fixture) to delete.
# blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
Expand All @@ -82,6 +112,7 @@ def appendable_object(storage_client, blobs_to_delete):
async def test_basic_wrd(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
log_num_of_open_fd("start")
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
Expand Down Expand Up @@ -114,6 +145,10 @@ async def test_basic_wrd(

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()
log_num_of_open_fd("end")


@pytest.mark.asyncio
Expand All @@ -126,6 +161,7 @@ async def test_basic_wrd(
],
)
async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size):
log_num_of_open_fd("start")
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
Expand Down Expand Up @@ -161,18 +197,28 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()
log_num_of_open_fd("end")


@pytest.mark.asyncio
@pytest.mark.parametrize(
"flush_interval",
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
[
2 * 1024 * 1024,
4 * 1024 * 1024,
8 * 1024 * 1024,
_DEFAULT_FLUSH_INTERVAL_BYTES,
],
)
async def test_wrd_with_non_default_flush_interval(
storage_client,
blobs_to_delete,
flush_interval,
):
log_num_of_open_fd("start")
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
object_size = 9 * 1024 * 1024

Expand Down Expand Up @@ -214,10 +260,15 @@ async def test_wrd_with_non_default_flush_interval(

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()
log_num_of_open_fd("end")


@pytest.mark.asyncio
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
log_num_of_open_fd("start")
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client

Expand All @@ -237,20 +288,30 @@ async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delet

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()
log_num_of_open_fd("end")


@pytest.mark.asyncio
async def test_mrd_open_with_read_handle(appendable_object):
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
async def test_mrd_open_with_read_handle():
log_num_of_open_fd("start")
grpc_client = AsyncGrpcClient().grpc_client
object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}"
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.close()

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
await mrd.open()
read_handle = mrd.read_handle
await mrd.close()

# Open a new MRD using the `read_handle` obtained above
new_mrd = AsyncMultiRangeDownloader(
grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle
)
await new_mrd.open()
# persisted_size not set when opened with read_handle
Expand All @@ -259,3 +320,7 @@ async def test_mrd_open_with_read_handle(appendable_object):
await new_mrd.download_ranges([(0, 0, buffer)])
await new_mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
del mrd
del new_mrd
gc.collect()
log_num_of_open_fd("end")