RDB Feeder 0.4


RDB Feeder
Description: Feeds Aspire documents from a relational database (RDB). Can operate in full feed or incremental mode (see below).
Inputs: RDB
Outputs: Separate sub-jobs for each RDB record. Each sub-job contains an AspireDocument object holding data from the RDB and is published to the configured pipeline manager.
Factory: aspire-rdbfeeder
Sub Type: default
Object Type: Produces AspireDocument objects.

Other Notes

  • This feeder is based on the Simple Feeder
  • By default, the feeder submits one record per second. Set <feedWait> to "0" (zero) to feed as fast as possible.
  • Note that this component is not the same as the RDB Sub Job Feeder

Feeder Operation

Full feed

Full mode takes SQL from the configuration and executes it against the database configured via the RDB connection stage. Each resulting row is formed into an Aspire document, using the column names as document elements, and the document is submitted to a pipeline manager using the event configured for inserts. As the document is created, the value of the column identified in the configuration as the primary key is inserted into the fetchUrl element of the document, and the value insert is placed in the action attribute of the document.

Example Aspire document from a full feed

 <doc action="insert">
   <ID source="RDBFeederImpl">1</ID>
   <fetchUrl source="RDBFeederImpl">1</fetchUrl>
   <PG_ID source="RDBFeederImpl">1</PG_ID>
   <ENTITY_TYPE source="RDBFeederImpl">productgroup</ENTITY_TYPE>
   <PG_UUID source="RDBFeederImpl">uuid val 1</PG_UUID>
   <PG_WEBSITE source="RDBFeederImpl">bbc.co.uk/eastenders</PG_WEBSITE>
   <PG_TYPE source="RDBFeederImpl">Series</PG_TYPE>
   <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
   <feederType source="RDBFeederImpl">RDBFeeder</feederType>
 </doc>
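
As a rough sketch, a full-feed configuration along the following lines could produce a document like the one above. The column names are taken from the example document. The table name product_groups is hypothetical, the pipeline manager path is borrowed from the simple configuration later on this page, and the feeder label is set explicitly on the assumption that this is how the example's label was produced. With no <rdbLocation> given, the default of rdb would apply.

```xml
<component name="EM3FullFeeder" subType="default" factoryName="aspire-rdbfeeder">
  <config>
    <!-- Label copied into the <feederLabel> element of each document -->
    <feederLabel>EM3FullFeeder</feederLabel>
    <branches>
      <!-- Full-feed rows use the insert event, which defaults to onPublish -->
      <branch event="onPublish" pipelineManager="/system/standard-pipe-manager"/>
    </branches>
    <feedWait>0</feedWait>
    <!-- The value of this column is copied into <fetchUrl> -->
    <id column="ID"/>
    <sql>
      <!-- product_groups is a hypothetical table name -->
      <fullSelect><![CDATA[select ID, PG_ID, ENTITY_TYPE, PG_UUID, PG_WEBSITE, PG_TYPE
                           from product_groups]]></fullSelect>
    </sql>
  </config>
</component>
```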

Incremental feed

Incremental mode also uses SQL from the configuration file. In this mode, however, an optional "pre-update" command is run first, followed by a command to extract the data. As each job returns from the pipeline, a "post-update" command is run for that individual job.

To handle updates, configure a queue table in your database. (The table can have any name, as long as it is referenced consistently in the SQL statements below.) When a row changes in the main data table, a trigger should fire and insert a row into this queue table. The queue table must have at least four columns. These can be named anything you want; the first three are identified using config parameters, and the status is used only in the SQL statements below.

  • sequence: a unique, incrementing ID for the change.
  • id: the ID (primary key) of the row in the main data table that changed.
  • action: a single character indicating the type of change (I for insert, U for update, D for delete, or N for no change).
  • status: a flag indicating the current status of the update (suggested values are W for waiting to be processed, I for in progress, and C for completed).

Your "pre-update" SQL can then update the status (from W to I) for those updates you wish to process on this pass. This could be all the updates that are not complete, or the first n.

Your data extraction SQL should then join the queue table with the main data table, restricted to queue rows with a status of I. It should select the columns you wish to index plus, at a minimum, the id, sequence and action, and order the rows by increasing sequence number. The column names returned for id, sequence and action must match the column names specified in the <id>, <sequence> and <action> parameters; you may need to use the SQL AS construct.
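
The following fragment sketches how the AS construct can line up returned column names with the <id>, <sequence> and <action> parameters when the queue table uses different names. All table and column names here (change_queue, main_data, change_seq, row_id, change_type, state, title) are hypothetical. The right outer join follows the example configurations on this page, presumably so that queue entries whose main row no longer exists are still returned.

```xml
<config>
  <!-- These names must match the aliases returned by the update SQL -->
  <id column="ID"/>
  <sequence column="SEQ"/>
  <action column="ACTION"/>
  <sql>
    <update><![CDATA[
      select q.change_seq AS SEQ, q.row_id AS ID, q.change_type AS ACTION, m.title
      from main_data m right outer join change_queue q
        on m.id = q.row_id
      where q.state = 'I'
      order by q.change_seq asc
    ]]></update>
  </sql>
</config>
```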

Aspire documents will then be formed for each row returned and submitted to the pipeline manager using the event defined for the appropriate action. You can configure events for insert, update and delete, which means that these actions can use differing pipelines. As the document is created, the value of the column identified in the configuration as the primary key is inserted into the fetchUrl element of the document. The extracted action value ('insert', 'update', or 'delete') will be placed in the action attribute of the document.

The feeder will ensure that jobs created for a particular document id are finished before another for the same id is started, but jobs for differing ids could be submitted out of sequence order.

When a job completes successfully, the "post-update" SQL is executed, with the token :SEQ replaced by the sequence number of the job that has completed. This SQL should update the status in the queue table (from I to C) to indicate that the update has been processed.

Alternative Process

If you wish to keep the number of rows in the queue table to a minimum, you can use the "post-update" SQL to delete the row instead. You may then not need the "pre-update" SQL, in which case it can be omitted from the configuration.
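
A minimal sketch of this alternative, reusing the table and column names from the simple configuration later on this page, might look like the fragment below. It assumes that every row still present in the queue table is pending, so no status filter is applied. Whether a row whose job is still in flight could be selected again on a later pass depends on feeder behaviour not described here.

```xml
<config>
  <id column="ID"/>
  <sequence column="SEQ"/>
  <action column="ACTION"/>
  <sql>
    <!-- No <preUpdate>: any row still in the queue is treated as pending -->
    <update><![CDATA[select queue.seq AS SEQ, queue.id AS ID, queue.action AS ACTION, main.col1
                     from main right outer join queue
                     on main.id = queue.id
                     order by queue.seq asc]]></update>
    <!-- Delete the queue row once its job has completed -->
    <postUpdate><![CDATA[delete from queue where seq = :SEQ]]></postUpdate>
  </sql>
</config>
```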

Example Aspire document for an update

 <doc action="update">
   <ID source="RDBFeederImpl">1</ID>
   <fetchUrl source="RDBFeederImpl">1</fetchUrl>
   <PG_ID source="RDBFeederImpl">1</PG_ID>
   <ENTITY_TYPE source="RDBFeederImpl">productgroup</ENTITY_TYPE>
   <PG_UUID source="RDBFeederImpl">uuid val 1</PG_UUID>
   <PG_WEBSITE source="RDBFeederImpl">bbc.co.uk/eastenders/test</PG_WEBSITE>
   <PG_TYPE source="RDBFeederImpl">Series</PG_TYPE>
   <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
   <feederType source="RDBFeederImpl">RDBFeeder</feederType>
 </doc>

Example Aspire document for a delete

 <doc action="delete">
   <ID source="RDBFeederImpl">1</ID>
   <fetchUrl source="RDBFeederImpl">1</fetchUrl>
   <PG_ID source="RDBFeederImpl">1</PG_ID>
   <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
   <feederType source="RDBFeederImpl">RDBFeeder</feederType>
 </doc>

Configuration

This feeder takes all parameters from the Simple Feeder plus the following:

Element | Type | Default | Description
feederLabel | String | RDBFeed | The feeder label submitted in the <feederLabel> element of the published document.
rdbLocation | String | rdb | The Aspire component name/path of the RDB service. The RDB service is an RDB Connection component, which maintains a pool of RDB connections.
id/@column | String | None | The name of the column that holds the primary key (id) for the row. NOTE: This must match the column name returned by the sql/update parameter. You may need to use the SQL AS construct.
sequence/@column | String | None | The name of the column that holds the sequence from the queue table. NOTE: This must match the column name returned by the sql/update parameter. You may need to use the SQL AS construct.
action/@column | String | None | The name of the column that holds the action from the queue table. NOTE: This must match the column name returned by the sql/update parameter. You may need to use the SQL AS construct.
sql | String | None | The various pieces of SQL to run. See below.
events | String | See below | The events to publish documents against. See below.


SQL Configuration

The RDB feeder uses configurable SQL to specify what to feed.

Element | Type | Default | Description
sql/preUpdate | String | None | The SQL to be run before an incremental update takes place. (optional)
sql/update | String | None | The SQL to be run to extract data for the incremental update. Should include the id, sequence and action as a minimum.
sql/postUpdate | String | None | The SQL to be run as each job of an incremental update completes. The token :SEQ will be replaced by the sequence for the update that completed.
sql/fullSelect | String | None | The SQL to be run to extract data for the full feed. (optional)

Event Configuration

The RDB feeder publishes different actions against different events, so different pipelines can be used to handle different actions.

Element | Type | Default | Description
events/insert/@event | String | onPublish | The event to publish against for an insert.
events/update/@event | String | onPublish | The event to publish against for an update.
events/delete/@event | String | onDelete | The event to publish against for a delete.


Branch Configuration

The RDB feeder publishes documents using the branch manager. It publishes using the events configured above. You must therefore include <branches> for these events in the configuration to publish to a pipeline within a pipeline manager. See Branch Handler for more details.

Element | Type | Description
branches/branch/@event | String | The event to configure.
branches/branch/@pipelineManager | String | The URL of the pipeline manager to publish to. Can be relative.
branches/branch/@pipeline | String | The name of the pipeline to publish to. Omit for default.
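
To illustrate how the event and branch settings fit together, the fragment below sketches a configuration that sends inserts, updates and deletes to three separate pipelines. The element layout is inferred from the paths in the tables above (for example, events/insert/@event). The event names onInsert and onUpdate and the pipeline names are hypothetical, and the sketch assumes the feeder accepts event names other than its defaults.

```xml
<config>
  <events>
    <!-- Event published for each action; onInsert and onUpdate are hypothetical names -->
    <insert event="onInsert"/>
    <update event="onUpdate"/>
    <delete event="onDelete"/>
  </events>
  <branches>
    <!-- One branch per event configured above; pipeline names are hypothetical -->
    <branch event="onInsert" pipelineManager="/system/standard-pipe-manager" pipeline="insert-pipe"/>
    <branch event="onUpdate" pipelineManager="/system/standard-pipe-manager" pipeline="update-pipe"/>
    <branch event="onDelete" pipelineManager="/system/standard-pipe-manager" pipeline="delete-pipe"/>
  </branches>
</config>
```

If an event has no matching <branch>, documents published against it presumably have nowhere to go, so each configured event should have a branch.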

Example Configurations

Simple Configuration

 <component name="EM3FullFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <config>
     <branches>
       <branch event="onPublish" pipelineManager="/system/standard-pipe-manager"/>
       <branch event="onDelete" pipelineManager="/system/standard-pipe-manager" pipeline="delete-pipe"/>
     </branches>
     <feedWait>0</feedWait>
     <id column="ID"/>
     <sequence column="SEQ"/>
     <action column="ACTION"/>
     <sql>
       <preUpdate><![CDATA[update queue set status='I' where seq in
                             (select top 3 seq from queue where status='W' order by seq asc)]]></preUpdate>
       <update><![CDATA[select queue.seq AS SEQ, queue.id AS ID, queue.action AS ACTION, main.col1, main.col2, main.col3
                          from main right outer join queue
                          on main.id = queue.id
                          where queue.status = 'I'
                          order by queue.seq asc]]></update>
       <postUpdate><![CDATA[update queue set status='C' where seq = :SEQ]]></postUpdate>
       <fullSelect><![CDATA[select main.id, main.col1, main.col2, main.col3 from main]]></fullSelect>
     </sql>
   </config>
 </component>

Simple Configuration: Only Full Feeds

This example shows a simple configuration where only full re-feeds are required (i.e. no incremental updates).

Note: The ROWNUM pseudocolumn shown below is specific to Oracle.

 <component name="CVsFullFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <config>
     <rdbLocation>RDBConnection</rdbLocation>
     <branches>
       <branch event="onPublish" pipelineManager="ProcessCVsBatchXMLFile"/>
     </branches>
     <id column="applicant_id"/>
     <feedWait>0</feedWait>
     <sql>
       <fullSelect><![CDATA[
         SELECT applicant_id, roles, main_skills, text_cv
         FROM
         ( 
           select applicant_id, roles, main_skills, text_cv from stcvs
         )
         WHERE ROWNUM <= 25
       ]]></fullSelect>
     </sql>
   </config>
 </component>

Complex Configuration

  <component name="FeedRealTimeUpdates" subType="default" factoryName="aspire-rdbfeeder">
     <config>
       <feederLabel>FeedRealTimeUpdates</feederLabel>
       <rdbLocation>RDBConnection</rdbLocation>
       <branches>
         <branch event="onPublish"  pipelineManager="ProcessRegistrant" pipeline="process-registrant-action"/>
         <branch event="onDelete"  pipelineManager="ProcessRegistrant" pipeline="process-registrant-delete"/>
       </branches>
       <loopWait>60000</loopWait>
       <feedWait>0</feedWait>
       <id column="REGISTRATION_ID"/>
       <sequence column="UPDATE_ID"/>
       <action column="ACTION"/>
       <sql>
         <preUpdate>
           <![CDATA[
           update REGISTRANT_UPDATES set status = 'I' where status = 'W'
           ]]>
         </preUpdate>
         <update>
           <![CDATA[
           SELECT queue.update_id AS UPDATE_ID, queue.action AS ACTION,
                  'true' AS USE_COMMIT_WITHIN,
                  r.REGISTRATION_ID,
                  FIRST_NAME,
                  CURRENT_LAST_NAME,
                  CURRENT_STATE,
                  to_char(BIRTH_DATE,'yyyy-mm-dd') AS BIRTH_DATE,
                  GENDER,
                  DISPLAY_STATE,
                  to_char(CREATION_DATE,'yyyy-mm-dd') AS CREATION_DATE,
                  to_char(LAST_UPDATE_DATE,'yyyy-mm-dd') AS LAST_UPDATE_DATE
           FROM   registrations r right outer join REGISTRANT_UPDATES queue
                  on r.registration_id = queue.registration_id
           WHERE  DISPLAY_STATE >= 0 AND DISPLAY_STATE <= 2
                  and queue.status = 'I'
           ORDER BY queue.update_id asc
           ]]>
         </update>
         <postUpdate>
           <![CDATA[
           UPDATE REGISTRANT_UPDATES set status = 'C'
                         WHERE update_id = :SEQ
                               and status = 'I'
           ]]>
         </postUpdate>
       </sql>
     </config>
   </component>