Meu trabalho de conclusão de curso foi sobre um sistema de comunicação de bibliotecas. Desenvolvi o trabalho inicialmente nas aulas de XP e mais tarde, junto com o co-autor da monografia, Leandro Bororo.
Todo o trabalho gira em torno de dois problemas: exportar os dados da base para um formato especial, denominado MARC, e importar os dados desse formato para o banco. Podemos separar isso na discussão legal e na discussão chata. A discussão chata é justamente os objetivos básicos do projeto supracitados. A discussão legal é como criar uma interface WEB para esse componente que permita o acompanhamento do processo de importação ou exportação. Obviamente vou falar sobre o segundo caso.
Desenvolver uma interface WEB para o MARC não é nenhum problema. No entanto, acompanhar a importação/exportação e permitir que o sistema trabalhe com arquivos grandes (mais de 10 megabytes) é outra história. A própria natureza de uma aplicação WEB torna a solução síncrona problemática. A análise de um arquivo MARC pode demorar bastante. O suficiente para a conexão HTTP (cliente-servidor) finalizar por timeout. Além disso, não seria a implementação mais responsiva (se todas as informações são direcionadas para a saída padrão, essas serão exibidas com sorte na linha de comando ou em algum arquivo de registro do servidor, em todo caso, não visível para o cliente). Para resolver isso, o tratamento da informação deveria ser assíncrono.
Inicialmente, poderíamos pensar em enviar as mensagens diretamente para o browser. Mas isso não é possível fazer diretamente. De fato, é o navegador faz as solicitações (request) de um certo recurso (disponível numa localização específica - URL). Uma alternativa seria o uso de Ajax (Asynchronous Javascript And XML) que daria a sensação de que o browser é capaz receber notificações do servidor. Na verdade, no Ajax, é o navegador que faz chamadas remotas de tempos em tempos (sem que o usuário tome alguma ação específica). O fato de a primeira requisição bloquear.
Abaixo, segue o funcionamento da comunicação síncrona entre Browser e servidor que mostra porque o browser irá bloquear ao enviar o arquivo para processamento:
public Response processar (Request request) { // magia negra para pegar arquivo do request File file = pegaArquivo(request); // magia negra para processar o arquivo Resposta resposta = processa(arquivo); // magia para transformar resposta em response return transforma(resposta); }
A conversa do navegador com o servidor é feito por ciclos de requisições e respostas. Se for necessário que numa requisição o servidor processe um dado arquivo, é preciso esperar todo o processamento para se obter a resposta do servidor. Pode-se separar o processamento da submissão do arquivo, delegando a responsabilidade para outro processo. Exemplo:
public Response processar (Request request) { // magia negra para pegar arquivo do request File file = pegaArquivo(request); // apenas grava o arquivo.. uma outra linha de execução processa o arquivo. grava(arquivo); return paginaSucesso(); }
Se o processamento do arquivo é feito na mesma linha de execução da submissão, o cliente só terá uma resposta após o processamento do arquivo.
Para o servidor processar o arquivo de forma assíncrona é necessário que primeiro um "Servlet" processe e grava o arquivo num repositório temporário (seja ele uma fila JMS, um caixa de email, um diretório, etc) e um outro processo, de tempos em tempos, verifique o conteúdo desse repositório.
Esse outro processo pode atualizar um arquivo de log a medida que for necessário (por exmplo: a medida que os registros Marcs forem processados) . Dessa forma, o cliente poderá "requisitar" o conteúdo do arquivo quantas vezes quiser.
// isso é fácil... public Resposta processar(Request request) { descobreIdArquivo(request); // conteúdo atual do arquivo String arquivo = leArquivo(idArquivo); return converteParaResponse(arquivo); }
A síntese da solução é a seguinte: Num primeiro momento o usuário faria o "upload" do arquivo. Depois o usuário deveria listar o arquivo em importação e consultar os detalhes da importação. O mesmo pode ser feito para a exportação. Uma fila de mensagens assíncronas e um processo em execução paralela devem ser implementados.
Implementamos o processo da seguinte maneira:
O servidor recebe um arquivo, via FormFile (essa classe é um utilitário da biblioteca commons-upload [do projeto Commons da Apache Software Foundation] para tratar multipart/form-data de uma requisição HTTP) e o grava num diretório específico. Em seguida, envia um objeto comando como mensagem para uma fila (com o nome do arquivo já atribuído a uma de suas propriedades). Por outro lado, o sistema mantém uma reserva (pool) de trabalhadores que consomem os "trabalhos" enviados para a fila. Cada processamento é registrado por um registrador (logger) num mapa cuja chave é o nome do arquivo. Um Servlet controla o ciclo de vida desses trabalhadores. Os trabalhadores devem por isso, realizar pequenos blocos de operações e verificar se pode executar o próximo bloco, para evitar bloquear o servidor caso seja necessário reiniciá-lo. Abaixo, segue como foi implementado o trabalhador:
@Override public void run() { try { while (condicao) { Command command = works.take(); if (NO_MORE_WORK == command) { break; } try { while (command.hasWork() && condicao) { command.process(); } } catch (Exception e) { e.printStackTrace(); } command.finish(); } } catch (InterruptedException e) { } }
Nesse trecho de código, o objeto works representa a fila de trabalhos (BlockingQueue do pacote utilitário de concorrência do Java 1.5 e superiores). Ele será o repositório da comunicação Produtor-Consumidor. Command é a classe que implementa o padrão Command [Gof], o comando que será executado. A variável condicao permite que o trabalhador verifique se deve continuar a execução do trabalho atual. Aqui a implementação do comando para a Importação do MARC:
private class ImportMarcCommand extends Command { private LenientMarcStreamReader reader; private MarcManager marcManager; private FileInputStream inputStream; private String nomeArquivo; private FileChannel channel; @Override public boolean hasWork() { return reader.hasNext(); } @Override public void process() throws Exception { marcManager.parse(reader.next()); marcManager.getLog().setAvailable(channel.position()); } public ImportMarcCommand(String nomeDoArquivo, boolean sobrescreveItem, boolean sobrescreveExemplar) throws NamingException, IOException { marcManager = new MarcManager(sobrescreveItem, sobrescreveExemplar); Logger.getInstance().getLogs().add(marcManager.getLog()); this.nomeArquivo = nomeDoArquivo; File file = new File(nomeDoArquivo); inputStream = new FileInputStream(file); channel = inputStream.getChannel(); marcManager.getLog().setInitialAvailable(file.length()); reader = new LenientMarcStreamReader(inputStream); } @Override public void finish() { marcManager.getLog().setFinalDate(new Date()); // vou fechar o stream por educação. se eu não conseguir, não vou // ligar. if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } // tenta apagar educadamente. File f = new File(nomeArquivo); if (!f.exists() && !f.canWrite()) { f.delete(); } } }
Algumas observações:
O ImportMarcCommand processa um registro MARC por vez (então, o processo pode ser finalizado logo que o registro atual for processado).
O objeto reader é responsável por identificar o registro e mover o indicador da posição de leitura do arquivo.
O objeto channel permite que essa informação (sobre o indicador da posição seja computado) seja computada no cálculo do total decorrido.
O Logger é o repositório com as informações sobre o histórico de cada processamento (tempo inicial, final , etc).
No método finish, é registrado o fim do processamento e o arquivo é apagado.
public class ThreadMonitorServlet extends HttpServlet { private static final long serialVersionUID = 1L; @Override public void destroy() { for (int i = 0; i < workers.length; i++) { workers[i].setCondicao(false); try { queue.put(ImportWorker.NO_MORE_WORK); } catch (InterruptedException e) { e.printStackTrace(); } } super.destroy(); } static final int capacity = 10; final int numWorkers = 3; public static BlockingQueueO método init recebe o evento de inicialização do servidor. Nesse momento, o Servlet "contrata" os trabalhadores que serão responsáveis pelo processamento dos comandos, isso é, o Servlet atribui a lista de trabalho e inicia as threads. No método destroy o Servlet recebe o evento para finalizar e então muda a condição de execução das threads de tal forma que elas parem sua execução. Declarado no web.xml da seguinte maneira:queue = new ArrayBlockingQueue ( capacity); ImportWorker[] workers = new ImportWorker[numWorkers]; @Override public void init(ServletConfig servletConfig) throws ServletException { super.init(servletConfig); for (int i = 0; i < workers.length; i++) { workers[i] = new ImportWorker(queue); workers[i].start(); } } }
<servlet> <servlet-name>monitor</servlet-name> <servlet-class> br.usp.ime.colmeia.async.ThreadMonitorServlet </servlet-class> <load-on-startup>1</load-on-startup> </servlet>Depois disso, tem-se a ação para listar a situação do processamento de cada arquivo.
Comentários