이벤트스토밍 결과를 소스 코드로 변환 및 실습
Updated:
이벤트스토밍 결과를 소스 코드로 변환 및 실습
Event Driven Architecture의 설계 기법인 이벤트 스토밍의 결과를 Spring Boot(스프링 부트)와 EventDriven 방식을 사용하여, 실제 코드로 변환하는 실습 관련 내용을 살펴보겠습니다.
이벤트스토밍 : Type of stickers
Aggregate (Yellow Sticker)
1) 이벤트 스토밍의 첫번째 구현은 도메인 모델을 정의하는 단계입니다. 노란색 스티커로 붙여진 Aggregate 의 변화에 의하여 이벤트가 생성되고, 커맨드 요청을 받아서 Aggregate 가 변화를 하기 때문에, Aggregate 는 가장 중요하고, 가장 먼저 구현을 합니다.
2) Aggregate 를 구현 시, Java 언어로 Spring-boot 를 사용하여 구현을 한다면 아래와 같이 Java 클래스에 @Entity 어노테이션을 선언해 주면 준비가 완료 됩니다.
- Entity 라는 말은 객체, 혹은 개체 라는 의미로 사용되는데 연필이나 컴퓨터 처럼 서로 구별이 되는 하나하나의 대상을 지칭합니다. 즉 우리가 사용하는 도메인 언어입니다.
- Spring 에서는 이런 도메인 언어를 손쉽게 사용할 수 있도록 어노테이션을 제공하여 줍니다.
3) 어노테이션 선언 후에 Entity 구성에 필요한 속성들을 정의하여 주면 됩니다. 속성들은 java 언어에서는 변수로 선언을 해주면 됩니다.
4) 모든 Entity 는 생성되고, 변화되고, 사라지는 lifecycle 을 가지고 있고, 속성이 있기에 프로그래밍 언어로 보았을 때 데이터베이스와 API 와의 매칭이 당연시 됩니다.
- 해당 API 가 Command 가 됩니다.
5) Aggregate 에 포함된 특정 Entity 를 RootEntity 혹은 AggregateRoot 라고 부릅니다.
아래 그림은 노란색의 이벤트 스토밍 스티커인 Aggregate 를 구현한 모습입니다.
실습 진행
- Aggregate 는 이벤트 스토밍의 노란색 결과 입니다.
- Product Class 를 생성 합니다.
- 상품 Entity 를 id, name, stock 맴버 변수를 가진 정의하고 get,set 메서드를 생성하여 줍니다.
- 클레스 상단에 @Entity 어노테이션을 달아서 Aggregate 선언을 하여 줍니다.
- @Entity 어노테이션은 JPA 방식을 사용합니다. 이는 Id 값이 필수입니다.
- id 로 선언한 변수에 @Id @GeneratedValue 를 선언하여 줍니다.
@Entity
public class Product {
@Id @GeneratedValue
Long id;
String name;
int stock;
// get/set 메서드
}
Command (Sky-Blue Sticker)
1) Aggregate 를 구성하였으면, 해당 Aggregate 를 변화시키는 커맨드를 작성합니다.
2) 파란색 스티커에 해당하는 Command 는 구현관점으로 보았을 때 외부로부터 들어오는 API 에 해당 됩니다.
3) DDD 에서는 Aggregate 를 변화시키는 채널을 Repository 라고 합니다. 그리고 RootEntity 에서만 Repository 를 제공하라고 가이드 되어 있습니다.
- 이는 하위 엔터티에서 Repository 를 제공하게 되면 해당 엔터티가 별도로 CRUD 가 이루어지게 되고, 이는 전체 Aggregate 의 라이프 싸이클이 깨지는 결과를 가져옵니다.
4) Spring-Data-Rest 에서는 Repository 를 구성하는 방법을 @Repository 라는 어노테이션 선언하여 구성하거나 혹은 extends Repository 같은 방식으로 구현하면 됩니다.
5) Spring-Data-Rest 를 사용하여 Repository Pattern 으로 프로그램을 구현하면 Entity 의 lifecycle 에 해당하는 기본적인 CRUD 가 바로 생성이 되고, 해당 CRUD 에 해당하는 API (커맨드) 가 자동으로 생성됩니다.
6) Repository Pattern 으로 구성이 안되고 복잡한 비지니스 로직이 있으면 Spring 에서 MVC 패턴으로 나온
Controller 와 Service 로 구현하면 됩니다.
아래는 파란색 스티커인 커맨드를 구현한 코드입니다.
실습 진행
- ProductRepository interface 를 생성하여 줍니다.
- CrudRepository<Product, Long> 를 extends 하여 줍니다.
- CrudRepository<> 의 두개의 변수는 Entity Type과 Primary Key(Entity Id) Type 입니다.
- 위와같이 선언만 하면, Entity 의 Repository 패턴이 자동 생성되어, Product 엔터티의 CRUD에 해당되는 API 가 자동으로 생성이 됩니다.
public interface ProductRepository extends CrudRepository<Product, Long> {
}
- 실행시 기본 포트인 8080 으로 실행됩니다.
- http 명령으로 localhost:8080 을 호출하여 봅니다.
- 기본적인 CRUD 를 호출하여 봅니다.
http localhost:8080
http http://localhost:8080/products
http POST localhost:8080/products name="TV" stock=10
http "http://localhost:8080/products/1"
http PATCH "http://localhost:8080/products/1" stock=20
http DELETE "http://localhost:8080/products/1"
http "http://localhost:8080/products/1"
- 파일 두개만 만들었지만 Aggregate 와 Command 가 생성 된 것을 확인 할 수 있습니다.
Event (Orange Sticker)
1) 주황색 스티커인 이벤트는 pojo 객체인 Java Class 로 생성을 합니다.
- 실제로 메세지로 주고 받을 때는 json 객체 형식으로 통신하는 방법을 추천합니다. Json 객체를 직접 만들거나, String 변수처럼 바로 선언을 할 수도 있으나, 클래스 파일로 구성을 해 놓았을 시, 명시적이고 쉽게 변경하기 어렵습니다.
2) 이벤트는 Aggregate 의 변화에 의해서 발생하기 때문에, 이벤트를 보내는 로직은 Entity의 lifecycle 에 작성을 하게 됩니다. 물론 비지니스 로직의 중간중간에 이벤트를 발생시켜야 한다면, 따로 서비스에서 처리를 하는 것이 맞으나, DDD 에서 말하는 주요 문구인 ‘도메인을 보았을 때 비지니스가 보여야 한다’는 원칙에 맞추어 Entity 에서 이벤트를 발생하는 방법을 추천합니다.
3) JPA 에서는 이러한 Entity 의 lifecycle에 해당하는 listener 를 어노테이션으로 생성하여 놓았습니다. 대표적으로 @PostPersist (저장후) @PrePersist(저장전) @PostUpdate (업데이트후) 등이 있습니다.
4) 메시지를 send 하는 방법은 라이브러리별로 사용법이 달라지지만, 메시지 브로커를 kafka 를 사용한다면 topic 을 설정하고, 마지막에 send 하는 형식으로 이벤트를 발행합니다.
실습 진행
- 이벤트는 일어난 사실에 대한 결과이기 때문에 과거분사(PP, Past Participle) 형으로 작성 합니다.
- 상품 정보가 변경 되었을 때 변경 사실을 알리는 ProductChanged 이벤트를 만들어 봅니다.
- ProductChanged 클레스를 생성하고, 변수를 설정합니다.
- 이벤트는 다른 서비스에서 받아보는 정보입니다. 그렇기 때문에 자세하게 적어주어야 할 필요가 있습니다.
- json 으로 데이터를 보내기 때문에 eventType 이라는 변수를 만들고, 생성자에서 이벤트 이름을 적어 줍니다.
- 세부 정보도 다른 서비스에서 명확히 이해하기 쉽도록 그냥 name 이 아닌 productName 처럼 구체적으로 작성 합니다.
public class ProductChanged {
String eventType;
Long productId;
String productName;
int productStock;
public ProductChanged(){
this.eventType = this.getClass().getSimpleName();
}
// get/set 메서드
}
- 이벤트는 Aggregate 내의 상태 변화에 의해서 발생하기 때문에, 이벤트를 보내는 로직은 Entity의 lifecycle 에 작성하게 됩니다.
- Product.java 에 데이터가 입력되었을때의 Lifecycle 인 @PostPersist 어노테이션에 이벤트를 생성하여 값을 셋팅 합니다.
- ObjectMapper 를 사용하여 json 으로 변환 합니다.
@PostPersist
public void eventPublish(){
ProductChanged productChanged = new ProductChanged();
productChanged.setProductId(this.getId());
productChanged.setProductName(this.getName());
productChanged.setProductStock(this.getStock());
ObjectMapper objectMapper = new ObjectMapper();
String json = null;
try {
json = objectMapper.writeValueAsString(productChanged);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON format exception", e);
}
System.out.println(json);
}
- 서비스를 재시작 후 Aggregate 에 데이터를 입력하여 정상적으로 json 이 생성되는지 확인 합니다.
http POST localhost:8080/products name=“TV” stock=10
{"eventType":"ProductChanged","productId":1,"productName":"TV","productStock":10}
서비스에 카프카 연결
- Spring Cloud Stream Application 모델
- Spring Cloud Streams Application에서 Kafka 바인더를 사용하기 위하여 다음 라이브러리를 pom.xml 에 추가합니다.
<!-- kafka streams -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- spring cloud 는 spring-boot 와 버전에 대한 종속성이 있습니다. 그리하여 각각의 spring-cloud 프로젝트 별로 버전을 직접 명시하지 않고, 종속성을 선언하는 변수(properties)를 사용하여야 합니다.
- 아래와 같이 를 pom.xml 에 추가하여 줍니다.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
- pom.xml 에서 ${} 로 시작하는 부분은 변수(properties) 처리를 하겠다는 의미입니다. 상단의 부분에 위에서 변수처리함
를 추가하여 줍니다. - 여기서 버전을 명시할때 주의할 점은 Spring-boot에 매핑되는 Spring-cloud 버전을 사용해야 합니다.
- 매핑되는 버전 정보는 스프링 클라우드 Site에서 확인 할 수 있습니다.
- https://spring.io/projects/spring-cloud 의 Release Trains 참고
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
</properties>
- DemoApplication.java 파일에 스트림을 바인딩 합니다.
- @EnableBinding(Processor.class)
- streams 은 메세지의 흐름으로 input 메세지와 output 메세지가 존재합니다.
- Processor 방식은 input 과 output 을 모두 사용하는 방식입니다.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.handler.annotation.Payload;
@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {
public static ApplicationContext applicationContext;
public static void main(String[] args) {
applicationContext = SpringApplication.run(DemoApplication.class, args);
}
}
- stream 을 kafka 와 연결하기 위하여 application.yaml 파일에 아래 설정을 추가 합니다.
- kafka brokers로 localhost:9092 를 사용한다는 의미입니다. 카프카 설치시 기본 포트가 9092 입니다.
- bindings.input 과 bindings.output 은 기본 채널입니다. 만약 채널명을 변경 하고 싶으면 Processor 를 새로 만들어야 합니다.
https://github.com/event-storming/products/blob/master/src/main/java/com/example/template/config/kafka/KafkaProcessor.java
- destination 은 목적지라는 뜻인데, kafka 에서는 topic 이름이 됩니다.
- 즉, 해당 설정은 shop 이라는 토픽에 메세지를 주고 받겠다는 의미입니다.
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
input:
group: product
destination: shop
contentType: application/json
output:
destination: shop
contentType: application/json
이벤트를 kafka 에 발송
- 좀전에 수정하였던 @PostPersist 부분에 스트림 메세지를 발송하는 부분을 수정합니다.
- 라이브러리 임포트
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
- Spring 에서 Bean으로 등록되지 않은 객체에서 Bean 객체를 사용하기 위해 @Autowired 대신, 직접 applicationContext 에서 getBean으로 참조합니다.
@PostPersist
public void eventPublish(){
ProductChanged productChanged = new ProductChanged();
productChanged.setProductId(this.getId());
productChanged.setProductName(this.getName());
productChanged.setProductStock(this.getStock());
ObjectMapper objectMapper = new ObjectMapper();
String json = null;
try {
json = objectMapper.writeValueAsString(productChanged);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON format exception", e);
}
Processor processor = DemoApplication.applicationContext.getBean(Processor.class);
MessageChannel outputChannel = processor.output();
outputChannel.send(MessageBuilder
.withPayload(json)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
- 수정 후 서비스를 재시작한 다음 REST API로 상품 등록 시, 카프카에 이벤트 메시지가 도달하는지 확인 합니다.
- 메시지는 Kafka Consumer로써 shop 토픽(topic) 모니터링으로 확인 가능합니다.
- http POST localhost:8080/products name=“TV” stock=10
- [kafka 설치폴더]/bin/windows/kafka-console-consumer –bootstrap-server localhost:9092 –topic shop –from-beginning
이벤트를 수신하는 Policy (Lilac Sticker)
1) 보라색 스티커는 이벤트에 반응하여 작동하는 Policy 입니다.
2) 이벤트에 반응하기 때문에 이벤트를 수신하는 리스너가 필요합니다. Spring-cloud-Stream 을 사용시 아래처럼 @StreamListener 어노테이션으로 선언을 하여 주면, 이벤트가 생성될 때마다 INPUT 으로 들어오는 데이터를 한 개씩 수신하게 됩니다.
Porcessor.INPUT 은 메시지를 수신하는 채널인데, kafka 의 구현체를 가진다면 topic 을 의미합니다. 만약 Topic 을 여러 이벤트에서 공유를 한다면 아래 리스너에서 내가 원하는 이벤트만 선별하여 작업을 해야하기 때문에 이벤트의 속성값에서 정해진 이벤트 명을 찾던가, header 에서 찾는 등 이벤트를 구분하는 로직이 필요합니다.
-
Event에 대응되는 Policy(폴리시)는 다른 마이크로서비스(팀)에서 수신 합니다. 즉, 상품 서비스에서 ProductChanged 이벤트가 발생하면 주문이나 배송 서비스에서 이를 수신 후 각 서비스에 맞는 Biz-Logic을 처리하지만, 편의상 Kafka로부터 메세지 수신만 확인합니다.
-
DemoApplication.java 에 메서드를 추가하고 @StreamListener(Processor.INPUT) 를 추가하여 스트림을 수신합니다.
@StreamListener(Processor.INPUT)
public void onEventByString(@Payload ProuctChanged productChanged){
System.out.println(productChanged);
}
- String 이 아닌 객체 자체를 받아도 StreamListener 에서 객체 변환을 하여 줍니다.
- 위의 카프카에 데이터를 보내는 명령을 호출하여 메세지를 수신하는지 확인 합니다.
[출처]http://34.117.35.195/operation/implementation/implementation-two/
[출처]http://34.117.35.195/operation/implementation/implementation-three/