By Gang Tao

Monitor AWS Cloud Activities in Real-Time Using Timeplus

AWS is the largest and most widely adopted cloud provider globally. It has been a leader in the cloud computing market since its launch in 2006. It provides a wide range of on-demand cloud services such as computing power, storage, databases, machine learning, networking, and more.


Whether at Timeplus today, running a SaaS-based streaming analytics service, or at other large software companies I worked for in the past, AWS has always been a key part of supporting our daily operations and development. AWS can be complex, and managing it isn't easy. That's why it's essential to monitor AWS events and activities, especially for maintaining security, compliance, performance, and overall efficiency in cloud environments.


In today’s blog, I am going to share how you can set up a real-time AWS monitoring solution using different AWS services and Timeplus. Here are some quick introductions about the services and tools I will cover in this blog.


  • AWS CloudTrail is a service that enables governance, compliance, operational auditing, and risk auditing of your AWS account. It logs all API calls and actions taken within your AWS environment, providing a detailed record of what occurred, when it happened, who initiated it, and where it was made from. This log data helps you track changes, monitor activity, and detect potential security threats.


  • AWS S3 (Simple Storage Service) is a scalable, high-performance, and secure object storage. It is designed to store and retrieve any amount of data from anywhere on the internet at any time. S3 is known for its durability, availability, and low cost, making it one of the most popular cloud storage solutions.


  • AWS Lambda is a serverless compute service that allows you to run code without provisioning or managing servers. Lambda automatically handles the infrastructure, scaling, and execution, letting you focus solely on your application code. It runs your code in response to events, such as changes in data, HTTP requests, or activity in AWS services like S3 or DynamoDB.


  • Amazon SQS (Simple Queue Service) is a fully managed message queuing service provided by AWS that allows you to decouple and scale microservices, distributed systems, and serverless applications. SQS enables asynchronous communication between different components of a system by passing messages between them in the form of a queue.


  • Redpanda Connect (Previously Benthos, which was acquired by Redpanda) is a powerful, lightweight data stream processor that supports various input, output, and transformation components, making it highly customizable and efficient.


  • Timeplus is a SQL stream processing and analytics engine with columnar and row stores in a single-binary engine. It supports building real-time applications at the edge or cloud.



 

Solution 1: Lambda Function + Timeplus



The above architecture diagram shows how the solution works. 


  1. CloudTrail tracks all your AWS API calls and actions, and saves the event data into S3

  2. A Lambda function is triggered when new objects are created in S3, and calls Timeplus’s HTTP Ingest API to push the CloudTrail events into a Timeplus stream

  3. The Timeplus stream stores each CloudTrail event as soon as it is triggered, and can analyze those events in real time or run historical analysis


Here is a step by step guide:


Step 1:


Set up CloudTrail to deliver logs to S3. Go to the AWS CloudTrail Console and set up a trail if you haven’t already. Configure CloudTrail to deliver logs to an S3 bucket.


The CloudTrail events are stored in S3 as gzipped JSON files.

Here is a sample of what CloudTrail event looks like for your reference.

{
  "Records": [
    {
      "eventVersion": "1.08",
      "userIdentity": {
        "type": "IAMUser",
        "principalId": "AIDEXAMPLEUSERID",
        "arn": "arn:aws:iam::123456789012:user/JohnDoe",
        "accountId": "123456789012",
        "userName": "JohnDoe"
      },
      "eventTime": "2023-09-30T15:15:22Z",
      "eventSource": "ec2.amazonaws.com",
      "eventName": "StartInstances",
      "awsRegion": "us-west-2",
      "sourceIPAddress": "203.0.113.42",
      "userAgent": "aws-cli/2.0.0 Python/3.8.8 Windows/10 botocore/2.1.0",
      "requestParameters": {
        "instancesSet": {
          "items": [
            {
              "instanceId": "i-0abcdef1234567890"
            }
          ]
        }
      },
      "responseElements": {
        "instancesSet": {
          "items": [
            {
              "instanceId": "i-0abcdef1234567890",
              "currentState": {
                "code": 0,
                "name": "pending"
              },
              "previousState": {
                "code": 80,
                "name": "stopped"
              }
            }
          ]
        }
      },
      "requestID": "e3b1c0a1-1a8b-4fdd-b543-5c013c158adc",
      "eventID": "65a929f0-29e8-4af7-a4d6-20e5a1f59972",
      "readOnly": false,
      "resources": [
        {
          "ARN": "arn:aws:ec2:us-west-2:123456789012:instance/i-0abcdef1234567890",
          "accountId": "123456789012"
        }
      ],
      "eventType": "AwsApiCall",
      "managementEvent": true,
      "eventCategory": "Management"
    }
  ]
}




Step 2:


Create a stream in Timeplus to store and analyze the real-time CloudTrail events. You can run DDL or use the console UI to create a stream.


CREATE STREAM cloudtrail_events
(
  `raw` string
)

This stream is very simple: it contains just one field, `raw`, which will be used to store the CloudTrail JSON event.



Step 3:


Create a Lambda function that will ingest CloudTrail events into Timeplus in real time.

Here is the code for the Lambda function; we chose Python as the programming language.


import boto3
import gzip
import json
import urllib3

url = "https://us-west-2.timeplus.cloud/workspaceid/api/v1beta2/streams/cloudtrail_events/ingest?format=lines"
headers = {
    "X-Api-Key": "your_api_key",
    "Content-Type": "text/plain"
}
http = urllib3.PoolManager()

def lambda_handler(event, context):
    # S3 client
    s3 = boto3.client('s3')
    
    # Get the object from the event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']
    
    # Download the log file from S3
    response = s3.get_object(Bucket=bucket_name, Key=object_key)
    
    # Read and decompress the log file (assuming it's .gz)
    with gzip.GzipFile(fileobj=response['Body']) as gz:
        logs = json.load(gz)
        
    result = []
    
    # Parse the log file for CloudTrail events
    for record in logs['Records']:
        # Prepare the payload
        data = json.dumps(record).encode('utf-8')
        # Make the POST request
        response = http.request(
            'POST',
            url,
            body=data,
            headers=headers
        )
        result.append(record)
    
    # Return success message
    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

This Lambda function:

  • Reads the S3 bucket and object key from the first record of the event

  • Downloads the S3 object and decompresses it

  • Iterates over all the CloudTrail events in the file, calling the Timeplus ingest API to ingest each event into the stream `cloudtrail_events` we created in step 2


There are some things to note here:

  • `urllib3` is used because it is included by default in the AWS Python Lambda runtime; if you want to use another library such as `requests`, you need some extra configuration

  • As the Lambda function reads from S3, you need to add the related permissions to the function's execution role

  • In most cases, CloudTrail batches events together and delivers them to the destination (such as an S3 bucket) as a set of individual events in one file. So while a single record usually represents one API action or event, multiple records can appear together in the JSON log file. Likewise, the S3 notification event can contain multiple records, so we should iterate over all of them instead of processing just the first record as the above code does.
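To illustrate that last note, here is a minimal sketch (bucket and key names are hypothetical, matching the S3 notification shape used in the handler above) of iterating over every record in the notification rather than only the first:

```python
def extract_s3_objects(event):
    """Return (bucket, key) pairs for every record in an S3 notification
    event, instead of reading only event['Records'][0]."""
    return [
        (r['s3']['bucket']['name'], r['s3']['object']['key'])
        for r in event.get('Records', [])
    ]

# Hypothetical notification carrying two new CloudTrail log objects
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-trail-bucket"},
                "object": {"key": "AWSLogs/123456789012/a.json.gz"}}},
        {"s3": {"bucket": {"name": "my-trail-bucket"},
                "object": {"key": "AWSLogs/123456789012/b.json.gz"}}},
    ]
}

print(extract_s3_objects(sample_event))
```

Each returned (bucket, key) pair can then be fed through the same download-decompress-ingest loop as the single-record version.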



Step 4:


Set up an S3 Event Trigger. Go to your S3 bucket and configure an event notification that triggers on new object creation. This event will trigger the AWS Lambda function we created at step 3, whenever a new CloudTrail log is delivered to the bucket.

Now, your CloudTrail events are ingested to Timeplus stream in real-time, and you can run Timeplus’s streaming SQL to monitor what's happening with your AWS cloud.


Solution 1 is easy to set up, but it also has some limitations:

  • You need to expose the Timeplus ingest endpoint so that the Lambda function can access it. This is straightforward for Timeplus Cloud but not as user-friendly for on-prem setups.

  • The Lambda function is called for each new CloudTrail log object. In case of a network error, even though Lambda provides a retry mechanism, the number of retries is limited; if the error cannot be recovered in the short term, the event will be lost. In such cases, SQS, which provides persistent consumption, is a better solution.


 

Solution 2: SQS + Redpanda Connect + Timeplus



An alternative is to use Redpanda Connect to read the S3 data and send it to Timeplus. Redpanda Connect is already integrated with Timeplus, so there is no extra component to deploy in this solution. But users need to switch the S3 notification from Lambda to SQS.


Here is a step by step guide:



Step 1:


Set up CloudTrail to deliver logs to S3. This step is the same as in solution 1: you need to create a trail and persist the events into S3.



Step 2:


Create an SQS queue for event notifications. Creating a stream is optional here, as we are going to leverage the Timeplus data collection UI to create the stream, so we directly create the SQS queue that will be used as the notification channel for the S3 bucket. In order for S3 to send messages to the SQS queue, the queue needs to allow S3 to send messages. You can do this by attaching an SQS policy to the queue that grants the necessary permission.


Here’s an example of the SQS policy to allow the S3 bucket to send messages:

{
  "Version": "2012-10-17",
  "Id": "QueuePolicy",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "arn:aws:sqs:us-west-2:123456789012:YourQueueName",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:::YourBucketName"
        }
      }
    }
  ]
}
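If you prefer to script this step, the same policy document can be built programmatically. The sketch below constructs it for an assumed queue ARN and bucket name; the commented-out boto3 call shows roughly how it would be attached (SQS accepts the policy as a JSON string under the `Policy` queue attribute):

```python
import json

def build_s3_notification_policy(queue_arn, bucket_name):
    """Build the SQS access policy that lets an S3 bucket send messages."""
    return {
        "Version": "2012-10-17",
        "Id": "QueuePolicy",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "s3.amazonaws.com"},
            "Action": "SQS:SendMessage",
            "Resource": queue_arn,
            "Condition": {
                "ArnLike": {"aws:SourceArn": f"arn:aws:s3:::{bucket_name}"}
            },
        }],
    }

policy = build_s3_notification_policy(
    "arn:aws:sqs:us-west-2:123456789012:YourQueueName", "YourBucketName")
print(json.dumps(policy, indent=2))

# Attaching it with boto3 would look roughly like this (needs credentials):
# boto3.client('sqs').set_queue_attributes(
#     QueueUrl=queue_url, Attributes={'Policy': json.dumps(policy)})
```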



Step 3:


Create an event notification using SQS on your S3 bucket. Go to your S3 bucket and configure an event notification that triggers on new object creation, using the SQS queue we just created in step 2 as the destination.



Step 4:

Create a data collection in Timeplus UI using Redpanda Connect. In Timeplus Console, use Redpanda Connect AWS_S3 input to create a data collection pipeline.




Put the following code into the input editor:

input:
  label: cloudtrail
  aws_s3:
    region: us-west-1
    credentials:
      id: userid
      secret: usersecret
    sqs:
      url: https://sqs.us-west-1.amazonaws.com/accountid/queuename
  processors:
    - decompress:
        algorithm: gzip
    - bloblang: |
        root = this.Records
    - unarchive:
        format: json_array
    - mapping: |-
        root.raw = content().string()

There are processors attached to this input config that transform the S3 object content into an event stream ingested into the Timeplus stream:

  • The decompress processor unzips the gzipped data from S3

  • The bloblang `root = this.Records` reads the JSON payload from the field called Records

  • The unarchive processor turns the JSON array of events into individual JSON messages

  • Finally, the mapping puts each whole JSON object into the field `raw`, which is the field name of our stream cloudtrail_events
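For intuition, the processor chain can be mimicked in plain Python. This is only a sketch of the transformation, not how Redpanda Connect runs internally; the sample payload is hypothetical:

```python
import gzip
import json

def process_cloudtrail_object(gzipped_bytes):
    """Mirror the pipeline: decompress, pull out Records, and wrap each
    event as a {"raw": "<json>"} row for the cloudtrail_events stream."""
    logs = json.loads(gzip.decompress(gzipped_bytes))   # decompress + parse
    return [{"raw": json.dumps(record)} for record in logs["Records"]]

# Hypothetical one-event CloudTrail log file
payload = gzip.compress(json.dumps(
    {"Records": [{"eventName": "StartInstances", "awsRegion": "us-west-2"}]}
).encode())

rows = process_cloudtrail_object(payload)
print(rows[0]["raw"])
```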


Follow the wizard to the next step, which will preview the S3 input pipeline result and guide you to create the stream used to store the CloudTrail events.



Now, we have another way to get those cloudtrail events into Timeplus stream in real-time.


 

Query CloudTrail Events


Now that CloudTrail data is ingested into a Timeplus stream in real time, you can start analyzing the stream. In the Timeplus Console UI, run the following SQL, which shows the eventName, eventType, and sourceIP for CloudTrail events from the last 15 minutes onward, and continuously monitors for new events.


SELECT
 raw, raw:eventName AS eventName, raw:eventType AS eventType, raw:sourceIPAddress AS sourceIP
FROM
 cloudtrail_events
WHERE
 _tp_time > (now() - 15m)
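For reference, the `raw:eventName` shorthand is JSON path extraction over the `raw` column. A rough Python equivalent of what the query pulls out of each row (field names taken from the sample event above):

```python
import json

def extract_fields(raw):
    """Roughly what raw:eventName style extraction resolves to: pull
    top-level keys out of the JSON string stored in the `raw` column."""
    event = json.loads(raw)
    return {
        "eventName": event.get("eventName"),
        "eventType": event.get("eventType"),
        "sourceIP": event.get("sourceIPAddress"),
    }

row = json.dumps({"eventName": "StartInstances",
                  "eventType": "AwsApiCall",
                  "sourceIPAddress": "203.0.113.42"})
print(extract_fields(row))
```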


 

Which one to use, Lambda or SQS?


When comparing the above solutions, which one should be used?



Lambda Based Notifications


Solution 1 uses a Lambda function as the trigger to handle newly arrived CloudTrail events; the events are extracted by a Python script, which calls Timeplus’s REST API to push each event in real time. There are some pros and cons.


Pros

  • Low Latency: Lambda functions are triggered immediately upon the S3 event, providing near-real-time processing of notifications.

  • Cost Efficiency: With Lambda’s pay-as-you-go model, you only pay for the time your code is running. This is highly cost-effective for low or moderate volumes of events.

  • Automatic Scaling: Lambda scales automatically to handle an increasing number of requests. It can handle a large number of events concurrently without manual intervention.


Cons

  • Short Execution Time Limit: Lambda has a maximum execution time limit of 15 minutes. If the data processing task takes longer than that, users have to break it into smaller tasks or use other services. In this case, it is not a concern, as each CloudTrail event is usually just 1~2 KB of JSON before compression, so it won't take long to process.

  • Error Handling and Retries: If a Lambda invocation fails, it retries the execution. However, users don’t have as much control over retries and failure handling compared to using SQS, where you can implement more sophisticated error-handling strategies (e.g., DLQs).

  • Need to expose Timeplus for Lambda access: Users need to expose the Timeplus address or use Timeplus Cloud, so that the AWS Lambda function can access the ingest endpoint.



SQS Based Notifications


Solution 2 uses SQS as the trigger to handle newly arrived CloudTrail events; the events are extracted by a declarative Redpanda Connect pipeline that processes the data and sends it to Timeplus. So the data is pulled into Timeplus.


Pros

  • Unlimited scalability: SQS can handle a large volume of events/messages, and it scales automatically to handle spikes in message volume without needing any manual intervention.

  • Persistence & Reliability: Messages in SQS are stored until a consumer retrieves and deletes them. This ensures that messages aren't lost even if the consumer is temporarily unavailable. So even if there is a network issue or Timeplus is down, no data will be lost.

  • No need to expose Timeplus: As data is pulled into Timeplus, there is no need to expose Timeplus endpoint.


Cons

  • Message Processing Latency: There might be a slight delay between when the message is added to the queue and when it is processed by the consumer, especially if the consumer is using long polling or batch processing.

  • Cost: While SQS pricing is generally low, costs can accumulate depending on the volume of requests and the amount of data transferred.


In summary, users can decide which way to go based on latency, cost, durability, and their network architecture requirements.


 

Summary


In today’s blog, I have demonstrated how users can set up two different solutions to get AWS CloudTrail events into a Timeplus stream:

  • CloudTrail + S3 + Lambda Function + Timeplus

  • CloudTrail + S3 + SQS + Timeplus with Redpanda Connect


The key difference between these two solutions is the event notification they use: one uses Lambda and the other uses SQS; one pushes events to Timeplus, and the other pulls events using the built-in Redpanda Connect. Both solutions have pros and cons, and users can choose accordingly.


In my next blog, I would like to show how users can run more powerful streaming queries to do real-time monitoring and analysis of CloudTrail events for security and threat detection:

  • Detect Unauthorized Access: Monitoring helps detect suspicious login attempts, privilege escalations, or any unauthorized access to AWS resources. For example, multiple failed ConsoleLogin attempts can indicate a brute-force attack.

  • Mitigate Insider Threats: You can track actions taken by IAM users or roles to prevent insider threats, such as unauthorized data deletion or policy changes.

  • Intrusion Detection: Continuous monitoring of AWS services like CloudTrail, GuardDuty, and CloudWatch Logs allows you to identify potential breaches or compromises, such as suspicious API calls or unusual traffic patterns.

  • Automated Threat Responses: With real-time monitoring, you can automate responses to certain security threats (e.g., disabling compromised accounts, triggering alerts).


Of course, you can do more than just security monitoring with this real-time data, including but not limited to: Operational Efficiency and Performance Monitoring, Compliance and Governance or Resource Usage and Management.


Looking forward to your feedback and comments. Stay tuned!


 

Ready to try Timeplus Enterprise? Try it free for 30 days.


Join our Timeplus Community! Connect with other users or get support in our Slack community.
