How to Send Messages to SQS with Python, Flask, and Boto3

2020-07-20 15:28:00 | #programming #python #sysadmin #aws #sqs

Tested On

  • Linux Ubuntu 20.04
  • Windows 10
  • macOS Catalina

In this tutorial, we're going to learn how to set up an API that forwards messages to an SQS queue. SQS allows us to queue up requests from the front end, where we can deploy workers to consume messages at a healthy interval. This is much safer than sending messages directly to the backend, which risks overloading the server and DB.

We'll begin by setting up a local environment so we can program our REST API with Flask. We'll even set up a local version of SQS, using localstack. This allows us to test our code without having an AWS account. When you're ready to deploy to production, we'll teach you how to set up those resources in your live AWS environment and update your endpoints.

Completing the Prerequisites

It is important that you complete the prerequisites before continuing, as this program depends on a few components covered there. The first component is a custom logger.py module that logs statements to CloudWatch. The other component is a localstack environment that runs the AWS cloud stack locally. This gives you the ability to run and debug your code against mock AWS services, without having to sign up for AWS.

As a side note, if you'd like to learn about how to receive messages from an AWS SQS queue with Python, you can read this article, as well.

How to Set Up the Python SQS Message Sender Project Files

How to Create Python Project Files with Windows 10 PowerShell 2.0+

cd ~
New-Item -ItemType "directory" -Path ".\python-sqs-sender"
cd python-sqs-sender
New-Item -ItemType "file" -Path . -Name "main.py"
virtualenv venv
.\venv\Scripts\activate

To verify that the virtual environment is active, make sure (venv) is in the PowerShell command prompt. For example, (venv) PS C:\Users\username\python-sqs-sender>

How to Create Python Project Files with Linux Ubuntu 14.04+ or macOS

cd ~
mkdir python-sqs-sender
cd python-sqs-sender
virtualenv -p python3 venv
source venv/bin/activate
touch main.py

To verify that the virtual environment is active, make sure (venv) is in the terminal command prompt.

After these steps, and once logger.py from the prerequisites has been copied into the project, you will have the following files and folders, with the virtual environment activated.

▾ python-sqs-sender/
  ▸ venv/
  logger.py
  main.py

Before you continue, double check that you've copied the logger.py file from this article into the project folder, which will allow our Python service to log to CloudWatch logs.

Full Code Example for the Python SQS Message Sender

Add the following code to your main.py file. This program is formatted to be compatible with AWS Lambda, which allows you to spin up backend services and code without having to provision or manage servers. All you need to worry about is making your code as lightweight as possible and AWS handles all of the deployments and autoscaling for you.

Filename: main.py

import os
import boto3
import awsgi
from flask import Flask, request, jsonify
from flask_cors import CORS
import logger
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

app = Flask(__name__)
CORS(app, resources={r'/*': {'origins': list(os.environ['ORIGINS'].split(','))}}, supports_credentials=True)

try:
    sqs_client = boto3.client(
        'sqs',
        region_name=os.environ['AWS_REGION'],
        endpoint_url=os.environ['SQS_ENDPOINT'],
        use_ssl=os.environ['USE_SSL'] == '1',
        verify=False,
        aws_access_key_id=os.environ['ACCESS_KEY'],
        aws_secret_access_key=os.environ['SECRET_KEY'])
except Exception as e:
    logger.ex(e)


def json_status(status_code, message):
    res = jsonify({
        'isBase64Encoded': False,
        'statusCode': status_code,
        'body': message
    })
    res.status = '%s %s' % (status_code, message)
    res.message = message
    return res


@app.route('/', methods=['POST'])
def send_message():
    json_data = request.get_json(silent=True) or {}
    message = json_data.get('message')
    some_attribute = json_data.get('some_attribute')
    another_attribute = json_data.get('another_attribute')

    if message is None:
        return json_status(400, 'Message is required')

    if some_attribute is None:
        return json_status(400, 'Some attribute is required')

    if another_attribute is None:
        return json_status(400, 'Another attribute is required')

    try:
        queue_url = sqs_client.get_queue_url(QueueName=os.environ['SQS_QUEUE_NAME'])['QueueUrl']
        sqs_client.send_message(
            QueueUrl=queue_url,
            DelaySeconds=10,
            MessageBody=message,
            MessageAttributes={
                'some_attribute': {
                    'DataType': 'String',
                    'StringValue': some_attribute
                },
                'another_attribute': {
                    'DataType': 'String',
                    'StringValue': another_attribute
                }
            }
        )
        logger.ex('Message sent')
        return json_status(200, 'Message sent')
    except Exception as e:
        logger.ex(e)
        return json_status(500, str(e))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5100, debug=True)


def lambda_handler(event, context):
    return awsgi.response(app, event, context)

Explanation of the main.py Code

Lines 1-8: imports all the required modules.

Line 10: instantiates our Flask app and line 11 indicates which domains we will allow requests from through CORS.

Lines 13-23: instantiates our SQS client. If you're not familiar with environment variables, they get explained later on in this tutorial.

Lines 26-34: formats a JSON response that is compatible with AWS Lambda, ensuring that our responses make it back through API Gateway and into the browser.

Lines 37-74: defines a route we can send requests to that will invoke the Lambda function. It validates the three required fields and forwards the message to SQS.

Lines 77-78: allows us to run our Flask app locally on port 5100 and lines 81-82 are necessary for AWS Lambda to invoke the service through Flask routes.
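The validation and the MessageAttributes structure inside send_message can also be expressed as small pure functions, which makes them easy to test without Flask or SQS. The following is a sketch, not part of main.py; REQUIRED_FIELDS, missing_fields, and build_message_attributes are hypothetical names.

```python
# Hypothetical helpers mirroring the checks in send_message.
REQUIRED_FIELDS = ('message', 'some_attribute', 'another_attribute')


def missing_fields(payload):
    """Return the required keys that are absent or None in the payload."""
    payload = payload or {}
    return [name for name in REQUIRED_FIELDS if payload.get(name) is None]


def build_message_attributes(payload):
    """Build the MessageAttributes dict in the shape send_message passes to SQS."""
    return {
        name: {'DataType': 'String', 'StringValue': str(payload[name])}
        for name in ('some_attribute', 'another_attribute')
    }


payload = {'message': 'My message', 'some_attribute': 'Something'}
print(missing_fields(payload))  # ['another_attribute']
```

Returning every missing field at once lets the API report all problems in a single 400 response, rather than one per request.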

How to Run the Program

Activate your virtual environment by running source venv/bin/activate from inside the project root folder. Then run python main.py to execute the program.

You should receive a ModuleNotFoundError similar to the following. This just means you need to pip install a few dependencies in order to run the program.

Traceback (most recent call last):
  File "main.py", line 2, in <module>
    import boto3
ModuleNotFoundError: No module named 'boto3'

Installing Python Module Dependencies

So if you see the above error about boto3, run pip install boto3. When you try to run python main.py again, it will complain about another missing module so pip install that as well. By the end, you should have run the following commands:

source venv/bin/activate
pip install boto3
pip install aws-wsgi
pip install flask
pip install flask_cors

Now, when you run python main.py, you should see another error:

Traceback (most recent call last):
  File "main.py", line 11, in <module>
    CORS(app, resources={r'/*': {'origins': list(os.environ['ORIGINS'].split(','))}}, supports_credentials=True)
  File "/usr/lib/python3.8/os.py", line 675, in __getitem__
    raise KeyError(key) from None
KeyError: 'ORIGINS'

Running the Entire Stack Locally

Setting Environment Variables

While it can be more convenient to add environment variables in our ~/.bashrc file, we're going to set them in the command we use to run the service, for tutorial purposes.
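main.py reads seven environment variables, and a missing one surfaces as a KeyError like the one above, one variable at a time. A small startup check can report all of them together. This is a sketch with hypothetical names (REQUIRED_ENV, missing_env); it is not part of main.py.

```python
import os

# The variables that main.py reads via os.environ[...].
REQUIRED_ENV = ('ORIGINS', 'AWS_REGION', 'SQS_ENDPOINT', 'USE_SSL',
                'ACCESS_KEY', 'SECRET_KEY', 'SQS_QUEUE_NAME')


def missing_env(environ=None):
    """Return the names of required variables that are not set."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV if name not in environ]


missing = missing_env()
if missing:
    print('Missing environment variables: ' + ', '.join(missing))
```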

Creating a Localstack SQS Queue

With localstack running, create an SQS queue, if you have not done so, already:

aws sqs create-queue --queue-name Queue --endpoint-url http://0.0.0.0:4566 --region us-east-1 --profile localstack

If you get the error "The config profile (localstack) could not be found", it's because you haven't added the localstack profile to your ~/.aws/credentials file. Complete the prerequisites and return to this tutorial. Then, you'll be able to run the flask service with environment variables set to the localstack endpoints, with the following command:

Running the Flask Service

ORIGINS=https://somewebsite.com,https://somewebsite2.com AWS_REGION=us-east-1 SQS_ENDPOINT=http://0.0.0.0:4566 USE_SSL=0 ACCESS_KEY=foo SECRET_KEY=bar SQS_QUEUE_NAME=Queue python main.py

ORIGINS is not so important for testing purposes, because we'll be using curl to send requests. But when we launch to production, we'll be able to pass any number of comma-separated domains that we want to allow requests from.
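To see what the CORS line in main.py receives, the comma-separated ORIGINS value can be parsed on its own. The helper below is a sketch under a hypothetical name (parse_origins); unlike main.py, it also strips whitespace and drops empty entries, which is an added assumption about what you might want.

```python
def parse_origins(raw):
    """Split a comma-separated ORIGINS value into a clean list of origins."""
    return [origin.strip() for origin in raw.split(',') if origin.strip()]


print(parse_origins('https://somewebsite.com, https://somewebsite2.com'))
# ['https://somewebsite.com', 'https://somewebsite2.com']
```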

Sending a Request to the Flask Service/API That's Connected to LocalStack

Open up a new terminal window and run the following curl command. You should have 3 terminal windows open at this point—one for localstack, one for the flask service, and one to run curl.

curl -ivvv localhost:5100 -H "Content-Type: application/json" -d '{"message": "My message", "some_attribute": "Something", "another_attribute": "Another thing"}'

If everything was set up properly, you should have received the following output:

{
  "body": "Message sent", 
  "isBase64Encoded": false, 
  "statusCode": 200
}
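The same request can be sent from Python instead of curl, using only the standard library. This is a sketch; build_request and send are hypothetical helper names, and the URL assumes the Flask service is listening on port 5100 as configured in main.py.

```python
import json
import urllib.request


def build_request(url, message, some_attribute, another_attribute):
    """Build a POST request with the JSON body that send_message expects."""
    body = json.dumps({
        'message': message,
        'some_attribute': some_attribute,
        'another_attribute': another_attribute,
    }).encode('utf-8')
    return urllib.request.Request(
        url, data=body, method='POST',
        headers={'Content-Type': 'application/json'})


def send(request, timeout=10):
    """Send the request and decode the JSON response.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode('utf-8'))


# Requires the Flask service to be running:
# print(send(build_request('http://localhost:5100/', 'My message',
#                          'Something', 'Another thing')))
```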

Verifying that the LocalStack SQS Queue Received the Message

To verify that the localstack SQS queue received the message from our curl request, run the following command:

aws sqs receive-message --queue-url localhost:4566/queue/Queue --endpoint-url http://0.0.0.0:4566 --region us-east-1 --profile localstack
{
    "Messages": [
        {
            "MessageId": "5caac084-9950-5050-6a57-a7c1890d7e0b",
            "ReceiptHandle": "dygasgldmlswerjwymbkcnrsdwnbimzjtvztyxtxiueeqrirxobfrtreapczopidasykadqasvmuuhetozdofpwokydsmerzrnmbghdmyhlcaeabkgmnyzwifnkndtzohebctpxxjzoeylmijjnpxidpqecoecgiwbdiyekvzddmscgfjwtwjzidq",
            "MD5OfBody": "be59f66a07d05b2c52a1387d1f62d753",
            "Body": "My message",
            "Attributes": {
                "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
                "SentTimestamp": "1595518929857",
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1595525992436"
            }
        }
    ]
}
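If you capture that JSON in a script, for example from a subprocess call, the message bodies can be extracted as follows. This is a sketch; message_bodies is a hypothetical helper, and the sample string contains only the fields the function actually reads.

```python
import json


def message_bodies(receive_output):
    """Return the Body of each message in `aws sqs receive-message` JSON output.

    Tolerates empty output, which is treated as "no messages".
    """
    data = json.loads(receive_output) if receive_output.strip() else {}
    return [message['Body'] for message in data.get('Messages', [])]


sample = '{"Messages": [{"Body": "My message"}]}'
print(message_bodies(sample))  # ['My message']
```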

Running the Service Locally, While Pointing to a Live SQS Queue

This scenario is ideal for when you want to ensure that AWS and SQS are set up properly, and the problem is not in the code. If you do find a typo in the code, you'll be able to fix it and rerun the flask service, locally.

Setting Up an API User with Limited Permissions in the IAM Console

Because we're going to be using the AWS CLI to set up and communicate with SQS, you'll want to set up an api user with limited permissions. If you're not sure how to do this, read our tutorial about Making Your AWS Account More Secure By Restricting Access Key Permissions. Do not use your root user or any key pair with full permissions, as it's a huge security risk. Once you've created the api user, attach the following policy:

  • AmazonSQSFullAccess

Adding the API User's Access Keys to a Local API Profile

Once you've attached the above policy, add your API user's access key ID and secret access key to an [api] profile in your ~/.aws/credentials file.

[default]
aws_access_key_id = foo
aws_secret_access_key = bar

[localstack]
aws_access_key_id = foo
aws_secret_access_key = bar

[api]
aws_access_key_id = API_USER_ACCESS_KEY_GOES_HERE
aws_secret_access_key = API_USER_SECRET_KEY_GOES_HERE
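A quick way to confirm that a profile exists before running the CLI commands is to read the credentials file with the standard configparser module. This is a sketch; profile_has_keys is a hypothetical helper, and it only checks that both keys are present, not that they are valid.

```python
import configparser
import os


def profile_has_keys(profile, path=None):
    """Return True if the profile defines both an access key ID and a secret key."""
    path = path or os.path.expanduser('~/.aws/credentials')
    # interpolation=None so that '%' characters in values are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path)  # a missing file is silently ignored
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return bool(section.get('aws_access_key_id')) and bool(section.get('aws_secret_access_key'))


print(profile_has_keys('api'))
```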

Listing Existing Production SQS Queues

You will then be able to list your SQS queues with the following command. Make sure to change the region in both the --endpoint-url and --region if your production services are running in a different region than us-east-1.

aws sqs list-queues --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api
{
  "QueueUrls": [
      "https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue"
  ]
}

Creating a Production SQS Queue

If you have not yet created any SQS queues in production, the list above will be empty. Create a new SQS queue with your preferred --queue-name.

aws sqs create-queue --queue-name MyQueue --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api

Verifying the Production SQS Queue Was Created

Make sure this new queue was added by running the aws sqs list-queues --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api command again, and copy its QueueUrl, which should be in the following format: https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue

Once you have the QueueUrl, you'll be able to do things like delete the queue with the aws sqs delete-queue --queue-url https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api command. But don't do anything like that now. We need this queue in production to test our flask service against it.
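If you need the account ID or queue name from a QueueUrl in a script, the URL can be taken apart with urllib.parse. This is a sketch that assumes the URL has exactly the format shown above; split_queue_url is a hypothetical helper name.

```python
from urllib.parse import urlparse


def split_queue_url(queue_url):
    """Split a QueueUrl into (host, account_id, queue_name)."""
    parsed = urlparse(queue_url)
    # Assumes a path of the form /<account_id>/<queue_name>.
    account_id, queue_name = parsed.path.strip('/').split('/')
    return parsed.netloc, account_id, queue_name


print(split_queue_url('https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue'))
# ('sqs.us-east-1.amazonaws.com', '000000000000', 'MyQueue')
```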

Running the Local Flask Service Against the Production SQS Queue

Run the following command:

ORIGINS=https://somewebsite.com,https://somewebsite2.com AWS_REGION=us-east-1 SQS_ENDPOINT=https://sqs.us-east-1.amazonaws.com USE_SSL=1 ACCESS_KEY=API_USER_ACCESS_KEY_GOES_HERE SECRET_KEY=API_USER_SECRET_KEY_GOES_HERE SQS_QUEUE_NAME=MyQueue python main.py
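The production run differs from the localstack run only in its environment values. One thing to be aware of is that main.py passes verify=False to boto3 regardless of USE_SSL, which also disables certificate verification against the real AWS endpoint. A possible variation, sketched below with a hypothetical helper name (sqs_client_kwargs), turns verification off only when SSL is not in use.

```python
def sqs_client_kwargs(environ):
    """Build keyword arguments for boto3.client('sqs', ...) from the environment."""
    use_ssl = environ['USE_SSL'] == '1'
    kwargs = {
        'region_name': environ['AWS_REGION'],
        'endpoint_url': environ['SQS_ENDPOINT'],
        'use_ssl': use_ssl,
        'aws_access_key_id': environ['ACCESS_KEY'],
        'aws_secret_access_key': environ['SECRET_KEY'],
    }
    if not use_ssl:
        # Assumption: only skip certificate checks for the plain-HTTP localstack
        # setup; with SSL on, boto3's default verification stays in effect.
        kwargs['verify'] = False
    return kwargs


# Usage inside main.py (requires boto3):
# sqs_client = boto3.client('sqs', **sqs_client_kwargs(os.environ))
```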

Sending a Request to the Flask Service/API That's Connected to the Production SQS Queue

Then, use curl to send an API request and message to your local service, which is now connected to the production SQS queue:

curl -ivvv localhost:5100 -H "Content-Type: application/json" -d '{"message": "My message", "some_attribute": "Something", "another_attribute": "Another thing"}'

Verifying that the Production SQS Queue Received the Message

And to verify that the production SQS queue received the message, run the receive-message command again, but this time, make sure the endpoints are pointing to your production instance:

aws sqs receive-message --queue-url https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api

Conclusion

At some point, you will want to deploy the Flask service to AWS Lambda in production. This task, however, is beyond the scope of this tutorial. We will cover this topic in the future, so subscribe to get notified when that tutorial is complete. Lastly, when you're finished running API commands, downgrade the api user's SQS policy from AmazonSQSFullAccess so that it can no longer make modifications to any SQS queues. The managed AWSLambdaSQSQueueExecutionRole policy limits the user to receiving and deleting messages; because this service sends messages, it also needs permission for sqs:SendMessage and sqs:GetQueueUrl on the queue.
