How can Kafka mqtt connector send mqtt topic as key?
up vote
0
down vote
favorite
I have a MQTT broker and a Kafka broker running, I have used the kafka-connector: https://github.com/Landoop/stream-reactor, with the next configuration:
name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1
In the kcql, I'm definning the field of the message that kafka should take as key, is it anyway to use the mqtt-topic as key? So I don't need to define the WITHKEY()
in the kcql.
apache-kafka mqtt apache-kafka-connect
add a comment |
up vote
0
down vote
favorite
I have a MQTT broker and a Kafka broker running, I have used the kafka-connector: https://github.com/Landoop/stream-reactor, with the next configuration:
name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1
In the kcql, I'm definning the field of the message that kafka should take as key, is it anyway to use the mqtt-topic as key? So I don't need to define the WITHKEY()
in the kcql.
apache-kafka mqtt apache-kafka-connect
add a comment |
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I have a MQTT broker and a Kafka broker running, I have used the kafka-connector: https://github.com/Landoop/stream-reactor, with the next configuration:
name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1
In the kcql, I'm definning the field of the message that kafka should take as key, is it anyway to use the mqtt-topic as key? So I don't need to define the WITHKEY()
in the kcql.
apache-kafka mqtt apache-kafka-connect
I have a MQTT broker and a Kafka broker running, I have used the kafka-connector: https://github.com/Landoop/stream-reactor, with the next configuration:
name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1
In the kcql, I'm definning the field of the message that kafka should take as key, is it anyway to use the mqtt-topic as key? So I don't need to define the WITHKEY()
in the kcql.
apache-kafka mqtt apache-kafka-connect
apache-kafka mqtt apache-kafka-connect
asked Nov 19 at 8:08
Asier Gomez
1,36672156
1,36672156
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
up vote
0
down vote
I don't know about Landoop's KCQL, but assuming the topic is part of the message value, you can move it to the key like so
transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic
If not, then you can staticly insert it
transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"
However, the above might not work with SELECT * FROM +
, where you are selecting from all MQTT topics
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
The first solution you propouse, is throwing:java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?
– Asier Gomez
Nov 20 at 8:45
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
add a comment |
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
0
down vote
I don't know about Landoop's KCQL, but assuming the topic is part of the message value, you can move it to the key like so
transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic
If not, then you can staticly insert it
transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"
However, the above might not work with SELECT * FROM +
, where you are selecting from all MQTT topics
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
The first solution you propouse, is throwing:java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?
– Asier Gomez
Nov 20 at 8:45
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
add a comment |
up vote
0
down vote
I don't know about Landoop's KCQL, but assuming the topic is part of the message value, you can move it to the key like so
transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic
If not, then you can staticly insert it
transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"
However, the above might not work with SELECT * FROM +
, where you are selecting from all MQTT topics
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
The first solution you propouse, is throwing:java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?
– Asier Gomez
Nov 20 at 8:45
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
add a comment |
up vote
0
down vote
up vote
0
down vote
I don't know about Landoop's KCQL, but assuming the topic is part of the message value, you can move it to the key like so
transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic
If not, then you can staticly insert it
transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"
However, the above might not work with SELECT * FROM +
, where you are selecting from all MQTT topics
I don't know about Landoop's KCQL, but assuming the topic is part of the message value, you can move it to the key like so
transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic
If not, then you can staticly insert it
transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"
However, the above might not work with SELECT * FROM +
, where you are selecting from all MQTT topics
edited Nov 19 at 17:09
answered Nov 19 at 17:03
cricket_007
77.5k1142107
77.5k1142107
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
The first solution you propouse, is throwing:java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?
– Asier Gomez
Nov 20 at 8:45
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
add a comment |
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
The first solution you propouse, is throwing:java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?
– Asier Gomez
Nov 20 at 8:45
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
Thanks, but the problem is that the KCQL need to define the key, how can I send the key from there?
– Asier Gomez
Nov 20 at 8:25
The first solution you propouse, is throwing:
java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?– Asier Gomez
Nov 20 at 8:45
The first solution you propouse, is throwing:
java.lang.NullPointerException at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
if I put that paraghraph behind kcql command, any idea?– Asier Gomez
Nov 20 at 8:45
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
You're not supposed to literally copy it... As I commented in it, change accordingly... I cannot see your Kafka messages to know what that should be... And looking at the KCQL documentation or MQTT Landoop page, no you actually don't need a key, since keys can be null in Kafka messages anyway
– cricket_007
Nov 20 at 11:16
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
If I don't add a key it fails asking for a key, I know in the documentation it doesn't need, but then it fails if you don't give a key
– Asier Gomez
Nov 20 at 11:19
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53370576%2fhow-can-kafka-mqtt-connector-send-mqtt-topic-as-key%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown