Published: Nov 29, 2020 by Dkrypt
Project Repository (GitHub)
What is Apache NiFi
Apache NiFi is a robust, powerful and reliable system to process and distribute data in real time. It has an intuitive UI with a simple drag-and-drop feature for creating data flows. A NiFi flow is a collection of processors connected by defined relationships that handle success and failure. Each processor acts as a single unit that ingests, transforms or distributes data. The data unit in NiFi is called a FlowFile. A FlowFile can be considered a file with some metadata, such as filename and uuid, plus attributes that each processor attaches to it according to the work that processor does.
What is a NiFi Processor
NiFi is built around Flow Based Programming: data flows from one end to the other, and all transformation or distribution is done by the intermediate processors. A NiFi flow is built by connecting various processors via connections/relationships. Data enters a processor and is routed to one of the relationships defined by that processor.
So, a NiFi Processor is an atomic unit which performs some action on the data presented to it, contained in a FlowFile. At present there are more than 270 processors that come bundled with Apache NiFi. In addition, the NiFi framework is built in such a way that it presents the opportunity to extend its functionality by creating Custom Processors.
Overview
To demonstrate building a new processor, we are going to create a processor which sorts a list of provided numbers according to the selected sorting method. We are also going to have a dynamic property to provide the output order (increasing/decreasing) and two relationships, namely “Sorted” and “Failure”.
This post is written to share my knowledge about custom NiFi Processors and how to build one from scratch. It is divided into four (4) major sections, listed below.
- Bootstrapping & project structure
- Explanation of Relationships and Properties
- Explanation of overridden methods
- Including new processor in NiFi and Testing it.
Bootstrapping & project structure
Apache NiFi uses Maven as its build automation tool and dependency manager, so we are going to use Maven as well. If you have some background in Maven, you will be able to understand the project structure better. I assume you either have that knowledge or can pick it up on the go.
- We use mvn archetype:generate to bootstrap a simple project and use that as a template.
Using maven archetype generator
To use mvn archetype:generate you must have Maven installed. You can download and install Maven from the official website. After you have successfully installed Maven, open a terminal/command prompt and type
mvn -v
You should see the version of the installed Maven binary.
- To create a Java project run:
mvn archetype:generate
This lists all the archetypes available from the remote repository.
- Type “nifi” to narrow the list down to the archetypes with the keyword “nifi” in them.
- Select org.apache.nifi:nifi-processor-bundle-archetype by typing the corresponding number.
- After selecting the archetype, Maven lists the available versions. We are going to choose the latest version, which also matches the NiFi version.
- After you see BUILD SUCCESS you will have a directory created locally named nifi-sorter-bundle.
- Inside it you will see a directory structure with two modules: nifi-sorter-processors (the processor source code) and nifi-sorter-nar (the packaging module that produces the .nar file).
Explanation of Relationships and Properties
Open your project in IntelliJ IDEA/Eclipse. Go to nifi-sorter-bundle/nifi-sorter-processors/src/main/java/io/github/dkrypt/processors/sorter/MyProcessor.java. Here you can see a MyProcessor class extending the AbstractProcessor class.
- Rename the file and class from MyProcessor.java to Sorter.java.
Here you can see PropertyDescriptor and Relationship. Let’s discuss more about these two Java objects.
PropertyDescriptor
In a NiFi processor there are properties defined according to needs. A PropertyDescriptor defines the property to be used by a Processor.
A sample property is defined as MY_PROPERTY. We are going to delete this and create a new property as below
public static final PropertyDescriptor SORT_ALGO = new PropertyDescriptor.Builder()
        .name("SORT_ALGO")
        .displayName("Sorting Algorithm")
        .description("Algorithm to be used for sorting")
        .allowableValues("Selection Sort", "Bubble Sort", "Insertion Sort", "Quick Sort", "Merge Sort")
        .required(true)
        .defaultValue("Selection Sort")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
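The allowable values above are plain strings, so the same literals have to be repeated wherever the selected algorithm is later inspected. One way to keep them in a single place is a small enum. This is only a sketch and not part of the original project: SortAlgorithm, displayNames and fromDisplayName are hypothetical names introduced here for illustration.

```java
import java.util.Arrays;

/** Hypothetical enum holding the display names used as allowable values. */
enum SortAlgorithm {
    SELECTION("Selection Sort"),
    BUBBLE("Bubble Sort"),
    INSERTION("Insertion Sort"),
    QUICK("Quick Sort"),
    MERGE("Merge Sort");

    private final String displayName;

    SortAlgorithm(String displayName) {
        this.displayName = displayName;
    }

    String displayName() {
        return displayName;
    }

    /** All display names, in declaration order. */
    static String[] displayNames() {
        return Arrays.stream(values())
                .map(SortAlgorithm::displayName)
                .toArray(String[]::new);
    }

    /** Looks up the constant for a configured property value. */
    static SortAlgorithm fromDisplayName(String name) {
        for (SortAlgorithm algorithm : values()) {
            if (algorithm.displayName.equals(name)) {
                return algorithm;
            }
        }
        throw new IllegalArgumentException("Unknown sorting algorithm: " + name);
    }
}
```

With something like this in place, the array returned by displayNames() could be handed to allowableValues(...), and fromDisplayName(...) could be applied to the configured value when the processor is scheduled.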
Relationships
In a NiFi processor there are relationships (connections) defined according to output scenarios. A Relationship defines a route to which a FlowFile may be transferred from a Processor.
A sample relationship is already defined as MY_RELATIONSHIP. We are going to delete this and create two new relationships as below
public static final Relationship REL_SORTED = new Relationship.Builder()
        .name("Sorted")
        .description("Relationship to route sorted flowfiles to.")
        .build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("Failure")
        .description("Relationship to route failed flowfiles to.")
        .build();
Explanation of overridden methods
Because our main class extends the AbstractProcessor class, there are some methods which need to be overridden and defined.
- The first method that we are going to look at and configure is the init method. This method takes a ProcessorInitializationContext as its argument and initializes the properties and relationships. We are going to add our property SORT_ALGO and our relationships REL_SORTED and REL_FAILURE.
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(SORT_ALGO);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SORTED);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
- The next overridden method is getRelationships. This returns a Set<Relationship> with the values initialized in the init method.
- Similar to getRelationships, we have another overridden method, getSupportedPropertyDescriptors. This returns a List<PropertyDescriptor> with the values initialized in the init method.
- The next method, annotated with @OnScheduled, is onScheduled. This is an important method which is invoked when the processor is scheduled to run. Here we do preprocessing on the property values. To demonstrate the usage of this method we are going to take the property value and assign it to a variable sortingAlgorithm. We are also going to fetch our dynamic property, which defines the order of sorting. Note that NiFi only accepts user-added (dynamic) properties if the processor also overrides getSupportedDynamicPropertyDescriptor. This is demonstrated below.
private String sortingAlgorithm = null;
private String sortingOrder = null;

@OnScheduled
public void onScheduled(final ProcessContext context) {
    sortingAlgorithm = context.getProperty(SORT_ALGO).getValue();
    // The dynamic property carries the sorting order
    context.getProperties().forEach((propertyDescriptor, value) -> {
        if (propertyDescriptor.isDynamic()) {
            sortingOrder = value;
        }
    });
}
- Next in our list of overridden methods is the onTrigger method. This method is triggered when the processor is triggered, either by an incoming flowfile or according to the scheduled time. Here we get access to two arguments, namely ProcessContext and ProcessSession. ProcessSession provides access to the incoming flowfile. FlowFile can be used to get the data and attributes of the flowfile.
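Because the sorting order captured in onScheduled comes from a dynamic property, its value is free text rather than one of a fixed set of allowable values. A minimal sketch of how that text could be turned into a Comparator is shown below. The SortOrder class, the comparatorFor method, the accepted spellings "increasing" and "decreasing", and the choice to default to increasing order when no value is set are all assumptions made for illustration.

```java
import java.util.Comparator;
import java.util.Locale;

/** Hypothetical helper that maps the free-text order value to a Comparator. */
final class SortOrder {
    private SortOrder() {
    }

    static Comparator<Integer> comparatorFor(String sortingOrder) {
        // Assumption: no dynamic property set means increasing order
        if (sortingOrder == null) {
            return Comparator.naturalOrder();
        }
        switch (sortingOrder.trim().toLowerCase(Locale.ROOT)) {
            case "increasing":
                return Comparator.naturalOrder();
            case "decreasing":
                return Comparator.reverseOrder();
            default:
                // Fail loudly instead of silently picking an order
                throw new IllegalArgumentException("Unknown sorting order: " + sortingOrder);
        }
    }
}
```

Rejecting unknown values up front means a typo in the property surfaces as an error instead of as output sorted in an unexpected direction.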
Implementing sorting algorithms
Here we are going to define our sorting algorithms. We are going to create an interface named Sorter along with separate classes for each sorting algorithm, e.g. SelectionSort.java implementing the interface Sorter. The project structure looks like below.
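To illustrate the interface-plus-implementations layout, here is a minimal sketch of what Sorter and SelectionSort could look like. The type names come from this post, but the method signature (a list plus an ascending flag) is an assumption; the classes in the repository may be shaped differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Contract shared by all sorting algorithms (signature is an assumption). */
interface Sorter {
    List<Integer> sort(List<Integer> input, boolean ascending);
}

/** Selection sort: repeatedly swap the best remaining element into place. */
class SelectionSort implements Sorter {
    @Override
    public List<Integer> sort(List<Integer> input, boolean ascending) {
        // Work on a copy so the caller's list is left untouched
        List<Integer> result = new ArrayList<>(input);
        for (int i = 0; i < result.size() - 1; i++) {
            int best = i;
            for (int j = i + 1; j < result.size(); j++) {
                int cmp = result.get(j).compareTo(result.get(best));
                // Smallest element wins when ascending, largest when descending
                if (ascending ? cmp < 0 : cmp > 0) {
                    best = j;
                }
            }
            Integer tmp = result.get(i);
            result.set(i, result.get(best));
            result.set(best, tmp);
        }
        return result;
    }
}
```

Keeping every algorithm behind the same interface lets the processor pick an implementation from the selected property value without changing the rest of its logic.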
Our onTrigger method looks like below. Please refer to the project repository to look at the complete file with other methods.
// Needs java.time.Duration and java.time.Instant in addition to the imports of the complete file
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Get the incoming flowfile
    FlowFile flowFile = session.get();
    if (flowFile == null) return;
    // Read the contents of flowfile as String
    final AtomicReference<String> inputString = new AtomicReference<>();
    session.read(flowFile, (in) -> {
        StringWriter writer = new StringWriter();
        IOUtils.copy(in, writer, Charset.defaultCharset());
        inputString.set(writer.toString());
    });
    final List<Integer> sortedList;
    final Duration executionTime;
    try {
        // Separate Strings by "\n" and parse to Integer
        final List<Integer> inputList = new ArrayList<>();
        for (String number : inputString.get().split("\n"))
            inputList.add(Integer.parseInt(number.trim()));
        // Sorting logic; sort(...) is a helper defined in the complete file.
        // Timing the sort call here is an assumption: executionTime is not defined in this excerpt.
        final Instant start = Instant.now();
        sortedList = sort(inputList);
        executionTime = Duration.between(start, Instant.now());
    } catch (Exception e) {
        // Malformed input or a sorting error: route the untouched flowfile to Failure and stop,
        // so that it is not transferred a second time below
        getLogger().error("Failed to sort flowfile content", e);
        session.transfer(flowFile, REL_FAILURE);
        return;
    }
    // Write the sorted numbers back, one per line
    flowFile = session.write(flowFile, outputStream -> {
        final StringBuilder out = new StringBuilder();
        for (int sorted : sortedList) {
            out.append(sorted).append("\n");
        }
        outputStream.write(out.toString().getBytes());
    });
    flowFile = session.putAttribute(flowFile, "execution time", Long.toString(executionTime.multipliedBy(1000).toMillis()));
    session.transfer(flowFile, REL_SORTED);
}
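The parsing and formatting steps of onTrigger can also be tried outside NiFi. The sketch below isolates them in plain Java. NumberLines, parse and format are hypothetical names introduced here, and skipping blank lines and accepting Windows line endings are assumptions that go slightly beyond the split on "\n" used above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the text handling done in onTrigger. */
final class NumberLines {
    private NumberLines() {
    }

    /** Parses one integer per line; throws NumberFormatException on malformed input. */
    static List<Integer> parse(String content) {
        List<Integer> numbers = new ArrayList<>();
        // \R matches any line break, so "\n" and "\r\n" are both handled
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // assumption: blank lines are ignored
            }
            numbers.add(Integer.parseInt(trimmed));
        }
        return numbers;
    }

    /** Formats the numbers back to text, one per line. */
    static String format(List<Integer> numbers) {
        StringBuilder out = new StringBuilder();
        for (int number : numbers) {
            out.append(number).append('\n');
        }
        return out.toString();
    }
}
```

A NumberFormatException thrown by parse corresponds to the case that the processor routes to the Failure relationship.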
Including new processor in NiFi and Testing it.
Once you are done coding your custom processor, you have to build it to get a .nar file.
- To build the project run mvn clean install from your parent directory. For me it is nifi-sorter-bundle.
- Download the NiFi binary from the Apache NiFi website. Unzip it and configure it to run.
- After the Maven build, go to the nifi-sorter-nar directory. You will find a target directory created. Inside the target directory there will be a file named nifi-sorter-nar-1.0.0.nar. Copy this file to your NiFi binary directory as specified below.
- Go to the /lib directory inside your NiFi binary directory and paste the nifi-sorter-nar-1.0.0.nar file.
- Run NiFi and search for your processor in the list of processors.
- See your processor working on your canvas. Here is a snapshot of the processor in a flow.