class Smoking: """定义Smoking类""" # 浏览器驱动 __driver: webdriver = None # 配置帮助类 __cfg_info: dict = {} # 日志帮助类 __log_helper: LogHelper = None # 主程序目录 __work_path: str = '' # 是否正在运行 __running: bool = False # 无货 __no_stock = 'Currently Out of Stock' # 线程等待秒数 __wait_sec = 2 def __init__( self , work_path, cfg_info, log_helper: LogHelper): """初始化""" self .__cfg_info = cfg_info self .__log_helper = log_helper self .__work_path = work_path self .__wait_sec = int (cfg_info[ 'wait_sec' ]) # 如果小于2,则等于2 self .__wait_sec = ( 2 if self .__wait_sec < 2 else self .__wait_sec) def checkIsExistsById( self , id ): """通过ID判断是否存在""" try : i = 0 while self .__running and i < 3 : if len ( self .__driver.find_elements_by_id( id )) > 0 : break else : time.sleep( self .__wait_sec) i = i + 1 return len ( self .__driver.find_elements_by_id( id )) > 0 except BaseException as e: return False def checkIsExistsByName( self , name): """通过名称判断是否存在""" try : i = 0 while self .__running and i < 3 : if len ( self .__driver.find_elements_by_name(name)) > 0 : break else : time.sleep( self .__wait_sec) i = i + 1 return len ( self .__driver.find_elements_by_name(name)) > 0 except BaseException as e: return False def checkIsExistsByPath( self , path): """通过xpath判断是否存在""" try : i = 0 while self .__running and i < 3 : if len ( self .__driver.find_elements_by_xpath(path)) > 0 : break else : time.sleep( self .__wait_sec) i = i + 1 return len ( self .__driver.find_elements_by_xpath(path)) > 0 except BaseException as e: return False def checkIsExistsByClass( self , cls ): """通过class名称判断是否存在""" try : i = 0 while self .__running and i < 3 : if len ( self .__driver.find_elements_by_class_name( cls )) > 0 : break else : time.sleep( self .__wait_sec) i = i + 1 return len ( self .__driver.find_elements_by_class_name( cls )) > 0 except BaseException as e: return False def checkIsExistsByLinkText( self , link_text): """判断LinkText是否存在""" try : i = 0 while self .__running and i < 3 : if len ( self .__driver.find_elements_by_link_text(link_text)) > 0 : break else : time.sleep( self .__wait_sec) i = i + 1 return len ( self .__driver.find_elements_by_link_text(link_text)) > 0 except BaseException as e: return False def checkIsExistsByPartialLinkText( self , link_text): """判断包含LinkText是否存在""" try : i = 0 while self .__running and i < 3 : if len ( self .__driver.find_elements_by_partial_link_text(link_text)) > 0 : break else : time.sleep( self .__wait_sec) i = i + 1 return len ( self .__driver.find_elements_by_partial_link_text(link_text)) > 0 except BaseException as e: return False # def waiting(self, *locator): # """等待完成""" # # self.__driver.switch_to.window(self.__driver.window_handles[1]) # Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator)) def login( self , username, password): """登录""" # 5. 点击链接跳转到登录页面 self .__driver.find_element_by_link_text( '账户登录' ).click() # 6. 输入账号密码 # 判断是否加载完成 # self.waiting((By.ID, "email")) if self .checkIsExistsById( 'email' ): self .__driver.find_element_by_id( 'email' ).send_keys(username) self .__driver.find_element_by_id( 'password' ).send_keys(password) # 7. 点击登录按钮 self .__driver.find_element_by_id( 'sign-in' ).click() def working( self , item_id): """工作状态""" while self .__running: try : # 正常获取信息 if self .checkIsExistsById( 'string' ): self .__driver.find_element_by_id( 'string' ).clear() self .__driver.find_element_by_id( 'string' ).send_keys(item_id) self .__driver.find_element_by_id( 'string' ).send_keys(Keys.ENTER) # 判断是否查询到商品 xpath = "//div[@class='specialty-header search']/div[@class='specialty-description']/div[" \ "@class='gt-450']/span[2] " if self .checkIsExistsByPath(xpath): count = int ( self .__driver.find_element_by_xpath(xpath).text) if count < 1 : time.sleep( self .__wait_sec) self .__log_helper.put( '没有查询到item id =' + item_id + '对应的信息' ) continue else : time.sleep( self .__wait_sec) self .__log_helper.put( '没有查询到item id2 =' + item_id + '对应的信息' ) continue # 判断当前库存是否有货 xpath1 = "//div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" \ "@class='price']/span[@class='noStock'] " if self .checkIsExistsByPath(xpath1): txt = self .__driver.find_element_by_xpath(xpath1).text if txt = = self .__no_stock: # 当前无货 time.sleep( self .__wait_sec) self .__log_helper.put( '查询一次' + item_id + ',无货' ) continue # 链接path1 xpath2 = "//div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a" # 判断是否加载完毕 # self.waiting((By.CLASS_NAME, "imgDiv")) if self .checkIsExistsByPath(xpath2): self .__driver.find_element_by_xpath(xpath2).click() time.sleep( self .__wait_sec) # 加入购物车 if self .checkIsExistsByClass( 'add-to-cart' ): self .__driver.find_element_by_class_name( 'add-to-cart' ).click() self .__log_helper.put( '加入购物车成功,商品item-id:' + item_id) break else : self .__log_helper.put( '未找到加入购物车按钮' ) else : self .__log_helper.put( '没有查询到,可能是商品编码不对,或者已下架' ) except BaseException as e: self .__log_helper.put(e) def startRun( self ): """运行起来""" try : self .__running = True url: str = self .__cfg_info[ 'url' ] username = self .__cfg_info[ 'username' ] password = self .__cfg_info[ 'password' ] item_id = self .__cfg_info[ 'item_id' ] if url is None or len (url) = = 0 or username is None or len (username) = = 0 or password is None or len ( password) = = 0 or item_id is None or len (item_id) = = 0 : self .__log_helper.put( '配置信息不全,请检查config.cfg文件是否为空,然后再重启' ) return if self .__driver is None : options = webdriver.IeOptions() options.add_argument( 'encoding=UTF-8' ) options.add_argument( 'Accept= text / css, * / *' ) options.add_argument( 'Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5' ) options.add_argument( 'Accept - Encoding= gzip, deflate' ) options.add_argument( 'user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko' ) # 2. 定义浏览器驱动对象 self .__driver = webdriver.Ie(executable_path = self .__work_path + r '\IEDriverServer.exe' , options = options) self .run(url, username, password, item_id) except BaseException as e: self .__log_helper.put( '运行过程中出错,请重新打开再试' ) def run( self , url, username, password, item_id): """运行起来""" # 3. 访问网站 self .__driver.get(url) # 4. 最大化窗口 self .__driver.maximize_window() if self .checkIsExistsByLinkText( '账户登录' ): # 判断是否登录:未登录 self .login(username, password) if self .checkIsExistsByPartialLinkText( '欢迎回来' ): # 判断是否登录:已登录 self .__log_helper.put( '登录成功,下一步开始工作了' ) self .working(item_id) else : self .__log_helper.put( '登录失败,请设置账号密码' ) def stop( self ): """停止""" try : self .__running = False # 如果驱动不为空,则关闭 self .close_browser_nicely( self .__driver) if self .__driver is not None : self .__driver.quit() # 关闭后切要为None,否则启动报错 self .__driver = None except BaseException as e: print ( 'Stop Failure' ) finally : self .__driver = None def close_browser_nicely( self , browser): try : browser.execute_script( "window.onunload=null; window.onbeforeunload=null" ) except Exception as err: print ( "Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'" ) socket.setdefaulttimeout( 10 ) try : browser.quit() print ( "Close browser and firefox by calling quit()" ) except Exception as err: print ( "Fail to quit from browser, error-type:%s, reason:%s" % ( type (err), str (err))) socket.setdefaulttimeout( 30 ) |