背景痛点

我是一名程序员, 前阵子xx公司多名员工抱怨到上班按键盘手都快按脱臼了! 找到了我,于是我问其原因:

她们有一个网页,网页上有一个用户信息的数据需要录入,然后录入的数据又来自一个excel文档。关键是excel文档有几百行数据:

image

 
网页长这样 , 需要从表格里面一行一行复制数据,然后粘贴到网页上, 然后确定, 然后继续下一条.....  (手速快一条30秒,这简直不可想象,手还能要吗)

image

于是我给他提供了解决方案。下面我将讲解如何解决,以及解决办法的实现原理。

解决演示视频

自动填充表单功能演示

文章末尾关注公众号,回复:“小鱼办公” , 查看解决方案。

文章末尾关注公众号,回复:“小鱼办公” , 查看解决方案。

功能介绍

功能概述

我们可以将你每天操作浏览器的重复操作进行自动化,只要事先配置好任务步骤,然后执行任务就可以了。

比如:你要完成的浏览器录入数据 就称为一个任务。每个任务又包含很多“步骤”。 步骤就是常用的浏览器操作,归纳几点如下:

1. 切换浏览器标签页

2. 点击按钮等元素

3. 输入框输入值

4. 下拉选中

5. 打开链接

等等.... 到时候没有的步骤也可以找我们新增步骤。

image

举个例子,比如我有一个任务: 《 到百度搜索 ”如何高效办公“ 》 我们就可以拆分成很多步骤:

1. 打开链接到 ”www.baidu.com“。

2. 在百度输入框输入”如何高效办公?“。

3. 点击搜索按钮。

把这些步骤在我们这配置好之后,点击执行任务就可以了。不要人工去操作浏览器。

那么还有一个问题,数据源哪里来?

你录入浏览器页面的数据一般都是从excel或者word里面来, 我们可以给任务指定一个数据文件,就可以读取文件,填充到浏览器。

image

如果您有疑问可以一起来探讨,功能就介绍到 这里 ,希望能帮助大家,感谢!!!

如果您有疑问可以一起来探讨,功能就介绍到 这里 ,希望能帮助大家,感谢!!!

下面我将详细介绍下每个步骤的具体教程。

教程:打开链接

打开链接就是用浏览器打开一个地址,配置时,需要指定地址参数:

image

教程:点击元素

点击元素就是鼠标左键单击某个按钮,需要指定到哪一个按钮。

image

这个按钮怎么配置?不急,下面就介绍这个按钮如何选取和配置。

教程:选取元素

选取元素我用一个视频来演示,大家看这个视频就可以了: 

教程:选取元素

教程:输入文本

输入文本就是将浏览器页面上的某个输入框输入你想要的值,值可以从表格里面来。 

视频演示:

教程:输入文本

还有很多我就不一一介绍了,下面讲解下实现原理, 不懂技术的可以绕过了~~

技术实现原理

基于Python开发的现代化办公自动化软件,集成了浏览器自动化、Excel数据处理、用户管理、在线更新等多项核心功能。该软件采用模块化架构设计,具备良好的可扩展性和用户体验。

主要用到的技术架构:

用户界面框架 :PySide6

浏览器操作引擎: DrissionPage

数据处理层:Excel处理,openpyxl库, word处理,python-docx库

http网络:基于requests库的封装

多媒体集成:pywebview,用于播放视频

日志系统:基于Python logging模块,按照天来划分日志文件

版本更新: 采用线程下载更新文件,然后进行静默替换,自动启动更新的程序。

模块化:项目由于用到了浏览器,把浏览器打包到项目会很大, 于是将浏览器模块拆分,项目第一次启动时,会下载浏览器模块。

后端:采用了springboot技术, 记录了用户建立的任务和步骤数据。方便下次直接执行任务。

数据库表

项目用到了10张表

image

qiqi_article 表:  存储教程视频的表。视频为url链接。

qiqi_dom表 : 存储用户自定义的元素配置。

qiqi_selenium_op_type表: 存储所有的步骤操作类型。

qiqi_selenium_op_type_val表 : 存储用户步骤操作类型设置的参数值。

qiqi_task表: 存储用户新建的任务。

qiqi_task_item表: 存储用户定义的步骤。

qiqi_user表: 存储用户,用户标识就是当前的设备id。

部分代码解析

项目数据源动态读取excel或者word的入口代码:

通过判断路径,来动态的构造 ExcelParser或者 WordParser解析器, 这是最常见的策略模式的体现。

复制代码

"""文件解析器主类

根据文件后缀名自动选择合适的解析器,统一解析接口。
"""

import os
from typing import List, Dict, Any, Optional, Union
from .excel_parser import ExcelParser
from .word_parser import WordParser


class FileParser:
    """文件解析器主类
    
    自动根据文件后缀名选择合适的解析器进行文件解析。
    """
    
    def __init__(self):
        """初始化文件解析器"""
        self.excel_parser = ExcelParser()
        self.word_parser = WordParser()
        
        # 支持的文件格式映射
        self.parser_map = {
            '.xlsx': self.excel_parser,
            '.xls': self.excel_parser,
            '.docx': self.word_parser
        }
    
    def is_supported(self, file_path: str) -> bool:
        """检查文件是否为支持的格式
        
        Args:
            file_path: 文件路径
            
        Returns:
            bool: 是否支持该文件格式
        """
        _, ext = os.path.splitext(file_path.lower())
        return ext in self.parser_map
    
    def get_supported_extensions(self) -> List[str]:
        """获取所有支持的文件扩展名
        
        Returns:
            List[str]: 支持的文件扩展名列表
        """
        return list(self.parser_map.keys())
    
    def parse(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """解析文件
        
        Args:
            file_path: 文件路径
            **kwargs: 额外参数
                - sheet_name: Excel工作表名称 (仅Excel文件)
                - table_index: Word表格索引 (仅Word文件)
                - filter_empty_rows: 是否过滤空行 (默认True)
                
        Returns:
            Dict: 解析结果
                {
                    'file_type': str,  # 文件类型 ('excel' 或 'word')
                    'file_path': str,  # 文件路径
                    'headers': List[str],  # 表头列表
                    'data': List[List[str]],  # 数据行列表
                    'total_rows': int,  # 总行数
                    'total_cols': int,  # 总列数
                    'metadata': Dict  # 额外元数据
                }
                
        Raises:
            FileNotFoundError: 文件不存在
            ValueError: 文件格式不支持或解析失败
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        if not self.is_supported(file_path):
            _, ext = os.path.splitext(file_path.lower())
            supported = ', '.join(self.get_supported_extensions())
            raise ValueError(f"不支持的文件格式 '{ext}',支持的格式: {supported}")
        
        # 获取文件扩展名和对应的解析器
        _, ext = os.path.splitext(file_path.lower())
        parser = self.parser_map[ext]
        
        try:
            # 根据文件类型调用相应的解析方法
            if ext in ['.xlsx', '.xls']:
                # Excel文件
                sheet_name = kwargs.get('sheet_name')
                result = parser.parse(file_path, sheet_name=sheet_name)
                
                # 过滤空行
                filter_empty_rows = kwargs.get('filter_empty_rows', True)
                if filter_empty_rows:
                    result['data'] = self._filter_empty_rows(result['data'])
                    result['total_rows'] = len(result['data'])
                
                # 过滤标题前后空格
                result['headers'] = self._trim_headers(result['headers'])
                
                return {
                    'file_type': 'excel',
                    'file_path': file_path,
                    'headers': result['headers'],
                    'data': result['data'],
                    'total_rows': result['total_rows'],
                    'total_cols': result['total_cols'],
                    'metadata': {
                        'sheet_name': result['sheet_name']
                    }
                }
                
            elif ext == '.docx':
                # Word文件
                table_index = kwargs.get('table_index')
                result = parser.parse(file_path, table_index=table_index)
                
                # 过滤空行
                filter_empty_rows = kwargs.get('filter_empty_rows', True)
                if filter_empty_rows:
                    result['data'] = self._filter_empty_rows(result['data'])
                    result['total_rows'] = len(result['data'])
                
                # 过滤标题前后空格
                result['headers'] = self._trim_headers(result['headers'])
                
                return {
                    'file_type': 'word',
                    'file_path': file_path,
                    'headers': result['headers'],
                    'data': result['data'],
                    'total_rows': result['total_rows'],
                    'total_cols': result['total_cols'],
                    'metadata': {
                        'table_index': result['table_index']
                    }
                }
                
        except Exception as e:
            raise ValueError(f"解析文件失败: {str(e)}")
    
    def parse_to_array(self, file_path: str, include_headers: bool = True, **kwargs) -> List[List[str]]:
        """解析文件并返回二维数组格式
        
        Args:
            file_path: 文件路径
            include_headers: 是否包含表头
            **kwargs: 额外参数
                
        Returns:
            List[List[str]]: 二维数组,第一行为表头(如果include_headers=True)
            
        Raises:
            FileNotFoundError: 文件不存在
            ValueError: 文件格式不支持或解析失败
        """
        result = self.parse(file_path, **kwargs)
        
        if include_headers:
            return [result['headers']] + result['data']
        else:
            return result['data']
    
    def get_file_info(self, file_path: str) -> Dict[str, Any]:
        """获取文件基本信息
        
        Args:
            file_path: 文件路径
            
        Returns:
            Dict: 文件信息
                {
                    'file_name': str,  # 文件名
                    'file_size': int,  # 文件大小(字节)
                    'file_type': str,  # 文件类型
                    'is_supported': bool,  # 是否支持解析
                    'additional_info': Dict  # 额外信息
                }
                
        Raises:
            FileNotFoundError: 文件不存在
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        _, ext = os.path.splitext(file_path.lower())
        is_supported = self.is_supported(file_path)
        
        additional_info = {}
        
        if is_supported:
            try:
                if ext in ['.xlsx', '.xls']:
                    # Excel文件额外信息
                    sheet_names = self.excel_parser.get_sheet_names(file_path)
                    additional_info = {
                        'sheet_names': sheet_names,
                        'sheet_count': len(sheet_names)
                    }
                elif ext == '.docx':
                    # Word文件额外信息
                    table_count = self.word_parser.get_table_count(file_path)
                    additional_info = {
                        'table_count': table_count
                    }
            except Exception:
                # 如果获取额外信息失败,不影响基本信息返回
                pass
        
        return {
            'file_name': file_name,
            'file_size': file_size,
            'file_type': ext,
            'is_supported': is_supported,
            'additional_info': additional_info
        }
    
    def batch_parse(self, file_paths: List[str], **kwargs) -> List[Dict[str, Any]]:
        """批量解析多个文件
        
        Args:
            file_paths: 文件路径列表
            **kwargs: 额外参数
            
        Returns:
            List[Dict]: 解析结果列表,每个元素包含文件路径和解析结果或错误信息
            
        """
        results = []
        
        for file_path in file_paths:
            try:
                result = self.parse(file_path, **kwargs)
                results.append({
                    'file_path': file_path,
                    'success': True,
                    'result': result,
                    'error': None
                })
            except Exception as e:
                results.append({
                    'file_path': file_path,
                    'success': False,
                    'result': None,
                    'error': str(e)
                })
        
        return results
    
    def _filter_empty_rows(self, data: List[List[str]]) -> List[List[str]]:
        """过滤掉全是空值的行
        
        Args:
            data: 原始数据行列表
            
        Returns:
            List[List[str]]: 过滤后的数据行列表
        """
        filtered_data = []
        
        for row in data:
            # 检查行是否全为空(空字符串、None或只包含空白字符)
            if any(cell and str(cell).strip() for cell in row):
                filtered_data.append(row)
        
        return filtered_data
    
    def _trim_headers(self, headers: List[str]) -> List[str]:
        """过滤标题前后空格
        
        Args:
            headers: 原始标题列表
            
        Returns:
            List[str]: 过滤后的标题列表
        """
        trimmed_headers = []
        
        for header in headers:
            if header is None:
                trimmed_headers.append('')
            else:
                trimmed_headers.append(str(header).strip())
        
        return trimmed_headers

复制代码

http的封装

复制代码

import requests
import json
from typing import Dict, Any, Optional
from requests.exceptions import RequestException, Timeout, ConnectionError


class HttpClient:
    """HTTP客户端工具类"""
    
    def __init__(self, base_url: str = "", timeout: int = 30):
        """
        初始化HTTP客户端
        
        Args:
            base_url: 基础URL
            timeout: 请求超时时间(秒)
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        
        # 设置默认请求头
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'OfficeAutomation/1.0'
        })
    
    def _build_url(self, endpoint: str) -> str:
        """构建完整URL"""
        if endpoint.startswith('http'):
            return endpoint
        return f"{self.base_url}/{endpoint.lstrip('/')}"
    
    def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
        """处理响应数据"""
        try:
            # 检查状态码
            if response.status_code >= 400:
                error_data = response.json() if response.content else {}
                error_message = error_data.get('message', f'HTTP {response.status_code} Error')
                
                # 根据状态码返回不同的错误信息
                if response.status_code == 400:
                    error_message = f"请求参数错误: {error_message}"
                elif response.status_code == 401:
                    error_message = f"认证失败: {error_message}"
                elif response.status_code == 403:
                    error_message = f"权限不足: {error_message}"
                elif response.status_code == 404:
                    error_message = f"资源不存在: {error_message}"
                elif response.status_code == 500:
                    error_message = f"服务器内部错误: {error_message}"
                elif response.status_code >= 500:
                    error_message = f"服务器错误: {error_message}"
                
                raise Exception(error_message)
            
            # 返回JSON数据
            return response.json() if response.content else {}
            
        except json.JSONDecodeError as e:
            # JSON解析错误
            raise Exception(f"服务器返回数据格式错误: {str(e)}")
        except Exception as e:
            if "请求参数错误" in str(e) or "认证失败" in str(e) or "权限不足" in str(e) or "资源不存在" in str(e) or "服务器" in str(e):
                raise
            raise Exception(f"响应处理错误: {str(e)}")
    
    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None, 
            headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        发送GET请求
        
        Args:
            endpoint: 接口端点
            params: 查询参数
            headers: 请求头
            
        Returns:
            响应数据字典
            
        Raises:
            Exception: 请求失败时抛出异常
        """
        try:
            url = self._build_url(endpoint)
            response = self.session.get(
                url, 
                params=params, 
                headers=headers, 
                timeout=self.timeout
            )
            return self._handle_response(response)
        except Timeout:
            raise Exception("请求超时,请检查网络连接或增加超时时间")
        except ConnectionError as e:
            raise Exception(f"网络连接失败: {str(e)},请检查网络设置")
        except requests.exceptions.SSLError as e:
            raise Exception(f"SSL证书验证失败: {str(e)}")
        except requests.exceptions.TooManyRedirects as e:
            raise Exception(f"重定向次数过多: {str(e)}")
        except RequestException as e:
            raise Exception(f"网络请求异常: {str(e)}")
    
    def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None,
             json_data: Optional[Dict[str, Any]] = None,
             headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        发送POST请求
        
        Args:
            endpoint: 接口端点
            data: 表单数据
            json_data: JSON数据
            headers: 请求头
            
        Returns:
            响应数据字典
            
        Raises:
            Exception: 请求失败时抛出异常
        """
        try:
            url = self._build_url(endpoint)
            response = self.session.post(
                url,
                data=data,
                json=json_data,
                headers=headers,
                timeout=self.timeout
            )
            return self._handle_response(response)
        except Timeout:
            raise Exception("请求超时,请检查网络连接或增加超时时间")
        except ConnectionError as e:
            raise Exception(f"网络连接失败: {str(e)},请检查网络设置")
        except requests.exceptions.SSLError as e:
            raise Exception(f"SSL证书验证失败: {str(e)}")
        except requests.exceptions.TooManyRedirects as e:
            raise Exception(f"重定向次数过多: {str(e)}")
        except RequestException as e:
            raise Exception(f"网络请求异常: {str(e)}")
    
    def put(self, endpoint: str, data: Optional[Dict[str, Any]] = None,
            json_data: Optional[Dict[str, Any]] = None,
            headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        发送PUT请求
        
        Args:
            endpoint: 接口端点
            data: 表单数据
            json_data: JSON数据
            headers: 请求头
            
        Returns:
            响应数据字典
        """
        try:
            url = self._build_url(endpoint)
            response = self.session.put(
                url,
                data=data,
                json=json_data,
                headers=headers,
                timeout=self.timeout
            )
            return self._handle_response(response)
        except Timeout:
            raise Exception("请求超时,请检查网络连接或增加超时时间")
        except ConnectionError as e:
            raise Exception(f"网络连接失败: {str(e)},请检查网络设置")
        except requests.exceptions.SSLError as e:
            raise Exception(f"SSL证书验证失败: {str(e)}")
        except requests.exceptions.TooManyRedirects as e:
            raise Exception(f"重定向次数过多: {str(e)}")
        except RequestException as e:
            raise Exception(f"网络请求异常: {str(e)}")
    
    def delete(self, endpoint: str, params: Optional[Dict[str, Any]] = None,
               headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        发送DELETE请求
        
        Args:
            endpoint: 接口端点
            params: 查询参数
            headers: 请求头
            
        Returns:
            响应数据字典
        """
        try:
            url = self._build_url(endpoint)
            response = self.session.delete(
                url,
                params=params,
                headers=headers,
                timeout=self.timeout
            )
            return self._handle_response(response)
        except Timeout:
            raise Exception("请求超时,请检查网络连接或增加超时时间")
        except ConnectionError as e:
            raise Exception(f"网络连接失败: {str(e)},请检查网络设置")
        except requests.exceptions.SSLError as e:
            raise Exception(f"SSL证书验证失败: {str(e)}")
        except requests.exceptions.TooManyRedirects as e:
            raise Exception(f"重定向次数过多: {str(e)}")
        except RequestException as e:
            raise Exception(f"网络请求异常: {str(e)}")
    
    def set_auth_token(self, token: str):
        """设置认证令牌"""
        self.session.headers.update({
            'Authorization': f'Bearer {token}'
        })
    
    def set_base_url(self, base_url: str):
        """设置基础URL"""
        self.base_url = base_url.rstrip('/')
    
    def close(self):
        """关闭会话"""
        self.session.close()

复制代码

如果您有疑问可以一起来探讨,今天就介绍到 这里 ,希望能帮助大家,感谢!!!

文章末尾关注公众号,回复:“小鱼办公” , 查看解决方案。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐