GraphQL API with Python & Flask: JWT authentication

Introduction

In this example, we will discuss how to use Hasura actions and codegen to build a Python & Flask API for JWT authentication.

Step 1: Create action definition & custom types

We assume a user table with fields id, email and password, with a unique constraint on email.

We create two actions and custom types:

  1. Signup: returns a CreateUserOutput
type Mutation {
  Signup (email: String! password: String!): CreateUserOutput
}

type CreateUserOutput {
  id : Int!
  email : String!
  password : String!
}
  2. Login: returns a JsonWebToken
type Mutation {
  Login (email: String! password: String!): JsonWebToken
}

type JsonWebToken {
  token : String!
}

[Figure: creating the Signup action (Python Flask signup types)]

Step 2: Action handler implementation for signup

If we check the Codegen tab, we can see that a nice scaffold has been generated for us from the GraphQL types we defined.

Now we need to implement the business logic for Signup. Our action will do the following:

  • Receive the action arguments email and password on the request, and pass those values to AuthArgs.from_request()
  • Convert the plaintext password input into a hashed secure password with Argon2
  • Send a mutation to Hasura to save the newly created user with the hashed password
  • Return the created user object to signal success, or else error
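
The first step depends on the shape of the request body: the action arguments arrive under the "input" key, as in the test requests shown later in this guide. Below is a minimal sketch of that parsing step, assuming a dataclass named AuthArgs like the one in the complete app code at the end. The sample payload is illustrative only.

```python
from dataclasses import dataclass


@dataclass
class AuthArgs:
    email: str
    password: str

    @classmethod
    def from_request(cls, body: dict) -> "AuthArgs":
        # Action arguments arrive under the "input" key of the JSON body.
        return cls(**body["input"])


# Illustrative payload, matching the test request used in this guide.
sample_body = {"input": {"email": "user@test.com", "password": "password123"}}
args = AuthArgs.from_request(sample_body)
print(args.email)
```

Keeping the parsing in a classmethod means the handler body only deals with typed attributes (args.email, args.password) rather than raw dictionaries.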

The first thing we have to implement is the Argon2 password hashing. We will use argon2-cffi for this. The second thing we need is a library for sending queries and mutations to Hasura. Our choice will be requests.

Our requirements.txt will now look like:

flask
argon2-cffi
requests
pyjwt

On to the implementation.

Signup handler & password hashing

For password hashing, the argon2 API is minimal and straightforward: an instance of a password hasher is created with PasswordHasher(), which has methods .hash(password), .verify(hashed_password, password), and .check_needs_rehash(hashed_password).

In our signup handler, the first thing we’ll do is convert the action input password to a secure hash:

from argon2 import PasswordHasher
Password = PasswordHasher()

@app.route("/signup", methods=["POST"])
def signup_handler():
    args = AuthArgs.from_request(request.get_json())
    hashed_password = Password.hash(args.password)

GraphQL request client

Next, since we have the user’s email and hashed password, we need to send a request to Hasura to save them in the database. For that, we’ll need a request client implementation:

import requests
from dataclasses import dataclass

@dataclass
class Client:
    url: str
    headers: dict

    def run_query(self, query: str, variables: dict, extract=False):
        # Named "response" so it is not confused with Flask's request object
        response = requests.post(
            self.url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        assert response.ok, f"Failed with code {response.status_code}"
        return response.json()

    create_user = lambda self, email, password: self.run_query(
        """
            mutation CreateUser($email: String!, $password: String!) {
                insert_user_one(object: {email: $email, password: $password}) {
                    id
                    email
                    password
                }
            }
        """,
        {"email": email, "password": password},
    )

Here we create a utility class for handling our Hasura operations. It takes a URL and a headers dictionary at initialization, and exposes a .run_query() method for performing GraphQL requests. The mutation that saves our user in the Signup action is defined on the class as well, as create_user.
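
A GraphQL server typically reports failures inside the JSON body, under an "errors" key, so a successful HTTP status alone does not mean the operation succeeded. This is why the handlers below inspect the returned dictionary. Here is a small sketch of that check, using a hypothetical helper named unwrap() (not part of the generated scaffold) and hand-written sample responses:

```python
from typing import Any, Optional, Tuple


def unwrap(response: dict, field: str) -> Tuple[Optional[Any], Optional[str]]:
    """Return (data, error_message) for one top-level field of a GraphQL response."""
    errors = response.get("errors")
    if errors:
        # Surface only the first error message, as the signup handler does.
        return None, errors[0]["message"]
    return response["data"][field], None


# Hand-written sample responses, for illustration only.
ok = {"data": {"insert_user_one": {"id": 1, "email": "user@test.com"}}}
failed = {"errors": [{"message": "Uniqueness violation."}]}

print(unwrap(ok, "insert_user_one"))
print(unwrap(failed, "insert_user_one"))
```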

We can instantiate the Client like this:

HASURA_URL = "http://graphql-engine:8080/v1/graphql"
HASURA_HEADERS = {"X-Hasura-Admin-Secret": "your-secret"}

client = Client(url=HASURA_URL, headers=HASURA_HEADERS)

Now, in our Signup action handler, we need to call client.create_user() with the input email and the hashed password value to save them, then return the result:

@app.route("/signup", methods=["POST"])
def signup_handler():
    args = AuthArgs.from_request(request.get_json())
    hashed_password = Password.hash(args.password)
    user_response = client.create_user(args.email, hashed_password)
    if user_response.get("errors"):
        return {"message": user_response["errors"][0]["message"]}, 400
    else:
        user = user_response["data"]["insert_user_one"]
        return CreateUserOutput(**user).to_json()

To test this out, send an HTTP request to your Flask API at /signup with an email and password:

POST http://localhost:5000/signup HTTP/1.1
content-type: application/json

{
  "input": {
    "email": "user@test.com",
    "password": "password123"
  }
}

You should get a successful response like this:

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 129
Server: Werkzeug/1.0.1 Python/3.8.2
Date: Sun, 10 May 2020 19:58:23 GMT

{
  "id": 1,
  "email": "user@test.com",
  "password": "$argon2id$v=19$m=102400,t=2,p=8$fSmC349hY74QoGRTD0w$OYQYd/PP9kYsy9gRnDF1oQ"
}

Now our Signup action is functional! The last piece is to create the Login handler, which will compare passwords and return a signed JWT if they match.

Step 3: Action handler implementation for login

The first thing we need is a new request method on our Client class to find a user by email, so that we can look them up to compare the password. Under create_user, create the following new method:

find_user_by_email = lambda self, email: self.run_query(
    """
        query UserByEmail($email: String!) {
            user(where: {email: {_eq: $email}}, limit: 1) {
                id
                email
                password
            }
        }
    """,
    {"email": email},
)

Then in our login handler, we call Password.verify() to compare the input password against the hashed password saved in the database. If the password matches, we create a JWT from the user credentials, and return it.

We also need to check to see if the password needs to be updated and re-hashed by Argon2, in the event that hashing parameters have changed and it’s no longer valid. If so, we should re-hash and then save the updated password in the database through an update mutation to Hasura, client.update_password().

from argon2.exceptions import VerifyMismatchError

@app.route("/login", methods=["POST"])
def login_handler():
    args = AuthArgs.from_request(request.get_json())
    user_response = client.find_user_by_email(args.email)
    users = user_response["data"]["user"]
    # An unknown email gets the same response as a wrong password
    if not users:
        return {"message": "Invalid credentials"}, 401
    user = users[0]
    try:
        Password.verify(user.get("password"), args.password)
        rehash_and_save_password_if_needed(user, args.password)
        return JsonWebToken(generate_token(user)).to_json()
    except VerifyMismatchError:
        return {"message": "Invalid credentials"}, 401

Here is what the implementation of generate_token() and rehash_and_save_password_if_needed() could look like:

import os
import jwt

# Try to get the secret from ENV, else fallback to provided string
HASURA_JWT_SECRET = os.getenv("HASURA_GRAPHQL_JWT_SECRET", "a-very-secret-secret")

# ROLE LOGIC FOR DEMO PURPOSES ONLY
# NOT AT ALL SUITABLE FOR A REAL APP
def generate_token(user) -> str:
    """
    Generates a JWT compliant with the Hasura spec, given a User object with field "id"
    """
    user_roles = ["user"]
    admin_roles = ["user", "admin"]
    is_admin = user["email"] == "admin@site.com"
    payload = {
        "https://hasura.io/jwt/claims": {
            "x-hasura-allowed-roles": admin_roles if is_admin else user_roles,
            "x-hasura-default-role": "admin" if is_admin else "user",
            "x-hasura-user-id": user["id"],
        }
    }
    token = jwt.encode(payload, HASURA_JWT_SECRET, algorithm="HS256")
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str
    return token.decode("utf-8") if isinstance(token, bytes) else token

def rehash_and_save_password_if_needed(user, plaintext_password):
    """
    Whenever your Argon2 parameters – or argon2-cffi’s defaults! –
    change, you should rehash your passwords at the next opportunity.
    The common approach is to do that whenever a user logs in, since
    that should be the only time when you have access to the cleartext password.
    Therefore it’s best practice to check – and if necessary rehash –
    passwords after each successful authentication.
    """
    if Password.check_needs_rehash(user["password"]):
        client.update_password(user["id"], Password.hash(plaintext_password))

And finally, client.update_password():

update_password = lambda self, id, password: self.run_query(
    """
        mutation UpdatePassword($id: Int!, $password: String!) {
            update_user_by_pk(pk_columns: {id: $id}, _set: {password: $password}) {
                password
            }
        }
    """,
    {"id": id, "password": password},
)
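
To exercise the rehash path without a running Hasura instance or the argon2 dependency, the same logic can be written against injected collaborators. The sketch below is a variant under stated assumptions: StubHasher and the save callback are hypothetical stand-ins that mimic only the check_needs_rehash() and hash() method names, and the "hash" it produces is a marker string, not a real password hash.

```python
from typing import Callable


class StubHasher:
    """Hypothetical stand-in mimicking two PasswordHasher method names."""

    def __init__(self, current_prefix: str):
        self.current_prefix = current_prefix

    def hash(self, password: str) -> str:
        # NOT a real hash; only marks the value as made with current parameters.
        return f"{self.current_prefix}:{len(password)}"

    def check_needs_rehash(self, hashed: str) -> bool:
        return not hashed.startswith(self.current_prefix)


def rehash_if_needed(user: dict, plaintext: str, hasher,
                     save: Callable[[int, str], None]) -> bool:
    """Rehash and persist when the stored hash uses outdated parameters."""
    if hasher.check_needs_rehash(user["password"]):
        save(user["id"], hasher.hash(plaintext))
        return True
    return False


saved = {}  # stands in for the database: user id -> stored hash
hasher = StubHasher("v2")
stale_user = {"id": 1, "password": "v1:11"}
fresh_user = {"id": 2, "password": "v2:11"}

stale_result = rehash_if_needed(stale_user, "password123", hasher, saved.__setitem__)
fresh_result = rehash_if_needed(fresh_user, "password123", hasher, saved.__setitem__)
print(stale_result, fresh_result, saved)
```

Only the user whose stored value carries the outdated prefix is written back. The up-to-date user is left untouched.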

Step 4: Testing out the handler routes

Call the /login endpoint with email and password:

POST http://localhost:5000/login HTTP/1.1
content-type: application/json

{
  "input": {
    "email": "user@test.com",
    "password": "password123"
  }
}

Action handler response:

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 256
Server: Werkzeug/1.0.1 Python/3.8.2
Date: Sun, 10 May 2020 19:59:36 GMT

{
  "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.z9ey1lw9p89gUkAmWEa7Qbpa1R71TgfkjZnEunGJ1ig"
}

Decode the JWT token to access the Hasura claims:

$ decode_jwt 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.z9ey1lw9p89gUkAmWEa7Qbpa1R71TgfkjZnEunGJ1ig'

{
  "https://hasura.io/jwt/claims": {
    "x-hasura-allowed-roles": ["user"],
    "x-hasura-default-role": "user",
    "x-hasura-user-id": 1
  }
}
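
The decode_jwt command above stands for any JWT decoding tool. As a sketch of what such a tool does, the payload segment of a JWT is base64url-encoded JSON and can be read with the standard library alone. The helper names here are hypothetical, and the token is built locally with HMAC-SHA256 purely for illustration. The sketch reads the claims without verifying the signature, so it is suitable for inspection only.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 with the "=" padding stripped.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_hs256_token(payload: dict, secret: str) -> str:
    """Build an HS256 JWT by hand (illustration only; use a JWT library in practice)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        f"{b64url(json.dumps(header).encode())}.{b64url(json.dumps(payload).encode())}"
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def read_claims(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature."""
    payload_segment = token.split(".")[1]
    # Restore the padding that was stripped when the token was built.
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


claims = {
    "https://hasura.io/jwt/claims": {
        "x-hasura-allowed-roles": ["user"],
        "x-hasura-default-role": "user",
        "x-hasura-user-id": 1,
    }
}
token = make_hs256_token(claims, "a-very-secret-secret")
print(read_claims(token))
```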

Step 5: Calling the finished actions

Let’s try out our defined actions from the GraphQL API.

Call the Signup action:

mutation Signup {
  Signup(email: "newuser@test.com", password: "a-password") {
    id
    email
    password
  }
}

{
  "data": {
    "Signup": {
      "id": 2,
      "email": "newuser@test.com",
      "password": "$argon2id$v=19$m=102400,t=2,p=8$fSmC349hY74QoGRTD0w$OYQYd/PP9kYsy9gRnDF1oQ"
    }
  }
}

Call the Signup action with a duplicate:

mutation SignupDuplicate {
  Signup(email: "newuser@test.com", password: "a-password") {
    id
    email
    password
  }
}

{
  "errors": [
    {
      "extensions": {
        "path": "$",
        "code": "unexpected"
      },
      "message": "Uniqueness violation. Duplicate key value violates unique constraint \"user_email_key\""
    }
  ]
}

Call the Login action with valid credentials:

mutation Login {
  Login(email: "newuser@test.com", password: "a-password") {
    token
  }
}

{
  "data": {
    "Login": {
      "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.z9ey1lw9p89gUkAmWEa7Qbpa1R71TgfkjZnEunGJ1ig"
    }
  }
}

Call the Login action with invalid credentials:

mutation IncorrectLogin {
  Login(email: "newuser@test.com", password: "bad-password") {
    token
  }
}

{
  "errors": [
    {
      "extensions": {
        "path": "$",
        "code": "unexpected"
      },
      "message": "Invalid credentials"
    }
  ]
}

Complete app code

import os
import jwt
import json
import logging
import requests
from flask import Flask, request, jsonify
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from typing import Optional
from dataclasses import dataclass, asdict

HASURA_URL = "http://graphql-engine:8080/v1/graphql"
HASURA_HEADERS = {"X-Hasura-Admin-Secret": "your-secret"}
HASURA_JWT_SECRET = os.getenv("HASURA_GRAPHQL_JWT_SECRET", "a-very-secret-secret")

################
# GRAPHQL CLIENT
################

@dataclass
class Client:
    url: str
    headers: dict

    def run_query(self, query: str, variables: dict, extract=False):
        # Named "response" so it is not confused with Flask's request object
        response = requests.post(
            self.url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        assert response.ok, f"Failed with code {response.status_code}"
        return response.json()

    find_user_by_email = lambda self, email: self.run_query(
        """
            query UserByEmail($email: String!) {
                user(where: {email: {_eq: $email}}, limit: 1) {
                    id
                    email
                    password
                }
            }
        """,
        {"email": email},
    )

    create_user = lambda self, email, password: self.run_query(
        """
            mutation CreateUser($email: String!, $password: String!) {
                insert_user_one(object: {email: $email, password: $password}) {
                    id
                    email
                    password
                }
            }
        """,
        {"email": email, "password": password},
    )

    update_password = lambda self, id, password: self.run_query(
        """
            mutation UpdatePassword($id: Int!, $password: String!) {
                update_user_by_pk(pk_columns: {id: $id}, _set: {password: $password}) {
                    password
                }
            }
        """,
        {"id": id, "password": password},
    )

#######
# UTILS
#######

Password = PasswordHasher()
client = Client(url=HASURA_URL, headers=HASURA_HEADERS)

# ROLE LOGIC FOR DEMO PURPOSES ONLY
# NOT AT ALL SUITABLE FOR A REAL APP
def generate_token(user) -> str:
    """
    Generates a JWT compliant with the Hasura spec, given a User object with field "id"
    """
    user_roles = ["user"]
    admin_roles = ["user", "admin"]
    is_admin = user["email"] == "admin@site.com"
    payload = {
        "https://hasura.io/jwt/claims": {
            "x-hasura-allowed-roles": admin_roles if is_admin else user_roles,
            "x-hasura-default-role": "admin" if is_admin else "user",
            "x-hasura-user-id": user["id"],
        }
    }
    token = jwt.encode(payload, HASURA_JWT_SECRET, algorithm="HS256")
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str
    return token.decode("utf-8") if isinstance(token, bytes) else token


def rehash_and_save_password_if_needed(user, plaintext_password):
    if Password.check_needs_rehash(user["password"]):
        client.update_password(user["id"], Password.hash(plaintext_password))


#############
# DATA MODELS
#############

@dataclass
class RequestMixin:
    @classmethod
    def from_request(cls, request):
        """
        Helper method to convert an HTTP request to Dataclass Instance
        """
        values = request.get("input")
        return cls(**values)

    def to_json(self):
        return json.dumps(asdict(self))


@dataclass
class CreateUserOutput(RequestMixin):
    id: int
    email: str
    password: str


@dataclass
class JsonWebToken(RequestMixin):
    token: str


@dataclass
class AuthArgs(RequestMixin):
    email: str
    password: str

##############
# MAIN SERVICE
##############

app = Flask(__name__)

@app.route("/signup", methods=["POST"])
def signup_handler():
    args = AuthArgs.from_request(request.get_json())
    hashed_password = Password.hash(args.password)
    user_response = client.create_user(args.email, hashed_password)
    if user_response.get("errors"):
        return {"message": user_response["errors"][0]["message"]}, 400
    else:
        user = user_response["data"]["insert_user_one"]
        return CreateUserOutput(**user).to_json()

@app.route("/login", methods=["POST"])
def login_handler():
    args = AuthArgs.from_request(request.get_json())
    user_response = client.find_user_by_email(args.email)
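    users = user_response["data"]["user"]
    # An unknown email gets the same response as a wrong password
    if not users:
        return {"message": "Invalid credentials"}, 401
    user = users[0]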
    try:
        Password.verify(user.get("password"), args.password)
        rehash_and_save_password_if_needed(user, args.password)
        return JsonWebToken(generate_token(user)).to_json()
    except VerifyMismatchError:
        return {"message": "Invalid credentials"}, 401

if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0")

Additional Resources

Introduction to Hasura Actions - View Recording.