Skip to content

Commit

Permalink
Fix timpe-partion datetime validation
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Sep 11, 2024
1 parent f157898 commit 05fae0b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -526,23 +526,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector

boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
boolean isDate = logicalType == LogicalType.DATE;
boolean isTimestampOrDate = isTimestamp || isDate;

// If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
collector.addFailure(
String.format("Partition column '%s' is of invalid type '%s'.",
boolean isDateTime = logicalType == LogicalType.DATETIME;
boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime;
boolean isTimestampOrDateTime = isTimestamp || isDateTime;
// TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) {
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
columnName, fieldSchema.getDisplayName()),
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);

// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
} else if (!isTimestampOrDate) {
collector.addFailure(
String.format("Partition column '%s' is of invalid type '%s'.",
"Partition column must be of type TIMESTAMP or DATETIME")
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
// TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME
} else if (!isTimestampOrDateOrDateTime) {
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
columnName, fieldSchema.getDisplayName()),
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
"Partition column must be of type TIMESTAMP, DATE or DATETIME")
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,36 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws
Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws
InvocationTargetException, IllegalAccessException {

String columnName = "partitionFrom";
Schema schema = Schema.of(Schema.LogicalType.DATETIME);

Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.MONTH;

validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
// No error as null time timePartitioningType will default to DAY
Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testValidateTimePartitioningColumnWithHourAndDateTime() throws
InvocationTargetException, IllegalAccessException {

String columnName = "partitionFrom";
Schema schema = Schema.of(Schema.LogicalType.DATETIME);

Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR;

validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
// No error as null time timePartitioningType will default to DAY
Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testValidateColumnNameWithValidColumnName() {
String columnName = "test";
Expand Down

0 comments on commit 05fae0b

Please sign in to comment.