Skip to content

Latest commit

 

History

History

pgevent-source

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 

Camel-Kafka-connector PGEvent Source

This is an example for Camel-Kafka-connector PGEvent Source

Standalone

What is needed

  • A running postgresql instance through docker

Running Kafka

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic

Download the connector package

Download the connector package tar.gz and extract the content to a directory. In this example we’ll use /home/oscerd/connectors/

> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-pgevent-kafka-connector/0.11.5/camel-pgevent-kafka-connector-0.11.5-package.tar.gz
> untar.gz camel-pgevent-kafka-connector-0.11.5-package.tar.gz

Configuring Kafka Connect

You’ll need to set up the plugin.path property in your kafka

Open the $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your choosen location:

...
plugin.path=/home/oscerd/connectors
...

Setup the docker image

We’ll need a full running Postgresql instance.

First step is running it:

> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb

Take note of the container id. We need now to create the table we’ll use: the table is the following

CREATE TABLE accounts (
	user_id serial PRIMARY KEY,
	username VARCHAR ( 50 ) UNIQUE NOT NULL,
	city VARCHAR ( 50 ) NOT NULL
);

We are now ready to create the table

> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.

postgres=# CREATE TABLE accounts (
postgres(# user_id serial PRIMARY KEY,
postgres(# username VARCHAR ( 50 ) UNIQUE NOT NULL,
postgres(# city VARCHAR ( 50 ) NOT NULL
postgres(# );

Now we need to create a trigger with a notify command for the channel we’ll choose. In our case we’ll call the channel like the table we created, so 'accounts'

The Trigger is the following

CREATE OR REPLACE FUNCTION row_inserted()
  RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify(
    'accounts',
    row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER row_inserted
  AFTER INSERT ON accounts
  FOR EACH ROW
  EXECUTE PROCEDURE row_inserted();

We need to execute it in our psql session

> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.

postgres=# CREATE OR REPLACE FUNCTION row_inserted()
postgres-#   RETURNS trigger AS $$
postgres$# DECLARE
postgres$# BEGIN
postgres$#   PERFORM pg_notify(
postgres$#     'accounts',
postgres$#     row_to_json(NEW)::text);
postgres$#   RETURN NEW;
postgres$# END;
postgres$# $$ LANGUAGE plpgsql;
CREATE FUNCTION
postgres=#
postgres=# CREATE TRIGGER row_inserted
postgres-#   AFTER INSERT ON accounts
postgres-#   FOR EACH ROW
postgres-#   EXECUTE PROCEDURE row_inserted();
CREATE TRIGGER

We need to take note also of the container ip

> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
172.17.0.2

Setup the connectors

Open the PGEvent configuration file at $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties

name=CamelPgeventSourceConnector
connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.source.endpoint.user=postgres
camel.source.endpoint.pass=mysecretpassword
camel.source.path.port=5432
camel.source.path.host=172.17.0.2

camel.source.path.channel=accounts
camel.source.path.database=postgres

and add the correct IP for the container.

Running the example

Run the kafka connect with the SQL Source connector:

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties

Now we need to run some insert in our database

> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
postgres=# insert into accounts(username,city) values('andrea','Roma');
INSERT 0 1
Asynchronous notification "accounts" with payload "{"user_id":1,"username":"andrea","city":"Roma"}" received from server process with PID 152.
postgres=# insert into accounts(username,city) values('John','New York');
INSERT 0 1
Asynchronous notification "accounts" with payload "{"user_id":2,"username":"John","city":"New York"}" received from server process with PID 152.

On a different terminal run the kafkacat consumer

> ./kafkacat -b  localhost:9092 -t mytopic
{"user_id":1,"username":"andrea","city":"Roma"}
{"user_id":2,"username":"John","city":"New York"}
% Reached end of topic dbtest6 [0] at offset 2