Kafka doesn't save the offset if the consumer runs for a short time
Problem
A consumer with a particular group id connects to a broker, listens to a topic for less than 1 minute, and disconnects (as required by business logic). While it listens to the topic, it may consume some messages.
When the same consumer repeats this action, it consumes the same messages again!
I discovered that Kafka saves the offset at a 1-minute interval. This means that the consumer has to listen to the topic for more than 1 minute.
How can I reduce this interval?
I've found these properties:
log.flush.offset.checkpoint.interval.ms
log.flush.start.offset.checkpoint.interval.ms
offset.flush.interval.ms - looks the most appropriate
I tried to set them in the server.properties file:
log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000
I restarted Kafka and Zookeeper, but it didn't help. The consumer still has to listen to the topic for more than 1 minute. What am I doing wrong?
My environment
- Kafka and Zookeeper via Confluent.
- php-rdkafka as client library.
- enable.auto.commit is set to true.
- I use the low-level consumer.
- auto.offset.reset is set to smallest.
Code example
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'foo');

$kafkaConsumer = new RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
// Start reading partition 0 from the last stored (committed) offset
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);

while (true) {
    // Wait up to 2 seconds for the next message
    $msg = $queue->consume(2000);
    if ($msg !== null) {
        var_dump($msg);
    }
}
php apache-kafka
asked Nov 23 '18 at 10:10 by Evgenii Karavskii
edited Nov 23 '18 at 11:29 by Evgenii Karavskii
1 Answer
You should try to explicitly commit the offset in your consumer:
Explicitly Committing Offsets in Consumers
If you go with the automatic offset commits, you don’t need to worry about explicitly committing offsets. But you do need to think about how you will commit offsets if you decide you need more control over the timing of offset commits—either in order to minimize duplicates or because you are doing event processing outside the main consumer poll loop.
Extract from Kafka: The Definitive Guide, page 127 (a free ebook you can download).
It is recommended that you:
Always commit offsets after events were processed. If you do all the processing within the poll loop and don’t maintain state between poll loops (e.g., for aggregation), this should be easy. You can use the auto-commit configuration or commit events at the end of the poll loop.
I have not used the PHP client myself, but it looks like this could be what you need.
Adding to your code example above:
while (true) {
    $msg = $queue->consume(2000);
    if ($msg !== null) {
        var_dump($msg);
        $kafkaConsumer->commit($msg);
    }
}
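For reference, an explicit per-message commit like this is, as far as I know, something php-rdkafka exposes on the high-level RdKafka\KafkaConsumer class rather than on the low-level RdKafka\Consumer. Below is a minimal sketch of that variant, assuming the high-level consumer is an option for you. The broker address, group id and topic name are taken from the question; the 30-second session length is a made-up value for illustration.

```php
<?php
// Sketch only: requires the rdkafka extension and a reachable broker.
$conf = new RdKafka\Conf();
$conf->set('group.id', 'foo');
$conf->set('metadata.broker.list', 'queue.a:9092');
// Turn off periodic auto-commit so that offsets are committed explicitly below.
$conf->set('enable.auto.commit', 'false');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['topic_name']);

// Hypothetical short-lived session: stop after roughly 30 seconds.
$deadline = microtime(true) + 30;
while (microtime(true) < $deadline) {
    // The high-level consume() returns a message object even on timeout,
    // so check the error code before using it.
    $msg = $consumer->consume(2000);
    if ($msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        var_dump($msg->payload);
        // Synchronously commit the offset of the message just processed.
        $consumer->commit($msg);
    }
}
```

Committing after every message costs a broker round trip each time, so for higher volumes you may prefer to commit once per batch or at the end of the session.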
php-rdkafka is a bit of a strange library. It has explicit committing only for the high-level consumer class. I have to use the low-level consumer class. And enable.auto.commit is set to true. P.S. Thanks for the book :)
– Evgenii Karavskii
Nov 23 '18 at 11:26
@EvgeniiKaravskii php-rdkafka is simply a wrapper over librdkafka library, which is what actually does all the "heavy lifting". It's used in other languages as well.
– SteveB
Nov 23 '18 at 11:40
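Since librdkafka does the actual work, its configuration properties are probably the place to look for the 1-minute interval. librdkafka documents a topic-level auto.commit.interval.ms property for the legacy low-level consumer with a default of 60000 ms, which matches the behaviour described in the question. The properties tried in server.properties, as far as I can tell, do not control consumer offset commits at all. Below is a minimal sketch for the low-level consumer, assuming the same broker, group id and topic as in the question; the 1000 ms interval and the 30-second session length are made-up values.

```php
<?php
// Sketch only: requires the rdkafka extension and a reachable broker.
$conf = new RdKafka\Conf();
$conf->set('group.id', 'foo');

$kafkaConsumer = new RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
// Commit stored offsets every second instead of the 60-second
// topic-level default. The value 1000 is an assumption; tune as needed.
$topicConf->set('auto.commit.interval.ms', '1000');

$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);

// Hypothetical short-lived session: stop after roughly 30 seconds.
$deadline = microtime(true) + 30;
while (microtime(true) < $deadline) {
    // The low-level queue returns null when the timeout expires.
    $msg = $queue->consume(2000);
    if ($msg !== null && $msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        var_dump($msg->payload);
    }
}

// Stop the partition before exiting so that librdkafka gets a chance
// to commit the last stored offset.
$topic->consumeStop(0);
```

A shorter commit interval narrows the window in which consumed messages can be redelivered after a restart, and stopping the partition cleanly before the script exits should cover the messages read since the last periodic commit.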
answered Nov 23 '18 at 10:28 by lloiacono
edited Nov 23 '18 at 10:37