diff --git a/src/Dapr.Common/Data/Attributes/DataPipelineAttribute.cs b/src/Dapr.Common/Data/Attributes/DataPipelineAttribute.cs new file mode 100644 index 000000000..1fa54b8bd --- /dev/null +++ b/src/Dapr.Common/Data/Attributes/DataPipelineAttribute.cs @@ -0,0 +1,48 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common.Data.Operations; + +namespace Dapr.Common.Data.Attributes; + +/// +/// Attribute-based approach for indicating which data operations should be performed on a type and in what order. +/// +[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = false)] +public sealed class DataPipelineAttribute : Attribute +{ + /// + /// Contains the various data operation types available and the order in which to apply them. + /// + public readonly IReadOnlyList DataOperationTypes; + + /// + /// Initializes a new . + /// + /// + /// + public DataPipelineAttribute(params Type[] dataOperationTypes) + { + var registeredTypes = new List(); + + foreach (var type in dataOperationTypes) + { + if (!typeof(IDaprDataOperation).IsAssignableFrom(type)) + throw new DaprException($"Unable to register data preparation operation as {nameof(type)} does not implement `IDataOperation`"); + + registeredTypes.Add(type); + } + + DataOperationTypes = registeredTypes; + } +} diff --git a/src/Dapr.Common/Data/DaprDecoderPipeline.cs b/src/Dapr.Common/Data/DaprDecoderPipeline.cs new file mode 100644 index 000000000..c3eaf5b1d --- /dev/null +++ b/src/Dapr.Common/Data/DaprDecoderPipeline.cs @@ -0,0 +1,145 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common.Data.Operations; + +namespace Dapr.Common.Data; + +/// +/// Processes the data using the provided providers. +/// +internal sealed class DaprDecoderPipeline +{ + private readonly Stack prefixKeys; + private readonly IDaprTStringTransitionOperation? genericToStringOp; + private readonly List stringOps = new(); + private readonly IDaprStringByteTransitionOperation? stringToByteOp; + private readonly List byteOps = new(); + + /// + /// Used to initialize a new . + /// + public DaprDecoderPipeline(IEnumerable operations, Stack prefixKeys) + { + this.prefixKeys = prefixKeys; + + foreach (var op in operations) + { + switch (op) + { + case IDaprTStringTransitionOperation genToStrOp when genericToStringOp is not null: + throw new DaprException( + $"Multiple types are declared for the conversion of the data to a string in the data pipeline for {typeof(TOutput)} - only one is allowed"); + case IDaprTStringTransitionOperation genToStrOp: + genericToStringOp = genToStrOp; + break; + case IDaprStringBasedOperation strOp: + stringOps.Add(strOp); + break; + case IDaprStringByteTransitionOperation strToByte when stringToByteOp is not null: + throw new DaprException( + $"Multiple types are declared for the pipeline conversion from a string to a byte array for {typeof(TOutput)} - only one is allowed"); + case IDaprStringByteTransitionOperation strToByte: + stringToByteOp = strToByte; + break; + case IDaprByteBasedOperation byteOp: + byteOps.Add(byteOp); + break; + } + } + + if (genericToStringOp is null) + { + throw new DaprException( + $"A pipeline operation must be specified to convert a {typeof(TOutput)} into a serializable string"); + } + + if (stringToByteOp is null) + { + throw new DaprException( + $"A pipeline operation must be specified to convert a {typeof(TOutput)} into a byte array"); + } + } + + /// + /// Processes the reverse of the data in the order of the provided list of . + /// + /// The data to process in reverse. + /// The metadata providing the mechanism(s) used to encode the data. + /// Cancellation token. + /// The evaluated data. + public async Task> ReverseProcessAsync(ReadOnlyMemory payload, + Dictionary metadata, CancellationToken cancellationToken = default) + { + var metadataPrefixes = new Stack(prefixKeys); + + //First, perform byte-based operations + var inboundPayload = new DaprOperationPayload>(payload) { Metadata = metadata }; + var byteBasedResult = await ReverseByteOperationsAsync(inboundPayload, metadataPrefixes, cancellationToken); + + //Convert this back to a string from a byte array + var currentPrefix = metadataPrefixes.Pop(); + var stringResult = await stringToByteOp!.ReverseAsync(byteBasedResult, currentPrefix, cancellationToken); + + //Perform the string-based operations + var stringBasedResult = await ReverseStringOperationsAsync(stringResult, metadataPrefixes, cancellationToken); + + //Convert from a string back into its generic type + currentPrefix = metadataPrefixes.Pop(); + var genericResult = await genericToStringOp!.ReverseAsync(stringBasedResult, currentPrefix, cancellationToken); + + return genericResult; + } + + /// + /// Performs a reversal operation for the string-based operations. + /// + /// The payload to run the reverse operation against. + /// The prefix values for retrieving data from the metadata for this operation. + /// Cancellation token. + /// + private async Task> ReverseStringOperationsAsync( + DaprOperationPayload payload, + Stack metadataPrefixes, CancellationToken cancellationToken) + { + stringOps.Reverse(); + foreach (var op in stringOps) + { + var currentPrefix = metadataPrefixes.Pop(); + payload = await op.ReverseAsync(payload, currentPrefix, cancellationToken); + } + + return payload; + } + + /// + /// Performs a reversal operation for the byte-based operations. + /// + /// The current state of the payload. + /// The prefix values for retrieving data from the metadata for this operation. + /// Cancellation token. + /// The most up-to-date payload. + private async Task>> + ReverseByteOperationsAsync(DaprOperationPayload> payload, Stack metadataPrefixes, + CancellationToken cancellationToken) + { + byteOps.Reverse(); + foreach (var op in byteOps) + { + var currentPrefix = metadataPrefixes.Pop(); + payload = await op.ReverseAsync(payload, currentPrefix, cancellationToken); + } + + return payload; + } +} diff --git a/src/Dapr.Common/Data/DaprEncoderPipeline.cs b/src/Dapr.Common/Data/DaprEncoderPipeline.cs new file mode 100644 index 000000000..54003769a --- /dev/null +++ b/src/Dapr.Common/Data/DaprEncoderPipeline.cs @@ -0,0 +1,155 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common.Data.Extensions; +using Dapr.Common.Data.Operations; + +namespace Dapr.Common.Data; + +/// +/// Processes the data using the provided providers. +/// +internal sealed class DaprEncoderPipeline +{ + /// + /// The metadata key containing the operations. + /// + private const string OperationKey = "ops"; + + private readonly List operationNames = new(); + private readonly Dictionary operationInvocations = new(); + + private readonly IDaprTStringTransitionOperation? genericToStringOp; + private readonly List stringOps = new(); + private readonly IDaprStringByteTransitionOperation? stringToByteOp; + private readonly List byteOps = new(); + + /// + /// Used to initialize a new . + /// + public DaprEncoderPipeline(IEnumerable operations) + { + foreach (var op in operations) + { + switch (op) + { + case IDaprTStringTransitionOperation genToStrOp when genericToStringOp is not null: + throw new DaprException( + $"Multiple types are declared for the conversion of the data to a string in the data pipeline for {typeof(TInput)} - only one is allowed"); + case IDaprTStringTransitionOperation genToStrOp: + genericToStringOp = genToStrOp; + break; + case IDaprStringBasedOperation strOp: + stringOps.Add(strOp); + break; + case IDaprStringByteTransitionOperation strToByte when stringToByteOp is not null: + throw new DaprException( + $"Multiple types are declared for the pipeline conversion from a string to a byte array for {typeof(TInput)} - only one is allowed"); + case IDaprStringByteTransitionOperation strToByte: + stringToByteOp = strToByte; + break; + case IDaprByteBasedOperation byteOp: + byteOps.Add(byteOp); + break; + } + } + + if (genericToStringOp is null) + { + throw new DaprException( + $"A pipeline operation must be specified to convert a {typeof(TInput)} into a serializable string"); + } + + if (stringToByteOp is null) + { + throw new DaprException( + $"A pipeline operation must be specified to convert a {typeof(TInput)} into a byte array"); + } + } + + /// + /// Processes the data in the order of the provided list of . + /// + /// The data to evaluate. + /// Cancellation token. + /// The evaluated data. + public async Task>> ProcessAsync(TInput input, CancellationToken cancellationToken = default) + { + //Combines the metadata across each operation to be returned with the result + var combinedMetadata = new Dictionary(); + + //Start by serializing the input to a string + var serializationPayload = await genericToStringOp!.ExecuteAsync(input, cancellationToken); + combinedMetadata.MergeFrom(serializationPayload.Metadata, RegisterOperationInvocation(genericToStringOp.Name)); + + //Run through any provided string-based operations + var stringPayload = new DaprOperationPayload(serializationPayload.Payload); + foreach (var strOp in stringOps) + { + stringPayload = await strOp.ExecuteAsync(stringPayload.Payload, cancellationToken); + combinedMetadata.MergeFrom(stringPayload.Metadata, RegisterOperationInvocation(strOp.Name)); + } + + //Encode the string payload to a byte array + var encodedPayload = await stringToByteOp!.ExecuteAsync(stringPayload.Payload, cancellationToken); + combinedMetadata.MergeFrom(encodedPayload.Metadata, RegisterOperationInvocation(stringToByteOp.Name)); + + //Run through any provided byte-based operations + var bytePayload = new DaprOperationPayload>(encodedPayload.Payload); + foreach (var byteOp in byteOps) + { + bytePayload = await byteOp.ExecuteAsync(bytePayload.Payload, cancellationToken); + combinedMetadata.MergeFrom(bytePayload.Metadata, RegisterOperationInvocation(byteOp.Name)); + } + + //Persist the op names to the metadata + combinedMetadata[OperationKey] = string.Join(',', operationNames); + + //Create a payload that combines the payload and metadata + var resultPayload = new DaprOperationPayload>(bytePayload.Payload) + { + Metadata = combinedMetadata + }; + return resultPayload; + } + + /// + /// Gets the formatted operation name with its zero-based invocation count. + /// + /// The name of the operation. + /// A string value containing the operation name and its zero-based invocation count. + private string RegisterOperationInvocation(string operationName) + { + //Add to the operation names + var result = $"{operationName}[{GetAndAddOperationInvocation(operationName)}]"; + operationNames.Add(result); + + //Return to be used in the metadata key + return result; + } + + /// + /// Registers another operation invocation. + /// + /// The name of the operation. + /// The zero-based count of the operational invocation. + private int GetAndAddOperationInvocation(string operationName) + { + if (!operationInvocations.TryGetValue(operationName, out var invocationCount)) + operationInvocations[operationName] = 1; + else + operationInvocations[operationName]++; + + return invocationCount; + } +} diff --git a/src/Dapr.Common/Data/DataPipelineFactory.cs b/src/Dapr.Common/Data/DataPipelineFactory.cs new file mode 100644 index 000000000..662a0963c --- /dev/null +++ b/src/Dapr.Common/Data/DataPipelineFactory.cs @@ -0,0 +1,113 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Reflection; +using System.Text.RegularExpressions; +using Dapr.Common.Data.Attributes; +using Dapr.Common.Data.Operations; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.Common.Data; + +/// +/// Used to create a data pipeline specific to a given type using the ordered operation services indicated in the +/// attribute on that type. +/// +internal sealed class DataPipelineFactory +{ + /// + /// The service provider used to pull the registered data operation services. + /// + private readonly IServiceProvider serviceProvider; + + /// + /// Used to instantiate a . + /// + public DataPipelineFactory(IServiceProvider serviceProvider) + { + this.serviceProvider = serviceProvider; + } + + /// + /// Creates a pipeline used to serialize a given type. + /// + /// The type to create the pipeline for. + /// + public DaprEncoderPipeline CreateEncodingPipeline() + { + var attribute = typeof(T).GetCustomAttribute(); + if (attribute == null) + { + return new DaprEncoderPipeline(new List>()); + } + + var allRegisteredOperations = serviceProvider.GetServices().ToList(); + var operations = attribute.DataOperationTypes + .SelectMany(type => allRegisteredOperations.Where(op => op.GetType() == type)) + .ToList(); + + return new DaprEncoderPipeline(operations); + } + + /// + /// Creates a pipeline used to reverse a previously applied pipeline operation using the provided + /// operation names from the metadata. + /// + /// The metadata payload used to determine the order of operations. + /// The type to deserialize to. + /// A pipeline configured for reverse processing. + /// + public DaprDecoderPipeline CreateDecodingPipeline(Dictionary metadata) + { + const string operationKey = "ops"; + if (!metadata.TryGetValue(operationKey, out var opNames)) + { + throw new DaprException( + $"Unable to decode payload as its metadata is missing the key (\"${operationKey}\") containing the operation order"); + } + + //Run through the names backwards in the order of the operations as named in the metadata + var operations = new List(opNames.Split(',').Reverse()).ToList(); + + var services = serviceProvider.GetServices().ToList(); + var metadataPrefixes = new Stack(); + var operationalServices = new List(); + + for (var a = 0; a < operations.Count; a++) + { + var operation = operations[a]; + var plainName = Regex.Replace(operation, @"\[\d+\]$", string.Empty); + + var matchingService = services.FirstOrDefault(op => string.Equals(op.Name, plainName)); + if (matchingService is null) + throw new DaprException($"Unable to locate service matching {plainName} in service registry"); + + operationalServices.Add(matchingService); + metadataPrefixes.Push(operation); + } + + if (operationalServices.Count != operations.Count) + { + //Identify which names are missing + foreach (var op in operationalServices) + { + operations.Remove(op.Name); + } + + throw new DaprException( + $"Registered services were not located for the following operation names present in the metadata: {String.Join(',', operations)}"); + } + + return new DaprDecoderPipeline(operationalServices, metadataPrefixes); + } +} diff --git a/src/Dapr.Common/Data/Extensions/DaprDataPipelineBuilder.cs b/src/Dapr.Common/Data/Extensions/DaprDataPipelineBuilder.cs new file mode 100644 index 000000000..c310910eb --- /dev/null +++ b/src/Dapr.Common/Data/Extensions/DaprDataPipelineBuilder.cs @@ -0,0 +1,35 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.Common.Data.Extensions; + +/// +/// Used by the fluent registration builder to configure a Dapr data pipeline. +/// +public sealed class DaprDataPipelineBuilder : IDaprDataPipelineBuilder +{ + /// + /// The registered services on the builder. + /// + public IServiceCollection Services { get; } + + /// + /// Used to initialize a new . + /// + public DaprDataPipelineBuilder(IServiceCollection services) + { + Services = services; + } +} diff --git a/src/Dapr.Common/Data/Extensions/DaprDataPipelineRegistrationBuilderExtensions.cs b/src/Dapr.Common/Data/Extensions/DaprDataPipelineRegistrationBuilderExtensions.cs new file mode 100644 index 000000000..f42d60fa1 --- /dev/null +++ b/src/Dapr.Common/Data/Extensions/DaprDataPipelineRegistrationBuilderExtensions.cs @@ -0,0 +1,168 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common.Data.Operations; +using Dapr.Common.Data.Operations.Providers.Compression; +using Dapr.Common.Data.Operations.Providers.Encoding; +using Dapr.Common.Data.Operations.Providers.Integrity; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Dapr.Common.Data.Extensions; + +/// +/// Contains the dependency injection registration extension for the Dapr data pipeline operations. +/// +public static class DaprDataPipelineRegistrationBuilderExtensions +{ + /// + /// Registers a Dapr data processing pipeline. + /// + /// + /// + public static IDaprDataPipelineBuilder AddDaprDataProcessingPipeline(this IServiceCollection services) + { + services.AddSingleton(); + return new DaprDataPipelineBuilder(services); + } + + /// + /// Adds a serializer data operation. + /// + public static IDaprDataPipelineBuilder WithSerializer(this IDaprDataPipelineBuilder builder, + ServiceLifetime lifetime = ServiceLifetime.Singleton) + where TService : class, IDaprDataOperation => + builder.WithDaprOperation(lifetime); + + /// + /// Adds a serializer data operation. + /// + public static IDaprDataPipelineBuilder WithSerializer(this IDaprDataPipelineBuilder builder, + Func> serializerFactory, + ServiceLifetime lifetime = ServiceLifetime.Singleton) => + builder.WithDaprOperation, TInput, string>(serializerFactory, lifetime); + + /// + /// Adds a compression data operation. + /// + public static IDaprDataPipelineBuilder WithCompressor(this IDaprDataPipelineBuilder builder, + ServiceLifetime lifetime = ServiceLifetime.Singleton) + where TService : class, IDaprDataCompressor => + builder.WithDaprOperation(lifetime); + + /// + /// Adds a compressor data operation. + /// + public static IDaprDataPipelineBuilder WithCompressor(this IDaprDataPipelineBuilder builder, + Func compressorFactory, + ServiceLifetime lifetime = ServiceLifetime.Singleton) => + builder.WithDaprOperation, ReadOnlyMemory>(compressorFactory, + lifetime); + + /// + /// Adds a data integrity operation. + /// + public static IDaprDataPipelineBuilder WithIntegrity(this IDaprDataPipelineBuilder builder, + ServiceLifetime serviceLifetime = ServiceLifetime.Singleton) + where TService : class, IDaprDataValidator + => builder.WithDaprOperation(serviceLifetime); + + /// + /// Adds a data integrity operation using a factory that provides an . + /// + public static IDaprDataPipelineBuilder WithIntegrity(this IDaprDataPipelineBuilder builder, + Func validatorFactory, + ServiceLifetime serviceLifetime = ServiceLifetime.Singleton) => + builder.WithDaprOperation, ReadOnlyMemory>(validatorFactory, + serviceLifetime); + + /// + /// Adds an encoder operation. + /// + public static IDaprDataPipelineBuilder WithEncoder(this IDaprDataPipelineBuilder builder, + ServiceLifetime serviceLifetime = ServiceLifetime.Singleton) + where TService : class, IDaprDataEncoder + => builder.WithDaprOperation(serviceLifetime); + + /// + /// Adds an encoder operation using a factory that provides an . + /// + public static IDaprDataPipelineBuilder WithEncoder(this IDaprDataPipelineBuilder builder, + Func encoderFactory, + ServiceLifetime serviceLifetime = ServiceLifetime.Singleton) => + builder.WithDaprOperation>(encoderFactory, + serviceLifetime); + + /// + /// Registers the specified Dapr operation services. + /// + /// The builder to register the type on. + /// The lifetime the service should be registered for. + /// + /// Thrown when an invalid service lifetime is provided. + private static IDaprDataPipelineBuilder WithDaprOperation(this IDaprDataPipelineBuilder builder, + ServiceLifetime lifetime) + where TService : class, IDaprDataOperation + { + switch (lifetime) + { + case ServiceLifetime.Singleton: + builder.Services.TryAddEnumerable(ServiceDescriptor.Singleton()); + break; + case ServiceLifetime.Scoped: + builder.Services.TryAddEnumerable(ServiceDescriptor.Scoped()); + break; + case ServiceLifetime.Transient: + builder.Services.TryAddEnumerable(ServiceDescriptor.Transient()); + break; + default: + throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, null); + } + + return builder; + } + + /// + /// Registers the specified + /// + /// The builder to register the type on. + /// The data operation factory used to register the data operation service. + /// The lifetime the service should be registered for. + /// The type of service being registered. + /// The input type provided to the operation. + /// The output type provided by the operation. + /// + /// Thrown when an invalid service lifetime is provided. + private static IDaprDataPipelineBuilder WithDaprOperation(this IDaprDataPipelineBuilder builder, + Func operationFactory, ServiceLifetime lifetime) + where TService : class, IDaprDataOperation + { + switch (lifetime) + { + case ServiceLifetime.Singleton: + builder.Services.TryAddEnumerable(ServiceDescriptor.Singleton(operationFactory)); + break; + case ServiceLifetime.Scoped: + builder.Services.TryAddEnumerable(ServiceDescriptor.Scoped(operationFactory)); + break; + case ServiceLifetime.Transient: + builder.Services.TryAddEnumerable(ServiceDescriptor.Transient(operationFactory)); + break; + default: + throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, null); + } + + return builder; + } +} diff --git a/src/Dapr.Common/Data/Extensions/DictionaryExtensions.cs b/src/Dapr.Common/Data/Extensions/DictionaryExtensions.cs new file mode 100644 index 000000000..090daf22f --- /dev/null +++ b/src/Dapr.Common/Data/Extensions/DictionaryExtensions.cs @@ -0,0 +1,38 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Extensions; + +/// +/// Provides extension methods for use with a . +/// +internal static class DictionaryExtensions +{ + /// + /// Merges the keys and values of the provided dictionary in mergeFrom with the + /// dictionary provided in mergeTo. + /// + /// The dictionary the values are being merged into. + /// The dictionary the values are being merged from. + /// The prefix to prepend to the key of the merged values. + /// The type of the value for either dictionary. + internal static void MergeFrom(this Dictionary mergeTo, + Dictionary mergeFrom, string prefix) + { + foreach (var kvp in mergeFrom) + { + var newKey = $"{prefix}{kvp.Key}"; + mergeTo[newKey] = kvp.Value; + } + } +} diff --git a/src/Dapr.Common/Data/Extensions/IDaprDataPipelineBuilder.cs b/src/Dapr.Common/Data/Extensions/IDaprDataPipelineBuilder.cs new file mode 100644 index 000000000..7d044594b --- /dev/null +++ b/src/Dapr.Common/Data/Extensions/IDaprDataPipelineBuilder.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Extensions; + +/// +/// Used to build out registration functionality for a Dapr data pipeline +/// +public interface IDaprDataPipelineBuilder : IDaprServiceBuilder +{ +} diff --git a/src/Dapr.Common/Data/Extensions/IDaprDataProcessingBuilder.cs b/src/Dapr.Common/Data/Extensions/IDaprDataProcessingBuilder.cs new file mode 100644 index 000000000..62432542f --- /dev/null +++ b/src/Dapr.Common/Data/Extensions/IDaprDataProcessingBuilder.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Extensions; + +/// +/// Provides a root builder for the Dapr processing functionality facilitating a more fluent-style registration. +/// +public interface IDaprDataProcessingBuilder : IDaprServiceBuilder +{ +} diff --git a/src/Dapr.Common/Data/Extensions/IDaprServiceBuilder.cs b/src/Dapr.Common/Data/Extensions/IDaprServiceBuilder.cs new file mode 100644 index 000000000..481e91b2b --- /dev/null +++ b/src/Dapr.Common/Data/Extensions/IDaprServiceBuilder.cs @@ -0,0 +1,27 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.Common.Data.Extensions; + +/// +/// Responsible for registering Dapr service functionality. +/// +public interface IDaprServiceBuilder +{ + /// + /// The registered services on the builder. + /// + public IServiceCollection Services { get; } +} diff --git a/src/Dapr.Common/Data/Operations/DaprOperationPayload.cs b/src/Dapr.Common/Data/Operations/DaprOperationPayload.cs new file mode 100644 index 000000000..93ca62228 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/DaprOperationPayload.cs @@ -0,0 +1,40 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations; + +/// +/// Contains the result of a Dapr data operation. +/// +/// The type of the payload. +public record DaprOperationPayload +{ + /// + /// Initializes a . + /// + /// The resulting payload following the operation. + public DaprOperationPayload(T payload) + { + Payload = payload; + } + + /// + /// The result of the operation. + /// + public T Payload { get; init; } + + /// + /// The metadata produced by the operation. + /// + public Dictionary Metadata { get; init; } = new(); +} diff --git a/src/Dapr.Common/Data/Operations/IDaprByteBasedOperation.cs b/src/Dapr.Common/Data/Operations/IDaprByteBasedOperation.cs new file mode 100644 index 000000000..feaf60372 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/IDaprByteBasedOperation.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations; + +/// +/// Represents a Dapr pipeline operation performed against byte values. +/// +public interface IDaprByteBasedOperation : IDaprDataOperation, ReadOnlyMemory> +{ +} diff --git a/src/Dapr.Common/Data/Operations/IDaprDataOperation.cs b/src/Dapr.Common/Data/Operations/IDaprDataOperation.cs new file mode 100644 index 000000000..337d9be9b --- /dev/null +++ b/src/Dapr.Common/Data/Operations/IDaprDataOperation.cs @@ -0,0 +1,51 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations; + +/// +/// Represents a data operation. +/// +public interface IDaprDataOperation +{ + /// + /// The name of the operation. + /// + string Name { get; } +} + +/// +/// Represents a data operation with generic input and output types. +/// +/// The type of the input data. +/// The type of the output data. +public interface IDaprDataOperation : IDaprDataOperation +{ + /// + /// Executes the data processing operation. + /// + /// The input data. + /// Cancellation token. + /// The output data and metadata for the operation. + Task> ExecuteAsync(TInput? input, CancellationToken cancellationToken = default); + + /// + /// Reverses the data operation. + /// + /// The processed input data being reversed. + /// The prefix value of the keys containing the operation metadata. + /// Cancellation token. + /// The reversed output data and metadata for the operation. + Task> ReverseAsync(DaprOperationPayload input, string metadataPrefix, + CancellationToken cancellationToken = default); +} diff --git a/src/Dapr.Common/Data/Operations/IDaprStringBasedOperation.cs b/src/Dapr.Common/Data/Operations/IDaprStringBasedOperation.cs new file mode 100644 index 000000000..29be721e2 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/IDaprStringBasedOperation.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations; + +/// +/// Represents a Dapr pipeline operation performed against string values. +/// +public interface IDaprStringBasedOperation : IDaprDataOperation +{ +} diff --git a/src/Dapr.Common/Data/Operations/IDaprStringByteTransitionOperation.cs b/src/Dapr.Common/Data/Operations/IDaprStringByteTransitionOperation.cs new file mode 100644 index 000000000..31a346216 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/IDaprStringByteTransitionOperation.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations; + +/// +/// Represents a Dapr pipeline operation performed against string values that output as byte arrays. +/// +public interface IDaprStringByteTransitionOperation : IDaprDataOperation> +{ +} diff --git a/src/Dapr.Common/Data/Operations/IDaprTStringTransitionOperation.cs b/src/Dapr.Common/Data/Operations/IDaprTStringTransitionOperation.cs new file mode 100644 index 000000000..fb0c73848 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/IDaprTStringTransitionOperation.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations; + +/// +/// Identifies an operation that provides a transition from a generic type to a string. +/// +public interface IDaprTStringTransitionOperation : IDaprDataOperation +{ +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Compression/GzipCompressor.cs b/src/Dapr.Common/Data/Operations/Providers/Compression/GzipCompressor.cs new file mode 100644 index 000000000..bb5573765 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Compression/GzipCompressor.cs @@ -0,0 +1,59 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.IO.Compression; + +namespace Dapr.Common.Data.Operations.Providers.Compression; + +/// +public sealed class GzipCompressor : IDaprDataCompressor +{ + /// + /// The name of the operation. + /// + public string Name => "Dapr.Compression.Gzip"; + + /// + /// Executes the data processing operation. + /// + /// The input data. + /// Cancellation token. + /// The output data and metadata for the operation. + public async Task>> ExecuteAsync(ReadOnlyMemory input, CancellationToken cancellationToken = default) + { + using var outputStream = new MemoryStream(); + await using (var gzipStream = new GZipStream(outputStream, CompressionMode.Compress)) + { + await gzipStream.WriteAsync(input, cancellationToken); + } + + //Replace the existing payload with the compressed payload + return new DaprOperationPayload>(outputStream.ToArray()); + } + + /// + /// Reverses the data operation. + /// + /// The processed input data being reversed. + /// The prefix value of the keys containing the operation metadata. + /// Cancellation token. + /// The reversed output data and metadata for the operation. + public async Task>> ReverseAsync(DaprOperationPayload> input, string metadataPrefix, CancellationToken cancellationToken) + { + using var inputStream = new MemoryStream(input.Payload.ToArray()); + await using var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress); + using var outputStream = new MemoryStream(); + await gzipStream.CopyToAsync(outputStream, cancellationToken); + return new DaprOperationPayload>(outputStream.ToArray()); + } +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Compression/IDaprDataCompressor.cs b/src/Dapr.Common/Data/Operations/Providers/Compression/IDaprDataCompressor.cs new file mode 100644 index 000000000..bf1678e15 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Compression/IDaprDataCompressor.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations.Providers.Compression; + +/// +/// Identifies an operation that provides a data compression capability. +/// +public interface IDaprDataCompressor : IDaprByteBasedOperation +{ +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Encoding/IDaprDataEncoder.cs b/src/Dapr.Common/Data/Operations/Providers/Encoding/IDaprDataEncoder.cs new file mode 100644 index 000000000..f39fa6767 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Encoding/IDaprDataEncoder.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations.Providers.Encoding; + +/// +/// Identifies an operation that encodes strings into byte arrays. +/// +public interface IDaprDataEncoder : IDaprStringByteTransitionOperation +{ +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Encoding/Utf8Encoder.cs b/src/Dapr.Common/Data/Operations/Providers/Encoding/Utf8Encoder.cs new file mode 100644 index 000000000..fd973306f --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Encoding/Utf8Encoder.cs @@ -0,0 +1,54 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations.Providers.Encoding; + +/// +/// Responsible for encoding a string to a byte array. +/// +public sealed class Utf8Encoder : IDaprDataEncoder +{ + /// + /// The name of the operation. + /// + public string Name => "Dapr.Encoding.Utf8"; + + /// + /// Executes the data processing operation. + /// + /// The input data. + /// Cancellation token. + /// The output data and metadata for the operation. + public Task>> ExecuteAsync(string? input, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(input, nameof(input)); + + var bytes = System.Text.Encoding.UTF8.GetBytes(input); + var result = new DaprOperationPayload>(bytes); + return Task.FromResult(result); + } + + /// + /// Reverses the data operation. + /// + /// The processed input data being reversed. + /// The prefix value of the keys containing the operation metadata. + /// Cancellation token. + /// The reversed output data and metadata for the operation. + public Task> ReverseAsync(DaprOperationPayload> input, string metadataPrefix, CancellationToken cancellationToken = default) + { + var strValue = System.Text.Encoding.UTF8.GetString(input.Payload.Span); + var result = new DaprOperationPayload(strValue); + return Task.FromResult(result); + } +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Integrity/IDaprDataValidator.cs b/src/Dapr.Common/Data/Operations/Providers/Integrity/IDaprDataValidator.cs new file mode 100644 index 000000000..d455643b2 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Integrity/IDaprDataValidator.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations.Providers.Integrity; + +/// +/// Identifies an operation that provides data integrity validation. +/// +public interface IDaprDataValidator : IDaprByteBasedOperation +{ +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Integrity/Sha256Validator.cs b/src/Dapr.Common/Data/Operations/Providers/Integrity/Sha256Validator.cs new file mode 100644 index 000000000..cd0f39b17 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Integrity/Sha256Validator.cs @@ -0,0 +1,91 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Security.Cryptography; + +namespace Dapr.Common.Data.Operations.Providers.Integrity; + +/// +/// Provides a data integrity validation service using an SHA256 hash. +/// +public class Sha256Validator : IDaprDataValidator +{ + /// + /// The name of the operation. + /// + public string Name => "Dapr.Integrity.Sha256"; + + /// + /// The key containing the hash value in the metadata. + /// + private const string HashKey = "hash"; + + /// + /// Executes the data processing operation. + /// + /// The input data. + /// Cancellation token. + /// The output data and metadata for the operation. + public async Task>> ExecuteAsync(ReadOnlyMemory input, CancellationToken cancellationToken = default) + { + var checksum = await CalculateChecksumAsync(input, cancellationToken); + var result = new DaprOperationPayload>(input); + result.Metadata.Add(HashKey, checksum); + return result; + } + + /// + /// Reverses the data operation. + /// + /// The processed input data being reversed. + /// The prefix value of the keys containing the operation metadata. + /// Cancellation token. + /// The reversed output data and metadata for the operation. + public async Task>> ReverseAsync(DaprOperationPayload> input, string metadataPrefix, CancellationToken cancellationToken) + { + var checksumKey = GetChecksumKey(metadataPrefix); + if (input.Metadata.TryGetValue(checksumKey, out var checksum)) + { + var newChecksum = await CalculateChecksumAsync(input.Payload, cancellationToken); + if (!string.Equals(checksum, newChecksum)) + { + throw new DaprException("Data integrity check failed. Checksums do not match."); + } + } + + //If there's no checksum metadata of if it matches, just continue with the next operation + return new DaprOperationPayload>(input.Payload); + } + + /// + /// Creates the SHA256 representing the checksum on the value. + /// + /// The data to create the hash from. + /// Cancellation token. + /// A task containing the base64 hash value. + private async static Task CalculateChecksumAsync(ReadOnlyMemory data, CancellationToken cancellationToken) + { + using var sha256 = SHA256.Create(); + await using var memoryStream = new MemoryStream(data.Length); + await memoryStream.WriteAsync(data, cancellationToken); + memoryStream.Position = 0; + var hash = await sha256.ComputeHashAsync(memoryStream, cancellationToken); + return Convert.ToBase64String(hash); + } + + /// + /// Get the key used to store the hash in the metadata. + /// + /// The key value. + private static string GetChecksumKey(string keyPrefix) => $"{keyPrefix}{HashKey}"; +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Serialization/IDaprDataSerializer.cs b/src/Dapr.Common/Data/Operations/Providers/Serialization/IDaprDataSerializer.cs new file mode 100644 index 000000000..2f3490fc0 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Serialization/IDaprDataSerializer.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Common.Data.Operations.Providers.Serialization; + +/// +/// Identifies an operation that provides a data serialization capability. +/// +public interface IDaprDataSerializer : IDaprTStringTransitionOperation +{ +} diff --git a/src/Dapr.Common/Data/Operations/Providers/Serialization/SystemTextJsonSerializer.cs b/src/Dapr.Common/Data/Operations/Providers/Serialization/SystemTextJsonSerializer.cs new file mode 100644 index 000000000..02ae6d650 --- /dev/null +++ b/src/Dapr.Common/Data/Operations/Providers/Serialization/SystemTextJsonSerializer.cs @@ -0,0 +1,72 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json; + +namespace Dapr.Common.Data.Operations.Providers.Serialization; + +/// +/// Provides serialization capabilities using System.Text.Json. +/// +public sealed class SystemTextJsonSerializer : IDaprDataSerializer +{ + /// + /// Optionally provided . + /// + private JsonSerializerOptions? options = new (JsonSerializerDefaults.Web); + + /// + /// The name of the operation. + /// + public string Name => "Dapr.Serialization.SystemTextJson"; + + /// + /// Executes the data processing operation. + /// + /// The input data. + /// Cancellation token. + /// The output data and metadata for the operation. + public Task> ExecuteAsync(T? input, CancellationToken cancellationToken = default) + { + var jsonResult = JsonSerializer.Serialize(input, options); + var result = new DaprOperationPayload(jsonResult); + + return Task.FromResult(result); + } + + /// + /// Reverses the data operation. + /// + /// The processed input data being reversed. + /// The prefix value of the keys containing the operation metadata. + /// Cancellation token. + /// The reversed output data and metadata for the operation. + public Task> ReverseAsync(DaprOperationPayload input, string metadataPrefix, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(input.Payload, nameof(input)); + + var value = JsonSerializer.Deserialize(input.Payload, options); + var result = new DaprOperationPayload(value); + return Task.FromResult(result); + } + + /// + /// Used to provide a to the operation. + /// + /// The configuration options to use. + public void UseOptions(JsonSerializerOptions jsonSerializerOptions) + { + this.options = jsonSerializerOptions; + } +} diff --git a/test/Dapr.Client.Test/BulkPublishEventApiTest.cs b/test/Dapr.Client.Test/BulkPublishEventApiTest.cs index 14b946d66..0ecde0f12 100644 --- a/test/Dapr.Client.Test/BulkPublishEventApiTest.cs +++ b/test/Dapr.Client.Test/BulkPublishEventApiTest.cs @@ -11,6 +11,8 @@ // limitations under the License. // ------------------------------------------------------------------------ +using Dapr.Common; + namespace Dapr.Client.Test { using System; diff --git a/test/Dapr.Common.Test/DaprDefaultTest.cs b/test/Dapr.Common.Test/DaprDefaultTest.cs index ef4d0da3c..42e4318ba 100644 --- a/test/Dapr.Common.Test/DaprDefaultTest.cs +++ b/test/Dapr.Common.Test/DaprDefaultTest.cs @@ -1,4 +1,17 @@ -using System; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; using Microsoft.Extensions.Configuration; using Xunit; diff --git a/test/Dapr.Common.Test/Data/Attributes/DataPipelineAttributeTests.cs b/test/Dapr.Common.Test/Data/Attributes/DataPipelineAttributeTests.cs new file mode 100644 index 000000000..0b7bd130f --- /dev/null +++ b/test/Dapr.Common.Test/Data/Attributes/DataPipelineAttributeTests.cs @@ -0,0 +1,50 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Unicode; +using Dapr.Common.Data.Attributes; +using Dapr.Common.Data.Operations.Providers.Compression; +using Dapr.Common.Data.Operations.Providers.Encoding; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Xunit; + +namespace Dapr.Common.Test.Data.Attributes; + +public class DataPipelineAttributeTests +{ + [Fact] + public void DataOperationAttribute_ShouldThrowExceptionForInvalidTypes() + { + // Arrange & Act & Assert + Assert.Throws(() => new DataPipelineAttribute(typeof(InvalidOperation))); + } + + [Fact] + public void DataOperationAttribute_ShouldRegisterValidTypes() + { + // Arrange & Act + var attribute = new DataPipelineAttribute(typeof(GzipCompressor), typeof(Utf8Encoder), typeof(SystemTextJsonSerializer)); + + // Assert + Assert.Equal(3, attribute.DataOperationTypes.Count); + Assert.Contains(typeof(GzipCompressor), attribute.DataOperationTypes); + Assert.Contains(typeof(Utf8Encoder), attribute.DataOperationTypes); + Assert.Contains(typeof(SystemTextJsonSerializer), attribute.DataOperationTypes); + } + + private sealed class InvalidOperation + { + } + + private sealed record MyRecord(string Name); +} diff --git a/test/Dapr.Common.Test/Data/DaprDecoderPipelineTests.cs b/test/Dapr.Common.Test/Data/DaprDecoderPipelineTests.cs new file mode 100644 index 000000000..6c0b391fc --- /dev/null +++ b/test/Dapr.Common.Test/Data/DaprDecoderPipelineTests.cs @@ -0,0 +1,196 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.Unicode; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Common.Data; +using Dapr.Common.Data.Attributes; +using Dapr.Common.Data.Extensions; +using Dapr.Common.Data.Operations; +using Dapr.Common.Data.Operations.Providers.Compression; +using Dapr.Common.Data.Operations.Providers.Encoding; +using Dapr.Common.Data.Operations.Providers.Integrity; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Dapr.Common.Test.Data; + +public class DaprDecoderPipelineTests +{ + [Fact] + public async Task ReverseAsync_ShouldReverseOperationsInMetadataOrder() + { + // Arrange + var operations = new List + { + new GzipCompressor(), + new SystemTextJsonSerializer(), + new Utf8Encoder(), + new Sha256Validator() + }; + var opNames = new Stack(); + opNames.Push("Dapr.Serialization.SystemTextJson[0]"); + opNames.Push("Dapr.Encoding.Utf8[0]"); + opNames.Push("Dapr.Compression.Gzip[0]"); + opNames.Push("Dapr.Integrity.Sha256[0]"); + + var pipeline = new DaprDecoderPipeline(operations, opNames); + + // Act + var payload = Convert.FromBase64String("H4sIAAAAAAAACqtWykvMTVWyUgpOzC3ISVXSUSpLzCkFChia1gIAotvhPBwAAAA="); + var metadata = new Dictionary + { + { "Dapr.Integrity.Sha256-hash", "x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=" }, + { + "ops", + "Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]" + } + }; + var result = await pipeline.ReverseProcessAsync(payload, metadata); + + Assert.Equal("Sample", result.Payload.Name); + Assert.Equal(15, result.Payload.Value); + } + + [Fact] + public async Task EndToEndTest() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithSerializer>() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + var encoderPipeline = factory.CreateEncodingPipeline(); + var record = new SimpleRegistration("This is merely a test!"); + + // Act + var encodedPayload = await encoderPipeline.ProcessAsync(record, CancellationToken.None); + + // Assert + Assert.NotNull(encodedPayload); + Assert.True(encodedPayload.Payload.Length > 0); + Assert.Equal("H4sIAAAAAAAACqtWykvMTVWyUgrJyCxWAKLc1KLUnEqFRIWS1OISRaVaAF3KYX0hAAAA", + Convert.ToBase64String(encodedPayload.Payload.Span)); + Assert.NotNull(encodedPayload.Metadata); + Assert.True(encodedPayload.Metadata.ContainsKey("ops")); + Assert.Equal("Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]", encodedPayload.Metadata["ops"]); + Assert.True(encodedPayload.Metadata.ContainsKey("Dapr.Integrity.Sha256[0]hash")); + Assert.Equal("Ehr18bGgwtfe/uq8MbfnIQkbsUYOAHt7xWNAecRo2DI=", encodedPayload.Metadata["Dapr.Integrity.Sha256[0]hash"]); + + // Act #2 + var decoderPipeline = factory.CreateDecodingPipeline(encodedPayload.Metadata); + var decodedPayload = await decoderPipeline.ReverseProcessAsync(encodedPayload.Payload, encodedPayload.Metadata); + + // Assert #2 + Assert.Equal(record, decodedPayload.Payload); + } + + [Fact] + public async Task EndToEndTest_ShouldFailValidationWithBadHashValue() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithSerializer>() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + var encoderPipeline = factory.CreateEncodingPipeline(); + var record = new SimpleRegistration("This is merely a test!"); + + // Act + var encodedPayload = await encoderPipeline.ProcessAsync(record, CancellationToken.None); + + // Assert + Assert.NotNull(encodedPayload); + Assert.True(encodedPayload.Payload.Length > 0); + Assert.Equal("H4sIAAAAAAAACqtWykvMTVWyUgrJyCxWAKLc1KLUnEqFRIWS1OISRaVaAF3KYX0hAAAA", + Convert.ToBase64String(encodedPayload.Payload.Span)); + Assert.NotNull(encodedPayload.Metadata); + Assert.True(encodedPayload.Metadata.ContainsKey("ops")); + Assert.Equal("Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]", encodedPayload.Metadata["ops"]); + Assert.True(encodedPayload.Metadata.ContainsKey("Dapr.Integrity.Sha256[0]hash")); + Assert.Equal("Ehr18bGgwtfe/uq8MbfnIQkbsUYOAHt7xWNAecRo2DI=", encodedPayload.Metadata["Dapr.Integrity.Sha256[0]hash"]); + + encodedPayload.Metadata["Dapr.Integrity.Sha256[0]hash"] = "abc123"; + + // Act & Assert #2 + var decoderPipeline = factory.CreateDecodingPipeline(encodedPayload.Metadata); + await Assert.ThrowsAsync(async () => + await decoderPipeline.ReverseProcessAsync(encodedPayload.Payload, encodedPayload.Metadata)); + } + + [Fact] + public async Task EndToEndWithDuplicateOperations() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithSerializer>() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + var encoderPipeline = factory.CreateEncodingPipeline(); + var record = new DuplicateRegistration("Don't worry - this is only a test!"); + + // Act + var encodedPayload = await encoderPipeline.ProcessAsync(record, CancellationToken.None); + + // Assert + Assert.NotNull(encodedPayload); + Assert.True(encodedPayload.Payload.Length > 0); + Assert.Equal("H4sIAAAAAAAACpPv5mAAA67VYae8z/iGbgoqOnm+W9PUwMBIP1DjvL7WqpALoRonT+iEMSz6s2eOV6tL66Qrj4Rcl0YxiFw4c8oIqBUAdhx5/UQAAAA=", + Convert.ToBase64String(encodedPayload.Payload.Span)); + Assert.NotNull(encodedPayload.Metadata); + Assert.True(encodedPayload.Metadata.ContainsKey("ops")); + Assert.Equal("Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0],Dapr.Compression.Gzip[1],Dapr.Integrity.Sha256[1]", encodedPayload.Metadata["ops"]); + Assert.True(encodedPayload.Metadata.ContainsKey("Dapr.Integrity.Sha256[0]hash")); + Assert.Equal("9+H+ngzx1fru8VdywlpoT0E20JqBXm1k81Un/o7z0ZM=", encodedPayload.Metadata["Dapr.Integrity.Sha256[0]hash"]); + Assert.True(encodedPayload.Metadata.ContainsKey("Dapr.Integrity.Sha256[1]hash")); + Assert.Equal("r9EkN6xWpuB9saAWGy92aGvU0T8dkLt2Kur5/ItSf2s=", encodedPayload.Metadata["Dapr.Integrity.Sha256[1]hash"]); + + // Act #2 + var decoderPipeline = factory.CreateDecodingPipeline(encodedPayload.Metadata); + var decodedPayload = await decoderPipeline.ReverseProcessAsync(encodedPayload.Payload, encodedPayload.Metadata); + + // Assert #2 + Assert.Equal(record, decodedPayload.Payload); + } + + private record SampleRecord(string Name, int Value); + + [DataPipeline(typeof(GzipCompressor), typeof(SystemTextJsonSerializer), typeof(Utf8Encoder), typeof(Sha256Validator))] + private record SimpleRegistration(string Name); + + [DataPipeline(typeof(SystemTextJsonSerializer), typeof(GzipCompressor), typeof(Utf8Encoder), typeof(Sha256Validator), typeof(GzipCompressor), typeof(Sha256Validator))] + private record DuplicateRegistration(string Name); +} diff --git a/test/Dapr.Common.Test/Data/DaprEncoderPipelineTests.cs b/test/Dapr.Common.Test/Data/DaprEncoderPipelineTests.cs new file mode 100644 index 000000000..a563d65d9 --- /dev/null +++ b/test/Dapr.Common.Test/Data/DaprEncoderPipelineTests.cs @@ -0,0 +1,100 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Dapr.Common.Data; +using Dapr.Common.Data.Attributes; +using Dapr.Common.Data.Operations; +using Dapr.Common.Data.Operations.Providers.Compression; +using Dapr.Common.Data.Operations.Providers.Encoding; +using Dapr.Common.Data.Operations.Providers.Integrity; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Xunit; + +namespace Dapr.Common.Test.Data; + +public class DaprEncoderPipelineTests +{ + [Fact] + public async Task ProcessAsync_ShouldProcessBasicOperations() + { + // Arrange + var operations = new List + { + new SystemTextJsonSerializer(), + new Utf8Encoder() + }; + var pipeline = new DaprEncoderPipeline(operations); + + // Act + var result = await pipeline.ProcessAsync(new SampleRecord("Sample", 15)); + + // Assert + Assert.Equal("eyJuYW1lIjoiU2FtcGxlIiwidmFsdWUiOjE1fQ==", Convert.ToBase64String(result.Payload.Span)); + Assert.True(result.Metadata.ContainsKey("ops")); + Assert.Equal("Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0]", result.Metadata["ops"]); + } + + [Fact] + public async Task ProcessAsync_ShouldProcessOptionalOperations() + { + // Arrange + var operations = new List + { + new GzipCompressor(), + new SystemTextJsonSerializer(), + new Utf8Encoder(), + new Sha256Validator() + }; + var pipeline = new DaprEncoderPipeline(operations); + + // Act + var result = await pipeline.ProcessAsync(new SampleRecord("Sample", 15)); + + Assert.Equal("H4sIAAAAAAAACqtWykvMTVWyUgpOzC3ISVXSUSpLzCkFChia1gIAotvhPBwAAAA=", Convert.ToBase64String(result.Payload.Span)); + Assert.Equal(2, result.Metadata.Keys.Count); + Assert.True(result.Metadata.ContainsKey("Dapr.Integrity.Sha256[0]hash")); + Assert.Equal("x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=", result.Metadata["Dapr.Integrity.Sha256[0]hash"]); + Assert.True(result.Metadata.ContainsKey("ops")); + Assert.Equal("Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]", result.Metadata["ops"]); + } + + [Fact] + public async Task ProcessAsync_ShouldProcessDuplicateOperations() + { + // Arrange + var operations = new List + { + new GzipCompressor(), + new SystemTextJsonSerializer(), + new Utf8Encoder(), + new Sha256Validator(), + new GzipCompressor() + }; + var pipeline = new DaprEncoderPipeline(operations); + + // Act + var result = await pipeline.ProcessAsync(new SampleRecord("Sample", 15)); + + Assert.Equal("H4sIAAAAAAAACpPv5mAAA67VYae8z/iGbgri8juje8Iz9FKglvcZTVYuiVnXmBgW3X5oIwNUBQAguNy9LwAAAA==", Convert.ToBase64String(result.Payload.Span)); + Assert.Equal(2, result.Metadata.Keys.Count); + Assert.True(result.Metadata.ContainsKey("Dapr.Integrity.Sha256[0]hash")); + Assert.Equal("x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=", result.Metadata["Dapr.Integrity.Sha256[0]hash"]); + Assert.True(result.Metadata.ContainsKey("ops")); + Assert.Equal("Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0],Dapr.Compression.Gzip[1]", result.Metadata["ops"]); + } + + private record SampleRecord(string Name, int Value); +} diff --git a/test/Dapr.Common.Test/Data/DataPipelineFactoryTests.cs b/test/Dapr.Common.Test/Data/DataPipelineFactoryTests.cs new file mode 100644 index 000000000..5723f3201 --- /dev/null +++ b/test/Dapr.Common.Test/Data/DataPipelineFactoryTests.cs @@ -0,0 +1,186 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Generic; +using Dapr.Common.Data; +using Dapr.Common.Data.Attributes; +using Dapr.Common.Data.Extensions; +using Dapr.Common.Data.Operations.Providers.Compression; +using Dapr.Common.Data.Operations.Providers.Encoding; +using Dapr.Common.Data.Operations.Providers.Integrity; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + + namespace Dapr.Common.Test.Data; + +public class DataPipelineFactoryTests +{ + [Fact] + public void CreatePipeline_ShouldCreateProcessingPipelineWithCorrectOperations() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithSerializer>() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + // Act + var pipeline = factory.CreateEncodingPipeline(); + + // Assert + Assert.NotNull(pipeline); + } + + [Fact] + public void CreatePipeline_ShouldThrowIfSerializationTypeNotRegisteredForProcessingPipeline() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + // Act & Assert + Assert.Throws(() => factory.CreateEncodingPipeline()); + } + + [Fact] + public void CreatePipeline_ShouldThrowIEncodingTypeNotRegisteredForProcessingPipeline() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithSerializer>() + .WithIntegrity(_ => new Sha256Validator()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + // Act & Assert + Assert.Throws(() => factory.CreateEncodingPipeline()); + } + + [Fact] + public void CreatePipeline_ShouldThrowIfSerializationTypeNotRegisteredForReverseProcessingPipeline() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + var metadata = new Dictionary + { + { "Dapr.Integrity.Sha256-hash", "x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=" }, + { + "ops", + "Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]" + } + }; + + // Act & Assert + Assert.Throws(() => factory.CreateDecodingPipeline(metadata)); + } + + [Fact] + public void CreatePipeline_ShouldThrowIfEncodingTypeNotRegisteredForReverseProcessingPipeline() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithIntegrity(_ => new Sha256Validator()) + .WithSerializer>(); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + var metadata = new Dictionary + { + { "Dapr.Integrity.Sha256-hash", "x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=" }, + { + "ops", + "Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]" + } + }; + + // Act & Assert + Assert.Throws(() => factory.CreateDecodingPipeline(metadata)); + } + + [Fact] + public void CreatePipeline_ShouldThrowIfOpsNotDefinedInMetadata() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithIntegrity(_ => new Sha256Validator()) + .WithSerializer>(); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + var metadata = new Dictionary + { + { "Dapr.Integrity.Sha256[0]hash", "x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=" } + }; + + // Act & Assert + Assert.Throws(() => factory.CreateDecodingPipeline(metadata)); + } + + [Fact] + public void CreatePipeline_ShouldCreateReversePipelineWithCorrectOperations() + { + // Arrange + var services = new ServiceCollection(); + services.AddDaprDataProcessingPipeline() + .WithCompressor() + .WithSerializer>() + .WithIntegrity(_ => new Sha256Validator()) + .WithEncoder(c => new Utf8Encoder()); + + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + var metadata = new Dictionary + { + { "Dapr.Integrity.Sha256-hash", "x9yYvPm6j9Xd7X1Iwz08iQFKidQQXR9giprO3SBZg7Y=" }, + { + "ops", + "Dapr.Serialization.SystemTextJson[0],Dapr.Encoding.Utf8[0],Dapr.Compression.Gzip[0],Dapr.Integrity.Sha256[0]" + } + }; + + // Act + var pipeline = factory.CreateDecodingPipeline(metadata); + + // Assert + Assert.NotNull(pipeline); + } + + [DataPipeline(typeof(GzipCompressor), typeof(Utf8Encoder), typeof(SystemTextJsonSerializer))] + private sealed record SampleRecord(string Name, int Value, bool Flag); +} diff --git a/test/Dapr.Common.Test/Data/Extensions/DaprDataPipelineRegistrationBuilderExtensions.cs b/test/Dapr.Common.Test/Data/Extensions/DaprDataPipelineRegistrationBuilderExtensions.cs new file mode 100644 index 000000000..133402173 --- /dev/null +++ b/test/Dapr.Common.Test/Data/Extensions/DaprDataPipelineRegistrationBuilderExtensions.cs @@ -0,0 +1,188 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common.Data.Extensions; +using Dapr.Common.Data.Operations; +using Dapr.Common.Data.Operations.Providers.Compression; +using Dapr.Common.Data.Operations.Providers.Integrity; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Dapr.Common.Test.Data.Extensions; + +public class DaprDataPipelineRegistrationBuilderExtensionsTests +{ + [Fact] + public void AddDaprDataProcessingPipeline_ShouldReturnDaprDataProcessingPipelineBuilder() + { + // Arrange + var services = new ServiceCollection(); + + // Act + var result = services.AddDaprDataProcessingPipeline(); + + // Assert + Assert.NotNull(result); + Assert.IsType(result); + } + + [Fact] + public void WithSerializer_ShouldRegisterSerializationService() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithSerializer>(); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service = serviceProvider.GetService(); + Assert.NotNull(service); + Assert.True(service is SystemTextJsonSerializer); + } + + [Fact] + public void WithSerializer_ShouldRegisterSerializationFactory() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithSerializer(_ => new SystemTextJsonSerializer()); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service = serviceProvider.GetService(); + Assert.NotNull(service); + Assert.True(service is SystemTextJsonSerializer); + } + + [Fact] + public void WithCompressor_ShouldRegisterType() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithCompressor(); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service = serviceProvider.GetService(); + Assert.NotNull(service); + Assert.True(service is GzipCompressor); + } + + [Fact] + public void WithCompressor_ShouldRegisterFactory() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithCompressor(_ => new GzipCompressor()); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service = serviceProvider.GetService(); + Assert.NotNull(service); + Assert.True(service is GzipCompressor); + } + + [Fact] + public void WithIntegrity_ShouldRegisterType() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithIntegrity(); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service = serviceProvider.GetService(); + Assert.NotNull(service); + Assert.True(service is Sha256Validator); + } + + [Fact] + public void WithIntegrity_ShouldRegisterFactory() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithIntegrity(_ => new Sha256Validator()); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service = serviceProvider.GetService(); + Assert.NotNull(service); + Assert.True(service is Sha256Validator); + } + + [Fact] + public void WithDaprOperation_ShouldRegisterScopedService() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithSerializer>(ServiceLifetime.Scoped); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + using var scope = serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + Assert.NotNull(service); + } + + [Fact] + public void WithDaprOperation_ShouldRegisterTransientService() + { + // Arrange + var services = new ServiceCollection(); + + // Act + services.AddDaprDataProcessingPipeline() + .WithSerializer(_ => new SystemTextJsonSerializer(), ServiceLifetime.Transient); + + // Assert + var serviceProvider = services.BuildServiceProvider(); + var service1 = serviceProvider.GetService(); + var service2 = serviceProvider.GetService(); + Assert.NotNull(service1); + Assert.NotNull(service2); + Assert.NotSame(service1, service2); + } + + private sealed record SampleRecord(string Name, int Count); + + private class MockOperation : IDaprDataOperation + { + /// + /// The name of the operation. + /// + public string Name => "Test"; + } +} + diff --git a/test/Dapr.Common.Test/Data/Operators/Providers/Compression/GzipCompressorTest.cs b/test/Dapr.Common.Test/Data/Operators/Providers/Compression/GzipCompressorTest.cs new file mode 100644 index 000000000..be15e63e3 --- /dev/null +++ b/test/Dapr.Common.Test/Data/Operators/Providers/Compression/GzipCompressorTest.cs @@ -0,0 +1,56 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common.Data.Operations.Providers.Compression; + +namespace Dapr.Common.Test.Data.Operators.Providers.Compression; + +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +public class GzipCompressorTests +{ + [Fact] + public async Task ExecuteAsync_ShouldCompressData() + { + // Arrange + var compressor = new GzipCompressor(); + var input = new ReadOnlyMemory(new byte[] { 1, 2, 3, 4, 5 }); + + // Act + var result = await compressor.ExecuteAsync(input); + + // Assert + Assert.NotNull(result); + Assert.NotEqual(input, result.Payload); + } + + [Fact] + public async Task ReverseAsync_ShouldDecompressData() + { + // Arrange + var compressor = new GzipCompressor(); + var input = new ReadOnlyMemory(new byte[] { 1, 2, 3, 4, 5 }); + var compressedResult = await compressor.ExecuteAsync(input); + + // Act + var result = await compressor.ReverseAsync(compressedResult, string.Empty, CancellationToken.None); + + // Assert + Assert.NotNull(result); + Assert.Equal(input.ToArray(), result.Payload.ToArray()); + } +} + diff --git a/test/Dapr.Common.Test/Data/Operators/Providers/Encoding/Utf8EncoderTest.cs b/test/Dapr.Common.Test/Data/Operators/Providers/Encoding/Utf8EncoderTest.cs new file mode 100644 index 000000000..57eaf95c6 --- /dev/null +++ b/test/Dapr.Common.Test/Data/Operators/Providers/Encoding/Utf8EncoderTest.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Common.Data.Operations.Providers.Encoding; +using Xunit; + +namespace Dapr.Common.Test.Data.Operators.Providers.Encoding; + +public class Utf8EncoderTest +{ + [Fact] + public async Task ExecuteAsync_ShouldEncodeData() + { + // Arrange + var encoder = new Utf8Encoder(); + const string input = "This is a test value!"; + + // Act + var encodedResult = await encoder.ExecuteAsync(input); + + // Assert + Assert.NotNull(encodedResult); + Assert.Equal("VGhpcyBpcyBhIHRlc3QgdmFsdWUh", Convert.ToBase64String(encodedResult.Payload.Span)); + } + + [Fact] + public async Task ReverseAsync_ShouldDecodeData() + { + // Arrange + var encoder = new Utf8Encoder(); + const string input = "This is a test value!"; + var encodedResult = await encoder.ExecuteAsync(input); + + // Act + var reverseResult = await encoder.ReverseAsync(encodedResult, string.Empty, CancellationToken.None); + + // Assert + Assert.NotNull(reverseResult); + Assert.Equal(input, reverseResult.Payload); + } +} diff --git a/test/Dapr.Common.Test/Data/Operators/Providers/Integrity/Sha256ValidatorTest.cs b/test/Dapr.Common.Test/Data/Operators/Providers/Integrity/Sha256ValidatorTest.cs new file mode 100644 index 000000000..859643f22 --- /dev/null +++ b/test/Dapr.Common.Test/Data/Operators/Providers/Integrity/Sha256ValidatorTest.cs @@ -0,0 +1,78 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Common.Data.Operations; +using Dapr.Common.Data.Operations.Providers.Integrity; +using Xunit; + +namespace Dapr.Common.Test.Data.Operators.Providers.Integrity; + +public class Sha256ValidatorTests +{ + [Fact] + public async Task ExecuteAsync_ShouldCalculateChecksum() + { + // Arrange + var validator = new Sha256Validator(); + var input = new ReadOnlyMemory(new byte[] { 1, 2, 3, 4, 5 }); + + // Act + var result = await validator.ExecuteAsync(input); + + // Assert + Assert.NotNull(result); + Assert.True(result.Metadata.ContainsKey("hash")); + } + + [Fact] + public async Task ReverseAsync_ShouldValidateChecksumWithoutMetadataHeader() + { + // Arrange + var validator = new Sha256Validator(); + var input = new ReadOnlyMemory(new byte[] { 1, 2, 3, 4, 5 }); + var result = new DaprOperationPayload>(input); + + await validator.ReverseAsync(result, $"{validator.Name}[0]", CancellationToken.None); + } + + [Fact] + public async Task ReverseAsync_ShouldValidateChecksum() + { + // Arrange + var validator = new Sha256Validator(); + var input = new ReadOnlyMemory(new byte[] { 1, 2, 3, 4, 5 }); + var result = await validator.ExecuteAsync(input); + + // Act & Assert + await validator.ReverseAsync(result, $"{validator.Name}[0]", CancellationToken.None); + } + + [Fact] + public async Task ReverseAsync_ShouldThrowExceptionForInvalidChecksum() + { + // Arrange + var validator = new Sha256Validator(); + var input = new ReadOnlyMemory(new byte[] { 1, 2, 3, 4, 5 }); + var result = await validator.ExecuteAsync(input); + result = result with + { + Payload = new ReadOnlyMemory(new byte[] { 6, 7, 8, 9, 0 }) + }; + + // Act & Assert + await Assert.ThrowsAsync(() => validator.ReverseAsync(result,string.Empty, CancellationToken.None)); + } +} diff --git a/test/Dapr.Common.Test/Data/Operators/Providers/Serialization/SystemTextJsonSerializerTest.cs b/test/Dapr.Common.Test/Data/Operators/Providers/Serialization/SystemTextJsonSerializerTest.cs new file mode 100644 index 000000000..d49a5377d --- /dev/null +++ b/test/Dapr.Common.Test/Data/Operators/Providers/Serialization/SystemTextJsonSerializerTest.cs @@ -0,0 +1,56 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Threading.Tasks; +using Dapr.Common.Data.Operations; +using Dapr.Common.Data.Operations.Providers.Serialization; +using Xunit; + +namespace Dapr.Common.Test.Data.Operators.Providers.Serialization; + +public class SystemTextJsonSerializerTest +{ + [Fact] + public async Task ExecuteAsync_ShouldSerialize() + { + //Arrange + var validator = new SystemTextJsonSerializer(); + var input = new TestObject("Test", 15); + + //Act + var result = await validator.ExecuteAsync(input); + + //Assert + Assert.NotNull(result); + Assert.Equal("{\"name\":\"Test\",\"count\":15}", result.Payload); + } + + [Fact] + public async Task ReverseAsync_ShouldDeserialize() + { + //Arrange + var validator = new SystemTextJsonSerializer(); + const string input = "{\"name\":\"Test\",\"count\":15}"; + var payload = new DaprOperationPayload(input); + + //Act + var result = await validator.ReverseAsync(payload, string.Empty); + + //Assert + Assert.NotNull(result); + var expected = new TestObject("Test", 15); + Assert.Equal(expected, result.Payload); + } + + private record TestObject(string Name, int Count); +}