본문 바로가기
스프링부트

스프링 클라우드 데이터 플로우 (SCDF)를 활용한 실시간 데이터 파이프라인 구축 - 소스, 프로세서, 싱크 어플리케이션 개발 및 통합 실습 (2)

by 플라퉁 2023. 6. 12.
728x90
반응형

 

 

 

 

우리는 로컬 SCDF 서버를 띄우는데 성공했습니다.

 

이제 여기서 뭘 할 수 있을까요??

 

 

 

위의 어플리케이션 타입을 보면 source , processor, sink를 확인 할 수 있습니다.

 

이 세가지 어플리케이션은 각각 데이터 제공, 데이터 가공, 데이터 처리의 역할을 할 수 있습니다.

 

바로 실습해 봅시다.

 

먼저 source 어플리케이션을 만듭니다.

 

 

 

 

 

https://start.spring.io/

 

1. spring.io 로 이동하여 test source 프로젝트를 만들어줍니다.

 

저는 롬복과 래빗엠큐를 의존성으로 주입하여 만들었습니다.

 

이 프로젝트는 5초에 한번씩 hello, SCDF! 라는 메시지 데이터를 제공합니다.

 

 

 

 

(1). app입니다.

 

package com.example.scdfsource;

import com.example.scdfsource.service.MessageProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableBinding(Source.class)
@EnableScheduling
@Slf4j
public class ScdfsourceApplication {

	@Autowired
	private MessageProducerService messageProducerService;

	public static void main(String[] args) {
		SpringApplication.run(ScdfsourceApplication.class, args);
	}

	@Scheduled(fixedRate = 5000)
	public void triggerMessageProduction() {
		messageProducerService.produceMessage();
		log.info("Sent message: Hello, SCDF!");
	}

}

 

 

 

 

(2). 메시지 생성 서비스입니다.

 

package com.example.scdfsource.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class MessageProducerService {

    @Autowired
    private Source source;

    public void produceMessage() {
        String message = "Hello, SCDF!";
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}

 

최대한 간단하게 작성하였습니다. 

 

 

 

 

 

(3). 다음은 yml입니다.

 

server:
  port: 8091

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: messages
          binder: rabbit
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: 유저 네임
                password: 비밀 번호

 

이 source 어플리케이션을 빌드하고 jar를 메이븐 리파지토리에 등록해 주세요.

 

 

 

 

명령어 입니다.

 

mvn install:install-file -Dfile="scdfsource-0.0.1-SNAPSHOT.jar" -DgroupId="com.example" -DartifactId="scdfsource" -Dversion="0.0.1-SNAPSHOT" -Dpackaging=jar -DlocalRepositoryPath="C:\Users\kimhyunseung\.m2\repository"

 

**안에 들어가는 내용들은 본인의 컴퓨터에 맞춰 작성해 주셔야 합니다.

 

 

 

 

 

2. spring.io 로 이동하여 test processor 프로젝트를 만들어줍니다.

 

이 프로젝트는 source 어플리케이션으로 부터 데이터를 받아

 

Processed message: HELLO, SCDF!

 

라는 데이터로 가공합니다.

 

 

(1). app입니다.

 

package com.example.scdfprocessor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
@SpringBootApplication
@Slf4j
public class ScdfprocessorApplication {

   public static void main(String[] args) {
      SpringApplication.run(ScdfprocessorApplication.class, args);
   }

   @StreamListener(Processor.INPUT)
   @SendTo(Processor.OUTPUT)
   public String processMessage(String message) {
      log.info("recieve message: {}" , message);
      return "Processed message: " + message.toUpperCase();
   }
}

 

 

 

 

 

(2). 다음은 yml 입니다.

 

server:
  port: 8092

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: messages
          binder: rabbit
        output:
          destination: processed-messages
          binder: rabbit
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: 유저 네임
                password: 비밀 번호

 

이 processor 어플리케이션을 빌드하고 jar를 메이븐 리파지토리에 등록해 주세요.

 

위의 source 어플리케이션과 동일하게 해주시면 됩니다.

 

 

 

 

 

3. spring.io 로 이동하여 test sink 프로젝트를 만들어줍니다.

 

이제 데이터를 저장할 DB를 설정 합니다. 저는 오라클을 사용하였는데 

 

어떤 DB를 사용하는지는 여러분의 마음입니다.

 

레디스, mongoDB, postgres 아무거나 사용하셔서 

 

데이터를 받아 주세요.

 

 

 

(1). app입니다.

 

package com.example.OracleSink;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
@SpringBootApplication
@EnableBinding(Sink.class)
@Slf4j
public class OracleSinkApplication {

   public static void main(String[] args) {
      SpringApplication.run(OracleSinkApplication.class, args);
   }

   @Autowired
   private MessageRepository messageRepository;

   @StreamListener(Sink.INPUT)
   public void processMessage(String message) {
      log.info("Received message: {}", message);
      Message msg = new Message();
      msg.setContent(message);
      messageRepository.save(msg);
      log.info("Message saved to Oracle Database.");
   }

   @Entity
   @Table(name = "MESSAGES")
   public static class Message {

      @Id
      @GeneratedValue(strategy = GenerationType.AUTO)
      private Long id;

      private String content;

      // getters and setters

      public void setContent(String content) {
         this.content = content;
      }
   }



}

 

 

 

 

 

(2). 다음은 레파지토리입니다.

 

package com.example.OracleSink;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MessageRepository extends JpaRepository<OracleSinkApplication.Message, Long> {

}

 

 

 

 

(3). 다음은 yml 입니다.

 

server:
  port: 8093

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: messages
          binder: rabbit
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: 유저 네임
                password: 비밀 번호
  datasource:
    url: jdbc:oracle:thin:@오라클 주소
    username: 유저 네임
    password: 비밀 번호
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: update
  properties:
    hibernate:
      dialect: org.hibernate.dialect.Oracle10gDialect

 

이 sink 어플리케이션을 빌드하고 jar를 메이븐 리파지토리에 등록해 주세요.

 

위의 source 어플리케이션과 동일하게 해주시면 됩니다.

 

 

 

 

 

 

자 오늘은 여기까지 작성해보겠습니다.

 

중간중간 개별로 실행해서 이상이 없는지 확인하시면서

 

다 정상 동작 한다면 매우 축하드립니다.

 

 

 

이제 다음 마지막 시간에 scdf 서버에 streams로 올려 무한 스트리밍을 구현해 봅시다.

 

 

 

 

728x90
반응형

댓글