Pipeline Configuration

The following files are provided in the config folder to configure the pipeline:

  • orctypemappingdef.xml - type mapping for ORC output

  • avrotypemappingdef.xml - type mapping for AVRO output

  • messagetype.xsd - message schema

  • log4j2.xml - logging configuration

  • invariant-hdfs-adapter.yml - Configuration settings for HDFS connection

  • datapipelineservices.yml - List of tables to be streamed including mapping file references

  • targetDataStore.yml - database connection details

  • brkadapter.properties - Kafka connection details

  • inbound.topics.brkadapter.properties - List of Kafka topics

  • securitybrkadapter.properties - Security settings

HDFS Adapter

The HDFS pipeline service captures DB event data from a Kafka topic, writes it to HDFS and maps it to a table for immediate querying from within Hive. The data can be in event publication (delimited) format or from a CDC source (AVRO format).

The data from Kafka is parsed, processed and written in ORC storage format into separate folders corresponding to each table and into daily partitions. The HDFS adapter integrates with Hive and can automatically create new tables or map data onto existing predefined tables.
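
For illustration only, assuming the tables.dir: dev setting from the adapter configuration below and two streamed tables, the resulting HDFS layout would look roughly as follows; the actual partition directory names are produced by the configured DailyPartitioner class.

dev/tablea/<daily partition>/<ORC files>
dev/tableb/<daily partition>/<ORC files>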

The invariant-hdfs-adapter.yml file specifies the configuration parameters for connecting with the Hadoop system:

# Configuration 
hadoop.home: /opt/invariant/current/hadoop-client/
hive.home: /opt/invariant/current/hive-server2/ 
connect.hdfs.principal: admin 
hadoop.conf.dir: /opt/invariant/current/hadoop-client/conf/ 
hdfs.url: hdfs://invserver:8020/
 
hive.metastore.uris: thrift://invserver:9083 
hive.integration: true 
partitioner.class: io.invariant.invhdfsadapter.partitioner.DailyPartitioner 
hive.database: test 
flush.size: 3
 
HDFS_AUTHENTICATION_KERBEROS_CONFIG: false 
hdfs.namenode.principal: 
connect.hdfs.keytab: 
tables.dir: dev 
hadoop.hive.warehouse.basedir: /apps/hive/warehouse/
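
If the Hadoop cluster is secured with Kerberos, the same keys are populated instead of being left empty; the principal and keytab values below are placeholders for illustration only.

HDFS_AUTHENTICATION_KERBEROS_CONFIG: true
hdfs.namenode.principal: nn/_HOST@EXAMPLE.REALM
connect.hdfs.principal: invadapter@EXAMPLE.REALM
connect.hdfs.keytab: /etc/security/keytabs/invadapter.keytab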

The data pipeline services are configured using the datapipelineservices.yml file, which is placed along with all of the pipeline configurations in the pipeline directory. The data pipeline consists of multiple steps, with each step having a reader, processer and writer configuration.

The pipeline configuration consists of a general section for pipeline details followed by one or more pipeline steps. The general config contains the name of the pipeline, the port for interaction, the log directory (walDir) for keeping track of process state and an error log directory (errorlogDir) for tracking errors. Ensure the pipeline is run as a user that has read/write access to these directories and that there is a minimum of 10 GB of free space in them.

dataPipelineName: CaseDataPipe
walDir: /opt/invariant/pipelinedir/waldir
errorlogDir: /opt/invariant/pipelinedir/hdfs-ppl/errdir
messageCharDelimiter: "\u001D"

The pipeline steps include a reader, processer and writer configuration.

 pipelineSteps:
   - !datapipelinestep
     dataPiplineStepName: ppl-step-one
     processer:
       !invdatapplprocesser
       processbatchsize: 100
       processtimewindowms: 200
     reader:
       !invbrkreader
       invResourceName: topicDev01
       msgreader:
         !qrepdelimitedmsgreader
         messageDef:
           - tablea-d.xml
           - tablea-i.xml
           - tablea-u.xml
           - tableb-d.xml
           - tableb-i.xml
           - tableb-u.xml

The data pipeline step name (dataPiplineStepName) is used for metrics reporting and has to be unique for each step.

Processer

The processer depends on the specific use case. For processing DB event messages to Hadoop, the processer used is !invdatapplprocesser

  • processbatchsize specifies the number of records that are processed as part of a micro-batch

  • processtimewindowms is the time in milliseconds that the processer waits for more records before processing the batch.

The processer uses a combination of processbatchsize and processtimewindowms when processing a stream of records. If a continuous stream of records is present, the processer processes micro-batches made up of the record count defined in the processbatchsize configuration. If a continuous stream of records is not present, the processer waits for the time in milliseconds configured in processtimewindowms before proceeding to process the batch.
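
For example, with the step shown earlier (processbatchsize: 100, processtimewindowms: 200), a micro-batch is written as soon as 100 records are available, or after 200 milliseconds when fewer records have arrived. A throughput-oriented step could use larger values; the numbers below are illustrative only.

     processer:
       !invdatapplprocesser
       processbatchsize: 500
       processtimewindowms: 1000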

Reader

The reader configuration should include the type of reader and the resource name. !invbrkreader denotes the type of reader used to process the stream from the broker. invResourceName denotes the topic that is used to read the events for processing.

Message Reader

The message reader section details the type of message that makes up the event. For reading event publish messages, define the reader as !qrepdelimitedmsgreader and configure the delimiters: the messageCharDelimiter is "\u001D" and the recordCharDelimiter is "\u001E". Please check with the Event publishing team on the message column and record delimiters.

messageDef is a collection of XML definitions that define the columns of interest and their respective mappings for each database event (insert, update, delete) from a predefined set of tables. One definition is expected for each DML action per table, as the column positions and published values can differ depending on the action and table.

The message definition contains details of the source and target, including information about a set of columns defined by DB2 Event publishing. The data in position 6 refers to ibmsnap_operation, which defines the DML action, and the data in position 5 provides the table where the DML action was executed. These two pieces of data are critical in identifying the message type so that the proper transformation can be applied and the result saved in the target store.
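
As an illustration only (the actual layout must be confirmed with the Event publishing team), a delete event matching the sample MessageDef below might arrive as a single delimited record, with the "\u001D" column delimiter shown here as "|" for readability. Position 5 carries the source table name, position 6 the ibmsnap_operation value, position 9 the ibmsnap_logmarker timestamp and position 12 the COLUMNA value:

col1|col2|col3|col4|sourcetablename|DLET|col7|col8|<ibmsnap_logmarker>|col10|col11|<COLUMNA>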

A collection of columns, including some DB2 Event publishing specific columns, is defined with details such as the position of the column in the message, the source and target names and types, and any format information needed for the transformation.

<tns:MessageDef xmlns:tns="http:invariant.io/commontypes"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http:invariant.io/commontypes messagetype.xsd"
            xsi:type="tns:QRepDelimitedMessageType">
       <sourcetablename>sourcetablename</sourcetablename>
       <sourcedbname>sourcedb</sourcedbname>
       <sourcetablenameposition>5</sourcetablenameposition>
       <targettablename>targettablename</targettablename>
       <columns>
         <column  colposition="6">
           <sourceDef>
              <colname>ibmsnap_operation</colname>
              <coltype>VARCHAR</coltype>
           </sourceDef>
           <targetDef>
              <colname>ibmsnap_operation</colname>
              <coltype>VARCHAR</coltype>
           </targetDef>
         </column>
         <column colposition="9">
           <sourceDef>
              <colname>ibmsnap_logmarker</colname>
              <coltype>TIMESTAMP</coltype>
           </sourceDef>
           <targetDef>
              <colname>ibmsnap_logmarker</colname>
              <coltype>TIMESTAMP</coltype>
           </targetDef>
         </column>
         <column colposition="12">
           <sourceDef>
              <colname>COLUMNA</colname>
              <coltype>BIGINT</coltype>
              <format/>
           </sourceDef>
           <targetDef>
              <colname>COLUMNA</colname>
              <coltype>BIGINT</coltype>
              <format/>
           </targetDef>
         </column>
     </columns>
    <dmlaction>DLET</dmlaction>
    <dmlactionposition>6</dmlactionposition>
 </tns:MessageDef>

The last section in the pipeline definition is the writer definition. This consists of defining the appropriate writer and its specific configuration.

   writer:
       !invhdfsorcwriter
       includeCreateTime: true
       invResourceName: targetschema
       targetContainerDef:
         - tablea.xml
         - tableb.xml

For writing messages to HDFS as ORC, use the “!invhdfsorcwriter” writer. There are two other properties included in the HDFS writer:

includeCreateTime: true   

This ensures a timestamp column is populated with the current time as the record is being processed.

invResourceName: targetschema

This provides the resource/schema that the targets are written to within HDFS.

The writer definition includes a collection of target definitions which are used to persist the processed messages. The target container definition is XML based and consists of the name of the target, the columns and respective types that make up the target and, finally, the list of record identifiers that can be used to uniquely identify a record.

<?xml version="1.0" encoding="UTF-8"?>
<tns:ContainerDef  xmlns:tns="http:invariant.io/commontypes"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http:invariant.io/commontypes messagetype.xsd">
   <name>TARGETTABLENAME</name>
   <version>1</version>
   <recordIdentifiers>
     <colname>COLUMNA</colname>
   </recordIdentifiers>
   <columns>
      <column>
         <colname>ibmsnap_operation</colname>
         <coltype>VARCHAR</coltype>
      </column>
      <column>
         <colname>ibmsnap_logmarker</colname>
         <coltype>TIMESTAMP</coltype>
      </column>
      <column>
         <colname>COLUMNA</colname>
         <coltype>BIGINT</coltype>
      </column>
   </columns>
</tns:ContainerDef>
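
For reference, the reader, processer and writer fragments shown above combine into a single step in datapipelineservices.yml along the following lines (shortened to one table for brevity):

 pipelineSteps:
   - !datapipelinestep
     dataPiplineStepName: ppl-step-one
     processer:
       !invdatapplprocesser
       processbatchsize: 100
       processtimewindowms: 200
     reader:
       !invbrkreader
       invResourceName: topicDev01
       msgreader:
         !qrepdelimitedmsgreader
         messageDef:
           - tablea-d.xml
           - tablea-i.xml
           - tablea-u.xml
     writer:
       !invhdfsorcwriter
       includeCreateTime: true
       invResourceName: targetschema
       targetContainerDef:
         - tablea.xml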
