본문 바로가기

Elasticsearch

Elasticsearch + spirng boot 연동(INSERT)

저번에 인덱스 해당 id로 데이터를 조회까지 해봤다.

 

오늘은 item 인덱스에 데이터를 INSERT 해보려고 한다.

 

elasticsearch 에 데이터를 INSERT 하기 위해 설정 파일들을 만들었다.

데이터를 여러개에 List에 담아서 보내기 위해 Bulk를 활용한다.

 

IndexApi.java

@Slf4j
public class IndexApi {

    private RestHighLevelClient client;

    public IndexApi(RestHighLevelClient client) {
        this.client = client;
    }

    public boolean createIndex(String indexName) {

        CreateIndexRequest request = new CreateIndexRequest(indexName);

        return _createIndex(request, indexName);

    }

    public boolean _createIndex(CreateIndexRequest request, String indexName) {

        boolean indexFlag = false;
        CreateIndexResponse createIndexResponse;

        try{
                createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
                indexFlag = createIndexResponse.isAcknowledged();
        }catch (ElasticsearchException | IOException e){
            e.getMessage();
        }

        if(indexFlag){
            log.info(indexName + "인덱스 생성 성공");
        }else {
            log.info(indexName + "인덱스 생성 실패");
        }

        return indexFlag;
    }

}

 

elasticsearch RestHighLevelClient 를 사용하여 새로운 인덱스를 생성하는 클래스이다.

client.indices().create(request, RequestOptions.DEFAULT)를 사용하여 Elasticsearch 클라이언트를 통해 새 인덱스를 만든다.

RequestOptions.DEFAULT는 기본 요청 옵션을 사용하도록 지정한다.

createIndexResponse.isAcknowledged()를 통해 생성된 인덱스가 Elasticsearch에 성공적으로 인식되었는지 확인한다.

 

 

DocumentApi.java

@Slf4j
public class DocumentApi {

    private RestHighLevelClient client;

    public DocumentApi(RestHighLevelClient client){
        this.client = client;
    }

    public Boolean bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
        return _bulkDocumentWithBulkProcessor(bulkList, bulkActions);
    }

    private Boolean _bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {

        boolean terminated = false ;
        BulkProcessor bulkProcessor = _getBulkProcessor(bulkActions);

        for(BulkData data : bulkList) {
            if(data.getActionType().equals(BulkData.Type.CREATE)){
                bulkProcessor.add(new IndexRequest(data.getIndexName()).id(data.getId()).source(data.getJsonMap()));
            }
        }

        try {
            terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
            bulkProcessor.close();
        }catch (InterruptedException e){
            log.error(e.getMessage());
        }

        return terminated;
    }

    private BulkProcessor _getBulkProcessor(int bulkActions) {

        BulkProcessor bulkProcessor = BulkProcessor.builder(
                        (request, bulkListener) ->
                                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
                            int count = 0;

                            @Override
                            public void beforeBulk(long l, BulkRequest bulkRequest) {
                                count = count + bulkRequest.numberOfActions();
                                log.info("Uploaded " + count + " so far");
                            }
                            @Override
                            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                                if (bulkResponse.hasFailures()) {
                                    for (BulkItemResponse bulkItemResponse : bulkResponse) {
                                        if (bulkItemResponse.isFailed()) {
                                            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                                            log.info("Error " + failure.toString());
                                        }
                                    }
                                }
                            }
                            @Override
                            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                                log.info("Errors " + throwable.toString());
                            }
                        }, "bulk-processor")
                .setBulkActions(bulkActions).setConcurrentRequests(0)
                .setFlushInterval(TimeValue.timeValueSeconds(30L))
                .build();
        return bulkProcessor;

    }
}

 

_getBulkProcessor

이 메소드는 Elasticsearch에 대량의 데이터를 처리하기 위한 BulkProcessor를 생성하는 메서드이다.

BulkProcessor는 대량의 색인 요청을 처리하는 동안 최적의 성능을 제공한다.

 

 

BulkProcessor.builder(): BulkProcessor를 생성하기 위한 빌더를 시작한다. 여기에는 클라이언트와 리스너를 설정한다.

 

 

_bulkDocumentWithBulkProcessor

이 메서드는 BulkProcessor를 사용하여 대량의 데이터를 Elasticsearch에 색인한다

_getBulkProcessor 메서드를 호출하여 BulkProcessor 객체를 가져온다. 이 때 bulkActions를 함께 전달하여 BulkProcessor를 설정한다.

입력으로 받은 bulkList를 순회하면서 각 데이터의 작업 유형이 "CREATE"인 경우, IndexRequest를 생성하여 BulkProcessor에 추가한다.

 

ElasticSearchSample1.java

 public String insertElasticSearchIndex() {

        String indexName = "items";
        ArrayList<BulkData> bulkList = new ArrayList<>();
        BulkData bulkData = new BulkData();

        bulkData.setIndexName(indexName);
        bulkData.setId("item3");
        bulkData.setActionType(BulkData.Type.CREATE);

        HashMap<String, Object> paramMap = new HashMap<>();
        paramMap.put("id","item3");
        paramMap.put("name", "itemName3");
        paramMap.put("brand", "itemBrand");

        bulkData.setJsonMap(paramMap);
        bulkList.add(bulkData);

        IndexApi indexApi = new IndexApi(client);

        DocumentApi documentApi = new DocumentApi(client);
        documentApi.bulkDocumentWithBulkProcessor(bulkList, 10000);

        return "elasticsearch/test";
    }

 

items 인덱스에 id값이 item3 이고 필드가 id , name , brand 를 갖고 있는 데이터를 INSERT 한다.

 

결과

 

키바나에서 조회시 id값이 item3인 데이터를 볼 수 있다.

'Elasticsearch' 카테고리의 다른 글

Elasticsearch + spirng boot 연동(SELECT)  (0) 2024.03.31
Elasticsearch + spirng boot 연동(DELETE)  (0) 2024.03.31
Elasticsearch + spirng boot 연동(SELECT)  (0) 2024.03.26
Elasticsearch Query DSL  (0) 2024.03.22
Elasticsearch Query DSL  (0) 2024.03.19