Skip to content

Commit

Permalink
Fix Ingestion API SO error (#1294)
Browse files Browse the repository at this point in the history
  • Loading branch information
damirabdul authored Mar 22, 2023
1 parent 977977d commit fb7b96d
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -58,15 +59,14 @@ public Flux<MetadataFieldPojo> listByKey(final Collection<MetadataKey> keys) {
if (keys.isEmpty()) {
return Flux.just();
}

final Condition condition = keys.stream()
.map(t -> METADATA_FIELD.NAME.eq(t.fieldName()).and(METADATA_FIELD.TYPE.eq(t.fieldType().toString())))
.reduce(Condition::or)
.orElseThrow();

return jooqReactiveOperations
.flux(DSL.selectFrom(METADATA_FIELD).where(addSoftDeleteFilter(condition)))
.map(this::recordToPojo);
return jooqReactiveOperations.executeInPartitionReturning(new ArrayList<>(keys), chunk -> {
final Condition condition = chunk.stream()
.map(t -> METADATA_FIELD.NAME.eq(t.fieldName()).and(METADATA_FIELD.TYPE.eq(t.fieldType().toString())))
.reduce(Condition::or)
.orElseThrow();
return jooqReactiveOperations
.flux(DSL.selectFrom(METADATA_FIELD).where(addSoftDeleteFilter(condition)));
}).map(this::recordToPojo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private Mono<Void> ingestMetadataForFields(final IngestionRequest request,
final List<MetadataInfo> metadataInfos = retrieveMetadataInfoFromDatasetFields(request, fields);
final List<MetadataKey> metadataKeys = metadataInfos.stream()
.map(MetadataInfo::key)
.distinct()
.toList();
final List<Long> datasetFieldIds = fields.values().stream()
.map(DatasetFieldPojo::getId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public boolean shouldProcess(final IngestionRequest request) {
return CollectionUtils.isNotEmpty(extractHollowCandidates(request));
}

@Override
public IngestionProcessingPhase getPhase() {
return IngestionProcessingPhase.INITIAL;
}

private Set<String> extractHollowCandidates(final IngestionRequest request) {
final Set<String> existingEntitiesOddrns = request.getAllEntities()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

@RequiredArgsConstructor
public enum IngestionProcessingPhase {
MAIN(1),
FINALIZING(2);
INITIAL(1),
MAIN(2),
FINALIZING(3);

@Getter
private final int order;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public Mono<Void> process(final IngestionRequest request) {
final List<MetadataInfo> metadataInfos = retrieveMetadataInfoFromDataStructure(request);
final List<MetadataKey> metadataKeys = metadataInfos.stream()
.map(MetadataInfo::key)
.distinct()
.toList();

final var existingMono = metadataFieldValueRepository.listByDataEntityIds(request.getAllIds())
Expand Down

0 comments on commit fb7b96d

Please sign in to comment.