Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/open-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ jobs:
- name: Install
working-directory: ./open-api
run: pip install openapi-spec-validator==0.5.2
- name: Validate
- name: Validate REST catalog spec
working-directory: ./open-api
run: openapi-spec-validator rest-catalog-open-api.yaml
- name: Validate S3 REST Signer spec
working-directory: ./aws/src/main/resources
run: openapi-spec-validator s3-signer-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public S3Client s3() {
.applyMutation(awsProperties::applyHttpClientConfigurations)
.applyMutation(awsProperties::applyS3EndpointConfigurations)
.applyMutation(awsProperties::applyS3ServiceConfigurations)
.applyMutation(awsProperties::applyS3SignerConfiguration)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public S3Client s3() {
.applyMutation(awsProperties::applyS3EndpointConfigurations)
.applyMutation(awsProperties::applyS3ServiceConfigurations)
.applyMutation(awsProperties::applyS3CredentialConfigurations)
.applyMutation(awsProperties::applyS3SignerConfiguration)
.build();
}

Expand Down
84 changes: 84 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,27 @@
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
Expand All @@ -53,6 +61,8 @@

public class AwsProperties implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(AwsProperties.class);

/**
* Type of S3 Server side encryption used, default to {@link
* AwsProperties#S3FILEIO_SSE_TYPE_NONE}.
Expand Down Expand Up @@ -261,6 +271,8 @@ public class AwsProperties implements Serializable {

public static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false;

public static final String S3_SIGNER_IMPL = "s3.signer-impl";

/** Configure the batch size used when deleting multiple files from a given S3 bucket */
public static final String S3FILEIO_DELETE_BATCH_SIZE = "s3.delete.batch-size";

Expand Down Expand Up @@ -676,6 +688,9 @@ public class AwsProperties implements Serializable {
private String dynamoDbTableName;
private String dynamoDbEndpoint;

private final String s3SignerImpl;
private final Map<String, String> allProperties;

public AwsProperties() {
this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
this.httpClientUrlConnectionConnectionTimeoutMs = null;
Expand Down Expand Up @@ -732,11 +747,16 @@ public AwsProperties() {

this.dynamoDbEndpoint = null;
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;

this.s3SignerImpl = null;
this.allProperties = Maps.newHashMap();

ValidationException.check(
s3KeyIdAccessKeyBothConfigured(),
"S3 client access key ID and secret access key must be set at the same time");
}

@SuppressWarnings("MethodLength")
public AwsProperties(Map<String, String> properties) {
this.httpClientType =
PropertyUtil.propertyAsString(properties, HTTP_CLIENT_TYPE, HTTP_CLIENT_TYPE_DEFAULT);
Expand Down Expand Up @@ -883,6 +903,10 @@ public AwsProperties(Map<String, String> properties) {
this.dynamoDbEndpoint = properties.get(DYNAMODB_ENDPOINT);
this.dynamoDbTableName =
PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT);

this.s3SignerImpl = properties.get(S3_SIGNER_IMPL);
this.allProperties = SerializableMap.copyOf(properties);

ValidationException.check(
s3KeyIdAccessKeyBothConfigured(),
"S3 client access key ID and secret access key must be set at the same time");
Expand Down Expand Up @@ -1119,6 +1143,66 @@ public <T extends S3ClientBuilder> void applyS3ServiceConfigurations(T builder)
.build());
}

/**
* Configure a signer for an S3 client.
*
* <p>Sample usage:
*
* <pre>
* S3Client.builder().applyMutation(awsProperties::applyS3SignerConfiguration)
* </pre>
*/
public <T extends S3ClientBuilder> void applyS3SignerConfiguration(T builder) {
if (null != s3SignerImpl) {
builder.overrideConfiguration(
c -> c.putAdvancedOption(SdkAdvancedClientOption.SIGNER, loadS3SignerDynamically()));
}
}

private Signer loadS3SignerDynamically() {
// load the signer implementation dynamically
Object signer = null;
try {
signer =
DynMethods.builder("create")
.impl(s3SignerImpl, Map.class)
.buildStaticChecked()
.invoke(allProperties);
} catch (NoSuchMethodException e) {
Comment thread
nastra marked this conversation as resolved.
LOG.warn(
"Cannot find static method create(Map<String, String> properties) for signer {}",
s3SignerImpl,
e);
}

if (null == signer) {
try {
signer = DynMethods.builder("create").impl(s3SignerImpl).buildChecked().invoke(null);
} catch (NoSuchMethodException e) {
LOG.warn("Cannot find static method create() for signer {}", s3SignerImpl, e);
}
}

if (null == signer) {
// try via default no-arg constructor
try {
signer = DynConstructors.builder().impl(s3SignerImpl).buildChecked().newInstance();
} catch (NoSuchMethodException e) {
LOG.warn("Cannot find no-arg constructor for signer {}", s3SignerImpl, e);
}
}

Preconditions.checkArgument(
null != signer, "Cannot instantiate custom signer: %s", s3SignerImpl);

Preconditions.checkArgument(
signer instanceof Signer,
"Custom signer %s must be an instance of %s",
s3SignerImpl,
Signer.class.getName());
return (Signer) signer;
}

/**
* Configure the httpClient for a client according to the HttpClientType. The two supported
* HttpClientTypes are urlconnection and apache
Expand Down
116 changes: 116 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3ObjectMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3.signer;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.IOException;
import org.apache.iceberg.rest.RESTSerializers.ErrorResponseDeserializer;
import org.apache.iceberg.rest.RESTSerializers.ErrorResponseSerializer;
import org.apache.iceberg.rest.RESTSerializers.OAuthTokenResponseDeserializer;
import org.apache.iceberg.rest.RESTSerializers.OAuthTokenResponseSerializer;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;

public class S3ObjectMapper {

private static final JsonFactory FACTORY = new JsonFactory();
private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
private static volatile boolean isInitialized = false;

private S3ObjectMapper() {}

static ObjectMapper mapper() {
if (!isInitialized) {
synchronized (S3ObjectMapper.class) {
if (!isInitialized) {
MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.KebabCaseStrategy.INSTANCE);
MAPPER.registerModule(initModule());
isInitialized = true;
}
}
}

return MAPPER;
}

private static SimpleModule initModule() {
return new SimpleModule()
.addSerializer(ErrorResponse.class, new ErrorResponseSerializer())
.addDeserializer(ErrorResponse.class, new ErrorResponseDeserializer())
.addSerializer(OAuthTokenResponse.class, new OAuthTokenResponseSerializer())
.addDeserializer(OAuthTokenResponse.class, new OAuthTokenResponseDeserializer())
.addSerializer(S3SignRequest.class, new S3SignRequestSerializer<>())
.addSerializer(ImmutableS3SignRequest.class, new S3SignRequestSerializer<>())
.addDeserializer(S3SignRequest.class, new S3SignRequestDeserializer<>())
.addDeserializer(ImmutableS3SignRequest.class, new S3SignRequestDeserializer<>())
.addSerializer(S3SignResponse.class, new S3SignResponseSerializer<>())
.addSerializer(ImmutableS3SignResponse.class, new S3SignResponseSerializer<>())
.addDeserializer(S3SignResponse.class, new S3SignResponseDeserializer<>())
.addDeserializer(ImmutableS3SignResponse.class, new S3SignResponseDeserializer<>());
}

public static class S3SignRequestSerializer<T extends S3SignRequest> extends JsonSerializer<T> {
@Override
public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
S3SignRequestParser.toJson(request, gen);
}
}

public static class S3SignRequestDeserializer<T extends S3SignRequest>
extends JsonDeserializer<T> {
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return (T) S3SignRequestParser.fromJson(jsonNode);
}
}

public static class S3SignResponseSerializer<T extends S3SignResponse> extends JsonSerializer<T> {
@Override
public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
S3SignResponseParser.toJson(request, gen);
}
}

public static class S3SignResponseDeserializer<T extends S3SignResponse>
extends JsonDeserializer<T> {
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return (T) S3SignResponseParser.fromJson(jsonNode);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3.signer;

import java.net.URI;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.rest.RESTRequest;
import org.immutables.value.Value;

@Value.Immutable
public interface S3SignRequest extends RESTRequest {
String region();

String method();

URI uri();

Map<String, List<String>> headers();

Map<String, String> properties();

@Override
default void validate() {}
}
Loading