Data Migration from Postgres using Kafka Connect

Zeeshan Kareem
Nov 27, 2020

Overview: This story covers automatic data migration from one database (here Postgres) to another using Kafka Connect for Change Data Capture (CDC). On the source side, a Publication is created in the Postgres source Database1 over the tables of interest, whose data needs to be migrated into certain tables of Database2.

Tools: Postgres, Kafka Connect, Kafka, Spring Boot, Java

(Diagram: data flow from Database1 through Kafka Connect and Kafka into the consumer service and Database2)

Step 1: Create Publication for the concerned list of tables in Database1

CREATE PUBLICATION dbz_publication FOR TABLE table1, table2, table3,…., tableN;

Note: Unless the PUBLICATION name is explicitly set in the Kafka Connect configuration, it must be dbz_publication for the Kafka connector to capture changes in Database1.
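
If you want to sanity-check the setup from psql, the queries below (a minimal sketch, assuming you are connected to Database1) confirm that logical decoding is enabled and list the tables covered by the publication:

SHOW wal_level;   -- must be 'logical' for pgoutput-based CDC
SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';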

Step 2: Connect the Postgres Database1 as a source to Kafka

Use Postman to register the connector configuration needed to stream data from the required tables of Database1 into Kafka:

POST request: http://×××:×××/connectors

BODY (raw):
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "×××",
    "database.port": "×××",
    "database.user": "×××",
    "database.password": "×××",
    "database.dbname": "Database1",
    "database.server.name": "dbserver1",
    "database.whitelist": "Database1",
    "database.history.kafka.bootstrap.servers": "×××:×××",
    "database.history.kafka.topic": "schema-changes.Database1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://×××:8081",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered"
  }
}

Note: We are using the standard logical decoding plugin pgoutput, which converts the changes read from the WAL into the logical replication protocol and filters the data according to the publication specification. To check whether the settings registered above have been applied, send a GET request from Postman to http://×××:×××/connectors/postgres-source/status
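
If you prefer the command line to Postman, the same calls can be made with curl (a sketch: ×××:××× stands for the Kafka Connect host and port as above, and postgres-source.json is a hypothetical file holding the request body shown earlier):

curl -s -X POST -H "Content-Type: application/json" --data @postgres-source.json http://×××:×××/connectors
curl -s http://×××:×××/connectors/postgres-source/status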

Step 3: Check the data streaming in topic(s)

Use Kafka Tool 2 to verify the data in the topics generated for the published tables.

Note: To read the data properly, set the Key and Value content types to String in the Kafka Tool 2 settings.
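
Alternatively, the topics can be inspected with the console consumer shipped with Kafka (a sketch: the topic name assumes Debezium's default naming <server.name>.<schema>.<table> and that table1 lives in the public schema):

bin/kafka-console-consumer.sh --bootstrap-server ×××:××× --topic dbserver1.public.table1 --from-beginning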

Step 4: Processing of streaming data from Kafka

Use consumer service(s) to hold the business logic: read the data from the topic(s), process it as required, and persist it in Database2.

Note: In my case, I had a Spring Boot microservice with consumers written in Java. While consuming data from a given topic, three cases are handled separately, i.e., Add, Update and Delete, and the results are persisted in the respective tables of Database2.
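
A minimal consumer sketch along those lines, assuming Spring Boot with spring-kafka and Jackson on the classpath: the topic name follows Debezium's <server.name>.<schema>.<table> convention, and the handler method bodies are hypothetical placeholders for the actual business logic.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Table1ChangeConsumer {

    private final ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "dbserver1.public.table1", groupId = "db2-migration")
    public void onChange(String message) throws JsonProcessingException {
        if (message == null) {
            return; // tombstone record emitted after a delete; nothing to process
        }
        // With value.converter.schemas.enable=true the event arrives as
        // {"schema": ..., "payload": ...}; the Debezium change envelope sits under "payload".
        JsonNode payload = mapper.readTree(message).path("payload");
        String op = payload.path("op").asText();

        switch (op) {
            case "c": // insert
            case "r": // snapshot read
                handleAdd(payload.path("after"));
                break;
            case "u": // update
                handleUpdate(payload.path("before"), payload.path("after"));
                break;
            case "d": // delete
                handleDelete(payload.path("before"));
                break;
            default:
                // ignore other event types
        }
    }

    private void handleAdd(JsonNode after) { /* map fields and insert into Database2 */ }

    private void handleUpdate(JsonNode before, JsonNode after) { /* apply business logic and update Database2 */ }

    private void handleDelete(JsonNode before) { /* remove or soft-delete the row in Database2 */ }
}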

Step 5: Verify Data Migration from Database1 to Database2

Write DML statement(s) against Database1 and check that the corresponding changes (as per the business logic in Step 4) are reflected in Database2.
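
For example (with hypothetical table and column names), run in Database1:

INSERT INTO table1 (id, name) VALUES (1, 'test');
UPDATE table1 SET name = 'test-updated' WHERE id = 1;
DELETE FROM table1 WHERE id = 1;

Each statement should produce a change event on the corresponding topic and, after processing by the consumer service, be reflected in Database2.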
