Internal services pipeline in Analytics Platform

Dima Kalashnikov
Published in Picnic Engineering
Sep 9, 2022 · 9 min read

We continue our story about the Analytics Platform setup at Picnic. In “Picnic Analytics Platform: Migration from AWS Kinesis to Confluent Cloud” we described why and how we migrated from AWS Kinesis to Confluent Cloud.

This time we will dive into how we configure our internal services pipeline.

Quick recap: the purpose of the internal pipeline is to deliver data from dozens of Picnic back-end services, such as warehousing, machine learning models, and customer and order status updates. The data is loaded into Snowflake, Picnic’s single source of truth Data Warehouse (DWH). Almost all internal services emit events over RabbitMQ. Our pipeline captures these events and sends them to Confluent Cloud using the RabbitMQ Source connector for Apache Kafka Connect. Finally, the Snowflake Sink connector picks the messages up from the topics and loads them into the respective tables in the DWH.

Now we are going to take a deeper look into each sub-part of our system.

RabbitMQ

We have already mentioned that RabbitMQ is used as the main inter-service communication event bus at Picnic. Yet, some messages are destined for the DWH only.

One may wonder: why don’t we replace RabbitMQ with Apache Kafka everywhere?

To answer this question, we should take a closer look at the difference between RabbitMQ and Apache Kafka in terms of service parallelism. RabbitMQ has a killer feature for many Picnic services: it doesn’t impose constraints on the horizontal scalability of producers and consumers. Let’s for a moment take exclusive queues out of the equation, as well as extreme cases of scaling. With Kafka, one is always capped by the number of partitions in a topic: a topic with 12 partitions, for example, can feed at most 12 active consumers in a consumer group, and any extra pods sit idle. While we can scale partitions easily (we don’t care about the order of analytics events at Picnic, remember?), it makes the situation more complicated for all parties.

The challenge is that automatic triggers for Kubernetes horizontal scaling should be tied not only to CPU, memory, and other workload metrics of a service, but also to the number of partitions, to avoid producing idle pods. At the same time, we cannot grant each service an abundance of partitions due to cost considerations. We settled on keeping RabbitMQ as the central piece of inter-service communication, and we shall revisit that decision only if we figure out how to dynamically and elastically scale partitions in a cost-efficient manner.

Another crucial question one may ask: if a message’s purpose is to be delivered to the DWH only, why don’t services publish it to Kafka directly?

Apache Kafka is indeed the right place to directly write messages targeting the DWH. However, we decided that the Analytics Platform team should tackle one problem at a time: first move away from AWS Kinesis without any disruptions or additional work for other teams, and only then help producers move to working with Apache Kafka directly. At the moment, we already have a first team that works with Apache Kafka as a first-class citizen (say hi to the Targeting team!).

In the following sections, we take a look at the configuration of the components presented in the diagram¹.

Confluent RabbitMQ Source Connector

Apache Kafka Connect provides a vast ecosystem of open-source and proprietary connectors to source and sink data. At Picnic we rely on them heavily to simplify processing in the setup with the strictest SLA: the internal services pipeline. We run Kafka Connect in distributed mode to achieve better throughput through horizontal scaling.

Let’s first look into the Dockerfile for our Apache Kafka Connect image with the Confluent RabbitMQ Source connector.

FROM confluentinc/cp-kafka-connect:7.1.3
ENV RABBITMQ_CONNECTOR_VERSION 1.5.2
# 1 - start
ENV JMX_PROMETHEUS_VERSION 0.16.1
ENV JMX_PROMETHEUS_JAR "jmx_prometheus_javaagent-${JMX_PROMETHEUS_VERSION}.jar"
ENV JMX_PROMETHEUS_FOLDER "/usr/share/jmx-exporter"
ENV JMX_PROMETHEUS_SHA256 "0ddc6834f854c03d5795305193c1d33132a24fbd406b4b52828602f5bc30777e"
USER root
ADD kafka_connect_prometheus.yaml ${JMX_PROMETHEUS_FOLDER}/kafka_connect_prometheus.yaml
RUN curl -f -o "${JMX_PROMETHEUS_JAR}" -L "https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/${JMX_PROMETHEUS_VERSION}/${JMX_PROMETHEUS_JAR}" \
&& test $(sha256sum "${JMX_PROMETHEUS_JAR}" | awk '{print $1}') = "${JMX_PROMETHEUS_SHA256}" \
&& mkdir -p "${JMX_PROMETHEUS_FOLDER}" \
&& mv "${JMX_PROMETHEUS_JAR}" "${JMX_PROMETHEUS_FOLDER}/jmx_prometheus_javaagent.jar"
# 1 - end
# 2 - start
# User provided by Confluent Platform Kafka Connect
USER appuser
RUN confluent-hub install --no-prompt "confluentinc/kafka-connect-rabbitmq:${RABBITMQ_CONNECTOR_VERSION}"
# 2 - end
# JMX exporter port
EXPOSE 8081
# Kafka Connect Rest API Port
EXPOSE 8083

We essentially leverage the Confluent Platform Kafka Connect image with a few twists:

  1. Block 1: We add the JMX metrics exporter for Prometheus. It enables us to extract performance and health metrics from the pipeline as early as possible. We use the configuration recommended by Confluent for the JMX exporter (see kafka_connect_prometheus.yaml; a minimal sketch follows below the list).
  2. Block 2: Next, we install the RabbitMQ source connector from Confluent Hub. The entrypoint is already provided in the base image, so we omit it here.
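
For reference, a rules file for the JMX exporter has the shape sketched below. This is a minimal illustration rather than the exact Confluent-recommended kafka_connect_prometheus.yaml, which defines a longer list of patterns:

# Minimal illustrative sketch of kafka_connect_prometheus.yaml.
# The actual Confluent-recommended file defines many more rules.
lowercaseOutputName: true
rules:
  # Worker-level metrics (e.g. task and connector counts).
  - pattern: "kafka.connect<type=connect-worker-metrics>([^:]+):"
    name: "kafka_connect_worker_metrics_$1"
  # Per-connector task metrics, labelled by connector name and task id.
  - pattern: "kafka.connect<type=connector-task-metrics, connector=([^,]+), task=(\\d+)><>([^:]+)"
    name: "kafka_connect_connector_task_metrics_$3"
    labels:
      connector: "$1"
      task: "$2"
  # Catch-all for everything else.
  - pattern: ".*"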

As already mentioned, we run our services on top of a Kubernetes cluster, but for simplicity we provide a docker-compose file closely matching our setup. We configure Kafka Connect with environment variables, divided into 6 main blocks:

Block 1:

CONNECT_BOOTSTRAP_SERVERS: BOOTSTRAP_PORT
CONNECT_GROUP_ID: confluent-connect
CONNECT_CONFIG_STORAGE_TOPIC: confluent-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: confluent-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: confluent-connect-status

We have a standard set of parameters telling Kafka Connect where our bootstrap server is, what the name of its consumer group is and where to keep configurations, offsets and statuses.

Block 2:

CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter

This specifies the default set of JSON converters for our event data.

Block 3:

CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 150000

Here we tweak the default configuration by bumping up the graceful shutdown time, so that during a restart our connector can push as many queued events to Kafka as it can, instead of re-queueing them back to RabbitMQ.

Block 4:

CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components/
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="SECRET";
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: HTTPS
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="SECRET";
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_CONFLUENT_LICENSE: CONFLUENT_LICENCE
CONNECT_REST_ADVERTISED_HOST_NAME: YOUR_HOST_IP

This tells Kafka Connect how to securely connect to the Kafka cluster, where to find installed plugins (in our case, the RabbitMQ Source connector), and what the hostname for the REST API is.

Block 5:

CONNECT_PRODUCER_COMPRESSION_TYPE: lz4
CONNECT_PRODUCER_BATCH_SIZE: 131072
CONNECT_PRODUCER_LINGER_MS: 50
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 8000000

The main purpose here is to let heavy messages through the connector. Some of our services send events containing aggregated data which are quite bulky, so we increase the max request size to 8 MB (the maximum for standard Confluent Cloud clusters) and enable event compression. To make compression fruitful, we also add lingering and increase the batch size, which yields a higher compression rate. One needs to be careful with events bigger than the max request size: the connector fails and goes into a manual-only recovery mode. That is, the only way to fix it is to either redeploy the connector or call the Kafka Connect REST API to restart it. We automated that part at Picnic by introducing the Kafka Connect Watchdog, our custom tool which restarts connectors in case they fail.
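
The Watchdog itself is an internal tool, but its core loop is simple enough to sketch. The example below is a hypothetical, minimal version built on the standard Kafka Connect REST API status and restart endpoints; the URL, connector list and polling interval are assumptions:

# Minimal watchdog sketch: poll connector status and restart failed tasks.
# Hypothetical example; our production Watchdog is more elaborate.
import time

import requests

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST API
CONNECTORS = ["my-connector"]          # connectors to supervise

def check_and_restart(name: str) -> None:
    status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
    # Restart the whole connector if it failed...
    if status["connector"]["state"] == "FAILED":
        requests.post(f"{CONNECT_URL}/connectors/{name}/restart")
        return
    # ...or restart individual failed tasks.
    for task in status["tasks"]:
        if task["state"] == "FAILED":
            requests.post(f"{CONNECT_URL}/connectors/{name}/tasks/{task['id']}/restart")

while True:
    for connector in CONNECTORS:
        check_and_restart(connector)
    time.sleep(60)  # poll once a minute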

Block 6:

KAFKA_OPTS: -javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent.jar=8081:/usr/share/jmx-exporter/kafka_connect_prometheus.yaml

Finally, we update Kafka options to enable our JMX metrics exporter for Prometheus.

The full docker-compose file follows below:

---
version: '3.7'
services:
  kafka-connect:
    build: .
    environment:
      # 1
      CONNECT_BOOTSTRAP_SERVERS: BOOTSTRAP_PORT
      CONNECT_GROUP_ID: confluent-connect
      CONNECT_CONFIG_STORAGE_TOPIC: confluent-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: confluent-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: confluent-connect-status
      # 2
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # 3
      CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 150000
      # 4
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components/
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="SECRET";
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: HTTPS
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="SECRET";
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_CONFLUENT_LICENSE: CONFLUENT_LICENCE
      CONNECT_REST_ADVERTISED_HOST_NAME: YOUR_HOST_IP
      # 5
      CONNECT_PRODUCER_COMPRESSION_TYPE: lz4
      CONNECT_PRODUCER_BATCH_SIZE: 131072
      CONNECT_PRODUCER_LINGER_MS: 50
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 8000000
      # 6
      KAFKA_OPTS: -javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent.jar=8081:/usr/share/jmx-exporter/kafka_connect_prometheus.yaml
    ports:
      - 8083:8083
      - 8081:8081
    volumes:
      - ./:/var/app/logs

Finally, our configuration of the RabbitMQ Source connector itself. It consists of 5 main blocks.

Block 1:

"rabbitmq.shutdown.timeout.ms": "30000",
"rabbitmq.username": "USERNAME",
"rabbitmq.requested.channel.max": "3",
"rabbitmq.security.protocol": "SSL",
"rabbitmq.host": "RMQHOST",
"rabbitmq.virtual.host": "VHOST",
"rabbitmq.port": "PORT",
"rabbitmq.password": "PASSWORD",
"rabbitmq.queue": "QUEUE"

Parameters starting with the rabbitmq prefix are specific to the RabbitMQ connector. Here we define how to connect to it and add 2 overrides of defaults:

  1. rabbitmq.shutdown.timeout.ms — defaults to 10 seconds, but we bump it to 30 since otherwise many messages are re-queued during a restart of the connector, causing a surge on its startup.
  2. rabbitmq.requested.channel.max — defaults to 0, which means unlimited. We saw a few times that this can bring down a RabbitMQ cluster, especially when a connector is misconfigured and tries to connect to a non-existent queue: it opens more and more channels on every connection attempt, putting enormous pressure on the RabbitMQ cluster. We decided to limit it to 3, which is performant enough and doesn’t cause any unexpected channel explosion.

Block 2:

"confluent.topic.ssl.endpoint.identification.algorithm": "HTTPS",
"confluent.license": "LICENCE",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"KEY\" password=\"PASSWORD\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.bootstrap.servers": "BOOTSTRAP_SERVER"

This is the standard configuration for a Confluent Platform Connector which configures the bootstrap URL, security parameters and the Confluent licence.

Block 3:

"errors.log.enable": "true",
"errors.retry.timeout": "140000"

This enables extended logging for errors and adds retries for them (by default they are disabled).

Block 4:

"connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "MAX_NUMBER_OF_TASKS",
"name": "my-connector",
"kafka.topic": "TOPIC"

This provides configuration for the connector class to be used, the max number of tasks (capped by the number of partitions!), a connector name and a topic where data will be delivered.

Block 5:

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",

This specifies the converters for the Confluent RabbitMQ Source connector. The value converter must always be ByteArrayConverter; unfortunately, the connector doesn’t support any other converter type at the moment. The key converter isn’t relevant here since the key is always null.

Full configuration follows below:

{
  # 1
  "rabbitmq.shutdown.timeout.ms": "30000",
  "rabbitmq.username": "USERNAME",
  "rabbitmq.requested.channel.max": "3",
  "rabbitmq.security.protocol": "SSL",
  "rabbitmq.host": "RMQHOST",
  "rabbitmq.virtual.host": "VHOST",
  "rabbitmq.port": "PORT",
  "rabbitmq.password": "PASSWORD",
  "rabbitmq.queue": "QUEUE",
  # 2
  "confluent.topic.ssl.endpoint.identification.algorithm": "HTTPS",
  "confluent.license": "LICENCE",
  "confluent.topic.security.protocol": "SASL_SSL",
  "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"KEY\" password=\"PASSWORD\";",
  "confluent.topic.sasl.mechanism": "PLAIN",
  "confluent.topic.bootstrap.servers": "BOOTSTRAP_SERVER",
  # 3
  "errors.log.enable": "true",
  "errors.retry.timeout": "140000",
  # 4
  "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
  "tasks.max": "MAX_NUMBER_OF_TASKS",
  "name": "my-connector",
  "kafka.topic": "TOPIC",
  # 5
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
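
To apply this configuration, we submit it to the Kafka Connect REST API (note that the # markers above are annotations only and must be stripped for valid JSON). A minimal sketch, assuming the worker is reachable on localhost:8083 and the cleaned config is saved as connector.json; PUT /connectors/{name}/config creates or updates a connector:

# Create or update the connector through the Kafka Connect REST API.
# Sketch only: host, port and file name are assumptions.
import json

import requests

with open("connector.json") as f:
    config = json.load(f)  # the flat config map above, without the '#' markers

resp = requests.put(
    f"http://localhost:8083/connectors/{config['name']}/config",
    json=config,
)
resp.raise_for_status()
print(resp.json())  # the resulting connector description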

All our connectors operate under their respective service accounts in Confluent Cloud. In the RBAC configuration we grant access to:

  • Read, write and describe topics prefixed with confluent-connect.
  • Read and describe the confluent-connect consumer group, which is used to read from the topics mentioned above.
  • Read, write and describe the _confluent-command topic, which contains information about the Confluent Platform licence.
  • Write to and describe the topics where the data will be pushed.

Confluent Cloud

Our topics setup is quite straightforward: each of our RabbitMQ source connectors is attached to a separate topic. The number of partitions is estimated separately per topic and per environment depending on the workload: for example, a topic peaking at 10 MB/s whose consumers each handle about 2 MB/s needs at least 5 partitions. We tend to slightly overprovision partitions to accommodate spikes in event volume and future growth.

We also have a Confluent-managed Snowflake Sink connector. It is worth noting that we have a single instance of the connector with multiple tasks. The connector is capable of reading from multiple topics and writing into the respective tables, so there is no need to manage multiple instances of it. However, keep in mind that one cannot sink data from several topics into a single table. The configuration of the connector is well described in the Confluent documentation; we don’t add anything custom to it. The only thing worth highlighting is that one task can work with only a limited number of partitions (in our case, 50 per task), so a connector reading from, say, 200 partitions needs at least 4 tasks. You can find more details on the connector properties page.

Snowflake

We essentially have one table in Snowflake per RabbitMQ source connector. We do not let the Confluent Snowflake Sink connector provision tables automatically, to retain more control over the structure and format of a table. Here is how we set it up:

CREATE TABLE IF NOT EXISTS evt_raw.${table_name} (
  record_metadata VARIANT COMMENT 'Metadata and RMQ headers of the event',
  record_content VARIANT COMMENT 'Raw event content',
  event_id VARCHAR DEFAULT public.f_get_kafka_event_id(record_metadata, record_content) COMMENT 'Event ID',
  load_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP() :: TIMESTAMP COMMENT 'Timestamp of data load into Snowflake'
) CLUSTER BY (TO_DATE(load_ts))
COMMENT = 'Events from ''${exchange_name}'' exchange. ${routing_key_comment}';

Standard tables provisioned by the Snowflake Sink connector have only the record_metadata and record_content fields. We have added 2 more: event_id and load_ts.

The event ID is generated with the function below: an MD5 hash of the event metadata and payload, which makes events easier to reference and deduplicate.

CREATE OR REPLACE FUNCTION public.f_get_kafka_event_id(
  record_metadata VARIANT,
  record_content VARIANT
)
RETURNS VARCHAR
RETURNS NULL ON NULL INPUT
COMMENT = 'Returns a unique Kafka event ID given a record metadata (headers) and record content (payload).'
AS
$$
  MD5(LOWER(
    TRIM(record_metadata :: VARCHAR)
    || '|~~|' || TRIM(record_content :: VARCHAR)))
$$;
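
With event_id in place, deduplication becomes a one-liner. A hypothetical example (the table name is illustrative):

-- Keep one copy of each event (the earliest load wins).
SELECT *
FROM evt_raw.order_status_updates
QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY load_ts) = 1;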

The load timestamp helps us establish CDC (change data capture) processes in the database and speeds up time-dependent queries; note the CLUSTER BY (TO_DATE(load_ts)) clause above, which clusters the table by load date.
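
For instance, a downstream job can pick up only newly loaded rows by remembering the highest load_ts it has processed. A minimal sketch (the table name and bind variable are hypothetical):

-- Incremental (CDC-style) extraction: only rows loaded since the last run.
-- :last_processed_ts is a watermark kept by the downstream job.
SELECT record_metadata, record_content, event_id, load_ts
FROM evt_raw.order_status_updates
WHERE load_ts > :last_processed_ts
ORDER BY load_ts;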

Wrapping up

We have described the entire internal services pipeline of the Analytics Platform, consisting of the Confluent RabbitMQ Source connector, Confluent Cloud, the Confluent Snowflake Sink connector and Snowflake. We hope this article helps you set up Kafka Connect and stream your data into Snowflake! In the next blog posts, we will take a look at our setup for customer applications and at monitoring.

[1]: DLQ: Dead Letter Queue; where rejected messages live
