Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/OverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;

/**
* API for overwriting files in a table.
Expand Down Expand Up @@ -75,6 +77,19 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
*/
OverwriteFiles deleteFile(DataFile file);

/**
* Deletes a set of data files from the table with their respective delete files.
*
* @param dataFilesToDelete the data files to be deleted from the table
* @param deleteFilesToDelete the delete files corresponding to the data files to be deleted from
* the table
* @return this for method chaining
*/
default OverwriteFiles deleteFiles(
Comment thread
stevenzwu marked this conversation as resolved.
DataFileSet dataFilesToDelete, DeleteFileSet deleteFilesToDelete) {
throw new UnsupportedOperationException("Deleting data and delete files is not supported");
}

/**
* Signal that each file added to the table must match the overwrite expression.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ default int failedDataFilesCount() {
}

default int removedDeleteFilesCount() {
return 0;
return rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::removedDeleteFilesCount)
.sum();
}
}

Expand All @@ -248,6 +250,10 @@ interface FileGroupRewriteResult {
default long rewrittenBytesCount() {
return 0L;
}

default int removedDeleteFilesCount() {
return 0;
}
}

/** For a file group that failed to rewrite. */
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
Expand Down Expand Up @@ -70,6 +71,20 @@ public OverwriteFiles addFile(DataFile file) {
return this;
}

@Override
public OverwriteFiles deleteFiles(
DataFileSet dataFilesToDelete, DeleteFileSet deleteFilesToDelete) {
for (DataFile dataFile : dataFilesToDelete) {
deleteFile(dataFile);
}

for (DeleteFile deleteFile : deleteFilesToDelete) {
delete(deleteFile);
}

return this;
}

@Override
public OverwriteFiles deleteFile(DataFile file) {
deletedDataFiles.add(file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ interface FileGroupRewriteResult extends RewriteDataFiles.FileGroupRewriteResult
default long rewrittenBytesCount() {
return RewriteDataFiles.FileGroupRewriteResult.super.rewrittenBytesCount();
}

@Override
@Value.Default
default int removedDeleteFilesCount() {
return RewriteDataFiles.FileGroupRewriteResult.super.removedDeleteFilesCount();
}
}

@Value.Immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,19 +75,23 @@ public RewriteDataFilesCommitManager(
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
DataFileSet rewrittenDataFiles = DataFileSet.create();
DataFileSet addedDataFiles = DataFileSet.create();
DeleteFileSet danglingDVs = DeleteFileSet.create();
for (RewriteFileGroup group : fileGroups) {
rewrittenDataFiles.addAll(group.rewrittenFiles());
addedDataFiles.addAll(group.addedFiles());
danglingDVs.addAll(group.danglingDVs());
}

RewriteFiles rewrite = table.newRewrite().validateFromSnapshot(startingSnapshotId);
if (useStartingSequenceNumber) {
long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles, sequenceNumber);
} else {
rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
rewrite.dataSequenceNumber(sequenceNumber);
}

rewrittenDataFiles.forEach(rewrite::deleteFile);
addedDataFiles.forEach(rewrite::addFile);
danglingDVs.forEach(rewrite::deleteFile);

snapshotProperties.forEach(rewrite::set);

rewrite.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;

/**
* Container class representing a set of files to be rewritten by a RewriteAction and the new files
Expand Down Expand Up @@ -59,6 +62,12 @@ public Set<DataFile> rewrittenFiles() {
.collect(Collectors.toCollection(DataFileSet::create));
}

public Set<DeleteFile> danglingDVs() {
return fileScanTasks().stream()
.flatMap(task -> task.deletes().stream().filter(ContentFileUtil::isDV))
.collect(Collectors.toCollection(DeleteFileSet::create));
}

public Set<DataFile> addedFiles() {
return addedFiles;
}
Expand All @@ -70,6 +79,7 @@ public RewriteDataFiles.FileGroupRewriteResult asResult() {
.addedDataFilesCount(addedFiles.size())
.rewrittenDataFilesCount(fileScanTasks().size())
.rewrittenBytesCount(inputFilesSizeInBytes())
.removedDeleteFilesCount(danglingDVs().size())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,17 @@ public RewriteDataFiles.Result execute() {
partialProgressEnabled
? doExecuteWithPartialProgress(plan, commitManager(startingSnapshotId))
: doExecute(plan, commitManager(startingSnapshotId));
ImmutableRewriteDataFiles.Result result = resultBuilder.build();

if (removeDanglingDeletes) {
RemoveDanglingDeletesSparkAction action =
new RemoveDanglingDeletesSparkAction(spark(), table);
int removedCount = Iterables.size(action.execute().removedDeleteFiles());
resultBuilder.removedDeleteFilesCount(removedCount);
int removedDeleteFiles = Iterables.size(action.execute().removedDeleteFiles());
return result.withRemovedDeleteFilesCount(
result.removedDeleteFilesCount() + removedDeleteFiles);
}
return resultBuilder.build();

return result;
}

private void init(long startingSnapshotId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -230,14 +231,14 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {

private void abort(WriterCommitMessage[] messages) {
if (cleanupOnAbort) {
SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
SparkCleanupUtil.deleteFiles("job abort", table.io(), Lists.newArrayList(files(messages)));
} else {
LOG.warn("Skipping cleanup of written files");
}
}

private List<DataFile> files(WriterCommitMessage[] messages) {
List<DataFile> files = Lists.newArrayList();
private DataFileSet files(WriterCommitMessage[] messages) {
DataFileSet files = DataFileSet.create();

for (WriterCommitMessage message : messages) {
if (message != null) {
Expand Down Expand Up @@ -294,7 +295,7 @@ public void commit(WriterCommitMessage[] messages) {
private class DynamicOverwrite extends BaseBatchWrite {
@Override
public void commit(WriterCommitMessage[] messages) {
List<DataFile> files = files(messages);
DataFileSet files = files(messages);

if (files.isEmpty()) {
LOG.info("Dynamic overwrite is empty, skipping commit");
Expand All @@ -312,7 +313,6 @@ public void commit(WriterCommitMessage[] messages) {
if (isolationLevel == SERIALIZABLE) {
dynamicOverwrite.validateNoConflictingData();
dynamicOverwrite.validateNoConflictingDeletes();

} else if (isolationLevel == SNAPSHOT) {
dynamicOverwrite.validateNoConflictingDeletes();
}
Expand Down Expand Up @@ -357,7 +357,6 @@ public void commit(WriterCommitMessage[] messages) {
if (isolationLevel == SERIALIZABLE) {
overwriteFiles.validateNoConflictingDeletes();
overwriteFiles.validateNoConflictingData();

} else if (isolationLevel == SNAPSHOT) {
overwriteFiles.validateNoConflictingDeletes();
}
Expand All @@ -377,11 +376,23 @@ private CopyOnWriteOperation(SparkCopyOnWriteScan scan, IsolationLevel isolation
this.isolationLevel = isolationLevel;
}

private List<DataFile> overwrittenFiles() {
private DataFileSet overwrittenFiles() {
if (scan == null) {
return DataFileSet.create();
} else {
return scan.tasks().stream()
.map(FileScanTask::file)
.collect(Collectors.toCollection(DataFileSet::create));
}
}

private DeleteFileSet danglingDVs() {
if (scan == null) {
return ImmutableList.of();
return DeleteFileSet.create();
} else {
return scan.tasks().stream().map(FileScanTask::file).collect(Collectors.toList());
return scan.tasks().stream()
.flatMap(task -> task.deletes().stream().filter(ContentFileUtil::isDV))
.collect(Collectors.toCollection(DeleteFileSet::create));
}
}

Expand All @@ -402,11 +413,10 @@ private Expression conflictDetectionFilter() {
public void commit(WriterCommitMessage[] messages) {
OverwriteFiles overwriteFiles = table.newOverwrite();

List<DataFile> overwrittenFiles = overwrittenFiles();
DataFileSet overwrittenFiles = overwrittenFiles();
int numOverwrittenFiles = overwrittenFiles.size();
for (DataFile overwrittenFile : overwrittenFiles) {
overwriteFiles.deleteFile(overwrittenFile);
}
DeleteFileSet danglingDVs = danglingDVs();
overwriteFiles.deleteFiles(overwrittenFiles, danglingDVs);

int numAddedFiles = 0;
for (DataFile file : files(messages)) {
Expand Down Expand Up @@ -480,7 +490,7 @@ private RewriteFiles(String fileSetID) {
@Override
public void commit(WriterCommitMessage[] messages) {
FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
coordinator.stageRewrite(table, fileSetID, DataFileSet.of(files(messages)));
coordinator.stageRewrite(table, fileSetID, files(messages));
}
}

Expand Down
Loading