The WorksAudit Book

Extract-Transform-Load (ETL)

WorksAudit ETL is a process that transforms raw log data from HUE, Collab, EBM, AC, and Expense into a common Parquet format searchable by the AWS Athena service. The ETL runs as a Spark job in the AWS Glue service.

ETL Architecture

The following diagram summarizes the structure of the ETL:

ETL Architecture

The diagram can be described as follows:

  1. All specific ETL logic (integrated, HUE, AC) is based on (or an extension of) a collection of layered core modules (wap-audit-transform-core, wap-audit-transform-spark, wap-audit-transform-glue).
  2. The vision is that in the ideal future all logs will be in Protobuf format, so only the integrated ETL will be necessary. Currently the HUE and other ETL modules are needed as a temporary solution to process the existing log formats.
  3. The layered core modules are designed such that the inner layers are more “generic” or “platform-independent” than the outer layers:
    1. The innermost layer, wap-audit-transform-core, is the heart of the transformation: simply a function that transforms one type T into another type U (the Parquet data structure).
    2. The next layer, wap-audit-transform-spark, handles Spark-specific data types. It extracts the generic type T from the Spark Dataset structure and wraps the generic type U back into a Dataset.
    3. The outermost layer, wap-audit-transform-glue, hosts the Glue job script that interfaces with the AWS Glue system. This layer creates the Spark data structures from the Glue-specific DynamicFrame structure and wraps the output back into a DynamicFrame.

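Put in terms of (simplified) function signatures, the layering looks roughly like the sketch below. The exact signatures appear in the module sections later on this page; the Glue step is only indicated in comments.

import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Simplified sketch of the layering; the real signatures are shown in the module sections below.
object LayeringSketch {

  // Core layer: plain Scala, one input record to one Parquet output record
  def core[T <: AuditTransformInput](input: T): AuditParquetOutput =
    AuditTransform.transform(input)

  // Spark layer: lift the core transformation over a Dataset
  def spark[T <: AuditTransformInput](session: SparkSession,
                                      input: Dataset[T]): Dataset[AuditParquetOutput] =
    input.map(record => core(record))(Encoders.product[AuditParquetOutput])

  // Glue layer (not shown here): converts the Glue DynamicFrame into a DataFrame/Dataset,
  // calls the Spark layer, and wraps the result back into a DynamicFrame for the S3 sink.
}
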
ETL Modules

Core (wap-audit-transform-core)

All the base ideas for the ETL are implemented in this module. Those are:

  1. Transformation. The transformation (AuditTransform) is the heart of the ETL. It simply transforms the input (AuditTransformInput) structure into the output (AuditTransformOutput) structure.
  2. Sentence Generator. The sentence generator is a simple multilingual text processor that replaces variables in a string pattern with values. The main function for this pattern processing is here. As can be seen, the inputs of the function are the pattern and all the components (subject, verb, object, etc.) that should replace the variables specified in the pattern.
  3. Multilingual Data Management. The multilingual data is accessible using an object called MultilingualReference which is basically an abstraction of a lookup engine.

This Core layer is independent of any execution environment and depends only on the Scala standard library (plus a few additional general-purpose libraries).

Notes on Transformation

The main logic of transformation is shown here:

def transform[T <: AuditTransformInput](input: T): AuditParquetOutput =
  failableTransform(input) match {
    case Success(value) => value
    case Failure(exception) =>
      // serialize the stack trace to string, and put it in an empty output's resources
      // so that it can be logged by external process
      val sw = new StringWriter
      exception.printStackTrace(new PrintWriter(sw))
      AuditParquetOutput.emptyOutput(sw.toString)
  }

As can be recognized from the function signature, it simply transforms an input of type T (an extension of AuditTransformInput) into an output of type AuditParquetOutput. This is done through a “failable transformation”, which is defined as follows:

  1. Transformation of a set of data (fields) in which some fields might fail individually, but the transformation as a whole continues (resumes) with the next field. For example, the structure may include the fields time and user. The transformation of the time data may fail, but the process resumes and tries to transform user.
  2. The transformation errors will be collected, and stored as part of the output.
  3. There may be some errors that are considered fatal. These errors will abort the whole ETL process.

failableTransform is a sequence of “extract operations” whose results are set on the output. The actual value transformations are implemented in those extract operations. For example, the following is the first transformation:

val (tenantName, r1) =
  resume(input.extractTenantName, AuditParquetOutput.emptyString, List())

The extractTenantName function, like all other extraction functions, returns Try[T]. When an exception happens while extracting a value, tenantName takes the default value specified (AuditParquetOutput.emptyString), and the exception itself is not thrown but appended to a List (the third argument). r1 is the resulting List, possibly with an exception appended. Note that the function is pure: the output is a new List, not the same List passed as an argument.
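
The resume helper itself is not shown on this page, but based on the behavior described above it can be thought of as something like the following sketch (the actual signature and implementation in the core module may differ):

import scala.util.{Failure, Success, Try}

// Minimal sketch of a resume-style helper consistent with the description above;
// not the actual implementation in wap-audit-transform-core.
object ResumeSketch {
  def resume[T](attempt: Try[T], default: T, errors: List[Throwable]): (T, List[Throwable]) =
    attempt match {
      case Success(value)     => (value, errors)                // keep the extracted value, errors unchanged
      case Failure(exception) => (default, errors :+ exception) // fall back to the default and record the error
    }
}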

The last extraction looks like this:

val (custom_meta_10, resumables) =
  resume(input.extractCustomMeta10, AuditParquetOutput.emptyString, r74)

The output resumables is the final collection of exceptions that may have occurred during all extractions. These are then collected and put into the transform_error: Array[String] field of the output:

resumables
  .map(_.getMessage)
  .toArray

The purpose of putting the errors in the output is so that:

  1. The transformation can always be successful (it is not stopped because of some bad data). Audit logs are produced by many products and will inevitably contain some unclean data.
  2. The Viewer may decide how to present the unavailability of a value to the end user.
Notes on Sentence Generator and Multilingual Data Management

The sentence generator and multilingual data management architecture is as follows:

ETL Architecture

The generateSentence function receives the IDs of multilingual resources (the sentence pattern ID and the sentence component IDs) and uses MetaMultilingualReference to resolve the actual multilingual values of all specified IDs.

The following code shows how generateSentence is used:

    for (tenant <- extractTenantName;
         landscape <- extractLandscapeName;
         sentenceId <- resolveSentenceId;
         userId <- extractUserId;
         verbId <- extractVerbId;
         objectId <- extractObjectId;
         indirectObjectId <- extractIndirectObjectId;
         modifierId <- extractModifierId;
         word1Id <- extractWord1Id;
         word2Id <- extractWord2Id;
         word3Id <- extractWord3Id;
         word4Id <- extractWord4Id;
         word5Id <- extractWord5Id)
      yield
        MultilingualSentenceUtil
          .generateSentence(
            ref = MetaMultilingualReference,
            patternId = sentenceId,
            subjectId = Some(userId),
            verbId = Some(verbId),
            objectId = Some(objectId),
            indirectId = Some(indirectObjectId),
            modifierId = Some(modifierId),
            word1Id = Some(word1Id),
            word2Id = Some(word2Id),
            word3Id = Some(word3Id),
            word4Id = Some(word4Id),
            word5Id = Some(word5Id)
          )(
            subjectLookup = multilingualSubjectLookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            verbLookup = multilingualVerbLookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup
            },
            objectLookup = multilingualObjectLookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            indirectLookup = multilingualIndirectLookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            modifierLookup = multilingualModifierLookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            word1Lookup = multilingualWord1Lookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            word2Lookup = multilingualWord2Lookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            word3Lookup = multilingualWord3Lookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            word4Lookup = multilingualWord4Lookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            },
            word5Lookup = multilingualWord5Lookup match {
              case Some(f) => f
              case _       => MetaMultilingualReference.lookup(tenant, landscape)
            }
          )
          .map(_.asMultilingualJson) match {
          case Some(result) => result
          case _            => throw new ResumableException("message.i18n.unresolvable")
        }

The for-yield structure basically instructs to “extract all necessary IDs”; if one of the IDs is invalid, the output sentence is invalid. Most of these extraction functions return a default value (mostly the ID for an empty string), even when they are unable to extract an actual ID.

Some ID lookups can be tenant/landscape specific, so the lookup function used is passed the tenant and landscape arguments:

subjectLookup = multilingualSubjectLookup match {
  case Some(f) => f
  case _       => MetaMultilingualReference.lookup(tenant, landscape)
}

In the above case the subject is tenant/landscape specific (the subject is usually a user ID, and users are tenant/landscape specific).

MetaMultilingualReference is a wrapper around the available implementations of the MultilingualReference trait. This meta object simply dispatches lookup requests to 3 implementations of MultilingualReference (a rough sketch of the dispatch idea follows the list):

  1. ResourcesMultilingualReference. This object loads all multilingual text resource files listed in the resources/multilingual.registry file (which actually lists all .csv files under resources; there is a reason why the object doesn’t simply load everything under resources).
  2. S3SimpleMultilingualReference. This object loads a multilingual file from an S3 bucket.
  3. S3TenantMultilingualReference. This object lazily loads a multilingual file from an S3 bucket for a specific tenant/landscape. Only when there is a lookup for a particular tenant/landscape will the file be loaded.
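
As mentioned above, the dispatch idea can be sketched roughly as follows (the real MultilingualReference trait and MetaMultilingualReference have richer, tenant/landscape-aware signatures; the trait and ordering below are assumptions for illustration only):

// Simplified sketch of dispatching a lookup across several reference implementations.
trait SimpleMultilingualReference {
  def lookup(id: String): Option[String]
}

object MetaDispatchSketch {
  // Try each reference in order (e.g. jar resources, shared S3 file, tenant-specific S3 files)
  // and return the first value that resolves.
  def dispatch(references: Seq[SimpleMultilingualReference])(id: String): Option[String] =
    references.view.flatMap(_.lookup(id)).headOption
}
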
Multilingual String Resources File Format

All multilingual string resources, whether stored in the resources folder (baked into the ETL jar file) or stored remotely in an S3 bucket, have the same format: a “modified” CSV file. The parser for this CSV file is defined here. The rules are:

  1. Empty and whitespace-only lines are ignored.
  2. Lines starting with # are ignored. These lines are usually used for comments.
  3. Currently only 3 fields can be defined, separated by commas, in the following order:
    1. id: identifier of the multilingual string resource.
    2. en: English version of the text.
    3. ja: Japanese version of the text.
  4. A comma within the text can be escaped using a backslash, e.g. I\, robot.

The following are some examples of entries:

# some examples

# this is the name of an activity, the ID will have following format:
# generic/activity/{activity-id}
generic/activity/web/open/page,Page Opening,画面の表示

# this is an example of sentence pattern
generic/active.svo,${subject} ${verb} ${object}.,${subject}が${object}を${verb}しました。
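
The rules above can be illustrated with a small parsing sketch (the names and the exact handling are assumptions; the real parser lives in the Core module):

// Sketch of a parser for the "modified" CSV format described above; illustration only.
case class MultilingualEntry(id: String, en: String, ja: String)

object MultilingualCsvSketch {
  // split on commas that are not escaped with a backslash, then unescape "\,"
  private def splitFields(line: String): Array[String] =
    line.split("""(?<!\\),""").map(_.replace("\\,", ","))

  def parse(lines: Iterator[String]): Iterator[MultilingualEntry] =
    lines
      .filter(line => line.trim.nonEmpty && !line.startsWith("#")) // rules 1 and 2
      .map(splitFields)
      .collect { case Array(id, en, ja) => MultilingualEntry(id, en, ja) } // rule 3
}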

Spark (wap-audit-transform-spark)

The purpose of the Spark layer is to make the Core layer executable in a Spark cluster. The main function for transformation using Spark is SparkAuditTransform.transform as shown here:

  def transform[T <: AuditTransformInput](spark: SparkSession,
                                          input: Dataset[T]): Dataset[AuditParquetOutput] = {
    input
      .map(AuditTransform.transform)(Encoders.product[AuditParquetOutput])
      .filter(maybeEmpty =>
        if (maybeEmpty.isEmpty) {
          logger.error("Error transforming input: " + maybeEmpty.resources)
          false
        } else true)
  }

The transformation from the Spark perspective is basically just mapping all the contents of the Dataset using the Core layer’s AuditTransform.transform. To see how this function is executed in Spark, see the standalone tester.
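
As a rough illustration of standalone execution (the paths, the input format, and the chosen input type below are placeholders, not the actual tester's configuration), the transformation can be driven from a local Spark session like this:

import org.apache.spark.sql.{Encoders, SparkSession}

// Minimal local sketch of driving SparkAuditTransform.transform outside Glue; illustration only.
object LocalTransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("wap-audit-transform-sketch")
      .getOrCreate()

    // read a JSON sample into the Protobuf-based input type described later on this page
    val input = spark.read
      .json("/tmp/sample-audit-input.json") // placeholder path
      .as[AuditIntegratedTransformInput](Encoders.product[AuditIntegratedTransformInput])

    val output = SparkAuditTransform.transform(spark, input)
    output.write.mode("overwrite").parquet("/tmp/sample-audit-output") // placeholder path

    spark.stop()
  }
}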

Loading Dataset for AuditTransformInput Implementation

One of the most important tasks of the Spark layer is to load a data source into a data structure that implements the AuditTransformInput trait.

The ETL needs to read from different kinds of data sources (Protobuf, HUE raw log, etc.) and load them into different types of AuditTransformInput implementations, e.g. AuditIntegratedTransformInput for Protobuf and AuditHueTransformInput for HUE. This is handled by the following function, SparkAuditTransform.loadDataset:

  def loadDataset[T <: Product](inputClassName: String,
                                dataFramePreprocessorName: Option[String],
                                dataFrame: DataFrame): Dataset[T] = {

    val preprocessor: DataFrame => DataFrame =
      preprocessFunction(inputClassName, dataFramePreprocessorName)
        .getOrElse(dataFrame_ => dataFrame_)

    preprocessor(dataFrame)
      .as(
        newProductEncoder(
          typeTag(inputClassName)
        )
      )
  }

typeTag is a reflection utility that loads the class type given a class name. This class type is then used to create an encoder that reads the data source (a Spark DataFrame) into the type specified by that class name. Note that some input classes need to be preprocessed; this can be customized by passing a function name in the dataFramePreprocessorName argument. An example of this is the following Glue argument for AuditIntegratedTransformInput:

--AUDIT_INPUT_DATAFRAME_PROCESSOR_FUNCTION=preprocess

This loads the function AuditIntegratedTransformInput.preprocess to preprocess the data before it is processed in the transformation.
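
As an illustration only (the real AuditIntegratedTransformInput.preprocess may do something entirely different), a preprocessor is simply a DataFrame => DataFrame function resolved by name, for example:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical example of the shape a preprocessor takes; not the actual
// AuditIntegratedTransformInput.preprocess implementation.
object SomeTransformInput {
  def preprocess(dataFrame: DataFrame): DataFrame =
    dataFrame
      .withColumn("timestamp", col("timestamp").cast("long"))  // e.g. normalize a column type
      .na.fill("", Seq("tenant", "landscape"))                 // e.g. default missing string values
}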

Glue (wap-audit-transform-glue)

The Glue layer is basically the single Glue job script that calls the Spark layer. This is the entry point of the whole ETL process.

The following are the steps in the script:

  1. Processing mandatory arguments.
     val args = GlueArgParser.getResolvedOptions(
       sysArgs,
       Seq(
         "JOB_NAME",
         "AUDIT_INPUT_DB_NAME",
         //....
       ).toArray
     )
    
     Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
     val inputDatabaseName = args("AUDIT_INPUT_DB_NAME")
     // ....
    
  2. Processing optional arguments.
     val preprocessorArgName = "AUDIT_INPUT_DATAFRAME_PROCESSOR_FUNCTION"
     val inputDataFramePreprocessorName =
       if (sysArgs.contains(s"--$preprocessorArgName"))
         Some(
           GlueArgParser.getResolvedOptions(sysArgs, Array(preprocessorArgName))(
             preprocessorArgName))
       else
         None
     // ....
    
  3. Getting the data source. Depending on whether a filter expression (to limit the processing, e.g. to only certain dates) is provided, pushDownPredicate is used.
     val dataSource =
       partitionFilterExpression match {
         case Some(expression) =>
           glueContext
             .getCatalogSource(
               database = inputDatabaseName,
               tableName = inputTableName,
               redshiftTmpDir = "",
               transformationContext = "auditDataSource",
               pushDownPredicate = expression
             )
             .getDynamicFrame()
         case None =>
           glueContext
             .getCatalogSource(
               database = inputDatabaseName,
               tableName = inputTableName,
               redshiftTmpDir = "",
               transformationContext = "auditDataSource"
             )
             .getDynamicFrame()
       }
    
    
  4. Calculating the data repartitioning. This repartitioning is used to optimize data processing. Due to the different nature of the data sources (some have a large number of small partitions, some have a small number of large partitions), the number of partitions can be adjusted by passing different values. There are no definite rules for the right values of the repartition arguments; they are mostly set based on experiments. For example, with partitionTarget unset (≤ 0) and repartitionMultiplier = -4, a data source with 1,000 partitions is reduced to 1,000 / 4 = 250 partitions.
     val repartitionNum = {
       if (partitionTarget > 0)
         partitionTarget
       else if (repartitionMultiplier >= 0)
         repartitionMultiplier * dataSource.getNumPartitions
       else dataSource.getNumPartitions / repartitionMultiplier.abs
     }
    
  5. The data transformation using the Spark layer.
       val outputDataset = SparkAuditTransform.transform(
         glueContext.getSparkSession,
         SparkAuditTransform.loadDataset(
           inputClassName,
           inputDataFramePreprocessorName,
           if (repartitionNum > dataSource.getNumPartitions)
             dataSource
               .repartition(repartitionNum)
               .toDF()
           else if (repartitionNum < dataSource.getNumPartitions && dataSource.getNumPartitions >= coalesceThreshold)
             dataSource
               .coalesce(repartitionNum)
               .toDF()
           else
             dataSource.toDF()
         )
       )
    
    
  6. Writing the output.
       val outputDynamicFrame =
         DynamicFrame.apply(outputDataset.toDF(), glueContext)
    
       glueContext
         .getSinkWithFormat(connectionType = "s3",
                            options = JsonOptions(
                              Map("path" -> outputPath,
                                  "partitionKeys" -> Array("tenant_name",
                                                           "landscape_name",
                                                           "date"))),
                            format = "parquet")
         .writeDynamicFrame(outputDynamicFrame)
    
    

Integrated (wap-audit-transform-integrated)

The Integrated ETL is the implementation of data transformation for the Protobuf data source. The main class is the implementation of the AuditTransformInput trait, AuditIntegratedTransformInput. The signature of the class constructor is as follows:

case class AuditIntegratedTransformInput(tenant: String = "",
                                         landscape: String = "",
                                         service: String = "",
                                         timestamp: Long = 0L,
                                         data: String = "",
                                         version: AuditProtobufProducerVersion =
                                           AuditProtobufProducerVersion())
    extends AbstractAuditTransformInput {
//.....
}

This constructor is automatically called by the Spark layer when reading the JSON from the data source. The following is a sample record of the data source that will populate the argument values:

{"tenant":"ki4g","landscape":"production","service":"wap/hue","timestamp":1628725129,"version":{"platform":"java","protobuf":"1.0.15","producer":"1.10.0-SNAPSHOT"},"data":"Ci....=="}

The data field is the encrypted Protobuf payload that contains the actual audit log. The data is then processed by the Protobuf deserializer:

val activity = new ProtobufActivity(data)

The rest of the code is basically extracting information from Protobuf data.

Custom Activity Logic

Many activities have different ways to extract values. To represent these differences, an abstraction called logic is used. An example of where this logic is used is here:

  override def extractActivityType: Try[String] =
    activity.clause
      .flatMap(_.logic)
      .flatMap(_.activityType)

This code shows that the activity-specific logic is loaded in order to get activityType.

There are many implementations of logic in the com.worksap.company.audit.transform.integrated.protobuf.logic package. Many activity types share logic implementations, while some have their own specific implementation. Most logic implementations are direct or indirect descendants of DefaultLogic.

How the correct logic is loaded is implemented in ProtobufActivityClause:

private val webPattern = raw"/web/(.*)".r
// ...

def logic: Try[ActivityTypeLogic] = activityType.map {

   case webPattern(restOfType) =>
      if (restOfType == "open/page") {
         if (this.activity.description.startsWith("collabo/authn")) {
            new DefaultLogic(
               this,
               verbIdCustomizer = _ => Some("generic/verb/access"),
               modifierIdCustomizer = _ =>
                  new ProtobufMapHelper(activity.context)
                          .get(HueContextKey.CLIENT_IP_ADDRESS.name),
               objectIdCustomizer = _ => Some("special/word/the.login.page")
            )
            // .....
         }
         // ....
      }
   //.....
   
}

In this structure, activityType is mapped to a DefaultLogic when it matches the webPattern regular expression (i.e. it starts with /web/) and the rest of the type is the string open/page, etc.

HUE (wap-audit-transform-hue)

The HUE module is the ETL for transforming the HUE Native Raw Log format. The main implementation is AuditHueTransformInput. A HUE raw log file is a collection of tab-separated values with an encrypted payload containing the actual audit log. These tab-separated values are automatically passed into the raw_log argument:

case class AuditHueTransformInput(raw_log: String,
                                  tenant: String,
                                  landscape: String,
                                  log_type: String,
                                  audit_log_type: String,
                                  date: String,
                                  log_section: String,
                                  aggregator_id: String) extends AbstractAuditTransformInput {

   val hueRawLog: HueRawLog = new HueRawLog(raw_log)
   
   // ....
}

The HueRawLog class decrypts the payload and deserializes the fields.

Custom Activity Logic

As in the Integrated ETL, HUE also has different logic to handle different activities. The logic is instantiated like this:

  val logic: Try[HueActivityTypeLogic] =
    hueRawLog
      .triedPayload
      .flatMap(_.activity)
      .flatMap(_.logic(hueRawLog) match {
        case Some(value) => Success(value)
        case _ => Failure(new IllegalArgumentException("Unable to resolve logic"))
      })

Loading the appropriate logic for an activity type is performed here, for example:

val MAPPING: Map[String, HueActivityMetadata] = Map(
    // -------------------------------------------------------------------------
    // Data activity
    // -------------------------------------------------------------------------
    "DATA_CHANGE" -> HueActivityMetadata(
      "/data/operate/data",
      l => new DataChangeLogic(l)
    ),
    // .....

For each of HUE’s native activity types (e.g. DATA_CHANGE), there is an equivalent WorksAudit activity type (e.g. /data/operate/data) and an associated logic to handle data extraction (DataChangeLogic in the example above).

AC (wap-audit-transform-ac)

The AC module is for transforming the AC “access log”. The main implementation class is AuditACTransformInput:

case class AuditACTransformInput(csv: String,
                                 tenant: String,
                                 landscape: String,
                                 year: String,
                                 month: String,
                                 day: String)
    extends AbstractAuditTransformInput {
   
   // ....
}

The AC access log format is basic CSV, and each record is injected into the csv argument of this class.

The other important class of this ETL module is the mapping from AC activity types to WorksAudit activity types:

case class ACActivityMapper(csvRecord: ACCSVRecord) {

  def activityType = csvRecord.messageCode match {
    case Some("1009000002") => Success("/authentication/login/nil/success")
    case Some("1009000003") => Success("/authentication/logout")
//....

Note that this file is autogenerated by the /scripts/generate_ac_activity_mapper.py script. To update the mapper file, update the specification file and re-run the script.