val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello", valueDeserializer, kafkaProps) // 指定消费策略 helloStream.setStartFromEarliest() // - 从最早的记录开始; helloStream.setStartFromLatest() //- 从最新记录开始; helloStream.setStartFromTimestamp(null); // 从指定的epoch时间戳(毫秒)开始; helloStream.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition val specificStartOffsets = new mutable.HashMap[KafkaTopicPartition,Long]() specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L) // 第一个分区从23L开始 specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L) // 第二个分区从31L开始 specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L) // 第三个分区从43L开始 helloStream.setStartFromSpecificOffsets(specificStartOffsets) // Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer