Skip to content

Commit cfb6bcd

Browse files
Merge pull request #128 from sailpoint-oss/devrel-2511
Add yield to the paginator
2 parents 6072f0e + 40409f4 commit cfb6bcd

3 files changed

Lines changed: 327 additions & 11 deletions

File tree

example.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import logging
2+
from pprint import pprint
3+
4+
logging.basicConfig(level=logging.DEBUG)
5+
16
import sailpoint
2-
import sailpoint.v3
37
import sailpoint.beta
8+
import sailpoint.v3
9+
import sailpoint.v2025
410
from sailpoint.configuration import Configuration
511
from sailpoint.paginator import Paginator
612
from sailpoint.v3.models.search import Search
7-
from pprint import pprint
13+
from sailpoint.v2025.models.account import Account
814

915
configuration = Configuration()
1016

@@ -36,7 +42,7 @@
3642
"Exception when calling AccessProfilesApi->list_access_profiles: %s\n" % e
3743
)
3844

39-
# Use the paginator with search
45+
#Use the paginator with search
4046

4147
search = Search()
4248
search.indices = ['identities']
@@ -46,8 +52,16 @@
4652
identities = Paginator.paginate_search(sailpoint.v3.SearchApi(api_client),search, 250, 1000)
4753
for identity in identities:
4854
print(identity['name'])
49-
5055

56+
# Stream search results using paginate_stream_search
57+
search_stream = Search()
58+
search_stream.indices = ['identities']
59+
search_stream.query = { 'query': '*' }
60+
search_stream.sort = ['-name']
61+
62+
print("Streaming search results (paginate_stream_search):\n")
63+
for identity in Paginator.paginate_stream_search(sailpoint.v3.SearchApi(api_client), search_stream, 250, 1000):
64+
print(identity['name'])
5165

5266
# Use the paginator to paginate 1000 accounts 100 at a time
5367
accounts = Paginator.paginate(sailpoint.v3.AccountsApi(api_client).list_accounts, 1000, limit=100)
@@ -60,4 +74,36 @@
6074

6175
workgroups = sailpoint.beta.GovernanceGroupsApi(api_client).list_workgroups()
6276
for workgroup in workgroups:
63-
print(workgroup.name)
77+
print(workgroup.name)
78+
79+
#Stream v2025 accounts with optional model typing
80+
with sailpoint.v2025.ApiClient(configuration) as api_client:
81+
try:
82+
account_stream = Paginator.paginate_stream(
83+
sailpoint.v2025.AccountsApi(api_client).list_accounts,
84+
1000,
85+
limit=100,
86+
model=Account
87+
)
88+
print("Streaming v2025 accounts (paginate_stream with model=Account):\n")
89+
for account in account_stream:
90+
print(account.name)
91+
except Exception as e:
92+
print("Exception when streaming accounts: %s\n" % e)
93+
94+
# Stream v2025 accounts with HTTP info (status code, headers) and optional model typing
95+
with sailpoint.v2025.ApiClient(configuration) as api_client:
96+
try:
97+
account_stream = Paginator.paginate_stream_with_http_info(
98+
sailpoint.v2025.AccountsApi(api_client).list_accounts_with_http_info,
99+
1000,
100+
limit=100,
101+
model=Account
102+
)
103+
print("Streaming v2025 accounts (paginate_stream_with_http_info):\n")
104+
for account, response in account_stream:
105+
print(f"[{response.status_code}] {account.name}")
106+
except Exception as e:
107+
print("Exception when streaming accounts with http info: %s\n" % e)
108+
109+

sailpoint/paginator.py

Lines changed: 192 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
import logging
12
from sailpoint.v3.api.search_api import SearchApi
23
from sailpoint.v3.models.search import Search
3-
from typing import TypeVar
4+
from typing import Any, Callable, Iterator, Optional, Tuple, Type, TypeVar, overload
45

56
T = TypeVar('T')
7+
TItem = TypeVar('TItem')
8+
9+
logger = logging.getLogger(__name__)
610

711
class PaginationParams:
812
limit: int
@@ -27,7 +31,7 @@ def paginate(T, result_limit, **kwargs) -> T:
2731

2832
modified = []
2933
while True:
30-
print(f'Paginating call, offset = {kwargs["offset"]}')
34+
logger.debug(f'Paginating call, offset = {kwargs["offset"]}')
3135

3236
# Call endpoint and pass any arguments
3337
results = T(**kwargs)
@@ -48,6 +52,188 @@ def paginate(T, result_limit, **kwargs) -> T:
4852

4953
kwargs['offset'] += increment
5054

55+
@overload
56+
@staticmethod
57+
def paginate_stream(
58+
api_call: Callable[..., Any],
59+
result_limit: Optional[int] = None,
60+
*,
61+
model: Type[TItem],
62+
**kwargs
63+
) -> Iterator[TItem]: ...
64+
65+
@overload
66+
@staticmethod
67+
def paginate_stream(
68+
api_call: Callable[..., Any],
69+
result_limit: Optional[int] = None,
70+
**kwargs
71+
) -> Iterator[Any]: ...
72+
73+
@staticmethod
74+
def paginate_stream(
75+
api_call: Callable[..., Any],
76+
result_limit: Optional[int] = None,
77+
*,
78+
model: Optional[Type[TItem]] = None,
79+
**kwargs
80+
) -> Iterator[TItem]:
81+
"""
82+
Stream paginated results by yielding items as each API page is received.
83+
When model is provided, the iterator is typed as Iterator[model] for IDE support.
84+
"""
85+
result_limit = result_limit if result_limit else 1000
86+
increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250
87+
kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0
88+
yielded = 0
89+
90+
while True:
91+
logger.debug(f'Paginating call, offset = {kwargs["offset"]}')
92+
93+
results = api_call(**kwargs)
94+
95+
if isinstance(results, list):
96+
batch = results
97+
else:
98+
batch = results.data
99+
100+
for item in batch:
101+
yield item
102+
yielded += 1
103+
if result_limit > 0 and yielded >= result_limit:
104+
return
105+
106+
if len(batch) < increment:
107+
return
108+
109+
kwargs['offset'] += increment
110+
111+
@overload
112+
@staticmethod
113+
def paginate_stream_with_http_info(
114+
api_call: Callable[..., Any],
115+
result_limit: Optional[int] = None,
116+
*,
117+
model: Type[TItem],
118+
**kwargs
119+
) -> Iterator[Tuple[TItem, Any]]: ...
120+
121+
@overload
122+
@staticmethod
123+
def paginate_stream_with_http_info(
124+
api_call: Callable[..., Any],
125+
result_limit: Optional[int] = None,
126+
**kwargs
127+
) -> Iterator[Tuple[Any, Any]]: ...
128+
129+
@staticmethod
130+
def paginate_stream_with_http_info(
131+
api_call: Callable[..., Any],
132+
result_limit: Optional[int] = None,
133+
*,
134+
model: Optional[Type[TItem]] = None,
135+
**kwargs
136+
) -> Iterator[Tuple[TItem, Any]]:
137+
"""
138+
Stream paginated results from a _with_http_info API call.
139+
Yields (item, response) tuples so callers can inspect status_code/headers
140+
for every page, not just the first.
141+
When model is provided, items in the tuples are typed as model for IDE support.
142+
"""
143+
result_limit = result_limit if result_limit else 1000
144+
increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250
145+
kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0
146+
yielded = 0
147+
148+
while True:
149+
logger.debug(f'Paginating call, offset = {kwargs["offset"]}')
150+
response = api_call(**kwargs)
151+
batch = response.data
152+
153+
for item in batch:
154+
yield (item, response)
155+
yielded += 1
156+
if result_limit > 0 and yielded >= result_limit:
157+
return
158+
159+
if len(batch) < increment:
160+
return
161+
162+
kwargs['offset'] += increment
163+
164+
@staticmethod
165+
def paginate_stream_search(search_api: SearchApi, search: Search, increment: int, limit: int):
166+
"""
167+
Stream search results by yielding each result as it is received from each API page.
168+
"""
169+
increment = increment if increment else 250
170+
max_limit = limit if limit else 0
171+
yielded = 0
172+
173+
if search.sort is None or len(search.sort) != 1:
174+
raise Exception('search query must include exactly one sort parameter to paginate properly')
175+
176+
while True:
177+
logger.debug('Paginating call')
178+
results = search_api.search_post(search, None, increment)
179+
180+
for result in results:
181+
yield result
182+
yielded += 1
183+
if max_limit > 0 and yielded >= max_limit:
184+
return
185+
186+
logger.debug(f'Received {len(results)} results')
187+
188+
if len(results) < increment:
189+
return
190+
191+
result = results[len(results) - 1]
192+
if result[search.sort[0].strip('+-')] is not None:
193+
next_search_after = result[str(search.sort[0]).strip('+-')]
194+
search.search_after = [next_search_after]
195+
else:
196+
raise Exception('Search unexpectedly did not return a result we can search after!')
197+
198+
@staticmethod
199+
def paginate_stream_search_with_http_info(
200+
search_api: SearchApi, search: Search, increment: int, limit: int
201+
) -> Iterator[Tuple[Any, Any]]:
202+
"""
203+
Stream search results from search_post_with_http_info.
204+
Yields (item, response) tuples so callers can inspect status_code/headers
205+
for every page, not just the first.
206+
"""
207+
increment = increment if increment else 250
208+
max_limit = limit if limit else 0
209+
yielded = 0
210+
211+
if search.sort is None or len(search.sort) != 1:
212+
raise Exception('search query must include exactly one sort parameter to paginate properly')
213+
214+
while True:
215+
logger.debug('Paginating call')
216+
response = search_api.search_post_with_http_info(search, None, increment)
217+
batch = response.data
218+
219+
for result in batch:
220+
yield (result, response)
221+
yielded += 1
222+
if max_limit > 0 and yielded >= max_limit:
223+
return
224+
225+
logger.debug(f'Received {len(batch)} results')
226+
227+
if len(batch) < increment:
228+
return
229+
230+
last = batch[len(batch) - 1]
231+
if last[search.sort[0].strip('+-')] is not None:
232+
next_search_after = last[str(search.sort[0]).strip('+-')]
233+
search.search_after = [next_search_after]
234+
else:
235+
raise Exception('Search unexpectedly did not return a result we can search after!')
236+
51237
@staticmethod
52238
def paginate_search(search_api: SearchApi, search: Search, increment: int, limit: int):
53239
increment = increment if increment else 250
@@ -60,11 +246,11 @@ def paginate_search(search_api: SearchApi, search: Search, increment: int, limit
60246
raise Exception('search query must include exactly one sort parameter to paginate properly')
61247

62248
while True:
63-
print(f'Paginating call, offset = {offset}')
249+
logger.debug(f'Paginating call, offset = {offset}')
64250
results = search_api.search_post(search, None, increment)
65251
modified = modified + results
66252

67-
print(f'Received {len(results)} results')
253+
logger.debug(f'Received {len(results)} results')
68254

69255
if len(results) < increment or (len(modified) >= max_limit and max_limit > 0):
70256
results = modified
@@ -91,11 +277,11 @@ def paginate_search_with_http_info(search_api: SearchApi, search: Search, increm
91277
raise Exception('search query must include exactly one sort parameter to paginate properly')
92278

93279
while True:
94-
print(f'Paginating call, offset = {offset}')
280+
logger.debug(f'Paginating call, offset = {offset}')
95281
results = search_api.search_post_with_http_info(search, None, increment)
96282
modified = modified + results.data
97283

98-
print(f'Recieved {len(results.data)} results')
284+
logger.debug(f'Received {len(results.data)} results')
99285

100286
if len(results.data) < increment or (len(modified) >= max_limit and max_limit > 0):
101287
results.data = modified

0 commit comments

Comments
 (0)