I will add another example of what can be done with Event-Driven Ansible by interfacing it with Kafka. If you wonder what Event-Driven Ansible is, have a look at my previous blog post about it.
Additional Installation Steps
In my previous blog post, I already listed the steps to set up Event-Driven Ansible (EDA). To connect to Kafka (I will cover the Kafka installation in the next chapter), Ansible has additional requirements which are installed not via ansible-galaxy but via pip. To do so, simply use the requirements file shipped with the collection:
```shell
pip install -r ~/.ansible/collections/ansible_collections/ansible/eda/requirements.txt
```
If you skip this step, you will face the error message `ModuleNotFoundError: No module named 'aiokafka'`.
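If you want to verify up front that the dependency is in place, a quick stdlib check works (the helper name is mine, not part of ansible-rulebook):

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the named module can be imported."""
    return importlib.util.find_spec(name) is not None


# The EDA kafka source plugin needs aiokafka; check before starting ansible-rulebook.
if not module_available("aiokafka"):
    print("aiokafka missing - install the collection's requirements.txt via pip")
```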
Install Kafka
First, we need to download the package:
```shell
curl -O https://packages.confluent.io/archive/7.3/confluent-7.3.3.zip
```
and unzip it with `unzip confluent-7.3.3.zip`.
Once inside the extracted directory, we must initialize a new storage. For that, we will use `kafka-storage` to generate a unique random id:

```shell
KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
```
And format the storage with that ID:

```shell
bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c etc/kafka/kraft/server.properties
```
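As an aside, the generated cluster id is simply 16 random bytes rendered as URL-safe base64 (22 characters). A minimal Python illustration of that shape, not the actual kafka-storage implementation:

```python
import base64
import uuid


def random_cluster_id() -> str:
    """Return a 22-character URL-safe base64 string derived from a random
    UUID, the same shape as the output of `kafka-storage random-uuid`."""
    raw = uuid.uuid4().bytes  # 16 random bytes
    # 16 bytes -> 24 base64 chars with '==' padding; strip it to get 22
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


print(random_cluster_id())
```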
The mentioned properties file is an example provided with the package. I changed two options in that file:
- advertised.listeners to the server IP instead of localhost
- log.dirs to a more persistent directory, as it was pointing to /tmp
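For reference, the two edited lines in etc/kafka/kraft/server.properties would look roughly like this (the IP is from my setup and the log directory is only an example path; adjust both to your environment):

```properties
# Advertise the server IP instead of localhost so remote clients can connect
advertised.listeners=PLAINTEXT://192.168.33.20:9092
# Keep the log data somewhere that survives a reboot, not /tmp
log.dirs=/var/lib/kafka/kraft-combined-logs
```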
Now that the storage is ready, we can start the Kafka server:

```shell
bin/kafka-server-start etc/kafka/kraft/server.properties
```
The server will take a few seconds to start and will eventually be ready:

```
[2023-04-06 14:14:09,584] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
```
Keep this window open, as it is running the server.
Rule Book
The rulebook will look like this:
```yaml
---
- name: Demo rules with kafka as source
  hosts: localhost
  sources:
    - name: Kafka
      ansible.eda.kafka:
        topic: eda
        host: 192.168.33.20
        port: 9092
        group_id: testing
  rules:
    - name: Check defined
      condition: event.i is defined
      action:
        debug:
    - name: Stop
      condition: event.stop == true
      action:
        shutdown:
```
And we start it with `ansible-rulebook --rulebook kafka-test-rules.yml -i inventory.yml --verbose`:
```
2023-04-06 14:46:17,311 - ansible_rulebook.app - INFO - Starting sources
2023-04-06 14:46:17,312 - ansible_rulebook.app - INFO - Starting rules
2023-04-06 14:46:17,312 - ansible_rulebook.engine - INFO - run_ruleset
2023-04-06 14:46:18,070 - ansible_rulebook.engine - INFO - ruleset define: {"name": "Demo rules with kafka as source", "hosts": ["localhost"], "sources": [{"EventSource": {"name": "Kafka", "source_name": "ansible.eda.kafka", "source_args": {"topic": "eda", "host": "192.168.33.20", "port": 9092, "group_id": "testing"}, "source_filters": []}}], "rules": [{"Rule": {"name": "Check defined", "condition": {"AllCondition": [{"IsDefinedExpression": {"Event": "i"}}]}, "action": {"Action": {"action": "debug", "action_args": {}}}, "enabled": true}}, {"Rule": {"name": "Stop", "condition": {"AllCondition": [{"EqualsExpression": {"lhs": {"Event": "stop"}, "rhs": {"Boolean": true}}}]}, "action": {"Action": {"action": "shutdown", "action_args": {}}}, "enabled": true}}]}
2023-04-06 14:46:18,070 - ansible_rulebook.engine - INFO - load source
2023-04-06 14:46:18,646 - ansible_rulebook.engine - INFO - load source filters
2023-04-06 14:46:18,650 - ansible_rulebook.engine - INFO - Calling main in ansible.eda.kafka
2023-04-06 14:46:18,650 - aiokafka.consumer.subscription_state - INFO - Updating subscribed topics to: frozenset({'eda'})
2023-04-06 14:46:18,654 - ansible_rulebook.engine - INFO - Start a playbook action runner for Demo rules with kafka as source
2023-04-06 14:46:18,655 - ansible_rulebook.engine - INFO - Waiting for actions on events from Demo rules with kafka as source
2023-04-06 14:46:18,655 - ansible_rulebook.engine - INFO - Waiting for events from Demo rules with kafka as source
2023-04-06 14:46:18,709 - aiokafka.consumer.group_coordinator - INFO - Discovered coordinator 1 for group testing
2023-04-06 14:46:18,711 - aiokafka.consumer.group_coordinator - INFO - Revoking previously assigned partitions set() for group testing
2023-04-06 14:46:18,711 - aiokafka.consumer.group_coordinator - INFO - (Re-)joining group testing
2023-04-06 14:46:18,762 - aiokafka.consumer.group_coordinator - INFO - Joined group 'testing' (generation 5) with member_id aiokafka-0.8.0-4497ff79-c442-4cf2-97a2-958708bbdfde
2023-04-06 14:46:18,762 - aiokafka.consumer.group_coordinator - INFO - Elected group leader -- performing partition assignments using roundrobin
2023-04-06 14:46:18,794 - aiokafka.consumer.group_coordinator - INFO - Successfully synced group testing with generation 5
2023-04-06 14:46:18,795 - aiokafka.consumer.group_coordinator - INFO - Setting newly assigned partitions {TopicPartition(topic='eda', partition=0)} for group testing
```
Let’s produce a few messages to test that everything works as expected (finishing with Ctrl+C):
```
~/confluent-7.3.3$ ./bin/kafka-console-producer --bootstrap-server 192.168.33.20:9092 --topic eda
>TEST1
>TEST2
>^Cvagrant@debian11:~/confluent-7.3.3$
```
And consume them:
```
~/confluent-7.3.3$ ./bin/kafka-console-consumer --bootstrap-server 192.168.33.20:9092 --topic eda --from-beginning
TEST1
TEST2
```
The `--from-beginning` flag makes the consumer read the whole topic history.
Triggering the Playbook
As in the previous blog post, messages are in JSON format. The rulebook has two rules:
- If the JSON has an "i" field, it will display debug information.
- If it has a "stop" field set to true, it will stop the rulebook.
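The two conditions can be mimicked in plain Python to sanity-check a candidate message before sending it (the function names are mine, not part of ansible-rulebook):

```python
import json


def matches_check_defined(message: str) -> bool:
    """Mimics the rulebook condition `event.i is defined`."""
    return "i" in json.loads(message)


def matches_stop(message: str) -> bool:
    """Mimics the rulebook condition `event.stop == true`."""
    return json.loads(message).get("stop") is True


print(matches_check_defined('{"i": ""}'))  # True
print(matches_stop('{"stop": true}'))      # True
print(matches_stop('{"stop": "true"}'))    # False: must be a JSON boolean
```

Note the last case: a quoted "true" is a string, not a boolean, and would not trigger the Stop rule.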
Let’s give it a try:
In the producer prompt (i.e. `>`), I enter `{"i":""}`, and the following is displayed in the ansible-rulebook output:
```
2023-04-06 15:15:06 855 [main] INFO org.drools.ansible.rulebook.integration.api.rulesengine.RegisterOnlyAgendaFilter - Activation of effective rule "Check defined" with facts: [Event DROOLS_PROTOTYPE with values = {i=}]
2023-04-06 15:15:06,856 - ansible_rulebook.rule_generator - INFO - calling Check defined
2023-04-06 15:15:06,857 - ansible_rulebook.engine - INFO - call_action debug
2023-04-06 15:15:06,857 - ansible_rulebook.engine - INFO - substitute_variables [{}] [{'event': {'i': ''}, 'fact': {'i': ''}}]
2023-04-06 15:15:06,857 - ansible_rulebook.engine - INFO - action args: {}
================================================================================
kwargs: {'facts': {}, 'hosts': ['localhost'], 'inventory': {'all': {'hosts': {'localhost': {'ansible_connection': 'local'}}}}, 'project_data_file': None, 'ruleset': 'Demo rules with kafka as source', 'source_rule_name': 'Check defined', 'source_ruleset_name': 'Demo rules with kafka as source', 'variables': {'event': {'i': ''}, 'fact': {'i': ''}}}
================================================================================
```
The next message could be `{"stop":true}`, with the associated ansible-rulebook output:
```
2023-04-06 15:17:43 398 [main] INFO org.drools.ansible.rulebook.integration.api.rulesengine.RegisterOnlyAgendaFilter - Activation of effective rule "Stop" with facts: [Event DROOLS_PROTOTYPE with values = {stop=true}]
2023-04-06 15:17:43,419 - ansible_rulebook.rule_generator - INFO - calling Stop
2023-04-06 15:17:43,421 - ansible_rulebook.engine - INFO - call_action shutdown
2023-04-06 15:17:43,422 - ansible_rulebook.engine - INFO - substitute_variables [{}] [{'event': {'stop': True}, 'fact': {'stop': True}}]
2023-04-06 15:17:43,422 - ansible_rulebook.engine - INFO - action args: {}
2023-04-06 15:17:43,437 - ansible_rulebook.engine - INFO - Attempt to stop ruleset Demo rules with kafka as source
2023-04-06 15:17:43,438 - ansible_rulebook.engine - INFO - Playbook action runner Demo rules with kafka as source exits
2023-04-06 15:17:43,439 - ansible_rulebook.engine - INFO - Stopped waiting for actions on events from Demo rules with kafka as source
2023-04-06 15:17:43,441 - ansible_rulebook.engine - INFO - Stopped waiting on events from Demo rules with kafka as source
2023-04-06 15:17:43,442 - ansible_rulebook.engine - INFO - Canceling all ruleset tasks
2023-04-06 15:17:43,444 - ansible_rulebook.app - INFO - Cancelling event source tasks
2023-04-06 15:17:43,447 - root - INFO - Stopping kafka consumer
2023-04-06 15:17:43,464 - aiokafka.consumer.group_coordinator - INFO - LeaveGroup request succeeded
2023-04-06 15:17:43,467 - ansible_rulebook.engine - INFO - Task cancelled
2023-04-06 15:17:43,468 - ansible_rulebook.app - INFO - Main complete
```
Obviously, it is not possible to reverse this action remotely, as the service is stopped.
If you have any ideas or suggestions for scenarios I could test, feel free to leave a comment.
Sangeetha
30.06.2024
I have installed and configured kafka_2.13-3.7.0.tgz on my RHEL dev machine. I am able to publish and consume messages on the Kafka server. The EDA rulebook activation is successful and running. However, when I send events/messages to the Kafka topic, they are not received by the EDA rulebook, only by the Kafka console consumer. I didn't see anything informative in the EDA activation logs or the Kafka logs. In the Kafka server properties I am using the Kafka server IP and port 9092, and the same values are used in the rulebook as well. When I checked the established connections on Kafka (netstat -an | grep ESTABLISHED | grep 9092), I can see the EDA server listed. Can you please advise what needs to be checked?
Hello,
Thanks for your message.
Without seeing your logs, it is difficult to help. I would suggest comparing your logs with what I shared in this blog post.