流装饰器参考

cartesianProduct

cartesianProduct 函数将具有多值字段(即数组)的单个元组转换为多个元组,每个元组对应数组字段中的一个值。也就是说,给定一个包含 fieldA 字段的 N 个值的数组的单个元组,cartesianProduct 函数将输出 N 个元组,每个元组包含原始元组数组中的一个值。从本质上讲,您可以展平数组以进行进一步处理。

例如,使用 cartesianProduct,您可以将此元组

{
  "fieldA": "foo",
  "fieldB": ["bar","baz","bat"]
}

转换为以下 3 个元组

{
  "fieldA": "foo",
  "fieldB": "bar"
}
{
  "fieldA": "foo",
  "fieldB": "baz"
}
{
  "fieldA": "foo",
  "fieldB": "bat"
}

cartesianProduct 参数

  • 传入流: (必需)单个传入流。

  • fieldName 或 evaluator: (必需)要展平值的字段名称或结果应展平的计算器。

  • productSort='fieldName ASC|DESC': (可选)新生成元组的排序顺序。

cartesianProduct 语法

cartesianProduct(
  <stream>,
  <fieldName | evaluator> [as newFieldName],
  productSort='fieldName ASC|DESC'
)

cartesianProduct 示例

以下示例显示了此源元组的不同输出

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
}

单个字段,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": [1,2,3]
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": [1,2,3]
}

单个计算器,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 14
}

单个字段,按值排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  productSort="fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": [1,2,3]
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": [1,2,3]
}

单个计算器,按计算器值排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE,
  productSort="newFieldE desc"
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 14
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 4
}

重命名的单个字段,按值排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB as newFieldB,
  productSort="fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
  "newFieldB": "valueB2",
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
  "newFieldB": "valueB1",
}

多个字段,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}

多个字段,按单个字段排序

cartesianProduct(
  search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC,
  productSort="fieldC asc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}

多个字段,按多个字段排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC,
  productSort="fieldC asc, fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}

字段和计算器,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE,
  fieldB
)

{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 14
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 14
}

如您在上面的示例中所见,cartesianProduct 函数确实支持跨多个字段和/或计算器展平元组。

classify

classify 函数使用逻辑回归文本分类模型对元组进行分类。它专门设计为与使用 train 函数 训练的模型配合使用。classify 函数使用 model 函数 检索存储的模型,然后使用该模型对元组流进行评分。分类器读取的元组必须包含可用于分类的文本字段。classify 函数使用 Lucene 分析器从文本中提取特征,以便应用模型。默认情况下,classify 函数使用元组中文本字段的名称查找分析器。如果工作节点上的 Solr 架构不包含此字段,则可以通过指定 analyzerField 参数在另一个字段中查找分析器。

对每个分类的元组分配两个分数

  • probability_d*: 0 到 1 之间的浮点数,描述元组属于该类的概率。这在分类用例中很有用。

  • score_d*: 文档的分数,未在 0 到 1 之间压缩。分数可以是正数或负数。分数越高,文档越适合该类。此未压缩分数在查询重新排序和推荐用例中很有用。当多个高排名文档的 probability_d 分数为 1 时,此分数特别有用,因为这不会提供有意义的文档排名。

classify 参数

  • model expression:(必需)检索存储的逻辑回归模型。

  • field:(必需)将分类器应用到的元组中的字段。默认情况下,将使用架构中此字段的分析器提取特征。

  • analyzerField:(可选)指定在架构中查找分析器的其他字段。

classify 语法

classify(model(modelCollection,
             id="model1",
             cacheMillis=5000),
         search(contentCollection,
             q="id:(a b c)",
             qt="/export",
             fl="text_t, id",
             sort="id asc"),
             field="text_t")

在上面的示例中,classify expression 使用 api 函数检索模型。然后,它对 search 函数返回的元组进行分类。text_t 字段用于文本分类,Solr 架构中 text_t 字段的分析器用于分析文本并提取特征。

commit

commit 函数包装单个流 (A),并且给定集合和批处理大小,当批处理大小已满足或达到流的末尾时,将向集合发送提交消息。提交流最常与更新流一起使用,因此提交将考虑来自更新流的可能的摘要元组。进入提交流的所有元组都将从提交流中返回 - 不会丢弃任何元组,也不会添加任何元组。

commit 参数

  • collection:要向其发送提交消息的集合(必需)

  • batchSize:提交批处理大小,在达到批处理大小时发送提交消息。如果未提供(或提供为值 0),则仅在传入流的末尾发送提交。

  • waitFlush:直接传递给提交处理程序的值(true/false,默认值:false)

  • waitSearcher:直接传递给提交处理程序的值(true/false,默认值:false)

  • softCommit:直接传递给提交处理程序的值(true/false,默认值:false)

  • StreamA 的 StreamExpression(必需)

commit 语法

commit(
    destinationCollection,
    batchSize=2,
    update(
        destinationCollection,
        batchSize=5,
        search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f,s_multi,i_multi", sort="a_f asc, a_i asc")
    )
)

complement

complement 函数包装两个流 (A 和 B),并发出 A 中不存在 B 中的元组。元组按其在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段进行排序(使用 on 参数)。

complement 参数

  • StreamA 的 StreamExpression

  • StreamB 的 StreamExpression

  • on:用于检查 A 和 B 之间的元组相等性的字段。可以采用以下格式:on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName"

complement 语法

complement(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
  on="a_i"
)

complement(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  on="a_i,a_s"
)

daemon

daemon 函数包装另一个函数,并使用内部线程按时间间隔运行它。daemon 函数可用于提供连续推送和拉取流。

连续推送流

对于连续推送流,daemon 函数包装另一个函数,然后将其发送到 /stream 处理程序以执行。/stream 处理程序识别 daemon 函数并将其保留在内存中,以便它可以按时间间隔运行其内部函数。

为了便于推送元组,daemon 函数必须包装另一个流装饰器,该装饰器将元组推送到某个位置。一个示例是 update 函数,它包装一个流并将元组发送到另一个 SolrCloud 集合进行索引。

daemon 语法

daemon(id="uniqueId",
       runInterval="1000",
       terminate="true",
       update(destinationCollection,
              batchSize=100,
              topic(checkpointCollection,
                    topicCollection,
                    q="topic query",
                    fl="id, title, abstract, text",
                    id="topicId",
                    initialCheckpoint=0)
               )
        )

上面的示例代码显示了 daemon 函数包装 update 函数,后者包装 topic 函数。当此表达式发送到 /stream 处理程序时,/stream 处理程序会看到 daemon 函数并将其保留在内存中,它将在其中按时间间隔运行。在此特定示例中,daemon 函数将每秒运行一次 update 函数。update 函数包装一个 topic 函数,该函数将以批处理形式流式传输与 topic 函数查询匹配的元组。对主题的每次后续调用都将返回主题的下一批元组。update 函数会将与主题匹配的所有元组发送到另一个集合进行索引。terminate 参数告诉守护进程在 topic 函数停止发送元组时终止。

这样做的效果是将与特定查询匹配的文档推送到另一个集合中。可以插入自定义推送函数,将文档从 Solr 推送到其他系统,例如 Kafka 或电子邮件系统。

推送流还可以用于连续的后台聚合场景,其中聚合在后台按时间间隔汇总并推送到其他 Solr 集合。另一个用例是连续的后台机器学习模型优化,其中优化的模型被推送到另一个 Solr 集合,可以在其中将其集成到查询中。

/stream 处理程序支持一组小 命令,用于列出和控制守护进程函数

http://localhost:8983/solr/collection/stream?action=list

此命令将提供在特定节点上运行的当前守护进程的列表及其当前状态。

http://localhost:8983/solr/collection/stream?action=stop&id=daemonId

此命令将停止特定守护进程函数,但将其保留在内存中。

http://localhost:8983/solr/collection/stream?action=start&id=daemonId

此命令将启动已停止的特定守护进程函数。

http://localhost:8983/solr/collection/stream?action=kill&id=daemonId

此命令将停止特定守护进程函数并将其从内存中删除。

连续拉取流

DaemonStream java 类(SolrJ 库的一部分)也可以嵌入到 java 应用程序中,以提供连续拉取流。示例代码

StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);

Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello");  // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents

TopicStream topicStream = new TopicStream(zkHost,        // Host address for the ZooKeeper service housing the collections
                                         "checkpoints",  // The collection to store the topic checkpoints
                                         "topicData",    // The collection to query for the topic records
                                         "topicId",      // The id of the topic
                                         -1,             // checkpoint every X tuples, if set -1 it will checkpoint after each run.
                                          topicQueryParams); // The query parameters for the TopicStream

DaemonStream daemonStream = new DaemonStream(topicStream,             // The underlying stream to run.
                                             "daemonId",              // The id of the daemon
                                             1000,                    // The interval at which to run the internal stream
                                             500);                    // The internal queue size for the daemon stream. Tuples will be placed in the queue
                                                                      // as they are read by the internal thread.
                                                                      // Calling read() on the daemon stream reads records from the internal queue.

daemonStream.setStreamContext(context);

daemonStream.open();

//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
    Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
                                      // The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
    //Do something with the tuples
}

// Shutdown the DaemonStream.
daemonStream.shutdown();

//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.

while(true) {
    Tuple tuple = daemonStream.read();
    if(tuple.EOF) {
        break;
    } else {
        //Do something with the tuples.
    }
}
//Finally close the stream
daemonStream.close();

delete

delete 函数包装其他函数,并使用找到的 id_version_ 值,将元组作为 按 ID 删除 命令发送到 SolrCloud 集合。

这类似于下面描述的 update() 函数。

delete 参数

  • destinationCollection:(强制)将删除元组的集合。

  • batchSize:(可选,默认为 250)删除批大小。

  • pruneVersionField:(可选,默认为 false)是否从元组中删除 _version_

  • StreamExpression:(强制)

delete 语法

 delete(collection1,
        batchSize=500,
        search(collection1,
               q=old_data:true,
               qt="/export",
               fl="id",
               sort="a_f asc, a_i asc"))

上面的示例使用 search 函数针对 collection1 返回的元组,并将找到的每个文档的 id 值转换为针对同一 collection1 的删除请求。

update() 函数不同,delete() 的默认值为 pruneVersionField=false — 在将元组转换为“按 ID 删除”请求时保留在内部流中找到的任何 _version_ 值。这确保使用此流不会(默认情况下)删除在执行 search(…​) 之后 但在 delete(…​) 处理该元组(利用 乐观并发 约束)之前更新的任何文档。

希望忽略并发更新并删除所有匹配文档的用户应将 pruneVersionField=true 设置为(或确保内部流元组不包含任何 _version_ 值)。

预期并发更新且希望“跳过”任何失败删除的用户应考虑配置 TolerantUpdateProcessorFactory

eval

eval 函数允许在动态生成新的流表达式并对其进行评估的情况下使用用例。eval 函数包装流表达式并从底层流中读取单个元组。然后,eval 函数从元组的 expr_s 字段中检索字符串流表达式。然后,eval 函数编译字符串流表达式并发出元组。

eval 参数

  • StreamExpression: (强制) 提供要评估的流表达式的流。

eval 语法

eval(expr)

在上面的示例中,eval 表达式从底层表达式中读取第一个元组。然后,它编译并执行 expr_s 字段中的字符串流表达式。

示例

eval(select(
  echo("tuple(answer=42)"),
  echo as expr_s
))

输出

{
  "result-set": {
    "docs": [
      {
        "answer": "42"
      },
      {
        "EOF": true,
        "RESPONSE_TIME": 0
      }
    ]
  }
}

executor

executor 函数包装包含流表达式的流源,并并行执行表达式。executor 函数在每个元组的 expr_s 字段中查找表达式。executor 函数有一个内部线程池,该线程池运行在同一工作节点上并行编译和运行表达式的任务。此函数还可以通过将其包装在 parallel 函数中来跨工作节点并行化,以在集群中并行执行表达式。

executor 函数不对其运行的表达式的输出执行任何特定操作。因此,执行的表达式必须包含将元组推送到其目标的逻辑。可以在正在执行的表达式中包含 update 函数,以便将元组发送到 SolrCloud 集合进行存储。

此模型允许异步执行作业,其中输出存储在 SolrCloud 集合中,可以在作业进行时访问该集合。

executor 参数

  • threads: (可选) 执行器线程池中用于执行表达式的线程数。

  • StreamExpression:包含要执行的流表达式的流源(强制性)。

executor 语法

daemon(id="myDaemon",
       terminate="true",
       executor(threads=10,
                topic(checkpointCollection
                      storedExpressions,
                      q="*:*",
                      fl="id, expr_s",
                      initialCheckPoint=0,
                      id="myTopic")))

在上面的示例中,daemon 封装了一个 executor,该 executor 封装了一个 topic,该 topic 返回包含要执行的表达式的元组。当发送到流处理程序时,daemon 将按一定时间间隔调用 executor,这将导致 executor 从 topic 中读取并执行在 expr_s 字段中找到的表达式。daemon 将重复调用 executor,直到遍历完与 topic 匹配的所有元组,然后它将终止。这是从 topic 队列执行批处理流表达式的办法。

fetch

fetch 函数遍历流并获取其他字段,并将它们添加到元组中。fetch 函数分批获取以限制对 Solr 的回调次数。从 fetch 函数流出的元组将包含原始字段和获取的其他字段。fetch 函数支持一对一获取。流源包含重复键的多对一获取也适用,但一对多获取目前不受此函数支持。

fetch 参数

  • Collection:从中获取字段的集合(强制性)。

  • StreamExpression:fetch 函数的流源(强制性)。

  • fl:要获取的字段(强制性)。

  • on:用于检查流源和获取记录之间的元组相等性的字段。格式为 on="fieldNameInTuple=fieldNameInCollection"

  • batchSize:批处理获取大小(可选)。

fetch 语法

fetch(addresses,
      search(people, q="*:*", qt="/export", fl="username, firstName, lastName", sort="username asc"),
      fl="streetAddress, city, state, country, zip",
      on="username=userId")

上面的示例通过将元组中的用户名与地址集合中的 userId 字段进行匹配,为用户获取地址。

having

having 表达式封装一个流,并对每个元组应用布尔运算。它仅发出布尔运算返回的元组。

having 参数

  • StreamExpression:having 函数的流源(强制性)。

  • booleanEvaluator:支持以下布尔运算:eq(等于)、gt(大于)、lt(小于)、gteq(大于或等于)、lteq(小于或等于)、andoreor(异或)和 not。布尔运算符可以与其他运算符嵌套以形成复杂的布尔逻辑。

比较评估器将特定字段中的值与一个值进行比较,无论该值是字符串、数字还是布尔值。例如:eq(field1, 10),如果 field1 等于 10,则返回 true

having 语法

having(rollup(over=a_s,
              sum(a_i),
              search(collection1,
                     q="*:*",
                     qt="/export",
                     fl="id,a_s,a_i,a_f",
                     sort="a_s asc")),
       and(gt(sum(a_i), 100), lt(sum(a_i), 110)))

在此示例中,having 表达式会迭代来自 rollup 表达式的聚合元组,并发出其中 sum(a_i) 字段大于 100 且小于 110 的所有元组。

leftOuterJoin

leftOuterJoin 函数包装两个流,Left 和 Right,并发出来自 Left 的元组。如果 Right 中存在一个元组相等(由 on 定义),则该元组中的值将包含在发出的元组中。Right 中的相等元组不必存在,即可发出 Left 元组。这支持一对一、一对多、多对一和多对多左外部连接场景。元组按它们在 Left 流中出现的顺序发出。两个流都必须按用于确定相等性的字段进行排序(使用 on 参数)。如果两个元组都包含同名字段,则发出元组中将使用来自 Right 流的值。

你可以使用 select 函数包装传入流,以具体说明哪些字段值包含在发出的元组中。

leftOuterJoin 参数

  • StreamExpression for StreamLeft

  • StreamExpression for StreamRight

  • on:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName"

leftOuterJoin 语法

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

hashJoin

hashJoin 函数封装两个流,Left 和 Right,对于 Left 中的每个元组,如果它存在于 Right 中,则会发出一个包含这两个元组的字段的元组。这支持一对一、一对多、多对一和多对多内部连接场景。元组按它们在 Left 流中出现的顺序发出。流的顺序无关紧要。如果两个元组包含同名字段,则在发出的元组中将使用来自 Right 流的值。

你可以使用 select 函数包装传入流,以具体说明哪些字段值包含在发出的元组中。

当 Left 和 Right 的元组无法按相同顺序放置时,可以使用 hashJoin 函数。由于元组无序,此流函数通过在 open 操作期间读取 Right 流中的所有值来工作,并将所有元组存储在内存中。这导致内存占用量等于 Right 流的大小。

hashJoin 参数

  • StreamExpression for StreamLeft

  • hashed=StreamRight 的 StreamExpression

  • on:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName"

hashJoin 语法

hashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

hashJoin(
  search(people, q="*:*", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

hashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

innerJoin

封装两个流,Left 和 Right。对于 Left 中的每个元组,如果它存在于 Right 中,则会发出一个包含这两个元组的字段的元组。这支持一对一、一对多、多对一和多对多内部连接场景。元组按它们在 Left 流中出现的顺序发出。两个流都必须按用于确定相等性的字段(“on”参数)进行排序。如果两个元组包含同名字段,则在发出的元组中将使用来自 Right 流的值。你可以使用 select(…​) 表达式封装传入流,以具体说明哪些字段值包含在发出的元组中。

innerJoin 参数

  • StreamExpression for StreamLeft

  • StreamExpression for StreamRight

  • on:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName"

innerJoin 语法

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

intersect

intersect 函数封装两个流,A 和 B,并发出 A 中确实存在于 B 中的元组。元组按它们在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段(on 参数)进行排序。仅发出 A 中的元组。

intersect 参数

  • StreamA 的 StreamExpression

  • StreamB 的 StreamExpression

  • on:用于检查 A 和 B 之间的元组相等性的字段。可以采用以下格式:on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName"

交集语法

intersect(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
  on="a_i"
)

intersect(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  on="a_i,a_s"
)

列表

list 函数包装 N 个流表达式,并按顺序打开和迭代每个流。这会产生连接多个流表达式的结果。

列表参数

  • 流表达式 ...:N 个流表达式

列表语法

list(tuple(a="hello world"), tuple(a="HELLO WORLD"))

list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
     search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))

list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
     tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))

合并

merge 函数合并两个或更多个流表达式并维护底层流的顺序。由于维护了顺序,底层流的排序必须与提供给 merge 函数的 on 参数对齐。

合并参数

  • 流表达式 A

  • 流表达式 B

  • 可选流表达式 C、D、...、Z

  • on:用于执行合并的排序条件。格式为 fieldName order,其中 order 为 ascdesc。可以以 fieldA order, fieldB order 的格式提供多个字段。

合并语法

# Merging two stream expressions together
merge(
      search(collection1,
             q="id:(0 3 4)",
             qt="/export",
             fl="id,a_s,a_i,a_f",
             sort="a_f asc"),
      search(collection1,
             q="id:(1)",
             qt="/export",
             fl="id,a_s,a_i,a_f",
             sort="a_f asc"),
      on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
      search(collection1,
             q="id:(0 3 4)",
             qt="/export",
             fl="id,fieldA,fieldB,fieldC",
             sort="fieldA asc, fieldB desc"),
      search(collection1,
             q="id:(1)",
             qt="/export",
             fl="id,fieldA",
             sort="fieldA asc"),
      search(collection2,
             q="id:(10 11 13)",
             qt="/export",
             fl="id,fieldA,fieldC",
             sort="fieldA asc"),
      search(collection3,
             q="id:(987)",
             qt="/export",
             fl="id,fieldA,fieldC",
             sort="fieldA asc"),
      on="fieldA asc")

null

null 表达式是一个有用的实用程序函数,用于在执行并行关系代数(连接、交集、汇总等)时了解瓶颈。null 函数读取底层流中的所有元组,并返回一个元组,其中包含计数和处理时间。由于 null 流本身增加的开销最小,因此可用于隔离 Solr 的 /export 处理程序的性能。如果 /export 处理程序的性能不是瓶颈,那么瓶颈很可能出现在运行流装饰器的 worker 中。

null 表达式可以由 parallel 函数包装并发送到 worker 节点。在此场景中,每个 worker 将返回一个元组,其中包含在 worker 上处理的元组计数和该 worker 的计时信息。这提供了有价值的信息,例如

  1. 随着添加更多 worker,/export 处理程序的性能是否会提高。

  2. 元组是否在工作线程之间均匀分布,或者哈希分区是否将更多文档发送到单个工作线程。

  3. 所有工作线程是否以相同的速度处理数据,或者其中一个工作线程是瓶颈的来源。

null 参数

  • StreamExpression:(强制)null 函数读取的表达式。

null 语法

 parallel(workerCollection,
          null(search(collection1, q="*:*", fl="id,a_s,a_i,a_f", sort="a_s desc", qt="/export", partitionKeys="a_s")),
          workers="20",
          zkHost="localhost:9983",
          sort="a_s desc")

上面的表达式显示了一个并行函数,该函数包装了一个 null 函数。这将导致 null 函数在 20 个工作线程节点上并行运行。每个工作线程将返回一个元组,其中包含处理的元组数和迭代元组所花费的时间。

outerHashJoin

outerHashJoin 函数包装两个流(左和右),并从左流中发出元组。如果右流中存在一个相等的元组(由 on 参数定义),则该元组中的值将包含在发出的元组中。右流中不必存在相等的元组,左元组才能发出。这支持一对一、一对多、多对一和多对多左外连接场景。元组按它们在左流中出现的顺序发出。流的顺序无关紧要。如果两个元组都包含一个同名字段,则在发出的元组中将使用右流中的值。

你可以使用 select 函数包装传入流,以具体说明哪些字段值包含在发出的元组中。

当左和右的元组无法按相同顺序放置时,可以使用 outerHashJoin 流。由于元组无序,因此此流通过在打开操作期间读取右流中的所有值并会将所有元组存储在内存中来发挥作用。其结果是内存占用空间等于右流的大小。

outerHashJoin 参数

  • StreamExpression for StreamLeft

  • hashed=StreamRight 的 StreamExpression

  • on:用于检查 Left 和 Right 之间元组相等性的字段。可以采用以下格式:on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName"

outerHashJoin 语法

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

parallel

parallel 函数包装一个流表达式,并将其发送到 N 个工作线程节点以并行处理。

parallel 函数要求将 partitionKeys 参数提供给基础搜索。partitionKeys 参数将在工作线程节点之间对搜索结果(元组)进行分区。具有与 partitionKeys 相同值的元组将被打乱到相同的工作线程节点。

parallel 函数保持工作线程节点返回的元组的排序顺序,因此排序条件必须包含工作线程返回的元组的排序顺序。

例如,如果您按年、月和日进行排序,则只要有足够多的不同年份来将元组分散到工作线程节点,您就可以仅按年进行分区。

Solr 允许对超过 4 个字段进行排序,但出于速度考虑,您不能指定超过 4 个 partitionKeys。此外,当一个或两个键足以分布元组时,指定多个 partitionKeys 是过度操作。

当底层搜索流将从集合中发出大量元组时,设计了并行流。如果搜索流仅使用 parallel 从集合中发出数据的一个小子集,则可能会更慢。

工作器集合

工作器节点可以与数据来自同一个集合,或者它们可以是完全不同的集合,甚至可以是仅存在于 parallel 流表达式中的集合。工作器集合可以是已配置 /stream 处理程序的任何 SolrCloud 集合。与普通的 SolrCloud 集合不同,工作器集合不必保存任何数据。工作器集合可以是仅存在于执行流表达式的空集合。

parallel 参数

  • collection:要将 StreamExpression 发送到的工作器集合的名称。

  • StreamExpression:要发送到工作器集合的表达式。

  • workers:要将表达式发送到的工作器集合中的工作器数量。

  • zkHost:工作器集合所在的 ZooKeeper 连接字符串(可选)。仅当与您连接到的 Solr 实例使用相同的 ZkHost 时,才会包含 Zookeeper 凭据和 ACL(chroot 可以不同)。

  • sort:工作器节点返回的元组排序的排序条件。

parallel 语法

 parallel(workerCollection,
          rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
                 over="year_i", count(*)),
          workers="20",
          zkHost="localhost:9983",
          sort="year_i desc")

上面的表达式显示了一个包装 rollup 函数的 parallel 函数。这将导致 rollup 函数在 20 个工作器节点上并行运行。

预热

parallel 函数使用哈希查询解析器在工作器之间拆分数据。它在所有文档上执行,并将结果位集缓存在 filterCache 中。

对于具有相同数量的工作器和 partitionKeysparallel 流,第一个查询将比后续查询慢。不为第一个慢查询付出代价的诀窍是为每个新搜索器使用预热查询。以下是针对 2 个工作器和 "year_i" 作为 partionKeyssolrconfig.xml 代码段。

<listener event="newSearcher" class="solr.QuerySenderListener">
<arr name="queries">
    <lst><str name="q">:</str><str name="fq">{!hash workers=2 worker=0}</str><str name="partitionKeys">year_i</str></lst>
    <lst><str name="q">:</str><str name="fq">{!hash workers=2 worker=1}</str><str name="partitionKeys">year_i</str></lst>
</arr>
</listener>

plist

plist 函数包装 N 个流表达式,并行打开流,并依次迭代每个流。listplist 之间的区别在于流是并行打开的。由于许多流(如 facetstatssignificantTerms)在打开时会将繁重的操作推送到 Solr,因此 plist 函数可以通过并行执行这些操作来显著提高性能。

plist 参数

  • 流表达式 ...:N 个流表达式

plist 语法

plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))

plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
      search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))

plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
      tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))

priority

priority 函数是 executor 函数的简单优先级调度程序。executor 函数没有直接的任务优先级概念;相反,它只是按照从其底层流中读取的顺序执行任务。priority 函数提供了将较高优先级任务调度到较早提交的较低优先级任务之前的能力。

priority 函数包装两个 topic 函数,它们都发出包含要执行的流表达式的元组。第一个主题被认为是较高优先级任务队列。

每次调用 priority 函数时,它都会检查较高优先级任务队列以查看是否有任何要执行的任务。如果任务在较高优先级队列中等待,则优先级函数将发出较高优先级任务。如果没有要运行的高优先级任务,则会发出较低优先级队列任务。

priority 函数每次调用时只会从一个队列中发出一个任务批次。这确保了在较高优先级队列没有要运行的任务之前,不会执行任何较低优先级任务。

priority 参数

  • topic expression:(必需)高优先级任务队列

  • topic expression:(必需)低优先级任务队列

priority 语法

daemon(id="myDaemon",
       executor(threads=10,
                priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
                         topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))

在上面的示例中,daemon 函数正在迭代调用 executor。每次调用时,executor 函数都会执行 priority 函数发出的任务。priority 函数包装两个主题。第一个主题是较高优先级任务队列,第二个主题是较低优先级主题。

reduce

reduce 函数包装一个内部流,并按公共字段对元组进行分组。

每个元组组都由一个可插入的 reduce 操作作为单个块进行操作。Solr 提供的组操作实现了分布式分组功能。组操作还用作示例 reduce 操作,在构建自定义 reduce 操作时可以参考该操作。

reduce 函数依赖于底层流的排序顺序。因此,底层流的排序顺序必须与按字段分组一致。

reduce 参数

  • StreamExpression:(强制)

  • by:(强制)按其分组的字段的逗号分隔列表。

  • Reduce 操作:(强制)

reduce 语法

reduce(search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_s asc, a_f asc"),
       by="a_s",
       group(sort="a_f desc", n="4")
)

rollup

rollup 函数包装另一个流函数,并汇总存储桶字段上的聚合。rollup 函数依赖于底层流的排序顺序,一次汇总一个分组的聚合。因此,底层流的排序顺序必须与 rollup 函数的 over 参数中的字段匹配。

rollup 函数还需要处理整个结果集才能执行其聚合。当底层流是 search 函数时,/export 处理程序可用于向 rollup 函数提供完全排序的结果集。这种排序方法允许 rollup 函数对非常高基数的字段执行聚合。这种方法的缺点是必须对元组进行排序,并通过网络将其流式传输到工作节点才能进行聚合。对于低到中等基数字段的更快聚合,可以使用 facet 函数。

rollup 参数

  • StreamExpression(强制)

  • over:(强制)按其分组的字段列表。

  • metrics:(强制)要计算的指标列表。当前支持的指标有 sum(col)avg(col)min(col)max(col)count(*)

rollup 语法

rollup(
   search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
   over="a_s",
   sum(a_i),
   sum(a_f),
   min(a_i),
   min(a_f),
   max(a_i),
   max(a_f),
   avg(a_i),
   avg(a_f),
   count(*)
)

该示例展示了包装 search 函数的 rollup 函数。请注意,search 函数使用 /export 处理程序向 rollup 流提供整个结果集。还要注意,search 函数的 sort 参数与 rollup 的 over 参数匹配。这允许 rollup 函数一次汇总一个组的 a_s 字段。

scoreNodes

请参阅 图遍历中的部分。

select

select 函数包装一个流表达式并输出包含来自输入元组的字段的子集或修改后的字段的元组。输出元组中包含的字段列表可以包含别名,以有效地重命名字段。select 流支持操作和评估器。可以提供一个操作和评估器列表,以对任何字段执行操作,例如 replace, add, if 等。

select 参数

  • StreamExpression

  • fieldName:要包含在输出元组中的字段的名称(可以包含多个),例如 outputTuple[fieldName] = inputTuple[fieldName]fieldName 可以是通配符模式,例如 a_*,以选择所有以 a_ 开头的字段。

  • fieldName as aliasFieldName:要包含在输出元组中的别名字段名称(可以包含多个),例如 outputTuple[aliasFieldName] = incomingTuple[fieldName]

  • replace(fieldName, value, withValue=replacementValue):如果 incomingTuple[fieldName] == value,则 outgoingTuple[fieldName] 将设置为 replacementValuevalue 可以是字符串“null”,以用其他值替换 null 值。

  • replace(fieldName, value, withField=otherFieldName):如果 incomingTuple[fieldName] == value,则 outgoingTuple[fieldName] 将设置为 incomingTuple[otherFieldName] 的值。value 可以是字符串“null”,以用其他值替换 null 值。

select 语法

// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
  search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
  teamName_s as teamName,
  wins,
  losses,
  replace(wins,null,withValue=0),
  replace(losses,null,withValue=0),
  if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)

sort

sort 函数包装一个流表达式并重新排序元组。sort 函数以新的排序顺序发出所有传入元组。sort 函数从传入流中读取所有元组,使用具有 O(nlog(n)) 性能特征的算法重新排序它们,其中 n 是传入流中元组的总数,然后以新的排序顺序输出元组。由于所有元组都读入内存,因此此函数的内存消耗随着传入流中元组的数量线性增长。

sort 参数

  • StreamExpression

  • by:重新排序元组的排序条件

sort 语法

下面的表达式查找狗主人,并按主人和宠物姓名对结果进行排序。请注意,它首先按人/主人 ID 排序,然后按主人和宠物姓名重新排序最终输出,从而使用了高效的 innerJoin。

sort(
  innerJoin(
    search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
    search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
    on="id=owner"
  ),
  by="name asc, petName asc"
)

top

top 函数包装一个流表达式并重新排序元组。top 函数仅以新的排序顺序发出前 N 个元组。top 函数重新排序底层流,因此排序条件不必与底层流匹配。

top 参数

  • n:要返回的顶级元组数。

  • StreamExpression

  • sort:选择前 N 个元组的排序条件。

top 语法

下面的表达式查找底层搜索的前 3 个结果。请注意,它会反转排序顺序。top 函数会重新排序底层流的结果。

top(n=3,
     search(collection1,
            q="*:*",
            qt="/export",
            fl="id,a_s,a_i,a_f",
            sort="a_f desc, a_i desc"),
      sort="a_f asc, a_i asc")

unique

unique 函数包装一个流表达式,并基于 over 参数发出一个元组的唯一流。unique 函数依赖于底层流的排序顺序。over 参数必须与底层流的排序顺序匹配。

unique 函数实现了一个非同址唯一算法。这意味着具有相同唯一 over 字段的记录不需要在同一个分片上同址。在并行执行时,partitionKeys 参数必须与唯一 over 字段相同,以便具有相同键的记录被洗牌到同一个工作进程。

unique 参数

  • StreamExpression

  • over:唯一标准。

unique 语法

unique(
  search(collection1,
         q="*:*",
         qt="/export",
         fl="id,a_s,a_i,a_f",
         sort="a_f asc, a_i asc"),
  over="a_f")

update

update 函数包装另一个函数,并将元组发送到 SolrCloud 集合中作为文档进行索引。

update 参数

  • destinationCollection:(强制) 将对元组进行索引的集合。

  • batchSize:(可选,默认为 250) 索引批处理大小。

  • pruneVersionField:(可选,默认为 true) 是否从元组中修剪 _version_

  • StreamExpression:(强制)

update 语法

 update(destinationCollection,
        batchSize=500,
        search(collection1,
               q=*:*,
               qt="/export",
               fl="id,a_s,a_i,a_f,s_multi,i_multi",
               sort="a_f asc, a_i asc"))

上面的示例将 search 函数返回的元组发送到 destinationCollection 进行索引。

如本示例所示,将 search(…​) 包装起来是此装饰器的常见用法:将文档作为元组从集合中读取出来,以某种方式处理或修改它们,然后将它们添加回新集合中。因此,pruneVersionField=true 是默认行为——在将元组转换为 Solr 文档时删除内部流中找到的任何 _version_ 值,以防止 乐观并发 约束导致任何意外错误。