将 Rust 绑定到 .NET 10:Oxigraph 的 FFI 桥接实践
一、架构概览
┌──────────────────────────────────────────────────────────┐ │ .NET 应用层 │ │ ┌──────────┐ ┌────────────────┐ ┌──────────────────┐ │ │ │ Model │ │ Store │ │ CustomFunctions │ │ │ │ (record) │ │ (SPARQL+I/O) │ │ (C#→Rust 回调) │ │ │ └────┬─────┘ └───────┬────────┘ └────────┬─────────┘ │ │ │ │ │ │ │ ┌────┴────────────────┴────────────────────┴─────────┐ │ │ │ Interop 层 (FFI/Json) │ │ │ │ NativeMethods.g.cs │ SafeHandles │ FFIHelper │ │ │ │ StreamInterop.cs │ Term JSON Converters │ │ │ └──────────────────────┬──────────────────────────────┘ │ └─────────────────────────┼─────────────────────────────────┘ │ P/Invoke + JSON 字符串 ┌─────────────────────────┼─────────────────────────────────┐ │ oxigraph_dotnet.dll │ (Rust cdylib) │ │ ┌──────────────────────┴──────────────────────────────┐ │ │ │ ffi.rs (~2774 行) │ │ │ │ ├─ Store CRUD/Match/SPARQL │ │ │ │ ├─ File/Stream/Callback I/O │ │ │ │ ├─ Lazy Query Iterator │ │ │ │ ├─ Chunked Bulk Loader │ │ │ │ ├─ QueryResults Serialization │ │ │ │ ├─ Dataset (In-Memory) + Canonicalization │ │ │ │ └─ Custom Functions / Aggregate │ │ │ ├─ stream_ffi.rs: CallbackReader / CallbackWriter │ │ │ ├─ error.rs: {"ok":...} / {"error":...} 协议 │ │ │ └─ model_ffi.rs: JSON↔Quad 序列化 │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ oxigraph + oxrdf (Rust crate) + RocksDB │ │ │ └─────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────┘核心设计原则:零复制复杂数据类型,全量 JSON 序列化跨 FFI 边界。Rust 侧所有函数签名遵循统一模式:(*const c_char) → *mut c_char,输入和输出都是 JSON 字符串。
二、工具链
2.1 Rust 侧:cdylib
dotnet/src/oxigraph-dotnet/Cargo.toml:
[package] name = "oxigraph-dotnet" version = "0.5.7" edition = "2024" [lib] crate-type = ["cdylib"] # ← 编译为动态链接库 [dependencies] oxigraph = { path = "../../../lib/oxigraph" } oxrdf = { path = "../../../lib/oxrdf", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" [profile.release] opt-level = "z" # 体积优化(减小 .dll 尺寸) lto = true # 链接时优化产物:
- Windows:
oxigraph_dotnet.dll - Linux:
liboxigraph_dotnet.so - macOS:
liboxigraph_dotnet.dylib
2.2 .NET 侧:LibraryImport生成源
.NET 10 使用源生成器LibraryImport(取代旧的DllImport)进行 P/Invoke:
// NativeMethods.g.cs — 自动生成的 FFI 绑定 internal static partial class OxigraphNative { private const string LibName = "oxigraph_dotnet"; [LibraryImport(LibName, EntryPoint = "oxigraph_store_open", StringMarshalling = StringMarshalling.Utf8)] internal static partial IntPtr store_open(string path); [LibraryImport(LibName, EntryPoint = "oxigraph_store_query", StringMarshalling = StringMarshalling.Utf8)] internal static partial IntPtr store_query(IntPtr handle, string queryJson); [LibraryImport(LibName, EntryPoint = "oxigraph_register_custom_function", StringMarshalling = StringMarshalling.Utf8)] internal static partial IntPtr register_custom_function( string name, IntPtr callback); // ... 共 60+ 个 FFI 函数 }关键差异:LibraryImport是源生成器模式,编译时生成封送代码,避免了DllImport的运行时 IL 生成和 AOT 兼容问题。
2.3 构建流水线
build_package.py编排整个构建过程:
# 步骤 1: 编译 Rust cdylib cargo build --release -p oxigraph-dotnet --features rocksdb # 步骤 2: 编译 C# 项目 dotnet build dotnet/ -c Release # 步骤 3: 将 .dll 拷贝到测试输出目录 # 步骤 4: 运行 xUnit 测试 dotnet test dotnet/tests/Oxigraph.Tests -c Release三、数据协议:JSON over FFI
3.1 统一响应格式
所有 Rust FFI 函数返回*mut c_char(JSON 字符串),包含两种可能的格式:
{"ok": <result>} // 或 {"error": {"kind": "store"|"parse"|"invalid_argument", "message": "..."}}C# 侧FFIHelper统一处理:
// 带返回值的调用 internal static T Call<T>(Func<IntPtr> ffiCall) where T : class { IntPtr jsonPtr = ffiCall(); string json = ReadAndFree(jsonPtr); // Marshal.PtrToStringUTF8 + free_string ThrowIfError(json); // 检查 {"error":...} using var doc = JsonDocument.Parse(json); return JsonSerializer.Deserialize<T>( doc.RootElement.GetProperty("ok").GetRawText())!; } // 无返回值的调用 internal static void CallVoid(Func<IntPtr> ffiCall) { IntPtr jsonPtr = ffiCall(); string json = ReadAndFree(jsonPtr); ThrowIfError(json); }3.2 错误映射
Rust 错误类型自动映射为 C# 异常:
private static Exception MapError(string kind, string message) { return kind switch { "store" => new StoreException(message), "parse" => new ParseException(message), "invalid_argument" => new ArgumentException(message), _ => new OxigraphException($"[{kind}] {message}"), }; }3.3 不透明句柄(Opaque Handles)
Rust 对象不直接暴露给 C#,而是通过指针传递:
// Rust 侧:Store 包装为 Box<UnsafeCell<Store>>,返回指针的 u64 表示 fn store_to_handle(store: Store) -> *mut c_char { let boxed = Box::new(UnsafeCell::new(store)); let ptr = Box::into_raw(boxed); let handle_value = ptr as u64; // 返回 {"ok":{"handle":12345678}} ok_json(&handle_value) }// C# 侧:SafeHandle 封装原始指针,确保析构 internal sealed class StoreSafeHandle : SafeHandleZeroOrMinusOneIsInvalid { protected override bool ReleaseHandle() { OxigraphNative.store_destroy(handle); // Rust 侧 drop(Box::from_raw(ptr)) return true; } }所有句柄类型:
| Rust 类型 | C# SafeHandle | 用途 |
|---|---|---|
StoreHandle=*mut UnsafeCell<Store> | StoreSafeHandle | Store 读写 |
DatasetHandle=*mut UnsafeCell<Dataset> | DatasetSafeHandle | Dataset |
QuadIterHandle=*mut UnsafeCell<ReaderQuadParser> | QuadIterSafeHandle | 懒解析迭代器 |
QueryResultsHandle=*mut QueryResultsWrapper | QueryResultsSafeHandle | 流式查询结果 |
四、Stream 回调机制
对于 .NET Stream 的读写(LoadFromStream/DumpToStream),无法简单地把 Stream
传过去,因为 Rust 不认识 .NET 的Stream类型。解决方案:C 风格回调。
4.1 Rust 侧:实现Read/Writetrait
pub type ReadFn = unsafe extern "C" fn( context: *mut c_void, buf: *mut u8, buf_size: i32 ) -> i32; pub struct CallbackReader { context: *mut c_void, callback: ReadFn, } impl Read for CallbackReader { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { let result = unsafe { (self.callback)(self.context, buf.as_mut_ptr(), buf.len() as i32) }; match result { n if n > 0 => Ok(n as usize), 0 => Ok(0), // EOF -1 => Err(...), // Error } } }CallbackWriter对Writetrait 实现类似。
4.2 C# 侧:GCHandle 固定的委托
[UnmanagedFunctionPointer(CallingConvention.Cdecl)] internal delegate int ReadCallback(IntPtr context, IntPtr buffer, int bufferSize); internal sealed class ReadContext : IDisposable { private readonly Stream _stream; private readonly byte[] _buffer; private readonly GCHandle _gcHandle; // ← 防止 GC 回收委托 public readonly IntPtr ContextPtr; public readonly ReadCallback Callback; public ReadContext(Stream stream) { _stream = stream; Callback = ReadImpl; _gcHandle = GCHandle.Alloc(Callback); // 固定委托 ContextPtr = (IntPtr)_gcHandle; // 作为 context 传递 } private int ReadImpl(IntPtr context, IntPtr buffer, int bufferSize) { try { int read = _stream.Read(_buffer, 0, Math.Min(bufferSize, _buffer.Length)); Marshal.Copy(_buffer, 0, buffer, read); return read; } catch { return -1; } } public void Dispose() { _gcHandle.Free(); } }调用链:C# Stream.Read()→ 拷贝到 Rust buffer → RustReadtrait →RdfParser.for_reader()
五、自定义函数回调桥
SPARQL 自定义函数需要 Rust 在执行查询时回调到 C#。这是最复杂的 FFI 场景:Rust 调用 C#,C# 再把结果返回给 Rust。
5.1 简单自定义函数
// Rust 侧:函数指针类型 type CustomFnCallback = unsafe extern "C" fn(args_json: *const c_char) -> *mut c_char; // 全局注册表 static CUSTOM_FUNCTIONS: LazyLock<Mutex<HashMap<String, CustomFnCallback>>> = ...; #[unsafe(no_mangle)] pub extern "C" fn oxigraph_register_custom_function( name: *const c_char, callback: CustomFnCallback, // C# 函数的指针 ) -> *mut c_char { CUSTOM_FUNCTIONS.lock().unwrap().insert(name_str, callback); ok_json(&"registered") }Rust 收到callback后,在 SPARQL 评估时调用它:
Rust SPARQL evaluator → 遇到 my:func(?x) → 查找 CUSTOM_FUNCTIONS["http://example.com/myFunc"] → 调用 callback(args_json) // ← 跨 FFI 边界回调到 C# → 收到返回的 result_json → 继续 SPARQL 评估C# 侧:
// BridgeDelegate:接收 JSON 参数数组,返回 JSON Term private delegate IntPtr BridgeDelegate(IntPtr argsJsonPtr); private static IntPtr BridgeImpl(IntPtr argsJsonPtr) { var json = Marshal.PtrToStringUTF8(argsJsonPtr); // json: ["http://example.com/myFunc", {"type":"literal","value":"hello"}] using var doc = JsonDocument.Parse(json); var name = doc.RootElement[0].GetString(); // 解析剩余元素为 ITerm[] var terms = ...; // 调用 .NET 函数 var result = _functions[name](terms); // 序列化结果返回给 Rust return Marshal.StringToHGlobalAnsi(JsonSerializer.Serialize(result)); } // 固定 BridgeDelegate,防止 GC 回收 private static readonly GCHandle _gcHandle = GCHandle.Alloc(_bridge); private static readonly IntPtr _bridgePtr = Marshal.GetFunctionPointerForDelegate(_bridge);5.2 自定义聚合函数
聚合函数需要更复杂的 4 回调协议:
Rust 调用: new_fn() → 返回 ctx handle(GCHandle 包装的 C# 对象) acc_fn(ctx, term_json) → 每行数据调用一次 finish_fn(ctx) → 返回聚合结果 free_fn(ctx) → 释放 C# 对象// Rust 适配器:实现 oxigraph::sparql::AggregateFunctionAccumulator struct CallbackAggregateAccumulator { ctx: *mut c_void, acc_fn: AggregateAccCallback, finish_fn: AggregateFinishCallback, free_fn: AggregateFreeCallback, } impl AggregateFunctionAccumulator for CallbackAggregateAccumulator { fn accumulate(&mut self, element: Term) { let json = serde_json::to_string(&element).unwrap(); let c_str = CString::new(json).unwrap(); unsafe { (self.acc_fn)(self.ctx, c_str.as_ptr()) }; } fn finish(&mut self) -> Option<Term> { let ptr = unsafe { (self.finish_fn)(self.ctx) }; // 解析 C# 返回的 JSON Term 或 null if ptr.is_null() { return None; } let json = unsafe { c_str_to_str(ptr) }; serde_json::from_str(json).unwrap_or(None) } }六、RDF 数据类型的 JSON 序列化
Rust 的 serde 格式直接映射到 C# 的System.Text.Json。两端使用相同的tagged-enum JSON 模式:
// NamedNode {"type": "uri", "value": "http://example.com/s"} // BlankNode {"type": "bnode", "value": "b1_abc123"} // Literal (plain) {"type": "literal", "value": "hello"} // Literal (language-tagged) {"type": "literal", "value": "bonjour", "language": "fr"} // Literal (typed) {"type": "literal", "value": "42", "datatype": {"type":"uri","value":"http://www.w3.org/2001/XMLSchema#integer"}} // Triple (RDF-star) {"type": "triple", "subject": {"type":"uri","value":"http://s"}, "predicate": {"type":"uri","value":"http://p"}, "object": {"type":"uri","value":"http://o"}} // DefaultGraph {"type": "default"}C# 侧自定义 Converter:
public class TermConverter : JsonConverter<ITerm> { public override ITerm? Read(ref Utf8JsonReader reader, ...) { using var doc = JsonDocument.ParseValue(ref reader); var kind = doc.RootElement.GetProperty("type").GetString(); return kind switch { "uri" => new NamedNode(doc.RootElement.GetProperty("value").GetString()!), "bnode" => new BlankNode(doc.RootElement.GetProperty("value").GetString()!), "literal" => /* 解析 value/language/datatype/direction */, "triple" => JsonSerializer.Deserialize<Triple>(root.GetRawText())!, _ => throw new JsonException($"Unknown term type: {kind}") }; } }通过这种 JSON 约定,C# 的record类型与 Rust 的 serde 序列化保持了精确对应,无需额外的中间表示层。
七、流式查询结果:懒迭代器
SPARQL 查询可能返回百万级结果。为避免将全部结果物化到一个 JSON 数组中,实现了流式迭代器:
// oxigraph_store_query_iter:返回不透明句柄,不包含结果数据 pub extern "C" fn oxigraph_store_query_iter( handle: StoreHandle, query_json: *const c_char, ) -> *mut c_char { // ... 执行查询,获取 QueryResults // 包装为 QueryResultsWrapper 枚举 // 返回 {"ok":{"handle":...}} — 不包含任何数据行 } // 每次取一行 pub extern "C" fn oxigraph_query_iter_next_solution( handle: QueryResultsHandle ) -> *mut c_char { // 调用 iter.next(),返回单行 JSON 或 null }C# 侧LazySolutionList:按需向 Rust 请求下一行,支持IReadOnlyList<T>接口:
private void MaterializeUpTo(int required) { while (_materialized.Count < required) { var ptr = OxigraphNative.query_iter_next_solution(_handle); var json = Marshal.PtrToStringUTF8(ptr) ?? "null"; OxigraphNative.free_string(ptr); if (okVal is null) { _count = _materialized.Count; return; } _materialized.Add(new QuerySolution(okVal)); } }八、Chunked Bulk Loader
大数据加载使用 RocksDB 的批量加载路径,以 10,000 个 quad 为一批:
public void BulkExtend(IEnumerable<Quad> quads) { const int chunkSize = 10_000; // 1. 开始批量加载器 var handle = store_bulk_extend_begin(); // 2. 分批喂入数据 foreach (var quad in quads) { chunk.Add(quad); if (chunk.Count >= chunkSize) { store_bulk_extend_add_chunk(handle, JsonSerializer.Serialize(chunk)); chunk.Clear(); } } // 3. 提交 store_bulk_extend_add_chunk(handle, JsonSerializer.Serialize(chunk)); store_bulk_extend_commit(handle); } // 异常时自动取消: // bulkHandle 的 ReleaseHandle 调用 store_bulk_extend_cancel// Rust 侧:RocksDB BulkLoader pub extern "C" fn oxigraph_store_bulk_extend_commit(handle) { let loader: Box<BulkLoader> = Box::from_raw(handle as *mut _); loader.commit()?; }九、内存管理
9.1 SafeHandle — 确定性析构
所有 Rust 句柄都包装在 .NETSafeHandle中。即使进程异常终止(AppDomain卸载、ThreadAbortException),CLR 也会调用ReleaseHandle。
internal sealed class StoreSafeHandle : SafeHandleZeroOrMinusOneIsInvalid { protected override bool ReleaseHandle() { OxigraphNative.store_destroy(handle); // Rust: drop(Box::from_raw(ptr)) return true; } }9.2 GCHandle — 防止委托被 GC 回收
当一个 C# 委托被传递给 Rust 侧持有,必须用GCHandle固定它:
private static readonly GCHandle _gcHandle = GCHandle.Alloc(_bridge); private static readonly IntPtr _bridgePtr = Marshal.GetFunctionPointerForDelegate(_bridge);GCHandle.Alloc确保委托在托管堆上不会被移动或回收,Rust 侧函数指针始终有效。
9.3 字符串生命周期
// Rust 返回的字符串必须由调用方释放 pub extern "C" fn oxigraph_free_string(ptr: *mut c_char) { if !ptr.is_null() { unsafe { drop(CString::from_raw(ptr)); } } }// C# 侧:每次 FFI 调用后立即释放 Rust 字符串 private static string ReadAndFree(IntPtr ptr) { string json = Marshal.PtrToStringUTF8(ptr); OxigraphNative.free_string(ptr); return json; }十、线程安全
Rust 侧使用UnsafeCell<Store>包装 Store,UnsafeCell不提供任何同步语义:
pub type StoreHandle = *mut UnsafeCell<Store>;C# 侧明确标注:
/// <summary> /// An RDF store backed by RocksDB (on disk) or in-memory. /// Thread safety: not guaranteed. Callers must synchronize concurrent access. /// </summary>异步 API 使用Task.Run将阻塞操作移到线程池,但不改变线程安全语义。所有对同一 Store 的并发访问(包括 async 方法)必须由调用方串行化。
十一、关键指标
| 维度 | 数值 |
|---|---|
| Rust FFI 函数 | 60+ 个#[unsafe(no_mangle)] |
C#LibraryImport | 60+ 个partial方法 |
| 协议格式 | JSON({"ok":...}/{"error":...}) |
| 句柄类型 | 5 种 SafeHandle |
| 回调方向 | Rust→C#(CustomFunctions、Stream 回调) |
| 流式查询 | 懒迭代器,逐行 FFI 调用 |
| 构建工具 | cargo+dotnet+build_package.py |
| 测试 | 280 xUnit 测试,全部通过 |
