Muy buenas, me llamo Miguel y aquí les traigo este artículo.
Operaciones por lotes de S3 es una característica relativamente reciente de Servicio de almacenamiento simple de Amazon (S3) que permite el procesamiento de cientos, miles, millones o incluso miles de millones de objetos S3 de forma sencilla y directa.
Permite copiar objetos de un depósito a otro, configurar etiquetas o listas de control de acceso (ACL), iniciar una restauración desde Glacier o invocar una función AWS Lambda en cada objeto.
AWS Lambda es una plataforma informática sin servidor impulsada por eventos que ejecuta fragmentos de código llamados funciones Lambda.
Esta publicación se centrará en cómo usar S3 Batch Operations para invocar una función Lambda escrita en Scala, con sbt como nuestra herramienta de construcción.
Índice
Solicitud y respuesta
Cuando se configura correctamente, un trabajo de operaciones por lotes de S3 envía una solicitud JSON en un formato particular a la función Lambda para que se invoque en los objetos de S3.
A su vez, espera que la función Lambda devuelva una respuesta JSON en un formato específico que indique el éxito o el fracaso de la invocación.
El siguiente es un ejemplo de una solicitud JSON que se envía desde S3 Batch Operations a la función Lambda:
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "mediaImage1.png", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456789:sample-bucket" } ] }
Los campos más importantes para procesar objetos S3 en la función Lambda son s3Key
, s3VersionId
y s3BucketArn
, que están envueltos en un objeto de tarea.
Cada tarea hace referencia a un objeto S3 que la función Lambda debería procesar. Los otros campos son metadatos sobre el trabajo S3 Batch Operations.
En el momento de redactar este documento, S3 Batch Operations solo envía una tarea por solicitud.
El siguiente es un ejemplo de una respuesta JSON que la función Lambda debe devolver al trabajo S3 Batch Operations:
{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs": "PermanentFailure", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "Processed mediaImage1.png" } ] }
Amazon S3 Batch Operations espera dos códigos de estado diferentes en la carga útil de respuesta. El primero es el código de respuesta para toda la solicitud y el segundo es un código de resultado por tarea.
La siguiente tabla contiene los códigos de respuesta y sus comportamientos asociados.
Response Code,Description Succeeded,"The task completed normally. If you requested a job completion report, the task's result string is included in the report." TemporaryFailure,"The task suffered a temporary failure and will be redriven before the job completes. The result string is ignored. If this is the final redrive, the error message is included in the final report." PermanentFailure,"The task suffered a permanent failure. If you requested a job-completion report, the task is marked as Failed and includes the error message string. Result strings from failed tasks are ignored."
Controlador de solicitudes Lambda
Las funciones de AWS Lambda requieren la implementación de una interfaz que consume y procesa datos.
Los detalles específicos varían entre diferentes lenguajes, pero para una función Lambda escrita en Java o Scala, hay dos interfaces para los métodos del controlador. Esas interfaces están documentadas aquí.
Para manejar el formato de solicitud / respuesta de S3 Batch Operations, el RequestStreamHandler se implementará en esta función Lambda. Adicionalmente, Circe se utilizará para la serialización y deserialización de la solicitud y la respuesta.
Para utilizar estas bibliotecas, las dependencias respectivas deben agregarse albuild.sbt
archivo.
libraryDependencies ++= Seq( "com.amazonaws" % "aws-lambda-java-core" % "1.2.1", "io.circe" %% "circe-core" % "0.12.3", "io.circe" %% "circe-generic" % "0.12.3", "io.circe" %% "circe-parser" % "0.12.3" )
Primero, es necesario definir los modelos para la solicitud. La definición de estos modelos captura la estructura de los datos y permite la deserialización con circe.
Circe es una popular biblioteca JSON en Scala que permite la generación semiautomática de códecs JSON para tipos de datos mediante anotaciones. los @JsonCodec
anotación se utiliza para derivar los codificadores y decodificadores JSON para los modelos definidos a continuación.
import io.circe.generic.JsonCodec// Input from S3 Batch Operations to Lambda @JsonCodec case class S3BatchRequest( invocationSchemaVersion: String, invocationId: String, job: S3BatchJob, tasks: List[S3BatchTask] )// S3 Batch Job details @JsonCodec case class S3BatchJob(id: String)// S3 Batch Task model @JsonCodec case class S3BatchTask( taskId: String, s3Key: String, s3VersionId: String, s3BucketArn: String ) { lazy val s3BucketName: String = s3BucketArn.split(":").last }
Y de manera similar para la respuesta:
// Output from Lambda to S3 Batch Operations @JsonCodec case class S3BatchLambdaResponse( invocationSchemaVersion: String, treatMissingKeysAs: String, invocationId: String, results: List[S3BatchResult] )// Output Result for each S3 Batch task @JsonCodec case class S3BatchResult( taskId: String, resultCode: String, resultString: String )
Con la interfaz RequestStreamHandler
, Lambda pasa una InputStream
y OutputStream
al handleRequest
, que debe anularse para implementar la interfaz.
El método del controlador debe leer bytes del flujo de entrada, realizar algún procesamiento y luego escribir el resultado esperado en el flujo de salida.
import com.amazonaws.services.lambda.runtime.{Context, RequestStreamHandler} class S3BatchOperationsLambdaHandler extends RequestStreamHandler { override def handleRequest( is: InputStream, os: OutputStream, context: Context ): Unit = { // processing logic here } }
Esta clase extiende la interfaz RequestStreamHandler
y anula la handleRequest
método según sea necesario. Además, la implementación de este método debe manejar el formato de solicitud y respuesta esperado de S3 Batch Operations.
import io.circe.parser.decode import io.circe.syntax._ import software.amazon.awssdk.services.s3.model.S3Exception import scala.io.Sourceoverride def handleRequest( is: InputStream, os: OutputStream, context: Context ): Unit = { val reqString = Source.fromInputStream(is).mkString val s3BatchRequest = decode[S3BatchRequest](reqString) match { case Right(batchReq) => batchReq case Left(ex) => // if an exception is thrown during the Lambda invocation, // then the ResponseCode is automatically set to // "PermanentFailure". throw ex } val results = processTasks(s3BatchRequest.tasks) returnResults(os, s3BatchRequest, results) }
Aquí el método handleRequest
lee los bytes del flujo de entrada como una cadena, luego decodifica esa cadena JSON en el S3BatchRequest
modelo definido anteriormente.
Una vez deserializados los datos, los objetos S3 que aparecen en la lista de tareas de la solicitud se pueden recuperar y procesar.
Además, se definen métodos auxiliares que contienen lógica para procesar los objetos y comunicar los resultados al trabajo.
def processTasks( tasks: List[S3BatchTask] ): List[S3BatchResult] = for { task <- tasks } yield { val res = processObject(task.s3BucketName, task.s3Key) val (resultCode, resultString) = res match { case Right(res) => ("Succeeded", res) case Left(ex: S3Exception) if ex.awsErrorDetails().errorCode() == "RequestTimeout" => ("TemporaryFailure", ex.getMessage) case Left(ex) => ("PermanentFailure", ex.getMessage) } S3BatchResult( taskId = task.taskId, resultCode = resultCode, resultString = resultString ) }def processObject( s3BucketName: String, s3Key: String ): Either[Exception, String] = { val path = s"$s3BucketName/$s3Key" println(path) Right(path) }
processObject
se llama al método para cada tarea, recibiendo el nombre del depósito S3 y la clave del objeto de destino como parámetros. En un caso de uso real, el objeto podría recuperarse de S3 para realizar un procesamiento adicional, como actualizar campos JSON o eliminar datos confidenciales.
Para simplificar las cosas en esta publicación, la ruta del objeto simplemente se imprime y se devuelve como la cadena de resultado.
Una vez procesado el objeto, se requiere una indicación de si la operación se realizó correctamente o no. Anteriormente en la publicación, se enumeraron tres posibles códigos de respuesta: Succeeded
, TemporaryFailure
y PermanentFailure
.
Si una tarea está marcada como TemporaryFailure
, S3 Batch Operations reintenta automáticamente esa tarea antes de que se complete el trabajo.
Los resultados del procesamiento luego se envuelven en un objeto S3BatchResult
, como se definió anteriormente. El resultado de cada tarea se devuelve en la respuesta a S3 Batch Operations.
def returnResults( os: OutputStream, req: S3BatchRequest, results: List[S3BatchResult] ): Unit = { output = S3BatchLambdaResponse( invocationSchemaVersion = req.invocationSchemaVersion, treatMissingKeysAs = "PermanentFailure", invocationId = req.invocationId, results = results ) outputBytes = output.asJson.noSpaces.getBytes() os.write(outputBytes) }
En este método, los resultados del procesamiento, así como los metadatos relevantes para el trabajo, se incluyen en el S3BatchLambdaResponse
modelo. Luego, la respuesta se serializa como una cadena JSON usando circe, cuyos bytes se escriben en el flujo de salida.
En este punto, la implementación de la función Lambda está completa.
Para utilizar la función con S3 Batch Operations, debe estar empaquetada e implementada. Para empaquetar el código en un JAR, el sbt-montaje
se puede utilizar el complemento. Siguiendo preparar del complemento, el proyecto se puede empaquetar ejecutando el siguiente comando:
sbt assembly
Ahora que el proyecto se ha ensamblado en un archivo JAR, se puede cargar como una función Lambda a través de la consola de AWS o la AWS CLI, como se describe en la documentación.
Crear un trabajo
Para crear un trabajo de operaciones por lotes de S3, es necesario especificar los objetos sobre los que invocar la función Lambda.
Esto se hace definiendo un archivo de manifiesto, que puede tener la forma de un informe de Inventario de S3 o un archivo CSV. Para obtener más detalles sobre cómo especificar un manifiesto, así como la diferencia entre los dos tipos, consulte los documentos de AWS.
Después de especificar el manifiesto, es necesario definir los roles de IAM de la función Lambda, así como el del trabajo S3 Batch Operations.
Ejemplo de política de Lambda IAM:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }
El s3:GetObject
permiso se usa para recuperar el manifiesto, mientras que el s3:PutObject
permiso es necesario para escribir el informe para el trabajo S3 Batch Operations.
El permiso lambda:InvokeFunction
es necesario para invocar la función Lambda desde el trabajo.
Ejemplo de política de IAM de trabajo de operaciones por lotes de S3:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:us-east-1:012345678910:function:myS3BatchOperationsLambdaFunction" }, { "Effect": "Allow", "Action": "s3:*", "Resource": "arn:aws:s3:::sample-bucket" } ] }
Con estos roles de IAM implementados, un trabajo de Operaciones por lotes de S3 está listo para crearse.
Una forma de crear un trabajo de operaciones por lotes de S3 es usar el create-job
comando en la CLI. El siguiente ejemplo crea un trabajo de operaciones por lotes de Amazon S3 que invoca una función Lambda mediante la AWS CLI.
aws s3control create-job \ --account-id <AccountID> \ --no-confirmation-required \ --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' \ --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' \ --report '{"Bucket":"arn:aws:s3:::sample-bucket", "Format":"Report_CSV_20180820", "Enabled":true, "Prefix":"ReportPrefix", "ReportScope":"AllTasks"}' \ --priority 2 \ --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole \ --region Region \ --description "S3 Batch Lambda Invoke" \
A menos --no-confirmation-required
que se pase la marca, el trabajo recién creado debe ejecutarse y confirmarse usando la consola.
Para obtener más información sobre el create-job
comando de la CLI, consulte AWS CLI Command Reference . Es importante tener en cuenta que el tipo de manifiesto se especifica en la opción --manifest
.
Para los manifiestos creados a partir de informes de inventario de S3, este parámetro debe ser "S3InventoryReport_CSV_20161130"
. Para los manifiestos creados con formato CSV normal, se "S3BatchOperations_CSV_20180820"
debe utilizar y se Fields
debe establecer el valor.
Dado que el proceso para crear un trabajo de operaciones por lotes de S3 mediante la consola es bastante sencillo y se ha simplificado últimamente, los documentos de AWS sobre cómo crear un trabajo y operaciones de operaciones por lotes de Amazon S3 deberían ser suficientes para ese método.
Ver los resultados
Mientras se ejecuta un trabajo de operaciones por lotes de S3, la tasa de éxito promedio de todos los objetos procesados previamente se mostrará en tiempo real.
Si al menos el 50% de las operaciones de un trabajo han fallado después de que se hayan intentado más de 1000 operaciones, el trabajo falla automáticamente y deja de realizar más operaciones.
Una vez que el trabajo ha completado su ejecución, los resultados se presentarán por objeto en un archivo CSV guardado en la ubicación especificada para el informe.
S3 Batch Operations brilla más cuando procesa millones o miles de millones de objetos, o grandes cantidades de datos en general. Por ejemplo, si uno necesita eliminar ciertos datos de todos los objetos S3, o si una multitud de archivos JSON necesitan ser aplanados y generados como archivos Parquet , un trabajo de Operaciones por lotes de S3 integrado con Lambda manejaría la tarea bastante bien mientras escalaba fácilmente. procesando millones de objetos en horas.
Gracias por leer este artículo.
Añadir comentario