Send Mule Logs from SQS to Elasticsearch via Logstash


In a previous post, we learnt How to send Mule logs to SQS. As we saw, sending logs to a message broker as an intermediate step instead of sending them directly to the final aggregation/logging system can have multiple benefits, such as reliability, scalability or flexibility in distributed environments like Mulesoft. 

In this post, we’ll see the second part of the whole scenario - once we know how to send the logs to a broker like SQS, we will now see how to send those logs from SQS to our logging system.

For that, we will use Logstash, one of the key components of the ELK stack. We will set up Logstash to consume messages from our SQS queue and send them to Elasticsearch. We’ve chosen Elasticsearch as the logging database were our logs will be stored, but it’s also possible to send them from Logstash to other destinations like Splunk, Datadog etc

Let’s dive in!

Prerequisites

To follow this tutorial, we will need:

Define the Logstash Pipeline

The way Logstash processes logs is by defining a pipeline. A Logstash pipeline is a series of processing stages that Logstash uses to ingest, transform, and route data from one or more input sources to one or more output destinations. Each pipeline consists of three main components: input, filter, and output. Together, these stages define how data flows through Logstash.


The pipeline we will build:
  • Inputs - The input of our pipeline will be the messages in the SQS queue. For that, we will install the SQS input plugin, which basically allows logstash to create (and configure) a consumer of that queue
  • Filter - The logs that we sent in Part I were formatted with the default PatternLayout in Mule. We will use a Grok filter to convert those logs to JSON and extract the info into fields. Check out the post Logstash Grok Filters for Mule Logs in PatternLayout to see how to build this filter.
  • Output - The destination of our logs will be Elasticsearch, so in the output section we will provide the details to connect to our Elasticsearch instance (url, username, pasword, index)


Install the SQS input plugin

As we’ve just mentioned, for the input of our Logstash Pipeline we will use the SQS input plugin. 
For that, before setting up our pipeline, from our Logstash server we need to verify if the plugin is already installed in our system or not. We can easily retrieve all the input plugins installed by running the command:

sudo /usr/share/logstash/bin/logstash-plugin list --group input

You might find the plugin referred as logstash-input-sqs or as part of the logstash-integration-aws plugin. if you see any of those, you can skip the next step.


If the SQS input plugin is not installed, this is how we install it:
  • To install a plugin in Logstash, the first thing we do is to get the name of the plugin. Go to the Input Plugins list an look for the SQS plugin. You will see, the name of the plugin with the link to its Github repo. Copy the name of the SQS plugin. At the time of writing, that name is logstash-input-sqs
 

Then, run the command:

sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-sqs


Create an IAM user in AWS for Logstash user in Elasticsearch

IAM user in AWS

To consume messages from the queue, Logstash will need to authenticate to AWS. For that, we need a user in our AWS account with permissions for that. As a best practice, we always follow the Principle of Least Privilege (PoLP), so we should create a user in our AWS account with permissions to ONLY read messages from ONLY our logs queue. 

However, just for demo purposes we will use the same user as we use in our previous post to send and receive logs to an SQS queue. In any case, to create another user with those specific permissions we can follow the steps described in the first part of this tutorial. So, take note of the access key and secret of the AWS user.


Logstash user in Elasticsearch

For our pipeline output, Logstash will be connecting to Elasticsearch and ingesting logs to an index. So, for that, Logstash will need a user with permissions in Elasticsearch to create indices and send messages to those indices. For that we need the following privileges (according to Elastic official docs):
Cluster Privileges
  • manage_index_templates
  • manage_ilm
  • monitor
Index Privileges - As index we’l use the wildcard *
  • create
  • create_index
  • manage
  • manage_ilm
  • write
  • auto_configure
Alternatively, to create the custom role, we can use the Elastic REST API. For that, run the following command to create a role with the required permissions:
With CURL:

curl -X POST 'http://[YOUR_ELASTICSEARCH_SERVER]:9200/_security/role/[MULE_LOGSTASH_WRITER]' \
-u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD] \
--header 'Content-Type: application/json' \
--data '{
"cluster": ["manage_index_templates", "monitor", "manage_ilm"],
"indices": [
{
"names": ["*"],
"privileges": ["create", "auto_configure", "create_index", "manage", "manage_ilm", "write"]
}
]
}'

Where:

  • YOUR_ELASTICSEARCH_SERVER - Hostname/DNS name that identifies your elasticsearch server in the network
  • MULE_LOGSTASH_WRITER - The name of the custom role you want to create
  • Default port for Elasticsearch is 9200. Change it if you’re using another port in your Elasticsearch instance.
  • [YOUR_ELASTIC_ADMIN_USER] - An admin user with permissions to create roles and users on elastic. In here, we can use the elastic superuser
  • [YOUR_ADMIN_PASSWORD] - The password of your admin user
  • The body of the request will contain all the Privileges we defined previously for our logstash custom role


Create a User and assign the custom role

Next, we’ll create a user and assign the custom role we’ve defined, in our case  mule-logstash-writer
With CURL:

curl -X POST 'http://[YOUR_ELASTICSEARCH_SERVER]:9200/_security/user/[MULE_LOGSTASH_WRITER]' \
--header 'Content-Type: application/json' \
-u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD] \
--data '{
"password": "[LOGSTASH_USER_PASSWORD]",
"roles": ["LOGSTASH_CUSTOM_ROLE"],
"full_name": "Logstash User",
"email": "logstash@mulesoft.com"
}'

Where:

  • MULE_LOGSTASH_WRITER - The name of the user you want to create
  • Default port for Elasticsearch is 9200. Change it if you’re using another port in your Elasticsearch instance.
  • [YOUR_ELASTIC_ADMIN_USER] - An admin user with permissions to create roles and users on elastic. In here, we can use the elastic superuser
  • [YOUR_ADMIN_PASSWORD] - The password of your admin user
  • The body of the request will contain:
    • password: Sets the password for the user.
    • roles: Assign the custom role we’ve created
    • full_name : Provides a descriptive/display name of the user


Check the connection to Elasticsearch

The last thing we should do for setting up the Logstash user is to verify that our Logstash server can connect to our Elasticsearch instance with that user/custom role. This way we can make sure the connection details are correct. For that, from the Logstash server run the curl command:

curl http://[YOUR_ELASTICSEARCH_SERVER]:9200 -u [YOUR_LOGSTASH_USER]:[YOUR_PASSWORD]



Send logs to SQS

Before running logstash, run the app we created in the first part and send some requests so that we can see how logs are sent to SQS and that they will be kept in the queue until logstash consumes them



See the content of the message


Take note of the name of the queue and the AWS region in which our queue is running.


Set up the Logstash Pipeline

Next, we’ll set up the Logstash pipeline by defining a conf file. In our case, we will create a mule-sqs.conf file with the following content:

Input - Here, we’ll include the details to connect to our SQS code

input {
sqs {
region => "[YOUR_AWS_REGION]"
queue => "[YOUR_MULE_LOGS_QUEUE]"
access_key_id => "[YOUR_ACCESS_KEY]"
secret_access_key => "[YOUR_SECRET_ACCESS_KEY]"
}
}

Filter
 - We’re using the default PatternLayout of the Mule apps. In here, we’ll use a GROK Filter, so that we can extract the info in the logs and put them as JSON fields. For more details on this filter check out this post.

filter {
grok {
match => {"message" => "%{LOGLEVEL:log_level}%{SPACE}%{TIMESTAMP_ISO8601:time_stamp}%{SPACE}\[%{DATA:thread_name}\]%{SPACE}\[%{WORD}:%{SPACE}%{DATA:processor_name}\;%{SPACE}%{WORD}:%{SPACE}%{DATA:correlation_id}\]%{SPACE}%{JAVACLASS:logger_name}:%{SPACE}%{GREEDYDATA:log_message}"}
}
}

Output
 - Here, we’ll provide the details of our Elasticsearch instance, with the username and password we created before and a custom index for our logs. For testing purposes we’ll also include the rubydebug codec to see how logs are processing in the terminal.

output {
elasticsearch {
hosts => ["[YOUR_ELASTICSEARCH_SERVER]:9200"]
user => "[YOUR_LOGSTASH_USER]"
password => "[YOUR_PASSWORD]"
index => "sqs-mule-logs"
}
stdout {
codec => rubydebug
}

}
This is the final conf file in our example:

input {
sqs {
region => "eu-central-1"
queue => "mule-logs"
access_key_id => "AKIA44Y6CCVA7TE5ZFFF"
secret_access_key => "33aCnOcultMcVm3pw25Qq+Dd8GvqZv/ZBV5IKlD6"
}
}


filter {
grok {
match => {"message" => "%{LOGLEVEL:log_level}%{SPACE}%{TIMESTAMP_ISO8601:time_stamp}%{SPACE}\[%{DATA:thread_name}\]%{SPACE}\[%{WORD}:%{SPACE}%{DATA:processor_name}\;%{SPACE}%{WORD}:%{SPACE}%{DATA:correlation_id}\]%{SPACE}%{JAVACLASS:logger_name}:%{SPACE}%{GREEDYDATA:log_message}"}
}
}

output {
elasticsearch {
hosts => ["ip-172-31-19-150.eu-central-1.compute.internal:9200"]
user => "mule-logstash"
password => "Mule1234"
index => "sqs-mule-logs"
}
stdout {
codec => rubydebug
}
}

Run the Logstash Pipeline

Run logstash using the conf file of our pipeline:

sudo /usr/share/logstash/bin/logstash -f ~/sqs-logs/mule-sqs.conf

Check the output of the terminal and verify with the ruby output that logstash is processing messages


Head back to AWS SQS. Verify that the queue is now empty, Logstash has consumed all messages


Let’s now head over to Kibana and check if our logs have been ingested properly into Elasticsearch. First, go to Stack Management > Data > Index Management. We should see there is a new index sqs-mule-logs and it contains documents.


Next, create a data view and visualize the logs


Previous Post Next Post