How I Dockerized Apache Flink, Kafka, and PostgreSQL for Real-Time Data Streaming

Integrating pyFlink, Kafka, and PostgreSQL using Docker

Get your pyFlink applications ready using docker — author generated image using https://www.dall-efree.com/

Why Read This?

  • Real-World Insights: Get practical tips from my personal journey of overcoming integration hurdles.
  • Complete Setup: Learn how to integrate Flink, Kafka, and PostgreSQL seamlessly using Docker-Compose.
  • Step-by-Step Guide: Perfect for both beginners and experienced developers looking to streamline their data streaming stack.

Setting Up the Scene

I embarked on a mission to integrate Apache Flink with Kafka and PostgreSQL using Docker. What makes this endeavor particularly exciting is the use of pyFlink — the Python flavor of Flink — which is both powerful and relatively rare. This setup aims to handle real-time data processing and storage efficiently. In the following sections, I’ll demonstrate how I achieved this, discussing the challenges encountered and how I overcame them. I’ll conclude with a step-by-step guide so you can build and experiment with this streaming pipeline yourself.

The infrastructure we’ll build is illustrated below. Externally, there’s a publisher module that simulates IoT sensor messages, similar to what was discussed in a previous post. Inside the Docker container, we will create two Kafka topics. The first topic, sensors, will store incoming messages from IoT devices in real-time. A Flink application will then consume messages from this topic, filter those with temperatures above 30°C, and publish them to a second topic, alerts. Additionally, the Flink application will insert the consumed messages into a PostgreSQL table created specifically for this purpose. This setup allows us to persist sensor data in a structured, tabular format, providing opportunities for further transformation and analysis. Visualization tools like Tableau or Power BI can be connected to this data for real-time plotting and dashboards.

Moreover, the alerts topic can be consumed by other clients to initiate actions based on the messages it holds, such as activating air conditioning systems or triggering fire safety protocols.
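For instance, such a downstream client could be as simple as the following sketch. It uses the kafka-python package (the same library installed later in the guide for the producer) and the localhost listener exposed by this setup; the reaction is just a placeholder print, so treat it as illustrative rather than part of the repo:

from kafka import KafkaConsumer

# Listen to the alerts topic and react to each high-temperature message.
consumer = KafkaConsumer(
    "alerts",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda raw: raw.decode("utf-8"),
)

for record in consumer:
    # Placeholder action: swap in an AC trigger, a notification, etc.
    print(f"High-temperature alert received: {record.value}")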

Services included in the docker container — image by author

To follow along with the tutorial, you can clone the following repo. A docker-compose.yml is placed at the root of the project so you can spin up the multi-container application. Furthermore, you can find detailed instructions in the README file.

Issues With Kafka Ports in docker-compose.yml

Initially, I encountered problems with Kafka’s port configuration when using the confluentinc Kafka Docker image, a popular choice for such setups. The issue only became apparent through the logs, which is why I recommend not running docker-compose up in detached mode (-d) during the initial setup and troubleshooting phases.

The failure happened because the internal and external listeners were advertised on the same port, which led to connectivity problems. I fixed this by moving the internal listener to port 19092. I found this blog post quite clarifying.

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092

Configuring Flink in Session Mode

To run Flink in session mode (allowing multiple jobs in a single cluster), I’m using the following directives in the docker-compose.yml.
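In essence, that means a jobmanager service plus at least one taskmanager service, both built from the custom pyFlink image described in the next section and nested under the services: key. Here is a trimmed sketch of what those directives typically look like; treat the exact values as assumptions and refer to the repo’s docker-compose.yml for the real thing:

flink-jobmanager:
  build: ./pyflink
  command: jobmanager
  ports:
    - "8081:8081"  # Flink web UI
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager

flink-taskmanager:
  build: ./pyflink
  command: taskmanager
  depends_on:
    - flink-jobmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 2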

Custom Docker Image for PyFlink

Given the limitations of the default Apache Flink Docker image, which doesn’t include Python support, I created a custom Docker image for pyFlink. This custom image ensures that Flink can run Python jobs and includes the necessary dependencies for integration with Kafka and PostgreSQL. The Dockerfile used for this is located in the pyflink subdirectory.

  1. Base Image: We start with the official Flink image.
  2. Python Installation: Python and pip are installed, upgrading pip to the latest version.
  3. Dependency Management: Dependencies are installed via requirements.txt. Commented-out lines show an alternative: installing dependencies manually from local files, which is useful for deployments in environments without internet access.
  4. Connector Libraries: Connectors for Kafka and PostgreSQL are downloaded directly into the Flink lib directory. This enables Flink to interact with Kafka and PostgreSQL during job execution.
  5. Script Copying: Scripts from the repository are copied into the /opt/flink directory to be executed by the Flink task manager.

With this custom Docker image, we ensure pyFlink can run properly within the Docker container, equipped with the necessary libraries to interact with Kafka and PostgreSQL seamlessly. This approach provides flexibility and is suitable for both development and production environments.

Note: Ensure that any network or security considerations for downloading connectors and other dependencies are addressed according to your deployment environment’s policies.

Integrating PostgreSQL

To connect Apache Flink to the PostgreSQL database, a proper JDBC connector is required. The custom Docker image for pyFlink downloads the JDBC connector for PostgreSQL, which is compatible with PostgreSQL 16.

To simplify this process, a download_libs.sh script is included in the repository, mirroring the actions performed in the Flink Docker container. This script automates the download of the necessary libraries, ensuring consistency between the Docker and local environments.

Note: Connectors usually carry two version numbers. In this particular case, since I’m using Flink 1.18, the latest stable release available, I’ve downloaded 3.1.2-1.18. My guess is that the first number tracks the JDBC connector implementation itself (which covers several databases), while the second targets the Flink version. They’re available in the Maven repository.

env.add_jars(
    f"file://{current_dir}/flink-connector-jdbc-3.1.2-1.18.jar",
    f"file://{current_dir}/postgresql-42.7.3.jar"
)

Defining JDBC Sink

In our Flink task, there’s a crucial function named configure_postgre_sink located in the usr_jobs/postgres_sink.py file. This function is responsible for configuring a generic PostgreSQL sink. To use it effectively, you need to provide the SQL Data Manipulation Language (DML) statement and the corresponding value types. The types used in the streaming data are defined as TYPE_INFO … it took me a while to come up with the correct declaration 😅.
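To make that concrete, here is a minimal sketch of what such a helper can look like with PyFlink’s JDBC connector. The row layout and DML mirror the raw_sensors_data table created later in the guide, and the connection details match the docker-compose defaults; the actual configure_postgre_sink in the repo may declare things differently, so treat this as illustrative:

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcSink

# Hypothetical row layout matching the raw_sensors_data table from the guide below.
TYPE_INFO = Types.ROW([
    Types.STRING(),         # message_id
    Types.INT(),            # sensor_id
    Types.STRING(),         # message (raw JSON kept as text)
    Types.SQL_TIMESTAMP(),  # timestamp
])

DML = (
    "INSERT INTO raw_sensors_data (message_id, sensor_id, message, timestamp) "
    "VALUES (?, ?, ?, ?)"
)

def configure_postgre_sink(sql_dml, type_info, execution_options=None):
    """Build a generic JDBC sink pointing at the Postgres service from docker-compose."""
    return JdbcSink.sink(
        sql_dml,
        type_info,
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url("jdbc:postgresql://postgres:5432/flinkdb")  # POSTGRES_HOST inside the container
        .with_driver_name("org.postgresql.Driver")
        .with_user_name("flinkuser")
        .with_password("flinkpassword")
        .build(),
        execution_options,
    )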

Notice also that the JdbcSink has an optional parameter to define the ExecutionOptions. For this particular case, I’ll use an update interval of 1 second and limit the batch size to 200 rows. You can find more information in the official documentation. Yes, you guessed it: since I’m defining an interval, this can be considered a micro-batch ETL. However, thanks to Flink’s parallelism you can handle multiple streams at once in a single script that remains easy to follow.
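The 1-second interval and 200-row cap translate into a JdbcExecutionOptions object handed to the sink; the retry count below is my own addition, not something taken from the repo:

from pyflink.datastream.connectors.jdbc import JdbcExecutionOptions

# Flush buffered rows to Postgres every second, or earlier once 200 rows are queued.
execution_options = (
    JdbcExecutionOptions.builder()
    .with_batch_interval_ms(1000)  # flush at least every second
    .with_batch_size(200)          # or earlier, once 200 rows are buffered
    .with_max_retries(5)           # assumption: retry failed batches a few times
    .build()
)

This object is then passed as the optional argument of the sink helper sketched above.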

Note: Don’t forget to create the raw_sensors_data table in Postgres, where raw data coming from the IoT sensors will be received. This is covered in the step-by-step guide in the sections below.

Sinking Data to Kafka

I’ve covered how to consume data from a Kafka topic in a previous discussion. However, I haven’t configured a sink yet, and that’s what we’ll do now. The configuration has some intricacies and, similarly to the Postgres sink, it’s defined in a function. Additionally, you have to define the type of the data stream before sinking it to Kafka. Notice that the alarms_data stream is cast to a string with output_type=Types.STRING() before it is sunk to Kafka, since I’ve declared the serializer as SimpleStringSchema().
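A minimal sketch of what that function can look like with PyFlink’s KafkaSink follows; the topic name and bootstrap address come from this setup, while the function name and structure are illustrative rather than the repo’s exact code:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink

def configure_kafka_sink(bootstrap_servers, topic):
    """Build a Kafka sink that writes plain string records to the given topic."""
    return (
        KafkaSink.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic(topic)
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .build()
    )

# alarms_data must already be a stream of strings (output_type=Types.STRING()):
# alarms_data.sink_to(configure_kafka_sink("kafka:19092", "alerts"))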

I’ll show you how to fetch data from the alerts topic in the following steps.

Local or Containerized Configuration

One of the greatest things about this Docker configuration is that you can run Flink either locally or inside the container as a managed task. The local Flink setup is depicted in the following figure, where you can see our Flink application detached from the Docker container. This can help when troubleshooting Flink, which doesn’t ship with a strong suite of native observability tools. In fact, I’d like to give the Datorios tools for Flink a try; they look very promising for monitoring purposes.

Running Flink applications locally with the other services running inside the container — image by author

If you want to try the Flink application locally, you have to correctly define the hosts and ports used by the script, which are two constants in the usr_jobs/postgres_sink.py file:

For container run, use:

KAFKA_HOST = "kafka:19092"
POSTGRES_HOST = "postgres:5432"

For local run, use:

KAFKA_HOST = "localhost:9092"
POSTGRES_HOST = "localhost:5432"

By default, the repo sets up the Flink application to run inside the container. You can monitor the running jobs in the web UI at http://localhost:8081. If you choose to run the job locally, it won’t show up there.

Screenshot of the Flink web UI with the running job — image by author

Note: If you run the job locally, you need to install the Flink dependencies listed in requirements.txt. A pyproject.toml file is also provided if you prefer to set up the environment with Poetry.

Step-by-Step Guide to Run the Streaming Pipeline

Step 1: Launch the multi-container application

Launch the containers by running docker-compose. I preferred to do it without detached mode to see the logs while the containers spin up and run.

docker-compose up

Check the logs to verify that the services are running properly.

Step 2: Create the Kafka topics

Next, we’re going to create the topics to receive data from the IoT sensors and store the alerts filtered by the Flink application.

docker-compose exec kafka kafka-topics \
  --create --topic sensors \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

docker-compose exec kafka kafka-topics \
  --create --topic alerts \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

To check that the topics were created correctly, you can execute the following command:

docker-compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --list

Step 3: Create Postgres table

Log in to the Postgres console:

psql -h localhost -U flinkuser -d flinkdb

Enter the password flinkpassword to log into the Postgres console. Remember, this is a local configuration, so default credentials are set in the docker-compose.yml. Then create the table:

CREATE TABLE raw_sensors_data (
    message_id VARCHAR(255) PRIMARY KEY,
    sensor_id INT NOT NULL,
    message TEXT NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL
);

You can check that the table was created properly with the following command:

flinkdb=# \d raw_sensors_data

This will display the table description, listing the columns and types defined above.

Step 4: Launching the Kafka producer

Create a local environment with conda or Poetry and install the kafka-python package:

pip install kafka-python

Then execute the kafka producer, which mimics IoT sensor messages and publishes messages to the sensors topic.

python pyflink/usr_jobs/kafka_producer.py

Leave it running for the rest of the tutorial.
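For reference, a stripped-down producer along these lines would do the job. The payload shape here is an assumption based on the raw_sensors_data table and the JSON query in Step 7; the actual script in pyflink/usr_jobs/kafka_producer.py remains the source of truth:

import json
import random
import time
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda value: json.dumps(value).encode("utf-8"),
)

while True:
    # Hypothetical payload: a unique id, a sensor id, a JSON body with the temperature, and a timestamp.
    payload = {
        "message_id": str(uuid.uuid4()),
        "sensor_id": random.randint(1, 10),
        "message": {"temperature": round(random.uniform(20.0, 40.0), 2)},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    producer.send("sensors", payload)
    time.sleep(1)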

Step 5: Initializing the Flink task

We’re going to launch the Flink application from within the container, so you can monitor it from the web UI through localhost:8081. Run the following command from the repository root:

docker-compose exec flink-jobmanager flink run \
  -py /opt/flink/usr_jobs/postgres_sink.py

You’ll see some logging information in the terminal, and alerts will also be displayed in the flink-jobmanager container logs. You can also check that the job is running in the Flink web UI at http://localhost:8081/#/job/running.

Details of running job — image by author

The monitoring view apparently reports that no messages are going through the Flink job, which is not true, since the alerts can be seen in the Docker logs.

We’ll verify the messages by querying the Postgres table and reading the alerts topic, both of which were created for this purpose.

Step 6: Read Alerts in Kafka Topic

To read data in the alerts topic, you can execute the following command:

docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic alerts \
  --from-beginning

That will print all the messages the topic has received so far.

Step 7: Read raw data from Postgres table

Additionally, you can query the raw messages from the IoT sensors and even parse the JSON data directly in PostgreSQL:

SELECT
    *,
    (message::json->>'temperature')::numeric AS temperature
FROM raw_sensors_data
LIMIT 10;

Step 8: Stopping Services

You can easily stop everything by pressing Ctrl-C in the Docker terminal. If you prefer a proper shutdown, proceed with the following steps:

  1. Cancel the Flink job from the top-right corner of the job details page in the web UI.
  2. Stop the kafka_producer.py script that was running locally.
  3. Press Ctrl-C in the Docker terminal to stop the services.

The data exchanged during the session, while the services were running, is persisted. So if you later want to query the Postgres table or the Kafka topics, the data will still be there.

Insights on Using Multiple Sinks in a PyFlink Job

In the Flink job used for demonstration, I’m managing two data streams simultaneously in the same task: one writes the raw data coming from the sensors topic (IoT devices) to Postgres, and the other publishes the filtered alerts to a second topic. This has some advantages and drawbacks (a toy sketch of the single-job layout follows the pros and cons); as a quick summary:

Pros of Single Job with Multiple Sinks:

– Simplicity in resource management.

– Consistency in data flow.

Cons of Single Job:

– Can become complex as logic grows.

– Scalability might be an issue.

Pros of Multiple Jobs:

– Better fault isolation.

– Focused optimization.

Cons of Multiple Jobs:

– Resource overhead.

– Coordination complexity.
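To make the single-job layout concrete, here is a toy PyFlink script that attaches two sinks to one source and submits them as a single job graph. The in-memory collection and print sinks are stand-ins for the real pipeline’s Kafka source, JDBC sink, and Kafka sink:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# One source feeds two independent paths, each ending in its own sink.
readings = env.from_collection(
    [("sensor-1", 25.0), ("sensor-2", 34.5), ("sensor-3", 31.2)],
    type_info=Types.TUPLE([Types.STRING(), Types.DOUBLE()]),
)

# Path 1: every reading (stand-in for the Postgres JDBC sink).
readings.print("raw")

# Path 2: only readings above 30 °C (stand-in for the Kafka alerts sink).
readings.filter(lambda reading: reading[1] > 30.0).print("alerts")

# A single execute() call submits one job graph containing both paths.
env.execute("single_job_two_sinks")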

Conclusion

This setup offers a robust solution for real-time data streaming and processing, integrating Flink, Kafka, and PostgreSQL effectively. The main purpose of having Postgres in the loop is to inspect the raw messages coming from the IoT devices without relying on queries against the topic itself. It also helped demonstrate how to sink data using a JDBC connector, which is a fairly standard pattern. The message transformations were done with the DataStream API; I would like to dive further into the SQL API, which offers a friendlier interface. Finally, regarding how to manage data streams, choose between a single job and multiple jobs based on your specific requirements, keeping scalability and maintainability in mind.

Next Steps

1. Use SQL API to make transformations.

2. Optimize resource usage based on job complexity.

3. Explore advanced Flink features for complex data processing tasks.

Happy streaming! 🚀

Stay tuned for more tutorials on integrating and scaling data engineering solutions with Docker!

Feel free to reach out for any questions or suggestions in the comments below!

Ready to Optimize Your Streaming Data Applications?

Unlock the full potential of your data with our expert consulting services, tailored for streaming data applications. Whether you’re looking to enhance real-time analytics, streamline data pipelines, or optimize performance, we’re here to help.