| <!DOCTYPE html> |
| <html lang="en"> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| <head> |
| <meta charset="utf-8" /> |
| <title>Groovy</title> |
| <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /--> |
| <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> |
| </head> |
| |
| <body> |
| <h2>Summary</h2> |
| <p>This is the grooviest Groovy script :)</p> |
| <h2>Script Bindings:</h2> |
| <table> |
| <tr><th>variable</th><th>type</th><th>description</th></tr> |
| <tr> |
| <td>session</td> |
| <td>org.apache.nifi.processor.ProcessSession</td> |
| <td>the session that is used to get, change, and transfer input files</td> |
| </tr> |
| <tr> |
| <td>context</td> |
| <td>org.apache.nifi.processor.ProcessContext</td> |
| <td>the process context (rarely needed in scripts)</td> |
| </tr> |
| <tr> |
| <td>log</td> |
| <td>org.apache.nifi.logging.ComponentLog</td> |
| <td>the logger for this processor instance</td> |
| </tr> |
| <tr> |
| <td>REL_SUCCESS</td> |
| <td>org.apache.nifi.processor.Relationship</td> |
| <td>the success relationship</td> |
| </tr> |
| <tr> |
| <td>REL_FAILURE</td> |
| <td>org.apache.nifi.processor.Relationship</td> |
| <td>the failure relationship</td> |
| </tr> |
| <tr> |
| <td>CTL</td> |
| <td>java.util.HashMap&lt;String, <a href="https://github.com/apache/nifi/blob/main/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java">ControllerService</a>&gt;</td> |
| <td>Map populated with the controller services defined by <code>CTL.*</code> processor properties. |
| <br/>A property with the <code>CTL.</code> prefix can be linked to a controller service, which makes that service accessible from the script without additional code.</td> |
| </tr> |
| <tr> |
| <td>SQL</td> |
| <td>java.util.HashMap&lt;String, <a href="http://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html">groovy.sql.Sql</a>&gt;</td> |
| <td>Map populated with <code>groovy.sql.Sql</code> objects, each connected to the database defined by the corresponding <code>SQL.*</code> processor property. |
| <br/>Properties with the <code>SQL.</code> prefix can be linked only to a DBCPService.</td> |
| </tr> |
| <tr> |
| <td>RecordReader</td> |
| <td>java.util.HashMap&lt;String, <a href="https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java">RecordReaderFactory</a>&gt;</td> |
| <td>Map populated with the controller services defined by <code>RecordReader.*</code> processor properties. |
| <br/>Properties with the <code>RecordReader.</code> prefix must be linked to RecordReaderFactory controller service instances.</td> |
| </tr> |
| <tr> |
| <td>RecordWriter</td> |
| <td>java.util.HashMap&lt;String, <a href="https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java">RecordSetWriterFactory</a>&gt;</td> |
| <td>Map populated with the controller services defined by <code>RecordWriter.*</code> processor properties. |
| <br/>Properties with the <code>RecordWriter.</code> prefix must be linked to RecordSetWriterFactory controller service instances.</td> |
| </tr> |
| <tr> |
| <td>Dynamic processor properties</td> |
| <td>org.apache.nifi.components.PropertyDescriptor</td> |
| <td>All processor properties whose names do not start with <code>CTL.</code> or <code>SQL.</code> are bound to script variables</td> |
| </tr> |
| </table> |
| |
| <h2>SQL map details</h2> |
| <p> |
| <b>Example:</b> if you define a property named <code>SQL.mydb</code> and link it to any DBCPService, |
| you can access it from the script as <code>SQL.mydb.rows('select * from mytable')</code>.<br/> |
| |
| <br/>Before executing the script, the processor automatically obtains a connection from the DBCP service and manages the transaction: |
| <br/>database transactions are rolled back automatically if the script throws an exception and committed on success. |
| <br/>Alternatively, you can manage transactions manually. |
| <br/>NOTE: The script must not close the connection. |
| |
| <br/><img src="SQL.gif"/> |
| <br/><img src="SQL2.gif"/> |
| </p> |
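| <p>Manual transaction management could look roughly like the sketch below. It is only an illustration under stated assumptions: the closure name <code>updateStatus</code>, the <code>status</code> column, and the sample values are hypothetical, and the <code>db</code> argument is assumed to behave like a <code>groovy.sql.Sql</code> object such as <code>SQL.mydb</code>. It relies only on the standard <code>executeUpdate</code>, <code>commit</code>, and <code>rollback</code> methods of <code>groovy.sql.Sql</code>.</p> |
| <pre> |
| // Hypothetical helper, not part of the processor API. |
| // `db` is expected to behave like groovy.sql.Sql (for example SQL.mydb). |
| def updateStatus = { db, long id, String status -> |
|     try { |
|         // positional parameters are sent as a prepared statement |
|         int updated = db.executeUpdate('update mytable set status = ? where id = ?', [status, id]) |
|         db.commit()      // commit explicitly instead of waiting for the script to finish |
|         return updated |
|     } catch (Exception e) { |
|         db.rollback()    // undo the partial work, then let the processor see the failure |
|         throw e |
|     } |
| } |
|  |
| // usage inside the processor script (assumes a property named SQL.mydb): |
| // updateStatus(SQL.mydb, 1L, 'DONE') |
| </pre> |
| <p>Rethrowing the exception keeps the failure visible to the processor, so its own error handling still applies.</p> |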
| |
| <h2>SessionFile - flow file extension</h2> |
| <p> |
| <code>org.apache.nifi.processors.groovyx.flow.SessionFile</code> is the actual object that the session returns in the Extended Groovy processor.<br/> |
| It is a container that references both the session and the real flow file.<br/> |
| This enables a simplified syntax for working with flow file attributes and content: |
| </p> |
| <p><i>set new attribute value</i></p> |
| <pre> |
| flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE |
| flowFile.'mime.type' = 'text/xml' |
| flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE) |
| //the same as |
| flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE) |
| </pre> |
| |
| <p><i>remove attribute</i></p> |
| <pre> |
| flowFile.ATTRIBUTE_NAME = null |
| //equals to |
| flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME") |
| </pre> |
| |
| <p><i>get attribute value</i></p> |
| <pre> |
| String a = flowFile.ATTRIBUTE_NAME |
| </pre> |
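| <p>The attribute syntax above depends on Groovy's dynamic property handling. The following self-contained sketch shows one way such a mapping can be built with <code>propertyMissing</code>. The class <code>AttributeHolder</code> is a hypothetical stand-in and is not the real SessionFile implementation; converting values to strings is also an assumption made for the illustration.</p> |
| <pre> |
| // Hypothetical stand-in for a flow file wrapper; NOT the real SessionFile class. |
| class AttributeHolder { |
|     private final Map attributes = [:] |
|  |
|     // Invoked by Groovy for `holder.name = value` when no such property exists. |
|     def propertyMissing(String name, value) { |
|         if (value == null) { |
|             attributes.remove(name)              // assigning null removes the attribute |
|         } else { |
|             attributes[name] = value.toString()  // assumption: values are kept as strings |
|         } |
|     } |
|  |
|     // Invoked by Groovy for `holder.name` when no such property exists. |
|     def propertyMissing(String name) { |
|         attributes[name] |
|     } |
| } |
|  |
| def holder = new AttributeHolder() |
| holder.'mime.type' = 'text/xml'    // set an attribute with a dotted name |
| holder.ID = 42                     // set an attribute from a non-string value |
| String type = holder.'mime.type'   // get an attribute |
| holder.ID = null                   // remove an attribute |
| </pre> |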
| |
| <p><i>write content</i></p> |
| <pre> |
| flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content") |
| flowFile.write("UTF-8"){writer-> |
|     // do something with java.io.Writer... |
| } |
| flowFile.write{outStream-> |
|     // do something with output stream... |
| } |
| flowFile.write{inStream, outStream-> |
|     // do something with input and output streams... |
| } |
| </pre> |
| |
| <p><i>get content</i></p> |
| <pre> |
| InputStream i = flowFile.read() |
| def json = new groovy.json.JsonSlurper().parse( flowFile.read() ) |
| String text = flowFile.read().getText("UTF-8") |
| </pre> |
| |
| <p><i>transfer flow file to success relation</i></p> |
| <pre> |
| REL_SUCCESS << flowFile |
| flowFile.transfer(REL_SUCCESS) |
| //the same as: |
| session.transfer(flowFile, REL_SUCCESS) |
| </pre> |
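| <p>The <code>&lt;&lt;</code> form works because Groovy maps that operator to a method named <code>leftShift</code>. The sketch below shows the mechanism on a hypothetical <code>Route</code> class; it does not reproduce how the processor attaches the operator to <code>Relationship</code>, and returning <code>this</code> to allow chaining is a choice made for the illustration.</p> |
| <pre> |
| // Hypothetical class used only to demonstrate operator overloading. |
| class Route { |
|     String name |
|     List received = [] |
|  |
|     // Groovy calls leftShift for `route << item` |
|     Route leftShift(Object item) { |
|         received.add(item) |
|         return this          // returning the route allows chained calls |
|     } |
| } |
|  |
| def success = new Route(name: 'success') |
| success << 'file-1' << 'file-2' |
| </pre> |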
| |
| <p><i>work with dbcp</i></p> |
| <pre> |
| import groovy.sql.Sql |
| |
| //define property named `SQL.db` connected to a DBCPConnectionPool controller service |
| //for this case it's an H2 database example |
| |
| //read value from the database with prepared statement |
| //and assign into flowfile attribute `db.yesterday` |
| def daysAdd = -1 |
| def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual") |
| flowFile.'db.yesterday' = row.DB_DATE |
| |
| //to work with BLOBs and CLOBs in the database |
| //use parameter casting using groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader) |
| |
| //write content of the flow file into database blob |
| flowFile.read{ rawIn-> |
|     def parms = [ |
|         p_id   : flowFile.ID as Long, //get flow file attribute named `ID` |
|         p_data : Sql.BLOB( rawIn ),   //use input stream as BLOB sql parameter |
|     ] |
|     SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id") |
| } |
| </pre> |
| |
| |
| <h2>Handling processor start & stop</h2> |
| |
| <p>In the Extended Groovy processor you can handle the <code>start</code>, <code>stop</code>, and <code>unscheduled</code> events by providing the corresponding static methods:</p> |
| <pre> |
| import org.apache.nifi.processor.ProcessContext |
| import java.util.concurrent.atomic.AtomicLong |
| |
| class Const{ |
|     static Date startTime = null; |
|     static AtomicLong triggerCount = null; |
| } |
| |
| static onStart(ProcessContext context){ |
|     Const.startTime = new Date() |
|     Const.triggerCount = new AtomicLong(0) |
|     println "onStart $context ${Const.startTime}" |
| } |
| |
| static onStop(ProcessContext context){ |
|     def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000 |
|     println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds" |
| } |
| |
| static onUnscheduled(ProcessContext context){ |
|     def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000 |
|     println "onUnscheduled $context executed ${ Const.triggerCount } times during ${ alive } seconds" |
| } |
| |
| flowFile.'trigger.count' = Const.triggerCount.incrementAndGet() |
| REL_SUCCESS << flowFile |
| </pre> |
| <br/> |
| <br/> |
| <br/> |
| <br/> |
| </body> |
| </html> |