<dependency> <groupId>org.apache.skywalking</groupId> <artifactId>apm-toolkit-kafka</artifactId> <version>${skywalking.version}</version> </dependency>
public class ConsumerThread2 extends Thread { @Override public void run() { Properties consumerProperties = new Properties(); //...consumerProperties.put() KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener()); while (true) { if (pollAndInvoke(consumer)) break; } consumer.close(); } @KafkaPollAndInvoke private boolean pollAndInvoke(KafkaConsumer<String, String> consumer) { try { Thread.sleep(1000); } catch (InterruptedException e) { } ConsumerRecords<String, String> records = consumer.poll(100); if (!records.isEmpty()) { OkHttpClient client = new OkHttpClient.Builder().build(); Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build(); Response response = null; try { response = client.newCall(request).execute(); } catch (IOException e) { } response.body().close(); return true; } return false; } }
Sample codes only