GraphQL API with Python & Flask: JWT authentication¶
Table of contents
Introduction¶
In this example, we will discuss how to use Hasura actions and codegen to build a Python & Flask API for JWT authentication.
Step 1: Create action definition & custom types¶
We assume a user
table with fields email
and password
.
We create two actions and custom types:
Signup
: returns aCreateUserOutput
type Mutation {
Signup (email: String! password: String!): CreateUserOutput
}
type CreateUserOutput {
id : Int!
email : String!
password : String!
}
Login
: returns aJsonWebToken
type Mutation {
Login (email: String! password: String!): JsonWebToken
}
type JsonWebToken {
token : String!
}
Example: creating the Signup
action
Step 2: Action handler implementation for signup¶
If we check the Codegen
tab, we can see that a nice scaffold has been generated for us from the GraphQL types we defined.
Now we need to implement the business logic for Signup
. Our action will do the following:
- Recieve the action arguments
email
andpassword
onrequest
, and pass those values toSignupArgs.from_request()
- Convert the plaintext password input into a hashed secure password with Argon2
- Send a mutation to Hasura to save the newly created user with the hashed password
- Return the created user object to signal success, or else error
The first thing we have to implement is the Argon2 password hashing. We will use argon2-cffi for this. The second thing is a library for making requests to Hasura for mutations/queries, our choice will be requests.
Our requirements.txt
will now look like:
flask
argon2-cffi
requests
pyjwt
On to the implementation.
Signup handler & password hashing¶
For password hashing, the argon2
API is minimal and straightforward: an instance of a password hasher is created with PasswordHasher()
, which has methods .hash(password)
, .verify(hashed_password, password)
, and .check_needs_rehash(hashed_password)
.
In our signup handler, the first thing we’ll do is convert the action input password to a secure hash:
from argon2 import PasswordHasher
Password = PasswordHasher()
@app.route("/signup", methods=["POST"])
def signup_handler():
args = AuthArgs.from_request(request.get_json())
hashed_password = Password.hash(args.password)
GraphQL request client¶
Next, since we have the user’s email and hashed password, we need to send a request to Hasura to save them in the database. For that, we’ll need a request client implementation:
from requests import request
@dataclass
class Client:
url: str
headers: dict
def run_query(self, query: str, variables: dict, extract=False):
request = requests.post(
self.url,
headers=self.headers,
json={"query": query, "variables": variables},
)
assert request.ok, f"Failed with code {request.status_code}"
return request.json()
create_user = lambda self, email, password: self.run_query(
"""
mutation CreateUser($email: String!, $password: String!) {
insert_user_one(object: {email: $email, password: $password}) {
id
email
password
}
}
""",
{"email": email, "password": password},
)
Here we create a utility class for handling our Hasura operations. It takes an URL and headers object as initialization options, and exposes a method .run_query()
for performing GraphQL requests. We create the query function for saving our user in the Signup
action as a class method as well.
We can instantiate the Client
like this:
HASURA_URL = "http://graphql-engine:8080/v1/graphql"
HASURA_HEADERS = {"X-Hasura-Admin-Secret": "your-secret"}
client = Client(url=HASURA_URL, headers=HASURA_HEADERS)
Now, in our Signup
action handler, we need to call client.create_user()
with the input email and the hashed password value to save them, then return the result:
@app.route("/signup", methods=["POST"])
def signup_handler():
args = AuthArgs.from_request(request.get_json())
hashed_password = Password.hash(args.password)
user_response = client.create_user(args.email, hashed_password)
if user_response.get("errors"):
return {"message": user_response["errors"][0]["message"]}, 400
else:
user = user_response["data"]["insert_user_one"]
return CreateUserOutput(**user).to_json()
To test this out, send an HTTP request to your Flask API at /signup
with an email and password:
POST http://localhost:5000/signup HTTP/1.1
content-type: application/json
{
"input": {
"email": "user@test.com",
"password": "password123"
}
}
You should get a successful response like this:
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 129
Server: Werkzeug/1.0.1 Python/3.8.2
Date: Sun, 10 May 2020 19:58:23 GMT
{
"id": 1,
"email": "user@test.com",
"password": "$argon2id$v=19$m=102400,t=2,p=8$fSmC349hY74QoGRTD0w$OYQYd/PP9kYsy9gRnDF1oQ"
}
Now our Signup
action is functional! The last piece is create the Login
handler, which will do a password comparison, and then return a signed JWT if successful.
Step 3: Action handler implementation for login¶
The first thing we need is a new request method on our Client
class to find a user by email, so that we can look them up to compare the password. Under create_user
, create the following new method:
find_user_by_email = lambda self, email: self.run_query(
"""
query UserByEmail($email: String!) {
user(where: {email: {_eq: $email}}, limit: 1) {
id
email
password
}
}
""",
{"email": email},
)
Then in our login handler, we call Password.verify()
to compare the input password against the hashed password saved in the database. If the password matches, we create a JWT from the user credentials, and return it.
We also need to check to see if the password needs to be updated and re-hashed by Argon2, in the event that hashing parameters have changed and it’s no longer valid. If so, we should re-hash and then save the updated password in the database through an update mutation to Hasura, client.update_password()
.
@app.route("/login", methods=["POST"])
def login_handler():
args = LoginArgs.from_request(request.get_json())
user_response = client.find_user_by_email(args.email)
user = user_response["data"]["user"][0]
try:
Password.verify(user.get("password"), args.password)
rehash_and_save_password_if_needed(user, args.password)
return JsonWebToken(generate_token(user)).to_json()
except VerifyMismatchError:
return { "message": "Invalid credentials" }, 401
Here is what the implementation of generate_token()
and rehash_and_save_password_if_needed()
could look like:
import os
import jwt
# Try to get the secret from ENV, else fallback to provided string
HASURA_JWT_SECRET = os.getenv("HASURA_GRAPHQL_JWT_SECRET", "a-very-secret-secret")
# ROLE LOGIC FOR DEMO PURPOSES ONLY
# NOT AT ALL SUITABLE FOR A REAL APP
def generate_token(user) -> str:
"""
Generates a JWT compliant with the Hasura spec, given a User object with field "id"
"""
user_roles = ["user"]
admin_roles = ["user", "admin"]
is_admin = user["email"] == "admin@site.com"
payload = {
"https://hasura.io/jwt/claims": {
"x-hasura-allowed-roles": admin_roles if is_admin else user_roles,
"x-hasura-default-role": "admin" if is_admin else "user",
"x-hasura-user-id": user["id"],
}
}
token = jwt.encode(payload, HASURA_JWT_SECRET, "HS256")
return token.decode("utf-8")
def rehash_and_save_password_if_needed(user, plaintext_password):
"""
Whenever your Argon2 parameters – or argon2-cffi’s defaults! –
change, you should rehash your passwords at the next opportunity.
The common approach is to do that whenever a user logs in, since
that should be the only time when you have access to the cleartext password.
Therefore it’s best practice to check – and if necessary rehash –
passwords after each successful authentication.
"""
if Password.check_needs_rehash(user["password"]):
client.update_password(user["id"], Password.hash(plaintext_password))
And finally, client.update_password()
:
update_password = lambda self, id, password: self.run_query(
"""
mutation UpdatePassword($id: Int!, $password: String!) {
update_user_by_pk(pk_columns: {id: $id}, _set: {password: $password}) {
password
}
}
""",
{"id": id, "password": password},
)
Step 4: Testing out the handler routes¶
Call the /signup
endpoint with email
and password
:
POST http://localhost:5000/signup HTTP/1.1
content-type: application/json
{
"input": {
"email": "user@test.com",
"password": "password123"
}
}
Action handler response:
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 256
Server: Werkzeug/1.0.1 Python/3.8.2
Date: Sun, 10 May 2020 19:59:36 GMT
{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.z9ey1lw9p89gUkAmWEa7Qbpa1R71TgfkjZnEunGJ1ig"
}
Decode the JWT token to access the Hasura claims:
$ decode_jwt 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.z9ey1lw9p89gUkAmWEa7Qbpa1R71TgfkjZnEunGJ1ig'
{
"https://hasura.io/jwt/claims": {
"x-hasura-allowed-roles": ["user"],
"x-hasura-default-role": "user",
"x-hasura-user-id": 1
}
}
Step 5: Calling the finished actions¶
Let’s try out our defined actions from the GraphQL API.
Call the Signup
action:
mutation Signup {
signup(email: "newuser@test.com", password: "a-password") {
id
email
password
}
}
Call the Signup
action with a duplicate:
mutation SignupDuplicate {
signup(email: "newuser@test.com", password: "a-password") {
id
email
password
}
}
Call the Login
action with valid credentials:
query Login {
Login(email: "newuser@test.com", password: "a-password") {
token
}
}
Call the Login
action with invalid credentials:
query IncorrectLogin {
Login(email: "newuser@test.com", password: "bad-password") {
token
}
}
Complete app code¶
import os
import jwt
import json
import logging
import requests
from flask import Flask, request, jsonify
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from typing import Optional
from dataclasses import dataclass, asdict
HASURA_URL = "http://graphql-engine:8080/v1/graphql"
HASURA_HEADERS = {"X-Hasura-Admin-Secret": "your-secret"}
HASURA_JWT_SECRET = os.getenv("HASURA_GRAPHQL_JWT_SECRET", "a-very-secret-secret")
################
# GRAPHQL CLIENT
################
@dataclass
class Client:
url: str
headers: dict
def run_query(self, query: str, variables: dict, extract=False):
request = requests.post(
self.url,
headers=self.headers,
json={"query": query, "variables": variables},
)
assert request.ok, f"Failed with code {request.status_code}"
return request.json()
find_user_by_email = lambda self, email: self.run_query(
"""
query UserByEmail($email: String!) {
user(where: {email: {_eq: $email}}, limit: 1) {
id
email
password
}
}
""",
{"email": email},
)
create_user = lambda self, email, password: self.run_query(
"""
mutation CreateUser($email: String!, $password: String!) {
insert_user_one(object: {email: $email, password: $password}) {
id
email
password
}
}
""",
{"email": email, "password": password},
)
update_password = lambda self, id, password: self.run_query(
"""
mutation UpdatePassword($id: Int!, $password: String!) {
update_user_by_pk(pk_columns: {id: $id}, _set: {password: $password}) {
password
}
}
""",
{"id": id, "password": password},
)
#######
# UTILS
#######
Password = PasswordHasher()
client = Client(url=HASURA_URL, headers=HASURA_HEADERS)
# ROLE LOGIC FOR DEMO PURPOSES ONLY
# NOT AT ALL SUITABLE FOR A REAL APP
def generate_token(user) -> str:
"""
Generates a JWT compliant with the Hasura spec, given a User object with field "id"
"""
user_roles = ["user"]
admin_roles = ["user", "admin"]
is_admin = user["email"] == "admin@site.com"
payload = {
"https://hasura.io/jwt/claims": {
"x-hasura-allowed-roles": admin_roles if is_admin else user_roles,
"x-hasura-default-role": "admin" if is_admin else "user",
"x-hasura-user-id": user["id"],
}
}
token = jwt.encode(payload, HASURA_JWT_SECRET, "HS256")
return token.decode("utf-8")
def rehash_and_save_password_if_needed(user, plaintext_password):
if Password.check_needs_rehash(user["password"]):
client.update_password(user["id"], Password.hash(plaintext_password))
#############
# DATA MODELS
#############
@dataclass
class RequestMixin:
@classmethod
def from_request(cls, request):
"""
Helper method to convert an HTTP request to Dataclass Instance
"""
values = request.get("input")
return cls(**values)
def to_json(self):
return json.dumps(asdict(self))
@dataclass
class CreateUserOutput(RequestMixin):
id: int
email: str
password: str
@dataclass
class JsonWebToken(RequestMixin):
token: str
@dataclass
class AuthArgs(RequestMixin):
email: str
password: str
##############
# MAIN SERVICE
##############
app = Flask(__name__)
@app.route("/signup", methods=["POST"])
def signup_handler():
args = AuthArgs.from_request(request.get_json())
hashed_password = Password.hash(args.password)
user_response = client.create_user(args.email, hashed_password)
if user_response.get("errors"):
return {"message": user_response["errors"][0]["message"]}, 400
else:
user = user_response["data"]["insert_user_one"]
return CreateUserOutput(**user).to_json()
@app.route("/login", methods=["POST"])
def login_handler():
args = AuthArgs.from_request(request.get_json())
user_response = client.find_user_by_email(args.email)
user = user_response["data"]["user"][0]
try:
Password.verify(user.get("password"), args.password)
rehash_and_save_password_if_needed(user, args.password)
return JsonWebToken(generate_token(user)).to_json()
except VerifyMismatchError:
return {"message": "Invalid credentials"}, 401
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0")
Additional Resources
Introduction to Hasura Actions - View Recording.