This is an example for the Camel-Kafka-connector PGEvent Source.
Start ZooKeeper and a Kafka broker, then create the topic used in this example:
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
Download the connector package (tar.gz) and extract its content to a directory. In this example we’ll use /home/oscerd/connectors/:
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-pgevent-kafka-connector/0.11.5/camel-pgevent-kafka-connector-0.11.5-package.tar.gz
> tar -xzf camel-pgevent-kafka-connector-0.11.5-package.tar.gz
You’ll need to set the plugin.path property in your Kafka Connect configuration. Open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your chosen location:
...
plugin.path=/home/oscerd/connectors
...
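If you would rather script this step than edit the file by hand, the following is a minimal sketch. The `set_plugin_path` helper is a hypothetical name and is not part of Kafka. It assumes a `sed` that accepts `-i.bak`, only rewrites an uncommented `plugin.path=` line, and expects the file to end with a newline when it has to append.

```shell
# Hypothetical helper (not shipped with Kafka): set plugin.path in a
# Kafka Connect properties file, replacing an uncommented entry if one
# exists and appending a new line otherwise.
set_plugin_path() {
  props_file="$1"
  plugin_dir="$2"
  if grep -q '^plugin\.path=' "$props_file"; then
    # A backup copy is kept next to the file as <file>.bak.
    sed -i.bak "s|^plugin\.path=.*|plugin.path=${plugin_dir}|" "$props_file"
  else
    printf 'plugin.path=%s\n' "$plugin_dir" >> "$props_file"
  fi
}

# Example call for the layout used in this example:
# set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors
```

Commented-out `#plugin.path=` lines are left untouched on purpose, so the explanatory comments in the default file survive.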
We’ll need a running PostgreSQL instance.
The first step is to start it:
> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
Take note of the container id. Next we need to create the table we’ll use, defined as follows:
CREATE TABLE accounts (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
city VARCHAR ( 50 ) NOT NULL
);
We are now ready to create the table in a psql session:
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.
postgres=# CREATE TABLE accounts (
postgres(# user_id serial PRIMARY KEY,
postgres(# username VARCHAR ( 50 ) UNIQUE NOT NULL,
postgres(# city VARCHAR ( 50 ) NOT NULL
postgres(# );
Now we need to create a trigger that issues a notify command on the channel we choose. In our case we’ll name the channel after the table we created, so 'accounts'.
The trigger function and the trigger are the following:
CREATE OR REPLACE FUNCTION row_inserted()
RETURNS trigger AS $$
DECLARE
BEGIN
PERFORM pg_notify(
'accounts',
row_to_json(NEW)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER row_inserted
AFTER INSERT ON accounts
FOR EACH ROW
EXECUTE PROCEDURE row_inserted();
We need to execute it in our psql session
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.
postgres=# CREATE OR REPLACE FUNCTION row_inserted()
postgres-# RETURNS trigger AS $$
postgres$# DECLARE
postgres$# BEGIN
postgres$# PERFORM pg_notify(
postgres$# 'accounts',
postgres$# row_to_json(NEW)::text);
postgres$# RETURN NEW;
postgres$# END;
postgres$# $$ LANGUAGE plpgsql;
CREATE FUNCTION
postgres=#
postgres=# CREATE TRIGGER row_inserted
postgres-# AFTER INSERT ON accounts
postgres-# FOR EACH ROW
postgres-# EXECUTE PROCEDURE row_inserted();
CREATE TRIGGER
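The interactive sessions above can also be scripted. The sketch below assumes the SQL statements have been saved to local files (the file names are placeholders) and that the container was started with `--name some-postgres` as shown earlier. `run_sql_file` is a hypothetical helper name; `docker exec -i` forwards standard input to `psql` inside the container.

```shell
# Hypothetical helper: run a local SQL file through psql inside a container.
# $1 = container name or id, $2 = path to a .sql file on the host.
run_sql_file() {
  # -i keeps stdin open so the file can be piped in;
  # ON_ERROR_STOP makes psql exit non-zero on the first failing statement.
  docker exec -i "$1" psql -U postgres -v ON_ERROR_STOP=1 < "$2"
}

# Example (file names are placeholders):
# run_sql_file some-postgres create_accounts.sql
# run_sql_file some-postgres row_inserted_trigger.sql
```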
We also need to take note of the container IP:
> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
172.17.0.2
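Because the container was started with `--name some-postgres`, the name can be used in place of the long id. A small sketch, with `container_ip` and `PG_HOST` as hypothetical names:

```shell
# Hypothetical helper: print the IP address of a container, by name or id.
container_ip() {
  docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$1"
}

# Example: keep the address in a variable for later steps.
# PG_HOST=$(container_ip some-postgres)
# echo "$PG_HOST"
```

The address depends on your Docker network setup, so it may differ from the one shown in this example.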
Open the PGEvent configuration file at $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties
name=CamelPgeventSourceConnector
connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.source.endpoint.user=postgres
camel.source.endpoint.pass=mysecretpassword
camel.source.path.port=5432
camel.source.path.host=172.17.0.2
camel.source.path.channel=accounts
camel.source.path.database=postgres
Set camel.source.path.host to the IP address of your container.
Run Kafka Connect with the PGEvent Source connector:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties
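Once the worker is up, the Kafka Connect REST interface can confirm that the connector and its task are running. This sketch assumes the worker listens on the default REST port 8083 on localhost and that `curl` is installed; `connector_status` is a hypothetical helper name.

```shell
# Hypothetical helper: query the Kafka Connect REST API for a connector's status.
# $1 = connector name, $2 = optional base URL (assumed default: http://localhost:8083).
connector_status() {
  base_url="${2:-http://localhost:8083}"
  curl -s "${base_url}/connectors/$1/status"
}

# Example, using the name from the properties file:
# connector_status CamelPgeventSourceConnector
```

The response is a JSON document reporting a state for the connector and for each task. A state other than RUNNING usually points at a configuration or connectivity problem, such as a wrong host or password.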
Now we need to run some inserts in our database:
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
postgres=# insert into accounts(username,city) values('andrea','Roma');
INSERT 0 1
Asynchronous notification "accounts" with payload "{"user_id":1,"username":"andrea","city":"Roma"}" received from server process with PID 152.
postgres=# insert into accounts(username,city) values('John','New York');
INSERT 0 1
Asynchronous notification "accounts" with payload "{"user_id":2,"username":"John","city":"New York"}" received from server process with PID 152.
In a different terminal, run the kafkacat consumer:
> ./kafkacat -b localhost:9092 -t mytopic
{"user_id":1,"username":"andrea","city":"Roma"}
{"user_id":2,"username":"John","city":"New York"}
% Reached end of topic mytopic [0] at offset 2
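If kafkacat is not available, the console consumer that ships with Kafka can read the same topic. A minimal sketch, assuming `$KAFKA_HOME` is set as in the earlier steps; `consume_topic` is a hypothetical wrapper name.

```shell
# Hypothetical wrapper around the console consumer bundled with Kafka.
# $1 = topic name, $2 = optional bootstrap server (assumed default: localhost:9092).
consume_topic() {
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server "${2:-localhost:9092}" \
    --topic "$1" \
    --from-beginning
}

# Example: consume_topic mytopic   (stop with Ctrl+C)
```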