From abc6bbd4afe2d3c582be81beb06cc8f829004142 Mon Sep 17 00:00:00 2001 From: Alex Tavarez Date: Tue, 2 Sep 2025 17:13:10 -0400 Subject: [PATCH] Added module for useful authentication actions --- lib/sukaato/auth.ex | 190 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 lib/sukaato/auth.ex diff --git a/lib/sukaato/auth.ex b/lib/sukaato/auth.ex new file mode 100644 index 0000000..b106307 --- /dev/null +++ b/lib/sukaato/auth.ex @@ -0,0 +1,190 @@ +defmodule Sukaato.Auth do + import Argon2 + import Ecto.Query + require Integer + alias Sukaato.{Repo, User} + # alias Sukaato.StandardQueries + use SukaatoWeb, :controller + + @rel_proj_root "../.." + @site_config_file Path.expand(@rel_proj_root <> "/site.toml", __DIR__) + @site_config Toml.decode_file(@site_config_file) + @admin_username elem(@site_config, 1)["site"]["username"] + # @fqdn elem(@site_config, 1)["site"]["fqdn"] + # @admin_files_path Path.expand(@rel_proj_root <> "/priv/static/files/users/" <> @admin_username, __DIR__) + # @markdown_posts_path Path.expand(@rel_proj_root <> "/lib/sukaato_web/controllers/page_md/users/" <> @admin_username <> "/posts", __DIR__) + + @tables %{ + user: User + } + + def generate_user_session(conn, user_id \\ @admin_username, authenticated, session_key \\ "login_token") when is_binary(user_id) and is_boolean(authenticated) and is_binary(session_key) do + if authenticated do + user_token = :crypto.strong_rand_bytes(32) |> Base.encode64 |> binary_part(0, 32) + + conn = put_session(conn, session_key, user_id <> ":" <> user_token) + conn + else + conn + end + end + + def generate_fido_challenge(conn, user_id \\ @admin_username) when is_binary(user_id) do + query = from u in @tables.user, select: {u.fido_enabled, u.fido_active, u.fido_priority, u.fido_creds, u.fido_keys}, where: u.username == ^user_id or u.email == ^user_id + result = Repo.one(query) + fido_required = elem(result, 0) and elem(result, 1) + fido_pairs = if fido_required, do: Enum.zip([elem(result, 3), elem(result, 4)]), else: [] + fido_pairs = if length(fido_pairs) > 0 and elem(result, 2) != nil, do: [Enum.at(fido_pairs, elem(result, 2))], else: fido_pairs + challenge = if fido_required, do: Wax.new_authentication_challenge(allow_credentials: fido_pairs), else: nil + # @TODO add preprocessing of challenge data to match appropriate form + # @NOTE see requirements for input params of JS Web Authentication as guide: https://developer.mozilla.org/en-US/docs/Web/API/AuthenticatorAssertionResponse#instance_properties + + if challenge != nil do + conn = put_session(conn, "fido_challenge", challenge) + {conn, challenge, fido_pairs} + else + {conn, challenge, fido_pairs} + end + end + + def get_credentials(conn, fields \\ [], source \\ :body_params) when is_list(fields) and is_atom(source) do + avail_conn_params = if length(fields) > 0, do: Enum.filter(fields, fn f -> Map.has_key?(Map.get(conn, source), f) end), else: [] + # IO.inspect(conn.body_params) + + if length(avail_conn_params) > 0 do + response = Enum.map(avail_conn_params, fn p -> {p, Map.get(conn, source)[p]} end) + response = Map.new(response) + response + else + nil + end + end + + defp authflow_assessment(result, mode) when is_atom(mode) do + cond do + is_tuple(result) -> + result = Tuple.to_list(result) + + evened_result = Enum.chunk_every(result, 2) + results = Enum.map(evened_result, fn r -> apply(Enum, mode, [r]) end) + whether_required = apply(Enum, mode, [results]) + + # evened_result = if Integer.is_even(length(result)), do: result, else: Enum.drop(result, -1) + # ceiling = length(evened_result) - 1 + # + # indices = 0..ceiling//2 + # # indices = Enum.to_list(indices) + # enableds = Enum.map(indices, fn i -> Enum.fetch!(evened_result, i - 1) end) + # IO.inspect(enableds) + # actives = Enum.map(indices, fn i -> Enum.fetch!(evened_result, i) end) + # IO.inspect(actives) + # total_range = if length(enableds) >= length(actives), do: 0..length(enableds), else: length(enableds) < length(actives) + # total_range = if is_boolean(total_range) and total_range, do: 0..length(actives), else: 0..0 + # conjuncts = Enum.zip([total_range, enableds, actives]) + # conjunctions = Enum.map(conjuncts, fn c -> {elem(c, 0), elem(c, 1) and elem(c, 2)} end) + + # last_item = if length(result) > length(evened_result), do: Enum.at(result, -1), else: nil + # last_item = if is_nil(last_item), do: nil, else: (if is_boolean(last_item), do: last_item, else: !is_nil(last_item)) + # last_conjunction = if is_nil(last_item), do: [], else: [{length(conjunctions), last_item}] + # conjunctions = Enum.concat(conjunctions, last_conjunction) + + # requisites = Enum.filter(conjunctions, fn c -> elem(c, 1) end) + # conjunctions = Enum.map(conjunctions, fn c -> elem(c, 1) end) + # # whether_required = Enum.all?(conjunctions) + # whether_required = apply(Enum, mode, [conjunctions]) + + # {:ok, whether_required, requisites} + {:ok, whether_required, result} + !is_tuple(result) -> + whether_required = if is_boolean(result), do: result, else: !is_nil(result) + + {:ok, whether_required, [result]} + end + end + + def is_authflow_required?(user_id \\ @admin_username, mode \\ :all?, result \\ nil) when is_binary(user_id) and is_atom(mode) do + query = if is_nil(result), do: (from u in @tables.user, select: {u.totp_active, u.totp_enabled, u.fido_active, u.fido_enabled}, where: u.username == ^user_id or u.email == ^user_id), else: nil + result = if is_nil(query), do: result, else: Repo.one(query) + result = if is_struct(result) and match?(%Ecto.Query{}, result), do: Repo.one(result), else: result + result = authflow_assessment(result, mode) + required = elem(result, 1) + required + end + + + defp pass_auth(password, stored_password) when is_binary(password) and is_binary(stored_password) do + authenticated = verify_pass(password, stored_password) + authenticated + end + + defp totp_auth(totp_code, totp_secret) when is_binary(totp_code) do + true_secret = Base.decode32!(totp_secret) + authenticated = NimbleTOTP.valid?(true_secret, totp_code) + authenticated + end + + defp fido_auth(challenge, credential_id, auth_data_bin, sig, client_data_json_raw) do + # @TODO add preprocessing of input params to match appropriate nested data type + # @NOTE see requirements for parameters of below as guide: https://hexdocs.pm/wax_/Wax.html#authenticate/6 + result = Wax.authenticate(credential_id, auth_data_bin, sig, client_data_json_raw, challenge) + + if elem(result, 0) == :ok do + authenticated = true + authenticated + else + authenticated = false + authenticated + end + end + + def password_bouncer(conn, user_id \\ @admin_username, password, result \\ nil) when is_binary(user_id) and is_binary(password) do + query = from u in @tables.user, select: u.password, where: u.username == ^user_id or u.email == ^user_id + result = if is_nil(result), do: Repo.one(query), else: result + stored_password = result + authenticated = if is_nil(stored_password), do: false, else: pass_auth(password, stored_password) + + if authenticated do + conn = generate_user_session(conn, user_id, authenticated, "pass_token") + {conn, authenticated, result} + else + {conn, authenticated, result} + end + end + + def totp_bouncer(conn, user_id \\ @admin_username, code, result \\ nil) when is_binary(user_id) and is_binary(user_id) do + query = from u in @tables.user, select: {u.totp_active, u.totp_enabled, u.totp_secret, u.ltotp}, where: u.username == ^user_id or u.email == ^user_id + result = if is_nil(result), do: Repo.one(query), else: result + requirement_query = from u in @tables.user, select: {u.totp_active, u.totp_enabled}, where: u.username == ^user_id or u.email == ^user_id + totp_required = is_authflow_required?(user_id, :all?, (requirement_query)) + secret = if totp_required, do: elem(result, 2), else: nil + # last_totp_chall = if totp_required, do: elem(result, 3), else: nil + authenticated = if totp_required, do: totp_auth(code, secret), else: true + + if authenticated do + conn = generate_user_session(conn, user_id, authenticated, "2fa_token") + {conn, authenticated, result} + else + {conn, authenticated, result} + end + end + + def fido_bouncer(conn, user_id \\ @admin_username, webauthn_response \\ %{}, result \\ nil) when is_binary(user_id) and (is_map(webauthn_response) or is_struct(webauthn_response)) do + query = from u in @tables.user, select: {u.fido_active, u.fido_enabled, u.fido_priority, u.fido_creds, u.fido_keys}, where: u.username == ^user_id or u.email == ^user_id + result = if result == nil, do: Repo.one(query), else: result + fido_required = if result != nil, do: is_authflow_required?(user_id, result), else: false + # conn = fetch_session(conn) + challenge = if fido_required, do: get_session(conn, "fido_challenge"), else: nil + cred_id = if fido_required, do: webauthn_response.credential_id, else: nil + auth_data = if fido_required, do: webauthn_response.authenticator_data, else: nil + sig = if fido_required, do: webauthn_response.signature, else: nil + raw_client_json = if fido_required, do: webauthn_response.raw_client_json, else: nil + authenticated = if fido_required, do: fido_auth(challenge, cred_id, auth_data, sig, raw_client_json), else: true + + if authenticated do + conn = generate_user_session(conn, user_id, authenticated, "2fa_token") + {conn, authenticated, result} + else + {conn, authenticated, result} + end + end +end