-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Timeout reading delta tables after incremental updates with null values #24920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
7a30fbd
to
5efc8d1
Compare
5efc8d1
to
c25b334
Compare
Optional<Long> snapshotVersion = Optional.of(snapshot.getVersion(deltaEngine.get())); | ||
List<DeltaColumn> schema = getSchema(config, schemaTableName, deltaEngine.get(), snapshot); | ||
return Optional.of(new DeltaTable( | ||
schemaTableName.getSchemaName(), | ||
schemaTableName.getTableName(), | ||
tableLocation, | ||
Optional.of(snapshot.getVersion(deltaEngine.get())), // lock the snapshot version | ||
getSchema(config, schemaTableName, deltaEngine.get(), snapshot))); | ||
snapshotVersion, // lock the snapshot version | ||
schema)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This diff doesn't seem to materially change anything. Can we revert it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted
@@ -127,44 +127,62 @@ private static class AllFilesIterator | |||
implements CloseableIterator<Row> | |||
{ | |||
private final CloseableIterator<FilteredColumnarBatch> inputIterator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic in all of these iterators seems quite complex. I know you're just updating the logic, but I think we could vastly simplify them. Would something like this suffice? Maybe I'm missing something, but it doesn't seem like the iterator is doing all that much.
private static class AllFilesIterator
implements CloseableIterator<Row>
{
private final CloseableIterator<FilteredColumnarBatch> inputIterator;
private Iterator<Row> rows;
private CloseableIterator<Row> prev;
public AllFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
{
this.inputIterator = inputIterator;
this.rows = Streams.stream(inputIterator)
.flatMap(batch -> {
if (prev != null) {
try {
prev.close();
}
catch (IOException e) {
throw new RuntimeException("Failed to close previous rowBatch");
}
}
prev = batch.getRows();
return Streams.stream(prev);
})
.iterator();
}
@Override
public boolean hasNext()
{
return rows.hasNext();
}
@Override
public Row next()
{
return rows.next();
}
@Override
public void close()
throws IOException
{
inputIterator.close();
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flatten the batches is a clever solution. It makes it more legible and easier to understand. I've adopted your approach. Thanks!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this! I took a closer look and think we also need similar logic in the map
in order to close the previous item. So maybe edit the change to be like:
private final CloseableIterator<FilteredColumnarBatch> inputIterator;
private Iterator<Row> rows;
private @Nullable CloseableIterator<Row> prevBatch;
private @Nullable Row prevRow;
public AllFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
{
this.inputIterator = inputIterator;
this.rows = Streams.stream(inputIterator)
.flatMap(batch -> {
if (prevBatch != null) {
try {
prevBatch.close();
}
catch (IOException e) {
throw new RuntimeException("Failed to close previous row batch");
}
}
prevBatch = batch.getRows();
return Streams.stream(prev);
})
.map(row -> {
if (prevRow != null) {
try {
prevRow.close();
}
catch (IOException e) {
throw new RuntimeException("Failed to close previous row");
}
}
prevRow = row;
return row;
})
.iterator();
}
Also, it seems similar logic is copied. It would be great to make this generic so that we don't need to copy/paste this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interface Row
does not implement closeable. However prev
has to be closed in the close()
method of the AllFilesIterator
class, as the last batch is not closed by the map function.
…es (prestodb#24919) Simplified logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to generate these tables during the test rather than commit the files? I know we might have some tables/tests which use pre-baked files already, but I am not a huge fan of committing data like this, even if it is small. Not a hard requirement, but it would be nice to make an effort to do that if possible.
Presto does not have support for incremental updates in Presto, so the data cannot be created. Those files were created with another tool, so I don't see it viable. |
…es (prestodb#24919) Refactor logic to avoid duplicated code Ensure to close last row batch in the close() method
Added refactor to the code to avoid duplicated code and ensure row batch closing |
Description
Changed the batch management to take into account the case the batch returns empty row iterators.
In testing:
Motivation and Context
Fixes issue #24919
Impact
Allows the conector to query delta tables after huge incremental updates with null values
Test Plan
Added new test suite that tests the changes querying a table with incremental updates of null values in non partitioned and partitioned columns.
Contributor checklist
Release Notes