improve process

This commit is contained in:
lixianjing 2023-10-18 17:35:31 +08:00
parent c998e836f0
commit 788eac8d9e
5 changed files with 121 additions and 43 deletions

View File

@ -1,4 +1,6 @@
# 最新动态
2023/10/18
* 修复process完成后读写崩溃的问题以及修改linux下全双工的管道问题(感谢智明提供补丁)
2023/10/16
* 增加函数 conf\_node\_get\_child\_value\_int32/conf\_node\_get\_child\_value\_bool/conf\_node\_get\_child\_value\_str

View File

@ -27,9 +27,17 @@
#include "tkc/socket_helper.h"
#include "process_helper.h"
#define CMD_LINE_SIZE 8 * 1024
#ifdef WIN32
#define WINDOW_CMD_LINE_SIZE 8 * 1024
static ret_t process_close_server_sock(process_handle_t handle) {
if (handle->broken && tk_socket_wait_for_data(handle->server_fd, 1) == RET_TIMEOUT) {
tk_socket_close(handle->server_fd);
handle->server_fd = -1;
}
return RET_OK;
}
static void* process_read_pipe_on_thread(void* args) {
process_handle_t handle = (process_handle_t)args;
@ -40,7 +48,7 @@ static void* process_read_pipe_on_thread(void* args) {
uint32_t pos = 0;
char buff[1024];
BOOL ret = ReadFile(handle->h_std_out_rd, buff, sizeof(buff), &size, NULL);
if (!ret || size == 0) {
if (!ret || size < 0) {
handle->broken = TRUE;
break;
}
@ -53,6 +61,7 @@ static void* process_read_pipe_on_thread(void* args) {
pos += len;
}
}
process_close_server_sock(handle);
return NULL;
}
@ -78,7 +87,9 @@ ret_t process_destroy(process_handle_t handle) {
}
tk_socket_close(handle->client_fd);
tk_socket_close(handle->server_fd);
if (handle->server_fd > 0) {
tk_socket_close(handle->server_fd);
}
TKMEM_FREE(handle->file_path);
wstr_reset(&handle->cmd_line);
@ -114,7 +125,7 @@ process_handle_t process_create(const char* file_path, const char** args, uint32
handle->start_info.wShowWindow = SW_HIDE;
handle->start_info.dwFlags |= (STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW);
wstr_init(&handle->cmd_line, WINDOW_CMD_LINE_SIZE);
wstr_init(&handle->cmd_line, CMD_LINE_SIZE);
if (file_path != NULL) {
handle->file_path = tk_wstr_dup_utf8(file_path);
}
@ -172,7 +183,9 @@ ret_t process_wait_for_data(process_handle_t handle, uint32_t timeout_ms) {
int32_t process_read(process_handle_t handle, uint8_t* buff, uint32_t max_size) {
int fd = process_handle_get_fd(handle);
return tk_socket_recv(fd, buff, max_size, 0);
int32_t size = tk_socket_recv(fd, buff, max_size, 0);
process_close_server_sock(handle);
return size;
}
int32_t process_write(process_handle_t handle, const uint8_t* buff, uint32_t max_size) {
@ -192,15 +205,49 @@ bool_t process_is_broken(process_handle_t handle) {
#elif defined(LINUX) || defined(MACOS)
ret_t process_destroy(process_handle_t handle) {
int stat;
return_value_if_fail(handle != NULL, RET_BAD_PARAMS);
static ret_t process_close_read_pipe(process_handle_t handle) {
if (handle->broken && tk_socket_wait_for_data(handle->read_pfd[0], 1) == RET_TIMEOUT) {
close(handle->read_pfd[1]);
close(handle->write_pfd[0]);
handle->read_pfd[1] = -1;
handle->write_pfd[0] = -1;
}
return RET_OK;
}
static void* process_wait_for_on_thread(void* args) {
int stat = 0;
process_handle_t handle = (process_handle_t)args;
return_value_if_fail(handle != NULL, NULL);
while (waitpid(handle->pid, &stat, 0) < 0) {
break_if_fail(errno == EINTR);
}
close(handle->pfd[0]);
close(handle->pfd[1]);
handle->broken = TRUE;
process_close_read_pipe(handle);
return NULL;
}
ret_t process_destroy(process_handle_t handle) {
return_value_if_fail(handle != NULL, RET_BAD_PARAMS);
if (handle->thread != NULL) {
tk_thread_join(handle->thread);
tk_thread_destroy(handle->thread);
}
close(handle->read_pfd[0]);
if (handle->read_pfd[1] > 0) {
close(handle->read_pfd[1]);
}
if (handle->write_pfd[0] > 0) {
close(handle->write_pfd[0]);
}
close(handle->write_pfd[1]);
str_reset(&handle->str_tmp);
TKMEM_FREE(handle);
return RET_OK;
}
@ -211,33 +258,36 @@ process_handle_t process_create(const char* file_path, const char** args, uint32
process_handle_t handle = TKMEM_ZALLOC(process_info_t);
return_value_if_fail(handle != NULL, NULL);
goto_error_if_fail(pipe(handle->pfd) >= 0);
str_init(&handle->str_tmp, CMD_LINE_SIZE);
if (file_path != NULL) {
str_append(&handle->str_tmp, file_path);
}
for (i = 0; i < argc; i++) {
str_append_char(&handle->str_tmp, ' ');
str_append(&handle->str_tmp, args[i]);
}
goto_error_if_fail(pipe(handle->read_pfd) >= 0);
goto_error_if_fail(pipe(handle->write_pfd) >= 0);
pid = fork();
goto_error_if_fail(pid >= 0);
if (pid == 0) { /* child */
str_t str_tmp;
if (handle->pfd[1] != STDOUT_FILENO) {
goto_error_if_fail(dup2(handle->pfd[1], STDOUT_FILENO) >= 0);
close(handle->pfd[1]);
if (pid == 0) { /* child */
if (handle->read_pfd[1] != STDOUT_FILENO) {
goto_error_if_fail(dup2(handle->read_pfd[1], STDOUT_FILENO) >= 0);
close(handle->read_pfd[1]);
}
if (handle->pfd[0] != STDIN_FILENO) {
goto_error_if_fail(dup2(handle->pfd[0], STDIN_FILENO) >= 0);
close(handle->pfd[0]);
if (handle->write_pfd[0] != STDIN_FILENO) {
goto_error_if_fail(dup2(handle->write_pfd[0], STDIN_FILENO) >= 0);
close(handle->write_pfd[0]);
}
str_init(&str_tmp, 1024);
if (file_path != NULL) {
str_append(&str_tmp, file_path);
}
for (i = 0; i < argc; i++) {
str_append_char(&str_tmp, ' ');
str_append(&str_tmp, args[i]);
}
execl("/bin/sh", "sh", "-c", str_tmp.str, NULL);
str_reset(&str_tmp);
execl("/bin/sh", "sh", "-c", handle->str_tmp.str, NULL);
_exit(127);
} else {
handle->pid = pid;
handle->thread = tk_thread_create(process_wait_for_on_thread, handle);
goto_error_if_fail(handle->thread != NULL);
goto_error_if_fail(tk_thread_start(handle->thread) == RET_OK);
}
return handle;
error :
@ -247,28 +297,37 @@ error :
int process_handle_get_fd(process_handle_t handle) {
return_value_if_fail(handle != NULL, -1);
return handle->pfd[0];
if (handle->read_pfd[1] == -1) {
return -1;
}
return handle->read_pfd[0];
}
ret_t process_wait_for_data(process_handle_t handle, uint32_t timeout_ms) {
int fd = process_handle_get_fd(handle);
return_value_if_fail(fd > 0, RET_FAIL);
return tk_socket_wait_for_data(fd, timeout_ms);
}
int32_t process_read(process_handle_t handle, uint8_t* buff, uint32_t max_size) {
int32_t size = 0;
int fd = process_handle_get_fd(handle);
return_value_if_fail(handle != NULL && buff != NULL, -1);
return read(fd, buff, max_size);
return_value_if_fail(handle != NULL && buff != NULL && fd > 0, -1);
size = read(fd, buff, max_size);
process_close_read_pipe(handle);
return size;
}
int32_t process_write(process_handle_t handle, const uint8_t* buff, uint32_t max_size) {
return_value_if_fail(handle != NULL && buff != NULL, -1);
return write(handle->pfd[1], buff, max_size);
return_value_if_fail(handle->write_pfd[0] > 0, -1);
return write(handle->write_pfd[1], buff, max_size);
}
bool_t process_is_broken(process_handle_t handle) {
int stat = 0;
return_value_if_fail(handle != NULL, TRUE);
return process_handle_get_fd(handle) <= 0;
return waitpid(handle->pid, &stat, WNOHANG) != 0;
}
#endif

View File

@ -56,13 +56,21 @@ struct _process_info_t {
#elif defined(LINUX) || defined(MACOS)
#include <sys/wait.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include "tkc/str.h"
struct _process_info_t {
int pfd[2];
bool_t broken;
pid_t pid;
int read_pfd[2];
int write_pfd[2];
str_t str_tmp;
tk_thread_t* thread;
};
#endif

View File

@ -2,7 +2,9 @@
#include <stdlib.h>
#include <time.h>
#ifndef WIN32
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
@ -11,17 +13,14 @@ int main(int argc, char* argv[]) {
int i = 0;
srand((unsigned int)time(NULL));
setbuf(stdin, NULL);
for (i = 0; i < argc; i++) {
printf("argv[%d]:%s \r\n", i, argv[i]);
}
printf("Do you want to continue ? [y/n] \r\n");
fflush(stdin);
fflush(stdout);
#ifndef WIN32
usleep(1000 * 100);
#endif
ret = getchar();
if (!(ret == 'y' || ret == 'Y')) {
printf("exit:%c \r\n", ret);
@ -30,5 +29,11 @@ int main(int argc, char* argv[]) {
printf("random[%d]:%d \r\n", i, rand() % 100);
}
}
fflush(stdout);
#ifdef WIN32
Sleep(1000 * 5);
#else
usleep(1000 * 1000 * 5);
#endif
return 0;
}

View File

@ -49,8 +49,12 @@ int main(int argc, char* argv[]) {
tk_istream_read(istream, buff, sizeof(buff));
printf("%s", buff);
}
tk_istream_wait_for_data(istream, 0xFFFFFFFF);
if (tk_iostream_write_len(process, data, tk_strlen(data), 1000) == 0) {
tk_istream_wait_for_data(istream, 0xFFFFFFFF);
printf("close process !!!");
}
}
}
str_reset(&file_path);