# Data scraping techniques (read more on web-scraping with requests + BeautifulSoup).
import sqlite3
import requests
from bs4 import BeautifulSoup as bs
def scrap_ticker_symbols():
    """Scrape the ticker symbols of the most active stocks from CNN Money.

    Returns:
        list[str]: the ticker symbols found in the "hot stocks" table.

    Raises:
        Exception: if the HTTP response status is not 2xx.
    """
    url = 'https://money.cnn.com/data/hotstocks/'
    # Download the HTML content of the page.
    response = requests.get(url)
    # Any non-2xx status (e.g. a 404 for a missing page) means the
    # download failed. Status codes are three digits, so integer
    # division by 100 yields the class of the response.
    if response.status_code // 100 != 2:
        raise Exception("Error downloading html content")
    # Parse the HTML content with BeautifulSoup.
    soup = bs(response.content, 'html.parser')
    # The "most actives" data lives in a single table with this class.
    table_active_stocks = soup.find('table', attrs={'class': 'wsod_dataTableBigAlt'})
    # Collect every cell of the table.
    td_elements = table_active_stocks.find_all('td')
    # Each ticker is rendered as an <a class="wsod_symbol"> inside a cell;
    # cells without a symbol link yield None here.
    a_elements = [
        td.find('a', attrs={'class': 'wsod_symbol'})
        for td in td_elements
    ]
    # Keep only the cells that actually contained a symbol link.
    return [a_element.text for a_element in a_elements if a_element]
def scrap_stock_details(ticker_symbol: str):
    """Scrape open price, average volume and P/E ratio for one ticker.

    Args:
        ticker_symbol: the stock ticker to look up on Yahoo Finance.

    Returns:
        tuple: (ticker_symbol, open_price, avg_volume, pe_ratio) on success,
        where pe_ratio may be None when Yahoo reports 'N/A'.
        False: when the page layout did not contain the expected fields.

    Raises:
        Exception: if the HTTP response status is not 2xx.
    """
    print(f'Scrapping ticker symbol {ticker_symbol} details')
    url = f'https://finance.yahoo.com/quote/{ticker_symbol}?p={ticker_symbol}'
    # Download the HTML content of the quote page.
    response = requests.get(url)
    # Any non-2xx status (e.g. 404) means the download failed.
    if response.status_code // 100 != 2:
        raise Exception("Error downloading html content")
    # Parse the HTML content with BeautifulSoup.
    soup = bs(response.content, 'html.parser')
    try:
        # The open price is inside a <td data-test="OPEN-value"><span>.
        open_price = float(soup.find('td', attrs={'data-test': 'OPEN-value'}).span.text)
        # The 3-month average volume uses thousands separators, e.g. "1,234,567".
        avg_volume_str = soup.find('td', attrs={'data-test': 'AVERAGE_VOLUME_3MONTH-value'}).span.text
        # Strip the commas so the text parses as an int.
        avg_volume = int(avg_volume_str.replace(',', ''))
        # The P/E ratio may legitimately be reported as 'N/A'.
        pe_ratio_str = soup.find('td', attrs={'data-test': 'PE_RATIO-value'}).span.text
        pe_ratio = float(pe_ratio_str) if pe_ratio_str != 'N/A' else None
        return ticker_symbol, open_price, avg_volume, pe_ratio
    except (AttributeError, ValueError):
        # AttributeError: a find() returned None (page layout changed);
        # ValueError: a numeric field did not parse. Narrowed from a bare
        # except so real bugs (e.g. KeyboardInterrupt) are not swallowed.
        return False
def save_data_db(db_name: str, data: list):
    """Persist scraped stock rows into a SQLite database.

    Args:
        db_name: path of the SQLite database file to create/open.
        data: list of (ticker, open_price, avg_volume, pe_ratio) tuples;
            falsy entries (failed scrapes) are skipped.
    """
    # Establish connection to the SQLite database.
    connection = sqlite3.connect(db_name)
    try:
        cursor = connection.cursor()
        # Data definition language to create the table on first run.
        ddl_sql = """
        CREATE TABLE IF NOT EXISTS Stocks (
            Ticker text,
            OpenPrice number,
            AvgVolume int,
            PERatio number
        )
        """
        cursor.execute(ddl_sql)
        connection.commit()
        for row in data:
            if not row:
                # Failed scrape -- nothing to insert.
                continue
            # Missing metrics become None so SQLite stores a real NULL
            # (the old code interpolated the string 'null' into the SQL).
            values = tuple(value if value else None for value in row)
            # Parameterized query: prevents SQL injection through scraped
            # text and handles quoting/NULLs correctly.
            cursor.execute(
                "INSERT INTO Stocks (Ticker, OpenPrice, AvgVolume, PERatio) "
                "VALUES (?, ?, ?, ?)",
                values,
            )
        # Save all inserted rows to the database.
        connection.commit()
    finally:
        # Always release the connection, even if an insert fails.
        connection.close()
def save_data_txt(filename: str, data: list):
    """Write scraped stock rows to a CSV-style text file.

    Args:
        filename: path of the text file to (over)write.
        data: list of (ticker, open_price, avg_volume, pe_ratio) tuples;
            falsy entries (failed scrapes) are skipped.
    """
    with open(filename, 'w') as f:
        # Write the header exactly once, even when data is empty
        # (the old flag-based code skipped it for an empty list).
        f.write('Ticker,OpenPrice,AvgVolume,PERatio\n')
        for stock_details in data:
            if not stock_details:
                # Failed scrape: skip it -- iterating over False would
                # raise a TypeError.
                continue
            # One symbol per line; missing values become empty CSV fields.
            line = ','.join(str(value) if value else '' for value in stock_details)
            f.write(line + '\n')
def main():
    """Scrape the most-active tickers, fetch their metrics and persist them.

    Writes the results both to a SQLite database and to a CSV-style text
    file. Failed per-ticker scrapes are carried through as False and
    skipped by the writers.
    """
    print('Scraping ticker symbols')
    ticker_symbols = scrap_ticker_symbols()
    # Fetch the detail metrics for every symbol (False marks a failure).
    data = [scrap_stock_details(symbol) for symbol in ticker_symbols]
    db_name = 'StocksDatabse.db'
    print(f'\nWriting data to database {db_name}')
    save_data_db(db_name, data)
    csv_filename = 'stocks.txt'
    print(f'\nSaving ticker symbols to file {csv_filename}')
    save_data_txt(csv_filename, data)
    print('Scraping done')


if __name__ == "__main__":
    main()
# Calculating jiffies and server latency: monitor ping round-trip times and log spikes.
import os
import time
import datetime
import logging
import pexpect
# Program configurations.
# Path of the log file that latency spikes are written to.
log_file = "latency.log"
# Destination host to ping.
server = "www.lfedge.org"
# Seconds between consecutive pings (passed to `ping -i`).
interval = 1
# Round-trip times above this value (milliseconds) get logged.
threshold = 250
# Change it to false to save only to log file.
print_stdout = True
# Route all log records to log_file with level and timestamp prefixes.
logging.basicConfig(filename=log_file, level=logging.DEBUG, format='%(levelname)s:%(asctime)s:%(message)s')
# Module-level logger used by the monitoring loop below.
log = logging.getLogger(__name__)
if __name__ == "__main__":
    # Record the run parameters both in the log and (optionally) on stdout.
    program_banner = f"Ping Interval: {interval}, Destination: {server} Threshold to Log (msec): {threshold}"
    log.info(program_banner)
    if print_stdout:
        print(program_banner)
    # Spawn a continuous ping and consume its output line by line.
    command = f"ping -i {interval} {server}"
    child = pexpect.spawn(command)
    child.timeout = 1200
    first_line = True
    while True:
        line = child.readline()
        if not line:
            # EOF: the ping process terminated.
            break
        if line.startswith(b"ping: unknown host"):
            if print_stdout:
                print("Unknown host: " + server)
            log.info("Unknown host: " + server)
            break
        if first_line:
            # Skip the initial "PING host ..." banner, which has no time= field.
            first_line = False
            continue
        start = line.find(b"time=")
        if start == -1:
            # Not a reply line (error or summary output) -- skip instead of
            # crashing on a bogus slice.
            continue
        # Extract the round-trip time, e.g. b"... time=23.4 ms".
        ping_time = float(line[start + 5 : line.find(b" ms")])
        line = time.strftime("%m/%d/%Y %H:%M:%S") + ": " + str(ping_time) + " ms"
        if print_stdout:
            print(line)
        if ping_time > threshold:
            # BUG FIX: the original called log.info(log_file, line + "\n"),
            # which logs the filename as the message and never records the
            # latency line. Log the formatted line itself.
            log.info(line)