资产织造

title: 确认认购流程 description: 用户在前端页面提交认购请求,输入认购金额并点击确认,前端调用智能合约的 subscribe 函数,传入认购金额和相关参数... date: "2026-03-06" tags: [RWA, Web3, Next.js] cover: /images/rwa-cover.jpg

1.认购流程逐步说明

1.1 用户在前端页面提交认购请求,输入认购金额并点击确认。

1.2 前端调用智能合约的 subscribe 函数,传入认购金额和相关参数。

1.3 智能合约处理认购请求,执行相关逻辑(如验证金额、更新状态等),并且触发 Subscribed 事件,记录投资人地址、认购 ID 和认购金额。

1.4 这个事件会被区块链记录在日志中,我们通过定期调用 /api/cron/sync 接口来同步这些事件数据到我们的数据库中。

1.5 同步完成后,数据库中的投资记录会被更新,前端可以查询这些数据并展示给用户,显示他们的认购记录和相关信息.

2. emit Subscribed(msg.sender, id, usdtAmount);这步做了什么

在智能合约中,emit Subscribed(msg.sender, id, usdtAmount); 这行代码的作用是触发一个事件(Event),这个事件叫做 Subscribed,并且携带了三个参数:msg.sender(调用者的地址)、id(认购的 ID)和 usdtAmount(认购的 USDT 金额)。 事件是智能合约中的一种机制,用于在链上记录特定的操作或状态变化。当这个事件被触发时,它会被记录在区块链的日志中,任何人都可以通过监听这个事件来获取相关的信息。 在这个例子中,当投资人提交认购请求并且系统处理完成后,Subscribed 事件会被触发,记录下投资人的地址、认购的 ID 和认购的金额。这对于后续的数据索引和查询非常有用,因为我们可以通过监听这个事件来获取所有的认购记录,并且可以根据投资人地址或认购 ID 来筛选特定的记录。

/api/cron/sync 这个接口的作用是同步区块链上的事件数据到我们的数据库中。它会监听智能合约中触发的事件,例如 Subscribed,并将这些事件的数据(如投资人地址、认购 ID、认购金额等)存储到数据库中,以便我们在前端展示和查询这些数据。通过定期调用这个接口,我们可以确保数据库中的数据与区块链上的事件保持同步,从而提供实时的投资信息给用户。

/* eslint-disable @typescript-eslint/no-explicit-any */
import { NextRequest, NextResponse } from "next/server";
import { ethers } from "ethers";
import pool from "@/lib/db";
import RWAArtifact from "@/abi/RWAPlatform1155.json";

const MAX_BLOCK_RANGE = 2000;
const LOCK_ID = 987654; // 随便一个固定整数

export async function POST(req: NextRequest) {

  if (
    req.headers.get("authorization") !== `Bearer ${process.env.CRON_SECRET}`
  ) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  const client = await pool.connect();

  try {
    await client.query("BEGIN");

    // 🔒 防止并发 cron
    await client.query("SELECT pg_advisory_xact_lock($1)", [LOCK_ID]);

    const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
    const contract = new ethers.Contract(
      process.env.RWA_CONTRACT!,
      RWAArtifact.abi,
      provider,
    );

    const iface = new ethers.Interface(RWAArtifact.abi);

    // 1️⃣ 读取上次同步区块
    const { rows } = await client.query(
      "SELECT last_block FROM chain_sync WHERE id = 1 FOR UPDATE",
    );

    const lastBlock = Number(rows[0]?.last_block ?? 0);

    // 2️⃣ 当前区块
    const currentBlock = await provider.getBlockNumber();

    console.log(`Current block: ${currentBlock}`);

    if (currentBlock <= lastBlock) {
      await client.query("COMMIT");
      return NextResponse.json({ message: "No new blocks" });
    }

    // 3️⃣ 分段同步
    const toBlock =
      currentBlock - lastBlock > MAX_BLOCK_RANGE
        ? lastBlock + MAX_BLOCK_RANGE
        : currentBlock;

    console.log(`Syncing from block ${lastBlock + 1} to ${toBlock}`);

    const filter = contract.filters.Subscribed();

    const logs = await contract.queryFilter(filter, lastBlock + 1, toBlock);

    // 🧠 用 Map 防止重复读取同一资产
    const updatedAssets = new Map<number, boolean>();

    for (const log of logs) {
      try {
        const parsed = iface.parseLog(log);

        if (parsed?.name !== "Subscribed") continue;

        const investor = parsed.args.investor as string;
        const id = Number(parsed.args.id);
        const usdtAmount = parsed.args.usdtAmount as bigint;

        const usdtHuman = Number(ethers.formatUnits(usdtAmount, 6));

        // 1️⃣ 插入投资记录(防重复)
        await client.query(
          `
          INSERT INTO investments (
            asset_token_id,
            investor_address,
            usdt_amount,
            tx_hash,
            block_number
          )
          VALUES ($1,$2,$3,$4,$5)
          ON CONFLICT (tx_hash) DO NOTHING
          `,
          [id, investor, usdtHuman, log.transactionHash, log.blockNumber],
        );

        // 2️⃣ 每个资产只读取一次链上 totalRaised
        if (!updatedAssets.has(id)) {
          const asset = await contract.assets(id);

          const totalRaisedHuman = Number(
            ethers.formatUnits(asset.totalRaised, 6),
          );

          await client.query(
            `
            UPDATE assets
            SET total_raised = $1,
                updated_at = now()
            WHERE token_id = $2
            `,
            [totalRaisedHuman, id],
          );

          updatedAssets.set(id, true);
        }
      } catch {
        // 解析失败忽略
      }
    }

    // 4️⃣ 更新 last_block(关键修复点)
    await client.query(
      "UPDATE chain_sync SET last_block = $1 WHERE id = 1",
      [toBlock], // ✅ 这里不能是 [[toBlock]]
    );

    await client.query("COMMIT");

    return NextResponse.json({
      success: true,
      fromBlock: lastBlock + 1,
      toBlock,
      eventsFound: logs.length,
      currentBlock:currentBlock,
    });
  } catch (err: any) {
    await client.query("ROLLBACK");

    return NextResponse.json({ error: err.message }, { status: 500 });
  } finally {
    client.release();
  }
}

https://sepolia.etherscan.io/blocks/

这个链接可以看到链上所有区块的信息,包括区块高度、出块时间、出块矿工、区块奖励等。通过这个链接,我们可以监控区块链的状态,查看最新的区块以及历史区块的详情。 这对于开发和调试链上应用非常有帮助,因为我们可以通过查看区块信息来验证我们的交易是否被成功打包进区块,以及了解当前网络的拥堵情况和交易费用等信息。

当前的区块高度是10393460,我们上次同步的区块高度是10392000,那么我们需要同步的区块范围就是从10392001到10393460。通过访问 https://sepolia.etherscan.io/blocks/ ,我们可以查看这些区块的信息,确认我们的交易是否已经被包含在这些区块中,以及了解这些区块的其他相关信息。

这个同步函数 ,放在哪里执行比较合适?我们需要定期执行这个函数来保持数据库与区块链数据的同步。通常有以下几种方式:

  1. 服务器定时任务:可以在服务器上设置一个定时任务(如 cron job)来定期调用这个函数,例如每隔几分钟执行一次。
  2. 云函数:如果使用云服务(如 AWS Lambda、Google Cloud Functions等),可以设置一个定时触发器来调用这个函数。
  3. 区块监听:可以使用区块链事件监听器(如 web3.js 或 ethers.js)来实时监听区块链上的事件,当检测到新的区块或特定事件时,立即调用这个函数进行同步。 根据实际需求和资源情况,可以选择适合的方式来执行这个同步函数,确保数据库中的数据能够及时更新,反映区块链上的最新状态。

当前我的方案是在 cron-job.org注册了一个函数 没5分钟调用一次这个接口,保持数据的实时性和准确性。通过这种方式,我们可以确保数据库中的数据与区块链上的事件保持同步,从而提供实时的投资信息给用户。

3.HTTP 协议定义了很多请求方法(HTTP Methods),不仅仅是 GET、POST,每种方法有不同用途、幂等性和安全性

| 方法          | 用途          | 参数传递       | 幂等  | 是否安全 | 说明                            |
| ----------- | ----------- | ---------- | --- | ---- | ----------------------------- |
| **GET**     | 获取资源        | URL query  | ✅ 是 | ✅ 是  | 只读,不修改服务器数据,浏览器可缓存            |
| **POST**    | 创建资源 / 提交数据 | Body       | ❌ 否 | ❌ 否  | 会修改服务器状态,如提交表单、交易等            |
| **PUT**     | 更新资源(完整替换)  | Body       | ✅ 是 | ❌ 否  | 将整个资源替换为请求体,幂等(多次同样请求结果相同)    |
| **PATCH**   | 更新资源(部分更新)  | Body       | ❌ 否 | ❌ 否  | 只修改部分字段,非幂等,常用于更新某个属性         |
| **DELETE**  | 删除资源        | URL / Body | ✅ 是 | ❌ 否  | 删除操作,多次删除同一资源结果相同(幂等)         |
| **HEAD**    | 获取资源元信息     | URL query  | ✅ 是 | ✅ 是  | 与 GET 类似,但只返回响应头,不返回 Body     |
| **OPTIONS** | 查看支持的方法     | URL / Body | ✅ 是 | ✅ 是  | 查询服务器或接口支持哪些 HTTP 方法,CORS 会用到 |
| **TRACE**   | 回显请求        | URL / Body | ✅ 是 | ❌ 否  | 用于调试,请求返回服务器收到的原始请求           |
| **CONNECT** | 建立隧道        | URL        | ❌ 否 | ❌ 否  | 用于代理服务器(如 HTTPS 隧道)           |

4.查看具体交易里的事件

https://sepolia.etherscan.io/tx/交易hash

Logs

Event Logs
Subscribed
investor: 0x123...
id: 2
usdtAmount: 50000000

通过访问 https://sepolia.etherscan.io/tx/交易hash,我们可以查看具体交易的详情,包括交易的输入、输出、状态以及相关的事件日志。

在事件日志中,我们可以看到 Subscribed 事件被触发,并且包含了事件参数,如投资人地址(investor)、认购 ID(id)和认购金额(usdtAmount)。

这对于验证我们的智能合约是否正确触发了事件,以及获取事件相关的数据非常有帮助。

5.为什么我们的代码可以访问到?

因为我们在前端代码中使用了 fetch("/api/cron/sync", { headers: { Authorization: Bearer process.env.CRON_SECRET } }) 来调用这个接口。

这个接口是我们在 Next.js 中定义的一个 API 路由,位于 app/api/cron/sync/route.ts。当我们调用这个接口时,Next.js 会自动处理这个请求,并且执行我们在 route.ts 中定义的逻辑.

const filter = contract.filters.Subscribed();

const logs = await contract.queryFilter(filter, fromBlock, toBlock);

逻辑是
区块 → 交易 → logs → Subscribed event

ethers.js 自动帮你解析:

const parsed = iface.parseLog(log);

parsed.args.investor
parsed.args.id
parsed.args.usdtAmount

通过这种方式,我们可以在前端代码中访问到这个接口,并且通过它来同步区块链上的事件数据到我们的数据库中,从而保持数据的实时性和准确性。

6.为什么 Web3 都用 Event

事件的作用

| 用途      | 说明    |
| ------- | ----- |
| 前端监听    | UI 更新 |
| Indexer | 数据库同步 |
| 分析      | 投资记录  |
| 通知      | 用户消息  |


例如: Uniswap OpenSea Aave

全部依赖 event logs

7.我的 RWA 认购流程

用户认购
     │
     ▼
subscribe()
     │
     ▼
emit Subscribed
     │
     ▼
区块日志
     │
     ▼
/api/cron/sync
     │
     ▼
PostgreSQL investments 表
     │
     ▼
前端显示认购记录