우리는 로컬 SCDF 서버를 띄우는데 성공했습니다.
이제 여기서 뭘 할 수 있을까요??
위의 어플리케이션 타입을 보면 source , processor, sink를 확인 할 수 있습니다.
이 세가지 어플리케이션은 각각 데이터 제공, 데이터 가공, 데이터 처리의 역할을 할 수 있습니다.
바로 실습해 봅시다.
먼저 source 어플리케이션을 만듭니다.
1. spring.io 로 이동하여 test source 프로젝트를 만들어줍니다.
저는 롬복과 래빗엠큐를 의존성으로 주입하여 만들었습니다.
이 프로젝트는 5초에 한번씩 hello, SCDF! 라는 메시지 데이터를 제공합니다.
(1). app입니다.
package com.example.scdfsource;
import com.example.scdfsource.service.MessageProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@SpringBootApplication
@EnableBinding(Source.class)
@EnableScheduling
@Slf4j
public class ScdfsourceApplication {
@Autowired
private MessageProducerService messageProducerService;
public static void main(String[] args) {
SpringApplication.run(ScdfsourceApplication.class, args);
}
@Scheduled(fixedRate = 5000)
public void triggerMessageProduction() {
messageProducerService.produceMessage();
log.info("Sent message: Hello, SCDF!");
}
}
(2). 메시지 생성 서비스입니다.
package com.example.scdfsource.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class MessageProducerService {
@Autowired
private Source source;
public void produceMessage() {
String message = "Hello, SCDF!";
source.output().send(MessageBuilder.withPayload(message).build());
}
}
최대한 간단하게 작성하였습니다.
(3). 다음은 yml입니다.
server:
port: 8091
spring:
cloud:
stream:
bindings:
output:
destination: messages
binder: rabbit
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: 유저 네임
password: 비밀 번호
이 source 어플리케이션을 빌드하고 jar를 메이븐 리파지토리에 등록해 주세요.
명령어 입니다.
mvn install:install-file -Dfile="scdfsource-0.0.1-SNAPSHOT.jar" -DgroupId="com.example" -DartifactId="scdfsource" -Dversion="0.0.1-SNAPSHOT" -Dpackaging=jar -DlocalRepositoryPath="C:\Users\kimhyunseung\.m2\repository"
**안에 들어가는 내용들은 본인의 컴퓨터에 맞춰 작성해 주셔야 합니다.
2. spring.io 로 이동하여 test processor 프로젝트를 만들어줍니다.
이 프로젝트는 source 어플리케이션으로 부터 데이터를 받아
Processed message: HELLO, SCDF!
라는 데이터로 가공합니다.
(1). app입니다.
package com.example.scdfprocessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding(Processor.class)
@SpringBootApplication
@Slf4j
public class ScdfprocessorApplication {
public static void main(String[] args) {
SpringApplication.run(ScdfprocessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String processMessage(String message) {
log.info("recieve message: {}" , message);
return "Processed message: " + message.toUpperCase();
}
}
(2). 다음은 yml 입니다.
server:
port: 8092
spring:
cloud:
stream:
bindings:
input:
destination: messages
binder: rabbit
output:
destination: processed-messages
binder: rabbit
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: 유저 네임
password: 비밀 번호
이 processor 어플리케이션을 빌드하고 jar를 메이븐 리파지토리에 등록해 주세요.
위의 source 어플리케이션과 동일하게 해주시면 됩니다.
3. spring.io 로 이동하여 test sink 프로젝트를 만들어줍니다.
이제 데이터를 저장할 DB를 설정 합니다. 저는 오라클을 사용하였는데
어떤 DB를 사용하는지는 여러분의 마음입니다.
레디스, mongoDB, postgres 아무거나 사용하셔서
데이터를 받아 주세요.
(1). app입니다.
package com.example.OracleSink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
@SpringBootApplication
@EnableBinding(Sink.class)
@Slf4j
public class OracleSinkApplication {
public static void main(String[] args) {
SpringApplication.run(OracleSinkApplication.class, args);
}
@Autowired
private MessageRepository messageRepository;
@StreamListener(Sink.INPUT)
public void processMessage(String message) {
log.info("Received message: {}", message);
Message msg = new Message();
msg.setContent(message);
messageRepository.save(msg);
log.info("Message saved to Oracle Database.");
}
@Entity
@Table(name = "MESSAGES")
public static class Message {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long id;
private String content;
// getters and setters
public void setContent(String content) {
this.content = content;
}
}
}
(2). 다음은 레파지토리입니다.
package com.example.OracleSink;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MessageRepository extends JpaRepository<OracleSinkApplication.Message, Long> {
}
(3). 다음은 yml 입니다.
server:
port: 8093
spring:
cloud:
stream:
bindings:
input:
destination: messages
binder: rabbit
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: 유저 네임
password: 비밀 번호
datasource:
url: jdbc:oracle:thin:@오라클 주소
username: 유저 네임
password: 비밀 번호
jpa:
show-sql: true
hibernate:
ddl-auto: update
properties:
hibernate:
dialect: org.hibernate.dialect.Oracle10gDialect
이 sink 어플리케이션을 빌드하고 jar를 메이븐 리파지토리에 등록해 주세요.
위의 source 어플리케이션과 동일하게 해주시면 됩니다.
자 오늘은 여기까지 작성해보겠습니다.
중간중간 개별로 실행해서 이상이 없는지 확인하시면서
다 정상 동작 한다면 매우 축하드립니다.
이제 다음 마지막 시간에 scdf 서버에 streams로 올려 무한 스트리밍을 구현해 봅시다.
'스프링부트' 카테고리의 다른 글
Spring WebFlux 시작하기 - 리액티브 웹 애플리케이션 개발과 R2DBC 소개 (2) | 2023.08.21 |
---|---|
스프링 클라우드 데이터 플로우(SCDF)를 활용한 무중단 서비스 구축: 소스, 프로세서, 싱크 어플리케이션 스트림 통합 및 배포 (3) (1) | 2023.06.12 |
스프링 클라우드 데이터 플로우 (SCDF)를 이용한 로컬 서버 구축 및 어플리케이션 통합 - 실습 가이드와 네이버 개발자 리소스 (1) (0) | 2023.06.09 |
스프링 클라우드 데이터 플로우 (SCDF)를 활용한 실시간 대용량 데이터 처리 프로젝트 소개와 경험 공유 (0) | 2023.06.08 |
Stomp를 활용한 실시간 채팅 프로그램 구현: 웹소켓 최적화 및 효율적인 메시징 전송 (4) (7) | 2023.02.24 |
댓글