How to Receive SQS Messages and Insert Into a Postgresql Database with Python and Boto3

2020-10-26 17:31:00 | #programming #python #sysadmin #aws #sqs #database

Today, we're learning how to code an AWS Lambda-compatible Python worker that consumes messages from an SQS queue and inserts them into a database.

The first code example demonstrates how to consume messages in a local environment and can be manually executed. The second example provides you with production-ready Lambda-compatible code.

Completing the Prerequisites

It is important that you complete the prerequisites listed above, as this program depends on a few topics already covered. The first is a custom logger.py module that correctly logs statements to CloudWatch. The next is a LocalStack environment for provisioning the AWS cloud stack on your local machine, without having to sign up for AWS. The last covers how to receive messages from SQS. Please complete those three tutorials before proceeding.
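If you don't have the logger tutorial's module handy, a minimal stand-in for logger.py might look like the sketch below. This is only an assumption based on how the code later in this article calls logger.ex(); the real module from that tutorial formats output for CloudWatch.

```python
# logger.py -- minimal stand-in; the real module comes from the
# prerequisite tutorial and is tailored for CloudWatch output.
import logging

logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('sqs-worker')

def info(msg):
    # Plain informational statement.
    _log.info(msg)

def ex(e):
    # Log an exception; inside an except block this includes the traceback.
    _log.exception(e)
```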

We're going to continue with the project folder and files from the How to Receive Messages from SQS with Python and Boto3 tutorial.

Reactivating the Python Virtual Environment

How to Reactivate the Python Virtual Environment with Windows 10 PowerShell 2.0+

cd ~/python-sqs-receiver
.\venv\Scripts\activate

To verify that the virtual environment is active, make sure (venv) is in the PowerShell command prompt. For example, (venv) PS C:\Users\username\python-sqs-receiver>

How to Reactivate the Python Virtual Environment with Linux Ubuntu 14.04+ or macOS

cd ~/python-sqs-receiver
source venv/bin/activate

To verify that the virtual environment is active, make sure (venv) is in the terminal command prompt.

Importing the Postgresql Database Client

To make it easier to connect to a Postgresql database and perform the insert query, we can import the database.py file from this article.

Double check to make sure you have the following files and folders.

▾ python-sqs-receiver/
  ▸ venv/
  database.py
  logger.py
  main.py
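The PsqlClient class used in the code below comes from that database.py. If you don't have it on hand, a minimal sketch matching how main.py calls it (constructor keyword arguments plus an execute() method) could look like this; your actual file from the linked article may differ, and the psycopg2 import is deferred so the class can be defined before the driver is installed.

```python
# database.py -- assumed sketch of the client from the prerequisite article:
# a thin wrapper over psycopg2.
class PsqlClient:
    def __init__(self, database, user, password, host, port):
        import psycopg2  # deferred so the module loads even if the driver isn't installed yet
        self.conn = psycopg2.connect(database=database, user=user,
                                     password=password, host=host, port=port)

    def execute(self, query, params=None):
        # Run the query in a fresh cursor and commit so inserts persist.
        with self.conn.cursor() as cur:
            cur.execute(query, params)
        self.conn.commit()
```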

Setting Up the Postgresql Database

If you're not sure how to set up a Postgresql database, you can follow our How to Set Up a Postgresql Database Container with Docker Machine tutorial.

Whichever Postgresql solution you decide to go with, just make sure to take note of the DB name, user, password, IP address (host), and port number, as they will be required to connect.

Creating the Postgresql Database Table Columns

First, create the following SQL script and name it psql_script.sql. This script drops the messages table and creates a new one with the columns indicated. We will use this table to store data from incoming SQS messages.

Filename: psql_script.sql

\set db `echo "$POSTGRES_DB"`;
\set usr `echo "$POSTGRES_USER"`;

DROP TABLE messages;

CREATE TABLE IF NOT EXISTS messages (
  id uuid NOT NULL,
  body TEXT NOT NULL,
  some_attr VARCHAR(255),
  some_attr2 BYTEA,
  some_attr3 INTEGER,
  created_date TIMESTAMP NOT NULL,
  PRIMARY KEY (id)
);

Now, run the following commands. The first command displays the dev docker machine's IP address. Yours will most likely be different, so make sure to replace 192.168.99.122 below. Ignore the ERROR: table "messages" does not exist error that displays the first time you run the psql script.

# Display the docker-machine's IP
docker-machine ip dev
192.168.99.122

# Run the psql script
POSTGRES_DB=pgdb POSTGRES_USER=pguser psql -h 192.168.99.122 -U pguser -d pgdb -f psql_script.sql -W
Password: 
psql:psql_script.sql:4: ERROR:  table "messages" does not exist
CREATE TABLE

# Connect to the database
psql -h 192.168.99.122 -U pguser -d pgdb -W
  
# Ensure the table was created
\dt

Full Code Example for a Manual Python SQS Message Receiver

Add the following code to your main.py file.

Filename: main.py

import os
import uuid
import boto3
import datetime
from database import PsqlClient

try:
    sqs_client = boto3.client(
        'sqs',
        region_name=os.environ['AWS_REGION'],
        endpoint_url=os.environ['SQS_ENDPOINT'],
        use_ssl=os.environ['USE_SSL'] == '1',
        verify=False,
        aws_access_key_id=os.environ['ACCESS_KEY'],
        aws_secret_access_key=os.environ['SECRET_KEY'])
except Exception as e:
    print(e)

try:
    db = PsqlClient(
        database=os.environ['POSTGRES_DB'],
        user=os.environ['POSTGRES_USER'],
        password=os.environ['POSTGRES_PASSWORD'],
        host=os.environ['POSTGRES_HOST'],
        port=int(os.environ['POSTGRES_PORT'])
    )
except Exception as e:
    print(e)

queue_url = sqs_client.get_queue_url(QueueName=os.environ['SQS_QUEUE_NAME'])['QueueUrl']

try:
    # Receive message from SQS queue
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=[
            'SentTimestamp'
        ],
        MaxNumberOfMessages=1,
        MessageAttributeNames=[
            'All'
        ],
        VisibilityTimeout=0,
        WaitTimeSeconds=0
    )

    message = response['Messages'][0]
    receipt_handle = message['ReceiptHandle']

    body = message['Body']
    some_attr = message['MessageAttributes']['SomeAttribute']['StringValue']
    some_attr2 = message['MessageAttributes']['SomeAttribute2']['BinaryValue']
    some_attr3 = message['MessageAttributes']['SomeAttribute3']['StringValue']
    print(body, some_attr, some_attr2, some_attr3)

    try:
        db.execute('INSERT INTO messages (id, body, some_attr, some_attr2, some_attr3, created_date) \
            VALUES (%(id)s, %(body)s, %(some_attr)s, %(some_attr2)s, %(some_attr3)s, %(created_date)s)', (
            {
                'id': uuid.uuid4(),
                'body': body,
                'some_attr': some_attr,
                'some_attr2': some_attr2,
                'some_attr3': some_attr3,
                'created_date': datetime.datetime.utcnow()
            })
        )

        # Delete received message from queue
        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
        print('Received message: %s' % message)
    except Exception as e:
        print(e)

except Exception as e:
    print(e)

Explanation of the main.py Code

Lines 1-5: Imports all the required modules.

Lines 7-17: Instantiates the sqs_client, which connects to an SQS endpoint based on the environment variables that you set.

Lines 19-28: Instantiates a PsqlClient, which connects to a Postgresql database with the specified location and credentials.

Line 30: Looks up the queue URL for the queue named in SQS_QUEUE_NAME.

Lines 32-79: Receives up to the number of messages indicated on line 39 (here, one), extracts the body, attributes, and receipt handle, stores the message in the DB, and then deletes the message from the queue.
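To see the extraction logic in isolation, here is the approximate shape of a receive_message response; the values below are illustrative, not real output.

```python
# Approximate shape of a boto3 receive_message response (illustrative values).
response = {
    'Messages': [{
        'MessageId': '5e94426b-6701-b141-1c9c-a79defd0fc88',
        'ReceiptHandle': 'AQEBexample',
        'Body': 'Content',
        'MessageAttributes': {
            'SomeAttribute':  {'DataType': 'String', 'StringValue': 'Some Value'},
            'SomeAttribute2': {'DataType': 'Binary', 'BinaryValue': b'Another Value'},
            'SomeAttribute3': {'DataType': 'Number', 'StringValue': '123'},
        },
    }],
}

message = response['Messages'][0]
receipt_handle = message['ReceiptHandle']
body = message['Body']
# Number attributes arrive as strings; cast before inserting into an INTEGER column.
some_attr3 = int(message['MessageAttributes']['SomeAttribute3']['StringValue'])
print(body, some_attr3)
```

Note that SQS delivers Number attributes as strings, which is why the messages table's INTEGER column relies on Postgres coercing the value; casting with int() in Python is the more explicit choice.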

How to Run the Program While Pointing to LocalStack

Installing Python Dependencies

If you receive a ModuleNotFoundError, you will have to pip3 install aws-psycopg2 and any other required modules that you're prompted for.

Sending Messages to SQS

Add the following JSON to an attributes.json file. This is a generic example that teaches you about various data types. Message attributes are fully customizable, but they must adhere to the following structure. After you complete this tutorial, feel free to change the attribute names and values.

Filename: attributes.json

{
  "SomeAttribute": {
    "DataType": "String",
    "StringValue": "Some Value"
  },
  "SomeAttribute2": {
    "DataType": "Binary",
    "BinaryValue": "Another Value"
  },
  "SomeAttribute3": {
    "DataType": "Number",
    "StringValue": "123"
  }
}

Send the SQS message with the following command.

aws sqs send-message \
  --queue-url=http://0.0.0.0:4566/queue/MyQueue \
  --message-body 'Content' \
  --message-attributes=file://attributes.json \
  --endpoint-url=http://0.0.0.0:4566 \
  --region us-east-1 \
  --profile localstack
{
    "MD5OfMessageBody": "f15c1cae7882448b3fb0404682e17e61",
    "MD5OfMessageAttributes": "485f25af5d01da7b500440fa9459ac92",
    "MessageId": "5e94426b-6701-b141-1c9c-a79defd0fc88"
}

Running the main.py Code

Now, run the following command. We set the environment variables in this command for tutorial purposes, but feel free to set them in your ~/.bashrc file.

AWS_REGION=us-east-1 SQS_ENDPOINT=http://0.0.0.0:4566 USE_SSL=0 ACCESS_KEY=foo SECRET_KEY=bar SQS_QUEUE_NAME=MyQueue POSTGRES_DB=pgdb POSTGRES_USER=pguser POSTGRES_PASSWORD=Ch4ng3M3 POSTGRES_HOST=192.168.99.122 POSTGRES_PORT=5432 python main.py

Querying the Postgresql Database

Here, we want to make sure our SQS message values made it into the database, and the message was removed from the SQS queue.

# Connect to the database
psql -h 192.168.99.122 -U pguser -d pgdb -W
  
# Ensure the message was inserted into the DB
select * from messages;
  
# Exit psql
exit
  
# Ensure the message was removed from SQS
aws sqs receive-message --queue-url http://0.0.0.0:4566/queue/MyQueue --endpoint-url http://0.0.0.0:4566 --region us-east-1 --profile localstack

Full Code Example for an Event-Based Python SQS Message Receiver That Saves to a DB

This example is better suited for production. It's formatted to be AWS Lambda-compatible and can be set up to execute lambda_handler on an SQS event trigger, which fires as messages enter the queue, or on an EventBridge trigger that runs at a specified time interval (polling, basically).
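Note that the record layout in a Lambda SQS event differs from the boto3 receive_message response: the keys are lower-camelCase, and binary attribute values arrive base64-encoded. An event looks roughly like this (illustrative values):

```python
import base64

# Approximate shape of a Lambda SQS event (illustrative values).
event = {
    'Records': [{
        'messageId': '5e94426b-6701-b141-1c9c-a79defd0fc88',
        'receiptHandle': 'AQEBexample',
        'body': 'Content',
        'messageAttributes': {
            'SomeAttribute':  {'dataType': 'String', 'stringValue': 'Some Value'},
            'SomeAttribute2': {'dataType': 'Binary', 'binaryValue': 'QW5vdGhlciBWYWx1ZQ=='},
            'SomeAttribute3': {'dataType': 'Number', 'stringValue': '123'},
        },
        'eventSource': 'aws:sqs',
    }],
}

for record in event['Records']:
    body = record['body']
    # Binary values are base64-encoded strings in Lambda events.
    some_attr2 = base64.b64decode(record['messageAttributes']['SomeAttribute2']['binaryValue'])
    print(body, some_attr2)
```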

Filename: main.py

import os
import uuid
import base64
import datetime
import logger
from database import PsqlClient

try:
    db = PsqlClient(
        database=os.environ['POSTGRES_DB'],
        user=os.environ['POSTGRES_USER'],
        password=os.environ['POSTGRES_PASSWORD'],
        host=os.environ['POSTGRES_HOST'],
        port=int(os.environ['POSTGRES_PORT'])
    )
except Exception as e:
    logger.ex(e)

def lambda_handler(event, context):
    for record in event['Records']:
        # Lambda SQS events use lower-camelCase keys
        body = record['body']
        some_attr = record['messageAttributes']['SomeAttribute']['stringValue']
        # Binary attribute values arrive base64-encoded in Lambda events
        some_attr2 = base64.b64decode(record['messageAttributes']['SomeAttribute2']['binaryValue'])
        some_attr3 = record['messageAttributes']['SomeAttribute3']['stringValue']

        try:
            db.execute('INSERT INTO messages (id, body, some_attr, some_attr2, some_attr3, created_date) \
                VALUES (%(id)s, %(body)s, %(some_attr)s, %(some_attr2)s, %(some_attr3)s, %(created_date)s)', (
                {
                    'id': uuid.uuid4(),
                    'body': body,
                    'some_attr': some_attr,
                    'some_attr2': some_attr2,
                    'some_attr3': some_attr3,
                    'created_date': datetime.datetime.utcnow()
                })
            )
        except Exception as e:
            logger.ex(e)
            raise

Explanation of the main.py Code

The import statements at the top of the file pull in all the required modules.

The lambda_handler function contains the consumer logic. This function is automatically invoked by the trigger we set up in production. Notice how we wrap the logic in a try/except block, logging the error to CloudWatch with logger.ex() and then using raise to let the error bubble up.

If the process succeeds, the message is discarded from the SQS queue, as it is no longer needed. But if an error occurs, the exception that bubbled up forces the SQS message to remain in the queue for future attempts. Make sure to keep an eye on your logs, or set an alarm that fires on Lambda errors, so that you can read the logs and address the issue. Otherwise, your Lambda will keep trying to process the message.

There are various ways to handle unconsumable messages in the SQS queue that are beyond the scope of this article. We recommend first analyzing how the message was created, to ensure that all the attributes and values are what this consumer expects, and updating the code if necessary. If there was a mistake in how the message was created, you have the option to purge the old message so that the SQS trigger doesn't constantly invoke this function.

When you're finished running API commands, downgrade the API user's SQS policy to AWSLambdaSQSQueueExecutionRole, which ensures that your service only has the permissions necessary to receive and delete messages, and cannot make other modifications to your SQS queues.

Conclusion

That's the end of this tutorial. We hope you found it helpful. Make sure to check out our other tutorials, as well.

If you're interested in programs that carry out your computer tasks for you, take our Automation the Easy Way with Python course. This course teaches CSV and Excel file generation, API requests, website scraping, email delivery, task scheduling, and browser click, mouse, and keyboard automation. Automate your daily tasks, free up time, and get ahead, today.
