Skip to content

Commit 6b20506

Browse files
jyecuschtjholm
andauthored
fix: fix bucket url and permission types
* fix: allow non-timedelta expiry values for upload/download URLs * fix: replace incorrect bucket permission values Existing values will still work, with a deprecation warning. --------- Co-authored-by: Tim Holm <tim.holm@nitric.io>
1 parent 3698993 commit 6b20506

1 file changed

Lines changed: 60 additions & 16 deletions

File tree

nitric/resources/buckets.py

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from dataclasses import dataclass
2323
from datetime import timedelta
2424
from enum import Enum
25-
from typing import Callable, List, Literal, Optional, Union
25+
from typing import Callable, List, Literal, Optional, Union, cast
2626
from warnings import warn
2727

2828
import betterproto
@@ -146,20 +146,40 @@ async def delete(self):
146146
except GRPCError as grpc_err:
147147
raise exception_from_grpc_error(grpc_err) from grpc_err
148148

149-
async def upload_url(self, expiry: Optional[timedelta] = None):
150-
"""Get a temporary writable URL to this file."""
151-
return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)
149+
async def upload_url(self, expiry: Optional[Union[timedelta, int]] = None):
150+
"""
151+
Get a temporary writable URL to this file.
152152
153-
async def download_url(self, expiry: Optional[timedelta] = None):
154-
"""Get a temporary readable URL to this file."""
155-
return await self.sign_url(mode=FileMode.READ, expiry=expiry)
153+
Parameters:
156154
157-
async def sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[timedelta] = None):
158-
"""Generate a signed URL for reading or writing to a file."""
159-
warn("File.sign_url() is deprecated, use upload_url() or download_url() instead", DeprecationWarning)
155+
expiry (timedelta or int, optional): The expiry time for the signed URL.
156+
If an integer is provided, it is treated as seconds. Default is 600 seconds.
160157
158+
Returns:
159+
str: The signed URL.
160+
"""
161+
return await self._sign_url(mode=FileMode.WRITE, expiry=expiry)
162+
163+
async def download_url(self, expiry: Optional[Union[timedelta, int]] = None):
164+
"""
165+
Get a temporary readable URL to this file.
166+
167+
Parameters:
168+
169+
expiry (timedelta or int, optional): The expiry time for the signed URL.
170+
If an integer is provided, it is treated as seconds. Default is 600 seconds.
171+
172+
Returns:
173+
str: The signed URL.
174+
"""
175+
return await self._sign_url(mode=FileMode.READ, expiry=expiry)
176+
177+
async def _sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[Union[timedelta, int]] = None):
178+
"""Generate a signed URL for reading or writing to a file."""
161179
if expiry is None:
162180
expiry = timedelta(seconds=600)
181+
if not isinstance(expiry, timedelta):
182+
expiry = timedelta(seconds=expiry)
163183

164184
try:
165185
response = await self._bucket._storage_stub.pre_sign_url( # type: ignore pylint: disable=protected-access
@@ -172,7 +192,25 @@ async def sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[timede
172192
raise exception_from_grpc_error(grpc_err) from grpc_err
173193

174194

175-
BucketPermission = Literal["reading", "writing", "deleting"]
195+
LegacyBucketPermission = Literal["reading", "writing", "deleting"]
196+
BucketPermission = Literal["read", "write", "delete"]
197+
198+
legacy_perms: List[LegacyBucketPermission] = ["reading", "writing", "deleting"]
199+
new_perms: List[BucketPermission] = ["read", "write", "delete"]
200+
201+
202+
def check_permission(permission: Union[LegacyBucketPermission, BucketPermission]) -> BucketPermission:
203+
"""Check if the permission is valid and return the new permission if it is a legacy permission."""
204+
if permission in legacy_perms:
205+
new_perm = new_perms[legacy_perms.index(cast(LegacyBucketPermission, permission))]
206+
warn(
207+
f"The permission '{permission}' is deprecated. Use '{new_perm}' instead.", DeprecationWarning, stacklevel=2
208+
)
209+
return new_perm
210+
elif permission in new_perms:
211+
return cast(BucketPermission, permission)
212+
else:
213+
raise ValueError("Invalid permission value, must be one of 'read', 'write', or 'delete'.")
176214

177215

178216
class BucketNotificationWorkerOptions:
@@ -211,19 +249,25 @@ async def _register(self) -> None:
211249

212250
def _perms_to_actions(self, *args: BucketPermission) -> List[Action]:
213251
permission_actions_map: dict[BucketPermission, List[Action]] = {
214-
"reading": [Action.BucketFileGet, Action.BucketFileList],
215-
"writing": [Action.BucketFilePut],
216-
"deleting": [Action.BucketFileDelete],
252+
"read": [Action.BucketFileGet, Action.BucketFileList],
253+
"write": [Action.BucketFilePut],
254+
"delete": [Action.BucketFileDelete],
217255
}
218256

219257
return [action for perm in args for action in permission_actions_map[perm]]
220258

221259
def _to_resource_id(self) -> ResourceIdentifier:
222260
return ResourceIdentifier(name=self.name, type=ResourceType.Bucket) # type:ignore
223261

224-
def allow(self, perm: BucketPermission, *args: BucketPermission) -> BucketRef:
262+
def allow(
263+
self,
264+
perm: Union[LegacyBucketPermission, BucketPermission],
265+
*args: Union[LegacyBucketPermission, BucketPermission],
266+
) -> BucketRef:
225267
"""Request the required permissions for this resource."""
226-
str_args = [str(perm)] + [str(permission) for permission in args]
268+
all_perms: List[BucketPermission] = [check_permission(perm)] + [check_permission(p) for p in args]
269+
270+
str_args = [str(permission) for permission in all_perms]
227271
self._register_policy(*str_args)
228272

229273
return BucketRef(self.name)

0 commit comments

Comments
 (0)