Elasticsearch + spirng boot 연동(INSERT)
저번에 인덱스 해당 id로 데이터를 조회까지 해봤다.
오늘은 item 인덱스에 데이터를 INSERT 해보려고 한다.
elasticsearch 에 데이터를 INSERT 하기 위해 설정 파일들을 만들었다.
데이터를 여러개에 List에 담아서 보내기 위해 Bulk를 활용한다.
IndexApi.java
@Slf4j
public class IndexApi {
private RestHighLevelClient client;
public IndexApi(RestHighLevelClient client) {
this.client = client;
}
public boolean createIndex(String indexName) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
return _createIndex(request, indexName);
}
public boolean _createIndex(CreateIndexRequest request, String indexName) {
boolean indexFlag = false;
CreateIndexResponse createIndexResponse;
try{
createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
indexFlag = createIndexResponse.isAcknowledged();
}catch (ElasticsearchException | IOException e){
e.getMessage();
}
if(indexFlag){
log.info(indexName + "인덱스 생성 성공");
}else {
log.info(indexName + "인덱스 생성 실패");
}
return indexFlag;
}
}
elasticsearch RestHighLevelClient 를 사용하여 새로운 인덱스를 생성하는 클래스이다.
client.indices().create(request, RequestOptions.DEFAULT)를 사용하여 Elasticsearch 클라이언트를 통해 새 인덱스를 만든다.
RequestOptions.DEFAULT는 기본 요청 옵션을 사용하도록 지정한다.
createIndexResponse.isAcknowledged()를 통해 생성된 인덱스가 Elasticsearch에 성공적으로 인식되었는지 확인한다.
DocumentApi.java
@Slf4j
public class DocumentApi {
private RestHighLevelClient client;
public DocumentApi(RestHighLevelClient client){
this.client = client;
}
public Boolean bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
return _bulkDocumentWithBulkProcessor(bulkList, bulkActions);
}
private Boolean _bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
boolean terminated = false ;
BulkProcessor bulkProcessor = _getBulkProcessor(bulkActions);
for(BulkData data : bulkList) {
if(data.getActionType().equals(BulkData.Type.CREATE)){
bulkProcessor.add(new IndexRequest(data.getIndexName()).id(data.getId()).source(data.getJsonMap()));
}
}
try {
terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
bulkProcessor.close();
}catch (InterruptedException e){
log.error(e.getMessage());
}
return terminated;
}
private BulkProcessor _getBulkProcessor(int bulkActions) {
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
int count = 0;
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
count = count + bulkRequest.numberOfActions();
log.info("Uploaded " + count + " so far");
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
log.info("Error " + failure.toString());
}
}
}
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
log.info("Errors " + throwable.toString());
}
}, "bulk-processor")
.setBulkActions(bulkActions).setConcurrentRequests(0)
.setFlushInterval(TimeValue.timeValueSeconds(30L))
.build();
return bulkProcessor;
}
}
_getBulkProcessor
이 메소드는 Elasticsearch에 대량의 데이터를 처리하기 위한 BulkProcessor를 생성하는 메서드이다.
BulkProcessor는 대량의 색인 요청을 처리하는 동안 최적의 성능을 제공한다.
BulkProcessor.builder(): BulkProcessor를 생성하기 위한 빌더를 시작한다. 여기에는 클라이언트와 리스너를 설정한다.
_bulkDocumentWithBulkProcessor
이 메서드는 BulkProcessor를 사용하여 대량의 데이터를 Elasticsearch에 색인한다
_getBulkProcessor 메서드를 호출하여 BulkProcessor 객체를 가져온다. 이 때 bulkActions를 함께 전달하여 BulkProcessor를 설정한다.
입력으로 받은 bulkList를 순회하면서 각 데이터의 작업 유형이 "CREATE"인 경우, IndexRequest를 생성하여 BulkProcessor에 추가한다.
ElasticSearchSample1.java
public String insertElasticSearchIndex() {
String indexName = "items";
ArrayList<BulkData> bulkList = new ArrayList<>();
BulkData bulkData = new BulkData();
bulkData.setIndexName(indexName);
bulkData.setId("item3");
bulkData.setActionType(BulkData.Type.CREATE);
HashMap<String, Object> paramMap = new HashMap<>();
paramMap.put("id","item3");
paramMap.put("name", "itemName3");
paramMap.put("brand", "itemBrand");
bulkData.setJsonMap(paramMap);
bulkList.add(bulkData);
IndexApi indexApi = new IndexApi(client);
DocumentApi documentApi = new DocumentApi(client);
documentApi.bulkDocumentWithBulkProcessor(bulkList, 10000);
return "elasticsearch/test";
}
items 인덱스에 id값이 item3 이고 필드가 id , name , brand 를 갖고 있는 데이터를 INSERT 한다.
결과
키바나에서 조회시 id값이 item3인 데이터를 볼 수 있다.