修改 pom.xml
加入 kafka 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>2.7.3</version>
</dependency>
添加 kafka.xml
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="retries" value="3" />
<entry key="ack" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<bean id="kafkaTemplate"
class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<property name="defaultTopic" value="defaultTopic" />
</bean>
</beans>
测试示例
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.inke.streaming.spi.hook.HookMessage;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.UUID;
@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(value = {"classpath:kafka.xml"})
@Slf4j
public class KafkaHookTest
{
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void test()
{
Injector injector = Guice.createInjector(new KafkaHookModule());
KafkaHook kafkaHook = injector.getInstance(KafkaHook.class);
HookMessage hookMessage = new HookMessage();
hookMessage.setInstance(kafkaTemplate);
hookMessage.setUnique(UUID.randomUUID().toString());
hookMessage.setState("SUCCESS");
hookMessage.setTopic("kafka-report");
kafkaHook.execute(hookMessage);
}
}
评论区