流量數

資料流是具有一些額外特性的函式:這類函式具有同步類型、可串流、在本機和遠端呼叫,以及完全可觀測。Firebase Genkit 提供 CLI 和開發人員 UI 工具,以便處理流程 (執行中、偵錯等)。

定義流程

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

您可以使用 zod 定義流程的輸入和輸出結構定義。

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

如果指定結構定義,Genkit 就會驗證輸入和輸出的結構定義。

執行中的流程

使用 runFlow 函式執行資料流:

const response = await runFlow(menuSuggestionFlow, 'French');

您也可以使用 CLI 執行流程:

genkit flow:run menuSuggestionFlow '"French"'

直播結束

以下的簡單範例說明可以從資料流串流值的流程:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

請注意,streamingCallback 可以未定義。只有當叫用的用戶端要求串流回應時,系統才會定義這個值。

如要在串流模式中叫用資料流,請使用 streamFlow 函式:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

如果資料流未實作串流 streamFlow,行為會與 runFlow 相同。

您也可以使用 CLI 串流資料:

genkit flow:run menuSuggestionFlow '"French"' -s

部署流程

如果想透過 HTTP 存取流程,您必須先部署這個流程。Genkit 提供 Cloud Functions for Firebase 與 Cloud Run 等 Express.js 主機的整合功能。

已部署的流程支援與本機流程相同的所有功能,例如串流和觀測能力。

Firebase 專用 Cloud 函式

如要將流程與 Cloud Functions for Firebase 搭配使用,請使用 firebase 外掛程式,將 defineFlow 替換為 onFlow 並加入 authPolicy

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

如要使用 Cloud Run 和類似服務部署流程,請使用 defineFlow 定義流程,然後呼叫 startFlowsServer()

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

根據預設,startFlowsServer 會將您在程式碼集中定義的所有資料流提供為 HTTP 端點 (例如 http://localhost:3400/menuSuggestionFlow)。

您可以選擇要透過流程伺服器公開的流程。您可以指定自訂通訊埠 (如果已設定,就會使用 PORT 環境變數)。您也可以進行 CORS 設定。

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

流程觀測能力

有時使用未經檢測的第三方 SDK 時,您可能會想將這類 SDK 視為開發人員 UI 中的獨立追蹤步驟。您只要將程式碼納入 run 函式中即可。

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);