Building testable Celery tasks

When designing a Python application with asynchronous message handling, you very quickly find a good case for using Celery. Over the years I've seen many Celery tasks written for sending notifications asynchronously, updating external resources or processing heavy-duty work.

One of the most common pitfalls I've seen in these applications (which are mostly written in Django) is that the tasks are very hard to test, or so heavily framework-dependent that you can't easily test the business logic inside them.

Scenario: SMS Service

Let's start with a very simple example (real tasks often have many more dependencies involved).

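from django.db import transaction
from smspackage import sms_client

@app.task()
def send_sms_to_new_user(user_id):
    client = sms_client.New(api_key=XXXXX)

    # select_for_update() must be evaluated inside a transaction
    with transaction.atomic():
        user = User.objects.select_for_update().get(pk=user_id)

        # Skip users who already received the welcome SMS
        if user.is_notified:
            return

        client.send_message(user.phone, {
            "message_body": f"Hey {user.name}, thanks for signing up!"
        })

        user.is_notified = True
        user.save()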

Perfect, we have our task. We can even test it with a test API key (which allows sending a single SMS to a single verified phone number 100x per day) against the SMS service and our own phone number to confirm we receive the text. We release the feature to production.

The problem

Some time later, another developer optimizes the phone number field on the user, splitting country code and phone number into two separate fields, reusing phone for the part without the country code and introducing user.get_full_phone() to get the full number.

The SMS service has the test API key configured, but the engineer working on this feature has set up a dummy phone number for the test user, as he did not want to commit a real phone number (nor did he want to use his own number for testing). He assumes the Celery task still works as expected and sends an SMS, while in reality the message is blocked, which only shows up in the SMS Service logs.

The SMS Service is not suitable for integration testing (it silently accepts anything valid), yet our code depends on it to run.

The solution

To solve this, we need to decouple our Celery task from our business logic. The Celery task only calls the business logic, which should live separately from the task itself.

This allows us to be more explicit about how our business logic function is constructed and which dependencies it receives.

In the example below, I've put every task in our application in a separate Use Case (read more on Use Cases and Clean Architecture here).

from ..usecase.send_sms import SendSMSUseCase
from smspackage import sms_client

@app.task
def send_sms_to_new_user_task(user_id):
    # The task only builds the dependencies and delegates to the use case
    client = sms_client.New(api_key=XXXX)

    SendSMSUseCase(
        sms_client=client
    ).execute(user_id)

The implementation of the Use Case follows. The client dependency can be satisfied either through duck typing or through the type system, by defining an Abstract Base Class (ABC) or a Protocol.
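
If you prefer the type system over duck typing, the dependency could be described with a typing.Protocol. The sketch below is one possible shape rather than the exact code of this application: the names SMSClient, RecordingSMSClient and notify are hypothetical, and the send_message(phone, message_options) signature is inferred from how the client is called in this article.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SMSClient(Protocol):
    """Hypothetical interface: any object with a matching send_message()."""

    def send_message(self, phone: str, message_options: dict) -> None:
        ...


class RecordingSMSClient:
    """Illustrative stand-in; satisfies the protocol without inheriting from it."""

    def __init__(self) -> None:
        self.sent: list = []

    def send_message(self, phone: str, message_options: dict) -> None:
        # Record the call instead of contacting a real SMS service.
        self.sent.append((phone, message_options))


def notify(client: SMSClient, phone: str, name: str) -> None:
    # Depends only on the protocol, so any conforming client can be injected.
    client.send_message(
        phone, {"message_body": f"Hey {name}, thanks for signing up!"}
    )
```

A static type checker can then flag a client that lacks send_message, while the real SMS package client and a test double remain interchangeable at runtime.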

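from django.db import transaction

class SendSMSUseCase:

    def __init__(self, sms_client):
        # Any object with a send_message(phone, options) method will do
        self.sms_client = sms_client

    def execute(self, user_id):
        # select_for_update() must be evaluated inside a transaction
        with transaction.atomic():
            user = User.objects.select_for_update().get(pk=user_id)

            if user.is_notified:
                return

            self.sms_client.send_message(user.phone, {
                "message_body": f"Hey {user.name}, thanks for signing up!"
            })

            user.is_notified = True
            user.save()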

Instead of a class, you could also define the use case as a function, passing the dependencies as function arguments:

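from django.db import transaction

def send_sms_usecase(sms_client, user_id):
    # select_for_update() must be evaluated inside a transaction
    with transaction.atomic():
        user = User.objects.select_for_update().get(pk=user_id)

        if user.is_notified:
            return

        # The client is a plain argument here, so there is no self
        sms_client.send_message(user.phone, {
            "message_body": f"Hey {user.name}, thanks for signing up!"
        })

        user.is_notified = True
        user.save()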

The result

This allows us to very easily test the business logic associated with the task, and we are even able to call this business logic from other parts of the code, e.g. an admin could retrigger the SMS if the customer reports that the text was not received.

All you need to test this logic as part of your application is a mock class that implements the methods the use case expects from its client:


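class InvalidPhoneNumberException(Exception):
    pass

class InvalidMessageException(Exception):
    pass

class MockSMSClient:

    def send_message(self, phone, message_options):
        # Raise if the use case passes anything other than the expected input
        if phone != '+1123456789':
            raise InvalidPhoneNumberException()

        if message_options["message_body"] != "Hey Test User, thanks for signing up!":
            raise InvalidMessageException()

def test_send_sms_usecase():
    # Assumes a test user with id=1, name "Test User" and phone '+1123456789'
    client = MockSMSClient()
    SendSMSUseCase(client).execute(user_id=1)
    user = User.objects.get(id=1)

    assert user.is_notified is True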

This solution scales even when the business logic becomes more complex or requires more external services to be updated.

By validating the input passed to the mock against the expected data, you also assert that the correct parameters are given and that this has not changed over time. If a developer changes the way the phone number is rendered or does some magic with the name field, your test will fail and someone will need to verify whether the change breaks any expected behaviour.
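
The phone number regression from the scenario above could be caught with a check along these lines. This is a minimal sketch that runs without Django or Celery: FakeUser, its country_code field and the function name send_signup_sms are stand-ins invented for illustration, and the use case takes a user object instead of a user_id so no database is needed. unittest.mock.Mock records the call so the test can assert on the exact arguments.

```python
from dataclasses import dataclass
from unittest.mock import Mock


@dataclass
class FakeUser:
    """In-memory stand-in for the Django User model (assumption)."""
    name: str
    country_code: str
    phone: str  # after the refactor: the number without the country code
    is_notified: bool = False

    def get_full_phone(self) -> str:
        return f"{self.country_code}{self.phone}"

    def save(self) -> None:
        pass  # nothing to persist in this sketch


def send_signup_sms(sms_client, user) -> None:
    """Function-style use case, simplified to accept a user object."""
    if user.is_notified:
        return
    sms_client.send_message(user.get_full_phone(), {
        "message_body": f"Hey {user.name}, thanks for signing up!"
    })
    user.is_notified = True
    user.save()


def test_sms_is_sent_to_full_phone_number():
    client = Mock()
    user = FakeUser(name="Test User", country_code="+1", phone="123456789")

    send_signup_sms(client, user)

    # Fails if the use case passes user.phone instead of the full number.
    client.send_message.assert_called_once_with(
        "+1123456789",
        {"message_body": "Hey Test User, thanks for signing up!"},
    )
    assert user.is_notified is True
```

Had the use case kept passing user.phone after the field was split, assert_called_once_with would fail with the mismatching argument in the error message, instead of the SMS silently disappearing in the provider's logs.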