object MyKafkaSenderUtil {
    val props = new Properties()
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
    private val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
    
    def send(topic: String, content: String) = {
        producer.send(new ProducerRecord[String, String](topic, content))
    }
    
    def main(args: Array[String]): Unit = {
        send("order0508","chenchi")
    }
}

上面是kafka生产者代码, 看着好像没错,其实里面有个大坑,

说下问题 现象,测试的时候想看能否正常生产数据到kafka,结果奇了怪了,控制台一直不报错,服务器上kafka一直不消费??

心想可能是kafka客户端问题,这不简单,直接在linux上起生产消费,结果正常

看了日志,看了topic里面的偏移量,没问题呀。

后来反复看,发现问题 是kafka没有 produce.flush  其实是没有producer.close

kafka是积累了一批数据到达一定的时数量或者一定时间段后会自动flush,但是我们才生产了1条数据,时间不够,数量也不够,所以消息全部积压了,这个时候需要手动把消息推送到消费端,所以需要flush一下

备注:以前觉得各种连接 con.close 不关会内存泄漏,其实测试的时候不关也没啥事,但是这个还是不好的习惯

 

 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐