I will add another example of what can be done with Ansible Driven Automation by interfacing it with Kafka. For those who wonder what Event-Driven Ansible is have a look at my previous blog about it.

Additional Installation Steps

In my previous blog, I already listed the steps to setup Event Driven Ansible (EDA). To connect to Kafka (I will talk about Kafka installation in next chapter), Ansible has more requirements which are not installed via ansible-galaxy, but via pip. To do so, simply use the requirements file from the collection:

pip install -r ~/.ansible/collections/ansible_collections/ansible/eda/requirements.txt

If you did not do it, you will face an error message ModuleNotFoundError: No module named 'aiokafka'.

Install Kafka

First, we need to download the package:

curl -O https://packages.confluent.io/archive/7.3/confluent-7.3.3.zip

unzip it with unzip confluent-7.3.3.zip.

Once we moved into the extracted directory, we must create a new storage. For that, we will use kafka-storage to generate a unique random id:

KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"

And format storage with that ID:

bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c etc/kafka/kraft/server.properties

The mentioned properties file is an example provided with the package. I changed two options in that file:

  • advertised.listeners to the server IP instead of localhost
  • log.dirs to a more persitent directory as it was pointing to /tmp

Now that storage is ready, we can start Kafka server:

bin/kafka-server-start etc/kafka/kraft/server.properties

Server will take few seconds to start and will be eventually ready:

[2023-04-06 14:14:09,584] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Keep this window open as it is running server.

Rule Book

The rule book will look like this:

---
- name: Demo rules with kafka as source
  hosts: localhost
  sources:
    - name: Kafka
      ansible.eda.kafka:
        topic: eda
        host: 192.168.33.20
        port: 9092
        group_id: testing
  rules:
    - name: Check defined
      condition: event.i is defined
      action:
        debug:
    - name: Stop
      condition: event.stop == true
      action:
        shutdown:

And starting it with ansible-rulebook --rulebook kafka-test-rules.yml -i inventory.yml --verbose:

2023-04-06 14:46:17,311 - ansible_rulebook.app - INFO - Starting sources
2023-04-06 14:46:17,312 - ansible_rulebook.app - INFO - Starting rules
2023-04-06 14:46:17,312 - ansible_rulebook.engine - INFO - run_ruleset
2023-04-06 14:46:18,070 - ansible_rulebook.engine - INFO - ruleset define: {"name": "Demo rules with kafka as source", "hosts": ["localhost"], "sources": [{"EventSource": {"name": "Kafka", "source_name": "ansible.eda.kafka", "source_args": {"topic": "eda", "host": "192.168.33.20", "port": 9092, "group_id": "testing"}, "source_filters": []}}], "rules": [{"Rule": {"name": "Check defined", "condition": {"AllCondition": [{"IsDefinedExpression": {"Event": "i"}}]}, "action": {"Action": {"action": "debug", "action_args": {}}}, "enabled": true}}, {"Rule": {"name": "Stop", "condition": {"AllCondition": [{"EqualsExpression": {"lhs": {"Event": "stop"}, "rhs": {"Boolean": true}}}]}, "action": {"Action": {"action": "shutdown", "action_args": {}}}, "enabled": true}}]}
2023-04-06 14:46:18,070 - ansible_rulebook.engine - INFO - load source
2023-04-06 14:46:18,646 - ansible_rulebook.engine - INFO - load source filters
2023-04-06 14:46:18,650 - ansible_rulebook.engine - INFO - Calling main in ansible.eda.kafka
2023-04-06 14:46:18,650 - aiokafka.consumer.subscription_state - INFO - Updating subscribed topics to: frozenset({'eda'})
2023-04-06 14:46:18,654 - ansible_rulebook.engine - INFO - Start a playbook action runner for Demo rules with kafka as source
2023-04-06 14:46:18,655 - ansible_rulebook.engine - INFO - Waiting for actions on events from Demo rules with kafka as source
2023-04-06 14:46:18,655 - ansible_rulebook.engine - INFO - Waiting for events from Demo rules with kafka as source
2023-04-06 14:46:18,709 - aiokafka.consumer.group_coordinator - INFO - Discovered coordinator 1 for group testing
2023-04-06 14:46:18,711 - aiokafka.consumer.group_coordinator - INFO - Revoking previously assigned partitions set() for group testing
2023-04-06 14:46:18,711 - aiokafka.consumer.group_coordinator - INFO - (Re-)joining group testing
2023-04-06 14:46:18,762 - aiokafka.consumer.group_coordinator - INFO - Joined group 'testing' (generation 5) with member_id aiokafka-0.8.0-4497ff79-c442-4cf2-97a2-958708bbdfde
2023-04-06 14:46:18,762 - aiokafka.consumer.group_coordinator - INFO - Elected group leader -- performing partition assignments using roundrobin
2023-04-06 14:46:18,794 - aiokafka.consumer.group_coordinator - INFO - Successfully synced group testing with generation 5
2023-04-06 14:46:18,795 - aiokafka.consumer.group_coordinator - INFO - Setting newly assigned partitions {TopicPartition(topic='eda', partition=0)} for group testing

Let’s produce few messages to test everything works as expected (finishing with Ctrl+c):

~/confluent-7.3.3$ ./bin/kafka-console-producer --bootstrap-server 192.168.33.20:9092 --topic eda
>TEST1
>TEST2
>^Cvagrant@debian11:~/confluent-7.3.3$

And consume them:

~/confluent-7.3.3$ ./bin/kafka-console-consumer --bootstrap-server 192.168.33.20:9092 --topic eda --from-beginning
TEST1
TEST2

--from-beginning” flag will make consumer read whole topic history.

Triggering the Playbook

As in previous blog, messages are in JSON format. The rule book has two rules:

  • If JSON has a “i” field, it will display debug information.
  • If it has a “stop” field set to true, it will stop the rule book.

Let’s git it a try:

In the producer prompt (i.e. >), I enter {“i”:””}, the following will be displayed in the ansible-rulebook:

2023-04-06 15:15:06 855 [main] INFO org.drools.ansible.rulebook.integration.api.rulesengine.RegisterOnlyAgendaFilter - Activation of effective rule "Check defined" with facts: [Event DROOLS_PROTOTYPE with values = {i=}]
2023-04-06 15:15:06,856 - ansible_rulebook.rule_generator - INFO - calling Check defined
2023-04-06 15:15:06,857 - ansible_rulebook.engine - INFO - call_action debug
2023-04-06 15:15:06,857 - ansible_rulebook.engine - INFO - substitute_variables [{}] [{'event': {'i': ''}, 'fact': {'i': ''}}]
2023-04-06 15:15:06,857 - ansible_rulebook.engine - INFO - action args: {}
======================================================================================================================================================================================================
kwargs:
{'facts': {},
 'hosts': ['localhost'],
 'inventory': {'all': {'hosts': {'localhost': {'ansible_connection': 'local'}}}},
 'project_data_file': None,
 'ruleset': 'Demo rules with kafka as source',
 'source_rule_name': 'Check defined',
 'source_ruleset_name': 'Demo rules with kafka as source',
 'variables': {'event': {'i': ''}, 'fact': {'i': ''}}}
======================================================================================================================================================================================================

Next message could be {"stop":true} and associated output of ansible-rulebook:

2023-04-06 15:17:43 398 [main] INFO org.drools.ansible.rulebook.integration.api.rulesengine.RegisterOnlyAgendaFilter - <strong>Activation of effective rule "Stop"</strong> with facts: [Event DROOLS_PROTOTYPE with values = {stop=true}]
2023-04-06 15:17:43,419 - ansible_rulebook.rule_generator - INFO - calling Stop
2023-04-06 15:17:43,421 - ansible_rulebook.engine - INFO - call_action shutdown
2023-04-06 15:17:43,422 - ansible_rulebook.engine - INFO - substitute_variables [{}] [{'event': {'stop': True}, 'fact': {'stop': True}}]
2023-04-06 15:17:43,422 - ansible_rulebook.engine - INFO - action args: {}
2023-04-06 15:17:43,437 - ansible_rulebook.engine - INFO - Attempt to stop ruleset Demo rules with kafka as source
2023-04-06 15:17:43,438 - ansible_rulebook.engine - INFO - Playbook action runner Demo rules with kafka as source exits
2023-04-06 15:17:43,439 - ansible_rulebook.engine - INFO - Stopped waiting for actions on events from Demo rules with kafka as source
2023-04-06 15:17:43,441 - ansible_rulebook.engine - INFO - Stopped waiting on events from Demo rules with kafka as source
2023-04-06 15:17:43,442 - ansible_rulebook.engine - INFO - Canceling all ruleset tasks
2023-04-06 15:17:43,444 - ansible_rulebook.app - INFO - Cancelling event source tasks
2023-04-06 15:17:43,447 - root - INFO - Stopping kafka consumer
2023-04-06 15:17:43,464 - aiokafka.consumer.group_coordinator - INFO - LeaveGroup request succeeded
2023-04-06 15:17:43,467 - ansible_rulebook.engine - INFO - Task cancelled
2023-04-06 15:17:43,468 - ansible_rulebook.app - INFO - Main complete

Certainly, it is not possible to reverse this action remotely as the service is stopped 😉 .

If you have any idea or suggestion on possible scenario I could test, feel free to leave a comment.