Notice
Recent Posts
Recent Comments
Link
«   2025/06   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
Archives
Today
Total
관리 메뉴

JinHee's Board

Apache Spark - OpenSearch 연동 문제 해결 본문

공부한 내용정리/기타

Apache Spark - OpenSearch 연동 문제 해결

JinHee Han 2021. 10. 30. 22:44

개요

Apache Spark와 OpenSearch 자바로 연동시 발생하는 문제를 해결하는 과정을 기록하고자 한다.

 

 

문제확인

Opensearch는 엘라스틱에서 서비스하는 Elasticsearch 오픈소스를 포크(코드를 복사)하여 만든 AWS판 ElasticSearch 다.

Spark와 Elasticsearch를 Java를 통해 연동하는 과정에 있어 사용하던 Elasticsearch를  Opensearch로 교체해야 할 일이 생겨 교체했는데 이후 잘만 실행되었던 Spark가 실행되지 않는 문제가 생겼다.

 

 

오류메시지

Elasticsearch를 연동하고 있었을때는 오류가 없었으나 Opensearch로 연동한 다음 이와 같은 오류가 발생했다.

눈여겨볼점은 (No search type for [scan]) 이라는 메시지

 

검색해 본 결과 Elasticsearch 구버전(2.0.0 이전버전)은 search type으로 scan을 사용한다고 한다.

또한 공식문서에서는 Spark에서 연동을 지원하는 Elastic버젼은 5.0이후 버젼(하단 링크)

 

Apache Spark support | Elasticsearch for Apache Hadoop [7.15] | Elastic

Due to the way SparkSQL works with a DataFrame schema, elasticsearch-hadoop needs to be aware of what fields are returned from Elasticsearch before executing the actual queries. While one can restrict the fields manually through the underlying Elasticsearc

www.elastic.co

Opensearch는 Elasticsearch의 최신버젼을 포크했기 때문에 적어도 10.0.0 버전 이상이지만

Opensearch자체는 Elasticsearch와 별개이므로 버젼은 1.0.0 으로 설정되어 있다.

 

이를 통해 Spark와 Opensearch를 Java로 연동할시 Java에서는 Opensearch를 1.0.0버전으로 읽어서 search type을 scan으로 실행하게 되지만 Spark에서는 search type scan을 지원하지 않기 때문에 위와 같은 오류가 나는것 같다고 추측했다.

 

문제해결

Spark는 Elasticsearch를 지원하고 있지만 Opensearch를 지원하지 않기 때문에 가장 좋은 해결방법은 Spark나 Opensearch 상에서의 업데이트를 기다리는 방법이지만 기다릴 시간이 없기 때문에 다음과 같이 해결했다.

 

오류 메시지를 한참동안 추적해본 결과 org.elasticsearch.hadoop 라이브러리 내부에 

EsMajorVersion.class 에서 Elastic 버전을 인식하고 있었다. 

package org.elasticsearch.hadoop.util;

import java.io.Serializable;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;

/**
 * Elasticsearch major version information, useful to check client's query compatibility with the Rest API.
 */
public class EsMajorVersion implements Serializable {

    private static final long serialVersionUID = 1L;

    public static final EsMajorVersion V_0_X = new EsMajorVersion((byte) 0, "0.x");
    public static final EsMajorVersion V_1_X = new EsMajorVersion((byte) 1, "1.x");
    public static final EsMajorVersion V_2_X = new EsMajorVersion((byte) 2, "2.x");
    public static final EsMajorVersion V_5_X = new EsMajorVersion((byte) 5, "5.x");
    public static final EsMajorVersion V_6_X = new EsMajorVersion((byte) 6, "6.x");
    public static final EsMajorVersion V_7_X = new EsMajorVersion((byte) 7, "7.x");
    public static final EsMajorVersion LATEST = V_7_X;

    public final byte major;
    private final String version;

    private EsMajorVersion(byte major, String version) {
        this.major = major;
        this.version = version;
    }

    public boolean after(EsMajorVersion version) {
        return version.major < major;
    }

    public boolean on(EsMajorVersion version) {
        return version.major == major;
    }

    public boolean notOn(EsMajorVersion version) {
        return !on(version);
    }

    public boolean onOrAfter(EsMajorVersion version) {
        return version.major <= major;
    }

    public boolean before(EsMajorVersion version) {
        return version.major > major;
    }

    public boolean onOrBefore(EsMajorVersion version) {
        return version.major >= major;
    }

    public static EsMajorVersion parse(String version) {
        if (version.startsWith("0.")) {
            return new EsMajorVersion((byte) 0, version);
        }
        if (version.startsWith("1.")) {
            return new EsMajorVersion((byte) 1, version);
        }
        if (version.startsWith("2.")) {
            return new EsMajorVersion((byte) 2, version);
        }
        if (version.startsWith("5.")) {
            return new EsMajorVersion((byte) 5, version);
        }
        if (version.startsWith("6.")) {
            return new EsMajorVersion((byte) 6, version);
        }
        if (version.startsWith("7.")) {
            return new EsMajorVersion((byte) 7, version);
        }
        throw new EsHadoopIllegalArgumentException("Unsupported/Unknown Elasticsearch version [" + version + "]." +
                "Highest supported version is [" + LATEST.version + "]. You may need to upgrade ES-Hadoop.");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        EsMajorVersion version = (EsMajorVersion) o;

        return major == version.major &&
                this.version.equals(version.version);
    }

    @Override
    public int hashCode() {
        return major;
    }

    @Override
    public String toString() {
        return version;
    }
}

운이 좋게도 해당 라이브러리 폴더에 java소스가 포함되어 있어서 수정이 가능했고 아래와 같이 parse 메소드를 수정했다.

    public static EsMajorVersion parse(String version) {
        if (version.startsWith("0.")) {
            return new EsMajorVersion((byte) 5, version);
        }
        if (version.startsWith("1.")) {
            return new EsMajorVersion((byte) 5, version);
        }
        if (version.startsWith("2.")) {
            return new EsMajorVersion((byte) 5, version);
        }
        if (version.startsWith("5.")) {
            return new EsMajorVersion((byte) 5, version);
        }
        if (version.startsWith("6.")) {
            return new EsMajorVersion((byte) 6, version);
        }
        if (version.startsWith("7.")) {
            return new EsMajorVersion((byte) 7, version);
        }
		return new EsMajorVersion((byte) 5, version);
    }

실행하는 Elasticsearch 버전이 1이나 2로 시작하는 버전이어도 5버전 이상으로 인식하도록 수정했다.

수정후 javac 커맨드로 해당 java만 컴파일한 다음 해당 라이브러리에 덮어 씌웠다.

 

라이브러리를 jar로 재압축한 다음 Spark 디렉토리에서 java 라이브러리들이 있는 디렉토리에 옮겼다. 

 

 

Comments