22
33import com .ditto .java .*;
44import com .ditto .java .serialization .DittoCborSerializable ;
5+
56import jakarta .annotation .Nonnull ;
67import org .springframework .stereotype .Component ;
78import reactor .core .publisher .Flux ;
1314
1415@ Component
1516public class DittoTaskService {
16-
1717 private static final String TASKS_COLLECTION_NAME = "tasks" ;
1818
1919 private final DittoService dittoService ;
@@ -24,185 +24,118 @@ public DittoTaskService(DittoService dittoService) {
2424
2525 public void addTask (@ Nonnull String title ) {
2626 try {
27- DittoQueryResult result = dittoService
28- .getDitto ()
29- .getStore ()
30- .execute (
31- "INSERT INTO %s DOCUMENTS (:newTask)" .formatted (
32- TASKS_COLLECTION_NAME
33- ),
27+ dittoService .getDitto ().getStore ().execute (
28+ "INSERT INTO %s DOCUMENTS (:newTask)" .formatted (TASKS_COLLECTION_NAME ),
3429 DittoCborSerializable .Dictionary .buildDictionary ()
35- .put (
36- "newTask" ,
37- DittoCborSerializable .Dictionary .buildDictionary ()
38- .put ("_id" , UUID .randomUUID ().toString ())
39- .put ("title" , title )
40- .put ("done" , false )
41- .put ("deleted" , false )
42- .build ()
43- )
44- .build ()
45- )
46- .toCompletableFuture ()
47- .join ();
48- result .close ();
49- } catch (Error | DittoException e ) {
30+ .put (
31+ "newTask" ,
32+ DittoCborSerializable .Dictionary .buildDictionary ()
33+ .put ("_id" , UUID .randomUUID ().toString ())
34+ .put ("title" , title )
35+ .put ("done" , false )
36+ .put ("deleted" , false )
37+ .build ()
38+ )
39+ .build ()
40+ ).toCompletableFuture ().join ();
41+ } catch (Error e ) {
5042 throw new RuntimeException (e );
5143 }
5244 }
5345
5446 public void toggleTaskDone (@ Nonnull String taskId ) {
5547 try {
56- boolean isDone ;
57- try (DittoQueryResult tasks = dittoService
58- .getDitto ()
59- .getStore ()
60- .execute (
61- "SELECT * FROM %s WHERE _id = :taskId" .formatted (
62- TASKS_COLLECTION_NAME
63- ),
48+ DittoQueryResult tasks = dittoService .getDitto ().getStore ().execute (
49+ "SELECT * FROM %s WHERE _id = :taskId" .formatted (TASKS_COLLECTION_NAME ),
6450 DittoCborSerializable .Dictionary .buildDictionary ()
65- .put ("taskId" , taskId )
66- .build ()
67- )
68- .toCompletableFuture ()
69- .join ()
70- ) {
71- isDone = tasks
72- .getItems ()
73- .get (0 )
74- .getValue ()
75- .get ("done" )
76- .asBoolean ();
77- }
51+ .put ("taskId" , taskId )
52+ .build ()
53+ ).toCompletableFuture ().join ();
54+
55+ boolean isDone = tasks .getItems ().get (0 ).getValue ().get ("done" ).asBoolean ();
7856
79- DittoQueryResult result = dittoService
80- .getDitto ()
81- .getStore ()
82- .execute (
83- "UPDATE %s SET done = :done WHERE _id = :taskId" .formatted (
84- TASKS_COLLECTION_NAME
85- ),
57+ dittoService .getDitto ().getStore ().execute (
58+ "UPDATE %s SET done = :done WHERE _id = :taskId" .formatted (TASKS_COLLECTION_NAME ),
8659 DittoCborSerializable .Dictionary .buildDictionary ()
87- .put ("done" , !isDone )
88- .put ("taskId" , taskId )
89- .build ()
90- )
91- .toCompletableFuture ()
92- .join ();
93- result .close ();
60+ .put ("done" , !isDone )
61+ .put ("taskId" , taskId )
62+ .build ()
63+ ).toCompletableFuture ().join ();
9464 } catch (Error | DittoException e ) {
9565 throw new RuntimeException (e );
9666 }
9767 }
9868
9969 public void deleteTask (@ Nonnull String taskId ) {
10070 try {
101- DittoQueryResult result = dittoService
102- .getDitto ()
103- .getStore ()
104- .execute (
105- "UPDATE %s SET deleted = :deleted WHERE _id = :taskId" .formatted (
106- TASKS_COLLECTION_NAME
107- ),
71+ dittoService .getDitto ().getStore ().execute (
72+ "UPDATE %s SET deleted = :deleted WHERE _id = :taskId" .formatted (TASKS_COLLECTION_NAME ),
10873 DittoCborSerializable .Dictionary .buildDictionary ()
109- .put ("deleted" , true )
110- .put ("taskId" , taskId )
111- .build ()
112- )
113- .toCompletableFuture ()
114- .join ();
115- result .close ();
116- } catch (Error | DittoException e ) {
74+ .put ("deleted" , true )
75+ .put ("taskId" , taskId )
76+ .build ()
77+ ).toCompletableFuture ().join ();
78+ } catch (Error e ) {
11779 throw new RuntimeException (e );
11880 }
11981 }
12082
12183 public void updateTask (@ Nonnull String taskId , @ Nonnull String newTitle ) {
12284 try {
123- DittoQueryResult result = dittoService
124- .getDitto ()
125- .getStore ()
126- .execute (
127- "UPDATE %s SET title = :title WHERE _id = :taskId" .formatted (
128- TASKS_COLLECTION_NAME
129- ),
85+ dittoService .getDitto ().getStore ().execute (
86+ "UPDATE %s SET title = :title WHERE _id = :taskId" .formatted (TASKS_COLLECTION_NAME ),
13087 DittoCborSerializable .Dictionary .buildDictionary ()
131- .put ("title" , newTitle )
132- .put ("taskId" , taskId )
133- .build ()
134- )
135- .toCompletableFuture ()
136- .join ();
137- result .close ();
138- } catch (Error | DittoException e ) {
88+ .put ("title" , newTitle )
89+ .put ("taskId" , taskId )
90+ .build ()
91+ ).toCompletableFuture ().join ();
92+ } catch (Error e ) {
13993 throw new RuntimeException (e );
14094 }
14195 }
14296
14397 @ Nonnull
14498 public Flux <List <Task >> observeAll () {
145- final String subscriptionQuery =
146- "SELECT * FROM %s WHERE NOT deleted" .formatted (TASKS_COLLECTION_NAME );
99+ final String subscriptionQuery = "SELECT * FROM %s WHERE NOT deleted" .formatted (TASKS_COLLECTION_NAME );
147100 final String displayQuery = subscriptionQuery + " ORDER BY title ASC" ;
148101
149- return Flux .create (
150- emitter -> {
151- Ditto ditto = dittoService .getDitto ();
152- try {
153- @ SuppressWarnings ("resource" )
154- DittoSyncSubscription subscription = ditto
155- .getSync ()
156- .registerSubscription (subscriptionQuery );
157-
158- @ SuppressWarnings ("resource" )
159- DittoStoreObserver observer = ditto
160- .getStore ()
161- .registerObserver (displayQuery , results ->
162- emitter .next (
163- results
164- .getItems ()
165- .stream ()
166- .map (item -> {
167- try {
168- return this .itemToTask (item );
169- } catch (Exception e ) {
170- emitter .error (e );
171- throw new RuntimeException (e );
172- }
173- })
174- .toList ()
175- )
176- );
177-
178- emitter .onDispose (() -> {
179- // TODO: Can't just catch, this potentially leaks the `observer` resource.
180- try {
181- subscription .close ();
182- } catch (IOException e ) {
183- throw new RuntimeException (e );
184- }
185- try {
186- observer .close ();
187- } catch (DittoException e ) {
188- throw new RuntimeException (e );
189- }
190- });
191- } catch (DittoException e ) {
192- emitter .error (e );
193- }
194- },
195- FluxSink .OverflowStrategy .LATEST
196- );
102+ return Flux .create (emitter -> {
103+ Ditto ditto = dittoService .getDitto ();
104+ try {
105+ DittoSyncSubscription subscription = ditto .getSync ().registerSubscription (subscriptionQuery );
106+ DittoStoreObserver observer = ditto .getStore ().registerObserver (displayQuery , results ->
107+ emitter .next (results .getItems ().stream ().map (this ::itemToTask ).toList ()));
108+
109+ emitter .onDispose (() -> {
110+ // TODO: Can't just catch, this potentially leaks the `observer` resource.
111+ try {
112+ subscription .close ();
113+ } catch (IOException e ) {
114+ throw new RuntimeException (e );
115+ }
116+ try {
117+ observer .close ();
118+ } catch (DittoException e ) {
119+ throw new RuntimeException (e );
120+ }
121+ });
122+ } catch (DittoException e ) {
123+ emitter .error (e );
124+ }
125+ }, FluxSink .OverflowStrategy .LATEST );
197126 }
198127
199- private Task itemToTask (@ Nonnull DittoQueryResultItem item ) throws DittoException {
128+ private Task itemToTask (@ Nonnull DittoQueryResultItem item ) {
200129 DittoCborSerializable .Dictionary value = item .getValue ();
201- return new Task (
202- value .get ("_id" ).asString (),
203- value .get ("title" ).asString (),
204- value .get ("done" ).asBoolean (),
205- value .get ("deleted" ).asBoolean ()
206- );
130+ try {
131+ return new Task (
132+ value .get ("_id" ).asString (),
133+ value .get ("title" ).asString (),
134+ value .get ("done" ).asBoolean (),
135+ value .get ("deleted" ).asBoolean ()
136+ );
137+ } catch (DittoException e ) {
138+ throw new RuntimeException (e );
139+ }
207140 }
208141}
0 commit comments