JinHee's Board
NIFI - Custom Processor 만들어보기 본문
개요
Nifi에서 특정 기능을 활용해야 하는 일이 생겼으나 해당 기능은 Nifi에서 Processor로 제공하지 않는것 같다
Custom Processor를 직접 만들어 해당 기능을 구현하고 내용을 기록하고자 한다
참고한 링크
GitHub - apache/nifi: Apache NiFi
Apache NiFi. Contribute to apache/nifi development by creating an account on GitHub.
github.com
Nifi Processor 프로젝트 생성 및 배포 (feat. Maven)
Maven Archetype으로 NiFi Processor 프로젝트 생성하기 Maven Archetype은 Maven 프로젝트 템플릿을 생성하는 tookit입니다. 이 툴킷으로 원하는 프로젝트 템플릿을 생성할 수 있습니다. 저는 nifi processor 템플릿
plz-fast-retire.tistory.com
Create a Custom Processor | Syncfusion
How to create a custom processor in Data Integration Platform Data Integration Platform provided support to create a custom processor to meet custom or user specific requirements. This document will explain the steps involved in developing a custom process
help.syncfusion.com
사전준비
- Apach maven 3.9.6 (최신버젼)
- Java
- Intellij (무료버젼으로 사용했음)
프로젝트 생성
원하는 경로에 mvn archetype:generate 명령어 실행 (프로젝트 생성)
마지막줄에는 원하는 Maven 프로젝트 탬플릿을 선택해야 하는데 Nifi Custom 프로세서를 만들기 위한 목적이기 때문에
nifi 를 입력하여 해당 탬플릿을 찾는다.
Nifi Custom 프로세서는 1번 (1번 입력)
적용하고자 하는 Nifi의 버젼에 해당하는 숫자를 입력해야 한다.
(1.16.3 버젼을 사용하고 있으므로 53을 입력했다)
Custom 프로세서 프로젝트 정보 입력
생성 완료
프로젝트 빌드
프로젝트 생성 이 완료되면 우선 빌드 하는데에 문제가 없는지 확인한다
cmd로 생성한 프로젝트에서 mvn clean install 실행
문제가 없다면 모두 SUCCESS가 나타난다
문제가 발생하면 아래와 같이 나타난다 ( JAVA 버전 문제 )
문제의 원인이 되는 부분은 캡처 이미지의 제일 윗부분과 같이 해당 위치에서 확인 할 수 있다
지금까지 발견했던 빌드 시 발생한 문제 및 해결 방법은 아래와 같다
1. JAVA 버젼 문제 ( JAVA 1.8.0_241 사용시 빌드 되지 않음 → JAVA 1.8.0_3** 버젼으로 변경 )
2. 라이브러리 버젼 문제 ( Pom.xml 의존성 설정한 라이브러리의 버젼 변경 )
코드 수정
생성한 프로젝트는 Intellij 로 열었고, Custom 프로세서 탬플릿 코드를 수정할 위치는 표시한 위치에 있는 부분 이다
코드내용은 아래와 같으며 주석으로 작성해놓은 숫자는 Nifi 프로세서에 해당하는 부분이다
package com.example.nifi;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"example"}) // 1 : 태그
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {
// 2 : Property 파라미터
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("MY_PROPERTY")
.displayName("My property") // 3 : 파라미터 이름
.description("Example Property") // 4 : 파라미터 설명
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MY_PROPERTY2 = new PropertyDescriptor
.Builder().name("MY_PROPERTY2")
.displayName("My property2")
.description("Example Property2")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// 5 : 프로세서를 통과한 플로우 파일이 넘어가는 경로
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
.name("MY_RELATIONSHIP")
.description("Example relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
// 앞에서 생성한 Property 파라미터는 이곳에서 추가해야 실제로 추가가 된다
descriptors = new ArrayList<>();
descriptors.add(MY_PROPERTY);
descriptors.add(MY_PROPERTY2);
descriptors = Collections.unmodifiableList(descriptors);
relationships = new HashSet<>();
relationships.add(MY_RELATIONSHIP);
relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
// 프로세서가 실행되어 실제로 작동하는 기능
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
// TODO implement
}
}
위와 같이 작성후 mvn clean Install 로 빌드하면 my-custom-process\nifi-custom-nar\target 경로에 .nar 파일이 생성된다
이 파일을 nifi/lib 폴더에 넣고 nifi 를 재실행 하면 Custom 프로세서를 사용할 수 있다
예시 : 두 Property 에 입력한 숫자를 더하여 출력하는 Custom 프로세서
package com.example.nifi;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("MY_PROPERTY")
.displayName("My property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MY_PROPERTY2 = new PropertyDescriptor
.Builder().name("MY_PROPERTY2")
.displayName("My property2")
.description("Example Property2")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
.name("MY_RELATIONSHIP")
.description("Example relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = new ArrayList<>();
descriptors.add(MY_PROPERTY);
descriptors.add(MY_PROPERTY2);
descriptors = Collections.unmodifiableList(descriptors);
relationships = new HashSet<>();
relationships.add(MY_RELATIONSHIP);
relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
int num1 = Integer.parseInt(context.getProperty(MY_PROPERTY).getValue());
int num2 = Integer.parseInt(context.getProperty(MY_PROPERTY2).getValue());
String result = Integer.toString(num1 + num2);
//플로우 파일을 새로 생성
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream outputStream) throws IOException {
// IOUtils는 Pom.xml에 라이브러리를 추가해야 사용가능
IOUtils.write(result, outputStream, "UTF-8");
}
});
//플로우 파일 반환
session.transfer(flowFile, MY_RELATIONSHIP);
}
}
빌드 후 반영 및 실행 결과
'공부한 내용정리 > 기타' 카테고리의 다른 글
Project - Socket과 ElasticSearch 를 활용한 채팅 사이트 개발일지 [4] (0) | 2022.05.22 |
---|---|
Project - Socket과 ElasticSearch 를 활용한 채팅 사이트 개발일지 [3] (0) | 2022.05.08 |
Project - Socket과 ElasticSearch 를 활용한 채팅 사이트 개발일지 [2] (0) | 2022.04.23 |
Project - Socket과 ElasticSearch 를 활용한 채팅 사이트 개발일지 [1] (2) | 2022.04.09 |
Apache Spark - OpenSearch 연동 문제 해결 (0) | 2021.10.30 |