Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static class TimeMicrosWriter implements ValueWriter<Integer> {

@Override
public void write(Integer timeMills, Encoder encoder) throws IOException {
encoder.writeLong(timeMills * 1000);
encoder.writeLong(timeMills * 1000L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static class TimeMicrosWriter implements ValueWriter<Integer> {

@Override
public void write(Integer timeMills, Encoder encoder) throws IOException {
encoder.writeLong(timeMills * 1000);
encoder.writeLong(timeMills * 1000L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static class TimeMicrosWriter implements ValueWriter<Integer> {

@Override
public void write(Integer timeMills, Encoder encoder) throws IOException {
encoder.writeLong(timeMills * 1000);
encoder.writeLong(timeMills * 1000L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.table.data.RowData;
Expand All @@ -29,6 +32,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
Expand All @@ -37,17 +41,37 @@
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.Assert;
import org.junit.Test;

public class TestFlinkAvroReaderWriter extends DataTest {

private static final int NUM_RECORDS = 100;

private static final Schema SCHEMA_NUM_TYPE = new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "int", Types.IntegerType.get()),
Types.NestedField.optional(3, "float", Types.FloatType.get()),
Types.NestedField.optional(4, "double", Types.DoubleType.get()),
Types.NestedField.optional(5, "date", Types.DateType.get()),
Types.NestedField.optional(6, "time", Types.TimeType.get()),
Types.NestedField.optional(7, "timestamp", Types.TimestampType.withoutZone()),
Types.NestedField.optional(8, "bigint", Types.LongType.get()),
Types.NestedField.optional(9, "decimal", Types.DecimalType.of(4, 2))
);

@Override
protected void writeAndValidate(Schema schema) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
writeAndValidate(schema, expectedRecords, NUM_RECORDS);
}

private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

File recordsFile = temp.newFile();
Expand All @@ -67,7 +91,7 @@ protected void writeAndValidate(Schema schema) throws IOException {
.build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
for (int i = 0; i < NUM_RECORDS; i++) {
for (int i = 0; i < numRecord; i++) {
Assert.assertTrue("Should have expected number of records", rows.hasNext());
TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
}
Expand All @@ -91,11 +115,39 @@ protected void writeAndValidate(Schema schema) throws IOException {
.build()) {
Iterator<RowData> expected = expectedRows.iterator();
Iterator<Record> records = reader.iterator();
for (int i = 0; i < NUM_RECORDS; i += 1) {
for (int i = 0; i < numRecord; i += 1) {
Assert.assertTrue("Should have expected number of records", records.hasNext());
TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
}
Assert.assertFalse("Should not have extra records", records.hasNext());
}
}

private Record recordNumType(
int id, int intV, float floatV, double doubleV, long date, long time, long timestamp,
long bigint, double decimal) {
Record record = GenericRecord.create(SCHEMA_NUM_TYPE);
record.setField("id", id);
record.setField("int", intV);
record.setField("float", floatV);
record.setField("double", doubleV);
record.setField("date", DateTimeUtil.dateFromDays((int) new Date(date).toLocalDate().toEpochDay()));
record.setField("time", new Time(time).toLocalTime());
record.setField("timestamp", DateTimeUtil.timestampFromMicros(timestamp * 1000));
record.setField("bigint", bigint);
record.setField("decimal", BigDecimal.valueOf(decimal));
return record;
}

@Test
public void testNumericTypes() throws IOException {

List<Record> expected = ImmutableList.of(
recordNumType(2, Integer.MAX_VALUE, Float.MAX_VALUE, Double.MAX_VALUE, Long.MAX_VALUE,
1643811742000L, 1643811742000L, 1643811742000L, 10.24d),
recordNumType(2, Integer.MIN_VALUE, Float.MIN_VALUE, Double.MIN_VALUE, Long.MIN_VALUE,
1643811742000L, 1643811742000L, 1643811742000L, 10.24d));

writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
}
}