惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

S
Security Affairs
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
Jina AI
Jina AI
P
Palo Alto Networks Blog
GbyAI
GbyAI
大猫的无限游戏
大猫的无限游戏
A
Arctic Wolf
Hugging Face - Blog
Hugging Face - Blog
小众软件
小众软件
Y
Y Combinator Blog
T
The Blog of Author Tim Ferriss
Blog — PlanetScale
Blog — PlanetScale
S
Schneier on Security
V
Vulnerabilities – Threatpost
C
Cybersecurity and Infrastructure Security Agency CISA
雷峰网
雷峰网
T
Tenable Blog
人人都是产品经理
人人都是产品经理
T
Tor Project blog
C
Cyber Attacks, Cyber Crime and Cyber Security
AWS News Blog
AWS News Blog
Microsoft Security Blog
Microsoft Security Blog
J
Java Code Geeks
Scott Helme
Scott Helme
SecWiki News
SecWiki News
C
CERT Recently Published Vulnerability Notes
Recorded Future
Recorded Future
I
InfoQ
Security Archives - TechRepublic
Security Archives - TechRepublic
Help Net Security
Help Net Security
Cloudbric
Cloudbric
C
Check Point Blog
Engineering at Meta
Engineering at Meta
TaoSecurity Blog
TaoSecurity Blog
B
Blog
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
博客园_首页
N
News and Events Feed by Topic
云风的 BLOG
云风的 BLOG
MyScale Blog
MyScale Blog
腾讯CDC
量子位
Application and Cybersecurity Blog
Application and Cybersecurity Blog
K
Kaspersky official blog
Vercel News
Vercel News
F
Full Disclosure
T
Troy Hunt's Blog
Forbes - Security
Forbes - Security
S
Security @ Cisco Blogs

博客园 - vikingwei

<<人纪>>针灸篇倪海厦 NIFI调用API HTTP PostgreSQL源码安装 Installation from Source Code netstat 查看端口(sqlserver) SQL Server 常用的83条SQL语句,速速收藏 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园 用户中心 - 博客园
NIFI免密调用API HTTPS
vikingwei · 2025-06-16 · via 博客园 - vikingwei

1.新增处理器processor: GenerateFlowFile并增加参数属性

GenerateFlowFile-->properties-->点右上角的+号Add Property-->输入要传入的参数

本例参数为:account and password.

被请求服务器端给出帐号与密码,主要是目的是获取Token,再用Token取查询获取具体的数据。

2.参数属性转换为JSON:AttributesToJSON

 AttributesToJSON-->properties-->Attributes List参数属性输入此处:account,password,此处理器将参数属性转为JSON

-->Destination转换好的json输出为文件内容:flowfile-content。

3.免密获取Token调用API:ExecuteScript

3.1.ExecuteScript-->properties-->Property:Script Engine,Value:Groovy

3.2.ExecuteScript-->properties-->Property:Script Body,Value具体代码如下:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.StreamCallback
//Refer to URL:https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.9.2/org.apache.nifi.processors.script.ExecuteScript/additionalDetails.html

// 1. 定义忽略证书验证的逻辑
def disableSSLVerification = {
javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier { hostname, session -> true }
def trustAllCerts = [ new javax.net.ssl.X509TrustManager() {
public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {}
public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {}
public java.security.cert.X509Certificate[] getAcceptedIssuers() { null }
} ] as javax.net.ssl.TrustManager[]
def sslContext = javax.net.ssl.SSLContext.getInstance("TLS")
sslContext.init(null, trustAllCerts, new java.security.SecureRandom())
javax.net.ssl.HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.socketFactory)
}

flowFile = session.get()
if(!flowFile) return

try{
disableSSLVerification() // 禁用 SSL 验证

//2.发送 HTTP 请求.Setting request charset ... parameter.
def url = flowFile.getAttribute('Request_token_URL').toURL()
def connection = url.openConnection() as HttpURLConnection
connection.setRequestMethod("POST")
connection.setRequestProperty("Content-Type", "application/json; charset=UTF-8")
connection.setRequestProperty("Accept-Charset", "UTF-8")
connection.setDoOutput(true)

//3.准备Request Body(示例:直接使用字符串,或从 FlowFile 读取).
//Get parameter data:account and password for JSON.
def String read_data = ""
session.read(flowFile, {inputStream ->
read_data = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} as InputStreamCallback)
def requestBody = read_data // 或从 flowFile 读取
flowFile = session.putAttribute(flowFile, 'requestBody', requestBody)

//4.写入 Body
def outputStream = connection.getOutputStream()
outputStream.write(requestBody.getBytes(StandardCharsets.UTF_8))
outputStream.close()

//5.处理响应并写入 FlowFile.Get response body setting charset to UTF-8.
def responseCode = connection.getResponseCode()
def responseBody = connection.getInputStream().getText("UTF-8")

// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {out ->
out.write(responseBody.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)

session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
log.error("Fails to invoke API for abnormal.", e)
session.transfer(flowFile, REL_FAILURE)
}

3.3.Request Body参数请求体通过上面的json文件传入。

{

  "account": "AccessAPIForRedcred",

  "password": "VWxNcC85ek5PUDhDS3JqODA3bWJ5UEIxd3FLMTFjc2ZEbTEwbXZ6eTh6RFBVcG5VSWhkNEowT1hxU2wwSUtoSjFYTGxlOGR6RncwUkV2ZGloQ1BURkE9PQ=="

}

3.4.Response 内容如下:

{
"status": 200,
"success": true,
"msg": "Success",
"msgDev": null,
"response": {
"success": true,
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6IjM2MDgyIiwiVVNFUl9BQ0NPVU5UIjoiQWNjZXNzQVBJRm9yUmVkY3JlZCIsIlNUQUZGX05VTUJFUiI6IkFQSTAwMDciLCJTVEFGRl9OQU1FIjoiQWNjZXNzQVBJRm9yUmVkY3JlZCIsIk9SR19JRCI6Ijg4MCIsIk9SR19OQU1FIjoiU2hhcmVkIiwiREVQQVJUTUVOVF9DT0RFIjoiODE2OCIsIlRJVExFIjoiIiwiRU1BSUwiOiIiLCJBQ0NPVU5UX0RPTUFJTiI6Ik5PTkUiLCJDTElFTlRfVFlQRSI6IldFQiIsImp0aSI6IjRmMjRjMTRkLTIxNGMtNGM0My1iZTY0LTljYTgyMzcxNDZiZiIsImlhdCI6IjYvMTYvMjAyNSA0OjIxOjI0IFBNIiwiaHR0cDovL3NjaGVtYXMubWljcm9zb2Z0LmNvbS93cy8yMDA4LzA2L2lkZW50aXR5L2NsYWltcy9leHBpcmF0aW9uIjoiNi8xNi8yMDI1IDg6MjE6MjQgUE0iLCJuYmYiOjE3NTAwNjIwODQsImV4cCI6MTc1MDA3NjQ4NCwiaXNzIjoiZUZhY3RvcnkuU2VydmljZSIsImF1ZCI6IndyIn0.FNZA9QJY5-DtUAM-wqAoBU0Y5XRVpzeyHexcywo1K-w",
"expires_in": 1440.0,
"token_type": "Bearer",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE3NTAwNjIwODQsImV4cCI6MTc1MDE0ODQ4NCwiaXNzIjoiZUZhY3RvcnkuU2VydmljZSIsImF1ZCI6IndyIn0.63gdKr4AAGncrMrAGPukih5q-SyhOJ0nr8vLFUNB9rs"
}
}

4.解析JSON获取Token:EvaluateJsonPath

EvaluateJsonPath-->properties-->Destination设置输出为属性:flowfile-attribute

-->点右上角的+号Add Property-->Property:输入属性变量token(自己定义),value:$.response.token

 5.设置实际请求参数:UpdateAttribute

UpdateAttribute-->properties-->点右上角的+号Add Property-->输入要传入的参数属性:Property:START_TIME,Value:2023-12-01 00:00:00

Property:END_TIME,Value:2023-12-15 23:59:59

Property:Bearer_Token,Value:Bearer ${token}

6.实际请求参数属性转换为JSON:AttributesToJSON

 AttributesToJSON-->properties-->Attributes List参数属性输入此处:START_TIME,END_TIME,此处理器将参数属性转为JSON

-->Destination转换好的json输出为文件内容:flowfile-content。

7.免密获取数据调用API:ExecuteScript

7.1.ExecuteScript-->properties-->Property:Script Engine,Value:Groovy

7.2.ExecuteScript-->properties-->Property:Script Body,Value具体代码如下:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.StreamCallback
//Refer to URL:https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.9.2/org.apache.nifi.processors.script.ExecuteScript/additionalDetails.html

// 1. 定义忽略证书验证的逻辑
def disableSSLVerification = {
javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier { hostname, session -> true }
def trustAllCerts = [ new javax.net.ssl.X509TrustManager() {
public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {}
public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {}
public java.security.cert.X509Certificate[] getAcceptedIssuers() { null }
} ] as javax.net.ssl.TrustManager[]
def sslContext = javax.net.ssl.SSLContext.getInstance("TLS")
sslContext.init(null, trustAllCerts, new java.security.SecureRandom())
javax.net.ssl.HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.socketFactory)
}

flowFile = session.get()
if(!flowFile) return

try{
disableSSLVerification() // 禁用 SSL 验证

//2.发送 HTTP 请求.设置请求token、字符集等参数.
//2.1.The URL from GenerateFlowFile processor,by attribute setting.
def url = flowFile.getAttribute('Request_Abnormal_URL').toURL()
def connection = url.openConnection() as HttpURLConnection
connection.setRequestMethod("POST")
connection.setRequestProperty("Content-Type", "application/json; charset=UTF-8")
connection.setRequestProperty("Accept-Charset", "UTF-8") //Charset setting to UTF-8.
//2.2.By token parameter:Bearer + token, request from HTTPS.Setting token parameter.
connection.setRequestProperty("Authorization", flowFile.getAttribute('Bearer_Token'))
connection.setDoOutput(true)

//3.准备Request Body(示例:直接使用字符串,或从 FlowFile 读取).
//Get parameter data:account and password for JSON.
def String read_data = ""
session.read(flowFile, {inputStream ->
read_data = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} as InputStreamCallback)
def requestBody = read_data // 或从 flowFile 读取
flowFile = session.putAttribute(flowFile, 'requestBody', requestBody)

//4.写入 Setting Request Body.
def outputStream = connection.getOutputStream()
outputStream.write(requestBody.getBytes(StandardCharsets.UTF_8))
outputStream.close()

//5.处理响应并写入 FlowFile.Get response body setting charset to UTF-8.
def responseCode = connection.getResponseCode()
def responseBody = connection.getInputStream().getText("UTF-8")

// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {out ->
out.write(responseBody.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)

session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
log.error("Fails to invoke API for abnormal.", e)
session.transfer(flowFile, REL_FAILURE)
}

7.3.Request Body参数请求体通过上面的json文件传入。

{

  "START_TIME": "2023-12-01 00:00:00",

  "END_TIME": "2023-12-15 23:59:59"

}

8.解析JSON获取数据:EvaluateJsonPath

EvaluateJsonPath-->properties-->Destination设置输出为JSON内容:flowfile-content

-->点右上角的+号Add Property-->Property:输入属性变量JSON(自己定义), value:$.response

9.JSON内容写入数据库:PutDatabaseRecord

9.1.PutDatabaseRecord-->properties-->Property:Record Reader,Value:JsonTreeReader-Global(自己在Controller Service Details定义)

JsonTreeReader-Global:Configuration-->Controller Services-->右上角点+-->选JsonTreeReader-->名称输入:JsonTreeReader-Global-->PROPERTIES-->Schema Access Strategy:Infer Schema,Starting Field Strategy:Root Node,其它属性值留空白。

9.2.PutDatabaseRecord-->properties-->Property:Statement Type,Values:INSERT

Property:Statement Type,Values:INSERT

Property:Database Connection Pooling Service,Values:数据库连接池自己选要写入的数据库即可。

Property:Schema Name,Values:imptemp

Property:Table Name,Values:要写入的表名

Property:Quote Column Identifiers,Values:true 字段名称是否加引号?

Property:Quote Table Identifiers,Values:true 表名称是否加引号。

注:table的column名称与顺序一定要与JSON中是一样的,包括数据类型。

小结:将JSON批量写入数据库,用PutDatabaseRecord比较快。