資料流是具有一些額外特性的函式:這類函式具有同步類型、可串流、在本機和遠端呼叫,以及完全可觀測。Firebase Genkit 提供 CLI 和開發人員 UI 工具,以便處理流程 (執行中、偵錯等)。
定義流程
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
您可以使用 zod
定義流程的輸入和輸出結構定義。
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
如果指定結構定義,Genkit 就會驗證輸入和輸出的結構定義。
執行中的流程
使用 runFlow
函式執行資料流:
const response = await runFlow(menuSuggestionFlow, 'French');
您也可以使用 CLI 執行流程:
genkit flow:run menuSuggestionFlow '"French"'
直播結束
以下的簡單範例說明可以從資料流串流值的流程:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
請注意,streamingCallback
可以未定義。只有當叫用的用戶端要求串流回應時,系統才會定義這個值。
如要在串流模式中叫用資料流,請使用 streamFlow
函式:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
如果資料流未實作串流 streamFlow
,行為會與 runFlow
相同。
您也可以使用 CLI 串流資料:
genkit flow:run menuSuggestionFlow '"French"' -s
部署流程
如果想透過 HTTP 存取流程,您必須先部署這個流程。Genkit 提供 Cloud Functions for Firebase 與 Cloud Run 等 Express.js 主機的整合功能。
已部署的流程支援與本機流程相同的所有功能,例如串流和觀測能力。
Firebase 專用 Cloud 函式
如要將流程與 Cloud Functions for Firebase 搭配使用,請使用 firebase
外掛程式,將 defineFlow
替換為 onFlow
並加入 authPolicy
。
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Express.js
如要使用 Cloud Run 和類似服務部署流程,請使用 defineFlow
定義流程,然後呼叫 startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
根據預設,startFlowsServer
會將您在程式碼集中定義的所有資料流提供為 HTTP 端點 (例如 http://localhost:3400/menuSuggestionFlow
)。
您可以選擇要透過流程伺服器公開的流程。您可以指定自訂通訊埠 (如果已設定,就會使用 PORT
環境變數)。您也可以進行 CORS 設定。
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
流程觀測能力
有時使用未經檢測的第三方 SDK 時,您可能會想將這類 SDK 視為開發人員 UI 中的獨立追蹤步驟。您只要將程式碼納入 run
函式中即可。
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);