In a previous post, we learnt How to send Mule logs to SQS. As we saw, sending logs to a message broker as an intermediate step, instead of sending them directly to the final aggregation/logging system, can bring multiple benefits, such as reliability, scalability and flexibility in distributed environments like MuleSoft. In this post, we'll cover the second part of the whole scenario - now that we know how to send the logs to a broker like SQS, we will see how to send those logs from SQS to our logging system.
For that, we will use Logstash, one of the key components of the ELK stack. We will set up Logstash to consume messages from our SQS queue and send them to Elasticsearch. We've chosen Elasticsearch as the logging database where our logs will be stored, but it's also possible to send them from Logstash to other destinations like Splunk, Datadog, etc.
Let’s dive in!
Prerequisites
To follow this tutorial, we will need:
- An instance of Logstash up & running. It can be hosted on a remote server, on your local system or in a Docker container. In this tutorial, we will use a Logstash instance, version 8.15.3, installed on an Ubuntu Server 24.04 LTS. Follow this post to see How to Install Logstash on Linux.
- An instance of Elasticsearch where we will send the logs. Optionally, we will also be using an instance of Kibana, as the preferred UI for Elasticsearch. In this tutorial, both Elasticsearch and Kibana will be on version 8.15.3 (same as Logstash) and will run together on another Ubuntu Server 24.04 LTS - a different server from the one hosting Logstash. Check out these posts with different ways of installing Elasticsearch and Kibana:
- How to Install Elasticsearch and Kibana on Linux - Part I
- How to Install Elasticsearch and Kibana on Linux - Part II
- How to Install Elasticsearch on Docker
- How to Install Kibana on Docker
- Install Elasticsearch and Kibana with Docker Compose
- You will also need an AWS account where you can use the Simple Queue Service (SQS).
- You will also need to have completed the previous part of this tutorial - How to send Mule logs to SQS - so that we can start this exercise from an SQS queue that already holds some messages (logs) waiting to be consumed.
Define the Logstash Pipeline
The way Logstash processes logs is by defining a pipeline. A Logstash pipeline is a series of processing stages that Logstash uses to ingest, transform, and route data from one or more input sources to one or more output destinations. Each pipeline consists of three main components: input, filter, and output. Together, these stages define how data flows through Logstash. The pipeline we will build:
- Input - The input of our pipeline will be the messages in the SQS queue. For that, we will install the SQS input plugin, which basically allows Logstash to create (and configure) a consumer of that queue.
- Filter - The logs that we sent in Part I were formatted with the default PatternLayout in Mule. We will use a Grok filter to convert those logs to JSON and extract the info into fields. Check out the post Logstash Grok Filters for Mule Logs in PatternLayout to see how to build this filter.
- Output - The destination of our logs will be Elasticsearch, so in the output section we will provide the details to connect to our Elasticsearch instance (URL, username, password, index).
Install the SQS input plugin
As we've just mentioned, for the input of our Logstash pipeline we will use the SQS input plugin. For that, before setting up our pipeline, from our Logstash server we need to verify whether the plugin is already installed in our system. We can easily retrieve all the input plugins installed by running the command:
sudo /usr/share/logstash/bin/logstash-plugin list --group input
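Since the list of input plugins can be quite long, you can also filter the output for the SQS plugin directly (an optional convenience, assuming grep is available on your server):
sudo /usr/share/logstash/bin/logstash-plugin list --group input | grep sqs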
If the SQS input plugin is not installed, this is how we install it:
- To install a plugin in Logstash, the first thing we do is to get the name of the plugin. Go to the Input Plugins list and look for the SQS plugin. You will see the name of the plugin with a link to its GitHub repo. Copy the name of the SQS plugin. At the time of writing, that name is logstash-input-sqs.
Then, run the command:
sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-sqs
Create an IAM user in AWS and a Logstash user in Elasticsearch
IAM user in AWS
To consume messages from the queue, Logstash will need to authenticate to AWS. For that, we need a user in our AWS account with the appropriate permissions. As a best practice, we always follow the Principle of Least Privilege (PoLP), so we should create a user in our AWS account with permissions to ONLY read messages from ONLY our logs queue. However, just for demo purposes, we will use the same user as in our previous post to send and receive logs to an SQS queue. In any case, to create another user with those specific permissions we can follow the steps described in the first part of this tutorial. So, take note of the access key and secret of your AWS user.
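For reference, a least-privilege policy for such a dedicated user would look something like the sketch below. This is an assumption based on the actions the SQS input plugin typically needs (receive, delete and queue attribute/URL lookups); check the plugin documentation for the exact list, and replace the ARN with your own queue's ARN:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl"
      ],
      "Resource": "arn:aws:sqs:[YOUR_AWS_REGION]:[YOUR_ACCOUNT_ID]:[YOUR_MULE_LOGS_QUEUE]"
    }
  ]
}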
Logstash user in Elasticsearch
For our pipeline output, Logstash will be connecting to Elasticsearch and ingesting logs into an index. So, for that, Logstash will need a user with permissions in Elasticsearch to create indices and send messages to those indices. For that, we need the following privileges (according to Elastic's official docs):
Cluster privileges:
- manage_index_templates
- manage_ilm
- monitor
Index privileges:
- create
- create_index
- manage
- manage_ilm
- write
- auto_configure
With CURL:
curl -X POST 'http://[YOUR_ELASTICSEARCH_SERVER]:9200/_security/role/[MULE_LOGSTASH_WRITER]' \
-u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD] \
--header 'Content-Type: application/json' \
--data '{
  "cluster": ["manage_index_templates", "monitor", "manage_ilm"],
  "indices": [
    {
      "names": ["*"],
      "privileges": ["create", "auto_configure", "create_index", "manage", "manage_ilm", "write"]
    }
  ]
}'
- [YOUR_ELASTICSEARCH_SERVER] - Hostname/DNS name that identifies your Elasticsearch server in the network. The default port for Elasticsearch is 9200; change it if you're using another port in your Elasticsearch instance.
- [MULE_LOGSTASH_WRITER] - The name of the custom role you want to create.
- [YOUR_ELASTIC_ADMIN_USER] - An admin user with permissions to create roles and users in Elasticsearch. Here, we can use the elastic superuser.
- [YOUR_ADMIN_PASSWORD] - The password of your admin user.
- The body of the request contains all the privileges we defined previously for our Logstash custom role.
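Optionally, to confirm the role was created as expected, you can retrieve it with Elasticsearch's Get roles API (using the same placeholders as above):
curl http://[YOUR_ELASTICSEARCH_SERVER]:9200/_security/role/[MULE_LOGSTASH_WRITER] -u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD]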
Create a User and assign the custom role
Next, we'll create a user and assign the custom role we've defined, in our case mule-logstash-writer.
With CURL:
curl -X POST 'http://[YOUR_ELASTICSEARCH_SERVER]:9200/_security/user/[MULE_LOGSTASH_WRITER]' \
--header 'Content-Type: application/json' \
-u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD] \
--data '{
"password": "[LOGSTASH_USER_PASSWORD]",
"roles": ["LOGSTASH_CUSTOM_ROLE"],
"full_name": "Logstash User",
"email": "logstash@mulesoft.com"
}'
- [MULE_LOGSTASH_WRITER] - The name of the user you want to create. The default port for Elasticsearch is 9200; change it if you're using another port in your Elasticsearch instance.
- [YOUR_ELASTIC_ADMIN_USER] - An admin user with permissions to create roles and users in Elasticsearch. Here, we can use the elastic superuser.
- [YOUR_ADMIN_PASSWORD] - The password of your admin user.
- The body of the request contains:
  - password: Sets the password for the user.
  - roles: Assigns the custom role we've created.
  - full_name: Provides a descriptive/display name for the user.
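As with the role, you can optionally confirm the user was created with the Get users API:
curl http://[YOUR_ELASTICSEARCH_SERVER]:9200/_security/user/[MULE_LOGSTASH_WRITER] -u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD]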
Check the connection to Elasticsearch
The last thing we should do to set up the Logstash user is to verify that our Logstash server can connect to our Elasticsearch instance with that user/custom role. This way, we can make sure the connection details are correct. For that, from the Logstash server run the curl command:
curl http://[YOUR_ELASTICSEARCH_SERVER]:9200 -u [YOUR_LOGSTASH_USER]:[YOUR_PASSWORD]
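If the connection and credentials are correct, Elasticsearch answers with a small JSON document describing the cluster, along the lines of the trimmed example below (your node name, cluster name and build details will differ):
{
  "name" : "your-node-name",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "8.15.3"
  },
  "tagline" : "You Know, for Search"
}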
Send logs to SQS
Before running Logstash, run the app we created in the first part and send some requests, so that we can see how the logs are sent to SQS and kept in the queue until Logstash consumes them. See the content of the message.
Take note of the name of the queue and the AWS region in which our queue is running.
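If you have the AWS CLI configured, you can also check from the command line how many messages are waiting in the queue (an optional check; [YOUR_QUEUE_URL] is whatever the first command returns for your queue):
aws sqs get-queue-url --queue-name [YOUR_MULE_LOGS_QUEUE] --region [YOUR_AWS_REGION]
aws sqs get-queue-attributes --queue-url [YOUR_QUEUE_URL] --attribute-names ApproximateNumberOfMessages --region [YOUR_AWS_REGION]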
Set up the Logstash Pipeline
Next, we'll set up the Logstash pipeline by defining a conf file. In our case, we will create a mule-sqs.conf file with the following content:
Input - Here, we'll include the details to connect to our SQS queue:
input {
sqs {
region => "[YOUR_AWS_REGION]"
queue => "[YOUR_MULE_LOGS_QUEUE]"
access_key_id => "[YOUR_ACCESS_KEY]"
secret_access_key => "[YOUR_SECRET_ACCESS_KEY]"
}
}
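# Filter - parse Mule's default PatternLayout log line into structured fields with Grok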
filter {
grok {
match => {"message" => "%{LOGLEVEL:log_level}%{SPACE}%{TIMESTAMP_ISO8601:time_stamp}%{SPACE}\[%{DATA:thread_name}\]%{SPACE}\[%{WORD}:%{SPACE}%{DATA:processor_name}\;%{SPACE}%{WORD}:%{SPACE}%{DATA:correlation_id}\]%{SPACE}%{JAVACLASS:logger_name}:%{SPACE}%{GREEDYDATA:log_message}"}
}
}
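# Output - index the parsed logs into Elasticsearch and also print each event to stdout (rubydebug) for debugging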
output {
elasticsearch {
hosts => ["[YOUR_ELASTICSEARCH_SERVER]:9200"]
user => "[YOUR_LOGSTASH_USER]"
password => "[YOUR_PASSWORD]"
index => "sqs-mule-logs"
}
stdout {
codec => rubydebug
}
}
This is the final conf file in our example:
input {
  sqs {
    region => "eu-central-1"
    queue => "mule-logs"
    access_key_id => "AKIA44Y6CCVA7TE5ZFFF"
    secret_access_key => "33aCnOcultMcVm3pw25Qq+Dd8GvqZv/ZBV5IKlD6"
  }
}
filter {
  grok {
    match => {"message" => "%{LOGLEVEL:log_level}%{SPACE}%{TIMESTAMP_ISO8601:time_stamp}%{SPACE}\[%{DATA:thread_name}\]%{SPACE}\[%{WORD}:%{SPACE}%{DATA:processor_name}\;%{SPACE}%{WORD}:%{SPACE}%{DATA:correlation_id}\]%{SPACE}%{JAVACLASS:logger_name}:%{SPACE}%{GREEDYDATA:log_message}"}
  }
}
output {
  elasticsearch {
    hosts => ["ip-172-31-19-150.eu-central-1.compute.internal:9200"]
    user => "mule-logstash"
    password => "Mule1234"
    index => "sqs-mule-logs"
  }
  stdout {
    codec => rubydebug
  }
}
Run the Logstash Pipeline
Run Logstash using the conf file of our pipeline:
sudo /usr/share/logstash/bin/logstash -f ~/sqs-logs/mule-sqs.conf
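Tip: if Logstash fails to start, you can ask it to just validate the configuration and exit, which is a quick way to catch syntax errors in the conf file:
sudo /usr/share/logstash/bin/logstash -f ~/sqs-logs/mule-sqs.conf --config.test_and_exit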
Head back to AWS SQS. Verify that the queue is now empty - Logstash has consumed all the messages.
Let's now head over to Kibana and check whether our logs have been ingested properly into Elasticsearch. First, go to Stack Management > Data > Index Management. We should see that there is a new index, sqs-mule-logs, and that it contains documents.
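Alternatively, you can check the document count of the new index from the command line. Note that the Logstash role we created only has write-oriented privileges, so use your admin user for this query:
curl http://[YOUR_ELASTICSEARCH_SERVER]:9200/sqs-mule-logs/_count -u [YOUR_ELASTIC_ADMIN_USER]:[YOUR_ADMIN_PASSWORD]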
Next, create a data view in Kibana and visualize the logs.