Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/plug_sync/lib/plug_sync/router.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
defmodule PlugSync.Router do
use Plug.Router, copy_opts_to_assign: :options
use Phoenix.Sync.Router
use Phoenix.Sync.Controller

plug Plug.Logger, log: :debug
plug :match
plug :dispatch

sync "/items-mapped", table: "items", transform: &PlugSync.Router.map_item/1

get "/items-interruptible" do
sync_render(conn, fn -> [table: "items"] end)
end

match _ do
send_resp(conn, 404, "not found")
end
Expand Down
8 changes: 8 additions & 0 deletions lib/phoenix/sync/adapter/plug_api.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
defprotocol Phoenix.Sync.Adapter.PlugApi do
@moduledoc false

@type response() :: term()

@spec predefined_shape(t(), Phoenix.Sync.PredefinedShape.t()) :: {:ok, t()} | {:error, term()}
def predefined_shape(api, shape)

@spec call(t(), Plug.Conn.t(), Plug.Conn.params()) :: Plug.Conn.t()
def call(api, conn, params)

@spec response(t(), Plug.Conn.t(), Plug.Conn.params()) :: response()
def response(api, conn, params)

@spec send_response(t(), Plug.Conn.t(), response()) :: Plug.Conn.t()
def send_response(api, conn, response)
end
10 changes: 7 additions & 3 deletions lib/phoenix/sync/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,16 @@ defmodule Phoenix.Sync.Controller do
)

def sync_render(conn, shape_fun) when is_function(shape_fun, 0) do
conn = Plug.Conn.fetch_query_params(conn)

conn
|> Phoenix.Sync.Controller.configure_plug_conn!(@plug_assign_opts)
|> Phoenix.Sync.Controller.sync_render(conn.params, shape_fun)
end

def sync_render(conn, shape, shape_opts \\ []) do
conn = Plug.Conn.fetch_query_params(conn)

conn
|> Phoenix.Sync.Controller.configure_plug_conn!(@plug_assign_opts)
|> Phoenix.Sync.Controller.sync_render(conn.params, shape, shape_opts)
Expand Down Expand Up @@ -248,7 +252,7 @@ defmodule Phoenix.Sync.Controller do

{:ok, pid} =
Task.start_link(fn ->
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
send(parent, {:response, self(), Adapter.PlugApi.response(shape_api, conn, params)})
end)

ref = Process.monitor(pid)
Expand All @@ -270,10 +274,10 @@ defmodule Phoenix.Sync.Controller do

interruptible_call(conn, api, params, shape_fun)

{:response, ^pid, conn} ->
{:response, ^pid, response} ->
Process.demonitor(ref, [:flush])

conn
Adapter.PlugApi.send_response(shape_api, conn, response)

{:DOWN, ^ref, :process, _pid, reason} ->
Plug.Conn.send_resp(conn, 500, inspect(reason))
Expand Down
39 changes: 38 additions & 1 deletion lib/phoenix/sync/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,25 @@ defmodule Phoenix.Sync.Electric do
def map_response_body(msgs, _mapper) do
msgs
end

if @electric_available? do
@doc false
# for the embedded api we need to make sure that the response stream is consumed
# in the same process that made the request, in order for cleanups to happen
# so we have to enumerate the body stream immediately before passing the response
# to any other process...
def consume_response_stream(%Electric.Shapes.Api.Response{} = response) do
Map.update!(response, :body, &do_consume_stream(&1))
end

defp do_consume_stream(body) do
Enum.to_list(body)
end
end
end

if Code.ensure_loaded?(Electric.Shapes.Api) do
if Code.ensure_loaded?(Electric.Shapes.Api) &&
Code.ensure_loaded?(Phoenix.Sync.Electric.ApiAdapter) do
defimpl Phoenix.Sync.Adapter.PlugApi, for: Electric.Shapes.Api do
alias Electric.Shapes

Expand Down Expand Up @@ -672,6 +688,27 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
Shapes.Api.options(conn)
end

def response(api, _conn, params) do
case Shapes.Api.validate(api, params) do
{:ok, request} ->
{
request,
Shapes.Api.serve_shape_log(request) |> Phoenix.Sync.Electric.consume_response_stream()
}

{:error, response} ->
{nil, response}
end
end

def send_response(%ApiAdapter{}, conn, {request, response}) do
conn
|> content_type()
|> Plug.Conn.assign(:request, request)
|> Plug.Conn.assign(:response, response)
|> Shapes.Api.Response.send(response)
end

defp content_type(conn) do
Plug.Conn.put_resp_content_type(conn, "application/json")
end
Expand Down
28 changes: 28 additions & 0 deletions lib/phoenix/sync/electric/api_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,34 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
Phoenix.Sync.Adapter.PlugApi.call(api, conn, params)
end

# only works if method is GET...
def response(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
if transform_fun = PredefinedShape.transform_fun(shape) do
case Shapes.Api.validate(api, params) do
{:ok, request} ->
response = Shapes.Api.serve_shape_log(request)
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))
{request, response}

{:error, response} ->
{nil, response}
end
else
Phoenix.Sync.Adapter.PlugApi.response(api, conn, params)
|> then(fn {request, response} ->
{request, Phoenix.Sync.Electric.consume_response_stream(response)}
end)
end
end

def send_response(%ApiAdapter{}, conn, {request, response}) do
conn
|> content_type()
|> Plug.Conn.assign(:request, request)
|> Plug.Conn.assign(:response, response)
|> Shapes.Api.Response.send(response)
end

defp content_type(conn) do
Plug.Conn.put_resp_content_type(conn, "application/json")
end
Expand Down
48 changes: 35 additions & 13 deletions lib/phoenix/sync/electric/client_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,63 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
}}
end

def call(sync_client, conn, params) do
{request, shape} = request(sync_client, conn, params)

fetch_upstream(sync_client, conn, request, shape)
end

def response(sync_client, %{method: "GET"} = conn, params) do
{request, shape} = request(sync_client, conn, params)

make_request(sync_client, conn, request, shape)
end

def send_response(_sync_client, conn, response) do
conn
|> put_resp_headers(response.headers)
|> Plug.Conn.send_resp(response.status, response.body)
end

# this is the server-defined shape route, so we want to only pass on the
# per-request/stream position params leaving the shape-definition params
# from the configured client.
def call(%{shape_definition: %PredefinedShape{} = shape} = sync_client, conn, params) do
request =
defp request(%{shape_definition: %PredefinedShape{} = shape} = sync_client, _conn, params) do
{
Client.request(
sync_client.client,
method: :get,
offset: params["offset"],
shape_handle: params["handle"],
live: live?(params["live"]),
next_cursor: params["cursor"]
)

fetch_upstream(sync_client, conn, request, shape)
),
shape
}
end

# this version is the pure client-defined shape version
def call(sync_client, %{method: method} = conn, params) do
request =
defp request(sync_client, %{method: method} = _conn, params) do
{
Client.request(
sync_client.client,
method: normalise_method(method),
params: params
)

fetch_upstream(sync_client, conn, request, nil)
),
nil
}
end

defp normalise_method(method), do: method |> String.downcase() |> String.to_atom()
defp live?(live), do: live == "true"

defp fetch_upstream(sync_client, conn, request, shape) do
response = make_request(sync_client, conn, request, shape)

send_response(sync_client, conn, response)
end

defp make_request(sync_client, conn, request, shape) do
request = put_req_headers(request, conn.req_headers)

response =
Expand All @@ -69,9 +93,7 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
response.body
end

conn
|> put_resp_headers(response.headers)
|> Plug.Conn.send_resp(response.status, body)
%{response | body: body}
end

defp put_req_headers(request, headers) do
Expand Down
18 changes: 18 additions & 0 deletions lib/phoenix/sync/sandbox/api_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ if Phoenix.Sync.sandbox_enabled?() do
end)
end

def response(%{shape: nil} = _adapter, conn, params) do
shape_api = lookup_api!()
PlugApi.response(shape_api, conn, params)
end

def response(%{shape: shape} = _adapter, conn, params) do
shape_api = lookup_api!()

Phoenix.Sync.Electric.api_predefined_shape(conn, shape_api, shape, fn conn, shape_api ->
PlugApi.response(shape_api, conn, params)
end)
end

def send_response(_adapter, conn, response) do
shape_api = lookup_api!()
PlugApi.send_response(shape_api, conn, response)
end

defp lookup_api!() do
Phoenix.Sync.Sandbox.retrieve_api!()
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Phoenix.Sync.MixProject do

# Remember to update the README when you change the version
@version "0.6.0"
@electric_version "~> 1.1.9 and >= 1.1.9"
@electric_version ">= 1.1.9 and <= 1.1.10"

def project do
[
Expand Down
Loading