NiFi에서 Python 실행 - tobyseo/open GitHub Wiki

ExecuteScript

설정

  • 위의 config 처럼 Property 를 추가하면 스크립트 내에서 바로 사용가능
변수(스크립트에서 사용)
  • session : flowfile을 다루기 위해 사용

    • create() : 새 filwfile을 생성
    • putAttribute() : 새로운 attribute를 추가
    • transfer() : flowfile 전송
  • context : Processor의 설정,상태등을 관리하기 위해 사용

  • log : 로깅을 위해 사용

    • ./logs/nifi-app.log 에 로깅
    • ./conf/logback.xml 에서 로깅 레벨 설정
  • REL_SUCCESS : success relationship 을 위해 사용

  • REL_FAILURE : failure relationship 을 위해 사용

Python Script (출력만 있는 경우)
from org.apache.nifi.processor.io import OutputStreamCallback


class WriteCallback(OutputStreamCallback):

    def __init__(self, content=None):
        self.content = content

    def process(self, out):
        out.write(self.content)


log.info("start")
flowFile = session.create()
log.info("%s\n" % str(config)) # config 는 Processor 설정의 PROPERTIES 에서 설정한 값

writeCallback = WriteCallback("hello world!")
flowFile = session.write(flowFile, writeCallback)
session.transfer(flowFile, REL_SUCCESS)
Python Script (입력된 flowFile을 처리한 뒤 출력)

https://gist.github.com/tobyseo/9da7940d7d34933f726a823b34232f9a

import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback


class TransformCallback(StreamCallback):

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            # Input
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            input_obj = json.loads(input_text)

            # Transform
            output_obj = input_obj
            output_obj['value'] = output_obj['value'] * 2
            output_obj['message'] = 'Hello'

            # Output
            output_text = json.dumps(output_obj)
            outputStream.write(output_text)
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile != None:
    log.info("=== Start ===")

    flowFile = session.write(flowFile, TransformCallback())

    filename = flowFile.getAttribute('filename')
    new_filename = "%s_trans.json" % filename.split('.')[0]
    flowFile = session.putAttribute(flowFile, "filename", new_filename)

    log.info("=== Finish ===")

session.transfer(flowFile, REL_SUCCESS)

--

InvokeScriptedProcessor

  • ExecuteScript는 Processor 인터페이스의 Trigger()만 구현하여 flowfile을 조작
  • Java로 구현하지 않고 Processor의 모든 기능을 사용하고 싶을 때는 InvokeScriptedProcessor를 사용
  • Processor 인터페이스의 getPropertyDescriptors, getRelationships, onTregger 등을 직접 구현해야 하며 properties, relationships 를 직접 정의할 수 있음
  • ExecuteScript는 REL_SUCCESS, REL_FAILUTRE 이 두개의 relationships만 사용 가능하지만, InvokeScriptedProcessor는 모든 relationships을 정의할 수 있음
  • 모든 인터페이스를 구현해야 하며, processor라는 인스턴스가 반드시 정의되어야 함
  • session.commit()을 직접 실행해야 하며 session또한 sessionFactory.createSession()을 통해 직접 생성해야 함
참고 자료
Python Script

https://gist.github.com/tobyseo/9f91e8215585639d6e1a7707f235b261

--

ExecuteProcess

  • Batch Duration 설정을 통해 daemon 프로세스의 출력을 일정 주기로 가져올 수 있음
    • Scheduling 의 Run schedule 를 0으로 설정해야 함
      • 이렇게 하지 않을 경우 오동작을 일으킴 (최초 1회는 Batch Duration 만큼 후에 결과가 나오고 그 다음은 Run schedule 에 따름)
    • daemon이 아닌 프로세스의 경우엔 사용할 수 없음