# Install the driver first (shell command): python -m pip install psycopg2
import psycopg2
# Module-level connection used by the loaders below: local server, database
# "testload", user "haki", no password passed explicitly.
connection = psycopg2.connect(
    host="localhost", database="testload", user="haki", password=None
)
# Autocommit is switched on for this connection.
connection.autocommit = True
def create_staging_table(cursor) -> None:
    """Drop ``staging_beers`` if it exists and create it again as UNLOGGED.

    Both statements are sent to the database in a single ``cursor.execute``
    call.

    Args:
        cursor: An open database cursor used to run the DDL.
    """
    ddl = """
DROP TABLE IF EXISTS staging_beers;
CREATE UNLOGGED TABLE staging_beers (
id INTEGER,
name TEXT,
tagline TEXT,
first_brewed DATE,
description TEXT,
image_url TEXT,
abv DECIMAL,
ibu DECIMAL,
target_fg DECIMAL,
target_og DECIMAL,
ebc DECIMAL,
srm DECIMAL,
ph DECIMAL,
attenuation_level DECIMAL,
brewers_tips TEXT,
contributed_by TEXT,
volume INTEGER
);
"""
    cursor.execute(ddl)
import psycopg2.extras
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Recreate the staging table and insert ``beers`` with ``execute_batch``.

    Each beer is copied into a new dict in which ``first_brewed`` is replaced
    by the result of ``parse_first_brewed`` and ``volume`` by the nested
    ``volume['value']``. All rows are collected into a list before being
    handed to ``psycopg2.extras.execute_batch``.

    Args:
        connection: Connection whose cursor is used for the DDL and inserts.
        beers: Iterable of beer dicts providing the keys named in the INSERT.
    """
    insert_sql = """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
"""
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        # Build the parameter dicts only after the table has been recreated,
        # overriding the two fields that need transforming.
        rows = []
        for beer in beers:
            row = dict(beer)
            row['first_brewed'] = parse_first_brewed(beer['first_brewed'])
            row['volume'] = beer['volume']['value']
            rows.append(row)

        psycopg2.extras.execute_batch(cursor, insert_sql, rows)
import io
def clean_csv_value(value: Optional[Any], delimiter: str = '|') -> str:
    r"""Render ``value`` as one field of PostgreSQL COPY text-format input.

    ``None`` becomes the NULL marker ``\N``. Any other value is converted with
    ``str()`` and escaped so that it cannot be mistaken for COPY syntax:
    backslashes are doubled, the column delimiter is backslash-escaped, and
    newlines / carriage returns are written as ``\n`` / ``\r``.

    Args:
        value: The value to render; ``None`` means SQL NULL.
        delimiter: The single-character column separator used by the caller.
            Defaults to ``'|'``.

    Returns:
        The escaped text for the field.
    """
    if value is None:
        return r'\N'

    text = str(value)
    # Backslashes must be escaped first, otherwise the escapes added below
    # would themselves be doubled.
    text = text.replace('\\', '\\\\')
    # A backslash before the delimiter makes COPY treat it as data.
    text = text.replace(delimiter, '\\' + delimiter)
    text = text.replace('\n', '\\n')
    text = text.replace('\r', '\\r')
    return text
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Recreate the staging table and load ``beers`` with ``COPY``.

    One ``|``-delimited line per beer is written to an in-memory
    ``io.StringIO`` buffer, each field passed through ``clean_csv_value``.
    The buffer is then rewound and streamed into ``staging_beers`` with
    ``cursor.copy_from``.

    Args:
        connection: Connection whose cursor is used for the DDL and the copy.
        beers: Iterable of beer dicts providing the keys read below.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # copy_from is called without a column list, so the fields must
            # be written in exactly the column order of staging_beers.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                # brewers_tips comes before contributed_by in the table;
                # both are TEXT, so a swap would load silently into the
                # wrong columns.
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        # Rewind so copy_from reads the buffer from the beginning.
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
import psycopg2
# RealDictCursor lives in psycopg2.extras and must be imported explicitly.
from psycopg2.extras import RealDictCursor

db_host = 'postgres.server.com'
db_port = '5432'
db_un = 'user'
db_pw = 'password'
db_name = 'testdb'

# Pass the settings as keyword arguments instead of formatting a DSN string:
# values containing spaces or quotes then need no manual escaping, and the
# configured port is actually used.
conn = psycopg2.connect(
    dbname=db_name,
    host=db_host,
    port=db_port,
    user=db_un,
    password=db_pw,
    cursor_factory=RealDictCursor,
)
cur = conn.cursor()

# Query values are supplied separately from the SQL text.
sql = 'select * from testtable where id > %s and id < %s'
args = (1, 4)
cur.execute(sql, args)
print(cur.fetchall())
# pg8000
#!/bin/python
import pg8000
# Connections opened by this script, one per transaction.
conn = []
num_trans = 100
num_subtrans = 200

# Create multiple connections. Keyword arguments are used because the
# positional parameter order of pg8000.connect differs between releases.
for x in range(num_trans):
    conn.append(pg8000.connect(
        user='edb',
        host='localhost',
        port=54321,
        database='postgres',
        password='password',
    ))

# Open some subtransactions from each connection and perform 4000 inserts
# from each subtransaction.
# NOTE(review): the loop nesting was reconstructed from the comments above;
# confirm that both statements belong inside the inner loop.
for x in range(num_trans):
    conn[x].autocommit = False
    cursor = conn[x].cursor()
    print("Running transaction: ", x)
    for y in range(num_subtrans):
        cursor.execute(
            "INSERT INTO t1 (a) SELECT (repeat(%s, 100)) FROM generate_series(1, 4000)",
            ('a',),  # one-element tuple; ('a') would be just the string 'a'
        )
        cursor.execute("SAVEPOINT a1")

# Ask the operator whether the open transactions should be kept or discarded.
answer = input("Should we commit? (y/n) : ")
if answer.lower() in ['y', 'yes']:
    for pg_conn in conn:
        pg_conn.commit()
else:
    for pg_conn in conn:
        pg_conn.rollback()

# Release the server connections once every transaction has been finished.
for pg_conn in conn:
    pg_conn.close()