MindsDB and Kafka¶
MindsDB provides a Kafka connector plugin to connect to a Kafka cluster.
Please visit the Kafka Connect MindsDB page on the official Confluent site. It contains all the instructions on how to install the connector from the Confluent Hub.
Setting up Kafka in Docker¶
Let's review the instructions here as well.
You may use the official connector docker image:
docker pull mindsdb/mindsdb-kafka-connector
The source code of the Kafka connector is in the kafka_connector GitHub repository. Please read the instructions before building the connector from scratch.
It is possible to integrate and use the Kafka connector as part of your own Kafka cluster, or you may use our test docker environment.
To try out our test docker environment, follow the steps below.
Before you bring the docker container up, please note that there are two connector configuration files:
- one for MindsDB Cloud,
- one for a separate MindsDB installation.
No matter which option you choose, these files require real values in place of the placeholders for the username, password, Kafka connection details, etc. The SASL details are optional, as a local Kafka installation may not have SASL configured; if yours does, set the SASL username and password accordingly.
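For orientation, a connector configuration might look like the sketch below. The keys mirror the connector settings used later in this guide, but the exact filenames and required fields are defined in the kafka_connector repository, so treat this as an illustrative sample rather than the authoritative file.

```json
{
  "connector.class": "com.mindsdb.kafka.connect.MindsDBConnector",
  "topics": "topic_in",
  "mindsdb.url": "http://172.17.0.1:47334",
  "kafka.api.host": "127.0.0.1",
  "kafka.api.port": 9092,
  "kafka.api.name": "test_kafka",
  "predictor.name": "YOUR_MODEL_NAME",
  "output.forecast.topic": "topic_out",
  "security.protocol": "SASL_PLAINTEXT",
  "sasl.mechanism": "PLAIN",
  "sasl.plain.username": "admin",
  "sasl.plain.password": "admin-secret"
}
```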
Now that your config files contain real values, you can execute the command below from the root of the kafka_connector GitHub repository to build the connector and launch it in the local test environment.
docker-compose up -d
Let's go over some examples.
Examples¶
Prerequisites¶
- Launch a MindsDB instance whose HTTP API interface runs on the docker network interface inet ip. Usually, the IP address is 172.17.0.1, and the port is 47334. You can modify the HTTP API interface details in the MindsDB config file.
Now, to launch your MindsDB, run the following command:
python -m mindsdb --config=/path-to-the-config-file/your-config-file.json
Please note that the config file must be in JSON format. It must include this part of the config file from the MindsDB repository in a proper JSON format.
- Train a new model. You may use this tutorial as an example.
- Run the test environment as instructed in the Setting up Kafka in Docker section above.
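The MindsDB config file passed with --config can be sketched as below. This assumes the HTTP API host and port live under an api.http section, as in the sample config in the MindsDB repository; verify the exact keys against that file before using it.

```json
{
    "api": {
        "http": {
            "host": "172.17.0.1",
            "port": "47334"
        }
    }
}
```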
Create the Connector Instance¶
To create a connector, you need to send a POST request to a specific CONNECTORS_URL
with connector configuration in JSON format, as below.
import requests

MINDSDB_URL = "http://172.17.0.1:47334"
CONNECTORS_URL = "http://127.0.0.1:9021/api/connect/connect-default/connectors"

CONNECTOR_NAME = "MindsDBConnector"
INTEGRATION_NAME = 'test_kafka'
KAFKA_HOST = "127.0.0.1"
KAFKA_PORT = 9092
STREAM_IN = "topic_in"
STREAM_OUT = "topic_out"
PREDICTOR_NAME = "YOUR_MODEL_NAME"  # set actual model name here

params = {
    "name": CONNECTOR_NAME,
    "config": {
        "connector.class": "com.mindsdb.kafka.connect.MindsDBConnector",
        "topics": STREAM_IN,
        "mindsdb.url": MINDSDB_URL,
        "kafka.api.host": KAFKA_HOST,
        "kafka.api.port": KAFKA_PORT,
        "kafka.api.name": INTEGRATION_NAME,
        "predictor.name": PREDICTOR_NAME,
        "output.forecast.topic": STREAM_OUT,
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "PLAIN",
        "sasl.plain.username": "admin",
        "sasl.plain.password": "admin-secret",
    },
}

headers = {"Content-Type": "application/json"}
res = requests.post(CONNECTORS_URL, json=params, headers=headers)
This code creates a MindsDB Kafka connector that uses the PREDICTOR_NAME
model, receives source data from the STREAM_IN
Kafka topic, and sends prediction results to the STREAM_OUT
Kafka topic.
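Kafka Connect exposes a REST endpoint reporting each connector's state, which you can use to verify that the connector was created successfully. A minimal sketch using only the standard library; the live GET call is shown commented out, since it needs a reachable Kafka Connect cluster:

```python
import json
import urllib.request

CONNECTORS_URL = "http://127.0.0.1:9021/api/connect/connect-default/connectors"
CONNECTOR_NAME = "MindsDBConnector"

# Kafka Connect reports per-connector state at /connectors/<name>/status.
STATUS_URL = f"{CONNECTORS_URL}/{CONNECTOR_NAME}/status"

def connector_state(status_json: str) -> str:
    """Extract the connector state ("RUNNING", "FAILED", ...) from a status response."""
    return json.loads(status_json)["connector"]["state"]

# Against a live cluster you would fetch and inspect the status:
#   with urllib.request.urlopen(STATUS_URL) as resp:
#       print(connector_state(resp.read().decode("utf-8")))
print(STATUS_URL)
```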
Send Source Data and Receive Prediction Results¶
There are many Kafka client implementations; choose the one that best suits your goals.
The code below generates source records and sends them to the topic_in topic by default. You can use any other Kafka topic by providing its name as a command-line argument.
import sys
import json

import kafka

connection_info = {"bootstrap_servers": "127.0.0.1:9092",
                   "security_protocol": "SASL_PLAINTEXT",
                   "sasl_mechanism": "PLAIN",
                   "sasl_plain_username": "admin",
                   "sasl_plain_password": "admin-secret"}

producer = kafka.KafkaProducer(**connection_info)

if __name__ == '__main__':
    print(json.dumps(connection_info))
    if len(sys.argv) == 1:
        topic = "topic_in"
    else:
        topic = sys.argv[1]

    for x in range(1, 4):
        data = {"Age": x + 20, "Weight": x * x * 0.8 + 200, "Height": x * x * 0.5 + 65}
        to_send = json.dumps(data)
        producer.send(topic, to_send.encode('utf-8'))

    producer.close()
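Instead of encoding each record by hand, kafka-python also accepts a value_serializer callable on KafkaProducer, which lets you pass dicts straight to send(). A small sketch of such a serializer on its own; the producer construction is shown commented out, since it needs a reachable broker:

```python
import json

def json_serializer(record: dict) -> bytes:
    """Encode a record dict as UTF-8 JSON bytes, the format the connector consumes."""
    return json.dumps(record).encode("utf-8")

# With a reachable broker you could wire it into the producer:
#   producer = kafka.KafkaProducer(value_serializer=json_serializer, **connection_info)
#   producer.send("topic_in", {"Age": 21, "Weight": 200.8, "Height": 65.5})

print(json_serializer({"Age": 21}))  # → b'{"Age": 21}'
```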
And the following code shows how to read prediction results, from the topic_out topic by default. Again, you can use any other Kafka topic by providing its name as a command-line argument.
import sys
import json

import kafka

connection_info = {"bootstrap_servers": "127.0.0.1:9092",
                   "security_protocol": "SASL_PLAINTEXT",
                   "sasl_mechanism": "PLAIN",
                   "sasl_plain_username": "admin",
                   "sasl_plain_password": "admin-secret",
                   "auto_offset_reset": 'earliest',
                   "consumer_timeout_ms": 1000}

consumer = kafka.KafkaConsumer(**connection_info)

if __name__ == '__main__':
    print(json.dumps(connection_info))
    if len(sys.argv) == 1:
        topic = "topic_out"
    else:
        topic = sys.argv[1]

    consumer.subscribe(topics=[topic])
    for msg in consumer:
        print(msg.value)
        print("-" * 100)
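Each msg.value arrives as raw bytes; since the predictions in this setup are JSON, you can decode them into dicts rather than printing raw bytes. A small helper, assuming UTF-8 JSON payloads:

```python
import json

def decode_prediction(raw: bytes) -> dict:
    """Decode a UTF-8 JSON payload from a Kafka message value into a dict."""
    return json.loads(raw.decode("utf-8"))

# e.g. inside the consumer loop:
#   for msg in consumer:
#       print(decode_prediction(msg.value))

print(decode_prediction(b'{"Height": 65.5}'))  # → {'Height': 65.5}
```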
What's Next?¶
Now that you are all set, we recommend you check out our Tutorials and Community Tutorials sections, where you'll find various examples of regression, classification, and time series predictions with MindsDB.
To learn more about MindsDB itself, follow the guide on the MindsDB database structure. Also, don't miss out on the remaining pages of the SQL API section, as they explain common SQL syntax with examples.
Have fun!