流装饰器参考
cartesianProduct
cartesianProduct
函数将具有多值字段(即数组)的单个元组转换为多个元组,每个元组对应数组字段中的一个值。也就是说,给定一个包含 fieldA 字段的 N 个值的数组的单个元组,cartesianProduct
函数将输出 N 个元组,每个元组包含原始元组数组中的一个值。从本质上讲,您可以展平数组以进行进一步处理。
例如,使用 cartesianProduct
,您可以将此元组
{
"fieldA": "foo",
"fieldB": ["bar","baz","bat"]
}
转换为以下 3 个元组
{
"fieldA": "foo",
"fieldB": "bar"
}
{
"fieldA": "foo",
"fieldB": "baz"
}
{
"fieldA": "foo",
"fieldB": "bat"
}
cartesianProduct 参数
-
传入流
: (必需)单个传入流。 -
fieldName 或 evaluator
: (必需)要展平值的字段名称或结果应展平的计算器。 -
productSort='fieldName ASC|DESC'
: (可选)新生成元组的排序顺序。
cartesianProduct 语法
cartesianProduct(
<stream>,
<fieldName | evaluator> [as newFieldName],
productSort='fieldName ASC|DESC'
)
cartesianProduct 示例
以下示例显示了此源元组的不同输出
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
}
单个字段,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
单个计算器,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
单个字段,按值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
单个计算器,按计算器值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
productSort="newFieldE desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
重命名的单个字段,按值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB as newFieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB2",
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB1",
}
多个字段,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
多个字段,按单个字段排序
cartesianProduct(
search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc"
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
多个字段,按多个字段排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc, fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
字段和计算器,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
fieldB
)
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 14
}
如您在上面的示例中所见,cartesianProduct
函数确实支持跨多个字段和/或计算器展平元组。
classify
classify
函数使用逻辑回归文本分类模型对元组进行分类。它专门设计为与使用 train
函数 训练的模型配合使用。classify
函数使用 model
函数 检索存储的模型,然后使用该模型对元组流进行评分。分类器读取的元组必须包含可用于分类的文本字段。classify 函数使用 Lucene 分析器从文本中提取特征,以便应用模型。默认情况下,classify
函数使用元组中文本字段的名称查找分析器。如果工作节点上的 Solr 架构不包含此字段,则可以通过指定 analyzerField
参数在另一个字段中查找分析器。
对每个分类的元组分配两个分数
-
probability_d*: 0 到 1 之间的浮点数,描述元组属于该类的概率。这在分类用例中很有用。
-
score_d*: 文档的分数,未在 0 到 1 之间压缩。分数可以是正数或负数。分数越高,文档越适合该类。此未压缩分数在查询重新排序和推荐用例中很有用。当多个高排名文档的 probability_d 分数为 1 时,此分数特别有用,因为这不会提供有意义的文档排名。
commit
commit
函数包装单个流 (A),并且给定集合和批处理大小,当批处理大小已满足或达到流的末尾时,将向集合发送提交消息。提交流最常与更新流一起使用,因此提交将考虑来自更新流的可能的摘要元组。进入提交流的所有元组都将从提交流中返回 - 不会丢弃任何元组,也不会添加任何元组。
complement
complement
函数包装两个流 (A 和 B),并发出 A 中不存在 B 中的元组。元组按其在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段进行排序(使用 on
参数)。
complement 参数
-
StreamA 的 StreamExpression
-
StreamB 的 StreamExpression
-
on
:用于检查 A 和 B 之间的元组相等性的字段。可以采用以下格式:on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
complement 语法
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
daemon
daemon
函数包装另一个函数,并使用内部线程按时间间隔运行它。daemon
函数可用于提供连续推送和拉取流。
连续推送流
对于连续推送流,daemon
函数包装另一个函数,然后将其发送到 /stream
处理程序以执行。/stream
处理程序识别 daemon
函数并将其保留在内存中,以便它可以按时间间隔运行其内部函数。
为了便于推送元组,daemon
函数必须包装另一个流装饰器,该装饰器将元组推送到某个位置。一个示例是 update
函数,它包装一个流并将元组发送到另一个 SolrCloud 集合进行索引。
daemon 语法
daemon(id="uniqueId",
runInterval="1000",
terminate="true",
update(destinationCollection,
batchSize=100,
topic(checkpointCollection,
topicCollection,
q="topic query",
fl="id, title, abstract, text",
id="topicId",
initialCheckpoint=0)
)
)
上面的示例代码显示了 daemon
函数包装 update
函数,后者包装 topic
函数。当此表达式发送到 /stream
处理程序时,/stream
处理程序会看到 daemon
函数并将其保留在内存中,它将在其中按时间间隔运行。在此特定示例中,daemon
函数将每秒运行一次 update
函数。update
函数包装一个 topic
函数,该函数将以批处理形式流式传输与 topic
函数查询匹配的元组。对主题的每次后续调用都将返回主题的下一批元组。update
函数会将与主题匹配的所有元组发送到另一个集合进行索引。terminate
参数告诉守护进程在 topic
函数停止发送元组时终止。
这样做的效果是将与特定查询匹配的文档推送到另一个集合中。可以插入自定义推送函数,将文档从 Solr 推送到其他系统,例如 Kafka 或电子邮件系统。
推送流还可以用于连续的后台聚合场景,其中聚合在后台按时间间隔汇总并推送到其他 Solr 集合。另一个用例是连续的后台机器学习模型优化,其中优化的模型被推送到另一个 Solr 集合,可以在其中将其集成到查询中。
/stream
处理程序支持一组小 命令,用于列出和控制守护进程函数
http://localhost:8983/solr/collection/stream?action=list
此命令将提供在特定节点上运行的当前守护进程的列表及其当前状态。
http://localhost:8983/solr/collection/stream?action=stop&id=daemonId
此命令将停止特定守护进程函数,但将其保留在内存中。
http://localhost:8983/solr/collection/stream?action=start&id=daemonId
此命令将启动已停止的特定守护进程函数。
http://localhost:8983/solr/collection/stream?action=kill&id=daemonId
此命令将停止特定守护进程函数并将其从内存中删除。
连续拉取流
DaemonStream java 类(SolrJ 库的一部分)也可以嵌入到 java 应用程序中,以提供连续拉取流。示例代码
StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello"); // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents
TopicStream topicStream = new TopicStream(zkHost, // Host address for the ZooKeeper service housing the collections
"checkpoints", // The collection to store the topic checkpoints
"topicData", // The collection to query for the topic records
"topicId", // The id of the topic
-1, // checkpoint every X tuples, if set -1 it will checkpoint after each run.
topicQueryParams); // The query parameters for the TopicStream
DaemonStream daemonStream = new DaemonStream(topicStream, // The underlying stream to run.
"daemonId", // The id of the daemon
1000, // The interval at which to run the internal stream
500); // The internal queue size for the daemon stream. Tuples will be placed in the queue
// as they are read by the internal thread.
// Calling read() on the daemon stream reads records from the internal queue.
daemonStream.setStreamContext(context);
daemonStream.open();
//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
// The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
//Do something with the tuples
}
// Shutdown the DaemonStream.
daemonStream.shutdown();
//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.
while(true) {
Tuple tuple = daemonStream.read();
if(tuple.EOF) {
break;
} else {
//Do something with the tuples.
}
}
//Finally close the stream
daemonStream.close();
delete
delete
函数包装其他函数,并使用找到的 id
和 _version_
值,将元组作为 按 ID 删除 命令发送到 SolrCloud 集合。
这类似于下面描述的 update()
函数。
delete 参数
-
destinationCollection
:(强制)将删除元组的集合。 -
batchSize
:(可选,默认为250
)删除批大小。 -
pruneVersionField
:(可选,默认为false
)是否从元组中删除_version_
值 -
StreamExpression
:(强制)
delete 语法
delete(collection1,
batchSize=500,
search(collection1,
q=old_data:true,
qt="/export",
fl="id",
sort="a_f asc, a_i asc"))
上面的示例使用 search
函数针对 collection1
返回的元组,并将找到的每个文档的 id
值转换为针对同一 collection1
的删除请求。
与 希望忽略并发更新并删除所有匹配文档的用户应将 预期并发更新且希望“跳过”任何失败删除的用户应考虑配置 |
eval
eval
函数允许在动态生成新的流表达式并对其进行评估的情况下使用用例。eval
函数包装流表达式并从底层流中读取单个元组。然后,eval
函数从元组的 expr_s
字段中检索字符串流表达式。然后,eval
函数编译字符串流表达式并发出元组。
executor
executor
函数包装包含流表达式的流源,并并行执行表达式。executor
函数在每个元组的 expr_s
字段中查找表达式。executor
函数有一个内部线程池,该线程池运行在同一工作节点上并行编译和运行表达式的任务。此函数还可以通过将其包装在 parallel
函数中来跨工作节点并行化,以在集群中并行执行表达式。
executor
函数不对其运行的表达式的输出执行任何特定操作。因此,执行的表达式必须包含将元组推送到其目标的逻辑。可以在正在执行的表达式中包含 update 函数,以便将元组发送到 SolrCloud 集合进行存储。
此模型允许异步执行作业,其中输出存储在 SolrCloud 集合中,可以在作业进行时访问该集合。
fetch
fetch
函数遍历流并获取其他字段,并将它们添加到元组中。fetch
函数分批获取以限制对 Solr 的回调次数。从 fetch
函数流出的元组将包含原始字段和获取的其他字段。fetch
函数支持一对一获取。流源包含重复键的多对一获取也适用,但一对多获取目前不受此函数支持。
having
leftOuterJoin
leftOuterJoin
函数包装两个流,Left 和 Right,并发出来自 Left 的元组。如果 Right 中存在一个元组相等(由 on
定义),则该元组中的值将包含在发出的元组中。Right 中的相等元组不必存在,即可发出 Left 元组。这支持一对一、一对多、多对一和多对多左外部连接场景。元组按它们在 Left 流中出现的顺序发出。两个流都必须按用于确定相等性的字段进行排序(使用 on
参数)。如果两个元组都包含同名字段,则发出元组中将使用来自 Right 流的值。
你可以使用 select
函数包装传入流,以具体说明哪些字段值包含在发出的元组中。
leftOuterJoin 参数
-
StreamExpression for StreamLeft
-
StreamExpression for StreamRight
-
on
:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
leftOuterJoin 语法
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
hashJoin
hashJoin
函数封装两个流,Left 和 Right,对于 Left 中的每个元组,如果它存在于 Right 中,则会发出一个包含这两个元组的字段的元组。这支持一对一、一对多、多对一和多对多内部连接场景。元组按它们在 Left 流中出现的顺序发出。流的顺序无关紧要。如果两个元组包含同名字段,则在发出的元组中将使用来自 Right 流的值。
你可以使用 select
函数包装传入流,以具体说明哪些字段值包含在发出的元组中。
当 Left 和 Right 的元组无法按相同顺序放置时,可以使用 hashJoin 函数。由于元组无序,此流函数通过在 open 操作期间读取 Right 流中的所有值来工作,并将所有元组存储在内存中。这导致内存占用量等于 Right 流的大小。
hashJoin 参数
-
StreamExpression for StreamLeft
-
hashed=StreamRight 的 StreamExpression
-
on
:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
hashJoin 语法
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
hashJoin(
search(people, q="*:*", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
innerJoin
封装两个流,Left 和 Right。对于 Left 中的每个元组,如果它存在于 Right 中,则会发出一个包含这两个元组的字段的元组。这支持一对一、一对多、多对一和多对多内部连接场景。元组按它们在 Left 流中出现的顺序发出。两个流都必须按用于确定相等性的字段(“on”参数)进行排序。如果两个元组包含同名字段,则在发出的元组中将使用来自 Right 流的值。你可以使用 select(…)
表达式封装传入流,以具体说明哪些字段值包含在发出的元组中。
innerJoin 参数
-
StreamExpression for StreamLeft
-
StreamExpression for StreamRight
-
on
:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
innerJoin 语法
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
intersect
intersect
函数封装两个流,A 和 B,并发出 A 中确实存在于 B 中的元组。元组按它们在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段(on
参数)进行排序。仅发出 A 中的元组。
intersect 参数
-
StreamA 的 StreamExpression
-
StreamB 的 StreamExpression
-
on
:用于检查 A 和 B 之间的元组相等性的字段。可以采用以下格式:on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
交集语法
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
列表
list
函数包装 N 个流表达式,并按顺序打开和迭代每个流。这会产生连接多个流表达式的结果。
列表语法
list(tuple(a="hello world"), tuple(a="HELLO WORLD"))
list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
合并
merge
函数合并两个或更多个流表达式并维护底层流的顺序。由于维护了顺序,底层流的排序必须与提供给 merge 函数的 on 参数对齐。
合并参数
-
流表达式 A
-
流表达式 B
-
可选流表达式 C、D、...、Z
-
on
:用于执行合并的排序条件。格式为fieldName order
,其中 order 为asc
或desc
。可以以fieldA order, fieldB order
的格式提供多个字段。
合并语法
# Merging two stream expressions together
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,fieldA,fieldB,fieldC",
sort="fieldA asc, fieldB desc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,fieldA",
sort="fieldA asc"),
search(collection2,
q="id:(10 11 13)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
search(collection3,
q="id:(987)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
on="fieldA asc")
null
null 表达式是一个有用的实用程序函数,用于在执行并行关系代数(连接、交集、汇总等)时了解瓶颈。null 函数读取底层流中的所有元组,并返回一个元组,其中包含计数和处理时间。由于 null 流本身增加的开销最小,因此可用于隔离 Solr 的 /export
处理程序的性能。如果 /export 处理程序的性能不是瓶颈,那么瓶颈很可能出现在运行流装饰器的 worker 中。
null 表达式可以由 parallel 函数包装并发送到 worker 节点。在此场景中,每个 worker 将返回一个元组,其中包含在 worker 上处理的元组计数和该 worker 的计时信息。这提供了有价值的信息,例如
-
随着添加更多 worker,/export 处理程序的性能是否会提高。
-
元组是否在工作线程之间均匀分布,或者哈希分区是否将更多文档发送到单个工作线程。
-
所有工作线程是否以相同的速度处理数据,或者其中一个工作线程是瓶颈的来源。
outerHashJoin
outerHashJoin
函数包装两个流(左和右),并从左流中发出元组。如果右流中存在一个相等的元组(由 on
参数定义),则该元组中的值将包含在发出的元组中。右流中不必存在相等的元组,左元组才能发出。这支持一对一、一对多、多对一和多对多左外连接场景。元组按它们在左流中出现的顺序发出。流的顺序无关紧要。如果两个元组都包含一个同名字段,则在发出的元组中将使用右流中的值。
你可以使用 select
函数包装传入流,以具体说明哪些字段值包含在发出的元组中。
当左和右的元组无法按相同顺序放置时,可以使用 outerHashJoin 流。由于元组无序,因此此流通过在打开操作期间读取右流中的所有值并会将所有元组存储在内存中来发挥作用。其结果是内存占用空间等于右流的大小。
outerHashJoin 参数
-
StreamExpression for StreamLeft
-
hashed=StreamRight 的 StreamExpression
-
on
:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
outerHashJoin 语法
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
parallel
parallel
函数包装一个流表达式,并将其发送到 N 个工作线程节点以并行处理。
parallel
函数要求将 partitionKeys
参数提供给基础搜索。partitionKeys
参数将在工作线程节点之间对搜索结果(元组)进行分区。具有与 partitionKeys
相同值的元组将被打乱到相同的工作线程节点。
parallel
函数保持工作线程节点返回的元组的排序顺序,因此排序条件必须包含工作线程返回的元组的排序顺序。
例如,如果您按年、月和日进行排序,则只要有足够多的不同年份来将元组分散到工作线程节点,您就可以仅按年进行分区。
Solr 允许对超过 4 个字段进行排序,但出于速度考虑,您不能指定超过 4 个 partitionKeys。此外,当一个或两个键足以分布元组时,指定多个 partitionKeys
是过度操作。
当底层搜索流将从集合中发出大量元组时,设计了并行流。如果搜索流仅使用 parallel
从集合中发出数据的一个小子集,则可能会更慢。
工作器集合
工作器节点可以与数据来自同一个集合,或者它们可以是完全不同的集合,甚至可以是仅存在于 |
parallel 参数
-
collection
:要将 StreamExpression 发送到的工作器集合的名称。 -
StreamExpression
:要发送到工作器集合的表达式。 -
workers
:要将表达式发送到的工作器集合中的工作器数量。 -
zkHost
:工作器集合所在的 ZooKeeper 连接字符串(可选)。仅当与您连接到的 Solr 实例使用相同的 ZkHost 时,才会包含 Zookeeper 凭据和 ACL(chroot
可以不同)。 -
sort
:工作器节点返回的元组排序的排序条件。
parallel 语法
parallel(workerCollection,
rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
over="year_i", count(*)),
workers="20",
zkHost="localhost:9983",
sort="year_i desc")
上面的表达式显示了一个包装 rollup
函数的 parallel
函数。这将导致 rollup
函数在 20 个工作器节点上并行运行。
预热
对于具有相同数量的工作器和
|
plist
plist
函数包装 N 个流表达式,并行打开流,并依次迭代每个流。list
和 plist
之间的区别在于流是并行打开的。由于许多流(如 facet
、stats
和 significantTerms
)在打开时会将繁重的操作推送到 Solr,因此 plist 函数可以通过并行执行这些操作来显著提高性能。
plist 语法
plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))
plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
priority
priority
函数是 executor 函数的简单优先级调度程序。executor
函数没有直接的任务优先级概念;相反,它只是按照从其底层流中读取的顺序执行任务。priority
函数提供了将较高优先级任务调度到较早提交的较低优先级任务之前的能力。
priority
函数包装两个 topic
函数,它们都发出包含要执行的流表达式的元组。第一个主题被认为是较高优先级任务队列。
每次调用 priority
函数时,它都会检查较高优先级任务队列以查看是否有任何要执行的任务。如果任务在较高优先级队列中等待,则优先级函数将发出较高优先级任务。如果没有要运行的高优先级任务,则会发出较低优先级队列任务。
priority
函数每次调用时只会从一个队列中发出一个任务批次。这确保了在较高优先级队列没有要运行的任务之前,不会执行任何较低优先级任务。
priority 语法
daemon(id="myDaemon",
executor(threads=10,
priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))
在上面的示例中,daemon
函数正在迭代调用 executor。每次调用时,executor
函数都会执行 priority
函数发出的任务。priority
函数包装两个主题。第一个主题是较高优先级任务队列,第二个主题是较低优先级主题。
reduce
reduce
函数包装一个内部流,并按公共字段对元组进行分组。
每个元组组都由一个可插入的 reduce 操作作为单个块进行操作。Solr 提供的组操作实现了分布式分组功能。组操作还用作示例 reduce 操作,在构建自定义 reduce 操作时可以参考该操作。
reduce 函数依赖于底层流的排序顺序。因此,底层流的排序顺序必须与按字段分组一致。 |
rollup
rollup
函数包装另一个流函数,并汇总存储桶字段上的聚合。rollup 函数依赖于底层流的排序顺序,一次汇总一个分组的聚合。因此,底层流的排序顺序必须与 rollup 函数的 over
参数中的字段匹配。
rollup 函数还需要处理整个结果集才能执行其聚合。当底层流是 search
函数时,/export
处理程序可用于向 rollup 函数提供完全排序的结果集。这种排序方法允许 rollup 函数对非常高基数的字段执行聚合。这种方法的缺点是必须对元组进行排序,并通过网络将其流式传输到工作节点才能进行聚合。对于低到中等基数字段的更快聚合,可以使用 facet
函数。
rollup 参数
-
StreamExpression
(强制) -
over
:(强制)按其分组的字段列表。 -
metrics
:(强制)要计算的指标列表。当前支持的指标有sum(col)
、avg(col)
、min(col)
、max(col)
、count(*)
。
rollup 语法
rollup(
search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
over="a_s",
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
count(*)
)
该示例展示了包装 search 函数的 rollup 函数。请注意,search 函数使用 /export
处理程序向 rollup 流提供整个结果集。还要注意,search 函数的 sort
参数与 rollup 的 over
参数匹配。这允许 rollup 函数一次汇总一个组的 a_s
字段。
scoreNodes
请参阅 图遍历中的部分。
select
select
函数包装一个流表达式并输出包含来自输入元组的字段的子集或修改后的字段的元组。输出元组中包含的字段列表可以包含别名,以有效地重命名字段。select
流支持操作和评估器。可以提供一个操作和评估器列表,以对任何字段执行操作,例如 replace, add, if
等。
select 参数
-
StreamExpression
-
fieldName
:要包含在输出元组中的字段的名称(可以包含多个),例如outputTuple[fieldName] = inputTuple[fieldName]
。fieldName
可以是通配符模式,例如a_*
,以选择所有以a_
开头的字段。 -
fieldName as aliasFieldName
:要包含在输出元组中的别名字段名称(可以包含多个),例如outputTuple[aliasFieldName] = incomingTuple[fieldName]
-
replace(fieldName, value, withValue=replacementValue)
:如果incomingTuple[fieldName] == value
,则outgoingTuple[fieldName]
将设置为replacementValue
。value
可以是字符串“null”,以用其他值替换 null 值。 -
replace(fieldName, value, withField=otherFieldName)
:如果incomingTuple[fieldName] == value
,则outgoingTuple[fieldName]
将设置为incomingTuple[otherFieldName]
的值。value
可以是字符串“null”,以用其他值替换 null 值。
select 语法
// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
teamName_s as teamName,
wins,
losses,
replace(wins,null,withValue=0),
replace(losses,null,withValue=0),
if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)
sort
sort
函数包装一个流表达式并重新排序元组。sort 函数以新的排序顺序发出所有传入元组。sort 函数从传入流中读取所有元组,使用具有 O(nlog(n))
性能特征的算法重新排序它们,其中 n 是传入流中元组的总数,然后以新的排序顺序输出元组。由于所有元组都读入内存,因此此函数的内存消耗随着传入流中元组的数量线性增长。
sort 语法
下面的表达式查找狗主人,并按主人和宠物姓名对结果进行排序。请注意,它首先按人/主人 ID 排序,然后按主人和宠物姓名重新排序最终输出,从而使用了高效的 innerJoin。
sort(
innerJoin(
search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
on="id=owner"
),
by="name asc, petName asc"
)
unique
unique
函数包装一个流表达式,并基于 over
参数发出一个元组的唯一流。unique 函数依赖于底层流的排序顺序。over
参数必须与底层流的排序顺序匹配。
unique 函数实现了一个非同址唯一算法。这意味着具有相同唯一 over
字段的记录不需要在同一个分片上同址。在并行执行时,partitionKeys
参数必须与唯一 over
字段相同,以便具有相同键的记录被洗牌到同一个工作进程。
update
update
函数包装另一个函数,并将元组发送到 SolrCloud 集合中作为文档进行索引。
update 参数
-
destinationCollection
:(强制) 将对元组进行索引的集合。 -
batchSize
:(可选,默认为250
) 索引批处理大小。 -
pruneVersionField
:(可选,默认为true
) 是否从元组中修剪_version_
值 -
StreamExpression
:(强制)
update 语法
update(destinationCollection,
batchSize=500,
search(collection1,
q=*:*,
qt="/export",
fl="id,a_s,a_i,a_f,s_multi,i_multi",
sort="a_f asc, a_i asc"))
上面的示例将 search
函数返回的元组发送到 destinationCollection
进行索引。
如本示例所示,将 search(…)
包装起来是此装饰器的常见用法:将文档作为元组从集合中读取出来,以某种方式处理或修改它们,然后将它们添加回新集合中。因此,pruneVersionField=true
是默认行为——在将元组转换为 Solr 文档时删除内部流中找到的任何 _version_
值,以防止 乐观并发 约束导致任何意外错误。