Install ZooKeeper
https://archive.apache.org/dist/zookeeper/stable/
Install Kafka (this is really the only one we use directly)
Why use Kafka?
Honestly, I'm no expert lol. When you have two or more WAR files deployed, you can already pass data between them over a REST API, but Kafka is a good option too. Supposedly the big tech companies (the Naver/Kakao/Line/Coupang/Baemin crowd, or so I hear) use it a lot.
I used version 3.3.1.
In cmd, run the following:
# zookeeper 서버 띄우기
C:\kafka\kafka_2.13-3.3.1> .\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
# kafka 서버 띄우기
C:\kafka\kafka_2.13-3.3.1> .\bin\windows\kafka-server-start.bat config\server.properties
# 토픽 생성
C:\kafka\kafka_2.13-3.3.1\bin\windows> .\kafka-topics.bat --create --topic [topic name] --bootstrap-server [host]:[port] --partitions 1
# send messages to the topic with the console producer
C:\kafka\kafka_2.13-3.3.1\bin\windows> .\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic new-topic
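To check that messages actually arrive, you can optionally describe the topic and open a console consumer in another window (the topic name `new-topic` matches the producer command above):

```shell
# (optional) describe the topic to confirm it was created
C:\kafka\kafka_2.13-3.3.1\bin\windows> .\kafka-topics.bat --describe --topic new-topic --bootstrap-server localhost:9092

# read messages from the beginning of the topic
C:\kafka\kafka_2.13-3.3.1\bin\windows> .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic new-topic --from-beginning
```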
Now let's move on to Spring Boot.
With both servers (ZooKeeper and Kafka) still running, add the dependencies:
dependencies {
    // todo: logback, log4jdbc setup
    implementation 'org.bgee.log4jdbc-log4j2:log4jdbc-log4j2-jdbc4.1:1.16'
    // todo: mysql
    implementation 'mysql:mysql-connector-java:8.0.32'
    // kafka
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
application.properties
# Kafka producer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka consumer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
There are two config files: one for the Consumer and one for the Producer.
# KafkaConsumerConfig
@EnableKafka // enable Kafka listener support
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // address of the running Kafka broker
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // consumers can be grouped under this id
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // deserialize keys as Strings
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // deserialize values as Strings
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); // use the consumerFactory defined above
        return kafkaListenerContainerFactory;
    }
}
# KafkaProducerConfig
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // address of the running Kafka broker
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // serialize keys as Strings
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // serialize values as Strings
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
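For reference, `StringSerializer` and `StringDeserializer` essentially just convert between a Java `String` and UTF-8 bytes, which is why sending JSON as a plain `String` works fine here. A rough stdlib-only sketch of that round trip (not the actual Kafka classes):

```java
import java.nio.charset.StandardCharsets;

public class StringSerdeSketch {

    // roughly what StringSerializer.serialize does: String -> UTF-8 bytes
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // roughly what StringDeserializer.deserialize does: UTF-8 bytes -> String
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "{\"name\":\"kim\",\"age\":20}";
        String roundTripped = deserialize(serialize(original));
        System.out.println(roundTripped.equals(original)); // prints true
    }
}
```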
There are also two services.
# KafkaConsumerService
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumerService {

    private final PeopleRepository repository;

    @KafkaListener(topics = "new-topic")
    public void updateQty(String kafkaMessage) {
        log.info("kafka message = {}", kafkaMessage);
        // deserialize the Kafka message (a JSON string) back into a Map
        ObjectMapper mapper = new ObjectMapper();
        try {
            Map<Object, Object> map = mapper.readValue(kafkaMessage, new TypeReference<>() {});
            log.info("deserialized message = {}", map);
        } catch (JsonProcessingException e) {
            log.error("failed to deserialize kafka message", e);
        }
    }
}
# KafkaProducerService
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public People send(String topic, People people) {
        ObjectMapper mapper = new ObjectMapper();
        // convert the People object to a JSON string
        String json = "";
        try {
            json = mapper.writeValueAsString(people);
        } catch (JsonProcessingException e) {
            log.error("failed to serialize People to JSON", e);
        }
        // send the message to Kafka
        kafkaTemplate.send(topic, json);
        log.info("Kafka Producer send data from the order service = {}", people);
        return people;
    }
}
Now the Controller.
# KafkaController
@RestController
@RequestMapping("/api")
public class KafkaController {

    @Autowired
    KafkaProducerService kafkaProducer;

    @PostMapping("/kafka")
    public ResponseEntity<Object> create(@RequestBody People people) {
        // produce the request body to Kafka
        kafkaProducer.send("new-topic", people);
        return ResponseEntity.status(HttpStatus.OK).body(people);
    }
}
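With the app running, the endpoint can be exercised with curl. The JSON fields below are a guess at what `People` might contain; adjust them to your actual entity:

```shell
curl -X POST http://localhost:8080/api/kafka -H "Content-Type: application/json" -d "{\"name\": \"kim\", \"age\": 20}"
```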
And that's it.
The @KafkaListener annotation is the piece that receives values from Kafka.
So let's receive one: the People object is serialized to a JSON String on the producer side and arrives at the consumer as that String.
Done.