menu事業部 フロントエンドエンジニアの坂井田です!
私は現在リモートワークをしていますが、仕事中は会話のためにイヤホンをしていることもあり来客に気づきにくいという問題があります。
そこで、今回は今持っているものだけを使って来客時に自動通知するシステムを作ってみたので、そのHowToをご紹介します!
使用した機材
- Raspberry Pi 4 Model B 8GB
- アイ・オー・データのIPカメラ「Qwatch TS-NS110W」
IPカメラはとあるコンテストの副賞でいただいたものを使いました。
一応このカメラ自体には動作検知機能がついています。
しかし通知先のプラットフォームが限定的でいまいち充実していないので、普段使っているLINEで通知してくれたら便利だな...と思って作りました!!
アルゴリズム
このカメラにはブラウザからアクセスできる管理ページがあり、そこにリアルタイムで画像が表示されます。
そのため、Pythonでこの画像を随時取得して動作検知をするようにします!
なぜPythonなのかというと、OpenCVやNumPyなど画像解析に便利なパッケージが多く用意されており、スクレイピング・解析・APIリクエストを網羅できるからです。
イメージとしてはこんな感じです↓↓
順番 | イメージ | やること |
---|---|---|
1 | 管理ページの画像を取得する | |
2 | 一時フォルダーに画像を保存する | |
3 | 3枚溜まったらそれらを比較し、動きがある箇所があれば各種通知APIを叩く | |
4 | 3枚ある状態の場合、一番古い画像を1枚削除する | |
5 | 1秒おきに繰り返す |
1. LINEで通知する
Pythonのプログラム
実際に使用したコードをこちらに載せておきます。
ここをクリックしてコードを表示
import base64 import datetime import glob import os import shutil import time import urllib.error import urllib.request import cv2 import numpy as np import requests SAVE_DIR_PATH = os.path.dirname(__file__) + os.sep + "image" SEND_DIR_PATH = os.path.dirname(__file__) + os.sep + "send" USER = "ユーザー名" PASSWORD = "パスワード" IMAGE_URL = "http://カメラのIPアドレス/dms?nowprofileid=1" LINE_ACCESS_TOKEN = "LINEアクセストークン" # 不審物判定のしきい値 min_moment = 1000 # 日時を取得 def get_date_and_time(): now = datetime.datetime.now() text = str(now.strftime('%Y/%m/%d %H:%M:%S')) return text # ファイルをダウンロードする def download_file(url, user, password, dst_path): try: basic_user_and_pasword = base64.b64encode( "{}:{}".format(user, password).encode("utf-8") ) # Basic認証付きのGETリクエストを作成する request = urllib.request.Request( url, headers={ "Authorization": "Basic " + basic_user_and_pasword.decode("utf-8") }, ) with urllib.request.urlopen(request) as web_file: data = web_file.read() with open(dst_path, mode="wb") as local_file: local_file.write(data) except urllib.error.URLError as e: print(e) # 日本語パスを含む画像を開く def imread(filename, flags=cv2.IMREAD_COLOR, dtype=np.uint8): try: n = np.fromfile(filename, dtype) img = cv2.imdecode(n, flags) return img except Exception as e: print(e) return None # フレーム差分の計算 def frame_sub(img1, img2, img3, th): # フレームの絶対差分 diff1 = cv2.absdiff(img1, img2) diff2 = cv2.absdiff(img2, img3) # 2つの差分画像の論理積 diff = cv2.bitwise_and(diff1, diff2) # 二値化処理 diff[diff < th] = 0 diff[diff >= th] = 255 # メディアンフィルタ処理(ゴマ塩ノイズ除去) mask = cv2.medianBlur(diff, 5) return mask # 送信用ファイルを削除 def send_file_delete(): sendFiles = glob.glob(SEND_DIR_PATH + os.sep + "*") while len(sendFiles) != 0: os.remove(sendFiles[0]) del sendFiles[0] # LINEメッセージ送信用 class LINENotifyBot: API_URL = 'https://notify-api.line.me/api/notify' def __init__(self, access_token): self.__headers = {'Authorization': 'Bearer ' + access_token} def send( self, message, image=None, sticker_package_id=None, sticker_id=None, ): payload = { 'message': message, 'stickerPackageId': sticker_package_id, 'stickerId': sticker_id, } files = {} if image != None: files = {'imageFile': open(image, 'rb')} r = requests.post( LINENotifyBot.API_URL, headers=self.__headers, data=payload, files=files, ) def work(): dateStr = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") dst_path = SAVE_DIR_PATH + os.sep + "image_" + dateStr + ".png" download_file(IMAGE_URL, USER, PASSWORD, dst_path) files = glob.glob(SAVE_DIR_PATH + os.sep + "*") files.sort(key=os.path.getmtime) # 更新日時の昇順で並び替え # 画像が3枚未満の場合→これ以降の処理をスキップ if len(files) < 3: return while len(files) > 3: os.remove(files[0]) del files[0] # 画像読み込み cvImg = [] cvFrame = [] for i in range(len(files)): cvImg.append(imread(files[i])) # フレームをグレースケール変換 cvFrame.append(cv2.cvtColor(cvImg[i], cv2.COLOR_RGB2GRAY)) # フレーム間差分を計算 mask = frame_sub(cvFrame[0], cvFrame[1], cvFrame[2], th=10) # 白色領域のピクセル数を算出 moment = cv2.countNonZero(mask) # 白色領域のピクセル数が一定以上なら不審物有りと判定 sendFiles = glob.glob(SEND_DIR_PATH + os.sep + "*") findCount = len(sendFiles)+1 if moment > min_moment: print(get_date_and_time()+" 検知:", findCount) shutil.copyfile(files[len(files)-1], SEND_DIR_PATH+os.sep+str(findCount)+".png") else: send_file_delete() findCount = 0 if findCount == 3: now = get_date_and_time() print(now+" 連続3回検知しました") # LINE Notifyで送信 bot = LINENotifyBot(access_token=LINE_ACCESS_TOKEN) bot.send( message="\n動作を検知しました\n日時: "+now, image=sendFiles[1] ) send_file_delete() time.sleep(60) print("処理を開始します") if os.path.exists(SAVE_DIR_PATH) == False: os.mkdir(SAVE_DIR_PATH) if os.path.exists(SEND_DIR_PATH) == False: os.mkdir(SEND_DIR_PATH) send_file_delete() while True: work() time.sleep(1)
解説
LINEでの通知について
LINE Notify
という、無料で使える公式APIを使いました。
LINEアカウントをお持ちの場合はとても簡単に使用することができます!
画像 | 手順 |
---|---|
LINE Notifyにアクセスしてログインし、右上のメニューからマイページを開く | |
ページを下にスクロールして「トークンを発行する」ボタンをクリックする | |
トークン名を入力し、送信先を選んで発行するボタンを押す。 (送信されるメッセージは「[トークン名] 内容」という形になります。) 次の画面でトークンが生成されるので、コピーしてコードに貼り付ける。 |
こんなに便利なAPIが無料で使えるのは、本当に有り難い限りです。。
画像ダウンロードについて
使用したカメラの場合、IPアドレスを入力してカメラの管理画面にアクセスする場合はBasic認証でユーザー名・パスワードを入力する必要がありました。
そのため、こちらのサイトを参考にしてリクエスト時のヘッダーにこれらの情報を含めるようにしました。
URLに直接埋め込んでもアクセスできます。(urllib.request.Request("http://ユーザ名:パスワード@IPアドレス/")
)
動体検知について
プログラムの基盤になっている、複数画像から動体検知するやり方についてはこちらのサイトを参考にしています。
OpenCVは画像処理の有名なパッケージで、トリミングやフィルター設定といった基本的な編集だけでなく、二値化して影を取り除いたり、物体を検出したり、今回のように差分を求めたりと何でもできます。
工夫次第で他にも色々な便利ツールが作れそうです🤩
実運用
動作を検知すると、下の画像のようにLINEが送られてきます!
送信先はLINEなので、一定期間保存されて後で見返すこともできます。
スマートウォッチをつけていると振動で教えてくれます⌚️
Pythonを常時動かしているラズパイ環境はこのような感じになっています!(NAS兼用です)
たまにメンテナンスをしています👨🔧
ラズパイの冷却ファンが壊れたので、秋葉原で入手したものに交換しました。100円!!
今回はIPカメラを使用しましたが、最終的にはラズパイを使用するので安価なWebカメラでも同じことができそうです!