当前位置: 首页 > news >正文

Python - GaussDB table sync to Hive

 

import psycopg2
from datetime import date, datetime

# Source table in GaussDB and destination table in Hive.
SRC_TABLE = "aaa"
TARGET_TABLE = "bbb"

# ----------------------------
# Step 1: Connect to GaussDB
# ----------------------------
# NOTE(review): credentials are hard-coded here; consider loading them from the
# environment or a secrets store instead of keeping them in source.
src_conn = psycopg2.connect(
    host="1.2.3.4",
    port="8000",
    database="source_db",
    user="user1",
    password="password1",
)
# A *named* cursor is a server-side cursor: rows are transferred from the
# server as fetchmany() asks for them, instead of the whole result set being
# loaded into client memory by execute(). This is what makes the BATCH_SIZE
# batching actually stream large tables.
src_cur = src_conn.cursor(name="gaussdb_to_hive_sync")
# Read from the configured SRC_TABLE. The selected column order must match
# TARGET_TABLE's column order, because rows are inserted positionally with
# "INSERT INTO ... VALUES". SRC_TABLE is a trusted constant, so interpolating
# it is safe; never build this query from external input.
src_cur.execute(f"SELECT id, name, salary FROM {SRC_TABLE}")

# ----------------------------
# Step 2: Initialize Hive connector
# ----------------------------
from hive_connector import Connector
import json

hive_conn = Connector()


def _hive_string_literal(s):
    """Return *s* as a single-quoted HiveQL string literal.

    HiveQL string literals use C-style backslash escapes. So backslashes are
    escaped first, so that no escape sequence is created or destroyed
    accidentally. Single quotes, newlines and carriage returns are escaped
    next, which keeps the statement on one line.
    """
    escaped = (
        s.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f"'{escaped}'"


def to_hive_value(v):
    """Render one value fetched from GaussDB as a HiveQL literal.

    The result is spliced directly into an ``INSERT ... VALUES`` statement,
    so every string-like value is escaped before it is quoted.

    Args:
        v: A column value as returned by the source cursor.

    Returns:
        ``NULL`` for None. A quoted, escaped string for str, date and
        datetime values. A quoted JSON string for dicts. ``array(...)`` for
        lists and tuples, with elements rendered recursively. ``str(v)`` for
        anything else (e.g. int, float, Decimal, bool).
    """
    if v is None:
        return "NULL"
    if isinstance(v, str):
        return _hive_string_literal(v)
    # datetime is a subclass of date, so it must be tested first.
    if isinstance(v, datetime):
        # Keep fractional seconds when present instead of truncating them;
        # tzinfo, if any, is not rendered.
        fmt = "%Y-%m-%d %H:%M:%S.%f" if v.microsecond else "%Y-%m-%d %H:%M:%S"
        return _hive_string_literal(v.strftime(fmt))
    if isinstance(v, date):
        return _hive_string_literal(v.strftime("%Y-%m-%d"))
    if isinstance(v, dict):
        # Store the mapping as a JSON string rather than discarding its
        # contents; switch to Hive map(...) if the target column is a MAP.
        return _hive_string_literal(json.dumps(v, ensure_ascii=False))
    if isinstance(v, (list, tuple)):
        # could convert differently if the target column is not an ARRAY
        return f"array({', '.join(to_hive_value(x) for x in v)})"
    return str(v)
# ----------------------------
# Step 3: Fetch & Insert
# ----------------------------
# Rows are pulled from the source cursor and written to Hive in batches of
# BATCH_SIZE, one multi-row INSERT statement per batch.
BATCH_SIZE = 500  # send multiple rows per INSERT if needed

try:
    while True:
        rows = src_cur.fetchmany(BATCH_SIZE)
        if not rows:
            break
        # Render each non-empty row as a Hive VALUES tuple; to_hive_value()
        # turns every column into a HiveQL literal.
        values_list = [
            f"({', '.join(to_hive_value(v) for v in row)})" for row in rows if row
        ]
        if values_list:
            # Positional VALUES: the column order must match TARGET_TABLE.
            # DO NOT add a ';' at the end.
            insert_sql = f"INSERT INTO {TARGET_TABLE} VALUES {', '.join(values_list)}"
            hive_conn.run_sql(insert_sql)
finally:
    # ----------------------------
    # Step 4: Cleanup
    # ----------------------------
    # Runs even if an INSERT fails, so the GaussDB cursor and connection are
    # always released.
    # NOTE(review): hive_conn is never closed; close it here if Connector
    # exposes a close/cleanup method (its API is not visible in this file).
    src_cur.close()
    src_conn.close()