Skip to main content

Kafka v3

This Integration is part of the Kafka Pack.#

Supported versions

Supported Cortex XSOAR versions: 6.1.0 and later.

Use the Kafka integration to manage messages and partitions and to fetch Kafka messages to create incidents in Cortex XSOAR.

This integration was integrated and tested with version 2.7.1 of Kafka.

This integration is fully compatible with the Kafka v2 integration.

Configure Kafka v3 in Cortex#

ParameterDescriptionRequired
CSV list of Kafka brokers to connect to, e.g., 172.16.20.207:9092,172.16.20.234:9093True
Consumer group IDThis group id will be used when fetching incidents and preforming consumer commands. If not set the group id 'xsoar_group' will be used.False
Use TLS for connectionFalse
Trust any certificate (not secure)False
CA certificate of Kafka server (.cer)False
Client certificate (.cer)False
Client certificate key (.key)False
Client certificate key password (if required)False
PasswordFalse
Use SASL PLAIN for connectionFalse
SASL PLAIN UsernameFalse
SASL PLAIN PasswordFalse
Topic to fetch incidents from (Required for fetch incidents)False
CSV list of partitions to fetch messages fromFalse
Offset to fetch messages from (exclusive)The initial offset to start fetching from, not including the value set (e.g., if 3 is set, the first event that will be fetched will be with offset 4). If you want to start from the earliest or latest, type in 'earliest' or 'latest' accordingly.False
Maximum number of messages to fetchFalse
Stop consuming upon timeoutWhen fetching a significant number of messages (100+), it's advisable to halt message consumption upon timeout. This ensures that the fetch terminates if no messages are received after a specified duration, instead of requesting messages until reaching the maximum number of messages to fetch.False
Consumer OnlyFalse
Fetch incidentsFalse
Incident typeFalse
Max number of bytes per messageThe max number of message bytes to retrieve in each attempted fetch request. Should be in multiples of 1024. If the fetching process takes a long time, consider increasing this value. Default is '1048576'.False

Commands#

You can execute these commands from the CLI, as part of an automation, or in a playbook. After you successfully execute a command, a DBot message appears in the War Room with the command details.

kafka-print-topics#


Prints all topics and their partitions.

Base Command#

kafka-print-topics

Input#

Argument NameDescriptionRequired
include_offsetsWhether to include the first and last offset for a topic, when printing a list of topics and partitions. Possible values are: true, false. Default is true.Optional

Context Output#

PathTypeDescription
Kafka.Topic.NameStringKafka topic name.
Kafka.Topic.Partitions.IDNumberTopic partition ID.
Kafka.Topic.Partitions.EarliestOffsetNumberTopic partition earliest offset.
Kafka.Topic.Partitions.LatestOffsetNumberTopic partition latest offset.

Command Example#

!kafka-print-topics

Context Example#

{
"Kafka": {
"Topic": [
{
"Name": "test-topic1",
"Partitions": [
{
"ID": 0
}
]
},
{
"Name": "test-topic2",
"Partitions": [
{
"ID": 0
},
{
"ID": 1
}
]
}
]
}
}

Human Readable Output#

Kafka Topics#
NamePartitions
test-topic1{'ID': 0}
test-topic2{'ID': 0, 'EarliestOffset': 0, 'OldestOffset': 3}, {'ID': 1, 'EarliestOffset': 0, 'OldestOffset': 4}

kafka-publish-msg#


Publishes a message to Kafka.

Base Command#

kafka-publish-msg

Input#

Argument NameDescriptionRequired
topicA topic to publish messages to.Required
valueMessage value (string).Required
partitioning_keyMessage partition (number).Optional

Context Output#

There is no context output for this command.

Command Example#

!kafka-publish-msg topic=test-topic value="test message"

Human Readable Output#

Message was successfully produced to topic 'test-topic', partition 0

kafka-consume-msg#


Consumes a single Kafka message.

Base Command#

kafka-consume-msg

Input#

Argument NameDescriptionRequired
topicA topic to get messages from.Required
offsetMessage offset to filter by. Acceptable values are 'Earliest', 'Latest', or any other offest number. Default is Earliest.Optional
partitionPartition (number).Optional
poll_timeoutPoll timeout to consume the message.Optional

Context Output#

PathTypeDescription
Kafka.Topic.NamestringName of the topic.
Kafka.Topic.Message.ValuestringValue of the message.
Kafka.Topic.Message.OffsetnumberOffset of the value in the topic.

Command Example#

!kafka-consume-msg topic=test-topic offset=latest

Context Example#

{
"Kafka": {
"Topic": {
"Message": {
"Value": "test message",
"Offset": 11
},
"Name": "test-topic"
}
}
}

Human Readable Output#

Message consumed from topic 'test'#
OffsetMessage
11test message

kafka-fetch-partitions#


Fetches partitions for a topic.

Base Command#

kafka-fetch-partitions

Input#

Argument NameDescriptionRequired
topicA topic to fetch partitions for.Required

Context Output#

PathTypeDescription
Kafka.Topic.NamestringName of topic.
Kafka.Topic.PartitionnumberPrints all partitions for a topic.

Command Example#

!kafka-fetch-partitions topic=test

Context Example#

{
"Kafka": {
"Topic": {
"Partition": [
0,
1,
2
],
"Name": "test"
}
}
}

Human Readable Output#

Available partitions for topic 'test'#
Partitions
0
1
2

Configuration of SASL_SSL PLAIN:#

  1. Make sure you have the broker port which supports SSL connection.
  2. Add 'broker_address:port' to the brokers list.
  3. Provide the CA root certificate in the 'CA certificate of Kafka server (.cer)' section.
  4. If your client certificate is password protected, provide the password in the 'Client certificate key password (if required)' section.
  5. Provide SASL PLAIN Username and SASL PLAIN Password

Note: SASL is supported only when used in combination with SSL.

Important: This integration also supports users with consumer only permissions.