1919import com .pipedream .api .resources .proxy .requests .PatchProxyRequest ;
2020import com .pipedream .api .resources .proxy .requests .PostProxyRequest ;
2121import com .pipedream .api .resources .proxy .requests .PutProxyRequest ;
22+ import com .pipedream .api .resources .proxy .types .ProxyResponse ;
2223import java .io .IOException ;
23- import java .io .InputStream ;
2424import java .util .concurrent .CompletableFuture ;
2525import okhttp3 .Call ;
2626import okhttp3 .Callback ;
2727import okhttp3 .Headers ;
2828import okhttp3 .HttpUrl ;
29+ import okhttp3 .MediaType ;
2930import okhttp3 .OkHttpClient ;
3031import okhttp3 .Request ;
3132import okhttp3 .RequestBody ;
@@ -40,17 +41,57 @@ public AsyncRawProxyClient(ClientOptions clientOptions) {
4041 this .clientOptions = clientOptions ;
4142 }
4243
44+ private static boolean isJsonContentType (MediaType contentType ) {
45+ if (contentType == null ) {
46+ return false ;
47+ }
48+ return "application" .equals (contentType .type ()) && "json" .equals (contentType .subtype ());
49+ }
50+
51+ private static void handleSuccessResponse (
52+ Response response ,
53+ ResponseBody responseBody ,
54+ CompletableFuture <BaseClientHttpResponse <ProxyResponse >> future ) {
55+ MediaType contentType = responseBody != null ? responseBody .contentType () : null ;
56+ String contentTypeString = contentType != null ? contentType .toString () : null ;
57+ ProxyResponse proxyBody ;
58+ if (isJsonContentType (contentType )) {
59+ try {
60+ String responseBodyString = responseBody != null ? responseBody .string () : "null" ;
61+ Object parsed = ObjectMappers .JSON_MAPPER .readValue (responseBodyString , Object .class );
62+ proxyBody = ProxyResponse .json (parsed , contentTypeString );
63+ } catch (JsonProcessingException e ) {
64+ future .completeExceptionally (new BaseClientException (
65+ "Response Content-Type was application/json but body is not valid JSON" , e ));
66+ return ;
67+ } catch (IOException e ) {
68+ future .completeExceptionally (new BaseClientException ("Error reading response body" , e ));
69+ return ;
70+ } finally {
71+ response .close ();
72+ }
73+ } else {
74+ try {
75+ proxyBody = ProxyResponse .stream (new ResponseBodyInputStream (response ), contentTypeString );
76+ } catch (IOException e ) {
77+ future .completeExceptionally (new BaseClientException ("Error creating response stream" , e ));
78+ return ;
79+ }
80+ }
81+ future .complete (new BaseClientHttpResponse <>(proxyBody , response ));
82+ }
83+
4384 /**
4485 * Forward an authenticated GET request to an external API using an external user's account credentials
4586 */
46- public CompletableFuture <BaseClientHttpResponse <InputStream >> get (String url64 , GetProxyRequest request ) {
87+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> get (String url64 , GetProxyRequest request ) {
4788 return get (url64 , request , null );
4889 }
4990
5091 /**
5192 * Forward an authenticated GET request to an external API using an external user's account credentials
5293 */
53- public CompletableFuture <BaseClientHttpResponse <InputStream >> get (
94+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> get (
5495 String url64 , GetProxyRequest request , RequestOptions requestOptions ) {
5596 HttpUrl .Builder httpUrl = HttpUrl .parse (this .clientOptions .environment ().getUrl ())
5697 .newBuilder ()
@@ -70,14 +111,14 @@ public CompletableFuture<BaseClientHttpResponse<InputStream>> get(
70111 if (requestOptions != null && requestOptions .getTimeout ().isPresent ()) {
71112 client = clientOptions .httpClientWithTimeout (requestOptions );
72113 }
73- CompletableFuture <BaseClientHttpResponse <InputStream >> future = new CompletableFuture <>();
114+ CompletableFuture <BaseClientHttpResponse <ProxyResponse >> future = new CompletableFuture <>();
74115 client .newCall (okhttpRequest ).enqueue (new Callback () {
75116 @ Override
76117 public void onResponse (@ NotNull Call call , @ NotNull Response response ) throws IOException {
77118 try {
78119 ResponseBody responseBody = response .body ();
79120 if (response .isSuccessful ()) {
80- future . complete ( new BaseClientHttpResponse <>( new ResponseBodyInputStream ( response ), response ) );
121+ handleSuccessResponse ( response , responseBody , future );
81122 return ;
82123 }
83124 String responseBodyString = responseBody != null ? responseBody .string () : "{}" ;
@@ -112,14 +153,14 @@ public void onFailure(@NotNull Call call, @NotNull IOException e) {
112153 /**
113154 * Forward an authenticated POST request to an external API using an external user's account credentials
114155 */
115- public CompletableFuture <BaseClientHttpResponse <InputStream >> post (String url64 , PostProxyRequest request ) {
156+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> post (String url64 , PostProxyRequest request ) {
116157 return post (url64 , request , null );
117158 }
118159
119160 /**
120161 * Forward an authenticated POST request to an external API using an external user's account credentials
121162 */
122- public CompletableFuture <BaseClientHttpResponse <InputStream >> post (
163+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> post (
123164 String url64 , PostProxyRequest request , RequestOptions requestOptions ) {
124165 HttpUrl .Builder httpUrl = HttpUrl .parse (this .clientOptions .environment ().getUrl ())
125166 .newBuilder ()
@@ -147,14 +188,14 @@ public CompletableFuture<BaseClientHttpResponse<InputStream>> post(
147188 if (requestOptions != null && requestOptions .getTimeout ().isPresent ()) {
148189 client = clientOptions .httpClientWithTimeout (requestOptions );
149190 }
150- CompletableFuture <BaseClientHttpResponse <InputStream >> future = new CompletableFuture <>();
191+ CompletableFuture <BaseClientHttpResponse <ProxyResponse >> future = new CompletableFuture <>();
151192 client .newCall (okhttpRequest ).enqueue (new Callback () {
152193 @ Override
153194 public void onResponse (@ NotNull Call call , @ NotNull Response response ) throws IOException {
154195 try {
155196 ResponseBody responseBody = response .body ();
156197 if (response .isSuccessful ()) {
157- future . complete ( new BaseClientHttpResponse <>( new ResponseBodyInputStream ( response ), response ) );
198+ handleSuccessResponse ( response , responseBody , future );
158199 return ;
159200 }
160201 String responseBodyString = responseBody != null ? responseBody .string () : "{}" ;
@@ -189,14 +230,14 @@ public void onFailure(@NotNull Call call, @NotNull IOException e) {
189230 /**
190231 * Forward an authenticated PUT request to an external API using an external user's account credentials
191232 */
192- public CompletableFuture <BaseClientHttpResponse <InputStream >> put (String url64 , PutProxyRequest request ) {
233+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> put (String url64 , PutProxyRequest request ) {
193234 return put (url64 , request , null );
194235 }
195236
196237 /**
197238 * Forward an authenticated PUT request to an external API using an external user's account credentials
198239 */
199- public CompletableFuture <BaseClientHttpResponse <InputStream >> put (
240+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> put (
200241 String url64 , PutProxyRequest request , RequestOptions requestOptions ) {
201242 HttpUrl .Builder httpUrl = HttpUrl .parse (this .clientOptions .environment ().getUrl ())
202243 .newBuilder ()
@@ -224,14 +265,14 @@ public CompletableFuture<BaseClientHttpResponse<InputStream>> put(
224265 if (requestOptions != null && requestOptions .getTimeout ().isPresent ()) {
225266 client = clientOptions .httpClientWithTimeout (requestOptions );
226267 }
227- CompletableFuture <BaseClientHttpResponse <InputStream >> future = new CompletableFuture <>();
268+ CompletableFuture <BaseClientHttpResponse <ProxyResponse >> future = new CompletableFuture <>();
228269 client .newCall (okhttpRequest ).enqueue (new Callback () {
229270 @ Override
230271 public void onResponse (@ NotNull Call call , @ NotNull Response response ) throws IOException {
231272 try {
232273 ResponseBody responseBody = response .body ();
233274 if (response .isSuccessful ()) {
234- future . complete ( new BaseClientHttpResponse <>( new ResponseBodyInputStream ( response ), response ) );
275+ handleSuccessResponse ( response , responseBody , future );
235276 return ;
236277 }
237278 String responseBodyString = responseBody != null ? responseBody .string () : "{}" ;
@@ -266,14 +307,14 @@ public void onFailure(@NotNull Call call, @NotNull IOException e) {
266307 /**
267308 * Forward an authenticated DELETE request to an external API using an external user's account credentials
268309 */
269- public CompletableFuture <BaseClientHttpResponse <InputStream >> delete (String url64 , DeleteProxyRequest request ) {
310+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> delete (String url64 , DeleteProxyRequest request ) {
270311 return delete (url64 , request , null );
271312 }
272313
273314 /**
274315 * Forward an authenticated DELETE request to an external API using an external user's account credentials
275316 */
276- public CompletableFuture <BaseClientHttpResponse <InputStream >> delete (
317+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> delete (
277318 String url64 , DeleteProxyRequest request , RequestOptions requestOptions ) {
278319 HttpUrl .Builder httpUrl = HttpUrl .parse (this .clientOptions .environment ().getUrl ())
279320 .newBuilder ()
@@ -293,14 +334,14 @@ public CompletableFuture<BaseClientHttpResponse<InputStream>> delete(
293334 if (requestOptions != null && requestOptions .getTimeout ().isPresent ()) {
294335 client = clientOptions .httpClientWithTimeout (requestOptions );
295336 }
296- CompletableFuture <BaseClientHttpResponse <InputStream >> future = new CompletableFuture <>();
337+ CompletableFuture <BaseClientHttpResponse <ProxyResponse >> future = new CompletableFuture <>();
297338 client .newCall (okhttpRequest ).enqueue (new Callback () {
298339 @ Override
299340 public void onResponse (@ NotNull Call call , @ NotNull Response response ) throws IOException {
300341 try {
301342 ResponseBody responseBody = response .body ();
302343 if (response .isSuccessful ()) {
303- future . complete ( new BaseClientHttpResponse <>( new ResponseBodyInputStream ( response ), response ) );
344+ handleSuccessResponse ( response , responseBody , future );
304345 return ;
305346 }
306347 String responseBodyString = responseBody != null ? responseBody .string () : "{}" ;
@@ -335,14 +376,14 @@ public void onFailure(@NotNull Call call, @NotNull IOException e) {
335376 /**
336377 * Forward an authenticated PATCH request to an external API using an external user's account credentials
337378 */
338- public CompletableFuture <BaseClientHttpResponse <InputStream >> patch (String url64 , PatchProxyRequest request ) {
379+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> patch (String url64 , PatchProxyRequest request ) {
339380 return patch (url64 , request , null );
340381 }
341382
342383 /**
343384 * Forward an authenticated PATCH request to an external API using an external user's account credentials
344385 */
345- public CompletableFuture <BaseClientHttpResponse <InputStream >> patch (
386+ public CompletableFuture <BaseClientHttpResponse <ProxyResponse >> patch (
346387 String url64 , PatchProxyRequest request , RequestOptions requestOptions ) {
347388 HttpUrl .Builder httpUrl = HttpUrl .parse (this .clientOptions .environment ().getUrl ())
348389 .newBuilder ()
@@ -370,14 +411,14 @@ public CompletableFuture<BaseClientHttpResponse<InputStream>> patch(
370411 if (requestOptions != null && requestOptions .getTimeout ().isPresent ()) {
371412 client = clientOptions .httpClientWithTimeout (requestOptions );
372413 }
373- CompletableFuture <BaseClientHttpResponse <InputStream >> future = new CompletableFuture <>();
414+ CompletableFuture <BaseClientHttpResponse <ProxyResponse >> future = new CompletableFuture <>();
374415 client .newCall (okhttpRequest ).enqueue (new Callback () {
375416 @ Override
376417 public void onResponse (@ NotNull Call call , @ NotNull Response response ) throws IOException {
377418 try {
378419 ResponseBody responseBody = response .body ();
379420 if (response .isSuccessful ()) {
380- future . complete ( new BaseClientHttpResponse <>( new ResponseBodyInputStream ( response ), response ) );
421+ handleSuccessResponse ( response , responseBody , future );
381422 return ;
382423 }
383424 String responseBodyString = responseBody != null ? responseBody .string () : "{}" ;
0 commit comments