Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ electric_phoenix-*.tar

# Temporary files, for example, from tests.
/tmp/
.claude

2 changes: 1 addition & 1 deletion apps/phoenix_sync_example/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule PhoenixSyncExample.MixProject do
{:dns_cluster, "~> 0.1.1"},
{:bandit, "~> 1.5"},
{:phoenix_sync, path: "../.."},
{:electric, "~> 1.1.0"}
{:electric, "~> 1.0"}
]
end

Expand Down
35 changes: 18 additions & 17 deletions apps/phoenix_sync_example/mix.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion apps/plug_sync/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule PlugSync.MixProject do
{:bandit, "~> 1.0"},
{:postgrex, "~> 0.21"},
{:ecto_sql, "~> 3.0"},
{:electric, "~> 1.1.2"},
{:electric, "~> 1.0"},
{:phoenix_sync, [path: "../..", override: true]},
{:igniter, "~> 0.6"}
]
Expand Down
33 changes: 17 additions & 16 deletions apps/plug_sync/mix.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion apps/txid_match/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule TXIDMatch.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:electric, "~> 1.1.1", override: true},
{:electric, "~> 1.0", override: true},
{:electric_client, "~> 0.7", override: true},
{:nimble_options, "~> 1.1"},
{:phoenix_live_view, "~> 1.0", optional: true},
Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/phoenix_sync.install.ex
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ if Code.ensure_loaded?(Igniter) do
defp required_electric_version do
Phoenix.Sync.MixProject.project()
|> Keyword.fetch!(:deps)
|> Enum.find(&match?({:electric, _, _}, &1))
|> Enum.find(&(elem(&1, 0) == :electric))
|> elem(1)
end

Expand Down
196 changes: 189 additions & 7 deletions lib/phoenix/sync/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,11 @@ defmodule Phoenix.Sync.Electric do
defp http_mode_plug_opts(electric_config) do
with {:ok, client} <- configure_client(electric_config, :http) do
# don't decode the body - just pass it directly
client = %{client | fetch: {Electric.Client.Fetch.HTTP, [request: [raw: true]]}}
request_opts =
Keyword.get(electric_config, :request_opts, [])
|> Keyword.merge(raw: true)

client = %{client | fetch: {Electric.Client.Fetch.HTTP, [request: request_opts]}}
{:ok, %Phoenix.Sync.Electric.ClientAdapter{client: client}}
end
end
Expand Down Expand Up @@ -590,6 +594,178 @@ defmodule Phoenix.Sync.Electric do
end
end

@subset_body_keys ~w(where order_by limit offset params where_expr order_by_expr)
# In flattened POST params, root offset may be the stream cursor, so treat offset
# as subset-only only when we can reliably separate query/body params.
@subset_body_keys_without_offset @subset_body_keys -- ["offset"]

@doc false
def normalize_subset_params(params, method \\ "GET")

def normalize_subset_params(%Plug.Conn{} = conn, params) when is_map(params) do
case conn.method do
"POST" ->
query_params = map_or_empty(conn.query_params) |> normalize_prefixed_subset_params()
body_params = map_or_empty(conn.body_params)
path_params = map_or_empty(conn.path_params)

# Match sync-service behavior by nesting subset body keys separately
# from query params, preserving stream offset/handle from query/path.
if map_size(query_params) == 0 and map_size(body_params) == 0 do
normalize_subset_params(params, conn.method)
else
query_params
|> merge_subset_body_params(body_params)
|> Map.merge(path_params)
end

_ ->
normalize_subset_params(params, conn.method)
end
end

def normalize_subset_params(params, method) when is_map(params) do
params
|> normalize_prefixed_subset_params()
|> maybe_normalize_subset_body_params(method)
end

defp normalize_prefixed_subset_params(params) when is_map(params) do
{subset_prefixed_params, rest} =
Enum.reduce(params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} ->
case subset_param_name(key) do
{:ok, subset_key} ->
{Map.put(subset_acc, subset_key, value), rest_acc}

:error ->
{subset_acc, Map.put(rest_acc, key, value)}
end
end)

if map_size(subset_prefixed_params) == 0 do
params
else
existing_subset = existing_subset(rest)

rest
|> Map.drop(["subset", :subset])
|> Map.put("subset", Map.merge(existing_subset, subset_prefixed_params))
end
end

# POST subset snapshots send params in the body as plain subset keys
# (where/params/limit/offset/order_by), which need nesting under "subset".
defp maybe_normalize_subset_body_params(params, method)
when method in ["POST", :post] and is_map(params) do
{subset_body_params, rest} =
Enum.reduce(params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} ->
case subset_body_param_name(key, preserve_stream_offset?: true) do
{:ok, subset_key} ->
{Map.put(subset_acc, subset_key, value), rest_acc}

:error ->
{subset_acc, Map.put(rest_acc, key, value)}
end
end)

if map_size(subset_body_params) == 0 do
params
else
existing_subset = existing_subset(rest)

rest
|> Map.drop(["subset", :subset])
|> Map.put("subset", Map.merge(existing_subset, subset_body_params))
end
end

defp maybe_normalize_subset_body_params(params, _method), do: params

defp subset_param_name(key) when is_binary(key) do
if String.starts_with?(key, "subset__") do
{:ok, String.replace_prefix(key, "subset__", "")}
else
:error
end
end

defp subset_param_name(key) when is_atom(key) do
key
|> Atom.to_string()
|> subset_param_name()
end

defp subset_param_name(_), do: :error

defp subset_body_param_name(key, opts \\ [])

defp subset_body_param_name(key, opts) when is_binary(key) do
keys =
if opts[:preserve_stream_offset?],
do: @subset_body_keys_without_offset,
else: @subset_body_keys

if key in keys, do: {:ok, key}, else: :error
end

defp subset_body_param_name(key, opts) when is_atom(key) do
key
|> Atom.to_string()
|> subset_body_param_name(opts)
end

defp subset_body_param_name(_, _opts), do: :error

defp existing_subset(params) when is_map(params) do
params
|> Map.take(["subset", :subset])
|> Map.values()
|> Enum.filter(&is_map/1)
|> Enum.reduce(%{}, &Map.merge(&2, &1))
end

defp merge_subset_body_params(query_params, body_params) when map_size(body_params) == 0 do
query_params
end

defp merge_subset_body_params(query_params, body_params) when is_map(body_params) do
case existing_subset(body_params) do
subset when map_size(subset) > 0 ->
existing_subset = existing_subset(query_params)

query_params
|> Map.merge(body_params)
|> Map.drop([:subset, "subset"])
|> Map.put("subset", Map.merge(existing_subset, subset))

_ ->
{subset_params, other_params} =
Enum.reduce(body_params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} ->
case subset_body_param_name(key) do
{:ok, subset_key} ->
{Map.put(subset_acc, subset_key, value), rest_acc}

:error ->
{subset_acc, Map.put(rest_acc, key, value)}
end
end)

if map_size(subset_params) > 0 do
existing_subset = existing_subset(query_params)

query_params
|> Map.merge(other_params)
|> Map.put("subset", Map.merge(existing_subset, subset_params))
else
Map.merge(query_params, body_params)
end
end
end

defp map_or_empty(%Plug.Conn.Unfetched{}), do: %{}
defp map_or_empty(map) when is_map(map), do: map
defp map_or_empty(_), do: %{}

@json Phoenix.Sync.json_library()

@doc false
Expand Down Expand Up @@ -640,8 +816,9 @@ defmodule Phoenix.Sync.Electric do
end
end

if Code.ensure_loaded?(Electric.Shapes.Api) &&
Code.ensure_loaded?(Phoenix.Sync.Electric.ApiAdapter) do
if Code.ensure_loaded?(Electric.Shapes.Api) do
Code.ensure_loaded(Phoenix.Sync.Electric.ApiAdapter)

defimpl Phoenix.Sync.Adapter.PlugApi, for: Electric.Shapes.Api do
alias Electric.Shapes

Expand All @@ -653,12 +830,14 @@ if Code.ensure_loaded?(Electric.Shapes.Api) &&
end

def call(api, %{method: "GET"} = conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

case Shapes.Api.validate(api, params) do
{:ok, request} ->
conn
|> content_type()
|> Plug.Conn.assign(:request, request)
|> Shapes.Api.serve_shape_log(request)
|> Shapes.Api.serve_shape_response(request)

{:error, response} ->
conn
Expand Down Expand Up @@ -688,20 +867,23 @@ if Code.ensure_loaded?(Electric.Shapes.Api) &&
Shapes.Api.options(conn)
end

def response(api, _conn, params) do
def response(api, conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

case Shapes.Api.validate(api, params) do
{:ok, request} ->
{
request,
Shapes.Api.serve_shape_log(request) |> Phoenix.Sync.Electric.consume_response_stream()
Shapes.Api.serve_shape_response(request)
|> Phoenix.Sync.Electric.consume_response_stream()
}

{:error, response} ->
{nil, response}
end
end

def send_response(%ApiAdapter{}, conn, {request, response}) do
def send_response(_api, conn, {request, response}) do
conn
|> content_type()
|> Plug.Conn.assign(:request, request)
Expand Down
9 changes: 7 additions & 2 deletions lib/phoenix/sync/electric/api_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
end

def call(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

if transform_fun = PredefinedShape.transform_fun(shape) do
case Shapes.Api.validate(api, params) do
{:ok, request} ->
response = Shapes.Api.serve_shape_log(request)
response = Shapes.Api.serve_shape_response(request)
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))

conn
Expand All @@ -47,15 +49,18 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
end

def call(%ApiAdapter{api: api}, conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)
Phoenix.Sync.Adapter.PlugApi.call(api, conn, params)
end

# only works if method is GET...
def response(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

if transform_fun = PredefinedShape.transform_fun(shape) do
case Shapes.Api.validate(api, params) do
{:ok, request} ->
response = Shapes.Api.serve_shape_log(request)
response = Shapes.Api.serve_shape_response(request)
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))
{request, response}

Expand Down
10 changes: 7 additions & 3 deletions lib/phoenix/sync/electric/client_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
end

# this is the server-defined shape route, so we want to only pass on the
# per-request/stream position params leaving the shape-definition params
# from the configured client.
# per-request/stream position params and subset query params, leaving
# the shape-definition params from the configured client.
defp request(%{shape_definition: %PredefinedShape{} = shape} = sync_client, _conn, params) do
{
Client.request(
Expand All @@ -47,7 +47,8 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
offset: params["offset"],
shape_handle: params["handle"],
live: live?(params["live"]),
next_cursor: params["cursor"]
next_cursor: params["cursor"],
params: subset_request_params(params)
),
shape
}
Expand All @@ -68,6 +69,9 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
defp normalise_method(method), do: method |> String.downcase() |> String.to_atom()
defp live?(live), do: live == "true"

defp subset_request_params(params),
do: Map.filter(params, fn {key, _} -> String.starts_with?(key, "subset__") end)

defp fetch_upstream(sync_client, conn, request, shape) do
response = make_request(sync_client, conn, request, shape)

Expand Down
4 changes: 3 additions & 1 deletion lib/phoenix/sync/plug/cors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ defmodule Phoenix.Sync.Plug.CORS do
@electric_headers [
"electric-cursor",
"electric-handle",
"electric-has-data",
"electric-offset",
"electric-schema",
"electric-up-to-date",
"electric-internal-known-error"
"electric-internal-known-error",
"retry-after"
]

@expose_headers ["transfer-encoding" | @electric_headers]
Expand Down
Loading