Added module for useful authentication actions

This commit is contained in:
Alex Tavarez
2025-09-02 17:13:10 -04:00
parent 75451de7c1
commit abc6bbd4af

190
lib/sukaato/auth.ex Normal file
View File

@@ -0,0 +1,190 @@
defmodule Sukaato.Auth do
import Argon2
import Ecto.Query
require Integer
alias Sukaato.{Repo, User}
# alias Sukaato.StandardQueries
use SukaatoWeb, :controller
@rel_proj_root "../.."
@site_config_file Path.expand(@rel_proj_root <> "/site.toml", __DIR__)
@site_config Toml.decode_file(@site_config_file)
@admin_username elem(@site_config, 1)["site"]["username"]
# @fqdn elem(@site_config, 1)["site"]["fqdn"]
# @admin_files_path Path.expand(@rel_proj_root <> "/priv/static/files/users/" <> @admin_username, __DIR__)
# @markdown_posts_path Path.expand(@rel_proj_root <> "/lib/sukaato_web/controllers/page_md/users/" <> @admin_username <> "/posts", __DIR__)
@tables %{
user: User
}
def generate_user_session(conn, user_id \\ @admin_username, authenticated, session_key \\ "login_token") when is_binary(user_id) and is_boolean(authenticated) and is_binary(session_key) do
if authenticated do
user_token = :crypto.strong_rand_bytes(32) |> Base.encode64 |> binary_part(0, 32)
conn = put_session(conn, session_key, user_id <> ":" <> user_token)
conn
else
conn
end
end
def generate_fido_challenge(conn, user_id \\ @admin_username) when is_binary(user_id) do
query = from u in @tables.user, select: {u.fido_enabled, u.fido_active, u.fido_priority, u.fido_creds, u.fido_keys}, where: u.username == ^user_id or u.email == ^user_id
result = Repo.one(query)
fido_required = elem(result, 0) and elem(result, 1)
fido_pairs = if fido_required, do: Enum.zip([elem(result, 3), elem(result, 4)]), else: []
fido_pairs = if length(fido_pairs) > 0 and elem(result, 2) != nil, do: [Enum.at(fido_pairs, elem(result, 2))], else: fido_pairs
challenge = if fido_required, do: Wax.new_authentication_challenge(allow_credentials: fido_pairs), else: nil
# @TODO add preprocessing of challenge data to match appropriate form
# @NOTE see requirements for input params of JS Web Authentication as guide: https://developer.mozilla.org/en-US/docs/Web/API/AuthenticatorAssertionResponse#instance_properties
if challenge != nil do
conn = put_session(conn, "fido_challenge", challenge)
{conn, challenge, fido_pairs}
else
{conn, challenge, fido_pairs}
end
end
def get_credentials(conn, fields \\ [], source \\ :body_params) when is_list(fields) and is_atom(source) do
avail_conn_params = if length(fields) > 0, do: Enum.filter(fields, fn f -> Map.has_key?(Map.get(conn, source), f) end), else: []
# IO.inspect(conn.body_params)
if length(avail_conn_params) > 0 do
response = Enum.map(avail_conn_params, fn p -> {p, Map.get(conn, source)[p]} end)
response = Map.new(response)
response
else
nil
end
end
defp authflow_assessment(result, mode) when is_atom(mode) do
cond do
is_tuple(result) ->
result = Tuple.to_list(result)
evened_result = Enum.chunk_every(result, 2)
results = Enum.map(evened_result, fn r -> apply(Enum, mode, [r]) end)
whether_required = apply(Enum, mode, [results])
# evened_result = if Integer.is_even(length(result)), do: result, else: Enum.drop(result, -1)
# ceiling = length(evened_result) - 1
#
# indices = 0..ceiling//2
# # indices = Enum.to_list(indices)
# enableds = Enum.map(indices, fn i -> Enum.fetch!(evened_result, i - 1) end)
# IO.inspect(enableds)
# actives = Enum.map(indices, fn i -> Enum.fetch!(evened_result, i) end)
# IO.inspect(actives)
# total_range = if length(enableds) >= length(actives), do: 0..length(enableds), else: length(enableds) < length(actives)
# total_range = if is_boolean(total_range) and total_range, do: 0..length(actives), else: 0..0
# conjuncts = Enum.zip([total_range, enableds, actives])
# conjunctions = Enum.map(conjuncts, fn c -> {elem(c, 0), elem(c, 1) and elem(c, 2)} end)
# last_item = if length(result) > length(evened_result), do: Enum.at(result, -1), else: nil
# last_item = if is_nil(last_item), do: nil, else: (if is_boolean(last_item), do: last_item, else: !is_nil(last_item))
# last_conjunction = if is_nil(last_item), do: [], else: [{length(conjunctions), last_item}]
# conjunctions = Enum.concat(conjunctions, last_conjunction)
# requisites = Enum.filter(conjunctions, fn c -> elem(c, 1) end)
# conjunctions = Enum.map(conjunctions, fn c -> elem(c, 1) end)
# # whether_required = Enum.all?(conjunctions)
# whether_required = apply(Enum, mode, [conjunctions])
# {:ok, whether_required, requisites}
{:ok, whether_required, result}
!is_tuple(result) ->
whether_required = if is_boolean(result), do: result, else: !is_nil(result)
{:ok, whether_required, [result]}
end
end
def is_authflow_required?(user_id \\ @admin_username, mode \\ :all?, result \\ nil) when is_binary(user_id) and is_atom(mode) do
query = if is_nil(result), do: (from u in @tables.user, select: {u.totp_active, u.totp_enabled, u.fido_active, u.fido_enabled}, where: u.username == ^user_id or u.email == ^user_id), else: nil
result = if is_nil(query), do: result, else: Repo.one(query)
result = if is_struct(result) and match?(%Ecto.Query{}, result), do: Repo.one(result), else: result
result = authflow_assessment(result, mode)
required = elem(result, 1)
required
end
defp pass_auth(password, stored_password) when is_binary(password) and is_binary(stored_password) do
authenticated = verify_pass(password, stored_password)
authenticated
end
defp totp_auth(totp_code, totp_secret) when is_binary(totp_code) do
true_secret = Base.decode32!(totp_secret)
authenticated = NimbleTOTP.valid?(true_secret, totp_code)
authenticated
end
defp fido_auth(challenge, credential_id, auth_data_bin, sig, client_data_json_raw) do
# @TODO add preprocessing of input params to match appropriate nested data type
# @NOTE see requirements for parameters of below as guide: https://hexdocs.pm/wax_/Wax.html#authenticate/6
result = Wax.authenticate(credential_id, auth_data_bin, sig, client_data_json_raw, challenge)
if elem(result, 0) == :ok do
authenticated = true
authenticated
else
authenticated = false
authenticated
end
end
def password_bouncer(conn, user_id \\ @admin_username, password, result \\ nil) when is_binary(user_id) and is_binary(password) do
query = from u in @tables.user, select: u.password, where: u.username == ^user_id or u.email == ^user_id
result = if is_nil(result), do: Repo.one(query), else: result
stored_password = result
authenticated = if is_nil(stored_password), do: false, else: pass_auth(password, stored_password)
if authenticated do
conn = generate_user_session(conn, user_id, authenticated, "pass_token")
{conn, authenticated, result}
else
{conn, authenticated, result}
end
end
def totp_bouncer(conn, user_id \\ @admin_username, code, result \\ nil) when is_binary(user_id) and is_binary(user_id) do
query = from u in @tables.user, select: {u.totp_active, u.totp_enabled, u.totp_secret, u.ltotp}, where: u.username == ^user_id or u.email == ^user_id
result = if is_nil(result), do: Repo.one(query), else: result
requirement_query = from u in @tables.user, select: {u.totp_active, u.totp_enabled}, where: u.username == ^user_id or u.email == ^user_id
totp_required = is_authflow_required?(user_id, :all?, (requirement_query))
secret = if totp_required, do: elem(result, 2), else: nil
# last_totp_chall = if totp_required, do: elem(result, 3), else: nil
authenticated = if totp_required, do: totp_auth(code, secret), else: true
if authenticated do
conn = generate_user_session(conn, user_id, authenticated, "2fa_token")
{conn, authenticated, result}
else
{conn, authenticated, result}
end
end
def fido_bouncer(conn, user_id \\ @admin_username, webauthn_response \\ %{}, result \\ nil) when is_binary(user_id) and (is_map(webauthn_response) or is_struct(webauthn_response)) do
query = from u in @tables.user, select: {u.fido_active, u.fido_enabled, u.fido_priority, u.fido_creds, u.fido_keys}, where: u.username == ^user_id or u.email == ^user_id
result = if result == nil, do: Repo.one(query), else: result
fido_required = if result != nil, do: is_authflow_required?(user_id, result), else: false
# conn = fetch_session(conn)
challenge = if fido_required, do: get_session(conn, "fido_challenge"), else: nil
cred_id = if fido_required, do: webauthn_response.credential_id, else: nil
auth_data = if fido_required, do: webauthn_response.authenticator_data, else: nil
sig = if fido_required, do: webauthn_response.signature, else: nil
raw_client_json = if fido_required, do: webauthn_response.raw_client_json, else: nil
authenticated = if fido_required, do: fido_auth(challenge, cred_id, auth_data, sig, raw_client_json), else: true
if authenticated do
conn = generate_user_session(conn, user_id, authenticated, "2fa_token")
{conn, authenticated, result}
else
{conn, authenticated, result}
end
end
end