Kafka doesn't save offset if consume short time












Problem

A consumer with a particular group id connects to a broker, listens to a topic for less than 1 minute, and then disconnects (as the business logic requires). While it is listening, it may consume some messages.
When the same consumer repeats this, it consumes the same messages again!

I discovered that Kafka saves the offset at a 1-minute interval. This means the consumer has to listen to the topic for more than 1 minute before its offset is saved.
How can I reduce this interval?

I've found these properties:

  • log.flush.offset.checkpoint.interval.ms
  • log.flush.start.offset.checkpoint.interval.ms
  • offset.flush.interval.ms - looks the most appropriate

I tried setting them in the server.properties file:

log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000

Then I restarted Kafka and Zookeeper, but it didn't help. The consumer still has to listen to the topic for more than 1 minute. What am I doing wrong?

My environment

  • Kafka and Zookeeper via Confluent
  • php-rdkafka as the client library
  • enable.auto.commit is set to true

I use the low-level consumer. auto.offset.reset is set to smallest.

Code example



<?php
// Consumer-level configuration: the group id under which offsets are stored.
$conf = new RdKafka\Conf();
$conf->set('group.id', 'foo');

$kafkaConsumer = new RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);

// Topic-level configuration: start from the earliest offset when none is stored.
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
// Consume partition 0 from the stored offset, delivering messages to $queue.
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);

while (true) {
    // Wait up to 2 seconds for the next message.
    $msg = $queue->consume(2000);
    if ($msg !== null) {
        var_dump($msg);
    }
}









      php apache-kafka














asked Nov 23 '18 at 10:10, edited Nov 23 '18 at 11:29, by Evgenii Karavskii
























          1 Answer

          You should try to explicitly commit the offset in your consumer:




          Explicitly Committing Offsets in Consumers
          If you go with the automatic offset commits, you don’t need to worry about explicitly committing offsets. But you do need to think about how you will commit offsets if you decide you need more control over the timing of offset commits—either in order to minimize duplicates or because you are doing event processing outside the main consumer poll loop.




          Extract from Kafka: The Definitive Guide, page 127 (a free ebook you can download).



          It is recommended that you "always commit offsets after events were processed". If you do all the processing within the poll loop and don’t maintain state between poll loops (e.g., for aggregation), this should be easy. You can use the auto-commit configuration or commit events at the end of the poll loop.



          I have not used the PHP client myself, but it looks like this could be what you need.



          Adding to your code example above:



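          while (true) {
              $msg = $queue->consume(2000);
              if ($msg !== null) {
                  var_dump($msg);
                  // Commit the offset of the message that was just processed.
                  // Note: in php-rdkafka, commit() belongs to the high-level RdKafka\KafkaConsumer class.
                  $kafkaConsumer->commit($msg);
              }
          }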
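For the low-level consumer used in the question, a minimal sketch of the auto-commit route might look like the following. It assumes that the one-minute delay comes from librdkafka's client-side, topic-level `auto.commit.interval.ms` property (documented for the legacy simple consumer, with a default of 60000 ms) rather than from any broker setting. The 1000 ms interval and the 30-second run time are illustrative values, not taken from the question, and the final `consumeStop()` call is added here on the assumption that a clean stop is preferable to killing the script mid-interval.

```php
<?php
// Sketch only: assumes the php-rdkafka extension is loaded and that the
// broker and topic from the question ('queue.a:9092', 'topic_name') exist.
$conf = new RdKafka\Conf();
$conf->set('group.id', 'foo');

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers('queue.a:9092');

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
// Assumption: this client-side, topic-level property controls how often the
// low-level consumer commits stored offsets. 1000 ms is an illustrative value.
$topicConf->set('auto.commit.interval.ms', '1000');

$queue = $consumer->newQueue();
$topic = $consumer->newTopic('topic_name', $topicConf);
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);

// Run for a bounded time (30 s here, a hypothetical figure) instead of forever.
$deadline = microtime(true) + 30;
while (microtime(true) < $deadline) {
    $msg = $queue->consume(2000);
    // Skip timeouts (null) and error events such as end-of-partition.
    if ($msg !== null && $msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        var_dump($msg->payload);
    }
}

// Stop fetching from partition 0 before the script exits.
$topic->consumeStop(0);
```

As far as I can tell, the three properties tried in server.properties are not consumer commit settings: the two `log.flush.*` entries configure broker log checkpoints, and `offset.flush.interval.ms` is a Kafka Connect worker setting, so changing them would not be expected to affect how often this consumer commits.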





          • php-rdkafka is a bit of a strange library. It has explicit committing only for the high-level consumer class, and I have to use the low-level consumer class. Also, enable.auto.commit is set to true. P.S. Thanks for the book :)

            – Evgenii Karavskii
            Nov 23 '18 at 11:26













          • @EvgeniiKaravskii php-rdkafka is simply a wrapper over the librdkafka library, which is what actually does all the "heavy lifting". It's used in other languages as well.

            – SteveB
            Nov 23 '18 at 11:40












          answered Nov 23 '18 at 10:28, edited Nov 23 '18 at 10:37, by lloiacono












