Nifi cluster setup

Nifi cluster setup

설명
Nifi 관련된 내용 총 정리 해보자
Last Updated
Last updated March 25, 2023
태그
Nifi
Data Engineering
DE
ETL

Why clustering 셋업?

  • NiFi 관리자 또는 DFM (DataFlow Manager)가 단일 서버에서 하나의 NiFi 인스턴스를 사용하는 것만으로는 원하는 데이터 양을 처리하기 힘듦.
  • 따라서 여러 대의 Nifi 인스턴스들을 사용.
 
  • 여러 대를 사용하는 것까진 좋았는데, Flow 변경이나 Nifi 업데이트가 있을 때마다 각 인스턴스들을 일일이 손봐줘야 하고, 또 각 인스턴스마다 따로 따로 모니터링을 해야 함.
 
  • 위의 두 가지 불편한 사항을 해소하기 위해 Nifi 클러스터를 사용.
  • 클러스터를 사용하면, 각 인스턴마다 동일한 데이터 Flow 를 통해 병렬로 데이터를 처리하고 Flow 변경(한 번만 변경하면 나머지 노드들이 해당 변경 사항을 복제해 감), Nifi 업데이트 등을 손쉽게 처리 가능하며 단일 인터페이스를 통해 모든 노드의 상태를 모니터링 가능.
  • Flow 변경은 어떤 노드에서 하든 상관 없음.
  • 특정 노드에서 변경을 하면, 해당 변경 사항이 나머지 노드들에 복사가 되어
  • 전체 노드가 동일하게 변경된 Flow 를 갖게 됨
 

Zero Leader Clustering

NiFi는 Zero-Leader Clustering 패러다임을 사용함.
클러스터의 각 노드에는 동일한 Flow가 정의되고, 각 노드가 갖는 데이터에 대해 (Flow 대로) 동일한 작업을 수행 각각 다른 데이터 집합에서 작동함.
클러스터는 모든 활성 노드에 데이터를 자동으로 배포.
내가 이해하기로는 클러스터에 포함된 모든 활성 노드에 Flow 가 동기화되고, 그 Flow 대로 데이터를 움직이고 수정되고 작성되는 작업이 각 모든 활성 노드에서 일어남.
 
Apache ZooKeeper를 통해 노드 중 하나는 자동으로 클러스터 코디네이터로 선택.
클러스터의 모든 노드가 하트 비트 / 상태 정보를 코디네이터 노드로 보냄.
만약 코디네이터가 자신에게 일정 시간 동안 하트 비트를 보내지 않는 노드를 발견하면
해당 노드를 클러스터에서 끊어버림(연결을 끊음)
끊는 이유는, 클러스터 내 모든 노드가 동기화되어 있는지 확인해야하기 때문.
하트 비트가 오지 않는다? 이런 경우 노드 하나가 죽었다고 봄.
죽은 노드다? 그 죽은 노드는 동기화가 되지 않음.
동기화되지 않는다? 모든 노드간 데이터 및 flow 가 일정해지지 않음. 그래서 끊어버림.
 
반대로, 새로운 노드가 클러스터에 참여하려면, 먼저 코디네이터 노드에 연결을 함
그리고 나서 코디네이터가 "그래 들어와" 라고 허락해주면 (노드 참가를 결정하면) 현재 flow 가 새로운 노드에 제공됨.
코디네이터에 의해 제공된 flow 가 (만약 새로운 노드가 이미 flow 를 갖고 있다면) 새로운 노드의 flow 와 일치하면 클러스터에 참가됨.
반대로 코디네이터에 의해 제공된 flow 와 새로운 노드가 갖고 있는 flow 의 버전이 일치하지 않으면 참가가 안 됨(....)
(그럼 노드내 flow를 다 지우고 참가시켜야하나??
어디서 읽었는진 기억 안 나는데, 이런 경우 투표를 한다는 식으로 말하던데....)

NiFi Clustering과 HA(High Availability)

NiFi Clustering으로 찾아보면 Zero_master Clustering이라는 문구를 가장 쉽게 마주치게 됩니다. 1.0.0 버전에서부터 적용한 방식으로, 간단히 설명하면 ZooKeeper를 활용해서 내부적으로 Cluster Coordinator를 선정하여 NiFi의 flow를 클러스터된 노드들 모두 동일하게 유지합니다.
Zero-master라 함은, Cluster Coordinator가 떠있는 노드가 SPOF가 되는 것이 아니라 또다시 다른 Cluster Coordinator를 띄울 리더를 선출하기 때문에, 마스터 역할만을 하는 노드가 별도로 존재하지 않기 때문입니다.
 
그래서 처음에 쉽게 생각했습니다.
'NiFi Clustering을 통해서 작업도 쉽게 분배하고, HA도 이룰 수 있겠구나.' 라고...
결과는 둘 다 아니었습니다?
 
HDFS, Hive, Spark 같이 처음부터 distributed 환경에서 쓰도록 만들어진 Application을 너무도 쉽고 당연하게 접할 수 있어서, 막연히 비슷하게 생각했던 것 같습니다.
 

먼저, NiFi Clustering.

3개의 노드를 하나의 클러스터로 구성을 하고, 그 중 하나의 노드의 UI에 접근을 해서 작업을 정의한다고 생각해 봅시다. 간단하게 아래와 같은 flow로 작성을 했습니다.
ExecuteSQL -> ConvertAvroToORC -> PutHDFS
그러면, 3개의 NiFi 노드에 동일하게 해당 flow가 생성이 됩니다. 그리고 작업을 전부 시작시켜 봅시다. 첫 번째 Processor(ExecuteSQL)가 폭주(?!)하지 않도록 적당한 schedule을 넣어 실행시켜 보겠습니다. 3개의 노드에서 모두 같은 SQL을 수행을 할 것이고, 그 결과 파일을 각자의 노드로 가져와서 ConvertAvroToORC 작업을 수행한 후에 해당 flowfile의 이름대로 HDFS에 파일을 put 할 것입니다. 즉 같은 파일이 3개가 HDFS에 쌓이게 되겠죠. (?)
NiFi를 Single 노드로 쓸 때와 같이 작업을 정의하게 되면, 클러스터링된 NiFi는 각각의 노드에서 모두 같은 작업을 수행하게 됩니다. 사실 문서에도 해당 내용을 나와 있습니다. 클러스터링이 된다고 했지, 작업 분배를 자동으로 해 준다고는 안 했으니까요. (작업 분배가 필요하다고도 나와 있습니다.)
그래서, NiFi 클러스터링에서는 작업 분배가 필요하고, 보통 아래와 같은 추가적인 Data flow 정의가 필요합니다.
1) Kafka와 같은 queue로부터 작업을 시작 (ex. ConsumeKafka, ConsumeJMS)
2) Nifi의 remote group이나 HAProxy처럼 Nifi Clustering 앞 단에서 분배해서 시작
3) Data source를 바꿀 수 없는 경우 (대부분), 해당 데이터를 가져와야 할 processor를 On Primary Node로 설정한 후, 1,2와 같은 방법을 통해 작업을 재분배
어쨌든 작업을 분배했으니 클러스터링해서 사용하는 의미를 찾을 수 있습니다. 우리는 처리해야 할 data에 비해서 cpu core도 부족하고, NiFi JVM에게 할당한 메모리도 넉넉치 못한 서버라서, 어쩔 수 없으니까요. 하지만 클러스터링이 곧 HA는 아닙니다.

그러면 이제 HA (High Availability)

NiFi를 3개의 노드로 클러스터링을 했으니까, 노드 하나가 죽어도 NiFi는 다음 리더를 선출해서 남은 노드들 중에서 Cluster Coordinator(및 Primary Node)를 시키면서 클러스터링을 유지해 나갑니다. NiFi UI를 통해서 현재 클러스터링 상태가 2/3 으로 바뀌는 것을 볼 수 있습니다. 그러니까 HA라고 할 수 있을까요.
이번에도 이미 안 될 거라고 말했는데요.. NiFi도 클러스터링이 된다고 했지, HA는 된다고 한 적이 없습니다. 아마도 Failover가 안된다는 것이 조금 더 정확한 표현일 지 모르겠습니다.
되는 것부터 보면, 하나의 노드가 떨어져나간 클러스터링의 상태에서도 Primary node를 zookeeper를 통해 새로 선정하게 될 것입니다. 그러면 위에서 말한 어느 방법이더라도 작업 분배에는 이상이 없습니다.
문제는 이미 분배된 작업(및 data)인데요. 죽은 노드에서 돌던 작업들은 그 노드의 NiFi 프로세스를 되살리기 전까지는 그냥 사라져버린다고 볼 수 있습니다. 돌고 있는 노드들이 뭔가 조치를 해줄 수도 없고, 분배된 작업만 수행하는 입장에서는 솔직히 무슨 문제가 있는 지도 모릅니다.
이 점은 정의해 놓은 flow가 복잡하고 오래 걸리는 작업일 수록, 그리고 데이터가 빠지지 않고 다 들어오는 것이 중요할 수록, 그리고 시간 제약이 있는 데이터일수록 문제가 크다는 생각을 합니다.
어떤 이유에서 죽었는 지는 모르겠지만 해당 노드의 NiFi를 되살리면 NiFi의 ContentRepository, FlowFileRepository가 힘을 발휘해서 이어서 작업을 할 수는 있습니다.
죽은 노드에서의 작업을 온전히 재수행이라도 시키기 위해서는 해당 노드의 NiFI를 어떻게든 되살리는 것 말고는 답이 없습니다.

그래서

비슷한 고민을 하는 Nifi 개발자가 Confluence에 던진 제안이 있는데 보면 요약해보면, Repository 를 HDFS와 같은 외부의 distributed 환경에 저장을 하고, failover 시 활용할 수 있도록 하면 HA를 할 수 있을 것이라는 내용입니다.
 

Clustering 용어 설명

Coordinator

Cluster 를 이루는 노드들을 관리하는 노드. Zookeeper 에 의해 노드들 중 하나가 Coordinator 로 선출됨. Cluster 에 새로운 노드가 생기면 그 새로운 노드에 최신 Flow 를 제공함. Coordinator 가 Cluster 에서 사라지면 Zookeeper 가 남아있는 노드들 가운데 하나를 다시 Coordinator 로 선출함.

Node

Cluster 를 이루는 Nifi. 실제 작업을 수행함 모든 노드는 각자의 환경에서 동일한 Flow 를 실행함. 예를 들어 Server 1, 2, 3 위에 Nifi 노드가 하나씩 올라가 있고, A Processor 를 실행한다고 할 때, Server 1 에서 노드 1이 A Processor 를 실행하고, Server 2 에서 노드 2가 A Processor 를 실행하고, Server 3 에서 노드 3이 A Processor 를 실행한다. Processor 에서 다루는 데이터는 디스크에 저장된다.

Primary

독자적으로 Processor 를 실행함. 즉, 특정 Processor를 다른 노드에선 실행하지 않고, Primary 에서만 실행하게 할 수 있음. 예를 들어 외부 네트워크로부터 데이터를 받아오는 Processor 가 있다고 하자. 모든 노드들이 해당 Processor 를 실행하면 경쟁 상태가 되는 문제가 생길 수 있기 때문에 해당 Processor 는 Primary 에서만 사용할 수 있도록 하고, 받은 데이터는 (round robin 등의 정책을 통해) 전체 노드에 나눠주고(로드 밸런싱) 병렬처리 함. (물론 해당 방법은 Primary 를 사용하는 대신 singlenode Nifi 에서 실행할 수 있음. 관리자 마음대로 구성하면 됨) Primary 가 Cluster 에서 사라지면 Zookeeper 가 남아있는 노드들 가운데 하나를 다시 Primary 로 선출함.

Heart beat

각 노드들은 (기본적으로) 5초마다 Coordinator 에게 heart beat 를 보내어 "나 살아있다" 라고 알려줌 만약 모종의 이유에 의해 40초(기본 5초 * 8번) 동안 heart beat 를 보내지 못하는 노드가 생기면 Coordinator 는 해당 노드를 Cluster 에서 제외시켜버림. 왜 노드를 끊어버리느냐? 이유는 각 노드들이 똑같은 Flow 를 갖고 있는지 확인 할 수 없기 때문(동기화를 할 수 없기 때문에) 어떤 노드가 네트워크가 끊겨 heart beat 를 보내지 못하는 상황인데, Nifi Flow 가 업데이트 되었다면 해당 노드는 이 업데이트를 받지 못하여 오래된 Flow 를 갖게 될 것임. 이러한 동기화 문제가 있기 때문에 heart beat 를 보내지 못하는 노드를 Cluster 에서 제외시키는 것 (연결이 끊어진 노드는 WebUI (bullbulletin) 에 게시되므로 우리가 직접 눈으로 확인 가능) 끊긴 노드의 네트워크가 늦게나마 되살아나서 heart beat 를 다시 보내면, Coordinator 가 해당 노드를 인식하고 다시 Cluster 에 참여시킴. (실제로 Cluster 에서 node 하나가 사라지면 Processor 업데이트가 불가능하더라) 각 노드들은 어떤 노드가 Coordinator 인지 알고 heart beat 를 보내는 걸까? Zookeeper 를 통해 할 수 있다. Zookeeper 가 "얘가 Coordinator 야! 얘한테 heart beat 보내면 돼!" 라는 정보를 Znode 로 저장하고 있음.

Offload

연결이 끊긴 노드가 실행하고 있던 Flow Files 들은 어떻게 처리 할 것인가? 그 땐 해당 노드를 offload 시키면 됨. 그럼 해당 노드가 갖고 있던 Flow Files 이 일 잘 하고 있는 다른 활성 노드들로 넘어가게 됨. 물론 offload 된 노드 역시 재시작 시키고 Cluster 에 다시 참여시킬 수 있음.
 

Cluster 셋업하기

Prerequisites

  • 서버 세대로 구성(본문에서는 node01, node02, node03)
  • CentOS >= 7
  • Java Oracle >= 8

공통작업

  1. NIFI 다운로드
    1. $ wget https://www.apache.org/dyn/closer.lua?path=/nifi/1.9.0/nifi-1.9.0-bin.tar.gz
  1. 압축 풀기
    1. $ tar -xzvf nifi-1.9.0-bin.tar.gz
  1. /etc/hosts 파일 수정
    1. $ vi /etc/hosts
      127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.101.54 node01 192.168.101.55 node02 192.168.101.56 node03
  1. nifi-1.9.0/conf/zookeeper.properties 파일 하단에 Zookeeper 서버 지정 끝 부분에 zookeeper에 연동할 서버와 포트를 입력해 줍시다.
    1. $ vi /nifi-1.9.0/conf/zookeeper.properties
      # # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # # clientPort=2181 initLimit=10 autopurge.purgeInterval=24 syncLimit=5 tickTime=2000 dataDir=./state/zookeeper autopurge.snapRetainCount=30 # # Specifies the servers that are part of this zookeeper ensemble. For # every NiFi instance running an embedded zookeeper, there needs to be # a server entry below. For instance: # # server.1=nifi-node1-hostname:2888:3888 # server.2=nifi-node2-hostname:2888:3888 # server.3=nifi-node3-hostname:2888:3888 # # The index of the server corresponds to the myid file that gets created # in the dataDir of each node running an embedded zookeeper. See the # administration guide for more details. # server.1=node01:2888:3888 server.2=node02:2888:3888 server.3=node03:2888:3888
  1. nifi-1.9.0/conf/state-management.xml 파일 수정 이 파일의 다음 부분만 수정하여 주키퍼로 연동될 서버호스트와 포트를 지정해줍시다.
    1. <cluster-provider> <id>zk-provider</id> <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class> <property name="Connect String">node01:2181,node02:2181,node03:2181</property> <property name="Root Node">/nifi</property> <property name="Session Timeout">10 seconds</property> <property name="Access Control">Open</property> </cluster-provider> ```

호스트별 작업

  1. 지정된 각각의 호스트에 ID를 부여해줍시다.
    1. $ sudo mkdir /nifi/state $ sudo mkdir /nifi/state/zookeeper
      node01 서버에서
      $ echo 1 > /<nifi 설치경로>/state/zookeeper/myid
      node02 서버에서
      $ echo 2 > /<nifi 설치경로>/state/zookeeper/myid
      node03 서버에서
      $ echo 3 > /<nifi 설치경로>/state/zookeeper/myid
  1. 각 서버의 nifi.properties를 수정하여 클러스터 옵션을 지정해줍시다.
      • 먼저 내장 주키퍼 사용 옵션을 설정하여 줍시다.
      #################### # State Management # #################### nifi.state.management.configuration.file=./conf/state-management.xml # The ID of the local state provider nifi.state.management.provider.local=local-provider # The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster. nifi.state.management.provider.cluster=zk-provider # Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server nifi.state.management.embedded.zookeeper.start=true # Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
      • 클러스터 사용을 활성화 시키고 연결될 해당 노드 호스트와 클러스터가 통신하기를 원하는 포트를 지정하여 줍시다.
      nifi.web.http.host=node1 ... # cluster node properties (only configure for cluster nodes) # nifi.cluster.is.node=true nifi.cluster.node.address=node1 nifi.cluster.node.protocol.port=5882 nifi.cluster.node.protocol.threads=10 nifi.cluster.node.protocol.max.threads=50 nifi.cluster.node.event.history.size=25 nifi.cluster.node.connection.timeout=5 sec nifi.cluster.node.read.timeout=5 sec nifi.cluster.node.max.concurrent.requests=100 nifi.cluster.firewall.file= nifi.cluster.flow.election.max.wait.time=1 mins nifi.cluster.flow.election.max.candidates=
      • 연결될 주키퍼 클라이언트 주소를 지정해줍시다.
      # zookeeper properties, used for cluster management # nifi.zookeeper.connect.string=node01:2181,node02:2181,node03:2181 nifi.zookeeper.connect.timeout=3 secs nifi.zookeeper.session.timeout=3 secs nifi.zookeeper.root.node=/nifi
      web properties
      • single user 환경에서는 nifi.web.https.host 항목을 꼭 설정하지 않아도 되었지만, 클러스터 환경으로 구성할 경우 웹 요청이 클러스터 전체로 도달하려면 해당 속성에 반드시 hostname을 작성해 주어야 한다.
      State Management
      • nifi.state.management.embedded.zookeeper.start 항목을 true로 변경해준다.
      security properties
      • nifi.sensitive.props.keys : 12자 이상으로 설정해준다
      cluuster node properties
      • nifi.cluster.is.node=true : 서버를 클러스터 노드로서 사용할 것이기 때문에 true로 변경
      • nifi.cluster.node.address : 노드의 호스트네임을 입력해준다. 설정하지 않을 시 localhost가 되는데 ,딱히 상관은 없는듯하다
      • nifi.cluster.node.protocol.port : 임의의 포트를 지정하여 모든 서버에서 통일해준다
      • nifi.cluster.node.connection.timeout, nifi.cluster.node.read.timeout 값이 너무 짧게 지정되어 있으므로 늘려준다 (30초로 늘림)
      • nifi.cluster.flow.election.max.wait.time은 반대로 너무 길기 때문에 1 mins 로 맞춰 주었다
      zookeeper properties
      • nifi.zookeeper.connect.string : state-management에서 설정했던 Connect String을 입력한다

Linux Operating System Configuration

NiFi 는 많은 수의 파일을 열고, 또 많은 수의 threads 를 생성하기 때문에 Linux 에서 NiFi 사용시, /etc/security/limits.conf의 nofile, nproc 값을 늘리는 것이 좋다
  • nproc : User당 사용할 수 있는 프로세스 최대 개수
  • nofile : User당 오픈할 수 있는 파일 개수 (리눅스에서는 모든 개체를 파일로 봅니다.)
* hard nofile 50000 * soft nofile 50000 * hard nproc 10000 * soft nproc 10000
 
NiFi 가 사용중인 메모리가 swapping 되지 않도록 /etc/sysctl.conf 의 vm.swappiness 를 0으로 두는 것이 좋다.
vm.swappiness = 0
vm.swappiness 에 대한 설명은 다음 링크 참고. brunch.co.kr/@alden/14
간단하게 정리하면, 메모리 요청이 왔을 때, linux 에 유휴 메모리가 부족하다면 다른 곳에서 메모리를 가져오는데(이것을 메모리 재할당이라고 부름)
이 때 두 가지 방법을 통해 메모리를 가져온다.
  1. page cache memory 를 가져옴
  1. 기존 프로세스 (inactive process) 에 할당된 memory 를 가져옴
2번의 경우 swapping 이 발생하기 때문에 성능에 영향을 줄 수 있다.
2번 방법을 최대한 줄이고 1번 방법을 최대로 사용하도록 하는 것이 vm.swappiness 를 0으로 두는 것.
 

기타 최적화 사항

NiFi System Administrator's Guide
dn: cn=User 1,ou=users,o=nifi objectClass: organizationalPerson objectClass: person objectClass: inetOrgPerson objectClass: top cn: User 1 sn: User1 uid: user1 dn: cn=User 2,ou=users,o=nifi objectClass: organizationalPerson objectClass: person objectClass: inetOrgPerson objectClass: top cn: User 2 sn: User2 uid: user2 dn: cn=admins,ou=groups,o=nifi objectClass: groupOfNames objectClass: top cn: admins member: cn=User 1,ou=users,o=nifi member: cn=User 2,ou=users,o=nifi file-user-group-provider org.apache.nifi.authorization.FileUserGroupProvider ./conf/users.xml cn=nifi-node1,ou=servers,dc=example,dc=com cn=nifi-node2,ou=servers,dc=example,dc=com ldap-user-group-provider org.apache.nifi.ldap.tenants.LdapUserGroupProvider ANONYMOUS FOLLOW 10 secs 10 secs ldap://localhost:10389 30 mins false ou=users,o=nifi person ONE_LEVEL cn ou=groups,o=nifi groupOfNames ONE_LEVEL cn member composite-user-group-provider org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider file-user-group-provider ldap-user-group-provider file-access-policy-provider org.apache.nifi.authorization.FileAccessPolicyProvider composite-user-group-provider ./conf/authorizations.xml John Smith cn=nifi-node1,ou=servers,dc=example,dc=com cn=nifi-node2,ou=servers,dc=example,dc=com managed-authorizer org.apache.nifi.authorization.StandardManagedAuthorizer file-access-policy-provider

Nifi Repository

  1. FlowFile Repository : 현재 Flow 안에 존재하는 FlowFiles들에 대한 메타 정보들을 가지고 있음 
  1. Content Repository : 현재/과거 FlowFiles 에 대한 데이터(contenets)를 가지고 있음 
  1. Provenance Repository : FlowFiles들에 대한 이력정보를 가지고 있음
disk 의 파티션을 총 3개로 나눈 후, 각각의 파티션에 각 repo 들(flowfile, provenance, content)을 저장하는 것이 권장된다고 한다.

FlowFile Repository


NiFi에서 처리되고 있는 flowfile들은 Hash Map 형태로 JVM 메모리에 존재하게 된다. (nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#DeeperView). 메모리에서 처리하는 것은 빠르고 효율적임이지만, 장애 발생 시 데이터에 대한 지속성을 보장하지 않아 데이터 손실을 가져온다. (durability 를 보장할 수 없다). 이러한 문제를 해결하기 위해서 FlowFile Repository는 현재 시스템에서 처리되고 있는 FlowFile의 메타정보/데이터에 대한 WAL(Write-Ahead Log)역할을 수행한다.  FlowFile의 메타 정보는 FlowFile과 관련된 모든 속성들을 가지고 있는데,  실제 FlowFile에 데이터의 위치(실제 데이터는 Content Repository에 존재), FlowFile의 상태(어떤 connection/queue에 존재하는지) 등을 포함한다. WAL과 같은 기능을 제공함으로써 FlowFile Repository를 통해 시스템 장애로 인한 지속성을 제공하게 된다.
FlowFile이 NiFi의 여러 Processor를 통해 처리됨에 따라 WAL과 동일하게 변경사항들이 FlowFile Repository에 저장이 되고 트랜잭션이 처리가된다. (WAL 저장 후 트랜잭션 처리, 이슈 발생 시 WAL 을 이용한 데이터 복구 수행). 이렇게 생성되는 로그(WAL)은 변경사항들에 대한 순차적인 모음임으로 NiFi는 이를 통해서 Repository가 check-point 될 때 생성되는 snapshot을 기준으로 로그에 기록된 작업을 수행하여 장애 시점의 FlowFile을 복구할 수 있게된다.
FlowFile Snapshot은 시스템에 의해서 주기적으로 생성되는데 nifi.properteis 파일을 통해서 checkpoint에 대한 설정을 할 수 있다. checkpoint 시점에 시스템은 FlowFile의 HashMap을 직렬화 하여 새로운 생성하는데, 이는 *.partial 확장자를 갖는 파일로 생성된다. checkpoint 작업이 완료되면 오래된 snapshot 파일은 삭제되고 *.partial 파일이 snapshot으로 이름이 변경된다.

Content Repository


Content Repository는 단순하게 시스템에 존재하는 모든 FlowFile들에 대한 데이터(content)를 저장하고 있는 로컬 스토리지라고 생각하면 되는데, 실제 FlowFiles에 대한 데이터를 저장하기 때문에 위에서 언급한 3개의 저장소 중에 가장 큰 저장 공간을 사용하게 된다. 이 저장소는 처리 속도 극대화/thread 처리에 대한 안정성을 위해 불변성immutability과 copy-on-write 패러다임을 사용한다. Content Repository의 핵심 디자인은 Flowfile에 대한 데이터를 disk에 유지하면서 JVM 메모리에서 요청할 때만 read only로 적재하게 된다. 이러한 구조는 처리하고자 하는 object의 크기의 크기와 상관 없이 처리할 수 있게 되는데, 모든 데이터를 memory에 올리기 위한 pub/sub 이 필요하지 않기 때문이다. 결과적으로 매우 큰 object에 대해서 splitting, aggregating, transforming 작업을 수행할 때 메모리 사용량과 무관하게 처리할 수 있게 된다. (대신에 content repository에 대한 disk I/O 성능이 더 중요해질 것으로 생각된다.)
NiFi는 사용하지 않는 FlowFile에 대한 데이터는 GC를 통해 JVM heap에서 제거하는 작업을 수행하며, 이를 위해 별도의 GC collection 프로세스가 존재하며, 사용하지 않는 content를 분석하기 위한 thread가 별도로 존재한다. ( Deeper View: Deletion After Checkpointing" section) 이러한 작업들을 통해 FlowFile의 contents 가 더이상 필요하지 않다고 판단되면 삭제되거나 아카이빙된다. 아카이빙archiving 관련 설정은 nifi.properties에서 활성화 시킬 수 있다. archiving처리된 content는 Content Repository에 정의된 life-cycle(age)만큼 존재하다가 삭제가 되거나, Content Repository가 너무 많은 데이터를 사용하고 있다고 할 때에는 바로 삭제가 될 것이다.("nifi.content.repository.archive.max.retention.period", "nifi.content.repository.archive.max.usage.percentage")

Provenance Repository


Provenance Repository는 FlowFile에 대한 이력 정보를 위한 저장소이다. 이력정보는 각 데이터에 대한 Data Lineage를 제공하기 위해서 사용된다. FlowFile에 대한 이벤트가 발생할 때마다 새로운 provenance event(기원 이벤트..) 가 생성된다. Provenance event 또한 FlowFile의 특정 시점의 정보를 보여주는 snapshot로 생각할 수도 있다. provenance event가 생성되면 FlowFile의 모든 속성값과 content를 가리키는 포인터,  FlowFile의 상태 정보에 대한 집계정보가 Provenance Repository의 하나의 위치에 저장이 된다. 이러한 provenance event(=snapshot)은 변경되지 않으며 nifi.properties 에 명시된 일정이 지난 후에 삭제된다.
FlowFile의 모든 속성들과 content 포인터는 Provenance Repository에 저장되기 때문에 DataFlow Manager는 단순히 lineage를 보는 것이 아니라 데이터에 대한 처리이력을 확인할 수 있다. 이를 통해 특정 시점의 데이터를 확인할 수 있을 뿐 아니라 어떠한 위치에서든 다시 실행할 수 있다. 일반적으로 usecase는 데이터를 전송 받는 시스템(down-stream system)이 데이터를 받지 못한 경우인데, 목적 시스템에 데이터가 전송 될때 정확히 데이터가 어떠한 모습인지 파일이름은 무엇인지, URL은 무엇인지 확인이 가능하다. 또는 전송 이벤트가 다시 실행될 수 있어 목적시스템에서 데이터를 받게 처리할 수 있다. 만약 데이터가 원하는데로 처리되지 않았다면, flow를 수정한 뒤에 다시 데이터를 수정된 flow로 흘리는것도 가능하다.
Provenance Repository를 통해서 특정 시점의 FlowFile의 데이터를 확인 가능하지만, Content Repository에 대한 데이터를 복사하는 것은 아니다. 단지 FlowFile의 content에 대한 포인터를 복사하여 가지고 있는 것이다. 따라서 Provenance event에서 참조값을 가지고 있는 상태에서 content가 삭제될 수도 있다. 이런 경우에는 앞서 말했던 특정 시점의 FlowFile의 데이터를 본다던가 FlowFile을 재처리하는 작업들이 수행 불가능하게된다. 하지만 사용자는 FlowFile에 대한 lineage 자체는 조회가 가능함으로 데이터가 처리된 프로세스들은 확인할 수 있다. 예를들어, 이 경우 사용자는 실제 데이터는 확인하지 못하지만 데이터가 사용한 고유 식별자 및 파일 명과 같은 데이터는 확인할 수 있음으로 그것의 목적 시스템으로의 전송여부를 확인할 수 있다. 또한 다른 FlowFile에 대한 메타 정보는 가지고 있음으로 속성값을 통한 디버깅 작업을 수행할 수도 있다.
참고. Provenance event는 FlowFile의 snapshot과 같은 역할을 하기 때문에 현재 flow에 존재하는 FlowFile에 대한 변경은 provenance event를 나중에 재수행할 때 영향을 끼치게 된다. (FlowFile의 데이터가 같다고 하더라도 flow가 변경되면 당연히 처리 결과가 달라짐을 인지하고 있어야 한다)
 

유용한 팁

the content of a flow file is not stored in memory, only the properties and attributes associated with the physical file in the content repository are held in the JVM heap memory. The only time the content is in memory is when a processor that deals with the content of a file is in the flow.
 

nifi 기본 사용법

 

Nifi 사용하는 방법을 보여줌

Docker Apache NiFi 설치/실행 + Registry

 

resource 제한 및 팁 & 트릭 등 다양한 기초 정보 제공

꼭 읽어보자!
 

Nifi http로 붙기, port변경

nifi/conf/nifi.properties 에서 아래 내용 수정
nifi.remote.input.secure=false // false로 설정해줘야 https로 통신을 하지않는다... // 아래 security 값들도 빈칸으로 남겨야한다. // 해당 암호화 파일을 찾지못해 오류가 발생하기때문 nifi.security.keystore= # ./conf/keystore.p12 nifi.security.keystoreType= # PKCS12 nifi.security.keystorePasswd= nifi.security.keyPasswd= nifi.security.truststore= # ./conf/truststore.p12 nifi.security.truststoreType= # PKCS12 nifi.security.truststorePasswd= // 기본값은 https의 host와port 값이 주어지지만 http통신을 할것이기때문에 // https 설정을 빈값으로 두고 http설정을 해준다. nifi.web.http.host=127.0.0.1 nifi.web.http.port=9090 nifi.web.http.network.interface.default= ############################################# nifi.web.https.host= nifi.web.https.port= // http를 채우고 https는 빈값
 

nifi web 로그확인

tail -f logs/nifi-app.log 명령어를 실행
Docker NiFi 에서 log 를 보려면
docker logs -f [nifi container 이름] 명령어를 실행
 

Nifi 실행, 중지, 상태확인

NiFi 앱을 실행하려면
bin/nifi.sh start
NiFi 앱이 실행중인지 확인하려면
bin/nifi.sh status
NiFi 앱을 중지하려면
bin/nifi.sh stop

for setting up a high performance

NIFI Best practices for setting up a high performance NiFi installation
 

IO, CPU, memory 최적화

 

Nifi 성능 측정 과정

성능 측정 관련

클러스터 로드 밸런싱 cluster load balancing 관련

 

Nifi Registry

Registry 는 Flow의 버전 관리를 위해 사용됨
git 을 사용하는 것처럼, 다른 곳에 버전이 저장되고 flow 가 변경되면 변경된 내용을 다른 이름으로 commit 가능
commit 된 내용을 다른 Nifi 가 가져올 수 도 있음.
아래 동영상을 보면 바로 이해가 될 것임.
 

Parameter & Variable

Parameter 는 Nifi instance 에 전역에서 사용 가능한 값.
#{} 기호를 사용하여 사용 가능하며, Processor 의 property 의 값을, parameter 를 사용하여 넣을 수 있음.
비밀번호 같은 민감한 property 에 넣을 수 있는 parameter 가 있고,
일반적인 텍스트 property 에 넣을 수 있는 parameter 가 있음.
config 에서 설정해야 Processor 에서 사용 가능
설명 및 사용 방법 :
 
Variable 은 해당 Process Group 내에서 사용이 가능한 값.
프로세스 그룹에 Variable 이 지정되었다면 그 그룹 내에서 사용 가능
${} 기호를 사용하여 사용 가능. 마치 FlowFile 의 attribute 를 사용하는 것 처럼.
Variable 을 갖는 프로세스 그룹 내에 또 다른 프로세스 그룹이 있고
이름이 같은 Variable 이 있다면, 바깥쪽 프로세스 그룹의 Variable 이 안쪽 프로세스 그룹의 Variable 에 의해 덮어씌워짐.
마치 전역 변수가 지역 변수에 의해 덮어씌워지는 것처럼.
Variable 을 사용할 수 있는 Processor property 가 있고, 사용할 수 없는 property 가 있음.
설명 및 사용 방법 :
자세한 내용 :
참고로 $ 기호를 사용하는 FlowFile 의 attritube 는 아래 링크에서 볼 수 있음
 

로그 레벨 변경

conf/logback.xml 을 편집한다.
만약 INFO 에서 DEBUG 로 바꾸고 싶다면
<logger name="org.apache.nifi.web.api.config" level="INFO" additivity="false"> <appender-ref ref="USER_FILE"/> </logger>
위의 level 을 DEBUG 로 바꿔준다.
<logger name="org.apache.nifi.web.api.config" level="DEBUG" additivity="false"> <appender-ref ref="USER_FILE"/> </logger>
 

HDFS 데이터 분산 수집

참고사항