![Pythonクローリング&スクレイピング[増補改訂版] -データ収集・解析のための実践開発ガイド](https://m.media-amazon.com/images/I/41VimzqqXAL._SL160_.jpg)
Pythonクローリング&スクレイピング[増補改訂版] -データ収集・解析のための実践開発ガイド
- Author: 加藤 耕太
- Release date: 2019/08/10
- Media: Paperback (softcover)
Table of Contents
- Table of Contents
- Background and Purpose
- References
- Python Programs
- How to Use the Requests Library
- response.text or response.content?
- Ignoring SSL Certificate Warning Errors
- Saving CSVs in Shift-JIS Encoding
Background and Purpose
I had wanted to try analyzing table tennis data for a while, so I scraped T League match data from the official website. I wrote the scraping program in Python, and along the way I ran into several things I didn't understand and had to look up, so I am noting them down here.
References
The scraping program is based on the one @goe425 introduces in this article. Its outward behavior is almost the same as the original, but as a learning exercise I retyped it while changing the internal design bit by bit.
Python Programs
Here are the Python programs I actually wrote.
Head-to-head results between teams and between players, plus the point-by-point progression of each game
```python
import requests
from bs4 import BeautifulSoup
import re
import csv


class TleaguqGameScraping:
    def __init__(self, season_year):
        self.season_year = season_year
        self.base_url = "https://tleague.jp/match/"
        self.is_end = False
        self.init_year_list()

    def init_year_list(self):
        if self.season_year == "2018":
            self.month_list = ["201810", "201811", "201812", "201901", "201902", "201903"]
        else:
            self.month_list = ["201908", "201909", "201910", "201911", "201912", "202001", "202002", "202003"]

    def get_link(self):
        self.link_list = []
        for month in self.month_list:
            url = self.base_url + "?season=" + self.season_year + "&month=" + month + "&mw="
            response = requests.get(url)
            soup = BeautifulSoup(response.content, "lxml")
            match_list = soup.find(class_="ui-match-table")
            # men's match
            for mm in match_list.find_all(class_="match-men"):
                for inner in mm.find_all(class_="inner"):
                    link = inner.find("a").get("href")
                    self.link_list.append(link[7:])
            # women's match
            for wm in match_list.find_all(class_="match-women"):
                for inner in wm.find_all(class_="inner"):
                    link = inner.find("a").get("href")
                    self.link_list.append(link[7:])

    def create_match_data_table(self, soup, match_id, date, sex):
        match = soup.find(class_="match-info")
        home = match.find(class_="home").get_text()
        away = match.find(class_="away").get_text()
        points = match.find_all(class_="cell-score")
        home_points_txt = points[0].get_text()
        away_points_txt = points[1].get_text()
        item_class = match.find(class_="item-spec").find_all("li")
        visitors = item_class[2].get_text()
        visitors_txt = re.sub(r"[^0-9]", "", visitors)
        if not home_points_txt:
            self.is_end = True
            return
        self.match_table.append([match_id, date, sex, home, away,
                                 int(home_points_txt), int(away_points_txt), int(visitors_txt)])

    def create_game_data_table(self, soup, match_id, date, sex):
        match = soup.find(class_="cell-game")
        home_count = 0
        away_count = 0
        game = []
        game_list = []
        for idx, col in enumerate(match.find_all(class_="col")):
            # game count
            if idx % 3 == 1:
                home_count = col.get_text()[0]
                away_count = col.get_text()[2]
                continue
            a_list = col.find_all("a")
            if len(a_list) >= 2:
                for a in a_list:
                    game.append(a.get_text().replace("\n", ""))
            else:
                for a in a_list:
                    game.append(a.get_text().replace("\n", ""))
                game.append(None)
            # reset
            if idx % 3 == 2:
                game.append(home_count)
                game.append(away_count)
                game_list.append(game)
                game = []
        for game_idx, gm in enumerate(game_list):
            if gm[1] != None:
                self.game_table.append([match_id, game_idx, date, sex,
                                        gm[0], gm[1], gm[2], gm[3], int(gm[4]), int(gm[5])])
            else:
                self.game_table.append([match_id, game_idx, date, sex,
                                        gm[0], None, gm[2], None, int(gm[4]), int(gm[5])])

    def convert_array(self, point, timeout):
        num_point = []
        for p_idx, p in enumerate(point):
            if re.match("[0-9]", p):
                num_point.append(p)
        rally_count = len(num_point) // 2
        ret_timeout = [0] * rally_count
        count = 0
        for t_idx, t in enumerate(timeout):
            if t > 0:
                if t_idx <= rally_count + count:
                    ret_timeout[t_idx] = 1
                else:
                    ret_timeout[t_idx - rally_count - count] = 2
                count += 1
        ret = []
        home = num_point[:rally_count]
        away = num_point[rally_count:]
        for r_idx in range(rally_count):
            ret.append(home[r_idx] + " " + away[r_idx])
        return ret, ret_timeout

    def create_point_data_table(self, soup, match_id, date, sex):
        match_point, match_serve, match_timeout = [], [], []
        for mg in soup.find_all("div", class_="match-game"):
            game_point, game_serve, game_timeout = [], [], []
            for wt_idx, wt in enumerate(mg.find_all(class_="wrap-table")):
                point, serve, timeout = [], [], []
                for td_idx, td in enumerate(wt.find_all("td")):
                    td_txt = td.get_text()
                    class_list = td.get("class")
                    if not td_txt:
                        continue
                    if class_list != None and "timeout" in class_list:
                        timeout.append(1)
                        continue
                    if class_list != None and "serve" in class_list:
                        serve.append(1)
                    else:
                        serve.append(2)
                    point.append(td_txt)
                    timeout.append(0)
                game_serve.append(serve[:len(serve)])
                game_tmp, game_timeout_tmp = self.convert_array(point, timeout)
                game_point.append(game_tmp)
                game_timeout.append(game_timeout_tmp)
            match_point.append(game_point)
            match_timeout.append(game_timeout)
            match_serve.append(game_serve)
        for mp_idx, mp in enumerate(match_point):
            for gp_idx, gp in enumerate(mp):
                for p_idx, p in enumerate(gp):
                    home_point, away_point = p.split()
                    self.point_table.append([match_id, date, sex, mp_idx, gp_idx, p_idx,
                                             int(home_point), int(away_point),
                                             match_serve[mp_idx][gp_idx][p_idx],
                                             match_timeout[mp_idx][gp_idx][p_idx]])

    def create_data_table(self):
        self.match_table = []
        self.game_table = []
        self.point_table = []
        for link in self.link_list:
            url = self.base_url + link
            print(url)
            response = requests.get(url, verify=False)
            soup = BeautifulSoup(response.content, "lxml")
            if link[-3] == "m":
                sex = 0
            else:
                sex = 1
            date = link[-11:-3]
            match_id = link[9:]
            self.create_match_data_table(soup, match_id, date, sex)
            if self.is_end == True:
                break
            self.create_game_data_table(soup, match_id, date, sex)
            self.create_point_data_table(soup, match_id, date, sex)

    def save_match_table_as_csv(self):
        header = ["MatchID", "Date", "Sex", "Home", "Away", "Home Points", "Away Points", "Visitors"]
        with open('Match_table_{0}.csv'.format(self.season_year), 'w', encoding='Shift-jis', errors='ignore') as f:
            writer = csv.writer(f, lineterminator='\n')
            writer.writerow(header)
            writer.writerows(self.match_table)

    def save_game_table_as_csv(self):
        header = ["MatchID", "GameID", "Date", "Sex", "Home Player1", "Home Player2",
                  "Away Player1", "Away Player2", "Home Points", "Away Points"]
        with open('Game_table_{0}.csv'.format(self.season_year), 'w', encoding='Shift-jis', errors='ignore') as f:
            writer = csv.writer(f, lineterminator='\n')
            writer.writerow(header)
            writer.writerows(self.game_table)

    def save_point_table_as_csv(self):
        header = ["MatchID", "Date", "Sex", "Game ID", "Set ID", "Point ID",
                  "Home Point", "Away Point", "Serve", "Timeout"]
        with open('Point_table_{0}.csv'.format(self.season_year), 'w', encoding='Shift-jis', errors='ignore') as f:
            writer = csv.writer(f, lineterminator='\n')
            writer.writerow(header)
            writer.writerows(self.point_table)


def main():
    # set season year
    season_year = input("Input season year(2018 or 2019): ")
    # initialize
    tgs = TleaguqGameScraping(season_year)
    # scraping
    tgs.get_link()
    tgs.create_data_table()
    # save
    tgs.save_match_table_as_csv()
    tgs.save_game_table_as_csv()
    tgs.save_point_table_as_csv()


if __name__ == "__main__":
    main()
```
Season-long stats for each player
```python
import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import numpy as np


class TleagueStatsScraping:
    def __init__(self, season_year):
        self.season_year = season_year
        self.base_url = "https://tleague.jp"

    def get_link(self):
        self.player_list_url = []
        for sex in ("m", "w"):
            url = self.base_url + "/standings/player/" + "?season=" + self.season_year + "&mw=" + sex
            response = requests.get(url)
            soup = BeautifulSoup(response.content, "lxml")
            match_list = soup.find(class_="ui-standings-table")
            for a in match_list.find_all("a"):
                link = a.get("href")
                if "/stats/" in link:
                    self.player_list_url.append(link)

    def get_player_name(self, soup):
        head_stats = soup.find(class_="head-stats")
        name_jpn_eng = head_stats.find(class_="reset").get_text()
        name_eng = re.sub("[^a-zA-Z\s]", "", name_jpn_eng)
        name_jpg = name_jpn_eng.split(name_eng.strip()[0])[0]
        return name_jpg, name_eng.strip()

    def get_team_name(self, soup):
        spec_stats = soup.find(class_="spec-stats")
        name = spec_stats.find("b").get_text()
        return name

    def get_stats(self, soup):
        ui_total_stats_class = soup.find(class_="ui-total-stats")
        ui_total_stats_li = ui_total_stats_class.find_all("li")
        ret = []
        for stats in ui_total_stats_li:
            stats_txt = stats.find("div").get_text()
            if "%" in stats_txt:
                stats_num = int(stats_txt.split("%")[0])
                ret.append(stats_num)
            elif "/" not in stats_txt:
                stats_num = int(stats_txt)
                ret.append(stats_num)
            else:
                ret.append(stats_txt)
        ui_sub_stats_class = soup.find(class_="ui-sub-stats")
        ui_sub_stats_b = ui_sub_stats_class.find_all("b")
        for stats in ui_sub_stats_b:
            stats_txt = stats.get_text()
            if "Min" in stats_txt:
                stats_num = float(stats_txt.split(" Min")[0])
            else:
                stats_num = int(stats_txt)
            ret.append(stats_num)
        return ret

    def get_charts_labels_text(self, script):
        script = re.sub("\s", "", script)
        labels = re.search("labels:\[.*],datasets", script)
        labels = labels.group()[:-8]
        labels = re.sub("5連続ポイント", "", labels)
        data_str = re.findall("[0-9\.]{1,10000}", labels)
        data_float = []
        for data in data_str:
            if data[0] == ".":
                data = "0" + data
            data_float.append(float(data))
        return data_float

    def get_charts_data_text(self, script):
        script = re.sub("\s", "", script)
        script = re.search("data:\{.*\},", script)
        data = script.group()
        data = re.findall("data:\[.*?\],", data)
        ret = []
        for idx in range(len(data)):
            data[idx] = data[idx][6:-2]
        for d in data:
            tmp = d.split(",")
            ret.append([int(re.search("\d{1,10000}", t).group()) if t != '' else 0 for t in tmp])
        return ret

    def get_player_data(self, url):
        response = requests.get(url)
        soup = BeautifulSoup(response.content, "lxml")
        item_charts = soup.find_all(class_="item-chart")
        player_name_jpn, player_name_eng = self.get_player_name(soup)
        team_name = self.get_team_name(soup)
        stats_data = self.get_stats(soup)
        charts_data = []
        for idx, chart in enumerate(item_charts):
            if idx > 5:
                break
            script = chart.find("script").string
            if idx == 3:
                charts_data.append(self.get_charts_labels_text(script))
            else:
                charts_data.append(self.get_charts_data_text(script))
        return player_name_jpn, player_name_eng, team_name, stats_data, charts_data

    def calculate_prior_probability(self, charts_data):
        srv_win_lose_count = 0
        srv_win_lose_count_list = []
        rcv_win_lose_count = 0
        rcv_win_lose_count_list = []
        srv_win_count = 0
        srv_win_count_list = []
        rcv_win_count = 0
        rcv_win_count_list = []
        srv_lose_count = 0
        srv_lose_count_list = []
        rcv_lose_count = 0
        rcv_lose_count_list = []
        for i in [4, 5]:
            for j in [0, 1]:
                for k in range(6):
                    # serve
                    if i == 4:
                        # win + lose
                        srv_win_lose_count += charts_data[i][j][k]
                        srv_win_lose_count_list.append(charts_data[i][j][k])
                        # win
                        if j == 0:
                            srv_win_count += charts_data[i][j][k]
                            srv_win_count_list.append(charts_data[i][j][k])
                        # lose
                        else:
                            srv_lose_count += charts_data[i][j][k]
                            srv_lose_count_list.append(charts_data[i][j][k])
                    # receive
                    else:
                        # win + lose
                        rcv_win_lose_count += charts_data[i][j][k]
                        rcv_win_lose_count_list.append(charts_data[i][j][k])
                        # win
                        if j == 0:
                            rcv_win_count += charts_data[i][j][k]
                            rcv_win_count_list.append(charts_data[i][j][k])
                        # lose
                        else:
                            rcv_lose_count += charts_data[i][j][k]
                            rcv_lose_count_list.append(charts_data[i][j][k])
        # tactics prior probability
        srv_tac_pri_prob = np.array(srv_win_lose_count_list) / srv_win_lose_count
        rcv_tac_pri_prob = np.array(rcv_win_lose_count_list) / rcv_win_lose_count
        # win likelihood by each tactics
        srv_win_likelihood = np.array(srv_win_count_list) / srv_win_count
        rcv_win_likelihood = np.array(rcv_win_count_list) / rcv_win_count
        # lose likelihood by each tactics
        srv_lose_likelihood = np.array(srv_lose_count_list) / srv_lose_count
        rcv_lose_likelihood = np.array(rcv_lose_count_list) / rcv_lose_count
        return srv_tac_pri_prob, rcv_tac_pri_prob, srv_win_likelihood, \
            rcv_win_likelihood, srv_lose_likelihood, rcv_lose_likelihood

    def create_data_table(self):
        self.player_stats_table = []
        for link in self.player_list_url:
            url = self.base_url + link
            print(url)
            player_name_jpn, player_name_eng, team_name, stats_data, charts_data = self.get_player_data(url)
            srv_tac_pri_prob, rcv_tac_pri_prob, srv_win_likelihood, \
                rcv_win_likelihood, srv_lose_likelihood, rcv_lose_likelihood = self.calculate_prior_probability(charts_data)
            self.player_stats_table.append([
                self.season_year, player_name_jpn, team_name,
                stats_data[1], stats_data[2], stats_data[3], stats_data[4],
                stats_data[6], stats_data[7], stats_data[8], stats_data[9],
                stats_data[10], stats_data[11], stats_data[12], stats_data[13], stats_data[14],
                charts_data[0][0][4], charts_data[0][0][3], charts_data[0][0][2], charts_data[0][0][1], charts_data[0][0][0],
                charts_data[0][1][4], charts_data[0][1][3], charts_data[0][1][2], charts_data[0][1][1], charts_data[0][1][0],
                charts_data[1][0][0], charts_data[1][0][1], charts_data[1][0][2], charts_data[1][0][3],
                charts_data[2][0][0], charts_data[2][0][1], charts_data[2][0][2], charts_data[2][0][3],
                charts_data[3][0], charts_data[3][1], charts_data[3][2], charts_data[3][3], charts_data[3][4], charts_data[3][5],
                charts_data[4][0][0], charts_data[4][0][1], charts_data[4][0][2], charts_data[4][0][3], charts_data[4][0][4], charts_data[4][0][5],
                charts_data[4][1][0], charts_data[4][1][1], charts_data[4][1][2], charts_data[4][1][3], charts_data[4][1][4], charts_data[4][1][5],
                charts_data[5][0][0], charts_data[5][0][1], charts_data[5][0][2], charts_data[5][0][3], charts_data[5][0][4], charts_data[5][0][5],
                charts_data[5][1][0], charts_data[5][1][1], charts_data[5][1][2], charts_data[5][1][3], charts_data[5][1][4], charts_data[5][1][5],
                srv_tac_pri_prob[0] + srv_tac_pri_prob[6], srv_tac_pri_prob[1] + srv_tac_pri_prob[7],
                srv_tac_pri_prob[2] + srv_tac_pri_prob[8], srv_tac_pri_prob[3] + srv_tac_pri_prob[9],
                srv_tac_pri_prob[4] + srv_tac_pri_prob[10], srv_tac_pri_prob[5] + srv_tac_pri_prob[11],
                rcv_tac_pri_prob[0] + rcv_tac_pri_prob[6], rcv_tac_pri_prob[1] + rcv_tac_pri_prob[7],
                rcv_tac_pri_prob[2] + rcv_tac_pri_prob[8], rcv_tac_pri_prob[3] + rcv_tac_pri_prob[9],
                rcv_tac_pri_prob[4] + rcv_tac_pri_prob[10], rcv_tac_pri_prob[5] + rcv_tac_pri_prob[11],
                srv_win_likelihood[0], srv_win_likelihood[1], srv_win_likelihood[2],
                srv_win_likelihood[3], srv_win_likelihood[4], srv_win_likelihood[5],
                rcv_win_likelihood[0], rcv_win_likelihood[1], rcv_win_likelihood[2],
                rcv_win_likelihood[3], rcv_win_likelihood[4], rcv_win_likelihood[5],
                srv_lose_likelihood[0], srv_lose_likelihood[1], srv_lose_likelihood[2],
                srv_lose_likelihood[3], srv_lose_likelihood[4], srv_lose_likelihood[5],
                rcv_lose_likelihood[0], rcv_lose_likelihood[1], rcv_lose_likelihood[2],
                rcv_lose_likelihood[3], rcv_lose_likelihood[4], rcv_lose_likelihood[5]
            ])

    def save_stats_table_as_csv(self):
        header = [
            "Season", "PlayerName", "TeamName", "MatchNum", "VMatchNum", "WinMatchNum", "LoseMatchNum",
            "WinGameNum", "LoseGameNum", "ShutoutRate[%]", "WinPointNum", "LosePointNum",
            "MaxContinuousGameGetNum", "GameTime", "ReverseWinNum", "ReverseLoseNum",
            "1stGameWinNum", "2ndGameWinNum", "3rdGameWinNum", "4thGameWinNum", "5thGameWinNum",
            "1stGameLoseNum", "2ndGameLoseNum", "3rdGameLoseNum", "4thGameLoseNum", "5thGameLoseNum",
            "WinGameDiff2", "WinGameDiff3", "WinGameDiff4", "WinGameDiff5",
            "LoseGameDiff2", "LoseGameDiff3", "LoseGameDiff4", "LoseGameDiff5",
            "AveServiceAceNum", "AveReceiveAceNum", "AveRallyNum",
            "ServicePointRate", "ReceivePointRate", "5ContinuousPointRate",
            "RallyCount1Win", "RallyCount3Win", "RallyCount5Win", "RallyCount7Win", "RallyCount9Win", "RallyCount11Win",
            "RallyCount1Lose", "RallyCount3Lose", "RallyCount5Lose", "RallyCount7Lose", "RallyCount9Lose", "RallyCount11Lose",
            "RallyCount2Win", "RallyCount4Win", "RallyCount6Win", "RallyCount8Win", "RallyCount10Win", "RallyCount12Win",
            "RallyCount2Lose", "RallyCount4Lose", "RallyCount6Lose", "RallyCount8Lose", "RallyCount10Lose", "RallyCount12Lose",
            "SrvPriorProbability1", "SrvPriorProbability3", "SrvPriorProbability5", "SrvPriorProbability7", "SrvPriorProbability9", "SrvPriorProbability11",
            "RcvPriorProbability2", "RcvPriorProbability4", "RcvPriorProbability6", "RcvPriorProbability8", "RcvPriorProbability10", "RcvPriorProbability12",
            "SrvWinLikelihood1", "SrvWinLikelihood3", "SrvWinLikelihood5", "SrvWinLikelihood7", "SrvWinLikelihood9", "SrvWinLikelihood11",
            "RcvWinLikelihood2", "RcvWinLikelihood4", "RcvWinLikelihood6", "RcvWinLikelihood8", "RcvWinLikelihood10", "RcvWinLikelihood12",
            "SrvLoseLikelihood1", "SrvLoseLikelihood3", "SrvLoseLikelihood5", "SrvLoseLikelihood7", "SrvLoseLikelihood9", "SrvLoseLikelihood11",
            "RcvLoseLikelihood2", "RcvLoseLikelihood4", "RcvLoseLikelihood6", "RcvLoseLikelihood8", "RcvLoseLikelihood10", "RcvLoseLikelihood12"
        ]
        with open('Stats_table_{0}.csv'.format(self.season_year), 'w', encoding='Shift-jis', errors='ignore') as f:
            writer = csv.writer(f, lineterminator='\n')
            writer.writerow(header)
            writer.writerows(self.player_stats_table)


def main():
    # set season year
    season_year = input("Input season year(2018 or 2019): ")
    # initialize
    tss = TleagueStatsScraping(season_year)
    # scraping
    tss.get_link()
    tss.create_data_table()
    # save
    tss.save_stats_table_as_csv()


if __name__ == "__main__":
    main()
```
How to Use the Requests Library
This article was a helpful reference.
First, pass the URL you want to access as an argument to Requests' get method to fetch the page's HTML.
```python
url = self.base_url + "?season=" + self.season_year + "&month=" + month + "&mw="
response = requests.get(url)
```
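Incidentally, Requests can also assemble the query string for you through its params argument instead of the manual string concatenation above. A minimal sketch of that alternative (the parameter names season, month, and mw come from the URL built above; the literal values are placeholders):

```python
import requests

# Let Requests build "?season=2018&month=201810&mw=" from a dict
params = {"season": "2018", "month": "201810", "mw": ""}
response = requests.get("https://tleague.jp/match/", params=params)
print(response.url)  # the final URL with the encoded query string
```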
response.text or response.content?
With text you get a decoded Unicode string, and with content you get the raw bytes. Either can be passed to BeautifulSoup, but content is said to be less prone to garbled characters, since BeautifulSoup then detects the encoding itself rather than relying on the encoding Requests guessed.
```python
soup = BeautifulSoup(response.content, "lxml")
```
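For reference, a minimal sketch contrasting the two options (assuming the same response object as above); both yield a parsed tree and differ only in who does the decoding:

```python
from bs4 import BeautifulSoup

# response.text: already decoded by Requests, using its guessed encoding
soup_from_text = BeautifulSoup(response.text, "lxml")

# response.content: raw bytes, so BeautifulSoup detects the encoding itself
soup_from_content = BeautifulSoup(response.content, "lxml")
```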
Ignoring SSL Certificate Warning Errors
When fetching HTML with Requests' get method, some sites raise an SSLError and the program stops. If you know the site in question is trustworthy, setting the get method's verify option to False skips certificate verification, so the request goes through with only a warning and processing continues.
```python
response = requests.get(url, verify=False)
```
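Note that with verify=False, urllib3 still emits an InsecureRequestWarning on every request. If that noise is unwanted, it can be silenced explicitly; a small sketch (the original scripts do not do this, and url is assumed to be defined as in the snippets above):

```python
import urllib3
import requests

# Suppress the InsecureRequestWarning that verify=False triggers
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

response = requests.get(url, verify=False)  # url defined elsewhere
```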
Saving CSVs in Shift-JIS Encoding
```python
def save_match_table_as_csv(self):
    header = ["MatchID", "Date", "Sex", "Home", "Away", "Home Points", "Away Points", "Visitors"]
    with open('Match_table_{0}.csv'.format(self.season_year), 'w', encoding='Shift-jis', errors='ignore') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(header)
        writer.writerows(self.match_table)
```
First, the with open statement creates the CSV file object; if the encoding is not set to Shift-JIS here, the team and player names end up garbled.
https://attracter.tokyo/tech/python37/
Also, if you then try to write a character that has no Shift-JIS mapping, a UnicodeEncodeError is raised. As described in this article, you can skip such characters by setting open's errors option to 'ignore'.
hytmachineworks.hatenablog.com
There is also another approach using the codecs module, described in this article; it seems handy to know (a sketch follows below).
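A minimal sketch of what that codecs-based variant might look like (codecs.open takes the error handler as its fourth argument; the file name and rows here are placeholders, not data from the original scripts):

```python
import codecs
import csv

rows = [["MatchID", "Home", "Away"], ["001", "チームA", "チームB"]]

# codecs.open(filename, mode, encoding, errors) replaces the plain open() call
with codecs.open("Match_table_sample.csv", "w", "shift_jis", "ignore") as f:
    writer = csv.writer(f, lineterminator="\n")
    writer.writerows(rows)
```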