[Webinar] How to Build a Data Mesh in Confluent Cloud | Register Now

Analyzing AWS Audit Logs in Real Time Using Confluent Cloud and Amazon EventBridge

Written By

Last year, we introduced the Connect with Confluent partner program, enabling our technology partners to develop native integrations with Confluent Cloud. This gives our customers access to Confluent data streams from within their favorite applications and allows them to extract maximum value from their data. With over 40 partners now in the program, including two Amazon Web Services (AWS) services—Amazon Athena and AWS Lambda—we’re rapidly expanding Confluent’s already vast data streaming ecosystem. Today, we are excited to announce that Amazon EventBridge has joined the Connect with Confluent program, marking it the third AWS service to become part of this collaborative effort.

EventBridge is a fully managed service that facilitates seamless event routing between AWS and third-party services. More than 1.5 million customers use EventBridge to send over 2.6 trillion messages each month. 

Currently, two types of Confluent with EventBridge integrations are available. First, API destinations enable customers to send EventBridge data to Confluent. Second, there's EventBridge Pipes, a fully managed point-to-point integration, and the EventBridge Kafka Connector, both of which sink data from Confluent to events. In this blog post, we will primarily focus on the API destinations integration. Additionally, we will discuss how to set up the integration with EventBridge Pipes and the connector, using Confluent Cloud as a source.

EventBridge to Confluent integration use cases

Confluent and EventBridge are similar but serve different use cases. Confluent Cloud is a complete data streaming platform designed for managing and processing real-time data streams at scale. Confluent provides advanced features like stream processing, schema management, and connectors for integrating with various data sources and sinks, making it a versatile choice for building event-driven architectures in multi-cloud or hybrid environments. It is the ideal choice for high-throughput and low-latency use cases with complex real-time event processing needs. For instance, Confluent offers a p99 end-to-end latency of sub-100 milliseconds for GBps+ or tens of millions of events per second. Moreover, organizations can leverage over 120 pre-built connectors provided by Confluent to seamlessly stream data to and from virtually any data store on AWS.

EventBridge is an event router that connects various AWS and third-party services. It also offers EventBridge Pipes for point-to-point integrations and EventBridge Scheduler, a serverless task scheduler that simplifies creating, executing, and managing millions of schedules across AWS services without the need to provision or manage underlying infrastructure. Over 160 AWS services produce operational and management events routed through EventBridge. Furthermore, EventBridge supports events from over 30 third-party services. Integrating the Confluent data streaming platform with EventBridge allows organizations to unlock value from all of these data sources in real time. 

By integrating both services together we can unlock numerous use cases, with one prominent example being real-time analysis of AWS CloudTrail events. This enables organizations to capture and process CloudTrail events in real time for various use cases such as security monitoring, compliance auditing, and operational analysis. Doing so allows for rapid detection of malicious behavior and prevents any potential damage before it occurs. 

EventBridge API destinations integration 

API destinations are HTTP endpoints that you can invoke as the target for EventBridge. In the case of integration with Confluent, this involves utilizing Confluent's Produce REST API to transmit data from EventBridge to Confluent seamlessly. This allows customers to leverage Confluent’s rich set of tools to stream, process, and analyze data in real time.

Solution overview

In this scenario, AWS CloudTrail events are routed to EventBridge, which in turn forwards them to a public Confluent Cloud cluster hosted on AWS. Subsequently, Confluent Cloud for Apache Flink® is leveraged for real-time detection of any abnormal activity. Detected anomalies will be sent to a “flagged” topic in Confluent Cloud. In real-world applications, a dedicated consumer will react promptly to these incidents in real time.

Prerequisites

  • Confluent Cloud cluster with “audit_trail_topic” created

  • Confluent Cloud API keys with access to the cluster and topic

  • AWS Command Line Interface (AWS CLI)

  • CloudTrail with logging enabled

Capturing CloudTrail read-only events in EventBridge

To capture CloudTrail management events we need to create a rule in EventBridge in the same region where CloudTrail is enabled. It should also be created with a pattern that defines exactly what events we want to capture. The following uses the AWS CLI to create the rule:

'''
aws events put-rule \
    --name "ruleForManagementEvents" \
    --region <AWS_CloudTrail_Region>\
   --event-bus-name "default" \
    --state "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS" \
   --event-pattern '{
        "detail-type": ["AWS API Call via CloudTrail"],
       "detail": {
            "userIdentity": {
                "type": [{"anything-but":"AWSService"}]
           },
            "eventSource": [{
                "anything-but": ["sts.amazonaws.com","ssm.amazonaws.com"]
           }]
       }
   }' \
    --role-arn <IAM_ROLE_ARN>
'''

The above command creates an EventBridge rule that captures management events generated by AWS API calls via CloudTrail, while filtering out events where the user identity is an AWS service and the event source is either "sts.amazonaws.com" or "ssm.amazonaws.com". You can change the event pattern depending on your use case.

Send CloudTrail events to Confluent Cloud

EventBridge uses API destinations to send data to Confluent Cloud. This method leverages Confluent Cloud’s fully managed Produce API. Therefore, the initial step involves creating an API destination and establishing a connection for Confluent Cloud within EventBridge. A connection is a logical container that holds authorization details of the target. 

Creating an API destination and connection to Confluent Cloud

  1. In the EventBridge API destinations console, click on “Create API destination.”

  2. Fill out the API destination details as follows:

    1. API destination endpoint: The Confluent REST Endpoint and path should look like this: <Confluent_Cloud_REST_Endpoint>/kafka/v3/clusters/<Confluent_Cloud_cluster_id>/topics/<topic_name>/records

    2. HTTP method: POST

  3. In the “Connection type” section, click Create a new connection and fill out the connection details. In the “Destination type” section, select Partners and choose “Confluent.”

  4. Enter the username (Confluent Cloud cluster API key) and Password (Confluent Cloud cluster API secret). 

  5. Click Create to create the API destination and the connection to Confluent Cloud.

Adding API destinations as a target to EventBridge 

Next, we also need to add the API destination as a target to the rule we created earlier to invoke the API destination target when the rule matches the filtered events. We use the AWS CLI to add the Confluent Cloud API destination as a target to the rule we created in step 1.

'''
aws events put-targets \
    --rule "ruleForManagementEvents" \
    --event-bus-name "default" \
    --region <AWS_CloudTrail_Region> \
    --targets '[{
        "Id": "cwcapidestination",
        "Arn": <API_Destination_ARN>,
        "RoleArn": <IAM_ROLE_TO_INVOKE_API_DESTINATION>,
        "InputTransformer": {
            "InputPathsMap": {
                "event": "$.detail"
            },
            "InputTemplate": "{\"value\":{\"type\":\"JSON\",\"data\":}}"
        }
    }]'
'''

In this command, the InputTransformer specifies how the event data should be transformed before sending it to the target. 

  • "InputPathsMap": Specifies the mapping of input paths. In this case, it maps the "event" key to the entire event detail.

  • "InputTemplate": Specifies the template for transforming the event data. It wraps the event data into a JSON object with a key "value" and embeds the event data within it. This value will then be used by the Confluent Produce API to send the data. 

Running this command successfully configures EventBridge to send events from CloudTrail to Confluent Cloud.

Checking events in Confluent Cloud

  1. In the Confluent Cloud console, go to the topic page.

  2. Select the Confluent Cloud environment and cluster, then click Continue.

  3. Select the audit_trail_topic and make sure that it’s receiving CloudTrail events.

Real-time audit log analysis

We can now use Confluent Cloud for Apache Flink to process, analyze, and transform the audit logs real time, enabling us to act and gain insights in real time.

First, we need to create a Flink compute pool. In the Flink console, choose the environment and click “Continue.” Then click “Create compute pool”; we choose the same region we’re working and gave the pool a name.

Authoring the Flink statement

One common use case is to detect anomalies in API invocations. Suppose we’re part of the Security team, and our objective is to flag instances where a specific S3 API is called more than 10 times within any 1-minute window by the same principal. This is how the Flink statement would look:

'''
SELECT
  window_start,
  window_end,
  JSON_VALUE(CAST(val AS STRING), '$.eventName') AS eventName,
  JSON_VALUE(CAST(val AS STRING), '$.eventSource') AS AWSService,
  JSON_VALUE(CAST(val AS STRING), '$.userIdentity.principalId') AS userIdentity,
  COUNT(*) AS eventCount
FROM TABLE(
  TUMBLE(TABLE eventbridge, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
)
WHERE JSON_VALUE(CAST(val AS STRING), '$.eventSource') = 's3.amazonaws.com'
GROUP BY window_start, window_end, JSON_VALUE(CAST(val AS STRING), '$.eventName'), JSON_VALUE(CAST(val AS STRING), '$.eventSource'), JSON_VALUE(CAST(val AS STRING), '$.userIdentity.principalId')
HAVING COUNT(*) > 10;
'''

To test this, run '''aws s3 ls''' 15 times using the AWS CLI. Doing so will result in this row in the Flink statement output:

Deploying the Flink statement

When we are happy with the Flink SQL statement, we can wrap it so that it starts to send the flagged activities to the destination topic.

  1. Back in the Workspaces console, add a new table:

    '''
    CREATE TABLE target_table (
        window_start TIMESTAMP,
        window_end TIMESTAMP,
        eventName STRING,
        AWSService STRING,
        userIdentity STRING,
        eventCount BIGINT
    ); 
    '''

    This statement creates a new topic called "flagged_topic" and registers a schema for this topic in the Schema Registry.

  2. Now edit the running statement to insert data into the topic that was just created:

    '''
    INSERT INTO flagged_topic (window_start, window_end, eventName, AWSService, userIdentity, eventCount)
    <THE_FLINK_STATEMENT>
    '''

No-code field masking

Notice that certain user information is displayed as plain text. While this information could be valuable for downstream consumers to take the appropriate actions, we will opt to mask the “userIdentity” field to demonstrate our new Topic Actions feature. Topic Actions offers pre-configured stream processing statements designed to address common, domain-agnostic tasks.

  1. In the Data Portal, choose the environment, the “flagged_topic” topic, and then click on the Actions button.

  2. Currently there are two actions: Deduplicate topic and Mask fields. Choose Mask fields.

  3. In Settings, enter the userIdentity field and mask everything with ‘ * ‘ except the last two characters. You can fill out the rest and click Confirm.

  4. The userIdentity field is now masked and can be viewed in the final_flagged_topic.

Connecting the output data to other systems

Now that the output is stored in a Confluent topic, there are several options for connecting consumers or sinking the data into preferred data stores on AWS. One option is to connect consumers directly to the output topic. Furthermore, Confluent offers over 120 pre-built connectors that can be used to sink the data into various data stores on AWS.

(Optional) Send the data back to EventBridge 

Another possibility is to sink the data back to EventBridge or AWS services, such as Step Functions, using either the EventBridge Sink connector or EventBridge Pipes. The EventBridge Sink connector can send the data to any event bus that is configured, supporting the Confluent Cloud Schema Registry. On the other hand, EventBridge Pipes provide fully managed point-to-point integrations between different sources and 15 different AWS targets.

At the time of writing this post, EventBridge Pipes do not support Confluent Schema Registry. Therefore, events serialized with an Avro schema can be forwarded, but will not be filtered on with EventBridge Pipes. However, we can use an AWS Lambda function as an enrichment step in the Pipe to deserialize the event before sending to the downstream application as described below.

To set up the EventBridge Sink connector with Confluent Cloud, follow the steps outlined in Confluent Hub and the GitHub repo. On the other hand, to configure EventBridge Pipes for receiving data from Confluent, proceed with the following steps:

  1. Using the AWS CLI create a new Confluent secret in Secrets Manager.

    '''
    aws secretsmanager create-secret \
    --name confluent_cloud \
     --secret-string '{"username":<Confluent_API_Key>,"password":<Confluent_Secret>}'
    '''
  2. In the EventBridge Pipes console, choose Confluent from the list of sources.

  3. Fill out the bootstrap servers, topic name, are the secrets that we created in the previous step.

  4. In the Filtering page, we have the option to filter any events out before sending to the target. However, because the event is serialized with Avro format, Pipe filtering cannot be used at the moment. Consequently, the filtering capability is restricted to topics with JSON-encoded or plain string events. You can leave the defaults in the filtering page and click Next.

  5. In the Enrichment step, we can trigger a Lambda function for event deserialization before routing it back to Pipes. By leaving this empty, the data will remain encoded when it is sent to the target destination. You can leave it empty and click Next.

  6. You can choose one of the targets supported by EventBridge Pipes and click on “Create pipe.”

Considerations for using EventBridge API destinations with Confluent

API destinations use Confluent Produce API to send data to Confluent. Therefore, this integration has a few limitations that should be considered:

  • The integration does not support batching, which could limit throughput

  • API destinations and Confluent Produce API do not support Confluent Schema Registry, therefore, the integration will only write schemaless data to Confluent.

  • EventBridge API destinations only support public endpoints and therefore only support public Confluent Cloud clusters. For private Confluent Cloud clusters, a Lambda function can be used to read data from EventBridge and write it to Confluent as shown below.

  • The maximum throughput for the REST API to produce records directly to the Confluent Cloud cluster is 10 MBps. For higher throughput, other methods should be used. Similarly, each API destination endpoint has a soft limit of 300 invocations per second to protect downstream targets. This limit can be adjusted in the AWS quota console.

Conclusion

In this post, we explored the integration of EventBridge with Confluent Cloud, facilitating the seamless flow of data from various AWS services to Confluent Cloud for real-time processing and analysis. Through a practical demonstration, we showcased how audit events from CloudTrail can be ingested and analyzed in real time, empowering organizations to swiftly detect and respond to anomalies, thereby strengthening their security and compliance measures.

Despite the numerous benefits that this integration brings, it's important to note that it currently supports only REST. Having said that, we will continue to work with the EventBridge team to enhance the integration and introduce support for additional mechanisms like Kafka native producer.

Next Steps

Not yet a Confluent customer? Try Confluent Cloud in AWS Marketplace. New signups receive $400 in free credits to spend during their first 30 days. Your credits will be immediately visible on your Confluent Account after subscribing to Confluent through the AWS marketplace.

  • Ahmed Zamzam is a staff partner solutions architect at Confluent, specializing in integrations with cloud service providers (CSPs). Leveraging his expertise, he collaborates closely with CSP service teams to develop and refine integrations, empowering customers to leverage their data in real time effectively. Outside of work, Zamzam enjoys traveling, playing tennis, and cycling.

Did you like this blog post? Share it now