diff --git a/.github/workflows/open-api.yml b/.github/workflows/open-api.yml index 0a8b828c8de8..04f3e7a907af 100644 --- a/.github/workflows/open-api.yml +++ b/.github/workflows/open-api.yml @@ -46,6 +46,9 @@ jobs: - name: Install working-directory: ./open-api run: pip install openapi-spec-validator==0.5.2 - - name: Validate + - name: Validate REST catalog spec working-directory: ./open-api run: openapi-spec-validator rest-catalog-open-api.yaml + - name: Validate S3 REST Signer spec + working-directory: ./aws/src/main/resources + run: openapi-spec-validator s3-signer-open-api.yaml diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 8c1b7f23ad1f..c7e93879921a 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -43,6 +43,7 @@ public S3Client s3() { .applyMutation(awsProperties::applyHttpClientConfigurations) .applyMutation(awsProperties::applyS3EndpointConfigurations) .applyMutation(awsProperties::applyS3ServiceConfigurations) + .applyMutation(awsProperties::applyS3SignerConfiguration) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index 39b8ea4e46e8..178025342838 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -102,6 +102,7 @@ public S3Client s3() { .applyMutation(awsProperties::applyS3EndpointConfigurations) .applyMutation(awsProperties::applyS3ServiceConfigurations) .applyMutation(awsProperties::applyS3CredentialConfigurations) + .applyMutation(awsProperties::applyS3SignerConfiguration) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index 7696cb960527..7cf46d7c3f3b 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -29,12 +29,18 @@ import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory; import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SerializableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; @@ -42,6 +48,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.signer.Signer; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder; @@ -53,6 +61,8 @@ public class AwsProperties implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(AwsProperties.class); + /** * Type of S3 Server side encryption used, default to {@link * AwsProperties#S3FILEIO_SSE_TYPE_NONE}. @@ -261,6 +271,8 @@ public class AwsProperties implements Serializable { public static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false; + public static final String S3_SIGNER_IMPL = "s3.signer-impl"; + /** Configure the batch size used when deleting multiple files from a given S3 bucket */ public static final String S3FILEIO_DELETE_BATCH_SIZE = "s3.delete.batch-size"; @@ -676,6 +688,9 @@ public class AwsProperties implements Serializable { private String dynamoDbTableName; private String dynamoDbEndpoint; + private final String s3SignerImpl; + private final Map allProperties; + public AwsProperties() { this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT; this.httpClientUrlConnectionConnectionTimeoutMs = null; @@ -732,11 +747,16 @@ public AwsProperties() { this.dynamoDbEndpoint = null; this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT; + + this.s3SignerImpl = null; + this.allProperties = Maps.newHashMap(); + ValidationException.check( s3KeyIdAccessKeyBothConfigured(), "S3 client access key ID and secret access key must be set at the same time"); } + @SuppressWarnings("MethodLength") public AwsProperties(Map properties) { this.httpClientType = PropertyUtil.propertyAsString(properties, HTTP_CLIENT_TYPE, HTTP_CLIENT_TYPE_DEFAULT); @@ -883,6 +903,10 @@ public AwsProperties(Map properties) { this.dynamoDbEndpoint = properties.get(DYNAMODB_ENDPOINT); this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT); + + this.s3SignerImpl = properties.get(S3_SIGNER_IMPL); + this.allProperties = SerializableMap.copyOf(properties); + ValidationException.check( s3KeyIdAccessKeyBothConfigured(), "S3 client access key ID and secret access key must be set at the same time"); @@ -1119,6 +1143,66 @@ public void applyS3ServiceConfigurations(T builder) .build()); } + /** + * Configure a signer for an S3 client. + * + *

Sample usage: + * + *

+   *     S3Client.builder().applyMutation(awsProperties::applyS3SignerConfiguration)
+   * 
+ */ + public void applyS3SignerConfiguration(T builder) { + if (null != s3SignerImpl) { + builder.overrideConfiguration( + c -> c.putAdvancedOption(SdkAdvancedClientOption.SIGNER, loadS3SignerDynamically())); + } + } + + private Signer loadS3SignerDynamically() { + // load the signer implementation dynamically + Object signer = null; + try { + signer = + DynMethods.builder("create") + .impl(s3SignerImpl, Map.class) + .buildStaticChecked() + .invoke(allProperties); + } catch (NoSuchMethodException e) { + LOG.warn( + "Cannot find static method create(Map properties) for signer {}", + s3SignerImpl, + e); + } + + if (null == signer) { + try { + signer = DynMethods.builder("create").impl(s3SignerImpl).buildChecked().invoke(null); + } catch (NoSuchMethodException e) { + LOG.warn("Cannot find static method create() for signer {}", s3SignerImpl, e); + } + } + + if (null == signer) { + // try via default no-arg constructor + try { + signer = DynConstructors.builder().impl(s3SignerImpl).buildChecked().newInstance(); + } catch (NoSuchMethodException e) { + LOG.warn("Cannot find no-arg constructor for signer {}", s3SignerImpl, e); + } + } + + Preconditions.checkArgument( + null != signer, "Cannot instantiate custom signer: %s", s3SignerImpl); + + Preconditions.checkArgument( + signer instanceof Signer, + "Custom signer %s must be an instance of %s", + s3SignerImpl, + Signer.class.getName()); + return (Signer) signer; + } + /** * Configure the httpClient for a client according to the HttpClientType. The two supported * HttpClientTypes are urlconnection and apache diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3ObjectMapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3ObjectMapper.java new file mode 100644 index 000000000000..2b1ed49899b5 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3ObjectMapper.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.io.IOException; +import org.apache.iceberg.rest.RESTSerializers.ErrorResponseDeserializer; +import org.apache.iceberg.rest.RESTSerializers.ErrorResponseSerializer; +import org.apache.iceberg.rest.RESTSerializers.OAuthTokenResponseDeserializer; +import org.apache.iceberg.rest.RESTSerializers.OAuthTokenResponseSerializer; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; + +public class S3ObjectMapper { + + private static final JsonFactory FACTORY = new JsonFactory(); + private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY); + private static volatile boolean isInitialized = false; + + private S3ObjectMapper() {} + + static ObjectMapper mapper() { + if (!isInitialized) { + synchronized (S3ObjectMapper.class) { + if (!isInitialized) { + MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.KebabCaseStrategy.INSTANCE); + MAPPER.registerModule(initModule()); + isInitialized = true; + } + } + } + + return MAPPER; + } + + private static SimpleModule initModule() { + return new SimpleModule() + .addSerializer(ErrorResponse.class, new ErrorResponseSerializer()) + .addDeserializer(ErrorResponse.class, new ErrorResponseDeserializer()) + .addSerializer(OAuthTokenResponse.class, new OAuthTokenResponseSerializer()) + .addDeserializer(OAuthTokenResponse.class, new OAuthTokenResponseDeserializer()) + .addSerializer(S3SignRequest.class, new S3SignRequestSerializer<>()) + .addSerializer(ImmutableS3SignRequest.class, new S3SignRequestSerializer<>()) + .addDeserializer(S3SignRequest.class, new S3SignRequestDeserializer<>()) + .addDeserializer(ImmutableS3SignRequest.class, new S3SignRequestDeserializer<>()) + .addSerializer(S3SignResponse.class, new S3SignResponseSerializer<>()) + .addSerializer(ImmutableS3SignResponse.class, new S3SignResponseSerializer<>()) + .addDeserializer(S3SignResponse.class, new S3SignResponseDeserializer<>()) + .addDeserializer(ImmutableS3SignResponse.class, new S3SignResponseDeserializer<>()); + } + + public static class S3SignRequestSerializer extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + S3SignRequestParser.toJson(request, gen); + } + } + + public static class S3SignRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) S3SignRequestParser.fromJson(jsonNode); + } + } + + public static class S3SignResponseSerializer extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + S3SignResponseParser.toJson(request, gen); + } + } + + public static class S3SignResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) S3SignResponseParser.fromJson(jsonNode); + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignRequest.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignRequest.java new file mode 100644 index 000000000000..d8323fd868b6 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignRequest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.rest.RESTRequest; +import org.immutables.value.Value; + +@Value.Immutable +public interface S3SignRequest extends RESTRequest { + String region(); + + String method(); + + URI uri(); + + Map> headers(); + + Map properties(); + + @Override + default void validate() {} +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignRequestParser.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignRequestParser.java new file mode 100644 index 000000000000..f5d6f711a072 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignRequestParser.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.JsonUtil; + +public class S3SignRequestParser { + + private static final String REGION = "region"; + private static final String METHOD = "method"; + private static final String URI = "uri"; + private static final String HEADERS = "headers"; + private static final String PROPERTIES = "properties"; + + private S3SignRequestParser() {} + + public static String toJson(S3SignRequest request) { + return toJson(request, false); + } + + public static String toJson(S3SignRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + public static void toJson(S3SignRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid s3 sign request: null"); + + gen.writeStartObject(); + + gen.writeStringField(REGION, request.region()); + gen.writeStringField(METHOD, request.method()); + gen.writeStringField(URI, request.uri().toString()); + headersToJson(HEADERS, request.headers(), gen); + + if (!request.properties().isEmpty()) { + JsonUtil.writeStringMap(PROPERTIES, request.properties(), gen); + } + + gen.writeEndObject(); + } + + public static S3SignRequest fromJson(String json) { + return JsonUtil.parse(json, S3SignRequestParser::fromJson); + } + + public static S3SignRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse s3 sign request from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse s3 sign request from non-object: %s", json); + + String region = JsonUtil.getString(REGION, json); + String method = JsonUtil.getString(METHOD, json); + java.net.URI uri = java.net.URI.create(JsonUtil.getString(URI, json)); + Map> headers = headersFromJson(HEADERS, json); + + ImmutableS3SignRequest.Builder builder = + ImmutableS3SignRequest.builder().region(region).method(method).uri(uri).headers(headers); + + if (json.has(PROPERTIES)) { + builder.properties(JsonUtil.getStringMap(PROPERTIES, json)); + } + + return builder.build(); + } + + static void headersToJson(String property, Map> headers, JsonGenerator gen) + throws IOException { + gen.writeObjectFieldStart(property); + for (Entry> entry : headers.entrySet()) { + gen.writeFieldName(entry.getKey()); + + gen.writeStartArray(); + for (String val : entry.getValue()) { + gen.writeString(val); + } + gen.writeEndArray(); + } + gen.writeEndObject(); + } + + static Map> headersFromJson(String property, JsonNode json) { + Map> headers = Maps.newHashMap(); + JsonNode headersNode = JsonUtil.get(property, json); + headersNode + .fields() + .forEachRemaining( + entry -> { + String key = entry.getKey(); + List values = Arrays.asList(JsonUtil.getStringArray(entry.getValue())); + headers.put(key, values); + }); + return headers; + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignResponse.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignResponse.java new file mode 100644 index 000000000000..40c2059488f8 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignResponse.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.rest.RESTResponse; +import org.immutables.value.Value; + +@Value.Immutable +public interface S3SignResponse extends RESTResponse { + URI uri(); + + Map> headers(); + + @Override + default void validate() {} +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignResponseParser.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignResponseParser.java new file mode 100644 index 000000000000..69d6de8f04ac --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3SignResponseParser.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class S3SignResponseParser { + + private static final String URI = "uri"; + private static final String HEADERS = "headers"; + + private S3SignResponseParser() {} + + public static String toJson(S3SignResponse request) { + return toJson(request, false); + } + + public static String toJson(S3SignResponse request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + public static void toJson(S3SignResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != response, "Invalid s3 sign response: null"); + + gen.writeStartObject(); + + gen.writeStringField(URI, response.uri().toString()); + S3SignRequestParser.headersToJson(HEADERS, response.headers(), gen); + + gen.writeEndObject(); + } + + public static S3SignResponse fromJson(String json) { + return JsonUtil.parse(json, S3SignResponseParser::fromJson); + } + + public static S3SignResponse fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse s3 sign response from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse s3 sign response from non-object: %s", json); + + java.net.URI uri = java.net.URI.create(JsonUtil.getString(URI, json)); + Map> headers = S3SignRequestParser.headersFromJson(HEADERS, json); + + return ImmutableS3SignResponse.builder().uri(uri).headers(headers).build(); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java new file mode 100644 index 000000000000..4b8a29203a0a --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.ErrorHandlers; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.ThreadPools; +import org.immutables.value.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.signer.internal.AbstractAws4Signer; +import software.amazon.awssdk.auth.signer.internal.Aws4SignerRequestParams; +import software.amazon.awssdk.auth.signer.params.Aws4PresignerParams; +import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams; +import software.amazon.awssdk.core.checksums.SdkChecksum; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.http.SdkHttpFullRequest; + +@Value.Immutable +public abstract class S3V4RestSignerClient + extends AbstractAws4Signer { + + private static final Logger LOG = LoggerFactory.getLogger(S3V4RestSignerClient.class); + public static final String S3_SIGNER_URI = "s3.signer.uri"; + public static final String S3_SIGNER_ENDPOINT = "s3.signer.endpoint"; + static final String S3_SIGNER_DEFAULT_ENDPOINT = "v1/aws/s3/sign"; + static final String UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD"; + static final String CACHE_CONTROL = "Cache-Control"; + static final String CACHE_CONTROL_PRIVATE = "private"; + static final String CACHE_CONTROL_NO_CACHE = "no-cache"; + + private static final Cache SIGNED_COMPONENT_CACHE = + Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS).maximumSize(100).build(); + + private static final ScheduledExecutorService TOKEN_REFRESH_EXECUTOR = + ThreadPools.newScheduledPool("s3-signer-token-refresh", 1); + private static final String SCOPE = "sign"; + private static RESTClient httpClient; + + public abstract Map properties(); + + @Value.Default + public Supplier> requestPropertiesSupplier() { + return Collections::emptyMap; + } + + @Value.Lazy + public String baseSignerUri() { + return properties().getOrDefault(S3_SIGNER_URI, properties().get(CatalogProperties.URI)); + } + + @Value.Lazy + public String endpoint() { + return properties().getOrDefault(S3_SIGNER_ENDPOINT, S3_SIGNER_DEFAULT_ENDPOINT); + } + + /** A credential to exchange for a token in the OAuth2 client credentials flow. */ + @Nullable + @Value.Lazy + public String credential() { + return properties().get(OAuth2Properties.CREDENTIAL); + } + + /** A Bearer token which will be used for interaction with the server. */ + @Nullable + @Value.Lazy + public String token() { + return properties().get(OAuth2Properties.TOKEN); + } + + private RESTClient httpClient() { + if (null == httpClient) { + // TODO: should be closed + httpClient = + HTTPClient.builder() + .uri(baseSignerUri()) + .withObjectMapper(S3ObjectMapper.mapper()) + .build(); + } + + return httpClient; + } + + @Value.Lazy + AuthSession authSession() { + if (null != token()) { + return AuthSession.fromAccessToken( + httpClient(), + TOKEN_REFRESH_EXECUTOR, + token(), + expiresAtMillis(properties()), + new AuthSession(ImmutableMap.of(), token(), null, credential(), SCOPE)); + } + + if (credentialProvided()) { + AuthSession session = new AuthSession(ImmutableMap.of(), token(), null, credential(), SCOPE); + long startTimeMillis = System.currentTimeMillis(); + OAuthTokenResponse authResponse = + OAuth2Util.fetchToken(httpClient(), session.headers(), credential(), SCOPE); + return AuthSession.fromTokenResponse( + httpClient(), TOKEN_REFRESH_EXECUTOR, authResponse, startTimeMillis, session); + } + + return AuthSession.empty(); + } + + private boolean credentialProvided() { + return null != credential() && !credential().isEmpty(); + } + + private Long expiresAtMillis(Map properties) { + if (properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) { + long expiresInMillis = + PropertyUtil.propertyAsLong( + properties, + OAuth2Properties.TOKEN_EXPIRES_IN_MS, + OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT); + return System.currentTimeMillis() + expiresInMillis; + } else { + return null; + } + } + + @Value.Check + protected void check() { + Preconditions.checkArgument( + properties().containsKey(S3_SIGNER_URI) || properties().containsKey(CatalogProperties.URI), + "S3 signer service URI is required"); + } + + @Override + protected void processRequestPayload( + SdkHttpFullRequest.Builder mutableRequest, + byte[] signature, + byte[] signingKey, + Aws4SignerRequestParams signerRequestParams, + AwsS3V4SignerParams signerParams) { + checkSignerParams(signerParams); + } + + @Override + protected void processRequestPayload( + SdkHttpFullRequest.Builder mutableRequest, + byte[] signature, + byte[] signingKey, + Aws4SignerRequestParams signerRequestParams, + AwsS3V4SignerParams signerParams, + SdkChecksum sdkChecksum) { + checkSignerParams(signerParams); + } + + @Override + protected String calculateContentHashPresign( + SdkHttpFullRequest.Builder mutableRequest, Aws4PresignerParams signerParams) { + return UNSIGNED_PAYLOAD; + } + + @Override + public SdkHttpFullRequest presign( + SdkHttpFullRequest request, ExecutionAttributes executionAttributes) { + throw new UnsupportedOperationException("Pre-signing not allowed."); + } + + @Override + public SdkHttpFullRequest sign( + SdkHttpFullRequest request, ExecutionAttributes executionAttributes) { + AwsS3V4SignerParams signerParams = + extractSignerParams(AwsS3V4SignerParams.builder(), executionAttributes).build(); + + S3SignRequest remoteSigningRequest = + ImmutableS3SignRequest.builder() + .method(request.method().name()) + .region(signerParams.signingRegion().id()) + .uri(request.getUri()) + .headers(request.headers()) + .properties(requestPropertiesSupplier().get()) + .build(); + + Key cacheKey = Key.from(remoteSigningRequest); + SignedComponent cachedSignedComponent = SIGNED_COMPONENT_CACHE.getIfPresent(cacheKey); + SignedComponent signedComponent; + + if (null != cachedSignedComponent) { + signedComponent = cachedSignedComponent; + } else { + Map responseHeaders = Maps.newHashMap(); + Consumer> responseHeadersConsumer = responseHeaders::putAll; + S3SignResponse s3SignResponse = + httpClient() + .post( + endpoint(), + remoteSigningRequest, + S3SignResponse.class, + () -> authSession().headers(), + ErrorHandlers.defaultErrorHandler(), + responseHeadersConsumer); + + signedComponent = + ImmutableSignedComponent.builder() + .headers(s3SignResponse.headers()) + .signedURI(s3SignResponse.uri()) + .build(); + + if (canBeCached(responseHeaders)) { + SIGNED_COMPONENT_CACHE.put(cacheKey, signedComponent); + } + } + + // The SdkHttpFullRequest Builder appends the raw path from the input URI in .uri(), + // so we need to clear the current path from the request + SdkHttpFullRequest.Builder mutableRequest = request.toBuilder(); + mutableRequest.encodedPath(""); + mutableRequest.uri(signedComponent.signedURI()); + reconstructHeaders(signedComponent.headers(), mutableRequest); + + return mutableRequest.build(); + } + + private void reconstructHeaders( + Map> signedAndUnsignedHeaders, + SdkHttpFullRequest.Builder mutableRequest) { + Map> headers = Maps.newHashMap(signedAndUnsignedHeaders); + // we need to remove the Cache-Control header that is being sent by the server + headers.remove(CACHE_CONTROL); + + // we need to overwrite whatever headers the server signed/unsigned with the ones from the + // original request and then put all headers back to the request + headers.putAll(mutableRequest.headers()); + headers.forEach(mutableRequest::putHeader); + } + + private boolean canBeCached(Map responseHeaders) { + return CACHE_CONTROL_PRIVATE.equals(responseHeaders.get(CACHE_CONTROL)); + } + + private void checkSignerParams(AwsS3V4SignerParams signerParams) { + if (signerParams.enablePayloadSigning()) { + throw new UnsupportedOperationException("Payload signing not supported"); + } + + if (signerParams.enableChunkedEncoding()) { + throw new UnsupportedOperationException("Chunked encoding not supported"); + } + } + + @Value.Immutable + interface Key { + String method(); + + String region(); + + String uri(); + + static Key from(S3SignRequest request) { + return ImmutableKey.builder() + .method(request.method()) + .region(request.region()) + .uri(request.uri().toString()) + .build(); + } + } + + @Value.Immutable + interface SignedComponent { + Map> headers(); + + URI signedURI(); + } + + public static S3V4RestSignerClient create(Map properties) { + return ImmutableS3V4RestSignerClient.builder().properties(properties).build(); + } +} diff --git a/aws/src/main/resources/s3-signer-open-api.yaml b/aws/src/main/resources/s3-signer-open-api.yaml new file mode 100644 index 000000000000..be9775b82cbf --- /dev/null +++ b/aws/src/main/resources/s3-signer-open-api.yaml @@ -0,0 +1,146 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +--- +openapi: 3.0.3 +info: + title: Apache Iceberg S3 Signer API + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html + version: 0.0.1 + description: + Defines the specification for the S3 Signer API. +servers: + - url: "{scheme}://{host}/{basePath}" + description: Server URL when the port can be inferred from the scheme + variables: + scheme: + description: The scheme of the URI, either http or https. + default: https + host: + description: The host address for the specified server + default: localhost + basePath: + description: Optional prefix to be prepended to all routes + default: "" + - url: "{scheme}://{host}:{port}/{basePath}" + description: Generic base server URL, with all parts configurable + variables: + scheme: + description: The scheme of the URI, either http or https. + default: https + host: + description: The host address for the specified server + default: localhost + port: + description: The port used when addressing the host + default: "443" + basePath: + description: Optional prefix to be appended to all routes + default: "" + +paths: + + /v1/aws/s3/sign: + + post: + tags: + - S3 Signer API + summary: Remotely signs S3 requests + operationId: signS3Request + requestBody: + description: The request containing the headers to be signed + content: + application/json: + schema: + $ref: '#/components/schemas/S3SignRequest' + required: true + responses: + 200: + $ref: '#/components/responses/S3SignResponse' + 400: + $ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/BadRequestErrorResponse' + 401: + $ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/UnauthorizedResponse' + 403: + $ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/ForbiddenResponse' + 419: + $ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/AuthenticationTimeoutResponse' + 503: + $ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/ServiceUnavailableResponse' + 5XX: + $ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/ServerErrorResponse' + + ############################## + # Application Schema Objects # + ############################## +components: + schemas: + + S3Headers: + type: object + additionalProperties: + type: array + items: + type: string + + S3SignRequest: + required: + - region + - uri + - method + - headers + properties: + region: + type: string + uri: + type: string + method: + type: string + enum: ["PUT", "GET", "HEAD", "POST", "DELETE", "PATCH", "OPTIONS"] + headers: + $ref: '#/components/schemas/S3Headers' + properties: + type: object + additionalProperties: + type: string + + + ############################# + # Reusable Response Objects # + ############################# + responses: + + S3SignResponse: + description: The response containing signed & unsigned headers. The server will also send + a Cache-Control header, indicating whether the response can be cached (Cache-Control = ["private"]) + or not (Cache-Control = ["no-cache"]). + content: + application/json: + schema: + type: object + required: + - uri + - headers + properties: + uri: + type: string + headers: + $ref: '#/components/schemas/S3Headers' diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java index b1586ff46ad5..25455d954b56 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java @@ -22,10 +22,14 @@ import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Optional; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -34,6 +38,9 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.signer.Signer; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; @@ -606,4 +613,77 @@ public void testKryoSerialization() throws IOException { awsPropertiesWithEmptyProps.s3BucketToAccessPointMapping(), deSerializedAwsPropertiesWithEmptyProps.s3BucketToAccessPointMapping()); } + + @Test + public void testS3RemoteSignerWithoutUri() { + Map properties = + ImmutableMap.of(AwsProperties.S3_SIGNER_IMPL, S3V4RestSignerClient.class.getName()); + AwsProperties awsProperties = new AwsProperties(properties); + + Assertions.assertThatThrownBy( + () -> awsProperties.applyS3SignerConfiguration(S3Client.builder())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("S3 signer service URI is required"); + } + + @Test + public void testS3RemoteSigner() { + String uri = "http://localhost:12345"; + Map properties = + ImmutableMap.of( + AwsProperties.S3_SIGNER_IMPL, + S3V4RestSignerClient.class.getName(), + CatalogProperties.URI, + uri); + AwsProperties awsProperties = new AwsProperties(properties); + S3ClientBuilder builder = S3Client.builder(); + + awsProperties.applyS3SignerConfiguration(builder); + + Optional signer = + builder.overrideConfiguration().advancedOption(SdkAdvancedClientOption.SIGNER); + Assertions.assertThat(signer).isPresent().get().isInstanceOf(S3V4RestSignerClient.class); + S3V4RestSignerClient signerClient = (S3V4RestSignerClient) signer.get(); + Assertions.assertThat(signerClient.baseSignerUri()).isEqualTo(uri); + Assertions.assertThat(signerClient.properties()).isEqualTo(properties); + } + + @Test + public void testS3LocalSigner() { + Map properties = + ImmutableMap.of(AwsProperties.S3_SIGNER_IMPL, AwsS3V4Signer.class.getName()); + AwsProperties awsProperties = new AwsProperties(properties); + S3ClientBuilder builder = S3Client.builder(); + + awsProperties.applyS3SignerConfiguration(builder); + + Optional signer = + builder.overrideConfiguration().advancedOption(SdkAdvancedClientOption.SIGNER); + Assertions.assertThat(signer).isPresent().get().isInstanceOf(AwsS3V4Signer.class); + } + + @Test + public void testS3WrongSigner() { + Map properties = ImmutableMap.of(AwsProperties.S3_SIGNER_IMPL, "WrongSigner"); + AwsProperties awsProperties = new AwsProperties(properties); + + Assertions.assertThatThrownBy( + () -> awsProperties.applyS3SignerConfiguration(S3Client.builder())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot instantiate custom signer: WrongSigner"); + } + + @Test + public void testS3SignerWrongSubclass() { + // we're just passing some random class as a signer impl + Map properties = + ImmutableMap.of(AwsProperties.S3_SIGNER_IMPL, AwsProperties.class.getName()); + AwsProperties awsProperties = new AwsProperties(properties); + + Assertions.assertThatThrownBy( + () -> awsProperties.applyS3SignerConfiguration(S3Client.builder())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Custom signer org.apache.iceberg.aws.AwsProperties must be an instance of software.amazon.awssdk.core.signer.Signer"); + } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/MinioContainer.java b/aws/src/test/java/org/apache/iceberg/aws/s3/MinioContainer.java new file mode 100644 index 000000000000..b0a2c116bba1 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/MinioContainer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import java.net.URI; +import java.time.Duration; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.Base58; +import software.amazon.awssdk.auth.credentials.AwsCredentials; + +public class MinioContainer extends GenericContainer { + + private static final int DEFAULT_PORT = 9000; + private static final String DEFAULT_IMAGE = "minio/minio"; + private static final String DEFAULT_TAG = "edge"; + + private static final String MINIO_ACCESS_KEY = "MINIO_ACCESS_KEY"; + private static final String MINIO_SECRET_KEY = "MINIO_SECRET_KEY"; + + private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; + private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + + public MinioContainer(AwsCredentials credentials) { + this(DEFAULT_IMAGE + ":" + DEFAULT_TAG, credentials); + } + + public MinioContainer(String image, AwsCredentials credentials) { + super(image == null ? DEFAULT_IMAGE + ":" + DEFAULT_TAG : image); + this.withNetworkAliases("minio-" + Base58.randomString(6)) + .withCommand("server", DEFAULT_STORAGE_DIRECTORY) + .addExposedPort(DEFAULT_PORT); + if (credentials != null) { + this.withEnv(MINIO_ACCESS_KEY, credentials.accessKeyId()) + .withEnv(MINIO_SECRET_KEY, credentials.secretAccessKey()); + } + setWaitStrategy( + new HttpWaitStrategy() + .forPort(DEFAULT_PORT) + .forPath(HEALTH_ENDPOINT) + .withStartupTimeout(Duration.ofMinutes(2))); + } + + public URI getURI() { + return URI.create("http://" + getHost() + ":" + getMappedPort(DEFAULT_PORT)); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java new file mode 100644 index 000000000000..ad2177a01a3e --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import static java.lang.String.format; +import static org.apache.iceberg.rest.RESTCatalogAdapter.castRequest; +import static org.apache.iceberg.rest.RESTCatalogAdapter.castResponse; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStreamReader; +import java.io.Reader; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.io.CharStreams; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.ResourcePaths; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.regions.Region; + +/** + * The {@link S3V4RestSignerClient} performs OAuth and S3 sign requests against a REST server. The + * {@link S3SignerServlet} provides a simple servlet implementation to emulate the server-side + * behavior of signing S3 requests and handling OAuth. + */ +public class S3SignerServlet extends HttpServlet { + + private static final Logger LOG = LoggerFactory.getLogger(S3SignerServlet.class); + + static final Clock SIGNING_CLOCK = Clock.fixed(Instant.now(), ZoneId.of("UTC")); + static final Set UNSIGNED_HEADERS = + Sets.newHashSet( + Arrays.asList("range", "x-amz-date", "amz-sdk-invocation-id", "amz-sdk-retry")); + private static final String POST = "POST"; + + private static final Set CACHEABLE_METHODS = + Stream.of(SdkHttpMethod.GET, SdkHttpMethod.HEAD).collect(Collectors.toSet()); + + private final Map responseHeaders = + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + private final ObjectMapper mapper; + + public S3SignerServlet(ObjectMapper mapper) { + this.mapper = mapper; + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) { + execute(request, response); + } + + @Override + protected void doHead(HttpServletRequest request, HttpServletResponse response) { + execute(request, response); + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) { + execute(request, response); + } + + @Override + protected void doDelete(HttpServletRequest request, HttpServletResponse response) { + execute(request, response); + } + + private OAuthTokenResponse handleOAuth(Map requestMap) { + String grantType = requestMap.get("grant_type"); + switch (grantType) { + case "client_credentials": + return castResponse( + OAuthTokenResponse.class, + OAuthTokenResponse.builder() + .withToken("client-credentials-token:sub=" + requestMap.get("client_id")) + .withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token") + .withTokenType("Bearer") + .build()); + + case "urn:ietf:params:oauth:grant-type:token-exchange": + String actor = requestMap.get("actor_token"); + String token = + String.format( + "token-exchange-token:sub=%s%s", + requestMap.get("subject_token"), actor != null ? ",act=" + actor : ""); + return castResponse( + OAuthTokenResponse.class, + OAuthTokenResponse.builder() + .withToken(token) + .withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token") + .withTokenType("Bearer") + .build()); + + default: + throw new UnsupportedOperationException("Unsupported grant_type: " + grantType); + } + } + + private S3SignResponse signRequest(S3SignRequest request) { + AwsS3V4SignerParams signingParams = + AwsS3V4SignerParams.builder() + .awsCredentials(TestS3RestSigner.CREDENTIALS_PROVIDER.resolveCredentials()) + .enablePayloadSigning(false) + .signingClockOverride(SIGNING_CLOCK) + .enableChunkedEncoding(false) + .signingRegion(Region.of(request.region())) + .doubleUrlEncode(false) + .timeOffset(0) + .signingName("s3") + .build(); + + Map> unsignedHeaders = + request.headers().entrySet().stream() + .filter(e -> UNSIGNED_HEADERS.contains(e.getKey().toLowerCase())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map> signedHeaders = + request.headers().entrySet().stream() + .filter(e -> !UNSIGNED_HEADERS.contains(e.getKey().toLowerCase())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + SdkHttpFullRequest sign = + AwsS3V4Signer.create() + .sign( + SdkHttpFullRequest.builder() + .uri(request.uri()) + .method(SdkHttpMethod.fromValue(request.method())) + .headers(signedHeaders) + .build(), + signingParams); + + Map> headers = Maps.newHashMap(sign.headers()); + headers.putAll(unsignedHeaders); + + return ImmutableS3SignResponse.builder().uri(request.uri()).headers(headers).build(); + } + + protected void execute(HttpServletRequest request, HttpServletResponse response) { + response.setStatus(HttpServletResponse.SC_OK); + responseHeaders.forEach(response::setHeader); + + String path = request.getRequestURI().substring(1); + Object requestBody; + try { + // we only need to handle oauth tokens & s3 sign request routes here as those are the only + // requests that are being done by the S3V4RestSignerClient + if (POST.equals(request.getMethod()) + && S3V4RestSignerClient.S3_SIGNER_DEFAULT_ENDPOINT.equals(path)) { + S3SignRequest s3SignRequest = + castRequest( + S3SignRequest.class, mapper.readValue(request.getReader(), S3SignRequest.class)); + S3SignResponse s3SignResponse = signRequest(s3SignRequest); + if (CACHEABLE_METHODS.contains(SdkHttpMethod.fromValue(s3SignRequest.method()))) { + // tell the client this can be cached + response.setHeader( + S3V4RestSignerClient.CACHE_CONTROL, S3V4RestSignerClient.CACHE_CONTROL_PRIVATE); + } else { + response.setHeader( + S3V4RestSignerClient.CACHE_CONTROL, S3V4RestSignerClient.CACHE_CONTROL_NO_CACHE); + } + + mapper.writeValue(response.getWriter(), s3SignResponse); + } else if (POST.equals(request.getMethod()) && ResourcePaths.tokens().equals(path)) { + try (Reader reader = new InputStreamReader(request.getInputStream())) { + requestBody = RESTUtil.decodeFormData(CharStreams.toString(reader)); + } + + OAuthTokenResponse oAuthTokenResponse = + handleOAuth((Map) castRequest(Map.class, requestBody)); + mapper.writeValue(response.getWriter(), oAuthTokenResponse); + } else { + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + mapper.writeValue( + response.getWriter(), + ErrorResponse.builder() + .responseCode(400) + .withType("BadRequestException") + .withMessage(format("No route for request: %s %s", request.getMethod(), path)) + .build()); + } + } catch (RESTException e) { + LOG.error("Error processing REST request", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } catch (Exception e) { + LOG.error("Unexpected exception when processing REST request", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java new file mode 100644 index 000000000000..b166a6282b16 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.aws.s3.MinioContainer; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.assertj.core.api.Assertions; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.jetbrains.annotations.NotNull; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.signer.internal.AbstractAws4Signer; +import software.amazon.awssdk.auth.signer.internal.AbstractAwsS3V4Signer; +import software.amazon.awssdk.auth.signer.internal.Aws4SignerRequestParams; +import software.amazon.awssdk.auth.signer.internal.SignerConstant; +import software.amazon.awssdk.auth.signer.params.Aws4PresignerParams; +import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams; +import software.amazon.awssdk.core.checksums.SdkChecksum; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; + +public class TestS3RestSigner { + + private static final Region REGION = Region.US_WEST_2; + private static final String BUCKET = "iceberg-s3-signer-test"; + static final AwsCredentialsProvider CREDENTIALS_PROVIDER = + StaticCredentialsProvider.create( + AwsBasicCredentials.create("accessKeyId", "secretAccessKey")); + + private static Server httpServer; + private S3Client s3; + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Rule + public MinioContainer minioContainer = + new MinioContainer(CREDENTIALS_PROVIDER.resolveCredentials()); + + @AfterClass + public static void afterClass() throws Exception { + if (null != httpServer) { + httpServer.stop(); + } + } + + @Before + public void before() throws Exception { + if (null == httpServer) { + httpServer = initHttpServer(8181); + } + + ValidatingSigner validatingSigner = + new ValidatingSigner( + ImmutableS3V4RestSignerClient.builder() + .properties( + ImmutableMap.of( + S3V4RestSignerClient.S3_SIGNER_URI, + httpServer.getURI().toString(), + OAuth2Properties.CREDENTIAL, + "catalog:12345")) + .build(), + new CustomAwsS3V4Signer()); + + s3 = + S3Client.builder() + .region(REGION) + .credentialsProvider(CREDENTIALS_PROVIDER) + .applyMutation( + s3ClientBuilder -> + s3ClientBuilder.httpClientBuilder( + software.amazon.awssdk.http.apache.ApacheHttpClient.builder())) + .endpointOverride(minioContainer.getURI()) + .overrideConfiguration( + c -> c.putAdvancedOption(SdkAdvancedClientOption.SIGNER, validatingSigner)) + .build(); + + s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); + s3.putObject( + PutObjectRequest.builder().bucket(BUCKET).key("random/key").build(), + Paths.get("/etc/hosts")); + s3.putObject( + PutObjectRequest.builder().bucket(BUCKET).key("encoded/key=value/file").build(), + Paths.get("/etc/hosts")); + + s3.createMultipartUpload( + CreateMultipartUploadRequest.builder().bucket(BUCKET).key("random/multipart-key").build()); + } + + private Server initHttpServer(int port) throws Exception { + S3SignerServlet servlet = new S3SignerServlet(S3ObjectMapper.mapper()); + ServletContextHandler servletContext = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContext.setContextPath("/"); + ServletHolder servletHolder = new ServletHolder(servlet); + servletHolder.setInitParameter("javax.ws.rs.Application", "ServiceListPublic"); + servletContext.addServlet(servletHolder, "/*"); + servletContext.setVirtualHosts(null); + servletContext.setGzipHandler(new GzipHandler()); + + Server server = new Server(port); + server.setHandler(servletContext); + server.start(); + return server; + } + + @Test + public void validateGetObject() { + s3.getObject(GetObjectRequest.builder().bucket(BUCKET).key("random/key").build()); + // signer caching should kick in when repeating the same request + s3.getObject(GetObjectRequest.builder().bucket(BUCKET).key("random/key").build()); + } + + @Test + public void validatePutObject() { + s3.putObject( + PutObjectRequest.builder().bucket(BUCKET).key("some/key").build(), Paths.get("/etc/hosts")); + } + + @Test + public void validateListPrefix() { + s3.listObjectsV2(ListObjectsV2Request.builder().bucket(BUCKET).prefix("some/prefix/").build()); + } + + @Test + public void validateEncodedGetObject() { + s3.getObject(GetObjectRequest.builder().bucket(BUCKET).key("encoded/key=value/file").build()); + // signer caching should kick in when repeating the same request + s3.getObject(GetObjectRequest.builder().bucket(BUCKET).key("encoded/key=value/file").build()); + } + + @Test + public void validatedCreateMultiPartUpload() { + s3.createMultipartUpload( + CreateMultipartUploadRequest.builder().bucket(BUCKET).key("some/multipart-key").build()); + } + + @Test + public void validatedUploadPart() { + s3.uploadPart( + UploadPartRequest.builder() + .bucket(BUCKET) + .key("some/multipart-key") + .uploadId("1234") + .build(), + RequestBody.fromString("content")); + } + + /** + * A signer that compares the Authorization header after signing the request with the {@link + * S3V4RestSignerClient} and with the {@link AbstractAwsS3V4Signer} + */ + private static class ValidatingSigner + extends AbstractAws4Signer { + private final S3V4RestSignerClient icebergSigner; + private final AbstractAwsS3V4Signer awsSigner; + + private ValidatingSigner(S3V4RestSignerClient icebergSigner, AbstractAwsS3V4Signer awsSigner) { + this.icebergSigner = icebergSigner; + this.awsSigner = awsSigner; + } + + @Override + protected void processRequestPayload( + SdkHttpFullRequest.Builder mutableRequest, + byte[] signature, + byte[] signingKey, + Aws4SignerRequestParams signerRequestParams, + AwsS3V4SignerParams signerParams) { + throw new UnsupportedOperationException(); + } + + @Override + protected void processRequestPayload( + SdkHttpFullRequest.Builder mutableRequest, + byte[] signature, + byte[] signingKey, + Aws4SignerRequestParams signerRequestParams, + AwsS3V4SignerParams signerParams, + SdkChecksum sdkChecksum) { + throw new UnsupportedOperationException(); + } + + @Override + protected String calculateContentHashPresign( + SdkHttpFullRequest.Builder mutableRequest, Aws4PresignerParams signerParams) { + throw new UnsupportedOperationException(); + } + + @Override + public SdkHttpFullRequest presign( + SdkHttpFullRequest request, ExecutionAttributes executionAttributes) { + throw new UnsupportedOperationException(); + } + + @Override + public SdkHttpFullRequest sign( + SdkHttpFullRequest request, ExecutionAttributes executionAttributes) { + + AwsS3V4SignerParams signerParams = + extractSignerParams(AwsS3V4SignerParams.builder(), executionAttributes) + .signingClockOverride(S3SignerServlet.SIGNING_CLOCK) + .enableChunkedEncoding(false) + .timeOffset(0) + .doubleUrlEncode(false) + .enablePayloadSigning(false) + .signingName("s3") + .build(); + + SdkHttpFullRequest icebergResult = icebergSigner.sign(request, executionAttributes); + + SdkHttpFullRequest awsResult = signWithAwsSigner(request, signerParams); + + Assertions.assertThat(awsResult.headers().get("Authorization")) + .isEqualTo(icebergResult.headers().get("Authorization")); + + Assertions.assertThat(awsResult.headers()).isEqualTo(icebergResult.headers()); + return awsResult; + } + + @NotNull + private SdkHttpFullRequest signWithAwsSigner( + SdkHttpFullRequest request, AwsS3V4SignerParams signerParams) { + // we need to filter out the unsigned headers for the AWS signer and re-append those headers + // back after signing + Map> unsignedHeaders = + request.headers().entrySet().stream() + .filter(e -> S3SignerServlet.UNSIGNED_HEADERS.contains(e.getKey().toLowerCase())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + SdkHttpFullRequest.Builder builder = request.toBuilder(); + for (String unsignedHeader : S3SignerServlet.UNSIGNED_HEADERS) { + builder.removeHeader(unsignedHeader); + } + + SdkHttpFullRequest awsResult = awsSigner.sign(builder.build(), signerParams); + // append the unsigned headers back + SdkHttpFullRequest.Builder mutableResult = awsResult.toBuilder(); + unsignedHeaders.forEach(mutableResult::putHeader); + return mutableResult.build(); + } + } + + /** + * A custom AWS Signer that overrides {@link + * AbstractAwsS3V4Signer#calculateContentHash(SdkHttpFullRequest.Builder, AwsS3V4SignerParams, + * SdkChecksum)} because we don't want to sign the payload. We disabled payload signing via {@link + * AwsS3V4SignerParams#enablePayloadSigning()} but the original + * code looks at the used protocol and if it's not https it will by default enable + * payload signing here. + * + *

However, we run Minio with http and don't have a means to disable payload signing in + * order to achieve the same signature in the {@link ValidatingSigner#sign(SdkHttpFullRequest, + * ExecutionAttributes)} check above. + */ + private static class CustomAwsS3V4Signer extends AbstractAwsS3V4Signer { + + @Override + protected String calculateContentHash( + SdkHttpFullRequest.Builder mutableRequest, + AwsS3V4SignerParams signerParams, + SdkChecksum contentFlexibleChecksum) { + boolean isUnsignedStreamingTrailer = + mutableRequest + .firstMatchingHeader(SignerConstant.X_AMZ_CONTENT_SHA256) + .map(STREAMING_UNSIGNED_PAYLOAD_TRAILER::equals) + .orElse(false); + + if (!isUnsignedStreamingTrailer) { + // To be consistent with other service clients using sig-v4, + // we just set the header as "required", and AWS4Signer.sign() will be + // notified to pick up the header value returned by this method. + mutableRequest.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256, "required"); + } + return S3V4RestSignerClient.UNSIGNED_PAYLOAD; + } + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java new file mode 100644 index 000000000000..5b8e9cd3b61f --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.net.URI; +import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class TestS3SignRequestParser { + + @Test + public void nullRequest() { + Assertions.assertThatThrownBy(() -> S3SignRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse s3 sign request from null object"); + + Assertions.assertThatThrownBy(() -> S3SignRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid s3 sign request: null"); + } + + @Test + public void missingFields() { + Assertions.assertThatThrownBy(() -> S3SignRequestParser.fromJson("{}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: region"); + + Assertions.assertThatThrownBy(() -> S3SignRequestParser.fromJson("{\"region\":\"us-west-2\"}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: method"); + + Assertions.assertThatThrownBy( + () -> S3SignRequestParser.fromJson("{\"region\":\"us-west-2\", \"method\" : \"PUT\"}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: uri"); + + Assertions.assertThatThrownBy( + () -> + S3SignRequestParser.fromJson( + "{\n" + + " \"region\" : \"us-west-2\",\n" + + " \"method\" : \"PUT\",\n" + + " \"uri\" : \"http://localhost:49208/iceberg-signer-test\"\n" + + "}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing field: headers"); + } + + @Test + public void invalidMethod() { + Assertions.assertThatThrownBy( + () -> + S3SignRequestParser.fromJson( + "{\n" + + " \"region\" : \"us-west-2\",\n" + + " \"method\" : 23,\n" + + " \"uri\" : \"http://localhost:49208/iceberg-signer-test\",\n" + + " \"headers\" : {}}\n" + + "}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a string value: method: 23"); + } + + @Test + public void invalidUri() { + Assertions.assertThatThrownBy( + () -> + S3SignRequestParser.fromJson( + "{\n" + + " \"region\" : \"us-west-2\",\n" + + " \"method\" : \"PUT\",\n" + + " \"uri\" : 45,\n" + + " \"headers\" : {}}\n" + + "}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a string value: uri: 45"); + } + + @Test + public void invalidRegion() { + Assertions.assertThatThrownBy( + () -> + S3SignRequestParser.fromJson( + "{\n" + + " \"region\" : 23,\n" + + " \"method\" : \"PUT\",\n" + + " \"uri\" : \"http://localhost:49208/iceberg-signer-test\",\n" + + " \"headers\" : {}\n" + + "}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a string value: region: 23"); + } + + @Test + public void roundTripSerde() { + ImmutableS3SignRequest s3SignRequest = + ImmutableS3SignRequest.builder() + .uri(URI.create("http://localhost:49208/iceberg-signer-test")) + .method("PUT") + .region("us-west-2") + .headers( + ImmutableMap.of( + "amz-sdk-request", + Arrays.asList("attempt=1", "max=4"), + "Content-Length", + Arrays.asList("191"), + "Content-Type", + Arrays.asList("application/json"), + "User-Agent", + Arrays.asList("aws-sdk-java/2.17.257", "Linux/5.4.0-126"))) + .build(); + + String json = S3SignRequestParser.toJson(s3SignRequest, true); + Assertions.assertThat(S3SignRequestParser.fromJson(json)).isEqualTo(s3SignRequest); + Assertions.assertThat(json) + .isEqualTo( + "{\n" + + " \"region\" : \"us-west-2\",\n" + + " \"method\" : \"PUT\",\n" + + " \"uri\" : \"http://localhost:49208/iceberg-signer-test\",\n" + + " \"headers\" : {\n" + + " \"amz-sdk-request\" : [ \"attempt=1\", \"max=4\" ],\n" + + " \"Content-Length\" : [ \"191\" ],\n" + + " \"Content-Type\" : [ \"application/json\" ],\n" + + " \"User-Agent\" : [ \"aws-sdk-java/2.17.257\", \"Linux/5.4.0-126\" ]\n" + + " }\n" + + "}"); + } + + @Test + public void roundTripSerdeWithProperties() { + ImmutableS3SignRequest s3SignRequest = + ImmutableS3SignRequest.builder() + .uri(URI.create("http://localhost:49208/iceberg-signer-test")) + .method("PUT") + .region("us-west-2") + .headers( + ImmutableMap.of( + "amz-sdk-request", + Arrays.asList("attempt=1", "max=4"), + "Content-Length", + Arrays.asList("191"), + "Content-Type", + Arrays.asList("application/json"), + "User-Agent", + Arrays.asList("aws-sdk-java/2.17.257", "Linux/5.4.0-126"))) + .properties(ImmutableMap.of("k1", "v1")) + .build(); + + String json = S3SignRequestParser.toJson(s3SignRequest, true); + Assertions.assertThat(S3SignRequestParser.fromJson(json)).isEqualTo(s3SignRequest); + Assertions.assertThat(json) + .isEqualTo( + "{\n" + + " \"region\" : \"us-west-2\",\n" + + " \"method\" : \"PUT\",\n" + + " \"uri\" : \"http://localhost:49208/iceberg-signer-test\",\n" + + " \"headers\" : {\n" + + " \"amz-sdk-request\" : [ \"attempt=1\", \"max=4\" ],\n" + + " \"Content-Length\" : [ \"191\" ],\n" + + " \"Content-Type\" : [ \"application/json\" ],\n" + + " \"User-Agent\" : [ \"aws-sdk-java/2.17.257\", \"Linux/5.4.0-126\" ]\n" + + " },\n" + + " \"properties\" : {\n" + + " \"k1\" : \"v1\"\n" + + " }\n" + + "}"); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java new file mode 100644 index 000000000000..778f00d2989c --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3.signer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.net.URI; +import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class TestS3SignResponseParser { + + @Test + public void nullResponse() { + Assertions.assertThatThrownBy(() -> S3SignResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse s3 sign response from null object"); + + Assertions.assertThatThrownBy(() -> S3SignResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid s3 sign response: null"); + } + + @Test + public void missingFields() { + Assertions.assertThatThrownBy(() -> S3SignResponseParser.fromJson("{}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: uri"); + + Assertions.assertThatThrownBy( + () -> + S3SignResponseParser.fromJson( + "{\"uri\" : \"http://localhost:49208/iceberg-signer-test\"}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing field: headers"); + } + + @Test + public void invalidUri() { + Assertions.assertThatThrownBy( + () -> S3SignResponseParser.fromJson("{\"uri\" : 45, \"headers\" : {}}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a string value: uri: 45"); + } + + @Test + public void roundTripSerde() { + S3SignResponse s3SignResponse = + ImmutableS3SignResponse.builder() + .uri(URI.create("http://localhost:49208/iceberg-signer-test")) + .headers( + ImmutableMap.of( + "amz-sdk-request", + Arrays.asList("attempt=1", "max=4"), + "Content-Length", + Arrays.asList("191"), + "Content-Type", + Arrays.asList("application/json"), + "User-Agent", + Arrays.asList("aws-sdk-java/2.17.257", "Linux/5.4.0-126"))) + .build(); + + String json = S3SignResponseParser.toJson(s3SignResponse, true); + Assertions.assertThat(S3SignResponseParser.fromJson(json)).isEqualTo(s3SignResponse); + Assertions.assertThat(json) + .isEqualTo( + "{\n" + + " \"uri\" : \"http://localhost:49208/iceberg-signer-test\",\n" + + " \"headers\" : {\n" + + " \"amz-sdk-request\" : [ \"attempt=1\", \"max=4\" ],\n" + + " \"Content-Length\" : [ \"191\" ],\n" + + " \"Content-Type\" : [ \"application/json\" ],\n" + + " \"User-Agent\" : [ \"aws-sdk-java/2.17.257\", \"Linux/5.4.0-126\" ]\n" + + " }\n" + + "}"); + } +} diff --git a/build.gradle b/build.gradle index abafedb48ebe..3e55beb0a6ed 100644 --- a/build.gradle +++ b/build.gradle @@ -389,9 +389,15 @@ project(':iceberg-aws') { api project(':iceberg-api') implementation project(':iceberg-common') implementation project(':iceberg-core') + annotationProcessor "org.immutables:value" + compileOnly "org.immutables:value" + implementation "com.github.ben-manes.caffeine:caffeine" + implementation "com.fasterxml.jackson.core:jackson-databind" + implementation "com.fasterxml.jackson.core:jackson-core" compileOnly 'software.amazon.awssdk:url-connection-client' compileOnly 'software.amazon.awssdk:apache-client' + compileOnly 'software.amazon.awssdk:auth' compileOnly 'software.amazon.awssdk:s3' compileOnly 'software.amazon.awssdk:kms' compileOnly 'software.amazon.awssdk:glue' @@ -416,6 +422,9 @@ project(':iceberg-aws') { } testImplementation "com.esotericsoftware:kryo" testImplementation "org.xerial:sqlite-jdbc" + testImplementation "org.testcontainers:testcontainers" + testImplementation "org.apache.httpcomponents.client5:httpclient5" + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') } sourceSets { diff --git a/versions.props b/versions.props index ef6a50fa05de..4e4d30dbc284 100644 --- a/versions.props +++ b/versions.props @@ -45,3 +45,4 @@ org.mock-server:mockserver-netty = 5.13.2 org.mock-server:mockserver-client-java = 5.13.2 com.esotericsoftware:kryo = 4.0.2 org.eclipse.jetty:* = 9.4.43.v20210629 +org.testcontainers:* = 1.17.5