Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/integrate ollama #471

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion apps/indexer-coordinator/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@subql/indexer-coordinator",
"version": "2.2.4-0",
"version": "2.2.4-3",
"description": "",
"author": "SubQuery",
"license": "Apache-2.0",
Expand Down
2 changes: 2 additions & 0 deletions apps/indexer-coordinator/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { ProjectModule } from './project/project.module';
import { RewardModule } from './reward/reward.module';
import { StatsModule } from './stats/stats.module';
import { SubscriptionModule } from './subscription/subscription.module';
import { IntegrationModule } from './integration/integration.module';

@Module({
imports: [
Expand Down Expand Up @@ -81,6 +82,7 @@ import { SubscriptionModule } from './subscription/subscription.module';
NetworkModule,
RewardModule,
ConfigModule,
IntegrationModule,
],
controllers: [AdminController, AgreementController, MonitorController],
})
Expand Down
3 changes: 3 additions & 0 deletions apps/indexer-coordinator/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { BigNumber, ContractTransaction, Overrides } from 'ethers';
export enum DesiredStatus {
STOPPED,
RUNNING,

// LLM
PULLING,
}

export enum IndexerDeploymentStatus {
Expand Down
56 changes: 56 additions & 0 deletions apps/indexer-coordinator/src/integration/integration.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { ID, Field, ObjectType } from '@nestjs/graphql';
import {
Column,
Entity,
Index,
PrimaryGeneratedColumn,
CreateDateColumn,
UpdateDateColumn,
} from 'typeorm';
import { SeviceEndpoint } from '../project/project.model';
import { IntegrationType, LLMModel } from '../project/types';

@Entity('integration')
@Index(['title'], { unique: true })
@ObjectType()
export class IntegrationEntity {
@PrimaryGeneratedColumn('increment')
@Field(() => ID, { nullable: true })
id: number;

@Column({ type: 'varchar', length: 50 })
@Field()
title: string;

@Column()
@Field()
type: IntegrationType;

@Column('jsonb', { default: {} })
@Field(() => [SeviceEndpoint], { nullable: true })
serviceEndpoints: SeviceEndpoint[];

@Field(() => [LLMModel], { nullable: true })
models: LLMModel[];

@Column({ type: 'boolean', default: false })
@Field()
enabled: boolean;

@Column('jsonb', { default: {} })
config: any;

@Column('jsonb', { default: {} })
extra: any;

@CreateDateColumn({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
@Field({ nullable: true })
created_at: Date;

@UpdateDateColumn({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
@Field({ nullable: true })
updated_at: Date;
}
20 changes: 20 additions & 0 deletions apps/indexer-coordinator/src/integration/integration.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ProjectModule } from '../project/project.module';
import { IntegrationEntity } from './integration.model';
import { IntegrationResolver } from './integration.resolver';
import { IntegrationService } from './integration.service';

@Module({
imports: [
TypeOrmModule.forFeature([IntegrationEntity]),
ProjectModule
],
providers: [IntegrationService, IntegrationResolver],
exports: [IntegrationService],
})
@Module({})
export class IntegrationModule {}
69 changes: 69 additions & 0 deletions apps/indexer-coordinator/src/integration/integration.resolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
import { SeviceEndpoint } from '../project/project.model';
import {
IntegrationType,
LLMConfig,
LLMExtra,
LLMOngoingStreamRequestMeta,
} from '../project/types';
import { IntegrationEntity } from './integration.model';
import { IntegrationService } from './integration.service';

@Resolver(() => IntegrationEntity)
export class IntegrationResolver {
constructor(private integrationService: IntegrationService) {}

@Query(() => [IntegrationEntity])
async allIntegration(): Promise<IntegrationEntity[]> {
return await this.integrationService.getAll();
}

@Mutation(() => IntegrationEntity)
addIntegration(
@Args('title') title: string,
@Args('type') type: IntegrationType,
@Args('serviceEndpoints', { type: () => [SeviceEndpoint] })
serviceEndpoints: SeviceEndpoint[],

@Args('config', { nullable: true }) config?: LLMConfig,
@Args('extra', { nullable: true }) extra?: LLMExtra
): Promise<IntegrationEntity> {
return this.integrationService.create(title, type, serviceEndpoints, config, extra);
}

@Mutation(() => IntegrationEntity)
updateIntegration(
@Args('id') id: number,
@Args('title') title: string,
@Args('serviceEndpoints', { type: () => [SeviceEndpoint] })
serviceEndpoints: SeviceEndpoint[],
@Args('enabled') enabled: boolean,
@Args('config', { nullable: true }) config?: LLMConfig,
@Args('extra', { nullable: true }) extra?: LLMExtra
): Promise<IntegrationEntity> {
return this.integrationService.update(id, title, serviceEndpoints, enabled, config, extra);
}

@Mutation(() => IntegrationEntity)
deleteIntegration(@Args('id') id: number): Promise<IntegrationEntity> {
return this.integrationService.delete(id);
}

@Mutation(() => IntegrationEntity)
deleteModel(@Args('id') id: number, @Args('name') name: string): Promise<IntegrationEntity> {
return this.integrationService.deleteModel(id, name);
}

@Mutation(() => IntegrationEntity)
pullModel(@Args('id') id: number, @Args('name') name: string): Promise<IntegrationEntity> {
return this.integrationService.pullModel(id, name);
}

@Query(() => [LLMOngoingStreamRequestMeta])
inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] {
return this.integrationService.inspectOngoingStreamedRequests();
}
}
147 changes: 147 additions & 0 deletions apps/indexer-coordinator/src/integration/integration.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { ProjectLLMService } from '../project/project.llm.service';
import { SeviceEndpoint, ValidationResponse } from '../project/project.model';
import { IntegrationType, LLMConfig, LLMExtra, LLMModelPullResult, LLMOngoingStreamRequestMeta } from '../project/types';
import { IntegrationEntity } from './integration.model';

@Injectable()
export class IntegrationService {
constructor(
@InjectRepository(IntegrationEntity)
private integrationRepo: Repository<IntegrationEntity>,
private projectLLMService: ProjectLLMService
) {}

async get(id: number): Promise<IntegrationEntity> {
return this.integrationRepo.findOne({ where: { id } });
}

async getAll(): Promise<IntegrationEntity[]> {
const integrations = await this.integrationRepo.find();

for (const it of integrations) {
if (it.type === IntegrationType.LLM && it.serviceEndpoints.length > 0) {
const models = await this.projectLLMService.getModels(it.serviceEndpoints[0].value);
it.models = models;
}
}

return integrations;
}

async create(
title: string,
type: IntegrationType,
serviceEndpoints: SeviceEndpoint[],
config?: LLMConfig,
extra?: LLMExtra
): Promise<IntegrationEntity> {
let integration = await this.integrationRepo.findOne({ where: { title } });
if (integration) {
throw new Error(`${title} already exist`);
}

let validateResult: ValidationResponse = { valid: true, reason: '' };
switch (type) {
case IntegrationType.LLM:
validateResult = await this.projectLLMService.validate(serviceEndpoints[0].value);
break;
default:
throw new Error('Unsupported integration type');
}

if (!validateResult.valid) {
throw new Error(validateResult.reason);
}

integration = new IntegrationEntity();
integration.title = title;
integration.type = type;
integration.enabled = true;
integration.serviceEndpoints = serviceEndpoints;
integration.config = config || {};
integration.extra = extra || {};
return this.integrationRepo.save(integration);
}

async update(
id: number,
title: string,
serviceEndpoints: SeviceEndpoint[],
enabled: boolean,
config?: LLMConfig,
extra?: LLMExtra
): Promise<IntegrationEntity> {
let integration = await this.integrationRepo.findOne({ where: { title } });
if (integration && integration.id !== id) {
throw new Error(`${title} already exist`);
}

integration = await this.integrationRepo.findOne({ where: { id } });
if (!integration) {
throw new Error(`${id} not exist`);
}

let validateResult: ValidationResponse = { valid: true, reason: '' };
switch (integration.type) {
case IntegrationType.LLM:
validateResult = await this.projectLLMService.validate(serviceEndpoints[0].value);
break;
default:
throw new Error('Unsupported integration type');
}

if (!validateResult.valid) {
throw new Error(validateResult.reason);
}

integration.title = title;
integration.enabled = true;
integration.serviceEndpoints = serviceEndpoints;
integration.enabled = enabled;
integration.config = config || {};
integration.extra = extra || {};
return this.integrationRepo.save(integration);
}

async delete(id: number): Promise<IntegrationEntity> {
const integration = await this.integrationRepo.findOne({ where: { id } });
if (!integration) return;
return this.integrationRepo.remove(integration);
}

async deleteModel(id: number, name: string): Promise<IntegrationEntity> {
const integration = await this.integrationRepo.findOne({ where: { id } });
if (!integration) {
throw new Error(`${id} not exist`);
}
if (integration.type !== IntegrationType.LLM) {
throw new Error(`${id} not supported`);
}
const host = integration.serviceEndpoints[0].value;
await this.projectLLMService.deleteModel(host, name);
return integration;
}

async pullModel(id: number, name: string) {
const integration = await this.integrationRepo.findOne({ where: { id } });
if (!integration) {
throw new Error(`${id} not exist`);
}
if (integration.type !== IntegrationType.LLM) {
throw new Error(`${id} not supported`);
}
const host = integration.serviceEndpoints[0].value;
this.projectLLMService.pullModel(host, name);
return integration;
}

inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] {
return this.projectLLMService.inspectOngoingStreamedRequests();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { MigrationInterface, QueryRunner } from 'typeorm';

export class AddIntegrationTable1721982447267 implements MigrationInterface {
name = 'AddIntegrationTable1721982447267';

async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE "integration" ("id" SERIAL NOT NULL, "title" character varying(50) NOT NULL, "type" integer NOT NULL, "serviceEndpoints" jsonb NOT NULL DEFAULT '{}', "enabled" boolean NOT NULL DEFAULT false, "config" jsonb NOT NULL DEFAULT '{}', "extra" jsonb NOT NULL DEFAULT '{}', "created_at" TIMESTAMP NOT NULL DEFAULT now(), "updated_at" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_f348d4694945d9dc4c7049a178a" PRIMARY KEY ("id"))`
);
await queryRunner.query(
`CREATE UNIQUE INDEX "IDX_814dc61a29c5383dc90993603f" ON "integration" ("title") `
);
}

async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IDX_814dc61a29c5383dc90993603f"`);
await queryRunner.query(`DROP TABLE "integration"`);
}
}
Loading