Notice
Recent Posts
Recent Comments
Link
«   2025/07   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
Archives
Today
Total
관리 메뉴

JinHee's Board

NIFI - Custom Processor 만들어보기 본문

공부한 내용정리/기타

NIFI - Custom Processor 만들어보기

JinHee Han 2024. 3. 17. 12:59

개요

Nifi에서 특정 기능을 활용해야 하는 일이 생겼으나 해당 기능은 Nifi에서 Processor로 제공하지 않는것 같다

Custom Processor를 직접 만들어 해당 기능을 구현하고 내용을 기록하고자 한다

 

참고한 링크 

 

GitHub - apache/nifi: Apache NiFi

Apache NiFi. Contribute to apache/nifi development by creating an account on GitHub.

github.com

 

Nifi Processor 프로젝트 생성 및 배포 (feat. Maven)

Maven Archetype으로 NiFi Processor 프로젝트 생성하기 Maven Archetype은 Maven 프로젝트 템플릿을 생성하는 tookit입니다. 이 툴킷으로 원하는 프로젝트 템플릿을 생성할 수 있습니다. 저는 nifi processor 템플릿

plz-fast-retire.tistory.com

 

Create a Custom Processor | Syncfusion

How to create a custom processor in Data Integration Platform Data Integration Platform provided support to create a custom processor to meet custom or user specific requirements. This document will explain the steps involved in developing a custom process

help.syncfusion.com

 

사전준비

  • Apach maven 3.9.6 (최신버젼)
  • Java 
  • Intellij (무료버젼으로 사용했음)

프로젝트 생성

원하는 경로에 mvn archetype:generate  명령어 실행 (프로젝트 생성)

 

마지막줄에는 원하는 Maven 프로젝트 탬플릿을 선택해야 하는데 Nifi Custom 프로세서를 만들기 위한 목적이기 때문에

nifi 를 입력하여 해당 탬플릿을 찾는다.

Nifi Custom 프로세서는 1번 (1번 입력)

적용하고자 하는 Nifi의 버젼에 해당하는 숫자를 입력해야 한다.

(1.16.3 버젼을 사용하고 있으므로 53을 입력했다)

 

Custom 프로세서 프로젝트 정보 입력

 

생성 완료

 

프로젝트 빌드

프로젝트 생성 이 완료되면 우선 빌드 하는데에 문제가 없는지 확인한다

cmd로 생성한 프로젝트에서 mvn clean install 실행

문제가 없다면 모두 SUCCESS가 나타난다

 

문제가 발생하면 아래와 같이 나타난다 ( JAVA 버전 문제 )

문제의 원인이 되는 부분은 캡처 이미지의 제일 윗부분과 같이 해당 위치에서 확인 할 수 있다

 

지금까지 발견했던 빌드 시 발생한 문제 및 해결 방법은 아래와 같다

1. JAVA 버젼 문제 ( JAVA 1.8.0_241 사용시 빌드 되지 않음 → JAVA 1.8.0_3** 버젼으로 변경 )

2. 라이브러리 버젼 문제 ( Pom.xml 의존성 설정한 라이브러리의 버젼 변경 )

 

코드 수정

생성한 프로젝트는 Intellij 로 열었고, Custom 프로세서 탬플릿 코드를 수정할 위치는 표시한 위치에 있는 부분 이다

 

코드내용은 아래와 같으며 주석으로 작성해놓은 숫자는 Nifi 프로세서에 해당하는 부분이다

package com.example.nifi;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Tags({"example"}) // 1 : 태그
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

	// 2 : Property 파라미터 
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY") 
            .displayName("My property") // 3 : 파라미터 이름
            .description("Example Property") // 4 : 파라미터 설명
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
            
    public static final PropertyDescriptor MY_PROPERTY2 = new PropertyDescriptor
            .Builder().name("MY_PROPERTY2")
            .displayName("My property2")
            .description("Example Property2")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

	// 5 : 프로세서를 통과한 플로우 파일이 넘어가는 경로
    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("MY_RELATIONSHIP") 
            .description("Example relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        // 앞에서 생성한 Property 파라미터는 이곳에서 추가해야 실제로 추가가 된다
        descriptors = new ArrayList<>();
        descriptors.add(MY_PROPERTY);
        descriptors.add(MY_PROPERTY2);
        descriptors = Collections.unmodifiableList(descriptors);

        relationships = new HashSet<>();
        relationships.add(MY_RELATIONSHIP);
        relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        // 프로세서가 실행되어 실제로 작동하는 기능
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        // TODO implement
    }
}

 

 

 

위와 같이 작성후 mvn clean Install 로 빌드하면 my-custom-process\nifi-custom-nar\target 경로에 .nar 파일이 생성된다 

이 파일을 nifi/lib 폴더에 넣고 nifi 를 재실행 하면 Custom 프로세서를 사용할 수 있다

 

예시 : 두 Property 에 입력한 숫자를 더하여 출력하는 Custom 프로세서

package com.example.nifi;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor MY_PROPERTY2 = new PropertyDescriptor
            .Builder().name("MY_PROPERTY2")
            .displayName("My property2")
            .description("Example Property2")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("MY_RELATIONSHIP")
            .description("Example relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = new ArrayList<>();
        descriptors.add(MY_PROPERTY);
        descriptors.add(MY_PROPERTY2);
        descriptors = Collections.unmodifiableList(descriptors);

        relationships = new HashSet<>();
        relationships.add(MY_RELATIONSHIP);
        relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        int num1 = Integer.parseInt(context.getProperty(MY_PROPERTY).getValue());
        int num2 = Integer.parseInt(context.getProperty(MY_PROPERTY2).getValue());
        String result = Integer.toString(num1 + num2);

        //플로우 파일을 새로 생성
        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream outputStream) throws IOException {
                // IOUtils는 Pom.xml에 라이브러리를 추가해야 사용가능
                IOUtils.write(result, outputStream, "UTF-8");
            }
        });

        //플로우 파일 반환
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
}

 

빌드 후 반영 및 실행 결과

 

Comments