Trident OpaqueKafkaSpout: transactions not committed
I'm using storm-kafka-client to subscribe my topology to a Kafka topic. Since I need exactly-once semantics, I'm using the opaque spout; however, the first transaction is never committed: it gets saved to the state and is then replayed over and over again periodically. If I push more data into Kafka, that data is not saved to the state either, because the previous transaction hasn't been committed. The state I'm writing to is the Kafka state as well.
I'm using Storm 1.2.1 and Kafka 1.0.1, if that matters.
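For context, the "saved to the state and replayed" behavior comes from Trident's opaque-transactional semantics. Roughly, the state stores the previous value alongside the current one so a replayed batch can be recomputed instead of double-applied. Here is a minimal, self-contained sketch of that idea; the class and field names are illustrative, not Storm's actual `OpaqueValue` API:

```java
// Conceptual sketch of opaque-transactional state (illustrative names,
// not Storm's real classes): the state keeps (txId, curr, prev) so a
// replayed batch overwrites from prev instead of being double-counted.
public class OpaqueStateSketch {
    static final class StoredValue {
        long txId;  // transaction id of the last applied batch
        long curr;  // value after that batch
        long prev;  // value before that batch
        StoredValue(long txId, long curr, long prev) {
            this.txId = txId; this.curr = curr; this.prev = prev;
        }
    }

    // Apply a batch's partial sum. If the same txid arrives again
    // (a replay), recompute from prev so the batch is applied once.
    static StoredValue apply(StoredValue s, long txId, long batchSum) {
        if (s != null && s.txId == txId) {
            return new StoredValue(txId, s.prev + batchSum, s.prev);
        }
        long base = (s == null) ? 0 : s.curr;
        return new StoredValue(txId, base + batchSum, base);
    }

    public static void main(String[] args) {
        StoredValue s = apply(null, 1, 10); // first batch
        s = apply(s, 2, 5);                 // next batch
        s = apply(s, 2, 5);                 // replay of batch 2
        System.out.println(s.curr);         // 15, not 20
    }
}
```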
Edit: here is the topology I'm using.
Config conf = new Config();
conf.setNumWorkers(NUM_WORKERS);
conf.setDebug(true);
conf.setMaxSpoutPending(MAX_SPOUT_PENDING);
KafkaSpoutConfig.Builder<String, String> spoutConfig = KafkaSpoutConfig.builder(KAFKA_BROKERS, KAFKA_TOPIC);
spoutConfig.setProp(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SCHEMA_REGISTRY_URL);
spoutConfig.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
spoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
spoutConfig.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
spoutConfig.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
spoutConfig.setOffsetCommitPeriodMs(5000);
KafkaTridentSpoutOpaque<String, String> kafkaSpout = new KafkaTridentSpoutOpaque<>(spoutConfig.build());
TridentTopology topology = new TridentTopology();
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_BROKERS);
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("parsed_value", "parsed_value"));
Stream tramasRedis = topology.newStream("KAFKA_TRAMAS", kafkaSpout).parallelismHint(KAFKA_SPOUT_PARALELLISM)
.each(new Fields("key", "value"), new ReadRedisMessage(), new Fields("parsed_value"));
tramasRedis.partitionPersist(stateFactory, new Fields("parsed_value"), new TridentKafkaStateUpdater());
LocalCluster lc = new LocalCluster();
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, topology.build());
And this is what I get in the logs when I receive a message from Kafka in debug mode.
2018-11-26 08:20:16.166 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS s1 [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.167 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]]
2018-11-26 08:20:16.168 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting direct: 5; spout-KAFKA_TRAMAS $coord-bg0 [13:2, 1]
2018-11-26 08:20:16.168 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]]
2018-11-26 08:20:16.169 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS __ack_ack [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 3 tuple: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] BOLT ack TASK: 6 TIME: -1 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Execute done TUPLE source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]] TASK: 6 DELTA: -1
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Processing received message FOR 6 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-1505053285028712763=8306088157808961249}, [14:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831] TASK: 3 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: 6 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}] TASK: 5 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.175 o.a.s.d.task Thread-14-b-0-executor[5 5] [INFO] Emitting: b-0 __ack_ack [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] TRANSFERING tuple [dest: 3 tuple: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1] TASK: 5 DELTA: -1
2018-11-26 08:20:16.177 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.task Thread-6-__acker-executor[3 3] [INFO] Emitting direct: 1; __acker __ack_ack [-7985297489562004428 37198]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] TRANSFERING tuple [dest: 1 tuple: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309] TASK: 3 DELTA: -1
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Processing received message FOR 1 TUPLE: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Acking message -7985297489562004428 13:2
2018-11-26 08:20:16.561 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 $batch [88:1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 2 tuple: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]]
2018-11-26 08:20:16.562 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 __ack_init [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 3 tuple: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]]
2018-11-26 08:20:16.563 o.a.s.d.executor Thread-4-$spoutcoord-spout-KAFKA_TRAMAS-executor[2 2] [INFO] Processing received message FOR 2 TUPLE: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1] TASK: 3 DELTA: -1
After activating DEBUG logging on the MasterBatchCoordinator, I get these errors periodically.
2018-11-28 10:57:44.614 o.a.s.t.t.MasterBatchCoordinator Thread-8-$mastercoord-bg0-executor[1 1] [DEBUG] Fail. [tx_attempt = 18:4], [tx_status = null], [MasterBatchCoordinator{_states=[org.apache.storm.trident.topology.state.TransactionalState@6168a033], _activeTx={3=3:3 <PROCESSING>, 4=4:3 <PROCESSING>, 5=5:3 <PROCESSING>, 6=6:3 <PROCESSING>}, _attemptIds={3=3, 4=3, 5=3, 6=3, 7=3, 8=3, 9=3, 10=3, 11=3, 12=3, 13=3, 14=4, 15=4, 16=3, 17=3, 18=4, 19=7, 20=7, 21=7, 22=7, 23=7, 24=7, 25=7, 26=7, 27=7, 28=7, 29=7, 30=7, 31=7, 32=7, 33=7, 34=7, 35=7, 36=7, 37=7, 38=7, 39=7, 40=7, 41=7, 42=7, 43=6, 44=6, 45=5, 46=5, 47=5, 48=5, 49=5, 50=6, 51=5, 52=5, 53=5, 54=5, 55=5, 56=5, 57=5, 58=5, 59=5, 60=5, 61=5, 62=5, 63=5, 64=5, 65=5, 66=5, 67=5, 68=4, 69=4, 70=4, 71=4, 72=4, 73=4, 74=4, 75=4, 76=3, 77=3, 78=1, 79=1, 80=1, 81=1, 82=1, 83=1, 84=2, 85=1, 86=1, 87=1, 88=1, 89=1, 90=1, 91=1, 92=1, 93=1, 94=1, 95=1, 96=1, 97=1, 98=1, 99=1, 100=1, 101=0, 102=0}, _collector=org.apache.storm.spout.SpoutOutputCollector@1f58e2fd, _currTransaction=3, _maxTransactionActive=100, _coordinators=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator@6faffb26], _managedSpoutIds=[KAFKA_TRAMAS], _spouts=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor@59e6917f], _throttler=org.apache.storm.utils.WindowedTimeThrottler@7d2bdb48, _active=true}]
2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Failing -2574568553084235376: {:stream "$batch", :values [#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 "19:6"]]} REASON: TIMEOUT MSG-ID: 19:6
Apparently the batches that fail are empty. If there is data in the topic when the topology starts, it works, because there are no empty batches beforehand; however, after the first batch (the one with data) is committed, the empty batches start failing, and new Kafka data is not processed because of the uncommitted transactions. If the topic is empty, the topology just fails constantly and never pushes anything to Kafka.
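My understanding (an assumption, not verified against the Storm source) is that Trident commits transactions strictly in txid order, which would explain the blocking: one batch that never succeeds stalls every later one. A self-contained sketch of that ordering rule:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative sketch (an assumption about Trident's behavior, not Storm
// source): transactions commit strictly in txid order, so a single
// transaction that never succeeds blocks all later ones from committing.
public class OrderedCommitSketch {
    private final TreeMap<Integer, Boolean> finished = new TreeMap<>(); // txid -> succeeded
    private final List<Integer> committed = new ArrayList<>();
    private int nextToCommit = 1;

    void batchFinished(int txId, boolean success) {
        finished.put(txId, success);
        // Commit in order; stop at the first txid that is missing or failed.
        while (Boolean.TRUE.equals(finished.get(nextToCommit))) {
            committed.add(nextToCommit);
            finished.remove(nextToCommit);
            nextToCommit++;
        }
    }

    List<Integer> committed() { return committed; }

    public static void main(String[] args) {
        OrderedCommitSketch c = new OrderedCommitSketch();
        c.batchFinished(1, true);
        c.batchFinished(2, false); // e.g. an empty batch that times out
        c.batchFinished(3, true);  // succeeds, but cannot commit yet
        System.out.println(c.committed()); // [1]
    }
}
```

Under this model, the timing-out empty batch (txid 18/19 in the logs above) would keep batches with real data pending forever, which matches the symptom.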
java apache-kafka apache-storm
Is it the storm-kafka or storm-kafka-client spout?
– Stig Rohde Døssing
Nov 23 '18 at 13:23
@StigRohdeDøssing it's the storm-kafka-client
– Algeel
Nov 23 '18 at 13:31
Are you seeing anything in the worker logs? The State should log if it fails to write to Kafka. Also try posting your topology setup.
– Stig Rohde Døssing
Nov 23 '18 at 13:37
@StigRohdeDøssing I updated the post with more info. I don't get any write failure. It actually only saves the first batch, and only if there is already data in the topic when I run the topology. If the topic is empty and I push data while the topology is running, it doesn't write that batch either.
– Algeel
Nov 26 '18 at 8:48
The log you posted is helpful, but I think we need to enable debug logging for the master batch coordinator (the component that controls emit/commit behavior in Trident, you can see it in the log named something like $mastercoord). Could you enable debug logging for org.apache.storm.trident.topology.MasterBatchCoordinator and try again?
– Stig Rohde Døssing
Nov 26 '18 at 18:34
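For anyone following along: the logger suggested in the comment above can be enabled with a log4j2 fragment along these lines (a sketch; the appender name `A1` is an assumption, so match it to the appenders actually defined in your worker's log4j2 configuration file in Storm 1.x):

```xml
<!-- Sketch: enable the MasterBatchCoordinator DEBUG output quoted
     in the question. Adjust the AppenderRef to your own setup. -->
<Loggers>
    <Logger name="org.apache.storm.trident.topology.MasterBatchCoordinator"
            level="DEBUG"/>
    <Root level="INFO">
        <AppenderRef ref="A1"/>
    </Root>
</Loggers>
```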
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Processing received message FOR 6 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-1505053285028712763=8306088157808961249}, [14:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831] TASK: 3 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: 6 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}] TASK: 5 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.175 o.a.s.d.task Thread-14-b-0-executor[5 5] [INFO] Emitting: b-0 __ack_ack [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] TRANSFERING tuple [dest: 3 tuple: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1] TASK: 5 DELTA: -1
2018-11-26 08:20:16.177 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.task Thread-6-__acker-executor[3 3] [INFO] Emitting direct: 1; __acker __ack_ack [-7985297489562004428 37198]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] TRANSFERING tuple [dest: 1 tuple: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309] TASK: 3 DELTA: -1
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Processing received message FOR 1 TUPLE: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Acking message -7985297489562004428 13:2
2018-11-26 08:20:16.561 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 $batch [88:1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 2 tuple: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]]
2018-11-26 08:20:16.562 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 __ack_init [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 3 tuple: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]]
2018-11-26 08:20:16.563 o.a.s.d.executor Thread-4-$spoutcoord-spout-KAFKA_TRAMAS-executor[2 2] [INFO] Processing received message FOR 2 TUPLE: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1] TASK: 3 DELTA: -1
After activating DEBUG logging on the MasterBatchCoordinator, I get these errors periodically.
2018-11-28 10:57:44.614 o.a.s.t.t.MasterBatchCoordinator Thread-8-$mastercoord-bg0-executor[1 1] [DEBUG] Fail. [tx_attempt = 18:4], [tx_status = null], [MasterBatchCoordinator{_states=[org.apache.storm.trident.topology.state.TransactionalState@6168a033], _activeTx={3=3:3 <PROCESSING>, 4=4:3 <PROCESSING>, 5=5:3 <PROCESSING>, 6=6:3 <PROCESSING>}, _attemptIds={3=3, 4=3, 5=3, 6=3, 7=3, 8=3, 9=3, 10=3, 11=3, 12=3, 13=3, 14=4, 15=4, 16=3, 17=3, 18=4, 19=7, 20=7, 21=7, 22=7, 23=7, 24=7, 25=7, 26=7, 27=7, 28=7, 29=7, 30=7, 31=7, 32=7, 33=7, 34=7, 35=7, 36=7, 37=7, 38=7, 39=7, 40=7, 41=7, 42=7, 43=6, 44=6, 45=5, 46=5, 47=5, 48=5, 49=5, 50=6, 51=5, 52=5, 53=5, 54=5, 55=5, 56=5, 57=5, 58=5, 59=5, 60=5, 61=5, 62=5, 63=5, 64=5, 65=5, 66=5, 67=5, 68=4, 69=4, 70=4, 71=4, 72=4, 73=4, 74=4, 75=4, 76=3, 77=3, 78=1, 79=1, 80=1, 81=1, 82=1, 83=1, 84=2, 85=1, 86=1, 87=1, 88=1, 89=1, 90=1, 91=1, 92=1, 93=1, 94=1, 95=1, 96=1, 97=1, 98=1, 99=1, 100=1, 101=0, 102=0}, _collector=org.apache.storm.spout.SpoutOutputCollector@1f58e2fd, _currTransaction=3, _maxTransactionActive=100, _coordinators=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator@6faffb26], _managedSpoutIds=[KAFKA_TRAMAS], _spouts=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor@59e6917f], _throttler=org.apache.storm.utils.WindowedTimeThrottler@7d2bdb48, _active=true}]
2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Failing -2574568553084235376: {:stream "$batch", :values [#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 "19:6"]]} REASON: TIMEOUT MSG-ID: 19:6
Apparently the failing batches are empty. If there is data in the topic when the topology starts, that first batch works because there are no empty batches ahead of it; however, after the first batch (the one with data) is committed, the empty batches start failing, and new Kafka data is not processed because there are uncommitted transactions. If the topic is empty, it just fails constantly and never pushes anything to Kafka.
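As a side note, since the failures are TIMEOUTs on (apparently empty) batches, two standard Storm knobs control how often Trident emits batches and how long a batch may stay in flight before being failed. This is just an untested sketch; the key names are standard Storm configs, but whether tuning them helps in this situation is an assumption:

```java
// Untested sketch: slow down batch emission and give each batch more time
// before the master coordinator fails it with TIMEOUT. Both keys are
// standard Storm configs; the values below are illustrative only.
Config conf = new Config();
conf.put("topology.trident.batch.emit.interval.millis", 2000); // default 500
conf.setMessageTimeoutSecs(60); // topology.message.timeout.secs, default 30
```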
java apache-kafka apache-storm
edited Nov 28 '18 at 11:32
Algeel
asked Nov 23 '18 at 11:06
Is it the storm-kafka or storm-kafka-client spout?
– Stig Rohde Døssing
Nov 23 '18 at 13:23
@StigRohdeDøssing it's the storm-kafka-client
– Algeel
Nov 23 '18 at 13:31
Are you seeing anything in the worker logs? The State should log if it fails to write to Kafka. Also try posting your topology setup.
– Stig Rohde Døssing
Nov 23 '18 at 13:37
@StigRohdeDøssing updated the post with more info. I don't get any write failure. Actually, it only saves the first batch, and only if there's already data in the topic when I run the topology. If the topic is empty and I push data while the topology is running, it doesn't write that batch either.
– Algeel
Nov 26 '18 at 8:48
The log you posted is helpful, but I think we need to enable debug logging for the master batch coordinator (the component that controls emit/commit behavior in Trident, you can see it in the log named something like $mastercoord). Could you enable debug logging for org.apache.storm.trident.topology.MasterBatchCoordinator and try again?
– Stig Rohde Døssing
Nov 26 '18 at 18:34
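For anyone following along: in Storm 1.x, enabling that logger means editing the worker's log4j2 configuration (typically `log4j2/worker.xml` under the Storm install directory). A minimal sketch, assuming the stock layout where `A1` is the default rolling-file appender (adjust the appender name if yours differs):

```xml
<!-- Add inside the <Loggers> section of log4j2/worker.xml -->
<Logger name="org.apache.storm.trident.topology.MasterBatchCoordinator"
        level="debug" additivity="false">
    <AppenderRef ref="A1"/>
</Logger>
```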