Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from pprint import pprint

import sailpoint
import sailpoint.v3
import sailpoint.beta
import sailpoint.v3
import sailpoint.v2025
from sailpoint.configuration import Configuration
from sailpoint.paginator import Paginator
from sailpoint.v3.models.search import Search
from pprint import pprint
from sailpoint.v2025.models.account import Account

configuration = Configuration()

Expand Down Expand Up @@ -36,7 +39,7 @@
"Exception when calling AccessProfilesApi->list_access_profiles: %s\n" % e
)

# Use the paginator with search
#Use the paginator with search

search = Search()
search.indices = ['identities']
Expand All @@ -46,8 +49,16 @@
identities = Paginator.paginate_search(sailpoint.v3.SearchApi(api_client),search, 250, 1000)
for identity in identities:
print(identity['name'])


# Stream search results using paginate_stream_search
search_stream = Search()
search_stream.indices = ['identities']
search_stream.query = { 'query': '*' }
search_stream.sort = ['-name']

print("Streaming search results (paginate_stream_search):\n")
for identity in Paginator.paginate_stream_search(sailpoint.v3.SearchApi(api_client), search_stream, 250, 1000):
print(identity['name'])

# Use the paginator to paginate 1000 accounts 100 at a time
accounts = Paginator.paginate(sailpoint.v3.AccountsApi(api_client).list_accounts, 1000, limit=100)
Expand All @@ -60,4 +71,35 @@

workgroups = sailpoint.beta.GovernanceGroupsApi(api_client).list_workgroups()
for workgroup in workgroups:
print(workgroup.name)
print(workgroup.name)

#Stream v2025 accounts with optional model typing
with sailpoint.v2025.ApiClient(configuration) as api_client:
try:
account_stream = Paginator.paginate_stream(
sailpoint.v2025.AccountsApi(api_client).list_accounts,
1000,
limit=100,
model=Account
)
print("Streaming v2025 accounts (paginate_stream with model=Account):\n")
for account in account_stream:
print(account.name)
except Exception as e:
print("Exception when streaming accounts: %s\n" % e)

# Stream v2025 accounts with HTTP info (status code, headers) and optional model typing
with sailpoint.v2025.ApiClient(configuration) as api_client:
try:
account_stream = Paginator.paginate_stream_with_http_info(
sailpoint.v2025.AccountsApi(api_client).list_accounts_with_http_info,
1000,
limit=100,
model=Account
)
print("Streaming v2025 accounts (paginate_stream_with_http_info):\n")
for account, response in account_stream:
print(f"[{response.status_code}] {account.name}")
except Exception as e:
print("Exception when streaming accounts with http info: %s\n" % e)

148 changes: 147 additions & 1 deletion sailpoint/paginator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from sailpoint.v3.api.search_api import SearchApi
from sailpoint.v3.models.search import Search
from typing import TypeVar
from typing import Any, Iterator, Optional, Tuple, Type, TypeVar

T = TypeVar('T')
TItem = TypeVar('TItem')

class PaginationParams:
limit: int
Expand Down Expand Up @@ -48,6 +49,151 @@ def paginate(T, result_limit, **kwargs) -> T:

kwargs['offset'] += increment

@staticmethod
def paginate_stream(
api_call,
result_limit: Optional[int] = None,
*,
model: Optional[Type[TItem]] = None,
**kwargs
) -> Iterator[TItem]:
"""
Stream paginated results by yielding items as each API page is received.
Optional model parameter is for typing only (e.g. Iterator[Account] when model=Account).
"""
result_limit = result_limit if result_limit else 1000
increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250
kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0
yielded = 0

while True:
print(f'Paginating call, offset = {kwargs["offset"]}')

results = api_call(**kwargs)

if isinstance(results, list):
batch = results
else:
batch = results.data

for item in batch:
yield item
yielded += 1
if result_limit > 0 and yielded >= result_limit:
return

if len(batch) < increment:
return

kwargs['offset'] += increment

@staticmethod
def paginate_stream_with_http_info(
api_call,
result_limit: Optional[int] = None,
*,
model: Optional[Type[TItem]] = None,
**kwargs
) -> Iterator[Tuple[TItem, Any]]:
"""
Stream paginated results from a _with_http_info API call.
Yields (item, response) tuples so callers can inspect status_code/headers
for every page, not just the first.
"""
result_limit = result_limit if result_limit else 1000
increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250
kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0
yielded = 0

while True:
print(f'Paginating call, offset = {kwargs["offset"]}')
response = api_call(**kwargs)
batch = response.data

for item in batch:
yield (item, response)
yielded += 1
if result_limit > 0 and yielded >= result_limit:
return

if len(batch) < increment:
return

kwargs['offset'] += increment

@staticmethod
def paginate_stream_search(search_api: SearchApi, search: Search, increment: int, limit: int):
"""
Stream search results by yielding each result as it is received from each API page.
"""
increment = increment if increment else 250
max_limit = limit if limit else 0
yielded = 0

if search.sort is None or len(search.sort) != 1:
raise Exception('search query must include exactly one sort parameter to paginate properly')

while True:
print(f'Paginating call')
results = search_api.search_post(search, None, increment)

for result in results:
yield result
yielded += 1
if max_limit > 0 and yielded >= max_limit:
return

print(f'Received {len(results)} results')

if len(results) < increment:
return

result = results[len(results) - 1]
if result[search.sort[0].strip('+-')] is not None:
next_search_after = result[str(search.sort[0]).strip('+-')]
search.search_after = [next_search_after]
else:
raise Exception('Search unexpectedly did not return a result we can search after!')

@staticmethod
def paginate_stream_search_with_http_info(
search_api: SearchApi, search: Search, increment: int, limit: int
) -> Iterator[Tuple[Any, Any]]:
"""
Stream search results from search_post_with_http_info.
Yields (item, response) tuples so callers can inspect status_code/headers
for every page, not just the first.
"""
increment = increment if increment else 250
max_limit = limit if limit else 0
yielded = 0

if search.sort is None or len(search.sort) != 1:
raise Exception('search query must include exactly one sort parameter to paginate properly')

while True:
print(f'Paginating call')
response = search_api.search_post_with_http_info(search, None, increment)
batch = response.data

for result in batch:
yield (result, response)
yielded += 1
if max_limit > 0 and yielded >= max_limit:
return

print(f'Received {len(batch)} results')

if len(batch) < increment:
return

last = batch[len(batch) - 1]
if last[search.sort[0].strip('+-')] is not None:
next_search_after = last[str(search.sort[0]).strip('+-')]
search.search_after = [next_search_after]
else:
raise Exception('Search unexpectedly did not return a result we can search after!')

@staticmethod
def paginate_search(search_api: SearchApi, search: Search, increment: int, limit: int):
increment = increment if increment else 250
Expand Down
84 changes: 84 additions & 0 deletions validation_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any
import unittest

import sailpoint.beta
import sailpoint.v3
import sailpoint.v2024
import sailpoint.v2025
import sailpoint.v2026
from sailpoint.configuration import Configuration, ConfigurationParams
from sailpoint.paginator import Paginator
Expand All @@ -16,6 +18,7 @@ class TestPythonSDK(unittest.TestCase):
beta_api_client = sailpoint.beta.ApiClient(configuration)
configuration.experimental = True
v2024_api_client = sailpoint.v2024.ApiClient(configuration)
v2025_api_client = sailpoint.v2025.ApiClient(configuration)
v2026_api_client = sailpoint.v2026.ApiClient(configuration)


Expand Down Expand Up @@ -47,6 +50,19 @@ def test_search_pagination(self):
self.assertEqual(100,len(search_results.data))
self.assertEqual(200,search_results.status_code)

def test_paginate_stream_search(self):
"""Stream search yields same count as paginate_search when fully consumed."""
search = Search()
search.indices = ['identities']
search.query = {'query': '*'}
search.sort = ['-name']

stream = Paginator.paginate_stream_search(
sailpoint.v3.SearchApi(self.v3_api_client), search, 10, 100
)
items = list(stream)
self.assertEqual(100, len(items))

def test_list_transforms(self):
transforms = sailpoint.v3.TransformsApi(self.v3_api_client).list_transforms_with_http_info()
self.assertIsNotNone(transforms.data)
Expand All @@ -58,6 +74,74 @@ def test_pagination(self):
self.assertEqual(100, len(accounts.data))
self.assertEqual(200, accounts.status_code)

def test_paginate_stream(self):
"""Stream yields same count as paginate when fully consumed."""
stream = Paginator.paginate_stream(
sailpoint.v3.AccountsApi(self.v3_api_client).list_accounts_with_http_info,
100,
limit=10
)
items = list(stream)
self.assertEqual(100, len(items))

def test_paginate_stream_consumed_incrementally(self):
"""Stream yields items as they come; consuming first N then stopping does not require full fetch."""
stream = Paginator.paginate_stream(
sailpoint.v3.AccountsApi(self.v3_api_client).list_accounts_with_http_info,
100,
limit=2
)
first_three = []
for i, item in enumerate(stream):
first_three.append(item)
if i >= 2:
break
self.assertGreaterEqual(len(first_three), 1)
self.assertLessEqual(len(first_three), 3)

def test_paginate_stream_with_model_v2025(self):
"""When model=Account is passed, yielded items are typed (and are Account instances)."""
from sailpoint.v2025.models.account import Account

stream = Paginator.paginate_stream(
sailpoint.v2025.AccountsApi(self.v2025_api_client).list_accounts_with_http_info,
10,
limit=5,
model=Account
)
for item in stream:
self.assertIsInstance(item, Account)
break # at least one item has correct type

def test_paginate_stream_with_http_info(self):
"""Yields (item, response) tuples; every tuple carries the response from its page."""
stream = Paginator.paginate_stream_with_http_info(
sailpoint.v3.AccountsApi(self.v3_api_client).list_accounts_with_http_info,
100,
limit=10
)
items = []
for item, response in stream:
self.assertEqual(200, response.status_code)
items.append(item)
self.assertEqual(100, len(items))

def test_paginate_stream_search_with_http_info(self):
"""Yields (item, response) tuples; every tuple carries the response from its page."""
search = Search()
search.indices = ['identities']
search.query = {'query': '*'}
search.sort = ['-name']

stream = Paginator.paginate_stream_search_with_http_info(
sailpoint.v3.SearchApi(self.v3_api_client), search, 10, 100
)
items = []
for item, response in stream:
self.assertEqual(200, response.status_code)
items.append(item)
self.assertEqual(100, len(items))

def test_list_accounts_beta(self):
accounts = sailpoint.beta.AccountsApi(self.beta_api_client).list_accounts_with_http_info()
self.assertIsNotNone(accounts.data)
Expand Down